#![cfg(feature = "trace")]
use std::sync::Arc;
use std::time::Duration;
use jaeb::{EventBus, EventHandler, HandlerResult, SyncEventHandler};
use tokio::sync::Notify;
use tracing::Span;
#[derive(Clone, Debug)]
struct TracedEvent {
#[allow(dead_code)]
value: usize,
}
struct SpanCapturingAsyncHandler {
captured: Arc<tokio::sync::Mutex<Option<tracing::Id>>>,
done: Arc<Notify>,
}
impl EventHandler<TracedEvent> for SpanCapturingAsyncHandler {
async fn handle(&self, _event: &TracedEvent) -> HandlerResult {
let current = Span::current();
let id = current.id();
*self.captured.lock().await = id;
self.done.notify_one();
Ok(())
}
}
struct SpanCapturingSyncHandler {
captured: Arc<std::sync::Mutex<Option<tracing::Id>>>,
}
impl SyncEventHandler<TracedEvent> for SpanCapturingSyncHandler {
fn handle(&self, _event: &TracedEvent) -> HandlerResult {
let current = Span::current();
let id = current.id();
*self.captured.lock().expect("lock") = id;
Ok(())
}
}
#[tokio::test]
async fn async_handler_inherits_publish_span() {
let subscriber = tracing_subscriber::fmt()
.with_max_level(tracing::Level::TRACE)
.with_test_writer()
.finish();
let _guard = tracing::subscriber::set_default(subscriber);
let bus = EventBus::builder().buffer_size(16).build().await.expect("valid config");
let captured = Arc::new(tokio::sync::Mutex::new(None));
let done = Arc::new(Notify::new());
let _ = bus
.subscribe(SpanCapturingAsyncHandler {
captured: Arc::clone(&captured),
done: Arc::clone(&done),
})
.await
.expect("subscribe");
let outer_span = tracing::info_span!("test.outer");
let outer_id = outer_span.id();
assert!(outer_id.is_some(), "subscriber must allocate span IDs for this test");
{
let _entered = outer_span.enter();
bus.publish(TracedEvent { value: 1 }).await.expect("publish");
}
tokio::time::timeout(Duration::from_secs(2), done.notified())
.await
.expect("handler should complete within timeout");
bus.shutdown().await.expect("shutdown");
let handler_span_id = captured.lock().await.clone();
assert!(handler_span_id.is_some(), "handler should have an active span (not None)");
assert_eq!(handler_span_id, outer_id, "async handler should inherit the publisher's span");
}
#[tokio::test]
async fn sync_handler_sees_publish_span() {
let subscriber = tracing_subscriber::fmt()
.with_max_level(tracing::Level::TRACE)
.with_test_writer()
.finish();
let _guard = tracing::subscriber::set_default(subscriber);
let bus = EventBus::builder().buffer_size(16).build().await.expect("valid config");
let captured = Arc::new(std::sync::Mutex::new(None));
let _ = bus
.subscribe(SpanCapturingSyncHandler {
captured: Arc::clone(&captured),
})
.await
.expect("subscribe");
let outer_span = tracing::info_span!("test.sync_outer");
let outer_id = outer_span.id();
{
let _entered = outer_span.enter();
bus.publish(TracedEvent { value: 2 }).await.expect("publish");
}
bus.shutdown().await.expect("shutdown");
let handler_span_id = captured.lock().expect("lock").clone();
assert!(handler_span_id.is_some(), "sync handler should have an active span");
assert_eq!(handler_span_id, outer_id, "sync handler should see the publisher's span");
}
#[tokio::test]
async fn async_handler_works_without_active_span() {
let subscriber = tracing_subscriber::fmt()
.with_max_level(tracing::Level::TRACE)
.with_test_writer()
.finish();
let _guard = tracing::subscriber::set_default(subscriber);
let bus = EventBus::builder().buffer_size(16).build().await.expect("valid config");
let captured = Arc::new(tokio::sync::Mutex::new(None));
let done = Arc::new(Notify::new());
let _ = bus
.subscribe(SpanCapturingAsyncHandler {
captured: Arc::clone(&captured),
done: Arc::clone(&done),
})
.await
.expect("subscribe");
bus.publish(TracedEvent { value: 3 }).await.expect("publish");
tokio::time::timeout(Duration::from_secs(2), done.notified())
.await
.expect("handler should complete within timeout");
bus.shutdown().await.expect("shutdown");
let handler_span_id = captured.lock().await.clone();
assert!(handler_span_id.is_none(), "handler should have no active span when publisher had none");
}
#[tokio::test]
async fn try_publish_propagates_caller_span() {
let subscriber = tracing_subscriber::fmt()
.with_max_level(tracing::Level::TRACE)
.with_test_writer()
.finish();
let _guard = tracing::subscriber::set_default(subscriber);
let bus = EventBus::builder().buffer_size(16).build().await.expect("valid config");
let captured = Arc::new(tokio::sync::Mutex::new(None));
let done = Arc::new(Notify::new());
let _ = bus
.subscribe(SpanCapturingAsyncHandler {
captured: Arc::clone(&captured),
done: Arc::clone(&done),
})
.await
.expect("subscribe");
let outer_span = tracing::info_span!("test.try_publish");
let outer_id = outer_span.id();
assert!(outer_id.is_some(), "subscriber must allocate span IDs for this test");
{
let _entered = outer_span.enter();
bus.try_publish(TracedEvent { value: 4 }).expect("try_publish");
}
tokio::time::timeout(Duration::from_secs(2), done.notified())
.await
.expect("handler should complete within timeout");
bus.shutdown().await.expect("shutdown");
let handler_span_id = captured.lock().await.clone();
assert!(handler_span_id.is_some(), "handler should have an active span via try_publish");
assert_eq!(
handler_span_id, outer_id,
"try_publish should propagate the caller's span to async handlers"
);
}
#[tokio::test]
async fn worker_path_propagates_caller_span() {
let subscriber = tracing_subscriber::fmt()
.with_max_level(tracing::Level::TRACE)
.with_test_writer()
.finish();
let _guard = tracing::subscriber::set_default(subscriber);
let bus = EventBus::builder().buffer_size(16).build().await.expect("valid config");
let captured = Arc::new(tokio::sync::Mutex::new(None));
let done = Arc::new(Notify::new());
let _ = bus
.subscribe(SpanCapturingAsyncHandler {
captured: Arc::clone(&captured),
done: Arc::clone(&done),
})
.await
.expect("subscribe");
let outer_span = tracing::info_span!("test.worker_path");
let outer_id = outer_span.id();
{
let _entered = outer_span.enter();
bus.publish(TracedEvent { value: 5 }).await.expect("publish");
}
tokio::time::timeout(Duration::from_secs(2), done.notified())
.await
.expect("handler should complete within timeout");
bus.shutdown().await.expect("shutdown");
let handler_span_id = captured.lock().await.clone();
assert!(handler_span_id.is_some(), "handler via worker path should have an active span");
assert_eq!(
handler_span_id, outer_id,
"worker path should propagate the publisher's span to async handlers"
);
}