lutetium/
system.rs

1mod extension;
2mod lifecycle;
3mod registry;
4
5pub use self::extension::*;
6
7use std::future::Future;
8use std::sync::Arc;
9
10use crate::actor::refs::ActorRef;
11use crate::actor::{Actor, ActorContext, FromMessage, Message, TryIntoActor};
12use crate::errors::ActorError;
13use crate::identifier::{IntoActorId, ToActorId};
14use crate::system::registry::Registry;
15
16pub struct ActorSystem {
17    pub(crate) ext: Arc<Extensions>,
18    pub(crate) registry: Registry
19}
20
21#[async_trait::async_trait]
22pub trait LutetiumActorSystem: 'static + Sync + Send {
23    async fn spawn<A: Actor>(&self, id: impl IntoActorId, actor: A) -> Result<ActorRef<A>, ActorError>;
24    async fn spawn_from<A: Actor, M: Message>(&self, from: M) -> Result<Result<ActorRef<A>, ActorError>, A::Rejection>
25        where A: FromMessage<M>;
26    async fn try_spawn<A: Actor, T: TryIntoActor<A>>(&self, id: T::Identifier, into: T) -> Result<Result<ActorRef<A>, ActorError>, T::Rejection>;
27    async fn shutdown(&self, id: &impl ToActorId) -> Result<(), ActorError>;
28    async fn shutdown_all(&self) -> Result<(), ActorError>;
29    async fn find<A: Actor>(&self, id: impl ToActorId) -> Result<ActorRef<A>, ActorError>;
30    async fn find_or<A: Actor, I: ToActorId, Fn, Fut>(&self, id: I, or_nothing: Fn) -> Result<ActorRef<A>, ActorError> 
31        where
32            Fn: FnOnce(I) -> Fut + 'static + Sync + Send,
33            Fut: Future<Output = A> + 'static + Sync + Send;
34}
35
36#[async_trait::async_trait]
37impl LutetiumActorSystem for ActorSystem {
38    async fn spawn<A: Actor>(&self, id: impl IntoActorId, actor: A) -> Result<ActorRef<A>, ActorError> {
39        let id = id.into_actor_id();
40        let behavior = Factory::create(actor, id.clone(), self.clone());
41        let registered = self.registry
42            .register(id, behavior)
43            .await?;
44        Ok(registered)
45    }
46
47
48    async fn spawn_from<A: Actor, M: Message>(&self, from: M) -> Result<Result<ActorRef<A>, ActorError>, A::Rejection>
49        where A: FromMessage<M>
50    {
51        let mut ctx = A::Context::track_with_system("prepare", self.clone());
52        let (id, actor) = A::once(from, &mut ctx).await?;
53        let id = id.into_actor_id();
54        let ctx = A::Context::track_with_system(id.clone(), self.clone());
55        let behavior = Behavior::new(actor, ctx);
56        let registered = self.registry
57            .register(id, behavior)
58            .await;
59        Ok(registered)
60    }
61    
62    async fn try_spawn<A: Actor,T: TryIntoActor<A>>(&self, id: T::Identifier, into: T) -> Result<Result<ActorRef<A>, ActorError>, T::Rejection> {
63        let (id, actor) = into.try_into_actor(id)?;
64        Ok(self.spawn(id, actor).await)
65    }
66    
67    async fn shutdown(&self, id: &impl ToActorId) -> Result<(), ActorError> {
68        self.registry
69            .deregister(&id.to_actor_id())
70            .await
71    }
72    
73    async fn shutdown_all(&self) -> Result<(), ActorError> {
74        self.registry
75            .shutdown_all()
76            .await
77    }
78    
79    async fn find<A: Actor>(&self, id: impl ToActorId) -> Result<ActorRef<A>, ActorError> {
80        let id = id.to_actor_id();
81        let Some((_, actor)) = self.registry.find(&id).await else {
82            return Err(ActorError::NotFoundActor { id })
83        };
84        let refs = actor.downcast::<A>()?;
85        Ok(refs)
86    }
87    
88    async fn find_or<A: Actor, I: ToActorId, Fn, Fut>(&self, id: I, or_nothing: Fn) -> Result<ActorRef<A>, ActorError> 
89        where 
90            Fn: FnOnce(I) -> Fut + 'static + Sync + Send,
91            Fut: Future<Output = A> + 'static + Sync + Send
92    {
93        let i = id.to_actor_id();
94        match self.registry.find(&i).await {
95            Some((_, actor)) => {
96                actor.downcast::<A>()
97            },
98            None => {
99                let actor = or_nothing(id).await;
100                self.spawn(i, actor).await
101            }
102        }
103    }
104}
105
106impl ActorSystem {
107    pub fn builder() -> SystemBuilder {
108        SystemBuilder {
109            ext: Default::default(),
110        }
111    }
112}
113
114impl ActorSystem {
115    pub fn extension(&self) -> &Arc<Extensions> {
116        &self.ext
117    }
118}
119
120impl Clone for ActorSystem {
121    fn clone(&self) -> Self {
122        Self { 
123            ext: Arc::clone(&self.ext),
124            registry: self.registry.clone(),
125        }
126    }
127}
128
129pub(crate) struct Factory;
130
131impl Factory {
132    pub fn create<A: Actor>(actor: A, id: impl IntoActorId, system: ActorSystem) -> Behavior<A> {
133        Behavior::new(actor, A::Context::track_with_system(id, system))
134    }
135}
136
137pub(crate) struct Behavior<A: Actor> {
138    actor: A,
139    ctx: A::Context
140}
141
142impl<A: Actor> Behavior<A> {
143    pub fn new(actor: A, ctx: A::Context) -> Self {
144        Self { actor, ctx }
145    }
146}
147
148pub struct SystemBuilder {
149    ext: Extensions
150}
151
152impl SystemBuilder {
153    pub fn extension(&mut self, procedure: impl FnOnce(&mut Extensions)) -> &mut Self {
154        procedure(&mut self.ext);
155        self
156    }
157    
158    pub fn build(self) -> ActorSystem {
159        ActorSystem {
160            ext: Arc::new(self.ext),
161            registry: Registry::default(),
162        }
163    }
164}