pinky_swear/
lib.rs

1//! Simple promise library compatible with `std::future` and async/await
2//!
3//! # Example
4//!
5//! Create a promise and wait for the result while computing the result in another thread
6//!
7//! ```rust
8//! use pinky_swear::{Pinky, PinkySwear};
9//! use std::{thread, time::Duration};
10//!
11//! fn compute(pinky: Pinky<Result<u32, ()>>) {
12//!     thread::sleep(Duration::from_millis(1000));
13//!     pinky.swear(Ok(42));
14//! }
15//!
16//! fn main() {
17//!     let (promise, pinky) = PinkySwear::new();
18//!     thread::spawn(move || {
19//!         compute(pinky);
20//!     });
21//!     assert_eq!(promise.wait(), Ok(42));
22//! }
23//! ```
24
25#![cfg_attr(feature = "docs", feature(doc_cfg))]
26#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
27#![doc(test(attr(deny(rust_2018_idioms, warnings))))]
28#![doc(test(attr(allow(unused_extern_crates))))]
29#![doc(html_root_url = "https://docs.rs/pinky-swear/6.2.1/")]
30
31doc_comment::doctest!("../README.md");
32
33use flume::{Receiver, Sender};
34use parking_lot::{Mutex, RwLock};
35use std::{
36    fmt,
37    future::Future,
38    pin::Pin,
39    sync::Arc,
40    task::{Context, Poll, Waker},
41};
42use tracing::{trace, warn};
43
/// Escape hatch for promises that cannot be kept.
pub trait Cancellable<E> {
    /// Give up on the promise, handing over `err` as the reason.
    fn cancel(&self, err: E);
}
49
50/// A PinkySwear is a Promise that the other party is supposed to honour at some point.
51#[must_use = "PinkySwear should be used or you can miss errors"]
52pub struct PinkySwear<T> {
53    recv: Receiver<T>,
54    pinky: Pinky<T>,
55}
56
57impl<T: Send + 'static> PinkySwear<T> {
58    /// Create a new PinkySwear and its associated Pinky.
59    pub fn new() -> (Self, Pinky<T>) {
60        let (send, recv) = flume::unbounded();
61        let pinky = Pinky {
62            send,
63            waker: Default::default(),
64            marker: Default::default(),
65        };
66        let promise = Self { recv, pinky };
67        let pinky = promise.pinky.clone();
68        (promise, pinky)
69    }
70
71    /// Create a new PinkySwear and honour it at the same time.
72    pub fn new_with_data(data: T) -> Self {
73        let (promise, pinky) = Self::new();
74        pinky.swear(data);
75        promise
76    }
77
78    /// Check whether the Promise has been honoured or not.
79    pub fn try_wait(&self) -> Option<T> {
80        self.recv.try_recv().ok()
81    }
82
83    /// Wait until the Promise has been honoured.
84    pub fn wait(&self) -> T {
85        self.recv.recv().unwrap()
86    }
87
88    /// Add a marker to logs
89    pub fn set_marker(&self, marker: String) {
90        self.pinky.set_marker(marker);
91    }
92
93    fn set_waker(&self, waker: Waker) {
94        trace!(
95            promise = %self.pinky.marker(),
96            "Called from future, registering waker.",
97        );
98        *self.pinky.waker.lock() = Some(waker);
99    }
100}
101
102impl<T: Send + 'static> Future for PinkySwear<T> {
103    type Output = T;
104
105    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
106        self.set_waker(cx.waker().clone());
107        self.try_wait().map(Poll::Ready).unwrap_or(Poll::Pending)
108    }
109}
110
111impl<T> fmt::Debug for PinkySwear<T> {
112    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113        write!(f, "PinkySwear")
114    }
115}
116
117impl<T> Drop for PinkySwear<T> {
118    fn drop(&mut self) {
119        trace!(
120            promise = %self.pinky.marker(),
121            "Dropping promise.",
122        );
123    }
124}
125
126/// A Pinky allows you to fulfill a Promise that you made.
127pub struct Pinky<T> {
128    send: Sender<T>,
129    waker: Arc<Mutex<Option<Waker>>>,
130    marker: Arc<RwLock<Option<String>>>,
131}
132
133impl<T> Pinky<T> {
134    /// Honour your PinkySwear by giving the promised data.
135    pub fn swear(&self, data: T) {
136        trace!(
137            promise = %self.marker(),
138            "Resolving promise.",
139        );
140        if let Err(err) = self.send.send(data) {
141            warn!(
142                promise = %self.marker(),
143                error = %err,
144                "Failed resolving promise, promise has vanished.",
145            );
146        }
147        if let Some(waker) = self.waker.lock().as_ref() {
148            trace!("Got data, waking our waker.");
149            waker.wake_by_ref();
150        } else {
151            trace!("Got data but we have no one to notify.");
152        }
153    }
154
155    fn set_marker(&self, marker: String) {
156        *self.marker.write() = Some(marker);
157    }
158
159    fn marker(&self) -> String {
160        self.marker
161            .read()
162            .as_ref()
163            .map_or(String::default(), |marker| format!("[{}] ", marker))
164    }
165}
166
167impl<T> Clone for Pinky<T> {
168    fn clone(&self) -> Self {
169        Self {
170            send: self.send.clone(),
171            waker: self.waker.clone(),
172            marker: self.marker.clone(),
173        }
174    }
175}
176
177impl<T, E> Cancellable<E> for Pinky<Result<T, E>> {
178    fn cancel(&self, err: E) {
179        self.swear(Err(err))
180    }
181}
182
183impl<T> PartialEq for Pinky<T> {
184    fn eq(&self, other: &Pinky<T>) -> bool {
185        Arc::ptr_eq(&self.waker, &other.waker)
186    }
187}
188
189impl<T> fmt::Debug for Pinky<T> {
190    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191        write!(f, "Pinky")
192    }
193}
194
195/// A PinkyErrorBroadcaster allows you to broacast the success/error of a promise resolution to several subscribers.
196pub struct PinkyErrorBroadcaster<T, E: Clone> {
197    marker: Arc<RwLock<Option<String>>>,
198    inner: Arc<Mutex<ErrorBroadcasterInner<E>>>,
199    pinky: Pinky<Result<T, E>>,
200}
201
202impl<T: Send + 'static, E: Send + Clone + 'static> PinkyErrorBroadcaster<T, E> {
203    /// Create a new promise with associated error broadcaster
204    pub fn new() -> (PinkySwear<Result<T, E>>, Self) {
205        let (promise, pinky) = PinkySwear::new();
206        (
207            promise,
208            Self {
209                marker: Default::default(),
210                inner: Arc::new(Mutex::new(ErrorBroadcasterInner(Vec::default()))),
211                pinky,
212            },
213        )
214    }
215
216    /// Add a marker to logs
217    pub fn set_marker(&self, marker: String) {
218        for subscriber in self.inner.lock().0.iter() {
219            subscriber.set_marker(marker.clone());
220        }
221        *self.marker.write() = Some(marker);
222    }
223
224    /// Subscribe to receive a broacast when the underlying promise get henoured.
225    pub fn subscribe(&self) -> PinkySwear<Result<(), E>> {
226        self.inner.lock().subscribe(self.marker.read().clone())
227    }
228
229    /// Unsubscribe a promise from the broadcast.
230    pub fn unsubscribe(&self, promise: PinkySwear<Result<(), E>>) {
231        self.inner.lock().unsubscribe(promise);
232    }
233
234    /// Resolve the underlying promise and broadcast the result to subscribers.
235    pub fn swear(&self, data: Result<T, E>) {
236        for subscriber in self.inner.lock().0.iter() {
237            subscriber.swear(data.as_ref().map(|_| ()).map_err(Clone::clone))
238        }
239        self.pinky.swear(data);
240    }
241}
242
243impl<T, E: Clone> fmt::Debug for PinkyErrorBroadcaster<T, E> {
244    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
245        write!(f, "PinkyErrorBroadcaster")
246    }
247}
248
249struct ErrorBroadcasterInner<E>(Vec<Pinky<Result<(), E>>>);
250
251impl<E: Send + 'static> ErrorBroadcasterInner<E> {
252    fn subscribe(&mut self, marker: Option<String>) -> PinkySwear<Result<(), E>> {
253        let (promise, pinky) = PinkySwear::new();
254        self.0.push(pinky);
255        if let Some(marker) = marker {
256            promise.set_marker(marker);
257        }
258        promise
259    }
260
261    fn unsubscribe(&mut self, promise: PinkySwear<Result<(), E>>) {
262        self.0.retain(|pinky| pinky != &promise.pinky)
263    }
264}