Skip to main content

elfo_core/
scope.rs

1#![allow(clippy::declare_interior_mutable_const)] // see tokio#4872
2
3use std::{
4    cell::Cell,
5    future::Future,
6    sync::{
7        atomic::{AtomicUsize, Ordering},
8        Arc,
9    },
10};
11
12use crate::{
13    actor::ActorMeta,
14    addr::{Addr, NodeLaunchId, NodeNo},
15    config::SystemConfig,
16    dumping::DumpingControl,
17    logging::LoggingControl,
18    permissions::{AtomicPermissions, Permissions},
19    telemetry::config::TelemetryConfig,
20    tracing::TraceId,
21};
22
tokio::task_local! {
    // The scope of the currently running task.
    // It is installed by `Scope::within` and `Scope::sync_within` and read by
    // the free functions of this module (`with`, `try_with`, `expose`, ...).
    static SCOPE: Scope;
}
26
/// A context carried by a task: the current trace id plus shared actor-level
/// and group-level state.
///
/// Cloning copies the trace id into a new `Cell` and shares the actor-level
/// and group-level state through `Arc`s.
#[derive(Clone)]
pub struct Scope {
    // Stored in a `Cell` so that it can be replaced through `&self`.
    trace_id: Cell<TraceId>,
    // Actor-level state: address, meta and allocation counters.
    actor: Arc<ScopeActorShared>,
    // Group-level state: node info, group address, permissions, logging and dumping.
    group: Arc<ScopeGroupShared>,
}

// A scope can be moved to another thread, but cannot be shared by reference
// between threads: the `Cell` holding the trace id is not `Sync`.
assert_impl_all!(Scope: Send);
assert_not_impl_all!(Scope: Sync);
36
37impl Scope {
38    /// Private API for now.
39    #[doc(hidden)]
40    pub fn test(actor: Addr, meta: Arc<ActorMeta>) -> Self {
41        let node_no = NodeNo::from_bits(u16::MAX).unwrap();
42        let node_launch_id = NodeLaunchId::from_bits(u64::MAX).unwrap();
43        let group_scope = Arc::new(ScopeGroupShared::new(node_no, node_launch_id, Addr::NULL));
44        Self::new(TraceId::generate(), actor, meta, group_scope)
45    }
46
47    pub(crate) fn new(
48        trace_id: TraceId,
49        addr: Addr,
50        meta: Arc<ActorMeta>,
51        group: Arc<ScopeGroupShared>,
52    ) -> Self {
53        Self {
54            trace_id: Cell::new(trace_id),
55            actor: Arc::new(ScopeActorShared::new(addr, meta)),
56            group,
57        }
58    }
59
60    pub(crate) fn with_telemetry(mut self, config: &TelemetryConfig) -> Self {
61        self.actor = Arc::new(self.actor.with_telemetry(config));
62        self
63    }
64
65    #[inline]
66    pub fn actor(&self) -> Addr {
67        self.actor.addr
68    }
69
70    #[inline]
71    pub fn group(&self) -> Addr {
72        self.group.addr
73    }
74
75    #[instability::unstable]
76    #[inline]
77    pub fn node_no(&self) -> NodeNo {
78        self.group.node_no
79    }
80
81    #[instability::stable(since = "v0.2.0")]
82    #[inline]
83    pub fn node_launch_id(&self) -> NodeLaunchId {
84        self.group.node_launch_id
85    }
86
87    /// Returns the current object's meta.
88    #[inline]
89    pub fn meta(&self) -> &Arc<ActorMeta> {
90        &self.actor.meta
91    }
92
93    /// Private API for now.
94    #[inline]
95    #[instability::unstable]
96    #[doc(hidden)]
97    pub fn telemetry_meta(&self) -> &Arc<ActorMeta> {
98        &self.actor.telemetry_meta
99    }
100
101    /// Returns the current trace id.
102    #[inline]
103    pub fn trace_id(&self) -> TraceId {
104        self.trace_id.get()
105    }
106
107    /// Replaces the current trace id with the provided one.
108    #[inline]
109    pub fn set_trace_id(&self, trace_id: TraceId) {
110        self.trace_id.set(trace_id);
111    }
112
113    /// Returns the current permissions (for logging, telemetry and so on).
114    #[inline]
115    pub fn permissions(&self) -> Permissions {
116        self.group.permissions.load()
117    }
118
119    /// Private API for now.
120    #[doc(hidden)]
121    #[instability::unstable]
122    #[inline]
123    pub fn logging(&self) -> &LoggingControl {
124        &self.group.logging
125    }
126
127    /// Private API for now.
128    #[doc(hidden)]
129    #[instability::unstable]
130    #[inline]
131    pub fn dumping(&self) -> &DumpingControl {
132        &self.group.dumping
133    }
134
135    #[doc(hidden)]
136    #[instability::unstable]
137    pub fn increment_allocated_bytes(&self, by: usize) {
138        self.actor.allocated_bytes.fetch_add(by, Ordering::Relaxed);
139    }
140
141    #[doc(hidden)]
142    #[instability::unstable]
143    pub fn increment_deallocated_bytes(&self, by: usize) {
144        self.actor
145            .deallocated_bytes
146            .fetch_add(by, Ordering::Relaxed);
147    }
148
149    pub(crate) fn take_allocated_bytes(&self) -> usize {
150        self.actor.allocated_bytes.swap(0, Ordering::Relaxed)
151    }
152
153    pub(crate) fn take_deallocated_bytes(&self) -> usize {
154        self.actor.deallocated_bytes.swap(0, Ordering::Relaxed)
155    }
156
157    /// Wraps the provided future with the current scope.
158    pub async fn within<F: Future>(self, f: F) -> F::Output {
159        SCOPE.scope(self, f).await
160    }
161
162    /// Runs the provided function with the current scope.
163    pub fn sync_within<R>(self, f: impl FnOnce() -> R) -> R {
164        SCOPE.sync_scope(self, f)
165    }
166}
167
// Actor-level part of a scope, shared between all clones of the scope.
struct ScopeActorShared {
    // Address of the actor.
    addr: Addr,
    // Meta of the actor.
    meta: Arc<ActorMeta>,
    // Meta exposed via `Scope::telemetry_meta`; equals `meta` unless
    // replaced by `with_telemetry`.
    telemetry_meta: Arc<ActorMeta>,
    // Accumulated since the last `Scope::take_allocated_bytes` call.
    allocated_bytes: AtomicUsize,
    // Accumulated since the last `Scope::take_deallocated_bytes` call.
    deallocated_bytes: AtomicUsize,
}
175
176impl ScopeActorShared {
177    fn new(addr: Addr, meta: Arc<ActorMeta>) -> Self {
178        Self {
179            addr,
180            meta: meta.clone(),
181            telemetry_meta: meta,
182            allocated_bytes: AtomicUsize::new(0),
183            deallocated_bytes: AtomicUsize::new(0),
184        }
185    }
186
187    fn with_telemetry(&self, config: &TelemetryConfig) -> Self {
188        Self {
189            addr: self.addr,
190            meta: self.meta.clone(),
191            telemetry_meta: config
192                .per_actor_key
193                .key(&self.meta.key)
194                .map(|key| {
195                    Arc::new(ActorMeta {
196                        group: self.meta.group.clone(),
197                        key,
198                    })
199                })
200                .unwrap_or_else(|| self.meta.clone()),
201            allocated_bytes: AtomicUsize::new(0),
202            deallocated_bytes: AtomicUsize::new(0),
203        }
204    }
205}
206
207assert_impl_all!(ScopeGroupShared: Send, Sync);
208
// Group-level part of a scope, shared via `Arc` between scopes.
pub(crate) struct ScopeGroupShared {
    // Number of the node, see `Scope::node_no`.
    node_no: NodeNo,
    // Launch id of the node, see `Scope::node_launch_id`.
    node_launch_id: NodeLaunchId,
    // Address of the group, see `Scope::group`.
    addr: Addr,
    // Everything is disabled until `configure` is called.
    permissions: AtomicPermissions,
    // Reconfigured by `configure` from `SystemConfig::logging`.
    logging: LoggingControl,
    // Reconfigured by `configure` from `SystemConfig::dumping`.
    dumping: DumpingControl,
}

