koibumi_node_sync/manager.rs
1use std::{path::PathBuf, sync::Arc, thread};
2
3use crossbeam_channel::{bounded, Receiver, Sender};
4use log::error;
5
6use koibumi_core::{
7 address::Address,
8 identity::Private as PrivateIdentity,
9 message::{self, UserAgent},
10 object,
11};
12
13use crate::{
14 config::Config,
15 connection_loop::{ConnectionManager, Context},
16 net::SocketAddrNode,
17 node_manager::Rating,
18 user_manager::User,
19};
20
21/// The events which occur in a Bitmessage node.
22#[derive(Clone, Debug)]
23pub enum Event {
24 /// Indicates the stats of the counts of connections have been changed.
25 ConnectionCounts {
26 /// The count of incoming initiated connections.
27 incoming_initiated: usize,
28 /// The count of incoming connected connections.
29 incoming_connected: usize,
30 /// The count of incoming established connections.
31 incoming_established: usize,
32 /// The count of outgoing initiated connections.
33 outgoing_initiated: usize,
34 /// The count of outgoing connected connections.
35 outgoing_connected: usize,
36 /// The count of outgoing established connections.
37 outgoing_established: usize,
38 },
39
40 /// Indicates the count of known node addresses has been changed.
41 AddrCount(usize),
42
43 /// Indicates a connection to a node has newly been established.
44 /// The socket address, the user agent
45 /// and the rating of the connectivity of the node are returned.
46 Established {
47 /// The socket address.
48 addr: SocketAddrNode,
49 /// The user agent.
50 user_agent: UserAgent,
51 /// The rating.
52 rating: Rating,
53 },
54
55 /// Indicates an established connection to a node has been disconnected.
56 /// The socket address of the node is returned.
57 Disconnected {
58 /// The socket address.
59 addr: SocketAddrNode,
60 },
61
62 /// Indicates the stats of the counts of objects are changed.
63 Objects {
64 /// The count of missing objects.
65 missing: usize,
66 /// The count of loaded objects.
67 loaded: usize,
68 /// The count of uploaded objects.
69 uploaded: usize,
70 },
71
72 /// Indicates the node has been stopped.
73 Stopped,
74
75 /// Indicates that an user received a msg message.
76 Msg {
77 /// The user ID.
78 user_id: Vec<u8>,
79 /// The Bitmessage address of the receiver of the msg message.
80 address: Address,
81 /// The message object.
82 object: message::Object,
83 },
84
85 /// Indicates that an user received a broadcast message.
86 Broadcast {
87 /// The user ID.
88 user_id: Vec<u8>,
89 /// The Bitmessage address of the sender of the broadcast message.
90 address: Address,
91 /// The message object.
92 object: message::Object,
93 },
94}
95
96/// The commands which accepted by a node.
97pub enum Command {
98 /// Initializes the node with specified configuration set
99 /// and starts the various background tasks
100 /// with specified database connection pool.
101 /// No outgoing connections are initiated yet.
102 /// Incoming connections can be accepted.
103 ///
104 /// `Response::Started` with event receiver will be returned.
105 Start(Box<Config>, PathBuf, Vec<User>),
106
107 /// Disconnects all connections and stop the node.
108 Stop,
109
110 /// Abort the tasks which remain after stop command was issued.
111 Abort,
112
113 /// Performs PoW and sends the object.
114 Send {
115 /// The header of the object to send.
116 header: object::Header,
117 /// The payload of the object to send.
118 payload: Vec<u8>,
119 },
120
121 /// Adds an identity to the user specified by the ID.
122 AddIdentity {
123 /// The user ID.
124 id: Vec<u8>,
125 /// The identity.
126 identity: PrivateIdentity,
127 },
128
129 /// Makes the user subscribe to the address.
130 Subscribe {
131 /// The user ID.
132 id: Vec<u8>,
133 /// The address.
134 address: Address,
135 },
136}
137
138/// The responses to the commands.
139pub enum Response {
140 /// Indicates the node has been started.
141 /// The receiver for events is returned.
142 Started(Receiver<Event>),
143}
144
145/// Runs an event loop that manage a Bitmessage node.
146pub fn run(receiver: Receiver<Command>, sender: Sender<Response>) {
147 let receiver = receiver;
148 let sender = sender;
149
150 let mut conn_mngr: Option<ConnectionManager> = None;
151 let mut ctx: Option<Arc<Context>> = None;
152 let mut _handle: Option<thread::JoinHandle<()>> = None;
153 while let Ok(event) = receiver.recv() {
154 match event {
155 Command::Start(config, db_path, users) => {
156 if conn_mngr.is_none() {
157 let (bm_event_sender, bm_event_receiver) = bounded(config.channel_buffer());
158 let cm = ConnectionManager::new(
159 config.as_ref().clone(),
160 db_path,
161 users,
162 bm_event_sender,
163 );
164 sender.send(Response::Started(bm_event_receiver)).unwrap();
165 ctx = Some(cm.ctx());
166 conn_mngr = Some(cm);
167 }
168 }
169 Command::Stop => {
170 if let Some(cm) = conn_mngr {
171 _handle = Some(cm.stop());
172 conn_mngr = None;
173 }
174 }
175 Command::Abort => {
176 if let Some(s) = ctx {
177 s.abort();
178 ctx = None;
179 }
180 }
181 Command::Send { header, payload } => {
182 if let Some(cm) = &conn_mngr {
183 if let Err(err) = cm.send(header, payload) {
184 error!("{}", err);
185 }
186 }
187 }
188 Command::AddIdentity { id, identity } => {
189 if let Some(cm) = &conn_mngr {
190 if let Err(err) = cm.add_identity(id, identity) {
191 error!("{}", err);
192 }
193 }
194 }
195 Command::Subscribe { id, address } => {
196 if let Some(cm) = &conn_mngr {
197 if let Err(err) = cm.subscribe(id, address) {
198 error!("{}", err);
199 }
200 }
201 }
202 }
203 }
204}