1use std::sync::Arc;
8
9use tracing::debug;
10
11use crate::batch_writer::BatchWriter;
12use crate::langfuse::{LangfuseConfig, LangfuseExporter};
13use crate::models::{CaptureConfig, Id, Observation, Session, TokenUsage, Trace};
14use crate::trace_store::{StoreError, TraceStore};
15
/// Front-end for recording telemetry: traces, sessions, and observations
/// (LLM generations, tool calls, and generic spans).
///
/// Finished records are handed to a [`BatchWriter`]. Which payloads
/// (prompts, responses, tool I/O) are recorded, and how much of them, is
/// governed by the collector's [`CaptureConfig`].
#[derive(Clone, Debug)]
pub struct TelemetryCollector {
    /// Destination for every trace, session, and observation submitted.
    writer: BatchWriter,
    /// Capture settings. Held in an `Arc`, so cloning the collector shares
    /// the same config allocation instead of copying it.
    capture_config: Arc<CaptureConfig>,
}
41
42impl TelemetryCollector {
43 #[must_use]
45 pub fn new(store: Arc<dyn TraceStore>) -> Self {
46 Self {
47 writer: BatchWriter::new(store),
48 capture_config: Arc::new(CaptureConfig::default()),
49 }
50 }
51
52 #[must_use]
54 pub fn with_capture_config(store: Arc<dyn TraceStore>, config: CaptureConfig) -> Self {
55 Self {
56 writer: BatchWriter::new(store),
57 capture_config: Arc::new(config),
58 }
59 }
60
61 #[must_use]
66 pub fn with_langfuse(
67 store: Arc<dyn TraceStore>,
68 config: CaptureConfig,
69 langfuse_config: LangfuseConfig,
70 ) -> Self {
71 let exporter = LangfuseExporter::new(langfuse_config);
72 Self {
73 writer: BatchWriter::with_config_and_langfuse(store, Some(exporter), 50, 5_000),
74 capture_config: Arc::new(config),
75 }
76 }
77
78 #[must_use]
83 pub(crate) fn from_parts(writer: BatchWriter, config: CaptureConfig) -> Self {
84 Self {
85 writer,
86 capture_config: Arc::new(config),
87 }
88 }
89
90 #[must_use]
92 pub fn capture_config(&self) -> &CaptureConfig {
93 &self.capture_config
94 }
95
96 pub async fn begin_trace(
107 &self,
108 graph_name: impl Into<String>,
109 session_id: Option<String>,
110 ) -> Result<Trace, StoreError> {
111 let mut trace = Trace::new(graph_name);
112 trace.session_id = session_id;
113 debug!(trace_id = %trace.id, name = %trace.name, "trace started");
114 self.writer.submit_trace(trace.clone()).await?;
115 Ok(trace)
116 }
117
118 pub async fn end_trace(
124 &self,
125 mut trace: Trace,
126 output: Option<serde_json::Value>,
127 total_cost: Option<f64>,
128 total_tokens: Option<u64>,
129 ) -> Result<(), StoreError> {
130 trace.complete(output, total_cost, total_tokens);
131 debug!(
132 trace_id = %trace.id,
133 duration_ms = trace.end_time
134 .map_or(0, |e| e.signed_duration_since(trace.start_time).num_milliseconds()),
135 "trace ended"
136 );
137 self.writer.submit_trace(trace).await
138 }
139
140 pub async fn track_session(
148 &self,
149 thread_id: impl Into<String>,
150 user_id: Option<String>,
151 ) -> Result<(), StoreError> {
152 let mut session = Session::new(thread_id);
153 session.user_id = user_id;
154 self.writer.submit_session(session).await
155 }
156
157 #[must_use]
161 pub fn begin_llm_call(
162 &self,
163 trace_id: Id,
164 parent_id: Option<Id>,
165 model: impl Into<String>,
166 prompt: Option<&serde_json::Value>,
167 ) -> Observation {
168 let mut obs = Observation::generation(trace_id, "llm_call", model);
169 obs.parent_observation_id = parent_id;
170 if self.capture_config.capture_full_messages {
171 if let Some(prompt) = prompt {
172 let serialized = serde_json::to_string(prompt).unwrap_or_default();
173 let truncated = self
174 .capture_config
175 .truncate(&serialized, self.capture_config.max_prompt_chars);
176 obs.input = Some(serde_json::Value::String(truncated));
177 }
178 }
179 obs
180 }
181
182 pub async fn end_llm_call(
188 &self,
189 mut obs: Observation,
190 response: Option<&str>,
191 usage: Option<TokenUsage>,
192 cost: Option<f64>,
193 ) -> Result<(), StoreError> {
194 if let Some(response) = response {
195 let truncated = self
196 .capture_config
197 .truncate(response, self.capture_config.max_response_chars);
198 obs.output = Some(serde_json::Value::String(truncated));
199 }
200 obs.usage = usage;
201 obs.cost = cost;
202 obs.complete(obs.output.clone());
203 self.writer.submit_observation(obs).await
204 }
205
206 #[must_use]
210 pub fn begin_tool_call(
211 &self,
212 trace_id: Id,
213 parent_id: Option<Id>,
214 tool_name: impl Into<String>,
215 input: Option<&serde_json::Value>,
216 ) -> Observation {
217 let mut obs = Observation::tool_call(trace_id, tool_name);
218 obs.parent_observation_id = parent_id;
219 if self.capture_config.capture_tool_io {
220 obs.input = input.cloned();
221 }
222 obs
223 }
224
225 pub async fn end_tool_call(
231 &self,
232 mut obs: Observation,
233 output: Option<serde_json::Value>,
234 ) -> Result<(), StoreError> {
235 if self.capture_config.capture_tool_io {
236 obs.output = output;
237 }
238 obs.complete(obs.output.clone());
239 self.writer.submit_observation(obs).await
240 }
241
242 #[must_use]
246 pub fn begin_span(
247 &self,
248 trace_id: Id,
249 parent_id: Option<Id>,
250 name: impl Into<String>,
251 ) -> Observation {
252 let mut obs = Observation::span(trace_id, name);
253 obs.parent_observation_id = parent_id;
254 obs
255 }
256
257 pub async fn end_span(
263 &self,
264 mut obs: Observation,
265 output: Option<serde_json::Value>,
266 ) -> Result<(), StoreError> {
267 obs.complete(output);
268 self.writer.submit_observation(obs).await
269 }
270
271 pub async fn fail_span(
277 &self,
278 mut obs: Observation,
279 error: impl Into<String>,
280 ) -> Result<(), StoreError> {
281 obs.fail(error);
282 self.writer.submit_observation(obs).await
283 }
284
285 pub async fn flush(&self) -> Result<(), StoreError> {
293 self.writer.flush().await
294 }
295
296 pub async fn shutdown(self) -> Result<(), StoreError> {
302 self.writer.shutdown().await
303 }
304}
305
#[cfg(test)]
#[expect(
    clippy::clone_on_ref_ptr,
    reason = ".clone() needed for unsized coercion Arc<SqliteStore> -> Arc<dyn TraceStore>"
)]
mod tests {
    use super::*;
    use crate::sqlite_store::SqliteStore;

