use std::io::{BufWriter, Write};
use std::path::Path;
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use chrono::{DateTime, Utc};
use serde::Serialize;
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type")]
pub enum ThoughtJackEvent {
PhaseEntered {
actor: String,
phase_name: String,
phase_index: usize,
#[serde(skip_serializing_if = "Option::is_none")]
trigger_event: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
trigger_count: Option<i64>,
},
PhaseAdvanced {
actor: String,
from: String,
to: String,
trigger: String,
},
PhaseTerminal {
actor: String,
phase_name: String,
},
ExtractorCaptured {
actor: String,
name: String,
value_preview: String,
},
SynthesizeGenerated {
actor: String,
protocol: String,
},
SynthesizeValidationBypassed {
actor: String,
},
EntryActionExecuted {
actor: String,
action_type: String,
},
OrchestratorStarted {
actor_count: usize,
server_count: usize,
client_count: usize,
},
ActorInit {
actor_name: String,
mode: String,
},
ActorReady {
actor_name: String,
bind_address: String,
},
ReadinessGateOpen {
server_count: usize,
elapsed_ms: u64,
},
ReadinessGateTimeout {
not_ready: Vec<String>,
},
ReadinessGateServerFailed {
actor: String,
},
ActorStarted {
actor_name: String,
phase_count: usize,
},
ActorCompleted {
actor_name: String,
reason: String,
phases_completed: usize,
},
ActorError {
actor_name: String,
error: String,
},
AwaitExtractorsWaiting {
actor: String,
phase_index: usize,
awaiting: Vec<String>,
},
AwaitExtractorsResolved {
actor: String,
phase_index: usize,
},
AwaitExtractorsTimeout {
actor: String,
phase_index: usize,
missing: Vec<String>,
},
OrchestratorShutdown {
reason: String,
},
OrchestratorCompleted {
summary: String,
},
GracePeriodStarted {
duration_seconds: u64,
},
GracePeriodExpired {
messages_captured: usize,
},
GracePeriodEarlyTermination {
reason: String,
},
IndicatorEvaluated {
indicator_id: String,
method: String,
result: String,
duration_ms: u64,
#[serde(skip_serializing_if = "Option::is_none")]
evidence: Option<String>,
},
PhaseCompleted {
actor: String,
phase_name: String,
duration_ms: u64,
message_count: usize,
},
IndicatorSkipped {
indicator_id: String,
reason: String,
},
SemanticLlmCall {
model: String,
indicator_id: String,
latency_ms: u64,
},
VerdictComputed {
result: String,
#[serde(skip_serializing_if = "Option::is_none")]
max_tier: Option<String>,
matched: usize,
total: usize,
},
ProtocolMessageReceived {
actor: String,
method: String,
protocol: String,
#[serde(skip_serializing_if = "Option::is_none")]
qualifier: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
trigger_current: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
trigger_total: Option<i64>,
},
ProtocolMessageSent {
actor: String,
method: String,
protocol: String,
duration_ms: u64,
#[serde(skip_serializing_if = "Option::is_none")]
qualifier: Option<String>,
},
ProtocolNotification {
actor: String,
method: String,
direction: String,
},
ProtocolTransportError {
actor: String,
error: String,
},
ProtocolInterleave {
actor: String,
server_method: String,
},
ServerStarted {
server_name: String,
transport: String,
},
ServerStopped {
reason: String,
uptime_seconds: u64,
},
TransportConnected {
connection_id: String,
},
TransportDisconnected {
connection_id: String,
reason: String,
},
Error {
error_type: String,
message: String,
context: String,
},
}
#[derive(Debug, Serialize)]
struct EventEnvelope {
sequence: u64,
timestamp: DateTime<Utc>,
#[serde(flatten)]
event: ThoughtJackEvent,
}
pub struct EventEmitter {
writer: Mutex<BufWriter<Box<dyn Write + Send>>>,
sequence: AtomicU64,
progress_tx: Option<tokio::sync::mpsc::UnboundedSender<ThoughtJackEvent>>,
}
impl std::fmt::Debug for EventEmitter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EventEmitter")
.field("sequence", &self.sequence.load(Ordering::Relaxed))
.finish_non_exhaustive()
}
}
impl EventEmitter {
#[must_use]
pub fn new(writer: Box<dyn Write + Send>) -> Self {
Self {
writer: Mutex::new(BufWriter::new(writer)),
sequence: AtomicU64::new(0),
progress_tx: None,
}
}
#[must_use]
pub fn with_progress(
writer: Box<dyn Write + Send>,
tx: tokio::sync::mpsc::UnboundedSender<ThoughtJackEvent>,
) -> Self {
Self {
writer: Mutex::new(BufWriter::new(writer)),
sequence: AtomicU64::new(0),
progress_tx: Some(tx),
}
}
#[must_use]
pub fn stdout() -> Self {
Self::new(Box::new(std::io::stdout()))
}
#[must_use]
pub fn stderr() -> Self {
Self::new(Box::new(std::io::stderr()))
}
#[must_use]
pub fn noop() -> Self {
Self::new(Box::new(std::io::sink()))
}
pub fn from_file(path: &Path) -> std::io::Result<Self> {
let file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)?;
Ok(Self::new(Box::new(file)))
}
pub fn emit(&self, event: ThoughtJackEvent) {
if let Some(tx) = &self.progress_tx {
let _ = tx.send(event.clone());
}
let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
let envelope = EventEnvelope {
sequence: seq,
timestamp: Utc::now(),
event,
};
if let Ok(mut w) = self.writer.lock()
&& let Ok(line) = serde_json::to_string(&envelope)
{
let _ = writeln!(w, "{line}");
let _ = w.flush();
}
}
#[must_use]
pub fn event_count(&self) -> u64 {
self.sequence.load(Ordering::Relaxed)
}
pub fn flush(&self) {
if let Ok(mut w) = self.writer.lock() {
let _ = w.flush();
}
}
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex as StdMutex};
use super::*;
#[derive(Clone)]
struct TestWriter(Arc<StdMutex<Vec<u8>>>);
impl TestWriter {
fn new() -> Self {
Self(Arc::new(StdMutex::new(Vec::new())))
}
fn contents(&self) -> String {
let buf = self.0.lock().unwrap();
String::from_utf8_lossy(&buf).into_owned()
}
}
impl Write for TestWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.0.lock().unwrap().extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
fn sample_event() -> ThoughtJackEvent {
ThoughtJackEvent::ServerStarted {
server_name: "test-server".to_owned(),
transport: "stdio".to_owned(),
}
}
#[test]
fn event_serializes_with_type_tag() {
let json = serde_json::to_string(&sample_event()).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(parsed["type"], "ServerStarted");
assert_eq!(parsed["server_name"], "test-server");
}
#[test]
fn emitter_writes_valid_jsonl() {
let tw = TestWriter::new();
let emitter = EventEmitter::new(Box::new(tw.clone()));
emitter.emit(sample_event());
let output = tw.contents();
let parsed: serde_json::Value = serde_json::from_str(output.trim()).unwrap();
assert_eq!(parsed["type"], "ServerStarted");
assert_eq!(parsed["server_name"], "test-server");
assert_eq!(parsed["transport"], "stdio");
assert_eq!(parsed["sequence"], 0);
}
#[test]
fn emitter_increments_sequence() {
let tw = TestWriter::new();
let emitter = EventEmitter::new(Box::new(tw.clone()));
emitter.emit(sample_event());
emitter.emit(ThoughtJackEvent::ServerStopped {
reason: "completed".to_owned(),
uptime_seconds: 42,
});
assert_eq!(emitter.event_count(), 2);
let lines: Vec<serde_json::Value> = tw
.contents()
.lines()
.map(|l| serde_json::from_str(l).unwrap())
.collect();
assert_eq!(lines[0]["sequence"], 0);
assert_eq!(lines[1]["sequence"], 1);
}
#[test]
#[allow(clippy::too_many_lines)]
fn all_event_categories_serialize_to_valid_json() {
let variants: Vec<ThoughtJackEvent> = vec![
ThoughtJackEvent::PhaseEntered {
actor: "a".to_owned(),
phase_name: "p".to_owned(),
phase_index: 0,
trigger_event: Some("tools/call".to_owned()),
trigger_count: Some(3),
},
ThoughtJackEvent::PhaseAdvanced {
actor: "a".to_owned(),
from: "p1".to_owned(),
to: "p2".to_owned(),
trigger: "t".to_owned(),
},
ThoughtJackEvent::PhaseTerminal {
actor: "a".to_owned(),
phase_name: "p".to_owned(),
},
ThoughtJackEvent::ExtractorCaptured {
actor: "a".to_owned(),
name: "x".to_owned(),
value_preview: "v".to_owned(),
},
ThoughtJackEvent::SynthesizeGenerated {
actor: "a".to_owned(),
protocol: "mcp".to_owned(),
},
ThoughtJackEvent::SynthesizeValidationBypassed {
actor: "a".to_owned(),
},
ThoughtJackEvent::EntryActionExecuted {
actor: "a".to_owned(),
action_type: "notification".to_owned(),
},
ThoughtJackEvent::OrchestratorStarted {
actor_count: 2,
server_count: 1,
client_count: 1,
},
ThoughtJackEvent::ActorInit {
actor_name: "a".to_owned(),
mode: "mcp_server".to_owned(),
},
ThoughtJackEvent::ActorReady {
actor_name: "a".to_owned(),
bind_address: ":3000".to_owned(),
},
ThoughtJackEvent::ReadinessGateOpen {
server_count: 1,
elapsed_ms: 100,
},
ThoughtJackEvent::ReadinessGateTimeout {
not_ready: vec!["a".to_owned()],
},
ThoughtJackEvent::ReadinessGateServerFailed {
actor: "a".to_owned(),
},
ThoughtJackEvent::ActorStarted {
actor_name: "a".to_owned(),
phase_count: 3,
},
ThoughtJackEvent::ActorCompleted {
actor_name: "a".to_owned(),
reason: "terminal".to_owned(),
phases_completed: 3,
},
ThoughtJackEvent::ActorError {
actor_name: "a".to_owned(),
error: "boom".to_owned(),
},
ThoughtJackEvent::AwaitExtractorsWaiting {
actor: "a".to_owned(),
phase_index: 1,
awaiting: vec!["x".to_owned()],
},
ThoughtJackEvent::AwaitExtractorsResolved {
actor: "a".to_owned(),
phase_index: 1,
},
ThoughtJackEvent::AwaitExtractorsTimeout {
actor: "a".to_owned(),
phase_index: 1,
missing: vec!["x".to_owned()],
},
ThoughtJackEvent::OrchestratorShutdown {
reason: "cancel".to_owned(),
},
ThoughtJackEvent::OrchestratorCompleted {
summary: "done".to_owned(),
},
ThoughtJackEvent::GracePeriodStarted {
duration_seconds: 30,
},
ThoughtJackEvent::GracePeriodExpired {
messages_captured: 5,
},
ThoughtJackEvent::GracePeriodEarlyTermination {
reason: "eof".to_owned(),
},
ThoughtJackEvent::IndicatorEvaluated {
indicator_id: "i1".to_owned(),
method: "cel".to_owned(),
result: "matched".to_owned(),
duration_ms: 10,
evidence: Some("regex matched \"id_rsa\"".to_owned()),
},
ThoughtJackEvent::PhaseCompleted {
actor: "a".to_owned(),
phase_name: "exploit".to_owned(),
duration_ms: 3200,
message_count: 6,
},
ThoughtJackEvent::IndicatorSkipped {
indicator_id: "i2".to_owned(),
reason: "no trace".to_owned(),
},
ThoughtJackEvent::SemanticLlmCall {
model: "gpt-4".to_owned(),
indicator_id: "i3".to_owned(),
latency_ms: 500,
},
ThoughtJackEvent::VerdictComputed {
result: "exploited".to_owned(),
max_tier: Some("boundary_breach".to_owned()),
matched: 2,
total: 3,
},
ThoughtJackEvent::ProtocolMessageReceived {
actor: "a".to_owned(),
method: "tools/call".to_owned(),
protocol: "mcp".to_owned(),
qualifier: None,
trigger_current: Some(2),
trigger_total: Some(3),
},
ThoughtJackEvent::ProtocolMessageSent {
actor: "a".to_owned(),
method: "tools/call".to_owned(),
protocol: "mcp".to_owned(),
duration_ms: 5,
qualifier: None,
},
ThoughtJackEvent::ProtocolNotification {
actor: "a".to_owned(),
method: "notify".to_owned(),
direction: "outgoing".to_owned(),
},
ThoughtJackEvent::ProtocolTransportError {
actor: "a".to_owned(),
error: "timeout".to_owned(),
},
ThoughtJackEvent::ProtocolInterleave {
actor: "a".to_owned(),
server_method: "sampling/createMessage".to_owned(),
},
ThoughtJackEvent::ServerStarted {
server_name: "s".to_owned(),
transport: "stdio".to_owned(),
},
ThoughtJackEvent::ServerStopped {
reason: "completed".to_owned(),
uptime_seconds: 60,
},
ThoughtJackEvent::TransportConnected {
connection_id: "1".to_owned(),
},
ThoughtJackEvent::TransportDisconnected {
connection_id: "1".to_owned(),
reason: "eof".to_owned(),
},
ThoughtJackEvent::Error {
error_type: "io".to_owned(),
message: "disk full".to_owned(),
context: "writing trace".to_owned(),
},
];
for variant in &variants {
let json = serde_json::to_string(variant).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert!(parsed.get("type").is_some(), "missing type tag: {json}");
}
}
#[test]
fn envelope_flattens_event_fields() {
let envelope = EventEnvelope {
sequence: 7,
timestamp: DateTime::parse_from_rfc3339("2025-02-04T10:15:30Z")
.unwrap()
.with_timezone(&Utc),
event: sample_event(),
};
let json = serde_json::to_string(&envelope).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(parsed["sequence"], 7);
assert_eq!(parsed["type"], "ServerStarted");
assert_eq!(parsed["server_name"], "test-server");
assert!(
parsed.get("event").is_none(),
"event field should be flattened"
);
}
#[test]
fn from_file_creates_valid_jsonl_output() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("events.jsonl");
let emitter = EventEmitter::from_file(&path).unwrap();
emitter.emit(sample_event());
emitter.emit(ThoughtJackEvent::ServerStopped {
reason: "completed".to_owned(),
uptime_seconds: 10,
});
assert_eq!(emitter.event_count(), 2);
let content = std::fs::read_to_string(&path).unwrap();
let lines: Vec<serde_json::Value> = content
.lines()
.map(|l| serde_json::from_str(l).unwrap())
.collect();
assert_eq!(lines.len(), 2);
assert_eq!(lines[0]["type"], "ServerStarted");
assert_eq!(lines[1]["type"], "ServerStopped");
}
#[test]
fn stderr_emitter_does_not_panic() {
let emitter = EventEmitter::stderr();
emitter.emit(sample_event());
assert_eq!(emitter.event_count(), 1);
}
#[test]
fn test_timestamp_is_utc() {
let tw = TestWriter::new();
let emitter = EventEmitter::new(Box::new(tw.clone()));
emitter.emit(sample_event());
let contents = tw.contents();
let parsed: serde_json::Value = serde_json::from_str(contents.trim()).unwrap();
let ts = parsed["timestamp"]
.as_str()
.expect("timestamp field should be a string");
assert!(
ts.ends_with('Z') || ts.contains("+00:00"),
"timestamp should be in UTC (ends with Z or +00:00), got: {ts}"
);
}
#[test]
fn test_empty_server_lifecycle_events() {
let tw = TestWriter::new();
let emitter = EventEmitter::new(Box::new(tw.clone()));
emitter.emit(ThoughtJackEvent::ServerStarted {
server_name: "lifecycle-test".to_owned(),
transport: "stdio".to_owned(),
});
emitter.emit(ThoughtJackEvent::ServerStopped {
reason: "completed".to_owned(),
uptime_seconds: 0,
});
let contents = tw.contents();
let lines: Vec<&str> = contents.lines().collect();
assert_eq!(lines.len(), 2, "expected exactly 2 JSONL entries");
let first: serde_json::Value = serde_json::from_str(lines[0]).unwrap();
assert_eq!(first["type"], "ServerStarted");
let second: serde_json::Value = serde_json::from_str(lines[1]).unwrap();
assert_eq!(second["type"], "ServerStopped");
}
#[test]
fn test_metrics_with_no_requests() {
use crate::observability::metrics::record_request;
record_request("tools/call");
}
#[test]
fn concurrent_emit_from_multiple_threads() {
let tw = TestWriter::new();
let emitter = Arc::new(EventEmitter::new(Box::new(tw.clone())));
let threads: Vec<_> = (0..8)
.map(|i| {
let emitter = Arc::clone(&emitter);
std::thread::spawn(move || {
for j in 0..10 {
emitter.emit(ThoughtJackEvent::PhaseEntered {
actor: format!("thread-{i}"),
phase_name: format!("phase-{j}"),
phase_index: j,
trigger_event: None,
trigger_count: None,
});
}
})
})
.collect();
for t in threads {
t.join().unwrap();
}
assert_eq!(emitter.event_count(), 80);
let contents = tw.contents();
let lines: Vec<serde_json::Value> = contents
.lines()
.map(|l| serde_json::from_str(l).unwrap())
.collect();
assert_eq!(lines.len(), 80);
let mut seqs: Vec<u64> = lines
.iter()
.map(|l| l["sequence"].as_u64().unwrap())
.collect();
seqs.sort_unstable();
seqs.dedup();
assert_eq!(seqs.len(), 80, "all sequence numbers must be unique");
}
#[test]
fn flush_is_idempotent_and_safe() {
let tw = TestWriter::new();
let emitter = EventEmitter::new(Box::new(tw.clone()));
emitter.emit(sample_event());
emitter.flush();
emitter.flush();
let contents = tw.contents();
assert_eq!(
contents.lines().count(),
1,
"flush should not duplicate output"
);
}
#[test]
fn emit_survives_writer_error() {
struct FailingWriter;
impl Write for FailingWriter {
fn write(&mut self, _buf: &[u8]) -> std::io::Result<usize> {
Err(std::io::Error::other("disk full"))
}
fn flush(&mut self) -> std::io::Result<()> {
Err(std::io::Error::other("disk full"))
}
}
let emitter = EventEmitter::new(Box::new(FailingWriter));
emitter.emit(sample_event());
emitter.emit(sample_event());
emitter.flush();
assert_eq!(emitter.event_count(), 2);
}
#[test]
fn from_file_appends_to_existing() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("append.jsonl");
{
let emitter = EventEmitter::from_file(&path).unwrap();
emitter.emit(sample_event());
}
{
let emitter = EventEmitter::from_file(&path).unwrap();
emitter.emit(ThoughtJackEvent::ServerStopped {
reason: "done".to_owned(),
uptime_seconds: 5,
});
}
let content = std::fs::read_to_string(&path).unwrap();
let lines: Vec<serde_json::Value> = content
.lines()
.map(|l| serde_json::from_str(l).unwrap())
.collect();
assert_eq!(lines.len(), 2);
assert_eq!(lines[0]["type"], "ServerStarted");
assert_eq!(lines[1]["type"], "ServerStopped");
}
#[test]
fn noop_emitter_discards_all_events() {
let emitter = EventEmitter::noop();
emitter.emit(sample_event());
emitter.emit(sample_event());
emitter.emit(sample_event());
assert_eq!(emitter.event_count(), 3);
}
}