// dactor_kameo/runtime.rs
1//! V0.2 kameo adapter runtime for the dactor actor framework.
2//!
3//! Bridges dactor's `Actor`/`Handler<M>`/`ActorRef<A>` API with kameo's
4//! single-message-type `kameo::Actor` trait using type-erased dispatch.
5
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Mutex};
9use std::time::Duration;
10
11use futures::FutureExt;
12use tokio_util::sync::CancellationToken;
13
14use dactor::actor::{
15    Actor, ActorContext, ActorError, ActorRef, AskReply, ReduceHandler, Handler, ExpandHandler,
16    TransformHandler,
17};
18use dactor::dead_letter::{DeadLetterEvent, DeadLetterHandler, DeadLetterReason};
19use dactor::dispatch::{AskDispatch, Dispatch, ReduceDispatch, ExpandDispatch, TransformDispatch, TypedDispatch};
20use dactor::errors::{ActorSendError, ErrorAction, RuntimeError};
21use dactor::interceptor::{
22    collect_handler_wrappers, apply_handler_wrappers,
23    Disposition, DropObserver, InboundContext, InboundInterceptor, OutboundInterceptor, Outcome,
24    SendMode,
25};
26use dactor::mailbox::MailboxConfig;
27use dactor::message::{Headers, Message, RuntimeHeaders};
28use dactor::node::{ActorId, NodeId};
29use dactor::runtime_support::{
30    spawn_reduce_batched_drain, spawn_reduce_drain, spawn_transform_drain,
31    wrap_batched_stream_with_interception, wrap_stream_with_interception, OutboundPipeline,
32};
33use dactor::stream::{
34    BatchConfig, BatchReader, BatchWriter, BoxStream, StreamReceiver, StreamSender,
35};
36use dactor::supervision::ChildTerminated;
37use dactor::system_actors::{
38    CancelManager, CancelResponse, NodeDirectory, PeerStatus, SpawnManager, SpawnRequest,
39    SpawnResponse, WatchManager, WatchNotification,
40};
41use dactor::type_registry::TypeRegistry;
42
43use crate::cluster::KameoClusterEvents;
44
// ---------------------------------------------------------------------------
// Watch registry
// ---------------------------------------------------------------------------

/// A type-erased entry in the watch registry.
struct WatchEntry {
    /// ID of the actor that registered the watch (the observer).
    watcher_id: ActorId,
    /// Closure that delivers a [`ChildTerminated`] to the watcher actor.
    notify: Box<dyn Fn(ChildTerminated) + Send + Sync>,
}
55
/// Shared watch registry mapping watched actor ID → list of watcher entries.
///
/// Guarded by a `std::sync::Mutex`; all visible lock scopes are short,
/// synchronous blocks that release the guard before any `.await`.
type WatcherMap = Arc<Mutex<HashMap<ActorId, Vec<WatchEntry>>>>;
58
// ---------------------------------------------------------------------------
// Kameo wrapper actor
// ---------------------------------------------------------------------------

