use async_trait::async_trait;
use chrono::Utc as ChronoUtc;
use futures_util::StreamExt;
use proptest::{collection::hash_map, prelude::*};
use rustc_hash::FxHashMap as FastMap;
use serde_json::{Number, Value, json};
use std::{
fmt as stdfmt, fs,
io::{self, Cursor, Write},
path::PathBuf,
sync::{Arc, Mutex},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use weavegraph::channels::Channel as _;
use weavegraph::event_bus::{
ChannelSink, EmitterError, Event, EventBus, EventEmitter, EventHub, EventSink,
INVOCATION_END_SCOPE, JsonLinesSink, LLMStreamingEvent, MemorySink, NodeEvent,
STREAM_END_SCOPE,
};
use weavegraph::graphs::GraphBuilder;
use weavegraph::node::{Node, NodeContext, NodeError, NodePartial};
use weavegraph::state::{StateSnapshot, VersionedState};
use weavegraph::types::NodeKind;
use weavegraph::utils::clock::MockClock;
fn test_bus() -> (EventBus, MemorySink) {
let sink = MemorySink::new();
(EventBus::with_sink(sink.clone()), sink)
}
fn scope_message_pairs(events: &[Event]) -> Vec<(Option<String>, String)> {
events
.iter()
.map(|event| {
(
event.scope_label().map(str::to_owned),
event.message().to_owned(),
)
})
.collect()
}
fn as_node_event(event: &Event) -> &NodeEvent {
match event {
Event::Node(node) => node,
other => panic!("expected node event, got {other:?}"),
}
}
fn as_llm_event(event: &Event) -> &LLMStreamingEvent {
match event {
Event::LLM(llm) => llm,
other => panic!("expected llm event, got {other:?}"),
}
}
#[derive(Clone)]
struct MirrorWriter {
buffer: Arc<Mutex<Cursor<Vec<u8>>>>,
}
impl Write for MirrorWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.buffer
.lock()
.expect("MirrorWriter mutex poisoned")
.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.buffer
.lock()
.expect("MirrorWriter mutex poisoned")
.flush()
}
}
fn shared_buffer_text(buffer: &Arc<Mutex<Cursor<Vec<u8>>>>) -> String {
let locked = buffer.lock().expect("shared buffer mutex poisoned");
String::from_utf8(locked.get_ref().clone()).expect("buffer should contain valid utf-8")
}
fn parse_jsonl(buffer: &Arc<Mutex<Cursor<Vec<u8>>>>) -> Vec<Value> {
shared_buffer_text(buffer)
.lines()
.map(|line| serde_json::from_str(line).expect("line should be valid json"))
.collect()
}
fn repo_jsonl_path(label: &str) -> PathBuf {
let unique = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system time should be after unix epoch")
.as_nanos();
let directory = std::env::current_dir()
.expect("current directory should be available")
.join("target")
.join("test-artifacts");
fs::create_dir_all(&directory).expect("test artifact directory should be creatable");
directory.join(format!("{label}-{unique}.jsonl"))
}
async fn wait_for_workers() {
tokio::time::sleep(Duration::from_millis(40)).await;
}
#[derive(Clone, Default)]
struct CapturingEmitter {
recorded: Arc<Mutex<Vec<Event>>>,
}
impl CapturingEmitter {
fn snapshot(&self) -> Vec<Event> {
self.recorded
.lock()
.expect("CapturingEmitter mutex poisoned")
.clone()
}
}
impl stdfmt::Debug for CapturingEmitter {
fn fmt(&self, f: &mut stdfmt::Formatter<'_>) -> stdfmt::Result {
f.debug_struct("CapturingEmitter")
.field("event_count", &self.snapshot().len())
.finish()
}
}
impl EventEmitter for CapturingEmitter {
fn emit(&self, event: Event) -> Result<(), EmitterError> {
self.recorded
.lock()
.expect("CapturingEmitter mutex poisoned")
.push(event);
Ok(())
}
}
#[derive(Debug)]
struct NoopTerminalNode;
#[async_trait]
impl Node for NoopTerminalNode {
async fn run(&self, _: StateSnapshot, _: NodeContext) -> Result<NodePartial, NodeError> {
Ok(NodePartial::default())
}
}
fn ascii_text() -> impl Strategy<Value = String> {
proptest::string::string_regex("[A-Za-z0-9 _-]{0,24}").expect("regex should compile")
}
fn leaf_json_value() -> impl Strategy<Value = Value> {
prop_oneof![
Just(Value::Null),
ascii_text().prop_map(Value::String),
any::<bool>().prop_map(Value::Bool),
any::<i64>().prop_map(|value| Value::Number(Number::from(value))),
]
}
fn any_event() -> impl Strategy<Value = Event> {
let diagnostics =
(ascii_text(), ascii_text()).prop_map(|(scope, message)| Event::diagnostic(scope, message));
let plain_nodes = (
prop::option::of(ascii_text()),
prop::option::of(any::<u64>()),
ascii_text(),
ascii_text(),
)
.prop_map(|(node_id, step, scope, message)| {
Event::Node(NodeEvent::new(node_id, step, scope, message))
});
let nodes_with_metadata = (
ascii_text(),
any::<u64>(),
ascii_text(),
ascii_text(),
hash_map(ascii_text(), leaf_json_value(), 0..4),
)
.prop_map(|(node_id, step, scope, message, metadata)| {
Event::node_message_with_metadata(
node_id,
step,
scope,
message,
metadata.into_iter().collect(),
)
});
let llm_events = (
prop::option::of(ascii_text()),
prop::option::of(ascii_text()),
prop::option::of(ascii_text()),
ascii_text(),
hash_map(ascii_text(), leaf_json_value(), 0..4),
any::<bool>(),
)
.prop_map(
|(session_id, node_id, stream_id, chunk, metadata, is_final)| {
let mut builder = LLMStreamingEvent::builder(chunk)
.is_final(is_final)
.metadata(metadata.into_iter().collect());
if let Some(id) = session_id {
builder = builder.session_id(id);
}
if let Some(id) = node_id {
builder = builder.node_id(id);
}
if let Some(id) = stream_id {
builder = builder.stream_id(id);
}
Event::LLM(builder.build())
},
);
prop_oneof![diagnostics, plain_nodes, nodes_with_metadata, llm_events]
}
#[tokio::test]
async fn stop_listener_flushes_pending_events() {
let (bus, sink) = test_bus();
bus.listen_for_events();
let publisher = bus.get_emitter();
publisher
.emit(Event::node_message_with_meta(
"node-a", 42, "scope-a", "payload",
))
.expect("emit should succeed");
wait_for_workers().await;
bus.stop_listener().await;
let events = sink.snapshot();
assert_eq!(events.len(), 1);
assert_eq!(
(events[0].message(), as_node_event(&events[0]).node_id()),
("payload", Some("node-a"))
);
}
#[tokio::test]
async fn stopping_idle_listener_is_safe() {
let (_, sink) = test_bus();
let bus = EventBus::with_sink(sink);
bus.listen_for_events();
bus.stop_listener().await;
bus.stop_listener().await;
}
#[tokio::test]
async fn memory_sink_keeps_scope_and_message_sequence() {
let (bus, sink) = test_bus();
bus.listen_for_events();
let publisher = bus.get_emitter();
let sequence = [
Event::node_message("Scope1", "one"),
Event::node_message("Scope1", "two"),
Event::diagnostic("Scope2", "three"),
Event::diagnostic("Scope2", "four"),
];
for item in sequence {
publisher.emit(item).expect("emit should succeed");
}
wait_for_workers().await;
bus.stop_listener().await;
assert_eq!(
scope_message_pairs(&sink.snapshot()),
vec![
(Some("Scope1".to_owned()), "one".to_owned()),
(Some("Scope1".to_owned()), "two".to_owned()),
(Some("Scope2".to_owned()), "three".to_owned()),
(Some("Scope2".to_owned()), "four".to_owned()),
]
);
}
#[tokio::test]
async fn repeated_listen_calls_keep_single_worker_per_sink() {
let (bus, sink) = test_bus();
for _ in 0..3 {
bus.listen_for_events();
}
let publisher = bus.get_emitter();
publisher
.emit(Event::node_message("single", "first"))
.expect("first emit should succeed");
publisher
.emit(Event::node_message("single", "second"))
.expect("second emit should succeed");
wait_for_workers().await;
bus.stop_listener().await;
let events = sink.snapshot();
assert_eq!(
[events[0].message(), events[1].message()],
["first", "second"]
);
}
#[tokio::test]
async fn concurrent_publishers_preserve_send_order() {
let (bus, sink) = test_bus();
bus.listen_for_events();
let source = bus.get_emitter();
let count = 12u32;
let mut tasks = Vec::new();
for index in 0..count {
let publisher = Arc::clone(&source);
tasks.push(tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis((index * 2) as u64)).await;
publisher
.emit(Event::node_message("ORDER", format!("msg-{index}")))
.expect("emit should succeed");
}));
}
for task in tasks {
task.await.expect("task should join");
}
bus.stop_listener().await;
let ordered_messages: Vec<String> = sink
.snapshot()
.into_iter()
.map(|event| event.message().to_owned())
.collect();
let expected: Vec<String> = (0..count).map(|index| format!("msg-{index}")).collect();
assert_eq!(ordered_messages, expected);
}
#[tokio::test]
async fn channel_sink_delivers_events_to_receiver() {
let (tx, rx) = flume::unbounded();
let bus = EventBus::with_sink(ChannelSink::new(tx));
bus.listen_for_events();
let publisher = bus.get_emitter();
publisher
.emit(Event::diagnostic("channel", "hello world"))
.expect("emit should succeed");
let received = tokio::time::timeout(Duration::from_secs(1), rx.recv_async())
.await
.expect("receiver should wake in time")
.expect("channel should carry an event");
assert_eq!(
(received.scope_label(), received.message()),
(Some("channel"), "hello world")
);
}
#[tokio::test]
async fn bus_broadcasts_each_event_to_every_sink() {
let memory = MemorySink::new();
let (tx, rx) = flume::unbounded();
let bus = EventBus::with_sinks(vec![
Box::new(memory.clone()),
Box::new(ChannelSink::new(tx)),
]);
bus.listen_for_events();
let publisher = bus.get_emitter();
publisher
.emit(Event::diagnostic("broadcast", "fan out"))
.expect("emit should succeed");
wait_for_workers().await;
bus.stop_listener().await;
assert_eq!(memory.snapshot()[0].message(), "fan out");
let mirrored = tokio::time::timeout(Duration::from_secs(1), rx.recv_async())
.await
.expect("receiver should wake in time")
.expect("channel should carry an event");
assert_eq!(mirrored.message(), "fan out");
}
#[tokio::test]
async fn sink_added_after_start_receives_later_events() {
let bus = EventBus::with_sinks(Vec::new());
bus.listen_for_events();
let (tx, rx) = flume::unbounded();
bus.add_sink(ChannelSink::new(tx));
let publisher = bus.get_emitter();
publisher
.emit(Event::diagnostic("dynamic", "arrived later"))
.expect("emit should succeed");
let received = tokio::time::timeout(Duration::from_secs(1), rx.recv_async())
.await
.expect("receiver should wake in time")
.expect("channel should carry an event");
assert_eq!(received.message(), "arrived later");
}
#[tokio::test(flavor = "current_thread")]
async fn stop_listener_waits_for_every_sink() {
let first = MemorySink::new();
let second = MemorySink::new();
let bus = EventBus::with_sinks(vec![Box::new(first.clone()), Box::new(second.clone())]);
bus.listen_for_events();
let publisher = bus.get_emitter();
for index in 0..10 {
publisher
.emit(Event::diagnostic("drain", format!("msg-{index}")))
.expect("emit should succeed");
}
wait_for_workers().await;
bus.stop_listener().await;
assert_eq!((first.snapshot().len(), second.snapshot().len()), (10, 10));
}
#[tokio::test]
async fn stop_listener_can_interrupt_active_publish_loop() {
let bus = Arc::new(EventBus::with_sink(MemorySink::new()));
bus.listen_for_events();
let publisher = bus.get_emitter();
let emission = tokio::spawn(async move {
for index in 0..1_000u32 {
let _ = publisher.emit(Event::diagnostic("stress", format!("{index}")));
tokio::task::yield_now().await;
}
});
tokio::time::sleep(Duration::from_millis(20)).await;
let stop_result = tokio::time::timeout(Duration::from_secs(1), bus.stop_listener()).await;
assert!(stop_result.is_ok(), "stop_listener should finish promptly");
emission.abort();
let _ = emission.await;
}
#[tokio::test]
async fn listener_can_restart_after_stop() {
let (bus, sink) = test_bus();
bus.listen_for_events();
let first_cycle = bus.get_emitter();
first_cycle
.emit(Event::diagnostic("cycle-1", "first"))
.expect("first emit should succeed");
wait_for_workers().await;
bus.stop_listener().await;
bus.listen_for_events();
let second_cycle = bus.get_emitter();
second_cycle
.emit(Event::diagnostic("cycle-2", "second"))
.expect("second emit should succeed");
wait_for_workers().await;
bus.stop_listener().await;
assert_eq!(
scope_message_pairs(&sink.snapshot()),
[
(Some("cycle-1".to_owned()), "first".to_owned()),
(Some("cycle-2".to_owned()), "second".to_owned()),
]
);
}
#[tokio::test(flavor = "current_thread")]
async fn async_stream_adapter_produces_published_event() {
let (bus, _) = test_bus();
let publisher = bus.get_emitter();
let stream = bus.subscribe().into_async_stream();
tokio::pin!(stream);
publisher
.emit(Event::diagnostic("async", "stream"))
.expect("emit should succeed");
let observed = stream.next().await.expect("stream should yield an event");
assert_eq!(observed.message(), "stream");
assert_eq!(
scope_message_pairs(std::slice::from_ref(&observed)),
[(Some("async".to_owned()), "stream".to_owned())]
);
}
#[tokio::test(flavor = "current_thread")]
async fn next_timeout_distinguishes_idle_streams_from_published_events() {
let (bus, _) = test_bus();
let publisher = bus.get_emitter();
let mut subscription = bus.subscribe();
assert!(
subscription
.next_timeout(Duration::from_millis(20))
.await
.is_none()
);
publisher
.emit(Event::diagnostic("timeout", "delivered"))
.expect("emit should succeed");
let observed = subscription
.next_timeout(Duration::from_millis(500))
.await
.expect("stream should receive the published event");
assert_eq!(observed.message(), "delivered");
assert_eq!(
scope_message_pairs(std::slice::from_ref(&observed)),
[(Some("timeout".to_owned()), "delivered".to_owned())]
);
}
#[tokio::test(flavor = "current_thread")]
async fn blocking_iterator_receives_next_event() {
let (bus, _) = test_bus();
let publisher = bus.get_emitter();
let blocking_iter = bus.subscribe().into_blocking_iter();
let worker = tokio::task::spawn_blocking(move || {
let mut blocking_iter = blocking_iter;
blocking_iter.next()
});
tokio::time::sleep(Duration::from_millis(10)).await;
publisher
.emit(Event::diagnostic("blocking", "iter"))
.expect("emit should succeed");
let observed = worker
.await
.expect("blocking task should join")
.expect("iterator should yield one event");
assert_eq!(observed.message(), "iter");
assert_eq!(
scope_message_pairs(std::slice::from_ref(&observed)),
[(Some("blocking".to_owned()), "iter".to_owned())]
);
}
#[tokio::test(flavor = "current_thread")]
async fn subscription_closes_after_bus_drop() {
let mut subscription = {
let (bus, _) = test_bus();
bus.listen_for_events();
bus.subscribe()
};
assert!(
subscription
.next_timeout(Duration::from_millis(50))
.await
.is_none()
);
}
#[tokio::test(flavor = "current_thread")]
async fn invoke_streaming_appends_terminal_scope_event() {
let terminal_kind = NodeKind::Custom("terminal".to_owned());
let builder = GraphBuilder::new()
.add_node(terminal_kind.clone(), NoopTerminalNode)
.add_edge(NodeKind::Start, terminal_kind.clone())
.add_edge(terminal_kind, NodeKind::End);
let app = builder.compile().expect("graph should compile");
let request = VersionedState::new_with_user_message("finish");
let (handle, stream) = app.invoke_streaming(request).await;
let collector =
tokio::spawn(async move { stream.into_async_stream().collect::<Vec<_>>().await });
let final_state = handle.join().await.expect("workflow should finish");
let emitted_messages = final_state.messages.snapshot().len();
assert_eq!(emitted_messages, 1);
let events = collector.await.expect("collector should join");
let terminal = events.last().expect("stream should emit a terminal event");
assert_eq!(terminal.scope_label(), Some(STREAM_END_SCOPE));
}
#[tokio::test(flavor = "current_thread")]
async fn hub_metrics_count_lagged_events() {
use tokio::sync::broadcast::error::RecvError as BroadcastRecvError;
let lag_hub = EventHub::new(1);
let publisher = lag_hub.emitter();
let mut subscription = lag_hub.subscribe();
for message in ["first", "second"] {
publisher
.emit(Event::diagnostic("metrics", message))
.expect("emit should succeed");
}
let dropped_by_lag = match subscription.recv().await {
Err(BroadcastRecvError::Lagged(count)) => count,
Ok(event) => panic!("expected lagged error, received {event:?}"),
Err(other) => panic!("unexpected recv error: {other:?}"),
};
let health = lag_hub.metrics();
assert_eq!((dropped_by_lag, health.capacity, health.dropped), (1, 1, 1));
}
#[cfg_attr(test, test)]
fn default_bus_keeps_expected_capacity_metrics() {
let default_report = EventBus::default().metrics();
assert_eq!((default_report.capacity, default_report.dropped), (1024, 0));
}
#[cfg_attr(test, test)]
fn node_context_emit_adds_identity_and_runtime_metadata() {
let recorder = CapturingEmitter::default();
let emitter: Arc<dyn EventEmitter> = Arc::new(recorder.clone());
let mut context = NodeContext::new("node-a", 7, emitter);
context.invocation_id = Some("invoke-1".to_owned());
context.clock = Some(Arc::new(MockClock::new(123)));
context
.emit("progress", "started")
.expect("node event should emit");
let events = recorder.snapshot();
assert_eq!(events.len(), 1);
let node = as_node_event(&events[0]);
assert_eq!(node.node_id(), Some("node-a"));
assert_eq!(node.step(), Some(7));
assert_eq!(node.scope(), "progress");
assert_eq!(node.message(), "started");
assert_eq!(
node.metadata().get("invocation_id"),
Some(&json!("invoke-1"))
);
assert_eq!(node.metadata().get("now_unix_ms"), Some(&json!(123_000)));
}
#[cfg_attr(any(test), test)]
fn node_context_emits_diagnostics_and_llm_variants() {
let recorder = CapturingEmitter::default();
let emitter: Arc<dyn EventEmitter> = Arc::new(recorder.clone());
let context = NodeContext::new("node-a", 7, emitter);
let mut llm_metadata = FastMap::default();
llm_metadata.insert("token_count".to_owned(), json!(42));
context
.emit_diagnostic("diagnostic", "all good")
.expect("diagnostic should emit");
context
.emit_llm_chunk(
Some("session-1".to_owned()),
Some("stream-1".to_owned()),
"chunk text",
Some(llm_metadata),
)
.expect("chunk event should emit");
context
.emit_llm_final(
Some("session-1".to_owned()),
Some("stream-1".to_owned()),
"final chunk",
None,
)
.expect("final event should emit");
context
.emit_llm_error(
Some("session-1".to_owned()),
Some("stream-1".to_owned()),
"error occurred",
)
.expect("error event should emit");
let captured = recorder.snapshot();
assert_eq!(captured.len(), 4);
let Event::Diagnostic(diagnostic) = &captured[0] else {
panic!("expected diagnostic event, got {:?}", captured[0]);
};
assert_eq!(
(diagnostic.scope(), diagnostic.message()),
("diagnostic", "all good")
);
let chunk = as_llm_event(&captured[1]);
assert_eq!(chunk.session_id(), Some("session-1"));
assert_eq!(chunk.node_id(), Some("node-a"));
assert_eq!(chunk.stream_id(), Some("stream-1"));
assert!(!chunk.is_final());
assert_eq!(chunk.chunk(), "chunk text");
assert_eq!(chunk.metadata().get("token_count"), Some(&json!(42)));
let final_chunk = as_llm_event(&captured[2]);
assert!(final_chunk.is_final());
assert_eq!(final_chunk.chunk(), "final chunk");
assert!(final_chunk.metadata().is_empty());
let error_event = as_llm_event(&captured[3]);
assert!(error_event.is_final());
assert_eq!(error_event.chunk(), "error occurred");
assert_eq!(
error_event.metadata().get("severity"),
Some(&json!("error"))
);
}
#[cfg_attr(any(test), test)]
fn legacy_node_payloads_decode_with_empty_metadata() {
let payload = r#"{"node_id":"legacy-node","step":3,"scope":"legacy","message":"old"}"#;
let event: NodeEvent = serde_json::from_str(payload).expect("legacy payload should decode");
assert_eq!(event.node_id(), Some("legacy-node"));
assert_eq!(event.step(), Some(3));
assert_eq!(event.scope(), "legacy");
assert_eq!(event.message(), "old");
assert!(event.metadata().is_empty());
}
#[test]
fn structured_metadata_beats_conflicting_runtime_keys() {
let mut metadata = FastMap::default();
metadata.insert("custom".to_owned(), json!({"nested": true}));
metadata.insert("node_id".to_owned(), json!("spoofed-node"));
metadata.insert("step".to_owned(), json!(0));
let json = Event::node_message_with_metadata("real-node", 42, "scope", "message", metadata)
.to_json_value();
assert_eq!(json["metadata"]["custom"], json!({"nested": true}));
assert_eq!(json["metadata"]["node_id"], "real-node");
assert_eq!(json["metadata"]["step"], 42);
}
#[test]
fn stream_boundary_scope_constants_remain_distinct() {
assert_eq!(STREAM_END_SCOPE, "__weavegraph_stream_end__");
assert_eq!(INVOCATION_END_SCOPE, "__weavegraph_invocation_end__");
assert_ne!(STREAM_END_SCOPE, INVOCATION_END_SCOPE);
}
#[test]
fn event_json_roundtrip_is_lossless() {
let mut runner = proptest::test_runner::TestRunner::new(ProptestConfig::with_cases(64));
runner
.run(&any_event(), |sampled_event| {
let encoded_event =
serde_json::to_string(&sampled_event).expect("event should serialize");
let restored: Event =
serde_json::from_str(&encoded_event).expect("event should deserialize");
prop_assert_eq!(sampled_event, restored);
Ok(())
})
.expect("proptest cases should pass");
}
#[test]
fn node_event_json_contains_type_message_and_metadata() {
let json =
Event::node_message_with_meta("router", 5, "routing", "Processing request").to_json_value();
assert_eq!(json["type"], "node");
assert_eq!(json["scope"], "routing");
assert_eq!(json["message"], "Processing request");
assert_eq!(json["metadata"]["node_id"], "router");
assert_eq!(json["metadata"]["step"], 5);
assert!(json["timestamp"].is_string());
}
#[test]
fn node_event_without_optional_fields_keeps_metadata_object_empty() {
let json = Event::node_message("test_scope", "test message").to_json_value();
assert_eq!(json["type"], "node");
assert_eq!(json["scope"], "test_scope");
assert_eq!(json["message"], "test message");
assert!(json["metadata"].is_object());
assert!(json["metadata"]["node_id"].is_null());
assert!(json["metadata"]["step"].is_null());
}
#[test]
fn diagnostic_json_uses_empty_metadata_map() {
let json = Event::diagnostic("error_scope", "Something went wrong").to_json_value();
assert_eq!(json["type"], "diagnostic");
assert_eq!(json["scope"], "error_scope");
assert_eq!(json["message"], "Something went wrong");
assert!(json["timestamp"].is_string());
assert_eq!(json["metadata"], json!({}));
}
#[test]
fn llm_json_serializes_ids_flags_and_timestamp() {
let mut metadata = FastMap::default();
metadata.insert("content_type".to_owned(), json!("reasoning"));
metadata.insert("token_count".to_owned(), json!(42));
let timestamp = ChronoUtc::now();
let event = Event::LLM(
LLMStreamingEvent::builder("Thinking step by step...")
.session_id("session-123")
.node_id("node-abc")
.stream_id("stream-xyz")
.metadata(metadata)
.timestamp(timestamp)
.build(),
);
let json = event.to_json_value();
assert_eq!(json["type"], "llm");
assert_eq!(json["message"], "Thinking step by step...");
assert_eq!(json["metadata"]["session_id"], "session-123");
assert_eq!(json["metadata"]["node_id"], "node-abc");
assert_eq!(json["metadata"]["stream_id"], "stream-xyz");
assert_eq!(json["metadata"]["is_final"], false);
assert_eq!(json["metadata"]["content_type"], "reasoning");
assert_eq!(json["metadata"]["token_count"], 42);
assert_eq!(json["timestamp"], timestamp.to_rfc3339());
}
#[test]
fn compact_and_pretty_json_helpers_preserve_llm_payload() {
let timestamp = ChronoUtc::now();
let event = Event::LLM(
LLMStreamingEvent::builder("hello")
.stream_id("stream-1")
.timestamp(timestamp)
.build(),
);
let compact = event
.to_json_string()
.expect("compact json should serialize");
let pretty = event
.to_json_pretty()
.expect("pretty json should serialize");
let compact_value: Value = serde_json::from_str(&compact).expect("compact json should parse");
let pretty_value: Value = serde_json::from_str(&pretty).expect("pretty json should parse");
assert_eq!(compact_value, pretty_value);
assert!(compact.contains("\"type\":\"llm\""));
assert!(pretty.contains(" \"type\": \"llm\""));
}
#[tokio::test]
async fn jsonlines_sink_writes_one_compact_json_object_per_line() {
let buffer = Arc::new(Mutex::new(Cursor::new(Vec::new())));
let writer = MirrorWriter {
buffer: Arc::clone(&buffer),
};
let mut sink = JsonLinesSink::new(Box::new(writer));
sink.handle(&Event::diagnostic("test1", "first message"))
.expect("first write should succeed");
sink.handle(&Event::node_message("test2", "second message"))
.expect("second write should succeed");
let lines = parse_jsonl(&buffer);
assert_eq!(lines.len(), 2);
assert_eq!(lines[0]["type"], "diagnostic");
assert_eq!(lines[0]["scope"], "test1");
assert_eq!(lines[0]["message"], "first message");
assert_eq!(lines[1]["type"], "node");
assert_eq!(lines[1]["scope"], "test2");
assert_eq!(lines[1]["message"], "second message");
}
#[tokio::test]
async fn pretty_jsonlines_sink_includes_indentation() {
let buffer = Arc::new(Mutex::new(Cursor::new(Vec::new())));
let writer = MirrorWriter {
buffer: Arc::clone(&buffer),
};
let mut sink = JsonLinesSink::with_pretty_print(Box::new(writer));
sink.handle(&Event::diagnostic("pretty_test", "formatted output"))
.expect("write should succeed");
let output = shared_buffer_text(&buffer);
assert!(output.contains(" \"type\": \"diagnostic\""));
assert!(output.contains(" \"scope\": \"pretty_test\""));
}
#[tokio::test]
async fn jsonlines_sink_to_file_persists_every_event() {
let path = repo_jsonl_path("event-bus-jsonl");
{
let mut sink = JsonLinesSink::to_file(&path).expect("file sink should open");
sink.handle(&Event::node_message_with_meta(
"file_node",
1,
"file_scope",
"first",
))
.expect("first write should succeed");
sink.handle(&Event::diagnostic("file_scope", "second"))
.expect("second write should succeed");
sink.handle(&Event::node_message("file_scope", "third"))
.expect("third write should succeed");
}
let contents = fs::read_to_string(&path).expect("jsonl file should be readable");
let lines: Vec<Value> = contents
.lines()
.map(|line| serde_json::from_str(line).expect("line should be valid json"))
.collect();
let _ = fs::remove_file(&path);
assert_eq!(lines.len(), 3);
assert_eq!(lines[0]["metadata"]["node_id"], "file_node");
assert_eq!(lines[0]["metadata"]["step"], 1);
assert_eq!(lines[1]["type"], "diagnostic");
assert_eq!(lines[2]["message"], "third");
}
#[tokio::test]
async fn jsonlines_sink_flushes_after_each_handle_call() {
let buffer = Arc::new(Mutex::new(Cursor::new(Vec::new())));
let writer = MirrorWriter {
buffer: Arc::clone(&buffer),
};
let mut sink = JsonLinesSink::new(Box::new(writer));
sink.handle(&Event::diagnostic(
"flush_test",
"should be visible immediately",
))
.expect("write should succeed");
let output = shared_buffer_text(&buffer);
assert!(output.contains("\"message\":\"should be visible immediately\""));
}
#[tokio::test]
async fn jsonlines_sink_integrates_with_event_bus() {
let buffer = Arc::new(Mutex::new(Cursor::new(Vec::new())));
let sink = JsonLinesSink::new(Box::new(MirrorWriter {
buffer: Arc::clone(&buffer),
}));
let bus = EventBus::with_sink(sink);
bus.listen_for_events();
let emitter = bus.get_emitter();
emitter
.emit(Event::node_message("integration", "message1"))
.expect("first emit should succeed");
emitter
.emit(Event::diagnostic("integration", "message2"))
.expect("second emit should succeed");
wait_for_workers().await;
bus.stop_listener().await;
let lines = parse_jsonl(&buffer);
assert_eq!(lines.len(), 2);
assert_eq!(lines[0]["message"], "message1");
assert_eq!(lines[1]["message"], "message2");
}
#[tokio::test]
async fn channel_sink_reports_broken_pipe_when_receiver_is_gone() {
let (tx, rx) = flume::unbounded();
let mut sink = ChannelSink::new(tx);
drop(rx);
let result = sink.handle(&Event::diagnostic("test", "msg"));
let error = result.expect_err("dropped receiver should produce an error");
assert_eq!(error.kind(), io::ErrorKind::BrokenPipe);
}