Skip to main content

auths_cli/
telemetry.rs

1//! CLI-owned telemetry infrastructure.
2//!
3//! Provides `TokioMpscSink` — an async, non-blocking telemetry sink backed by
4//! a `tokio::sync::mpsc` channel. The CLI binary owns the background worker
5//! task so the library crate (`auths-telemetry`) remains tokio-free.
6//!
7//! # Usage (async CLI entry point)
8//!
9//! ```ignore
10//! use auths_cli::telemetry::TokioMpscSink;
11//! use auths_telemetry::init_telemetry_with_sink;
12//! use std::sync::Arc;
13//!
14//! #[tokio::main]
15//! async fn main() -> anyhow::Result<()> {
16//!     let sink = TokioMpscSink::new(256);
17//!     init_telemetry_with_sink(Arc::new(sink.clone()));
18//!
19//!     let result = run_command().await;
20//!
21//!     sink.flush().await;
22//!     result
23//! }
24//! ```
25
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::sync::mpsc as std_mpsc;
28
29use auths_telemetry::EventSink;
30use tokio::sync::mpsc;
31
/// Number of messages that could not be queued since the worker last reported.
///
/// Process-wide: the counter is a `static`, so it is shared by every
/// `TokioMpscSink`. Senders increment it when `try_send` fails; the worker
/// swaps it back to zero when it writes a `TelemetryDegradation` record.
static DROPPED: AtomicU64 = AtomicU64::new(0);
33
/// Messages sent from sink handles to the background writer task.
enum WorkerMsg {
    /// One telemetry payload; the worker writes it to stdout followed by `\n`.
    Event(String),
    /// Flush request; the worker flushes its writer and then signals the
    /// enclosed sender so the requester knows earlier messages were handled.
    Flush(std_mpsc::SyncSender<()>),
}
38
/// Non-blocking telemetry sink backed by a Tokio MPSC channel.
///
/// `emit()` never blocks: events are queued with `try_send`, and an event that
/// cannot be queued is counted as dropped instead of waiting for space.
///
/// There are two flush paths:
/// * the inherent `async fn flush(self)`, which consumes the handle, releases
///   its strong sender reference and awaits the worker's acknowledgement;
/// * `EventSink::flush(&self)`, which is synchronous and blocks the calling
///   thread for up to five seconds waiting for that acknowledgement.
///
/// Call `flush().await` before the process exits.
#[derive(Clone)]
pub struct TokioMpscSink {
    /// Weak handle upgraded on every `emit()`/`flush()`; the upgrade fails once
    /// all strong senders are gone, which turns those calls into no-ops.
    tx: mpsc::WeakSender<WorkerMsg>,
    /// Kept alive so the channel stays open until `flush()` drops it.
    /// The `Arc` is shared by all clones of this sink.
    _strong: std::sync::Arc<mpsc::Sender<WorkerMsg>>,
}
50
51impl TokioMpscSink {
52    /// Spawn the background writer and return the sink.
53    ///
54    /// Args:
55    /// * `capacity`: MPSC channel buffer depth in events.
56    ///
57    /// Usage:
58    /// ```ignore
59    /// let sink = TokioMpscSink::new(256);
60    /// ```
61    pub fn new(capacity: usize) -> Self {
62        let (tx, mut rx) = mpsc::channel::<WorkerMsg>(capacity);
63        let weak = tx.downgrade();
64        let strong = std::sync::Arc::new(tx);
65
66        tokio::spawn(async move {
67            use tokio::io::AsyncWriteExt;
68            let mut writer = tokio::io::BufWriter::new(tokio::io::stdout());
69
70            while let Some(msg) = rx.recv().await {
71                match msg {
72                    WorkerMsg::Event(line) => {
73                        let _ = writer.write_all(line.as_bytes()).await;
74                        let _ = writer.write_all(b"\n").await;
75                        let _ = writer.flush().await;
76
77                        let dropped = DROPPED.swap(0, Ordering::AcqRel);
78                        if dropped > 0 {
79                            let meta = serde_json::json!({
80                                "event_type": "TelemetryDegradation",
81                                "dropped_count": dropped,
82                            });
83                            let s = serde_json::to_string(&meta).unwrap_or_default();
84                            let _ = writer.write_all(s.as_bytes()).await;
85                            let _ = writer.write_all(b"\n").await;
86                            let _ = writer.flush().await;
87                        }
88                    }
89                    WorkerMsg::Flush(ack) => {
90                        let _ = writer.flush().await;
91                        let _ = ack.send(());
92                    }
93                }
94            }
95            let _ = writer.flush().await;
96        });
97
98        Self {
99            tx: weak,
100            _strong: strong,
101        }
102    }
103
104    /// Flush all buffered events and wait for the worker to drain.
105    ///
106    /// Call this before the process exits to prevent losing events.
107    pub async fn flush(self) {
108        // Drop the strong sender so the channel closes after the flush message.
109        let strong = self._strong;
110        let tx = self.tx.upgrade();
111        drop(strong);
112        if let Some(tx) = tx {
113            let (ack_tx, ack_rx) = std_mpsc::sync_channel(0);
114            let _ = tx.send(WorkerMsg::Flush(ack_tx)).await;
115            let _ = tokio::task::spawn_blocking(move || {
116                let _ = ack_rx.recv_timeout(std::time::Duration::from_secs(5));
117            })
118            .await;
119        }
120    }
121}
122
123impl EventSink for TokioMpscSink {
124    fn emit(&self, payload: &str) {
125        let Some(tx) = self.tx.upgrade() else { return };
126        if tx.try_send(WorkerMsg::Event(payload.to_string())).is_err() {
127            DROPPED.fetch_add(1, Ordering::Relaxed);
128        }
129    }
130
131    fn flush(&self) {
132        let Some(tx) = self.tx.upgrade() else { return };
133        let (ack_tx, ack_rx) = std_mpsc::sync_channel(0);
134        if tx.try_send(WorkerMsg::Flush(ack_tx)).is_ok() {
135            let _ = ack_rx.recv_timeout(std::time::Duration::from_secs(5));
136        }
137    }
138}