mod sqlx;
mod tracing;
mod with_event_context;
use serde::{Deserialize, Serialize};
use std::{borrow::Cow, cell::RefCell, rc::Rc};
pub use tracing::*;
pub use with_event_context::*;
/// Key/value data attached to an [`EventContext`]: static-or-owned string keys
/// mapped to arbitrary JSON values.
///
/// `#[serde(transparent)]` makes the type (de)serialize exactly like the inner
/// map, without a wrapper layer.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct ContextData(im::HashMap<Cow<'static, str>, serde_json::Value>);
impl ContextData {
    /// Creates an empty context map.
    fn new() -> Self {
        Self(im::HashMap::new())
    }

    /// Stores `value` under `key`, replacing any value previously stored there.
    fn insert(&mut self, key: &'static str, value: serde_json::Value) {
        // Insert in place instead of building a second map with `update` and
        // reassigning it. The replaced value (if any) is returned and dropped.
        self.0.insert(Cow::Borrowed(key), value);
    }

    /// Returns `self` with the current `TracingContext` serialized under the
    /// `"tracing"` key.
    ///
    /// # Panics
    ///
    /// Panics if the tracing context cannot be serialized to JSON.
    #[cfg(feature = "tracing-context")]
    pub(crate) fn with_tracing_info(mut self) -> Self {
        let tracing = TracingContext::current();
        self.insert(
            "tracing",
            serde_json::to_value(&tracing).expect("Could not inject tracing"),
        );
        self
    }

    /// Looks up `key` and deserializes the stored JSON value into `T`.
    ///
    /// Returns `Ok(None)` when the key is absent.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json::Error` produced when the stored value cannot be
    /// deserialized into `T`.
    pub fn lookup<T: serde::de::DeserializeOwned>(
        &self,
        // The key is only borrowed for the lookup, so it does not need to be
        // `'static`; any `&'static str` is still accepted.
        key: &str,
    ) -> Result<Option<T>, serde_json::Error> {
        self.0
            .get(key)
            // `from_value` consumes its argument, hence the clone of the stored value.
            .map(|val| serde_json::from_value(val.clone()))
            .transpose()
    }
}
/// One entry of the thread-local `CONTEXT_STACK`.
struct StackEntry {
// Identity token for this entry. `EventContext` handles hold clones of this
// `Rc`; entries are matched to handles with `Rc::ptr_eq`.
id: Rc<()>,
// The context data stored for this entry.
data: ContextData,
}
thread_local! {
// Per-thread stack of context entries. The last element is the entry that
// `EventContext::current` hands out; `seed` pushes onto it and dropping the
// last handle of an entry removes that entry.
static CONTEXT_STACK: RefCell<Vec<StackEntry>> = const { RefCell::new(Vec::new()) };
}
/// Handle to one entry on the thread-local `CONTEXT_STACK`.
///
/// The handle does not own the data itself; it holds a clone of the entry's
/// `Rc` id and finds the entry by pointer identity. When the last handle for an
/// entry is dropped, the entry is removed from the stack (see the `Drop` impl).
pub struct EventContext {
// Shared identity token; also held by the matching `StackEntry`.
id: Rc<()>,
}
impl Drop for EventContext {
    /// Removes this handle's entry from `CONTEXT_STACK` when the handle being
    /// dropped is the last one referring to it.
    fn drop(&mut self) {
        // A strong count of exactly 2 means: this handle plus the `StackEntry`
        // itself. Any other count leaves the stack untouched.
        if Rc::strong_count(&self.id) != 2 {
            return;
        }
        CONTEXT_STACK.with(|cell| {
            let mut entries = cell.borrow_mut();
            // Search from the top of the stack and remove only the first match.
            let found = entries
                .iter()
                .rposition(|entry| Rc::ptr_eq(&entry.id, &self.id));
            if let Some(index) = found {
                entries.remove(index);
            }
        });
    }
}
impl EventContext {
    /// Returns a handle to the entry on top of this thread's context stack.
    ///
    /// If the stack is empty, a new entry with empty [`ContextData`] is pushed
    /// first and a handle to that entry is returned.
    pub fn current() -> Self {
        CONTEXT_STACK.with(|cell| {
            let mut entries = cell.borrow_mut();
            let top_id = entries.last().map(|top| Rc::clone(&top.id));
            let id = top_id.unwrap_or_else(|| {
                let fresh = Rc::new(());
                entries.push(StackEntry {
                    id: Rc::clone(&fresh),
                    data: ContextData::new(),
                });
                fresh
            });
            EventContext { id }
        })
    }

