1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
// Copyright © 2021 Alexandra Frydl
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

use crate::prelude::*;
use crate::task::{self, Task};
use event_listener::Event;
use std::sync::atomic::{self, AtomicBool};

/// Inner state of an canceler/signal pair.
#[derive(Default)]
struct Inner {
  event: Event,
  flag: AtomicBool,
}

/// A task canceler that triggers a cloneable [`CancelSignal`].
pub struct Canceler {
  inner: Arc<Inner>,
  _inherit: Option<Task<()>>,
}

impl Canceler {
  /// Creates a new task canceler.
  pub fn new() -> Self {
    Self { inner: default(), _inherit: None }
  }

  /// Creates a new task canceler that inherits the state of an existing cancel
  /// signal.
  ///
  /// When the given cancel signal is triggered, the canceler will trigger its
  /// own cancel signal.
  pub fn inherit(cancel: CancelSignal) -> Self {
    let linked = Self::new();

    Self {
      inner: linked.inner.clone(),

      _inherit: Some(task::start(async move {
        cancel.listen().await;
        linked.cancel();
      })),
    }
  }

  /// Triggers all cancel signals.
  pub fn cancel(&self) {
    if !self.inner.flag.swap(true, atomic::Ordering::AcqRel) {
      self.inner.event.notify_relaxed(usize::MAX);
    }
  }

  /// Returns `true` if the cancel signals have been triggered.
  pub fn is_triggered(&self) -> bool {
    self.inner.flag.load(atomic::Ordering::Relaxed)
  }

  /// Returns a [`CancelSignal`] that is triggered by this canceler.
  pub fn signal(&self) -> CancelSignal {
    CancelSignal { inner: Some(self.inner.clone()) }
  }
}

impl Default for Canceler {
  fn default() -> Self {
    Self::new()
  }
}

/// An awaitable cancel signal triggered by a [`Canceler`].
#[derive(Default)]
pub struct CancelSignal {
  inner: Option<Arc<Inner>>,
}

impl CancelSignal {
  /// Waits for the given future until the cancel signal is triggered.
  ///
  /// If the signal is triggered, this function drops the future and returns a
  /// [`Canceled`] error.
  pub async fn guard<F>(&self, future: F) -> Result<F::Output, Canceled>
  where
    F: Future,
  {
    let future = async { Ok(future.await) };

    let signal = async {
      self.listen().await;

      Err(Canceled)
    };

    futures_lite::future::or(future, signal).await
  }

  /// Returns `true` if the cancel signal has been triggered.
  pub fn is_triggered(&self) -> bool {
    match &self.inner {
      Some(inner) => inner.flag.load(atomic::Ordering::Acquire),
      None => false,
    }
  }

  /// Waits for the cancel signal to be triggered.
  pub async fn listen(&self) {
    let inner = match &self.inner {
      Some(inner) => inner,
      None => return future::forever().await,
    };

    while !inner.flag.load(atomic::Ordering::Relaxed) {
      let listener = inner.event.listen();

      if self.is_triggered() {
        return;
      }

      listener.await;
    }
  }
}

impl Clone for CancelSignal {
  fn clone(&self) -> Self {
    Self { inner: self.inner.clone() }
  }
}

/// An error indicating a task was canceled.
#[derive(Debug, Error)]
#[error("Canceled.")]
pub struct Canceled;