worker_pool/
worker_pool.rs

1use actor12::prelude::*;
2use actor12::{Multi, MpscChannel, Call, spawn};
3use std::time::Duration;
4use std::future::Future;
5use tokio::time::sleep;
6
/// Worker actor that processes tasks.
///
/// Both fields are private to this file; the statistics are exposed to
/// callers through the `GetWorkerStats` message rather than direct access.
pub struct Worker {
    /// Identifier of this worker, taken from the spec value passed at spawn time.
    id: u32,
    /// Count of `Task` messages this worker has finished handling.
    tasks_processed: u32,
}
12
// Messages

/// A unit of work sent to a `Worker`.
#[derive(Debug)]
pub struct Task {
    /// Caller-chosen identifier; echoed back as `TaskResult::task_id`.
    pub id: u32,
    /// Free-form payload text; embedded in the result string the worker builds.
    pub data: String,
    /// How long the worker sleeps to simulate the work, in milliseconds.
    pub processing_time_ms: u64,
}
20
/// Successful outcome of a `Task`, returned by the worker that handled it.
#[derive(Debug)]
pub struct TaskResult {
    /// The `id` of the task this result belongs to.
    pub task_id: u32,
    /// The id of the worker that processed the task.
    pub worker_id: u32,
    /// Human-readable description of what was processed and by whom.
    pub result: String,
}
27
/// Query message asking a `Worker` for its current statistics.
///
/// Carries no data; the reply is a `WorkerStats` snapshot.
#[derive(Debug)]
pub struct GetWorkerStats;
30
/// Snapshot of a worker's counters, returned in reply to `GetWorkerStats`.
#[derive(Debug)]
pub struct WorkerStats {
    /// Id of the worker the snapshot describes.
    pub worker_id: u32,
    /// Number of tasks that worker had completed when the snapshot was taken.
    pub tasks_processed: u32,
}
36
37// Worker implementation
38impl Actor for Worker {
39    type Spec = u32; // worker_id
40    type Message = Multi<Self>;
41    type Channel = MpscChannel<Self::Message>;
42    type Cancel = ();
43    type State = ();
44
45    fn state(_spec: &Self::Spec) -> Self::State {}
46
47    fn init(ctx: Init<'_, Self>) -> impl Future<Output = Result<Self, Self::Cancel>> + Send + 'static {
48        let id = ctx.spec;
49        async move {
50            println!("Worker {} initialized", id);
51            Ok(Worker {
52                id,
53                tasks_processed: 0,
54            })
55        }
56    }
57}
58
59impl Handler<Task> for Worker {
60    type Reply = Result<TaskResult, anyhow::Error>;
61
62    async fn handle(&mut self, _ctx: Call<'_, Self, Self::Reply>, msg: Task) -> Self::Reply {
63        println!("Worker {} processing task {} ({}ms)", self.id, msg.id, msg.processing_time_ms);
64        
65        // Simulate work
66        sleep(Duration::from_millis(msg.processing_time_ms)).await;
67        
68        self.tasks_processed += 1;
69        let result = format!("Processed '{}' by worker {}", msg.data, self.id);
70        
71        println!("Worker {} completed task {} -> '{}'", self.id, msg.id, result);
72        
73        Ok(TaskResult {
74            task_id: msg.id,
75            worker_id: self.id,
76            result,
77        })
78    }
79}
80
81impl Handler<GetWorkerStats> for Worker {
82    type Reply = Result<WorkerStats, anyhow::Error>;
83
84    async fn handle(&mut self, _ctx: Call<'_, Self, Self::Reply>, _msg: GetWorkerStats) -> Self::Reply {
85        Ok(WorkerStats {
86            worker_id: self.id,
87            tasks_processed: self.tasks_processed,
88        })
89    }
90}
91
92#[tokio::main]
93async fn main() -> anyhow::Result<()> {
94    println!("=== Worker Pool Example ===\n");
95    
96    // Create workers
97    let worker1 = spawn::<Worker>(1);
98    let worker2 = spawn::<Worker>(2);
99    let worker3 = spawn::<Worker>(3);
100    
101    // Submit several tasks to different workers
102    println!("Submitting tasks...\n");
103    
104    let tasks = vec![
105        ("Calculate prime numbers", 500),
106        ("Process image", 800),
107        ("Analyze data", 300),
108        ("Generate report", 1000),
109        ("Send email", 200),
110        ("Update database", 600),
111    ];
112    
113    // Submit tasks concurrently using tokio::spawn
114    let mut handles = vec![];
115    
116    for (i, (task_name, duration)) in tasks.into_iter().enumerate() {
117        let worker = match i % 3 {
118            0 => worker1.clone(),
119            1 => worker2.clone(),
120            _ => worker3.clone(),
121        };
122        
123        let handle = tokio::spawn(async move {
124            let task = Task {
125                id: (i + 1) as u32,
126                data: format!("{} #{}", task_name, i + 1),
127                processing_time_ms: duration,
128            };
129            
130            worker.ask_dyn(task).await
131        });
132        handles.push(handle);
133    }
134    
135    // Wait for all tasks to complete
136    println!("Waiting for all tasks to complete...\n");
137    for handle in handles {
138        match handle.await? {
139            Ok(result) => println!("✓ Task {} completed: {}", result.task_id, result.result),
140            Err(e) => println!("✗ Task failed: {}", e),
141        }
142    }
143    
144    // Get worker statistics
145    println!("\n=== Worker Statistics ===");
146    for worker in [&worker1, &worker2, &worker3] {
147        match worker.ask_dyn(GetWorkerStats).await {
148            Ok(stats) => println!("Worker {}: {} tasks processed", stats.worker_id, stats.tasks_processed),
149            Err(e) => println!("Failed to get stats: {}", e),
150        }
151    }
152    
153    println!("\n=== All tasks completed ===");
154    
155    // Wait a moment for any remaining output
156    sleep(Duration::from_millis(500)).await;
157    
158    Ok(())
159}