// juncture_telemetry/collector.rs

//! Telemetry collector - main entry point for the observability engine.
//!
//! The `TelemetryCollector` orchestrates the trace/observation lifecycle,
//! delegates writes to the `BatchWriter`, and provides convenience
//! methods for common telemetry operations.

7use std::sync::Arc;
8
9use tracing::debug;
10
11use crate::batch_writer::BatchWriter;
12use crate::langfuse::{LangfuseConfig, LangfuseExporter};
13use crate::models::{CaptureConfig, Id, Observation, Session, TokenUsage, Trace};
14use crate::trace_store::{StoreError, TraceStore};
15
16/// Main telemetry collector for Juncture graph execution.
17///
18/// Creates traces and observations, applies capture configuration,
19/// and submits them to the batch writer for async persistence.
20///
21/// # Examples
22///
23/// ```ignore
24/// use juncture_telemetry::{TelemetryCollector, SqliteStore};
25/// use std::sync::Arc;
26///
27/// let store = Arc::new(SqliteStore::new("telemetry.db").await?);
28/// let collector = TelemetryCollector::new(store);
29///
30/// let trace = collector.begin_trace("my_graph", Some("thread-1"));
31/// let obs = collector.begin_llm_call(trace.id, "claude-sonnet-4-20250514");
32/// // ... execute LLM call ...
33/// collector.end_llm_call(obs.id, Some(response), usage, cost).await;
34/// collector.end_trace(trace.id, Some(output), total_cost, total_tokens).await;
35/// ```
36#[derive(Clone, Debug)]
37pub struct TelemetryCollector {
38    writer: BatchWriter,
39    capture_config: Arc<CaptureConfig>,
40}
41
42impl TelemetryCollector {
43    /// Create a new collector with default capture configuration.
44    #[must_use]
45    pub fn new(store: Arc<dyn TraceStore>) -> Self {
46        Self {
47            writer: BatchWriter::new(store),
48            capture_config: Arc::new(CaptureConfig::default()),
49        }
50    }
51
52    /// Create a new collector with custom capture configuration.
53    #[must_use]
54    pub fn with_capture_config(store: Arc<dyn TraceStore>, config: CaptureConfig) -> Self {
55        Self {
56            writer: BatchWriter::new(store),
57            capture_config: Arc::new(config),
58        }
59    }
60
61    /// Create a new collector with Langfuse cloud export enabled.
62    ///
63    /// When configured, `flush()` and `shutdown()` automatically export
64    /// traces and observations to Langfuse cloud alongside local storage.
65    #[must_use]
66    pub fn with_langfuse(
67        store: Arc<dyn TraceStore>,
68        config: CaptureConfig,
69        langfuse_config: LangfuseConfig,
70    ) -> Self {
71        let exporter = LangfuseExporter::new(langfuse_config);
72        Self {
73            writer: BatchWriter::with_config_and_langfuse(store, Some(exporter), 50, 5_000),
74            capture_config: Arc::new(config),
75        }
76    }
77
78    /// Create a collector from pre-built components.
79    ///
80    /// Used by [`TelemetryConfig`](crate::config::TelemetryConfig) to
81    /// construct a collector with a pre-configured batch writer.
82    #[must_use]
83    pub(crate) fn from_parts(writer: BatchWriter, config: CaptureConfig) -> Self {
84        Self {
85            writer,
86            capture_config: Arc::new(config),
87        }
88    }
89
90    /// Get the capture configuration.
91    #[must_use]
92    pub fn capture_config(&self) -> &CaptureConfig {
93        &self.capture_config
94    }
95
96    // ── Trace lifecycle ──────────────────────────────────────────
97
98    /// Begin a new trace for a graph invocation.
99    ///
100    /// The trace is immediately submitted to the buffer so that
101    /// observations can reference it without FK constraint violations.
102    ///
103    /// # Errors
104    ///
105    /// Returns `StoreError::Storage` if the submission fails.
106    pub async fn begin_trace(
107        &self,
108        graph_name: impl Into<String>,
109        session_id: Option<String>,
110    ) -> Result<Trace, StoreError> {
111        let mut trace = Trace::new(graph_name);
112        trace.session_id = session_id;
113        debug!(trace_id = %trace.id, name = %trace.name, "trace started");
114        self.writer.submit_trace(trace.clone()).await?;
115        Ok(trace)
116    }
117
118    /// End a trace and submit the completed version for async writing.
119    ///
120    /// # Errors
121    ///
122    /// Returns `StoreError::Storage` if the submission fails.
123    pub async fn end_trace(
124        &self,
125        mut trace: Trace,
126        output: Option<serde_json::Value>,
127        total_cost: Option<f64>,
128        total_tokens: Option<u64>,
129    ) -> Result<(), StoreError> {
130        trace.complete(output, total_cost, total_tokens);
131        debug!(
132            trace_id = %trace.id,
133            duration_ms = trace.end_time
134                .map_or(0, |e| e.signed_duration_since(trace.start_time).num_milliseconds()),
135            "trace ended"
136        );
137        self.writer.submit_trace(trace).await
138    }
139
140    // ── Session management ───────────────────────────────────────
141
142    /// Create or update a session.
143    ///
144    /// # Errors
145    ///
146    /// Returns `StoreError::Storage` if the submission fails.
147    pub async fn track_session(
148        &self,
149        thread_id: impl Into<String>,
150        user_id: Option<String>,
151    ) -> Result<(), StoreError> {
152        let mut session = Session::new(thread_id);
153        session.user_id = user_id;
154        self.writer.submit_session(session).await
155    }
156
157    // ── LLM call lifecycle ───────────────────────────────────────
158
159    /// Begin an LLM call observation.
160    #[must_use]
161    pub fn begin_llm_call(
162        &self,
163        trace_id: Id,
164        parent_id: Option<Id>,
165        model: impl Into<String>,
166        prompt: Option<&serde_json::Value>,
167    ) -> Observation {
168        let mut obs = Observation::generation(trace_id, "llm_call", model);
169        obs.parent_observation_id = parent_id;
170        if self.capture_config.capture_full_messages {
171            if let Some(prompt) = prompt {
172                let serialized = serde_json::to_string(prompt).unwrap_or_default();
173                let truncated = self
174                    .capture_config
175                    .truncate(&serialized, self.capture_config.max_prompt_chars);
176                obs.input = Some(serde_json::Value::String(truncated));
177            }
178        }
179        obs
180    }
181
182    /// End an LLM call observation and submit it.
183    ///
184    /// # Errors
185    ///
186    /// Returns `StoreError::Storage` if the submission fails.
187    pub async fn end_llm_call(
188        &self,
189        mut obs: Observation,
190        response: Option<&str>,
191        usage: Option<TokenUsage>,
192        cost: Option<f64>,
193    ) -> Result<(), StoreError> {
194        if let Some(response) = response {
195            let truncated = self
196                .capture_config
197                .truncate(response, self.capture_config.max_response_chars);
198            obs.output = Some(serde_json::Value::String(truncated));
199        }
200        obs.usage = usage;
201        obs.cost = cost;
202        obs.complete(obs.output.clone());
203        self.writer.submit_observation(obs).await
204    }
205
206    // ── Tool call lifecycle ──────────────────────────────────────
207
208    /// Begin a tool call observation.
209    #[must_use]
210    pub fn begin_tool_call(
211        &self,
212        trace_id: Id,
213        parent_id: Option<Id>,
214        tool_name: impl Into<String>,
215        input: Option<&serde_json::Value>,
216    ) -> Observation {
217        let mut obs = Observation::tool_call(trace_id, tool_name);
218        obs.parent_observation_id = parent_id;
219        if self.capture_config.capture_tool_io {
220            obs.input = input.cloned();
221        }
222        obs
223    }
224
225    /// End a tool call observation and submit it.
226    ///
227    /// # Errors
228    ///
229    /// Returns `StoreError::Storage` if the submission fails.
230    pub async fn end_tool_call(
231        &self,
232        mut obs: Observation,
233        output: Option<serde_json::Value>,
234    ) -> Result<(), StoreError> {
235        if self.capture_config.capture_tool_io {
236            obs.output = output;
237        }
238        obs.complete(obs.output.clone());
239        self.writer.submit_observation(obs).await
240    }
241
242    // ── Generic span lifecycle ───────────────────────────────────
243
244    /// Begin a generic span observation.
245    #[must_use]
246    pub fn begin_span(
247        &self,
248        trace_id: Id,
249        parent_id: Option<Id>,
250        name: impl Into<String>,
251    ) -> Observation {
252        let mut obs = Observation::span(trace_id, name);
253        obs.parent_observation_id = parent_id;
254        obs
255    }
256
257    /// End a span observation and submit it.
258    ///
259    /// # Errors
260    ///
261    /// Returns `StoreError::Storage` if the submission fails.
262    pub async fn end_span(
263        &self,
264        mut obs: Observation,
265        output: Option<serde_json::Value>,
266    ) -> Result<(), StoreError> {
267        obs.complete(output);
268        self.writer.submit_observation(obs).await
269    }
270
271    /// Record a failed span and submit it.
272    ///
273    /// # Errors
274    ///
275    /// Returns `StoreError::Storage` if the submission fails.
276    pub async fn fail_span(
277        &self,
278        mut obs: Observation,
279        error: impl Into<String>,
280    ) -> Result<(), StoreError> {
281        obs.fail(error);
282        self.writer.submit_observation(obs).await
283    }
284
285    // ── Flush / Shutdown ─────────────────────────────────────────
286
287    /// Flush any buffered telemetry items to the store.
288    ///
289    /// # Errors
290    ///
291    /// Returns `StoreError::Storage` if any write fails.
292    pub async fn flush(&self) -> Result<(), StoreError> {
293        self.writer.flush().await
294    }
295
296    /// Shutdown the collector, flushing all remaining items.
297    ///
298    /// # Errors
299    ///
300    /// Returns `StoreError::Storage` if any write fails.
301    pub async fn shutdown(self) -> Result<(), StoreError> {
302        self.writer.shutdown().await
303    }
304}
305
#[cfg(test)]
#[expect(
    clippy::clone_on_ref_ptr,
    reason = ".clone() needed for unsized coercion Arc<SqliteStore> -> Arc<dyn TraceStore>"
)]
mod tests {
    use super::*;
    use crate::sqlite_store::SqliteStore;