    /// Pushes a new entry holding `data` onto this thread's context stack and
    /// returns a handle to it.
    pub fn seed(data: ContextData) -> Self {
        let id = Rc::new(());
        let entry = StackEntry {
            id: Rc::clone(&id),
            data,
        };
        CONTEXT_STACK.with(|cell| cell.borrow_mut().push(entry));
        EventContext { id }
    }

    /// Pushes a new entry seeded with a copy of the current context's data and
    /// returns a handle to it.
    pub fn fork() -> Self {
        // The temporary handle from `current()` lives until `seed` has returned.
        Self::seed(Self::current().data())
    }

    /// Serializes `value` to JSON and stores it under `key` in the entry this
    /// handle refers to, replacing any previous value for that key.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json::Error` from serializing `value`; the context is
    /// not modified in that case.
    ///
    /// # Panics
    ///
    /// Panics if no entry matching this handle is found on the stack.
    pub fn insert<T: Serialize>(
        &mut self,
        key: &'static str,
        value: &T,
    ) -> Result<(), serde_json::Error> {
        // Serialize before borrowing the stack so a failure leaves it untouched.
        let json_value = serde_json::to_value(value)?;
        CONTEXT_STACK.with(|cell| {
            let mut entries = cell.borrow_mut();
            let matching = entries
                .iter_mut()
                .rev()
                .find(|candidate| Rc::ptr_eq(&candidate.id, &self.id));
            let Some(entry) = matching else {
                panic!("EventContext missing on CONTEXT_STACK")
            };
            entry.data.insert(key, json_value);
        });
        Ok(())
    }

    /// Returns a copy of the data stored in the entry this handle refers to.
    ///
    /// # Panics
    ///
    /// Panics if no entry matching this handle is found on the stack.
    pub fn data(&self) -> ContextData {
        CONTEXT_STACK.with(|cell| {
            let entries = cell.borrow();
            let matching = entries
                .iter()
                .rev()
                .find(|candidate| Rc::ptr_eq(&candidate.id, &self.id));
            let Some(entry) = matching else {
                panic!("EventContext missing on CONTEXT_STACK")
            };
            entry.data.clone()
        })
    }

