1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
use async_std::{sync::Arc, task};
use futures::{
    channel::mpsc::{self, Receiver, Sender},
    sink::SinkExt,
    stream::StreamExt,
};
use log::error;

use koibumi_core::{
    address::Address,
    message::{self, UserAgent},
    net::SocketAddrExt,
    object,
};

use crate::{
    config::Config,
    connection_loop::{ConnectionManager, Context},
    db,
    node_manager::Rating,
    user_manager::User,
};

/// The events which occur in a Bitmessage node.
///
/// Events are read from the `Receiver<Event>` that is handed out
/// inside `Response::Started` when the node is started.
#[derive(Clone, Debug)]
pub enum Event {
    /// Indicates that the statistics of the connection counts have changed.
    ConnectionCounts {
        /// The count of incoming initiated connections.
        incoming_initiated: usize,
        /// The count of incoming connected connections.
        incoming_connected: usize,
        /// The count of incoming established connections.
        incoming_established: usize,
        /// The count of outgoing initiated connections.
        outgoing_initiated: usize,
        /// The count of outgoing connected connections.
        outgoing_connected: usize,
        /// The count of outgoing established connections.
        outgoing_established: usize,
    },

    /// Indicates that the count of known node addresses has changed.
    /// The new count is carried as the payload.
    AddrCount(usize),

    /// Indicates that a connection to a node has newly been established.
    /// The socket address, the user agent
    /// and the rating of the connectivity of the node are carried.
    Established {
        /// The socket address of the node.
        addr: SocketAddrExt,
        /// The user agent reported for the node.
        user_agent: UserAgent,
        /// The rating of the connectivity of the node.
        rating: Rating,
    },

    /// Indicates that an established connection to a node has been disconnected.
    /// The socket address of the node is carried.
    Disconnected {
        /// The socket address of the node.
        addr: SocketAddrExt,
    },

    /// Indicates that the statistics of the object counts have changed.
    Objects {
        /// The count of missing objects.
        missing: usize,
        /// The count of loaded objects.
        loaded: usize,
        /// The count of uploaded objects.
        uploaded: usize,
    },

    /// Indicates that the node has been stopped.
    Stopped,

    /// Indicates that a user received a broadcast message.
    Broadcast {
        /// The ID of the user who received the message.
        user_id: Vec<u8>,
        /// The Bitmessage address of the sender of the broadcast message.
        address: Address,
        /// The message object.
        object: message::Object,
    },
}

/// The commands which are accepted by a node.
///
/// Commands are consumed by the event loop in `run`.
pub enum Command {
    /// Initializes the node with the specified configuration set
    /// and starts the various background tasks
    /// with the specified database connection pool.
    /// No outgoing connections are initiated yet.
    /// Incoming connections can be accepted.
    ///
    /// The fields are, in order: the configuration, the database
    /// connection pool and the list of users.
    ///
    /// `Response::Started` with an event receiver will be returned.
    /// `run` ignores this command while a connection manager
    /// from an earlier `Start` is still held.
    Start(Box<Config>, db::SqlitePool, Vec<User>),

    /// Disconnects all connections and stops the node.
    ///
    /// `run` ignores this command when no connection manager is held.
    Stop,

    /// Aborts the tasks which remain after the stop command was issued.
    ///
    /// `run` ignores this command when no context from a previous
    /// `Start` is held.
    Abort,

    /// Performs PoW and sends the object.
    ///
    /// `run` ignores this command when no connection manager is held;
    /// a failure reported by the manager is logged, not returned.
    Send {
        /// The header of the object to send.
        header: object::Header,
        /// The payload of the object to send.
        payload: Vec<u8>,
    },
}

/// The responses to the commands.
///
/// Responses are sent by `run` on the `Sender<Response>` it was given.
pub enum Response {
    /// Indicates that the node has been started.
    /// The receiver for the node's [`Event`]s is carried as the payload.
    ///
    /// Sent in reply to a `Command::Start` that actually started the node.
    Started(Receiver<Event>),
}

/// Runs an event loop that manages a Bitmessage node.
///
/// Commands are pulled from `receiver` until that stream ends.
/// Each command is handled as follows:
///
/// * `Command::Start` creates a fresh event channel and a
///   `ConnectionManager`, then answers with `Response::Started` on `sender`.
///   It is ignored while a manager is already held.
/// * `Command::Stop` calls `stop()` on the held manager, keeps the returned
///   join handle and forgets the manager. It is ignored when no manager is held.
/// * `Command::Abort` calls `abort()` on the context remembered from the
///   last start and forgets it. It is ignored when no context is held.
/// * `Command::Send` forwards the object to the held manager; a failure is
///   logged, not propagated. It is ignored when no manager is held.
///
/// # Panics
///
/// Panics if the `Response::Started` message cannot be delivered on `sender`.
pub async fn run(receiver: Receiver<Command>, sender: Sender<Response>) {
    let (mut commands, mut responses) = (receiver, sender);

    // State carried from one command to the next.
    let mut manager: Option<ConnectionManager> = None;
    let mut context: Option<Arc<Context>> = None;
    // Only stored, never awaited: it keeps the handle from the latest stop.
    let mut _stop_handle: Option<task::JoinHandle<()>> = None;

    while let Some(command) = commands.next().await {
        match command {
            Command::Start(config, pool, users) => {
                // A node that is already running is not started twice.
                if manager.is_some() {
                    continue;
                }

                let (event_tx, event_rx) = mpsc::channel(config.channel_buffer());
                let started = ConnectionManager::new((*config).clone(), pool, users, event_tx);
                responses.send(Response::Started(event_rx)).await.unwrap();

                // The context is kept separately so that `Abort` still works
                // after the manager itself has been given up by `Stop`.
                context = Some(started.ctx());
                manager = Some(started);
            }
            Command::Stop => {
                if let Some(running) = manager.take() {
                    _stop_handle = Some(running.stop());
                }
            }
            Command::Abort => {
                if let Some(remembered) = context.take() {
                    remembered.abort();
                }
            }
            Command::Send { header, payload } => {
                if let Some(running) = manager.as_ref() {
                    if let Err(err) = running.send(header, payload).await {
                        error!(target: "koibumi", "{}", err);
                    }
                }
            }
        }
    }
}