es_entity/context/mod.rs
1//! Thread-local system for adding context data to persisted events.
2//!
3//! This module provides a context propagation system for event sourcing that allows
4//! attaching metadata (like request IDs, user IDs, or audit information) to events
5//! as they are created and persisted to the database.
6//!
7//! # Core Components
8//!
9//! - [`EventContext`]: Thread-local context manager (`!Send`) that maintains a stack
10//! of contexts within a single thread
11//! - [`ContextData`]: Immutable, thread-safe (`Send`) snapshot of context data that
12//! can be passed across thread boundaries
13//! - [`WithEventContext`]: Extension trait for `Future` types to propagate context
14//! across async boundaries
15//!
16//! # Usage Patterns
17//!
18//! ## Same Thread Context
19//! ```rust
20//! use es_entity::context::EventContext;
21//!
22//! let mut ctx = EventContext::current();
23//! ctx.insert("request_id", &"req-123").unwrap();
24//!
25//! // Fork for isolated scope
26//! {
27//! let mut child = EventContext::fork();
28//! child.insert("operation", &"update").unwrap();
29//! // Both request_id and operation are available
30//! }
31//! // Only request_id remains in parent
32//! ```
33//!
34//! ## Async Task Context
35//! ```rust
36//! use es_entity::context::{EventContext, WithEventContext};
37//!
38//! async fn spawn_with_context() {
39//! let mut ctx = EventContext::current();
40//! ctx.insert("user_id", &"user-456").unwrap();
41//!
42//! let data = ctx.data();
43//! tokio::spawn(async move {
44//! // Context is available in spawned task
45//! let ctx = EventContext::current();
46//! // Has user_id from parent
47//! }.with_event_context(data)).await.unwrap();
48//! }
49//! ```
50//!
51//! ## Cross-Thread Context
52//! ```rust
53//! use es_entity::context::EventContext;
54//!
55//! let mut ctx = EventContext::current();
56//! ctx.insert("trace_id", &"trace-789").unwrap();
57//! let data = ctx.data();
58//!
59//! std::thread::spawn(move || {
60//! let ctx = EventContext::seed(data);
61//! // New thread has trace_id
62//! });
63//! ```
64//!
65//! # Database Integration
66//!
67//! When events are persisted using repositories with `event_ctx: true`, the current
68//! context is automatically serialized to JSON and stored in a `context` column
69//! alongside the event data, enabling comprehensive audit trails and debugging.
70
71mod with_event_context;
72
73use serde::Serialize;
74
75use std::{cell::RefCell, rc::Rc};
76
77pub use with_event_context::*;
78
79/// Immutable context data that can be safely shared across thread boundaries.
80///
81/// This struct holds key-value pairs of context information that gets attached
82/// to events when they are persisted. It uses an immutable HashMap internally
83/// for efficient cloning and thread-safe sharing of data snapshots.
84///
85/// `ContextData` is `Send` and can be passed between threads, unlike [`EventContext`]
86/// which is thread-local. This makes it suitable for transferring context across
87/// async boundaries via the [`WithEventContext`] trait.
88#[derive(Debug, Clone, Serialize)]
89#[serde(transparent)]
90pub struct ContextData(im::HashMap<&'static str, serde_json::Value>);
91
92impl ContextData {
93 fn new() -> Self {
94 Self(im::HashMap::new())
95 }
96
97 fn insert(&mut self, key: &'static str, value: serde_json::Value) {
98 self.0 = self.0.update(key, value);
99 }
100}
101
102struct StackEntry {
103 id: Rc<()>,
104 data: ContextData,
105}
106
107thread_local! {
108 static CONTEXT_STACK: RefCell<Vec<StackEntry>> = const { RefCell::new(Vec::new()) };
109}
110
111/// Thread-local event context for tracking metadata throughout event sourcing operations.
112///
113/// `EventContext` provides a way to attach contextual information (like request IDs, audit info,
114/// or operation metadata) to events as they are created and persisted. The context is managed
115/// as a thread-local stack, allowing for nested contexts within the same thread.
116///
117/// # Thread Safety
118///
119/// This struct is deliberately `!Send` to ensure thread-local safety. It uses `Rc` for reference
120/// counting which is not thread-safe. For propagating context across async boundaries or threads,
121/// use the [`WithEventContext`] trait which safely transfers context data.
122///
123/// # Usage Patterns
124///
125/// - **Same thread**: Use [`fork()`](Self::fork) to create isolated child contexts
126/// - **Async tasks**: Use [`with_event_context()`](WithEventContext::with_event_context) from the [`WithEventContext`] trait
127/// - **New threads**: Use [`seed()`](Self::seed) with data from [`data()`](Self::data) to transfer context
128///
129/// # Examples
130///
131/// ```rust
132/// use es_entity::context::EventContext;
133///
134/// // Create or get current context
135/// let mut ctx = EventContext::current();
136/// ctx.insert("user_id", &"123").unwrap();
137///
138/// // Fork for isolated scope
139/// {
140/// let mut child = EventContext::fork();
141/// child.insert("operation", &"update").unwrap();
142/// // Both user_id and operation are available here
143/// }
144/// // Only user_id remains in parent context
145/// ```
146pub struct EventContext {
147 id: Rc<()>,
148}
149
150impl Drop for EventContext {
151 fn drop(&mut self) {
152 // If strong_count is 2, it means this EventContext + one StackEntry reference
153 if Rc::strong_count(&self.id) == 2 {
154 CONTEXT_STACK.with(|c| {
155 let mut stack = c.borrow_mut();
156 for i in (0..stack.len()).rev() {
157 if Rc::ptr_eq(&stack[i].id, &self.id) {
158 stack.remove(i);
159 break;
160 }
161 }
162 });
163 }
164 }
165}
166
167impl EventContext {
168 /// Gets the current event context or creates a new one if none exists.
169 ///
170 /// This function is thread-local and will return a handle to the topmost context
171 /// on the current thread's context stack. If no context exists, it will create
172 /// a new empty context and push it onto the stack.
173 ///
174 /// # Examples
175 ///
176 /// ```rust
177 /// use es_entity::context::EventContext;
178 ///
179 /// let ctx = EventContext::current();
180 /// // Context is now available for the current thread
181 /// ```
182 pub fn current() -> Self {
183 CONTEXT_STACK.with(|c| {
184 let mut stack = c.borrow_mut();
185 if let Some(last) = stack.last() {
186 return EventContext {
187 id: last.id.clone(),
188 };
189 }
190
191 let id = Rc::new(());
192 let data = ContextData::new();
193 stack.push(StackEntry {
194 id: id.clone(),
195 data,
196 });
197
198 EventContext { id }
199 })
200 }
201
202 /// Creates a new event context seeded with the provided data.
203 ///
204 /// This creates a completely new context stack entry with the given context data,
205 /// independent of any existing context. This is useful for starting fresh contexts
206 /// in new threads or async tasks.
207 ///
208 /// # Arguments
209 ///
210 /// * `data` - The initial context data for the new context
211 ///
212 /// # Examples
213 ///
214 /// ```rust
215 /// use es_entity::context::{EventContext, ContextData};
216 ///
217 /// let data = EventContext::current().data();
218 /// let new_ctx = EventContext::seed(data);
219 /// // new_ctx now has its own independent context stack
220 /// ```
221 pub fn seed(data: ContextData) -> Self {
222 CONTEXT_STACK.with(|c| {
223 let mut stack = c.borrow_mut();
224 let id = Rc::new(());
225 stack.push(StackEntry {
226 id: id.clone(),
227 data,
228 });
229
230 EventContext { id }
231 })
232 }
233
234 /// Creates a new isolated context that inherits data from the current context.
235 ///
236 /// This method creates a child context that starts with a copy of the current
237 /// context's data. Changes made to the forked context will not affect the parent
238 /// context, and when the forked context is dropped, the parent context remains
239 /// unchanged. This is useful for creating isolated scopes within the same thread.
240 ///
241 /// # Examples
242 ///
243 /// ```rust
244 /// use es_entity::context::EventContext;
245 ///
246 /// let mut parent = EventContext::current();
247 /// parent.insert("shared", &"value").unwrap();
248 ///
249 /// {
250 /// let mut child = EventContext::fork();
251 /// child.insert("child_only", &"data").unwrap();
252 /// // child context has both "shared" and "child_only"
253 /// }
254 /// // parent context only has "shared" - "child_only" is gone
255 /// ```
256 pub fn fork() -> Self {
257 let current = Self::current();
258 let data = current.data();
259 Self::seed(data)
260 }
261
262 /// Inserts a key-value pair into the current context.
263 ///
264 /// The value will be serialized to JSON and stored in the context data.
265 /// This data will be available to all code running within this context
266 /// and any child contexts created via `fork()`.
267 ///
268 /// # Arguments
269 ///
270 /// * `key` - A static string key to identify the value
271 /// * `value` - Any serializable value to store in the context
272 ///
273 /// # Returns
274 ///
275 /// Returns `Ok(())` on success or a `serde_json::Error` if serialization fails.
276 ///
277 /// # Examples
278 ///
279 /// ```rust
280 /// use es_entity::context::EventContext;
281 ///
282 /// let mut ctx = EventContext::current();
283 /// ctx.insert("user_id", &"12345").unwrap();
284 /// ctx.insert("operation", &"transfer").unwrap();
285 /// ```
286 pub fn insert<T: Serialize>(
287 &mut self,
288 key: &'static str,
289 value: &T,
290 ) -> Result<(), serde_json::Error> {
291 let json_value = serde_json::to_value(value)?;
292
293 CONTEXT_STACK.with(|c| {
294 let mut stack = c.borrow_mut();
295 for entry in stack.iter_mut().rev() {
296 if Rc::ptr_eq(&entry.id, &self.id) {
297 entry.data.insert(key, json_value);
298 return;
299 }
300 }
301 panic!("EventContext missing on CONTEXT_STACK")
302 });
303
304 Ok(())
305 }
306
307 /// Returns a copy of the current context data.
308 ///
309 /// This method returns a snapshot of all key-value pairs stored in this context.
310 /// The returned [`ContextData`] can be used to seed new contexts or passed to
311 /// async tasks to maintain context across thread boundaries.
312 ///
313 /// # Examples
314 ///
315 /// ```rust
316 /// use es_entity::context::EventContext;
317 ///
318 /// let mut ctx = EventContext::current();
319 /// ctx.insert("request_id", &"abc123").unwrap();
320 ///
321 /// let data = ctx.data();
322 /// // data now contains a copy of the context with request_id
323 /// ```
324 pub fn data(&self) -> ContextData {
325 CONTEXT_STACK.with(|c| {
326 let stack = c.borrow();
327 for entry in stack.iter().rev() {
328 if Rc::ptr_eq(&entry.id, &self.id) {
329 return entry.data.clone();
330 }
331 }
332 panic!("EventContext missing on CONTEXT_STACK")
333 })
334 }
335
336 /// Serializes the current context data to JSON.
337 ///
338 /// This method is primarily used internally by the event persistence system
339 /// to store context data alongside events in the database. It converts all
340 /// context key-value pairs into a single JSON object.
341 ///
342 /// # Returns
343 ///
344 /// Returns a `serde_json::Value` containing all context data, or an error
345 /// if serialization fails.
346 ///
347 /// # Examples
348 ///
349 /// ```rust
350 /// use es_entity::context::EventContext;
351 ///
352 /// let mut ctx = EventContext::current();
353 /// ctx.insert("user_id", &"12345").unwrap();
354 ///
355 /// let json = ctx.as_json().unwrap();
356 /// // json is now: {"user_id": "12345"}
357 /// ```
358 pub fn as_json(&self) -> Result<serde_json::Value, serde_json::Error> {
359 let data = self.data();
360 serde_json::to_value(&data)
361 }
362}
363
364#[cfg(test)]
365mod tests {
366 use super::*;
367
368 fn stack_depth() -> usize {
369 CONTEXT_STACK.with(|c| c.borrow().len())
370 }
371
372 fn current_json() -> serde_json::Value {
373 EventContext::current().as_json().unwrap()
374 }
375
376 #[test]
377 fn assert_stack_depth() {
378 fn assert_inner() {
379 let _ctx = EventContext::current();
380 assert_eq!(stack_depth(), 1);
381 }
382 assert_eq!(stack_depth(), 0);
383 {
384 let _ctx = EventContext::current();
385 assert_eq!(stack_depth(), 1);
386 assert_inner();
387 }
388 assert_eq!(stack_depth(), 0);
389 }
390
391 #[test]
392 fn insert() {
393 fn insert_inner(value: &serde_json::Value) {
394 let mut ctx = EventContext::current();
395 ctx.insert("new_data", &value).unwrap();
396 assert_eq!(
397 current_json(),
398 serde_json::json!({ "data": value, "new_data": value})
399 );
400 }
401
402 let mut ctx = EventContext::current();
403 assert_eq!(current_json(), serde_json::json!({}));
404 let value = serde_json::json!({ "hello": "world" });
405 ctx.insert("data", &value).unwrap();
406 assert_eq!(current_json(), serde_json::json!({ "data": value }));
407 insert_inner(&value);
408 assert_eq!(
409 current_json(),
410 serde_json::json!({ "data": value, "new_data": value})
411 );
412 let new_value = serde_json::json!({ "hello": "new_world" });
413 ctx.insert("data", &new_value).unwrap();
414 assert_eq!(
415 current_json(),
416 serde_json::json!({ "data": new_value, "new_data": value})
417 );
418 }
419
420 #[test]
421 fn thread_isolation() {
422 let mut ctx = EventContext::current();
423 let value = serde_json::json!({ "main": "thread" });
424 ctx.insert("data", &value).unwrap();
425 assert_eq!(stack_depth(), 1);
426
427 let ctx_data = ctx.data();
428 let handle = std::thread::spawn(move || {
429 assert_eq!(stack_depth(), 0);
430 let mut ctx = EventContext::seed(ctx_data);
431 assert_eq!(stack_depth(), 1);
432 ctx.insert("thread", &serde_json::json!("local")).unwrap();
433 assert_eq!(
434 current_json(),
435 serde_json::json!({ "data": { "main": "thread" }, "thread": "local" }),
436 );
437 });
438
439 handle.join().unwrap();
440 assert_eq!(current_json(), serde_json::json!({ "data": value }));
441 }
442
443 #[tokio::test]
444 async fn async_context() {
445 async fn inner_async() {
446 let mut ctx = EventContext::current();
447 ctx.insert("async_inner", &serde_json::json!("value"))
448 .unwrap();
449 assert_eq!(
450 current_json(),
451 serde_json::json!({ "async_data": { "test": "async" }, "async_inner": "value" })
452 );
453 }
454
455 let mut ctx = EventContext::current();
456 assert_eq!(current_json(), serde_json::json!({}));
457
458 let value = serde_json::json!({ "test": "async" });
459 ctx.insert("async_data", &value).unwrap();
460 assert_eq!(current_json(), serde_json::json!({ "async_data": value }));
461
462 inner_async().await;
463
464 assert_eq!(
465 current_json(),
466 serde_json::json!({ "async_data": value, "async_inner": "value" })
467 );
468 }
469
470 #[test]
471 fn fork() {
472 let mut ctx = EventContext::current();
473 ctx.insert("original", &serde_json::json!("value")).unwrap();
474 assert_eq!(stack_depth(), 1);
475 assert_eq!(current_json(), serde_json::json!({ "original": "value" }));
476
477 let mut forked = EventContext::fork();
478 assert_eq!(stack_depth(), 2);
479 assert_eq!(current_json(), serde_json::json!({ "original": "value" }));
480
481 forked.insert("forked", &serde_json::json!("data")).unwrap();
482 assert_eq!(
483 current_json(),
484 serde_json::json!({ "original": "value", "forked": "data" })
485 );
486
487 drop(forked);
488
489 assert_eq!(stack_depth(), 1);
490 assert_eq!(current_json(), serde_json::json!({ "original": "value" }));
491 }
492
493 #[tokio::test]
494 async fn with_event_context_spawned() {
495 let mut ctx = EventContext::current();
496 ctx.insert("parent", &serde_json::json!("context")).unwrap();
497
498 let handle = tokio::spawn(
499 async {
500 assert_eq!(stack_depth(), 2);
501
502 EventContext::current()
503 .insert("spawned", &serde_json::json!("value"))
504 .unwrap();
505
506 assert_eq!(
507 current_json(),
508 serde_json::json!({ "parent": "context", "spawned": "value" })
509 );
510 tokio::task::yield_now().await;
511 current_json()
512 }
513 .with_event_context(ctx.data()),
514 );
515
516 let result = handle.await.unwrap();
517 assert_eq!(
518 result,
519 serde_json::json!({ "parent": "context", "spawned": "value" })
520 );
521
522 assert_eq!(current_json(), serde_json::json!({ "parent": "context" }));
523 }
524
525 #[tokio::test(flavor = "multi_thread")]
526 async fn with_event_context_spawned_multi_thread() {
527 let mut ctx = EventContext::current();
528 ctx.insert("parent", &serde_json::json!("context")).unwrap();
529
530 let handle = tokio::spawn(
531 async {
532 assert_eq!(stack_depth(), 1);
533
534 EventContext::current()
535 .insert("spawned", &serde_json::json!("value"))
536 .unwrap();
537
538 assert_eq!(
539 current_json(),
540 serde_json::json!({ "parent": "context", "spawned": "value" })
541 );
542 let data = EventContext::current().data();
543 tokio::task::yield_now().with_event_context(data).await;
544 current_json()
545 }
546 .with_event_context(ctx.data()),
547 );
548
549 let result = handle.await.unwrap();
550 assert_eq!(
551 result,
552 serde_json::json!({ "parent": "context", "spawned": "value" })
553 );
554
555 assert_eq!(current_json(), serde_json::json!({ "parent": "context" }));
556 }
557}