use std::collections::{BTreeMap, BTreeSet};
use chrono::Utc;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use crate::agent::AgentEvent;
use crate::error::{Error, Result};
/// Schema identifier stamped on every event the recorder produces; JSONL validation
/// rejects any line whose `schema` differs from it.
pub const SWARM_FLIGHT_RECORDER_EVENT_SCHEMA: &str = "pi.swarm.flight_recorder.event.v1";
/// Schema identifier stamped on reports built by `SwarmFlightRecorder::build_report`.
pub const SWARM_FLIGHT_RECORDER_REPORT_SCHEMA: &str = "pi.swarm.flight_recorder.report.v1";
/// Schema identifier stamped on the replay instructions embedded in a report.
pub const SWARM_FLIGHT_RECORDER_REPLAY_SCHEMA: &str = "pi.swarm.flight_recorder.replay.v1";
/// Placeholder string that replaces the value stored under any sensitive payload key.
const REDACTED: &str = "[REDACTED]";
/// Lowercase fragments that mark a payload key as sensitive.
///
/// `is_sensitive_key` performs a substring match against the ASCII-lowercased key, so
/// broad fragments such as `key` and `content` also match longer names that merely
/// contain them.
const SENSITIVE_KEY_FRAGMENTS: &[&str] = &[
"api_key",
"authorization",
"bearer",
"content",
"cookie",
"key",
"password",
"prompt",
"secret",
"token",
"transcript",
];
/// Summary of the redaction that was applied to a single event payload.
///
/// Serialized with camelCase field names.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SwarmFlightRedactionSummary {
/// Number of values that were replaced with the redaction placeholder.
pub redacted_fields: u64,
/// Names of the keys whose values were replaced. When produced by the recorder the
/// names are unique and in sorted order.
pub redacted_keys: Vec<String>,
}
/// Mutable tally filled in while a payload is being redacted.
#[derive(Debug, Default)]
struct RedactionAccumulator {
/// Running count of replaced values; incremented with saturating arithmetic.
redacted_fields: u64,
/// Distinct key names that triggered a replacement, kept in sorted order by the set.
redacted_keys: BTreeSet<String>,
}
impl RedactionAccumulator {
    /// Consumes the tally and converts it into the serializable summary.
    ///
    /// The key set is flattened into a vector, which keeps the set's sorted order.
    fn finish(self) -> SwarmFlightRedactionSummary {
        let Self {
            redacted_fields,
            redacted_keys,
        } = self;
        let redacted_keys = redacted_keys.into_iter().collect::<Vec<String>>();
        SwarmFlightRedactionSummary {
            redacted_fields,
            redacted_keys,
        }
    }
}
/// One recorded flight recorder entry; each entry becomes one line of the JSONL export.
///
/// Serialized with camelCase field names.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SwarmFlightRecorderEvent {
/// Event schema identifier; the recorder writes `SWARM_FLIGHT_RECORDER_EVENT_SCHEMA`.
pub schema: String,
/// Position of the event within its recorder, starting at zero.
pub sequence: u64,
/// Correlation id copied from the recorder that produced the event.
pub correlation_id: String,
/// Name of the agent the event is attributed to.
pub agent_name: String,
/// Session the event belongs to, if known; left out of the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub session_id: Option<String>,
/// Source category; the recorder uses `agent`, `session`, `extension` and `coordination`.
pub component: String,
/// Kind label of the event within its component.
pub event_kind: String,
/// UTC wall-clock time, in milliseconds, at which the event was recorded.
pub timestamp_ms: i64,
/// Elapsed milliseconds as supplied by the caller that recorded the event.
pub elapsed_ms: u64,
/// Event payload after redaction of sensitive keys.
pub payload: Value,
/// What was redacted from `payload`.
pub redaction: SwarmFlightRedactionSummary,
}
/// Aggregated latency for one entry of the `latencyBreakdown` payload object.
///
/// Serialized with camelCase field names.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SwarmFlightLatencyHotspot {
/// Latency breakdown key the totals belong to (for example `providerStreaming`).
pub component: String,
/// Saturating sum of every `durationMs` value seen for the component.
pub total_ms: u64,
/// Number of payloads that contributed a duration for the component.
pub samples: u64,
}
/// A coordination event that the report classified as a failure.
///
/// Serialized with camelCase field names.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SwarmFlightCoordinationFailure {
/// Sequence number of the originating event.
pub sequence: u64,
/// Agent the originating event is attributed to.
pub agent_name: String,
/// Kind label of the originating event.
pub event_kind: String,
/// Text taken from the payload's `summary` or `mode` string, or the event kind when
/// neither is present.
pub summary: String,
}
/// Replay instructions attached to a flight recorder report.
///
/// Serialized with camelCase field names.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SwarmFlightReplayInstructions {
/// Replay schema identifier; reports write `SWARM_FLIGHT_RECORDER_REPLAY_SCHEMA`.
pub schema: String,
/// Replay command as supplied by the caller that built the report.
pub command: String,
/// Whether replay needs live provider credentials; `build_report` always writes `false`.
pub requires_live_provider_credentials: bool,
/// Artifact paths as supplied by the caller that built the report.
pub artifact_paths: Vec<String>,
}
/// Aggregate view over every event held by a recorder.
///
/// Serialized with camelCase field names.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SwarmFlightRecorderReport {
/// Report schema identifier; `build_report` writes `SWARM_FLIGHT_RECORDER_REPORT_SCHEMA`.
pub schema: String,
/// Correlation id of the recorder the report was built from.
pub correlation_id: String,
/// Total number of recorded events.
pub event_count: u64,
/// Number of distinct agent names across the events.
pub agent_count: u64,
/// Number of events per component, keyed by component name.
pub component_counts: BTreeMap<String, u64>,
/// Latency totals per breakdown key, largest total first, ties ordered by key name.
pub dominant_latency_components: Vec<SwarmFlightLatencyHotspot>,
/// Coordination events classified as failures, in recording order.
pub coordination_failures: Vec<SwarmFlightCoordinationFailure>,
/// How to replay the recorded run.
pub replay: SwarmFlightReplayInstructions,
}
/// In-memory, append-only collector of flight recorder events for one correlation id.
#[derive(Debug, Clone)]
pub struct SwarmFlightRecorder {
/// Correlation id copied onto every recorded event; validated as non-blank by `new`.
correlation_id: String,
/// Sequence number that the next recorded event will receive.
next_sequence: u64,
/// Events in the order they were recorded.
events: Vec<SwarmFlightRecorderEvent>,
}
impl SwarmFlightRecorder {
pub fn new(correlation_id: impl Into<String>) -> Result<Self> {
let correlation_id = correlation_id.into();
if correlation_id.trim().is_empty() {
return Err(Error::validation(
"flight recorder correlation_id cannot be empty".to_string(),
));
}
Ok(Self {
correlation_id,
next_sequence: 0,
events: Vec::new(),
})
}
#[must_use]
pub fn events(&self) -> &[SwarmFlightRecorderEvent] {
&self.events
}
pub fn record_agent_event(
&mut self,
agent_name: impl Into<String>,
elapsed_ms: u64,
event: &AgentEvent,
) -> Result<()> {
let payload = serde_json::to_value(event)?;
let event_kind = payload
.get("type")
.and_then(Value::as_str)
.unwrap_or("agent_event")
.to_string();
let session_id = extract_session_id(&payload);
self.record_value(
agent_name, session_id, "agent", event_kind, elapsed_ms, payload,
)
}
pub fn record_session_snapshot(
&mut self,
agent_name: impl Into<String>,
session_id: impl Into<String>,
elapsed_ms: u64,
payload: Value,
) -> Result<()> {
self.record_value(
agent_name,
Some(session_id.into()),
"session",
"session_snapshot",
elapsed_ms,
payload,
)
}
pub fn record_extension_event(
&mut self,
agent_name: impl Into<String>,
session_id: impl Into<String>,
elapsed_ms: u64,
payload: Value,
) -> Result<()> {
self.record_value(
agent_name,
Some(session_id.into()),
"extension",
"extension_hooks",
elapsed_ms,
payload,
)
}
pub fn record_coordination_marker(
&mut self,
agent_name: impl Into<String>,
elapsed_ms: u64,
event_kind: impl Into<String>,
payload: Value,
) -> Result<()> {
self.record_value(
agent_name,
None,
"coordination",
event_kind,
elapsed_ms,
payload,
)
}
fn record_value(
&mut self,
agent_name: impl Into<String>,
session_id: Option<String>,
component: impl Into<String>,
event_kind: impl Into<String>,
elapsed_ms: u64,
payload: Value,
) -> Result<()> {
let agent_name = agent_name.into();
if agent_name.trim().is_empty() {
return Err(Error::validation(
"flight recorder agent_name cannot be empty".to_string(),
));
}
let component = component.into();
if component.trim().is_empty() {
return Err(Error::validation(
"flight recorder component cannot be empty".to_string(),
));
}
let event_kind = event_kind.into();
if event_kind.trim().is_empty() {
return Err(Error::validation(
"flight recorder event_kind cannot be empty".to_string(),
));
}
let (payload, redaction) = redact_payload(payload);
let event = SwarmFlightRecorderEvent {
schema: SWARM_FLIGHT_RECORDER_EVENT_SCHEMA.to_string(),
sequence: self.next_sequence,
correlation_id: self.correlation_id.clone(),
agent_name,
session_id,
component,
event_kind,
timestamp_ms: Utc::now().timestamp_millis(),
elapsed_ms,
payload,
redaction,
};
self.next_sequence = self.next_sequence.saturating_add(1);
self.events.push(event);
Ok(())
}
pub fn to_jsonl(&self) -> Result<String> {
let mut out = String::new();
for event in &self.events {
let line = serde_json::to_string(event)?;
out.push_str(&line);
out.push('\n');
}
Ok(out)
}
pub fn build_report(
&self,
replay_command: impl Into<String>,
artifact_paths: Vec<String>,
) -> SwarmFlightRecorderReport {
let mut agents = BTreeSet::new();
let mut component_counts = BTreeMap::new();
let mut latency_totals = BTreeMap::<String, (u64, u64)>::new();
let mut coordination_failures = Vec::new();
for event in &self.events {
agents.insert(event.agent_name.clone());
*component_counts
.entry(event.component.clone())
.or_insert(0u64) += 1;
collect_latency_components(&event.payload, &mut latency_totals);
if matches!(event.component.as_str(), "coordination") && is_coordination_failure(event)
{
coordination_failures.push(SwarmFlightCoordinationFailure {
sequence: event.sequence,
agent_name: event.agent_name.clone(),
event_kind: event.event_kind.clone(),
summary: coordination_summary(event),
});
}
}
let mut dominant_latency_components = latency_totals
.into_iter()
.map(
|(component, (total_ms, samples))| SwarmFlightLatencyHotspot {
component,
total_ms,
samples,
},
)
.collect::<Vec<_>>();
dominant_latency_components.sort_by(|left, right| {
right
.total_ms
.cmp(&left.total_ms)
.then_with(|| left.component.cmp(&right.component))
});
SwarmFlightRecorderReport {
schema: SWARM_FLIGHT_RECORDER_REPORT_SCHEMA.to_string(),
correlation_id: self.correlation_id.clone(),
event_count: u64::try_from(self.events.len()).unwrap_or(u64::MAX),
agent_count: u64::try_from(agents.len()).unwrap_or(u64::MAX),
component_counts,
dominant_latency_components,
coordination_failures,
replay: SwarmFlightReplayInstructions {
schema: SWARM_FLIGHT_RECORDER_REPLAY_SCHEMA.to_string(),
command: replay_command.into(),
requires_live_provider_credentials: false,
artifact_paths,
},
}
}
}
pub fn validate_swarm_flight_recorder_jsonl(jsonl: &str) -> Result<Vec<SwarmFlightRecorderEvent>> {
let mut events = Vec::new();
for (line_index, line) in jsonl.lines().enumerate() {
if line.trim().is_empty() {
continue;
}
let event: SwarmFlightRecorderEvent = serde_json::from_str(line)?;
if event.schema != SWARM_FLIGHT_RECORDER_EVENT_SCHEMA {
return Err(Error::validation(format!(
"flight recorder line {} has unsupported schema {}",
line_index + 1,
event.schema
)));
}
let expected_sequence = u64::try_from(events.len()).unwrap_or(u64::MAX);
if event.sequence != expected_sequence {
return Err(Error::validation(format!(
"flight recorder line {} has non-monotonic sequence {}",
line_index + 1,
event.sequence
)));
}
if event.correlation_id.trim().is_empty()
|| event.agent_name.trim().is_empty()
|| event.component.trim().is_empty()
|| event.event_kind.trim().is_empty()
{
return Err(Error::validation(format!(
"flight recorder line {} has an empty required field",
line_index + 1
)));
}
events.push(event);
}
if events.is_empty() {
return Err(Error::validation(
"flight recorder JSONL contains no events".to_string(),
));
}
Ok(events)
}
/// Redacts `payload` from its root and returns it together with a summary of what was
/// replaced.
fn redact_payload(payload: Value) -> (Value, SwarmFlightRedactionSummary) {
    let mut tally = RedactionAccumulator::default();
    // The root value has no enclosing key, so it can never be replaced wholesale.
    let scrubbed = redact_value(payload, None, &mut tally);
    let summary = tally.finish();
    (scrubbed, summary)
}
fn redact_value(value: Value, key: Option<&str>, accumulator: &mut RedactionAccumulator) -> Value {
if key.is_some_and(is_sensitive_key) {
accumulator.redacted_fields = accumulator.redacted_fields.saturating_add(1);
if let Some(key) = key {
accumulator.redacted_keys.insert(key.to_string());
}
return Value::String(REDACTED.to_string());
}
match value {
Value::Array(values) => Value::Array(
values
.into_iter()
.map(|value| redact_value(value, key, accumulator))
.collect(),
),
Value::Object(map) => {
let redacted = map
.into_iter()
.map(|(key, value)| {
let value = redact_value(value, Some(&key), accumulator);
(key, value)
})
.collect::<Map<_, _>>();
Value::Object(redacted)
}
other => other,
}
}
/// Reports whether `key`, compared ASCII-case-insensitively, contains any of the
/// sensitive key fragments.
fn is_sensitive_key(key: &str) -> bool {
    let lowered = key.to_ascii_lowercase();
    for fragment in SENSITIVE_KEY_FRAGMENTS {
        if lowered.contains(*fragment) {
            return true;
        }
    }
    false
}
fn extract_session_id(payload: &Value) -> Option<String> {
payload
.get("sessionId")
.or_else(|| payload.get("session_id"))
.and_then(Value::as_str)
.filter(|value| !value.trim().is_empty())
.map(str::to_string)
}
/// Adds the durations found in the payload's `latencyBreakdown` object to `totals`.
///
/// Only the four tracked breakdown keys are read. For each one whose `durationMs` is an
/// unsigned integer, the duration is added to the key's running total and its sample
/// count goes up by one, both with saturating arithmetic. Payloads without a
/// `latencyBreakdown` entry leave `totals` untouched.
fn collect_latency_components(payload: &Value, totals: &mut BTreeMap<String, (u64, u64)>) {
    const TRACKED_COMPONENTS: [&str; 4] = [
        "providerStreaming",
        "localTools",
        "extensionHostcalls",
        "persistence",
    ];

    if let Some(breakdown) = payload.get("latencyBreakdown") {
        for name in TRACKED_COMPONENTS {
            let duration = breakdown
                .get(name)
                .and_then(|entry| entry.get("durationMs"))
                .and_then(Value::as_u64);
            if let Some(duration_ms) = duration {
                let (total_ms, samples) = totals.entry(name.to_string()).or_insert((0, 0));
                *total_ms = total_ms.saturating_add(duration_ms);
                *samples = samples.saturating_add(1);
            }
        }
    }
}
/// Decides whether an event describes a coordination failure.
///
/// It does when the event kind contains `failure`, `degraded` or `fallback`, or when
/// the payload's `status` string equals `red`, `error` or `degraded`. Both comparisons
/// ignore ASCII case.
fn is_coordination_failure(event: &SwarmFlightRecorderEvent) -> bool {
    const FAILURE_KIND_MARKERS: [&str; 3] = ["failure", "degraded", "fallback"];
    const FAILURE_STATUSES: [&str; 3] = ["red", "error", "degraded"];

    let kind = event.event_kind.to_ascii_lowercase();
    if FAILURE_KIND_MARKERS
        .iter()
        .any(|marker| kind.contains(*marker))
    {
        return true;
    }

    match event.payload.get("status").and_then(Value::as_str) {
        Some(status) => FAILURE_STATUSES
            .iter()
            .any(|known| status.eq_ignore_ascii_case(known)),
        None => false,
    }
}
/// Picks the text used to describe a coordination event in the report.
///
/// The payload's `summary` string wins, then its `mode` string; when neither key holds
/// a string the event kind is used.
fn coordination_summary(event: &SwarmFlightRecorderEvent) -> String {
    for field in ["summary", "mode"] {
        if let Some(text) = event.payload.get(field).and_then(Value::as_str) {
            return text.to_string();
        }
    }
    event.event_kind.clone()
}
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// Sensitive keys are replaced at every nesting depth; other keys pass through.
    #[test]
    fn redacts_sensitive_payload_keys_recursively() {
        let input = json!({
            "token": "abc",
            "nested": { "api_key": "def", "safe": "ok" },
        });

        let (scrubbed, report) = redact_payload(input);

        assert_eq!(scrubbed["token"], REDACTED);
        assert_eq!(scrubbed["nested"]["api_key"], REDACTED);
        assert_eq!(scrubbed["nested"]["safe"], "ok");
        assert_eq!(report.redacted_fields, 2);
        assert_eq!(report.redacted_keys, vec!["api_key", "token"]);
    }

    /// JSONL exported by a recorder passes validation and keeps its sequence numbers.
    #[test]
    fn validates_monotonic_jsonl_rows() {
        let mut recorder = SwarmFlightRecorder::new("corr-test").expect("recorder");
        let marker_payload = json!({"status": "red", "summary": "schema missing"});
        recorder
            .record_coordination_marker("agent-a", 0, "agent_mail_degraded", marker_payload)
            .expect("record marker");

        let exported = recorder.to_jsonl().expect("jsonl");
        let parsed = validate_swarm_flight_recorder_jsonl(&exported).expect("valid jsonl");

        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0].sequence, 0);
    }
}