use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use futures::FutureExt;
use tokio_util::sync::CancellationToken;
use dactor::actor::{
Actor, ActorContext, ActorError, ActorRef, AskReply, ReduceHandler, Handler, ExpandHandler,
TransformHandler,
};
use dactor::dead_letter::{DeadLetterEvent, DeadLetterHandler, DeadLetterReason};
use dactor::dispatch::{AskDispatch, Dispatch, ReduceDispatch, ExpandDispatch, TransformDispatch, TypedDispatch};
use dactor::errors::{ActorSendError, ErrorAction, RuntimeError};
use dactor::interceptor::{
Disposition, DropObserver, InboundContext, InboundInterceptor, OutboundInterceptor, Outcome,
SendMode,
};
use dactor::mailbox::MailboxConfig;
use dactor::message::{Headers, Message, RuntimeHeaders};
use dactor::node::{ActorId, NodeId};
use dactor::runtime_support::{
spawn_reduce_batched_drain, spawn_reduce_drain, spawn_transform_drain,
wrap_batched_stream_with_interception, wrap_stream_with_interception, OutboundPipeline,
};
use dactor::stream::{
BatchConfig, BatchReader, BatchWriter, BoxStream, StreamReceiver, StreamSender,
};
use dactor::supervision::ChildTerminated;
use dactor::system_actors::{
CancelManager, CancelResponse, NodeDirectory, PeerStatus, SpawnManager, SpawnRequest,
SpawnResponse, WatchManager, WatchNotification,
};
use dactor::type_registry::TypeRegistry;
use crate::cluster::KameoClusterEvents;
/// A single registered watch: the watching actor's id plus a type-erased
/// callback that delivers the [`ChildTerminated`] notification to it.
struct WatchEntry {
    // Id of the actor that asked to be notified.
    watcher_id: ActorId,
    // Invoked from the watched actor's `on_stop`; must be Send + Sync because
    // the registry is shared across actor tasks.
    notify: Box<dyn Fn(ChildTerminated) + Send + Sync>,
}
/// Shared watch registry: watched actor id -> the entries watching it.
/// Shared between the runtime and every hosted actor.
type WatcherMap = Arc<Mutex<HashMap<ActorId, Vec<WatchEntry>>>>;
/// The single kameo message type used for all dactor traffic: a type-erased
/// dispatch (tell/ask/expand/reduce/transform) targeting actor `A`.
struct DactorMsg<A: Actor>(Box<dyn Dispatch<A>>);
/// The kameo actor that hosts a dactor [`Actor`] implementation together with
/// its per-actor runtime state. All messages arrive as [`DactorMsg`].
struct KameoDactorActor<A: Actor> {
    // The wrapped dactor actor instance.
    actor: A,
    // Per-actor context: id, name, headers, send mode, cancellation token.
    ctx: ActorContext,
    // Inbound interceptors run for every received message, in order.
    interceptors: Vec<Box<dyn InboundInterceptor>>,
    // Runtime-wide watch registry, shared with `KameoRuntime`.
    watchers: WatcherMap,
    // Reason reported to watchers on termination (set when stopping after a
    // handler panic).
    stop_reason: Option<String>,
    // Optional dead-letter handler, shared runtime-wide.
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
    // Fired once from `on_stop` so `KameoRuntime::await_stop` can observe the
    // stop outcome.
    stop_notifier: Option<tokio::sync::oneshot::Sender<Result<(), String>>>,
}
/// Arguments handed to `kameo::Actor::on_start` when spawning a dactor actor.
struct KameoSpawnArgs<A: Actor> {
    // User-supplied constructor arguments for `A::create`.
    args: A::Args,
    // Dependency bundle for `A::create`.
    deps: A::Deps,
    // Runtime-assigned unique id for the new actor.
    actor_id: ActorId,
    // Human-readable name given at spawn time.
    actor_name: String,
    // Inbound interceptors installed on this actor.
    interceptors: Vec<Box<dyn InboundInterceptor>>,
    // Shared watch registry (see `WatcherMap`).
    watchers: WatcherMap,
    // Shared optional dead-letter handler.
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
    // Oneshot fired on stop with the stop outcome.
    stop_notifier: Option<tokio::sync::oneshot::Sender<Result<(), String>>>,
}
impl<A: Actor + 'static> kameo::Actor for KameoDactorActor<A> {
    type Args = KameoSpawnArgs<A>;
    // Spawning itself never fails; handler failures go through `on_error`.
    type Error = kameo::error::Infallible;

    /// Creates the dactor actor, runs its `on_start` hook, and assembles the
    /// host state from the spawn arguments.
    async fn on_start(
        args: Self::Args,
        _actor_ref: kameo::actor::ActorRef<Self>,
    ) -> Result<Self, Self::Error> {
        let mut actor = A::create(args.args, args.deps);
        let mut ctx = ActorContext::new(args.actor_id, args.actor_name);
        actor.on_start(&mut ctx).await;
        Ok(Self {
            actor,
            ctx,
            interceptors: args.interceptors,
            watchers: args.watchers,
            stop_reason: None,
            dead_letter_handler: args.dead_letter_handler,
            stop_notifier: args.stop_notifier,
        })
    }

    /// Runs the dactor `on_stop` hook (panic-shielded), notifies every watcher
    /// of this actor, prunes this actor from the watch registry, and signals
    /// the stop notifier with the outcome.
    async fn on_stop(
        &mut self,
        _actor_ref: kameo::actor::WeakActorRef<Self>,
        _reason: kameo::error::ActorStopReason,
    ) -> Result<(), Self::Error> {
        // Clear per-message context so `on_stop` does not see stale state.
        self.ctx.send_mode = None;
        self.ctx.headers = Headers::new();
        self.ctx.set_cancellation_token(None);
        // A panic in user `on_stop` must not abort the watcher notification
        // and notifier signalling below.
        let stop_result =
            std::panic::AssertUnwindSafe(self.actor.on_stop())
                .catch_unwind()
                .await;
        let stop_err = match stop_result {
            Ok(()) => None,
            Err(_panic) => Some("actor panicked in on_stop".to_string()),
        };
        let actor_id = self.ctx.actor_id.clone();
        let actor_name = self.ctx.actor_name.clone();
        let entries = {
            let mut watchers = self.watchers.lock().unwrap();
            // Take the entries watching *this* actor (notified below)...
            let target_entries = watchers.remove(&actor_id).unwrap_or_default();
            // ...and remove this actor as a watcher of everything else, since
            // it can no longer receive notifications.
            for entries in watchers.values_mut() {
                entries.retain(|e| e.watcher_id != actor_id);
            }
            watchers.retain(|_, v| !v.is_empty());
            target_entries
        };
        if !entries.is_empty() {
            let notification = ChildTerminated {
                child_id: actor_id,
                child_name: actor_name,
                reason: self.stop_reason.clone(),
            };
            for entry in &entries {
                (entry.notify)(notification.clone());
            }
        }
        // Report the stop outcome to anyone awaiting this actor's termination;
        // the receiver may already be gone, so the send result is ignored.
        if let Some(tx) = self.stop_notifier.take() {
            let result = match &stop_err {
                Some(e) => Err(e.clone()),
                None => Ok(()),
            };
            let _ = tx.send(result);
        }
        Ok(())
    }
}
impl<A: Actor + 'static> kameo::message::Message<DactorMsg<A>> for KameoDactorActor<A> {
    type Reply = ();

    /// Processes one type-erased dactor dispatch: runs inbound interceptors,
    /// honours cancellation, executes the handler (panic-shielded), then runs
    /// the interceptors' completion callbacks and sends the reply.
    async fn handle(
        &mut self,
        msg: DactorMsg<A>,
        _ctx: &mut kameo::message::Context<Self, Self::Reply>,
    ) -> Self::Reply {
        let dispatch = msg.0;
        let send_mode = dispatch.send_mode();
        let message_type = dispatch.message_type_name();
        self.ctx.send_mode = Some(send_mode);
        self.ctx.headers = Headers::new();
        let runtime_headers = RuntimeHeaders::new();
        let mut headers = Headers::new();
        // Interceptor-requested delays accumulate; drop/reject/retry from any
        // interceptor short-circuits the chain.
        let mut total_delay = Duration::ZERO;
        let mut rejection: Option<(String, Disposition)> = None;
        {
            let ictx = InboundContext {
                actor_id: self.ctx.actor_id.clone(),
                actor_name: &self.ctx.actor_name,
                message_type,
                send_mode,
                remote: false,
                origin_node: None,
            };
            for interceptor in &self.interceptors {
                match interceptor.on_receive(
                    &ictx,
                    &runtime_headers,
                    &mut headers,
                    dispatch.message_any(),
                ) {
                    Disposition::Continue => {}
                    Disposition::Delay(d) => {
                        total_delay += d;
                    }
                    disp @ (Disposition::Drop | Disposition::Reject(_) | Disposition::Retry(_)) => {
                        rejection = Some((interceptor.name().to_string(), disp));
                        break;
                    }
                }
            }
        }
        if let Some((interceptor_name, disposition)) = rejection {
            // Only outright drops become dead letters; rejects/retries are
            // reported back to the sender via `dispatch.reject` below.
            if matches!(disposition, Disposition::Drop) {
                if let Some(ref handler) = *self.dead_letter_handler {
                    let event = DeadLetterEvent {
                        target_id: self.ctx.actor_id.clone(),
                        target_name: Some(self.ctx.actor_name.clone()),
                        message_type,
                        send_mode,
                        reason: DeadLetterReason::DroppedByInterceptor {
                            interceptor: interceptor_name.clone(),
                        },
                        message: None,
                    };
                    // A panicking dead-letter handler must not take down the actor.
                    let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                        handler.on_dead_letter(event);
                    }));
                }
            }
            dispatch.reject(disposition, &interceptor_name);
            return;
        }
        if !total_delay.is_zero() {
            tokio::time::sleep(total_delay).await;
        }
        self.ctx.headers = headers;
        let cancel_token = dispatch.cancel_token();
        self.ctx.set_cancellation_token(cancel_token.clone());
        // Fast path: an already-cancelled request never reaches the handler.
        if let Some(ref token) = cancel_token {
            if token.is_cancelled() {
                dispatch.cancel();
                self.ctx.set_cancellation_token(None);
                return;
            }
        }
        let result = if let Some(ref token) = cancel_token {
            let dispatch_fut =
                std::panic::AssertUnwindSafe(dispatch.dispatch(&mut self.actor, &mut self.ctx))
                    .catch_unwind();
            // `biased` polls the handler future first, so a completed handler
            // wins over a simultaneous cancellation.
            tokio::select! {
                biased;
                r = dispatch_fut => r,
                _ = token.cancelled() => {
                    self.ctx.set_cancellation_token(None);
                    return;
                }
            }
        } else {
            std::panic::AssertUnwindSafe(dispatch.dispatch(&mut self.actor, &mut self.ctx))
                .catch_unwind()
                .await
        };
        self.ctx.set_cancellation_token(None);
        let ictx = InboundContext {
            actor_id: self.ctx.actor_id.clone(),
            actor_name: &self.ctx.actor_name,
            message_type,
            send_mode,
            remote: false,
            origin_node: None,
        };
        match result {
            Ok(dispatch_result) => {
                // Only asks with an actual reply surface it to interceptors.
                let outcome = match (&dispatch_result.reply, send_mode) {
                    (Some(reply), SendMode::Ask) => Outcome::AskSuccess {
                        reply: reply.as_ref(),
                    },
                    _ => Outcome::TellSuccess,
                };
                for interceptor in &self.interceptors {
                    interceptor.on_complete(&ictx, &runtime_headers, &self.ctx.headers, &outcome);
                }
                // The reply is sent after interceptors observed the outcome.
                dispatch_result.send_reply();
            }
            Err(_panic) => {
                let error = ActorError::internal("handler panicked");
                // The actor decides how to react to its own handler panic.
                let action = self.actor.on_error(&error);
                let outcome = Outcome::HandlerError { error };
                for interceptor in &self.interceptors {
                    interceptor.on_complete(&ictx, &runtime_headers, &self.ctx.headers, &outcome);
                }
                match action {
                    ErrorAction::Resume => {
                        // Swallow the panic and keep processing messages.
                    }
                    ErrorAction::Stop | ErrorAction::Escalate => {
                        self.stop_reason = Some("handler panicked".into());
                        // Re-raising a panic is how we make kameo stop this actor.
                        std::panic::resume_unwind(Box::new(
                            "dactor: actor stop requested after panic",
                        ));
                    }
                    ErrorAction::Restart => {
                        // NOTE(review): restart semantics are not implemented;
                        // this currently behaves exactly like Resume.
                        tracing::warn!("Restart not fully implemented, treating as Resume");
                    }
                }
            }
        }
    }
}
/// Public, cloneable handle to a dactor actor hosted on the kameo runtime.
/// All clones address the same underlying actor.
pub struct KameoActorRef<A: Actor> {
    // Runtime-unique actor id (node + local counter).
    id: ActorId,
    // Human-readable actor name given at spawn time.
    name: String,
    // Underlying kameo actor reference (kameo mailbox is unbounded).
    inner: kameo::actor::ActorRef<KameoDactorActor<A>>,
    // Present only for `MailboxConfig::Bounded` actors: messages pass through
    // this bounded channel before being forwarded to the kameo mailbox.
    bounded_tx: Option<BoundedMailboxSender<DactorMsg<A>>>,
    // Outbound interceptor chain shared with the runtime.
    outbound_interceptors: Arc<Vec<Box<dyn OutboundInterceptor>>>,
    // Optional observer for messages dropped by the outbound pipeline.
    drop_observer: Option<Arc<dyn DropObserver>>,
    // Optional dead-letter handler, shared runtime-wide.
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
}
use dactor::runtime_support::BoundedMailboxSender;
impl<A: Actor> Clone for KameoActorRef<A> {
fn clone(&self) -> Self {
Self {
id: self.id.clone(),
name: self.name.clone(),
inner: self.inner.clone(),
bounded_tx: self.bounded_tx.clone(),
outbound_interceptors: self.outbound_interceptors.clone(),
drop_observer: self.drop_observer.clone(),
dead_letter_handler: self.dead_letter_handler.clone(),
}
}
}
impl<A: Actor> std::fmt::Debug for KameoActorRef<A> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "KameoActorRef({}, {:?})", self.name, self.id)
}
}
impl<A: Actor> KameoActorRef<A> {
    /// Builds the outbound interception pipeline targeting this actor.
    fn outbound_pipeline(&self) -> OutboundPipeline {
        OutboundPipeline {
            interceptors: self.outbound_interceptors.clone(),
            drop_observer: self.drop_observer.clone(),
            target_id: self.id.clone(),
            target_name: self.name.clone(),
        }
    }

