koibumi-node-sync 0.0.0

A Bitmessage node implementation as a library for Koibumi (sync version), an experimental Bitmessage client
Documentation
use std::sync::{atomic::Ordering, Arc};

use crossbeam_channel::Sender;
use log::{error, info};

use koibumi_core::{
    message::{self, InvHash, Message, NetAddr, Pack, StreamNumbers},
    net::SocketAddrExt,
    time::Time,
};

use crate::{
    connection_loop::{Context, Event, Result},
    constant::{OBJECT_FUTURE_LIFETIME, OBJECT_PAST_LIFETIME},
    inv_manager::Event as InvEvent,
    manager::Event as BmEvent,
    net::SocketAddrNode,
    node_manager::{Entry as NodeEntry, Event as NodeEvent},
};

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum Direction {
    Incoming,
    Outgoing,
}

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum State {
    Pending,
    Established,
}

pub struct Connection {
    broker: Sender<Event>,
    addr: SocketAddrNode,
    direction: Direction,
    ctx: Arc<Context>,
    bm_event_sender: Sender<BmEvent>,
    node_sender: Sender<NodeEvent>,
    inv_sender: Sender<InvEvent>,

    state: State,
    version_sent: bool,
    version_received: bool,
    verack_received: bool,

    peer_version: Option<message::Version>,
}

impl Connection {
    pub fn new(
        broker: Sender<Event>,
        addr: SocketAddrNode,
        direction: Direction,
        ctx: Arc<Context>,
        bm_event_sender: Sender<BmEvent>,
        node_sender: Sender<NodeEvent>,
        inv_sender: Sender<InvEvent>,
    ) -> Self {
        Self {
            broker,
            addr,
            direction,
            ctx,
            bm_event_sender,
            node_sender,
            inv_sender,

            state: State::Pending,
            version_sent: false,
            version_received: false,
            verack_received: false,

            peer_version: None,
        }
    }

    pub fn direction(&self) -> Direction {
        self.direction
    }

    pub fn ctx(&self) -> &Arc<Context> {
        &self.ctx
    }

    pub fn write_message<T>(&mut self, message: &T) -> Result<()>
    where
        T: Message + Pack,
    {
        let packet = message.pack(self.ctx.config().core()).unwrap();
        let list = vec![packet];
        self.broker.send(Event::Write {
            addr: self.addr.clone(),
            list,
        })?;
        Ok(())
    }

    pub fn stream_numbers(&self) -> StreamNumbers {
        // TODO
        vec![1u32.into()].into()
    }

    pub fn common_stream_numbers(&self, v: &StreamNumbers) -> StreamNumbers {
        self.stream_numbers().intersection(v)
    }

    pub fn write_version(&mut self) -> Result<()> {
        let version = message::Version::builder(
            self.ctx.config().core(),
            self.ctx.node_nonce(),
            self.ctx.config().user_agent().clone(),
        )
        .stream_numbers(self.ctx.config().stream_numbers().clone())
        .build();
        self.write_message(&version)?;

        self.version_sent = true;
        Ok(())
    }

    pub fn write_version_if_not_sent(&mut self) -> Result<()> {
        if self.version_sent {
            return Ok(());
        }
        self.write_version()?;
        self.version_sent = true;
        Ok(())
    }

    pub fn write_verack(&mut self) -> Result<()> {
        let verack = message::Verack::new();
        self.write_message(&verack)
    }

    pub fn write_error(&mut self, fatal: u64, error_text: impl AsRef<[u8]>) -> Result<()> {
        let error = message::Error::new(fatal.into(), error_text.as_ref().to_vec().into());
        self.write_message(&error)
    }

    pub fn write_pong(&mut self) -> Result<()> {
        let pong = message::Pong::new();
        self.write_message(&pong)
    }

    pub fn set_peer_version(&mut self, version: Option<message::Version>) {
        self.peer_version = version;
    }

