#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct WorkerConfig {
pub worker_count: usize,
pub max_pending: usize,
pub batch_size: usize,
}
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct WorkerTask {
pub task_id: u64,
pub priority: u8,
pub cancelled: bool,
}
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct WorkerPoolStats {
pub pending: usize,
pub completed: usize,
pub cancelled: usize,
pub workers: usize,
}
#[allow(dead_code)]
pub struct WorkerPool {
pub config: WorkerConfig,
pending: Vec<WorkerTask>,
completed: usize,
cancelled: usize,
}
#[allow(dead_code)]
pub fn default_worker_config() -> WorkerConfig {
WorkerConfig {
worker_count: 4,
max_pending: 0,
batch_size: 0,
}
}
#[allow(dead_code)]
pub fn new_worker_pool(cfg: &WorkerConfig) -> WorkerPool {
WorkerPool {
config: cfg.clone(),
pending: Vec::new(),
completed: 0,
cancelled: 0,
}
}
#[allow(dead_code)]
pub fn submit_task(pool: &mut WorkerPool, task_id: u64, priority: u8) {
if pool.config.max_pending > 0 && pool.pending.len() >= pool.config.max_pending {
return;
}
let task = WorkerTask { task_id, priority, cancelled: false };
let pos = pool.pending.partition_point(|t| t.priority > priority);
pool.pending.insert(pos, task);
}
#[allow(dead_code)]
pub fn process_pending_tasks(pool: &mut WorkerPool) -> usize {
let limit = if pool.config.batch_size == 0 {
pool.pending.len()
} else {
pool.config.batch_size.min(pool.pending.len())
};
let mut processed = 0;
for task in pool.pending.drain(..limit) {
if task.cancelled {
pool.cancelled += 1;
} else {
pool.completed += 1;
}
processed += 1;
}
processed
}
#[allow(dead_code)]
pub fn worker_pool_stats(pool: &WorkerPool) -> WorkerPoolStats {
WorkerPoolStats {
pending: pool.pending.len(),
completed: pool.completed,
cancelled: pool.cancelled,
workers: pool.config.worker_count,
}
}
#[allow(dead_code)]
pub fn pool_pending_count(pool: &WorkerPool) -> usize {
pool.pending.len()
}
#[allow(dead_code)]
pub fn pool_completed_count(pool: &WorkerPool) -> usize {
pool.completed
}
#[allow(dead_code)]
pub fn cancel_task(pool: &mut WorkerPool, task_id: u64) -> bool {
for task in &mut pool.pending {
if task.task_id == task_id && !task.cancelled {
task.cancelled = true;
return true;
}
}
false
}
#[allow(dead_code)]
pub fn reset_pool(pool: &mut WorkerPool) {
pool.pending.clear();
pool.completed = 0;
pool.cancelled = 0;
}
#[allow(dead_code)]
pub fn pool_worker_count(pool: &WorkerPool) -> usize {
pool.config.worker_count
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_new_pool_empty() {
let cfg = default_worker_config();
let pool = new_worker_pool(&cfg);
assert_eq!(pool_pending_count(&pool), 0);
assert_eq!(pool_completed_count(&pool), 0);
}
#[test]
fn test_submit_and_process() {
let cfg = default_worker_config();
let mut pool = new_worker_pool(&cfg);
submit_task(&mut pool, 1, 5);
submit_task(&mut pool, 2, 10);
assert_eq!(pool_pending_count(&pool), 2);
let done = process_pending_tasks(&mut pool);
assert_eq!(done, 2);
assert_eq!(pool_completed_count(&pool), 2);
assert_eq!(pool_pending_count(&pool), 0);
}
#[test]
fn test_priority_ordering() {
let cfg = default_worker_config();
let mut pool = new_worker_pool(&cfg);
submit_task(&mut pool, 1, 1);
submit_task(&mut pool, 2, 255);
submit_task(&mut pool, 3, 128);
assert_eq!(pool.pending[0].priority, 255);
}
#[test]
fn test_cancel_task() {
let cfg = default_worker_config();
let mut pool = new_worker_pool(&cfg);
submit_task(&mut pool, 42, 5);
let cancelled = cancel_task(&mut pool, 42);
assert!(cancelled);
process_pending_tasks(&mut pool);
let stats = worker_pool_stats(&pool);
assert_eq!(stats.cancelled, 1);
assert_eq!(stats.completed, 0);
}
#[test]
fn test_cancel_nonexistent_task() {
let cfg = default_worker_config();
let mut pool = new_worker_pool(&cfg);
assert!(!cancel_task(&mut pool, 999));
}
#[test]
fn test_reset_pool() {
let cfg = default_worker_config();
let mut pool = new_worker_pool(&cfg);
submit_task(&mut pool, 1, 1);
process_pending_tasks(&mut pool);
reset_pool(&mut pool);
assert_eq!(pool_pending_count(&pool), 0);
assert_eq!(pool_completed_count(&pool), 0);
}
#[test]
fn test_batch_size_limits_processing() {
let cfg = WorkerConfig { worker_count: 2, max_pending: 0, batch_size: 2 };
let mut pool = new_worker_pool(&cfg);
for i in 0..5 {
submit_task(&mut pool, i, 1);
}
let done = process_pending_tasks(&mut pool);
assert_eq!(done, 2);
assert_eq!(pool_pending_count(&pool), 3);
}
#[test]
fn test_pool_worker_count() {
let cfg = WorkerConfig { worker_count: 8, max_pending: 0, batch_size: 0 };
let pool = new_worker_pool(&cfg);
assert_eq!(pool_worker_count(&pool), 8);
}
}