// dactor_ractor/runtime.rs
1//! V0.2 ractor adapter runtime for the dactor actor framework.
2//!
3//! Bridges dactor's `Actor`/`Handler<M>`/`ActorRef<A>` API with ractor's
4//! single-message-type `ractor::Actor` trait using type-erased dispatch.
5
6use std::collections::HashMap;
7use std::marker::PhantomData;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Arc, Mutex};
10use std::time::Duration;
11
12use futures::FutureExt;
13use tokio_util::sync::CancellationToken;
14
15use dactor::actor::{
16    Actor, ActorContext, ActorError, ActorRef, AskReply, ReduceHandler, Handler, ExpandHandler,
17    TransformHandler,
18};
19use dactor::dead_letter::{DeadLetterEvent, DeadLetterHandler, DeadLetterReason};
20use dactor::dispatch::{AskDispatch, Dispatch, ReduceDispatch, ExpandDispatch, TransformDispatch, TypedDispatch};
21use dactor::errors::{ActorSendError, ErrorAction, RuntimeError};
22use dactor::interceptor::{
23    Disposition, DropObserver, InboundContext, InboundInterceptor, OutboundInterceptor, Outcome,
24    SendMode,
25};
26use dactor::mailbox::MailboxConfig;
27use dactor::message::{Headers, Message, RuntimeHeaders};
28use dactor::node::{ActorId, NodeId};
29use dactor::runtime_support::{
30    spawn_reduce_batched_drain, spawn_reduce_drain, spawn_transform_drain,
31    wrap_batched_stream_with_interception, wrap_stream_with_interception, OutboundPipeline,
32};
33use dactor::stream::{
34    BatchConfig, BatchReader, BatchWriter, BoxStream, StreamReceiver, StreamSender,
35};
36use dactor::supervision::ChildTerminated;
37use dactor::system_actors::{
38    CancelManager, CancelResponse, NodeDirectory, PeerStatus, SpawnManager, SpawnRequest,
39    SpawnResponse, WatchManager, WatchNotification,
40};
41use dactor::type_registry::TypeRegistry;
42
43use crate::cluster::RactorClusterEvents;
44
45// ---------------------------------------------------------------------------
46// Watch registry
47// ---------------------------------------------------------------------------
48
/// A type-erased entry in the watch registry.
struct WatchEntry {
    /// Identity of the watching actor; used to purge its entries when the
    /// watcher itself stops (see `post_stop`).
    watcher_id: ActorId,
    /// Closure that delivers a [`ChildTerminated`] to the watcher actor.
    notify: Box<dyn Fn(ChildTerminated) + Send + Sync>,
}