    /// Reports a dead letter for this actor, if a handler is installed.
    /// The handler call is panic-shielded so a faulty handler cannot
    /// propagate a panic into the sender's call stack.
    fn notify_dead_letter(
        &self,
        message_type: &'static str,
        send_mode: SendMode,
        reason: DeadLetterReason,
    ) {
        if let Some(ref handler) = *self.dead_letter_handler {
            let event = DeadLetterEvent {
                target_id: self.id.clone(),
                target_name: Some(self.name.clone()),
                message_type,
                send_mode,
                reason,
                message: None,
            };
            let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                handler.on_dead_letter(event);
            }));
        }
    }

    /// Enqueues a type-erased dispatch: through the bounded mailbox channel
    /// when one is configured, otherwise directly to the kameo mailbox.
    fn send_dispatch(&self, dispatch: Box<dyn Dispatch<A>>) -> Result<(), ActorSendError> {
        if let Some(ref btx) = self.bounded_tx {
            btx.try_send(DactorMsg(dispatch))
        } else {
            self.inner
                .tell(DactorMsg(dispatch))
                .try_send()
                .map_err(|e| ActorSendError(e.to_string()))
        }
    }
}
impl<A: Actor + 'static> ActorRef<A> for KameoActorRef<A> {
    /// Runtime-unique id of the target actor.
    fn id(&self) -> ActorId {
        self.id.clone()
    }

    /// Name given to the actor at spawn time.
    fn name(&self) -> String {
        self.name.clone()
    }

    /// True while the actor can still accept messages; for bounded actors the
    /// forwarding channel must also still be open.
    fn is_alive(&self) -> bool {
        if let Some(ref btx) = self.bounded_tx {
            !btx.is_closed() && self.inner.is_alive()
        } else {
            self.inner.is_alive()
        }
    }

    /// Number of queued messages. Only observable for bounded mailboxes;
    /// the unbounded kameo mailbox reports 0.
    fn pending_messages(&self) -> usize {
        if let Some(ref btx) = self.bounded_tx {
            btx.pending()
        } else {
            0
        }
    }

    /// Requests immediate termination of the actor.
    fn stop(&self) {
        self.inner.kill();
    }

    /// Fire-and-forget send. Interceptor drops/rejects are silently swallowed
    /// (the caller still gets `Ok`); actual send failures produce a
    /// dead-letter event and an error.
    fn tell<M>(&self, msg: M) -> Result<(), ActorSendError>
    where
        A: Handler<M>,
        M: Message<Reply = ()>,
    {
        let pipeline = self.outbound_pipeline();
        let result = pipeline.run_on_send(SendMode::Tell, &msg);
        match result.disposition {
            Disposition::Continue => {}
            Disposition::Drop | Disposition::Reject(_) | Disposition::Retry(_) => return Ok(()),
            // NOTE(review): outbound Delay is ignored here — the message is
            // sent immediately; confirm this is intentional.
            Disposition::Delay(_) => {}
        }
        let dispatch: Box<dyn Dispatch<A>> = Box::new(TypedDispatch { msg });
        self.send_dispatch(dispatch).map_err(|e| {
            // Heuristic classification from the error text — presumably the
            // mailbox error message contains "full"; TODO confirm.
            let reason = if e.0.contains("full") {
                DeadLetterReason::MailboxFull
            } else {
                DeadLetterReason::ActorStopped
            };
            self.notify_dead_letter(
                std::any::type_name::<M>(),
                SendMode::Tell,
                reason,
            );
            ActorSendError(e.to_string())
        })
    }

    /// Request/response send. Interceptor drop/reject/retry outcomes are
    /// delivered through the returned [`AskReply`] (as runtime errors) rather
    /// than as a send error; send failures produce a dead-letter event.
    fn ask<M>(
        &self,
        msg: M,
        cancel: Option<CancellationToken>,
    ) -> Result<AskReply<M::Reply>, ActorSendError>
    where
        A: Handler<M>,
        M: Message,
    {
        let pipeline = self.outbound_pipeline();
        let result = pipeline.run_on_send(SendMode::Ask, &msg);
        match result.disposition {
            Disposition::Continue => {}
            // NOTE(review): outbound Delay is ignored for ask as well.
            Disposition::Delay(_) => {}
            Disposition::Drop => {
                // Synthesize an immediately-resolved errored reply.
                let (tx, rx) = tokio::sync::oneshot::channel();
                let _ = tx.send(Err(RuntimeError::ActorNotFound(
                    "message dropped by outbound interceptor".into(),
                )));
                return Ok(AskReply::new(rx));
            }
            Disposition::Reject(reason) => {
                let (tx, rx) = tokio::sync::oneshot::channel();
                let _ = tx.send(Err(RuntimeError::Rejected {
                    interceptor: result.interceptor_name.to_string(),
                    reason,
                }));
                return Ok(AskReply::new(rx));
            }
            Disposition::Retry(retry_after) => {
                let (tx, rx) = tokio::sync::oneshot::channel();
                let _ = tx.send(Err(RuntimeError::RetryAfter {
                    interceptor: result.interceptor_name.to_string(),
                    retry_after,
                }));
                return Ok(AskReply::new(rx));
            }
        }
        let (tx, rx) = tokio::sync::oneshot::channel();
        let dispatch: Box<dyn Dispatch<A>> = Box::new(AskDispatch {
            msg,
            reply_tx: tx,
            cancel,
        });
        self.send_dispatch(dispatch).map_err(|e| {
            // Same text-based classification as in `tell` — TODO confirm.
            let reason = if e.0.contains("full") {
                DeadLetterReason::MailboxFull
            } else {
                DeadLetterReason::ActorStopped
            };
            self.notify_dead_letter(
                std::any::type_name::<M>(),
                SendMode::Ask,
                reason,
            );
            ActorSendError(e.to_string())
        })?;
        Ok(AskReply::new(rx))
    }

    /// One-request, many-responses send. The handler's output items are
    /// streamed back, optionally re-batched with `batch_config`, and wrapped
    /// with outbound stream interception.
    fn expand<M, OutputItem>(
        &self,
        msg: M,
        buffer: usize,
        batch_config: Option<BatchConfig>,
        cancel: Option<CancellationToken>,
    ) -> Result<BoxStream<OutputItem>, ActorSendError>
    where
        A: ExpandHandler<M, OutputItem>,
        M: Send + 'static,
        OutputItem: Send + 'static,
    {
        let pipeline = self.outbound_pipeline();
        let result = pipeline.run_on_send(SendMode::Expand, &msg);
        // Unlike tell/ask, interceptor outcomes surface as send errors here
        // since there is no reply channel to carry them.
        match result.disposition {
            Disposition::Continue => {}
            Disposition::Delay(_) => {}
            Disposition::Drop => {
                return Err(ActorSendError(
                    "stream dropped by outbound interceptor".into(),
                ));
            }
            Disposition::Reject(reason) => {
                return Err(ActorSendError(format!("stream rejected: {}", reason)));
            }
            Disposition::Retry(_) => {
                return Err(ActorSendError(
                    "stream retry requested by interceptor".into(),
                ));
            }
        }
        // Guard against a zero-capacity channel request.
        let buffer = buffer.max(1);
        let (tx, rx) = tokio::sync::mpsc::channel(buffer);
        let sender = StreamSender::new(tx);
        let dispatch: Box<dyn Dispatch<A>> = Box::new(ExpandDispatch {
            msg,
            sender,
            cancel,
        });
        self.send_dispatch(dispatch)?;
        match batch_config {
            Some(batch_config) => {
                let (batch_tx, batch_rx) = tokio::sync::mpsc::channel::<Vec<OutputItem>>(buffer);
                let reader = BatchReader::new(batch_rx);
                let batch_delay = batch_config.max_delay;
                // Re-batching task: accumulate items, flushing either when the
                // writer decides (size) or when `batch_delay` elapses with a
                // non-empty buffer.
                tokio::spawn(async move {
                    let mut writer = BatchWriter::new(batch_tx, batch_config);
                    let mut rx = rx;
                    loop {
                        if writer.buffered_count() > 0 {
                            let deadline = tokio::time::Instant::now() + batch_delay;
                            // `biased` prefers draining items over the timer.
                            tokio::select! {
                                biased;
                                item = rx.recv() => match item {
                                    Some(item) => {
                                        if writer.push(item).await.is_err() { break; }
                                    }
                                    None => break,
                                },
                                _ = tokio::time::sleep_until(deadline) => {
                                    if writer.check_deadline().await.is_err() { break; }
                                }
                            }
                        } else {
                            // Empty buffer: no deadline to honour, just wait.
                            match rx.recv().await {
                                Some(item) => {
                                    if writer.push(item).await.is_err() {
                                        break;
                                    }
                                }
                                None => break,
                            }
                        }
                    }
                    // Flush any tail batch; errors mean the reader is gone.
                    let _ = writer.flush().await;
                });
                Ok(wrap_batched_stream_with_interception(
                    reader,
                    buffer,
                    pipeline,
                    std::any::type_name::<M>(),
                    SendMode::Expand,
                ))
            }
            None => Ok(wrap_stream_with_interception(
                rx,
                buffer,
                pipeline,
                std::any::type_name::<M>(),
                SendMode::Expand,
            )),
        }
    }

    /// Many-requests, one-response send: drains `input` into the actor and
    /// returns an [`AskReply`] for the aggregated result. No outbound
    /// pre-send interception runs here; items are intercepted by the drain.
    fn reduce<InputItem, Reply>(
        &self,
        input: BoxStream<InputItem>,
        buffer: usize,
        batch_config: Option<BatchConfig>,
        cancel: Option<CancellationToken>,
    ) -> Result<AskReply<Reply>, ActorSendError>
    where
        A: ReduceHandler<InputItem, Reply>,
        InputItem: Send + 'static,
        Reply: Send + 'static,
    {
        let buffer = buffer.max(1);
        let (item_tx, item_rx) = tokio::sync::mpsc::channel(buffer);
        let receiver = StreamReceiver::new(item_rx);
        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        let dispatch: Box<dyn Dispatch<A>> = Box::new(ReduceDispatch {
            receiver,
            reply_tx,
            cancel: cancel.clone(),
        });
        // Dispatch first so the actor is ready before the drain starts.
        self.send_dispatch(dispatch)?;
        let pipeline = self.outbound_pipeline();
        match batch_config {
            Some(batch_config) => {
                spawn_reduce_batched_drain(
                    input,
                    item_tx,
                    buffer,
                    batch_config,
                    cancel,
                    pipeline,
                    std::any::type_name::<InputItem>(),
                );
            }
            None => {
                spawn_reduce_drain(
                    input,
                    item_tx,
                    cancel,
                    pipeline,
                    std::any::type_name::<InputItem>(),
                );
            }
        }
        Ok(AskReply::new(reply_rx))
    }

    /// Many-requests, many-responses send: drains `input` into the actor and
    /// streams the handler's outputs back, optionally re-batched.
    fn transform<InputItem, OutputItem>(
        &self,
        input: BoxStream<InputItem>,
        buffer: usize,
        batch_config: Option<BatchConfig>,
        cancel: Option<CancellationToken>,
    ) -> Result<BoxStream<OutputItem>, ActorSendError>
    where
        A: TransformHandler<InputItem, OutputItem>,
        InputItem: Send + 'static,
        OutputItem: Send + 'static,
    {
        let buffer = buffer.max(1);
        let (item_tx, item_rx) = tokio::sync::mpsc::channel(buffer);
        let (output_tx, mut output_rx) = tokio::sync::mpsc::channel(buffer);
        let receiver = StreamReceiver::new(item_rx);
        let sender = StreamSender::new(output_tx);
        let dispatch: Box<dyn Dispatch<A>> = Box::new(TransformDispatch::new(
            receiver,
            sender,
            cancel.clone(),
        ));
        self.send_dispatch(dispatch)?;
        let pipeline = self.outbound_pipeline();
        // Feed the input stream to the actor on a background task.
        spawn_transform_drain(
            input,
            item_tx,
            cancel,
            pipeline.clone(),
            std::any::type_name::<InputItem>(),
        );
        match batch_config {
            Some(batch_config) => {
                let (batch_tx, batch_rx) =
                    tokio::sync::mpsc::channel::<Vec<OutputItem>>(buffer);
                let reader = BatchReader::new(batch_rx);
                let batch_delay = batch_config.max_delay;
                // Same re-batching loop as in `expand`, over the output side.
                tokio::spawn(async move {
                    let mut writer = BatchWriter::new(batch_tx, batch_config);
                    loop {
                        if writer.buffered_count() > 0 {
                            let deadline = tokio::time::Instant::now() + batch_delay;
                            tokio::select! {
                                biased;
                                item = output_rx.recv() => match item {
                                    Some(item) => {
                                        if writer.push(item).await.is_err() { break; }
                                    }
                                    None => break,
                                },
                                _ = tokio::time::sleep_until(deadline) => {
                                    if writer.check_deadline().await.is_err() { break; }
                                }
                            }
                        } else {
                            match output_rx.recv().await {
                                Some(item) => {
                                    if writer.push(item).await.is_err() {
                                        break;
                                    }
                                }
                                None => break,
                            }
                        }
                    }
                    let _ = writer.flush().await;
                });
                Ok(wrap_batched_stream_with_interception(
                    reader,
                    buffer,
                    pipeline,
                    std::any::type_name::<OutputItem>(),
                    SendMode::Transform,
                ))
            }
            None => Ok(wrap_stream_with_interception(
                output_rx,
                buffer,
                pipeline,
                std::any::type_name::<OutputItem>(),
                SendMode::Transform,
            )),
        }
    }
}
/// Per-spawn configuration: inbound interceptors plus mailbox behaviour.
pub struct SpawnOptions {
    // Inbound interceptors installed on the spawned actor, run in order.
    pub interceptors: Vec<Box<dyn InboundInterceptor>>,
    // Mailbox configuration; `Unbounded` by default.
    pub mailbox: MailboxConfig,
}
impl Default for SpawnOptions {
fn default() -> Self {
Self {
interceptors: Vec::new(),
mailbox: MailboxConfig::Unbounded,
}
}
}
/// dactor runtime backed by kameo: spawns and tracks local actors, manages
/// the watch registry, outbound interception, system actors, and the remote
/// peer directory.
pub struct KameoRuntime {
    // This node's identity, embedded in every ActorId it issues.
    node_id: NodeId,
    // Monotonic counter for locally-assigned actor ids; shared with the
    // spawn-manager system actor.
    next_local: Arc<AtomicU64>,
    // Cluster membership event fan-out.
    cluster_events: KameoClusterEvents,
    // Outbound interceptor chain shared by every actor ref this runtime hands out.
    outbound_interceptors: Arc<Vec<Box<dyn OutboundInterceptor>>>,
    // Optional observer for pipeline-dropped messages.
    drop_observer: Option<Arc<dyn DropObserver>>,
    // Optional dead-letter handler shared by all actors.
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
    // Local watch registry (watched id -> watchers).
    watchers: WatcherMap,
    // Remote-spawn bookkeeping and type factories.
    spawn_manager: SpawnManager,
    // Remote watch bookkeeping.
    watch_manager: WatchManager,
    // Remote cancellation bookkeeping.
    cancel_manager: CancelManager,
    // Known peers and their connection status.
    node_directory: NodeDirectory,
    // Set once `start_system_actors` has run.
    system_actors: Option<KameoSystemActorRefs>,
    // One stop receiver per spawned actor, consumed by await_stop/await_all.
    #[allow(clippy::type_complexity)]
    stop_receivers: Arc<Mutex<HashMap<ActorId, tokio::sync::oneshot::Receiver<Result<(), String>>>>>,
    // Optional application version reported during handshakes.
    app_version: Option<String>,
}
/// Handles to the four system actors started by
/// [`KameoRuntime::start_system_actors`].
pub struct KameoSystemActorRefs {
    // Handles remote spawn requests.
    pub spawn_manager: kameo::actor::ActorRef<crate::system_actors::SpawnManagerActor>,
    // Tracks remote watch/unwatch registrations.
    pub watch_manager: kameo::actor::ActorRef<crate::system_actors::WatchManagerActor>,
    // Resolves remote cancellation requests by request id.
    pub cancel_manager: kameo::actor::ActorRef<crate::system_actors::CancelManagerActor>,
    // Tracks peer connect/disconnect state.
    pub node_directory: kameo::actor::ActorRef<crate::system_actors::NodeDirectoryActor>,
}
impl KameoRuntime {
    /// Creates a runtime with the default node id `"kameo-node"`.
    pub fn new() -> Self {
        Self::create(NodeId("kameo-node".into()))
    }

