1use crate::{
4 ActorPath, Error,
5 handler::HandleHelper,
6 runner::{InnerAction, InnerSender, StopHandle, StopSender},
7 supervision::SupervisionStrategy,
8 system::SystemRef,
9};
10
11use tokio::sync::{broadcast::Receiver as EventReceiver, mpsc, oneshot};
12
13use async_trait::async_trait;
14
15use serde::{Serialize, de::DeserializeOwned};
16use tracing::Span;
17
18use std::{collections::HashMap, fmt::Debug, time::Duration};
19
20pub struct ActorContext<A: Actor + Handler<A>> {
26 stop: StopSender,
27 path: ActorPath,
29 system: SystemRef,
31 error: Option<Error>,
33 error_sender: ChildErrorSender,
35 inner_sender: InnerSender<A>,
37 child_senders: HashMap<ActorPath, StopHandle>,
39
40 span: tracing::Span,
41}
42
43impl<A> ActorContext<A>
44where
45 A: Actor + Handler<A>,
46{
47 pub(crate) fn new(
48 stop: StopSender,
49 path: ActorPath,
50 system: SystemRef,
51 error_sender: ChildErrorSender,
52 inner_sender: InnerSender<A>,
53 span: Span,
54 ) -> Self {
55 Self {
56 span,
57 stop,
58 path,
59 system,
60 error: None,
61 error_sender,
62 inner_sender,
63 child_senders: HashMap::new(),
64 }
65 }
66
67 pub(crate) async fn restart(
68 &mut self,
69 actor: &mut A,
70 error: Option<&Error>,
71 ) -> Result<(), Error>
72 where
73 A: Actor,
74 {
75 tracing::warn!(error = ?error, "Actor restarting");
76 let result = actor.pre_restart(self, error).await;
77 if let Err(ref e) = result {
78 tracing::error!(error = %e, "Actor restart failed");
79 }
80 result
81 }
82 pub async fn reference(&self) -> Result<ActorRef<A>, Error> {
84 self.system.get_actor(&self.path).await
85 }
86
87 pub const fn path(&self) -> &ActorPath {
89 &self.path
90 }
91
92 pub const fn system(&self) -> &SystemRef {
94 &self.system
95 }
96
97 pub async fn get_parent<P: Actor + Handler<P>>(
99 &self,
100 ) -> Result<ActorRef<P>, Error> {
101 self.system.get_actor(&self.path.parent()).await
102 }
103
104 pub(crate) async fn stop_childs(&mut self) {
105 let child_count = self.child_senders.len();
106 if child_count > 0 {
107 tracing::debug!(child_count, "Stopping child actors");
108 }
109
110 let mut receivers = Vec::with_capacity(child_count);
112 for (path, handle) in std::mem::take(&mut self.child_senders) {
113 let (stop_sender, stop_receiver) = oneshot::channel();
114 if handle.sender().send(Some(stop_sender)).await.is_ok() {
115 receivers.push((path, handle.timeout(), stop_receiver));
116 }
117 }
118
119 for (path, timeout, receiver) in receivers {
122 if let Some(timeout) = timeout {
123 if tokio::time::timeout(timeout, receiver).await.is_err() {
124 tracing::warn!(
125 child = %path,
126 timeout_ms = timeout.as_millis(),
127 "Timed out waiting for child actor shutdown acknowledgement"
128 );
129 }
130 } else {
131 let _ = receiver.await;
132 }
133 }
134 }
135
136 pub(crate) async fn remove_actor(&self) {
137 self.system.remove_actor(&self.path).await;
138 }
139
140 pub async fn stop(&self, sender: Option<oneshot::Sender<()>>) {
142 let _ = self.stop.send(sender).await;
143 }
144
145 pub async fn publish_event(&self, event: A::Event) -> Result<(), Error> {
149 self.inner_sender
150 .send(InnerAction::Event(event))
151 .await
152 .map_err(|e| {
153 tracing::error!(error = %e, "Failed to publish event");
154 Error::SendEvent {
155 reason: e.to_string(),
156 }
157 })
158 }
159
160 pub async fn emit_error(&mut self, error: Error) -> Result<(), Error> {
164 tracing::warn!(error = %error, "Emitting error");
165 self.inner_sender
166 .send(InnerAction::Error(error))
167 .await
168 .map_err(|e| {
169 tracing::error!(error = %e, "Failed to emit error");
170 Error::Send {
171 reason: e.to_string(),
172 }
173 })
174 }
175
176 pub async fn emit_fail(&mut self, error: Error) -> Result<(), Error> {
180 tracing::error!(error = %error, "Actor failing");
181 self.set_error(error.clone());
183 self.inner_sender
185 .send(InnerAction::Fail(error.clone()))
186 .await
187 .map_err(|e| {
188 tracing::error!(error = %e, "Failed to emit fail");
189 Error::Send {
190 reason: e.to_string(),
191 }
192 })
193 }
194
195 pub async fn create_child<C, I>(
201 &mut self,
202 name: &str,
203 actor_init: I,
204 ) -> Result<ActorRef<C>, Error>
205 where
206 C: Actor + Handler<C>,
207 I: crate::IntoActor<C>,
208 {
209 tracing::debug!(child_name = %name, "Creating child actor");
210 let actor = actor_init.into_actor();
211 let path = self.path.clone() / name;
212 let result = self
213 .system
214 .create_actor_path(
215 path.clone(),
216 actor,
217 Some(self.error_sender.clone()),
218 C::get_span(name, Some(self.span.clone())),
219 )
220 .await;
221
222 match result {
223 Ok((actor_ref, stop_sender)) => {
224 let child_path = path.clone();
225 self.child_senders.insert(
226 path,
227 StopHandle::new(stop_sender.clone(), C::stop_timeout()),
228 );
229 let inner_sender = self.inner_sender.clone();
230 tokio::spawn(async move {
231 stop_sender.closed().await;
232 let _ = inner_sender
233 .send(InnerAction::ChildStopped(child_path))
234 .await;
235 });
236 tracing::debug!(child_name = %name, "Child actor created");
237 Ok(actor_ref)
238 }
239 Err(e) => {
240 tracing::debug!(child_name = %name, error = %e, "Failed to create child actor");
241 Err(e)
242 }
243 }
244 }
245
246 pub(crate) fn remove_closed_child(&mut self, child_path: &ActorPath) {
247 let should_remove = self
248 .child_senders
249 .get(child_path)
250 .map(StopHandle::is_closed)
251 .unwrap_or(false);
252 if should_remove {
253 self.child_senders.remove(child_path);
254 }
255 }
256
257 pub async fn get_child<C>(&self, name: &str) -> Result<ActorRef<C>, Error>
262 where
263 C: Actor + Handler<C>,
264 {
265 let path = self.path.clone() / name;
266 self.system.get_actor(&path).await
267 }
268
269 pub(crate) fn error(&self) -> Option<Error> {
270 self.error.clone()
271 }
272
273 pub(crate) fn set_error(&mut self, error: Error) {
274 self.error = Some(error);
275 }
276
277 pub(crate) fn clean_error(&mut self) {
278 self.error = None;
279 }
280}
281
282#[derive(Debug, Clone, PartialEq, Eq)]
284pub enum ActorLifecycle {
285 Created,
287 Started,
289 Restarted,
291 Failed,
293 Stopped,
295 Terminated,
297}
298
299#[derive(Debug, Clone)]
301pub enum ChildAction {
302 Stop,
304 Restart,
306 Delegate,
308}
309
310pub type ChildErrorReceiver = mpsc::Receiver<ChildError>;
312
313pub type ChildErrorSender = mpsc::Sender<ChildError>;
315
316pub enum ChildError {
318 Error {
320 error: Error,
322 },
323 Fault {
325 error: Error,
327 sender: oneshot::Sender<ChildAction>,
329 },
330}
331
332#[async_trait]
338pub trait Actor: Send + Sync + Sized + 'static + Handler<Self> {
339 type Message: Message;
341
342 type Event: Event;
344
345 type Response: Response;
347
348 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span;
353
354 fn mailbox_drain_timeout() -> std::time::Duration {
357 std::time::Duration::from_secs(5)
358 }
359
360 fn event_drain_timeout() -> std::time::Duration {
363 std::time::Duration::from_secs(5)
364 }
365
366 fn startup_timeout() -> Option<Duration> {
368 None
369 }
370
371 fn stop_timeout() -> Option<Duration> {
373 None
374 }
375
376 fn supervision_strategy() -> SupervisionStrategy {
378 SupervisionStrategy::Stop
379 }
380
381 async fn pre_start(
387 &mut self,
388 _context: &mut ActorContext<Self>,
389 ) -> Result<(), Error> {
390 Ok(())
391 }
392
393 async fn pre_restart(
399 &mut self,
400 ctx: &mut ActorContext<Self>,
401 _error: Option<&Error>,
402 ) -> Result<(), Error> {
403 self.pre_start(ctx).await
404 }
405
406 async fn pre_stop(
411 &mut self,
412 _ctx: &mut ActorContext<Self>,
413 ) -> Result<(), Error> {
414 Ok(())
415 }
416
417 async fn post_stop(
419 &mut self,
420 _ctx: &mut ActorContext<Self>,
421 ) -> Result<(), Error> {
422 Ok(())
423 }
424
425 fn from_response(_response: Self::Response) -> Result<Self::Event, Error> {
427 Err(Error::Functional {
428 description: "Not implemented".to_string(),
429 })
430 }
431}
432
433pub trait Event:
435 Serialize + DeserializeOwned + Debug + Clone + Send + Sync + 'static
436{
437}
438
439pub trait Message: Clone + Send + Sync + 'static {
441 fn is_critical(&self) -> bool {
443 false
444 }
445}
446
447pub trait Response: Send + Sync + 'static {}
449
450impl Response for () {}
451impl Event for () {}
452impl Message for () {}
453
454#[async_trait]
459pub trait Handler<A: Actor + Handler<A>>: Send + Sync {
460 async fn handle_message(
466 &mut self,
467 sender: ActorPath,
468 msg: A::Message,
469 ctx: &mut ActorContext<A>,
470 ) -> Result<A::Response, Error>;
471
472 async fn on_event(&mut self, _event: A::Event, _ctx: &mut ActorContext<A>) {
474 }
476
477 async fn on_child_error(
482 &mut self,
483 error: Error,
484 _ctx: &mut ActorContext<A>,
485 ) {
486 tracing::error!(error = %error, "Child actor error");
487 }
488
489 async fn on_child_fault(
495 &mut self,
496 error: Error,
497 _ctx: &mut ActorContext<A>,
498 ) -> ChildAction {
499 tracing::error!(error = %error, "Child actor fault, stopping child");
500 ChildAction::Stop
502 }
503}
504
505pub struct ActorRef<A>
512where
513 A: Actor + Handler<A>,
514{
515 path: ActorPath,
517 sender: HandleHelper<A>,
519 event_receiver: EventReceiver<<A as Actor>::Event>,
521 stop_sender: StopSender,
523}
524
525impl<A> ActorRef<A>
526where
527 A: Actor + Handler<A>,
528{
529 pub const fn new(
530 path: ActorPath,
531 sender: HandleHelper<A>,
532 stop_sender: StopSender,
533 event_receiver: EventReceiver<<A as Actor>::Event>,
534 ) -> Self {
535 Self {
536 path,
537 sender,
538 stop_sender,
539 event_receiver,
540 }
541 }
542
543 pub async fn tell(&self, message: A::Message) -> Result<(), Error> {
545 self.sender.tell(self.path(), message).await
546 }
547
548 pub async fn ask(&self, message: A::Message) -> Result<A::Response, Error> {
553 self.sender.ask(self.path(), message).await
554 }
555
556 pub async fn ask_timeout(
558 &self,
559 message: A::Message,
560 timeout: std::time::Duration,
561 ) -> Result<A::Response, Error> {
562 tokio::time::timeout(timeout, self.sender.ask(self.path(), message))
563 .await
564 .map_err(|_| Error::Timeout {
565 ms: timeout.as_millis(),
566 })?
567 }
568
569 pub async fn ask_stop(&self) -> Result<(), Error> {
575 tracing::debug!("Stopping actor");
576 let (response_sender, response_receiver) = oneshot::channel();
577
578 if self.stop_sender.send(Some(response_sender)).await.is_err() {
579 Ok(())
580 } else {
581 response_receiver.await.map_err(|error| {
582 tracing::error!(error = %error, "Failed to confirm actor stop");
583 Error::Send {
584 reason: error.to_string(),
585 }
586 })
587 }
588 }
589
590 pub async fn tell_stop(&self) {
592 let _ = self.stop_sender.send(None).await;
593 }
594
595 pub fn path(&self) -> ActorPath {
597 self.path.clone()
598 }
599
600 pub fn is_closed(&self) -> bool {
602 self.sender.is_closed()
603 }
604
605 pub async fn closed(&self) {
607 self.sender.close().await;
608 }
609
610 pub fn subscribe(&self) -> EventReceiver<<A as Actor>::Event> {
615 self.event_receiver.resubscribe()
616 }
617}
618
619impl<A> Clone for ActorRef<A>
620where
621 A: Actor + Handler<A>,
622{
623 fn clone(&self) -> Self {
624 Self {
625 path: self.path.clone(),
626 sender: self.sender.clone(),
627 stop_sender: self.stop_sender.clone(),
628 event_receiver: self.event_receiver.resubscribe(),
629 }
630 }
631}
632
633#[cfg(test)]
634mod test {
635
636 use super::*;
637 use test_log::test;
638
639 use crate::sink::{Sink, Subscriber};
640
641 use serde::{Deserialize, Serialize};
642 use tokio::sync::mpsc;
643 use tokio_util::sync::CancellationToken;
644 use tracing::info_span;
645
646 #[derive(Debug, Clone)]
647 struct TestActor {
648 counter: usize,
649 }
650
651 impl crate::NotPersistentActor for TestActor {}
652
653 #[derive(Debug, Clone, Serialize, Deserialize)]
654 struct TestMessage(usize);
655
656 impl Message for TestMessage {}
657
658 #[derive(Debug, Clone, Serialize, Deserialize)]
659 struct TestResponse(usize);
660
661 impl Response for TestResponse {}
662
663 #[derive(Debug, Clone, Serialize, Deserialize)]
664 struct TestEvent(usize);
665
666 impl Event for TestEvent {}
667
668 #[async_trait]
669 impl Actor for TestActor {
670 type Message = TestMessage;
671 type Event = TestEvent;
672 type Response = TestResponse;
673
674 fn get_span(
675 id: &str,
676 _parent_span: Option<tracing::Span>,
677 ) -> tracing::Span {
678 info_span!("TestActor", id = %id)
679 }
680 }
681
682 #[async_trait]
683 impl Handler<TestActor> for TestActor {
684 async fn handle_message(
685 &mut self,
686 _sender: ActorPath,
687 msg: TestMessage,
688 ctx: &mut ActorContext<TestActor>,
689 ) -> Result<TestResponse, Error> {
690 if ctx.get_parent::<TestActor>().await.is_ok() {
691 panic!("Is not a root actor");
692 }
693
694 let value = msg.0;
695 self.counter += value;
696 ctx.publish_event(TestEvent(self.counter)).await.unwrap();
697 Ok(TestResponse(self.counter))
698 }
699 }
700
701 pub struct TestSubscriber;
702
703 #[async_trait]
704 impl Subscriber<TestEvent> for TestSubscriber {
705 async fn notify(&self, event: TestEvent) {
706 assert!(event.0 > 0);
707 }
708 }
709
710 #[test(tokio::test)]
711 async fn test_actor() {
712 let (event_sender, _event_receiver) = mpsc::channel(100);
713 let system = SystemRef::new(
714 event_sender,
715 CancellationToken::new(),
716 CancellationToken::new(),
717 );
718 let actor = TestActor { counter: 0 };
719 let actor_ref = system.create_root_actor("test", actor).await.unwrap();
720
721 let sink = Sink::new(actor_ref.subscribe(), TestSubscriber);
722 system.run_sink(sink).await;
723
724 actor_ref.tell(TestMessage(10)).await.unwrap();
725 let mut recv = actor_ref.subscribe();
726 let response = actor_ref.ask(TestMessage(10)).await.unwrap();
727 assert_eq!(response.0, 20);
728 let event = recv.recv().await.unwrap();
729 assert_eq!(event.0, 10);
730 let event = recv.recv().await.unwrap();
731 assert_eq!(event.0, 20);
732 actor_ref.ask_stop().await.unwrap();
733 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
734 }
735}