use crate::{
crypto::aes::{cbc, ecb},
error::Error,
events::EventHandle,
i2np::{
tunnel::{data::TunnelDataBuilder, gateway::TunnelGateway},
Message, MessageType,
},
primitives::{RouterId, TunnelId},
runtime::{Counter, Gauge, Instant, MetricsHandle, Runtime},
subsystem::SubsystemHandle,
tunnel::{
metrics::{
NUM_DROPPED_MESSAGES, NUM_IBGWS, NUM_ROUTED_MESSAGES, NUM_TERMINATED,
NUM_TRANSIT_TUNNELS, TOTAL_TRANSIT_TUNNELS,
},
noise::TunnelKeys,
transit::{TransitTunnel, TERMINATION_TIMEOUT, TRANSIT_TUNNEL_EXPIRATION},
},
};
use futures::FutureExt;
use rand::Rng;
use thingbuf::mpsc::Receiver;
use alloc::vec::Vec;
use core::{
future::Future,
ops::{Range, RangeFrom},
pin::Pin,
task::{Context, Poll},
time::Duration,
};
const LOG_TARGET: &str = "emissary::tunnel::transit::ibgw";
const AES_IV_OFFSET: Range<usize> = 4..20;
const PAYLOAD_OFFSET: RangeFrom<usize> = 20..;
pub struct InboundGateway<R: Runtime> {
event_handle: EventHandle<R>,
expiration_timer: R::Timer,
inbound_bandwidth: usize,
message_rx: Receiver<Message>,
metrics_handle: R::MetricsHandle,
next_router: RouterId,
next_tunnel_id: TunnelId,
outbound_bandwidth: usize,
padding_bytes: [u8; 1028],
started: Option<R::Instant>,
subsystem_handle: SubsystemHandle,
tunnel_id: TunnelId,
tunnel_keys: TunnelKeys,
}
impl<R: Runtime> InboundGateway<R> {
fn handle_tunnel_gateway(
&self,
tunnel_gateway: &TunnelGateway,
) -> crate::Result<(RouterId, impl Iterator<Item = Message> + '_)> {
match Message::parse_standard(tunnel_gateway.payload) {
Err(error) => {
tracing::warn!(
target: LOG_TARGET,
tunnel_id = %self.tunnel_id,
gateway_tunnel_id = %tunnel_gateway.tunnel_id,
message_len = ?tunnel_gateway.payload.len(),
?error,
"malformed i2np message",
);
return Err(Error::InvalidData);
}
Ok(message) if message.is_expired::<R>() => {
tracing::debug!(
target: LOG_TARGET,
message_id = ?message.message_id,
message_type = ?message.message_type,
"dropping expired i2np message",
);
return Err(Error::Expired);
}
Ok(message) => tracing::trace!(
target: LOG_TARGET,
tunnel_id = %self.tunnel_id,
message_type = ?message.message_type,
"tunnel gateway",
),
}
let messages = TunnelDataBuilder::new(self.next_tunnel_id)
.with_local_delivery(tunnel_gateway.payload)
.build::<R>(&self.padding_bytes)
.map(|mut message| {
let mut aes = ecb::Aes::new_encryptor(self.tunnel_keys.iv_key());
let iv = aes.encrypt(&message[AES_IV_OFFSET]);
let mut aes = cbc::Aes::new_encryptor(self.tunnel_keys.layer_key(), &iv);
let ciphertext = aes.encrypt(&message[PAYLOAD_OFFSET]);
let mut aes = ecb::Aes::new_encryptor(self.tunnel_keys.iv_key());
let iv = aes.encrypt(iv);
message[AES_IV_OFFSET].copy_from_slice(&iv);
message[PAYLOAD_OFFSET].copy_from_slice(&ciphertext);
Message {
message_type: MessageType::TunnelData,
message_id: R::rng().next_u32(),
expiration: R::time_since_epoch() + Duration::from_secs(8),
payload: message,
}
});
Ok((self.next_router.clone(), messages))
}
}
impl<R: Runtime> TransitTunnel<R> for InboundGateway<R> {
fn new(
tunnel_id: TunnelId,
next_tunnel_id: TunnelId,
next_router: RouterId,
tunnel_keys: TunnelKeys,
subsystem_handle: SubsystemHandle,
metrics_handle: R::MetricsHandle,
message_rx: Receiver<Message>,
event_handle: EventHandle<R>,
) -> Self {
let padding_bytes = {
let mut padding_bytes = [0u8; 1028];
R::rng().fill_bytes(&mut padding_bytes);
padding_bytes = TryInto::<[u8; 1028]>::try_into(
padding_bytes
.into_iter()
.map(|byte| if byte == 0 { 1u8 } else { byte })
.collect::<Vec<_>>(),
)
.expect("to succeed");
padding_bytes
};
metrics_handle.gauge(NUM_IBGWS).increment(1);
metrics_handle.gauge(NUM_TRANSIT_TUNNELS).increment(1);
metrics_handle.counter(TOTAL_TRANSIT_TUNNELS).increment(1);
InboundGateway {
event_handle,
expiration_timer: R::timer(TRANSIT_TUNNEL_EXPIRATION),
inbound_bandwidth: 0usize,
message_rx,
metrics_handle,
next_router,
next_tunnel_id,
outbound_bandwidth: 0usize,
padding_bytes,
started: Some(R::now()),
subsystem_handle,
tunnel_id,
tunnel_keys,
}
}
}
impl<R: Runtime> Future for InboundGateway<R> {
type Output = TunnelId;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
while let Poll::Ready(event) = self.message_rx.poll_recv(cx) {
match event {
None => {
tracing::warn!(
target: LOG_TARGET,
tunnel_id = %self.tunnel_id,
"message channel closed",
);
self.subsystem_handle.remove_tunnel(&self.tunnel_id);
self.metrics_handle.gauge(NUM_IBGWS).decrement(1);
return Poll::Ready(self.tunnel_id);
}
Some(message) => {
self.inbound_bandwidth += message.serialized_len_short();
let MessageType::TunnelGateway = message.message_type else {
tracing::warn!(
target: LOG_TARGET,
tunnel_id = %self.tunnel_id,
message_type = ?message.message_type,
"unsupported message",
);
debug_assert!(false);
self.metrics_handle.counter(NUM_DROPPED_MESSAGES).increment(1);
continue;
};
let Some(message) = TunnelGateway::parse(&message.payload) else {
tracing::warn!(
target: LOG_TARGET,
tunnel_id = %self.tunnel_id,
"malformed tunnel gateway message",
);
debug_assert!(false);
self.metrics_handle.counter(NUM_DROPPED_MESSAGES).increment(1);
continue;
};
let (router, messages) = match self.handle_tunnel_gateway(&message) {
Ok((router, messages)) => (router, messages),
Err(Error::Expired) => {
self.metrics_handle.counter(NUM_DROPPED_MESSAGES).increment(1);
continue;
}
Err(error) => {
tracing::warn!(
target: LOG_TARGET,
tunnel_id = %self.tunnel_id,
?error,
"failed to handle tunnel gateway",
);
self.metrics_handle.counter(NUM_DROPPED_MESSAGES).increment(1);
continue;
}
};
let total_len = messages.into_iter().fold(0usize, |mut acc, message| {
acc += message.serialized_len_short();
match self.subsystem_handle.send(&router, message) {
Ok(()) => {
self.metrics_handle.counter(NUM_ROUTED_MESSAGES).increment(1);
}
Err(error) => {
tracing::error!(
target: LOG_TARGET,
tunnel_id = %self.tunnel_id,
?error,
"failed to send message",
);
self.metrics_handle.counter(NUM_DROPPED_MESSAGES).increment(1);
}
}
acc
});
self.outbound_bandwidth += total_len;
}
}
}
if let Some(ref started) = self.started {
if started.elapsed() > TERMINATION_TIMEOUT {
self.started = None;
if self.inbound_bandwidth == 0 && self.outbound_bandwidth == 0 {
tracing::debug!(
target: LOG_TARGET,
tunnel_id = %self.tunnel_id,
"shutting down tunnel after 2 minutes of inactivity",
);
self.subsystem_handle.remove_tunnel(&self.tunnel_id);
self.metrics_handle.gauge(NUM_IBGWS).decrement(1);
self.metrics_handle.gauge(NUM_TRANSIT_TUNNELS).decrement(1);
self.metrics_handle.counter(NUM_TERMINATED).increment(1);
return Poll::Ready(self.tunnel_id);
}
}
}
if self.event_handle.poll_unpin(cx).is_ready() {
self.event_handle.transit_inbound_bandwidth(self.inbound_bandwidth);
self.event_handle.transit_outbound_bandwidth(self.outbound_bandwidth);
self.inbound_bandwidth = 0;
self.outbound_bandwidth = 0;
}
if self.expiration_timer.poll_unpin(cx).is_ready() {
self.subsystem_handle.remove_tunnel(&self.tunnel_id);
self.metrics_handle.gauge(NUM_IBGWS).decrement(1);
self.metrics_handle.gauge(NUM_TRANSIT_TUNNELS).decrement(1);
return Poll::Ready(self.tunnel_id);
}
Poll::Pending
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
crypto::EphemeralPublicKey,
events::EventManager,
i2np::{HopRole, MessageBuilder},
primitives::{MessageId, Str},
runtime::mock::MockRuntime,
subsystem::SubsystemHandle,
tunnel::{
garlic::{DeliveryInstructions, GarlicHandler},
hop::{
inbound::InboundTunnel, pending::PendingTunnel, ReceiverKind,
TunnelBuildParameters, TunnelInfo,
},
pool::TunnelPoolBuildParameters,
tests::make_router,
},
};
use bytes::Bytes;
use thingbuf::mpsc::channel;
#[tokio::test]
async fn expired_tunnel_gateway_payload() {
let (ibgw_router_hash, ibgw_static_key, _, ibgw_noise, ibgw_router_info) =
make_router(false);
let mut ibgw_garlic = GarlicHandler::<MockRuntime>::new(
ibgw_noise.clone(),
MockRuntime::register_metrics(vec![], None),
);
let (_ibep_router_hash, _ibep_public_key, _, ibep_noise, _ibep_router_info) =
make_router(false);
let (_event_mgr, _event_subscriber, event_handle) =
EventManager::new(None, MockRuntime::register_metrics(vec![], None));
let (subsys_handle, _event_rx) = SubsystemHandle::new();
let (_tx, rx) = channel(64);
let TunnelPoolBuildParameters {
context_handle: handle,
..
} = TunnelPoolBuildParameters::new(Default::default());
let (pending, router_id, message) =
PendingTunnel::<_, InboundTunnel<MockRuntime>>::create_tunnel(TunnelBuildParameters {
hops: vec![(ibgw_router_hash.clone(), ibgw_static_key.public())],
metrics_handle: MockRuntime::register_metrics(vec![], None),
name: Str::from("tunnel-pool"),
noise: ibep_noise.clone(),
message_id: MessageId::from(MockRuntime::rng().next_u32()),
tunnel_info: TunnelInfo::Inbound {
tunnel_id: TunnelId::random(),
router_id: Bytes::from(RouterId::random().to_vec()),
},
receiver: ReceiverKind::Inbound {
message_rx: rx,
handle,
},
})
.unwrap();
assert_eq!(router_id, ibgw_router_info.identity.id());
assert_eq!(message.message_type, MessageType::Garlic);
let mut message = match ibgw_garlic.handle_message(message).unwrap().next() {
Some(DeliveryInstructions::Local { message }) => message,
_ => panic!("invalid delivery instructions"),
};
assert_eq!(message.message_type, MessageType::ShortTunnelBuild);
assert_eq!(message.payload[1..].len() % 218, 0);
let (ibgw_keys, _) = {
let mut ibgw_session = ibgw_noise.create_short_inbound_session(
EphemeralPublicKey::try_from_bytes(
pending.hops()[0].outbound_session().ephemeral_key(),
)
.unwrap(),
);
let router_id = ibgw_router_hash;
let (idx, record) = message.payload[1..]
.chunks_mut(218)
.enumerate()
.find(|(_, chunk)| &chunk[..16] == &router_id[..16])
.unwrap();
let _decrypted_record = ibgw_session.decrypt_build_record(record[48..].to_vec());
ibgw_session.create_tunnel_keys(HopRole::InboundGateway).unwrap();
record[48] = 0x00;
record[49] = 0x00;
record[201] = 0x00;
ibgw_session.encrypt_build_records(&mut message.payload, idx).unwrap();
let keys = ibgw_session.finalize().unwrap();
let msg = MessageBuilder::standard()
.with_message_type(MessageType::ShortTunnelBuild)
.with_message_id(MockRuntime::rng().next_u32())
.with_expiration(MockRuntime::time_since_epoch() + Duration::from_secs(5))
.with_payload(&message.payload)
.build();
let message = Message::parse_standard(&msg).unwrap();
(keys, pending.try_build_tunnel(message).unwrap())
};
let (_msg_tx, msg_rx) = channel(64);
let tunnel = InboundGateway::<MockRuntime>::new(
TunnelId::random(),
TunnelId::random(),
RouterId::random(),
ibgw_keys,
subsys_handle,
MockRuntime::register_metrics(vec![], None),
msg_rx,
event_handle.clone(),
);
let message = MessageBuilder::standard()
.with_expiration(MockRuntime::time_since_epoch() - Duration::from_secs(5))
.with_message_type(MessageType::DatabaseLookup)
.with_message_id(MockRuntime::rng().next_u32())
.with_payload(&vec![1, 2, 3, 4])
.build();
let tunnel_gateway = TunnelGateway {
tunnel_id: tunnel.tunnel_id,
payload: &message,
};
match tunnel.handle_tunnel_gateway(&tunnel_gateway) {
Err(Error::Expired) => {}
_ => panic!("invalid result"),
};
}
#[tokio::test]
async fn invalid_tunnel_gateway_payload() {
let (_event_mgr, _event_subscriber, event_handle) =
EventManager::new(None, MockRuntime::register_metrics(vec![], None));
let (ibgw_router_hash, ibgw_static_key, _, ibgw_noise, ibgw_router_info) =
make_router(false);
let mut ibgw_garlic = GarlicHandler::<MockRuntime>::new(
ibgw_noise.clone(),
MockRuntime::register_metrics(vec![], None),
);
let (_ibep_router_hash, _ibep_public_key, _, ibep_noise, _ibep_router_info) =
make_router(false);
let (subsys_handle, _event_rx) = SubsystemHandle::new();
let (_tx, rx) = channel(64);
let TunnelPoolBuildParameters {
context_handle: handle,
..
} = TunnelPoolBuildParameters::new(Default::default());
let (pending, router_id, message) =
PendingTunnel::<_, InboundTunnel<MockRuntime>>::create_tunnel(TunnelBuildParameters {
hops: vec![(ibgw_router_hash.clone(), ibgw_static_key.public())],
metrics_handle: MockRuntime::register_metrics(vec![], None),
name: Str::from("tunnel-pool"),
noise: ibep_noise.clone(),
message_id: MessageId::from(MockRuntime::rng().next_u32()),
tunnel_info: TunnelInfo::Inbound {
tunnel_id: TunnelId::random(),
router_id: Bytes::from(RouterId::random().to_vec()),
},
receiver: ReceiverKind::Inbound {
message_rx: rx,
handle,
},
})
.unwrap();
assert_eq!(router_id, ibgw_router_info.identity.id());
assert_eq!(message.message_type, MessageType::Garlic);
let mut message = match ibgw_garlic.handle_message(message).unwrap().next() {
Some(DeliveryInstructions::Local { message }) => message,
_ => panic!("invalid delivery instructions"),
};
assert_eq!(message.message_type, MessageType::ShortTunnelBuild);
assert_eq!(message.payload[1..].len() % 218, 0);
let (ibgw_keys, _) = {
let mut ibgw_session = ibgw_noise.create_short_inbound_session(
EphemeralPublicKey::try_from_bytes(
pending.hops()[0].outbound_session().ephemeral_key(),
)
.unwrap(),
);
let router_id = ibgw_router_hash;
let (idx, record) = message.payload[1..]
.chunks_mut(218)
.enumerate()
.find(|(_, chunk)| &chunk[..16] == &router_id[..16])
.unwrap();
let _decrypted_record = ibgw_session.decrypt_build_record(record[48..].to_vec());
ibgw_session.create_tunnel_keys(HopRole::InboundGateway).unwrap();
record[48] = 0x00;
record[49] = 0x00;
record[201] = 0x00;
ibgw_session.encrypt_build_records(&mut message.payload, idx).unwrap();
let keys = ibgw_session.finalize().unwrap();
let msg = MessageBuilder::standard()
.with_message_type(MessageType::ShortTunnelBuild)
.with_message_id(MockRuntime::rng().next_u32())
.with_expiration(MockRuntime::time_since_epoch() + Duration::from_secs(5))
.with_payload(&message.payload)
.build();
let message = Message::parse_standard(&msg).unwrap();
(keys, pending.try_build_tunnel(message).unwrap())
};
let (_msg_tx, msg_rx) = channel(64);
let tunnel = InboundGateway::<MockRuntime>::new(
TunnelId::random(),
TunnelId::random(),
RouterId::random(),
ibgw_keys,
subsys_handle,
MockRuntime::register_metrics(vec![], None),
msg_rx,
event_handle.clone(),
);
let tunnel_gateway = TunnelGateway {
tunnel_id: tunnel.tunnel_id,
payload: &vec![0xaa, 0xaa, 0xaa],
};
match tunnel.handle_tunnel_gateway(&tunnel_gateway) {
Err(Error::InvalidData) => {}
_ => panic!("invalid result"),
};
}
}