use crate::{
crypto::StaticPrivateKey,
i2cp::{
message::{
BandwidthLimits, Message, RequestVariableLeaseSet, SessionId, SessionStatus,
SessionStatusKind, SetDate,
},
socket::I2cpSocket,
},
primitives::{Date, DestinationId, Lease, Mapping, Str, TunnelId},
profile::ProfileStorage,
runtime::{AddressBook, Runtime},
tunnel::{TunnelManagerHandle, TunnelPoolConfig, TunnelPoolEvent, TunnelPoolHandle},
};
use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt, StreamExt};
use hashbrown::{HashMap, HashSet};
use alloc::{boxed::Box, string::ToString, sync::Arc, vec::Vec};
use core::{
fmt,
future::Future,
mem,
pin::Pin,
str::FromStr,
task::{Context, Poll},
};
/// Logging target for the file.
const LOG_TARGET: &str = "emissary::i2cp::pending-session";
/// Context of an I2CP session.
///
/// Produced by [`PendingI2cpSession`] when the client answers the lease set request with
/// `CreateLeaseSet2`.
pub struct I2cpSessionContext<R: Runtime> {
/// Address book, if one was given to the pending session.
pub address_book: Option<Arc<dyn AddressBook>>,
/// Destination ID, converted from the `key` field of `CreateLeaseSet2`.
pub destination_id: DestinationId,
/// Inbound tunnels collected while the session was pending, and their leases.
pub inbound: HashMap<TunnelId, Lease>,
/// Lease set received in `CreateLeaseSet2`.
pub leaseset: Bytes,
/// Session options received in `CreateSession`, with `inbound.nickname` filled in if the
/// client did not specify it.
pub options: Mapping,
/// Outbound tunnels collected while the session was pending.
pub outbound: HashSet<TunnelId>,
/// Private keys received in `CreateLeaseSet2`.
pub private_keys: Vec<StaticPrivateKey>,
/// Profile storage.
pub profile_storage: ProfileStorage<R>,
/// Session ID.
pub session_id: u16,
/// I2CP socket of the client.
pub socket: I2cpSocket<R>,
/// Handle of the tunnel pool created for the session.
pub tunnel_pool_handle: TunnelPoolHandle,
}
/// State of a pending I2CP session.
///
/// A session starts as `Inactive` and moves through `BuildingPool` and `BuildingTunnels` to
/// `AwaitingLeaseSet`. Every live state owns the client socket and the session ID.
enum PendingSessionState<R: Runtime> {
/// No session has been requested yet, waiting for `CreateSession`.
Inactive {
/// Session ID.
session_id: u16,
/// I2CP socket of the client.
socket: I2cpSocket<R>,
},
/// Tunnel pool has been requested from the tunnel manager and is being built.
BuildingPool {
/// Session ID.
session_id: u16,
/// I2CP socket of the client.
socket: I2cpSocket<R>,
/// Session options.
options: Mapping,
/// Future which resolves to the handle of the session's tunnel pool.
tunnel_pool_future: BoxFuture<'static, TunnelPoolHandle>,
},
/// Tunnel pool exists, waiting for its tunnels to be built.
BuildingTunnels {
/// Session ID.
session_id: u16,
/// I2CP socket of the client.
socket: I2cpSocket<R>,
/// Session options.
options: Mapping,
/// Tunnel pool handle, polled for tunnel pool events.
handle: TunnelPoolHandle,
/// Inbound tunnels built so far and their leases.
inbound: HashMap<TunnelId, Lease>,
/// Outbound tunnels built so far.
outbound: HashSet<TunnelId>,
},
/// Lease set has been requested from the client, waiting for `CreateLeaseSet2`.
AwaitingLeaseSet {
/// Session ID.
session_id: u16,
/// I2CP socket of the client.
socket: I2cpSocket<R>,
/// Session options.
options: Mapping,
/// Tunnel pool handle.
handle: TunnelPoolHandle,
/// Inbound tunnels of the session and their leases.
inbound: HashMap<TunnelId, Lease>,
/// Outbound tunnels of the session.
outbound: HashSet<TunnelId>,
},
/// Placeholder left behind while the real state is temporarily moved out with
/// `mem::replace()`. Calling `socket()` or `session_id()` on this state panics.
Poisoned,
}
impl<R: Runtime> fmt::Debug for PendingSessionState<R> {
    /// Print the variant name together with the session ID, if the variant has one.
    ///
    /// The remaining fields (socket, options, tunnel bookkeeping) are intentionally elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (variant, id) = match self {
            Self::Inactive { session_id, .. } =>
                ("PendingSessionState::Inactive", Some(session_id)),
            Self::BuildingPool { session_id, .. } =>
                ("PendingSessionState::BuildingPool", Some(session_id)),
            Self::BuildingTunnels { session_id, .. } =>
                ("PendingSessionState::BuildingTunnels", Some(session_id)),
            Self::AwaitingLeaseSet { session_id, .. } =>
                ("PendingSessionState::AwaitingLeaseSet", Some(session_id)),
            Self::Poisoned => ("PendingSessionState::Poisoned", None),
        };

        let mut output = f.debug_struct(variant);
        if let Some(id) = id {
            output.field("session_id", id);
        }
        output.finish_non_exhaustive()
    }
}
impl<R: Runtime> PendingSessionState<R> {
    /// Get mutable reference to the I2CP socket owned by the current state.
    ///
    /// # Panics
    ///
    /// Panics if the state is `Poisoned`.
    fn socket(&mut self) -> &mut I2cpSocket<R> {
        match self {
            Self::Poisoned => unreachable!(),
            Self::Inactive { socket, .. }
            | Self::BuildingPool { socket, .. }
            | Self::BuildingTunnels { socket, .. }
            | Self::AwaitingLeaseSet { socket, .. } => socket,
        }
    }

