// varpulis_runtime/worker_pool.rs
//! Worker Pool for Controlled Parallelism
//!
//! Unlike Apama's opaque spawn/contexts, Varpulis provides explicit control over
//! parallel workers with monitoring, backpressure, and dynamic resizing.
//!
//! # Example
//! ```text
//! let config = WorkerPoolConfig {
//!     name: "OrderProcessors".to_string(),
//!     workers: 4,
//!     queue_size: 1000,
//!     backpressure: BackpressureStrategy::DropOldest,
//! };
//!
//! let pool = WorkerPool::new(config);
//! pool.submit(event, "customer_123").await?;
//! println!("Metrics: {:?}", pool.metrics());
//! ```

use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

use tokio::sync::{mpsc, Mutex, RwLock};
use tracing::{debug, info, warn};

use crate::event::Event;

/// Configuration for a worker pool
#[derive(Debug, Clone)]
pub struct WorkerPoolConfig {
    /// Pool name for identification (appears in logs and errors)
    pub name: String,
    /// Number of worker tasks
    pub workers: usize,
    /// Maximum queue size per worker
    ///
    /// The pool allocates a single shared dispatch channel sized
    /// `queue_size * workers`.
    pub queue_size: usize,
    /// Backpressure strategy when queue is full
    pub backpressure: BackpressureStrategy,
}

43impl Default for WorkerPoolConfig {
44    fn default() -> Self {
45        Self {
46            name: "default".to_string(),
47            workers: 4,
48            queue_size: 1000,
49            backpressure: BackpressureStrategy::Block,
50        }
51    }
52}
53
/// Strategy for handling backpressure when queue is full
///
/// Selected per pool via [`WorkerPoolConfig::backpressure`] and applied in
/// `WorkerPool::submit`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureStrategy {
    /// Block sender until space available
    Block,
    /// Drop oldest events in queue
    DropOldest,
    /// Drop incoming events
    DropNewest,
    /// Return error to sender
    Error,
}

/// Error returned when backpressure strategy is Error and the worker pool queue is full.
///
/// Distinct from [`crate::sase::BackpressureError`] which covers SASE run-level backpressure.
#[derive(Debug, Clone, thiserror::Error)]
#[error("Pool '{}' queue full (depth: {})", pool_name, queue_depth)]
pub struct PoolBackpressureError {
    /// Name of the pool that rejected the event.
    pub pool_name: String,
    /// Approximate queue depth at the moment the event was rejected.
    pub queue_depth: usize,
}

/// Metrics for a worker pool
///
/// A point-in-time snapshot assembled by `WorkerPool::metrics`.
#[derive(Debug, Clone, Default)]
pub struct WorkerPoolMetrics {
    /// Number of currently active (processing) workers
    pub active_workers: usize,
    /// Number of idle workers
    pub idle_workers: usize,
    /// Current total queue depth across all workers
    pub queue_depth: usize,
    /// Total events processed
    pub events_processed: u64,
    /// Events dropped due to backpressure
    pub events_dropped: u64,
    /// Average processing latency in microseconds
    /// (computed over the most recent ~1000 samples)
    pub avg_latency_us: f64,
    /// 99th percentile latency in microseconds
    /// (computed over the most recent ~1000 samples)
    pub p99_latency_us: f64,
}

/// Status of an individual worker
///
/// Snapshot returned by `WorkerPool::worker_statuses`.
#[derive(Debug, Clone)]
pub struct WorkerStatus {
    /// Worker ID
    pub id: usize,
    /// Current state
    pub state: WorkerState,
    /// Current partition being processed (if any)
    pub current_partition: Option<String>,
    /// Events processed by this worker
    pub events_processed: u64,
    /// Last time this worker was active
    pub last_active: Instant,
}

/// State of a worker
///
/// Discriminants are pinned explicitly because `WorkerData` stores the state
/// in an `AtomicUsize` via `as usize` casts and decodes it back by value;
/// reordering variants must never change the on-wire encoding.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerState {
    /// Waiting for events
    Idle = 0,
    /// Processing an event
    Processing = 1,
    /// Finishing current work before shutdown
    Draining = 2,
    /// Stopped
    Stopped = 3,
}

