Skip to main content

elfo_core/
actor.rs

1use std::{
2    fmt, mem,
3    sync::{atomic, Arc},
4};
5
6use futures_intrusive::sync::ManualResetEvent;
7use metrics::{decrement_gauge, increment_counter, increment_gauge};
8use parking_lot::RwLock;
9use serde::{Deserialize, Serialize};
10use tracing::{error, info, warn};
11
12use crate::{
13    actor_status::{ActorStatus, ActorStatusKind, AtomicActorStatusKind},
14    envelope::Envelope,
15    errors::{SendError, TrySendError},
16    group::TerminationPolicy,
17    mailbox::{config::MailboxConfig, Mailbox, RecvResult},
18    messages::{ActorStatusReport, Terminate},
19    msg,
20    request_table::RequestTable,
21    restarting::RestartPolicy,
22    scope,
23    subscription::SubscriptionManager,
24    Addr,
25};
26
27// === ActorMeta ===
28
29/// Represents meta information about actor: his group and key.
30#[derive(Debug, Hash, PartialEq, Eq, Serialize, Deserialize)]
31pub struct ActorMeta {
32    /// Actor's group name set in the topology.
33    pub group: String,
34    /// Actor's key set by the router.
35    pub key: String,
36}
37
38impl fmt::Display for ActorMeta {
39    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40        f.write_str(&self.group)?;
41
42        if !self.key.is_empty() {
43            f.write_str("/")?;
44            f.write_str(&self.key)?;
45        }
46
47        Ok(())
48    }
49}
50
51// === ActorStartInfo ===
52
53/// A struct holding information related to an actor start.
54#[derive(Debug, Clone)]
55#[non_exhaustive]
56pub struct ActorStartInfo {
57    /// The cause for the actor start, indicating why the actor is being
58    /// initialized.
59    pub cause: ActorStartCause,
60}
61
62/// An enum representing various causes for an actor to start.
63#[derive(Debug, Clone)]
64#[non_exhaustive]
65pub enum ActorStartCause {
66    /// The actor started because its group was mounted.
67    GroupMounted,
68    /// The actor started in response to a message.
69    OnMessage,
70    /// The actor started due to the restart policy.
71    Restarted,
72}
73
74impl ActorStartInfo {
75    pub(crate) fn on_group_mounted() -> Self {
76        Self {
77            cause: ActorStartCause::GroupMounted,
78        }
79    }
80
81    pub(crate) fn on_message() -> Self {
82        Self {
83            cause: ActorStartCause::OnMessage,
84        }
85    }
86
87    pub(crate) fn on_restart() -> Self {
88        Self {
89            cause: ActorStartCause::Restarted,
90        }
91    }
92}
93
94impl ActorStartCause {
95    pub fn is_group_mounted(&self) -> bool {
96        matches!(self, ActorStartCause::GroupMounted)
97    }
98
99    pub fn is_restarted(&self) -> bool {
100        matches!(self, ActorStartCause::Restarted)
101    }
102
103    pub fn is_on_message(&self) -> bool {
104        matches!(self, ActorStartCause::OnMessage)
105    }
106}
107
108// === Actor ===
109
110pub(crate) struct Actor {
111    meta: Arc<ActorMeta>,
112    termination_policy: TerminationPolicy,
113    mailbox: Mailbox,
114    request_table: RequestTable,
115    status_kind: AtomicActorStatusKind,
116    control: RwLock<Control>,
117    finished: ManualResetEvent, // TODO: remove in favor of `status_subscription`?
118    status_subscription: Arc<SubscriptionManager>,
119}
120
121struct Control {
122    status: ActorStatus,
123    /// If `None`, a group's policy will be used.
124    restart_policy: Option<RestartPolicy>,
125    /// A mailbox capacity set in the config.
126    mailbox_capacity_config: usize,
127    /// Explicitly set mailbox capacity via `Context::set_mailbox_capacity()`.
128    mailbox_capacity_override: Option<usize>,
129}
130
131impl Actor {
132    pub(crate) fn new(
133        meta: Arc<ActorMeta>,
134        addr: Addr,
135        mailbox_config: &MailboxConfig,
136        termination_policy: TerminationPolicy,
137        status_subscription: Arc<SubscriptionManager>,
138    ) -> Self {
139        Actor {
140            status_kind: AtomicActorStatusKind::from(ActorStatusKind::Initializing),
141            meta,
142            termination_policy,
143            mailbox: Mailbox::new(mailbox_config),
144            request_table: RequestTable::new(addr),
145            control: RwLock::new(Control {
146                status: ActorStatus::INITIALIZING,
147                restart_policy: None,
148                mailbox_capacity_config: mailbox_config.capacity,
149                mailbox_capacity_override: None,
150            }),
151            finished: ManualResetEvent::new(false),
152            status_subscription,
153        }
154    }
155
156    pub(crate) fn on_start(&self) {
157        increment_gauge!("elfo_active_actors", 1.,
158            "status" => ActorStatusKind::Initializing.as_str());
159        increment_counter!("elfo_actor_status_changes_total",
160            "status" => ActorStatusKind::Initializing.as_str());
161
162        self.send_status_to_subscribers(&self.control.read());
163    }
164
165    pub(crate) async fn send(&self, envelope: Envelope) -> Result<(), SendError<Envelope>> {
166        match self.handle_system(envelope) {
167            Some(envelope) => self.mailbox.send(envelope).await,
168            None => Ok(()),
169        }
170    }
171
172    pub(crate) fn try_send(&self, envelope: Envelope) -> Result<(), TrySendError<Envelope>> {
173        match self.handle_system(envelope) {
174            Some(envelope) => self.mailbox.try_send(envelope),
175            None => Ok(()),
176        }
177    }
178
179    pub(crate) fn unbounded_send(&self, envelope: Envelope) -> Result<(), SendError<Envelope>> {
180        match self.handle_system(envelope) {
181            Some(envelope) => self.mailbox.unbounded_send(envelope),
182            None => Ok(()),
183        }
184    }
185
186    #[inline(always)]
187    fn handle_system(&self, envelope: Envelope) -> Option<Envelope> {
188        msg!(match &envelope {
189            Terminate { closing, .. } => {
190                if (*closing || self.termination_policy.close_mailbox) && self.close() {
191                    // First closing `Terminate` is considered successful.
192                    return None;
193                }
194            }
195        });
196
197        // If the mailbox is closed, all following `*_send()` returns an error.
198        Some(envelope)
199    }
200
201    pub(crate) async fn recv(&self) -> RecvResult {
202        self.mailbox.recv().await
203    }
204
205    pub(crate) fn try_recv(&self) -> Option<RecvResult> {
206        self.mailbox.try_recv()
207    }
208
209    pub(crate) fn request_table(&self) -> &RequestTable {
210        &self.request_table
211    }
212
213    pub(crate) fn set_mailbox_capacity_config(&self, capacity: usize) {
214        self.control.write().mailbox_capacity_config = capacity;
215        self.update_mailbox_capacity();
216    }
217
218    pub(crate) fn set_mailbox_capacity_override(&self, capacity: Option<usize>) {
219        self.control.write().mailbox_capacity_override = capacity;
220        self.update_mailbox_capacity();
221    }
222
223    fn update_mailbox_capacity(&self) {
224        let control = self.control.read();
225
226        let capacity = control
227            .mailbox_capacity_override
228            .unwrap_or(control.mailbox_capacity_config);
229
230        self.mailbox.set_capacity(capacity);
231    }
232
233    pub(crate) fn restart_policy(&self) -> Option<RestartPolicy> {
234        self.control.read().restart_policy.clone()
235    }
236
237    pub(crate) fn set_restart_policy(&self, policy: Option<RestartPolicy>) {
238        self.control.write().restart_policy = policy;
239    }
240
241    pub(crate) fn status_kind(&self) -> ActorStatusKind {
242        self.status_kind.load(atomic::Ordering::Acquire)
243    }
244
245    // Note that this method should be called inside a right scope.
246    pub(crate) fn set_status(&self, status: ActorStatus) {
247        self.status_kind
248            .store(status.kind(), atomic::Ordering::Release);
249
250        let mut control = self.control.write();
251        let prev_status = mem::replace(&mut control.status, status.clone());
252
253        if status == prev_status {
254            return;
255        }
256
257        self.send_status_to_subscribers(&control);
258        drop(control);
259
260        if status.kind().is_finished() {
261            self.close();
262            // Drop all messages to release requests immediately.
263            self.mailbox.drop_all();
264            self.finished.set();
265        }
266
267        log_status(&status);
268
269        if status.kind != prev_status.kind {
270            if !prev_status.kind().is_finished() {
271                decrement_gauge!("elfo_active_actors", 1., "status" => prev_status.kind.as_str());
272            }
273            if !status.kind().is_finished() {
274                increment_gauge!("elfo_active_actors", 1., "status" => status.kind.as_str());
275            }
276
277            increment_counter!("elfo_actor_status_changes_total", "status" => status.kind.as_str());
278        }
279
280        // TODO: use `sdnotify` to provide a detailed status to systemd.
281        //       or use another actor to listen all statuses for this.
282    }
283
284    #[cold]
285    #[inline(never)]
286    pub(crate) fn close(&self) -> bool {
287        self.mailbox.close(scope::trace_id())
288    }
289
290    pub(crate) async fn finished(&self) {
291        self.finished.wait().await
292    }
293
294    /// Accesses the actor's status under lock to avoid race conditions.
295    pub(crate) fn with_status<R>(&self, f: impl FnOnce(ActorStatusReport) -> R) -> R {
296        let control = self.control.read();
297        f(ActorStatusReport {
298            meta: self.meta.clone(),
299            status: control.status.clone(),
300        })
301    }
302
303    fn send_status_to_subscribers(&self, control: &Control) {
304        self.status_subscription.send(ActorStatusReport {
305            meta: self.meta.clone(),
306            status: control.status.clone(),
307        });
308    }
309}
310
311fn log_status(status: &ActorStatus) {
312    if let Some(details) = status.details.as_deref() {
313        match status.kind {
314            ActorStatusKind::Failed => error!(status = ?status.kind, %details, "status changed"),
315            ActorStatusKind::Alarming => warn!(status = ?status.kind, %details, "status changed"),
316            _ => info!(status = ?status.kind, %details, "status changed"),
317        }
318    } else {
319        match status.kind {
320            ActorStatusKind::Failed => error!(status = ?status.kind, "status changed"),
321            ActorStatusKind::Alarming => warn!(status = ?status.kind, "status changed"),
322            _ => info!(status = ?status.kind, "status changed"),
323        }
324    }
325}
326
327#[cfg(test)]
328#[cfg(feature = "FIXME")]
329mod tests {
330    use super::*;
331
332    #[tokio::test]
333    async fn finished() {
334        let meta = Arc::new(ActorMeta {
335            group: "foo".into(),
336            key: "bar".into(),
337        });
338
339        let actor = Actor::new(meta, Addr::NULL, TerminationPolicy::default());
340        let fut = actor.finished();
341        actor.set_status(ActorStatus::TERMINATED);
342        fut.await;
343        assert!(actor.status_kind().is_finished());
344        actor.finished().await;
345        assert!(actor.status_kind().is_finished());
346    }
347}