#[cfg(feature = "tokio")]
pub mod a_sync {
use crate::system::cmd::{Cmd, CmdOutput, CmdResult};
use rayon::slice::ParallelSliceMut as _;
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
/// Executes queued [`Cmd`]s on a fixed number of Tokio tasks and stores one
/// [`CmdResult`] per command.
///
/// Every command receives a submission index when it is queued. The index is
/// kept next to the result and is also copied into `CmdResult::opts`, so the
/// results (stored in completion order) can be put back into submission order
/// with [`CmdManage::sort`].
#[derive(Debug)]
pub struct CmdManage {
    /// Commands waiting to be executed, paired with their submission index.
    queue: Arc<Mutex<VecDeque<(usize, Cmd)>>>,
    /// Finished commands in completion order, paired with their submission index.
    results: Arc<Mutex<Vec<(usize, CmdResult<usize>)>>>,
    /// Number of worker tasks spawned by [`CmdManage::run`].
    workers: usize,
    /// Next submission index to hand out. A dedicated counter (rather than the
    /// current queue length) keeps indices unique even when commands are added
    /// while workers are already draining the queue, or after an earlier run.
    next_index: std::sync::atomic::AtomicUsize,
}

impl CmdManage {
    /// Creates a manager that will run commands on `workers` concurrent tasks.
    ///
    /// Queue and result storage are pre-sized for `workers * 2` entries.
    pub fn new(workers: usize) -> Self {
        CmdManage {
            queue: Arc::new(Mutex::new(VecDeque::with_capacity(workers * 2))),
            results: Arc::new(Mutex::new(Vec::with_capacity(workers * 2))),
            workers,
            next_index: std::sync::atomic::AtomicUsize::new(0),
        }
    }

    /// Appends `cmd` to the back of the queue and tags it with the next
    /// submission index.
    pub async fn add_cmd(&self, cmd: Cmd) {
        let mut queue = self.queue.lock().await;
        // The queue lock is held here, so indices are assigned in queue order;
        // the counter itself only needs to be atomic, not ordered.
        let index = self
            .next_index
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        queue.push_back((index, cmd));
    }

    /// Spawns `workers` Tokio tasks that drain the queue, then waits until
    /// every task has reported completion.
    ///
    /// A command that fails to run does not abort the batch: it is recorded as
    /// a `CmdResult` with `status == false` and the error text as `content`.
    /// With zero workers nothing is executed and `Ok(())` is returned.
    ///
    /// # Errors
    ///
    /// Returns an error if a worker ends without sending its completion
    /// signal (for example because it panicked).
    pub async fn run(&self) -> crate::Result<()> {
        // A bounded Tokio channel cannot be created with capacity 0.
        if self.workers == 0 {
            return Ok(());
        }

        // Capacity equals the number of workers, so the single completion
        // signal each worker sends never has to wait for free space.
        let (tx, mut rx) = mpsc::channel(self.workers);

        for _ in 0..self.workers {
            let queue = Arc::clone(&self.queue);
            let results = Arc::clone(&self.results);
            let tx = tx.clone();
            tokio::spawn(async move {
                loop {
                    // Release the queue lock before running the command so
                    // the other workers can keep pulling jobs.
                    let job = {
                        let mut pending = queue.lock().await;
                        pending.pop_front()
                    };
                    match job {
                        Some((index, cmd)) => {
                            let entry = Self::execute(index, cmd).await;
                            results.lock().await.push(entry);
                        }
                        None => {
                            if tx.send(()).await.is_err() {
                                eprintln!("Failed to send completion signal");
                            }
                            break;
                        }
                    }
                }
            });
        }

        // Only the workers may keep the channel open: once this handle is
        // gone, `recv` yields `None` instead of waiting forever when a worker
        // dies before signalling.
        drop(tx);

        for _ in 0..self.workers {
            rx.recv()
                .await
                .ok_or("Failed to receive completion signal")?;
        }
        Ok(())
    }

    /// Runs one command and converts its outcome into an indexed result.
    async fn execute(index: usize, cmd: Cmd) -> (usize, CmdResult<usize>) {
        let output = cmd.a_output().await;
        let result = match output {
            Ok(CmdOutput { stdout, status, .. }) => CmdResult {
                content: stdout,
                status: status.success(),
                opts: index,
            },
            Err(e) => CmdResult {
                content: e.to_string(),
                status: false,
                opts: index,
            },
        };
        (index, result)
    }

