defect_obs/langfuse/observer.rs
1//! `LangfuseObserver` reports each session's
2//! [`AgentEvent`](defect_agent::event::AgentEvent) stream to Langfuse.
3//!
//! The shape follows `defect-storage::StorageObserver` (`crates/storage/src/lib.rs`):
//! [`SessionObserver::on_session_created`] calls `session.subscribe()` to obtain an
//! independent mpsc stream, then uses `tokio::spawn` to start a consumer task that
//! feeds each event to [`TraceProjector`] for translation and hands the result to
//! [`LangfuseIngest`] for reporting. Once the stream ends (session drop), the task
//! calls `flush` to push out any remaining data.
12//!
//! Key difference from storage: **degradable dropping**. In storage, slow consumption
//! backpressures the main loop ("no-drop" semantics); Langfuse reporting must never do
//! that, so the consumer loop only performs lightweight translation plus a non-blocking
//! `enqueue`. All real network I/O lives in [`LangfuseIngest`]'s background task, which
//! drops events when its queue is full.
//! Any Langfuse failure must NOT affect the agent.
20
21use std::sync::Arc;
22
23use defect_agent::error::BoxError;
24use defect_agent::session::{Session, SessionCreateInfo, SessionObserver};
25use futures::StreamExt;
26
27use super::ingest::LangfuseIngest;
28use super::projector::TraceProjector;
29
30/// Langfuse reporting observer. `Clone` is cheap (internally `Arc`).
31#[derive(Clone)]
32pub struct LangfuseObserver {
33 ingest: LangfuseIngest,
34}
35
36impl LangfuseObserver {
37 /// Constructs a new observer from an already-started ingester. The ingester's
38 /// background task is launched by [`LangfuseIngest::spawn`]; this observer only wires
39 /// in the per-session event stream.
40 #[must_use]
41 pub fn new(ingest: LangfuseIngest) -> Self {
42 Self { ingest }
43 }
44}
45
46impl SessionObserver for LangfuseObserver {
47 fn on_session_created(
48 &self,
49 session: Arc<dyn Session>,
50 info: SessionCreateInfo,
51 ) -> Result<(), BoxError> {
52 let mut events = session.subscribe();
53 let ingest = self.ingest.clone();
54 let session_id = info.id.0.to_string();
55
56 tokio::spawn(async move {
57 let mut projector = TraceProjector::new(session_id);
58 // Each ingestion event gets a random UUID as its envelope ID / trace ID.
59 let mut new_id = || uuid::Uuid::new_v4().to_string();
60
61 while let Some(event) = events.next().await {
62 // Use the receive time as an approximation of the event time (AgentEvent
63 // has no timestamp).
64 let now = chrono::Utc::now().to_rfc3339();
65 for ev in projector.project(event, &now, &mut new_id) {
66 ingest.enqueue(ev);
67 }
68 }
69
70 // On stream end (session drop / process exit): best-effort flush of remaining
71 // telemetry.
72 ingest.flush().await;
73 });
74
75 Ok(())
76 }
77}