use crate::{
config::BandwidthConfig,
crypto::{chachapoly::ChaChaPoly, EphemeralPublicKey},
error::{ChannelError, RoutingError},
i2np::{
garlic::{
DeliveryInstructions as CloveDeliveryInstructions, GarlicMessage, GarlicMessageBlock,
},
tunnel::{data::EncryptedTunnelData, gateway::TunnelGateway},
Message, MessageBuilder, MessageType,
},
primitives::{MessageId, RouterId, TunnelId},
runtime::{Gauge, MetricType, MetricsHandle, Runtime},
subsystem::{
bandwidth::{BandwidthTracker, Congestion, CongestionLevel},
metrics::*,
},
tunnel::{DeliveryInstructions, NoiseContext},
};
use futures::FutureExt;
use futures_channel::oneshot;
use hashbrown::HashMap;
use rand::{CryptoRng, Rng};
use thingbuf::mpsc::{channel, errors::TrySendError, with_recycle, Receiver, Sender};
use zeroize::Zeroize;
#[cfg(feature = "std")]
use parking_lot::RwLock;
#[cfg(feature = "no_std")]
use spin::rwlock::RwLock;
use alloc::{collections::VecDeque, format, sync::Arc, vec, vec::Vec};
use core::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
pub mod bandwidth;
mod metrics;
const LOG_TARGET: &str = "emissary::subsystem";
const NETDB_EVENT_QUEUE_LEN: usize = 512usize;
#[derive(Default, Debug, Clone)]
pub enum SubsystemEvent {
ConnectionEstablished {
router_id: RouterId,
tx: Sender<OutboundMessage, OutboundMessageRecycle>,
},
ConnectionClosed {
router_id: RouterId,
},
ConnectionFailure {
router_id: RouterId,
},
Message {
messages: Vec<(RouterId, Message)>,
},
#[default]
Dummy,
}
#[derive(Default, Debug, Clone)]
pub enum NetDbEvent {
ConnectionEstablished {
router_id: RouterId,
},
Message {
messages: Vec<(RouterId, Message)>,
},
#[default]
Dummy,
}
#[derive(Debug, Default, Clone)]
pub struct SubsystemManagerEventRecycle(());
impl thingbuf::Recycle<SubsystemManagerEvent> for SubsystemManagerEventRecycle {
fn new_element(&self) -> SubsystemManagerEvent {
SubsystemManagerEvent::Dummy
}
fn recycle(&self, element: &mut SubsystemManagerEvent) {
*element = SubsystemManagerEvent::Dummy;
}
}
#[derive(Debug, Default)]
pub enum OutboundMessage {
Message(Message),
#[allow(unused)]
Messages(Vec<Message>),
MessageWithFeedback(Message, oneshot::Sender<()>),
#[default]
Dummy,
}
impl OutboundMessage {
fn len(&self) -> usize {
match self {
Self::Message(message) => message.serialized_len_short(),
Self::MessageWithFeedback(message, _) => message.serialized_len_short(),
Self::Messages(messages) =>
messages.iter().fold(0, |total, message| total + message.serialized_len_short()),
Self::Dummy => unreachable!(),
}
}
}
#[derive(Default, Debug, Clone)]
pub struct OutboundMessageRecycle(());
impl thingbuf::Recycle<OutboundMessage> for OutboundMessageRecycle {
fn new_element(&self) -> OutboundMessage {
OutboundMessage::Dummy
}
fn recycle(&self, element: &mut OutboundMessage) {
*element = OutboundMessage::Dummy;
}
}
#[derive(Debug, Default, Clone, Copy)]
pub enum Source {
Transit,
Client,
Exploratory,
NetDb,
#[default]
Unknown,
}
impl Source {
fn is_transit(&self) -> bool {
matches!(self, Source::Transit)
}
}
#[derive(Debug, Default)]
pub enum SubsystemManagerEvent {
Message {
router_id: RouterId,
message: OutboundMessage,
source: Source,
},
#[default]
Dummy,
}
#[derive(Debug, Clone)]
pub struct SubsystemHandle {
event_tx: Sender<SubsystemManagerEvent, SubsystemManagerEventRecycle>,
listeners: Arc<RwLock<HashMap<MessageId, oneshot::Sender<Message>>>>,
congestion: Congestion,
source: Source,
tunnels: Arc<RwLock<HashMap<TunnelId, (Source, Sender<Message>)>>>,
}
impl SubsystemHandle {
pub fn congestion(&self) -> CongestionLevel {
self.congestion.load()
}
pub fn with_source(mut self, source: Source) -> Self {
self.source = source;
self
}
pub fn send(&self, router_id: &RouterId, message: Message) -> Result<(), ChannelError> {
self.event_tx
.try_send(SubsystemManagerEvent::Message {
router_id: router_id.clone(),
message: OutboundMessage::Message(message),
source: self.source,
})
.map_err(From::from)
}
#[allow(unused)]
pub fn send_many(
&self,
router_id: &RouterId,
messages: Vec<Message>,
) -> Result<(), ChannelError> {
self.event_tx
.try_send(SubsystemManagerEvent::Message {
router_id: router_id.clone(),
message: OutboundMessage::Messages(messages),
source: self.source,
})
.map_err(From::from)
}
pub fn send_with_feedback(
&self,
router_id: &RouterId,
message: Message,
feedback_tx: oneshot::Sender<()>,
) -> Result<(), ChannelError> {
self.event_tx
.try_send(SubsystemManagerEvent::Message {
router_id: router_id.clone(),
message: OutboundMessage::MessageWithFeedback(message, feedback_tx),
source: self.source,
})
.map_err(From::from)
}
pub fn insert_listener(
&self,
rng: &mut impl CryptoRng,
) -> (MessageId, oneshot::Receiver<Message>) {
let (tx, rx) = oneshot::channel();
let mut listeners = self.listeners.write();
loop {
let message_id = MessageId::from(rng.next_u32());
if !listeners.contains_key(&message_id) {
listeners.insert(message_id, tx);
return (message_id, rx);
}
}
}
pub fn insert_tunnel<const SIZE: usize>(
&self,
rng: &mut impl CryptoRng,
) -> (TunnelId, Receiver<Message>) {
let (tx, rx) = channel(SIZE);
let mut tunnels = self.tunnels.write();
loop {
let tunnel_id = TunnelId::from(rng.next_u32());
if !tunnels.contains_key(&tunnel_id) {
tunnels.insert(tunnel_id, (self.source, tx));
return (tunnel_id, rx);
}
}
}
pub fn remove_tunnel(&self, tunnel_id: &TunnelId) {
self.tunnels.write().remove(tunnel_id);
}
pub fn remove_listener(&self, message_id: &MessageId) {
self.listeners.write().remove(message_id);
}
pub fn try_insert_tunnel<const SIZE: usize>(
&self,
tunnel_id: TunnelId,
) -> Result<Receiver<Message>, RoutingError> {
let mut tunnels = self.tunnels.write();
match tunnels.contains_key(&tunnel_id) {
true => Err(RoutingError::TunnelExists(tunnel_id)),
false => {
let (tx, rx) = channel(SIZE);
tunnels.insert(tunnel_id, (self.source, tx));
Ok(rx)
}
}
}
#[cfg(test)]
pub fn new() -> (
Self,
Receiver<SubsystemManagerEvent, SubsystemManagerEventRecycle>,
) {
let (event_tx, event_rx) = with_recycle(100, SubsystemManagerEventRecycle::default());
(
Self {
event_tx,
listeners: Default::default(),
congestion: Default::default(),
tunnels: Default::default(),
source: Source::Unknown,
},
event_rx,
)
}
}
enum RouterState {
Dialing {
pending: Vec<OutboundMessage>,
},
Connected {
tx: Sender<OutboundMessage, OutboundMessageRecycle>,
},
}
pub struct SubsystemManagerContext<R: Runtime> {
pub congestion: Congestion,
pub dial_rx: Receiver<RouterId>,
pub handle: SubsystemHandle,
pub manager: SubsystemManager<R>,
pub netdb_rx: Receiver<NetDbEvent>,
pub transit_rx: Receiver<Vec<(RouterId, Message)>>,
pub transport_tx: Sender<SubsystemEvent>,
}
pub struct SubsystemManager<R: Runtime> {
bandwidth_tracker: BandwidthTracker<R>,
dial_tx: Sender<RouterId>,
event_rx: Receiver<SubsystemManagerEvent, SubsystemManagerEventRecycle>,
listeners: Arc<RwLock<HashMap<MessageId, oneshot::Sender<Message>>>>,
netdb_tx: Sender<NetDbEvent>,
noise: NoiseContext,
pending_netdb_events: VecDeque<NetDbEvent>,
router_id: RouterId,
routers: HashMap<RouterId, RouterState>,
transit_tx: Sender<Vec<(RouterId, Message)>>,
transport_rx: Receiver<SubsystemEvent>,
tunnels: Arc<RwLock<HashMap<TunnelId, (Source, Sender<Message>)>>>,
}
impl<R: Runtime> SubsystemManager<R> {
pub fn new(
router_id: RouterId,
noise: NoiseContext,
config: BandwidthConfig,
metrics_handle: R::MetricsHandle,
) -> SubsystemManagerContext<R> {
assert!(config.share_ratio <= 1.0);
tracing::info!(
target: LOG_TARGET,
bandwidth = config.bandwidth,
share_ratio = %format!("{}%", config.share_ratio * 100.0),
"starting SubsystemManager",
);
metrics_handle.gauge(MAX_BANDWIDTH).increment(config.bandwidth);
metrics_handle
.gauge(MAX_TRANSIT_BANDWIDTH)
.increment((config.bandwidth as f64 * config.share_ratio) as usize);
let (event_tx, event_rx) = with_recycle(8192, SubsystemManagerEventRecycle::default());
let (transit_tx, transit_rx) = channel(256);
let (netdb_tx, netdb_rx) = channel(256);
let (dial_tx, dial_rx) = channel(256);
let (transport_tx, transport_rx) = channel(256);
let listeners = Arc::new(RwLock::new(HashMap::new()));
let tunnels = Arc::new(RwLock::new(HashMap::new()));
let (bandwidth_tracker, congestion_short, congestion_medium) =
BandwidthTracker::new(config);
SubsystemManagerContext {
congestion: congestion_medium,
netdb_rx,
transit_rx,
transport_tx,
dial_rx,
manager: Self {
bandwidth_tracker,
dial_tx,
event_rx,
listeners: Arc::clone(&listeners),
netdb_tx,
noise,
pending_netdb_events: VecDeque::new(),
router_id,
routers: HashMap::new(),
transit_tx,
transport_rx,
tunnels: Arc::clone(&tunnels),
},
handle: SubsystemHandle {
event_tx,
listeners,
congestion: congestion_short,
tunnels,
source: Source::Unknown,
},
}
}
pub fn metrics(metrics: Vec<MetricType>) -> Vec<MetricType> {
register_metrics(metrics)
}
fn on_outbound_message(
&mut self,
router_id: RouterId,
message: OutboundMessage,
source: Source,
) {
if self.bandwidth_tracker.update_outbound(message.len(), source) {
return;
}
if router_id == self.router_id {
tracing::trace!(
target: LOG_TARGET,
?message,
"route message to self",
);
return match message {
OutboundMessage::Message(message) =>
self.on_inbound_message(vec![(router_id, message)]),
OutboundMessage::Messages(messages) => self.on_inbound_message(
messages.into_iter().map(|message| (router_id.clone(), message)).collect(),
),
OutboundMessage::MessageWithFeedback(message, feedback_tx) => {
self.on_inbound_message(vec![(router_id, message)]);
let _ = feedback_tx.send(());
}
OutboundMessage::Dummy => unreachable!(),
};
}
match self.routers.get_mut(&router_id) {
Some(RouterState::Dialing { pending }) => {
tracing::trace!(
target: LOG_TARGET,
%router_id,
num_pending = ?pending.len(),
"router is being dialed, queue message",
);
pending.push(message);
}
Some(RouterState::Connected { tx }) =>
if let Err(error) = tx.try_send(message) {
tracing::debug!(
target: LOG_TARGET,
%router_id,
?error,
"failed to send message to router",
);
},
None => match self.dial_tx.try_send(router_id.clone()) {
Ok(()) => {
tracing::debug!(
target: LOG_TARGET,
%router_id,
"started dialing router",
);
self.routers.insert(
router_id,
RouterState::Dialing {
pending: vec![message],
},
);
}
Err(error) => tracing::warn!(
target: LOG_TARGET,
%router_id,
?error,
"failed to send dial request to transport manager",
),
},
}
}
fn on_inbound_message(&mut self, messages: Vec<(RouterId, Message)>) {
let mut netdb = Vec::<(RouterId, Message)>::new();
let mut transit = Vec::<(RouterId, Message)>::new();
let mut tunnels = HashMap::<TunnelId, Vec<Message>>::new();
for (router_id, message) in messages {
match message.message_type {
MessageType::DatabaseStore
| MessageType::DatabaseLookup
| MessageType::DatabaseSearchReply
| MessageType::DeliveryStatus =>
if !self
.bandwidth_tracker
.update_inbound(message.serialized_len_long(), Source::NetDb)
{
netdb.push((router_id, message));
},
MessageType::Garlic =>
if let Some(messages) = self.on_garlic_message(message) {
let mut inbound = vec![];
let mut outbound = vec![];
for message in messages {
match message {
DeliveryInstructions::Local { message } => {
inbound.push((router_id.clone(), message));
}
DeliveryInstructions::Router { router, message }
| DeliveryInstructions::Tunnel {
router, message, ..
} => {
outbound.push((router, OutboundMessage::Message(message)));
}
DeliveryInstructions::Destination => {}
}
}
self.on_inbound_message(inbound);
outbound.into_iter().for_each(|(router_id, message)| {
self.on_outbound_message(router_id, message, Source::Unknown);
});
},
MessageType::TunnelData => {
if let Some(tunnel_id) = EncryptedTunnelData::parse(&message.payload)
.map(|message| message.tunnel_id())
{
tunnels.entry(tunnel_id).or_default().push(message);
}
}
MessageType::TunnelGateway => {
if let Some(tunnel_id) =
TunnelGateway::parse(&message.payload).map(|message| message.tunnel_id)
{
tunnels.entry(tunnel_id).or_default().push(message);
}
}
MessageType::VariableTunnelBuild
| MessageType::ShortTunnelBuild
| MessageType::OutboundTunnelBuildReply
| MessageType::TunnelBuild => {
if !self
.bandwidth_tracker
.update_inbound(message.serialized_len_short(), Source::Unknown)
{
if let Ok(Some(message)) = self.route_tunnel_build_message(message) {
transit.push((router_id.clone(), message));
}
}
}
MessageType::VariableTunnelBuildReply
| MessageType::Data
| MessageType::TunnelBuildReply => {
tracing::warn!(
target: LOG_TARGET,
message_type = ?message.message_type,
"unhandled message type",
);
self.bandwidth_tracker
.update_inbound(message.serialized_len_short(), Source::Unknown);
}
}
}
if !netdb.is_empty() {
self.route_netdb_event(NetDbEvent::Message { messages: netdb });
}
if !transit.is_empty() {
if let Err(error) = self.transit_tx.try_send(transit) {
tracing::warn!(
target: LOG_TARGET,
?error,
"failed to send i2np messages to transit tunnel manager",
);
}
}
if !tunnels.is_empty() {
let inner = self.tunnels.read();
for (tunnel_id, messages) in tunnels {
let Some((source, tunnel)) = inner.get(&tunnel_id) else {
tracing::debug!(
target: LOG_TARGET,
%tunnel_id,
"tunnel doesn't exist",
);
continue;
};
for message in messages {
if self
.bandwidth_tracker
.update_inbound(message.serialized_len_short(), *source)
{
continue;
}
if let Err(TrySendError::Closed(_)) = tunnel.try_send(message) {
tracing::warn!(
target: LOG_TARGET,
%tunnel_id,
"tunnel exist in the routing table but is closed",
);
}
}
}
}
}
fn route_tunnel_build_message(
&self,
message: Message,
) -> Result<Option<Message>, RoutingError> {
let mut listeners = self.listeners.write();
match listeners.remove(&MessageId::from(message.message_id)) {
Some(listener) => listener.send(message).map(|_| None).map_err(|message| {
tracing::warn!(
target: LOG_TARGET,
message_id = %message.message_id,
"listener exist in the routing table but is closed",
);
RoutingError::ChannelClosed(message)
}),
None => {
drop(listeners);
Ok(Some(message))
}
}
}
fn route_netdb_event(&mut self, event: NetDbEvent) {
let event = match self.pending_netdb_events.is_empty() {
true => match self.netdb_tx.try_send(event) {
Ok(()) => return,
Err(TrySendError::Full(event)) => event,
Err(error) => {
tracing::error!(
target: LOG_TARGET,
?error,
"failed to route event to netdb",
);
return;
}
},
false => event,
};
tracing::warn!(
target: LOG_TARGET,
"event queue to netdb is clogged",
);
if self.pending_netdb_events.len() < NETDB_EVENT_QUEUE_LEN {
self.pending_netdb_events.push_back(event);
return;
}
match self
.pending_netdb_events
.iter()
.position(|event| core::matches!(event, NetDbEvent::Message { .. }))
{
None => tracing::error!(
target: LOG_TARGET,
"event queue netdb is fully clogged, dropping event",
),
Some(index) => {
self.pending_netdb_events.remove(index);
self.pending_netdb_events.push_back(event);
}
}
}
fn on_garlic_message(&self, message: Message) -> Option<Vec<DeliveryInstructions>> {
let Message {
message_id,
expiration,
payload,
..
} = message;
tracing::trace!(
target: LOG_TARGET,
?message_id,
?expiration,
"garlic message",
);
if payload.len() < 36 {
tracing::warn!(
target: LOG_TARGET,
?message_id,
?expiration,
"garlic message is too short",
);
return None;
}
let message = {
let (mut cipher_key, associated_data) = self
.noise
.derive_inbound_garlic_key(EphemeralPublicKey::try_from_bytes(&payload[4..36])?);
let mut message = payload[36..].to_vec();
ChaChaPoly::new(&cipher_key)
.decrypt_with_ad(&associated_data, &mut message)
.ok()?;
cipher_key.zeroize();
message
};
let Ok(message) = GarlicMessage::parse(&message) else {
return None;
};
Some(
message
.blocks
.into_iter()
.filter_map(|block| match block {
GarlicMessageBlock::GarlicClove {
message_type,
message_id,
expiration,
delivery_instructions,
message_body,
} => {
if expiration < R::time_since_epoch() {
tracing::debug!(
target: LOG_TARGET,
?message_id,
?message_type,
?delivery_instructions,
"dropping expired i2np message",
);
return None;
}
match delivery_instructions {
CloveDeliveryInstructions::Local => Some(DeliveryInstructions::Local {
message: Message {
message_type,
message_id: *message_id,
expiration,
payload: message_body.to_vec(),
},
}),
CloveDeliveryInstructions::Router { hash } =>
Some(DeliveryInstructions::Router {
router: RouterId::from(hash),
message: Message {
message_type,
message_id: *message_id,
expiration,
payload: message_body.to_vec(),
},
}),
CloveDeliveryInstructions::Tunnel { hash, tunnel_id } => {
let message = MessageBuilder::standard()
.with_message_type(message_type)
.with_message_id(message_id)
.with_expiration(expiration)
.with_payload(message_body)
.build();
let message = TunnelGateway {
tunnel_id: tunnel_id.into(),
payload: &message,
}
.serialize();
Some(DeliveryInstructions::Tunnel {
tunnel: TunnelId::from(tunnel_id),
router: RouterId::from(hash),
message: Message {
message_type: MessageType::TunnelGateway,
message_id: R::rng().next_u32(),
expiration,
payload: message.to_vec(),
},
})
}
CloveDeliveryInstructions::Destination { hash } => {
tracing::warn!(
target: LOG_TARGET,
?hash,
"ignoring destination",
);
None
}
}
}
block => {
tracing::trace!(
target: LOG_TARGET,
?block,
"ignoring garlic block",
);
None
}
})
.collect::<Vec<_>>(),
)
}
}
impl<R: Runtime> Future for SubsystemManager<R> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let _ = self.bandwidth_tracker.poll_unpin(cx);
loop {
match self.event_rx.poll_recv(cx) {
Poll::Pending => break,
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(event)) => match event {
SubsystemManagerEvent::Message {
router_id,
message,
source,
} => self.on_outbound_message(router_id, message, source),
SubsystemManagerEvent::Dummy => unreachable!(),
},
}
}
loop {
match self.transport_rx.poll_recv(cx) {
Poll::Pending => break,
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(event)) => match event {
SubsystemEvent::ConnectionEstablished { router_id, tx } => {
tracing::trace!(
target: LOG_TARGET,
%router_id,
"connection opened",
);
self.route_netdb_event(NetDbEvent::ConnectionEstablished {
router_id: router_id.clone(),
});
if let Some(RouterState::Dialing { pending }) =
self.routers.remove(&router_id)
{
for message in pending {
if let Err(error) = tx.try_send(message) {
tracing::debug!(
target: LOG_TARGET,
%router_id,
?error,
"failed to send pending message",
);
}
}
}
self.routers.insert(router_id, RouterState::Connected { tx });
}
SubsystemEvent::ConnectionClosed { router_id } => {
tracing::trace!(
target: LOG_TARGET,
%router_id,
"connection closed",
);
self.routers.remove(&router_id);
}
SubsystemEvent::ConnectionFailure { router_id } => {
tracing::trace!(
target: LOG_TARGET,
%router_id,
"connection failure",
);
self.routers.remove(&router_id);
}
SubsystemEvent::Message { messages } => self.on_inbound_message(messages),
SubsystemEvent::Dummy => unreachable!(),
},
}
}
while let Some(event) = self.pending_netdb_events.pop_front() {
match self.netdb_tx.try_send(event) {
Ok(()) => {}
Err(TrySendError::Full(event)) => {
self.pending_netdb_events.push_front(event);
break;
}
Err(error) => tracing::warn!(
target: LOG_TARGET,
?error,
"failed to send pending event to netdb",
),
}
}
Poll::Pending
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
crypto::{EphemeralPrivateKey, StaticPrivateKey, StaticPublicKey},
i2np::{garlic::GarlicMessageBuilder, I2NP_MESSAGE_EXPIRATION},
runtime::mock::MockRuntime,
};
use bytes::{BufMut, Bytes, BytesMut};
use futures::FutureExt;
macro_rules! poll_manager {
($manager:ident) => {
futures::future::poll_fn(|cx| match $manager.poll_unpin(cx) {
Poll::Pending => return Poll::Ready(()),
Poll::Ready(_) => panic!("ready"),
})
.await;
};
}
#[tokio::test]
async fn inbound_router_connection_disconnection() {
let private_key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = RouterId::random();
let SubsystemManagerContext {
netdb_rx: _netdb_rx,
mut manager,
handle: _handle,
transit_rx: _transit_rx,
transport_tx: tx,
..
} = SubsystemManager::<MockRuntime>::new(
router_id.clone(),
NoiseContext::new(private_key, Bytes::from(router_id.to_vec())),
Default::default(),
MockRuntime::register_metrics(vec![], None),
);
let (msg_tx, _msg_rx) = with_recycle(16, OutboundMessageRecycle::default());
let router = RouterId::random();
tx.send(SubsystemEvent::ConnectionEstablished {
router_id: router.clone(),
tx: msg_tx,
})
.await
.unwrap();
poll_manager!(manager);
assert!(manager.routers.contains_key(&router));
tx.send(SubsystemEvent::ConnectionClosed {
router_id: router.clone(),
})
.await
.unwrap();
poll_manager!(manager);
assert!(!manager.routers.contains_key(&router));
}
#[tokio::test]
async fn dial_router() {
let private_key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = RouterId::random();
let SubsystemManagerContext {
netdb_rx: _netdb_rx,
mut manager,
handle,
transit_rx: _transit_rx,
transport_tx: tx,
dial_rx,
..
} = SubsystemManager::<MockRuntime>::new(
router_id.clone(),
NoiseContext::new(private_key, Bytes::from(router_id.to_vec())),
Default::default(),
MockRuntime::register_metrics(vec![], None),
);
let router_id = RouterId::random();
handle.send(&router_id, Message::default()).unwrap();
poll_manager!(manager);
match manager.routers.get(&router_id) {
Some(RouterState::Dialing { pending }) => {
assert_eq!(pending.len(), 1);
}
_ => panic!("invalid state"),
}
assert_eq!(dial_rx.try_recv(), Ok(router_id.clone()));
let (msg_tx, msg_rx) = with_recycle(16, OutboundMessageRecycle::default());
tx.send(SubsystemEvent::ConnectionEstablished {
router_id: router_id.clone(),
tx: msg_tx,
})
.await
.unwrap();
poll_manager!(manager);
match manager.routers.get(&router_id) {
Some(RouterState::Connected { .. }) => {}
_ => panic!("invalid state"),
}
assert!(msg_rx.try_recv().is_ok());
}
#[tokio::test]
async fn dial_fails() {
let private_key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = RouterId::random();
let SubsystemManagerContext {
netdb_rx: _netdb_rx,
mut manager,
handle,
transit_rx: _transit_rx,
dial_rx,
transport_tx: tx,
..
} = SubsystemManager::<MockRuntime>::new(
router_id.clone(),
NoiseContext::new(private_key, Bytes::from(router_id.to_vec())),
Default::default(),
MockRuntime::register_metrics(vec![], None),
);
let (feedback_tx, feedback_rx) = oneshot::channel();
let router_id = RouterId::random();
handle.send(&router_id, Message::default()).unwrap();
handle.send_with_feedback(&router_id, Message::default(), feedback_tx).unwrap();
poll_manager!(manager);
match manager.routers.get(&router_id) {
Some(RouterState::Dialing { pending }) => {
assert_eq!(pending.len(), 2);
}
_ => panic!("invalid state"),
}
assert_eq!(dial_rx.try_recv(), Ok(router_id.clone()));
tx.send(SubsystemEvent::ConnectionFailure {
router_id: router_id.clone(),
})
.await
.unwrap();
poll_manager!(manager);
match manager.routers.get(&router_id) {
None => {}
_ => panic!("invalid state"),
}
assert!(feedback_rx.await.is_err());
}
#[tokio::test]
async fn dial_fails_with_feedback() {
let private_key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = RouterId::random();
let SubsystemManagerContext {
netdb_rx: _netdb_rx,
mut manager,
handle,
transit_rx: _transit_rx,
transport_tx: tx,
dial_rx,
..
} = SubsystemManager::<MockRuntime>::new(
router_id.clone(),
NoiseContext::new(private_key, Bytes::from(router_id.to_vec())),
Default::default(),
MockRuntime::register_metrics(vec![], None),
);
let router_id = RouterId::random();
let (feedback_tx, feedback_rx) = oneshot::channel();
handle.send_with_feedback(&router_id, Message::default(), feedback_tx).unwrap();
poll_manager!(manager);
match manager.routers.get(&router_id) {
Some(RouterState::Dialing { pending }) => {
assert_eq!(pending.len(), 1);
}
_ => panic!("invalid state"),
}
assert_eq!(dial_rx.try_recv(), Ok(router_id.clone()));
tx.send(SubsystemEvent::ConnectionFailure {
router_id: router_id.clone(),
})
.await
.unwrap();
poll_manager!(manager);
match manager.routers.get(&router_id) {
None => {}
_ => panic!("invalid state"),
}
assert!(feedback_rx.await.is_err());
}
#[tokio::test]
async fn send_message_to_router() {
let private_key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = RouterId::random();
let SubsystemManagerContext {
netdb_rx: _netdb_rx,
mut manager,
handle,
transit_rx: _transit_rx,
transport_tx: tx,
..
} = SubsystemManager::<MockRuntime>::new(
router_id.clone(),
NoiseContext::new(private_key, Bytes::from(router_id.to_vec())),
Default::default(),
MockRuntime::register_metrics(vec![], None),
);
let router_id = RouterId::random();
let (msg_tx, msg_rx) = with_recycle(16, OutboundMessageRecycle::default());
tx.send(SubsystemEvent::ConnectionEstablished {
router_id: router_id.clone(),
tx: msg_tx,
})
.await
.unwrap();
poll_manager!(manager);
match manager.routers.get(&router_id) {
Some(RouterState::Connected { .. }) => {}
_ => panic!("invalid state"),
}
handle
.send(
&router_id,
Message {
message_id: 1337u32,
..Default::default()
},
)
.unwrap();
handle
.send_many(
&router_id,
vec![
Message {
message_id: 1338u32,
..Default::default()
},
Message {
message_id: 1339u32,
..Default::default()
},
Message {
message_id: 1340u32,
..Default::default()
},
],
)
.unwrap();
let (feedback_tx, feedback_rx) = oneshot::channel();
handle
.send_with_feedback(
&router_id,
Message {
message_id: 1341u32,
..Default::default()
},
feedback_tx,
)
.unwrap();
poll_manager!(manager);
{
match msg_rx.try_recv().unwrap() {
OutboundMessage::Message(message) => assert_eq!(message.message_id, 1337),
_ => panic!("invalid message type"),
}
}
{
match msg_rx.try_recv().unwrap() {
OutboundMessage::Messages(messages) => assert!((1338u32..=1340)
.zip(messages.into_iter())
.all(|(id, message)| { message.message_id == id })),
_ => panic!("invalid message type"),
}
}
{
match msg_rx.try_recv().unwrap() {
OutboundMessage::MessageWithFeedback(message, feedback) => {
assert_eq!(message.message_id, 1341);
feedback.send(()).unwrap();
}
_ => panic!("invalid message type"),
}
}
assert!(feedback_rx.await.is_ok());
}
#[tokio::test]
async fn send_pending_messages_to_router() {
let private_key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = RouterId::random();
let SubsystemManagerContext {
netdb_rx: _netdb_rx,
mut manager,
handle,
transit_rx: _transit_rx,
transport_tx: tx,
dial_rx,
..
} = SubsystemManager::<MockRuntime>::new(
router_id.clone(),
NoiseContext::new(private_key, Bytes::from(router_id.to_vec())),
Default::default(),
MockRuntime::register_metrics(vec![], None),
);
let router_id = RouterId::random();
handle
.send(
&router_id,
Message {
message_id: 1337u32,
..Default::default()
},
)
.unwrap();
handle
.send_many(
&router_id,
vec![
Message {
message_id: 1338u32,
..Default::default()
},
Message {
message_id: 1339u32,
..Default::default()
},
Message {
message_id: 1340u32,
..Default::default()
},
],
)
.unwrap();
let (feedback_tx, feedback_rx) = oneshot::channel();
handle
.send_with_feedback(
&router_id,
Message {
message_id: 1341u32,
..Default::default()
},
feedback_tx,
)
.unwrap();
poll_manager!(manager);
match manager.routers.get(&router_id) {
Some(RouterState::Dialing { pending }) => assert_eq!(pending.len(), 3),
_ => panic!("invalid state"),
}
assert_eq!(dial_rx.try_recv().unwrap(), router_id.clone());
let (msg_tx, msg_rx) = with_recycle(16, OutboundMessageRecycle::default());
tx.send(SubsystemEvent::ConnectionEstablished {
router_id: router_id.clone(),
tx: msg_tx,
})
.await
.unwrap();
poll_manager!(manager);
{
match msg_rx.try_recv().unwrap() {
OutboundMessage::Message(message) => assert_eq!(message.message_id, 1337),
_ => panic!("invalid message type"),
}
}
{
match msg_rx.try_recv().unwrap() {
OutboundMessage::Messages(messages) => assert!((1338u32..=1340)
.zip(messages.into_iter())
.all(|(id, message)| { message.message_id == id })),
_ => panic!("invalid message type"),
}
}
{
match msg_rx.try_recv().unwrap() {
OutboundMessage::MessageWithFeedback(message, feedback) => {
assert_eq!(message.message_id, 1341);
feedback.send(()).unwrap();
}
_ => panic!("invalid message type"),
}
}
assert!(feedback_rx.await.is_ok());
}
#[tokio::test]
async fn install_listener_through_handle() {
let private_key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = RouterId::random();
let SubsystemManagerContext {
netdb_rx: _netdb_rx,
manager,
handle,
transit_rx: _transit_rx,
transport_tx: _tx,
..
} = SubsystemManager::<MockRuntime>::new(
router_id.clone(),
NoiseContext::new(private_key, Bytes::from(router_id.to_vec())),
Default::default(),
MockRuntime::register_metrics(vec![], None),
);
let handle_clone = handle.clone();
assert!(manager.listeners.read().is_empty());
assert!(handle.listeners.read().is_empty());
assert!(handle_clone.listeners.read().is_empty());
let (message_id, _rx) = handle.insert_listener(&mut MockRuntime::rng());
assert!(manager.listeners.read().contains_key(&message_id));
assert!(handle.listeners.read().contains_key(&message_id));
assert!(handle_clone.listeners.read().contains_key(&message_id));
}
#[tokio::test]
async fn no_active_listener_variable_tunnel_build() {
let private_key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = RouterId::random();
let SubsystemManagerContext {
netdb_rx: _netdb_rx,
mut manager,
handle: _handle,
transit_rx,
transport_tx: tx,
..
} = SubsystemManager::<MockRuntime>::new(
router_id.clone(),
NoiseContext::new(private_key, Bytes::from(router_id.to_vec())),
Default::default(),
MockRuntime::register_metrics(vec![], None),
);
let message = Message {
message_type: MessageType::VariableTunnelBuild,
..Default::default()
};
tx.send(SubsystemEvent::Message {
messages: vec![(RouterId::random(), message)],
})
.await
.unwrap();
poll_manager!(manager);
let message = transit_rx.try_recv().unwrap();
assert_eq!(message[0].1.message_type, MessageType::VariableTunnelBuild);
}
#[tokio::test]
async fn no_listener_short_tunnel_build() {
let private_key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = RouterId::random();
let SubsystemManagerContext {
netdb_rx: _netdb_rx,
mut manager,
handle: _handle,
transit_rx,
transport_tx: tx,
..
} = SubsystemManager::<MockRuntime>::new(
router_id.clone(),
NoiseContext::new(private_key, Bytes::from(router_id.to_vec())),
Default::default(),
MockRuntime::register_metrics(vec![], None),
);
let message = Message {
message_type: MessageType::ShortTunnelBuild,
..Default::default()
};
tx.send(SubsystemEvent::Message {
messages: vec![(RouterId::random(), message)],
})
.await
.unwrap();
poll_manager!(manager);
let message = transit_rx.try_recv().unwrap();
assert_eq!(message[0].1.message_type, MessageType::ShortTunnelBuild);
}
#[tokio::test]
async fn no_listener_outbound_tunnel_build_reply() {
let private_key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = RouterId::random();
let SubsystemManagerContext {
netdb_rx: _netdb_rx,
mut manager,
handle: _handle,
transit_rx,
transport_tx: tx,
..
} = SubsystemManager::<MockRuntime>::new(
router_id.clone(),
NoiseContext::new(private_key, Bytes::from(router_id.to_vec())),
Default::default(),
MockRuntime::register_metrics(vec![], None),
);
let message = Message {
message_type: MessageType::OutboundTunnelBuildReply,
..Default::default()
};
tx.send(SubsystemEvent::Message {
messages: vec![(RouterId::random(), message)],
})
.await
.unwrap();
poll_manager!(manager);
let message = transit_rx.try_recv().unwrap();
assert_eq!(
message[0].1.message_type,
MessageType::OutboundTunnelBuildReply
);
}
#[tokio::test]
async fn active_listener_short_tunnel_build() {
let private_key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = RouterId::random();
let SubsystemManagerContext {
netdb_rx: _netdb_rx,
mut manager,
handle,
transit_rx,
transport_tx: tx,
..
} = SubsystemManager::<MockRuntime>::new(
router_id.clone(),
NoiseContext::new(private_key, Bytes::from(router_id.to_vec())),
Default::default(),
MockRuntime::register_metrics(vec![], None),
);
let (message_id, rx) = handle.insert_listener(&mut MockRuntime::rng());
let message = Message {
message_type: MessageType::ShortTunnelBuild,
message_id: *message_id,
..Default::default()
};
tx.send(SubsystemEvent::Message {
messages: vec![(RouterId::random(), message)],
})
.await
.unwrap();
poll_manager!(manager);
assert!(transit_rx.try_recv().is_err());
let message = rx.await.unwrap();
assert_eq!(message.message_type, MessageType::ShortTunnelBuild);
assert_eq!(message.message_id, *message_id);
}
#[tokio::test]
async fn active_listener_outbound_tunnel_build_reply() {
let private_key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = RouterId::random();
let SubsystemManagerContext {
netdb_rx: _netdb_rx,
mut manager,
handle,
transit_rx,
transport_tx: tx,
..
} = SubsystemManager::<MockRuntime>::new(
router_id.clone(),
NoiseContext::new(private_key, Bytes::from(router_id.to_vec())),
Default::default(),
MockRuntime::register_metrics(vec![], None),
);
let (message_id, rx) = handle.insert_listener(&mut MockRuntime::rng());
let message = Message {
message_type: MessageType::OutboundTunnelBuildReply,
message_id: *message_id,
..Default::default()
};
tx.send(SubsystemEvent::Message {
messages: vec![(RouterId::random(), message)],
})
.await
.unwrap();
poll_manager!(manager);
assert!(transit_rx.try_recv().is_err());
let message = rx.await.unwrap();
assert_eq!(message.message_type, MessageType::OutboundTunnelBuildReply);
assert_eq!(message.message_id, *message_id);
}
#[tokio::test]
async fn register_tunnel_through_handle() {
let private_key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = RouterId::random();
let SubsystemManagerContext {
netdb_rx: _netdb_rx,
manager,
handle,
transit_rx: _transit_rx,
transport_tx: _tx,
..
} = SubsystemManager::<MockRuntime>::new(
router_id.clone(),
NoiseContext::new(private_key, Bytes::from(router_id.to_vec())),
Default::default(),
MockRuntime::register_metrics(vec![], None),
);
let (tunnel_id, _rx) = handle.insert_tunnel::<16>(&mut MockRuntime::rng());
assert!(handle.tunnels.read().contains_key(&tunnel_id));
assert!(manager.tunnels.read().contains_key(&tunnel_id));
handle.remove_tunnel(&tunnel_id);
assert!(!handle.tunnels.read().contains_key(&tunnel_id));
assert!(!manager.tunnels.read().contains_key(&tunnel_id));
}
#[tokio::test]
async fn route_tunnel_data() {
let private_key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = RouterId::random();
let SubsystemManagerContext {
netdb_rx: _netdb_rx,
mut manager,
handle,
transit_rx: _transit_rx,
transport_tx: tx,
..
} = SubsystemManager::<MockRuntime>::new(
router_id.clone(),
NoiseContext::new(private_key, Bytes::from(router_id.to_vec())),
Default::default(),
MockRuntime::register_metrics(vec![], None),
);
let (tunnel_id, rx) = handle.insert_tunnel::<16>(&mut MockRuntime::rng());
let message = {
let mut data = BytesMut::with_capacity(4 + 16 + 1008);
data.put_u32(*tunnel_id);
data.put_slice(&[0xaa; 16]);
data.put_slice(&[0xbb; 1008]);
Message {
message_type: MessageType::TunnelData,
payload: data.to_vec(),
..Default::default()
}
};
tx.send(SubsystemEvent::Message {
messages: vec![(RouterId::random(), message)],
})
.await
.unwrap();
poll_manager!(manager);
let message = rx.try_recv().unwrap();
let message = EncryptedTunnelData::parse(&message.payload).unwrap();
assert_eq!(message.tunnel_id(), tunnel_id);
assert_eq!(message.iv(), &[0xaa; 16]);
assert_eq!(message.ciphertext(), &[0xbb; 1008]);
}
#[tokio::test]
async fn route_tunnel_gateway() {
let private_key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = RouterId::random();
let SubsystemManagerContext {
netdb_rx: _netdb_rx,
mut manager,
handle,
transit_rx: _transit_rx,
transport_tx: tx,
..
} = SubsystemManager::<MockRuntime>::new(
router_id.clone(),
NoiseContext::new(private_key, Bytes::from(router_id.to_vec())),
Default::default(),
MockRuntime::register_metrics(vec![], None),
);
let (tunnel_id, rx) = handle.insert_tunnel::<16>(&mut MockRuntime::rng());
let message = {
let mut data = BytesMut::with_capacity(4 + 2 + 512);
data.put_u32(*tunnel_id);
data.put_u16(512);
data.put_slice(&[0xaa; 512]);
Message {
message_type: MessageType::TunnelGateway,
payload: data.to_vec(),
..Default::default()
}
};
tx.send(SubsystemEvent::Message {
messages: vec![(RouterId::random(), message)],
})
.await
.unwrap();
poll_manager!(manager);
let message = rx.try_recv().unwrap();
let message = TunnelGateway::parse(&message.payload).unwrap();
assert_eq!(message.tunnel_id, tunnel_id);
assert_eq!(message.payload, &[0xaa; 512]);
}
#[tokio::test]
async fn route_tunnel_data_of_non_existent_tunnel() {
let private_key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = RouterId::random();
let SubsystemManagerContext {
netdb_rx,
mut manager,
handle: _handle,
transit_rx,
transport_tx: tx,
..
} = SubsystemManager::<MockRuntime>::new(
router_id.clone(),
NoiseContext::new(private_key, Bytes::from(router_id.to_vec())),
Default::default(),
MockRuntime::register_metrics(vec![], None),
);
let tunnel_id = TunnelId::random();
let message = {
let mut data = BytesMut::with_capacity(4 + 16 + 1008);
data.put_u32(*tunnel_id);
data.put_slice(&[0xaa; 16]);
data.put_slice(&[0xbb; 1008]);
Message {
message_type: MessageType::TunnelData,
payload: data.to_vec(),
..Default::default()
}
};
tx.send(SubsystemEvent::Message {
messages: vec![(RouterId::random(), message)],
})
.await
.unwrap();
poll_manager!(manager);
assert!(netdb_rx.try_recv().is_err());
assert!(transit_rx.try_recv().is_err());
}
#[tokio::test]
async fn route_tunnel_gateway_of_non_existent_tunnel() {
let private_key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = RouterId::random();
let SubsystemManagerContext {
netdb_rx,
mut manager,
handle: _handle,
transit_rx,
transport_tx: tx,
..
} = SubsystemManager::<MockRuntime>::new(
router_id.clone(),
NoiseContext::new(private_key, Bytes::from(router_id.to_vec())),
Default::default(),
MockRuntime::register_metrics(vec![], None),
);
let tunnel_id = TunnelId::random();
let message = {
let mut data = BytesMut::with_capacity(4 + 2 + 512);
data.put_u32(*tunnel_id);
data.put_u16(512);
data.put_slice(&[0xaa; 512]);
Message {
message_type: MessageType::TunnelGateway,
payload: data.to_vec(),
..Default::default()
}
};
tx.send(SubsystemEvent::Message {
messages: vec![(RouterId::random(), message)],
})
.await
.unwrap();
poll_manager!(manager);
assert!(netdb_rx.try_recv().is_err());
assert!(transit_rx.try_recv().is_err());
}
#[tokio::test]
#[should_panic]
#[ignore] async fn route_message_to_closed_tunnel() {
let private_key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = RouterId::random();
let SubsystemManagerContext {
netdb_rx: _netdb_rx,
mut manager,
handle,
transit_rx: _transit_rx,
transport_tx: tx,
..
} = SubsystemManager::<MockRuntime>::new(
router_id.clone(),
NoiseContext::new(private_key, Bytes::from(router_id.to_vec())),
Default::default(),
MockRuntime::register_metrics(vec![], None),
);
let (tunnel_id, rx) = handle.insert_tunnel::<16>(&mut MockRuntime::rng());
drop(rx);
let message = {
let mut data = BytesMut::with_capacity(4 + 16 + 1008);
data.put_u32(*tunnel_id);
data.put_slice(&[0xaa; 16]);
data.put_slice(&[0xbb; 1008]);
Message {
message_type: MessageType::TunnelData,
payload: data.to_vec(),
..Default::default()
}
};
tx.send(SubsystemEvent::Message {
messages: vec![(RouterId::random(), message)],
})
.await
.unwrap();
poll_manager!(manager);
}
fn make_garlic_message_with_noise_context(
noise: &NoiseContext,
remote_key: StaticPublicKey,
cloves: Vec<(MessageType, MessageId, CloveDeliveryInstructions, &[u8])>,
) -> Message {
let mut message = GarlicMessageBuilder::default()
.with_date_time(MockRuntime::time_since_epoch().as_secs() as u32);
for (msg_type, msg_id, delivery, payload) in cloves {
message = message.with_garlic_clove(
msg_type,
msg_id,
MockRuntime::time_since_epoch() + I2NP_MESSAGE_EXPIRATION,
delivery,
payload,
);
}
let mut message = message.build();
let mut out = BytesMut::with_capacity(message.len() + 16 + 32 + 4);
let ephemeral_secret = EphemeralPrivateKey::random(MockRuntime::rng());
let ephemeral_public = ephemeral_secret.public();
let (local_key, local_state) =
noise.derive_outbound_garlic_key(remote_key, ephemeral_secret);
ChaChaPoly::new(&local_key)
.encrypt_with_ad_new(&local_state, &mut message)
.unwrap();
out.put_u32(message.len() as u32 + 32);
out.put_slice(&ephemeral_public.to_vec());
out.put_slice(&message);
Message {
message_type: MessageType::Garlic,
message_id: MockRuntime::rng().next_u32(),
expiration: MockRuntime::time_since_epoch() + I2NP_MESSAGE_EXPIRATION,
payload: out.to_vec(),
}
}
fn make_garlic_message(
remote_key: StaticPublicKey,
cloves: Vec<(MessageType, MessageId, CloveDeliveryInstructions, &[u8])>,
) -> Message {
let key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = Bytes::from(RouterId::random().to_vec());
let noise = NoiseContext::new(key, router_id);
make_garlic_message_with_noise_context(&noise, remote_key, cloves)
}
#[tokio::test]
async fn outbound_garlic_message_triggers_a_dial() {
let private_key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = RouterId::random();
let SubsystemManagerContext {
netdb_rx: _netdb_rx,
mut manager,
handle: _handle,
transit_rx: _transit_rx,
transport_tx: tx,
dial_rx,
..
} = SubsystemManager::<MockRuntime>::new(
router_id.clone(),
NoiseContext::new(private_key.clone(), Bytes::from(router_id.to_vec())),
Default::default(),
MockRuntime::register_metrics(vec![], None),
);
let remote_router = RouterId::random();
let serialized = remote_router.to_vec();
let message = make_garlic_message(
private_key.public(),
vec![(
MessageType::ShortTunnelBuild,
MessageId::random(),
CloveDeliveryInstructions::Router { hash: &serialized },
&vec![1, 1, 1, 1],
)],
);
tx.send(SubsystemEvent::Message {
messages: vec![(RouterId::random(), message)],
})
.await
.unwrap();
poll_manager!(manager);
match manager.routers.get(&remote_router) {
Some(RouterState::Dialing { pending }) => {
assert_eq!(pending.len(), 1)
}
_ => panic!("invalid state"),
}
assert_eq!(dial_rx.try_recv().unwrap(), remote_router);
}
#[tokio::test]
async fn outbound_garlic_message_pending_dial() {
let private_key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = RouterId::random();
let SubsystemManagerContext {
netdb_rx: _netdb_rx,
mut manager,
handle: _handle,
transit_rx: _transit_rx,
transport_tx: tx,
dial_rx,
..
} = SubsystemManager::<MockRuntime>::new(
router_id.clone(),
NoiseContext::new(private_key.clone(), Bytes::from(router_id.to_vec())),
Default::default(),
MockRuntime::register_metrics(vec![], None),
);
let remote_router = RouterId::random();
let serialized = remote_router.to_vec();
let message = make_garlic_message(
private_key.public(),
vec![(
MessageType::ShortTunnelBuild,
MessageId::random(),
CloveDeliveryInstructions::Router { hash: &serialized },
&vec![1, 1, 1, 1],
)],
);
tx.send(SubsystemEvent::Message {
messages: vec![(RouterId::random(), message)],
})
.await
.unwrap();
poll_manager!(manager);
match manager.routers.get(&remote_router) {
Some(RouterState::Dialing { pending }) => {
assert_eq!(pending.len(), 1)
}
_ => panic!("invalid state"),
}
assert_eq!(dial_rx.try_recv().unwrap(), remote_router);
let message = make_garlic_message(
private_key.public(),
vec![(
MessageType::VariableTunnelBuild,
MessageId::random(),
CloveDeliveryInstructions::Router { hash: &serialized },
&vec![2, 2, 2, 2],
)],
);
tx.send(SubsystemEvent::Message {
messages: vec![(RouterId::random(), message)],
})
.await
.unwrap();
poll_manager!(manager);
match manager.routers.get(&remote_router) {
Some(RouterState::Dialing { pending }) => {
assert_eq!(pending.len(), 2)
}
_ => panic!("invalid state"),
}
}
#[tokio::test]
async fn outbound_garlic_message_for_tunnel() {
let private_key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = RouterId::random();
let SubsystemManagerContext {
netdb_rx: _netdb_rx,
mut manager,
handle: _handle,
transit_rx: _transit_rx,
transport_tx: tx,
..
} = SubsystemManager::<MockRuntime>::new(
router_id.clone(),
NoiseContext::new(private_key.clone(), Bytes::from(router_id.to_vec())),
Default::default(),
MockRuntime::register_metrics(vec![], None),
);
let (msg_tx, msg_rx) = with_recycle(16, OutboundMessageRecycle::default());
let remote_router = RouterId::random();
let serialized = remote_router.to_vec();
tx.send(SubsystemEvent::ConnectionEstablished {
router_id: remote_router.clone(),
tx: msg_tx,
})
.await
.unwrap();
poll_manager!(manager);
let tunnel_id = TunnelId::random();
let message = make_garlic_message(
private_key.public(),
vec![(
MessageType::TunnelData,
MessageId::random(),
CloveDeliveryInstructions::Tunnel {
hash: &serialized,
tunnel_id: *tunnel_id,
},
&vec![2, 2, 2, 2],
)],
);
tx.send(SubsystemEvent::Message {
messages: vec![(RouterId::random(), message)],
})
.await
.unwrap();
poll_manager!(manager);
let message = msg_rx.try_recv().unwrap();
match message {
OutboundMessage::Message(message) => {
assert_eq!(message.message_type, MessageType::TunnelGateway);
let inner = TunnelGateway::parse(&message.payload).unwrap();
assert_eq!(inner.tunnel_id, tunnel_id);
let inner = Message::parse_standard(&inner.payload).unwrap();
assert_eq!(inner.message_type, MessageType::TunnelData);
assert_eq!(inner.payload, vec![2, 2, 2, 2]);
}
_ => panic!("invalid type"),
}
}
#[tokio::test]
async fn outbound_garlic_message_for_router() {
let private_key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = RouterId::random();
let SubsystemManagerContext {
netdb_rx: _netdb_rx,
mut manager,
handle: _handle,
transit_rx: _transit_rx,
transport_tx: tx,
..
} = SubsystemManager::<MockRuntime>::new(
router_id.clone(),
NoiseContext::new(private_key.clone(), Bytes::from(router_id.to_vec())),
Default::default(),
MockRuntime::register_metrics(vec![], None),
);
let (msg_tx, msg_rx) = with_recycle(16, OutboundMessageRecycle::default());
let remote_router = RouterId::random();
let serialized = remote_router.to_vec();
tx.send(SubsystemEvent::ConnectionEstablished {
router_id: remote_router.clone(),
tx: msg_tx,
})
.await
.unwrap();
poll_manager!(manager);
let message = make_garlic_message(
private_key.public(),
vec![(
MessageType::ShortTunnelBuild,
MessageId::random(),
CloveDeliveryInstructions::Router { hash: &serialized },
&vec![3, 3, 3, 3],
)],
);
tx.send(SubsystemEvent::Message {
messages: vec![(RouterId::random(), message)],
})
.await
.unwrap();
poll_manager!(manager);
let message = msg_rx.try_recv().unwrap();
match message {
OutboundMessage::Message(message) => {
assert_eq!(message.message_type, MessageType::ShortTunnelBuild);
assert_eq!(message.payload, vec![3, 3, 3, 3]);
}
_ => panic!("invalid type"),
}
}
#[tokio::test]
async fn multiple_inbound_cloves() {
let private_key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = RouterId::random();
let SubsystemManagerContext {
netdb_rx,
mut manager,
handle,
transit_rx,
transport_tx: tx,
..
} = SubsystemManager::<MockRuntime>::new(
router_id.clone(),
NoiseContext::new(private_key.clone(), Bytes::from(router_id.to_vec())),
Default::default(),
MockRuntime::register_metrics(vec![], None),
);
let (message_id, mut listener_rx) = handle.insert_listener(&mut MockRuntime::rng());
let (tunnel_id, tunnel_rx) = handle.insert_tunnel::<16>(&mut MockRuntime::rng());
let message = {
let mut data = BytesMut::with_capacity(4 + 16 + 1008);
data.put_u32(*tunnel_id);
data.put_slice(&[0xaa; 16]);
data.put_slice(&[0xbb; 1008]);
data.to_vec()
};
let message = make_garlic_message(
private_key.public(),
vec![
(
MessageType::DatabaseStore,
MessageId::random(),
CloveDeliveryInstructions::Local,
&vec![1, 1, 1, 1],
),
(
MessageType::ShortTunnelBuild,
message_id,
CloveDeliveryInstructions::Local,
&vec![1, 1, 1, 1],
),
(
MessageType::VariableTunnelBuild,
MessageId::random(),
CloveDeliveryInstructions::Local,
&vec![1, 1, 1, 1],
),
(
MessageType::TunnelData,
MessageId::random(),
CloveDeliveryInstructions::Local,
&message,
),
],
);
tx.send(SubsystemEvent::Message {
messages: vec![(RouterId::random(), message)],
})
.await
.unwrap();
poll_manager!(manager);
{
match netdb_rx.try_recv().unwrap() {
NetDbEvent::Message { messages } => {
assert_eq!(
messages[0].clone().1.message_type,
MessageType::DatabaseStore
);
}
_ => panic!("invalid event"),
}
}
{
let message = transit_rx.try_recv().unwrap();
assert_eq!(
message[0].clone().1.message_type,
MessageType::VariableTunnelBuild
);
}
{
let message = tunnel_rx.try_recv().unwrap();
assert_eq!(message.message_type, MessageType::TunnelData);
}
{
let message = listener_rx.try_recv().unwrap().unwrap();
assert_eq!(message.message_type, MessageType::ShortTunnelBuild);
}
}
#[tokio::test]
async fn inbound_and_outbound_cloves() {
let private_key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = RouterId::random();
let SubsystemManagerContext {
netdb_rx,
mut manager,
handle,
transit_rx: _transit_rx,
transport_tx: tx,
dial_rx,
..
} = SubsystemManager::<MockRuntime>::new(
router_id.clone(),
NoiseContext::new(private_key.clone(), Bytes::from(router_id.to_vec())),
Default::default(),
MockRuntime::register_metrics(vec![], None),
);
let (msg_tx, msg_rx) = with_recycle(16, OutboundMessageRecycle::default());
let connected_router = RouterId::random();
let unconnected_router = RouterId::random();
let serialized_connected = connected_router.to_vec();
let serialized_unconnected = unconnected_router.to_vec();
tx.send(SubsystemEvent::ConnectionEstablished {
router_id: connected_router.clone(),
tx: msg_tx,
})
.await
.unwrap();
poll_manager!(manager);
assert!(netdb_rx.try_recv().is_ok());
let (tunnel_id, tunnel_rx) = handle.insert_tunnel::<16>(&mut MockRuntime::rng());
let remote_tunnel_id = TunnelId::random();
let message = {
let mut data = BytesMut::with_capacity(4 + 16 + 1008);
data.put_u32(*tunnel_id);
data.put_slice(&[0xaa; 16]);
data.put_slice(&[0xbb; 1008]);
data.to_vec()
};
let message = make_garlic_message(
private_key.public(),
vec![
(
MessageType::DatabaseStore,
MessageId::random(),
CloveDeliveryInstructions::Local,
&vec![1, 1, 1, 1],
),
(
MessageType::TunnelData,
MessageId::random(),
CloveDeliveryInstructions::Local,
&message,
),
(
MessageType::ShortTunnelBuild,
MessageId::random(),
CloveDeliveryInstructions::Router {
hash: &serialized_unconnected,
},
&vec![3, 3, 3, 3],
),
(
MessageType::TunnelData,
MessageId::random(),
CloveDeliveryInstructions::Tunnel {
hash: &serialized_connected,
tunnel_id: *remote_tunnel_id,
},
&vec![2, 2, 2, 2],
),
],
);
tx.send(SubsystemEvent::Message {
messages: vec![(RouterId::random(), message)],
})
.await
.unwrap();
poll_manager!(manager);
{
let message = msg_rx.try_recv().unwrap();
match message {
OutboundMessage::Message(message) => {
assert_eq!(message.message_type, MessageType::TunnelGateway);
let inner = TunnelGateway::parse(&message.payload).unwrap();
assert_eq!(inner.tunnel_id, remote_tunnel_id);
let inner = Message::parse_standard(&inner.payload).unwrap();
assert_eq!(inner.message_type, MessageType::TunnelData);
assert_eq!(inner.payload, vec![2, 2, 2, 2]);
}
_ => panic!("invalid type"),
}
}
{
assert_eq!(dial_rx.try_recv().unwrap(), unconnected_router);
match manager.routers.get(&unconnected_router) {
Some(RouterState::Dialing { pending }) => {
assert_eq!(pending.len(), 1);
}
_ => panic!("invalid state"),
}
}
{
match netdb_rx.try_recv().unwrap() {
NetDbEvent::Message { messages } => {
assert_eq!(
messages[0].clone().1.message_type,
MessageType::DatabaseStore
);
}
_ => panic!("invalid event"),
}
}
{
let message = tunnel_rx.try_recv().unwrap();
assert_eq!(message.message_type, MessageType::TunnelData);
}
}
#[tokio::test]
#[ignore]
async fn recursive_inbound_garlic_message() {
let private_key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = RouterId::random();
let SubsystemManagerContext {
netdb_rx,
mut manager,
handle: _handle,
transit_rx: _transit_rx,
transport_tx: tx,
..
} = SubsystemManager::<MockRuntime>::new(
router_id.clone(),
NoiseContext::new(private_key.clone(), Bytes::from(router_id.to_vec())),
Default::default(),
MockRuntime::register_metrics(vec![], None),
);
let noise = NoiseContext::new(
StaticPrivateKey::random(MockRuntime::rng()),
Bytes::from(RouterId::random().to_vec()),
);
let message = make_garlic_message_with_noise_context(
&noise,
private_key.public(),
vec![(
MessageType::DatabaseStore,
1337.into(),
CloveDeliveryInstructions::Local,
&vec![1, 1, 1, 1],
)],
)
.serialize_short();
let message = make_garlic_message_with_noise_context(
&noise,
private_key.public(),
vec![(
MessageType::Garlic,
1338.into(),
CloveDeliveryInstructions::Local,
&message,
)],
);
tx.send(SubsystemEvent::Message {
messages: vec![(RouterId::random(), message)],
})
.await
.unwrap();
poll_manager!(manager);
{
match netdb_rx.try_recv().unwrap() {
NetDbEvent::Message { messages } => {
assert_eq!(
messages[0].clone().1.message_type,
MessageType::DatabaseStore
);
}
_ => panic!("invalid event"),
}
}
}
#[tokio::test]
async fn route_message_locally() {
let private_key = StaticPrivateKey::random(MockRuntime::rng());
let router_id = RouterId::random();
let SubsystemManagerContext {
netdb_rx: _netdb_rx,
mut manager,
handle,
transit_rx: _transit_rx,
transport_tx: _tx,
..
} = SubsystemManager::<MockRuntime>::new(
router_id.clone(),
NoiseContext::new(private_key, Bytes::from(router_id.to_vec())),
Default::default(),
MockRuntime::register_metrics(vec![], None),
);
let (tunnel_id, tunnel_rx) = handle.insert_tunnel::<16>(&mut MockRuntime::rng());
let message = {
let mut data = BytesMut::with_capacity(4 + 16 + 1008);
data.put_u32(*tunnel_id);
data.put_slice(&[0xaa; 16]);
data.put_slice(&[0xbb; 1008]);
data.to_vec()
};
handle
.send(
&router_id,
Message {
message_type: MessageType::TunnelData,
payload: message,
..Default::default()
},
)
.unwrap();
poll_manager!(manager);
let message = tunnel_rx.try_recv().unwrap();
let message = EncryptedTunnelData::parse(&message.payload).unwrap();
assert_eq!(message.iv(), &[0xaa; 16]);
assert_eq!(message.ciphertext(), &[0xbb; 1008]);
}
}