swctx 0.3.0

One-shot channel with some special semantics.
Documentation
//! _swctx_ is similar to a cross-thread/task one-shot channel, with the added
//! ability to store a generic "current state" of the channel prior to passing
//! a value over the channel.
//!
//! ```
//! use std::thread;
//! use swctx::mkpair;
//!
//! let (sctx, wctx) = mkpair::<&str, &str, &str>();
//! let jh = thread::spawn(move || {
//!   sctx.set_state("in thread");
//!   sctx.set("hello");
//! });
//! jh.join().unwrap();
//!
//! assert_eq!(wctx.wait().unwrap(), "hello");
//! ```
//!
//! In a typical use-case an application or library calls [`mkpair()`] to
//! create a pair of linked [`SetCtx`] and [`WaitCtx`] objects.  The `SetCtx`
//! object is transferred to a remote thread/task, and the `WaitCtx` is used
//! to wait for an object to arrive (from the thread/task the `SetCtx` is sent
//! to).
//!
//! Once the thread/task has data to send back to the `WaitCtx` it calls
//! [`SetCtx::set()`] to send the data.
//!
//! The `SetCtx` has an internal state, settable using
//! [`SetCtx::set_state()`].  If the `SetCtx` is dropped prematurely, this
//! state is reported back to the `WaitCtx`, which will return
//! [`Error::Aborted`].
//!
//! The `SetCtx` can also signal a failure by calling [`SetCtx::fail()`] and
//! pass along an application-specific error code.  This will cause the
//! `WaitCtx` to unblock and return [`Error::App`].

mod err;
mod sctx;
pub(crate) mod wctx;

use std::{sync::Arc, task::Waker};

use parking_lot::{Condvar, Mutex};

pub use sctx::SetCtx;
pub use wctx::{WaitCtx, WaitFuture};

pub use err::Error;

/// Delivery state of the channel, as stored in [`Inner`].
///
/// `T` is the type of the delivered value; `S` and `E` are the type
/// parameters forwarded to [`Error`].
enum State<T, S, E> {
  /// Waiting for a delivery.
  Waiting,

  /// Data was delivered.
  Data(T),

  /// Reply is being returned to caller.
  ///
  /// `Inner::try_get()` leaves the state in this variant once it has moved
  /// the delivered value or the error out.
  Finalized,

  /// An error occurred.
  Err(Error<S, E>)
}

/// Mutable channel state; kept behind the mutex in `Shared`.
struct Inner<T, S, E> {
  /// Current delivery state.
  state: State<T, S, E>,

  /// Starts out as `S::default()` in `mkpair()`.  Presumably holds the value
  /// most recently stored through `SetCtx::set_state()` -- verify in `sctx`.
  sctx_state: S,

  /// Waker that `Shared::notify_waiter()` takes and wakes, if one is stored.
  /// Presumably registered by `WaitFuture` -- confirm in `wctx`.
  waker: Option<Waker>,

  /// Initialized to `false` by `mkpair()`.
  /// NOTE(review): presumably set once the `WaitCtx` has been dropped --
  /// confirm in `wctx`.
  wctx_dropped: bool
}

impl<T, S, E> Inner<T, S, E> {
  /// Attempt to take the delivered value (or error) out of the channel.
  ///
  /// Returns:
  /// - `Ok(None)` if nothing has been delivered yet.  The state is left as
  ///   `State::Waiting`, so the call can be repeated.
  /// - `Ok(Some(data))` if a value was delivered.
  /// - `Err(err)` if an error was stored.
  ///
  /// In the last two cases the state is left as `State::Finalized`.
  ///
  /// # Panics
  /// Panics if the state is already `State::Finalized`, i.e. if the value or
  /// error has already been taken out by an earlier call.
  fn try_get(&mut self) -> Result<Option<T>, Error<S, E>> {
    // Take ownership of the current state in a single step so the payload
    // can be moved out without a second, fallible destructuring.
    match std::mem::replace(&mut self.state, State::Finalized) {
      State::Waiting => {
        // Nothing to hand over yet; undo the speculative finalization.
        self.state = State::Waiting;
        Ok(None)
      }
      State::Data(data) => Ok(Some(data)),
      State::Err(err) => Err(err),
      State::Finalized => {
        // Shouldn't be possible
        unimplemented!("Unexpected state");
      }
    }
  }
}


/// State shared between the two ends of the channel; `mkpair()` places it in
/// an `Arc` that is handed to both the `SetCtx` and the `WaitCtx`.
struct Shared<T, S, E> {
  /// Lock-protected channel state.
  inner: Mutex<Inner<T, S, E>>,

  /// Condition variable notified by `notify_waiter()`.
  signal: Condvar
}

impl<T, S, E> Shared<T, S, E> {
  /// Wake up whoever is waiting on this channel.
  ///
  /// Notifies one waiter on the condition variable and then, if a task waker
  /// is stored in `inner`, removes it and wakes it.
  ///
  /// `inner` is passed in separately rather than locked here; presumably the
  /// caller already holds the `inner` mutex guard -- confirm at call sites.
  fn notify_waiter(&self, inner: &mut Inner<T, S, E>) {
    // The condition variable is signalled first.
    self.signal.notify_one();

    // Then the stored waker, if any.  `take()` leaves `None` behind so the
    // same waker is never woken twice.
    let Some(task_waker) = inner.waker.take() else {
      return;
    };
    task_waker.wake();
  }
}


/// Construct a connected [`SetCtx`]/[`WaitCtx`] pair.
///
/// Both halves refer to the same shared channel state.  The channel starts
/// out waiting for a delivery, with the `SetCtx` state set to `S::default()`.
/// The `WaitCtx` half is used to wait for the value sent through the
/// `SetCtx` half.
#[must_use]
pub fn mkpair<T, S, E>() -> (SetCtx<T, S, E>, WaitCtx<T, S, E>)
where
  S: Clone + Default
{
  let shared = Arc::new(Shared {
    inner: Mutex::new(Inner {
      state: State::Waiting,
      sctx_state: S::default(),
      waker: None,
      wctx_dropped: false
    }),
    signal: Condvar::new()
  });

  // The setter gets a cloned handle; the waiter takes over the original.
  (SetCtx(Arc::clone(&shared)), WaitCtx(shared))
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :