subx_cli/core/parallel/
worker.rs1use super::task::{Task, TaskResult};
3use std::collections::HashMap;
4use std::sync::{Arc, Mutex};
5use tokio::task::JoinHandle;
6use uuid::Uuid;
7
8pub struct WorkerPool {
10 workers: Arc<Mutex<HashMap<Uuid, WorkerInfo>>>,
11 max_workers: usize,
12}
13
14#[derive(Debug)]
15struct WorkerInfo {
16 handle: JoinHandle<TaskResult>,
17 task_id: String,
18 start_time: std::time::Instant,
19 worker_type: WorkerType,
20}
21
22#[derive(Debug, Clone)]
23pub enum WorkerType {
24 CpuIntensive,
25 IoIntensive,
26 Mixed,
27}
28
29impl WorkerPool {
30 pub fn new(max_workers: usize) -> Self {
31 Self {
32 workers: Arc::new(Mutex::new(HashMap::new())),
33 max_workers,
34 }
35 }
36
37 pub async fn execute(&self, task: Box<dyn Task + Send + Sync>) -> Result<TaskResult, String> {
39 let worker_id = Uuid::new_v4();
40 let task_id = task.task_id();
41 let worker_type = self.determine_worker_type(task.task_type());
42
43 {
44 let workers = self.workers.lock().unwrap();
45 if workers.len() >= self.max_workers {
46 return Err("工作者池已滿".to_string());
47 }
48 }
49
50 let handle = tokio::spawn(async move { task.execute().await });
51
52 {
53 let mut workers = self.workers.lock().unwrap();
54 workers.insert(
55 worker_id,
56 WorkerInfo {
57 handle,
58 task_id: task_id.clone(),
59 start_time: std::time::Instant::now(),
60 worker_type,
61 },
62 );
63 }
64
65 Ok(TaskResult::Success("任務已提交".to_string()))
67 }
68
69 fn determine_worker_type(&self, task_type: &str) -> WorkerType {
70 match task_type {
71 "convert" => WorkerType::CpuIntensive,
72 "sync" => WorkerType::Mixed,
73 "match" => WorkerType::IoIntensive,
74 "validate" => WorkerType::IoIntensive,
75 _ => WorkerType::Mixed,
76 }
77 }
78
79 pub fn get_active_count(&self) -> usize {
81 self.workers.lock().unwrap().len()
82 }
83
84 pub fn get_capacity(&self) -> usize {
86 self.max_workers
87 }
88
89 pub fn get_worker_stats(&self) -> WorkerStats {
91 let workers = self.workers.lock().unwrap();
92 let mut cpu = 0;
93 let mut io = 0;
94 let mut mixed = 0;
95 for w in workers.values() {
96 match w.worker_type {
97 WorkerType::CpuIntensive => cpu += 1,
98 WorkerType::IoIntensive => io += 1,
99 WorkerType::Mixed => mixed += 1,
100 }
101 }
102 WorkerStats {
103 total_active: workers.len(),
104 cpu_intensive_count: cpu,
105 io_intensive_count: io,
106 mixed_count: mixed,
107 max_capacity: self.max_workers,
108 }
109 }
110
111 pub async fn shutdown(&self) {
113 let workers = { std::mem::take(&mut *self.workers.lock().unwrap()) };
114 for (id, info) in workers {
115 println!("等待工作者 {} 完成任務 {}", id, info.task_id);
116 let _ = info.handle.await;
117 }
118 }
119
120 pub fn list_active_workers(&self) -> Vec<ActiveWorkerInfo> {
122 let workers = self.workers.lock().unwrap();
123 workers
124 .iter()
125 .map(|(id, info)| ActiveWorkerInfo {
126 worker_id: *id,
127 task_id: info.task_id.clone(),
128 worker_type: info.worker_type.clone(),
129 runtime: info.start_time.elapsed(),
130 })
131 .collect()
132 }
133}
134
135impl Clone for WorkerPool {
136 fn clone(&self) -> Self {
137 Self {
138 workers: Arc::clone(&self.workers),
139 max_workers: self.max_workers,
140 }
141 }
142}
143
144#[derive(Debug, Clone)]
145pub struct WorkerStats {
146 pub total_active: usize,
147 pub cpu_intensive_count: usize,
148 pub io_intensive_count: usize,
149 pub mixed_count: usize,
150 pub max_capacity: usize,
151}
152
153#[derive(Debug, Clone)]
154pub struct ActiveWorkerInfo {
155 pub worker_id: Uuid,
156 pub task_id: String,
157 pub worker_type: WorkerType,
158 pub runtime: std::time::Duration,
159}
160
161pub struct Worker {
163 id: Uuid,
164 status: WorkerStatus,
165}
166
167#[derive(Debug, Clone)]
168pub enum WorkerStatus {
169 Idle,
170 Busy(String),
171 Stopped,
172 Error(String),
173}
174
175impl Worker {
176 pub fn new() -> Self {
177 Self {
178 id: Uuid::new_v4(),
179 status: WorkerStatus::Idle,
180 }
181 }
182
183 pub fn id(&self) -> Uuid {
184 self.id
185 }
186
187 pub fn status(&self) -> &WorkerStatus {
188 &self.status
189 }
190
191 pub fn set_status(&mut self, status: WorkerStatus) {
192 self.status = status;
193 }
194}
195
196#[cfg(test)]
197mod tests {
198 use super::*;
199
200 #[tokio::test]
201 async fn test_worker_pool_capacity() {
202 let pool = WorkerPool::new(2);
203 assert_eq!(pool.get_capacity(), 2);
204 assert_eq!(pool.get_active_count(), 0);
205 let stats = pool.get_worker_stats();
206 assert_eq!(stats.max_capacity, 2);
207 assert_eq!(stats.total_active, 0);
208 }
209}