es_entity/context/
mod.rs

1//! Thread-local system for adding context data to persisted events.
2//!
3//! This module provides a context propagation system for event sourcing that allows
4//! attaching metadata (like request IDs, user IDs, or audit information) to events
5//! as they are created and persisted to the database.
6//!
7//! # Core Components
8//!
9//! - [`EventContext`]: Thread-local context manager (`!Send`) that maintains a stack
10//!   of contexts within a single thread
11//! - [`ContextData`]: Immutable, thread-safe (`Send`) snapshot of context data that
12//!   can be passed across thread boundaries
13//! - [`WithEventContext`]: Extension trait for `Future` types to propagate context
14//!   across async boundaries
15//!
16//! # Usage Patterns
17//!
18//! ## Same Thread Context
19//! ```rust
20//! use es_entity::context::EventContext;
21//!
22//! let mut ctx = EventContext::current();
23//! ctx.insert("request_id", &"req-123").unwrap();
24//!
25//! // Fork for isolated scope
26//! {
27//!     let mut child = EventContext::fork();
28//!     child.insert("operation", &"update").unwrap();
29//!     // Both request_id and operation are available
30//! }
31//! // Only request_id remains in parent
32//! ```
33//!
34//! ## Async Task Context
35//! ```rust
36//! use es_entity::context::{EventContext, WithEventContext};
37//!
38//! async fn spawn_with_context() {
39//!     let mut ctx = EventContext::current();
40//!     ctx.insert("user_id", &"user-456").unwrap();
41//!
42//!     let data = ctx.data();
43//!     tokio::spawn(async move {
44//!         // Context is available in spawned task
45//!         let ctx = EventContext::current();
46//!         // Has user_id from parent
47//!     }.with_event_context(data)).await.unwrap();
48//! }
49//! ```
50//!
51//! ## Cross-Thread Context
52//! ```rust
53//! use es_entity::context::EventContext;
54//!
55//! let mut ctx = EventContext::current();
56//! ctx.insert("trace_id", &"trace-789").unwrap();
57//! let data = ctx.data();
58//!
59//! std::thread::spawn(move || {
60//!     let ctx = EventContext::seed(data);
61//!     // New thread has trace_id
62//! });
63//! ```
64//!
65//! # Database Integration
66//!
67//! When events are persisted using repositories with `event_context = true`, the current
68//! context is automatically serialized to JSON and stored in a `context` column
69//! alongside the event data, enabling comprehensive audit trails and debugging.
70
71mod sqlx;
72mod tracing;
73mod with_event_context;
74
75use serde::{Deserialize, Serialize};
76
77use std::{borrow::Cow, cell::RefCell, rc::Rc};
78
79pub use tracing::*;
80pub use with_event_context::*;
81
/// Immutable context data that can be safely shared across thread boundaries.
///
/// This struct holds key-value pairs of context information that gets attached
/// to events when they are persisted. It uses an immutable HashMap internally
/// for efficient cloning and thread-safe sharing of data snapshots.
///
/// `ContextData` is `Send` and can be passed between threads, unlike [`EventContext`]
/// which is thread-local. This makes it suitable for transferring context across
/// async boundaries via the [`WithEventContext`] trait.
///
/// Keys are stored as `Cow<'static, str>` and values as `serde_json::Value`.
/// Because of `#[serde(transparent)]`, the type serializes and deserializes as
/// the underlying map itself rather than as a wrapper around it.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct ContextData(im::HashMap<Cow<'static, str>, serde_json::Value>);
94
95impl ContextData {
96    fn new() -> Self {
97        Self(im::HashMap::new())
98    }
99
100    fn insert(&mut self, key: &'static str, value: serde_json::Value) {
101        self.0 = self.0.update(Cow::Borrowed(key), value);
102    }
103
104    #[cfg(feature = "tracing-context")]
105    pub(crate) fn with_tracing_info(mut self) -> Self {
106        let tracing = TracingContext::current();
107        self.insert(
108            "tracing",
109            serde_json::to_value(&tracing).expect("Could not inject tracing"),
110        );
111        self
112    }
113
114    pub fn lookup<T: serde::de::DeserializeOwned>(
115        &self,
116        key: &'static str,
117    ) -> Result<Option<T>, serde_json::Error> {
118        let Some(val) = self.0.get(key) else {
119            return Ok(None);
120        };
121        serde_json::from_value(val.clone()).map(Some)
122    }
123}
124
/// A single entry on the thread-local context stack.
struct StackEntry {
    /// Identity token for this entry. [`EventContext`] handles hold clones of
    /// this `Rc` and are matched back to their entry with `Rc::ptr_eq`.
    id: Rc<()>,
    /// The context data currently associated with this entry.
    data: ContextData,
}
129
thread_local! {
    // Per-thread stack of context entries. The last element is the entry that
    // `EventContext::current()` hands out; it starts out empty on every thread.
    static CONTEXT_STACK: RefCell<Vec<StackEntry>> = const { RefCell::new(Vec::new()) };
}
133
/// Thread-local event context for tracking metadata throughout event sourcing operations.
///
/// `EventContext` provides a way to attach contextual information (like request IDs, audit info,
/// or operation metadata) to events as they are created and persisted. The context is managed
/// as a thread-local stack, allowing for nested contexts within the same thread.
///
/// An `EventContext` value is a handle to one entry on that stack. Several handles may refer
/// to the same entry; when the last handle for an entry is dropped, the entry is removed
/// from the stack.
///
/// # Thread Safety
///
/// This struct is deliberately `!Send` to ensure thread-local safety. It uses `Rc` for reference
/// counting which is not thread-safe. For propagating context across async boundaries or threads,
/// use the [`WithEventContext`] trait which safely transfers context data.
///
/// # Usage Patterns
///
/// - **Same thread**: Use [`fork()`](Self::fork) to create isolated child contexts
/// - **Async tasks**: Use [`with_event_context()`](WithEventContext::with_event_context) from the [`WithEventContext`] trait
/// - **New threads**: Use [`seed()`](Self::seed) with data from [`data()`](Self::data) to transfer context
///
/// # Examples
///
/// ```rust
/// use es_entity::context::EventContext;
///
/// // Create or get current context
/// let mut ctx = EventContext::current();
/// ctx.insert("user_id", &"123").unwrap();
///
/// // Fork for isolated scope
/// {
///     let mut child = EventContext::fork();
///     child.insert("operation", &"update").unwrap();
///     // Both user_id and operation are available here
/// }
/// // Only user_id remains in parent context
/// ```
pub struct EventContext {
    /// Identity token shared with the stack entry this handle refers to.
    id: Rc<()>,
}
172
173impl Drop for EventContext {
174    fn drop(&mut self) {
175        // If strong_count is 2, it means this EventContext + one StackEntry reference
176        if Rc::strong_count(&self.id) == 2 {
177            CONTEXT_STACK.with(|c| {
178                let mut stack = c.borrow_mut();
179                for i in (0..stack.len()).rev() {
180                    if Rc::ptr_eq(&stack[i].id, &self.id) {
181                        stack.remove(i);
182                        break;
183                    }
184                }
185            });
186        }
187    }
188}
189
190impl EventContext {
191    /// Gets the current event context or creates a new one if none exists.
192    ///
193    /// This function is thread-local and will return a handle to the topmost context
194    /// on the current thread's context stack. If no context exists, it will create
195    /// a new empty context and push it onto the stack.
196    ///
197    /// # Examples
198    ///
199    /// ```rust
200    /// use es_entity::context::EventContext;
201    ///
202    /// let ctx = EventContext::current();
203    /// // Context is now available for the current thread
204    /// ```
205    pub fn current() -> Self {
206        CONTEXT_STACK.with(|c| {
207            let mut stack = c.borrow_mut();
208            if let Some(last) = stack.last() {
209                return EventContext {
210                    id: last.id.clone(),
211                };
212            }
213
214            let id = Rc::new(());
215            let data = ContextData::new();
216            stack.push(StackEntry {
217                id: id.clone(),
218                data,
219            });
220
221            EventContext { id }
222        })
223    }
224
225    /// Creates a new event context seeded with the provided data.
226    ///
227    /// This creates a completely new context stack entry with the given context data,
228    /// independent of any existing context. This is useful for starting fresh contexts
229    /// in new threads or async tasks.
230    ///
231    /// # Arguments
232    ///
233    /// * `data` - The initial context data for the new context
234    ///
235    /// # Examples
236    ///
237    /// ```rust
238    /// use es_entity::context::{EventContext, ContextData};
239    ///
240    /// let data = EventContext::current().data();
241    /// let new_ctx = EventContext::seed(data);
242    /// // new_ctx now has its own independent context stack
243    /// ```
244    pub fn seed(data: ContextData) -> Self {
245        CONTEXT_STACK.with(|c| {
246            let mut stack = c.borrow_mut();
247            let id = Rc::new(());
248            stack.push(StackEntry {
249                id: id.clone(),
250                data,
251            });
252
253            EventContext { id }
254        })
255    }
256
257    /// Creates a new isolated context that inherits data from the current context.
258    ///
259    /// This method creates a child context that starts with a copy of the current
260    /// context's data. Changes made to the forked context will not affect the parent
261    /// context, and when the forked context is dropped, the parent context remains
262    /// unchanged. This is useful for creating isolated scopes within the same thread.
263    ///
264    /// # Examples
265    ///
266    /// ```rust
267    /// use es_entity::context::EventContext;
268    ///
269    /// let mut parent = EventContext::current();
270    /// parent.insert("shared", &"value").unwrap();
271    ///
272    /// {
273    ///     let mut child = EventContext::fork();
274    ///     child.insert("child_only", &"data").unwrap();
275    ///     // child context has both "shared" and "child_only"
276    /// }
277    /// // parent context only has "shared" - "child_only" is gone
278    /// ```
279    pub fn fork() -> Self {
280        let current = Self::current();
281        let data = current.data();
282        Self::seed(data)
283    }
284
285    /// Inserts a key-value pair into the current context.
286    ///
287    /// The value will be serialized to JSON and stored in the context data.
288    /// This data will be available to all code running within this context
289    /// and any child contexts created via `fork()`.
290    ///
291    /// # Arguments
292    ///
293    /// * `key` - A static string key to identify the value
294    /// * `value` - Any serializable value to store in the context
295    ///
296    /// # Returns
297    ///
298    /// Returns `Ok(())` on success or a `serde_json::Error` if serialization fails.
299    ///
300    /// # Examples
301    ///
302    /// ```rust
303    /// use es_entity::context::EventContext;
304    ///
305    /// let mut ctx = EventContext::current();
306    /// ctx.insert("user_id", &"12345").unwrap();
307    /// ctx.insert("operation", &"transfer").unwrap();
308    /// ```
309    pub fn insert<T: Serialize>(
310        &mut self,
311        key: &'static str,
312        value: &T,
313    ) -> Result<(), serde_json::Error> {
314        let json_value = serde_json::to_value(value)?;
315
316        CONTEXT_STACK.with(|c| {
317            let mut stack = c.borrow_mut();
318            for entry in stack.iter_mut().rev() {
319                if Rc::ptr_eq(&entry.id, &self.id) {
320                    entry.data.insert(key, json_value);
321                    return;
322                }
323            }
324            panic!("EventContext missing on CONTEXT_STACK")
325        });
326
327        Ok(())
328    }
329
330    /// Returns a copy of the current context data.
331    ///
332    /// This method returns a snapshot of all key-value pairs stored in this context.
333    /// The returned [`ContextData`] can be used to seed new contexts or passed to
334    /// async tasks to maintain context across thread boundaries.
335    ///
336    /// # Examples
337    ///
338    /// ```rust
339    /// use es_entity::context::EventContext;
340    ///
341    /// let mut ctx = EventContext::current();
342    /// ctx.insert("request_id", &"abc123").unwrap();
343    ///
344    /// let data = ctx.data();
345    /// // data now contains a copy of the context with request_id
346    /// ```
347    pub fn data(&self) -> ContextData {
348        CONTEXT_STACK.with(|c| {
349            let stack = c.borrow();
350            for entry in stack.iter().rev() {
351                if Rc::ptr_eq(&entry.id, &self.id) {
352                    return entry.data.clone();
353                }
354            }
355            panic!("EventContext missing on CONTEXT_STACK")
356        })
357    }
358
359    #[allow(unused_mut)]
360    pub(crate) fn data_for_storing() -> ContextData {
361        let mut data = Self::current().data();
362        #[cfg(feature = "tracing-context")]
363        {
364            data = data.with_tracing_info();
365        }
366        data
367    }
368}
369
// Unit tests for the thread-local context stack. Many assertions depend on when
// `EventContext` handles are dropped, so statement order matters in every test.
#[cfg(test)]
mod tests {
    use super::*;