/// Internal worker data shared between pool and worker tasks
#[derive(Debug)]
struct WorkerData {
    // Worker ID (index into the pool's worker vector).
    id: usize,
    state: AtomicUsize, // WorkerState as usize (see get_state/set_state)
    // Partition key currently being processed, if any.
    current_partition: RwLock<Option<String>>,
    // Lifetime count of events attributed to this worker.
    events_processed: AtomicU64,
    // Timestamp of the most recent processing activity.
    last_active: RwLock<Instant>,
}

134impl WorkerData {
135    fn new(id: usize) -> Self {
136        Self {
137            id,
138            state: AtomicUsize::new(WorkerState::Idle as usize),
139            current_partition: RwLock::new(None),
140            events_processed: AtomicU64::new(0),
141            last_active: RwLock::new(Instant::now()),
142        }
143    }
144
145    fn get_state(&self) -> WorkerState {
146        match self.state.load(Ordering::Relaxed) {
147            0 => WorkerState::Idle,
148            1 => WorkerState::Processing,
149            2 => WorkerState::Draining,
150            _ => WorkerState::Stopped,
151        }
152    }
153
154    fn set_state(&self, state: WorkerState) {
155        self.state.store(state as usize, Ordering::Relaxed);
156    }
157}
158
/// Event with partition key for routing
///
/// Internal message type carried on the dispatch channel; the key determines
/// which worker the event is attributed to (partition affinity).
struct PartitionedEvent {
    event: Event,
    partition_key: String,
}

/// A pool of workers for parallel event processing
#[derive(Debug)]
pub struct WorkerPool {
    config: WorkerPoolConfig,
    /// Sender to dispatch events to workers
    dispatch_tx: mpsc::Sender<PartitionedEvent>,
    /// Worker data for monitoring
    workers: Vec<Arc<WorkerData>>,
    /// Partition to worker mapping for affinity
    #[allow(dead_code)]
    pub(crate) partition_affinity: Arc<RwLock<HashMap<String, usize>>>,
    /// Global metrics: events accepted into the queue
    events_processed: AtomicU64,
    /// Global metrics: events rejected due to backpressure
    events_dropped: AtomicU64,
    /// Latency tracking (ring buffer of recent latencies in microseconds)
    latencies: Arc<Mutex<Vec<u64>>>,
    /// Shutdown signal; taken (set to `None`) on shutdown/drop
    shutdown_tx: Option<mpsc::Sender<()>>,
}

