use crate::{
config::{Config, I2cpConfig, MetricsConfig, SamConfig},
crypto::{SigningPrivateKey, StaticPrivateKey},
error::Error,
events::{EventManager, EventSubscriber},
i2cp::I2cpServer,
netdb::NetDb,
primitives::RouterInfo,
profile::ProfileStorage,
router::context::RouterContext,
runtime::{AddressBook, Runtime, Storage},
sam::SamServer,
shutdown::ShutdownContext,
subsystem::SubsystemKind,
transport::{Ntcp2Transport, Ssu2Transport, TransportManager, TransportManagerBuilder},
tunnel::{TunnelManager, TunnelManagerHandle},
};
use bytes::Bytes;
use futures::FutureExt;
use rand_core::RngCore;
use alloc::{string::ToString, sync::Arc, vec::Vec};
use core::{
future::Future,
marker::PhantomData,
net::{Ipv4Addr, SocketAddr},
pin::Pin,
task::{Context, Poll},
time::Duration,
};
pub mod context;
/// Logging target for the file.
const LOG_TARGET: &str = "emissary::router";

/// Network ID used when the configuration does not specify one.
const NET_ID: u8 = 2;

/// Number of `shutdown()` calls after which the router future resolves immediately.
const IMMEDIATE_SHUTDOWN_COUNT: usize = 2;

/// Interval between two backups of the profile storage (15 minutes).
const PROFILE_STORAGE_BACKUP_INTERVAL: Duration = Duration::from_secs(15 * 60);
/// Ports and socket addresses of the protocols started by the router.
///
/// Every field is `None` until the corresponding protocol has been started; the
/// [`Default`] value therefore describes a router with no protocol running.
///
/// The type is plain data, so it also implements [`PartialEq`]/[`Eq`] to let callers
/// compare two snapshots.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
pub struct ProtocolAddressInfo {
    /// Port of the NTCP2 transport, if it is active.
    pub ntcp2_port: Option<u16>,

    /// Local address of the SAMv3 TCP listener, as reported by the SAM server.
    pub sam_tcp: Option<SocketAddr>,

    /// Local address of the SAMv3 UDP socket, as reported by the SAM server.
    pub sam_udp: Option<SocketAddr>,

