
dactor_ractor/runtime.rs

//! V0.2 ractor adapter runtime for the dactor actor framework.
//!
//! Bridges dactor's `Actor`/`Handler<M>`/`ActorRef<A>` API with ractor's
//! single-message-type `ractor::Actor` trait using type-erased dispatch.
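//!
//! A minimal end-to-end sketch (the `Counter` actor and the `Increment` /
//! `GetCount` messages are hypothetical placeholders, not part of this crate):
//!
//! ```rust,ignore
//! use dactor_ractor::runtime::RactorRuntime;
//!
//! let mut runtime = RactorRuntime::new();
//! runtime.start_system_actors().await;
//!
//! // Spawn a dactor actor; messages flow through ractor's mailbox
//! // as type-erased dispatch envelopes.
//! let counter = runtime.spawn::<Counter>("counter", ()).await?;
//! counter.tell(Increment)?;
//! let total = counter.ask(GetCount, None)?.await?;
//! ```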

use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use futures::FutureExt;
use tokio_util::sync::CancellationToken;

use dactor::actor::{
    Actor, ActorContext, ActorError, ActorRef, AskReply, ReduceHandler, Handler, ExpandHandler,
    TransformHandler,
};
use dactor::dead_letter::{DeadLetterEvent, DeadLetterHandler, DeadLetterReason};
use dactor::dispatch::{AskDispatch, Dispatch, ReduceDispatch, ExpandDispatch, TransformDispatch, TypedDispatch};
use dactor::errors::{ActorSendError, ErrorAction, RuntimeError};
use dactor::interceptor::{
    collect_handler_wrappers, apply_handler_wrappers,
    Disposition, DropObserver, InboundContext, InboundInterceptor, OutboundInterceptor, Outcome,
    SendMode,
};
use dactor::mailbox::MailboxConfig;
use dactor::message::{Headers, Message, RuntimeHeaders};
use dactor::node::{ActorId, NodeId};
use dactor::runtime_support::{
    spawn_reduce_batched_drain, spawn_reduce_drain, spawn_transform_drain,
    wrap_batched_stream_with_interception, wrap_stream_with_interception,
    BoundedMailboxSender, OutboundPipeline,
};
use dactor::stream::{
    BatchConfig, BatchReader, BatchWriter, BoxStream, StreamReceiver, StreamSender,
};
use dactor::supervision::ChildTerminated;
use dactor::system_actors::{
    CancelManager, CancelResponse, NodeDirectory, PeerStatus, SpawnManager, SpawnRequest,
    SpawnResponse, WatchManager, WatchNotification,
};
use dactor::type_registry::TypeRegistry;

use crate::cluster::RactorClusterEvents;
// ---------------------------------------------------------------------------
// Watch registry
// ---------------------------------------------------------------------------

/// A type-erased entry in the watch registry.
struct WatchEntry {
    watcher_id: ActorId,
    /// Closure that delivers a [`ChildTerminated`] to the watcher actor.
    notify: Box<dyn Fn(ChildTerminated) + Send + Sync>,
}

/// Shared watch registry mapping watched actor ID → list of watcher entries.
type WatcherMap = Arc<Mutex<HashMap<ActorId, Vec<WatchEntry>>>>;

// ---------------------------------------------------------------------------
// Ractor wrapper actor
// ---------------------------------------------------------------------------

/// The ractor message type wrapping a type-erased dactor dispatch envelope.
struct DactorMsg<A: Actor>(Box<dyn Dispatch<A>>);

/// The ractor `Actor` implementation that bridges to a dactor `Actor`.
struct RactorDactorActor<A: Actor> {
    _phantom: PhantomData<fn() -> A>,
}

/// State held by the ractor actor, containing the dactor `Actor` instance.
struct RactorActorState<A: Actor> {
    actor: A,
    ctx: ActorContext,
    interceptors: Vec<Box<dyn InboundInterceptor>>,
    watchers: WatcherMap,
    stop_reason: Option<String>,
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
    /// Notified when the actor stops (for await_stop).
    stop_notifier: Option<tokio::sync::oneshot::Sender<Result<(), String>>>,
}

/// Arguments passed to the ractor actor at spawn time.
struct RactorSpawnArgs<A: Actor> {
    args: A::Args,
    deps: A::Deps,
    actor_id: ActorId,
    actor_name: String,
    interceptors: Vec<Box<dyn InboundInterceptor>>,
    watchers: WatcherMap,
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
    stop_notifier: Option<tokio::sync::oneshot::Sender<Result<(), String>>>,
}

impl<A: Actor + 'static> ractor::Actor for RactorDactorActor<A> {
    type Msg = DactorMsg<A>;
    type State = RactorActorState<A>;
    type Arguments = RactorSpawnArgs<A>;

    async fn pre_start(
        &self,
        _myself: ractor::ActorRef<Self::Msg>,
        args: Self::Arguments,
    ) -> Result<Self::State, ractor::ActorProcessingErr> {
        let mut actor = A::create(args.args, args.deps);
        let mut ctx = ActorContext::new(args.actor_id, args.actor_name);
        actor.on_start(&mut ctx).await;
        Ok(RactorActorState {
            actor,
            ctx,
            interceptors: args.interceptors,
            watchers: args.watchers,
            stop_reason: None,
            dead_letter_handler: args.dead_letter_handler,
            stop_notifier: args.stop_notifier,
        })
    }

    async fn handle(
        &self,
        myself: ractor::ActorRef<Self::Msg>,
        message: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ractor::ActorProcessingErr> {
        let dispatch = message.0;

        // Capture metadata before dispatch consumes the message
        let send_mode = dispatch.send_mode();
        let message_type = dispatch.message_type_name();

        state.ctx.send_mode = Some(send_mode);
        state.ctx.headers = Headers::new();

        // Run inbound interceptor pipeline
        let runtime_headers = RuntimeHeaders::new();
        let mut headers = Headers::new();
        let mut total_delay = Duration::ZERO;
        let mut rejection: Option<(String, Disposition)> = None;

        {
            let ictx = InboundContext {
                actor_id: state.ctx.actor_id.clone(),
                actor_name: &state.ctx.actor_name,
                message_type,
                send_mode,
                remote: false,
                origin_node: None,
            };

            for interceptor in &state.interceptors {
                match interceptor.on_receive(
                    &ictx,
                    &runtime_headers,
                    &mut headers,
                    dispatch.message_any(),
                ) {
                    Disposition::Continue => {}
                    Disposition::Delay(d) => {
                        total_delay += d;
                    }
                    disp @ (Disposition::Drop | Disposition::Reject(_) | Disposition::Retry(_)) => {
                        rejection = Some((interceptor.name().to_string(), disp));
                        break;
                    }
                }
            }
        }

        // If rejected/dropped/retry, propagate proper error to caller
        if let Some((interceptor_name, disposition)) = rejection {
            if matches!(disposition, Disposition::Drop) {
                if let Some(ref handler) = *state.dead_letter_handler {
                    let event = DeadLetterEvent {
                        target_id: state.ctx.actor_id.clone(),
                        target_name: Some(state.ctx.actor_name.clone()),
                        message_type,
                        send_mode,
                        reason: DeadLetterReason::DroppedByInterceptor {
                            interceptor: interceptor_name.clone(),
                        },
                        message: None,
                    };
                    let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                        handler.on_dead_letter(event);
                    }));
                }
            }
            dispatch.reject(disposition, &interceptor_name);
            return Ok(());
        }

        if !total_delay.is_zero() {
            tokio::time::sleep(total_delay).await;
        }

        // Collect handler wrappers BEFORE moving headers into ctx,
        // so interceptors can read headers without borrow conflicts.
        let ictx_for_wrap = InboundContext {
            actor_id: state.ctx.actor_id.clone(),
            actor_name: &state.ctx.actor_name,
            message_type,
            send_mode,
            remote: false,
            origin_node: None,
        };
        let wrappers = collect_handler_wrappers(&state.interceptors, &ictx_for_wrap, &headers);
        let needs_wrap = wrappers.iter().any(|w| w.is_some());

        // Copy interceptor-populated headers to ActorContext so handler can access them
        state.ctx.headers = headers;

        // Propagate cancellation token to context
        let cancel_token = dispatch.cancel_token();
        state.ctx.set_cancellation_token(cancel_token.clone());

        // Check if already cancelled before dispatching
        if let Some(ref token) = cancel_token {
            if token.is_cancelled() {
                dispatch.cancel();
                state.ctx.set_cancellation_token(None);
                return Ok(());
            }
        }

        // Dispatch with panic catching and cancellation racing
        let result = if needs_wrap {
            let (result_tx, mut result_rx) = tokio::sync::oneshot::channel();

            let inner: std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> = Box::pin(async {
                let r = std::panic::AssertUnwindSafe(
                    dispatch.dispatch(&mut state.actor, &mut state.ctx),
                )
                .catch_unwind()
                .await;
                let _ = result_tx.send(r);
            });

            let wrapped = apply_handler_wrappers(wrappers, inner);

            let wrapped = std::panic::AssertUnwindSafe(wrapped);

            if let Some(ref token) = cancel_token {
                tokio::select! {
                    biased;
                    _ = wrapped.catch_unwind() => {},
                    _ = token.cancelled() => {
                        state.ctx.set_cancellation_token(None);
                        return Ok(());
                    }
                }
            } else {
                wrapped.catch_unwind().await.ok();
            }

            match result_rx.try_recv() {
                Ok(r) => r,
                Err(_) => Err(Box::new(
                    "interceptor wrap_handler did not await the handler future",
                ) as Box<dyn std::any::Any + Send>),
            }
        } else if let Some(ref token) = cancel_token {
            let dispatch_fut =
                std::panic::AssertUnwindSafe(dispatch.dispatch(&mut state.actor, &mut state.ctx))
                    .catch_unwind();
            tokio::select! {
                biased;
                r = dispatch_fut => r,
                _ = token.cancelled() => {
                    state.ctx.set_cancellation_token(None);
                    return Ok(());
                }
            }
        } else {
            std::panic::AssertUnwindSafe(dispatch.dispatch(&mut state.actor, &mut state.ctx))
                .catch_unwind()
                .await
        };

        state.ctx.set_cancellation_token(None);

        // Build context for on_complete
        let ictx = InboundContext {
            actor_id: state.ctx.actor_id.clone(),
            actor_name: &state.ctx.actor_name,
            message_type,
            send_mode,
            remote: false,
            origin_node: None,
        };

        match result {
            Ok(dispatch_result) => {
                let outcome = match (&dispatch_result.reply, send_mode) {
                    (Some(reply), SendMode::Ask) => Outcome::AskSuccess {
                        reply: reply.as_ref(),
                    },
                    _ => Outcome::TellSuccess,
                };

                for interceptor in &state.interceptors {
                    interceptor.on_complete(&ictx, &runtime_headers, &state.ctx.headers, &outcome);
                }

                // Send reply to caller AFTER interceptors have seen it
                dispatch_result.send_reply();
            }
            Err(_panic) => {
                let error = ActorError::internal("handler panicked");
                let action = state.actor.on_error(&error);

                let outcome = Outcome::HandlerError { error };
                for interceptor in &state.interceptors {
                    interceptor.on_complete(&ictx, &runtime_headers, &state.ctx.headers, &outcome);
                }

                match action {
                    ErrorAction::Resume => {
                        // Actor resumes — do NOT set stop_reason
                    }
                    ErrorAction::Stop | ErrorAction::Escalate => {
                        state.stop_reason = Some("handler panicked".into());
                        myself.stop(None);
                    }
                    ErrorAction::Restart => {
                        tracing::warn!("Restart not fully implemented, treating as Resume");
                    }
                }
            }
        }

        Ok(())
    }

    async fn post_stop(
        &self,
        _myself: ractor::ActorRef<Self::Msg>,
        state: &mut Self::State,
    ) -> Result<(), ractor::ActorProcessingErr> {
        // Reset context to lifecycle semantics before on_stop
        state.ctx.send_mode = None;
        state.ctx.headers = Headers::new();
        state.ctx.set_cancellation_token(None);

        // Run on_stop with panic catching so panics propagate as errors
        // through await_stop() instead of aborting the ractor task.
        let on_stop_panicked =
            std::panic::AssertUnwindSafe(state.actor.on_stop())
                .catch_unwind()
                .await
                .is_err();
        if on_stop_panicked && state.stop_reason.is_none() {
            state.stop_reason = Some("actor panicked in on_stop".to_string());
        }

        // Notify all watchers that this actor has terminated.
        let actor_id = state.ctx.actor_id.clone();
        let actor_name = state.ctx.actor_name.clone();
        let entries = {
            let mut watchers = state.watchers.lock().unwrap();
            // Remove entries where this actor is the TARGET (watchers of this actor)
            let target_entries = watchers.remove(&actor_id).unwrap_or_default();
            // Also clean up entries where this actor is the WATCHER (prevent leak)
            for entries in watchers.values_mut() {
                entries.retain(|e| e.watcher_id != actor_id);
            }
            watchers.retain(|_, v| !v.is_empty());
            target_entries
        };
        if !entries.is_empty() {
            let notification = ChildTerminated {
                child_id: actor_id,
                child_name: actor_name,
                reason: state.stop_reason.clone(),
            };
            for entry in &entries {
                (entry.notify)(notification.clone());
            }
        }

        // Notify await_stop() waiters — track on_stop panic independently
        // from stop_reason (which may already be set by a handler panic).
        if let Some(tx) = state.stop_notifier.take() {
            let result = if on_stop_panicked {
                Err("actor panicked in on_stop".to_string())
            } else {
                Ok(())
            };
            let _ = tx.send(result);
        }

        Ok(())
    }
}

// ---------------------------------------------------------------------------
// RactorActorRef — dactor ActorRef backed by ractor
// ---------------------------------------------------------------------------

/// A dactor `ActorRef` backed by a ractor `ActorRef`.
///
/// Messages are delivered through ractor's mailbox as type-erased dispatch
/// envelopes, enabling multiple `Handler<M>` impls per actor.
///
/// When configured with [`MailboxConfig::Bounded`], a bounded `mpsc` channel
/// sits in front of the ractor actor, providing backpressure control at the
/// dactor level while ractor's internal mailbox remains unbounded.
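///
/// # Example
///
/// A minimal usage sketch (the `Worker` actor and the `Job` / `Status`
/// messages are hypothetical placeholders):
///
/// ```rust,ignore
/// let worker: RactorActorRef<Worker> = runtime.spawn("worker", args).await?;
///
/// // Fire-and-forget: runs the outbound pipeline, then enqueues the envelope.
/// worker.tell(Job { id: 1 })?;
///
/// // Request/response: the reply arrives through a oneshot channel.
/// let status = worker.ask(Status, None)?.await?;
/// ```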
pub struct RactorActorRef<A: Actor> {
    id: ActorId,
    name: String,
    inner: ractor::ActorRef<DactorMsg<A>>,
    bounded_tx: Option<BoundedMailboxSender<DactorMsg<A>>>,
    outbound_interceptors: Arc<Vec<Box<dyn OutboundInterceptor>>>,
    drop_observer: Option<Arc<dyn DropObserver>>,
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
}

impl<A: Actor> Clone for RactorActorRef<A> {
    fn clone(&self) -> Self {
        Self {
            id: self.id.clone(),
            name: self.name.clone(),
            inner: self.inner.clone(),
            bounded_tx: self.bounded_tx.clone(),
            outbound_interceptors: self.outbound_interceptors.clone(),
            drop_observer: self.drop_observer.clone(),
            dead_letter_handler: self.dead_letter_handler.clone(),
        }
    }
}

impl<A: Actor> std::fmt::Debug for RactorActorRef<A> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "RactorActorRef({}, {:?})", self.name, self.id)
    }
}

impl<A: Actor + 'static> RactorActorRef<A> {
    fn outbound_pipeline(&self) -> OutboundPipeline {
        OutboundPipeline {
            interceptors: self.outbound_interceptors.clone(),
            drop_observer: self.drop_observer.clone(),
            target_id: self.id.clone(),
            target_name: self.name.clone(),
        }
    }

    fn notify_dead_letter(
        &self,
        message_type: &'static str,
        send_mode: SendMode,
        reason: DeadLetterReason,
    ) {
        if let Some(ref handler) = *self.dead_letter_handler {
            let event = DeadLetterEvent {
                target_id: self.id.clone(),
                target_name: Some(self.name.clone()),
                message_type,
                send_mode,
                reason,
                message: None,
            };
            let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                handler.on_dead_letter(event);
            }));
        }
    }

    /// Send a dispatch envelope through the bounded channel (if configured)
    /// or directly to the ractor actor.
    fn send_dispatch(&self, dispatch: Box<dyn Dispatch<A>>) -> Result<(), ActorSendError> {
        if let Some(ref btx) = self.bounded_tx {
            btx.try_send(DactorMsg(dispatch))
        } else {
            self.inner
                .cast(DactorMsg(dispatch))
                .map_err(|e| ActorSendError(e.to_string()))
        }
    }
}


impl<A: Actor + 'static> ActorRef<A> for RactorActorRef<A> {
    fn id(&self) -> ActorId {
        self.id.clone()
    }

    fn name(&self) -> String {
        self.name.clone()
    }

    fn is_alive(&self) -> bool {
        let inner_alive = matches!(
            self.inner.get_status(),
            ractor::ActorStatus::Running
                | ractor::ActorStatus::Starting
                | ractor::ActorStatus::Upgrading
        );
        if let Some(ref btx) = self.bounded_tx {
            !btx.is_closed() && inner_alive
        } else {
            inner_alive
        }
    }

    fn pending_messages(&self) -> usize {
        if let Some(ref btx) = self.bounded_tx {
            btx.pending()
        } else {
            0
        }
    }

    fn stop(&self) {
        self.inner.stop(None);
    }

    fn tell<M>(&self, msg: M) -> Result<(), ActorSendError>
    where
        A: Handler<M>,
        M: Message<Reply = ()>,
    {
        let pipeline = self.outbound_pipeline();
        let result = pipeline.run_on_send(SendMode::Tell, &msg);
        match result.disposition {
            Disposition::Continue => {}
            Disposition::Delay(_) => {} // Not supported in sync tell
            Disposition::Drop | Disposition::Reject(_) | Disposition::Retry(_) => return Ok(()),
        }

        let dispatch: Box<dyn Dispatch<A>> = Box::new(TypedDispatch { msg });
        self.send_dispatch(dispatch).map_err(|e| {
            let reason = if e.0.contains("full") {
                DeadLetterReason::MailboxFull
            } else {
                DeadLetterReason::ActorStopped
            };
            self.notify_dead_letter(std::any::type_name::<M>(), SendMode::Tell, reason);
            e
        })
    }

    fn ask<M>(
        &self,
        msg: M,
        cancel: Option<CancellationToken>,
    ) -> Result<AskReply<M::Reply>, ActorSendError>
    where
        A: Handler<M>,
        M: Message,
    {
        let pipeline = self.outbound_pipeline();
        let result = pipeline.run_on_send(SendMode::Ask, &msg);
        match result.disposition {
            Disposition::Continue => {}
            Disposition::Delay(_) => {} // Not supported in sync context
            Disposition::Drop => {
                let (tx, rx) = tokio::sync::oneshot::channel();
                let _ = tx.send(Err(RuntimeError::ActorNotFound(
                    "message dropped by outbound interceptor".into(),
                )));
                return Ok(AskReply::new(rx));
            }
            Disposition::Reject(reason) => {
                let (tx, rx) = tokio::sync::oneshot::channel();
                let _ = tx.send(Err(RuntimeError::Rejected {
                    interceptor: result.interceptor_name.to_string(),
                    reason,
                }));
                return Ok(AskReply::new(rx));
            }
            Disposition::Retry(retry_after) => {
                let (tx, rx) = tokio::sync::oneshot::channel();
                let _ = tx.send(Err(RuntimeError::RetryAfter {
                    interceptor: result.interceptor_name.to_string(),
                    retry_after,
                }));
                return Ok(AskReply::new(rx));
            }
        }

        let (tx, rx) = tokio::sync::oneshot::channel();
        let dispatch: Box<dyn Dispatch<A>> = Box::new(AskDispatch {
            msg,
            reply_tx: tx,
            cancel,
        });
        self.send_dispatch(dispatch).map_err(|e| {
            let reason = if e.0.contains("full") {
                DeadLetterReason::MailboxFull
            } else {
                DeadLetterReason::ActorStopped
            };
            self.notify_dead_letter(std::any::type_name::<M>(), SendMode::Ask, reason);
            e
        })?;
        Ok(AskReply::new(rx))
    }

    fn expand<M, OutputItem>(
        &self,
        msg: M,
        buffer: usize,
        batch_config: Option<BatchConfig>,
        cancel: Option<CancellationToken>,
    ) -> Result<BoxStream<OutputItem>, ActorSendError>
    where
        A: ExpandHandler<M, OutputItem>,
        M: Send + 'static,
        OutputItem: Send + 'static,
    {
        let pipeline = self.outbound_pipeline();
        let result = pipeline.run_on_send(SendMode::Expand, &msg);
        match result.disposition {
            Disposition::Continue => {}
            Disposition::Delay(_) => {}
            Disposition::Drop => {
                return Err(ActorSendError(
                    "stream dropped by outbound interceptor".into(),
                ));
            }
            Disposition::Reject(reason) => {
                return Err(ActorSendError(format!("stream rejected: {}", reason)));
            }
            Disposition::Retry(_) => {
                return Err(ActorSendError(
                    "stream retry requested by interceptor".into(),
                ));
            }
        }

        let buffer = buffer.max(1);
        let (tx, mut rx) = tokio::sync::mpsc::channel(buffer);
        let sender = StreamSender::new(tx);
        let dispatch: Box<dyn Dispatch<A>> = Box::new(ExpandDispatch {
            msg,
            sender,
            cancel,
        });
        self.send_dispatch(dispatch)?;

        match batch_config {
            Some(batch_config) => {
                let (batch_tx, batch_rx) = tokio::sync::mpsc::channel::<Vec<OutputItem>>(buffer);
                let reader = BatchReader::new(batch_rx);
                let batch_delay = batch_config.max_delay;
                tokio::spawn(async move {
                    let mut writer = BatchWriter::new(batch_tx, batch_config);
                    loop {
                        if writer.buffered_count() > 0 {
                            let deadline = tokio::time::Instant::now() + batch_delay;
                            tokio::select! {
                                biased;
                                item = rx.recv() => match item {
                                    Some(item) => {
                                        if writer.push(item).await.is_err() { break; }
                                    }
                                    None => break,
                                },
                                _ = tokio::time::sleep_until(deadline) => {
                                    if writer.check_deadline().await.is_err() { break; }
                                }
                            }
                        } else {
                            match rx.recv().await {
                                Some(item) => {
                                    if writer.push(item).await.is_err() {
                                        break;
                                    }
                                }
                                None => break,
                            }
                        }
                    }
                    let _ = writer.flush().await;
                });
                Ok(wrap_batched_stream_with_interception(
                    reader,
                    buffer,
                    pipeline,
                    std::any::type_name::<M>(),
                    SendMode::Expand,
                ))
            }
            None => Ok(wrap_stream_with_interception(
                rx,
                buffer,
                pipeline,
                std::any::type_name::<M>(),
                SendMode::Expand,
            )),
        }
    }

    fn reduce<InputItem, Reply>(
        &self,
        input: BoxStream<InputItem>,
        buffer: usize,
        batch_config: Option<BatchConfig>,
        cancel: Option<CancellationToken>,
    ) -> Result<AskReply<Reply>, ActorSendError>
    where
        A: ReduceHandler<InputItem, Reply>,
        InputItem: Send + 'static,
        Reply: Send + 'static,
    {
        let buffer = buffer.max(1);
        let (item_tx, item_rx) = tokio::sync::mpsc::channel(buffer);
        let receiver = StreamReceiver::new(item_rx);
        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        let dispatch: Box<dyn Dispatch<A>> = Box::new(ReduceDispatch {
            receiver,
            reply_tx,
            cancel: cancel.clone(),
        });
        self.send_dispatch(dispatch)?;

        let pipeline = self.outbound_pipeline();
        match batch_config {
            Some(batch_config) => {
                spawn_reduce_batched_drain(
                    input,
                    item_tx,
                    buffer,
                    batch_config,
                    cancel,
                    pipeline,
                    std::any::type_name::<InputItem>(),
                );
            }
            None => {
                spawn_reduce_drain(
                    input,
                    item_tx,
                    cancel,
                    pipeline,
                    std::any::type_name::<InputItem>(),
                );
            }
        }

        Ok(AskReply::new(reply_rx))
    }

    fn transform<InputItem, OutputItem>(
        &self,
        input: BoxStream<InputItem>,
        buffer: usize,
        batch_config: Option<BatchConfig>,
        cancel: Option<CancellationToken>,
    ) -> Result<BoxStream<OutputItem>, ActorSendError>
    where
        A: TransformHandler<InputItem, OutputItem>,
        InputItem: Send + 'static,
        OutputItem: Send + 'static,
    {
        let buffer = buffer.max(1);
        let (item_tx, item_rx) = tokio::sync::mpsc::channel(buffer);
        let (output_tx, mut output_rx) = tokio::sync::mpsc::channel(buffer);
        let receiver = StreamReceiver::new(item_rx);
        let sender = StreamSender::new(output_tx);
        let dispatch: Box<dyn Dispatch<A>> = Box::new(TransformDispatch::new(
            receiver,
            sender,
            cancel.clone(),
        ));
        self.send_dispatch(dispatch)?;

        let pipeline = self.outbound_pipeline();
        spawn_transform_drain(
            input,
            item_tx,
            cancel,
            pipeline.clone(),
            std::any::type_name::<InputItem>(),
        );

        match batch_config {
            Some(batch_config) => {
                let (batch_tx, batch_rx) =
                    tokio::sync::mpsc::channel::<Vec<OutputItem>>(buffer);
                let reader = BatchReader::new(batch_rx);
                let batch_delay = batch_config.max_delay;
                tokio::spawn(async move {
                    let mut writer = BatchWriter::new(batch_tx, batch_config);
                    loop {
                        if writer.buffered_count() > 0 {
                            let deadline = tokio::time::Instant::now() + batch_delay;
                            tokio::select! {
                                biased;
                                item = output_rx.recv() => match item {
                                    Some(item) => {
                                        if writer.push(item).await.is_err() { break; }
                                    }
                                    None => break,
                                },
                                _ = tokio::time::sleep_until(deadline) => {
                                    if writer.check_deadline().await.is_err() { break; }
                                }
                            }
                        } else {
                            match output_rx.recv().await {
                                Some(item) => {
                                    if writer.push(item).await.is_err() {
                                        break;
                                    }
                                }
                                None => break,
                            }
                        }
                    }
                    let _ = writer.flush().await;
                });
                Ok(wrap_batched_stream_with_interception(
                    reader,
                    buffer,
                    pipeline,
                    std::any::type_name::<OutputItem>(),
                    SendMode::Transform,
                ))
            }
            None => Ok(wrap_stream_with_interception(
                output_rx,
                buffer,
                pipeline,
                std::any::type_name::<OutputItem>(),
                SendMode::Transform,
            )),
        }
    }
}

// ---------------------------------------------------------------------------
// SpawnOptions
// ---------------------------------------------------------------------------

/// Options for spawning an actor, including the inbound interceptor pipeline
/// and mailbox configuration. Use `..Default::default()` to future-proof
/// against new fields.
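///
/// # Example
///
/// A construction sketch (`MetricsInterceptor` and `MyActor` are hypothetical):
///
/// ```rust,ignore
/// let options = SpawnOptions {
///     interceptors: vec![Box::new(MetricsInterceptor::default())],
///     ..Default::default()
/// };
/// let worker = runtime
///     .spawn_with_options::<MyActor>("worker", args, options)
///     .await?;
/// ```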
pub struct SpawnOptions {
    /// Inbound interceptors to attach to the actor.
    pub interceptors: Vec<Box<dyn InboundInterceptor>>,
    /// Mailbox capacity configuration.
    ///
    /// When [`Bounded`](MailboxConfig::Bounded), a bounded `mpsc` channel is
    /// placed in front of the ractor actor to enforce backpressure.
    pub mailbox: MailboxConfig,
}

impl Default for SpawnOptions {
    fn default() -> Self {
        Self {
            interceptors: Vec::new(),
            mailbox: MailboxConfig::Unbounded,
        }
    }
}

// ---------------------------------------------------------------------------
// RactorRuntime
// ---------------------------------------------------------------------------

/// A dactor v0.2 runtime backed by ractor.
///
/// Actors are spawned as real ractor actors via [`ractor::Actor::spawn`].
/// Messages are delivered through ractor's mailbox as type-erased dispatch
/// envelopes, supporting multiple `Handler<M>` impls per actor.
///
/// ## System Actors
///
/// The runtime spawns native ractor system actors once
/// [`start_system_actors()`](Self::start_system_actors) is called:
/// - [`SpawnManagerActor`](crate::system_actors::SpawnManagerActor) — handles remote spawn requests
/// - [`WatchManagerActor`](crate::system_actors::WatchManagerActor) — handles remote watch/unwatch
/// - [`CancelManagerActor`](crate::system_actors::CancelManagerActor) — handles remote cancellation
/// - [`NodeDirectoryActor`](crate::system_actors::NodeDirectoryActor) — tracks peer connections
///
/// System actors are accessible via `system_actor_refs()` for transport routing.
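///
/// # Example
///
/// A startup sketch:
///
/// ```rust,ignore
/// let mut runtime = RactorRuntime::with_node_id(NodeId("node-a".into()));
/// runtime.start_system_actors().await;
/// let refs = runtime.system_actor_refs().expect("system actors started");
/// ```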
pub struct RactorRuntime {
    node_id: NodeId,
    next_local: Arc<AtomicU64>,
    cluster_events: RactorClusterEvents,
    outbound_interceptors: Arc<Vec<Box<dyn OutboundInterceptor>>>,
    drop_observer: Option<Arc<dyn DropObserver>>,
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
    watchers: WatcherMap,
    /// Plain struct system actors (backward-compatible sync API).
    spawn_manager: SpawnManager,
    watch_manager: WatchManager,
    cancel_manager: CancelManager,
    node_directory: NodeDirectory,
    /// Native ractor system actor refs (lazily started via `start_system_actors()`).
    system_actors: Option<RactorSystemActorRefs>,
    /// Stop notification receivers for await_stop(), keyed by ActorId.
    #[allow(clippy::type_complexity)]
    stop_receivers: Arc<Mutex<HashMap<ActorId, tokio::sync::oneshot::Receiver<Result<(), String>>>>>,
    /// Application version for this node (informational, used in handshake).
    app_version: Option<String>,
}

/// References to the native ractor system actors spawned by the runtime.
pub struct RactorSystemActorRefs {
    pub spawn_manager: SystemActorHandle<crate::system_actors::SpawnManagerMsg>,
    pub watch_manager: ractor::ActorRef<crate::system_actors::WatchManagerMsg>,
    pub cancel_manager: ractor::ActorRef<crate::system_actors::CancelManagerMsg>,
    pub node_directory: ractor::ActorRef<crate::system_actors::NodeDirectoryMsg>,
}

/// Handle to a system actor that may be a single actor or a pool.
pub enum SystemActorHandle<M: ractor::Message> {
    /// Single actor instance.
    Single(ractor::ActorRef<M>),
    /// Pool of actors with round-robin routing.
    Pool(SystemActorPool<M>),
}

impl<M: ractor::Message> SystemActorHandle<M> {
    /// Send a message to the system actor (or a pool member via round-robin).
    pub fn cast(&self, msg: M) -> Result<(), ractor::MessagingErr<M>> {
        match self {
            SystemActorHandle::Single(r) => r.cast(msg),
            SystemActorHandle::Pool(pool) => pool.cast(msg),
        }
    }
}

/// Round-robin pool of ractor actors.
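///
/// # Example
///
/// A routing sketch (`worker_a`/`worker_b` and the messages are hypothetical;
/// the counter starts at zero, so delivery begins with the first worker):
///
/// ```rust,ignore
/// let pool = SystemActorPool::new(vec![worker_a, worker_b]);
/// pool.cast(msg_1)?; // delivered to worker_a
/// pool.cast(msg_2)?; // delivered to worker_b, then wraps around
/// ```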
pub struct SystemActorPool<M: ractor::Message> {
    workers: Vec<ractor::ActorRef<M>>,
    counter: std::sync::atomic::AtomicU64,
}

impl<M: ractor::Message> SystemActorPool<M> {
    /// Create a new pool from a vec of actor refs.
    pub fn new(workers: Vec<ractor::ActorRef<M>>) -> Self {
        assert!(!workers.is_empty(), "pool must have at least one worker");
        Self {
            workers,
            counter: std::sync::atomic::AtomicU64::new(0),
        }
    }

    /// Send a message to the next worker via round-robin.
    pub fn cast(&self, msg: M) -> Result<(), ractor::MessagingErr<M>> {
        let idx = self.counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
            as usize % self.workers.len();
        self.workers[idx].cast(msg)
    }

    /// Number of workers in the pool.
    pub fn len(&self) -> usize {
        self.workers.len()
    }

    /// Access all worker refs (for broadcasting to all workers).
    pub fn workers(&self) -> &[ractor::ActorRef<M>] {
        &self.workers
    }
}

impl RactorRuntime {
    /// Create a new `RactorRuntime`.
    ///
    /// System actors are not spawned until `start_system_actors()` is called.
    /// This allows the runtime to be constructed outside a tokio context.
    pub fn new() -> Self {
        Self::create(NodeId("ractor-node".into()))
    }

    /// Create a new `RactorRuntime` with a specific node ID.
    pub fn with_node_id(node_id: NodeId) -> Self {
        Self::create(node_id)
    }

    fn create(node_id: NodeId) -> Self {
        Self {
            node_id,
            next_local: Arc::new(AtomicU64::new(1)),
            cluster_events: RactorClusterEvents::new(),
            outbound_interceptors: Arc::new(Vec::new()),
            drop_observer: None,
            dead_letter_handler: Arc::new(None),
            watchers: Arc::new(Mutex::new(HashMap::new())),
            spawn_manager: SpawnManager::new(TypeRegistry::new()),
            watch_manager: WatchManager::new(),
            cancel_manager: CancelManager::new(),
            node_directory: NodeDirectory::new(),
            system_actors: None,
            stop_receivers: Arc::new(Mutex::new(HashMap::new())),
            app_version: None,
        }
    }

    /// The adapter name for this runtime, used in version handshakes.
    pub const ADAPTER_NAME: &'static str = "ractor";

    /// Set the application version for this node.
    ///
    /// This is your application's release version (e.g., "2.3.1"), not the
    /// dactor framework version. It is included in handshake requests for
    /// operational visibility during rolling upgrades.
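    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let runtime = RactorRuntime::new().with_app_version("2.3.1");
    /// assert_eq!(runtime.app_version(), Some("2.3.1"));
    /// ```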
    pub fn with_app_version(mut self, version: impl Into<String>) -> Self {
        self.app_version = Some(version.into());
        self
    }

    /// Returns the configured application version, if any.
    pub fn app_version(&self) -> Option<&str> {
        self.app_version.as_deref()
    }

    /// Build a [`HandshakeRequest`](dactor::HandshakeRequest) from this
    /// runtime's current configuration.
    pub fn handshake_request(&self) -> dactor::HandshakeRequest {
        dactor::HandshakeRequest::from_runtime(
            self.node_id.clone(),
            self.app_version.clone(),
            Self::ADAPTER_NAME,
        )
    }

    /// Spawn native ractor system actors for transport routing.
    ///
    /// Must be called from within a tokio runtime context. After this call,
    /// `system_actor_refs()` returns the native actor references.
    ///
    /// Factory registrations made via `register_factory()` before this call
    /// are forwarded to the native SpawnManagerActor.
    ///
    /// Uses default configuration (unbounded mailboxes, no pooling).
    /// For custom configuration, use [`start_system_actors_with_config()`].
    pub async fn start_system_actors(&mut self) {
        self.start_system_actors_with_config(dactor::SystemActorConfig::default()).await;
    }

    /// Spawn native ractor system actors with custom configuration.
    ///
    /// Allows configuring mailbox capacity and SpawnManager pooling for
    /// high-throughput scenarios. See [`SystemActorConfig`](dactor::SystemActorConfig)
    /// for details.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use dactor::system_actors::SystemActorConfig;
    /// use dactor::mailbox::{MailboxConfig, OverflowStrategy};
    ///
    /// let config = SystemActorConfig::default()
    ///     .with_spawn_manager_mailbox(
    ///         MailboxConfig::bounded(10_000, OverflowStrategy::Block)
    ///     )
    ///     .with_spawn_manager_pool_size(4)
    ///     .with_control_plane_mailbox(
    ///         MailboxConfig::bounded(5_000, OverflowStrategy::Block)
    ///     );
    ///
    /// runtime.start_system_actors_with_config(config).await;
    /// ```
    pub async fn start_system_actors_with_config(
        &mut self,
        config: dactor::SystemActorConfig,
    ) {
        use crate::system_actors::*;

        // --- SpawnManager (optionally pooled) ---
        let pool_size = config.spawn_manager_pool_size.unwrap_or(1).max(1);
        let mut spawn_refs = Vec::with_capacity(pool_size);

        for _ in 0..pool_size {
            let (spawn_ref, _) = ractor::Actor::spawn(
                None, SpawnManagerActor,
                (self.node_id.clone(), TypeRegistry::new(), self.next_local.clone()),
            ).await.expect("failed to spawn SpawnManagerActor");
            spawn_refs.push(spawn_ref);
        }

        let spawn_manager = if spawn_refs.len() == 1 {
            SystemActorHandle::Single(spawn_refs.pop().unwrap())
        } else {
            SystemActorHandle::Pool(SystemActorPool::new(spawn_refs))
        };

        // --- Control-plane actors (never pooled) ---
        let (watch_ref, _) = ractor::Actor::spawn(
            None, WatchManagerActor, (),
        ).await.expect("failed to spawn WatchManagerActor");

        let (cancel_ref, _) = ractor::Actor::spawn(
            None, CancelManagerActor, (),
        ).await.expect("failed to spawn CancelManagerActor");

        let (node_dir_ref, _) = ractor::Actor::spawn(
            None, NodeDirectoryActor, (),
        ).await.expect("failed to spawn NodeDirectoryActor");

        self.system_actors = Some(RactorSystemActorRefs {
            spawn_manager,
            watch_manager: watch_ref,
            cancel_manager: cancel_ref,
            node_directory: node_dir_ref,
        });
    }

    /// Returns the node ID of this runtime.
    pub fn node_id(&self) -> &NodeId {
        &self.node_id
    }

    /// Access the native system actor references for transport routing.
    ///
    /// Returns `None` if `start_system_actors()` has not been called yet.
    pub fn system_actor_refs(&self) -> Option<&RactorSystemActorRefs> {
        self.system_actors.as_ref()
    }

    /// Add a global outbound interceptor.
    ///
    /// **Must be called before any actors are spawned.** Panics if actors
    /// already hold references to the interceptor list (i.e., after `spawn()`).
    pub fn add_outbound_interceptor(&mut self, interceptor: Box<dyn OutboundInterceptor>) {
        Arc::get_mut(&mut self.outbound_interceptors)
            .expect("cannot add interceptors after actors are spawned")
            .push(interceptor);
    }

    /// Set a global drop observer that is notified whenever an outbound
    /// interceptor returns `Disposition::Drop`.
    pub fn set_drop_observer(&mut self, observer: Arc<dyn DropObserver>) {
        self.drop_observer = Some(observer);
    }

    /// Set a global dead letter handler. Called whenever a message cannot be
    /// delivered (actor stopped, mailbox full, dropped by inbound interceptor).
    pub fn set_dead_letter_handler(&mut self, handler: Arc<dyn DeadLetterHandler>) {
        self.dead_letter_handler = Arc::new(Some(handler));
    }

    /// Access the cluster events subsystem (alias of
    /// [`cluster_events`](Self::cluster_events)).
    pub fn cluster_events_handle(&self) -> &RactorClusterEvents {
        &self.cluster_events
    }

    /// Access the cluster events subsystem.
    pub fn cluster_events(&self) -> &RactorClusterEvents {
        &self.cluster_events
    }

    /// Spawn an actor with `Deps = ()`.
    pub async fn spawn<A>(&self, name: &str, args: A::Args) -> Result<RactorActorRef<A>, dactor::errors::RuntimeError>
    where
        A: Actor<Deps = ()> + 'static,
    {
        self.spawn_internal::<A>(name, args, (), Vec::new(), MailboxConfig::Unbounded).await
    }

    /// Spawn an actor with explicit dependencies.
    pub async fn spawn_with_deps<A>(&self, name: &str, args: A::Args, deps: A::Deps) -> Result<RactorActorRef<A>, dactor::errors::RuntimeError>
    where
        A: Actor + 'static,
    {
        self.spawn_internal::<A>(name, args, deps, Vec::new(), MailboxConfig::Unbounded).await
    }

    /// Spawn an actor with spawn options (including inbound interceptors and mailbox config).
    pub async fn spawn_with_options<A>(
        &self,
        name: &str,
        args: A::Args,
        options: SpawnOptions,
    ) -> Result<RactorActorRef<A>, dactor::errors::RuntimeError>
    where
        A: Actor<Deps = ()> + 'static,
    {
        self.spawn_internal::<A>(name, args, (), options.interceptors, options.mailbox).await
    }

    async fn spawn_internal<A>(
        &self,
        name: &str,
        args: A::Args,
        deps: A::Deps,
        interceptors: Vec<Box<dyn InboundInterceptor>>,
        mailbox: MailboxConfig,
    ) -> Result<RactorActorRef<A>, dactor::errors::RuntimeError>
    where
        A: Actor + 'static,
    {
        let local = self.next_local.fetch_add(1, Ordering::SeqCst);
        let actor_id = ActorId {
            node: self.node_id.clone(),
            local,
        };
        let actor_name = name.to_string();

        let (stop_tx, stop_rx) = tokio::sync::oneshot::channel();

        let wrapper = RactorDactorActor::<A> {
            _phantom: PhantomData,
        };
        let spawn_args = RactorSpawnArgs {
            args,
            deps,
            actor_id: actor_id.clone(),
            actor_name: actor_name.clone(),
            interceptors,
            watchers: self.watchers.clone(),
            dead_letter_handler: self.dead_letter_handler.clone(),
            stop_notifier: Some(stop_tx),
        };

        let (actor_ref, _join_handle) = ractor::Actor::spawn(Some(name.to_string()), wrapper, spawn_args)
            .await
            .map_err(|e| dactor::errors::RuntimeError::SpawnFailed(e.to_string()))?;

        // Set up optional bounded mailbox channel
        let bounded_tx = match mailbox {
            MailboxConfig::Bounded { capacity, overflow } => {
                let (btx, mut brx) = tokio::sync::mpsc::channel::<DactorMsg<A>>(capacity);
                let fwd_ref = actor_ref.clone();
                tokio::spawn(async move {
                    while let Some(msg) = brx.recv().await {
                        if fwd_ref.cast(msg).is_err() {
                            break;
                        }
                    }
                });
                Some(BoundedMailboxSender::new(btx, overflow))
            }
            MailboxConfig::Unbounded => None,
        };

        // Store stop receiver for await_stop()
        self.stop_receivers.lock().unwrap().insert(actor_id.clone(), stop_rx);

        Ok(RactorActorRef {
            id: actor_id,
            name: actor_name,
            inner: actor_ref,
            bounded_tx,
            outbound_interceptors: self.outbound_interceptors.clone(),
            drop_observer: self.drop_observer.clone(),
            dead_letter_handler: self.dead_letter_handler.clone(),
        })
    }

    /// Register actor `watcher` to be notified when `target_id` terminates.
    ///
    /// The watcher must implement `Handler<ChildTerminated>`. When the target
    /// stops, the runtime delivers a [`ChildTerminated`] message to the watcher.
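    ///
    /// # Example
    ///
    /// A supervision sketch (`Supervisor` and `Worker` are hypothetical
    /// actors; `Supervisor` implements `Handler<ChildTerminated>`):
    ///
    /// ```rust,ignore
    /// let supervisor = runtime.spawn::<Supervisor>("supervisor", ()).await?;
    /// let child = runtime.spawn::<Worker>("worker", ()).await?;
    ///
    /// runtime.watch(&supervisor, child.id());
    /// child.stop(); // supervisor receives a ChildTerminated message
    /// ```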
    pub fn watch<W>(&self, watcher: &RactorActorRef<W>, target_id: ActorId)
    where
        W: Actor + Handler<ChildTerminated> + 'static,
    {
        let watcher_id = watcher.id();
        let watcher_inner = watcher.inner.clone();

        let entry = WatchEntry {
            watcher_id,
            notify: Box::new(move |msg: ChildTerminated| {
                let dispatch: Box<dyn Dispatch<W>> = Box::new(TypedDispatch { msg });
                if watcher_inner.cast(DactorMsg(dispatch)).is_err() {
                    tracing::debug!("watch notification dropped — watcher may have stopped");
                }
            }),
        };

        let mut watchers = self.watchers.lock().unwrap();
        watchers.entry(target_id).or_default().push(entry);
    }

    /// Unregister `watcher_id` from notifications about `target_id`.
    pub fn unwatch(&self, watcher_id: &ActorId, target_id: &ActorId) {
        let mut watchers = self.watchers.lock().unwrap();
        if let Some(entries) = watchers.get_mut(target_id) {
            entries.retain(|e| &e.watcher_id != watcher_id);
            if entries.is_empty() {
                watchers.remove(target_id);
            }
        }
    }

    // -----------------------------------------------------------------------
    // SA1: SpawnManager wiring
    // -----------------------------------------------------------------------

    /// Access the spawn manager.
    pub fn spawn_manager(&self) -> &SpawnManager {
        &self.spawn_manager
    }

    /// Access the spawn manager mutably (for registering actor factories).
    pub fn spawn_manager_mut(&mut self) -> &mut SpawnManager {
        &mut self.spawn_manager
    }

    /// Register an actor type for remote spawning on this node.
    ///
    /// Registers the factory in both the struct-based SpawnManager and the
    /// native SpawnManagerActor (if started via `start_system_actors()`).
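    ///
    /// # Example
    ///
    /// A registration sketch (`WorkerArgs` and `deserialize_args` are
    /// hypothetical; the conversion into `SerializationError` is elided):
    ///
    /// ```rust,ignore
    /// runtime.register_factory("my_app::Worker", |bytes| {
    ///     let args: WorkerArgs = deserialize_args(bytes)?;
    ///     Ok(Box::new(args) as Box<dyn std::any::Any + Send>)
    /// });
    /// ```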
    pub fn register_factory(
        &mut self,
        type_name: impl Into<String>,
        factory: impl Fn(&[u8]) -> Result<Box<dyn std::any::Any + Send>, dactor::remote::SerializationError>
            + Send
            + Sync
            + 'static,
    ) {
        let type_name = type_name.into();
        let factory = Arc::new(factory);

        // Register in struct-based manager
        self.spawn_manager
            .type_registry_mut()
            .register_factory(type_name.clone(), {
                let f = factory.clone();
                move |bytes: &[u8]| f(bytes)
            });

        // Forward to native actor(s) if started
        if let Some(ref actors) = self.system_actors {
            match &actors.spawn_manager {
                SystemActorHandle::Single(r) => {
                    let (tx, _rx) = tokio::sync::oneshot::channel();
                    let f = factory;
                    let _ = r.cast(
                        crate::system_actors::SpawnManagerMsg::RegisterFactory {
                            type_name,
                            factory: Box::new(move |bytes: &[u8]| f(bytes)),
                            reply: tx,
                        },
                    );
                }
                SystemActorHandle::Pool(pool) => {
                    // Broadcast factory registration to all pool workers
                    for worker in pool.workers() {
                        let f = factory.clone();
                        let (tx, _rx) = tokio::sync::oneshot::channel();
                        let _ = worker.cast(
                            crate::system_actors::SpawnManagerMsg::RegisterFactory {
                                type_name: type_name.clone(),
                                factory: Box::new(move |bytes: &[u8]| f(bytes)),
                                reply: tx,
                            },
                        );
                    }
                }
            }
        }
    }

    /// Process a remote spawn request.
    ///
    /// Looks up the actor type in the registry, deserializes Args from bytes,
    /// and returns the constructed actor along with its assigned [`ActorId`].
    ///
    /// The returned `ActorId` is pre-assigned for the remote spawn flow where
    /// the runtime controls ID assignment. The caller must use this ID when
    /// registering the spawned actor (not via the regular `spawn()` path,
    /// which assigns its own IDs).
    ///
    /// **Note:** Currently uses the simple factory API (`TypeRegistry::create_actor`).
    /// For actors with non-trivial `Deps`, use `spawn_manager_mut()` to access
    /// `TypeRegistry::create_actor_with_deps()` directly.
    ///
    /// Returns `Ok((actor_id, actor))` on success, or `Err(SpawnResponse::Failure)`
    /// if the type is not found or deserialization fails.
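    ///
    /// # Example
    ///
    /// An integration-layer sketch (`send_response` is a hypothetical
    /// transport callback):
    ///
    /// ```rust,ignore
    /// match runtime.handle_spawn_request(&request) {
    ///     Ok((actor_id, actor)) => {
    ///         // Register `actor` under the pre-assigned `actor_id`.
    ///     }
    ///     Err(failure) => send_response(failure),
    /// }
    /// ```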
    pub fn handle_spawn_request(
        &mut self,
        request: &SpawnRequest,
    ) -> Result<(ActorId, Box<dyn std::any::Any + Send>), SpawnResponse> {
        match self.spawn_manager.create_actor(request) {
            Ok(actor) => {
                let local = self.next_local.fetch_add(1, Ordering::SeqCst);
                let actor_id = ActorId {
                    node: self.node_id.clone(),
                    local,
                };
                self.spawn_manager.record_spawn(actor_id.clone());
                Ok((actor_id, actor))
            }
            Err(e) => Err(SpawnResponse::Failure {
                request_id: request.request_id.clone(),
                error: e.to_string(),
            }),
        }
    }
1391
1392    // -----------------------------------------------------------------------
1393    // SA2: WatchManager wiring
1394    // -----------------------------------------------------------------------
1395
1396    /// Access the watch manager (for remote watch subscriptions).
1397    pub fn watch_manager(&self) -> &WatchManager {
1398        &self.watch_manager
1399    }
1400
1401    /// Access the watch manager mutably.
1402    pub fn watch_manager_mut(&mut self) -> &mut WatchManager {
1403        &mut self.watch_manager
1404    }
1405
1406    /// Register a remote watch: a remote watcher wants to know when a local
1407    /// actor terminates.
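    ///
    /// ```ignore
    /// // Sketch: a WatchRequest decoded from the wire maps directly onto this call.
    /// runtime.remote_watch(request.target, request.watcher);
    /// ```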
1408    pub fn remote_watch(&mut self, target: ActorId, watcher: ActorId) {
1409        self.watch_manager.watch(target, watcher);
1410    }
1411
1412    /// Remove a remote watch subscription.
1413    pub fn remote_unwatch(&mut self, target: &ActorId, watcher: &ActorId) {
1414        self.watch_manager.unwatch(target, watcher);
1415    }
1416
1417    /// Called when a local actor terminates. Returns notifications for all
1418    /// remote watchers that should be sent to their respective nodes.
1419    ///
1420    /// **Note:** This must be called explicitly by the integration layer.
1421    /// It is not yet automatically wired into ractor's actor stop lifecycle.
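    ///
    /// A sketch of the expected wiring (the `transport` send is hypothetical,
    /// standing in for whatever wire layer the integration uses):
    ///
    /// ```ignore
    /// for notification in runtime.notify_terminated(&stopped_id) {
    ///     // Each notification targets a remote watcher on another node.
    ///     transport.send_watch_notification(notification);
    /// }
    /// ```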
1422    pub fn notify_terminated(&mut self, terminated: &ActorId) -> Vec<WatchNotification> {
1423        self.watch_manager.on_terminated(terminated)
1424    }
1425
1426    // -----------------------------------------------------------------------
1427    // SA3: CancelManager wiring
1428    // -----------------------------------------------------------------------
1429
1430    /// Access the cancel manager.
1431    pub fn cancel_manager(&self) -> &CancelManager {
1432        &self.cancel_manager
1433    }
1434
1435    /// Access the cancel manager mutably.
1436    pub fn cancel_manager_mut(&mut self) -> &mut CancelManager {
1437        &mut self.cancel_manager
1438    }
1439
1440    /// Register a cancellation token for a request (for remote cancel support).
1441    pub fn register_cancel(&mut self, request_id: String, token: CancellationToken) {
1442        self.cancel_manager.register(request_id, token);
1443    }
1444
1445    /// Process a remote cancellation request.
1446    pub fn cancel_request(&mut self, request_id: &str) -> CancelResponse {
1447        self.cancel_manager.cancel(request_id)
1448    }
1449
1450    /// Clean up a cancellation token after its request completes normally.
1451    ///
1452    /// Should be called when a remote ask/stream/feed completes successfully
1453    /// to prevent stale tokens from accumulating.
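    ///
    /// A sketch of the full token lifecycle for one remote request:
    ///
    /// ```ignore
    /// let token = CancellationToken::new();
    /// runtime.register_cancel(request_id.clone(), token.clone());
    /// // ... run the work, checking `token.is_cancelled()` along the way ...
    /// runtime.complete_request(&request_id); // on normal completion
    /// ```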
1454    pub fn complete_request(&mut self, request_id: &str) {
1455        self.cancel_manager.remove(request_id);
1456    }
1457
1458    // -----------------------------------------------------------------------
1459    // SA4: NodeDirectory wiring
1460    // -----------------------------------------------------------------------
1461
1462    /// Access the node directory.
1463    pub fn node_directory(&self) -> &NodeDirectory {
1464        &self.node_directory
1465    }
1466
1467    /// Access the node directory mutably.
1468    pub fn node_directory_mut(&mut self) -> &mut NodeDirectory {
1469        &mut self.node_directory
1470    }
1471
1472    /// Register a peer node as connected in the directory.
1473    ///
1474    /// **Post-validation only:** this method assumes the peer has already
1475    /// passed the version handshake. Callers should validate compatibility
1476    /// (via [`handshake_request`](Self::handshake_request) +
1477    /// [`validate_handshake`](dactor::validate_handshake) +
1478    /// [`verify_peer_identity`](dactor::verify_peer_identity)) before calling
1479    /// this method. Direct calls bypass compatibility checks.
1480    ///
1481    /// If the peer already exists, updates its status to `Connected` and
1482    /// preserves the existing address when `address` is `None`.
1483    /// Emits a `ClusterEvent::NodeJoined` if the peer was not previously connected.
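    ///
    /// A minimal sketch (assumes `peer` is a `NodeId` obtained from an
    /// already-validated handshake):
    ///
    /// ```ignore
    /// runtime.connect_peer(peer.clone(), Some("10.0.0.2:9000".into()));
    /// assert!(runtime.is_peer_connected(&peer));
    /// runtime.disconnect_peer(&peer); // emits ClusterEvent::NodeLeft
    /// ```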
1484    pub fn connect_peer(&mut self, peer_id: NodeId, address: Option<String>) {
1485        let was_connected = self.node_directory.is_connected(&peer_id);
1486        if let Some(existing) = self.node_directory.get_peer(&peer_id) {
1487            // Preserve existing address if new address is None
1488            let resolved_address = address.or_else(|| existing.address.clone());
1489            self.node_directory.remove_peer(&peer_id);
1490            self.node_directory.add_peer(peer_id.clone(), resolved_address);
1491        } else {
1492            self.node_directory.add_peer(peer_id.clone(), address);
1493        }
1494        self.node_directory.set_status(&peer_id, PeerStatus::Connected);
1495        if !was_connected {
1496            self.cluster_events.emit(dactor::ClusterEvent::NodeJoined(peer_id));
1497        }
1498    }
1499
1500    /// Mark a peer as disconnected.
1501    ///
1502    /// Emits a `ClusterEvent::NodeLeft` if the peer was previously connected.
1503    pub fn disconnect_peer(&mut self, peer_id: &NodeId) {
1504        let was_connected = self.node_directory.is_connected(peer_id);
1505        self.node_directory.set_status(peer_id, PeerStatus::Disconnected);
1506        if was_connected {
1507            self.cluster_events.emit(dactor::ClusterEvent::NodeLeft(peer_id.clone()));
1508        }
1509    }
1510
1511    /// Check if a peer node is connected.
1512    pub fn is_peer_connected(&self, peer_id: &NodeId) -> bool {
1513        self.node_directory.is_connected(peer_id)
1514    }
1515
1516    // -----------------------------------------------------------------------
1517    // JH1-JH2: Actor lifecycle handles
1518    // -----------------------------------------------------------------------
1519
1520    /// Wait for an actor to stop.
1521    ///
1522    /// Returns `Ok(())` when the actor finishes cleanly, or `Err` if the
1523    /// actor panicked in `on_stop`. The stop receiver is consumed and removed
1524    /// from the map.
1525    ///
1526    /// Returns `Ok(())` immediately if no stop receiver is stored for this ID
1527    /// (e.g., actor was already awaited or was not spawned by this runtime).
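    ///
    /// ```ignore
    /// // Sketch: `actor_id` is assumed to come from this runtime's spawn path.
    /// if let Err(reason) = runtime.await_stop(&actor_id).await {
    ///     eprintln!("actor stopped with error: {reason}");
    /// }
    /// ```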
1528    pub async fn await_stop(&self, actor_id: &ActorId) -> Result<(), String> {
1529        let rx = {
1530            let mut receivers = self.stop_receivers.lock().unwrap();
1531            receivers.remove(actor_id)
1532        };
1533        match rx {
1534            Some(rx) => rx
1535                .await
1536                .map_err(|_| "stop notifier dropped".to_string())
1537                .and_then(|r| r),
1538            None => Ok(()),
1539        }
1540    }
1541
1542    /// Wait for all spawned actors to stop.
1543    ///
1544    /// Drains all stored stop receivers and awaits them all. Returns the first
1545    /// error encountered, but always waits for every actor to finish.
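    ///
    /// ```ignore
    /// // Sketch: graceful shutdown, blocking until every spawned actor stops.
    /// runtime.await_all().await?;
    /// ```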
1546    pub async fn await_all(&self) -> Result<(), String> {
1547        let receivers: Vec<_> = {
1548            let mut map = self.stop_receivers.lock().unwrap();
1549            map.drain().collect()
1550        };
1551        let mut first_error = None;
1552        for (_, rx) in receivers {
1553            let result = rx.await.map_err(|e| format!("stop notifier dropped: {e}")).and_then(|r| r);
1554            if let Err(e) = result {
1555                if first_error.is_none() {
1556                    first_error = Some(e);
1557                }
1558            }
1559        }
1560        match first_error {
1561            Some(e) => Err(e),
1562            None => Ok(()),
1563        }
1564    }
1565
1566    /// Remove completed stop receivers from the map.
1567    ///
1568    /// Call periodically to prevent stale entries from accumulating
1569    /// for actors that stopped without being awaited.
1570    pub fn cleanup_finished(&self) {
1571        let mut receivers = self.stop_receivers.lock().unwrap();
1572        receivers.retain(|_, rx| {
1573            matches!(rx.try_recv(), Err(tokio::sync::oneshot::error::TryRecvError::Empty))
1574        });
1575    }
1576
1577    /// Number of actors with stored stop receivers.
1578    ///
1579    /// Note: includes receivers for actors that have already stopped but
1580    /// haven't been awaited or cleaned up. Call `cleanup_finished()` first
1581    /// for an accurate count of running actors.
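    ///
    /// ```ignore
    /// runtime.cleanup_finished();
    /// let running = runtime.active_handle_count(); // now counts only live actors
    /// ```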
1582    pub fn active_handle_count(&self) -> usize {
1583        self.stop_receivers.lock().unwrap().len()
1584    }
1585}
1586
1587impl Default for RactorRuntime {
1588    fn default() -> Self {
1589        Self::new()
1590    }
1591}
1592
1593// ---------------------------------------------------------------------------
1594// NA10: SystemMessageRouter for ractor
1595// ---------------------------------------------------------------------------
1596
1597// NOTE: System messages routed here update the **native system actors** (the
1598// mailbox-based ractor actors spawned by `start_system_actors()`). The runtime
1599// also keeps plain struct system actors (`self.watch_manager`, etc.) for the
1600// backward-compatible sync API. This dual-state pattern is intentional — see
1601// progress.md "Dual struct+actor pattern" design decision.
1602
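// A sketch of how an integration layer is expected to drive this router
// (envelope receipt and reply transport are illustrative, not real APIs):
//
//     let outcome = runtime.route_system_envelope(envelope).await?;
//     match outcome {
//         RoutingOutcome::Acknowledged => { /* nothing to send back */ }
//         RoutingOutcome::SpawnCompleted { request_id, actor_id } => {
//             /* encode a success response and send it to the requester */
//         }
//         other => { /* encode and return the matching failure/response */ }
//     }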
1603#[async_trait::async_trait]
1604impl dactor::system_router::SystemMessageRouter for RactorRuntime {
1605    async fn route_system_envelope(
1606        &self,
1607        envelope: dactor::remote::WireEnvelope,
1608    ) -> Result<dactor::system_router::RoutingOutcome, dactor::system_router::RoutingError> {
1609        use dactor::system_actors::*;
1610        use dactor::system_router::{RoutingError, RoutingOutcome};
1611
1612        dactor::system_router::validate_system_message_type(&envelope.message_type)?;
1613
1614        let refs = self
1615            .system_actors
1616            .as_ref()
1617            .ok_or_else(|| RoutingError::new("system actors not started"))?;
1618
1619        match envelope.message_type.as_str() {
1620            SYSTEM_MSG_TYPE_SPAWN => {
1621                let request = dactor::proto::decode_spawn_request(&envelope.body)
1622                    .map_err(|e| RoutingError::new(format!("decode SpawnRequest: {e}")))?;
1623
1624                let req_id = request.request_id.clone();
1625                let (tx, rx) = tokio::sync::oneshot::channel();
1626                refs.spawn_manager
1627                    .cast(crate::system_actors::SpawnManagerMsg::HandleRequest {
1628                        request,
1629                        reply: tx,
1630                    })
1631                    .map_err(|e| RoutingError::new(format!("SpawnManager mailbox: {e}")))?;
1632
1633                let result = rx
1634                    .await
1635                    .map_err(|_| RoutingError::new("SpawnManager reply dropped"))?;
1636
1637                match result {
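                    // Only the assigned ID is reported back; the boxed actor
                    // instance itself is dropped here.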
1638                    Ok((actor_id, _actor)) => Ok(RoutingOutcome::SpawnCompleted {
1639                        request_id: req_id,
1640                        actor_id,
1641                    }),
1642                    Err(SpawnResponse::Failure { request_id, error }) => {
1643                        Ok(RoutingOutcome::SpawnFailed { request_id, error })
1644                    }
1645                    Err(SpawnResponse::Success { .. }) => {
1646                        unreachable!("spawn errors always carry SpawnResponse::Failure, never Success")
1647                    }
1648                }
1649            }
1650
1651            SYSTEM_MSG_TYPE_WATCH => {
1652                let request = dactor::proto::decode_watch_request(&envelope.body)
1653                    .map_err(|e| RoutingError::new(format!("decode WatchRequest: {e}")))?;
1654
1655                refs.watch_manager
1656                    .cast(crate::system_actors::WatchManagerMsg::Watch {
1657                        target: request.target,
1658                        watcher: request.watcher,
1659                    })
1660                    .map_err(|e| RoutingError::new(format!("WatchManager mailbox: {e}")))?;
1661
1662                Ok(RoutingOutcome::Acknowledged)
1663            }
1664
1665            SYSTEM_MSG_TYPE_UNWATCH => {
1666                let request = dactor::proto::decode_unwatch_request(&envelope.body)
1667                    .map_err(|e| RoutingError::new(format!("decode UnwatchRequest: {e}")))?;
1668
1669                refs.watch_manager
1670                    .cast(crate::system_actors::WatchManagerMsg::Unwatch {
1671                        target: request.target,
1672                        watcher: request.watcher,
1673                    })
1674                    .map_err(|e| RoutingError::new(format!("WatchManager mailbox: {e}")))?;
1675
1676                Ok(RoutingOutcome::Acknowledged)
1677            }
1678
1679            SYSTEM_MSG_TYPE_CANCEL => {
1680                let request = dactor::proto::decode_cancel_request(&envelope.body)
1681                    .map_err(|e| RoutingError::new(format!("decode CancelRequest: {e}")))?;
1682
1683                let request_id = request
1684                    .request_id
1685                    .ok_or_else(|| RoutingError::new("CancelRequest missing request_id"))?;
1686
1687                let (tx, rx) = tokio::sync::oneshot::channel();
1688                refs.cancel_manager
1689                    .cast(crate::system_actors::CancelManagerMsg::Cancel {
1690                        request_id,
1691                        reply: tx,
1692                    })
1693                    .map_err(|e| RoutingError::new(format!("CancelManager mailbox: {e}")))?;
1694
1695                let response = rx
1696                    .await
1697                    .map_err(|_| RoutingError::new("CancelManager reply dropped"))?;
1698
1699                match response {
1700                    CancelResponse::Acknowledged => Ok(RoutingOutcome::CancelAcknowledged),
1701                    CancelResponse::NotFound { reason } => {
1702                        Ok(RoutingOutcome::CancelNotFound { reason })
1703                    }
1704                }
1705            }
1706
1707            SYSTEM_MSG_TYPE_CONNECT_PEER => {
1708                let (peer_id, address) = dactor::proto::decode_connect_peer(&envelope.body)
1709                    .map_err(|e| RoutingError::new(format!("decode ConnectPeer: {e}")))?;
1710
1711                refs.node_directory
1712                    .cast(crate::system_actors::NodeDirectoryMsg::ConnectPeer {
1713                        peer_id,
1714                        address,
1715                    })
1716                    .map_err(|e| RoutingError::new(format!("NodeDirectory mailbox: {e}")))?;
1717
1718                Ok(RoutingOutcome::Acknowledged)
1719            }
1720
1721            SYSTEM_MSG_TYPE_DISCONNECT_PEER => {
1722                let peer_id = dactor::proto::decode_disconnect_peer(&envelope.body)
1723                    .map_err(|e| RoutingError::new(format!("decode DisconnectPeer: {e}")))?;
1724
1725                refs.node_directory
1726                    .cast(crate::system_actors::NodeDirectoryMsg::DisconnectPeer { peer_id })
1727                    .map_err(|e| RoutingError::new(format!("NodeDirectory mailbox: {e}")))?;
1728
1729                Ok(RoutingOutcome::Acknowledged)
1730            }
1731
1732            // validate_system_message_type() already rejected unknown types above,
1733            // so this branch only fires if a new constant is added to the validator
1734            // without a handler here; the string match surfaces that only at runtime.
1735            other => Err(RoutingError::new(format!(
1736                "unhandled system message type: {other}"
1737            ))),
1738        }
1739    }
1740}
1741