1use std::{
6 cell::RefCell,
7 collections::{hash_map, HashMap},
8 mem,
9 ops::ControlFlow,
10 sync::{Arc, Mutex},
11};
12
13use emit_core::{
14 ctxt::Ctxt,
15 props::Props,
16 runtime::InternalCtxt,
17 str::Str,
18 value::{OwnedValue, ToValue, Value},
19};
20
21use crate::span::{SpanId, TraceId};
22
23#[derive(Debug, Clone, Copy)]
29pub struct ThreadLocalCtxt {
30 id: usize,
31}
32
33impl Default for ThreadLocalCtxt {
34 fn default() -> Self {
35 Self::new()
36 }
37}
38
39impl ThreadLocalCtxt {
40 pub fn new() -> Self {
44 ThreadLocalCtxt { id: ctxt_id() }
45 }
46
47 pub const fn shared() -> Self {
51 ThreadLocalCtxt { id: 0 }
52 }
53}
54
55#[derive(Clone)]
59pub struct ThreadLocalCtxtFrame {
60 props: Option<Arc<HashMap<Str<'static>, ThreadLocalValue>>>,
61 generation: usize,
62}
63
64#[derive(Clone)]
65struct ThreadLocalValue {
66 inner: ThreadLocalValueInner,
67 generation: usize,
68}
69
70#[derive(Clone)]
71enum ThreadLocalValueInner {
72 TraceId(TraceId),
73 SpanId(SpanId),
74 Any(OwnedValue),
75}
76
77impl ThreadLocalValue {
78 fn from_value(generation: usize, value: Value) -> Self {
79 if let Some(trace_id) = value.downcast_ref() {
82 return ThreadLocalValue {
83 inner: ThreadLocalValueInner::TraceId(*trace_id),
84 generation,
85 };
86 }
87
88 if let Some(span_id) = value.downcast_ref() {
89 return ThreadLocalValue {
90 inner: ThreadLocalValueInner::SpanId(*span_id),
91 generation,
92 };
93 }
94
95 ThreadLocalValue {
97 inner: ThreadLocalValueInner::Any(value.to_shared()),
98 generation,
99 }
100 }
101}
102
103impl ToValue for ThreadLocalValue {
104 fn to_value(&self) -> Value<'_> {
105 match self.inner {
106 ThreadLocalValueInner::TraceId(ref value) => value.to_value(),
107 ThreadLocalValueInner::SpanId(ref value) => value.to_value(),
108 ThreadLocalValueInner::Any(ref value) => value.to_value(),
109 }
110 }
111}
112
113impl Props for ThreadLocalCtxtFrame {
114 fn for_each<'a, F: FnMut(Str<'a>, Value<'a>) -> ControlFlow<()>>(
115 &'a self,
116 mut for_each: F,
117 ) -> ControlFlow<()> {
118 if let Some(ref props) = self.props {
119 for (k, v) in &**props {
120 for_each(k.by_ref(), v.to_value())?;
121 }
122 }
123
124 ControlFlow::Continue(())
125 }
126
127 fn get<'v, K: emit_core::str::ToStr>(&'v self, key: K) -> Option<Value<'v>> {
128 self.props.as_ref().and_then(|props| props.get(key))
129 }
130
131 fn is_unique(&self) -> bool {
132 true
133 }
134}
135
136impl Ctxt for ThreadLocalCtxt {
137 type Current = ThreadLocalCtxtFrame;
138 type Frame = ThreadLocalCtxtFrame;
139
140 fn with_current<R, F: FnOnce(&Self::Current) -> R>(&self, with: F) -> R {
141 let current = current(self.id);
142 with(¤t)
143 }
144
145 fn open_root<P: Props>(&self, props: P) -> Self::Frame {
146 let mut span = HashMap::new();
147 let generation = 0;
148
149 let _ = props.for_each(|k, v| {
150 span.entry(k.to_shared())
151 .or_insert_with(|| ThreadLocalValue::from_value(generation, v));
152
153 ControlFlow::Continue(())
154 });
155
156 ThreadLocalCtxtFrame {
157 props: Some(Arc::new(span)),
158 generation,
159 }
160 }
161
162 fn open_push<P: Props>(&self, props: P) -> Self::Frame {
163 let mut span = current(self.id);
164
165 if span.props.is_none() {
166 span.props = Some(Arc::new(HashMap::new()));
167 }
168
169 let generation = span.generation.wrapping_add(1);
170 span.generation = generation;
171
172 let span_props = Arc::make_mut(span.props.as_mut().unwrap());
173
174 let _ = props.for_each(|k, v| {
175 match span_props.entry(k.to_shared()) {
176 hash_map::Entry::Vacant(entry) => {
177 entry.insert(ThreadLocalValue::from_value(generation, v));
179 }
180 hash_map::Entry::Occupied(mut entry) => {
181 let entry = entry.get_mut();
182
183 if entry.generation != generation {
184 *entry = ThreadLocalValue::from_value(generation, v);
187 }
188 }
189 }
190
191 ControlFlow::Continue(())
192 });
193
194 span
195 }
196
197 fn enter(&self, frame: &mut Self::Frame) {
198 swap(self.id, frame);
199 }
200
201 fn exit(&self, frame: &mut Self::Frame) {
202 swap(self.id, frame);
203 }
204
205 fn close(&self, _: Self::Frame) {}
206}
207
208impl InternalCtxt for ThreadLocalCtxt {}
209
210static NEXT_CTXT_ID: Mutex<usize> = Mutex::new(1);
212
213fn ctxt_id() -> usize {
214 let mut next_id = NEXT_CTXT_ID.lock().unwrap();
215 let id = *next_id;
216 *next_id = id.wrapping_add(1);
217
218 id
219}
220
221thread_local! {
222 static ACTIVE: RefCell<HashMap<usize, ThreadLocalCtxtFrame>> = RefCell::new(HashMap::new());
223}
224
225fn current(id: usize) -> ThreadLocalCtxtFrame {
226 ACTIVE.with(|active| {
227 active
228 .borrow_mut()
229 .entry(id)
230 .or_insert_with(|| ThreadLocalCtxtFrame {
231 props: None,
232 generation: 0,
233 })
234 .clone()
235 })
236}
237
238fn swap(id: usize, incoming: &mut ThreadLocalCtxtFrame) {
239 ACTIVE.with(|active| {
240 let mut active = active.borrow_mut();
241
242 let current = active.entry(id).or_insert_with(|| ThreadLocalCtxtFrame {
243 props: None,
244 generation: 0,
245 });
246
247 mem::swap(current, incoming);
248 })
249}
250
251#[cfg(test)]
252mod tests {
253 use super::*;
254
255 #[cfg(all(
256 target_arch = "wasm32",
257 target_vendor = "unknown",
258 target_os = "unknown"
259 ))]
260 use wasm_bindgen_test::*;
261
262 impl ThreadLocalCtxtFrame {
263 fn props(&self) -> HashMap<Str<'static>, ThreadLocalValue> {
264 self.props
265 .as_ref()
266 .map(|props| (**props).clone())
267 .unwrap_or_default()
268 }
269 }
270
271 #[test]
272 fn can_inline() {
273 use std::mem;
274
275 union RawErasedFrame {
277 _data: *mut (),
278 _inline: mem::MaybeUninit<[usize; 2]>,
279 }
280
281 assert!(
282 mem::size_of::<ThreadLocalCtxt>() <= mem::size_of::<RawErasedFrame>()
283 && mem::align_of::<ThreadLocalCtxt>() <= mem::align_of::<RawErasedFrame>()
284 );
285 }
286
287 #[test]
288 #[cfg_attr(
289 all(
290 target_arch = "wasm32",
291 target_vendor = "unknown",
292 target_os = "unknown"
293 ),
294 wasm_bindgen_test
295 )]
296 fn push_props() {
297 let ctxt = ThreadLocalCtxt::new();
298
299 ctxt.clone().with_current(|props| {
300 assert_eq!(0, props.props().len());
301 });
302
303 let mut frame = ctxt.clone().open_push(("a", 1));
304
305 assert_eq!(1, frame.props().len());
306 ctxt.clone().with_current(|props| {
307 assert_eq!(0, props.props().len());
308 });
309
310 ctxt.clone().enter(&mut frame);
311
312 assert_eq!(0, frame.props().len());
313
314 let mut inner = ctxt.clone().open_push([("b", 1), ("a", 2)]);
315
316 ctxt.clone().with_current(|props| {
317 let props = props.props();
318
319 assert_eq!(1, props.len());
320 assert_eq!(1, props["a"].to_value().cast::<i32>().unwrap());
321 });
322
323 ctxt.clone().enter(&mut inner);
324
325 ctxt.clone().with_current(|props| {
326 let props = props.props();
327
328 assert_eq!(2, props.len());
329 assert_eq!(2, props["a"].to_value().cast::<i32>().unwrap());
330 assert_eq!(1, props["b"].to_value().cast::<i32>().unwrap());
331 });
332
333 ctxt.clone().exit(&mut inner);
334
335 ctxt.clone().exit(&mut frame);
336
337 assert_eq!(1, frame.props().len());
338 ctxt.clone().with_current(|props| {
339 assert_eq!(0, props.props().len());
340 });
341 }
342
343 #[test]
344 fn dedup() {
345 let ctxt = ThreadLocalCtxt::new();
346
347 ctxt.clone().with_current(|props| {
348 assert_eq!(0, props.props().len());
349 });
350
351 let mut frame = ctxt.clone().open_push([("a", 1), ("a", 2)]);
352
353 assert_eq!(
354 "1",
355 frame.props.as_ref().unwrap()["a"].to_value().to_string()
356 );
357
358 ctxt.enter(&mut frame);
359
360 let inner = ctxt.clone().open_push([("a", 2), ("a", 3)]);
361
362 assert_eq!(
363 "2",
364 inner.props.as_ref().unwrap()["a"].to_value().to_string()
365 );
366
367 ctxt.exit(&mut frame);
368 }
369
370 #[test]
371 fn out_of_order_enter_exit() {
372 let ctxt = ThreadLocalCtxt::new();
373
374 let mut a = ctxt.open_push(("a", 1));
375
376 ctxt.enter(&mut a);
377
378 let mut b = ctxt.open_push(("b", 1));
379
380 ctxt.enter(&mut b);
381
382 ctxt.exit(&mut a);
385 ctxt.exit(&mut b);
386 }
387
388 #[test]
389 fn isolation() {
390 let ctxt_a = ThreadLocalCtxt::new();
391
392 let ctxt_b = ThreadLocalCtxt::new();
393
394 let mut frame_a = ctxt_a.open_push(("a", 1));
395
396 ctxt_a.enter(&mut frame_a);
397
398 ctxt_a.with_current(|props| {
399 assert_eq!(1, props.props().len());
400 });
401
402 ctxt_b.with_current(|props| {
403 assert_eq!(0, props.props().len());
404 });
405
406 ctxt_a.exit(&mut frame_a);
407 }
408
409 #[test]
410 #[cfg(not(target_arch = "wasm32"))]
411 fn frame_thread_propagation() {
412 use std::thread;
413
414 let ctxt = ThreadLocalCtxt::new();
415
416 let mut frame = ctxt.open_push(("a", 1));
417
418 ctxt.enter(&mut frame);
419
420 thread::spawn({
421 let ctxt = ctxt.clone();
422
423 move || {
424 ctxt.with_current(|props| {
425 assert_eq!(0, props.props().len());
426 });
427 }
428 })
429 .join()
430 .unwrap();
431
432 let mut current = ctxt.with_current(|props| props.clone());
433
434 thread::spawn({
435 let ctxt = ctxt.clone();
436
437 move || {
438 ctxt.enter(&mut current);
439
440 ctxt.with_current(|props| {
441 assert_eq!(1, props.props().len());
442 });
443
444 ctxt.exit(&mut current);
445 }
446 })
447 .join()
448 .unwrap();
449 }
450}