    pub fn set_state_established(&mut self) -> Result<()> {
        // TODO initial anti intersection delay

        self.state = State::Established;

        let established = self
            .ctx
            .established(self.direction)
            .fetch_add(1, Ordering::SeqCst)
            + 1;
        info!("({:?}) Established: {}", self.direction, established);

        if let Err(err) = self.broker.send(Event::Established {
            addr: self.addr.clone(),
        }) {
            error!("{}", err);
        }

        self.send_bm_event_connection_counts();
        #[allow(clippy::collapsible_if)]
        if let Some(version) = &self.peer_version {
            if self.direction == Direction::Outgoing {
                if let Err(err) = self.node_sender.send(NodeEvent::ConnectionSucceeded(
                    self.addr.clone(),
                    version.user_agent().clone(),
                )) {
                    error!("{}", err);
                }
            } else {
                if let Err(err) = self.bm_event_sender.send(BmEvent::Established {
                    addr: self.addr.clone(),
                    user_agent: version.user_agent().clone(),
                    rating: 0.into(),
                }) {
                    error!("{}", err)
                }
            }
        }

        let mut close = false;
        if self.direction == Direction::Incoming {
            let count = self.ctx.established(self.direction).load(Ordering::SeqCst);
            if count > self.ctx.config().max_incoming_established() {
                close = true;
            }
        }

        if let Err(err) = self
            .node_sender
            .send(NodeEvent::Send(self.addr.clone(), close))
        {
            error!("{}", err);
        }

        if !close {
            if let Err(err) = self.inv_sender.send(InvEvent::SendBigInv {
                addr: self.addr.clone(),
            }) {
                error!("{}", err);
            }
        }

        Ok(())
    }

    pub fn established(&self) -> bool {
        self.state == State::Established
    }

    pub fn version_sent(&self) -> bool {
        self.version_sent
    }

    pub fn version_received(&self) -> bool {
        self.version_received
    }

    pub fn set_version_received(&mut self) {
        self.version_received = true
    }

    pub fn verack_received(&self) -> bool {
        self.verack_received
    }

    pub fn set_verack_received(&mut self) {
        self.verack_received = true
    }

    pub fn send_bm_event_connection_counts(&mut self) {
        if let Err(err) = self.bm_event_sender.send(BmEvent::ConnectionCounts {
            incoming_initiated: self
                .ctx
                .initiated(Direction::Incoming)
                .load(Ordering::SeqCst),
            incoming_connected: self
                .ctx
                .connected(Direction::Incoming)
                .load(Ordering::SeqCst),
            incoming_established: self
                .ctx
                .established(Direction::Incoming)
                .load(Ordering::SeqCst),
            outgoing_initiated: self
                .ctx
                .initiated(Direction::Outgoing)
                .load(Ordering::SeqCst),
            outgoing_connected: self
                .ctx
                .connected(Direction::Outgoing)
                .load(Ordering::SeqCst),
            outgoing_established: self
                .ctx
                .established(Direction::Outgoing)
                .load(Ordering::SeqCst),
        }) {
            error!("{}", err);
        }
    }

    pub fn add_addrs(&mut self, addrs: &[NetAddr]) {
        let mut list = Vec::with_capacity(addrs.len());
        for addr in addrs {
            let saddr: SocketAddrExt = addr.socket_addr().clone().into();
            let entry = NodeEntry::new(addr.stream_number(), saddr.into(), addr.time());
            list.push(entry);
        }
        if let Err(err) = self.node_sender.send(NodeEvent::Add(list)) {
            error!("{}", err);
        }
    }

    pub fn add_inv_hashes(&mut self, list: Vec<InvHash>) {
        if let Err(err) = self.inv_sender.send(InvEvent::Inv {
            addr: self.addr.clone(),
            list,
        }) {
            error!("{}", err);
        }
    }

    pub fn send_objects(&mut self, list: Vec<InvHash>) {
        if let Err(err) = self.broker.send(Event::GetObjects {
            addr: self.addr.clone(),
            list,
        }) {
            error!("{}", err);
        }
    }

    pub fn add_object(&mut self, object: message::Object) {
        let now = Time::now();
        let expires_time = object.header().expires_time();
        match expires_time.checked_add(OBJECT_PAST_LIFETIME) {
            Some(target) => {
                if now > target {
                    return;
                }
            }
            None => return,
        }
        match now.checked_add(OBJECT_FUTURE_LIFETIME) {
            Some(target) => {
                if expires_time > target {
                    return;
                }
            }
            None => return,
        }
        match object.validate_pow(self.ctx.config().core()) {
            Ok(_) => {
                if let Err(err) = self.inv_sender.send(InvEvent::Object {
                    addr: self.addr.clone(),
                    object,
                }) {
                    error!("{}", err);
                }
            }
            Err(_) => {
                if let Err(err) = self.inv_sender.send(InvEvent::Drop(object.inv_hash())) {
                    error!("{}", err);
                }
            }
        }
    }
}