use std::collections::BTreeMap;
use std::net::{SocketAddrV4, SocketAddrV6};
use std::sync::Arc;
use std::time::Duration;
use iroh::endpoint::{QuicTransportConfig, presets};
use iroh::protocol::DynProtocolHandler;
use p2panda_core::SigningKey;
use ractor::thread_local::{ThreadLocalActor, ThreadLocalActorSpawner};
use ractor::{ActorProcessingErr, ActorRef, RpcReplyPort, SupervisionEvent};
use thiserror::Error;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tracing::{debug, error, warn};
use crate::address_book::report::{ConnectionOutcome, ConnectionRole};
use crate::address_book::{AddressBook, AddressBookError};
use crate::iroh_endpoint::actors::connection::{
ConnectReplyPort, ConnectionActorError, IrohConnection, IrohConnectionArgs,
};
use crate::iroh_endpoint::actors::is_globally_reachable_endpoint;
use crate::iroh_endpoint::config::IrohConfig;
use crate::iroh_endpoint::discovery::AddressBookDiscovery;
use crate::utils::{ShortFormat, from_signing_key};
use crate::{NetworkId, NodeId, ProtocolId, hash_protocol_id_with_network_id};
/// Keep-alive interval set on the QUIC transport config the endpoint is bound with.
pub const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(5);
/// Maximum idle timeout set on the QUIC transport config the endpoint is bound with.
///
/// NOTE(review): twice the keep-alive interval, presumably so that a single lost keep-alive does
/// not close a connection - confirm.
pub const MAX_IDLE_TIMEOUT: Duration = Duration::from_secs(10);
/// Messages handled by the [`IrohEndpoint`] actor.
#[allow(clippy::large_enum_variant)]
pub enum ToIrohEndpoint {
/// Bind the iroh endpoint to the configured IPv4 and IPv6 socket addresses and start
/// forwarding incoming connections to this actor.
///
/// The actor sends this message to itself from `pre_start`.
Bind,
/// Reply with a clone of the bound iroh endpoint.
///
/// Panics inside the actor if no endpoint has been bound yet.
Endpoint(RpcReplyPort<iroh::Endpoint>),
/// Register a protocol handler.
///
/// The protocol id is hashed together with the network id before it is stored and announced
/// as an ALPN on the endpoint.
RegisterProtocol(ProtocolId, Box<dyn DynProtocolHandler>),
/// Connect to a node for the given protocol.
///
/// The node's address info is looked up in the address book; lookup failures are answered
/// through the reply port. The optional QUIC transport config and the reply port are handed
/// on to the spawned connection actor.
Connect(
NodeId,
ProtocolId,
Option<QuicTransportConfig>,
ConnectReplyPort,
),
/// An incoming connection accepted by the endpoint, to be handled by a new connection actor.
Incoming(iroh::endpoint::Incoming),
/// Outcome of a connection attempt which should be recorded in the address book.
///
/// NOTE(review): presumably sent by the connection actors, which receive a reference to this
/// actor when spawned - confirm.
Report {
/// Node on the other side of the connection attempt.
remote_node_id: NodeId,
/// Whether the connection was initiated or accepted by us.
role: ConnectionRole,
/// Result of the connection attempt.
outcome: ConnectionOutcome,
},
}
/// Shared, lock-protected map of registered protocol handlers.
///
/// Keys are protocol ids already hashed with the network id. The map is shared with connection
/// actors handling incoming connections.
pub type ProtocolMap = Arc<RwLock<BTreeMap<ProtocolId, Box<dyn DynProtocolHandler>>>>;
/// State of the [`IrohEndpoint`] actor.
pub struct IrohState {
/// Network id hashed into every protocol id before it is used as an ALPN.
network_id: NetworkId,
/// Key the endpoint's secret key is derived from; its verifying key is passed to connection
/// actors.
signing_key: SigningKey,
/// Bind addresses and ports of the endpoint.
config: IrohConfig,
/// Relays the endpoint is configured with (custom relay mode).
relay_map: iroh::RelayMap,
/// Source of node address info and sink for connection outcome reports.
address_book: AddressBook,
/// The bound endpoint, `None` until the `Bind` message has been handled.
endpoint: Option<iroh::Endpoint>,
/// Registered protocol handlers, shared with connection actors.
protocols: ProtocolMap,
/// Task forwarding accepted connections to this actor, aborted in `post_stop`.
accept_handle: Option<JoinHandle<()>>,
/// Aborted in `post_stop` when set.
///
/// NOTE(review): never assigned in the code visible here - confirm whether it is still needed.
watch_addr_handle: Option<JoinHandle<()>>,
/// Spawner used for all connection actors.
worker_pool: ThreadLocalActorSpawner,
}
/// Start-up arguments of the [`IrohEndpoint`] actor: network id, signing key, endpoint
/// configuration, relay map and address book, in that order.
pub type IrohEndpointArgs = (
NetworkId,
SigningKey,
IrohConfig,
iroh::RelayMap,
AddressBook,
);
/// Actor owning the iroh endpoint.
///
/// It binds the endpoint, registers protocol handlers, spawns one connection actor per outgoing
/// or incoming connection and records connection outcomes in the address book.
#[derive(Default)]
pub struct IrohEndpoint;
/// Actor behaviour of [`IrohEndpoint`].
///
/// The endpoint is bound by the `Bind` message which `pre_start` sends to the actor itself. Every
/// other message relies on that endpoint being present.
impl ThreadLocalActor for IrohEndpoint {
    type State = IrohState;
    type Msg = ToIrohEndpoint;
    type Arguments = IrohEndpointArgs;

