koibumi_node/
manager.rs

1use async_std::{sync::Arc, task};
2use futures::{
3    channel::mpsc::{self, Receiver, Sender},
4    sink::SinkExt,
5    stream::StreamExt,
6};
7use log::error;
8
9use koibumi_core::{
10    address::Address,
11    identity::Private as PrivateIdentity,
12    message::{self, UserAgent},
13    object,
14};
15
16use crate::{
17    config::Config,
18    connection_loop::{ConnectionManager, Context},
19    db,
20    net::SocketAddrNode,
21    node_manager::Rating,
22    user_manager::User,
23};
24
25/// The events which occur in a Bitmessage node.
26#[derive(Clone, Debug)]
27pub enum Event {
28    /// Indicates the stats of the counts of connections have been changed.
29    ConnectionCounts {
30        /// The count of incoming initiated connections.
31        incoming_initiated: usize,
32        /// The count of incoming connected connections.
33        incoming_connected: usize,
34        /// The count of incoming established connections.
35        incoming_established: usize,
36        /// The count of outgoing initiated connections.
37        outgoing_initiated: usize,
38        /// The count of outgoing connected connections.
39        outgoing_connected: usize,
40        /// The count of outgoing established connections.
41        outgoing_established: usize,
42    },
43
44    /// Indicates the count of known node addresses has been changed.
45    AddrCount(usize),
46
47    /// Indicates a connection to a node has newly been established.
48    /// The socket address, the user agent
49    /// and the rating of the connectivity of the node are returned.
50    Established {
51        /// The socket address.
52        addr: SocketAddrNode,
53        /// The user agent.
54        user_agent: UserAgent,
55        /// The rating.
56        rating: Rating,
57    },
58
59    /// Indicates an established connection to a node has been disconnected.
60    /// The socket address of the node is returned.
61    Disconnected {
62        /// The socket address.
63        addr: SocketAddrNode,
64    },
65
66    /// Indicates the stats of the counts of objects are changed.
67    Objects {
68        /// The count of missing objects.
69        missing: usize,
70        /// The count of loaded objects.
71        loaded: usize,
72        /// The count of uploaded objects.
73        uploaded: usize,
74    },
75
76    /// Indicates the node has been stopped.
77    Stopped,
78
79    /// Indicates that an user received a msg message.
80    Msg {
81        /// The user ID.
82        user_id: Vec<u8>,
83        /// The Bitmessage address of the receiver of the msg message.
84        address: Address,
85        /// The message object.
86        object: message::Object,
87    },
88
89    /// Indicates that an user received a broadcast message.
90    Broadcast {
91        /// The user ID.
92        user_id: Vec<u8>,
93        /// The Bitmessage address of the sender of the broadcast message.
94        address: Address,
95        /// The message object.
96        object: message::Object,
97    },
98}
99
100/// The commands which accepted by a node.
101pub enum Command {
102    /// Initializes the node with specified configuration set
103    /// and starts the various background tasks
104    /// with specified database connection pool.
105    /// No outgoing connections are initiated yet.
106    /// Incoming connections can be accepted.
107    ///
108    /// `Response::Started` with event receiver will be returned.
109    Start(Box<Config>, db::SqlitePool, Vec<User>),
110
111    /// Disconnects all connections and stop the node.
112    Stop,
113
114    /// Abort the tasks which remain after stop command was issued.
115    Abort,
116
117    /// Performs PoW and sends the object.
118    Send {
119        /// The header of the object to send.
120        header: object::Header,
121        /// The payload of the object to send.
122        payload: Vec<u8>,
123    },
124
125    /// Adds an identity to the user specified by the ID.
126    AddIdentity {
127        /// The user ID.
128        id: Vec<u8>,
129        /// The identity.
130        identity: PrivateIdentity,
131    },
132
133    /// Makes the user subscribe to the address.
134    Subscribe {
135        /// The user ID.
136        id: Vec<u8>,
137        /// The address.
138        address: Address,
139    },
140}
141
142/// The responses to the commands.
143pub enum Response {
144    /// Indicates the node has been started.
145    /// The receiver for events is returned.
146    Started(Receiver<Event>),
147}
148
149/// Runs an event loop that manage a Bitmessage node.
150pub async fn run(receiver: Receiver<Command>, sender: Sender<Response>) {
151    let mut receiver = receiver;
152    let mut sender = sender;
153
154    let mut conn_mngr: Option<ConnectionManager> = None;
155    let mut ctx: Option<Arc<Context>> = None;
156    let mut _handle: Option<task::JoinHandle<()>> = None;
157    while let Some(event) = receiver.next().await {
158        match event {
159            Command::Start(config, pool, users) => {
160                if conn_mngr.is_none() {
161                    let (bm_event_sender, bm_event_receiver) =
162                        mpsc::channel(config.channel_buffer());
163                    let cm = ConnectionManager::new(
164                        config.as_ref().clone(),
165                        pool,
166                        users,
167                        bm_event_sender,
168                    );
169                    sender
170                        .send(Response::Started(bm_event_receiver))
171                        .await
172                        .unwrap();
173                    ctx = Some(cm.ctx());
174                    conn_mngr = Some(cm);
175                }
176            }
177            Command::Stop => {
178                if let Some(cm) = conn_mngr {
179                    _handle = Some(cm.stop());
180                    conn_mngr = None;
181                }
182            }
183            Command::Abort => {
184                if let Some(s) = ctx {
185                    s.abort();
186                    ctx = None;
187                }
188            }
189            Command::Send { header, payload } => {
190                if let Some(cm) = &conn_mngr {
191                    if let Err(err) = cm.send(header, payload).await {
192                        error!("{}", err);
193                    }
194                }
195            }
196            Command::AddIdentity { id, identity } => {
197                if let Some(cm) = &conn_mngr {
198                    if let Err(err) = cm.add_identity(id, identity).await {
199                        error!("{}", err);
200                    }
201                }
202            }
203            Command::Subscribe { id, address } => {
204                if let Some(cm) = &conn_mngr {
205                    if let Err(err) = cm.subscribe(id, address).await {
206                        error!("{}", err);
207                    }
208                }
209            }
210        }
211    }
212}