    /// Returns the current context's data, with tracing information added when
    /// the `tracing-context` feature is enabled.
    // `mut` is only exercised when the `tracing-context` feature is enabled.
    #[allow(unused_mut)]
    pub(crate) fn data_for_storing() -> ContextData {
        let mut snapshot = Self::current().data();
        #[cfg(feature = "tracing-context")]
        {
            snapshot = snapshot.with_tracing_info();
        }
        snapshot
    }
}
// Tests for the thread-local context stack. Several assertions depend on the
// exact point at which `EventContext` handles are created and dropped, so
// statement order inside each test matters.
#[cfg(test)]
mod tests {
use super::*;
// Number of entries currently on this thread's `CONTEXT_STACK`.
fn stack_depth() -> usize {
CONTEXT_STACK.with(|c| c.borrow().len())
}
// The current context's data serialized to a JSON value. The temporary
// handle created here is dropped again before the function returns.
fn current_json() -> serde_json::Value {
serde_json::to_value(EventContext::current().data()).unwrap()
}
// `current()` reuses the top entry instead of pushing a new one, and the
// entry disappears once every handle to it has been dropped.
#[test]
fn assert_stack_depth() {
fn assert_inner() {
let _ctx = EventContext::current();
assert_eq!(stack_depth(), 1);
}
assert_eq!(stack_depth(), 0);
{
let _ctx = EventContext::current();
assert_eq!(stack_depth(), 1);
assert_inner();
}
assert_eq!(stack_depth(), 0);
}
// Handles obtained through `current()` share one entry: a value inserted via
// the inner handle stays visible to the outer one, and inserting an existing
// key overwrites its value.
#[test]
fn insert() {
fn insert_inner(value: &serde_json::Value) {
let mut ctx = EventContext::current();
ctx.insert("new_data", &value).unwrap();
assert_eq!(
current_json(),
serde_json::json!({ "data": value, "new_data": value})
);
}
let mut ctx = EventContext::current();
assert_eq!(current_json(), serde_json::json!({}));
let value = serde_json::json!({ "hello": "world" });
ctx.insert("data", &value).unwrap();
assert_eq!(current_json(), serde_json::json!({ "data": value }));
insert_inner(&value);
assert_eq!(
current_json(),
serde_json::json!({ "data": value, "new_data": value})
);
let new_value = serde_json::json!({ "hello": "new_world" });
ctx.insert("data", &new_value).unwrap();
assert_eq!(
current_json(),
serde_json::json!({ "data": new_value, "new_data": value})
);
}
// A freshly spawned thread starts with an empty stack. Seeding it with data
// copied from the main thread gives it an independent context: inserts made
// in the spawned thread are not visible on the main thread afterwards.
#[test]
fn thread_isolation() {
let mut ctx = EventContext::current();
let value = serde_json::json!({ "main": "thread" });
ctx.insert("data", &value).unwrap();
assert_eq!(stack_depth(), 1);
let ctx_data = ctx.data();
let handle = std::thread::spawn(move || {
assert_eq!(stack_depth(), 0);
let mut ctx = EventContext::seed(ctx_data);
assert_eq!(stack_depth(), 1);
ctx.insert("thread", &serde_json::json!("local")).unwrap();
assert_eq!(
current_json(),
serde_json::json!({ "data": { "main": "thread" }, "thread": "local" }),
);
});
handle.join().unwrap();
assert_eq!(current_json(), serde_json::json!({ "data": value }));
}
// Same sharing behaviour as `insert`, but the inner insert happens inside an
// awaited async fn.
#[tokio::test]
async fn async_context() {
async fn inner_async() {
let mut ctx = EventContext::current();
ctx.insert("async_inner", &serde_json::json!("value"))
.unwrap();
assert_eq!(
current_json(),
serde_json::json!({ "async_data": { "test": "async" }, "async_inner": "value" })
);
}
let mut ctx = EventContext::current();
assert_eq!(current_json(), serde_json::json!({}));
let value = serde_json::json!({ "test": "async" });
ctx.insert("async_data", &value).unwrap();
assert_eq!(current_json(), serde_json::json!({ "async_data": value }));
inner_async().await;
assert_eq!(
current_json(),
serde_json::json!({ "async_data": value, "async_inner": "value" })
);
}
// `fork()` pushes a second entry that starts as a copy of the current data.
// Inserts into the fork do not reach the original entry, and dropping the
// fork pops its entry again.
#[test]
fn fork() {
let mut ctx = EventContext::current();
ctx.insert("original", &serde_json::json!("value")).unwrap();
assert_eq!(stack_depth(), 1);
assert_eq!(current_json(), serde_json::json!({ "original": "value" }));
let mut forked = EventContext::fork();
assert_eq!(stack_depth(), 2);
assert_eq!(current_json(), serde_json::json!({ "original": "value" }));
forked.insert("forked", &serde_json::json!("data")).unwrap();
assert_eq!(
current_json(),
serde_json::json!({ "original": "value", "forked": "data" })
);
drop(forked);
assert_eq!(stack_depth(), 1);
assert_eq!(current_json(), serde_json::json!({ "original": "value" }));
}
// A spawned task wrapped with `with_event_context` (defined in the
// `with_event_context` module, not shown here) sees the data it was given,
// keeps its own inserts across a yield point, and does not leak them into the
// parent context.
// NOTE(review): the depth of 2 inside the task presumably comes from the task
// being polled on the same thread as the parent entry under the default
// `#[tokio::test]` runtime. Confirm against `with_event_context`.
#[tokio::test]
async fn with_event_context_spawned() {
let mut ctx = EventContext::current();
ctx.insert("parent", &serde_json::json!("context")).unwrap();
let handle = tokio::spawn(
async {
assert_eq!(stack_depth(), 2);
EventContext::current()
.insert("spawned", &serde_json::json!("value"))
.unwrap();
assert_eq!(
current_json(),
serde_json::json!({ "parent": "context", "spawned": "value" })
);
tokio::task::yield_now().await;
current_json()
}
.with_event_context(ctx.data()),
);
let result = handle.await.unwrap();
assert_eq!(
result,
serde_json::json!({ "parent": "context", "spawned": "value" })
);
assert_eq!(current_json(), serde_json::json!({ "parent": "context" }));
}
// Multi-threaded variant of the test above.
// NOTE(review): the depth of 1 inside the task presumably reflects the task
// running on a worker thread with its own, otherwise empty, stack. Confirm.
#[tokio::test(flavor = "multi_thread")]
async fn with_event_context_spawned_multi_thread() {
let mut ctx = EventContext::current();
ctx.insert("parent", &serde_json::json!("context")).unwrap();
let handle = tokio::spawn(
async {
assert_eq!(stack_depth(), 1);
EventContext::current()
.insert("spawned", &serde_json::json!("value"))
.unwrap();
assert_eq!(
current_json(),
serde_json::json!({ "parent": "context", "spawned": "value" })
);
// The inner yield is itself wrapped with the task's current data.
let data = EventContext::current().data();
tokio::task::yield_now().with_event_context(data).await;
current_json()
}
.with_event_context(ctx.data()),
);
let result = handle.await.unwrap();
assert_eq!(
result,
serde_json::json!({ "parent": "context", "spawned": "value" })
);
assert_eq!(current_json(), serde_json::json!({ "parent": "context" }));
}
}