1use crate::tracing::observation_layer::{BatchManager, ObservationLayer, SpanTracker};
2use chrono::Utc;
3use reqwest::{Client, StatusCode};
4use serde::{Deserialize, Serialize};
5use serde_json::{json, Value};
6use std::env;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::Mutex;
10use url::Url;
11use uuid::Uuid;
12
13const DEFAULT_LANGFUSE_URL: &str = "http://localhost:3000";
14
15#[derive(Debug, Serialize, Deserialize)]
16struct LangfuseIngestionResponse {
17 successes: Vec<LangfuseIngestionSuccess>,
18 errors: Vec<LangfuseIngestionError>,
19}
20
21#[derive(Debug, Serialize, Deserialize)]
22struct LangfuseIngestionSuccess {
23 id: String,
24 status: i32,
25}
26
27#[derive(Debug, Serialize, Deserialize)]
28struct LangfuseIngestionError {
29 id: String,
30 status: i32,
31 message: Option<String>,
32 error: Option<Value>,
33}
34
35#[derive(Debug, Clone)]
36pub struct LangfuseBatchManager {
37 pub batch: Vec<Value>,
38 pub client: Client,
39 pub base_url: String,
40 pub public_key: String,
41 pub secret_key: String,
42}
43
44impl LangfuseBatchManager {
45 pub fn new(public_key: String, secret_key: String, base_url: String) -> Self {
46 Self {
47 batch: Vec::new(),
48 client: Client::builder()
49 .timeout(Duration::from_secs(10))
50 .build()
51 .expect("Failed to create HTTP client"),
52 base_url,
53 public_key,
54 secret_key,
55 }
56 }
57
58 pub fn spawn_sender(manager: Arc<Mutex<Self>>) {
59 const BATCH_INTERVAL: Duration = Duration::from_secs(5);
60
61 tokio::spawn(async move {
62 loop {
63 tokio::time::sleep(BATCH_INTERVAL).await;
64 if let Err(e) = manager.lock().await.send() {
65 tracing::error!(
66 error.msg = %e,
67 error.type = %std::any::type_name_of_val(&e),
68 "Failed to send batch to Langfuse"
69 );
70 }
71 }
72 });
73 }
74
75 pub async fn send_async(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
76 if self.batch.is_empty() {
77 return Ok(());
78 }
79
80 let payload = json!({ "batch": self.batch });
81 let base_url = Url::parse(&self.base_url).map_err(|e| format!("Invalid base URL: {e}"))?;
82 let url = base_url
83 .join("api/public/ingestion")
84 .map_err(|e| format!("Failed to construct endpoint URL: {e}"))?;
85
86 let response = self
87 .client
88 .post(url)
89 .basic_auth(&self.public_key, Some(&self.secret_key))
90 .json(&payload)
91 .send()
92 .await?;
93
94 match response.status() {
95 status if status.is_success() => {
96 let response_body: LangfuseIngestionResponse = response.json().await?;
97
98 for error in &response_body.errors {
99 tracing::error!(
100 id = %error.id,
101 status = error.status,
102 message = error.message.as_deref().unwrap_or("No message"),
103 error = ?error.error,
104 "Partial failure in batch ingestion"
105 );
106 }
107
108 if !response_body.successes.is_empty() {
109 self.batch.clear();
110 }
111
112 if response_body.successes.is_empty() && !response_body.errors.is_empty() {
113 Err("Langfuse ingestion failed for all items".into())
114 } else {
115 Ok(())
116 }
117 }
118 status @ (StatusCode::BAD_REQUEST
119 | StatusCode::UNAUTHORIZED
120 | StatusCode::FORBIDDEN
121 | StatusCode::NOT_FOUND
122 | StatusCode::METHOD_NOT_ALLOWED) => {
123 let err_text = response.text().await.unwrap_or_default();
124 Err(format!("Langfuse API error: {}: {}", status, err_text).into())
125 }
126 status => {
127 let err_text = response.text().await.unwrap_or_default();
128 Err(format!("Unexpected status code: {}: {}", status, err_text).into())
129 }
130 }
131 }
132}
133
134impl BatchManager for LangfuseBatchManager {
135 fn add_event(&mut self, event_type: &str, body: Value) {
136 self.batch.push(json!({
137 "id": Uuid::new_v4().to_string(),
138 "timestamp": Utc::now().to_rfc3339(),
139 "type": event_type,
140 "body": body
141 }));
142 }
143
144 fn send(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
145 tokio::task::block_in_place(|| {
146 tokio::runtime::Handle::current().block_on(self.send_async())
147 })
148 }
149
150 fn is_empty(&self) -> bool {
151 self.batch.is_empty()
152 }
153}
154
155pub fn create_langfuse_observer() -> Option<ObservationLayer> {
156 let public_key = env::var("LANGFUSE_PUBLIC_KEY")
157 .or_else(|_| env::var("LANGFUSE_INIT_PROJECT_PUBLIC_KEY"))
158 .unwrap_or_default(); let secret_key = env::var("LANGFUSE_SECRET_KEY")
161 .or_else(|_| env::var("LANGFUSE_INIT_PROJECT_SECRET_KEY"))
162 .unwrap_or_default(); if public_key.is_empty() || secret_key.is_empty() {
166 return None;
167 }
168
169 let base_url = env::var("LANGFUSE_URL").unwrap_or_else(|_| DEFAULT_LANGFUSE_URL.to_string());
170
171 let batch_manager = Arc::new(Mutex::new(LangfuseBatchManager::new(
172 public_key, secret_key, base_url,
173 )));
174
175 if !cfg!(test) {
176 LangfuseBatchManager::spawn_sender(batch_manager.clone());
177 }
178
179 Some(ObservationLayer {
180 batch_manager,
181 span_tracker: Arc::new(Mutex::new(SpanTracker::new())),
182 })
183}
184
185#[cfg(test)]
186mod tests {
187 use super::*;
188 use serde_json::json;
189 use std::collections::HashMap;
190 use tokio::sync::Mutex;
191 use tracing::dispatcher;
192 use wiremock::matchers::{method, path};
193 use wiremock::{Mock, MockServer, ResponseTemplate};
194
195 struct TestFixture {
196 original_subscriber: Option<dispatcher::Dispatch>,
197 original_env_vars: HashMap<String, String>,
198 mock_server: Option<MockServer>,
199 }
200
201 impl TestFixture {
202 async fn new() -> Self {
203 Self {
204 original_subscriber: Some(dispatcher::get_default(dispatcher::Dispatch::clone)),
205 original_env_vars: Self::save_env_vars(),
206 mock_server: None,
207 }
208 }
209
210 fn save_env_vars() -> HashMap<String, String> {
211 [
212 "LANGFUSE_PUBLIC_KEY",
213 "LANGFUSE_INIT_PROJECT_PUBLIC_KEY",
214 "LANGFUSE_SECRET_KEY",
215 "LANGFUSE_INIT_PROJECT_SECRET_KEY",
216 "LANGFUSE_URL",
217 ]
218 .iter()
219 .filter_map(|&var| env::var(var).ok().map(|val| (var.to_string(), val)))
220 .collect()
221 }
222
223 async fn with_mock_server(mut self) -> Self {
224 self.mock_server = Some(MockServer::start().await);
225 self
226 }
227
228 fn mock_server_uri(&self) -> String {
229 self.mock_server
230 .as_ref()
231 .expect("Mock server not initialized")
232 .uri()
233 }
234
235 async fn mock_response(&self, status: u16, body: Value) {
236 Mock::given(method("POST"))
237 .and(path("/api/public/ingestion"))
238 .respond_with(ResponseTemplate::new(status).set_body_json(body))
239 .mount(self.mock_server.as_ref().unwrap())
240 .await;
241 }
242 }
243
244 impl Drop for TestFixture {
245 fn drop(&mut self) {
246 if let Some(subscriber) = &self.original_subscriber {
248 let _ = dispatcher::set_global_default(subscriber.clone());
249 }
250
251 for var in [
253 "LANGFUSE_PUBLIC_KEY",
254 "LANGFUSE_INIT_PROJECT_PUBLIC_KEY",
255 "LANGFUSE_SECRET_KEY",
256 "LANGFUSE_INIT_PROJECT_SECRET_KEY",
257 "LANGFUSE_URL",
258 ] {
259 if let Some(value) = self.original_env_vars.get(var) {
260 env::set_var(var, value);
261 } else {
262 env::remove_var(var);
263 }
264 }
265 }
266 }
267
268 fn create_test_event() -> Value {
269 json!({
270 "name": "test_span",
271 "type": "SPAN"
272 })
273 }
274
275 #[tokio::test]
276 async fn test_batch_manager_creation() {
277 let _fixture = TestFixture::new().await;
278
279 let manager = LangfuseBatchManager::new(
280 "test-public".to_string(),
281 "test-secret".to_string(),
282 "http://test.local".to_string(),
283 );
284
285 assert_eq!(manager.public_key, "test-public");
286 assert_eq!(manager.secret_key, "test-secret");
287 assert_eq!(manager.base_url, "http://test.local");
288 assert!(manager.batch.is_empty());
289 }
290
291 #[tokio::test]
292 async fn test_add_event() {
293 let _fixture = TestFixture::new().await;
294 let mut manager = LangfuseBatchManager::new(
295 "test-public".to_string(),
296 "test-secret".to_string(),
297 "http://test.local".to_string(),
298 );
299
300 manager.add_event("test-event", create_test_event());
301
302 assert_eq!(manager.batch.len(), 1);
303 let event = &manager.batch[0];
304 assert_eq!(event["type"], "test-event");
305 assert_eq!(event["body"], create_test_event());
306 assert!(event["id"].as_str().is_some());
307 assert!(event["timestamp"].as_str().is_some());
308 }
309
310 #[tokio::test]
311 async fn test_batch_send_success() {
312 let fixture = TestFixture::new().await.with_mock_server().await;
313
314 fixture
315 .mock_response(
316 200,
317 json!({
318 "successes": [{"id": "1", "status": 200}],
319 "errors": []
320 }),
321 )
322 .await;
323
324 let mut manager = LangfuseBatchManager::new(
325 "test-public".to_string(),
326 "test-secret".to_string(),
327 fixture.mock_server_uri(),
328 );
329
330 manager.add_event("test-event", create_test_event());
331
332 let result = manager.send_async().await;
333 assert!(result.is_ok());
334 assert!(manager.batch.is_empty());
335 }
336
337 #[tokio::test]
338 async fn test_batch_send_partial_failure() {
339 let fixture = TestFixture::new().await.with_mock_server().await;
340
341 fixture
342 .mock_response(
343 200,
344 json!({
345 "successes": [{"id": "1", "status": 200}],
346 "errors": [{"id": "2", "status": 400, "message": "Invalid data"}]
347 }),
348 )
349 .await;
350
351 let mut manager = LangfuseBatchManager::new(
352 "test-public".to_string(),
353 "test-secret".to_string(),
354 fixture.mock_server_uri(),
355 );
356
357 manager.add_event("test-event", create_test_event());
358
359 let result = manager.send_async().await;
360 assert!(result.is_ok());
361 assert!(manager.batch.is_empty());
362 }
363
364 #[tokio::test]
365 async fn test_batch_send_complete_failure() {
366 let fixture = TestFixture::new().await.with_mock_server().await;
367
368 fixture
369 .mock_response(
370 200,
371 json!({
372 "successes": [],
373 "errors": [{"id": "1", "status": 400, "message": "Invalid data"}]
374 }),
375 )
376 .await;
377
378 let mut manager = LangfuseBatchManager::new(
379 "test-public".to_string(),
380 "test-secret".to_string(),
381 fixture.mock_server_uri(),
382 );
383
384 manager.add_event("test-event", create_test_event());
385
386 let result = manager.send_async().await;
387 assert!(result.is_err());
388 assert!(!manager.batch.is_empty());
389 }
390
391 #[tokio::test]
392 async fn test_create_langfuse_observer() {
393 let fixture = TestFixture::new().await.with_mock_server().await;
394
395 for var in &[
397 "LANGFUSE_PUBLIC_KEY",
398 "LANGFUSE_INIT_PROJECT_PUBLIC_KEY",
399 "LANGFUSE_SECRET_KEY",
400 "LANGFUSE_INIT_PROJECT_SECRET_KEY",
401 "LANGFUSE_URL",
402 ] {
403 env::remove_var(var);
404 }
405
406 let observer = create_langfuse_observer();
407 assert!(
408 observer.is_none(),
409 "Observer should be None without environment variables"
410 );
411
412 env::set_var("LANGFUSE_PUBLIC_KEY", "test-public-key");
414 let observer = create_langfuse_observer();
415 assert!(
416 observer.is_none(),
417 "Observer should be None with only public key"
418 );
419 env::remove_var("LANGFUSE_PUBLIC_KEY");
420
421 env::set_var("LANGFUSE_SECRET_KEY", "test-secret-key");
423 let observer = create_langfuse_observer();
424 assert!(
425 observer.is_none(),
426 "Observer should be None with only secret key"
427 );
428 env::remove_var("LANGFUSE_SECRET_KEY");
429
430 env::set_var("LANGFUSE_INIT_PROJECT_PUBLIC_KEY", "test-public-key");
432 let observer = create_langfuse_observer();
433 assert!(
434 observer.is_none(),
435 "Observer should be None with only init project public key"
436 );
437 env::remove_var("LANGFUSE_INIT_PROJECT_PUBLIC_KEY");
438
439 env::set_var("LANGFUSE_INIT_PROJECT_SECRET_KEY", "test-secret-key");
441 let observer = create_langfuse_observer();
442 assert!(
443 observer.is_none(),
444 "Observer should be None with only init project secret key"
445 );
446 env::remove_var("LANGFUSE_INIT_PROJECT_SECRET_KEY");
447
448 env::set_var("LANGFUSE_PUBLIC_KEY", "test-public-key");
450 env::set_var("LANGFUSE_SECRET_KEY", "test-secret-key");
451 env::set_var("LANGFUSE_URL", fixture.mock_server_uri());
452 let observer = create_langfuse_observer();
453 assert!(
454 observer.is_some(),
455 "Observer should be Some with both regular keys set"
456 );
457
458 env::remove_var("LANGFUSE_PUBLIC_KEY");
460 env::remove_var("LANGFUSE_SECRET_KEY");
461
462 env::set_var("LANGFUSE_INIT_PROJECT_PUBLIC_KEY", "test-public-key");
464 env::set_var("LANGFUSE_INIT_PROJECT_SECRET_KEY", "test-secret-key");
465 let observer = create_langfuse_observer();
466 assert!(
467 observer.is_some(),
468 "Observer should be Some with both init project keys set"
469 );
470
471 let batch_manager = observer.unwrap().batch_manager;
473 assert!(batch_manager.lock().await.is_empty());
474 }
475 #[tokio::test]
476 async fn test_batch_manager_spawn_sender() {
477 let fixture = TestFixture::new().await.with_mock_server().await;
478
479 fixture
480 .mock_response(
481 200,
482 json!({
483 "successes": [{"id": "1", "status": 200}],
484 "errors": []
485 }),
486 )
487 .await;
488
489 let manager = Arc::new(Mutex::new(LangfuseBatchManager::new(
490 "test-public".to_string(),
491 "test-secret".to_string(),
492 fixture.mock_server_uri(),
493 )));
494
495 manager
496 .lock()
497 .await
498 .add_event("test-event", create_test_event());
499
500 let result = manager.lock().await.send_async().await;
503 assert!(result.is_ok());
504 assert!(manager.lock().await.batch.is_empty());
505 }
506}