es_entity/context/
mod.rs

1//! Thread-local system for adding context data to persisted events.
2//!
3//! This module provides a context propagation system for event sourcing that allows
4//! attaching metadata (like request IDs, user IDs, or audit information) to events
5//! as they are created and persisted to the database.
6//!
7//! # Core Components
8//!
9//! - [`EventContext`]: Thread-local context manager (`!Send`) that maintains a stack
10//!   of contexts within a single thread
11//! - [`ContextData`]: Immutable, thread-safe (`Send`) snapshot of context data that
12//!   can be passed across thread boundaries
13//! - [`WithEventContext`]: Extension trait for `Future` types to propagate context
14//!   across async boundaries
15//!
16//! # Usage Patterns
17//!
18//! ## Same Thread Context
19//! ```rust
20//! use es_entity::context::EventContext;
21//!
22//! let mut ctx = EventContext::current();
23//! ctx.insert("request_id", &"req-123").unwrap();
24//!
25//! // Fork for isolated scope
26//! {
27//!     let mut child = EventContext::fork();
28//!     child.insert("operation", &"update").unwrap();
29//!     // Both request_id and operation are available
30//! }
31//! // Only request_id remains in parent
32//! ```
33//!
34//! ## Async Task Context
35//! ```rust
36//! use es_entity::context::{EventContext, WithEventContext};
37//!
38//! async fn spawn_with_context() {
39//!     let mut ctx = EventContext::current();
40//!     ctx.insert("user_id", &"user-456").unwrap();
41//!
42//!     let data = ctx.data();
43//!     tokio::spawn(async move {
44//!         // Context is available in spawned task
45//!         let ctx = EventContext::current();
46//!         // Has user_id from parent
47//!     }.with_event_context(data)).await.unwrap();
48//! }
49//! ```
50//!
51//! ## Cross-Thread Context
52//! ```rust
53//! use es_entity::context::EventContext;
54//!
55//! let mut ctx = EventContext::current();
56//! ctx.insert("trace_id", &"trace-789").unwrap();
57//! let data = ctx.data();
58//!
59//! std::thread::spawn(move || {
60//!     let ctx = EventContext::seed(data);
61//!     // New thread has trace_id
62//! });
63//! ```
64//!
65//! # Database Integration
66//!
67//! When events are persisted using repositories with `event_ctx: true`, the current
68//! context is automatically serialized to JSON and stored in a `context` column
69//! alongside the event data, enabling comprehensive audit trails and debugging.
70
71mod with_event_context;
72
73use serde::Serialize;
74
75use std::{cell::RefCell, rc::Rc};
76
77pub use with_event_context::*;
78
/// Immutable context data that can be safely shared across thread boundaries.
///
/// This struct holds key-value pairs of context information that gets attached
/// to events when they are persisted. Keys are `&'static str` and values are
/// already-serialized [`serde_json::Value`]s. It is backed by an `im::HashMap`,
/// so cloning a snapshot is cheap.
///
/// Because of `#[serde(transparent)]`, a `ContextData` serializes exactly like its
/// inner map, i.e. as a single JSON object keyed by the context keys.
///
/// `ContextData` is `Send` and can be passed between threads, unlike [`EventContext`]
/// which is thread-local. This makes it suitable for transferring context across
/// async boundaries via the [`WithEventContext`] trait.
#[derive(Debug, Clone, Serialize)]
#[serde(transparent)]
pub struct ContextData(im::HashMap<&'static str, serde_json::Value>);
91
92impl ContextData {
93    fn new() -> Self {
94        Self(im::HashMap::new())
95    }
96
97    fn insert(&mut self, key: &'static str, value: serde_json::Value) {
98        self.0 = self.0.update(key, value);
99    }
100}
101
/// One entry on the thread-local context stack.
struct StackEntry {
    /// Identity token for this entry. Every `EventContext` handle that refers to
    /// this entry holds a clone of the same `Rc`; entries are matched to handles
    /// with `Rc::ptr_eq`.
    id: Rc<()>,
    /// The key-value data currently stored for this context.
    data: ContextData,
}
106
// Per-thread stack of live contexts. The last element is the context returned by
// `EventContext::current()`. Entries are pushed by `EventContext::current()` (when
// the stack is empty) and `EventContext::seed()`, and removed by the `Drop` impl of
// `EventContext`.
thread_local! {
    static CONTEXT_STACK: RefCell<Vec<StackEntry>> = const { RefCell::new(Vec::new()) };
}
110
/// Thread-local event context for tracking metadata throughout event sourcing operations.
///
/// `EventContext` provides a way to attach contextual information (like request IDs, audit info,
/// or operation metadata) to events as they are created and persisted. The context is managed
/// as a thread-local stack, allowing for nested contexts within the same thread.
///
/// An `EventContext` value is a handle to one entry on that stack. Several handles may
/// refer to the same entry (for example, repeated calls to [`current()`](Self::current));
/// the entry is removed from the stack when the last handle referring to it is dropped.
///
/// # Thread Safety
///
/// This struct is deliberately `!Send` to ensure thread-local safety. It uses `Rc` for reference
/// counting which is not thread-safe. For propagating context across async boundaries, use the
/// [`WithEventContext`] trait; for new threads, pass a [`ContextData`] snapshot and call
/// [`seed()`](Self::seed) on the other side.
///
/// # Usage Patterns
///
/// - **Same thread**: Use [`fork()`](Self::fork) to create isolated child contexts
/// - **Async tasks**: Use [`with_event_context()`](WithEventContext::with_event_context) from the [`WithEventContext`] trait
/// - **New threads**: Use [`seed()`](Self::seed) with data from [`data()`](Self::data) to transfer context
///
/// # Examples
///
/// ```rust
/// use es_entity::context::EventContext;
///
/// // Create or get current context
/// let mut ctx = EventContext::current();
/// ctx.insert("user_id", &"123").unwrap();
///
/// // Fork for isolated scope
/// {
///     let mut child = EventContext::fork();
///     child.insert("operation", &"update").unwrap();
///     // Both user_id and operation are available here
/// }
/// // Only user_id remains in parent context
/// ```
pub struct EventContext {
    // Identity token shared with the `StackEntry` this handle refers to; the
    // entry is located on the stack by comparing pointers with `Rc::ptr_eq`.
    id: Rc<()>,
}
149
150impl Drop for EventContext {
151    fn drop(&mut self) {
152        // If strong_count is 2, it means this EventContext + one StackEntry reference
153        if Rc::strong_count(&self.id) == 2 {
154            CONTEXT_STACK.with(|c| {
155                let mut stack = c.borrow_mut();
156                for i in (0..stack.len()).rev() {
157                    if Rc::ptr_eq(&stack[i].id, &self.id) {
158                        stack.remove(i);
159                        break;
160                    }
161                }
162            });
163        }
164    }
165}
166
167impl EventContext {
168    /// Gets the current event context or creates a new one if none exists.
169    ///
170    /// This function is thread-local and will return a handle to the topmost context
171    /// on the current thread's context stack. If no context exists, it will create
172    /// a new empty context and push it onto the stack.
173    ///
174    /// # Examples
175    ///
176    /// ```rust
177    /// use es_entity::context::EventContext;
178    ///
179    /// let ctx = EventContext::current();
180    /// // Context is now available for the current thread
181    /// ```
182    pub fn current() -> Self {
183        CONTEXT_STACK.with(|c| {
184            let mut stack = c.borrow_mut();
185            if let Some(last) = stack.last() {
186                return EventContext {
187                    id: last.id.clone(),
188                };
189            }
190
191            let id = Rc::new(());
192            let data = ContextData::new();
193            stack.push(StackEntry {
194                id: id.clone(),
195                data,
196            });
197
198            EventContext { id }
199        })
200    }
201
202    /// Creates a new event context seeded with the provided data.
203    ///
204    /// This creates a completely new context stack entry with the given context data,
205    /// independent of any existing context. This is useful for starting fresh contexts
206    /// in new threads or async tasks.
207    ///
208    /// # Arguments
209    ///
210    /// * `data` - The initial context data for the new context
211    ///
212    /// # Examples
213    ///
214    /// ```rust
215    /// use es_entity::context::{EventContext, ContextData};
216    ///
217    /// let data = EventContext::current().data();
218    /// let new_ctx = EventContext::seed(data);
219    /// // new_ctx now has its own independent context stack
220    /// ```
221    pub fn seed(data: ContextData) -> Self {
222        CONTEXT_STACK.with(|c| {
223            let mut stack = c.borrow_mut();
224            let id = Rc::new(());
225            stack.push(StackEntry {
226                id: id.clone(),
227                data,
228            });
229
230            EventContext { id }
231        })
232    }
233
234    /// Creates a new isolated context that inherits data from the current context.
235    ///
236    /// This method creates a child context that starts with a copy of the current
237    /// context's data. Changes made to the forked context will not affect the parent
238    /// context, and when the forked context is dropped, the parent context remains
239    /// unchanged. This is useful for creating isolated scopes within the same thread.
240    ///
241    /// # Examples
242    ///
243    /// ```rust
244    /// use es_entity::context::EventContext;
245    ///
246    /// let mut parent = EventContext::current();
247    /// parent.insert("shared", &"value").unwrap();
248    ///
249    /// {
250    ///     let mut child = EventContext::fork();
251    ///     child.insert("child_only", &"data").unwrap();
252    ///     // child context has both "shared" and "child_only"
253    /// }
254    /// // parent context only has "shared" - "child_only" is gone
255    /// ```
256    pub fn fork() -> Self {
257        let current = Self::current();
258        let data = current.data();
259        Self::seed(data)
260    }
261
262    /// Inserts a key-value pair into the current context.
263    ///
264    /// The value will be serialized to JSON and stored in the context data.
265    /// This data will be available to all code running within this context
266    /// and any child contexts created via `fork()`.
267    ///
268    /// # Arguments
269    ///
270    /// * `key` - A static string key to identify the value
271    /// * `value` - Any serializable value to store in the context
272    ///
273    /// # Returns
274    ///
275    /// Returns `Ok(())` on success or a `serde_json::Error` if serialization fails.
276    ///
277    /// # Examples
278    ///
279    /// ```rust
280    /// use es_entity::context::EventContext;
281    ///
282    /// let mut ctx = EventContext::current();
283    /// ctx.insert("user_id", &"12345").unwrap();
284    /// ctx.insert("operation", &"transfer").unwrap();
285    /// ```
286    pub fn insert<T: Serialize>(
287        &mut self,
288        key: &'static str,
289        value: &T,
290    ) -> Result<(), serde_json::Error> {
291        let json_value = serde_json::to_value(value)?;
292
293        CONTEXT_STACK.with(|c| {
294            let mut stack = c.borrow_mut();
295            for entry in stack.iter_mut().rev() {
296                if Rc::ptr_eq(&entry.id, &self.id) {
297                    entry.data.insert(key, json_value);
298                    return;
299                }
300            }
301            panic!("EventContext missing on CONTEXT_STACK")
302        });
303
304        Ok(())
305    }
306
307    /// Returns a copy of the current context data.
308    ///
309    /// This method returns a snapshot of all key-value pairs stored in this context.
310    /// The returned [`ContextData`] can be used to seed new contexts or passed to
311    /// async tasks to maintain context across thread boundaries.
312    ///
313    /// # Examples
314    ///
315    /// ```rust
316    /// use es_entity::context::EventContext;
317    ///
318    /// let mut ctx = EventContext::current();
319    /// ctx.insert("request_id", &"abc123").unwrap();
320    ///
321    /// let data = ctx.data();
322    /// // data now contains a copy of the context with request_id
323    /// ```
324    pub fn data(&self) -> ContextData {
325        CONTEXT_STACK.with(|c| {
326            let stack = c.borrow();
327            for entry in stack.iter().rev() {
328                if Rc::ptr_eq(&entry.id, &self.id) {
329                    return entry.data.clone();
330                }
331            }
332            panic!("EventContext missing on CONTEXT_STACK")
333        })
334    }
335
336    /// Serializes the current context data to JSON.
337    ///
338    /// This method is primarily used internally by the event persistence system
339    /// to store context data alongside events in the database. It converts all
340    /// context key-value pairs into a single JSON object.
341    ///
342    /// # Returns
343    ///
344    /// Returns a `serde_json::Value` containing all context data, or an error
345    /// if serialization fails.
346    ///
347    /// # Examples
348    ///
349    /// ```rust
350    /// use es_entity::context::EventContext;
351    ///
352    /// let mut ctx = EventContext::current();
353    /// ctx.insert("user_id", &"12345").unwrap();
354    ///
355    /// let json = ctx.as_json().unwrap();
356    /// // json is now: {"user_id": "12345"}
357    /// ```
358    pub fn as_json(&self) -> Result<serde_json::Value, serde_json::Error> {
359        let data = self.data();
360        serde_json::to_value(&data)
361    }
362}
363
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// Number of entries currently on this thread's context stack.
    fn stack_depth() -> usize {
        CONTEXT_STACK.with(|stack| stack.borrow().len())
    }

    /// JSON snapshot of the context returned by `EventContext::current()`.
    fn current_json() -> serde_json::Value {
        let top = EventContext::current();
        top.as_json().unwrap()
    }

    // A handle keeps its entry on the stack; dropping the last handle pops it.
    #[test]
    fn assert_stack_depth() {
        fn nested_scope() {
            let _handle = EventContext::current();
            assert_eq!(stack_depth(), 1);
        }

        assert_eq!(stack_depth(), 0);
        {
            let _handle = EventContext::current();
            assert_eq!(stack_depth(), 1);
            nested_scope();
        }
        assert_eq!(stack_depth(), 0);
    }

    // Inserts through any handle to the same entry are visible to all of them,
    // and inserting under an existing key overwrites the value.
    #[test]
    fn insert() {
        fn insert_from_nested_call(payload: &serde_json::Value) {
            let mut handle = EventContext::current();
            handle.insert("new_data", &payload).unwrap();
            let expected = json!({ "data": payload, "new_data": payload });
            assert_eq!(current_json(), expected);
        }

        let mut ctx = EventContext::current();
        assert_eq!(current_json(), json!({}));

        let first = json!({ "hello": "world" });
        ctx.insert("data", &first).unwrap();
        assert_eq!(current_json(), json!({ "data": first }));

        insert_from_nested_call(&first);
        let after_nested = json!({ "data": first, "new_data": first });
        assert_eq!(current_json(), after_nested);

        let second = json!({ "hello": "new_world" });
        ctx.insert("data", &second).unwrap();
        let after_overwrite = json!({ "data": second, "new_data": first });
        assert_eq!(current_json(), after_overwrite);
    }

    // A seeded context on another thread starts from the snapshot and does not
    // leak changes back to the originating thread.
    #[test]
    fn thread_isolation() {
        let mut main_ctx = EventContext::current();
        let main_value = json!({ "main": "thread" });
        main_ctx.insert("data", &main_value).unwrap();
        assert_eq!(stack_depth(), 1);

        let snapshot = main_ctx.data();
        let worker = std::thread::spawn(move || {
            assert_eq!(stack_depth(), 0);
            let mut worker_ctx = EventContext::seed(snapshot);
            assert_eq!(stack_depth(), 1);
            worker_ctx.insert("thread", &json!("local")).unwrap();
            assert_eq!(
                current_json(),
                json!({ "data": { "main": "thread" }, "thread": "local" }),
            );
        });
        worker.join().unwrap();

        assert_eq!(current_json(), json!({ "data": main_value }));
    }

    // An async fn awaited in place sees and modifies the caller's context.
    #[tokio::test]
    async fn async_context() {
        async fn inner_async() {
            let mut inner_ctx = EventContext::current();
            inner_ctx.insert("async_inner", &json!("value")).unwrap();
            let expected = json!({ "async_data": { "test": "async" }, "async_inner": "value" });
            assert_eq!(current_json(), expected);
        }

        let mut ctx = EventContext::current();
        assert_eq!(current_json(), json!({}));

        let payload = json!({ "test": "async" });
        ctx.insert("async_data", &payload).unwrap();
        assert_eq!(current_json(), json!({ "async_data": payload }));

        inner_async().await;

        let expected = json!({ "async_data": payload, "async_inner": "value" });
        assert_eq!(current_json(), expected);
    }

    // A forked context inherits the parent's data, and its own inserts vanish
    // once it is dropped.
    #[test]
    fn fork() {
        let parent_only = json!({ "original": "value" });

        let mut parent = EventContext::current();
        parent.insert("original", &json!("value")).unwrap();
        assert_eq!(stack_depth(), 1);
        assert_eq!(current_json(), parent_only);

        let mut child = EventContext::fork();
        assert_eq!(stack_depth(), 2);
        assert_eq!(current_json(), parent_only);

        child.insert("forked", &json!("data")).unwrap();
        assert_eq!(
            current_json(),
            json!({ "original": "value", "forked": "data" })
        );

        drop(child);

        assert_eq!(stack_depth(), 1);
        assert_eq!(current_json(), parent_only);
    }

    // Single-threaded runtime: the spawned task runs on the test's thread, so its
    // seeded entry sits on top of the parent's entry.
    #[tokio::test]
    async fn with_event_context_spawned() {
        let mut parent = EventContext::current();
        parent.insert("parent", &json!("context")).unwrap();

        let task = tokio::spawn(
            async {
                assert_eq!(stack_depth(), 2);

                EventContext::current()
                    .insert("spawned", &json!("value"))
                    .unwrap();

                assert_eq!(
                    current_json(),
                    json!({ "parent": "context", "spawned": "value" })
                );
                tokio::task::yield_now().await;
                current_json()
            }
            .with_event_context(parent.data()),
        );

        let spawned_json = task.await.unwrap();
        assert_eq!(
            spawned_json,
            json!({ "parent": "context", "spawned": "value" })
        );

        // The spawned task's insert must not leak into the parent context.
        assert_eq!(current_json(), json!({ "parent": "context" }));
    }

    // Multi-threaded runtime: the test expects the spawned task to see only its
    // own seeded entry on the stack.
    #[tokio::test(flavor = "multi_thread")]
    async fn with_event_context_spawned_multi_thread() {
        let mut parent = EventContext::current();
        parent.insert("parent", &json!("context")).unwrap();

        let task = tokio::spawn(
            async {
                assert_eq!(stack_depth(), 1);

                EventContext::current()
                    .insert("spawned", &json!("value"))
                    .unwrap();

                assert_eq!(
                    current_json(),
                    json!({ "parent": "context", "spawned": "value" })
                );
                let data = EventContext::current().data();
                tokio::task::yield_now().with_event_context(data).await;
                current_json()
            }
            .with_event_context(parent.data()),
        );

        let spawned_json = task.await.unwrap();
        assert_eq!(
            spawned_json,
            json!({ "parent": "context", "spawned": "value" })
        );

        assert_eq!(current_json(), json!({ "parent": "context" }));
    }
}