    /// Returns a copy of every stored result, in the current storage order,
    /// without the submission indices.
    pub async fn get_results(&self) -> Vec<CmdResult<usize>> {
        self.results
            .lock()
            .await
            .iter()
            .map(|(_, result)| result.clone())
            .collect()
    }

    /// Returns a copy of every stored `(submission index, result)` pair, in
    /// the current storage order.
    pub async fn get_full_results(&self) -> Vec<(usize, CmdResult<usize>)> {
        self.results.lock().await.clone()
    }

    /// Sorts the stored results in place by submission index and returns
    /// `self` so a getter can be chained.
    pub async fn sort(&mut self) -> &mut Self {
        self.results
            .lock()
            .await
            .par_sort_unstable_by_key(|&(index, _)| index);
        self
    }
}
}
pub mod sync {
use crate::system::cmd::{Cmd, CmdOutput, CmdResult};
use rayon::slice::ParallelSliceMut as _;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex as StdMutex};
use std::thread;
/// Executes queued [`Cmd`]s on a fixed number of OS threads and stores one
/// [`CmdResult`] per command.
///
/// Every command receives a submission index when it is queued. The index is
/// kept next to the result and is also copied into `CmdResult::opts`, so the
/// results (stored in completion order) can be put back into submission order
/// with [`CmdManage::sort`].
#[derive(Debug)]
pub struct CmdManage {
    /// Commands waiting to be executed, paired with their submission index.
    queue: Arc<StdMutex<VecDeque<(usize, Cmd)>>>,
    /// Finished commands in completion order, paired with their submission index.
    results: Arc<StdMutex<Vec<(usize, CmdResult<usize>)>>>,
    /// Number of worker threads spawned by [`CmdManage::run`].
    workers: usize,
    /// Next submission index to hand out. A dedicated counter (rather than the
    /// current queue length) keeps indices unique even when commands are added
    /// while workers are already draining the queue, or after an earlier run.
    next_index: std::sync::atomic::AtomicUsize,
}

impl CmdManage {
    /// Creates a manager that will run commands on `workers` threads.
    ///
    /// Queue and result storage are pre-sized for `workers * 2` entries.
    pub fn new(workers: usize) -> Self {
        CmdManage {
            queue: Arc::new(StdMutex::new(VecDeque::with_capacity(workers * 2))),
            results: Arc::new(StdMutex::new(Vec::with_capacity(workers * 2))),
            workers,
            next_index: std::sync::atomic::AtomicUsize::new(0),
        }
    }

    /// Appends `cmd` to the back of the queue and tags it with the next
    /// submission index.
    ///
    /// # Errors
    ///
    /// Returns an error if the queue lock is poisoned.
    pub fn add_cmd(&self, cmd: Cmd) -> crate::Result<()> {
        let mut queue = self.queue.lock()?;
        // The queue lock is held here, so indices are assigned in queue order;
        // the counter itself only needs to be atomic, not ordered.
        let index = self
            .next_index
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        queue.push_back((index, cmd));
        Ok(())
    }

    /// Spawns `workers` threads that drain the queue, then blocks until every
    /// thread has reported completion.
    ///
    /// A command that fails to run does not abort the batch: it is recorded as
    /// a `CmdResult` with `status == false` and the error text as `content`.
    /// With zero workers nothing is executed and `Ok(())` is returned.
    ///
    /// # Errors
    ///
    /// Returns an error if a worker ends without sending its completion
    /// signal (for example because it panicked, which includes hitting a
    /// poisoned queue or results lock).
    pub fn run(&self) -> crate::Result<()> {
        let (tx, rx) = std::sync::mpsc::channel();

        for _ in 0..self.workers {
            let queue = Arc::clone(&self.queue);
            let results = Arc::clone(&self.results);
            let tx = tx.clone();
            thread::spawn(move || loop {
                // Release the queue lock before running the command so the
                // other workers can keep pulling jobs.
                let job = {
                    let mut pending = queue.lock().unwrap();
                    pending.pop_front()
                };
                match job {
                    Some((index, cmd)) => {
                        let entry = Self::execute(index, cmd);
                        results.lock().unwrap().push(entry);
                    }
                    None => {
                        if tx.send(()).is_err() {
                            eprintln!("Failed to send completion signal");
                        }
                        break;
                    }
                }
            });
        }

        // Only the workers may keep the channel open: once this handle is
        // gone, `recv` fails instead of blocking forever when a worker dies
        // before signalling.
        drop(tx);

        for _ in 0..self.workers {
            rx.recv()
                .map_err(|_| "Failed to receive completion signal")?;
        }
        Ok(())
    }

