1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
//! Simple promise library compatible with `std::future` and async/await
//!
//! # Example
//!
//! Create a promise and wait for the result while computing the result in another thread
//!
//! ```rust
//! use pinky_swear::{Pinky, PinkySwear};
//! use std::{thread, time::Duration};
//!
//! fn compute(pinky: Pinky<Result<u32, ()>>) {
//!     thread::sleep(Duration::from_millis(1000));
//!     pinky.swear(Ok(42));
//! }
//!
//! fn main() {
//!     let (promise, pinky) = PinkySwear::new();
//!     thread::spawn(move || {
//!         compute(pinky);
//!     });
//!     assert_eq!(promise.wait(), Ok(42));
//! }
//! ```

#![cfg_attr(feature = "docs", feature(doc_cfg))]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![doc(test(attr(deny(rust_2018_idioms, warnings))))]
#![doc(test(attr(allow(unused_extern_crates))))]
#![doc(html_root_url = "https://docs.rs/pinky-swear/5.1.0/")]

doc_comment::doctest!("../README.md");

use flume::{Receiver, Sender};
use parking_lot::{Mutex, RwLock};
use std::{
    fmt,
    future::Future,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll, Waker},
};
use tracing::{trace, warn};

/// Sometimes you just cannot keep your Promises.
///
/// A `Cancellable<E>` can be resolved with an error of type `E` instead of the
/// value that was originally promised. In this file it is implemented for
/// `Pinky<Result<T, E>>`, where cancelling resolves the promise with `Err(err)`.
pub trait Cancellable<E> {
    /// Cancel the Promise you made, explaining why with an Error.
    ///
    /// `err` is the error handed over in place of the promised value.
    fn cancel(&self, err: E);
}

/// A PinkySwear is a Promise that the other party is supposed to honour at some point.
///
/// The promised value can be obtained by blocking (`wait`), by polling
/// (`try_wait`), or by awaiting the `PinkySwear` itself, which implements
/// `Future`.
#[must_use = "PinkySwear should be used or you can miss errors"]
pub struct PinkySwear<T> {
    /// Receiving end of the channel through which the resolved value arrives.
    recv: Receiver<T>,
    /// The promise's own copy of the resolving handle; it gives access to the
    /// waker slot and the log marker shared with every `Pinky` clone.
    pinky: Pinky<T>,
}

impl<T: Send + 'static> PinkySwear<T> {
    /// Build an unresolved promise together with the [`Pinky`] used to resolve it.
    ///
    /// Both halves are connected by an unbounded `flume` channel. The promise
    /// keeps one copy of the handle for itself and returns a clone of it.
    pub fn new() -> (Self, Pinky<T>) {
        let (send, recv) = flume::unbounded();
        let promise = Self {
            recv,
            pinky: Pinky {
                send,
                waker: Default::default(),
                marker: Default::default(),
            },
        };
        let handle = promise.pinky.clone();
        (promise, handle)
    }

    /// Build a promise that is already resolved with `data`.
    pub fn new_with_data(data: T) -> Self {
        let (resolved, handle) = Self::new();
        handle.swear(data);
        resolved
    }

    /// Take the resolved value if one is available right now.
    ///
    /// Returns `None` when nothing could be taken from the channel; never blocks
    /// on a pending promise.
    pub fn try_wait(&self) -> Option<T> {
        match self.recv.try_recv() {
            Ok(data) => Some(data),
            Err(_) => None,
        }
    }

    /// Wait on the channel until the promise has been resolved and return the value.
    ///
    /// # Panics
    ///
    /// Panics if the channel reports an error. NOTE(review): this looks
    /// unreachable because the promise owns a sender through its `pinky`
    /// field -- confirm against flume's disconnection semantics.
    pub fn wait(&self) -> T {
        self.recv.recv().unwrap()
    }

    /// Attach a marker that is included in this promise's log output.
    pub fn set_marker(&self, marker: String) {
        self.pinky.set_marker(marker);
    }

    /// Store `waker` in the slot shared with the resolving handles, replacing
    /// any previously registered waker.
    fn set_waker(&self, waker: Waker) {
        trace!(promise = %self.pinky.marker(), "Called from future, registering waker.");
        let mut slot = self.pinky.waker.lock();
        *slot = Some(waker);
    }
}

