1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
use async_std::{sync::Arc, task};
use futures::{
    channel::mpsc::{self, Receiver, Sender},
    sink::SinkExt,
    stream::StreamExt,
};
use log::error;

use koibumi_core::{message::UserAgent, net::SocketAddrExt};

use crate::{
    config::Config,
    connection_loop::{ConnectionManager, Stat},
    node_manager::Rating,
};

/// The events which occur in a Bitmessage node.
///
/// Events are delivered through the `Receiver<Event>` carried by
/// `Response::Started`.
#[derive(Clone, Debug)]
pub enum Event {
    /// Indicates that the connection counts have changed.
    ///
    /// NOTE(review): the per-field meanings below are inferred from the
    /// field names only (direction combined with connection stage);
    /// confirm against the code that emits this event.
    ConnectionCounts {
        /// Presumably the number of incoming connections that have been initiated.
        incoming_initiated: usize,
        /// Presumably the number of incoming connections that are connected.
        incoming_connected: usize,
        /// Presumably the number of incoming connections that are established.
        incoming_established: usize,
        /// Presumably the number of outgoing connections that have been initiated.
        outgoing_initiated: usize,
        /// Presumably the number of outgoing connections that are connected.
        outgoing_connected: usize,
        /// Presumably the number of outgoing connections that are established.
        outgoing_established: usize,
    },

    /// Indicates that the count of known node addresses has changed.
    /// The new count is carried as the payload.
    AddrCount(usize),

    /// Indicates that a connection to a node has newly been established.
    /// The socket address, the user agent
    /// and the rating of the connectivity of the node are returned.
    Established {
        /// The socket address of the node.
        addr: SocketAddrExt,
        /// The user agent of the node.
        user_agent: UserAgent,
        /// The rating of the connectivity of the node.
        rating: Rating,
    },

    /// Indicates that an established connection to a node has been disconnected.
    /// The socket address of the node is returned.
    Disconnected { addr: SocketAddrExt },

    /// Indicates that the object counts have changed.
    ///
    /// NOTE(review): the exact meaning of each counter is inferred from its
    /// name; confirm against the code that emits this event.
    Objects {
        /// Presumably the number of objects known but not yet obtained.
        missing: usize,
        /// Presumably the number of objects that have been loaded.
        loaded: usize,
        /// Presumably the number of objects that have been uploaded.
        uploaded: usize,
    },

    /// Indicates that the node has been stopped.
    Stopped,
}

/// The commands which are accepted by a node.
///
/// Commands are consumed by [`run`] from its `Receiver<Command>`.
pub enum Command {
    /// Initializes the node with the specified configuration set
    /// and starts the various background tasks
    /// with the specified database connection pool.
    /// No outgoing connections are initiated yet.
    /// Incoming connections can be accepted.
    ///
    /// `Response::Started` with the event receiver will be returned.
    /// If the node has already been started and not yet stopped,
    /// [`run`] ignores this command and sends no response.
    Start(Box<Config>, sqlx::SqlitePool),

    /// Disconnects all connections and stops the node.
    ///
    /// [`run`] ignores this command when the node is not started.
    Stop,

    /// Aborts the tasks which remain after the stop command was issued.
    ///
    /// [`run`] ignores this command when the node has never been started
    /// or has already been aborted.
    Abort,

    /// Initiates an outgoing connection to the node that has
    /// the specified socket address.
    ///
    /// [`run`] ignores this command when the node is not started;
    /// a failure to connect is logged rather than reported to the caller.
    Connect(SocketAddrExt),
}

/// The responses to the commands.
///
/// Responses are sent by [`run`] through the `Sender<Response>` it is given.
pub enum Response {
    /// Indicates that the node has been started
    /// (the reply to `Command::Start`).
    /// The receiver for events is returned.
    Started(Receiver<Event>),
}

/// Runs an event loop that manage a Bitmessage node.
pub async fn run(receiver: Receiver<Command>, sender: Sender<Response>) {
    let mut receiver = receiver;
    let mut sender = sender;

    let mut conn_mngr: Option<ConnectionManager> = None;
    let mut stat: Option<Arc<Stat>> = None;
    let mut _handle: Option<task::JoinHandle<()>> = None;
    while let Some(event) = receiver.next().await {
        match event {
            Command::Start(config, pool) => {
                if conn_mngr.is_none() {
                    let (bm_event_sender, bm_event_receiver) =
                        mpsc::channel(config.channel_buffer());
                    let cm = ConnectionManager::new(config.as_ref().clone(), pool, bm_event_sender);
                    sender
                        .send(Response::Started(bm_event_receiver))
                        .await
                        .unwrap();
                    stat = Some(cm.stat());
                    conn_mngr = Some(cm);
                }
            }
            Command::Stop => {
                if let Some(cm) = conn_mngr {
                    _handle = Some(cm.stop());
                    conn_mngr = None;
                }
            }
            Command::Abort => {
                if let Some(s) = stat {
                    s.abort();
                    stat = None;
                }
            }
            Command::Connect(addr) => {
                if let Some(cm) = &mut conn_mngr {
                    if let Err(err) = cm.connect(addr).await {
                        error!(target: "koibumi", "{}", err);
                    }
                }
            }
        }
    }
}