sigq 0.13.5

Queue that signals waiting consumers about node availability
Documentation
use sigq::{Puller, Pusher, StaleErr};


/// A reusable test
///
/// Exercises a connected `Pusher`/`Puller` pair: the queue must start out
/// empty, yield exactly the one node pushed here, and then be empty again.
/// Any deviation panics, which fails the calling test.
fn std_test(tx: &Pusher<&str>, rx: &Puller<&str>) {
  // A pusher exists but nothing has been pushed yet: expect Ok(None)
  let before_push = rx.try_pop();
  assert_eq!(before_push, Ok(None));

  tx.push("hello").unwrap();

  // The node pushed above should come back out: expect Ok(Some)
  let after_push = rx.try_pop();
  assert_eq!(after_push, Ok(Some("hello")));

  // That was the only node and the pusher still exists: expect Ok(None)
  let drained = rx.try_pop();
  assert_eq!(drained, Ok(None));
}

/// Check `try_pop()` return values before and after the only `Pusher` is
/// dropped.
#[test]
fn try_pop_retvals() {
  let (pusher, puller) = sigq::new();

  // Shared checks while the pusher is still around
  std_test(&pusher, &puller);

  // Release the one and only pusher.
  drop(pusher);

  // No pushers left and nothing queued; expect Err(StaleErr)
  let res = puller.try_pop();
  assert_eq!(res, Err(StaleErr));
}

/// A weak pusher reference alone must not keep the queue from going stale.
#[test]
fn try_pop_with_only_weak_tx() {
  let (pusher, puller) = sigq::new::<&str>();

  // Keep a weak reference alive for the remainder of the test
  let _weak = pusher.weak();

  // Release the only strong pusher.
  drop(pusher);

  // Only the weak reference remains; expect Err(StaleErr)
  let res = puller.try_pop();
  assert_eq!(res, Err(StaleErr));
}

/// Nodes pushed before the last `Pusher` was dropped must still be
/// retrievable; the stale error is only reported once the queue is empty.
#[test]
fn drop_nonempty() {
  let (pusher, puller) = sigq::new();

  // A pusher exists, nothing queued yet; expect Ok(None)
  let initial = puller.try_pop();
  assert_eq!(initial, Ok(None));

  pusher.push("hello").unwrap();

  // Release the only pusher while a node is still queued.
  drop(pusher);

  // The queued node outlives its pusher; expect Ok(Some)
  let leftover = puller.try_pop();
  assert_eq!(leftover, Ok(Some("hello")));

  // Queue drained and no pushers remain; expect Err(StaleErr)
  let stale = puller.try_pop();
  assert_eq!(stale, Err(StaleErr));
}

/// A `pop()` call that is waiting in another thread must return
/// `Err(StaleErr)` once the `Pusher` is dropped.
#[test]
fn unblock_on_stale() {
  let (pusher, puller) = sigq::new::<&str>();

  // The spawned thread owns the Puller and calls pop(); it reports whether
  // the call ended with Err(StaleErr) rather than with a node.
  let waiter =
    std::thread::spawn(move || matches!(puller.pop(), Err(StaleErr)));

  // Presumably gives the thread time to enter pop() before the drop below.
  let grace = std::time::Duration::from_millis(500);
  std::thread::sleep(grace);

  drop(pusher);

  let saw_stale = waiter.join().unwrap();
  assert!(saw_stale);
}

/// Pushing after the `Puller` has been dropped must fail with `StaleErr`.
#[test]
fn stale_puller() {
  let (pusher, puller) = sigq::new::<&str>();

  // Get rid of the receiving end before anything is pushed
  drop(puller);

  let res = pusher.push("hello");
  assert_eq!(res, Err(StaleErr));
}

/// Make sure that a weakened pusher can be upgraded and used
#[test]
fn weak_to_strong() {
  let (tx, rx) = sigq::new::<&str>();

  // Derive a weak reference, then turn it back into a strong Pusher
  let weak = tx.weak();
  let upgraded = weak.upgrade().expect("Unable to upgrade weak Pusher");

  // The upgraded pusher must behave like a regular one
  std_test(&upgraded, &rx);

  // Drop the original pusher; the upgraded one is still alive
  drop(tx);

  // One strong Pusher remains; try_pop() should return Ok(None)
  let with_upgraded = rx.try_pop();
  assert_eq!(with_upgraded, Ok(None));

  drop(upgraded);

  // No strong Pushers remain; try_pop() should return Err(StaleErr)
  let without_pushers = rx.try_pop();
  assert_eq!(without_pushers, Err(StaleErr));
}

/// If the last strong `Pusher` is dropped a `WeakPusher` should fail to
/// upgrade to a `Pusher`.
#[test]
fn no_strong_weak_upgrade_fail() {
  // `_rx` stays bound until the end of the test, so the Puller is not dropped
  // before the upgrade attempt.
  let (tx, _rx) = sigq::new::<&str>();

  // Create a weak reference to the pusher
  let wtx = tx.weak();

  // Drop the only (strong) pusher.
  drop(tx);

  // The only strong pusher was released, so the upgrade must yield `None`.
  // The test fails if it returns a Pusher instead.
  assert!(wtx.upgrade().is_none(), "Upgrade unexpectedly successful");
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :