af_core/task/
cancel.rs

1// Copyright © 2021 Alexandra Frydl
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use crate::prelude::*;
8use crate::task::{self, Task};
9use event_listener::Event;
10use std::sync::atomic::{self, AtomicBool};
11
/// Inner state of a canceler/signal pair.
///
/// A single instance is shared between a [`Canceler`] and every
/// [`CancelSignal`] created from it.
#[derive(Default)]
struct Inner {
  /// Event used to wake tasks that are waiting for cancellation.
  event: Event,
  /// Set to `true` once cancellation has been triggered. Nothing in this file
  /// ever resets it to `false`.
  flag: AtomicBool,
}
18
/// A task canceler that triggers a cloneable [`CancelSignal`].
///
/// Signals are obtained with [`Canceler::signal`] and are all triggered
/// together by [`Canceler::cancel`].
pub struct Canceler {
  /// State shared with every signal handed out by this canceler.
  inner: Arc<Inner>,
  /// Task that forwards cancellation from a parent signal; only set by
  /// [`Canceler::inherit`]. The field is never read, it is held purely for
  /// ownership.
  // NOTE(review): presumably dropping the `Task` stops the forwarding task
  // together with the canceler; confirm against `Task`'s drop behavior.
  _inherit: Option<Task<()>>,
}
24
25impl Canceler {
26  /// Creates a new task canceler.
27  pub fn new() -> Self {
28    Self { inner: default(), _inherit: None }
29  }
30
31  /// Creates a new task canceler that inherits the state of an existing cancel
32  /// signal.
33  ///
34  /// When the given cancel signal is triggered, the canceler will trigger its
35  /// own cancel signal.
36  pub fn inherit(cancel: CancelSignal) -> Self {
37    let linked = Self::new();
38
39    Self {
40      inner: linked.inner.clone(),
41
42      _inherit: Some(task::start(async move {
43        cancel.listen().await;
44        linked.cancel();
45      })),
46    }
47  }
48
49  /// Triggers all cancel signals.
50  pub fn cancel(&self) {
51    if !self.inner.flag.swap(true, atomic::Ordering::AcqRel) {
52      self.inner.event.notify_relaxed(usize::MAX);
53    }
54  }
55
56  /// Returns `true` if the cancel signals have been triggered.
57  pub fn is_triggered(&self) -> bool {
58    self.inner.flag.load(atomic::Ordering::Relaxed)
59  }
60
61  /// Returns a [`CancelSignal`] that is triggered by this canceler.
62  pub fn signal(&self) -> CancelSignal {
63    CancelSignal { inner: Some(self.inner.clone()) }
64  }
65}
66
67impl Default for Canceler {
68  fn default() -> Self {
69    Self::new()
70  }
71}
72
/// An awaitable cancel signal triggered by a [`Canceler`].
///
/// The derived `Default` produces a detached signal (`inner` is `None`) that
/// is not connected to any canceler and never reports being triggered.
#[derive(Default)]
pub struct CancelSignal {
  /// State shared with the originating canceler, or `None` for a detached
  /// signal.
  inner: Option<Arc<Inner>>,
}
78
79impl CancelSignal {
80  /// Waits for the given future until the cancel signal is triggered.
81  ///
82  /// If the signal is triggered, this function drops the future and returns a
83  /// [`Canceled`] error.
84  pub async fn guard<F>(&self, future: F) -> Result<F::Output, Canceled>
85  where
86    F: Future,
87  {
88    let future = async { Ok(future.await) };
89
90    let signal = async {
91      self.listen().await;
92
93      Err(Canceled)
94    };
95
96    futures_lite::future::or(future, signal).await
97  }
98
99  /// Returns `true` if the cancel signal has been triggered.
100  pub fn is_triggered(&self) -> bool {
101    match &self.inner {
102      Some(inner) => inner.flag.load(atomic::Ordering::Acquire),
103      None => false,
104    }
105  }
106
107  /// Waits for the cancel signal to be triggered.
108  pub async fn listen(&self) {
109    let inner = match &self.inner {
110      Some(inner) => inner,
111      None => return future::forever().await,
112    };
113
114    while !inner.flag.load(atomic::Ordering::Relaxed) {
115      let listener = inner.event.listen();
116
117      if self.is_triggered() {
118        return;
119      }
120
121      listener.await;
122    }
123  }
124}
125
126impl Clone for CancelSignal {
127  fn clone(&self) -> Self {
128    Self { inner: self.inner.clone() }
129  }
130}
131
/// An error indicating a task was canceled.
///
/// Returned by [`CancelSignal::guard`] when the signal is triggered before the
/// guarded future completes.
#[derive(Debug, Error)]
#[error("Canceled.")]
pub struct Canceled;