    /// Creates a runtime identified by `node_id`.
    pub fn with_node_id(node_id: NodeId) -> Self {
        Self::create(node_id)
    }

    /// Shared constructor: empty registries, local actor ids start at 1.
    fn create(node_id: NodeId) -> Self {
        Self {
            node_id,
            next_local: Arc::new(AtomicU64::new(1)),
            cluster_events: KameoClusterEvents::new(),
            outbound_interceptors: Arc::new(Vec::new()),
            drop_observer: None,
            dead_letter_handler: Arc::new(None),
            watchers: Arc::new(Mutex::new(HashMap::new())),
            spawn_manager: SpawnManager::new(TypeRegistry::new()),
            watch_manager: WatchManager::new(),
            cancel_manager: CancelManager::new(),
            node_directory: NodeDirectory::new(),
            system_actors: None,
            stop_receivers: Arc::new(Mutex::new(HashMap::new())),
            app_version: None,
        }
    }

    // Adapter name reported in handshakes.
    pub const ADAPTER_NAME: &'static str = "kameo";

    /// Builder-style setter for the application version used in handshakes.
    pub fn with_app_version(mut self, version: impl Into<String>) -> Self {
        self.app_version = Some(version.into());
        self
    }

    /// Application version, if configured.
    pub fn app_version(&self) -> Option<&str> {
        self.app_version.as_deref()
    }

