use std::sync::{Arc, LazyLock, Mutex};
use futures::channel::oneshot;
use iroh::Endpoint;
use theta_flume::unbounded_with_id;
use tracing::{error, trace};
use uuid::Uuid;
#[cfg(feature = "monitor")]
use crate::monitor::HDLS;
use crate::{
actor::{Actor, ActorArgs, ActorId},
actor_instance::ActorConfig,
actor_ref::{ActorHdl, ActorRef, AnyActorRef, WeakActorHdl, WeakActorRef},
base::{BindingError, Hex, Ident, parse_ident},
compat,
message::RawSignal,
};
#[cfg(feature = "remote")]
use {
crate::remote::{base::ActorTypeId, peer::LocalPeer},
iroh::PublicKey,
};
/// Lazily initialised, crate-wide table mapping an [`Ident`] to a type-erased actor reference.
///
/// Entries are added by `RootContext::bind_impl`, read by the `is_bound_impl` /
/// `lookup_any_local_*` helpers, and removed by `RootContext::free_impl`.
pub(crate) static BINDINGS: LazyLock<compat::ConcurrentMap<Ident, Arc<dyn AnyActorRef>>> =
LazyLock::new(compat::ConcurrentMap::default);
/// Context for an actor of type `A`, used to identify it, spawn children under it and
/// request its termination.
#[derive(Debug)]
pub struct Context<A: Actor> {
/// Weak reference typed by the actor `A`.
/// NOTE(review): presumably refers to the actor owning this context — confirm where
/// `Context` is constructed (not visible in this section).
pub this: WeakActorRef<A>,
/// Signal handle: source of `id()`, parent handle for `spawn`, and target of `terminate`.
pub(crate) this_hdl: ActorHdl,
/// Weak handles of the children spawned through this context; shared between clones.
pub(crate) child_hdls: Arc<Mutex<Vec<WeakActorHdl>>>,
}
impl<A: Actor> Clone for Context<A> {
fn clone(&self) -> Self {
Self {
this: self.this.clone(),
this_hdl: self.this_hdl.clone(),
child_hdls: self.child_hdls.clone(),
}
}
}
/// Top-level context from which actors are spawned, bound to identifiers and terminated.
///
/// Cloning yields another context sharing the same signal handle and child list.
#[derive(Debug, Clone)]
pub struct RootContext {
/// Sending side of the root signal channel created in `Default::default`.
pub(crate) this_hdl: ActorHdl,
/// Weak handles of the actors spawned from the root; also read by the root signal task.
pub(crate) child_hdls: Arc<Mutex<Vec<WeakActorHdl>>>,
}
impl<A: Actor> Context<A> {
    /// Returns the id reported by this context's signal handle.
    pub fn id(&self) -> ActorId {
        self.this_hdl.id()
    }

    /// Spawns a new actor from `args` with this context's handle as its parent.
    ///
    /// A weak handle to the child is recorded in the child list and the strong
    /// [`ActorRef`] is returned to the caller.
    ///
    /// # Panics
    ///
    /// Panics if the child-list mutex is poisoned.
    pub fn spawn<Args: ActorArgs>(&self, args: Args) -> ActorRef<Args::Actor> {
        let (child_hdl, child) = spawn_impl(&self.this_hdl, args);
        let weak_child = child_hdl.downgrade();
        self.child_hdls.lock().unwrap().push(weak_child);
        child
    }

    /// Sends a terminate signal to this context's handle and waits for the acknowledgement.
    ///
    /// The outcome of the wait is discarded, so the call also returns if the
    /// acknowledgement channel is dropped without a reply.
    ///
    /// # Panics
    ///
    /// Panics if the signal cannot be sent.
    pub async fn terminate(&self) {
        let (done_tx, done_rx) = oneshot::channel();
        let request = RawSignal::Terminate(Some(done_tx));
        self.this_hdl.raw_send(request).unwrap();
        let _ = done_rx.await;
    }
}
impl RootContext {
    /// Hands `endpoint` to `LocalPeer::init` and then builds the context exactly like
    /// [`RootContext::init_local`].
    #[cfg(feature = "remote")]
    pub fn init(endpoint: Endpoint) -> Self {
        LocalPeer::init(endpoint);
        Self::init_local()
    }

    /// Builds a root context through the `Default` implementation.
    pub fn init_local() -> Self {
        Default::default()
    }

    /// Returns the public key reported by the `LocalPeer` instance.
    #[cfg(feature = "remote")]
    pub fn public_key(&self) -> PublicKey {
        LocalPeer::inst().public_key()
    }