    /// Build an in-memory SQLite store and a default-configured collector
    /// writing to it.
    ///
    /// The concrete store handle is returned as well so tests can read back
    /// what the collector persisted.
    async fn memory_collector() -> (Arc<SqliteStore>, TelemetryCollector) {
        let store = Arc::new(SqliteStore::new_memory().await.unwrap());
        let dyn_store: Arc<dyn TraceStore> = store.clone();
        (store, TelemetryCollector::new(dyn_store))
    }

    #[tokio::test]
    async fn collector_trace_lifecycle() {
        let (store, collector) = memory_collector().await;

        let mut trace = collector
            .begin_trace("test_graph", Some("thread-1".to_string()))
            .await
            .unwrap();
        trace.user_id = Some("user-1".to_string());
        let trace_id = trace.id;

        let obs = collector.begin_span(trace_id, None, "juncture.superstep");
        collector.end_span(obs, None).await.unwrap();

        collector
            .end_trace(
                trace,
                Some(serde_json::json!({"result": "ok"})),
                Some(0.05),
                Some(200),
            )
            .await
            .unwrap();

        collector.flush().await.unwrap();

        let loaded = store.get_trace(trace_id).await.unwrap();
        assert!(loaded.is_some(), "trace should exist");
        let loaded = loaded.unwrap();
        assert_eq!(loaded.observations.len(), 1, "expected 1 observation");
    }

