1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
//! A runtime extension for adding support for Structured Concurrency to existing runtimes.
//!
//! # What is Strucutured Concurrency?
//! [Structured Concurrency] is a programming paradigm that lets asynchronous operations run only
//! within certain scopes so that they form an operation stack like a regular function call stack.
//! As the parent operation waits until all children complete, Structured Concurrency helps local
//! reasoning of concurrent programs.
//!
//! # Out-of-task concurrency considered harmful
//! Most of the asynchronous programs are composed of `async` functions and *in-task* concurrency
//! primitives such as `select` and `join`, and this makes such futures automatically
//! well-structured. As "futures do nothing unless polled," executions of inner (child) operations
//! are very explicit (usually at the point of `await`s). Moreover, canceling a future is done by
//! dropping it, which will reclaim resources used for the operation, including the inner futures.
//! This drop-chain propagates the cancellation to the very end of the operation stack.
//!
//! *Out-of-task* concurrencies such as `spawn`, however, breaks the structure. They allow us to
//! start a new execution unit that can escape from the parent stack. Although frameworks provide a
//! way to join on the spawned task, they don't propagate cancellation properly. If you drop the
//! spawning task, the spawned task may outlive the parent indefinitely.
//!
//! `task_scope` is designed to offer a [`spawn`] function that properly respects cancellation.
//! [`scope`] function delimits a lifetime within which inner tasks are allowed to run. If you
//! issue a graceful cancellation or drop the scope, the runtime delivers a cancellation signal to all
//! the child tasks.
//!
//! # Cancellation points
//! `task_scope` requires the tasks to pass through cancellation points regularly to work effectively.
//! Consider the following (contrived) example:
//!
//! ```no_run
//! use tokio::io::*;
//!
//! # async fn doc_loop() {
//! let mut read = repeat(42); // very fast input
//! let mut write = sink(); // very fast output
//!
//! copy(&mut read, &mut write).await.unwrap();
//! # }
//! ```
//!
//! This program is virtually running into an infinite loop because `read` and `write` never
//! terminate. To make matters worse, this loop cannot be canceled from the outside because the
//! I/O operations always succeed, and `copy` function tries to continue as much as possible.
//! Therefore, the spawned task must cooperatively check for cancellation or yield the execution
//! regularly in a loop to preserve the well-structuredness.
//!
//! `task_scope` provides a convenience function [`cancelable`] to handle cancellation
//! automatically. It wraps the given `Future`/`AsyncRead`/`AsyncWrite` and checks for cancellation
//! (graceful or forced) before proceeding with the inner computation. The example above will look
//! like:
//!
//! ```no_run
//! use futures::pin_mut;
//! use tokio::io::*;
//!
//! use task_scope::cancelable;
//!
//! # async fn doc_cancelable_loop() {
//! let read = cancelable(repeat(42)); // very fast, but cancelable input
//! pin_mut!(read); // needed for Unpin bound of copy
//! let mut write = sink(); // very fast output
//!
//! // this will terminate with an error on cancellation
//! copy(&mut read, &mut write).await.unwrap();
//! # }
//! ```
//!
//! If the cancellation logic is more complex, you can poll [`Cancellation`] manually to check for
//! a cancellation signal.
//!
//! # Grace period and mercy period
//! `task_scope` supports two modes of cancellation. Graceful cancellation and forced cancellation.
//!
//! You can initiate a graceful cancellation by calling [`cancel`] method of the scope. This
//! notifies and gives a "grace period" to tasks in the scope so that they can start their
//! cancellation. The scope waits for all the tasks to complete as usual.
//!
//! A forced cancellation is propagated to tasks when the scope is dropped, or [`force_cancel`] is
//! called. Given that canceling a long-running is really hard as shown above, the canceled tasks
//! are given to a "mercy period." The tasks can continue their execution until they yield the
//! execution next time, and then the runtime will automatically cancel the task. The tasks should
//! shorten the mercy period as short as possible because it's technically breaking the program's
//! concurrency structure (a child is outliving the dropped parent).
//!
//! # `task_scope` is (almost) executor-agnostic
//! `task_scope` works by adding cancellation information to the current asynchronous context
//! provided by runtimes such as async-std and Tokio. Therefore, the propagation of cancellation
//! works regardless of executors. Even it's possible to propagate a cancellation of a task
//! running in a runtime to its child task running in another runtime.
//!
//! However, `task_scope` doesn't provide its own `spawn` API but delegates spawning to the executors.
//! Thus, to use `spawn`, you need to declare which runtime(s) you want to extend with
//! `task_scope` by enabling features of `task_scope`. For example, if you want to spawn a
//! cancelable task in a Tokio runtime, you need to enable `"tokio"` feature of `task_scope` and to
//! call [`task_scope::spawn::tokio::spawn`]. Currently, async-std and Tokio are supported.
//!
//! If only one runtime is enabled, [`task_scope::spawn`] refers to the `spawn` function for
//! the runtime. By default, only `"tokio"` feature is enabled.
//!
//! [Structured Concurrency]: https://en.wikipedia.org/wiki/Structured_concurrency
//! [`spawn`]: crate::spawn()
//! [`scope`]: crate::scope()
//! [`cancelable`]: crate::cancelable()
//! [`Cancellation`]: crate::Cancellation
//! [`cancel`]: crate::scope::ScopeFuture::cancel
//! [`force_cancel`]: crate::scope::ScopeFuture::force_cancel
//! [`task_scope::spawn::tokio::spawn`]: crate::spawn::tokio::spawn
//! [`task_scope::spawn`]: crate::spawn()
//!

use futures_intrusive::channel::shared::StateReceiver;
use futures_intrusive::sync::ManualResetEvent;

use std::fmt;
use std::sync::{Arc, Weak};

pub mod cancelable;
pub mod cancellation;
pub mod handle;
pub mod scope;
pub mod spawn;
mod waker;
mod with_token;

pub use cancelable::{cancelable, Cancelable};
pub use cancellation::{cancellation, Cancellation};
pub use scope::scope;
pub use spawn::*;

/// The error type for cancellation.
#[derive(Debug, PartialEq, Eq, Hash)]
pub enum Canceled {
    /// The parent task issued a graceful cancellation request.
    /// The children are in a "grace period"; The parent will wait until all children complete shutdown.
    Graceful,
    /// The parent task is forcibly shutting down children (mainly because the parent is dropped).
    /// The children are in a "mercy period"; They can continue execution until the next yield,
    /// and then they will be stopped automatically.
    Forced,
}

impl fmt::Display for Canceled {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Canceled::Graceful => write!(f, "graceful cancellation is requested"),
            Canceled::Forced => write!(f, "forced cancellation is requested"),
        }
    }
}

impl std::error::Error for Canceled {}

pub(crate) struct Join {
    pub(crate) event: Arc<ManualResetEvent>,
}

impl Drop for Join {
    fn drop(&mut self) {
        self.event.set();
    }
}

#[derive(Clone)]
pub(crate) struct Token {
    // notifies when the scope is canceled (dropped)
    pub(crate) cancel: Arc<StateReceiver<bool>>,
    // notfies when all of the spawned children finish
    pub(crate) join: Weak<Join>,
}