
dactor_ractor/runtime.rs

//! V0.2 ractor adapter runtime for the dactor actor framework.
//!
//! Bridges dactor's `Actor`/`Handler<M>`/`ActorRef<A>` API with ractor's
//! single-message-type `ractor::Actor` trait using type-erased dispatch.
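//!
//! # Example
//!
//! A minimal sketch of the intended flow, assuming a hypothetical `MyActor`
//! with `Deps = ()` and a `Handler<MyQuery>` impl, and assuming that awaiting
//! an `AskReply` yields the handler's reply:
//!
//! ```rust,ignore
//! let runtime = RactorRuntime::new();
//! let worker = runtime.spawn::<MyActor>("worker", MyArgs::default()).await?;
//! let reply = worker.ask(MyQuery, None)?.await?;
//! ```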

use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use futures::FutureExt;
use tokio_util::sync::CancellationToken;

use dactor::actor::{
    Actor, ActorContext, ActorError, ActorRef, AskReply, ExpandHandler, Handler,
    ReduceHandler, TransformHandler,
};
use dactor::dead_letter::{DeadLetterEvent, DeadLetterHandler, DeadLetterReason};
use dactor::dispatch::{
    AskDispatch, Dispatch, ExpandDispatch, ReduceDispatch, TransformDispatch, TypedDispatch,
};
use dactor::errors::{ActorSendError, ErrorAction, RuntimeError};
use dactor::interceptor::{
    Disposition, DropObserver, InboundContext, InboundInterceptor, OutboundInterceptor, Outcome,
    SendMode,
};
use dactor::mailbox::MailboxConfig;
use dactor::message::{Headers, Message, RuntimeHeaders};
use dactor::node::{ActorId, NodeId};
use dactor::runtime_support::{
    spawn_reduce_batched_drain, spawn_reduce_drain, spawn_transform_drain,
    wrap_batched_stream_with_interception, wrap_stream_with_interception, BoundedMailboxSender,
    OutboundPipeline,
};
use dactor::stream::{
    BatchConfig, BatchReader, BatchWriter, BoxStream, StreamReceiver, StreamSender,
};
use dactor::supervision::ChildTerminated;
use dactor::system_actors::{
    CancelManager, CancelResponse, NodeDirectory, PeerStatus, SpawnManager, SpawnRequest,
    SpawnResponse, WatchManager, WatchNotification,
};
use dactor::type_registry::TypeRegistry;

use crate::cluster::RactorClusterEvents;

// ---------------------------------------------------------------------------
// Watch registry
// ---------------------------------------------------------------------------

/// A type-erased entry in the watch registry.
struct WatchEntry {
    watcher_id: ActorId,
    /// Closure that delivers a [`ChildTerminated`] to the watcher actor.
    notify: Box<dyn Fn(ChildTerminated) + Send + Sync>,
}

/// Shared watch registry mapping watched actor ID → list of watcher entries.
type WatcherMap = Arc<Mutex<HashMap<ActorId, Vec<WatchEntry>>>>;

// ---------------------------------------------------------------------------
// Ractor wrapper actor
// ---------------------------------------------------------------------------

/// The ractor message type wrapping a type-erased dactor dispatch envelope.
struct DactorMsg<A: Actor>(Box<dyn Dispatch<A>>);

/// The ractor `Actor` implementation that bridges to a dactor `Actor`.
struct RactorDactorActor<A: Actor> {
    _phantom: PhantomData<fn() -> A>,
}

/// State held by the ractor actor, containing the dactor `Actor` instance.
struct RactorActorState<A: Actor> {
    actor: A,
    ctx: ActorContext,
    interceptors: Vec<Box<dyn InboundInterceptor>>,
    watchers: WatcherMap,
    stop_reason: Option<String>,
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
    /// Notified when the actor stops (for await_stop).
    stop_notifier: Option<tokio::sync::oneshot::Sender<Result<(), String>>>,
}

/// Arguments passed to the ractor actor at spawn time.
struct RactorSpawnArgs<A: Actor> {
    args: A::Args,
    deps: A::Deps,
    actor_id: ActorId,
    actor_name: String,
    interceptors: Vec<Box<dyn InboundInterceptor>>,
    watchers: WatcherMap,
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
    stop_notifier: Option<tokio::sync::oneshot::Sender<Result<(), String>>>,
}

impl<A: Actor + 'static> ractor::Actor for RactorDactorActor<A> {
    type Msg = DactorMsg<A>;
    type State = RactorActorState<A>;
    type Arguments = RactorSpawnArgs<A>;

    async fn pre_start(
        &self,
        _myself: ractor::ActorRef<Self::Msg>,
        args: Self::Arguments,
    ) -> Result<Self::State, ractor::ActorProcessingErr> {
        let mut actor = A::create(args.args, args.deps);
        let mut ctx = ActorContext::new(args.actor_id, args.actor_name);
        actor.on_start(&mut ctx).await;
        Ok(RactorActorState {
            actor,
            ctx,
            interceptors: args.interceptors,
            watchers: args.watchers,
            stop_reason: None,
            dead_letter_handler: args.dead_letter_handler,
            stop_notifier: args.stop_notifier,
        })
    }

    async fn handle(
        &self,
        myself: ractor::ActorRef<Self::Msg>,
        message: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ractor::ActorProcessingErr> {
        let dispatch = message.0;

        // Capture metadata before dispatch consumes the message
        let send_mode = dispatch.send_mode();
        let message_type = dispatch.message_type_name();

        state.ctx.send_mode = Some(send_mode);
        state.ctx.headers = Headers::new();

        // Run inbound interceptor pipeline
        let runtime_headers = RuntimeHeaders::new();
        let mut headers = Headers::new();
        let mut total_delay = Duration::ZERO;
        let mut rejection: Option<(String, Disposition)> = None;

        {
            let ictx = InboundContext {
                actor_id: state.ctx.actor_id.clone(),
                actor_name: &state.ctx.actor_name,
                message_type,
                send_mode,
                remote: false,
                origin_node: None,
            };

            for interceptor in &state.interceptors {
                match interceptor.on_receive(
                    &ictx,
                    &runtime_headers,
                    &mut headers,
                    dispatch.message_any(),
                ) {
                    Disposition::Continue => {}
                    Disposition::Delay(d) => {
                        total_delay += d;
                    }
                    disp @ (Disposition::Drop | Disposition::Reject(_) | Disposition::Retry(_)) => {
                        rejection = Some((interceptor.name().to_string(), disp));
                        break;
                    }
                }
            }
        }

        // If rejected/dropped/retried, propagate the proper error to the caller
        if let Some((interceptor_name, disposition)) = rejection {
            if matches!(disposition, Disposition::Drop) {
                if let Some(ref handler) = *state.dead_letter_handler {
                    let event = DeadLetterEvent {
                        target_id: state.ctx.actor_id.clone(),
                        target_name: Some(state.ctx.actor_name.clone()),
                        message_type,
                        send_mode,
                        reason: DeadLetterReason::DroppedByInterceptor {
                            interceptor: interceptor_name.clone(),
                        },
                        message: None,
                    };
                    let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                        handler.on_dead_letter(event);
                    }));
                }
            }
            dispatch.reject(disposition, &interceptor_name);
            return Ok(());
        }

        if !total_delay.is_zero() {
            tokio::time::sleep(total_delay).await;
        }

        // Copy interceptor-populated headers to ActorContext so handler can access them
        state.ctx.headers = headers;

        // Propagate cancellation token to context
        let cancel_token = dispatch.cancel_token();
        state.ctx.set_cancellation_token(cancel_token.clone());

        // Check if already cancelled before dispatching
        if let Some(ref token) = cancel_token {
            if token.is_cancelled() {
                dispatch.cancel();
                state.ctx.set_cancellation_token(None);
                return Ok(());
            }
        }

        // Dispatch with panic catching and cancellation racing
        let result = if let Some(ref token) = cancel_token {
            let dispatch_fut =
                std::panic::AssertUnwindSafe(dispatch.dispatch(&mut state.actor, &mut state.ctx))
                    .catch_unwind();
            tokio::select! {
                biased;
                r = dispatch_fut => r,
                _ = token.cancelled() => {
                    // In-flight cancellation: dispatch_fut is dropped, which drops
                    // reply_tx inside it. Caller's AskReply sees channel closed.
                    // Pre-dispatch cancellation (above) sends RuntimeError::Cancelled.
                    state.ctx.set_cancellation_token(None);
                    return Ok(());
                }
            }
        } else {
            std::panic::AssertUnwindSafe(dispatch.dispatch(&mut state.actor, &mut state.ctx))
                .catch_unwind()
                .await
        };

        state.ctx.set_cancellation_token(None);

        // Build context for on_complete
        let ictx = InboundContext {
            actor_id: state.ctx.actor_id.clone(),
            actor_name: &state.ctx.actor_name,
            message_type,
            send_mode,
            remote: false,
            origin_node: None,
        };

        match result {
            Ok(dispatch_result) => {
                let outcome = match (&dispatch_result.reply, send_mode) {
                    (Some(reply), SendMode::Ask) => Outcome::AskSuccess {
                        reply: reply.as_ref(),
                    },
                    _ => Outcome::TellSuccess,
                };

                for interceptor in &state.interceptors {
                    interceptor.on_complete(&ictx, &runtime_headers, &state.ctx.headers, &outcome);
                }

                // Send reply to caller AFTER interceptors have seen it
                dispatch_result.send_reply();
            }
            Err(_panic) => {
                let error = ActorError::internal("handler panicked");
                let action = state.actor.on_error(&error);

                let outcome = Outcome::HandlerError { error };
                for interceptor in &state.interceptors {
                    interceptor.on_complete(&ictx, &runtime_headers, &state.ctx.headers, &outcome);
                }

                match action {
                    ErrorAction::Resume => {
                        // Actor resumes — do NOT set stop_reason
                    }
                    ErrorAction::Stop | ErrorAction::Escalate => {
                        state.stop_reason = Some("handler panicked".into());
                        myself.stop(None);
                    }
                    ErrorAction::Restart => {
                        tracing::warn!("Restart not fully implemented, treating as Resume");
                    }
                }
            }
        }

        Ok(())
    }

    async fn post_stop(
        &self,
        _myself: ractor::ActorRef<Self::Msg>,
        state: &mut Self::State,
    ) -> Result<(), ractor::ActorProcessingErr> {
        // Reset context to lifecycle semantics before on_stop
        state.ctx.send_mode = None;
        state.ctx.headers = Headers::new();
        state.ctx.set_cancellation_token(None);

        // Run on_stop with panic catching so panics propagate as errors
        // through await_stop() instead of aborting the ractor task.
        let on_stop_panicked =
            std::panic::AssertUnwindSafe(state.actor.on_stop())
                .catch_unwind()
                .await
                .is_err();
        if on_stop_panicked && state.stop_reason.is_none() {
            state.stop_reason = Some("actor panicked in on_stop".to_string());
        }

        // Notify all watchers that this actor has terminated.
        let actor_id = state.ctx.actor_id.clone();
        let actor_name = state.ctx.actor_name.clone();
        let entries = {
            let mut watchers = state.watchers.lock().unwrap();
            // Remove entries where this actor is the TARGET (watchers of this actor)
            let target_entries = watchers.remove(&actor_id).unwrap_or_default();
            // Also clean up entries where this actor is the WATCHER (prevent leak)
            for entries in watchers.values_mut() {
                entries.retain(|e| e.watcher_id != actor_id);
            }
            watchers.retain(|_, v| !v.is_empty());
            target_entries
        };
        if !entries.is_empty() {
            let notification = ChildTerminated {
                child_id: actor_id,
                child_name: actor_name,
                reason: state.stop_reason.clone(),
            };
            for entry in &entries {
                (entry.notify)(notification.clone());
            }
        }

        // Notify await_stop() waiters — track on_stop panic independently
        // from stop_reason (which may already be set by a handler panic).
        if let Some(tx) = state.stop_notifier.take() {
            let result = if on_stop_panicked {
                Err("actor panicked in on_stop".to_string())
            } else {
                Ok(())
            };
            let _ = tx.send(result);
        }

        Ok(())
    }
}

// ---------------------------------------------------------------------------
// RactorActorRef — dactor ActorRef backed by ractor
// ---------------------------------------------------------------------------

/// A dactor `ActorRef` backed by a ractor `ActorRef`.
///
/// Messages are delivered through ractor's mailbox as type-erased dispatch
/// envelopes, enabling multiple `Handler<M>` impls per actor.
///
/// When configured with [`MailboxConfig::Bounded`], a bounded `mpsc` channel
/// sits in front of the ractor actor, providing backpressure control at the
/// dactor level while ractor's internal mailbox remains unbounded.
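///
/// # Example
///
/// A minimal sketch, assuming a hypothetical `Worker` actor with a
/// `Handler<Job>` impl (where `Job: Message<Reply = ()>`):
///
/// ```rust,ignore
/// let worker = runtime.spawn::<Worker>("worker", ()).await?;
/// worker.tell(Job { id: 1 })?; // fire-and-forget through the dispatch envelope
/// assert!(worker.is_alive());
/// worker.stop();
/// ```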
pub struct RactorActorRef<A: Actor> {
    id: ActorId,
    name: String,
    inner: ractor::ActorRef<DactorMsg<A>>,
    bounded_tx: Option<BoundedMailboxSender<DactorMsg<A>>>,
    outbound_interceptors: Arc<Vec<Box<dyn OutboundInterceptor>>>,
    drop_observer: Option<Arc<dyn DropObserver>>,
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
}
impl<A: Actor> Clone for RactorActorRef<A> {
    fn clone(&self) -> Self {
        Self {
            id: self.id.clone(),
            name: self.name.clone(),
            inner: self.inner.clone(),
            bounded_tx: self.bounded_tx.clone(),
            outbound_interceptors: self.outbound_interceptors.clone(),
            drop_observer: self.drop_observer.clone(),
            dead_letter_handler: self.dead_letter_handler.clone(),
        }
    }
}

impl<A: Actor> std::fmt::Debug for RactorActorRef<A> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "RactorActorRef({}, {:?})", self.name, self.id)
    }
}

impl<A: Actor + 'static> RactorActorRef<A> {
    fn outbound_pipeline(&self) -> OutboundPipeline {
        OutboundPipeline {
            interceptors: self.outbound_interceptors.clone(),
            drop_observer: self.drop_observer.clone(),
            target_id: self.id.clone(),
            target_name: self.name.clone(),
        }
    }

    fn notify_dead_letter(
        &self,
        message_type: &'static str,
        send_mode: SendMode,
        reason: DeadLetterReason,
    ) {
        if let Some(ref handler) = *self.dead_letter_handler {
            let event = DeadLetterEvent {
                target_id: self.id.clone(),
                target_name: Some(self.name.clone()),
                message_type,
                send_mode,
                reason,
                message: None,
            };
            let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                handler.on_dead_letter(event);
            }));
        }
    }

    /// Send a dispatch envelope through the bounded channel (if configured)
    /// or directly to the ractor actor.
    fn send_dispatch(&self, dispatch: Box<dyn Dispatch<A>>) -> Result<(), ActorSendError> {
        if let Some(ref btx) = self.bounded_tx {
            btx.try_send(DactorMsg(dispatch))
        } else {
            self.inner
                .cast(DactorMsg(dispatch))
                .map_err(|e| ActorSendError(e.to_string()))
        }
    }
}

impl<A: Actor + 'static> ActorRef<A> for RactorActorRef<A> {
    fn id(&self) -> ActorId {
        self.id.clone()
    }

    fn name(&self) -> String {
        self.name.clone()
    }

    fn is_alive(&self) -> bool {
        let inner_alive = matches!(
            self.inner.get_status(),
            ractor::ActorStatus::Running
                | ractor::ActorStatus::Starting
                | ractor::ActorStatus::Upgrading
        );
        if let Some(ref btx) = self.bounded_tx {
            !btx.is_closed() && inner_alive
        } else {
            inner_alive
        }
    }

    fn pending_messages(&self) -> usize {
        if let Some(ref btx) = self.bounded_tx {
            btx.pending()
        } else {
            0
        }
    }

    fn stop(&self) {
        self.inner.stop(None);
    }

    fn tell<M>(&self, msg: M) -> Result<(), ActorSendError>
    where
        A: Handler<M>,
        M: Message<Reply = ()>,
    {
        let pipeline = self.outbound_pipeline();
        let result = pipeline.run_on_send(SendMode::Tell, &msg);
        match result.disposition {
            Disposition::Continue => {}
            Disposition::Delay(_) => {} // Not supported in sync tell
            Disposition::Drop | Disposition::Reject(_) | Disposition::Retry(_) => return Ok(()),
        }

        let dispatch: Box<dyn Dispatch<A>> = Box::new(TypedDispatch { msg });
        self.send_dispatch(dispatch).map_err(|e| {
            // Heuristic: a BoundedMailboxSender overflow error mentions "full";
            // any other send failure means the actor has stopped.
            let reason = if e.0.contains("full") {
                DeadLetterReason::MailboxFull
            } else {
                DeadLetterReason::ActorStopped
            };
            self.notify_dead_letter(std::any::type_name::<M>(), SendMode::Tell, reason);
            e
        })
    }

    fn ask<M>(
        &self,
        msg: M,
        cancel: Option<CancellationToken>,
    ) -> Result<AskReply<M::Reply>, ActorSendError>
    where
        A: Handler<M>,
        M: Message,
    {
        let pipeline = self.outbound_pipeline();
        let result = pipeline.run_on_send(SendMode::Ask, &msg);
        match result.disposition {
            Disposition::Continue => {}
            Disposition::Delay(_) => {} // Not supported in sync context
            Disposition::Drop => {
                let (tx, rx) = tokio::sync::oneshot::channel();
                let _ = tx.send(Err(RuntimeError::ActorNotFound(
                    "message dropped by outbound interceptor".into(),
                )));
                return Ok(AskReply::new(rx));
            }
            Disposition::Reject(reason) => {
                let (tx, rx) = tokio::sync::oneshot::channel();
                let _ = tx.send(Err(RuntimeError::Rejected {
                    interceptor: result.interceptor_name.to_string(),
                    reason,
                }));
                return Ok(AskReply::new(rx));
            }
            Disposition::Retry(retry_after) => {
                let (tx, rx) = tokio::sync::oneshot::channel();
                let _ = tx.send(Err(RuntimeError::RetryAfter {
                    interceptor: result.interceptor_name.to_string(),
                    retry_after,
                }));
                return Ok(AskReply::new(rx));
            }
        }

        let (tx, rx) = tokio::sync::oneshot::channel();
        let dispatch: Box<dyn Dispatch<A>> = Box::new(AskDispatch {
            msg,
            reply_tx: tx,
            cancel,
        });
        self.send_dispatch(dispatch).map_err(|e| {
            // Heuristic: a BoundedMailboxSender overflow error mentions "full";
            // any other send failure means the actor has stopped.
            let reason = if e.0.contains("full") {
                DeadLetterReason::MailboxFull
            } else {
                DeadLetterReason::ActorStopped
            };
            self.notify_dead_letter(std::any::type_name::<M>(), SendMode::Ask, reason);
            e
        })?;
        Ok(AskReply::new(rx))
    }

    fn expand<M, OutputItem>(
        &self,
        msg: M,
        buffer: usize,
        batch_config: Option<BatchConfig>,
        cancel: Option<CancellationToken>,
    ) -> Result<BoxStream<OutputItem>, ActorSendError>
    where
        A: ExpandHandler<M, OutputItem>,
        M: Send + 'static,
        OutputItem: Send + 'static,
    {
        let pipeline = self.outbound_pipeline();
        let result = pipeline.run_on_send(SendMode::Expand, &msg);
        match result.disposition {
            Disposition::Continue => {}
            Disposition::Delay(_) => {}
            Disposition::Drop => {
                return Err(ActorSendError(
                    "stream dropped by outbound interceptor".into(),
                ));
            }
            Disposition::Reject(reason) => {
                return Err(ActorSendError(format!("stream rejected: {}", reason)));
            }
            Disposition::Retry(_) => {
                return Err(ActorSendError(
                    "stream retry requested by interceptor".into(),
                ));
            }
        }

        let buffer = buffer.max(1);
        let (tx, mut rx) = tokio::sync::mpsc::channel(buffer);
        let sender = StreamSender::new(tx);
        let dispatch: Box<dyn Dispatch<A>> = Box::new(ExpandDispatch {
            msg,
            sender,
            cancel,
        });
        self.send_dispatch(dispatch)?;

        match batch_config {
            Some(batch_config) => {
                let (batch_tx, batch_rx) = tokio::sync::mpsc::channel::<Vec<OutputItem>>(buffer);
                let reader = BatchReader::new(batch_rx);
                let batch_delay = batch_config.max_delay;
                // Re-batch the raw item stream: while items are buffered, race
                // the next item against the max_delay deadline so partial
                // batches still flush on time.
                tokio::spawn(async move {
                    let mut writer = BatchWriter::new(batch_tx, batch_config);
                    loop {
                        if writer.buffered_count() > 0 {
                            let deadline = tokio::time::Instant::now() + batch_delay;
                            tokio::select! {
                                biased;
                                item = rx.recv() => match item {
                                    Some(item) => {
                                        if writer.push(item).await.is_err() { break; }
                                    }
                                    None => break,
                                },
                                _ = tokio::time::sleep_until(deadline) => {
                                    if writer.check_deadline().await.is_err() { break; }
                                }
                            }
                        } else {
                            match rx.recv().await {
                                Some(item) => {
                                    if writer.push(item).await.is_err() {
                                        break;
                                    }
                                }
                                None => break,
                            }
                        }
                    }
                    let _ = writer.flush().await;
                });
                Ok(wrap_batched_stream_with_interception(
                    reader,
                    buffer,
                    pipeline,
                    std::any::type_name::<M>(),
                    SendMode::Expand,
                ))
            }
            None => Ok(wrap_stream_with_interception(
                rx,
                buffer,
                pipeline,
                std::any::type_name::<M>(),
                SendMode::Expand,
            )),
        }
    }

    fn reduce<InputItem, Reply>(
        &self,
        input: BoxStream<InputItem>,
        buffer: usize,
        batch_config: Option<BatchConfig>,
        cancel: Option<CancellationToken>,
    ) -> Result<AskReply<Reply>, ActorSendError>
    where
        A: ReduceHandler<InputItem, Reply>,
        InputItem: Send + 'static,
        Reply: Send + 'static,
    {
        let buffer = buffer.max(1);
        let (item_tx, item_rx) = tokio::sync::mpsc::channel(buffer);
        let receiver = StreamReceiver::new(item_rx);
        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        let dispatch: Box<dyn Dispatch<A>> = Box::new(ReduceDispatch {
            receiver,
            reply_tx,
            cancel: cancel.clone(),
        });
        self.send_dispatch(dispatch)?;

        let pipeline = self.outbound_pipeline();
        match batch_config {
            Some(batch_config) => {
                spawn_reduce_batched_drain(
                    input,
                    item_tx,
                    buffer,
                    batch_config,
                    cancel,
                    pipeline,
                    std::any::type_name::<InputItem>(),
                );
            }
            None => {
                spawn_reduce_drain(
                    input,
                    item_tx,
                    cancel,
                    pipeline,
                    std::any::type_name::<InputItem>(),
                );
            }
        }

        Ok(AskReply::new(reply_rx))
    }

    fn transform<InputItem, OutputItem>(
        &self,
        input: BoxStream<InputItem>,
        buffer: usize,
        batch_config: Option<BatchConfig>,
        cancel: Option<CancellationToken>,
    ) -> Result<BoxStream<OutputItem>, ActorSendError>
    where
        A: TransformHandler<InputItem, OutputItem>,
        InputItem: Send + 'static,
        OutputItem: Send + 'static,
    {
        let buffer = buffer.max(1);
        let (item_tx, item_rx) = tokio::sync::mpsc::channel(buffer);
        let (output_tx, mut output_rx) = tokio::sync::mpsc::channel(buffer);
        let receiver = StreamReceiver::new(item_rx);
        let sender = StreamSender::new(output_tx);
        let dispatch: Box<dyn Dispatch<A>> = Box::new(TransformDispatch::new(
            receiver,
            sender,
            cancel.clone(),
        ));
        self.send_dispatch(dispatch)?;

        let pipeline = self.outbound_pipeline();
        spawn_transform_drain(
            input,
            item_tx,
            cancel,
            pipeline.clone(),
            std::any::type_name::<InputItem>(),
        );

        match batch_config {
            Some(batch_config) => {
                let (batch_tx, batch_rx) =
                    tokio::sync::mpsc::channel::<Vec<OutputItem>>(buffer);
                let reader = BatchReader::new(batch_rx);
                let batch_delay = batch_config.max_delay;
                // Re-batch the output stream: while items are buffered, race
                // the next item against the max_delay deadline so partial
                // batches still flush on time.
                tokio::spawn(async move {
                    let mut writer = BatchWriter::new(batch_tx, batch_config);
                    loop {
                        if writer.buffered_count() > 0 {
                            let deadline = tokio::time::Instant::now() + batch_delay;
                            tokio::select! {
                                biased;
                                item = output_rx.recv() => match item {
                                    Some(item) => {
                                        if writer.push(item).await.is_err() { break; }
                                    }
                                    None => break,
                                },
                                _ = tokio::time::sleep_until(deadline) => {
                                    if writer.check_deadline().await.is_err() { break; }
                                }
                            }
                        } else {
                            match output_rx.recv().await {
                                Some(item) => {
                                    if writer.push(item).await.is_err() {
                                        break;
                                    }
                                }
                                None => break,
                            }
                        }
                    }
                    let _ = writer.flush().await;
                });
                Ok(wrap_batched_stream_with_interception(
                    reader,
                    buffer,
                    pipeline,
                    std::any::type_name::<OutputItem>(),
                    SendMode::Transform,
                ))
            }
            None => Ok(wrap_stream_with_interception(
                output_rx,
                buffer,
                pipeline,
                std::any::type_name::<OutputItem>(),
                SendMode::Transform,
            )),
        }
    }
}

// ---------------------------------------------------------------------------
// SpawnOptions
// ---------------------------------------------------------------------------

/// Options for spawning an actor, including the inbound interceptor pipeline
/// and mailbox configuration. Use `..Default::default()` to future-proof
/// against new fields.
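///
/// # Example
///
/// Bound the mailbox while keeping the default (empty) interceptor list:
///
/// ```rust,ignore
/// use dactor::mailbox::{MailboxConfig, OverflowStrategy};
///
/// let options = SpawnOptions {
///     mailbox: MailboxConfig::bounded(1_000, OverflowStrategy::Block),
///     ..Default::default()
/// };
/// ```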
pub struct SpawnOptions {
    /// Inbound interceptors to attach to the actor.
    pub interceptors: Vec<Box<dyn InboundInterceptor>>,
    /// Mailbox capacity configuration.
    ///
    /// When [`Bounded`](MailboxConfig::Bounded), a bounded `mpsc` channel is
    /// placed in front of the ractor actor to enforce backpressure.
    pub mailbox: MailboxConfig,
}

impl Default for SpawnOptions {
    fn default() -> Self {
        Self {
            interceptors: Vec::new(),
            mailbox: MailboxConfig::Unbounded,
        }
    }
}

// ---------------------------------------------------------------------------
// RactorRuntime
// ---------------------------------------------------------------------------

/// A dactor v0.2 runtime backed by ractor.
///
/// Actors are spawned as real ractor actors via [`ractor::Actor::spawn`].
/// Messages are delivered through ractor's mailbox as type-erased dispatch
/// envelopes, supporting multiple `Handler<M>` impls per actor.
///
/// ## System Actors
///
/// The runtime spawns native ractor system actors when `start_system_actors()`
/// is called:
/// - [`SpawnManagerActor`](crate::system_actors::SpawnManagerActor) — handles remote spawn requests
/// - [`WatchManagerActor`](crate::system_actors::WatchManagerActor) — handles remote watch/unwatch
/// - [`CancelManagerActor`](crate::system_actors::CancelManagerActor) — handles remote cancellation
/// - [`NodeDirectoryActor`](crate::system_actors::NodeDirectoryActor) — tracks peer connections
///
/// System actors are accessible via `system_actor_refs()` for transport routing.
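///
/// # Example
///
/// A minimal sketch of runtime setup (assumes a tokio context for
/// `start_system_actors()` and a hypothetical `MyActor` with `Deps = ()`):
///
/// ```rust,ignore
/// let mut runtime = RactorRuntime::with_node_id(NodeId("node-a".into()));
/// runtime.start_system_actors().await;
/// let worker = runtime.spawn::<MyActor>("worker", ()).await?;
/// ```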
pub struct RactorRuntime {
    node_id: NodeId,
    next_local: Arc<AtomicU64>,
    cluster_events: RactorClusterEvents,
    outbound_interceptors: Arc<Vec<Box<dyn OutboundInterceptor>>>,
    drop_observer: Option<Arc<dyn DropObserver>>,
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
    watchers: WatcherMap,
    /// Plain struct system actors (backward-compatible sync API).
    spawn_manager: SpawnManager,
    watch_manager: WatchManager,
    cancel_manager: CancelManager,
    node_directory: NodeDirectory,
    /// Native ractor system actor refs (lazily started via `start_system_actors()`).
    system_actors: Option<RactorSystemActorRefs>,
    /// Stop notification receivers for await_stop(), keyed by ActorId.
    #[allow(clippy::type_complexity)]
    stop_receivers: Arc<Mutex<HashMap<ActorId, tokio::sync::oneshot::Receiver<Result<(), String>>>>>,
    /// Application version for this node (informational, used in handshake).
    app_version: Option<String>,
}

/// References to the native ractor system actors spawned by the runtime.
pub struct RactorSystemActorRefs {
    pub spawn_manager: SystemActorHandle<crate::system_actors::SpawnManagerMsg>,
    pub watch_manager: ractor::ActorRef<crate::system_actors::WatchManagerMsg>,
    pub cancel_manager: ractor::ActorRef<crate::system_actors::CancelManagerMsg>,
    pub node_directory: ractor::ActorRef<crate::system_actors::NodeDirectoryMsg>,
}

/// Handle to a system actor that may be a single actor or a pool.
pub enum SystemActorHandle<M: ractor::Message> {
    /// Single actor instance.
    Single(ractor::ActorRef<M>),
    /// Pool of actors with round-robin routing.
    Pool(SystemActorPool<M>),
}

impl<M: ractor::Message> SystemActorHandle<M> {
    /// Send a message to the system actor (or a pool member via round-robin).
    pub fn cast(&self, msg: M) -> Result<(), ractor::MessagingErr<M>> {
        match self {
            SystemActorHandle::Single(r) => r.cast(msg),
            SystemActorHandle::Pool(pool) => pool.cast(msg),
        }
    }
}

/// Round-robin pool of ractor actors.
pub struct SystemActorPool<M: ractor::Message> {
    workers: Vec<ractor::ActorRef<M>>,
    counter: std::sync::atomic::AtomicU64,
}

impl<M: ractor::Message> SystemActorPool<M> {
    /// Create a new pool from a vec of actor refs.
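    ///
    /// # Example
    ///
    /// A sketch, assuming `refs` holds previously spawned worker refs:
    ///
    /// ```rust,ignore
    /// let pool = SystemActorPool::new(refs);
    /// pool.cast(first_msg)?;  // routed to worker 0
    /// pool.cast(second_msg)?; // routed to worker 1
    /// ```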
    pub fn new(workers: Vec<ractor::ActorRef<M>>) -> Self {
        assert!(!workers.is_empty(), "pool must have at least one worker");
        Self {
            workers,
            counter: std::sync::atomic::AtomicU64::new(0),
        }
    }

    /// Send a message to the next worker via round-robin.
    pub fn cast(&self, msg: M) -> Result<(), ractor::MessagingErr<M>> {
        let idx = self.counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
            as usize % self.workers.len();
        self.workers[idx].cast(msg)
    }

    /// Number of workers in the pool.
    pub fn len(&self) -> usize {
        self.workers.len()
    }

    /// Access all worker refs (for broadcasting to all workers).
    pub fn workers(&self) -> &[ractor::ActorRef<M>] {
        &self.workers
    }
}

impl RactorRuntime {
    /// Create a new `RactorRuntime`.
    ///
    /// System actors are not spawned until `start_system_actors()` is called.
    /// This allows the runtime to be constructed outside a tokio context.
    pub fn new() -> Self {
        Self::create(NodeId("ractor-node".into()))
    }

    /// Create a new `RactorRuntime` with a specific node ID.
    pub fn with_node_id(node_id: NodeId) -> Self {
        Self::create(node_id)
    }

    fn create(node_id: NodeId) -> Self {
        Self {
            node_id,
            next_local: Arc::new(AtomicU64::new(1)),
            cluster_events: RactorClusterEvents::new(),
            outbound_interceptors: Arc::new(Vec::new()),
            drop_observer: None,
            dead_letter_handler: Arc::new(None),
            watchers: Arc::new(Mutex::new(HashMap::new())),
            spawn_manager: SpawnManager::new(TypeRegistry::new()),
            watch_manager: WatchManager::new(),
            cancel_manager: CancelManager::new(),
            node_directory: NodeDirectory::new(),
            system_actors: None,
            stop_receivers: Arc::new(Mutex::new(HashMap::new())),
            app_version: None,
        }
    }

    /// The adapter name for this runtime, used in version handshakes.
    pub const ADAPTER_NAME: &'static str = "ractor";

    /// Set the application version for this node.
    ///
    /// This is your application's release version (e.g., "2.3.1"), not the
    /// dactor framework version. It is included in handshake requests for
    /// operational visibility during rolling upgrades.
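    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let runtime = RactorRuntime::new().with_app_version("2.3.1");
    /// assert_eq!(runtime.app_version(), Some("2.3.1"));
    /// ```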
    pub fn with_app_version(mut self, version: impl Into<String>) -> Self {
        self.app_version = Some(version.into());
        self
    }

    /// Returns the configured application version, if any.
    pub fn app_version(&self) -> Option<&str> {
        self.app_version.as_deref()
    }

    /// Build a [`HandshakeRequest`](dactor::HandshakeRequest) from this
    /// runtime's current configuration.
    pub fn handshake_request(&self) -> dactor::HandshakeRequest {
        dactor::HandshakeRequest::from_runtime(
            self.node_id.clone(),
            self.app_version.clone(),
            Self::ADAPTER_NAME,
        )
    }

    /// Spawn native ractor system actors for transport routing.
    ///
    /// Must be called from within a tokio runtime context. After this call,
    /// `system_actor_refs()` returns the native actor references.
    ///
    /// Factory registrations made via `register_factory()` after this call
    /// are forwarded to the native SpawnManagerActor; registrations made
    /// before it land only in the struct-based SpawnManager's registry.
    ///
    /// Uses default configuration (unbounded mailboxes, no pooling).
    /// For custom configuration, use [`start_system_actors_with_config()`].
    pub async fn start_system_actors(&mut self) {
        self.start_system_actors_with_config(dactor::SystemActorConfig::default()).await;
    }

    /// Spawn native ractor system actors with custom configuration.
    ///
    /// Allows configuring mailbox capacity and SpawnManager pooling for
    /// high-throughput scenarios. See [`SystemActorConfig`](dactor::SystemActorConfig)
    /// for details.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use dactor::system_actors::SystemActorConfig;
    /// use dactor::mailbox::{MailboxConfig, OverflowStrategy};
    ///
    /// let config = SystemActorConfig::default()
    ///     .with_spawn_manager_mailbox(
    ///         MailboxConfig::bounded(10_000, OverflowStrategy::Block)
    ///     )
    ///     .with_spawn_manager_pool_size(4)
    ///     .with_control_plane_mailbox(
    ///         MailboxConfig::bounded(5_000, OverflowStrategy::Block)
    ///     );
    ///
    /// runtime.start_system_actors_with_config(config).await;
    /// ```
    pub async fn start_system_actors_with_config(
        &mut self,
        config: dactor::SystemActorConfig,
    ) {
        use crate::system_actors::*;

        // --- SpawnManager (optionally pooled) ---
        let pool_size = config.spawn_manager_pool_size.unwrap_or(1).max(1);
        let mut spawn_refs = Vec::with_capacity(pool_size);

        for _ in 0..pool_size {
            let (spawn_ref, _) = ractor::Actor::spawn(
                None, SpawnManagerActor,
                (self.node_id.clone(), TypeRegistry::new(), self.next_local.clone()),
            ).await.expect("failed to spawn SpawnManagerActor");
            spawn_refs.push(spawn_ref);
        }

        let spawn_manager = if spawn_refs.len() == 1 {
            SystemActorHandle::Single(spawn_refs.pop().unwrap())
        } else {
            SystemActorHandle::Pool(SystemActorPool::new(spawn_refs))
        };

        // --- Control-plane actors (never pooled) ---
        let (watch_ref, _) = ractor::Actor::spawn(
            None, WatchManagerActor, (),
        ).await.expect("failed to spawn WatchManagerActor");

        let (cancel_ref, _) = ractor::Actor::spawn(
            None, CancelManagerActor, (),
        ).await.expect("failed to spawn CancelManagerActor");

        let (node_dir_ref, _) = ractor::Actor::spawn(
            None, NodeDirectoryActor, (),
        ).await.expect("failed to spawn NodeDirectoryActor");

        self.system_actors = Some(RactorSystemActorRefs {
            spawn_manager,
            watch_manager: watch_ref,
            cancel_manager: cancel_ref,
            node_directory: node_dir_ref,
        });
    }

    /// Returns the node ID of this runtime.
    pub fn node_id(&self) -> &NodeId {
        &self.node_id
    }

    /// Access the native system actor references for transport routing.
    ///
    /// Returns `None` if `start_system_actors()` has not been called yet.
    pub fn system_actor_refs(&self) -> Option<&RactorSystemActorRefs> {
        self.system_actors.as_ref()
    }

    /// Add a global outbound interceptor.
    ///
    /// **Must be called before any actors are spawned.** Panics if actors
    /// already hold references to the interceptor list (i.e., after `spawn()`).
    pub fn add_outbound_interceptor(&mut self, interceptor: Box<dyn OutboundInterceptor>) {
        Arc::get_mut(&mut self.outbound_interceptors)
            .expect("cannot add interceptors after actors are spawned")
            .push(interceptor);
    }

    /// Set a global drop observer that is notified whenever an outbound
    /// interceptor returns `Disposition::Drop`.
    pub fn set_drop_observer(&mut self, observer: Arc<dyn DropObserver>) {
        self.drop_observer = Some(observer);
    }

    /// Set a global dead letter handler. Called whenever a message cannot be
    /// delivered (actor stopped, mailbox full, dropped by inbound interceptor).
    pub fn set_dead_letter_handler(&mut self, handler: Arc<dyn DeadLetterHandler>) {
        self.dead_letter_handler = Arc::new(Some(handler));
    }

    /// Access the cluster events subsystem (alias of [`cluster_events`](Self::cluster_events)).
    pub fn cluster_events_handle(&self) -> &RactorClusterEvents {
        &self.cluster_events
    }

    /// Access the cluster events subsystem.
    pub fn cluster_events(&self) -> &RactorClusterEvents {
        &self.cluster_events
    }

    /// Spawn an actor with `Deps = ()`.
    pub async fn spawn<A>(&self, name: &str, args: A::Args) -> Result<RactorActorRef<A>, dactor::errors::RuntimeError>
    where
        A: Actor<Deps = ()> + 'static,
    {
        self.spawn_internal::<A>(name, args, (), Vec::new(), MailboxConfig::Unbounded).await
    }

    /// Spawn an actor with explicit dependencies.
    pub async fn spawn_with_deps<A>(&self, name: &str, args: A::Args, deps: A::Deps) -> Result<RactorActorRef<A>, dactor::errors::RuntimeError>
    where
        A: Actor + 'static,
    {
        self.spawn_internal::<A>(name, args, deps, Vec::new(), MailboxConfig::Unbounded).await
    }

    /// Spawn an actor with spawn options (including inbound interceptors and mailbox config).
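    ///
    /// # Example
    ///
    /// A sketch, assuming a hypothetical `LoggingInterceptor` implementing
    /// [`InboundInterceptor`] and a `MyActor` with `Deps = ()`:
    ///
    /// ```rust,ignore
    /// let options = SpawnOptions {
    ///     interceptors: vec![Box::new(LoggingInterceptor)],
    ///     mailbox: MailboxConfig::bounded(1_000, OverflowStrategy::Block),
    /// };
    /// let worker = runtime
    ///     .spawn_with_options::<MyActor>("worker", args, options)
    ///     .await?;
    /// ```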
    pub async fn spawn_with_options<A>(
        &self,
        name: &str,
        args: A::Args,
        options: SpawnOptions,
    ) -> Result<RactorActorRef<A>, dactor::errors::RuntimeError>
    where
        A: Actor<Deps = ()> + 'static,
    {
        self.spawn_internal::<A>(name, args, (), options.interceptors, options.mailbox).await
    }

    async fn spawn_internal<A>(
        &self,
        name: &str,
        args: A::Args,
        deps: A::Deps,
        interceptors: Vec<Box<dyn InboundInterceptor>>,
        mailbox: MailboxConfig,
    ) -> Result<RactorActorRef<A>, dactor::errors::RuntimeError>
    where
        A: Actor + 'static,
    {
        let local = self.next_local.fetch_add(1, Ordering::SeqCst);
        let actor_id = ActorId {
            node: self.node_id.clone(),
            local,
        };
        let actor_name = name.to_string();

        let (stop_tx, stop_rx) = tokio::sync::oneshot::channel();

        let wrapper = RactorDactorActor::<A> {
            _phantom: PhantomData,
        };
        let spawn_args = RactorSpawnArgs {
            args,
            deps,
            actor_id: actor_id.clone(),
            actor_name: actor_name.clone(),
            interceptors,
            watchers: self.watchers.clone(),
            dead_letter_handler: self.dead_letter_handler.clone(),
            stop_notifier: Some(stop_tx),
        };

        let (actor_ref, _join_handle) = ractor::Actor::spawn(Some(name.to_string()), wrapper, spawn_args)
            .await
            .map_err(|e| dactor::errors::RuntimeError::SpawnFailed(e.to_string()))?;

        // Set up optional bounded mailbox channel
        let bounded_tx = match mailbox {
            MailboxConfig::Bounded { capacity, overflow } => {
                let (btx, mut brx) = tokio::sync::mpsc::channel::<DactorMsg<A>>(capacity);
                let fwd_ref = actor_ref.clone();
                tokio::spawn(async move {
                    while let Some(msg) = brx.recv().await {
                        if fwd_ref.cast(msg).is_err() {
                            break;
                        }
                    }
                });
                Some(BoundedMailboxSender::new(btx, overflow))
            }
            MailboxConfig::Unbounded => None,
        };

        // Store stop receiver for await_stop()
        self.stop_receivers.lock().unwrap().insert(actor_id.clone(), stop_rx);

        Ok(RactorActorRef {
            id: actor_id,
            name: actor_name,
            inner: actor_ref,
            bounded_tx,
            outbound_interceptors: self.outbound_interceptors.clone(),
            drop_observer: self.drop_observer.clone(),
            dead_letter_handler: self.dead_letter_handler.clone(),
        })
    }

    /// Register actor `watcher` to be notified when `target_id` terminates.
    ///
    /// The watcher must implement `Handler<ChildTerminated>`. When the target
    /// stops, the runtime delivers a [`ChildTerminated`] message to the watcher.
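    ///
    /// # Example
    ///
    /// A sketch, assuming a hypothetical `Supervisor` actor that implements
    /// `Handler<ChildTerminated>`:
    ///
    /// ```rust,ignore
    /// let supervisor = runtime.spawn::<Supervisor>("supervisor", ()).await?;
    /// let worker = runtime.spawn::<Worker>("worker", ()).await?;
    /// runtime.watch(&supervisor, worker.id());
    /// // When `worker` stops, `supervisor` receives a ChildTerminated message.
    /// ```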
    pub fn watch<W>(&self, watcher: &RactorActorRef<W>, target_id: ActorId)
    where
        W: Actor + Handler<ChildTerminated> + 'static,
    {
        let watcher_id = watcher.id();
        let watcher_inner = watcher.inner.clone();

        let entry = WatchEntry {
            watcher_id,
            notify: Box::new(move |msg: ChildTerminated| {
                let dispatch: Box<dyn Dispatch<W>> = Box::new(TypedDispatch { msg });
                if watcher_inner.cast(DactorMsg(dispatch)).is_err() {
                    tracing::debug!("watch notification dropped — watcher may have stopped");
                }
            }),
        };

        let mut watchers = self.watchers.lock().unwrap();
        watchers.entry(target_id).or_default().push(entry);
    }

    /// Unregister `watcher_id` from notifications about `target_id`.
    pub fn unwatch(&self, watcher_id: &ActorId, target_id: &ActorId) {
        let mut watchers = self.watchers.lock().unwrap();
        if let Some(entries) = watchers.get_mut(target_id) {
            entries.retain(|e| &e.watcher_id != watcher_id);
            if entries.is_empty() {
                watchers.remove(target_id);
            }
        }
    }

    // -----------------------------------------------------------------------
    // SA1: SpawnManager wiring
    // -----------------------------------------------------------------------

    /// Access the spawn manager.
    pub fn spawn_manager(&self) -> &SpawnManager {
        &self.spawn_manager
    }

    /// Access the spawn manager mutably (for registering actor factories).
    pub fn spawn_manager_mut(&mut self) -> &mut SpawnManager {
        &mut self.spawn_manager
    }

    /// Register an actor type for remote spawning on this node.
    ///
    /// Registers the factory in both the struct-based SpawnManager and the
    /// native SpawnManagerActor (if started via `start_system_actors()`).
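    ///
    /// # Example
    ///
    /// A sketch, assuming `MyActor`'s `Args` deserialize from JSON and a
    /// hypothetical `to_serialization_error` conversion helper:
    ///
    /// ```rust,ignore
    /// runtime.register_factory("my_app::MyActor", |bytes| {
    ///     let args: MyActorArgs = serde_json::from_slice(bytes)
    ///         .map_err(to_serialization_error)?; // hypothetical helper
    ///     Ok(Box::new(MyActor::create(args, ())) as Box<dyn std::any::Any + Send>)
    /// });
    /// ```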
    pub fn register_factory(
        &mut self,
        type_name: impl Into<String>,
        factory: impl Fn(&[u8]) -> Result<Box<dyn std::any::Any + Send>, dactor::remote::SerializationError>
            + Send
            + Sync
            + 'static,
    ) {
        let type_name = type_name.into();
        let factory = Arc::new(factory);

        // Register in struct-based manager
        self.spawn_manager
            .type_registry_mut()
            .register_factory(type_name.clone(), {
                let f = factory.clone();
                move |bytes: &[u8]| f(bytes)
            });

        // Forward to native actor(s) if started
        if let Some(ref actors) = self.system_actors {
            match &actors.spawn_manager {
                SystemActorHandle::Single(r) => {
                    let (tx, _rx) = tokio::sync::oneshot::channel();
                    let f = factory;
                    let _ = r.cast(
                        crate::system_actors::SpawnManagerMsg::RegisterFactory {
                            type_name,
                            factory: Box::new(move |bytes: &[u8]| f(bytes)),
                            reply: tx,
                        },
                    );
                }
                SystemActorHandle::Pool(pool) => {
                    // Broadcast factory registration to all pool workers
                    for worker in pool.workers() {
                        let f = factory.clone();
                        let (tx, _rx) = tokio::sync::oneshot::channel();
                        let _ = worker.cast(
                            crate::system_actors::SpawnManagerMsg::RegisterFactory {
                                type_name: type_name.clone(),
                                factory: Box::new(move |bytes: &[u8]| f(bytes)),
                                reply: tx,
                            },
                        );
                    }
                }
            }
        }
    }

    /// Process a remote spawn request.
    ///
    /// Looks up the actor type in the registry, deserializes Args from bytes,
    /// and returns the constructed actor along with its assigned [`ActorId`].
    ///
    /// The returned `ActorId` is pre-assigned for the remote spawn flow where
    /// the runtime controls ID assignment. The caller must use this ID when
    /// registering the spawned actor (not via the regular `spawn()` path,
    /// which assigns its own IDs).
    ///
    /// **Note:** Currently uses the simple factory API (`TypeRegistry::create_actor`).
    /// For actors with non-trivial `Deps`, use `spawn_manager_mut()` to access
    /// `TypeRegistry::create_actor_with_deps()` directly.
    ///
    /// Returns `Ok((actor_id, actor))` on success, or `Err(SpawnResponse::Failure)`
    /// if the type is not found or deserialization fails.
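    ///
    /// # Example
    ///
    /// A sketch of the receiving side, where `request` arrived from a peer
    /// over the transport layer:
    ///
    /// ```rust,ignore
    /// match runtime.handle_spawn_request(&request) {
    ///     Ok((actor_id, actor)) => {
    ///         // Downcast `actor` and register it under the pre-assigned `actor_id`.
    ///     }
    ///     Err(failure) => {
    ///         // Send the SpawnResponse::Failure back to the requesting node.
    ///     }
    /// }
    /// ```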
    pub fn handle_spawn_request(
        &mut self,
        request: &SpawnRequest,
    ) -> Result<(ActorId, Box<dyn std::any::Any + Send>), SpawnResponse> {
        match self.spawn_manager.create_actor(request) {
            Ok(actor) => {
                let local = self.next_local.fetch_add(1, Ordering::SeqCst);
                let actor_id = ActorId {
                    node: self.node_id.clone(),
                    local,
                };
                self.spawn_manager.record_spawn(actor_id.clone());
                Ok((actor_id, actor))
            }
            Err(e) => Err(SpawnResponse::Failure {
                request_id: request.request_id.clone(),
                error: e.to_string(),
            }),
        }
    }

    // -----------------------------------------------------------------------
    // SA2: WatchManager wiring
    // -----------------------------------------------------------------------

    /// Access the watch manager (for remote watch subscriptions).
    pub fn watch_manager(&self) -> &WatchManager {
        &self.watch_manager
    }

    /// Access the watch manager mutably.
    pub fn watch_manager_mut(&mut self) -> &mut WatchManager {
        &mut self.watch_manager
    }

    /// Register a remote watch: a remote watcher wants to know when a local
    /// actor terminates.
    pub fn remote_watch(&mut self, target: ActorId, watcher: ActorId) {
        self.watch_manager.watch(target, watcher);
    }

    /// Remove a remote watch subscription.
    pub fn remote_unwatch(&mut self, target: &ActorId, watcher: &ActorId) {
        self.watch_manager.unwatch(target, watcher);
    }

    /// Called when a local actor terminates. Returns notifications for all
    /// remote watchers that should be sent to their respective nodes.
    ///
    /// **Note:** This must be called explicitly by the integration layer.
    /// It is not yet automatically wired into ractor's actor stop lifecycle.
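    ///
    /// # Example
    ///
    /// A sketch of the integration layer's responsibility, assuming
    /// `stopped_id` is a local actor that just terminated:
    ///
    /// ```rust,ignore
    /// for notification in runtime.notify_terminated(&stopped_id) {
    ///     // Route each WatchNotification to its watcher's node via the transport.
    /// }
    /// ```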
    pub fn notify_terminated(&mut self, terminated: &ActorId) -> Vec<WatchNotification> {
        self.watch_manager.on_terminated(terminated)
    }

    // -----------------------------------------------------------------------
    // SA3: CancelManager wiring
    // -----------------------------------------------------------------------

    /// Access the cancel manager.
    pub fn cancel_manager(&self) -> &CancelManager {
        &self.cancel_manager
    }

    /// Access the cancel manager mutably.
    pub fn cancel_manager_mut(&mut self) -> &mut CancelManager {
        &mut self.cancel_manager
    }

    /// Register a cancellation token for a request (for remote cancel support).
    pub fn register_cancel(&mut self, request_id: String, token: CancellationToken) {
        self.cancel_manager.register(request_id, token);
    }

    /// Process a remote cancellation request.
    pub fn cancel_request(&mut self, request_id: &str) -> CancelResponse {
        self.cancel_manager.cancel(request_id)
    }

    /// Clean up a cancellation token after its request completes normally.
    ///
    /// Should be called when a remote ask/stream/feed completes successfully
    /// to prevent stale tokens from accumulating.
1406    pub fn complete_request(&mut self, request_id: &str) {
1407        self.cancel_manager.remove(request_id);
1408    }
1409
1410    // -----------------------------------------------------------------------
1411    // SA4: NodeDirectory wiring
1412    // -----------------------------------------------------------------------
1413
1414    /// Access the node directory.
1415    pub fn node_directory(&self) -> &NodeDirectory {
1416        &self.node_directory
1417    }
1418
1419    /// Access the node directory mutably.
1420    pub fn node_directory_mut(&mut self) -> &mut NodeDirectory {
1421        &mut self.node_directory
1422    }
1423
1424    /// Register a peer node as connected in the directory.
1425    ///
1426    /// **Post-validation only:** this method assumes the peer has already
1427    /// passed the version handshake. Callers should validate compatibility
1428    /// (via [`handshake_request`](Self::handshake_request) +
1429    /// [`validate_handshake`](dactor::validate_handshake) +
1430    /// [`verify_peer_identity`](dactor::verify_peer_identity)) before calling
1431    /// this method. Direct calls bypass compatibility checks.
1432    ///
1433    /// If the peer already exists, updates its status to `Connected` and
1434    /// preserves the existing address when `address` is `None`.
1435    /// Emits a `ClusterEvent::NodeJoined` if the peer was not previously connected.
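    ///
    /// # Example
    ///
    /// A hedged sketch of the connect/disconnect cycle; `peer` is a `NodeId`
    /// obtained from a validated handshake (validation elided here).
    ///
    /// ```ignore
    /// rt.connect_peer(peer.clone(), Some("10.0.0.7:9000".into()));
    /// assert!(rt.is_peer_connected(&peer));
    ///
    /// // Reconnecting with no address keeps the stored one and does not
    /// // re-emit NodeJoined, since the peer is already connected.
    /// rt.connect_peer(peer.clone(), None);
    ///
    /// rt.disconnect_peer(&peer); // emits ClusterEvent::NodeLeft
    /// assert!(!rt.is_peer_connected(&peer));
    /// ```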
    pub fn connect_peer(&mut self, peer_id: NodeId, address: Option<String>) {
        let was_connected = self.node_directory.is_connected(&peer_id);
        if let Some(existing) = self.node_directory.get_peer(&peer_id) {
            // Preserve existing address if new address is None
            let resolved_address = address.or_else(|| existing.address.clone());
            self.node_directory.remove_peer(&peer_id);
            self.node_directory.add_peer(peer_id.clone(), resolved_address);
        } else {
            self.node_directory.add_peer(peer_id.clone(), address);
        }
        self.node_directory.set_status(&peer_id, PeerStatus::Connected);
        if !was_connected {
            self.cluster_events.emit(dactor::ClusterEvent::NodeJoined(peer_id));
        }
    }

    /// Mark a peer as disconnected.
    ///
    /// Emits a `ClusterEvent::NodeLeft` if the peer was previously connected.
    pub fn disconnect_peer(&mut self, peer_id: &NodeId) {
        let was_connected = self.node_directory.is_connected(peer_id);
        self.node_directory.set_status(peer_id, PeerStatus::Disconnected);
        if was_connected {
            self.cluster_events.emit(dactor::ClusterEvent::NodeLeft(peer_id.clone()));
        }
    }

    /// Check if a peer node is connected.
    pub fn is_peer_connected(&self, peer_id: &NodeId) -> bool {
        self.node_directory.is_connected(peer_id)
    }

    // -----------------------------------------------------------------------
    // JH1-JH2: Actor lifecycle handles
    // -----------------------------------------------------------------------

    /// Wait for an actor to stop.
    ///
    /// Returns `Ok(())` when the actor finishes cleanly, or `Err` if the
    /// actor panicked in `on_stop`. The stop receiver is consumed and removed
    /// from the map.
    ///
    /// Returns `Ok(())` immediately if no stop receiver is stored for this ID
    /// (e.g., actor was already awaited or was not spawned by this runtime).
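    ///
    /// # Example
    ///
    /// A minimal sketch; `actor_id` is the ID returned when the actor was
    /// spawned through this runtime.
    ///
    /// ```ignore
    /// match rt.await_stop(&actor_id).await {
    ///     Ok(()) => println!("actor stopped cleanly"),
    ///     Err(reason) => eprintln!("actor stopped abnormally: {reason}"),
    /// }
    /// ```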
    pub async fn await_stop(&self, actor_id: &ActorId) -> Result<(), String> {
        let rx = {
            let mut receivers = self.stop_receivers.lock().unwrap();
            receivers.remove(actor_id)
        };
        match rx {
            Some(rx) => rx
                .await
                .map_err(|_| "stop notifier dropped".to_string())
                .and_then(|r| r),
            None => Ok(()),
        }
    }

    /// Wait for all spawned actors to stop.
    ///
    /// Drains all stored stop receivers and awaits them all. Returns the first
    /// error encountered, but always waits for every actor to finish.
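    ///
    /// # Example
    ///
    /// A hedged shutdown sketch: signal the actors to stop through whatever
    /// mechanism the application uses (elided), then wait for all of them.
    ///
    /// ```ignore
    /// // ... ask every actor to stop ...
    /// if let Err(first_error) = rt.await_all().await {
    ///     eprintln!("at least one actor stopped abnormally: {first_error}");
    /// }
    /// ```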
    pub async fn await_all(&self) -> Result<(), String> {
        let receivers: Vec<_> = {
            let mut map = self.stop_receivers.lock().unwrap();
            map.drain().collect()
        };
        let mut first_error = None;
        for (_, rx) in receivers {
            let result = rx
                .await
                .map_err(|e| format!("stop notifier dropped: {e}"))
                .and_then(|r| r);
            if let Err(e) = result {
                if first_error.is_none() {
                    first_error = Some(e);
                }
            }
        }
        match first_error {
            Some(e) => Err(e),
            None => Ok(()),
        }
    }

    /// Remove completed stop receivers from the map.
    ///
    /// Call periodically to prevent stale entries from accumulating
    /// for actors that stopped without being awaited.
    pub fn cleanup_finished(&self) {
        let mut receivers = self.stop_receivers.lock().unwrap();
        receivers.retain(|_, rx| {
            matches!(rx.try_recv(), Err(tokio::sync::oneshot::error::TryRecvError::Empty))
        });
    }

    /// Number of actors with stored stop receivers.
    ///
    /// Note: includes receivers for actors that have already stopped but
    /// haven't been awaited or cleaned up. Call `cleanup_finished()` first
    /// for an accurate count of running actors.
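    ///
    /// # Example
    ///
    /// A small sketch of the intended pairing with `cleanup_finished()`:
    ///
    /// ```ignore
    /// rt.cleanup_finished(); // drop receivers whose actors already stopped
    /// let running = rt.active_handle_count();
    /// println!("{running} actors still running");
    /// ```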
    pub fn active_handle_count(&self) -> usize {
        self.stop_receivers.lock().unwrap().len()
    }
}

impl Default for RactorRuntime {
    fn default() -> Self {
        Self::new()
    }
}

// ---------------------------------------------------------------------------
// NA10: SystemMessageRouter for ractor
// ---------------------------------------------------------------------------

// NOTE: System messages routed here update the **native system actors** (the
// mailbox-based ractor actors spawned by `start_system_actors()`). The runtime
// also keeps plain struct system actors (`self.watch_manager`, etc.) for the
// backward-compatible sync API. This dual-state pattern is intentional; see
// the "Dual struct+actor pattern" design decision in progress.md.

#[async_trait::async_trait]
impl dactor::system_router::SystemMessageRouter for RactorRuntime {
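    /// Decode and dispatch one inbound system-message envelope to the native
    /// system actors, returning a `RoutingOutcome` for the transport to act on.
    ///
    /// # Example
    ///
    /// A hedged sketch of a call site; `envelope` is a decoded `WireEnvelope`
    /// and `reply_to_sender` is a stand-in for the transport's response path.
    ///
    /// ```ignore
    /// use dactor::system_router::{RoutingOutcome, SystemMessageRouter};
    ///
    /// match rt.route_system_envelope(envelope).await {
    ///     Ok(RoutingOutcome::Acknowledged) => {}
    ///     Ok(RoutingOutcome::SpawnCompleted { request_id, actor_id }) => {
    ///         reply_to_sender(request_id, Ok(actor_id));
    ///     }
    ///     Ok(RoutingOutcome::SpawnFailed { request_id, error }) => {
    ///         reply_to_sender(request_id, Err(error));
    ///     }
    ///     Ok(outcome) => { /* cancel outcomes, etc. */ let _ = outcome; }
    ///     Err(_e) => { /* surface the routing error to the transport */ }
    /// }
    /// ```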
    async fn route_system_envelope(
        &self,
        envelope: dactor::remote::WireEnvelope,
    ) -> Result<dactor::system_router::RoutingOutcome, dactor::system_router::RoutingError> {
        use dactor::system_actors::*;
        use dactor::system_router::{RoutingError, RoutingOutcome};

        dactor::system_router::validate_system_message_type(&envelope.message_type)?;

        let refs = self
            .system_actors
            .as_ref()
            .ok_or_else(|| RoutingError::new("system actors not started"))?;

        match envelope.message_type.as_str() {
            SYSTEM_MSG_TYPE_SPAWN => {
                let request = dactor::proto::decode_spawn_request(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode SpawnRequest: {e}")))?;

                let req_id = request.request_id.clone();
                let (tx, rx) = tokio::sync::oneshot::channel();
                refs.spawn_manager
                    .cast(crate::system_actors::SpawnManagerMsg::HandleRequest {
                        request,
                        reply: tx,
                    })
                    .map_err(|e| RoutingError::new(format!("SpawnManager mailbox: {e}")))?;

                let result = rx
                    .await
                    .map_err(|_| RoutingError::new("SpawnManager reply dropped"))?;

                match result {
                    Ok((actor_id, _actor)) => Ok(RoutingOutcome::SpawnCompleted {
                        request_id: req_id,
                        actor_id,
                    }),
                    Err(SpawnResponse::Failure { request_id, error }) => {
                        Ok(RoutingOutcome::SpawnFailed { request_id, error })
                    }
                    Err(SpawnResponse::Success { .. }) => {
                        unreachable!("spawn errors always wrap SpawnResponse::Failure")
                    }
                }
            }

            SYSTEM_MSG_TYPE_WATCH => {
                let request = dactor::proto::decode_watch_request(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode WatchRequest: {e}")))?;

                refs.watch_manager
                    .cast(crate::system_actors::WatchManagerMsg::Watch {
                        target: request.target,
                        watcher: request.watcher,
                    })
                    .map_err(|e| RoutingError::new(format!("WatchManager mailbox: {e}")))?;

                Ok(RoutingOutcome::Acknowledged)
            }

            SYSTEM_MSG_TYPE_UNWATCH => {
                let request = dactor::proto::decode_unwatch_request(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode UnwatchRequest: {e}")))?;

                refs.watch_manager
                    .cast(crate::system_actors::WatchManagerMsg::Unwatch {
                        target: request.target,
                        watcher: request.watcher,
                    })
                    .map_err(|e| RoutingError::new(format!("WatchManager mailbox: {e}")))?;

                Ok(RoutingOutcome::Acknowledged)
            }

            SYSTEM_MSG_TYPE_CANCEL => {
                let request = dactor::proto::decode_cancel_request(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode CancelRequest: {e}")))?;

                let request_id = request
                    .request_id
                    .ok_or_else(|| RoutingError::new("CancelRequest missing request_id"))?;

                let (tx, rx) = tokio::sync::oneshot::channel();
                refs.cancel_manager
                    .cast(crate::system_actors::CancelManagerMsg::Cancel {
                        request_id,
                        reply: tx,
                    })
                    .map_err(|e| RoutingError::new(format!("CancelManager mailbox: {e}")))?;

                let response = rx
                    .await
                    .map_err(|_| RoutingError::new("CancelManager reply dropped"))?;

                match response {
                    CancelResponse::Acknowledged => Ok(RoutingOutcome::CancelAcknowledged),
                    CancelResponse::NotFound { reason } => {
                        Ok(RoutingOutcome::CancelNotFound { reason })
                    }
                }
            }

            SYSTEM_MSG_TYPE_CONNECT_PEER => {
                let (peer_id, address) = dactor::proto::decode_connect_peer(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode ConnectPeer: {e}")))?;

                refs.node_directory
                    .cast(crate::system_actors::NodeDirectoryMsg::ConnectPeer {
                        peer_id,
                        address,
                    })
                    .map_err(|e| RoutingError::new(format!("NodeDirectory mailbox: {e}")))?;

                Ok(RoutingOutcome::Acknowledged)
            }

            SYSTEM_MSG_TYPE_DISCONNECT_PEER => {
                let peer_id = dactor::proto::decode_disconnect_peer(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode DisconnectPeer: {e}")))?;

                refs.node_directory
                    .cast(crate::system_actors::NodeDirectoryMsg::DisconnectPeer { peer_id })
                    .map_err(|e| RoutingError::new(format!("NodeDirectory mailbox: {e}")))?;

                Ok(RoutingOutcome::Acknowledged)
            }
            // validate_system_message_type() already rejected unknown types above,
            // so this branch only triggers if a new system-message constant is
            // added without a matching arm here. Because the match is over
            // strings, the compiler cannot catch that gap; keep the constant
            // list and this match in sync.
            other => Err(RoutingError::new(format!(
                "unhandled system message type: {other}"
            ))),
        }
    }
}