/// Shared watch registry mapping watched actor ID → list of watcher entries.
///
/// Guarded by a `std::sync::Mutex`; in this file the lock is only taken in
/// short synchronous scopes (no `.await` while held).
type WatcherMap = Arc<Mutex<HashMap<ActorId, Vec<WatchEntry>>>>;
58
59// ---------------------------------------------------------------------------
60// Ractor wrapper actor
61// ---------------------------------------------------------------------------
62
/// The ractor message type wrapping a type-erased dactor dispatch envelope.
///
/// Boxing `dyn Dispatch<A>` lets ractor's single-message-type mailbox carry
/// every `Handler<M>` message type the dactor actor implements.
struct DactorMsg<A: Actor>(Box<dyn Dispatch<A>>);
65
/// The ractor `Actor` implementation that bridges to a dactor `Actor`.
///
/// Stateless marker type: the actual dactor actor lives in the ractor state
/// (`RactorActorState`). `PhantomData<fn() -> A>` ties the type parameter
/// without making the wrapper's auto traits depend on `A`.
struct RactorDactorActor<A: Actor> {
    _phantom: PhantomData<fn() -> A>,
}
70
/// State held by the ractor actor, containing the dactor `Actor` instance.
struct RactorActorState<A: Actor> {
    // The user's dactor actor; handlers run against it via `dispatch`.
    actor: A,
    // Per-actor context passed (mutably) to every handler invocation.
    ctx: ActorContext,
    // Inbound interceptor chain, run in order for every message.
    interceptors: Vec<Box<dyn InboundInterceptor>>,
    // Shared watch registry; watchers are notified from `post_stop`.
    watchers: WatcherMap,
    // Set on abnormal termination (handler/on_stop panic) and reported to
    // watchers via `ChildTerminated::reason`.
    stop_reason: Option<String>,
    // Shared dead-letter sink; `None` inside the outer `Arc` disables it.
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
    /// Notified when the actor stops (for await_stop).
    stop_notifier: Option<tokio::sync::oneshot::Sender<Result<(), String>>>,
}
82
/// Arguments passed to the ractor actor at spawn time.
struct RactorSpawnArgs<A: Actor> {
    // Constructor arguments forwarded to `A::create` in `pre_start`.
    args: A::Args,
    // Dependencies forwarded to `A::create` in `pre_start`.
    deps: A::Deps,
    // Identity assigned by the runtime.
    actor_id: ActorId,
    // Human-readable name, stored on the `ActorContext`.
    actor_name: String,
    // Inbound interceptors installed for this actor.
    interceptors: Vec<Box<dyn InboundInterceptor>>,
    // Shared watch registry handle.
    watchers: WatcherMap,
    // Shared dead-letter sink.
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
    // One-shot resolved in `post_stop` so `await_stop()` callers wake up.
    stop_notifier: Option<tokio::sync::oneshot::Sender<Result<(), String>>>,
}
94
impl<A: Actor + 'static> ractor::Actor for RactorDactorActor<A> {
    type Msg = DactorMsg<A>;
    type State = RactorActorState<A>;
    type Arguments = RactorSpawnArgs<A>;

    /// Builds the dactor actor from its spawn-time args/deps, runs its
    /// `on_start` lifecycle hook, and stores it (plus per-actor plumbing)
    /// as the ractor state.
    ///
    /// NOTE(review): `on_start` is not panic-guarded here, unlike `handle`
    /// and `post_stop` — a panic in it aborts `pre_start`. Confirm intended.
    async fn pre_start(
        &self,
        _myself: ractor::ActorRef<Self::Msg>,
        args: Self::Arguments,
    ) -> Result<Self::State, ractor::ActorProcessingErr> {
        let mut actor = A::create(args.args, args.deps);
        let mut ctx = ActorContext::new(args.actor_id, args.actor_name);
        actor.on_start(&mut ctx).await;
        Ok(RactorActorState {
            actor,
            ctx,
            interceptors: args.interceptors,
            watchers: args.watchers,
            stop_reason: None,
            dead_letter_handler: args.dead_letter_handler,
            stop_notifier: args.stop_notifier,
        })
    }

    /// Processes one type-erased dispatch envelope:
    /// 1. runs the inbound interceptor chain (may delay, drop, or reject),
    /// 2. honours the message's cancellation token (both before dispatch and
    ///    raced against the in-flight handler future),
    /// 3. dispatches to the dactor handler under `catch_unwind`,
    /// 4. runs `on_complete` interceptors, sends the reply, and applies the
    ///    actor's `on_error` policy if the handler panicked.
    async fn handle(
        &self,
        myself: ractor::ActorRef<Self::Msg>,
        message: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ractor::ActorProcessingErr> {
        let dispatch = message.0;

        // Capture metadata before dispatch consumes the message
        let send_mode = dispatch.send_mode();
        let message_type = dispatch.message_type_name();

        state.ctx.send_mode = Some(send_mode);
        state.ctx.headers = Headers::new();

        // Run inbound interceptor pipeline
        let runtime_headers = RuntimeHeaders::new();
        let mut headers = Headers::new();
        let mut total_delay = Duration::ZERO;
        let mut rejection: Option<(String, Disposition)> = None;

        {
            let ictx = InboundContext {
                actor_id: state.ctx.actor_id.clone(),
                actor_name: &state.ctx.actor_name,
                message_type,
                send_mode,
                remote: false,
                origin_node: None,
            };

            for interceptor in &state.interceptors {
                match interceptor.on_receive(
                    &ictx,
                    &runtime_headers,
                    &mut headers,
                    dispatch.message_any(),
                ) {
                    Disposition::Continue => {}
                    // Delays accumulate across interceptors; applied once below.
                    Disposition::Delay(d) => {
                        total_delay += d;
                    }
                    // First terminal disposition wins; remaining interceptors
                    // are skipped.
                    disp @ (Disposition::Drop | Disposition::Reject(_) | Disposition::Retry(_)) => {
                        rejection = Some((interceptor.name().to_string(), disp));
                        break;
                    }
                }
            }
        }

        // If rejected/dropped/retry, propagate proper error to caller
        if let Some((interceptor_name, disposition)) = rejection {
            if matches!(disposition, Disposition::Drop) {
                if let Some(ref handler) = *state.dead_letter_handler {
                    let event = DeadLetterEvent {
                        target_id: state.ctx.actor_id.clone(),
                        target_name: Some(state.ctx.actor_name.clone()),
                        message_type,
                        send_mode,
                        reason: DeadLetterReason::DroppedByInterceptor {
                            interceptor: interceptor_name.clone(),
                        },
                        message: None,
                    };
                    // A panicking dead-letter handler must not kill the actor task.
                    let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                        handler.on_dead_letter(event);
                    }));
                }
            }
            dispatch.reject(disposition, &interceptor_name);
            return Ok(());
        }

        if !total_delay.is_zero() {
            tokio::time::sleep(total_delay).await;
        }

        // Copy interceptor-populated headers to ActorContext so handler can access them
        state.ctx.headers = headers;

        // Propagate cancellation token to context
        let cancel_token = dispatch.cancel_token();
        state.ctx.set_cancellation_token(cancel_token.clone());

        // Check if already cancelled before dispatching
        if let Some(ref token) = cancel_token {
            if token.is_cancelled() {
                dispatch.cancel();
                state.ctx.set_cancellation_token(None);
                return Ok(());
            }
        }

        // Dispatch with panic catching and cancellation racing
        let result = if let Some(ref token) = cancel_token {
            let dispatch_fut =
                std::panic::AssertUnwindSafe(dispatch.dispatch(&mut state.actor, &mut state.ctx))
                    .catch_unwind();
            // `biased` so a completed dispatch is preferred over a
            // simultaneously-fired cancellation.
            tokio::select! {
                biased;
                r = dispatch_fut => r,
                _ = token.cancelled() => {
                    // In-flight cancellation: dispatch_fut is dropped, which drops
                    // reply_tx inside it. Caller's AskReply sees channel closed.
                    // Pre-dispatch cancellation (above) sends RuntimeError::Cancelled.
                    state.ctx.set_cancellation_token(None);
                    return Ok(());
                }
            }
        } else {
            std::panic::AssertUnwindSafe(dispatch.dispatch(&mut state.actor, &mut state.ctx))
                .catch_unwind()
                .await
        };

        state.ctx.set_cancellation_token(None);

        // Build context for on_complete
        let ictx = InboundContext {
            actor_id: state.ctx.actor_id.clone(),
            actor_name: &state.ctx.actor_name,
            message_type,
            send_mode,
            remote: false,
            origin_node: None,
        };

        match result {
            Ok(dispatch_result) => {
                // Asks expose the reply value to interceptors; everything else
                // is reported as a plain tell-style success.
                let outcome = match (&dispatch_result.reply, send_mode) {
                    (Some(reply), SendMode::Ask) => Outcome::AskSuccess {
                        reply: reply.as_ref(),
                    },
                    _ => Outcome::TellSuccess,
                };

                for interceptor in &state.interceptors {
                    interceptor.on_complete(&ictx, &runtime_headers, &state.ctx.headers, &outcome);
                }

                // Send reply to caller AFTER interceptors have seen it
                dispatch_result.send_reply();
            }
            Err(_panic) => {
                // Handler panicked: consult the actor's error policy.
                let error = ActorError::internal("handler panicked");
                let action = state.actor.on_error(&error);

                let outcome = Outcome::HandlerError { error };
                for interceptor in &state.interceptors {
                    interceptor.on_complete(&ictx, &runtime_headers, &state.ctx.headers, &outcome);
                }

                match action {
                    ErrorAction::Resume => {
                        // Actor resumes — do NOT set stop_reason
                    }
                    ErrorAction::Stop | ErrorAction::Escalate => {
                        state.stop_reason = Some("handler panicked".into());
                        myself.stop(None);
                    }
                    ErrorAction::Restart => {
                        tracing::warn!("Restart not fully implemented, treating as Resume");
                    }
                }
            }
        }

        Ok(())
    }

    /// Termination path: runs `on_stop` (panic-guarded), notifies watchers
    /// with a [`ChildTerminated`], scrubs this actor out of the watch
    /// registry, and resolves the `await_stop()` one-shot.
    async fn post_stop(
        &self,
        _myself: ractor::ActorRef<Self::Msg>,
        state: &mut Self::State,
    ) -> Result<(), ractor::ActorProcessingErr> {
        // Reset context to lifecycle semantics before on_stop
        state.ctx.send_mode = None;
        state.ctx.headers = Headers::new();
        state.ctx.set_cancellation_token(None);

        // Run on_stop with panic catching so panics propagate as errors
        // through await_stop() instead of aborting the ractor task.
        let on_stop_panicked =
            std::panic::AssertUnwindSafe(state.actor.on_stop())
                .catch_unwind()
                .await
                .is_err();
        if on_stop_panicked && state.stop_reason.is_none() {
            state.stop_reason = Some("actor panicked in on_stop".to_string());
        }

        // Notify all watchers that this actor has terminated.
        let actor_id = state.ctx.actor_id.clone();
        let actor_name = state.ctx.actor_name.clone();
        let entries = {
            let mut watchers = state.watchers.lock().unwrap();
            // Remove entries where this actor is the TARGET (watchers of this actor)
            let target_entries = watchers.remove(&actor_id).unwrap_or_default();
            // Also clean up entries where this actor is the WATCHER (prevent leak)
            for entries in watchers.values_mut() {
                entries.retain(|e| e.watcher_id != actor_id);
            }
            watchers.retain(|_, v| !v.is_empty());
            target_entries
        };
        if !entries.is_empty() {
            let notification = ChildTerminated {
                child_id: actor_id,
                child_name: actor_name,
                reason: state.stop_reason.clone(),
            };
            for entry in &entries {
                (entry.notify)(notification.clone());
            }
        }

        // Notify await_stop() waiters — track on_stop panic independently
        // from stop_reason (which may already be set by a handler panic).
        if let Some(tx) = state.stop_notifier.take() {
            let result = if on_stop_panicked {
                Err("actor panicked in on_stop".to_string())
            } else {
                Ok(())
            };
            let _ = tx.send(result);
        }

        Ok(())
    }
}
349
350// ---------------------------------------------------------------------------
351// RactorActorRef — dactor ActorRef backed by ractor
352// ---------------------------------------------------------------------------
353
/// A dactor `ActorRef` backed by a ractor `ActorRef`.
///
/// Messages are delivered through ractor's mailbox as type-erased dispatch
/// envelopes, enabling multiple `Handler<M>` impls per actor.
///
/// When configured with [`MailboxConfig::Bounded`], a bounded `mpsc` channel
/// sits in front of the ractor actor, providing backpressure control at the
/// dactor level while ractor's internal mailbox remains unbounded.
pub struct RactorActorRef<A: Actor> {
    // Stable identity of the target actor.
    id: ActorId,
    // Human-readable name (used in Debug output and dead-letter events).
    name: String,
    // Underlying ractor handle that envelopes are ultimately cast to.
    inner: ractor::ActorRef<DactorMsg<A>>,
    // Bounded front channel; `Some` only for bounded mailbox configs.
    bounded_tx: Option<BoundedMailboxSender<DactorMsg<A>>>,
    // Outbound interceptors applied on every send through this ref.
    outbound_interceptors: Arc<Vec<Box<dyn OutboundInterceptor>>>,
    // Observer for dropped messages, wired into the outbound pipeline.
    drop_observer: Option<Arc<dyn DropObserver>>,
    // Shared dead-letter sink for undeliverable messages (`None` disables).
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
}
371
372use dactor::runtime_support::BoundedMailboxSender;
373
374impl<A: Actor> Clone for RactorActorRef<A> {
375    fn clone(&self) -> Self {
376        Self {
377            id: self.id.clone(),
378            name: self.name.clone(),
379            inner: self.inner.clone(),
380            bounded_tx: self.bounded_tx.clone(),
381            outbound_interceptors: self.outbound_interceptors.clone(),
382            drop_observer: self.drop_observer.clone(),
383            dead_letter_handler: self.dead_letter_handler.clone(),
384        }
385    }
386}
387
388impl<A: Actor> std::fmt::Debug for RactorActorRef<A> {
389    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
390        write!(f, "RactorActorRef({}, {:?})", self.name, self.id)
391    }
392}
393
394impl<A: Actor + 'static> RactorActorRef<A> {
395    fn outbound_pipeline(&self) -> OutboundPipeline {
396        OutboundPipeline {
397            interceptors: self.outbound_interceptors.clone(),
398            drop_observer: self.drop_observer.clone(),
399            target_id: self.id.clone(),
400            target_name: self.name.clone(),
401        }
402    }
403
404    fn notify_dead_letter(
405        &self,
406        message_type: &'static str,
407        send_mode: SendMode,
408        reason: DeadLetterReason,
409    ) {
410        if let Some(ref handler) = *self.dead_letter_handler {
411            let event = DeadLetterEvent {
412                target_id: self.id.clone(),
413                target_name: Some(self.name.clone()),
414                message_type,
415                send_mode,
416                reason,
417                message: None,
418            };
419            let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
420                handler.on_dead_letter(event);
421            }));
422        }
423    }
424
425    /// Send a dispatch envelope through the bounded channel (if configured)
426    /// or directly to the ractor actor.
427    fn send_dispatch(&self, dispatch: Box<dyn Dispatch<A>>) -> Result<(), ActorSendError> {
428        if let Some(ref btx) = self.bounded_tx {
429            btx.try_send(DactorMsg(dispatch))
430        } else {
431            self.inner
432                .cast(DactorMsg(dispatch))
433                .map_err(|e| ActorSendError(e.to_string()))
434        }
435    }
436}
437
impl<A: Actor + 'static> ActorRef<A> for RactorActorRef<A> {
    fn id(&self) -> ActorId {
        self.id.clone()
    }

    fn name(&self) -> String {
        self.name.clone()
    }

    /// Alive means the ractor actor is starting/running/upgrading AND, when a
    /// bounded front channel exists, that channel is still open.
    fn is_alive(&self) -> bool {
        let inner_alive = matches!(
            self.inner.get_status(),
            ractor::ActorStatus::Running
                | ractor::ActorStatus::Starting
                | ractor::ActorStatus::Upgrading
        );
        if let Some(ref btx) = self.bounded_tx {
            !btx.is_closed() && inner_alive
        } else {
            inner_alive
        }
    }

    /// Queue depth of the bounded front channel. Always 0 for unbounded
    /// mailboxes — ractor's internal queue depth is not observable here.
    fn pending_messages(&self) -> usize {
        if let Some(ref btx) = self.bounded_tx {
            btx.pending()
        } else {
            0
        }
    }

    fn stop(&self) {
        self.inner.stop(None);
    }

    /// Fire-and-forget send. A terminal outbound disposition (drop/reject/
    /// retry) still returns `Ok(())` — tell has no error channel for
    /// interceptor decisions; only delivery failures surface as `Err`.
    fn tell<M>(&self, msg: M) -> Result<(), ActorSendError>
    where
        A: Handler<M>,
        M: Message<Reply = ()>,
    {
        let pipeline = self.outbound_pipeline();
        let result = pipeline.run_on_send(SendMode::Tell, &msg);
        match result.disposition {
            Disposition::Continue => {}
            Disposition::Delay(_) => {} // Not supported in sync tell
            Disposition::Drop | Disposition::Reject(_) | Disposition::Retry(_) => return Ok(()),
        }

        let dispatch: Box<dyn Dispatch<A>> = Box::new(TypedDispatch { msg });
        self.send_dispatch(dispatch).map_err(|e| {
            // NOTE(review): classifying the failure by substring of the error
            // text is brittle — confirm BoundedMailboxSender's "full" wording
            // is a stable contract.
            let reason = if e.0.contains("full") {
                DeadLetterReason::MailboxFull
            } else {
                DeadLetterReason::ActorStopped
            };
            self.notify_dead_letter(std::any::type_name::<M>(), SendMode::Tell, reason);
            e
        })
    }

    /// Request/response send. Terminal outbound dispositions are reported
    /// through the returned [`AskReply`] (as a pre-resolved error), not as
    /// `Err` from this call, so callers have a single error path.
    fn ask<M>(
        &self,
        msg: M,
        cancel: Option<CancellationToken>,
    ) -> Result<AskReply<M::Reply>, ActorSendError>
    where
        A: Handler<M>,
        M: Message,
    {
        let pipeline = self.outbound_pipeline();
        let result = pipeline.run_on_send(SendMode::Ask, &msg);
        match result.disposition {
            Disposition::Continue => {}
            Disposition::Delay(_) => {} // Not supported in sync context
            Disposition::Drop => {
                let (tx, rx) = tokio::sync::oneshot::channel();
                let _ = tx.send(Err(RuntimeError::ActorNotFound(
                    "message dropped by outbound interceptor".into(),
                )));
                return Ok(AskReply::new(rx));
            }
            Disposition::Reject(reason) => {
                let (tx, rx) = tokio::sync::oneshot::channel();
                let _ = tx.send(Err(RuntimeError::Rejected {
                    interceptor: result.interceptor_name.to_string(),
                    reason,
                }));
                return Ok(AskReply::new(rx));
            }
            Disposition::Retry(retry_after) => {
                let (tx, rx) = tokio::sync::oneshot::channel();
                let _ = tx.send(Err(RuntimeError::RetryAfter {
                    interceptor: result.interceptor_name.to_string(),
                    retry_after,
                }));
                return Ok(AskReply::new(rx));
            }
        }

        let (tx, rx) = tokio::sync::oneshot::channel();
        let dispatch: Box<dyn Dispatch<A>> = Box::new(AskDispatch {
            msg,
            reply_tx: tx,
            cancel,
        });
        self.send_dispatch(dispatch).map_err(|e| {
            // Same brittle substring classification as in `tell` above.
            let reason = if e.0.contains("full") {
                DeadLetterReason::MailboxFull
            } else {
                DeadLetterReason::ActorStopped
            };
            self.notify_dead_letter(std::any::type_name::<M>(), SendMode::Ask, reason);
            e
        })?;
        Ok(AskReply::new(rx))
    }

    /// One request in, a stream of items out. Unlike `ask`, terminal
    /// dispositions here are returned as `Err` because there is no reply
    /// channel to pre-resolve.
    fn expand<M, OutputItem>(
        &self,
        msg: M,
        buffer: usize,
        batch_config: Option<BatchConfig>,
        cancel: Option<CancellationToken>,
    ) -> Result<BoxStream<OutputItem>, ActorSendError>
    where
        A: ExpandHandler<M, OutputItem>,
        M: Send + 'static,
        OutputItem: Send + 'static,
    {
        let pipeline = self.outbound_pipeline();
        let result = pipeline.run_on_send(SendMode::Expand, &msg);
        match result.disposition {
            Disposition::Continue => {}
            Disposition::Delay(_) => {}
            Disposition::Drop => {
                return Err(ActorSendError(
                    "stream dropped by outbound interceptor".into(),
                ));
            }
            Disposition::Reject(reason) => {
                return Err(ActorSendError(format!("stream rejected: {}", reason)));
            }
            Disposition::Retry(_) => {
                return Err(ActorSendError(
                    "stream retry requested by interceptor".into(),
                ));
            }
        }

        // Guard against a zero-capacity channel, which tokio disallows.
        let buffer = buffer.max(1);
        let (tx, mut rx) = tokio::sync::mpsc::channel(buffer);
        let sender = StreamSender::new(tx);
        let dispatch: Box<dyn Dispatch<A>> = Box::new(ExpandDispatch {
            msg,
            sender,
            cancel,
        });
        self.send_dispatch(dispatch)?;

        match batch_config {
            Some(batch_config) => {
                let (batch_tx, batch_rx) = tokio::sync::mpsc::channel::<Vec<OutputItem>>(buffer);
                let reader = BatchReader::new(batch_rx);
                let batch_delay = batch_config.max_delay;
                // Relay task: accumulates items into batches, flushing either
                // when the writer decides (size) or when `max_delay` elapses
                // with a partial batch buffered. Mirrors the relay loop in
                // `transform` below.
                tokio::spawn(async move {
                    let mut writer = BatchWriter::new(batch_tx, batch_config);
                    loop {
                        if writer.buffered_count() > 0 {
                            // Partial batch pending: race next item vs deadline.
                            let deadline = tokio::time::Instant::now() + batch_delay;
                            tokio::select! {
                                biased;
                                item = rx.recv() => match item {
                                    Some(item) => {
                                        if writer.push(item).await.is_err() { break; }
                                    }
                                    None => break,
                                },
                                _ = tokio::time::sleep_until(deadline) => {
                                    if writer.check_deadline().await.is_err() { break; }
                                }
                            }
                        } else {
                            // Nothing buffered: block on the next item only.
                            match rx.recv().await {
                                Some(item) => {
                                    if writer.push(item).await.is_err() {
                                        break;
                                    }
                                }
                                None => break,
                            }
                        }
                    }
                    // Producer finished or consumer gone: emit any leftovers.
                    let _ = writer.flush().await;
                });
                Ok(wrap_batched_stream_with_interception(
                    reader,
                    buffer,
                    pipeline,
                    std::any::type_name::<M>(),
                    SendMode::Expand,
                ))
            }
            None => Ok(wrap_stream_with_interception(
                    rx,
                    buffer,
                    pipeline,
                    std::any::type_name::<M>(),
                    SendMode::Expand,
                )),
        }
    }

    /// A stream of items in, one reply out.
    ///
    /// NOTE(review): unlike tell/ask/expand there is no `run_on_send`
    /// disposition check before dispatch — per-item interception happens in
    /// the spawned drain instead. Confirm that skipping the initial check is
    /// intended.
    fn reduce<InputItem, Reply>(
        &self,
        input: BoxStream<InputItem>,
        buffer: usize,
        batch_config: Option<BatchConfig>,
        cancel: Option<CancellationToken>,
    ) -> Result<AskReply<Reply>, ActorSendError>
    where
        A: ReduceHandler<InputItem, Reply>,
        InputItem: Send + 'static,
        Reply: Send + 'static,
    {
        let buffer = buffer.max(1);
        let (item_tx, item_rx) = tokio::sync::mpsc::channel(buffer);
        let receiver = StreamReceiver::new(item_rx);
        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        let dispatch: Box<dyn Dispatch<A>> = Box::new(ReduceDispatch {
            receiver,
            reply_tx,
            cancel: cancel.clone(),
        });
        self.send_dispatch(dispatch)?;

        // Drain the caller's input stream into the actor's item channel on a
        // background task (batched or item-at-a-time).
        let pipeline = self.outbound_pipeline();
        match batch_config {
            Some(batch_config) => {
                spawn_reduce_batched_drain(
                    input,
                    item_tx,
                    buffer,
                    batch_config,
                    cancel,
                    pipeline,
                    std::any::type_name::<InputItem>(),
                );
            }
            None => {
                spawn_reduce_drain(
                    input,
                    item_tx,
                    cancel,
                    pipeline,
                    std::any::type_name::<InputItem>(),
                );
            }
        }

        Ok(AskReply::new(reply_rx))
    }

    /// A stream in, a stream out. Input is drained to the actor by
    /// `spawn_transform_drain`; output is optionally re-batched by the same
    /// relay-loop pattern used in `expand`.
    fn transform<InputItem, OutputItem>(
        &self,
        input: BoxStream<InputItem>,
        buffer: usize,
        batch_config: Option<BatchConfig>,
        cancel: Option<CancellationToken>,
    ) -> Result<BoxStream<OutputItem>, ActorSendError>
    where
        A: TransformHandler<InputItem, OutputItem>,
        InputItem: Send + 'static,
        OutputItem: Send + 'static,
    {
        let buffer = buffer.max(1);
        let (item_tx, item_rx) = tokio::sync::mpsc::channel(buffer);
        let (output_tx, mut output_rx) = tokio::sync::mpsc::channel(buffer);
        let receiver = StreamReceiver::new(item_rx);
        let sender = StreamSender::new(output_tx);
        let dispatch: Box<dyn Dispatch<A>> = Box::new(TransformDispatch::new(
            receiver,
            sender,
            cancel.clone(),
        ));
        self.send_dispatch(dispatch)?;

        let pipeline = self.outbound_pipeline();
        spawn_transform_drain(
            input,
            item_tx,
            cancel,
            pipeline.clone(),
            std::any::type_name::<InputItem>(),
        );

        match batch_config {
            Some(batch_config) => {
                let (batch_tx, batch_rx) =
                    tokio::sync::mpsc::channel::<Vec<OutputItem>>(buffer);
                let reader = BatchReader::new(batch_rx);
                let batch_delay = batch_config.max_delay;
                // Relay task: same batching strategy as in `expand` — flush on
                // writer policy or when `max_delay` passes with items buffered.
                tokio::spawn(async move {
                    let mut writer = BatchWriter::new(batch_tx, batch_config);
                    loop {
                        if writer.buffered_count() > 0 {
                            // Partial batch pending: race next item vs deadline.
                            let deadline = tokio::time::Instant::now() + batch_delay;
                            tokio::select! {
                                biased;
                                item = output_rx.recv() => match item {
                                    Some(item) => {
                                        if writer.push(item).await.is_err() { break; }
                                    }
                                    None => break,
                                },
                                _ = tokio::time::sleep_until(deadline) => {
                                    if writer.check_deadline().await.is_err() { break; }
                                }
                            }
                        } else {
                            // Nothing buffered: block on the next item only.
                            match output_rx.recv().await {
                                Some(item) => {
                                    if writer.push(item).await.is_err() {
                                        break;
                                    }
                                }
                                None => break,
                            }
                        }
                    }
                    // Emit any leftover partial batch on shutdown.
                    let _ = writer.flush().await;
                });
                Ok(wrap_batched_stream_with_interception(
                    reader,
                    buffer,
                    pipeline,
                    std::any::type_name::<OutputItem>(),
                    SendMode::Transform,
                ))
            }
            None => Ok(wrap_stream_with_interception(
                output_rx,
                buffer,
                pipeline,
                std::any::type_name::<OutputItem>(),
                SendMode::Transform,
            )),
        }
    }
}
787
788// ---------------------------------------------------------------------------
789// SpawnOptions
790// ---------------------------------------------------------------------------
791
/// Options for spawning an actor. Use `..Default::default()` to future-proof
/// against new fields.
pub struct SpawnOptions {
    /// Inbound interceptors to attach to the actor.
    pub interceptors: Vec<Box<dyn InboundInterceptor>>,
    /// Mailbox capacity configuration.
    ///
    /// When [`Bounded`](MailboxConfig::Bounded), a bounded `mpsc` channel is
    /// placed in front of the ractor actor to enforce backpressure.
    pub mailbox: MailboxConfig,
}
806
807impl Default for SpawnOptions {
808    fn default() -> Self {
809        Self {
810            interceptors: Vec::new(),
811            mailbox: MailboxConfig::Unbounded,
812        }
813    }
814}
815
816// ---------------------------------------------------------------------------
817// RactorRuntime
818// ---------------------------------------------------------------------------
819
/// A dactor v0.2 runtime backed by ractor.
///
/// Actors are spawned as real ractor actors via [`ractor::Actor::spawn`].
/// Messages are delivered through ractor's mailbox as type-erased dispatch
/// envelopes, supporting multiple `Handler<M>` impls per actor.
///
/// ## System Actors
///
/// The runtime spawns native ractor system actors when
/// `start_system_actors()` is called (not at construction time):
/// - [`SpawnManagerActor`](crate::system_actors::SpawnManagerActor) — handles remote spawn requests
/// - [`WatchManagerActor`](crate::system_actors::WatchManagerActor) — handles remote watch/unwatch
/// - [`CancelManagerActor`](crate::system_actors::CancelManagerActor) — handles remote cancellation
/// - [`NodeDirectoryActor`](crate::system_actors::NodeDirectoryActor) — tracks peer connections
///
/// System actors are accessible via `system_actor_refs()` for transport routing.
pub struct RactorRuntime {
    /// Identity of this node; embedded in every [`ActorId`] this runtime assigns.
    node_id: NodeId,
    /// Monotonic counter for the `local` half of assigned [`ActorId`]s.
    next_local: Arc<AtomicU64>,
    /// Cluster event broadcast; `connect_peer`/`disconnect_peer` emit
    /// `NodeJoined`/`NodeLeft` through it.
    cluster_events: RactorClusterEvents,
    /// Global outbound interceptors; the `Arc` is cloned into every spawned
    /// actor ref, which is why additions are rejected after first spawn.
    outbound_interceptors: Arc<Vec<Box<dyn OutboundInterceptor>>>,
    /// Observer notified when an outbound interceptor drops a message.
    drop_observer: Option<Arc<dyn DropObserver>>,
    /// Dead-letter handler; shared (via `Arc` clone) with spawned actor refs.
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
    /// Local watch registry: target `ActorId` -> watcher notification entries.
    watchers: WatcherMap,
    /// Plain struct system actors (backward-compatible sync API).
    spawn_manager: SpawnManager,
    watch_manager: WatchManager,
    cancel_manager: CancelManager,
    node_directory: NodeDirectory,
    /// Native ractor system actor refs (lazily started via `start_system_actors()`).
    system_actors: Option<RactorSystemActorRefs>,
    /// Stop notification receivers for await_stop(), keyed by ActorId.
    #[allow(clippy::type_complexity)]
    stop_receivers: Arc<Mutex<HashMap<ActorId, tokio::sync::oneshot::Receiver<Result<(), String>>>>>,
    /// Application version for this node (informational, used in handshake).
    app_version: Option<String>,
}
856
/// References to the native ractor system actors spawned by the runtime.
pub struct RactorSystemActorRefs {
    /// Handles remote spawn requests and factory registrations.
    pub spawn_manager: ractor::ActorRef<crate::system_actors::SpawnManagerMsg>,
    /// Handles remote watch/unwatch subscriptions.
    pub watch_manager: ractor::ActorRef<crate::system_actors::WatchManagerMsg>,
    /// Handles remote cancellation requests.
    pub cancel_manager: ractor::ActorRef<crate::system_actors::CancelManagerMsg>,
    /// Tracks peer connect/disconnect state.
    pub node_directory: ractor::ActorRef<crate::system_actors::NodeDirectoryMsg>,
}
864
865impl RactorRuntime {
866    /// Create a new `RactorRuntime`.
867    ///
868    /// System actors are not spawned until `start_system_actors()` is called.
869    /// This allows the runtime to be constructed outside a tokio context.
870    pub fn new() -> Self {
871        Self::create(NodeId("ractor-node".into()))
872    }
873
874    /// Create a new `RactorRuntime` with a specific node ID.
875    pub fn with_node_id(node_id: NodeId) -> Self {
876        Self::create(node_id)
877    }
878
    /// Shared constructor: builds a runtime with empty state for `node_id`.
    fn create(node_id: NodeId) -> Self {
        Self {
            node_id,
            // Actor-ID counter starts at 1, so local id 0 is never assigned.
            next_local: Arc::new(AtomicU64::new(1)),
            cluster_events: RactorClusterEvents::new(),
            outbound_interceptors: Arc::new(Vec::new()),
            drop_observer: None,
            dead_letter_handler: Arc::new(None),
            watchers: Arc::new(Mutex::new(HashMap::new())),
            // Struct-based system actors; native ones start lazily in
            // `start_system_actors()`.
            spawn_manager: SpawnManager::new(TypeRegistry::new()),
            watch_manager: WatchManager::new(),
            cancel_manager: CancelManager::new(),
            node_directory: NodeDirectory::new(),
            system_actors: None,
            stop_receivers: Arc::new(Mutex::new(HashMap::new())),
            app_version: None,
        }
    }
897
    /// The adapter name for this runtime, used in version handshakes
    /// (see [`handshake_request`](Self::handshake_request)).
    pub const ADAPTER_NAME: &'static str = "ractor";
900
901    /// Set the application version for this node.
902    ///
903    /// This is your application's release version (e.g., "2.3.1"), not the
904    /// dactor framework version. It is included in handshake requests for
905    /// operational visibility during rolling upgrades.
906    pub fn with_app_version(mut self, version: impl Into<String>) -> Self {
907        self.app_version = Some(version.into());
908        self
909    }
910
911    /// Returns the configured application version, if any.
912    pub fn app_version(&self) -> Option<&str> {
913        self.app_version.as_deref()
914    }
915
916    /// Build a [`HandshakeRequest`](dactor::HandshakeRequest) from this
917    /// runtime's current configuration.
918    pub fn handshake_request(&self) -> dactor::HandshakeRequest {
919        dactor::HandshakeRequest::from_runtime(
920            self.node_id.clone(),
921            self.app_version.clone(),
922            Self::ADAPTER_NAME,
923        )
924    }
925
    /// Spawn native ractor system actors for transport routing.
    ///
    /// Must be called from within a tokio runtime context. After this call,
    /// `system_actor_refs()` returns the native actor references.
    ///
    /// NOTE(review): the native SpawnManagerActor is started with a *fresh*
    /// `TypeRegistry::new()`, and `register_factory()` only mirrors into the
    /// native actor when it is already running — so factory registrations made
    /// *before* this call are NOT forwarded, contrary to what earlier docs
    /// claimed. Register factories after calling this, or confirm forwarding
    /// is wired up elsewhere.
    ///
    /// # Panics
    ///
    /// Panics if any of the four system actors fails to spawn.
    pub async fn start_system_actors(&mut self) {
        use crate::system_actors::*;

        let (spawn_ref, _) = ractor::Actor::spawn(
            None, SpawnManagerActor,
            (self.node_id.clone(), TypeRegistry::new(), self.next_local.clone()),
        ).await.expect("failed to spawn SpawnManagerActor");

        let (watch_ref, _) = ractor::Actor::spawn(
            None, WatchManagerActor, (),
        ).await.expect("failed to spawn WatchManagerActor");

        let (cancel_ref, _) = ractor::Actor::spawn(
            None, CancelManagerActor, (),
        ).await.expect("failed to spawn CancelManagerActor");

        let (node_dir_ref, _) = ractor::Actor::spawn(
            None, NodeDirectoryActor, (),
        ).await.expect("failed to spawn NodeDirectoryActor");

        // Publishing the refs flips `system_actor_refs()` from None to Some.
        self.system_actors = Some(RactorSystemActorRefs {
            spawn_manager: spawn_ref,
            watch_manager: watch_ref,
            cancel_manager: cancel_ref,
            node_directory: node_dir_ref,
        });
    }
960
    /// Returns the node ID of this runtime (embedded in every assigned `ActorId`).
    pub fn node_id(&self) -> &NodeId {
        &self.node_id
    }
965
    /// Access the native system actor references for transport routing.
    ///
    /// Returns `None` if `start_system_actors()` has not been called yet.
    pub fn system_actor_refs(&self) -> Option<&RactorSystemActorRefs> {
        self.system_actors.as_ref()
    }
972
    /// Add a global outbound interceptor.
    ///
    /// **Must be called before any actors are spawned.** Panics if actors
    /// already hold references to the interceptor list (i.e., after `spawn()`).
    pub fn add_outbound_interceptor(&mut self, interceptor: Box<dyn OutboundInterceptor>) {
        // `Arc::get_mut` returns None once `spawn_internal` has cloned the
        // Arc into an actor ref, which is what enforces the "before spawn"
        // contract above.
        Arc::get_mut(&mut self.outbound_interceptors)
            .expect("cannot add interceptors after actors are spawned")
            .push(interceptor);
    }
982
    /// Set a global drop observer that is notified whenever an outbound
    /// interceptor returns `Disposition::Drop`.
    ///
    /// Replaces any previously configured observer.
    pub fn set_drop_observer(&mut self, observer: Arc<dyn DropObserver>) {
        self.drop_observer = Some(observer);
    }
988
    /// Set a global dead letter handler. Called whenever a message cannot be
    /// delivered (actor stopped, mailbox full, dropped by inbound interceptor).
    ///
    /// NOTE(review): this replaces the outer `Arc` wholesale; actors spawned
    /// *before* this call keep a clone of the previous `Arc` (see
    /// `spawn_internal`), so set the handler before spawning actors.
    pub fn set_dead_letter_handler(&mut self, handler: Arc<dyn DeadLetterHandler>) {
        self.dead_letter_handler = Arc::new(Some(handler));
    }
994
    /// Access the cluster events subsystem.
    ///
    /// Alias of [`cluster_events`](Self::cluster_events), kept for
    /// backward compatibility.
    pub fn cluster_events_handle(&self) -> &RactorClusterEvents {
        &self.cluster_events
    }
999
    /// Access the cluster events subsystem.
    ///
    /// Equivalent to [`cluster_events_handle`](Self::cluster_events_handle).
    pub fn cluster_events(&self) -> &RactorClusterEvents {
        &self.cluster_events
    }
1004
    /// Spawn an actor with `Deps = ()`, no inbound interceptors, and an
    /// unbounded mailbox.
    pub async fn spawn<A>(&self, name: &str, args: A::Args) -> Result<RactorActorRef<A>, dactor::errors::RuntimeError>
    where
        A: Actor<Deps = ()> + 'static,
    {
        self.spawn_internal::<A>(name, args, (), Vec::new(), MailboxConfig::Unbounded).await
    }
1012
    /// Spawn an actor with explicit dependencies, no inbound interceptors,
    /// and an unbounded mailbox.
    pub async fn spawn_with_deps<A>(&self, name: &str, args: A::Args, deps: A::Deps) -> Result<RactorActorRef<A>, dactor::errors::RuntimeError>
    where
        A: Actor + 'static,
    {
        self.spawn_internal::<A>(name, args, deps, Vec::new(), MailboxConfig::Unbounded).await
    }
1020
    /// Spawn an actor with spawn options (including inbound interceptors and
    /// mailbox config); `Deps` must be `()`.
    pub async fn spawn_with_options<A>(
        &self,
        name: &str,
        args: A::Args,
        options: SpawnOptions,
    ) -> Result<RactorActorRef<A>, dactor::errors::RuntimeError>
    where
        A: Actor<Deps = ()> + 'static,
    {
        self.spawn_internal::<A>(name, args, (), options.interceptors, options.mailbox).await
    }
1033
    /// Shared spawn path: assigns an `ActorId`, spawns the ractor wrapper
    /// actor, optionally installs a bounded-mailbox forwarder, and registers
    /// the stop receiver used by `await_stop()`.
    ///
    /// Returns a `RactorActorRef` carrying the global outbound pipeline
    /// (interceptors, drop observer, dead-letter handler).
    async fn spawn_internal<A>(
        &self,
        name: &str,
        args: A::Args,
        deps: A::Deps,
        interceptors: Vec<Box<dyn InboundInterceptor>>,
        mailbox: MailboxConfig,
    ) -> Result<RactorActorRef<A>, dactor::errors::RuntimeError>
    where
        A: Actor + 'static,
    {
        // Assign this node's next local ID before spawning.
        let local = self.next_local.fetch_add(1, Ordering::SeqCst);
        let actor_id = ActorId {
            node: self.node_id.clone(),
            local,
        };
        let actor_name = name.to_string();

        // One-shot channel the wrapper actor resolves when it stops.
        let (stop_tx, stop_rx) = tokio::sync::oneshot::channel();

        let wrapper = RactorDactorActor::<A> {
            _phantom: PhantomData,
        };
        let spawn_args = RactorSpawnArgs {
            args,
            deps,
            actor_id: actor_id.clone(),
            actor_name: actor_name.clone(),
            interceptors,
            watchers: self.watchers.clone(),
            dead_letter_handler: self.dead_letter_handler.clone(),
            stop_notifier: Some(stop_tx),
        };

        let (actor_ref, _join_handle) = ractor::Actor::spawn(Some(name.to_string()), wrapper, spawn_args)
            .await
            .map_err(|e| dactor::errors::RuntimeError::SpawnFailed(e.to_string()))?;

        // Set up optional bounded mailbox channel
        let bounded_tx = match mailbox {
            MailboxConfig::Bounded { capacity, overflow } => {
                // Bounded mpsc sits in front of ractor's own mailbox; a
                // forwarder task drains it into the actor, and stops once
                // the actor is gone (cast fails).
                let (btx, mut brx) = tokio::sync::mpsc::channel::<DactorMsg<A>>(capacity);
                let fwd_ref = actor_ref.clone();
                tokio::spawn(async move {
                    while let Some(msg) = brx.recv().await {
                        if fwd_ref.cast(msg).is_err() {
                            break;
                        }
                    }
                });
                Some(BoundedMailboxSender::new(btx, overflow))
            }
            MailboxConfig::Unbounded => None,
        };

        // Store stop receiver for await_stop()
        self.stop_receivers.lock().unwrap().insert(actor_id.clone(), stop_rx);

        Ok(RactorActorRef {
            id: actor_id,
            name: actor_name,
            inner: actor_ref,
            bounded_tx,
            outbound_interceptors: self.outbound_interceptors.clone(),
            drop_observer: self.drop_observer.clone(),
            dead_letter_handler: self.dead_letter_handler.clone(),
        })
    }
1102
    /// Register actor `watcher` to be notified when `target_id` terminates.
    ///
    /// The watcher must implement `Handler<ChildTerminated>`. When the target
    /// stops, the runtime delivers a [`ChildTerminated`] message to the watcher.
    pub fn watch<W>(&self, watcher: &RactorActorRef<W>, target_id: ActorId)
    where
        W: Actor + Handler<ChildTerminated> + 'static,
    {
        let watcher_id = watcher.id();
        let watcher_inner = watcher.inner.clone();

        // The closure type-erases W: it boxes the message into a dispatch
        // envelope and casts it to the watcher's ractor mailbox. Failure to
        // cast (watcher already stopped) is logged, not propagated.
        let entry = WatchEntry {
            watcher_id,
            notify: Box::new(move |msg: ChildTerminated| {
                let dispatch: Box<dyn Dispatch<W>> = Box::new(TypedDispatch { msg });
                if watcher_inner.cast(DactorMsg(dispatch)).is_err() {
                    tracing::debug!("watch notification dropped — watcher may have stopped");
                }
            }),
        };

        let mut watchers = self.watchers.lock().unwrap();
        watchers.entry(target_id).or_default().push(entry);
    }
1127
1128    /// Unregister `watcher_id` from notifications about `target_id`.
1129    pub fn unwatch(&self, watcher_id: &ActorId, target_id: &ActorId) {
1130        let mut watchers = self.watchers.lock().unwrap();
1131        if let Some(entries) = watchers.get_mut(target_id) {
1132            entries.retain(|e| &e.watcher_id != watcher_id);
1133            if entries.is_empty() {
1134                watchers.remove(target_id);
1135            }
1136        }
1137    }
1138
1139    // -----------------------------------------------------------------------
1140    // SA1: SpawnManager wiring
1141    // -----------------------------------------------------------------------
1142
    /// Access the struct-based spawn manager (sync API).
    pub fn spawn_manager(&self) -> &SpawnManager {
        &self.spawn_manager
    }
1147
    /// Access the spawn manager mutably (for registering actor factories).
    pub fn spawn_manager_mut(&mut self) -> &mut SpawnManager {
        &mut self.spawn_manager
    }
1152
    /// Register an actor type for remote spawning on this node.
    ///
    /// Registers the factory in both the struct-based SpawnManager and the
    /// native SpawnManagerActor (if started via `start_system_actors()`).
    pub fn register_factory(
        &mut self,
        type_name: impl Into<String>,
        factory: impl Fn(&[u8]) -> Result<Box<dyn std::any::Any + Send>, dactor::remote::SerializationError>
            + Send
            + Sync
            + 'static,
    ) {
        let type_name = type_name.into();
        // Arc-wrapped so the single factory can be shared with both registries.
        let factory = Arc::new(factory);

        // Register in struct-based manager
        self.spawn_manager
            .type_registry_mut()
            .register_factory(type_name.clone(), {
                let f = factory.clone();
                move |bytes: &[u8]| f(bytes)
            });

        // Forward to native actor if started. Note: if the native actor is
        // not yet running, this registration is NOT replayed later by
        // `start_system_actors()`.
        if let Some(ref actors) = self.system_actors {
            // Fire-and-forget: the reply receiver is dropped immediately, so
            // the native actor's acknowledgement is intentionally ignored.
            let (tx, _rx) = tokio::sync::oneshot::channel();
            let f = factory;
            let _ = actors.spawn_manager.cast(
                crate::system_actors::SpawnManagerMsg::RegisterFactory {
                    type_name,
                    factory: Box::new(move |bytes: &[u8]| f(bytes)),
                    reply: tx,
                },
            );
        }
    }
1189
1190    /// Process a remote spawn request.
1191    ///
1192    /// Looks up the actor type in the registry, deserializes Args from bytes,
1193    /// and returns the constructed actor along with its assigned [`ActorId`].
1194    ///
1195    /// The returned `ActorId` is pre-assigned for the remote spawn flow where
1196    /// the runtime controls ID assignment. The caller must use this ID when
1197    /// registering the spawned actor (not via the regular `spawn()` path,
1198    /// which assigns its own IDs).
1199    ///
1200    /// **Note:** Currently uses the simple factory API (`TypeRegistry::create_actor`).
1201    /// For actors with non-trivial `Deps`, use `spawn_manager_mut()` to access
1202    /// `TypeRegistry::create_actor_with_deps()` directly.
1203    ///
1204    /// Returns `Ok((actor_id, actor))` on success, or `Err(SpawnResponse::Failure)`
1205    /// if the type is not found or deserialization fails.
1206    pub fn handle_spawn_request(
1207        &mut self,
1208        request: &SpawnRequest,
1209    ) -> Result<(ActorId, Box<dyn std::any::Any + Send>), SpawnResponse> {
1210        match self.spawn_manager.create_actor(request) {
1211            Ok(actor) => {
1212                let local = self.next_local.fetch_add(1, Ordering::SeqCst);
1213                let actor_id = ActorId {
1214                    node: self.node_id.clone(),
1215                    local,
1216                };
1217                self.spawn_manager.record_spawn(actor_id.clone());
1218                Ok((actor_id, actor))
1219            }
1220            Err(e) => Err(SpawnResponse::Failure {
1221                request_id: request.request_id.clone(),
1222                error: e.to_string(),
1223            }),
1224        }
1225    }
1226
1227    // -----------------------------------------------------------------------
1228    // SA2: WatchManager wiring
1229    // -----------------------------------------------------------------------
1230
    /// Access the struct-based watch manager (for remote watch subscriptions).
    pub fn watch_manager(&self) -> &WatchManager {
        &self.watch_manager
    }
1235
    /// Access the watch manager mutably.
    pub fn watch_manager_mut(&mut self) -> &mut WatchManager {
        &mut self.watch_manager
    }
1240
    /// Register a remote watch: a remote watcher wants to know when a local
    /// actor (`target`) terminates.
    pub fn remote_watch(&mut self, target: ActorId, watcher: ActorId) {
        self.watch_manager.watch(target, watcher);
    }
1246
    /// Remove a remote watch subscription previously added via `remote_watch`.
    pub fn remote_unwatch(&mut self, target: &ActorId, watcher: &ActorId) {
        self.watch_manager.unwatch(target, watcher);
    }
1251
    /// Called when a local actor terminates. Returns notifications for all
    /// remote watchers that should be sent to their respective nodes.
    ///
    /// **Note:** This must be called explicitly by the integration layer.
    /// It is not yet automatically wired into ractor's actor stop lifecycle.
    pub fn notify_terminated(&mut self, terminated: &ActorId) -> Vec<WatchNotification> {
        self.watch_manager.on_terminated(terminated)
    }
1260
1261    // -----------------------------------------------------------------------
1262    // SA3: CancelManager wiring
1263    // -----------------------------------------------------------------------
1264
    /// Access the struct-based cancel manager.
    pub fn cancel_manager(&self) -> &CancelManager {
        &self.cancel_manager
    }
1269
    /// Access the cancel manager mutably.
    pub fn cancel_manager_mut(&mut self) -> &mut CancelManager {
        &mut self.cancel_manager
    }
1274
    /// Register a cancellation token for a request (for remote cancel support).
    /// Pair with `complete_request` to avoid leaking tokens.
    pub fn register_cancel(&mut self, request_id: String, token: CancellationToken) {
        self.cancel_manager.register(request_id, token);
    }
1279
    /// Process a remote cancellation request; the response reports whether
    /// a matching token was found.
    pub fn cancel_request(&mut self, request_id: &str) -> CancelResponse {
        self.cancel_manager.cancel(request_id)
    }
1284
    /// Clean up a cancellation token after its request completes normally.
    ///
    /// Should be called when a remote ask/stream/feed completes successfully
    /// to prevent stale tokens from accumulating.
    pub fn complete_request(&mut self, request_id: &str) {
        self.cancel_manager.remove(request_id);
    }
1292
1293    // -----------------------------------------------------------------------
1294    // SA4: NodeDirectory wiring
1295    // -----------------------------------------------------------------------
1296
    /// Access the struct-based node directory.
    pub fn node_directory(&self) -> &NodeDirectory {
        &self.node_directory
    }
1301
    /// Access the node directory mutably.
    pub fn node_directory_mut(&mut self) -> &mut NodeDirectory {
        &mut self.node_directory
    }
1306
1307    /// Register a peer node as connected in the directory.
1308    ///
1309    /// **Post-validation only:** this method assumes the peer has already
1310    /// passed the version handshake. Callers should validate compatibility
1311    /// (via [`handshake_request`](Self::handshake_request) +
1312    /// [`validate_handshake`](dactor::validate_handshake) +
1313    /// [`verify_peer_identity`](dactor::verify_peer_identity)) before calling
1314    /// this method. Direct calls bypass compatibility checks.
1315    ///
1316    /// If the peer already exists, updates its status to `Connected` and
1317    /// preserves the existing address when `address` is `None`.
1318    /// Emits a `ClusterEvent::NodeJoined` if the peer was not previously connected.
1319    pub fn connect_peer(&mut self, peer_id: NodeId, address: Option<String>) {
1320        let was_connected = self.node_directory.is_connected(&peer_id);
1321        if let Some(existing) = self.node_directory.get_peer(&peer_id) {
1322            // Preserve existing address if new address is None
1323            let resolved_address = address.or_else(|| existing.address.clone());
1324            self.node_directory.remove_peer(&peer_id);
1325            self.node_directory.add_peer(peer_id.clone(), resolved_address);
1326        } else {
1327            self.node_directory.add_peer(peer_id.clone(), address);
1328        }
1329        self.node_directory.set_status(&peer_id, PeerStatus::Connected);
1330        if !was_connected {
1331            self.cluster_events.emit(dactor::ClusterEvent::NodeJoined(peer_id));
1332        }
1333    }
1334
1335    /// Mark a peer as disconnected.
1336    ///
1337    /// Emits a `ClusterEvent::NodeLeft` if the peer was previously connected.
1338    pub fn disconnect_peer(&mut self, peer_id: &NodeId) {
1339        let was_connected = self.node_directory.is_connected(peer_id);
1340        self.node_directory.set_status(peer_id, PeerStatus::Disconnected);
1341        if was_connected {
1342            self.cluster_events.emit(dactor::ClusterEvent::NodeLeft(peer_id.clone()));
1343        }
1344    }
1345
    /// Check if a peer node is currently marked connected.
    pub fn is_peer_connected(&self, peer_id: &NodeId) -> bool {
        self.node_directory.is_connected(peer_id)
    }
1350
1351    // -----------------------------------------------------------------------
1352    // JH1-JH2: Actor lifecycle handles
1353    // -----------------------------------------------------------------------
1354
1355    /// Wait for an actor to stop.
1356    ///
1357    /// Returns `Ok(())` when the actor finishes cleanly, or `Err` if the
1358    /// actor panicked in `on_stop`. The stop receiver is consumed and removed
1359    /// from the map.
1360    ///
1361    /// Returns `Ok(())` immediately if no stop receiver is stored for this ID
1362    /// (e.g., actor was already awaited or was not spawned by this runtime).
1363    pub async fn await_stop(&self, actor_id: &ActorId) -> Result<(), String> {
1364        let rx = {
1365            let mut receivers = self.stop_receivers.lock().unwrap();
1366            receivers.remove(actor_id)
1367        };
1368        match rx {
1369            Some(rx) => rx
1370                .await
1371                .map_err(|_| "stop notifier dropped".to_string())
1372                .and_then(|r| r),
1373            None => Ok(()),
1374        }
1375    }
1376
1377    /// Wait for all spawned actors to stop.
1378    ///
1379    /// Drains all stored stop receivers and awaits them all. Returns the first
1380    /// error encountered, but always waits for every actor to finish.
1381    pub async fn await_all(&self) -> Result<(), String> {
1382        let receivers: Vec<_> = {
1383            let mut map = self.stop_receivers.lock().unwrap();
1384            map.drain().collect()
1385        };
1386        let mut first_error = None;
1387        for (_, rx) in receivers {
1388            let result = rx.await.map_err(|e| format!("stop notifier dropped: {e}")).and_then(|r| r);
1389            if let Err(e) = result {
1390                if first_error.is_none() {
1391                    first_error = Some(e);
1392                }
1393            }
1394        }
1395        match first_error {
1396            Some(e) => Err(e),
1397            None => Ok(()),
1398        }
1399    }
1400
1401    /// Remove completed stop receivers from the map.
1402    ///
1403    /// Call periodically to prevent stale entries from accumulating
1404    /// for actors that stopped without being awaited.
1405    pub fn cleanup_finished(&self) {
1406        let mut receivers = self.stop_receivers.lock().unwrap();
1407        receivers.retain(|_, rx| {
1408            matches!(rx.try_recv(), Err(tokio::sync::oneshot::error::TryRecvError::Empty))
1409        });
1410    }
1411
    /// Number of actors with stored stop receivers.
    ///
    /// Note: includes receivers for actors that have already stopped but
    /// haven't been awaited or cleaned up. Call `cleanup_finished()` first
    /// for an accurate count of running actors.
    pub fn active_handle_count(&self) -> usize {
        self.stop_receivers.lock().unwrap().len()
    }
1420}
1421
1422impl Default for RactorRuntime {
1423    fn default() -> Self {
1424        Self::new()
1425    }
1426}
1427
1428// ---------------------------------------------------------------------------
1429// NA10: SystemMessageRouter for ractor
1430// ---------------------------------------------------------------------------
1431
1432// NOTE: System messages routed here update the **native system actors** (the
1433// mailbox-based ractor actors spawned by `start_system_actors()`). The runtime
1434// also keeps plain struct system actors (`self.watch_manager`, etc.) for the
1435// backward-compatible sync API. This dual-state pattern is intentional — see
1436// progress.md "Dual struct+actor pattern" design decision.
1437
#[async_trait::async_trait]
impl dactor::system_router::SystemMessageRouter for RactorRuntime {
    /// Route a wire envelope carrying a system message to the matching
    /// native system actor, awaiting a reply where the protocol has one.
    ///
    /// Fails with `RoutingError` if the type is unknown, the system actors
    /// have not been started, decoding fails, or a mailbox/reply breaks.
    async fn route_system_envelope(
        &self,
        envelope: dactor::remote::WireEnvelope,
    ) -> Result<dactor::system_router::RoutingOutcome, dactor::system_router::RoutingError> {
        use dactor::system_actors::*;
        use dactor::system_router::{RoutingError, RoutingOutcome};

        // Reject unknown message types up front.
        dactor::system_router::validate_system_message_type(&envelope.message_type)?;

        // Native actors must have been started via `start_system_actors()`.
        let refs = self
            .system_actors
            .as_ref()
            .ok_or_else(|| RoutingError::new("system actors not started"))?;

        match envelope.message_type.as_str() {
            // Spawn: request/reply through the SpawnManagerActor.
            SYSTEM_MSG_TYPE_SPAWN => {
                let request = dactor::proto::decode_spawn_request(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode SpawnRequest: {e}")))?;

                // Keep the request id: `request` is moved into the cast below.
                let req_id = request.request_id.clone();
                let (tx, rx) = tokio::sync::oneshot::channel();
                refs.spawn_manager
                    .cast(crate::system_actors::SpawnManagerMsg::HandleRequest {
                        request,
                        reply: tx,
                    })
                    .map_err(|e| RoutingError::new(format!("SpawnManager mailbox: {e}")))?;

                let result = rx
                    .await
                    .map_err(|_| RoutingError::new("SpawnManager reply dropped"))?;

                match result {
                    Ok((actor_id, _actor)) => Ok(RoutingOutcome::SpawnCompleted {
                        request_id: req_id,
                        actor_id,
                    }),
                    Err(SpawnResponse::Failure { request_id, error }) => {
                        Ok(RoutingOutcome::SpawnFailed { request_id, error })
                    }
                    Err(SpawnResponse::Success { .. }) => {
                        unreachable!("SpawnResult::Err always wraps SpawnResponse::Failure")
                    }
                }
            }

            // Watch/unwatch: fire-and-forget casts to the WatchManagerActor.
            SYSTEM_MSG_TYPE_WATCH => {
                let request = dactor::proto::decode_watch_request(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode WatchRequest: {e}")))?;

                refs.watch_manager
                    .cast(crate::system_actors::WatchManagerMsg::Watch {
                        target: request.target,
                        watcher: request.watcher,
                    })
                    .map_err(|e| RoutingError::new(format!("WatchManager mailbox: {e}")))?;

                Ok(RoutingOutcome::Acknowledged)
            }

            SYSTEM_MSG_TYPE_UNWATCH => {
                let request = dactor::proto::decode_unwatch_request(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode UnwatchRequest: {e}")))?;

                refs.watch_manager
                    .cast(crate::system_actors::WatchManagerMsg::Unwatch {
                        target: request.target,
                        watcher: request.watcher,
                    })
                    .map_err(|e| RoutingError::new(format!("WatchManager mailbox: {e}")))?;

                Ok(RoutingOutcome::Acknowledged)
            }

            // Cancel: request/reply through the CancelManagerActor.
            SYSTEM_MSG_TYPE_CANCEL => {
                let request = dactor::proto::decode_cancel_request(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode CancelRequest: {e}")))?;

                let request_id = request
                    .request_id
                    .ok_or_else(|| RoutingError::new("CancelRequest missing request_id"))?;

                let (tx, rx) = tokio::sync::oneshot::channel();
                refs.cancel_manager
                    .cast(crate::system_actors::CancelManagerMsg::Cancel {
                        request_id,
                        reply: tx,
                    })
                    .map_err(|e| RoutingError::new(format!("CancelManager mailbox: {e}")))?;

                let response = rx
                    .await
                    .map_err(|_| RoutingError::new("CancelManager reply dropped"))?;

                match response {
                    CancelResponse::Acknowledged => Ok(RoutingOutcome::CancelAcknowledged),
                    CancelResponse::NotFound { reason } => {
                        Ok(RoutingOutcome::CancelNotFound { reason })
                    }
                }
            }

            // Peer lifecycle: fire-and-forget casts to the NodeDirectoryActor.
            SYSTEM_MSG_TYPE_CONNECT_PEER => {
                let (peer_id, address) = dactor::proto::decode_connect_peer(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode ConnectPeer: {e}")))?;

                refs.node_directory
                    .cast(crate::system_actors::NodeDirectoryMsg::ConnectPeer {
                        peer_id,
                        address,
                    })
                    .map_err(|e| RoutingError::new(format!("NodeDirectory mailbox: {e}")))?;

                Ok(RoutingOutcome::Acknowledged)
            }

            SYSTEM_MSG_TYPE_DISCONNECT_PEER => {
                let peer_id = dactor::proto::decode_disconnect_peer(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode DisconnectPeer: {e}")))?;

                refs.node_directory
                    .cast(crate::system_actors::NodeDirectoryMsg::DisconnectPeer { peer_id })
                    .map_err(|e| RoutingError::new(format!("NodeDirectory mailbox: {e}")))?;

                Ok(RoutingOutcome::Acknowledged)
            }

            // validate_system_message_type() already rejected unknown types
            // above, so this branch only fires if a new type constant is added
            // to the validator without a matching arm here. That oversight
            // surfaces at *runtime* as a RoutingError — it is not caught at
            // compile time, since the match is on `&str`.
            other => Err(RoutingError::new(format!(
                "unhandled system message type: {other}"
            ))),
        }
    }
}
1576