llam 0.1.2

Safe, Go-style Rust bindings for the LLAM runtime
use std::io::{Read, Write};
use std::time::Duration;

macro_rules! llam_test {
    ($name:ident, $body:block) => {
        #[test]
        fn $name() -> llam::Result<()> {
            llam::test(|| $body)
        }
    };
    ($name:ident, profile = $profile:expr, $body:block) => {
        #[test]
        fn $name() -> llam::Result<()> {
            llam::test_with_profile($profile, || $body)
        }
    };
}

llam_test!(spawn_and_channel, {
    let (tx, rx) = llam::channel::bounded::<u32>(8)?;
    let handle = llam::spawn!(move {
        tx.send(42).expect("send failed");
        7u32
    });

    assert_eq!(rx.recv()?, 42);
    assert_eq!(handle.join().expect("join failed"), 7);
    Ok(())
});

llam_test!(try_spawn_task_batch_and_closed_channel, {
    let handle = llam::try_spawn!({ 5u32 })?;
    assert_eq!(handle.join().expect("try_spawn join failed"), 5);

    let mut batch = llam::TaskBatch::with_capacity(4);
    for i in 0..4u32 {
        batch.spawn(move || i + 1)?;
    }
    assert_eq!(
        batch.join().expect("task batch join failed"),
        vec![1, 2, 3, 4]
    );

    let (tx, rx) = llam::channel::bounded::<u32>(1)?;
    tx.try_send(9).expect("try_send failed");
    assert_eq!(rx.try_recv_option()?, Some(9));
    drop(tx);
    assert_eq!(rx.recv_option()?, None);

    let selected = llam::select! {
        recv(rx) -> msg => {
            let _ = msg.expect("select recv failed");
            1
        },
        closed(rx) => {
            2
        },
    };
    assert_eq!(selected, 2);
    Ok(())
});

llam_test!(select_timeout, profile = llam::Profile::DebugSafe, {
    let (_tx, rx) = llam::channel::bounded::<u32>(1)?;
    let mut timed_out = false;

    llam::select! {
        recv(rx) -> msg => {
            let _ = msg;
        },
        after(Duration::from_millis(1)) => {
            timed_out = true;
        },
    }

    assert!(timed_out);
    Ok(())
});

llam_test!(select_default_and_mixed_send_recv, {
    let (_tx, rx) = llam::channel::bounded::<u32>(1)?;
    let defaulted = llam::select! {
        recv(rx) -> msg => {
            let _ = msg;
            false
        },
        default => {
            true
        },
    };
    assert!(defaulted);

    let (tx, rx) = llam::channel::bounded::<u32>(1)?;
    let selected = llam::select! {
        recv(rx) -> msg => {
            msg.expect("select recv failed")
        },
        send(tx, 11u32) => {
            11
        },
        default => {
            0
        },
    };
    assert_eq!(selected, 11);
    assert_eq!(rx.recv()?, 11);

    let (_t0, r0) = llam::channel::bounded::<u32>(1)?;
    let (_t1, r1) = llam::channel::bounded::<u32>(1)?;
    let (_t2, r2) = llam::channel::bounded::<u32>(1)?;
    let (_t3, r3) = llam::channel::bounded::<u32>(1)?;
    let (_t4, r4) = llam::channel::bounded::<u32>(1)?;
    let any = llam::select! {
        recv(r0) -> value => value.expect("select recv failed"),
        recv(r1) -> value => value.expect("select recv failed"),
        recv(r2) -> value => value.expect("select recv failed"),
        recv(r3) -> value => value.expect("select recv failed"),
        recv(r4) -> value => value.expect("select recv failed"),
        default => { 123 },
    };
    assert_eq!(any, 123);
    Ok(())
});

llam_test!(task_local_roundtrip, {
    let key = llam::TaskLocalKey::<String>::new()?;

    key.set("main".to_string())?;
    assert_eq!(key.get_cloned()?.as_deref(), Some("main"));

    let child_key = key.clone();
    let handle = llam::spawn!(move {
        assert!(child_key.get_cloned().unwrap().is_none());
        child_key.set("child".to_string()).unwrap();
        child_key.take().unwrap()
    });

    assert_eq!(
        handle.join().expect("join failed").as_deref(),
        Some("child")
    );
    assert_eq!(key.take()?.as_deref(), Some("main"));
    assert!(key.get_cloned()?.is_none());

    let scoped = key.with("scoped".to_string(), || key.get_cloned().unwrap().unwrap())?;
    assert_eq!(scoped, "scoped");
    assert!(key.get_cloned()?.is_none());
    Ok(())
});

