use std::time::Duration;
use chrono::{DateTime, Utc};
use futures_util::stream::{self, BoxStream, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast::{self, Receiver, error};
use tokio::time::timeout;
/// A single error report associated with a sink.
///
/// Values of this type are what [`DiagnosticsStream`] yields to its consumers.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SinkDiagnostic {
    /// Identifies the sink the report is about (presumably its name — confirm with the producer).
    pub sink: String,
    /// Textual description of the error.
    pub error: String,
    /// UTC timestamp attached to the report.
    pub when: DateTime<Utc>,
    /// NOTE(review): looks like a running per-sink error counter — confirm with the producer.
    pub occurrence: u64,
}
/// Serializable summary of the errors recorded for one sink.
///
/// The two optional fields are left out of the serialized output entirely
/// when they are `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SinkHealth {
    /// Identifies the sink this summary belongs to.
    pub sink: String,
    /// Number of errors counted for the sink.
    pub error_count: u64,
    /// Message of the most recent error, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_error: Option<String>,
    /// UTC timestamp of the most recent error, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_error_at: Option<DateTime<Utc>>,
}
/// Error bookkeeping with the same counters as [`SinkHealth`], but without the
/// sink identifier and without serde support.
///
/// The derived `Default` is a zero count with no last error recorded.
/// NOTE(review): presumably the internal per-sink state that `SinkHealth`
/// snapshots are built from — confirm against the code that owns it.
#[derive(Debug, Default, Clone)]
pub struct HealthState {
    /// Number of errors counted so far.
    pub error_count: u64,
    /// Message of the most recent error, if any.
    pub last_error: Option<String>,
    /// UTC timestamp of the most recent error, if any.
    pub last_error_at: Option<DateTime<Utc>>,
}
/// Consumer-side handle for [`SinkDiagnostic`] values delivered over a tokio
/// broadcast channel.
///
/// This is a thin wrapper around the channel's `Receiver`; the wrapped
/// receiver can be recovered with `into_inner`.
#[derive(Debug)]
pub struct DiagnosticsStream {
    /// The wrapped broadcast receiver.
    receiver: Receiver<SinkDiagnostic>,
}
impl DiagnosticsStream {
pub fn new(receiver: Receiver<SinkDiagnostic>) -> Self {
Self { receiver }
}
pub async fn recv(&mut self) -> Result<SinkDiagnostic, error::RecvError> {
self.receiver.recv().await
}
pub fn try_recv(&mut self) -> Result<SinkDiagnostic, broadcast::error::TryRecvError> {
self.receiver.try_recv()
}
pub fn into_inner(self) -> Receiver<SinkDiagnostic> {
self.receiver
}
pub fn into_async_stream(self) -> BoxStream<'static, SinkDiagnostic> {
let receiver = self.receiver;
stream::unfold(receiver, |mut receiver| async move {
loop {
match receiver.recv().await {
Ok(diag) => return Some((diag, receiver)),
Err(error::RecvError::Lagged(_)) => continue,
Err(error::RecvError::Closed) => return None,
}
}
})
.boxed()
}
pub async fn next_timeout(&mut self, duration: Duration) -> Option<SinkDiagnostic> {
loop {
match timeout(duration, self.recv()).await {
Ok(Ok(diag)) => return Some(diag),
Ok(Err(error::RecvError::Lagged(_))) => continue,
Ok(Err(error::RecvError::Closed)) => return None,
Err(_) => return None,
}
}
}
}