Skip to main content

aster/tracing/
langfuse_layer.rs

1use crate::tracing::observation_layer::{BatchManager, ObservationLayer, SpanTracker};
2use chrono::Utc;
3use reqwest::{Client, StatusCode};
4use serde::{Deserialize, Serialize};
5use serde_json::{json, Value};
6use std::env;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::Mutex;
10use url::Url;
11use uuid::Uuid;
12
13const DEFAULT_LANGFUSE_URL: &str = "http://localhost:3000";
14
15#[derive(Debug, Serialize, Deserialize)]
16struct LangfuseIngestionResponse {
17    successes: Vec<LangfuseIngestionSuccess>,
18    errors: Vec<LangfuseIngestionError>,
19}
20
21#[derive(Debug, Serialize, Deserialize)]
22struct LangfuseIngestionSuccess {
23    id: String,
24    status: i32,
25}
26
27#[derive(Debug, Serialize, Deserialize)]
28struct LangfuseIngestionError {
29    id: String,
30    status: i32,
31    message: Option<String>,
32    error: Option<Value>,
33}
34
35#[derive(Debug, Clone)]
36pub struct LangfuseBatchManager {
37    pub batch: Vec<Value>,
38    pub client: Client,
39    pub base_url: String,
40    pub public_key: String,
41    pub secret_key: String,
42}
43
44impl LangfuseBatchManager {
45    pub fn new(public_key: String, secret_key: String, base_url: String) -> Self {
46        Self {
47            batch: Vec::new(),
48            client: Client::builder()
49                .timeout(Duration::from_secs(10))
50                .build()
51                .expect("Failed to create HTTP client"),
52            base_url,
53            public_key,
54            secret_key,
55        }
56    }
57
58    pub fn spawn_sender(manager: Arc<Mutex<Self>>) {
59        const BATCH_INTERVAL: Duration = Duration::from_secs(5);
60
61        tokio::spawn(async move {
62            loop {
63                tokio::time::sleep(BATCH_INTERVAL).await;
64                if let Err(e) = manager.lock().await.send() {
65                    tracing::error!(
66                        error.msg = %e,
67                        error.type = %std::any::type_name_of_val(&e),
68                        "Failed to send batch to Langfuse"
69                    );
70                }
71            }
72        });
73    }
74
75    pub async fn send_async(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
76        if self.batch.is_empty() {
77            return Ok(());
78        }
79
80        let payload = json!({ "batch": self.batch });
81        let base_url = Url::parse(&self.base_url).map_err(|e| format!("Invalid base URL: {e}"))?;
82        let url = base_url
83            .join("api/public/ingestion")
84            .map_err(|e| format!("Failed to construct endpoint URL: {e}"))?;
85
86        let response = self
87            .client
88            .post(url)
89            .basic_auth(&self.public_key, Some(&self.secret_key))
90            .json(&payload)
91            .send()
92            .await?;
93
94        match response.status() {
95            status if status.is_success() => {
96                let response_body: LangfuseIngestionResponse = response.json().await?;
97
98                for error in &response_body.errors {
99                    tracing::error!(
100                        id = %error.id,
101                        status = error.status,
102                        message = error.message.as_deref().unwrap_or("No message"),
103                        error = ?error.error,
104                        "Partial failure in batch ingestion"
105                    );
106                }
107
108                if !response_body.successes.is_empty() {
109                    self.batch.clear();
110                }
111
112                if response_body.successes.is_empty() && !response_body.errors.is_empty() {
113                    Err("Langfuse ingestion failed for all items".into())
114                } else {
115                    Ok(())
116                }
117            }
118            status @ (StatusCode::BAD_REQUEST
119            | StatusCode::UNAUTHORIZED
120            | StatusCode::FORBIDDEN
121            | StatusCode::NOT_FOUND
122            | StatusCode::METHOD_NOT_ALLOWED) => {
123                let err_text = response.text().await.unwrap_or_default();
124                Err(format!("Langfuse API error: {}: {}", status, err_text).into())
125            }
126            status => {
127                let err_text = response.text().await.unwrap_or_default();
128                Err(format!("Unexpected status code: {}: {}", status, err_text).into())
129            }
130        }
131    }
132}
133
134impl BatchManager for LangfuseBatchManager {
135    fn add_event(&mut self, event_type: &str, body: Value) {
136        self.batch.push(json!({
137            "id": Uuid::new_v4().to_string(),
138            "timestamp": Utc::now().to_rfc3339(),
139            "type": event_type,
140            "body": body
141        }));
142    }
143
144    fn send(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
145        tokio::task::block_in_place(|| {
146            tokio::runtime::Handle::current().block_on(self.send_async())
147        })
148    }
149
150    fn is_empty(&self) -> bool {
151        self.batch.is_empty()
152    }
153}
154
155pub fn create_langfuse_observer() -> Option<ObservationLayer> {
156    let public_key = env::var("LANGFUSE_PUBLIC_KEY")
157        .or_else(|_| env::var("LANGFUSE_INIT_PROJECT_PUBLIC_KEY"))
158        .unwrap_or_default(); // Use empty string if not found
159
160    let secret_key = env::var("LANGFUSE_SECRET_KEY")
161        .or_else(|_| env::var("LANGFUSE_INIT_PROJECT_SECRET_KEY"))
162        .unwrap_or_default(); // Use empty string if not found
163
164    // Return None if either key is empty
165    if public_key.is_empty() || secret_key.is_empty() {
166        return None;
167    }
168
169    let base_url = env::var("LANGFUSE_URL").unwrap_or_else(|_| DEFAULT_LANGFUSE_URL.to_string());
170
171    let batch_manager = Arc::new(Mutex::new(LangfuseBatchManager::new(
172        public_key, secret_key, base_url,
173    )));
174
175    if !cfg!(test) {
176        LangfuseBatchManager::spawn_sender(batch_manager.clone());
177    }
178
179    Some(ObservationLayer {
180        batch_manager,
181        span_tracker: Arc::new(Mutex::new(SpanTracker::new())),
182    })
183}
184
185#[cfg(test)]
186mod tests {
187    use super::*;
188    use serde_json::json;
189    use std::collections::HashMap;
190    use tokio::sync::Mutex;
191    use tracing::dispatcher;
192    use wiremock::matchers::{method, path};
193    use wiremock::{Mock, MockServer, ResponseTemplate};
194
195    struct TestFixture {
196        original_subscriber: Option<dispatcher::Dispatch>,
197        original_env_vars: HashMap<String, String>,
198        mock_server: Option<MockServer>,
199    }
200
201    impl TestFixture {
202        async fn new() -> Self {
203            Self {
204                original_subscriber: Some(dispatcher::get_default(dispatcher::Dispatch::clone)),
205                original_env_vars: Self::save_env_vars(),
206                mock_server: None,
207            }
208        }
209
210        fn save_env_vars() -> HashMap<String, String> {
211            [
212                "LANGFUSE_PUBLIC_KEY",
213                "LANGFUSE_INIT_PROJECT_PUBLIC_KEY",
214                "LANGFUSE_SECRET_KEY",
215                "LANGFUSE_INIT_PROJECT_SECRET_KEY",
216                "LANGFUSE_URL",
217            ]
218            .iter()
219            .filter_map(|&var| env::var(var).ok().map(|val| (var.to_string(), val)))
220            .collect()
221        }
222
223        async fn with_mock_server(mut self) -> Self {
224            self.mock_server = Some(MockServer::start().await);
225            self
226        }
227
228        fn mock_server_uri(&self) -> String {
229            self.mock_server
230                .as_ref()
231                .expect("Mock server not initialized")
232                .uri()
233        }
234
235        async fn mock_response(&self, status: u16, body: Value) {
236            Mock::given(method("POST"))
237                .and(path("/api/public/ingestion"))
238                .respond_with(ResponseTemplate::new(status).set_body_json(body))
239                .mount(self.mock_server.as_ref().unwrap())
240                .await;
241        }
242    }
243
244    impl Drop for TestFixture {
245        fn drop(&mut self) {
246            // Restore original subscriber
247            if let Some(subscriber) = &self.original_subscriber {
248                let _ = dispatcher::set_global_default(subscriber.clone());
249            }
250
251            // Restore environment
252            for var in [
253                "LANGFUSE_PUBLIC_KEY",
254                "LANGFUSE_INIT_PROJECT_PUBLIC_KEY",
255                "LANGFUSE_SECRET_KEY",
256                "LANGFUSE_INIT_PROJECT_SECRET_KEY",
257                "LANGFUSE_URL",
258            ] {
259                if let Some(value) = self.original_env_vars.get(var) {
260                    env::set_var(var, value);
261                } else {
262                    env::remove_var(var);
263                }
264            }
265        }
266    }
267
268    fn create_test_event() -> Value {
269        json!({
270            "name": "test_span",
271            "type": "SPAN"
272        })
273    }
274
275    #[tokio::test]
276    async fn test_batch_manager_creation() {
277        let _fixture = TestFixture::new().await;
278
279        let manager = LangfuseBatchManager::new(
280            "test-public".to_string(),
281            "test-secret".to_string(),
282            "http://test.local".to_string(),
283        );
284
285        assert_eq!(manager.public_key, "test-public");
286        assert_eq!(manager.secret_key, "test-secret");
287        assert_eq!(manager.base_url, "http://test.local");
288        assert!(manager.batch.is_empty());
289    }
290
291    #[tokio::test]
292    async fn test_add_event() {
293        let _fixture = TestFixture::new().await;
294        let mut manager = LangfuseBatchManager::new(
295            "test-public".to_string(),
296            "test-secret".to_string(),
297            "http://test.local".to_string(),
298        );
299
300        manager.add_event("test-event", create_test_event());
301
302        assert_eq!(manager.batch.len(), 1);
303        let event = &manager.batch[0];
304        assert_eq!(event["type"], "test-event");
305        assert_eq!(event["body"], create_test_event());
306        assert!(event["id"].as_str().is_some());
307        assert!(event["timestamp"].as_str().is_some());
308    }
309
310    #[tokio::test]
311    async fn test_batch_send_success() {
312        let fixture = TestFixture::new().await.with_mock_server().await;
313
314        fixture
315            .mock_response(
316                200,
317                json!({
318                    "successes": [{"id": "1", "status": 200}],
319                    "errors": []
320                }),
321            )
322            .await;
323
324        let mut manager = LangfuseBatchManager::new(
325            "test-public".to_string(),
326            "test-secret".to_string(),
327            fixture.mock_server_uri(),
328        );
329
330        manager.add_event("test-event", create_test_event());
331
332        let result = manager.send_async().await;
333        assert!(result.is_ok());
334        assert!(manager.batch.is_empty());
335    }
336
337    #[tokio::test]
338    async fn test_batch_send_partial_failure() {
339        let fixture = TestFixture::new().await.with_mock_server().await;
340
341        fixture
342            .mock_response(
343                200,
344                json!({
345                    "successes": [{"id": "1", "status": 200}],
346                    "errors": [{"id": "2", "status": 400, "message": "Invalid data"}]
347                }),
348            )
349            .await;
350
351        let mut manager = LangfuseBatchManager::new(
352            "test-public".to_string(),
353            "test-secret".to_string(),
354            fixture.mock_server_uri(),
355        );
356
357        manager.add_event("test-event", create_test_event());
358
359        let result = manager.send_async().await;
360        assert!(result.is_ok());
361        assert!(manager.batch.is_empty());
362    }
363
364    #[tokio::test]
365    async fn test_batch_send_complete_failure() {
366        let fixture = TestFixture::new().await.with_mock_server().await;
367
368        fixture
369            .mock_response(
370                200,
371                json!({
372                    "successes": [],
373                    "errors": [{"id": "1", "status": 400, "message": "Invalid data"}]
374                }),
375            )
376            .await;
377
378        let mut manager = LangfuseBatchManager::new(
379            "test-public".to_string(),
380            "test-secret".to_string(),
381            fixture.mock_server_uri(),
382        );
383
384        manager.add_event("test-event", create_test_event());
385
386        let result = manager.send_async().await;
387        assert!(result.is_err());
388        assert!(!manager.batch.is_empty());
389    }
390
391    #[tokio::test]
392    async fn test_create_langfuse_observer() {
393        let fixture = TestFixture::new().await.with_mock_server().await;
394
395        // Test 1: No environment variables set - remove all possible variables
396        for var in &[
397            "LANGFUSE_PUBLIC_KEY",
398            "LANGFUSE_INIT_PROJECT_PUBLIC_KEY",
399            "LANGFUSE_SECRET_KEY",
400            "LANGFUSE_INIT_PROJECT_SECRET_KEY",
401            "LANGFUSE_URL",
402        ] {
403            env::remove_var(var);
404        }
405
406        let observer = create_langfuse_observer();
407        assert!(
408            observer.is_none(),
409            "Observer should be None without environment variables"
410        );
411
412        // Test 2: Only public key set (regular)
413        env::set_var("LANGFUSE_PUBLIC_KEY", "test-public-key");
414        let observer = create_langfuse_observer();
415        assert!(
416            observer.is_none(),
417            "Observer should be None with only public key"
418        );
419        env::remove_var("LANGFUSE_PUBLIC_KEY");
420
421        // Test 3: Only secret key set (regular)
422        env::set_var("LANGFUSE_SECRET_KEY", "test-secret-key");
423        let observer = create_langfuse_observer();
424        assert!(
425            observer.is_none(),
426            "Observer should be None with only secret key"
427        );
428        env::remove_var("LANGFUSE_SECRET_KEY");
429
430        // Test 4: Only public key set (init project)
431        env::set_var("LANGFUSE_INIT_PROJECT_PUBLIC_KEY", "test-public-key");
432        let observer = create_langfuse_observer();
433        assert!(
434            observer.is_none(),
435            "Observer should be None with only init project public key"
436        );
437        env::remove_var("LANGFUSE_INIT_PROJECT_PUBLIC_KEY");
438
439        // Test 5: Only secret key set (init project)
440        env::set_var("LANGFUSE_INIT_PROJECT_SECRET_KEY", "test-secret-key");
441        let observer = create_langfuse_observer();
442        assert!(
443            observer.is_none(),
444            "Observer should be None with only init project secret key"
445        );
446        env::remove_var("LANGFUSE_INIT_PROJECT_SECRET_KEY");
447
448        // Test 6: Both regular keys set (should succeed)
449        env::set_var("LANGFUSE_PUBLIC_KEY", "test-public-key");
450        env::set_var("LANGFUSE_SECRET_KEY", "test-secret-key");
451        env::set_var("LANGFUSE_URL", fixture.mock_server_uri());
452        let observer = create_langfuse_observer();
453        assert!(
454            observer.is_some(),
455            "Observer should be Some with both regular keys set"
456        );
457
458        // Clean up regular keys
459        env::remove_var("LANGFUSE_PUBLIC_KEY");
460        env::remove_var("LANGFUSE_SECRET_KEY");
461
462        // Test 7: Both init project keys set (should succeed)
463        env::set_var("LANGFUSE_INIT_PROJECT_PUBLIC_KEY", "test-public-key");
464        env::set_var("LANGFUSE_INIT_PROJECT_SECRET_KEY", "test-secret-key");
465        let observer = create_langfuse_observer();
466        assert!(
467            observer.is_some(),
468            "Observer should be Some with both init project keys set"
469        );
470
471        // Verify the observer has an empty batch manager
472        let batch_manager = observer.unwrap().batch_manager;
473        assert!(batch_manager.lock().await.is_empty());
474    }
475    #[tokio::test]
476    async fn test_batch_manager_spawn_sender() {
477        let fixture = TestFixture::new().await.with_mock_server().await;
478
479        fixture
480            .mock_response(
481                200,
482                json!({
483                    "successes": [{"id": "1", "status": 200}],
484                    "errors": []
485                }),
486            )
487            .await;
488
489        let manager = Arc::new(Mutex::new(LangfuseBatchManager::new(
490            "test-public".to_string(),
491            "test-secret".to_string(),
492            fixture.mock_server_uri(),
493        )));
494
495        manager
496            .lock()
497            .await
498            .add_event("test-event", create_test_event());
499
500        // Instead of spawning the sender which uses blocking operations,
501        // test the async send directly
502        let result = manager.lock().await.send_async().await;
503        assert!(result.is_ok());
504        assert!(manager.lock().await.batch.is_empty());
505    }
506}