1use async_trait::async_trait;
7use base64::Engine;
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use std::sync::Arc;
11use synaptic_core::{CallbackHandler, RunEvent, SynapticError};
12use tokio::sync::Mutex;
13
/// Connection and batching settings for the Langfuse callback.
#[derive(Debug, Clone)]
pub struct LangfuseConfig {
    /// Langfuse public key; sent as the user part of the Basic credentials.
    pub public_key: String,
    /// Langfuse secret key; sent as the password part of the Basic credentials.
    pub secret_key: String,
    /// Base URL of the Langfuse instance the events are posted to.
    pub host: String,
    /// Queue length at which queued events are flushed automatically.
    pub flush_batch_size: usize,
}

impl LangfuseConfig {
    /// Creates a configuration for the given key pair.
    ///
    /// The host defaults to `https://cloud.langfuse.com` and the automatic
    /// flush threshold to 20 events.
    pub fn new(public_key: impl Into<String>, secret_key: impl Into<String>) -> Self {
        let public_key = public_key.into();
        let secret_key = secret_key.into();
        Self {
            public_key,
            secret_key,
            host: String::from("https://cloud.langfuse.com"),
            flush_batch_size: 20,
        }
    }

    /// Returns the configuration with `host` replaced.
    pub fn with_host(self, host: impl Into<String>) -> Self {
        Self {
            host: host.into(),
            ..self
        }
    }

    /// Returns the configuration with the automatic flush threshold set to `size`.
    pub fn with_flush_batch_size(self, size: usize) -> Self {
        Self {
            flush_batch_size: size,
            ..self
        }
    }
}
50
51#[derive(Debug, Serialize, Deserialize)]
53pub struct LangfuseEvent {
54 pub id: String,
55 pub r#type: String,
56 pub timestamp: String,
57 pub body: Value,
58}
59
60pub struct LangfuseCallback {
62 config: LangfuseConfig,
63 client: reqwest::Client,
64 event_queue: Arc<Mutex<Vec<LangfuseEvent>>>,
65}
66
67impl LangfuseCallback {
68 pub async fn new(config: LangfuseConfig) -> Result<Self, SynapticError> {
70 let client = reqwest::Client::new();
71 Ok(Self {
72 config,
73 client,
74 event_queue: Arc::new(Mutex::new(Vec::new())),
75 })
76 }
77
78 pub async fn flush(&self) -> Result<(), SynapticError> {
80 let mut queue = self.event_queue.lock().await;
81 if queue.is_empty() {
82 return Ok(());
83 }
84 let batch: Vec<LangfuseEvent> = queue.drain(..).collect();
85 drop(queue);
86
87 let body = serde_json::json!({ "batch": batch });
88 let credentials = base64::engine::general_purpose::STANDARD.encode(format!(
89 "{}:{}",
90 self.config.public_key, self.config.secret_key
91 ));
92 let url = format!("{}/api/public/ingestion", self.config.host);
93 let resp = self
94 .client
95 .post(&url)
96 .header("Authorization", format!("Basic {}", credentials))
97 .header("Content-Type", "application/json")
98 .json(&body)
99 .send()
100 .await
101 .map_err(|e| SynapticError::Callback(format!("Langfuse flush: {}", e)))?;
102 let status = resp.status().as_u16();
103 if status >= 400 {
104 let text = resp.text().await.unwrap_or_default();
105 return Err(SynapticError::Callback(format!(
106 "Langfuse API error ({}): {}",
107 status, text
108 )));
109 }
110 Ok(())
111 }
112
113 async fn queue_event(&self, event: LangfuseEvent) -> Result<(), SynapticError> {
115 let mut queue = self.event_queue.lock().await;
116 queue.push(event);
117 let should_flush = queue.len() >= self.config.flush_batch_size;
118 drop(queue);
119 if should_flush {
120 self.flush().await?;
121 }
122 Ok(())
123 }
124
125 fn now_iso8601() -> String {
126 let dur = std::time::SystemTime::now()
127 .duration_since(std::time::UNIX_EPOCH)
128 .unwrap_or_default();
129 let secs = dur.as_secs();
130 let millis = dur.subsec_millis();
131 let year = 1970 + secs / 31_536_000;
132 format!("{:04}-01-01T00:00:{:02}.{:03}Z", year, secs % 60, millis)
133 }
134}
135
136#[async_trait]
137impl CallbackHandler for LangfuseCallback {
138 async fn on_event(&self, event: RunEvent) -> Result<(), SynapticError> {
139 let event_type = format!("{:?}", event);
140 let event_name = event_type
141 .split_whitespace()
142 .next()
143 .unwrap_or("Unknown")
144 .to_string();
145 let ts = Self::now_iso8601();
146 let langfuse_event = LangfuseEvent {
147 id: uuid::Uuid::new_v4().to_string(),
148 r#type: "span-create".to_string(),
149 timestamp: ts.clone(),
150 body: serde_json::json!({
151 "name": event_name,
152 "startTime": ts,
153 }),
154 };
155 self.queue_event(langfuse_event).await
156 }
157}