1use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Mutex};
9use std::time::Duration;
10
11use futures::FutureExt;
12use tokio_util::sync::CancellationToken;
13
14use dactor::actor::{
15 Actor, ActorContext, ActorError, ActorRef, AskReply, ReduceHandler, Handler, ExpandHandler,
16 TransformHandler,
17};
18use dactor::dead_letter::{DeadLetterEvent, DeadLetterHandler, DeadLetterReason};
19use dactor::dispatch::{AskDispatch, Dispatch, ReduceDispatch, ExpandDispatch, TransformDispatch, TypedDispatch};
20use dactor::errors::{ActorSendError, ErrorAction, RuntimeError};
21use dactor::interceptor::{
22 collect_handler_wrappers, apply_handler_wrappers,
23 Disposition, DropObserver, InboundContext, InboundInterceptor, OutboundInterceptor, Outcome,
24 SendMode,
25};
26use dactor::mailbox::MailboxConfig;
27use dactor::message::{Headers, Message, RuntimeHeaders};
28use dactor::node::{ActorId, NodeId};
29use dactor::runtime_support::{
30 spawn_reduce_batched_drain, spawn_reduce_drain, spawn_transform_drain,
31 wrap_batched_stream_with_interception, wrap_stream_with_interception, OutboundPipeline,
32};
33use dactor::stream::{
34 BatchConfig, BatchReader, BatchWriter, BoxStream, StreamReceiver, StreamSender,
35};
36use dactor::supervision::ChildTerminated;
37use dactor::system_actors::{
38 CancelManager, CancelResponse, NodeDirectory, PeerStatus, SpawnManager, SpawnRequest,
39 SpawnResponse, WatchManager, WatchNotification,
40};
41use dactor::type_registry::TypeRegistry;
42
43use crate::cluster::CoerceClusterEvents;
44
45use coerce::actor::context::ActorContext as CoerceActorCtx;
47use coerce::actor::message::{Handler as CoerceHandler, Message as CoerceMessage};
48use coerce::actor::system::ActorSystem;
49use coerce::actor::{Actor as CoerceActor, LocalActorRef};
50
/// One watch registration stored under the watched actor's id.
struct WatchEntry {
    /// Id of the actor that registered the watch; used to remove the entry
    /// on `unwatch` or when the watcher itself stops.
    watcher_id: ActorId,
    /// Callback invoked with the termination notice when the watched actor stops.
    notify: Box<dyn Fn(ChildTerminated) + Send + Sync>,
}
61
/// Shared watch registry: watched (target) actor id -> registrations for that target.
type WatcherMap = Arc<Mutex<HashMap<ActorId, Vec<WatchEntry>>>>;
64
65struct DactorMsg<A: Actor>(std::sync::Mutex<Option<Box<dyn Dispatch<A>>>>);
75
76impl<A: Actor> DactorMsg<A> {
77 fn new(dispatch: Box<dyn Dispatch<A>>) -> Self {
78 Self(std::sync::Mutex::new(Some(dispatch)))
79 }
80
81 fn take(&self) -> Option<Box<dyn Dispatch<A>>> {
82 self.0.lock().unwrap_or_else(|e| e.into_inner()).take()
83 }
84}
85
86impl<A: Actor + Send + Sync + 'static> CoerceMessage for DactorMsg<A> {
87 type Result = ();
88}
89
/// Coerce actor that hosts a dactor [`Actor`] together with its runtime state.
struct CoerceDactorActor<A: Actor> {
    /// The wrapped user actor.
    actor: A,
    /// dactor context handed to the actor's hooks and handlers. Its per-message
    /// fields (send mode, headers, cancellation token) are set in `handle` and
    /// cleared in `stopped`.
    ctx: ActorContext,
    /// Inbound interceptors, consulted in order for every delivered message.
    interceptors: Vec<Box<dyn InboundInterceptor>>,
    /// Shared watch registry; read and pruned when this actor stops.
    watchers: WatcherMap,
    /// Reason reported in `ChildTerminated`; set when a handler panic leads to a stop.
    stop_reason: Option<String>,
    /// Optional handler notified when an inbound interceptor drops a message.
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
    /// One-shot sender fired at the end of `stopped` with the outcome of `on_stop`.
    stop_notifier: Mutex<Option<tokio::sync::oneshot::Sender<Result<(), String>>>>,
}
102
103#[async_trait::async_trait]
104impl<A: Actor + Send + Sync + 'static> CoerceActor for CoerceDactorActor<A> {
105 async fn started(&mut self, _ctx: &mut CoerceActorCtx) {
106 self.actor.on_start(&mut self.ctx).await;
107 }
108
109 async fn stopped(&mut self, _ctx: &mut CoerceActorCtx) {
110 self.ctx.send_mode = None;
112 self.ctx.headers = Headers::new();
113 self.ctx.set_cancellation_token(None);
114
115 let stop_result =
117 std::panic::AssertUnwindSafe(self.actor.on_stop())
118 .catch_unwind()
119 .await;
120 let stop_err = match stop_result {
121 Ok(()) => None,
122 Err(_panic) => Some("actor panicked in on_stop".to_string()),
123 };
124
125 let actor_id = self.ctx.actor_id.clone();
127 let actor_name = self.ctx.actor_name.clone();
128 let entries = {
129 let mut watchers = self.watchers.lock().unwrap();
130 let target_entries = watchers.remove(&actor_id).unwrap_or_default();
132 for entries in watchers.values_mut() {
134 entries.retain(|e| e.watcher_id != actor_id);
135 }
136 watchers.retain(|_, v| !v.is_empty());
137 target_entries
138 };
139 if !entries.is_empty() {
140 let notification = ChildTerminated {
141 child_id: actor_id,
142 child_name: actor_name,
143 reason: self.stop_reason.clone(),
144 };
145 for entry in &entries {
146 (entry.notify)(notification.clone());
147 }
148 }
149
150 if let Some(tx) = self.stop_notifier.lock().unwrap().take() {
152 let result = match &stop_err {
153 Some(e) => Err(e.clone()),
154 None => Ok(()),
155 };
156 let _ = tx.send(result);
157 }
158 }
159}
160
/// Bridges coerce message delivery to the dactor dispatch pipeline: inbound
/// interception, cancellation, panic isolation, completion hooks and reply delivery.
#[async_trait::async_trait]
impl<A: Actor + Send + Sync + 'static> CoerceHandler<DactorMsg<A>> for CoerceDactorActor<A> {
    /// Handles one envelope.
    ///
    /// Steps, in order: take the dispatch out of the envelope; run the inbound
    /// interceptors' `on_receive`; reject or delay as they request; honour an
    /// already-cancelled token; run the handler (optionally inside interceptor
    /// wrappers) with panics caught; call `on_complete` on every interceptor;
    /// then either send the reply or apply the actor's `on_error` decision.
    async fn handle(&mut self, msg: DactorMsg<A>, coerce_ctx: &mut CoerceActorCtx) {
        let dispatch = match msg.take() {
            Some(d) => d,
            // The payload was already taken out of this envelope; nothing to do.
            None => return,
        };

        let send_mode = dispatch.send_mode();
        let message_type = dispatch.message_type_name();

        // Reset per-message context state before anything can observe it.
        self.ctx.send_mode = Some(send_mode);
        self.ctx.headers = Headers::new();

        let runtime_headers = RuntimeHeaders::new();
        // Interceptors populate `headers`; it is installed on the context further below.
        let mut headers = Headers::new();
        let mut total_delay = Duration::ZERO;
        // (interceptor name, disposition) of the first interceptor that refused the message.
        let mut rejection: Option<(String, Disposition)> = None;

        {
            // Local delivery: `remote` is false and there is no origin node.
            let ictx = InboundContext {
                actor_id: self.ctx.actor_id.clone(),
                actor_name: &self.ctx.actor_name,
                message_type,
                send_mode,
                remote: false,
                origin_node: None,
            };

            for interceptor in &self.interceptors {
                match interceptor.on_receive(
                    &ictx,
                    &runtime_headers,
                    &mut headers,
                    dispatch.message_any(),
                ) {
                    Disposition::Continue => {}
                    // Delays requested by several interceptors add up.
                    Disposition::Delay(d) => {
                        total_delay += d;
                    }
                    // The first refusal wins; later interceptors are not consulted.
                    disp @ (Disposition::Drop | Disposition::Reject(_) | Disposition::Retry(_)) => {
                        rejection = Some((interceptor.name().to_string(), disp));
                        break;
                    }
                }
            }
        }

        if let Some((interceptor_name, disposition)) = rejection {
            // Only `Drop` produces a dead-letter event; every refusal is then
            // handed to the dispatch via `reject`.
            if matches!(disposition, Disposition::Drop) {
                if let Some(ref handler) = *self.dead_letter_handler {
                    let event = DeadLetterEvent {
                        target_id: self.ctx.actor_id.clone(),
                        target_name: Some(self.ctx.actor_name.clone()),
                        message_type,
                        send_mode,
                        reason: DeadLetterReason::DroppedByInterceptor {
                            interceptor: interceptor_name.clone(),
                        },
                        message: None,
                    };
                    // A panicking dead-letter handler must not take the actor down.
                    let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                        handler.on_dead_letter(event);
                    }));
                }
            }
            dispatch.reject(disposition, &interceptor_name);
            return;
        }

        // The delay is served inside the handler, so this actor processes
        // nothing else while it sleeps.
        if !total_delay.is_zero() {
            tokio::time::sleep(total_delay).await;
        }

        // The context is rebuilt rather than reused: the earlier one was scoped
        // to the interception block above.
        let ictx_for_wrap = InboundContext {
            actor_id: self.ctx.actor_id.clone(),
            actor_name: &self.ctx.actor_name,
            message_type,
            send_mode,
            remote: false,
            origin_node: None,
        };
        let wrappers = collect_handler_wrappers(&self.interceptors, &ictx_for_wrap, &headers);
        let needs_wrap = wrappers.iter().any(|w| w.is_some());

        // Expose the interceptor-populated headers to the handler.
        self.ctx.headers = headers;

        let cancel_token = dispatch.cancel_token();
        self.ctx.set_cancellation_token(cancel_token.clone());

        // Cancelled before the handler started: tell the dispatch and skip it.
        if let Some(ref token) = cancel_token {
            if token.is_cancelled() {
                dispatch.cancel();
                self.ctx.set_cancellation_token(None);
                return;
            }
        }

        // `result` is `Ok(dispatch_result)` or `Err(panic payload)`.
        let result = if needs_wrap {
            // The wrapped future yields `()`, so the handler's result is
            // carried out through a oneshot channel.
            let (result_tx, mut result_rx) = tokio::sync::oneshot::channel();

            let inner: std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> = Box::pin(async {
                let r = std::panic::AssertUnwindSafe(
                    dispatch.dispatch(&mut self.actor, &mut self.ctx),
                )
                .catch_unwind()
                .await;
                let _ = result_tx.send(r);
            });

            let wrapped = apply_handler_wrappers(wrappers, inner);

            // Panics raised by the wrappers themselves are caught as well.
            let wrapped = std::panic::AssertUnwindSafe(wrapped);

            if let Some(ref token) = cancel_token {
                tokio::select! {
                    // `biased`: the handler future is polled before the cancel branch.
                    biased;
                    _ = wrapped.catch_unwind() => {},
                    _ = token.cancelled() => {
                        // Cancelled mid-flight: the handler future is dropped and
                        // neither `on_complete` nor a reply is produced here.
                        self.ctx.set_cancellation_token(None);
                        return;
                    }
                }
            } else {
                wrapped.catch_unwind().await.ok();
            }

            match result_rx.try_recv() {
                Ok(r) => r,
                // Nothing was sent: the inner future never ran to completion
                // (e.g. a wrapper did not await it, or a wrapper panicked).
                // This is reported through the panic path below.
                Err(_) => Err(Box::new(
                    "interceptor wrap_handler did not await the handler future",
                ) as Box<dyn std::any::Any + Send>),
            }
        } else {
            if let Some(ref token) = cancel_token {
                let dispatch_fut =
                    std::panic::AssertUnwindSafe(dispatch.dispatch(&mut self.actor, &mut self.ctx))
                        .catch_unwind();
                tokio::select! {
                    // `biased`: the handler future is polled before the cancel branch.
                    biased;
                    r = dispatch_fut => r,
                    _ = token.cancelled() => {
                        // Cancelled mid-flight: drop the handler future and stop here.
                        self.ctx.set_cancellation_token(None);
                        return;
                    }
                }
            } else {
                std::panic::AssertUnwindSafe(dispatch.dispatch(&mut self.actor, &mut self.ctx))
                    .catch_unwind()
                    .await
            }
        };

        self.ctx.set_cancellation_token(None);

        // Rebuilt once more for the completion hooks.
        let ictx = InboundContext {
            actor_id: self.ctx.actor_id.clone(),
            actor_name: &self.ctx.actor_name,
            message_type,
            send_mode,
            remote: false,
            origin_node: None,
        };

        match result {
            Ok(dispatch_result) => {
                // Only an ask that actually produced a reply is reported as
                // `AskSuccess`; every other success is reported as `TellSuccess`.
                let outcome = match (&dispatch_result.reply, send_mode) {
                    (Some(reply), SendMode::Ask) => Outcome::AskSuccess {
                        reply: reply.as_ref(),
                    },
                    _ => Outcome::TellSuccess,
                };

                for interceptor in &self.interceptors {
                    interceptor.on_complete(&ictx, &runtime_headers, &self.ctx.headers, &outcome);
                }

                // The reply goes out only after every interceptor observed the outcome.
                dispatch_result.send_reply();
            }
            Err(_panic) => {
                // The panic payload is discarded; the actor sees a generic internal error.
                let error = ActorError::internal("handler panicked");
                let action = self.actor.on_error(&error);

                let outcome = Outcome::HandlerError { error };
                for interceptor in &self.interceptors {
                    interceptor.on_complete(&ictx, &runtime_headers, &self.ctx.headers, &outcome);
                }

                match action {
                    ErrorAction::Resume => {
                        // Keep processing messages.
                    }
                    // Escalate is handled exactly like Stop here.
                    ErrorAction::Stop | ErrorAction::Escalate => {
                        self.stop_reason = Some("handler panicked".into());
                        coerce_ctx.stop(None);
                    }
                    ErrorAction::Restart => {
                        tracing::warn!("Restart not fully implemented, treating as Resume");
                    }
                }
            }
        }
    }
}
376
/// Handle to a dactor actor running on the coerce backend.
pub struct CoerceActorRef<A: Actor + Send + Sync + 'static> {
    /// dactor-level identity of the actor.
    id: ActorId,
    /// Name the actor was spawned with.
    name: String,
    /// Underlying coerce reference to the hosting actor.
    inner: LocalActorRef<CoerceDactorActor<A>>,
    /// Present for bounded mailboxes: dispatches go through this sender
    /// instead of straight to `inner`.
    bounded_tx: Option<BoundedMailboxSender<DactorMsg<A>>>,
    /// Outbound interceptors used to build the send pipeline.
    outbound_interceptors: Arc<Vec<Box<dyn OutboundInterceptor>>>,
    /// Optional observer handed to the send pipeline.
    drop_observer: Option<Arc<dyn DropObserver>>,
    /// Optional handler notified when a `tell`/`ask` cannot be enqueued.
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
}
401
402use dactor::runtime_support::BoundedMailboxSender;
403
404impl<A: Actor + Send + Sync + 'static> Clone for CoerceActorRef<A> {
405 fn clone(&self) -> Self {
406 Self {
407 id: self.id.clone(),
408 name: self.name.clone(),
409 inner: self.inner.clone(),
410 bounded_tx: self.bounded_tx.clone(),
411 outbound_interceptors: self.outbound_interceptors.clone(),
412 drop_observer: self.drop_observer.clone(),
413 dead_letter_handler: self.dead_letter_handler.clone(),
414 }
415 }
416}
417
418impl<A: Actor + Send + Sync + 'static> std::fmt::Debug for CoerceActorRef<A> {
419 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
420 write!(f, "CoerceActorRef({}, {:?})", self.name, self.id)
421 }
422}
423
424impl<A: Actor + Send + Sync> CoerceActorRef<A> {
425 fn outbound_pipeline(&self) -> OutboundPipeline {
426 OutboundPipeline {
427 interceptors: self.outbound_interceptors.clone(),
428 drop_observer: self.drop_observer.clone(),
429 target_id: self.id.clone(),
430 target_name: self.name.clone(),
431 }
432 }
433
434 fn notify_dead_letter(
435 &self,
436 message_type: &'static str,
437 send_mode: SendMode,
438 reason: DeadLetterReason,
439 ) {
440 if let Some(ref handler) = *self.dead_letter_handler {
441 let event = DeadLetterEvent {
442 target_id: self.id.clone(),
443 target_name: Some(self.name.clone()),
444 message_type,
445 send_mode,
446 reason,
447 message: None,
448 };
449 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
450 handler.on_dead_letter(event);
451 }));
452 }
453 }
454
455 fn send_dispatch(&self, dispatch: Box<dyn Dispatch<A>>) -> Result<(), ActorSendError> {
458 let dactor_msg = DactorMsg::new(dispatch);
459 if let Some(ref btx) = self.bounded_tx {
460 btx.try_send(dactor_msg)
461 } else {
462 self.inner
463 .notify(dactor_msg)
464 .map_err(|e| ActorSendError(e.to_string()))
465 }
466 }
467}
468
469impl<A: Actor + Send + Sync + 'static> ActorRef<A> for CoerceActorRef<A> {
    /// Returns a clone of the actor's dactor id.
    fn id(&self) -> ActorId {
        self.id.clone()
    }
