// subx_cli/core/parallel/worker.rs

1//! Worker pool and worker definitions for parallel processing
2use super::task::{Task, TaskResult};
3use std::collections::HashMap;
4use std::sync::{Arc, Mutex};
5use tokio::task::JoinHandle;
6use uuid::Uuid;
7
/// Pool managing active workers.
///
/// The worker map lives behind `Arc<Mutex<..>>`, so cloned pools (see the
/// manual `Clone` impl below) are handles to the same shared state.
pub struct WorkerPool {
    /// Active workers keyed by a per-worker random UUID.
    workers: Arc<Mutex<HashMap<Uuid, WorkerInfo>>>,
    /// Maximum number of workers the pool will track at once.
    max_workers: usize,
}
13
/// Bookkeeping record for one spawned worker task (internal).
#[derive(Debug)]
struct WorkerInfo {
    /// Join handle of the spawned tokio task; awaited during `shutdown`.
    handle: JoinHandle<TaskResult>,
    /// Identifier reported by the task itself (`Task::task_id`).
    task_id: String,
    /// Spawn time; used to compute the worker's runtime.
    start_time: std::time::Instant,
    /// Classification derived from the task's type string.
    worker_type: WorkerType,
}
21
/// Classification of a worker by its dominant resource usage.
///
/// `Copy`, `PartialEq`, and `Eq` are derived in addition to the original
/// `Debug`/`Clone`: the enum is fieldless, so copies are free, and equality
/// makes the type usable in comparisons and tests. This is backward
/// compatible — existing `.clone()` call sites keep working.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerType {
    /// Compute-bound work (mapped from the "convert" task type).
    CpuIntensive,
    /// I/O-bound work (mapped from "match" and "validate").
    IoIntensive,
    /// Work with both compute and I/O components (default classification).
    Mixed,
}
28
29impl WorkerPool {
30    pub fn new(max_workers: usize) -> Self {
31        Self {
32            workers: Arc::new(Mutex::new(HashMap::new())),
33            max_workers,
34        }
35    }
36
37    /// Execute a task by spawning a worker
38    pub async fn execute(&self, task: Box<dyn Task + Send + Sync>) -> Result<TaskResult, String> {
39        let worker_id = Uuid::new_v4();
40        let task_id = task.task_id();
41        let worker_type = self.determine_worker_type(task.task_type());
42
43        {
44            let workers = self.workers.lock().unwrap();
45            if workers.len() >= self.max_workers {
46                return Err("工作者池已滿".to_string());
47            }
48        }
49
50        let handle = tokio::spawn(async move { task.execute().await });
51
52        {
53            let mut workers = self.workers.lock().unwrap();
54            workers.insert(
55                worker_id,
56                WorkerInfo {
57                    handle,
58                    task_id: task_id.clone(),
59                    start_time: std::time::Instant::now(),
60                    worker_type,
61                },
62            );
63        }
64
65        // For simplicity, return immediately indicating submission
66        Ok(TaskResult::Success("任務已提交".to_string()))
67    }
68
69    fn determine_worker_type(&self, task_type: &str) -> WorkerType {
70        match task_type {
71            "convert" => WorkerType::CpuIntensive,
72            "sync" => WorkerType::Mixed,
73            "match" => WorkerType::IoIntensive,
74            "validate" => WorkerType::IoIntensive,
75            _ => WorkerType::Mixed,
76        }
77    }
78
79    /// Number of active workers
80    pub fn get_active_count(&self) -> usize {
81        self.workers.lock().unwrap().len()
82    }
83
84    /// Maximum capacity of worker pool
85    pub fn get_capacity(&self) -> usize {
86        self.max_workers
87    }
88
89    /// Statistics about current workers
90    pub fn get_worker_stats(&self) -> WorkerStats {
91        let workers = self.workers.lock().unwrap();
92        let mut cpu = 0;
93        let mut io = 0;
94        let mut mixed = 0;
95        for w in workers.values() {
96            match w.worker_type {
97                WorkerType::CpuIntensive => cpu += 1,
98                WorkerType::IoIntensive => io += 1,
99                WorkerType::Mixed => mixed += 1,
100            }
101        }
102        WorkerStats {
103            total_active: workers.len(),
104            cpu_intensive_count: cpu,
105            io_intensive_count: io,
106            mixed_count: mixed,
107            max_capacity: self.max_workers,
108        }
109    }
110
111    /// Shutdown and wait for all workers
112    pub async fn shutdown(&self) {
113        let workers = { std::mem::take(&mut *self.workers.lock().unwrap()) };
114        for (id, info) in workers {
115            println!("等待工作者 {} 完成任務 {}", id, info.task_id);
116            let _ = info.handle.await;
117        }
118    }
119
120    /// List active worker infos
121    pub fn list_active_workers(&self) -> Vec<ActiveWorkerInfo> {
122        let workers = self.workers.lock().unwrap();
123        workers
124            .iter()
125            .map(|(id, info)| ActiveWorkerInfo {
126                worker_id: *id,
127                task_id: info.task_id.clone(),
128                worker_type: info.worker_type.clone(),
129                runtime: info.start_time.elapsed(),
130            })
131            .collect()
132    }
133}
134
135impl Clone for WorkerPool {
136    fn clone(&self) -> Self {
137        Self {
138            workers: Arc::clone(&self.workers),
139            max_workers: self.max_workers,
140        }
141    }
142}
143
/// Aggregate statistics about the pool at a point in time, as produced by
/// `WorkerPool::get_worker_stats`.
#[derive(Debug, Clone)]
pub struct WorkerStats {
    /// Total workers currently tracked by the pool.
    pub total_active: usize,
    /// Workers classified as `WorkerType::CpuIntensive`.
    pub cpu_intensive_count: usize,
    /// Workers classified as `WorkerType::IoIntensive`.
    pub io_intensive_count: usize,
    /// Workers classified as `WorkerType::Mixed`.
    pub mixed_count: usize,
    /// The pool's configured `max_workers` limit.
    pub max_capacity: usize,
}
152
/// Snapshot of a single active worker, as returned by
/// `WorkerPool::list_active_workers`.
#[derive(Debug, Clone)]
pub struct ActiveWorkerInfo {
    /// The pool-assigned random UUID of the worker.
    pub worker_id: Uuid,
    /// The task's own identifier (`Task::task_id`).
    pub task_id: String,
    /// Resource classification of the worker.
    pub worker_type: WorkerType,
    /// Elapsed time since the worker was spawned.
    pub runtime: std::time::Duration,
}
160
/// Represents an individual worker for monitoring.
///
/// NOTE(review): this type is not referenced by `WorkerPool` in this file;
/// presumably it is used by external monitoring code — confirm with callers.
pub struct Worker {
    /// Random UUID assigned at construction.
    id: Uuid,
    /// Current lifecycle status, mutated via `set_status`.
    status: WorkerStatus,
}
166
/// Lifecycle state of an individual `Worker`.
///
/// `PartialEq`/`Eq` are derived in addition to the original `Debug`/`Clone`
/// so statuses can be compared directly (useful in monitoring code and
/// tests); both payload-carrying variants hold `String`, which is `Eq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WorkerStatus {
    /// Ready to accept work.
    Idle,
    /// Currently working; carries an associated description string
    /// (presumably a task id — confirm with callers).
    Busy(String),
    /// No longer running.
    Stopped,
    /// Failed; carries an error message.
    Error(String),
}
174
175impl Worker {
176    pub fn new() -> Self {
177        Self {
178            id: Uuid::new_v4(),
179            status: WorkerStatus::Idle,
180        }
181    }
182
183    pub fn id(&self) -> Uuid {
184        self.id
185    }
186
187    pub fn status(&self) -> &WorkerStatus {
188        &self.status
189    }
190
191    pub fn set_status(&mut self, status: WorkerStatus) {
192        self.status = status;
193    }
194}
195
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created pool must report its configured capacity and zero
    /// active workers, both via the direct getters and via the stats snapshot.
    #[tokio::test]
    async fn test_worker_pool_capacity() {
        let pool = WorkerPool::new(2);

        assert_eq!(2, pool.get_capacity());
        assert_eq!(0, pool.get_active_count());

        let WorkerStats {
            max_capacity,
            total_active,
            ..
        } = pool.get_worker_stats();
        assert_eq!(2, max_capacity);
        assert_eq!(0, total_active);
    }
}
209}