juncture_telemetry/
config.rs1use std::sync::Arc;
27
28#[cfg(unix)]
29use tokio::signal::unix::{SignalKind, signal};
30use tracing::{info, warn};
31
32use crate::batch_writer::BatchWriter;
33use crate::collector::TelemetryCollector;
34use crate::langfuse::{LangfuseConfig, LangfuseExporter};
35use crate::models::CaptureConfig;
36use crate::trace_store::{StoreError, TraceStore};
37
38#[cfg(feature = "sqlite")]
39use crate::sqlite_store::SqliteStore;
40
41#[cfg(feature = "web")]
42use crate::web::WebServer;
43
44#[derive(Debug)]
48pub struct TelemetryConfig {
49 store_path: Option<String>,
50 langfuse: Option<LangfuseConfig>,
51 dashboard_port: Option<u16>,
52 bind_ip: [u8; 4],
53 capture_config: CaptureConfig,
54}
55
56impl TelemetryConfig {
57 #[must_use]
59 pub fn new() -> Self {
60 Self {
61 store_path: None,
62 langfuse: None,
63 dashboard_port: None,
64 bind_ip: [127, 0, 0, 1],
65 capture_config: CaptureConfig::default(),
66 }
67 }
68
69 #[must_use]
73 pub fn with_store(mut self, path: impl Into<String>) -> Self {
74 self.store_path = Some(path.into());
75 self
76 }
77
78 #[must_use]
80 pub fn with_langfuse(mut self, config: LangfuseConfig) -> Self {
81 self.langfuse = Some(config);
82 self
83 }
84
85 #[must_use]
94 pub fn with_langfuse_from_env(self) -> Self {
95 let public_key = std::env::var("LANGFUSE_PUBLIC_KEY").unwrap_or_default();
96 let secret_key = std::env::var("LANGFUSE_SECRET_KEY").unwrap_or_default();
97
98 if public_key.is_empty() || secret_key.is_empty() {
99 info!("LANGFUSE_PUBLIC_KEY or LANGFUSE_SECRET_KEY not set, skipping Langfuse export");
100 return self;
101 }
102
103 let base_url = std::env::var("LANGFUSE_BASE_URL")
104 .unwrap_or_else(|_| "https://cloud.langfuse.com".to_string());
105
106 self.with_langfuse(LangfuseConfig {
107 public_key,
108 secret_key,
109 base_url,
110 })
111 }
112
113 #[must_use]
115 pub const fn with_dashboard(mut self, port: u16) -> Self {
116 self.dashboard_port = Some(port);
117 self
118 }
119
120 #[must_use]
124 pub const fn with_bind_addr(mut self, ip: [u8; 4]) -> Self {
125 self.bind_ip = ip;
126 self
127 }
128
129 #[must_use]
131 pub fn with_capture_config(mut self, config: CaptureConfig) -> Self {
132 self.capture_config = config;
133 self
134 }
135
136 #[cfg(feature = "sqlite")]
147 pub async fn install(self) -> Result<TelemetryHandle, StoreError> {
148 let store: Arc<dyn TraceStore> = if let Some(ref path) = self.store_path {
149 let s = SqliteStore::new(path).await?;
150 info!(path = %path, "telemetry SQLite store created");
151 Arc::new(s)
152 } else {
153 let s = SqliteStore::new_memory().await?;
154 info!("telemetry in-memory store created");
155 Arc::new(s)
156 };
157
158 let exporter = self.langfuse.map(|config| {
159 let url = config.base_url.clone();
160 info!(url = %url, "Langfuse cloud export enabled");
161 LangfuseExporter::new(config)
162 });
163
164 let writer = if exporter.is_some() {
165 BatchWriter::with_config_and_langfuse(Arc::clone(&store), exporter, 50, 5_000)
166 } else {
167 BatchWriter::new(Arc::clone(&store))
168 };
169
170 let collector = TelemetryCollector::from_parts(writer, self.capture_config);
171 let server_handle = Self::start_dashboard(self.dashboard_port, self.bind_ip, &store).await;
172
173 Self::spawn_signal_handler(&collector);
174
175 Ok(TelemetryHandle {
176 collector,
177 server: server_handle,
178 })
179 }
180
181 #[cfg(feature = "web")]
182 async fn start_dashboard(
183 port: Option<u16>,
184 bind_ip: [u8; 4],
185 store: &Arc<dyn TraceStore>,
186 ) -> Option<crate::web::WebServerHandle> {
187 let port = port?;
188 let server = WebServer::new(Arc::clone(store), port).with_bind_addr(bind_ip);
189 match server.start().await {
190 Ok(h) => {
191 info!(url = %h.base_url(), "telemetry dashboard started");
192 Some(h)
193 }
194 Err(e) => {
195 warn!("failed to start telemetry dashboard: {e}");
196 None
197 }
198 }
199 }
200
201 #[cfg(not(feature = "web"))]
202 async fn start_dashboard(
203 port: Option<u16>,
204 _bind_ip: [u8; 4],
205 _store: &Arc<dyn TraceStore>,
206 ) -> Option<()> {
207 if port.is_some() {
208 warn!("dashboard requested but 'web' feature not enabled");
209 }
210 None
211 }
212
213 fn spawn_signal_handler(collector: &TelemetryCollector) {
214 let collector_clone = collector.clone();
215 tokio::spawn(async move {
216 #[cfg(unix)]
217 {
218 let mut sigterm =
219 signal(SignalKind::terminate()).expect("failed to register SIGTERM handler");
220 tokio::select! {
221 _ = tokio::signal::ctrl_c() => {},
222 _ = sigterm.recv() => {},
223 }
224 }
225 #[cfg(not(unix))]
226 {
227 let _ = tokio::signal::ctrl_c().await;
228 }
229 info!("signal received, flushing telemetry...");
230 if let Err(e) = collector_clone.flush().await {
231 warn!("telemetry flush on shutdown failed: {e}");
232 }
233 });
234 }
235}
236
237impl Default for TelemetryConfig {
238 fn default() -> Self {
239 Self::new()
240 }
241}
242
243pub struct TelemetryHandle {
248 collector: TelemetryCollector,
249 #[cfg(feature = "web")]
250 server: Option<crate::web::WebServerHandle>,
251 #[cfg(not(feature = "web"))]
252 server: Option<()>,
253}
254
255impl std::fmt::Debug for TelemetryHandle {
256 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
257 f.debug_struct("TelemetryHandle")
258 .field("has_dashboard", &self.server.is_some())
259 .finish_non_exhaustive()
260 }
261}
262
263impl TelemetryHandle {
264 #[must_use]
266 pub const fn collector(&self) -> &TelemetryCollector {
267 &self.collector
268 }
269
270 #[must_use]
272 #[cfg(feature = "web")]
273 pub fn dashboard_url(&self) -> Option<String> {
274 self.server
275 .as_ref()
276 .map(crate::web::WebServerHandle::base_url)
277 }
278
279 #[allow(unused_mut, reason = "mut required when web feature is enabled")]
285 pub async fn shutdown(mut self) -> Result<(), StoreError> {
286 self.collector.flush().await?;
287 #[cfg(feature = "web")]
288 if let Some(ref mut server) = self.server {
289 server.stop();
290 }
291 Ok(())
292 }
293}
294
295impl Drop for TelemetryHandle {
296 fn drop(&mut self) {
297 let collector = self.collector.clone();
300 tokio::spawn(async move {
301 let _ = collector.flush().await;
302 });
303 #[cfg(feature = "web")]
304 if let Some(ref mut server) = self.server {
305 server.stop();
306 }
307 }
308}