use crate::{
error::{ConnectionError, Error, I2cpError},
i2cp::{
pending::{I2cpSessionContext, PendingI2cpSession},
session::I2cpSession,
socket::I2cpSocket,
},
netdb::NetDbHandle,
profile::ProfileStorage,
runtime::{AddressBook, JoinSet, Runtime, TcpListener},
tunnel::TunnelManagerHandle,
util::AsyncReadExt,
};
use futures::StreamExt;
use alloc::{string::String, sync::Arc, vec};
use core::{
future::Future,
net::{IpAddr, SocketAddr},
pin::Pin,
task::{Context, Poll},
};
mod message;
mod payload;
mod pending;
mod session;
mod socket;
pub use payload::{I2cpPayload, I2cpPayloadBuilder};
#[cfg(feature = "fuzz")]
pub use message::{Message, MessageType};
const LOG_TARGET: &str = "emissary::i2cp";
const I2CP_PROTOCOL_BYTE: u8 = 0x2a;
pub struct I2cpServer<R: Runtime> {
address_book: Option<Arc<dyn AddressBook>>,
listener: R::TcpListener,
netdb_handle: NetDbHandle,
next_session_id: u16,
pending_connections: R::JoinSet<crate::Result<R::TcpStream>>,
pending_session: R::JoinSet<Option<I2cpSessionContext<R>>>,
profile_storage: ProfileStorage<R>,
tunnel_manager_handle: TunnelManagerHandle,
}
impl<R: Runtime> I2cpServer<R> {
pub async fn new(
host: String,
port: u16,
netdb_handle: NetDbHandle,
tunnel_manager_handle: TunnelManagerHandle,
address_book: Option<Arc<dyn AddressBook>>,
profile_storage: ProfileStorage<R>,
) -> crate::Result<Self> {
tracing::info!(
target: LOG_TARGET,
?port,
"starting i2cp server",
);
let address = SocketAddr::new(host.parse::<IpAddr>().expect("valid address"), port);
let listener = R::TcpListener::bind(address)
.await
.ok_or(Error::Connection(ConnectionError::BindFailure))?;
Ok(Self {
address_book,
listener,
netdb_handle,
next_session_id: 1u16,
pending_connections: R::join_set(),
pending_session: R::join_set(),
profile_storage,
tunnel_manager_handle,
})
}
fn next_session_id(&mut self) -> u16 {
let session_id = self.next_session_id;
self.next_session_id = self.next_session_id.wrapping_add(1);
session_id
}
pub fn local_address(&self) -> Option<SocketAddr> {
self.listener.local_address()
}
}
impl<R: Runtime> Future for I2cpServer<R> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match self.listener.poll_accept(cx) {
Poll::Pending => break,
Poll::Ready(None) => {
tracing::error!(
target: LOG_TARGET,
"ready `None` from i2cp server socket",
);
return Poll::Ready(());
}
Poll::Ready(Some((mut stream, _))) => {
tracing::trace!(
target: LOG_TARGET,
"incoming connection, read protocol byte",
);
self.pending_connections.push(async move {
let mut protocol_byte = vec![0u8; 1];
stream.read_exact::<R>(&mut protocol_byte).await?;
if protocol_byte[0] != I2CP_PROTOCOL_BYTE {
return Err(Error::I2cp(I2cpError::InvalidProtocolByte(
protocol_byte[0],
)));
}
Ok(stream)
});
}
}
}
loop {
match self.pending_connections.poll_next_unpin(cx) {
Poll::Pending => break,
Poll::Ready(None) => {
tracing::error!(
target: LOG_TARGET,
"read `None` from pending connections",
);
return Poll::Ready(());
}
Poll::Ready(Some(Err(error))) => tracing::warn!(
target: LOG_TARGET,
?error,
"failed to accept inbound i2cp connection",
),
Poll::Ready(Some(Ok(stream))) => {
let session_id = self.next_session_id();
let tunnel_manager_handle = self.tunnel_manager_handle.clone();
let address_book = self.address_book.clone();
let profile_storage = self.profile_storage.clone();
tracing::trace!(
target: LOG_TARGET,
?session_id,
"i2cp client session accepted",
);
self.pending_session.push(PendingI2cpSession::<R>::new(
session_id,
I2cpSocket::new(stream),
tunnel_manager_handle,
address_book,
profile_storage,
));
}
}
}
loop {
match self.pending_session.poll_next_unpin(cx) {
Poll::Pending => break,
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(None)) => {}
Poll::Ready(Some(Some(context))) => {
tracing::info!(
target: LOG_TARGET,
session_id = ?context.session_id,
"start active i2cp connection",
);
R::spawn(I2cpSession::<R>::new(self.netdb_handle.clone(), context));
}
}
}
Poll::Pending
}
}