execution_context/ctx.rs
1use std::any::TypeId;
2use std::cell::RefCell;
3use std::fmt;
4use std::panic;
5use std::rc::Rc;
6use std::sync::Arc;
7
8use data::{LocalMap, Opaque};
9
10lazy_static! {
11 static ref DEFAULT_ACTIVE_CONTEXT: Arc<ExecutionContextImpl> =
12 Arc::new(ExecutionContextImpl {
13 flow_propagation: FlowPropagation::Active,
14 locals: Default::default(),
15 });
16 static ref DEFAULT_DISABLED_CONTEXT: Arc<ExecutionContextImpl> =
17 Arc::new(ExecutionContextImpl {
18 flow_propagation: FlowPropagation::Disabled,
19 locals: Default::default(),
20 });
21}
22
23thread_local! {
24 static CURRENT_CONTEXT: RefCell<Arc<ExecutionContextImpl>> =
25 RefCell::new(DEFAULT_ACTIVE_CONTEXT.clone());
26}
27
/// How a context behaves when it is captured.
#[derive(PartialEq, Debug, Copy, Clone)]
enum FlowPropagation {
    /// Capturing yields the current context itself, so the flow carries on.
    Active,
    /// Capturing yields the default active context, i.e. a fresh flow.
    Suppressed,
    /// Capturing yields the default disabled context, so the flow stays off.
    Disabled,
}
34
/// The data behind an `ExecutionContext`.
///
/// Instances are shared through an `Arc`; modifications are made on a copy
/// which then replaces the thread's current context.
#[derive(Clone)]
pub(crate) struct ExecutionContextImpl {
    /// Whether and how this context propagates when captured.
    flow_propagation: FlowPropagation,
    /// The flow-local values stored in this context.
    locals: LocalMap,
}
40
41impl ExecutionContextImpl {
42 /// Wraps the execution context implementation in an Arc.
43 ///
44 /// Ths optimizes the two well known default cases.
45 fn into_arc(self) -> Arc<ExecutionContextImpl> {
46 match (self.flow_propagation, self.locals.is_empty()) {
47 (FlowPropagation::Active, true) => DEFAULT_ACTIVE_CONTEXT.clone(),
48 (FlowPropagation::Disabled, true) => DEFAULT_DISABLED_CONTEXT.clone(),
49 _ => Arc::new(self),
50 }
51 }
52
53 fn has_active_flow(&self) -> bool {
54 self.flow_propagation == FlowPropagation::Active
55 }
56}
57
/// An execution context is a container for the current logical flow of execution.
///
/// This container holds all state that needs to be carried forward with the logical thread
/// of execution.
///
/// The `ExecutionContext` type provides the functionality to capture and transfer the
/// encapsulated context across asynchronous points such as threads or tasks.
///
/// An execution context can be captured, sent and cloned.  This permits a context to be
/// carried to other threads.
#[derive(Clone)]
pub struct ExecutionContext {
    // Shared, reference counted handle to the captured context data.
    inner: Arc<ExecutionContextImpl>,
}
72
73impl fmt::Debug for ExecutionContext {
74 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
75 f.debug_struct("ExecutionContext").finish()
76 }
77}
78
79/// A guard for suspended flows.
80///
81/// This object is used as a guard to resume the flow that was suppressed by a
82/// call to `ExecutionContext::suppress_flow` or `ExecutionContext::disable_flow`.
83/// When it is dropped the flow is resumed.
84///
85/// The guard is internally reference counted.
86// the Rc is to make it non send
87#[derive(Clone)]
88pub struct FlowGuard(Rc<FlowPropagation>);
89
90impl ExecutionContext {
91 /// Captures the current execution context and returns it.
92 ///
93 /// If the current execution context is suppressed then this will instead
94 /// capture an empty default scope. Capturing will always succeed.
95 ///
96 /// Capturing the execution context means that the flow of data will
97 /// branch off here. If a flow local is modified after the flow is
98 /// captured it will not be reflected in the captured context.
99 ///
100 /// ## Example
101 ///
102 /// ```
103 /// # use execution_context::ExecutionContext;
104 /// let ec = ExecutionContext::capture();
105 /// ec.run(|| {
106 /// // this code runs in the flow of the given execution context.
107 /// });
108 /// ```
109 pub fn capture() -> ExecutionContext {
110 ExecutionContext {
111 inner: CURRENT_CONTEXT.with(|ctx| {
112 let current = ctx.borrow();
113 match current.flow_propagation {
114 FlowPropagation::Active => current.clone(),
115 FlowPropagation::Suppressed => DEFAULT_ACTIVE_CONTEXT.clone(),
116 FlowPropagation::Disabled => DEFAULT_DISABLED_CONTEXT.clone(),
117 }
118 }),
119 }
120 }
121
122 /// Suppresses the flow.
123 ///
124 /// This returns a clonable non-send guard that when dropped restores the
125 /// flow. This can be used to spawn an operation that should not be considered
126 /// to be part of the same logical flow. Once a new execution context has been
127 /// created, that context will start its own flow again.
128 ///
129 /// To permanently disable flow propagation use `disable_flow`.
130 ///
131 /// ## Example
132 ///
133 /// ```
134 /// # use execution_context::ExecutionContext;
135 /// {
136 /// let _guard = ExecutionContext::suppress_flow();
137 /// let ec = ExecutionContext::capture();
138 /// ec.run(|| {
139 /// // a new flow is started here because the captured flow was
140 /// // suppressed.
141 /// });
142 /// }
143 /// // the flow is resumed here
144 /// ```
145 pub fn suppress_flow() -> FlowGuard {
146 ExecutionContext::modify_context(|ctx| {
147 let old = ctx.flow_propagation;
148 ctx.flow_propagation = FlowPropagation::Suppressed;
149 FlowGuard(Rc::new(old))
150 })
151 }
152
153 /// Permanently disables the flow.
154 ///
155 /// This works similar to `suppress_flow` but instead of just starting a new
156 /// flow this permanently disables the flow. The flow can be manually restored
157 /// by a call to `restore_flow`.
158 pub fn disable_flow() -> FlowGuard {
159 ExecutionContext::modify_context(|ctx| {
160 let old = ctx.flow_propagation;
161 ctx.flow_propagation = FlowPropagation::Disabled;
162 FlowGuard(Rc::new(old))
163 })
164 }
165
166 /// Restores the flow.
167 ///
168 /// In normal situations the flow is restored when the flow guard is
169 /// dropped. However when for instance the flow is permanently disabled
170 /// with `disable_flow` new branches will never have their flow restored.
171 /// In those situations it might be useful to call into this function to
172 /// restore the flow.
173 pub fn restore_flow() {
174 ExecutionContext::modify_context(|ctx| {
175 ctx.flow_propagation = FlowPropagation::Active;
176 })
177 }
178
179 /// Checks if the flow is currently suppressed.
180 ///
181 /// A caller cannot determine if the flow is just temporarily suppressed
182 /// or permanently disabled.
183 pub fn is_flow_suppressed() -> bool {
184 CURRENT_CONTEXT.with(|ctx| !ctx.borrow().has_active_flow())
185 }
186
187 /// Runs a function in the context of the given execution context.
188 ///
189 /// The captured execution flow will be carried forward. If the flow
190 /// was suppressed a new flow is started. In case the flow was disabled
191 /// then it's also disabled here.
192 ///
193 /// ## Example
194 ///
195 /// ```
196 /// # use std::thread;
197 /// # use execution_context::ExecutionContext;
198 /// let ec = ExecutionContext::capture();
199 /// thread::spawn(move || {
200 /// ec.run(|| {
201 /// // the captured execution context is carried into
202 /// // another thread.
203 /// });
204 /// });
205 /// ```
206 pub fn run<F: FnOnce() -> R, R>(&self, f: F) -> R {
207 // figure out where we want to switch to. In case the current
208 // flow is the target flow, we can get away without having to do
209 // any panic handling and pointer swapping.
210 if let Some(old_ctx) = CURRENT_CONTEXT.with(|ctx| {
211 let mut ptr = ctx.borrow_mut();
212 if &**ptr as *const _ == &*self.inner as *const _ {
213 None
214 } else {
215 let old = (*ptr).clone();
216 *ptr = self.inner.clone();
217 Some(old)
218 }
219
220 // this is for the case where we just switched the execution
221 // context. This means we need to catch the panic, restore the
222 // old context and resume the panic if needed.
223 }) {
224 let rv = panic::catch_unwind(panic::AssertUnwindSafe(|| f()));
225 CURRENT_CONTEXT.with(|ctx| *ctx.borrow_mut() = old_ctx);
226 match rv {
227 Err(err) => panic::resume_unwind(err),
228 Ok(rv) => rv,
229 }
230
231 // simple case: same flow. We can just invoke the function
232 } else {
233 f()
234 }
235 }
236
237 /// Internal helper for context modifications
238 fn modify_context<F: FnOnce(&mut ExecutionContextImpl) -> R, R>(f: F) -> R {
239 CURRENT_CONTEXT.with(|ctx| {
240 let mut ptr = ctx.borrow_mut();
241 let mut new = ExecutionContextImpl {
242 flow_propagation: ptr.flow_propagation,
243 locals: ptr.locals.clone(),
244 };
245 let rv = f(&mut new);
246 *ptr = new.into_arc();
247 rv
248 })
249 }
250
251 /// Inserts a value into the locals.
252 pub(crate) fn set_local_value(key: TypeId, new_value: Arc<Box<Opaque>>) {
253 let new_locals = CURRENT_CONTEXT.with(|ctx| ctx.borrow().locals.insert(key, new_value));
254 ExecutionContext::modify_context(|ctx| {
255 ctx.locals = new_locals;
256 });
257 }
258
259 /// Returns a value from the locals.
260 pub(crate) fn get_local_value(key: TypeId) -> Option<Arc<Box<Opaque>>> {
261 CURRENT_CONTEXT.with(|ctx| ctx.borrow().locals.get(&key))
262 }
263}
264
265impl Drop for FlowGuard {
266 fn drop(&mut self) {
267 if let Some(old) = Rc::get_mut(&mut self.0) {
268 ExecutionContext::modify_context(|ctx| ctx.flow_propagation = *old);
269 }
270 }
271}