elfo_core/
actor.rs

1use std::{fmt, mem, sync::Arc};
2
3use futures_intrusive::sync::ManualResetEvent;
4use metrics::{decrement_gauge, increment_counter, increment_gauge};
5use parking_lot::RwLock;
6use serde::{Deserialize, Serialize};
7use tracing::{error, info};
8
9use crate::{
10    addr::Addr,
11    envelope::Envelope,
12    errors::{SendError, TrySendError},
13    group::TerminationPolicy,
14    mailbox::{Mailbox, RecvResult},
15    messages::{ActorStatusReport, Terminate},
16    request_table::RequestTable,
17    scope,
18    subscription::SubscriptionManager,
19};
20
21use crate::{self as elfo};
22use elfo_macros::msg_raw as msg;
23
24// === ActorMeta ===
25
26/// Represents meta information about actor: his group and key.
27#[derive(Debug, Hash, PartialEq, Eq, Serialize, Deserialize)]
28pub struct ActorMeta {
29    pub group: String,
30    pub key: String,
31}
32
33// === ActorStatus ===
34
35/// Represents the current status of an actor.
36/// See [The Actoromicon](https://actoromicon.rs/ch03-01-actor-lifecycle.html) for details.
37#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
38pub struct ActorStatus {
39    kind: ActorStatusKind,
40    details: Option<String>,
41}
42
43impl ActorStatus {
44    pub const ALARMING: ActorStatus = ActorStatus::new(ActorStatusKind::Alarming);
45    pub(crate) const FAILED: ActorStatus = ActorStatus::new(ActorStatusKind::Failed);
46    pub const INITIALIZING: ActorStatus = ActorStatus::new(ActorStatusKind::Initializing);
47    pub const NORMAL: ActorStatus = ActorStatus::new(ActorStatusKind::Normal);
48    pub(crate) const TERMINATED: ActorStatus = ActorStatus::new(ActorStatusKind::Terminated);
49    pub const TERMINATING: ActorStatus = ActorStatus::new(ActorStatusKind::Terminating);
50
51    const fn new(kind: ActorStatusKind) -> Self {
52        Self {
53            kind,
54            details: None,
55        }
56    }
57
58    /// Creates a new status with the same kind and provided details.
59    pub fn with_details(&self, details: impl fmt::Display) -> Self {
60        ActorStatus {
61            kind: self.kind,
62            details: Some(details.to_string()),
63        }
64    }
65
66    /// Returns the corresponding [`ActorStatusKind`] for this status.
67    pub fn kind(&self) -> ActorStatusKind {
68        self.kind
69    }
70
71    /// Returns details for this status, if provided.
72    pub fn details(&self) -> Option<&str> {
73        self.details.as_deref()
74    }
75
76    pub(crate) fn is_failed(&self) -> bool {
77        self.kind == ActorStatusKind::Failed
78    }
79
80    fn is_finished(&self) -> bool {
81        use ActorStatusKind::*;
82        matches!(self.kind, Failed | Terminated)
83    }
84}
85
86impl fmt::Display for ActorStatus {
87    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88        match &self.details {
89            Some(details) => write!(f, "{:?}: {}", self.kind, details),
90            None => write!(f, "{:?}", self.kind),
91        }
92    }
93}
94
95// === ActorStatusKind ===
96
97/// A list specifying statuses of actors. It's used with the [`ActorStatus`].
98#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
99#[non_exhaustive]
100pub enum ActorStatusKind {
101    Normal,
102    Initializing,
103    Terminating,
104    Terminated,
105    Alarming,
106    Failed,
107}
108
109impl ActorStatusKind {
110    fn as_str(&self) -> &'static str {
111        match self {
112            ActorStatusKind::Normal => "Normal",
113            ActorStatusKind::Initializing => "Initializing",
114            ActorStatusKind::Terminating => "Terminating",
115            ActorStatusKind::Terminated => "Terminated",
116            ActorStatusKind::Alarming => "Alarming",
117            ActorStatusKind::Failed => "Failed",
118        }
119    }
120}
121
122// === Actor ===
123
124pub(crate) struct Actor {
125    meta: Arc<ActorMeta>,
126    termination_policy: TerminationPolicy,
127    mailbox: Mailbox,
128    request_table: RequestTable,
129    control: RwLock<ControlBlock>,
130    finished: ManualResetEvent, // TODO: remove in favor of `status_subscription`?
131    status_subscription: Arc<SubscriptionManager>,
132}
133
134struct ControlBlock {
135    status: ActorStatus,
136}
137
138impl Actor {
139    pub(crate) fn new(
140        meta: Arc<ActorMeta>,
141        addr: Addr,
142        termination_policy: TerminationPolicy,
143        status_subscription: Arc<SubscriptionManager>,
144    ) -> Self {
145        Actor {
146            meta,
147            termination_policy,
148            mailbox: Mailbox::new(),
149            request_table: RequestTable::new(addr),
150            control: RwLock::new(ControlBlock {
151                status: ActorStatus::INITIALIZING,
152            }),
153            finished: ManualResetEvent::new(false),
154            status_subscription,
155        }
156    }
157
158    pub(crate) fn on_start(&self) {
159        increment_gauge!("elfo_active_actors", 1.,
160            "status" => ActorStatusKind::Initializing.as_str());
161        increment_counter!("elfo_actor_status_changes_total",
162            "status" => ActorStatusKind::Initializing.as_str());
163
164        self.send_status_to_subscribers(&self.control.read());
165    }
166
167    pub(crate) fn try_send(&self, envelope: Envelope) -> Result<(), TrySendError<Envelope>> {
168        msg!(match &envelope {
169            Terminate { closing } => {
170                if *closing || self.termination_policy.close_mailbox {
171                    if self.close() {
172                        return Ok(());
173                    } else {
174                        return Err(TrySendError::Closed(envelope));
175                    }
176                }
177            }
178        });
179
180        self.mailbox.try_send(envelope)
181    }
182
183    pub(crate) async fn send(&self, envelope: Envelope) -> Result<(), SendError<Envelope>> {
184        msg!(match &envelope {
185            Terminate { closing } => {
186                if *closing || self.termination_policy.close_mailbox {
187                    if self.close() {
188                        return Ok(());
189                    } else {
190                        return Err(SendError(envelope));
191                    }
192                }
193            }
194        });
195
196        self.mailbox.send(envelope).await
197    }
198
199    pub(crate) async fn recv(&self) -> RecvResult {
200        self.mailbox.recv().await
201    }
202
203    pub(crate) fn try_recv(&self) -> Option<RecvResult> {
204        self.mailbox.try_recv()
205    }
206
207    pub(crate) fn request_table(&self) -> &RequestTable {
208        &self.request_table
209    }
210
211    // Note that this method should be called inside a right scope.
212    pub(crate) fn set_status(&self, status: ActorStatus) {
213        let mut control = self.control.write();
214        let prev_status = mem::replace(&mut control.status, status.clone());
215
216        if status == prev_status {
217            return;
218        }
219
220        self.send_status_to_subscribers(&control);
221        drop(control);
222
223        if status.is_finished() {
224            self.close();
225            // Drop all messages to release requests immediately.
226            self.mailbox.drop_all();
227            self.finished.set();
228        }
229
230        let is_bad_kind = matches!(
231            status.kind,
232            ActorStatusKind::Alarming | ActorStatusKind::Failed
233        );
234
235        if let Some(details) = status.details.as_deref() {
236            if is_bad_kind {
237                error!(status = ?status.kind, %details, "status changed");
238            } else {
239                info!(status = ?status.kind, %details, "status changed");
240            }
241        } else if is_bad_kind {
242            error!(status = ?status.kind, "status changed");
243        } else {
244            info!(status = ?status.kind, "status changed");
245        };
246
247        if status.kind != prev_status.kind {
248            if !prev_status.is_finished() {
249                decrement_gauge!("elfo_active_actors", 1., "status" => prev_status.kind.as_str());
250            }
251            if !status.is_finished() {
252                increment_gauge!("elfo_active_actors", 1., "status" => status.kind.as_str());
253            }
254
255            increment_counter!("elfo_actor_status_changes_total", "status" => status.kind.as_str());
256        }
257
258        // TODO: use `sdnotify` to provide a detailed status to systemd.
259        //       or use another actor to listen all statuses for this.
260    }
261
262    pub(crate) fn close(&self) -> bool {
263        self.mailbox.close(scope::trace_id())
264    }
265
266    pub(crate) fn is_initializing(&self) -> bool {
267        matches!(
268            self.control.read().status.kind,
269            ActorStatusKind::Initializing
270        )
271    }
272
273    pub(crate) fn is_terminating(&self) -> bool {
274        matches!(
275            self.control.read().status.kind,
276            ActorStatusKind::Terminating
277        )
278    }
279
280    pub(crate) async fn finished(&self) {
281        self.finished.wait().await
282    }
283
284    /// Accesses the actor's status under lock to avoid race conditions.
285    pub(crate) fn with_status<R>(&self, f: impl FnOnce(ActorStatusReport) -> R) -> R {
286        let control = self.control.read();
287        f(ActorStatusReport {
288            meta: self.meta.clone(),
289            status: control.status.clone(),
290        })
291    }
292
293    fn send_status_to_subscribers(&self, control: &ControlBlock) {
294        self.status_subscription.send(ActorStatusReport {
295            meta: self.meta.clone(),
296            status: control.status.clone(),
297        });
298    }
299}
300
301#[cfg(test)]
302#[cfg(feature = "FIXME")]
303mod tests {
304    use super::*;
305
306    #[tokio::test]
307    async fn finished() {
308        let meta = Arc::new(ActorMeta {
309            group: "foo".into(),
310            key: "bar".into(),
311        });
312
313        let actor = Actor::new(meta, Addr::NULL, TerminationPolicy::default());
314        let fut = actor.finished();
315        actor.set_status(ActorStatus::TERMINATED);
316        fut.await;
317        assert!(actor.control.read().status.is_finished());
318        actor.finished().await;
319        assert!(actor.control.read().status.is_finished());
320    }
321}