impl<T: Send + 'static> Future for PinkySwear<T> {
    type Output = T;

    /// Resolve to the promised value once it has been sent.
    ///
    /// The waker is registered *before* looking for data, so a `swear` that
    /// lands between the two steps still finds a waker to wake.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.set_waker(cx.waker().clone());
        match self.try_wait() {
            Some(data) => Poll::Ready(data),
            None => Poll::Pending,
        }
    }
}

impl<T> fmt::Debug for PinkySwear<T> {
    /// Print only the type name; no bound on `T` is needed and nothing about
    /// the pending value is revealed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("PinkySwear")
    }
}

impl<T> Drop for PinkySwear<T> {
    /// Emit a trace event, tagged with the promise's marker, when the promise
    /// goes away. No other cleanup is performed here.
    fn drop(&mut self) {
        trace!(promise = %self.pinky.marker(), "Dropping promise.");
    }
}

/// A Pinky allows you to fulfill a Promise that you made.
///
/// Cloning a `Pinky` yields another handle to the same promise: the clones
/// share the waker slot and the log marker.
pub struct Pinky<T> {
    /// Sending end of the channel through which the resolved value is delivered.
    send: Sender<T>,
    /// Waker registered by the promise when it is polled as a future, if any.
    waker: Arc<Mutex<Option<Waker>>>,
    /// Optional marker included in log output for this promise.
    marker: Arc<RwLock<Option<String>>>,
}

impl<T> Pinky<T> {
    /// Honour your PinkySwear by giving the promised data.
    ///
    /// The data is pushed into the channel and the waker registered by the
    /// promise (if any) is woken. If the data cannot be sent because the
    /// promise has vanished, a warning is logged; the waker is still notified.
    pub fn swear(&self, data: T) {
        trace!(
            promise = %self.marker(),
            "Resolving promise.",
        );
        if let Err(err) = self.send.send(data) {
            warn!(
                promise = %self.marker(),
                error = %err,
                "Failed resolving promise, promise has vanished.",
            );
        }
        // Copy the waker out and release the lock *before* waking. A waker is
        // foreign code: one that polls the future inline would call back into
        // `PinkySwear::set_waker`, which takes this same non-reentrant mutex,
        // and deadlock if the guard were still held here.
        let waker = self.waker.lock().clone();
        match waker {
            Some(waker) => {
                trace!("Got data, waking our waker.");
                waker.wake();
            }
            None => trace!("Got data but we have no one to notify."),
        }
    }

    /// Replace the marker used to tag this promise in log output.
    fn set_marker(&self, marker: String) {
        *self.marker.write() = Some(marker);
    }

    /// Render the marker as a log prefix: `"[marker] "` when one is set, an
    /// empty string otherwise.
    fn marker(&self) -> String {
        match self.marker.read().as_ref() {
            Some(marker) => format!("[{}] ", marker),
            None => String::default(),
        }
    }
}

impl<T> Clone for Pinky<T> {
    fn clone(&self) -> Self {
        Self {
            send: self.send.clone(),
            waker: self.waker.clone(),
            marker: self.marker.clone(),
        }
    }
}

impl<T, E> Cancellable<E> for Pinky<Result<T, E>> {
    /// Cancel the promise by resolving it with `Err(err)`.
    fn cancel(&self, err: E) {
        let outcome = Err(err);
        self.swear(outcome);
    }
}

impl<T> PartialEq for Pinky<T> {
    fn eq(&self, other: &Pinky<T>) -> bool {
        Arc::ptr_eq(&self.waker, &other.waker)
    }
}

