recstrm 0.0.1

Special purpose flow-controlled channel used to stream records.
Documentation
use tokio::{task, time};

use testtools::task::expect_runtime;

use recstrm::*;

// Application-defined error type used by the tests that fail a channel from
// either end.
//
// `Debug` and `PartialEq` are derived so the value can be printed in test
// failure output and compared with `assert_eq!`.
#[derive(Debug, PartialEq, Eq)]
enum MyErr {
  // The only error condition the tests need.
  Something,
}

// The receive is issued first; the record is sent half a second later from a
// separate task and must still be delivered.
#[tokio::test]
async fn send_after_recv() {
  let (tx, rx) = Builder::new().queue_size(8).build::<_, ()>();

  // Delay the send so that the receive call below is made before the record
  // exists.
  let jh = task::spawn(async move {
    time::sleep(time::Duration::from_millis(500)).await;
    tx.send_async("hello").await.unwrap();
  });

  // Compare the whole `Option`: an `if let Some(..)` would skip the check
  // entirely, and let the test pass, if `None` came back instead.
  assert_eq!(rx.recv_async().await.unwrap(), Some("hello"));

  // Surface any panic raised inside the sender task.
  jh.await.unwrap();
}

// The record is sent right away from a separate task; the receive is issued
// half a second later and must pick it up.
#[tokio::test]
async fn send_before_recv() {
  let (tx, rx) = Builder::new().queue_size(8).build::<_, ()>();

  let jh = task::spawn(async move {
    tx.send_async("hello").await.unwrap();
  });

  // Give the sender task time to run before the receive call is made.
  time::sleep(time::Duration::from_millis(500)).await;

  // Compare the whole `Option`: an `if let Some(..)` would skip the check
  // entirely, and let the test pass, if `None` came back instead.
  assert_eq!(rx.recv_async().await.unwrap(), Some("hello"));

  // Surface any panic raised inside the sender task.
  jh.await.unwrap();
}

// Dropping the sender before anything was sent must make the receiver report
// `Ok(None)`.
#[tokio::test]
async fn eof_after_sender_drop() {
  let (sender, receiver) = Builder::new().queue_size(8).build::<(), ()>();

  drop(sender);

  let outcome = receiver.recv_async().await;
  assert!(matches!(outcome, Ok(None)), "Unexpectedly not Ok(None)");
}

// A record queued before the sender is dropped must still be delivered; only
// the following receive reports `Ok(None)`.
#[tokio::test]
async fn eof_with_queued_records() {
  let (sender, receiver) = Builder::new().queue_size(8).build::<_, ()>();

  sender.send_async(42).await.unwrap();
  drop(sender);

  // First receive: the record that was queued before the drop.
  match receiver.recv_async().await {
    Ok(Some(value)) => assert_eq!(value, 42),
    _ => panic!("Unexpectedly not Ok(Some(42))"),
  }

  // Second receive: nothing left and no sender remains.
  match receiver.recv_async().await {
    Ok(None) => {}
    _ => panic!("Unexpectedly not Ok(None)"),
  }
}

#[tokio::test]
async fn try_send_full_queue() {
  let (tx, _rx) = Builder::new().queue_size(1).build::<_, ()>();

  tx.send_async(42).await.unwrap();

  let Err(Error::QueueFull) = tx.try_send(1147) else {
    panic!("Unexpectedly not Err(Error::QueueFull)");
  };
}

// Records sent one at a time must be received in the order they were sent.
#[tokio::test]
async fn order() {
  let (sender, receiver) = Builder::new().queue_size(4).build::<_, ()>();

  // Queue 0, 1, 2, 3 individually.
  for value in 0..4 {
    sender.send_async(value).await.unwrap();
  }

  // They must come back out in the same sequence.
  for expected in 0..4 {
    assert_eq!(receiver.recv_async().await.unwrap(), Some(expected));
  }
}

