use std::io::{Read, Write};
use std::time::Duration;
#[cfg(unix)]
use std::os::fd::AsRawFd;
#[cfg(windows)]
use std::os::windows::io::AsRawSocket;
macro_rules! llam_test {
($name:ident, $body:block) => {
#[test]
fn $name() -> llam::Result<()> {
llam::test(|| $body)
}
};
($name:ident, profile = $profile:expr, $body:block) => {
#[test]
fn $name() -> llam::Result<()> {
llam::test_with_profile($profile, || $body)
}
};
}
llam_test!(spawn_and_channel, {
let (tx, rx) = llam::channel::bounded::<u32>(8)?;
let handle = llam::spawn!(move {
tx.send(42).expect("send failed");
7u32
});
assert_eq!(rx.recv()?, 42);
assert_eq!(handle.join().expect("join failed"), 7);
Ok(())
});
llam_test!(try_spawn_task_batch_and_closed_channel, {
let handle = llam::try_spawn!({ 5u32 })?;
assert_eq!(handle.join().expect("try_spawn join failed"), 5);
let mut batch = llam::TaskBatch::with_capacity(4);
for i in 0..4u32 {
batch.spawn(move || i + 1)?;
}
assert_eq!(
batch.join().expect("task batch join failed"),
vec![1, 2, 3, 4]
);
let (tx, rx) = llam::channel::bounded::<u32>(1)?;
tx.try_send(9).expect("try_send failed");
assert_eq!(rx.try_recv_option()?, Some(9));
drop(tx);
assert_eq!(rx.recv_option()?, None);
let selected = llam::select! {
recv(rx) -> msg => {
let _ = msg.expect("select recv failed");
1
},
closed(rx) => {
2
},
};
assert_eq!(selected, 2);
Ok(())
});
llam_test!(select_timeout, profile = llam::Profile::DebugSafe, {
let (_tx, rx) = llam::channel::bounded::<u32>(1)?;
let mut timed_out = false;
llam::select! {
recv(rx) -> msg => {
let _ = msg;
},
after(Duration::from_millis(1)) => {
timed_out = true;
},
}
assert!(timed_out);
Ok(())
});
llam_test!(select_default_and_mixed_send_recv, {
let (_tx, rx) = llam::channel::bounded::<u32>(1)?;
let defaulted = llam::select! {
recv(rx) -> msg => {
let _ = msg;
false
},
default => {
true
},
};
assert!(defaulted);
let (tx, rx) = llam::channel::bounded::<u32>(1)?;
let selected = llam::select! {
recv(rx) -> msg => {
msg.expect("select recv failed")
},
send(tx, 11u32) => {
11
},
default => {
0
},
};
assert_eq!(selected, 11);
assert_eq!(rx.recv()?, 11);
let (_t0, r0) = llam::channel::bounded::<u32>(1)?;
let (_t1, r1) = llam::channel::bounded::<u32>(1)?;
let (_t2, r2) = llam::channel::bounded::<u32>(1)?;
let (_t3, r3) = llam::channel::bounded::<u32>(1)?;
let (_t4, r4) = llam::channel::bounded::<u32>(1)?;
let any = llam::select! {
recv(r0) -> value => value.expect("select recv failed"),
recv(r1) -> value => value.expect("select recv failed"),
recv(r2) -> value => value.expect("select recv failed"),
recv(r3) -> value => value.expect("select recv failed"),
recv(r4) -> value => value.expect("select recv failed"),
default => { 123 },
};
assert_eq!(any, 123);
let try_selected = llam::try_select! {
recv(r0) -> value => value.expect("try_select recv failed"),
default => { 99 },
}?;
assert_eq!(try_selected, 99);
Ok(())
});
llam_test!(raw_select_and_raw_accept_addr, {
let raw_channel = unsafe { llam::sys::llam_channel_create(1) };
assert!(!raw_channel.is_null());
let boxed = Box::new(1234u32);
let value_ptr = Box::into_raw(boxed).cast::<std::ffi::c_void>();
let send_rc = unsafe { llam::sys::llam_channel_send(raw_channel, value_ptr) };
assert_eq!(send_rc, 0);
let mut recv_ptr = std::ptr::null_mut();
let mut ops = [llam::sys::llam_select_op_t {
kind: llam::sys::LLAM_SELECT_OP_RECV,
reserved0: 0,
channel: raw_channel,
send_value: std::ptr::null_mut(),
recv_out: &mut recv_ptr,
result_errno: 0,
}];
let selected = unsafe { llam::channel::select_raw(&mut ops, u64::MAX)? };
assert_eq!(selected.selected, 0);
assert_eq!(selected.result_errno, 0);
assert_eq!(unsafe { *Box::from_raw(recv_ptr.cast::<u32>()) }, 1234);
unsafe {
let _ = llam::sys::llam_channel_destroy(raw_channel);
}
let std_listener = std::net::TcpListener::bind("127.0.0.1:0").map_err(llam::Error::from)?;
std_listener
.set_nonblocking(true)
.map_err(llam::Error::from)?;
let addr = std_listener.local_addr().map_err(llam::Error::from)?;
let thread = std::thread::spawn(move || {
let mut stream = std::net::TcpStream::connect(addr).unwrap();
stream.write_all(b"addr").unwrap();
});
let accepted =
llam::io::accept_with_addr(raw_listener_fd(&std_listener)).map_err(llam::Error::from)?;
assert!(accepted.addr.is_some());
let mut stream = unsafe { std_stream_from_fd(accepted.fd) };
let mut buf = [0u8; 4];
stream.read_exact(&mut buf).map_err(llam::Error::from)?;
thread.join().expect("tcp raw accept client panicked");
assert_eq!(&buf, b"addr");
Ok(())
});
llam_test!(task_local_roundtrip, {
let key = llam::TaskLocalKey::<String>::new()?;
key.set("main".to_string())?;
assert!(key.is_set()?);
assert_eq!(key.get_cloned()?.as_deref(), Some("main"));
assert_eq!(key.replace("next".to_string())?.as_deref(), Some("main"));
assert_eq!(key.get_cloned()?.as_deref(), Some("next"));
let child_key = key.clone();
let handle = llam::spawn!(move {
assert!(child_key.get_cloned().unwrap().is_none());
child_key.set("child".to_string()).unwrap();
child_key.take().unwrap()
});
assert_eq!(
handle.join().expect("join failed").as_deref(),
Some("child")
);
assert_eq!(key.take()?.as_deref(), Some("next"));
assert!(key.get_cloned()?.is_none());
assert!(!key.is_set()?);
key.set("root".to_string())?;
let scoped = key.with("scoped".to_string(), || key.get_cloned().unwrap().unwrap())?;
assert_eq!(scoped, "scoped");
assert_eq!(key.get_cloned()?.as_deref(), Some("root"));
assert_eq!(key.get_cloned_or("fallback".to_string())?, "root");
assert_eq!(key.take()?.as_deref(), Some("root"));
assert_eq!(key.get_cloned_or_default()?, String::default());
Ok(())
});
llam_test!(cancel_scope_spawns_with_shared_token, {
let scope = llam::CancelScope::new()?;
let token = scope.token();
let handle = scope.try_spawn(move || {
while !token.is_cancelled() {
llam::task::yield_now();
}
17u32
})?;
scope.cancel()?;
assert!(scope.is_cancelled());
assert_eq!(handle.join().expect("cancel scoped task failed"), 17);
Ok(())
});
llam_test!(structured_scope_and_nursery, {
let input = String::from("borrowed");
let scoped = llam::try_scope(|scope| {
let handle = scope.spawn(|| input.len());
handle.join().expect("scoped join failed")
})
.expect("try_scope failed");
assert_eq!(scoped, 8);
llam::scope(|scope| {
let _unjoined = scope.spawn(|| {
llam::task::yield_now();
11u32
});
});
llam::nursery(|nursery| {
let token = nursery.token();
let handle = nursery.try_spawn(move || {
while !token.is_cancelled() {
llam::task::yield_now();
}
23u32
})?;
nursery.cancel()?;
assert_eq!(handle.join().expect("nursery join failed"), 23);
Ok(())
})?;
Ok(())
});
llam_test!(task_batch_collects_individual_results, {
let mut batch = llam::TaskBatch::new();
batch.spawn(|| 1u32)?;
batch.spawn(|| 2u32)?;
let results = batch.join_results();
assert_eq!(results.len(), 2);
assert_eq!(results[0].as_ref().expect("first task failed"), &1);
assert_eq!(results[1].as_ref().expect("second task failed"), &2);
Ok(())
});
llam_test!(abi_stats_join_and_select_send, {
let abi = llam::AbiInfo::load()?;
assert_eq!(abi.abi_major(), llam::sys::LLAM_ABI_VERSION_MAJOR);
assert!(!llam::abi::version_string().is_empty());
let mut handle = llam::spawn!(move {
llam::time::sleep(Duration::from_millis(5)).unwrap();
99u32
});
assert!(handle.join_until(0).expect("timed join failed").is_none());
assert_eq!(
handle.join_timeout(Duration::from_secs(1)).unwrap(),
Some(99)
);
let (tx, rx) = llam::channel::bounded::<u32>(1)?;
let sent = llam::select! {
send(tx, 7u32) => {
true
},
after(Duration::from_secs(1)) => {
panic!("send select timed out");
},
};
assert!(sent);
assert_eq!(rx.recv()?, 7);
#[cfg(unix)]
{
let path = std::env::temp_dir().join(format!("llam-rs-stats-{}.json", std::process::id()));
let file = std::fs::File::create(&path).map_err(llam::Error::from)?;
llam::diagnostics::write_stats_json(&file)?;
drop(file);
let json = std::fs::read_to_string(&path).map_err(llam::Error::from)?;
let _ = std::fs::remove_file(&path);
assert!(json.contains("ctx_switches"));
}
Ok(())
});
llam_test!(
tcp_stream_connect_uses_llam_path,
profile = llam::Profile::IoLatency,
{
let listener = std::net::TcpListener::bind("127.0.0.1:0").map_err(llam::Error::from)?;
let addr = listener.local_addr().map_err(llam::Error::from)?;
let thread = std::thread::spawn(move || {
let (mut stream, _) = listener.accept().unwrap();
let mut buf = [0u8; 4];
stream.read_exact(&mut buf).unwrap();
stream.write_all(&buf).unwrap();
});
let mut stream = llam::net::TcpStream::connect(addr).map_err(llam::Error::from)?;
stream.write_all(b"ping").map_err(llam::Error::from)?;
let mut buf = [0u8; 4];
stream.read_exact(&mut buf).map_err(llam::Error::from)?;
thread.join().expect("tcp echo thread panicked");
assert_eq!(&buf, b"ping");
Ok(())
}
);
llam_test!(tcp_from_std_wrappers, profile = llam::Profile::IoLatency, {
let std_listener = std::net::TcpListener::bind("127.0.0.1:0").map_err(llam::Error::from)?;
let listener = llam::net::TcpListener::from_std(std_listener).map_err(llam::Error::from)?;
let addr = listener.local_addr().map_err(llam::Error::from)?;
let thread = std::thread::spawn(move || {
let mut client = std::net::TcpStream::connect(addr).unwrap();
client.write_all(b"wrap").unwrap();
});
let (mut accepted, peer) = listener.accept().map_err(llam::Error::from)?;
assert!(peer.is_some());
let mut buf = [0u8; 4];
accepted.read_exact(&mut buf).map_err(llam::Error::from)?;
thread.join().expect("tcp from_std client panicked");
assert_eq!(&buf, b"wrap");
let std_listener = std::net::TcpListener::bind("127.0.0.1:0").map_err(llam::Error::from)?;
let addr = std_listener.local_addr().map_err(llam::Error::from)?;
let thread = std::thread::spawn(move || {
let (mut server, _) = std_listener.accept().unwrap();
let mut buf = [0u8; 4];
server.read_exact(&mut buf).unwrap();
server.write_all(&buf).unwrap();
});
let std_stream = std::net::TcpStream::connect(addr).map_err(llam::Error::from)?;
let mut stream = llam::net::TcpStream::from_std(std_stream).map_err(llam::Error::from)?;
stream.write_all(b"echo").map_err(llam::Error::from)?;
let mut buf = [0u8; 4];
stream.read_exact(&mut buf).map_err(llam::Error::from)?;
thread.join().expect("tcp from_std echo thread panicked");
assert_eq!(&buf, b"echo");
Ok(())
});
llam_test!(
udp_connected_socket_roundtrip,
profile = llam::Profile::IoLatency,
{
let a = llam::net::UdpSocket::bind("127.0.0.1:0").map_err(llam::Error::from)?;
let b = llam::net::UdpSocket::bind("127.0.0.1:0").map_err(llam::Error::from)?;
a.connect(b.local_addr().map_err(llam::Error::from)?)
.map_err(llam::Error::from)?;
b.connect(a.local_addr().map_err(llam::Error::from)?)
.map_err(llam::Error::from)?;
assert_eq!(a.send(b"pong").map_err(llam::Error::from)?, 4);
let mut buf = [0u8; 4];
assert_eq!(b.recv(&mut buf).map_err(llam::Error::from)?, 4);
assert_eq!(&buf, b"pong");
Ok(())
}
);
llam_test!(
udp_unconnected_and_from_std_roundtrip,
profile = llam::Profile::IoLatency,
{
let a = llam::net::UdpSocket::from_std(
std::net::UdpSocket::bind("127.0.0.1:0").map_err(llam::Error::from)?,
)
.map_err(llam::Error::from)?;
let b = llam::net::UdpSocket::from_std(
std::net::UdpSocket::bind("127.0.0.1:0").map_err(llam::Error::from)?,
)
.map_err(llam::Error::from)?;
let b_addr = b.local_addr().map_err(llam::Error::from)?;
assert_eq!(
a.send_to(b"datagram", b_addr).map_err(llam::Error::from)?,
8
);
let mut buf = [0u8; 16];
let (n, peer) = b.recv_from(&mut buf).map_err(llam::Error::from)?;
assert_eq!(&buf[..n], b"datagram");
assert_eq!(peer, a.local_addr().map_err(llam::Error::from)?);
Ok(())
}
);
#[cfg(unix)]
llam_test!(
unix_stream_and_file_wrappers,
profile = llam::Profile::IoLatency,
{
let dir = std::env::temp_dir();
let sock_path = dir.join(format!("llam-rs-{}.sock", std::process::id()));
let _ = std::fs::remove_file(&sock_path);
let listener = llam::net::UnixListener::from_std(
std::os::unix::net::UnixListener::bind(&sock_path).map_err(llam::Error::from)?,
)
.map_err(llam::Error::from)?;
let client_path = sock_path.clone();
let thread = std::thread::spawn(move || {
let mut client = std::os::unix::net::UnixStream::connect(client_path).unwrap();
client.write_all(b"unix").unwrap();
});
let mut stream = listener.accept().map_err(llam::Error::from)?;
let mut buf = [0u8; 4];
stream.read_exact(&mut buf).map_err(llam::Error::from)?;
thread.join().expect("unix client thread panicked");
let _ = std::fs::remove_file(&sock_path);
assert_eq!(&buf, b"unix");
let (std_a, mut std_b) =
std::os::unix::net::UnixStream::pair().map_err(llam::Error::from)?;
let mut wrapped = llam::net::UnixStream::from_std(std_a).map_err(llam::Error::from)?;
std_b.write_all(b"pair").map_err(llam::Error::from)?;
let mut pair_buf = [0u8; 4];
wrapped
.read_exact(&mut pair_buf)
.map_err(llam::Error::from)?;
assert_eq!(&pair_buf, b"pair");
let file_path = dir.join(format!("llam-rs-file-{}.txt", std::process::id()));
{
let mut file = llam::fs::File::create(&file_path).map_err(llam::Error::from)?;
file.write_all(b"file").map_err(llam::Error::from)?;
file.flush().map_err(llam::Error::from)?;
}
{
let mut file = llam::fs::File::open(&file_path).map_err(llam::Error::from)?;
let mut text = String::new();
file.read_to_string(&mut text).map_err(llam::Error::from)?;
assert_eq!(text, "file");
}
{
let raw_file = std::fs::File::open(&file_path).map_err(llam::Error::from)?;
let handle = raw_file.as_raw_fd() as llam::io::Handle;
let mut raw = [0u8; 4];
let n = llam::io::read_handle(handle, &mut raw).map_err(llam::Error::from)?;
assert_eq!(n, 4);
assert_eq!(&raw, b"file");
}
let _ = std::fs::remove_file(&file_path);
Ok(())
}
);
#[cfg(unix)]
fn raw_listener_fd(listener: &std::net::TcpListener) -> llam::io::Fd {
listener.as_raw_fd()
}
#[cfg(windows)]
fn raw_listener_fd(listener: &std::net::TcpListener) -> llam::io::Fd {
listener.as_raw_socket() as llam::io::Fd
}
#[cfg(unix)]
unsafe fn std_stream_from_fd(fd: llam::io::Fd) -> std::net::TcpStream {
use std::os::fd::FromRawFd;
std::net::TcpStream::from_raw_fd(fd)
}
#[cfg(windows)]
unsafe fn std_stream_from_fd(fd: llam::io::Fd) -> std::net::TcpStream {
use std::os::windows::io::FromRawSocket;
std::net::TcpStream::from_raw_socket(fd as _)
}