    /// Get the session ID stored in the current state.
    ///
    /// # Panics
    ///
    /// Panics if the state is `Poisoned`.
    fn session_id(&self) -> u16 {
        let id = match self {
            Self::Poisoned => unreachable!(),
            Self::Inactive { session_id, .. }
            | Self::BuildingPool { session_id, .. }
            | Self::BuildingTunnels { session_id, .. }
            | Self::AwaitingLeaseSet { session_id, .. } => session_id,
        };

        *id
    }
}
/// I2CP session which has not been fully set up yet.
///
/// Implements `Future` whose output is `Option<I2cpSessionContext<R>>`.
pub struct PendingI2cpSession<R: Runtime> {
/// Address book, cloned into the session context when the session is ready.
address_book: Option<Arc<dyn AddressBook>>,
/// Profile storage, cloned into the session context when the session is ready.
profile_storage: ProfileStorage<R>,
/// Current state of the pending session.
state: PendingSessionState<R>,
/// Handle to the tunnel manager, used to create the tunnel pool of the session.
tunnel_manager_handle: TunnelManagerHandle,
}
impl<R: Runtime> PendingI2cpSession<R> {
    /// Create new [`PendingI2cpSession`].
    ///
    /// The session starts in the inactive state and owns `socket` until it either completes
    /// or is dropped.
    pub fn new(
        session_id: u16,
        socket: I2cpSocket<R>,
        tunnel_manager_handle: TunnelManagerHandle,
        address_book: Option<Arc<dyn AddressBook>>,
        profile_storage: ProfileStorage<R>,
    ) -> Self {
        Self {
            address_book,
            profile_storage,
            state: PendingSessionState::Inactive { session_id, socket },
            tunnel_manager_handle,
        }
    }