    /// Returns the endpoint held by the `LocalPeer` instance.
    #[cfg(feature = "remote")]
    pub fn endpoint(&self) -> &Endpoint {
        LocalPeer::inst().endpoint()
    }

    /// Spawns a new actor from `args` with the root handle as its parent.
    ///
    /// A weak handle to the new actor is recorded in the root's child list.
    ///
    /// # Panics
    ///
    /// Panics if the child-list mutex is poisoned.
    pub fn spawn<Args: ActorArgs>(&self, args: Args) -> ActorRef<Args::Actor> {
        let (child_hdl, child) = spawn_impl(&self.this_hdl, args);
        let weak_child = child_hdl.downgrade();
        self.child_hdls.lock().unwrap().push(weak_child);
        child
    }

    /// Registers `actor` in [`BINDINGS`] under the identifier parsed from `ident`.
    ///
    /// # Errors
    ///
    /// Propagates the error produced by `parse_ident` when `ident` is rejected.
    pub fn bind<A: Actor>(
        &self,
        ident: impl AsRef<str>,
        actor: ActorRef<A>,
    ) -> Result<(), BindingError> {
        let key = parse_ident(ident.as_ref())?;
        trace!(ident = %Hex(&key), %actor, "binding");
        Self::bind_impl(key, actor);
        Ok(())
    }

    /// Removes the binding stored under `ident` and returns the actor reference it held.
    ///
    /// `ident` is copied into a 16-byte key whose unused trailing bytes are zero.
    /// Returns `None` when `ident` is longer than 16 bytes or nothing is bound to the key.
    pub fn free(&self, ident: &[u8]) -> Option<Arc<dyn AnyActorRef>> {
        let mut padded: Ident = [0u8; 16];
        // `get_mut` yields `None` for an over-long identifier, which `?` returns as-is.
        let prefix = padded.get_mut(..ident.len())?;
        prefix.copy_from_slice(ident);
        Self::free_impl(&padded)
    }

    /// Sends a terminate signal to the root handle and waits for the acknowledgement.
    ///
    /// The outcome of the wait is discarded, so the call also returns if the
    /// acknowledgement channel is dropped without a reply.
    ///
    /// # Panics
    ///
    /// Panics if the signal cannot be sent.
    pub async fn terminate(&self) {
        let (done_tx, done_rx) = oneshot::channel();
        let request = RawSignal::Terminate(Some(done_tx));
        self.this_hdl.raw_send(request).unwrap();
        let _ = done_rx.await;
    }

    /// Reports whether `ident` is bound to a reference that is an `ActorRef<A>`.
    ///
    /// Returns `false` when nothing is bound or when the bound reference is of another type.
    // NOTE(review): presumably unused under some feature combinations — confirm before
    // removing the lint exemption.
    #[allow(dead_code)]
    pub(crate) fn is_bound_impl<A: Actor>(ident: &[u8; 16]) -> bool {
        let is_ref_to_a = BINDINGS.with(ident, |bound| {
            bound.as_any().is_some_and(|any| any.is::<ActorRef<A>>())
        });
        matches!(is_ref_to_a, Some(true))
    }

    /// Stores `actor`, type-erased behind an `Arc`, in [`BINDINGS`] under `ident`.
    ///
    /// Whatever `insert` returns is discarded.
    pub(crate) fn bind_impl<A: AnyActorRef>(ident: Ident, actor: A) {
        let erased: Arc<dyn AnyActorRef> = Arc::new(actor);
        let _ = BINDINGS.insert(ident, erased);
    }

    /// Looks up the reference bound to `ident` and checks its actor type id.
    ///
    /// # Errors
    ///
    /// Returns `BindingError::NotFound` when nothing is bound to `ident`, and
    /// `BindingError::TypeMismatch` when the bound reference reports a type id other than
    /// `actor_ty_id`.
    #[cfg(feature = "remote")]
    pub(crate) fn lookup_any_local_impl(
        actor_ty_id: ActorTypeId,
        ident: &[u8; 16],
    ) -> Result<Arc<dyn AnyActorRef>, BindingError> {
        let actor = Self::lookup_any_local_unchecked_impl(ident)?;
        if actor.ty_id() == actor_ty_id {
            Ok(actor)
        } else {
            Err(BindingError::TypeMismatch)
        }
    }

    /// Returns a clone of the reference bound to `ident` without any type check.
    ///
    /// # Errors
    ///
    /// Returns `BindingError::NotFound` when nothing is bound to `ident`.
    #[cfg(feature = "remote")]
    pub(crate) fn lookup_any_local_unchecked_impl(
        ident: &[u8; 16],
    ) -> Result<Arc<dyn AnyActorRef>, BindingError> {
        let found = BINDINGS.with(ident, Arc::clone);
        found.ok_or(BindingError::NotFound)
    }

