buswatch_sdk/
output.rs

1//! Output backends for emitting snapshots.
2
3use std::path::PathBuf;
4
5use buswatch_types::Snapshot;
6
7#[cfg(feature = "otel")]
8use std::sync::Arc;
9
10#[cfg(feature = "otel")]
11use crate::otel::{OtelConfig, OtelExporter};
12
13/// Output destination for snapshots.
14///
15/// Configure where the instrumentor should emit snapshots.
16#[derive(Debug)]
17pub enum Output {
18    /// Write snapshots to a JSON file.
19    ///
20    /// The file is overwritten with each snapshot.
21    File(PathBuf),
22
23    /// Send snapshots to a TCP server.
24    ///
25    /// Each snapshot is sent as a newline-delimited JSON message.
26    Tcp(String),
27
28    /// Send snapshots through a channel.
29    ///
30    /// Use `Output::channel()` to create this variant and get the receiver.
31    #[cfg(feature = "tokio")]
32    Channel(tokio::sync::mpsc::Sender<Snapshot>),
33
34    /// Export snapshots as OpenTelemetry metrics via OTLP.
35    ///
36    /// Use `Output::otel()` to create this variant.
37    #[cfg(feature = "otel")]
38    Otel(Arc<OtelExporter>),
39}
40
41impl Output {
42    /// Create a file output.
43    ///
44    /// # Example
45    ///
46    /// ```rust
47    /// use buswatch_sdk::Output;
48    ///
49    /// let output = Output::file("metrics.json");
50    /// ```
51    pub fn file(path: impl Into<PathBuf>) -> Self {
52        Output::File(path.into())
53    }
54
55    /// Create a TCP output.
56    ///
57    /// # Example
58    ///
59    /// ```rust
60    /// use buswatch_sdk::Output;
61    ///
62    /// let output = Output::tcp("localhost:9090");
63    /// ```
64    pub fn tcp(addr: impl Into<String>) -> Self {
65        Output::Tcp(addr.into())
66    }
67
68    /// Create a channel output and return both the output and receiver.
69    ///
70    /// This is useful for integrating with your own snapshot handling.
71    ///
72    /// # Example
73    ///
74    /// ```rust
75    /// use buswatch_sdk::Output;
76    ///
77    /// let (output, mut rx) = Output::channel(16);
78    ///
79    /// // Later, receive snapshots
80    /// // while let Some(snapshot) = rx.recv().await {
81    /// //     println!("Got snapshot with {} modules", snapshot.len());
82    /// // }
83    /// ```
84    #[cfg(feature = "tokio")]
85    pub fn channel(buffer: usize) -> (Self, tokio::sync::mpsc::Receiver<Snapshot>) {
86        let (tx, rx) = tokio::sync::mpsc::channel(buffer);
87        (Output::Channel(tx), rx)
88    }
89
90    /// Create an OpenTelemetry OTLP output.
91    ///
92    /// This exports metrics via OTLP to an OpenTelemetry collector or
93    /// compatible backend (Jaeger, Prometheus, Datadog, etc.).
94    ///
95    /// # Example
96    ///
97    /// ```rust,no_run
98    /// use buswatch_sdk::Output;
99    /// use buswatch_sdk::otel::OtelConfig;
100    ///
101    /// let config = OtelConfig::builder()
102    ///     .endpoint("http://localhost:4318")
103    ///     .service_name("my-service")
104    ///     .build();
105    ///
106    /// let output = Output::otel(config).expect("Failed to create OTLP exporter");
107    /// ```
108    #[cfg(feature = "otel")]
109    pub fn otel(config: OtelConfig) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
110        let exporter = OtelExporter::new(&config)?;
111        Ok(Output::Otel(Arc::new(exporter)))
112    }
113
114    /// Emit a snapshot to this output.
115    #[cfg(feature = "tokio")]
116    pub(crate) async fn emit(&self, snapshot: &Snapshot) -> std::io::Result<()> {
117        match self {
118            Output::File(path) => {
119                let json = serde_json::to_string_pretty(snapshot)?;
120                tokio::fs::write(path, json).await?;
121            }
122            Output::Tcp(addr) => {
123                use tokio::io::AsyncWriteExt;
124                use tokio::net::TcpStream;
125
126                // Try to connect and send (best effort)
127                if let Ok(mut stream) = TcpStream::connect(addr).await {
128                    let json = serde_json::to_string(snapshot)?;
129                    let _ = stream.write_all(json.as_bytes()).await;
130                    let _ = stream.write_all(b"\n").await;
131                }
132            }
133            Output::Channel(tx) => {
134                // Best effort send (don't block if channel is full)
135                let _ = tx.try_send(snapshot.clone());
136            }
137            #[cfg(feature = "otel")]
138            Output::Otel(exporter) => {
139                // Record metrics to OpenTelemetry
140                exporter.record(snapshot);
141            }
142        }
143        Ok(())
144    }
145}