Skip to main content

foxtive_worker/
builder.rs

1use std::sync::Arc;
2
3use crate::error::{WorkerError, WorkerResult};
4use crate::message::ReceivedMessage;
5use crate::metrics::{NoOpMetrics, WorkerMetrics};
6use crate::middleware::{MessageHandler, Middleware};
7use crate::pool::WorkerPool;
8use crate::strategies::LoadBalancingStrategy;
9use crate::worker::Worker; // Import NoOpMetrics
10
/// Builder for configuring and creating worker pools.
///
/// Provides a fluent API for setting up worker pools with custom configurations.
/// Every configuration method consumes the builder and returns it, so calls can
/// be chained and finished with [`WorkerPoolBuilder::build`] or
/// [`WorkerPoolBuilder::build_allow_empty`].
///
/// # Example
/// ```rust,no_run
/// use foxtive_worker::builder::WorkerPoolBuilder;
/// use foxtive_worker::strategies::LoadBalancingStrategy;
/// use foxtive_worker::metrics::NoOpMetrics;
/// use foxtive_worker::{Worker, ReceivedMessage};
/// use foxtive_worker::error::WorkerResult;
/// use std::sync::Arc;
///
/// struct MyWorker;
/// #[async_trait::async_trait]
/// impl Worker for MyWorker {
///     fn id(&self) -> &str { "my-worker" }
///     async fn process(&self, _msg: ReceivedMessage<serde_json::Value>) -> WorkerResult<()> {
///         Ok(())
///     }
/// }
///
/// // `build` returns a `WorkerResult`; it only fails when no workers were added.
/// let pool = WorkerPoolBuilder::new("my-pool")
///     .with_strategy(LoadBalancingStrategy::RoundRobin)
///     .with_concurrency_limit(100)
///     .with_metrics_collector(Arc::new(NoOpMetrics))
///     .add_worker(MyWorker)
///     .build()
///     .expect("at least one worker was added");
/// ```
pub struct WorkerPoolBuilder {
    /// Name handed to the pool when it is constructed.
    name: String,
    /// Strategy used for distributing messages; starts as `LoadBalancingStrategy::default()`.
    strategy: LoadBalancingStrategy,
    /// Maximum number of concurrently processed messages; starts at 1000.
    concurrency_limit: usize,
    /// Workers registered so far, in insertion order.
    workers: Vec<Arc<dyn Worker>>,
    /// Middleware registered so far, in insertion order.
    middlewares: Vec<Arc<dyn Middleware>>,
    /// Optional metrics collector; when `None`, `NoOpMetrics` is used at build time.
    metrics_collector: Option<Arc<dyn WorkerMetrics>>,
}
48
49impl WorkerPoolBuilder {
50    /// Create a new builder with the given pool name.
51    pub fn new(name: impl Into<String>) -> Self {
52        Self {
53            name: name.into(),
54            strategy: LoadBalancingStrategy::default(),
55            concurrency_limit: 1000,
56            workers: Vec::new(),
57            middlewares: Vec::new(),
58            metrics_collector: None,
59        }
60    }
61
62    /// Set the load balancing strategy.
63    ///
64    /// # Arguments
65    /// * `strategy` - The strategy to use for distributing messages
66    pub fn with_strategy(mut self, strategy: LoadBalancingStrategy) -> Self {
67        self.strategy = strategy;
68        self
69    }
70
71    /// Set the concurrency limit for the pool.
72    ///
73    /// This limits how many messages can be processed concurrently across all workers.
74    ///
75    /// # Arguments
76    /// * `limit` - Maximum concurrent messages
77    pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
78        self.concurrency_limit = limit;
79        self
80    }
81
82    /// Set the metrics collector for the pool.
83    ///
84    /// # Arguments
85    /// * `collector` - An `Arc` to an object implementing `WorkerMetrics`.
86    pub fn with_metrics_collector(mut self, collector: Arc<dyn WorkerMetrics>) -> Self {
87        self.metrics_collector = Some(collector);
88        self
89    }
90
91    /// Add a single worker to the pool.
92    ///
93    /// # Arguments
94    /// * `worker` - The worker to add
95    pub fn add_worker<W: Worker + 'static>(mut self, worker: W) -> Self {
96        self.workers.push(Arc::new(worker));
97        self
98    }
99
100    /// Add a boxed worker to the pool.
101    ///
102    /// Useful when adding heterogeneous workers.
103    ///
104    /// # Arguments
105    /// * `worker` - The boxed worker to add
106    pub fn add_boxed_worker(mut self, worker: Box<dyn Worker>) -> Self {
107        self.workers.push(worker.into());
108        self
109    }
110
111    /// Add an Arc-wrapped worker to the pool.
112    ///
113    /// This is the most efficient way if you already have an Arc.
114    ///
115    /// # Arguments
116    /// * `worker` - The Arc-wrapped worker to add
117    pub fn add_arc_worker(mut self, worker: Arc<dyn Worker>) -> Self {
118        self.workers.push(worker);
119        self
120    }
121
122    /// Add multiple workers to the pool.
123    ///
124    /// # Arguments
125    /// * `workers` - Vector of workers to add
126    pub fn add_workers<W: Worker + 'static>(mut self, workers: Vec<W>) -> Self {
127        for worker in workers {
128            self.workers.push(Arc::new(worker));
129        }
130        self
131    }
132
133    /// Add a middleware to the pool.
134    ///
135    /// Middleware will be executed in the order they are added, forming a chain
136    /// that processes messages before they reach the workers.
137    ///
138    /// # Arguments
139    /// * `middleware` - The middleware to add
140    pub fn with_middleware<M: Middleware + 'static>(mut self, middleware: M) -> Self {
141        self.middlewares.push(Arc::new(middleware));
142        self
143    }
144
145    /// Add multiple middleware to the pool.
146    pub fn with_middlewares(mut self, middlewares: Vec<Arc<dyn Middleware>>) -> Self {
147        self.middlewares.extend(middlewares);
148        self
149    }
150
151    /// Build the worker pool with the configured settings.
152    ///
153    /// # Returns
154    /// A configured `WorkerPool` ready to accept messages
155    ///
156    /// # Errors
157    /// Returns `WorkerError::ConfigError` if:
158    /// - No workers were added to the pool
159    pub fn build(self) -> WorkerResult<WorkerPool> {
160        if self.workers.is_empty() {
161            return Err(WorkerError::ConfigError(
162                "Cannot build worker pool without any workers".to_string(),
163            ));
164        }
165
166        let metrics_collector = self
167            .metrics_collector
168            .unwrap_or_else(|| Arc::new(NoOpMetrics));
169
170        let mut pool = WorkerPool::with_concurrency(
171            &self.name,
172            self.strategy,
173            self.concurrency_limit,
174            metrics_collector,
175        );
176
177        // Add workers
178        for worker in self.workers {
179            pool.add_worker(worker);
180        }
181
182        // Pass middleware list to pool (will be used to build dynamic chains at dispatch time)
183        if !self.middlewares.is_empty() {
184            pool = pool.with_middlewares(self.middlewares);
185        }
186
187        Ok(pool)
188    }
189
190    /// Build the worker pool, returning the pool even if no workers were added.
191    ///
192    /// This is useful for dynamic worker addition later.
193    pub fn build_allow_empty(self) -> WorkerPool {
194        let metrics_collector = self
195            .metrics_collector
196            .unwrap_or_else(|| Arc::new(NoOpMetrics));
197
198        let mut pool = WorkerPool::with_concurrency(
199            &self.name,
200            self.strategy,
201            self.concurrency_limit,
202            metrics_collector,
203        );
204
205        for worker in self.workers {
206            pool.add_worker(worker);
207        }
208
209        pool
210    }
211}
212
213/// Placeholder handler used during middleware chain building.
214/// In practice, the WorkerPool replaces this with actual worker dispatch.
215#[allow(dead_code)]
216struct PlaceholderHandler;
217
218#[async_trait::async_trait]
219impl MessageHandler for PlaceholderHandler {
220    async fn handle(
221        &self,
222        _message: ReceivedMessage<serde_json::Value>,
223    ) -> Result<crate::middleware::MiddlewareResult, WorkerError> {
224        // This should never be called in production
225        // The WorkerPool's dispatch method wraps workers with proper handlers
226        Err(WorkerError::ProcessingFailed(
227            "PlaceholderHandler should not be invoked".to_string(),
228        ))
229    }
230}
231
232/// Wrapper to convert Box<dyn MessageHandler> to Arc-compatible type
233#[allow(dead_code)]
234struct ArcWrapper(Box<dyn MessageHandler>);
235
236#[async_trait::async_trait]
237impl MessageHandler for ArcWrapper {
238    async fn handle(
239        &self,
240        message: ReceivedMessage<serde_json::Value>,
241    ) -> Result<crate::middleware::MiddlewareResult, WorkerError> {
242        self.0.handle(message).await
243    }
244}
245
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::{AckHandle, ReceivedMessage};
    use async_trait::async_trait;
    use std::time::Instant;

    /// Acknowledgement handle whose `ack` and `nack` always succeed.
    #[derive(Debug)]
    #[allow(unused)]
    struct MockAckHandle;

    #[async_trait]
    impl AckHandle for MockAckHandle {
        async fn ack(&self) -> WorkerResult<()> {
            Ok(())
        }

        async fn nack(&self, _requeue: bool) -> WorkerResult<()> {
            Ok(())
        }
    }

    /// Minimal worker that reports a fixed id and accepts every message.
    struct TestWorker {
        id: String,
    }

    impl TestWorker {
        fn new(id: &str) -> Self {
            Self {
                id: String::from(id),
            }
        }
    }

    #[async_trait]
    impl Worker for TestWorker {
        fn id(&self) -> &str {
            self.id.as_str()
        }

        async fn process(&self, _message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()> {
            Ok(())
        }
    }

    /// The name passed to `new` is stored on the builder.
    #[test]
    fn test_builder_creation() {
        let fresh = WorkerPoolBuilder::new("test-pool");
        assert_eq!(fresh.name, "test-pool");
    }

    /// `with_strategy` overrides the default strategy.
    #[test]
    fn test_builder_with_strategy() {
        let configured = WorkerPoolBuilder::new("test-pool")
            .with_strategy(LoadBalancingStrategy::Random)
            .strategy;
        assert!(matches!(configured, LoadBalancingStrategy::Random));
    }

    /// `with_concurrency_limit` overrides the default limit.
    #[test]
    fn test_builder_with_concurrency_limit() {
        let limit = WorkerPoolBuilder::new("test-pool")
            .with_concurrency_limit(50)
            .concurrency_limit;
        assert_eq!(limit, 50);
    }

    /// A single `add_worker` call registers exactly one worker.
    #[test]
    fn test_builder_add_worker() {
        let single = TestWorker::new("worker-1");
        let builder = WorkerPoolBuilder::new("test-pool").add_worker(single);
        assert_eq!(builder.workers.len(), 1);
    }

    /// `add_workers` registers every worker from the vector.
    #[test]
    fn test_builder_add_multiple_workers() {
        let batch: Vec<TestWorker> = ["worker-1", "worker-2", "worker-3"]
            .iter()
            .map(|id| TestWorker::new(id))
            .collect();

        let builder = WorkerPoolBuilder::new("test-pool").add_workers(batch);
        assert_eq!(builder.workers.len(), 3);
    }

    /// Building with workers succeeds and the pool reflects the configuration.
    #[test]
    fn test_builder_build_success() {
        let builder = WorkerPoolBuilder::new("test-pool")
            .add_worker(TestWorker::new("worker-1"))
            .add_worker(TestWorker::new("worker-2"));
        let outcome = builder.build();

        assert!(outcome.is_ok());
        let built = outcome.unwrap();
        assert_eq!(built.worker_count(), 2);
        assert_eq!(built.name(), "test-pool");
    }

    /// Building without workers is rejected with a `ConfigError`.
    #[test]
    fn test_builder_build_no_workers_error() {
        let outcome = WorkerPoolBuilder::new("test-pool").build();

        assert!(outcome.is_err());
        let failure = outcome.unwrap_err();
        if let WorkerError::ConfigError(msg) = failure {
            assert!(msg.contains("without any workers"));
        } else {
            panic!("Expected ConfigError");
        }
    }

    /// `build_allow_empty` yields a pool even when no workers were added.
    #[test]
    fn test_builder_build_allow_empty() {
        let empty_pool = WorkerPoolBuilder::new("test-pool").build_allow_empty();

        assert_eq!(empty_pool.worker_count(), 0);
        assert_eq!(empty_pool.name(), "test-pool");
    }

    /// All configuration methods can be chained before `build`.
    #[test]
    fn test_builder_chaining() {
        let outcome = WorkerPoolBuilder::new("test-pool")
            .with_strategy(LoadBalancingStrategy::LeastLoaded)
            .with_concurrency_limit(100)
            .add_worker(TestWorker::new("worker-1"))
            .add_worker(TestWorker::new("worker-2"))
            .build();

        assert!(outcome.is_ok());
        let built = outcome.unwrap();
        assert_eq!(built.worker_count(), 2);
    }

    /// A custom metrics collector is accepted and the pool still builds.
    #[test]
    fn test_builder_with_metrics_collector() {
        /// Collector that discards every recorded event.
        struct MockMetrics;

        impl WorkerMetrics for MockMetrics {
            fn record_message_received(&self, _worker_id: &str, _queue_name: &str) {}

            fn record_message_processed(
                &self,
                _worker_id: &str,
                _queue_name: &str,
                _start_time: Instant,
            ) {
            }

            fn record_message_failed(
                &self,
                _worker_id: &str,
                _queue_name: &str,
                _error_type: &str,
                _start_time: Instant,
            ) {
            }

            fn record_message_retried(&self, _worker_id: &str, _queue_name: &str, _attempt: u32) {}

            fn record_message_retries_exhausted(&self, _worker_id: &str, _queue_name: &str) {}

            fn record_message_sent_to_dlq(&self, _queue_name: &str, _is_poison_pill: bool) {}

            fn record_active_workers(&self, _count: usize) {}

            fn record_in_flight_messages(&self, _count: usize) {}
        }

        let metrics_collector = Arc::new(MockMetrics);
        let builder = WorkerPoolBuilder::new("test-pool")
            .add_worker(TestWorker::new("worker-1"))
            .with_metrics_collector(metrics_collector.clone());
        let pool = builder.build().unwrap();

        // The pool keeps its collector private, so pointer equality with
        // `metrics_collector` cannot be checked from here; a successful build
        // with the expected worker count is the observable outcome.
        assert_eq!(pool.worker_count(), 1);
    }
}