/// The kameo message type wrapping a type-erased dactor dispatch envelope.
///
/// All dactor send modes funnel through this single kameo message type;
/// the boxed [`Dispatch`] knows how to invoke the correct handler on the
/// target actor, which is what enables multiple `Handler<M>` impls per
/// actor on top of kameo's single-message-type model.
struct DactorMsg<A: Actor>(Box<dyn Dispatch<A>>);
65
/// The kameo `Actor` implementation that bridges to a dactor `Actor`.
struct KameoDactorActor<A: Actor> {
    /// The wrapped dactor actor instance.
    actor: A,
    /// dactor-level context (actor id/name, headers, send mode, cancel token).
    ctx: ActorContext,
    /// Inbound interceptor pipeline, run for every delivered message.
    interceptors: Vec<Box<dyn InboundInterceptor>>,
    /// Shared registry used to notify watchers when this actor terminates.
    watchers: WatcherMap,
    /// Stop reason recorded when a handler error requests a stop; forwarded
    /// to watchers in the [`ChildTerminated`] notification.
    stop_reason: Option<String>,
    /// Optional handler invoked when messages become dead letters.
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
    /// Notified when the actor stops (for await_stop).
    stop_notifier: Option<tokio::sync::oneshot::Sender<Result<(), String>>>,
}
77
/// Arguments passed to the kameo actor at spawn time.
struct KameoSpawnArgs<A: Actor> {
    /// Constructor arguments forwarded to `A::create`.
    args: A::Args,
    /// Dependency bundle forwarded to `A::create`.
    deps: A::Deps,
    /// Identity assigned to the new actor.
    actor_id: ActorId,
    /// Human-readable name for the new actor.
    actor_name: String,
    /// Inbound interceptors installed for this actor's lifetime.
    interceptors: Vec<Box<dyn InboundInterceptor>>,
    /// Shared watch registry the actor participates in.
    watchers: WatcherMap,
    /// Optional dead-letter handler shared with the actor.
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
    /// Completion channel resolved when the actor stops (for await_stop).
    stop_notifier: Option<tokio::sync::oneshot::Sender<Result<(), String>>>,
}
89
impl<A: Actor + 'static> kameo::Actor for KameoDactorActor<A> {
    type Args = KameoSpawnArgs<A>;
    // Lifecycle failures are reported through dactor channels (stop notifier,
    // watcher notifications), never through kameo's error type.
    type Error = kameo::error::Infallible;

    /// Builds the dactor actor from its spawn args and runs its `on_start`
    /// lifecycle hook before any messages are delivered.
    async fn on_start(
        args: Self::Args,
        _actor_ref: kameo::actor::ActorRef<Self>,
    ) -> Result<Self, Self::Error> {
        let mut actor = A::create(args.args, args.deps);
        let mut ctx = ActorContext::new(args.actor_id, args.actor_name);
        actor.on_start(&mut ctx).await;
        Ok(Self {
            actor,
            ctx,
            interceptors: args.interceptors,
            watchers: args.watchers,
            stop_reason: None,
            dead_letter_handler: args.dead_letter_handler,
            stop_notifier: args.stop_notifier,
        })
    }

    /// Runs the dactor `on_stop` hook (panic-safe), notifies watchers of
    /// termination, removes this actor from the watch registry on both
    /// sides, and resolves any `await_stop()` waiters.
    async fn on_stop(
        &mut self,
        _actor_ref: kameo::actor::WeakActorRef<Self>,
        _reason: kameo::error::ActorStopReason,
    ) -> Result<(), Self::Error> {
        // Reset context to lifecycle semantics before on_stop
        self.ctx.send_mode = None;
        self.ctx.headers = Headers::new();
        self.ctx.set_cancellation_token(None);

        // Run on_stop with panic catching so we can propagate errors
        let stop_result =
            std::panic::AssertUnwindSafe(self.actor.on_stop())
                .catch_unwind()
                .await;
        let stop_err = match stop_result {
            Ok(()) => None,
            Err(_panic) => Some("actor panicked in on_stop".to_string()),
        };

        // Notify all watchers that this actor has terminated.
        let actor_id = self.ctx.actor_id.clone();
        let actor_name = self.ctx.actor_name.clone();
        // Keep the mutex scope tight: collect the entries under the lock,
        // then invoke the notify closures only after the guard is dropped.
        let entries = {
            let mut watchers = self.watchers.lock().unwrap();
            // Remove entries where this actor is the TARGET (watchers of this actor)
            let target_entries = watchers.remove(&actor_id).unwrap_or_default();
            // Also clean up entries where this actor is the WATCHER (prevent leak)
            for entries in watchers.values_mut() {
                entries.retain(|e| e.watcher_id != actor_id);
            }
            watchers.retain(|_, v| !v.is_empty());
            target_entries
        };
        if !entries.is_empty() {
            let notification = ChildTerminated {
                child_id: actor_id,
                child_name: actor_name,
                reason: self.stop_reason.clone(),
            };
            for entry in &entries {
                (entry.notify)(notification.clone());
            }
        }

        // Notify await_stop() waiters with the result
        if let Some(tx) = self.stop_notifier.take() {
            let result = match &stop_err {
                Some(e) => Err(e.clone()),
                None => Ok(()),
            };
            // Receiver may be gone (nobody called await_stop) — that's fine.
            let _ = tx.send(result);
        }

        Ok(())
    }
}
169
impl<A: Actor + 'static> kameo::message::Message<DactorMsg<A>> for KameoDactorActor<A> {
    // Replies travel inside the dispatch envelope (oneshot/stream senders),
    // so kameo itself carries no reply payload.
    type Reply = ();

    /// Delivers one type-erased dactor message.
    ///
    /// Pipeline, in order: run inbound interceptors (`on_receive`), honour
    /// any Drop/Reject/Retry disposition, apply accumulated delays, collect
    /// handler wrappers, race the handler against cancellation with panic
    /// containment, then run `on_complete` interceptors and — on panic —
    /// the actor's `on_error` policy.
    async fn handle(
        &mut self,
        msg: DactorMsg<A>,
        _ctx: &mut kameo::message::Context<Self, Self::Reply>,
    ) -> Self::Reply {
        let dispatch = msg.0;

        // Capture metadata before dispatch consumes the message
        let send_mode = dispatch.send_mode();
        let message_type = dispatch.message_type_name();

        self.ctx.send_mode = Some(send_mode);
        self.ctx.headers = Headers::new();

        // Run inbound interceptor pipeline
        let runtime_headers = RuntimeHeaders::new();
        let mut headers = Headers::new();
        let mut total_delay = Duration::ZERO;
        let mut rejection: Option<(String, Disposition)> = None;

        {
            let ictx = InboundContext {
                actor_id: self.ctx.actor_id.clone(),
                actor_name: &self.ctx.actor_name,
                message_type,
                send_mode,
                remote: false,
                origin_node: None,
            };

            for interceptor in &self.interceptors {
                match interceptor.on_receive(
                    &ictx,
                    &runtime_headers,
                    &mut headers,
                    dispatch.message_any(),
                ) {
                    Disposition::Continue => {}
                    // Delays accumulate across interceptors; applied once below.
                    Disposition::Delay(d) => {
                        total_delay += d;
                    }
                    // First terminal disposition wins; later interceptors don't run.
                    disp @ (Disposition::Drop | Disposition::Reject(_) | Disposition::Retry(_)) => {
                        rejection = Some((interceptor.name().to_string(), disp));
                        break;
                    }
                }
            }
        }

        // If rejected/dropped/retry, propagate proper error to caller
        if let Some((interceptor_name, disposition)) = rejection {
            if matches!(disposition, Disposition::Drop) {
                if let Some(ref handler) = *self.dead_letter_handler {
                    let event = DeadLetterEvent {
                        target_id: self.ctx.actor_id.clone(),
                        target_name: Some(self.ctx.actor_name.clone()),
                        message_type,
                        send_mode,
                        reason: DeadLetterReason::DroppedByInterceptor {
                            interceptor: interceptor_name.clone(),
                        },
                        message: None,
                    };
                    // A panicking dead-letter handler must not take the actor down.
                    let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                        handler.on_dead_letter(event);
                    }));
                }
            }
            dispatch.reject(disposition, &interceptor_name);
            return;
        }

        if !total_delay.is_zero() {
            tokio::time::sleep(total_delay).await;
        }

        // Collect handler wrappers BEFORE moving headers into ctx,
        // so interceptors can read headers without borrow conflicts.
        let ictx_for_wrap = InboundContext {
            actor_id: self.ctx.actor_id.clone(),
            actor_name: &self.ctx.actor_name,
            message_type,
            send_mode,
            remote: false,
            origin_node: None,
        };
        let wrappers = collect_handler_wrappers(&self.interceptors, &ictx_for_wrap, &headers);
        let needs_wrap = wrappers.iter().any(|w| w.is_some());

        // Copy interceptor-populated headers to ActorContext
        self.ctx.headers = headers;

        // Propagate cancellation token to context
        let cancel_token = dispatch.cancel_token();
        self.ctx.set_cancellation_token(cancel_token.clone());

        // Check if already cancelled before dispatching
        if let Some(ref token) = cancel_token {
            if token.is_cancelled() {
                dispatch.cancel();
                self.ctx.set_cancellation_token(None);
                return;
            }
        }

        // Dispatch with panic catching and cancellation racing
        let result = if needs_wrap {
            // The real dispatch result is smuggled out through a oneshot so
            // wrappers can decorate the handler future without seeing its
            // output type.
            let (result_tx, mut result_rx) = tokio::sync::oneshot::channel();

            let inner: std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> = Box::pin(async {
                let r = std::panic::AssertUnwindSafe(
                    dispatch.dispatch(&mut self.actor, &mut self.ctx),
                )
                .catch_unwind()
                .await;
                let _ = result_tx.send(r);
            });

            let wrapped = apply_handler_wrappers(wrappers, inner);

            let wrapped = std::panic::AssertUnwindSafe(wrapped);

            if let Some(ref token) = cancel_token {
                tokio::select! {
                    biased;
                    _ = wrapped.catch_unwind() => {},
                    _ = token.cancelled() => {
                        self.ctx.set_cancellation_token(None);
                        return;
                    }
                }
            } else {
                wrapped.catch_unwind().await.ok();
            }

            // If the wrapper never polled the inner future to completion the
            // oneshot is empty; surface that as a handler failure.
            match result_rx.try_recv() {
                Ok(r) => r,
                Err(_) => Err(Box::new(
                    "interceptor wrap_handler did not await the handler future",
                ) as Box<dyn std::any::Any + Send>),
            }
        } else {
            if let Some(ref token) = cancel_token {
                let dispatch_fut =
                    std::panic::AssertUnwindSafe(dispatch.dispatch(&mut self.actor, &mut self.ctx))
                        .catch_unwind();
                // `biased` gives the handler priority; cancellation only wins
                // when the handler future is pending.
                tokio::select! {
                    biased;
                    r = dispatch_fut => r,
                    _ = token.cancelled() => {
                        self.ctx.set_cancellation_token(None);
                        return;
                    }
                }
            } else {
                std::panic::AssertUnwindSafe(dispatch.dispatch(&mut self.actor, &mut self.ctx))
                    .catch_unwind()
                    .await
            }
        };

        self.ctx.set_cancellation_token(None);

        // Build context for on_complete
        let ictx = InboundContext {
            actor_id: self.ctx.actor_id.clone(),
            actor_name: &self.ctx.actor_name,
            message_type,
            send_mode,
            remote: false,
            origin_node: None,
        };

        match result {
            Ok(dispatch_result) => {
                let outcome = match (&dispatch_result.reply, send_mode) {
                    (Some(reply), SendMode::Ask) => Outcome::AskSuccess {
                        reply: reply.as_ref(),
                    },
                    _ => Outcome::TellSuccess,
                };

                for interceptor in &self.interceptors {
                    interceptor.on_complete(&ictx, &runtime_headers, &self.ctx.headers, &outcome);
                }

                // Send reply to caller AFTER interceptors have seen it
                dispatch_result.send_reply();
            }
            Err(_panic) => {
                let error = ActorError::internal("handler panicked");
                let action = self.actor.on_error(&error);

                let outcome = Outcome::HandlerError { error };
                for interceptor in &self.interceptors {
                    interceptor.on_complete(&ictx, &runtime_headers, &self.ctx.headers, &outcome);
                }

                match action {
                    ErrorAction::Resume => {
                        // Actor resumes — do NOT set stop_reason (graceful lifecycle continues)
                    }
                    ErrorAction::Stop | ErrorAction::Escalate => {
                        self.stop_reason = Some("handler panicked".into());
                        // We swallowed the original panic with catch_unwind,
                        // so kameo never saw it and will not stop the actor
                        // on its own; we also have no kameo Context here to
                        // call stop() on. Re-panic so kameo's on_panic
                        // handling performs the stop.
                        std::panic::resume_unwind(Box::new(
                            "dactor: actor stop requested after panic",
                        ));
                    }
                    ErrorAction::Restart => {
                        tracing::warn!("Restart not fully implemented, treating as Resume");
                    }
                }
            }
        }
    }
}
404
// ---------------------------------------------------------------------------
// KameoActorRef — dactor ActorRef backed by kameo
// ---------------------------------------------------------------------------