    /// Builds the handshake request advertising this node and adapter.
    pub fn handshake_request(&self) -> dactor::HandshakeRequest {
        dactor::HandshakeRequest::from_runtime(
            self.node_id.clone(),
            self.app_version.clone(),
            Self::ADAPTER_NAME,
        )
    }

    /// Spawns the four system actors (spawn/watch/cancel managers and node
    /// directory) on unbounded kameo mailboxes and records their refs.
    pub fn start_system_actors(&mut self) {
        use crate::system_actors::*;
        use kameo::actor::Spawn;
        let spawn_mgr_ref = SpawnManagerActor::spawn_with_mailbox(
            (self.node_id.clone(), TypeRegistry::new(), self.next_local.clone()),
            kameo::mailbox::unbounded(),
        );
        let watch_mgr_ref =
            WatchManagerActor::spawn_with_mailbox((), kameo::mailbox::unbounded());
        let cancel_mgr_ref =
            CancelManagerActor::spawn_with_mailbox((), kameo::mailbox::unbounded());
        let node_dir_ref =
            NodeDirectoryActor::spawn_with_mailbox((), kameo::mailbox::unbounded());
        self.system_actors = Some(KameoSystemActorRefs {
            spawn_manager: spawn_mgr_ref,
            watch_manager: watch_mgr_ref,
            cancel_manager: cancel_mgr_ref,
            node_directory: node_dir_ref,
        });
    }

