1use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Mutex};
9use std::time::Duration;
10
11use futures::FutureExt;
12use tokio_util::sync::CancellationToken;
13
14use dactor::actor::{
15 Actor, ActorContext, ActorError, ActorRef, AskReply, ReduceHandler, Handler, ExpandHandler,
16 TransformHandler,
17};
18use dactor::dead_letter::{DeadLetterEvent, DeadLetterHandler, DeadLetterReason};
19use dactor::dispatch::{AskDispatch, Dispatch, ReduceDispatch, ExpandDispatch, TransformDispatch, TypedDispatch};
20use dactor::errors::{ActorSendError, ErrorAction, RuntimeError};
21use dactor::interceptor::{
22 collect_handler_wrappers, apply_handler_wrappers,
23 Disposition, DropObserver, InboundContext, InboundInterceptor, OutboundInterceptor, Outcome,
24 SendMode,
25};
26use dactor::mailbox::MailboxConfig;
27use dactor::message::{Headers, Message, RuntimeHeaders};
28use dactor::node::{ActorId, NodeId};
29use dactor::runtime_support::{
30 spawn_reduce_batched_drain, spawn_reduce_drain, spawn_transform_drain,
31 wrap_batched_stream_with_interception, wrap_stream_with_interception, OutboundPipeline,
32};
33use dactor::stream::{
34 BatchConfig, BatchReader, BatchWriter, BoxStream, StreamReceiver, StreamSender,
35};
36use dactor::supervision::ChildTerminated;
37use dactor::system_actors::{
38 CancelManager, CancelResponse, NodeDirectory, PeerStatus, SpawnManager, SpawnRequest,
39 SpawnResponse, WatchManager, WatchNotification,
40};
41use dactor::type_registry::TypeRegistry;
42
43use crate::cluster::KameoClusterEvents;
44
45struct WatchEntry {
51 watcher_id: ActorId,
52 notify: Box<dyn Fn(ChildTerminated) + Send + Sync>,
54}
55
56type WatcherMap = Arc<Mutex<HashMap<ActorId, Vec<WatchEntry>>>>;
58
59struct DactorMsg<A: Actor>(Box<dyn Dispatch<A>>);
65
66struct KameoDactorActor<A: Actor> {
68 actor: A,
69 ctx: ActorContext,
70 interceptors: Vec<Box<dyn InboundInterceptor>>,
71 watchers: WatcherMap,
72 stop_reason: Option<String>,
73 dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
74 stop_notifier: Option<tokio::sync::oneshot::Sender<Result<(), String>>>,
76}
77
78struct KameoSpawnArgs<A: Actor> {
80 args: A::Args,
81 deps: A::Deps,
82 actor_id: ActorId,
83 actor_name: String,
84 interceptors: Vec<Box<dyn InboundInterceptor>>,
85 watchers: WatcherMap,
86 dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
87 stop_notifier: Option<tokio::sync::oneshot::Sender<Result<(), String>>>,
88}
89
90impl<A: Actor + 'static> kameo::Actor for KameoDactorActor<A> {
91 type Args = KameoSpawnArgs<A>;
92 type Error = kameo::error::Infallible;
93
94 async fn on_start(
95 args: Self::Args,
96 _actor_ref: kameo::actor::ActorRef<Self>,
97 ) -> Result<Self, Self::Error> {
98 let mut actor = A::create(args.args, args.deps);
99 let mut ctx = ActorContext::new(args.actor_id, args.actor_name);
100 actor.on_start(&mut ctx).await;
101 Ok(Self {
102 actor,
103 ctx,
104 interceptors: args.interceptors,
105 watchers: args.watchers,
106 stop_reason: None,
107 dead_letter_handler: args.dead_letter_handler,
108 stop_notifier: args.stop_notifier,
109 })
110 }
111
112 async fn on_stop(
113 &mut self,
114 _actor_ref: kameo::actor::WeakActorRef<Self>,
115 _reason: kameo::error::ActorStopReason,
116 ) -> Result<(), Self::Error> {
117 self.ctx.send_mode = None;
119 self.ctx.headers = Headers::new();
120 self.ctx.set_cancellation_token(None);
121
122 let stop_result =
124 std::panic::AssertUnwindSafe(self.actor.on_stop())
125 .catch_unwind()
126 .await;
127 let stop_err = match stop_result {
128 Ok(()) => None,
129 Err(_panic) => Some("actor panicked in on_stop".to_string()),
130 };
131
132 let actor_id = self.ctx.actor_id.clone();
134 let actor_name = self.ctx.actor_name.clone();
135 let entries = {
136 let mut watchers = self.watchers.lock().unwrap();
137 let target_entries = watchers.remove(&actor_id).unwrap_or_default();
139 for entries in watchers.values_mut() {
141 entries.retain(|e| e.watcher_id != actor_id);
142 }
143 watchers.retain(|_, v| !v.is_empty());
144 target_entries
145 };
146 if !entries.is_empty() {
147 let notification = ChildTerminated {
148 child_id: actor_id,
149 child_name: actor_name,
150 reason: self.stop_reason.clone(),
151 };
152 for entry in &entries {
153 (entry.notify)(notification.clone());
154 }
155 }
156
157 if let Some(tx) = self.stop_notifier.take() {
159 let result = match &stop_err {
160 Some(e) => Err(e.clone()),
161 None => Ok(()),
162 };
163 let _ = tx.send(result);
164 }
165
166 Ok(())
167 }
168}
169
170impl<A: Actor + 'static> kameo::message::Message<DactorMsg<A>> for KameoDactorActor<A> {
171 type Reply = ();
172
173 async fn handle(
174 &mut self,
175 msg: DactorMsg<A>,
176 _ctx: &mut kameo::message::Context<Self, Self::Reply>,
177 ) -> Self::Reply {
178 let dispatch = msg.0;
179
180 let send_mode = dispatch.send_mode();
182 let message_type = dispatch.message_type_name();
183
184 self.ctx.send_mode = Some(send_mode);
185 self.ctx.headers = Headers::new();
186
187 let runtime_headers = RuntimeHeaders::new();
189 let mut headers = Headers::new();
190 let mut total_delay = Duration::ZERO;
191 let mut rejection: Option<(String, Disposition)> = None;
192
193 {
194 let ictx = InboundContext {
195 actor_id: self.ctx.actor_id.clone(),
196 actor_name: &self.ctx.actor_name,
197 message_type,
198 send_mode,
199 remote: false,
200 origin_node: None,
201 };
202
203 for interceptor in &self.interceptors {
204 match interceptor.on_receive(
205 &ictx,
206 &runtime_headers,
207 &mut headers,
208 dispatch.message_any(),
209 ) {
210 Disposition::Continue => {}
211 Disposition::Delay(d) => {
212 total_delay += d;
213 }
214 disp @ (Disposition::Drop | Disposition::Reject(_) | Disposition::Retry(_)) => {
215 rejection = Some((interceptor.name().to_string(), disp));
216 break;
217 }
218 }
219 }
220 }
221
222 if let Some((interceptor_name, disposition)) = rejection {
224 if matches!(disposition, Disposition::Drop) {
225 if let Some(ref handler) = *self.dead_letter_handler {
226 let event = DeadLetterEvent {
227 target_id: self.ctx.actor_id.clone(),
228 target_name: Some(self.ctx.actor_name.clone()),
229 message_type,
230 send_mode,
231 reason: DeadLetterReason::DroppedByInterceptor {
232 interceptor: interceptor_name.clone(),
233 },
234 message: None,
235 };
236 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
237 handler.on_dead_letter(event);
238 }));
239 }
240 }
241 dispatch.reject(disposition, &interceptor_name);
242 return;
243 }
244
245 if !total_delay.is_zero() {
246 tokio::time::sleep(total_delay).await;
247 }
248
249 let ictx_for_wrap = InboundContext {
252 actor_id: self.ctx.actor_id.clone(),
253 actor_name: &self.ctx.actor_name,
254 message_type,
255 send_mode,
256 remote: false,
257 origin_node: None,
258 };
259 let wrappers = collect_handler_wrappers(&self.interceptors, &ictx_for_wrap, &headers);
260 let needs_wrap = wrappers.iter().any(|w| w.is_some());
261
262 self.ctx.headers = headers;
264
265 let cancel_token = dispatch.cancel_token();
267 self.ctx.set_cancellation_token(cancel_token.clone());
268
269 if let Some(ref token) = cancel_token {
271 if token.is_cancelled() {
272 dispatch.cancel();
273 self.ctx.set_cancellation_token(None);
274 return;
275 }
276 }
277
278 let result = if needs_wrap {
280 let (result_tx, mut result_rx) = tokio::sync::oneshot::channel();
281
282 let inner: std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> = Box::pin(async {
283 let r = std::panic::AssertUnwindSafe(
284 dispatch.dispatch(&mut self.actor, &mut self.ctx),
285 )
286 .catch_unwind()
287 .await;
288 let _ = result_tx.send(r);
289 });
290
291 let wrapped = apply_handler_wrappers(wrappers, inner);
292
293 let wrapped = std::panic::AssertUnwindSafe(wrapped);
294
295 if let Some(ref token) = cancel_token {
296 tokio::select! {
297 biased;
298 _ = wrapped.catch_unwind() => {},
299 _ = token.cancelled() => {
300 self.ctx.set_cancellation_token(None);
301 return;
302 }
303 }
304 } else {
305 wrapped.catch_unwind().await.ok();
306 }
307
308 match result_rx.try_recv() {
309 Ok(r) => r,
310 Err(_) => Err(Box::new(
311 "interceptor wrap_handler did not await the handler future",
312 ) as Box<dyn std::any::Any + Send>),
313 }
314 } else {
315 if let Some(ref token) = cancel_token {
316 let dispatch_fut =
317 std::panic::AssertUnwindSafe(dispatch.dispatch(&mut self.actor, &mut self.ctx))
318 .catch_unwind();
319 tokio::select! {
320 biased;
321 r = dispatch_fut => r,
322 _ = token.cancelled() => {
323 self.ctx.set_cancellation_token(None);
324 return;
325 }
326 }
327 } else {
328 std::panic::AssertUnwindSafe(dispatch.dispatch(&mut self.actor, &mut self.ctx))
329 .catch_unwind()
330 .await
331 }
332 };
333
334 self.ctx.set_cancellation_token(None);
335
336 let ictx = InboundContext {
338 actor_id: self.ctx.actor_id.clone(),
339 actor_name: &self.ctx.actor_name,
340 message_type,
341 send_mode,
342 remote: false,
343 origin_node: None,
344 };
345
346 match result {
347 Ok(dispatch_result) => {
348 let outcome = match (&dispatch_result.reply, send_mode) {
349 (Some(reply), SendMode::Ask) => Outcome::AskSuccess {
350 reply: reply.as_ref(),
351 },
352 _ => Outcome::TellSuccess,
353 };
354
355 for interceptor in &self.interceptors {
356 interceptor.on_complete(&ictx, &runtime_headers, &self.ctx.headers, &outcome);
357 }
358
359 dispatch_result.send_reply();
361 }
362 Err(_panic) => {
363 let error = ActorError::internal("handler panicked");
364 let action = self.actor.on_error(&error);
365
366 let outcome = Outcome::HandlerError { error };
367 for interceptor in &self.interceptors {
368 interceptor.on_complete(&ictx, &runtime_headers, &self.ctx.headers, &outcome);
369 }
370
371 match action {
372 ErrorAction::Resume => {
373 }
375 ErrorAction::Stop | ErrorAction::Escalate => {
376 self.stop_reason = Some("handler panicked".into());
377 std::panic::resume_unwind(Box::new(
393 "dactor: actor stop requested after panic",
394 ));
395 }
396 ErrorAction::Restart => {
397 tracing::warn!("Restart not fully implemented, treating as Resume");
398 }
399 }
400 }
401 }
402 }
403}
404
405pub struct KameoActorRef<A: Actor> {
418 id: ActorId,
419 name: String,
420 inner: kameo::actor::ActorRef<KameoDactorActor<A>>,
421 bounded_tx: Option<BoundedMailboxSender<DactorMsg<A>>>,
422 outbound_interceptors: Arc<Vec<Box<dyn OutboundInterceptor>>>,
423 drop_observer: Option<Arc<dyn DropObserver>>,
424 dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
425}
426
427use dactor::runtime_support::BoundedMailboxSender;
428
429impl<A: Actor> Clone for KameoActorRef<A> {
430 fn clone(&self) -> Self {
431 Self {
432 id: self.id.clone(),
433 name: self.name.clone(),
434 inner: self.inner.clone(),
435 bounded_tx: self.bounded_tx.clone(),
436 outbound_interceptors: self.outbound_interceptors.clone(),
437 drop_observer: self.drop_observer.clone(),
438 dead_letter_handler: self.dead_letter_handler.clone(),
439 }
440 }
441}
442
443impl<A: Actor> std::fmt::Debug for KameoActorRef<A> {
444 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
445 write!(f, "KameoActorRef({}, {:?})", self.name, self.id)
446 }
447}
448
449impl<A: Actor> KameoActorRef<A> {
450 fn outbound_pipeline(&self) -> OutboundPipeline {
451 OutboundPipeline {
452 interceptors: self.outbound_interceptors.clone(),
453 drop_observer: self.drop_observer.clone(),
454 target_id: self.id.clone(),
455 target_name: self.name.clone(),
456 }
457 }
458
459 fn notify_dead_letter(
460 &self,
461 message_type: &'static str,
462 send_mode: SendMode,
463 reason: DeadLetterReason,
464 ) {
465 if let Some(ref handler) = *self.dead_letter_handler {
466 let event = DeadLetterEvent {
467 target_id: self.id.clone(),
468 target_name: Some(self.name.clone()),
469 message_type,
470 send_mode,
471 reason,
472 message: None,
473 };
474 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
475 handler.on_dead_letter(event);
476 }));
477 }
478 }
479
480 fn send_dispatch(&self, dispatch: Box<dyn Dispatch<A>>) -> Result<(), ActorSendError> {
483 if let Some(ref btx) = self.bounded_tx {
484 btx.try_send(DactorMsg(dispatch))
485 } else {
486 self.inner
487 .tell(DactorMsg(dispatch))
488 .try_send()
489 .map_err(|e| ActorSendError(e.to_string()))
490 }
491 }
492}
493
494impl<A: Actor + 'static> ActorRef<A> for KameoActorRef<A> {
495 fn id(&self) -> ActorId {
496 self.id.clone()
497 }
498
499 fn name(&self) -> String {
500 self.name.clone()
501 }
502
503 fn is_alive(&self) -> bool {
504 if let Some(ref btx) = self.bounded_tx {
505 !btx.is_closed() && self.inner.is_alive()
506 } else {
507 self.inner.is_alive()
508 }
509 }
510
511 fn pending_messages(&self) -> usize {
512 if let Some(ref btx) = self.bounded_tx {
513 btx.pending()
514 } else {
515 0
516 }
517 }
518
519 fn stop(&self) {
520 self.inner.kill();
521 }
522
523 fn tell<M>(&self, msg: M) -> Result<(), ActorSendError>
524 where
525 A: Handler<M>,
526 M: Message<Reply = ()>,
527 {
528 let pipeline = self.outbound_pipeline();
529 let result = pipeline.run_on_send(SendMode::Tell, &msg);
530 match result.disposition {
531 Disposition::Continue => {}
532 Disposition::Drop | Disposition::Reject(_) | Disposition::Retry(_) => return Ok(()),
533 Disposition::Delay(_) => {} }
535
536 let dispatch: Box<dyn Dispatch<A>> = Box::new(TypedDispatch { msg });
537 self.send_dispatch(dispatch).map_err(|e| {
538 let reason = if e.0.contains("full") {
539 DeadLetterReason::MailboxFull
540 } else {
541 DeadLetterReason::ActorStopped
542 };
543 self.notify_dead_letter(
544 std::any::type_name::<M>(),
545 SendMode::Tell,
546 reason,
547 );
548 ActorSendError(e.to_string())
549 })
550 }
551
552 fn ask<M>(
553 &self,
554 msg: M,
555 cancel: Option<CancellationToken>,
556 ) -> Result<AskReply<M::Reply>, ActorSendError>
557 where
558 A: Handler<M>,
559 M: Message,
560 {
561 let pipeline = self.outbound_pipeline();
562 let result = pipeline.run_on_send(SendMode::Ask, &msg);
563 match result.disposition {
564 Disposition::Continue => {}
565 Disposition::Delay(_) => {} Disposition::Drop => {
567 let (tx, rx) = tokio::sync::oneshot::channel();
568 let _ = tx.send(Err(RuntimeError::ActorNotFound(
569 "message dropped by outbound interceptor".into(),
570 )));
571 return Ok(AskReply::new(rx));
572 }
573 Disposition::Reject(reason) => {
574 let (tx, rx) = tokio::sync::oneshot::channel();
575 let _ = tx.send(Err(RuntimeError::Rejected {
576 interceptor: result.interceptor_name.to_string(),
577 reason,
578 }));
579 return Ok(AskReply::new(rx));
580 }
581 Disposition::Retry(retry_after) => {
582 let (tx, rx) = tokio::sync::oneshot::channel();
583 let _ = tx.send(Err(RuntimeError::RetryAfter {
584 interceptor: result.interceptor_name.to_string(),
585 retry_after,
586 }));
587 return Ok(AskReply::new(rx));
588 }
589 }
590
591 let (tx, rx) = tokio::sync::oneshot::channel();
592 let dispatch: Box<dyn Dispatch<A>> = Box::new(AskDispatch {
593 msg,
594 reply_tx: tx,
595 cancel,
596 });
597 self.send_dispatch(dispatch).map_err(|e| {
598 let reason = if e.0.contains("full") {
599 DeadLetterReason::MailboxFull
600 } else {
601 DeadLetterReason::ActorStopped
602 };
603 self.notify_dead_letter(
604 std::any::type_name::<M>(),
605 SendMode::Ask,
606 reason,
607 );
608 ActorSendError(e.to_string())
609 })?;
610 Ok(AskReply::new(rx))
611 }
612
613 fn expand<M, OutputItem>(
614 &self,
615 msg: M,
616 buffer: usize,
617 batch_config: Option<BatchConfig>,
618 cancel: Option<CancellationToken>,
619 ) -> Result<BoxStream<OutputItem>, ActorSendError>
620 where
621 A: ExpandHandler<M, OutputItem>,
622 M: Send + 'static,
623 OutputItem: Send + 'static,
624 {
625 let pipeline = self.outbound_pipeline();
626 let result = pipeline.run_on_send(SendMode::Expand, &msg);
627 match result.disposition {
628 Disposition::Continue => {}
629 Disposition::Delay(_) => {}
630 Disposition::Drop => {
631 return Err(ActorSendError(
632 "stream dropped by outbound interceptor".into(),
633 ));
634 }
635 Disposition::Reject(reason) => {
636 return Err(ActorSendError(format!("stream rejected: {}", reason)));
637 }
638 Disposition::Retry(_) => {
639 return Err(ActorSendError(
640 "stream retry requested by interceptor".into(),
641 ));
642 }
643 }
644
645 let buffer = buffer.max(1);
646 let (tx, rx) = tokio::sync::mpsc::channel(buffer);
647 let sender = StreamSender::new(tx);
648 let dispatch: Box<dyn Dispatch<A>> = Box::new(ExpandDispatch {
649 msg,
650 sender,
651 cancel,
652 });
653 self.send_dispatch(dispatch)?;
654
655 match batch_config {
656 Some(batch_config) => {
657 let (batch_tx, batch_rx) = tokio::sync::mpsc::channel::<Vec<OutputItem>>(buffer);
658 let reader = BatchReader::new(batch_rx);
659 let batch_delay = batch_config.max_delay;
660 tokio::spawn(async move {
661 let mut writer = BatchWriter::new(batch_tx, batch_config);
662 let mut rx = rx;
663 loop {
664 if writer.buffered_count() > 0 {
665 let deadline = tokio::time::Instant::now() + batch_delay;
666 tokio::select! {
667 biased;
668 item = rx.recv() => match item {
669 Some(item) => {
670 if writer.push(item).await.is_err() { break; }
671 }
672 None => break,
673 },
674 _ = tokio::time::sleep_until(deadline) => {
675 if writer.check_deadline().await.is_err() { break; }
676 }
677 }
678 } else {
679 match rx.recv().await {
680 Some(item) => {
681 if writer.push(item).await.is_err() {
682 break;
683 }
684 }
685 None => break,
686 }
687 }
688 }
689 let _ = writer.flush().await;
690 });
691 Ok(wrap_batched_stream_with_interception(
692 reader,
693 buffer,
694 pipeline,
695 std::any::type_name::<M>(),
696 SendMode::Expand,
697 ))
698 }
699 None => Ok(wrap_stream_with_interception(
700 rx,
701 buffer,
702 pipeline,
703 std::any::type_name::<M>(),
704 SendMode::Expand,
705 )),
706 }
707 }
708
709 fn reduce<InputItem, Reply>(
710 &self,
711 input: BoxStream<InputItem>,
712 buffer: usize,
713 batch_config: Option<BatchConfig>,
714 cancel: Option<CancellationToken>,
715 ) -> Result<AskReply<Reply>, ActorSendError>
716 where
717 A: ReduceHandler<InputItem, Reply>,
718 InputItem: Send + 'static,
719 Reply: Send + 'static,
720 {
721 let buffer = buffer.max(1);
722 let (item_tx, item_rx) = tokio::sync::mpsc::channel(buffer);
723 let receiver = StreamReceiver::new(item_rx);
724 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
725 let dispatch: Box<dyn Dispatch<A>> = Box::new(ReduceDispatch {
726 receiver,
727 reply_tx,
728 cancel: cancel.clone(),
729 });
730 self.send_dispatch(dispatch)?;
731
732 let pipeline = self.outbound_pipeline();
733 match batch_config {
734 Some(batch_config) => {
735 spawn_reduce_batched_drain(
736 input,
737 item_tx,
738 buffer,
739 batch_config,
740 cancel,
741 pipeline,
742 std::any::type_name::<InputItem>(),
743 );
744 }
745 None => {
746 spawn_reduce_drain(
747 input,
748 item_tx,
749 cancel,
750 pipeline,
751 std::any::type_name::<InputItem>(),
752 );
753 }
754 }
755
756 Ok(AskReply::new(reply_rx))
757 }
758
759 fn transform<InputItem, OutputItem>(
760 &self,
761 input: BoxStream<InputItem>,
762 buffer: usize,
763 batch_config: Option<BatchConfig>,
764 cancel: Option<CancellationToken>,
765 ) -> Result<BoxStream<OutputItem>, ActorSendError>
766 where
767 A: TransformHandler<InputItem, OutputItem>,
768 InputItem: Send + 'static,
769 OutputItem: Send + 'static,
770 {
771 let buffer = buffer.max(1);
772 let (item_tx, item_rx) = tokio::sync::mpsc::channel(buffer);
773 let (output_tx, mut output_rx) = tokio::sync::mpsc::channel(buffer);
774 let receiver = StreamReceiver::new(item_rx);
775 let sender = StreamSender::new(output_tx);
776 let dispatch: Box<dyn Dispatch<A>> = Box::new(TransformDispatch::new(
777 receiver,
778 sender,
779 cancel.clone(),
780 ));
781 self.send_dispatch(dispatch)?;
782
783 let pipeline = self.outbound_pipeline();
784 spawn_transform_drain(
785 input,
786 item_tx,
787 cancel,
788 pipeline.clone(),
789 std::any::type_name::<InputItem>(),
790 );
791
792 match batch_config {
793 Some(batch_config) => {
794 let (batch_tx, batch_rx) =
795 tokio::sync::mpsc::channel::<Vec<OutputItem>>(buffer);
796 let reader = BatchReader::new(batch_rx);
797 let batch_delay = batch_config.max_delay;
798 tokio::spawn(async move {
799 let mut writer = BatchWriter::new(batch_tx, batch_config);
800 loop {
801 if writer.buffered_count() > 0 {
802 let deadline = tokio::time::Instant::now() + batch_delay;
803 tokio::select! {
804 biased;
805 item = output_rx.recv() => match item {
806 Some(item) => {
807 if writer.push(item).await.is_err() { break; }
808 }
809 None => break,
810 },
811 _ = tokio::time::sleep_until(deadline) => {
812 if writer.check_deadline().await.is_err() { break; }
813 }
814 }
815 } else {
816 match output_rx.recv().await {
817 Some(item) => {
818 if writer.push(item).await.is_err() {
819 break;
820 }
821 }
822 None => break,
823 }
824 }
825 }
826 let _ = writer.flush().await;
827 });
828 Ok(wrap_batched_stream_with_interception(
829 reader,
830 buffer,
831 pipeline,
832 std::any::type_name::<OutputItem>(),
833 SendMode::Transform,
834 ))
835 }
836 None => Ok(wrap_stream_with_interception(
837 output_rx,
838 buffer,
839 pipeline,
840 std::any::type_name::<OutputItem>(),
841 SendMode::Transform,
842 )),
843 }
844 }
845}
846
847pub struct SpawnOptions {
854 pub interceptors: Vec<Box<dyn InboundInterceptor>>,
856 pub mailbox: MailboxConfig,
861}
862
863impl Default for SpawnOptions {
864 fn default() -> Self {
865 Self {
866 interceptors: Vec::new(),
867 mailbox: MailboxConfig::Unbounded,
868 }
869 }
870}
871
872pub struct KameoRuntime {
901 node_id: NodeId,
902 next_local: Arc<AtomicU64>,
903 cluster_events: KameoClusterEvents,
904 outbound_interceptors: Arc<Vec<Box<dyn OutboundInterceptor>>>,
905 drop_observer: Option<Arc<dyn DropObserver>>,
906 dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
907 watchers: WatcherMap,
908 spawn_manager: SpawnManager,
910 watch_manager: WatchManager,
911 cancel_manager: CancelManager,
912 node_directory: NodeDirectory,
913 system_actors: Option<KameoSystemActorRefs>,
915 #[allow(clippy::type_complexity)]
917 stop_receivers: Arc<Mutex<HashMap<ActorId, tokio::sync::oneshot::Receiver<Result<(), String>>>>>,
918 app_version: Option<String>,
920}
921
922pub struct KameoSystemActorRefs {
924 pub spawn_managers: Vec<kameo::actor::ActorRef<crate::system_actors::SpawnManagerActor>>,
925 spawn_manager_counter: std::sync::atomic::AtomicU64,
926 pub watch_manager: kameo::actor::ActorRef<crate::system_actors::WatchManagerActor>,
927 pub cancel_manager: kameo::actor::ActorRef<crate::system_actors::CancelManagerActor>,
928 pub node_directory: kameo::actor::ActorRef<crate::system_actors::NodeDirectoryActor>,
929}
930
931impl KameoSystemActorRefs {
932 pub fn spawn_manager(&self) -> &kameo::actor::ActorRef<crate::system_actors::SpawnManagerActor> {
934 let idx = self.spawn_manager_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
935 as usize % self.spawn_managers.len();
936 &self.spawn_managers[idx]
937 }
938}
939
940impl KameoRuntime {
941 pub fn new() -> Self {
946 Self::create(NodeId("kameo-node".into()))
947 }
948
949 pub fn with_node_id(node_id: NodeId) -> Self {
951 Self::create(node_id)
952 }
953
954 fn create(node_id: NodeId) -> Self {
955 Self {
956 node_id,
957 next_local: Arc::new(AtomicU64::new(1)),
958 cluster_events: KameoClusterEvents::new(),
959 outbound_interceptors: Arc::new(Vec::new()),
960 drop_observer: None,
961 dead_letter_handler: Arc::new(None),
962 watchers: Arc::new(Mutex::new(HashMap::new())),
963 spawn_manager: SpawnManager::new(TypeRegistry::new()),
964 watch_manager: WatchManager::new(),
965 cancel_manager: CancelManager::new(),
966 node_directory: NodeDirectory::new(),
967 system_actors: None,
968 stop_receivers: Arc::new(Mutex::new(HashMap::new())),
969 app_version: None,
970 }
971 }
972
973 pub const ADAPTER_NAME: &'static str = "kameo";
975
976 pub fn with_app_version(mut self, version: impl Into<String>) -> Self {
982 self.app_version = Some(version.into());
983 self
984 }
985
986 pub fn app_version(&self) -> Option<&str> {
988 self.app_version.as_deref()
989 }
990
991 pub fn handshake_request(&self) -> dactor::HandshakeRequest {
994 dactor::HandshakeRequest::from_runtime(
995 self.node_id.clone(),
996 self.app_version.clone(),
997 Self::ADAPTER_NAME,
998 )
999 }
1000
1001 pub fn start_system_actors(&mut self) {
1012 self.start_system_actors_with_config(dactor::SystemActorConfig::default());
1013 }
1014
1015 pub fn start_system_actors_with_config(&mut self, config: dactor::SystemActorConfig) {
1021 use crate::system_actors::*;
1022 use kameo::actor::Spawn;
1023
1024 let pool_size = config.spawn_manager_pool_size.unwrap_or(1).max(1);
1025 let mut spawn_refs = Vec::with_capacity(pool_size);
1026
1027 for _ in 0..pool_size {
1028 let spawn_mgr_ref = SpawnManagerActor::spawn_with_mailbox(
1029 (self.node_id.clone(), TypeRegistry::new(), self.next_local.clone()),
1030 kameo::mailbox::unbounded(),
1031 );
1032 spawn_refs.push(spawn_mgr_ref);
1033 }
1034
1035 let watch_mgr_ref =
1036 WatchManagerActor::spawn_with_mailbox((), kameo::mailbox::unbounded());
1037 let cancel_mgr_ref =
1038 CancelManagerActor::spawn_with_mailbox((), kameo::mailbox::unbounded());
1039 let node_dir_ref =
1040 NodeDirectoryActor::spawn_with_mailbox((), kameo::mailbox::unbounded());
1041
1042 self.system_actors = Some(KameoSystemActorRefs {
1043 spawn_managers: spawn_refs,
1044 spawn_manager_counter: std::sync::atomic::AtomicU64::new(0),
1045 watch_manager: watch_mgr_ref,
1046 cancel_manager: cancel_mgr_ref,
1047 node_directory: node_dir_ref,
1048 });
1049 }
1050
1051 pub fn node_id(&self) -> &NodeId {
1053 &self.node_id
1054 }
1055
1056 pub fn system_actor_refs(&self) -> Option<&KameoSystemActorRefs> {
1060 self.system_actors.as_ref()
1061 }
1062
1063 pub fn add_outbound_interceptor(&mut self, interceptor: Box<dyn OutboundInterceptor>) {
1068 Arc::get_mut(&mut self.outbound_interceptors)
1069 .expect("cannot add interceptors after actors are spawned")
1070 .push(interceptor);
1071 }
1072
1073 pub fn set_drop_observer(&mut self, observer: Arc<dyn DropObserver>) {
1077 self.drop_observer = Some(observer);
1078 }
1079
1080 pub fn set_dead_letter_handler(&mut self, handler: Arc<dyn DeadLetterHandler>) {
1083 self.dead_letter_handler = Arc::new(Some(handler));
1084 }
1085
1086 pub fn cluster_events_handle(&self) -> &KameoClusterEvents {
1088 &self.cluster_events
1089 }
1090
1091 pub fn cluster_events(&self) -> &KameoClusterEvents {
1093 &self.cluster_events
1094 }
1095
1096 pub async fn spawn<A>(&self, name: &str, args: A::Args) -> Result<KameoActorRef<A>, dactor::errors::RuntimeError>
1098 where
1099 A: Actor<Deps = ()> + 'static,
1100 {
1101 Ok(self.spawn_internal::<A>(name, args, (), Vec::new(), MailboxConfig::Unbounded))
1102 }
1103
1104 pub async fn spawn_with_deps<A>(&self, name: &str, args: A::Args, deps: A::Deps) -> Result<KameoActorRef<A>, dactor::errors::RuntimeError>
1106 where
1107 A: Actor + 'static,
1108 {
1109 Ok(self.spawn_internal::<A>(name, args, deps, Vec::new(), MailboxConfig::Unbounded))
1110 }
1111
1112 pub async fn spawn_with_options<A>(
1114 &self,
1115 name: &str,
1116 args: A::Args,
1117 options: SpawnOptions,
1118 ) -> Result<KameoActorRef<A>, dactor::errors::RuntimeError>
1119 where
1120 A: Actor<Deps = ()> + 'static,
1121 {
1122 Ok(self.spawn_internal::<A>(name, args, (), options.interceptors, options.mailbox))
1123 }
1124
1125 fn spawn_internal<A>(
1126 &self,
1127 name: &str,
1128 args: A::Args,
1129 deps: A::Deps,
1130 interceptors: Vec<Box<dyn InboundInterceptor>>,
1131 mailbox: MailboxConfig,
1132 ) -> KameoActorRef<A>
1133 where
1134 A: Actor + 'static,
1135 {
1136 use kameo::actor::Spawn;
1137
1138 let local = self.next_local.fetch_add(1, Ordering::SeqCst);
1139 let actor_id = ActorId {
1140 node: self.node_id.clone(),
1141 local,
1142 };
1143 let actor_name = name.to_string();
1144
1145 let (stop_tx, stop_rx) = tokio::sync::oneshot::channel();
1146
1147 let spawn_args = KameoSpawnArgs {
1148 args,
1149 deps,
1150 actor_id: actor_id.clone(),
1151 actor_name: actor_name.clone(),
1152 interceptors,
1153 watchers: self.watchers.clone(),
1154 dead_letter_handler: self.dead_letter_handler.clone(),
1155 stop_notifier: Some(stop_tx),
1156 };
1157
1158 let actor_ref =
1160 KameoDactorActor::<A>::spawn_with_mailbox(spawn_args, kameo::mailbox::unbounded());
1161
1162 let bounded_tx = match mailbox {
1164 MailboxConfig::Bounded { capacity, overflow } => {
1165 let (btx, mut brx) = tokio::sync::mpsc::channel::<DactorMsg<A>>(capacity);
1166 let fwd_ref = actor_ref.clone();
1167 tokio::spawn(async move {
1168 while let Some(msg) = brx.recv().await {
1169 if fwd_ref.tell(msg).try_send().is_err() {
1170 break;
1171 }
1172 }
1173 });
1174 Some(BoundedMailboxSender::new(btx, overflow))
1175 }
1176 MailboxConfig::Unbounded => None,
1177 };
1178
1179 self.stop_receivers.lock().unwrap().insert(actor_id.clone(), stop_rx);
1181
1182 KameoActorRef {
1183 id: actor_id,
1184 name: actor_name,
1185 inner: actor_ref,
1186 bounded_tx,
1187 outbound_interceptors: self.outbound_interceptors.clone(),
1188 drop_observer: self.drop_observer.clone(),
1189 dead_letter_handler: self.dead_letter_handler.clone(),
1190 }
1191 }
1192 pub fn watch<W>(&self, watcher: &KameoActorRef<W>, target_id: ActorId)
1197 where
1198 W: Actor + Handler<ChildTerminated> + 'static,
1199 {
1200 let watcher_id = watcher.id();
1201 let watcher_inner = watcher.inner.clone();
1202
1203 let entry = WatchEntry {
1204 watcher_id,
1205 notify: Box::new(move |msg: ChildTerminated| {
1206 let dispatch: Box<dyn Dispatch<W>> = Box::new(TypedDispatch { msg });
1207 if watcher_inner.tell(DactorMsg(dispatch)).try_send().is_err() {
1208 tracing::debug!("watch notification dropped — watcher may have stopped");
1209 }
1210 }),
1211 };
1212
1213 let mut watchers = self.watchers.lock().unwrap();
1214 watchers.entry(target_id).or_default().push(entry);
1215 }
1216
1217 pub fn unwatch(&self, watcher_id: &ActorId, target_id: &ActorId) {
1219 let mut watchers = self.watchers.lock().unwrap();
1220 if let Some(entries) = watchers.get_mut(target_id) {
1221 entries.retain(|e| &e.watcher_id != watcher_id);
1222 if entries.is_empty() {
1223 watchers.remove(target_id);
1224 }
1225 }
1226 }
1227
1228 pub fn spawn_manager(&self) -> &SpawnManager {
1234 &self.spawn_manager
1235 }
1236
1237 pub fn spawn_manager_mut(&mut self) -> &mut SpawnManager {
1239 &mut self.spawn_manager
1240 }
1241
1242 pub fn register_factory(
1248 &mut self,
1249 type_name: impl Into<String>,
1250 factory: impl Fn(&[u8]) -> Result<Box<dyn std::any::Any + Send>, dactor::remote::SerializationError>
1251 + Send
1252 + Sync
1253 + 'static,
1254 ) {
1255 self.spawn_manager
1256 .type_registry_mut()
1257 .register_factory(type_name, factory);
1258 }
1259
1260 pub fn handle_spawn_request(
1277 &mut self,
1278 request: &SpawnRequest,
1279 ) -> Result<(ActorId, Box<dyn std::any::Any + Send>), SpawnResponse> {
1280 match self.spawn_manager.create_actor(request) {
1281 Ok(actor) => {
1282 let local = self.next_local.fetch_add(1, Ordering::SeqCst);
1283 let actor_id = ActorId {
1284 node: self.node_id.clone(),
1285 local,
1286 };
1287 self.spawn_manager.record_spawn(actor_id.clone());
1288 Ok((actor_id, actor))
1289 }
1290 Err(e) => Err(SpawnResponse::Failure {
1291 request_id: request.request_id.clone(),
1292 error: e.to_string(),
1293 }),
1294 }
1295 }
1296
1297 pub fn watch_manager(&self) -> &WatchManager {
1303 &self.watch_manager
1304 }
1305
1306 pub fn watch_manager_mut(&mut self) -> &mut WatchManager {
1308 &mut self.watch_manager
1309 }
1310
1311 pub fn remote_watch(&mut self, target: ActorId, watcher: ActorId) {
1314 self.watch_manager.watch(target, watcher);
1315 }
1316
1317 pub fn remote_unwatch(&mut self, target: &ActorId, watcher: &ActorId) {
1319 self.watch_manager.unwatch(target, watcher);
1320 }
1321
1322 pub fn notify_terminated(&mut self, terminated: &ActorId) -> Vec<WatchNotification> {
1328 self.watch_manager.on_terminated(terminated)
1329 }
1330
1331 pub fn cancel_manager(&self) -> &CancelManager {
1337 &self.cancel_manager
1338 }
1339
1340 pub fn cancel_manager_mut(&mut self) -> &mut CancelManager {
1342 &mut self.cancel_manager
1343 }
1344
1345 pub fn register_cancel(&mut self, request_id: String, token: CancellationToken) {
1347 self.cancel_manager.register(request_id, token);
1348 }
1349
1350 pub fn cancel_request(&mut self, request_id: &str) -> CancelResponse {
1352 self.cancel_manager.cancel(request_id)
1353 }
1354
1355 pub fn complete_request(&mut self, request_id: &str) {
1360 self.cancel_manager.remove(request_id);
1361 }
1362
1363 pub fn node_directory(&self) -> &NodeDirectory {
1369 &self.node_directory
1370 }
1371
1372 pub fn node_directory_mut(&mut self) -> &mut NodeDirectory {
1374 &mut self.node_directory
1375 }
1376
1377 pub fn connect_peer(&mut self, peer_id: NodeId, address: Option<String>) {
1390 let was_connected = self.node_directory.is_connected(&peer_id);
1391 if let Some(existing) = self.node_directory.get_peer(&peer_id) {
1392 let resolved_address = address.or_else(|| existing.address.clone());
1393 self.node_directory.remove_peer(&peer_id);
1394 self.node_directory.add_peer(peer_id.clone(), resolved_address);
1395 } else {
1396 self.node_directory.add_peer(peer_id.clone(), address);
1397 }
1398 self.node_directory.set_status(&peer_id, PeerStatus::Connected);
1399 if !was_connected {
1400 self.cluster_events.emit(dactor::ClusterEvent::NodeJoined(peer_id));
1401 }
1402 }
1403
1404 pub fn disconnect_peer(&mut self, peer_id: &NodeId) {
1408 let was_connected = self.node_directory.is_connected(peer_id);
1409 self.node_directory.set_status(peer_id, PeerStatus::Disconnected);
1410 if was_connected {
1411 self.cluster_events.emit(dactor::ClusterEvent::NodeLeft(peer_id.clone()));
1412 }
1413 }
1414
1415 pub fn is_peer_connected(&self, peer_id: &NodeId) -> bool {
1417 self.node_directory.is_connected(peer_id)
1418 }
1419
1420 pub async fn await_stop(&self, actor_id: &ActorId) -> Result<(), String> {
1432 let rx = {
1433 let mut receivers = self.stop_receivers.lock().unwrap();
1434 receivers.remove(actor_id)
1435 };
1436 match rx {
1437 Some(rx) => rx
1438 .await
1439 .map_err(|_| "stop notifier dropped".to_string())
1440 .and_then(|r| r),
1441 None => Ok(()),
1442 }
1443 }
1444
1445 pub async fn await_all(&self) -> Result<(), String> {
1450 let receivers: Vec<_> = {
1451 let mut map = self.stop_receivers.lock().unwrap();
1452 map.drain().collect()
1453 };
1454 let mut first_error = None;
1455 for (_, rx) in receivers {
1456 let result = rx.await.map_err(|e| format!("stop notifier dropped: {e}")).and_then(|r| r);
1457 if let Err(e) = result {
1458 if first_error.is_none() {
1459 first_error = Some(e);
1460 }
1461 }
1462 }
1463 match first_error {
1464 Some(e) => Err(e),
1465 None => Ok(()),
1466 }
1467 }
1468
1469 pub fn cleanup_finished(&self) {
1474 let mut receivers = self.stop_receivers.lock().unwrap();
1475 receivers.retain(|_, rx| {
1476 matches!(rx.try_recv(), Err(tokio::sync::oneshot::error::TryRecvError::Empty))
1477 });
1478 }
1479
1480 pub fn active_handle_count(&self) -> usize {
1486 self.stop_receivers.lock().unwrap().len()
1487 }
1488}
1489
1490impl Default for KameoRuntime {
1491 fn default() -> Self {
1492 Self::new()
1493 }
1494}
1495
1496#[async_trait::async_trait]
1507impl dactor::system_router::SystemMessageRouter for KameoRuntime {
1508 async fn route_system_envelope(
1509 &self,
1510 envelope: dactor::remote::WireEnvelope,
1511 ) -> Result<dactor::system_router::RoutingOutcome, dactor::system_router::RoutingError> {
1512 use dactor::system_actors::*;
1513 use dactor::system_router::{RoutingError, RoutingOutcome};
1514
1515 dactor::system_router::validate_system_message_type(&envelope.message_type)?;
1516
1517 let refs = self
1518 .system_actors
1519 .as_ref()
1520 .ok_or_else(|| RoutingError::new("system actors not started"))?;
1521
1522 match envelope.message_type.as_str() {
1523 SYSTEM_MSG_TYPE_SPAWN => {
1524 let request = dactor::proto::decode_spawn_request(&envelope.body)
1525 .map_err(|e| RoutingError::new(format!("decode SpawnRequest: {e}")))?;
1526
1527 let req_id = request.request_id.clone();
1528 let outcome = refs
1529 .spawn_manager()
1530 .ask(crate::system_actors::HandleSpawnRequest(request))
1531 .await
1532 .map_err(|e| RoutingError::new(format!("SpawnManager ask: {e}")))?;
1533
1534 match outcome {
1535 crate::system_actors::SpawnOutcome::Success { actor_id, .. } => {
1536 Ok(RoutingOutcome::SpawnCompleted {
1537 request_id: req_id,
1538 actor_id,
1539 })
1540 }
1541 crate::system_actors::SpawnOutcome::Failure(SpawnResponse::Failure {
1542 request_id,
1543 error,
1544 }) => Ok(RoutingOutcome::SpawnFailed { request_id, error }),
1545 crate::system_actors::SpawnOutcome::Failure(SpawnResponse::Success {
1546 ..
1547 }) => {
1548 unreachable!("SpawnOutcome::Failure always wraps SpawnResponse::Failure")
1549 }
1550 }
1551 }
1552
1553 SYSTEM_MSG_TYPE_WATCH => {
1554 let request = dactor::proto::decode_watch_request(&envelope.body)
1555 .map_err(|e| RoutingError::new(format!("decode WatchRequest: {e}")))?;
1556
1557 refs.watch_manager
1558 .tell(crate::system_actors::RemoteWatch {
1559 target: request.target,
1560 watcher: request.watcher,
1561 })
1562 .await
1563 .map_err(|e| RoutingError::new(format!("WatchManager tell: {e}")))?;
1564
1565 Ok(RoutingOutcome::Acknowledged)
1566 }
1567
1568 SYSTEM_MSG_TYPE_UNWATCH => {
1569 let request = dactor::proto::decode_unwatch_request(&envelope.body)
1570 .map_err(|e| RoutingError::new(format!("decode UnwatchRequest: {e}")))?;
1571
1572 refs.watch_manager
1573 .tell(crate::system_actors::RemoteUnwatch {
1574 target: request.target,
1575 watcher: request.watcher,
1576 })
1577 .await
1578 .map_err(|e| RoutingError::new(format!("WatchManager tell: {e}")))?;
1579
1580 Ok(RoutingOutcome::Acknowledged)
1581 }
1582
1583 SYSTEM_MSG_TYPE_CANCEL => {
1584 let request = dactor::proto::decode_cancel_request(&envelope.body)
1585 .map_err(|e| RoutingError::new(format!("decode CancelRequest: {e}")))?;
1586
1587 let request_id = request
1588 .request_id
1589 .ok_or_else(|| RoutingError::new("CancelRequest missing request_id"))?;
1590
1591 let outcome = refs
1592 .cancel_manager
1593 .ask(crate::system_actors::CancelById(request_id))
1594 .await
1595 .map_err(|e| RoutingError::new(format!("CancelManager ask: {e}")))?;
1596
1597 match outcome.0 {
1598 CancelResponse::Acknowledged => Ok(RoutingOutcome::CancelAcknowledged),
1599 CancelResponse::NotFound { reason } => {
1600 Ok(RoutingOutcome::CancelNotFound { reason })
1601 }
1602 }
1603 }
1604
1605 SYSTEM_MSG_TYPE_CONNECT_PEER => {
1606 let (peer_id, address) = dactor::proto::decode_connect_peer(&envelope.body)
1607 .map_err(|e| RoutingError::new(format!("decode ConnectPeer: {e}")))?;
1608
1609 refs.node_directory
1610 .tell(crate::system_actors::ConnectPeer {
1611 peer_id,
1612 address,
1613 })
1614 .await
1615 .map_err(|e| RoutingError::new(format!("NodeDirectory tell: {e}")))?;
1616
1617 Ok(RoutingOutcome::Acknowledged)
1618 }
1619
1620 SYSTEM_MSG_TYPE_DISCONNECT_PEER => {
1621 let peer_id = dactor::proto::decode_disconnect_peer(&envelope.body)
1622 .map_err(|e| RoutingError::new(format!("decode DisconnectPeer: {e}")))?;
1623
1624 refs.node_directory
1625 .tell(crate::system_actors::DisconnectPeer(peer_id))
1626 .await
1627 .map_err(|e| RoutingError::new(format!("NodeDirectory tell: {e}")))?;
1628
1629 Ok(RoutingOutcome::Acknowledged)
1630 }
1631
1632 other => Err(RoutingError::new(format!(
1633 "unhandled system message type: {other}"
1634 ))),
1635 }
1636 }
1637}
1638