use futures::future::BoxFuture;
use serde::{Deserialize, Serialize};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tito::{
queue::{run_worker, QueueConfig, QueueEvent},
types::DBUuid,
TiKV, TitoError, TitoQueue, WorkerConfig,
};
use tokio::sync::broadcast;
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
struct UserEvent {
user_id: String,
name: String,
email: String,
action: String,
}
#[tokio::main]
async fn main() -> Result<(), TitoError> {
println!("Testing FIFO Queue\n");
let tito_db = TiKV::connect(vec!["127.0.0.1:2379"]).await?;
let queue = Arc::new(TitoQueue::new(tito_db.clone(), QueueConfig::new(1)));
println!("Publishing 5 events...\n");
for i in 1..=5 {
let user_id = DBUuid::new_v4().to_string();
let event = QueueEvent::new(
format!("user:{}", user_id),
UserEvent {
user_id,
name: format!("User {}", i),
email: format!("user{}@example.com", i),
action: "created".to_string(),
},
);
queue.publish(event).await?;
println!("Published event for User {}", i);
}
println!("\nStarting worker...\n");
let events_processed = Arc::new(std::sync::atomic::AtomicU32::new(0));
let events_processed_clone = events_processed.clone();
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
let handler = move |event: QueueEvent<UserEvent>| {
let counter = events_processed_clone.clone();
Box::pin(async move {
let count = counter.fetch_add(1, Ordering::SeqCst) + 1;
println!(
"[{}] {} - {} ({})",
count, event.payload.action, event.payload.name, event.payload.email,
);
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
Ok::<_, TitoError>(())
}) as BoxFuture<'static, Result<(), TitoError>>
};
let worker_handle = run_worker(
queue.clone(),
WorkerConfig {
consumer: String::from("example-consumer"),
partition_range: 0..1,
},
handler,
shutdown_rx,
)
.await;
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
println!("\nShutting down...");
let _ = shutdown_tx.send(());
let _ = worker_handle.await;
let total = events_processed.load(Ordering::SeqCst);
println!("\nProcessed {} events", total);
Ok(())
}