Skip to main content

defect_obs/
langfuse.rs

1//! Langfuse integration — uploads an `AgentEvent` stream as Langfuse traces, generations,
2//! and spans.
3//!
4//! Module layout:
5//! - [`model`]: wire types for the ingestion API.
6//! - [`ingest`]: buffered batching + background reporting (with optional drop
7//!   degradation).
8//! - [`projector`]: translates `AgentEvent` into ingestion events.
9//! - [`observer`]: `SessionObserver` implementation that subscribes to the event stream
10//!   per session.
11
12use std::time::Duration;
13
14use defect_http::HttpStack;
15
16pub mod ingest;
17pub mod model;
18pub mod observer;
19pub mod projector;
20
21pub use ingest::{IngestConfig, LangfuseIngest};
22pub use observer::LangfuseObserver;
23pub use projector::TraceProjector;
24
// --- Upload defaults -------------------------------------------------------

/// Langfuse host used when no other host is configured.
pub const DEFAULT_HOST: &str = "https://cloud.langfuse.com";

/// How often buffered events are flushed periodically, by default.
pub const DEFAULT_FLUSH_INTERVAL: Duration = Duration::from_secs(2);

/// Upper bound on the number of events sent in one batch, by default.
pub const DEFAULT_MAX_BATCH: usize = 100;

/// Capacity of the inbound event channel.
///
/// This is the backpressure boundary: once the channel is full, events are
/// dropped rather than applying backpressure to the main loop.
pub const DEFAULT_QUEUE_CAPACITY: usize = 1024;
34
/// Parsed Langfuse upload parameters (credentials already validated as non-empty).
///
/// `Debug` is implemented by hand rather than derived so that formatting a
/// setup (for example in a log line or a panic message) never prints
/// `secret_key`.
#[derive(Clone)]
pub struct LangfuseSetup {
    /// Langfuse host that events are uploaded to.
    pub host: String,
    /// Public half of the Langfuse credential pair.
    pub public_key: String,
    /// Secret half of the Langfuse credential pair; redacted in `Debug` output.
    pub secret_key: String,
    /// Interval between periodic flushes of buffered events.
    pub flush_interval: Duration,
    /// Maximum number of events per uploaded batch.
    pub max_batch: usize,
}

impl std::fmt::Debug for LangfuseSetup {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LangfuseSetup")
            .field("host", &self.host)
            .field("public_key", &self.public_key)
            // Never expose the secret, even in diagnostics.
            .field("secret_key", &"<redacted>")
            .field("flush_interval", &self.flush_interval)
            .field("max_batch", &self.max_batch)
            .finish()
    }
}
43
44/// Starts the reporter with a [`LangfuseSetup`] and an already-built [`HttpStack`],
45/// returning an observer.
46///
47/// The reporter's background flush task is started here; the returned
48/// [`LangfuseObserver`] is passed to `AgentCore::observe_session`.
49#[must_use]
50pub fn build_observer(setup: LangfuseSetup, http: HttpStack) -> LangfuseObserver {
51    let ingest = LangfuseIngest::spawn(IngestConfig {
52        http,
53        host: setup.host,
54        public_key: setup.public_key,
55        secret_key: setup.secret_key,
56        max_batch: setup.max_batch,
57        flush_interval: setup.flush_interval,
58        queue_capacity: DEFAULT_QUEUE_CAPACITY,
59    });
60    LangfuseObserver::new(ingest)
61}
62
63// Tests heavily use chained indexing like `value["body"]["x"]` plus `.unwrap()`
64// assertions — these are far more readable than `.get().expect()`, and panicking on test
65// failure is exactly the desired behavior. Therefore, `indexing_slicing` and
66// `unwrap_used` are suppressed for the test module.
67#[cfg(test)]
68#[allow(clippy::indexing_slicing, clippy::unwrap_used)]
69mod tests;