1use crate::{
4 ActorPath, Error,
5 handler::HandleHelper,
6 runner::{InnerAction, InnerSender, StopHandle, StopSender},
7 supervision::SupervisionStrategy,
8 system::SystemRef,
9};
10
11use tokio::sync::{broadcast::Receiver as EventReceiver, mpsc, oneshot};
12
13use async_trait::async_trait;
14
15use serde::{Serialize, de::DeserializeOwned};
16use tracing::Span;
17
18use std::{collections::HashMap, fmt::Debug, time::Duration};
19
20pub struct ActorContext<A: Actor + Handler<A>> {
26 stop: StopSender,
27 path: ActorPath,
29 system: SystemRef,
31 error: Option<Error>,
33 error_sender: ChildErrorSender,
35 inner_sender: InnerSender<A>,
37 child_senders: HashMap<ActorPath, StopHandle>,
39
40 span: tracing::Span,
41}
42
43impl<A> ActorContext<A>
44where
45 A: Actor + Handler<A>,
46{
47 pub(crate) fn new(
48 stop: StopSender,
49 path: ActorPath,
50 system: SystemRef,
51 error_sender: ChildErrorSender,
52 inner_sender: InnerSender<A>,
53 span: Span,
54 ) -> Self {
55 Self {
56 span,
57 stop,
58 path,
59 system,
60 error: None,
61 error_sender,
62 inner_sender,
63 child_senders: HashMap::new(),
64 }
65 }
66
67 pub(crate) async fn restart(
68 &mut self,
69 actor: &mut A,
70 error: Option<&Error>,
71 ) -> Result<(), Error>
72 where
73 A: Actor,
74 {
75 tracing::warn!(error = ?error, "Actor restarting");
76 let result = actor.pre_restart(self, error).await;
77 if let Err(ref e) = result {
78 tracing::error!(error = %e, "Actor restart failed");
79 }
80 result
81 }
82 pub async fn reference(&self) -> Result<ActorRef<A>, Error> {
84 self.system.get_actor(&self.path).await
85 }
86
87 pub const fn path(&self) -> &ActorPath {
89 &self.path
90 }
91
92 pub const fn system(&self) -> &SystemRef {
94 &self.system
95 }
96
97 pub async fn get_parent<P: Actor + Handler<P>>(
99 &self,
100 ) -> Result<ActorRef<P>, Error> {
101 self.system.get_actor(&self.path.parent()).await
102 }
103
104 pub(crate) async fn stop_childs(&mut self) {
105 let child_count = self.child_senders.len();
106 if child_count > 0 {
107 tracing::debug!(child_count, "Stopping child actors");
108 }
109
110 let mut receivers = Vec::with_capacity(child_count);
112 for (path, handle) in std::mem::take(&mut self.child_senders) {
113 let (stop_sender, stop_receiver) = oneshot::channel();
114 if handle.sender().send(Some(stop_sender)).await.is_ok() {
115 receivers.push((path, handle.timeout(), stop_receiver));
116 }
117 }
118
119 for (path, timeout, receiver) in receivers {
122 if let Some(timeout) = timeout {
123 if tokio::time::timeout(timeout, receiver).await.is_err() {
124 tracing::warn!(
125 child = %path,
126 timeout_ms = timeout.as_millis(),
127 "Timed out waiting for child actor shutdown acknowledgement"
128 );
129 }
130 } else {
131 let _ = receiver.await;
132 }
133 }
134 }
135
136 pub(crate) async fn remove_actor(&self) {
137 self.system.remove_actor(&self.path).await;
138 }
139
140 pub async fn stop(&self, sender: Option<oneshot::Sender<()>>) {
142 let _ = self.stop.send(sender).await;
143 }
144
145 pub async fn publish_event(&self, event: A::Event) -> Result<(), Error> {
149 self.inner_sender
150 .send(InnerAction::Event(event))
151 .await
152 .map_err(|e| {
153 tracing::error!(error = %e, "Failed to publish event");
154 Error::SendEvent {
155 reason: e.to_string(),
156 }
157 })
158 }
159
160 pub async fn emit_error(&mut self, error: Error) -> Result<(), Error> {
164 tracing::warn!(error = %error, "Emitting error");
165 self.inner_sender
166 .send(InnerAction::Error(error))
167 .await
168 .map_err(|e| {
169 tracing::error!(error = %e, "Failed to emit error");
170 Error::Send {
171 reason: e.to_string(),
172 }
173 })
174 }
175
176 pub async fn emit_fail(&mut self, error: Error) -> Result<(), Error> {
180 tracing::error!(error = %error, "Actor failing");
181 self.set_error(error.clone());
183 self.inner_sender
185 .send(InnerAction::Fail(error.clone()))
186 .await
187 .map_err(|e| {
188 tracing::error!(error = %e, "Failed to emit fail");
189 Error::Send {
190 reason: e.to_string(),
191 }
192 })
193 }
194
195 pub async fn create_child<C, I>(
201 &mut self,
202 name: &str,
203 actor_init: I,
204 ) -> Result<ActorRef<C>, Error>
205 where
206 C: Actor + Handler<C>,
207 I: crate::IntoActor<C>,
208 {
209 tracing::debug!(child_name = %name, "Creating child actor");
210 let actor = actor_init.into_actor();
211 let path = self.path.clone() / name;
212 let result = self
213 .system
214 .create_actor_path(
215 path.clone(),
216 actor,
217 Some(self.error_sender.clone()),
218 C::get_span(name, Some(self.span.clone())),
219 )
220 .await;
221
222 match result {
223 Ok((actor_ref, stop_sender)) => {
224 let child_path = path.clone();
225 self.child_senders.insert(
226 path,
227 StopHandle::new(stop_sender.clone(), C::stop_timeout()),
228 );
229 let inner_sender = self.inner_sender.clone();
230 tokio::spawn(async move {
231 stop_sender.closed().await;
232 let _ = inner_sender
233 .send(InnerAction::ChildStopped(child_path))
234 .await;
235 });
236 tracing::debug!(child_name = %name, "Child actor created");
237 Ok(actor_ref)
238 }
239 Err(e) => {
240 tracing::debug!(child_name = %name, error = %e, "Failed to create child actor");
241 Err(e)
242 }
243 }
244 }
245
246 pub(crate) fn remove_closed_child(&mut self, child_path: &ActorPath) {
247 let should_remove = self
248 .child_senders
249 .get(child_path)
250 .map(StopHandle::is_closed)
251 .unwrap_or(false);
252 if should_remove {
253 self.child_senders.remove(child_path);
254 }
255 }
256
257 pub async fn get_child<C>(&self, name: &str) -> Result<ActorRef<C>, Error>
262 where
263 C: Actor + Handler<C>,
264 {
265 let path = self.path.clone() / name;
266 self.system.get_actor(&path).await
267 }
268
269 pub(crate) fn error(&self) -> Option<Error> {
270 self.error.clone()
271 }
272
273 pub(crate) fn set_error(&mut self, error: Error) {
274 self.error = Some(error);
275 }
276
277 pub(crate) fn clean_error(&mut self) {
278 self.error = None;
279 }
280}
281
/// Lifecycle states of an actor.
///
/// NOTE(review): the transitions between these states are driven outside this
/// file; the per-variant notes below are inferred from the names only.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ActorLifecycle {
    /// The actor has been created.
    Created,
    /// The actor has been started.
    Started,
    /// The actor has been restarted.
    Restarted,
    /// The actor has failed.
    Failed,
    /// The actor has been stopped.
    Stopped,
    /// The actor has been terminated (difference from `Stopped` — TODO confirm).
    Terminated,
}
298
/// Decision a parent returns from `Handler::on_child_fault`.
///
/// Derives `PartialEq`/`Eq` (like [`ActorLifecycle`]) so decisions can be
/// compared and asserted on directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChildAction {
    /// Stop the faulted child (the default answer of `on_child_fault`).
    Stop,
    /// Presumably restart the faulted child — confirm against the runner.
    Restart,
    /// Presumably hand the decision on to another party (e.g. the parent's
    /// own supervisor) — confirm against the runner.
    Delegate,
}
309
310pub type ChildErrorReceiver = mpsc::Receiver<ChildError>;
312
313pub type ChildErrorSender = mpsc::Sender<ChildError>;
315
316pub enum ChildError {
318 Error {
320 error: Error,
322 },
323 Fault {
325 error: Error,
327 sender: oneshot::Sender<ChildAction>,
329 },
330}
331
332#[async_trait]
338pub trait Actor: Send + Sync + Sized + 'static + Handler<Self> {
339 type Message: Message;
341
342 type Event: Event;
344
345 type Response: Response;
347
348 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span;
353
354 fn drain_timeout() -> std::time::Duration {
356 std::time::Duration::from_secs(5)
357 }
358
359 fn startup_timeout() -> Option<Duration> {
361 None
362 }
363
364 fn stop_timeout() -> Option<Duration> {
366 None
367 }
368
369 fn supervision_strategy() -> SupervisionStrategy {
371 SupervisionStrategy::Stop
372 }
373
374 async fn pre_start(
380 &mut self,
381 _context: &mut ActorContext<Self>,
382 ) -> Result<(), Error> {
383 Ok(())
384 }
385
386 async fn pre_restart(
392 &mut self,
393 ctx: &mut ActorContext<Self>,
394 _error: Option<&Error>,
395 ) -> Result<(), Error> {
396 self.pre_start(ctx).await
397 }
398
399 async fn pre_stop(
404 &mut self,
405 _ctx: &mut ActorContext<Self>,
406 ) -> Result<(), Error> {
407 Ok(())
408 }
409
410 async fn post_stop(
412 &mut self,
413 _ctx: &mut ActorContext<Self>,
414 ) -> Result<(), Error> {
415 Ok(())
416 }
417
418 fn from_response(_response: Self::Response) -> Result<Self::Event, Error> {
420 Err(Error::Functional {
421 description: "Not implemented".to_string(),
422 })
423 }
424}
425
426pub trait Event:
428 Serialize + DeserializeOwned + Debug + Clone + Send + Sync + 'static
429{
430}
431
/// Marker trait for the messages an actor can receive.
///
/// A message must be cloneable and safe to move and share across threads.
pub trait Message: Clone + Send + Sync + 'static {
    /// Reports whether this message is critical; `false` unless overridden.
    ///
    /// NOTE(review): how the runtime treats critical messages is not visible
    /// in this file.
    fn is_critical(&self) -> bool {
        false
    }
}
439
/// Marker trait for the values an actor returns from handling a message.
///
/// A response must be safe to move and share across threads.
pub trait Response: Send + Sync + 'static {}
442
443impl Response for () {}
444impl Event for () {}
445impl Message for () {}
446
447#[async_trait]
452pub trait Handler<A: Actor + Handler<A>>: Send + Sync {
453 async fn handle_message(
459 &mut self,
460 sender: ActorPath,
461 msg: A::Message,
462 ctx: &mut ActorContext<A>,
463 ) -> Result<A::Response, Error>;
464
465 async fn on_event(&mut self, _event: A::Event, _ctx: &mut ActorContext<A>) {
467 }
469
470 async fn on_child_error(
475 &mut self,
476 error: Error,
477 _ctx: &mut ActorContext<A>,
478 ) {
479 tracing::error!(error = %error, "Child actor error");
480 }
481
482 async fn on_child_fault(
488 &mut self,
489 error: Error,
490 _ctx: &mut ActorContext<A>,
491 ) -> ChildAction {
492 tracing::error!(error = %error, "Child actor fault, stopping child");
493 ChildAction::Stop
495 }
496}
497
498pub struct ActorRef<A>
505where
506 A: Actor + Handler<A>,
507{
508 path: ActorPath,
510 sender: HandleHelper<A>,
512 event_receiver: EventReceiver<<A as Actor>::Event>,
514 stop_sender: StopSender,
516}
517
518impl<A> ActorRef<A>
519where
520 A: Actor + Handler<A>,
521{
522 pub const fn new(
523 path: ActorPath,
524 sender: HandleHelper<A>,
525 stop_sender: StopSender,
526 event_receiver: EventReceiver<<A as Actor>::Event>,
527 ) -> Self {
528 Self {
529 path,
530 sender,
531 stop_sender,
532 event_receiver,
533 }
534 }
535
536 pub async fn tell(&self, message: A::Message) -> Result<(), Error> {
538 self.sender.tell(self.path(), message).await
539 }
540
541 pub async fn ask(&self, message: A::Message) -> Result<A::Response, Error> {
546 self.sender.ask(self.path(), message).await
547 }
548
549 pub async fn ask_timeout(
551 &self,
552 message: A::Message,
553 timeout: std::time::Duration,
554 ) -> Result<A::Response, Error> {
555 tokio::time::timeout(timeout, self.sender.ask(self.path(), message))
556 .await
557 .map_err(|_| Error::Timeout {
558 ms: timeout.as_millis(),
559 })?
560 }
561
562 pub async fn ask_stop(&self) -> Result<(), Error> {
568 tracing::debug!("Stopping actor");
569 let (response_sender, response_receiver) = oneshot::channel();
570
571 if self.stop_sender.send(Some(response_sender)).await.is_err() {
572 Ok(())
573 } else {
574 response_receiver.await.map_err(|error| {
575 tracing::error!(error = %error, "Failed to confirm actor stop");
576 Error::Send {
577 reason: error.to_string(),
578 }
579 })
580 }
581 }
582
583 pub async fn tell_stop(&self) {
585 let _ = self.stop_sender.send(None).await;
586 }
587
588 pub fn path(&self) -> ActorPath {
590 self.path.clone()
591 }
592
593 pub fn is_closed(&self) -> bool {
595 self.sender.is_closed()
596 }
597
598 pub async fn closed(&self) {
600 self.sender.close().await;
601 }
602
603 pub fn subscribe(&self) -> EventReceiver<<A as Actor>::Event> {
608 self.event_receiver.resubscribe()
609 }
610}
611
612impl<A> Clone for ActorRef<A>
613where
614 A: Actor + Handler<A>,
615{
616 fn clone(&self) -> Self {
617 Self {
618 path: self.path.clone(),
619 sender: self.sender.clone(),
620 stop_sender: self.stop_sender.clone(),
621 event_receiver: self.event_receiver.resubscribe(),
622 }
623 }
624}
625
#[cfg(test)]
mod test {