    #[tokio::test]
    async fn collector_llm_call_lifecycle() {
        let (store, collector) = memory_collector().await;

        let trace = collector.begin_trace("test_graph", None).await.unwrap();
        let trace_id = trace.id;

        let obs = collector.begin_llm_call(
            trace_id,
            None,
            "claude-sonnet-4-20250514",
            Some(&serde_json::json!({"messages": [{"role": "user", "content": "hello"}]})),
        );

        let usage = TokenUsage {
            input_tokens: 10,
            output_tokens: 5,
            total_tokens: 15,
            cached_tokens: None,
        };
        collector
            .end_llm_call(obs, Some("hi there"), Some(usage), Some(0.001))
            .await
            .unwrap();

        collector.end_trace(trace, None, None, None).await.unwrap();
        collector.flush().await.unwrap();

        let loaded = store.get_trace(trace_id).await.unwrap().unwrap();
        assert_eq!(loaded.observations.len(), 1);
        let llm_obs = &loaded.observations[0];
        assert!(llm_obs.input.is_some());
        assert!(llm_obs.output.is_some());
        assert!(llm_obs.usage.is_some());
    }

    #[tokio::test]
    async fn collector_tool_call_lifecycle() {
        let (store, collector) = memory_collector().await;

        let trace = collector.begin_trace("test_graph", None).await.unwrap();
        let trace_id = trace.id;

        let obs = collector.begin_tool_call(
            trace_id,
            None,
            "search",
            Some(&serde_json::json!({"query": "rust async"})),
        );
        collector
            .end_tool_call(obs, Some(serde_json::json!({"results": ["item1"]})))
            .await
            .unwrap();

        collector.end_trace(trace, None, None, None).await.unwrap();
        collector.flush().await.unwrap();

        let loaded = store.get_trace(trace_id).await.unwrap().unwrap();
        assert_eq!(loaded.observations.len(), 1);
    }

