es_entity/context/mod.rs
1//! Thread-local system for adding context data to persisted events.
2//!
3//! This module provides a context propagation system for event sourcing that allows
4//! attaching metadata (like request IDs, user IDs, or audit information) to events
5//! as they are created and persisted to the database.
6//!
7//! # Core Components
8//!
9//! - [`EventContext`]: Thread-local context manager (`!Send`) that maintains a stack
10//! of contexts within a single thread
11//! - [`ContextData`]: Immutable, thread-safe (`Send`) snapshot of context data that
12//! can be passed across thread boundaries
13//! - [`WithEventContext`]: Extension trait for `Future` types to propagate context
14//! across async boundaries
15//!
16//! # Usage Patterns
17//!
18//! ## Same Thread Context
19//! ```rust
20//! use es_entity::context::EventContext;
21//!
22//! let mut ctx = EventContext::current();
23//! ctx.insert("request_id", &"req-123").unwrap();
24//!
25//! // Fork for isolated scope
26//! {
27//! let mut child = EventContext::fork();
28//! child.insert("operation", &"update").unwrap();
29//! // Both request_id and operation are available
30//! }
31//! // Only request_id remains in parent
32//! ```
33//!
34//! ## Async Task Context
35//! ```rust
36//! use es_entity::context::{EventContext, WithEventContext};
37//!
38//! async fn spawn_with_context() {
39//! let mut ctx = EventContext::current();
40//! ctx.insert("user_id", &"user-456").unwrap();
41//!
42//! let data = ctx.data();
43//! tokio::spawn(async move {
44//! // Context is available in spawned task
45//! let ctx = EventContext::current();
46//! // Has user_id from parent
47//! }.with_event_context(data)).await.unwrap();
48//! }
49//! ```
50//!
51//! ## Cross-Thread Context
52//! ```rust
53//! use es_entity::context::EventContext;
54//!
55//! let mut ctx = EventContext::current();
56//! ctx.insert("trace_id", &"trace-789").unwrap();
57//! let data = ctx.data();
58//!
59//! std::thread::spawn(move || {
60//! let ctx = EventContext::seed(data);
61//! // New thread has trace_id
62//! });
63//! ```
64//!
65//! # Database Integration
66//!
67//! When events are persisted using repositories with `event_context = true`, the current
68//! context is automatically serialized to JSON and stored in a `context` column
69//! alongside the event data, enabling comprehensive audit trails and debugging.
70
71mod sqlx;
72#[cfg(feature = "tracing-context")]
73mod tracing;
74mod with_event_context;
75
76use serde::{Deserialize, Serialize};
77
78use std::{borrow::Cow, cell::RefCell, rc::Rc};
79
80#[cfg(feature = "tracing-context")]
81pub use tracing::*;
82pub use with_event_context::*;
83
84/// Immutable context data that can be safely shared across thread boundaries.
85///
86/// This struct holds key-value pairs of context information that gets attached
87/// to events when they are persisted. It uses an immutable HashMap internally
88/// for efficient cloning and thread-safe sharing of data snapshots.
89///
90/// `ContextData` is `Send` and can be passed between threads, unlike [`EventContext`]
91/// which is thread-local. This makes it suitable for transferring context across
92/// async boundaries via the [`WithEventContext`] trait.
93#[derive(Debug, Clone, Serialize, Deserialize)]
94#[serde(transparent)]
95pub struct ContextData(im::HashMap<Cow<'static, str>, serde_json::Value>);
96
97impl ContextData {
98 fn new() -> Self {
99 Self(im::HashMap::new())
100 }
101
102 fn insert(&mut self, key: &'static str, value: serde_json::Value) {
103 self.0 = self.0.update(Cow::Borrowed(key), value);
104 }
105
106 #[cfg(feature = "tracing-context")]
107 pub(crate) fn with_tracing_info(mut self) -> Self {
108 let tracing = TracingContext::current();
109 self.insert(
110 "tracing",
111 serde_json::to_value(&tracing).expect("Could not inject tracing"),
112 );
113 self
114 }
115
116 pub fn lookup<T: serde::de::DeserializeOwned>(
117 &self,
118 key: &'static str,
119 ) -> Result<Option<T>, serde_json::Error> {
120 let Some(val) = self.0.get(key) else {
121 return Ok(None);
122 };
123 serde_json::from_value(val.clone()).map(Some)
124 }
125}
126
/// One entry on the thread-local context stack.
struct StackEntry {
    // Identity token for this entry. `EventContext` handles that refer to the
    // entry hold clones of this same `Rc` and are matched with `Rc::ptr_eq`.
    id: Rc<()>,
    // The context data owned by this entry.
    data: ContextData,
}

// Per-thread stack of context entries. The last element is the entry that
// `EventContext::current()` hands out; `seed()` pushes new entries on top.
thread_local! {
    static CONTEXT_STACK: RefCell<Vec<StackEntry>> = const { RefCell::new(Vec::new()) };
}
135
/// Thread-local event context for tracking metadata throughout event sourcing operations.
///
/// `EventContext` provides a way to attach contextual information (like request IDs, audit info,
/// or operation metadata) to events as they are created and persisted. The context is managed
/// as a thread-local stack, allowing for nested contexts within the same thread.
///
/// An `EventContext` value is a handle to one entry on that stack. Several handles may refer
/// to the same entry; the entry is removed from the stack when the last handle referring to
/// it is dropped.
///
/// # Thread Safety
///
/// This struct is deliberately `!Send` to ensure thread-local safety. It uses `Rc` for reference
/// counting which is not thread-safe. For propagating context across async boundaries or threads,
/// use the [`WithEventContext`] trait which safely transfers context data.
///
/// # Usage Patterns
///
/// - **Same thread**: Use [`fork()`](Self::fork) to create isolated child contexts
/// - **Async tasks**: Use [`with_event_context()`](WithEventContext::with_event_context) from the [`WithEventContext`] trait
/// - **New threads**: Use [`seed()`](Self::seed) with data from [`data()`](Self::data) to transfer context
///
/// # Examples
///
/// ```rust
/// use es_entity::context::EventContext;
///
/// // Create or get current context
/// let mut ctx = EventContext::current();
/// ctx.insert("user_id", &"123").unwrap();
///
/// // Fork for isolated scope
/// {
///     let mut child = EventContext::fork();
///     child.insert("operation", &"update").unwrap();
///     // Both user_id and operation are available here
/// }
/// // Only user_id remains in parent context
/// ```
pub struct EventContext {
    // Shared with the matching `StackEntry`; its strong count tells `Drop`
    // whether this is the last handle to that entry.
    id: Rc<()>,
}
174
175impl Drop for EventContext {
176 fn drop(&mut self) {
177 // If strong_count is 2, it means this EventContext + one StackEntry reference
178 if Rc::strong_count(&self.id) == 2 {
179 CONTEXT_STACK.with(|c| {
180 let mut stack = c.borrow_mut();
181 for i in (0..stack.len()).rev() {
182 if Rc::ptr_eq(&stack[i].id, &self.id) {
183 stack.remove(i);
184 break;
185 }
186 }
187 });
188 }
189 }
190}
191
192impl EventContext {
193 /// Gets the current event context or creates a new one if none exists.
194 ///
195 /// This function is thread-local and will return a handle to the topmost context
196 /// on the current thread's context stack. If no context exists, it will create
197 /// a new empty context and push it onto the stack.
198 ///
199 /// # Examples
200 ///
201 /// ```rust
202 /// use es_entity::context::EventContext;
203 ///
204 /// let ctx = EventContext::current();
205 /// // Context is now available for the current thread
206 /// ```
207 pub fn current() -> Self {
208 CONTEXT_STACK.with(|c| {
209 let mut stack = c.borrow_mut();
210 if let Some(last) = stack.last() {
211 return EventContext {
212 id: last.id.clone(),
213 };
214 }
215
216 let id = Rc::new(());
217 let data = ContextData::new();
218 stack.push(StackEntry {
219 id: id.clone(),
220 data,
221 });
222
223 EventContext { id }
224 })
225 }
226
227 /// Creates a new event context seeded with the provided data.
228 ///
229 /// This creates a completely new context stack entry with the given context data,
230 /// independent of any existing context. This is useful for starting fresh contexts
231 /// in new threads or async tasks.
232 ///
233 /// # Arguments
234 ///
235 /// * `data` - The initial context data for the new context
236 ///
237 /// # Examples
238 ///
239 /// ```rust
240 /// use es_entity::context::{EventContext, ContextData};
241 ///
242 /// let data = EventContext::current().data();
243 /// let new_ctx = EventContext::seed(data);
244 /// // new_ctx now has its own independent context stack
245 /// ```
246 pub fn seed(data: ContextData) -> Self {
247 CONTEXT_STACK.with(|c| {
248 let mut stack = c.borrow_mut();
249 let id = Rc::new(());
250 stack.push(StackEntry {
251 id: id.clone(),
252 data,
253 });
254
255 EventContext { id }
256 })
257 }
258
259 /// Creates a new isolated context that inherits data from the current context.
260 ///
261 /// This method creates a child context that starts with a copy of the current
262 /// context's data. Changes made to the forked context will not affect the parent
263 /// context, and when the forked context is dropped, the parent context remains
264 /// unchanged. This is useful for creating isolated scopes within the same thread.
265 ///
266 /// # Examples
267 ///
268 /// ```rust
269 /// use es_entity::context::EventContext;
270 ///
271 /// let mut parent = EventContext::current();
272 /// parent.insert("shared", &"value").unwrap();
273 ///
274 /// {
275 /// let mut child = EventContext::fork();
276 /// child.insert("child_only", &"data").unwrap();
277 /// // child context has both "shared" and "child_only"
278 /// }
279 /// // parent context only has "shared" - "child_only" is gone
280 /// ```
281 pub fn fork() -> Self {
282 let current = Self::current();
283 let data = current.data();
284 Self::seed(data)
285 }
286
287 /// Inserts a key-value pair into the current context.
288 ///
289 /// The value will be serialized to JSON and stored in the context data.
290 /// This data will be available to all code running within this context
291 /// and any child contexts created via `fork()`.
292 ///
293 /// # Arguments
294 ///
295 /// * `key` - A static string key to identify the value
296 /// * `value` - Any serializable value to store in the context
297 ///
298 /// # Returns
299 ///
300 /// Returns `Ok(())` on success or a `serde_json::Error` if serialization fails.
301 ///
302 /// # Examples
303 ///
304 /// ```rust
305 /// use es_entity::context::EventContext;
306 ///
307 /// let mut ctx = EventContext::current();
308 /// ctx.insert("user_id", &"12345").unwrap();
309 /// ctx.insert("operation", &"transfer").unwrap();
310 /// ```
311 pub fn insert<T: Serialize>(
312 &mut self,
313 key: &'static str,
314 value: &T,
315 ) -> Result<(), serde_json::Error> {
316 let json_value = serde_json::to_value(value)?;
317
318 CONTEXT_STACK.with(|c| {
319 let mut stack = c.borrow_mut();
320 for entry in stack.iter_mut().rev() {
321 if Rc::ptr_eq(&entry.id, &self.id) {
322 entry.data.insert(key, json_value);
323 return;
324 }
325 }
326 panic!("EventContext missing on CONTEXT_STACK")
327 });
328
329 Ok(())
330 }
331
332 /// Returns a copy of the current context data.
333 ///
334 /// This method returns a snapshot of all key-value pairs stored in this context.
335 /// The returned [`ContextData`] can be used to seed new contexts or passed to
336 /// async tasks to maintain context across thread boundaries.
337 ///
338 /// # Examples
339 ///
340 /// ```rust
341 /// use es_entity::context::EventContext;
342 ///
343 /// let mut ctx = EventContext::current();
344 /// ctx.insert("request_id", &"abc123").unwrap();
345 ///
346 /// let data = ctx.data();
347 /// // data now contains a copy of the context with request_id
348 /// ```
349 pub fn data(&self) -> ContextData {
350 CONTEXT_STACK.with(|c| {
351 let stack = c.borrow();
352 for entry in stack.iter().rev() {
353 if Rc::ptr_eq(&entry.id, &self.id) {
354 return entry.data.clone();
355 }
356 }
357 panic!("EventContext missing on CONTEXT_STACK")
358 })
359 }
360
361 #[allow(unused_mut)]
362 pub(crate) fn data_for_storing() -> ContextData {
363 let mut data = Self::current().data();
364 #[cfg(feature = "tracing-context")]
365 {
366 data = data.with_tracing_info();
367 }
368 data
369 }
370}
371
// Unit tests for the thread-local context stack. Many assertions depend on
// exactly when `EventContext` handles are dropped, so statement order and
// scoping inside each test are significant.
#[cfg(test)]
mod tests {
    use super::*;

    // Number of entries currently on this thread's context stack.
    fn stack_depth() -> usize {
        CONTEXT_STACK.with(|c| c.borrow().len())
    }

    // JSON form of the current context's data. The handle obtained here is a
    // temporary and is dropped again before the function returns.
    fn current_json() -> serde_json::Value {
        serde_json::to_value(EventContext::current().data()).unwrap()
    }

    // `current()` pushes at most one entry, nested calls share it, and the
    // entry disappears once the last handle is dropped.
    #[test]
    fn assert_stack_depth() {
        fn assert_inner() {
            let _ctx = EventContext::current();
            assert_eq!(stack_depth(), 1);
        }
        assert_eq!(stack_depth(), 0);
        {
            let _ctx = EventContext::current();
            assert_eq!(stack_depth(), 1);
            assert_inner();
        }
        assert_eq!(stack_depth(), 0);
    }

    // Inserts made through any handle to the same entry are visible through
    // all of them; re-inserting a key overwrites its value.
    #[test]
    fn insert() {
        fn insert_inner(value: &serde_json::Value) {
            let mut ctx = EventContext::current();
            ctx.insert("new_data", &value).unwrap();
            assert_eq!(
                current_json(),
                serde_json::json!({ "data": value, "new_data": value})
            );
        }

        let mut ctx = EventContext::current();
        assert_eq!(current_json(), serde_json::json!({}));
        let value = serde_json::json!({ "hello": "world" });
        ctx.insert("data", &value).unwrap();
        assert_eq!(current_json(), serde_json::json!({ "data": value }));
        insert_inner(&value);
        assert_eq!(
            current_json(),
            serde_json::json!({ "data": value, "new_data": value})
        );
        let new_value = serde_json::json!({ "hello": "new_world" });
        ctx.insert("data", &new_value).unwrap();
        assert_eq!(
            current_json(),
            serde_json::json!({ "data": new_value, "new_data": value})
        );
    }

    // A new OS thread starts with an empty stack; seeding it with a snapshot
    // does not feed changes back to the originating thread.
    #[test]
    fn thread_isolation() {
        let mut ctx = EventContext::current();
        let value = serde_json::json!({ "main": "thread" });
        ctx.insert("data", &value).unwrap();
        assert_eq!(stack_depth(), 1);

        let ctx_data = ctx.data();
        let handle = std::thread::spawn(move || {
            assert_eq!(stack_depth(), 0);
            let mut ctx = EventContext::seed(ctx_data);
            assert_eq!(stack_depth(), 1);
            ctx.insert("thread", &serde_json::json!("local")).unwrap();
            assert_eq!(
                current_json(),
                serde_json::json!({ "data": { "main": "thread" }, "thread": "local" }),
            );
        });

        handle.join().unwrap();
        assert_eq!(current_json(), serde_json::json!({ "data": value }));
    }

    // An awaited inner async fn shares the caller's context entry, so its
    // insert is visible to the caller afterwards.
    // NOTE(review): presumably relies on the whole test running on one thread
    // (the default `#[tokio::test]` runtime) - confirm.
    #[tokio::test]
    async fn async_context() {
        async fn inner_async() {
            let mut ctx = EventContext::current();
            ctx.insert("async_inner", &serde_json::json!("value"))
                .unwrap();
            assert_eq!(
                current_json(),
                serde_json::json!({ "async_data": { "test": "async" }, "async_inner": "value" })
            );
        }

        let mut ctx = EventContext::current();
        assert_eq!(current_json(), serde_json::json!({}));

        let value = serde_json::json!({ "test": "async" });
        ctx.insert("async_data", &value).unwrap();
        assert_eq!(current_json(), serde_json::json!({ "async_data": value }));

        inner_async().await;

        assert_eq!(
            current_json(),
            serde_json::json!({ "async_data": value, "async_inner": "value" })
        );
    }

    // A fork starts from the parent's data, its changes stay local, and
    // dropping it restores the parent as the current context.
    #[test]
    fn fork() {
        let mut ctx = EventContext::current();
        ctx.insert("original", &serde_json::json!("value")).unwrap();
        assert_eq!(stack_depth(), 1);
        assert_eq!(current_json(), serde_json::json!({ "original": "value" }));

        let mut forked = EventContext::fork();
        assert_eq!(stack_depth(), 2);
        assert_eq!(current_json(), serde_json::json!({ "original": "value" }));

        forked.insert("forked", &serde_json::json!("data")).unwrap();
        assert_eq!(
            current_json(),
            serde_json::json!({ "original": "value", "forked": "data" })
        );

        drop(forked);

        assert_eq!(stack_depth(), 1);
        assert_eq!(current_json(), serde_json::json!({ "original": "value" }));
    }

    // A spawned task wrapped in `with_event_context` sees the parent's data,
    // keeps its own inserts across a yield, and does not leak them back.
    // NOTE(review): the expected depth of 2 presumably comes from the task
    // running on the test's own thread, on top of `ctx`'s entry - confirm
    // against the `with_event_context` implementation.
    #[tokio::test]
    async fn with_event_context_spawned() {
        let mut ctx = EventContext::current();
        ctx.insert("parent", &serde_json::json!("context")).unwrap();

        let handle = tokio::spawn(
            async {
                assert_eq!(stack_depth(), 2);

                EventContext::current()
                    .insert("spawned", &serde_json::json!("value"))
                    .unwrap();

                assert_eq!(
                    current_json(),
                    serde_json::json!({ "parent": "context", "spawned": "value" })
                );
                tokio::task::yield_now().await;
                current_json()
            }
            .with_event_context(ctx.data()),
        );

        let result = handle.await.unwrap();
        assert_eq!(
            result,
            serde_json::json!({ "parent": "context", "spawned": "value" })
        );

        assert_eq!(current_json(), serde_json::json!({ "parent": "context" }));
    }

    // Same scenario on the multi-threaded runtime.
    // NOTE(review): the expected depth of 1 presumably means the task runs on
    // a worker thread whose stack holds only the seeded entry - confirm.
    #[tokio::test(flavor = "multi_thread")]
    async fn with_event_context_spawned_multi_thread() {
        let mut ctx = EventContext::current();
        ctx.insert("parent", &serde_json::json!("context")).unwrap();

        let handle = tokio::spawn(
            async {
                assert_eq!(stack_depth(), 1);

                EventContext::current()
                    .insert("spawned", &serde_json::json!("value"))
                    .unwrap();

                assert_eq!(
                    current_json(),
                    serde_json::json!({ "parent": "context", "spawned": "value" })
                );
                let data = EventContext::current().data();
                tokio::task::yield_now().with_event_context(data).await;
                current_json()
            }
            .with_event_context(ctx.data()),
        );

        let result = handle.await.unwrap();
        assert_eq!(
            result,
            serde_json::json!({ "parent": "context", "spawned": "value" })
        );

        assert_eq!(current_json(), serde_json::json!({ "parent": "context" }));
    }
}
565}