use std::sync::Arc;
use tokio::sync::{broadcast, oneshot};
use crate::data::DomusAddr;
use crate::ids::{AureliaError, ErrorId};
use super::actor::{ObservabilityCommand, ObservabilityStore};
use super::types::{DomusMetrics, DomusMetricsDelta, DomusReportingEvent, PeerIdentityReport};
/// Handle for querying observability data and subscribing to its feeds.
///
/// The type derives `Clone`; because the only field is an `Arc`, a clone
/// refers to the same `ObservabilityStore` as the original handle.
#[derive(Clone)]
pub struct DomusReporting {
/// Shared store; the methods on this type use its `tx`, `events` and `errors` members.
pub(super) store: Arc<ObservabilityStore>,
}
impl DomusReporting {
pub async fn snapshot(&self) -> Result<DomusMetrics, AureliaError> {
let (tx, rx) = oneshot::channel();
self.store
.tx
.send(ObservabilityCommand::Snapshot { reply: tx })
.await
.map_err(|_| snapshot_not_available())?;
rx.await.map_err(|_| snapshot_not_available())
}
pub async fn snapshot_and_reset(&self) -> Result<DomusMetricsDelta, AureliaError> {
let (tx, rx) = oneshot::channel();
self.store
.tx
.send(ObservabilityCommand::SnapshotAndReset { reply: tx })
.await
.map_err(|_| snapshot_not_available())?;
rx.await.map_err(|_| snapshot_not_available())
}
pub async fn connected_peer_identities(&self) -> Result<Vec<DomusAddr>, AureliaError> {
let (tx, rx) = oneshot::channel();
self.store
.tx
.send(ObservabilityCommand::ConnectedPeerIdentities { reply: tx })
.await
.map_err(|_| snapshot_not_available())?;
rx.await.map_err(|_| snapshot_not_available())
}
pub async fn connected_peers(&self) -> Result<Vec<PeerIdentityReport>, AureliaError> {
let (tx, rx) = oneshot::channel();
self.store
.tx
.send(ObservabilityCommand::ConnectedPeers { reply: tx })
.await
.map_err(|_| snapshot_not_available())?;
rx.await.map_err(|_| snapshot_not_available())
}
pub fn subscribe_events(&self) -> broadcast::Receiver<DomusReportingEvent> {
self.store.events.subscribe()
}
pub fn subscribe_errors(&self) -> broadcast::Receiver<(u64, AureliaError)> {
self.store.errors.subscribe()
}
pub fn feeds(&self) -> DomusReportingFeeds {
DomusReportingFeeds {
events: self.subscribe_events(),
errors: self.subscribe_errors(),
}
}
pub async fn errors_since(
&self,
seq: u64,
limit: usize,
) -> Result<Vec<(u64, AureliaError)>, AureliaError> {
let (tx, rx) = oneshot::channel();
self.store
.tx
.send(ObservabilityCommand::ErrorsSince {
seq,
limit,
reply: tx,
})
.await
.map_err(|_| snapshot_not_available())?;
rx.await.map_err(|_| snapshot_not_available())
}
}
/// Builds the error returned by the reporting handle whenever a request to
/// the observability store cannot be completed.
fn snapshot_not_available() -> AureliaError {
    let id = ErrorId::SnapshotNotAvailable;
    AureliaError::new(id)
}
/// Pair of broadcast receivers: one for reporting events, one for errors.
///
/// `DomusReporting::feeds` builds this by subscribing to both channels.
pub struct DomusReportingFeeds {
/// Receiver for `DomusReportingEvent` values.
pub events: broadcast::Receiver<DomusReportingEvent>,
/// Receiver for `(u64, AureliaError)` pairs; the `u64` is presumably the
/// error's sequence number (compare `errors_since`) — TODO confirm.
pub errors: broadcast::Receiver<(u64, AureliaError)>,
}