// varpulis_runtime/worker_pool.rs
//! Worker Pool for Controlled Parallelism
//!
//! **Status: Production-ready, opt-in.**  The worker pool is a library component
//! for callers who need explicit control over parallel event dispatch.  It is not
//! auto-wired into the engine — instantiate [`WorkerPool`] and submit events to
//! it directly.
//!
//! Unlike Apama's opaque spawn/contexts, Varpulis provides explicit control over
//! parallel workers with monitoring, backpressure, and dynamic resizing.
//!
//! # Example
//! ```text
//! let config = WorkerPoolConfig {
//!     name: "OrderProcessors".to_string(),
//!     workers: 4,
//!     queue_size: 1000,
//!     backpressure: BackpressureStrategy::DropOldest,
//! };
//!
//! let pool = WorkerPool::new(config, |event| process(event));
//! pool.submit(event, "customer_123").await?;
//! println!("Metrics: {:?}", pool.metrics().await);
//! ```
25use std::collections::HashMap;
26use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
27use std::sync::Arc;
28use std::time::{Duration, Instant};
29
30use tokio::sync::{mpsc, Mutex, RwLock};
31use tracing::{debug, info, warn};
32
33use crate::event::Event;
34
/// Configuration for a worker pool
#[derive(Debug, Clone)]
pub struct WorkerPoolConfig {
    /// Pool name for identification (appears in logs and backpressure errors)
    pub name: String,
    /// Number of worker tasks
    pub workers: usize,
    /// Maximum queue size per worker (total dispatch channel capacity is
    /// `queue_size * workers`)
    pub queue_size: usize,
    /// Backpressure strategy when queue is full
    pub backpressure: BackpressureStrategy,
}
47
48impl Default for WorkerPoolConfig {
49    fn default() -> Self {
50        Self {
51            name: "default".to_string(),
52            workers: 4,
53            queue_size: 1000,
54            backpressure: BackpressureStrategy::Block,
55        }
56    }
57}
58
/// Strategy for handling backpressure when queue is full
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureStrategy {
    /// Block sender until space available
    Block,
    /// Drop oldest events in queue
    /// (NOTE(review): current implementation falls back to a blocking send
    /// when full — confirm against `WorkerPool::submit`)
    DropOldest,
    /// Drop incoming events
    DropNewest,
    /// Return error to sender
    Error,
}
71
72/// Error returned when backpressure strategy is Error and the worker pool queue is full.
73///
74/// Distinct from [`crate::sase::BackpressureError`] which covers SASE run-level backpressure.
75#[derive(Debug, Clone, thiserror::Error)]
76#[error("Pool '{}' queue full (depth: {})", pool_name, queue_depth)]
77pub struct PoolBackpressureError {
78    pub pool_name: String,
79    pub queue_depth: usize,
80}
81
/// Metrics for a worker pool
///
/// Latency figures are computed over a sliding window of recent samples
/// (up to 1000 events).
#[derive(Debug, Clone, Default)]
pub struct WorkerPoolMetrics {
    /// Number of currently active (processing) workers
    pub active_workers: usize,
    /// Number of idle workers
    pub idle_workers: usize,
    /// Current total queue depth across all workers
    pub queue_depth: usize,
    /// Total events processed (counted at submission)
    pub events_processed: u64,
    /// Events dropped due to backpressure
    pub events_dropped: u64,
    /// Average processing latency in microseconds
    pub avg_latency_us: f64,
    /// 99th percentile latency in microseconds
    pub p99_latency_us: f64,
}
100
/// Status of an individual worker
///
/// A point-in-time snapshot; values may be stale immediately after reading.
#[derive(Debug, Clone)]
pub struct WorkerStatus {
    /// Worker ID
    pub id: usize,
    /// Current state
    pub state: WorkerState,
    /// Current partition being processed (if any)
    pub current_partition: Option<String>,
    /// Events processed by this worker
    pub events_processed: u64,
    /// Last time this worker was active
    pub last_active: Instant,
}
115
/// State of a worker
///
/// Discriminants are explicit because `WorkerData` stores the state in an
/// `AtomicUsize` via `as usize` and decodes it back by value; pinning the
/// numbers keeps that round-trip correct even if variants are reordered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerState {
    /// Waiting for events
    Idle = 0,
    /// Processing an event
    Processing = 1,
    /// Finishing current work before shutdown
    Draining = 2,
    /// Stopped
    Stopped = 3,
}
128
/// Internal worker data shared between pool and worker tasks
#[derive(Debug)]
struct WorkerData {
    // Stable worker index (0-based).
    id: usize,
    // WorkerState stored as its `as usize` discriminant so it can be read
    // and written lock-free.
    state: AtomicUsize, // WorkerState as usize
    // Partition key currently being processed, if any.
    current_partition: RwLock<Option<String>>,
    // Lifetime count of events handled by this worker.
    events_processed: AtomicU64,
    // Timestamp of the most recently completed event.
    last_active: RwLock<Instant>,
}
138
139impl WorkerData {
140    fn new(id: usize) -> Self {
141        Self {
142            id,
143            state: AtomicUsize::new(WorkerState::Idle as usize),
144            current_partition: RwLock::new(None),
145            events_processed: AtomicU64::new(0),
146            last_active: RwLock::new(Instant::now()),
147        }
148    }
149
150    fn get_state(&self) -> WorkerState {
151        match self.state.load(Ordering::Relaxed) {
152            0 => WorkerState::Idle,
153            1 => WorkerState::Processing,
154            2 => WorkerState::Draining,
155            _ => WorkerState::Stopped,
156        }
157    }
158
159    fn set_state(&self, state: WorkerState) {
160        self.state.store(state as usize, Ordering::Relaxed);
161    }
162}
163
/// Event with partition key for routing
struct PartitionedEvent {
    // The event payload to hand to the processor.
    event: Event,
    // Routing key: events sharing a key are pinned to the same worker.
    partition_key: String,
}
169
/// A pool of workers for parallel event processing
#[derive(Debug)]
pub struct WorkerPool {
    /// Pool configuration (immutable after construction)
    config: WorkerPoolConfig,
    /// Sender to dispatch events to workers
    dispatch_tx: mpsc::Sender<PartitionedEvent>,
    /// Worker data for monitoring (shared with the dispatcher task)
    workers: Vec<Arc<WorkerData>>,
    /// Partition to worker mapping for affinity
    #[allow(dead_code)]
    pub(crate) partition_affinity: Arc<RwLock<HashMap<String, usize>>>,
    /// Global metrics: events accepted by `submit`
    events_processed: AtomicU64,
    /// Global metrics: events rejected or lost due to backpressure
    events_dropped: AtomicU64,
    /// Latency tracking (ring buffer of recent latencies in microseconds)
    latencies: Arc<Mutex<Vec<u64>>>,
    /// Shutdown signal; `None` once shutdown has been requested
    shutdown_tx: Option<mpsc::Sender<()>>,
}
189
190impl WorkerPool {
191    /// Create a new worker pool with the given configuration
192    pub fn new<F>(config: WorkerPoolConfig, processor: F) -> Self
193    where
194        F: Fn(Event) + Send + Sync + Clone + 'static,
195    {
196        let (dispatch_tx, dispatch_rx) = mpsc::channel(config.queue_size * config.workers);
197        let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
198
199        let mut workers = Vec::with_capacity(config.workers);
200        for i in 0..config.workers {
201            workers.push(Arc::new(WorkerData::new(i)));
202        }
203
204        let partition_affinity = Arc::new(RwLock::new(HashMap::new()));
205        let latencies = Arc::new(Mutex::new(Vec::with_capacity(1000)));
206
207        // Spawn the dispatcher task
208        let workers_clone = workers.clone();
209        let partition_affinity_clone = partition_affinity.clone();
210        let latencies_clone = latencies.clone();
211        let processor_clone = processor;
212        let pool_name = config.name.clone();
213        let num_workers = config.workers;
214
215        tokio::spawn(Self::dispatcher_task(
216            dispatch_rx,
217            shutdown_rx,
218            workers_clone,
219            partition_affinity_clone,
220            latencies_clone,
221            processor_clone,
222            pool_name,
223            num_workers,
224        ));
225
226        Self {
227            config,
228            dispatch_tx,
229            workers,
230            partition_affinity,
231            events_processed: AtomicU64::new(0),
232            events_dropped: AtomicU64::new(0),
233            latencies,
234            shutdown_tx: Some(shutdown_tx),
235        }
236    }
237
238    /// Dispatcher task that routes events to workers
239    #[allow(clippy::too_many_arguments)]
240    async fn dispatcher_task<F>(
241        mut rx: mpsc::Receiver<PartitionedEvent>,
242        mut shutdown_rx: mpsc::Receiver<()>,
243        workers: Vec<Arc<WorkerData>>,
244        partition_affinity: Arc<RwLock<HashMap<String, usize>>>,
245        latencies: Arc<Mutex<Vec<u64>>>,
246        processor: F,
247        pool_name: String,
248        num_workers: usize,
249    ) where
250        F: Fn(Event) + Send + Sync + Clone + 'static,
251    {
252        info!(
253            "Worker pool '{}' started with {} workers",
254            pool_name, num_workers
255        );
256
257        loop {
258            tokio::select! {
259                Some(partitioned_event) = rx.recv() => {
260                    // Determine which worker should handle this partition
261                    let worker_id = {
262                        let affinity = partition_affinity.read().await;
263                        if let Some(&id) = affinity.get(&partitioned_event.partition_key) {
264                            id
265                        } else {
266                            // Assign to least loaded worker (simple round-robin for now)
267                            let mut min_load = u64::MAX;
268                            let mut best_worker = 0;
269                            for (i, w) in workers.iter().enumerate() {
270                                let load = w.events_processed.load(Ordering::Relaxed);
271                                if load < min_load && w.get_state() != WorkerState::Stopped {
272                                    min_load = load;
273                                    best_worker = i;
274                                }
275                            }
276                            // Update affinity
277                            drop(affinity);
278                            partition_affinity.write().await.insert(
279                                partitioned_event.partition_key.clone(),
280                                best_worker,
281                            );
282                            best_worker
283                        }
284                    };
285
286                    // Process the event
287                    let worker = &workers[worker_id];
288                    worker.set_state(WorkerState::Processing);
289                    *worker.current_partition.write().await = Some(partitioned_event.partition_key);
290
291                    let start = Instant::now();
292                    processor(partitioned_event.event);
293                    let elapsed_us = start.elapsed().as_micros() as u64;
294
295                    // Update metrics
296                    worker.events_processed.fetch_add(1, Ordering::Relaxed);
297                    *worker.last_active.write().await = Instant::now();
298                    worker.set_state(WorkerState::Idle);
299                    *worker.current_partition.write().await = None;
300
301                    // Track latency
302                    let mut lat = latencies.lock().await;
303                    if lat.len() >= 1000 {
304                        lat.remove(0);
305                    }
306                    lat.push(elapsed_us);
307                }
308                _ = shutdown_rx.recv() => {
309                    info!("Worker pool '{}' shutting down", pool_name);
310                    for w in &workers {
311                        w.set_state(WorkerState::Stopped);
312                    }
313                    break;
314                }
315            }
316        }
317    }
318
319    /// Submit an event to the pool with a partition key
320    pub async fn submit(
321        &self,
322        event: Event,
323        partition_key: &str,
324    ) -> Result<(), PoolBackpressureError> {
325        let partitioned = PartitionedEvent {
326            event,
327            partition_key: partition_key.to_string(),
328        };
329
330        match self.config.backpressure {
331            BackpressureStrategy::Block => {
332                if self.dispatch_tx.send(partitioned).await.is_err() {
333                    warn!("Pool '{}' dispatch channel closed", self.config.name);
334                }
335                self.events_processed.fetch_add(1, Ordering::Relaxed);
336                Ok(())
337            }
338            BackpressureStrategy::DropNewest => {
339                match self.dispatch_tx.try_send(partitioned) {
340                    Ok(()) => {
341                        self.events_processed.fetch_add(1, Ordering::Relaxed);
342                        Ok(())
343                    }
344                    Err(_) => {
345                        self.events_dropped.fetch_add(1, Ordering::Relaxed);
346                        debug!("Pool '{}' dropped event (queue full)", self.config.name);
347                        Ok(()) // Silently drop
348                    }
349                }
350            }
351            BackpressureStrategy::DropOldest => {
352                // For drop oldest, we always accept but may drop from queue
353                // This is a simplification - proper implementation would need a custom queue
354                match self.dispatch_tx.try_send(partitioned) {
355                    Ok(()) => {
356                        self.events_processed.fetch_add(1, Ordering::Relaxed);
357                    }
358                    Err(mpsc::error::TrySendError::Full(p)) => {
359                        // Queue full, force send (may block briefly)
360                        self.events_dropped.fetch_add(1, Ordering::Relaxed);
361                        let _ = self.dispatch_tx.send(p).await;
362                        self.events_processed.fetch_add(1, Ordering::Relaxed);
363                    }
364                    Err(_) => {}
365                }
366                Ok(())
367            }
368            BackpressureStrategy::Error => match self.dispatch_tx.try_send(partitioned) {
369                Ok(()) => {
370                    self.events_processed.fetch_add(1, Ordering::Relaxed);
371                    Ok(())
372                }
373                Err(_) => {
374                    self.events_dropped.fetch_add(1, Ordering::Relaxed);
375                    Err(PoolBackpressureError {
376                        pool_name: self.config.name.clone(),
377                        queue_depth: self.queue_depth(),
378                    })
379                }
380            },
381        }
382    }
383
384    /// Get current queue depth
385    pub fn queue_depth(&self) -> usize {
386        // Approximate based on channel capacity
387        self.config.queue_size * self.config.workers - self.dispatch_tx.capacity()
388    }
389
390    /// Get pool metrics
391    pub async fn metrics(&self) -> WorkerPoolMetrics {
392        let mut active = 0;
393        let mut idle = 0;
394
395        for w in &self.workers {
396            match w.get_state() {
397                WorkerState::Processing => active += 1,
398                WorkerState::Idle => idle += 1,
399                _ => {}
400            }
401        }
402
403        let latencies = self.latencies.lock().await;
404        let avg_latency = if latencies.is_empty() {
405            0.0
406        } else {
407            latencies.iter().sum::<u64>() as f64 / latencies.len() as f64
408        };
409
410        let p99_latency = if latencies.is_empty() {
411            0.0
412        } else {
413            let mut sorted = latencies.clone();
414            sorted.sort_unstable();
415            let idx = (sorted.len() as f64 * 0.99) as usize;
416            sorted.get(idx.min(sorted.len() - 1)).copied().unwrap_or(0) as f64
417        };
418
419        WorkerPoolMetrics {
420            active_workers: active,
421            idle_workers: idle,
422            queue_depth: self.queue_depth(),
423            events_processed: self.events_processed.load(Ordering::Relaxed),
424            events_dropped: self.events_dropped.load(Ordering::Relaxed),
425            avg_latency_us: avg_latency,
426            p99_latency_us: p99_latency,
427        }
428    }
429
430    /// Get status of all workers
431    pub async fn worker_statuses(&self) -> Vec<WorkerStatus> {
432        let mut statuses = Vec::with_capacity(self.workers.len());
433        for w in &self.workers {
434            statuses.push(WorkerStatus {
435                id: w.id,
436                state: w.get_state(),
437                current_partition: w.current_partition.read().await.clone(),
438                events_processed: w.events_processed.load(Ordering::Relaxed),
439                last_active: *w.last_active.read().await,
440            });
441        }
442        statuses
443    }
444
445    /// Get pool configuration
446    pub const fn config(&self) -> &WorkerPoolConfig {
447        &self.config
448    }
449
450    /// Graceful shutdown
451    pub async fn shutdown(&mut self) -> Duration {
452        let start = Instant::now();
453        if let Some(tx) = self.shutdown_tx.take() {
454            let _ = tx.send(()).await;
455        }
456        // Give workers time to finish
457        tokio::time::sleep(Duration::from_millis(100)).await;
458        start.elapsed()
459    }
460}
461
impl Drop for WorkerPool {
    fn drop(&mut self) {
        // Best effort shutdown: `drop` cannot await, so use `try_send` and
        // ignore failure (the dispatcher may already have exited, or
        // `shutdown` may already have consumed the sender).
        if let Some(tx) = self.shutdown_tx.take() {
            let _ = tx.try_send(());
        }
    }
}
470
471// ============================================================================
472// Tests
473// ============================================================================
474
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicUsize;

    use super::*;

    // Construction smoke test: the pool must retain its configuration.
    #[tokio::test]
    async fn test_worker_pool_creation() {
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = counter;

        let config = WorkerPoolConfig {
            name: "test".to_string(),
            workers: 2,
            queue_size: 10,
            backpressure: BackpressureStrategy::Block,
        };

        let pool = WorkerPool::new(config, move |_event| {
            counter_clone.fetch_add(1, Ordering::Relaxed);
        });

        assert_eq!(pool.config().workers, 2);
        assert_eq!(pool.config().name, "test");
    }

    // Every submitted event should reach the processor closure exactly once.
    // NOTE(review): waits a fixed 100ms for the dispatcher to drain; may be
    // flaky on a heavily loaded machine.
    #[tokio::test]
    async fn test_worker_pool_submit() {
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = counter.clone();

        let config = WorkerPoolConfig {
            name: "test".to_string(),
            workers: 2,
            queue_size: 100,
            backpressure: BackpressureStrategy::Block,
        };

        let pool = WorkerPool::new(config, move |_event| {
            counter_clone.fetch_add(1, Ordering::Relaxed);
        });

        // Submit some events
        for i in 0..10 {
            let event = Event::new("TestEvent").with_field("id", i as i64);
            pool.submit(event, &format!("partition_{}", i % 3))
                .await
                .unwrap();
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(100)).await;

        assert_eq!(counter.load(Ordering::Relaxed), 10);
    }

    // A fresh pool reports all workers idle and zero events processed.
    #[tokio::test]
    async fn test_worker_pool_metrics() {
        let config = WorkerPoolConfig {
            name: "metrics_test".to_string(),
            workers: 4,
            queue_size: 100,
            backpressure: BackpressureStrategy::Block,
        };

        let pool = WorkerPool::new(config, |_| {});

        let metrics = pool.metrics().await;
        assert_eq!(metrics.idle_workers + metrics.active_workers, 4);
        assert_eq!(metrics.events_processed, 0);
    }

    // DropNewest: a slow processor plus a tiny queue should shed load.
    // NOTE(review): the processor deliberately blocks the async dispatcher
    // with std::thread::sleep to keep the queue full.
    #[tokio::test]
    async fn test_worker_pool_backpressure_drop_newest() {
        let config = WorkerPoolConfig {
            name: "backpressure_test".to_string(),
            workers: 1,
            queue_size: 2,
            backpressure: BackpressureStrategy::DropNewest,
        };

        let pool = WorkerPool::new(config, |_| {
            std::thread::sleep(Duration::from_millis(50));
        });

        // Submit many events quickly
        for i in 0..100 {
            let event = Event::new("TestEvent").with_field("id", i as i64);
            let _ = pool.submit(event, "partition").await;
        }

        let metrics = pool.metrics().await;
        // Some events should have been dropped
        // NOTE(review): this disjunction is trivially satisfiable; a stricter
        // test would assert on events_dropped alone.
        assert!(metrics.events_dropped > 0 || metrics.events_processed > 0);
    }

    // Error strategy: once the single-slot queue is full, submit returns Err.
    #[tokio::test]
    async fn test_worker_pool_backpressure_error() {
        let config = WorkerPoolConfig {
            name: "error_test".to_string(),
            workers: 1,
            queue_size: 1,
            backpressure: BackpressureStrategy::Error,
        };

        let pool = WorkerPool::new(config, |_| {
            std::thread::sleep(Duration::from_millis(100));
        });

        // First event should succeed
        let event1 = Event::new("TestEvent").with_field("id", 1i64);
        assert!(pool.submit(event1, "partition").await.is_ok());

        // Subsequent events may fail due to backpressure
        let mut errors = 0;
        for i in 2..10 {
            let event = Event::new("TestEvent").with_field("id", i as i64);
            if pool.submit(event, "partition").await.is_err() {
                errors += 1;
            }
        }

        // Should have some errors
        assert!(errors > 0);
    }

    // Events sharing a partition key must be recorded in the affinity map.
    #[tokio::test]
    async fn test_worker_pool_partition_affinity() {
        let config = WorkerPoolConfig {
            name: "affinity_test".to_string(),
            workers: 4,
            queue_size: 100,
            backpressure: BackpressureStrategy::Block,
        };

        let pool = WorkerPool::new(config, |_| {});

        // Submit events with same partition key
        for _ in 0..5 {
            let event = Event::new("TestEvent");
            pool.submit(event, "same_partition").await.unwrap();
        }

        tokio::time::sleep(Duration::from_millis(100)).await;

        // Check that partition affinity was established
        let affinity = pool.partition_affinity.read().await;
        assert!(affinity.contains_key("same_partition"));
    }

    // Shutdown completes well within its grace period.
    #[tokio::test]
    async fn test_worker_pool_shutdown() {
        let config = WorkerPoolConfig::default();
        let mut pool = WorkerPool::new(config, |_| {});

        let duration = pool.shutdown().await;
        assert!(duration < Duration::from_secs(1));
    }
}