use chrono::Utc;
use langfuse_client_base::models::{IngestionEvent, IngestionEventOneOf, TraceBody};
use langfuse_ergonomic::{BackpressurePolicy, Batcher, ClientBuilder};
use std::time::Duration;
use uuid::Uuid;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
tracing_subscriber::fmt::init();
println!(" Starting comprehensive batch test");
println!("{}", "=".repeat(50));
let client = ClientBuilder::from_env()?.build()?;
println!(" Connected to Langfuse");
let batcher = Batcher::builder()
.client(client)
.max_events(10) .max_bytes(50_000) .flush_interval(Duration::from_secs(2))
.max_retries(3)
.max_queue_size(100)
.backpressure_policy(BackpressurePolicy::Block)
.retry_jitter(true)
.build()
.await;
println!(" Batcher configured with:");
println!(" - Max events per batch: 10");
println!(" - Max bytes per batch: 50,000");
println!(" - Flush interval: 2 seconds");
println!(" - Max retries: 3");
println!(" - Max queue size: 100");
println!(" - Backpressure policy: Block");
println!(" - Retry jitter: Enabled");
let initial_metrics = batcher.metrics();
println!("\n Initial metrics:");
println!(" - Events queued: {}", initial_metrics.queued);
println!(" - Events flushed: {}", initial_metrics.flushed);
println!(" - Events failed: {}", initial_metrics.failed);
println!(" - Events dropped: {}", initial_metrics.dropped);
println!("\n{}", "=".repeat(50));
println!("Creating test events...\n");
let session_id = format!("batch-test-session-{}", Uuid::new_v4());
let user_id = "test-user-batch";
println!(" Session ID: {}", session_id);
println!(" User ID: {}\n", user_id);
let mut trace_ids = Vec::new();
for i in 0..25 {
let trace_id = Uuid::new_v4().to_string();
trace_ids.push(trace_id.clone());
let trace_type = match i % 5 {
0 => "llm-chat",
1 => "api-call",
2 => "database-query",
3 => "background-job",
_ => "user-interaction",
};
let trace_event = IngestionEvent::IngestionEventOneOf(Box::new(IngestionEventOneOf {
id: trace_id.clone(),
timestamp: Utc::now().to_rfc3339(),
r#type: langfuse_client_base::models::ingestion_event_one_of::Type::TraceCreate,
body: Box::new(TraceBody {
id: Some(Some(trace_id.clone())),
timestamp: Some(Some(Utc::now().to_rfc3339())),
name: Some(Some(format!("{}-trace-{}", trace_type, i))),
user_id: Some(Some(user_id.to_string())),
session_id: Some(Some(session_id.clone())),
metadata: Some(Some(serde_json::json!({
"test_type": "comprehensive_batch",
"trace_index": i,
"trace_type": trace_type,
"batch_test": true,
"features_tested": ["batching", "chunking", "retries", "metrics"],
"timestamp": Utc::now().to_rfc3339(),
"test_data": {
"importance": if i < 5 { "critical" } else if i < 15 { "normal" } else { "low" },
"processing_time_ms": 100 + i * 10,
"data_size_bytes": 1024 * (i + 1)
}
}))),
release: Some(Some("v2.0.0".to_string())),
version: Some(Some("2.0.0".to_string())),
tags: Some(Some(vec![
"batch-test".to_string(),
"comprehensive".to_string(),
trace_type.to_string(),
format!(
"priority-{}",
if i < 5 {
"high"
} else if i < 15 {
"medium"
} else {
"low"
}
),
])),
input: Some(Some(serde_json::json!({
"query": format!("Test query for {} #{}", trace_type, i),
"params": {
"index": i,
"type": trace_type,
"batch_size": 10,
"test_mode": true
},
"context": {
"session": session_id.clone(),
"user": user_id,
"environment": "test"
}
}))),
output: Some(Some(serde_json::json!({
"status": "success",
"result": format!("Processed {} request #{}", trace_type, i),
"metrics": {
"duration_ms": 100 + i * 10,
"tokens_used": if trace_type == "llm-chat" { Some(150 + i * 5) } else { None },
"cache_hit": i % 3 == 0
},
"data": {
"response_size": 2048 * (i % 5 + 1),
"items_processed": i * 2 + 1
}
}))),
public: Some(Some(false)),
environment: None,
}),
metadata: None,
}));
batcher.add(trace_event).await?;
if (i + 1) % 5 == 0 {
println!(" Added {} traces (latest: {})", i + 1, trace_type);
let current_metrics = batcher.metrics();
println!(
" Current: queued={}, flushed={}",
current_metrics.queued, current_metrics.flushed
);
}
if i % 3 == 0 {
tokio::time::sleep(Duration::from_millis(50)).await;
}
}
println!("\n{}", "=".repeat(50));
println!(" Checking metrics after adding all events...");
let after_add_metrics = batcher.metrics();
println!(" - Total events queued: {}", after_add_metrics.queued);
println!(" - Events flushed so far: {}", after_add_metrics.flushed);
println!(" - Events failed: {}", after_add_metrics.failed);
println!(" - Events dropped: {}", after_add_metrics.dropped);
println!(" - Total retries: {}", after_add_metrics.retries);
println!("\n Waiting for automatic flush (3 seconds)...");
tokio::time::sleep(Duration::from_secs(4)).await;
let after_auto_flush_metrics = batcher.metrics();
println!(" Metrics after automatic flush:");
println!(
" - Total events queued: {}",
after_auto_flush_metrics.queued
);
println!(" - Events flushed: {}", after_auto_flush_metrics.flushed);
println!(" - Events failed: {}", after_auto_flush_metrics.failed);
println!(" - Events dropped: {}", after_auto_flush_metrics.dropped);
println!(" - Total retries: {}", after_auto_flush_metrics.retries);
println!("\n Performing manual flush...");
let flush_response = batcher.flush().await?;
println!(" Manual flush complete:");
println!(" - Succeeded: {}", flush_response.success_count);
println!(" - Failed: {}", flush_response.failure_count);
if !flush_response.failures.is_empty() {
println!(" Failures detected:");
for failure in &flush_response.failures {
println!(
" - Event {}: {} (retryable: {})",
failure.event_id, failure.message, failure.retryable
);
}
}
println!("\n Adding 5 more traces to test continued operation...");
for i in 25..30 {
let trace_id = Uuid::new_v4().to_string();
let trace_event = IngestionEvent::IngestionEventOneOf(Box::new(IngestionEventOneOf {
id: trace_id.clone(),
timestamp: Utc::now().to_rfc3339(),
r#type: langfuse_client_base::models::ingestion_event_one_of::Type::TraceCreate,
body: Box::new(TraceBody {
id: Some(Some(trace_id.clone())),
timestamp: Some(Some(Utc::now().to_rfc3339())),
name: Some(Some(format!("additional-trace-{}", i))),
user_id: Some(Some(user_id.to_string())),
session_id: Some(Some(session_id.clone())),
metadata: Some(Some(serde_json::json!({
"test_phase": "additional",
"trace_index": i
}))),
tags: Some(Some(vec![
"additional".to_string(),
"post-flush".to_string(),
])),
input: Some(Some(serde_json::json!({ "additional": true, "index": i }))),
output: Some(Some(serde_json::json!({ "processed": true }))),
release: Some(Some("v2.0.0".to_string())),
version: Some(Some("2.0.0".to_string())),
public: Some(Some(false)),
environment: None,
}),
metadata: None,
}));
batcher.add(trace_event).await?;
}
println!(" Added 5 additional traces");
let final_metrics = batcher.metrics();
println!("\n Shutting down batcher...");
let shutdown_response = batcher.shutdown().await?;
println!(" Shutdown complete:");
println!(" - Total succeeded: {}", shutdown_response.success_count);
println!(" - Total failed: {}", shutdown_response.failure_count);
println!("\n Final metrics summary:");
println!(" - Total events queued: {}", final_metrics.queued);
println!(" - Total events flushed: {}", final_metrics.flushed);
println!(" - Total events failed: {}", final_metrics.failed);
println!(" - Total events dropped: {}", final_metrics.dropped);
println!(" - Total retries: {}", final_metrics.retries);
let success_rate = if final_metrics.queued > 0 {
(final_metrics.flushed as f64 / final_metrics.queued as f64) * 100.0
} else {
0.0
};
println!(" - Success rate: {:.2}%", success_rate);
if final_metrics.last_error_ts > 0 {
let error_time = std::time::UNIX_EPOCH + Duration::from_secs(final_metrics.last_error_ts);
if let Ok(error_time) = error_time.duration_since(std::time::UNIX_EPOCH) {
println!(" - Last error: {} seconds ago", error_time.as_secs());
}
}
println!("\n{}", "=".repeat(50));
println!(" Test complete!");
println!("\n View results at:");
println!(
" Session: https://cloud.langfuse.com/sessions/{}",
session_id
);
if !trace_ids.is_empty() {
println!(
" First trace: https://cloud.langfuse.com/trace/{}",
trace_ids[0]
);
println!(
" Last trace: https://cloud.langfuse.com/trace/{}",
trace_ids[trace_ids.len() - 1]
);
}
println!("\n Login with:");
println!(" Email: langfuse@timvw.be");
println!(" URL: https://cloud.langfuse.com");
Ok(())
}