es_entity/context/
mod.rs

1//! Thread-local system for adding context data to persisted events.
2//!
3//! This module provides a context propagation system for event sourcing that allows
4//! attaching metadata (like request IDs, user IDs, or audit information) to events
5//! as they are created and persisted to the database.
6//!
7//! # Core Components
8//!
9//! - [`EventContext`]: Thread-local context manager (`!Send`) that maintains a stack
10//!   of contexts within a single thread
11//! - [`ContextData`]: Immutable, thread-safe (`Send`) snapshot of context data that
12//!   can be passed across thread boundaries
13//! - [`WithEventContext`]: Extension trait for `Future` types to propagate context
14//!   across async boundaries
15//!
16//! # Usage Patterns
17//!
18//! ## Same Thread Context
19//! ```rust
20//! use es_entity::context::EventContext;
21//!
22//! let mut ctx = EventContext::current();
23//! ctx.insert("request_id", &"req-123").unwrap();
24//!
25//! // Fork for isolated scope
26//! {
27//!     let mut child = EventContext::fork();
28//!     child.insert("operation", &"update").unwrap();
29//!     // Both request_id and operation are available
30//! }
31//! // Only request_id remains in parent
32//! ```
33//!
34//! ## Async Task Context
35//! ```rust
36//! use es_entity::context::{EventContext, WithEventContext};
37//!
38//! async fn spawn_with_context() {
39//!     let mut ctx = EventContext::current();
40//!     ctx.insert("user_id", &"user-456").unwrap();
41//!
42//!     let data = ctx.data();
43//!     tokio::spawn(async move {
44//!         // Context is available in spawned task
45//!         let ctx = EventContext::current();
46//!         // Has user_id from parent
47//!     }.with_event_context(data)).await.unwrap();
48//! }
49//! ```
50//!
51//! ## Cross-Thread Context
52//! ```rust
53//! use es_entity::context::EventContext;
54//!
55//! let mut ctx = EventContext::current();
56//! ctx.insert("trace_id", &"trace-789").unwrap();
57//! let data = ctx.data();
58//!
59//! std::thread::spawn(move || {
60//!     let ctx = EventContext::seed(data);
61//!     // New thread has trace_id
62//! });
63//! ```
64//!
65//! # Database Integration
66//!
67//! When events are persisted using repositories with `event_context = true`, the current
68//! context is automatically serialized to JSON and stored in a `context` column
69//! alongside the event data, enabling comprehensive audit trails and debugging.
70
71mod sqlx;
72#[cfg(feature = "tracing-context")]
73mod tracing;
74mod with_event_context;
75
76use serde::{Deserialize, Serialize};
77
78use std::{borrow::Cow, cell::RefCell, rc::Rc};
79
80#[cfg(feature = "tracing-context")]
81pub use tracing::*;
82pub use with_event_context::*;
83
84/// Immutable context data that can be safely shared across thread boundaries.
85///
86/// This struct holds key-value pairs of context information that gets attached
87/// to events when they are persisted. It uses an immutable HashMap internally
88/// for efficient cloning and thread-safe sharing of data snapshots.
89///
90/// `ContextData` is `Send` and can be passed between threads, unlike [`EventContext`]
91/// which is thread-local. This makes it suitable for transferring context across
92/// async boundaries via the [`WithEventContext`] trait.
93#[derive(Debug, Clone, Serialize, Deserialize)]
94#[serde(transparent)]
95pub struct ContextData(im::HashMap<Cow<'static, str>, serde_json::Value>);
96
97impl ContextData {
98    fn new() -> Self {
99        Self(im::HashMap::new())
100    }
101
102    fn insert(&mut self, key: &'static str, value: serde_json::Value) {
103        self.0 = self.0.update(Cow::Borrowed(key), value);
104    }
105
106    #[cfg(feature = "tracing-context")]
107    pub(crate) fn with_tracing_info(mut self) -> Self {
108        let tracing = TracingContext::current();
109        self.insert(
110            "tracing",
111            serde_json::to_value(&tracing).expect("Could not inject tracing"),
112        );
113        self
114    }
115
116    pub fn lookup<T: serde::de::DeserializeOwned>(
117        &self,
118        key: &'static str,
119    ) -> Result<Option<T>, serde_json::Error> {
120        let Some(val) = self.0.get(key) else {
121            return Ok(None);
122        };
123        serde_json::from_value(val.clone()).map(Some)
124    }
125}
126
127struct StackEntry {
128    id: Rc<()>,
129    data: ContextData,
130}
131
132thread_local! {
133    static CONTEXT_STACK: RefCell<Vec<StackEntry>> = const { RefCell::new(Vec::new()) };
134}
135
136/// Thread-local event context for tracking metadata throughout event sourcing operations.
137///
138/// `EventContext` provides a way to attach contextual information (like request IDs, audit info,
139/// or operation metadata) to events as they are created and persisted. The context is managed
140/// as a thread-local stack, allowing for nested contexts within the same thread.
141///
142/// # Thread Safety
143///
144/// This struct is deliberately `!Send` to ensure thread-local safety. It uses `Rc` for reference
145/// counting which is not thread-safe. For propagating context across async boundaries or threads,
146/// use the [`WithEventContext`] trait which safely transfers context data.
147///
148/// # Usage Patterns
149///
150/// - **Same thread**: Use [`fork()`](Self::fork) to create isolated child contexts
151/// - **Async tasks**: Use [`with_event_context()`](WithEventContext::with_event_context) from the [`WithEventContext`] trait
152/// - **New threads**: Use [`seed()`](Self::seed) with data from [`data()`](Self::data) to transfer context
153///
154/// # Examples
155///
156/// ```rust
157/// use es_entity::context::EventContext;
158///
159/// // Create or get current context
160/// let mut ctx = EventContext::current();
161/// ctx.insert("user_id", &"123").unwrap();
162///
163/// // Fork for isolated scope
164/// {
165///     let mut child = EventContext::fork();
166///     child.insert("operation", &"update").unwrap();
167///     // Both user_id and operation are available here
168/// }
169/// // Only user_id remains in parent context
170/// ```
171pub struct EventContext {
172    id: Rc<()>,
173}
174
175impl Drop for EventContext {
176    fn drop(&mut self) {
177        // If strong_count is 2, it means this EventContext + one StackEntry reference
178        if Rc::strong_count(&self.id) == 2 {
179            CONTEXT_STACK.with(|c| {
180                let mut stack = c.borrow_mut();
181                for i in (0..stack.len()).rev() {
182                    if Rc::ptr_eq(&stack[i].id, &self.id) {
183                        stack.remove(i);
184                        break;
185                    }
186                }
187            });
188        }
189    }
190}
191
192impl EventContext {
193    /// Gets the current event context or creates a new one if none exists.
194    ///
195    /// This function is thread-local and will return a handle to the topmost context
196    /// on the current thread's context stack. If no context exists, it will create
197    /// a new empty context and push it onto the stack.
198    ///
199    /// # Examples
200    ///
201    /// ```rust
202    /// use es_entity::context::EventContext;
203    ///
204    /// let ctx = EventContext::current();
205    /// // Context is now available for the current thread
206    /// ```
207    pub fn current() -> Self {
208        CONTEXT_STACK.with(|c| {
209            let mut stack = c.borrow_mut();
210            if let Some(last) = stack.last() {
211                return EventContext {
212                    id: last.id.clone(),
213                };
214            }
215
216            let id = Rc::new(());
217            let data = ContextData::new();
218            stack.push(StackEntry {
219                id: id.clone(),
220                data,
221            });
222
223            EventContext { id }
224        })
225    }
226
227    /// Creates a new event context seeded with the provided data.
228    ///
229    /// This creates a completely new context stack entry with the given context data,
230    /// independent of any existing context. This is useful for starting fresh contexts
231    /// in new threads or async tasks.
232    ///
233    /// # Arguments
234    ///
235    /// * `data` - The initial context data for the new context
236    ///
237    /// # Examples
238    ///
239    /// ```rust
240    /// use es_entity::context::{EventContext, ContextData};
241    ///
242    /// let data = EventContext::current().data();
243    /// let new_ctx = EventContext::seed(data);
244    /// // new_ctx now has its own independent context stack
245    /// ```
246    pub fn seed(data: ContextData) -> Self {
247        CONTEXT_STACK.with(|c| {
248            let mut stack = c.borrow_mut();
249            let id = Rc::new(());
250            stack.push(StackEntry {
251                id: id.clone(),
252                data,
253            });
254
255            EventContext { id }
256        })
257    }
258
259    /// Creates a new isolated context that inherits data from the current context.
260    ///
261    /// This method creates a child context that starts with a copy of the current
262    /// context's data. Changes made to the forked context will not affect the parent
263    /// context, and when the forked context is dropped, the parent context remains
264    /// unchanged. This is useful for creating isolated scopes within the same thread.
265    ///
266    /// # Examples
267    ///
268    /// ```rust
269    /// use es_entity::context::EventContext;
270    ///
271    /// let mut parent = EventContext::current();
272    /// parent.insert("shared", &"value").unwrap();
273    ///
274    /// {
275    ///     let mut child = EventContext::fork();
276    ///     child.insert("child_only", &"data").unwrap();
277    ///     // child context has both "shared" and "child_only"
278    /// }
279    /// // parent context only has "shared" - "child_only" is gone
280    /// ```
281    pub fn fork() -> Self {
282        let current = Self::current();
283        let data = current.data();
284        Self::seed(data)
285    }
286
287    /// Inserts a key-value pair into the current context.
288    ///
289    /// The value will be serialized to JSON and stored in the context data.
290    /// This data will be available to all code running within this context
291    /// and any child contexts created via `fork()`.
292    ///
293    /// # Arguments
294    ///
295    /// * `key` - A static string key to identify the value
296    /// * `value` - Any serializable value to store in the context
297    ///
298    /// # Returns
299    ///
300    /// Returns `Ok(())` on success or a `serde_json::Error` if serialization fails.
301    ///
302    /// # Examples
303    ///
304    /// ```rust
305    /// use es_entity::context::EventContext;
306    ///
307    /// let mut ctx = EventContext::current();
308    /// ctx.insert("user_id", &"12345").unwrap();
309    /// ctx.insert("operation", &"transfer").unwrap();
310    /// ```
311    pub fn insert<T: Serialize>(
312        &mut self,
313        key: &'static str,
314        value: &T,
315    ) -> Result<(), serde_json::Error> {
316        let json_value = serde_json::to_value(value)?;
317
318        CONTEXT_STACK.with(|c| {
319            let mut stack = c.borrow_mut();
320            for entry in stack.iter_mut().rev() {
321                if Rc::ptr_eq(&entry.id, &self.id) {
322                    entry.data.insert(key, json_value);
323                    return;
324                }
325            }
326            panic!("EventContext missing on CONTEXT_STACK")
327        });
328
329        Ok(())
330    }
331
332    /// Returns a copy of the current context data.
333    ///
334    /// This method returns a snapshot of all key-value pairs stored in this context.
335    /// The returned [`ContextData`] can be used to seed new contexts or passed to
336    /// async tasks to maintain context across thread boundaries.
337    ///
338    /// # Examples
339    ///
340    /// ```rust
341    /// use es_entity::context::EventContext;
342    ///
343    /// let mut ctx = EventContext::current();
344    /// ctx.insert("request_id", &"abc123").unwrap();
345    ///
346    /// let data = ctx.data();
347    /// // data now contains a copy of the context with request_id
348    /// ```
349    pub fn data(&self) -> ContextData {
350        CONTEXT_STACK.with(|c| {
351            let stack = c.borrow();
352            for entry in stack.iter().rev() {
353                if Rc::ptr_eq(&entry.id, &self.id) {
354                    return entry.data.clone();
355                }
356            }
357            panic!("EventContext missing on CONTEXT_STACK")
358        })
359    }
360
361    #[allow(unused_mut)]
362    pub(crate) fn data_for_storing() -> ContextData {
363        let mut data = Self::current().data();
364        #[cfg(feature = "tracing-context")]
365        {
366            data = data.with_tracing_info();
367        }
368        data
369    }
370}
371
372#[cfg(test)]
373mod tests {
374    use super::*;
375
376    fn stack_depth() -> usize {
377        CONTEXT_STACK.with(|c| c.borrow().len())
378    }
379
380    fn current_json() -> serde_json::Value {
381        serde_json::to_value(EventContext::current().data()).unwrap()
382    }
383
384    #[test]
385    fn assert_stack_depth() {
386        fn assert_inner() {
387            let _ctx = EventContext::current();
388            assert_eq!(stack_depth(), 1);
389        }
390        assert_eq!(stack_depth(), 0);
391        {
392            let _ctx = EventContext::current();
393            assert_eq!(stack_depth(), 1);
394            assert_inner();
395        }
396        assert_eq!(stack_depth(), 0);
397    }
398
399    #[test]
400    fn insert() {
401        fn insert_inner(value: &serde_json::Value) {
402            let mut ctx = EventContext::current();
403            ctx.insert("new_data", &value).unwrap();
404            assert_eq!(
405                current_json(),
406                serde_json::json!({ "data": value, "new_data": value})
407            );
408        }
409
410        let mut ctx = EventContext::current();
411        assert_eq!(current_json(), serde_json::json!({}));
412        let value = serde_json::json!({ "hello": "world" });
413        ctx.insert("data", &value).unwrap();
414        assert_eq!(current_json(), serde_json::json!({ "data": value }));
415        insert_inner(&value);
416        assert_eq!(
417            current_json(),
418            serde_json::json!({ "data": value, "new_data": value})
419        );
420        let new_value = serde_json::json!({ "hello": "new_world" });
421        ctx.insert("data", &new_value).unwrap();
422        assert_eq!(
423            current_json(),
424            serde_json::json!({ "data": new_value, "new_data": value})
425        );
426    }
427
428    #[test]
429    fn thread_isolation() {
430        let mut ctx = EventContext::current();
431        let value = serde_json::json!({ "main": "thread" });
432        ctx.insert("data", &value).unwrap();
433        assert_eq!(stack_depth(), 1);
434
435        let ctx_data = ctx.data();
436        let handle = std::thread::spawn(move || {
437            assert_eq!(stack_depth(), 0);
438            let mut ctx = EventContext::seed(ctx_data);
439            assert_eq!(stack_depth(), 1);
440            ctx.insert("thread", &serde_json::json!("local")).unwrap();
441            assert_eq!(
442                current_json(),
443                serde_json::json!({ "data": { "main": "thread" }, "thread": "local" }),
444            );
445        });
446
447        handle.join().unwrap();
448        assert_eq!(current_json(), serde_json::json!({ "data": value }));
449    }
450
451    #[tokio::test]
452    async fn async_context() {
453        async fn inner_async() {
454            let mut ctx = EventContext::current();
455            ctx.insert("async_inner", &serde_json::json!("value"))
456                .unwrap();
457            assert_eq!(
458                current_json(),
459                serde_json::json!({ "async_data": { "test": "async" }, "async_inner": "value" })
460            );
461        }
462
463        let mut ctx = EventContext::current();
464        assert_eq!(current_json(), serde_json::json!({}));
465
466        let value = serde_json::json!({ "test": "async" });
467        ctx.insert("async_data", &value).unwrap();
468        assert_eq!(current_json(), serde_json::json!({ "async_data": value }));
469
470        inner_async().await;
471
472        assert_eq!(
473            current_json(),
474            serde_json::json!({ "async_data": value, "async_inner": "value" })
475        );
476    }
477
478    #[test]
479    fn fork() {
480        let mut ctx = EventContext::current();
481        ctx.insert("original", &serde_json::json!("value")).unwrap();
482        assert_eq!(stack_depth(), 1);
483        assert_eq!(current_json(), serde_json::json!({ "original": "value" }));
484
485        let mut forked = EventContext::fork();
486        assert_eq!(stack_depth(), 2);
487        assert_eq!(current_json(), serde_json::json!({ "original": "value" }));
488
489        forked.insert("forked", &serde_json::json!("data")).unwrap();
490        assert_eq!(
491            current_json(),
492            serde_json::json!({ "original": "value", "forked": "data" })
493        );
494
495        drop(forked);
496
497        assert_eq!(stack_depth(), 1);
498        assert_eq!(current_json(), serde_json::json!({ "original": "value" }));
499    }
500
501    #[tokio::test]
502    async fn with_event_context_spawned() {
503        let mut ctx = EventContext::current();
504        ctx.insert("parent", &serde_json::json!("context")).unwrap();
505
506        let handle = tokio::spawn(
507            async {
508                assert_eq!(stack_depth(), 2);
509
510                EventContext::current()
511                    .insert("spawned", &serde_json::json!("value"))
512                    .unwrap();
513
514                assert_eq!(
515                    current_json(),
516                    serde_json::json!({ "parent": "context", "spawned": "value" })
517                );
518                tokio::task::yield_now().await;
519                current_json()
520            }
521            .with_event_context(ctx.data()),
522        );
523
524        let result = handle.await.unwrap();
525        assert_eq!(
526            result,
527            serde_json::json!({ "parent": "context", "spawned": "value" })
528        );
529
530        assert_eq!(current_json(), serde_json::json!({ "parent": "context" }));
531    }
532
533    #[tokio::test(flavor = "multi_thread")]
534    async fn with_event_context_spawned_multi_thread() {
535        let mut ctx = EventContext::current();
536        ctx.insert("parent", &serde_json::json!("context")).unwrap();
537
538        let handle = tokio::spawn(
539            async {
540                assert_eq!(stack_depth(), 1);
541
542                EventContext::current()
543                    .insert("spawned", &serde_json::json!("value"))
544                    .unwrap();
545
546                assert_eq!(
547                    current_json(),
548                    serde_json::json!({ "parent": "context", "spawned": "value" })
549                );
550                let data = EventContext::current().data();
551                tokio::task::yield_now().with_event_context(data).await;
552                current_json()
553            }
554            .with_event_context(ctx.data()),
555        );
556
557        let result = handle.await.unwrap();
558        assert_eq!(
559            result,
560            serde_json::json!({ "parent": "context", "spawned": "value" })
561        );
562
563        assert_eq!(current_json(), serde_json::json!({ "parent": "context" }));
564    }
565}