use iroh::endpoint::{ConnectOptions, QuicTransportConfig};
use ractor::thread_local::ThreadLocalActor;
use ractor::{ActorProcessingErr, ActorRef, RpcReplyPort};
use thiserror::Error;
use tracing::field::Empty;
use tracing::{Instrument, debug, info_span, warn};
use crate::address_book::report::{ConnectionOutcome, ConnectionRole};
use crate::iroh_endpoint::actors::endpoint::{ConnectError, ProtocolMap, ToIrohEndpoint};
use crate::utils::{ShortFormat, to_verifying_key};
use crate::{NodeId, ProtocolId};
pub type ConnectReplyPort = RpcReplyPort<Result<iroh::endpoint::Connection, ConnectError>>;
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum IrohConnectionArgs {
Connect {
endpoint: iroh::endpoint::Endpoint,
endpoint_addr: iroh::EndpointAddr,
alpn: ProtocolId,
quic_transport_config: Option<QuicTransportConfig>,
reply: ConnectReplyPort,
},
Accept {
incoming: iroh::endpoint::Incoming,
protocols: ProtocolMap,
},
}
pub enum ToIrohConnection {
EstablishConnection(NodeId, IrohConnectionArgs, ActorRef<ToIrohEndpoint>),
}
#[derive(Default)]
pub struct IrohConnection;
impl ThreadLocalActor for IrohConnection {
type State = ();
type Msg = ToIrohConnection;
type Arguments = (NodeId, IrohConnectionArgs, ActorRef<ToIrohEndpoint>);
async fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
args: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
let (node_id, args, endpoint_ref) = args;
myself.send_message(ToIrohConnection::EstablishConnection(
node_id,
args,
endpoint_ref,
))?;
Ok(())
}
async fn handle(
&self,
myself: ActorRef<Self::Msg>,
message: Self::Msg,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
ToIrohConnection::EstablishConnection(node_id, args, endpoint_ref) => {
let span =
info_span!("connection", me=%node_id.fmt_short(), remote=Empty, alpn=Empty);
establish_connection(args, endpoint_ref)
.instrument(span)
.await?;
myself.stop(None);
}
}
Ok(())
}
}
async fn establish_connection(
args: IrohConnectionArgs,
endpoint_ref: ActorRef<ToIrohEndpoint>,
) -> Result<(), ConnectionActorError> {
match args {
IrohConnectionArgs::Connect {
endpoint,
endpoint_addr,
alpn,
quic_transport_config,
reply,
} => {
tracing::Span::current().record("alpn", alpn.fmt_short());
let remote_node_id = to_verifying_key(endpoint_addr.id);
let mut connect_options = ConnectOptions::default();
if let Some(config) = quic_transport_config {
connect_options = connect_options.with_transport_config(config);
}
let connecting = endpoint
.connect_with_opts(endpoint_addr.clone(), &alpn, connect_options)
.await
.map_err(|err| ConnectionActorError::Iroh(err.into()));
let connection = match connecting {
Ok(connecting) => connecting
.await
.map_err(|err| ConnectionActorError::Iroh(err.into())),
Err(err) => Err(err),
};
match connection {
Ok(connection) => {
debug!("successfully initiated connection");
let _ = endpoint_ref.send_message(ToIrohEndpoint::Report {
remote_node_id,
role: ConnectionRole::Connect {
remote_address: endpoint_addr,
},
outcome: ConnectionOutcome::Successful,
});
let _ = reply.send(Ok(connection));
}
Err(err) => {
warn!("connection establishment failed: {err:#}");
let _ = endpoint_ref.send_message(ToIrohEndpoint::Report {
remote_node_id,
role: ConnectionRole::Connect {
remote_address: endpoint_addr,
},
outcome: ConnectionOutcome::Failed,
});
let reason = err.to_string();
let _ = reply.send(Err(err.into()));
return Err(ConnectionActorError::ConnectionAttemptFailed(reason));
}
}
}
IrohConnectionArgs::Accept {
incoming,
protocols,
} => {
let mut accepting = match incoming.accept() {
Ok(accepting) => accepting,
Err(err) => {
warn!("ignoring connection: accepting failed: {err:#}");
return Err(ConnectionActorError::Iroh(err.into()));
}
};
let alpn = match accepting.alpn().await {
Ok(alpn) => alpn,
Err(err) => {
warn!("ignoring connection: invalid handshake: {err:#}");
return Err(ConnectionActorError::Iroh(err.into()));
}
};
tracing::Span::current().record("alpn", alpn.fmt_short());
let protocols = protocols.read().await;
let Some(protocol_handler) = protocols.get(&alpn) else {
warn!("ignoring connection: unsupported alpn protocol");
return Err(ConnectionActorError::InvalidAlpnHandshake(alpn));
};
let connection = match protocol_handler.on_accepting(accepting).await {
Ok(connection) => connection,
Err(err) => {
warn!("accepting incoming connection ended with error: {err}");
return Err(ConnectionActorError::Iroh(err.into()));
}
};
tracing::Span::current().record(
"remote",
tracing::field::display(connection.remote_id().fmt_short()),
);
debug!("successfully accepted connection");
let _ = endpoint_ref.send_message(ToIrohEndpoint::Report {
remote_node_id: to_verifying_key(connection.remote_id()),
role: ConnectionRole::Accept,
outcome: ConnectionOutcome::Successful,
});
let _ = protocol_handler.accept(connection).await;
}
}
Ok(())
}
#[derive(Debug, Error)]
pub enum IrohError {
#[error(transparent)]
Connect(#[from] iroh::endpoint::ConnectError),
#[error(transparent)]
ConnectWithOpts(#[from] iroh::endpoint::ConnectWithOptsError),
#[error(transparent)]
Connecting(#[from] iroh::endpoint::ConnectingError),
#[error(transparent)]
Connection(#[from] iroh::endpoint::ConnectionError),
#[error(transparent)]
Alpn(#[from] iroh::endpoint::AlpnError),
#[error(transparent)]
Accept(#[from] iroh::protocol::AcceptError),
}
#[derive(Debug, Error)]
pub enum ConnectionActorError {
#[error("{0}")]
Iroh(IrohError),
#[error("remote node tried to connect with unknown alpn")]
InvalidAlpnHandshake(Vec<u8>),
#[error("{0}")]
ConnectionAttemptFailed(String),
}