use serde::{Deserialize, Serialize};
use crate::model::conversation_packet::{
ConversationPacket, ConversationPacketAnalyticsProjection, ConversationPacketLexicalProjection,
ConversationPacketSemanticProjection,
};
/// Name of the environment variable that [`packet_equivalence_audit_enabled`]
/// reads to decide whether the packet equivalence audit is switched on.
pub const PACKET_EQUIVALENCE_AUDIT_ENV: &str = "CASS_INDEXER_PACKET_EQUIVALENCE_AUDIT";
/// Reports whether the packet equivalence audit has been opted into.
///
/// The value of [`PACKET_EQUIVALENCE_AUDIT_ENV`] is looked up through
/// `dotenvy::var`, trimmed of surrounding whitespace, and compared
/// ASCII-case-insensitively against `1`, `true`, `yes` and `on`.
/// Any other value — or a failed lookup — yields `false`.
pub fn packet_equivalence_audit_enabled() -> bool {
    const OPT_IN_VALUES: [&str; 4] = ["1", "true", "yes", "on"];

    let Ok(raw) = dotenvy::var(PACKET_EQUIVALENCE_AUDIT_ENV) else {
        return false;
    };
    let flag = raw.trim();
    OPT_IN_VALUES
        .iter()
        .any(|accepted| flag.eq_ignore_ascii_case(accepted))
}
/// Settings that control which differences
/// [`PacketEquivalenceAuditor::audit_pair`] reports.
///
/// The derived `Default` leaves every flag `false`.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PacketEquivalenceTolerance {
/// When `true`, `audit_pair` does not record any hash differences; version
/// and projection differences are still reported.
pub allow_redaction_drift: bool,
}
impl PacketEquivalenceTolerance {
    /// Tolerance that excuses nothing: `allow_redaction_drift` is `false`,
    /// the same value the derived `Default` produces.
    pub fn strict() -> Self {
        Self {
            allow_redaction_drift: false,
        }
    }

    /// Tolerance with `allow_redaction_drift` switched on.
    pub fn allow_redaction() -> Self {
        let allow_redaction_drift = true;
        Self {
            allow_redaction_drift,
        }
    }
}
/// A projection that compared unequal between the two packets passed to
/// [`PacketEquivalenceAuditor::audit_pair`].
///
/// In every variant `a` is a clone of the first packet's projection and `b` a
/// clone of the second packet's. Serde renames the variant names to
/// snake_case.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PacketProjectionDifference {
/// The two `projections.analytics` values were not equal as a whole.
AnalyticsRoleCounts {
a: ConversationPacketAnalyticsProjection,
b: ConversationPacketAnalyticsProjection,
},
/// The two `projections.lexical` values were not equal.
LexicalProjection {
a: ConversationPacketLexicalProjection,
b: ConversationPacketLexicalProjection,
},
/// The two `projections.semantic` values were not equal.
SemanticProjection {
a: ConversationPacketSemanticProjection,
b: ConversationPacketSemanticProjection,
},
}
/// A hash that differed between the two packets passed to
/// [`PacketEquivalenceAuditor::audit_pair`].
///
/// `a` holds the first packet's hash string and `b` the second's. Serde
/// renames the variant names to snake_case.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PacketHashDifference {
/// The `hashes.semantic_hash` values differed.
SemanticHash { a: String, b: String },
/// The `hashes.message_hash` values differed.
MessageHash { a: String, b: String },
}
/// Details carried by [`PacketEquivalenceOutcome::Mismatch`].
///
/// `audit_pair` also produces a mismatch when only the packet versions
/// differ, so both difference lists can be empty.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PacketEquivalenceMismatch {
/// `version` of the first packet.
pub version_a: u32,
/// `version` of the second packet.
pub version_b: u32,
/// Projections that compared unequal, pushed in the order analytics,
/// lexical, semantic.
pub projection_differences: Vec<PacketProjectionDifference>,
/// Hashes that differed; always empty when the audit ran with
/// `allow_redaction_drift` set.
pub hash_differences: Vec<PacketHashDifference>,
}
impl PacketEquivalenceMismatch {
    /// Returns `true` when at least one hash difference was recorded and no
    /// projection difference was.
    ///
    /// The version fields are not consulted.
    pub fn is_hash_only(&self) -> bool {
        let no_projection_drift = self.projection_differences.is_empty();
        let some_hash_drift = !self.hash_differences.is_empty();
        no_projection_drift && some_hash_drift
    }
}
/// Result of [`PacketEquivalenceAuditor::audit_pair`].
///
/// Serialized as an internally tagged enum: the variant name, in snake_case,
/// is stored under the `outcome` key alongside the variant's own fields.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "outcome", rename_all = "snake_case")]
pub enum PacketEquivalenceOutcome {
/// No reportable difference was found. `semantic_hash` is cloned from the
/// FIRST packet; when the audit ran with `allow_redaction_drift`, the second
/// packet's hash is not compared and may differ from it.
Match { semantic_hash: String },
/// The versions differed and/or at least one difference was recorded.
Mismatch(PacketEquivalenceMismatch),
}
impl PacketEquivalenceOutcome {
    /// `true` for the `Match` variant, i.e. whenever the outcome is not a
    /// `Mismatch` (the enum has exactly these two variants).
    pub fn is_match(&self) -> bool {
        !self.is_mismatch()
    }