llam_test!(abi_stats_join_and_select_send, {
    let abi = llam::AbiInfo::load()?;
    assert_eq!(abi.abi_major(), llam::sys::LLAM_ABI_VERSION_MAJOR);
    assert!(!llam::abi::version_string().is_empty());

    let mut handle = llam::spawn!(move {
        llam::time::sleep(Duration::from_millis(5)).unwrap();
        99u32
    });
    assert!(handle.join_until(0).expect("timed join failed").is_none());
    assert_eq!(
        handle.join_timeout(Duration::from_secs(1)).unwrap(),
        Some(99)
    );

    let (tx, rx) = llam::channel::bounded::<u32>(1)?;
    let sent = llam::select! {
        send(tx, 7u32) => {
            true
        },
        after(Duration::from_secs(1)) => {
            panic!("send select timed out");
        },
    };
    assert!(sent);
    assert_eq!(rx.recv()?, 7);

    #[cfg(unix)]
    {
        let path = std::env::temp_dir().join(format!("llam-rs-stats-{}.json", std::process::id()));
        let file = std::fs::File::create(&path).map_err(llam::Error::from)?;
        llam::diagnostics::write_stats_json(&file)?;
        drop(file);
        let json = std::fs::read_to_string(&path).map_err(llam::Error::from)?;
        let _ = std::fs::remove_file(&path);
        assert!(json.contains("ctx_switches"));
    }
    Ok(())
});

llam_test!(
    tcp_stream_connect_uses_llam_path,
    profile = llam::Profile::IoLatency,
    {
        let listener = std::net::TcpListener::bind("127.0.0.1:0").map_err(llam::Error::from)?;
        let addr = listener.local_addr().map_err(llam::Error::from)?;
        let thread = std::thread::spawn(move || {
            let (mut stream, _) = listener.accept().unwrap();
            let mut buf = [0u8; 4];
            stream.read_exact(&mut buf).unwrap();
            stream.write_all(&buf).unwrap();
        });

        let mut stream = llam::net::TcpStream::connect(addr).map_err(llam::Error::from)?;
        stream.write_all(b"ping").map_err(llam::Error::from)?;
        let mut buf = [0u8; 4];
        stream.read_exact(&mut buf).map_err(llam::Error::from)?;
        thread.join().expect("tcp echo thread panicked");
        assert_eq!(&buf, b"ping");
        Ok(())
    }
);

llam_test!(
    udp_connected_socket_roundtrip,
    profile = llam::Profile::IoLatency,
    {
        let a = llam::net::UdpSocket::bind("127.0.0.1:0").map_err(llam::Error::from)?;
        let b = llam::net::UdpSocket::bind("127.0.0.1:0").map_err(llam::Error::from)?;
        a.connect(b.local_addr().map_err(llam::Error::from)?)
            .map_err(llam::Error::from)?;
        b.connect(a.local_addr().map_err(llam::Error::from)?)
            .map_err(llam::Error::from)?;

        assert_eq!(a.send(b"pong").map_err(llam::Error::from)?, 4);
        let mut buf = [0u8; 4];
        assert_eq!(b.recv(&mut buf).map_err(llam::Error::from)?, 4);
        assert_eq!(&buf, b"pong");
        Ok(())
    }
);

#[cfg(unix)]
llam_test!(
    unix_stream_and_file_wrappers,
    profile = llam::Profile::IoLatency,
    {
        let dir = std::env::temp_dir();
        let sock_path = dir.join(format!("llam-rs-{}.sock", std::process::id()));
        let _ = std::fs::remove_file(&sock_path);

        let listener = llam::net::UnixListener::bind(&sock_path).map_err(llam::Error::from)?;
        let client_path = sock_path.clone();
        let thread = std::thread::spawn(move || {
            let mut client = std::os::unix::net::UnixStream::connect(client_path).unwrap();
            client.write_all(b"unix").unwrap();
        });
        let mut stream = listener.accept().map_err(llam::Error::from)?;
        let mut buf = [0u8; 4];
        stream.read_exact(&mut buf).map_err(llam::Error::from)?;
        thread.join().expect("unix client thread panicked");
        let _ = std::fs::remove_file(&sock_path);
        assert_eq!(&buf, b"unix");

        let file_path = dir.join(format!("llam-rs-file-{}.txt", std::process::id()));
        {
            let mut file = llam::fs::File::create(&file_path).map_err(llam::Error::from)?;
            file.write_all(b"file").map_err(llam::Error::from)?;
            file.flush().map_err(llam::Error::from)?;
        }
        {
            let mut file = llam::fs::File::open(&file_path).map_err(llam::Error::from)?;
            let mut text = String::new();
            file.read_to_string(&mut text).map_err(llam::Error::from)?;
            assert_eq!(text, "file");
        }
        let _ = std::fs::remove_file(&file_path);
        Ok(())
    }
);