use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use chrono::{DateTime, Utc};
use serde::Serialize;
use super::types::Direction;
/// Upper bound on how many entries a `SharedTrace` buffer holds. Once the buffer
/// contains this many entries, `SharedTrace::append` drops new entries instead of
/// storing them.
const MAX_TRACE_ENTRIES: usize = 100_000;
/// One record stored in a `SharedTrace` buffer.
///
/// The field descriptions below describe the values written by
/// `SharedTrace::append`; the fields are public, so entries built elsewhere may
/// carry anything.
#[derive(Debug, Clone, Serialize)]
pub struct TraceEntry {
/// Sequence number taken from the trace's shared counter when the entry was stored.
pub seq: u64,
/// UTC time read via `Utc::now()` while the entry was being stored.
pub timestamp: DateTime<Utc>,
/// Owned copy of the `actor` string passed to `append`.
pub actor: String,
/// Owned copy of the `phase` string passed to `append`.
pub phase: String,
/// The `direction` value passed to `append`.
pub direction: Direction,
/// Owned copy of the `method` string passed to `append`.
pub method: String,
/// Clone of the JSON value passed to `append`.
pub content: serde_json::Value,
}
/// Handle to a size-capped buffer of `TraceEntry` records.
///
/// Every field is wrapped in an `Arc`, so the derived `Clone` produces another
/// handle to the same buffer, counter, and flag rather than an independent copy.
#[derive(Clone)]
pub struct SharedTrace {
/// The recorded entries, guarded by a mutex.
entries: Arc<Mutex<Vec<TraceEntry>>>,
/// Supplies the `seq` value for each stored entry; starts at 0.
seq_counter: Arc<AtomicU64>,
/// Set to `true` the first time an entry is dropped because the buffer is full.
capacity_warned: Arc<AtomicBool>,
}
impl SharedTrace {
    /// Creates an empty trace: no entries, sequence counter at zero, and the
    /// truncation flag cleared.
    #[must_use]
    pub fn new() -> Self {
        let entries = Arc::new(Mutex::new(Vec::new()));
        let seq_counter = Arc::new(AtomicU64::new(0));
        let capacity_warned = Arc::new(AtomicBool::new(false));
        Self {
            entries,
            seq_counter,
            capacity_warned,
        }
    }

