use crate::{
error::Error,
events::EventHandle,
netdb::NetDbHandle,
primitives::{Lease, TunnelId},
profile::ProfileStorage,
runtime::{AddressBook, Runtime},
sam::{
parser::{DestinationContext, SessionKind},
socket::SamSocket,
types::{SamSessionCommand, SamSessionCommandRecycle},
SubSessionCommand,
},
tunnel::{TunnelPoolEvent, TunnelPoolHandle},
};
use futures::{future::BoxFuture, StreamExt};
use hashbrown::{HashMap, HashSet};
use thingbuf::mpsc::{Receiver, Sender};
use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec};
use core::time::Duration;
/// Logging target passed to the `tracing` macros in this file.
const LOG_TARGET: &str = "emissary::sam::pending::session";
/// How long `PendingSamSession::run()` sleeps before asking again when
/// `NetDbHandle::wait_until_ready()` returns an error.
const RETRY_DURATION: Duration = Duration::from_secs(5);
/// Context returned by `PendingSamSession::run()` once the pending session has at least one
/// inbound and one outbound tunnel.
///
/// Apart from `inbound`, `outbound` and `tunnel_pool_handle`, every field is carried over
/// unchanged from the `PendingSamSession` that produced it.
pub struct SamSessionContext<R: Runtime> {
/// Optional address book.
pub address_book: Option<Arc<dyn AddressBook>>,
/// Sender for `(u16, Vec<u8>)` pairs.
///
/// NOTE(review): presumably (port, datagram payload) -- confirm against the receiving side.
pub datagram_tx: Sender<(u16, Vec<u8>)>,
/// Destination context given to the pending session at creation.
pub destination: DestinationContext,
/// Event handle.
pub event_handle: EventHandle<R>,
/// Inbound tunnels that were built (and had not expired) while the session was pending,
/// keyed by tunnel ID and mapped to the lease reported with the tunnel.
pub inbound: HashMap<TunnelId, Lease>,
/// NetDb handle.
pub netdb_handle: NetDbHandle,
/// Options as string key/value pairs, given to the pending session at creation.
pub options: HashMap<String, String>,
/// IDs of outbound tunnels that were built (and had not expired) while the session was
/// pending.
pub outbound: HashSet<TunnelId>,
/// Profile storage.
pub profile_storage: ProfileStorage<R>,
/// Receiver for `SamSessionCommand`s.
pub receiver: Receiver<SamSessionCommand<R>, SamSessionCommandRecycle>,
/// Session ID; also attached to the log events emitted while the session was pending.
pub session_id: Arc<str>,
/// Session kind.
pub session_kind: SessionKind,
/// Boxed SAM socket given to the pending session at creation.
pub socket: Box<SamSocket<R>>,
/// Optional sender for `SubSessionCommand`s.
pub sub_session_tx: Option<Sender<SubSessionCommand>>,
/// Tunnel pool handle, obtained by awaiting the pending session's tunnel pool future.
pub tunnel_pool_handle: TunnelPoolHandle,
}
/// SAM session that is still waiting for NetDb readiness and for its tunnel pool.
///
/// Consumed by `PendingSamSession::run()`, which turns it into a `SamSessionContext` once at
/// least one inbound and one outbound tunnel have been built.
pub struct PendingSamSession<R: Runtime> {
/// Optional address book.
address_book: Option<Arc<dyn AddressBook>>,
/// Sender for `(u16, Vec<u8>)` pairs.
///
/// NOTE(review): presumably (port, datagram payload) -- confirm against the receiving side.
datagram_tx: Sender<(u16, Vec<u8>)>,
/// Destination context.
destination: DestinationContext,
/// Event handle.
event_handle: EventHandle<R>,
/// Inbound tunnels built so far, keyed by tunnel ID; starts out empty and is updated from
/// tunnel pool events in `run()`.
inbound: HashMap<TunnelId, Lease>,
/// NetDb handle; `run()` uses it to wait until NetDb is ready.
netdb_handle: NetDbHandle,
/// Options as string key/value pairs.
options: HashMap<String, String>,
/// IDs of outbound tunnels built so far; starts out empty and is updated from tunnel pool
/// events in `run()`.
outbound: HashSet<TunnelId>,
/// Profile storage.
profile_storage: ProfileStorage<R>,
/// Receiver for `SamSessionCommand`s.
receiver: Receiver<SamSessionCommand<R>, SamSessionCommandRecycle>,
/// Session ID; attached to every log event emitted by `run()`.
session_id: Arc<str>,
/// Session kind.
session_kind: SessionKind,
/// Boxed SAM socket.
socket: Box<SamSocket<R>>,
/// Optional sender for `SubSessionCommand`s.
sub_session_tx: Option<Sender<SubSessionCommand>>,
/// Future that resolves to the session's `TunnelPoolHandle`; awaited by `run()` after NetDb
/// is ready.
tunnel_pool_future: BoxFuture<'static, TunnelPoolHandle>,
}
impl<R: Runtime> PendingSamSession<R> {
    /// Create a new [`PendingSamSession`].
    ///
    /// Every argument is stored as given. The inbound and outbound tunnel collections start
    /// out empty and are filled in by [`PendingSamSession::run()`].
    pub fn new(
        socket: Box<SamSocket<R>>,
        destination: DestinationContext,
        session_id: Arc<str>,
        session_kind: SessionKind,
        options: HashMap<String, String>,
        receiver: Receiver<SamSessionCommand<R>, SamSessionCommandRecycle>,
        datagram_tx: Sender<(u16, Vec<u8>)>,
        tunnel_pool_future: BoxFuture<'static, TunnelPoolHandle>,
        netdb_handle: NetDbHandle,
        address_book: Option<Arc<dyn AddressBook>>,
        event_handle: EventHandle<R>,
        profile_storage: ProfileStorage<R>,
        sub_session_tx: Option<Sender<SubSessionCommand>>,
    ) -> Self {
        Self {
            // caller-provided state, listed in parameter order
            socket,
            destination,
            session_id,
            session_kind,
            options,
            receiver,
            datagram_tx,
            tunnel_pool_future,
            netdb_handle,
            address_book,
            event_handle,
            profile_storage,
            sub_session_tx,
            // no tunnels are known until `run()` starts processing tunnel pool events
            inbound: HashMap::new(),
            outbound: HashSet::new(),
        }
    }

