es_entity/context/
mod.rs

1//! Thread-local system for adding context data to persisted events.
2//!
3//! This module provides a context propagation system for event sourcing that allows
4//! attaching metadata (like request IDs, user IDs, or audit information) to events
5//! as they are created and persisted to the database.
6//!
7//! # Core Components
8//!
9//! - [`EventContext`]: Thread-local context manager (`!Send`) that maintains a stack
10//!   of contexts within a single thread
11//! - [`ContextData`]: Immutable, thread-safe (`Send`) snapshot of context data that
12//!   can be passed across thread boundaries
13//! - [`WithEventContext`]: Extension trait for `Future` types to propagate context
14//!   across async boundaries
15//!
16//! # Usage Patterns
17//!
18//! ## Same Thread Context
19//! ```rust
20//! use es_entity::context::EventContext;
21//!
22//! let mut ctx = EventContext::current();
23//! ctx.insert("request_id", &"req-123").unwrap();
24//!
25//! // Fork for isolated scope
26//! {
27//!     let mut child = EventContext::fork();
28//!     child.insert("operation", &"update").unwrap();
29//!     // Both request_id and operation are available
30//! }
31//! // Only request_id remains in parent
32//! ```
33//!
34//! ## Async Task Context
35//! ```rust
36//! use es_entity::context::{EventContext, WithEventContext};
37//!
38//! async fn spawn_with_context() {
39//!     let mut ctx = EventContext::current();
40//!     ctx.insert("user_id", &"user-456").unwrap();
41//!
42//!     let data = ctx.data();
43//!     tokio::spawn(async move {
44//!         // Context is available in spawned task
45//!         let ctx = EventContext::current();
46//!         // Has user_id from parent
47//!     }.with_event_context(data)).await.unwrap();
48//! }
49//! ```
50//!
51//! ## Cross-Thread Context
52//! ```rust
53//! use es_entity::context::EventContext;
54//!
55//! let mut ctx = EventContext::current();
56//! ctx.insert("trace_id", &"trace-789").unwrap();
57//! let data = ctx.data();
58//!
59//! std::thread::spawn(move || {
60//!     let ctx = EventContext::seed(data);
61//!     // New thread has trace_id
62//! });
63//! ```
64//!
65//! # Database Integration
66//!
67//! When events are persisted using repositories with `event_context = true`, the current
68//! context is automatically serialized to JSON and stored in a `context` column
69//! alongside the event data, enabling comprehensive audit trails and debugging.
70
71#[cfg(feature = "tracing")]
72mod tracing;
73mod with_event_context;
74
75use serde::Serialize;
76
77use std::{cell::RefCell, rc::Rc};
78
79pub use with_event_context::*;
80
81/// Immutable context data that can be safely shared across thread boundaries.
82///
83/// This struct holds key-value pairs of context information that gets attached
84/// to events when they are persisted. It uses an immutable HashMap internally
85/// for efficient cloning and thread-safe sharing of data snapshots.
86///
87/// `ContextData` is `Send` and can be passed between threads, unlike [`EventContext`]
88/// which is thread-local. This makes it suitable for transferring context across
89/// async boundaries via the [`WithEventContext`] trait.
90#[derive(Debug, Clone, Serialize)]
91#[serde(transparent)]
92pub struct ContextData(im::HashMap<&'static str, serde_json::Value>);
93
94impl ContextData {
95    fn new() -> Self {
96        Self(im::HashMap::new())
97    }
98
99    fn insert(&mut self, key: &'static str, value: serde_json::Value) {
100        self.0 = self.0.update(key, value);
101    }
102}
103
104struct StackEntry {
105    id: Rc<()>,
106    data: ContextData,
107}
108
109thread_local! {
110    static CONTEXT_STACK: RefCell<Vec<StackEntry>> = const { RefCell::new(Vec::new()) };
111}
112
113/// Thread-local event context for tracking metadata throughout event sourcing operations.
114///
115/// `EventContext` provides a way to attach contextual information (like request IDs, audit info,
116/// or operation metadata) to events as they are created and persisted. The context is managed
117/// as a thread-local stack, allowing for nested contexts within the same thread.
118///
119/// # Thread Safety
120///
121/// This struct is deliberately `!Send` to ensure thread-local safety. It uses `Rc` for reference
122/// counting which is not thread-safe. For propagating context across async boundaries or threads,
123/// use the [`WithEventContext`] trait which safely transfers context data.
124///
125/// # Usage Patterns
126///
127/// - **Same thread**: Use [`fork()`](Self::fork) to create isolated child contexts
128/// - **Async tasks**: Use [`with_event_context()`](WithEventContext::with_event_context) from the [`WithEventContext`] trait
129/// - **New threads**: Use [`seed()`](Self::seed) with data from [`data()`](Self::data) to transfer context
130///
131/// # Examples
132///
133/// ```rust
134/// use es_entity::context::EventContext;
135///
136/// // Create or get current context
137/// let mut ctx = EventContext::current();
138/// ctx.insert("user_id", &"123").unwrap();
139///
140/// // Fork for isolated scope
141/// {
142///     let mut child = EventContext::fork();
143///     child.insert("operation", &"update").unwrap();
144///     // Both user_id and operation are available here
145/// }
146/// // Only user_id remains in parent context
147/// ```
148pub struct EventContext {
149    id: Rc<()>,
150}
151
152impl Drop for EventContext {
153    fn drop(&mut self) {
154        // If strong_count is 2, it means this EventContext + one StackEntry reference
155        if Rc::strong_count(&self.id) == 2 {
156            CONTEXT_STACK.with(|c| {
157                let mut stack = c.borrow_mut();
158                for i in (0..stack.len()).rev() {
159                    if Rc::ptr_eq(&stack[i].id, &self.id) {
160                        stack.remove(i);
161                        break;
162                    }
163                }
164            });
165        }
166    }
167}
168
169impl EventContext {
170    /// Gets the current event context or creates a new one if none exists.
171    ///
172    /// This function is thread-local and will return a handle to the topmost context
173    /// on the current thread's context stack. If no context exists, it will create
174    /// a new empty context and push it onto the stack.
175    ///
176    /// # Examples
177    ///
178    /// ```rust
179    /// use es_entity::context::EventContext;
180    ///
181    /// let ctx = EventContext::current();
182    /// // Context is now available for the current thread
183    /// ```
184    pub fn current() -> Self {
185        CONTEXT_STACK.with(|c| {
186            let mut stack = c.borrow_mut();
187            if let Some(last) = stack.last() {
188                return EventContext {
189                    id: last.id.clone(),
190                };
191            }
192
193            let id = Rc::new(());
194            let data = ContextData::new();
195            stack.push(StackEntry {
196                id: id.clone(),
197                data,
198            });
199
200            EventContext { id }
201        })
202    }
203
204    /// Creates a new event context seeded with the provided data.
205    ///
206    /// This creates a completely new context stack entry with the given context data,
207    /// independent of any existing context. This is useful for starting fresh contexts
208    /// in new threads or async tasks.
209    ///
210    /// # Arguments
211    ///
212    /// * `data` - The initial context data for the new context
213    ///
214    /// # Examples
215    ///
216    /// ```rust
217    /// use es_entity::context::{EventContext, ContextData};
218    ///
219    /// let data = EventContext::current().data();
220    /// let new_ctx = EventContext::seed(data);
221    /// // new_ctx now has its own independent context stack
222    /// ```
223    pub fn seed(data: ContextData) -> Self {
224        CONTEXT_STACK.with(|c| {
225            let mut stack = c.borrow_mut();
226            let id = Rc::new(());
227            stack.push(StackEntry {
228                id: id.clone(),
229                data,
230            });
231
232            EventContext { id }
233        })
234    }
235
236    /// Creates a new isolated context that inherits data from the current context.
237    ///
238    /// This method creates a child context that starts with a copy of the current
239    /// context's data. Changes made to the forked context will not affect the parent
240    /// context, and when the forked context is dropped, the parent context remains
241    /// unchanged. This is useful for creating isolated scopes within the same thread.
242    ///
243    /// # Examples
244    ///
245    /// ```rust
246    /// use es_entity::context::EventContext;
247    ///
248    /// let mut parent = EventContext::current();
249    /// parent.insert("shared", &"value").unwrap();
250    ///
251    /// {
252    ///     let mut child = EventContext::fork();
253    ///     child.insert("child_only", &"data").unwrap();
254    ///     // child context has both "shared" and "child_only"
255    /// }
256    /// // parent context only has "shared" - "child_only" is gone
257    /// ```
258    pub fn fork() -> Self {
259        let current = Self::current();
260        let data = current.data();
261        Self::seed(data)
262    }
263
264    /// Inserts a key-value pair into the current context.
265    ///
266    /// The value will be serialized to JSON and stored in the context data.
267    /// This data will be available to all code running within this context
268    /// and any child contexts created via `fork()`.
269    ///
270    /// # Arguments
271    ///
272    /// * `key` - A static string key to identify the value
273    /// * `value` - Any serializable value to store in the context
274    ///
275    /// # Returns
276    ///
277    /// Returns `Ok(())` on success or a `serde_json::Error` if serialization fails.
278    ///
279    /// # Examples
280    ///
281    /// ```rust
282    /// use es_entity::context::EventContext;
283    ///
284    /// let mut ctx = EventContext::current();
285    /// ctx.insert("user_id", &"12345").unwrap();
286    /// ctx.insert("operation", &"transfer").unwrap();
287    /// ```
288    pub fn insert<T: Serialize>(
289        &mut self,
290        key: &'static str,
291        value: &T,
292    ) -> Result<(), serde_json::Error> {
293        let json_value = serde_json::to_value(value)?;
294
295        CONTEXT_STACK.with(|c| {
296            let mut stack = c.borrow_mut();
297            for entry in stack.iter_mut().rev() {
298                if Rc::ptr_eq(&entry.id, &self.id) {
299                    entry.data.insert(key, json_value);
300                    return;
301                }
302            }
303            panic!("EventContext missing on CONTEXT_STACK")
304        });
305
306        Ok(())
307    }
308
309    /// Returns a copy of the current context data.
310    ///
311    /// This method returns a snapshot of all key-value pairs stored in this context.
312    /// The returned [`ContextData`] can be used to seed new contexts or passed to
313    /// async tasks to maintain context across thread boundaries.
314    ///
315    /// # Examples
316    ///
317    /// ```rust
318    /// use es_entity::context::EventContext;
319    ///
320    /// let mut ctx = EventContext::current();
321    /// ctx.insert("request_id", &"abc123").unwrap();
322    ///
323    /// let data = ctx.data();
324    /// // data now contains a copy of the context with request_id
325    /// ```
326    pub fn data(&self) -> ContextData {
327        CONTEXT_STACK.with(|c| {
328            let stack = c.borrow();
329            for entry in stack.iter().rev() {
330                if Rc::ptr_eq(&entry.id, &self.id) {
331                    return entry.data.clone();
332                }
333            }
334            panic!("EventContext missing on CONTEXT_STACK")
335        })
336    }
337
338    /// Serializes the current context data to JSON.
339    ///
340    /// This method is primarily used internally by the event persistence system
341    /// to store context data alongside events in the database. It converts all
342    /// context key-value pairs into a single JSON object.
343    ///
344    /// # Returns
345    ///
346    /// Returns a `serde_json::Value` containing all context data, or an error
347    /// if serialization fails.
348    ///
349    /// # Examples
350    ///
351    /// ```rust
352    /// use es_entity::context::EventContext;
353    ///
354    /// let mut ctx = EventContext::current();
355    /// ctx.insert("user_id", &"12345").unwrap();
356    ///
357    /// let json = ctx.as_json().unwrap();
358    /// // json is now: {"user_id": "12345"}
359    /// ```
360    #[allow(unused_mut)]
361    pub fn as_json(&self) -> Result<serde_json::Value, serde_json::Error> {
362        let mut data = self.data();
363        #[cfg(feature = "tracing")]
364        {
365            // Only inject if not already present
366            if !data.0.contains_key("tracing") {
367                let tracing = tracing::extract_current_tracing_context();
368                data.insert("tracing", serde_json::to_value(&tracing)?);
369            }
370        }
371        serde_json::to_value(&data)
372    }
373}
374
375#[cfg(test)]
376mod tests {
377    use super::*;
378
379    fn stack_depth() -> usize {
380        CONTEXT_STACK.with(|c| c.borrow().len())
381    }
382
383    fn current_json() -> serde_json::Value {
384        EventContext::current().as_json().unwrap()
385    }
386
387    #[test]
388    fn assert_stack_depth() {
389        fn assert_inner() {
390            let _ctx = EventContext::current();
391            assert_eq!(stack_depth(), 1);
392        }
393        assert_eq!(stack_depth(), 0);
394        {
395            let _ctx = EventContext::current();
396            assert_eq!(stack_depth(), 1);
397            assert_inner();
398        }
399        assert_eq!(stack_depth(), 0);
400    }
401
402    #[test]
403    fn insert() {
404        fn insert_inner(value: &serde_json::Value) {
405            let mut ctx = EventContext::current();
406            ctx.insert("new_data", &value).unwrap();
407            assert_eq!(
408                current_json(),
409                serde_json::json!({ "data": value, "new_data": value})
410            );
411        }
412
413        let mut ctx = EventContext::current();
414        assert_eq!(current_json(), serde_json::json!({}));
415        let value = serde_json::json!({ "hello": "world" });
416        ctx.insert("data", &value).unwrap();
417        assert_eq!(current_json(), serde_json::json!({ "data": value }));
418        insert_inner(&value);
419        assert_eq!(
420            current_json(),
421            serde_json::json!({ "data": value, "new_data": value})
422        );
423        let new_value = serde_json::json!({ "hello": "new_world" });
424        ctx.insert("data", &new_value).unwrap();
425        assert_eq!(
426            current_json(),
427            serde_json::json!({ "data": new_value, "new_data": value})
428        );
429    }
430
431    #[test]
432    fn thread_isolation() {
433        let mut ctx = EventContext::current();
434        let value = serde_json::json!({ "main": "thread" });
435        ctx.insert("data", &value).unwrap();
436        assert_eq!(stack_depth(), 1);
437
438        let ctx_data = ctx.data();
439        let handle = std::thread::spawn(move || {
440            assert_eq!(stack_depth(), 0);
441            let mut ctx = EventContext::seed(ctx_data);
442            assert_eq!(stack_depth(), 1);
443            ctx.insert("thread", &serde_json::json!("local")).unwrap();
444            assert_eq!(
445                current_json(),
446                serde_json::json!({ "data": { "main": "thread" }, "thread": "local" }),
447            );
448        });
449
450        handle.join().unwrap();
451        assert_eq!(current_json(), serde_json::json!({ "data": value }));
452    }
453
454    #[tokio::test]
455    async fn async_context() {
456        async fn inner_async() {
457            let mut ctx = EventContext::current();
458            ctx.insert("async_inner", &serde_json::json!("value"))
459                .unwrap();
460            assert_eq!(
461                current_json(),
462                serde_json::json!({ "async_data": { "test": "async" }, "async_inner": "value" })
463            );
464        }
465
466        let mut ctx = EventContext::current();
467        assert_eq!(current_json(), serde_json::json!({}));
468
469        let value = serde_json::json!({ "test": "async" });
470        ctx.insert("async_data", &value).unwrap();
471        assert_eq!(current_json(), serde_json::json!({ "async_data": value }));
472
473        inner_async().await;
474
475        assert_eq!(
476            current_json(),
477            serde_json::json!({ "async_data": value, "async_inner": "value" })
478        );
479    }
480
481    #[test]
482    fn fork() {
483        let mut ctx = EventContext::current();
484        ctx.insert("original", &serde_json::json!("value")).unwrap();
485        assert_eq!(stack_depth(), 1);
486        assert_eq!(current_json(), serde_json::json!({ "original": "value" }));
487
488        let mut forked = EventContext::fork();
489        assert_eq!(stack_depth(), 2);
490        assert_eq!(current_json(), serde_json::json!({ "original": "value" }));
491
492        forked.insert("forked", &serde_json::json!("data")).unwrap();
493        assert_eq!(
494            current_json(),
495            serde_json::json!({ "original": "value", "forked": "data" })
496        );
497
498        drop(forked);
499
500        assert_eq!(stack_depth(), 1);
501        assert_eq!(current_json(), serde_json::json!({ "original": "value" }));
502    }
503
504    #[tokio::test]
505    async fn with_event_context_spawned() {
506        let mut ctx = EventContext::current();
507        ctx.insert("parent", &serde_json::json!("context")).unwrap();
508
509        let handle = tokio::spawn(
510            async {
511                assert_eq!(stack_depth(), 2);
512
513                EventContext::current()
514                    .insert("spawned", &serde_json::json!("value"))
515                    .unwrap();
516
517                assert_eq!(
518                    current_json(),
519                    serde_json::json!({ "parent": "context", "spawned": "value" })
520                );
521                tokio::task::yield_now().await;
522                current_json()
523            }
524            .with_event_context(ctx.data()),
525        );
526
527        let result = handle.await.unwrap();
528        assert_eq!(
529            result,
530            serde_json::json!({ "parent": "context", "spawned": "value" })
531        );
532
533        assert_eq!(current_json(), serde_json::json!({ "parent": "context" }));
534    }
535
536    #[tokio::test(flavor = "multi_thread")]
537    async fn with_event_context_spawned_multi_thread() {
538        let mut ctx = EventContext::current();
539        ctx.insert("parent", &serde_json::json!("context")).unwrap();
540
541        let handle = tokio::spawn(
542            async {
543                assert_eq!(stack_depth(), 1);
544
545                EventContext::current()
546                    .insert("spawned", &serde_json::json!("value"))
547                    .unwrap();
548
549                assert_eq!(
550                    current_json(),
551                    serde_json::json!({ "parent": "context", "spawned": "value" })
552                );
553                let data = EventContext::current().data();
554                tokio::task::yield_now().with_event_context(data).await;
555                current_json()
556            }
557            .with_event_context(ctx.data()),
558        );
559
560        let result = handle.await.unwrap();
561        assert_eq!(
562            result,
563            serde_json::json!({ "parent": "context", "spawned": "value" })
564        );
565
566        assert_eq!(current_json(), serde_json::json!({ "parent": "context" }));
567    }
568}