Skip to main content

emit/platform/
thread_local_ctxt.rs

1/*!
2The [`ThreadLocalCtxt`] type.
3*/
4
5use std::{
6    cell::RefCell,
7    collections::{hash_map, HashMap},
8    mem,
9    ops::ControlFlow,
10    sync::{Arc, Mutex},
11};
12
13use emit_core::{
14    ctxt::Ctxt,
15    props::Props,
16    runtime::InternalCtxt,
17    str::Str,
18    value::{OwnedValue, ToValue, Value},
19};
20
21use crate::span::{SpanId, TraceId};
22
/**
A [`Ctxt`] that stores ambient state in thread local storage.

Frames fully encapsulate all properties that were active when they were created, so they can be sent across threads to move that state with them.

Each context carries an id. Contexts with the same id use the same per-thread storage; contexts with different ids are isolated from one another.
*/
#[derive(Debug, Clone, Copy)]
pub struct ThreadLocalCtxt {
    // Key into the per-thread `ACTIVE` map; `0` is the id used by `ThreadLocalCtxt::shared`
    id: usize,
}
32
33impl Default for ThreadLocalCtxt {
34    fn default() -> Self {
35        Self::new()
36    }
37}
38
39impl ThreadLocalCtxt {
40    /**
41    Create a new thread local store with fully isolated storage.
42    */
43    pub fn new() -> Self {
44        ThreadLocalCtxt { id: ctxt_id() }
45    }
46
47    /**
48    Create a new thread local store sharing the same storage as any other [`ThreadLocalCtxt::shared`].
49    */
50    pub const fn shared() -> Self {
51        ThreadLocalCtxt { id: 0 }
52    }
53}
54
/**
A [`Ctxt::Frame`] on a [`ThreadLocalCtxt`].

A frame holds a complete set of properties. The property map sits behind an [`Arc`], so cloning a frame doesn't copy the properties themselves.
*/
#[derive(Clone)]
pub struct ThreadLocalCtxtFrame {
    // The properties in this frame, or `None` if no map has been allocated for it yet
    props: Option<Arc<HashMap<Str<'static>, ThreadLocalValue>>>,
    // Bumped (with wrapping) by each `open_push`; values written by a push are stamped with it
    generation: usize,
}
63
// A single property value stored in a frame
#[derive(Clone)]
struct ThreadLocalValue {
    // The stored value itself
    inner: ThreadLocalValueInner,
    // The generation of the frame that wrote this value.
    // `open_push` uses it to tell values written by the push in progress apart from inherited ones
    generation: usize,
}
69
// The storage for a property value.
// Trace and span ids are kept as their concrete types; anything else is buffered into an `OwnedValue`
#[derive(Clone)]
enum ThreadLocalValueInner {
    // A value that downcast to a `TraceId`
    TraceId(TraceId),
    // A value that downcast to a `SpanId`
    SpanId(SpanId),
    // Any other value, converted with `Value::to_shared`
    Any(OwnedValue),
}
76
77impl ThreadLocalValue {
78    fn from_value(generation: usize, value: Value) -> Self {
79        // Specialize a few common value types
80
81        if let Some(trace_id) = value.downcast_ref() {
82            return ThreadLocalValue {
83                inner: ThreadLocalValueInner::TraceId(*trace_id),
84                generation,
85            };
86        }
87
88        if let Some(span_id) = value.downcast_ref() {
89            return ThreadLocalValue {
90                inner: ThreadLocalValueInner::SpanId(*span_id),
91                generation,
92            };
93        }
94
95        // Fall back to buffering
96        ThreadLocalValue {
97            inner: ThreadLocalValueInner::Any(value.to_shared()),
98            generation,
99        }
100    }
101}
102
103impl ToValue for ThreadLocalValue {
104    fn to_value(&self) -> Value<'_> {
105        match self.inner {
106            ThreadLocalValueInner::TraceId(ref value) => value.to_value(),
107            ThreadLocalValueInner::SpanId(ref value) => value.to_value(),
108            ThreadLocalValueInner::Any(ref value) => value.to_value(),
109        }
110    }
111}
112
113impl Props for ThreadLocalCtxtFrame {
114    fn for_each<'a, F: FnMut(Str<'a>, Value<'a>) -> ControlFlow<()>>(
115        &'a self,
116        mut for_each: F,
117    ) -> ControlFlow<()> {
118        if let Some(ref props) = self.props {
119            for (k, v) in &**props {
120                for_each(k.by_ref(), v.to_value())?;
121            }
122        }
123
124        ControlFlow::Continue(())
125    }
126
127    fn get<'v, K: emit_core::str::ToStr>(&'v self, key: K) -> Option<Value<'v>> {
128        self.props.as_ref().and_then(|props| props.get(key))
129    }
130
131    fn is_unique(&self) -> bool {
132        true
133    }
134}
135
136impl Ctxt for ThreadLocalCtxt {
137    type Current = ThreadLocalCtxtFrame;
138    type Frame = ThreadLocalCtxtFrame;
139
140    fn with_current<R, F: FnOnce(&Self::Current) -> R>(&self, with: F) -> R {
141        let current = current(self.id);
142        with(&current)
143    }
144
145    fn open_root<P: Props>(&self, props: P) -> Self::Frame {
146        let mut span = HashMap::new();
147        let generation = 0;
148
149        let _ = props.for_each(|k, v| {
150            span.entry(k.to_shared())
151                .or_insert_with(|| ThreadLocalValue::from_value(generation, v));
152
153            ControlFlow::Continue(())
154        });
155
156        ThreadLocalCtxtFrame {
157            props: Some(Arc::new(span)),
158            generation,
159        }
160    }
161
162    fn open_push<P: Props>(&self, props: P) -> Self::Frame {
163        let mut span = current(self.id);
164
165        if span.props.is_none() {
166            span.props = Some(Arc::new(HashMap::new()));
167        }
168
169        let generation = span.generation.wrapping_add(1);
170        span.generation = generation;
171
172        let span_props = Arc::make_mut(span.props.as_mut().unwrap());
173
174        let _ = props.for_each(|k, v| {
175            match span_props.entry(k.to_shared()) {
176                hash_map::Entry::Vacant(entry) => {
177                    // If the map doesn't already contain this property then insert it
178                    entry.insert(ThreadLocalValue::from_value(generation, v));
179                }
180                hash_map::Entry::Occupied(mut entry) => {
181                    let entry = entry.get_mut();
182
183                    if entry.generation != generation {
184                        // If the map does contain this property, and it's from a different generation
185                        // then overwrite it, otherwise keep the value that's already there
186                        *entry = ThreadLocalValue::from_value(generation, v);
187                    }
188                }
189            }
190
191            ControlFlow::Continue(())
192        });
193
194        span
195    }
196
197    fn enter(&self, frame: &mut Self::Frame) {
198        swap(self.id, frame);
199    }
200
201    fn exit(&self, frame: &mut Self::Frame) {
202        swap(self.id, frame);
203    }
204
205    fn close(&self, _: Self::Frame) {}
206}
207
// An impl with an empty body.
// NOTE(review): presumably this marks `ThreadLocalCtxt` as usable for emit's internal runtime;
// confirm against `emit_core::runtime::InternalCtxt`
impl InternalCtxt for ThreadLocalCtxt {}
209
// Start this id from 1 so it doesn't intersect with `ThreadLocalCtxt::shared`, which uses 0
static NEXT_CTXT_ID: Mutex<usize> = Mutex::new(1);

