weavegraph 0.7.0

Graph-driven, concurrent agent workflow framework with versioned state, deterministic barrier merges, and rich diagnostics.
Documentation
//! Per-sink health tracking and the diagnostics broadcast stream.

use std::time::Duration;

use chrono::{DateTime, Utc};
use futures_util::stream::{self, BoxStream, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast::{
    Receiver,
    error::{RecvError, TryRecvError},
};
use tokio::time::timeout;

/// One failure reported by a sink.
///
/// Values of this type are what [`DiagnosticsStream`] yields from its
/// underlying broadcast channel.
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
pub struct SinkDiagnostic {
    /// Name that identifies the failing sink.
    pub sink: String,
    /// Failure message as produced by the sink.
    pub error: String,
    /// When the failure happened, in UTC.
    pub when: DateTime<Utc>,
    /// Per-sink error counter at the time of this failure (monotonically increasing).
    pub occurrence: u64,
}

/// Point-in-time health summary for one sink.
///
/// The optional fields are omitted from serialized output when they are `None`.
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
pub struct SinkHealth {
    /// Name that identifies the sink.
    pub sink: String,
    /// How many errors have been recorded for this sink.
    pub error_count: u64,
    /// Message of the latest error, when one exists.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_error: Option<String>,
    /// UTC time of the latest error, when one exists.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_error_at: Option<DateTime<Utc>>,
}

/// Mutable error tally kept for a single sink.
///
/// `Default` yields a zero count with no recorded error.
#[derive(Clone, Debug, Default)]
pub struct HealthState {
    /// Number of errors seen so far.
    pub error_count: u64,
    /// Message of the most recently seen error, if any.
    pub last_error: Option<String>,
    /// UTC time of the most recently seen error, if any.
    pub last_error_at: Option<DateTime<Utc>>,
}

/// Broadcast receiver wrapper for sink diagnostics.
#[derive(Debug)]
pub struct DiagnosticsStream {
    receiver: Receiver<SinkDiagnostic>,
}

impl DiagnosticsStream {
    /// Wrap a broadcast receiver.
    pub fn new(receiver: Receiver<SinkDiagnostic>) -> Self {
        Self { receiver }
    }

    /// Await the next diagnostic.
    pub async fn recv(&mut self) -> Result<SinkDiagnostic, RecvError> {
        self.receiver.recv().await
    }

    /// Poll for the next diagnostic without blocking.
    pub fn try_recv(&mut self) -> Result<SinkDiagnostic, TryRecvError> {
        self.receiver.try_recv()
    }

    /// Unwrap into the inner broadcast receiver.
    pub fn into_inner(self) -> Receiver<SinkDiagnostic> {
        self.receiver
    }

    /// Convert into a boxed async stream, silently skipping lagged messages.
    pub fn into_async_stream(self) -> BoxStream<'static, SinkDiagnostic> {
        stream::unfold(self.receiver, |mut rx| async move {
            loop {
                match rx.recv().await {
                    Ok(diag) => return Some((diag, rx)),
                    Err(RecvError::Lagged(_)) => continue,
                    Err(RecvError::Closed) => return None,
                }
            }
        })
        .boxed()
    }

    /// Wait up to `duration` for the next diagnostic, skipping lag notifications.
    pub async fn next_timeout(&mut self, duration: Duration) -> Option<SinkDiagnostic> {
        loop {
            match timeout(duration, self.recv()).await {
                Ok(Ok(diag)) => return Some(diag),
                Ok(Err(RecvError::Lagged(_))) => continue,
                Ok(Err(RecvError::Closed)) | Err(_) => return None,
            }
        }
    }
}