1use serde::{Deserialize, Serialize};
24
25use crate::model::conversation_packet::{
26 ConversationPacket, ConversationPacketAnalyticsProjection, ConversationPacketLexicalProjection,
27 ConversationPacketSemanticProjection,
28};
29
30pub const PACKET_EQUIVALENCE_AUDIT_ENV: &str = "CASS_INDEXER_PACKET_EQUIVALENCE_AUDIT";
34
35pub fn packet_equivalence_audit_enabled() -> bool {
38 match dotenvy::var(PACKET_EQUIVALENCE_AUDIT_ENV) {
39 Ok(value) => matches!(
40 value.trim().to_ascii_lowercase().as_str(),
41 "1" | "true" | "yes" | "on"
42 ),
43 Err(_) => false,
44 }
45}
46
47#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
51pub struct PacketEquivalenceTolerance {
52 pub allow_redaction_drift: bool,
57}
58
59impl PacketEquivalenceTolerance {
60 pub fn strict() -> Self {
61 Self::default()
62 }
63
64 pub fn allow_redaction() -> Self {
65 Self {
66 allow_redaction_drift: true,
67 }
68 }
69}
70
/// One projection that differs between the two packets compared by
/// [`PacketEquivalenceAuditor::audit_pair`].
///
/// Each variant carries a clone of the projection taken from packet `a` and
/// from packet `b`, so the divergence can be inspected or logged. Variants
/// serialize with snake_case names.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PacketProjectionDifference {
    /// `projections.analytics` differs (it holds per-role message counts such
    /// as `tool_messages`).
    AnalyticsRoleCounts {
        a: ConversationPacketAnalyticsProjection,
        b: ConversationPacketAnalyticsProjection,
    },
    /// `projections.lexical` differs.
    LexicalProjection {
        a: ConversationPacketLexicalProjection,
        b: ConversationPacketLexicalProjection,
    },
    /// `projections.semantic` differs.
    SemanticProjection {
        a: ConversationPacketSemanticProjection,
        b: ConversationPacketSemanticProjection,
    },
}
89
/// One packet hash that differs between the two audited packets, carrying
/// packet `a`'s value and packet `b`'s value.
///
/// Only produced when the audit tolerance does not allow redaction drift.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PacketHashDifference {
    /// `hashes.semantic_hash` differs.
    SemanticHash { a: String, b: String },
    /// `hashes.message_hash` differs.
    MessageHash { a: String, b: String },
}
97
/// Details of a failed equivalence audit.
///
/// A mismatch is reported when the packet versions differ, when any projection
/// differs, or when any hash differs under a strict tolerance. Both difference
/// lists may be empty if only the versions disagree.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PacketEquivalenceMismatch {
    /// `version` of packet `a`.
    pub version_a: u32,
    /// `version` of packet `b`.
    pub version_b: u32,
    /// Projections that differ, in analytics, lexical, semantic order.
    pub projection_differences: Vec<PacketProjectionDifference>,
    /// Hashes that differ (semantic first, then message). Always empty when the
    /// audit ran with redaction drift allowed.
    pub hash_differences: Vec<PacketHashDifference>,
}
109
110impl PacketEquivalenceMismatch {
111 pub fn is_hash_only(&self) -> bool {
117 self.projection_differences.is_empty() && !self.hash_differences.is_empty()
118 }
119}
120
/// Result of [`PacketEquivalenceAuditor::audit_pair`].
///
/// Serializes as an internally tagged object whose `"outcome"` key is
/// `"match"` or `"mismatch"`. The remaining keys come from the variant's
/// fields; for `Mismatch` these are the fields of [`PacketEquivalenceMismatch`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "outcome", rename_all = "snake_case")]
pub enum PacketEquivalenceOutcome {
    /// The packets are equivalent; carries packet `a`'s semantic hash.
    Match { semantic_hash: String },
    /// The packets diverge; the payload lists what differed.
    Mismatch(PacketEquivalenceMismatch),
}
130
131impl PacketEquivalenceOutcome {
132 pub fn is_match(&self) -> bool {
133 matches!(self, Self::Match { .. })
134 }
135
136 pub fn is_mismatch(&self) -> bool {
137 matches!(self, Self::Mismatch(_))
138 }
139}
140
/// Stateless comparator for pairs of [`ConversationPacket`]s.
///
/// The type has no fields; it is `Copy`, and `audit_pair` takes it by value.
#[derive(Debug, Default, Clone, Copy)]
pub struct PacketEquivalenceAuditor;
147
148impl PacketEquivalenceAuditor {
149 pub fn new() -> Self {
150 Self
151 }
152
153 pub fn audit_pair(
157 self,
158 a: &ConversationPacket,
159 b: &ConversationPacket,
160 tolerance: &PacketEquivalenceTolerance,
161 ) -> PacketEquivalenceOutcome {
162 let mut projection_differences = Vec::new();
163 if a.projections.analytics != b.projections.analytics {
164 projection_differences.push(PacketProjectionDifference::AnalyticsRoleCounts {
165 a: a.projections.analytics.clone(),
166 b: b.projections.analytics.clone(),
167 });
168 }
169 if a.projections.lexical != b.projections.lexical {
170 projection_differences.push(PacketProjectionDifference::LexicalProjection {
171 a: a.projections.lexical.clone(),
172 b: b.projections.lexical.clone(),
173 });
174 }
175 if a.projections.semantic != b.projections.semantic {
176 projection_differences.push(PacketProjectionDifference::SemanticProjection {
177 a: a.projections.semantic.clone(),
178 b: b.projections.semantic.clone(),
179 });
180 }
181
182 let mut hash_differences = Vec::new();
183 let hashes_match = a.hashes.semantic_hash == b.hashes.semantic_hash
184 && a.hashes.message_hash == b.hashes.message_hash;
185 if !hashes_match && !tolerance.allow_redaction_drift {
186 if a.hashes.semantic_hash != b.hashes.semantic_hash {
187 hash_differences.push(PacketHashDifference::SemanticHash {
188 a: a.hashes.semantic_hash.clone(),
189 b: b.hashes.semantic_hash.clone(),
190 });
191 }
192 if a.hashes.message_hash != b.hashes.message_hash {
193 hash_differences.push(PacketHashDifference::MessageHash {
194 a: a.hashes.message_hash.clone(),
195 b: b.hashes.message_hash.clone(),
196 });
197 }
198 }
199
200 if a.version == b.version
201 && projection_differences.is_empty()
202 && hash_differences.is_empty()
203 {
204 PacketEquivalenceOutcome::Match {
205 semantic_hash: a.hashes.semantic_hash.clone(),
206 }
207 } else {
208 PacketEquivalenceOutcome::Mismatch(PacketEquivalenceMismatch {
209 version_a: a.version,
210 version_b: b.version,
211 projection_differences,
212 hash_differences,
213 })
214 }
215 }
216}
217
/// Catalog entry describing one consumer sink that has been migrated to take
/// its input from [`ConversationPacket`]s.
///
/// The path-like fields hold item paths as strings. The catalog tests in this
/// module require them to start with `crate::` and require `landed_in_commit`
/// to look like a git short hash.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PacketSinkMigration {
    /// Short identifier of the sink, e.g. `"lexical"`, `"analytics"`, `"semantic"`.
    pub sink: &'static str,
    /// Path of the packet-driven entry point for this sink.
    pub packet_helper: &'static str,
    /// Path of the pre-packet (legacy) code path for this sink.
    pub legacy_fallback: &'static str,
    /// Path of the test comparing packet-driven output against the legacy path
    /// (per the listed test names, e.g. `..._matches_legacy_...`).
    pub equivalence_test: &'static str,
    /// Name of an environment variable that presumably disables the packet path
    /// for this sink, if one exists. `None` for every entry currently listed.
    pub kill_switch_env: Option<&'static str>,
    /// Git short hash of the commit that landed the migration.
    pub landed_in_commit: &'static str,
}
258
/// Registry of consumer sinks migrated to packet-driven input; currently one
/// entry per sink.
///
/// The tests in this module require the `lexical`, `analytics`, and `semantic`
/// sinks to be present and the catalog to serialize as a JSON array.
pub const PACKET_SINK_MIGRATIONS: &[PacketSinkMigration] = &[
    PacketSinkMigration {
        sink: "lexical",
        packet_helper: "crate::search::tantivy::TantivyIndex::add_messages_from_packet",
        legacy_fallback: "crate::search::tantivy::TantivyIndex::add_messages_with_conversation_id",
        equivalence_test: "crate::search::tantivy::tests::packet_driven_lexical_pipeline_matches_legacy_for_normalized_conv",
        kill_switch_env: None,
        landed_in_commit: "19820c7a",
    },
    PacketSinkMigration {
        sink: "analytics",
        packet_helper: "crate::pages::analytics::Statistics::from_packets",
        legacy_fallback: "crate::pages::analytics::AnalyticsGenerator::generate_statistics",
        equivalence_test: "crate::pages::analytics::tests::analytics_statistics_from_packets_matches_sql_for_canonical_corpus",
        kill_switch_env: None,
        landed_in_commit: "bae8e341",
    },
    PacketSinkMigration {
        sink: "semantic",
        packet_helper: "crate::indexer::semantic::semantic_inputs_from_packets",
        legacy_fallback: "crate::indexer::semantic::packet_embedding_inputs_from_storage",
        equivalence_test: "crate::indexer::semantic::tests::semantic_inputs_from_packets_matches_storage_replay",
        kill_switch_env: None,
        landed_in_commit: "2c8ba03b",
    },
];
290
#[cfg(test)]
mod tests {
    use super::*;
    use crate::connectors::{NormalizedConversation, NormalizedMessage, NormalizedSnippet};
    use crate::model::conversation_packet::{ConversationPacket, ConversationPacketProvenance};
    use crate::model::types::{Conversation, Message, MessageRole, Snippet};
    use serde_json::json;
    use std::ffi::OsString;
    use std::path::PathBuf;
    use std::sync::{Mutex, MutexGuard, OnceLock};