    use super::*;
    use test_log::test;

    use crate::sink::{Sink, Subscriber};

    use serde::{Deserialize, Serialize};
    use tokio::sync::mpsc;
    use tokio_util::sync::CancellationToken;
    use tracing::info_span;

    /// Actor that accumulates the values it receives.
    #[derive(Debug, Clone)]
    struct TestActor {
        counter: usize,
    }

    impl crate::NotPersistentActor for TestActor {}

    /// Amount to add to the actor's counter.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TestMessage(usize);

    impl Message for TestMessage {}

    /// Counter value after the message was applied.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TestResponse(usize);

    impl Response for TestResponse {}

    /// Counter value published after each handled message.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TestEvent(usize);

    impl Event for TestEvent {}

    #[async_trait]
    impl Actor for TestActor {
        type Message = TestMessage;
        type Event = TestEvent;
        type Response = TestResponse;

        fn get_span(
            id: &str,
            _parent_span: Option<tracing::Span>,
        ) -> tracing::Span {
            info_span!("TestActor", id = %id)
        }
    }

    #[async_trait]
    impl Handler<TestActor> for TestActor {
        async fn handle_message(
            &mut self,
            _sender: ActorPath,
            msg: TestMessage,
            ctx: &mut ActorContext<TestActor>,
        ) -> Result<TestResponse, Error> {
            // The actor under test is a root actor, so the parent lookup must fail.
            assert!(
                ctx.get_parent::<TestActor>().await.is_err(),
                "Is not a root actor"
            );

            let value = msg.0;
            self.counter += value;
            ctx.publish_event(TestEvent(self.counter)).await.unwrap();
            Ok(TestResponse(self.counter))
        }
    }