// Records sent as one batch must be received one by one in batch order.
#[tokio::test]
async fn batch_order() {
  let (sender, receiver) = Builder::new().queue_size(4).build::<_, ()>();

  let batch = [0, 1, 2, 3];
  sender.send_batch_async(batch.into_iter()).await.unwrap();

  // Single-record receives must follow the order of the batch.
  for expected in 0..4 {
    assert_eq!(receiver.recv_async().await.unwrap(), Some(expected));
  }
}

// A single `recv_all_async()` call must return every queued record, in
// order.
#[tokio::test]
async fn recv_all() {
  let (sender, receiver) = Builder::new().queue_size(4).build::<_, ()>();

  let batch = [0, 1, 2, 3];
  sender.send_batch_async(batch.into_iter()).await.unwrap();

  // Unwrap the `Result` first, then the `Option` inside it.
  let outcome = receiver.recv_all_async().await;
  let records = outcome.unwrap().unwrap();

  assert_eq!(records, [0, 1, 2, 3]);
}

// With four records queued, two `recv_atmost_async(2)` calls must return the
// first two and then the last two records.
#[tokio::test]
async fn recv_atmost() {
  let (sender, receiver) = Builder::new().queue_size(4).build::<_, ()>();

  let batch = [0, 1, 2, 3];
  sender.send_batch_async(batch.into_iter()).await.unwrap();

  // Each iteration asks for at most two records and checks the chunk it got.
  for expected in [[0, 1], [2, 3]] {
    let chunk = receiver.recv_atmost_async(2).await.unwrap().unwrap();
    assert_eq!(chunk, expected);
  }
}

// Make sure that the sender blocks on send if the queue is full.
//
// Because the call is expected to block we do it on a task, and make the
// receiver sleep for a known amount of time before taking data out of the
// queue.
#[tokio::test]
async fn sender_blocking() {
  const GRACE_MS: u64 = 250;

  let (sender, receiver) = Builder::new().queue_size(2).build::<_, ()>();

  // The same duration is used for the receiver's delay and for the expected
  // run time of the blocked send.
  let grace = time::Duration::from_millis(GRACE_MS);

  let sender_task = task::spawn(async move {
    // Two records fill the two-slot queue.
    let fill = [0, 1];
    sender.send_batch_async(fill.into_iter()).await.unwrap();

    // A third record has no free slot until the receiver starts taking
    // records out, which it only does after the grace period.  The send is
    // therefore expected to take at least that long.
    // NOTE(review): presumably `expect_runtime()` checks that the future
    // runs for at least the given duration -- confirm against testtools.
    let blocked_send = async move {
      sender.send_async(2).await.unwrap();
    };
    expect_runtime(grace, blocked_send).await;
  });

  // Hold off receiving so the blocked send has something to measure.
  time::sleep(grace).await;

  let first_two = receiver.recv_atmost_async(2).await.unwrap().unwrap();
  assert_eq!(first_two, [0, 1]);

  let third = receiver.recv_async().await.unwrap();
  assert_eq!(third, Some(2));

  // Surface any panic raised inside the sender task.
  sender_task.await.unwrap();
}


#[tokio::test]
async fn receiver_disappeared() {
  let (tx, rx) = Builder::new().queue_size(4).build::<_, ()>();

  drop(rx);

  let Err(Error::ReceiverDisappeared) = tx.send_async("hello").await else {
    panic!("Result unexpectedly not Err(Error::ReceiverDisappeared)");
  };
}

#[tokio::test]
async fn fail_sender() {
  let (tx, rx) = Builder::new().queue_size(4).build::<&str, MyErr>();

  tx.fail(MyErr::Something);

  let Err(Error::App(MyErr::Something)) = rx.recv_async().await else {
    panic!("Unexpected return value");
  };
}

#[tokio::test]
async fn fail_receiver() {
  let (tx, rx) = Builder::new().queue_size(4).build::<&str, MyErr>();

  rx.fail(MyErr::Something);

  let Err(Error::App(MyErr::Something)) = tx.send_async("hello").await else {
    panic!("Unexpected return value");
  };
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :