use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64;
use bytes::Bytes;
use defect_http::HttpStack;
use http::header::{AUTHORIZATION, CONTENT_TYPE};
use http::{Method, Request};
use http_body_util::{BodyExt, Full};
use tokio::sync::{mpsc, oneshot};
use tower::ServiceExt;
use super::model::{IngestionBatch, IngestionEvent, IngestionResponse};
enum Cmd {
Event(Box<IngestionEvent>),
Flush(oneshot::Sender<()>),
}
#[derive(Clone)]
pub struct LangfuseIngest {
tx: mpsc::Sender<Cmd>,
dropped: Arc<AtomicU64>,
}
pub struct IngestConfig {
pub http: HttpStack,
pub host: String,
pub public_key: String,
pub secret_key: String,
pub max_batch: usize,
pub flush_interval: Duration,
pub queue_capacity: usize,
}
impl LangfuseIngest {
pub fn spawn(config: IngestConfig) -> Self {
let (tx, rx) = mpsc::channel(config.queue_capacity);
let dropped = Arc::new(AtomicU64::new(0));
let auth = {
let raw = format!("{}:{}", config.public_key, config.secret_key);
format!("Basic {}", BASE64.encode(raw.as_bytes()))
};
let endpoint = format!("{}/api/public/ingestion", config.host.trim_end_matches('/'));
let worker = Worker {
rx,
http: config.http,
endpoint,
auth,
max_batch: config.max_batch.max(1),
flush_interval: config.flush_interval,
};
tokio::spawn(worker.run());
Self { tx, dropped }
}
pub fn enqueue(&self, event: IngestionEvent) {
if self.tx.try_send(Cmd::Event(Box::new(event))).is_err() {
let n = self.dropped.fetch_add(1, Ordering::Relaxed) + 1;
if n.is_multiple_of(256) {
tracing::warn!(
dropped_total = n,
"langfuse ingest queue full; dropping telemetry events (agent unaffected)"
);
}
}
}
pub async fn flush(&self) {
let (ack_tx, ack_rx) = oneshot::channel();
if self.tx.send(Cmd::Flush(ack_tx)).await.is_ok() {
let _ = ack_rx.await;
}
}
}
struct Worker {
rx: mpsc::Receiver<Cmd>,
http: HttpStack,
endpoint: String,
auth: String,
max_batch: usize,
flush_interval: Duration,
}
impl Worker {
async fn run(mut self) {
let mut buf: Vec<IngestionEvent> = Vec::new();
let mut tick = tokio::time::interval(self.flush_interval);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
cmd = self.rx.recv() => match cmd {
Some(Cmd::Event(ev)) => {
buf.push(*ev);
if buf.len() >= self.max_batch {
self.send_batch(&mut buf).await;
}
}
Some(Cmd::Flush(ack)) => {
self.send_batch(&mut buf).await;
let _ = ack.send(());
}
None => {
self.send_batch(&mut buf).await;
break;
}
},
_ = tick.tick() => {
self.send_batch(&mut buf).await;
}
}
}
}
async fn send_batch(&self, buf: &mut Vec<IngestionEvent>) {
if buf.is_empty() {
return;
}
let batch = std::mem::take(buf);
let body = match serde_json::to_vec(&IngestionBatch { batch }) {
Ok(b) => b,
Err(err) => {
tracing::warn!(%err, "langfuse: failed to serialize ingestion batch; dropped");
return;
}
};
let request = match Request::builder()
.method(Method::POST)
.uri(&self.endpoint)
.header(AUTHORIZATION, &self.auth)
.header(CONTENT_TYPE, "application/json")
.body(toac::body::Body::new(Full::new(Bytes::from(body))))
{
Ok(req) => req,
Err(err) => {
tracing::warn!(%err, "langfuse: failed to build ingestion request; dropped");
return;
}
};
match self.http.clone().oneshot(request).await {
Ok(resp) => self.inspect_response(resp).await,
Err(err) => {
tracing::warn!(%err, "langfuse: ingestion POST failed; batch dropped (no retry)");
}
}
}
async fn inspect_response(&self, resp: http::Response<hyper::body::Incoming>) {
let status = resp.status();
let body = match resp.into_body().collect().await {
Ok(collected) => collected.to_bytes(),
Err(err) => {
tracing::warn!(%status, %err, "langfuse: ingestion response body unreadable");
return;
}
};
if status.is_success() {
match serde_json::from_slice::<IngestionResponse>(&body) {
Ok(parsed) if parsed.errors.is_empty() => {
tracing::trace!(
succeeded = parsed.successes.len(),
"langfuse: ingestion batch accepted"
);
}
Ok(parsed) => {
tracing::warn!(
failed = parsed.errors.len(),
succeeded = parsed.successes.len(),
errors = ?parsed.errors,
"langfuse: some ingestion events rejected"
);
}
Err(err) => {
let snippet = String::from_utf8_lossy(&body);
let snippet = snippet.chars().take(512).collect::<String>();
tracing::debug!(%status, %err, body = %snippet, "langfuse: unrecognized ingestion response");
}
}
return;
}
let snippet = String::from_utf8_lossy(&body);
let snippet = snippet.chars().take(1024).collect::<String>();
tracing::warn!(%status, body = %snippet, "langfuse: ingestion request failed");
}
}