1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291
//! Implementation of the golang Context (https://pkg.go.dev/context).
//! You can think of context as a runtime equivalent of the rust lifetime:
//! As soon as its context is canceled the function should return ASAP,
//! without doing any further blocking calls.
//!
//! It is NOT possible to extend the context provided by the caller, who defines
//! how long it allows the function call to execute.
//!
//! Context is essentially a cancellation token which should be passed
//! down the call stack, so that it can be awaited together with every blocking call:
//! Instead of "awaiting for new data on the channel", you "await for new data on the channel OR
//! for context to get canceled". This way you can implement graceful shutdown
//! in a very uniform way.
use crate::{signal, time};
use std::{fmt, future::Future, pin::Pin, sync::Arc, task};
pub mod channel;
mod clock;
mod no_copy;
mod rng;
mod testonly;
#[cfg(test)]
mod tests;
pub use clock::*;
pub use no_copy::NoCopy;
pub use testonly::*;
/// Contexts are composed into a tree via `_parent` link.
/// We maintain an invariant `_parent.deadline <= deadline`.
/// If a parent gets canceled, the child also gets canceled immediately afterwards,
/// although not atomically. If deadline passes the context also gets canceled.
///
/// The cascade cancellation is implemented by spawning a tokio
/// task awaiting for parent to be canceled and canceling the child afterwards
/// (it also awaits for deadline and the child itself to be canceled just in case, to avoid memory leaks).
pub struct Ctx(Arc<Inner>);
/// Inner representation of the context.
struct Inner {
clock: Clock,
rng_provider: rng::Provider,
/// Signal sent once this context is canceled.
canceled: Arc<signal::Once>,
/// Deadline after which the context will be automatically canceled.
deadline: time::Deadline,
/// Parent context.
_parent: Option<Arc<Inner>>,
}
impl Drop for Inner {
fn drop(&mut self) {
// Automatically cancel the context when State is dropped.
// This wakes the task awaiting parent cancellation, so that it doesn't leak.
// Note that since children keep a reference to the parent, no
// cascade cancellation will happen here.
self.canceled.send();
}
}
/// Error returned when the blocking operation was interrupted
/// due to context getting canceled.
#[derive(Debug, PartialEq, Eq, thiserror::Error)]
#[error("canceled")]
pub struct Canceled;
/// Wraps result with ErrCancel as an error.
pub type OrCanceled<T> = std::result::Result<T, Canceled>;
/// Blocks the current thread until future f is completed, using
/// the local tokio runtime. Use this function to generate a blocking
/// version of async context-aware primitives.
/// Blocking.
#[track_caller]
pub(crate) fn block_on<F: Future>(f: F) -> F::Output {
tokio::runtime::Handle::current().block_on(f)
}
/// Constructs a top-level context.
/// Should be called only at the start of the `main()` function of the binary.
pub fn root() -> Ctx {
Ctx(Arc::new(Inner {
clock: RealClock.into(),
rng_provider: rng::Provider::real(),
canceled: Arc::new(signal::Once::new()),
deadline: time::Deadline::Infinite,
_parent: None,
}))
}
impl fmt::Debug for Ctx {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter.debug_struct("Ctx").finish_non_exhaustive()
}
}
/// Context-aware future, that an async task can await and
/// sync task can block on.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project::pin_project]
pub struct CtxAware<F>(#[pin] pub(crate) F);
impl<F: Future> Future for CtxAware<F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
self.project().0.poll(cx)
}
}
impl<F: Future> CtxAware<F> {
/// Blocks the current thread until the future is completed.
pub fn block(self) -> F::Output {
block_on(self)
}
}
impl Ctx {
/// Clones the context.
/// `Ctx` doesn't implement `Clone` so that it is not possible to
/// clone it outside of this crate.
pub(crate) fn clone(&self) -> Self {
Self(self.0.clone())
}
/// Constructs a new child context.
pub(crate) fn child(&self, deadline: time::Deadline) -> Self {
self.child_with_clock(self.0.clock.clone(), deadline)
}
fn child_with_clock(&self, clock: Clock, deadline: time::Deadline) -> Self {
let deadline = std::cmp::min(self.0.deadline, deadline);
let parent_canceled = self.0.canceled.clone();
let child_canceled = Arc::new(signal::Once::new());
let child = Self(Arc::new(Inner {
clock: clock.clone(),
rng_provider: self.0.rng_provider.split(),
canceled: child_canceled.clone(),
deadline,
_parent: Some(self.0.clone()),
}));
// Spawn a task propagating task cancelation.
// This task takes references only to the `canceled` signals
// of parent and child (rather that the whole context object)
// to avoid a reference loop and therefore a memory leak:
// context is automatically canceled when dropped, which
// guarantees that this task eventually completes.
tokio::spawn(async move {
tokio::select! {
_ = clock.sleep_until(deadline,false) => child_canceled.send(),
_ = parent_canceled.cancel_safe_recv() => child_canceled.send(),
_ = child_canceled.cancel_safe_recv() => {},
}
});
child
}
/// Cascade cancels this context and all the descendants.
pub(crate) fn cancel(&self) {
self.0.canceled.send();
}
/// Awaits until this context gets canceled.
pub fn canceled(&self) -> CtxAware<impl '_ + Future<Output = ()>> {
CtxAware(self.0.canceled.cancel_safe_recv())
}
/// Checks if this context is still active (i.e., not canceled).
pub fn is_active(&self) -> bool {
!self.0.canceled.try_recv()
}
/// The time at which this context will be canceled.
/// The task should use it to schedule its work accordingly.
/// Remember that this is just a hint, because the local context
/// may get canceled before the deadline.
pub fn deadline(&self) -> time::Deadline {
self.0.deadline
}
/// Awaits until the provided future `fut` completes, or the context gets canceled.
/// `fut` is required to be cancel-safe. It logically doesn't make sense to call this method
/// for context-aware futures, since they can handle context cancellation already.
pub fn wait<'a, F: 'a + Future>(
&'a self,
fut: F,
) -> CtxAware<impl 'a + Future<Output = OrCanceled<F::Output>>> {
CtxAware(async {
tokio::select! {
output = fut => OrCanceled::Ok(output),
() = self.0.canceled.cancel_safe_recv() => OrCanceled::Err(Canceled),
}
})
}
/// Constructs a sub-context with deadline `d`.
pub fn with_deadline(&self, d: time::Deadline) -> Self {
self.child(d)
}
/// Constructs a sub-context with deadline `now() + d`.
pub fn with_timeout(&self, d: time::Duration) -> Self {
self.child((self.now() + d).into())
}
/// Current time according to the monotone clock.
pub fn now(&self) -> time::Instant {
self.0.clock.now()
}
/// Current time according to the system/walltime clock.
pub fn now_utc(&self) -> time::Utc {
self.0.clock.now_utc()
}
/// Waits for a specific time.
pub fn sleep(&self, d: time::Duration) -> CtxAware<impl '_ + Future<Output = OrCanceled<()>>> {
self.wait(self.0.clock.sleep(d))
}
/// Blocks until `t`.
pub fn sleep_until(
&self,
t: time::Instant,
) -> CtxAware<impl '_ + Future<Output = OrCanceled<()>>> {
self.wait(self.0.clock.sleep_until(t.into(), true))
}
/// Blocks until deadline `t`.
pub fn sleep_until_deadline(
&self,
t: time::Deadline,
) -> CtxAware<impl '_ + Future<Output = OrCanceled<()>>> {
self.wait(self.0.clock.sleep_until(t, true))
}
/// Constructs a cryptographically-safe random number generator,
/// * in prod, seeded with system entropy
/// WARNING: do not store the result, but rather create new RNG
/// whenever you do computation requiring entropy source.
/// call to rng() is moderately expensive, so don't call it separately
/// for every single bit of entropy though.
/// TODO(gprusak): for now we do not have use cases for reseeding
/// but perhaps eventually we should returns an auto-reseeding RNG instead.
/// * in test, seeded deterministically
/// TODO(gprusak): this is not a perfect determinism, because multiple
/// tasks currently are allowed to access the same ctx, so the order in
/// which they call rng() is not deterministic. To fix this we
/// would need to move the Provider to task-local storage.
pub fn rng(&self) -> rand::rngs::StdRng {
self.0.rng_provider.rng()
}
}
/// anyhow::Error + "canceled" variant.
/// Useful for working with concurrent code which doesn't need structured errors,
/// but needs to handle cancelation explicitly.
///
/// WARNING: this error type implements both `Wrap` and `From<anyhow::Error>`.
/// You should be careful to NOT use `context()` instead of `wrap()`,
/// because otherwise the `Canceled` error will get silently translated
/// to `Internal` error.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// Context has been canceled before call completion.
#[error(transparent)]
Canceled(#[from] Canceled),
/// Other error.
#[error(transparent)]
Internal(#[from] anyhow::Error),
}
/// Alias for Result with `ctx::Error`.
pub type Result<T> = std::result::Result<T, Error>;
/// Equivalent to Ok::<_, ctx::Error>(value).
///
/// This simplifies creation of an ctx::Result in places where type inference
/// cannot deduce the `E` type of the result — without needing to write
/// `Ok::<_, ctx::Error>(value)`.
#[allow(non_snake_case)]
pub fn Ok<T>(t: T) -> Result<T> {
Result::Ok(t)
}
impl crate::error::Wrap for Error {
fn with_wrap<C: fmt::Display + Send + Sync + 'static, F: FnOnce() -> C>(self, f: F) -> Self {
match self {
Error::Internal(err) => Error::Internal(err.context(f())),
err => err,
}
}
}