use crate::{
config::TransitConfig,
crypto::{chachapoly::ChaChaPoly, EphemeralPublicKey},
error::TunnelError,
events::EventHandle,
i2np::{
garlic::{DeliveryInstructions, GarlicMessage, GarlicMessageBuilder},
tunnel::{
build::{short, variable},
gateway::TunnelGateway,
},
HopRole, Message, MessageBuilder, MessageType, I2NP_MESSAGE_EXPIRATION,
},
primitives::{RouterId, TunnelId},
router::context::RouterContext,
runtime::{Counter, JoinSet, MetricsHandle, Runtime},
shutdown::ShutdownHandle,
subsystem::{bandwidth::CongestionLevel, SubsystemHandle},
tunnel::{
metrics::*,
noise::TunnelKeys,
transit::{inbound::InboundGateway, outbound::OutboundEndpoint, participant::Participant},
},
Error,
};
use bytes::{BufMut, BytesMut};
use futures::{
future::{select, Either},
FutureExt, StreamExt,
};
use futures_channel::oneshot;
use rand::Rng;
use thingbuf::mpsc::Receiver;
use alloc::{string::ToString, vec::Vec};
use core::{
future::Future,
ops::{Range, RangeFrom},
pin::{pin, Pin},
task::{Context, Poll},
time::Duration,
};
mod inbound;
mod outbound;
mod participant;
const LOG_TARGET: &str = "emissary::tunnel::transit::manager";
const SHORT_RECORD_LEN: usize = 218;
const VARIABLE_RECORD_LEN: usize = 528;
const PUBLIC_KEY_OFFSET: Range<usize> = 16..48;
const RECORD_START_OFFSET: RangeFrom<usize> = 48..;
const TUNNEL_CHANNEL_SIZE: usize = 64usize;
const TRANSIT_TUNNEL_EXPIRATION: Duration = Duration::from_secs(10 * 60 + 20);
const TERMINATION_TIMEOUT: Duration = Duration::from_secs(2 * 60);
pub trait TransitTunnel<R: Runtime>: Future<Output = TunnelId> + Send {
fn new(
tunnel_id: TunnelId,
next_tunnel_id: TunnelId,
next_router: RouterId,
tunnel_keys: TunnelKeys,
subsystem_handle: SubsystemHandle,
metrics_handle: R::MetricsHandle,
message_rx: Receiver<Message>,
event_handle: EventHandle<R>,
) -> Self;
}
pub struct TransitTunnelManager<R: Runtime> {
config: Option<TransitConfig>,
event_handle: EventHandle<R>,
message_rx: Receiver<Vec<(RouterId, Message)>>,
router_ctx: RouterContext<R>,
subsystem_handle: SubsystemHandle,
shutdown_handle: ShutdownHandle,
tunnels: R::JoinSet<Result<TunnelId, TunnelId>>,
}
impl<R: Runtime> TransitTunnelManager<R> {
pub fn new(
config: Option<TransitConfig>,
router_ctx: RouterContext<R>,
subsystem_handle: SubsystemHandle,
message_rx: Receiver<Vec<(RouterId, Message)>>,
shutdown_handle: ShutdownHandle,
) -> Self {
match &config {
Some(TransitConfig { max_tunnels }) => tracing::info!(
target: LOG_TARGET,
max_tunnels = %max_tunnels.map_or(
"unlimited".to_string(),
|max_tunnels| max_tunnels.to_string(),
),
"starting transit tunnel manager",
),
None => tracing::info!(
target: LOG_TARGET,
"starting transit tunnel manager, transit tunnels disabled",
),
}
Self {
config,
event_handle: router_ctx.event_handle().clone(),
message_rx,
router_ctx,
subsystem_handle,
shutdown_handle,
tunnels: R::join_set(),
}
}
fn can_accept_transit_tunnel(&self) -> bool {
if self.shutdown_handle.is_shutting_down() {
tracing::debug!(
target: LOG_TARGET,
num_tunnels = ?self.tunnels.len(),
"router is shutting down, cannot accept transit tunnel",
);
return false;
}
let Some(config) = &self.config else {
tracing::trace!(
target: LOG_TARGET,
"transit tunnels have been disabled, cannot accept transit tunnel",
);
return false;
};
match config.max_tunnels {
Some(max_tunnels) if max_tunnels <= self.tunnels.len() => {
tracing::debug!(
target: LOG_TARGET,
?max_tunnels,
num_tunnels = ?self.tunnels.len(),
"number of transit tunnels already at maximum, cannot accept transit tunnel",
);
false
}
_ => true,
}
}
fn find_local_record<'a, const RECORD_SIZE: usize>(
&self,
payload: &'a mut [u8],
) -> Option<(usize, &'a mut [u8])> {
(payload.len() > RECORD_SIZE && (payload.len() - 1).is_multiple_of(RECORD_SIZE))
.then(|| {
payload[1..].chunks_mut(RECORD_SIZE).enumerate().find(|(_, chunk)| {
chunk[..16] == self.router_ctx.noise().local_router_hash()[..16]
})
})
.flatten()
}
pub fn handle_variable_tunnel_build(
&mut self,
message: Message,
) -> crate::Result<(RouterId, Message, Option<oneshot::Sender<()>>)> {
self.router_ctx.metrics_handle().counter(NUM_BUILD_REQUESTS).increment(1);
let Message {
message_id,
expiration,
mut payload,
..
} = message;
let (_, record) = self
.find_local_record::<VARIABLE_RECORD_LEN>(&mut payload)
.ok_or(Error::Tunnel(TunnelError::RecordNotFound))?;
let mut session = self.router_ctx.noise().create_long_inbound_session(
EphemeralPublicKey::try_from_bytes(&record[PUBLIC_KEY_OFFSET])
.ok_or(Error::InvalidData)?,
);
let decrypted_record =
session.decrypt_build_record(record[RECORD_START_OFFSET].to_vec())?;
let build_record =
variable::TunnelBuildRecord::parse(&decrypted_record).map_err(|error| {
tracing::warn!(
target: LOG_TARGET,
?message_id,
?error,
"malformed variable tunnel build request",
);
Error::InvalidData
})?;
let role = build_record.role();
let tunnel_id = build_record.tunnel_id();
let next_tunnel_id = build_record.next_tunnel_id();
let next_message_id = build_record.next_message_id();
let next_router = build_record.next_router();
tracing::trace!(
target: LOG_TARGET,
?role,
%tunnel_id,
%next_router,
"variable tunnel build request",
);
let has_capacity = match self.subsystem_handle.congestion() {
CongestionLevel::Low => true,
CongestionLevel::Medium => R::rng().next_u64().is_multiple_of(2),
CongestionLevel::High => false,
};
if !has_capacity {
tracing::trace!(
target: LOG_TARGET,
%tunnel_id,
congestion = ?self.subsystem_handle.congestion(),
"rejecting variable tunnel build request due to congestion",
);
}
let maybe_receiver = if self.can_accept_transit_tunnel()
&& core::matches!(role, HopRole::OutboundEndpoint)
&& has_capacity
{
match self.subsystem_handle.try_insert_tunnel::<TUNNEL_CHANNEL_SIZE>(tunnel_id) {
Ok(receiver) => Some(receiver),
Err(error) => {
tracing::warn!(
target: LOG_TARGET,
%tunnel_id,
?error,
"tunnel already exists in routing table, rejecting",
);
None
}
}
} else {
None
};
let maybe_feedback_tx = match maybe_receiver {
None => {
self.router_ctx
.metrics_handle()
.counter(NUM_TRANSIT_TUNNELS_REJECTED)
.increment(1);
record[48] = 0x00; record[49] = 0x00;
record[511] = 30;
session.encrypt_build_record(record)?;
None
}
Some(receiver) => {
self.router_ctx
.metrics_handle()
.counter(NUM_TRANSIT_TUNNELS_ACCEPTED)
.increment(1);
record[48] = 0x00; record[49] = 0x00;
record[511] = 0x00;
session.encrypt_build_record(record)?;
let subsystem_handle = self.subsystem_handle.clone();
let metrics = self.router_ctx.metrics_handle().clone();
let next_router_id = next_router.clone();
let tunnel_keys = session.finalize(
build_record.tunnel_layer_key().to_vec(),
build_record.tunnel_iv_key().to_vec(),
)?;
let (tx, rx) = oneshot::channel::<()>();
let event_handle = self.router_ctx.event_handle().clone();
tracing::debug!(
target: LOG_TARGET,
%tunnel_id,
?role,
"start transit tunnel"
);
match role {
HopRole::InboundGateway => self.tunnels.push(async move {
match select(rx, pin!(R::delay(Duration::from_secs(2 * 60)))).await {
Either::Left((Ok(_), _)) => {}
Either::Left((Err(_), _)) => return Err(tunnel_id),
Either::Right(_) => {
tracing::warn!(
target: LOG_TARGET,
%tunnel_id,
"failed to receive dial result after 2 minutes",
);
debug_assert!(false);
return Err(tunnel_id);
}
}
Ok(InboundGateway::<R>::new(
tunnel_id,
next_tunnel_id,
next_router_id,
tunnel_keys,
subsystem_handle,
metrics,
receiver,
event_handle,
)
.await)
}),
HopRole::Participant => self.tunnels.push(async move {
match select(rx, pin!(R::delay(Duration::from_secs(2 * 60)))).await {
Either::Left((Ok(_), _)) => {}
Either::Left((Err(_), _)) => return Err(tunnel_id),
Either::Right(_) => {
tracing::warn!(
target: LOG_TARGET,
%tunnel_id,
"failed to receive dial result after 2 minutes",
);
debug_assert!(false);
return Err(tunnel_id);
}
}
Ok(Participant::<R>::new(
tunnel_id,
next_tunnel_id,
next_router_id,
tunnel_keys,
subsystem_handle,
metrics,
receiver,
event_handle,
)
.await)
}),
HopRole::OutboundEndpoint => self.tunnels.push(async move {
match select(rx, pin!(R::delay(Duration::from_secs(2 * 60)))).await {
Either::Left((Ok(_), _)) => {}
Either::Left((Err(_), _)) => return Err(tunnel_id),
Either::Right(_) => {
tracing::warn!(
target: LOG_TARGET,
%tunnel_id,
"failed to receive dial result after 2 minutes",
);
debug_assert!(false);
return Err(tunnel_id);
}
}
Ok(OutboundEndpoint::<R>::new(
tunnel_id,
next_tunnel_id,
next_router_id,
tunnel_keys,
subsystem_handle,
metrics,
receiver,
event_handle,
)
.await)
}),
}
Some(tx)
}
};
match role {
HopRole::InboundGateway | HopRole::Participant => {
let message = Message {
message_type: MessageType::VariableTunnelBuild,
message_id: *next_message_id,
expiration,
payload,
};
Ok((next_router, message, maybe_feedback_tx))
}
HopRole::OutboundEndpoint => {
let message = MessageBuilder::standard()
.with_message_type(MessageType::VariableTunnelBuildReply)
.with_message_id(next_message_id)
.with_expiration(expiration)
.with_payload(&payload)
.build();
let msg = TunnelGateway {
tunnel_id: next_tunnel_id,
payload: &message,
}
.serialize();
let message = Message {
message_type: MessageType::TunnelGateway,
message_id: *next_message_id,
expiration,
payload: msg,
};
Ok((next_router, message, maybe_feedback_tx))
}
}
}
pub fn handle_short_tunnel_build(
&mut self,
message: Message,
) -> crate::Result<(RouterId, Message, Option<oneshot::Sender<()>>)> {
self.router_ctx.metrics_handle().counter(NUM_BUILD_REQUESTS).increment(1);
let Message {
message_id,
expiration,
mut payload,
..
} = message;
let (record_idx, record) = self
.find_local_record::<SHORT_RECORD_LEN>(&mut payload)
.ok_or(Error::Tunnel(TunnelError::RecordNotFound))?;
let mut session = self.router_ctx.noise().create_short_inbound_session(
EphemeralPublicKey::try_from_bytes(&record[PUBLIC_KEY_OFFSET])
.ok_or(Error::InvalidData)?,
);
let decrypted_record =
session.decrypt_build_record(record[RECORD_START_OFFSET].to_vec())?;
let build_record = short::TunnelBuildRecord::parse(&decrypted_record).map_err(|error| {
tracing::debug!(
target: LOG_TARGET,
?message_id,
?error,
"malformed short tunnel build request",
);
Error::InvalidData
})?;
let role = build_record.role();
let tunnel_id = build_record.tunnel_id();
let next_tunnel_id = build_record.next_tunnel_id();
let next_message_id = build_record.next_message_id();
let next_router = build_record.next_router();
tracing::trace!(
target: LOG_TARGET,
?role,
%tunnel_id,
%next_router,
"short tunnel build request",
);
let has_capacity = match self.subsystem_handle.congestion() {
CongestionLevel::Low => true,
CongestionLevel::Medium => R::rng().next_u64().is_multiple_of(2),
CongestionLevel::High => false,
};
if !has_capacity {
tracing::trace!(
target: LOG_TARGET,
%tunnel_id,
congestion = ?self.subsystem_handle.congestion(),
"rejecting short tunnel build request due to congestion",
);
}
let maybe_receiver = match self.can_accept_transit_tunnel() && has_capacity {
false => None,
true => match self.subsystem_handle.try_insert_tunnel::<TUNNEL_CHANNEL_SIZE>(tunnel_id)
{
Ok(receiver) => Some(receiver),
Err(error) => {
tracing::warn!(
target: LOG_TARGET,
%tunnel_id,
?error,
"tunnel already exists in routing table, rejecting",
);
None
}
},
};
let (garlic_key, garlic_tag, maybe_feedback_tx) = match maybe_receiver {
None => {
self.router_ctx
.metrics_handle()
.counter(NUM_TRANSIT_TUNNELS_REJECTED)
.increment(1);
record[48] = 0x00; record[49] = 0x00;
record[201] = 30;
session.create_tunnel_keys(role)?;
session.encrypt_build_records(&mut payload, record_idx)?;
match role {
HopRole::OutboundEndpoint => {
let tunnel_keys = session.finalize()?;
(
Some(tunnel_keys.garlic_key()),
Some(tunnel_keys.garlic_tag()),
None,
)
}
_ => (None, None, None),
}
}
Some(receiver) => {
self.router_ctx
.metrics_handle()
.counter(NUM_TRANSIT_TUNNELS_ACCEPTED)
.increment(1);
record[48] = 0x00; record[49] = 0x00;
record[201] = 0x00;
session.create_tunnel_keys(role)?;
session.encrypt_build_records(&mut payload, record_idx)?;
let subsystem_handle = self.subsystem_handle.clone();
let metrics = self.router_ctx.metrics_handle().clone();
let next_router_id = next_router.clone();
let tunnel_keys = session.finalize()?;
let (tx, rx) = oneshot::channel::<()>();
let event_handle = self.router_ctx.event_handle().clone();
tracing::debug!(
target: LOG_TARGET,
%tunnel_id,
?role,
"start transit tunnel"
);
match role {
HopRole::InboundGateway => {
self.tunnels.push(async move {
match select(rx, pin!(R::delay(Duration::from_secs(2 * 60)))).await {
Either::Left((Ok(_), _)) => {}
Either::Left((Err(_), _)) => return Err(tunnel_id),
Either::Right(_) => {
tracing::warn!(
target: LOG_TARGET,
%tunnel_id,
"failed to receive dial result after 2 minutes",
);
debug_assert!(false);
return Err(tunnel_id);
}
}
Ok(InboundGateway::<R>::new(
tunnel_id,
next_tunnel_id,
next_router_id,
tunnel_keys,
subsystem_handle,
metrics,
receiver,
event_handle,
)
.await)
});
(None, None, Some(tx))
}
HopRole::Participant => {
self.tunnels.push(async move {
match select(rx, pin!(R::delay(Duration::from_secs(2 * 60)))).await {
Either::Left((Ok(_), _)) => {}
Either::Left((Err(_), _)) => return Err(tunnel_id),
Either::Right(_) => {
tracing::warn!(
target: LOG_TARGET,
%tunnel_id,
"failed to receive dial result after 2 minutes",
);
debug_assert!(false);
return Err(tunnel_id);
}
}
Ok(Participant::<R>::new(
tunnel_id,
next_tunnel_id,
next_router_id,
tunnel_keys,
subsystem_handle,
metrics,
receiver,
event_handle,
)
.await)
});
(None, None, Some(tx))
}
HopRole::OutboundEndpoint => {
let garlic_key = tunnel_keys.garlic_key();
let garlic_tag = tunnel_keys.garlic_tag();
self.tunnels.push(async move {
match select(rx, pin!(R::delay(Duration::from_secs(2 * 60)))).await {
Either::Left((Ok(_), _)) => {}
Either::Left((Err(_), _)) => return Err(tunnel_id),
Either::Right(_) => {
tracing::warn!(
target: LOG_TARGET,
%tunnel_id,
"failed to receive dial result after 2 minutes",
);
debug_assert!(false);
return Err(tunnel_id);
}
}
Ok(OutboundEndpoint::<R>::new(
tunnel_id,
next_tunnel_id,
next_router_id,
tunnel_keys,
subsystem_handle,
metrics,
receiver,
event_handle,
)
.await)
});
(Some(garlic_key), Some(garlic_tag), Some(tx))
}
}
}
};
match role {
HopRole::InboundGateway | HopRole::Participant => {
let msg = Message {
message_type: MessageType::ShortTunnelBuild,
message_id: *next_message_id,
expiration,
payload,
};
Ok((next_router, msg, maybe_feedback_tx))
}
HopRole::OutboundEndpoint => {
let garlic_key = garlic_key.expect("to exist");
let garlic_tag = garlic_tag.expect("to exist");
let mut message = GarlicMessageBuilder::default()
.with_garlic_clove(
MessageType::OutboundTunnelBuildReply,
next_message_id,
R::time_since_epoch() + I2NP_MESSAGE_EXPIRATION,
DeliveryInstructions::Local,
&payload,
)
.with_date_time(R::time_since_epoch().as_secs() as u32)
.build();
let mut out = BytesMut::with_capacity(message.len() + 16 + 8 + 4);
ChaChaPoly::new(&garlic_key)
.encrypt_with_ad_new(&garlic_tag, &mut message)
.expect("to succeed");
out.put_u32(message.len() as u32 + 8);
out.put_slice(&garlic_tag);
out.put_slice(&message);
let message = MessageBuilder::standard()
.with_expiration(R::time_since_epoch() + I2NP_MESSAGE_EXPIRATION)
.with_message_type(MessageType::Garlic)
.with_message_id(next_message_id)
.with_payload(&out)
.build();
let msg = TunnelGateway {
tunnel_id: next_tunnel_id,
payload: &message,
}
.serialize();
let message = Message {
message_type: MessageType::TunnelGateway,
message_id: *next_message_id,
expiration,
payload: msg,
};
Ok((next_router, message, maybe_feedback_tx))
}
}
}
}
impl<R: Runtime> Future for TransitTunnelManager<R> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
while let Poll::Ready(event) = self.message_rx.poll_recv(cx) {
let Some(messages) = event else {
return Poll::Ready(());
};
for (router_id, message) in messages {
let result = match message.message_type {
MessageType::ShortTunnelBuild => self.handle_short_tunnel_build(message),
MessageType::VariableTunnelBuild => self.handle_variable_tunnel_build(message),
MessageType::Garlic => {
tracing::warn!(
target: LOG_TARGET,
%router_id,
parsed = ?GarlicMessage::parse(&message.payload[..12]),
"garlic message received to obep",
);
continue;
}
message_type => {
tracing::warn!(
target: LOG_TARGET,
%router_id,
?message_type,
"unsupported message type",
);
continue;
}
};
match result {
Ok((router, message, maybe_feedback_tx)) => match maybe_feedback_tx {
None =>
if let Err(error) = self.subsystem_handle.send(&router, message) {
tracing::error!(target: LOG_TARGET, ?error, "failed to send message");
},
Some(tx) => {
if let Err(error) =
self.subsystem_handle.send_with_feedback(&router, message, tx)
{
tracing::error!(target: LOG_TARGET, ?error, "failed to send message");
}
}
},
Err(error) => tracing::debug!(
target: LOG_TARGET,
?error,
"failed to handle message",
),
}
}
}
if self.shutdown_handle.poll_unpin(cx).is_ready() {
tracing::info!(
target: LOG_TARGET,
"graceful shutdown requested",
);
if self.tunnels.is_empty() {
self.shutdown_handle.shutdown();
return Poll::Ready(());
} else {
tracing::info!(
target: LOG_TARGET,
num_tunnels = ?self.tunnels.len(),
"waiting for transit tunnels to expire",
);
}
}
while let Poll::Ready(event) = self.tunnels.poll_next_unpin(cx) {
let Some(result) = event else {
return Poll::Ready(());
};
match result {
Ok(tunnel_id) => {
tracing::debug!(
target: LOG_TARGET,
%tunnel_id,
"transit tunnel expired",
);
}
Err(tunnel_id) => {
tracing::debug!(
target: LOG_TARGET,
%tunnel_id,
"failed to dial next hop, unable to start transit tunnel",
);
}
}
if self.tunnels.is_empty() && self.shutdown_handle.is_shutting_down() {
tracing::info!(
target: LOG_TARGET,
"shutting down",
);
self.shutdown_handle.shutdown();
return Poll::Ready(());
}
}
if self.event_handle.poll_unpin(cx).is_ready() {
self.router_ctx.event_handle().num_transit_tunnels(self.tunnels.len());
}
Poll::Pending
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
crypto::{StaticPrivateKey, StaticPublicKey},
events::EventManager,
primitives::{MessageId, Str},
profile::ProfileStorage,
runtime::mock::MockRuntime,
shutdown::ShutdownContext,
subsystem::SubsystemHandle,
tunnel::{
garlic::{DeliveryInstructions as GarlicDeliveryInstructions, GarlicHandler},
hop::{
inbound::InboundTunnel, outbound::OutboundTunnel, pending::PendingTunnel,
ReceiverKind, TunnelBuildParameters, TunnelInfo,
},
pool::TunnelPoolBuildParameters,
tests::make_router,
},
};
use bytes::Bytes;
use rand::Rng;
use thingbuf::mpsc::{channel, Sender};
#[tokio::test]
async fn accept_tunnel_build_request_participant() {
let handle = MockRuntime::register_metrics(vec![], None);
let (_event_mgr, _event_subscriber, event_handle) =
EventManager::new(None, MockRuntime::register_metrics(vec![], None));
let (hops, mut transit_managers): (
Vec<(Bytes, StaticPublicKey, ShutdownContext<MockRuntime>)>,
Vec<TransitTunnelManager<MockRuntime>>,
) = (0..3)
.map(|_| make_router(true))
.into_iter()
.map(
|(router_hash, static_key, signing_key, _noise_context, router_info)| {
let (_transit_tx, transit_rx) = channel(16);
let mut shutdown_ctx = ShutdownContext::<MockRuntime>::new();
let shutdown_handle = shutdown_ctx.handle();
let (subsys_handle, _event_rx) = SubsystemHandle::new();
(
(router_hash, static_key.public(), shutdown_ctx),
TransitTunnelManager::new(
Some(TransitConfig {
max_tunnels: Some(5000),
}),
RouterContext::new(
handle.clone(),
ProfileStorage::new(&[], &[]),
router_info.identity.id(),
Bytes::from(router_info.serialize(&signing_key)),
static_key,
signing_key,
2u8,
event_handle.clone(),
),
subsys_handle,
transit_rx,
shutdown_handle,
),
)
},
)
.unzip();
let (local_hash, _local_sk, _, local_noise, _) = make_router(true);
let message_id = MessageId::from(MockRuntime::rng().next_u32());
let tunnel_id = TunnelId::from(MockRuntime::rng().next_u32());
let gateway = TunnelId::from(MockRuntime::rng().next_u32());
let (hops, _handles): (Vec<_>, Vec<_>) = hops
.into_iter()
.map(|(router_id, public_key, context)| ((router_id, public_key), context))
.unzip();
let (_pending_tunnel, _next_router, message) =
PendingTunnel::<_, OutboundTunnel<MockRuntime>>::create_tunnel(TunnelBuildParameters {
hops: hops.clone(),
metrics_handle: MockRuntime::register_metrics(vec![], None),
name: Str::from("tunnel-pool"),
noise: local_noise,
message_id,
tunnel_info: TunnelInfo::Outbound {
gateway,
tunnel_id,
router_id: local_hash,
},
receiver: ReceiverKind::Outbound,
})
.unwrap();
assert!(transit_managers[0].handle_short_tunnel_build(message).is_ok());
}
#[tokio::test]
async fn accept_tunnel_build_request_ibgw() {
let (_event_mgr, _event_subscriber, event_handle) =
EventManager::new(None, MockRuntime::register_metrics(vec![], None));
let handle = MockRuntime::register_metrics(vec![], None);
let (hops, mut transit_managers): (
Vec<(Bytes, StaticPublicKey, ShutdownContext<MockRuntime>)>,
Vec<(
GarlicHandler<MockRuntime>,
TransitTunnelManager<MockRuntime>,
)>,
) = (0..3)
.map(|_| make_router(true))
.into_iter()
.map(
|(router_hash, static_key, signing_key, noise_context, router_info)| {
let (_transit_tx, transit_rx) = channel(16);
let mut shutdown_ctx = ShutdownContext::<MockRuntime>::new();
let shutdown_handle = shutdown_ctx.handle();
let (subsys_handle, _event_rx) = SubsystemHandle::new();
(
(router_hash, static_key.public(), shutdown_ctx),
(
GarlicHandler::new(noise_context.clone(), handle.clone()),
TransitTunnelManager::new(
Some(TransitConfig {
max_tunnels: Some(5000),
}),
RouterContext::new(
handle.clone(),
ProfileStorage::new(&[], &[]),
router_info.identity.id(),
Bytes::from(router_info.serialize(&signing_key)),
static_key,
signing_key,
2u8,
event_handle.clone(),
),
subsys_handle,
transit_rx,
shutdown_handle,
),
),
)
},
)
.unzip();
let (local_hash, _local_sk, _, local_noise, _) = make_router(true);
let message_id = MessageId::from(MockRuntime::rng().next_u32());
let tunnel_id = TunnelId::from(MockRuntime::rng().next_u32());
let TunnelPoolBuildParameters {
context_handle: handle,
..
} = TunnelPoolBuildParameters::new(Default::default());
let (_tx, rx) = channel(64);
let (hops, _handles): (Vec<_>, Vec<_>) = hops
.into_iter()
.map(|(router_id, public_key, context)| ((router_id, public_key), context))
.unzip();
let (_pending_tunnel, _next_router, message) =
PendingTunnel::<_, InboundTunnel<MockRuntime>>::create_tunnel(TunnelBuildParameters {
hops: hops.clone(),
metrics_handle: MockRuntime::register_metrics(vec![], None),
name: Str::from("tunnel-pool"),
noise: local_noise,
message_id,
tunnel_info: TunnelInfo::Inbound {
tunnel_id,
router_id: local_hash,
},
receiver: ReceiverKind::Inbound {
message_rx: rx,
handle,
},
})
.unwrap();
let message = match transit_managers[0].0.handle_message(message).unwrap().next() {
Some(GarlicDeliveryInstructions::Local { message }) => message,
_ => panic!("invalid delivery instructions"),
};
assert!(transit_managers[0].1.handle_short_tunnel_build(message).is_ok());
}
#[tokio::test]
async fn accept_tunnel_build_request_obep() {
let (_event_mgr, _event_subscriber, event_handle) =
EventManager::new(None, MockRuntime::register_metrics(vec![], None));
let handle = MockRuntime::register_metrics(vec![], None);
let (hops, mut transit_managers): (
Vec<(Bytes, StaticPublicKey, ShutdownContext<MockRuntime>)>,
Vec<TransitTunnelManager<MockRuntime>>,
) = (0..3)
.map(|_| make_router(true))
.into_iter()
.map(
|(router_hash, static_key, signing_key, _noise_context, router_info)| {
let (_transit_tx, transit_rx) = channel(16);
let mut shutdown_ctx = ShutdownContext::<MockRuntime>::new();
let shutdown_handle = shutdown_ctx.handle();
let (subsys_handle, _event_rx) = SubsystemHandle::new();
(
(router_hash, static_key.public(), shutdown_ctx),
TransitTunnelManager::new(
Some(TransitConfig {
max_tunnels: Some(5000),
}),
RouterContext::new(
handle.clone(),
ProfileStorage::new(&[], &[]),
router_info.identity.id(),
Bytes::from(router_info.serialize(&signing_key)),
static_key,
signing_key,
2u8,
event_handle.clone(),
),
subsys_handle,
transit_rx,
shutdown_handle,
),
)
},
)
.unzip();
let (local_hash, _local_pk, _, local_noise, _) = make_router(true);
let message_id = MessageId::from(MockRuntime::rng().next_u32());
let tunnel_id = TunnelId::from(MockRuntime::rng().next_u32());
let gateway = TunnelId::from(MockRuntime::rng().next_u32());
let (hops, _handles): (Vec<_>, Vec<_>) = hops
.into_iter()
.map(|(router_id, public_key, context)| ((router_id, public_key), context))
.unzip();
let (pending_tunnel, _next_router, message) =
PendingTunnel::<_, OutboundTunnel<MockRuntime>>::create_tunnel(TunnelBuildParameters {
hops: hops.clone(),
metrics_handle: MockRuntime::register_metrics(vec![], None),
name: Str::from("tunnel-pool"),
noise: local_noise.clone(),
message_id,
tunnel_info: TunnelInfo::Outbound {
gateway,
tunnel_id,
router_id: local_hash,
},
receiver: ReceiverKind::Outbound,
})
.unwrap();
let message = (0..transit_managers.len() - 1).fold(message, |message, i| {
transit_managers[i].handle_short_tunnel_build(message).unwrap().1
});
let (_, msg, _) = transit_managers[2].handle_short_tunnel_build(message).unwrap();
let Message {
message_type,
payload,
..
} = msg;
assert_eq!(message_type, MessageType::TunnelGateway);
let TunnelGateway {
tunnel_id: recv_tunnel_id,
payload,
} = TunnelGateway::parse(&payload).unwrap();
assert_eq!(TunnelId::from(recv_tunnel_id), gateway);
let message = Message::parse_standard(&payload).unwrap();
assert_eq!(message.message_type, MessageType::Garlic);
pending_tunnel.try_build_tunnel(message).unwrap();
}
#[tokio::test]
async fn local_record_not_found() {
let (_event_mgr, _event_subscriber, event_handle) =
EventManager::new(None, MockRuntime::register_metrics(vec![], None));
let handle = MockRuntime::register_metrics(vec![], None);
let (hops, _transit_managers): (
Vec<(Bytes, StaticPublicKey, ShutdownContext<MockRuntime>)>,
Vec<TransitTunnelManager<MockRuntime>>,
) = (0..3)
.map(|_| make_router(true))
.into_iter()
.map(
|(router_hash, static_key, signing_key, _noise_context, router_info)| {
let (_transit_tx, transit_rx) = channel(16);
let mut shutdown_ctx = ShutdownContext::<MockRuntime>::new();
let shutdown_handle = shutdown_ctx.handle();
let (subsys_handle, _event_rx) = SubsystemHandle::new();
(
(router_hash, static_key.public(), shutdown_ctx),
TransitTunnelManager::new(
Some(TransitConfig {
max_tunnels: Some(5000),
}),
RouterContext::new(
handle.clone(),
ProfileStorage::new(&[], &[]),
router_info.identity.id(),
Bytes::from(router_info.serialize(&signing_key)),
static_key,
signing_key,
2u8,
event_handle.clone(),
),
subsys_handle,
transit_rx,
shutdown_handle,
),
)
},
)
.unzip();
let (local_hash, _local_pk, _, local_noise, _) = make_router(true);
let message_id = MessageId::from(MockRuntime::rng().next_u32());
let tunnel_id = TunnelId::from(MockRuntime::rng().next_u32());
let gateway = TunnelId::from(MockRuntime::rng().next_u32());
let (hops, _handles): (Vec<_>, Vec<_>) = hops
.into_iter()
.map(|(router_id, public_key, context)| ((router_id, public_key), context))
.unzip();
let (_pending_tunnel, _next_router, message) =
PendingTunnel::<_, OutboundTunnel<MockRuntime>>::create_tunnel(TunnelBuildParameters {
hops: hops.clone(),
metrics_handle: MockRuntime::register_metrics(vec![], None),
name: Str::from("tunnel-pool"),
noise: local_noise,
message_id,
tunnel_info: TunnelInfo::Outbound {
gateway,
tunnel_id,
router_id: local_hash.clone(),
},
receiver: ReceiverKind::Outbound,
})
.unwrap();
let (_, static_key, signing_key, _noise, router_info) = make_router(true);
let (_transit_tx, transit_rx) = channel(16);
let (subsys_handle, _event_rx) = SubsystemHandle::new();
let mut shutdown_ctx = ShutdownContext::<MockRuntime>::new();
let shutdown_handle = shutdown_ctx.handle();
let (_event_mgr, _event_subscriber, event_handle) =
EventManager::new(None, MockRuntime::register_metrics(vec![], None));
let mut transit_manager = TransitTunnelManager::<MockRuntime>::new(
None,
RouterContext::new(
handle.clone(),
ProfileStorage::new(&[], &[]),
router_info.identity.id(),
Bytes::from(router_info.serialize(&signing_key)),
static_key,
signing_key,
2u8,
event_handle.clone(),
),
subsys_handle,
transit_rx,
shutdown_handle,
);
match transit_manager.handle_short_tunnel_build(message).unwrap_err() {
Error::Tunnel(TunnelError::RecordNotFound) => {}
error => panic!("invalid error: {error:?}"),
}
}
#[tokio::test]
async fn invalid_public_key_used() {
let handle = MockRuntime::register_metrics(vec![], None);
let (_event_mgr, _event_subscriber, event_handle) =
EventManager::new(None, MockRuntime::register_metrics(vec![], None));
let (mut hops, mut transit_managers): (
Vec<(Bytes, StaticPublicKey, ShutdownContext<MockRuntime>)>,
Vec<TransitTunnelManager<MockRuntime>>,
) = (0..3)
.map(|_| make_router(true))
.into_iter()
.map(|(router_hash, static_key, signing_key, _, router_info)| {
let (_transit_tx, transit_rx) = channel(16);
let mut shutdown_ctx = ShutdownContext::<MockRuntime>::new();
let shutdown_handle = shutdown_ctx.handle();
let (subsys_handle, _event_rx) = SubsystemHandle::new();
(
(router_hash, static_key.public(), shutdown_ctx),
TransitTunnelManager::new(
Some(TransitConfig {
max_tunnels: Some(5000),
}),
RouterContext::new(
handle.clone(),
ProfileStorage::new(&[], &[]),
router_info.identity.id(),
Bytes::from(router_info.serialize(&signing_key)),
static_key,
signing_key,
2u8,
event_handle.clone(),
),
subsys_handle,
transit_rx,
shutdown_handle,
),
)
})
.unzip();
let (local_hash, _local_pk, _, local_noise, _) = make_router(true);
let message_id = MessageId::from(MockRuntime::rng().next_u32());
let tunnel_id = TunnelId::from(MockRuntime::rng().next_u32());
let gateway = TunnelId::from(MockRuntime::rng().next_u32());
let new_pubkey = {
let mut key_bytes = [0u8; 32];
MockRuntime::rng().fill_bytes(&mut key_bytes);
let key = StaticPrivateKey::try_from_bytes(&key_bytes).unwrap();
key.public()
};
hops[0].1 = new_pubkey;
let (hops, _handles): (Vec<_>, Vec<_>) = hops
.into_iter()
.map(|(router_id, public_key, context)| ((router_id, public_key), context))
.unzip();
let (_pending_tunnel, _next_router, message) =
PendingTunnel::<_, OutboundTunnel<MockRuntime>>::create_tunnel(TunnelBuildParameters {
hops: hops.clone(),
metrics_handle: MockRuntime::register_metrics(vec![], None),
name: Str::from("tunnel-pool"),
noise: local_noise,
message_id,
tunnel_info: TunnelInfo::Outbound {
gateway,
tunnel_id,
router_id: local_hash,
},
receiver: ReceiverKind::Outbound,
})
.unwrap();
match transit_managers[0].handle_short_tunnel_build(message).unwrap_err() {
Error::Chacha20Poly1305(_) => {}
error => panic!("invalid error: {error:?}"),
}
}
#[tokio::test]
async fn router_shutting_down_tunnel_rejected() {
let handle = MockRuntime::register_metrics(vec![], None);
let mut hops = Vec::<(Bytes, StaticPublicKey)>::new();
let mut ctxs = Vec::<ShutdownContext<MockRuntime>>::new();
let mut transit_managers = Vec::<TransitTunnelManager<MockRuntime>>::new();
let (_event_mgr, _event_subscriber, event_handle) =
EventManager::new(None, MockRuntime::register_metrics(vec![], None));
for i in 0..3 {
let (router_hash, static_key, signing_key, _, router_info) = make_router(true);
let (_transit_tx, transit_rx) = channel(16);
let mut shutdown_ctx = ShutdownContext::<MockRuntime>::new();
let mut shutdown_handle = shutdown_ctx.handle();
if i % 2 == 0 {
shutdown_ctx.shutdown();
tokio::time::timeout(Duration::from_secs(2), &mut shutdown_handle)
.await
.expect("no timeout");
}
let (subsys_handle, _event_rx) = SubsystemHandle::new();
hops.push((router_hash, static_key.public()));
ctxs.push(shutdown_ctx);
transit_managers.push(TransitTunnelManager::new(
Some(TransitConfig {
max_tunnels: Some(5000),
}),
RouterContext::new(
handle.clone(),
ProfileStorage::new(&[], &[]),
router_info.identity.id(),
Bytes::from(router_info.serialize(&signing_key)),
static_key,
signing_key,
2u8,
event_handle.clone(),
),
subsys_handle,
transit_rx,
shutdown_handle,
));
}
let (local_hash, _, _, local_noise, _) = make_router(true);
let message_id = MessageId::from(MockRuntime::rng().next_u32());
let tunnel_id = TunnelId::from(MockRuntime::rng().next_u32());
let gateway = TunnelId::from(MockRuntime::rng().next_u32());
let (pending_tunnel, _next_router, message) =
PendingTunnel::<_, OutboundTunnel<MockRuntime>>::create_tunnel(TunnelBuildParameters {
hops: hops.clone(),
metrics_handle: MockRuntime::register_metrics(vec![], None),
name: Str::from("tunnel-pool"),
noise: local_noise.clone(),
message_id,
tunnel_info: TunnelInfo::Outbound {
gateway,
tunnel_id,
router_id: local_hash,
},
receiver: ReceiverKind::Outbound,
})
.unwrap();
let message = (0..transit_managers.len() - 1).fold(message, |message, i| {
transit_managers[i].handle_short_tunnel_build(message).unwrap().1
});
let (_, msg, _) = transit_managers[2].handle_short_tunnel_build(message).unwrap();
let Message {
message_type,
payload,
..
} = msg;
assert_eq!(message_type, MessageType::TunnelGateway);
let TunnelGateway {
tunnel_id: recv_tunnel_id,
payload,
} = TunnelGateway::parse(&payload).unwrap();
assert_eq!(TunnelId::from(recv_tunnel_id), gateway);
let message = Message::parse_standard(&payload).unwrap();
assert_eq!(message.message_type, MessageType::Garlic);
match pending_tunnel.try_build_tunnel(message) {
Err(error) => {
assert_eq!(error[0].1, Some(Err(TunnelError::TunnelRejected(30))));
assert_eq!(error[1].1, Some(Ok(())));
assert_eq!(error[2].1, Some(Err(TunnelError::TunnelRejected(30))));
}
_ => panic!("invalid error"),
}
}
#[tokio::test(start_paused = true)]
async fn transit_manager_exits_after_all_tunnels_have_expired() {
let handle = MockRuntime::register_metrics(vec![], None);
let (_router_hash, static_key, signing_key, _noise_context, router_info) =
make_router(true);
let (_transit_tx, transit_rx) = channel(16);
let (subsys_handle, _event_rx) = SubsystemHandle::new();
let mut shutdown_ctx = ShutdownContext::<MockRuntime>::new();
let shutdown_handle = shutdown_ctx.handle();
let (_event_mgr, _event_subscriber, event_handle) =
EventManager::new(None, MockRuntime::register_metrics(vec![], None));
let mut transit_manager = TransitTunnelManager::<MockRuntime>::new(
None,
RouterContext::new(
handle.clone(),
ProfileStorage::new(&[], &[]),
router_info.identity.id(),
Bytes::from(router_info.serialize(&signing_key)),
static_key,
signing_key,
2u8,
event_handle.clone(),
),
subsys_handle,
transit_rx,
shutdown_handle,
);
let handle = tokio::spawn(async move {
transit_manager.tunnels.push(async move {
tokio::time::sleep(Duration::from_secs(10)).await;
Ok(TunnelId::random())
});
let _ = (&mut transit_manager).await;
});
shutdown_ctx.shutdown();
assert!(tokio::time::timeout(Duration::from_secs(5), &mut shutdown_ctx).await.is_err());
assert!(tokio::time::timeout(Duration::from_secs(10), &mut shutdown_ctx).await.is_ok());
assert!(handle.await.is_ok());
}
#[tokio::test(start_paused = true)]
async fn transit_tunnels_disabled() {
let handle = MockRuntime::register_metrics(vec![], None);
let mut hops = Vec::<(Bytes, StaticPublicKey)>::new();
let mut ctxs = Vec::<ShutdownContext<MockRuntime>>::new();
let mut transit_managers = Vec::<TransitTunnelManager<MockRuntime>>::new();
let (_event_mgr, _event_subscriber, event_handle) =
EventManager::new(None, MockRuntime::register_metrics(vec![], None));
for i in 0..3 {
let (router_hash, static_key, signing_key, _, router_info) = make_router(true);
let (_transit_tx, transit_rx) = channel(16);
let mut shutdown_ctx = ShutdownContext::<MockRuntime>::new();
let shutdown_handle = shutdown_ctx.handle();
let (subsys_handle, _event_rx) = SubsystemHandle::new();
hops.push((router_hash, static_key.public()));
ctxs.push(shutdown_ctx);
transit_managers.push(TransitTunnelManager::new(
if i == 0 {
None
} else {
Some(TransitConfig {
max_tunnels: Some(5000),
})
},
RouterContext::new(
handle.clone(),
ProfileStorage::new(&[], &[]),
router_info.identity.id(),
Bytes::from(router_info.serialize(&signing_key)),
static_key,
signing_key,
2u8,
event_handle.clone(),
),
subsys_handle,
transit_rx,
shutdown_handle,
));
}
let (local_hash, _, _, local_noise, _) = make_router(true);
let message_id = MessageId::from(MockRuntime::rng().next_u32());
let tunnel_id = TunnelId::from(MockRuntime::rng().next_u32());
let gateway = TunnelId::from(MockRuntime::rng().next_u32());
let (pending_tunnel, _next_router, message) =
PendingTunnel::<_, OutboundTunnel<MockRuntime>>::create_tunnel(TunnelBuildParameters {
hops: hops.clone(),
metrics_handle: MockRuntime::register_metrics(vec![], None),
name: Str::from("tunnel-pool"),
noise: local_noise.clone(),
message_id,
tunnel_info: TunnelInfo::Outbound {
gateway,
tunnel_id,
router_id: local_hash,
},
receiver: ReceiverKind::Outbound,
})
.unwrap();
let message = (0..transit_managers.len() - 1).fold(message, |message, i| {
transit_managers[i].handle_short_tunnel_build(message).unwrap().1
});
let (_, msg, _) = transit_managers[2].handle_short_tunnel_build(message).unwrap();
let Message {
message_type,
payload,
..
} = msg;
assert_eq!(message_type, MessageType::TunnelGateway);
let TunnelGateway {
tunnel_id: recv_tunnel_id,
payload,
} = TunnelGateway::parse(&payload).unwrap();
assert_eq!(TunnelId::from(recv_tunnel_id), gateway);
let message = Message::parse_standard(&payload).unwrap();
assert_eq!(message.message_type, MessageType::Garlic);
match pending_tunnel.try_build_tunnel(message) {
Err(error) => {
assert_eq!(error[0].1, Some(Err(TunnelError::TunnelRejected(30))));
assert_eq!(error[1].1, Some(Ok(())));
assert_eq!(error[2].1, Some(Ok(())));
}
_ => panic!("invalid error"),
}
}
#[tokio::test]
async fn maximum_transit_tunnels() {
let handle = MockRuntime::register_metrics(vec![], None);
let mut hops = Vec::<(Bytes, StaticPublicKey)>::new();
let mut ctxs = Vec::<ShutdownContext<MockRuntime>>::new();
let mut transit_managers = Vec::<TransitTunnelManager<MockRuntime>>::new();
let (_event_mgr, _event_subscriber, event_handle) =
EventManager::new(None, MockRuntime::register_metrics(vec![], None));
for i in 0..3 {
let (router_hash, static_key, signing_key, _, router_info) = make_router(true);
let (_transit_tx, transit_rx) = channel(16);
let mut shutdown_ctx = ShutdownContext::<MockRuntime>::new();
let shutdown_handle = shutdown_ctx.handle();
let (subsys_handle, _event_rx) = SubsystemHandle::new();
hops.push((router_hash, static_key.public()));
ctxs.push(shutdown_ctx);
transit_managers.push(TransitTunnelManager::new(
if i == 0 {
Some(TransitConfig {
max_tunnels: Some(0),
})
} else {
Some(TransitConfig {
max_tunnels: Some(5000),
})
},
RouterContext::new(
handle.clone(),
ProfileStorage::new(&[], &[]),
router_info.identity.id(),
Bytes::from(router_info.serialize(&signing_key)),
static_key,
signing_key,
2u8,
event_handle.clone(),
),
subsys_handle,
transit_rx,
shutdown_handle,
));
}
let (local_hash, _, _, local_noise, _) = make_router(true);
let message_id = MessageId::from(MockRuntime::rng().next_u32());
let tunnel_id = TunnelId::from(MockRuntime::rng().next_u32());
let gateway = TunnelId::from(MockRuntime::rng().next_u32());
let (pending_tunnel, _next_router, message) =
PendingTunnel::<_, OutboundTunnel<MockRuntime>>::create_tunnel(TunnelBuildParameters {
hops: hops.clone(),
metrics_handle: MockRuntime::register_metrics(vec![], None),
name: Str::from("tunnel-pool"),
noise: local_noise.clone(),
message_id,
tunnel_info: TunnelInfo::Outbound {
gateway,
tunnel_id,
router_id: local_hash,
},
receiver: ReceiverKind::Outbound,
})
.unwrap();
let message = (0..transit_managers.len() - 1).fold(message, |message, i| {
transit_managers[i].handle_short_tunnel_build(message).unwrap().1
});
let (_, msg, _) = transit_managers[2].handle_short_tunnel_build(message).unwrap();
let Message {
message_type,
payload,
..
} = msg;
assert_eq!(message_type, MessageType::TunnelGateway);
let TunnelGateway {
tunnel_id: recv_tunnel_id,
payload,
} = TunnelGateway::parse(&payload).unwrap();
assert_eq!(TunnelId::from(recv_tunnel_id), gateway);
let message = Message::parse_standard(&payload).unwrap();
assert_eq!(message.message_type, MessageType::Garlic);
match pending_tunnel.try_build_tunnel(message) {
Err(error) => {
assert_eq!(error[0].1, Some(Err(TunnelError::TunnelRejected(30))));
assert_eq!(error[1].1, Some(Ok(())));
assert_eq!(error[2].1, Some(Ok(())));
}
_ => panic!("invalid error"),
}
}
#[tokio::test(start_paused = true)]
async fn next_hop_dial_failure() {
let handle = MockRuntime::register_metrics(vec![], None);
let (_event_mgr, _event_subscriber, event_handle) =
EventManager::new(None, MockRuntime::register_metrics(vec![], None));
let (hops, mut transit_managers): (
Vec<(
Bytes,
StaticPublicKey,
ShutdownContext<MockRuntime>,
Sender<_>,
)>,
Vec<TransitTunnelManager<MockRuntime>>,
) = (0..3)
.map(|_| make_router(true))
.into_iter()
.map(
|(router_hash, static_key, signing_key, _noise_context, router_info)| {
let (transit_tx, transit_rx) = channel(16);
let mut shutdown_ctx = ShutdownContext::<MockRuntime>::new();
let shutdown_handle = shutdown_ctx.handle();
let (subsys_handle, _event_rx) = SubsystemHandle::new();
(
(router_hash, static_key.public(), shutdown_ctx, transit_tx),
TransitTunnelManager::new(
Some(TransitConfig {
max_tunnels: Some(5000),
}),
RouterContext::new(
handle.clone(),
ProfileStorage::new(&[], &[]),
router_info.identity.id(),
Bytes::from(router_info.serialize(&signing_key)),
static_key,
signing_key,
2u8,
event_handle.clone(),
),
subsys_handle,
transit_rx,
shutdown_handle,
),
)
},
)
.unzip();
let (local_hash, _local_sk, _, local_noise, _) = make_router(true);
let message_id = MessageId::from(MockRuntime::rng().next_u32());
let tunnel_id = TunnelId::from(MockRuntime::rng().next_u32());
let gateway = TunnelId::from(MockRuntime::rng().next_u32());
let (hops, _handles): (Vec<_>, Vec<_>) = hops
.into_iter()
.map(|(router_id, public_key, context, tx)| ((router_id, public_key), (context, tx)))
.unzip();
let (_pending_tunnel, _next_router, message) =
PendingTunnel::<_, OutboundTunnel<MockRuntime>>::create_tunnel(TunnelBuildParameters {
hops: hops.clone(),
metrics_handle: MockRuntime::register_metrics(vec![], None),
name: Str::from("tunnel-pool"),
noise: local_noise,
message_id,
tunnel_info: TunnelInfo::Outbound {
gateway,
tunnel_id,
router_id: local_hash,
},
receiver: ReceiverKind::Outbound,
})
.unwrap();
assert_eq!(transit_managers[0].tunnels.len(), 0);
let (_, _, tx) = transit_managers[0].handle_short_tunnel_build(message).unwrap();
assert_eq!(transit_managers[0].tunnels.len(), 1);
drop(tx);
assert!(
tokio::time::timeout(Duration::from_secs(5), &mut transit_managers[0])
.await
.is_err()
);
assert_eq!(transit_managers[0].tunnels.len(), 0);
}
}