473
    /// Returns a copy of the name the actor was spawned with.
    fn name(&self) -> String {
        self.name.clone()
    }
477
478 fn is_alive(&self) -> bool {
479 if let Some(ref btx) = self.bounded_tx {
480 !btx.is_closed() && self.inner.is_valid()
481 } else {
482 self.inner.is_valid()
483 }
484 }
485
486 fn pending_messages(&self) -> usize {
487 if let Some(ref btx) = self.bounded_tx {
488 btx.pending()
489 } else {
490 0 }
492 }
493
    /// Requests the actor to stop via the coerce reference.
    /// Best-effort: the result of the request is ignored.
    fn stop(&self) {
        let _ = self.inner.notify_stop();
    }
497
498 fn tell<M>(&self, msg: M) -> Result<(), ActorSendError>
499 where
500 A: Handler<M>,
501 M: Message<Reply = ()>,
502 {
503 let pipeline = self.outbound_pipeline();
504 let result = pipeline.run_on_send(SendMode::Tell, &msg);
505 match result.disposition {
506 Disposition::Continue => {}
507 Disposition::Drop | Disposition::Reject(_) | Disposition::Retry(_) => return Ok(()),
508 Disposition::Delay(_) => {} }
510
511 let dispatch: Box<dyn Dispatch<A>> = Box::new(TypedDispatch { msg });
512 self.send_dispatch(dispatch).map_err(|e| {
513 let reason = if e.0.contains("full") {
514 DeadLetterReason::MailboxFull
515 } else {
516 DeadLetterReason::ActorStopped
517 };
518 self.notify_dead_letter(
519 std::any::type_name::<M>(),
520 SendMode::Tell,
521 reason,
522 );
523 e
524 })
525 }
526
527 fn ask<M>(
528 &self,
529 msg: M,
530 cancel: Option<CancellationToken>,
531 ) -> Result<AskReply<M::Reply>, ActorSendError>
532 where
533 A: Handler<M>,
534 M: Message,
535 {
536 let pipeline = self.outbound_pipeline();
537 let result = pipeline.run_on_send(SendMode::Ask, &msg);
538 match result.disposition {
539 Disposition::Continue => {}
540 Disposition::Delay(_) => {} Disposition::Drop => {
542 let (tx, rx) = tokio::sync::oneshot::channel();
543 let _ = tx.send(Err(RuntimeError::ActorNotFound(
544 "message dropped by outbound interceptor".into(),
545 )));
546 return Ok(AskReply::new(rx));
547 }
548 Disposition::Reject(reason) => {
549 let (tx, rx) = tokio::sync::oneshot::channel();
550 let _ = tx.send(Err(RuntimeError::Rejected {
551 interceptor: result.interceptor_name.to_string(),
552 reason,
553 }));
554 return Ok(AskReply::new(rx));
555 }
556 Disposition::Retry(retry_after) => {
557 let (tx, rx) = tokio::sync::oneshot::channel();
558 let _ = tx.send(Err(RuntimeError::RetryAfter {
559 interceptor: result.interceptor_name.to_string(),
560 retry_after,
561 }));
562 return Ok(AskReply::new(rx));
563 }
564 }
565
566 let (tx, rx) = tokio::sync::oneshot::channel();
567 let dispatch: Box<dyn Dispatch<A>> = Box::new(AskDispatch {
568 msg,
569 reply_tx: tx,
570 cancel,
571 });
572 self.send_dispatch(dispatch).map_err(|e| {
573 let reason = if e.0.contains("full") {
574 DeadLetterReason::MailboxFull
575 } else {
576 DeadLetterReason::ActorStopped
577 };
578 self.notify_dead_letter(
579 std::any::type_name::<M>(),
580 SendMode::Ask,
581 reason,
582 );
583 e
584 })?;
585 Ok(AskReply::new(rx))
586 }
587
    /// Sends one message and returns a stream of the items the handler emits.
    ///
    /// Outbound interceptors run first; `Drop`, `Reject` and `Retry` fail the
    /// call with an `ActorSendError`, while `Delay` is not applied here.
    /// `buffer` is clamped to at least 1. With `batch_config`, a background
    /// task groups items into batches before they reach the returned stream.
    /// The returned stream is wrapped with the outbound interception pipeline.
    fn expand<M, OutputItem>(
        &self,
        msg: M,
        buffer: usize,
        batch_config: Option<BatchConfig>,
        cancel: Option<CancellationToken>,
    ) -> Result<BoxStream<OutputItem>, ActorSendError>
    where
        A: ExpandHandler<M, OutputItem>,
        M: Send + 'static,
        OutputItem: Send + 'static,
    {
        let pipeline = self.outbound_pipeline();
        let result = pipeline.run_on_send(SendMode::Expand, &msg);
        match result.disposition {
            Disposition::Continue => {}
            // A requested delay is ignored on this path.
            Disposition::Delay(_) => {}
            Disposition::Drop => {
                return Err(ActorSendError(
                    "stream dropped by outbound interceptor".into(),
                ));
            }
            Disposition::Reject(reason) => {
                return Err(ActorSendError(format!("stream rejected: {}", reason)));
            }
            Disposition::Retry(_) => {
                return Err(ActorSendError(
                    "stream retry requested by interceptor".into(),
                ));
            }
        }

        // A zero-capacity channel is not allowed, so clamp the buffer.
        let buffer = buffer.max(1);
        let (tx, rx) = tokio::sync::mpsc::channel(buffer);
        let sender = StreamSender::new(tx);
        let dispatch: Box<dyn Dispatch<A>> = Box::new(ExpandDispatch {
            msg,
            sender,
            cancel,
        });
        // Unlike `tell`/`ask`, a failed enqueue is returned without a dead-letter event.
        self.send_dispatch(dispatch)?;

        match batch_config {
            Some(batch_config) => {
                let (batch_tx, batch_rx) = tokio::sync::mpsc::channel::<Vec<OutputItem>>(buffer);
                let reader = BatchReader::new(batch_rx);
                let batch_delay = batch_config.max_delay;
                // Batching task: moves items from the handler's channel into the
                // batch writer until either side goes away.
                tokio::spawn(async move {
                    let mut writer = BatchWriter::new(batch_tx, batch_config);
                    let mut rx = rx;
                    loop {
                        if writer.buffered_count() > 0 {
                            // Items are pending: wait for the next item, but no
                            // longer than `batch_delay` from now. The deadline is
                            // recomputed on every loop iteration.
                            let deadline = tokio::time::Instant::now() + batch_delay;
                            tokio::select! {
                                // `biased`: a ready item is preferred over the timer.
                                biased;
                                item = rx.recv() => match item {
                                    Some(item) => {
                                        if writer.push(item).await.is_err() { break; }
                                    }
                                    // Producer finished.
                                    None => break,
                                },
                                _ = tokio::time::sleep_until(deadline) => {
                                    if writer.check_deadline().await.is_err() { break; }
                                }
                            }
                        } else {
                            // Nothing buffered: no timer needed, just wait for an item.
                            match rx.recv().await {
                                Some(item) => {
                                    if writer.push(item).await.is_err() {
                                        break;
                                    }
                                }
                                None => break,
                            }
                        }
                    }
                    // Flush whatever is still buffered; failure here is ignored.
                    let _ = writer.flush().await;
                });
                Ok(wrap_batched_stream_with_interception(
                    reader,
                    buffer,
                    pipeline,
                    std::any::type_name::<M>(),
                    SendMode::Expand,
                ))
            }
            None => Ok(wrap_stream_with_interception(
                rx,
                buffer,
                pipeline,
                std::any::type_name::<M>(),
                SendMode::Expand,
            )),
        }
    }
