use std::io::{Read, Write};
use std::time::Duration;
macro_rules! llam_test {
($name:ident, $body:block) => {
#[test]
fn $name() -> llam::Result<()> {
llam::test(|| $body)
}
};
($name:ident, profile = $profile:expr, $body:block) => {
#[test]
fn $name() -> llam::Result<()> {
llam::test_with_profile($profile, || $body)
}
};
}
llam_test!(spawn_and_channel, {
let (tx, rx) = llam::channel::bounded::<u32>(8)?;
let handle = llam::spawn!(move {
tx.send(42).expect("send failed");
7u32
});
assert_eq!(rx.recv()?, 42);
assert_eq!(handle.join().expect("join failed"), 7);
Ok(())
});
llam_test!(try_spawn_task_batch_and_closed_channel, {
let handle = llam::try_spawn!({ 5u32 })?;
assert_eq!(handle.join().expect("try_spawn join failed"), 5);
let mut batch = llam::TaskBatch::with_capacity(4);
for i in 0..4u32 {
batch.spawn(move || i + 1)?;
}
assert_eq!(
batch.join().expect("task batch join failed"),
vec![1, 2, 3, 4]
);
let (tx, rx) = llam::channel::bounded::<u32>(1)?;
tx.try_send(9).expect("try_send failed");
assert_eq!(rx.try_recv_option()?, Some(9));
drop(tx);
assert_eq!(rx.recv_option()?, None);
let selected = llam::select! {
recv(rx) -> msg => {
let _ = msg.expect("select recv failed");
1
},
closed(rx) => {
2
},
};
assert_eq!(selected, 2);
Ok(())
});
llam_test!(select_timeout, profile = llam::Profile::DebugSafe, {
let (_tx, rx) = llam::channel::bounded::<u32>(1)?;
let mut timed_out = false;
llam::select! {
recv(rx) -> msg => {
let _ = msg;
},
after(Duration::from_millis(1)) => {
timed_out = true;
},
}
assert!(timed_out);
Ok(())
});
llam_test!(select_default_and_mixed_send_recv, {
let (_tx, rx) = llam::channel::bounded::<u32>(1)?;
let defaulted = llam::select! {
recv(rx) -> msg => {
let _ = msg;
false
},
default => {
true
},
};
assert!(defaulted);
let (tx, rx) = llam::channel::bounded::<u32>(1)?;
let selected = llam::select! {
recv(rx) -> msg => {
msg.expect("select recv failed")
},
send(tx, 11u32) => {
11
},
default => {
0
},
};
assert_eq!(selected, 11);
assert_eq!(rx.recv()?, 11);
let (_t0, r0) = llam::channel::bounded::<u32>(1)?;
let (_t1, r1) = llam::channel::bounded::<u32>(1)?;
let (_t2, r2) = llam::channel::bounded::<u32>(1)?;
let (_t3, r3) = llam::channel::bounded::<u32>(1)?;
let (_t4, r4) = llam::channel::bounded::<u32>(1)?;
let any = llam::select! {
recv(r0) -> value => value.expect("select recv failed"),
recv(r1) -> value => value.expect("select recv failed"),
recv(r2) -> value => value.expect("select recv failed"),
recv(r3) -> value => value.expect("select recv failed"),
recv(r4) -> value => value.expect("select recv failed"),
default => { 123 },
};
assert_eq!(any, 123);
Ok(())
});
llam_test!(task_local_roundtrip, {
let key = llam::TaskLocalKey::<String>::new()?;
key.set("main".to_string())?;
assert_eq!(key.get_cloned()?.as_deref(), Some("main"));
let child_key = key.clone();
let handle = llam::spawn!(move {
assert!(child_key.get_cloned().unwrap().is_none());
child_key.set("child".to_string()).unwrap();
child_key.take().unwrap()
});
assert_eq!(
handle.join().expect("join failed").as_deref(),
Some("child")
);
assert_eq!(key.take()?.as_deref(), Some("main"));
assert!(key.get_cloned()?.is_none());
let scoped = key.with("scoped".to_string(), || key.get_cloned().unwrap().unwrap())?;
assert_eq!(scoped, "scoped");
assert!(key.get_cloned()?.is_none());
Ok(())
});
llam_test!(abi_stats_join_and_select_send, {
let abi = llam::AbiInfo::load()?;
assert_eq!(abi.abi_major(), llam::sys::LLAM_ABI_VERSION_MAJOR);
assert!(!llam::abi::version_string().is_empty());
let mut handle = llam::spawn!(move {
llam::time::sleep(Duration::from_millis(5)).unwrap();
99u32
});
assert!(handle.join_until(0).expect("timed join failed").is_none());
assert_eq!(
handle.join_timeout(Duration::from_secs(1)).unwrap(),
Some(99)
);
let (tx, rx) = llam::channel::bounded::<u32>(1)?;
let sent = llam::select! {
send(tx, 7u32) => {
true
},
after(Duration::from_secs(1)) => {
panic!("send select timed out");
},
};
assert!(sent);
assert_eq!(rx.recv()?, 7);
#[cfg(unix)]
{
let path = std::env::temp_dir().join(format!("llam-rs-stats-{}.json", std::process::id()));
let file = std::fs::File::create(&path).map_err(llam::Error::from)?;
llam::diagnostics::write_stats_json(&file)?;
drop(file);
let json = std::fs::read_to_string(&path).map_err(llam::Error::from)?;
let _ = std::fs::remove_file(&path);
assert!(json.contains("ctx_switches"));
}
Ok(())
});
llam_test!(
tcp_stream_connect_uses_llam_path,
profile = llam::Profile::IoLatency,
{
let listener = std::net::TcpListener::bind("127.0.0.1:0").map_err(llam::Error::from)?;
let addr = listener.local_addr().map_err(llam::Error::from)?;
let thread = std::thread::spawn(move || {
let (mut stream, _) = listener.accept().unwrap();
let mut buf = [0u8; 4];
stream.read_exact(&mut buf).unwrap();
stream.write_all(&buf).unwrap();
});
let mut stream = llam::net::TcpStream::connect(addr).map_err(llam::Error::from)?;
stream.write_all(b"ping").map_err(llam::Error::from)?;
let mut buf = [0u8; 4];
stream.read_exact(&mut buf).map_err(llam::Error::from)?;
thread.join().expect("tcp echo thread panicked");
assert_eq!(&buf, b"ping");
Ok(())
}
);
llam_test!(
udp_connected_socket_roundtrip,
profile = llam::Profile::IoLatency,
{
let a = llam::net::UdpSocket::bind("127.0.0.1:0").map_err(llam::Error::from)?;
let b = llam::net::UdpSocket::bind("127.0.0.1:0").map_err(llam::Error::from)?;
a.connect(b.local_addr().map_err(llam::Error::from)?)
.map_err(llam::Error::from)?;
b.connect(a.local_addr().map_err(llam::Error::from)?)
.map_err(llam::Error::from)?;
assert_eq!(a.send(b"pong").map_err(llam::Error::from)?, 4);
let mut buf = [0u8; 4];
assert_eq!(b.recv(&mut buf).map_err(llam::Error::from)?, 4);
assert_eq!(&buf, b"pong");
Ok(())
}
);
#[cfg(unix)]
llam_test!(
unix_stream_and_file_wrappers,
profile = llam::Profile::IoLatency,
{
let dir = std::env::temp_dir();
let sock_path = dir.join(format!("llam-rs-{}.sock", std::process::id()));
let _ = std::fs::remove_file(&sock_path);
let listener = llam::net::UnixListener::bind(&sock_path).map_err(llam::Error::from)?;
let client_path = sock_path.clone();
let thread = std::thread::spawn(move || {
let mut client = std::os::unix::net::UnixStream::connect(client_path).unwrap();
client.write_all(b"unix").unwrap();
});
let mut stream = listener.accept().map_err(llam::Error::from)?;
let mut buf = [0u8; 4];
stream.read_exact(&mut buf).map_err(llam::Error::from)?;
thread.join().expect("unix client thread panicked");
let _ = std::fs::remove_file(&sock_path);
assert_eq!(&buf, b"unix");
let file_path = dir.join(format!("llam-rs-file-{}.txt", std::process::id()));
{
let mut file = llam::fs::File::create(&file_path).map_err(llam::Error::from)?;
file.write_all(b"file").map_err(llam::Error::from)?;
file.flush().map_err(llam::Error::from)?;
}
{
let mut file = llam::fs::File::open(&file_path).map_err(llam::Error::from)?;
let mut text = String::new();
file.read_to_string(&mut text).map_err(llam::Error::from)?;
assert_eq!(text, "file");
}
let _ = std::fs::remove_file(&file_path);
Ok(())
}
);