/**
Allocate the id for a new isolated context.

Ids are handed out sequentially starting from `1`. The value `0` is never returned, even if the counter overflows, because it belongs to the shared context. After an overflow the counter restarts from `1`, so ids may eventually repeat.
*/
fn ctxt_id() -> usize {
    // Nothing done while the lock is held can panic, so it can't be poisoned
    let mut next_id = NEXT_CTXT_ID.lock().unwrap();
    let id = *next_id;

    // On overflow restart from 1 rather than wrapping around to the shared id
    *next_id = id.checked_add(1).unwrap_or(1);

    id
}
220
thread_local! {
    // The frame that's active on this thread for each context, keyed by the context's id.
    // An id without an entry is treated as having an empty frame
    static ACTIVE: RefCell<HashMap<usize, ThreadLocalCtxtFrame>> = RefCell::new(HashMap::new());
}
224
225fn current(id: usize) -> ThreadLocalCtxtFrame {
226    ACTIVE.with(|active| {
227        active
228            .borrow_mut()
229            .entry(id)
230            .or_insert_with(|| ThreadLocalCtxtFrame {
231                props: None,
232                generation: 0,
233            })
234            .clone()
235    })
236}
237
238fn swap(id: usize, incoming: &mut ThreadLocalCtxtFrame) {
239    ACTIVE.with(|active| {
240        let mut active = active.borrow_mut();
241
242        let current = active.entry(id).or_insert_with(|| ThreadLocalCtxtFrame {
243            props: None,
244            generation: 0,
245        });
246
247        mem::swap(current, incoming);
248    })
249}
250
#[cfg(test)]
mod tests {
    use super::*;

