use std::{
any::type_name,
fmt::Display,
sync::{Arc, OnceLock},
time::Duration,
};
use futures::{FutureExt, channel::oneshot, future::Either};
use iroh::{
PublicKey,
endpoint::{RecvStream, SendStream},
};
use pin_utils::pin_mut;
use serde::{Deserialize, Serialize};
use theta_flume::unbounded_with_id;
use tracing::{debug, error, trace, warn};
use crate::{
actor::{Actor, ActorId},
actor_ref::AnyActorRef,
base::{BindingError, Hex, Ident, MonitorError},
compat,
context::RootContext,
message::{Continuation, MsgPack},
prelude::ActorRef,
remote::{
base::{ActorTypeId, RemoteError, Tag},
network::{Network, NetworkError, PreparedConn, RecvFrameExt, SendFrameExt},
serde::{ContinuationDto, MsgPackDto},
},
};
#[cfg(feature = "monitor")]
use crate::monitor::{HDLS, Update, UpdateTx};
pub(crate) static LOCAL_PEER: OnceLock<LocalPeer> = OnceLock::new();
compat_task_local! {
pub(crate) static PEER: Peer;
}
#[derive(Debug, Clone)]
pub(crate) struct LocalPeer(Arc<LocalPeerInner>);
#[derive(Debug, Clone)]
pub struct Peer(Arc<PeerInner>);
#[derive(Debug)]
struct LocalPeerInner {
public_key: PublicKey,
network: Network,
peers: compat::ConcurrentMap<PublicKey, Peer>,
imports: compat::ConcurrentMap<ActorId, AnyImport>,
}
#[derive(Debug)]
struct PeerInner {
public_key: PublicKey,
conn: PreparedConn,
favored_peer: OnceLock<Peer>,
}
#[derive(Debug, Clone)]
struct AnyImport {
pub(crate) peer: Peer,
pub(crate) any_actor: Arc<dyn AnyActorRef>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
enum InitFrame {
Import {
actor_id: ActorId,
},
Lookup {
actor_ty_id: ActorTypeId,
ident: Ident,
},
#[cfg(feature = "monitor")]
Monitor {
actor_ty_id: ActorTypeId,
ident: Ident,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
enum ControlFrame {
Forward {
ident: Ident,
tag: Tag,
bytes: Vec<u8>,
},
}
impl LocalPeer {
pub fn init(endpoint: iroh::Endpoint) {
let this = LocalPeer(Arc::new(LocalPeerInner::new(Network::new(endpoint))));
compat::spawn({
let this = this.clone();
async move {
trace!(
public_key = %this.0.public_key,
"starting local peer",
);
loop {
let (public_key, conn) = match this.0.network.accept_and_prepare().await {
Err(err) => {
error!(%err, "failed to accept conn");
continue;
}
Ok(x) => x,
};
debug!(
public_key = %public_key,
"accepted incoming connection",
);
match this.0.peers.entry(public_key) {
compat::Entry::Occupied(mut o) => {
if this.0.public_key > public_key {
trace!(%public_key, "handling unfavored incoming connection");
o.get().clone().run_unfavored(conn);
continue;
}
let peer = Peer::new(public_key, conn);
trace!(%peer, "replacing with favored");
let unfavored_peer = o.insert(peer.clone());
unfavored_peer
.0
.favored_peer
.set(peer.clone())
.expect("favored peer could be set only once here");
unfavored_peer.0.conn.close(b"superseded");
}
compat::Entry::Vacant(v) => {
let peer = Peer::new(public_key, conn);
trace!(%peer, "adding");
v.insert(peer);
}
}
}
}
});
LOCAL_PEER
.set(this)
.expect("local peer should initialize only once");
}
pub(crate) fn inst() -> &'static LocalPeer {
LOCAL_PEER.get().expect("local peer is not initialized")
}
pub(crate) fn public_key(&self) -> PublicKey {
self.0.public_key
}
pub(crate) fn endpoint(&self) -> &iroh::Endpoint {
&self.0.network.endpoint
}
pub(crate) fn get_or_connect_peer(&self, public_key: PublicKey) -> Peer {
match self.peer_entry(&public_key) {
compat::Entry::Occupied(o) => o.get().clone(),
compat::Entry::Vacant(v) => {
let conn = self.0.network.connect_and_prepare(public_key);
let peer = Peer::new(public_key, conn);
trace!(%peer, "adding");
v.insert(peer.clone());
peer
}
}
}
#[cfg(feature = "monitor")]
pub(crate) fn get_imported_peer(&self, actor_id: &ActorId) -> Option<Peer> {
self.0.imports.with(actor_id, |import| import.peer.clone())
}
pub(crate) fn get_import_public_key(&self, actor_id: &ActorId) -> Option<PublicKey> {
self.0
.imports
.with(actor_id, |import| import.peer.public_key())
}
pub(crate) fn get_or_import_actor<A: Actor>(
&self,
actor_id: ActorId,
peer: impl FnOnce() -> Peer, ) -> Option<ActorRef<A>> {
match self.actor_entry(&actor_id) {
compat::Entry::Vacant(v) => {
let peer = peer();
let actor = peer.import(actor_id);
v.insert(AnyImport {
peer,
any_actor: Arc::new(actor.clone()),
});
Some(actor)
}
compat::Entry::Occupied(o) => {
let any_actor = o.get().any_actor.clone();
any_actor
.as_any()?
.downcast::<ActorRef<A>>()
.ok()
.map(|a| *a)
}
}
}
fn peer_entry(&self, public_key: &PublicKey) -> compat::Entry<'_, PublicKey, Peer> {
self.0.peers.entry(*public_key)
}
fn remove_peer(&self, peer: &Peer) {
trace!(%peer, "removing disconnected");
match self.peer_entry(&peer.public_key()) {
compat::Entry::Vacant(_) => {
error!(%peer, "no such peer to remove"); }
compat::Entry::Occupied(o) => {
if !Arc::ptr_eq(&o.get().0, &peer.0) {
return debug!(%peer, "skipping removal: superseded by path migration");
}
let peer = o.remove();
debug!(%peer, "removed and canceled");
}
}
}
fn actor_entry(&self, actor_id: &ActorId) -> compat::Entry<'_, ActorId, AnyImport> {
self.0.imports.entry(*actor_id)
}
fn remove_actor(&self, actor_id: &ActorId) {
trace!(actor_id = %Hex(actor_id.as_bytes()), "removing remote actor from imports");
match self.0.imports.remove(actor_id) {
None => error!(actor_id = %Hex(actor_id.as_bytes()), "no such remote actor to remove"), Some(_) => debug!(actor_id = %Hex(actor_id.as_bytes()), "removed remote actor"),
}
}
}
impl Peer {
pub(crate) fn new(public_key: PublicKey, conn: PreparedConn) -> Self {
let inst = Self(Arc::new(PeerInner {
public_key,
conn,
favored_peer: OnceLock::new(),
}));
trace!(peer = %inst, %public_key, "creating");
inst.clone().run();
inst
}
pub(crate) fn public_key(&self) -> PublicKey {
self.0.public_key
}
pub(crate) async fn send_forward(
&self,
ident: Ident,
tag: Tag,
bytes: Vec<u8>,
) -> Result<(), RemoteError> {
self.send_control_frame(&ControlFrame::Forward { ident, tag, bytes })
.await
}
#[cfg(feature = "monitor")]
pub(crate) async fn monitor<A: Actor>(
&self,
ident: Ident,
tx: UpdateTx<A>,
) -> Result<(), RemoteError> {
trace!(
actor = format_args!("{}({})", type_name::<A>(), Hex(&ident)),
host = %self,
"sending monitoring request via bi-stream",
);
let (mut send_half, mut recv_half) = self.0.conn.open_bi().await?;
let init_frame = InitFrame::Monitor {
actor_ty_id: A::IMPL_ID,
ident,
};
let init_bytes = postcard::to_stdvec(&init_frame).map_err(RemoteError::SerializeError)?;
send_half.send_frame(init_bytes).await?;
let mut status = [0u8; 1];
compat::timeout(Duration::from_secs(5), recv_half.read_exact(&mut status))
.await
.map_err(|_| RemoteError::Timeout)?
.map_err(|e| NetworkError::ReadExactError(Arc::new(e)))?;
if status[0] == 0x00 {
let mut buf = Vec::new();
recv_half.recv_frame_into(&mut buf).await?;
let err: MonitorError =
postcard::from_bytes(&buf).map_err(RemoteError::DeserializeError)?;
return Err(RemoteError::MonitorError(err));
}
debug!(
actor = format_args!("{}({})", type_name::<A>(), Hex(&ident)),
host = %self,
"received monitoring stream",
);
compat::spawn({
let this = self.clone();
PEER.scope(this, async move {
trace!(
actor = format_args!("{}({})", type_name::<A>(), Hex(&ident)),
host = %PEER.get(),
"starting to monitor",
);
let mut buf = Vec::new();
loop {
buf.clear();
if let Err(err) = recv_half.recv_frame_into(&mut buf).await {
break warn!(
actor = format_args!("{}({})", type_name::<A>(), Hex(&ident)),
host = %PEER.get(),
%err,
"failed to receive update frame",
);
}
let update = match postcard::from_bytes::<Update<A>>(&buf) {
Err(err) => {
error!(
actor = format_args!("{}({})", type_name::<A>(), Hex(&ident)),
host = %PEER.get(),
%err,
"failed to deserialize update frame"
);
continue;
}
Ok(update) => update,
};
if tx.send(update).is_err() {
break warn!(
actor = format_args!("{}({})", type_name::<A>(), Hex(&ident)),
host = %PEER.get(),
err = "update channel closed",
"stopped monitoring",
);
}
}
})
});
Ok(())
}
pub(crate) async fn lookup<A: Actor>(
&self,
ident: Ident,
) -> Result<Result<ActorRef<A>, BindingError>, RemoteError> {
trace!(
actor = format_args!("{}({})", type_name::<A>(), Hex(&ident)),
host = %self,
"sending lookup request via bi-stream",
);
let (mut send_half, mut recv_half) = self.0.conn.open_bi().await?;
let init_frame = InitFrame::Lookup {
actor_ty_id: A::IMPL_ID,
ident,
};
let init_bytes = postcard::to_stdvec(&init_frame).map_err(RemoteError::SerializeError)?;
send_half.send_frame(init_bytes).await?;
drop(send_half);
let mut buf = Vec::new();
compat::timeout(Duration::from_secs(5), recv_half.recv_frame_into(&mut buf))
.await
.map_err(|_| RemoteError::Timeout)??;
let resp: Result<Vec<u8>, BindingError> =
postcard::from_bytes(&buf).map_err(RemoteError::DeserializeError)?;
let peer = match self.0.favored_peer.get() {
None => self.clone(),
Some(p) => p.clone(),
};
debug!(
actor = format_args!("{}({})", type_name::<A>(), Hex(&ident)),
host = %peer,
resp = %match &resp {
Err(err) => format!("Err({err})"),
Ok(b) => format!("Ok({} bytes)", b.len()),
},
"received lookup response",
);
let bytes = match resp {
Err(e) => return Ok(Err(e)),
Ok(bytes) => bytes,
};
let actor = PEER
.sync_scope(peer, || postcard::from_bytes::<ActorRef<A>>(&bytes))
.map_err(RemoteError::DeserializeError)?;
Ok(Ok(actor))
}
fn run(self) {
self.clone().spawn_control_reader(self.0.conn.clone());
compat::spawn({
async move {
trace!(from = %self, "accepting bi-streams");
loop {
let (reply_tx, in_stream) = match self.0.conn.accept_bi().await {
Err(err) => break warn!(from = %self, %err, "failed to accept bi-stream"),
Ok(bi) => bi,
};
Self::spawn_bi_handler(self.clone(), reply_tx, in_stream);
}
LocalPeer::inst().remove_peer(&self);
}
});
}
fn spawn_control_reader(self, conn: PreparedConn) {
compat::spawn({
async move {
let mut control_rx = match conn.control_rx().await {
Err(err) => return warn!(from = %self, %err, "failed to accept control stream"),
Ok(rx) => rx,
};
trace!(from = %self, "control stream reader started");
let mut buf = Vec::new();
loop {
buf.clear();
if let Err(err) = control_rx.recv_frame_into(&mut buf).await {
break warn!(from = %self, %err, "control stream ended");
}
let frame: ControlFrame = match postcard::from_bytes(&buf) {
Err(err) => {
error!(from = %self, %err, "failed to deserialize control frame");
continue;
}
Ok(frame) => frame,
};
#[cfg(feature = "verbose")]
debug!(from = %self, %frame, "received control frame");
match frame {
ControlFrame::Forward { ident, tag, bytes } => {
self.process_forward(ident, tag, bytes).await;
}
}
}
}
});
}
fn run_unfavored(self, unfavored_conn: PreparedConn) {
self.spawn_control_reader(unfavored_conn);
}
fn spawn_bi_handler(peer: Peer, reply_tx: SendStream, in_stream: RecvStream) {
compat::spawn(async move {
let mut buf = Vec::new();
let mut in_stream = in_stream;
let mut reply_tx = reply_tx;
match compat::timeout(Duration::from_secs(5), in_stream.recv_frame_into(&mut buf)).await
{
Err(_) => {
return warn!(from = %peer, "timeout receiving initial frame on bi-stream, dropping dead stream");
}
Ok(Err(err)) => {
return warn!(from = %peer, %err, "failed to receive initial frame on bi-stream");
}
Ok(Ok(())) => {}
}
let init_frame: InitFrame = match postcard::from_bytes(&buf) {
Err(err) => {
return error!(from = %peer, %err, "failed to deserialize initial frame");
}
Ok(frame) => frame,
};
match init_frame {
InitFrame::Import { actor_id } => {
let Ok(actor) =
RootContext::lookup_any_local_unchecked_impl(actor_id.as_bytes())
else {
return error!(from = %peer, actor_id = %Hex(actor_id.as_bytes()), "local actor to export not found");
};
if let Err(err) = actor.spawn_export_task(peer.clone(), in_stream, reply_tx) {
warn!(from = %peer, actor_id = %Hex(actor_id.as_bytes()), %err, "failed to spawn export task");
}
}
InitFrame::Lookup { actor_ty_id, ident } => {
let resp = match RootContext::lookup_any_local_impl(actor_ty_id, &ident) {
Err(e) => Err(e),
Ok(any_actor) => PEER.sync_scope(peer.clone(), || any_actor.serialize()),
};
debug!(
ident = %Hex(&ident),
host = %peer,
"processed lookup request",
);
let bytes = match postcard::to_stdvec(&resp) {
Err(err) => {
return error!(ident = %Hex(&ident), host = %peer, %err, "failed to serialize lookup response");
}
Ok(b) => b,
};
if let Err(err) = reply_tx.send_frame(bytes).await {
warn!(ident = %Hex(&ident), host = %peer, %err, "failed to send lookup response");
}
}
#[cfg(feature = "monitor")]
InitFrame::Monitor { actor_ty_id, ident } => {
Self::handle_monitor_request(peer, actor_ty_id, ident, reply_tx).await;
}
#[cfg(not(feature = "monitor"))]
_ => {
error!(from = %peer, "unexpected InitFrame variant on bi-stream");
}
}
});
}
#[cfg(feature = "monitor")]
async fn handle_monitor_request(
peer: Peer,
actor_ty_id: ActorTypeId,
ident: Ident,
mut reply_tx: SendStream,
) {
let actor = match RootContext::lookup_any_local_impl(actor_ty_id, &ident) {
Err(err) => {
warn!(
ident = %Hex(&ident),
host = %peer,
%err,
"failed to lookup local actor for remote monitoring request",
);
if let Err(err) = reply_tx.write_all(&[0x00]).await {
return error!(ident = %Hex(&ident), host = %peer, %err, "failed to send monitor error status");
}
let monitor_err = MonitorError::from(err);
let err_bytes = match postcard::to_stdvec(&monitor_err) {
Ok(b) => b,
Err(err) => {
return error!(ident = %Hex(&ident), host = %peer, %err, "failed to serialize monitor error");
}
};
if let Err(err) = reply_tx.send_frame(err_bytes).await {
error!(ident = %Hex(&ident), host = %peer, %err, "failed to send monitor error frame");
}
return;
}
Ok(actor) => actor,
};
let Some(id) = actor.id() else {
return warn!(
ident = %Hex(&ident),
host = %peer,
"failed to monitor terminated",
);
};
let Some(hdl) = HDLS.get(&id) else {
warn!(
ident = %Hex(&ident),
host = %peer,
"failed to monitor not found",
);
if let Err(err) = reply_tx.write_all(&[0x00]).await {
return error!(ident = %Hex(&ident), host = %peer, %err, "failed to send monitor error status");
}
let monitor_err = MonitorError::from(BindingError::NotFound);
let err_bytes = match postcard::to_stdvec(&monitor_err) {
Err(err) => {
return error!(ident = %Hex(&ident), host = %peer, %err, "failed to serialize monitor error");
}
Ok(b) => b,
};
if let Err(err) = reply_tx.send_frame(err_bytes).await {
error!(ident = %Hex(&ident), host = %peer, %err, "failed to send monitor error frame");
}
return;
};
if reply_tx.write_all(&[0x01]).await.is_err() {
return warn!(
ident = %Hex(&ident),
host = %peer,
"failed to send monitoring ok status",
);
}
if let Err(e) = actor.monitor_as_bytes(peer.clone(), hdl, reply_tx) {
error!(
ident = %Hex(&ident),
host = %peer,
%e,
"failed to monitor as bytes",
);
}
}
fn import<A: Actor>(&self, actor_id: ActorId) -> ActorRef<A> {
let (msg_tx, msg_rx) = unbounded_with_id::<MsgPack<A>>(actor_id);
compat::spawn({
let this = self.clone();
PEER.scope(self.clone(), async move {
trace!(
actor = format_args!("{}({actor_id})", type_name::<A>()),
host = %this,
"starting imported remote",
);
let init_frame = InitFrame::Import { actor_id };
let init_bytes = match postcard::to_stdvec(&init_frame) {
Err(err) => {
error!(
actor = format_args!("{}({actor_id})", type_name::<A>()),
host = %this,
%err,
"failed to serialize import initial frame",
);
return LocalPeer::inst().remove_actor(&actor_id);
}
Ok(bytes) => bytes,
};
let first_msg = match msg_rx.recv().await {
None => {
debug!(
actor = format_args!("{}({actor_id})", type_name::<A>()),
host = %this,
"remote actor message channel closed (no messages ever sent)",
);
return LocalPeer::inst().remove_actor(&actor_id);
}
Some(msg_k) => msg_k,
};
let (mut send_half, mut recv_half) = match this.0.conn.open_bi().await {
Err(err) => {
error!(
actor = format_args!("{}({actor_id})", type_name::<A>()),
host = %this,
%err,
"failed to open bi-stream to import",
);
return LocalPeer::inst().remove_actor(&actor_id);
}
Ok(s) => s,
};
if let Err(err) = send_half.send_frame(init_bytes).await {
error!(
actor = format_args!("{}({actor_id})", type_name::<A>()),
host = %this,
%err,
"failed to send import initial frame",
);
return LocalPeer::inst().remove_actor(&actor_id);
}
let mut pending_first = Some(first_msg);
loop {
let (msg, k) = if let Some(msg_k) = pending_first.take() {
msg_k
} else {
match msg_rx.try_recv() {
Err(_) => {
let recv_fut = msg_rx.recv().fuse();
let stopped_fut = send_half.stopped().fuse();
pin_mut!(recv_fut, stopped_fut);
match futures::future::select(recv_fut, stopped_fut).await {
Either::Left((mb_msg_k, _)) => match mb_msg_k {
None => break debug!(
actor = format_args!("{}({actor_id})", type_name::<A>()),
host = %this,
"remote actor message channel closed",
),
Some(msg_k) => msg_k,
},
Either::Right((_, _)) => break warn!(
actor = format_args!("{}({actor_id})", type_name::<A>()),
host = %this,
"stopped disconnected remote",
),
}
}
Ok(msg_k) => msg_k,
}
};
#[cfg(feature = "verbose")]
trace!(
msg = ?(std::mem::discriminant(&msg), &k),
target = format_args!("{}({})", type_name::<A>(), Hex(actor_id.as_bytes())),
host =%this,
"sending remote",
);
let (k_dto, bin_reply_tx) = match k {
Continuation::Reply(tx) => {
let (bin_reply_tx, bin_reply_rx) = oneshot::channel();
if tx.send(Box::new(bin_reply_rx)).is_err() {
warn!(
target = format_args!("{}({actor_id})", type_name::<A>()),
host = %this,
"failed to send binary reply rx to caller",
);
continue;
}
(ContinuationDto::Reply, Some(bin_reply_tx))
}
other => {
let Some(dto) = other.into_dto().await else {
break error!(
target = format_args!("{}({actor_id})", type_name::<A>()),
host = %this,
"failed to convert continuation to DTO",
);
};
(dto, None)
}
};
let dto: MsgPackDto<A> = (msg, k_dto);
let msg_k_bytes = match postcard::to_stdvec(&dto) {
Err(err) => {
error!(
target = format_args!("{}({actor_id})", type_name::<A>()),
host = %this,
%err,
"failed to serialize message for remote",
);
continue;
}
Ok(bytes) => bytes,
};
if let Err(err) = send_half.send_frame(msg_k_bytes).await {
break warn!(
target = format_args!("{}({actor_id})", type_name::<A>()),
host = %this,
%err,
"failed to send message to remote",
);
}
if let Some(reply_tx) = bin_reply_tx {
let mut buf = Vec::new();
if let Err(err) = recv_half.recv_frame_into(&mut buf).await {
break warn!(
actor = format_args!("{}({actor_id})", type_name::<A>()),
host = %this,
%err,
"reply read failed, stopping import",
);
}
if reply_tx.send((this.clone(), buf)).is_err() {
trace!(
actor = format_args!("{}({actor_id})", type_name::<A>()),
host = %this,
"reply oneshot dropped (caller cancelled)",
);
}
}
}
debug!(
actor = format_args!("{}({actor_id})", type_name::<A>()),
host = %this,
"stopped imported remote",
);
LocalPeer::inst().remove_actor(&actor_id);
})
});
ActorRef(msg_tx)
}
async fn send_control_frame(&self, frame: &ControlFrame) -> Result<(), RemoteError> {
let bytes = postcard::to_stdvec(frame).map_err(RemoteError::SerializeError)?;
self.0.conn.send_control_frame(bytes).await?;
Ok(())
}
async fn process_forward(&self, ident: Ident, tag: Tag, bytes: Vec<u8>) {
let actor = match RootContext::lookup_any_local_unchecked_impl(&ident) {
Err(err) => {
return error!(
ident = %Hex(&ident),
host = %self,
%err,
"failed to lookup local actor for forwarding",
);
}
Ok(actor) => actor,
};
if let Err(err) = actor.send_tagged_bytes(tag, bytes) {
warn!(
ident = %Hex(&ident),
host = %self,
%err,
"failed to send tagged bytes to local actor",
);
}
}
}
impl Display for Peer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Peer({:p})", Arc::as_ptr(&self.0))
}
}
impl LocalPeerInner {
pub fn new(network: Network) -> Self {
Self {
public_key: network.public_key(),
network,
peers: compat::ConcurrentMap::default(),
imports: compat::ConcurrentMap::default(),
}
}
}
impl Display for InitFrame {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
InitFrame::Import { actor_id } => {
write!(
f,
"InitFrame::Import {{ actor_id: {} }}",
Hex(actor_id.as_bytes())
)
}
InitFrame::Lookup { actor_ty_id, ident } => {
write!(
f,
"InitFrame::Lookup {{ actor_ty_id: {actor_ty_id}, ident: {} }}",
Hex(ident)
)
}
#[cfg(feature = "monitor")]
InitFrame::Monitor { actor_ty_id, ident } => {
write!(
f,
"InitFrame::Monitor {{ actor_ty_id: {actor_ty_id}, ident: {} }}",
Hex(ident)
)
}
}
}
}
impl Display for ControlFrame {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ControlFrame::Forward { ident, tag, bytes } => write!(
f,
"ControlFrame::Forward {{ ident: {}, tag: {tag:?}, bytes: {} bytes }}",
Hex(ident),
bytes.len()
),
}
}
}