koibumi-node-sync 0.0.0

A Bitmessage node implementation as a library for Koibumi (sync version), an experimental Bitmessage client
Documentation
use std::{convert::TryFrom, io::Cursor, sync::Arc};

use crossbeam_channel::{Receiver, Sender};
use log::{debug, error};

use koibumi_core::{
    io::SizedReadFrom,
    message,
    net::SocketAddrExt,
    object::{self, ObjectKind},
    time::Time,
};

use crate::{
    connection_loop::Context,
    node_manager::{Entry as NodeEntry, Event as NodeEvent},
    user_manager::Event as UserEvent,
};

#[derive(Debug)]
pub enum Event {
    Process(message::Object),
}

fn process_getpubkey(_object: message::Object, _user_sender: &mut Sender<UserEvent>) {
    // TODO
}

fn process_pubkey(_object: message::Object, _user_sender: &mut Sender<UserEvent>) {
    // TODO
}

fn process_msg(object: message::Object, user_sender: &mut Sender<UserEvent>) {
    if let Err(err) = user_sender.send(UserEvent::Msg(object)) {
        error!("{}", err);
    }
}

fn process_broadcast(object: message::Object, user_sender: &mut Sender<UserEvent>) {
    if let Err(err) = user_sender.send(UserEvent::Broadcast(object)) {
        error!("{}", err);
    }
}

fn process_onionpeer(object: message::Object, node_sender: &mut Sender<NodeEvent>) {
    let onionpeer = {
        let mut bytes = Cursor::new(object.object_payload());
        object::Onionpeer::sized_read_from(&mut bytes, object.object_payload().len())
    };
    match onionpeer {
        Ok(onionpeer) => match SocketAddrExt::try_from(onionpeer) {
            Ok(addr) => {
                let entry =
                    NodeEntry::new(object.header().stream_number(), addr.into(), Time::now());
                if let Err(err) = node_sender.send(NodeEvent::Add(vec![entry])) {
                    error!("{}", err);
                }
            }
            Err(err) => {
                error!("{}", err);
            }
        },
        Err(err) => {
            debug!("{}", err);
        }
    }
}

pub fn process(ctx: Arc<Context>, receiver: Receiver<Event>) {
    let mut node_sender = ctx.node_sender().clone();
    let mut user_sender = ctx.user_sender().clone();

    while let Ok(event) = receiver.recv() {
        match event {
            Event::Process(object) => {
                let kind = ObjectKind::try_from(object.header().object_type());
                if let Err(err) = &kind {
                    debug!("{}", err);
                    continue;
                }
                match kind.unwrap() {
                    ObjectKind::Getpubkey => process_getpubkey(object, &mut user_sender),
                    ObjectKind::Pubkey => process_pubkey(object, &mut user_sender),
                    ObjectKind::Msg => process_msg(object, &mut user_sender),
                    ObjectKind::Broadcast => process_broadcast(object, &mut user_sender),
                    ObjectKind::Onionpeer => process_onionpeer(object, &mut node_sender),
                }
            }
        }
    }
}