use std::time::Duration;
use tracing::info_span;
use futures::Future;
use futures::future::BoxFuture;
use tokio::sync::watch;
use tokio::task;
use tokio::time::sleep;
use crate::control::{
KeyArg,
NodeIDArg,
AnyRequest,
DispatcherReq,
RouterReq,
Command,
Response,
command::request
};
use crate::daemon::{
Daemon,
DispatchBundle,
NetBundle
};
use crate::event::{
AnyEvent,
Interest
};
use crate::net::interface::{
DummyServer,
InterfaceLauncher
};
use crate::net::{
InterfaceAddress,
Router,
RouterSetup
};
use crate::node::Node;
use crate::peer::{
Address,
NodeID
};
use crate::store::Store;
use crate::util::*;
mod daemon;
mod event;
mod link;
mod node;
mod ops;
mod parse;
mod router;
/// A daemon and a router that have been wired together for a test, plus the
/// channel ends the test itself is expected to drive. Built by `plug_router`.
struct DaemonRouterKit {
/// Identifier generated for the daemon's node in `bare_daemon`.
pub id: NodeID,
/// The daemon under test.
pub daemon: Daemon,
/// The router whose channels are forwarded to and from the daemon.
pub router: Router,
/// Sending half of the channel whose receiver was handed to the daemon.
pub ctrl_tx: Sender<AnyRequest>,
/// Receiving half of the event channel whose sender sits in the daemon's
/// `DispatchBundle::event_tx`.
pub dispatch_rx: Receiver<AnyEvent>,
/// Receiving half paired with the daemon's `DispatchBundle::interest_tx`.
pub interest_rx: Receiver<Interest>,
/// Forwarding tasks spawned by `plug_router` to connect daemon and router.
pub tasks: Vec<Task>
}
/// A daemon that is not yet connected to any router, together with the
/// channel ends a router (or the test itself) must take over. Built by
/// `bare_daemon` and consumed by `plug_router`.
struct DaemonSetup {
/// Identifier generated for the daemon's node.
pub id: NodeID,
/// The daemon under test.
pub daemon: Daemon,
/// Sending half of the channel whose receiver was handed to the daemon.
pub ctrl_tx: Sender<AnyRequest>,
/// Receiving half of the event channel whose sender sits in the daemon's
/// `DispatchBundle::event_tx`.
pub dispatch_rx: Receiver<AnyEvent>,
/// Receiving half paired with the daemon's `DispatchBundle::interest_tx`.
pub interest_rx: Receiver<Interest>,
/// Sending half paired with the daemon's `NetBundle::inbound_pkt_rx`.
pub inbound_packet_sender: Sender<PktFrom>,
/// Outbound packet receiver returned by `Node::init`.
pub outbound_packet_receiver: Receiver<PktTo>,
/// Receiving half paired with the daemon's `NetBundle::router_tx`.
pub router_ctrl_rx: Receiver<RouterReq>,
/// Sending half of the watch channel (initial value `false`) whose receiver
/// is the daemon's `NetBundle::monitor`.
pub router_monitor_tx: watch::Sender<bool>
}
/// Join handle of a spawned tokio task that produces no value.
type Task = task::JoinHandle<()>;
/// Suspends the calling task for `millis` milliseconds using tokio's timer.
async fn sleep_ms(millis: u64) {
    let pause = Duration::from_millis(millis);
    sleep(pause).await;
}
/// Awaits `fut` under a 100 ms limit and returns its output.
///
/// # Panics
/// Panics with `"100ms elapsed"` when the `timeout_ms(100)` wrapper does not
/// yield a value.
async fn timeout_100ms<T, F: Future<Output = T>>(fut: F) -> T {
    let limited = fut.timeout_ms(100);
    let outcome = limited.await;
    outcome.expect("100ms elapsed")
}
/// Awaits a fallible future under a 100 ms limit and returns its `Ok` value.
///
/// # Panics
/// Panics with `"100ms elapsed"` when the `timeout_ms(100)` wrapper does not
/// yield a value, and panics (via `unwrap`) when the future resolves to `Err`.
async fn timeout_unwrap<T, E, F>(fut: F) -> T
where
    E: std::fmt::Debug,
    F: Future<Output = Result<T, E>>
{
    let outcome = fut.timeout_ms(100).await;
    let result = outcome.expect("100ms elapsed");
    result.unwrap()
}
/// Spawns a task that moves every item received on `rx` into `tx`.
///
/// The task ends when `rx.recv()` yields `None`. Whatever `tx.send` returns is
/// discarded, so the loop keeps draining `rx` regardless of the send outcome.
fn plug_pipes<T>(mut rx: Receiver<T>, tx: Sender<T>) -> Task
where T: Send + Sized + 'static
{
    task::spawn(async move {
        loop {
            match rx.recv().await {
                Some(item) => {
                    // Best-effort forward: the send result is deliberately ignored.
                    let _ = tx.send(item);
                }
                None => break,
            }
        }
    })
}
/// Spawns a task that mirrors one watch channel into another.
///
/// Each time `rx.changed()` resolves successfully, the value currently held by
/// `rx` is copied and sent on `tx` (the send result is ignored). The task ends
/// once `rx.changed()` reports an error.
fn plug_watch<T>(mut rx: watch::Receiver<T>, tx: watch::Sender<T>) -> Task
where T: Copy + Send + Sync + 'static
{
    task::spawn(async move {
        loop {
            if rx.changed().await.is_err() {
                break;
            }
            // `T: Copy`, so the value is copied out of the borrow guard.
            let latest = *rx.borrow();
            let _ = tx.send(latest);
        }
    })
}
/// Builds a daemon named `name` on top of an in-memory store, without
/// attaching a router.
///
/// Every channel the daemon needs is created here; the ends that the daemon
/// does not own are returned in the `DaemonSetup` so a router or the test can
/// take them over.
///
/// # Panics
/// Panics if the in-memory store cannot be created or `Node::init` fails.
fn bare_daemon(name: String) -> DaemonSetup {
    let span = info_span!("test_daemon", %name);
    let init = info_span!("daemon_test_init", %name);
    let _g = init.enter();

    let id = NodeIDArg::random().get();

    // One channel per daemon input/output; the daemon-side halves are named
    // after the bundle fields they are moved into below.
    let (ctrl_tx, ctrl_rx) = new_channel::<AnyRequest>();
    let (event_tx, dispatch_rx) = new_channel::<AnyEvent>();
    let (interest_tx, interest_rx) = new_channel::<Interest>();
    // Nothing reads dispatcher queries here; the receiver only lives until
    // this function returns.
    let (query_tx, _query_rx) = new_channel::<DispatcherReq>();
    let (inbound_packet_sender, inbound_pkt_rx) = new_channel::<PktFrom>();
    let (router_tx, router_ctrl_rx) = new_channel::<RouterReq>();
    let (router_monitor_tx, monitor) = watch::channel::<bool>(false);

    let store = Store::create_in_memory().unwrap();
    let (node, node_events, outbound_packet_receiver) =
        Node::init(id, name, store).unwrap();

    // Events produced by the node are forwarded into the same channel the
    // daemon publishes its events on.
    let event_forwarder = plug_pipes(node_events, event_tx.clone());

    let daemon = Daemon::from_raw_parts (
        node,
        ctrl_rx,
        NetBundle {
            inbound_pkt_rx,
            router_tx,
            monitor
        },
        DispatchBundle {
            event_tx,
            query_tx,
            interest_tx
        },
        vec![event_forwarder],
        span
    );
    info!(:init, "complete");

    DaemonSetup {
        id,
        daemon,
        ctrl_tx,
        dispatch_rx,
        interest_rx,
        inbound_packet_sender,
        outbound_packet_receiver,
        router_ctrl_rx,
        router_monitor_tx
    }
}
/// Connects the loose channel ends of a bare daemon to those of a router.
///
/// Four forwarding tasks are spawned, in this order: router inbound packets
/// to the daemon, daemon outbound packets to the router, daemon router
/// requests to the router, and the router monitor watch to the daemon. The
/// handles of those tasks are returned in `DaemonRouterKit::tasks`.
fn plug_router(daemon_setup: DaemonSetup, router_setup: RouterSetup)
    -> DaemonRouterKit
{
    let tasks = vec![
        plug_pipes(
            router_setup.inbound_packet_receiver,
            daemon_setup.inbound_packet_sender
        ),
        plug_pipes(
            daemon_setup.outbound_packet_receiver,
            router_setup.outbound_packet_sender
        ),
        plug_pipes(
            daemon_setup.router_ctrl_rx,
            router_setup.router_query_sender
        ),
        plug_watch(
            router_setup.router_monitor,
            daemon_setup.router_monitor_tx
        )
    ];

    DaemonRouterKit {
        id: daemon_setup.id,
        daemon: daemon_setup.daemon,
        router: router_setup.router,
        ctrl_tx: daemon_setup.ctrl_tx,
        dispatch_rx: daemon_setup.dispatch_rx,
        interest_rx: daemon_setup.interest_rx,
        tasks
    }
}
/// Returns the `V4UdpChaCha20` peer address `127.0.0.<i>:9999`.
fn loopback_address(i: u8) -> Address {
    let ip = std::net::Ipv4Addr::new(127, 0, 0, i);
    let socket = std::net::SocketAddrV4::new(ip, 9999);
    Address::V4UdpChaCha20(socket)
}
/// Returns a `V4UdpChaCha20` interface address bound to `127.0.0.<i>:9999`
/// on interface `"lo"`, with the single route `"127.0.0.1/8"`.
///
/// # Panics
/// Panics if the route string fails to parse.
fn loopback_iface_address(i: u8) -> InterfaceAddress {
    let ip = std::net::Ipv4Addr::new(127, 0, 0, i);
    let addr = std::net::SocketAddrV4::new(ip, 9999);
    let if_name = Some(String::from("lo"));
    InterfaceAddress::V4UdpChaCha20 {
        addr,
        if_name,
        routes: vec!["127.0.0.1/8".parse().unwrap()]
    }
}
/// Wraps the index `i` in an `Address::Dummy`.
fn dummy_address(i: usize) -> Address {
Address::Dummy(i)
}
/// Creates one dummy-backed daemon/router kit per entry in `names`.
///
/// Returns the kits (in the order of `names`), the address reported by each
/// dummy server (in the same order), and the task handle returned by
/// `DummyServer::create_n`.
fn setup_n(names: Vec<String>) -> (Vec<DaemonRouterKit>, Vec<Address>, Task) {
    let count = names.len();
    let (servers, observer_task) = DummyServer::create_n(count);

    // Addresses are read before the servers are moved into their kits.
    let mut addrs: Vec<Address> = Vec::with_capacity(count);
    for server in servers.iter() {
        addrs.push(server.get_address().get_address());
    }

    let mut kits: Vec<DaemonRouterKit> = Vec::with_capacity(count);
    for (name, server) in names.into_iter().zip(servers) {
        kits.push(daemon_with_dummy(name, server));
    }

    (kits, addrs, observer_task)
}
/// Builds a bare daemon named `name`, sets up a router with a random key,
/// asks that router to launch a dummy interface backed by `server`, and plugs
/// daemon and router together.
///
/// The value returned by the `LaunchInterface` request is dropped right away
/// rather than awaited.
fn daemon_with_dummy(name: String, server: DummyServer) -> DaemonRouterKit {
    let span = info_span!("dummy_router");
    let daemon_setup = bare_daemon(name);

    let key = KeyArg::random();
    let router_setup = Router::setup(key, &span);

    let launcher = InterfaceLauncher::Dummy(server);
    drop(
        router_setup
            .router_query_sender
            .request(RouterReq::LaunchInterface, launcher)
    );

    plug_router(daemon_setup, router_setup)
}
/// Human-readable label attached to a test case.
type TestMsg = &'static str;
/// A test case: label, command to issue, and a predicate applied to the
/// response.
type TestCase<'a> = (TestMsg, Command, &'a dyn Fn(Response) -> bool);
/// A test case after its command has been issued (see `make_result`): label,
/// a copy of the command, the pending response future, and the predicate to
/// apply to the response.
type TestResult<'a> = (
TestMsg,
Command,
BoxFuture<'static, Response>,
&'a dyn Fn(Response) -> bool
);
/// Predicate that holds when the response is not OK (`r.is_ok()` is false).
fn check_false(r: Response) -> bool {
    let succeeded = r.is_ok();
    !succeeded
}
/// Predicate that holds when the response is the `Response::Err` variant.
fn check_err(r: Response) -> bool {
    match r {
        Response::Err(_) => true,
        _ => false,
    }
}
/// Issues the command of `test` through `ctrl_tx` and pairs the pending
/// response with the test's label, a copy of the command, and its predicate.
fn make_result<'a>(
    ctrl_tx: &Sender<AnyRequest>,
    test: TestCase<'a>)
    -> TestResult<'a>
{
    let (msg, cmd, check) = test;
    // Keep a copy for reporting; the original is consumed by `request`.
    let reported_cmd = cmd.clone();
    let pending = request(ctrl_tx, cmd);
    (msg, reported_cmd, pending, check)
}
/// Awaits the response of an issued test case (100 ms limit), prints the
/// label, command and response, and asserts that the predicate accepts the
/// response.
///
/// # Panics
/// Panics if the response does not arrive within the limit enforced by
/// `timeout_100ms`, or if the predicate returns `false` (the assertion message
/// is the debug form of the command).
async fn check_result(result: TestResult<'_>) {
    let (msg, cmd, pending, predicate) = result;
    println!("msg: {msg}");

    let res = timeout_100ms(pending).await;
    println!("cmd: {cmd:?}");
    println!("res: {res:?}");

    let accepted = predicate(res);
    assert!(accepted, "{cmd:?}");
}