es_entity/context/mod.rs
1//! Thread-local system for adding context data to persisted events.
2//!
3//! This module provides a context propagation system for event sourcing that allows
4//! attaching metadata (like request IDs, user IDs, or audit information) to events
5//! as they are created and persisted to the database.
6//!
7//! # Core Components
8//!
9//! - [`EventContext`]: Thread-local context manager (`!Send`) that maintains a stack
10//! of contexts within a single thread
11//! - [`ContextData`]: Immutable, thread-safe (`Send`) snapshot of context data that
12//! can be passed across thread boundaries
13//! - [`WithEventContext`]: Extension trait for `Future` types to propagate context
14//! across async boundaries
15//!
16//! # Usage Patterns
17//!
18//! ## Same Thread Context
19//! ```rust
20//! use es_entity::context::EventContext;
21//!
22//! let mut ctx = EventContext::current();
23//! ctx.insert("request_id", &"req-123").unwrap();
24//!
25//! // Fork for isolated scope
26//! {
27//! let mut child = EventContext::fork();
28//! child.insert("operation", &"update").unwrap();
29//! // Both request_id and operation are available
30//! }
31//! // Only request_id remains in parent
32//! ```
33//!
34//! ## Async Task Context
35//! ```rust
36//! use es_entity::context::{EventContext, WithEventContext};
37//!
38//! async fn spawn_with_context() {
39//! let mut ctx = EventContext::current();
40//! ctx.insert("user_id", &"user-456").unwrap();
41//!
42//! let data = ctx.data();
43//! tokio::spawn(async move {
44//! // Context is available in spawned task
45//! let ctx = EventContext::current();
46//! // Has user_id from parent
47//! }.with_event_context(data)).await.unwrap();
48//! }
49//! ```
50//!
51//! ## Cross-Thread Context
52//! ```rust
53//! use es_entity::context::EventContext;
54//!
55//! let mut ctx = EventContext::current();
56//! ctx.insert("trace_id", &"trace-789").unwrap();
57//! let data = ctx.data();
58//!
59//! std::thread::spawn(move || {
60//! let ctx = EventContext::seed(data);
61//! // New thread has trace_id
62//! });
63//! ```
64//!
65//! # Database Integration
66//!
67//! When events are persisted using repositories with `event_context = true`, the current
68//! context is automatically serialized to JSON and stored in a `context` column
69//! alongside the event data, enabling comprehensive audit trails and debugging.
70
71#[cfg(feature = "tracing")]
72mod tracing;
73mod with_event_context;
74
75use serde::Serialize;
76
77use std::{cell::RefCell, rc::Rc};
78
79pub use with_event_context::*;
80
81/// Immutable context data that can be safely shared across thread boundaries.
82///
83/// This struct holds key-value pairs of context information that gets attached
84/// to events when they are persisted. It uses an immutable HashMap internally
85/// for efficient cloning and thread-safe sharing of data snapshots.
86///
87/// `ContextData` is `Send` and can be passed between threads, unlike [`EventContext`]
88/// which is thread-local. This makes it suitable for transferring context across
89/// async boundaries via the [`WithEventContext`] trait.
90#[derive(Debug, Clone, Serialize)]
91#[serde(transparent)]
92pub struct ContextData(im::HashMap<&'static str, serde_json::Value>);
93
94impl ContextData {
95 fn new() -> Self {
96 Self(im::HashMap::new())
97 }
98
99 fn insert(&mut self, key: &'static str, value: serde_json::Value) {
100 self.0 = self.0.update(key, value);
101 }
102}
103
104struct StackEntry {
105 id: Rc<()>,
106 data: ContextData,
107}
108
109thread_local! {
110 static CONTEXT_STACK: RefCell<Vec<StackEntry>> = const { RefCell::new(Vec::new()) };
111}
112
/// Handle to an entry on the current thread's event-context stack.
///
/// `EventContext` lets you attach contextual information (request IDs, audit
/// info, operation metadata, ...) to events as they are created and persisted.
/// Contexts are kept on a thread-local stack, which allows nested contexts
/// within the same thread. A handle does not own the data itself; it identifies
/// the stack entry that does, and the entry is removed once the last handle
/// referring to it is dropped.
///
/// # Thread Safety
///
/// This struct is deliberately `!Send`: it holds an `Rc`, whose reference count
/// is not thread-safe, and it refers to thread-local state. To carry context
/// across async boundaries or into other threads, transfer a [`ContextData`]
/// snapshot instead (see below).
///
/// # Usage Patterns
///
/// - **Same thread**: Use [`fork()`](Self::fork) to create isolated child contexts
/// - **Async tasks**: Use [`with_event_context()`](WithEventContext::with_event_context) from the [`WithEventContext`] trait
/// - **New threads**: Use [`seed()`](Self::seed) with data from [`data()`](Self::data) to transfer context
///
/// # Examples
///
/// ```rust
/// use es_entity::context::EventContext;
///
/// // Create or get current context
/// let mut ctx = EventContext::current();
/// ctx.insert("user_id", &"123").unwrap();
///
/// // Fork for isolated scope
/// {
///     let mut child = EventContext::fork();
///     child.insert("operation", &"update").unwrap();
///     // Both user_id and operation are available here
/// }
/// // Only user_id remains in parent context
/// ```
// `Debug` is derived so the handle can appear in `{:?}` output and inside
// other `#[derive(Debug)]` types; it prints only the opaque identity token.
#[derive(Debug)]
pub struct EventContext {
    /// Identity token shared with the stack entry this handle refers to.
    id: Rc<()>,
}
151
152impl Drop for EventContext {
153 fn drop(&mut self) {
154 // If strong_count is 2, it means this EventContext + one StackEntry reference
155 if Rc::strong_count(&self.id) == 2 {
156 CONTEXT_STACK.with(|c| {
157 let mut stack = c.borrow_mut();
158 for i in (0..stack.len()).rev() {
159 if Rc::ptr_eq(&stack[i].id, &self.id) {
160 stack.remove(i);
161 break;
162 }
163 }
164 });
165 }
166 }
167}
168
169impl EventContext {
170 /// Gets the current event context or creates a new one if none exists.
171 ///
172 /// This function is thread-local and will return a handle to the topmost context
173 /// on the current thread's context stack. If no context exists, it will create
174 /// a new empty context and push it onto the stack.
175 ///
176 /// # Examples
177 ///
178 /// ```rust
179 /// use es_entity::context::EventContext;
180 ///
181 /// let ctx = EventContext::current();
182 /// // Context is now available for the current thread
183 /// ```
184 pub fn current() -> Self {
185 CONTEXT_STACK.with(|c| {
186 let mut stack = c.borrow_mut();
187 if let Some(last) = stack.last() {
188 return EventContext {
189 id: last.id.clone(),
190 };
191 }
192
193 let id = Rc::new(());
194 let data = ContextData::new();
195 stack.push(StackEntry {
196 id: id.clone(),
197 data,
198 });
199
200 EventContext { id }
201 })
202 }
203
204 /// Creates a new event context seeded with the provided data.
205 ///
206 /// This creates a completely new context stack entry with the given context data,
207 /// independent of any existing context. This is useful for starting fresh contexts
208 /// in new threads or async tasks.
209 ///
210 /// # Arguments
211 ///
212 /// * `data` - The initial context data for the new context
213 ///
214 /// # Examples
215 ///
216 /// ```rust
217 /// use es_entity::context::{EventContext, ContextData};
218 ///
219 /// let data = EventContext::current().data();
220 /// let new_ctx = EventContext::seed(data);
221 /// // new_ctx now has its own independent context stack
222 /// ```
223 pub fn seed(data: ContextData) -> Self {
224 CONTEXT_STACK.with(|c| {
225 let mut stack = c.borrow_mut();
226 let id = Rc::new(());
227 stack.push(StackEntry {
228 id: id.clone(),
229 data,
230 });
231
232 EventContext { id }
233 })
234 }
235
236 /// Creates a new isolated context that inherits data from the current context.
237 ///
238 /// This method creates a child context that starts with a copy of the current
239 /// context's data. Changes made to the forked context will not affect the parent
240 /// context, and when the forked context is dropped, the parent context remains
241 /// unchanged. This is useful for creating isolated scopes within the same thread.
242 ///
243 /// # Examples
244 ///
245 /// ```rust
246 /// use es_entity::context::EventContext;
247 ///
248 /// let mut parent = EventContext::current();
249 /// parent.insert("shared", &"value").unwrap();
250 ///
251 /// {
252 /// let mut child = EventContext::fork();
253 /// child.insert("child_only", &"data").unwrap();
254 /// // child context has both "shared" and "child_only"
255 /// }
256 /// // parent context only has "shared" - "child_only" is gone
257 /// ```
258 pub fn fork() -> Self {
259 let current = Self::current();
260 let data = current.data();
261 Self::seed(data)
262 }
263
264 /// Inserts a key-value pair into the current context.
265 ///
266 /// The value will be serialized to JSON and stored in the context data.
267 /// This data will be available to all code running within this context
268 /// and any child contexts created via `fork()`.
269 ///
270 /// # Arguments
271 ///
272 /// * `key` - A static string key to identify the value
273 /// * `value` - Any serializable value to store in the context
274 ///
275 /// # Returns
276 ///
277 /// Returns `Ok(())` on success or a `serde_json::Error` if serialization fails.
278 ///
279 /// # Examples
280 ///
281 /// ```rust
282 /// use es_entity::context::EventContext;
283 ///
284 /// let mut ctx = EventContext::current();
285 /// ctx.insert("user_id", &"12345").unwrap();
286 /// ctx.insert("operation", &"transfer").unwrap();
287 /// ```
288 pub fn insert<T: Serialize>(
289 &mut self,
290 key: &'static str,
291 value: &T,
292 ) -> Result<(), serde_json::Error> {
293 let json_value = serde_json::to_value(value)?;
294
295 CONTEXT_STACK.with(|c| {
296 let mut stack = c.borrow_mut();
297 for entry in stack.iter_mut().rev() {
298 if Rc::ptr_eq(&entry.id, &self.id) {
299 entry.data.insert(key, json_value);
300 return;
301 }
302 }
303 panic!("EventContext missing on CONTEXT_STACK")
304 });
305
306 Ok(())
307 }
308
309 /// Returns a copy of the current context data.
310 ///
311 /// This method returns a snapshot of all key-value pairs stored in this context.
312 /// The returned [`ContextData`] can be used to seed new contexts or passed to
313 /// async tasks to maintain context across thread boundaries.
314 ///
315 /// # Examples
316 ///
317 /// ```rust
318 /// use es_entity::context::EventContext;
319 ///
320 /// let mut ctx = EventContext::current();
321 /// ctx.insert("request_id", &"abc123").unwrap();
322 ///
323 /// let data = ctx.data();
324 /// // data now contains a copy of the context with request_id
325 /// ```
326 pub fn data(&self) -> ContextData {
327 CONTEXT_STACK.with(|c| {
328 let stack = c.borrow();
329 for entry in stack.iter().rev() {
330 if Rc::ptr_eq(&entry.id, &self.id) {
331 return entry.data.clone();
332 }
333 }
334 panic!("EventContext missing on CONTEXT_STACK")
335 })
336 }
337
338 /// Serializes the current context data to JSON.
339 ///
340 /// This method is primarily used internally by the event persistence system
341 /// to store context data alongside events in the database. It converts all
342 /// context key-value pairs into a single JSON object.
343 ///
344 /// # Returns
345 ///
346 /// Returns a `serde_json::Value` containing all context data, or an error
347 /// if serialization fails.
348 ///
349 /// # Examples
350 ///
351 /// ```rust
352 /// use es_entity::context::EventContext;
353 ///
354 /// let mut ctx = EventContext::current();
355 /// ctx.insert("user_id", &"12345").unwrap();
356 ///
357 /// let json = ctx.as_json().unwrap();
358 /// // json is now: {"user_id": "12345"}
359 /// ```
360 #[allow(unused_mut)]
361 pub fn as_json(&self) -> Result<serde_json::Value, serde_json::Error> {
362 let mut data = self.data();
363 #[cfg(feature = "tracing")]
364 {
365 // Only inject if not already present
366 if !data.0.contains_key("tracing") {
367 let tracing = tracing::extract_current_tracing_context();
368 data.insert("tracing", serde_json::to_value(&tracing)?);
369 }
370 }
371 serde_json::to_value(&data)
372 }
373}
374
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// Number of entries currently on this thread's context stack.
    fn stack_depth() -> usize {
        CONTEXT_STACK.with(|stack| stack.borrow().len())
    }

    /// JSON rendering of the context that is current on this thread.
    fn current_json() -> serde_json::Value {
        let top = EventContext::current();
        top.as_json().unwrap()
    }

    #[test]
    fn assert_stack_depth() {
        // A nested `current()` shares the existing entry instead of pushing one.
        fn nested_scope() {
            let _handle = EventContext::current();
            assert_eq!(stack_depth(), 1);
        }

        assert_eq!(stack_depth(), 0);
        {
            let _handle = EventContext::current();
            assert_eq!(stack_depth(), 1);
            nested_scope();
        }
        // Dropping the last handle removes the entry again.
        assert_eq!(stack_depth(), 0);
    }

    #[test]
    fn insert() {
        // Inserting through a second handle writes into the same shared entry.
        fn insert_from_nested_scope(payload: &serde_json::Value) {
            let mut nested = EventContext::current();
            nested.insert("new_data", &payload).unwrap();
            assert_eq!(
                current_json(),
                json!({ "data": payload, "new_data": payload })
            );
        }

        let mut root = EventContext::current();
        assert_eq!(current_json(), json!({}));

        let first = json!({ "hello": "world" });
        root.insert("data", &first).unwrap();
        assert_eq!(current_json(), json!({ "data": first }));

        insert_from_nested_scope(&first);
        assert_eq!(
            current_json(),
            json!({ "data": first, "new_data": first })
        );

        // Re-inserting an existing key overwrites its value.
        let replacement = json!({ "hello": "new_world" });
        root.insert("data", &replacement).unwrap();
        assert_eq!(
            current_json(),
            json!({ "data": replacement, "new_data": first })
        );
    }

    #[test]
    fn thread_isolation() {
        let mut main_ctx = EventContext::current();
        let main_value = json!({ "main": "thread" });
        main_ctx.insert("data", &main_value).unwrap();
        assert_eq!(stack_depth(), 1);

        let snapshot = main_ctx.data();
        let worker = std::thread::spawn(move || {
            // A fresh thread starts with an empty stack.
            assert_eq!(stack_depth(), 0);
            let mut seeded = EventContext::seed(snapshot);
            assert_eq!(stack_depth(), 1);
            seeded.insert("thread", &json!("local")).unwrap();
            assert_eq!(
                current_json(),
                json!({ "data": { "main": "thread" }, "thread": "local" }),
            );
        });
        worker.join().unwrap();

        // Nothing inserted on the worker thread leaks back to this one.
        assert_eq!(current_json(), json!({ "data": main_value }));
    }

    #[tokio::test]
    async fn async_context() {
        async fn insert_inside_async_fn() {
            let mut inner = EventContext::current();
            inner.insert("async_inner", &json!("value")).unwrap();
            assert_eq!(
                current_json(),
                json!({ "async_data": { "test": "async" }, "async_inner": "value" })
            );
        }

        let mut outer = EventContext::current();
        assert_eq!(current_json(), json!({}));

        let payload = json!({ "test": "async" });
        outer.insert("async_data", &payload).unwrap();
        assert_eq!(current_json(), json!({ "async_data": payload }));

        insert_inside_async_fn().await;

        assert_eq!(
            current_json(),
            json!({ "async_data": payload, "async_inner": "value" })
        );
    }

    #[test]
    fn fork() {
        let mut parent = EventContext::current();
        parent.insert("original", &json!("value")).unwrap();
        assert_eq!(stack_depth(), 1);
        assert_eq!(current_json(), json!({ "original": "value" }));

        // The fork sits on top of the parent and starts with the parent's data.
        let mut child = EventContext::fork();
        assert_eq!(stack_depth(), 2);
        assert_eq!(current_json(), json!({ "original": "value" }));

        child.insert("forked", &json!("data")).unwrap();
        assert_eq!(
            current_json(),
            json!({ "original": "value", "forked": "data" })
        );

        drop(child);

        // The parent is current again and never saw the child's insert.
        assert_eq!(stack_depth(), 1);
        assert_eq!(current_json(), json!({ "original": "value" }));
    }

    #[tokio::test]
    async fn with_event_context_spawned() {
        let mut parent = EventContext::current();
        parent.insert("parent", &json!("context")).unwrap();

        let snapshot = parent.data();
        let task = async {
            // Same thread as the test body: the parent's entry plus the seeded one.
            assert_eq!(stack_depth(), 2);

            // Handles are kept as temporaries so none lives across the
            // `.await` below.
            EventContext::current()
                .insert("spawned", &json!("value"))
                .unwrap();

            assert_eq!(
                current_json(),
                json!({ "parent": "context", "spawned": "value" })
            );
            tokio::task::yield_now().await;
            current_json()
        }
        .with_event_context(snapshot);

        let seen_by_task = tokio::spawn(task).await.unwrap();
        assert_eq!(
            seen_by_task,
            json!({ "parent": "context", "spawned": "value" })
        );

        assert_eq!(current_json(), json!({ "parent": "context" }));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn with_event_context_spawned_multi_thread() {
        let mut parent = EventContext::current();
        parent.insert("parent", &json!("context")).unwrap();

        let snapshot = parent.data();
        let task = async {
            assert_eq!(stack_depth(), 1);

            // Handles are kept as temporaries so none lives across the
            // `.await` below.
            EventContext::current()
                .insert("spawned", &json!("value"))
                .unwrap();

            assert_eq!(
                current_json(),
                json!({ "parent": "context", "spawned": "value" })
            );
            let data = EventContext::current().data();
            tokio::task::yield_now().with_event_context(data).await;
            current_json()
        }
        .with_event_context(snapshot);

        let seen_by_task = tokio::spawn(task).await.unwrap();
        assert_eq!(
            seen_by_task,
            json!({ "parent": "context", "spawned": "value" })
        );

        assert_eq!(current_json(), json!({ "parent": "context" }));
    }
}