use {
crate::{
Criteria,
PeerId,
StreamId,
discovery::PeerEntry,
primitives::Digest,
},
chrono::{DateTime, Utc},
core::{
fmt,
sync::atomic::{AtomicI64, AtomicUsize, Ordering},
time::Duration,
},
humansize::{DECIMAL, format_size},
humantime::format_duration,
std::sync::Arc,
tokio::sync::watch,
};
/// Lock-free counters describing the activity of a single channel.
///
/// All numeric state lives in atomics, so the methods that update it take
/// `&self`.
pub struct Stats {
/// Millisecond Unix timestamp written by `connected()`; the value `0` is
/// the "not connected" sentinel written by `disconnected()` and `new()`.
connected_at: AtomicI64,
/// Running count bumped by one on every `increment_datums()` call.
datums: AtomicUsize,
/// Running count bumped by `n` on every `increment_bytes(n)` call.
bytes: AtomicUsize,
/// Label pairs passed to the `metrics` counters emitted alongside the
/// in-memory counters above.
network_label: [(&'static str, String); 2],
}
impl Stats {
    /// Returns the current value of the datum counter.
    pub fn datums(&self) -> usize {
        self.datums.load(Ordering::Relaxed)
    }

    /// Returns the current value of the byte counter.
    pub fn bytes(&self) -> usize {
        self.bytes.load(Ordering::Relaxed)
    }

    /// Returns the time elapsed since the stored connection timestamp.
    ///
    /// Yields `None` when:
    /// - the stored timestamp is the `0` "not connected" sentinel,
    /// - the stored timestamp cannot be represented as a `DateTime<Utc>`, or
    /// - the difference to the current wall-clock time cannot be converted
    ///   to a `std` [`Duration`] (for example, if it is negative).
    ///
    /// This method never panics.
    pub fn uptime(&self) -> Option<Duration> {
        let ts = self.connected_at.load(Ordering::Relaxed);
        if ts == 0 {
            return None;
        }

        // Only values produced by `Utc::now().timestamp_millis()` (or the
        // sentinel handled above) are ever stored, so this conversion is
        // expected to succeed. Propagating `None` keeps this read-only
        // getter panic-free even if that expectation were ever violated.
        let connected_at = DateTime::<Utc>::from_timestamp_millis(ts)?;
        (Utc::now() - connected_at).to_std().ok()
    }
}
impl Stats {
    /// Builds a `Stats` with both counters at zero and the connection
    /// timestamp set to the `0` "not connected" sentinel.
    pub const fn new(network_label: [(&'static str, String); 2]) -> Self {
        Self {
            network_label,
            connected_at: AtomicI64::new(0),
            datums: AtomicUsize::new(0),
            bytes: AtomicUsize::new(0),
        }
    }

    /// Builds a `Stats` via [`Stats::new`] and immediately stamps it with
    /// the current time, as [`Stats::connected`] does.
    pub(crate) fn new_connected(
        network_label: [(&'static str, String); 2],
    ) -> Self {
        let fresh = Self::new(network_label);
        fresh.connected();
        fresh
    }

    /// Records the current UTC time, in milliseconds, as the connection
    /// timestamp.
    pub(crate) fn connected(&self) {
        let now_ms = Utc::now().timestamp_millis();
        self.connected_at.store(now_ms, Ordering::Relaxed);
    }

    /// Resets the connection timestamp to the `0` sentinel.
    pub(crate) fn disconnected(&self) {
        self.connected_at.store(0, Ordering::Relaxed);
    }

    /// Adds one to the datum counter and to the
    /// `mosaik.streams.datums.total` metric.
    pub(crate) fn increment_datums(&self) {
        self.datums.fetch_add(1, Ordering::Relaxed);

        let total = metrics::counter!(
            "mosaik.streams.datums.total",
            self.network_label.as_slice()
        );
        total.increment(1);
    }

    /// Adds `n` to the byte counter and to the
    /// `mosaik.streams.bytes.total` metric.
    pub(crate) fn increment_bytes(&self, n: usize) {
        self.bytes.fetch_add(n, Ordering::Relaxed);

        let total = metrics::counter!(
            "mosaik.streams.bytes.total",
            self.network_label.as_slice()
        );
        total.increment(n as u64);
    }
}
impl core::fmt::Display for Stats {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(
f,
"uptime: {}, datums: {}, bytes: {}",
self
.uptime()
.map_or_else(|| "N/A".to_string(), |d| format_duration(d).to_string()),
self.datums(),
format_size(self.bytes(), DECIMAL),
)
}
}
/// Debug view exposing derived values (`uptime`, `datums`, `bytes`) rather
/// than the raw atomics; the struct is reported as non-exhaustive.
impl fmt::Debug for Stats {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let uptime = self.uptime().map(format_duration);
        let datums = self.datums();
        let bytes = format_size(self.bytes(), DECIMAL);

        let mut out = f.debug_struct("Stats");
        out.field("uptime", &uptime);
        out.field("datums", &datums);
        out.field("bytes", &bytes);
        out.finish_non_exhaustive()
    }
}
/// Lifecycle state of a channel, published through a `watch` channel on
/// [`ChannelInfo`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
/// Presumably the channel is still being established (not set or read
/// anywhere in this file; confirm against the code that drives the state).
Connecting,
/// The state for which `ChannelInfo::is_connected` returns `true`.
Connected,
/// The state awaited by the future from `ChannelInfo::disconnected`.
Terminated,
}
/// Snapshot-style handle describing one channel: its identifiers, shared
/// statistics, peer entry, and a receiver for observing its [`State`].
///
/// Cloning duplicates the `Arc` handles and the `watch::Receiver`, so clones
/// refer to the same `Stats` and `PeerEntry` values.
#[derive(Debug, Clone)]
pub struct ChannelInfo {
/// Identifier of the stream this channel belongs to.
pub(crate) stream_id: StreamId,
/// Criteria associated with the channel (semantics defined by `Criteria`).
pub(crate) criteria: Criteria,
/// Peer id recorded as the producer side of the channel.
pub(crate) producer_id: PeerId,
/// Peer id recorded as the consumer side of the channel.
pub(crate) consumer_id: PeerId,
/// Shared counters for this channel.
pub(crate) stats: Arc<Stats>,
/// Shared discovery entry for the peer; NOTE(review): presumably the
/// remote side of the channel — confirm against the constructor site.
pub(crate) peer: Arc<PeerEntry>,
/// Receiver used to read and await changes of the channel's `State`.
pub(crate) state: watch::Receiver<State>,
}
impl ChannelInfo {
    /// Borrows the stream identifier.
    pub const fn stream_id(&self) -> &StreamId {
        &self.stream_id
    }

    /// Borrows the criteria associated with this channel.
    pub const fn criteria(&self) -> &Criteria {
        &self.criteria
    }

    /// Borrows the producer's peer id.
    pub const fn producer_id(&self) -> &PeerId {
        &self.producer_id
    }

    /// Borrows the consumer's peer id.
    pub const fn consumer_id(&self) -> &PeerId {
        &self.consumer_id
    }

    /// Borrows the peer entry held behind the shared `Arc`.
    pub fn peer(&self) -> &PeerEntry {
        &*self.peer
    }

    /// Borrows the statistics held behind the shared `Arc`.
    pub fn stats(&self) -> &Stats {
        &*self.stats
    }

    /// Borrows the underlying state receiver, e.g. for cloning and awaiting
    /// state changes.
    pub const fn state_watcher(&self) -> &watch::Receiver<State> {
        &self.state
    }

    /// Returns a copy of the most recently observed state.
    pub fn state(&self) -> State {
        let current = self.state.borrow();
        *current
    }

    /// Returns `true` when the current state is [`State::Connected`].
    pub fn is_connected(&self) -> bool {
        matches!(self.state(), State::Connected)
    }

    /// Returns a future that resolves once the state is observed as
    /// [`State::Terminated`], or once waiting on the receiver fails.
    ///
    /// The future owns its own clone of the receiver, so it does not borrow
    /// from `self`.
    pub fn disconnected(
        &self,
    ) -> impl Future<Output = ()> + Send + Sync + 'static {
        let mut watcher = self.state.clone();
        async move {
            // The outcome is deliberately discarded: either the terminated
            // state was seen or the wait returned an error, and in both
            // cases the future simply completes.
            let _ = watcher
                .wait_for(|state| matches!(state, State::Terminated))
                .await;
        }
    }
}
/// Map of [`ChannelInfo`] values keyed by [`Digest`], backed by
/// `im::HashMap`.
pub type ActiveChannelsMap = im::HashMap<Digest, ChannelInfo>;