selium-messaging 1.0.0-alpha.6

Streaming compute fabric
Documentation
#![cfg(not(feature = "loom"))]

use std::pin::pin;

use futures::join;
use selium_messaging::{Channel, ChannelError, Writer};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::test]
async fn test_readwrite() {
    let channel = Channel::new(4);
    let writer1 = channel.new_writer();
    let writer2 = channel.new_writer();
    let mut strong_r = pin!(channel.new_strong_reader());
    let mut weak_r = pin!(channel.new_weak_reader());

    let mut buf = [0; 120000];
    let (_, _, r) = join!(
        write(writer1, &[1, 1, 1]),
        write(writer2, &[2, 2, 2]),
        strong_r.read_exact(&mut buf)
    );
    r.unwrap();

    for line in buf.chunks(3) {
        if line != [1, 1, 1] && line != [2, 2, 2] {
            panic!("Writes out of order: {line:?}");
        }
    }

    let mut buf = Vec::new();
    let err: ChannelError = weak_r.read(&mut buf).await.unwrap_err().downcast().unwrap();
    assert_eq!(err, ChannelError::ReaderBehind(119996));
}

async fn write(mut writer: Writer, buf: &[u8]) {
    for _ in 0..20000 {
        writer.write_all(buf).await.unwrap();
    }
}