#![allow(clippy::declare_interior_mutable_const)]
use std::{
cell::Cell,
future::Future,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use crate::{
actor::ActorMeta,
addr::{Addr, NodeLaunchId, NodeNo},
config::SystemConfig,
dumping::DumpingControl,
logging::LoggingControl,
permissions::{AtomicPermissions, Permissions},
telemetry::config::TelemetryConfig,
tracing::TraceId,
};
// Task-local slot holding the `Scope` of the currently running task.
// It is installed by `Scope::within` / `Scope::sync_within` and read by the
// free accessor functions of this file (`with`, `try_with`, `expose`, ...).
tokio::task_local! {
static SCOPE: Scope;
}
/// The scope of an actor: the current trace id plus handles to the
/// actor-level and group-level shared state.
///
/// Cloning copies the current trace id into a fresh `Cell` and shares the
/// `actor` and `group` data through their `Arc`s.
#[derive(Clone)]
pub struct Scope {
// Current trace id; the `Cell` lets `set_trace_id` update it through `&self`.
trace_id: Cell<TraceId>,
// Address, meta and allocation counters of the actor.
actor: Arc<ScopeActorShared>,
// Node identity, group address, permissions, logging and dumping controls.
group: Arc<ScopeGroupShared>,
}
// A scope may be moved to another thread, but must not be shared by reference
// between threads (the `Cell` field is not `Sync`).
assert_impl_all!(Scope: Send);
assert_not_impl_all!(Scope: Sync);
impl Scope {
    /// Builds a standalone scope with placeholder group data.
    ///
    /// The group part uses a node number made from `u16::MAX`, a launch id
    /// made from `u64::MAX` and `Addr::NULL` as the group address; the trace
    /// id is freshly generated. Judging by the name, this is meant for tests.
    #[doc(hidden)]
    pub fn test(actor: Addr, meta: Arc<ActorMeta>) -> Self {
        let group = Arc::new(ScopeGroupShared::new(
            NodeNo::from_bits(u16::MAX).unwrap(),
            NodeLaunchId::from_bits(u64::MAX).unwrap(),
            Addr::NULL,
        ));
        Self::new(TraceId::generate(), actor, meta, group)
    }

    /// Creates a scope for the actor at `addr` described by `meta`,
    /// attached to the already existing group state `group`.
    pub(crate) fn new(
        trace_id: TraceId,
        addr: Addr,
        meta: Arc<ActorMeta>,
        group: Arc<ScopeGroupShared>,
    ) -> Self {
        let trace_id = Cell::new(trace_id);
        let actor = Arc::new(ScopeActorShared::new(addr, meta));
        Self {
            trace_id,
            actor,
            group,
        }
    }

    /// Replaces the actor-level state with one whose telemetry meta is
    /// derived from `config`. The trace id and the group state are kept.
    pub(crate) fn with_telemetry(self, config: &TelemetryConfig) -> Self {
        let actor = Arc::new(self.actor.with_telemetry(config));
        Self { actor, ..self }
    }

    /// Returns the address of the actor.
    #[inline]
    pub fn actor(&self) -> Addr {
        self.actor.addr
    }

    /// Returns the address of the actor's group.
    #[inline]
    pub fn group(&self) -> Addr {
        self.group.addr
    }

    /// Returns the node number stored in the group state.
    #[instability::unstable]
    #[inline]
    pub fn node_no(&self) -> NodeNo {
        self.group.node_no
    }

    /// Returns the node launch id stored in the group state.
    #[instability::stable(since = "v0.2.0")]
    #[inline]
    pub fn node_launch_id(&self) -> NodeLaunchId {
        self.group.node_launch_id
    }

    /// Returns the meta of the actor.
    #[inline]
    pub fn meta(&self) -> &Arc<ActorMeta> {
        &self.actor.meta
    }

    /// Returns the meta used for telemetry.
    ///
    /// It is the same `Arc` as [`Scope::meta`] unless `with_telemetry`
    /// produced a dedicated one.
    #[inline]
    #[instability::unstable]
    #[doc(hidden)]
    pub fn telemetry_meta(&self) -> &Arc<ActorMeta> {
        &self.actor.telemetry_meta
    }

    /// Returns the current trace id.
    #[inline]
    pub fn trace_id(&self) -> TraceId {
        self.trace_id.get()
    }

    /// Replaces the current trace id.
    #[inline]
    pub fn set_trace_id(&self, trace_id: TraceId) {
        self.trace_id.set(trace_id);
    }

    /// Loads the current permissions of the actor's group.
    #[inline]
    pub fn permissions(&self) -> Permissions {
        self.group.permissions.load()
    }

    /// Returns the logging control of the actor's group.
    #[doc(hidden)]
    #[instability::unstable]
    #[inline]
    pub fn logging(&self) -> &LoggingControl {
        &self.group.logging
    }

    /// Returns the dumping control of the actor's group.
    #[doc(hidden)]
    #[instability::unstable]
    #[inline]
    pub fn dumping(&self) -> &DumpingControl {
        &self.group.dumping
    }

    /// Adds `by` to the actor's counter of allocated bytes.
    #[doc(hidden)]
    #[instability::unstable]
    pub fn increment_allocated_bytes(&self, by: usize) {
        let counter = &self.actor.allocated_bytes;
        counter.fetch_add(by, Ordering::Relaxed);
    }

    /// Adds `by` to the actor's counter of deallocated bytes.
    #[doc(hidden)]
    #[instability::unstable]
    pub fn increment_deallocated_bytes(&self, by: usize) {
        let counter = &self.actor.deallocated_bytes;
        counter.fetch_add(by, Ordering::Relaxed);
    }

    /// Returns the accumulated allocated-bytes counter and resets it to zero.
    pub(crate) fn take_allocated_bytes(&self) -> usize {
        let counter = &self.actor.allocated_bytes;
        counter.swap(0, Ordering::Relaxed)
    }

    /// Returns the accumulated deallocated-bytes counter and resets it to zero.
    pub(crate) fn take_deallocated_bytes(&self) -> usize {
        let counter = &self.actor.deallocated_bytes;
        counter.swap(0, Ordering::Relaxed)
    }

    /// Awaits the future `f` with this scope installed as the task-local scope.
    pub async fn within<F>(self, f: F) -> F::Output
    where
        F: Future,
    {
        SCOPE.scope(self, f).await
    }

    /// Calls the closure `f` with this scope installed as the task-local scope.
    pub fn sync_within<R>(self, f: impl FnOnce() -> R) -> R {
        SCOPE.sync_scope(self, f)
    }
}
// Actor-level state shared (through an `Arc`) by all clones of a `Scope`.
struct ScopeActorShared {
// Address of the actor.
addr: Addr,
// Meta of the actor.
meta: Arc<ActorMeta>,
// Meta used for telemetry; starts as the same `Arc` as `meta` and may be
// replaced by `with_telemetry`.
telemetry_meta: Arc<ActorMeta>,
// Byte counters bumped by `Scope::increment_*_bytes` and drained (reset to
// zero) by `Scope::take_*_bytes`.
allocated_bytes: AtomicUsize,
deallocated_bytes: AtomicUsize,
}
impl ScopeActorShared {
    /// Creates the actor-level state with zeroed byte counters.
    ///
    /// `meta` and `telemetry_meta` initially point to the same allocation.
    fn new(addr: Addr, meta: Arc<ActorMeta>) -> Self {
        let telemetry_meta = Arc::clone(&meta);
        Self {
            addr,
            meta,
            telemetry_meta,
            allocated_bytes: AtomicUsize::new(0),
            deallocated_bytes: AtomicUsize::new(0),
        }
    }

    /// Produces a new state with the same address and meta, but with the
    /// telemetry meta recomputed from `config`.
    ///
    /// If `config.per_actor_key` yields a key for the actor's key, a new meta
    /// with the same group and that key is used; otherwise the actor's own
    /// meta is reused. The byte counters of the result start from zero, they
    /// are not carried over from `self`.
    fn with_telemetry(&self, config: &TelemetryConfig) -> Self {
        let telemetry_meta = match config.per_actor_key.key(&self.meta.key) {
            Some(key) => Arc::new(ActorMeta {
                group: self.meta.group.clone(),
                key,
            }),
            None => self.meta.clone(),
        };

        Self {
            addr: self.addr,
            meta: self.meta.clone(),
            telemetry_meta,
            allocated_bytes: AtomicUsize::new(0),
            deallocated_bytes: AtomicUsize::new(0),
        }
    }
}
/// Group-level state shared (through an `Arc`) by the scopes of a group.
pub(crate) struct ScopeGroupShared {
    // Number of the node.
    node_no: NodeNo,
    // Launch id of the node.
    node_launch_id: NodeLaunchId,
    // Address of the group.
    addr: Addr,
    // Permissions, updated from the system config in `configure`.
    permissions: AtomicPermissions,
    // Logging control, updated from the system config in `configure`.
    logging: LoggingControl,
    // Dumping control, updated from the system config in `configure`.
    dumping: DumpingControl,
}
// The state is shared behind an `Arc`, so it must be usable from several
// threads. (The same assertion used to be repeated before the struct too.)
assert_impl_all!(ScopeGroupShared: Send, Sync);
impl ScopeGroupShared {
    /// Creates the group-level state with default permissions and default
    /// logging and dumping controls.
    pub(crate) fn new(node_no: NodeNo, node_launch_id: NodeLaunchId, addr: Addr) -> Self {
        Self {
            node_no,
            node_launch_id,
            addr,
            permissions: Default::default(),
            logging: Default::default(),
            dumping: Default::default(),
        }
    }

    /// Applies the system config to the group.
    ///
    /// First reconfigures the logging and dumping controls, then reloads the
    /// permissions, updates the logging, dumping and telemetry flags from
    /// `config` and stores the result back.
    pub(crate) fn configure(&self, config: &SystemConfig) {
        let logging = &config.logging;
        let dumping = &config.dumping;
        let telemetry = &config.telemetry;

        // Update the controls.
        self.logging.configure(logging);
        self.dumping.configure(dumping);

        // Update the permissions: load, modify, store.
        let mut permissions = self.permissions.load();
        permissions.set_logging_enabled(logging.max_level.into());
        permissions.set_dumping_enabled(!dumping.disabled);
        permissions.set_telemetry_per_actor_group_enabled(telemetry.per_actor_group);
        permissions.set_telemetry_per_actor_key_enabled(telemetry.per_actor_key.is_enabled());
        self.permissions.store(permissions);
    }
}
/// Returns a clone of the current task-local scope.
///
/// # Panics
///
/// Panics if no scope is set for the current task.
pub fn expose() -> Scope {
    SCOPE.with(|scope| scope.clone())
}
/// Returns a clone of the current task-local scope,
/// or `None` if the scope cannot be accessed.
pub fn try_expose() -> Option<Scope> {
    let current = SCOPE.try_with(|scope| scope.clone());
    current.ok()
}
/// Runs `f` with a reference to the current scope and returns its result.
///
/// # Panics
///
/// Panics if the scope cannot be accessed. See [`try_with`] for a
/// non-panicking variant.
#[inline]
pub fn with<R>(f: impl FnOnce(&Scope) -> R) -> R {
    let outcome = try_with(f);
    outcome.expect("cannot access a scope outside the actor system")
}
/// Runs `f` with a reference to the current scope and returns its result,
/// or `None` (without calling `f`) if the scope cannot be accessed.
#[inline]
pub fn try_with<R>(f: impl FnOnce(&Scope) -> R) -> Option<R> {
    SCOPE.try_with(f).ok()
}
/// Returns the trace id of the current scope.
///
/// # Panics
///
/// Panics if the scope cannot be accessed.
#[inline]
pub fn trace_id() -> TraceId {
    with(|scope| scope.trace_id())
}
/// Returns the trace id of the current scope,
/// or `None` if the scope cannot be accessed.
#[inline]
pub fn try_trace_id() -> Option<TraceId> {
    try_with(|scope| scope.trace_id())
}
/// Replaces the trace id of the current scope.
///
/// # Panics
///
/// Panics if the scope cannot be accessed.
#[inline]
pub fn set_trace_id(trace_id: TraceId) {
    with(|scope| scope.set_trace_id(trace_id))
}
/// Replaces the trace id of the current scope.
///
/// Returns `true` if the trace id has been set and `false` if the scope
/// cannot be accessed.
#[inline]
pub fn try_set_trace_id(trace_id: TraceId) -> bool {
    let applied = try_with(|scope| scope.set_trace_id(trace_id));
    applied.is_some()
}
/// Returns the actor meta of the current scope (a cheap `Arc` clone).
///
/// # Panics
///
/// Panics if the scope cannot be accessed.
#[inline]
pub fn meta() -> Arc<ActorMeta> {
    with(|scope| Arc::clone(scope.meta()))
}
/// Returns the actor meta of the current scope (a cheap `Arc` clone),
/// or `None` if the scope cannot be accessed.
#[inline]
pub fn try_meta() -> Option<Arc<ActorMeta>> {
    try_with(|scope| Arc::clone(scope.meta()))
}
#[instability::unstable]
#[inline]
pub fn node_no() -> NodeNo {
with(|scope| scope.node_no())
}
#[instability::unstable]
#[inline]
pub fn try_node_no() -> Option<NodeNo> {
try_with(|scope| scope.node_no())
}
// The serde mode of the current thread; every thread starts in
// `SerdeMode::Normal`. Changed by `with_serde_mode`, read by `serde_mode`.
thread_local! {
static SERDE_MODE: Cell<SerdeMode> = const { Cell::new(SerdeMode::Normal) };
}
/// A per-thread mode that serialization code can query via `serde_mode()`.
///
/// It is switched for the duration of a closure by `with_serde_mode()`.
#[instability::unstable]
#[derive(Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum SerdeMode {
/// The initial mode of every thread.
Normal,
// NOTE(review): presumably active while messages are serialized for dumping;
// the test in this file shows `crate::dumping::hide` emitting `"<hidden>"`
// in this mode. Confirm against the dumping module.
Dumping,
// NOTE(review): presumably active while messages are serialized for sending
// to another node; not exercised in this file, confirm against callers.
Network,
}
/// Runs `f` with the serde mode of the current thread set to `mode`
/// and returns the result of `f`.
///
/// The mode that was active before the call is restored afterwards,
/// even if `f` panics.
#[instability::unstable]
#[inline]
pub fn with_serde_mode<R>(mode: SerdeMode, f: impl FnOnce() -> R) -> R {
    /// Puts the remembered mode back when dropped, so that the restoration
    /// also happens during unwinding.
    struct RestoreOnDrop(SerdeMode);

    impl Drop for RestoreOnDrop {
        fn drop(&mut self) {
            let previous = self.0;
            SERDE_MODE.with(|current| current.set(previous));
        }
    }

    // Install the requested mode and remember the one it replaces.
    let previous = SERDE_MODE.with(|current| current.replace(mode));
    let _restore = RestoreOnDrop(previous);
    f()
}
#[instability::unstable]
#[inline]
pub fn serde_mode() -> SerdeMode {
SERDE_MODE.with(Cell::get)
}
/// Checks that `with_serde_mode` changes the output of a field serialized
/// with `crate::dumping::hide` only for the duration of the closure, and that
/// the previous mode is restored both on normal return and on panic.
#[test]
fn serde_mode_works() {
    #[derive(serde::Serialize)]
    struct S {
        #[serde(serialize_with = "crate::dumping::hide")]
        f: u32,
    }

    // The output expected whenever the mode is back to `Normal`.
    const PLAIN: &str = r#"{"f":42}"#;

    let value = S { f: 42 };
    let to_json = || serde_json::to_string(&value).unwrap();

    // Outside of `with_serde_mode` the value is visible.
    assert_eq!(to_json(), PLAIN);

    // Inside the dumping mode the value is hidden.
    let json = with_serde_mode(SerdeMode::Dumping, to_json);
    assert_eq!(json, r#"{"f":"<hidden>"}"#);

    // The previous mode is restored after a normal return.
    assert_eq!(to_json(), PLAIN);

    // The previous mode is restored after a panic as well.
    let res = std::panic::catch_unwind(|| with_serde_mode(SerdeMode::Dumping, || panic!("oops")));
    assert!(res.is_err());
    assert_eq!(to_json(), PLAIN);
}