    /// Builds the initial, still unbound state and schedules binding of the endpoint.
    ///
    /// # Errors
    ///
    /// Fails if the `Bind` message cannot be sent to the actor itself.
    async fn pre_start(
        &self,
        myself: ActorRef<Self::Msg>,
        args: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr> {
        let (network_id, signing_key, config, relay_map, address_book) = args;

        // Binding happens in `handle`, not here: enqueue it right away.
        myself.send_message(ToIrohEndpoint::Bind)?;

        Ok(IrohState {
            network_id,
            signing_key,
            config,
            relay_map,
            address_book,
            endpoint: None,
            protocols: Arc::default(),
            accept_handle: None,
            watch_addr_handle: None,
            worker_pool: ThreadLocalActorSpawner::new(),
        })
    }

    /// Closes the endpoint and aborts the background tasks.
    async fn post_stop(
        &self,
        _myself: ActorRef<Self::Msg>,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        if let Some(endpoint) = state.endpoint.take() {
            // Closing is awaited in a separate task, stopping the actor does not wait for it.
            tokio::task::spawn(async move {
                endpoint.close().await;
            });
        }

        if let Some(watch_addr_handle) = &state.watch_addr_handle {
            watch_addr_handle.abort();
        }

        if let Some(accept_handle) = &state.accept_handle {
            accept_handle.abort();
        }

        Ok(())
    }

    /// Handles a single [`ToIrohEndpoint`] message.
    ///
    /// # Errors
    ///
    /// Fails if the endpoint cannot be bound or a connection actor cannot be spawned. Address
    /// book lookup failures during `Connect` are answered through the reply port instead.
    ///
    /// # Panics
    ///
    /// Panics if any message other than `Bind` or `Incoming` is handled before an endpoint was
    /// bound.
    async fn handle(
        &self,
        myself: ActorRef<Self::Msg>,
        message: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        // `pre_start` enqueues `Bind` and a failed bind returns an error from this handler, so
        // the arms below expect the endpoint to be present.
        const ENDPOINT_BOUND: &str =
            "bind always takes place first, an endpoint must exist after this point";

        match message {
            ToIrohEndpoint::Bind => {
                // Only `Copy` address fields are read, borrowing the config is enough.
                let config = &state.config;
                let socket_address_v4 = SocketAddrV4::new(config.bind_ip_v4, config.bind_port_v4);
                let socket_address_v6 =
                    SocketAddrV6::new(config.bind_ip_v6, config.bind_port_v6, 0, 0);

                let quic_transport_config = QuicTransportConfig::builder()
                    .keep_alive_interval(KEEP_ALIVE_INTERVAL)
                    .max_idle_timeout(Some(
                        MAX_IDLE_TIMEOUT.try_into().expect("correct max idle value"),
                    ))
                    .build();

                let relay_mode = iroh::RelayMode::Custom(state.relay_map.clone());

                // Address lookups of the endpoint are answered from the address book.
                let address_book_discovery = AddressBookDiscovery::new(
                    state.signing_key.clone(),
                    state.address_book.clone(),
                );

                let endpoint = iroh::Endpoint::builder(presets::Minimal)
                    .relay_mode(relay_mode)
                    .address_lookup(address_book_discovery)
                    .secret_key(from_signing_key(state.signing_key.clone()))
                    .transport_config(quic_transport_config)
                    .bind_addr(socket_address_v4)?
                    .bind_addr(socket_address_v6)?
                    .bind()
                    .await
                    .inspect_err(|err| error!("{err}"))?;

                // Forward every accepted connection to this actor as an `Incoming` message.
                let accept_handle = {
                    let endpoint = endpoint.clone();
                    tokio::spawn(async move {
                        while let Some(incoming) = endpoint.accept().await {
                            // A failed send means the actor is gone. Stop accepting instead of
                            // silently dropping connections and keeping the endpoint alive.
                            if myself
                                .send_message(ToIrohEndpoint::Incoming(incoming))
                                .is_err()
                            {
                                break;
                            }
                        }
                    })
                };

                state.endpoint = Some(endpoint);
                state.accept_handle = Some(accept_handle);
            }
            ToIrohEndpoint::RegisterProtocol(alpn, protocol_handler) => {
                // Protocols are scoped to the network by hashing both ids together.
                let mixed_protocol_id = hash_protocol_id_with_network_id(&alpn, state.network_id);
                debug!(alpn = %mixed_protocol_id.fmt_short(), "register protocol");

                let mut protocols = state.protocols.write().await;
                protocols.insert(mixed_protocol_id, protocol_handler);

                // Announce the complete, updated set of protocols on the endpoint.
                state
                    .endpoint
                    .as_ref()
                    .expect(ENDPOINT_BOUND)
                    .set_alpns(protocols.keys().cloned().collect());
            }
            ToIrohEndpoint::Connect(node_id, alpn, quic_transport_config, reply) => {
                let mixed_protocol_id = hash_protocol_id_with_network_id(&alpn, state.network_id);

                // Lookup failures are reported to the caller, they do not fail the actor.
                let node_info = match state.address_book.node_info(node_id).await {
                    Ok(Some(node_info)) => node_info,
                    Ok(None) => {
                        let _ = reply.send(Err(ConnectError::TransportInfoMissing(node_id)));
                        return Ok(());
                    }
                    Err(err) => {
                        let _ = reply.send(Err(err.into()));
                        return Ok(());
                    }
                };

                let Ok(endpoint_addr) = iroh::EndpointAddr::try_from(node_info) else {
                    let _ = reply.send(Err(ConnectError::TransportInfoMissing(node_id)));
                    return Ok(());
                };

                // The connection actor takes over the reply port from here on.
                IrohConnection::spawn_linked(
                    None,
                    (
                        state.signing_key.verifying_key(),
                        IrohConnectionArgs::Connect {
                            endpoint: state.endpoint.clone().expect(ENDPOINT_BOUND),
                            endpoint_addr,
                            alpn: mixed_protocol_id,
                            quic_transport_config,
                            reply,
                        },
                        myself.clone(),
                    ),
                    myself.into(),
                    state.worker_pool.clone(),
                )
                .await?;
            }
            ToIrohEndpoint::Incoming(incoming) => {
                IrohConnection::spawn_linked(
                    None,
                    (
                        state.signing_key.verifying_key(),
                        IrohConnectionArgs::Accept {
                            incoming,
                            protocols: state.protocols.clone(),
                        },
                        myself.clone(),
                    ),
                    myself.into(),
                    state.worker_pool.clone(),
                )
                .await?;
            }
            ToIrohEndpoint::Endpoint(reply) => {
                let _ = reply.send(state.endpoint.clone().expect(ENDPOINT_BOUND));
            }
            ToIrohEndpoint::Report {
                remote_node_id,
                outcome,
                role,
                ..
            } => {
                // A failed outgoing attempt is not recorded while our own endpoint is not
                // globally reachable.
                // NOTE(review): presumably because the failure may be caused by our own
                // connectivity rather than by the remote node - confirm.
                if matches!(role, ConnectionRole::Connect { .. })
                    && outcome.is_failed()
                    && !is_globally_reachable_endpoint(
                        state.endpoint.as_ref().expect(ENDPOINT_BOUND).addr(),
                    )
                {
                    return Ok(());
                }

                if state
                    .address_book
                    .report(remote_node_id, outcome)
                    .await
                    .is_err()
                {
                    warn!("could not record connection outcome in address book");
                }
            }
        }

        Ok(())
    }

    /// Ignores every supervision event of the linked connection actors.
    ///
    /// NOTE(review): this replaces ractor's default handler; presumably so that a terminated or
    /// failed connection actor never stops the endpoint actor - confirm.
    async fn handle_supervisor_evt(
        &self,
        _myself: ActorRef<Self::Msg>,
        _message: SupervisionEvent,
        _state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        Ok(())
    }
}
/// Errors answered to the caller of a `Connect` request.
#[derive(Debug, Error)]
pub enum ConnectError {
/// Looking up the node in the address book failed.
#[error(transparent)]
AddressBook(#[from] AddressBookError),
/// The address book has no entry for the node, or its entry could not be converted into an
/// iroh endpoint address.
#[error("address book does not have any iroh address info for node id {0}")]
TransportInfoMissing(NodeId),
/// Error originating from the connection actor.
///
/// NOTE(review): not constructed in the code visible here, presumably sent through the reply
/// port by the connection actor - confirm.
#[error(transparent)]
Connection(#[from] ConnectionActorError),
}