    /// Drive the pending session until it is ready to be used.
    ///
    /// The steps are, in order:
    ///
    /// 1. wait until NetDb reports that it is ready, sleeping for [`RETRY_DURATION`] whenever
    ///    the readiness request itself fails
    /// 2. wait for the tunnel pool future to resolve
    /// 3. process tunnel pool events, tracking built and expired tunnels, until at least one
    ///    inbound and one outbound tunnel are available
    ///
    /// # Errors
    ///
    /// Returns [`Error::EssentialTaskClosed`] if the tunnel pool event stream ends before
    /// both tunnel directions are available.
    pub async fn run(mut self) -> crate::Result<SamSessionContext<R>> {
        loop {
            let ready_rx = match self.netdb_handle.wait_until_ready() {
                Ok(ready_rx) => ready_rx,
                Err(_) => {
                    // the request could not be made, back off before asking again
                    R::delay(RETRY_DURATION).await;
                    continue;
                }
            };

            // a failed wait falls through and issues a fresh request right away
            if ready_rx.await.is_ok() {
                break;
            }
        }

        let mut tunnel_pool_handle = self.tunnel_pool_future.await;

        tracing::trace!(
            target: LOG_TARGET,
            session_id = %self.session_id,
            "tunnel pool for the session has been built",
        );

        loop {
            let event = tunnel_pool_handle.next().await.ok_or(Error::EssentialTaskClosed)?;

            // readiness is only evaluated after a tunnel has been built, never after an
            // expiration or an unrelated event
            let tunnel_built = match event {
                TunnelPoolEvent::InboundTunnelBuilt { tunnel_id, lease } => {
                    tracing::trace!(
                        target: LOG_TARGET,
                        session_id = %self.session_id,
                        %tunnel_id,
                        "inbound tunnel built for pending session",
                    );
                    self.inbound.insert(tunnel_id, lease);
                    true
                }
                TunnelPoolEvent::OutboundTunnelBuilt { tunnel_id } => {
                    tracing::trace!(
                        target: LOG_TARGET,
                        session_id = %self.session_id,
                        %tunnel_id,
                        "outbound tunnel built for pending session",
                    );
                    self.outbound.insert(tunnel_id);
                    true
                }
                TunnelPoolEvent::InboundTunnelExpired { tunnel_id } => {
                    tracing::warn!(
                        target: LOG_TARGET,
                        session_id = %self.session_id,
                        %tunnel_id,
                        "inbound tunnel expired for pending session",
                    );
                    self.inbound.remove(&tunnel_id);
                    false
                }
                TunnelPoolEvent::OutboundTunnelExpired { tunnel_id } => {
                    tracing::warn!(
                        target: LOG_TARGET,
                        session_id = %self.session_id,
                        %tunnel_id,
                        "outbound tunnel expired for pending session",
                    );
                    self.outbound.remove(&tunnel_id);
                    false
                }
                _ => false,
            };

            if tunnel_built && !(self.inbound.is_empty() || self.outbound.is_empty()) {
                break;
            }
        }

        let context = SamSessionContext {
            address_book: self.address_book,
            datagram_tx: self.datagram_tx,
            destination: self.destination,
            event_handle: self.event_handle,
            inbound: self.inbound,
            netdb_handle: self.netdb_handle,
            options: self.options,
            outbound: self.outbound,
            profile_storage: self.profile_storage,
            receiver: self.receiver,
            session_id: self.session_id,
            session_kind: self.session_kind,
            socket: self.socket,
            sub_session_tx: self.sub_session_tx,
            tunnel_pool_handle,
        };

        Ok(context)
    }
}