1use crate::actor::context::{ActorContext, ActorHandlerContext, ActorStatus};
2use crate::actor::lifecycle::{Status, Stop};
3use crate::actor::message::{ActorMessage, Exec, Handler, Message, MessageHandler};
4use log::error;
5use std::any::Any;
6use uuid::Uuid;
7
/// Source of `ActorContext`, `ActorHandlerContext` and `ActorStatus`, which this file imports.
pub mod context;
/// Source of the `Status` and `Stop` messages sent by `ActorRef::status` and `ActorRef::stop`.
pub mod lifecycle;
/// Source of `ActorMessage`, `Exec`, `Handler`, `Message` and `MessageHandler`, which this file imports.
pub mod message;
// NOTE(review): nothing from `scheduler` is referenced in the visible part of this file.
pub mod scheduler;

/// Identifier carried by actor references; an alias for [`Uuid`].
pub type ActorId = Uuid;
14
15#[async_trait]
16pub trait Actor {
17 async fn started(&mut self, _ctx: &mut ActorHandlerContext) {}
18
19 async fn stopped(&mut self, _ctx: &mut ActorHandlerContext) {}
20}
21
22pub trait GetActorRef {
23 fn get_ref(&self, ctx: &ActorHandlerContext) -> ActorRef<Self>
24 where
25 Self: Actor + Sized + Send + Sync;
26}
27
28impl<A: Actor> GetActorRef for A {
29 fn get_ref(&self, ctx: &ActorHandlerContext) -> ActorRef<Self>
30 where
31 Self: Sized + Send + Sync,
32 {
33 ctx.actor_ref()
34 }
35}
36
37pub async fn new_actor<A: Actor>(actor: A) -> Result<ActorRef<A>, ActorRefErr>
38where
39 A: 'static + Sync + Send,
40{
41 ActorContext::current_context()
42 .new_tracked_actor(actor)
43 .await
44}
45
46pub async fn get_actor<A: Actor>(id: ActorId) -> Option<ActorRef<A>>
47where
48 A: 'static + Sync + Send,
49{
50 ActorContext::current_context().get_tracked_actor(id).await
51}
52
53#[derive(Clone)]
54pub struct BoxedActorRef {
55 id: Uuid,
56 sender: tokio::sync::mpsc::Sender<Box<dyn Any + Sync + Send>>,
57}
58
59pub struct ActorRef<A: Actor>
60where
61 A: 'static + Send + Sync,
62{
63 pub id: ActorId,
64 sender: tokio::sync::mpsc::Sender<MessageHandler<A>>,
65}
66
67impl<A: Actor> From<BoxedActorRef> for ActorRef<A>
68where
69 A: 'static + Send + Sync,
70{
71 fn from(b: BoxedActorRef) -> Self {
72 ActorRef {
73 id: b.id,
74 sender: unsafe { std::mem::transmute(b.sender) },
75 }
76 }
77}
78
79impl<A: Actor> From<ActorRef<A>> for BoxedActorRef
80where
81 A: 'static + Send + Sync,
82{
83 fn from(r: ActorRef<A>) -> Self {
84 BoxedActorRef {
85 id: r.id,
86 sender: unsafe { std::mem::transmute(r.sender) },
87 }
88 }
89}
90
91impl<A> Clone for ActorRef<A>
92where
93 A: Actor + Sync + Send + 'static,
94{
95 fn clone(&self) -> Self {
96 Self {
97 id: self.id.clone(),
98 sender: self.sender.clone(),
99 }
100 }
101}
102
/// Error type returned by [`ActorRef`] operations and by `new_actor`.
#[derive(Debug, PartialEq, Eq)]
pub enum ActorRefErr {
    /// The message could not be placed on the actor's channel, or the reply
    /// channel closed before a result arrived.
    ActorUnavailable,
}
107
108impl std::fmt::Display for ActorRefErr {
109 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110 match self {
111 ActorRefErr::ActorUnavailable => write!(f, "actor unavailable"),
112 }
113 }
114}
115
116impl std::error::Error for ActorRefErr {}
117
118impl<A: Actor> ActorRef<A>
119where
120 A: Sync + Send + 'static,
121{
122 pub async fn send<Msg: Message>(&mut self, msg: Msg) -> Result<Msg::Result, ActorRefErr>
123 where
124 Msg: 'static + Send + Sync,
125 A: Handler<Msg>,
126 Msg::Result: Send + Sync,
127 {
128 let (tx, rx) = tokio::sync::oneshot::channel();
129 match self
130 .sender
131 .send(Box::new(ActorMessage::new(msg, Some(tx))))
132 .await
133 {
134 Ok(_) => match rx.await {
135 Ok(res) => Ok(res),
136 Err(_e) => {
137 error!(target: "ActorRef", "error receiving result");
138 Err(ActorRefErr::ActorUnavailable)
139 }
140 },
141 Err(_e) => {
142 error!(target: "ActorRef", "error sending message");
143 Err(ActorRefErr::ActorUnavailable)
144 }
145 }
146 }
147
148 pub async fn notify<Msg: Message>(&mut self, msg: Msg) -> Result<(), ActorRefErr>
149 where
150 Msg: 'static + Send + Sync,
151 A: Handler<Msg>,
152 Msg::Result: Send + Sync,
153 {
154 match self
155 .sender
156 .send(Box::new(ActorMessage::new(msg, None)))
157 .await
158 {
159 Ok(_) => Ok(()),
160 Err(_e) => Err(ActorRefErr::ActorUnavailable),
161 }
162 }
163
164 pub async fn exec<F, R>(&mut self, f: F) -> Result<R, ActorRefErr>
165 where
166 F: (FnMut(&mut A) -> R) + 'static + Send + Sync,
167 R: 'static + Send + Sync,
168 {
169 self.send(Exec::new(f)).await
170 }
171
172 pub async fn notify_exec<F>(&mut self, f: F) -> Result<(), ActorRefErr>
173 where
174 F: (FnMut(&mut A) -> ()) + 'static + Send + Sync,
175 {
176 self.notify(Exec::new(f)).await
177 }
178
179 pub async fn status(&mut self) -> Result<ActorStatus, ActorRefErr> {
180 self.send(Status {}).await
181 }
182
183 pub async fn stop(&mut self) -> Result<ActorStatus, ActorRefErr> {
184 self.send(Stop {}).await
185 }
186}