1#![allow(clippy::declare_interior_mutable_const)] use std::{
4 cell::Cell,
5 future::Future,
6 sync::{
7 atomic::{AtomicUsize, Ordering},
8 Arc,
9 },
10};
11
12use crate::{
13 actor::ActorMeta,
14 addr::{Addr, NodeLaunchId, NodeNo},
15 config::SystemConfig,
16 dumping::DumpingControl,
17 logging::LoggingControl,
18 permissions::{AtomicPermissions, Permissions},
19 telemetry::config::TelemetryConfig,
20 tracing::TraceId,
21};
22
tokio::task_local! {
    // The scope of the currently running task. It is installed by
    // `Scope::within` / `Scope::sync_within` and read by the free functions
    // of this module (`with`, `try_with`, `expose`, ...).
    static SCOPE: Scope;
}
26
/// The context attached to a task: the current trace ID plus handles to the
/// `ScopeActorShared` and `ScopeGroupShared` parts.
///
/// Cloning copies the trace ID and shares both `Arc`-ed parts.
#[derive(Clone)]
pub struct Scope {
    // `Cell` lets `set_trace_id` work through `&self`; it is also the reason
    // why `Scope` is not `Sync`.
    trace_id: Cell<TraceId>,
    // Actor address, meta and allocation counters.
    actor: Arc<ScopeActorShared>,
    // Node identity, group address, permissions and logging/dumping controls.
    group: Arc<ScopeGroupShared>,
}