    /// This node's identity.
    pub fn node_id(&self) -> &NodeId {
        &self.node_id
    }

    /// System actor refs, present after `start_system_actors`.
    pub fn system_actor_refs(&self) -> Option<&KameoSystemActorRefs> {
        self.system_actors.as_ref()
    }

    /// Appends an outbound interceptor. Panics if any actor has already been
    /// spawned (the Arc is shared with actor refs at spawn time).
    pub fn add_outbound_interceptor(&mut self, interceptor: Box<dyn OutboundInterceptor>) {
        Arc::get_mut(&mut self.outbound_interceptors)
            .expect("cannot add interceptors after actors are spawned")
            .push(interceptor);
    }

    /// Installs the observer for pipeline-dropped messages.
    pub fn set_drop_observer(&mut self, observer: Arc<dyn DropObserver>) {
        self.drop_observer = Some(observer);
    }

    /// Installs the dead-letter handler. Only affects actors spawned
    /// afterwards (existing refs keep their previous Arc).
    pub fn set_dead_letter_handler(&mut self, handler: Arc<dyn DeadLetterHandler>) {
        self.dead_letter_handler = Arc::new(Some(handler));
    }

    /// Cluster event handle (alias of `cluster_events`).
    pub fn cluster_events_handle(&self) -> &KameoClusterEvents {
        &self.cluster_events
    }

