1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
//! Future extension for async context propagation.
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use pin_project_lite::pin_project;
use crate::store::{ContextStore, CONTEXT};
pin_project! {
/// A future wrapped with a `ContextStore` that is swapped into the
/// thread-local on each poll and swapped back out after.
pub struct WithContext<F> {
#[pin]
inner: F,
store: Option<ContextStore>,
}
}
impl<F: Future> Future for WithContext<F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let prev = std::thread::LocalKey::with(&CONTEXT, |cell| cell.replace(this.store.take()));
let result = this.inner.poll(cx);
*this.store = std::thread::LocalKey::with(&CONTEXT, |cell| cell.replace(prev));
result
}
}
/// Extension trait providing context propagation for futures.
///
/// Available on all `Sized` types - typically used on futures before spawning.
pub trait ContextFutureExt: Sized {
/// Wrap this future with a specific `ContextStore`.
///
/// On each poll, the store is swapped into thread-local storage.
/// Mutations within the future persist across await points.
fn with(self, store: ContextStore) -> WithContext<Self> {
WithContext {
inner: self,
store: Some(store),
}
}
/// Attach a `ContextSnapshot` as the active store for this future.
///
/// Equivalent to `.with(snap.into())`.
/// Use at inbound boundaries where a deserialized snapshot needs to
/// become the active context.
fn attach(self, snap: crate::ContextSnapshot) -> WithContext<Self> {
self.with(snap.into())
}
/// Fork the current context and wrap this future with the forked child.
///
/// The child inherits parent values via frozen parent (cheap, Arc-shared).
/// Writes in the child are isolated from the parent.
fn fork(self) -> WithContext<Self> {
self.with(crate::fork())
}
/// Capture the current context and wrap this future with it.
///
/// Creates a snapshot (excluding local-only variables) and converts
/// it to a store.
fn capture(self) -> WithContext<Self> {
let snap = crate::capture();
self.with(snap.into())
}
/// Push a named scope into the given store and wrap this future.
///
/// Compose with other wrappers:
/// ```ignore
/// next.scope("remote:MyActor").attach(snap).await
/// ```
fn scope(self, name: &str) -> WithContext<Self> {
crate::registry::with_global_registry(|registry| {
let mut store = crate::fork();
store.push_scope(registry, Some(name.to_string()));
self.with(store)
})
}
}
impl<F: Sized> ContextFutureExt for F {}