185impl WorkerPool {
186    /// Create a new worker pool with the given configuration
187    pub fn new<F>(config: WorkerPoolConfig, processor: F) -> Self
188    where
189        F: Fn(Event) + Send + Sync + Clone + 'static,
190    {
191        let (dispatch_tx, dispatch_rx) = mpsc::channel(config.queue_size * config.workers);
192        let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
193
194        let mut workers = Vec::with_capacity(config.workers);
195        for i in 0..config.workers {
196            workers.push(Arc::new(WorkerData::new(i)));
197        }
198
199        let partition_affinity = Arc::new(RwLock::new(HashMap::new()));
200        let latencies = Arc::new(Mutex::new(Vec::with_capacity(1000)));
201
202        // Spawn the dispatcher task
203        let workers_clone = workers.clone();
204        let partition_affinity_clone = partition_affinity.clone();
205        let latencies_clone = latencies.clone();
206        let processor_clone = processor;
207        let pool_name = config.name.clone();
208        let num_workers = config.workers;
209
210        tokio::spawn(Self::dispatcher_task(
211            dispatch_rx,
212            shutdown_rx,
213            workers_clone,
214            partition_affinity_clone,
215            latencies_clone,
216            processor_clone,
217            pool_name,
218            num_workers,
219        ));
220
221        Self {
222            config,
223            dispatch_tx,
224            workers,
225            partition_affinity,
226            events_processed: AtomicU64::new(0),
227            events_dropped: AtomicU64::new(0),
228            latencies,
229            shutdown_tx: Some(shutdown_tx),
230        }
231    }
232
233    /// Dispatcher task that routes events to workers
234    #[allow(clippy::too_many_arguments)]
235    async fn dispatcher_task<F>(
236        mut rx: mpsc::Receiver<PartitionedEvent>,
237        mut shutdown_rx: mpsc::Receiver<()>,
238        workers: Vec<Arc<WorkerData>>,
239        partition_affinity: Arc<RwLock<HashMap<String, usize>>>,
240        latencies: Arc<Mutex<Vec<u64>>>,
241        processor: F,
242        pool_name: String,
243        num_workers: usize,
244    ) where
245        F: Fn(Event) + Send + Sync + Clone + 'static,
246    {
247        info!(
248            "Worker pool '{}' started with {} workers",
249            pool_name, num_workers
250        );
251
252        loop {
253            tokio::select! {
254                Some(partitioned_event) = rx.recv() => {
255                    // Determine which worker should handle this partition
256                    let worker_id = {
257                        let affinity = partition_affinity.read().await;
258                        if let Some(&id) = affinity.get(&partitioned_event.partition_key) {
259                            id
260                        } else {
261                            // Assign to least loaded worker (simple round-robin for now)
262                            let mut min_load = u64::MAX;
263                            let mut best_worker = 0;
264                            for (i, w) in workers.iter().enumerate() {
265                                let load = w.events_processed.load(Ordering::Relaxed);
266                                if load < min_load && w.get_state() != WorkerState::Stopped {
267                                    min_load = load;
268                                    best_worker = i;
269                                }
270                            }
271                            // Update affinity
272                            drop(affinity);
273                            partition_affinity.write().await.insert(
274                                partitioned_event.partition_key.clone(),
275                                best_worker,
276                            );
277                            best_worker
278                        }
279                    };
280
281                    // Process the event
282                    let worker = &workers[worker_id];
283                    worker.set_state(WorkerState::Processing);
284                    *worker.current_partition.write().await = Some(partitioned_event.partition_key);
285
286                    let start = Instant::now();
287                    processor(partitioned_event.event);
288                    let elapsed_us = start.elapsed().as_micros() as u64;
289
290                    // Update metrics
291                    worker.events_processed.fetch_add(1, Ordering::Relaxed);
292                    *worker.last_active.write().await = Instant::now();
293                    worker.set_state(WorkerState::Idle);
294                    *worker.current_partition.write().await = None;
295
296                    // Track latency
297                    let mut lat = latencies.lock().await;
298                    if lat.len() >= 1000 {
299                        lat.remove(0);
300                    }
301                    lat.push(elapsed_us);
302                }
303                _ = shutdown_rx.recv() => {
304                    info!("Worker pool '{}' shutting down", pool_name);
305                    for w in &workers {
306                        w.set_state(WorkerState::Stopped);
307                    }
308                    break;
309                }
310            }
311        }
312    }
313
314    /// Submit an event to the pool with a partition key
315    pub async fn submit(
316        &self,
317        event: Event,
318        partition_key: &str,
319    ) -> Result<(), PoolBackpressureError> {
320        let partitioned = PartitionedEvent {
321            event,
322            partition_key: partition_key.to_string(),
323        };
324
325        match self.config.backpressure {
326            BackpressureStrategy::Block => {
327                if self.dispatch_tx.send(partitioned).await.is_err() {
328                    warn!("Pool '{}' dispatch channel closed", self.config.name);
329                }
330                self.events_processed.fetch_add(1, Ordering::Relaxed);
331                Ok(())
332            }
333            BackpressureStrategy::DropNewest => {
334                match self.dispatch_tx.try_send(partitioned) {
335                    Ok(()) => {
336                        self.events_processed.fetch_add(1, Ordering::Relaxed);
337                        Ok(())
338                    }
339                    Err(_) => {
340                        self.events_dropped.fetch_add(1, Ordering::Relaxed);
341                        debug!("Pool '{}' dropped event (queue full)", self.config.name);
342                        Ok(()) // Silently drop
343                    }
344                }
345            }
346            BackpressureStrategy::DropOldest => {
347                // For drop oldest, we always accept but may drop from queue
348                // This is a simplification - proper implementation would need a custom queue
349                match self.dispatch_tx.try_send(partitioned) {
350                    Ok(()) => {
351                        self.events_processed.fetch_add(1, Ordering::Relaxed);
352                    }
353                    Err(mpsc::error::TrySendError::Full(p)) => {
354                        // Queue full, force send (may block briefly)
355                        self.events_dropped.fetch_add(1, Ordering::Relaxed);
356                        let _ = self.dispatch_tx.send(p).await;
357                        self.events_processed.fetch_add(1, Ordering::Relaxed);
358                    }
359                    Err(_) => {}
360                }
361                Ok(())
362            }
363            BackpressureStrategy::Error => match self.dispatch_tx.try_send(partitioned) {
364                Ok(()) => {
365                    self.events_processed.fetch_add(1, Ordering::Relaxed);
366                    Ok(())
367                }
368                Err(_) => {
369                    self.events_dropped.fetch_add(1, Ordering::Relaxed);
370                    Err(PoolBackpressureError {
371                        pool_name: self.config.name.clone(),
372                        queue_depth: self.queue_depth(),
373                    })
374                }
375            },
376        }
377    }
378
379    /// Get current queue depth
380    pub fn queue_depth(&self) -> usize {
381        // Approximate based on channel capacity
382        self.config.queue_size * self.config.workers - self.dispatch_tx.capacity()
383    }
384
385    /// Get pool metrics
386    pub async fn metrics(&self) -> WorkerPoolMetrics {
387        let mut active = 0;
388        let mut idle = 0;
389
390        for w in &self.workers {
391            match w.get_state() {
392                WorkerState::Processing => active += 1,
393                WorkerState::Idle => idle += 1,
394                _ => {}
395            }
396        }
397
398        let latencies = self.latencies.lock().await;
399        let avg_latency = if latencies.is_empty() {
400            0.0
401        } else {
402            latencies.iter().sum::<u64>() as f64 / latencies.len() as f64
403        };
404
405        let p99_latency = if latencies.is_empty() {
406            0.0
407        } else {
408            let mut sorted = latencies.clone();
409            sorted.sort_unstable();
410            let idx = (sorted.len() as f64 * 0.99) as usize;
411            sorted.get(idx.min(sorted.len() - 1)).copied().unwrap_or(0) as f64
412        };
413
414        WorkerPoolMetrics {
415            active_workers: active,
416            idle_workers: idle,
417            queue_depth: self.queue_depth(),
418            events_processed: self.events_processed.load(Ordering::Relaxed),
419            events_dropped: self.events_dropped.load(Ordering::Relaxed),
420            avg_latency_us: avg_latency,
421            p99_latency_us: p99_latency,
422        }
423    }
424
425    /// Get status of all workers
426    pub async fn worker_statuses(&self) -> Vec<WorkerStatus> {
427        let mut statuses = Vec::with_capacity(self.workers.len());
428        for w in &self.workers {
429            statuses.push(WorkerStatus {
430                id: w.id,
431                state: w.get_state(),
432                current_partition: w.current_partition.read().await.clone(),
433                events_processed: w.events_processed.load(Ordering::Relaxed),
434                last_active: *w.last_active.read().await,
435            });
436        }
437        statuses
438    }
439
440    /// Get pool configuration
441    pub const fn config(&self) -> &WorkerPoolConfig {
442        &self.config
443    }
444
445    /// Graceful shutdown
446    pub async fn shutdown(&mut self) -> Duration {
447        let start = Instant::now();
448        if let Some(tx) = self.shutdown_tx.take() {
449            let _ = tx.send(()).await;
450        }
451        // Give workers time to finish
452        tokio::time::sleep(Duration::from_millis(100)).await;
453        start.elapsed()
454    }
455}
456
impl Drop for WorkerPool {
    fn drop(&mut self) {
        // Best effort shutdown: if `shutdown()` was never called, nudge the
        // dispatcher to stop. `try_send` never blocks (required in Drop); if
        // the signal channel is full or closed, shutdown is already underway,
        // so the error is deliberately ignored.
        if let Some(tx) = self.shutdown_tx.take() {
            let _ = tx.try_send(());
        }
    }
}

