/// Asynchronous command pool: queued `Cmd`s are drained by a fixed number of tokio tasks.
#[cfg(feature = "tokio")]
pub mod a_sync {
    use crate::system::cmd::{Cmd, CmdOutput, CmdResult};
    use rayon::slice::ParallelSliceMut as _;
    use std::collections::VecDeque;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use tokio::sync::{mpsc, Mutex};

    /// Queue of commands plus the results collected from running them.
    ///
    /// Commands are enqueued with [`CmdManage::add_cmd`], executed by
    /// [`CmdManage::run`], and read back with [`CmdManage::get_results`] or
    /// [`CmdManage::get_full_results`].
    #[derive(Debug)]
    pub struct CmdManage {
        /// Pending commands, each paired with the index assigned when it was enqueued.
        queue: Arc<Mutex<VecDeque<(usize, Cmd)>>>,
        /// Finished commands in completion order, keyed by their enqueue index.
        results: Arc<Mutex<Vec<(usize, CmdResult<usize>)>>>,
        /// Number of worker tasks spawned by `run`.
        workers: usize,
        /// Next index to hand out; only ever grows, so indices stay unique even
        /// when commands are added while (or after) the queue is being drained.
        next_index: AtomicUsize,
    }

    impl CmdManage {
        /// Creates an empty manager that will run commands on `workers` tasks.
        ///
        /// Both internal buffers are pre-sized to `workers * 2` entries.
        pub fn new(workers: usize) -> Self {
            CmdManage {
                queue: Arc::new(Mutex::new(VecDeque::with_capacity(workers * 2))),
                results: Arc::new(Mutex::new(Vec::with_capacity(workers * 2))),
                workers,
                next_index: AtomicUsize::new(0),
            }
        }

        /// Appends `cmd` to the queue and tags it with the next unused index.
        ///
        /// The first command gets index 0, the second 1, and so on. Indices are
        /// never reused, unlike the queue length, which shrinks as workers pop.
        pub async fn add_cmd(&self, cmd: Cmd) {
            let mut queue = self.queue.lock().await;
            // Taken while the queue lock is held, so queue order matches index
            // order; the lock already orders the accesses, hence `Relaxed`.
            let index = self.next_index.fetch_add(1, Ordering::Relaxed);
            queue.push_back((index, cmd));
        }

        /// Spawns `workers` tasks that drain the queue, then waits for all of them.
        ///
        /// Each task pops commands until the queue is empty, awaits
        /// `Cmd::a_output`, and stores a `CmdResult` whose `opts` is the command's
        /// index. A command that fails to run is recorded with `status: false` and
        /// the error text as `content`. With zero workers nothing is executed.
        ///
        /// # Errors
        ///
        /// Returns an error if a worker ends without sending its completion
        /// signal (for example because it panicked).
        pub async fn run(&self) -> crate::Result<()> {
            // tokio panics on a zero-capacity bounded channel, so clamp to 1.
            let (tx, mut rx) = mpsc::channel(self.workers.max(1));

            for _ in 0..self.workers {
                let queue = Arc::clone(&self.queue);
                let results = Arc::clone(&self.results);
                let tx = tx.clone();

                tokio::spawn(async move {
                    loop {
                        // The guard is a temporary of this statement, so the lock
                        // is released before the command is executed.
                        let job = queue.lock().await.pop_front();

                        match job {
                            Some((index, cmd)) => {
                                let result = match cmd.a_output().await {
                                    Ok(CmdOutput { stdout, status, .. }) => CmdResult {
                                        content: stdout,
                                        status: status.success(),
                                        opts: index,
                                    },
                                    Err(e) => CmdResult {
                                        content: e.to_string(),
                                        status: false,
                                        opts: index,
                                    },
                                };
                                results.lock().await.push((index, result));
                            }
                            None => {
                                if tx.send(()).await.is_err() {
                                    eprintln!("Failed to send completion signal");
                                }
                                break;
                            }
                        }
                    }
                });
            }

            // Drop our own sender: otherwise `recv` could never observe a closed
            // channel and this function would hang if a worker died early.
            drop(tx);

            for _ in 0..self.workers {
                rx.recv()
                    .await
                    .ok_or("Failed to receive completion signal")?;
            }

            Ok(())
        }