    /// A trace with one span round-trips through the store.
    #[tokio::test]
    #[allow(clippy::too_many_lines, reason = "comprehensive lifecycle test")]
    async fn collector_trace_lifecycle() {
        let store = Arc::new(SqliteStore::new_memory().await.unwrap());
        let dyn_store: Arc<dyn TraceStore> = store.clone();
        let collector = TelemetryCollector::new(dyn_store);

        let mut trace = collector
            .begin_trace("test_graph", Some("thread-1".to_string()))
            .await
            .unwrap();
        trace.user_id = Some("user-1".to_string());
        let trace_id = trace.id;

        let obs = collector.begin_span(trace_id, None, "juncture.superstep");
        collector.end_span(obs, None).await.unwrap();

        collector
            .end_trace(
                trace,
                Some(serde_json::json!({"result": "ok"})),
                Some(0.05),
                Some(200),
            )
            .await
            .unwrap();

        collector.flush().await.unwrap();

        let loaded = store.get_trace(trace_id).await.unwrap();
        assert!(loaded.is_some(), "trace should exist");
        let loaded = loaded.unwrap();
        assert_eq!(loaded.observations.len(), 1, "expected 1 observation");
        assert_eq!(loaded.observations[0].name, "juncture.superstep");
    }

    /// An LLM call persists as a generation with input, output, usage and cost.
    #[tokio::test]
    async fn collector_llm_call_lifecycle() {
        let store = Arc::new(SqliteStore::new_memory().await.unwrap());
        let dyn_store: Arc<dyn TraceStore> = store.clone();
        let collector = TelemetryCollector::new(dyn_store);

        let trace = collector.begin_trace("test_graph", None).await.unwrap();
        let trace_id = trace.id;

        let obs = collector.begin_llm_call(
            trace_id,
            None,
            "claude-sonnet-4-20250514",
            Some(&serde_json::json!({"messages": [{"role": "user", "content": "hello"}]})),
        );

        let usage = TokenUsage {
            input_tokens: 10,
            output_tokens: 5,
            total_tokens: 15,
            cached_tokens: None,
        };
        collector
            .end_llm_call(obs, Some("hi there"), Some(usage), Some(0.001))
            .await
            .unwrap();

        collector.end_trace(trace, None, None, None).await.unwrap();
        collector.flush().await.unwrap();

        let loaded = store.get_trace(trace_id).await.unwrap().unwrap();
        assert_eq!(loaded.observations.len(), 1);
        let llm_obs = &loaded.observations[0];
        assert_eq!(llm_obs.name, "llm_call");
        assert!(
            llm_obs.observation_type == crate::models::ObservationType::Generation,
            "LLM call should be stored as a generation"
        );
        assert!(llm_obs.input.is_some());
        assert!(llm_obs.output.is_some());
        assert!(llm_obs.usage.is_some());
        assert_eq!(llm_obs.usage.as_ref().map(|u| u.total_tokens), Some(15));
        let cost = llm_obs.cost.expect("cost should be persisted");
        assert!((cost - 0.001).abs() < 1e-9, "unexpected cost {cost}");
    }