683
684 fn reduce<InputItem, Reply>(
685 &self,
686 input: BoxStream<InputItem>,
687 buffer: usize,
688 batch_config: Option<BatchConfig>,
689 cancel: Option<CancellationToken>,
690 ) -> Result<AskReply<Reply>, ActorSendError>
691 where
692 A: ReduceHandler<InputItem, Reply>,
693 InputItem: Send + 'static,
694 Reply: Send + 'static,
695 {
696 let buffer = buffer.max(1);
697 let (item_tx, item_rx) = tokio::sync::mpsc::channel(buffer);
698 let receiver = StreamReceiver::new(item_rx);
699 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
700 let dispatch: Box<dyn Dispatch<A>> = Box::new(ReduceDispatch {
701 receiver,
702 reply_tx,
703 cancel: cancel.clone(),
704 });
705 self.send_dispatch(dispatch)?;
706
707 let pipeline = self.outbound_pipeline();
708 match batch_config {
709 Some(batch_config) => {
710 spawn_reduce_batched_drain(
711 input,
712 item_tx,
713 buffer,
714 batch_config,
715 cancel,
716 pipeline,
717 std::any::type_name::<InputItem>(),
718 );
719 }
720 None => {
721 spawn_reduce_drain(
722 input,
723 item_tx,
724 cancel,
725 pipeline,
726 std::any::type_name::<InputItem>(),
727 );
728 }
729 }
730
731 Ok(AskReply::new(reply_rx))
732 }
733
    /// Streams `input` through the actor's transform handler and returns the
    /// output stream.
    ///
    /// `buffer` is clamped to at least 1 and sizes both the input and output
    /// channels. A drain task forwards `input` to the handler; with
    /// `batch_config`, a second task groups output items into batches. The
    /// returned stream is wrapped with the outbound interception pipeline.
    fn transform<InputItem, OutputItem>(
        &self,
        input: BoxStream<InputItem>,
        buffer: usize,
        batch_config: Option<BatchConfig>,
        cancel: Option<CancellationToken>,
    ) -> Result<BoxStream<OutputItem>, ActorSendError>
    where
        A: TransformHandler<InputItem, OutputItem>,
        InputItem: Send + 'static,
        OutputItem: Send + 'static,
    {
        // A zero-capacity channel is not allowed, so clamp the buffer.
        let buffer = buffer.max(1);
        let (item_tx, item_rx) = tokio::sync::mpsc::channel(buffer);
        let (output_tx, mut output_rx) = tokio::sync::mpsc::channel(buffer);
        let receiver = StreamReceiver::new(item_rx);
        let sender = StreamSender::new(output_tx);
        let dispatch: Box<dyn Dispatch<A>> = Box::new(TransformDispatch::new(
            receiver,
            sender,
            cancel.clone(),
        ));
        self.send_dispatch(dispatch)?;

        let pipeline = self.outbound_pipeline();
        // Input side: forward `input` items into the handler's receiver.
        spawn_transform_drain(
            input,
            item_tx,
            cancel,
            pipeline.clone(),
            std::any::type_name::<InputItem>(),
        );

        match batch_config {
            Some(batch_config) => {
                let (batch_tx, batch_rx) =
                    tokio::sync::mpsc::channel::<Vec<OutputItem>>(buffer);
                let reader = BatchReader::new(batch_rx);
                let batch_delay = batch_config.max_delay;
                // Output side: group handler output into batches until either
                // end goes away.
                tokio::spawn(async move {
                    let mut writer = BatchWriter::new(batch_tx, batch_config);
                    loop {
                        if writer.buffered_count() > 0 {
                            // Items are pending: wait for the next item, but no
                            // longer than `batch_delay` from now. The deadline is
                            // recomputed on every loop iteration.
                            let deadline = tokio::time::Instant::now() + batch_delay;
                            tokio::select! {
                                // `biased`: a ready item is preferred over the timer.
                                biased;
                                item = output_rx.recv() => match item {
                                    Some(item) => {
                                        if writer.push(item).await.is_err() { break; }
                                    }
                                    // Handler finished producing output.
                                    None => break,
                                },
                                _ = tokio::time::sleep_until(deadline) => {
                                    if writer.check_deadline().await.is_err() { break; }
                                }
                            }
                        } else {
                            // Nothing buffered: no timer needed, just wait for an item.
                            match output_rx.recv().await {
                                Some(item) => {
                                    if writer.push(item).await.is_err() {
                                        break;
                                    }
                                }
                                None => break,
                            }
                        }
                    }
                    // Flush whatever is still buffered; failure here is ignored.
                    let _ = writer.flush().await;
                });
                Ok(wrap_batched_stream_with_interception(
                    reader,
                    buffer,
                    pipeline,
                    std::any::type_name::<OutputItem>(),
                    SendMode::Transform,
                ))
            }
            None => Ok(wrap_stream_with_interception(
                output_rx,
                buffer,
                pipeline,
                std::any::type_name::<OutputItem>(),
                SendMode::Transform,
            )),
        }
    }
