1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
//! _swctx_ is very similar to a one-shot channel, but with some added
//! semantics.
//!
//! ```
//! use std::thread;
//! use swctx::mkpair;
//!
//! let (sctx, wctx) = mkpair::<&str, &str, &str>();
//! let jh = thread::spawn(move || {
//!   sctx.set_state("in thread");
//!   sctx.set("hello");
//! });
//! jh.join().unwrap();
//!
//! assert_eq!(wctx.wait().unwrap(), "hello");
//! ```
//!
//! In a typical use-case an application or library calls [`mkpair()`] to
//! create a pair of linked [`SetCtx`] and [`WaitCtx`] objects.  The `SetCtx`
//! object is transferred to a remote thread/task, and the `WaitCtx` is used
//! to wait for an object to arrive (from the thread/task the `SetCtx` was
//! sent to).
//!
//! Once the thread/task has data to send back to the `WaitCtx` it calls
//! [`SetCtx::set()`] to send the data.
//!
//! The `SetCtx` has an internal state, settable using
//! [`SetCtx::set_state()`].  If the `SetCtx` is dropped prematurely, this
//! state is reported back to the `WaitCtx`, which will return
//! [`Error::Aborted`].
//!
//! The `SetCtx` can also signal a failure by calling [`SetCtx::fail()`] and
//! passing along an application-specific error code.  This will cause the
//! `WaitCtx` to unblock and return [`Error::App`].

mod err;
mod sctx;
pub(crate) mod wctx;

use std::{sync::Arc, task::Waker};

use parking_lot::{Condvar, Mutex};

pub use sctx::SetCtx;
pub use wctx::{WaitCtx, WaitFuture};

pub use err::Error;

/// Delivery state stored in [`Inner`].
///
/// `T` is the type of the delivered value; `S` and `E` are the type
/// parameters of the [`Error`] that may be stored instead.
enum State<T, S, E> {
  /// Waiting for a delivery.
  ///
  /// This is the state a new pair is created with by `mkpair()`.
  Waiting,

  /// Data was delivered.
  ///
  /// `Inner::try_get()` moves the value out and leaves `Finalized` behind.
  Data(T),

  /// Reply is being returned to caller.
  ///
  /// Left behind by `Inner::try_get()` once the data or error has been moved
  /// out.  Calling `Inner::try_get()` again in this state panics.
  Finalized,

  /// An error occurred.
  ///
  /// `Inner::try_get()` moves the error out and leaves `Finalized` behind.
  Err(Error<S, E>)
}

/// The data kept behind the mutex in [`Shared`].
struct Inner<T, S, E> {
  /// Current delivery state.
  state: State<T, S, E>,

  /// Setter-side state of type `S`; initialized to `S::default()` by
  /// `mkpair()`.
  // NOTE(review): per the crate-level docs this is the value set through
  // `SetCtx::set_state()` -- the code that writes it is not in this file.
  sctx_state: S,

  /// Waker that `Shared::notify_waiter()` takes out and wakes, if present.
  // NOTE(review): presumably stored by the async wait future in `wctx` --
  // confirm there.
  waker: Option<Waker>
}

impl<T, S, E> Inner<T, S, E> {
  /// Take the delivered outcome, if one has arrived.
  ///
  /// - While the state is `Waiting`, returns `Ok(None)` and leaves the state
  ///   as `Waiting`.
  /// - If data has been delivered, moves it out and returns `Ok(Some(data))`.
  /// - If an error has been stored, moves it out and returns `Err(err)`.
  ///
  /// Taking data or an error leaves the state as `Finalized`.
  ///
  /// # Panics
  ///
  /// Panics if the state is already `Finalized`, i.e. the outcome has been
  /// taken by an earlier call.
  fn try_get(&mut self) -> Result<Option<T>, Error<S, E>> {
    // Move the current state out so its payload can be returned by value;
    // `Finalized` is left behind as the placeholder.
    match std::mem::replace(&mut self.state, State::Finalized) {
      State::Waiting => {
        // Nothing has been delivered yet -- put the state back.
        self.state = State::Waiting;
        Ok(None)
      }
      State::Data(data) => Ok(Some(data)),
      State::Err(err) => Err(err),
      // Listed explicitly (no `_` arm) so that adding a `State` variant
      // forces this match to be revisited.
      State::Finalized => panic!("Unexpected state")
    }
  }
}


/// The object that `mkpair()` wraps in an `Arc` and hands to both the
/// `SetCtx` and the `WaitCtx`.
struct Shared<T, S, E> {
  /// Mutex-protected delivery state, setter state and waker.
  inner: Mutex<Inner<T, S, E>>,

  /// Condition variable notified by `notify_waiter()`.
  // NOTE(review): presumably what a blocking wait in `wctx` sleeps on --
  // confirm there.
  signal: Condvar
}

impl<T, S, E> Shared<T, S, E> {
  /// Signal that something has changed in `inner`.
  ///
  /// Notifies one waiter on the condition variable and then, if a `Waker` is
  /// stored in `inner`, takes it out (leaving `None`) and wakes it.
  ///
  /// `inner` is passed in by the caller rather than locked here.
  fn notify_waiter(&self, inner: &mut Inner<T, S, E>) {
    self.signal.notify_one();

    // No stored waker means there is nothing further to wake.
    let Some(task_waker) = inner.waker.take() else {
      return;
    };
    task_waker.wake()
  }
}


/// Construct a connected [`SetCtx`] / [`WaitCtx`] pair.
///
/// Both halves hold an `Arc` to the same shared object.  The pair starts out
/// in the waiting state, with the setter-side state initialized to
/// `S::default()` and no waker stored.
///
/// The `WaitCtx` is used to wait for a value to arrive from the `SetCtx`.
pub fn mkpair<T, S, E>() -> (SetCtx<T, S, E>, WaitCtx<T, S, E>)
where
  S: Clone + Default
{
  let shared = Arc::new(Shared {
    inner: Mutex::new(Inner {
      state: State::Waiting,
      sctx_state: S::default(),
      waker: None
    }),
    signal: Condvar::new()
  });

  // The setter gets a cloned handle; the waiter takes the original.
  (SetCtx(Arc::clone(&shared)), WaitCtx(shared))
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :