#[cfg(test)]
mod tests {
use aex::{
communicators::event::Event,
connection::{
context::{Context, TypeMap, TypeMapExt},
global::GlobalContext,
},
};
use futures::FutureExt;
use std::{
io::Cursor,
net::SocketAddr,
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
};
use tokio::io::{self, AsyncBufRead, AsyncWrite, BufReader};
/// Exercises the `TypeMapExt` helpers on a standalone `TypeMap`: storing a
/// value, overwriting it, missing on an absent type, and round-tripping a
/// custom struct.
#[test]
fn test_typemap_ext() {
    // Locally scoped payload type used for the custom-struct round trip.
    #[derive(Clone, PartialEq, Debug)]
    struct User {
        id: u64,
    }

    let store = TypeMap::default();

    // Writing the same type twice: each read must observe the latest write.
    for stored in [42i32, 100i32] {
        store.set_value(stored);
        assert_eq!(store.get_value::<i32>(), Some(stored));
    }

    // Nothing of type `String` has been stored yet.
    assert_eq!(store.get_value::<String>(), None);

    let user = User { id: 1 };
    store.set_value(user.clone());
    assert_eq!(store.get_value::<User>(), Some(user));
}
/// A freshly constructed `GlobalContext` reports the address it was built with.
#[test]
fn test_global_context_init() {
    let expected_addr = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();
    let context = GlobalContext::new(expected_addr, None);
    assert_eq!(context.addr, expected_addr);
}
/// Builds a `Context` over in-memory buffers and checks that a value stored in
/// `ctx.local` is readable directly and through the `req()` and `res()` views.
#[tokio::test]
async fn test_context_flow() {
    let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
    let global = Arc::new(GlobalContext::new(addr, None));

    // In-memory stand-ins for the connection's read and write halves.
    let reader: Option<Box<dyn AsyncBufRead + Send + Unpin + Sync>> =
        Some(Box::new(BufReader::new(Cursor::new(vec![0u8; 10]))));
    let writer: Option<Box<dyn AsyncWrite + Send + Unpin + Sync>> =
        Some(Box::new(Cursor::new(vec![0u8; 10])));

    // `global` is not used again, so the Arc is moved instead of cloned.
    let mut ctx = Context::new(reader, writer, global, addr);
    assert_eq!(ctx.addr, addr);

    let expected = Some("request_scoped".to_string());
    ctx.local.set_value("request_scoped".to_string());
    assert_eq!(ctx.local.get_value::<String>(), expected);

    // Each view lives in its own scope so it is dropped before the next one
    // is created.
    {
        let req_view = ctx.req();
        assert_eq!(req_view.local.get_value::<String>(), expected);
    }
    {
        let res_view = ctx.res();
        assert_eq!(res_view.local.get_value::<String>(), expected);
        // NOTE(review): presumably this only checks that the response view
        // exposes its writer; the field's type is not visible here — confirm.
        let _lock = res_view.writer;
    }
}
/// Stores a `usize` in a context's local type map and reads it back.
///
/// NOTE(review): despite the name, this test performs no concurrent access;
/// it only checks a single set/get round trip.
#[tokio::test]
async fn test_context_concurrency() {
    let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
    let global = Arc::new(GlobalContext::new(addr, None));

    // In-memory stand-ins for the connection's read and write halves.
    let reader: Option<Box<dyn AsyncBufRead + Send + Unpin + Sync>> =
        Some(Box::new(BufReader::new(Cursor::new(vec![0u8; 10]))));
    let writer: Option<Box<dyn AsyncWrite + Send + Unpin + Sync>> =
        Some(Box::new(Cursor::new(vec![0u8; 10])));

    // `global` is not used again, so the Arc is moved instead of cloned.
    let mut ctx = Context::new(reader, writer, global, addr);

    // A typed literal replaces the needless `as usize` cast.
    ctx.local.set_value(99_usize);
    assert_eq!(ctx.local.get_value::<usize>(), Some(99));
}
/// Integration test: registers one handler on each of the global `event`,
/// `pipe` and `spread` communicators, builds a `Context` over a duplex stream,
/// then triggers each communicator through `ctx.global` and checks that every
/// handler ran exactly once.
#[tokio::test]
async fn test_context_full_flow() {
    let addr = "127.0.0.1:8080".parse().unwrap();
    let mut global = GlobalContext::new(addr, None);
    global.set_server_name("Aex".to_string());

    // One invocation counter per communicator.
    let event_counter = Arc::new(AtomicUsize::new(0));
    let pipe_counter = Arc::new(AtomicUsize::new(0));
    let spread_counter = Arc::new(AtomicUsize::new(0));

    // Event handler: counts "request_received" notifications.
    let ec = Arc::clone(&event_counter);
    Event::_on(
        &global.event,
        "request_received".to_string(),
        Arc::new(move |req_id: u32| {
            // Each call needs its own handle because the future takes ownership.
            let c = Arc::clone(&ec);
            (async move {
                println!("Event 收到请求 ID: {}", req_id);
                c.fetch_add(1, Ordering::SeqCst);
            })
            .boxed()
        }),
    )
    .await;

    // Pipe handler: counts "audit_log" messages.
    let pc = Arc::clone(&pipe_counter);
    global
        .pipe
        .register(
            "audit_log",
            Box::new(move |msg: String| {
                let c = Arc::clone(&pc);
                (async move {
                    println!("Pipe 审计日志: {}", msg);
                    c.fetch_add(1, Ordering::SeqCst);
                })
                .boxed()
            }),
        )
        .await
        .unwrap();

    // Spread subscriber: counts "broadcast" publications.
    let sc = Arc::clone(&spread_counter);
    global
        .spread
        .subscribe(
            "broadcast",
            Box::new(move |val: i32| {
                let c = Arc::clone(&sc);
                (async move {
                    println!("Spread 广播接收: {}", val);
                    c.fetch_add(1, Ordering::SeqCst);
                })
                .boxed()
            }),
        )
        .await
        .unwrap();

    // `_server` is bound (not discarded) so the peer half of the duplex stays
    // open for the whole test.
    let (client, _server) = io::duplex(64);
    let (reader, writer) = io::split(client);
    let remote_addr = "192.168.1.100:12345".parse().unwrap();
    let reader: Option<Box<dyn AsyncBufRead + Send + Unpin + Sync>> =
        Some(Box::new(BufReader::new(reader)));
    let writer: Option<Box<dyn AsyncWrite + Send + Unpin + Sync>> = Some(Box::new(writer));

    // Wrap the configured global context once; the original cloned a temporary
    // Arc and immediately dropped it.
    let ctx = Context::new(reader, writer, Arc::new(global), remote_addr);

    // NOTE(review): the assertion treats `elapsed()` as milliseconds — confirm
    // against the `Context` implementation.
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    let time_spent = ctx.elapsed();
    assert!(
        time_spent >= 100,
        "Elapsed 应该大于 100ms, 当前: {}ms",
        time_spent
    );

    // Drive all three communicators through the handle stored in the context.
    let global = ctx.global.clone();
    global
        .event
        .notify("request_received".to_string(), 1024_u32)
        .await;
    global
        .pipe
        .send(
            "audit_log",
            format!("Client {} processed in {}ms", ctx.addr, time_spent),
        )
        .await
        .unwrap();
    global.spread.publish("broadcast", 200_i32).await.unwrap();

    // Grace period before reading the counters; presumably the handlers run
    // asynchronously — confirm against the communicator implementations.
    tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

    assert_eq!(
        event_counter.load(Ordering::SeqCst),
        1,
        "Event 应该触发一次"
    );
    assert_eq!(
        pipe_counter.load(Ordering::SeqCst),
        1,
        "Pipe 应该处理一条日志"
    );
    assert_eq!(
        spread_counter.load(Ordering::SeqCst),
        1,
        "Spread 应该收到一个广播"
    );
    println!("Context 集成功能全数测试通过!");
}
/// Sample payload type stored in and read back from a context by the
/// type-map extension test.
#[derive(Debug, PartialEq, Clone)]
struct UserConfig {
    /// Numeric identifier of the user.
    id: u64,
    /// Role name, e.g. `"admin"`.
    role: String,
}
use tokio::io::{empty, sink};
#[tokio::test]
async fn test_context_type_map_extensions() {
let reader_opt: Option<Box<dyn AsyncBufRead + Send + Unpin + Sync>> =
Some(Box::new(tokio::io::BufReader::new(empty())));
let writer_opt: Option<Box<dyn AsyncWrite + Send + Unpin + Sync>> = Some(Box::new(sink()));
let global = Arc::new(GlobalContext::new("127.0.0.1:8080".parse().unwrap(), None));
let addr = "127.0.0.1:1234".parse().unwrap();
let mut ctx = Context::new(reader_opt, writer_opt, global, addr);
let test_msg = "AexServerExtension".to_string();
ctx.set(test_msg.clone());
let retrieved_msg = ctx.get::<String>();
assert_eq!(retrieved_msg, Some(test_msg));
let config = UserConfig {
id: 1024,
role: "admin".to_string(),
};
ctx.set(config.clone());
let retrieved_config = ctx.get::<UserConfig>();
assert_eq!(retrieved_config, Some(config));
let non_existent = ctx.get::<u32>();
assert!(non_existent.is_none());
ctx.set(42u64);
ctx.set(99u64); assert_eq!(ctx.get::<u64>(), Some(99u64));
}
/// Stores an `Arc` in a `TypeMap` and checks, via the strong count, that
/// `get_value` returns another handle to the same data.
///
/// This is a plain synchronous test: the body awaits nothing, so it does not
/// need an async runtime.
#[test]
fn test_type_map_concurrency_cloning() {
    let map = TypeMap::default();
    let shared_data = Arc::new(vec![1, 2, 3]);
    map.set_value(Arc::clone(&shared_data));

    let retrieved = map.get_value::<Arc<Vec<i32>>>().expect("Should exist");
    assert_eq!(*retrieved, vec![1, 2, 3]);

    // Three live handles: `shared_data`, the one held by the map, and
    // `retrieved`.
    assert_eq!(Arc::strong_count(&retrieved), 3);
}
}