        /// Returns a copy of every stored result, without the index tuple wrapper.
        ///
        /// Order is the current storage order (completion order unless `sort`
        /// was called).
        pub async fn get_results(&self) -> Vec<CmdResult<usize>> {
            self.results
                .lock()
                .await
                .iter()
                .map(|(_, result)| result.clone())
                .collect()
        }

        /// Returns a copy of every stored `(index, result)` pair.
        pub async fn get_full_results(&self) -> Vec<(usize, CmdResult<usize>)> {
            self.results.lock().await.clone()
        }

        /// Sorts the stored results by index in place and returns `self` for chaining.
        pub async fn sort(&mut self) -> &mut Self {
            self.results
                .lock()
                .await
                .par_sort_unstable_by_key(|entry| entry.0);
            self
        }
    }
}
117pub mod sync {
119 use crate::system::cmd::{Cmd, CmdOutput, CmdResult};
120 use rayon::slice::ParallelSliceMut as _;
121 use std::collections::VecDeque;
122 use std::sync::{Arc, Mutex as StdMutex};
123 use std::thread;
124 #[derive(Debug)]
126 pub struct CmdManage {
127 queue: Arc<StdMutex<VecDeque<(usize, Cmd)>>>,
129 results: Arc<StdMutex<Vec<(usize, CmdResult<usize>)>>>,
131 workers: usize,
133 }
134
135 impl CmdManage {
136 pub fn new(workers: usize) -> Self {
138 CmdManage {
139 queue: Arc::new(StdMutex::new(VecDeque::with_capacity(workers * 2))),
140 results: Arc::new(StdMutex::new(Vec::with_capacity(workers * 2))),
141 workers,
142 }
143 }
144 pub fn add_cmd(&self, cmd: Cmd) -> crate::Result<()> {
146 let mut queue = self.queue.lock()?;
147 let index = queue.len();
148 queue.push_back((index, cmd));
149 Ok(())
150 }
151 pub fn run(&self) -> crate::Result<()> {
153 let (tx, rx) = std::sync::mpsc::channel();
154 let process_cmd = |cmd: Cmd, index: usize| {
155 let output = cmd.output();
156 let result = match output {
157 Ok(CmdOutput { stdout, status, .. }) => CmdResult {
158 content: stdout,
159 status: status.success(),
160 opts: index,
161 },
162 Err(e) => CmdResult {
163 content: e.to_string(),
164 status: false,
165 opts: index,
166 },
167 };
168 (index, result)
169 };
170
171 for _ in 0..self.workers {
172 let queue = Arc::clone(&self.queue);
173 let results = Arc::clone(&self.results);
174 let tx = tx.clone();
175
176 thread::spawn(move || loop {
177 let cmd = {
178 let mut queue = queue.lock().unwrap();
179 queue.pop_front()
180 };
181
182 match cmd {
183 Some((index, cmd)) => {
184 let result = process_cmd(cmd, index);
185 results.lock().unwrap().push(result);
186 }
187 None => {
188 if tx.send(()).is_err() {
189 eprintln!("Failed to send completion signal");
190 }
191 break;
192 }
193 }
194 });
195 }
196 for _ in 0..self.workers {
198 rx.recv()
199 .map_err(|_| "Failed to receive completion signal")?;
200 }
201
202 Ok(())
203 }
204 pub fn get_results(&self) -> Vec<CmdResult<usize>> {
206 self.results.lock().map_or_else(
207 |_| Vec::new(),
208 |guard| {
209 guard
210 .clone()
211 .into_iter()
212 .map(|(_, result)| result)
213 .collect()
214 },
215 )
216 }
217 pub fn sort(&mut self) -> &mut Self {
219 if let Ok(mut results) = self.results.lock() {
220 results.par_sort_unstable_by_key(|&(index, _)| index);
221 }
222 self
223 }
224 pub fn get_full_results(&self) -> Vec<(usize, CmdResult<usize>)> {
226 self
227 .results
228 .lock()
229 .map_or_else(|_| Vec::new(), |guard| guard.clone())
230 }
231 }
232}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cmd::{Cmd, ExeType};
    use std::time::Duration;

    /// Builds an `echo` command of type `ExeType::AutoShell` with one argument.
    fn echo_cmd(arg: impl ToString) -> Cmd {
        Cmd::new("echo")
            .set_type(ExeType::AutoShell)
            .arg(arg.to_string())
    }

    /// Tests for the tokio-backed manager; compiled only with the `tokio` feature.
    #[cfg(feature = "tokio")]
    mod async_tests {
        use super::*;
        use crate::cmd::ExeType;
        use std::sync::Arc;

        /// Running with nothing queued completes and yields no results.
        #[tokio::test]
        async fn test_empty_queue() {
            let manager = a_sync::CmdManage::new(2);
            manager.run().await.unwrap();

            let outcomes = manager.get_results().await;
            assert_eq!(outcomes.len(), 0, "Expected empty results for empty queue");
        }

        /// One queued `echo` produces exactly one result carrying its argument.
        #[tokio::test]
        async fn test_single_command() {
            let manager = a_sync::CmdManage::new(1);
            manager.add_cmd(echo_cmd("Single")).await;
            manager.run().await.unwrap();

            let outcomes = manager.get_results().await;
            assert_eq!(outcomes.len(), 1, "Expected one result");
            assert_eq!(outcomes[0].content, "Single");
        }

        /// After sorting, result `i` holds the output of the `i`-th queued command.
        #[tokio::test]
        async fn test_many_commands() {
            let mut manager = a_sync::CmdManage::new(4);
            for i in 0..100 {
                manager.add_cmd(echo_cmd(i)).await;
            }
            manager.run().await.unwrap();

            let indexed = manager.sort().await.get_full_results().await;
            assert_eq!(indexed.len(), 100, "Expected 100 results");
            for (i, result) in indexed {
                assert_eq!(result.content.trim(), i.to_string());
            }
        }

        /// A command that cannot run is still recorded, with a failing status.
        #[tokio::test]
        async fn test_error_handling() {
            let manager = a_sync::CmdManage::new(2);
            manager.add_cmd(Cmd::new("non_existent_command")).await;
            manager.run().await.unwrap();

            let outcomes = manager.get_results().await;
            assert_eq!(outcomes.len(), 1, "Expected one result");
            assert!(!outcomes[0].status, "Expected command to fail");
            // NOTE(review): the check requires empty content while the message
            // says an error message is expected — confirm which one is intended.
            assert!(outcomes[0].content.is_empty(), "Expected error message");
        }

        /// Success and failure are reported per command, in index order once sorted.
        #[tokio::test]
        async fn test_mixed_success_and_failure() {
            let mut manager = a_sync::CmdManage::new(2);
            manager.add_cmd(echo_cmd("Success")).await;
            manager
                .add_cmd(Cmd::new("adwjaio").set_type(ExeType::AutoShell))
                .await;
            manager.run().await.unwrap();

            let outcomes = manager.sort().await.get_results().await;
            assert_eq!(outcomes.len(), 2, "Expected two results");
            assert!(outcomes[0].status, "Expected first command to succeed");
            assert!(!outcomes[1].status, "Expected second command to fail");
        }

        /// Adding commands from one task while another task runs the queue.
        #[tokio::test]
        async fn test_concurrent_add_and_run() {
            let manager = Arc::new(a_sync::CmdManage::new(4));

            // Producer: enqueue 50 commands, pausing 1 ms after each.
            let producer = Arc::clone(&manager);
            let add_task = tokio::spawn(async move {
                for i in 0..50 {
                    producer.add_cmd(echo_cmd(i)).await;
                    tokio::time::sleep(Duration::from_millis(1)).await;
                }
            });

            // Consumer: start draining 10 ms in, while the producer is still adding.
            let runner = Arc::clone(&manager);
            let run_task = tokio::spawn(async move {
                tokio::time::sleep(Duration::from_millis(10)).await;
                runner.run().await.unwrap();
            });

            tokio::try_join!(add_task, run_task).unwrap();

            let outcomes = manager.get_results().await;
            assert!(!outcomes.is_empty(), "Expected some results");
            assert!(outcomes.len() <= 50, "Expected no more than 50 results");
        }
    }

    /// Tests for the thread-backed manager.
    mod sync_tests {
        // `ExeType` and `Cmd` are reachable through the parent module's imports.
        use super::*;

        /// Running with nothing queued completes and yields no results.
        #[test]
        fn test_empty_queue() {
            let manager = sync::CmdManage::new(2);
            manager.run().unwrap();

            let outcomes = manager.get_results();
            assert_eq!(outcomes.len(), 0, "Expected empty results for empty queue");
        }

        /// One queued `echo` produces exactly one result carrying its argument.
        #[test]
        fn test_single_command() {
            let manager = sync::CmdManage::new(1);
            manager.add_cmd(echo_cmd("Single")).unwrap();
            manager.run().unwrap();

            let outcomes = manager.get_results();
            assert_eq!(outcomes.len(), 1, "Expected one result");
            assert_eq!(outcomes[0].content, "Single");
        }

        /// After sorting, result `i` holds the output of the `i`-th queued command.
        #[test]
        fn test_many_commands() {
            let mut manager = sync::CmdManage::new(4);
            for i in 0..100 {
                manager.add_cmd(echo_cmd(i)).unwrap();
            }
            manager.run().unwrap();

            let indexed = manager.sort().get_full_results();
            assert_eq!(indexed.len(), 100, "Expected 100 results");
            for (i, result) in &indexed {
                assert_eq!(result.content, i.to_string());
            }
        }

        /// A command that cannot run is still recorded, with a failing status.
        #[test]
        fn test_error_handling() {
            let manager = sync::CmdManage::new(2);
            manager.add_cmd(Cmd::new("non_existent_command")).unwrap();
            manager.run().unwrap();

            let outcomes = manager.get_results();
            assert_eq!(outcomes.len(), 1, "Expected one result");
            assert!(!outcomes[0].status, "Expected command to fail");
            // NOTE(review): the check requires empty content while the message
            // says an error message is expected — confirm which one is intended.
            assert!(outcomes[0].content.is_empty(), "Expected error message");
        }

        /// Success and failure are reported per command, in index order once sorted.
        #[test]
        fn test_mixed_success_and_failure() {
            let mut manager = sync::CmdManage::new(2);
            manager.add_cmd(echo_cmd("Success")).unwrap();
            manager.add_cmd(Cmd::new("non_existent_command")).unwrap();
            manager.run().unwrap();

            let outcomes = manager.sort().get_results();
            assert_eq!(outcomes.len(), 2, "Expected two results");
            assert!(outcomes[0].status, "Expected first command to succeed");
            assert!(!outcomes[1].status, "Expected second command to fail");
        }
    }

    /// Runs 1000 `echo` commands on 8 threads and requires completion within 10 s.
    #[test]
    fn test_performance() {
        use std::time::Instant;

        let manager = sync::CmdManage::new(8);
        for _ in 0..1000 {
            manager.add_cmd(echo_cmd("Performance test")).unwrap();
        }

        // Only the `run` call is timed; queueing happens before the clock starts.
        let started = Instant::now();
        manager.run().unwrap();
        let elapsed = started.elapsed();

        println!("Time taken to run 1000 commands: {:?}", elapsed);
        assert!(
            elapsed < Duration::from_secs(10),
            "Performance test took too long"
        );
    }
}