1use std::collections::HashMap;
7use std::marker::PhantomData;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Arc, Mutex};
10use std::time::Duration;
11
12use futures::FutureExt;
13use tokio_util::sync::CancellationToken;
14
15use dactor::actor::{
16 Actor, ActorContext, ActorError, ActorRef, AskReply, ReduceHandler, Handler, ExpandHandler,
17 TransformHandler,
18};
19use dactor::dead_letter::{DeadLetterEvent, DeadLetterHandler, DeadLetterReason};
20use dactor::dispatch::{AskDispatch, Dispatch, ReduceDispatch, ExpandDispatch, TransformDispatch, TypedDispatch};
21use dactor::errors::{ActorSendError, ErrorAction, RuntimeError};
22use dactor::interceptor::{
23 Disposition, DropObserver, InboundContext, InboundInterceptor, OutboundInterceptor, Outcome,
24 SendMode,
25};
26use dactor::mailbox::MailboxConfig;
27use dactor::message::{Headers, Message, RuntimeHeaders};
28use dactor::node::{ActorId, NodeId};
29use dactor::runtime_support::{
30 spawn_reduce_batched_drain, spawn_reduce_drain, spawn_transform_drain,
31 wrap_batched_stream_with_interception, wrap_stream_with_interception, OutboundPipeline,
32};
33use dactor::stream::{
34 BatchConfig, BatchReader, BatchWriter, BoxStream, StreamReceiver, StreamSender,
35};
36use dactor::supervision::ChildTerminated;
37use dactor::system_actors::{
38 CancelManager, CancelResponse, NodeDirectory, PeerStatus, SpawnManager, SpawnRequest,
39 SpawnResponse, WatchManager, WatchNotification,
40};
41use dactor::type_registry::TypeRegistry;
42
43use crate::cluster::RactorClusterEvents;
44
45struct WatchEntry {
51 watcher_id: ActorId,
52 notify: Box<dyn Fn(ChildTerminated) + Send + Sync>,
54}
55
56type WatcherMap = Arc<Mutex<HashMap<ActorId, Vec<WatchEntry>>>>;
58
59struct DactorMsg<A: Actor>(Box<dyn Dispatch<A>>);
65
66struct RactorDactorActor<A: Actor> {
68 _phantom: PhantomData<fn() -> A>,
69}
70
71struct RactorActorState<A: Actor> {
73 actor: A,
74 ctx: ActorContext,
75 interceptors: Vec<Box<dyn InboundInterceptor>>,
76 watchers: WatcherMap,
77 stop_reason: Option<String>,
78 dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
79 stop_notifier: Option<tokio::sync::oneshot::Sender<Result<(), String>>>,
81}
82
83struct RactorSpawnArgs<A: Actor> {
85 args: A::Args,
86 deps: A::Deps,
87 actor_id: ActorId,
88 actor_name: String,
89 interceptors: Vec<Box<dyn InboundInterceptor>>,
90 watchers: WatcherMap,
91 dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
92 stop_notifier: Option<tokio::sync::oneshot::Sender<Result<(), String>>>,
93}
94
95impl<A: Actor + 'static> ractor::Actor for RactorDactorActor<A> {
96 type Msg = DactorMsg<A>;
97 type State = RactorActorState<A>;
98 type Arguments = RactorSpawnArgs<A>;
99
100 async fn pre_start(
101 &self,
102 _myself: ractor::ActorRef<Self::Msg>,
103 args: Self::Arguments,
104 ) -> Result<Self::State, ractor::ActorProcessingErr> {
105 let mut actor = A::create(args.args, args.deps);
106 let mut ctx = ActorContext::new(args.actor_id, args.actor_name);
107 actor.on_start(&mut ctx).await;
108 Ok(RactorActorState {
109 actor,
110 ctx,
111 interceptors: args.interceptors,
112 watchers: args.watchers,
113 stop_reason: None,
114 dead_letter_handler: args.dead_letter_handler,
115 stop_notifier: args.stop_notifier,
116 })
117 }
118
119 async fn handle(
120 &self,
121 myself: ractor::ActorRef<Self::Msg>,
122 message: Self::Msg,
123 state: &mut Self::State,
124 ) -> Result<(), ractor::ActorProcessingErr> {
125 let dispatch = message.0;
126
127 let send_mode = dispatch.send_mode();
129 let message_type = dispatch.message_type_name();
130
131 state.ctx.send_mode = Some(send_mode);
132 state.ctx.headers = Headers::new();
133
134 let runtime_headers = RuntimeHeaders::new();
136 let mut headers = Headers::new();
137 let mut total_delay = Duration::ZERO;
138 let mut rejection: Option<(String, Disposition)> = None;
139
140 {
141 let ictx = InboundContext {
142 actor_id: state.ctx.actor_id.clone(),
143 actor_name: &state.ctx.actor_name,
144 message_type,
145 send_mode,
146 remote: false,
147 origin_node: None,
148 };
149
150 for interceptor in &state.interceptors {
151 match interceptor.on_receive(
152 &ictx,
153 &runtime_headers,
154 &mut headers,
155 dispatch.message_any(),
156 ) {
157 Disposition::Continue => {}
158 Disposition::Delay(d) => {
159 total_delay += d;
160 }
161 disp @ (Disposition::Drop | Disposition::Reject(_) | Disposition::Retry(_)) => {
162 rejection = Some((interceptor.name().to_string(), disp));
163 break;
164 }
165 }
166 }
167 }
168
169 if let Some((interceptor_name, disposition)) = rejection {
171 if matches!(disposition, Disposition::Drop) {
172 if let Some(ref handler) = *state.dead_letter_handler {
173 let event = DeadLetterEvent {
174 target_id: state.ctx.actor_id.clone(),
175 target_name: Some(state.ctx.actor_name.clone()),
176 message_type,
177 send_mode,
178 reason: DeadLetterReason::DroppedByInterceptor {
179 interceptor: interceptor_name.clone(),
180 },
181 message: None,
182 };
183 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
184 handler.on_dead_letter(event);
185 }));
186 }
187 }
188 dispatch.reject(disposition, &interceptor_name);
189 return Ok(());
190 }
191
192 if !total_delay.is_zero() {
193 tokio::time::sleep(total_delay).await;
194 }
195
196 state.ctx.headers = headers;
198
199 let cancel_token = dispatch.cancel_token();
201 state.ctx.set_cancellation_token(cancel_token.clone());
202
203 if let Some(ref token) = cancel_token {
205 if token.is_cancelled() {
206 dispatch.cancel();
207 state.ctx.set_cancellation_token(None);
208 return Ok(());
209 }
210 }
211
212 let result = if let Some(ref token) = cancel_token {
214 let dispatch_fut =
215 std::panic::AssertUnwindSafe(dispatch.dispatch(&mut state.actor, &mut state.ctx))
216 .catch_unwind();
217 tokio::select! {
218 biased;
219 r = dispatch_fut => r,
220 _ = token.cancelled() => {
221 state.ctx.set_cancellation_token(None);
225 return Ok(());
226 }
227 }
228 } else {
229 std::panic::AssertUnwindSafe(dispatch.dispatch(&mut state.actor, &mut state.ctx))
230 .catch_unwind()
231 .await
232 };
233
234 state.ctx.set_cancellation_token(None);
235
236 let ictx = InboundContext {
238 actor_id: state.ctx.actor_id.clone(),
239 actor_name: &state.ctx.actor_name,
240 message_type,
241 send_mode,
242 remote: false,
243 origin_node: None,
244 };
245
246 match result {
247 Ok(dispatch_result) => {
248 let outcome = match (&dispatch_result.reply, send_mode) {
249 (Some(reply), SendMode::Ask) => Outcome::AskSuccess {
250 reply: reply.as_ref(),
251 },
252 _ => Outcome::TellSuccess,
253 };
254
255 for interceptor in &state.interceptors {
256 interceptor.on_complete(&ictx, &runtime_headers, &state.ctx.headers, &outcome);
257 }
258
259 dispatch_result.send_reply();
261 }
262 Err(_panic) => {
263 let error = ActorError::internal("handler panicked");
264 let action = state.actor.on_error(&error);
265
266 let outcome = Outcome::HandlerError { error };
267 for interceptor in &state.interceptors {
268 interceptor.on_complete(&ictx, &runtime_headers, &state.ctx.headers, &outcome);
269 }
270
271 match action {
272 ErrorAction::Resume => {
273 }
275 ErrorAction::Stop | ErrorAction::Escalate => {
276 state.stop_reason = Some("handler panicked".into());
277 myself.stop(None);
278 }
279 ErrorAction::Restart => {
280 tracing::warn!("Restart not fully implemented, treating as Resume");
281 }
282 }
283 }
284 }
285
286 Ok(())
287 }
288
289 async fn post_stop(
290 &self,
291 _myself: ractor::ActorRef<Self::Msg>,
292 state: &mut Self::State,
293 ) -> Result<(), ractor::ActorProcessingErr> {
294 state.ctx.send_mode = None;
296 state.ctx.headers = Headers::new();
297 state.ctx.set_cancellation_token(None);
298
299 let on_stop_panicked =
302 std::panic::AssertUnwindSafe(state.actor.on_stop())
303 .catch_unwind()
304 .await
305 .is_err();
306 if on_stop_panicked && state.stop_reason.is_none() {
307 state.stop_reason = Some("actor panicked in on_stop".to_string());
308 }
309
310 let actor_id = state.ctx.actor_id.clone();
312 let actor_name = state.ctx.actor_name.clone();
313 let entries = {
314 let mut watchers = state.watchers.lock().unwrap();
315 let target_entries = watchers.remove(&actor_id).unwrap_or_default();
317 for entries in watchers.values_mut() {
319 entries.retain(|e| e.watcher_id != actor_id);
320 }
321 watchers.retain(|_, v| !v.is_empty());
322 target_entries
323 };
324 if !entries.is_empty() {
325 let notification = ChildTerminated {
326 child_id: actor_id,
327 child_name: actor_name,
328 reason: state.stop_reason.clone(),
329 };
330 for entry in &entries {
331 (entry.notify)(notification.clone());
332 }
333 }
334
335 if let Some(tx) = state.stop_notifier.take() {
338 let result = if on_stop_panicked {
339 Err("actor panicked in on_stop".to_string())
340 } else {
341 Ok(())
342 };
343 let _ = tx.send(result);
344 }
345
346 Ok(())
347 }
348}
349
350pub struct RactorActorRef<A: Actor> {
363 id: ActorId,
364 name: String,
365 inner: ractor::ActorRef<DactorMsg<A>>,
366 bounded_tx: Option<BoundedMailboxSender<DactorMsg<A>>>,
367 outbound_interceptors: Arc<Vec<Box<dyn OutboundInterceptor>>>,
368 drop_observer: Option<Arc<dyn DropObserver>>,
369 dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
370}
371
372use dactor::runtime_support::BoundedMailboxSender;
373
374impl<A: Actor> Clone for RactorActorRef<A> {
375 fn clone(&self) -> Self {
376 Self {
377 id: self.id.clone(),
378 name: self.name.clone(),
379 inner: self.inner.clone(),
380 bounded_tx: self.bounded_tx.clone(),
381 outbound_interceptors: self.outbound_interceptors.clone(),
382 drop_observer: self.drop_observer.clone(),
383 dead_letter_handler: self.dead_letter_handler.clone(),
384 }
385 }
386}
387
388impl<A: Actor> std::fmt::Debug for RactorActorRef<A> {
389 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
390 write!(f, "RactorActorRef({}, {:?})", self.name, self.id)
391 }
392}
393
394impl<A: Actor + 'static> RactorActorRef<A> {
395 fn outbound_pipeline(&self) -> OutboundPipeline {
396 OutboundPipeline {
397 interceptors: self.outbound_interceptors.clone(),
398 drop_observer: self.drop_observer.clone(),
399 target_id: self.id.clone(),
400 target_name: self.name.clone(),
401 }
402 }
403
404 fn notify_dead_letter(
405 &self,
406 message_type: &'static str,
407 send_mode: SendMode,
408 reason: DeadLetterReason,
409 ) {
410 if let Some(ref handler) = *self.dead_letter_handler {
411 let event = DeadLetterEvent {
412 target_id: self.id.clone(),
413 target_name: Some(self.name.clone()),
414 message_type,
415 send_mode,
416 reason,
417 message: None,
418 };
419 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
420 handler.on_dead_letter(event);
421 }));
422 }
423 }
424
425 fn send_dispatch(&self, dispatch: Box<dyn Dispatch<A>>) -> Result<(), ActorSendError> {
428 if let Some(ref btx) = self.bounded_tx {
429 btx.try_send(DactorMsg(dispatch))
430 } else {
431 self.inner
432 .cast(DactorMsg(dispatch))
433 .map_err(|e| ActorSendError(e.to_string()))
434 }
435 }
436}
437
438impl<A: Actor + 'static> ActorRef<A> for RactorActorRef<A> {
439 fn id(&self) -> ActorId {
440 self.id.clone()
441 }
442
443 fn name(&self) -> String {
444 self.name.clone()
445 }
446
447 fn is_alive(&self) -> bool {
448 let inner_alive = matches!(
449 self.inner.get_status(),
450 ractor::ActorStatus::Running
451 | ractor::ActorStatus::Starting
452 | ractor::ActorStatus::Upgrading
453 );
454 if let Some(ref btx) = self.bounded_tx {
455 !btx.is_closed() && inner_alive
456 } else {
457 inner_alive
458 }
459 }
460
461 fn pending_messages(&self) -> usize {
462 if let Some(ref btx) = self.bounded_tx {
463 btx.pending()
464 } else {
465 0
466 }
467 }
468
469 fn stop(&self) {
470 self.inner.stop(None);
471 }
472
473 fn tell<M>(&self, msg: M) -> Result<(), ActorSendError>
474 where
475 A: Handler<M>,
476 M: Message<Reply = ()>,
477 {
478 let pipeline = self.outbound_pipeline();
479 let result = pipeline.run_on_send(SendMode::Tell, &msg);
480 match result.disposition {
481 Disposition::Continue => {}
482 Disposition::Delay(_) => {} Disposition::Drop | Disposition::Reject(_) | Disposition::Retry(_) => return Ok(()),
484 }
485
486 let dispatch: Box<dyn Dispatch<A>> = Box::new(TypedDispatch { msg });
487 self.send_dispatch(dispatch).map_err(|e| {
488 let reason = if e.0.contains("full") {
489 DeadLetterReason::MailboxFull
490 } else {
491 DeadLetterReason::ActorStopped
492 };
493 self.notify_dead_letter(std::any::type_name::<M>(), SendMode::Tell, reason);
494 e
495 })
496 }
497
498 fn ask<M>(
499 &self,
500 msg: M,
501 cancel: Option<CancellationToken>,
502 ) -> Result<AskReply<M::Reply>, ActorSendError>
503 where
504 A: Handler<M>,
505 M: Message,
506 {
507 let pipeline = self.outbound_pipeline();
508 let result = pipeline.run_on_send(SendMode::Ask, &msg);
509 match result.disposition {
510 Disposition::Continue => {}
511 Disposition::Delay(_) => {} Disposition::Drop => {
513 let (tx, rx) = tokio::sync::oneshot::channel();
514 let _ = tx.send(Err(RuntimeError::ActorNotFound(
515 "message dropped by outbound interceptor".into(),
516 )));
517 return Ok(AskReply::new(rx));
518 }
519 Disposition::Reject(reason) => {
520 let (tx, rx) = tokio::sync::oneshot::channel();
521 let _ = tx.send(Err(RuntimeError::Rejected {
522 interceptor: result.interceptor_name.to_string(),
523 reason,
524 }));
525 return Ok(AskReply::new(rx));
526 }
527 Disposition::Retry(retry_after) => {
528 let (tx, rx) = tokio::sync::oneshot::channel();
529 let _ = tx.send(Err(RuntimeError::RetryAfter {
530 interceptor: result.interceptor_name.to_string(),
531 retry_after,
532 }));
533 return Ok(AskReply::new(rx));
534 }
535 }
536
537 let (tx, rx) = tokio::sync::oneshot::channel();
538 let dispatch: Box<dyn Dispatch<A>> = Box::new(AskDispatch {
539 msg,
540 reply_tx: tx,
541 cancel,
542 });
543 self.send_dispatch(dispatch).map_err(|e| {
544 let reason = if e.0.contains("full") {
545 DeadLetterReason::MailboxFull
546 } else {
547 DeadLetterReason::ActorStopped
548 };
549 self.notify_dead_letter(std::any::type_name::<M>(), SendMode::Ask, reason);
550 e
551 })?;
552 Ok(AskReply::new(rx))
553 }
554
555 fn expand<M, OutputItem>(
556 &self,
557 msg: M,
558 buffer: usize,
559 batch_config: Option<BatchConfig>,
560 cancel: Option<CancellationToken>,
561 ) -> Result<BoxStream<OutputItem>, ActorSendError>
562 where
563 A: ExpandHandler<M, OutputItem>,
564 M: Send + 'static,
565 OutputItem: Send + 'static,
566 {
567 let pipeline = self.outbound_pipeline();
568 let result = pipeline.run_on_send(SendMode::Expand, &msg);
569 match result.disposition {
570 Disposition::Continue => {}
571 Disposition::Delay(_) => {}
572 Disposition::Drop => {
573 return Err(ActorSendError(
574 "stream dropped by outbound interceptor".into(),
575 ));
576 }
577 Disposition::Reject(reason) => {
578 return Err(ActorSendError(format!("stream rejected: {}", reason)));
579 }
580 Disposition::Retry(_) => {
581 return Err(ActorSendError(
582 "stream retry requested by interceptor".into(),
583 ));
584 }
585 }
586
587 let buffer = buffer.max(1);
588 let (tx, mut rx) = tokio::sync::mpsc::channel(buffer);
589 let sender = StreamSender::new(tx);
590 let dispatch: Box<dyn Dispatch<A>> = Box::new(ExpandDispatch {
591 msg,
592 sender,
593 cancel,
594 });
595 self.send_dispatch(dispatch)?;
596
597 match batch_config {
598 Some(batch_config) => {
599 let (batch_tx, batch_rx) = tokio::sync::mpsc::channel::<Vec<OutputItem>>(buffer);
600 let reader = BatchReader::new(batch_rx);
601 let batch_delay = batch_config.max_delay;
602 tokio::spawn(async move {
603 let mut writer = BatchWriter::new(batch_tx, batch_config);
604 loop {
605 if writer.buffered_count() > 0 {
606 let deadline = tokio::time::Instant::now() + batch_delay;
607 tokio::select! {
608 biased;
609 item = rx.recv() => match item {
610 Some(item) => {
611 if writer.push(item).await.is_err() { break; }
612 }
613 None => break,
614 },
615 _ = tokio::time::sleep_until(deadline) => {
616 if writer.check_deadline().await.is_err() { break; }
617 }
618 }
619 } else {
620 match rx.recv().await {
621 Some(item) => {
622 if writer.push(item).await.is_err() {
623 break;
624 }
625 }
626 None => break,
627 }
628 }
629 }
630 let _ = writer.flush().await;
631 });
632 Ok(wrap_batched_stream_with_interception(
633 reader,
634 buffer,
635 pipeline,
636 std::any::type_name::<M>(),
637 SendMode::Expand,
638 ))
639 }
640 None => Ok(wrap_stream_with_interception(
641 rx,
642 buffer,
643 pipeline,
644 std::any::type_name::<M>(),
645 SendMode::Expand,
646 )),
647 }
648 }
649
650 fn reduce<InputItem, Reply>(
651 &self,
652 input: BoxStream<InputItem>,
653 buffer: usize,
654 batch_config: Option<BatchConfig>,
655 cancel: Option<CancellationToken>,
656 ) -> Result<AskReply<Reply>, ActorSendError>
657 where
658 A: ReduceHandler<InputItem, Reply>,
659 InputItem: Send + 'static,
660 Reply: Send + 'static,
661 {
662 let buffer = buffer.max(1);
663 let (item_tx, item_rx) = tokio::sync::mpsc::channel(buffer);
664 let receiver = StreamReceiver::new(item_rx);
665 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
666 let dispatch: Box<dyn Dispatch<A>> = Box::new(ReduceDispatch {
667 receiver,
668 reply_tx,
669 cancel: cancel.clone(),
670 });
671 self.send_dispatch(dispatch)?;
672
673 let pipeline = self.outbound_pipeline();
674 match batch_config {
675 Some(batch_config) => {
676 spawn_reduce_batched_drain(
677 input,
678 item_tx,
679 buffer,
680 batch_config,
681 cancel,
682 pipeline,
683 std::any::type_name::<InputItem>(),
684 );
685 }
686 None => {
687 spawn_reduce_drain(
688 input,
689 item_tx,
690 cancel,
691 pipeline,
692 std::any::type_name::<InputItem>(),
693 );
694 }
695 }
696
697 Ok(AskReply::new(reply_rx))
698 }
699
700 fn transform<InputItem, OutputItem>(
701 &self,
702 input: BoxStream<InputItem>,
703 buffer: usize,
704 batch_config: Option<BatchConfig>,
705 cancel: Option<CancellationToken>,
706 ) -> Result<BoxStream<OutputItem>, ActorSendError>
707 where
708 A: TransformHandler<InputItem, OutputItem>,
709 InputItem: Send + 'static,
710 OutputItem: Send + 'static,
711 {
712 let buffer = buffer.max(1);
713 let (item_tx, item_rx) = tokio::sync::mpsc::channel(buffer);
714 let (output_tx, mut output_rx) = tokio::sync::mpsc::channel(buffer);
715 let receiver = StreamReceiver::new(item_rx);
716 let sender = StreamSender::new(output_tx);
717 let dispatch: Box<dyn Dispatch<A>> = Box::new(TransformDispatch::new(
718 receiver,
719 sender,
720 cancel.clone(),
721 ));
722 self.send_dispatch(dispatch)?;
723
724 let pipeline = self.outbound_pipeline();
725 spawn_transform_drain(
726 input,
727 item_tx,
728 cancel,
729 pipeline.clone(),
730 std::any::type_name::<InputItem>(),
731 );
732
733 match batch_config {
734 Some(batch_config) => {
735 let (batch_tx, batch_rx) =
736 tokio::sync::mpsc::channel::<Vec<OutputItem>>(buffer);
737 let reader = BatchReader::new(batch_rx);
738 let batch_delay = batch_config.max_delay;
739 tokio::spawn(async move {
740 let mut writer = BatchWriter::new(batch_tx, batch_config);
741 loop {
742 if writer.buffered_count() > 0 {
743 let deadline = tokio::time::Instant::now() + batch_delay;
744 tokio::select! {
745 biased;
746 item = output_rx.recv() => match item {
747 Some(item) => {
748 if writer.push(item).await.is_err() { break; }
749 }
750 None => break,
751 },
752 _ = tokio::time::sleep_until(deadline) => {
753 if writer.check_deadline().await.is_err() { break; }
754 }
755 }
756 } else {
757 match output_rx.recv().await {
758 Some(item) => {
759 if writer.push(item).await.is_err() {
760 break;
761 }
762 }
763 None => break,
764 }
765 }
766 }
767 let _ = writer.flush().await;
768 });
769 Ok(wrap_batched_stream_with_interception(
770 reader,
771 buffer,
772 pipeline,
773 std::any::type_name::<OutputItem>(),
774 SendMode::Transform,
775 ))
776 }
777 None => Ok(wrap_stream_with_interception(
778 output_rx,
779 buffer,
780 pipeline,
781 std::any::type_name::<OutputItem>(),
782 SendMode::Transform,
783 )),
784 }
785 }
786}
787
788pub struct SpawnOptions {
796 pub interceptors: Vec<Box<dyn InboundInterceptor>>,
798 pub mailbox: MailboxConfig,
805}
806
807impl Default for SpawnOptions {
808 fn default() -> Self {
809 Self {
810 interceptors: Vec::new(),
811 mailbox: MailboxConfig::Unbounded,
812 }
813 }
814}
815
816pub struct RactorRuntime {
836 node_id: NodeId,
837 next_local: Arc<AtomicU64>,
838 cluster_events: RactorClusterEvents,
839 outbound_interceptors: Arc<Vec<Box<dyn OutboundInterceptor>>>,
840 drop_observer: Option<Arc<dyn DropObserver>>,
841 dead_letter_handler: Arc<Option<Arc<dyn DeadLetterHandler>>>,
842 watchers: WatcherMap,
843 spawn_manager: SpawnManager,
845 watch_manager: WatchManager,
846 cancel_manager: CancelManager,
847 node_directory: NodeDirectory,
848 system_actors: Option<RactorSystemActorRefs>,
850 #[allow(clippy::type_complexity)]
852 stop_receivers: Arc<Mutex<HashMap<ActorId, tokio::sync::oneshot::Receiver<Result<(), String>>>>>,
853 app_version: Option<String>,
855}
856
857pub struct RactorSystemActorRefs {
859 pub spawn_manager: ractor::ActorRef<crate::system_actors::SpawnManagerMsg>,
860 pub watch_manager: ractor::ActorRef<crate::system_actors::WatchManagerMsg>,
861 pub cancel_manager: ractor::ActorRef<crate::system_actors::CancelManagerMsg>,
862 pub node_directory: ractor::ActorRef<crate::system_actors::NodeDirectoryMsg>,
863}
864
865impl RactorRuntime {
866 pub fn new() -> Self {
871 Self::create(NodeId("ractor-node".into()))
872 }
873
874 pub fn with_node_id(node_id: NodeId) -> Self {
876 Self::create(node_id)
877 }
878
879 fn create(node_id: NodeId) -> Self {
880 Self {
881 node_id,
882 next_local: Arc::new(AtomicU64::new(1)),
883 cluster_events: RactorClusterEvents::new(),
884 outbound_interceptors: Arc::new(Vec::new()),
885 drop_observer: None,
886 dead_letter_handler: Arc::new(None),
887 watchers: Arc::new(Mutex::new(HashMap::new())),
888 spawn_manager: SpawnManager::new(TypeRegistry::new()),
889 watch_manager: WatchManager::new(),
890 cancel_manager: CancelManager::new(),
891 node_directory: NodeDirectory::new(),
892 system_actors: None,
893 stop_receivers: Arc::new(Mutex::new(HashMap::new())),
894 app_version: None,
895 }
896 }
897
898 pub const ADAPTER_NAME: &'static str = "ractor";
900
901 pub fn with_app_version(mut self, version: impl Into<String>) -> Self {
907 self.app_version = Some(version.into());
908 self
909 }
910
911 pub fn app_version(&self) -> Option<&str> {
913 self.app_version.as_deref()
914 }
915
916 pub fn handshake_request(&self) -> dactor::HandshakeRequest {
919 dactor::HandshakeRequest::from_runtime(
920 self.node_id.clone(),
921 self.app_version.clone(),
922 Self::ADAPTER_NAME,
923 )
924 }
925
926 pub async fn start_system_actors(&mut self) {
934 use crate::system_actors::*;
935
936 let (spawn_ref, _) = ractor::Actor::spawn(
937 None, SpawnManagerActor,
938 (self.node_id.clone(), TypeRegistry::new(), self.next_local.clone()),
939 ).await.expect("failed to spawn SpawnManagerActor");
940
941 let (watch_ref, _) = ractor::Actor::spawn(
942 None, WatchManagerActor, (),
943 ).await.expect("failed to spawn WatchManagerActor");
944
945 let (cancel_ref, _) = ractor::Actor::spawn(
946 None, CancelManagerActor, (),
947 ).await.expect("failed to spawn CancelManagerActor");
948
949 let (node_dir_ref, _) = ractor::Actor::spawn(
950 None, NodeDirectoryActor, (),
951 ).await.expect("failed to spawn NodeDirectoryActor");
952
953 self.system_actors = Some(RactorSystemActorRefs {
954 spawn_manager: spawn_ref,
955 watch_manager: watch_ref,
956 cancel_manager: cancel_ref,
957 node_directory: node_dir_ref,
958 });
959 }
960
961 pub fn node_id(&self) -> &NodeId {
963 &self.node_id
964 }
965
966 pub fn system_actor_refs(&self) -> Option<&RactorSystemActorRefs> {
970 self.system_actors.as_ref()
971 }
972
973 pub fn add_outbound_interceptor(&mut self, interceptor: Box<dyn OutboundInterceptor>) {
978 Arc::get_mut(&mut self.outbound_interceptors)
979 .expect("cannot add interceptors after actors are spawned")
980 .push(interceptor);
981 }
982
983 pub fn set_drop_observer(&mut self, observer: Arc<dyn DropObserver>) {
986 self.drop_observer = Some(observer);
987 }
988
989 pub fn set_dead_letter_handler(&mut self, handler: Arc<dyn DeadLetterHandler>) {
992 self.dead_letter_handler = Arc::new(Some(handler));
993 }
994
995 pub fn cluster_events_handle(&self) -> &RactorClusterEvents {
997 &self.cluster_events
998 }
999
1000 pub fn cluster_events(&self) -> &RactorClusterEvents {
1002 &self.cluster_events
1003 }
1004
1005 pub async fn spawn<A>(&self, name: &str, args: A::Args) -> Result<RactorActorRef<A>, dactor::errors::RuntimeError>
1007 where
1008 A: Actor<Deps = ()> + 'static,
1009 {
1010 self.spawn_internal::<A>(name, args, (), Vec::new(), MailboxConfig::Unbounded).await
1011 }
1012
1013 pub async fn spawn_with_deps<A>(&self, name: &str, args: A::Args, deps: A::Deps) -> Result<RactorActorRef<A>, dactor::errors::RuntimeError>
1015 where
1016 A: Actor + 'static,
1017 {
1018 self.spawn_internal::<A>(name, args, deps, Vec::new(), MailboxConfig::Unbounded).await
1019 }
1020
1021 pub async fn spawn_with_options<A>(
1023 &self,
1024 name: &str,
1025 args: A::Args,
1026 options: SpawnOptions,
1027 ) -> Result<RactorActorRef<A>, dactor::errors::RuntimeError>
1028 where
1029 A: Actor<Deps = ()> + 'static,
1030 {
1031 self.spawn_internal::<A>(name, args, (), options.interceptors, options.mailbox).await
1032 }
1033
1034 async fn spawn_internal<A>(
1035 &self,
1036 name: &str,
1037 args: A::Args,
1038 deps: A::Deps,
1039 interceptors: Vec<Box<dyn InboundInterceptor>>,
1040 mailbox: MailboxConfig,
1041 ) -> Result<RactorActorRef<A>, dactor::errors::RuntimeError>
1042 where
1043 A: Actor + 'static,
1044 {
1045 let local = self.next_local.fetch_add(1, Ordering::SeqCst);
1046 let actor_id = ActorId {
1047 node: self.node_id.clone(),
1048 local,
1049 };
1050 let actor_name = name.to_string();
1051
1052 let (stop_tx, stop_rx) = tokio::sync::oneshot::channel();
1053
1054 let wrapper = RactorDactorActor::<A> {
1055 _phantom: PhantomData,
1056 };
1057 let spawn_args = RactorSpawnArgs {
1058 args,
1059 deps,
1060 actor_id: actor_id.clone(),
1061 actor_name: actor_name.clone(),
1062 interceptors,
1063 watchers: self.watchers.clone(),
1064 dead_letter_handler: self.dead_letter_handler.clone(),
1065 stop_notifier: Some(stop_tx),
1066 };
1067
1068 let (actor_ref, _join_handle) = ractor::Actor::spawn(Some(name.to_string()), wrapper, spawn_args)
1069 .await
1070 .map_err(|e| dactor::errors::RuntimeError::SpawnFailed(e.to_string()))?;
1071
1072 let bounded_tx = match mailbox {
1074 MailboxConfig::Bounded { capacity, overflow } => {
1075 let (btx, mut brx) = tokio::sync::mpsc::channel::<DactorMsg<A>>(capacity);
1076 let fwd_ref = actor_ref.clone();
1077 tokio::spawn(async move {
1078 while let Some(msg) = brx.recv().await {
1079 if fwd_ref.cast(msg).is_err() {
1080 break;
1081 }
1082 }
1083 });
1084 Some(BoundedMailboxSender::new(btx, overflow))
1085 }
1086 MailboxConfig::Unbounded => None,
1087 };
1088
1089 self.stop_receivers.lock().unwrap().insert(actor_id.clone(), stop_rx);
1091
1092 Ok(RactorActorRef {
1093 id: actor_id,
1094 name: actor_name,
1095 inner: actor_ref,
1096 bounded_tx,
1097 outbound_interceptors: self.outbound_interceptors.clone(),
1098 drop_observer: self.drop_observer.clone(),
1099 dead_letter_handler: self.dead_letter_handler.clone(),
1100 })
1101 }
1102
1103 pub fn watch<W>(&self, watcher: &RactorActorRef<W>, target_id: ActorId)
1108 where
1109 W: Actor + Handler<ChildTerminated> + 'static,
1110 {
1111 let watcher_id = watcher.id();
1112 let watcher_inner = watcher.inner.clone();
1113
1114 let entry = WatchEntry {
1115 watcher_id,
1116 notify: Box::new(move |msg: ChildTerminated| {
1117 let dispatch: Box<dyn Dispatch<W>> = Box::new(TypedDispatch { msg });
1118 if watcher_inner.cast(DactorMsg(dispatch)).is_err() {
1119 tracing::debug!("watch notification dropped — watcher may have stopped");
1120 }
1121 }),
1122 };
1123
1124 let mut watchers = self.watchers.lock().unwrap();
1125 watchers.entry(target_id).or_default().push(entry);
1126 }
1127
1128 pub fn unwatch(&self, watcher_id: &ActorId, target_id: &ActorId) {
1130 let mut watchers = self.watchers.lock().unwrap();
1131 if let Some(entries) = watchers.get_mut(target_id) {
1132 entries.retain(|e| &e.watcher_id != watcher_id);
1133 if entries.is_empty() {
1134 watchers.remove(target_id);
1135 }
1136 }
1137 }
1138
1139 pub fn spawn_manager(&self) -> &SpawnManager {
1145 &self.spawn_manager
1146 }
1147
1148 pub fn spawn_manager_mut(&mut self) -> &mut SpawnManager {
1150 &mut self.spawn_manager
1151 }
1152
1153 pub fn register_factory(
1158 &mut self,
1159 type_name: impl Into<String>,
1160 factory: impl Fn(&[u8]) -> Result<Box<dyn std::any::Any + Send>, dactor::remote::SerializationError>
1161 + Send
1162 + Sync
1163 + 'static,
1164 ) {
1165 let type_name = type_name.into();
1166 let factory = Arc::new(factory);
1167
1168 self.spawn_manager
1170 .type_registry_mut()
1171 .register_factory(type_name.clone(), {
1172 let f = factory.clone();
1173 move |bytes: &[u8]| f(bytes)
1174 });
1175
1176 if let Some(ref actors) = self.system_actors {
1178 let (tx, _rx) = tokio::sync::oneshot::channel();
1179 let f = factory;
1180 let _ = actors.spawn_manager.cast(
1181 crate::system_actors::SpawnManagerMsg::RegisterFactory {
1182 type_name,
1183 factory: Box::new(move |bytes: &[u8]| f(bytes)),
1184 reply: tx,
1185 },
1186 );
1187 }
1188 }
1189
1190 pub fn handle_spawn_request(
1207 &mut self,
1208 request: &SpawnRequest,
1209 ) -> Result<(ActorId, Box<dyn std::any::Any + Send>), SpawnResponse> {
1210 match self.spawn_manager.create_actor(request) {
1211 Ok(actor) => {
1212 let local = self.next_local.fetch_add(1, Ordering::SeqCst);
1213 let actor_id = ActorId {
1214 node: self.node_id.clone(),
1215 local,
1216 };
1217 self.spawn_manager.record_spawn(actor_id.clone());
1218 Ok((actor_id, actor))
1219 }
1220 Err(e) => Err(SpawnResponse::Failure {
1221 request_id: request.request_id.clone(),
1222 error: e.to_string(),
1223 }),
1224 }
1225 }
1226
1227 pub fn watch_manager(&self) -> &WatchManager {
1233 &self.watch_manager
1234 }
1235
1236 pub fn watch_manager_mut(&mut self) -> &mut WatchManager {
1238 &mut self.watch_manager
1239 }
1240
1241 pub fn remote_watch(&mut self, target: ActorId, watcher: ActorId) {
1244 self.watch_manager.watch(target, watcher);
1245 }
1246
1247 pub fn remote_unwatch(&mut self, target: &ActorId, watcher: &ActorId) {
1249 self.watch_manager.unwatch(target, watcher);
1250 }
1251
1252 pub fn notify_terminated(&mut self, terminated: &ActorId) -> Vec<WatchNotification> {
1258 self.watch_manager.on_terminated(terminated)
1259 }
1260
1261 pub fn cancel_manager(&self) -> &CancelManager {
1267 &self.cancel_manager
1268 }
1269
1270 pub fn cancel_manager_mut(&mut self) -> &mut CancelManager {
1272 &mut self.cancel_manager
1273 }
1274
1275 pub fn register_cancel(&mut self, request_id: String, token: CancellationToken) {
1277 self.cancel_manager.register(request_id, token);
1278 }
1279
1280 pub fn cancel_request(&mut self, request_id: &str) -> CancelResponse {
1282 self.cancel_manager.cancel(request_id)
1283 }
1284
1285 pub fn complete_request(&mut self, request_id: &str) {
1290 self.cancel_manager.remove(request_id);
1291 }
1292
1293 pub fn node_directory(&self) -> &NodeDirectory {
1299 &self.node_directory
1300 }
1301
1302 pub fn node_directory_mut(&mut self) -> &mut NodeDirectory {
1304 &mut self.node_directory
1305 }
1306
1307 pub fn connect_peer(&mut self, peer_id: NodeId, address: Option<String>) {
1320 let was_connected = self.node_directory.is_connected(&peer_id);
1321 if let Some(existing) = self.node_directory.get_peer(&peer_id) {
1322 let resolved_address = address.or_else(|| existing.address.clone());
1324 self.node_directory.remove_peer(&peer_id);
1325 self.node_directory.add_peer(peer_id.clone(), resolved_address);
1326 } else {
1327 self.node_directory.add_peer(peer_id.clone(), address);
1328 }
1329 self.node_directory.set_status(&peer_id, PeerStatus::Connected);
1330 if !was_connected {
1331 self.cluster_events.emit(dactor::ClusterEvent::NodeJoined(peer_id));
1332 }
1333 }
1334
1335 pub fn disconnect_peer(&mut self, peer_id: &NodeId) {
1339 let was_connected = self.node_directory.is_connected(peer_id);
1340 self.node_directory.set_status(peer_id, PeerStatus::Disconnected);
1341 if was_connected {
1342 self.cluster_events.emit(dactor::ClusterEvent::NodeLeft(peer_id.clone()));
1343 }
1344 }
1345
1346 pub fn is_peer_connected(&self, peer_id: &NodeId) -> bool {
1348 self.node_directory.is_connected(peer_id)
1349 }
1350
1351 pub async fn await_stop(&self, actor_id: &ActorId) -> Result<(), String> {
1364 let rx = {
1365 let mut receivers = self.stop_receivers.lock().unwrap();
1366 receivers.remove(actor_id)
1367 };
1368 match rx {
1369 Some(rx) => rx
1370 .await
1371 .map_err(|_| "stop notifier dropped".to_string())
1372 .and_then(|r| r),
1373 None => Ok(()),
1374 }
1375 }
1376
1377 pub async fn await_all(&self) -> Result<(), String> {
1382 let receivers: Vec<_> = {
1383 let mut map = self.stop_receivers.lock().unwrap();
1384 map.drain().collect()
1385 };
1386 let mut first_error = None;
1387 for (_, rx) in receivers {
1388 let result = rx.await.map_err(|e| format!("stop notifier dropped: {e}")).and_then(|r| r);
1389 if let Err(e) = result {
1390 if first_error.is_none() {
1391 first_error = Some(e);
1392 }
1393 }
1394 }
1395 match first_error {
1396 Some(e) => Err(e),
1397 None => Ok(()),
1398 }
1399 }
1400
1401 pub fn cleanup_finished(&self) {
1406 let mut receivers = self.stop_receivers.lock().unwrap();
1407 receivers.retain(|_, rx| {
1408 matches!(rx.try_recv(), Err(tokio::sync::oneshot::error::TryRecvError::Empty))
1409 });
1410 }
1411
1412 pub fn active_handle_count(&self) -> usize {
1418 self.stop_receivers.lock().unwrap().len()
1419 }
1420}
1421
1422impl Default for RactorRuntime {
1423 fn default() -> Self {
1424 Self::new()
1425 }
1426}
1427
1428#[async_trait::async_trait]
1439impl dactor::system_router::SystemMessageRouter for RactorRuntime {
1440 async fn route_system_envelope(
1441 &self,
1442 envelope: dactor::remote::WireEnvelope,
1443 ) -> Result<dactor::system_router::RoutingOutcome, dactor::system_router::RoutingError> {
1444 use dactor::system_actors::*;
1445 use dactor::system_router::{RoutingError, RoutingOutcome};
1446
1447 dactor::system_router::validate_system_message_type(&envelope.message_type)?;
1448
1449 let refs = self
1450 .system_actors
1451 .as_ref()
1452 .ok_or_else(|| RoutingError::new("system actors not started"))?;
1453
1454 match envelope.message_type.as_str() {
1455 SYSTEM_MSG_TYPE_SPAWN => {
1456 let request = dactor::proto::decode_spawn_request(&envelope.body)
1457 .map_err(|e| RoutingError::new(format!("decode SpawnRequest: {e}")))?;
1458
1459 let req_id = request.request_id.clone();
1460 let (tx, rx) = tokio::sync::oneshot::channel();
1461 refs.spawn_manager
1462 .cast(crate::system_actors::SpawnManagerMsg::HandleRequest {
1463 request,
1464 reply: tx,
1465 })
1466 .map_err(|e| RoutingError::new(format!("SpawnManager mailbox: {e}")))?;
1467
1468 let result = rx
1469 .await
1470 .map_err(|_| RoutingError::new("SpawnManager reply dropped"))?;
1471
1472 match result {
1473 Ok((actor_id, _actor)) => Ok(RoutingOutcome::SpawnCompleted {
1474 request_id: req_id,
1475 actor_id,
1476 }),
1477 Err(SpawnResponse::Failure { request_id, error }) => {
1478 Ok(RoutingOutcome::SpawnFailed { request_id, error })
1479 }
1480 Err(SpawnResponse::Success { .. }) => {
1481 unreachable!("SpawnResult::Err always wraps SpawnResponse::Failure")
1482 }
1483 }
1484 }
1485
1486 SYSTEM_MSG_TYPE_WATCH => {
1487 let request = dactor::proto::decode_watch_request(&envelope.body)
1488 .map_err(|e| RoutingError::new(format!("decode WatchRequest: {e}")))?;
1489
1490 refs.watch_manager
1491 .cast(crate::system_actors::WatchManagerMsg::Watch {
1492 target: request.target,
1493 watcher: request.watcher,
1494 })
1495 .map_err(|e| RoutingError::new(format!("WatchManager mailbox: {e}")))?;
1496
1497 Ok(RoutingOutcome::Acknowledged)
1498 }
1499
1500 SYSTEM_MSG_TYPE_UNWATCH => {
1501 let request = dactor::proto::decode_unwatch_request(&envelope.body)
1502 .map_err(|e| RoutingError::new(format!("decode UnwatchRequest: {e}")))?;
1503
1504 refs.watch_manager
1505 .cast(crate::system_actors::WatchManagerMsg::Unwatch {
1506 target: request.target,
1507 watcher: request.watcher,
1508 })
1509 .map_err(|e| RoutingError::new(format!("WatchManager mailbox: {e}")))?;
1510
1511 Ok(RoutingOutcome::Acknowledged)
1512 }
1513
1514 SYSTEM_MSG_TYPE_CANCEL => {
1515 let request = dactor::proto::decode_cancel_request(&envelope.body)
1516 .map_err(|e| RoutingError::new(format!("decode CancelRequest: {e}")))?;
1517
1518 let request_id = request
1519 .request_id
1520 .ok_or_else(|| RoutingError::new("CancelRequest missing request_id"))?;
1521
1522 let (tx, rx) = tokio::sync::oneshot::channel();
1523 refs.cancel_manager
1524 .cast(crate::system_actors::CancelManagerMsg::Cancel {
1525 request_id,
1526 reply: tx,
1527 })
1528 .map_err(|e| RoutingError::new(format!("CancelManager mailbox: {e}")))?;
1529
1530 let response = rx
1531 .await
1532 .map_err(|_| RoutingError::new("CancelManager reply dropped"))?;
1533
1534 match response {
1535 CancelResponse::Acknowledged => Ok(RoutingOutcome::CancelAcknowledged),
1536 CancelResponse::NotFound { reason } => {
1537 Ok(RoutingOutcome::CancelNotFound { reason })
1538 }
1539 }
1540 }
1541
1542 SYSTEM_MSG_TYPE_CONNECT_PEER => {
1543 let (peer_id, address) = dactor::proto::decode_connect_peer(&envelope.body)
1544 .map_err(|e| RoutingError::new(format!("decode ConnectPeer: {e}")))?;
1545
1546 refs.node_directory
1547 .cast(crate::system_actors::NodeDirectoryMsg::ConnectPeer {
1548 peer_id,
1549 address,
1550 })
1551 .map_err(|e| RoutingError::new(format!("NodeDirectory mailbox: {e}")))?;
1552
1553 Ok(RoutingOutcome::Acknowledged)
1554 }
1555
1556 SYSTEM_MSG_TYPE_DISCONNECT_PEER => {
1557 let peer_id = dactor::proto::decode_disconnect_peer(&envelope.body)
1558 .map_err(|e| RoutingError::new(format!("decode DisconnectPeer: {e}")))?;
1559
1560 refs.node_directory
1561 .cast(crate::system_actors::NodeDirectoryMsg::DisconnectPeer { peer_id })
1562 .map_err(|e| RoutingError::new(format!("NodeDirectory mailbox: {e}")))?;
1563
1564 Ok(RoutingOutcome::Acknowledged)
1565 }
1566
1567 other => Err(RoutingError::new(format!(
1571 "unhandled system message type: {other}"
1572 ))),
1573 }
1574 }
1575}
1576