    #[tokio::test]
    async fn collector_capture_truncation() {
        let config = CaptureConfig {
            max_prompt_chars: 20,
            max_response_chars: 20,
            ..Default::default()
        };
        let store = Arc::new(SqliteStore::new_memory().await.unwrap());
        let collector = TelemetryCollector::with_capture_config(store, config);

        let trace = collector.begin_trace("test_graph", None).await.unwrap();
        let long_prompt = serde_json::json!({"content": "a".repeat(1000)});
        let obs = collector.begin_llm_call(trace.id, None, "model", Some(&long_prompt));

        let input_str = obs.input.as_ref().and_then(|v| v.as_str()).unwrap_or("");
        assert!(input_str.contains("truncated"));
        // The marker alone is not enough: the stored text must actually be shorter.
        let full_len = serde_json::to_string(&long_prompt).unwrap().len();
        assert!(
            input_str.len() < full_len,
            "truncated input ({} bytes) should be shorter than the full prompt ({full_len} bytes)",
            input_str.len()
        );
    }

    #[tokio::test]
    async fn collector_session_tracking() {
        let (store, collector) = memory_collector().await;

        collector
            .track_session("thread-1", Some("user-1".to_string()))
            .await
            .unwrap();
        collector.flush().await.unwrap();

        let session = store.get_session("thread-1").await.unwrap();
        assert!(session.is_some());
    }