    /// Runs one command and converts its outcome into an indexed result.
    fn execute(index: usize, cmd: Cmd) -> (usize, CmdResult<usize>) {
        let output = cmd.output();
        let result = match output {
            Ok(CmdOutput { stdout, status, .. }) => CmdResult {
                content: stdout,
                status: status.success(),
                opts: index,
            },
            Err(e) => CmdResult {
                content: e.to_string(),
                status: false,
                opts: index,
            },
        };
        (index, result)
    }

    /// Returns a copy of every stored result, in the current storage order,
    /// without the submission indices.
    ///
    /// Returns an empty vector if the results lock is poisoned.
    pub fn get_results(&self) -> Vec<CmdResult<usize>> {
        self.results.lock().map_or_else(
            |_| Vec::new(),
            |guard| guard.iter().map(|(_, result)| result.clone()).collect(),
        )
    }

    /// Sorts the stored results in place by submission index and returns
    /// `self` so a getter can be chained.
    ///
    /// The results are left untouched if the lock is poisoned.
    pub fn sort(&mut self) -> &mut Self {
        if let Ok(mut results) = self.results.lock() {
            results.par_sort_unstable_by_key(|&(index, _)| index);
        }
        self
    }

    /// Returns a copy of every stored `(submission index, result)` pair, in
    /// the current storage order.
    ///
    /// Returns an empty vector if the results lock is poisoned.
    pub fn get_full_results(&self) -> Vec<(usize, CmdResult<usize>)> {
        self.results
            .lock()
            .map_or_else(|_| Vec::new(), |guard| guard.clone())
    }
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cmd::{Cmd, ExeType};
use std::time::Duration;
#[cfg(feature = "tokio")]
mod async_tests {
    use crate::cmd::ExeType;
    use std::sync::Arc;
    use super::*;

    /// Builds an `echo` command with its type set to `ExeType::AutoShell`
    /// and `text` as its single argument.
    fn echo(text: String) -> Cmd {
        Cmd::new("echo").set_type(ExeType::AutoShell).arg(text)
    }

    /// Running with nothing queued must succeed and produce no results.
    #[tokio::test]
    async fn test_empty_queue() {
        let manager = a_sync::CmdManage::new(2);
        manager.run().await.unwrap();
        let collected = manager.get_results().await;
        assert_eq!(collected.len(), 0, "Expected empty results for empty queue");
    }

    /// One queued command yields exactly one result carrying its output.
    #[tokio::test]
    async fn test_single_command() {
        let manager = a_sync::CmdManage::new(1);
        manager.add_cmd(echo("Single".to_string())).await;
        manager.run().await.unwrap();
        let collected = manager.get_results().await;
        assert_eq!(collected.len(), 1, "Expected one result");
        assert_eq!(collected[0].content, "Single");
    }

    /// After sorting, each result's output matches its submission index.
    #[tokio::test]
    async fn test_many_commands() {
        let mut manager = a_sync::CmdManage::new(4);
        for n in 0..100 {
            manager.add_cmd(echo(n.to_string())).await;
        }
        manager.run().await.unwrap();
        let indexed = manager.sort().await.get_full_results().await;
        assert_eq!(indexed.len(), 100, "Expected 100 results");
        indexed.into_iter().for_each(|(idx, outcome)| {
            assert_eq!(outcome.content.trim(), idx.to_string());
        });
    }

    /// A command that cannot succeed is still recorded, with a failed status.
    #[tokio::test]
    async fn test_error_handling() {
        let manager = a_sync::CmdManage::new(2);
        manager.add_cmd(Cmd::new("non_existent_command")).await;
        manager.run().await.unwrap();
        let collected = manager.get_results().await;
        assert_eq!(collected.len(), 1, "Expected one result");
        let only = &collected[0];
        assert!(!only.status, "Expected command to fail");
        // NOTE(review): the message talks about an error message, yet the
        // assertion requires `content` to be empty - confirm which is meant.
        assert!(only.content.is_empty(), "Expected error message");
    }

    /// A failing command does not prevent a succeeding one from being recorded.
    #[tokio::test]
    async fn test_mixed_success_and_failure() {
        let mut manager = a_sync::CmdManage::new(2);
        let succeeding = echo("Success".to_string());
        let failing = Cmd::new("adwjaio").set_type(ExeType::AutoShell);
        manager.add_cmd(succeeding).await;
        manager.add_cmd(failing).await;
        manager.run().await.unwrap();
        let collected = manager.sort().await.get_results().await;
        assert_eq!(collected.len(), 2, "Expected two results");
        assert!(collected[0].status, "Expected first command to succeed");
        assert!(!collected[1].status, "Expected second command to fail");
    }