    /// Locks the entry buffer. A poisoned mutex is not treated as an error: the
    /// guard is recovered from the poison error and returned as usual.
    fn lock_entries(&self) -> std::sync::MutexGuard<'_, Vec<TraceEntry>> {
        match self.entries.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        }
    }

    /// Stores a new entry built from the given values, stamped with the next
    /// sequence number and the current UTC time.
    ///
    /// If the buffer already holds `MAX_TRACE_ENTRIES` entries, the new entry is
    /// dropped and no sequence number is consumed. The first dropped entry sets
    /// the truncation flag and emits a single `tracing` warning; later drops are
    /// silent.
    pub fn append(
        &self,
        actor: &str,
        phase: &str,
        direction: Direction,
        method: &str,
        content: &serde_json::Value,
    ) {
        // The guard is held for the whole call, so the capacity check, the
        // sequence-number allocation, and the push happen as one step.
        let mut buffer = self.lock_entries();

        if buffer.len() < MAX_TRACE_ENTRIES {
            let seq = self.seq_counter.fetch_add(1, Ordering::Relaxed);
            let timestamp = Utc::now();
            buffer.push(TraceEntry {
                seq,
                timestamp,
                actor: actor.to_owned(),
                phase: phase.to_owned(),
                direction,
                method: method.to_owned(),
                content: content.clone(),
            });
            return;
        }

        // Buffer is full: `swap` returns the previous flag value, so only the
        // first caller to get here logs the warning.
        let already_warned = self.capacity_warned.swap(true, Ordering::Relaxed);
        if !already_warned {
            tracing::warn!(
                max = MAX_TRACE_ENTRIES,
                "trace buffer full — dropping new entries"
            );
        }
    }

    /// Returns a copy of every entry currently stored. Later appends do not
    /// affect the returned vector.
    #[must_use]
    pub fn snapshot(&self) -> Vec<TraceEntry> {
        self.lock_entries().to_vec()
    }

    /// Returns how many entries are currently stored.
    #[must_use]
    pub fn len(&self) -> usize {
        self.lock_entries().len()
    }

    /// Returns `true` when no entries are stored.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.lock_entries().is_empty()
    }

    /// Returns `true` once at least one entry has been dropped because the
    /// buffer was full.
    #[must_use]
    pub fn was_truncated(&self) -> bool {
        let flag = &self.capacity_warned;
        flag.load(Ordering::Relaxed)
    }
}
impl Default for SharedTrace {
/// Returns the same empty trace as `SharedTrace::new`.
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for SharedTrace {
    /// Formats the trace as `SharedTrace { entries_count: N }`, reporting only
    /// the number of stored entries rather than their contents.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("SharedTrace");
        // `len` takes the buffer lock briefly to read the current count.
        let entries_count = self.len();
        builder.field("entries_count", &entries_count);
        builder.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Appends one entry whose payload is an empty JSON object.
    fn append_empty(
        trace: &SharedTrace,
        actor: &str,
        phase: &str,
        direction: Direction,
        method: &str,
    ) {
        let payload = serde_json::json!({});
        trace.append(actor, phase, direction, method, &payload);
    }

    /// A freshly constructed trace reports no entries through any accessor.
    #[test]
    fn new_trace_is_empty() {
        let trace = SharedTrace::new();
        assert!(trace.is_empty());
        assert_eq!(trace.len(), 0);
        let snap = trace.snapshot();
        assert!(snap.is_empty());
    }

    /// A single append makes the trace non-empty with length one.
    #[test]
    fn append_increments_length() {
        let trace = SharedTrace::new();
        append_empty(&trace, "actor1", "phase1", Direction::Incoming, "tools/call");
        assert_eq!(trace.len(), 1);
        assert!(!trace.is_empty());
    }

    /// Entries appended one after another receive sequence numbers 0, 1, 2, ...
    #[test]
    fn sequence_numbers_are_monotonic() {
        let trace = SharedTrace::new();
        for i in 0..5 {
            let method = format!("method_{i}");
            append_empty(&trace, "actor1", "phase1", Direction::Incoming, &method);
        }
        let recorded = trace.snapshot();
        for (expected, entry) in (0_u64..).zip(recorded.iter()) {
            assert_eq!(entry.seq, expected);
        }
    }

    /// A snapshot taken earlier is unaffected by later appends.
    #[test]
    fn snapshot_is_independent() {
        let trace = SharedTrace::new();
        append_empty(&trace, "actor1", "phase1", Direction::Incoming, "tools/call");
        let frozen = trace.snapshot();
        assert_eq!(frozen.len(), 1);

        append_empty(&trace, "actor1", "phase1", Direction::Outgoing, "tools/call");
        assert_eq!(frozen.len(), 1);
        assert_eq!(trace.len(), 2);
    }

    /// An append through one handle is visible through its clone.
    #[test]
    fn cloned_trace_shares_entries() {
        let original = SharedTrace::new();
        let alias = original.clone();
        append_empty(&original, "actor1", "phase1", Direction::Incoming, "tools/call");
        assert_eq!(alias.len(), 1);
    }

    /// Ten threads appending ten entries each yield 100 entries with distinct
    /// sequence numbers.
    #[test]
    fn concurrent_appends() {
        let trace = SharedTrace::new();

        let mut workers = Vec::with_capacity(10);
        for i in 0..10 {
            let shared = trace.clone();
            workers.push(std::thread::spawn(move || {
                for j in 0..10 {
                    append_empty(
                        &shared,
                        &format!("actor_{i}"),
                        "phase1",
                        Direction::Incoming,
                        &format!("method_{j}"),
                    );
                }
            }));
        }
        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(trace.len(), 100);
        let mut seqs: Vec<u64> = trace.snapshot().iter().map(|entry| entry.seq).collect();
        seqs.sort_unstable();
        seqs.dedup();
        assert_eq!(seqs.len(), 100);
    }

    /// Each argument passed to `append` appears in the stored entry.
    #[test]
    fn trace_entry_fields_captured() {
        let trace = SharedTrace::new();
        let content = serde_json::json!({"name": "calculator", "arguments": {}});
        trace.append(
            "mcp_poison",
            "trust_building",
            Direction::Incoming,
            "tools/call",
            &content,
        );

        let recorded = trace.snapshot();
        let stored = &recorded[0];
        assert_eq!(stored.actor, "mcp_poison");
        assert_eq!(stored.phase, "trust_building");
        assert_eq!(stored.direction, Direction::Incoming);
        assert_eq!(stored.method, "tools/call");
        assert_eq!(stored.content, content);
    }

    /// `Default` yields an empty trace.
    #[test]
    fn default_trace_is_empty() {
        let trace = SharedTrace::default();
        assert!(trace.is_empty());
    }

    /// Appending past the cap keeps exactly `MAX_TRACE_ENTRIES` entries and
    /// sets the truncation flag.
    #[test]
    fn capacity_limit_drops_entries() {
        let trace = SharedTrace::new();
        let attempts = MAX_TRACE_ENTRIES + 100;
        for i in 0..attempts {
            let method = format!("method_{i}");
            append_empty(&trace, "actor", "phase", Direction::Incoming, &method);
        }
        assert_eq!(trace.len(), MAX_TRACE_ENTRIES);
        assert!(trace.capacity_warned.load(Ordering::Relaxed));
        assert!(trace.was_truncated());
    }

    /// The truncation flag stays clear while the buffer is below capacity.
    #[test]
    fn was_truncated_false_when_under_capacity() {
        let trace = SharedTrace::new();
        append_empty(&trace, "a", "p", Direction::Incoming, "m");
        assert!(!trace.was_truncated());
    }

    mod proptests {
        use super::*;
        use proptest::prelude::*;

        proptest! {
            #![proptest_config(ProptestConfig::with_cases(256))]

            /// Sequence numbers strictly increase in snapshot order for any
            /// number of sequential appends.
            #[test]
            fn prop_trace_seq_monotonic(n in 1..100_usize) {
                let trace = SharedTrace::new();
                for i in 0..n {
                    let method = format!("method_{i}");
                    append_empty(&trace, "actor", "phase", Direction::Incoming, &method);
                }

                let recorded = trace.snapshot();
                for pair in recorded.windows(2) {
                    let (earlier, later) = (&pair[0], &pair[1]);
                    prop_assert!(earlier.seq < later.seq,
                        "seq values must be strictly increasing: {} >= {}",
                        earlier.seq, later.seq);
                }
            }

            /// Below the cap, every appended entry is retained.
            #[test]
            fn prop_trace_all_preserved(n in 0..100_usize) {
                let trace = SharedTrace::new();
                for i in 0..n {
                    let method = format!("method_{i}");
                    append_empty(&trace, "actor", "phase", Direction::Incoming, &method);
                }

                prop_assert_eq!(trace.len(), n);
                prop_assert_eq!(trace.snapshot().len(), n);
            }
        }
    }
}