Skip to main content

atomr_telemetry/
lib.rs

1//! # atomr-telemetry
2//!
3//! Optional probe surface for observing a running `atomr` node.
4//!
5//! The crate is passive and opt-in. Construct a [`TelemetryExtension`] and
6//! register it on an [`atomr_core::actor::ActorSystem`] via
7//! `atomr_core::actor::Extensions`. Subsystems check for the extension
8//! at runtime (cheap `Arc<T>` lookup) and, when present, emit snapshots +
9//! events into the telemetry [`bus::TelemetryBus`]. When absent, there is
10//! no cost beyond a single `DashMap` lookup.
11//!
12//! See [`crate::exporters`] for the Prometheus / OpenTelemetry exporters
13//! gated behind cargo features.
14
15#![forbid(unsafe_code)]
16#![deny(rust_2018_idioms)]
17
18pub mod actor_registry;
19pub mod bus;
20pub mod cluster;
21pub mod ddata;
22pub mod dead_letters;
23pub mod dto;
24pub mod exporters;
25pub mod persistence;
26pub mod remote;
27pub mod sharding;
28pub mod streams;
29
30use std::sync::Arc;
31
32use atomr_core::actor::ActorSystem;
33use parking_lot::RwLock;
34
35use crate::actor_registry::ActorRegistry;
36use crate::bus::TelemetryBus;
37use crate::dead_letters::DeadLetterFeed;
38
39/// The telemetry extension. Construct once per actor system, register via
40/// the actor system's extensions, and all other probes will pick it up.
41pub struct TelemetryExtension {
42    pub node: String,
43    pub bus: TelemetryBus,
44    pub actors: Arc<ActorRegistry>,
45    pub dead_letters: Arc<DeadLetterFeed>,
46    pub cluster: Arc<cluster::ClusterProbe>,
47    pub sharding: Arc<sharding::ShardingProbe>,
48    pub persistence: Arc<persistence::PersistenceProbe>,
49    pub remote: Arc<remote::RemoteProbe>,
50    pub streams: Arc<streams::StreamsProbe>,
51    pub ddata: Arc<ddata::DDataProbe>,
52    pub(crate) exporters: RwLock<Vec<Arc<dyn exporters::Exporter>>>,
53}
54
55impl TelemetryExtension {
56    /// Build a telemetry extension for the given node name. Channel
57    /// capacity controls how many in-flight `TelemetryEvent`s the broadcast
58    /// bus will buffer per subscriber.
59    pub fn new(node: impl Into<String>, channel_capacity: usize) -> Arc<Self> {
60        let bus = TelemetryBus::new(channel_capacity);
61        Arc::new(Self {
62            node: node.into(),
63            actors: Arc::new(ActorRegistry::new(bus.clone())),
64            dead_letters: Arc::new(DeadLetterFeed::new(bus.clone(), 1024)),
65            cluster: Arc::new(cluster::ClusterProbe::new(bus.clone())),
66            sharding: Arc::new(sharding::ShardingProbe::new(bus.clone())),
67            persistence: Arc::new(persistence::PersistenceProbe::new(bus.clone())),
68            remote: Arc::new(remote::RemoteProbe::new(bus.clone())),
69            streams: Arc::new(streams::StreamsProbe::new(bus.clone())),
70            ddata: Arc::new(ddata::DDataProbe::new(bus.clone())),
71            bus,
72            exporters: RwLock::new(Vec::new()),
73        })
74    }
75
76    /// Install this extension on the given `ActorSystem`. Returns a clone
77    /// of the shared `Arc<TelemetryExtension>`; the caller may keep it to
78    /// feed probes directly from application code.
79    pub fn install(self: Arc<Self>, system: &ActorSystem) -> Arc<Self> {
80        system.extensions().register(TelemetryHandle(self.clone()));
81        system.set_spawn_observer(self.actors.clone());
82        system.set_dead_letter_observer(self.dead_letters.clone());
83        self
84    }
85
86    /// Look up an installed extension on an `ActorSystem`.
87    pub fn from_system(system: &ActorSystem) -> Option<Arc<Self>> {
88        system.extensions().get::<TelemetryHandle>().map(|h| h.0.clone())
89    }
90
91    /// Register an exporter. Exporters receive every event published to
92    /// the bus and may poll probes for snapshots on their own cadence.
93    pub fn add_exporter(&self, exporter: Arc<dyn exporters::Exporter>) {
94        self.bus.attach_exporter(exporter.clone());
95        self.exporters.write().push(exporter);
96    }
97
98    /// Snapshot the full telemetry state of this node (one JSON payload).
99    pub fn snapshot(&self) -> dto::NodeSnapshot {
100        dto::NodeSnapshot {
101            node: self.node.clone(),
102            generated_at: chrono::Utc::now().to_rfc3339(),
103            actors: self.actors.snapshot(),
104            dead_letters: self.dead_letters.recent(100),
105            cluster: self.cluster.snapshot(),
106            sharding: self.sharding.snapshot(),
107            persistence: self.persistence.snapshot(),
108            remote: self.remote.snapshot(),
109            streams: self.streams.snapshot(),
110            ddata: self.ddata.snapshot(),
111        }
112    }
113}
114
/// Newtype wrapper around the shared `Arc<TelemetryExtension>`.
///
/// It gives the extension a stable, dedicated type in the actor system's
/// typed `Extensions` bag: [`TelemetryExtension::install`] registers a
/// value of this type, and [`TelemetryExtension::from_system`] looks it up
/// by this type and clones the inner `Arc`.
pub struct TelemetryHandle(pub Arc<TelemetryExtension>);