use futures::channel::oneshot;
use iroh::PublicKey;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use tracing::{error, trace, warn};
use crate::{
actor::{Actor, ActorId},
base::{BindingError, Hex, Ident},
context::RootContext,
message::Continuation,
prelude::ActorRef,
remote::{
base::{Key, Tag},
peer::{LocalPeer, PEER},
},
};
/// Fallible construction of a value from a [`Tag`] plus a raw byte slice.
///
/// NOTE(review): presumably the tag selects which concrete message type the
/// bytes encode — confirm against the implementors.
pub trait FromTaggedBytes: Sized {
/// Builds `Self` from `tag` and `bytes`.
///
/// # Errors
///
/// Returns a `postcard::Error` when the value cannot be constructed.
fn from(tag: Tag, bytes: &[u8]) -> Result<Self, postcard::Error>;
}
/// A message of actor `A` paired with the DTO form of its continuation.
pub(crate) type MsgPackDto<A> = (<A as Actor>::Msg, ContinuationDto);
/// Description of a forward target, received over a oneshot channel by
/// `Continuation::into_dto` when it converts a `Continuation::Forward`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct ForwardInfo {
// Id of the actor the forward is addressed to; its bytes become the DTO `ident`.
pub(crate) actor_id: ActorId,
// Tag copied unchanged into `ContinuationDto::Forward`.
pub(crate) tag: Tag,
}
/// Serialized form of an `ActorRef`.
///
/// The variant records how the receiving side has to resolve the reference
/// (see the `TryFrom<ActorRefDto>` impl for `ActorRef`). Variant and field
/// order are part of the serialized representation and must not be reordered.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ActorRefDto {
    /// Emitted when the actor's import public key equals the public key of
    /// the peer in `PEER` scope. Resolved on deserialization through a local
    /// lookup by `actor_id`.
    First {
        actor_id: ActorId,
    },
    /// Emitted when no import public key is recorded for the actor. Resolved
    /// on deserialization by importing the actor from the peer returned by
    /// `PEER.get()`.
    Second {
        actor_id: ActorId,
    },
    /// Emitted when the actor's import public key differs from the scoped
    /// peer's key. Resolved on deserialization by importing the actor from
    /// the peer identified by `public_key`.
    Third {
        actor_id: ActorId,
        public_key: PublicKey,
    },
}
/// Serialized form of a `Continuation`.
///
/// Variant and field order are part of the serialized representation and
/// must not be reordered.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ContinuationDto {
    /// No continuation; converts back to `Continuation::Nil`.
    Nil,
    /// A reply is expected. The key is the one returned by the scoped peer's
    /// `arrange_recv_reply`; it converts back to `Continuation::BinReply`.
    Reply(Key),
    /// The result is to be forwarded to another actor.
    Forward {
        /// Identifier bytes of the forward target actor.
        ident: Ident,
        /// `None` makes the receiving side look the target up locally;
        /// `Some(key)` makes it send the forward to the peer with that key.
        mb_public_key: Option<PublicKey>,
        /// Tag handed on together with the forwarded bytes.
        tag: Tag,
    },
}
impl<A: Actor> From<&ActorRef<A>> for ActorRefDto {
    /// Picks the DTO variant for `actor` relative to the peer in `PEER` scope.
    ///
    /// * No import public key recorded for the actor: the actor is bound in
    ///   `RootContext` under its id bytes (side effect) and `Second` is
    ///   produced.
    /// * Import key equal to the scoped peer's key: `First`.
    /// * Any other import key: `Third`, carrying that key.
    fn from(actor: &ActorRef<A>) -> Self {
        let actor_id = actor.id();

        match LocalPeer::inst().get_import_public_key(&actor_id) {
            Some(public_key) => {
                let matches_scoped_peer = PEER.with(|peer| peer.public_key() == public_key);
                if matches_scoped_peer {
                    ActorRefDto::First { actor_id }
                } else {
                    ActorRefDto::Third {
                        public_key,
                        actor_id,
                    }
                }
            }
            None => {
                // Register a weak handle so the id can be looked up later.
                RootContext::bind_impl(*actor_id.as_bytes(), actor.downgrade());
                ActorRefDto::Second { actor_id }
            }
        }
    }
}
impl<A: Actor> Serialize for ActorRef<A> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
ActorRefDto::from(self).serialize(serializer)
}
}
impl<'de, A: Actor> Deserialize<'de> for ActorRef<A> {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
ActorRefDto::deserialize(deserializer)?
.try_into()
.map_err(|e| {
serde::de::Error::custom(format!(
"failed to construct ActorRef from ActorRefDto: {e}"
))
})
}
}
impl<A: Actor> TryFrom<ActorRefDto> for ActorRef<A> {
    type Error = BindingError;

