koibumi_node/manager.rs
1use async_std::{sync::Arc, task};
2use futures::{
3 channel::mpsc::{self, Receiver, Sender},
4 sink::SinkExt,
5 stream::StreamExt,
6};
7use log::error;
8
9use koibumi_core::{
10 address::Address,
11 identity::Private as PrivateIdentity,
12 message::{self, UserAgent},
13 object,
14};
15
16use crate::{
17 config::Config,
18 connection_loop::{ConnectionManager, Context},
19 db,
20 net::SocketAddrNode,
21 node_manager::Rating,
22 user_manager::User,
23};
24
25/// The events which occur in a Bitmessage node.
26#[derive(Clone, Debug)]
27pub enum Event {
28 /// Indicates the stats of the counts of connections have been changed.
29 ConnectionCounts {
30 /// The count of incoming initiated connections.
31 incoming_initiated: usize,
32 /// The count of incoming connected connections.
33 incoming_connected: usize,
34 /// The count of incoming established connections.
35 incoming_established: usize,
36 /// The count of outgoing initiated connections.
37 outgoing_initiated: usize,
38 /// The count of outgoing connected connections.
39 outgoing_connected: usize,
40 /// The count of outgoing established connections.
41 outgoing_established: usize,
42 },
43
44 /// Indicates the count of known node addresses has been changed.
45 AddrCount(usize),
46
47 /// Indicates a connection to a node has newly been established.
48 /// The socket address, the user agent
49 /// and the rating of the connectivity of the node are returned.
50 Established {
51 /// The socket address.
52 addr: SocketAddrNode,
53 /// The user agent.
54 user_agent: UserAgent,
55 /// The rating.
56 rating: Rating,
57 },
58
59 /// Indicates an established connection to a node has been disconnected.
60 /// The socket address of the node is returned.
61 Disconnected {
62 /// The socket address.
63 addr: SocketAddrNode,
64 },
65
66 /// Indicates the stats of the counts of objects are changed.
67 Objects {
68 /// The count of missing objects.
69 missing: usize,
70 /// The count of loaded objects.
71 loaded: usize,
72 /// The count of uploaded objects.
73 uploaded: usize,
74 },
75
76 /// Indicates the node has been stopped.
77 Stopped,
78
79 /// Indicates that an user received a msg message.
80 Msg {
81 /// The user ID.
82 user_id: Vec<u8>,
83 /// The Bitmessage address of the receiver of the msg message.
84 address: Address,
85 /// The message object.
86 object: message::Object,
87 },
88
89 /// Indicates that an user received a broadcast message.
90 Broadcast {
91 /// The user ID.
92 user_id: Vec<u8>,
93 /// The Bitmessage address of the sender of the broadcast message.
94 address: Address,
95 /// The message object.
96 object: message::Object,
97 },
98}
99
100/// The commands which accepted by a node.
101pub enum Command {
102 /// Initializes the node with specified configuration set
103 /// and starts the various background tasks
104 /// with specified database connection pool.
105 /// No outgoing connections are initiated yet.
106 /// Incoming connections can be accepted.
107 ///
108 /// `Response::Started` with event receiver will be returned.
109 Start(Box<Config>, db::SqlitePool, Vec<User>),
110
111 /// Disconnects all connections and stop the node.
112 Stop,
113
114 /// Abort the tasks which remain after stop command was issued.
115 Abort,
116
117 /// Performs PoW and sends the object.
118 Send {
119 /// The header of the object to send.
120 header: object::Header,
121 /// The payload of the object to send.
122 payload: Vec<u8>,
123 },
124
125 /// Adds an identity to the user specified by the ID.
126 AddIdentity {
127 /// The user ID.
128 id: Vec<u8>,
129 /// The identity.
130 identity: PrivateIdentity,
131 },
132
133 /// Makes the user subscribe to the address.
134 Subscribe {
135 /// The user ID.
136 id: Vec<u8>,
137 /// The address.
138 address: Address,
139 },
140}
141
142/// The responses to the commands.
143pub enum Response {
144 /// Indicates the node has been started.
145 /// The receiver for events is returned.
146 Started(Receiver<Event>),
147}
148
149/// Runs an event loop that manage a Bitmessage node.
150pub async fn run(receiver: Receiver<Command>, sender: Sender<Response>) {
151 let mut receiver = receiver;
152 let mut sender = sender;
153
154 let mut conn_mngr: Option<ConnectionManager> = None;
155 let mut ctx: Option<Arc<Context>> = None;
156 let mut _handle: Option<task::JoinHandle<()>> = None;
157 while let Some(event) = receiver.next().await {
158 match event {
159 Command::Start(config, pool, users) => {
160 if conn_mngr.is_none() {
161 let (bm_event_sender, bm_event_receiver) =
162 mpsc::channel(config.channel_buffer());
163 let cm = ConnectionManager::new(
164 config.as_ref().clone(),
165 pool,
166 users,
167 bm_event_sender,
168 );
169 sender
170 .send(Response::Started(bm_event_receiver))
171 .await
172 .unwrap();
173 ctx = Some(cm.ctx());
174 conn_mngr = Some(cm);
175 }
176 }
177 Command::Stop => {
178 if let Some(cm) = conn_mngr {
179 _handle = Some(cm.stop());
180 conn_mngr = None;
181 }
182 }
183 Command::Abort => {
184 if let Some(s) = ctx {
185 s.abort();
186 ctx = None;
187 }
188 }
189 Command::Send { header, payload } => {
190 if let Some(cm) = &conn_mngr {
191 if let Err(err) = cm.send(header, payload).await {
192 error!("{}", err);
193 }
194 }
195 }
196 Command::AddIdentity { id, identity } => {
197 if let Some(cm) = &conn_mngr {
198 if let Err(err) = cm.add_identity(id, identity).await {
199 error!("{}", err);
200 }
201 }
202 }
203 Command::Subscribe { id, address } => {
204 if let Some(cm) = &conn_mngr {
205 if let Err(err) = cm.subscribe(id, address).await {
206 error!("{}", err);
207 }
208 }
209 }
210 }
211 }
212}