820}
821
822pub struct SpawnOptions {
829 pub interceptors: Vec<Box<dyn InboundInterceptor>>,
831 pub mailbox: MailboxConfig,
838}
839
840impl Default for SpawnOptions {
841 fn default() -> Self {
842 Self {
843 interceptors: Vec::new(),
844 mailbox: MailboxConfig::Unbounded,
845 }
846 }
847}
848
849pub struct CoerceSystemActorRefs {
855 pub spawn_managers: Vec<LocalActorRef<crate::system_actors::SpawnManagerActor>>,
856 spawn_manager_counter: std::sync::atomic::AtomicU64,
857 pub watch_manager: LocalActorRef<crate::system_actors::WatchManagerActor>,
858 pub cancel_manager: LocalActorRef<crate::system_actors::CancelManagerActor>,
859 pub node_directory: LocalActorRef<crate::system_actors::NodeDirectoryActor>,
860}
861
862impl CoerceSystemActorRefs {
863 pub fn spawn_manager(&self) -> &LocalActorRef<crate::system_actors::SpawnManagerActor> {
865 let idx = self.spawn_manager_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
866 as usize % self.spawn_managers.len();
867 &self.spawn_managers[idx]
868 }
869}
870
/// dactor runtime backed by a coerce `ActorSystem`.
pub struct CoerceRuntime {
    /// Identity of this node; becomes the `node` part of every spawned `ActorId`.
    node_id: NodeId,
    /// Counter for the `local` part of new `ActorId`s; shared with the
    /// spawn-manager system actors.
    next_local: Arc<AtomicU64>,
    /// The coerce actor system hosting all actors.
    system: ActorSystem,
    /// Cluster event facility exposed through `cluster_events()`.
    cluster_events: CoerceClusterEvents,
    /// Outbound interceptors shared by every actor reference created here.
    outbound_interceptors: Arc<Vec<Box<dyn OutboundInterceptor>>>,
    /// Optional drop observer handed to every actor reference.
    drop_observer: Option<Arc<dyn DropObserver>>,
    /// Optional dead-letter handler shared with actors and actor references.
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
    /// Watch registry shared with every spawned actor.
    watchers: WatcherMap,
    /// Runtime-owned spawn manager (factory registry and spawn bookkeeping).
    spawn_manager: SpawnManager,
    /// Runtime-owned watch manager used by the `remote_watch` family of methods.
    watch_manager: WatchManager,
    /// Runtime-owned cancel manager used by `register_cancel` / `cancel_request`.
    cancel_manager: CancelManager,
    /// Runtime-owned node directory.
    node_directory: NodeDirectory,
    /// System actor references; `None` until `start_system_actors*` is called.
    system_actors: Option<CoerceSystemActorRefs>,
    /// Receivers for each spawned actor's stop outcome, keyed by actor id.
    #[allow(clippy::type_complexity)]
    stop_receivers: Arc<Mutex<HashMap<ActorId, tokio::sync::oneshot::Receiver<Result<(), String>>>>>,
    /// Optional application version reported in handshake requests.
    app_version: Option<String>,
}
913
914impl CoerceRuntime {
915 pub fn new() -> Self {
920 Self::create(NodeId("coerce-node".into()))
921 }
922
    /// Creates a runtime that uses the given node id.
    pub fn with_node_id(node_id: NodeId) -> Self {
        Self::create(node_id)
    }
929
930 fn create(node_id: NodeId) -> Self {
931 Self {
932 node_id,
933 next_local: Arc::new(AtomicU64::new(1)),
934 system: ActorSystem::new(),
935 cluster_events: CoerceClusterEvents::new(),
936 outbound_interceptors: Arc::new(Vec::new()),
937 drop_observer: None,
938 dead_letter_handler: Arc::new(None),
939 watchers: Arc::new(Mutex::new(HashMap::new())),
940 spawn_manager: SpawnManager::new(TypeRegistry::new()),
941 watch_manager: WatchManager::new(),
942 cancel_manager: CancelManager::new(),
943 node_directory: NodeDirectory::new(),
944 system_actors: None,
945 stop_receivers: Arc::new(Mutex::new(HashMap::new())),
946 app_version: None,
947 }
948 }
949
    /// Adapter identifier passed to `HandshakeRequest::from_runtime`.
    pub const ADAPTER_NAME: &'static str = "coerce";
952
953 pub fn with_app_version(mut self, version: impl Into<String>) -> Self {
959 self.app_version = Some(version.into());
960 self
961 }
962
963 pub fn app_version(&self) -> Option<&str> {
965 self.app_version.as_deref()
966 }
967
968 pub fn handshake_request(&self) -> dactor::HandshakeRequest {
971 dactor::HandshakeRequest::from_runtime(
972 self.node_id.clone(),
973 self.app_version.clone(),
974 Self::ADAPTER_NAME,
975 )
976 }
977
    /// Returns this runtime's node id.
    pub fn node_id(&self) -> &NodeId {
        &self.node_id
    }
982
    /// Returns the underlying coerce actor system.
    pub fn system(&self) -> &ActorSystem {
        &self.system
    }
987
988 pub fn add_outbound_interceptor(&mut self, interceptor: Box<dyn OutboundInterceptor>) {
993 Arc::get_mut(&mut self.outbound_interceptors)
994 .expect("cannot add interceptors after actors are spawned")
995 .push(interceptor);
996 }
997
    /// Sets the drop observer handed to actor references spawned afterwards.
    pub fn set_drop_observer(&mut self, observer: Arc<dyn DropObserver>) {
        self.drop_observer = Some(observer);
    }
1004
    /// Sets the dead-letter handler. A fresh `Arc` is installed, so actors
    /// spawned earlier keep whatever handler they were created with.
    pub fn set_dead_letter_handler(&mut self, handler: Arc<dyn DeadLetterHandler>) {
        self.dead_letter_handler = Arc::new(Some(handler));
    }
1010
1011 pub fn cluster_events_handle(&self) -> &CoerceClusterEvents {
1013 &self.cluster_events
1014 }
1015
    /// Returns the cluster events facility.
    pub fn cluster_events(&self) -> &CoerceClusterEvents {
        &self.cluster_events
    }
1020
1021 pub fn start_system_actors(&mut self) {
1029 self.start_system_actors_with_config(dactor::SystemActorConfig::default());
1030 }
1031
1032 pub fn start_system_actors_with_config(&mut self, config: dactor::SystemActorConfig) {
1037 use crate::system_actors::*;
1038
1039 let pool_size = config.spawn_manager_pool_size.unwrap_or(1).max(1);
1040 let system = self.system.clone();
1041 let mut spawn_refs = Vec::with_capacity(pool_size);
1042
1043 for i in 0..pool_size {
1044 let spawn_mgr = SpawnManagerActor::new(
1045 self.node_id.clone(),
1046 TypeRegistry::new(),
1047 self.next_local.clone(),
1048 );
1049 let spawn_mgr_ref = coerce::actor::scheduler::start_actor(
1050 spawn_mgr,
1051 coerce::actor::new_actor_id(),
1052 coerce::actor::scheduler::ActorType::Tracked,
1053 None,
1054 Some(system.clone()),
1055 None,
1056 format!("dactor-spawn-manager-{i}").into(),
1057 );
1058 spawn_refs.push(spawn_mgr_ref);
1059 }
1060
1061 let watch_mgr = WatchManagerActor::new();
1062 let cancel_mgr = CancelManagerActor::new();
1063 let node_dir = NodeDirectoryActor::new();
1064
1065 let watch_mgr_ref = coerce::actor::scheduler::start_actor(
1066 watch_mgr,
1067 coerce::actor::new_actor_id(),
1068 coerce::actor::scheduler::ActorType::Tracked,
1069 None,
1070 Some(system.clone()),
1071 None,
1072 "dactor-watch-manager".to_string().into(),
1073 );
1074 let cancel_mgr_ref = coerce::actor::scheduler::start_actor(
1075 cancel_mgr,
1076 coerce::actor::new_actor_id(),
1077 coerce::actor::scheduler::ActorType::Tracked,
1078 None,
1079 Some(system.clone()),
1080 None,
1081 "dactor-cancel-manager".to_string().into(),
1082 );
1083 let node_dir_ref = coerce::actor::scheduler::start_actor(
1084 node_dir,
1085 coerce::actor::new_actor_id(),
1086 coerce::actor::scheduler::ActorType::Tracked,
1087 None,
1088 Some(system),
1089 None,
1090 "dactor-node-directory".to_string().into(),
1091 );
1092
1093 self.system_actors = Some(CoerceSystemActorRefs {
1094 spawn_managers: spawn_refs,
1095 spawn_manager_counter: std::sync::atomic::AtomicU64::new(0),
1096 watch_manager: watch_mgr_ref,
1097 cancel_manager: cancel_mgr_ref,
1098 node_directory: node_dir_ref,
1099 });
1100 }
1101
    /// Returns the system actor references, or `None` if the system actors
    /// have not been started.
    pub fn system_actor_refs(&self) -> Option<&CoerceSystemActorRefs> {
        self.system_actors.as_ref()
    }
1108
1109 pub async fn spawn<A>(&self, name: &str, args: A::Args) -> Result<CoerceActorRef<A>, dactor::errors::RuntimeError>
1111 where
1112 A: Actor<Deps = ()> + Send + Sync + 'static,
1113 {
1114 Ok(self.spawn_internal::<A>(name, args, (), Vec::new(), MailboxConfig::Unbounded))
1115 }
1116
1117 pub async fn spawn_with_deps<A>(&self, name: &str, args: A::Args, deps: A::Deps) -> Result<CoerceActorRef<A>, dactor::errors::RuntimeError>
1119 where
1120 A: Actor + Send + Sync + 'static,
1121 {
1122 Ok(self.spawn_internal::<A>(name, args, deps, Vec::new(), MailboxConfig::Unbounded))
1123 }
1124
1125 pub async fn spawn_with_options<A>(
1127 &self,
1128 name: &str,
1129 args: A::Args,
1130 options: SpawnOptions,
1131 ) -> Result<CoerceActorRef<A>, dactor::errors::RuntimeError>
1132 where
1133 A: Actor<Deps = ()> + Send + Sync + 'static,
1134 {
1135 Ok(self.spawn_internal::<A>(name, args, (), options.interceptors, options.mailbox))
1136 }
1137
1138 fn spawn_internal<A>(
1139 &self,
1140 name: &str,
1141 args: A::Args,
1142 deps: A::Deps,
1143 interceptors: Vec<Box<dyn InboundInterceptor>>,
1144 mailbox: MailboxConfig,
1145 ) -> CoerceActorRef<A>
1146 where
1147 A: Actor + Send + Sync + 'static,
1148 {
1149 let local = self.next_local.fetch_add(1, Ordering::SeqCst);
1150 let actor_id = ActorId {
1151 node: self.node_id.clone(),
1152 local,
1153 };
1154 let actor_name = name.to_string();
1155
1156 let actor = A::create(args, deps);
1157 let (stop_tx, stop_rx) = tokio::sync::oneshot::channel();
1158
1159 let wrapper = CoerceDactorActor {
1160 actor,
1161 ctx: ActorContext::new(actor_id.clone(), actor_name.clone()),
1162 interceptors,
1163 watchers: self.watchers.clone(),
1164 stop_reason: None,
1165 dead_letter_handler: self.dead_letter_handler.clone(),
1166 stop_notifier: Mutex::new(Some(stop_tx)),
1167 };
1168
1169 let system = self.system.clone();
1173 let coerce_ref = coerce::actor::scheduler::start_actor(
1174 wrapper,
1175 coerce::actor::new_actor_id(),
1176 coerce::actor::scheduler::ActorType::Tracked,
1177 None,
1178 Some(system),
1179 None,
1180 name.to_string().into(),
1181 );
1182
1183 let bounded_tx = match mailbox {
1185 MailboxConfig::Bounded { capacity, overflow } => {
1186 let (btx, mut brx) =
1187 tokio::sync::mpsc::channel::<DactorMsg<A>>(capacity);
1188 let fwd_ref = coerce_ref.clone();
1189 tokio::spawn(async move {
1190 while let Some(msg) = brx.recv().await {
1191 if fwd_ref.notify(msg).is_err() {
1192 break; }
1194 }
1195 });
1196 Some(BoundedMailboxSender::new(btx, overflow))
1197 }
1198 MailboxConfig::Unbounded => None,
1199 };
1200
1201 self.stop_receivers
1203 .lock()
1204 .unwrap()
1205 .insert(actor_id.clone(), stop_rx);
1206
1207 CoerceActorRef {
1208 id: actor_id,
1209 name: actor_name,
1210 inner: coerce_ref,
1211 bounded_tx,
1212 outbound_interceptors: self.outbound_interceptors.clone(),
1213 drop_observer: self.drop_observer.clone(),
1214 dead_letter_handler: self.dead_letter_handler.clone(),
1215 }
1216 }
1217
1218 pub fn watch<W>(&self, watcher: &CoerceActorRef<W>, target_id: ActorId)
1223 where
1224 W: Actor + Handler<ChildTerminated> + Send + Sync + 'static,
1225 {
1226 let watcher_id = watcher.id();
1227 let watcher_inner = watcher.inner.clone();
1228
1229 let entry = WatchEntry {
1230 watcher_id,
1231 notify: Box::new(move |msg: ChildTerminated| {
1232 let dispatch: Box<dyn Dispatch<W>> = Box::new(TypedDispatch { msg });
1233 if watcher_inner.notify(DactorMsg::new(dispatch)).is_err() {
1234 tracing::debug!("watch notification dropped — watcher may have stopped");
1235 }
1236 }),
1237 };
1238
1239 let mut watchers = self.watchers.lock().unwrap();
1240 watchers.entry(target_id).or_default().push(entry);
1241 }
1242
1243 pub fn unwatch(&self, watcher_id: &ActorId, target_id: &ActorId) {
1245 let mut watchers = self.watchers.lock().unwrap();
1246 if let Some(entries) = watchers.get_mut(target_id) {
1247 entries.retain(|e| &e.watcher_id != watcher_id);
1248 if entries.is_empty() {
1249 watchers.remove(target_id);
1250 }
1251 }
1252 }
1253
    /// Returns the runtime-owned spawn manager.
    pub fn spawn_manager(&self) -> &SpawnManager {
        &self.spawn_manager
    }
1262
    /// Returns the runtime-owned spawn manager mutably.
    pub fn spawn_manager_mut(&mut self) -> &mut SpawnManager {
        &mut self.spawn_manager
    }
1267
1268 pub fn register_factory(
1273 &mut self,
1274 type_name: impl Into<String>,
1275 factory: impl Fn(&[u8]) -> Result<Box<dyn std::any::Any + Send>, dactor::remote::SerializationError>
1276 + Send
1277 + Sync
1278 + 'static,
1279 ) {
1280 let type_name = type_name.into();
1281 let factory = Arc::new(factory);
1282
1283 self.spawn_manager
1285 .type_registry_mut()
1286 .register_factory(type_name.clone(), {
1287 let f = factory.clone();
1288 move |bytes: &[u8]| f(bytes)
1289 });
1290
1291 if let Some(ref actors) = self.system_actors {
1293 for worker in &actors.spawn_managers {
1294 let f = factory.clone();
1295 let _ = worker.notify(
1296 crate::system_actors::RegisterFactory {
1297 type_name: type_name.clone(),
1298 factory: Box::new(move |bytes: &[u8]| f(bytes)),
1299 },
1300 );
1301 }
1302 }
1303 }
1304
1305 pub fn handle_spawn_request(
1322 &mut self,
1323 request: &SpawnRequest,
1324 ) -> Result<(ActorId, Box<dyn std::any::Any + Send>), SpawnResponse> {
1325 match self.spawn_manager.create_actor(request) {
1326 Ok(actor) => {
1327 let local = self.next_local.fetch_add(1, Ordering::SeqCst);
1328 let actor_id = ActorId {
1329 node: self.node_id.clone(),
1330 local,
1331 };
1332 self.spawn_manager.record_spawn(actor_id.clone());
1333 Ok((actor_id, actor))
1334 }
1335 Err(e) => Err(SpawnResponse::Failure {
1336 request_id: request.request_id.clone(),
1337 error: e.to_string(),
1338 }),
1339 }
1340 }
1341
1342 pub fn watch_manager(&self) -> &WatchManager {
1348 &self.watch_manager
1349 }
1350
1351 pub fn watch_manager_mut(&mut self) -> &mut WatchManager {
1353 &mut self.watch_manager
1354 }
1355
1356 pub fn remote_watch(&mut self, target: ActorId, watcher: ActorId) {
1359 self.watch_manager.watch(target, watcher);
1360 }
1361
1362 pub fn remote_unwatch(&mut self, target: &ActorId, watcher: &ActorId) {
1364 self.watch_manager.unwatch(target, watcher);
1365 }
1366
1367 pub fn notify_terminated(&mut self, terminated: &ActorId) -> Vec<WatchNotification> {
1373 self.watch_manager.on_terminated(terminated)
1374 }
1375
1376 pub fn cancel_manager(&self) -> &CancelManager {
1382 &self.cancel_manager
1383 }
1384
1385 pub fn cancel_manager_mut(&mut self) -> &mut CancelManager {
1387 &mut self.cancel_manager
1388 }
1389
1390 pub fn register_cancel(&mut self, request_id: String, token: CancellationToken) {
1392 self.cancel_manager.register(request_id, token);
1393 }
1394
1395 pub fn cancel_request(&mut self, request_id: &str) -> CancelResponse {
1397 self.cancel_manager.cancel(request_id)
1398 }
1399
1400 pub fn complete_request(&mut self, request_id: &str) {
1405 self.cancel_manager.remove(request_id);
1406 }
1407
1408 pub fn node_directory(&self) -> &NodeDirectory {
1414 &self.node_directory
1415 }
1416
1417 pub fn node_directory_mut(&mut self) -> &mut NodeDirectory {
1419 &mut self.node_directory
1420 }
1421
1422 pub fn connect_peer(&mut self, peer_id: NodeId, address: Option<String>) {
1435 let was_connected = self.node_directory.is_connected(&peer_id);
1436 if let Some(existing) = self.node_directory.get_peer(&peer_id) {
1437 let resolved_address = address.or_else(|| existing.address.clone());
1438 self.node_directory.remove_peer(&peer_id);
1439 self.node_directory
1440 .add_peer(peer_id.clone(), resolved_address);
1441 } else {
1442 self.node_directory.add_peer(peer_id.clone(), address);
1443 }
1444 self.node_directory
1445 .set_status(&peer_id, PeerStatus::Connected);
1446 if !was_connected {
1447 self.cluster_events
1448 .emit(dactor::ClusterEvent::NodeJoined(peer_id));
1449 }
1450 }
1451
1452 pub fn disconnect_peer(&mut self, peer_id: &NodeId) {
1456 let was_connected = self.node_directory.is_connected(peer_id);
1457 self.node_directory
1458 .set_status(peer_id, PeerStatus::Disconnected);
1459 if was_connected {
1460 self.cluster_events
1461 .emit(dactor::ClusterEvent::NodeLeft(peer_id.clone()));
1462 }
1463 }
1464
1465 pub fn is_peer_connected(&self, peer_id: &NodeId) -> bool {
1467 self.node_directory.is_connected(peer_id)
1468 }
1469
1470 pub async fn await_stop(&self, actor_id: &ActorId) -> Result<(), String> {
1482 let rx = {
1483 let mut receivers = self.stop_receivers.lock().unwrap();
1484 receivers.remove(actor_id)
1485 };
1486 match rx {
1487 Some(rx) => rx
1488 .await
1489 .map_err(|_| "stop notifier dropped".to_string())
1490 .and_then(|r| r),
1491 None => Ok(()),
1492 }
1493 }
1494
1495 pub async fn await_all(&self) -> Result<(), String> {
1500 let receivers: Vec<_> = {
1501 let mut map = self.stop_receivers.lock().unwrap();
1502 map.drain().collect()
1503 };
1504 let mut first_error = None;
1505 for (_, rx) in receivers {
1506 let result = rx.await.map_err(|e| format!("stop notifier dropped: {e}")).and_then(|r| r);
1507 if let Err(e) = result {
1508 if first_error.is_none() {
1509 first_error = Some(e);
1510 }
1511 }
1512 }
1513 match first_error {
1514 Some(e) => Err(e),
1515 None => Ok(()),
1516 }
1517 }
1518
1519 pub fn cleanup_finished(&self) {
1524 let mut receivers = self.stop_receivers.lock().unwrap();
1525 receivers.retain(|_, rx| {
1526 matches!(
1527 rx.try_recv(),
1528 Err(tokio::sync::oneshot::error::TryRecvError::Empty)
1529 )
1530 });
1531 }
1532
1533 pub fn active_handle_count(&self) -> usize {
1539 self.stop_receivers.lock().unwrap().len()
1540 }
1541}
1542
1543impl Default for CoerceRuntime {
1544 fn default() -> Self {
1545 Self::new()
1546 }
1547}
1548
1549#[async_trait::async_trait]
1560impl dactor::system_router::SystemMessageRouter for CoerceRuntime {
1561 async fn route_system_envelope(
1562 &self,
1563 envelope: dactor::remote::WireEnvelope,
1564 ) -> Result<dactor::system_router::RoutingOutcome, dactor::system_router::RoutingError> {
1565 use dactor::system_actors::*;
1566 use dactor::system_router::{RoutingError, RoutingOutcome};
1567
1568 dactor::system_router::validate_system_message_type(&envelope.message_type)?;
1569
1570 let refs = self
1571 .system_actors
1572 .as_ref()
1573 .ok_or_else(|| RoutingError::new("system actors not started"))?;
1574
1575 match envelope.message_type.as_str() {
1576 SYSTEM_MSG_TYPE_SPAWN => {
1577 let request = dactor::proto::decode_spawn_request(&envelope.body)
1578 .map_err(|e| RoutingError::new(format!("decode SpawnRequest: {e}")))?;
1579
1580 let req_id = request.request_id.clone();
1581 let outcome = refs
1582 .spawn_manager()
1583 .send(crate::system_actors::HandleSpawnRequest(request))
1584 .await
1585 .map_err(|e| RoutingError::new(format!("SpawnManager send: {e}")))?;
1586
1587 match outcome {
1588 crate::system_actors::SpawnOutcome::Success { actor_id, .. } => {
1589 Ok(RoutingOutcome::SpawnCompleted {
1590 request_id: req_id,
1591 actor_id,
1592 })
1593 }
1594 crate::system_actors::SpawnOutcome::Failure(SpawnResponse::Failure {
1595 request_id,
1596 error,
1597 }) => Ok(RoutingOutcome::SpawnFailed { request_id, error }),
1598 crate::system_actors::SpawnOutcome::Failure(SpawnResponse::Success {
1599 ..
1600 }) => {
1601 unreachable!("SpawnOutcome::Failure always wraps SpawnResponse::Failure")
1602 }
1603 }
1604 }
1605
1606 SYSTEM_MSG_TYPE_WATCH => {
1607 let request = dactor::proto::decode_watch_request(&envelope.body)
1608 .map_err(|e| RoutingError::new(format!("decode WatchRequest: {e}")))?;
1609
1610 refs.watch_manager
1611 .notify(crate::system_actors::RemoteWatch {
1612 target: request.target,
1613 watcher: request.watcher,
1614 })
1615 .map_err(|e| RoutingError::new(format!("WatchManager notify: {e}")))?;
1616
1617 Ok(RoutingOutcome::Acknowledged)
1618 }
1619
1620 SYSTEM_MSG_TYPE_UNWATCH => {
1621 let request = dactor::proto::decode_unwatch_request(&envelope.body)
1622 .map_err(|e| RoutingError::new(format!("decode UnwatchRequest: {e}")))?;
1623
1624 refs.watch_manager
1625 .notify(crate::system_actors::RemoteUnwatch {
1626 target: request.target,
1627 watcher: request.watcher,
1628 })
1629 .map_err(|e| RoutingError::new(format!("WatchManager notify: {e}")))?;
1630
1631 Ok(RoutingOutcome::Acknowledged)
1632 }
1633
1634 SYSTEM_MSG_TYPE_CANCEL => {
1635 let request = dactor::proto::decode_cancel_request(&envelope.body)
1636 .map_err(|e| RoutingError::new(format!("decode CancelRequest: {e}")))?;
1637
1638 let request_id = request
1639 .request_id
1640 .ok_or_else(|| RoutingError::new("CancelRequest missing request_id"))?;
1641
1642 let outcome = refs
1643 .cancel_manager
1644 .send(crate::system_actors::CancelById(request_id))
1645 .await
1646 .map_err(|e| RoutingError::new(format!("CancelManager send: {e}")))?;
1647
1648 match outcome.0 {
1649 CancelResponse::Acknowledged => Ok(RoutingOutcome::CancelAcknowledged),
1650 CancelResponse::NotFound { reason } => {
1651 Ok(RoutingOutcome::CancelNotFound { reason })
1652 }
1653 }
1654 }
1655
1656 SYSTEM_MSG_TYPE_CONNECT_PEER => {
1657 let (peer_id, address) = dactor::proto::decode_connect_peer(&envelope.body)
1658 .map_err(|e| RoutingError::new(format!("decode ConnectPeer: {e}")))?;
1659
1660 refs.node_directory
1661 .notify(crate::system_actors::ConnectPeer {
1662 peer_id,
1663 address,
1664 })
1665 .map_err(|e| RoutingError::new(format!("NodeDirectory notify: {e}")))?;
1666
1667 Ok(RoutingOutcome::Acknowledged)
1668 }
1669
1670 SYSTEM_MSG_TYPE_DISCONNECT_PEER => {
1671 let peer_id = dactor::proto::decode_disconnect_peer(&envelope.body)
1672 .map_err(|e| RoutingError::new(format!("decode DisconnectPeer: {e}")))?;
1673
1674 refs.node_directory
1675 .notify(crate::system_actors::DisconnectPeer(peer_id))
1676 .map_err(|e| RoutingError::new(format!("NodeDirectory notify: {e}")))?;
1677
1678 Ok(RoutingOutcome::Acknowledged)
1679 }
1680
1681 other => Err(RoutingError::new(format!(
1682 "unhandled system message type: {other}"
1683 ))),
1684 }
1685 }
1686}
1687