foxtive_worker/
builder.rs1use std::sync::Arc;
2
3use crate::error::{WorkerError, WorkerResult};
4use crate::message::ReceivedMessage;
5use crate::middleware::{MessageHandler, Middleware};
6use crate::pool::WorkerPool;
7use crate::strategies::LoadBalancingStrategy;
8use crate::worker::Worker;
9use crate::metrics::{WorkerMetrics, NoOpMetrics}; pub struct WorkerPoolBuilder {
41 name: String,
42 strategy: LoadBalancingStrategy,
43 concurrency_limit: usize,
44 workers: Vec<Arc<dyn Worker>>,
45 middlewares: Vec<Arc<dyn Middleware>>,
46 metrics_collector: Option<Arc<dyn WorkerMetrics>>,
47}
48
49impl WorkerPoolBuilder {
50 pub fn new(name: impl Into<String>) -> Self {
52 Self {
53 name: name.into(),
54 strategy: LoadBalancingStrategy::default(),
55 concurrency_limit: 1000,
56 workers: Vec::new(),
57 middlewares: Vec::new(),
58 metrics_collector: None,
59 }
60 }
61
62 pub fn with_strategy(mut self, strategy: LoadBalancingStrategy) -> Self {
67 self.strategy = strategy;
68 self
69 }
70
71 pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
78 self.concurrency_limit = limit;
79 self
80 }
81
82 pub fn with_metrics_collector(mut self, collector: Arc<dyn WorkerMetrics>) -> Self {
87 self.metrics_collector = Some(collector);
88 self
89 }
90
91 pub fn add_worker<W: Worker + 'static>(mut self, worker: W) -> Self {
96 self.workers.push(Arc::new(worker));
97 self
98 }
99
100 pub fn add_boxed_worker(mut self, worker: Box<dyn Worker>) -> Self {
107 self.workers.push(worker.into());
108 self
109 }
110
111 pub fn add_arc_worker(mut self, worker: Arc<dyn Worker>) -> Self {
118 self.workers.push(worker);
119 self
120 }
121
122 pub fn add_workers<W: Worker + 'static>(mut self, workers: Vec<W>) -> Self {
127 for worker in workers {
128 self.workers.push(Arc::new(worker));
129 }
130 self
131 }
132
133 pub fn with_middleware<M: Middleware + 'static>(mut self, middleware: M) -> Self {
141 self.middlewares.push(Arc::new(middleware));
142 self
143 }
144
145 pub fn with_middlewares(mut self, middlewares: Vec<Arc<dyn Middleware>>) -> Self {
147 self.middlewares.extend(middlewares);
148 self
149 }
150
151 pub fn build(self) -> WorkerResult<WorkerPool> {
160 if self.workers.is_empty() {
161 return Err(WorkerError::ConfigError(
162 "Cannot build worker pool without any workers".to_string(),
163 ));
164 }
165
166 let metrics_collector = self.metrics_collector.unwrap_or_else(|| Arc::new(NoOpMetrics));
167
168 let mut pool = WorkerPool::with_concurrency(
169 &self.name,
170 self.strategy,
171 self.concurrency_limit,
172 metrics_collector,
173 );
174
175 for worker in self.workers {
177 pool.add_worker(worker);
178 }
179
180 if !self.middlewares.is_empty() {
182 pool = pool.with_middlewares(self.middlewares);
183 }
184
185 Ok(pool)
186 }
187
188 pub fn build_allow_empty(self) -> WorkerPool {
192 let metrics_collector = self.metrics_collector.unwrap_or_else(|| Arc::new(NoOpMetrics));
193
194 let mut pool = WorkerPool::with_concurrency(
195 &self.name,
196 self.strategy,
197 self.concurrency_limit,
198 metrics_collector,
199 );
200
201 for worker in self.workers {
202 pool.add_worker(worker);
203 }
204
205 pool
206 }
207}
208
209#[allow(dead_code)]
212struct PlaceholderHandler;
213
214#[async_trait::async_trait]
215impl MessageHandler for PlaceholderHandler {
216 async fn handle(&self, _message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()> {
217 Err(WorkerError::ProcessingFailed(
220 "PlaceholderHandler should not be invoked".to_string()
221 ))
222 }
223}
224
225#[allow(dead_code)]
227struct ArcWrapper(Box<dyn MessageHandler>);
228
229#[async_trait::async_trait]
230impl MessageHandler for ArcWrapper {
231 async fn handle(&self, message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()> {
232 self.0.handle(message).await
233 }
234}
235
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::{AckHandle, ReceivedMessage};
    use async_trait::async_trait;
    use std::time::Instant;

    /// Acknowledgement handle whose `ack` and `nack` always succeed.
    // Not referenced by any test below, hence the lint allowance.
    #[derive(Debug)]
    #[allow(unused)]
    struct MockAckHandle;

    #[async_trait]
    impl AckHandle for MockAckHandle {
        async fn ack(&self) -> WorkerResult<()> {
            Ok(())
        }

        async fn nack(&self, _requeue: bool) -> WorkerResult<()> {
            Ok(())
        }
    }

    /// Worker that reports a fixed id and accepts every message.
    struct TestWorker {
        id: String,
    }

    impl TestWorker {
        fn new(id: &str) -> Self {
            let id = String::from(id);
            Self { id }
        }
    }

    #[async_trait]
    impl Worker for TestWorker {
        fn id(&self) -> &str {
            self.id.as_str()
        }

        async fn process(&self, _message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()> {
            Ok(())
        }
    }

    /// Fresh builder with the pool name every test in this module uses.
    fn pool_builder() -> WorkerPoolBuilder {
        WorkerPoolBuilder::new("test-pool")
    }

    #[test]
    fn test_builder_creation() {
        assert_eq!(pool_builder().name, "test-pool");
    }

    #[test]
    fn test_builder_with_strategy() {
        let configured = pool_builder().with_strategy(LoadBalancingStrategy::Random);
        assert!(matches!(configured.strategy, LoadBalancingStrategy::Random));
    }

    #[test]
    fn test_builder_with_concurrency_limit() {
        let configured = pool_builder().with_concurrency_limit(50);
        assert_eq!(configured.concurrency_limit, 50);
    }

    #[test]
    fn test_builder_add_worker() {
        let configured = pool_builder().add_worker(TestWorker::new("worker-1"));
        assert_eq!(configured.workers.len(), 1);
    }

    #[test]
    fn test_builder_add_multiple_workers() {
        let batch: Vec<TestWorker> = ["worker-1", "worker-2", "worker-3"]
            .iter()
            .map(|id| TestWorker::new(id))
            .collect();

        let configured = pool_builder().add_workers(batch);
        assert_eq!(configured.workers.len(), 3);
    }

    #[test]
    fn test_builder_build_success() {
        let outcome = pool_builder()
            .add_worker(TestWorker::new("worker-1"))
            .add_worker(TestWorker::new("worker-2"))
            .build();
        assert!(outcome.is_ok());

        let pool = outcome.unwrap();
        assert_eq!(pool.worker_count(), 2);
        assert_eq!(pool.name(), "test-pool");
    }

    #[test]
    fn test_builder_build_no_workers_error() {
        let outcome = pool_builder().build();
        assert!(outcome.is_err());

        if let Err(WorkerError::ConfigError(msg)) = outcome {
            assert!(msg.contains("without any workers"));
        } else {
            panic!("Expected ConfigError");
        }
    }

    #[test]
    fn test_builder_build_allow_empty() {
        let pool = pool_builder().build_allow_empty();

        assert_eq!(pool.worker_count(), 0);
        assert_eq!(pool.name(), "test-pool");
    }

    #[test]
    fn test_builder_chaining() {
        let outcome = pool_builder()
            .with_strategy(LoadBalancingStrategy::LeastLoaded)
            .with_concurrency_limit(100)
            .add_worker(TestWorker::new("worker-1"))
            .add_worker(TestWorker::new("worker-2"))
            .build();
        assert!(outcome.is_ok());

        let pool = outcome.unwrap();
        assert_eq!(pool.worker_count(), 2);
    }

    #[test]
    fn test_builder_with_metrics_collector() {
        /// Metrics sink that discards every observation.
        struct MockMetrics;

        impl WorkerMetrics for MockMetrics {
            fn record_message_received(&self, _worker_id: &str, _queue_name: &str) {}

            fn record_message_processed(
                &self,
                _worker_id: &str,
                _queue_name: &str,
                _start_time: Instant,
            ) {
            }

            fn record_message_failed(
                &self,
                _worker_id: &str,
                _queue_name: &str,
                _error_type: &str,
                _start_time: Instant,
            ) {
            }

            fn record_message_retried(&self, _worker_id: &str, _queue_name: &str, _attempt: u32) {}

            fn record_message_retries_exhausted(&self, _worker_id: &str, _queue_name: &str) {}

            fn record_message_sent_to_dlq(&self, _queue_name: &str, _is_poison_pill: bool) {}

            fn record_active_workers(&self, _count: usize) {}

            fn record_in_flight_messages(&self, _count: usize) {}
        }

        let metrics_collector = Arc::new(MockMetrics);
        let shared = Arc::clone(&metrics_collector);

        let pool = pool_builder()
            .add_worker(TestWorker::new("worker-1"))
            .with_metrics_collector(shared)
            .build()
            .unwrap();

        assert_eq!(pool.worker_count(), 1);
    }
}