Skip to main content

defect_obs/langfuse/
ingest.rs

//! Langfuse batch uploader: batch upload of traces and observations.
//!
//! Pipeline: `enqueue` (non-blocking) → bounded mpsc → background flush task (sends a
//! batch once it holds N items, or every T seconds) → POST `/api/public/ingestion` through
//! the shared `defect-http` [`HttpStack`].
//!
//! ## Drop-safe degradation (hard constraint)
//!
//! Langfuse is out-of-band telemetry; **no failure may affect the agent's main loop**:
//! - `enqueue` uses `try_send`; when the channel is full the event is **dropped and
//!   counted** (with throttled warnings) — the caller never blocks;
//! - POST failures only `warn!`; this module adds **no retry of its own** (to avoid
//!   backpressure buildup) — the shared [`HttpStack`] may still apply its own retry layer;
//! - Langfuse answers batch requests with 207 Multi-Status; per-item `errors` in the body
//!   are logged, but never affect subsequent processing.
17
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU64, Ordering};
20use std::time::Duration;
21
22use base64::Engine;
23use base64::engine::general_purpose::STANDARD as BASE64;
24use bytes::Bytes;
25use defect_http::HttpStack;
26use http::header::{AUTHORIZATION, CONTENT_TYPE};
27use http::{Method, Request};
28use http_body_util::{BodyExt, Full};
29use tokio::sync::{mpsc, oneshot};
30use tower::ServiceExt;
31
32use super::model::{IngestionBatch, IngestionEvent, IngestionResponse};
33
34/// Commands for the background task.
35enum Cmd {
36    /// An event to be reported.
37    Event(Box<IngestionEvent>),
38    /// Flush the buffer immediately and signal completion via the oneshot (used for
39    /// flushing before shutdown).
40    Flush(oneshot::Sender<()>),
41}
42
43/// Ingest handle. `Clone` is cheap (inner `Arc`) — each session's observer holds one.
44#[derive(Clone)]
45pub struct LangfuseIngest {
46    tx: mpsc::Sender<Cmd>,
47    /// Cumulative count of events dropped due to a full channel. Used only for throttling
48    /// alerts.
49    dropped: Arc<AtomicU64>,
50}
51
52/// Configuration for building the reporter.
53pub struct IngestConfig {
54    /// Pre-built HTTP stack (shared with the LLM provider, includes
55    /// timeout/retry/proxy/UA/trace).
56    pub http: HttpStack,
57    /// Langfuse host, e.g. `https://cloud.langfuse.com` (without trailing slash).
58    pub host: String,
59    /// Public key.
60    pub public_key: String,
61    /// Secret key.
62    pub secret_key: String,
63    /// Flush when the batch reaches this many items.
64    pub max_batch: usize,
65    /// Periodic flush interval.
66    pub flush_interval: Duration,
67    /// Capacity of the enqueue channel (backpressure boundary; drops when full).
68    pub queue_capacity: usize,
69}
70
71impl LangfuseIngest {
72    /// Spawns the background flush task and returns a handle.
73    pub fn spawn(config: IngestConfig) -> Self {
74        let (tx, rx) = mpsc::channel(config.queue_capacity);
75        let dropped = Arc::new(AtomicU64::new(0));
76
77        let auth = {
78            let raw = format!("{}:{}", config.public_key, config.secret_key);
79            format!("Basic {}", BASE64.encode(raw.as_bytes()))
80        };
81        let endpoint = format!("{}/api/public/ingestion", config.host.trim_end_matches('/'));
82
83        let worker = Worker {
84            rx,
85            http: config.http,
86            endpoint,
87            auth,
88            max_batch: config.max_batch.max(1),
89            flush_interval: config.flush_interval,
90        };
91        tokio::spawn(worker.run());
92
93        Self { tx, dropped }
94    }
95
96    /// Non‑blocking enqueue. Drops and counts when the channel is full — never blocks the
97    /// caller (agent main loop).
98    pub fn enqueue(&self, event: IngestionEvent) {
99        if self.tx.try_send(Cmd::Event(Box::new(event))).is_err() {
100            let n = self.dropped.fetch_add(1, Ordering::Relaxed) + 1;
101            // Throttle warnings: only warn once per batch of drops to avoid log storms.
102            if n.is_multiple_of(256) {
103                tracing::warn!(
104                    dropped_total = n,
105                    "langfuse ingest queue full; dropping telemetry events (agent unaffected)"
106                );
107            }
108        }
109    }
110
111    /// Flushes the buffer and waits for completion. Used for best-effort delivery before
112    /// a session stream ends or the process exits.
113    ///
114    /// Returns immediately if the background task has already exited (receiver closed) —
115    /// best-effort, no delivery guarantee.
116    pub async fn flush(&self) {
117        let (ack_tx, ack_rx) = oneshot::channel();
118        if self.tx.send(Cmd::Flush(ack_tx)).await.is_ok() {
119            let _ = ack_rx.await;
120        }
121    }
122}
123
124/// State of the background flush task.
125struct Worker {
126    rx: mpsc::Receiver<Cmd>,
127    http: HttpStack,
128    endpoint: String,
129    auth: String,
130    max_batch: usize,
131    flush_interval: Duration,
132}
133
134impl Worker {
135    async fn run(mut self) {
136        let mut buf: Vec<IngestionEvent> = Vec::new();
137        let mut tick = tokio::time::interval(self.flush_interval);
138        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
139
140        loop {
141            tokio::select! {
142                cmd = self.rx.recv() => match cmd {
143                    Some(Cmd::Event(ev)) => {
144                        buf.push(*ev);
145                        if buf.len() >= self.max_batch {
146                            self.send_batch(&mut buf).await;
147                        }
148                    }
149                    Some(Cmd::Flush(ack)) => {
150                        self.send_batch(&mut buf).await;
151                        let _ = ack.send(());
152                    }
153                    // All senders dropped: flush remaining data and exit.
154                    None => {
155                        self.send_batch(&mut buf).await;
156                        break;
157                    }
158                },
159                _ = tick.tick() => {
160                    self.send_batch(&mut buf).await;
161                }
162            }
163        }
164    }
165
166    /// Sends the current buffer as a single request. An empty buffer is a no-op.
167    async fn send_batch(&self, buf: &mut Vec<IngestionEvent>) {
168        if buf.is_empty() {
169            return;
170        }
171        let batch = std::mem::take(buf);
172        let body = match serde_json::to_vec(&IngestionBatch { batch }) {
173            Ok(b) => b,
174            Err(err) => {
175                tracing::warn!(%err, "langfuse: failed to serialize ingestion batch; dropped");
176                return;
177            }
178        };
179
180        let request = match Request::builder()
181            .method(Method::POST)
182            .uri(&self.endpoint)
183            .header(AUTHORIZATION, &self.auth)
184            .header(CONTENT_TYPE, "application/json")
185            .body(toac::body::Body::new(Full::new(Bytes::from(body))))
186        {
187            Ok(req) => req,
188            Err(err) => {
189                tracing::warn!(%err, "langfuse: failed to build ingestion request; dropped");
190                return;
191            }
192        };
193
194        // `HttpStack` is a cloneable tower service — clone an independent copy and call
195        // `oneshot` on it.
196        match self.http.clone().oneshot(request).await {
197            Ok(resp) => self.inspect_response(resp).await,
198            Err(err) => {
199                tracing::warn!(%err, "langfuse: ingestion POST failed; batch dropped (no retry)");
200            }
201        }
202    }
203
204    /// Inspect the response.
205    ///
206    /// The Langfuse ingestion endpoint **always returns 207 Multi-Status** for batch
207    /// requests, with per-item results in the body's `successes` / `errors` fields.
208    /// Therefore:
209    /// - **2xx (including 207)**: parse the body; warn only if `errors` is **non-empty**
210    ///   (partial failure). If `errors` is empty (all succeeded), return silently — this
211    ///   is the normal path, not an error.
212    /// - **Non-2xx** (401/403/5xx etc., genuine errors): warn as-is.
213    async fn inspect_response(&self, resp: http::Response<hyper::body::Incoming>) {
214        let status = resp.status();
215        let body = match resp.into_body().collect().await {
216            Ok(collected) => collected.to_bytes(),
217            Err(err) => {
218                tracing::warn!(%status, %err, "langfuse: ingestion response body unreadable");
219                return;
220            }
221        };
222
223        if status.is_success() {
224            // Parse individual results; warn only when there are actual failures.
225            match serde_json::from_slice::<IngestionResponse>(&body) {
226                Ok(parsed) if parsed.errors.is_empty() => {
227                    // Normal path: all succeeded, silent.
228                    tracing::trace!(
229                        succeeded = parsed.successes.len(),
230                        "langfuse: ingestion batch accepted"
231                    );
232                }
233                Ok(parsed) => {
234                    tracing::warn!(
235                        failed = parsed.errors.len(),
236                        succeeded = parsed.successes.len(),
237                        errors = ?parsed.errors,
238                        "langfuse: some ingestion events rejected"
239                    );
240                }
241                Err(err) => {
242                    // 2xx but body is not the expected structure — log a debug line, do
243                    // not treat as an error.
244                    let snippet = String::from_utf8_lossy(&body);
245                    let snippet = snippet.chars().take(512).collect::<String>();
246                    tracing::debug!(%status, %err, body = %snippet, "langfuse: unrecognized ingestion response");
247                }
248            }
249            return;
250        }
251
252        // Non-2xx: real error (auth failure / server error, etc.).
253        let snippet = String::from_utf8_lossy(&body);
254        let snippet = snippet.chars().take(1024).collect::<String>();
255        tracing::warn!(%status, body = %snippet, "langfuse: ingestion request failed");
256    }
257}