es_entity/context/mod.rs
1//! Thread-local system for adding context data to persisted events.
2//!
3//! This module provides a context propagation system for event sourcing that allows
4//! attaching metadata (like request IDs, user IDs, or audit information) to events
5//! as they are created and persisted to the database.
6//!
7//! # Core Components
8//!
9//! - [`EventContext`]: Thread-local context manager (`!Send`) that maintains a stack
10//! of contexts within a single thread
11//! - [`ContextData`]: Immutable, thread-safe (`Send`) snapshot of context data that
12//! can be passed across thread boundaries
13//! - [`WithEventContext`]: Extension trait for `Future` types to propagate context
14//! across async boundaries
15//!
16//! # Usage Patterns
17//!
18//! ## Same Thread Context
19//! ```rust
20//! use es_entity::context::EventContext;
21//!
22//! let mut ctx = EventContext::current();
23//! ctx.insert("request_id", &"req-123").unwrap();
24//!
25//! // Fork for isolated scope
26//! {
27//! let mut child = EventContext::fork();
28//! child.insert("operation", &"update").unwrap();
29//! // Both request_id and operation are available
30//! }
31//! // Only request_id remains in parent
32//! ```
33//!
34//! ## Async Task Context
35//! ```rust
36//! use es_entity::context::{EventContext, WithEventContext};
37//!
38//! async fn spawn_with_context() {
39//! let mut ctx = EventContext::current();
40//! ctx.insert("user_id", &"user-456").unwrap();
41//!
42//! let data = ctx.data();
43//! tokio::spawn(async move {
44//! // Context is available in spawned task
45//! let ctx = EventContext::current();
46//! // Has user_id from parent
47//! }.with_event_context(data)).await.unwrap();
48//! }
49//! ```
50//!
51//! ## Cross-Thread Context
52//! ```rust
53//! use es_entity::context::EventContext;
54//!
55//! let mut ctx = EventContext::current();
56//! ctx.insert("trace_id", &"trace-789").unwrap();
57//! let data = ctx.data();
58//!
59//! std::thread::spawn(move || {
60//! let ctx = EventContext::seed(data);
61//! // New thread has trace_id
62//! });
63//! ```
64//!
65//! # Database Integration
66//!
67//! When events are persisted using repositories with `event_context = true`, the current
68//! context is automatically serialized to JSON and stored in a `context` column
69//! alongside the event data, enabling comprehensive audit trails and debugging.
70
71mod sqlx;
72mod tracing;
73mod with_event_context;
74
75use serde::{Deserialize, Serialize};
76
77use std::{borrow::Cow, cell::RefCell, rc::Rc};
78
79pub use tracing::*;
80pub use with_event_context::*;
81
82/// Immutable context data that can be safely shared across thread boundaries.
83///
84/// This struct holds key-value pairs of context information that gets attached
85/// to events when they are persisted. It uses an immutable HashMap internally
86/// for efficient cloning and thread-safe sharing of data snapshots.
87///
88/// `ContextData` is `Send` and can be passed between threads, unlike [`EventContext`]
89/// which is thread-local. This makes it suitable for transferring context across
90/// async boundaries via the [`WithEventContext`] trait.
91#[derive(Debug, Clone, Serialize, Deserialize)]
92#[serde(transparent)]
93pub struct ContextData(im::HashMap<Cow<'static, str>, serde_json::Value>);
94
95impl ContextData {
96 fn new() -> Self {
97 Self(im::HashMap::new())
98 }
99
100 fn insert(&mut self, key: &'static str, value: serde_json::Value) {
101 self.0 = self.0.update(Cow::Borrowed(key), value);
102 }
103
104 #[cfg(feature = "tracing-context")]
105 pub(crate) fn with_tracing_info(mut self) -> Self {
106 let tracing = TracingContext::current();
107 self.insert(
108 "tracing",
109 serde_json::to_value(&tracing).expect("Could not inject tracing"),
110 );
111 self
112 }
113
114 pub fn lookup<T: serde::de::DeserializeOwned>(
115 &self,
116 key: &'static str,
117 ) -> Result<Option<T>, serde_json::Error> {
118 let Some(val) = self.0.get(key) else {
119 return Ok(None);
120 };
121 serde_json::from_value(val.clone()).map(Some)
122 }
123}
124
125struct StackEntry {
126 id: Rc<()>,
127 data: ContextData,
128}
129
130thread_local! {
131 static CONTEXT_STACK: RefCell<Vec<StackEntry>> = const { RefCell::new(Vec::new()) };
132}
133
/// Handle to one entry of the current thread's event-context stack.
///
/// The context carries metadata (request IDs, audit information, operation details,
/// ...) that is attached to events as they are created and persisted. Contexts are
/// kept on a thread-local stack, which allows nested scopes within one thread.
///
/// A handle only stores an identity token; the data itself lives on the stack.
/// Several handles may refer to the same entry, and the entry is taken off the
/// stack when the last handle referring to it is dropped.
///
/// # Thread Safety
///
/// The identity token is an `Rc`, so this type is intentionally `!Send`. To carry
/// context into another thread or across an async boundary, take a snapshot with
/// [`data()`](Self::data) and transfer that instead.
///
/// # Usage Patterns
///
/// - **Same thread**: [`fork()`](Self::fork) opens an isolated child context
/// - **Async tasks**: [`with_event_context()`](WithEventContext::with_event_context)
///   from the [`WithEventContext`] trait
/// - **New threads**: [`seed()`](Self::seed) with a snapshot obtained from
///   [`data()`](Self::data)
///
/// # Examples
///
/// ```rust
/// use es_entity::context::EventContext;
///
/// // Get the current context, creating it if there is none yet
/// let mut ctx = EventContext::current();
/// ctx.insert("user_id", &"123").unwrap();
///
/// // Open an isolated scope
/// {
///     let mut child = EventContext::fork();
///     child.insert("operation", &"update").unwrap();
///     // user_id and operation are both visible here
/// }
/// // The parent context is back to user_id only
/// ```
pub struct EventContext {
    id: Rc<()>,
}
172
173impl Drop for EventContext {
174 fn drop(&mut self) {
175 // If strong_count is 2, it means this EventContext + one StackEntry reference
176 if Rc::strong_count(&self.id) == 2 {
177 CONTEXT_STACK.with(|c| {
178 let mut stack = c.borrow_mut();
179 for i in (0..stack.len()).rev() {
180 if Rc::ptr_eq(&stack[i].id, &self.id) {
181 stack.remove(i);
182 break;
183 }
184 }
185 });
186 }
187 }
188}
189
190impl EventContext {
191 /// Gets the current event context or creates a new one if none exists.
192 ///
193 /// This function is thread-local and will return a handle to the topmost context
194 /// on the current thread's context stack. If no context exists, it will create
195 /// a new empty context and push it onto the stack.
196 ///
197 /// # Examples
198 ///
199 /// ```rust
200 /// use es_entity::context::EventContext;
201 ///
202 /// let ctx = EventContext::current();
203 /// // Context is now available for the current thread
204 /// ```
205 pub fn current() -> Self {
206 CONTEXT_STACK.with(|c| {
207 let mut stack = c.borrow_mut();
208 if let Some(last) = stack.last() {
209 return EventContext {
210 id: last.id.clone(),
211 };
212 }
213
214 let id = Rc::new(());
215 let data = ContextData::new();
216 stack.push(StackEntry {
217 id: id.clone(),
218 data,
219 });
220
221 EventContext { id }
222 })
223 }
224
225 /// Creates a new event context seeded with the provided data.
226 ///
227 /// This creates a completely new context stack entry with the given context data,
228 /// independent of any existing context. This is useful for starting fresh contexts
229 /// in new threads or async tasks.
230 ///
231 /// # Arguments
232 ///
233 /// * `data` - The initial context data for the new context
234 ///
235 /// # Examples
236 ///
237 /// ```rust
238 /// use es_entity::context::{EventContext, ContextData};
239 ///
240 /// let data = EventContext::current().data();
241 /// let new_ctx = EventContext::seed(data);
242 /// // new_ctx now has its own independent context stack
243 /// ```
244 pub fn seed(data: ContextData) -> Self {
245 CONTEXT_STACK.with(|c| {
246 let mut stack = c.borrow_mut();
247 let id = Rc::new(());
248 stack.push(StackEntry {
249 id: id.clone(),
250 data,
251 });
252
253 EventContext { id }
254 })
255 }
256
257 /// Creates a new isolated context that inherits data from the current context.
258 ///
259 /// This method creates a child context that starts with a copy of the current
260 /// context's data. Changes made to the forked context will not affect the parent
261 /// context, and when the forked context is dropped, the parent context remains
262 /// unchanged. This is useful for creating isolated scopes within the same thread.
263 ///
264 /// # Examples
265 ///
266 /// ```rust
267 /// use es_entity::context::EventContext;
268 ///
269 /// let mut parent = EventContext::current();
270 /// parent.insert("shared", &"value").unwrap();
271 ///
272 /// {
273 /// let mut child = EventContext::fork();
274 /// child.insert("child_only", &"data").unwrap();
275 /// // child context has both "shared" and "child_only"
276 /// }
277 /// // parent context only has "shared" - "child_only" is gone
278 /// ```
279 pub fn fork() -> Self {
280 let current = Self::current();
281 let data = current.data();
282 Self::seed(data)
283 }
284
285 /// Inserts a key-value pair into the current context.
286 ///
287 /// The value will be serialized to JSON and stored in the context data.
288 /// This data will be available to all code running within this context
289 /// and any child contexts created via `fork()`.
290 ///
291 /// # Arguments
292 ///
293 /// * `key` - A static string key to identify the value
294 /// * `value` - Any serializable value to store in the context
295 ///
296 /// # Returns
297 ///
298 /// Returns `Ok(())` on success or a `serde_json::Error` if serialization fails.
299 ///
300 /// # Examples
301 ///
302 /// ```rust
303 /// use es_entity::context::EventContext;
304 ///
305 /// let mut ctx = EventContext::current();
306 /// ctx.insert("user_id", &"12345").unwrap();
307 /// ctx.insert("operation", &"transfer").unwrap();
308 /// ```
309 pub fn insert<T: Serialize>(
310 &mut self,
311 key: &'static str,
312 value: &T,
313 ) -> Result<(), serde_json::Error> {
314 let json_value = serde_json::to_value(value)?;
315
316 CONTEXT_STACK.with(|c| {
317 let mut stack = c.borrow_mut();
318 for entry in stack.iter_mut().rev() {
319 if Rc::ptr_eq(&entry.id, &self.id) {
320 entry.data.insert(key, json_value);
321 return;
322 }
323 }
324 panic!("EventContext missing on CONTEXT_STACK")
325 });
326
327 Ok(())
328 }
329
330 /// Returns a copy of the current context data.
331 ///
332 /// This method returns a snapshot of all key-value pairs stored in this context.
333 /// The returned [`ContextData`] can be used to seed new contexts or passed to
334 /// async tasks to maintain context across thread boundaries.
335 ///
336 /// # Examples
337 ///
338 /// ```rust
339 /// use es_entity::context::EventContext;
340 ///
341 /// let mut ctx = EventContext::current();
342 /// ctx.insert("request_id", &"abc123").unwrap();
343 ///
344 /// let data = ctx.data();
345 /// // data now contains a copy of the context with request_id
346 /// ```
347 pub fn data(&self) -> ContextData {
348 CONTEXT_STACK.with(|c| {
349 let stack = c.borrow();
350 for entry in stack.iter().rev() {
351 if Rc::ptr_eq(&entry.id, &self.id) {
352 return entry.data.clone();
353 }
354 }
355 panic!("EventContext missing on CONTEXT_STACK")
356 })
357 }
358
359 #[allow(unused_mut)]
360 pub(crate) fn data_for_storing() -> ContextData {
361 let mut data = Self::current().data();
362 #[cfg(feature = "tracing-context")]
363 {
364 data = data.with_tracing_info();
365 }
366 data
367 }
368}
369
#[cfg(test)]
mod tests {
    use super::*;

    /// Number of entries currently on this thread's context stack.
    fn stack_depth() -> usize {
        CONTEXT_STACK.with(|cell| cell.borrow().len())
    }

    /// JSON rendering of the data visible through the active context.
    fn current_json() -> serde_json::Value {
        let snapshot = EventContext::current().data();
        serde_json::to_value(snapshot).unwrap()
    }

    /// Handles to the same entry share it; the entry goes away with the last handle.
    #[test]
    fn assert_stack_depth() {
        fn assert_inner() {
            let _nested = EventContext::current();
            assert_eq!(stack_depth(), 1);
        }

        assert_eq!(stack_depth(), 0);
        {
            let _outer = EventContext::current();
            assert_eq!(stack_depth(), 1);
            assert_inner();
        }
        assert_eq!(stack_depth(), 0);
    }

    /// Inserts through any handle to an entry are visible through all of them.
    #[test]
    fn insert() {
        fn insert_inner(value: &serde_json::Value) {
            let mut nested = EventContext::current();
            nested.insert("new_data", &value).unwrap();
            let expected = serde_json::json!({ "data": value, "new_data": value});
            assert_eq!(current_json(), expected);
        }

        let mut root = EventContext::current();
        assert_eq!(current_json(), serde_json::json!({}));

        let value = serde_json::json!({ "hello": "world" });
        root.insert("data", &value).unwrap();
        assert_eq!(current_json(), serde_json::json!({ "data": value }));

        insert_inner(&value);
        let after_inner = serde_json::json!({ "data": value, "new_data": value});
        assert_eq!(current_json(), after_inner);

        // Writing an existing key replaces its value.
        let new_value = serde_json::json!({ "hello": "new_world" });
        root.insert("data", &new_value).unwrap();
        let after_overwrite = serde_json::json!({ "data": new_value, "new_data": value});
        assert_eq!(current_json(), after_overwrite);
    }

    /// A seeded context in another thread does not write back to the origin.
    #[test]
    fn thread_isolation() {
        let mut root = EventContext::current();
        let value = serde_json::json!({ "main": "thread" });
        root.insert("data", &value).unwrap();
        assert_eq!(stack_depth(), 1);

        let snapshot = root.data();
        let worker = std::thread::spawn(move || {
            // The new thread starts with an empty stack of its own.
            assert_eq!(stack_depth(), 0);
            let mut seeded = EventContext::seed(snapshot);
            assert_eq!(stack_depth(), 1);
            seeded.insert("thread", &serde_json::json!("local")).unwrap();
            assert_eq!(
                current_json(),
                serde_json::json!({ "data": { "main": "thread" }, "thread": "local" }),
            );
        });

        worker.join().unwrap();
        assert_eq!(current_json(), serde_json::json!({ "data": value }));
    }

    /// Awaiting a future on the same task keeps using the same context.
    #[tokio::test]
    async fn async_context() {
        async fn inner_async() {
            let mut nested = EventContext::current();
            nested
                .insert("async_inner", &serde_json::json!("value"))
                .unwrap();
            let expected =
                serde_json::json!({ "async_data": { "test": "async" }, "async_inner": "value" });
            assert_eq!(current_json(), expected);
        }

        let mut root = EventContext::current();
        assert_eq!(current_json(), serde_json::json!({}));

        let value = serde_json::json!({ "test": "async" });
        root.insert("async_data", &value).unwrap();
        assert_eq!(current_json(), serde_json::json!({ "async_data": value }));

        inner_async().await;

        let expected = serde_json::json!({ "async_data": value, "async_inner": "value" });
        assert_eq!(current_json(), expected);
    }

    /// A fork inherits the parent's data and disappears without a trace when dropped.
    #[test]
    fn fork() {
        let mut parent = EventContext::current();
        parent
            .insert("original", &serde_json::json!("value"))
            .unwrap();
        assert_eq!(stack_depth(), 1);
        assert_eq!(current_json(), serde_json::json!({ "original": "value" }));

        let mut child = EventContext::fork();
        assert_eq!(stack_depth(), 2);
        assert_eq!(current_json(), serde_json::json!({ "original": "value" }));

        child.insert("forked", &serde_json::json!("data")).unwrap();
        let with_fork = serde_json::json!({ "original": "value", "forked": "data" });
        assert_eq!(current_json(), with_fork);

        drop(child);

        assert_eq!(stack_depth(), 1);
        assert_eq!(current_json(), serde_json::json!({ "original": "value" }));
    }

    /// On the single-threaded runtime the spawned task runs on the test's thread,
    /// so its seeded context sits on top of the parent's entry.
    #[tokio::test]
    async fn with_event_context_spawned() {
        let mut parent = EventContext::current();
        parent
            .insert("parent", &serde_json::json!("context"))
            .unwrap();

        let task = tokio::spawn(
            async {
                assert_eq!(stack_depth(), 2);

                EventContext::current()
                    .insert("spawned", &serde_json::json!("value"))
                    .unwrap();

                let expected = serde_json::json!({ "parent": "context", "spawned": "value" });
                assert_eq!(current_json(), expected);
                tokio::task::yield_now().await;
                current_json()
            }
            .with_event_context(parent.data()),
        );

        let seen_by_task = task.await.unwrap();
        let expected = serde_json::json!({ "parent": "context", "spawned": "value" });
        assert_eq!(seen_by_task, expected);

        // Nothing the task inserted leaks back into the parent.
        assert_eq!(current_json(), serde_json::json!({ "parent": "context" }));
    }

    /// On the multi-threaded runtime the spawned task sees only its own seeded entry.
    #[tokio::test(flavor = "multi_thread")]
    async fn with_event_context_spawned_multi_thread() {
        let mut parent = EventContext::current();
        parent
            .insert("parent", &serde_json::json!("context"))
            .unwrap();

        let task = tokio::spawn(
            async {
                assert_eq!(stack_depth(), 1);

                EventContext::current()
                    .insert("spawned", &serde_json::json!("value"))
                    .unwrap();

                let expected = serde_json::json!({ "parent": "context", "spawned": "value" });
                assert_eq!(current_json(), expected);
                let nested = EventContext::current().data();
                tokio::task::yield_now().with_event_context(nested).await;
                current_json()
            }
            .with_event_context(parent.data()),
        );

        let seen_by_task = task.await.unwrap();
        let expected = serde_json::json!({ "parent": "context", "spawned": "value" });
        assert_eq!(seen_by_task, expected);

        // Nothing the task inserted leaks back into the parent.
        assert_eq!(current_json(), serde_json::json!({ "parent": "context" }));
    }
}