1#![cfg_attr(feature = "docs", feature(doc_cfg))]
26#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
27#![doc(test(attr(deny(rust_2018_idioms, warnings))))]
28#![doc(test(attr(allow(unused_extern_crates))))]
29#![doc(html_root_url = "https://docs.rs/pinky-swear/6.2.1/")]
30
31doc_comment::doctest!("../README.md");
32
33use flume::{Receiver, Sender};
34use parking_lot::{Mutex, RwLock};
35use std::{
36 fmt,
37 future::Future,
38 pin::Pin,
39 sync::Arc,
40 task::{Context, Poll, Waker},
41};
42use tracing::{trace, warn};
43
/// Something that can be cancelled by handing it an error of type `E`.
///
/// In this file it is implemented for `Pinky<Result<T, E>>`, where cancelling
/// resolves the promise with `Err(err)`.
pub trait Cancellable<E> {
    /// Cancel, passing `err` as the reason.
    fn cancel(&self, err: E);
}
49
/// The receiving side of a promise of a `T`.
///
/// Values handed to an associated [`Pinky`] through `swear` are delivered
/// here. They can be fetched by blocking (`wait`), by checking without
/// blocking (`try_wait`), or by awaiting the promise as a `Future`.
#[must_use = "PinkySwear should be used or you can miss errors"]
pub struct PinkySwear<T> {
    // Receiving end of the channel the associated `Pinky` sends into.
    recv: Receiver<T>,
    // The promise's own handle on the resolving side; gives access to the
    // shared waker slot and log marker.
    pinky: Pinky<T>,
}
56
57impl<T: Send + 'static> PinkySwear<T> {
58 pub fn new() -> (Self, Pinky<T>) {
60 let (send, recv) = flume::unbounded();
61 let pinky = Pinky {
62 send,
63 waker: Default::default(),
64 marker: Default::default(),
65 };
66 let promise = Self { recv, pinky };
67 let pinky = promise.pinky.clone();
68 (promise, pinky)
69 }
70
71 pub fn new_with_data(data: T) -> Self {
73 let (promise, pinky) = Self::new();
74 pinky.swear(data);
75 promise
76 }
77
78 pub fn try_wait(&self) -> Option<T> {
80 self.recv.try_recv().ok()
81 }
82
83 pub fn wait(&self) -> T {
85 self.recv.recv().unwrap()
86 }
87
88 pub fn set_marker(&self, marker: String) {
90 self.pinky.set_marker(marker);
91 }
92
93 fn set_waker(&self, waker: Waker) {
94 trace!(
95 promise = %self.pinky.marker(),
96 "Called from future, registering waker.",
97 );
98 *self.pinky.waker.lock() = Some(waker);
99 }
100}
101
102impl<T: Send + 'static> Future for PinkySwear<T> {
103 type Output = T;
104
105 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
106 self.set_waker(cx.waker().clone());
107 self.try_wait().map(Poll::Ready).unwrap_or(Poll::Pending)
108 }
109}
110
111impl<T> fmt::Debug for PinkySwear<T> {
112 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113 write!(f, "PinkySwear")
114 }
115}
116
117impl<T> Drop for PinkySwear<T> {
118 fn drop(&mut self) {
119 trace!(
120 promise = %self.pinky.marker(),
121 "Dropping promise.",
122 );
123 }
124}
125
/// The resolving side of a promise of a `T`.
///
/// Clones share the same channel sender, waker slot and marker (see the
/// `Clone` impl), and two handles compare equal when they share the same
/// waker slot (see the `PartialEq` impl).
pub struct Pinky<T> {
    // Sending end of the channel read by the associated `PinkySwear`.
    send: Sender<T>,
    // Waker registered by the promise when it is polled as a future; `swear` wakes it.
    waker: Arc<Mutex<Option<Waker>>>,
    // Optional label rendered as a `[label] ` prefix in log events.
    marker: Arc<RwLock<Option<String>>>,
}
132
133impl<T> Pinky<T> {
134 pub fn swear(&self, data: T) {
136 trace!(
137 promise = %self.marker(),
138 "Resolving promise.",
139 );
140 if let Err(err) = self.send.send(data) {
141 warn!(
142 promise = %self.marker(),
143 error = %err,
144 "Failed resolving promise, promise has vanished.",
145 );
146 }
147 if let Some(waker) = self.waker.lock().as_ref() {
148 trace!("Got data, waking our waker.");
149 waker.wake_by_ref();
150 } else {
151 trace!("Got data but we have no one to notify.");
152 }
153 }
154
155 fn set_marker(&self, marker: String) {
156 *self.marker.write() = Some(marker);
157 }
158
159 fn marker(&self) -> String {
160 self.marker
161 .read()
162 .as_ref()
163 .map_or(String::default(), |marker| format!("[{}] ", marker))
164 }
165}
166
167impl<T> Clone for Pinky<T> {
168 fn clone(&self) -> Self {
169 Self {
170 send: self.send.clone(),
171 waker: self.waker.clone(),
172 marker: self.marker.clone(),
173 }
174 }
175}
176
177impl<T, E> Cancellable<E> for Pinky<Result<T, E>> {
178 fn cancel(&self, err: E) {
179 self.swear(Err(err))
180 }
181}
182
183impl<T> PartialEq for Pinky<T> {
184 fn eq(&self, other: &Pinky<T>) -> bool {
185 Arc::ptr_eq(&self.waker, &other.waker)
186 }
187}
188
189impl<T> fmt::Debug for Pinky<T> {
190 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191 write!(f, "Pinky")
192 }
193}
194
/// Resolves one primary promise with a `Result<T, E>` and reports the outcome
/// to any number of subscribers as a `Result<(), E>`.
///
/// On success each subscriber receives `Ok(())`; on failure each subscriber
/// receives its own clone of the error, hence the `E: Clone` bound.
pub struct PinkyErrorBroadcaster<T, E: Clone> {
    // Marker remembered so that subscribers created later can be given it too.
    marker: Arc<RwLock<Option<String>>>,
    // Resolving handles of the current subscribers.
    inner: Arc<Mutex<ErrorBroadcasterInner<E>>>,
    // Resolving handle of the primary promise returned by `new`.
    pinky: Pinky<Result<T, E>>,
}
201
202impl<T: Send + 'static, E: Send + Clone + 'static> PinkyErrorBroadcaster<T, E> {
203 pub fn new() -> (PinkySwear<Result<T, E>>, Self) {
205 let (promise, pinky) = PinkySwear::new();
206 (
207 promise,
208 Self {
209 marker: Default::default(),
210 inner: Arc::new(Mutex::new(ErrorBroadcasterInner(Vec::default()))),
211 pinky,
212 },
213 )
214 }
215
216 pub fn set_marker(&self, marker: String) {
218 for subscriber in self.inner.lock().0.iter() {
219 subscriber.set_marker(marker.clone());
220 }
221 *self.marker.write() = Some(marker);
222 }
223
224 pub fn subscribe(&self) -> PinkySwear<Result<(), E>> {
226 self.inner.lock().subscribe(self.marker.read().clone())
227 }
228
229 pub fn unsubscribe(&self, promise: PinkySwear<Result<(), E>>) {
231 self.inner.lock().unsubscribe(promise);
232 }
233
234 pub fn swear(&self, data: Result<T, E>) {
236 for subscriber in self.inner.lock().0.iter() {
237 subscriber.swear(data.as_ref().map(|_| ()).map_err(Clone::clone))
238 }
239 self.pinky.swear(data);
240 }
241}
242
243impl<T, E: Clone> fmt::Debug for PinkyErrorBroadcaster<T, E> {
244 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
245 write!(f, "PinkyErrorBroadcaster")
246 }
247}
248
// Subscriber list of a `PinkyErrorBroadcaster`: each entry is the resolving
// handle of a promise handed out by `subscribe`.
struct ErrorBroadcasterInner<E>(Vec<Pinky<Result<(), E>>>);
250
251impl<E: Send + 'static> ErrorBroadcasterInner<E> {
252 fn subscribe(&mut self, marker: Option<String>) -> PinkySwear<Result<(), E>> {
253 let (promise, pinky) = PinkySwear::new();
254 self.0.push(pinky);
255 if let Some(marker) = marker {
256 promise.set_marker(marker);
257 }
258 promise
259 }
260
261 fn unsubscribe(&mut self, promise: PinkySwear<Result<(), E>>) {
262 self.0.retain(|pinky| pinky != &promise.pinky)
263 }
264}