
// dactor_coerce/runtime.rs

//! V0.2 coerce adapter runtime for the dactor actor framework.
//!
//! Bridges dactor's `Actor`/`Handler<M>`/`ActorRef<A>` API with coerce-rs's
//! actor system using type-erased dispatch.
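//!
//! # Example (sketch)
//!
//! A minimal, hedged usage sketch; `MyActor` and its `Ping` message (with a
//! `Handler<Ping>` impl) are hypothetical stand-ins:
//!
//! ```ignore
//! let runtime = CoerceRuntime::new(); // requires a tokio runtime context
//! let actor_ref = runtime.spawn::<MyActor>("my-actor", ()).await?;
//! actor_ref.tell(Ping)?;
//! ```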

use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use futures::FutureExt;
use tokio_util::sync::CancellationToken;

use dactor::actor::{
    Actor, ActorContext, ActorError, ActorRef, AskReply, ReduceHandler, Handler, ExpandHandler,
    TransformHandler,
};
use dactor::dead_letter::{DeadLetterEvent, DeadLetterHandler, DeadLetterReason};
use dactor::dispatch::{
    AskDispatch, Dispatch, ReduceDispatch, ExpandDispatch, TransformDispatch, TypedDispatch,
};
use dactor::errors::{ActorSendError, ErrorAction, RuntimeError};
use dactor::interceptor::{
    collect_handler_wrappers, apply_handler_wrappers,
    Disposition, DropObserver, InboundContext, InboundInterceptor, OutboundInterceptor, Outcome,
    SendMode,
};
use dactor::mailbox::MailboxConfig;
use dactor::message::{Headers, Message, RuntimeHeaders};
use dactor::node::{ActorId, NodeId};
use dactor::runtime_support::{
    spawn_reduce_batched_drain, spawn_reduce_drain, spawn_transform_drain,
    wrap_batched_stream_with_interception, wrap_stream_with_interception,
    BoundedMailboxSender, OutboundPipeline,
};
use dactor::stream::{
    BatchConfig, BatchReader, BatchWriter, BoxStream, StreamReceiver, StreamSender,
};
use dactor::supervision::ChildTerminated;
use dactor::system_actors::{
    CancelManager, CancelResponse, NodeDirectory, PeerStatus, SpawnManager, SpawnRequest,
    SpawnResponse, WatchManager, WatchNotification,
};
use dactor::type_registry::TypeRegistry;

use crate::cluster::CoerceClusterEvents;

// -- coerce-rs imports --
use coerce::actor::context::ActorContext as CoerceActorCtx;
use coerce::actor::message::{Handler as CoerceHandler, Message as CoerceMessage};
use coerce::actor::system::ActorSystem;
use coerce::actor::{Actor as CoerceActor, LocalActorRef};

// ---------------------------------------------------------------------------
// Watch registry
// ---------------------------------------------------------------------------

/// A type-erased entry in the watch registry.
struct WatchEntry {
    watcher_id: ActorId,
    /// Closure that delivers a [`ChildTerminated`] to the watcher actor.
    notify: Box<dyn Fn(ChildTerminated) + Send + Sync>,
}

/// Shared watch registry mapping watched actor ID → list of watcher entries.
type WatcherMap = Arc<Mutex<HashMap<ActorId, Vec<WatchEntry>>>>;

// ---------------------------------------------------------------------------
// Coerce wrapper actor
// ---------------------------------------------------------------------------

/// The coerce message type wrapping a type-erased dactor dispatch envelope.
// Wrap Box<dyn Dispatch<A>> in a Sync-safe container using Mutex.
// coerce::actor::message::Message requires Sync + Send + 'static.
// The Mutex is never contended — it's locked once in handle() to take
// the dispatch out. This avoids an unsound manual Sync impl.
struct DactorMsg<A: Actor>(Mutex<Option<Box<dyn Dispatch<A>>>>);

impl<A: Actor> DactorMsg<A> {
    fn new(dispatch: Box<dyn Dispatch<A>>) -> Self {
        Self(Mutex::new(Some(dispatch)))
    }

    fn take(&self) -> Option<Box<dyn Dispatch<A>>> {
        self.0.lock().unwrap_or_else(|e| e.into_inner()).take()
    }
}

impl<A: Actor + Send + Sync + 'static> CoerceMessage for DactorMsg<A> {
    type Result = ();
}

/// The coerce `Actor` implementation that bridges to a dactor `Actor`.
struct CoerceDactorActor<A: Actor> {
    actor: A,
    ctx: ActorContext,
    interceptors: Vec<Box<dyn InboundInterceptor>>,
    watchers: WatcherMap,
    stop_reason: Option<String>,
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
    /// Notified when the actor stops (for await_stop).
    /// Wrapped in Mutex to make CoerceDactorActor Sync (oneshot::Sender is !Sync).
    stop_notifier: Mutex<Option<tokio::sync::oneshot::Sender<Result<(), String>>>>,
}

#[async_trait::async_trait]
impl<A: Actor + Send + Sync + 'static> CoerceActor for CoerceDactorActor<A> {
    async fn started(&mut self, _ctx: &mut CoerceActorCtx) {
        self.actor.on_start(&mut self.ctx).await;
    }

    async fn stopped(&mut self, _ctx: &mut CoerceActorCtx) {
        // Reset context to lifecycle semantics before on_stop
        self.ctx.send_mode = None;
        self.ctx.headers = Headers::new();
        self.ctx.set_cancellation_token(None);

        // Run on_stop with panic catching so we can propagate errors
        let stop_result =
            std::panic::AssertUnwindSafe(self.actor.on_stop())
                .catch_unwind()
                .await;
        let stop_err = match stop_result {
            Ok(()) => None,
            Err(_panic) => Some("actor panicked in on_stop".to_string()),
        };

        // Notify all watchers that this actor has terminated.
        let actor_id = self.ctx.actor_id.clone();
        let actor_name = self.ctx.actor_name.clone();
        let entries = {
            let mut watchers = self.watchers.lock().unwrap();
            // Remove entries where this actor is the TARGET (watchers of this actor)
            let target_entries = watchers.remove(&actor_id).unwrap_or_default();
            // Also clean up entries where this actor is the WATCHER (prevent leak)
            for entries in watchers.values_mut() {
                entries.retain(|e| e.watcher_id != actor_id);
            }
            watchers.retain(|_, v| !v.is_empty());
            target_entries
        };
        if !entries.is_empty() {
            let notification = ChildTerminated {
                child_id: actor_id,
                child_name: actor_name,
                reason: self.stop_reason.clone(),
            };
            for entry in &entries {
                (entry.notify)(notification.clone());
            }
        }

        // Notify await_stop() waiters with the result
        if let Some(tx) = self.stop_notifier.lock().unwrap().take() {
            let result = match &stop_err {
                Some(e) => Err(e.clone()),
                None => Ok(()),
            };
            let _ = tx.send(result);
        }
    }
}

#[async_trait::async_trait]
impl<A: Actor + Send + Sync + 'static> CoerceHandler<DactorMsg<A>> for CoerceDactorActor<A> {
    async fn handle(&mut self, msg: DactorMsg<A>, coerce_ctx: &mut CoerceActorCtx) {
        let dispatch = match msg.take() {
            Some(d) => d,
            None => return,
        };

        // Capture metadata before dispatch consumes the message
        let send_mode = dispatch.send_mode();
        let message_type = dispatch.message_type_name();

        self.ctx.send_mode = Some(send_mode);
        self.ctx.headers = Headers::new();

        // Run inbound interceptor pipeline
        let runtime_headers = RuntimeHeaders::new();
        let mut headers = Headers::new();
        let mut total_delay = Duration::ZERO;
        let mut rejection: Option<(String, Disposition)> = None;

        {
            let ictx = InboundContext {
                actor_id: self.ctx.actor_id.clone(),
                actor_name: &self.ctx.actor_name,
                message_type,
                send_mode,
                remote: false,
                origin_node: None,
            };

            for interceptor in &self.interceptors {
                match interceptor.on_receive(
                    &ictx,
                    &runtime_headers,
                    &mut headers,
                    dispatch.message_any(),
                ) {
                    Disposition::Continue => {}
                    Disposition::Delay(d) => {
                        total_delay += d;
                    }
                    disp @ (Disposition::Drop | Disposition::Reject(_) | Disposition::Retry(_)) => {
                        rejection = Some((interceptor.name().to_string(), disp));
                        break;
                    }
                }
            }
        }

        // If rejected/dropped/retry, propagate proper error to caller
        if let Some((interceptor_name, disposition)) = rejection {
            if matches!(disposition, Disposition::Drop) {
                if let Some(ref handler) = *self.dead_letter_handler {
                    let event = DeadLetterEvent {
                        target_id: self.ctx.actor_id.clone(),
                        target_name: Some(self.ctx.actor_name.clone()),
                        message_type,
                        send_mode,
                        reason: DeadLetterReason::DroppedByInterceptor {
                            interceptor: interceptor_name.clone(),
                        },
                        message: None,
                    };
                    let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                        handler.on_dead_letter(event);
                    }));
                }
            }
            dispatch.reject(disposition, &interceptor_name);
            return;
        }

        if !total_delay.is_zero() {
            tokio::time::sleep(total_delay).await;
        }

        // Collect handler wrappers BEFORE moving headers into ctx,
        // so interceptors can read headers without borrow conflicts.
        let ictx_for_wrap = InboundContext {
            actor_id: self.ctx.actor_id.clone(),
            actor_name: &self.ctx.actor_name,
            message_type,
            send_mode,
            remote: false,
            origin_node: None,
        };
        let wrappers = collect_handler_wrappers(&self.interceptors, &ictx_for_wrap, &headers);
        let needs_wrap = wrappers.iter().any(|w| w.is_some());

        // Copy interceptor-populated headers to ActorContext
        self.ctx.headers = headers;

        // Propagate cancellation token to context
        let cancel_token = dispatch.cancel_token();
        self.ctx.set_cancellation_token(cancel_token.clone());

        // Check if already cancelled before dispatching
        if let Some(ref token) = cancel_token {
            if token.is_cancelled() {
                dispatch.cancel();
                self.ctx.set_cancellation_token(None);
                return;
            }
        }

        // Dispatch with panic catching and cancellation racing
        let result = if needs_wrap {
            let (result_tx, mut result_rx) = tokio::sync::oneshot::channel();

            let inner: std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> = Box::pin(async {
                let r = std::panic::AssertUnwindSafe(
                    dispatch.dispatch(&mut self.actor, &mut self.ctx),
                )
                .catch_unwind()
                .await;
                let _ = result_tx.send(r);
            });

            let wrapped = apply_handler_wrappers(wrappers, inner);

            let wrapped = std::panic::AssertUnwindSafe(wrapped);

            if let Some(ref token) = cancel_token {
                tokio::select! {
                    biased;
                    _ = wrapped.catch_unwind() => {},
                    _ = token.cancelled() => {
                        self.ctx.set_cancellation_token(None);
                        return;
                    }
                }
            } else {
                wrapped.catch_unwind().await.ok();
            }

            match result_rx.try_recv() {
                Ok(r) => r,
                Err(_) => Err(Box::new(
                    "interceptor wrap_handler did not await the handler future",
                ) as Box<dyn std::any::Any + Send>),
            }
        } else {
            if let Some(ref token) = cancel_token {
                let dispatch_fut =
                    std::panic::AssertUnwindSafe(dispatch.dispatch(&mut self.actor, &mut self.ctx))
                        .catch_unwind();
                tokio::select! {
                    biased;
                    r = dispatch_fut => r,
                    _ = token.cancelled() => {
                        self.ctx.set_cancellation_token(None);
                        return;
                    }
                }
            } else {
                std::panic::AssertUnwindSafe(dispatch.dispatch(&mut self.actor, &mut self.ctx))
                    .catch_unwind()
                    .await
            }
        };

        self.ctx.set_cancellation_token(None);

        // Build context for on_complete
        let ictx = InboundContext {
            actor_id: self.ctx.actor_id.clone(),
            actor_name: &self.ctx.actor_name,
            message_type,
            send_mode,
            remote: false,
            origin_node: None,
        };

        match result {
            Ok(dispatch_result) => {
                let outcome = match (&dispatch_result.reply, send_mode) {
                    (Some(reply), SendMode::Ask) => Outcome::AskSuccess {
                        reply: reply.as_ref(),
                    },
                    _ => Outcome::TellSuccess,
                };

                for interceptor in &self.interceptors {
                    interceptor.on_complete(&ictx, &runtime_headers, &self.ctx.headers, &outcome);
                }

                // Send reply to caller AFTER interceptors have seen it
                dispatch_result.send_reply();
            }
            Err(_panic) => {
                let error = ActorError::internal("handler panicked");
                let action = self.actor.on_error(&error);

                let outcome = Outcome::HandlerError { error };
                for interceptor in &self.interceptors {
                    interceptor.on_complete(&ictx, &runtime_headers, &self.ctx.headers, &outcome);
                }

                match action {
                    ErrorAction::Resume => {
                        // Actor resumes — do NOT set stop_reason
                    }
                    ErrorAction::Stop | ErrorAction::Escalate => {
                        self.stop_reason = Some("handler panicked".into());
                        coerce_ctx.stop(None);
                    }
                    ErrorAction::Restart => {
                        tracing::warn!("Restart not fully implemented, treating as Resume");
                    }
                }
            }
        }
    }
}

// ---------------------------------------------------------------------------
// CoerceActorRef — dactor ActorRef backed by coerce
// ---------------------------------------------------------------------------

/// A dactor `ActorRef` backed by a coerce `LocalActorRef`.
///
/// Messages are delivered through coerce's mailbox as type-erased dispatch
/// envelopes, enabling multiple `Handler<M>` impls per actor.
///
/// When configured with [`MailboxConfig::Bounded`], a bounded `mpsc` channel
/// sits in front of the coerce actor.  A forwarding task drains the channel
/// and delivers messages to coerce, providing backpressure / overflow control
/// at the dactor level while coerce's internal mailbox remains unbounded.
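///
/// # Example (sketch)
///
/// A hedged sketch of `ask` with cooperative cancellation; `GetCount` and its
/// reply type are hypothetical:
///
/// ```ignore
/// let cancel = CancellationToken::new();
/// let reply = actor_ref.ask(GetCount, Some(cancel.clone()))?;
/// // The AskReply is backed by a oneshot receiver carrying a
/// // Result<Reply, RuntimeError>; await it per dactor's AskReply API.
/// ```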
pub struct CoerceActorRef<A: Actor + Send + Sync + 'static> {
    id: ActorId,
    name: String,
    inner: LocalActorRef<CoerceDactorActor<A>>,
    /// When Some, sends go through this bounded channel (a forwarder task
    /// drains it into `inner`).  When None, sends go directly to `inner`.
    bounded_tx: Option<BoundedMailboxSender<DactorMsg<A>>>,
    outbound_interceptors: Arc<Vec<Box<dyn OutboundInterceptor>>>,
    drop_observer: Option<Arc<dyn DropObserver>>,
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
}

impl<A: Actor + Send + Sync + 'static> Clone for CoerceActorRef<A> {
    fn clone(&self) -> Self {
        Self {
            id: self.id.clone(),
            name: self.name.clone(),
            inner: self.inner.clone(),
            bounded_tx: self.bounded_tx.clone(),
            outbound_interceptors: self.outbound_interceptors.clone(),
            drop_observer: self.drop_observer.clone(),
            dead_letter_handler: self.dead_letter_handler.clone(),
        }
    }
}

impl<A: Actor + Send + Sync + 'static> std::fmt::Debug for CoerceActorRef<A> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "CoerceActorRef({}, {:?})", self.name, self.id)
    }
}

impl<A: Actor + Send + Sync + 'static> CoerceActorRef<A> {
    fn outbound_pipeline(&self) -> OutboundPipeline {
        OutboundPipeline {
            interceptors: self.outbound_interceptors.clone(),
            drop_observer: self.drop_observer.clone(),
            target_id: self.id.clone(),
            target_name: self.name.clone(),
        }
    }

    fn notify_dead_letter(
        &self,
        message_type: &'static str,
        send_mode: SendMode,
        reason: DeadLetterReason,
    ) {
        if let Some(ref handler) = *self.dead_letter_handler {
            let event = DeadLetterEvent {
                target_id: self.id.clone(),
                target_name: Some(self.name.clone()),
                message_type,
                send_mode,
                reason,
                message: None,
            };
            let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                handler.on_dead_letter(event);
            }));
        }
    }

    /// Send a dispatch envelope through the bounded channel (if configured)
    /// or directly to the coerce actor.
    fn send_dispatch(&self, dispatch: Box<dyn Dispatch<A>>) -> Result<(), ActorSendError> {
        let dactor_msg = DactorMsg::new(dispatch);
        if let Some(ref btx) = self.bounded_tx {
            btx.try_send(dactor_msg)
        } else {
            self.inner
                .notify(dactor_msg)
                .map_err(|e| ActorSendError(e.to_string()))
        }
    }
}

impl<A: Actor + Send + Sync + 'static> ActorRef<A> for CoerceActorRef<A> {
    fn id(&self) -> ActorId {
        self.id.clone()
    }

    fn name(&self) -> String {
        self.name.clone()
    }

    fn is_alive(&self) -> bool {
        if let Some(ref btx) = self.bounded_tx {
            !btx.is_closed() && self.inner.is_valid()
        } else {
            self.inner.is_valid()
        }
    }

    fn pending_messages(&self) -> usize {
        if let Some(ref btx) = self.bounded_tx {
            btx.pending()
        } else {
            0 // unbounded — no depth info
        }
    }

    fn stop(&self) {
        let _ = self.inner.notify_stop();
    }

    fn tell<M>(&self, msg: M) -> Result<(), ActorSendError>
    where
        A: Handler<M>,
        M: Message<Reply = ()>,
    {
        let pipeline = self.outbound_pipeline();
        let result = pipeline.run_on_send(SendMode::Tell, &msg);
        match result.disposition {
            Disposition::Continue => {}
            Disposition::Drop | Disposition::Reject(_) | Disposition::Retry(_) => return Ok(()),
            Disposition::Delay(_) => {} // Not supported in sync tell
        }

        let dispatch: Box<dyn Dispatch<A>> = Box::new(TypedDispatch { msg });
        self.send_dispatch(dispatch).map_err(|e| {
            let reason = if e.0.contains("full") {
                DeadLetterReason::MailboxFull
            } else {
                DeadLetterReason::ActorStopped
            };
            self.notify_dead_letter(
                std::any::type_name::<M>(),
                SendMode::Tell,
                reason,
            );
            e
        })
    }

    fn ask<M>(
        &self,
        msg: M,
        cancel: Option<CancellationToken>,
    ) -> Result<AskReply<M::Reply>, ActorSendError>
    where
        A: Handler<M>,
        M: Message,
    {
        let pipeline = self.outbound_pipeline();
        let result = pipeline.run_on_send(SendMode::Ask, &msg);
        match result.disposition {
            Disposition::Continue => {}
            Disposition::Delay(_) => {} // Not supported in sync context
            Disposition::Drop => {
                let (tx, rx) = tokio::sync::oneshot::channel();
                let _ = tx.send(Err(RuntimeError::ActorNotFound(
                    "message dropped by outbound interceptor".into(),
                )));
                return Ok(AskReply::new(rx));
            }
            Disposition::Reject(reason) => {
                let (tx, rx) = tokio::sync::oneshot::channel();
                let _ = tx.send(Err(RuntimeError::Rejected {
                    interceptor: result.interceptor_name.to_string(),
                    reason,
                }));
                return Ok(AskReply::new(rx));
            }
            Disposition::Retry(retry_after) => {
                let (tx, rx) = tokio::sync::oneshot::channel();
                let _ = tx.send(Err(RuntimeError::RetryAfter {
                    interceptor: result.interceptor_name.to_string(),
                    retry_after,
                }));
                return Ok(AskReply::new(rx));
            }
        }

        let (tx, rx) = tokio::sync::oneshot::channel();
        let dispatch: Box<dyn Dispatch<A>> = Box::new(AskDispatch {
            msg,
            reply_tx: tx,
            cancel,
        });
        self.send_dispatch(dispatch).map_err(|e| {
            let reason = if e.0.contains("full") {
                DeadLetterReason::MailboxFull
            } else {
                DeadLetterReason::ActorStopped
            };
            self.notify_dead_letter(
                std::any::type_name::<M>(),
                SendMode::Ask,
                reason,
            );
            e
        })?;
        Ok(AskReply::new(rx))
    }

    fn expand<M, OutputItem>(
        &self,
        msg: M,
        buffer: usize,
        batch_config: Option<BatchConfig>,
        cancel: Option<CancellationToken>,
    ) -> Result<BoxStream<OutputItem>, ActorSendError>
    where
        A: ExpandHandler<M, OutputItem>,
        M: Send + 'static,
        OutputItem: Send + 'static,
    {
        let pipeline = self.outbound_pipeline();
        let result = pipeline.run_on_send(SendMode::Expand, &msg);
        match result.disposition {
            Disposition::Continue => {}
            Disposition::Delay(_) => {}
            Disposition::Drop => {
                return Err(ActorSendError(
                    "stream dropped by outbound interceptor".into(),
                ));
            }
            Disposition::Reject(reason) => {
                return Err(ActorSendError(format!("stream rejected: {}", reason)));
            }
            Disposition::Retry(_) => {
                return Err(ActorSendError(
                    "stream retry requested by interceptor".into(),
                ));
            }
        }

        let buffer = buffer.max(1);
        let (tx, rx) = tokio::sync::mpsc::channel(buffer);
        let sender = StreamSender::new(tx);
        let dispatch: Box<dyn Dispatch<A>> = Box::new(ExpandDispatch {
            msg,
            sender,
            cancel,
        });
        self.send_dispatch(dispatch)?;

        match batch_config {
            Some(batch_config) => {
                let (batch_tx, batch_rx) = tokio::sync::mpsc::channel::<Vec<OutputItem>>(buffer);
                let reader = BatchReader::new(batch_rx);
                let batch_delay = batch_config.max_delay;
                tokio::spawn(async move {
                    let mut writer = BatchWriter::new(batch_tx, batch_config);
                    let mut rx = rx;
                    loop {
                        if writer.buffered_count() > 0 {
                            let deadline = tokio::time::Instant::now() + batch_delay;
                            tokio::select! {
                                biased;
                                item = rx.recv() => match item {
                                    Some(item) => {
                                        if writer.push(item).await.is_err() { break; }
                                    }
                                    None => break,
                                },
                                _ = tokio::time::sleep_until(deadline) => {
                                    if writer.check_deadline().await.is_err() { break; }
                                }
                            }
                        } else {
                            match rx.recv().await {
                                Some(item) => {
                                    if writer.push(item).await.is_err() {
                                        break;
                                    }
                                }
                                None => break,
                            }
                        }
                    }
                    let _ = writer.flush().await;
                });
                Ok(wrap_batched_stream_with_interception(
                    reader,
                    buffer,
                    pipeline,
                    std::any::type_name::<M>(),
                    SendMode::Expand,
                ))
            }
            None => Ok(wrap_stream_with_interception(
                rx,
                buffer,
                pipeline,
                std::any::type_name::<M>(),
                SendMode::Expand,
            )),
        }
    }

    fn reduce<InputItem, Reply>(
        &self,
        input: BoxStream<InputItem>,
        buffer: usize,
        batch_config: Option<BatchConfig>,
        cancel: Option<CancellationToken>,
    ) -> Result<AskReply<Reply>, ActorSendError>
    where
        A: ReduceHandler<InputItem, Reply>,
        InputItem: Send + 'static,
        Reply: Send + 'static,
    {
        let buffer = buffer.max(1);
        let (item_tx, item_rx) = tokio::sync::mpsc::channel(buffer);
        let receiver = StreamReceiver::new(item_rx);
        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        let dispatch: Box<dyn Dispatch<A>> = Box::new(ReduceDispatch {
            receiver,
            reply_tx,
            cancel: cancel.clone(),
        });
        self.send_dispatch(dispatch)?;

        let pipeline = self.outbound_pipeline();
        match batch_config {
            Some(batch_config) => {
                spawn_reduce_batched_drain(
                    input,
                    item_tx,
                    buffer,
                    batch_config,
                    cancel,
                    pipeline,
                    std::any::type_name::<InputItem>(),
                );
            }
            None => {
                spawn_reduce_drain(
                    input,
                    item_tx,
                    cancel,
                    pipeline,
                    std::any::type_name::<InputItem>(),
                );
            }
        }

        Ok(AskReply::new(reply_rx))
    }

    fn transform<InputItem, OutputItem>(
        &self,
        input: BoxStream<InputItem>,
        buffer: usize,
        batch_config: Option<BatchConfig>,
        cancel: Option<CancellationToken>,
    ) -> Result<BoxStream<OutputItem>, ActorSendError>
    where
        A: TransformHandler<InputItem, OutputItem>,
        InputItem: Send + 'static,
        OutputItem: Send + 'static,
    {
        let buffer = buffer.max(1);
        let (item_tx, item_rx) = tokio::sync::mpsc::channel(buffer);
        let (output_tx, mut output_rx) = tokio::sync::mpsc::channel(buffer);
        let receiver = StreamReceiver::new(item_rx);
        let sender = StreamSender::new(output_tx);
        let dispatch: Box<dyn Dispatch<A>> = Box::new(TransformDispatch::new(
            receiver,
            sender,
            cancel.clone(),
        ));
        self.send_dispatch(dispatch)?;

        let pipeline = self.outbound_pipeline();
        spawn_transform_drain(
            input,
            item_tx,
            cancel,
            pipeline.clone(),
            std::any::type_name::<InputItem>(),
        );

        match batch_config {
            Some(batch_config) => {
                let (batch_tx, batch_rx) =
                    tokio::sync::mpsc::channel::<Vec<OutputItem>>(buffer);
                let reader = BatchReader::new(batch_rx);
                let batch_delay = batch_config.max_delay;
                tokio::spawn(async move {
                    let mut writer = BatchWriter::new(batch_tx, batch_config);
                    loop {
                        if writer.buffered_count() > 0 {
                            let deadline = tokio::time::Instant::now() + batch_delay;
                            tokio::select! {
                                biased;
                                item = output_rx.recv() => match item {
                                    Some(item) => {
                                        if writer.push(item).await.is_err() { break; }
                                    }
                                    None => break,
                                },
                                _ = tokio::time::sleep_until(deadline) => {
                                    if writer.check_deadline().await.is_err() { break; }
                                }
                            }
                        } else {
                            match output_rx.recv().await {
                                Some(item) => {
                                    if writer.push(item).await.is_err() {
                                        break;
                                    }
                                }
                                None => break,
                            }
                        }
                    }
                    let _ = writer.flush().await;
                });
                Ok(wrap_batched_stream_with_interception(
                    reader,
                    buffer,
                    pipeline,
                    std::any::type_name::<OutputItem>(),
                    SendMode::Transform,
                ))
            }
            None => Ok(wrap_stream_with_interception(
                output_rx,
                buffer,
                pipeline,
                std::any::type_name::<OutputItem>(),
                SendMode::Transform,
            )),
        }
    }
}

// ---------------------------------------------------------------------------
// SpawnOptions
// ---------------------------------------------------------------------------

/// Options for spawning an actor, including the inbound interceptor pipeline.
/// Use `..Default::default()` to future-proof against new fields.
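///
/// # Example (sketch)
///
/// A hedged sketch of a bounded mailbox; the `overflow` binding stands in for
/// whichever `dactor::mailbox::OverflowStrategy` value you choose:
///
/// ```ignore
/// let options = SpawnOptions {
///     // `overflow` is any dactor::mailbox::OverflowStrategy value
///     mailbox: MailboxConfig::Bounded { capacity: 64, overflow },
///     ..Default::default()
/// };
/// let actor_ref = runtime.spawn_with_options::<MyActor>("worker", (), options).await?;
/// ```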
pub struct SpawnOptions {
    /// Inbound interceptors to attach to the actor.
    pub interceptors: Vec<Box<dyn InboundInterceptor>>,
    /// Mailbox capacity configuration.
    ///
    /// When [`Bounded`](MailboxConfig::Bounded), a bounded `mpsc` channel is
    /// placed in front of the coerce actor to enforce backpressure.  The
    /// [`OverflowStrategy`](dactor::mailbox::OverflowStrategy) controls what
    /// happens when the channel is full.
    pub mailbox: MailboxConfig,
}

impl Default for SpawnOptions {
    fn default() -> Self {
        Self {
            interceptors: Vec::new(),
            mailbox: MailboxConfig::Unbounded,
        }
    }
}

// ---------------------------------------------------------------------------
// CoerceRuntime
// ---------------------------------------------------------------------------

/// References to the native coerce system actors spawned by the runtime.
pub struct CoerceSystemActorRefs {
    pub spawn_managers: Vec<LocalActorRef<crate::system_actors::SpawnManagerActor>>,
    spawn_manager_counter: AtomicU64,
    pub watch_manager: LocalActorRef<crate::system_actors::WatchManagerActor>,
    pub cancel_manager: LocalActorRef<crate::system_actors::CancelManagerActor>,
    pub node_directory: LocalActorRef<crate::system_actors::NodeDirectoryActor>,
}

impl CoerceSystemActorRefs {
    /// Get the spawn manager ref (round-robin if pooled).
    pub fn spawn_manager(&self) -> &LocalActorRef<crate::system_actors::SpawnManagerActor> {
        let idx = self.spawn_manager_counter.fetch_add(1, Ordering::Relaxed) as usize
            % self.spawn_managers.len();
        &self.spawn_managers[idx]
    }
}

/// A dactor v0.2 runtime backed by coerce-rs.
///
/// Actors are spawned as real coerce actors. Messages are delivered through
/// coerce's mailbox as type-erased dispatch envelopes, supporting multiple
/// `Handler<M>` impls per actor.
///
/// **Note:** `CoerceRuntime::new()` creates a `coerce::actor::system::ActorSystem`
/// internally, which requires a tokio runtime context. All `spawn()` methods
/// also require a tokio context.
///
/// ## System Actors
///
/// The runtime includes struct-based system actors for remote operations:
/// - [`SpawnManager`] — handles remote actor spawn requests
/// - [`WatchManager`] — handles remote watch/unwatch subscriptions
/// - [`CancelManager`] — handles remote cancellation requests
/// - [`NodeDirectory`] — tracks peer node connection state
///
/// Native coerce system actors are accessible via `system_actor_refs()` for
/// transport routing after calling `start_system_actors()`.
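///
/// # Example (sketch)
///
/// A minimal setup sketch based on the APIs in this module:
///
/// ```ignore
/// let mut runtime = CoerceRuntime::with_node_id(NodeId("node-a".into()))
///     .with_app_version("1.0.0");
/// runtime.start_system_actors();
/// let refs = runtime.system_actor_refs().expect("system actors started");
/// ```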
pub struct CoerceRuntime {
    node_id: NodeId,
    next_local: Arc<AtomicU64>,
    system: ActorSystem,
    cluster_events: CoerceClusterEvents,
    outbound_interceptors: Arc<Vec<Box<dyn OutboundInterceptor>>>,
    drop_observer: Option<Arc<dyn DropObserver>>,
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
    watchers: WatcherMap,
    /// Plain struct system actors (backward-compatible sync API).
    spawn_manager: SpawnManager,
    watch_manager: WatchManager,
    cancel_manager: CancelManager,
    node_directory: NodeDirectory,
    /// Native coerce system actor refs (lazily started via `start_system_actors()`).
    system_actors: Option<CoerceSystemActorRefs>,
    /// Stop notification receivers for await_stop(), keyed by ActorId.
    #[allow(clippy::type_complexity)]
    stop_receivers: Arc<Mutex<HashMap<ActorId, tokio::sync::oneshot::Receiver<Result<(), String>>>>>,
    /// Application version for this node (informational, used in handshake).
    app_version: Option<String>,
}

impl CoerceRuntime {
    /// Create a new `CoerceRuntime`.
    ///
    /// **Requires a tokio runtime context** — `ActorSystem::new()` spawns
    /// the coerce scheduler as a tokio task.
    pub fn new() -> Self {
        Self::create(NodeId("coerce-node".into()))
    }

    /// Create a new `CoerceRuntime` with a specific node ID.
    ///
    /// **Requires a tokio runtime context.**
    pub fn with_node_id(node_id: NodeId) -> Self {
        Self::create(node_id)
    }

    fn create(node_id: NodeId) -> Self {
        Self {
            node_id,
            next_local: Arc::new(AtomicU64::new(1)),
            system: ActorSystem::new(),
            cluster_events: CoerceClusterEvents::new(),
            outbound_interceptors: Arc::new(Vec::new()),
            drop_observer: None,
            dead_letter_handler: Arc::new(None),
            watchers: Arc::new(Mutex::new(HashMap::new())),
            spawn_manager: SpawnManager::new(TypeRegistry::new()),
            watch_manager: WatchManager::new(),
            cancel_manager: CancelManager::new(),
            node_directory: NodeDirectory::new(),
            system_actors: None,
            stop_receivers: Arc::new(Mutex::new(HashMap::new())),
            app_version: None,
        }
    }

    /// The adapter name for this runtime, used in version handshakes.
    pub const ADAPTER_NAME: &'static str = "coerce";

    /// Set the application version for this node.
    ///
    /// This is your application's release version (e.g., "2.3.1"), not the
    /// dactor framework version. It is included in handshake requests for
    /// operational visibility during rolling upgrades.
    pub fn with_app_version(mut self, version: impl Into<String>) -> Self {
        self.app_version = Some(version.into());
        self
    }

    /// Returns the configured application version, if any.
    pub fn app_version(&self) -> Option<&str> {
        self.app_version.as_deref()
    }

    /// Build a [`HandshakeRequest`](dactor::HandshakeRequest) from this
    /// runtime's current configuration.
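    ///
    /// # Example (sketch)
    ///
    /// ```ignore
    /// let request = runtime.handshake_request();
    /// // Send `request` to the peer over your transport, validate the reply
    /// // (e.g. dactor::validate_handshake), then call connect_peer().
    /// ```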
    pub fn handshake_request(&self) -> dactor::HandshakeRequest {
        dactor::HandshakeRequest::from_runtime(
            self.node_id.clone(),
            self.app_version.clone(),
            Self::ADAPTER_NAME,
        )
    }

    /// Returns the node ID of this runtime.
    pub fn node_id(&self) -> &NodeId {
        &self.node_id
    }

    /// Returns a reference to the underlying coerce `ActorSystem`.
    pub fn system(&self) -> &ActorSystem {
        &self.system
    }

    /// Add a global outbound interceptor.
    ///
    /// **Must be called before any actors are spawned.** Panics if actors
    /// already hold references to the interceptor list (i.e., after `spawn()`).
    pub fn add_outbound_interceptor(&mut self, interceptor: Box<dyn OutboundInterceptor>) {
        Arc::get_mut(&mut self.outbound_interceptors)
            .expect("cannot add interceptors after actors are spawned")
            .push(interceptor);
    }

    /// Set the drop observer for outbound interceptor pipelines.
    ///
    /// **Must be called before any actors are spawned.**
    pub fn set_drop_observer(&mut self, observer: Arc<dyn DropObserver>) {
        self.drop_observer = Some(observer);
    }

    /// Set a global dead letter handler. Called whenever a message cannot be
    /// delivered (actor stopped, mailbox full, dropped by inbound interceptor).
    pub fn set_dead_letter_handler(&mut self, handler: Arc<dyn DeadLetterHandler>) {
        self.dead_letter_handler = Arc::new(Some(handler));
    }

    /// Access the cluster events subsystem (alias of [`cluster_events`](Self::cluster_events)).
    pub fn cluster_events_handle(&self) -> &CoerceClusterEvents {
        &self.cluster_events
    }

    /// Access the cluster events subsystem.
    pub fn cluster_events(&self) -> &CoerceClusterEvents {
        &self.cluster_events
    }

    /// Spawn native coerce system actors for transport routing.
    ///
    /// Must be called from within a tokio runtime context. After this call,
    /// `system_actor_refs()` returns the native actor references.
    ///
    /// Uses default configuration (unbounded mailboxes, no pooling).
    /// For custom configuration, use [`start_system_actors_with_config()`].
    pub fn start_system_actors(&mut self) {
        self.start_system_actors_with_config(dactor::SystemActorConfig::default());
    }

    /// Spawn native coerce system actors with custom configuration.
    ///
    /// Allows configuring SpawnManager pooling for high-throughput scenarios.
    /// See [`SystemActorConfig`](dactor::SystemActorConfig) for details.
    pub fn start_system_actors_with_config(&mut self, config: dactor::SystemActorConfig) {
        use crate::system_actors::*;

        let pool_size = config.spawn_manager_pool_size.unwrap_or(1).max(1);
        let system = self.system.clone();
        let mut spawn_refs = Vec::with_capacity(pool_size);

        for i in 0..pool_size {
            let spawn_mgr = SpawnManagerActor::new(
                self.node_id.clone(),
                TypeRegistry::new(),
                self.next_local.clone(),
            );
            let spawn_mgr_ref = coerce::actor::scheduler::start_actor(
                spawn_mgr,
                coerce::actor::new_actor_id(),
                coerce::actor::scheduler::ActorType::Tracked,
                None,
                Some(system.clone()),
                None,
                format!("dactor-spawn-manager-{i}").into(),
            );
            spawn_refs.push(spawn_mgr_ref);
        }

        let watch_mgr = WatchManagerActor::new();
        let cancel_mgr = CancelManagerActor::new();
        let node_dir = NodeDirectoryActor::new();

        let watch_mgr_ref = coerce::actor::scheduler::start_actor(
            watch_mgr,
            coerce::actor::new_actor_id(),
            coerce::actor::scheduler::ActorType::Tracked,
            None,
            Some(system.clone()),
            None,
            "dactor-watch-manager".to_string().into(),
        );
        let cancel_mgr_ref = coerce::actor::scheduler::start_actor(
            cancel_mgr,
            coerce::actor::new_actor_id(),
            coerce::actor::scheduler::ActorType::Tracked,
            None,
            Some(system.clone()),
            None,
            "dactor-cancel-manager".to_string().into(),
        );
        let node_dir_ref = coerce::actor::scheduler::start_actor(
            node_dir,
            coerce::actor::new_actor_id(),
            coerce::actor::scheduler::ActorType::Tracked,
            None,
            Some(system),
            None,
            "dactor-node-directory".to_string().into(),
        );

        self.system_actors = Some(CoerceSystemActorRefs {
            spawn_managers: spawn_refs,
            spawn_manager_counter: AtomicU64::new(0),
            watch_manager: watch_mgr_ref,
            cancel_manager: cancel_mgr_ref,
            node_directory: node_dir_ref,
        });
    }

    /// Access the native system actor references for transport routing.
    ///
    /// Returns `None` if `start_system_actors()` has not been called yet.
    pub fn system_actor_refs(&self) -> Option<&CoerceSystemActorRefs> {
        self.system_actors.as_ref()
    }

    /// Spawn an actor with `Deps = ()`.
    pub async fn spawn<A>(
        &self,
        name: &str,
        args: A::Args,
    ) -> Result<CoerceActorRef<A>, RuntimeError>
    where
        A: Actor<Deps = ()> + Send + Sync + 'static,
    {
        Ok(self.spawn_internal::<A>(name, args, (), Vec::new(), MailboxConfig::Unbounded))
    }

    /// Spawn an actor with explicit dependencies.
    pub async fn spawn_with_deps<A>(
        &self,
        name: &str,
        args: A::Args,
        deps: A::Deps,
    ) -> Result<CoerceActorRef<A>, RuntimeError>
    where
        A: Actor + Send + Sync + 'static,
    {
        Ok(self.spawn_internal::<A>(name, args, deps, Vec::new(), MailboxConfig::Unbounded))
    }

    /// Spawn an actor with spawn options (including inbound interceptors and mailbox config).
    pub async fn spawn_with_options<A>(
        &self,
        name: &str,
        args: A::Args,
        options: SpawnOptions,
    ) -> Result<CoerceActorRef<A>, RuntimeError>
    where
        A: Actor<Deps = ()> + Send + Sync + 'static,
    {
        Ok(self.spawn_internal::<A>(name, args, (), options.interceptors, options.mailbox))
    }

    fn spawn_internal<A>(
        &self,
        name: &str,
        args: A::Args,
        deps: A::Deps,
        interceptors: Vec<Box<dyn InboundInterceptor>>,
        mailbox: MailboxConfig,
    ) -> CoerceActorRef<A>
    where
        A: Actor + Send + Sync + 'static,
    {
        let local = self.next_local.fetch_add(1, Ordering::SeqCst);
        let actor_id = ActorId {
            node: self.node_id.clone(),
            local,
        };
        let actor_name = name.to_string();

        let actor = A::create(args, deps);
        let (stop_tx, stop_rx) = tokio::sync::oneshot::channel();

        let wrapper = CoerceDactorActor {
            actor,
            ctx: ActorContext::new(actor_id.clone(), actor_name.clone()),
            interceptors,
            watchers: self.watchers.clone(),
            stop_reason: None,
            dead_letter_handler: self.dead_letter_handler.clone(),
            stop_notifier: Mutex::new(Some(stop_tx)),
        };

        // Use coerce's synchronous start_actor — it internally calls
        // tokio::spawn (non-blocking) to run the actor loop, and returns
        // the LocalActorRef immediately. No async bridge needed.
        let system = self.system.clone();
        let coerce_ref = coerce::actor::scheduler::start_actor(
            wrapper,
            coerce::actor::new_actor_id(),
            coerce::actor::scheduler::ActorType::Tracked,
            None,
            Some(system),
            None,
            name.to_string().into(),
        );

        // Set up optional bounded mailbox channel
        let bounded_tx = match mailbox {
            MailboxConfig::Bounded { capacity, overflow } => {
                let (btx, mut brx) =
                    tokio::sync::mpsc::channel::<DactorMsg<A>>(capacity);
                let fwd_ref = coerce_ref.clone();
                tokio::spawn(async move {
                    while let Some(msg) = brx.recv().await {
                        if fwd_ref.notify(msg).is_err() {
                            break; // actor stopped
                        }
                    }
                });
                Some(BoundedMailboxSender::new(btx, overflow))
            }
            MailboxConfig::Unbounded => None,
        };

        // Store stop receiver for await_stop()
        self.stop_receivers
            .lock()
            .unwrap()
            .insert(actor_id.clone(), stop_rx);

        CoerceActorRef {
            id: actor_id,
            name: actor_name,
            inner: coerce_ref,
            bounded_tx,
            outbound_interceptors: self.outbound_interceptors.clone(),
            drop_observer: self.drop_observer.clone(),
            dead_letter_handler: self.dead_letter_handler.clone(),
        }
    }

    /// Register actor `watcher` to be notified when `target_id` terminates.
    ///
    /// The watcher must implement `Handler<ChildTerminated>`. When the target
    /// stops, the runtime delivers a [`ChildTerminated`] message to the watcher.
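    ///
    /// # Example (sketch)
    ///
    /// A hedged sketch; `Supervisor` (with a `Handler<ChildTerminated>` impl)
    /// and `Worker` are hypothetical:
    ///
    /// ```ignore
    /// let supervisor = runtime.spawn::<Supervisor>("sup", ()).await?;
    /// let worker = runtime.spawn::<Worker>("worker", ()).await?;
    /// runtime.watch(&supervisor, worker.id());
    /// worker.stop(); // supervisor receives ChildTerminated
    /// ```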
    pub fn watch<W>(&self, watcher: &CoerceActorRef<W>, target_id: ActorId)
    where
        W: Actor + Handler<ChildTerminated> + Send + Sync + 'static,
    {
        let watcher_id = watcher.id();
        let watcher_inner = watcher.inner.clone();

        let entry = WatchEntry {
            watcher_id,
            notify: Box::new(move |msg: ChildTerminated| {
                let dispatch: Box<dyn Dispatch<W>> = Box::new(TypedDispatch { msg });
                if watcher_inner.notify(DactorMsg::new(dispatch)).is_err() {
                    tracing::debug!("watch notification dropped — watcher may have stopped");
                }
            }),
        };

        let mut watchers = self.watchers.lock().unwrap();
        watchers.entry(target_id).or_default().push(entry);
    }

    /// Unregister `watcher_id` from notifications about `target_id`.
    pub fn unwatch(&self, watcher_id: &ActorId, target_id: &ActorId) {
        let mut watchers = self.watchers.lock().unwrap();
        if let Some(entries) = watchers.get_mut(target_id) {
            entries.retain(|e| &e.watcher_id != watcher_id);
            if entries.is_empty() {
                watchers.remove(target_id);
            }
        }
    }

    // -----------------------------------------------------------------------
    // SpawnManager wiring
    // -----------------------------------------------------------------------

    /// Access the spawn manager.
    pub fn spawn_manager(&self) -> &SpawnManager {
        &self.spawn_manager
    }

    /// Access the spawn manager mutably (for registering actor factories).
    pub fn spawn_manager_mut(&mut self) -> &mut SpawnManager {
        &mut self.spawn_manager
    }

    /// Register an actor type for remote spawning on this node.
    ///
    /// Registers the factory in both the struct-based SpawnManager and the
    /// native SpawnManagerActor (if started via `start_system_actors()`).
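    ///
    /// # Example (sketch)
    ///
    /// A hedged sketch; `deserialize_args` is a hypothetical helper, and the
    /// codec and error mapping are assumptions, not mandated by this API:
    ///
    /// ```ignore
    /// runtime.register_factory("my_app::MyActor", |bytes| {
    ///     // Deserialize Args with your codec, mapping codec errors into
    ///     // dactor::remote::SerializationError, then box the actor.
    ///     let args: MyActorArgs = deserialize_args(bytes)?; // hypothetical
    ///     Ok(Box::new(MyActor::create(args, ())) as Box<dyn std::any::Any + Send>)
    /// });
    /// ```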
    pub fn register_factory(
        &mut self,
        type_name: impl Into<String>,
        factory: impl Fn(&[u8]) -> Result<Box<dyn std::any::Any + Send>, dactor::remote::SerializationError>
            + Send
            + Sync
            + 'static,
    ) {
        let type_name = type_name.into();
        let factory = Arc::new(factory);

        // Register in struct-based manager
        self.spawn_manager
            .type_registry_mut()
            .register_factory(type_name.clone(), {
                let f = factory.clone();
                move |bytes: &[u8]| f(bytes)
            });

        // Forward to native actor(s) if started
        if let Some(ref actors) = self.system_actors {
            for worker in &actors.spawn_managers {
                let f = factory.clone();
                let _ = worker.notify(
                    crate::system_actors::RegisterFactory {
                        type_name: type_name.clone(),
                        factory: Box::new(move |bytes: &[u8]| f(bytes)),
                    },
                );
            }
        }
    }

    /// Process a remote spawn request.
    ///
    /// Looks up the actor type in the registry, deserializes Args from bytes,
    /// and returns the constructed actor along with its assigned [`ActorId`].
    ///
    /// The returned `ActorId` is pre-assigned for the remote spawn flow where
    /// the runtime controls ID assignment. The caller must use this ID when
    /// registering the spawned actor (not via the regular `spawn()` path,
    /// which assigns its own IDs).
    ///
    /// **Note:** Currently uses the simple factory API (`TypeRegistry::create_actor`).
    /// For actors with non-trivial `Deps`, use `spawn_manager_mut()` to access
    /// `TypeRegistry::create_actor_with_deps()` directly.
    ///
    /// Returns `Ok((actor_id, actor))` on success, or `Err(SpawnResponse::Failure)`
    /// if the type is not found or deserialization fails.
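    ///
    /// # Example (sketch)
    ///
    /// ```ignore
    /// match runtime.handle_spawn_request(&request) {
    ///     Ok((actor_id, actor)) => {
    ///         // Register `actor` under the pre-assigned `actor_id`
    ///         // (not via the regular spawn() path).
    ///     }
    ///     Err(failure) => { /* send the SpawnResponse::Failure back */ }
    /// }
    /// ```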
    pub fn handle_spawn_request(
        &mut self,
        request: &SpawnRequest,
    ) -> Result<(ActorId, Box<dyn std::any::Any + Send>), SpawnResponse> {
        match self.spawn_manager.create_actor(request) {
            Ok(actor) => {
                let local = self.next_local.fetch_add(1, Ordering::SeqCst);
                let actor_id = ActorId {
                    node: self.node_id.clone(),
                    local,
                };
                self.spawn_manager.record_spawn(actor_id.clone());
                Ok((actor_id, actor))
            }
            Err(e) => Err(SpawnResponse::Failure {
                request_id: request.request_id.clone(),
                error: e.to_string(),
            }),
        }
    }

    // -----------------------------------------------------------------------
    // WatchManager wiring
    // -----------------------------------------------------------------------

    /// Access the watch manager (for remote watch subscriptions).
    pub fn watch_manager(&self) -> &WatchManager {
        &self.watch_manager
    }

    /// Access the watch manager mutably.
    pub fn watch_manager_mut(&mut self) -> &mut WatchManager {
        &mut self.watch_manager
    }

    /// Register a remote watch: a remote watcher wants to know when a local
    /// actor terminates.
    pub fn remote_watch(&mut self, target: ActorId, watcher: ActorId) {
        self.watch_manager.watch(target, watcher);
    }

    /// Remove a remote watch subscription.
    pub fn remote_unwatch(&mut self, target: &ActorId, watcher: &ActorId) {
        self.watch_manager.unwatch(target, watcher);
    }

    /// Called when a local actor terminates. Returns notifications for all
    /// remote watchers that should be sent to their respective nodes.
    ///
    /// **Note:** This must be called explicitly by the integration layer.
    /// It is not yet automatically wired into coerce's actor stop lifecycle.
    pub fn notify_terminated(&mut self, terminated: &ActorId) -> Vec<WatchNotification> {
        self.watch_manager.on_terminated(terminated)
    }

    // -----------------------------------------------------------------------
    // CancelManager wiring
    // -----------------------------------------------------------------------

    /// Access the cancel manager.
    pub fn cancel_manager(&self) -> &CancelManager {
        &self.cancel_manager
    }

    /// Access the cancel manager mutably.
    pub fn cancel_manager_mut(&mut self) -> &mut CancelManager {
        &mut self.cancel_manager
    }

    /// Register a cancellation token for a request (for remote cancel support).
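    ///
    /// # Example (sketch)
    ///
    /// A hedged sketch of the cancel lifecycle (the request ID is illustrative):
    ///
    /// ```ignore
    /// let token = CancellationToken::new();
    /// runtime.register_cancel("req-42".into(), token.clone());
    /// // On a remote cancel: runtime.cancel_request("req-42");
    /// // On normal completion: runtime.complete_request("req-42");
    /// ```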
1391    pub fn register_cancel(&mut self, request_id: String, token: CancellationToken) {
1392        self.cancel_manager.register(request_id, token);
1393    }
1394
1395    /// Process a remote cancellation request.
1396    pub fn cancel_request(&mut self, request_id: &str) -> CancelResponse {
1397        self.cancel_manager.cancel(request_id)
1398    }
1399
1400    /// Clean up a cancellation token after its request completes normally.
1401    ///
1402    /// Should be called when a remote ask/stream/feed completes successfully
1403    /// to prevent stale tokens from accumulating.
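    ///
    /// # Example
    ///
    /// The full token lifecycle, sketched (the wire transport that carries
    /// the remote cancel is elided):
    ///
    /// ```ignore
    /// let token = CancellationToken::new();
    /// runtime.register_cancel(request_id.clone(), token.clone());
    ///
    /// // Either a peer cancels the request, which fires `token`...
    /// let _resp = runtime.cancel_request(&request_id);
    ///
    /// // ...or it completes normally and the token is dropped:
    /// runtime.complete_request(&request_id);
    /// ```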
    pub fn complete_request(&mut self, request_id: &str) {
        self.cancel_manager.remove(request_id);
    }

    // -----------------------------------------------------------------------
    // NodeDirectory wiring
    // -----------------------------------------------------------------------

    /// Access the node directory.
    pub fn node_directory(&self) -> &NodeDirectory {
        &self.node_directory
    }

    /// Access the node directory mutably.
    pub fn node_directory_mut(&mut self) -> &mut NodeDirectory {
        &mut self.node_directory
    }

    /// Register a peer node as connected in the directory.
    ///
    /// **Post-validation only:** this method assumes the peer has already
    /// passed the version handshake. Callers should validate compatibility
    /// (via [`handshake_request`](Self::handshake_request) +
    /// [`validate_handshake`](dactor::validate_handshake) +
    /// [`verify_peer_identity`](dactor::verify_peer_identity)) before calling
    /// this method. Direct calls bypass compatibility checks.
    ///
    /// If the peer already exists, updates its status to `Connected` and
    /// preserves the existing address when `address` is `None`.
    /// Emits a `ClusterEvent::NodeJoined` if the peer was not previously connected.
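    ///
    /// # Example
    ///
    /// A sketch of the expected call order (the handshake itself is elided;
    /// see the helpers linked above):
    ///
    /// ```ignore
    /// // After a successful, validated handshake:
    /// runtime.connect_peer(peer_id.clone(), Some("10.0.0.7:9000".into()));
    /// assert!(runtime.is_peer_connected(&peer_id));
    ///
    /// // Reconnecting with `None` preserves the stored address:
    /// runtime.disconnect_peer(&peer_id);
    /// runtime.connect_peer(peer_id.clone(), None);
    /// ```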
    pub fn connect_peer(&mut self, peer_id: NodeId, address: Option<String>) {
        let was_connected = self.node_directory.is_connected(&peer_id);
        if let Some(existing) = self.node_directory.get_peer(&peer_id) {
            let resolved_address = address.or_else(|| existing.address.clone());
            self.node_directory.remove_peer(&peer_id);
            self.node_directory
                .add_peer(peer_id.clone(), resolved_address);
        } else {
            self.node_directory.add_peer(peer_id.clone(), address);
        }
        self.node_directory
            .set_status(&peer_id, PeerStatus::Connected);
        if !was_connected {
            self.cluster_events
                .emit(dactor::ClusterEvent::NodeJoined(peer_id));
        }
    }

    /// Mark a peer as disconnected.
    ///
    /// Emits a `ClusterEvent::NodeLeft` if the peer was previously connected.
    pub fn disconnect_peer(&mut self, peer_id: &NodeId) {
        let was_connected = self.node_directory.is_connected(peer_id);
        self.node_directory
            .set_status(peer_id, PeerStatus::Disconnected);
        if was_connected {
            self.cluster_events
                .emit(dactor::ClusterEvent::NodeLeft(peer_id.clone()));
        }
    }

    /// Check if a peer node is connected.
    pub fn is_peer_connected(&self, peer_id: &NodeId) -> bool {
        self.node_directory.is_connected(peer_id)
    }

    // -----------------------------------------------------------------------
    // Actor lifecycle handles
    // -----------------------------------------------------------------------

    /// Wait for an actor to stop.
    ///
    /// Returns `Ok(())` when the actor finishes cleanly, or `Err` if the
    /// actor panicked in `on_stop`. The stop receiver is consumed and removed
    /// from the map.
    ///
    /// Returns `Ok(())` immediately if no stop receiver is stored for this ID.
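    ///
    /// # Example
    ///
    /// A sketch (how the actor is signalled to stop is elided):
    ///
    /// ```ignore
    /// match runtime.await_stop(&actor_id).await {
    ///     Ok(()) => { /* clean shutdown */ }
    ///     Err(e) => eprintln!("actor panicked in on_stop: {e}"),
    /// }
    /// ```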
    pub async fn await_stop(&self, actor_id: &ActorId) -> Result<(), String> {
        let rx = {
            let mut receivers = self.stop_receivers.lock().unwrap();
            receivers.remove(actor_id)
        };
        match rx {
            Some(rx) => rx
                .await
                .map_err(|_| "stop notifier dropped".to_string())
                .and_then(|r| r),
            None => Ok(()),
        }
    }

    /// Wait for all spawned actors to stop.
    ///
    /// Drains all stored stop receivers and awaits them all. Returns the first
    /// error encountered, but always waits for every actor to finish.
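    ///
    /// # Example
    ///
    /// Typical shutdown ordering, sketched:
    ///
    /// ```ignore
    /// // Signal every actor to stop (elided), then wait for all of them;
    /// // the first `on_stop` panic, if any, surfaces as the `Err`.
    /// runtime.await_all().await?;
    /// ```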
    pub async fn await_all(&self) -> Result<(), String> {
        let receivers: Vec<_> = {
            let mut map = self.stop_receivers.lock().unwrap();
            map.drain().collect()
        };
        let mut first_error = None;
        for (_, rx) in receivers {
            let result = rx
                .await
                .map_err(|e| format!("stop notifier dropped: {e}"))
                .and_then(|r| r);
            if let Err(e) = result {
                if first_error.is_none() {
                    first_error = Some(e);
                }
            }
        }
        match first_error {
            Some(e) => Err(e),
            None => Ok(()),
        }
    }

    /// Remove completed stop receivers from the map.
    ///
    /// Call periodically to prevent stale entries from accumulating
    /// for actors that stopped without being awaited.
    pub fn cleanup_finished(&self) {
        let mut receivers = self.stop_receivers.lock().unwrap();
        receivers.retain(|_, rx| {
            matches!(
                rx.try_recv(),
                Err(tokio::sync::oneshot::error::TryRecvError::Empty)
            )
        });
    }

    /// Number of actors with stored stop receivers.
    ///
    /// Note: includes receivers for actors that have already stopped but
    /// haven't been awaited or cleaned up. Call `cleanup_finished()` first
    /// for an accurate count of running actors.
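    ///
    /// # Example
    ///
    /// The recommended pairing, sketched:
    ///
    /// ```ignore
    /// runtime.cleanup_finished();
    /// let running = runtime.active_handle_count(); // now counts only live actors
    /// ```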
    pub fn active_handle_count(&self) -> usize {
        self.stop_receivers.lock().unwrap().len()
    }
}

impl Default for CoerceRuntime {
    fn default() -> Self {
        Self::new()
    }
}

// ---------------------------------------------------------------------------
// NA10: SystemMessageRouter for coerce
// ---------------------------------------------------------------------------

// NOTE: System messages routed here update the **native system actors** (the
// mailbox-based coerce actors spawned by `start_system_actors()`). The runtime
// also keeps plain struct system actors (`self.watch_manager`, etc.) for the
// backward-compatible sync API. This dual-state pattern is intentional — see
// progress.md "Dual struct+actor pattern" design decision.
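
// Example (sketch): a transport receive loop driving this router.
// `transport.recv_envelope()` and `transport.send_reply()` are hypothetical
// integration-layer hooks, not part of this crate:
//
//     while let Some(envelope) = transport.recv_envelope().await {
//         match runtime.route_system_envelope(envelope).await {
//             Ok(outcome) => transport.send_reply(outcome).await,
//             Err(e) => eprintln!("system routing failed: {e}"),
//         }
//     }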

#[async_trait::async_trait]
impl dactor::system_router::SystemMessageRouter for CoerceRuntime {
    async fn route_system_envelope(
        &self,
        envelope: dactor::remote::WireEnvelope,
    ) -> Result<dactor::system_router::RoutingOutcome, dactor::system_router::RoutingError> {
        use dactor::system_actors::*;
        use dactor::system_router::{RoutingError, RoutingOutcome};

        dactor::system_router::validate_system_message_type(&envelope.message_type)?;

        let refs = self
            .system_actors
            .as_ref()
            .ok_or_else(|| RoutingError::new("system actors not started"))?;

        match envelope.message_type.as_str() {
            SYSTEM_MSG_TYPE_SPAWN => {
                let request = dactor::proto::decode_spawn_request(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode SpawnRequest: {e}")))?;

                let req_id = request.request_id.clone();
                let outcome = refs
                    .spawn_manager()
                    .send(crate::system_actors::HandleSpawnRequest(request))
                    .await
                    .map_err(|e| RoutingError::new(format!("SpawnManager send: {e}")))?;

                match outcome {
                    crate::system_actors::SpawnOutcome::Success { actor_id, .. } => {
                        Ok(RoutingOutcome::SpawnCompleted {
                            request_id: req_id,
                            actor_id,
                        })
                    }
                    crate::system_actors::SpawnOutcome::Failure(SpawnResponse::Failure {
                        request_id,
                        error,
                    }) => Ok(RoutingOutcome::SpawnFailed { request_id, error }),
                    crate::system_actors::SpawnOutcome::Failure(SpawnResponse::Success {
                        ..
                    }) => {
                        unreachable!("SpawnOutcome::Failure always wraps SpawnResponse::Failure")
                    }
                }
            }

            SYSTEM_MSG_TYPE_WATCH => {
                let request = dactor::proto::decode_watch_request(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode WatchRequest: {e}")))?;

                refs.watch_manager
                    .notify(crate::system_actors::RemoteWatch {
                        target: request.target,
                        watcher: request.watcher,
                    })
                    .map_err(|e| RoutingError::new(format!("WatchManager notify: {e}")))?;

                Ok(RoutingOutcome::Acknowledged)
            }

            SYSTEM_MSG_TYPE_UNWATCH => {
                let request = dactor::proto::decode_unwatch_request(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode UnwatchRequest: {e}")))?;

                refs.watch_manager
                    .notify(crate::system_actors::RemoteUnwatch {
                        target: request.target,
                        watcher: request.watcher,
                    })
                    .map_err(|e| RoutingError::new(format!("WatchManager notify: {e}")))?;

                Ok(RoutingOutcome::Acknowledged)
            }

            SYSTEM_MSG_TYPE_CANCEL => {
                let request = dactor::proto::decode_cancel_request(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode CancelRequest: {e}")))?;

                let request_id = request
                    .request_id
                    .ok_or_else(|| RoutingError::new("CancelRequest missing request_id"))?;

                let outcome = refs
                    .cancel_manager
                    .send(crate::system_actors::CancelById(request_id))
                    .await
                    .map_err(|e| RoutingError::new(format!("CancelManager send: {e}")))?;

                match outcome.0 {
                    CancelResponse::Acknowledged => Ok(RoutingOutcome::CancelAcknowledged),
                    CancelResponse::NotFound { reason } => {
                        Ok(RoutingOutcome::CancelNotFound { reason })
                    }
                }
            }

            SYSTEM_MSG_TYPE_CONNECT_PEER => {
                let (peer_id, address) = dactor::proto::decode_connect_peer(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode ConnectPeer: {e}")))?;

                refs.node_directory
                    .notify(crate::system_actors::ConnectPeer {
                        peer_id,
                        address,
                    })
                    .map_err(|e| RoutingError::new(format!("NodeDirectory notify: {e}")))?;

                Ok(RoutingOutcome::Acknowledged)
            }

            SYSTEM_MSG_TYPE_DISCONNECT_PEER => {
                let peer_id = dactor::proto::decode_disconnect_peer(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode DisconnectPeer: {e}")))?;

                refs.node_directory
                    .notify(crate::system_actors::DisconnectPeer(peer_id))
                    .map_err(|e| RoutingError::new(format!("NodeDirectory notify: {e}")))?;

                Ok(RoutingOutcome::Acknowledged)
            }

            other => Err(RoutingError::new(format!(
                "unhandled system message type: {other}"
            ))),
        }
    }
}