    /// Resolves a deserialized DTO into a typed actor reference.
    ///
    /// * `First`: looked up locally by the id bytes.
    /// * `Second`: fetched or imported, using the peer from `PEER.get()`.
    /// * `Third`: fetched or imported, using the peer for `public_key`.
    ///
    /// # Errors
    ///
    /// Propagates the local lookup error for `First`. For `Second` and
    /// `Third`, returns `BindingError::DowncastError` when
    /// `get_or_import_actor` yields `None`.
    fn try_from(dto: ActorRefDto) -> Result<Self, Self::Error> {
        match dto {
            ActorRefDto::First { actor_id } => {
                let actor = ActorRef::<A>::lookup_local_impl(actor_id.as_bytes())?;
                Ok(actor)
            }
            ActorRefDto::Second { actor_id } => LocalPeer::inst()
                .get_or_import_actor::<A>(actor_id, || PEER.get())
                .ok_or(BindingError::DowncastError),
            ActorRefDto::Third {
                public_key,
                actor_id,
            } => LocalPeer::inst()
                .get_or_import_actor::<A>(actor_id, || {
                    LocalPeer::inst().get_or_connect_peer(public_key)
                })
                .ok_or(BindingError::DowncastError),
        }
    }
}
impl Continuation {
    /// Converts this continuation into its serializable form.
    ///
    /// * `Nil` maps to `ContinuationDto::Nil`.
    /// * `Reply` hands a fresh binary-reply receiver to the holder of the
    ///   reply sender, registers the matching sender with the scoped peer via
    ///   `arrange_recv_reply`, and returns the resulting key.
    /// * `Forward` asks the holder for a [`ForwardInfo`] and encodes the
    ///   target; `mb_public_key` is `None` only when the target's import key
    ///   equals the scoped peer's key.
    ///
    /// Channel failures are logged and degrade to `ContinuationDto::Nil`.
    /// Every path visible here returns `Some`.
    ///
    /// # Panics
    ///
    /// Panics for any variant other than `Nil`, `Reply` or `Forward`.
    pub(crate) async fn into_dto(self) -> Option<ContinuationDto> {
        match self {
            Continuation::Nil => Some(ContinuationDto::Nil),
            Continuation::Reply(tx) => {
                let (bin_reply_tx, bin_reply_rx) = oneshot::channel();

                if tx.send(Box::new(bin_reply_rx)).is_err() {
                    warn!("failed to send binary reply tx");
                    return Some(ContinuationDto::Nil);
                }

                let key = PEER.with(|p| p.arrange_recv_reply(bin_reply_tx));
                Some(ContinuationDto::Reply(key))
            }
            Continuation::Forward(tx) => {
                let (info_tx, info_rx) = oneshot::channel::<ForwardInfo>();

                if tx.send(Box::new(info_tx)).is_err() {
                    warn!("failed to request forward info");
                    return Some(ContinuationDto::Nil);
                }

                let info = match info_rx.await {
                    Ok(info) => info,
                    Err(_) => {
                        warn!("failed to receive forward info");
                        return Some(ContinuationDto::Nil);
                    }
                };

                let mb_public_key = match LocalPeer::inst().get_import_public_key(&info.actor_id) {
                    // Imported target: omit the key when it is the scoped peer's own.
                    Some(public_key) => {
                        if PEER.with(|p| p.public_key() == public_key) {
                            None
                        } else {
                            Some(public_key)
                        }
                    }
                    // No import key recorded: advertise this process's public key.
                    None => Some(LocalPeer::inst().public_key()),
                };

                Some(ContinuationDto::Forward {
                    ident: *info.actor_id.as_bytes(),
                    mb_public_key,
                    tag: info.tag,
                })
            }
            _ => panic!("only Nil | Reply | Forward continuations are serializable"),
        }
    }
}
impl From<ContinuationDto> for Continuation {
fn from(dto: ContinuationDto) -> Self {
match dto {
ContinuationDto::Nil => Continuation::Nil,
ContinuationDto::Reply(key) => Continuation::BinReply {
peer: PEER.get(),
key,
},
ContinuationDto::Forward {
ident,
mb_public_key,
tag,
} => match mb_public_key {
None => {
let (tx, rx) = oneshot::channel::<Vec<u8>>();
crate::compat::spawn({
PEER.scope(PEER.get(), async move {
trace!(
ident = %Hex(&ident),
"Scheduling deligated local forwarding"
);
let Ok(bytes) = rx.await else {
return warn!(
ident = %Hex(&ident),
"failed to receive binary local forwarding"
);
};
let any_actor =
match RootContext::lookup_any_local_unchecked_impl(&ident) {
Err(err) => {
return error!(
ident = %Hex(&ident),
%err,
"failed to find local forward target",
);
}
Ok(any_actor) => any_actor,
};
if let Err(err) = any_actor.send_tagged_bytes(tag, bytes) {
error!(
ident = %Hex(&ident),
%err,
"failed to send binary local forwarding"
);
}
})
});
Continuation::LocalBinForward {
peer: PEER.get(),
tx,
}
}
Some(public_key) => {
let (tx, rx) = oneshot::channel::<Vec<u8>>();
crate::compat::spawn(PEER.scope(PEER.get(), async move {
trace!(
ident = %Hex(&ident),
host = %PEER.get(),
"scheduling deligated remote forwarding"
);
let Ok(bytes) = rx.await else {
return warn!("failed to receive binary remote forwarding");
};
let target_peer = LocalPeer::inst().get_or_connect_peer(public_key);
if let Err(err) = target_peer.send_forward(ident, tag, bytes).await {
warn!(
ident = %Hex(&ident),
host = %PEER.get(),
%err,
"failed to send binary remote forwarding"
);
}
}));
Continuation::RemoteBinForward {
peer: PEER.get(),
tx,
}
}
},
}
}
}