auths_cli/telemetry.rs
1//! CLI-owned telemetry infrastructure.
2//!
3//! Provides `TokioMpscSink` — an async, non-blocking telemetry sink backed by
4//! a `tokio::sync::mpsc` channel. The CLI binary owns the background worker
5//! task so the library crate (`auths-telemetry`) remains tokio-free.
6//!
7//! # Usage (async CLI entry point)
8//!
9//! ```ignore
10//! use auths_cli::telemetry::TokioMpscSink;
11//! use auths_telemetry::init_telemetry_with_sink;
12//! use std::sync::Arc;
13//!
14//! #[tokio::main]
15//! async fn main() -> anyhow::Result<()> {
16//! let sink = TokioMpscSink::new(256);
17//! init_telemetry_with_sink(Arc::new(sink.clone()));
18//!
19//! let result = run_command().await;
20//!
21//! sink.flush().await;
22//! result
23//! }
24//! ```
25
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::sync::mpsc as std_mpsc;
28
29use auths_telemetry::EventSink;
30use tokio::sync::mpsc;
31
/// Count of events discarded because the worker channel was full.
/// Process-global: shared by every `TokioMpscSink` instance. The worker
/// swaps it back to zero and reports the count as a `TelemetryDegradation`
/// line on stdout.
static DROPPED: AtomicU64 = AtomicU64::new(0);
33
/// Messages consumed by the background writer task.
enum WorkerMsg {
    /// One serialized telemetry event, written to stdout as a single line.
    Event(String),
    /// Drain the writer, then acknowledge completion on the enclosed
    /// channel so a flushing caller knows all prior events were written.
    Flush(std_mpsc::SyncSender<()>),
}
38
/// Non-blocking telemetry sink backed by a Tokio MPSC channel.
///
/// `emit()` never blocks — events are queued onto the channel (and counted
/// as dropped if it is full). The inherent async [`TokioMpscSink::flush`]
/// waits until the background worker acknowledges all prior events; the
/// `EventSink::flush` trait method is the synchronous, best-effort variant.
/// Call `flush().await` before the process exits to avoid losing events.
#[derive(Clone)]
pub struct TokioMpscSink {
    /// Weak sender used by `emit()`; upgrading fails once the channel closes.
    tx: mpsc::WeakSender<WorkerMsg>,
    /// Kept alive so the channel stays open until `flush()` drops it.
    /// NOTE(review): clones of the sink share this `Arc`, so the channel
    /// actually closes only after *every* clone has been dropped.
    _strong: std::sync::Arc<mpsc::Sender<WorkerMsg>>,
}
50
51impl TokioMpscSink {
52 /// Spawn the background writer and return the sink.
53 ///
54 /// Args:
55 /// * `capacity`: MPSC channel buffer depth in events.
56 ///
57 /// Usage:
58 /// ```ignore
59 /// let sink = TokioMpscSink::new(256);
60 /// ```
61 pub fn new(capacity: usize) -> Self {
62 let (tx, mut rx) = mpsc::channel::<WorkerMsg>(capacity);
63 let weak = tx.downgrade();
64 let strong = std::sync::Arc::new(tx);
65
66 tokio::spawn(async move {
67 use tokio::io::AsyncWriteExt;
68 let mut writer = tokio::io::BufWriter::new(tokio::io::stdout());
69
70 while let Some(msg) = rx.recv().await {
71 match msg {
72 WorkerMsg::Event(line) => {
73 let _ = writer.write_all(line.as_bytes()).await;
74 let _ = writer.write_all(b"\n").await;
75 let _ = writer.flush().await;
76
77 let dropped = DROPPED.swap(0, Ordering::AcqRel);
78 if dropped > 0 {
79 let meta = serde_json::json!({
80 "event_type": "TelemetryDegradation",
81 "dropped_count": dropped,
82 });
83 let s = serde_json::to_string(&meta).unwrap_or_default();
84 let _ = writer.write_all(s.as_bytes()).await;
85 let _ = writer.write_all(b"\n").await;
86 let _ = writer.flush().await;
87 }
88 }
89 WorkerMsg::Flush(ack) => {
90 let _ = writer.flush().await;
91 let _ = ack.send(());
92 }
93 }
94 }
95 let _ = writer.flush().await;
96 });
97
98 Self {
99 tx: weak,
100 _strong: strong,
101 }
102 }
103
104 /// Flush all buffered events and wait for the worker to drain.
105 ///
106 /// Call this before the process exits to prevent losing events.
107 pub async fn flush(self) {
108 // Drop the strong sender so the channel closes after the flush message.
109 let strong = self._strong;
110 let tx = self.tx.upgrade();
111 drop(strong);
112 if let Some(tx) = tx {
113 let (ack_tx, ack_rx) = std_mpsc::sync_channel(0);
114 let _ = tx.send(WorkerMsg::Flush(ack_tx)).await;
115 let _ = tokio::task::spawn_blocking(move || {
116 let _ = ack_rx.recv_timeout(std::time::Duration::from_secs(5));
117 })
118 .await;
119 }
120 }
121}
122
123impl EventSink for TokioMpscSink {
124 fn emit(&self, payload: &str) {
125 let Some(tx) = self.tx.upgrade() else { return };
126 if tx.try_send(WorkerMsg::Event(payload.to_string())).is_err() {
127 DROPPED.fetch_add(1, Ordering::Relaxed);
128 }
129 }
130
131 fn flush(&self) {
132 let Some(tx) = self.tx.upgrade() else { return };
133 let (ack_tx, ack_rx) = std_mpsc::sync_channel(0);
134 if tx.try_send(WorkerMsg::Flush(ack_tx)).is_ok() {
135 let _ = ack_rx.recv_timeout(std::time::Duration::from_secs(5));
136 }
137 }
138}