buswatch_sdk/output.rs
1//! Output backends for emitting snapshots.
2
3use std::path::PathBuf;
4
5use buswatch_types::Snapshot;
6
7#[cfg(feature = "otel")]
8use std::sync::Arc;
9
10#[cfg(feature = "otel")]
11use crate::otel::{OtelConfig, OtelExporter};
12
/// Destination to which snapshots are emitted.
///
/// Selects where the instrumentor should deliver the snapshots it produces.
#[derive(Debug)]
pub enum Output {
    /// Write each snapshot to the JSON file at this path.
    ///
    /// Every snapshot overwrites the previous contents of the file.
    File(PathBuf),

    /// Send each snapshot to the TCP server at this address.
    ///
    /// Snapshots are sent as newline-delimited JSON messages, one per snapshot.
    Tcp(String),

    /// Forward each snapshot through the sending half of a channel.
    ///
    /// Call `Output::channel()` to build this variant together with its receiver.
    #[cfg(feature = "tokio")]
    Channel(tokio::sync::mpsc::Sender<Snapshot>),

    /// Export each snapshot as OpenTelemetry metrics via OTLP.
    ///
    /// Call `Output::otel()` to build this variant.
    #[cfg(feature = "otel")]
    Otel(Arc<OtelExporter>),
}
40
41impl Output {
42 /// Create a file output.
43 ///
44 /// # Example
45 ///
46 /// ```rust
47 /// use buswatch_sdk::Output;
48 ///
49 /// let output = Output::file("metrics.json");
50 /// ```
51 pub fn file(path: impl Into<PathBuf>) -> Self {
52 Output::File(path.into())
53 }
54
55 /// Create a TCP output.
56 ///
57 /// # Example
58 ///
59 /// ```rust
60 /// use buswatch_sdk::Output;
61 ///
62 /// let output = Output::tcp("localhost:9090");
63 /// ```
64 pub fn tcp(addr: impl Into<String>) -> Self {
65 Output::Tcp(addr.into())
66 }
67
68 /// Create a channel output and return both the output and receiver.
69 ///
70 /// This is useful for integrating with your own snapshot handling.
71 ///
72 /// # Example
73 ///
74 /// ```rust
75 /// use buswatch_sdk::Output;
76 ///
77 /// let (output, mut rx) = Output::channel(16);
78 ///
79 /// // Later, receive snapshots
80 /// // while let Some(snapshot) = rx.recv().await {
81 /// // println!("Got snapshot with {} modules", snapshot.len());
82 /// // }
83 /// ```
84 #[cfg(feature = "tokio")]
85 pub fn channel(buffer: usize) -> (Self, tokio::sync::mpsc::Receiver<Snapshot>) {
86 let (tx, rx) = tokio::sync::mpsc::channel(buffer);
87 (Output::Channel(tx), rx)
88 }
89
90 /// Create an OpenTelemetry OTLP output.
91 ///
92 /// This exports metrics via OTLP to an OpenTelemetry collector or
93 /// compatible backend (Jaeger, Prometheus, Datadog, etc.).
94 ///
95 /// # Example
96 ///
97 /// ```rust,no_run
98 /// use buswatch_sdk::Output;
99 /// use buswatch_sdk::otel::OtelConfig;
100 ///
101 /// let config = OtelConfig::builder()
102 /// .endpoint("http://localhost:4318")
103 /// .service_name("my-service")
104 /// .build();
105 ///
106 /// let output = Output::otel(config).expect("Failed to create OTLP exporter");
107 /// ```
108 #[cfg(feature = "otel")]
109 pub fn otel(config: OtelConfig) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
110 let exporter = OtelExporter::new(&config)?;
111 Ok(Output::Otel(Arc::new(exporter)))
112 }
113
114 /// Emit a snapshot to this output.
115 #[cfg(feature = "tokio")]
116 pub(crate) async fn emit(&self, snapshot: &Snapshot) -> std::io::Result<()> {
117 match self {
118 Output::File(path) => {
119 let json = serde_json::to_string_pretty(snapshot)?;
120 tokio::fs::write(path, json).await?;
121 }
122 Output::Tcp(addr) => {
123 use tokio::io::AsyncWriteExt;
124 use tokio::net::TcpStream;
125
126 // Try to connect and send (best effort)
127 if let Ok(mut stream) = TcpStream::connect(addr).await {
128 let json = serde_json::to_string(snapshot)?;
129 let _ = stream.write_all(json.as_bytes()).await;
130 let _ = stream.write_all(b"\n").await;
131 }
132 }
133 Output::Channel(tx) => {
134 // Best effort send (don't block if channel is full)
135 let _ = tx.try_send(snapshot.clone());
136 }
137 #[cfg(feature = "otel")]
138 Output::Otel(exporter) => {
139 // Record metrics to OpenTelemetry
140 exporter.record(snapshot);
141 }
142 }
143 Ok(())
144 }
145}