worker_pool/
worker_pool.rs1use actor12::prelude::*;
2use actor12::{Multi, MpscChannel, Call, spawn};
3use std::time::Duration;
4use std::future::Future;
5use tokio::time::sleep;
6
7pub struct Worker {
9 id: u32,
10 tasks_processed: u32,
11}
12
13#[derive(Debug)]
15pub struct Task {
16 pub id: u32,
17 pub data: String,
18 pub processing_time_ms: u64,
19}
20
21#[derive(Debug)]
22pub struct TaskResult {
23 pub task_id: u32,
24 pub worker_id: u32,
25 pub result: String,
26}
27
28#[derive(Debug)]
29pub struct GetWorkerStats;
30
31#[derive(Debug)]
32pub struct WorkerStats {
33 pub worker_id: u32,
34 pub tasks_processed: u32,
35}
36
37impl Actor for Worker {
39 type Spec = u32; type Message = Multi<Self>;
41 type Channel = MpscChannel<Self::Message>;
42 type Cancel = ();
43 type State = ();
44
45 fn state(_spec: &Self::Spec) -> Self::State {}
46
47 fn init(ctx: Init<'_, Self>) -> impl Future<Output = Result<Self, Self::Cancel>> + Send + 'static {
48 let id = ctx.spec;
49 async move {
50 println!("Worker {} initialized", id);
51 Ok(Worker {
52 id,
53 tasks_processed: 0,
54 })
55 }
56 }
57}
58
59impl Handler<Task> for Worker {
60 type Reply = Result<TaskResult, anyhow::Error>;
61
62 async fn handle(&mut self, _ctx: Call<'_, Self, Self::Reply>, msg: Task) -> Self::Reply {
63 println!("Worker {} processing task {} ({}ms)", self.id, msg.id, msg.processing_time_ms);
64
65 sleep(Duration::from_millis(msg.processing_time_ms)).await;
67
68 self.tasks_processed += 1;
69 let result = format!("Processed '{}' by worker {}", msg.data, self.id);
70
71 println!("Worker {} completed task {} -> '{}'", self.id, msg.id, result);
72
73 Ok(TaskResult {
74 task_id: msg.id,
75 worker_id: self.id,
76 result,
77 })
78 }
79}
80
81impl Handler<GetWorkerStats> for Worker {
82 type Reply = Result<WorkerStats, anyhow::Error>;
83
84 async fn handle(&mut self, _ctx: Call<'_, Self, Self::Reply>, _msg: GetWorkerStats) -> Self::Reply {
85 Ok(WorkerStats {
86 worker_id: self.id,
87 tasks_processed: self.tasks_processed,
88 })
89 }
90}
91
92#[tokio::main]
93async fn main() -> anyhow::Result<()> {
94 println!("=== Worker Pool Example ===\n");
95
96 let worker1 = spawn::<Worker>(1);
98 let worker2 = spawn::<Worker>(2);
99 let worker3 = spawn::<Worker>(3);
100
101 println!("Submitting tasks...\n");
103
104 let tasks = vec![
105 ("Calculate prime numbers", 500),
106 ("Process image", 800),
107 ("Analyze data", 300),
108 ("Generate report", 1000),
109 ("Send email", 200),
110 ("Update database", 600),
111 ];
112
113 let mut handles = vec![];
115
116 for (i, (task_name, duration)) in tasks.into_iter().enumerate() {
117 let worker = match i % 3 {
118 0 => worker1.clone(),
119 1 => worker2.clone(),
120 _ => worker3.clone(),
121 };
122
123 let handle = tokio::spawn(async move {
124 let task = Task {
125 id: (i + 1) as u32,
126 data: format!("{} #{}", task_name, i + 1),
127 processing_time_ms: duration,
128 };
129
130 worker.ask_dyn(task).await
131 });
132 handles.push(handle);
133 }
134
135 println!("Waiting for all tasks to complete...\n");
137 for handle in handles {
138 match handle.await? {
139 Ok(result) => println!("✓ Task {} completed: {}", result.task_id, result.result),
140 Err(e) => println!("✗ Task failed: {}", e),
141 }
142 }
143
144 println!("\n=== Worker Statistics ===");
146 for worker in [&worker1, &worker2, &worker3] {
147 match worker.ask_dyn(GetWorkerStats).await {
148 Ok(stats) => println!("Worker {}: {} tasks processed", stats.worker_id, stats.tasks_processed),
149 Err(e) => println!("Failed to get stats: {}", e),
150 }
151 }
152
153 println!("\n=== All tasks completed ===");
154
155 sleep(Duration::from_millis(500)).await;
157
158 Ok(())
159}