    /// `true` for the `Mismatch` variant.
    pub fn is_mismatch(&self) -> bool {
        matches!(self, Self::Mismatch(..))
    }
}
/// Stateless unit type whose `audit_pair` method compares two
/// `ConversationPacket`s.
#[derive(Debug, Default, Clone, Copy)]
pub struct PacketEquivalenceAuditor;
impl PacketEquivalenceAuditor {
    /// Creates an auditor. The type carries no state.
    pub fn new() -> Self {
        Self
    }

    /// Compares packets `a` and `b` and reports whether they are equivalent.
    ///
    /// Differences are collected in a fixed order:
    /// 1. analytics, lexical and semantic projections, each by `!=`;
    /// 2. semantic hash, then message hash — skipped entirely when
    ///    `tolerance.allow_redaction_drift` is set.
    ///
    /// Returns [`PacketEquivalenceOutcome::Match`] carrying `a`'s semantic
    /// hash only when the versions are equal and nothing was collected.
    /// Otherwise returns [`PacketEquivalenceOutcome::Mismatch`] with both
    /// versions and the collected differences; the lists may be empty when
    /// only the versions differ.
    pub fn audit_pair(
        self,
        a: &ConversationPacket,
        b: &ConversationPacket,
        tolerance: &PacketEquivalenceTolerance,
    ) -> PacketEquivalenceOutcome {
        let (lhs, rhs) = (&a.projections, &b.projections);

        // `Vec::extend` accepts an `Option`, so each check appends zero or one entry.
        let mut projection_differences = Vec::new();
        projection_differences.extend((lhs.analytics != rhs.analytics).then(|| {
            PacketProjectionDifference::AnalyticsRoleCounts {
                a: lhs.analytics.clone(),
                b: rhs.analytics.clone(),
            }
        }));
        projection_differences.extend((lhs.lexical != rhs.lexical).then(|| {
            PacketProjectionDifference::LexicalProjection {
                a: lhs.lexical.clone(),
                b: rhs.lexical.clone(),
            }
        }));
        projection_differences.extend((lhs.semantic != rhs.semantic).then(|| {
            PacketProjectionDifference::SemanticProjection {
                a: lhs.semantic.clone(),
                b: rhs.semantic.clone(),
            }
        }));

        let mut hash_differences = Vec::new();
        if !tolerance.allow_redaction_drift {
            let (ours, theirs) = (&a.hashes, &b.hashes);
            hash_differences.extend((ours.semantic_hash != theirs.semantic_hash).then(|| {
                PacketHashDifference::SemanticHash {
                    a: ours.semantic_hash.clone(),
                    b: theirs.semantic_hash.clone(),
                }
            }));
            hash_differences.extend((ours.message_hash != theirs.message_hash).then(|| {
                PacketHashDifference::MessageHash {
                    a: ours.message_hash.clone(),
                    b: theirs.message_hash.clone(),
                }
            }));
        }

        let diverged = a.version != b.version
            || !projection_differences.is_empty()
            || !hash_differences.is_empty();
        if diverged {
            return PacketEquivalenceOutcome::Mismatch(PacketEquivalenceMismatch {
                version_a: a.version,
                version_b: b.version,
                projection_differences,
                hash_differences,
            });
        }

        PacketEquivalenceOutcome::Match {
            semantic_hash: a.hashes.semantic_hash.clone(),
        }
    }
}
/// One row of the [`PACKET_SINK_MIGRATIONS`] catalog.
///
/// All fields are plain strings; nothing in this file resolves the paths to
/// real items. NOTE(review): the field meanings below are inferred from the
/// field names and the catalog entries — confirm against the referenced code.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PacketSinkMigration {
/// Short sink identifier (the catalog uses `lexical`, `analytics`, `semantic`).
pub sink: &'static str,
/// `crate::`-qualified path of the packet-driven entry point for this sink.
pub packet_helper: &'static str,
/// `crate::`-qualified path of the legacy (pre-packet) entry point.
pub legacy_fallback: &'static str,
/// `crate::`-qualified path of the test that compares the two paths.
pub equivalence_test: &'static str,
/// Optional environment variable name; presumably a switch that disables
/// the packet path. Every current catalog entry sets this to `None`.
pub kill_switch_env: Option<&'static str>,
/// Git short hash of the commit in which the migration landed.
pub landed_in_commit: &'static str,
}
/// Catalog of sink migrations, one [`PacketSinkMigration`] entry each for the
/// `lexical`, `analytics` and `semantic` sinks.
///
/// The paths are recorded as strings only. The tests in this file check their
/// `crate::` prefix and the shape of `landed_in_commit`, not that the items
/// exist.
pub const PACKET_SINK_MIGRATIONS: &[PacketSinkMigration] = &[
PacketSinkMigration {
sink: "lexical",
packet_helper: "crate::search::tantivy::TantivyIndex::add_messages_from_packet",
legacy_fallback: "crate::search::tantivy::TantivyIndex::add_messages_with_conversation_id",
equivalence_test: "crate::search::tantivy::tests::packet_driven_lexical_pipeline_matches_legacy_for_normalized_conv",
kill_switch_env: None,
landed_in_commit: "19820c7a",
},
PacketSinkMigration {
sink: "analytics",
packet_helper: "crate::pages::analytics::Statistics::from_packets",
legacy_fallback: "crate::pages::analytics::AnalyticsGenerator::generate_statistics",
equivalence_test: "crate::pages::analytics::tests::analytics_statistics_from_packets_matches_sql_for_canonical_corpus",
kill_switch_env: None,
landed_in_commit: "bae8e341",
},
PacketSinkMigration {
sink: "semantic",
packet_helper: "crate::indexer::semantic::semantic_inputs_from_packets",
legacy_fallback: "crate::indexer::semantic::packet_embedding_inputs_from_storage",
equivalence_test: "crate::indexer::semantic::tests::semantic_inputs_from_packets_matches_storage_replay",
kill_switch_env: None,
landed_in_commit: "2c8ba03b",
},
];
#[cfg(test)]
mod tests {
use super::*;
use crate::connectors::{NormalizedConversation, NormalizedMessage, NormalizedSnippet};
use crate::model::conversation_packet::{ConversationPacket, ConversationPacketProvenance};
use crate::model::types::{Conversation, Message, MessageRole, Snippet};
use serde_json::json;
use std::path::PathBuf;
use std::sync::{Mutex, MutexGuard, OnceLock};
/// Acquires a process-wide mutex for tests that mutate the environment.
///
/// A poisoned lock (a previous holder panicked) is recovered rather than
/// propagated, so one failing test does not cascade into every later caller.
fn env_lock() -> MutexGuard<'static, ()> {
    static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
    let mutex = LOCK.get_or_init(|| Mutex::new(()));
    match mutex.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
/// Connector-side fixture: a two-message `codex` conversation (one user turn
/// carrying a single snippet, one assistant turn) with the same content as
/// `canonical_conversation`.
fn raw_conversation() -> NormalizedConversation {
    let audit_snippet = NormalizedSnippet {
        file_path: Some(PathBuf::from("src/audit.rs")),
        start_line: Some(1),
        end_line: Some(1),
        language: Some("rust".to_owned()),
        snippet_text: Some("// audit".to_owned()),
    };
    let user_turn = NormalizedMessage {
        idx: 0,
        role: "user".to_owned(),
        author: Some("human".to_owned()),
        created_at: Some(1_700_000_000_000),
        content: "audit the live persist sink".to_owned(),
        extra: json!({"turn": 1}),
        snippets: vec![audit_snippet],
        invocations: Vec::new(),
    };
    let assistant_turn = NormalizedMessage {
        idx: 1,
        role: "assistant".to_owned(),
        author: None,
        created_at: Some(1_700_000_001_000),
        content: "auditing".to_owned(),
        extra: json!({}),
        snippets: Vec::new(),
        invocations: Vec::new(),
    };

    NormalizedConversation {
        agent_slug: "codex".to_owned(),
        external_id: Some("session-audit".to_owned()),
        title: Some("Audit fixture".to_owned()),
        workspace: Some(PathBuf::from("/work/audit")),
        source_path: PathBuf::from("/work/audit/.codex/session.jsonl"),
        started_at: Some(1_700_000_000_000),
        ended_at: Some(1_700_000_010_000),
        metadata: json!({"model": "gpt-5"}),
        messages: vec![user_turn, assistant_turn],
    }
}
/// Storage-side fixture: the same two-message conversation as
/// `raw_conversation`, expressed with the canonical model types and with row
/// ids filled in (conversation 7, messages 70/71, snippet 700).
fn canonical_conversation() -> Conversation {
    let audit_snippet = Snippet {
        id: Some(700),
        file_path: Some(PathBuf::from("src/audit.rs")),
        start_line: Some(1),
        end_line: Some(1),
        language: Some("rust".to_owned()),
        snippet_text: Some("// audit".to_owned()),
    };
    let user_turn = Message {
        id: Some(70),
        idx: 0,
        role: MessageRole::User,
        author: Some("human".to_owned()),
        created_at: Some(1_700_000_000_000),
        content: "audit the live persist sink".to_owned(),
        extra_json: json!({"turn": 1}),
        snippets: vec![audit_snippet],
    };
    let agent_turn = Message {
        id: Some(71),
        idx: 1,
        role: MessageRole::Agent,
        author: None,
        created_at: Some(1_700_000_001_000),
        content: "auditing".to_owned(),
        extra_json: json!({}),
        snippets: Vec::new(),
    };

    Conversation {
        id: Some(7),
        agent_slug: "codex".to_owned(),
        workspace: Some(PathBuf::from("/work/audit")),
        external_id: Some("session-audit".to_owned()),
        title: Some("Audit fixture".to_owned()),
        source_path: PathBuf::from("/work/audit/.codex/session.jsonl"),
        started_at: Some(1_700_000_000_000),
        ended_at: Some(1_700_000_010_000),
        approx_tokens: None,
        metadata_json: json!({"model": "gpt-5"}),
        source_id: "local".to_owned(),
        origin_host: None,
        messages: vec![user_turn, agent_turn],
    }
}
/// Packets built from the raw fixture and from the canonical fixture must
/// audit as a match under strict tolerance, and the reported hash must be the
/// raw packet's 64-character semantic hash.
#[test]
fn raw_and_canonical_packet_audit_matches_when_content_agrees() {
    let provenance = ConversationPacketProvenance::local();
    let raw = ConversationPacket::from_normalized_conversation(
        &raw_conversation(),
        provenance.clone(),
    );
    let canonical =
        ConversationPacket::from_canonical_replay(&canonical_conversation(), provenance);

    let outcome = PacketEquivalenceAuditor::new().audit_pair(
        &raw,
        &canonical,
        &PacketEquivalenceTolerance::strict(),
    );

    match &outcome {
        PacketEquivalenceOutcome::Match { semantic_hash } => {
            assert_eq!(*semantic_hash, raw.hashes.semantic_hash);
            assert_eq!(semantic_hash.len(), 64, "blake3 hex digest is 64 chars");
        }
        PacketEquivalenceOutcome::Mismatch(_) => panic!("expected match, got {outcome:?}"),
    }
}
/// Appending a tool message to the canonical side only must surface as an
/// analytics projection difference (0 vs 1 tool messages) and must not be
/// classified as hash-only drift.
#[test]
fn role_count_drift_surfaces_as_analytics_projection_difference() {
    let provenance = ConversationPacketProvenance::local();
    let raw = ConversationPacket::from_normalized_conversation(
        &raw_conversation(),
        provenance.clone(),
    );

    let extra_tool_turn = Message {
        id: Some(72),
        idx: 2,
        role: MessageRole::Tool,
        author: Some("ripgrep".to_string()),
        created_at: Some(1_700_000_002_000),
        content: "tool output".to_string(),
        extra_json: json!({}),
        snippets: Vec::new(),
    };
    let mut with_tool_turn = canonical_conversation();
    with_tool_turn.messages.push(extra_tool_turn);
    let canonical = ConversationPacket::from_canonical_replay(&with_tool_turn, provenance);

    let outcome = PacketEquivalenceAuditor::new().audit_pair(
        &raw,
        &canonical,
        &PacketEquivalenceTolerance::strict(),
    );
    let mismatch = match outcome {
        PacketEquivalenceOutcome::Mismatch(details) => details,
        PacketEquivalenceOutcome::Match { .. } => {
            panic!("expected mismatch when role counts diverge")
        }
    };

    let saw_tool_drift = mismatch
        .projection_differences
        .iter()
        .any(|diff| match diff {
            PacketProjectionDifference::AnalyticsRoleCounts { a, b } => {
                a.tool_messages == 0 && b.tool_messages == 1
            }
            PacketProjectionDifference::LexicalProjection { .. }
            | PacketProjectionDifference::SemanticProjection { .. } => false,
        });
    assert!(
        saw_tool_drift,
        "expected analytics tool-message drift, got {:?}",
        mismatch.projection_differences
    );

    let hash_only = mismatch.is_hash_only();
    assert!(
        !hash_only,
        "projection drift must not be downgraded to hash-only"
    );
}
/// A byte-length-preserving redaction of the first message must be flagged as
/// hash-only drift under strict tolerance and excused under
/// `PacketEquivalenceTolerance::allow_redaction()`.
#[test]
fn redaction_drift_is_excused_only_under_explicit_tolerance() {
    let provenance = ConversationPacketProvenance::local();
    let raw = ConversationPacket::from_normalized_conversation(
        &raw_conversation(),
        provenance.clone(),
    );

    // Mask the content with one ASCII '#' per BYTE of the original so the byte
    // length is preserved exactly (a multi-byte glyph repeated per character
    // would not preserve it). The assertion below expects this to leave the
    // projections untouched and only the hashes changed.
    let mut redacted = canonical_conversation();
    let want_bytes = raw.payload.messages[0].content.len();
    redacted.messages[0].content = "#".repeat(want_bytes);
    let canonical = ConversationPacket::from_canonical_replay(&redacted, provenance);

    let auditor = PacketEquivalenceAuditor::new();

    // Strict: the drift must be reported, and only as hash drift.
    let strict = auditor.audit_pair(&raw, &canonical, &PacketEquivalenceTolerance::strict());
    let PacketEquivalenceOutcome::Mismatch(mismatch) = strict else {
        panic!("strict audit should flag content/hash drift");
    };
    assert!(
        mismatch.is_hash_only(),
        "byte-length-preserving redaction should leave only hash drift, got {:?}",
        mismatch
    );
    assert!(
        mismatch
            .hash_differences
            .iter()
            .any(|d| matches!(d, PacketHashDifference::SemanticHash { .. }))
    );

    // Tolerant: the same pair must now be reported as a match.
    let tolerant = auditor.audit_pair(
        &raw,
        &canonical,
        &PacketEquivalenceTolerance::allow_redaction(),
    );
    assert!(
        tolerant.is_match(),
        "redaction-tolerant audit must match when only hashes drift, got {tolerant:?}"
    );
}
/// The audit gate must be off when the variable is unset, on for the accepted
/// truthy spellings (case-insensitively), and off for everything else.
///
/// The variable's prior state is restored by a drop guard, so it is put back
/// even when one of the assertions below panics.
#[test]
fn audit_env_gate_is_off_by_default_and_respects_explicit_opt_in() {
    /// Puts `PACKET_EQUIVALENCE_AUDIT_ENV` back to its captured state on drop.
    /// Holds an `OsString` so a non-UTF-8 prior value is preserved too.
    struct RestoreAuditEnv(Option<std::ffi::OsString>);

    impl Drop for RestoreAuditEnv {
        fn drop(&mut self) {
            // SAFETY: this guard is declared after the `env_lock` guard and is
            // therefore dropped before it, so the mutation happens while the
            // lock is still held and is serialized against other lock holders.
            unsafe {
                match self.0.take() {
                    Some(value) => std::env::set_var(PACKET_EQUIVALENCE_AUDIT_ENV, value),
                    None => std::env::remove_var(PACKET_EQUIVALENCE_AUDIT_ENV),
                }
            }
        }
    }

    let _guard = env_lock();
    let _restore = RestoreAuditEnv(std::env::var_os(PACKET_EQUIVALENCE_AUDIT_ENV));

    // SAFETY: environment mutation is performed while `_guard` is held.
    unsafe {
        std::env::remove_var(PACKET_EQUIVALENCE_AUDIT_ENV);
    }
    assert!(
        !packet_equivalence_audit_enabled(),
        "audit must default to OFF so production cost stays at zero"
    );

    for value in ["1", "true", "TRUE", "yes", "on"] {
        // SAFETY: environment mutation is performed while `_guard` is held.
        unsafe {
            std::env::set_var(PACKET_EQUIVALENCE_AUDIT_ENV, value);
        }
        assert!(
            packet_equivalence_audit_enabled(),
            "value {value:?} should opt into the audit"
        );
    }

    for value in ["0", "false", "no", "off", ""] {
        // SAFETY: environment mutation is performed while `_guard` is held.
        unsafe {
            std::env::set_var(PACKET_EQUIVALENCE_AUDIT_ENV, value);
        }
        assert!(
            !packet_equivalence_audit_enabled(),
            "value {value:?} must NOT opt into the audit"
        );
    }
    // `_restore` drops here (before `_guard`) and reinstates the prior value.
}
/// A matching outcome must serialize to JSON with `"outcome":"match"` and a
/// `semantic_hash` key.
#[test]
fn audit_outcome_serializes_with_outcome_tag() {
    let provenance = ConversationPacketProvenance::local();
    let raw = ConversationPacket::from_normalized_conversation(
        &raw_conversation(),
        provenance.clone(),
    );
    let canonical =
        ConversationPacket::from_canonical_replay(&canonical_conversation(), provenance);

    let strict = PacketEquivalenceTolerance::strict();
    let auditor = PacketEquivalenceAuditor::new();
    let outcome = auditor.audit_pair(&raw, &canonical, &strict);

    let serialized = serde_json::to_string(&outcome).expect("serialize match outcome");
    let has_match_tag = serialized.contains("\"outcome\":\"match\"");
    assert!(
        has_match_tag,
        "match outcome should serialize with snake_case `outcome` tag, got {serialized}"
    );
    assert!(serialized.contains("\"semantic_hash\""));
}
/// The catalog must list the lexical, analytics and semantic sinks, and every
/// entry must have a non-empty sink id, `crate::`-prefixed paths, and a
/// `landed_in_commit` of at least seven hex digits.
#[test]
fn packet_sink_migration_catalog_documents_every_consumer_sink() {
    for required in ["lexical", "analytics", "semantic"] {
        let listed = PACKET_SINK_MIGRATIONS
            .iter()
            .any(|migration| migration.sink == required);
        assert!(listed, "catalog must list the {required} sink");
    }

    for migration in PACKET_SINK_MIGRATIONS {
        assert!(!migration.sink.is_empty(), "sink id must be non-empty");

        let qualified_paths = [
            ("packet_helper", migration.packet_helper),
            ("legacy_fallback", migration.legacy_fallback),
            ("equivalence_test", migration.equivalence_test),
        ];
        for (field, path) in qualified_paths {
            assert!(
                path.starts_with("crate::"),
                "{field} must be fully qualified, got {path:?}"
            );
        }

        let commit = migration.landed_in_commit;
        assert!(
            !commit.is_empty(),
            "landed_in_commit must reference the migration commit"
        );
        let long_enough = commit.len() >= 7;
        let all_hex = commit.chars().all(|c| c.is_ascii_hexdigit());
        assert!(
            long_enough && all_hex,
            "landed_in_commit must look like a git short-hash, got {:?}",
            commit
        );
    }
}
/// The catalog must serialize to a JSON array with one object per entry, in
/// catalog order, each exposing all six `PacketSinkMigration` fields.
#[test]
fn packet_sink_migration_catalog_serializes_as_json_array() {
    let json = serde_json::to_string(PACKET_SINK_MIGRATIONS)
        .expect("PACKET_SINK_MIGRATIONS must serialize");

    // Spot-check the raw text for the lexical entry.
    assert!(json.contains("\"sink\":\"lexical\""));
    assert!(json.contains("add_messages_from_packet"));
    let has_lexical_commit = json.contains("\"landed_in_commit\":\"19820c7a\"");
    assert!(
        has_lexical_commit,
        "lexical entry must reference its landing commit, got {json}"
    );

    // Re-parse and check the structure entry by entry.
    let parsed: serde_json::Value =
        serde_json::from_str(&json).expect("catalog must parse as JSON");
    let arr = parsed.as_array().expect("catalog serializes as array");
    assert_eq!(arr.len(), PACKET_SINK_MIGRATIONS.len());

    for (position, entry) in arr.iter().enumerate() {
        // In bounds: the lengths were asserted equal just above.
        let migration = &PACKET_SINK_MIGRATIONS[position];
        let obj = entry.as_object().expect("each catalog entry is an object");
        let serialized_sink = obj.get("sink").and_then(|v| v.as_str());
        assert_eq!(
            serialized_sink,
            Some(migration.sink),
            "sink field must round-trip"
        );
        assert!(obj.contains_key("packet_helper"));
        assert!(obj.contains_key("legacy_fallback"));
        assert!(obj.contains_key("equivalence_test"));
        assert!(obj.contains_key("kill_switch_env"));
        assert!(obj.contains_key("landed_in_commit"));
    }
}
}