    #[cfg(all(
        target_arch = "wasm32",
        target_vendor = "unknown",
        target_os = "unknown"
    ))]
    use wasm_bindgen_test::*;

    impl ThreadLocalCtxtFrame {
        // Copy the frame's properties into an owned map, treating a missing map as empty
        fn props(&self) -> HashMap<Str<'static>, ThreadLocalValue> {
            self.props
                .as_ref()
                .map(|props| (**props).clone())
                .unwrap_or_default()
        }
    }

    // Frames are what get type-erased, so the frame type is the one that needs to fit inline
    #[test]
    #[cfg_attr(
        all(
            target_arch = "wasm32",
            target_vendor = "unknown",
            target_os = "unknown"
        ),
        wasm_bindgen_test
    )]
    fn can_inline() {
        use std::mem;

        // Mirrors the impl of `ErasedFrame`
        union RawErasedFrame {
            _data: *mut (),
            _inline: mem::MaybeUninit<[usize; 2]>,
        }

        fn fits_inline<T>() -> bool {
            mem::size_of::<T>() <= mem::size_of::<RawErasedFrame>()
                && mem::align_of::<T>() <= mem::align_of::<RawErasedFrame>()
        }

        assert!(fits_inline::<ThreadLocalCtxtFrame>());
        assert!(fits_inline::<ThreadLocalCtxt>());
    }

    #[test]
    #[cfg_attr(
        all(
            target_arch = "wasm32",
            target_vendor = "unknown",
            target_os = "unknown"
        ),
        wasm_bindgen_test
    )]
    fn push_props() {
        let ctxt = ThreadLocalCtxt::new();

        ctxt.with_current(|props| {
            assert_eq!(0, props.props().len());
        });

        let mut frame = ctxt.open_push(("a", 1));

        // Opening a frame doesn't make it active
        assert_eq!(1, frame.props().len());
        ctxt.with_current(|props| {
            assert_eq!(0, props.props().len());
        });

        ctxt.enter(&mut frame);

        // While entered, `frame` holds the previously active (empty) frame
        assert_eq!(0, frame.props().len());

        let mut inner = ctxt.open_push([("b", 1), ("a", 2)]);

        ctxt.with_current(|props| {
            let props = props.props();

            assert_eq!(1, props.len());
            assert_eq!(1, props["a"].to_value().cast::<i32>().unwrap());
        });

        ctxt.enter(&mut inner);

        // The inner frame inherits from the outer one and overrides `a`
        ctxt.with_current(|props| {
            let props = props.props();

            assert_eq!(2, props.len());
            assert_eq!(2, props["a"].to_value().cast::<i32>().unwrap());
            assert_eq!(1, props["b"].to_value().cast::<i32>().unwrap());
        });

        ctxt.exit(&mut inner);

        ctxt.exit(&mut frame);

        assert_eq!(1, frame.props().len());
        ctxt.with_current(|props| {
            assert_eq!(0, props.props().len());
        });
    }

    #[test]
    #[cfg_attr(
        all(
            target_arch = "wasm32",
            target_vendor = "unknown",
            target_os = "unknown"
        ),
        wasm_bindgen_test
    )]
    fn dedup() {
        let ctxt = ThreadLocalCtxt::new();

        ctxt.with_current(|props| {
            assert_eq!(0, props.props().len());
        });

        let mut frame = ctxt.open_push([("a", 1), ("a", 2)]);

        // The first value for a duplicated key wins
        assert_eq!(
            "1",
            frame.props.as_ref().unwrap()["a"].to_value().to_string()
        );

        ctxt.enter(&mut frame);

        let inner = ctxt.open_push([("a", 2), ("a", 3)]);

        // The inherited value is replaced, but only by the first pushed value
        assert_eq!(
            "2",
            inner.props.as_ref().unwrap()["a"].to_value().to_string()
        );

        ctxt.exit(&mut frame);
    }

    #[test]
    #[cfg_attr(
        all(
            target_arch = "wasm32",
            target_vendor = "unknown",
            target_os = "unknown"
        ),
        wasm_bindgen_test
    )]
    fn out_of_order_enter_exit() {
        let ctxt = ThreadLocalCtxt::new();

        let mut a = ctxt.open_push(("a", 1));

        ctxt.enter(&mut a);

        let mut b = ctxt.open_push(("b", 1));

        ctxt.enter(&mut b);

        // Ensure out-of-order exit doesn't panic

        ctxt.exit(&mut a);
        ctxt.exit(&mut b);
    }

    #[test]
    #[cfg_attr(
        all(
            target_arch = "wasm32",
            target_vendor = "unknown",
            target_os = "unknown"
        ),
        wasm_bindgen_test
    )]
    fn isolation() {
        let ctxt_a = ThreadLocalCtxt::new();

        let ctxt_b = ThreadLocalCtxt::new();

        let mut frame_a = ctxt_a.open_push(("a", 1));

        ctxt_a.enter(&mut frame_a);

        ctxt_a.with_current(|props| {
            assert_eq!(1, props.props().len());
        });

        // Entering a frame on one context isn't visible through another
        ctxt_b.with_current(|props| {
            assert_eq!(0, props.props().len());
        });

        ctxt_a.exit(&mut frame_a);
    }

    #[test]
    #[cfg(not(target_arch = "wasm32"))]
    fn frame_thread_propagation() {
        use std::thread;

        let ctxt = ThreadLocalCtxt::new();

        let mut frame = ctxt.open_push(("a", 1));

        ctxt.enter(&mut frame);

        // The active frame isn't visible from a new thread on its own
        thread::spawn(move || {
            ctxt.with_current(|props| {
                assert_eq!(0, props.props().len());
            });
        })
        .join()
        .unwrap();

        let mut current = ctxt.with_current(|props| props.clone());

        // A cloned frame carries its properties to the thread that enters it
        thread::spawn(move || {
            ctxt.enter(&mut current);

            ctxt.with_current(|props| {
                assert_eq!(1, props.props().len());
            });

            ctxt.exit(&mut current);
        })
        .join()
        .unwrap();
    }
}