use crate::{
error::{ChannelError, RoutingError},
i2np::{Message, MessageType},
primitives::{Lease, MessageId, RouterId, TunnelId},
tunnel::pool::{TunnelPoolConfig, TunnelPoolEvent, TunnelPoolHandle},
};
use bytes::Bytes;
use futures::Stream;
use futures_channel::oneshot;
use hashbrown::HashMap;
use rand::CryptoRng;
use thingbuf::mpsc;
#[cfg(feature = "std")]
use parking_lot::RwLock;
#[cfg(feature = "no_std")]
use spin::rwlock::RwLock;
use alloc::{sync::Arc, vec::Vec};
use core::{
pin::Pin,
task::{Context, Poll},
};
/// Logging target used by the `tracing` calls in this file.
const LOG_TARGET: &str = "emissary::tunnel::context";

/// Capacity passed to `mpsc::with_recycle()` when the [`TunnelMessage`] channel is created.
const TUNNEL_CHANNEL_SIZE: usize = 4096;
/// Bookkeeping for pending message listeners.
///
/// Shared behind an `Arc<RwLock<..>>` between [`TunnelPoolContext`] and every
/// [`TunnelPoolContextHandle`] created from it.
#[derive(Default)]
struct MessageListeners {
/// Pending listeners, keyed by the message ID they are waiting for.
listeners: HashMap<MessageId, oneshot::Sender<Message>>,
/// Garlic tag -> message ID of the listener registered for that tag.
garlic_tags: HashMap<Bytes, MessageId>,
/// Message ID -> garlic tag, the reverse of `garlic_tags`.
///
/// Kept so that removing a listener by message ID can also remove its garlic tag.
message_ids: HashMap<MessageId, Bytes>,
}
/// Cloneable handle to a tunnel pool's context.
///
/// Shares the listener table and both channel senders with the [`TunnelPoolContext`]
/// it was created from.
#[derive(Clone)]
pub struct TunnelPoolContextHandle {
/// Listener table shared with [`TunnelPoolContext`].
listeners: Arc<RwLock<MessageListeners>>,
/// TX channel for [`TunnelPoolEvent`]s.
event_tx: mpsc::Sender<TunnelPoolEvent>,
/// TX channel for [`TunnelMessage`]s; the receiving end is held by [`TunnelPoolContext`].
tx: mpsc::Sender<TunnelMessage, TunnelMessageRecycle>,
}
impl TunnelPoolContextHandle {
pub fn send_to_router_with_feedback(
&self,
gateway: TunnelId,
router_id: RouterId,
message: Vec<u8>,
feedback_tx: oneshot::Sender<()>,
) -> Result<(), ChannelError> {
self.tx
.try_send(TunnelMessage::RouterDelivery {
gateway,
router_id,
message,
feedback_tx: Some(feedback_tx),
})
.map_err(From::from)
}
#[allow(unused)]
pub fn send_to_tunnel(
&self,
gateway: RouterId,
tunnel_id: TunnelId,
message: Vec<u8>,
) -> Result<(), ChannelError> {
self.tx
.try_send(TunnelMessage::TunnelDelivery {
gateway,
tunnel_id,
message,
})
.map_err(From::from)
}
pub fn route_message(&self, message: Message) -> Result<(), RoutingError> {
let mut inner = self.listeners.write();
let message_id = MessageId::from(message.message_id);
let message_id = match message.message_type {
MessageType::Garlic => {
let garlic_tag = Bytes::from(message.payload[4..12].to_vec());
match inner.garlic_tags.remove(&garlic_tag) {
Some(message_id) => {
let _value = inner.message_ids.remove(&message_id);
debug_assert!(_value == Some(garlic_tag));
message_id
}
None => match inner.listeners.remove(&message_id) {
Some(listener) =>
return listener.send(message).map_err(RoutingError::ChannelClosed),
None =>
return self
.event_tx
.try_send(TunnelPoolEvent::Message { message })
.map_err(|error| {
tracing::warn!(
target: LOG_TARGET,
?message_id,
?error,
"failed to route garlic message to client destination",
);
match error {
mpsc::errors::TrySendError::Full(
TunnelPoolEvent::Message { message },
) => RoutingError::ChannelFull(message),
mpsc::errors::TrySendError::Closed(
TunnelPoolEvent::Message { message },
) => RoutingError::ChannelClosed(message),
_ => unreachable!(),
}
}),
},
}
}
MessageType::DatabaseStore
| MessageType::DatabaseLookup
| MessageType::DatabaseSearchReply => {
return self.event_tx.try_send(TunnelPoolEvent::Message { message }).map_err(
|error| {
tracing::warn!(
target: LOG_TARGET,
?message_id,
?error,
"failed to route netdb message",
);
match error {
mpsc::errors::TrySendError::Full(TunnelPoolEvent::Message {
message,
}) => RoutingError::ChannelFull(message),
mpsc::errors::TrySendError::Closed(TunnelPoolEvent::Message {
message,
}) => RoutingError::ChannelClosed(message),
_ => unreachable!(),
}
},
);
}
MessageType::OutboundTunnelBuildReply => {
inner
.message_ids
.remove(&message_id)
.map(|garlic_tag| inner.garlic_tags.remove(&garlic_tag));
message_id
}
_ => message_id,
};
match inner.listeners.remove(&message_id) {
Some(listener) => listener.send(message).map_err(RoutingError::ChannelClosed),
None =>
self.tx
.try_send(TunnelMessage::Inbound { message })
.map_err(|error| match error {
mpsc::errors::TrySendError::Full(TunnelMessage::Inbound { message }) =>
RoutingError::ChannelFull(message),
mpsc::errors::TrySendError::Closed(TunnelMessage::Inbound { message }) =>
RoutingError::ChannelClosed(message),
_ => unreachable!(),
}),
}
}
pub fn add_listener(
&self,
rng: &mut impl CryptoRng,
) -> (MessageId, oneshot::Receiver<Message>) {
let mut inner = self.listeners.write();
let (tx, rx) = oneshot::channel();
loop {
let message_id = MessageId::from(rng.next_u32());
if !inner.listeners.contains_key(&message_id) {
inner.listeners.insert(message_id, tx);
return (message_id, rx);
}
}
}
pub fn add_garlic_listener(&self, message_id: MessageId, garlic_tag: Bytes) {
let mut inner = self.listeners.write();
debug_assert!(inner.listeners.contains_key(&message_id));
inner.garlic_tags.insert(garlic_tag.clone(), message_id);
inner.message_ids.insert(message_id, garlic_tag);
}
pub fn remove_listener(&self, message_id: &MessageId) {
let mut inner = self.listeners.write();
inner.listeners.remove(message_id);
inner
.message_ids
.remove(message_id)
.map(|garlic_tag| inner.garlic_tags.remove(&garlic_tag));
}
}
/// Recycling policy for the [`TunnelMessage`] channel.
///
/// Fills new and recycled channel slots with [`TunnelMessage::Dummy`].
#[derive(Debug, Default, Clone)]
pub struct TunnelMessageRecycle(());
impl thingbuf::Recycle<TunnelMessage> for TunnelMessageRecycle {
    /// Create the placeholder value stored in an unused channel slot.
    fn new_element(&self) -> TunnelMessage {
        TunnelMessage::Dummy
    }

