use std::{convert::TryFrom, io::Cursor, sync::Arc};
use crossbeam_channel::{Receiver, Sender};
use log::{debug, error};
use koibumi_core::{
io::SizedReadFrom,
message,
net::SocketAddrExt,
object::{self, ObjectKind},
time::Time,
};
use crate::{
connection_loop::Context,
node_manager::{Entry as NodeEntry, Event as NodeEvent},
user_manager::Event as UserEvent,
};
#[derive(Debug)]
pub enum Event {
Process(message::Object),
}
fn process_getpubkey(_object: message::Object, _user_sender: &mut Sender<UserEvent>) {
}
fn process_pubkey(_object: message::Object, _user_sender: &mut Sender<UserEvent>) {
}
fn process_msg(object: message::Object, user_sender: &mut Sender<UserEvent>) {
if let Err(err) = user_sender.send(UserEvent::Msg(object)) {
error!("{}", err);
}
}
fn process_broadcast(object: message::Object, user_sender: &mut Sender<UserEvent>) {
if let Err(err) = user_sender.send(UserEvent::Broadcast(object)) {
error!("{}", err);
}
}
fn process_onionpeer(object: message::Object, node_sender: &mut Sender<NodeEvent>) {
let onionpeer = {
let mut bytes = Cursor::new(object.object_payload());
object::Onionpeer::sized_read_from(&mut bytes, object.object_payload().len())
};
match onionpeer {
Ok(onionpeer) => match SocketAddrExt::try_from(onionpeer) {
Ok(addr) => {
let entry =
NodeEntry::new(object.header().stream_number(), addr.into(), Time::now());
if let Err(err) = node_sender.send(NodeEvent::Add(vec![entry])) {
error!("{}", err);
}
}
Err(err) => {
error!("{}", err);
}
},
Err(err) => {
debug!("{}", err);
}
}
}
pub fn process(ctx: Arc<Context>, receiver: Receiver<Event>) {
let mut node_sender = ctx.node_sender().clone();
let mut user_sender = ctx.user_sender().clone();
while let Ok(event) = receiver.recv() {
match event {
Event::Process(object) => {
let kind = ObjectKind::try_from(object.header().object_type());
if let Err(err) = &kind {
debug!("{}", err);
continue;
}
match kind.unwrap() {
ObjectKind::Getpubkey => process_getpubkey(object, &mut user_sender),
ObjectKind::Pubkey => process_pubkey(object, &mut user_sender),
ObjectKind::Msg => process_msg(object, &mut user_sender),
ObjectKind::Broadcast => process_broadcast(object, &mut user_sender),
ObjectKind::Onionpeer => process_onionpeer(object, &mut node_sender),
}
}
}
}
}