recstrm 0.0.1

Special purpose flow-controlled channel used to stream records.
Documentation
use std::{thread, time};

use recstrm::*;

use testtools::thread::expect_runtime;

enum MyErr {
  Something
}

#[derive(PartialEq, Eq, Hash, Clone)]
enum Checkpoint {
  RecSent
}

#[test]
fn send_after_recv() {
  let (tx, rx) = Builder::new().queue_size(8).build::<_, ()>();

  let jh = thread::spawn(move || {
    thread::sleep(time::Duration::from_millis(500));
    tx.send("hello").unwrap();
  });

  if let Some(node) = rx.recv().unwrap() {
    assert_eq!(node, "hello");
  }

  jh.join().unwrap();
}

#[test]
fn send_before_recv() {
  let sync = testtools::sync::Checkpoint::new();

  let (tx, rx) = Builder::new().queue_size(8).build::<_, ()>();

  let tsync = sync.clone();
  let jh = thread::spawn(move || {
    tx.send("hello").unwrap();
    tsync.reached(Checkpoint::RecSent);
  });

  sync.waitfor([Checkpoint::RecSent]);
  if let Some(node) = rx.recv().unwrap() {
    assert_eq!(node, "hello");
  }

  jh.join().unwrap();
}

#[test]
fn eof_after_sender_drop() {
  let (tx, rx) = Builder::new().queue_size(8).build::<(), ()>();

  drop(tx);

  let Ok(None) = rx.recv() else {
    panic!("Unexpectedly not Ok(None)");
  };
}

#[test]
fn eof_with_queued_records() {
  let (tx, rx) = Builder::new().queue_size(8).build::<_, ()>();

  tx.send(42).unwrap();

  drop(tx);

  let Ok(Some(v)) = rx.recv() else {
    panic!("Unexpectedly not Ok(Some(42))");
  };
  assert_eq!(v, 42);

  let Ok(None) = rx.recv() else {
    panic!("Unexpectedly not Ok(None)");
  };
}

#[test]
fn try_send_full_queue() {
  let (tx, _rx) = Builder::new().queue_size(1).build::<_, ()>();

  tx.send(42).unwrap();

  let Err(Error::QueueFull) = tx.try_send(1147) else {
    panic!("Unexpectedly not Err(Error::QueueFull)");
  };
}

#[test]
fn order() {
  let (tx, rx) = Builder::new().queue_size(4).build::<_, ()>();

  tx.send(0).unwrap();
  tx.send(1).unwrap();
  tx.send(2).unwrap();
  tx.send(3).unwrap();

  assert_eq!(rx.recv().unwrap(), Some(0));
  assert_eq!(rx.recv().unwrap(), Some(1));
  assert_eq!(rx.recv().unwrap(), Some(2));
  assert_eq!(rx.recv().unwrap(), Some(3));
}

#[test]
fn batch_order() {
  let (tx, rx) = Builder::new().queue_size(4).build::<_, ()>();

  let ints = [0, 1, 2, 3];
  tx.send_batch(ints.into_iter()).unwrap();

  assert_eq!(rx.recv().unwrap(), Some(0));
  assert_eq!(rx.recv().unwrap(), Some(1));
  assert_eq!(rx.recv().unwrap(), Some(2));
  assert_eq!(rx.recv().unwrap(), Some(3));
}

#[test]
fn recv_all() {
  let (tx, rx) = Builder::new().queue_size(4).build::<_, ()>();

  let ints = [0, 1, 2, 3];
  tx.send_batch(ints.into_iter()).unwrap();

  let res = rx.recv_all().unwrap().unwrap();

  assert_eq!(res, [0, 1, 2, 3]);
}

#[test]
fn recv_atmost() {
  let (tx, rx) = Builder::new().queue_size(4).build::<_, ()>();

  let ints = [0, 1, 2, 3];
  tx.send_batch(ints.into_iter()).unwrap();

  let res = rx.recv_atmost(2).unwrap().unwrap();
  assert_eq!(res, [0, 1]);

  let res = rx.recv_atmost(2).unwrap().unwrap();
  assert_eq!(res, [2, 3]);
}

