1use std::collections::HashMap;
26use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
27use std::sync::Arc;
28use std::time::{Duration, Instant};
29
30use tokio::sync::{mpsc, Mutex, RwLock};
31use tracing::{debug, info, warn};
32
33use crate::event::Event;
34
/// Configuration for a [`WorkerPool`].
#[derive(Debug, Clone)]
pub struct WorkerPoolConfig {
    /// Human-readable pool name, used in log messages and backpressure errors.
    pub name: String,
    /// Number of logical workers tracked by the pool.
    pub workers: usize,
    /// Per-worker queue size; the dispatch channel is sized
    /// `queue_size * workers` in `WorkerPool::new`.
    pub queue_size: usize,
    /// What `WorkerPool::submit` does when the dispatch queue is full.
    pub backpressure: BackpressureStrategy,
}
47
48impl Default for WorkerPoolConfig {
49 fn default() -> Self {
50 Self {
51 name: "default".to_string(),
52 workers: 4,
53 queue_size: 1000,
54 backpressure: BackpressureStrategy::Block,
55 }
56 }
57}
58
/// Strategy applied by [`WorkerPool::submit`] when the dispatch queue is full.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureStrategy {
    /// Wait (asynchronously) until queue space is available.
    Block,
    /// Intended to evict the oldest queued event to make room.
    /// NOTE(review): `submit` cannot pop from the mpsc queue, so today this
    /// falls back to a blocking send — confirm intended semantics.
    DropOldest,
    /// Discard the incoming event and return `Ok`.
    DropNewest,
    /// Reject the incoming event with a [`PoolBackpressureError`].
    Error,
}
71
/// Error returned by [`WorkerPool::submit`] under
/// [`BackpressureStrategy::Error`] when the dispatch queue is full.
#[derive(Debug, Clone, thiserror::Error)]
#[error("Pool '{}' queue full (depth: {})", pool_name, queue_depth)]
pub struct PoolBackpressureError {
    /// Name of the pool that rejected the event.
    pub pool_name: String,
    /// Queue depth observed at the time of rejection.
    pub queue_depth: usize,
}
81
/// Point-in-time snapshot of pool health, produced by [`WorkerPool::metrics`].
#[derive(Debug, Clone, Default)]
pub struct WorkerPoolMetrics {
    /// Workers currently in the `Processing` state.
    pub active_workers: usize,
    /// Workers currently in the `Idle` state.
    pub idle_workers: usize,
    /// Approximate number of events waiting in the dispatch queue.
    pub queue_depth: usize,
    /// Events accepted by `submit` (counted at enqueue time, not after the
    /// processor has actually run).
    pub events_processed: u64,
    /// Events discarded due to backpressure.
    pub events_dropped: u64,
    /// Mean processor latency over the sliding sample window, microseconds.
    pub avg_latency_us: f64,
    /// 99th-percentile processor latency over the same window, microseconds.
    pub p99_latency_us: f64,
}
100
/// Snapshot of a single worker, produced by [`WorkerPool::worker_statuses`].
#[derive(Debug, Clone)]
pub struct WorkerStatus {
    /// Worker index within the pool (0-based).
    pub id: usize,
    /// Current lifecycle state.
    pub state: WorkerState,
    /// Partition key being processed right now, if any.
    pub current_partition: Option<String>,
    /// Total events this worker has processed.
    pub events_processed: u64,
    /// Last time this worker finished processing an event.
    pub last_active: Instant,
}
115
/// Worker lifecycle state.
///
/// The implicit discriminants (Idle=0, Processing=1, Draining=2, Stopped=3)
/// are stored in `WorkerData::state` via `as usize` and decoded by
/// `WorkerData::get_state` — do not reorder variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerState {
    /// Waiting for work.
    Idle,
    /// Currently running the processor for an event.
    Processing,
    /// Finishing in-flight work before stopping.
    Draining,
    /// No longer accepting work (pool shut down).
    Stopped,
}
128
/// Per-worker bookkeeping shared between the pool handle and the dispatcher
/// task; all fields use interior mutability so updates go through `&self`.
#[derive(Debug)]
struct WorkerData {
    id: usize,
    // Encoded `WorkerState` discriminant; see `get_state`/`set_state`.
    state: AtomicUsize,
    // Partition key currently being processed, if any.
    current_partition: RwLock<Option<String>>,
    events_processed: AtomicU64,
    // Last time this worker finished an event.
    last_active: RwLock<Instant>,
}
138
139impl WorkerData {
140 fn new(id: usize) -> Self {
141 Self {
142 id,
143 state: AtomicUsize::new(WorkerState::Idle as usize),
144 current_partition: RwLock::new(None),
145 events_processed: AtomicU64::new(0),
146 last_active: RwLock::new(Instant::now()),
147 }
148 }
149
150 fn get_state(&self) -> WorkerState {
151 match self.state.load(Ordering::Relaxed) {
152 0 => WorkerState::Idle,
153 1 => WorkerState::Processing,
154 2 => WorkerState::Draining,
155 _ => WorkerState::Stopped,
156 }
157 }
158
159 fn set_state(&self, state: WorkerState) {
160 self.state.store(state as usize, Ordering::Relaxed);
161 }
162}
163
/// An event paired with the key used to route it to a worker.
struct PartitionedEvent {
    event: Event,
    partition_key: String,
}
169
/// Async worker pool with sticky partition-to-worker affinity.
///
/// Events are funneled through a single mpsc channel to a dispatcher task
/// spawned in `new`; see `dispatcher_task` for the routing logic.
#[derive(Debug)]
pub struct WorkerPool {
    config: WorkerPoolConfig,
    // Sender side of the dispatch queue consumed by the dispatcher task.
    dispatch_tx: mpsc::Sender<PartitionedEvent>,
    workers: Vec<Arc<WorkerData>>,
    #[allow(dead_code)]
    // Maps partition key -> worker index; shared with the dispatcher task.
    pub(crate) partition_affinity: Arc<RwLock<HashMap<String, usize>>>,
    // Counted at submit/enqueue time, not after the processor has run.
    events_processed: AtomicU64,
    events_dropped: AtomicU64,
    // Sliding window (at most 1000 samples) of per-event latencies, in µs.
    latencies: Arc<Mutex<Vec<u64>>>,
    // Taken by `shutdown`/`Drop`; `None` once shutdown has been signaled.
    shutdown_tx: Option<mpsc::Sender<()>>,
}
189
190impl WorkerPool {
191 pub fn new<F>(config: WorkerPoolConfig, processor: F) -> Self
193 where
194 F: Fn(Event) + Send + Sync + Clone + 'static,
195 {
196 let (dispatch_tx, dispatch_rx) = mpsc::channel(config.queue_size * config.workers);
197 let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
198
199 let mut workers = Vec::with_capacity(config.workers);
200 for i in 0..config.workers {
201 workers.push(Arc::new(WorkerData::new(i)));
202 }
203
204 let partition_affinity = Arc::new(RwLock::new(HashMap::new()));
205 let latencies = Arc::new(Mutex::new(Vec::with_capacity(1000)));
206
207 let workers_clone = workers.clone();
209 let partition_affinity_clone = partition_affinity.clone();
210 let latencies_clone = latencies.clone();
211 let processor_clone = processor;
212 let pool_name = config.name.clone();
213 let num_workers = config.workers;
214
215 tokio::spawn(Self::dispatcher_task(
216 dispatch_rx,
217 shutdown_rx,
218 workers_clone,
219 partition_affinity_clone,
220 latencies_clone,
221 processor_clone,
222 pool_name,
223 num_workers,
224 ));
225
226 Self {
227 config,
228 dispatch_tx,
229 workers,
230 partition_affinity,
231 events_processed: AtomicU64::new(0),
232 events_dropped: AtomicU64::new(0),
233 latencies,
234 shutdown_tx: Some(shutdown_tx),
235 }
236 }
237
    /// Long-running task that receives partitioned events, resolves each
    /// partition to a worker (sticky affinity; least-loaded pick on first
    /// sight), runs `processor` inline, and records per-event latency.
    ///
    /// NOTE(review): `processor` executes synchronously on this single task,
    /// so `num_workers` only affects bookkeeping/affinity, not actual
    /// parallelism — confirm this is intended.
    #[allow(clippy::too_many_arguments)]
    async fn dispatcher_task<F>(
        mut rx: mpsc::Receiver<PartitionedEvent>,
        mut shutdown_rx: mpsc::Receiver<()>,
        workers: Vec<Arc<WorkerData>>,
        partition_affinity: Arc<RwLock<HashMap<String, usize>>>,
        latencies: Arc<Mutex<Vec<u64>>>,
        processor: F,
        pool_name: String,
        num_workers: usize,
    ) where
        F: Fn(Event) + Send + Sync + Clone + 'static,
    {
        info!(
            "Worker pool '{}' started with {} workers",
            pool_name, num_workers
        );

        loop {
            tokio::select! {
                // An event is ready for dispatch.
                Some(partitioned_event) = rx.recv() => {
                    // Resolve the worker index for this partition key.
                    let worker_id = {
                        let affinity = partition_affinity.read().await;
                        if let Some(&id) = affinity.get(&partitioned_event.partition_key) {
                            id
                        } else {
                            // First sight of this partition: pick the
                            // least-loaded worker that is not stopped.
                            let mut min_load = u64::MAX;
                            let mut best_worker = 0;
                            for (i, w) in workers.iter().enumerate() {
                                let load = w.events_processed.load(Ordering::Relaxed);
                                if load < min_load && w.get_state() != WorkerState::Stopped {
                                    min_load = load;
                                    best_worker = i;
                                }
                            }
                            // Release the read lock before taking the write
                            // lock (tokio's RwLock is not upgradable). Safe
                            // from races because this is the only task that
                            // mutates the affinity map.
                            drop(affinity);
                            partition_affinity.write().await.insert(
                                partitioned_event.partition_key.clone(),
                                best_worker,
                            );
                            best_worker
                        }
                    };

                    let worker = &workers[worker_id];
                    worker.set_state(WorkerState::Processing);
                    *worker.current_partition.write().await = Some(partitioned_event.partition_key);

                    // Latency measured here is processor run time only; queue
                    // wait time is not included.
                    let start = Instant::now();
                    processor(partitioned_event.event);
                    let elapsed_us = start.elapsed().as_micros() as u64;

                    worker.events_processed.fetch_add(1, Ordering::Relaxed);
                    *worker.last_active.write().await = Instant::now();
                    worker.set_state(WorkerState::Idle);
                    *worker.current_partition.write().await = None;

                    // Keep a bounded sliding window of the last 1000 samples.
                    let mut lat = latencies.lock().await;
                    if lat.len() >= 1000 {
                        lat.remove(0);
                    }
                    lat.push(elapsed_us);
                }
                // Shutdown signal — also fires when the shutdown channel is
                // closed (e.g. the pool was dropped).
                _ = shutdown_rx.recv() => {
                    info!("Worker pool '{}' shutting down", pool_name);
                    for w in &workers {
                        w.set_state(WorkerState::Stopped);
                    }
                    break;
                }
            }
        }
    }
318
319 pub async fn submit(
321 &self,
322 event: Event,
323 partition_key: &str,
324 ) -> Result<(), PoolBackpressureError> {
325 let partitioned = PartitionedEvent {
326 event,
327 partition_key: partition_key.to_string(),
328 };
329
330 match self.config.backpressure {
331 BackpressureStrategy::Block => {
332 if self.dispatch_tx.send(partitioned).await.is_err() {
333 warn!("Pool '{}' dispatch channel closed", self.config.name);
334 }
335 self.events_processed.fetch_add(1, Ordering::Relaxed);
336 Ok(())
337 }
338 BackpressureStrategy::DropNewest => {
339 match self.dispatch_tx.try_send(partitioned) {
340 Ok(()) => {
341 self.events_processed.fetch_add(1, Ordering::Relaxed);
342 Ok(())
343 }
344 Err(_) => {
345 self.events_dropped.fetch_add(1, Ordering::Relaxed);
346 debug!("Pool '{}' dropped event (queue full)", self.config.name);
347 Ok(()) }
349 }
350 }
351 BackpressureStrategy::DropOldest => {
352 match self.dispatch_tx.try_send(partitioned) {
355 Ok(()) => {
356 self.events_processed.fetch_add(1, Ordering::Relaxed);
357 }
358 Err(mpsc::error::TrySendError::Full(p)) => {
359 self.events_dropped.fetch_add(1, Ordering::Relaxed);
361 let _ = self.dispatch_tx.send(p).await;
362 self.events_processed.fetch_add(1, Ordering::Relaxed);
363 }
364 Err(_) => {}
365 }
366 Ok(())
367 }
368 BackpressureStrategy::Error => match self.dispatch_tx.try_send(partitioned) {
369 Ok(()) => {
370 self.events_processed.fetch_add(1, Ordering::Relaxed);
371 Ok(())
372 }
373 Err(_) => {
374 self.events_dropped.fetch_add(1, Ordering::Relaxed);
375 Err(PoolBackpressureError {
376 pool_name: self.config.name.clone(),
377 queue_depth: self.queue_depth(),
378 })
379 }
380 },
381 }
382 }
383
384 pub fn queue_depth(&self) -> usize {
386 self.config.queue_size * self.config.workers - self.dispatch_tx.capacity()
388 }
389
390 pub async fn metrics(&self) -> WorkerPoolMetrics {
392 let mut active = 0;
393 let mut idle = 0;
394
395 for w in &self.workers {
396 match w.get_state() {
397 WorkerState::Processing => active += 1,
398 WorkerState::Idle => idle += 1,
399 _ => {}
400 }
401 }
402
403 let latencies = self.latencies.lock().await;
404 let avg_latency = if latencies.is_empty() {
405 0.0
406 } else {
407 latencies.iter().sum::<u64>() as f64 / latencies.len() as f64
408 };
409
410 let p99_latency = if latencies.is_empty() {
411 0.0
412 } else {
413 let mut sorted = latencies.clone();
414 sorted.sort_unstable();
415 let idx = (sorted.len() as f64 * 0.99) as usize;
416 sorted.get(idx.min(sorted.len() - 1)).copied().unwrap_or(0) as f64
417 };
418
419 WorkerPoolMetrics {
420 active_workers: active,
421 idle_workers: idle,
422 queue_depth: self.queue_depth(),
423 events_processed: self.events_processed.load(Ordering::Relaxed),
424 events_dropped: self.events_dropped.load(Ordering::Relaxed),
425 avg_latency_us: avg_latency,
426 p99_latency_us: p99_latency,
427 }
428 }
429
430 pub async fn worker_statuses(&self) -> Vec<WorkerStatus> {
432 let mut statuses = Vec::with_capacity(self.workers.len());
433 for w in &self.workers {
434 statuses.push(WorkerStatus {
435 id: w.id,
436 state: w.get_state(),
437 current_partition: w.current_partition.read().await.clone(),
438 events_processed: w.events_processed.load(Ordering::Relaxed),
439 last_active: *w.last_active.read().await,
440 });
441 }
442 statuses
443 }
444
    /// Returns the configuration this pool was built with.
    pub const fn config(&self) -> &WorkerPoolConfig {
        &self.config
    }
449
450 pub async fn shutdown(&mut self) -> Duration {
452 let start = Instant::now();
453 if let Some(tx) = self.shutdown_tx.take() {
454 let _ = tx.send(()).await;
455 }
456 tokio::time::sleep(Duration::from_millis(100)).await;
458 start.elapsed()
459 }
460}
461
/// Best-effort shutdown: if `shutdown()` was never called, nudge the
/// dispatcher to stop. `try_send` is used because `Drop` cannot `.await`;
/// the signal is silently lost if the channel is full or closed.
impl Drop for WorkerPool {
    fn drop(&mut self) {
        if let Some(tx) = self.shutdown_tx.take() {
            let _ = tx.try_send(());
        }
    }
}
470
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicUsize;

    use super::*;

    /// Pool construction exposes the supplied configuration.
    #[tokio::test]
    async fn test_worker_pool_creation() {
        let counter = Arc::new(AtomicUsize::new(0));
        // NOTE(review): this moves `counter` (no `.clone()`); the count is
        // never asserted in this test, only the config is checked.
        let counter_clone = counter;

        let config = WorkerPoolConfig {
            name: "test".to_string(),
            workers: 2,
            queue_size: 10,
            backpressure: BackpressureStrategy::Block,
        };

        let pool = WorkerPool::new(config, move |_event| {
            counter_clone.fetch_add(1, Ordering::Relaxed);
        });

        assert_eq!(pool.config().workers, 2);
        assert_eq!(pool.config().name, "test");
    }

    /// All submitted events eventually reach the processor.
    #[tokio::test]
    async fn test_worker_pool_submit() {
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = counter.clone();

        let config = WorkerPoolConfig {
            name: "test".to_string(),
            workers: 2,
            queue_size: 100,
            backpressure: BackpressureStrategy::Block,
        };

        let pool = WorkerPool::new(config, move |_event| {
            counter_clone.fetch_add(1, Ordering::Relaxed);
        });

        // Spread events across three partitions.
        for i in 0..10 {
            let event = Event::new("TestEvent").with_field("id", i as i64);
            pool.submit(event, &format!("partition_{}", i % 3))
                .await
                .unwrap();
        }

        // Give the dispatcher task time to drain the queue.
        tokio::time::sleep(Duration::from_millis(100)).await;

        assert_eq!(counter.load(Ordering::Relaxed), 10);
    }

    /// A fresh pool reports all workers accounted for and nothing processed.
    #[tokio::test]
    async fn test_worker_pool_metrics() {
        let config = WorkerPoolConfig {
            name: "metrics_test".to_string(),
            workers: 4,
            queue_size: 100,
            backpressure: BackpressureStrategy::Block,
        };

        let pool = WorkerPool::new(config, |_| {});

        let metrics = pool.metrics().await;
        assert_eq!(metrics.idle_workers + metrics.active_workers, 4);
        assert_eq!(metrics.events_processed, 0);
    }

    /// With `DropNewest` and a tiny queue, flooding causes drops.
    #[tokio::test]
    async fn test_worker_pool_backpressure_drop_newest() {
        let config = WorkerPoolConfig {
            name: "backpressure_test".to_string(),
            workers: 1,
            queue_size: 2,
            backpressure: BackpressureStrategy::DropNewest,
        };

        // NOTE(review): `std::thread::sleep` blocks the runtime thread the
        // dispatcher runs on; that stall is what makes the queue fill here.
        let pool = WorkerPool::new(config, |_| {
            std::thread::sleep(Duration::from_millis(50));
        });

        for i in 0..100 {
            let event = Event::new("TestEvent").with_field("id", i as i64);
            let _ = pool.submit(event, "partition").await;
        }

        let metrics = pool.metrics().await;
        assert!(metrics.events_dropped > 0 || metrics.events_processed > 0);
    }

    /// With `Error` and a one-slot queue, submissions fail once it is full.
    #[tokio::test]
    async fn test_worker_pool_backpressure_error() {
        let config = WorkerPoolConfig {
            name: "error_test".to_string(),
            workers: 1,
            queue_size: 1,
            backpressure: BackpressureStrategy::Error,
        };

        // Slow processor (blocking sleep) keeps the queue occupied.
        let pool = WorkerPool::new(config, |_| {
            std::thread::sleep(Duration::from_millis(100));
        });

        // First event fits in the single queue slot.
        let event1 = Event::new("TestEvent").with_field("id", 1i64);
        assert!(pool.submit(event1, "partition").await.is_ok());

        let mut errors = 0;
        for i in 2..10 {
            let event = Event::new("TestEvent").with_field("id", i as i64);
            if pool.submit(event, "partition").await.is_err() {
                errors += 1;
            }
        }

        assert!(errors > 0);
    }

    /// Repeated submits for one partition key register an affinity entry.
    #[tokio::test]
    async fn test_worker_pool_partition_affinity() {
        let config = WorkerPoolConfig {
            name: "affinity_test".to_string(),
            workers: 4,
            queue_size: 100,
            backpressure: BackpressureStrategy::Block,
        };

        let pool = WorkerPool::new(config, |_| {});

        for _ in 0..5 {
            let event = Event::new("TestEvent");
            pool.submit(event, "same_partition").await.unwrap();
        }

        tokio::time::sleep(Duration::from_millis(100)).await;

        let affinity = pool.partition_affinity.read().await;
        assert!(affinity.contains_key("same_partition"));
    }

    /// Shutdown of an idle pool completes well under a second.
    #[tokio::test]
    async fn test_worker_pool_shutdown() {
        let config = WorkerPoolConfig::default();
        let mut pool = WorkerPool::new(config, |_| {});

        let duration = pool.shutdown().await;
        assert!(duration < Duration::from_secs(1));
    }
}