    // Number of entries currently on this thread's context stack.
    fn stack_depth() -> usize {
        CONTEXT_STACK.with(|c| c.borrow().len())
    }

    // JSON form of the current (top-most) context's data. The handle obtained
    // here is a temporary, so it is dropped again before this function returns.
    fn current_json() -> serde_json::Value {
        serde_json::to_value(EventContext::current().data()).unwrap()
    }

    // `current()` reuses the existing top entry instead of pushing a new one, and
    // the entry disappears once every handle to it has been dropped.
    #[test]
    fn assert_stack_depth() {
        fn assert_inner() {
            // A second handle to the same entry: depth stays at 1.
            let _ctx = EventContext::current();
            assert_eq!(stack_depth(), 1);
        }
        assert_eq!(stack_depth(), 0);
        {
            let _ctx = EventContext::current();
            assert_eq!(stack_depth(), 1);
            assert_inner();
        }
        // `_ctx` went out of scope, so the entry was removed.
        assert_eq!(stack_depth(), 0);
    }

    // Values inserted through any handle to the same entry are visible through
    // all of them, and inserting under an existing key replaces the value.
    #[test]
    fn insert() {
        fn insert_inner(value: &serde_json::Value) {
            // Handle to the entry the caller is already holding.
            let mut ctx = EventContext::current();
            ctx.insert("new_data", &value).unwrap();
            assert_eq!(
                current_json(),
                serde_json::json!({ "data": value, "new_data": value})
            );
        }

        let mut ctx = EventContext::current();
        assert_eq!(current_json(), serde_json::json!({}));
        let value = serde_json::json!({ "hello": "world" });
        ctx.insert("data", &value).unwrap();
        assert_eq!(current_json(), serde_json::json!({ "data": value }));
        insert_inner(&value);
        // The key added inside `insert_inner` is still present afterwards.
        assert_eq!(
            current_json(),
            serde_json::json!({ "data": value, "new_data": value})
        );
        let new_value = serde_json::json!({ "hello": "new_world" });
        // Overwrite the existing "data" key.
        ctx.insert("data", &new_value).unwrap();
        assert_eq!(
            current_json(),
            serde_json::json!({ "data": new_value, "new_data": value})
        );
    }