/// A dactor `ActorRef` backed by a kameo `ActorRef`.
///
/// Messages are delivered through kameo's mailbox as type-erased dispatch
/// envelopes, enabling multiple `Handler<M>` impls per actor.
///
/// When configured with [`MailboxConfig::Bounded`], a bounded `mpsc` channel
/// sits in front of the kameo actor, providing backpressure control at the
/// dactor level while kameo's internal mailbox remains unbounded.
pub struct KameoActorRef<A: Actor> {
    /// Stable dactor identity of the target actor.
    id: ActorId,
    /// Human-readable actor name (used in Debug output and dead letters).
    name: String,
    /// Underlying kameo reference that owns the mailbox.
    inner: kameo::actor::ActorRef<KameoDactorActor<A>>,
    /// Bounded front-channel; `None` means sends go straight to kameo.
    bounded_tx: Option<BoundedMailboxSender<DactorMsg<A>>>,
    /// Outbound interceptors applied to every send through this ref.
    outbound_interceptors: Arc<Vec<Box<dyn OutboundInterceptor>>>,
    /// Passed into [`OutboundPipeline`]; presumably observes messages the
    /// outbound pipeline drops — confirm against OutboundPipeline docs.
    drop_observer: Option<Arc<dyn DropObserver>>,
    /// Optional dead-letter handler for undeliverable messages.
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
}
426
427use dactor::runtime_support::BoundedMailboxSender;
428
429impl<A: Actor> Clone for KameoActorRef<A> {
430    fn clone(&self) -> Self {
431        Self {
432            id: self.id.clone(),
433            name: self.name.clone(),
434            inner: self.inner.clone(),
435            bounded_tx: self.bounded_tx.clone(),
436            outbound_interceptors: self.outbound_interceptors.clone(),
437            drop_observer: self.drop_observer.clone(),
438            dead_letter_handler: self.dead_letter_handler.clone(),
439        }
440    }
441}
442
443impl<A: Actor> std::fmt::Debug for KameoActorRef<A> {
444    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
445        write!(f, "KameoActorRef({}, {:?})", self.name, self.id)
446    }
447}
448
449impl<A: Actor> KameoActorRef<A> {
450    fn outbound_pipeline(&self) -> OutboundPipeline {
451        OutboundPipeline {
452            interceptors: self.outbound_interceptors.clone(),
453            drop_observer: self.drop_observer.clone(),
454            target_id: self.id.clone(),
455            target_name: self.name.clone(),
456        }
457    }
458
459    fn notify_dead_letter(
460        &self,
461        message_type: &'static str,
462        send_mode: SendMode,
463        reason: DeadLetterReason,
464    ) {
465        if let Some(ref handler) = *self.dead_letter_handler {
466            let event = DeadLetterEvent {
467                target_id: self.id.clone(),
468                target_name: Some(self.name.clone()),
469                message_type,
470                send_mode,
471                reason,
472                message: None,
473            };
474            let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
475                handler.on_dead_letter(event);
476            }));
477        }
478    }
479
480    /// Send a dispatch envelope through the bounded channel (if configured)
481    /// or directly to the kameo actor.
482    fn send_dispatch(&self, dispatch: Box<dyn Dispatch<A>>) -> Result<(), ActorSendError> {
483        if let Some(ref btx) = self.bounded_tx {
484            btx.try_send(DactorMsg(dispatch))
485        } else {
486            self.inner
487                .tell(DactorMsg(dispatch))
488                .try_send()
489                .map_err(|e| ActorSendError(e.to_string()))
490        }
491    }
492}
493
impl<A: Actor + 'static> ActorRef<A> for KameoActorRef<A> {
    fn id(&self) -> ActorId {
        self.id.clone()
    }

    fn name(&self) -> String {
        self.name.clone()
    }

    /// Liveness check: with a bounded front-channel, both the channel and
    /// the kameo actor must still be open for a send to succeed.
    fn is_alive(&self) -> bool {
        if let Some(ref btx) = self.bounded_tx {
            !btx.is_closed() && self.inner.is_alive()
        } else {
            self.inner.is_alive()
        }
    }

    /// Number of messages waiting in the dactor-level bounded channel.
    /// kameo's internal mailbox depth is not observable here, hence 0 for
    /// unbounded refs.
    fn pending_messages(&self) -> usize {
        if let Some(ref btx) = self.bounded_tx {
            btx.pending()
        } else {
            0
        }
    }

    fn stop(&self) {
        self.inner.kill();
    }

    /// Fire-and-forget send.
    ///
    /// Outbound interceptors run first; a Drop/Reject/Retry disposition
    /// silently swallows the message — `tell` still returns `Ok(())`.
    fn tell<M>(&self, msg: M) -> Result<(), ActorSendError>
    where
        A: Handler<M>,
        M: Message<Reply = ()>,
    {
        let pipeline = self.outbound_pipeline();
        let result = pipeline.run_on_send(SendMode::Tell, &msg);
        match result.disposition {
            Disposition::Continue => {}
            Disposition::Drop | Disposition::Reject(_) | Disposition::Retry(_) => return Ok(()),
            Disposition::Delay(_) => {} // Not supported in sync tell
        }

        let dispatch: Box<dyn Dispatch<A>> = Box::new(TypedDispatch { msg });
        self.send_dispatch(dispatch).map_err(|e| {
                // NOTE(review): classifies the failure by sniffing the error
                // string for "full" — brittle; confirm against the actual
                // BoundedMailboxSender error text.
                let reason = if e.0.contains("full") {
                    DeadLetterReason::MailboxFull
                } else {
                    DeadLetterReason::ActorStopped
                };
                self.notify_dead_letter(
                    std::any::type_name::<M>(),
                    SendMode::Tell,
                    reason,
                );
                ActorSendError(e.to_string())
            })
    }

    /// Request/response send.
    ///
    /// Unlike `tell`, interceptor Drop/Reject/Retry dispositions are made
    /// visible to the caller: a pre-resolved error reply is returned
    /// through the `AskReply` channel rather than as a send error.
    fn ask<M>(
        &self,
        msg: M,
        cancel: Option<CancellationToken>,
    ) -> Result<AskReply<M::Reply>, ActorSendError>
    where
        A: Handler<M>,
        M: Message,
    {
        let pipeline = self.outbound_pipeline();
        let result = pipeline.run_on_send(SendMode::Ask, &msg);
        match result.disposition {
            Disposition::Continue => {}
            Disposition::Delay(_) => {} // Not supported in sync context
            Disposition::Drop => {
                let (tx, rx) = tokio::sync::oneshot::channel();
                let _ = tx.send(Err(RuntimeError::ActorNotFound(
                    "message dropped by outbound interceptor".into(),
                )));
                return Ok(AskReply::new(rx));
            }
            Disposition::Reject(reason) => {
                let (tx, rx) = tokio::sync::oneshot::channel();
                let _ = tx.send(Err(RuntimeError::Rejected {
                    interceptor: result.interceptor_name.to_string(),
                    reason,
                }));
                return Ok(AskReply::new(rx));
            }
            Disposition::Retry(retry_after) => {
                let (tx, rx) = tokio::sync::oneshot::channel();
                let _ = tx.send(Err(RuntimeError::RetryAfter {
                    interceptor: result.interceptor_name.to_string(),
                    retry_after,
                }));
                return Ok(AskReply::new(rx));
            }
        }

        let (tx, rx) = tokio::sync::oneshot::channel();
        let dispatch: Box<dyn Dispatch<A>> = Box::new(AskDispatch {
            msg,
            reply_tx: tx,
            cancel,
        });
        self.send_dispatch(dispatch).map_err(|e| {
                // NOTE(review): same string-sniffing heuristic as in `tell`.
                let reason = if e.0.contains("full") {
                    DeadLetterReason::MailboxFull
                } else {
                    DeadLetterReason::ActorStopped
                };
                self.notify_dead_letter(
                    std::any::type_name::<M>(),
                    SendMode::Ask,
                    reason,
                );
                ActorSendError(e.to_string())
            })?;
        Ok(AskReply::new(rx))
    }

    /// One message in, a stream of items out.
    ///
    /// With `batch_config`, a spawned task re-batches the item stream into
    /// `Vec` batches, flushing via `BatchWriter` or when `max_delay`
    /// elapses with items buffered. Interceptor dispositions here surface
    /// as send errors (there is no reply channel to resolve).
    fn expand<M, OutputItem>(
        &self,
        msg: M,
        buffer: usize,
        batch_config: Option<BatchConfig>,
        cancel: Option<CancellationToken>,
    ) -> Result<BoxStream<OutputItem>, ActorSendError>
    where
        A: ExpandHandler<M, OutputItem>,
        M: Send + 'static,
        OutputItem: Send + 'static,
    {
        let pipeline = self.outbound_pipeline();
        let result = pipeline.run_on_send(SendMode::Expand, &msg);
        match result.disposition {
            Disposition::Continue => {}
            Disposition::Delay(_) => {}
            Disposition::Drop => {
                return Err(ActorSendError(
                    "stream dropped by outbound interceptor".into(),
                ));
            }
            Disposition::Reject(reason) => {
                return Err(ActorSendError(format!("stream rejected: {}", reason)));
            }
            Disposition::Retry(_) => {
                return Err(ActorSendError(
                    "stream retry requested by interceptor".into(),
                ));
            }
        }

        // A zero-capacity mpsc channel would panic; clamp to at least 1.
        let buffer = buffer.max(1);
        let (tx, rx) = tokio::sync::mpsc::channel(buffer);
        let sender = StreamSender::new(tx);
        let dispatch: Box<dyn Dispatch<A>> = Box::new(ExpandDispatch {
            msg,
            sender,
            cancel,
        });
        self.send_dispatch(dispatch)?;

        match batch_config {
            Some(batch_config) => {
                let (batch_tx, batch_rx) = tokio::sync::mpsc::channel::<Vec<OutputItem>>(buffer);
                let reader = BatchReader::new(batch_rx);
                let batch_delay = batch_config.max_delay;
                tokio::spawn(async move {
                    let mut writer = BatchWriter::new(batch_tx, batch_config);
                    let mut rx = rx;
                    loop {
                        if writer.buffered_count() > 0 {
                            // Items are buffered: race the next item against
                            // the max_delay deadline so partial batches still
                            // flush promptly.
                            let deadline = tokio::time::Instant::now() + batch_delay;
                            tokio::select! {
                                biased;
                                item = rx.recv() => match item {
                                    Some(item) => {
                                        if writer.push(item).await.is_err() { break; }
                                    }
                                    None => break,
                                },
                                _ = tokio::time::sleep_until(deadline) => {
                                    if writer.check_deadline().await.is_err() { break; }
                                }
                            }
                        } else {
                            // Nothing buffered: no deadline needed, just wait
                            // for the next item.
                            match rx.recv().await {
                                Some(item) => {
                                    if writer.push(item).await.is_err() {
                                        break;
                                    }
                                }
                                None => break,
                            }
                        }
                    }
                    // Flush any trailing partial batch when the source ends.
                    let _ = writer.flush().await;
                });
                Ok(wrap_batched_stream_with_interception(
                    reader,
                    buffer,
                    pipeline,
                    std::any::type_name::<M>(),
                    SendMode::Expand,
                ))
            }
            None => Ok(wrap_stream_with_interception(
                    rx,
                    buffer,
                    pipeline,
                    std::any::type_name::<M>(),
                    SendMode::Expand,
                )),
        }
    }

    /// A stream of items in, one reply out.
    ///
    /// The input stream is drained into the actor by a spawned task
    /// (batched or unbatched); the reply arrives via `AskReply` when the
    /// actor finishes reducing.
    fn reduce<InputItem, Reply>(
        &self,
        input: BoxStream<InputItem>,
        buffer: usize,
        batch_config: Option<BatchConfig>,
        cancel: Option<CancellationToken>,
    ) -> Result<AskReply<Reply>, ActorSendError>
    where
        A: ReduceHandler<InputItem, Reply>,
        InputItem: Send + 'static,
        Reply: Send + 'static,
    {
        let buffer = buffer.max(1);
        let (item_tx, item_rx) = tokio::sync::mpsc::channel(buffer);
        let receiver = StreamReceiver::new(item_rx);
        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        let dispatch: Box<dyn Dispatch<A>> = Box::new(ReduceDispatch {
            receiver,
            reply_tx,
            cancel: cancel.clone(),
        });
        self.send_dispatch(dispatch)?;

        let pipeline = self.outbound_pipeline();
        match batch_config {
            Some(batch_config) => {
                spawn_reduce_batched_drain(
                    input,
                    item_tx,
                    buffer,
                    batch_config,
                    cancel,
                    pipeline,
                    std::any::type_name::<InputItem>(),
                );
            }
            None => {
                spawn_reduce_drain(
                    input,
                    item_tx,
                    cancel,
                    pipeline,
                    std::any::type_name::<InputItem>(),
                );
            }
        }

        Ok(AskReply::new(reply_rx))
    }

    /// A stream of items in, a stream of items out.
    ///
    /// Input is drained to the actor by `spawn_transform_drain`; output is
    /// optionally re-batched by a spawned task using the same deadline
    /// scheme as [`Self::expand`].
    fn transform<InputItem, OutputItem>(
        &self,
        input: BoxStream<InputItem>,
        buffer: usize,
        batch_config: Option<BatchConfig>,
        cancel: Option<CancellationToken>,
    ) -> Result<BoxStream<OutputItem>, ActorSendError>
    where
        A: TransformHandler<InputItem, OutputItem>,
        InputItem: Send + 'static,
        OutputItem: Send + 'static,
    {
        let buffer = buffer.max(1);
        let (item_tx, item_rx) = tokio::sync::mpsc::channel(buffer);
        let (output_tx, mut output_rx) = tokio::sync::mpsc::channel(buffer);
        let receiver = StreamReceiver::new(item_rx);
        let sender = StreamSender::new(output_tx);
        let dispatch: Box<dyn Dispatch<A>> = Box::new(TransformDispatch::new(
            receiver,
            sender,
            cancel.clone(),
        ));
        self.send_dispatch(dispatch)?;

        let pipeline = self.outbound_pipeline();
        spawn_transform_drain(
            input,
            item_tx,
            cancel,
            pipeline.clone(),
            std::any::type_name::<InputItem>(),
        );

        match batch_config {
            Some(batch_config) => {
                let (batch_tx, batch_rx) =
                    tokio::sync::mpsc::channel::<Vec<OutputItem>>(buffer);
                let reader = BatchReader::new(batch_rx);
                let batch_delay = batch_config.max_delay;
                tokio::spawn(async move {
                    let mut writer = BatchWriter::new(batch_tx, batch_config);
                    loop {
                        if writer.buffered_count() > 0 {
                            // Buffered items race the max_delay deadline so
                            // partial batches flush promptly.
                            let deadline = tokio::time::Instant::now() + batch_delay;
                            tokio::select! {
                                biased;
                                item = output_rx.recv() => match item {
                                    Some(item) => {
                                        if writer.push(item).await.is_err() { break; }
                                    }
                                    None => break,
                                },
                                _ = tokio::time::sleep_until(deadline) => {
                                    if writer.check_deadline().await.is_err() { break; }
                                }
                            }
                        } else {
                            match output_rx.recv().await {
                                Some(item) => {
                                    if writer.push(item).await.is_err() {
                                        break;
                                    }
                                }
                                None => break,
                            }
                        }
                    }
                    // Flush any trailing partial batch when the source ends.
                    let _ = writer.flush().await;
                });
                Ok(wrap_batched_stream_with_interception(
                    reader,
                    buffer,
                    pipeline,
                    std::any::type_name::<OutputItem>(),
                    SendMode::Transform,
                ))
            }
            None => Ok(wrap_stream_with_interception(
                output_rx,
                buffer,
                pipeline,
                std::any::type_name::<OutputItem>(),
                SendMode::Transform,
            )),
        }
    }
}
846
847// ---------------------------------------------------------------------------
848// SpawnOptions
849// ---------------------------------------------------------------------------
850
/// Options for spawning an actor, including the inbound interceptor pipeline.
/// Use `..Default::default()` to future-proof against new fields.
pub struct SpawnOptions {
    /// Inbound interceptors to attach to the actor. Applied to messages
    /// before they reach the actor's handlers.
    pub interceptors: Vec<Box<dyn InboundInterceptor>>,
    /// Mailbox capacity configuration.
    ///
    /// When [`Bounded`](MailboxConfig::Bounded), a bounded `mpsc` channel is
    /// placed in front of the kameo actor to enforce backpressure.
    pub mailbox: MailboxConfig,
}
862
863impl Default for SpawnOptions {
864    fn default() -> Self {
865        Self {
866            interceptors: Vec::new(),
867            mailbox: MailboxConfig::Unbounded,
868        }
869    }
870}
871
872// ---------------------------------------------------------------------------
873// KameoRuntime
874// ---------------------------------------------------------------------------
875
876/// A dactor v0.2 runtime backed by kameo.
877///
878/// Actors are spawned as real kameo actors via kameo's `Spawn` trait.
879/// Messages are delivered through kameo's mailbox as type-erased dispatch
880/// envelopes, supporting multiple `Handler<M>` impls per actor.
881///
882/// Unlike the ractor adapter, kameo's `Spawn::spawn()` is synchronous
883/// (it calls `tokio::spawn` internally), so `spawn()` does not require a
884/// sync→async bridge thread.
885/// A dactor v0.2 runtime backed by kameo.
886///
887/// Actors are spawned as real kameo actors via kameo's `Spawn` trait.
888/// Messages are delivered through kameo's mailbox as type-erased dispatch
889/// envelopes, supporting multiple `Handler<M>` impls per actor.
890///
891/// ## System Actors
892///
893/// The runtime spawns native kameo system actors on creation:
894/// - [`SpawnManagerActor`](crate::system_actors::SpawnManagerActor) — handles remote spawn requests
895/// - [`WatchManagerActor`](crate::system_actors::WatchManagerActor) — handles remote watch/unwatch
896/// - [`CancelManagerActor`](crate::system_actors::CancelManagerActor) — handles remote cancellation
897/// - [`NodeDirectoryActor`](crate::system_actors::NodeDirectoryActor) — tracks peer connections
898///
899/// System actors are accessible via `system_actor_refs()` for transport routing.
900pub struct KameoRuntime {
901    node_id: NodeId,
902    next_local: Arc<AtomicU64>,
903    cluster_events: KameoClusterEvents,
904    outbound_interceptors: Arc<Vec<Box<dyn OutboundInterceptor>>>,
905    drop_observer: Option<Arc<dyn DropObserver>>,
906    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
907    watchers: WatcherMap,
908    /// Plain struct system actors (backward-compatible sync API).
909    spawn_manager: SpawnManager,
910    watch_manager: WatchManager,
911    cancel_manager: CancelManager,
912    node_directory: NodeDirectory,
913    /// Native kameo system actor refs (lazily started via `start_system_actors()`).
914    system_actors: Option<KameoSystemActorRefs>,
915    /// Stop notification receivers for await_stop(), keyed by ActorId.
916    #[allow(clippy::type_complexity)]
917    stop_receivers: Arc<Mutex<HashMap<ActorId, tokio::sync::oneshot::Receiver<Result<(), String>>>>>,
918    /// Application version for this node (informational, used in handshake).
919    app_version: Option<String>,
920}
921
/// References to the native kameo system actors spawned by the runtime.
pub struct KameoSystemActorRefs {
    /// SpawnManager pool; size is controlled by `SystemActorConfig`.
    pub spawn_managers: Vec<kameo::actor::ActorRef<crate::system_actors::SpawnManagerActor>>,
    /// Round-robin cursor for `spawn_manager()` selection over the pool.
    spawn_manager_counter: std::sync::atomic::AtomicU64,
    pub watch_manager: kameo::actor::ActorRef<crate::system_actors::WatchManagerActor>,
    pub cancel_manager: kameo::actor::ActorRef<crate::system_actors::CancelManagerActor>,
    pub node_directory: kameo::actor::ActorRef<crate::system_actors::NodeDirectoryActor>,
}
930
931impl KameoSystemActorRefs {
932    /// Get the spawn manager ref (round-robin if pooled).
933    pub fn spawn_manager(&self) -> &kameo::actor::ActorRef<crate::system_actors::SpawnManagerActor> {
934        let idx = self.spawn_manager_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
935            as usize % self.spawn_managers.len();
936        &self.spawn_managers[idx]
937    }
938}
939
940impl KameoRuntime {
941    /// Create a new `KameoRuntime`.
942    ///
943    /// System actors are not spawned until `start_system_actors()` is called.
944    /// This allows the runtime to be constructed outside a tokio context.
945    pub fn new() -> Self {
946        Self::create(NodeId("kameo-node".into()))
947    }
948
949    /// Create a new `KameoRuntime` with a specific node ID.
950    pub fn with_node_id(node_id: NodeId) -> Self {
951        Self::create(node_id)
952    }
953
    /// Shared constructor: all fields start empty; local actor IDs begin at 1.
    fn create(node_id: NodeId) -> Self {
        Self {
            node_id,
            // Local IDs start at 1 (0 is never assigned).
            next_local: Arc::new(AtomicU64::new(1)),
            cluster_events: KameoClusterEvents::new(),
            outbound_interceptors: Arc::new(Vec::new()),
            drop_observer: None,
            dead_letter_handler: Arc::new(None),
            watchers: Arc::new(Mutex::new(HashMap::new())),
            spawn_manager: SpawnManager::new(TypeRegistry::new()),
            watch_manager: WatchManager::new(),
            cancel_manager: CancelManager::new(),
            node_directory: NodeDirectory::new(),
            // Native system actors are spawned lazily by start_system_actors().
            system_actors: None,
            stop_receivers: Arc::new(Mutex::new(HashMap::new())),
            app_version: None,
        }
    }
972
    /// The adapter name for this runtime, used in version handshakes.
    pub const ADAPTER_NAME: &'static str = "kameo";

    /// Set the application version for this node (builder-style).
    ///
    /// This is your application's release version (e.g., "2.3.1"), not the
    /// dactor framework version. It is included in handshake requests for
    /// operational visibility during rolling upgrades.
    pub fn with_app_version(mut self, version: impl Into<String>) -> Self {
        self.app_version = Some(version.into());
        self
    }

    /// Returns the configured application version, if any.
    pub fn app_version(&self) -> Option<&str> {
        self.app_version.as_deref()
    }
990
991    /// Build a [`HandshakeRequest`](dactor::HandshakeRequest) from this
992    /// runtime's current configuration.
993    pub fn handshake_request(&self) -> dactor::HandshakeRequest {
994        dactor::HandshakeRequest::from_runtime(
995            self.node_id.clone(),
996            self.app_version.clone(),
997            Self::ADAPTER_NAME,
998        )
999    }
1000
    /// Spawn native kameo system actors for transport routing.
    ///
    /// Must be called from within a tokio runtime context. After this call,
    /// `system_actor_refs()` returns the native actor references.
    ///
    /// Factory registrations made via `register_factory()` before this call
    /// are forwarded to the native SpawnManagerActor.
    ///
    /// Uses default configuration (unbounded mailboxes, no pooling).
    /// For custom configuration, use [`start_system_actors_with_config()`].
    pub fn start_system_actors(&mut self) {
        self.start_system_actors_with_config(dactor::SystemActorConfig::default());
    }
1014
    /// Spawn native kameo system actors with custom configuration.
    ///
    /// Allows configuring mailbox capacity and SpawnManager pooling for
    /// high-throughput scenarios. See [`SystemActorConfig`](dactor::SystemActorConfig)
    /// for details.
    ///
    /// NOTE(review): only `spawn_manager_pool_size` from `config` is consulted
    /// here — every system actor is given an unbounded kameo mailbox below,
    /// despite the doc mentioning mailbox capacity. Confirm whether the
    /// config's mailbox setting should be applied to these spawns.
    pub fn start_system_actors_with_config(&mut self, config: dactor::SystemActorConfig) {
        use crate::system_actors::*;
        use kameo::actor::Spawn;

        // Pool size defaults to 1 and is clamped to at least 1.
        let pool_size = config.spawn_manager_pool_size.unwrap_or(1).max(1);
        let mut spawn_refs = Vec::with_capacity(pool_size);

        // Each pool member shares the runtime's ID counter so assigned
        // ActorIds never collide across the pool.
        for _ in 0..pool_size {
            let spawn_mgr_ref = SpawnManagerActor::spawn_with_mailbox(
                (self.node_id.clone(), TypeRegistry::new(), self.next_local.clone()),
                kameo::mailbox::unbounded(),
            );
            spawn_refs.push(spawn_mgr_ref);
        }

        let watch_mgr_ref =
            WatchManagerActor::spawn_with_mailbox((), kameo::mailbox::unbounded());
        let cancel_mgr_ref =
            CancelManagerActor::spawn_with_mailbox((), kameo::mailbox::unbounded());
        let node_dir_ref =
            NodeDirectoryActor::spawn_with_mailbox((), kameo::mailbox::unbounded());

        self.system_actors = Some(KameoSystemActorRefs {
            spawn_managers: spawn_refs,
            spawn_manager_counter: std::sync::atomic::AtomicU64::new(0),
            watch_manager: watch_mgr_ref,
            cancel_manager: cancel_mgr_ref,
            node_directory: node_dir_ref,
        });
    }
1050
    /// Returns the node ID of this runtime.
    pub fn node_id(&self) -> &NodeId {
        &self.node_id
    }

    /// Access the native system actor references for transport routing.
    ///
    /// Returns `None` if `start_system_actors()` has not been called yet.
    pub fn system_actor_refs(&self) -> Option<&KameoSystemActorRefs> {
        self.system_actors.as_ref()
    }
1062
    /// Add a global outbound interceptor.
    ///
    /// **Must be called before any actors are spawned.** Panics if actors
    /// already hold references to the interceptor list (i.e., after `spawn()`),
    /// because `Arc::get_mut` requires a unique reference.
    pub fn add_outbound_interceptor(&mut self, interceptor: Box<dyn OutboundInterceptor>) {
        Arc::get_mut(&mut self.outbound_interceptors)
            .expect("cannot add interceptors after actors are spawned")
            .push(interceptor);
    }

    /// Set the drop observer for outbound interceptor pipelines.
    ///
    /// **Must be called before any actors are spawned** — spawned refs take a
    /// clone of the observer at spawn time.
    pub fn set_drop_observer(&mut self, observer: Arc<dyn DropObserver>) {
        self.drop_observer = Some(observer);
    }

    /// Set a global dead letter handler. Called whenever a message cannot be
    /// delivered (actor stopped, mailbox full, dropped by inbound interceptor).
    pub fn set_dead_letter_handler(&mut self, handler: Arc<dyn DeadLetterHandler>) {
        // Replaces the shared Arc wholesale; actors spawned earlier keep the
        // previous handler Arc they cloned at spawn time.
        self.dead_letter_handler = Arc::new(Some(handler));
    }
1085
    /// Access the cluster events subsystem.
    ///
    /// NOTE(review): identical to [`cluster_events`](Self::cluster_events) —
    /// presumably kept as a backward-compatible alias; confirm before removing.
    pub fn cluster_events_handle(&self) -> &KameoClusterEvents {
        &self.cluster_events
    }

    /// Access the cluster events subsystem.
    pub fn cluster_events(&self) -> &KameoClusterEvents {
        &self.cluster_events
    }
1095
    /// Spawn an actor with `Deps = ()`.
    ///
    /// Note: these spawn wrappers are `async` and return `Result` even though
    /// the kameo path is synchronous and currently infallible — presumably for
    /// signature parity with other runtime adapters; confirm before changing.
    pub async fn spawn<A>(&self, name: &str, args: A::Args) -> Result<KameoActorRef<A>, dactor::errors::RuntimeError>
    where
        A: Actor<Deps = ()> + 'static,
    {
        Ok(self.spawn_internal::<A>(name, args, (), Vec::new(), MailboxConfig::Unbounded))
    }

    /// Spawn an actor with explicit dependencies.
    pub async fn spawn_with_deps<A>(&self, name: &str, args: A::Args, deps: A::Deps) -> Result<KameoActorRef<A>, dactor::errors::RuntimeError>
    where
        A: Actor + 'static,
    {
        Ok(self.spawn_internal::<A>(name, args, deps, Vec::new(), MailboxConfig::Unbounded))
    }

    /// Spawn an actor with spawn options (including inbound interceptors and mailbox config).
    pub async fn spawn_with_options<A>(
        &self,
        name: &str,
        args: A::Args,
        options: SpawnOptions,
    ) -> Result<KameoActorRef<A>, dactor::errors::RuntimeError>
    where
        A: Actor<Deps = ()> + 'static,
    {
        Ok(self.spawn_internal::<A>(name, args, (), options.interceptors, options.mailbox))
    }
1124
    /// Common spawn path: assigns an `ActorId`, spawns the kameo-wrapped
    /// actor, optionally fronts it with a bounded channel, and records a stop
    /// receiver for `await_stop()`.
    fn spawn_internal<A>(
        &self,
        name: &str,
        args: A::Args,
        deps: A::Deps,
        interceptors: Vec<Box<dyn InboundInterceptor>>,
        mailbox: MailboxConfig,
    ) -> KameoActorRef<A>
    where
        A: Actor + 'static,
    {
        use kameo::actor::Spawn;

        // Allocate the next node-local ID.
        let local = self.next_local.fetch_add(1, Ordering::SeqCst);
        let actor_id = ActorId {
            node: self.node_id.clone(),
            local,
        };
        let actor_name = name.to_string();

        // One-shot channel through which the actor reports its stop outcome.
        let (stop_tx, stop_rx) = tokio::sync::oneshot::channel();

        let spawn_args = KameoSpawnArgs {
            args,
            deps,
            actor_id: actor_id.clone(),
            actor_name: actor_name.clone(),
            interceptors,
            watchers: self.watchers.clone(),
            dead_letter_handler: self.dead_letter_handler.clone(),
            stop_notifier: Some(stop_tx),
        };

        // kameo's Spawn::spawn_with_mailbox is synchronous (internally calls tokio::spawn)
        let actor_ref =
            KameoDactorActor::<A>::spawn_with_mailbox(spawn_args, kameo::mailbox::unbounded());

        // Set up optional bounded mailbox channel
        let bounded_tx = match mailbox {
            MailboxConfig::Bounded { capacity, overflow } => {
                // A bounded mpsc channel fronts the unbounded kameo mailbox;
                // a forwarder task relays messages into the actor. try_send on
                // the unbounded kameo mailbox fails only if the actor stopped,
                // which ends the forwarder.
                let (btx, mut brx) = tokio::sync::mpsc::channel::<DactorMsg<A>>(capacity);
                let fwd_ref = actor_ref.clone();
                tokio::spawn(async move {
                    while let Some(msg) = brx.recv().await {
                        if fwd_ref.tell(msg).try_send().is_err() {
                            break;
                        }
                    }
                });
                Some(BoundedMailboxSender::new(btx, overflow))
            }
            MailboxConfig::Unbounded => None,
        };

        // Store stop receiver for await_stop()
        self.stop_receivers.lock().unwrap().insert(actor_id.clone(), stop_rx);

        KameoActorRef {
            id: actor_id,
            name: actor_name,
            inner: actor_ref,
            bounded_tx,
            outbound_interceptors: self.outbound_interceptors.clone(),
            drop_observer: self.drop_observer.clone(),
            dead_letter_handler: self.dead_letter_handler.clone(),
        }
    }
    /// Register actor `watcher` to be notified when `target_id` terminates.
    ///
    /// The watcher must implement `Handler<ChildTerminated>`. When the target
    /// stops, the runtime delivers a [`ChildTerminated`] message to the watcher.
    pub fn watch<W>(&self, watcher: &KameoActorRef<W>, target_id: ActorId)
    where
        W: Actor + Handler<ChildTerminated> + 'static,
    {
        let watcher_id = watcher.id();
        // Clone the kameo ref into the closure so delivery works even after
        // the caller drops its KameoActorRef.
        let watcher_inner = watcher.inner.clone();

        let entry = WatchEntry {
            watcher_id,
            // Type-erased notifier: wraps ChildTerminated in a dispatch for W.
            notify: Box::new(move |msg: ChildTerminated| {
                let dispatch: Box<dyn Dispatch<W>> = Box::new(TypedDispatch { msg });
                // Best-effort: a failed send means the watcher is gone.
                if watcher_inner.tell(DactorMsg(dispatch)).try_send().is_err() {
                    tracing::debug!("watch notification dropped — watcher may have stopped");
                }
            }),
        };

        let mut watchers = self.watchers.lock().unwrap();
        watchers.entry(target_id).or_default().push(entry);
    }
1216
1217    /// Unregister `watcher_id` from notifications about `target_id`.
1218    pub fn unwatch(&self, watcher_id: &ActorId, target_id: &ActorId) {
1219        let mut watchers = self.watchers.lock().unwrap();
1220        if let Some(entries) = watchers.get_mut(target_id) {
1221            entries.retain(|e| &e.watcher_id != watcher_id);
1222            if entries.is_empty() {
1223                watchers.remove(target_id);
1224            }
1225        }
1226    }
1227
1228    // -----------------------------------------------------------------------
1229    // SA5: SpawnManager wiring
1230    // -----------------------------------------------------------------------
1231
    /// Access the spawn manager (the plain-struct sync API, not the native
    /// kameo SpawnManagerActor).
    pub fn spawn_manager(&self) -> &SpawnManager {
        &self.spawn_manager
    }

    /// Access the spawn manager mutably (for registering actor factories).
    pub fn spawn_manager_mut(&mut self) -> &mut SpawnManager {
        &mut self.spawn_manager
    }
1241
    /// Register an actor type for remote spawning on this node.
    ///
    /// The factory closure deserializes actor `Args` from bytes and returns
    /// the constructed actor as `Box<dyn Any + Send>`. The runtime is
    /// responsible for actually spawning the returned actor.
    pub fn register_factory(
        &mut self,
        type_name: impl Into<String>,
        factory: impl Fn(&[u8]) -> Result<Box<dyn std::any::Any + Send>, dactor::remote::SerializationError>
            + Send
            + Sync
            + 'static,
    ) {
        // Delegates to the plain-struct spawn manager's type registry.
        self.spawn_manager
            .type_registry_mut()
            .register_factory(type_name, factory);
    }
1259
1260    /// Process a remote spawn request.
1261    ///
1262    /// Looks up the actor type in the registry, deserializes Args from bytes,
1263    /// and returns the constructed actor along with its assigned [`ActorId`].
1264    ///
1265    /// The returned `ActorId` is pre-assigned for the remote spawn flow where
1266    /// the runtime controls ID assignment. The caller must use this ID when
1267    /// registering the spawned actor (not via the regular `spawn()` path,
1268    /// which assigns its own IDs).
1269    ///
1270    /// **Note:** Currently uses the simple factory API (`TypeRegistry::create_actor`).
1271    /// For actors with non-trivial `Deps`, use `spawn_manager_mut()` to access
1272    /// `TypeRegistry::create_actor_with_deps()` directly.
1273    ///
1274    /// Returns `Ok((actor_id, actor))` on success, or `Err(SpawnResponse::Failure)`
1275    /// if the type is not found or deserialization fails.
1276    pub fn handle_spawn_request(
1277        &mut self,
1278        request: &SpawnRequest,
1279    ) -> Result<(ActorId, Box<dyn std::any::Any + Send>), SpawnResponse> {
1280        match self.spawn_manager.create_actor(request) {
1281            Ok(actor) => {
1282                let local = self.next_local.fetch_add(1, Ordering::SeqCst);
1283                let actor_id = ActorId {
1284                    node: self.node_id.clone(),
1285                    local,
1286                };
1287                self.spawn_manager.record_spawn(actor_id.clone());
1288                Ok((actor_id, actor))
1289            }
1290            Err(e) => Err(SpawnResponse::Failure {
1291                request_id: request.request_id.clone(),
1292                error: e.to_string(),
1293            }),
1294        }
1295    }
1296
1297    // -----------------------------------------------------------------------
1298    // SA6: WatchManager wiring
1299    // -----------------------------------------------------------------------
1300
    /// Access the watch manager (for remote watch subscriptions).
    pub fn watch_manager(&self) -> &WatchManager {
        &self.watch_manager
    }

    /// Access the watch manager mutably.
    pub fn watch_manager_mut(&mut self) -> &mut WatchManager {
        &mut self.watch_manager
    }

    /// Register a remote watch: a remote watcher wants to know when a local
    /// actor terminates.
    pub fn remote_watch(&mut self, target: ActorId, watcher: ActorId) {
        self.watch_manager.watch(target, watcher);
    }

    /// Remove a remote watch subscription.
    pub fn remote_unwatch(&mut self, target: &ActorId, watcher: &ActorId) {
        self.watch_manager.unwatch(target, watcher);
    }

    /// Called when a local actor terminates. Returns notifications for all
    /// remote watchers that should be sent to their respective nodes.
    ///
    /// **Note:** This must be called explicitly by the integration layer.
    /// It is not yet automatically wired into kameo's actor stop lifecycle.
    pub fn notify_terminated(&mut self, terminated: &ActorId) -> Vec<WatchNotification> {
        self.watch_manager.on_terminated(terminated)
    }
1330
1331    // -----------------------------------------------------------------------
1332    // SA7: CancelManager wiring
1333    // -----------------------------------------------------------------------
1334
    /// Access the cancel manager.
    pub fn cancel_manager(&self) -> &CancelManager {
        &self.cancel_manager
    }

    /// Access the cancel manager mutably.
    pub fn cancel_manager_mut(&mut self) -> &mut CancelManager {
        &mut self.cancel_manager
    }

    /// Register a cancellation token for a request (for remote cancel support).
    pub fn register_cancel(&mut self, request_id: String, token: CancellationToken) {
        self.cancel_manager.register(request_id, token);
    }

    /// Process a remote cancellation request.
    pub fn cancel_request(&mut self, request_id: &str) -> CancelResponse {
        self.cancel_manager.cancel(request_id)
    }

    /// Clean up a cancellation token after its request completes normally.
    ///
    /// Should be called when a remote ask/stream/feed completes successfully
    /// to prevent stale tokens from accumulating.
    pub fn complete_request(&mut self, request_id: &str) {
        self.cancel_manager.remove(request_id);
    }
1362
1363    // -----------------------------------------------------------------------
1364    // SA8: NodeDirectory wiring
1365    // -----------------------------------------------------------------------
1366
    /// Access the node directory (the plain-struct sync API).
    pub fn node_directory(&self) -> &NodeDirectory {
        &self.node_directory
    }

    /// Access the node directory mutably.
    pub fn node_directory_mut(&mut self) -> &mut NodeDirectory {
        &mut self.node_directory
    }
1376
1377    /// Register a peer node as connected in the directory.
1378    ///
1379    /// **Post-validation only:** this method assumes the peer has already
1380    /// passed the version handshake. Callers should validate compatibility
1381    /// (via [`handshake_request`](Self::handshake_request) +
1382    /// [`validate_handshake`](dactor::validate_handshake) +
1383    /// [`verify_peer_identity`](dactor::verify_peer_identity)) before calling
1384    /// this method. Direct calls bypass compatibility checks.
1385    ///
1386    /// If the peer already exists, updates its status to `Connected` and
1387    /// preserves the existing address when `address` is `None`.
1388    /// Emits a `ClusterEvent::NodeJoined` if the peer was not previously connected.
1389    pub fn connect_peer(&mut self, peer_id: NodeId, address: Option<String>) {
1390        let was_connected = self.node_directory.is_connected(&peer_id);
1391        if let Some(existing) = self.node_directory.get_peer(&peer_id) {
1392            let resolved_address = address.or_else(|| existing.address.clone());
1393            self.node_directory.remove_peer(&peer_id);
1394            self.node_directory.add_peer(peer_id.clone(), resolved_address);
1395        } else {
1396            self.node_directory.add_peer(peer_id.clone(), address);
1397        }
1398        self.node_directory.set_status(&peer_id, PeerStatus::Connected);
1399        if !was_connected {
1400            self.cluster_events.emit(dactor::ClusterEvent::NodeJoined(peer_id));
1401        }
1402    }
1403
1404    /// Mark a peer as disconnected.
1405    ///
1406    /// Emits a `ClusterEvent::NodeLeft` if the peer was previously connected.
1407    pub fn disconnect_peer(&mut self, peer_id: &NodeId) {
1408        let was_connected = self.node_directory.is_connected(peer_id);
1409        self.node_directory.set_status(peer_id, PeerStatus::Disconnected);
1410        if was_connected {
1411            self.cluster_events.emit(dactor::ClusterEvent::NodeLeft(peer_id.clone()));
1412        }
1413    }
1414
    /// Check if a peer node is connected.
    pub fn is_peer_connected(&self, peer_id: &NodeId) -> bool {
        self.node_directory.is_connected(peer_id)
    }
1419
1420    // -----------------------------------------------------------------------
1421    // JH1-JH2: Actor lifecycle handles
1422    // -----------------------------------------------------------------------
1423
1424    /// Wait for an actor to stop.
1425    ///
1426    /// Returns `Ok(())` when the actor finishes cleanly, or `Err` if the
1427    /// actor panicked in `on_stop`. The stop receiver is consumed and removed
1428    /// from the map.
1429    ///
1430    /// Returns `Ok(())` immediately if no stop receiver is stored for this ID.
1431    pub async fn await_stop(&self, actor_id: &ActorId) -> Result<(), String> {
1432        let rx = {
1433            let mut receivers = self.stop_receivers.lock().unwrap();
1434            receivers.remove(actor_id)
1435        };
1436        match rx {
1437            Some(rx) => rx
1438                .await
1439                .map_err(|_| "stop notifier dropped".to_string())
1440                .and_then(|r| r),
1441            None => Ok(()),
1442        }
1443    }
1444
1445    /// Wait for all spawned actors to stop.
1446    ///
1447    /// Drains all stored stop receivers and awaits them all. Returns the first
1448    /// error encountered, but always waits for every actor to finish.
1449    pub async fn await_all(&self) -> Result<(), String> {
1450        let receivers: Vec<_> = {
1451            let mut map = self.stop_receivers.lock().unwrap();
1452            map.drain().collect()
1453        };
1454        let mut first_error = None;
1455        for (_, rx) in receivers {
1456            let result = rx.await.map_err(|e| format!("stop notifier dropped: {e}")).and_then(|r| r);
1457            if let Err(e) = result {
1458                if first_error.is_none() {
1459                    first_error = Some(e);
1460                }
1461            }
1462        }
1463        match first_error {
1464            Some(e) => Err(e),
1465            None => Ok(()),
1466        }
1467    }
1468
    /// Remove completed stop receivers from the map.
    ///
    /// Call periodically to prevent stale entries from accumulating
    /// for actors that stopped without being awaited.
    pub fn cleanup_finished(&self) {
        let mut receivers = self.stop_receivers.lock().unwrap();
        // Keep only receivers that are still pending (TryRecvError::Empty).
        // Receivers whose value arrived or whose sender was dropped are removed.
        receivers.retain(|_, rx| {
            matches!(rx.try_recv(), Err(tokio::sync::oneshot::error::TryRecvError::Empty))
        });
    }
1479
    /// Number of actors with stored stop receivers.
    ///
    /// Note: includes receivers for actors that have already stopped but
    /// haven't been awaited or cleaned up. Call `cleanup_finished()` first
    /// for an accurate count of running actors.
    pub fn active_handle_count(&self) -> usize {
        self.stop_receivers.lock().unwrap().len()
    }
1488}
1489
1490impl Default for KameoRuntime {
1491    fn default() -> Self {
1492        Self::new()
1493    }
1494}
1495
1496// ---------------------------------------------------------------------------
1497// NA10: SystemMessageRouter for kameo
1498// ---------------------------------------------------------------------------
1499
1500// NOTE: System messages routed here update the **native system actors** (the
1501// mailbox-based kameo actors spawned by `start_system_actors()`). The runtime
1502// also keeps plain struct system actors (`self.watch_manager`, etc.) for the
1503// backward-compatible sync API. This dual-state pattern is intentional — see
1504// progress.md "Dual struct+actor pattern" design decision.
1505
1506#[async_trait::async_trait]
1507impl dactor::system_router::SystemMessageRouter for KameoRuntime {
1508    async fn route_system_envelope(
1509        &self,
1510        envelope: dactor::remote::WireEnvelope,
1511    ) -> Result<dactor::system_router::RoutingOutcome, dactor::system_router::RoutingError> {
1512        use dactor::system_actors::*;
1513        use dactor::system_router::{RoutingError, RoutingOutcome};
1514
1515        dactor::system_router::validate_system_message_type(&envelope.message_type)?;
1516
1517        let refs = self
1518            .system_actors
1519            .as_ref()
1520            .ok_or_else(|| RoutingError::new("system actors not started"))?;
1521
1522        match envelope.message_type.as_str() {
1523            SYSTEM_MSG_TYPE_SPAWN => {
1524                let request = dactor::proto::decode_spawn_request(&envelope.body)
1525                    .map_err(|e| RoutingError::new(format!("decode SpawnRequest: {e}")))?;
1526
1527                let req_id = request.request_id.clone();
1528                let outcome = refs
1529                    .spawn_manager()
1530                    .ask(crate::system_actors::HandleSpawnRequest(request))
1531                    .await
1532                    .map_err(|e| RoutingError::new(format!("SpawnManager ask: {e}")))?;
1533
1534                match outcome {
1535                    crate::system_actors::SpawnOutcome::Success { actor_id, .. } => {
1536                        Ok(RoutingOutcome::SpawnCompleted {
1537                            request_id: req_id,
1538                            actor_id,
1539                        })
1540                    }
1541                    crate::system_actors::SpawnOutcome::Failure(SpawnResponse::Failure {
1542                        request_id,
1543                        error,
1544                    }) => Ok(RoutingOutcome::SpawnFailed { request_id, error }),
1545                    crate::system_actors::SpawnOutcome::Failure(SpawnResponse::Success {
1546                        ..
1547                    }) => {
1548                        unreachable!("SpawnOutcome::Failure always wraps SpawnResponse::Failure")
1549                    }
1550                }
1551            }
1552
1553            SYSTEM_MSG_TYPE_WATCH => {
1554                let request = dactor::proto::decode_watch_request(&envelope.body)
1555                    .map_err(|e| RoutingError::new(format!("decode WatchRequest: {e}")))?;
1556
1557                refs.watch_manager
1558                    .tell(crate::system_actors::RemoteWatch {
1559                        target: request.target,
1560                        watcher: request.watcher,
1561                    })
1562                    .await
1563                    .map_err(|e| RoutingError::new(format!("WatchManager tell: {e}")))?;
1564
1565                Ok(RoutingOutcome::Acknowledged)
1566            }
1567
1568            SYSTEM_MSG_TYPE_UNWATCH => {
1569                let request = dactor::proto::decode_unwatch_request(&envelope.body)
1570                    .map_err(|e| RoutingError::new(format!("decode UnwatchRequest: {e}")))?;
1571
1572                refs.watch_manager
1573                    .tell(crate::system_actors::RemoteUnwatch {
1574                        target: request.target,
1575                        watcher: request.watcher,
1576                    })
1577                    .await
1578                    .map_err(|e| RoutingError::new(format!("WatchManager tell: {e}")))?;
1579
1580                Ok(RoutingOutcome::Acknowledged)
1581            }
1582
1583            SYSTEM_MSG_TYPE_CANCEL => {
1584                let request = dactor::proto::decode_cancel_request(&envelope.body)
1585                    .map_err(|e| RoutingError::new(format!("decode CancelRequest: {e}")))?;
1586
1587                let request_id = request
1588                    .request_id
1589                    .ok_or_else(|| RoutingError::new("CancelRequest missing request_id"))?;
1590
1591                let outcome = refs
1592                    .cancel_manager
1593                    .ask(crate::system_actors::CancelById(request_id))
1594                    .await
1595                    .map_err(|e| RoutingError::new(format!("CancelManager ask: {e}")))?;
1596
1597                match outcome.0 {
1598                    CancelResponse::Acknowledged => Ok(RoutingOutcome::CancelAcknowledged),
1599                    CancelResponse::NotFound { reason } => {
1600                        Ok(RoutingOutcome::CancelNotFound { reason })
1601                    }
1602                }
1603            }
1604
1605            SYSTEM_MSG_TYPE_CONNECT_PEER => {
1606                let (peer_id, address) = dactor::proto::decode_connect_peer(&envelope.body)
1607                    .map_err(|e| RoutingError::new(format!("decode ConnectPeer: {e}")))?;
1608
1609                refs.node_directory
1610                    .tell(crate::system_actors::ConnectPeer {
1611                        peer_id,
1612                        address,
1613                    })
1614                    .await
1615                    .map_err(|e| RoutingError::new(format!("NodeDirectory tell: {e}")))?;
1616
1617                Ok(RoutingOutcome::Acknowledged)
1618            }
1619
1620            SYSTEM_MSG_TYPE_DISCONNECT_PEER => {
1621                let peer_id = dactor::proto::decode_disconnect_peer(&envelope.body)
1622                    .map_err(|e| RoutingError::new(format!("decode DisconnectPeer: {e}")))?;
1623
1624                refs.node_directory
1625                    .tell(crate::system_actors::DisconnectPeer(peer_id))
1626                    .await
1627                    .map_err(|e| RoutingError::new(format!("NodeDirectory tell: {e}")))?;
1628
1629                Ok(RoutingOutcome::Acknowledged)
1630            }
1631
1632            other => Err(RoutingError::new(format!(
1633                "unhandled system message type: {other}"
1634            ))),
1635        }
1636    }
1637}
1638