    /// Reset a used channel slot back to the placeholder, dropping the old message.
    fn recycle(&self, element: &mut TunnelMessage) {
        drop(core::mem::replace(element, TunnelMessage::Dummy));
    }
}
/// Message sent over the channel whose receiving end is held by [`TunnelPoolContext`].
#[derive(Default)]
pub enum TunnelMessage {
/// I2NP message for which no listener was registered.
///
/// Produced by [`TunnelPoolContextHandle::route_message()`].
Inbound {
/// I2NP message.
message: Message,
},
/// Message addressed to a router.
///
/// Produced by [`TunnelPoolContextHandle::send_to_router_with_feedback()`].
RouterDelivery {
/// Tunnel ID of the gateway (presumably the outbound tunnel to use; confirm with consumer).
gateway: TunnelId,
/// ID of the recipient router.
router_id: RouterId,
/// Serialized message.
message: Vec<u8>,
/// Optional feedback channel.
///
/// NOTE(review): presumably signaled once the message has been sent; confirm with the
/// consumer of this channel.
feedback_tx: Option<oneshot::Sender<()>>,
},
/// Message addressed to a tunnel.
///
/// Produced by [`TunnelPoolContextHandle::send_to_tunnel()`].
TunnelDelivery {
/// Router ID of the gateway (presumably of the recipient tunnel; confirm with consumer).
gateway: RouterId,
/// ID of the recipient tunnel.
tunnel_id: TunnelId,
/// Serialized message.
message: Vec<u8>,
},
/// Message addressed to a router, with an optional outbound tunnel selection.
///
/// NOTE(review): not constructed in this file; presumably `None` lets the consumer pick
/// the outbound tunnel. Confirm with the producer.
RouterDeliveryViaRoute {
/// ID of the recipient router.
router_id: RouterId,
/// Outbound tunnel to use, if selected.
outbound_tunnel: Option<TunnelId>,
/// Serialized message.
message: Vec<u8>,
},
/// Message addressed to a tunnel, with an optional outbound tunnel selection.
///
/// NOTE(review): not constructed in this file; presumably `None` lets the consumer pick
/// the outbound tunnel. Confirm with the producer.
TunnelDeliveryViaRoute {
/// Router ID (presumably the gateway of the recipient tunnel; confirm with producer).
router_id: RouterId,
/// ID of the recipient tunnel.
tunnel_id: TunnelId,
/// Outbound tunnel to use, if selected.
outbound_tunnel: Option<TunnelId>,
/// Serialized message.
message: Vec<u8>,
},
/// Placeholder stored in empty channel slots by [`TunnelMessageRecycle`].
#[default]
Dummy,
}
/// Tunnel pool context.
///
/// Holds the receiving end of the [`TunnelMessage`] channel and is polled as a `Stream`
/// of [`TunnelMessage`]s.
pub struct TunnelPoolContext {
/// Listener table shared with every [`TunnelPoolContextHandle`].
listeners: Arc<RwLock<MessageListeners>>,
/// TX channel for [`TunnelPoolEvent`]s.
event_tx: mpsc::Sender<TunnelPoolEvent>,
/// RX channel for [`TunnelMessage`]s.
rx: mpsc::Receiver<TunnelMessage, TunnelMessageRecycle>,
/// TX channel for [`TunnelMessage`]s, cloned into each [`TunnelPoolContextHandle`].
tx: mpsc::Sender<TunnelMessage, TunnelMessageRecycle>,
}
impl TunnelPoolContext {
pub fn add_listener(
&self,
rng: &mut impl CryptoRng,
) -> (MessageId, oneshot::Receiver<Message>) {
let mut inner = self.listeners.write();
let (tx, rx) = oneshot::channel();
loop {
let message_id = MessageId::from(rng.next_u32());
if !inner.listeners.contains_key(&message_id) {
inner.listeners.insert(message_id, tx);
return (message_id, rx);
}
}
}
pub fn remove_listener(&self, message_id: &MessageId) {
let mut inner = self.listeners.write();
inner.listeners.remove(message_id);
inner
.message_ids
.remove(message_id)
.map(|garlic_tag| inner.garlic_tags.remove(&garlic_tag));
}
pub fn context_handle(&self) -> TunnelPoolContextHandle {
TunnelPoolContextHandle {
listeners: Arc::clone(&self.listeners),
event_tx: self.event_tx.clone(),
tx: self.tx.clone(),
}
}
pub fn register_tunnel_pool_shut_down(&self) -> Result<(), ChannelError> {
self.event_tx.try_send(TunnelPoolEvent::TunnelPoolShutDown).map_err(From::from)
}
pub fn register_inbound_tunnel_built(
&self,
tunnel_id: TunnelId,
lease: Lease,
) -> Result<(), ChannelError> {
self.event_tx
.try_send(TunnelPoolEvent::InboundTunnelBuilt { tunnel_id, lease })
.map_err(From::from)
}
pub fn register_outbound_tunnel_built(&self, tunnel_id: TunnelId) -> Result<(), ChannelError> {
self.event_tx
.try_send(TunnelPoolEvent::OutboundTunnelBuilt { tunnel_id })
.map_err(From::from)
}
pub fn register_inbound_tunnel_expired(&self, tunnel_id: TunnelId) -> Result<(), ChannelError> {
self.event_tx
.try_send(TunnelPoolEvent::InboundTunnelExpired { tunnel_id })
.map_err(From::from)
}
pub fn register_outbound_tunnel_expired(
&self,
tunnel_id: TunnelId,
) -> Result<(), ChannelError> {
self.event_tx
.try_send(TunnelPoolEvent::OutboundTunnelExpired { tunnel_id })
.map_err(From::from)
}
pub fn register_expiring_inbound_tunnel(
&self,
tunnel_id: TunnelId,
) -> Result<(), ChannelError> {
self.event_tx
.try_send(TunnelPoolEvent::InboundTunnelExpiring { tunnel_id })
.map_err(From::from)
}
pub fn register_expiring_outbound_tunnel(
&self,
tunnel_id: TunnelId,
) -> Result<(), ChannelError> {
self.event_tx
.try_send(TunnelPoolEvent::OutboundTunnelExpiring { tunnel_id })
.map_err(From::from)
}
}
impl Stream for TunnelPoolContext {
    type Item = TunnelMessage;

