use async_trait::async_trait;
use foxtive_worker::{BatchHandler, BatchConfig, MessageBatch, BatchProcessor};
use std::sync::Arc;
use std::time::Duration;
struct DatabaseBatchHandler;
#[async_trait]
impl BatchHandler for DatabaseBatchHandler {
async fn process_batch(&self, batch: MessageBatch<serde_json::Value>) -> Result<(), foxtive_worker::WorkerError> {
println!("\n📦 Processing batch: {}", batch.id);
println!(" Messages: {}", batch.len());
println!(" Age: {:?}", batch.age());
println!(" Performing bulk insert...");
tokio::time::sleep(Duration::from_millis(100)).await;
for (idx, msg) in batch.messages.iter().enumerate() {
println!(" [{}] Message {}: {:?}", idx + 1, msg.message.id, msg.message.payload);
}
println!(" ✅ Batch completed successfully");
Ok(())
}
fn max_batch_size(&self) -> usize {
10 }
fn max_batch_age(&self) -> Duration {
Duration::from_secs(5) }
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
println!("=== Batch Processing Example ===\n");
println!("Creating batch handler...");
let handler = Arc::new(DatabaseBatchHandler);
println!("✅ Batch handler created\n");
println!("Configuring batch processor...");
let config = BatchConfig::default()
.with_batch_size(5) .with_flush_interval(Duration::from_secs(3)) .wait_for_full_batch(false) .with_processing_timeout(Duration::from_secs(30));
println!(" Batch size: {}", config.batch_size);
println!(" Flush interval: {:?}", config.flush_interval);
println!(" Wait for full batch: {}", config.wait_for_full_batch);
println!(" Processing timeout: {:?}", config.processing_timeout);
println!("✅ Configuration complete\n");
println!("Starting batch processor...");
let mut processor = BatchProcessor::new(handler, config);
processor.start().await?;
println!("✅ Batch processor started\n");
println!("Simulating message arrival...\n");
use foxtive_worker::{Message, MessageMetadata, ReceivedMessage};
#[derive(Debug)]
struct MockAckHandle;
#[async_trait]
impl foxtive_worker::message::AckHandle for MockAckHandle {
async fn ack(&self) -> Result<(), foxtive_worker::WorkerError> { Ok(()) }
async fn nack(&self, _requeue: bool) -> Result<(), foxtive_worker::WorkerError> { Ok(()) }
}
for i in 1..=12 {
let message = ReceivedMessage::new(
Message {
id: format!("msg-{}", i),
payload: serde_json::json!({
"user_id": i,
"action": "purchase",
"amount": i * 10
}),
metadata: MessageMetadata::new("orders"),
},
Arc::new(MockAckHandle),
);
println!("📨 Enqueuing message {}", i);
processor.enqueue(message).await?;
tokio::time::sleep(Duration::from_millis(500)).await;
}
println!("\n⏳ Waiting for final batch to flush...");
tokio::time::sleep(Duration::from_secs(5)).await;
println!("\n🛑 Shutting down batch processor...");
processor.shutdown().await?;
println!("✅ Shutdown complete");
println!("\n=== Example Complete ===");
println!("\nKey takeaways:");
println!("1. Messages are automatically assembled into batches");
println!("2. Batches flush when they reach batch_size OR flush_interval");
println!("3. All messages in a batch are processed together");
println!("4. Partial batches are flushed on timeout or shutdown");
println!("5. Batch processing is more efficient for bulk operations");
Ok(())
}