use std::sync::Arc;
use std::time::Duration;
use reqwest::Client;
use thiserror::Error;
use tracing::{debug, error, warn};
use super::config::LangfuseConfig;
use super::types::{IngestionBatch, IngestionEvent, IngestionResponse};
use crate::utils::net::http::create_custom_client;
#[derive(Debug, Error)]
pub enum LangfuseError {
#[error("Configuration error: {0}")]
Configuration(String),
#[error("Network error: {0}")]
Network(#[from] reqwest::Error),
#[error("Authentication failed: {0}")]
Authentication(String),
#[error("API error (status {status}): {message}")]
ApiError { status: u16, message: String },
#[error("Serialization error: {0}")]
Serialization(#[from] serde_json::Error),
#[error("Langfuse client is disabled")]
Disabled,
}
const DEFAULT_TIMEOUT_SECS: u64 = 30;
#[derive(Debug, Clone)]
pub struct LangfuseClient {
client: Arc<Client>,
config: LangfuseConfig,
}
impl LangfuseClient {
pub fn new(config: LangfuseConfig) -> Result<Self, LangfuseError> {
if !config.is_valid() {
if !config.enabled {
return Err(LangfuseError::Disabled);
}
return Err(LangfuseError::Configuration(
"Missing public_key or secret_key".to_string(),
));
}
let client = create_custom_client(Duration::from_secs(DEFAULT_TIMEOUT_SECS))?;
Ok(Self {
client: Arc::new(client),
config,
})
}
pub fn from_env() -> Result<Self, LangfuseError> {
Self::new(LangfuseConfig::from_env())
}
pub fn is_debug(&self) -> bool {
self.config.debug
}
pub fn config(&self) -> &LangfuseConfig {
&self.config
}
pub async fn ingest(&self, batch: IngestionBatch) -> Result<IngestionResponse, LangfuseError> {
if batch.is_empty() {
return Ok(IngestionResponse {
successes: Vec::new(),
errors: Vec::new(),
});
}
if self.config.debug {
debug!("Langfuse debug mode - would send {} events", batch.len());
for event in &batch.batch {
debug!("Event: {:?}", event);
}
return Ok(IngestionResponse {
successes: batch
.batch
.iter()
.map(|e| super::types::IngestionSuccess {
id: e.event_id().to_string(),
status: 200,
})
.collect(),
errors: Vec::new(),
});
}
let url = self.config.ingestion_endpoint();
let auth_header = self.config.auth_header().ok_or_else(|| {
LangfuseError::Configuration("Missing authentication credentials".to_string())
})?;
debug!("Sending {} events to Langfuse: {}", batch.len(), url);
let response = self
.client
.post(&url)
.header("Authorization", auth_header)
.header("Content-Type", "application/json")
.json(&batch)
.send()
.await?;
let status = response.status();
if status.is_success() {
let result: IngestionResponse = response.json().await?;
debug!(
"Langfuse ingestion complete: {} successes, {} errors",
result.successes.len(),
result.errors.len()
);
Ok(result)
} else if status.as_u16() == 401 || status.as_u16() == 403 {
let message = response.text().await.unwrap_or_default();
error!("Langfuse authentication failed: {}", message);
Err(LangfuseError::Authentication(message))
} else {
let message = response.text().await.unwrap_or_default();
warn!("Langfuse API error ({}): {}", status.as_u16(), message);
Err(LangfuseError::ApiError {
status: status.as_u16(),
message,
})
}
}
pub async fn send_event(&self, event: IngestionEvent) -> Result<(), LangfuseError> {
let mut batch = IngestionBatch::new();
batch.add(event);
let response = self.ingest(batch).await?;
if !response.errors.is_empty() {
let error = &response.errors[0];
return Err(LangfuseError::ApiError {
status: error.status,
message: error
.message
.clone()
.or_else(|| error.error.clone())
.unwrap_or_else(|| "Unknown error".to_string()),
});
}
Ok(())
}
pub async fn health_check(&self) -> Result<bool, LangfuseError> {
if !self.config.is_valid() {
return Ok(false);
}
let batch = IngestionBatch::new();
match self.ingest(batch).await {
Ok(_) => Ok(true),
Err(LangfuseError::Authentication(_)) => Ok(false),
Err(e) => Err(e),
}
}
}
pub struct BatchSender {
client: LangfuseClient,
batch: parking_lot::Mutex<IngestionBatch>,
batch_size: usize,
}
impl BatchSender {
pub fn new(client: LangfuseClient) -> Self {
let batch_size = client.config.batch_size;
Self {
client,
batch: parking_lot::Mutex::new(IngestionBatch::new()),
batch_size,
}
}
pub fn add(&self, event: IngestionEvent) -> bool {
let mut batch = self.batch.lock();
batch.add(event);
batch.len() >= self.batch_size
}
pub async fn flush(&self) -> Result<IngestionResponse, LangfuseError> {
let batch = {
let mut batch = self.batch.lock();
if batch.is_empty() {
return Ok(IngestionResponse {
successes: Vec::new(),
errors: Vec::new(),
});
}
std::mem::take(&mut *batch)
};
self.client.ingest(batch).await
}
pub fn pending_count(&self) -> usize {
self.batch.lock().len()
}
pub fn is_debug(&self) -> bool {
self.client.is_debug()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::integrations::langfuse::types::{Generation, Trace};
fn test_config() -> LangfuseConfig {
LangfuseConfig {
public_key: Some("pk-test".to_string()),
secret_key: Some("sk-test".to_string()),
host: "https://cloud.langfuse.com".to_string(),
enabled: true,
batch_size: 10,
flush_interval_ms: 1000,
debug: true, release: None,
}
}
#[test]
fn test_client_creation() {
let config = test_config();
let client = LangfuseClient::new(config);
assert!(client.is_ok());
}
#[test]
fn test_client_disabled() {
let mut config = test_config();
config.enabled = false;
let client = LangfuseClient::new(config);
assert!(matches!(client, Err(LangfuseError::Disabled)));
}
#[test]
fn test_client_missing_credentials() {
let config = LangfuseConfig::default();
let client = LangfuseClient::new(config);
assert!(matches!(client, Err(LangfuseError::Configuration(_))));
}
#[test]
fn test_client_debug_mode() {
let config = test_config();
let client = LangfuseClient::new(config).unwrap();
assert!(client.is_debug());
}
#[tokio::test]
async fn test_ingest_empty_batch() {
let config = test_config();
let client = LangfuseClient::new(config).unwrap();
let batch = IngestionBatch::new();
let result = client.ingest(batch).await;
assert!(result.is_ok());
let response = result.unwrap();
assert!(response.successes.is_empty());
assert!(response.errors.is_empty());
}
#[tokio::test]
async fn test_ingest_debug_mode() {
let config = test_config();
let client = LangfuseClient::new(config).unwrap();
let mut batch = IngestionBatch::new();
batch.add(IngestionEvent::trace_create(Trace::new().name("test")));
let result = client.ingest(batch).await;
assert!(result.is_ok());
let response = result.unwrap();
assert_eq!(response.successes.len(), 1);
assert!(response.errors.is_empty());
}
#[test]
fn test_batch_sender_creation() {
let config = test_config();
let client = LangfuseClient::new(config).unwrap();
let sender = BatchSender::new(client);
assert_eq!(sender.pending_count(), 0);
}
#[test]
fn test_batch_sender_add() {
let config = test_config();
let client = LangfuseClient::new(config).unwrap();
let sender = BatchSender::new(client);
let trace = Trace::new().name("test");
let should_flush = sender.add(IngestionEvent::trace_create(trace));
assert!(!should_flush);
assert_eq!(sender.pending_count(), 1);
}
#[test]
fn test_batch_sender_triggers_flush() {
let mut config = test_config();
config.batch_size = 2;
let client = LangfuseClient::new(config).unwrap();
let sender = BatchSender::new(client);
sender.add(IngestionEvent::trace_create(Trace::new()));
let should_flush = sender.add(IngestionEvent::trace_create(Trace::new()));
assert!(should_flush);
assert_eq!(sender.pending_count(), 2);
}
#[tokio::test]
async fn test_batch_sender_flush() {
let config = test_config();
let client = LangfuseClient::new(config).unwrap();
let sender = BatchSender::new(client);
sender.add(IngestionEvent::trace_create(Trace::new().name("test")));
sender.add(IngestionEvent::generation_create(
Generation::new("trace-id").model("gpt-4"),
));
let result = sender.flush().await;
assert!(result.is_ok());
assert_eq!(sender.pending_count(), 0);
}
#[tokio::test]
async fn test_batch_sender_flush_empty() {
let config = test_config();
let client = LangfuseClient::new(config).unwrap();
let sender = BatchSender::new(client);
let result = sender.flush().await;
assert!(result.is_ok());
}
#[test]
fn test_error_display() {
let error = LangfuseError::Configuration("test error".to_string());
assert_eq!(error.to_string(), "Configuration error: test error");
let error = LangfuseError::ApiError {
status: 500,
message: "Server error".to_string(),
};
assert_eq!(error.to_string(), "API error (status 500): Server error");
}
}