Skip to main content

synaptic_langfuse/
lib.rs

1//! Langfuse observability integration for Synaptic.
2//!
3//! Langfuse is an open-source LLM observability platform. This crate provides a
4//! CallbackHandler that records all Synaptic run events to Langfuse traces.
5
6use async_trait::async_trait;
7use base64::Engine;
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use std::sync::Arc;
11use synaptic_core::{CallbackHandler, RunEvent, SynapticError};
12use tokio::sync::Mutex;
13
14/// Configuration for the Langfuse callback.
15#[derive(Debug, Clone)]
16pub struct LangfuseConfig {
17    /// Langfuse public key (pk-lf-...)
18    pub public_key: String,
19    /// Langfuse secret key (sk-lf-...)
20    pub secret_key: String,
21    /// Langfuse host (default: https://cloud.langfuse.com)
22    pub host: String,
23    /// How many events to buffer before flushing (default: 20)
24    pub flush_batch_size: usize,
25}
26
27impl LangfuseConfig {
28    /// Create a new Langfuse config for Langfuse Cloud.
29    pub fn new(public_key: impl Into<String>, secret_key: impl Into<String>) -> Self {
30        Self {
31            public_key: public_key.into(),
32            secret_key: secret_key.into(),
33            host: "https://cloud.langfuse.com".to_string(),
34            flush_batch_size: 20,
35        }
36    }
37
38    /// Use a self-hosted Langfuse instance.
39    pub fn with_host(mut self, host: impl Into<String>) -> Self {
40        self.host = host.into();
41        self
42    }
43
44    /// Set the number of events to buffer before flushing.
45    pub fn with_flush_batch_size(mut self, size: usize) -> Self {
46        self.flush_batch_size = size;
47        self
48    }
49}
50
51/// A single Langfuse event for the ingestion API.
52#[derive(Debug, Serialize, Deserialize)]
53pub struct LangfuseEvent {
54    pub id: String,
55    pub r#type: String,
56    pub timestamp: String,
57    pub body: Value,
58}
59
60/// Callback handler that sends Synaptic run events to Langfuse.
61pub struct LangfuseCallback {
62    config: LangfuseConfig,
63    client: reqwest::Client,
64    event_queue: Arc<Mutex<Vec<LangfuseEvent>>>,
65}
66
67impl LangfuseCallback {
68    /// Create a new Langfuse callback.
69    pub async fn new(config: LangfuseConfig) -> Result<Self, SynapticError> {
70        let client = reqwest::Client::new();
71        Ok(Self {
72            config,
73            client,
74            event_queue: Arc::new(Mutex::new(Vec::new())),
75        })
76    }
77
78    /// Flush buffered events to the Langfuse ingestion API.
79    pub async fn flush(&self) -> Result<(), SynapticError> {
80        let mut queue = self.event_queue.lock().await;
81        if queue.is_empty() {
82            return Ok(());
83        }
84        let batch: Vec<LangfuseEvent> = queue.drain(..).collect();
85        drop(queue);
86
87        let body = serde_json::json!({ "batch": batch });
88        let credentials = base64::engine::general_purpose::STANDARD.encode(format!(
89            "{}:{}",
90            self.config.public_key, self.config.secret_key
91        ));
92        let url = format!("{}/api/public/ingestion", self.config.host);
93        let resp = self
94            .client
95            .post(&url)
96            .header("Authorization", format!("Basic {}", credentials))
97            .header("Content-Type", "application/json")
98            .json(&body)
99            .send()
100            .await
101            .map_err(|e| SynapticError::Callback(format!("Langfuse flush: {}", e)))?;
102        let status = resp.status().as_u16();
103        if status >= 400 {
104            let text = resp.text().await.unwrap_or_default();
105            return Err(SynapticError::Callback(format!(
106                "Langfuse API error ({}): {}",
107                status, text
108            )));
109        }
110        Ok(())
111    }
112
113    /// Queue a Langfuse event. Flushes automatically when batch size is reached.
114    async fn queue_event(&self, event: LangfuseEvent) -> Result<(), SynapticError> {
115        let mut queue = self.event_queue.lock().await;
116        queue.push(event);
117        let should_flush = queue.len() >= self.config.flush_batch_size;
118        drop(queue);
119        if should_flush {
120            self.flush().await?;
121        }
122        Ok(())
123    }
124
125    fn now_iso8601() -> String {
126        let dur = std::time::SystemTime::now()
127            .duration_since(std::time::UNIX_EPOCH)
128            .unwrap_or_default();
129        let secs = dur.as_secs();
130        let millis = dur.subsec_millis();
131        let year = 1970 + secs / 31_536_000;
132        format!("{:04}-01-01T00:00:{:02}.{:03}Z", year, secs % 60, millis)
133    }
134}
135
136#[async_trait]
137impl CallbackHandler for LangfuseCallback {
138    async fn on_event(&self, event: RunEvent) -> Result<(), SynapticError> {
139        let event_type = format!("{:?}", event);
140        let event_name = event_type
141            .split_whitespace()
142            .next()
143            .unwrap_or("Unknown")
144            .to_string();
145        let ts = Self::now_iso8601();
146        let langfuse_event = LangfuseEvent {
147            id: uuid::Uuid::new_v4().to_string(),
148            r#type: "span-create".to_string(),
149            timestamp: ts.clone(),
150            body: serde_json::json!({
151                "name": event_name,
152                "startTime": ts,
153            }),
154        };
155        self.queue_event(langfuse_event).await
156    }
157}