execution_context/
ctx.rs

1use std::any::TypeId;
2use std::cell::RefCell;
3use std::fmt;
4use std::panic;
5use std::rc::Rc;
6use std::sync::Arc;
7
8use data::{LocalMap, Opaque};
9
10lazy_static! {
11    static ref DEFAULT_ACTIVE_CONTEXT: Arc<ExecutionContextImpl> =
12        Arc::new(ExecutionContextImpl {
13            flow_propagation: FlowPropagation::Active,
14            locals: Default::default(),
15        });
16    static ref DEFAULT_DISABLED_CONTEXT: Arc<ExecutionContextImpl> =
17        Arc::new(ExecutionContextImpl {
18            flow_propagation: FlowPropagation::Disabled,
19            locals: Default::default(),
20        });
21}
22
23thread_local! {
24    static CURRENT_CONTEXT: RefCell<Arc<ExecutionContextImpl>> =
25        RefCell::new(DEFAULT_ACTIVE_CONTEXT.clone());
26}
27
28#[derive(PartialEq, Debug, Copy, Clone)]
29enum FlowPropagation {
30    Active,
31    Suppressed,
32    Disabled,
33}
34
35#[derive(Clone)]
36pub(crate) struct ExecutionContextImpl {
37    flow_propagation: FlowPropagation,
38    locals: LocalMap,
39}
40
41impl ExecutionContextImpl {
42    /// Wraps the execution context implementation in an Arc.
43    ///
44    /// Ths optimizes the two well known default cases.
45    fn into_arc(self) -> Arc<ExecutionContextImpl> {
46        match (self.flow_propagation, self.locals.is_empty()) {
47            (FlowPropagation::Active, true) => DEFAULT_ACTIVE_CONTEXT.clone(),
48            (FlowPropagation::Disabled, true) => DEFAULT_DISABLED_CONTEXT.clone(),
49            _ => Arc::new(self),
50        }
51    }
52
53    fn has_active_flow(&self) -> bool {
54        self.flow_propagation == FlowPropagation::Active
55    }
56}
57
58/// An execution context is a container for the current logical flow of execution.
59///
60/// This container holds all state that needs to be carried forward with the logical thread
61/// of execution.
62///
63/// The ExecutionContext class provides the functionality to capture and transfer the
64/// encapsulated context across asynchronous points such as threads or tasks.
65///
66/// An execution context can be captured, send and cloned.  This permits a context to be
67/// carried to other threads.
68#[derive(Clone)]
69pub struct ExecutionContext {
70    inner: Arc<ExecutionContextImpl>,
71}
72
73impl fmt::Debug for ExecutionContext {
74    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
75        f.debug_struct("ExecutionContext").finish()
76    }
77}
78
79/// A guard for suspended flows.
80///
81/// This object is used as a guard to resume the flow that was suppressed by a
82/// call to `ExecutionContext::suppress_flow` or `ExecutionContext::disable_flow`.
83/// When it is dropped the flow is resumed.
84///
85/// The guard is internally reference counted.
86// the Rc is to make it non send
87#[derive(Clone)]
88pub struct FlowGuard(Rc<FlowPropagation>);
89
90impl ExecutionContext {
91    /// Captures the current execution context and returns it.
92    ///
93    /// If the current execution context is suppressed then this will instead
94    /// capture an empty default scope.  Capturing will always succeed.
95    ///
96    /// Capturing the execution context means that the flow of data will
97    /// branch off here.  If a flow local is modified after the flow is
98    /// captured it will not be reflected in the captured context.
99    ///
100    /// ## Example
101    ///
102    /// ```
103    /// # use execution_context::ExecutionContext;
104    /// let ec = ExecutionContext::capture();
105    /// ec.run(|| {
106    ///     // this code runs in the flow of the given execution context.
107    /// });
108    /// ```
109    pub fn capture() -> ExecutionContext {
110        ExecutionContext {
111            inner: CURRENT_CONTEXT.with(|ctx| {
112                let current = ctx.borrow();
113                match current.flow_propagation {
114                    FlowPropagation::Active => current.clone(),
115                    FlowPropagation::Suppressed => DEFAULT_ACTIVE_CONTEXT.clone(),
116                    FlowPropagation::Disabled => DEFAULT_DISABLED_CONTEXT.clone(),
117                }
118            }),
119        }
120    }
121
122    /// Suppresses the flow.
123    ///
124    /// This returns a clonable non-send guard that when dropped restores the
125    /// flow.  This can be used to spawn an operation that should not be considered
126    /// to be part of the same logical flow.  Once a new execution context has been
127    /// created, that context will start its own flow again.
128    ///
129    /// To permanently disable flow propagation use `disable_flow`.
130    ///
131    /// ## Example
132    ///
133    /// ```
134    /// # use execution_context::ExecutionContext;
135    /// {
136    ///     let _guard = ExecutionContext::suppress_flow();
137    ///     let ec = ExecutionContext::capture();
138    ///     ec.run(|| {
139    ///         // a new flow is started here because the captured flow was
140    ///         // suppressed.
141    ///     });
142    /// }
143    /// // the flow is resumed here
144    /// ```
145    pub fn suppress_flow() -> FlowGuard {
146        ExecutionContext::modify_context(|ctx| {
147            let old = ctx.flow_propagation;
148            ctx.flow_propagation = FlowPropagation::Suppressed;
149            FlowGuard(Rc::new(old))
150        })
151    }
152
153    /// Permanently disables the flow.
154    ///
155    /// This works similar to `suppress_flow` but instead of just starting a new
156    /// flow this permanently disables the flow.  The flow can be manually restored
157    /// by a call to `restore_flow`.
158    pub fn disable_flow() -> FlowGuard {
159        ExecutionContext::modify_context(|ctx| {
160            let old = ctx.flow_propagation;
161            ctx.flow_propagation = FlowPropagation::Disabled;
162            FlowGuard(Rc::new(old))
163        })
164    }
165
166    /// Restores the flow.
167    ///
168    /// In normal situations the flow is restored when the flow guard is
169    /// dropped.  However when for instance the flow is permanently disabled
170    /// with `disable_flow` new branches will never have their flow restored.
171    /// In those situations it might be useful to call into this function to
172    /// restore the flow.
173    pub fn restore_flow() {
174        ExecutionContext::modify_context(|ctx| {
175            ctx.flow_propagation = FlowPropagation::Active;
176        })
177    }
178
179    /// Checks if the flow is currently suppressed.
180    ///
181    /// A caller cannot determine if the flow is just temporarily suppressed
182    /// or permanently disabled.
183    pub fn is_flow_suppressed() -> bool {
184        CURRENT_CONTEXT.with(|ctx| !ctx.borrow().has_active_flow())
185    }
186
187    /// Runs a function in the context of the given execution context.
188    ///
189    /// The captured execution flow will be carried forward.  If the flow
190    /// was suppressed a new flow is started.  In case the flow was disabled
191    /// then it's also disabled here.
192    ///
193    /// ## Example
194    ///
195    /// ```
196    /// # use std::thread;
197    /// # use execution_context::ExecutionContext;
198    /// let ec = ExecutionContext::capture();
199    /// thread::spawn(move || {
200    ///     ec.run(|| {
201    ///         // the captured execution context is carried into
202    ///         // another thread.
203    ///     });
204    /// });
205    /// ```
206    pub fn run<F: FnOnce() -> R, R>(&self, f: F) -> R {
207        // figure out where we want to switch to.  In case the current
208        // flow is the target flow, we can get away without having to do
209        // any panic handling and pointer swapping.
210        if let Some(old_ctx) = CURRENT_CONTEXT.with(|ctx| {
211            let mut ptr = ctx.borrow_mut();
212            if &**ptr as *const _ == &*self.inner as *const _ {
213                None
214            } else {
215                let old = (*ptr).clone();
216                *ptr = self.inner.clone();
217                Some(old)
218            }
219
220        // this is for the case where we just switched the execution
221        // context.  This means we need to catch the panic, restore the
222        // old context and resume the panic if needed.
223        }) {
224            let rv = panic::catch_unwind(panic::AssertUnwindSafe(|| f()));
225            CURRENT_CONTEXT.with(|ctx| *ctx.borrow_mut() = old_ctx);
226            match rv {
227                Err(err) => panic::resume_unwind(err),
228                Ok(rv) => rv,
229            }
230
231        // simple case: same flow.  We can just invoke the function
232        } else {
233            f()
234        }
235    }
236
237    /// Internal helper for context modifications
238    fn modify_context<F: FnOnce(&mut ExecutionContextImpl) -> R, R>(f: F) -> R {
239        CURRENT_CONTEXT.with(|ctx| {
240            let mut ptr = ctx.borrow_mut();
241            let mut new = ExecutionContextImpl {
242                flow_propagation: ptr.flow_propagation,
243                locals: ptr.locals.clone(),
244            };
245            let rv = f(&mut new);
246            *ptr = new.into_arc();
247            rv
248        })
249    }
250
251    /// Inserts a value into the locals.
252    pub(crate) fn set_local_value(key: TypeId, new_value: Arc<Box<Opaque>>) {
253        let new_locals = CURRENT_CONTEXT.with(|ctx| ctx.borrow().locals.insert(key, new_value));
254        ExecutionContext::modify_context(|ctx| {
255            ctx.locals = new_locals;
256        });
257    }
258
259    /// Returns a value from the locals.
260    pub(crate) fn get_local_value(key: TypeId) -> Option<Arc<Box<Opaque>>> {
261        CURRENT_CONTEXT.with(|ctx| ctx.borrow().locals.get(&key))
262    }
263}
264
265impl Drop for FlowGuard {
266    fn drop(&mut self) {
267        if let Some(old) = Rc::get_mut(&mut self.0) {
268            ExecutionContext::modify_context(|ctx| ctx.flow_propagation = *old);
269        }
270    }
271}