1use std::collections::HashMap;
7use std::marker::PhantomData;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Arc, Mutex};
10use std::time::Duration;
11
12use futures::FutureExt;
13use tokio_util::sync::CancellationToken;
14
15use dactor::actor::{
16 Actor, ActorContext, ActorError, ActorRef, AskReply, ReduceHandler, Handler, ExpandHandler,
17 TransformHandler,
18};
19use dactor::dead_letter::{DeadLetterEvent, DeadLetterHandler, DeadLetterReason};
20use dactor::dispatch::{AskDispatch, Dispatch, ReduceDispatch, ExpandDispatch, TransformDispatch, TypedDispatch};
21use dactor::errors::{ActorSendError, ErrorAction, RuntimeError};
22use dactor::interceptor::{
23 collect_handler_wrappers, apply_handler_wrappers,
24 Disposition, DropObserver, InboundContext, InboundInterceptor, OutboundInterceptor, Outcome,
25 SendMode,
26};
27use dactor::mailbox::MailboxConfig;
28use dactor::message::{Headers, Message, RuntimeHeaders};
29use dactor::node::{ActorId, NodeId};
30use dactor::runtime_support::{
31 spawn_reduce_batched_drain, spawn_reduce_drain, spawn_transform_drain,
32 wrap_batched_stream_with_interception, wrap_stream_with_interception, OutboundPipeline,
33};
34use dactor::stream::{
35 BatchConfig, BatchReader, BatchWriter, BoxStream, StreamReceiver, StreamSender,
36};
37use dactor::supervision::ChildTerminated;
38use dactor::system_actors::{
39 CancelManager, CancelResponse, NodeDirectory, PeerStatus, SpawnManager, SpawnRequest,
40 SpawnResponse, WatchManager, WatchNotification,
41};
42use dactor::type_registry::TypeRegistry;
43
44use crate::cluster::RactorClusterEvents;
45
/// One registered watch on a target actor: when the target stops, `notify`
/// is invoked with the `ChildTerminated` event on behalf of `watcher_id`.
struct WatchEntry {
    // Identity of the watching actor; used to prune this entry when the
    // watcher itself stops (see post_stop) or when unwatch() is called.
    watcher_id: ActorId,
    // Callback that delivers the termination notification to the watcher's
    // mailbox. Boxed so heterogeneous watcher types can share one map.
    notify: Box<dyn Fn(ChildTerminated) + Send + Sync>,
}
56
/// Shared map from watched (target) actor id to the watch entries on it.
/// Cloned into every spawned actor so post_stop can fire notifications.
type WatcherMap = Arc<Mutex<HashMap<ActorId, Vec<WatchEntry>>>>;
59
/// Envelope carried through the ractor mailbox: a type-erased dactor
/// dispatch (tell/ask/stream op) targeting actor type `A`.
struct DactorMsg<A: Actor>(Box<dyn Dispatch<A>>);
66
/// Zero-sized ractor actor that hosts a dactor `Actor` of type `A`.
/// All per-instance state lives in `RactorActorState`.
struct RactorDactorActor<A: Actor> {
    // `fn() -> A` keeps the struct Send/Sync regardless of `A`'s auto traits.
    _phantom: PhantomData<fn() -> A>,
}
71
/// Per-instance state owned by the ractor wrapper between messages.
struct RactorActorState<A: Actor> {
    // The user's dactor actor implementation.
    actor: A,
    // Mutable dactor context handed to every handler invocation.
    ctx: ActorContext,
    // Inbound interceptor chain, run in order for each message.
    interceptors: Vec<Box<dyn InboundInterceptor>>,
    // Runtime-wide watch registry (shared with RactorRuntime).
    watchers: WatcherMap,
    // Human-readable stop reason, reported to watchers in post_stop.
    stop_reason: Option<String>,
    // Optional runtime-wide dead-letter sink (shared; None if unset).
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
    // One-shot fired from post_stop so the runtime can await clean shutdown.
    stop_notifier: Option<tokio::sync::oneshot::Sender<Result<(), String>>>,
}
83
/// Everything `pre_start` needs to build a `RactorActorState`.
struct RactorSpawnArgs<A: Actor> {
    // User-supplied constructor arguments for `A::create`.
    args: A::Args,
    // Injected dependencies for `A::create`.
    deps: A::Deps,
    // Runtime-assigned id (node + local counter).
    actor_id: ActorId,
    // Name given at spawn time; also used for the ractor registry.
    actor_name: String,
    // Per-actor inbound interceptors, in execution order.
    interceptors: Vec<Box<dyn InboundInterceptor>>,
    // Shared watch registry, cloned from the runtime.
    watchers: WatcherMap,
    // Shared dead-letter sink, cloned from the runtime.
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
    // Stop-notification channel paired with the runtime's stop_receivers map.
    stop_notifier: Option<tokio::sync::oneshot::Sender<Result<(), String>>>,
}
95
/// Bridges the ractor actor lifecycle (pre_start / handle / post_stop) to
/// the dactor actor contract (create / on_start / handlers / on_stop),
/// layering in inbound interceptors, cancellation, panic isolation, and
/// watch/dead-letter notifications.
impl<A: Actor + 'static> ractor::Actor for RactorDactorActor<A> {
    type Msg = DactorMsg<A>;
    type State = RactorActorState<A>;
    type Arguments = RactorSpawnArgs<A>;

    /// Constructs the dactor actor and runs its `on_start` hook before any
    /// message is processed.
    async fn pre_start(
        &self,
        _myself: ractor::ActorRef<Self::Msg>,
        args: Self::Arguments,
    ) -> Result<Self::State, ractor::ActorProcessingErr> {
        let mut actor = A::create(args.args, args.deps);
        let mut ctx = ActorContext::new(args.actor_id, args.actor_name);
        actor.on_start(&mut ctx).await;
        Ok(RactorActorState {
            actor,
            ctx,
            interceptors: args.interceptors,
            watchers: args.watchers,
            stop_reason: None,
            dead_letter_handler: args.dead_letter_handler,
            stop_notifier: args.stop_notifier,
        })
    }

    /// Processes one dispatched message:
    /// 1. run inbound interceptors (may delay, drop, reject, or retry);
    /// 2. honor cancellation before and during handler execution;
    /// 3. run the handler (optionally wrapped by interceptors), catching
    ///    panics so one bad message cannot kill the actor silently;
    /// 4. run `on_complete` interceptors and deliver the reply, or consult
    ///    `on_error` for the supervision action on panic.
    async fn handle(
        &self,
        myself: ractor::ActorRef<Self::Msg>,
        message: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ractor::ActorProcessingErr> {
        let dispatch = message.0;

        let send_mode = dispatch.send_mode();
        let message_type = dispatch.message_type_name();

        // Reset per-message context; headers may be populated by interceptors.
        state.ctx.send_mode = Some(send_mode);
        state.ctx.headers = Headers::new();

        let runtime_headers = RuntimeHeaders::new();
        let mut headers = Headers::new();
        // Delays from multiple interceptors accumulate into a single sleep.
        let mut total_delay = Duration::ZERO;
        // First terminal disposition (Drop/Reject/Retry) wins; rest are skipped.
        let mut rejection: Option<(String, Disposition)> = None;

        {
            let ictx = InboundContext {
                actor_id: state.ctx.actor_id.clone(),
                actor_name: &state.ctx.actor_name,
                message_type,
                send_mode,
                remote: false,
                origin_node: None,
            };

            for interceptor in &state.interceptors {
                match interceptor.on_receive(
                    &ictx,
                    &runtime_headers,
                    &mut headers,
                    dispatch.message_any(),
                ) {
                    Disposition::Continue => {}
                    Disposition::Delay(d) => {
                        total_delay += d;
                    }
                    disp @ (Disposition::Drop | Disposition::Reject(_) | Disposition::Retry(_)) => {
                        rejection = Some((interceptor.name().to_string(), disp));
                        break;
                    }
                }
            }
        }

        if let Some((interceptor_name, disposition)) = rejection {
            // Only silent drops become dead letters; Reject/Retry surface an
            // error to the caller via dispatch.reject below.
            if matches!(disposition, Disposition::Drop) {
                if let Some(ref handler) = *state.dead_letter_handler {
                    let event = DeadLetterEvent {
                        target_id: state.ctx.actor_id.clone(),
                        target_name: Some(state.ctx.actor_name.clone()),
                        message_type,
                        send_mode,
                        reason: DeadLetterReason::DroppedByInterceptor {
                            interceptor: interceptor_name.clone(),
                        },
                        message: None,
                    };
                    // A panicking dead-letter handler must not take down the actor.
                    let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                        handler.on_dead_letter(event);
                    }));
                }
            }
            dispatch.reject(disposition, &interceptor_name);
            return Ok(());
        }

        // Apply the accumulated interceptor delay before dispatching.
        // NOTE(review): this sleeps on the actor's message loop, so it also
        // delays every queued message behind this one.
        if !total_delay.is_zero() {
            tokio::time::sleep(total_delay).await;
        }

        let ictx_for_wrap = InboundContext {
            actor_id: state.ctx.actor_id.clone(),
            actor_name: &state.ctx.actor_name,
            message_type,
            send_mode,
            remote: false,
            origin_node: None,
        };
        // Interceptors may wrap the handler future (e.g. for tracing spans).
        let wrappers = collect_handler_wrappers(&state.interceptors, &ictx_for_wrap, &headers);
        let needs_wrap = wrappers.iter().any(|w| w.is_some());

        // Expose interceptor-populated headers to the handler.
        state.ctx.headers = headers;

        let cancel_token = dispatch.cancel_token();
        state.ctx.set_cancellation_token(cancel_token.clone());

        // Pre-dispatch cancellation check: skip the handler entirely.
        if let Some(ref token) = cancel_token {
            if token.is_cancelled() {
                dispatch.cancel();
                state.ctx.set_cancellation_token(None);
                return Ok(());
            }
        }

        let result = if needs_wrap {
            // The wrapped future returns (), so the handler's real result is
            // smuggled out through a oneshot the inner future completes.
            let (result_tx, mut result_rx) = tokio::sync::oneshot::channel();

            let inner: std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> = Box::pin(async {
                let r = std::panic::AssertUnwindSafe(
                    dispatch.dispatch(&mut state.actor, &mut state.ctx),
                )
                .catch_unwind()
                .await;
                let _ = result_tx.send(r);
            });

            let wrapped = apply_handler_wrappers(wrappers, inner);

            // Wrapper code itself may panic; isolate that too.
            let wrapped = std::panic::AssertUnwindSafe(wrapped);

            if let Some(ref token) = cancel_token {
                // `biased` prefers handler completion over cancellation when
                // both are ready, so finished work is not discarded.
                tokio::select! {
                    biased;
                    _ = wrapped.catch_unwind() => {},
                    _ = token.cancelled() => {
                        state.ctx.set_cancellation_token(None);
                        return Ok(());
                    }
                }
            } else {
                wrapped.catch_unwind().await.ok();
            }

            // If the wrapper never polled the inner future to completion, the
            // oneshot is empty — treat that as a handler error.
            match result_rx.try_recv() {
                Ok(r) => r,
                Err(_) => Err(Box::new(
                    "interceptor wrap_handler did not await the handler future",
                ) as Box<dyn std::any::Any + Send>),
            }
        } else {
            if let Some(ref token) = cancel_token {
                let dispatch_fut =
                    std::panic::AssertUnwindSafe(dispatch.dispatch(&mut state.actor, &mut state.ctx))
                        .catch_unwind();
                tokio::select! {
                    biased;
                    r = dispatch_fut => r,
                    _ = token.cancelled() => {
                        state.ctx.set_cancellation_token(None);
                        return Ok(());
                    }
                }
            } else {
                std::panic::AssertUnwindSafe(dispatch.dispatch(&mut state.actor, &mut state.ctx))
                    .catch_unwind()
                    .await
            }
        };

        state.ctx.set_cancellation_token(None);

        let ictx = InboundContext {
            actor_id: state.ctx.actor_id.clone(),
            actor_name: &state.ctx.actor_name,
            message_type,
            send_mode,
            remote: false,
            origin_node: None,
        };

        match result {
            Ok(dispatch_result) => {
                // Classify the outcome for on_complete: only Ask with an
                // actual reply counts as AskSuccess.
                let outcome = match (&dispatch_result.reply, send_mode) {
                    (Some(reply), SendMode::Ask) => Outcome::AskSuccess {
                        reply: reply.as_ref(),
                    },
                    _ => Outcome::TellSuccess,
                };

                for interceptor in &state.interceptors {
                    interceptor.on_complete(&ictx, &runtime_headers, &state.ctx.headers, &outcome);
                }

                // Reply is sent only after all on_complete hooks have run.
                dispatch_result.send_reply();
            }
            Err(_panic) => {
                // Handler panicked: ask the actor how to proceed.
                let error = ActorError::internal("handler panicked");
                let action = state.actor.on_error(&error);

                let outcome = Outcome::HandlerError { error };
                for interceptor in &state.interceptors {
                    interceptor.on_complete(&ictx, &runtime_headers, &state.ctx.headers, &outcome);
                }

                match action {
                    ErrorAction::Resume => {
                        // Keep processing subsequent messages.
                    }
                    ErrorAction::Stop | ErrorAction::Escalate => {
                        // Escalate is treated like Stop here; record a reason
                        // so watchers learn why the actor terminated.
                        state.stop_reason = Some("handler panicked".into());
                        myself.stop(None);
                    }
                    ErrorAction::Restart => {
                        // Restart semantics are not implemented for this
                        // adapter; degrade gracefully to Resume.
                        tracing::warn!("Restart not fully implemented, treating as Resume");
                    }
                }
            }
        }

        Ok(())
    }

    /// Final teardown: runs `on_stop` (panic-isolated), fires watch
    /// notifications, prunes the watch registry, and signals the runtime's
    /// stop receiver.
    async fn post_stop(
        &self,
        _myself: ractor::ActorRef<Self::Msg>,
        state: &mut Self::State,
    ) -> Result<(), ractor::ActorProcessingErr> {
        // Clear any per-message context left over from the last handler.
        state.ctx.send_mode = None;
        state.ctx.headers = Headers::new();
        state.ctx.set_cancellation_token(None);

        // A panic in on_stop is recorded but must not abort teardown.
        let on_stop_panicked =
            std::panic::AssertUnwindSafe(state.actor.on_stop())
                .catch_unwind()
                .await
                .is_err();
        if on_stop_panicked && state.stop_reason.is_none() {
            state.stop_reason = Some("actor panicked in on_stop".to_string());
        }

        let actor_id = state.ctx.actor_id.clone();
        let actor_name = state.ctx.actor_name.clone();
        let entries = {
            let mut watchers = state.watchers.lock().unwrap();
            // Take everyone watching this actor...
            let target_entries = watchers.remove(&actor_id).unwrap_or_default();
            // ...and also remove this actor's own watches on others.
            for entries in watchers.values_mut() {
                entries.retain(|e| e.watcher_id != actor_id);
            }
            watchers.retain(|_, v| !v.is_empty());
            target_entries
        };
        if !entries.is_empty() {
            let notification = ChildTerminated {
                child_id: actor_id,
                child_name: actor_name,
                reason: state.stop_reason.clone(),
            };
            // Notify outside the lock to avoid deadlock if a notify callback
            // touches the watcher map.
            for entry in &entries {
                (entry.notify)(notification.clone());
            }
        }

        // Tell anyone awaiting stop whether shutdown was clean; receiver may
        // already be gone, which is fine.
        if let Some(tx) = state.stop_notifier.take() {
            let result = if on_stop_panicked {
                Err("actor panicked in on_stop".to_string())
            } else {
                Ok(())
            };
            let _ = tx.send(result);
        }

        Ok(())
    }
}
397
/// Cheap, cloneable handle to a spawned dactor actor backed by ractor.
/// Carries the outbound interception pipeline so sends can be observed,
/// dropped, or rejected before they reach the mailbox.
pub struct RactorActorRef<A: Actor> {
    // Runtime-assigned identity of the target actor.
    id: ActorId,
    // Spawn-time name (also used in Debug output).
    name: String,
    // Underlying ractor reference used to enqueue messages.
    inner: ractor::ActorRef<DactorMsg<A>>,
    // Present only for bounded mailboxes: sends go through this capacity-
    // limited channel, which a forwarder task drains into `inner`.
    bounded_tx: Option<BoundedMailboxSender<DactorMsg<A>>>,
    // Runtime-wide outbound interceptor chain.
    outbound_interceptors: Arc<Vec<Box<dyn OutboundInterceptor>>>,
    // Optional observer for messages dropped by the outbound pipeline.
    drop_observer: Option<Arc<dyn DropObserver>>,
    // Runtime-wide dead-letter sink for undeliverable messages.
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
}
419
420use dactor::runtime_support::BoundedMailboxSender;
421
422impl<A: Actor> Clone for RactorActorRef<A> {
423 fn clone(&self) -> Self {
424 Self {
425 id: self.id.clone(),
426 name: self.name.clone(),
427 inner: self.inner.clone(),
428 bounded_tx: self.bounded_tx.clone(),
429 outbound_interceptors: self.outbound_interceptors.clone(),
430 drop_observer: self.drop_observer.clone(),
431 dead_letter_handler: self.dead_letter_handler.clone(),
432 }
433 }
434}
435
/// Debug shows the human-readable name first, then the structured id.
impl<A: Actor> std::fmt::Debug for RactorActorRef<A> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "RactorActorRef({}, {:?})", self.name, self.id)
    }
}
441
impl<A: Actor + 'static> RactorActorRef<A> {
    /// Builds a fresh outbound pipeline snapshot targeting this actor.
    /// Cheap: clones are Arc bumps plus the id/name strings.
    fn outbound_pipeline(&self) -> OutboundPipeline {
        OutboundPipeline {
            interceptors: self.outbound_interceptors.clone(),
            drop_observer: self.drop_observer.clone(),
            target_id: self.id.clone(),
            target_name: self.name.clone(),
        }
    }

    /// Reports an undeliverable message to the dead-letter handler, if one
    /// is configured. Best-effort: a panicking handler is swallowed so it
    /// cannot poison the send path.
    fn notify_dead_letter(
        &self,
        message_type: &'static str,
        send_mode: SendMode,
        reason: DeadLetterReason,
    ) {
        if let Some(ref handler) = *self.dead_letter_handler {
            let event = DeadLetterEvent {
                target_id: self.id.clone(),
                target_name: Some(self.name.clone()),
                message_type,
                send_mode,
                reason,
            message: None,
            };
            let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                handler.on_dead_letter(event);
            }));
        }
    }

    /// Enqueues a dispatch, routing through the bounded-mailbox channel when
    /// one exists (its forwarder task drains into `inner`), otherwise casting
    /// directly on the ractor reference.
    fn send_dispatch(&self, dispatch: Box<dyn Dispatch<A>>) -> Result<(), ActorSendError> {
        if let Some(ref btx) = self.bounded_tx {
            btx.try_send(DactorMsg(dispatch))
        } else {
            self.inner
                .cast(DactorMsg(dispatch))
                .map_err(|e| ActorSendError(e.to_string()))
        }
    }
}
485
/// Public dactor `ActorRef` surface: fire-and-forget (`tell`), request/reply
/// (`ask`), and the three streaming shapes (`expand`, `reduce`, `transform`).
/// Every send first passes the outbound interceptor pipeline.
impl<A: Actor + 'static> ActorRef<A> for RactorActorRef<A> {
    fn id(&self) -> ActorId {
        self.id.clone()
    }

    fn name(&self) -> String {
        self.name.clone()
    }

    /// True while the underlying ractor actor is starting/running and — for
    /// bounded mailboxes — the forwarding channel is still open.
    fn is_alive(&self) -> bool {
        let inner_alive = matches!(
            self.inner.get_status(),
            ractor::ActorStatus::Running
                | ractor::ActorStatus::Starting
                | ractor::ActorStatus::Upgrading
        );
        if let Some(ref btx) = self.bounded_tx {
            !btx.is_closed() && inner_alive
        } else {
            inner_alive
        }
    }

    /// Queue depth of the bounded channel; ractor's unbounded queue depth is
    /// not observable here, so unbounded mailboxes report 0.
    fn pending_messages(&self) -> usize {
        if let Some(ref btx) = self.bounded_tx {
            btx.pending()
        } else {
            0
        }
    }

    fn stop(&self) {
        self.inner.stop(None);
    }

    /// Fire-and-forget send. Terminal interceptor dispositions (Drop/Reject/
    /// Retry) return Ok(()) silently — the pipeline's drop observer is the
    /// place to see those. Delay is ignored on the (synchronous) send side.
    fn tell<M>(&self, msg: M) -> Result<(), ActorSendError>
    where
        A: Handler<M>,
        M: Message<Reply = ()>,
    {
        let pipeline = self.outbound_pipeline();
        let result = pipeline.run_on_send(SendMode::Tell, &msg);
        match result.disposition {
            Disposition::Continue => {}
            Disposition::Delay(_) => {}
            Disposition::Drop | Disposition::Reject(_) | Disposition::Retry(_) => return Ok(()),
        }

        let dispatch: Box<dyn Dispatch<A>> = Box::new(TypedDispatch { msg });
        self.send_dispatch(dispatch).map_err(|e| {
            // Heuristic: the error string distinguishes a full bounded
            // mailbox from a stopped actor. TODO confirm error text is stable.
            let reason = if e.0.contains("full") {
                DeadLetterReason::MailboxFull
            } else {
                DeadLetterReason::ActorStopped
            };
            self.notify_dead_letter(std::any::type_name::<M>(), SendMode::Tell, reason);
            e
        })
    }

    /// Request/reply send. Interceptor rejections are surfaced as an
    /// already-resolved `AskReply` carrying the error, so callers always get
    /// a reply future to await.
    fn ask<M>(
        &self,
        msg: M,
        cancel: Option<CancellationToken>,
    ) -> Result<AskReply<M::Reply>, ActorSendError>
    where
        A: Handler<M>,
        M: Message,
    {
        let pipeline = self.outbound_pipeline();
        let result = pipeline.run_on_send(SendMode::Ask, &msg);
        match result.disposition {
            Disposition::Continue => {}
            Disposition::Delay(_) => {}
            Disposition::Drop => {
                // NOTE(review): Drop is reported as ActorNotFound for lack of
                // a dedicated error variant — verify callers expect this.
                let (tx, rx) = tokio::sync::oneshot::channel();
                let _ = tx.send(Err(RuntimeError::ActorNotFound(
                    "message dropped by outbound interceptor".into(),
                )));
                return Ok(AskReply::new(rx));
            }
            Disposition::Reject(reason) => {
                let (tx, rx) = tokio::sync::oneshot::channel();
                let _ = tx.send(Err(RuntimeError::Rejected {
                    interceptor: result.interceptor_name.to_string(),
                    reason,
                }));
                return Ok(AskReply::new(rx));
            }
            Disposition::Retry(retry_after) => {
                let (tx, rx) = tokio::sync::oneshot::channel();
                let _ = tx.send(Err(RuntimeError::RetryAfter {
                    interceptor: result.interceptor_name.to_string(),
                    retry_after,
                }));
                return Ok(AskReply::new(rx));
            }
        }

        let (tx, rx) = tokio::sync::oneshot::channel();
        let dispatch: Box<dyn Dispatch<A>> = Box::new(AskDispatch {
            msg,
            reply_tx: tx,
            cancel,
        });
        self.send_dispatch(dispatch).map_err(|e| {
            // Same full-vs-stopped heuristic as tell().
            let reason = if e.0.contains("full") {
                DeadLetterReason::MailboxFull
            } else {
                DeadLetterReason::ActorStopped
            };
            self.notify_dead_letter(std::any::type_name::<M>(), SendMode::Ask, reason);
            e
        })?;
        Ok(AskReply::new(rx))
    }

    /// One message in, a stream of items out. With `batch_config`, a relay
    /// task regroups the actor's items into batches (flushing on size via
    /// the writer or on `max_delay` timeout) before interception wrapping.
    fn expand<M, OutputItem>(
        &self,
        msg: M,
        buffer: usize,
        batch_config: Option<BatchConfig>,
        cancel: Option<CancellationToken>,
    ) -> Result<BoxStream<OutputItem>, ActorSendError>
    where
        A: ExpandHandler<M, OutputItem>,
        M: Send + 'static,
        OutputItem: Send + 'static,
    {
        let pipeline = self.outbound_pipeline();
        let result = pipeline.run_on_send(SendMode::Expand, &msg);
        // Streams cannot be resolved-with-error like ask; terminal
        // dispositions become send errors instead.
        match result.disposition {
            Disposition::Continue => {}
            Disposition::Delay(_) => {}
            Disposition::Drop => {
                return Err(ActorSendError(
                    "stream dropped by outbound interceptor".into(),
                ));
            }
            Disposition::Reject(reason) => {
                return Err(ActorSendError(format!("stream rejected: {}", reason)));
            }
            Disposition::Retry(_) => {
                return Err(ActorSendError(
                    "stream retry requested by interceptor".into(),
                ));
            }
        }

        // A zero-capacity channel is invalid; clamp to 1.
        let buffer = buffer.max(1);
        let (tx, mut rx) = tokio::sync::mpsc::channel(buffer);
        let sender = StreamSender::new(tx);
        let dispatch: Box<dyn Dispatch<A>> = Box::new(ExpandDispatch {
            msg,
            sender,
            cancel,
        });
        self.send_dispatch(dispatch)?;

        match batch_config {
            Some(batch_config) => {
                let (batch_tx, batch_rx) = tokio::sync::mpsc::channel::<Vec<OutputItem>>(buffer);
                let reader = BatchReader::new(batch_rx);
                let batch_delay = batch_config.max_delay;
                tokio::spawn(async move {
                    let mut writer = BatchWriter::new(batch_tx, batch_config);
                    loop {
                        if writer.buffered_count() > 0 {
                            // A partial batch is pending: race the next item
                            // against the flush deadline. `biased` prefers
                            // items when both are ready.
                            let deadline = tokio::time::Instant::now() + batch_delay;
                            tokio::select! {
                                biased;
                                item = rx.recv() => match item {
                                    Some(item) => {
                                        if writer.push(item).await.is_err() { break; }
                                    }
                                    None => break,
                                },
                                _ = tokio::time::sleep_until(deadline) => {
                                    if writer.check_deadline().await.is_err() { break; }
                                }
                            }
                        } else {
                            // Nothing buffered: just wait for the next item,
                            // no timeout needed.
                            match rx.recv().await {
                                Some(item) => {
                                    if writer.push(item).await.is_err() {
                                        break;
                                    }
                                }
                                None => break,
                            }
                        }
                    }
                    // Upstream finished or downstream gone: flush leftovers.
                    let _ = writer.flush().await;
                });
                Ok(wrap_batched_stream_with_interception(
                    reader,
                    buffer,
                    pipeline,
                    std::any::type_name::<M>(),
                    SendMode::Expand,
                ))
            }
            None => Ok(wrap_stream_with_interception(
                rx,
                buffer,
                pipeline,
                std::any::type_name::<M>(),
                SendMode::Expand,
            )),
        }
    }

    /// A stream of items in, one reply out. The dispatch is sent first so
    /// the actor is ready to consume before the drain task starts feeding.
    fn reduce<InputItem, Reply>(
        &self,
        input: BoxStream<InputItem>,
        buffer: usize,
        batch_config: Option<BatchConfig>,
        cancel: Option<CancellationToken>,
    ) -> Result<AskReply<Reply>, ActorSendError>
    where
        A: ReduceHandler<InputItem, Reply>,
        InputItem: Send + 'static,
        Reply: Send + 'static,
    {
        let buffer = buffer.max(1);
        let (item_tx, item_rx) = tokio::sync::mpsc::channel(buffer);
        let receiver = StreamReceiver::new(item_rx);
        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        let dispatch: Box<dyn Dispatch<A>> = Box::new(ReduceDispatch {
            receiver,
            reply_tx,
            cancel: cancel.clone(),
        });
        self.send_dispatch(dispatch)?;

        // Drain the caller's input stream into the actor on a helper task;
        // the pipeline lets outbound interceptors observe each item.
        let pipeline = self.outbound_pipeline();
        match batch_config {
            Some(batch_config) => {
                spawn_reduce_batched_drain(
                    input,
                    item_tx,
                    buffer,
                    batch_config,
                    cancel,
                    pipeline,
                    std::any::type_name::<InputItem>(),
                );
            }
            None => {
                spawn_reduce_drain(
                    input,
                    item_tx,
                    cancel,
                    pipeline,
                    std::any::type_name::<InputItem>(),
                );
            }
        }

        Ok(AskReply::new(reply_rx))
    }

    /// A stream in, a stream out. Input is drained to the actor by a helper
    /// task; output is optionally re-batched exactly like expand().
    fn transform<InputItem, OutputItem>(
        &self,
        input: BoxStream<InputItem>,
        buffer: usize,
        batch_config: Option<BatchConfig>,
        cancel: Option<CancellationToken>,
    ) -> Result<BoxStream<OutputItem>, ActorSendError>
    where
        A: TransformHandler<InputItem, OutputItem>,
        InputItem: Send + 'static,
        OutputItem: Send + 'static,
    {
        let buffer = buffer.max(1);
        let (item_tx, item_rx) = tokio::sync::mpsc::channel(buffer);
        let (output_tx, mut output_rx) = tokio::sync::mpsc::channel(buffer);
        let receiver = StreamReceiver::new(item_rx);
        let sender = StreamSender::new(output_tx);
        let dispatch: Box<dyn Dispatch<A>> = Box::new(TransformDispatch::new(
            receiver,
            sender,
            cancel.clone(),
        ));
        self.send_dispatch(dispatch)?;

        let pipeline = self.outbound_pipeline();
        spawn_transform_drain(
            input,
            item_tx,
            cancel,
            pipeline.clone(),
            std::any::type_name::<InputItem>(),
        );

        match batch_config {
            Some(batch_config) => {
                let (batch_tx, batch_rx) =
                    tokio::sync::mpsc::channel::<Vec<OutputItem>>(buffer);
                let reader = BatchReader::new(batch_rx);
                let batch_delay = batch_config.max_delay;
                tokio::spawn(async move {
                    let mut writer = BatchWriter::new(batch_tx, batch_config);
                    loop {
                        if writer.buffered_count() > 0 {
                            // Partial batch pending: race next item vs flush
                            // deadline (items win ties via `biased`).
                            let deadline = tokio::time::Instant::now() + batch_delay;
                            tokio::select! {
                                biased;
                                item = output_rx.recv() => match item {
                                    Some(item) => {
                                        if writer.push(item).await.is_err() { break; }
                                    }
                                    None => break,
                                },
                                _ = tokio::time::sleep_until(deadline) => {
                                    if writer.check_deadline().await.is_err() { break; }
                                }
                            }
                        } else {
                            match output_rx.recv().await {
                                Some(item) => {
                                    if writer.push(item).await.is_err() {
                                        break;
                                    }
                                }
                                None => break,
                            }
                        }
                    }
                    let _ = writer.flush().await;
                });
                Ok(wrap_batched_stream_with_interception(
                    reader,
                    buffer,
                    pipeline,
                    std::any::type_name::<OutputItem>(),
                    SendMode::Transform,
                ))
            }
            None => Ok(wrap_stream_with_interception(
                output_rx,
                buffer,
                pipeline,
                std::any::type_name::<OutputItem>(),
                SendMode::Transform,
            )),
        }
    }
}
835
/// Per-spawn configuration accepted by `spawn_with_options`.
pub struct SpawnOptions {
    // Inbound interceptors installed on the new actor, run in order.
    pub interceptors: Vec<Box<dyn InboundInterceptor>>,
    // Mailbox policy: unbounded, or bounded with an overflow strategy.
    pub mailbox: MailboxConfig,
}
854
855impl Default for SpawnOptions {
856 fn default() -> Self {
857 Self {
858 interceptors: Vec::new(),
859 mailbox: MailboxConfig::Unbounded,
860 }
861 }
862}
863
/// ractor-backed implementation of the dactor runtime: spawning, watching,
/// outbound interception, dead letters, and the optional system actors.
pub struct RactorRuntime {
    // Identity of this node within the cluster.
    node_id: NodeId,
    // Monotonic counter for locally-assigned actor ids.
    next_local: Arc<AtomicU64>,
    // Cluster membership event source for this adapter.
    cluster_events: RactorClusterEvents,
    // Outbound interceptors, shared into every actor ref. Arc so refs stay
    // valid; mutation is only possible before the first spawn.
    outbound_interceptors: Arc<Vec<Box<dyn OutboundInterceptor>>>,
    // Optional observer for outbound-pipeline drops.
    drop_observer: Option<Arc<dyn DropObserver>>,
    // Optional dead-letter sink, shared into every actor.
    dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
    // Watch registry shared with every spawned actor's post_stop.
    watchers: WatcherMap,
    // In-process managers mirrored by the optional system actors below.
    spawn_manager: SpawnManager,
    watch_manager: WatchManager,
    cancel_manager: CancelManager,
    node_directory: NodeDirectory,
    // Populated by start_system_actors*(); None until then.
    system_actors: Option<RactorSystemActorRefs>,
    #[allow(clippy::type_complexity)]
    // Per-actor receivers resolved when the actor's post_stop completes.
    stop_receivers: Arc<Mutex<HashMap<ActorId, tokio::sync::oneshot::Receiver<Result<(), String>>>>>,
    // Optional application version advertised during handshake.
    app_version: Option<String>,
}
904
/// Handles to the background system actors started by
/// `start_system_actors*`. The spawn manager may be a pool; the others are
/// single actors.
pub struct RactorSystemActorRefs {
    pub spawn_manager: SystemActorHandle<crate::system_actors::SpawnManagerMsg>,
    pub watch_manager: ractor::ActorRef<crate::system_actors::WatchManagerMsg>,
    pub cancel_manager: ractor::ActorRef<crate::system_actors::CancelManagerMsg>,
    pub node_directory: ractor::ActorRef<crate::system_actors::NodeDirectoryMsg>,
}
912
/// Either a single system actor or a round-robin pool of workers, behind a
/// uniform `cast` interface.
pub enum SystemActorHandle<M: ractor::Message> {
    // Exactly one worker; casts go straight to it.
    Single(ractor::ActorRef<M>),
    // Multiple workers; casts are distributed round-robin.
    Pool(SystemActorPool<M>),
}
920
impl<M: ractor::Message> SystemActorHandle<M> {
    /// Sends `msg` to the single actor, or to the pool's next worker in
    /// round-robin order.
    pub fn cast(&self, msg: M) -> Result<(), ractor::MessagingErr<M>> {
        match self {
            SystemActorHandle::Single(r) => r.cast(msg),
            SystemActorHandle::Pool(pool) => pool.cast(msg),
        }
    }
}
930
/// Fixed set of identical system-actor workers addressed round-robin.
pub struct SystemActorPool<M: ractor::Message> {
    // Worker refs; `new` guarantees at least one.
    workers: Vec<ractor::ActorRef<M>>,
    // Monotonic counter; index = counter % workers.len().
    counter: std::sync::atomic::AtomicU64,
}
936
937impl<M: ractor::Message> SystemActorPool<M> {
938 pub fn new(workers: Vec<ractor::ActorRef<M>>) -> Self {
940 assert!(!workers.is_empty(), "pool must have at least one worker");
941 Self {
942 workers,
943 counter: std::sync::atomic::AtomicU64::new(0),
944 }
945 }
946
947 pub fn cast(&self, msg: M) -> Result<(), ractor::MessagingErr<M>> {
949 let idx = self.counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
950 as usize % self.workers.len();
951 self.workers[idx].cast(msg)
952 }
953
954 pub fn len(&self) -> usize {
956 self.workers.len()
957 }
958
959 pub fn workers(&self) -> &[ractor::ActorRef<M>] {
961 &self.workers
962 }
963}
964
965impl RactorRuntime {
966 pub fn new() -> Self {
971 Self::create(NodeId("ractor-node".into()))
972 }
973
    /// Creates a runtime identified by the given `node_id`.
    pub fn with_node_id(node_id: NodeId) -> Self {
        Self::create(node_id)
    }
978
    /// Shared constructor: empty interceptors/handlers, fresh managers, and
    /// local-id counter starting at 1 (0 is never assigned).
    fn create(node_id: NodeId) -> Self {
        Self {
            node_id,
            next_local: Arc::new(AtomicU64::new(1)),
            cluster_events: RactorClusterEvents::new(),
            outbound_interceptors: Arc::new(Vec::new()),
            drop_observer: None,
            dead_letter_handler: Arc::new(None),
            watchers: Arc::new(Mutex::new(HashMap::new())),
            spawn_manager: SpawnManager::new(TypeRegistry::new()),
            watch_manager: WatchManager::new(),
            cancel_manager: CancelManager::new(),
            node_directory: NodeDirectory::new(),
            system_actors: None,
            stop_receivers: Arc::new(Mutex::new(HashMap::new())),
            app_version: None,
        }
    }
997
    /// Adapter identifier advertised during cluster handshakes.
    pub const ADAPTER_NAME: &'static str = "ractor";
1000
    /// Builder-style setter for the application version reported in
    /// handshake requests.
    pub fn with_app_version(mut self, version: impl Into<String>) -> Self {
        self.app_version = Some(version.into());
        self
    }
1010
    /// The configured application version, if any.
    pub fn app_version(&self) -> Option<&str> {
        self.app_version.as_deref()
    }
1015
    /// Builds the handshake request this node sends to peers, carrying the
    /// node id, optional app version, and the adapter name.
    pub fn handshake_request(&self) -> dactor::HandshakeRequest {
        dactor::HandshakeRequest::from_runtime(
            self.node_id.clone(),
            self.app_version.clone(),
            Self::ADAPTER_NAME,
        )
    }
1025
    /// Starts the system actors with default configuration. See
    /// `start_system_actors_with_config` for details.
    pub async fn start_system_actors(&mut self) {
        self.start_system_actors_with_config(dactor::SystemActorConfig::default()).await;
    }
1039
    /// Spawns the background system actors (spawn manager — optionally a
    /// pool — plus watch, cancel, and node-directory managers) and records
    /// their handles in `self.system_actors`.
    ///
    /// Panics if any system actor fails to spawn; these are essential
    /// infrastructure, so startup cannot proceed without them.
    pub async fn start_system_actors_with_config(
        &mut self,
        config: dactor::SystemActorConfig,
    ) {
        use crate::system_actors::*;

        // Pool size defaults to 1; clamp so a configured 0 still works.
        let pool_size = config.spawn_manager_pool_size.unwrap_or(1).max(1);
        let mut spawn_refs = Vec::with_capacity(pool_size);

        // Each pool worker gets its own TypeRegistry; factories are later
        // broadcast to all workers by register_factory().
        for _ in 0..pool_size {
            let (spawn_ref, _) = ractor::Actor::spawn(
                None, SpawnManagerActor,
                (self.node_id.clone(), TypeRegistry::new(), self.next_local.clone()),
            ).await.expect("failed to spawn SpawnManagerActor");
            spawn_refs.push(spawn_ref);
        }

        // Collapse a one-element pool to the cheaper Single variant.
        let spawn_manager = if spawn_refs.len() == 1 {
            SystemActorHandle::Single(spawn_refs.pop().unwrap())
        } else {
            SystemActorHandle::Pool(SystemActorPool::new(spawn_refs))
        };

        let (watch_ref, _) = ractor::Actor::spawn(
            None, WatchManagerActor, (),
        ).await.expect("failed to spawn WatchManagerActor");

        let (cancel_ref, _) = ractor::Actor::spawn(
            None, CancelManagerActor, (),
        ).await.expect("failed to spawn CancelManagerActor");

        let (node_dir_ref, _) = ractor::Actor::spawn(
            None, NodeDirectoryActor, (),
        ).await.expect("failed to spawn NodeDirectoryActor");

        self.system_actors = Some(RactorSystemActorRefs {
            spawn_manager,
            watch_manager: watch_ref,
            cancel_manager: cancel_ref,
            node_directory: node_dir_ref,
        });
    }
1107
    /// This runtime's node identity.
    pub fn node_id(&self) -> &NodeId {
        &self.node_id
    }
1112
    /// Handles to the system actors, or `None` if `start_system_actors*`
    /// has not been called yet.
    pub fn system_actor_refs(&self) -> Option<&RactorSystemActorRefs> {
        self.system_actors.as_ref()
    }
1119
    /// Registers a runtime-wide outbound interceptor.
    ///
    /// # Panics
    /// Panics if any actor has already been spawned: spawned refs hold
    /// clones of the interceptor `Arc`, so `Arc::get_mut` fails once the
    /// list is shared.
    pub fn add_outbound_interceptor(&mut self, interceptor: Box<dyn OutboundInterceptor>) {
        Arc::get_mut(&mut self.outbound_interceptors)
            .expect("cannot add interceptors after actors are spawned")
            .push(interceptor);
    }
1129
    /// Installs the observer notified when the outbound pipeline drops a
    /// message. Only affects actors spawned after this call.
    pub fn set_drop_observer(&mut self, observer: Arc<dyn DropObserver>) {
        self.drop_observer = Some(observer);
    }
1135
    /// Installs the dead-letter sink for undeliverable/dropped messages.
    /// Replaces the shared Arc, so only actors spawned after this call see
    /// the new handler.
    pub fn set_dead_letter_handler(&mut self, handler: Arc<dyn DeadLetterHandler>) {
        self.dead_letter_handler = Arc::new(Some(handler));
    }
1141
    /// Access to the cluster event source. Alias of `cluster_events`; kept
    /// for API compatibility.
    pub fn cluster_events_handle(&self) -> &RactorClusterEvents {
        &self.cluster_events
    }
1146
    /// Access to the cluster event source (membership/peer events).
    pub fn cluster_events(&self) -> &RactorClusterEvents {
        &self.cluster_events
    }
1151
    /// Spawns a dependency-free actor with default options (no
    /// interceptors, unbounded mailbox).
    pub async fn spawn<A>(&self, name: &str, args: A::Args) -> Result<RactorActorRef<A>, dactor::errors::RuntimeError>
    where
        A: Actor<Deps = ()> + 'static,
    {
        self.spawn_internal::<A>(name, args, (), Vec::new(), MailboxConfig::Unbounded).await
    }
1159
    /// Spawns an actor with injected dependencies and default options.
    pub async fn spawn_with_deps<A>(&self, name: &str, args: A::Args, deps: A::Deps) -> Result<RactorActorRef<A>, dactor::errors::RuntimeError>
    where
        A: Actor + 'static,
    {
        self.spawn_internal::<A>(name, args, deps, Vec::new(), MailboxConfig::Unbounded).await
    }
1167
    /// Spawns a dependency-free actor with explicit interceptors and
    /// mailbox configuration.
    pub async fn spawn_with_options<A>(
        &self,
        name: &str,
        args: A::Args,
        options: SpawnOptions,
    ) -> Result<RactorActorRef<A>, dactor::errors::RuntimeError>
    where
        A: Actor<Deps = ()> + 'static,
    {
        self.spawn_internal::<A>(name, args, (), options.interceptors, options.mailbox).await
    }
1180
    /// Common spawn path: assigns an id, spawns the ractor wrapper, wires
    /// up the (optional) bounded mailbox forwarder, registers the stop
    /// receiver, and returns the dactor-facing ref.
    ///
    /// # Errors
    /// Returns `RuntimeError::SpawnFailed` if the underlying ractor spawn
    /// fails (e.g. duplicate registered name).
    async fn spawn_internal<A>(
        &self,
        name: &str,
        args: A::Args,
        deps: A::Deps,
        interceptors: Vec<Box<dyn InboundInterceptor>>,
        mailbox: MailboxConfig,
    ) -> Result<RactorActorRef<A>, dactor::errors::RuntimeError>
    where
        A: Actor + 'static,
    {
        // SeqCst keeps id assignment totally ordered across threads.
        let local = self.next_local.fetch_add(1, Ordering::SeqCst);
        let actor_id = ActorId {
            node: self.node_id.clone(),
            local,
        };
        let actor_name = name.to_string();

        // Paired with the stop_notifier consumed in post_stop.
        let (stop_tx, stop_rx) = tokio::sync::oneshot::channel();

        let wrapper = RactorDactorActor::<A> {
            _phantom: PhantomData,
        };
        let spawn_args = RactorSpawnArgs {
            args,
            deps,
            actor_id: actor_id.clone(),
            actor_name: actor_name.clone(),
            interceptors,
            watchers: self.watchers.clone(),
            dead_letter_handler: self.dead_letter_handler.clone(),
            stop_notifier: Some(stop_tx),
        };

        let (actor_ref, _join_handle) = ractor::Actor::spawn(Some(name.to_string()), wrapper, spawn_args)
            .await
            .map_err(|e| dactor::errors::RuntimeError::SpawnFailed(e.to_string()))?;

        // Bounded mailboxes are emulated: sends go into a capacity-limited
        // tokio channel, and a forwarder task drains it into ractor's
        // (unbounded) queue. The task exits when the channel closes or the
        // actor stops accepting casts.
        let bounded_tx = match mailbox {
            MailboxConfig::Bounded { capacity, overflow } => {
                let (btx, mut brx) = tokio::sync::mpsc::channel::<DactorMsg<A>>(capacity);
                let fwd_ref = actor_ref.clone();
                tokio::spawn(async move {
                    while let Some(msg) = brx.recv().await {
                        if fwd_ref.cast(msg).is_err() {
                            break;
                        }
                    }
                });
                Some(BoundedMailboxSender::new(btx, overflow))
            }
            MailboxConfig::Unbounded => None,
        };

        // Allow callers to later await this actor's clean shutdown.
        self.stop_receivers.lock().unwrap().insert(actor_id.clone(), stop_rx);

        Ok(RactorActorRef {
            id: actor_id,
            name: actor_name,
            inner: actor_ref,
            bounded_tx,
            outbound_interceptors: self.outbound_interceptors.clone(),
            drop_observer: self.drop_observer.clone(),
            dead_letter_handler: self.dead_letter_handler.clone(),
        })
    }
1249
    /// Registers `watcher` to receive a `ChildTerminated` message when the
    /// actor identified by `target_id` stops. The notification is delivered
    /// by casting directly into the watcher's ractor mailbox, bypassing the
    /// bounded channel and outbound interceptors.
    pub fn watch<W>(&self, watcher: &RactorActorRef<W>, target_id: ActorId)
    where
        W: Actor + Handler<ChildTerminated> + 'static,
    {
        let watcher_id = watcher.id();
        let watcher_inner = watcher.inner.clone();

        let entry = WatchEntry {
            watcher_id,
            // Closure captures the concrete watcher type so the shared map
            // can stay type-erased.
            notify: Box::new(move |msg: ChildTerminated| {
                let dispatch: Box<dyn Dispatch<W>> = Box::new(TypedDispatch { msg });
                if watcher_inner.cast(DactorMsg(dispatch)).is_err() {
                    // Best-effort: the watcher may already be gone.
                    tracing::debug!("watch notification dropped — watcher may have stopped");
                }
            }),
        };

        let mut watchers = self.watchers.lock().unwrap();
        watchers.entry(target_id).or_default().push(entry);
    }
1274
1275 pub fn unwatch(&self, watcher_id: &ActorId, target_id: &ActorId) {
1277 let mut watchers = self.watchers.lock().unwrap();
1278 if let Some(entries) = watchers.get_mut(target_id) {
1279 entries.retain(|e| &e.watcher_id != watcher_id);
1280 if entries.is_empty() {
1281 watchers.remove(target_id);
1282 }
1283 }
1284 }
1285
    /// Read access to the in-process spawn manager.
    pub fn spawn_manager(&self) -> &SpawnManager {
        &self.spawn_manager
    }
1294
    /// Mutable access to the in-process spawn manager.
    pub fn spawn_manager_mut(&mut self) -> &mut SpawnManager {
        &mut self.spawn_manager
    }
1299
    /// Registers a deserialization factory for remotely-spawnable actors,
    /// both in the local type registry and — if system actors are running —
    /// in every spawn-manager worker.
    ///
    /// Registration with the system actors is fire-and-forget: the reply
    /// receiver is dropped immediately, so failures are ignored.
    pub fn register_factory(
        &mut self,
        type_name: impl Into<String>,
        factory: impl Fn(&[u8]) -> Result<Box<dyn std::any::Any + Send>, dactor::remote::SerializationError>
            + Send
            + Sync
            + 'static,
    ) {
        let type_name = type_name.into();
        // Arc so the same factory can be shared between the local registry
        // and each spawn-manager worker.
        let factory = Arc::new(factory);

        self.spawn_manager
            .type_registry_mut()
            .register_factory(type_name.clone(), {
                let f = factory.clone();
                move |bytes: &[u8]| f(bytes)
            });

        if let Some(ref actors) = self.system_actors {
            match &actors.spawn_manager {
                SystemActorHandle::Single(r) => {
                    // _rx dropped on purpose: no one waits for the ack.
                    let (tx, _rx) = tokio::sync::oneshot::channel();
                    let f = factory;
                    let _ = r.cast(
                        crate::system_actors::SpawnManagerMsg::RegisterFactory {
                            type_name,
                            factory: Box::new(move |bytes: &[u8]| f(bytes)),
                            reply: tx,
                        },
                    );
                }
                SystemActorHandle::Pool(pool) => {
                    // Broadcast to every worker so all pool members can
                    // deserialize this type.
                    for worker in pool.workers() {
                        let f = factory.clone();
                        let (tx, _rx) = tokio::sync::oneshot::channel();
                        let _ = worker.cast(
                            crate::system_actors::SpawnManagerMsg::RegisterFactory {
                                type_name: type_name.clone(),
                                factory: Box::new(move |bytes: &[u8]| f(bytes)),
                                reply: tx,
                            },
                        );
                    }
                }
            }
        }
    }
1354
1355 pub fn handle_spawn_request(
1372 &mut self,
1373 request: &SpawnRequest,
1374 ) -> Result<(ActorId, Box<dyn std::any::Any + Send>), SpawnResponse> {
1375 match self.spawn_manager.create_actor(request) {
1376 Ok(actor) => {
1377 let local = self.next_local.fetch_add(1, Ordering::SeqCst);
1378 let actor_id = ActorId {
1379 node: self.node_id.clone(),
1380 local,
1381 };
1382 self.spawn_manager.record_spawn(actor_id.clone());
1383 Ok((actor_id, actor))
1384 }
1385 Err(e) => Err(SpawnResponse::Failure {
1386 request_id: request.request_id.clone(),
1387 error: e.to_string(),
1388 }),
1389 }
1390 }
1391
    /// Shared access to the runtime's [`WatchManager`].
    pub fn watch_manager(&self) -> &WatchManager {
        &self.watch_manager
    }
1400
    /// Mutable access to the runtime's [`WatchManager`].
    pub fn watch_manager_mut(&mut self) -> &mut WatchManager {
        &mut self.watch_manager
    }
1405
    /// Record a remote watch: `watcher` will be included in the
    /// notifications produced by [`Self::notify_terminated`] for `target`.
    pub fn remote_watch(&mut self, target: ActorId, watcher: ActorId) {
        self.watch_manager.watch(target, watcher);
    }
1411
    /// Remove a previously recorded remote watch of `target` by `watcher`.
    pub fn remote_unwatch(&mut self, target: &ActorId, watcher: &ActorId) {
        self.watch_manager.unwatch(target, watcher);
    }
1416
    /// Report that `terminated` has stopped and return the notifications
    /// that must be delivered to its remote watchers.
    pub fn notify_terminated(&mut self, terminated: &ActorId) -> Vec<WatchNotification> {
        self.watch_manager.on_terminated(terminated)
    }
1425
    /// Shared access to the runtime's [`CancelManager`].
    pub fn cancel_manager(&self) -> &CancelManager {
        &self.cancel_manager
    }
1434
    /// Mutable access to the runtime's [`CancelManager`].
    pub fn cancel_manager_mut(&mut self) -> &mut CancelManager {
        &mut self.cancel_manager
    }
1439
    /// Associate a [`CancellationToken`] with an in-flight `request_id`, so
    /// a later [`Self::cancel_request`] can cancel it.
    pub fn register_cancel(&mut self, request_id: String, token: CancellationToken) {
        self.cancel_manager.register(request_id, token);
    }
1444
    /// Cancel the request registered under `request_id`, returning the
    /// manager's [`CancelResponse`] (acknowledged or not-found).
    pub fn cancel_request(&mut self, request_id: &str) -> CancelResponse {
        self.cancel_manager.cancel(request_id)
    }
1449
    /// Drop the cancellation registration for a request that finished
    /// normally, so its token is no longer tracked.
    pub fn complete_request(&mut self, request_id: &str) {
        self.cancel_manager.remove(request_id);
    }
1457
    /// Shared access to the runtime's [`NodeDirectory`] of known peers.
    pub fn node_directory(&self) -> &NodeDirectory {
        &self.node_directory
    }
1466
    /// Mutable access to the runtime's [`NodeDirectory`].
    pub fn node_directory_mut(&mut self) -> &mut NodeDirectory {
        &mut self.node_directory
    }
1471
    /// Mark `peer_id` as connected, updating its address if a new one is
    /// given, and emit `ClusterEvent::NodeJoined` on a fresh connection.
    ///
    /// If the peer already exists, a `None` address keeps the previously
    /// known one; the entry is removed and re-added to apply the resolved
    /// address. `NodeJoined` is only emitted on a disconnected→connected
    /// transition, so reconnect calls on an already-connected peer are
    /// quiet.
    pub fn connect_peer(&mut self, peer_id: NodeId, address: Option<String>) {
        let was_connected = self.node_directory.is_connected(&peer_id);
        if let Some(existing) = self.node_directory.get_peer(&peer_id) {
            let resolved_address = address.or_else(|| existing.address.clone());
            // NOTE(review): remove+add rebuilds the peer entry to refresh the
            // address — if the directory stores per-peer state beyond the
            // address, that state is reset here; confirm this is intended.
            self.node_directory.remove_peer(&peer_id);
            self.node_directory.add_peer(peer_id.clone(), resolved_address);
        } else {
            self.node_directory.add_peer(peer_id.clone(), address);
        }
        self.node_directory.set_status(&peer_id, PeerStatus::Connected);
        if !was_connected {
            self.cluster_events.emit(dactor::ClusterEvent::NodeJoined(peer_id));
        }
    }
1499
1500 pub fn disconnect_peer(&mut self, peer_id: &NodeId) {
1504 let was_connected = self.node_directory.is_connected(peer_id);
1505 self.node_directory.set_status(peer_id, PeerStatus::Disconnected);
1506 if was_connected {
1507 self.cluster_events.emit(dactor::ClusterEvent::NodeLeft(peer_id.clone()));
1508 }
1509 }
1510
    /// Whether the directory currently considers `peer_id` connected.
    pub fn is_peer_connected(&self, peer_id: &NodeId) -> bool {
        self.node_directory.is_connected(peer_id)
    }
1515
1516 pub async fn await_stop(&self, actor_id: &ActorId) -> Result<(), String> {
1529 let rx = {
1530 let mut receivers = self.stop_receivers.lock().unwrap();
1531 receivers.remove(actor_id)
1532 };
1533 match rx {
1534 Some(rx) => rx
1535 .await
1536 .map_err(|_| "stop notifier dropped".to_string())
1537 .and_then(|r| r),
1538 None => Ok(()),
1539 }
1540 }
1541
1542 pub async fn await_all(&self) -> Result<(), String> {
1547 let receivers: Vec<_> = {
1548 let mut map = self.stop_receivers.lock().unwrap();
1549 map.drain().collect()
1550 };
1551 let mut first_error = None;
1552 for (_, rx) in receivers {
1553 let result = rx.await.map_err(|e| format!("stop notifier dropped: {e}")).and_then(|r| r);
1554 if let Err(e) = result {
1555 if first_error.is_none() {
1556 first_error = Some(e);
1557 }
1558 }
1559 }
1560 match first_error {
1561 Some(e) => Err(e),
1562 None => Ok(()),
1563 }
1564 }
1565
    /// Prune stop receivers for actors that have already finished, keeping
    /// only handles whose actor is still running.
    ///
    /// A receiver is kept only when `try_recv` reports `Empty` (sender
    /// alive, nothing sent yet). NOTE(review): if an actor has stopped but
    /// nobody awaited it, `try_recv` consumes the stop result here and it
    /// is discarded — confirm callers don't rely on reading it later.
    pub fn cleanup_finished(&self) {
        let mut receivers = self.stop_receivers.lock().unwrap();
        receivers.retain(|_, rx| {
            matches!(rx.try_recv(), Err(tokio::sync::oneshot::error::TryRecvError::Empty))
        });
    }
1576
    /// Number of stop receivers currently tracked. Includes actors that
    /// have already stopped but whose receiver has not yet been consumed
    /// by `await_stop`/`await_all` or pruned by `cleanup_finished`.
    pub fn active_handle_count(&self) -> usize {
        self.stop_receivers.lock().unwrap().len()
    }
1585}
1586
impl Default for RactorRuntime {
    /// Equivalent to [`RactorRuntime::new`].
    fn default() -> Self {
        Self::new()
    }
}
1592
// System-message routing: decodes control-plane wire envelopes arriving
// from remote peers and dispatches them to the local system actors
// (spawn, watch, cancel, node directory).
#[async_trait::async_trait]
impl dactor::system_router::SystemMessageRouter for RactorRuntime {
    /// Validate, decode, and dispatch a system envelope.
    ///
    /// Spawn and cancel are request/response: a oneshot reply from the
    /// handling actor is awaited before an outcome is returned. Watch,
    /// unwatch, and peer connect/disconnect are fire-and-forget casts
    /// acknowledged as soon as they are enqueued.
    ///
    /// # Errors
    /// Returns a `RoutingError` when the message type fails validation or
    /// is unhandled, the body cannot be decoded, system actors have not
    /// been started, a mailbox rejects the cast, or a reply channel is
    /// dropped before answering.
    async fn route_system_envelope(
        &self,
        envelope: dactor::remote::WireEnvelope,
    ) -> Result<dactor::system_router::RoutingOutcome, dactor::system_router::RoutingError> {
        use dactor::system_actors::*;
        use dactor::system_router::{RoutingError, RoutingOutcome};

        // Reject types outside the system-message namespace up front.
        dactor::system_router::validate_system_message_type(&envelope.message_type)?;

        let refs = self
            .system_actors
            .as_ref()
            .ok_or_else(|| RoutingError::new("system actors not started"))?;

        match envelope.message_type.as_str() {
            SYSTEM_MSG_TYPE_SPAWN => {
                let request = dactor::proto::decode_spawn_request(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode SpawnRequest: {e}")))?;

                // Keep the id for the reply; `request` is moved into the cast.
                let req_id = request.request_id.clone();
                let (tx, rx) = tokio::sync::oneshot::channel();
                refs.spawn_manager
                    .cast(crate::system_actors::SpawnManagerMsg::HandleRequest {
                        request,
                        reply: tx,
                    })
                    .map_err(|e| RoutingError::new(format!("SpawnManager mailbox: {e}")))?;

                let result = rx
                    .await
                    .map_err(|_| RoutingError::new("SpawnManager reply dropped"))?;

                match result {
                    Ok((actor_id, _actor)) => Ok(RoutingOutcome::SpawnCompleted {
                        request_id: req_id,
                        actor_id,
                    }),
                    Err(SpawnResponse::Failure { request_id, error }) => {
                        Ok(RoutingOutcome::SpawnFailed { request_id, error })
                    }
                    Err(SpawnResponse::Success { .. }) => {
                        // The Err arm of a spawn result is only ever built
                        // from a Failure (see handle_spawn_request).
                        unreachable!("SpawnResult::Err always wraps SpawnResponse::Failure")
                    }
                }
            }

            SYSTEM_MSG_TYPE_WATCH => {
                let request = dactor::proto::decode_watch_request(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode WatchRequest: {e}")))?;

                // Fire-and-forget: acknowledged once enqueued.
                refs.watch_manager
                    .cast(crate::system_actors::WatchManagerMsg::Watch {
                        target: request.target,
                        watcher: request.watcher,
                    })
                    .map_err(|e| RoutingError::new(format!("WatchManager mailbox: {e}")))?;

                Ok(RoutingOutcome::Acknowledged)
            }

            SYSTEM_MSG_TYPE_UNWATCH => {
                let request = dactor::proto::decode_unwatch_request(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode UnwatchRequest: {e}")))?;

                refs.watch_manager
                    .cast(crate::system_actors::WatchManagerMsg::Unwatch {
                        target: request.target,
                        watcher: request.watcher,
                    })
                    .map_err(|e| RoutingError::new(format!("WatchManager mailbox: {e}")))?;

                Ok(RoutingOutcome::Acknowledged)
            }

            SYSTEM_MSG_TYPE_CANCEL => {
                let request = dactor::proto::decode_cancel_request(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode CancelRequest: {e}")))?;

                let request_id = request
                    .request_id
                    .ok_or_else(|| RoutingError::new("CancelRequest missing request_id"))?;

                // Request/response: the CancelManager reports whether the
                // request was known.
                let (tx, rx) = tokio::sync::oneshot::channel();
                refs.cancel_manager
                    .cast(crate::system_actors::CancelManagerMsg::Cancel {
                        request_id,
                        reply: tx,
                    })
                    .map_err(|e| RoutingError::new(format!("CancelManager mailbox: {e}")))?;

                let response = rx
                    .await
                    .map_err(|_| RoutingError::new("CancelManager reply dropped"))?;

                match response {
                    CancelResponse::Acknowledged => Ok(RoutingOutcome::CancelAcknowledged),
                    CancelResponse::NotFound { reason } => {
                        Ok(RoutingOutcome::CancelNotFound { reason })
                    }
                }
            }

            SYSTEM_MSG_TYPE_CONNECT_PEER => {
                let (peer_id, address) = dactor::proto::decode_connect_peer(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode ConnectPeer: {e}")))?;

                refs.node_directory
                    .cast(crate::system_actors::NodeDirectoryMsg::ConnectPeer {
                        peer_id,
                        address,
                    })
                    .map_err(|e| RoutingError::new(format!("NodeDirectory mailbox: {e}")))?;

                Ok(RoutingOutcome::Acknowledged)
            }

            SYSTEM_MSG_TYPE_DISCONNECT_PEER => {
                let peer_id = dactor::proto::decode_disconnect_peer(&envelope.body)
                    .map_err(|e| RoutingError::new(format!("decode DisconnectPeer: {e}")))?;

                refs.node_directory
                    .cast(crate::system_actors::NodeDirectoryMsg::DisconnectPeer { peer_id })
                    .map_err(|e| RoutingError::new(format!("NodeDirectory mailbox: {e}")))?;

                Ok(RoutingOutcome::Acknowledged)
            }

            // Validation passed but no local handler exists for this type.
            other => Err(RoutingError::new(format!(
                "unhandled system message type: {other}"
            ))),
        }
    }
}
1741