    /// Cluster event handle.
    pub fn cluster_events(&self) -> &KameoClusterEvents {
        &self.cluster_events
    }

    /// Spawns an actor with no dependencies, default options.
    pub async fn spawn<A>(&self, name: &str, args: A::Args) -> Result<KameoActorRef<A>, dactor::errors::RuntimeError>
    where
        A: Actor<Deps = ()> + 'static,
    {
        Ok(self.spawn_internal::<A>(name, args, (), Vec::new(), MailboxConfig::Unbounded))
    }

    /// Spawns an actor with an explicit dependency bundle, default options.
    pub async fn spawn_with_deps<A>(&self, name: &str, args: A::Args, deps: A::Deps) -> Result<KameoActorRef<A>, dactor::errors::RuntimeError>
    where
        A: Actor + 'static,
    {
        Ok(self.spawn_internal::<A>(name, args, deps, Vec::new(), MailboxConfig::Unbounded))
    }

    /// Spawns an actor with per-spawn interceptors and mailbox configuration.
    pub async fn spawn_with_options<A>(
        &self,
        name: &str,
        args: A::Args,
        options: SpawnOptions,
    ) -> Result<KameoActorRef<A>, dactor::errors::RuntimeError>
    where
        A: Actor<Deps = ()> + 'static,
    {
        Ok(self.spawn_internal::<A>(name, args, (), options.interceptors, options.mailbox))
    }

    /// Common spawn path: allocates an id, spawns the kameo host actor, wires
    /// the optional bounded-mailbox forwarder, and registers a stop receiver.
    fn spawn_internal<A>(
        &self,
        name: &str,
        args: A::Args,
        deps: A::Deps,
        interceptors: Vec<Box<dyn InboundInterceptor>>,
        mailbox: MailboxConfig,
    ) -> KameoActorRef<A>
    where
        A: Actor + 'static,
    {
        use kameo::actor::Spawn;
        let local = self.next_local.fetch_add(1, Ordering::SeqCst);
        let actor_id = ActorId {
            node: self.node_id.clone(),
            local,
        };
        let actor_name = name.to_string();
        let (stop_tx, stop_rx) = tokio::sync::oneshot::channel();
        let spawn_args = KameoSpawnArgs {
            args,
            deps,
            actor_id: actor_id.clone(),
            actor_name: actor_name.clone(),
            interceptors,
            watchers: self.watchers.clone(),
            dead_letter_handler: self.dead_letter_handler.clone(),
            stop_notifier: Some(stop_tx),
        };
        // The kameo mailbox is always unbounded; boundedness is layered on
        // top via the forwarding channel below.
        let actor_ref =
            KameoDactorActor::<A>::spawn_with_mailbox(spawn_args, kameo::mailbox::unbounded());
        let bounded_tx = match mailbox {
            MailboxConfig::Bounded { capacity, overflow } => {
                let (btx, mut brx) = tokio::sync::mpsc::channel::<DactorMsg<A>>(capacity);
                let fwd_ref = actor_ref.clone();
                // Forwarder task: drains the bounded channel into kameo; a
                // failed forward means the actor stopped, so the task exits.
                tokio::spawn(async move {
                    while let Some(msg) = brx.recv().await {
                        if fwd_ref.tell(msg).try_send().is_err() {
                            break;
                        }
                    }
                });
                Some(BoundedMailboxSender::new(btx, overflow))
            }
            MailboxConfig::Unbounded => None,
        };
        self.stop_receivers.lock().unwrap().insert(actor_id.clone(), stop_rx);
        KameoActorRef {
            id: actor_id,
            name: actor_name,
            inner: actor_ref,
            bounded_tx,
            outbound_interceptors: self.outbound_interceptors.clone(),
            drop_observer: self.drop_observer.clone(),
            dead_letter_handler: self.dead_letter_handler.clone(),
        }
    }

