#[cfg(test)]
mod tests {
use aex::communicators::pipe::PipeManager;
use futures::future::FutureExt;
use std::sync::{
Arc,
atomic::{AtomicUsize, Ordering},
};
/// Registers a `usize` handler under the name `"add_one"`, sends two values
/// through the pipe, and checks that the handler accumulated both of them.
///
/// The handler adds every received value to a shared atomic counter, so after
/// sending `10` and `20` the counter must read exactly `30`.
#[tokio::test]
async fn test_pipe_register_and_send_success() {
    let manager = PipeManager::new();
    let counter = Arc::new(AtomicUsize::new(0));
    let counter_clone = Arc::clone(&counter);

    manager
        .register(
            "add_one",
            Box::new(move |val: usize| {
                // Each invocation gets its own handle so the returned future
                // owns everything it touches.
                let c = Arc::clone(&counter_clone);
                (async move {
                    c.fetch_add(val, Ordering::SeqCst);
                })
                .boxed()
            }),
        )
        .await
        .expect("Register should succeed");

    manager
        .send("add_one", 10usize)
        .await
        .expect("Send 1 should succeed");
    manager
        .send("add_one", 20usize)
        .await
        .expect("Send 2 should succeed");

    // NOTE(review): presumably the handler may still be running after `send`
    // returns — confirm against `PipeManager::send`. Rather than sleeping for
    // a fixed interval (which is timing-dependent and can fail on a loaded
    // machine), poll the counter until it reaches the expected sum, bounded
    // by a generous deadline.
    let expected = 30;
    let deadline = tokio::time::Duration::from_secs(2);
    let _ = tokio::time::timeout(deadline, async {
        while counter.load(Ordering::SeqCst) < expected {
            tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
        }
    })
    .await;

    // The timeout result is deliberately ignored: this assertion reports the
    // actual counter value on failure, which is a better diagnostic than a
    // bare "deadline elapsed".
    assert_eq!(counter.load(Ordering::SeqCst), expected);
}
/// Registering two handlers under the same pipe name must succeed the first
/// time and fail the second time with an error mentioning "already in use".
#[tokio::test]
async fn test_pipe_conflict_error() {
    let manager = PipeManager::new();

    // First registration claims the name.
    let first = manager
        .register("unique_pipe", Box::new(|_: String| (async {}).boxed()))
        .await;
    assert!(first.is_ok());

    // Second registration with the same name has to be rejected;
    // `unwrap_err` panics (failing the test) if it unexpectedly succeeds.
    let conflict = manager
        .register("unique_pipe", Box::new(|_: String| (async {}).boxed()))
        .await
        .unwrap_err();
    assert!(conflict.contains("already in use"));
}
/// Sending an `i32` to a pipe whose handler was registered for `String`
/// must be rejected with an error mentioning "Type mismatch".
#[tokio::test]
async fn test_pipe_type_mismatch() {
    let manager = PipeManager::new();

    manager
        .register("string_pipe", Box::new(|_: String| (async {}).boxed()))
        .await
        .unwrap();

    // `unwrap_err` panics (failing the test) if the send unexpectedly succeeds.
    let mismatch = manager.send("string_pipe", 123i32).await.unwrap_err();
    assert!(mismatch.contains("Type mismatch"));
}
/// Sending to a pipe name that was never registered must fail with an error
/// mentioning "not registered".
#[tokio::test]
async fn test_pipe_not_found() {
    let manager = PipeManager::new();

    // `unwrap_err` panics (failing the test) if the send unexpectedly succeeds.
    let missing = manager
        .send("ghost_pipe", String::from("hello"))
        .await
        .unwrap_err();
    assert!(missing.contains("not registered"));
}
/// Spawns several tasks that all try to register the same pipe name at once
/// and checks that exactly one registration wins while every other one fails.
///
/// A `tokio::sync::Barrier` holds each task back until all of them have been
/// spawned, so the `register` calls are issued as close together as possible.
#[tokio::test]
async fn test_pipe_registration_race_condition_fixed() {
    let pipe_name = "race_pipe";
    let num_tasks = 10;
    let manager = Arc::new(PipeManager::new());
    let barrier = Arc::new(tokio::sync::Barrier::new(num_tasks));

    // One task per contender; each waits on the barrier before registering.
    let handles: Vec<_> = (0..num_tasks)
        .map(|_| {
            let mgr = Arc::clone(&manager);
            let gate = Arc::clone(&barrier);
            tokio::spawn(async move {
                gate.wait().await;
                mgr.register(pipe_name, Box::new(|_: String| (async {}).boxed()))
                    .await
            })
        })
        .collect();

    // Collect every task's registration result; a panicked task fails the test.
    let mut outcomes = Vec::with_capacity(num_tasks);
    for handle in handles {
        outcomes.push(handle.await.unwrap());
    }

    // Every outcome is either Ok or Err, so the failures are the remainder.
    let ok_count = outcomes.iter().filter(|r| r.is_ok()).count();
    let err_count = outcomes.len() - ok_count;
    assert_eq!(ok_count, 1);
    assert_eq!(err_count, num_tasks - 1);

    // Informational only: report whether any loser saw the race-specific
    // error text. It is printed, not asserted.
    let has_race_msg = outcomes
        .iter()
        .any(|r| matches!(r, Err(e) if e.contains("conflict during race condition")));
    println!("是否存在竞态冲突报错: {}", has_race_msg);
}
}