Skip to main content

defect_obs/langfuse/
observer.rs

//! `LangfuseObserver` reports each session's
//! [`AgentEvent`](defect_agent::event::AgentEvent) stream to Langfuse.
//!
//! The shape follows `defect-storage::StorageObserver` (`crates/storage/src/lib.rs`):
//! in [`SessionObserver::on_session_created`], `session.subscribe()` obtains an
//! independent mpsc stream, and `tokio::spawn` starts a consumer task that feeds each
//! event to [`TraceProjector`] for translation and then to [`LangfuseIngest`] for
//! reporting. After the stream ends (session drop), the task calls `flush` to send any
//! remaining data.
//!
//! Key difference from storage: **degradable dropping**. Storage's slow consumption
//! backpressures the main loop ("no-drop" semantics); Langfuse must not do that, so the
//! consumer loop only performs a non-blocking `enqueue` plus lightweight translation.
//! All real network I/O lives in [`LangfuseIngest`]'s background task, which drops
//! events when full.
//! Any Langfuse failure must NOT affect the agent.
20
21use std::sync::Arc;
22
23use defect_agent::error::BoxError;
24use defect_agent::session::{Session, SessionCreateInfo, SessionObserver};
25use futures::StreamExt;
26
27use super::ingest::LangfuseIngest;
28use super::projector::TraceProjector;
29
30/// Langfuse reporting observer. `Clone` is cheap (internally `Arc`).
31#[derive(Clone)]
32pub struct LangfuseObserver {
33    ingest: LangfuseIngest,
34}
35
36impl LangfuseObserver {
37    /// Constructs a new observer from an already-started ingester. The ingester's
38    /// background task is launched by [`LangfuseIngest::spawn`]; this observer only wires
39    /// in the per-session event stream.
40    #[must_use]
41    pub fn new(ingest: LangfuseIngest) -> Self {
42        Self { ingest }
43    }
44}
45
46impl SessionObserver for LangfuseObserver {
47    fn on_session_created(
48        &self,
49        session: Arc<dyn Session>,
50        info: SessionCreateInfo,
51    ) -> Result<(), BoxError> {
52        let mut events = session.subscribe();
53        let ingest = self.ingest.clone();
54        let session_id = info.id.0.to_string();
55
56        tokio::spawn(async move {
57            let mut projector = TraceProjector::new(session_id);
58            // Each ingestion event gets a random UUID as its envelope ID / trace ID.
59            let mut new_id = || uuid::Uuid::new_v4().to_string();
60
61            while let Some(event) = events.next().await {
62                // Use the receive time as an approximation of the event time (AgentEvent
63                // has no timestamp).
64                let now = chrono::Utc::now().to_rfc3339();
65                for ev in projector.project(event, &now, &mut new_id) {
66                    ingest.enqueue(ev);
67                }
68            }
69
70            // On stream end (session drop / process exit): best-effort flush of remaining
71            // telemetry.
72            ingest.flush().await;
73        });
74
75        Ok(())
76    }
77}