    // A new thread starts with an empty stack; data must be carried over
    // explicitly via `seed`, and changes made there do not reach this thread.
    #[test]
    fn thread_isolation() {
        let mut ctx = EventContext::current();
        let value = serde_json::json!({ "main": "thread" });
        ctx.insert("data", &value).unwrap();
        assert_eq!(stack_depth(), 1);

        let ctx_data = ctx.data();
        let handle = std::thread::spawn(move || {
            assert_eq!(stack_depth(), 0);
            let mut ctx = EventContext::seed(ctx_data);
            assert_eq!(stack_depth(), 1);
            ctx.insert("thread", &serde_json::json!("local")).unwrap();
            assert_eq!(
                current_json(),
                serde_json::json!({ "data": { "main": "thread" }, "thread": "local" }),
            );
        });

        handle.join().unwrap();
        // The "thread" key inserted on the other thread is not visible here.
        assert_eq!(current_json(), serde_json::json!({ "data": value }));
    }

    // Awaiting an async fn directly (no spawn): the inner function sees and
    // modifies the same context entry as the caller.
    #[tokio::test]
    async fn async_context() {
        async fn inner_async() {
            let mut ctx = EventContext::current();
            ctx.insert("async_inner", &serde_json::json!("value"))
                .unwrap();
            assert_eq!(
                current_json(),
                serde_json::json!({ "async_data": { "test": "async" }, "async_inner": "value" })
            );
        }

        let mut ctx = EventContext::current();
        assert_eq!(current_json(), serde_json::json!({}));

        let value = serde_json::json!({ "test": "async" });
        ctx.insert("async_data", &value).unwrap();
        assert_eq!(current_json(), serde_json::json!({ "async_data": value }));

        inner_async().await;

        assert_eq!(
            current_json(),
            serde_json::json!({ "async_data": value, "async_inner": "value" })
        );
    }