    /// A tool call persists as a tool-call observation carrying the tool name.
    #[tokio::test]
    async fn collector_tool_call_lifecycle() {
        let store = Arc::new(SqliteStore::new_memory().await.unwrap());
        let dyn_store: Arc<dyn TraceStore> = store.clone();
        let collector = TelemetryCollector::new(dyn_store);

        let trace = collector.begin_trace("test_graph", None).await.unwrap();
        let trace_id = trace.id;

        let obs = collector.begin_tool_call(
            trace_id,
            None,
            "search",
            Some(&serde_json::json!({"query": "rust async"})),
        );
        collector
            .end_tool_call(obs, Some(serde_json::json!({"results": ["item1"]})))
            .await
            .unwrap();

        collector.end_trace(trace, None, None, None).await.unwrap();
        collector.flush().await.unwrap();

        let loaded = store.get_trace(trace_id).await.unwrap().unwrap();
        assert_eq!(loaded.observations.len(), 1);
        let tool_obs = &loaded.observations[0];
        assert_eq!(tool_obs.name, "search");
        assert!(
            tool_obs.observation_type == crate::models::ObservationType::ToolCall,
            "tool call should be stored as a tool-call observation"
        );
    }

    /// Prompts longer than `max_prompt_chars` are truncated on capture.
    #[tokio::test]
    async fn collector_capture_truncation() {
        let config = CaptureConfig {
            max_prompt_chars: 20,
            max_response_chars: 20,
            ..Default::default()
        };
        let store = Arc::new(SqliteStore::new_memory().await.unwrap());
        let collector = TelemetryCollector::with_capture_config(store, config);

        let trace = collector.begin_trace("test_graph", None).await.unwrap();
        let long_prompt = serde_json::json!({"content": "a".repeat(1000)});
        let obs = collector.begin_llm_call(trace.id, None, "model", Some(&long_prompt));

        let input_str = obs.input.as_ref().and_then(|v| v.as_str()).unwrap_or("");
        assert!(input_str.contains("truncated"));
    }