    /// Removes the entry stored under `ident` from [`BINDINGS`], returning its value if any.
    pub(crate) fn free_impl(ident: &[u8; 16]) -> Option<Arc<dyn AnyActorRef>> {
        let (_, actor) = BINDINGS.remove(ident)?;
        Some(actor)
    }
}
impl Default for RootContext {
fn default() -> Self {
let (sig_tx, sig_rx) = unbounded_with_id(Uuid::new_v4());
let this_hdl = ActorHdl(sig_tx);
let child_hdls = Arc::new(Mutex::new(Vec::<WeakActorHdl>::new()));
compat::spawn({
let child_hdls = child_hdls.clone();
async move {
while let Some(sig) = sig_rx.recv().await {
match sig {
RawSignal::Escalation(child_hdl, esc) => {
error!(child = % child_hdl.id(), ? esc, "root received escalation");
if let Err(e) = child_hdl.raw_send(RawSignal::Terminate(None)) {
error!(?e, "failed to send terminate to escalating child");
}
}
RawSignal::ChildDropped => {
let mut child_hdls = child_hdls.lock().unwrap();
child_hdls.retain(|hdl| match hdl.upgrade() {
None => false,
Some(hdl) => hdl.0.sender_count() > 0,
});
}
RawSignal::Terminate(k) => {
let alive_hdls: Vec<_> = child_hdls
.lock()
.unwrap()
.iter()
.filter_map(super::actor_ref::WeakActorHdl::upgrade)
.collect();
let child_rxs: Vec<_> = alive_hdls
.iter()
.filter_map(|hdl| {
let (tx, rx) = oneshot::channel();
match hdl.raw_send(RawSignal::Terminate(Some(tx))) {
Ok(()) => Some(rx),
Err(_) => None,
}
})
.collect();
for rx in child_rxs {
let _ = rx.await;
}
if let Some(k) = k {
let _ = k.send(());
}
}
RawSignal::Pause(k) | RawSignal::Resume(k) | RawSignal::Restart(k) => {
if let Some(k) = k {
let _ = k.send(());
}
}
#[cfg(feature = "monitor")]
RawSignal::Monitor(_) => {}
}
}
}
});
Self {
this_hdl,
child_hdls,
}
}
}
/// Spawns an actor from `args` under `parent_hdl`, giving it a freshly generated v4 UUID
/// as its id.
///
/// Returns the new actor's signal handle together with its strong reference.
pub(crate) fn spawn_impl<Args: ActorArgs>(
    parent_hdl: &ActorHdl,
    args: Args,
) -> (ActorHdl, ActorRef<Args::Actor>) {
    let fresh_id = Uuid::new_v4();
    spawn_with_id_impl(fresh_id, parent_hdl, args)
}
/// Spawns an actor from `args` under `parent_hdl` using the caller-supplied `actor_id`.
///
/// Two channels tagged with `actor_id` are created: the message channel, whose sender
/// becomes the returned [`ActorRef`], and the signal channel, whose sender becomes the
/// returned [`ActorHdl`]. The spawned task receives both receiving ends, a weak reference
/// to the actor, and clones of the parent handle and the actor's own handle.
///
/// With the `monitor` feature the handle is inserted into `HDLS` before the task is
/// spawned and removed again once the task's `exec` call has finished.
pub(crate) fn spawn_with_id_impl<Args: ActorArgs>(
    actor_id: ActorId,
    parent_hdl: &ActorHdl,
    args: Args,
) -> (ActorHdl, ActorRef<Args::Actor>) {
    let (msg_tx, msg_rx) = unbounded_with_id(actor_id);
    let (sig_tx, sig_rx) = unbounded_with_id(actor_id);
    let actor_hdl = ActorHdl(sig_tx);
    let actor = ActorRef(msg_tx);

    #[cfg(feature = "monitor")]
    let _ = HDLS.insert(actor_id, actor_hdl.clone());

    // Everything the task captures is prepared up front; it only gets a weak reference
    // to the actor itself.
    let weak_actor = actor.downgrade();
    let parent = parent_hdl.clone();
    let own_hdl = actor_hdl.clone();
    let task = async move {
        let config = ActorConfig::new(weak_actor, parent, own_hdl, sig_rx, msg_rx, args);
        config.exec().await;
        #[cfg(feature = "monitor")]
        let _ = HDLS.remove(&actor_id);
    };
    compat::spawn(task);

    (actor_hdl, actor)
}