1use std::collections::HashMap;
21use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24
25use tokio::sync::{mpsc, Mutex, RwLock};
26use tracing::{debug, info, warn};
27
28use crate::event::Event;
29
/// Configuration for a [`WorkerPool`].
#[derive(Debug, Clone)]
pub struct WorkerPoolConfig {
    /// Pool name used in log messages and backpressure errors.
    pub name: String,
    /// Number of logical workers tracked by the pool.
    pub workers: usize,
    /// Per-worker queue budget; the dispatch channel holds
    /// `queue_size * workers` events in total.
    pub queue_size: usize,
    /// Behavior applied by `submit` when the dispatch queue is full.
    pub backpressure: BackpressureStrategy,
}
42
43impl Default for WorkerPoolConfig {
44 fn default() -> Self {
45 Self {
46 name: "default".to_string(),
47 workers: 4,
48 queue_size: 1000,
49 backpressure: BackpressureStrategy::Block,
50 }
51 }
52}
53
/// Strategy applied by [`WorkerPool::submit`] when the dispatch queue is full.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureStrategy {
    /// Wait until queue space becomes available.
    Block,
    /// Favor the incoming event over the oldest queued one. See
    /// `WorkerPool::submit` for how this is approximated by the sender.
    DropOldest,
    /// Discard the incoming event when the queue is full.
    DropNewest,
    /// Return a [`PoolBackpressureError`] to the caller.
    Error,
}
66
/// Error returned by [`WorkerPool::submit`] under
/// [`BackpressureStrategy::Error`] when the dispatch queue is full.
#[derive(Debug, Clone, thiserror::Error)]
#[error("Pool '{}' queue full (depth: {})", pool_name, queue_depth)]
pub struct PoolBackpressureError {
    /// Name of the pool that rejected the event.
    pub pool_name: String,
    /// Queue depth observed at rejection time.
    pub queue_depth: usize,
}
76
/// Point-in-time snapshot of pool health, produced by [`WorkerPool::metrics`].
#[derive(Debug, Clone, Default)]
pub struct WorkerPoolMetrics {
    /// Workers currently in the `Processing` state.
    pub active_workers: usize,
    /// Workers currently in the `Idle` state.
    pub idle_workers: usize,
    /// Events buffered in the dispatch queue.
    pub queue_depth: usize,
    /// Total events accepted by `submit` (counted at enqueue time).
    pub events_processed: u64,
    /// Total events dropped or rejected due to backpressure.
    pub events_dropped: u64,
    /// Mean processing latency over the recent sample window, in microseconds.
    pub avg_latency_us: f64,
    /// 99th-percentile processing latency over the recent window, in microseconds.
    pub p99_latency_us: f64,
}
95
/// Point-in-time view of a single worker, produced by
/// [`WorkerPool::worker_statuses`].
#[derive(Debug, Clone)]
pub struct WorkerStatus {
    /// Worker index within the pool.
    pub id: usize,
    /// Lifecycle state at snapshot time.
    pub state: WorkerState,
    /// Partition key the worker was processing, if any.
    pub current_partition: Option<String>,
    /// Events this worker has completed.
    pub events_processed: u64,
    /// When the worker last finished an event (worker creation time if never).
    pub last_active: Instant,
}
110
/// Lifecycle states of a pool worker.
///
/// The discriminant values are stored atomically in `WorkerData::state`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerState {
    /// Waiting for work.
    Idle,
    /// Currently running the processor for an event.
    Processing,
    /// Finishing in-flight work before stopping.
    /// NOTE(review): no code in this file ever sets this state — confirm
    /// whether a drain phase is still planned or the variant is vestigial.
    Draining,
    /// Shut down; receives no further events.
    Stopped,
}
123
124#[derive(Debug)]
126struct WorkerData {
127 id: usize,
128 state: AtomicUsize, current_partition: RwLock<Option<String>>,
130 events_processed: AtomicU64,
131 last_active: RwLock<Instant>,
132}
133
134impl WorkerData {
135 fn new(id: usize) -> Self {
136 Self {
137 id,
138 state: AtomicUsize::new(WorkerState::Idle as usize),
139 current_partition: RwLock::new(None),
140 events_processed: AtomicU64::new(0),
141 last_active: RwLock::new(Instant::now()),
142 }
143 }
144
145 fn get_state(&self) -> WorkerState {
146 match self.state.load(Ordering::Relaxed) {
147 0 => WorkerState::Idle,
148 1 => WorkerState::Processing,
149 2 => WorkerState::Draining,
150 _ => WorkerState::Stopped,
151 }
152 }
153
154 fn set_state(&self, state: WorkerState) {
155 self.state.store(state as usize, Ordering::Relaxed);
156 }
157}
158
/// An event paired with the partition key used for worker-affinity routing.
struct PartitionedEvent {
    event: Event,
    /// Key used to pin all events of the same partition to one worker.
    partition_key: String,
}
164
/// A partition-affine worker pool with configurable backpressure.
///
/// Events are submitted with a partition key; all events sharing a key are
/// routed to the same logical worker by the dispatcher task spawned in
/// [`WorkerPool::new`].
#[derive(Debug)]
pub struct WorkerPool {
    config: WorkerPoolConfig,
    /// Sender side of the bounded dispatch channel consumed by the dispatcher.
    dispatch_tx: mpsc::Sender<PartitionedEvent>,
    /// Per-worker bookkeeping shared with the dispatcher task.
    workers: Vec<Arc<WorkerData>>,
    // dead_code allowed: not read by pool methods themselves; exposed
    // crate-wide for tests and diagnostics.
    #[allow(dead_code)]
    pub(crate) partition_affinity: Arc<RwLock<HashMap<String, usize>>>,
    /// Events accepted by `submit` (counted at enqueue time, not completion).
    events_processed: AtomicU64,
    /// Events discarded or rejected due to backpressure.
    events_dropped: AtomicU64,
    /// Recent per-event processing latencies in microseconds.
    latencies: Arc<Mutex<Vec<u64>>>,
    /// Taken on shutdown/drop to signal the dispatcher to stop.
    shutdown_tx: Option<mpsc::Sender<()>>,
}
184
185impl WorkerPool {
186 pub fn new<F>(config: WorkerPoolConfig, processor: F) -> Self
188 where
189 F: Fn(Event) + Send + Sync + Clone + 'static,
190 {
191 let (dispatch_tx, dispatch_rx) = mpsc::channel(config.queue_size * config.workers);
192 let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
193
194 let mut workers = Vec::with_capacity(config.workers);
195 for i in 0..config.workers {
196 workers.push(Arc::new(WorkerData::new(i)));
197 }
198
199 let partition_affinity = Arc::new(RwLock::new(HashMap::new()));
200 let latencies = Arc::new(Mutex::new(Vec::with_capacity(1000)));
201
202 let workers_clone = workers.clone();
204 let partition_affinity_clone = partition_affinity.clone();
205 let latencies_clone = latencies.clone();
206 let processor_clone = processor;
207 let pool_name = config.name.clone();
208 let num_workers = config.workers;
209
210 tokio::spawn(Self::dispatcher_task(
211 dispatch_rx,
212 shutdown_rx,
213 workers_clone,
214 partition_affinity_clone,
215 latencies_clone,
216 processor_clone,
217 pool_name,
218 num_workers,
219 ));
220
221 Self {
222 config,
223 dispatch_tx,
224 workers,
225 partition_affinity,
226 events_processed: AtomicU64::new(0),
227 events_dropped: AtomicU64::new(0),
228 latencies,
229 shutdown_tx: Some(shutdown_tx),
230 }
231 }
232
233 #[allow(clippy::too_many_arguments)]
235 async fn dispatcher_task<F>(
236 mut rx: mpsc::Receiver<PartitionedEvent>,
237 mut shutdown_rx: mpsc::Receiver<()>,
238 workers: Vec<Arc<WorkerData>>,
239 partition_affinity: Arc<RwLock<HashMap<String, usize>>>,
240 latencies: Arc<Mutex<Vec<u64>>>,
241 processor: F,
242 pool_name: String,
243 num_workers: usize,
244 ) where
245 F: Fn(Event) + Send + Sync + Clone + 'static,
246 {
247 info!(
248 "Worker pool '{}' started with {} workers",
249 pool_name, num_workers
250 );
251
252 loop {
253 tokio::select! {
254 Some(partitioned_event) = rx.recv() => {
255 let worker_id = {
257 let affinity = partition_affinity.read().await;
258 if let Some(&id) = affinity.get(&partitioned_event.partition_key) {
259 id
260 } else {
261 let mut min_load = u64::MAX;
263 let mut best_worker = 0;
264 for (i, w) in workers.iter().enumerate() {
265 let load = w.events_processed.load(Ordering::Relaxed);
266 if load < min_load && w.get_state() != WorkerState::Stopped {
267 min_load = load;
268 best_worker = i;
269 }
270 }
271 drop(affinity);
273 partition_affinity.write().await.insert(
274 partitioned_event.partition_key.clone(),
275 best_worker,
276 );
277 best_worker
278 }
279 };
280
281 let worker = &workers[worker_id];
283 worker.set_state(WorkerState::Processing);
284 *worker.current_partition.write().await = Some(partitioned_event.partition_key);
285
286 let start = Instant::now();
287 processor(partitioned_event.event);
288 let elapsed_us = start.elapsed().as_micros() as u64;
289
290 worker.events_processed.fetch_add(1, Ordering::Relaxed);
292 *worker.last_active.write().await = Instant::now();
293 worker.set_state(WorkerState::Idle);
294 *worker.current_partition.write().await = None;
295
296 let mut lat = latencies.lock().await;
298 if lat.len() >= 1000 {
299 lat.remove(0);
300 }
301 lat.push(elapsed_us);
302 }
303 _ = shutdown_rx.recv() => {
304 info!("Worker pool '{}' shutting down", pool_name);
305 for w in &workers {
306 w.set_state(WorkerState::Stopped);
307 }
308 break;
309 }
310 }
311 }
312 }
313
314 pub async fn submit(
316 &self,
317 event: Event,
318 partition_key: &str,
319 ) -> Result<(), PoolBackpressureError> {
320 let partitioned = PartitionedEvent {
321 event,
322 partition_key: partition_key.to_string(),
323 };
324
325 match self.config.backpressure {
326 BackpressureStrategy::Block => {
327 if self.dispatch_tx.send(partitioned).await.is_err() {
328 warn!("Pool '{}' dispatch channel closed", self.config.name);
329 }
330 self.events_processed.fetch_add(1, Ordering::Relaxed);
331 Ok(())
332 }
333 BackpressureStrategy::DropNewest => {
334 match self.dispatch_tx.try_send(partitioned) {
335 Ok(()) => {
336 self.events_processed.fetch_add(1, Ordering::Relaxed);
337 Ok(())
338 }
339 Err(_) => {
340 self.events_dropped.fetch_add(1, Ordering::Relaxed);
341 debug!("Pool '{}' dropped event (queue full)", self.config.name);
342 Ok(()) }
344 }
345 }
346 BackpressureStrategy::DropOldest => {
347 match self.dispatch_tx.try_send(partitioned) {
350 Ok(()) => {
351 self.events_processed.fetch_add(1, Ordering::Relaxed);
352 }
353 Err(mpsc::error::TrySendError::Full(p)) => {
354 self.events_dropped.fetch_add(1, Ordering::Relaxed);
356 let _ = self.dispatch_tx.send(p).await;
357 self.events_processed.fetch_add(1, Ordering::Relaxed);
358 }
359 Err(_) => {}
360 }
361 Ok(())
362 }
363 BackpressureStrategy::Error => match self.dispatch_tx.try_send(partitioned) {
364 Ok(()) => {
365 self.events_processed.fetch_add(1, Ordering::Relaxed);
366 Ok(())
367 }
368 Err(_) => {
369 self.events_dropped.fetch_add(1, Ordering::Relaxed);
370 Err(PoolBackpressureError {
371 pool_name: self.config.name.clone(),
372 queue_depth: self.queue_depth(),
373 })
374 }
375 },
376 }
377 }
378
379 pub fn queue_depth(&self) -> usize {
381 self.config.queue_size * self.config.workers - self.dispatch_tx.capacity()
383 }
384
385 pub async fn metrics(&self) -> WorkerPoolMetrics {
387 let mut active = 0;
388 let mut idle = 0;
389
390 for w in &self.workers {
391 match w.get_state() {
392 WorkerState::Processing => active += 1,
393 WorkerState::Idle => idle += 1,
394 _ => {}
395 }
396 }
397
398 let latencies = self.latencies.lock().await;
399 let avg_latency = if latencies.is_empty() {
400 0.0
401 } else {
402 latencies.iter().sum::<u64>() as f64 / latencies.len() as f64
403 };
404
405 let p99_latency = if latencies.is_empty() {
406 0.0
407 } else {
408 let mut sorted = latencies.clone();
409 sorted.sort_unstable();
410 let idx = (sorted.len() as f64 * 0.99) as usize;
411 sorted.get(idx.min(sorted.len() - 1)).copied().unwrap_or(0) as f64
412 };
413
414 WorkerPoolMetrics {
415 active_workers: active,
416 idle_workers: idle,
417 queue_depth: self.queue_depth(),
418 events_processed: self.events_processed.load(Ordering::Relaxed),
419 events_dropped: self.events_dropped.load(Ordering::Relaxed),
420 avg_latency_us: avg_latency,
421 p99_latency_us: p99_latency,
422 }
423 }
424
425 pub async fn worker_statuses(&self) -> Vec<WorkerStatus> {
427 let mut statuses = Vec::with_capacity(self.workers.len());
428 for w in &self.workers {
429 statuses.push(WorkerStatus {
430 id: w.id,
431 state: w.get_state(),
432 current_partition: w.current_partition.read().await.clone(),
433 events_processed: w.events_processed.load(Ordering::Relaxed),
434 last_active: *w.last_active.read().await,
435 });
436 }
437 statuses
438 }
439
440 pub const fn config(&self) -> &WorkerPoolConfig {
442 &self.config
443 }
444
445 pub async fn shutdown(&mut self) -> Duration {
447 let start = Instant::now();
448 if let Some(tx) = self.shutdown_tx.take() {
449 let _ = tx.send(()).await;
450 }
451 tokio::time::sleep(Duration::from_millis(100)).await;
453 start.elapsed()
454 }
455}
456
457impl Drop for WorkerPool {
458 fn drop(&mut self) {
459 if let Some(tx) = self.shutdown_tx.take() {
461 let _ = tx.try_send(());
462 }
463 }
464}
465
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicUsize;

    use super::*;

    // Construction exposes the supplied config unchanged via `config()`.
    #[tokio::test]
    async fn test_worker_pool_creation() {
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = counter;

        let config = WorkerPoolConfig {
            name: "test".to_string(),
            workers: 2,
            queue_size: 10,
            backpressure: BackpressureStrategy::Block,
        };

        let pool = WorkerPool::new(config, move |_event| {
            counter_clone.fetch_add(1, Ordering::Relaxed);
        });

        assert_eq!(pool.config().workers, 2);
        assert_eq!(pool.config().name, "test");
    }

    // Every submitted event reaches the processor under Block backpressure.
    // NOTE(review): relies on a 100 ms sleep for the dispatcher to drain;
    // could be flaky on a heavily loaded machine.
    #[tokio::test]
    async fn test_worker_pool_submit() {
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = counter.clone();

        let config = WorkerPoolConfig {
            name: "test".to_string(),
            workers: 2,
            queue_size: 100,
            backpressure: BackpressureStrategy::Block,
        };

        let pool = WorkerPool::new(config, move |_event| {
            counter_clone.fetch_add(1, Ordering::Relaxed);
        });

        // Spread events over 3 partitions to exercise affinity routing.
        for i in 0..10 {
            let event = Event::new("TestEvent").with_field("id", i as i64);
            pool.submit(event, &format!("partition_{}", i % 3))
                .await
                .unwrap();
        }

        tokio::time::sleep(Duration::from_millis(100)).await;

        assert_eq!(counter.load(Ordering::Relaxed), 10);
    }

    // A fresh pool reports all workers accounted for and nothing processed.
    #[tokio::test]
    async fn test_worker_pool_metrics() {
        let config = WorkerPoolConfig {
            name: "metrics_test".to_string(),
            workers: 4,
            queue_size: 100,
            backpressure: BackpressureStrategy::Block,
        };

        let pool = WorkerPool::new(config, |_| {});

        let metrics = pool.metrics().await;
        assert_eq!(metrics.idle_workers + metrics.active_workers, 4);
        assert_eq!(metrics.events_processed, 0);
    }

    // With a slow processor and a tiny queue, DropNewest must either drop
    // or accept each event — both counters can plausibly be non-zero.
    #[tokio::test]
    async fn test_worker_pool_backpressure_drop_newest() {
        let config = WorkerPoolConfig {
            name: "backpressure_test".to_string(),
            workers: 1,
            queue_size: 2,
            backpressure: BackpressureStrategy::DropNewest,
        };

        // Blocking sleep intentionally stalls the dispatcher so the queue fills.
        let pool = WorkerPool::new(config, |_| {
            std::thread::sleep(Duration::from_millis(50));
        });

        for i in 0..100 {
            let event = Event::new("TestEvent").with_field("id", i as i64);
            let _ = pool.submit(event, "partition").await;
        }

        let metrics = pool.metrics().await;
        assert!(metrics.events_dropped > 0 || metrics.events_processed > 0);
    }

    // Under Error backpressure, submits past a full queue must fail.
    #[tokio::test]
    async fn test_worker_pool_backpressure_error() {
        let config = WorkerPoolConfig {
            name: "error_test".to_string(),
            workers: 1,
            queue_size: 1,
            backpressure: BackpressureStrategy::Error,
        };

        // Blocking sleep keeps the single queue slot occupied.
        let pool = WorkerPool::new(config, |_| {
            std::thread::sleep(Duration::from_millis(100));
        });

        // First submit lands in the empty queue.
        let event1 = Event::new("TestEvent").with_field("id", 1i64);
        assert!(pool.submit(event1, "partition").await.is_ok());

        let mut errors = 0;
        for i in 2..10 {
            let event = Event::new("TestEvent").with_field("id", i as i64);
            if pool.submit(event, "partition").await.is_err() {
                errors += 1;
            }
        }

        assert!(errors > 0);
    }

    // Repeated submits with one key must create an affinity-map entry.
    #[tokio::test]
    async fn test_worker_pool_partition_affinity() {
        let config = WorkerPoolConfig {
            name: "affinity_test".to_string(),
            workers: 4,
            queue_size: 100,
            backpressure: BackpressureStrategy::Block,
        };

        let pool = WorkerPool::new(config, |_| {});

        for _ in 0..5 {
            let event = Event::new("TestEvent");
            pool.submit(event, "same_partition").await.unwrap();
        }

        // Give the dispatcher time to record the assignment.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let affinity = pool.partition_affinity.read().await;
        assert!(affinity.contains_key("same_partition"));
    }

    // Shutdown of an idle pool completes well inside its grace period.
    #[tokio::test]
    async fn test_worker_pool_shutdown() {
        let config = WorkerPoolConfig::default();
        let mut pool = WorkerPool::new(config, |_| {});

        let duration = pool.shutdown().await;
        assert!(duration < Duration::from_secs(1));
    }
}