// Make sure that sender blocks on send if the queue is full.
//
// Because the call is expected to block we do it on a thread, and make the
// receiver sleep for a known about of time before taking out data from the
// queue.
#[test]
fn sender_blocking() {
  const GRACE_MS: u64 = 250;

  let (tx, rx) = Builder::new().queue_size(2).build::<_, ()>();

  let jh = thread::spawn(move || {
    // Fill the queue up
    let ints = [0, 1];
    tx.send_batch(ints.into_iter()).unwrap();

    // Attempt to send another record.  This should take at least 250ms,
    // because the main thread is delaying 250ms before takeing a record
    // out of the queue
    expect_runtime(time::Duration::from_millis(GRACE_MS), || {
      tx.send(2).unwrap();
    });
  });

  // Wait a bit so we can measure the block of send()
  thread::sleep(time::Duration::from_millis(GRACE_MS));

  let res = rx.recv_atmost(2).unwrap().unwrap();
  assert_eq!(res, [0, 1]);

  let res = rx.recv().unwrap();
  assert_eq!(res, Some(2));

  jh.join().unwrap();
}

#[test]
fn receiver_disappeared() {
  let (tx, rx) = Builder::new().queue_size(4).build::<_, ()>();

  drop(rx);

  let Err(Error::ReceiverDisappeared) = tx.send("hello") else {
    panic!("Result unexpectedly not Err(Error::ReceiverDisappeared)");
  };
}

#[test]
fn fail_sender() {
  let (tx, rx) = Builder::new().queue_size(4).build::<&str, MyErr>();

  tx.fail(MyErr::Something);

  let Err(Error::App(MyErr::Something)) = rx.recv() else {
    panic!("Unexpected return value");
  };
}

#[test]
fn fail_receiver() {
  let (tx, rx) = Builder::new().queue_size(4).build::<&str, MyErr>();

  rx.fail(MyErr::Something);

  let Err(Error::App(MyErr::Something)) = tx.send("hello") else {
    panic!("Unexpected return value");
  };
}

// Construct a records queue with a fixed number of records.  Drop the sender.
// The receiver should return Error::RecordsUnderflow when trying to read
// from the queue.
#[test]
fn drop_sender_before_num_recs() {
  let (tx, rx) = Builder::new()
    .queue_size(4)
    .num_records(8)
    .build::<&str, ()>();

  drop(tx);

  let Err(Error::RecordsUnderflow) = rx.recv() else {
    panic!("Result unexpectedly not Err(Error::ReceiverDisappeared)");
  };
}

// Make sure that there's no off-by-one error when checking for
// RecordsUnderflow.
#[test]
fn drop_sender_before_num_recs_one_off() {
  let (tx, rx) = Builder::new().queue_size(4).num_records(8).build::<_, ()>();

  // Fill up queue
  let ints = [0, 1, 2, 3];
  tx.send_batch(ints.into_iter()).unwrap();

  // Drain queue
  let res = rx.recv_all().unwrap().unwrap();
  assert_eq!(res, [0, 1, 2, 3]);

  // Add elements so we reach num expected records - 1
  let ints = [4, 5, 6];
  tx.send_batch(ints.into_iter()).unwrap();

  drop(tx);

  let Err(Error::RecordsUnderflow) = rx.recv() else {
    panic!("Result unexpectedly not Err(Error::ReceiverDisappeared)");
  };
}


// Make sure that there's no off-by-one error when checking for
// RecordsUnderflow.
#[test]
fn drop_sender_after_num_recs() {
  let (tx, rx) = Builder::new().queue_size(4).num_records(8).build::<_, ()>();

  // Fill up queue
  let ints = [0, 1, 2, 3];
  tx.send_batch(ints.into_iter()).unwrap();

  // Drain queue
  let res = rx.recv_all().unwrap().unwrap();
  assert_eq!(res, [0, 1, 2, 3]);

  // Add elements so we reach num expected records - 1
  let ints = [4, 5, 6, 7];
  tx.send_batch(ints.into_iter()).unwrap();

  drop(tx);

  // Drain queue
  let res = rx.recv_all().unwrap().unwrap();
  assert_eq!(res, [4, 5, 6, 7]);
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :