use std::time::Duration;
use chrono::{DateTime, Utc};
use futures_util::stream::{self, BoxStream, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast::{
Receiver,
error::{RecvError, TryRecvError},
};
use tokio::time::timeout;
/// One diagnostic record carried over the diagnostics broadcast channel.
///
/// Serializable and deserializable via serde; compared field-by-field.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SinkDiagnostic {
    /// Identifier of the sink this record refers to.
    /// NOTE(review): presumably the sink's configured name — confirm with the producer.
    pub sink: String,
    /// Error text attached to this record.
    pub error: String,
    /// UTC timestamp associated with the record.
    /// NOTE(review): presumably the moment the error was observed — confirm.
    pub when: DateTime<Utc>,
    /// Numeric occurrence value.
    /// NOTE(review): looks like a running per-sink error counter — confirm with the producer.
    pub occurrence: u64,
}
/// Serializable health summary for a single sink.
///
/// The two optional fields are left out of the serialized output entirely
/// when they are `None`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SinkHealth {
    /// Identifier of the sink being summarized.
    pub sink: String,
    /// Error count reported for the sink.
    /// NOTE(review): presumably cumulative since startup — confirm.
    pub error_count: u64,
    /// Text of the most recent error, if any; omitted from serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_error: Option<String>,
    /// UTC timestamp of the most recent error, if any; omitted from serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_error_at: Option<DateTime<Utc>>,
}
/// Plain (non-serde) health bookkeeping: an error count plus the latest error, if any.
///
/// The derived `Default` yields a zero count with both optional fields `None`.
#[derive(Clone, Debug, Default)]
pub struct HealthState {
    /// Number of errors recorded.
    pub error_count: u64,
    /// Text of the most recent error, or `None` if there is none.
    pub last_error: Option<String>,
    /// UTC timestamp of the most recent error, or `None` if there is none.
    pub last_error_at: Option<DateTime<Utc>>,
}
/// Consumer-side handle over a broadcast channel of [`SinkDiagnostic`] values.
///
/// Owns the underlying `tokio::sync::broadcast::Receiver` privately.
#[derive(Debug)]
pub struct DiagnosticsStream {
    /// The wrapped broadcast receiver that all receive operations delegate to.
    receiver: Receiver<SinkDiagnostic>,
}
impl DiagnosticsStream {
    /// Wraps an existing broadcast receiver of [`SinkDiagnostic`] values.
    pub fn new(receiver: Receiver<SinkDiagnostic>) -> Self {
        Self { receiver }
    }

    /// Waits for the next diagnostic.
    ///
    /// # Errors
    ///
    /// Returns the underlying receiver's error unchanged, i.e. both
    /// `RecvError::Lagged` and `RecvError::Closed` are surfaced to the caller.
    pub async fn recv(&mut self) -> Result<SinkDiagnostic, RecvError> {
        self.receiver.recv().await
    }

    /// Attempts to receive a diagnostic without waiting.
    ///
    /// # Errors
    ///
    /// Returns the underlying receiver's `TryRecvError` unchanged.
    pub fn try_recv(&mut self) -> Result<SinkDiagnostic, TryRecvError> {
        self.receiver.try_recv()
    }

    /// Consumes the wrapper and hands back the underlying broadcast receiver.
    pub fn into_inner(self) -> Receiver<SinkDiagnostic> {
        self.receiver
    }

    /// Converts this handle into a boxed stream of diagnostics.
    ///
    /// `RecvError::Lagged` notifications are skipped silently (the stream keeps
    /// receiving), and the stream ends once the receiver reports
    /// `RecvError::Closed`.
    pub fn into_async_stream(self) -> BoxStream<'static, SinkDiagnostic> {
        stream::unfold(self.receiver, |mut rx| async move {
            loop {
                match rx.recv().await {
                    Ok(diag) => return Some((diag, rx)),
                    // Missed messages are not an error for stream consumers; keep going.
                    Err(RecvError::Lagged(_)) => continue,
                    Err(RecvError::Closed) => return None,
                }
            }
        })
        .boxed()
    }

    /// Waits up to `duration` for the next diagnostic.
    ///
    /// Returns `Some(diagnostic)` if one arrives in time, and `None` if the
    /// timeout elapses or the receiver reports `RecvError::Closed`.
    /// `RecvError::Lagged` notifications are skipped.
    ///
    /// `duration` bounds the whole call: skipping a lag notification does not
    /// restart the wait.
    pub async fn next_timeout(&mut self, duration: Duration) -> Option<SinkDiagnostic> {
        // A single timeout wraps the entire retry loop so that lag
        // notifications cannot extend the total wait beyond `duration`.
        let next_diagnostic = async {
            loop {
                match self.receiver.recv().await {
                    Ok(diag) => return Some(diag),
                    Err(RecvError::Lagged(_)) => continue,
                    Err(RecvError::Closed) => return None,
                }
            }
        };
        // `Err(Elapsed)` (timed out) and `Ok(None)` (closed) both map to `None`.
        timeout(duration, next_diagnostic).await.ok().flatten()
    }
}