1use std::{
2 fmt, mem,
3 sync::{atomic, Arc},
4};
5
6use futures_intrusive::sync::ManualResetEvent;
7use metrics::{decrement_gauge, increment_counter, increment_gauge};
8use parking_lot::RwLock;
9use serde::{Deserialize, Serialize};
10use tracing::{error, info, warn};
11
12use crate::{
13 actor_status::{ActorStatus, ActorStatusKind, AtomicActorStatusKind},
14 envelope::Envelope,
15 errors::{SendError, TrySendError},
16 group::TerminationPolicy,
17 mailbox::{config::MailboxConfig, Mailbox, RecvResult},
18 messages::{ActorStatusReport, Terminate},
19 msg,
20 request_table::RequestTable,
21 restarting::RestartPolicy,
22 scope,
23 subscription::SubscriptionManager,
24 Addr,
25};
26
27#[derive(Debug, Hash, PartialEq, Eq, Serialize, Deserialize)]
31pub struct ActorMeta {
32 pub group: String,
34 pub key: String,
36}
37
38impl fmt::Display for ActorMeta {
39 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40 f.write_str(&self.group)?;
41
42 if !self.key.is_empty() {
43 f.write_str("/")?;
44 f.write_str(&self.key)?;
45 }
46
47 Ok(())
48 }
49}
50
51#[derive(Debug, Clone)]
55#[non_exhaustive]
56pub struct ActorStartInfo {
57 pub cause: ActorStartCause,
60}
61
62#[derive(Debug, Clone)]
64#[non_exhaustive]
65pub enum ActorStartCause {
66 GroupMounted,
68 OnMessage,
70 Restarted,
72}
73
74impl ActorStartInfo {
75 pub(crate) fn on_group_mounted() -> Self {
76 Self {
77 cause: ActorStartCause::GroupMounted,
78 }
79 }
80
81 pub(crate) fn on_message() -> Self {
82 Self {
83 cause: ActorStartCause::OnMessage,
84 }
85 }
86
87 pub(crate) fn on_restart() -> Self {
88 Self {
89 cause: ActorStartCause::Restarted,
90 }
91 }
92}
93
94impl ActorStartCause {
95 pub fn is_group_mounted(&self) -> bool {
96 matches!(self, ActorStartCause::GroupMounted)
97 }
98
99 pub fn is_restarted(&self) -> bool {
100 matches!(self, ActorStartCause::Restarted)
101 }
102
103 pub fn is_on_message(&self) -> bool {
104 matches!(self, ActorStartCause::OnMessage)
105 }
106}
107
108pub(crate) struct Actor {
111 meta: Arc<ActorMeta>,
112 termination_policy: TerminationPolicy,
113 mailbox: Mailbox,
114 request_table: RequestTable,
115 status_kind: AtomicActorStatusKind,
116 control: RwLock<Control>,
117 finished: ManualResetEvent, status_subscription: Arc<SubscriptionManager>,
119}
120
121struct Control {
122 status: ActorStatus,
123 restart_policy: Option<RestartPolicy>,
125 mailbox_capacity_config: usize,
127 mailbox_capacity_override: Option<usize>,
129}
130
131impl Actor {
132 pub(crate) fn new(
133 meta: Arc<ActorMeta>,
134 addr: Addr,
135 mailbox_config: &MailboxConfig,
136 termination_policy: TerminationPolicy,
137 status_subscription: Arc<SubscriptionManager>,
138 ) -> Self {
139 Actor {
140 status_kind: AtomicActorStatusKind::from(ActorStatusKind::Initializing),
141 meta,
142 termination_policy,
143 mailbox: Mailbox::new(mailbox_config),
144 request_table: RequestTable::new(addr),
145 control: RwLock::new(Control {
146 status: ActorStatus::INITIALIZING,
147 restart_policy: None,
148 mailbox_capacity_config: mailbox_config.capacity,
149 mailbox_capacity_override: None,
150 }),
151 finished: ManualResetEvent::new(false),
152 status_subscription,
153 }
154 }
155
156 pub(crate) fn on_start(&self) {
157 increment_gauge!("elfo_active_actors", 1.,
158 "status" => ActorStatusKind::Initializing.as_str());
159 increment_counter!("elfo_actor_status_changes_total",
160 "status" => ActorStatusKind::Initializing.as_str());
161
162 self.send_status_to_subscribers(&self.control.read());
163 }
164
165 pub(crate) async fn send(&self, envelope: Envelope) -> Result<(), SendError<Envelope>> {
166 match self.handle_system(envelope) {
167 Some(envelope) => self.mailbox.send(envelope).await,
168 None => Ok(()),
169 }
170 }
171
172 pub(crate) fn try_send(&self, envelope: Envelope) -> Result<(), TrySendError<Envelope>> {
173 match self.handle_system(envelope) {
174 Some(envelope) => self.mailbox.try_send(envelope),
175 None => Ok(()),
176 }
177 }
178
179 pub(crate) fn unbounded_send(&self, envelope: Envelope) -> Result<(), SendError<Envelope>> {
180 match self.handle_system(envelope) {
181 Some(envelope) => self.mailbox.unbounded_send(envelope),
182 None => Ok(()),
183 }
184 }
185
186 #[inline(always)]
187 fn handle_system(&self, envelope: Envelope) -> Option<Envelope> {
188 msg!(match &envelope {
189 Terminate { closing, .. } => {
190 if (*closing || self.termination_policy.close_mailbox) && self.close() {
191 return None;
193 }
194 }
195 });
196
197 Some(envelope)
199 }
200
201 pub(crate) async fn recv(&self) -> RecvResult {
202 self.mailbox.recv().await
203 }
204
205 pub(crate) fn try_recv(&self) -> Option<RecvResult> {
206 self.mailbox.try_recv()
207 }
208
209 pub(crate) fn request_table(&self) -> &RequestTable {
210 &self.request_table
211 }
212
213 pub(crate) fn set_mailbox_capacity_config(&self, capacity: usize) {
214 self.control.write().mailbox_capacity_config = capacity;
215 self.update_mailbox_capacity();
216 }
217
218 pub(crate) fn set_mailbox_capacity_override(&self, capacity: Option<usize>) {
219 self.control.write().mailbox_capacity_override = capacity;
220 self.update_mailbox_capacity();
221 }
222
223 fn update_mailbox_capacity(&self) {
224 let control = self.control.read();
225
226 let capacity = control
227 .mailbox_capacity_override
228 .unwrap_or(control.mailbox_capacity_config);
229
230 self.mailbox.set_capacity(capacity);
231 }
232
233 pub(crate) fn restart_policy(&self) -> Option<RestartPolicy> {
234 self.control.read().restart_policy.clone()
235 }
236
237 pub(crate) fn set_restart_policy(&self, policy: Option<RestartPolicy>) {
238 self.control.write().restart_policy = policy;
239 }
240
241 pub(crate) fn status_kind(&self) -> ActorStatusKind {
242 self.status_kind.load(atomic::Ordering::Acquire)
243 }
244
245 pub(crate) fn set_status(&self, status: ActorStatus) {
247 self.status_kind
248 .store(status.kind(), atomic::Ordering::Release);
249
250 let mut control = self.control.write();
251 let prev_status = mem::replace(&mut control.status, status.clone());
252
253 if status == prev_status {
254 return;
255 }
256
257 self.send_status_to_subscribers(&control);
258 drop(control);
259
260 if status.kind().is_finished() {
261 self.close();
262 self.mailbox.drop_all();
264 self.finished.set();
265 }
266
267 log_status(&status);
268
269 if status.kind != prev_status.kind {
270 if !prev_status.kind().is_finished() {
271 decrement_gauge!("elfo_active_actors", 1., "status" => prev_status.kind.as_str());
272 }
273 if !status.kind().is_finished() {
274 increment_gauge!("elfo_active_actors", 1., "status" => status.kind.as_str());
275 }
276
277 increment_counter!("elfo_actor_status_changes_total", "status" => status.kind.as_str());
278 }
279
280 }
283
284 #[cold]
285 #[inline(never)]
286 pub(crate) fn close(&self) -> bool {
287 self.mailbox.close(scope::trace_id())
288 }
289
290 pub(crate) async fn finished(&self) {
291 self.finished.wait().await
292 }
293
294 pub(crate) fn with_status<R>(&self, f: impl FnOnce(ActorStatusReport) -> R) -> R {
296 let control = self.control.read();
297 f(ActorStatusReport {
298 meta: self.meta.clone(),
299 status: control.status.clone(),
300 })
301 }
302
303 fn send_status_to_subscribers(&self, control: &Control) {
304 self.status_subscription.send(ActorStatusReport {
305 meta: self.meta.clone(),
306 status: control.status.clone(),
307 });
308 }
309}
310
311fn log_status(status: &ActorStatus) {
312 if let Some(details) = status.details.as_deref() {
313 match status.kind {
314 ActorStatusKind::Failed => error!(status = ?status.kind, %details, "status changed"),
315 ActorStatusKind::Alarming => warn!(status = ?status.kind, %details, "status changed"),
316 _ => info!(status = ?status.kind, %details, "status changed"),
317 }
318 } else {
319 match status.kind {
320 ActorStatusKind::Failed => error!(status = ?status.kind, "status changed"),
321 ActorStatusKind::Alarming => warn!(status = ?status.kind, "status changed"),
322 _ => info!(status = ?status.kind, "status changed"),
323 }
324 }
325}
326
327#[cfg(test)]
328#[cfg(feature = "FIXME")]
329mod tests {
330 use super::*;
331
332 #[tokio::test]
333 async fn finished() {
334 let meta = Arc::new(ActorMeta {
335 group: "foo".into(),
336 key: "bar".into(),
337 });
338
339 let actor = Actor::new(meta, Addr::NULL, TerminationPolicy::default());
340 let fut = actor.finished();
341 actor.set_status(ActorStatus::TERMINATED);
342 fut.await;
343 assert!(actor.status_kind().is_finished());
344 actor.finished().await;
345 assert!(actor.status_kind().is_finished());
346 }
347}