foxtive_worker/
builder.rs1use std::sync::Arc;
2
3use crate::error::{WorkerError, WorkerResult};
4use crate::message::ReceivedMessage;
5use crate::metrics::{NoOpMetrics, WorkerMetrics};
6use crate::middleware::{MessageHandler, Middleware};
7use crate::pool::WorkerPool;
8use crate::strategies::LoadBalancingStrategy;
9use crate::worker::Worker; pub struct WorkerPoolBuilder {
41 name: String,
42 strategy: LoadBalancingStrategy,
43 concurrency_limit: usize,
44 workers: Vec<Arc<dyn Worker>>,
45 middlewares: Vec<Arc<dyn Middleware>>,
46 metrics_collector: Option<Arc<dyn WorkerMetrics>>,
47}
48
49impl WorkerPoolBuilder {
50 pub fn new(name: impl Into<String>) -> Self {
52 Self {
53 name: name.into(),
54 strategy: LoadBalancingStrategy::default(),
55 concurrency_limit: 1000,
56 workers: Vec::new(),
57 middlewares: Vec::new(),
58 metrics_collector: None,
59 }
60 }
61
62 pub fn with_strategy(mut self, strategy: LoadBalancingStrategy) -> Self {
67 self.strategy = strategy;
68 self
69 }
70
71 pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
78 self.concurrency_limit = limit;
79 self
80 }
81
82 pub fn with_metrics_collector(mut self, collector: Arc<dyn WorkerMetrics>) -> Self {
87 self.metrics_collector = Some(collector);
88 self
89 }
90
91 pub fn add_worker<W: Worker + 'static>(mut self, worker: W) -> Self {
96 self.workers.push(Arc::new(worker));
97 self
98 }
99
100 pub fn add_boxed_worker(mut self, worker: Box<dyn Worker>) -> Self {
107 self.workers.push(worker.into());
108 self
109 }
110
111 pub fn add_arc_worker(mut self, worker: Arc<dyn Worker>) -> Self {
118 self.workers.push(worker);
119 self
120 }
121
122 pub fn add_workers<W: Worker + 'static>(mut self, workers: Vec<W>) -> Self {
127 for worker in workers {
128 self.workers.push(Arc::new(worker));
129 }
130 self
131 }
132
133 pub fn with_middleware<M: Middleware + 'static>(mut self, middleware: M) -> Self {
141 self.middlewares.push(Arc::new(middleware));
142 self
143 }
144
145 pub fn with_middlewares(mut self, middlewares: Vec<Arc<dyn Middleware>>) -> Self {
147 self.middlewares.extend(middlewares);
148 self
149 }
150
151 pub fn build(self) -> WorkerResult<WorkerPool> {
160 if self.workers.is_empty() {
161 return Err(WorkerError::ConfigError(
162 "Cannot build worker pool without any workers".to_string(),
163 ));
164 }
165
166 let metrics_collector = self
167 .metrics_collector
168 .unwrap_or_else(|| Arc::new(NoOpMetrics));
169
170 let mut pool = WorkerPool::with_concurrency(
171 &self.name,
172 self.strategy,
173 self.concurrency_limit,
174 metrics_collector,
175 );
176
177 for worker in self.workers {
179 pool.add_worker(worker);
180 }
181
182 if !self.middlewares.is_empty() {
184 pool = pool.with_middlewares(self.middlewares);
185 }
186
187 Ok(pool)
188 }
189
190 pub fn build_allow_empty(self) -> WorkerPool {
194 let metrics_collector = self
195 .metrics_collector
196 .unwrap_or_else(|| Arc::new(NoOpMetrics));
197
198 let mut pool = WorkerPool::with_concurrency(
199 &self.name,
200 self.strategy,
201 self.concurrency_limit,
202 metrics_collector,
203 );
204
205 for worker in self.workers {
206 pool.add_worker(worker);
207 }
208
209 pool
210 }
211}
212
213#[allow(dead_code)]
216struct PlaceholderHandler;
217
218#[async_trait::async_trait]
219impl MessageHandler for PlaceholderHandler {
220 async fn handle(
221 &self,
222 _message: ReceivedMessage<serde_json::Value>,
223 ) -> Result<crate::middleware::MiddlewareResult, WorkerError> {
224 Err(WorkerError::ProcessingFailed(
227 "PlaceholderHandler should not be invoked".to_string(),
228 ))
229 }
230}
231
232#[allow(dead_code)]
234struct ArcWrapper(Box<dyn MessageHandler>);
235
236#[async_trait::async_trait]
237impl MessageHandler for ArcWrapper {
238 async fn handle(
239 &self,
240 message: ReceivedMessage<serde_json::Value>,
241 ) -> Result<crate::middleware::MiddlewareResult, WorkerError> {
242 self.0.handle(message).await
243 }
244}
245
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::{AckHandle, ReceivedMessage};
    use async_trait::async_trait;
    use std::time::Instant;

    /// Acknowledgement handle whose `ack` and `nack` always succeed.
    #[derive(Debug)]
    #[allow(unused)]
    struct MockAckHandle;

    #[async_trait]
    impl AckHandle for MockAckHandle {
        async fn ack(&self) -> WorkerResult<()> {
            Ok(())
        }

        async fn nack(&self, _requeue: bool) -> WorkerResult<()> {
            Ok(())
        }
    }

    /// Minimal worker that reports a fixed id and accepts every message.
    struct TestWorker {
        id: String,
    }

    impl TestWorker {
        fn new(id: &str) -> Self {
            Self { id: id.to_string() }
        }
    }

    #[async_trait]
    impl Worker for TestWorker {
        fn id(&self) -> &str {
            &self.id
        }

        async fn process(&self, _message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()> {
            Ok(())
        }
    }

    /// `new` stores the given pool name.
    #[test]
    fn test_builder_creation() {
        let fresh = WorkerPoolBuilder::new("test-pool");
        assert_eq!(fresh.name, "test-pool");
    }

    /// `with_strategy` overrides the default strategy.
    #[test]
    fn test_builder_with_strategy() {
        let configured =
            WorkerPoolBuilder::new("test-pool").with_strategy(LoadBalancingStrategy::Random);
        assert!(matches!(
            configured.strategy,
            LoadBalancingStrategy::Random
        ));
    }

    /// `with_concurrency_limit` overrides the default limit.
    #[test]
    fn test_builder_with_concurrency_limit() {
        let configured = WorkerPoolBuilder::new("test-pool").with_concurrency_limit(50);
        assert_eq!(configured.concurrency_limit, 50);
    }

    /// `add_worker` records exactly one worker.
    #[test]
    fn test_builder_add_worker() {
        let only_worker = TestWorker::new("worker-1");
        let configured = WorkerPoolBuilder::new("test-pool").add_worker(only_worker);
        assert_eq!(configured.workers.len(), 1);
    }

    /// `add_workers` records every worker of the vector.
    #[test]
    fn test_builder_add_multiple_workers() {
        let batch: Vec<TestWorker> = ["worker-1", "worker-2", "worker-3"]
            .iter()
            .map(|id| TestWorker::new(id))
            .collect();

        let configured = WorkerPoolBuilder::new("test-pool").add_workers(batch);
        assert_eq!(configured.workers.len(), 3);
    }

    /// `build` succeeds once workers are present and keeps name and worker count.
    #[test]
    fn test_builder_build_success() {
        let with_two_workers = WorkerPoolBuilder::new("test-pool")
            .add_worker(TestWorker::new("worker-1"))
            .add_worker(TestWorker::new("worker-2"));
        let outcome = with_two_workers.build();

        assert!(outcome.is_ok());
        let pool = outcome.unwrap();
        assert_eq!(pool.worker_count(), 2);
        assert_eq!(pool.name(), "test-pool");
    }

    /// `build` rejects a builder without workers with a `ConfigError`.
    #[test]
    fn test_builder_build_no_workers_error() {
        let outcome = WorkerPoolBuilder::new("test-pool").build();

        assert!(outcome.is_err());
        let WorkerError::ConfigError(msg) = outcome.unwrap_err() else {
            panic!("Expected ConfigError")
        };
        assert!(msg.contains("without any workers"));
    }

    /// `build_allow_empty` yields a pool even when no worker was added.
    #[test]
    fn test_builder_build_allow_empty() {
        let empty_pool = WorkerPoolBuilder::new("test-pool").build_allow_empty();

        assert_eq!(empty_pool.worker_count(), 0);
        assert_eq!(empty_pool.name(), "test-pool");
    }

    /// All configuration methods can be chained before `build`.
    #[test]
    fn test_builder_chaining() {
        let outcome = WorkerPoolBuilder::new("test-pool")
            .with_strategy(LoadBalancingStrategy::LeastLoaded)
            .with_concurrency_limit(100)
            .add_worker(TestWorker::new("worker-1"))
            .add_worker(TestWorker::new("worker-2"))
            .build();

        assert!(outcome.is_ok());
        let pool = outcome.unwrap();
        assert_eq!(pool.worker_count(), 2);
    }

    /// A custom metrics collector is accepted and the pool still builds.
    #[test]
    fn test_builder_with_metrics_collector() {
        /// Collector whose recording methods all do nothing.
        struct MockMetrics;

        impl WorkerMetrics for MockMetrics {
            fn record_message_received(&self, _worker_id: &str, _queue_name: &str) {}
            fn record_message_processed(
                &self,
                _worker_id: &str,
                _queue_name: &str,
                _start_time: Instant,
            ) {
            }
            fn record_message_failed(
                &self,
                _worker_id: &str,
                _queue_name: &str,
                _error_type: &str,
                _start_time: Instant,
            ) {
            }
            fn record_message_retried(&self, _worker_id: &str, _queue_name: &str, _attempt: u32) {}
            fn record_message_retries_exhausted(&self, _worker_id: &str, _queue_name: &str) {}
            fn record_message_sent_to_dlq(&self, _queue_name: &str, _is_poison_pill: bool) {}
            fn record_active_workers(&self, _count: usize) {}
            fn record_in_flight_messages(&self, _count: usize) {}
        }

        let shared_metrics = Arc::new(MockMetrics);
        // Method-call `clone()` keeps the concrete `Arc<MockMetrics>` type so the
        // argument can still coerce to `Arc<dyn WorkerMetrics>`.
        let built = WorkerPoolBuilder::new("test-pool")
            .add_worker(TestWorker::new("worker-1"))
            .with_metrics_collector(shared_metrics.clone())
            .build();
        let pool = built.unwrap();

        assert_eq!(pool.worker_count(), 1);
    }
}