consumer/
consumer.rs

1use std::env;
2
3use postgres_queue::{initialize_database, TaskData, TaskError, TaskRegistry};
4
5async fn send_email_handler(task_id: i32, task_data: TaskData) -> Result<(), TaskError> {
6    let recipient = task_data.get("recipient").unwrap().as_str().unwrap();
7    let subject = task_data.get("subject").unwrap().as_str().unwrap();
8    let body = task_data.get("body").unwrap().as_str().unwrap();
9
10    println!(
11        "[{}] Sending email to {} with subject '{}' and body '{}'",
12        task_id, recipient, subject, body
13    );
14
15    // Simulate sending the email
16    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
17
18    println!("[{}] Email sent to {}", task_id, recipient);
19
20    Ok(())
21}
22
23#[tokio::main]
24async fn main() {
25    let args: Vec<String> = env::args().collect();
26    if args.len() != 2 {
27        eprintln!("Usage: consumer <num_workers>");
28        std::process::exit(1);
29    }
30
31    let num_workers = args[1].parse().expect("Invalid number of tasks");
32    let database_url = "postgresql://postgres:postgres@localhost/queue";
33
34    let pool = postgres_queue::connect(database_url)
35        .await
36        .expect("Failed to connect to the database");
37
38    initialize_database(&pool)
39        .await
40        .expect("Failed to initialize database");
41
42    let mut registry = TaskRegistry::new();
43    registry.register_task("send_email".to_string(), send_email_handler);
44
45    let pool_arc = std::sync::Arc::new(pool);
46
47    // Run the task processor
48    let tasks = registry
49        .run(&pool_arc, num_workers)
50        .await
51        .expect("Failed to run tasks");
52
53    println!("Running {} tasks", tasks.len());
54
55    // Wait for all tasks to complete
56    for task in tasks {
57        task.await.expect("Task failed");
58    }
59
60    println!("All tasks completed.");
61}