1use std::{
19 any::Any,
20 future::Future,
21 pin::Pin,
22 sync::{
23 atomic::{AtomicBool, Ordering},
24 Arc, Mutex,
25 },
26 task::{Context, Poll},
27};
28
29use crate::{Cancellation, CancellationCause};
30
31pub(crate) mod private;
32use private::*;
33
34#[must_use]
36pub struct Promise<T> {
37 state: PromiseState<T>,
39 target: Arc<PromiseTarget<T>>,
41 dependencies: Vec<Box<dyn Any + Send + Sync>>,
44}
45
46impl<T> Promise<T> {
47 pub fn sneak_peek(&self) -> &PromiseState<T> {
53 &self.state
54 }
55
56 pub fn peek(&mut self) -> &PromiseState<T> {
63 self.update();
64 &self.state
65 }
66
67 pub fn take(&mut self) -> PromiseState<T> {
71 self.update();
72 self.state.take()
73 }
74
75 pub fn wait(&self) -> &Self {
82 if !self.state.is_pending() {
83 return self;
86 }
87
88 Self::impl_wait(&self.target, None);
89 self
90 }
91
92 pub fn interruptible_wait(&self, interrupter: &Interrupter) -> &Self
93 where
94 T: 'static,
95 {
96 if !self.state.is_pending() {
97 return self;
100 }
101
102 if let Some(interrupt) = interrupter.push(self.target.clone()) {
103 Self::impl_wait(&self.target, Some(interrupt));
104 }
105
106 self
107 }
108
109 pub fn wait_mut(&mut self) -> &mut Self {
112 if !self.state.is_pending() {
113 return self;
114 }
115
116 if let Some(mut guard) = Self::impl_wait(&self.target, None) {
117 Self::impl_try_take_result(&mut self.state, &mut guard.result);
118 }
119
120 self
121 }
122
123 pub fn interruptible_wait_mut(&mut self, interrupter: &Interrupter) -> &mut Self
124 where
125 T: 'static,
126 {
127 if !self.state.is_pending() {
128 return self;
129 }
130
131 if let Some(interrupt) = interrupter.push(self.target.clone()) {
132 if let Some(mut guard) = Self::impl_wait(&self.target, Some(interrupt)) {
133 Self::impl_try_take_result(&mut self.state, &mut guard.result);
134 }
135 }
136
137 self
138 }
139
140 pub fn update(&mut self) {
145 if self.state.is_pending() {
146 match self.target.inner.lock() {
147 Ok(mut guard) => {
148 self.state.update(guard.result.take());
149 }
150 Err(_) => {
151 self.state = PromiseState::make_poisoned();
155 }
156 }
157 }
158 }
159}
160
161impl<T: 'static + Send + Sync> Promise<Promise<T>> {
162 pub fn flatten(self) -> Promise<T> {
164 self.impl_flatten()
165 }
166}
167
168impl<T> Drop for Promise<T> {
169 fn drop(&mut self) {
170 if self.state.is_pending() {
171 let f = match self.target.inner.lock() {
174 Ok(mut guard) => guard.on_promise_drop.take(),
175 Err(_) => None,
176 };
177
178 if let Some(f) = f {
179 f();
180 }
181 }
182 }
183}
184
185impl<T: Unpin> Future for Promise<T> {
186 type Output = PromiseState<T>;
187 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
188 let self_mut = self.get_mut();
189 let state = self_mut.take();
190 if state.is_pending() {
191 if let Ok(mut inner) = self_mut.target.inner.lock() {
192 inner.waker = Some(cx.waker().clone());
193 }
194 Poll::Pending
195 } else {
196 Poll::Ready(state)
197 }
198 }
199}
200
201#[derive(Debug, Clone)]
203pub enum PromiseState<T> {
204 Available(T),
206 Pending,
208 Cancelled(Cancellation),
210 Disposed,
212 Taken,
215}
216
217impl<T> PromiseState<T> {
218 pub fn as_ref(&self) -> PromiseState<&T> {
219 match self {
220 Self::Available(value) => PromiseState::Available(value),
221 Self::Pending => PromiseState::Pending,
222 Self::Cancelled(cancellation) => PromiseState::Cancelled(cancellation.clone()),
223 Self::Disposed => PromiseState::Disposed,
224 Self::Taken => PromiseState::Taken,
225 }
226 }
227
228 pub fn available(self) -> Option<T> {
229 match self {
230 Self::Available(value) => Some(value),
231 _ => None,
232 }
233 }
234
235 pub fn is_available(&self) -> bool {
236 matches!(self, Self::Available(_))
237 }
238
239 pub fn is_pending(&self) -> bool {
240 matches!(self, Self::Pending)
241 }
242
243 pub fn is_cancelled(&self) -> bool {
244 matches!(self, Self::Cancelled(_))
245 }
246
247 pub fn cancellation(&self) -> Option<&Cancellation> {
248 match self {
249 Self::Cancelled(cause) => Some(cause),
250 _ => None,
251 }
252 }
253
254 pub fn is_disposed(&self) -> bool {
255 matches!(self, Self::Disposed)
256 }
257
258 pub fn is_taken(&self) -> bool {
259 matches!(self, Self::Taken)
260 }
261
262 pub fn take(&mut self) -> PromiseState<T> {
263 let next_value = match self {
264 Self::Available(_) => Self::Taken,
265 Self::Pending => Self::Pending,
266 Self::Cancelled(cancellation) => Self::Cancelled(cancellation.clone()),
267 Self::Disposed => Self::Disposed,
268 Self::Taken => Self::Taken,
269 };
270
271 std::mem::replace(self, next_value)
272 }
273
274 pub fn map<U>(self, f: impl FnOnce(T) -> U) -> PromiseState<U> {
275 match self {
276 Self::Available(x) => PromiseState::Available(f(x)),
277 Self::Pending => PromiseState::Pending,
278 Self::Cancelled(cause) => PromiseState::Cancelled(cause),
279 Self::Disposed => PromiseState::Disposed,
280 Self::Taken => PromiseState::Taken,
281 }
282 }
283
284 pub fn then<U>(self, f: impl FnOnce(T) -> PromiseState<U>) -> PromiseState<U> {
285 self.map(f).flatten()
286 }
287
288 fn update(&mut self, result: Option<PromiseResult<T>>) {
289 match result {
290 Some(PromiseResult::Available(response)) => {
291 *self = PromiseState::Available(response);
292 }
293 Some(PromiseResult::Cancelled(cause)) => {
294 *self = PromiseState::Cancelled(cause);
295 }
296 Some(PromiseResult::Disposed) => {
297 *self = PromiseState::Disposed;
298 }
299 None => {
300 }
302 }
303 }
304
305 fn make_poisoned() -> Self {
306 Self::Cancelled(Cancellation::from_cause(
307 CancellationCause::PoisonedMutexInPromise,
308 ))
309 }
310}
311
312impl<T> PromiseState<PromiseState<T>> {
313 pub fn flatten(self) -> PromiseState<T> {
314 match self {
315 Self::Available(x) => x,
316 Self::Pending => PromiseState::Pending,
317 Self::Cancelled(cause) => PromiseState::Cancelled(cause),
318 Self::Disposed => PromiseState::Disposed,
319 Self::Taken => PromiseState::Taken,
320 }
321 }
322}
323
324pub struct Interrupter {
325 inner: Arc<Mutex<InterrupterInner>>,
326}
327
328#[allow(clippy::arc_with_non_send_sync)]
329impl Interrupter {
330 pub fn new() -> Self {
331 Self {
332 inner: Arc::new(Mutex::new(InterrupterInner::new())),
333 }
334 }
335
336 pub fn interrupt(&self) {
343 let mut guard = match self.inner.lock() {
344 Ok(guard) => guard,
345 Err(poisoned) => {
346 let mut inner = poisoned.into_inner();
347 *inner = InterrupterInner::new();
348 return;
349 }
350 };
351 guard.triggered = true;
352 for waiter in &*guard.waiters {
353 waiter.interrupt.store(true, Ordering::SeqCst);
354 waiter.interruptible.interrupt();
355 }
356 guard.waiters.clear();
357 }
358
359 pub fn reset(&self) {
363 match self.inner.lock() {
364 Ok(mut guard) => {
365 guard.triggered = false;
366 }
367 Err(poisoned) => {
368 let mut guard = poisoned.into_inner();
369 *guard = InterrupterInner::new();
370 }
371 }
372 }
373
374 fn push<T: 'static>(&self, target: Arc<PromiseTarget<T>>) -> Option<Arc<AtomicBool>> {
375 let mut guard = match self.inner.lock() {
376 Ok(guard) => guard,
377 Err(poisoned) => {
378 let mut guard = poisoned.into_inner();
379 *guard = InterrupterInner::new();
380 guard
381 }
382 };
383
384 if guard.triggered {
385 return None;
386 }
387
388 let interruptee = Interruptee {
389 interrupt: Arc::new(AtomicBool::new(false)),
390 interruptible: target,
391 };
392 let interrupt = interruptee.interrupt.clone();
393
394 guard.waiters.push(interruptee);
395 Some(interrupt)
396 }
397}
398
399impl Default for Interrupter {
400 fn default() -> Self {
401 Interrupter::new()
402 }
403}
404
405#[cfg(test)]
406mod tests {
407 use crate::prelude::*;
408
409 #[test]
410 fn test_promise_flatten() {
411 {
413 let (outer_sender, mut flat_promise) = {
414 let (outer_sender, outer_promise) = Promise::<Promise<&str>>::new();
415 (outer_sender, outer_promise.flatten())
416 };
417
418 let (inner_sender, inner_promise) = Promise::<&str>::new();
419 assert!(outer_sender.send(inner_promise).is_ok());
420 assert!(flat_promise.peek().is_pending());
421 assert!(inner_sender.send("hello").is_ok());
422 assert_eq!(flat_promise.take().available(), Some("hello"));
423 }
424
425 {
427 let (outer_sender, mut flat_promise) = {
428 let (outer_sender, outer_promise) = Promise::<Promise<&str>>::new();
429 (outer_sender, outer_promise.flatten())
430 };
431
432 let (inner_sender, inner_promise) = Promise::<&str>::new();
433 assert!(flat_promise.peek().is_pending());
434 assert!(inner_sender.send("hello").is_ok());
435 assert!(flat_promise.peek().is_pending());
436 assert!(outer_sender.send(inner_promise).is_ok());
437 assert_eq!(flat_promise.take().available(), Some("hello"));
438 }
439
440 {
442 let (inner_sender, mut flat_promise) = {
443 let (outer_sender, mut outer_promise) = Promise::<Promise<&str>>::new();
444 assert!(outer_promise.peek().is_pending());
445
446 let (inner_sender, inner_promise) = Promise::<&str>::new();
447 assert!(outer_sender.send(inner_promise).is_ok());
448 (inner_sender, outer_promise.flatten())
449 };
450
451 assert!(flat_promise.peek().is_pending());
452 assert!(inner_sender.send("hello").is_ok());
453 assert_eq!(flat_promise.take().available(), Some("hello"));
454 }
455
456 {
458 let (mut flat_promise, outer_sender, inner_promise) = {
459 let (outer_sender, outer_promise) = Promise::<Promise<&str>>::new();
460
461 let (inner_sender, inner_promise) = Promise::<&str>::new();
462 assert!(inner_sender.send("hello").is_ok());
463 (outer_promise.flatten(), outer_sender, inner_promise)
464 };
465
466 assert!(flat_promise.peek().is_pending());
467 assert!(outer_sender.send(inner_promise).is_ok());
468 assert_eq!(flat_promise.take().available(), Some("hello"));
469 }
470
471 {
473 let mut flat_promise = {
474 let (outer_sender, mut outer_promise) = Promise::<Promise<&str>>::new();
475 assert!(outer_promise.peek().is_pending());
476
477 let (inner_sender, inner_promise) = Promise::<&str>::new();
478 assert!(outer_sender.send(inner_promise).is_ok());
479 assert!(inner_sender.send("hello").is_ok());
480 assert!(outer_promise.peek().is_available());
481 outer_promise.flatten()
482 };
483
484 assert_eq!(flat_promise.take().available(), Some("hello"));
485 }
486
487 {
489 let mut flat_promise = {
490 let (outer_sender, outer_promise) = Promise::<Promise<&str>>::new();
491 let (inner_sender, inner_promise) = Promise::<&str>::new();
492 assert!(inner_sender.send("hello").is_ok());
493 assert!(outer_sender.send(inner_promise).is_ok());
494 outer_promise.flatten()
495 };
496
497 assert_eq!(flat_promise.take().available(), Some("hello"));
498 }
499 }
500
501 use super::Sender;
502 struct DoubleFlattenPairs {
503 outer_promise: Promise<Promise<Promise<&'static str>>>,
504 outer_sender: Sender<Promise<Promise<&'static str>>>,
505 mid_promise: Promise<Promise<&'static str>>,
506 mid_sender: Sender<Promise<&'static str>>,
507 inner_promise: Promise<&'static str>,
508 inner_sender: Sender<&'static str>,
509 }
510
511 impl DoubleFlattenPairs {
512 fn new() -> DoubleFlattenPairs {
513 let (outer_sender, outer_promise) = Promise::new();
514 let (mid_sender, mid_promise) = Promise::new();
515 let (inner_sender, inner_promise) = Promise::new();
516 Self {
517 outer_promise,
518 outer_sender,
519 mid_promise,
520 mid_sender,
521 inner_promise,
522 inner_sender,
523 }
524 }
525 }
526
527 #[test]
528 fn test_promise_double_flatten() {
529 {
531 let DoubleFlattenPairs {
532 outer_promise,
533 outer_sender,
534 mid_promise,
535 mid_sender,
536 inner_promise,
537 inner_sender,
538 } = DoubleFlattenPairs::new();
539 let mut flat_promise = outer_promise.flatten().flatten();
540 assert!(flat_promise.peek().is_pending());
541 assert!(outer_sender.send(mid_promise).is_ok());
542 assert!(flat_promise.peek().is_pending());
543 assert!(mid_sender.send(inner_promise).is_ok());
544 assert!(flat_promise.peek().is_pending());
545 assert!(inner_sender.send("hello").is_ok());
546 assert_eq!(flat_promise.take().available(), Some("hello"));
547 }
548
549 {
551 let DoubleFlattenPairs {
552 outer_promise,
553 outer_sender,
554 mid_promise,
555 mid_sender,
556 inner_promise,
557 inner_sender,
558 } = DoubleFlattenPairs::new();
559 let mut flat_promise = outer_promise.flatten();
560 assert!(flat_promise.peek().is_pending());
561 assert!(outer_sender.send(mid_promise).is_ok());
562 assert!(flat_promise.peek().is_pending());
563 let mut flat_promise = flat_promise.flatten();
564 assert!(flat_promise.peek().is_pending());
565 assert!(mid_sender.send(inner_promise).is_ok());
566 assert!(flat_promise.peek().is_pending());
567 assert!(inner_sender.send("hello").is_ok());
568 assert_eq!(flat_promise.take().available(), Some("hello"));
569 }
570
571 {
573 let DoubleFlattenPairs {
574 outer_promise,
575 outer_sender,
576 mid_promise,
577 mid_sender,
578 inner_promise,
579 inner_sender,
580 } = DoubleFlattenPairs::new();
581 assert!(outer_sender.send(mid_promise).is_ok());
582 let mut flat_promise = outer_promise.flatten().flatten();
583 assert!(flat_promise.peek().is_pending());
584 assert!(mid_sender.send(inner_promise).is_ok());
585 assert!(flat_promise.peek().is_pending());
586 assert!(inner_sender.send("hello").is_ok());
587 assert_eq!(flat_promise.take().available(), Some("hello"));
588 }
589
590 {
592 let DoubleFlattenPairs {
593 mut outer_promise,
594 outer_sender,
595 mid_promise,
596 mid_sender,
597 inner_promise,
598 inner_sender,
599 } = DoubleFlattenPairs::new();
600 assert!(outer_sender.send(mid_promise).is_ok());
601 assert!(outer_promise.peek().is_available());
602 assert!(mid_sender.send(inner_promise).is_ok());
603 let mut flat_promise = outer_promise.flatten().flatten();
604 assert!(flat_promise.peek().is_pending());
605 assert!(inner_sender.send("hello").is_ok());
606 assert_eq!(flat_promise.take().available(), Some("hello"));
607 }
608
609 {
611 let DoubleFlattenPairs {
612 mut outer_promise,
613 outer_sender,
614 mid_promise,
615 mid_sender,
616 inner_promise,
617 inner_sender,
618 } = DoubleFlattenPairs::new();
619 assert!(outer_sender.send(mid_promise).is_ok());
620 assert!(outer_promise.peek().is_available());
621 assert!(mid_sender.send(inner_promise).is_ok());
622 assert!(inner_sender.send("hello").is_ok());
623 let mut flat_promise = outer_promise.flatten().flatten();
624 assert_eq!(flat_promise.take().available(), Some("hello"));
625 }
626
627 {
629 let DoubleFlattenPairs {
630 outer_promise,
631 outer_sender,
632 mid_promise,
633 mid_sender,
634 inner_promise,
635 inner_sender,
636 } = DoubleFlattenPairs::new();
637 assert!(mid_sender.send(inner_promise).is_ok());
638 let mut flat_promise = outer_promise.flatten().flatten();
639 assert!(flat_promise.peek().is_pending());
640 assert!(inner_sender.send("hello").is_ok());
641 assert!(flat_promise.peek().is_pending());
642 assert!(outer_sender.send(mid_promise).is_ok());
643 assert_eq!(flat_promise.take().available(), Some("hello"));
644 }
645
646 {
648 let DoubleFlattenPairs {
649 outer_promise,
650 outer_sender,
651 mid_promise,
652 mid_sender,
653 inner_promise,
654 inner_sender,
655 } = DoubleFlattenPairs::new();
656 assert!(inner_sender.send("hello").is_ok());
657 let mut flat_promise = outer_promise.flatten().flatten();
658 assert!(flat_promise.peek().is_pending());
659 assert!(outer_sender.send(mid_promise).is_ok());
660 assert!(flat_promise.peek().is_pending());
661 assert!(mid_sender.send(inner_promise).is_ok());
662 assert_eq!(flat_promise.take().available(), Some("hello"));
663 }
664 }
665}