1#![forbid(unsafe_code)]
16#![deny(rust_2018_idioms)]
17
18pub mod actor_registry;
19pub mod bus;
20pub mod cluster;
21pub mod ddata;
22pub mod dead_letters;
23pub mod dto;
24pub mod exporters;
25pub mod persistence;
26pub mod remote;
27pub mod sharding;
28pub mod streams;
29
30use std::sync::Arc;
31
32use atomr_core::actor::ActorSystem;
33use parking_lot::RwLock;
34
35use crate::actor_registry::ActorRegistry;
36use crate::bus::TelemetryBus;
37use crate::dead_letters::DeadLetterFeed;
38
39pub struct TelemetryExtension {
42 pub node: String,
43 pub bus: TelemetryBus,
44 pub actors: Arc<ActorRegistry>,
45 pub dead_letters: Arc<DeadLetterFeed>,
46 pub cluster: Arc<cluster::ClusterProbe>,
47 pub sharding: Arc<sharding::ShardingProbe>,
48 pub persistence: Arc<persistence::PersistenceProbe>,
49 pub remote: Arc<remote::RemoteProbe>,
50 pub streams: Arc<streams::StreamsProbe>,
51 pub ddata: Arc<ddata::DDataProbe>,
52 pub(crate) exporters: RwLock<Vec<Arc<dyn exporters::Exporter>>>,
53}
54
55impl TelemetryExtension {
56 pub fn new(node: impl Into<String>, channel_capacity: usize) -> Arc<Self> {
60 let bus = TelemetryBus::new(channel_capacity);
61 Arc::new(Self {
62 node: node.into(),
63 actors: Arc::new(ActorRegistry::new(bus.clone())),
64 dead_letters: Arc::new(DeadLetterFeed::new(bus.clone(), 1024)),
65 cluster: Arc::new(cluster::ClusterProbe::new(bus.clone())),
66 sharding: Arc::new(sharding::ShardingProbe::new(bus.clone())),
67 persistence: Arc::new(persistence::PersistenceProbe::new(bus.clone())),
68 remote: Arc::new(remote::RemoteProbe::new(bus.clone())),
69 streams: Arc::new(streams::StreamsProbe::new(bus.clone())),
70 ddata: Arc::new(ddata::DDataProbe::new(bus.clone())),
71 bus,
72 exporters: RwLock::new(Vec::new()),
73 })
74 }
75
76 pub fn install(self: Arc<Self>, system: &ActorSystem) -> Arc<Self> {
80 system.extensions().register(TelemetryHandle(self.clone()));
81 system.set_spawn_observer(self.actors.clone());
82 system.set_dead_letter_observer(self.dead_letters.clone());
83 self
84 }
85
86 pub fn from_system(system: &ActorSystem) -> Option<Arc<Self>> {
88 system.extensions().get::<TelemetryHandle>().map(|h| h.0.clone())
89 }
90
91 pub fn add_exporter(&self, exporter: Arc<dyn exporters::Exporter>) {
94 self.bus.attach_exporter(exporter.clone());
95 self.exporters.write().push(exporter);
96 }
97
98 pub fn snapshot(&self) -> dto::NodeSnapshot {
100 dto::NodeSnapshot {
101 node: self.node.clone(),
102 generated_at: chrono::Utc::now().to_rfc3339(),
103 actors: self.actors.snapshot(),
104 dead_letters: self.dead_letters.recent(100),
105 cluster: self.cluster.snapshot(),
106 sharding: self.sharding.snapshot(),
107 persistence: self.persistence.snapshot(),
108 remote: self.remote.snapshot(),
109 streams: self.streams.snapshot(),
110 ddata: self.ddata.snapshot(),
111 }
112 }
113}
114
115pub struct TelemetryHandle(pub Arc<TelemetryExtension>);