use crate::{
events::EventHandle,
i2np::{tunnel::data::EncryptedTunnelData, Message, MessageType},
primitives::{RouterId, TunnelId},
runtime::{Counter, Gauge, Instant, MetricsHandle, Runtime},
subsystem::SubsystemHandle,
tunnel::{
metrics::{
NUM_DROPPED_MESSAGES, NUM_PARTICIPANTS, NUM_ROUTED_MESSAGES, NUM_TERMINATED,
NUM_TRANSIT_TUNNELS, TOTAL_TRANSIT_TUNNELS,
},
noise::TunnelKeys,
transit::{TransitTunnel, TERMINATION_TIMEOUT, TRANSIT_TUNNEL_EXPIRATION},
},
};
use bytes::{BufMut, BytesMut};
use futures::FutureExt;
use rand::Rng;
use thingbuf::mpsc::Receiver;
use core::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
const LOG_TARGET: &str = "emissary::tunnel::transit::participant";
pub struct Participant<R: Runtime> {
event_handle: EventHandle<R>,
expiration_timer: R::Timer,
inbound_bandwidth: usize,
message_rx: Receiver<Message>,
metrics_handle: R::MetricsHandle,
next_router: RouterId,
next_tunnel_id: TunnelId,
outbound_bandwidth: usize,
started: Option<R::Instant>,
subsystem_handle: SubsystemHandle,
tunnel_id: TunnelId,
tunnel_keys: TunnelKeys,
}
impl<R: Runtime> Participant<R> {
fn handle_tunnel_data(
&mut self,
tunnel_data: &EncryptedTunnelData,
) -> crate::Result<(RouterId, Message)> {
tracing::trace!(
target: LOG_TARGET,
tunnel_id = %self.tunnel_id,
"participant tunnel data",
);
let (ciphertext, iv) = self.tunnel_keys.decrypt_record(tunnel_data);
let mut out = BytesMut::with_capacity(4 + 16 + ciphertext.len());
out.put_u32(self.next_tunnel_id.into());
out.put_slice(&iv);
out.put_slice(&ciphertext);
let message = Message {
message_type: MessageType::TunnelData,
message_id: R::rng().next_u32(),
expiration: R::time_since_epoch() + Duration::from_secs(8),
payload: out.to_vec(),
};
Ok((self.next_router.clone(), message))
}
}
impl<R: Runtime> TransitTunnel<R> for Participant<R> {
fn new(
tunnel_id: TunnelId,
next_tunnel_id: TunnelId,
next_router: RouterId,
tunnel_keys: TunnelKeys,
subsystem_handle: SubsystemHandle,
metrics_handle: R::MetricsHandle,
message_rx: Receiver<Message>,
event_handle: EventHandle<R>,
) -> Self {
metrics_handle.gauge(NUM_PARTICIPANTS).increment(1);
metrics_handle.gauge(NUM_TRANSIT_TUNNELS).increment(1);
metrics_handle.counter(TOTAL_TRANSIT_TUNNELS).increment(1);
Participant {
event_handle,
expiration_timer: R::timer(TRANSIT_TUNNEL_EXPIRATION),
inbound_bandwidth: 0usize,
message_rx,
metrics_handle,
next_router,
next_tunnel_id,
outbound_bandwidth: 0usize,
started: Some(R::now()),
subsystem_handle,
tunnel_id,
tunnel_keys,
}
}
}
impl<R: Runtime> Future for Participant<R> {
type Output = TunnelId;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
while let Poll::Ready(event) = self.message_rx.poll_recv(cx) {
match event {
None => {
tracing::warn!(
target: LOG_TARGET,
tunnel_id = %self.tunnel_id,
"message channel closed",
);
self.subsystem_handle.remove_tunnel(&self.tunnel_id);
self.metrics_handle.gauge(NUM_PARTICIPANTS).decrement(1);
return Poll::Ready(self.tunnel_id);
}
Some(message) => {
self.inbound_bandwidth += message.serialized_len_short();
let MessageType::TunnelData = message.message_type else {
tracing::warn!(
target: LOG_TARGET,
tunnel_id = %self.tunnel_id,
message_type = ?message.message_type,
"unsupported message",
);
self.metrics_handle.counter(NUM_DROPPED_MESSAGES).increment(1);
continue;
};
let Some(message) = EncryptedTunnelData::parse(&message.payload) else {
tracing::warn!(
target: LOG_TARGET,
tunnel_id = %self.tunnel_id,
"failed to parse TunnelData message",
);
self.metrics_handle.counter(NUM_DROPPED_MESSAGES).increment(1);
continue;
};
match self.handle_tunnel_data(&message) {
Ok((router, message)) => {
self.outbound_bandwidth += message.serialized_len_short();
match self.subsystem_handle.send(&router, message) {
Ok(()) => {
self.metrics_handle.counter(NUM_ROUTED_MESSAGES).increment(1);
}
Err(error) => {
tracing::error!(
target: LOG_TARGET,
tunnel_id = %self.tunnel_id,
?error,
"failed to send message",
);
self.metrics_handle.counter(NUM_DROPPED_MESSAGES).increment(1);
}
}
}
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
tunnel_id = %self.tunnel_id,
?error,
"failed to parse TunnelData message",
);
self.metrics_handle.counter(NUM_DROPPED_MESSAGES).increment(1);
}
}
}
}
}
if let Some(ref started) = self.started {
if started.elapsed() > TERMINATION_TIMEOUT {
self.started = None;
if self.inbound_bandwidth == 0 && self.outbound_bandwidth == 0 {
tracing::debug!(
target: LOG_TARGET,
tunnel_id = %self.tunnel_id,
"shutting down tunnel after 2 minutes of inactivity",
);
self.subsystem_handle.remove_tunnel(&self.tunnel_id);
self.metrics_handle.gauge(NUM_PARTICIPANTS).decrement(1);
self.metrics_handle.gauge(NUM_TRANSIT_TUNNELS).decrement(1);
self.metrics_handle.counter(NUM_TERMINATED).increment(1);
return Poll::Ready(self.tunnel_id);
}
}
}
if self.event_handle.poll_unpin(cx).is_ready() {
self.event_handle.transit_inbound_bandwidth(self.inbound_bandwidth);
self.event_handle.transit_outbound_bandwidth(self.outbound_bandwidth);
self.inbound_bandwidth = 0;
self.outbound_bandwidth = 0;
}
if self.expiration_timer.poll_unpin(cx).is_ready() {
self.subsystem_handle.remove_tunnel(&self.tunnel_id);
self.metrics_handle.gauge(NUM_PARTICIPANTS).decrement(1);
self.metrics_handle.gauge(NUM_TRANSIT_TUNNELS).decrement(1);
return Poll::Ready(self.tunnel_id);
}
Poll::Pending
}
}