    /// Subscriber that checks every event it is notified of.
    pub struct TestSubscriber;

    #[async_trait]
    impl Subscriber<TestEvent> for TestSubscriber {
        async fn notify(&self, event: TestEvent) {
            assert!(event.0 > 0);
        }
    }

    #[test(tokio::test)]
    async fn test_actor() {
        let (event_sender, _event_receiver) = mpsc::channel(100);
        let system = SystemRef::new(
            event_sender,
            CancellationToken::new(),
            CancellationToken::new(),
        );
        let actor = TestActor { counter: 0 };
        let actor_ref = system.create_root_actor("test", actor).await.unwrap();

        let sink = Sink::new(actor_ref.subscribe(), TestSubscriber);
        system.run_sink(sink).await;

        // Subscribe before sending anything: a resubscribed receiver only
        // sees events published after it was created, so subscribing after
        // the first `tell` would make the test depend on task scheduling.
        let mut recv = actor_ref.subscribe();

        actor_ref.tell(TestMessage(10)).await.unwrap();
        let response = actor_ref.ask(TestMessage(10)).await.unwrap();
        assert_eq!(response.0, 20);
        let event = recv.recv().await.unwrap();
        assert_eq!(event.0, 10);
        let event = recv.recv().await.unwrap();
        assert_eq!(event.0, 20);
        actor_ref.ask_stop().await.unwrap();
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    }
}