use crate::{
i2np::{tunnel::gateway::TunnelGateway, Message},
primitives::TunnelId,
runtime::Runtime,
subsystem::SubsystemHandle,
tunnel::pool::TUNNEL_BUILD_EXPIRATION,
};
use futures::FutureExt;
use futures_channel::oneshot;
use thingbuf::mpsc;
use core::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
const LOG_TARGET: &str = "emissary::tunnel::pool::zero-hop";
pub struct ZeroHopInboundTunnel<R: Runtime> {
expiration_timer: R::Timer,
message_rx: mpsc::Receiver<Message>,
reply_tx: Option<oneshot::Sender<Message>>,
subsyste_handle: SubsystemHandle,
tunnel_id: TunnelId,
}
impl<R: Runtime> ZeroHopInboundTunnel<R> {
pub fn new(subsyste_handle: SubsystemHandle) -> (TunnelId, Self, oneshot::Receiver<Message>) {
let (tunnel_id, message_rx) = subsyste_handle.insert_tunnel::<1>(&mut R::rng());
let (tx, rx) = oneshot::channel();
(
tunnel_id,
Self {
expiration_timer: R::timer(TUNNEL_BUILD_EXPIRATION),
message_rx,
reply_tx: Some(tx),
subsyste_handle,
tunnel_id,
},
rx,
)
}
fn on_message(&mut self, message: Message) {
tracing::trace!(
target: LOG_TARGET,
tunnel_id = %self.tunnel_id,
message_type = ?message.message_type,
"handle message",
);
let Some(TunnelGateway { payload, .. }) = TunnelGateway::parse(&message.payload) else {
tracing::warn!(
target: LOG_TARGET,
tunnel_id = %self.tunnel_id,
message_type = ?message.message_type,
"invalid message, expected `TunnelGateway`",
);
return;
};
match Message::parse_standard(payload) {
Ok(message) => {
self.reply_tx.take().map(|tx| tx.send(message));
}
Err(error) => {
tracing::warn!(
target: LOG_TARGET,
tunnel_id = %self.tunnel_id,
message_type = ?message.message_type,
?error,
"invalid message, expected standard i2np message",
);
}
}
}
}
impl<R: Runtime> Future for ZeroHopInboundTunnel<R> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.message_rx.poll_recv(cx) {
Poll::Pending => {}
Poll::Ready(None) => {
tracing::debug!(
target: LOG_TARGET,
zero_hop_tunnel = %self.tunnel_id,
"channel closed while waiting for build response",
);
self.subsyste_handle.remove_tunnel(&self.tunnel_id);
return Poll::Ready(());
}
Poll::Ready(Some(message)) => {
self.on_message(message);
self.subsyste_handle.remove_tunnel(&self.tunnel_id);
return Poll::Ready(());
}
}
if self.expiration_timer.poll_unpin(cx).is_ready() {
tracing::trace!(
target: LOG_TARGET,
zero_hop_tunnel = %self.tunnel_id,
"zero-hop tunnel expired before reply",
);
self.subsyste_handle.remove_tunnel(&self.tunnel_id);
return Poll::Ready(());
}
Poll::Pending
}
}