e_utils/system/cmd/
tasks.rs

1/// 异步版本的命令管理模块
2#[cfg(feature = "tokio")]
3pub mod a_sync {
4  use crate::system::cmd::{Cmd, CmdOutput, CmdResult};
5  use rayon::slice::ParallelSliceMut as _;
6  use std::collections::VecDeque;
7  use std::sync::Arc;
8  use tokio::sync::{mpsc, Mutex};
9
10  /// 异步命令管理结构体
11  #[derive(Debug)]
12  pub struct CmdManage {
13    /// 命令队列,包含索引和命令
14    queue: Arc<Mutex<VecDeque<(usize, Cmd)>>>,
15    /// 结果列表,包含索引和结果
16    results: Arc<Mutex<Vec<(usize, CmdResult<usize>)>>>,
17    /// 工作线程数量
18    workers: usize,
19  }
20
21  impl CmdManage {
22    /// 创建新的异步命令管理器
23    pub fn new(workers: usize) -> Self {
24      CmdManage {
25        queue: Arc::new(Mutex::new(VecDeque::with_capacity(workers * 2))),
26        results: Arc::new(Mutex::new(Vec::with_capacity(workers * 2))),
27        workers,
28      }
29    }
30    /// 异步添加命令到队列
31    pub async fn add_cmd(&self, cmd: Cmd) {
32      let mut queue = self.queue.lock().await;
33      let index = queue.len();
34      queue.push_back((index, cmd));
35    }
36    /// 异步运行所有命令
37    pub async fn run(&self) -> crate::Result<()> {
38      let (tx, mut rx) = mpsc::channel(self.workers);
39      // 创建工作线程
40      for _ in 0..self.workers {
41        let queue = Arc::clone(&self.queue);
42        let results = Arc::clone(&self.results);
43        let tx = tx.clone();
44
45        tokio::spawn(async move {
46          loop {
47            let cmd = {
48              let mut queue = queue.lock().await;
49              queue.pop_front()
50            };
51
52            match cmd {
53              Some((index, cmd)) => {
54                // 执行命令并存储结果
55                let output = cmd.a_output().await;
56                let result = match output {
57                  Ok(CmdOutput { stdout, status, .. }) => CmdResult {
58                    content: stdout,
59                    status: status.success(),
60                    opts: index,
61                  },
62                  Err(e) => CmdResult {
63                    content: e.to_string(),
64                    status: false,
65                    opts: index,
66                  },
67                };
68                results.lock().await.push((index, result));
69              }
70              None => {
71                // 队列为空,发送完成信号
72                if tx.send(()).await.is_err() {
73                  eprintln!("Failed to send completion signal");
74                }
75                break;
76              }
77            }
78          }
79        });
80      }
81      // 等待所有工作线程完成
82      for _ in 0..self.workers {
83        rx.recv()
84          .await
85          .ok_or("Failed to receive completion signal")?;
86      }
87
88      Ok(())
89    }
90
91    /// 异步获取结果
92    pub async fn get_results(&self) -> Vec<CmdResult<usize>> {
93      self
94        .results
95        .lock()
96        .await
97        .clone()
98        .into_iter()
99        .map(|(_, result)| result)
100        .collect()
101    }
102    /// 异步获取完整结果
103    pub async fn get_full_results(&self) -> Vec<(usize, CmdResult<usize>)> {
104      self.results.lock().await.clone()
105    }
106    /// 排序
107    pub async fn sort(&mut self) -> &mut Self {
108      self
109        .results
110        .lock()
111        .await
112        .par_sort_unstable_by_key(|&(index, _)| index);
113      self
114    }
115  }
116}
117/// 同步版本的命令管理模块
118pub mod sync {
119  use crate::system::cmd::{Cmd, CmdOutput, CmdResult};
120  use rayon::slice::ParallelSliceMut as _;
121  use std::collections::VecDeque;
122  use std::sync::{Arc, Mutex as StdMutex};
123  use std::thread;
124  /// 同步命令管理结构体
125  #[derive(Debug)]
126  pub struct CmdManage {
127    /// 命令队列,包含索引和命令
128    queue: Arc<StdMutex<VecDeque<(usize, Cmd)>>>,
129    /// 结果列表,包含索引和结果
130    results: Arc<StdMutex<Vec<(usize, CmdResult<usize>)>>>,
131    /// 工作线程数量
132    workers: usize,
133  }
134
135  impl CmdManage {
136    /// 创建新的同步命令管理器
137    pub fn new(workers: usize) -> Self {
138      CmdManage {
139        queue: Arc::new(StdMutex::new(VecDeque::with_capacity(workers * 2))),
140        results: Arc::new(StdMutex::new(Vec::with_capacity(workers * 2))),
141        workers,
142      }
143    }
144    /// 同步添加命令到队列
145    pub fn add_cmd(&self, cmd: Cmd) -> crate::Result<()> {
146      let mut queue = self.queue.lock()?;
147      let index = queue.len();
148      queue.push_back((index, cmd));
149      Ok(())
150    }
151    /// 同步运行所有命令
152    pub fn run(&self) -> crate::Result<()> {
153      let (tx, rx) = std::sync::mpsc::channel();
154      let process_cmd = |cmd: Cmd, index: usize| {
155        let output = cmd.output();
156        let result = match output {
157          Ok(CmdOutput { stdout, status, .. }) => CmdResult {
158            content: stdout,
159            status: status.success(),
160            opts: index,
161          },
162          Err(e) => CmdResult {
163            content: e.to_string(),
164            status: false,
165            opts: index,
166          },
167        };
168        (index, result)
169      };
170
171      for _ in 0..self.workers {
172        let queue = Arc::clone(&self.queue);
173        let results = Arc::clone(&self.results);
174        let tx = tx.clone();
175
176        thread::spawn(move || loop {
177          let cmd = {
178            let mut queue = queue.lock().unwrap();
179            queue.pop_front()
180          };
181
182          match cmd {
183            Some((index, cmd)) => {
184              let result = process_cmd(cmd, index);
185              results.lock().unwrap().push(result);
186            }
187            None => {
188              if tx.send(()).is_err() {
189                eprintln!("Failed to send completion signal");
190              }
191              break;
192            }
193          }
194        });
195      }
196      // 等待所有工作线程完成
197      for _ in 0..self.workers {
198        rx.recv()
199          .map_err(|_| "Failed to receive completion signal")?;
200      }
201
202      Ok(())
203    }
204    /// 异步获取结果
205    pub fn get_results(&self) -> Vec<CmdResult<usize>> {
206      self.results.lock().map_or_else(
207        |_| Vec::new(),
208        |guard| {
209          guard
210            .clone()
211            .into_iter()
212            .map(|(_, result)| result)
213            .collect()
214        },
215      )
216    }
217    /// 排序
218    pub fn sort(&mut self) -> &mut Self {
219      if let Ok(mut results) = self.results.lock() {
220        results.par_sort_unstable_by_key(|&(index, _)| index);
221      }
222      self
223    }
224    /// 同步获取完整结果
225    pub fn get_full_results(&self) -> Vec<(usize, CmdResult<usize>)> {
226      self
227        .results
228        .lock()
229        .map_or_else(|_| Vec::new(), |guard| guard.clone())
230    }
231  }
232}
233#[cfg(test)]
234mod tests {
235  use super::*;
236  use crate::cmd::{Cmd, ExeType};
237  use std::time::Duration;
238
239  #[cfg(feature = "tokio")]
240  mod async_tests {
241    use crate::cmd::ExeType;
242    use std::sync::Arc;
243    use super::*;
244
245    #[tokio::test]
246    async fn test_empty_queue() {
247      let cmd_manage = a_sync::CmdManage::new(2);
248      cmd_manage.run().await.unwrap();
249      let results = cmd_manage.get_results().await;
250      assert_eq!(results.len(), 0, "Expected empty results for empty queue");
251    }
252
253    #[tokio::test]
254    async fn test_single_command() {
255      let cmd_manage = a_sync::CmdManage::new(1);
256      cmd_manage
257        .add_cmd(
258          Cmd::new("echo")
259            .set_type(ExeType::AutoShell)
260            .arg("Single".to_string()),
261        )
262        .await;
263      cmd_manage.run().await.unwrap();
264      let results = cmd_manage.get_results().await;
265      assert_eq!(results.len(), 1, "Expected one result");
266      assert_eq!(results[0].content, "Single");
267    }
268
269    #[tokio::test]
270    async fn test_many_commands() {
271      let mut cmd_manage = a_sync::CmdManage::new(4);
272      for i in 0..100 {
273        cmd_manage
274          .add_cmd(
275            Cmd::new("echo")
276              .set_type(ExeType::AutoShell)
277              .arg(i.to_string()),
278          )
279          .await;
280      }
281      cmd_manage.run().await.unwrap();
282      let results = cmd_manage.sort().await.get_full_results().await;
283      assert_eq!(results.len(), 100, "Expected 100 results");
284      for (i, result) in results {
285        assert_eq!(result.content.trim(), i.to_string());
286      }
287    }
288
289    #[tokio::test]
290    async fn test_error_handling() {
291      let cmd_manage = a_sync::CmdManage::new(2);
292      cmd_manage.add_cmd(Cmd::new("non_existent_command")).await;
293      cmd_manage.run().await.unwrap();
294      let results = cmd_manage.get_results().await;
295      assert_eq!(results.len(), 1, "Expected one result");
296      assert!(!results[0].status, "Expected command to fail");
297      assert!(results[0].content.is_empty(), "Expected error message");
298    }
299
300    #[tokio::test]
301    async fn test_mixed_success_and_failure() {
302      let mut cmd_manage = a_sync::CmdManage::new(2);
303      cmd_manage
304        .add_cmd(
305          Cmd::new("echo")
306            .set_type(ExeType::AutoShell)
307            .arg("Success".to_string()),
308        )
309        .await;
310      cmd_manage
311        .add_cmd(Cmd::new("adwjaio").set_type(ExeType::AutoShell))
312        .await;
313      cmd_manage.run().await.unwrap();
314      let results = cmd_manage.sort().await.get_results().await;
315      assert_eq!(results.len(), 2, "Expected two results");
316      assert!(results[0].status, "Expected first command to succeed");
317      assert!(!results[1].status, "Expected second command to fail");
318    }
319    // 并发测试
320    #[tokio::test]
321    async fn test_concurrent_add_and_run() {
322      let cmd_manage = Arc::new(a_sync::CmdManage::new(4));
323      let cmd_manage_clone = Arc::clone(&cmd_manage);
324
325      let add_task = tokio::spawn(async move {
326        for i in 0..50 {
327          cmd_manage_clone
328            .add_cmd(
329              Cmd::new("echo")
330                .set_type(ExeType::AutoShell)
331                .arg(i.to_string()),
332            )
333            .await;
334          tokio::time::sleep(Duration::from_millis(1)).await;
335        }
336      });
337      let cmd_manage_clone = Arc::clone(&cmd_manage);
338      let run_task = tokio::spawn(async move {
339        tokio::time::sleep(Duration::from_millis(10)).await;
340        cmd_manage_clone.run().await.unwrap();
341      });
342
343      tokio::try_join!(add_task, run_task).unwrap();
344
345      let results = cmd_manage.get_results().await;
346      assert!(results.len() > 0, "Expected some results");
347      assert!(results.len() <= 50, "Expected no more than 50 results");
348    }
349  }
350
351  // 同步测试
352  mod sync_tests {
353    use crate::cmd::ExeType;
354
355    use super::*;
356
357    #[test]
358    fn test_empty_queue() {
359      let cmd_manage = sync::CmdManage::new(2);
360      cmd_manage.run().unwrap();
361      let results = cmd_manage.get_results();
362      assert_eq!(results.len(), 0, "Expected empty results for empty queue");
363    }
364
365    #[test]
366    fn test_single_command() {
367      let cmd_manage = sync::CmdManage::new(1);
368      cmd_manage
369        .add_cmd(
370          Cmd::new("echo")
371            .set_type(ExeType::AutoShell)
372            .arg("Single".to_string()),
373        )
374        .unwrap();
375      cmd_manage.run().unwrap();
376      let results = cmd_manage.get_results();
377      assert_eq!(results.len(), 1, "Expected one result");
378      assert_eq!(results[0].content, "Single");
379    }
380
381    #[test]
382    fn test_many_commands() {
383      let mut cmd_manage = sync::CmdManage::new(4);
384      for i in 0..100 {
385        cmd_manage
386          .add_cmd(
387            Cmd::new("echo")
388              .set_type(ExeType::AutoShell)
389              .arg(i.to_string()),
390          )
391          .unwrap();
392      }
393      cmd_manage.run().unwrap();
394      let results = cmd_manage.sort().get_full_results();
395      assert_eq!(results.len(), 100, "Expected 100 results");
396      for (i, result) in results.iter() {
397        assert_eq!(result.content, i.to_string());
398      }
399    }
400
401    #[test]
402    fn test_error_handling() {
403      let cmd_manage = sync::CmdManage::new(2);
404      cmd_manage
405        .add_cmd(Cmd::new("non_existent_command"))
406        .unwrap();
407      cmd_manage.run().unwrap();
408      let results = cmd_manage.get_results();
409      assert_eq!(results.len(), 1, "Expected one result");
410      assert!(!results[0].status, "Expected command to fail");
411      assert!(results[0].content.is_empty(), "Expected error message");
412    }
413
414    #[test]
415    fn test_mixed_success_and_failure() {
416      let mut cmd_manage = sync::CmdManage::new(2);
417      cmd_manage
418        .add_cmd(
419          Cmd::new("echo")
420            .set_type(ExeType::AutoShell)
421            .arg("Success".to_string()),
422        )
423        .unwrap();
424      cmd_manage
425        .add_cmd(Cmd::new("non_existent_command"))
426        .unwrap();
427      cmd_manage.run().unwrap();
428      let results = cmd_manage.sort().get_results();
429      assert_eq!(results.len(), 2, "Expected two results");
430      assert!(results[0].status, "Expected first command to succeed");
431      assert!(!results[1].status, "Expected second command to fail");
432    }
433  }
434
435  // 性能测试
436  #[test]
437  fn test_performance() {
438    use std::time::Instant;
439
440    let cmd_manage = sync::CmdManage::new(8);
441    for _ in 0..1000 {
442      cmd_manage
443        .add_cmd(
444          Cmd::new("echo")
445            .set_type(ExeType::AutoShell)
446            .arg("Performance test".to_string()),
447        )
448        .unwrap();
449    }
450
451    let start = Instant::now();
452    cmd_manage.run().unwrap();
453    let duration = start.elapsed();
454
455    println!("Time taken to run 1000 commands: {:?}", duration);
456    assert!(
457      duration < Duration::from_secs(10),
458      "Performance test took too long"
459    );
460  }
461}