// A scope can be moved to another thread, but not shared by reference.
assert_impl_all!(Scope: Send);
assert_not_impl_all!(Scope: Sync);
36
37impl Scope {
38 #[doc(hidden)]
40 pub fn test(actor: Addr, meta: Arc<ActorMeta>) -> Self {
41 let node_no = NodeNo::from_bits(u16::MAX).unwrap();
42 let node_launch_id = NodeLaunchId::from_bits(u64::MAX).unwrap();
43 let group_scope = Arc::new(ScopeGroupShared::new(node_no, node_launch_id, Addr::NULL));
44 Self::new(TraceId::generate(), actor, meta, group_scope)
45 }
46
47 pub(crate) fn new(
48 trace_id: TraceId,
49 addr: Addr,
50 meta: Arc<ActorMeta>,
51 group: Arc<ScopeGroupShared>,
52 ) -> Self {
53 Self {
54 trace_id: Cell::new(trace_id),
55 actor: Arc::new(ScopeActorShared::new(addr, meta)),
56 group,
57 }
58 }
59
60 pub(crate) fn with_telemetry(mut self, config: &TelemetryConfig) -> Self {
61 self.actor = Arc::new(self.actor.with_telemetry(config));
62 self
63 }
64
65 #[inline]
66 pub fn actor(&self) -> Addr {
67 self.actor.addr
68 }
69
70 #[inline]
71 pub fn group(&self) -> Addr {
72 self.group.addr
73 }
74
75 #[instability::unstable]
76 #[inline]
77 pub fn node_no(&self) -> NodeNo {
78 self.group.node_no
79 }
80
81 #[instability::stable(since = "v0.2.0")]
82 #[inline]
83 pub fn node_launch_id(&self) -> NodeLaunchId {
84 self.group.node_launch_id
85 }
86
87 #[inline]
89 pub fn meta(&self) -> &Arc<ActorMeta> {
90 &self.actor.meta
91 }
92
93 #[inline]
95 #[instability::unstable]
96 #[doc(hidden)]
97 pub fn telemetry_meta(&self) -> &Arc<ActorMeta> {
98 &self.actor.telemetry_meta
99 }
100
101 #[inline]
103 pub fn trace_id(&self) -> TraceId {
104 self.trace_id.get()
105 }
106
107 #[inline]
109 pub fn set_trace_id(&self, trace_id: TraceId) {
110 self.trace_id.set(trace_id);
111 }
112
113 #[inline]
115 pub fn permissions(&self) -> Permissions {
116 self.group.permissions.load()
117 }
118
119 #[doc(hidden)]
121 #[instability::unstable]
122 #[inline]
123 pub fn logging(&self) -> &LoggingControl {
124 &self.group.logging
125 }
126
127 #[doc(hidden)]
129 #[instability::unstable]
130 #[inline]
131 pub fn dumping(&self) -> &DumpingControl {
132 &self.group.dumping
133 }
134
135 #[doc(hidden)]
136 #[instability::unstable]
137 pub fn increment_allocated_bytes(&self, by: usize) {
138 self.actor.allocated_bytes.fetch_add(by, Ordering::Relaxed);
139 }
140
141 #[doc(hidden)]
142 #[instability::unstable]
143 pub fn increment_deallocated_bytes(&self, by: usize) {
144 self.actor
145 .deallocated_bytes
146 .fetch_add(by, Ordering::Relaxed);
147 }
148
149 pub(crate) fn take_allocated_bytes(&self) -> usize {
150 self.actor.allocated_bytes.swap(0, Ordering::Relaxed)
151 }
152
153 pub(crate) fn take_deallocated_bytes(&self) -> usize {
154 self.actor.deallocated_bytes.swap(0, Ordering::Relaxed)
155 }
156
157 pub async fn within<F: Future>(self, f: F) -> F::Output {
159 SCOPE.scope(self, f).await
160 }
161
162 pub fn sync_within<R>(self, f: impl FnOnce() -> R) -> R {
164 SCOPE.sync_scope(self, f)
165 }
166}
167
// The part of a scope that is held behind an `Arc` and therefore shared by
// all clones of the same `Scope`.
struct ScopeActorShared {
    // The actor's address.
    addr: Addr,
    // The actor's meta.
    meta: Arc<ActorMeta>,
    // The meta used for telemetry; initially the same as `meta`, may get
    // another key in `with_telemetry`.
    telemetry_meta: Arc<ActorMeta>,
    // Byte counters, incremented via `Scope::increment_*_bytes` and reset to
    // zero by `Scope::take_*_bytes`.
    allocated_bytes: AtomicUsize,
    deallocated_bytes: AtomicUsize,
}
175
176impl ScopeActorShared {
177 fn new(addr: Addr, meta: Arc<ActorMeta>) -> Self {
178 Self {
179 addr,
180 meta: meta.clone(),
181 telemetry_meta: meta,
182 allocated_bytes: AtomicUsize::new(0),
183 deallocated_bytes: AtomicUsize::new(0),
184 }
185 }
186
187 fn with_telemetry(&self, config: &TelemetryConfig) -> Self {
188 Self {
189 addr: self.addr,
190 meta: self.meta.clone(),
191 telemetry_meta: config
192 .per_actor_key
193 .key(&self.meta.key)
194 .map(|key| {
195 Arc::new(ActorMeta {
196 group: self.meta.group.clone(),
197 key,
198 })
199 })
200 .unwrap_or_else(|| self.meta.clone()),
201 allocated_bytes: AtomicUsize::new(0),
202 deallocated_bytes: AtomicUsize::new(0),
203 }
204 }
205}
206
207assert_impl_all!(ScopeGroupShared: Send, Sync);
208
/// The part of a scope that is passed to `Scope::new` as an `Arc`:
/// node identity, the group's address and the controls updated by
/// `configure`.
pub(crate) struct ScopeGroupShared {
    node_no: NodeNo,
    node_launch_id: NodeLaunchId,
    // The group's address, returned by `Scope::group`.
    addr: Addr,
    // Loaded by `Scope::permissions`, rewritten by `configure`.
    permissions: AtomicPermissions,
    logging: LoggingControl,
    dumping: DumpingControl,
}

