use std::path;
use tracing::{
info_span,
instrument,
Instrument,
Span
};
use tokio::task;
use futures::future;
use crate::net::InterfaceAddress;
use crate::config::{
Config,
HandlerDirectory
};
use crate::control::{
KeyArg,
ControlSocket,
AnyRequest,
DaemonReq,
NodeReq,
MirrorReq,
DispatcherReq,
RouterReq,
NodeJoinRequest
};
use crate::event::{
AnyEvent,
DaemonEvent,
Dispatcher,
DispatcherSetup,
Interest,
parse_all,
launch,
get_all_entries
};
use crate::message::Payload;
use crate::node::Node;
use crate::net::router::{
Router,
RouterMonitor,
RouterSetup
};
use crate::net::Mirror;
use crate::peer::NodeID;
use crate::store::{
self,
Store
};
use crate::util::*;
/// Top-level daemon state: the node plus the channel endpoints and task
/// handles of the subsystems spawned around it.
#[derive(Debug)]
pub struct Daemon {
// Node that inbound packets and node-level control requests are handed to.
node: Node,
// Receiving end of the control request channel; its sending end is given
// to the control socket when that is opened.
ctrl_rx: Receiver<AnyRequest>,
// Channel endpoints connecting the daemon to the router.
net: NetBundle,
// Channel endpoints connecting the daemon to the event dispatcher.
dispatch: DispatchBundle,
// Request channel to the mirror; `None` when no mirror was started, in
// which case mirror requests are answered with `false`.
mirror_query_tx: Option<Sender<MirrorReq>>,
// Handles of the spawned subsystem tasks, awaited together on shutdown.
tasks: Vec<Task>,
// Tracing span used for the daemon's own log events.
span: Span
}
/// Daemon-side endpoints of the router: inbound packets, the request
/// channel, and the router's state monitor.
#[derive(Debug)]
pub struct NetBundle {
// Packets received by the router, delivered to the daemon's main loop.
pub inbound_pkt_rx: Receiver<PktFrom>,
// Request channel used to query or reconfigure the router.
pub router_tx: Sender<RouterReq>,
// Watched by the main loop: its current value is read as the
// active/dormant flag, and a failed `changed()` is treated as a router crash.
pub monitor: RouterMonitor
}
/// Daemon-side endpoints of the event dispatcher.
#[derive(Debug)]
pub struct DispatchBundle {
// Channel on which events are sent to the dispatcher.
pub event_tx: Sender<AnyEvent>,
// Request channel used to query the dispatcher (e.g. for its handlers).
pub query_tx: Sender<DispatcherReq>,
// Channel on which the `Interest` of each launched handler is registered.
pub interest_tx: Sender<Interest>
}
impl Daemon {
pub fn from_raw_parts(
node: Node,
ctrl_rx: Receiver<AnyRequest>,
net: NetBundle,
dispatch: DispatchBundle,
tasks: Vec<Task>,
span: Span)
-> Self
{
Self {
node,
ctrl_rx,
net,
dispatch,
mirror_query_tx: None,
tasks,
span
}
}
pub async fn init(cfg: Config) -> Option<Self> {
let span = info_span!("daemon");
let init = info_span!("daemon_init");
let _g = init.enter();
let addrs: Vec<InterfaceAddress> = cfg.server.addrs
.iter()
.flatten()
.map(|addr| addr.parse())
.filter_map(|parse_res| parse_res
.map_err(|err| error!(:init, %err, "invalid interface address"))
.ok())
.collect();
let db = Store::open(&cfg.db_path())
.map_err(|err| error!(:init, %err, "failed to open db"))
.ok()?;
let (ctrl_task, ctrl_rx) = Self::spawn_control_socket(
cfg.cnsprcy.control_socket_path,
&span
)?;
let (dispatcher_task, dispatch) = Self::spawn_dispatcher(
cfg.handlers,
&db,
&span
).in_current_span().await?;
match db.read_config(store::Config::KEY) {
Ok(Some(_)) => {},
Ok(None) => {
let serialized = cfg.cnsprcy.key.to_string();
match db.write_config(store::Config::KEY, &serialized) {
Ok(true) => {},
Ok(false) => {
error!(:init, "failed to store key in db");
return None;
},
Err(err) => {
error!(:init, %err, "failed to store key in db");
return None;
}
}
},
Err(err) => {
error!(:init, %err, "failed to check if key is already stored");
return None;
}
}
let (router_task, outbound_pkt_tx, net) = Self::spawn_router(
cfg.cnsprcy.key,
addrs,
&span
).in_current_span().await;
let mut tasks = vec![
ctrl_task,
dispatcher_task,
router_task
];
let (mirror_task, mirror_query_tx) = Self::spawn_mirror(
cfg.server.interfaces,
net.router_tx.clone(),
&span
).in_current_span().await.unzip();
tasks.extend(mirror_task);
let node = Node::load(db, outbound_pkt_tx, dispatch.event_tx.clone())
.map_err(|err| error!(:init, %err, "failed to load node from db"))
.ok()
.flatten()?;
info!(:init, "complete");
Some(Self {node, ctrl_rx, net, dispatch, mirror_query_tx, tasks, span})
}
/// Runs the daemon's main loop until shutdown, then tears everything down.
///
/// Each iteration first reconciles the local active/dormant flag with the
/// router monitor's current value (dispatching `Active` / `Inactive` on a
/// change) and then waits, in this fixed priority order (`biased`), for:
/// Ctrl-C, a control request, a monitor change, an inbound packet, or the
/// ticker (the ticker branch is only enabled while active).
///
/// The loop ends on Ctrl-C, when `handle_query` returns `false`, or when
/// the control channel, the monitor, or the inbound packet channel closes.
/// Afterwards `Stop` is dispatched, the node and the daemon's channel ends
/// are dropped or closed, and all spawned tasks are awaited.
///
/// Always returns `Some(())`.
pub async fn run(mut self) -> Option<()> {
    self.dispatch_event(DaemonEvent::Start);
    let mut ticker = self.span.in_scope(Sleeper::start);
    let mut is_active = true;
    loop {
        // Copy the flag out so the monitor borrow is released immediately.
        let now_active = *self.net.monitor.borrow();
        if now_active != is_active {
            if now_active {
                info!(@self, "entering active state");
                self.dispatch_event(DaemonEvent::Active);
            } else {
                info!(@self, "entering dormant state");
                self.dispatch_event(DaemonEvent::Inactive);
            }
            is_active = now_active;
        }
        tokio::select! {
            biased;
            _ = tokio::signal::ctrl_c() => {
                info!(@self, "Interrupt signal received, shutting down!");
                break;
            },
            q = self.ctrl_rx.recv() => match q {
                Some(q) => match self.handle_query(q).await {
                    true => continue,
                    false => break,
                },
                None => {
                    error!(@self, "control socket shut down unexpectedly!");
                    break
                }
            },
            // A successful change just loops back to the reconciliation above.
            r = self.net.monitor.changed() => if r.is_err() {
                error!(@self, "router crashed, shutting down!");
                break;
            },
            rep = self.net.inbound_pkt_rx.recv() => match rep {
                Some((from, pkt)) => self.span.in_scope(|| {
                    self.node.handle_pkt(from, pkt);
                    ticker.push(self.node.tick_peers());
                }),
                None => {
                    error!(@self, "router crashed, shutting down!");
                    break;
                }
            },
            Some(()) = ticker.sleep(), if is_active => {
                let _g = self.span.enter();
                ticker.push(self.node.tick_peers());
            }
        }
    }
    {
        // Synchronous teardown runs inside the daemon span. The guard is
        // confined to this block so it is released before the `.await`
        // below; holding it across an await point can corrupt traces.
        let _g = self.span.enter();
        self.dispatch_event(DaemonEvent::Stop);
        drop(self.node);
        drop(self.dispatch.event_tx);
        self.ctrl_rx.close();
        self.mirror_query_tx.take();
    }
    future::join_all(self.tasks)
        .instrument(self.span.clone())
        .await;
    self.span.in_scope(|| info!("Done, shutdown complete!"));
    Some(())
}
#[instrument(skip(daemon_span), level = "info", name = "control socket")]
fn spawn_control_socket(
at: path::PathBuf,
daemon_span: &Span)
-> Option<(Task, Receiver<AnyRequest>)>
{
let (tx, rx) = new_channel();
let socket = ControlSocket::open(tx, at, daemon_span)?;
let ctrl_task = tokio::task::spawn(async move {
socket.run().await
});
info!("Done!");
Some((ctrl_task, rx))
}
async fn spawn_dispatcher(
handlers: Option<Vec<HandlerDirectory>>,
db: &Store,
daemon_span: &Span)
-> Option<(Task, DispatchBundle)>
{
let DispatcherSetup {
dispatcher,
event_tx,
query_tx,
interest_tx
} = Dispatcher::setup(daemon_span);
let task = task::spawn(dispatcher.run());
let resolver = |name: &str| -> Option<NodeID> {
db.find_conspirator_id(name)
.map_err(|err| error!(%err, name, "failed to look up id"))
.ok()
.flatten()
};
for handler_dir in handlers.iter().flatten() {
get_all_entries(&handler_dir.path)
.in_current_span()
.await
.map_err(|err| error!(%err, ?handler_dir, "failed to load"))
.map(|entries| parse_all(&resolver, entries)
.into_iter()
.flat_map(|res| res
.map_err(|err| error!(%err, "failed to parse handler"))
.ok()
)
)
.ok()
.into_iter()
.flatten()
.map(launch)
.for_each(|interest| {let _ = interest_tx.send(interest);});
}
Some((task, DispatchBundle {event_tx, query_tx, interest_tx}))
}
/// Sets up and spawns the router, then asks it to bind each of `addrs`.
///
/// Addresses are requested one at a time, in order. An address whose bind
/// request fails or is answered with `false` is logged and skipped.
///
/// Returns the router task handle, the sender for outbound packets, and
/// the [`NetBundle`] holding the remaining router endpoints.
async fn spawn_router(
    key: KeyArg,
    addrs: Vec<InterfaceAddress>,
    daemon_span: &Span,
) -> (Task, Sender<PktTo>, NetBundle) {
    let RouterSetup {
        router,
        inbound_packet_receiver: inbound_pkt_rx,
        router_monitor: monitor,
        outbound_packet_sender: outbound_pkt_tx,
        router_query_sender: router_tx,
    } = Router::setup(key, daemon_span);
    let router_task = task::spawn(router.run());

    for addr in addrs {
        let bound = router_tx
            .request(RouterReq::BindInterface, addr.clone())
            .in_current_span()
            .await
            .unwrap_or(false);
        if bound {
            continue;
        }
        warn!(%addr, "skipping address");
    }

    let net = NetBundle { inbound_pkt_rx, router_tx, monitor };
    (router_task, outbound_pkt_tx, net)
}
async fn spawn_mirror(
interfaces: Option<Vec<String>>,
router_query_sender: Sender<RouterReq>,
daemon_span: &Span)
-> Option<(Task, Sender<MirrorReq>)>
{
let (query_tx, query_rx) = new_channel();
match Mirror::init(interfaces?, query_rx, daemon_span).await {
Ok(mirror) => Some((
task::spawn(mirror.track(router_query_sender)),
query_tx
)),
Err(error) => {
error!(?error, "failed to initialize Mirror");
None
}
}
}
fn dispatch_event<E: Into<AnyEvent>>(&self, event: E) {
self.dispatch.event_tx
.send(event.into())
.unwrap_or_else(|e| error!(@self, event = ?e.0, "failed to send"))
}
async fn handle_query(&mut self, req: AnyRequest) -> bool {
let _g = self.span.enter();
match req {
AnyRequest::Daemon(DaemonReq::GetStatus(((), replier))) => {
let addrs = self.net.router_tx
.get(RouterReq::GetInterfaces)
.timeout_ms(100)
.in_current_span()
.await
.ok()
.and_then(Result::ok)
.unwrap_or_default();
let handlers = self.dispatch.query_tx
.get(DispatcherReq::GetHandlers)
.timeout_ms(100)
.in_current_span()
.await
.ok()
.and_then(Result::ok)
.unwrap_or_default();
self.node.handle_request(
NodeReq::GetStatus(((addrs, handlers), replier))
);
return true
},
AnyRequest::Daemon(DaemonReq::StopServer(req)) => {
info!("received termination signal, shutting down…");
self.node.leave();
req.reply(true);
tokio::task::yield_now().await;
return false
},
AnyRequest::Daemon(DaemonReq::Invite((public_key, replier))) => {
let addrs = self.net.router_tx
.get(RouterReq::GetInterfaces)
.timeout_ms(100)
.in_current_span()
.await
.ok()
.and_then(Result::ok)
.unwrap_or_default()
.iter()
.map(InterfaceAddress::get_address)
.collect();
let req = NodeReq::Invite(((public_key, addrs), replier));
self.node.handle_request(req);
},
AnyRequest::Daemon(DaemonReq::Accept((invitation, replier))) => {
let content = match self.node.decrypt(invitation) {
Ok(content) => content,
Err(err) => {
error!(%err, "failed to accept invitation");
replier.reply(false);
return true;
}
};
let rekeyed = self.net.router_tx
.request(RouterReq::Rekey, content.key)
.timeout_ms(100)
.in_current_span()
.await
.ok()
.and_then(Result::ok)
.unwrap_or(false);
if !rekeyed {
error!("re-key request failed or timed out");
replier.reply(false);
return true;
}
let Some(&addr) = content.addresses.first() else {
error!("invitation contains no addresses");
replier.reply(false);
return true;
};
let id = content.id.into();
let join_req = NodeReq::Join((
NodeJoinRequest { id, name: content.name, addr },
replier
));
self.node.handle_request(join_req);
for address in content.addresses.iter().skip(1).copied() {
debug!(%address, %id, "reaching out");
self.node.send_payload(
id.into(),
Payload::None,
Some(address)
);
}
},
AnyRequest::Node(req) => self.node.handle_request(req),
AnyRequest::Mirror(req) => match self.mirror_query_tx.as_ref() {
Some(tx) => tx.send_request(req),
None => match req {
MirrorReq::Enable((_, r)) |
MirrorReq::Disable((_, r)) => r.reply(false),
}
}
AnyRequest::Router(req) => self.net.router_tx.send_request(req),
AnyRequest::Dispatcher(req) =>
self.dispatch.query_tx.send_request(req),
}
true
}
}