defect_obs/langfuse.rs
1//! Langfuse integration — uploads an `AgentEvent` stream as Langfuse traces, generations,
2//! and spans.
3//!
4//! Module layout:
5//! - [`model`]: wire types for the ingestion API.
6//! - [`ingest`]: buffered batching + background reporting (with optional drop
7//! degradation).
8//! - [`projector`]: translates `AgentEvent` into ingestion events.
9//! - [`observer`]: `SessionObserver` implementation that subscribes to the event stream
10//! per session.
11
12use std::time::Duration;
13
14use defect_http::HttpStack;
15
16pub mod ingest;
17pub mod model;
18pub mod observer;
19pub mod projector;
20
21pub use ingest::{IngestConfig, LangfuseIngest};
22pub use observer::LangfuseObserver;
23pub use projector::TraceProjector;
24
/// Host the uploader targets by default.
pub const DEFAULT_HOST: &str = "https://cloud.langfuse.com";

/// Upper bound on the number of events placed in a single batch by default.
pub const DEFAULT_MAX_BATCH: usize = 100;

/// Capacity of the inbound channel.
///
/// This is the backpressure boundary: once the channel is full, events are
/// dropped rather than applying backpressure to the main loop.
pub const DEFAULT_QUEUE_CAPACITY: usize = 1024;

/// How often buffered events are flushed periodically by default.
pub const DEFAULT_FLUSH_INTERVAL: Duration = Duration::from_secs(2);
34
/// Parsed Langfuse upload parameters (credentials already validated as non-empty).
///
/// Every field is forwarded unchanged to the ingest configuration by
/// `build_observer`.
///
/// The [`Debug`](std::fmt::Debug) implementation is written by hand so that
/// `secret_key` is never rendered; the value is safe to include in logs.
pub struct LangfuseSetup {
    /// Langfuse host to upload to.
    pub host: String,
    /// Public half of the Langfuse API credentials.
    pub public_key: String,
    /// Secret half of the Langfuse API credentials; redacted in `Debug` output.
    pub secret_key: String,
    /// Periodic flush interval handed to the reporter.
    pub flush_interval: Duration,
    /// Maximum number of events per batch handed to the reporter.
    pub max_batch: usize,
}

impl std::fmt::Debug for LangfuseSetup {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LangfuseSetup")
            .field("host", &self.host)
            .field("public_key", &self.public_key)
            // Never print the secret: Debug output routinely ends up in logs.
            .field("secret_key", &format_args!("<redacted>"))
            .field("flush_interval", &self.flush_interval)
            .field("max_batch", &self.max_batch)
            .finish()
    }
}
43
44/// Starts the reporter with a [`LangfuseSetup`] and an already-built [`HttpStack`],
45/// returning an observer.
46///
47/// The reporter's background flush task is started here; the returned
48/// [`LangfuseObserver`] is passed to `AgentCore::observe_session`.
49#[must_use]
50pub fn build_observer(setup: LangfuseSetup, http: HttpStack) -> LangfuseObserver {
51 let ingest = LangfuseIngest::spawn(IngestConfig {
52 http,
53 host: setup.host,
54 public_key: setup.public_key,
55 secret_key: setup.secret_key,
56 max_batch: setup.max_batch,
57 flush_interval: setup.flush_interval,
58 queue_capacity: DEFAULT_QUEUE_CAPACITY,
59 });
60 LangfuseObserver::new(ingest)
61}
62
63// Tests heavily use chained indexing like `value["body"]["x"]` plus `.unwrap()`
64// assertions — these are far more readable than `.get().expect()`, and panicking on test
65// failure is exactly the desired behavior. Therefore, `indexing_slicing` and
66// `unwrap_used` are suppressed for the test module.
67#[cfg(test)]
68#[allow(clippy::indexing_slicing, clippy::unwrap_used)]
69mod tests;