1use std::collections::HashMap;
7use std::marker::PhantomData;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Arc, Mutex};
10use std::time::Duration;
11
12use futures::FutureExt;
13use tokio_util::sync::CancellationToken;
14
15use dactor::actor::{
16 Actor, ActorContext, ActorError, ActorRef, AskReply, ReduceHandler, Handler, ExpandHandler,
17 TransformHandler,
18};
19use dactor::dead_letter::{DeadLetterEvent, DeadLetterHandler, DeadLetterReason};
20use dactor::dispatch::{AskDispatch, Dispatch, ReduceDispatch, ExpandDispatch, TransformDispatch, TypedDispatch};
21use dactor::errors::{ActorSendError, ErrorAction, RuntimeError};
22use dactor::interceptor::{
23 Disposition, DropObserver, InboundContext, InboundInterceptor, OutboundInterceptor, Outcome,
24 SendMode,
25};
26use dactor::mailbox::MailboxConfig;
27use dactor::message::{Headers, Message, RuntimeHeaders};
28use dactor::node::{ActorId, NodeId};
29use dactor::runtime_support::{
30 spawn_reduce_batched_drain, spawn_reduce_drain, spawn_transform_drain,
31 wrap_batched_stream_with_interception, wrap_stream_with_interception, OutboundPipeline,
32};
33use dactor::stream::{
34 BatchConfig, BatchReader, BatchWriter, BoxStream, StreamReceiver, StreamSender,
35};
36use dactor::supervision::ChildTerminated;
37use dactor::system_actors::{
38 CancelManager, CancelResponse, NodeDirectory, PeerStatus, SpawnManager, SpawnRequest,
39 SpawnResponse, WatchManager, WatchNotification,
40};
41use dactor::type_registry::TypeRegistry;
42
43use crate::cluster::RactorClusterEvents;
44
45struct WatchEntry {
51 watcher_id: ActorId,
52 notify: Box<dyn Fn(ChildTerminated) + Send + Sync>,
54}
55
56type WatcherMap = Arc<Mutex<HashMap<ActorId, Vec<WatchEntry>>>>;
58
59struct DactorMsg<A: Actor>(Box<dyn Dispatch<A>>);
65
66struct RactorDactorActor<A: Actor> {
68 _phantom: PhantomData<fn() -> A>,
69}
70
71struct RactorActorState<A: Actor> {
73 actor: A,
74 ctx: ActorContext,
75 interceptors: Vec<Box<dyn InboundInterceptor>>,
76 watchers: WatcherMap,
77 stop_reason: Option<String>,
78 dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
79 stop_notifier: Option<tokio::sync::oneshot::Sender<Result<(), String>>>,
81}
82
83struct RactorSpawnArgs<A: Actor> {
85 args: A::Args,
86 deps: A::Deps,
87 actor_id: ActorId,
88 actor_name: String,
89 interceptors: Vec<Box<dyn InboundInterceptor>>,
90 watchers: WatcherMap,
91 dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
92 stop_notifier: Option<tokio::sync::oneshot::Sender<Result<(), String>>>,
93}
94
95impl<A: Actor + 'static> ractor::Actor for RactorDactorActor<A> {
96 type Msg = DactorMsg<A>;
97 type State = RactorActorState<A>;
98 type Arguments = RactorSpawnArgs<A>;
99
100 async fn pre_start(
101 &self,
102 _myself: ractor::ActorRef<Self::Msg>,
103 args: Self::Arguments,
104 ) -> Result<Self::State, ractor::ActorProcessingErr> {
105 let mut actor = A::create(args.args, args.deps);
106 let mut ctx = ActorContext::new(args.actor_id, args.actor_name);
107 actor.on_start(&mut ctx).await;
108 Ok(RactorActorState {
109 actor,
110 ctx,
111 interceptors: args.interceptors,
112 watchers: args.watchers,
113 stop_reason: None,
114 dead_letter_handler: args.dead_letter_handler,
115 stop_notifier: args.stop_notifier,
116 })
117 }
118
119 async fn handle(
120 &self,
121 myself: ractor::ActorRef<Self::Msg>,
122 message: Self::Msg,
123 state: &mut Self::State,
124 ) -> Result<(), ractor::ActorProcessingErr> {
125 let dispatch = message.0;
126
127 let send_mode = dispatch.send_mode();
129 let message_type = dispatch.message_type_name();
130
131 state.ctx.send_mode = Some(send_mode);
132 state.ctx.headers = Headers::new();
133
134 let runtime_headers = RuntimeHeaders::new();
136 let mut headers = Headers::new();
137 let mut total_delay = Duration::ZERO;
138 let mut rejection: Option<(String, Disposition)> = None;
139
140 {
141 let ictx = InboundContext {
142 actor_id: state.ctx.actor_id.clone(),
143 actor_name: &state.ctx.actor_name,
144 message_type,
145 send_mode,
146 remote: false,
147 origin_node: None,
148 };
149
150 for interceptor in &state.interceptors {
151 match interceptor.on_receive(
152 &ictx,
153 &runtime_headers,
154 &mut headers,
155 dispatch.message_any(),
156 ) {
157 Disposition::Continue => {}
158 Disposition::Delay(d) => {
159 total_delay += d;
160 }
161 disp @ (Disposition::Drop | Disposition::Reject(_) | Disposition::Retry(_)) => {
162 rejection = Some((interceptor.name().to_string(), disp));
163 break;
164 }
165 }
166 }
167 }
168
169 if let Some((interceptor_name, disposition)) = rejection {
171 if matches!(disposition, Disposition::Drop) {
172 if let Some(ref handler) = *state.dead_letter_handler {
173 let event = DeadLetterEvent {
174 target_id: state.ctx.actor_id.clone(),
175 target_name: Some(state.ctx.actor_name.clone()),
176 message_type,
177 send_mode,
178 reason: DeadLetterReason::DroppedByInterceptor {
179 interceptor: interceptor_name.clone(),
180 },
181 message: None,
182 };
183 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
184 handler.on_dead_letter(event);
185 }));
186 }
187 }
188 dispatch.reject(disposition, &interceptor_name);
189 return Ok(());
190 }
191
192 if !total_delay.is_zero() {
193 tokio::time::sleep(total_delay).await;
194 }
195
196 state.ctx.headers = headers;
198
199 let cancel_token = dispatch.cancel_token();
201 state.ctx.set_cancellation_token(cancel_token.clone());
202
203 if let Some(ref token) = cancel_token {
205 if token.is_cancelled() {
206 dispatch.cancel();
207 state.ctx.set_cancellation_token(None);
208 return Ok(());
209 }
210 }
211
212 let result = if let Some(ref token) = cancel_token {
214 let dispatch_fut =
215 std::panic::AssertUnwindSafe(dispatch.dispatch(&mut state.actor, &mut state.ctx))
216 .catch_unwind();
217 tokio::select! {
218 biased;
219 r = dispatch_fut => r,
220 _ = token.cancelled() => {
221 state.ctx.set_cancellation_token(None);
225 return Ok(());
226 }
227 }
228 } else {
229 std::panic::AssertUnwindSafe(dispatch.dispatch(&mut state.actor, &mut state.ctx))
230 .catch_unwind()
231 .await
232 };
233
234 state.ctx.set_cancellation_token(None);
235
236 let ictx = InboundContext {
238 actor_id: state.ctx.actor_id.clone(),
239 actor_name: &state.ctx.actor_name,
240 message_type,
241 send_mode,
242 remote: false,
243 origin_node: None,
244 };
245
246 match result {
247 Ok(dispatch_result) => {
248 let outcome = match (&dispatch_result.reply, send_mode) {
249 (Some(reply), SendMode::Ask) => Outcome::AskSuccess {
250 reply: reply.as_ref(),
251 },
252 _ => Outcome::TellSuccess,
253 };
254
255 for interceptor in &state.interceptors {
256 interceptor.on_complete(&ictx, &runtime_headers, &state.ctx.headers, &outcome);
257 }
258
259 dispatch_result.send_reply();
261 }
262 Err(_panic) => {
263 let error = ActorError::internal("handler panicked");
264 let action = state.actor.on_error(&error);
265
266 let outcome = Outcome::HandlerError { error };
267 for interceptor in &state.interceptors {
268 interceptor.on_complete(&ictx, &runtime_headers, &state.ctx.headers, &outcome);
269 }
270
271 match action {
272 ErrorAction::Resume => {
273 }
275 ErrorAction::Stop | ErrorAction::Escalate => {
276 state.stop_reason = Some("handler panicked".into());
277 myself.stop(None);
278 }
279 ErrorAction::Restart => {
280 tracing::warn!("Restart not fully implemented, treating as Resume");
281 }
282 }
283 }
284 }
285
286 Ok(())
287 }
288
289 async fn post_stop(
290 &self,
291 _myself: ractor::ActorRef<Self::Msg>,
292 state: &mut Self::State,
293 ) -> Result<(), ractor::ActorProcessingErr> {
294 state.ctx.send_mode = None;
296 state.ctx.headers = Headers::new();
297 state.ctx.set_cancellation_token(None);
298
299 let on_stop_panicked =
302 std::panic::AssertUnwindSafe(state.actor.on_stop())
303 .catch_unwind()
304 .await
305 .is_err();
306 if on_stop_panicked && state.stop_reason.is_none() {
307 state.stop_reason = Some("actor panicked in on_stop".to_string());
308 }
309
310 let actor_id = state.ctx.actor_id.clone();
312 let actor_name = state.ctx.actor_name.clone();
313 let entries = {
314 let mut watchers = state.watchers.lock().unwrap();
315 let target_entries = watchers.remove(&actor_id).unwrap_or_default();
317 for entries in watchers.values_mut() {
319 entries.retain(|e| e.watcher_id != actor_id);
320 }
321 watchers.retain(|_, v| !v.is_empty());
322 target_entries
323 };
324 if !entries.is_empty() {
325 let notification = ChildTerminated {
326 child_id: actor_id,
327 child_name: actor_name,
328 reason: state.stop_reason.clone(),
329 };
330 for entry in &entries {
331 (entry.notify)(notification.clone());
332 }
333 }
334
335 if let Some(tx) = state.stop_notifier.take() {
338 let result = if on_stop_panicked {
339 Err("actor panicked in on_stop".to_string())
340 } else {
341 Ok(())
342 };
343 let _ = tx.send(result);
344 }
345
346 Ok(())
347 }
348}
349
350pub struct RactorActorRef<A: Actor> {
363 id: ActorId,
364 name: String,
365 inner: ractor::ActorRef<DactorMsg<A>>,
366 bounded_tx: Option<BoundedMailboxSender<DactorMsg<A>>>,
367 outbound_interceptors: Arc<Vec<Box<dyn OutboundInterceptor>>>,
368 drop_observer: Option<Arc<dyn DropObserver>>,
369 dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
370}
371
372use dactor::runtime_support::BoundedMailboxSender;
373
374impl<A: Actor> Clone for RactorActorRef<A> {
375 fn clone(&self) -> Self {
376 Self {
377 id: self.id.clone(),
378 name: self.name.clone(),
379 inner: self.inner.clone(),
380 bounded_tx: self.bounded_tx.clone(),
381 outbound_interceptors: self.outbound_interceptors.clone(),
382 drop_observer: self.drop_observer.clone(),
383 dead_letter_handler: self.dead_letter_handler.clone(),
384 }
385 }
386}
387
388impl<A: Actor> std::fmt::Debug for RactorActorRef<A> {
389 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
390 write!(f, "RactorActorRef({}, {:?})", self.name, self.id)
391 }
392}
393
394impl<A: Actor + 'static> RactorActorRef<A> {
395 fn outbound_pipeline(&self) -> OutboundPipeline {
396 OutboundPipeline {
397 interceptors: self.outbound_interceptors.clone(),
398 drop_observer: self.drop_observer.clone(),
399 target_id: self.id.clone(),
400 target_name: self.name.clone(),
401 }
402 }
403
404 fn notify_dead_letter(
405 &self,
406 message_type: &'static str,
407 send_mode: SendMode,
408 reason: DeadLetterReason,
409 ) {
410 if let Some(ref handler) = *self.dead_letter_handler {
411 let event = DeadLetterEvent {
412 target_id: self.id.clone(),
413 target_name: Some(self.name.clone()),
414 message_type,
415 send_mode,
416 reason,
417 message: None,
418 };
419 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
420 handler.on_dead_letter(event);
421 }));
422 }
423 }
424
425 fn send_dispatch(&self, dispatch: Box<dyn Dispatch<A>>) -> Result<(), ActorSendError> {
428 if let Some(ref btx) = self.bounded_tx {
429 btx.try_send(DactorMsg(dispatch))
430 } else {
431 self.inner
432 .cast(DactorMsg(dispatch))
433 .map_err(|e| ActorSendError(e.to_string()))
434 }
435 }
436}
437
438impl<A: Actor + 'static> ActorRef<A> for RactorActorRef<A> {
439 fn id(&self) -> ActorId {
440 self.id.clone()
441 }
442
443 fn name(&self) -> String {
444 self.name.clone()
445 }
446
447 fn is_alive(&self) -> bool {
448 let inner_alive = matches!(
449 self.inner.get_status(),
450 ractor::ActorStatus::Running
451 | ractor::ActorStatus::Starting
452 | ractor::ActorStatus::Upgrading
453 );
454 if let Some(ref btx) = self.bounded_tx {
455 !btx.is_closed() && inner_alive
456 } else {
457 inner_alive
458 }
459 }
460
461 fn pending_messages(&self) -> usize {
462 if let Some(ref btx) = self.bounded_tx {
463 btx.pending()
464 } else {
465 0
466 }
467 }
468
469 fn stop(&self) {
470 self.inner.stop(None);
471 }
472
473 fn tell<M>(&self, msg: M) -> Result<(), ActorSendError>
474 where
475 A: Handler<M>,
476 M: Message<Reply = ()>,
477 {
478 let pipeline = self.outbound_pipeline();
479 let result = pipeline.run_on_send(SendMode::Tell, &msg);
480 match result.disposition {
481 Disposition::Continue => {}
482 Disposition::Delay(_) => {} Disposition::Drop | Disposition::Reject(_) | Disposition::Retry(_) => return Ok(()),
484 }
485
486 let dispatch: Box<dyn Dispatch<A>> = Box::new(TypedDispatch { msg });
487 self.send_dispatch(dispatch).map_err(|e| {
488 let reason = if e.0.contains("full") {
489 DeadLetterReason::MailboxFull
490 } else {
491 DeadLetterReason::ActorStopped
492 };
493 self.notify_dead_letter(std::any::type_name::<M>(), SendMode::Tell, reason);
494 e
495 })
496 }
497
498 fn ask<M>(
499 &self,
500 msg: M,
501 cancel: Option<CancellationToken>,
502 ) -> Result<AskReply<M::Reply>, ActorSendError>
503 where
504 A: Handler<M>,
505 M: Message,
506 {
507 let pipeline = self.outbound_pipeline();
508 let result = pipeline.run_on_send(SendMode::Ask, &msg);
509 match result.disposition {
510 Disposition::Continue => {}
511 Disposition::Delay(_) => {} Disposition::Drop => {
513 let (tx, rx) = tokio::sync::oneshot::channel();
514 let _ = tx.send(Err(RuntimeError::ActorNotFound(
515 "message dropped by outbound interceptor".into(),
516 )));
517 return Ok(AskReply::new(rx));
518 }
519 Disposition::Reject(reason) => {
520 let (tx, rx) = tokio::sync::oneshot::channel();
521 let _ = tx.send(Err(RuntimeError::Rejected {
522 interceptor: result.interceptor_name.to_string(),
523 reason,
524 }));
525 return Ok(AskReply::new(rx));
526 }
527 Disposition::Retry(retry_after) => {
528 let (tx, rx) = tokio::sync::oneshot::channel();
529 let _ = tx.send(Err(RuntimeError::RetryAfter {
530 interceptor: result.interceptor_name.to_string(),
531 retry_after,
532 }));
533 return Ok(AskReply::new(rx));
534 }
535 }
536
537 let (tx, rx) = tokio::sync::oneshot::channel();
538 let dispatch: Box<dyn Dispatch<A>> = Box::new(AskDispatch {
539 msg,
540 reply_tx: tx,
541 cancel,
542 });
543 self.send_dispatch(dispatch).map_err(|e| {
544 let reason = if e.0.contains("full") {
545 DeadLetterReason::MailboxFull
546 } else {
547 DeadLetterReason::ActorStopped
548 };
549 self.notify_dead_letter(std::any::type_name::<M>(), SendMode::Ask, reason);
550 e
551 })?;
552 Ok(AskReply::new(rx))
553 }
554
555 fn expand<M, OutputItem>(
556 &self,
557 msg: M,
558 buffer: usize,
559 batch_config: Option<BatchConfig>,
560 cancel: Option<CancellationToken>,
561 ) -> Result<BoxStream<OutputItem>, ActorSendError>
562 where
563 A: ExpandHandler<M, OutputItem>,
564 M: Send + 'static,
565 OutputItem: Send + 'static,
566 {
567 let pipeline = self.outbound_pipeline();
568 let result = pipeline.run_on_send(SendMode::Expand, &msg);
569 match result.disposition {
570 Disposition::Continue => {}
571 Disposition::Delay(_) => {}
572 Disposition::Drop => {
573 return Err(ActorSendError(
574 "stream dropped by outbound interceptor".into(),
575 ));
576 }
577 Disposition::Reject(reason) => {
578 return Err(ActorSendError(format!("stream rejected: {}", reason)));
579 }
580 Disposition::Retry(_) => {
581 return Err(ActorSendError(
582 "stream retry requested by interceptor".into(),
583 ));
584 }
585 }
586
587 let buffer = buffer.max(1);
588 let (tx, mut rx) = tokio::sync::mpsc::channel(buffer);
589 let sender = StreamSender::new(tx);
590 let dispatch: Box<dyn Dispatch<A>> = Box::new(ExpandDispatch {
591 msg,
592 sender,
593 cancel,
594 });
595 self.send_dispatch(dispatch)?;
596
597 match batch_config {
598 Some(batch_config) => {
599 let (batch_tx, batch_rx) = tokio::sync::mpsc::channel::<Vec<OutputItem>>(buffer);
600 let reader = BatchReader::new(batch_rx);
601 let batch_delay = batch_config.max_delay;
602 tokio::spawn(async move {
603 let mut writer = BatchWriter::new(batch_tx, batch_config);
604 loop {
605 if writer.buffered_count() > 0 {
606 let deadline = tokio::time::Instant::now() + batch_delay;
607 tokio::select! {
608 biased;
609 item = rx.recv() => match item {
610 Some(item) => {
611 if writer.push(item).await.is_err() { break; }
612 }
613 None => break,
614 },
615 _ = tokio::time::sleep_until(deadline) => {
616 if writer.check_deadline().await.is_err() { break; }
617 }
618 }
619 } else {
620 match rx.recv().await {
621 Some(item) => {
622 if writer.push(item).await.is_err() {
623 break;
624 }
625 }
626 None => break,
627 }
628 }
629 }
630 let _ = writer.flush().await;
631 });
632 Ok(wrap_batched_stream_with_interception(
633 reader,
634 buffer,
635 pipeline,
636 std::any::type_name::<M>(),
637 SendMode::Expand,
638 ))
639 }
640 None => Ok(wrap_stream_with_interception(
641 rx,
642 buffer,
643 pipeline,
644 std::any::type_name::<M>(),
645 SendMode::Expand,
646 )),
647 }
648 }
649
650 fn reduce<InputItem, Reply>(
651 &self,
652 input: BoxStream<InputItem>,
653 buffer: usize,
654 batch_config: Option<BatchConfig>,
655 cancel: Option<CancellationToken>,
656 ) -> Result<AskReply<Reply>, ActorSendError>
657 where
658 A: ReduceHandler<InputItem, Reply>,
659 InputItem: Send + 'static,
660 Reply: Send + 'static,
661 {
662 let buffer = buffer.max(1);
663 let (item_tx, item_rx) = tokio::sync::mpsc::channel(buffer);
664 let receiver = StreamReceiver::new(item_rx);
665 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
666 let dispatch: Box<dyn Dispatch<A>> = Box::new(ReduceDispatch {
667 receiver,
668 reply_tx,
669 cancel: cancel.clone(),
670 });
671 self.send_dispatch(dispatch)?;
672
673 let pipeline = self.outbound_pipeline();
674 match batch_config {
675 Some(batch_config) => {
676 spawn_reduce_batched_drain(
677 input,
678 item_tx,
679 buffer,
680 batch_config,
681 cancel,
682 pipeline,
683 std::any::type_name::<InputItem>(),
684 );
685 }
686 None => {
687 spawn_reduce_drain(
688 input,
689 item_tx,
690 cancel,
691 pipeline,
692 std::any::type_name::<InputItem>(),
693 );
694 }
695 }
696
697 Ok(AskReply::new(reply_rx))
698 }
699
700 fn transform<InputItem, OutputItem>(
701 &self,
702 input: BoxStream<InputItem>,
703 buffer: usize,
704 batch_config: Option<BatchConfig>,
705 cancel: Option<CancellationToken>,
706 ) -> Result<BoxStream<OutputItem>, ActorSendError>
707 where
708 A: TransformHandler<InputItem, OutputItem>,
709 InputItem: Send + 'static,
710 OutputItem: Send + 'static,
711 {
712 let buffer = buffer.max(1);
713 let (item_tx, item_rx) = tokio::sync::mpsc::channel(buffer);
714 let (output_tx, mut output_rx) = tokio::sync::mpsc::channel(buffer);
715 let receiver = StreamReceiver::new(item_rx);
716 let sender = StreamSender::new(output_tx);
717 let dispatch: Box<dyn Dispatch<A>> = Box::new(TransformDispatch::new(
718 receiver,
719 sender,
720 cancel.clone(),
721 ));
722 self.send_dispatch(dispatch)?;
723
724 let pipeline = self.outbound_pipeline();
725 spawn_transform_drain(
726 input,
727 item_tx,
728 cancel,
729 pipeline.clone(),
730 std::any::type_name::<InputItem>(),
731 );
732
733 match batch_config {
734 Some(batch_config) => {
735 let (batch_tx, batch_rx) =
736 tokio::sync::mpsc::channel::<Vec<OutputItem>>(buffer);
737 let reader = BatchReader::new(batch_rx);
738 let batch_delay = batch_config.max_delay;
739 tokio::spawn(async move {
740 let mut writer = BatchWriter::new(batch_tx, batch_config);
741 loop {
742 if writer.buffered_count() > 0 {
743 let deadline = tokio::time::Instant::now() + batch_delay;
744 tokio::select! {
745 biased;
746 item = output_rx.recv() => match item {
747 Some(item) => {
748 if writer.push(item).await.is_err() { break; }
749 }
750 None => break,
751 },
752 _ = tokio::time::sleep_until(deadline) => {
753 if writer.check_deadline().await.is_err() { break; }
754 }
755 }
756 } else {
757 match output_rx.recv().await {
758 Some(item) => {
759 if writer.push(item).await.is_err() {
760 break;
761 }
762 }
763 None => break,
764 }
765 }
766 }
767 let _ = writer.flush().await;
768 });
769 Ok(wrap_batched_stream_with_interception(
770 reader,
771 buffer,
772 pipeline,
773 std::any::type_name::<OutputItem>(),
774 SendMode::Transform,
775 ))
776 }
777 None => Ok(wrap_stream_with_interception(
778 output_rx,
779 buffer,
780 pipeline,
781 std::any::type_name::<OutputItem>(),
782 SendMode::Transform,
783 )),
784 }
785 }
786}
787
788pub struct SpawnOptions {
796 pub interceptors: Vec<Box<dyn InboundInterceptor>>,
798 pub mailbox: MailboxConfig,
805}
806
807impl Default for SpawnOptions {
808 fn default() -> Self {
809 Self {
810 interceptors: Vec::new(),
811 mailbox: MailboxConfig::Unbounded,
812 }
813 }
814}
815
816pub struct RactorRuntime {
836 node_id: NodeId,
837 next_local: Arc<AtomicU64>,
838 cluster_events: RactorClusterEvents,
839 outbound_interceptors: Arc<Vec<Box<dyn OutboundInterceptor>>>,
840 drop_observer: Option<Arc<dyn DropObserver>>,
841 dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
842 watchers: WatcherMap,
843 spawn_manager: SpawnManager,
845 watch_manager: WatchManager,
846 cancel_manager: CancelManager,
847 node_directory: NodeDirectory,
848 system_actors: Option<RactorSystemActorRefs>,
850 #[allow(clippy::type_complexity)]
852 stop_receivers: Arc<Mutex<HashMap<ActorId, tokio::sync::oneshot::Receiver<Result<(), String>>>>>,
853 app_version: Option<String>,
855}
856
857pub struct RactorSystemActorRefs {
859 pub spawn_manager: SystemActorHandle<crate::system_actors::SpawnManagerMsg>,
860 pub watch_manager: ractor::ActorRef<crate::system_actors::WatchManagerMsg>,
861 pub cancel_manager: ractor::ActorRef<crate::system_actors::CancelManagerMsg>,
862 pub node_directory: ractor::ActorRef<crate::system_actors::NodeDirectoryMsg>,
863}
864
865pub enum SystemActorHandle<M: ractor::Message> {
867 Single(ractor::ActorRef<M>),
869 Pool(SystemActorPool<M>),
871}
872
873impl<M: ractor::Message> SystemActorHandle<M> {
874 pub fn cast(&self, msg: M) -> Result<(), ractor::MessagingErr<M>> {
876 match self {
877 SystemActorHandle::Single(r) => r.cast(msg),
878 SystemActorHandle::Pool(pool) => pool.cast(msg),
879 }
880 }
881}
882
883pub struct SystemActorPool<M: ractor::Message> {
885 workers: Vec<ractor::ActorRef<M>>,
886 counter: std::sync::atomic::AtomicU64,
887}
888
889impl<M: ractor::Message> SystemActorPool<M> {
890 pub fn new(workers: Vec<ractor::ActorRef<M>>) -> Self {
892 assert!(!workers.is_empty(), "pool must have at least one worker");
893 Self {
894 workers,
895 counter: std::sync::atomic::AtomicU64::new(0),
896 }
897 }
898
899 pub fn cast(&self, msg: M) -> Result<(), ractor::MessagingErr<M>> {
901 let idx = self.counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
902 as usize % self.workers.len();
903 self.workers[idx].cast(msg)
904 }
905
906 pub fn len(&self) -> usize {
908 self.workers.len()
909 }
910
911 pub fn workers(&self) -> &[ractor::ActorRef<M>] {
913 &self.workers
914 }
915}
916
917impl RactorRuntime {
918 pub fn new() -> Self {
923 Self::create(NodeId("ractor-node".into()))
924 }
925
926 pub fn with_node_id(node_id: NodeId) -> Self {
928 Self::create(node_id)
929 }
930
931 fn create(node_id: NodeId) -> Self {
932 Self {
933 node_id,
934 next_local: Arc::new(AtomicU64::new(1)),
935 cluster_events: RactorClusterEvents::new(),
936 outbound_interceptors: Arc::new(Vec::new()),
937 drop_observer: None,
938 dead_letter_handler: Arc::new(None),
939 watchers: Arc::new(Mutex::new(HashMap::new())),
940 spawn_manager: SpawnManager::new(TypeRegistry::new()),
941 watch_manager: WatchManager::new(),
942 cancel_manager: CancelManager::new(),
943 node_directory: NodeDirectory::new(),
944 system_actors: None,
945 stop_receivers: Arc::new(Mutex::new(HashMap::new())),
946 app_version: None,
947 }
948 }
949
950 pub const ADAPTER_NAME: &'static str = "ractor";
952
953 pub fn with_app_version(mut self, version: impl Into<String>) -> Self {
959 self.app_version = Some(version.into());
960 self
961 }
962
963 pub fn app_version(&self) -> Option<&str> {
965 self.app_version.as_deref()
966 }
967
968 pub fn handshake_request(&self) -> dactor::HandshakeRequest {
971 dactor::HandshakeRequest::from_runtime(
972 self.node_id.clone(),
973 self.app_version.clone(),
974 Self::ADAPTER_NAME,
975 )
976 }
977
978 pub async fn start_system_actors(&mut self) {
989 self.start_system_actors_with_config(dactor::SystemActorConfig::default()).await;
990 }
991
992 pub async fn start_system_actors_with_config(
1016 &mut self,
1017 config: dactor::SystemActorConfig,
1018 ) {
1019 use crate::system_actors::*;
1020
1021 let pool_size = config.spawn_manager_pool_size.unwrap_or(1).max(1);
1023 let mut spawn_refs = Vec::with_capacity(pool_size);
1024
1025 for _ in 0..pool_size {
1026 let (spawn_ref, _) = ractor::Actor::spawn(
1027 None, SpawnManagerActor,
1028 (self.node_id.clone(), TypeRegistry::new(), self.next_local.clone()),
1029 ).await.expect("failed to spawn SpawnManagerActor");
1030 spawn_refs.push(spawn_ref);
1031 }
1032
1033 let spawn_manager = if spawn_refs.len() == 1 {
1034 SystemActorHandle::Single(spawn_refs.pop().unwrap())
1035 } else {
1036 SystemActorHandle::Pool(SystemActorPool::new(spawn_refs))
1037 };
1038
1039 let (watch_ref, _) = ractor::Actor::spawn(
1041 None, WatchManagerActor, (),
1042 ).await.expect("failed to spawn WatchManagerActor");
1043
1044 let (cancel_ref, _) = ractor::Actor::spawn(
1045 None, CancelManagerActor, (),
1046 ).await.expect("failed to spawn CancelManagerActor");
1047
1048 let (node_dir_ref, _) = ractor::Actor::spawn(
1049 None, NodeDirectoryActor, (),
1050 ).await.expect("failed to spawn NodeDirectoryActor");
1051
1052 self.system_actors = Some(RactorSystemActorRefs {
1053 spawn_manager,
1054 watch_manager: watch_ref,
1055 cancel_manager: cancel_ref,
1056 node_directory: node_dir_ref,
1057 });
1058 }
1059
1060 pub fn node_id(&self) -> &NodeId {
1062 &self.node_id
1063 }
1064
1065 pub fn system_actor_refs(&self) -> Option<&RactorSystemActorRefs> {
1069 self.system_actors.as_ref()
1070 }
1071
1072 pub fn add_outbound_interceptor(&mut self, interceptor: Box<dyn OutboundInterceptor>) {
1077 Arc::get_mut(&mut self.outbound_interceptors)
1078 .expect("cannot add interceptors after actors are spawned")
1079 .push(interceptor);
1080 }
1081
1082 pub fn set_drop_observer(&mut self, observer: Arc<dyn DropObserver>) {
1085 self.drop_observer = Some(observer);
1086 }
1087
1088 pub fn set_dead_letter_handler(&mut self, handler: Arc<dyn DeadLetterHandler>) {
1091 self.dead_letter_handler = Arc::new(Some(handler));
1092 }
1093
1094 pub fn cluster_events_handle(&self) -> &RactorClusterEvents {
1096 &self.cluster_events
1097 }
1098
1099 pub fn cluster_events(&self) -> &RactorClusterEvents {
1101 &self.cluster_events
1102 }
1103
1104 pub async fn spawn<A>(&self, name: &str, args: A::Args) -> Result<RactorActorRef<A>, dactor::errors::RuntimeError>
1106 where
1107 A: Actor<Deps = ()> + 'static,
1108 {
1109 self.spawn_internal::<A>(name, args, (), Vec::new(), MailboxConfig::Unbounded).await
1110 }
1111
1112 pub async fn spawn_with_deps<A>(&self, name: &str, args: A::Args, deps: A::Deps) -> Result<RactorActorRef<A>, dactor::errors::RuntimeError>
1114 where
1115 A: Actor + 'static,
1116 {
1117 self.spawn_internal::<A>(name, args, deps, Vec::new(), MailboxConfig::Unbounded).await
1118 }
1119
1120 pub async fn spawn_with_options<A>(
1122 &self,
1123 name: &str,
1124 args: A::Args,
1125 options: SpawnOptions,
1126 ) -> Result<RactorActorRef<A>, dactor::errors::RuntimeError>
1127 where
1128 A: Actor<Deps = ()> + 'static,
1129 {
1130 self.spawn_internal::<A>(name, args, (), options.interceptors, options.mailbox).await
1131 }
1132
1133 async fn spawn_internal<A>(
1134 &self,
1135 name: &str,
1136 args: A::Args,
1137 deps: A::Deps,
1138 interceptors: Vec<Box<dyn InboundInterceptor>>,
1139 mailbox: MailboxConfig,
1140 ) -> Result<RactorActorRef<A>, dactor::errors::RuntimeError>
1141 where
1142 A: Actor + 'static,
1143 {
1144 let local = self.next_local.fetch_add(1, Ordering::SeqCst);
1145 let actor_id = ActorId {
1146 node: self.node_id.clone(),
1147 local,
1148 };
1149 let actor_name = name.to_string();
1150
1151 let (stop_tx, stop_rx) = tokio::sync::oneshot::channel();
1152
1153 let wrapper = RactorDactorActor::<A> {
1154 _phantom: PhantomData,
1155 };
1156 let spawn_args = RactorSpawnArgs {
1157 args,
1158 deps,
1159 actor_id: actor_id.clone(),
1160 actor_name: actor_name.clone(),
1161 interceptors,
1162 watchers: self.watchers.clone(),
1163 dead_letter_handler: self.dead_letter_handler.clone(),
1164 stop_notifier: Some(stop_tx),
1165 };
1166
1167 let (actor_ref, _join_handle) = ractor::Actor::spawn(Some(name.to_string()), wrapper, spawn_args)
1168 .await
1169 .map_err(|e| dactor::errors::RuntimeError::SpawnFailed(e.to_string()))?;
1170
1171 let bounded_tx = match mailbox {
1173 MailboxConfig::Bounded { capacity, overflow } => {
1174 let (btx, mut brx) = tokio::sync::mpsc::channel::<DactorMsg<A>>(capacity);
1175 let fwd_ref = actor_ref.clone();
1176 tokio::spawn(async move {
1177 while let Some(msg) = brx.recv().await {
1178 if fwd_ref.cast(msg).is_err() {
1179 break;
1180 }
1181 }
1182 });
1183 Some(BoundedMailboxSender::new(btx, overflow))
1184 }
1185 MailboxConfig::Unbounded => None,
1186 };
1187
1188 self.stop_receivers.lock().unwrap().insert(actor_id.clone(), stop_rx);
1190
1191 Ok(RactorActorRef {
1192 id: actor_id,
1193 name: actor_name,
1194 inner: actor_ref,
1195 bounded_tx,
1196 outbound_interceptors: self.outbound_interceptors.clone(),
1197 drop_observer: self.drop_observer.clone(),
1198 dead_letter_handler: self.dead_letter_handler.clone(),
1199 })
1200 }
1201
1202 pub fn watch<W>(&self, watcher: &RactorActorRef<W>, target_id: ActorId)
1207 where
1208 W: Actor + Handler<ChildTerminated> + 'static,
1209 {
1210 let watcher_id = watcher.id();
1211 let watcher_inner = watcher.inner.clone();
1212
1213 let entry = WatchEntry {
1214 watcher_id,
1215 notify: Box::new(move |msg: ChildTerminated| {
1216 let dispatch: Box<dyn Dispatch<W>> = Box::new(TypedDispatch { msg });
1217 if watcher_inner.cast(DactorMsg(dispatch)).is_err() {
1218 tracing::debug!("watch notification dropped — watcher may have stopped");
1219 }
1220 }),
1221 };
1222
1223 let mut watchers = self.watchers.lock().unwrap();
1224 watchers.entry(target_id).or_default().push(entry);
1225 }
1226
1227 pub fn unwatch(&self, watcher_id: &ActorId, target_id: &ActorId) {
1229 let mut watchers = self.watchers.lock().unwrap();
1230 if let Some(entries) = watchers.get_mut(target_id) {
1231 entries.retain(|e| &e.watcher_id != watcher_id);
1232 if entries.is_empty() {
1233 watchers.remove(target_id);
1234 }
1235 }
1236 }
1237
1238 pub fn spawn_manager(&self) -> &SpawnManager {
1244 &self.spawn_manager
1245 }
1246
1247 pub fn spawn_manager_mut(&mut self) -> &mut SpawnManager {
1249 &mut self.spawn_manager
1250 }
1251
1252 pub fn register_factory(
1257 &mut self,
1258 type_name: impl Into<String>,
1259 factory: impl Fn(&[u8]) -> Result<Box<dyn std::any::Any + Send>, dactor::remote::SerializationError>
1260 + Send
1261 + Sync
1262 + 'static,
1263 ) {
1264 let type_name = type_name.into();
1265 let factory = Arc::new(factory);
1266
1267 self.spawn_manager
1269 .type_registry_mut()
1270 .register_factory(type_name.clone(), {
1271 let f = factory.clone();
1272 move |bytes: &[u8]| f(bytes)
1273 });
1274
1275 if let Some(ref actors) = self.system_actors {
1277 match &actors.spawn_manager {
1278 SystemActorHandle::Single(r) => {
1279 let (tx, _rx) = tokio::sync::oneshot::channel();
1280 let f = factory;
1281 let _ = r.cast(
1282 crate::system_actors::SpawnManagerMsg::RegisterFactory {
1283 type_name,
1284 factory: Box::new(move |bytes: &[u8]| f(bytes)),
1285 reply: tx,
1286 },
1287 );
1288 }
1289 SystemActorHandle::Pool(pool) => {
1290 for worker in pool.workers() {
1292 let f = factory.clone();
1293 let (tx, _rx) = tokio::sync::oneshot::channel();
1294 let _ = worker.cast(
1295 crate::system_actors::SpawnManagerMsg::RegisterFactory {
1296 type_name: type_name.clone(),
1297 factory: Box::new(move |bytes: &[u8]| f(bytes)),
1298 reply: tx,
1299 },
1300 );
1301 }
1302 }
1303 }
1304 }
1305 }
1306
1307 pub fn handle_spawn_request(
1324 &mut self,
1325 request: &SpawnRequest,
1326 ) -> Result<(ActorId, Box<dyn std::any::Any + Send>), SpawnResponse> {
1327 match self.spawn_manager.create_actor(request) {
1328 Ok(actor) => {
1329 let local = self.next_local.fetch_add(1, Ordering::SeqCst);
1330 let actor_id = ActorId {
1331 node: self.node_id.clone(),
1332 local,
1333 };
1334 self.spawn_manager.record_spawn(actor_id.clone());
1335 Ok((actor_id, actor))
1336 }
1337 Err(e) => Err(SpawnResponse::Failure {
1338 request_id: request.request_id.clone(),
1339 error: e.to_string(),
1340 }),
1341 }
1342 }
1343
1344 pub fn watch_manager(&self) -> &WatchManager {
1350 &self.watch_manager
1351 }
1352
1353 pub fn watch_manager_mut(&mut self) -> &mut WatchManager {
1355 &mut self.watch_manager
1356 }
1357
1358 pub fn remote_watch(&mut self, target: ActorId, watcher: ActorId) {
1361 self.watch_manager.watch(target, watcher);
1362 }
1363
1364 pub fn remote_unwatch(&mut self, target: &ActorId, watcher: &ActorId) {
1366 self.watch_manager.unwatch(target, watcher);
1367 }
1368
1369 pub fn notify_terminated(&mut self, terminated: &ActorId) -> Vec<WatchNotification> {
1375 self.watch_manager.on_terminated(terminated)
1376 }
1377
1378 pub fn cancel_manager(&self) -> &CancelManager {
1384 &self.cancel_manager
1385 }
1386
1387 pub fn cancel_manager_mut(&mut self) -> &mut CancelManager {
1389 &mut self.cancel_manager
1390 }
1391
1392 pub fn register_cancel(&mut self, request_id: String, token: CancellationToken) {
1394 self.cancel_manager.register(request_id, token);
1395 }
1396
1397 pub fn cancel_request(&mut self, request_id: &str) -> CancelResponse {
1399 self.cancel_manager.cancel(request_id)
1400 }
1401
1402 pub fn complete_request(&mut self, request_id: &str) {
1407 self.cancel_manager.remove(request_id);
1408 }
1409
1410 pub fn node_directory(&self) -> &NodeDirectory {
1416 &self.node_directory
1417 }
1418
1419 pub fn node_directory_mut(&mut self) -> &mut NodeDirectory {
1421 &mut self.node_directory
1422 }
1423
1424 pub fn connect_peer(&mut self, peer_id: NodeId, address: Option<String>) {
1437 let was_connected = self.node_directory.is_connected(&peer_id);
1438 if let Some(existing) = self.node_directory.get_peer(&peer_id) {
1439 let resolved_address = address.or_else(|| existing.address.clone());
1441 self.node_directory.remove_peer(&peer_id);
1442 self.node_directory.add_peer(peer_id.clone(), resolved_address);
1443 } else {
1444 self.node_directory.add_peer(peer_id.clone(), address);
1445 }
1446 self.node_directory.set_status(&peer_id, PeerStatus::Connected);
1447 if !was_connected {
1448 self.cluster_events.emit(dactor::ClusterEvent::NodeJoined(peer_id));
1449 }
1450 }
1451
1452 pub fn disconnect_peer(&mut self, peer_id: &NodeId) {
1456 let was_connected = self.node_directory.is_connected(peer_id);
1457 self.node_directory.set_status(peer_id, PeerStatus::Disconnected);
1458 if was_connected {
1459 self.cluster_events.emit(dactor::ClusterEvent::NodeLeft(peer_id.clone()));
1460 }
1461 }
1462
1463 pub fn is_peer_connected(&self, peer_id: &NodeId) -> bool {
1465 self.node_directory.is_connected(peer_id)
1466 }
1467
1468 pub async fn await_stop(&self, actor_id: &ActorId) -> Result<(), String> {
1481 let rx = {
1482 let mut receivers = self.stop_receivers.lock().unwrap();
1483 receivers.remove(actor_id)
1484 };
1485 match rx {
1486 Some(rx) => rx
1487 .await
1488 .map_err(|_| "stop notifier dropped".to_string())
1489 .and_then(|r| r),
1490 None => Ok(()),
1491 }
1492 }
1493
1494 pub async fn await_all(&self) -> Result<(), String> {
1499 let receivers: Vec<_> = {
1500 let mut map = self.stop_receivers.lock().unwrap();
1501 map.drain().collect()
1502 };
1503 let mut first_error = None;
1504 for (_, rx) in receivers {
1505 let result = rx.await.map_err(|e| format!("stop notifier dropped: {e}")).and_then(|r| r);
1506 if let Err(e) = result {
1507 if first_error.is_none() {
1508 first_error = Some(e);
1509 }
1510 }
1511 }
1512 match first_error {
1513 Some(e) => Err(e),
1514 None => Ok(()),
1515 }
1516 }
1517
1518 pub fn cleanup_finished(&self) {
1523 let mut receivers = self.stop_receivers.lock().unwrap();
1524 receivers.retain(|_, rx| {
1525 matches!(rx.try_recv(), Err(tokio::sync::oneshot::error::TryRecvError::Empty))
1526 });
1527 }
1528
1529 pub fn active_handle_count(&self) -> usize {
1535 self.stop_receivers.lock().unwrap().len()
1536 }
1537}
1538
1539impl Default for RactorRuntime {
1540 fn default() -> Self {
1541 Self::new()
1542 }
1543}
1544
1545#[async_trait::async_trait]
1556impl dactor::system_router::SystemMessageRouter for RactorRuntime {
1557 async fn route_system_envelope(
1558 &self,
1559 envelope: dactor::remote::WireEnvelope,
1560 ) -> Result<dactor::system_router::RoutingOutcome, dactor::system_router::RoutingError> {
1561 use dactor::system_actors::*;
1562 use dactor::system_router::{RoutingError, RoutingOutcome};
1563
1564 dactor::system_router::validate_system_message_type(&envelope.message_type)?;
1565
1566 let refs = self
1567 .system_actors
1568 .as_ref()
1569 .ok_or_else(|| RoutingError::new("system actors not started"))?;
1570
1571 match envelope.message_type.as_str() {
1572 SYSTEM_MSG_TYPE_SPAWN => {
1573 let request = dactor::proto::decode_spawn_request(&envelope.body)
1574 .map_err(|e| RoutingError::new(format!("decode SpawnRequest: {e}")))?;
1575
1576 let req_id = request.request_id.clone();
1577 let (tx, rx) = tokio::sync::oneshot::channel();
1578 refs.spawn_manager
1579 .cast(crate::system_actors::SpawnManagerMsg::HandleRequest {
1580 request,
1581 reply: tx,
1582 })
1583 .map_err(|e| RoutingError::new(format!("SpawnManager mailbox: {e}")))?;
1584
1585 let result = rx
1586 .await
1587 .map_err(|_| RoutingError::new("SpawnManager reply dropped"))?;
1588
1589 match result {
1590 Ok((actor_id, _actor)) => Ok(RoutingOutcome::SpawnCompleted {
1591 request_id: req_id,
1592 actor_id,
1593 }),
1594 Err(SpawnResponse::Failure { request_id, error }) => {
1595 Ok(RoutingOutcome::SpawnFailed { request_id, error })
1596 }
1597 Err(SpawnResponse::Success { .. }) => {
1598 unreachable!("SpawnResult::Err always wraps SpawnResponse::Failure")
1599 }
1600 }
1601 }
1602
1603 SYSTEM_MSG_TYPE_WATCH => {
1604 let request = dactor::proto::decode_watch_request(&envelope.body)
1605 .map_err(|e| RoutingError::new(format!("decode WatchRequest: {e}")))?;
1606
1607 refs.watch_manager
1608 .cast(crate::system_actors::WatchManagerMsg::Watch {
1609 target: request.target,
1610 watcher: request.watcher,
1611 })
1612 .map_err(|e| RoutingError::new(format!("WatchManager mailbox: {e}")))?;
1613
1614 Ok(RoutingOutcome::Acknowledged)
1615 }
1616
1617 SYSTEM_MSG_TYPE_UNWATCH => {
1618 let request = dactor::proto::decode_unwatch_request(&envelope.body)
1619 .map_err(|e| RoutingError::new(format!("decode UnwatchRequest: {e}")))?;
1620
1621 refs.watch_manager
1622 .cast(crate::system_actors::WatchManagerMsg::Unwatch {
1623 target: request.target,
1624 watcher: request.watcher,
1625 })
1626 .map_err(|e| RoutingError::new(format!("WatchManager mailbox: {e}")))?;
1627
1628 Ok(RoutingOutcome::Acknowledged)
1629 }
1630
1631 SYSTEM_MSG_TYPE_CANCEL => {
1632 let request = dactor::proto::decode_cancel_request(&envelope.body)
1633 .map_err(|e| RoutingError::new(format!("decode CancelRequest: {e}")))?;
1634
1635 let request_id = request
1636 .request_id
1637 .ok_or_else(|| RoutingError::new("CancelRequest missing request_id"))?;
1638
1639 let (tx, rx) = tokio::sync::oneshot::channel();
1640 refs.cancel_manager
1641 .cast(crate::system_actors::CancelManagerMsg::Cancel {
1642 request_id,
1643 reply: tx,
1644 })
1645 .map_err(|e| RoutingError::new(format!("CancelManager mailbox: {e}")))?;
1646
1647 let response = rx
1648 .await
1649 .map_err(|_| RoutingError::new("CancelManager reply dropped"))?;
1650
1651 match response {
1652 CancelResponse::Acknowledged => Ok(RoutingOutcome::CancelAcknowledged),
1653 CancelResponse::NotFound { reason } => {
1654 Ok(RoutingOutcome::CancelNotFound { reason })
1655 }
1656 }
1657 }
1658
1659 SYSTEM_MSG_TYPE_CONNECT_PEER => {
1660 let (peer_id, address) = dactor::proto::decode_connect_peer(&envelope.body)
1661 .map_err(|e| RoutingError::new(format!("decode ConnectPeer: {e}")))?;
1662
1663 refs.node_directory
1664 .cast(crate::system_actors::NodeDirectoryMsg::ConnectPeer {
1665 peer_id,
1666 address,
1667 })
1668 .map_err(|e| RoutingError::new(format!("NodeDirectory mailbox: {e}")))?;
1669
1670 Ok(RoutingOutcome::Acknowledged)
1671 }
1672
1673 SYSTEM_MSG_TYPE_DISCONNECT_PEER => {
1674 let peer_id = dactor::proto::decode_disconnect_peer(&envelope.body)
1675 .map_err(|e| RoutingError::new(format!("decode DisconnectPeer: {e}")))?;
1676
1677 refs.node_directory
1678 .cast(crate::system_actors::NodeDirectoryMsg::DisconnectPeer { peer_id })
1679 .map_err(|e| RoutingError::new(format!("NodeDirectory mailbox: {e}")))?;
1680
1681 Ok(RoutingOutcome::Acknowledged)
1682 }
1683
1684 other => Err(RoutingError::new(format!(
1688 "unhandled system message type: {other}"
1689 ))),
1690 }
1691 }
1692}
1693