Skip to main content

coerce_rt/actor/
mod.rs

1use crate::actor::context::{ActorContext, ActorHandlerContext, ActorStatus};
2use crate::actor::lifecycle::{Status, Stop};
3use crate::actor::message::{ActorMessage, Exec, Handler, Message, MessageHandler};
4use log::error;
5use std::any::Any;
6use uuid::Uuid;
7
8pub mod context;
9pub mod lifecycle;
10pub mod message;
11pub mod scheduler;
12
13pub type ActorId = Uuid;
14
15#[async_trait]
16pub trait Actor {
17    async fn started(&mut self, _ctx: &mut ActorHandlerContext) {}
18
19    async fn stopped(&mut self, _ctx: &mut ActorHandlerContext) {}
20}
21
22pub trait GetActorRef {
23    fn get_ref(&self, ctx: &ActorHandlerContext) -> ActorRef<Self>
24    where
25        Self: Actor + Sized + Send + Sync;
26}
27
28impl<A: Actor> GetActorRef for A {
29    fn get_ref(&self, ctx: &ActorHandlerContext) -> ActorRef<Self>
30    where
31        Self: Sized + Send + Sync,
32    {
33        ctx.actor_ref()
34    }
35}
36
37pub async fn new_actor<A: Actor>(actor: A) -> Result<ActorRef<A>, ActorRefErr>
38where
39    A: 'static + Sync + Send,
40{
41    ActorContext::current_context()
42        .new_tracked_actor(actor)
43        .await
44}
45
46pub async fn get_actor<A: Actor>(id: ActorId) -> Option<ActorRef<A>>
47where
48    A: 'static + Sync + Send,
49{
50    ActorContext::current_context().get_tracked_actor(id).await
51}
52
53#[derive(Clone)]
54pub struct BoxedActorRef {
55    id: Uuid,
56    sender: tokio::sync::mpsc::Sender<Box<dyn Any + Sync + Send>>,
57}
58
59pub struct ActorRef<A: Actor>
60where
61    A: 'static + Send + Sync,
62{
63    pub id: ActorId,
64    sender: tokio::sync::mpsc::Sender<MessageHandler<A>>,
65}
66
67impl<A: Actor> From<BoxedActorRef> for ActorRef<A>
68where
69    A: 'static + Send + Sync,
70{
71    fn from(b: BoxedActorRef) -> Self {
72        ActorRef {
73            id: b.id,
74            sender: unsafe { std::mem::transmute(b.sender) },
75        }
76    }
77}
78
79impl<A: Actor> From<ActorRef<A>> for BoxedActorRef
80where
81    A: 'static + Send + Sync,
82{
83    fn from(r: ActorRef<A>) -> Self {
84        BoxedActorRef {
85            id: r.id,
86            sender: unsafe { std::mem::transmute(r.sender) },
87        }
88    }
89}
90
91impl<A> Clone for ActorRef<A>
92where
93    A: Actor + Sync + Send + 'static,
94{
95    fn clone(&self) -> Self {
96        Self {
97            id: self.id.clone(),
98            sender: self.sender.clone(),
99        }
100    }
101}
102
103#[derive(Debug, Eq, PartialEq)]
104pub enum ActorRefErr {
105    ActorUnavailable,
106}
107
108impl std::fmt::Display for ActorRefErr {
109    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110        match self {
111            ActorRefErr::ActorUnavailable => write!(f, "actor unavailable"),
112        }
113    }
114}
115
116impl std::error::Error for ActorRefErr {}
117
118impl<A: Actor> ActorRef<A>
119where
120    A: Sync + Send + 'static,
121{
122    pub async fn send<Msg: Message>(&mut self, msg: Msg) -> Result<Msg::Result, ActorRefErr>
123    where
124        Msg: 'static + Send + Sync,
125        A: Handler<Msg>,
126        Msg::Result: Send + Sync,
127    {
128        let (tx, rx) = tokio::sync::oneshot::channel();
129        match self
130            .sender
131            .send(Box::new(ActorMessage::new(msg, Some(tx))))
132            .await
133        {
134            Ok(_) => match rx.await {
135                Ok(res) => Ok(res),
136                Err(_e) => {
137                    error!(target: "ActorRef", "error receiving result");
138                    Err(ActorRefErr::ActorUnavailable)
139                }
140            },
141            Err(_e) => {
142                error!(target: "ActorRef", "error sending message");
143                Err(ActorRefErr::ActorUnavailable)
144            }
145        }
146    }
147
148    pub async fn notify<Msg: Message>(&mut self, msg: Msg) -> Result<(), ActorRefErr>
149    where
150        Msg: 'static + Send + Sync,
151        A: Handler<Msg>,
152        Msg::Result: Send + Sync,
153    {
154        match self
155            .sender
156            .send(Box::new(ActorMessage::new(msg, None)))
157            .await
158        {
159            Ok(_) => Ok(()),
160            Err(_e) => Err(ActorRefErr::ActorUnavailable),
161        }
162    }
163
164    pub async fn exec<F, R>(&mut self, f: F) -> Result<R, ActorRefErr>
165    where
166        F: (FnMut(&mut A) -> R) + 'static + Send + Sync,
167        R: 'static + Send + Sync,
168    {
169        self.send(Exec::new(f)).await
170    }
171
172    pub async fn notify_exec<F>(&mut self, f: F) -> Result<(), ActorRefErr>
173    where
174        F: (FnMut(&mut A) -> ()) + 'static + Send + Sync,
175    {
176        self.notify(Exec::new(f)).await
177    }
178
179    pub async fn status(&mut self) -> Result<ActorStatus, ActorRefErr> {
180        self.send(Status {}).await
181    }
182
183    pub async fn stop(&mut self) -> Result<ActorStatus, ActorRefErr> {
184        self.send(Stop {}).await
185    }
186}