use celers_broker_sql::MysqlBroker;
use celers_core::SerializedTask;
use serde_json::json;
use std::time::Duration;
use tokio::time::sleep;
use uuid::Uuid;
#[tokio::main]
#[allow(dead_code)]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.init();
println!("=== CeleRS MySQL Broker: Idempotency Keys Demo ===\n");
let database_url = std::env::var("DATABASE_URL")
.unwrap_or_else(|_| "mysql://root:password@localhost:3306/celers_test".to_string());
let broker = MysqlBroker::new(&database_url).await?;
println!("✓ Connected to MySQL broker\n");
broker.migrate().await?;
println!("✓ Migrations applied\n");
broker.purge_all().await?;
broker.cleanup_expired_idempotency_keys().await?;
println!("--- Demo 1: Payment Processing with Idempotency ---");
demo_payment_processing(&broker).await?;
println!("\n--- Demo 2: Notification Sending ---");
demo_notification_sending(&broker).await?;
println!("\n--- Demo 3: TTL and Expiration ---");
demo_ttl_expiration(&broker).await?;
println!("\n--- Demo 4: Statistics and Monitoring ---");
demo_statistics(&broker).await?;
println!("\n--- Demo 5: Cleanup Expired Keys ---");
demo_cleanup(&broker).await?;
println!("\n=== Demo Complete ===");
Ok(())
}
async fn demo_payment_processing(broker: &MysqlBroker) -> Result<(), Box<dyn std::error::Error>> {
let payment_id = Uuid::new_v4().to_string();
let idempotency_key = format!("payment-{}", payment_id);
let payment_payload = json!({
"amount": 99.99,
"currency": "USD",
"user_id": "user123",
"payment_method": "card_****1234",
"payment_id": payment_id,
"transaction_type": "purchase",
"merchant_id": "merchant_abc"
});
let payment_task = SerializedTask::new(
"process_payment".to_string(),
serde_json::to_vec(&payment_payload)?,
);
println!(" 1. Submitting payment request (first time)...");
let task_id1 = broker
.enqueue_with_idempotency(
payment_task.clone(),
&idempotency_key,
3600, Some(json!({"client_ip": "192.168.1.100", "request_id": "req-001"})),
)
.await?;
println!(" ✓ Task created: {}", task_id1);
println!(" 2. Submitting same payment request (retry)...");
let task_id2 = broker
.enqueue_with_idempotency(
payment_task.clone(),
&idempotency_key,
3600,
Some(json!({"client_ip": "192.168.1.100", "request_id": "req-002"})),
)
.await?;
println!(" ✓ Returned existing task: {}", task_id2);
assert_eq!(
task_id1, task_id2,
"Idempotency check failed: different task IDs returned"
);
println!(
" ✓ Idempotency verified: Same task ID returned ({} == {})",
task_id1, task_id2
);
if let Some(record) = broker
.get_idempotency_record(&idempotency_key, "process_payment")
.await?
{
println!("\n Idempotency Record:");
println!(" - Key: {}", record.idempotency_key);
println!(" - Task ID: {}", record.task_id);
println!(" - Task Name: {}", record.task_name);
println!(" - Created: {}", record.created_at);
println!(" - Expires: {}", record.expires_at);
if let Some(meta) = record.metadata {
println!(" - Metadata: {}", meta);
}
}
Ok(())
}
async fn demo_notification_sending(broker: &MysqlBroker) -> Result<(), Box<dyn std::error::Error>> {
let user_id = "user456";
let notification_type = "order_confirmation";
let order_id = "order-789";
let idempotency_key = format!("{}-{}-{}", user_id, notification_type, order_id);
let notification_payload = json!({
"to": "user@example.com",
"subject": "Your Order Confirmation",
"template": "order_confirmation",
"user_id": user_id,
"order_id": order_id,
"notification_type": notification_type
});
let notification_task = SerializedTask::new(
"send_email".to_string(),
serde_json::to_vec(¬ification_payload)?,
);
println!(" 1. Sending order confirmation email...");
let task_id1 = broker
.enqueue_with_idempotency(
notification_task.clone(),
&idempotency_key,
7200, None,
)
.await?;
println!(" ✓ Email task created: {}", task_id1);
println!(" 2. Retrying email send (simulated network retry)...");
let task_id2 = broker
.enqueue_with_idempotency(notification_task, &idempotency_key, 7200, None)
.await?;
println!(" ✓ Returned existing task: {}", task_id2);
assert_eq!(task_id1, task_id2);
println!(" ✓ Duplicate notification prevented!");
Ok(())
}
async fn demo_ttl_expiration(broker: &MysqlBroker) -> Result<(), Box<dyn std::error::Error>> {
let idempotency_key = "api-call-xyz-123";
let api_payload = json!({
"endpoint": "/api/v1/resource",
"method": "POST",
"payload": {"data": "example"}
});
let api_task = SerializedTask::new(
"call_external_api".to_string(),
serde_json::to_vec(&api_payload)?,
);
println!(" 1. Submitting API call with 3-second TTL...");
let task_id1 = broker
.enqueue_with_idempotency(api_task.clone(), idempotency_key, 3, None)
.await?;
println!(" ✓ Task created: {}", task_id1);
println!(" 2. Trying to submit again (before expiration)...");
let task_id2 = broker
.enqueue_with_idempotency(api_task.clone(), idempotency_key, 3, None)
.await?;
println!(" ✓ Returned existing task: {}", task_id2);
assert_eq!(task_id1, task_id2);
println!(" 3. Waiting for TTL to expire (3 seconds)...");
sleep(Duration::from_secs(4)).await;
let cleaned = broker.cleanup_expired_idempotency_keys().await?;
println!(" ✓ Cleaned up {} expired keys", cleaned);
println!(" 4. Submitting again after expiration...");
let task_id3 = broker
.enqueue_with_idempotency(api_task, idempotency_key, 3, None)
.await?;
println!(" ✓ New task created: {}", task_id3);
assert_ne!(
task_id1, task_id3,
"Expected different task ID after expiration"
);
println!(
" ✓ New task allowed after expiration ({} != {})",
task_id1, task_id3
);
Ok(())
}
async fn demo_statistics(broker: &MysqlBroker) -> Result<(), Box<dyn std::error::Error>> {
println!(" 1. Creating tasks with various idempotency keys...");
for i in 0..5 {
let payload = json!({"batch_id": i});
let task =
SerializedTask::new("batch_operation".to_string(), serde_json::to_vec(&payload)?);
broker
.enqueue_with_idempotency(
task,
&format!("batch-op-{}", i),
3600,
Some(json!({"batch_number": i})),
)
.await?;
}
println!(" ✓ Created 5 tasks with unique idempotency keys");
println!(" 2. Attempting to create duplicates...");
for i in 0..5 {
let payload = json!({"batch_id": i});
let task =
SerializedTask::new("batch_operation".to_string(), serde_json::to_vec(&payload)?);
broker
.enqueue_with_idempotency(task, &format!("batch-op-{}", i), 3600, None)
.await?;
}
println!(" ✓ Duplicate submissions handled (returned existing tasks)");
println!("\n Idempotency Statistics:");
let stats = broker.get_idempotency_statistics().await?;
for stat in stats {
println!(" Task: {}", stat.task_name);
println!(" - Total keys: {}", stat.total_keys);
println!(" - Unique keys: {}", stat.unique_keys);
println!(" - Active keys: {}", stat.active_keys);
println!(" - Expired keys: {}", stat.expired_keys);
if let Some(oldest) = stat.oldest_key {
println!(" - Oldest key: {}", oldest);
}
if let Some(newest) = stat.newest_key {
println!(" - Newest key: {}", newest);
}
}
Ok(())
}
async fn demo_cleanup(broker: &MysqlBroker) -> Result<(), Box<dyn std::error::Error>> {
println!(" 1. Creating tasks with 2-second TTL...");
for i in 0..10 {
let payload = json!({"op_id": i});
let task = SerializedTask::new(
"temporary_operation".to_string(),
serde_json::to_vec(&payload)?,
);
broker
.enqueue_with_idempotency(task, &format!("temp-op-{}", i), 2, None)
.await?;
}
println!(" ✓ Created 10 tasks");
let stats_before = broker.get_idempotency_statistics().await?;
let active_before = stats_before
.iter()
.find(|s| s.task_name == "temporary_operation")
.map(|s| s.active_keys)
.unwrap_or(0);
println!(" 2. Active keys before expiration: {}", active_before);
println!(" 3. Waiting for keys to expire (3 seconds)...");
sleep(Duration::from_secs(3)).await;
println!(" 4. Running cleanup...");
let cleaned = broker.cleanup_expired_idempotency_keys().await?;
println!(" ✓ Cleaned up {} expired keys", cleaned);
let stats_after = broker.get_idempotency_statistics().await?;
let active_after = stats_after
.iter()
.find(|s| s.task_name == "temporary_operation")
.map(|s| s.active_keys)
.unwrap_or(0);
println!(" 5. Active keys after cleanup: {}", active_after);
println!("\n 💡 Tip: In production, run cleanup_expired_idempotency_keys()");
println!(" periodically (e.g., via cron job or scheduled task) to prevent");
println!(" table bloat and maintain optimal performance.");
Ok(())
}