    /// Serializes tests in this module that mutate the process environment.
    /// A poisoned lock is recovered so one failing test does not cascade.
    fn env_lock() -> MutexGuard<'static, ()> {
        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        LOCK.get_or_init(|| Mutex::new(()))
            .lock()
            .unwrap_or_else(|p| p.into_inner())
    }

    /// Restores [`PACKET_EQUIVALENCE_AUDIT_ENV`] to the value captured at
    /// construction when dropped, so the variable is put back even if an
    /// assertion panics mid-test. The value is captured with `var_os` so
    /// non-UTF-8 values survive the round trip.
    struct AuditEnvRestore {
        previous: Option<OsString>,
    }

    impl AuditEnvRestore {
        fn capture() -> Self {
            Self {
                previous: std::env::var_os(PACKET_EQUIVALENCE_AUDIT_ENV),
            }
        }
    }

    impl Drop for AuditEnvRestore {
        fn drop(&mut self) {
            // SAFETY: the only user declares this guard after taking
            // `env_lock()`, so it drops while the lock is still held and no
            // other test in this module touches the environment concurrently.
            // Tests outside this module are assumed not to read this variable.
            unsafe {
                match self.previous.take() {
                    Some(v) => std::env::set_var(PACKET_EQUIVALENCE_AUDIT_ENV, v),
                    None => std::env::remove_var(PACKET_EQUIVALENCE_AUDIT_ENV),
                }
            }
        }
    }

    fn raw_conversation() -> NormalizedConversation {
        NormalizedConversation {
            agent_slug: "codex".to_string(),
            external_id: Some("session-audit".to_string()),
            title: Some("Audit fixture".to_string()),
            workspace: Some(PathBuf::from("/work/audit")),
            source_path: PathBuf::from("/work/audit/.codex/session.jsonl"),
            started_at: Some(1_700_000_000_000),
            ended_at: Some(1_700_000_010_000),
            metadata: json!({"model": "gpt-5"}),
            messages: vec![
                NormalizedMessage {
                    idx: 0,
                    role: "user".to_string(),
                    author: Some("human".to_string()),
                    created_at: Some(1_700_000_000_000),
                    content: "audit the live persist sink".to_string(),
                    extra: json!({"turn": 1}),
                    snippets: vec![NormalizedSnippet {
                        file_path: Some(PathBuf::from("src/audit.rs")),
                        start_line: Some(1),
                        end_line: Some(1),
                        language: Some("rust".to_string()),
                        snippet_text: Some("// audit".to_string()),
                    }],
                    invocations: Vec::new(),
                },
                NormalizedMessage {
                    idx: 1,
                    role: "assistant".to_string(),
                    author: None,
                    created_at: Some(1_700_000_001_000),
                    content: "auditing".to_string(),
                    extra: json!({}),
                    snippets: Vec::new(),
                    invocations: Vec::new(),
                },
            ],
        }
    }

    fn canonical_conversation() -> Conversation {
        Conversation {
            id: Some(7),
            agent_slug: "codex".to_string(),
            workspace: Some(PathBuf::from("/work/audit")),
            external_id: Some("session-audit".to_string()),
            title: Some("Audit fixture".to_string()),
            source_path: PathBuf::from("/work/audit/.codex/session.jsonl"),
            started_at: Some(1_700_000_000_000),
            ended_at: Some(1_700_000_010_000),
            approx_tokens: None,
            metadata_json: json!({"model": "gpt-5"}),
            source_id: "local".to_string(),
            origin_host: None,
            messages: vec![
                Message {
                    id: Some(70),
                    idx: 0,
                    role: MessageRole::User,
                    author: Some("human".to_string()),
                    created_at: Some(1_700_000_000_000),
                    content: "audit the live persist sink".to_string(),
                    extra_json: json!({"turn": 1}),
                    snippets: vec![Snippet {
                        id: Some(700),
                        file_path: Some(PathBuf::from("src/audit.rs")),
                        start_line: Some(1),
                        end_line: Some(1),
                        language: Some("rust".to_string()),
                        snippet_text: Some("// audit".to_string()),
                    }],
                },
                Message {
                    id: Some(71),
                    idx: 1,
                    role: MessageRole::Agent,
                    author: None,
                    created_at: Some(1_700_000_001_000),
                    content: "auditing".to_string(),
                    extra_json: json!({}),
                    snippets: Vec::new(),
                },
            ],
        }
    }

    #[test]
    fn raw_and_canonical_packet_audit_matches_when_content_agrees() {
        let provenance = ConversationPacketProvenance::local();
        let raw = ConversationPacket::from_normalized_conversation(
            &raw_conversation(),
            provenance.clone(),
        );
        let canonical =
            ConversationPacket::from_canonical_replay(&canonical_conversation(), provenance);

        let auditor = PacketEquivalenceAuditor::new();
        let outcome = auditor.audit_pair(&raw, &canonical, &PacketEquivalenceTolerance::strict());
        assert!(outcome.is_match(), "expected match, got {outcome:?}");
        if let PacketEquivalenceOutcome::Match { semantic_hash } = outcome {
            assert_eq!(semantic_hash, raw.hashes.semantic_hash);
            assert_eq!(semantic_hash.len(), 64, "blake3 hex digest is 64 chars");
        }
    }

    #[test]
    fn role_count_drift_surfaces_as_analytics_projection_difference() {
        let provenance = ConversationPacketProvenance::local();
        let raw = ConversationPacket::from_normalized_conversation(
            &raw_conversation(),
            provenance.clone(),
        );

        let mut canonical_data = canonical_conversation();
        canonical_data.messages.push(Message {
            id: Some(72),
            idx: 2,
            role: MessageRole::Tool,
            author: Some("ripgrep".to_string()),
            created_at: Some(1_700_000_002_000),
            content: "tool output".to_string(),
            extra_json: json!({}),
            snippets: Vec::new(),
        });
        let canonical = ConversationPacket::from_canonical_replay(&canonical_data, provenance);

        let auditor = PacketEquivalenceAuditor::new();
        let outcome = auditor.audit_pair(&raw, &canonical, &PacketEquivalenceTolerance::strict());
        let PacketEquivalenceOutcome::Mismatch(mismatch) = outcome else {
            panic!("expected mismatch when role counts diverge");
        };
        assert!(
            mismatch.projection_differences.iter().any(|diff| matches!(
                diff,
                PacketProjectionDifference::AnalyticsRoleCounts { a, b }
                    if a.tool_messages == 0 && b.tool_messages == 1
            )),
            "expected analytics tool-message drift, got {:?}",
            mismatch.projection_differences
        );
        assert!(
            !mismatch.is_hash_only(),
            "projection drift must not be downgraded to hash-only"
        );
    }

    #[test]
    fn redaction_drift_is_excused_only_under_explicit_tolerance() {
        let provenance = ConversationPacketProvenance::local();
        let raw = ConversationPacket::from_normalized_conversation(
            &raw_conversation(),
            provenance.clone(),
        );
        let mut redacted = canonical_conversation();
        // Redact with a single-byte ASCII fill of the same *byte* length as the
        // original content, so byte-length-derived data stays identical and
        // only the hashes drift. (A multi-byte glyph such as '█', 3 bytes in
        // UTF-8, would change the byte length.)
        let want_bytes = raw.payload.messages[0].content.len();
        redacted.messages[0].content = "#".repeat(want_bytes);
        let canonical = ConversationPacket::from_canonical_replay(&redacted, provenance);

        let auditor = PacketEquivalenceAuditor::new();

        let strict = auditor.audit_pair(&raw, &canonical, &PacketEquivalenceTolerance::strict());
        let PacketEquivalenceOutcome::Mismatch(mismatch) = strict else {
            panic!("strict audit should flag content/hash drift");
        };
        assert!(
            mismatch.is_hash_only(),
            "byte-length-preserving redaction should leave only hash drift, got {:?}",
            mismatch
        );
        assert!(
            mismatch
                .hash_differences
                .iter()
                .any(|d| matches!(d, PacketHashDifference::SemanticHash { .. }))
        );

        let tolerant = auditor.audit_pair(
            &raw,
            &canonical,
            &PacketEquivalenceTolerance::allow_redaction(),
        );
        assert!(
            tolerant.is_match(),
            "redaction-tolerant audit must match when only hashes drift, got {tolerant:?}"
        );
    }

    #[test]
    fn audit_env_gate_is_off_by_default_and_respects_explicit_opt_in() {
        let _guard = env_lock();
        // Declared after `_guard`, so it drops (restoring the variable) while
        // the lock is still held, including when an assertion below panics.
        let _restore = AuditEnvRestore::capture();

        // SAFETY: `env_lock()` is held, so no other test in this module touches
        // the environment concurrently.
        unsafe {
            std::env::remove_var(PACKET_EQUIVALENCE_AUDIT_ENV);
        }
        assert!(
            !packet_equivalence_audit_enabled(),
            "audit must default to OFF so production cost stays at zero"
        );

        for value in ["1", "true", "TRUE", "yes", "on"] {
            // SAFETY: as above, `env_lock()` is held.
            unsafe {
                std::env::set_var(PACKET_EQUIVALENCE_AUDIT_ENV, value);
            }
            assert!(
                packet_equivalence_audit_enabled(),
                "value {value:?} should opt into the audit"
            );
        }

        for value in ["0", "false", "no", "off", ""] {
            // SAFETY: as above, `env_lock()` is held.
            unsafe {
                std::env::set_var(PACKET_EQUIVALENCE_AUDIT_ENV, value);
            }
            assert!(
                !packet_equivalence_audit_enabled(),
                "value {value:?} must NOT opt into the audit"
            );
        }
    }

    #[test]
    fn audit_outcome_serializes_with_outcome_tag() {
        let provenance = ConversationPacketProvenance::local();
        let raw = ConversationPacket::from_normalized_conversation(
            &raw_conversation(),
            provenance.clone(),
        );
        let canonical =
            ConversationPacket::from_canonical_replay(&canonical_conversation(), provenance);
        let outcome = PacketEquivalenceAuditor::new().audit_pair(
            &raw,
            &canonical,
            &PacketEquivalenceTolerance::strict(),
        );
        let serialized = serde_json::to_string(&outcome).expect("serialize match outcome");
        assert!(
            serialized.contains("\"outcome\":\"match\""),
            "match outcome should serialize with snake_case `outcome` tag, got {serialized}"
        );
        assert!(serialized.contains("\"semantic_hash\""));
    }

    #[test]
    fn packet_sink_migration_catalog_documents_every_consumer_sink() {
        let sinks: Vec<&str> = PACKET_SINK_MIGRATIONS
            .iter()
            .map(|migration| migration.sink)
            .collect();
        assert!(
            sinks.contains(&"lexical"),
            "catalog must list the lexical sink"
        );
        assert!(
            sinks.contains(&"analytics"),
            "catalog must list the analytics sink"
        );
        assert!(
            sinks.contains(&"semantic"),
            "catalog must list the semantic sink"
        );

        for migration in PACKET_SINK_MIGRATIONS {
            assert!(!migration.sink.is_empty(), "sink id must be non-empty");
            assert!(
                migration.packet_helper.starts_with("crate::"),
                "packet_helper must be fully qualified, got {:?}",
                migration.packet_helper
            );
            assert!(
                migration.legacy_fallback.starts_with("crate::"),
                "legacy_fallback must be fully qualified, got {:?}",
                migration.legacy_fallback
            );
            assert!(
                migration.equivalence_test.starts_with("crate::"),
                "equivalence_test must be fully qualified, got {:?}",
                migration.equivalence_test
            );
            assert!(
                !migration.landed_in_commit.is_empty(),
                "landed_in_commit must reference the migration commit"
            );
            assert!(
                migration.landed_in_commit.len() >= 7
                    && migration
                        .landed_in_commit
                        .chars()
                        .all(|c| c.is_ascii_hexdigit()),
                "landed_in_commit must look like a git short-hash, got {:?}",
                migration.landed_in_commit
            );
        }
    }

    #[test]
    fn packet_sink_migration_catalog_serializes_as_json_array() {
        let json = serde_json::to_string(PACKET_SINK_MIGRATIONS)
            .expect("PACKET_SINK_MIGRATIONS must serialize");
        assert!(json.contains("\"sink\":\"lexical\""));
        assert!(json.contains("add_messages_from_packet"));
        assert!(
            json.contains("\"landed_in_commit\":\"19820c7a\""),
            "lexical entry must reference its landing commit, got {json}"
        );
        let parsed: serde_json::Value =
            serde_json::from_str(&json).expect("catalog must parse as JSON");
        let arr = parsed.as_array().expect("catalog serializes as array");
        assert_eq!(arr.len(), PACKET_SINK_MIGRATIONS.len());
        for (entry, migration) in arr.iter().zip(PACKET_SINK_MIGRATIONS.iter()) {
            let obj = entry.as_object().expect("each catalog entry is an object");
            assert_eq!(
                obj.get("sink").and_then(|v| v.as_str()),
                Some(migration.sink),
                "sink field must round-trip"
            );
            assert!(obj.contains_key("packet_helper"));
            assert!(obj.contains_key("legacy_fallback"));
            assert!(obj.contains_key("equivalence_test"));
            assert!(obj.contains_key("kill_switch_env"));
            assert!(obj.contains_key("landed_in_commit"));
        }
    }
}