    /// A tracked session can be read back by thread id.
    #[tokio::test]
    async fn collector_session_tracking() {
        let store = Arc::new(SqliteStore::new_memory().await.unwrap());
        let dyn_store: Arc<dyn TraceStore> = store.clone();
        let collector = TelemetryCollector::new(dyn_store);

        collector
            .track_session("thread-1", Some("user-1".to_string()))
            .await
            .unwrap();
        collector.flush().await.unwrap();

        let session = store.get_session("thread-1").await.unwrap();
        assert!(session.is_some());
    }

    /// Three agent spans with nested LLM and tool calls keep their hierarchy,
    /// token usage and costs.
    #[tokio::test]
    #[allow(clippy::too_many_lines, reason = "comprehensive multi-agent test")]
    async fn collector_multi_agent_tracing() {
        let store = Arc::new(SqliteStore::new_memory().await.unwrap());
        let dyn_store: Arc<dyn TraceStore> = store.clone();
        let collector = TelemetryCollector::new(dyn_store);

        collector
            .track_session("multi-agent-session", Some("user-1".to_string()))
            .await
            .unwrap();

        let mut trace = collector
            .begin_trace("research_pipeline", Some("multi-agent-session".to_string()))
            .await
            .unwrap();
        trace.user_id = Some("user-1".to_string());
        trace.tags = vec!["multi-agent".to_string()];
        let trace_id = trace.id;

        let coordinator = collector.begin_span(trace_id, None, "coordinator_agent");

        let coord_llm = collector.begin_llm_call(
            trace_id,
            Some(coordinator.id),
            "gpt-4o",
            Some(&serde_json::json!({"messages": [
                {"role": "system", "content": "You are a coordinator."},
                {"role": "user", "content": "Research quantum computing"}
            ]})),
        );
        collector
            .end_llm_call(
                coord_llm,
                Some("Delegating to researcher and writer."),
                Some(TokenUsage {
                    input_tokens: 50,
                    output_tokens: 15,
                    total_tokens: 65,
                    cached_tokens: None,
                }),
                Some(0.0003),
            )
            .await
            .unwrap();

        collector.end_span(coordinator, None).await.unwrap();

        let researcher = collector.begin_span(trace_id, None, "researcher_agent");

        let res_llm1 = collector.begin_llm_call(
            trace_id,
            Some(researcher.id),
            "gpt-4o",
            Some(&serde_json::json!({"messages": [
                {"role": "user", "content": "Analyze: quantum computing state"}
            ]})),
        );
        collector
            .end_llm_call(
                res_llm1,
                Some("Key areas: error correction, qubit scaling."),
                Some(TokenUsage {
                    input_tokens: 80,
                    output_tokens: 30,
                    total_tokens: 110,
                    cached_tokens: None,
                }),
                Some(0.0005),
            )
            .await
            .unwrap();

        let res_tool = collector.begin_tool_call(
            trace_id,
            Some(researcher.id),
            "web_search",
            Some(&serde_json::json!({"query": "quantum computing 2025"})),
        );
        collector
            .end_tool_call(
                res_tool,
                Some(serde_json::json!({"results": ["IBM 1000-qubit processor"]})),
            )
            .await
            .unwrap();

        let res_llm2 = collector.begin_llm_call(
            trace_id,
            Some(researcher.id),
            "gpt-4o",
            Some(&serde_json::json!({"messages": [
                {"role": "user", "content": "Synthesize findings"}
            ]})),
        );
        collector
            .end_llm_call(
                res_llm2,
                Some("Quantum computing has made significant progress."),
                Some(TokenUsage {
                    input_tokens: 120,
                    output_tokens: 40,
                    total_tokens: 160,
                    cached_tokens: None,
                }),
                Some(0.0007),
            )
            .await
            .unwrap();

        collector.end_span(researcher, None).await.unwrap();

        let writer = collector.begin_span(trace_id, None, "writer_agent");

        let writer_llm = collector.begin_llm_call(
            trace_id,
            Some(writer.id),
            "gpt-4o",
            Some(&serde_json::json!({"messages": [
                {"role": "user", "content": "Write report based on: Quantum computing has made significant progress."}
            ]})),
        );
        collector
            .end_llm_call(
                writer_llm,
                Some("## Quantum Computing Report\n\nSignificant progress has been made..."),
                Some(TokenUsage {
                    input_tokens: 100,
                    output_tokens: 80,
                    total_tokens: 180,
                    cached_tokens: None,
                }),
                Some(0.0008),
            )
            .await
            .unwrap();

        collector.end_span(writer, None).await.unwrap();

        collector
            .end_trace(
                trace,
                Some(serde_json::json!({"report": "Quantum computing report..."})),
                Some(0.0023),
                Some(515),
            )
            .await
            .unwrap();

        collector.flush().await.unwrap();

        let loaded = store.get_trace(trace_id).await.unwrap().unwrap();
        assert_eq!(
            loaded.observations.len(),
            8,
            "expected 8 observations (3 agents + 4 LLM + 1 tool)"
        );

        let agent_spans_count = loaded
            .observations
            .iter()
            .filter(|o| o.parent_observation_id.is_none())
            .count();
        assert_eq!(agent_spans_count, 3, "expected 3 top-level agent spans");

        let coordinator_obs = loaded
            .observations
            .iter()
            .find(|o| o.name == "coordinator_agent")
            .unwrap();
        let researcher_obs = loaded
            .observations
            .iter()
            .find(|o| o.name == "researcher_agent")
            .unwrap();
        let writer_obs = loaded
            .observations
            .iter()
            .find(|o| o.name == "writer_agent")
            .unwrap();

        let coord_children: Vec<_> = loaded
            .observations
            .iter()
            .filter(|o| o.parent_observation_id == Some(coordinator_obs.id))
            .collect();
        assert_eq!(coord_children.len(), 1, "coordinator should have 1 child");
        assert_eq!(coord_children[0].name, "llm_call");

        let res_children: Vec<_> = loaded
            .observations
            .iter()
            .filter(|o| o.parent_observation_id == Some(researcher_obs.id))
            .collect();
        assert_eq!(res_children.len(), 3, "researcher should have 3 children");

        let res_generations_count = res_children
            .iter()
            .filter(|o| o.observation_type == crate::models::ObservationType::Generation)
            .count();
        let res_tools: Vec<_> = res_children
            .iter()
            .filter(|o| o.observation_type == crate::models::ObservationType::ToolCall)
            .collect();
        assert_eq!(
            res_generations_count, 2,
            "researcher should have 2 LLM calls"
        );
        assert_eq!(res_tools.len(), 1, "researcher should have 1 tool call");
        assert_eq!(res_tools[0].name, "web_search");

        let writer_children_count = loaded
            .observations
            .iter()
            .filter(|o| o.parent_observation_id == Some(writer_obs.id))
            .count();
        assert_eq!(writer_children_count, 1, "writer should have 1 child");

        let total_input: u64 = loaded
            .observations
            .iter()
            .filter_map(|o| o.usage.as_ref())
            .map(|u| u.input_tokens)
            .sum();
        let total_output: u64 = loaded
            .observations
            .iter()
            .filter_map(|o| o.usage.as_ref())
            .map(|u| u.output_tokens)
            .sum();
        assert_eq!(total_input, 350, "total input tokens");
        assert_eq!(total_output, 165, "total output tokens");

        // The four LLM calls cost 0.0003 + 0.0005 + 0.0007 + 0.0008 = 0.0023;
        // compare with a tolerance because summation order is store-defined.
        let total_cost: f64 = loaded.observations.iter().filter_map(|o| o.cost).sum();
        assert!(
            (total_cost - 0.0023).abs() < 1e-9,
            "total cost should equal the sum of LLM call costs, got {total_cost}"
        );
    }
}