    // A forked context starts with the parent's data, gets its own stack entry,
    // and leaves the parent unchanged once it is dropped.
    #[test]
    fn fork() {
        let mut ctx = EventContext::current();
        ctx.insert("original", &serde_json::json!("value")).unwrap();
        assert_eq!(stack_depth(), 1);
        assert_eq!(current_json(), serde_json::json!({ "original": "value" }));

        let mut forked = EventContext::fork();
        assert_eq!(stack_depth(), 2);
        assert_eq!(current_json(), serde_json::json!({ "original": "value" }));

        forked.insert("forked", &serde_json::json!("data")).unwrap();
        assert_eq!(
            current_json(),
            serde_json::json!({ "original": "value", "forked": "data" })
        );

        // Dropping the fork removes its entry; the parent is current again.
        drop(forked);

        assert_eq!(stack_depth(), 1);
        assert_eq!(current_json(), serde_json::json!({ "original": "value" }));
    }

    // Spawned task wrapped with `with_event_context`: values inserted inside the
    // task are visible there but do not leak back into the parent context.
    #[tokio::test]
    async fn with_event_context_spawned() {
        let mut ctx = EventContext::current();
        ctx.insert("parent", &serde_json::json!("context")).unwrap();

        let handle = tokio::spawn(
            async {
                // NOTE(review): depth 2 presumably because the task runs on the same
                // thread as the test, so the parent's entry is also on this stack
                // -- confirm against the runtime flavor and `with_event_context`.
                assert_eq!(stack_depth(), 2);

                EventContext::current()
                    .insert("spawned", &serde_json::json!("value"))
                    .unwrap();

                assert_eq!(
                    current_json(),
                    serde_json::json!({ "parent": "context", "spawned": "value" })
                );
                tokio::task::yield_now().await;
                // The context is expected to still be in place after yielding.
                current_json()
            }
            .with_event_context(ctx.data()),
        );

        let result = handle.await.unwrap();
        assert_eq!(
            result,
            serde_json::json!({ "parent": "context", "spawned": "value" })
        );

        // The parent context never sees the "spawned" key.
        assert_eq!(current_json(), serde_json::json!({ "parent": "context" }));
    }

    // Same scenario as above on the multi-threaded runtime.
    #[tokio::test(flavor = "multi_thread")]
    async fn with_event_context_spawned_multi_thread() {
        let mut ctx = EventContext::current();
        ctx.insert("parent", &serde_json::json!("context")).unwrap();

        let handle = tokio::spawn(
            async {
                // NOTE(review): depth 1 presumably because the task runs on a
                // different thread than the test, with its own stack -- confirm.
                assert_eq!(stack_depth(), 1);

                EventContext::current()
                    .insert("spawned", &serde_json::json!("value"))
                    .unwrap();

                assert_eq!(
                    current_json(),
                    serde_json::json!({ "parent": "context", "spawned": "value" })
                );
                // Snapshot the data and attach it to the inner future as well.
                let data = EventContext::current().data();
                tokio::task::yield_now().with_event_context(data).await;
                current_json()
            }
            .with_event_context(ctx.data()),
        );

        let result = handle.await.unwrap();
        assert_eq!(
            result,
            serde_json::json!({ "parent": "context", "spawned": "value" })
        );

        // The parent context never sees the "spawned" key.
        assert_eq!(current_json(), serde_json::json!({ "parent": "context" }));
    }
}
563}