    /// Adds commands from one task while another task runs the manager; only
    /// loose bounds are checked because the interleaving is not deterministic.
    #[tokio::test]
    async fn test_concurrent_add_and_run() {
        let shared = Arc::new(a_sync::CmdManage::new(4));

        let producer = Arc::clone(&shared);
        let add_task = tokio::spawn(async move {
            for n in 0..50 {
                producer.add_cmd(echo(n.to_string())).await;
                tokio::time::sleep(Duration::from_millis(1)).await;
            }
        });

        let runner = Arc::clone(&shared);
        let run_task = tokio::spawn(async move {
            // Give the producer a head start so the queue is not empty.
            tokio::time::sleep(Duration::from_millis(10)).await;
            runner.run().await.unwrap();
        });

        tokio::try_join!(add_task, run_task).unwrap();

        let collected = shared.get_results().await;
        assert!(!collected.is_empty(), "Expected some results");
        assert!(collected.len() <= 50, "Expected no more than 50 results");
    }
}
mod sync_tests {
use crate::cmd::ExeType;
use super::*;
#[test]
fn test_empty_queue() {
let cmd_manage = sync::CmdManage::new(2);
cmd_manage.run().unwrap();
let results = cmd_manage.get_results();
assert_eq!(results.len(), 0, "Expected empty results for empty queue");
}
#[test]
fn test_single_command() {
let cmd_manage = sync::CmdManage::new(1);
cmd_manage
.add_cmd(
Cmd::new("echo")
.set_type(ExeType::AutoShell)
.arg("Single".to_string()),
)
.unwrap();
cmd_manage.run().unwrap();
let results = cmd_manage.get_results();
assert_eq!(results.len(), 1, "Expected one result");
assert_eq!(results[0].content, "Single");
}
#[test]
fn test_many_commands() {
let mut cmd_manage = sync::CmdManage::new(4);
for i in 0..100 {
cmd_manage
.add_cmd(
Cmd::new("echo")
.set_type(ExeType::AutoShell)
.arg(i.to_string()),
)
.unwrap();
}
cmd_manage.run().unwrap();
let results = cmd_manage.sort().get_full_results();
assert_eq!(results.len(), 100, "Expected 100 results");
for (i, result) in results.iter() {
assert_eq!(result.content, i.to_string());
}
}
#[test]
fn test_error_handling() {
let cmd_manage = sync::CmdManage::new(2);
cmd_manage
.add_cmd(Cmd::new("non_existent_command"))
.unwrap();
cmd_manage.run().unwrap();
let results = cmd_manage.get_results();
assert_eq!(results.len(), 1, "Expected one result");
assert!(!results[0].status, "Expected command to fail");
assert!(results[0].content.is_empty(), "Expected error message");
}
#[test]
fn test_mixed_success_and_failure() {
let mut cmd_manage = sync::CmdManage::new(2);
cmd_manage
.add_cmd(
Cmd::new("echo")
.set_type(ExeType::AutoShell)
.arg("Success".to_string()),
)
.unwrap();
cmd_manage
.add_cmd(Cmd::new("non_existent_command"))
.unwrap();
cmd_manage.run().unwrap();
let results = cmd_manage.sort().get_results();
assert_eq!(results.len(), 2, "Expected two results");
assert!(results[0].status, "Expected first command to succeed");
assert!(!results[1].status, "Expected second command to fail");
}
}
/// Queues 1000 `echo` commands on 8 workers and checks the whole batch
/// finishes in under ten seconds.
// NOTE(review): the bound is wall-clock time, so this may be flaky on slow or
// heavily loaded machines.
#[test]
fn test_performance() {
    use std::time::Instant;

    let manager = sync::CmdManage::new(8);
    (0..1000).for_each(|_| {
        let job = Cmd::new("echo")
            .set_type(ExeType::AutoShell)
            .arg("Performance test".to_string());
        manager.add_cmd(job).unwrap();
    });

    // Only the execution phase is timed; queueing happens before the clock starts.
    let started = Instant::now();
    manager.run().unwrap();
    let elapsed = started.elapsed();

    println!("Time taken to run 1000 commands: {:?}", elapsed);
    assert!(
        elapsed < Duration::from_secs(10),
        "Performance test took too long"
    );
}
}