impl<T> fmt::Debug for Pinky<T> {
    /// Print only the type name, so no bound on `T` is needed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Pinky")
    }
}

/// A PinkyBroadcaster allows you to broadcast a promise resolution to several subscribers.
///
/// Each subscriber gets its own `PinkySwear`; resolving the broadcaster hands a
/// clone of the data to every one of them.
#[must_use = "PinkyBroadcaster must be subscribed"]
pub struct PinkyBroadcaster<T: Clone> {
    /// Marker applied to the promises handed out by `subscribe`.
    marker: Arc<RwLock<Option<String>>>,
    /// The resolving handles of all current subscribers.
    inner: Arc<Mutex<BroadcasterInner<T>>>,
}

impl<T: Send + Clone + 'static> PinkyBroadcaster<T> {
    /// Add a marker to logs
    ///
    /// The marker is applied to every current subscriber and remembered for
    /// future ones.
    pub fn set_marker(&self, marker: String) {
        // Keep the subscriber list locked until the broadcaster's own marker is
        // stored. Otherwise a `subscribe` slipping in between the loop and the
        // store would be handed the stale marker and never be updated.
        // Lock order (inner, then marker) matches `subscribe`.
        let inner = self.inner.lock();
        for subscriber in inner.0.iter() {
            subscriber.set_marker(marker.clone());
        }
        *self.marker.write() = Some(marker);
    }

    /// Subscribe to receive a broadcast when the underlying promise gets honoured.
    ///
    /// The returned promise inherits the broadcaster's current marker, if any.
    pub fn subscribe(&self) -> PinkySwear<T> {
        // Lock order: inner first, then marker (see `set_marker`).
        let mut inner = self.inner.lock();
        let marker = self.marker.read().clone();
        inner.subscribe(marker)
    }

    /// Unsubscribe a promise from the broadcast.
    ///
    /// The promise is consumed and its resolving handle is removed from the
    /// subscriber list.
    pub fn unsubscribe(&self, promise: PinkySwear<T>) {
        self.inner.lock().unsubscribe(promise);
    }

    /// Resolve the underlying promise and broadcast the result to subscribers.
    ///
    /// Every subscriber receives its own clone of `data`. The subscriber list
    /// stays locked for the duration of the broadcast.
    pub fn swear(&self, data: T) {
        for subscriber in self.inner.lock().0.iter() {
            subscriber.swear(data.clone());
        }
    }
}

impl<T: Send + Clone + 'static> Default for PinkyBroadcaster<T> {
    /// Create a broadcaster with no marker and no subscribers.
    fn default() -> Self {
        let subscribers = BroadcasterInner(Vec::new());
        Self {
            marker: Arc::default(),
            inner: Arc::new(Mutex::new(subscribers)),
        }
    }
}

impl<T: Clone> fmt::Debug for PinkyBroadcaster<T> {
    /// Print only the type name; subscribers and marker are not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("PinkyBroadcaster")
    }
}

/// Subscriber list of a `PinkyBroadcaster`: one resolving `Pinky` per promise
/// handed out by `subscribe`.
struct BroadcasterInner<T>(Vec<Pinky<T>>);

impl<T: Send + 'static> BroadcasterInner<T> {
    /// Create a new promise, keep its resolving handle in the subscriber list
    /// and hand the promise back, tagged with `marker` when one is given.
    fn subscribe(&mut self, marker: Option<String>) -> PinkySwear<T> {
        let (promise, handle) = PinkySwear::new();
        self.0.push(handle);
        match marker {
            Some(label) => promise.set_marker(label),
            None => {}
        }
        promise
    }

    /// Drop every stored handle that belongs to `promise`.
    ///
    /// Handles are matched through `Pinky`'s `PartialEq`, i.e. by identity of
    /// the shared waker slot.
    fn unsubscribe(&mut self, promise: PinkySwear<T>) {
        let target = &promise.pinky;
        self.0.retain(|candidate| candidate != target)
    }
}