// The group part is held behind an `Arc`, so it must be thread-safe.
assert_impl_all!(ScopeGroupShared: Send, Sync);
219
220impl ScopeGroupShared {
221 pub(crate) fn new(node_no: NodeNo, node_launch_id: NodeLaunchId, addr: Addr) -> Self {
222 Self {
223 node_no,
224 node_launch_id,
225 addr,
226 permissions: Default::default(), logging: Default::default(),
228 dumping: Default::default(),
229 }
230 }
231
232 pub(crate) fn configure(&self, config: &SystemConfig) {
233 self.logging.configure(&config.logging);
235
236 self.dumping.configure(&config.dumping);
238
239 let mut perm = self.permissions.load();
241 perm.set_logging_enabled(config.logging.max_level.into());
242 perm.set_dumping_enabled(!config.dumping.disabled);
243 perm.set_telemetry_per_actor_group_enabled(config.telemetry.per_actor_group);
244 perm.set_telemetry_per_actor_key_enabled(config.telemetry.per_actor_key.is_enabled());
245 self.permissions.store(perm);
246 }
247}
248
249pub fn expose() -> Scope {
254 SCOPE.with(Clone::clone)
255}
256
257pub fn try_expose() -> Option<Scope> {
259 SCOPE.try_with(Clone::clone).ok()
260}
261
262#[inline]
267pub fn with<R>(f: impl FnOnce(&Scope) -> R) -> R {
268 try_with(f).expect("cannot access a scope outside the actor system")
269}
270
271#[inline]
276pub fn try_with<R>(f: impl FnOnce(&Scope) -> R) -> Option<R> {
277 SCOPE.try_with(|scope| f(scope)).ok()
278}
279
280#[inline]
285pub fn trace_id() -> TraceId {
286 with(Scope::trace_id)
287}
288
289#[inline]
291pub fn try_trace_id() -> Option<TraceId> {
292 try_with(Scope::trace_id)
293}
294
295#[inline]
300pub fn set_trace_id(trace_id: TraceId) {
301 with(|scope| scope.set_trace_id(trace_id));
302}
303
304#[inline]
309pub fn try_set_trace_id(trace_id: TraceId) -> bool {
310 try_with(|scope| scope.set_trace_id(trace_id)).is_some()
311}
312
313#[inline]
318pub fn meta() -> Arc<ActorMeta> {
319 with(|scope| scope.meta().clone())
320}
321
322#[inline]
324pub fn try_meta() -> Option<Arc<ActorMeta>> {
325 try_with(|scope| scope.meta().clone())
326}
327
328#[instability::unstable]
333#[inline]
334pub fn node_no() -> NodeNo {
335 with(|scope| scope.node_no())
336}
337
338#[instability::unstable]
340#[inline]
341pub fn try_node_no() -> Option<NodeNo> {
342 try_with(|scope| scope.node_no())
343}
344
thread_local! {
    // The serde mode of the current thread; `Normal` unless temporarily
    // replaced by `with_serde_mode`.
    static SERDE_MODE: Cell<SerdeMode> = const { Cell::new(SerdeMode::Normal) };
}
348
#[instability::unstable]
/// A mode of serialization that code can query via `serde_mode()` in order
/// to alter its behavior depending on the context.
#[derive(Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum SerdeMode {
    /// The initial mode of every thread.
    Normal,
    /// The mode in which `crate::dumping::hide` replaces a value with
    /// `"<hidden>"` (see the `serde_mode_works` test).
    Dumping,
    /// NOTE(review): presumably used while serializing for the network;
    /// nothing in this file sets or checks it, so confirm against callers.
    Network,
}
362
363#[instability::unstable]
368#[inline]
369pub fn with_serde_mode<R>(mode: SerdeMode, f: impl FnOnce() -> R) -> R {
370 struct Guard(SerdeMode);
372 impl Drop for Guard {
373 fn drop(&mut self) {
374 SERDE_MODE.with(|cell| cell.set(self.0));
375 }
376 }
377
378 let mode = SERDE_MODE.with(|cell| cell.replace(mode));
379 let _guard = Guard(mode);
380 f()
381}
382
383#[instability::unstable]
385#[inline]
386pub fn serde_mode() -> SerdeMode {
387 SERDE_MODE.with(Cell::get)
388}
389
#[test]
fn serde_mode_works() {
    #[derive(serde::Serialize)]
    struct S {
        #[serde(serialize_with = "crate::dumping::hide")]
        f: u32,
    }

    const PLAIN: &str = r#"{"f":42}"#;
    const HIDDEN: &str = r#"{"f":"<hidden>"}"#;

    let value = S { f: 42 };
    let render = || serde_json::to_string(&value).unwrap();

    // The field is visible in the default mode.
    assert_eq!(render(), PLAIN);

    // The field is hidden while the dumping mode is active.
    let dumped = with_serde_mode(SerdeMode::Dumping, || render());
    assert_eq!(dumped, HIDDEN);

    // The previous mode is restored after a normal return.
    assert_eq!(render(), PLAIN);

    // The previous mode is restored after a panic as well.
    let outcome =
        std::panic::catch_unwind(|| with_serde_mode(SerdeMode::Dumping, || panic!("oops")));
    assert!(outcome.is_err());
    assert_eq!(render(), PLAIN);
}