// The state is shared via `Arc`, so it must be usable from any thread.
assert_impl_all!(ScopeGroupShared: Send, Sync);
219
220impl ScopeGroupShared {
221    pub(crate) fn new(node_no: NodeNo, node_launch_id: NodeLaunchId, addr: Addr) -> Self {
222        Self {
223            node_no,
224            node_launch_id,
225            addr,
226            permissions: Default::default(), // everything is disabled
227            logging: Default::default(),
228            dumping: Default::default(),
229        }
230    }
231
232    pub(crate) fn configure(&self, config: &SystemConfig) {
233        // Update the logging subsystem.
234        self.logging.configure(&config.logging);
235
236        // Update the dumping subsystem.
237        self.dumping.configure(&config.dumping);
238
239        // Update permissions.
240        let mut perm = self.permissions.load();
241        perm.set_logging_enabled(config.logging.max_level.into());
242        perm.set_dumping_enabled(!config.dumping.disabled);
243        perm.set_telemetry_per_actor_group_enabled(config.telemetry.per_actor_group);
244        perm.set_telemetry_per_actor_key_enabled(config.telemetry.per_actor_key.is_enabled());
245        self.permissions.store(perm);
246    }
247}
248
249/// Exposes the current scope in order to send to other tasks.
250///
251/// # Panics
252/// This function will panic if called outside actors.
253pub fn expose() -> Scope {
254    SCOPE.with(Clone::clone)
255}
256
257/// Exposes the current scope if inside the actor system.
258pub fn try_expose() -> Option<Scope> {
259    SCOPE.try_with(Clone::clone).ok()
260}
261
262/// Accesses the current scope and runs the provided closure.
263///
264/// # Panics
265/// This function will panic if called ouside the actor system.
266#[inline]
267pub fn with<R>(f: impl FnOnce(&Scope) -> R) -> R {
268    try_with(f).expect("cannot access a scope outside the actor system")
269}
270
271/// Accesses the current scope and runs the provided closure.
272///
273/// Returns `None` if called outside the actor system.
274/// For a panicking variant, see `with`.
275#[inline]
276pub fn try_with<R>(f: impl FnOnce(&Scope) -> R) -> Option<R> {
277    SCOPE.try_with(|scope| f(scope)).ok()
278}
279
280/// Returns the current trace id.
281///
282/// # Panics
283/// This function will panic if called ouside the actor system.
284#[inline]
285pub fn trace_id() -> TraceId {
286    with(Scope::trace_id)
287}
288
289/// Returns the current trace id if inside the actor system.
290#[inline]
291pub fn try_trace_id() -> Option<TraceId> {
292    try_with(Scope::trace_id)
293}
294
295/// Replaces the current trace id with the provided one.
296///
297/// # Panics
298/// This function will panic if called ouside the actor system.
299#[inline]
300pub fn set_trace_id(trace_id: TraceId) {
301    with(|scope| scope.set_trace_id(trace_id));
302}
303
304/// Replaces the current trace id with the provided one
305/// if inside the actor system.
306///
307/// Returns `true` if the trace id has been replaced.
308#[inline]
309pub fn try_set_trace_id(trace_id: TraceId) -> bool {
310    try_with(|scope| scope.set_trace_id(trace_id)).is_some()
311}
312
313/// Returns the current object's meta.
314///
315/// # Panics
316/// This function will panic if called ouside the actor system.
317#[inline]
318pub fn meta() -> Arc<ActorMeta> {
319    with(|scope| scope.meta().clone())
320}
321
322/// Returns the current object's meta if inside the actor system.
323#[inline]
324pub fn try_meta() -> Option<Arc<ActorMeta>> {
325    try_with(|scope| scope.meta().clone())
326}
327
328/// Returns the node number.
329///
330/// # Panics
331/// This function will panic if called ouside the actor system.
332#[instability::unstable]
333#[inline]
334pub fn node_no() -> NodeNo {
335    with(|scope| scope.node_no())
336}
337
338/// Returns the node number if inside the actor system.
339#[instability::unstable]
340#[inline]
341pub fn try_node_no() -> Option<NodeNo> {
342    try_with(|scope| scope.node_no())
343}
344
thread_local! {
    // The serde mode of the current thread, `Normal` by default.
    // Temporarily replaced by `with_serde_mode` and read by `serde_mode`.
    static SERDE_MODE: Cell<SerdeMode> = const { Cell::new(SerdeMode::Normal) };
}
348
/// A mode of (de)serialization.
/// Useful for altering (de)serialization behavior depending on the context.
///
/// The current mode is stored per thread: it is set for the duration of
/// a call by `with_serde_mode` and can be read using `serde_mode`.
#[instability::unstable]
#[derive(Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum SerdeMode {
    /// A default mode, regular ser/de calls.
    Normal,
    /// Serialization for dumping purposes.
    Dumping,
    /// Serialization for network purposes.
    Network,
}
362
363/// Sets the specified serde mode and runs the function.
364///
365/// # Panics
366/// If the provided function panics.
367#[instability::unstable]
368#[inline]
369pub fn with_serde_mode<R>(mode: SerdeMode, f: impl FnOnce() -> R) -> R {
370    // We use a guard here to restore the current serde mode even on panics.
371    struct Guard(SerdeMode);
372    impl Drop for Guard {
373        fn drop(&mut self) {
374            SERDE_MODE.with(|cell| cell.set(self.0));
375        }
376    }
377
378    let mode = SERDE_MODE.with(|cell| cell.replace(mode));
379    let _guard = Guard(mode);
380    f()
381}
382
383/// Returns the current serde mode.
384#[instability::unstable]
385#[inline]
386pub fn serde_mode() -> SerdeMode {
387    SERDE_MODE.with(Cell::get)
388}
389
// Checks that `with_serde_mode` switches the mode only for the duration of
// the call and restores the previous one afterwards, including on panics.
#[test]
fn serde_mode_works() {
    #[derive(serde::Serialize)]
    struct Sample {
        #[serde(serialize_with = "crate::dumping::hide")]
        f: u32,
    }

    let sample = Sample { f: 42 };
    let to_json = || serde_json::to_string(&sample).unwrap();

    // Outside of `with_serde_mode` the `Normal` mode is used.
    assert_eq!(to_json(), r#"{"f":42}"#);

    // Inside the `Dumping` mode the field is hidden.
    let dumped = with_serde_mode(SerdeMode::Dumping, to_json);
    assert_eq!(dumped, r#"{"f":"<hidden>"}"#);

    // The `Normal` mode is back once the call has returned.
    assert_eq!(to_json(), r#"{"f":42}"#);

    // The `Normal` mode must be restored after a panic as well.
    let outcome = std::panic::catch_unwind(|| {
        with_serde_mode(SerdeMode::Dumping, || panic!("oops"))
    });
    assert!(outcome.is_err());
    assert_eq!(to_json(), r#"{"f":42}"#);
}