    /// Registers `watcher` to receive a `ChildTerminated` when the actor with
    /// `target_id` stops. Delivery is best-effort: if the watcher's mailbox
    /// rejects the send, the notification is dropped with a debug log.
    pub fn watch<W>(&self, watcher: &KameoActorRef<W>, target_id: ActorId)
    where
        W: Actor + Handler<ChildTerminated> + 'static,
    {
        let watcher_id = watcher.id();
        let watcher_inner = watcher.inner.clone();
        let entry = WatchEntry {
            watcher_id,
            notify: Box::new(move |msg: ChildTerminated| {
                let dispatch: Box<dyn Dispatch<W>> = Box::new(TypedDispatch { msg });
                if watcher_inner.tell(DactorMsg(dispatch)).try_send().is_err() {
                    tracing::debug!("watch notification dropped — watcher may have stopped");
                }
            }),
        };
        let mut watchers = self.watchers.lock().unwrap();
        watchers.entry(target_id).or_default().push(entry);
    }

    /// Removes `watcher_id` from `target_id`'s watcher list, dropping the map
    /// entry when the list empties.
    pub fn unwatch(&self, watcher_id: &ActorId, target_id: &ActorId) {
        let mut watchers = self.watchers.lock().unwrap();
        if let Some(entries) = watchers.get_mut(target_id) {
            entries.retain(|e| &e.watcher_id != watcher_id);
            if entries.is_empty() {
                watchers.remove(target_id);
            }
        }
    }

    /// Remote spawn manager (read-only).
    pub fn spawn_manager(&self) -> &SpawnManager {
        &self.spawn_manager
    }

    /// Remote spawn manager (mutable).
    pub fn spawn_manager_mut(&mut self) -> &mut SpawnManager {
        &mut self.spawn_manager
    }

    /// Registers a deserialization factory for remotely-spawnable type
    /// `type_name`.
    pub fn register_factory(
        &mut self,
        type_name: impl Into<String>,
        factory: impl Fn(&[u8]) -> Result<Box<dyn std::any::Any + Send>, dactor::remote::SerializationError>
            + Send
            + Sync
            + 'static,
    ) {
        self.spawn_manager
            .type_registry_mut()
            .register_factory(type_name, factory);
    }

    /// Creates an actor instance for a remote spawn request, assigning it a
    /// fresh local id. Returns the failure response on error so the caller
    /// can reply to the requesting node.
    pub fn handle_spawn_request(
        &mut self,
        request: &SpawnRequest,
    ) -> Result<(ActorId, Box<dyn std::any::Any + Send>), SpawnResponse> {
        match self.spawn_manager.create_actor(request) {
            Ok(actor) => {
                let local = self.next_local.fetch_add(1, Ordering::SeqCst);
                let actor_id = ActorId {
                    node: self.node_id.clone(),
                    local,
                };
                self.spawn_manager.record_spawn(actor_id.clone());
                Ok((actor_id, actor))
            }
            Err(e) => Err(SpawnResponse::Failure {
                request_id: request.request_id.clone(),
                error: e.to_string(),
            }),
        }
    }

    /// Remote watch manager (read-only).
    pub fn watch_manager(&self) -> &WatchManager {
        &self.watch_manager
    }

    /// Remote watch manager (mutable).
    pub fn watch_manager_mut(&mut self) -> &mut WatchManager {
        &mut self.watch_manager
    }

    /// Records a remote watch of `target` by `watcher`.
    pub fn remote_watch(&mut self, target: ActorId, watcher: ActorId) {
        self.watch_manager.watch(target, watcher);
    }

    /// Removes a remote watch of `target` by `watcher`.
    pub fn remote_unwatch(&mut self, target: &ActorId, watcher: &ActorId) {
        self.watch_manager.unwatch(target, watcher);
    }

    /// Produces the notifications owed to remote watchers of `terminated`.
    pub fn notify_terminated(&mut self, terminated: &ActorId) -> Vec<WatchNotification> {
        self.watch_manager.on_terminated(terminated)
    }

    /// Remote cancel manager (read-only).
    pub fn cancel_manager(&self) -> &CancelManager {
        &self.cancel_manager
    }

    /// Remote cancel manager (mutable).
    pub fn cancel_manager_mut(&mut self) -> &mut CancelManager {
        &mut self.cancel_manager
    }

    /// Associates a cancellation token with a remote request id.
    pub fn register_cancel(&mut self, request_id: String, token: CancellationToken) {
        self.cancel_manager.register(request_id, token);
    }

    /// Cancels the request registered under `request_id`.
    pub fn cancel_request(&mut self, request_id: &str) -> CancelResponse {
        self.cancel_manager.cancel(request_id)
    }

    /// Drops the cancellation registration for a completed request.
    pub fn complete_request(&mut self, request_id: &str) {
        self.cancel_manager.remove(request_id);
    }

    /// Peer directory (read-only).
    pub fn node_directory(&self) -> &NodeDirectory {
        &self.node_directory
    }

    /// Peer directory (mutable).
    pub fn node_directory_mut(&mut self) -> &mut NodeDirectory {
        &mut self.node_directory
    }

    /// Records `peer_id` as connected, preserving a previously-known address
    /// when none is supplied, and emits `NodeJoined` on a fresh connection.
    pub fn connect_peer(&mut self, peer_id: NodeId, address: Option<String>) {
        let was_connected = self.node_directory.is_connected(&peer_id);
        if let Some(existing) = self.node_directory.get_peer(&peer_id) {
            // Re-add so the new address (or the retained old one) wins.
            let resolved_address = address.or_else(|| existing.address.clone());
            self.node_directory.remove_peer(&peer_id);
            self.node_directory.add_peer(peer_id.clone(), resolved_address);
        } else {
            self.node_directory.add_peer(peer_id.clone(), address);
        }
        self.node_directory.set_status(&peer_id, PeerStatus::Connected);
        if !was_connected {
            self.cluster_events.emit(dactor::ClusterEvent::NodeJoined(peer_id));
        }
    }

    /// Marks `peer_id` as disconnected (keeping the directory entry) and
    /// emits `NodeLeft` if it was previously connected.
    pub fn disconnect_peer(&mut self, peer_id: &NodeId) {
        let was_connected = self.node_directory.is_connected(peer_id);
        self.node_directory.set_status(peer_id, PeerStatus::Disconnected);
        if was_connected {
            self.cluster_events.emit(dactor::ClusterEvent::NodeLeft(peer_id.clone()));
        }
    }

    /// True when `peer_id` is currently connected.
    pub fn is_peer_connected(&self, peer_id: &NodeId) -> bool {
        self.node_directory.is_connected(peer_id)
    }

