koibumi_node_sync/
manager.rs

1use std::{path::PathBuf, sync::Arc, thread};
2
3use crossbeam_channel::{bounded, Receiver, Sender};
4use log::error;
5
6use koibumi_core::{
7    address::Address,
8    identity::Private as PrivateIdentity,
9    message::{self, UserAgent},
10    object,
11};
12
13use crate::{
14    config::Config,
15    connection_loop::{ConnectionManager, Context},
16    net::SocketAddrNode,
17    node_manager::Rating,
18    user_manager::User,
19};
20
21/// The events which occur in a Bitmessage node.
22#[derive(Clone, Debug)]
23pub enum Event {
24    /// Indicates the stats of the counts of connections have been changed.
25    ConnectionCounts {
26        /// The count of incoming initiated connections.
27        incoming_initiated: usize,
28        /// The count of incoming connected connections.
29        incoming_connected: usize,
30        /// The count of incoming established connections.
31        incoming_established: usize,
32        /// The count of outgoing initiated connections.
33        outgoing_initiated: usize,
34        /// The count of outgoing connected connections.
35        outgoing_connected: usize,
36        /// The count of outgoing established connections.
37        outgoing_established: usize,
38    },
39
40    /// Indicates the count of known node addresses has been changed.
41    AddrCount(usize),
42
43    /// Indicates a connection to a node has newly been established.
44    /// The socket address, the user agent
45    /// and the rating of the connectivity of the node are returned.
46    Established {
47        /// The socket address.
48        addr: SocketAddrNode,
49        /// The user agent.
50        user_agent: UserAgent,
51        /// The rating.
52        rating: Rating,
53    },
54
55    /// Indicates an established connection to a node has been disconnected.
56    /// The socket address of the node is returned.
57    Disconnected {
58        /// The socket address.
59        addr: SocketAddrNode,
60    },
61
62    /// Indicates the stats of the counts of objects are changed.
63    Objects {
64        /// The count of missing objects.
65        missing: usize,
66        /// The count of loaded objects.
67        loaded: usize,
68        /// The count of uploaded objects.
69        uploaded: usize,
70    },
71
72    /// Indicates the node has been stopped.
73    Stopped,
74
75    /// Indicates that an user received a msg message.
76    Msg {
77        /// The user ID.
78        user_id: Vec<u8>,
79        /// The Bitmessage address of the receiver of the msg message.
80        address: Address,
81        /// The message object.
82        object: message::Object,
83    },
84
85    /// Indicates that an user received a broadcast message.
86    Broadcast {
87        /// The user ID.
88        user_id: Vec<u8>,
89        /// The Bitmessage address of the sender of the broadcast message.
90        address: Address,
91        /// The message object.
92        object: message::Object,
93    },
94}
95
96/// The commands which accepted by a node.
97pub enum Command {
98    /// Initializes the node with specified configuration set
99    /// and starts the various background tasks
100    /// with specified database connection pool.
101    /// No outgoing connections are initiated yet.
102    /// Incoming connections can be accepted.
103    ///
104    /// `Response::Started` with event receiver will be returned.
105    Start(Box<Config>, PathBuf, Vec<User>),
106
107    /// Disconnects all connections and stop the node.
108    Stop,
109
110    /// Abort the tasks which remain after stop command was issued.
111    Abort,
112
113    /// Performs PoW and sends the object.
114    Send {
115        /// The header of the object to send.
116        header: object::Header,
117        /// The payload of the object to send.
118        payload: Vec<u8>,
119    },
120
121    /// Adds an identity to the user specified by the ID.
122    AddIdentity {
123        /// The user ID.
124        id: Vec<u8>,
125        /// The identity.
126        identity: PrivateIdentity,
127    },
128
129    /// Makes the user subscribe to the address.
130    Subscribe {
131        /// The user ID.
132        id: Vec<u8>,
133        /// The address.
134        address: Address,
135    },
136}
137
138/// The responses to the commands.
139pub enum Response {
140    /// Indicates the node has been started.
141    /// The receiver for events is returned.
142    Started(Receiver<Event>),
143}
144
145/// Runs an event loop that manage a Bitmessage node.
146pub fn run(receiver: Receiver<Command>, sender: Sender<Response>) {
147    let receiver = receiver;
148    let sender = sender;
149
150    let mut conn_mngr: Option<ConnectionManager> = None;
151    let mut ctx: Option<Arc<Context>> = None;
152    let mut _handle: Option<thread::JoinHandle<()>> = None;
153    while let Ok(event) = receiver.recv() {
154        match event {
155            Command::Start(config, db_path, users) => {
156                if conn_mngr.is_none() {
157                    let (bm_event_sender, bm_event_receiver) = bounded(config.channel_buffer());
158                    let cm = ConnectionManager::new(
159                        config.as_ref().clone(),
160                        db_path,
161                        users,
162                        bm_event_sender,
163                    );
164                    sender.send(Response::Started(bm_event_receiver)).unwrap();
165                    ctx = Some(cm.ctx());
166                    conn_mngr = Some(cm);
167                }
168            }
169            Command::Stop => {
170                if let Some(cm) = conn_mngr {
171                    _handle = Some(cm.stop());
172                    conn_mngr = None;
173                }
174            }
175            Command::Abort => {
176                if let Some(s) = ctx {
177                    s.abort();
178                    ctx = None;
179                }
180            }
181            Command::Send { header, payload } => {
182                if let Some(cm) = &conn_mngr {
183                    if let Err(err) = cm.send(header, payload) {
184                        error!("{}", err);
185                    }
186                }
187            }
188            Command::AddIdentity { id, identity } => {
189                if let Some(cm) = &conn_mngr {
190                    if let Err(err) = cm.add_identity(id, identity) {
191                        error!("{}", err);
192                    }
193                }
194            }
195            Command::Subscribe { id, address } => {
196                if let Some(cm) = &conn_mngr {
197                    if let Err(err) = cm.subscribe(id, address) {
198                        error!("{}", err);
199                    }
200                }
201            }
202        }
203    }
204}