    /// Poll the [`TunnelMessage`] channel for the next message.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // only shared access to the receiver is needed so the pin can simply be dereferenced
        let Self { rx, .. } = &*self;

        rx.poll_recv(cx)
    }
}
/// Everything created by [`TunnelPoolBuildParameters::new()`] for one tunnel pool.
pub struct TunnelPoolBuildParameters {
/// Tunnel pool configuration.
pub config: TunnelPoolConfig,
/// Tunnel pool context, holding the receiving end of the [`TunnelMessage`] channel.
pub context: TunnelPoolContext,
/// Handle sharing listeners and channels with `context`.
pub context_handle: TunnelPoolContextHandle,
/// Shutdown receiver returned by `TunnelPoolHandle::new()`.
pub shutdown_rx: oneshot::Receiver<()>,
/// Tunnel pool handle.
pub tunnel_pool_handle: TunnelPoolHandle,
}
impl TunnelPoolBuildParameters {
    /// Create the channels, listener table, contexts and pool handle for `config`.
    ///
    /// The returned `context` and `context_handle` share the same listener table, event
    /// channel and [`TunnelMessage`] channel.
    pub fn new(config: TunnelPoolConfig) -> Self {
        let (tx, rx) = mpsc::with_recycle(TUNNEL_CHANNEL_SIZE, TunnelMessageRecycle::default());
        let (tunnel_pool_handle, event_tx, shutdown_rx) =
            TunnelPoolHandle::new(config.clone(), tx.clone());

        let context_handle = TunnelPoolContextHandle {
            listeners: Arc::new(RwLock::new(MessageListeners::default())),
            event_tx,
            tx,
        };
        let context = TunnelPoolContext {
            listeners: Arc::clone(&context_handle.listeners),
            event_tx: context_handle.event_tx.clone(),
            rx,
            tx: context_handle.tx.clone(),
        };

        Self {
            config,
            context,
            context_handle,
            shutdown_rx,
            tunnel_pool_handle,
        }
    }
}