use crate::{
config::TransitConfig,
i2np::Message,
primitives::RouterId,
router::context::RouterContext,
runtime::{MetricType, Runtime},
shutdown::ShutdownHandle,
subsystem::{Source, SubsystemHandle},
tunnel::{
handle::{CommandRecycle, TunnelManagerCommand},
pool::{ClientSelector, ExploratorySelector, TunnelPool, TunnelPoolBuildParameters},
transit::TransitTunnelManager,
},
};
use thingbuf::mpsc::Receiver;
use alloc::vec::Vec;
use core::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
mod fragment;
mod garlic;
mod handle;
mod hop;
mod metrics;
mod noise;
mod pool;
mod transit;
#[cfg(test)]
mod tests;
#[cfg(test)]
pub use garlic::GarlicHandler;
#[cfg(test)]
pub use pool::{TunnelMessage, TunnelMessageRecycle};
pub use garlic::DeliveryInstructions;
pub use handle::TunnelManagerHandle;
pub use noise::NoiseContext;
pub use pool::{TunnelMessageSender, TunnelPoolConfig, TunnelPoolEvent, TunnelPoolHandle};
const LOG_TARGET: &str = "emissary::tunnel";
const TUNNEL_EXPIRATION: Duration = Duration::from_secs(10 * 60);
pub struct TunnelManager<R: Runtime> {
command_rx: Receiver<TunnelManagerCommand, CommandRecycle>,
exploratory_selector: ExploratorySelector<R>,
router_ctx: RouterContext<R>,
subsystem_handle: SubsystemHandle,
}
impl<R: Runtime> TunnelManager<R> {
pub fn new(
router_ctx: RouterContext<R>,
exploratory_config: TunnelPoolConfig,
insecure_tunnels: bool,
transit_config: Option<TransitConfig>,
transit_shutdown_handle: ShutdownHandle,
subsystem_handle: SubsystemHandle,
subsys_transit_rx: Receiver<Vec<(RouterId, Message)>>,
) -> (Self, TunnelManagerHandle, TunnelPoolHandle) {
tracing::info!(
target: LOG_TARGET,
?insecure_tunnels,
"starting tunnel manager",
);
R::spawn(TransitTunnelManager::<R>::new(
transit_config,
router_ctx.clone(),
subsystem_handle.clone().with_source(Source::Transit),
subsys_transit_rx,
transit_shutdown_handle,
));
let (pool_handle, exploratory_selector) = {
let build_parameters = TunnelPoolBuildParameters::new(exploratory_config);
let selector = ExploratorySelector::new(
router_ctx.profile_storage().clone(),
build_parameters.context_handle.clone(),
insecure_tunnels,
);
let (tunnel_pool, tunnel_pool_handle) = TunnelPool::<R, _>::new(
build_parameters,
selector.clone(),
subsystem_handle.clone().with_source(Source::Exploratory),
router_ctx.clone(),
);
R::spawn(tunnel_pool);
(tunnel_pool_handle, selector)
};
let (manager_handle, command_rx) = TunnelManagerHandle::new();
(
Self {
command_rx,
exploratory_selector,
router_ctx,
subsystem_handle: subsystem_handle.with_source(Source::Client),
},
manager_handle,
pool_handle,
)
}
pub fn metrics(metrics: Vec<MetricType>) -> Vec<MetricType> {
metrics::register_metrics(metrics)
}
fn on_create_tunnel_pool(&self, config: TunnelPoolConfig) -> TunnelPoolHandle {
tracing::info!(
target: LOG_TARGET,
?config,
"create tunnel pool",
);
let build_parameters = TunnelPoolBuildParameters::new(config);
let selector = ClientSelector::new(
self.exploratory_selector.clone(),
build_parameters.context_handle.clone(),
);
let (tunnel_pool, tunnel_pool_handle) = TunnelPool::<R, _>::new(
build_parameters,
selector,
self.subsystem_handle.clone(),
self.router_ctx.clone(),
);
R::spawn(tunnel_pool);
tunnel_pool_handle
}
}
impl<R: Runtime> Future for TunnelManager<R> {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match self.command_rx.poll_recv(cx) {
Poll::Pending => break,
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(TunnelManagerCommand::CreateTunnelPool { config, tx })) => {
let _ = tx.send(self.on_create_tunnel_pool(config));
}
Poll::Ready(Some(TunnelManagerCommand::Dummy)) => unreachable!(),
}
}
Poll::Pending
}
}