    /// Port of the SSU2 transport, if it is active.
    pub ssu2_port: Option<u16>,
}
/// Builder for [`Router`].
///
/// Collects the router configuration together with an optional address book and an
/// optional storage backend and forwards them to [`Router::new`] when built.
#[derive(Default)]
pub struct RouterBuilder<R> {
/// Optional address book, forwarded to the I2CP and SAM servers by [`Router::new`].
address_book: Option<Arc<dyn AddressBook>>,
/// Router configuration.
config: Config,
/// Optional storage backend; when set, [`Router::new`] spawns a task that periodically
/// saves a backup of the profile storage through it.
storage: Option<Arc<dyn Storage>>,
/// Marker tying the builder to runtime `R` without storing a value of that type.
_runtime: PhantomData<R>,
}
impl<R: Runtime> RouterBuilder<R> {
pub fn new(config: Config) -> Self {
Self {
address_book: None,
config,
storage: None,
_runtime: Default::default(),
}
}
pub fn with_address_book(mut self, address_book: Arc<dyn AddressBook>) -> Self {
self.address_book = Some(address_book);
self
}
pub fn with_storage(mut self, storage: Arc<dyn Storage>) -> Self {
self.storage = Some(storage);
self
}
pub async fn build(self) -> crate::Result<(Router<R>, EventSubscriber, Vec<u8>)> {
Router::new(self.config, self.address_book, self.storage).await
}
}
/// Router.
///
/// Implements [`Future`]; polling it drives the shutdown context, the event manager and the
/// transport manager.
pub struct Router<R: Runtime> {
/// Ports and socket addresses of the protocols that were started.
address_info: ProtocolAddressInfo,
/// Event manager, polled as part of the router future.
event_manager: EventManager<R>,
/// Shutdown context; the router future resolves once it becomes ready.
shutdown_context: ShutdownContext<R>,
/// Number of times `shutdown()` has been called.
shutdown_count: usize,
/// Transport manager, polled as part of the router future.
transport_manager: TransportManager<R>,
/// Handle to the tunnel manager. It is never read by the router; presumably it is held
/// only to keep the tunnel manager alive (TODO confirm).
_tunnel_manager_handle: TunnelManagerHandle,
}
impl<R: Runtime> Router<R> {
/// Create new [`Router`].
///
/// Initializes the NTCP2 and SSU2 transports, creates the local router info and serializes
/// it with the local signing key, spawns the tunnel manager and NetDb on runtime `R`, and
/// starts the I2CP and SAM servers if they are configured. If `storage` is given, a task
/// that periodically backs up the profile storage is spawned as well.
///
/// On success returns the router, an [`EventSubscriber`] and the serialized local router
/// info.
///
/// # Errors
///
/// Returns an error if initializing a transport fails, if neither NTCP2 nor SSU2 yielded a
/// context (`Error::Custom("no transport")`), or if creating the I2CP or SAM server fails.
/// Subsystems spawned before a failure are not stopped by this function.
pub async fn new(
mut config: Config,
address_book: Option<Arc<dyn AddressBook>>,
storage: Option<Arc<dyn Storage>>,
) -> crate::Result<(Self, EventSubscriber, Vec<u8>)> {
// initialize both transports up front; `take()` moves each transport's configuration
// out of `config` so the rest of `config` can still be borrowed and destructured below
let (ntcp2_context, ntcp2_address) =
Ntcp2Transport::<R>::initialize(config.ntcp2.take()).await?;
let (ssu2_context, ssu2_address) =
Ssu2Transport::<R>::initialize(config.ssu2.take()).await?;
// the router cannot run without at least one transport
if ntcp2_context.is_none() && ssu2_context.is_none() {
tracing::warn!(
target: LOG_TARGET,
"cannot start router, no active transport protocol",
);
return Err(Error::Custom("no transport".to_string()));
}
// use the configured keys, or generate random 32-byte keys with the runtime's RNG
let local_static_key = StaticPrivateKey::from(config.static_key.unwrap_or_else(|| {
let mut key = [0u8; 32];
R::rng().fill_bytes(&mut key);
key
}));
let local_signing_key = SigningPrivateKey::from(config.signing_key.unwrap_or_else(|| {
let mut key = [0u8; 32];
R::rng().fill_bytes(&mut key);
key
}));
// the last argument is `true` when no transit configuration was given
let local_router_info = RouterInfo::new::<R>(
&config,
ntcp2_address,
ssu2_address,
&local_static_key,
&local_signing_key,
config.transit.is_none(),
);
// take ownership of the remaining configuration values
let Config {
i2cp_config,
samv3_config,
floodfill,
net_id,
exploratory,
insecure_tunnels,
routers,
profiles,
allow_local,
metrics,
transit,
refresh_interval,
..
} = config;
let profile_storage = ProfileStorage::<R>::new(&routers, &profiles);
let serialized_router_info = local_router_info.serialize(&local_signing_key);
let local_router_id = local_router_info.identity.id();
// filled in below as the SAM server and the transports are set up
let mut address_info = ProtocolAddressInfo::default();
// a refresh interval of zero is rejected: `None` is passed instead so that the event
// manager falls back to its default value, as the warning states
let (event_manager, event_subscriber, event_handle) =
EventManager::<R>::new(refresh_interval.and_then(|refresh_interval| {
if refresh_interval == 0 {
tracing::warn!(
target: LOG_TARGET,
"invalid refresh interval, using default value"
);
return None;
}
Some(Duration::from_secs(refresh_interval as u64))
}));
// the handle obtained here is given to the tunnel manager below
let mut shutdown_context = ShutdownContext::<R>::new();
let transit_shutdown_handle = shutdown_context.handle();
tracing::info!(
target: LOG_TARGET,
?local_router_id,
net_id = ?net_id.unwrap_or(NET_ID),
"starting emissary",
);
// without a metrics configuration an empty metric list is registered with no port;
// otherwise the metrics of the transport manager, tunnel manager and NetDb are
// collected and registered with the configured port
let metrics_handle = match metrics {
None => R::register_metrics(Vec::new(), None),
Some(MetricsConfig { port }) => {
let metrics = TransportManager::<R>::metrics(Vec::new());
let metrics = TunnelManager::<R>::metrics(metrics);
let metrics = NetDb::<R>::metrics(metrics);
R::register_metrics(metrics, Some(port))
}
};
// context shared between the subsystems; `NET_ID` is used if no network ID is configured
let router_ctx = RouterContext::new(
metrics_handle.clone(),
profile_storage.clone(),
local_router_id.clone(),
Bytes::from(serialized_router_info.clone()),
local_static_key.clone(),
local_signing_key.clone(),
net_id.unwrap_or(NET_ID),
event_handle,
);
// clone the event handle for the SAM server now, `router_ctx` is moved into NetDb below
let sam_event_handle = router_ctx.event_handle().clone();
let mut transport_manager_builder =
TransportManagerBuilder::new(router_ctx.clone(), local_router_info, allow_local);
// transit tunnels are disabled when no transit configuration was given
transport_manager_builder.with_transit_tunnels_disabled(transit.is_none());
// register the tunnel subsystem with the transport manager and spawn the tunnel manager
let (tunnel_manager_handle, exploratory_pool_handle, netdb_msg_rx) = {
let transport_service =
transport_manager_builder.register_subsystem(SubsystemKind::Tunnel);
let (tunnel_manager, tunnel_manager_handle, tunnel_pool_handle, netdb_msg_rx) =
TunnelManager::<R>::new(
transport_service,
router_ctx.clone(),
exploratory.into(),
insecure_tunnels,
transit,
transit_shutdown_handle,
);
R::spawn(tunnel_manager);
(tunnel_manager_handle, tunnel_pool_handle, netdb_msg_rx)
};
// register the NetDb subsystem and spawn NetDb; it receives the exploratory pool handle
// and the message receiver created by the tunnel manager
let netdb_handle = {
let transport_service =
transport_manager_builder.register_subsystem(SubsystemKind::NetDb);
let (netdb, netdb_handle) = NetDb::<R>::new(
router_ctx,
floodfill,
transport_service,
exploratory_pool_handle,
netdb_msg_rx,
);
R::spawn(netdb);
netdb_handle
};
transport_manager_builder.register_netdb_handle(netdb_handle.clone());
// start the I2CP server if it is configured
if let Some(I2cpConfig { host, port }) = i2cp_config {
let i2cp_server = I2cpServer::<R>::new(
host,
port,
netdb_handle.clone(),
tunnel_manager_handle.clone(),
address_book.clone(),
profile_storage.clone(),
)
.await?;
R::spawn(i2cp_server);
}
// start the SAM server if it is configured and record the addresses it reports
if let Some(SamConfig {
tcp_port,
udp_port,
host,
}) = samv3_config
{
let sam_server = SamServer::<R>::new(
tcp_port,
udp_port,
host,
netdb_handle.clone(),
tunnel_manager_handle.clone(),
metrics_handle,
address_book,
sam_event_handle,
profile_storage.clone(),
)
.await?;
address_info.sam_tcp = sam_server.tcp_local_address();
address_info.sam_udp = sam_server.udp_local_address();
R::spawn(sam_server)
}
// if a storage backend was given, spawn a task which wakes up every
// `PROFILE_STORAGE_BACKUP_INTERVAL` and saves a non-empty backup of the profile storage;
// the loop has no exit condition
if let Some(storage) = storage {
R::spawn(async move {
loop {
let _ = R::delay(PROFILE_STORAGE_BACKUP_INTERVAL).await;
let routers = profile_storage.backup();
if !routers.is_empty() {
tracing::info!(
target: LOG_TARGET,
num_routers = ?routers.len(),
"taking backup of profile storage",
);
storage.save_to_disk(routers);
}
}
});
}
// register the transports that yielded a context and record their ports
if let Some(context) = ntcp2_context {
address_info.ntcp2_port = Some(context.port());
transport_manager_builder.register_ntcp2(context);
}
if let Some(context) = ssu2_context {
address_info.ssu2_port = Some(context.port());
transport_manager_builder.register_ssu2(context);
}
Ok((
Self {
address_info,
event_manager,
shutdown_context,
shutdown_count: 0usize,
transport_manager: transport_manager_builder.build(),
_tunnel_manager_handle: tunnel_manager_handle,
},
event_subscriber,
serialized_router_info,
))
}
/// Shut down the router.
///
/// The first call starts a graceful shutdown by notifying the shutdown context, the
/// transport manager and the event manager. Later calls only increment the shutdown
/// counter and log; once the counter reaches `IMMEDIATE_SHUTDOWN_COUNT`, the router future
/// resolves the next time it is polled.
pub fn shutdown(&mut self) {
self.shutdown_count += 1;
if self.shutdown_count == 1 {
tracing::info!(
target: LOG_TARGET,
"starting graceful shutdown",
);
self.shutdown_context.shutdown();
self.transport_manager.shutdown();
self.event_manager.shutdown();
} else {
tracing::info!(
target: LOG_TARGET,
"shutting down router",
);
}
}
/// Get a reference to the [`ProtocolAddressInfo`] collected while the router was created.
pub fn protocol_address_info(&self) -> &ProtocolAddressInfo {
&self.address_info
}
/// Forward an external IPv4 `address` to the transport manager.
pub fn add_external_address(&mut self, address: Ipv4Addr) {
self.transport_manager.add_external_address(address);
}
}
impl<R: Runtime> Future for Router<R> {
    type Output = ();

    /// Drive the router.
    ///
    /// Resolves when `shutdown()` has been called at least `IMMEDIATE_SHUTDOWN_COUNT` times,
    /// when the shutdown context becomes ready, or when the event manager or the transport
    /// manager finishes. Otherwise returns [`Poll::Pending`].
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;

        // the shutdown context is only polled if immediate shutdown was not requested
        let shutdown_done = this.shutdown_count >= IMMEDIATE_SHUTDOWN_COUNT
            || this.shutdown_context.poll_unpin(cx).is_ready();

        if shutdown_done {
            return Poll::Ready(());
        }

        if this.event_manager.poll_unpin(cx).is_pending() {
            // the transport manager resolves to `()`, so its poll result is also the
            // router's poll result
            return this.transport_manager.poll_unpin(cx);
        }

        // the event manager is not expected to finish while the router is running
        tracing::warn!(
            target: LOG_TARGET,
            "event manager crashed",
        );

        Poll::Ready(())
    }
}