1use std::{fmt, mem, sync::Arc};
2
3use futures_intrusive::sync::ManualResetEvent;
4use metrics::{decrement_gauge, increment_counter, increment_gauge};
5use parking_lot::RwLock;
6use serde::{Deserialize, Serialize};
7use tracing::{error, info};
8
9use crate::{
10 addr::Addr,
11 envelope::Envelope,
12 errors::{SendError, TrySendError},
13 group::TerminationPolicy,
14 mailbox::{Mailbox, RecvResult},
15 messages::{ActorStatusReport, Terminate},
16 request_table::RequestTable,
17 scope,
18 subscription::SubscriptionManager,
19};
20
21use crate::{self as elfo};
22use elfo_macros::msg_raw as msg;
23
24#[derive(Debug, Hash, PartialEq, Eq, Serialize, Deserialize)]
28pub struct ActorMeta {
29 pub group: String,
30 pub key: String,
31}
32
33#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
38pub struct ActorStatus {
39 kind: ActorStatusKind,
40 details: Option<String>,
41}
42
43impl ActorStatus {
44 pub const ALARMING: ActorStatus = ActorStatus::new(ActorStatusKind::Alarming);
45 pub(crate) const FAILED: ActorStatus = ActorStatus::new(ActorStatusKind::Failed);
46 pub const INITIALIZING: ActorStatus = ActorStatus::new(ActorStatusKind::Initializing);
47 pub const NORMAL: ActorStatus = ActorStatus::new(ActorStatusKind::Normal);
48 pub(crate) const TERMINATED: ActorStatus = ActorStatus::new(ActorStatusKind::Terminated);
49 pub const TERMINATING: ActorStatus = ActorStatus::new(ActorStatusKind::Terminating);
50
51 const fn new(kind: ActorStatusKind) -> Self {
52 Self {
53 kind,
54 details: None,
55 }
56 }
57
58 pub fn with_details(&self, details: impl fmt::Display) -> Self {
60 ActorStatus {
61 kind: self.kind,
62 details: Some(details.to_string()),
63 }
64 }
65
66 pub fn kind(&self) -> ActorStatusKind {
68 self.kind
69 }
70
71 pub fn details(&self) -> Option<&str> {
73 self.details.as_deref()
74 }
75
76 pub(crate) fn is_failed(&self) -> bool {
77 self.kind == ActorStatusKind::Failed
78 }
79
80 fn is_finished(&self) -> bool {
81 use ActorStatusKind::*;
82 matches!(self.kind, Failed | Terminated)
83 }
84}
85
86impl fmt::Display for ActorStatus {
87 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88 match &self.details {
89 Some(details) => write!(f, "{:?}: {}", self.kind, details),
90 None => write!(f, "{:?}", self.kind),
91 }
92 }
93}
94
95#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
99#[non_exhaustive]
100pub enum ActorStatusKind {
101 Normal,
102 Initializing,
103 Terminating,
104 Terminated,
105 Alarming,
106 Failed,
107}
108
109impl ActorStatusKind {
110 fn as_str(&self) -> &'static str {
111 match self {
112 ActorStatusKind::Normal => "Normal",
113 ActorStatusKind::Initializing => "Initializing",
114 ActorStatusKind::Terminating => "Terminating",
115 ActorStatusKind::Terminated => "Terminated",
116 ActorStatusKind::Alarming => "Alarming",
117 ActorStatusKind::Failed => "Failed",
118 }
119 }
120}
121
122pub(crate) struct Actor {
125 meta: Arc<ActorMeta>,
126 termination_policy: TerminationPolicy,
127 mailbox: Mailbox,
128 request_table: RequestTable,
129 control: RwLock<ControlBlock>,
130 finished: ManualResetEvent, status_subscription: Arc<SubscriptionManager>,
132}
133
134struct ControlBlock {
135 status: ActorStatus,
136}
137
138impl Actor {
139 pub(crate) fn new(
140 meta: Arc<ActorMeta>,
141 addr: Addr,
142 termination_policy: TerminationPolicy,
143 status_subscription: Arc<SubscriptionManager>,
144 ) -> Self {
145 Actor {
146 meta,
147 termination_policy,
148 mailbox: Mailbox::new(),
149 request_table: RequestTable::new(addr),
150 control: RwLock::new(ControlBlock {
151 status: ActorStatus::INITIALIZING,
152 }),
153 finished: ManualResetEvent::new(false),
154 status_subscription,
155 }
156 }
157
158 pub(crate) fn on_start(&self) {
159 increment_gauge!("elfo_active_actors", 1.,
160 "status" => ActorStatusKind::Initializing.as_str());
161 increment_counter!("elfo_actor_status_changes_total",
162 "status" => ActorStatusKind::Initializing.as_str());
163
164 self.send_status_to_subscribers(&self.control.read());
165 }
166
167 pub(crate) fn try_send(&self, envelope: Envelope) -> Result<(), TrySendError<Envelope>> {
168 msg!(match &envelope {
169 Terminate { closing } => {
170 if *closing || self.termination_policy.close_mailbox {
171 if self.close() {
172 return Ok(());
173 } else {
174 return Err(TrySendError::Closed(envelope));
175 }
176 }
177 }
178 });
179
180 self.mailbox.try_send(envelope)
181 }
182
183 pub(crate) async fn send(&self, envelope: Envelope) -> Result<(), SendError<Envelope>> {
184 msg!(match &envelope {
185 Terminate { closing } => {
186 if *closing || self.termination_policy.close_mailbox {
187 if self.close() {
188 return Ok(());
189 } else {
190 return Err(SendError(envelope));
191 }
192 }
193 }
194 });
195
196 self.mailbox.send(envelope).await
197 }
198
199 pub(crate) async fn recv(&self) -> RecvResult {
200 self.mailbox.recv().await
201 }
202
203 pub(crate) fn try_recv(&self) -> Option<RecvResult> {
204 self.mailbox.try_recv()
205 }
206
207 pub(crate) fn request_table(&self) -> &RequestTable {
208 &self.request_table
209 }
210
211 pub(crate) fn set_status(&self, status: ActorStatus) {
213 let mut control = self.control.write();
214 let prev_status = mem::replace(&mut control.status, status.clone());
215
216 if status == prev_status {
217 return;
218 }
219
220 self.send_status_to_subscribers(&control);
221 drop(control);
222
223 if status.is_finished() {
224 self.close();
225 self.mailbox.drop_all();
227 self.finished.set();
228 }
229
230 let is_bad_kind = matches!(
231 status.kind,
232 ActorStatusKind::Alarming | ActorStatusKind::Failed
233 );
234
235 if let Some(details) = status.details.as_deref() {
236 if is_bad_kind {
237 error!(status = ?status.kind, %details, "status changed");
238 } else {
239 info!(status = ?status.kind, %details, "status changed");
240 }
241 } else if is_bad_kind {
242 error!(status = ?status.kind, "status changed");
243 } else {
244 info!(status = ?status.kind, "status changed");
245 };
246
247 if status.kind != prev_status.kind {
248 if !prev_status.is_finished() {
249 decrement_gauge!("elfo_active_actors", 1., "status" => prev_status.kind.as_str());
250 }
251 if !status.is_finished() {
252 increment_gauge!("elfo_active_actors", 1., "status" => status.kind.as_str());
253 }
254
255 increment_counter!("elfo_actor_status_changes_total", "status" => status.kind.as_str());
256 }
257
258 }
261
262 pub(crate) fn close(&self) -> bool {
263 self.mailbox.close(scope::trace_id())
264 }
265
266 pub(crate) fn is_initializing(&self) -> bool {
267 matches!(
268 self.control.read().status.kind,
269 ActorStatusKind::Initializing
270 )
271 }
272
273 pub(crate) fn is_terminating(&self) -> bool {
274 matches!(
275 self.control.read().status.kind,
276 ActorStatusKind::Terminating
277 )
278 }
279
280 pub(crate) async fn finished(&self) {
281 self.finished.wait().await
282 }
283
284 pub(crate) fn with_status<R>(&self, f: impl FnOnce(ActorStatusReport) -> R) -> R {
286 let control = self.control.read();
287 f(ActorStatusReport {
288 meta: self.meta.clone(),
289 status: control.status.clone(),
290 })
291 }
292
293 fn send_status_to_subscribers(&self, control: &ControlBlock) {
294 self.status_subscription.send(ActorStatusReport {
295 meta: self.meta.clone(),
296 status: control.status.clone(),
297 });
298 }
299}
300
301#[cfg(test)]
302#[cfg(feature = "FIXME")]
303mod tests {
304 use super::*;
305
306 #[tokio::test]
307 async fn finished() {
308 let meta = Arc::new(ActorMeta {
309 group: "foo".into(),
310 key: "bar".into(),
311 });
312
313 let actor = Actor::new(meta, Addr::NULL, TerminationPolicy::default());
314 let fut = actor.finished();
315 actor.set_status(ActorStatus::TERMINATED);
316 fut.await;
317 assert!(actor.control.read().status.is_finished());
318 actor.finished().await;
319 assert!(actor.control.read().status.is_finished());
320 }
321}