    /// Handle `message` received from the client.
    ///
    /// Returns `Some(I2cpSessionContext)` if `message` was the `CreateLeaseSet2` the session
    /// was waiting for, which completes the pending session. Returns `None` for every other
    /// message and for messages received in a state where they are not expected.
    fn on_message(&mut self, message: Message) -> Option<I2cpSessionContext<R>> {
        match message {
            Message::GetDate { version, options } => {
                tracing::trace!(
                    target: LOG_TARGET,
                    session_id = ?self.state.session_id(),
                    %version,
                    ?options,
                    "get date, send set date",
                );

                self.state.socket().send_message(SetDate::new(
                    Date::new(R::time_since_epoch().as_millis() as u64),
                    Str::from_str("0.9.63").expect("to succeed"),
                ));
            }
            Message::GetBandwidthLimits => {
                tracing::trace!(
                    target: LOG_TARGET,
                    session_id = ?self.state.session_id(),
                    "handle bandwidth limit request",
                );

                self.state.socket().send_message(BandwidthLimits::new());
            }
            Message::DestroySession { session_id } => {
                tracing::trace!(
                    target: LOG_TARGET,
                    session_id = ?self.state.session_id(),
                    destroyed_session_id = ?session_id,
                    "destroy session",
                );

                self.state
                    .socket()
                    .send_message(SessionStatus::new(session_id, SessionStatusKind::Destroyed));
            }
            Message::CreateSession {
                destination,
                date,
                mut options,
            } => match mem::replace(&mut self.state, PendingSessionState::Poisoned) {
                PendingSessionState::Inactive {
                    session_id,
                    mut socket,
                } => {
                    tracing::info!(
                        target: LOG_TARGET,
                        ?session_id,
                        destination = %destination.id(),
                        ?date,
                        num_options = ?options.len(),
                        "create session",
                    );

                    // If the client did not name the inbound side, reuse the outbound
                    // nickname, or fall back to the destination ID when neither is set.
                    let inbound_nickname = Str::from("inbound.nickname");

                    if options.get(&inbound_nickname).is_none() {
                        let name = options.get(&Str::from("outbound.nickname")).map_or_else(
                            || Str::from(destination.id().to_string()),
                            |name| name.clone(),
                        );
                        options.insert(inbound_nickname, name);
                    }

                    let tunnel_pool_config = TunnelPoolConfig::from(&options);

                    match self.tunnel_manager_handle.create_tunnel_pool(tunnel_pool_config) {
                        Ok(future) => {
                            tracing::trace!(
                                target: LOG_TARGET,
                                ?session_id,
                                "tunnel pool build started",
                            );

                            socket.send_message(SessionStatus::new(
                                SessionId::Session(session_id),
                                SessionStatusKind::Created,
                            ));

                            self.state = PendingSessionState::BuildingPool {
                                session_id,
                                socket,
                                options,
                                tunnel_pool_future: Box::pin(future),
                            };
                        }
                        Err(error) => {
                            tracing::warn!(
                                target: LOG_TARGET,
                                ?session_id,
                                ?error,
                                "failed to build tunnel pool",
                            );

                            // NOTE(review): the client is not told about the failure, the
                            // session just returns to the inactive state -- confirm this is
                            // intended.
                            self.state = PendingSessionState::Inactive { session_id, socket };
                        }
                    }
                }
                state => {
                    tracing::warn!(
                        target: LOG_TARGET,
                        ?state,
                        "`CreateSession` received but tunnel pool is already pending",
                    );

                    self.state = state;
                }
            },
            Message::CreateLeaseSet2 {
                key,
                leaseset,
                private_keys,
                ..
            } => match mem::replace(&mut self.state, PendingSessionState::Poisoned) {
                PendingSessionState::AwaitingLeaseSet {
                    session_id,
                    socket,
                    options,
                    handle,
                    inbound,
                    outbound,
                } => {
                    // The state is left poisoned on purpose: returning the context
                    // completes the pending session.
                    return Some(I2cpSessionContext {
                        address_book: self.address_book.clone(),
                        destination_id: DestinationId::from(key),
                        inbound,
                        leaseset,
                        options,
                        outbound,
                        private_keys,
                        profile_storage: self.profile_storage.clone(),
                        session_id,
                        socket,
                        tunnel_pool_handle: handle,
                    });
                }
                state => {
                    // The message is controlled by the client, so receiving it too early is
                    // a protocol violation by the peer and not a broken internal invariant:
                    // log it and ignore the message instead of asserting.
                    tracing::warn!(
                        target: LOG_TARGET,
                        ?state,
                        "`CreateLeaseSet2` received but not awaiting lease set",
                    );

                    self.state = state;
                }
            },
            _ => {}
        }

        None
    }
}
impl<R: Runtime> Future for PendingI2cpSession<R> {
type Output = Option<I2cpSessionContext<R>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match self.state.socket().poll_next_unpin(cx) {
Poll::Pending => break,
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(message)) =>
if let Some(context) = self.on_message(message) {
return Poll::Ready(Some(context));
},
}
}
loop {
match mem::replace(&mut self.state, PendingSessionState::Poisoned) {
state @ PendingSessionState::Inactive { .. } => {
self.state = state;
break;
}
PendingSessionState::BuildingPool {
session_id,
socket,
options,
mut tunnel_pool_future,
} => match tunnel_pool_future.poll_unpin(cx) {
Poll::Ready(handle) => {
tracing::trace!(
target: LOG_TARGET,
?session_id,
"tunnel pool for the session has been built",
);
self.state = PendingSessionState::BuildingTunnels {
session_id,
socket,
options,
handle,
inbound: HashMap::new(),
outbound: HashSet::new(),
};
}
Poll::Pending => {
self.state = PendingSessionState::BuildingPool {
session_id,
socket,
options,
tunnel_pool_future,
};
break;
}
},
PendingSessionState::BuildingTunnels {
session_id,
mut socket,
options,
mut handle,
mut inbound,
mut outbound,
} => match handle.poll_next_unpin(cx) {
Poll::Pending => {
self.state = PendingSessionState::BuildingTunnels {
session_id,
socket,
options,
handle,
inbound,
outbound,
};
break;
}
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(TunnelPoolEvent::InboundTunnelBuilt { tunnel_id, lease })) => {
tracing::trace!(
target: LOG_TARGET,
?session_id,
%tunnel_id,
"inbound tunnel built for pending session",
);
inbound.insert(tunnel_id, lease);
if inbound.len() != handle.config().num_inbound
|| outbound.len() != handle.config().num_outbound
{
self.state = PendingSessionState::BuildingTunnels {
session_id,
socket,
options,
handle,
inbound,
outbound,
};
continue;
}
tracing::trace!(
target: LOG_TARGET,
?session_id,
"send leaseset request to client",
);
socket.send_message(RequestVariableLeaseSet::new(
session_id,
inbound.values().cloned().collect::<Vec<_>>(),
));
self.state = PendingSessionState::AwaitingLeaseSet {
inbound,
options,
outbound,
session_id,
socket,
handle,
};
}
Poll::Ready(Some(TunnelPoolEvent::OutboundTunnelBuilt { tunnel_id })) => {
tracing::trace!(
target: LOG_TARGET,
?session_id,
%tunnel_id,
"outbound tunnel built for pending session",
);
outbound.insert(tunnel_id);
if inbound.len() != handle.config().num_inbound
|| outbound.len() != handle.config().num_outbound
{
self.state = PendingSessionState::BuildingTunnels {
session_id,
socket,
options,
handle,
inbound,
outbound,
};
continue;
}
tracing::trace!(
target: LOG_TARGET,
?session_id,
"send leaseset request to client",
);
socket.send_message(RequestVariableLeaseSet::new(
session_id,
inbound.values().cloned().collect::<Vec<_>>(),
));
self.state = PendingSessionState::AwaitingLeaseSet {
inbound,
options,
outbound,
session_id,
socket,
handle,
};
}
Poll::Ready(Some(TunnelPoolEvent::InboundTunnelExpired { tunnel_id })) => {
tracing::warn!(
target: LOG_TARGET,
?session_id,
%tunnel_id,
"inbound tunnel expired for pending session",
);
inbound.remove(&tunnel_id);
self.state = PendingSessionState::BuildingTunnels {
session_id,
socket,
options,
handle,
inbound,
outbound,
};
}
Poll::Ready(Some(TunnelPoolEvent::OutboundTunnelExpired { tunnel_id })) => {
tracing::warn!(
target: LOG_TARGET,
?session_id,
%tunnel_id,
"outbound tunnel expired for pending session",
);
outbound.remove(&tunnel_id);
self.state = PendingSessionState::BuildingTunnels {
session_id,
socket,
options,
handle,
inbound,
outbound,
};
}
Poll::Ready(Some(event)) => {
tracing::warn!(
target: LOG_TARGET,
?session_id,
?event,
"unexpected event",
);
self.state = PendingSessionState::BuildingTunnels {
session_id,
socket,
options,
handle,
inbound,
outbound,
};
}
},
state @ PendingSessionState::AwaitingLeaseSet { .. } => {
self.state = state;
break;
}
PendingSessionState::Poisoned => {
tracing::warn!(
target: LOG_TARGET,
"pending i2cp session tunnel pool state is poisoned",
);
debug_assert!(false);
return Poll::Ready(None);
}
}
}
Poll::Pending
}
}