agner_actors/
system.rs

1use std::any::Any;
2use std::future::Future;
3use std::sync::atomic::AtomicUsize;
4use std::sync::{Arc, Weak};
5
6use agner_utils::std_error_pp::StdErrorPP;
7use futures::{stream, Stream, StreamExt};
8use tokio::sync::{mpsc, oneshot, RwLock};
9use tracing::Instrument;
10
11use crate::actor::Actor;
12use crate::actor_id::ActorID;
13use crate::actor_runner::sys_msg::{ActorInfo, SysMsg};
14use crate::actor_runner::ActorRunner;
15use crate::exit::Exit;
16use crate::exit_handler::ExitHandler;
17use crate::spawn_opts::SpawnOpts;
18use crate::system_config::SystemConfig;
19
20mod actor_entry;
21mod sys_actor_entry;
22use actor_entry::ActorEntry;
23
24mod actor_id_pool;
25use actor_id_pool::ActorIDPool;
26
27mod errors;
28pub use errors::{SysChannelError, SysSpawnError};
29
30pub type ActorChannel<M> = mpsc::UnboundedSender<M>;
31
32/// A [`System`](crate::system::System) is a scope within which the actors run.
33#[derive(Debug, Clone)]
34pub struct System(Arc<Inner>);
35
36impl System {
37    pub fn rc_downgrade(&self) -> SystemWeakRef {
38        SystemWeakRef(Arc::downgrade(&self.0))
39    }
40}
41
42#[derive(Debug, Clone)]
43pub struct SystemWeakRef(Weak<Inner>);
44impl SystemWeakRef {
45    pub fn rc_upgrade(&self) -> Option<System> {
46        self.0.upgrade().map(System)
47    }
48}
49
50impl System {
51    /// Create a new [`System`] using the provided config.
52    pub fn new(config: SystemConfig) -> Self {
53        static NEXT_SYSTEM_ID: AtomicUsize = AtomicUsize::new(1);
54
55        let system_id = NEXT_SYSTEM_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
56
57        let actor_id_pool = ActorIDPool::new(system_id, config.max_actors);
58        let actor_entries =
59            (0..config.max_actors).map(|_| RwLock::new(Default::default())).collect();
60
61        let exit_handler = config.exit_handler.to_owned();
62
63        let inner = Inner { config, system_id, actor_id_pool, actor_entries, exit_handler };
64        Self(Arc::new(inner))
65    }
66
67    /// The config with which this [`System`] was created.
68    pub fn config(&self) -> &SystemConfig {
69        &self.0.config
70    }
71}
72
73impl System {
74    /// Spawn an actor
75    ///
76    /// Example:
77    /// ```
78    /// use agner_actors::{System, Context, Event};
79    ///
80    /// async fn actor_behaviour(context: &mut Context<&'static str>, actor_name: &'static str) {
81    ///     loop {
82    ///         if let Event::Message(message) = context.next_event().await {
83    ///             eprintln!("[{}] received: {}", actor_name, message);
84    ///         }
85    ///     }
86    /// }
87    /// let _ = async {
88    ///     let system = System::new(Default::default());
89    ///
90    ///     let alice = system.spawn(actor_behaviour, "Alice", Default::default()).await.expect("Failed to spawn an actor");
91    ///     let bob = system.spawn(actor_behaviour, "Bob", Default::default()).await.expect("Failed to spawn an actor");
92    /// };
93    /// ```
94    #[tracing::instrument(skip_all, fields(
95        sys_id = self.0.system_id,
96        behaviour = std::any::type_name::<Behaviour>(),
97    ))]
98    pub async fn spawn<Behaviour, Args, Message>(
99        &self,
100        behaviour: Behaviour,
101        args: Args,
102        mut spawn_opts: SpawnOpts,
103    ) -> Result<ActorID, SysSpawnError>
104    where
105        Args: Send + 'static,
106        Message: Unpin + Send + 'static,
107        for<'a> Behaviour: Actor<'a, Args, Message>,
108    {
109        let exit_handler =
110            spawn_opts.take_exit_handler().unwrap_or_else(|| self.0.exit_handler.to_owned());
111
112        let system = self.to_owned();
113        let actor_id_lease =
114            system.0.actor_id_pool.acquire_id().ok_or(SysSpawnError::MaxActorsLimit)?;
115        let actor_id = *actor_id_lease;
116
117        let (messages_tx, messages_rx) = mpsc::unbounded_channel::<Message>();
118        let (sys_msg_tx, sys_msg_rx) = mpsc::unbounded_channel();
119
120        let actor = ActorRunner {
121            actor_id,
122            system_opt: system.rc_downgrade(),
123            messages_rx,
124            sys_msg_rx,
125            sys_msg_tx: sys_msg_tx.to_owned(),
126            exit_handler,
127            spawn_opts,
128        };
129        tokio::spawn(actor.run(behaviour, args));
130
131        let entry = ActorEntry::new(actor_id_lease, messages_tx, sys_msg_tx);
132        // let entry = ActorEntryOld { actor_id_lease, messages_tx: Box::new(messages_tx),
133        // sys_msg_tx };
134
135        self.actor_entry_put(entry).await;
136
137        Ok(actor_id)
138    }
139
140    /// Send SigExit to the specified actor.
141    #[tracing::instrument(skip_all, fields(
142        sys_id = self.0.system_id,
143        actor_id = display(actor_id),
144        exit_reason = display(exit_reason.pp())
145    ))]
146    pub async fn exit(&self, actor_id: ActorID, exit_reason: Exit) {
147        self.send_sys_msg(actor_id, SysMsg::SigExit(actor_id, exit_reason)).await;
148    }
149
150    /// Wait for the specified actor to terminate, and return upon its termination the
151    /// [`Exit`](crate::exit::Exit). In case the actor with the specified `actor_id` does not exist
152    /// — return [`Exit::no_actor()`](`crate::exit::Exit::no_actor`) right away.
153    pub fn wait(&self, actor_id: ActorID) -> impl Future<Output = Exit> {
154        let sys = self.clone();
155        async move {
156            let (tx, rx) = oneshot::channel();
157
158            if let Some(mut entry) = sys.actor_entry_write(actor_id).await {
159                entry.add_watch(tx);
160            } else {
161                tracing::warn!("attempt to install a watch before the ActorEntry is initialized [actor_id: {}]", actor_id);
162            }
163            rx.await.unwrap_or_else(|_| Exit::no_actor())
164        }.instrument(
165            tracing::span!(
166                tracing::Level::TRACE,
167                "System::wait",
168                sys_id = self.0.system_id,
169                actor_id = display(actor_id)
170            )
171        )
172    }
173
174    /// Send a [`SysMsg`] to the specified process.
175    /// Returns `true` if both:
176    /// - the process entry corresponding to the `to` existed;
177    /// - the underlying mpsc-channel accepted the message (i.e. was not closed before this message
178    ///   is sent).
179    #[tracing::instrument(skip_all, fields(
180        sys_id = self.0.system_id,
181        to = display(to)
182    ))]
183    pub(crate) async fn send_sys_msg(&self, to: ActorID, sys_msg: SysMsg) -> bool {
184        tracing::trace!(
185            "[sys:{}] trying to send sys-msg [to: {}, sys-msg: {:?}]",
186            self.0.system_id,
187            to,
188            sys_msg
189        );
190
191        if let Some(entry) = self.actor_entry_read(to).await {
192            if entry.running_actor_id() == Some(to) {
193                if let Some(tx) = entry.sys_msg_tx() {
194                    return tx.send(sys_msg).is_ok()
195                } else {
196                    tracing::warn!("actor_entry is not occupied")
197                }
198            } else {
199                tracing::warn!("actor_id mismatch")
200            }
201        }
202        false
203    }
204
205    /// Send a single message to the specified actor.
206    #[tracing::instrument(skip_all, fields(
207        sys_id = self.0.system_id,
208        to = display(to),
209        msg_type = std::any::type_name::<M>()
210    ))]
211    pub async fn send<M>(&self, to: ActorID, message: M)
212    where
213        M: Send + 'static,
214    {
215        tracing::trace!("trying to send message",);
216        if let Some(entry) = self.actor_entry_read(to).await {
217            if entry.running_actor_id() == Some(to) {
218                if let Some(tx) = entry.messages_tx::<M>() {
219                    let _ = tx.send(message);
220                } else {
221                    tracing::warn!("message-type mismatch or actor_entry is not occupied");
222                }
223            } else {
224                tracing::warn!("actor_id mismatch")
225            }
226        } else {
227            tracing::trace!("no actor_entry")
228        }
229    }
230
231    /// Open a channel to the specified actor.
232    ///
233    /// When sending a series of messages to an actor, it may be better from the performance point
234    /// of view to open a channel to an actor, rather than sending each message separately using
235    /// [`System::send::<Message>(&self, ActorID, Message)`](crate::system::System::send).
236    #[tracing::instrument(skip_all, fields(
237        sys_id = self.0.system_id,
238        to = display(to)
239    ))]
240    pub async fn channel<M>(&self, to: ActorID) -> Result<ActorChannel<M>, SysChannelError>
241    where
242        M: Send + 'static,
243    {
244        self.actor_entry_read(to)
245            .await
246            .ok_or(SysChannelError::NoActor)?
247            .messages_tx()
248            .cloned()
249            .ok_or(SysChannelError::InvalidMessageType)
250    }
251
252    /// Link two actors
253    #[tracing::instrument(skip_all, fields(
254        sys_id = self.0.system_id,
255        left = display(left),
256        right = display(right)
257    ))]
258    pub async fn link(&self, left: ActorID, right: ActorID) {
259        let left_accepted_sys_msg = self.send_sys_msg(left, SysMsg::Link(right)).await;
260        let right_accepted_sys_msg = self.send_sys_msg(right, SysMsg::Link(left)).await;
261
262        if !right_accepted_sys_msg {
263            self.send_sys_msg(left, SysMsg::SigExit(right, Exit::no_actor())).await;
264        }
265        if !left_accepted_sys_msg {
266            self.send_sys_msg(right, SysMsg::SigExit(left, Exit::no_actor())).await;
267        }
268    }
269
270    /// Associate arbitrary data with the specified actor.
271    /// Upon actor termination that data will be dropped.
272    /// If no actor with the specified id exists, the data will be dropped right away.
273    #[tracing::instrument(skip_all, fields(
274        sys_id = self.0.system_id,
275        actor_id = display(actor_id),
276        data_type = std::any::type_name::<D>()
277    ))]
278    pub async fn put_data<D: Any + Send + Sync + 'static>(&self, actor_id: ActorID, data: D) {
279        if let Some(mut actor_entry) = self.actor_entry_write(actor_id).await {
280            actor_entry.put_data(data);
281        }
282    }
283
284    #[tracing::instrument(skip_all, fields(
285        sys_id = self.0.system_id,
286        actor_id = display(actor_id),
287        data_type = std::any::type_name::<D>()
288    ))]
289    pub async fn get_data<D: Any + Clone>(&self, actor_id: ActorID) -> Option<D> {
290        self.actor_entry_read(actor_id)
291            .await
292            .and_then(|actor_entry| actor_entry.get_data().cloned())
293    }
294
295    #[tracing::instrument(skip_all, fields(
296        sys_id = self.0.system_id,
297        actor_id = display(actor_id),
298        data_type = std::any::type_name::<D>()
299    ))]
300    pub async fn take_data<D: Any>(&self, actor_id: ActorID) -> Option<D> {
301        self.actor_entry_write(actor_id)
302            .await
303            .and_then(|mut actor_entry| actor_entry.take_data())
304    }
305
306    pub fn all_actors(&self) -> impl Stream<Item = ActorID> + '_ {
307        stream::iter(&self.0.actor_entries[..])
308            .filter_map(|slot| async move { slot.read().await.running_actor_id() })
309    }
310
311    #[tracing::instrument(skip_all, fields(
312        sys_id = self.0.system_id,
313        actor_id = display(actor_id)
314    ))]
315    pub async fn actor_info(&self, actor_id: ActorID) -> Option<ActorInfo> {
316        let (tx, rx) = oneshot::channel();
317        self.send_sys_msg(actor_id, SysMsg::GetInfo(tx)).await;
318        rx.await.ok()
319    }
320}
321
322#[derive(Debug)]
323struct Inner {
324    config: SystemConfig,
325    system_id: usize,
326    actor_id_pool: ActorIDPool,
327    actor_entries: Box<[RwLock<ActorEntry>]>,
328    exit_handler: Arc<dyn ExitHandler>,
329}