Skip to main content

juncture_telemetry/
config.rs

1//! Telemetry configuration builder and handle.
2//!
3//! Provides a `juncture-tracing`-style `init()` builder for one-line
4//! telemetry setup with optional Langfuse cloud export and embedded dashboard.
5//!
6//! # Quick Start
7//!
8//! ```ignore
9//! use juncture_telemetry::init;
10//!
11//! // Minimal -- SQLite in-memory, no export
12//! let telemetry = init().await?;
13//!
14//! // File persistence + Langfuse cloud + dashboard
15//! let telemetry = init()
16//!     .with_store("telemetry.db")
17//!     .with_langfuse_from_env()
18//!     .with_dashboard(8123)
19//!     .await?;
20//!
21//! let collector = telemetry.collector();
22//! // ... run your agent ...
23//! telemetry.shutdown().await?;
24//! ```
25
26use std::sync::Arc;
27
28#[cfg(unix)]
29use tokio::signal::unix::{SignalKind, signal};
30use tracing::{info, warn};
31
32use crate::batch_writer::BatchWriter;
33use crate::collector::TelemetryCollector;
34use crate::langfuse::{LangfuseConfig, LangfuseExporter};
35use crate::models::CaptureConfig;
36use crate::trace_store::{StoreError, TraceStore};
37
38#[cfg(feature = "sqlite")]
39use crate::sqlite_store::SqliteStore;
40
41#[cfg(feature = "web")]
42use crate::web::WebServer;
43
44/// Builder for telemetry configuration.
45///
46/// Construct via [`init()`](crate::init).
47#[derive(Debug)]
48pub struct TelemetryConfig {
49    store_path: Option<String>,
50    langfuse: Option<LangfuseConfig>,
51    dashboard_port: Option<u16>,
52    bind_ip: [u8; 4],
53    capture_config: CaptureConfig,
54}
55
56impl TelemetryConfig {
57    /// Create a new configuration with sensible defaults.
58    #[must_use]
59    pub fn new() -> Self {
60        Self {
61            store_path: None,
62            langfuse: None,
63            dashboard_port: None,
64            bind_ip: [127, 0, 0, 1],
65            capture_config: CaptureConfig::default(),
66        }
67    }
68
69    /// Set the `SQLite` database file path.
70    ///
71    /// If not called, an in-memory database is used.
72    #[must_use]
73    pub fn with_store(mut self, path: impl Into<String>) -> Self {
74        self.store_path = Some(path.into());
75        self
76    }
77
78    /// Enable Langfuse cloud export with explicit credentials.
79    #[must_use]
80    pub fn with_langfuse(mut self, config: LangfuseConfig) -> Self {
81        self.langfuse = Some(config);
82        self
83    }
84
85    /// Enable Langfuse cloud export by reading credentials from environment.
86    ///
87    /// Reads:
88    /// - `LANGFUSE_PUBLIC_KEY`
89    /// - `LANGFUSE_SECRET_KEY`
90    /// - `LANGFUSE_BASE_URL` (defaults to `https://cloud.langfuse.com`)
91    ///
92    /// If any required variable is missing, Langfuse export is silently skipped.
93    #[must_use]
94    pub fn with_langfuse_from_env(self) -> Self {
95        let public_key = std::env::var("LANGFUSE_PUBLIC_KEY").unwrap_or_default();
96        let secret_key = std::env::var("LANGFUSE_SECRET_KEY").unwrap_or_default();
97
98        if public_key.is_empty() || secret_key.is_empty() {
99            info!("LANGFUSE_PUBLIC_KEY or LANGFUSE_SECRET_KEY not set, skipping Langfuse export");
100            return self;
101        }
102
103        let base_url = std::env::var("LANGFUSE_BASE_URL")
104            .unwrap_or_else(|_| "https://cloud.langfuse.com".to_string());
105
106        self.with_langfuse(LangfuseConfig {
107            public_key,
108            secret_key,
109            base_url,
110        })
111    }
112
113    /// Enable the embedded web dashboard on the given port.
114    #[must_use]
115    pub const fn with_dashboard(mut self, port: u16) -> Self {
116        self.dashboard_port = Some(port);
117        self
118    }
119
120    /// Set the bind address for the dashboard server.
121    ///
122    /// Use `[0, 0, 0, 0]` for public access. Default is `[127, 0, 0, 1]`.
123    #[must_use]
124    pub const fn with_bind_addr(mut self, ip: [u8; 4]) -> Self {
125        self.bind_ip = ip;
126        self
127    }
128
129    /// Set custom capture configuration.
130    #[must_use]
131    pub fn with_capture_config(mut self, config: CaptureConfig) -> Self {
132        self.capture_config = config;
133        self
134    }
135
136    /// Build and start all telemetry components.
137    ///
138    /// Creates the store, collector, optional Langfuse exporter,
139    /// and optional dashboard server. Returns a handle that
140    /// provides access to the collector and manages shutdown.
141    ///
142    /// # Errors
143    ///
144    /// Returns `StoreError` if the database cannot be opened or
145    /// the dashboard server cannot start.
146    #[cfg(feature = "sqlite")]
147    pub async fn install(self) -> Result<TelemetryHandle, StoreError> {
148        let store: Arc<dyn TraceStore> = if let Some(ref path) = self.store_path {
149            let s = SqliteStore::new(path).await?;
150            info!(path = %path, "telemetry SQLite store created");
151            Arc::new(s)
152        } else {
153            let s = SqliteStore::new_memory().await?;
154            info!("telemetry in-memory store created");
155            Arc::new(s)
156        };
157
158        let exporter = self.langfuse.map(|config| {
159            let url = config.base_url.clone();
160            info!(url = %url, "Langfuse cloud export enabled");
161            LangfuseExporter::new(config)
162        });
163
164        let writer = if exporter.is_some() {
165            BatchWriter::with_config_and_langfuse(Arc::clone(&store), exporter, 50, 5_000)
166        } else {
167            BatchWriter::new(Arc::clone(&store))
168        };
169
170        let collector = TelemetryCollector::from_parts(writer, self.capture_config);
171        let server_handle = Self::start_dashboard(self.dashboard_port, self.bind_ip, &store).await;
172
173        Self::spawn_signal_handler(&collector);
174
175        Ok(TelemetryHandle {
176            collector,
177            server: server_handle,
178        })
179    }
180
181    #[cfg(feature = "web")]
182    async fn start_dashboard(
183        port: Option<u16>,
184        bind_ip: [u8; 4],
185        store: &Arc<dyn TraceStore>,
186    ) -> Option<crate::web::WebServerHandle> {
187        let port = port?;
188        let server = WebServer::new(Arc::clone(store), port).with_bind_addr(bind_ip);
189        match server.start().await {
190            Ok(h) => {
191                info!(url = %h.base_url(), "telemetry dashboard started");
192                Some(h)
193            }
194            Err(e) => {
195                warn!("failed to start telemetry dashboard: {e}");
196                None
197            }
198        }
199    }
200
201    #[cfg(not(feature = "web"))]
202    async fn start_dashboard(
203        port: Option<u16>,
204        _bind_ip: [u8; 4],
205        _store: &Arc<dyn TraceStore>,
206    ) -> Option<()> {
207        if port.is_some() {
208            warn!("dashboard requested but 'web' feature not enabled");
209        }
210        None
211    }
212
213    fn spawn_signal_handler(collector: &TelemetryCollector) {
214        let collector_clone = collector.clone();
215        tokio::spawn(async move {
216            #[cfg(unix)]
217            {
218                let mut sigterm =
219                    signal(SignalKind::terminate()).expect("failed to register SIGTERM handler");
220                tokio::select! {
221                    _ = tokio::signal::ctrl_c() => {},
222                    _ = sigterm.recv() => {},
223                }
224            }
225            #[cfg(not(unix))]
226            {
227                let _ = tokio::signal::ctrl_c().await;
228            }
229            info!("signal received, flushing telemetry...");
230            if let Err(e) = collector_clone.flush().await {
231                warn!("telemetry flush on shutdown failed: {e}");
232            }
233        });
234    }
235}
236
237impl Default for TelemetryConfig {
238    fn default() -> Self {
239        Self::new()
240    }
241}
242
243/// Handle to running telemetry components.
244///
245/// Provides access to the collector and manages graceful shutdown.
246/// Dropping the handle flushes any buffered telemetry data.
247pub struct TelemetryHandle {
248    collector: TelemetryCollector,
249    #[cfg(feature = "web")]
250    server: Option<crate::web::WebServerHandle>,
251    #[cfg(not(feature = "web"))]
252    server: Option<()>,
253}
254
255impl std::fmt::Debug for TelemetryHandle {
256    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
257        f.debug_struct("TelemetryHandle")
258            .field("has_dashboard", &self.server.is_some())
259            .finish_non_exhaustive()
260    }
261}
262
263impl TelemetryHandle {
264    /// Get a reference to the telemetry collector.
265    #[must_use]
266    pub const fn collector(&self) -> &TelemetryCollector {
267        &self.collector
268    }
269
270    /// Get the dashboard base URL, if the dashboard is running.
271    #[must_use]
272    #[cfg(feature = "web")]
273    pub fn dashboard_url(&self) -> Option<String> {
274        self.server
275            .as_ref()
276            .map(crate::web::WebServerHandle::base_url)
277    }
278
279    /// Flush all buffered telemetry and stop the dashboard server.
280    ///
281    /// # Errors
282    ///
283    /// Returns `StoreError` if the flush fails.
284    #[allow(unused_mut, reason = "mut required when web feature is enabled")]
285    pub async fn shutdown(mut self) -> Result<(), StoreError> {
286        self.collector.flush().await?;
287        #[cfg(feature = "web")]
288        if let Some(ref mut server) = self.server {
289            server.stop();
290        }
291        Ok(())
292    }
293}
294
295impl Drop for TelemetryHandle {
296    fn drop(&mut self) {
297        // Best-effort flush on drop. We cannot await here, so we
298        // spawn a task. The signal handler also flushes on ctrl-c.
299        let collector = self.collector.clone();
300        tokio::spawn(async move {
301            let _ = collector.flush().await;
302        });
303        #[cfg(feature = "web")]
304        if let Some(ref mut server) = self.server {
305            server.stop();
306        }
307    }
308}