bevy_impulse/
promise.rs

1/*
2 * Copyright (C) 2023 Open Source Robotics Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16*/
17
18use std::{
19    any::Any,
20    future::Future,
21    pin::Pin,
22    sync::{
23        atomic::{AtomicBool, Ordering},
24        Arc, Mutex,
25    },
26    task::{Context, Poll},
27};
28
29use crate::{Cancellation, CancellationCause};
30
31pub(crate) mod private;
32use private::*;
33
34/// A promise expects to receive a value in the future.
35#[must_use]
36pub struct Promise<T> {
37    /// Cache of latest known state for this Promise.
38    state: PromiseState<T>,
39    /// State that gets shared with the [`Sender`]`
40    target: Arc<PromiseTarget<T>>,
41    /// Used to preserve livelihood for the dependencies of this promise.
42    /// Currently only used for implementing [`Promise::flatten`].
43    dependencies: Vec<Box<dyn Any + Send + Sync>>,
44}
45
46impl<T> Promise<T> {
47    /// Check the last known state of the promise without performing any update.
48    /// This will never block, but it might provide a state that is out of date.
49    ///
50    /// To borrow a view of the most current state at the cost of synchronizing
51    /// you can use [`Self::peek`].
52    pub fn sneak_peek(&self) -> &PromiseState<T> {
53        &self.state
54    }
55
56    /// View the state of the promise. If a response is available, you will
57    /// borrow it, but it will continue to be stored inside the promise.
58    ///
59    /// This requires a mutable reference to the promise because it may try to
60    /// update the current state if needed. To peek at that last known state
61    /// without trying to synchronize you can use [`Self::sneak_peek()`].
62    pub fn peek(&mut self) -> &PromiseState<T> {
63        self.update();
64        &self.state
65    }
66
67    /// Try to take the response of the promise. If the response is available,
68    /// it will be contained within the returned state, and the internal state
69    /// of this promise will permanently change to [`PromiseState::Taken`].
70    pub fn take(&mut self) -> PromiseState<T> {
71        self.update();
72        self.state.take()
73    }
74
75    /// Wait for the promise to be resolved. The internal state of the
76    /// [`Promise`] will not be updated; that requires a follow-up call to one
77    /// of the mutable methods.
78    ///
79    /// To both wait for a result and update the Promise's internal state once
80    /// it is available, use [`Self::wait_mut`].
81    pub fn wait(&self) -> &Self {
82        if !self.state.is_pending() {
83            // The result arrived and ownership has been transferred to this
84            // promise.
85            return self;
86        }
87
88        Self::impl_wait(&self.target, None);
89        self
90    }
91
92    pub fn interruptible_wait(&self, interrupter: &Interrupter) -> &Self
93    where
94        T: 'static,
95    {
96        if !self.state.is_pending() {
97            // The result arrived and ownership has been transferred to this
98            // promise.
99            return self;
100        }
101
102        if let Some(interrupt) = interrupter.push(self.target.clone()) {
103            Self::impl_wait(&self.target, Some(interrupt));
104        }
105
106        self
107    }
108
109    /// Wait for the promise to be resolved and update the internal state with
110    /// the result.
111    pub fn wait_mut(&mut self) -> &mut Self {
112        if !self.state.is_pending() {
113            return self;
114        }
115
116        if let Some(mut guard) = Self::impl_wait(&self.target, None) {
117            Self::impl_try_take_result(&mut self.state, &mut guard.result);
118        }
119
120        self
121    }
122
123    pub fn interruptible_wait_mut(&mut self, interrupter: &Interrupter) -> &mut Self
124    where
125        T: 'static,
126    {
127        if !self.state.is_pending() {
128            return self;
129        }
130
131        if let Some(interrupt) = interrupter.push(self.target.clone()) {
132            if let Some(mut guard) = Self::impl_wait(&self.target, Some(interrupt)) {
133                Self::impl_try_take_result(&mut self.state, &mut guard.result);
134            }
135        }
136
137        self
138    }
139
140    /// Update the internal state of the promise if it is still pending. This
141    /// will automatically be done by [`Self::peek`] and [`Self::take`] so there
142    /// is no need to call this explicitly unless you want a specific timing for
143    /// when to synchronize the internal state.
144    pub fn update(&mut self) {
145        if self.state.is_pending() {
146            match self.target.inner.lock() {
147                Ok(mut guard) => {
148                    self.state.update(guard.result.take());
149                }
150                Err(_) => {
151                    // If the mutex is poisoned, that has to mean the sender
152                    // crashed while trying to send the value, so we should
153                    // treat it as cancelled.
154                    self.state = PromiseState::make_poisoned();
155                }
156            }
157        }
158    }
159}
160
161impl<T: 'static + Send + Sync> Promise<Promise<T>> {
162    /// Reduce a nested promise into a single flat end-to-end promise.
163    pub fn flatten(self) -> Promise<T> {
164        self.impl_flatten()
165    }
166}
167
168impl<T> Drop for Promise<T> {
169    fn drop(&mut self) {
170        if self.state.is_pending() {
171            // We never received the result from the sender so we will trigger
172            // the cancellation.
173            let f = match self.target.inner.lock() {
174                Ok(mut guard) => guard.on_promise_drop.take(),
175                Err(_) => None,
176            };
177
178            if let Some(f) = f {
179                f();
180            }
181        }
182    }
183}
184
185impl<T: Unpin> Future for Promise<T> {
186    type Output = PromiseState<T>;
187    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
188        let self_mut = self.get_mut();
189        let state = self_mut.take();
190        if state.is_pending() {
191            if let Ok(mut inner) = self_mut.target.inner.lock() {
192                inner.waker = Some(cx.waker().clone());
193            }
194            Poll::Pending
195        } else {
196            Poll::Ready(state)
197        }
198    }
199}
200
201/// The state of a promise.
202#[derive(Debug, Clone)]
203pub enum PromiseState<T> {
204    /// The promise received its result and can be seen in this state.
205    Available(T),
206    /// The promise is still pending, so you need to keep waiting for the state.
207    Pending,
208    /// The promise has been cancelled and will never receive a response.
209    Cancelled(Cancellation),
210    /// The sender was disposed of, so the promise will never receive a response.
211    Disposed,
212    /// The promise was delivered and has been taken. It will never be available
213    /// to take again.
214    Taken,
215}
216
217impl<T> PromiseState<T> {
218    pub fn as_ref(&self) -> PromiseState<&T> {
219        match self {
220            Self::Available(value) => PromiseState::Available(value),
221            Self::Pending => PromiseState::Pending,
222            Self::Cancelled(cancellation) => PromiseState::Cancelled(cancellation.clone()),
223            Self::Disposed => PromiseState::Disposed,
224            Self::Taken => PromiseState::Taken,
225        }
226    }
227
228    pub fn available(self) -> Option<T> {
229        match self {
230            Self::Available(value) => Some(value),
231            _ => None,
232        }
233    }
234
235    pub fn is_available(&self) -> bool {
236        matches!(self, Self::Available(_))
237    }
238
239    pub fn is_pending(&self) -> bool {
240        matches!(self, Self::Pending)
241    }
242
243    pub fn is_cancelled(&self) -> bool {
244        matches!(self, Self::Cancelled(_))
245    }
246
247    pub fn cancellation(&self) -> Option<&Cancellation> {
248        match self {
249            Self::Cancelled(cause) => Some(cause),
250            _ => None,
251        }
252    }
253
254    pub fn is_disposed(&self) -> bool {
255        matches!(self, Self::Disposed)
256    }
257
258    pub fn is_taken(&self) -> bool {
259        matches!(self, Self::Taken)
260    }
261
262    pub fn take(&mut self) -> PromiseState<T> {
263        let next_value = match self {
264            Self::Available(_) => Self::Taken,
265            Self::Pending => Self::Pending,
266            Self::Cancelled(cancellation) => Self::Cancelled(cancellation.clone()),
267            Self::Disposed => Self::Disposed,
268            Self::Taken => Self::Taken,
269        };
270
271        std::mem::replace(self, next_value)
272    }
273
274    pub fn map<U>(self, f: impl FnOnce(T) -> U) -> PromiseState<U> {
275        match self {
276            Self::Available(x) => PromiseState::Available(f(x)),
277            Self::Pending => PromiseState::Pending,
278            Self::Cancelled(cause) => PromiseState::Cancelled(cause),
279            Self::Disposed => PromiseState::Disposed,
280            Self::Taken => PromiseState::Taken,
281        }
282    }
283
284    pub fn then<U>(self, f: impl FnOnce(T) -> PromiseState<U>) -> PromiseState<U> {
285        self.map(f).flatten()
286    }
287
288    fn update(&mut self, result: Option<PromiseResult<T>>) {
289        match result {
290            Some(PromiseResult::Available(response)) => {
291                *self = PromiseState::Available(response);
292            }
293            Some(PromiseResult::Cancelled(cause)) => {
294                *self = PromiseState::Cancelled(cause);
295            }
296            Some(PromiseResult::Disposed) => {
297                *self = PromiseState::Disposed;
298            }
299            None => {
300                // Do nothing
301            }
302        }
303    }
304
305    fn make_poisoned() -> Self {
306        Self::Cancelled(Cancellation::from_cause(
307            CancellationCause::PoisonedMutexInPromise,
308        ))
309    }
310}
311
312impl<T> PromiseState<PromiseState<T>> {
313    pub fn flatten(self) -> PromiseState<T> {
314        match self {
315            Self::Available(x) => x,
316            Self::Pending => PromiseState::Pending,
317            Self::Cancelled(cause) => PromiseState::Cancelled(cause),
318            Self::Disposed => PromiseState::Disposed,
319            Self::Taken => PromiseState::Taken,
320        }
321    }
322}
323
324pub struct Interrupter {
325    inner: Arc<Mutex<InterrupterInner>>,
326}
327
328#[allow(clippy::arc_with_non_send_sync)]
329impl Interrupter {
330    pub fn new() -> Self {
331        Self {
332            inner: Arc::new(Mutex::new(InterrupterInner::new())),
333        }
334    }
335
336    /// Tell all waiters that are listening to this Interrupter to interrupt
337    /// their waiting.
338    ///
339    /// Any new waiters added to this Interrupter after this is triggered will
340    /// not wait at all until [`Interrupter::reset`] is called for this
341    /// Interrupter.
342    pub fn interrupt(&self) {
343        let mut guard = match self.inner.lock() {
344            Ok(guard) => guard,
345            Err(poisoned) => {
346                let mut inner = poisoned.into_inner();
347                *inner = InterrupterInner::new();
348                return;
349            }
350        };
351        guard.triggered = true;
352        for waiter in &*guard.waiters {
353            waiter.interrupt.store(true, Ordering::SeqCst);
354            waiter.interruptible.interrupt();
355        }
356        guard.waiters.clear();
357    }
358
359    /// If interrupt() has been called on this Interrupter in the past, calling
360    /// this function will clear out the after-effect of that, allowing new
361    /// waiters to wait for a new call to interrupt() to happen.
362    pub fn reset(&self) {
363        match self.inner.lock() {
364            Ok(mut guard) => {
365                guard.triggered = false;
366            }
367            Err(poisoned) => {
368                let mut guard = poisoned.into_inner();
369                *guard = InterrupterInner::new();
370            }
371        }
372    }
373
374    fn push<T: 'static>(&self, target: Arc<PromiseTarget<T>>) -> Option<Arc<AtomicBool>> {
375        let mut guard = match self.inner.lock() {
376            Ok(guard) => guard,
377            Err(poisoned) => {
378                let mut guard = poisoned.into_inner();
379                *guard = InterrupterInner::new();
380                guard
381            }
382        };
383
384        if guard.triggered {
385            return None;
386        }
387
388        let interruptee = Interruptee {
389            interrupt: Arc::new(AtomicBool::new(false)),
390            interruptible: target,
391        };
392        let interrupt = interruptee.interrupt.clone();
393
394        guard.waiters.push(interruptee);
395        Some(interrupt)
396    }
397}
398
399impl Default for Interrupter {
400    fn default() -> Self {
401        Interrupter::new()
402    }
403}
404
405#[cfg(test)]
406mod tests {
407    use crate::prelude::*;
408
409    #[test]
410    fn test_promise_flatten() {
411        // Flatten, Outer, Inner
412        {
413            let (outer_sender, mut flat_promise) = {
414                let (outer_sender, outer_promise) = Promise::<Promise<&str>>::new();
415                (outer_sender, outer_promise.flatten())
416            };
417
418            let (inner_sender, inner_promise) = Promise::<&str>::new();
419            assert!(outer_sender.send(inner_promise).is_ok());
420            assert!(flat_promise.peek().is_pending());
421            assert!(inner_sender.send("hello").is_ok());
422            assert_eq!(flat_promise.take().available(), Some("hello"));
423        }
424
425        // Flatten, Inner, Outer
426        {
427            let (outer_sender, mut flat_promise) = {
428                let (outer_sender, outer_promise) = Promise::<Promise<&str>>::new();
429                (outer_sender, outer_promise.flatten())
430            };
431
432            let (inner_sender, inner_promise) = Promise::<&str>::new();
433            assert!(flat_promise.peek().is_pending());
434            assert!(inner_sender.send("hello").is_ok());
435            assert!(flat_promise.peek().is_pending());
436            assert!(outer_sender.send(inner_promise).is_ok());
437            assert_eq!(flat_promise.take().available(), Some("hello"));
438        }
439
440        // Outer, Flatten, Inner
441        {
442            let (inner_sender, mut flat_promise) = {
443                let (outer_sender, mut outer_promise) = Promise::<Promise<&str>>::new();
444                assert!(outer_promise.peek().is_pending());
445
446                let (inner_sender, inner_promise) = Promise::<&str>::new();
447                assert!(outer_sender.send(inner_promise).is_ok());
448                (inner_sender, outer_promise.flatten())
449            };
450
451            assert!(flat_promise.peek().is_pending());
452            assert!(inner_sender.send("hello").is_ok());
453            assert_eq!(flat_promise.take().available(), Some("hello"));
454        }
455
456        // Inner, Flatten, Outer
457        {
458            let (mut flat_promise, outer_sender, inner_promise) = {
459                let (outer_sender, outer_promise) = Promise::<Promise<&str>>::new();
460
461                let (inner_sender, inner_promise) = Promise::<&str>::new();
462                assert!(inner_sender.send("hello").is_ok());
463                (outer_promise.flatten(), outer_sender, inner_promise)
464            };
465
466            assert!(flat_promise.peek().is_pending());
467            assert!(outer_sender.send(inner_promise).is_ok());
468            assert_eq!(flat_promise.take().available(), Some("hello"));
469        }
470
471        // Outer, Inner, Flatten
472        {
473            let mut flat_promise = {
474                let (outer_sender, mut outer_promise) = Promise::<Promise<&str>>::new();
475                assert!(outer_promise.peek().is_pending());
476
477                let (inner_sender, inner_promise) = Promise::<&str>::new();
478                assert!(outer_sender.send(inner_promise).is_ok());
479                assert!(inner_sender.send("hello").is_ok());
480                assert!(outer_promise.peek().is_available());
481                outer_promise.flatten()
482            };
483
484            assert_eq!(flat_promise.take().available(), Some("hello"));
485        }
486
487        // Inner, Outer, Flatten
488        {
489            let mut flat_promise = {
490                let (outer_sender, outer_promise) = Promise::<Promise<&str>>::new();
491                let (inner_sender, inner_promise) = Promise::<&str>::new();
492                assert!(inner_sender.send("hello").is_ok());
493                assert!(outer_sender.send(inner_promise).is_ok());
494                outer_promise.flatten()
495            };
496
497            assert_eq!(flat_promise.take().available(), Some("hello"));
498        }
499    }
500
501    use super::Sender;
502    struct DoubleFlattenPairs {
503        outer_promise: Promise<Promise<Promise<&'static str>>>,
504        outer_sender: Sender<Promise<Promise<&'static str>>>,
505        mid_promise: Promise<Promise<&'static str>>,
506        mid_sender: Sender<Promise<&'static str>>,
507        inner_promise: Promise<&'static str>,
508        inner_sender: Sender<&'static str>,
509    }
510
511    impl DoubleFlattenPairs {
512        fn new() -> DoubleFlattenPairs {
513            let (outer_sender, outer_promise) = Promise::new();
514            let (mid_sender, mid_promise) = Promise::new();
515            let (inner_sender, inner_promise) = Promise::new();
516            Self {
517                outer_promise,
518                outer_sender,
519                mid_promise,
520                mid_sender,
521                inner_promise,
522                inner_sender,
523            }
524        }
525    }
526
527    #[test]
528    fn test_promise_double_flatten() {
529        // Flatten, Flatten, Outer, Mid, Inner
530        {
531            let DoubleFlattenPairs {
532                outer_promise,
533                outer_sender,
534                mid_promise,
535                mid_sender,
536                inner_promise,
537                inner_sender,
538            } = DoubleFlattenPairs::new();
539            let mut flat_promise = outer_promise.flatten().flatten();
540            assert!(flat_promise.peek().is_pending());
541            assert!(outer_sender.send(mid_promise).is_ok());
542            assert!(flat_promise.peek().is_pending());
543            assert!(mid_sender.send(inner_promise).is_ok());
544            assert!(flat_promise.peek().is_pending());
545            assert!(inner_sender.send("hello").is_ok());
546            assert_eq!(flat_promise.take().available(), Some("hello"));
547        }
548
549        // Flatten, Outer, Flatten, Mid, Inner
550        {
551            let DoubleFlattenPairs {
552                outer_promise,
553                outer_sender,
554                mid_promise,
555                mid_sender,
556                inner_promise,
557                inner_sender,
558            } = DoubleFlattenPairs::new();
559            let mut flat_promise = outer_promise.flatten();
560            assert!(flat_promise.peek().is_pending());
561            assert!(outer_sender.send(mid_promise).is_ok());
562            assert!(flat_promise.peek().is_pending());
563            let mut flat_promise = flat_promise.flatten();
564            assert!(flat_promise.peek().is_pending());
565            assert!(mid_sender.send(inner_promise).is_ok());
566            assert!(flat_promise.peek().is_pending());
567            assert!(inner_sender.send("hello").is_ok());
568            assert_eq!(flat_promise.take().available(), Some("hello"));
569        }
570
571        // Outer, Flatten, Flatten, Mid, Inner
572        {
573            let DoubleFlattenPairs {
574                outer_promise,
575                outer_sender,
576                mid_promise,
577                mid_sender,
578                inner_promise,
579                inner_sender,
580            } = DoubleFlattenPairs::new();
581            assert!(outer_sender.send(mid_promise).is_ok());
582            let mut flat_promise = outer_promise.flatten().flatten();
583            assert!(flat_promise.peek().is_pending());
584            assert!(mid_sender.send(inner_promise).is_ok());
585            assert!(flat_promise.peek().is_pending());
586            assert!(inner_sender.send("hello").is_ok());
587            assert_eq!(flat_promise.take().available(), Some("hello"));
588        }
589
590        // Outer, Mid, Flatten, Flatten, Inner
591        {
592            let DoubleFlattenPairs {
593                mut outer_promise,
594                outer_sender,
595                mid_promise,
596                mid_sender,
597                inner_promise,
598                inner_sender,
599            } = DoubleFlattenPairs::new();
600            assert!(outer_sender.send(mid_promise).is_ok());
601            assert!(outer_promise.peek().is_available());
602            assert!(mid_sender.send(inner_promise).is_ok());
603            let mut flat_promise = outer_promise.flatten().flatten();
604            assert!(flat_promise.peek().is_pending());
605            assert!(inner_sender.send("hello").is_ok());
606            assert_eq!(flat_promise.take().available(), Some("hello"));
607        }
608
609        // Outer, Mid, Inner, Flatten, Flatten
610        {
611            let DoubleFlattenPairs {
612                mut outer_promise,
613                outer_sender,
614                mid_promise,
615                mid_sender,
616                inner_promise,
617                inner_sender,
618            } = DoubleFlattenPairs::new();
619            assert!(outer_sender.send(mid_promise).is_ok());
620            assert!(outer_promise.peek().is_available());
621            assert!(mid_sender.send(inner_promise).is_ok());
622            assert!(inner_sender.send("hello").is_ok());
623            let mut flat_promise = outer_promise.flatten().flatten();
624            assert_eq!(flat_promise.take().available(), Some("hello"));
625        }
626
627        // Mid, Flatten, Flatten, Inner, Outer
628        {
629            let DoubleFlattenPairs {
630                outer_promise,
631                outer_sender,
632                mid_promise,
633                mid_sender,
634                inner_promise,
635                inner_sender,
636            } = DoubleFlattenPairs::new();
637            assert!(mid_sender.send(inner_promise).is_ok());
638            let mut flat_promise = outer_promise.flatten().flatten();
639            assert!(flat_promise.peek().is_pending());
640            assert!(inner_sender.send("hello").is_ok());
641            assert!(flat_promise.peek().is_pending());
642            assert!(outer_sender.send(mid_promise).is_ok());
643            assert_eq!(flat_promise.take().available(), Some("hello"));
644        }
645
646        // Inner, Flatten, Flatten, Outer, Mid
647        {
648            let DoubleFlattenPairs {
649                outer_promise,
650                outer_sender,
651                mid_promise,
652                mid_sender,
653                inner_promise,
654                inner_sender,
655            } = DoubleFlattenPairs::new();
656            assert!(inner_sender.send("hello").is_ok());
657            let mut flat_promise = outer_promise.flatten().flatten();
658            assert!(flat_promise.peek().is_pending());
659            assert!(outer_sender.send(mid_promise).is_ok());
660            assert!(flat_promise.peek().is_pending());
661            assert!(mid_sender.send(inner_promise).is_ok());
662            assert_eq!(flat_promise.take().available(), Some("hello"));
663        }
664    }
665}