Skip to main content

coding_agent_search/model/
packet_audit.rs

//! ConversationPacket equivalence audit (bead `coding_agent_session_search-ibuuh.32`).
//!
//! The packet contract promises that the live persist path and the rebuild
//! path can both feed downstream sinks from the same canonical projections.
//! Until every sink consumes the projections directly, we want a non-invasive
//! way to *prove* that the canonical persist sink produces output that is
//! packet-equivalent to what the source-of-truth raw scan would have
//! produced. This module is that compare-mode hook.
//!
//! Two packets compared by [`PacketEquivalenceAuditor::audit_pair`] are
//! considered equivalent when their packet versions agree, their projections
//! agree byte-for-byte, and their hashes either match or are explicitly
//! excused by a documented tolerance (e.g. secret redaction is enabled, so
//! the canonical content string differs from the raw content string and
//! `semantic_hash` is expected to drift while the
//! `analytics`/`lexical`/`semantic` projections are still required to match).
//!
//! The audit is intentionally pure: it consumes already-built packets and
//! returns a structured outcome. Callers wire the env-gated kill-switch
//! (`CASS_INDEXER_PACKET_EQUIVALENCE_AUDIT`) at their site so this module
//! stays cheap to import and trivially testable.
22
23use serde::{Deserialize, Serialize};
24
25use crate::model::conversation_packet::{
26    ConversationPacket, ConversationPacketAnalyticsProjection, ConversationPacketLexicalProjection,
27    ConversationPacketSemanticProjection,
28};
29
/// Name of the env knob that opts the live persist path into emitting
/// compare-mode audit records.
///
/// The values `1`, `true`, `yes` and `on` (case-insensitive, surrounding
/// whitespace ignored) enable the audit; anything else, including an unset
/// variable, leaves it disabled. Default is off so production cost stays at
/// zero.
pub const PACKET_EQUIVALENCE_AUDIT_ENV: &str = "CASS_INDEXER_PACKET_EQUIVALENCE_AUDIT";
34
35/// Returns `true` when the env knob explicitly opts in. Anything else
36/// (unset, "0", "false", "no", "off") leaves the audit disabled.
37pub fn packet_equivalence_audit_enabled() -> bool {
38    match dotenvy::var(PACKET_EQUIVALENCE_AUDIT_ENV) {
39        Ok(value) => matches!(
40            value.trim().to_ascii_lowercase().as_str(),
41            "1" | "true" | "yes" | "on"
42        ),
43        Err(_) => false,
44    }
45}
46
47/// Tolerances applied while comparing packets. Each field documents *why*
48/// a category of drift is acceptable, so future agents can decide whether
49/// a hit was a real bug or a documented exemption.
50#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
51pub struct PacketEquivalenceTolerance {
52    /// When `true`, content drift driven by secret redaction (canonical
53    /// persistence applies `redact_text`/`redact_json` while raw scans
54    /// don't) is permitted, so `semantic_hash` and `message_hash` may
55    /// differ while projections must still match.
56    pub allow_redaction_drift: bool,
57}
58
59impl PacketEquivalenceTolerance {
60    pub fn strict() -> Self {
61        Self::default()
62    }
63
64    pub fn allow_redaction() -> Self {
65        Self {
66            allow_redaction_drift: true,
67        }
68    }
69}
70
71/// Distinct projections that can disagree between two packets. Carrying
72/// the variant explicitly keeps audit logs grep-friendly.
73#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
74#[serde(rename_all = "snake_case")]
75pub enum PacketProjectionDifference {
76    AnalyticsRoleCounts {
77        a: ConversationPacketAnalyticsProjection,
78        b: ConversationPacketAnalyticsProjection,
79    },
80    LexicalProjection {
81        a: ConversationPacketLexicalProjection,
82        b: ConversationPacketLexicalProjection,
83    },
84    SemanticProjection {
85        a: ConversationPacketSemanticProjection,
86        b: ConversationPacketSemanticProjection,
87    },
88}
89
90/// Distinct hash classes that can disagree.
91#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
92#[serde(rename_all = "snake_case")]
93pub enum PacketHashDifference {
94    SemanticHash { a: String, b: String },
95    MessageHash { a: String, b: String },
96}
97
98/// Why two packets did not match. Multiple categories may fire from a
99/// single audit (e.g. content drift changes both hashes *and* analytics
100/// counts), so we ship a vector of structured items rather than a single
101/// catch-all string.
102#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
103pub struct PacketEquivalenceMismatch {
104    pub version_a: u32,
105    pub version_b: u32,
106    pub projection_differences: Vec<PacketProjectionDifference>,
107    pub hash_differences: Vec<PacketHashDifference>,
108}
109
110impl PacketEquivalenceMismatch {
111    /// True when the only disagreements are hash-level (i.e. content
112    /// mutated but every byte-budget projection still agrees). This is
113    /// the shape we expect under `allow_redaction_drift` and helps
114    /// callers downgrade those cases to debug-level logs while real
115    /// projection drift escalates to warn.
116    pub fn is_hash_only(&self) -> bool {
117        self.projection_differences.is_empty() && !self.hash_differences.is_empty()
118    }
119}
120
121/// Result of an equivalence audit. The `Match` variant carries the
122/// agreed semantic hash so downstream callers can fingerprint the audited
123/// pair in their own logs/ledgers without re-computing it.
124#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
125#[serde(tag = "outcome", rename_all = "snake_case")]
126pub enum PacketEquivalenceOutcome {
127    Match { semantic_hash: String },
128    Mismatch(PacketEquivalenceMismatch),
129}
130
131impl PacketEquivalenceOutcome {
132    pub fn is_match(&self) -> bool {
133        matches!(self, Self::Match { .. })
134    }
135
136    pub fn is_mismatch(&self) -> bool {
137        matches!(self, Self::Mismatch(_))
138    }
139}
140
141/// Runs equivalence audits between pairs of packets. The auditor itself
142/// is stateless; tolerances are provided per-call so a single instance
143/// can serve both strict (rebuild-path) and redaction-aware (live-persist
144/// path) call sites.
145#[derive(Debug, Default, Clone, Copy)]
146pub struct PacketEquivalenceAuditor;
147
148impl PacketEquivalenceAuditor {
149    pub fn new() -> Self {
150        Self
151    }
152
153    /// Compare two packets under the supplied tolerance. The `a`/`b`
154    /// labelling is symmetric — swapping arguments returns the same
155    /// classification with `a`/`b` swapped inside differences.
156    pub fn audit_pair(
157        self,
158        a: &ConversationPacket,
159        b: &ConversationPacket,
160        tolerance: &PacketEquivalenceTolerance,
161    ) -> PacketEquivalenceOutcome {
162        let mut projection_differences = Vec::new();
163        if a.projections.analytics != b.projections.analytics {
164            projection_differences.push(PacketProjectionDifference::AnalyticsRoleCounts {
165                a: a.projections.analytics.clone(),
166                b: b.projections.analytics.clone(),
167            });
168        }
169        if a.projections.lexical != b.projections.lexical {
170            projection_differences.push(PacketProjectionDifference::LexicalProjection {
171                a: a.projections.lexical.clone(),
172                b: b.projections.lexical.clone(),
173            });
174        }
175        if a.projections.semantic != b.projections.semantic {
176            projection_differences.push(PacketProjectionDifference::SemanticProjection {
177                a: a.projections.semantic.clone(),
178                b: b.projections.semantic.clone(),
179            });
180        }
181
182        let mut hash_differences = Vec::new();
183        let hashes_match = a.hashes.semantic_hash == b.hashes.semantic_hash
184            && a.hashes.message_hash == b.hashes.message_hash;
185        if !hashes_match && !tolerance.allow_redaction_drift {
186            if a.hashes.semantic_hash != b.hashes.semantic_hash {
187                hash_differences.push(PacketHashDifference::SemanticHash {
188                    a: a.hashes.semantic_hash.clone(),
189                    b: b.hashes.semantic_hash.clone(),
190                });
191            }
192            if a.hashes.message_hash != b.hashes.message_hash {
193                hash_differences.push(PacketHashDifference::MessageHash {
194                    a: a.hashes.message_hash.clone(),
195                    b: b.hashes.message_hash.clone(),
196                });
197            }
198        }
199
200        if a.version == b.version
201            && projection_differences.is_empty()
202            && hash_differences.is_empty()
203        {
204            PacketEquivalenceOutcome::Match {
205                semantic_hash: a.hashes.semantic_hash.clone(),
206            }
207        } else {
208            PacketEquivalenceOutcome::Mismatch(PacketEquivalenceMismatch {
209                version_a: a.version,
210                version_b: b.version,
211                projection_differences,
212                hash_differences,
213            })
214        }
215    }
216}
217
218/// Packet-driven sink registry: enumerates the consumer sinks the
219/// `coding_agent_session_search-ibuuh.32` migration covers, the
220/// packet-driven helper each one ships, the legacy fallback function
221/// that remains as the demotion path, the byte-equivalence test that
222/// pins the two paths agree, and the env knob (if any) that opts in
223/// to the compare-mode audit.
224///
225/// This struct is the operator-facing answer to the bead acceptance
226/// language: "explicit observability showing which paths are packet-
227/// driven versus legacy, and a temporary shadow or compare mode plus
228/// explicit kill-switch or demotion path so divergence can be
229/// diagnosed without trapping users on a broken path."
230///
231/// Future migrations should append a [`PacketSinkMigration`] entry
232/// here and update the equivalence test name; CI tooling (or future
233/// `cass doctor --packet-equivalence` slices) can serialize this
234/// list to surface the kill-switch catalog without grepping for
235/// helpers across the codebase.
236#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
237pub struct PacketSinkMigration {
238    /// Stable sink identifier ("lexical", "analytics", "semantic").
239    pub sink: &'static str,
240    /// Fully-qualified path to the packet-driven helper (free
241    /// function or method) callers should use.
242    pub packet_helper: &'static str,
243    /// Fully-qualified path to the legacy non-packet path that
244    /// remains in the codebase as the demotion fallback. Removing
245    /// this without an equivalence-proven replacement is off-contract.
246    pub legacy_fallback: &'static str,
247    /// Test name (under `cargo test`) that pins byte-for-byte
248    /// equivalence between the two paths.
249    pub equivalence_test: &'static str,
250    /// Env knob that opts callers into shadow-compare audit when
251    /// applicable, or `None` if the helper is direct-replacement and
252    /// the equivalence test is the only gate.
253    pub kill_switch_env: Option<&'static str>,
254    /// Commit hash at which the packet helper landed (so an operator
255    /// debugging a regression can `git show` the migration directly).
256    pub landed_in_commit: &'static str,
257}
258
259/// Catalog of consumer sink migrations completed under
260/// `coding_agent_session_search-ibuuh.32`. Iterating this slice gives
261/// operators a single source of truth for "which derivative sinks
262/// have a packet-driven path today, where to find each helper, how
263/// to roll back, and where the equivalence proof lives."
264pub const PACKET_SINK_MIGRATIONS: &[PacketSinkMigration] = &[
265    PacketSinkMigration {
266        sink: "lexical",
267        packet_helper: "crate::search::tantivy::TantivyIndex::add_messages_from_packet",
268        legacy_fallback: "crate::search::tantivy::TantivyIndex::add_messages_with_conversation_id",
269        equivalence_test: "crate::search::tantivy::tests::packet_driven_lexical_pipeline_matches_legacy_for_normalized_conv",
270        kill_switch_env: None,
271        landed_in_commit: "19820c7a",
272    },
273    PacketSinkMigration {
274        sink: "analytics",
275        packet_helper: "crate::pages::analytics::Statistics::from_packets",
276        legacy_fallback: "crate::pages::analytics::AnalyticsGenerator::generate_statistics",
277        equivalence_test: "crate::pages::analytics::tests::analytics_statistics_from_packets_matches_sql_for_canonical_corpus",
278        kill_switch_env: None,
279        landed_in_commit: "bae8e341",
280    },
281    PacketSinkMigration {
282        sink: "semantic",
283        packet_helper: "crate::indexer::semantic::semantic_inputs_from_packets",
284        legacy_fallback: "crate::indexer::semantic::packet_embedding_inputs_from_storage",
285        equivalence_test: "crate::indexer::semantic::tests::semantic_inputs_from_packets_matches_storage_replay",
286        kill_switch_env: None,
287        landed_in_commit: "2c8ba03b",
288    },
289];
290
291#[cfg(test)]
292mod tests {
293    use super::*;
294    use crate::connectors::{NormalizedConversation, NormalizedMessage, NormalizedSnippet};
295    use crate::model::conversation_packet::{ConversationPacket, ConversationPacketProvenance};
296    use crate::model::types::{Conversation, Message, MessageRole, Snippet};
297    use serde_json::json;
298    use std::path::PathBuf;
299    use std::sync::{Mutex, MutexGuard, OnceLock};
300
301    fn env_lock() -> MutexGuard<'static, ()> {
302        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
303        LOCK.get_or_init(|| Mutex::new(()))
304            .lock()
305            .unwrap_or_else(|p| p.into_inner())
306    }
307
308    fn raw_conversation() -> NormalizedConversation {
309        NormalizedConversation {
310            agent_slug: "codex".to_string(),
311            external_id: Some("session-audit".to_string()),
312            title: Some("Audit fixture".to_string()),
313            workspace: Some(PathBuf::from("/work/audit")),
314            source_path: PathBuf::from("/work/audit/.codex/session.jsonl"),
315            started_at: Some(1_700_000_000_000),
316            ended_at: Some(1_700_000_010_000),
317            metadata: json!({"model": "gpt-5"}),
318            messages: vec![
319                NormalizedMessage {
320                    idx: 0,
321                    role: "user".to_string(),
322                    author: Some("human".to_string()),
323                    created_at: Some(1_700_000_000_000),
324                    content: "audit the live persist sink".to_string(),
325                    extra: json!({"turn": 1}),
326                    snippets: vec![NormalizedSnippet {
327                        file_path: Some(PathBuf::from("src/audit.rs")),
328                        start_line: Some(1),
329                        end_line: Some(1),
330                        language: Some("rust".to_string()),
331                        snippet_text: Some("// audit".to_string()),
332                    }],
333                    invocations: Vec::new(),
334                },
335                NormalizedMessage {
336                    idx: 1,
337                    role: "assistant".to_string(),
338                    author: None,
339                    created_at: Some(1_700_000_001_000),
340                    content: "auditing".to_string(),
341                    extra: json!({}),
342                    snippets: Vec::new(),
343                    invocations: Vec::new(),
344                },
345            ],
346        }
347    }
348
349    fn canonical_conversation() -> Conversation {
350        Conversation {
351            id: Some(7),
352            agent_slug: "codex".to_string(),
353            workspace: Some(PathBuf::from("/work/audit")),
354            external_id: Some("session-audit".to_string()),
355            title: Some("Audit fixture".to_string()),
356            source_path: PathBuf::from("/work/audit/.codex/session.jsonl"),
357            started_at: Some(1_700_000_000_000),
358            ended_at: Some(1_700_000_010_000),
359            approx_tokens: None,
360            metadata_json: json!({"model": "gpt-5"}),
361            source_id: "local".to_string(),
362            origin_host: None,
363            messages: vec![
364                Message {
365                    id: Some(70),
366                    idx: 0,
367                    role: MessageRole::User,
368                    author: Some("human".to_string()),
369                    created_at: Some(1_700_000_000_000),
370                    content: "audit the live persist sink".to_string(),
371                    extra_json: json!({"turn": 1}),
372                    snippets: vec![Snippet {
373                        id: Some(700),
374                        file_path: Some(PathBuf::from("src/audit.rs")),
375                        start_line: Some(1),
376                        end_line: Some(1),
377                        language: Some("rust".to_string()),
378                        snippet_text: Some("// audit".to_string()),
379                    }],
380                },
381                Message {
382                    id: Some(71),
383                    idx: 1,
384                    role: MessageRole::Agent,
385                    author: None,
386                    created_at: Some(1_700_000_001_000),
387                    content: "auditing".to_string(),
388                    extra_json: json!({}),
389                    snippets: Vec::new(),
390                },
391            ],
392        }
393    }
394
395    #[test]
396    fn raw_and_canonical_packet_audit_matches_when_content_agrees() {
397        let provenance = ConversationPacketProvenance::local();
398        let raw = ConversationPacket::from_normalized_conversation(
399            &raw_conversation(),
400            provenance.clone(),
401        );
402        let canonical =
403            ConversationPacket::from_canonical_replay(&canonical_conversation(), provenance);
404
405        let auditor = PacketEquivalenceAuditor::new();
406        let outcome = auditor.audit_pair(&raw, &canonical, &PacketEquivalenceTolerance::strict());
407        assert!(outcome.is_match(), "expected match, got {outcome:?}");
408        if let PacketEquivalenceOutcome::Match { semantic_hash } = outcome {
409            assert_eq!(semantic_hash, raw.hashes.semantic_hash);
410            assert_eq!(semantic_hash.len(), 64, "blake3 hex digest is 64 chars");
411        }
412    }
413
414    #[test]
415    fn role_count_drift_surfaces_as_analytics_projection_difference() {
416        let provenance = ConversationPacketProvenance::local();
417        let raw = ConversationPacket::from_normalized_conversation(
418            &raw_conversation(),
419            provenance.clone(),
420        );
421
422        let mut canonical_data = canonical_conversation();
423        canonical_data.messages.push(Message {
424            id: Some(72),
425            idx: 2,
426            role: MessageRole::Tool,
427            author: Some("ripgrep".to_string()),
428            created_at: Some(1_700_000_002_000),
429            content: "tool output".to_string(),
430            extra_json: json!({}),
431            snippets: Vec::new(),
432        });
433        let canonical = ConversationPacket::from_canonical_replay(&canonical_data, provenance);
434
435        let auditor = PacketEquivalenceAuditor::new();
436        let outcome = auditor.audit_pair(&raw, &canonical, &PacketEquivalenceTolerance::strict());
437        let PacketEquivalenceOutcome::Mismatch(mismatch) = outcome else {
438            panic!("expected mismatch when role counts diverge");
439        };
440        assert!(
441            mismatch.projection_differences.iter().any(|diff| matches!(
442                diff,
443                PacketProjectionDifference::AnalyticsRoleCounts { a, b }
444                    if a.tool_messages == 0 && b.tool_messages == 1
445            )),
446            "expected analytics tool-message drift, got {:?}",
447            mismatch.projection_differences
448        );
449        assert!(
450            !mismatch.is_hash_only(),
451            "projection drift must not be downgraded to hash-only"
452        );
453    }
454
455    #[test]
456    fn redaction_drift_is_excused_only_under_explicit_tolerance() {
457        let provenance = ConversationPacketProvenance::local();
458        let raw = ConversationPacket::from_normalized_conversation(
459            &raw_conversation(),
460            provenance.clone(),
461        );
462        let mut redacted = canonical_conversation();
463        // Simulate redaction substituting content while preserving byte
464        // count (the projection contract requires byte-for-byte length
465        // agreement; secret-redactors that change length would break the
466        // analytics projection regardless of tolerance, which is correct).
467        let redacted_text = "█".repeat(raw.payload.messages[0].content.chars().count());
468        debug_assert_eq!(
469            redacted_text.chars().count(),
470            raw.payload.messages[0].content.chars().count()
471        );
472        // Match the byte length of the original content to keep the
473        // lexical/semantic byte projections aligned (the redactor in
474        // production is responsible for the same invariant; this test
475        // pins the contract).
476        let want_bytes = raw.payload.messages[0].content.len();
477        let mut bytes = Vec::with_capacity(want_bytes);
478        bytes.resize(want_bytes, b'#');
479        redacted.messages[0].content = String::from_utf8(bytes).unwrap();
480        let canonical = ConversationPacket::from_canonical_replay(&redacted, provenance);
481
482        let auditor = PacketEquivalenceAuditor::new();
483
484        let strict = auditor.audit_pair(&raw, &canonical, &PacketEquivalenceTolerance::strict());
485        let PacketEquivalenceOutcome::Mismatch(mismatch) = strict else {
486            panic!("strict audit should flag content/hash drift");
487        };
488        assert!(
489            mismatch.is_hash_only(),
490            "byte-length-preserving redaction should leave only hash drift, got {:?}",
491            mismatch
492        );
493        assert!(
494            mismatch
495                .hash_differences
496                .iter()
497                .any(|d| matches!(d, PacketHashDifference::SemanticHash { .. }))
498        );
499
500        let tolerant = auditor.audit_pair(
501            &raw,
502            &canonical,
503            &PacketEquivalenceTolerance::allow_redaction(),
504        );
505        assert!(
506            tolerant.is_match(),
507            "redaction-tolerant audit must match when only hashes drift, got {tolerant:?}"
508        );
509    }
510
511    #[test]
512    fn audit_env_gate_is_off_by_default_and_respects_explicit_opt_in() {
513        let _guard = env_lock();
514        let previous = std::env::var(PACKET_EQUIVALENCE_AUDIT_ENV).ok();
515
516        // SAFETY: single-threaded test holding env_lock; restored below.
517        unsafe {
518            std::env::remove_var(PACKET_EQUIVALENCE_AUDIT_ENV);
519        }
520        assert!(
521            !packet_equivalence_audit_enabled(),
522            "audit must default to OFF so production cost stays at zero"
523        );
524
525        for value in ["1", "true", "TRUE", "yes", "on"] {
526            // SAFETY: single-threaded test holding env_lock.
527            unsafe {
528                std::env::set_var(PACKET_EQUIVALENCE_AUDIT_ENV, value);
529            }
530            assert!(
531                packet_equivalence_audit_enabled(),
532                "value {value:?} should opt into the audit"
533            );
534        }
535
536        for value in ["0", "false", "no", "off", ""] {
537            // SAFETY: single-threaded test holding env_lock.
538            unsafe {
539                std::env::set_var(PACKET_EQUIVALENCE_AUDIT_ENV, value);
540            }
541            assert!(
542                !packet_equivalence_audit_enabled(),
543                "value {value:?} must NOT opt into the audit"
544            );
545        }
546
547        // Restore the caller's env to keep parallel tests deterministic.
548        // SAFETY: single-threaded test holding env_lock.
549        unsafe {
550            match previous {
551                Some(v) => std::env::set_var(PACKET_EQUIVALENCE_AUDIT_ENV, v),
552                None => std::env::remove_var(PACKET_EQUIVALENCE_AUDIT_ENV),
553            }
554        }
555    }
556
557    #[test]
558    fn audit_outcome_serializes_with_outcome_tag() {
559        let provenance = ConversationPacketProvenance::local();
560        let raw = ConversationPacket::from_normalized_conversation(
561            &raw_conversation(),
562            provenance.clone(),
563        );
564        let canonical =
565            ConversationPacket::from_canonical_replay(&canonical_conversation(), provenance);
566        let outcome = PacketEquivalenceAuditor::new().audit_pair(
567            &raw,
568            &canonical,
569            &PacketEquivalenceTolerance::strict(),
570        );
571        let serialized = serde_json::to_string(&outcome).expect("serialize match outcome");
572        assert!(
573            serialized.contains("\"outcome\":\"match\""),
574            "match outcome should serialize with snake_case `outcome` tag, got {serialized}"
575        );
576        assert!(serialized.contains("\"semantic_hash\""));
577    }
578
579    /// `coding_agent_session_search-ibuuh.32` (kill-switch catalog
580    /// gate): the PACKET_SINK_MIGRATIONS catalog must list every
581    /// derivative consumer sink covered by the migration AND keep
582    /// each entry self-consistent (sink id non-empty, helper +
583    /// fallback paths fully qualified, equivalence test name
584    /// fully qualified, commit hash present). A future migration
585    /// adding a sink without registering it here trips this gate.
586    #[test]
587    fn packet_sink_migration_catalog_documents_every_consumer_sink() {
588        // The three consumers the bead AC enumerates: lexical,
589        // analytics, semantic. (Canonical persistence is the packet
590        // payload itself — there is no separate "packet helper" for
591        // the storage write because the packet *is* the canonical
592        // form being persisted.)
593        let sinks: Vec<&str> = PACKET_SINK_MIGRATIONS
594            .iter()
595            .map(|migration| migration.sink)
596            .collect();
597        assert!(
598            sinks.contains(&"lexical"),
599            "catalog must list the lexical sink"
600        );
601        assert!(
602            sinks.contains(&"analytics"),
603            "catalog must list the analytics sink"
604        );
605        assert!(
606            sinks.contains(&"semantic"),
607            "catalog must list the semantic sink"
608        );
609
610        for migration in PACKET_SINK_MIGRATIONS {
611            assert!(!migration.sink.is_empty(), "sink id must be non-empty");
612            assert!(
613                migration.packet_helper.starts_with("crate::"),
614                "packet_helper must be fully qualified, got {:?}",
615                migration.packet_helper
616            );
617            assert!(
618                migration.legacy_fallback.starts_with("crate::"),
619                "legacy_fallback must be fully qualified, got {:?}",
620                migration.legacy_fallback
621            );
622            assert!(
623                migration.equivalence_test.starts_with("crate::"),
624                "equivalence_test must be fully qualified, got {:?}",
625                migration.equivalence_test
626            );
627            assert!(
628                !migration.landed_in_commit.is_empty(),
629                "landed_in_commit must reference the migration commit"
630            );
631            assert!(
632                migration.landed_in_commit.len() >= 7
633                    && migration
634                        .landed_in_commit
635                        .chars()
636                        .all(|c| c.is_ascii_hexdigit()),
637                "landed_in_commit must look like a git short-hash, got {:?}",
638                migration.landed_in_commit
639            );
640        }
641    }
642
643    /// PACKET_SINK_MIGRATIONS must serialize cleanly through serde so
644    /// future operator tooling (e.g. `cass doctor --packet-equivalence`,
645    /// or a status-page kill-switch view) can emit the catalog as JSON
646    /// without re-deriving the schema.
647    #[test]
648    fn packet_sink_migration_catalog_serializes_as_json_array() {
649        let json = serde_json::to_string(PACKET_SINK_MIGRATIONS)
650            .expect("PACKET_SINK_MIGRATIONS must serialize");
651        // Spot-check the lexical entry survives serialization.
652        assert!(json.contains("\"sink\":\"lexical\""));
653        assert!(json.contains("add_messages_from_packet"));
654        assert!(
655            json.contains("\"landed_in_commit\":\"19820c7a\""),
656            "lexical entry must reference its landing commit, got {json}"
657        );
658        // Verify shape: an array of N objects each with the seven
659        // expected keys. We deserialize into serde_json::Value rather
660        // than PacketSinkMigration because the struct fields are
661        // &'static str, which serde cannot rehydrate from an owned
662        // JSON string. The shape check is the contract operators
663        // care about.
664        let parsed: serde_json::Value =
665            serde_json::from_str(&json).expect("catalog must parse as JSON");
666        let arr = parsed.as_array().expect("catalog serializes as array");
667        assert_eq!(arr.len(), PACKET_SINK_MIGRATIONS.len());
668        for (entry, migration) in arr.iter().zip(PACKET_SINK_MIGRATIONS.iter()) {
669            let obj = entry.as_object().expect("each catalog entry is an object");
670            assert_eq!(
671                obj.get("sink").and_then(|v| v.as_str()),
672                Some(migration.sink),
673                "sink field must round-trip"
674            );
675            assert!(obj.contains_key("packet_helper"));
676            assert!(obj.contains_key("legacy_fallback"));
677            assert!(obj.contains_key("equivalence_test"));
678            assert!(obj.contains_key("kill_switch_env"));
679            assert!(obj.contains_key("landed_in_commit"));
680        }
681    }
682}