    /// Waits for the actor's stop outcome. Consumes the receiver, so a second
    /// call for the same id (or an unknown id) returns Ok immediately.
    pub async fn await_stop(&self, actor_id: &ActorId) -> Result<(), String> {
        let rx = {
            let mut receivers = self.stop_receivers.lock().unwrap();
            receivers.remove(actor_id)
        };
        match rx {
            Some(rx) => rx
                .await
                .map_err(|_| "stop notifier dropped".to_string())
                .and_then(|r| r),
            None => Ok(()),
        }
    }

    /// Waits for all tracked actors to stop; returns the first error
    /// encountered (iteration order is the map's, i.e. unspecified).
    pub async fn await_all(&self) -> Result<(), String> {
        // Drain outside the await so the mutex guard is not held across it.
        let receivers: Vec<_> = {
            let mut map = self.stop_receivers.lock().unwrap();
            map.drain().collect()
        };
        let mut first_error = None;
        for (_, rx) in receivers {
            let result = rx.await.map_err(|e| format!("stop notifier dropped: {e}")).and_then(|r| r);
            if let Err(e) = result {
                if first_error.is_none() {
                    first_error = Some(e);
                }
            }
        }
        match first_error {
            Some(e) => Err(e),
            None => Ok(()),
        }
    }

    /// Drops receivers for actors that have already stopped (or whose sender
    /// vanished), keeping only those still pending.
    pub fn cleanup_finished(&self) {
        let mut receivers = self.stop_receivers.lock().unwrap();
        receivers.retain(|_, rx| {
            matches!(rx.try_recv(), Err(tokio::sync::oneshot::error::TryRecvError::Empty))
        });
    }

    /// Number of actors whose stop is still being tracked.
    pub fn active_handle_count(&self) -> usize {
        self.stop_receivers.lock().unwrap().len()
    }
}
impl Default for KameoRuntime {
fn default() -> Self {
Self::new()
}
}
#[async_trait::async_trait]
impl dactor::system_router::SystemMessageRouter for KameoRuntime {
    /// Decodes and routes a wire-level system message (spawn, watch/unwatch,
    /// cancel, peer connect/disconnect) to the matching system actor,
    /// translating its reply into a [`RoutingOutcome`].
    async fn route_system_envelope(
        &self,
        envelope: dactor::remote::WireEnvelope,
    ) -> Result<dactor::system_router::RoutingOutcome, dactor::system_router::RoutingError> {
        use dactor::system_actors::*;
        use dactor::system_router::{RoutingError, RoutingOutcome};
        // Reject anything that is not a recognised system message type.
        dactor::system_router::validate_system_message_type(&envelope.message_type)?;
        // System actors must be running before any routing can happen.
        let refs = self
            .system_actors
            .as_ref()
            .ok_or_else(|| RoutingError::new("system actors not started"))?;
        match envelope.message_type.as_str() {
            SYSTEM_MSG_TYPE_SPAWN => {
                let request = dactor::proto::decode_spawn_request(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode SpawnRequest: {e}")))?;
                // Keep the id before the request is moved into the ask.
                let req_id = request.request_id.clone();
                let outcome = refs
                    .spawn_manager
                    .ask(crate::system_actors::HandleSpawnRequest(request))
                    .await
                    .map_err(|e| RoutingError::new(format!("SpawnManager ask: {e}")))?;
                match outcome {
                    crate::system_actors::SpawnOutcome::Success { actor_id, .. } => {
                        Ok(RoutingOutcome::SpawnCompleted {
                            request_id: req_id,
                            actor_id,
                        })
                    }
                    crate::system_actors::SpawnOutcome::Failure(SpawnResponse::Failure {
                        request_id,
                        error,
                    }) => Ok(RoutingOutcome::SpawnFailed { request_id, error }),
                    crate::system_actors::SpawnOutcome::Failure(SpawnResponse::Success {
                        ..
                    }) => {
                        // Invariant of SpawnOutcome construction; see spawn manager.
                        unreachable!("SpawnOutcome::Failure always wraps SpawnResponse::Failure")
                    }
                }
            }
            SYSTEM_MSG_TYPE_WATCH => {
                let request = dactor::proto::decode_watch_request(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode WatchRequest: {e}")))?;
                refs.watch_manager
                    .tell(crate::system_actors::RemoteWatch {
                        target: request.target,
                        watcher: request.watcher,
                    })
                    .await
                    .map_err(|e| RoutingError::new(format!("WatchManager tell: {e}")))?;
                Ok(RoutingOutcome::Acknowledged)
            }
            SYSTEM_MSG_TYPE_UNWATCH => {
                let request = dactor::proto::decode_unwatch_request(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode UnwatchRequest: {e}")))?;
                refs.watch_manager
                    .tell(crate::system_actors::RemoteUnwatch {
                        target: request.target,
                        watcher: request.watcher,
                    })
                    .await
                    .map_err(|e| RoutingError::new(format!("WatchManager tell: {e}")))?;
                Ok(RoutingOutcome::Acknowledged)
            }
            SYSTEM_MSG_TYPE_CANCEL => {
                let request = dactor::proto::decode_cancel_request(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode CancelRequest: {e}")))?;
                let request_id = request
                    .request_id
                    .ok_or_else(|| RoutingError::new("CancelRequest missing request_id"))?;
                let outcome = refs
                    .cancel_manager
                    .ask(crate::system_actors::CancelById(request_id))
                    .await
                    .map_err(|e| RoutingError::new(format!("CancelManager ask: {e}")))?;
                match outcome.0 {
                    CancelResponse::Acknowledged => Ok(RoutingOutcome::CancelAcknowledged),
                    CancelResponse::NotFound { reason } => {
                        Ok(RoutingOutcome::CancelNotFound { reason })
                    }
                }
            }
            SYSTEM_MSG_TYPE_CONNECT_PEER => {
                let (peer_id, address) = dactor::proto::decode_connect_peer(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode ConnectPeer: {e}")))?;
                refs.node_directory
                    .tell(crate::system_actors::ConnectPeer {
                        peer_id,
                        address,
                    })
                    .await
                    .map_err(|e| RoutingError::new(format!("NodeDirectory tell: {e}")))?;
                Ok(RoutingOutcome::Acknowledged)
            }
            SYSTEM_MSG_TYPE_DISCONNECT_PEER => {
                let peer_id = dactor::proto::decode_disconnect_peer(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode DisconnectPeer: {e}")))?;
                refs.node_directory
                    .tell(crate::system_actors::DisconnectPeer(peer_id))
                    .await
                    .map_err(|e| RoutingError::new(format!("NodeDirectory tell: {e}")))?;
                Ok(RoutingOutcome::Acknowledged)
            }
            // Validated above, so this covers types added to validation but
            // not yet handled here.
            other => Err(RoutingError::new(format!(
                "unhandled system message type: {other}"
            ))),
        }
    }
}