1#![allow(clippy::declare_interior_mutable_const)] use std::{
4 cell::Cell,
5 future::Future,
6 sync::{
7 atomic::{AtomicUsize, Ordering},
8 Arc,
9 },
10};
11
12use crate::{
13 actor::ActorMeta,
14 addr::Addr,
15 config::SystemConfig,
16 dumping::DumpingControl,
17 logging::_priv::LoggingControl,
18 permissions::{AtomicPermissions, Permissions},
19 telemetry::TelemetryConfig,
20 tracing::TraceId,
21};
22
23tokio::task_local! {
24 static SCOPE: Scope;
25}
26
27#[derive(Clone)]
28pub struct Scope {
29 trace_id: Cell<TraceId>,
30 actor: Arc<ScopeActorShared>,
31 group: Arc<ScopeGroupShared>,
32}
33
34assert_impl_all!(Scope: Send);
35assert_not_impl_all!(Scope: Sync);
36
37impl Scope {
38 #[doc(hidden)]
40 pub fn test(actor: Addr, meta: Arc<ActorMeta>) -> Self {
41 Self::new(
42 TraceId::generate(),
43 actor,
44 meta,
45 Arc::new(ScopeGroupShared::new(Addr::NULL)),
46 )
47 }
48
49 pub(crate) fn new(
50 trace_id: TraceId,
51 addr: Addr,
52 meta: Arc<ActorMeta>,
53 group: Arc<ScopeGroupShared>,
54 ) -> Self {
55 Self {
56 trace_id: Cell::new(trace_id),
57 actor: Arc::new(ScopeActorShared::new(addr, meta)),
58 group,
59 }
60 }
61
62 pub(crate) fn with_telemetry(mut self, config: &TelemetryConfig) -> Self {
63 self.actor = Arc::new(self.actor.with_telemetry(config));
64 self
65 }
66
67 #[inline]
68 #[deprecated(note = "use `actor()` instead")]
69 pub fn addr(&self) -> Addr {
70 self.actor()
71 }
72
73 #[inline]
74 pub fn actor(&self) -> Addr {
75 self.actor.addr
76 }
77
78 #[inline]
79 pub fn group(&self) -> Addr {
80 self.group.addr
81 }
82
83 #[inline]
85 pub fn meta(&self) -> &Arc<ActorMeta> {
86 &self.actor.meta
87 }
88
89 #[inline]
91 #[stability::unstable]
92 #[doc(hidden)]
93 pub fn telemetry_meta(&self) -> &Arc<ActorMeta> {
94 &self.actor.telemetry_meta
95 }
96
97 #[inline]
99 pub fn trace_id(&self) -> TraceId {
100 self.trace_id.get()
101 }
102
103 #[inline]
105 pub fn set_trace_id(&self, trace_id: TraceId) {
106 self.trace_id.set(trace_id);
107 }
108
109 #[inline]
111 pub fn permissions(&self) -> Permissions {
112 self.group.permissions.load()
113 }
114
115 #[inline]
117 #[stability::unstable]
118 #[doc(hidden)]
119 pub fn logging(&self) -> &LoggingControl {
120 &self.group.logging
121 }
122
123 #[inline]
125 #[stability::unstable]
126 #[doc(hidden)]
127 pub fn dumping(&self) -> &DumpingControl {
128 &self.group.dumping
129 }
130
131 #[doc(hidden)]
132 #[stability::unstable]
133 pub fn increment_allocated_bytes(&self, by: usize) {
134 self.actor.allocated_bytes.fetch_add(by, Ordering::Relaxed);
135 }
136
137 #[doc(hidden)]
138 #[stability::unstable]
139 pub fn increment_deallocated_bytes(&self, by: usize) {
140 self.actor
141 .deallocated_bytes
142 .fetch_add(by, Ordering::Relaxed);
143 }
144
145 pub(crate) fn take_allocated_bytes(&self) -> usize {
146 self.actor.allocated_bytes.swap(0, Ordering::Relaxed)
147 }
148
149 pub(crate) fn take_deallocated_bytes(&self) -> usize {
150 self.actor.deallocated_bytes.swap(0, Ordering::Relaxed)
151 }
152
153 pub async fn within<F: Future>(self, f: F) -> F::Output {
155 SCOPE.scope(self, f).await
156 }
157
158 pub fn sync_within<R>(self, f: impl FnOnce() -> R) -> R {
160 SCOPE.sync_scope(self, f)
161 }
162}
163
164struct ScopeActorShared {
165 addr: Addr,
166 meta: Arc<ActorMeta>,
167 telemetry_meta: Arc<ActorMeta>,
168 allocated_bytes: AtomicUsize,
169 deallocated_bytes: AtomicUsize,
170}
171
172impl ScopeActorShared {
173 fn new(addr: Addr, meta: Arc<ActorMeta>) -> Self {
174 Self {
175 addr,
176 meta: meta.clone(),
177 telemetry_meta: meta,
178 allocated_bytes: AtomicUsize::new(0),
179 deallocated_bytes: AtomicUsize::new(0),
180 }
181 }
182
183 fn with_telemetry(&self, config: &TelemetryConfig) -> Self {
184 Self {
185 addr: self.addr,
186 meta: self.meta.clone(),
187 telemetry_meta: config
188 .per_actor_key
189 .key(&self.meta.key)
190 .map(|key| {
191 Arc::new(ActorMeta {
192 group: self.meta.group.clone(),
193 key,
194 })
195 })
196 .unwrap_or_else(|| self.meta.clone()),
197 allocated_bytes: AtomicUsize::new(0),
198 deallocated_bytes: AtomicUsize::new(0),
199 }
200 }
201}
202
203assert_impl_all!(ScopeGroupShared: Send, Sync);
204
205pub(crate) struct ScopeGroupShared {
206 addr: Addr,
207 permissions: AtomicPermissions,
208 logging: LoggingControl,
209 dumping: DumpingControl,
210}
211
212assert_impl_all!(ScopeGroupShared: Send, Sync);
213
214impl ScopeGroupShared {
215 pub(crate) fn new(addr: Addr) -> Self {
216 Self {
217 addr,
218 permissions: Default::default(), logging: Default::default(),
220 dumping: Default::default(),
221 }
222 }
223
224 pub(crate) fn configure(&self, config: &SystemConfig) {
225 self.logging.configure(&config.logging);
227 let max_level = self.logging.max_level_hint().into_level();
228
229 self.dumping.configure(&config.dumping);
231
232 let mut perm = self.permissions.load();
234 perm.set_logging_enabled(max_level);
235 perm.set_dumping_enabled(!config.dumping.disabled);
236 perm.set_telemetry_per_actor_group_enabled(config.telemetry.per_actor_group);
237 perm.set_telemetry_per_actor_key_enabled(config.telemetry.per_actor_key.is_enabled());
238 self.permissions.store(perm);
239 }
240}
241
242pub fn expose() -> Scope {
247 SCOPE.with(Clone::clone)
248}
249
250pub fn try_expose() -> Option<Scope> {
252 SCOPE.try_with(Clone::clone).ok()
253}
254
255#[inline]
260pub fn with<R>(f: impl FnOnce(&Scope) -> R) -> R {
261 try_with(f).expect("cannot access a scope outside the actor system")
262}
263
264#[inline]
269pub fn try_with<R>(f: impl FnOnce(&Scope) -> R) -> Option<R> {
270 SCOPE.try_with(|scope| f(scope)).ok()
271}
272
273#[inline]
278pub fn trace_id() -> TraceId {
279 with(Scope::trace_id)
280}
281
282#[inline]
284pub fn try_trace_id() -> Option<TraceId> {
285 try_with(Scope::trace_id)
286}
287
288#[inline]
293pub fn set_trace_id(trace_id: TraceId) {
294 with(|scope| scope.set_trace_id(trace_id));
295}
296
297#[inline]
302pub fn try_set_trace_id(trace_id: TraceId) -> bool {
303 try_with(|scope| scope.set_trace_id(trace_id)).is_some()
304}
305
306#[inline]
311pub fn meta() -> Arc<ActorMeta> {
312 with(|scope| scope.meta().clone())
313}
314
315#[inline]
317pub fn try_meta() -> Option<Arc<ActorMeta>> {
318 try_with(|scope| scope.meta().clone())
319}
320
321thread_local! {
322 static SERDE_MODE: Cell<SerdeMode> = Cell::new(SerdeMode::Normal);
323}
324
325#[stability::unstable]
328#[derive(Clone, Copy, PartialEq, Eq)]
329#[non_exhaustive]
330pub enum SerdeMode {
331 Normal,
333 Dumping,
335 }
337
338#[stability::unstable]
343#[inline]
344pub fn with_serde_mode<R>(mode: SerdeMode, f: impl FnOnce() -> R) -> R {
345 struct Guard(SerdeMode);
347 impl Drop for Guard {
348 fn drop(&mut self) {
349 SERDE_MODE.with(|cell| cell.set(self.0));
350 }
351 }
352
353 let mode = SERDE_MODE.with(|cell| cell.replace(mode));
354 let _guard = Guard(mode);
355 f()
356}
357
358#[stability::unstable]
360#[inline]
361pub fn serde_mode() -> SerdeMode {
362 SERDE_MODE.with(Cell::get)
363}
364
365#[test]
366fn serde_mode_works() {
367 #[derive(serde::Serialize)]
368 struct S {
369 #[serde(serialize_with = "crate::dumping::hide")]
370 f: u32,
371 }
372
373 let value = S { f: 42 };
374
375 assert_eq!(serde_json::to_string(&value).unwrap(), r#"{"f":42}"#);
377
378 let json = with_serde_mode(SerdeMode::Dumping, || {
380 serde_json::to_string(&value).unwrap()
381 });
382 assert_eq!(json, r#"{"f":"<hidden>"}"#);
383
384 assert_eq!(serde_json::to_string(&value).unwrap(), r#"{"f":42}"#);
386
387 let res = std::panic::catch_unwind(|| with_serde_mode(SerdeMode::Dumping, || panic!("oops")));
389 assert!(res.is_err());
390 assert_eq!(serde_json::to_string(&value).unwrap(), r#"{"f":42}"#);
391}