elfo_core/
scope.rs

1#![allow(clippy::declare_interior_mutable_const)] // see tokio#4872
2
3use std::{
4    cell::Cell,
5    future::Future,
6    sync::{
7        atomic::{AtomicUsize, Ordering},
8        Arc,
9    },
10};
11
12use crate::{
13    actor::ActorMeta,
14    addr::Addr,
15    config::SystemConfig,
16    dumping::DumpingControl,
17    logging::_priv::LoggingControl,
18    permissions::{AtomicPermissions, Permissions},
19    telemetry::TelemetryConfig,
20    tracing::TraceId,
21};
22
// Task-local storage holding the scope of the currently running task.
// It is populated by `Scope::within` / `Scope::sync_within` and read by
// the free functions of this module (`expose`, `with`, `try_with`, ...).
tokio::task_local! {
    static SCOPE: Scope;
}
26
/// The context a piece of code runs in: the current trace id plus
/// the state shared with the actor and its group.
///
/// Cloning yields a scope with its own trace id cell that still shares
/// the actor and group state, since both are stored behind `Arc`.
#[derive(Clone)]
pub struct Scope {
    /// The current trace id; replaceable through `&self` via `set_trace_id`.
    trace_id: Cell<TraceId>,
    /// Per-actor state: address, meta and allocation counters.
    actor: Arc<ScopeActorShared>,
    /// Per-group state: address, permissions, logging and dumping controls.
    group: Arc<ScopeGroupShared>,
}
33
// A scope can be moved to another thread, but cannot be shared by reference
// between threads: the trace id is stored in a `Cell`, which is not `Sync`.
assert_impl_all!(Scope: Send);
assert_not_impl_all!(Scope: Sync);
36
37impl Scope {
38    /// Private API for now.
39    #[doc(hidden)]
40    pub fn test(actor: Addr, meta: Arc<ActorMeta>) -> Self {
41        Self::new(
42            TraceId::generate(),
43            actor,
44            meta,
45            Arc::new(ScopeGroupShared::new(Addr::NULL)),
46        )
47    }
48
49    pub(crate) fn new(
50        trace_id: TraceId,
51        addr: Addr,
52        meta: Arc<ActorMeta>,
53        group: Arc<ScopeGroupShared>,
54    ) -> Self {
55        Self {
56            trace_id: Cell::new(trace_id),
57            actor: Arc::new(ScopeActorShared::new(addr, meta)),
58            group,
59        }
60    }
61
62    pub(crate) fn with_telemetry(mut self, config: &TelemetryConfig) -> Self {
63        self.actor = Arc::new(self.actor.with_telemetry(config));
64        self
65    }
66
67    #[inline]
68    #[deprecated(note = "use `actor()` instead")]
69    pub fn addr(&self) -> Addr {
70        self.actor()
71    }
72
73    #[inline]
74    pub fn actor(&self) -> Addr {
75        self.actor.addr
76    }
77
78    #[inline]
79    pub fn group(&self) -> Addr {
80        self.group.addr
81    }
82
83    /// Returns the current object's meta.
84    #[inline]
85    pub fn meta(&self) -> &Arc<ActorMeta> {
86        &self.actor.meta
87    }
88
89    /// Private API for now.
90    #[inline]
91    #[stability::unstable]
92    #[doc(hidden)]
93    pub fn telemetry_meta(&self) -> &Arc<ActorMeta> {
94        &self.actor.telemetry_meta
95    }
96
97    /// Returns the current trace id.
98    #[inline]
99    pub fn trace_id(&self) -> TraceId {
100        self.trace_id.get()
101    }
102
103    /// Replaces the current trace id with the provided one.
104    #[inline]
105    pub fn set_trace_id(&self, trace_id: TraceId) {
106        self.trace_id.set(trace_id);
107    }
108
109    /// Returns the current permissions (for logging, telemetry and so on).
110    #[inline]
111    pub fn permissions(&self) -> Permissions {
112        self.group.permissions.load()
113    }
114
115    /// Private API for now.
116    #[inline]
117    #[stability::unstable]
118    #[doc(hidden)]
119    pub fn logging(&self) -> &LoggingControl {
120        &self.group.logging
121    }
122
123    /// Private API for now.
124    #[inline]
125    #[stability::unstable]
126    #[doc(hidden)]
127    pub fn dumping(&self) -> &DumpingControl {
128        &self.group.dumping
129    }
130
131    #[doc(hidden)]
132    #[stability::unstable]
133    pub fn increment_allocated_bytes(&self, by: usize) {
134        self.actor.allocated_bytes.fetch_add(by, Ordering::Relaxed);
135    }
136
137    #[doc(hidden)]
138    #[stability::unstable]
139    pub fn increment_deallocated_bytes(&self, by: usize) {
140        self.actor
141            .deallocated_bytes
142            .fetch_add(by, Ordering::Relaxed);
143    }
144
145    pub(crate) fn take_allocated_bytes(&self) -> usize {
146        self.actor.allocated_bytes.swap(0, Ordering::Relaxed)
147    }
148
149    pub(crate) fn take_deallocated_bytes(&self) -> usize {
150        self.actor.deallocated_bytes.swap(0, Ordering::Relaxed)
151    }
152
153    /// Wraps the provided future with the current scope.
154    pub async fn within<F: Future>(self, f: F) -> F::Output {
155        SCOPE.scope(self, f).await
156    }
157
158    /// Runs the provided function with the current scope.
159    pub fn sync_within<R>(self, f: impl FnOnce() -> R) -> R {
160        SCOPE.sync_scope(self, f)
161    }
162}
163
/// The per-actor part of a scope, stored behind an `Arc` in `Scope`.
struct ScopeActorShared {
    /// The address of the actor.
    addr: Addr,
    /// The meta of the actor.
    meta: Arc<ActorMeta>,
    /// The meta used for telemetry. It is the same as `meta` unless
    /// `with_telemetry` derives another key from the telemetry config.
    telemetry_meta: Arc<ActorMeta>,
    /// Bytes counted by `Scope::increment_allocated_bytes`
    /// since the last `Scope::take_allocated_bytes`.
    allocated_bytes: AtomicUsize,
    /// Bytes counted by `Scope::increment_deallocated_bytes`
    /// since the last `Scope::take_deallocated_bytes`.
    deallocated_bytes: AtomicUsize,
}
171
172impl ScopeActorShared {
173    fn new(addr: Addr, meta: Arc<ActorMeta>) -> Self {
174        Self {
175            addr,
176            meta: meta.clone(),
177            telemetry_meta: meta,
178            allocated_bytes: AtomicUsize::new(0),
179            deallocated_bytes: AtomicUsize::new(0),
180        }
181    }
182
183    fn with_telemetry(&self, config: &TelemetryConfig) -> Self {
184        Self {
185            addr: self.addr,
186            meta: self.meta.clone(),
187            telemetry_meta: config
188                .per_actor_key
189                .key(&self.meta.key)
190                .map(|key| {
191                    Arc::new(ActorMeta {
192                        group: self.meta.group.clone(),
193                        key,
194                    })
195                })
196                .unwrap_or_else(|| self.meta.clone()),
197            allocated_bytes: AtomicUsize::new(0),
198            deallocated_bytes: AtomicUsize::new(0),
199        }
200    }
201}
202
203assert_impl_all!(ScopeGroupShared: Send, Sync);
204
/// The per-group part of a scope, stored behind an `Arc` in `Scope`.
pub(crate) struct ScopeGroupShared {
    /// The address of the group.
    addr: Addr,
    /// Current permissions; loaded by `Scope::permissions`
    /// and rewritten by `configure`.
    permissions: AtomicPermissions,
    /// The logging control, reconfigured by `configure`.
    logging: LoggingControl,
    /// The dumping control, reconfigured by `configure`.
    dumping: DumpingControl,
}
211
// The group state is stored behind an `Arc` in `Scope`,
// so it must be both `Send` and `Sync`.
assert_impl_all!(ScopeGroupShared: Send, Sync);
213
214impl ScopeGroupShared {
215    pub(crate) fn new(addr: Addr) -> Self {
216        Self {
217            addr,
218            permissions: Default::default(), // everything is disabled
219            logging: Default::default(),
220            dumping: Default::default(),
221        }
222    }
223
224    pub(crate) fn configure(&self, config: &SystemConfig) {
225        // Update the logging subsystem.
226        self.logging.configure(&config.logging);
227        let max_level = self.logging.max_level_hint().into_level();
228
229        // Update the dumping subsystem.
230        self.dumping.configure(&config.dumping);
231
232        // Update permissions.
233        let mut perm = self.permissions.load();
234        perm.set_logging_enabled(max_level);
235        perm.set_dumping_enabled(!config.dumping.disabled);
236        perm.set_telemetry_per_actor_group_enabled(config.telemetry.per_actor_group);
237        perm.set_telemetry_per_actor_key_enabled(config.telemetry.per_actor_key.is_enabled());
238        self.permissions.store(perm);
239    }
240}
241
242/// Exposes the current scope in order to send to other tasks.
243///
244/// # Panics
245/// This function will panic if called outside actors.
246pub fn expose() -> Scope {
247    SCOPE.with(Clone::clone)
248}
249
250/// Exposes the current scope if inside the actor system.
251pub fn try_expose() -> Option<Scope> {
252    SCOPE.try_with(Clone::clone).ok()
253}
254
255/// Accesses the current scope and runs the provided closure.
256///
257/// # Panics
258/// This function will panic if called ouside the actor system.
259#[inline]
260pub fn with<R>(f: impl FnOnce(&Scope) -> R) -> R {
261    try_with(f).expect("cannot access a scope outside the actor system")
262}
263
264/// Accesses the current scope and runs the provided closure.
265///
266/// Returns `None` if called outside the actor system.
267/// For a panicking variant, see `with`.
268#[inline]
269pub fn try_with<R>(f: impl FnOnce(&Scope) -> R) -> Option<R> {
270    SCOPE.try_with(|scope| f(scope)).ok()
271}
272
273/// Returns the current trace id.
274///
275/// # Panics
276/// This function will panic if called ouside the actor system.
277#[inline]
278pub fn trace_id() -> TraceId {
279    with(Scope::trace_id)
280}
281
282/// Returns the current trace id if inside the actor system.
283#[inline]
284pub fn try_trace_id() -> Option<TraceId> {
285    try_with(Scope::trace_id)
286}
287
288/// Replaces the current trace id with the provided one.
289///
290/// # Panics
291/// This function will panic if called ouside the actor system.
292#[inline]
293pub fn set_trace_id(trace_id: TraceId) {
294    with(|scope| scope.set_trace_id(trace_id));
295}
296
297/// Replaces the current trace id with the provided one
298/// if inside the actor system.
299///
300/// Returns `true` if the trace id has been replaced.
301#[inline]
302pub fn try_set_trace_id(trace_id: TraceId) -> bool {
303    try_with(|scope| scope.set_trace_id(trace_id)).is_some()
304}
305
306/// Returns the current object's meta.
307///
308/// # Panics
309/// This function will panic if called ouside the actor system.
310#[inline]
311pub fn meta() -> Arc<ActorMeta> {
312    with(|scope| scope.meta().clone())
313}
314
315/// Returns the current object's meta if inside the actor system.
316#[inline]
317pub fn try_meta() -> Option<Arc<ActorMeta>> {
318    try_with(|scope| scope.meta().clone())
319}
320
// The serde mode that is active on the current thread.
// Starts as `SerdeMode::Normal`; temporarily replaced by `with_serde_mode`
// and read by `serde_mode`.
thread_local! {
    static SERDE_MODE: Cell<SerdeMode> = Cell::new(SerdeMode::Normal);
}
324
/// A mode of (de)serialization.
/// Useful to alter behavior depending on the context,
/// see `with_serde_mode` and `serde_mode`.
#[stability::unstable]
#[derive(Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum SerdeMode {
    /// The default mode, regular ser/de calls.
    Normal,
    /// Serialization for dumping purposes.
    Dumping,
    // Network
}
337
338/// Sets the specified serde mode and runs the function.
339///
340/// # Panics
341/// If the provided function panics.
342#[stability::unstable]
343#[inline]
344pub fn with_serde_mode<R>(mode: SerdeMode, f: impl FnOnce() -> R) -> R {
345    // We use a guard here to restore the current serde mode even on panics.
346    struct Guard(SerdeMode);
347    impl Drop for Guard {
348        fn drop(&mut self) {
349            SERDE_MODE.with(|cell| cell.set(self.0));
350        }
351    }
352
353    let mode = SERDE_MODE.with(|cell| cell.replace(mode));
354    let _guard = Guard(mode);
355    f()
356}
357
358/// Returns the current serde mode.
359#[stability::unstable]
360#[inline]
361pub fn serde_mode() -> SerdeMode {
362    SERDE_MODE.with(Cell::get)
363}
364
// Checks that `with_serde_mode` switches the mode only for the duration
// of the call and restores the previous mode even if the call panics.
#[test]
fn serde_mode_works() {
    #[derive(serde::Serialize)]
    struct S {
        #[serde(serialize_with = "crate::dumping::hide")]
        f: u32,
    }

    const PLAIN: &str = r#"{"f":42}"#;
    const HIDDEN: &str = r#"{"f":"<hidden>"}"#;

    let value = S { f: 42 };
    let render = || serde_json::to_string(&value).unwrap();

    // Before any mode change: `Normal` mode, the field is visible.
    assert_eq!(render(), PLAIN);

    // Inside `with_serde_mode`: `Dumping` mode, the field is hidden.
    let dumped = with_serde_mode(SerdeMode::Dumping, &render);
    assert_eq!(dumped, HIDDEN);

    // After the call: back to `Normal` mode.
    assert_eq!(render(), PLAIN);

    // `Normal` mode must also be restored if the function panics.
    let outcome = std::panic::catch_unwind(|| {
        with_serde_mode(SerdeMode::Dumping, || panic!("oops"))
    });
    assert!(outcome.is_err());
    assert_eq!(render(), PLAIN);
}