// defect_obs/langfuse/ingest.rs
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64;
use bytes::Bytes;
use defect_http::HttpStack;
use http::header::{AUTHORIZATION, CONTENT_TYPE};
use http::{Method, Request};
use http_body_util::{BodyExt, Full};
use tokio::sync::{mpsc, oneshot};
use tower::ServiceExt;

use super::model::{IngestionBatch, IngestionEvent, IngestionResponse};

34enum Cmd {
36 Event(Box<IngestionEvent>),
38 Flush(oneshot::Sender<()>),
41}
42
43#[derive(Clone)]
45pub struct LangfuseIngest {
46 tx: mpsc::Sender<Cmd>,
47 dropped: Arc<AtomicU64>,
50}
51
52pub struct IngestConfig {
54 pub http: HttpStack,
57 pub host: String,
59 pub public_key: String,
61 pub secret_key: String,
63 pub max_batch: usize,
65 pub flush_interval: Duration,
67 pub queue_capacity: usize,
69}
70
71impl LangfuseIngest {
72 pub fn spawn(config: IngestConfig) -> Self {
74 let (tx, rx) = mpsc::channel(config.queue_capacity);
75 let dropped = Arc::new(AtomicU64::new(0));
76
77 let auth = {
78 let raw = format!("{}:{}", config.public_key, config.secret_key);
79 format!("Basic {}", BASE64.encode(raw.as_bytes()))
80 };
81 let endpoint = format!("{}/api/public/ingestion", config.host.trim_end_matches('/'));
82
83 let worker = Worker {
84 rx,
85 http: config.http,
86 endpoint,
87 auth,
88 max_batch: config.max_batch.max(1),
89 flush_interval: config.flush_interval,
90 };
91 tokio::spawn(worker.run());
92
93 Self { tx, dropped }
94 }
95
96 pub fn enqueue(&self, event: IngestionEvent) {
99 if self.tx.try_send(Cmd::Event(Box::new(event))).is_err() {
100 let n = self.dropped.fetch_add(1, Ordering::Relaxed) + 1;
101 if n.is_multiple_of(256) {
103 tracing::warn!(
104 dropped_total = n,
105 "langfuse ingest queue full; dropping telemetry events (agent unaffected)"
106 );
107 }
108 }
109 }
110
111 pub async fn flush(&self) {
117 let (ack_tx, ack_rx) = oneshot::channel();
118 if self.tx.send(Cmd::Flush(ack_tx)).await.is_ok() {
119 let _ = ack_rx.await;
120 }
121 }
122}
123
124struct Worker {
126 rx: mpsc::Receiver<Cmd>,
127 http: HttpStack,
128 endpoint: String,
129 auth: String,
130 max_batch: usize,
131 flush_interval: Duration,
132}
133
134impl Worker {
135 async fn run(mut self) {
136 let mut buf: Vec<IngestionEvent> = Vec::new();
137 let mut tick = tokio::time::interval(self.flush_interval);
138 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
139
140 loop {
141 tokio::select! {
142 cmd = self.rx.recv() => match cmd {
143 Some(Cmd::Event(ev)) => {
144 buf.push(*ev);
145 if buf.len() >= self.max_batch {
146 self.send_batch(&mut buf).await;
147 }
148 }
149 Some(Cmd::Flush(ack)) => {
150 self.send_batch(&mut buf).await;
151 let _ = ack.send(());
152 }
153 None => {
155 self.send_batch(&mut buf).await;
156 break;
157 }
158 },
159 _ = tick.tick() => {
160 self.send_batch(&mut buf).await;
161 }
162 }
163 }
164 }
165
166 async fn send_batch(&self, buf: &mut Vec<IngestionEvent>) {
168 if buf.is_empty() {
169 return;
170 }
171 let batch = std::mem::take(buf);
172 let body = match serde_json::to_vec(&IngestionBatch { batch }) {
173 Ok(b) => b,
174 Err(err) => {
175 tracing::warn!(%err, "langfuse: failed to serialize ingestion batch; dropped");
176 return;
177 }
178 };
179
180 let request = match Request::builder()
181 .method(Method::POST)
182 .uri(&self.endpoint)
183 .header(AUTHORIZATION, &self.auth)
184 .header(CONTENT_TYPE, "application/json")
185 .body(toac::body::Body::new(Full::new(Bytes::from(body))))
186 {
187 Ok(req) => req,
188 Err(err) => {
189 tracing::warn!(%err, "langfuse: failed to build ingestion request; dropped");
190 return;
191 }
192 };
193
194 match self.http.clone().oneshot(request).await {
197 Ok(resp) => self.inspect_response(resp).await,
198 Err(err) => {
199 tracing::warn!(%err, "langfuse: ingestion POST failed; batch dropped (no retry)");
200 }
201 }
202 }
203
204 async fn inspect_response(&self, resp: http::Response<hyper::body::Incoming>) {
214 let status = resp.status();
215 let body = match resp.into_body().collect().await {
216 Ok(collected) => collected.to_bytes(),
217 Err(err) => {
218 tracing::warn!(%status, %err, "langfuse: ingestion response body unreadable");
219 return;
220 }
221 };
222
223 if status.is_success() {
224 match serde_json::from_slice::<IngestionResponse>(&body) {
226 Ok(parsed) if parsed.errors.is_empty() => {
227 tracing::trace!(
229 succeeded = parsed.successes.len(),
230 "langfuse: ingestion batch accepted"
231 );
232 }
233 Ok(parsed) => {
234 tracing::warn!(
235 failed = parsed.errors.len(),
236 succeeded = parsed.successes.len(),
237 errors = ?parsed.errors,
238 "langfuse: some ingestion events rejected"
239 );
240 }
241 Err(err) => {
242 let snippet = String::from_utf8_lossy(&body);
245 let snippet = snippet.chars().take(512).collect::<String>();
246 tracing::debug!(%status, %err, body = %snippet, "langfuse: unrecognized ingestion response");
247 }
248 }
249 return;
250 }
251
252 let snippet = String::from_utf8_lossy(&body);
254 let snippet = snippet.chars().take(1024).collect::<String>();
255 tracing::warn!(%status, body = %snippet, "langfuse: ingestion request failed");
256 }
257}