// ============================================================================
// Tests
// ============================================================================

#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicUsize;

    use super::*;

    /// Pool construction exposes the configuration it was built with.
    #[tokio::test]
    async fn test_worker_pool_creation() {
        let counter = Arc::new(AtomicUsize::new(0));

        let config = WorkerPoolConfig {
            name: "test".to_string(),
            workers: 2,
            queue_size: 10,
            backpressure: BackpressureStrategy::Block,
        };

        let pool = WorkerPool::new(config, move |_event| {
            counter.fetch_add(1, Ordering::Relaxed);
        });

        assert_eq!(pool.config().workers, 2);
        assert_eq!(pool.config().name, "test");
    }

    /// Every submitted event reaches the processor callback.
    #[tokio::test]
    async fn test_worker_pool_submit() {
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = counter.clone();

        let config = WorkerPoolConfig {
            name: "test".to_string(),
            workers: 2,
            queue_size: 100,
            backpressure: BackpressureStrategy::Block,
        };

        let pool = WorkerPool::new(config, move |_event| {
            counter_clone.fetch_add(1, Ordering::Relaxed);
        });

        // Submit events spread over three partitions.
        for i in 0..10 {
            let event = Event::new("TestEvent").with_field("id", i as i64);
            pool.submit(event, &format!("partition_{}", i % 3))
                .await
                .unwrap();
        }

        // Wait for the dispatcher to drain the queue.
        tokio::time::sleep(Duration::from_millis(100)).await;

        assert_eq!(counter.load(Ordering::Relaxed), 10);
    }

    /// A fresh pool reports all workers accounted for and nothing processed.
    #[tokio::test]
    async fn test_worker_pool_metrics() {
        let config = WorkerPoolConfig {
            name: "metrics_test".to_string(),
            workers: 4,
            queue_size: 100,
            backpressure: BackpressureStrategy::Block,
        };

        let pool = WorkerPool::new(config, |_| {});

        let metrics = pool.metrics().await;
        assert_eq!(metrics.idle_workers + metrics.active_workers, 4);
        assert_eq!(metrics.events_processed, 0);
    }

    /// Under DropNewest, overflow events are dropped, and every submission
    /// is accounted for as either processed or dropped.
    #[tokio::test]
    async fn test_worker_pool_backpressure_drop_newest() {
        let config = WorkerPoolConfig {
            name: "backpressure_test".to_string(),
            workers: 1,
            queue_size: 2,
            backpressure: BackpressureStrategy::DropNewest,
        };

        // Slow processor so the tiny queue fills up immediately.
        let pool = WorkerPool::new(config, |_| {
            std::thread::sleep(Duration::from_millis(50));
        });

        // Submit many events quickly.
        for i in 0..100 {
            let event = Event::new("TestEvent").with_field("id", i as i64);
            let _ = pool.submit(event, "partition").await;
        }

        let metrics = pool.metrics().await;
        // Every submission is either accepted or dropped — none vanish.
        assert_eq!(metrics.events_processed + metrics.events_dropped, 100);
        // The 2-slot queue plus the slow processor guarantees drops.
        assert!(metrics.events_dropped > 0);
    }

    /// Under Error, submissions fail once the queue is full.
    #[tokio::test]
    async fn test_worker_pool_backpressure_error() {
        let config = WorkerPoolConfig {
            name: "error_test".to_string(),
            workers: 1,
            queue_size: 1,
            backpressure: BackpressureStrategy::Error,
        };

        let pool = WorkerPool::new(config, |_| {
            std::thread::sleep(Duration::from_millis(100));
        });

        // First event should succeed (queue has one free slot).
        let event1 = Event::new("TestEvent").with_field("id", 1i64);
        assert!(pool.submit(event1, "partition").await.is_ok());

        // Subsequent events may fail due to backpressure.
        let mut errors = 0;
        for i in 2..10 {
            let event = Event::new("TestEvent").with_field("id", i as i64);
            if pool.submit(event, "partition").await.is_err() {
                errors += 1;
            }
        }

        // Should have some errors.
        assert!(errors > 0);
    }

    /// Repeated submissions with one partition key establish an affinity entry.
    #[tokio::test]
    async fn test_worker_pool_partition_affinity() {
        let config = WorkerPoolConfig {
            name: "affinity_test".to_string(),
            workers: 4,
            queue_size: 100,
            backpressure: BackpressureStrategy::Block,
        };

        let pool = WorkerPool::new(config, |_| {});

        // Submit events with same partition key.
        for _ in 0..5 {
            let event = Event::new("TestEvent");
            pool.submit(event, "same_partition").await.unwrap();
        }

        tokio::time::sleep(Duration::from_millis(100)).await;

        // Check that partition affinity was established.
        let affinity = pool.partition_affinity.read().await;
        assert!(affinity.contains_key("same_partition"));
    }

    /// Graceful shutdown completes promptly.
    #[tokio::test]
    async fn test_worker_pool_shutdown() {
        let config = WorkerPoolConfig::default();
        let mut pool = WorkerPool::new(config, |_| {});

        let duration = pool.shutdown().await;
        assert!(duration < Duration::from_secs(1));
    }
}