#![forbid(unsafe_code)]
#![deny(rust_2018_idioms)]
pub mod actor_registry;
pub mod bus;
pub mod cluster;
pub mod ddata;
pub mod dead_letters;
pub mod dto;
pub mod exporters;
pub mod persistence;
pub mod remote;
pub mod sharding;
pub mod streams;
use std::sync::Arc;
use atomr_core::actor::ActorSystem;
use parking_lot::RwLock;
use crate::actor_registry::ActorRegistry;
use crate::bus::TelemetryBus;
use crate::dead_letters::DeadLetterFeed;
pub struct TelemetryExtension {
pub node: String,
pub bus: TelemetryBus,
pub actors: Arc<ActorRegistry>,
pub dead_letters: Arc<DeadLetterFeed>,
pub cluster: Arc<cluster::ClusterProbe>,
pub sharding: Arc<sharding::ShardingProbe>,
pub persistence: Arc<persistence::PersistenceProbe>,
pub remote: Arc<remote::RemoteProbe>,
pub streams: Arc<streams::StreamsProbe>,
pub ddata: Arc<ddata::DDataProbe>,
pub(crate) exporters: RwLock<Vec<Arc<dyn exporters::Exporter>>>,
}
impl TelemetryExtension {
pub fn new(node: impl Into<String>, channel_capacity: usize) -> Arc<Self> {
let bus = TelemetryBus::new(channel_capacity);
Arc::new(Self {
node: node.into(),
actors: Arc::new(ActorRegistry::new(bus.clone())),
dead_letters: Arc::new(DeadLetterFeed::new(bus.clone(), 1024)),
cluster: Arc::new(cluster::ClusterProbe::new(bus.clone())),
sharding: Arc::new(sharding::ShardingProbe::new(bus.clone())),
persistence: Arc::new(persistence::PersistenceProbe::new(bus.clone())),
remote: Arc::new(remote::RemoteProbe::new(bus.clone())),
streams: Arc::new(streams::StreamsProbe::new(bus.clone())),
ddata: Arc::new(ddata::DDataProbe::new(bus.clone())),
bus,
exporters: RwLock::new(Vec::new()),
})
}
pub fn install(self: Arc<Self>, system: &ActorSystem) -> Arc<Self> {
system.extensions().register(TelemetryHandle(self.clone()));
system.set_spawn_observer(self.actors.clone());
system.set_dead_letter_observer(self.dead_letters.clone());
self
}
pub fn from_system(system: &ActorSystem) -> Option<Arc<Self>> {
system.extensions().get::<TelemetryHandle>().map(|h| h.0.clone())
}
pub fn add_exporter(&self, exporter: Arc<dyn exporters::Exporter>) {
self.bus.attach_exporter(exporter.clone());
self.exporters.write().push(exporter);
}
pub fn snapshot(&self) -> dto::NodeSnapshot {
dto::NodeSnapshot {
node: self.node.clone(),
generated_at: chrono::Utc::now().to_rfc3339(),
actors: self.actors.snapshot(),
dead_letters: self.dead_letters.recent(100),
cluster: self.cluster.snapshot(),
sharding: self.sharding.snapshot(),
persistence: self.persistence.snapshot(),
remote: self.remote.snapshot(),
streams: self.streams.snapshot(),
ddata: self.ddata.snapshot(),
}
}
}
pub struct TelemetryHandle(pub Arc<TelemetryExtension>);