    /// Verify multi-agent tracing: coordinator + researcher + writer agents
    /// with nested LLM calls and tool calls, forming a proper observation tree.
    #[tokio::test]
    #[allow(clippy::too_many_lines, reason = "comprehensive multi-agent test")]
    async fn collector_multi_agent_tracing() {
        let (store, collector) = memory_collector().await;

        // Track session
        collector
            .track_session("multi-agent-session", Some("user-1".to_string()))
            .await
            .unwrap();

        // Start trace
        let mut trace = collector
            .begin_trace("research_pipeline", Some("multi-agent-session".to_string()))
            .await
            .unwrap();
        trace.user_id = Some("user-1".to_string());
        trace.tags = vec!["multi-agent".to_string()];
        let trace_id = trace.id;

        // ── Coordinator agent ────────────────────────────────
        let coordinator = collector.begin_span(trace_id, None, "coordinator_agent");

        // Coordinator LLM: decide routing
        let coord_llm = collector.begin_llm_call(
            trace_id,
            Some(coordinator.id),
            "gpt-4o",
            Some(&serde_json::json!({"messages": [
                {"role": "system", "content": "You are a coordinator."},
                {"role": "user", "content": "Research quantum computing"}
            ]})),
        );
        collector
            .end_llm_call(
                coord_llm,
                Some("Delegating to researcher and writer."),
                Some(TokenUsage {
                    input_tokens: 50,
                    output_tokens: 15,
                    total_tokens: 65,
                    cached_tokens: None,
                }),
                Some(0.0003),
            )
            .await
            .unwrap();

        collector.end_span(coordinator, None).await.unwrap();

        // ── Researcher agent ─────────────────────────────────
        let researcher = collector.begin_span(trace_id, None, "researcher_agent");

        // Researcher LLM: analyze query
        let res_llm1 = collector.begin_llm_call(
            trace_id,
            Some(researcher.id),
            "gpt-4o",
            Some(&serde_json::json!({"messages": [
                {"role": "user", "content": "Analyze: quantum computing state"}
            ]})),
        );
        collector
            .end_llm_call(
                res_llm1,
                Some("Key areas: error correction, qubit scaling."),
                Some(TokenUsage {
                    input_tokens: 80,
                    output_tokens: 30,
                    total_tokens: 110,
                    cached_tokens: None,
                }),
                Some(0.0005),
            )
            .await
            .unwrap();

        // Researcher tool: web search
        let res_tool = collector.begin_tool_call(
            trace_id,
            Some(researcher.id),
            "web_search",
            Some(&serde_json::json!({"query": "quantum computing 2025"})),
        );
        collector
            .end_tool_call(
                res_tool,
                Some(serde_json::json!({"results": ["IBM 1000-qubit processor"]})),
            )
            .await
            .unwrap();

        // Researcher LLM: synthesize
        let res_llm2 = collector.begin_llm_call(
            trace_id,
            Some(researcher.id),
            "gpt-4o",
            Some(&serde_json::json!({"messages": [
                {"role": "user", "content": "Synthesize findings"}
            ]})),
        );
        collector
            .end_llm_call(
                res_llm2,
                Some("Quantum computing has made significant progress."),
                Some(TokenUsage {
                    input_tokens: 120,
                    output_tokens: 40,
                    total_tokens: 160,
                    cached_tokens: None,
                }),
                Some(0.0007),
            )
            .await
            .unwrap();

        collector.end_span(researcher, None).await.unwrap();

        // ── Writer agent ─────────────────────────────────────
        let writer = collector.begin_span(trace_id, None, "writer_agent");

        let writer_llm = collector.begin_llm_call(
            trace_id,
            Some(writer.id),
            "gpt-4o",
            Some(&serde_json::json!({"messages": [
                {"role": "user", "content": "Write report based on: Quantum computing has made significant progress."}
            ]})),
        );
        collector
            .end_llm_call(
                writer_llm,
                Some("## Quantum Computing Report\n\nSignificant progress has been made..."),
                Some(TokenUsage {
                    input_tokens: 100,
                    output_tokens: 80,
                    total_tokens: 180,
                    cached_tokens: None,
                }),
                Some(0.0008),
            )
            .await
            .unwrap();

        collector.end_span(writer, None).await.unwrap();

        // End trace
        collector
            .end_trace(
                trace,
                Some(serde_json::json!({"report": "Quantum computing report..."})),
                Some(0.0023),
                Some(515),
            )
            .await
            .unwrap();

        collector.flush().await.unwrap();

        // ── Verify observation tree ──────────────────────────
        let loaded = store.get_trace(trace_id).await.unwrap().unwrap();
        assert_eq!(
            loaded.observations.len(),
            8,
            "expected 8 observations (3 agents + 4 LLM + 1 tool)"
        );

        // Verify tree structure via parent_observation_id
        let agent_spans_count = loaded
            .observations
            .iter()
            .filter(|o| o.parent_observation_id.is_none())
            .count();
        assert_eq!(agent_spans_count, 3, "expected 3 top-level agent spans");

        let coordinator_obs = loaded
            .observations
            .iter()
            .find(|o| o.name == "coordinator_agent")
            .unwrap();
        let researcher_obs = loaded
            .observations
            .iter()
            .find(|o| o.name == "researcher_agent")
            .unwrap();
        let writer_obs = loaded
            .observations
            .iter()
            .find(|o| o.name == "writer_agent")
            .unwrap();

        // Coordinator has 1 LLM call
        let coord_children: Vec<_> = loaded
            .observations
            .iter()
            .filter(|o| o.parent_observation_id == Some(coordinator_obs.id))
            .collect();
        assert_eq!(coord_children.len(), 1, "coordinator should have 1 child");
        assert_eq!(coord_children[0].name, "llm_call");

        // Researcher has 3 children: 2 LLM calls + 1 tool call
        let res_children: Vec<_> = loaded
            .observations
            .iter()
            .filter(|o| o.parent_observation_id == Some(researcher_obs.id))
            .collect();
        assert_eq!(res_children.len(), 3, "researcher should have 3 children");

        let res_generations_count = res_children
            .iter()
            .filter(|o| o.observation_type == crate::models::ObservationType::Generation)
            .count();
        let res_tools: Vec<_> = res_children
            .iter()
            .filter(|o| o.observation_type == crate::models::ObservationType::ToolCall)
            .collect();
        assert_eq!(
            res_generations_count, 2,
            "researcher should have 2 LLM calls"
        );
        assert_eq!(res_tools.len(), 1, "researcher should have 1 tool call");
        assert_eq!(res_tools[0].name, "web_search");

        // Writer has 1 LLM call
        let writer_children_count = loaded
            .observations
            .iter()
            .filter(|o| o.parent_observation_id == Some(writer_obs.id))
            .count();
        assert_eq!(writer_children_count, 1, "writer should have 1 child");

        // Verify token usage and cost are recorded
        let total_input: u64 = loaded
            .observations
            .iter()
            .filter_map(|o| o.usage.as_ref())
            .map(|u| u.input_tokens)
            .sum();
        let total_output: u64 = loaded
            .observations
            .iter()
            .filter_map(|o| o.usage.as_ref())
            .map(|u| u.output_tokens)
            .sum();
        assert_eq!(total_input, 350, "total input tokens");
        assert_eq!(total_output, 165, "total output tokens");

        // 0.0003 + 0.0005 + 0.0007 + 0.0008 from the four LLM calls above;
        // compared with a tolerance because the summation order is unspecified.
        let total_cost: f64 = loaded.observations.iter().filter_map(|o| o.cost).sum();
        assert!(
            (total_cost - 0.0023).abs() < 1e-9,
            "total cost should equal the sum of LLM call costs, got {total_cost}"
        );
    }
}