Skip to main content

zeph_context/
typed_page.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Typed page classification and minimum-fidelity invariants for context compaction.
5//!
6//! Every context segment entering the assembler is tagged with a [`PageType`] and
7//! wrapped in a [`TypedPage`]. The [`PageInvariant`] trait declares the fidelity
8//! contract enforced at every compaction boundary.
9//!
10//! Classification is deterministic and side-effect free — no I/O, no LLM calls.
11//!
12//! # Architecture
13//!
14//! This module lives in `zeph-context` to keep classification logic co-located with
15//! the assembler. No dependency on `zeph-memory` is introduced here.
16//!
17//! # Feature flag
18//!
19//! All typed-page functionality is gated behind the
20//! `[memory.compaction.typed_pages] enabled = true` config key. When disabled the
21//! assembler falls back to the legacy untyped path without behaviour change.
22
23use std::sync::Arc;
24use std::time::Duration;
25
26use serde::{Deserialize, Serialize};
27
28// ── PageType ──────────────────────────────────────────────────────────────────
29
30/// Classification of a context segment for compaction purposes.
31///
32/// The variant determines which [`PageInvariant`] is enforced and what shape the
33/// compacted summary must have.
34///
35/// # Invariant
36///
37/// Every [`TypedPage`] carries exactly one `PageType`. Unclassifiable segments
38/// default to [`PageType::ConversationTurn`] (see [`classify`]).
39#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
40#[serde(rename_all = "snake_case")]
41pub enum PageType {
42    /// A tool request/response pair sourced from memory or the current turn.
43    ToolOutput,
44    /// A user or assistant message that does not carry a tool role.
45    ConversationTurn,
46    /// Cross-session context, past summaries, or graph-fact recall injections.
47    MemoryExcerpt,
48    /// Session digest, persona, skill instructions, or compression guidelines.
49    SystemContext,
50}
51
52impl std::fmt::Display for PageType {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        match self {
55            Self::ToolOutput => f.write_str("tool_output"),
56            Self::ConversationTurn => f.write_str("conversation_turn"),
57            Self::MemoryExcerpt => f.write_str("memory_excerpt"),
58            Self::SystemContext => f.write_str("system_context"),
59        }
60    }
61}
62
63// ── PageOrigin ────────────────────────────────────────────────────────────────
64
65/// Provenance of a [`TypedPage`], serialised into audit records.
66#[derive(Debug, Clone, Serialize, Deserialize)]
67#[serde(tag = "kind", rename_all = "snake_case")]
68#[non_exhaustive]
69pub enum PageOrigin {
70    /// Tool request/response pair.
71    ToolPair {
72        /// Name of the tool that produced this output.
73        tool_name: String,
74    },
75    /// User or assistant conversation turn.
76    Turn {
77        /// Opaque message identifier (numeric message id as string).
78        message_id: String,
79    },
80    /// Injected from memory (cross-session, summary, graph-facts, etc.).
81    Excerpt {
82        /// Human-readable label identifying the memory source.
83        source_label: String,
84    },
85    /// Session-level system context (persona, skills, digest).
86    System {
87        /// Logical key for this system context block (e.g. `"persona"`, `"skills"`).
88        key: String,
89    },
90}
91
92// ── SchemaHint ────────────────────────────────────────────────────────────────
93
94/// Body format hint for [`PageType::ToolOutput`] pages.
95///
96/// Used by the invariant to select the correct structured-summary prompt.
97#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
98#[serde(rename_all = "snake_case")]
99#[non_exhaustive]
100pub enum SchemaHint {
101    /// Body is valid JSON (object or array).
102    Json,
103    /// Body is UTF-8 text (log lines, prose, etc.).
104    Text,
105    /// Body is a unified diff.
106    Diff,
107    /// Body is a tab- or comma-separated table.
108    Table,
109    /// Body is non-UTF-8 binary data.
110    Binary,
111}
112
113// ── PageId ────────────────────────────────────────────────────────────────────
114
115/// Stable content-addressed identifier for a [`TypedPage`].
116///
117/// Computed as BLAKE3 over `page_type_tag || origin_tag || body_bytes`, encoded
118/// as lowercase hex (first 16 bytes = 32 hex chars). The same input always
119/// produces the same `PageId`, enabling deduplication across turns.
120///
121/// # Semantics
122///
123/// `PageId` is a **content hash**: identical source bytes (same page type, same
124/// origin key, same body) always produce the same id. This means that the same
125/// tool output appearing in two different turns produces the same `PageId`.
126/// Callers that need per-turn provenance must use `turn_id` from the audit record
127/// — `PageId` is for deduplication, not for uniqueness across turns.
128#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
129pub struct PageId(pub String);
130
131impl PageId {
132    /// Compute a [`PageId`] from the page type, origin key, and body bytes.
133    #[must_use]
134    pub fn compute(page_type: PageType, origin_key: &str, body: &[u8]) -> Self {
135        let mut hasher = blake3::Hasher::new();
136        hasher.update(page_type.to_string().as_bytes());
137        hasher.update(b"|");
138        hasher.update(origin_key.as_bytes());
139        hasher.update(b"|");
140        hasher.update(body);
141        let hash = hasher.finalize();
142        // Use first 16 bytes (128 bits) — sufficient for deduplication purposes.
143        let mut hex = String::with_capacity(32);
144        for b in &hash.as_bytes()[..16] {
145            use std::fmt::Write as _;
146            let _ = write!(hex, "{b:02x}");
147        }
148        Self(format!("blake3:{hex}"))
149    }
150}
151
152// ── TypedPage ─────────────────────────────────────────────────────────────────
153
154/// A classified context segment ready for invariant-aware compaction.
155///
156/// `TypedPage` is the unit of work passed to compaction boundaries. The
157/// [`PageId`] is content-stable: the same source bytes always produce the same
158/// id, enabling the compactor to skip already-compacted pages.
159#[derive(Debug, Clone)]
160pub struct TypedPage {
161    /// Stable content-addressed identifier.
162    pub page_id: PageId,
163    /// Classification determining which invariant applies.
164    pub page_type: PageType,
165    /// Provenance of this page (for audit records).
166    pub origin: PageOrigin,
167    /// Token count of the original body.
168    pub tokens: u32,
169    /// Body text shared across potential clones.
170    pub body: Arc<str>,
171    /// Body format hint (populated for `ToolOutput` only; `None` otherwise).
172    pub schema_hint: Option<SchemaHint>,
173}
174
175impl TypedPage {
176    /// Construct a new [`TypedPage`], computing its [`PageId`] from content.
177    #[must_use]
178    pub fn new(
179        page_type: PageType,
180        origin: PageOrigin,
181        tokens: u32,
182        body: Arc<str>,
183        schema_hint: Option<SchemaHint>,
184    ) -> Self {
185        let origin_key = origin_key_for(&origin);
186        let page_id = PageId::compute(page_type, &origin_key, body.as_bytes());
187        Self {
188            page_id,
189            page_type,
190            origin,
191            tokens,
192            body,
193            schema_hint,
194        }
195    }
196}
197
198fn origin_key_for(origin: &PageOrigin) -> String {
199    match origin {
200        PageOrigin::ToolPair { tool_name } => format!("tool:{tool_name}"),
201        PageOrigin::Turn { message_id } => format!("turn:{message_id}"),
202        PageOrigin::Excerpt { source_label } => format!("excerpt:{source_label}"),
203        PageOrigin::System { key } => format!("system:{key}"),
204    }
205}
206
207// ── FidelityContract ──────────────────────────────────────────────────────────
208
/// What a compacted page is required to contain.
///
/// Produced by [`PageInvariant::minimum_fidelity`]; [`PageInvariant::verify`]
/// checks the compacted output after summarization.
#[derive(Clone, Debug)]
pub struct FidelityContract {
    /// Human-readable name of the contract version (e.g. `"structured_summary_v1"`).
    pub fidelity_level: &'static str,
    /// Schema version number written to audit records.
    pub invariant_version: u8,
    /// Names of the fields that have to show up in the compacted body text.
    pub required_fields: &'static [&'static str],
}
222
223// ── FidelityViolation ─────────────────────────────────────────────────────────
224
225/// Describes why an invariant check failed after compaction.
226///
227/// A violation is a hard error: the compacted page is dropped and an audit
228/// record with `violations` is emitted.
229#[derive(Debug, Clone, Serialize)]
230pub struct FidelityViolation {
231    /// The field or property that was expected but missing.
232    pub missing_field: String,
233    /// Human-readable explanation of the violation.
234    pub detail: String,
235}
236
237// ── CompactedPage ─────────────────────────────────────────────────────────────
238
/// Result of one compaction attempt; the input to [`PageInvariant::verify`].
#[derive(Clone, Debug)]
pub struct CompactedPage {
    /// Summarized body text returned by the compaction provider.
    pub body: Arc<str>,
    /// Number of tokens in the compacted body.
    pub tokens: u32,
}
247
248// ── PageInvariant trait ───────────────────────────────────────────────────────
249
250/// Minimum-fidelity contract for a single [`PageType`].
251///
252/// Implementors declare what a compacted page must contain ([`minimum_fidelity`])
253/// and verify that the actual output honours the contract ([`verify`]).
254///
255/// The trait is object-safe so implementations can be stored in a
256/// `HashMap<PageType, Box<dyn PageInvariant>>` registry.
257///
258/// # Contract
259///
260/// - [`verify`] MUST NOT perform I/O or call an LLM.
261/// - A failed [`verify`] means the compacted page is dropped — it is NEVER
262///   injected in degraded form.
263///
264/// [`minimum_fidelity`]: PageInvariant::minimum_fidelity
265/// [`verify`]: PageInvariant::verify
266pub trait PageInvariant: Send + Sync {
267    /// The page type this invariant governs.
268    fn page_type(&self) -> PageType;
269
270    /// Return the fidelity contract required for a given page.
271    fn minimum_fidelity(&self, page: &TypedPage) -> FidelityContract;
272
273    /// Verify that `compacted` satisfies the fidelity contract derived from `original`.
274    ///
275    /// # Errors
276    ///
277    /// Returns a non-empty [`Vec<FidelityViolation>`] when one or more required
278    /// fields are absent from the compacted body.
279    fn verify(
280        &self,
281        original: &TypedPage,
282        compacted: &CompactedPage,
283    ) -> Result<(), Vec<FidelityViolation>>;
284}
285
286// ── Per-type invariant implementations ───────────────────────────────────────
287
288/// Invariant for [`PageType::ToolOutput`] pages.
289///
290/// The compacted body must contain the tool name, an exit/status indicator,
291/// and at least one structural key from the original output.
292pub struct ToolOutputInvariant;
293
294impl PageInvariant for ToolOutputInvariant {
295    fn page_type(&self) -> PageType {
296        PageType::ToolOutput
297    }
298
299    fn minimum_fidelity(&self, _page: &TypedPage) -> FidelityContract {
300        FidelityContract {
301            fidelity_level: "structured_summary_v1",
302            invariant_version: 1,
303            required_fields: &["tool_name", "exit_status"],
304        }
305    }
306
307    fn verify(
308        &self,
309        original: &TypedPage,
310        compacted: &CompactedPage,
311    ) -> Result<(), Vec<FidelityViolation>> {
312        let body = compacted.body.as_ref();
313        // For binary pages the body marker is injected by the compactor, skip field checks.
314        if original.schema_hint == Some(SchemaHint::Binary) {
315            return Ok(());
316        }
317
318        let mut violations = Vec::new();
319
320        // The compacted body must reference the tool name.
321        let tool_name = match &original.origin {
322            PageOrigin::ToolPair { tool_name } => tool_name.as_str(),
323            _ => "",
324        };
325        if !tool_name.is_empty() && !body.contains(tool_name) {
326            violations.push(FidelityViolation {
327                missing_field: "tool_name".into(),
328                detail: format!("compacted body does not reference tool '{tool_name}'"),
329            });
330        }
331
332        // The compacted body must contain at least one exit / status indicator.
333        let has_status = body.contains("exit_status")
334            || body.contains("exit_code")
335            || body.contains("status:")
336            || body.contains("Status:")
337            || body.contains("exit:")
338            || body.contains("rc:");
339        if !has_status {
340            violations.push(FidelityViolation {
341                missing_field: "exit_status".into(),
342                detail: "compacted body does not contain an exit status indicator".into(),
343            });
344        }
345
346        // For JSON-schema tool outputs, verify that at least one top-level JSON
347        // field name from the original body is present in the compacted body
348        // (FR-003: structural keys must be preserved, not just exit-status markers).
349        if original.schema_hint == Some(SchemaHint::Json) {
350            let original_body = original.body.as_ref();
351            let preserved = check_json_structural_key(original_body, body);
352            if !preserved {
353                violations.push(FidelityViolation {
354                    missing_field: "structural_key".into(),
355                    detail: "compacted JSON tool output does not reference any top-level field \
356                             name from the original output"
357                        .into(),
358                });
359            }
360        }
361
362        if violations.is_empty() {
363            Ok(())
364        } else {
365            Err(violations)
366        }
367    }
368}
369
370/// Check that at least one top-level JSON key from `original` appears in `compacted`.
371///
372/// Parses `original` as a JSON object and returns `true` when any top-level key
373/// string is a substring of `compacted`. Returns `true` (no violation) when
374/// `original` is not a valid JSON object — the caller already checked schema hint.
375fn check_json_structural_key(original: &str, compacted: &str) -> bool {
376    let Ok(value) = serde_json::from_str::<serde_json::Value>(original) else {
377        return true;
378    };
379    let Some(obj) = value.as_object() else {
380        return true;
381    };
382    if obj.is_empty() {
383        return true;
384    }
385    obj.keys().any(|k| compacted.contains(k.as_str()))
386}
387
388/// Invariant for [`PageType::ConversationTurn`] pages.
389///
390/// The compacted body must preserve a role indicator and some meaningful content.
391pub struct ConversationTurnInvariant;
392
393impl PageInvariant for ConversationTurnInvariant {
394    fn page_type(&self) -> PageType {
395        PageType::ConversationTurn
396    }
397
398    fn minimum_fidelity(&self, _page: &TypedPage) -> FidelityContract {
399        FidelityContract {
400            fidelity_level: "semantic_summary_v1",
401            invariant_version: 1,
402            required_fields: &["role"],
403        }
404    }
405
406    fn verify(
407        &self,
408        _original: &TypedPage,
409        compacted: &CompactedPage,
410    ) -> Result<(), Vec<FidelityViolation>> {
411        let body = compacted.body.as_ref();
412        let has_role =
413            body.contains("user") || body.contains("assistant") || body.contains("system");
414        if !has_role {
415            return Err(vec![FidelityViolation {
416                missing_field: "role".into(),
417                detail: "compacted turn does not identify a speaker role".into(),
418            }]);
419        }
420        Ok(())
421    }
422}
423
424/// Invariant for [`PageType::MemoryExcerpt`] pages.
425///
426/// The compacted body must retain the source label and a message id reference.
427pub struct MemoryExcerptInvariant;
428
429impl PageInvariant for MemoryExcerptInvariant {
430    fn page_type(&self) -> PageType {
431        PageType::MemoryExcerpt
432    }
433
434    fn minimum_fidelity(&self, _page: &TypedPage) -> FidelityContract {
435        FidelityContract {
436            fidelity_level: "excerpt_summary_v1",
437            invariant_version: 1,
438            required_fields: &["source_label"],
439        }
440    }
441
442    fn verify(
443        &self,
444        original: &TypedPage,
445        compacted: &CompactedPage,
446    ) -> Result<(), Vec<FidelityViolation>> {
447        let source_label = match &original.origin {
448            PageOrigin::Excerpt { source_label } => source_label.as_str(),
449            _ => return Ok(()),
450        };
451        if !compacted.body.contains(source_label) {
452            return Err(vec![FidelityViolation {
453                missing_field: "source_label".into(),
454                detail: format!("compacted excerpt does not contain source label '{source_label}'"),
455            }]);
456        }
457        Ok(())
458    }
459}
460
461/// Invariant for [`PageType::SystemContext`] pages.
462///
463/// System context MUST NOT be paraphrased. Compaction replaces it with a
464/// pointer record; any body other than the pointer prefix is a violation.
465pub struct SystemContextInvariant;
466
467/// Pointer prefix that the compactor writes for `SystemContext` pages.
468pub const SYSTEM_POINTER_PREFIX: &str = "[system-ptr:";
469
470impl PageInvariant for SystemContextInvariant {
471    fn page_type(&self) -> PageType {
472        PageType::SystemContext
473    }
474
475    fn minimum_fidelity(&self, _page: &TypedPage) -> FidelityContract {
476        FidelityContract {
477            fidelity_level: "pointer_replace_v1",
478            invariant_version: 1,
479            required_fields: &["pointer"],
480        }
481    }
482
483    fn verify(
484        &self,
485        _original: &TypedPage,
486        compacted: &CompactedPage,
487    ) -> Result<(), Vec<FidelityViolation>> {
488        if !compacted.body.starts_with(SYSTEM_POINTER_PREFIX) {
489            return Err(vec![FidelityViolation {
490                missing_field: "pointer".into(),
491                detail: format!(
492                    "SystemContext page was not pointer-replaced \
493                     (body does not start with '{SYSTEM_POINTER_PREFIX}')"
494                ),
495            }]);
496        }
497        Ok(())
498    }
499}
500
501// ── InvariantRegistry ─────────────────────────────────────────────────────────
502
503/// Registry mapping each [`PageType`] to its [`PageInvariant`] implementation.
504///
505/// Built once and shared via `Arc` so tests can swap in a mock registry.
506///
507/// # Examples
508///
509/// ```
510/// use zeph_context::typed_page::{InvariantRegistry, PageType};
511///
512/// let reg = InvariantRegistry::default();
513/// let inv = reg.get(PageType::ToolOutput).unwrap();
514/// assert_eq!(inv.page_type(), PageType::ToolOutput);
515/// ```
516pub struct InvariantRegistry {
517    tool_output: Box<dyn PageInvariant>,
518    conversation_turn: Box<dyn PageInvariant>,
519    memory_excerpt: Box<dyn PageInvariant>,
520    system_context: Box<dyn PageInvariant>,
521}
522
523impl Default for InvariantRegistry {
524    fn default() -> Self {
525        Self {
526            tool_output: Box::new(ToolOutputInvariant),
527            conversation_turn: Box::new(ConversationTurnInvariant),
528            memory_excerpt: Box::new(MemoryExcerptInvariant),
529            system_context: Box::new(SystemContextInvariant),
530        }
531    }
532}
533
534impl InvariantRegistry {
535    /// Look up the invariant for a given [`PageType`].
536    ///
537    /// Always returns `Some` for the four built-in variants.
538    #[must_use]
539    pub fn get(&self, page_type: PageType) -> Option<&dyn PageInvariant> {
540        match page_type {
541            PageType::ToolOutput => Some(self.tool_output.as_ref()),
542            PageType::ConversationTurn => Some(self.conversation_turn.as_ref()),
543            PageType::MemoryExcerpt => Some(self.memory_excerpt.as_ref()),
544            PageType::SystemContext => Some(self.system_context.as_ref()),
545        }
546    }
547
548    /// Verify that `compacted` satisfies the invariant for `original` at a compaction boundary.
549    ///
550    /// This is the primary entry point for the compactor — it wraps `verify()` in a
551    /// `tracing::info_span!` per NFR-009 so every compaction boundary is observable.
552    ///
553    /// Returns `Ok(())` when the invariant is satisfied, or the violation list on failure.
554    ///
555    /// # Errors
556    ///
557    /// Propagates [`FidelityViolation`]s from the registered invariant implementation.
558    pub fn enforce(
559        &self,
560        original: &TypedPage,
561        compacted: &CompactedPage,
562    ) -> Result<(), Vec<FidelityViolation>> {
563        let _span = tracing::info_span!(
564            "context.compaction.typed_page",
565            page_type = %original.page_type,
566            page_id = %original.page_id.0,
567            original_tokens = original.tokens,
568            compacted_tokens = compacted.tokens,
569        )
570        .entered();
571
572        if let Some(inv) = self.get(original.page_type) {
573            inv.verify(original, compacted)
574        } else {
575            tracing::warn!(
576                page_type = %original.page_type,
577                "no invariant registered for page type — skipping verification"
578            );
579            Ok(())
580        }
581    }
582}
583
584// ── Classification helpers ────────────────────────────────────────────────────
585
586/// Classify a context segment by examining well-known prefix markers.
587///
588/// Classification is deterministic and performs no I/O. When the input does not
589/// match any known prefix the function defaults to [`PageType::ConversationTurn`]
590/// and logs at `WARN` level per FR-008.
591///
592/// The function emits a `context.compaction.typed_page.classify` span per NFR-009
593/// so every classification is observable in traces.
594///
595/// | Source marker | Assigned [`PageType`] |
596/// |---|---|
597/// | Starts with `[tool_output]` or `[tool:` | [`PageType::ToolOutput`] |
598/// | Starts with `[cross-session context]`, `[semantic recall]`, `[known facts]`, `[conversation summaries]` | [`PageType::MemoryExcerpt`] |
599/// | Starts with `[Persona context]`, `[Past experience]`, `[Memory summary]`, `[system` | [`PageType::SystemContext`] |
600/// | Everything else | [`PageType::ConversationTurn`] |
601///
602/// # Examples
603///
604/// ```
605/// use zeph_context::typed_page::{classify, PageType};
606///
607/// assert_eq!(classify("[tool_output] exit_code: 0"), PageType::ToolOutput);
608/// assert_eq!(classify("[cross-session context]\nsome recall"), PageType::MemoryExcerpt);
609/// assert_eq!(classify("[Persona context]\nfact"), PageType::SystemContext);
610/// assert_eq!(classify("Hello, world!"), PageType::ConversationTurn);
611/// ```
612#[must_use]
613pub fn classify(body: &str) -> PageType {
614    classify_with_role(body, false)
615}
616
617/// Classify a context segment, with an explicit `is_system_role` hint.
618///
619/// When `is_system_role` is `true` the segment is classified as
620/// [`PageType::SystemContext`] without prefix matching, preventing arbitrary
621/// system messages injected by the assembler from silently falling back to
622/// `ConversationTurn` (Key Invariant: "`SystemContext` pages are never paraphrased").
623///
624/// Use this variant when the caller has access to the message `Role`.
625///
626/// # Examples
627///
628/// ```
629/// use zeph_context::typed_page::{classify_with_role, PageType};
630///
631/// // A plain system message without a known prefix is still SystemContext.
632/// assert_eq!(classify_with_role("You are a helpful assistant.", true), PageType::SystemContext);
633/// // Role hint does not override ToolOutput prefix detection.
634/// assert_eq!(classify_with_role("[tool_output] exit_code: 0", false), PageType::ToolOutput);
635/// ```
636#[must_use]
637pub fn classify_with_role(body: &str, is_system_role: bool) -> PageType {
638    tracing::info_span!(
639        "context.compaction.typed_page.classify",
640        body_len = body.len()
641    )
642    .in_scope(|| classify_with_role_inner(body, is_system_role))
643}
644
645fn classify_with_role_inner(body: &str, is_system_role: bool) -> PageType {
646    // Use the same prefix constants as the assembler for consistency.
647    const TOOL_PREFIXES: &[&str] = &["[tool_output]", "[tool:", "[Tool output]"];
648    const MEMORY_PREFIXES: &[&str] = &[
649        "[cross-session context]",
650        "[semantic recall]",
651        "[known facts]",
652        "[conversation summaries]",
653        "[past corrections]",
654        "## Relevant documents",
655    ];
656    const SYSTEM_PREFIXES: &[&str] = &[
657        "[Persona context]",
658        "[Past experience]",
659        "[Memory summary]",
660        "[system",
661        "[skill",
662        "[persona",
663        "[digest",
664        "[compression",
665    ];
666
667    let trimmed = body.trim_start();
668
669    for prefix in TOOL_PREFIXES {
670        if trimmed.starts_with(prefix) {
671            return PageType::ToolOutput;
672        }
673    }
674    for prefix in MEMORY_PREFIXES {
675        if trimmed.starts_with(prefix) {
676            return PageType::MemoryExcerpt;
677        }
678    }
679    for prefix in SYSTEM_PREFIXES {
680        if trimmed.starts_with(prefix) {
681            return PageType::SystemContext;
682        }
683    }
684
685    // When the caller signals Role::System, classify as SystemContext even if
686    // the body does not start with a known prefix.  This prevents system
687    // context injected by the assembler (e.g. plain instructions, directives)
688    // from being eligible for paraphrase.
689    if is_system_role {
690        return PageType::SystemContext;
691    }
692
693    let mut prefix_end = body.len().min(80);
694    while !body.is_char_boundary(prefix_end) {
695        prefix_end -= 1;
696    }
697    tracing::warn!(
698        body_prefix = &body[..prefix_end],
699        "typed-page classification fallback to ConversationTurn"
700    );
701    PageType::ConversationTurn
702}
703
704/// Detect [`SchemaHint`] for a [`PageType::ToolOutput`] body.
705///
706/// Returns [`SchemaHint::Binary`] when the body is not valid UTF-8 (detected via
707/// presence of replacement characters) or when the caller passes `is_binary =
708/// true`. JSON detection is heuristic (starts with `{` or `[`).
709#[must_use]
710pub fn detect_schema_hint(body: &str, is_binary: bool) -> SchemaHint {
711    if is_binary || body.contains('\u{FFFD}') {
712        return SchemaHint::Binary;
713    }
714    let trimmed = body.trim_start();
715    if trimmed.starts_with('{') || trimmed.starts_with('[') {
716        return SchemaHint::Json;
717    }
718    if trimmed.starts_with("--- ")
719        || trimmed.starts_with("+++ ")
720        || trimmed.starts_with("diff --git")
721        || trimmed.starts_with("diff -")
722    {
723        return SchemaHint::Diff;
724    }
725    // Simple table heuristic: first line contains multiple tab or pipe separators.
726    let first_line = trimmed.lines().next().unwrap_or("");
727    if first_line.matches('\t').count() >= 2 || first_line.matches('|').count() >= 2 {
728        return SchemaHint::Table;
729    }
730    SchemaHint::Text
731}
732
733// ── Audit record ──────────────────────────────────────────────────────────────
734
735/// One JSONL audit record emitted per compacted page (FR-007).
736///
737/// Written to `[memory.compaction.typed_pages] audit_path` by the audit sink
738/// before the compacted context is handed to the LLM.
739#[derive(Debug, Serialize)]
740pub struct CompactedPageRecord {
741    /// ISO-8601 timestamp when the compaction occurred.
742    pub ts: String,
743    /// Opaque turn identifier (agent turn counter as string).
744    pub turn_id: String,
745    /// Stable content-addressed page identifier.
746    pub page_id: String,
747    /// Page classification.
748    pub page_type: PageType,
749    /// Serialised page origin.
750    pub origin: PageOrigin,
751    /// Token count of the original page.
752    pub original_tokens: u32,
753    /// Token count of the compacted page.
754    pub compacted_tokens: u32,
755    /// Fidelity level label from the invariant contract.
756    pub fidelity_level: String,
757    /// Schema version integer.
758    pub invariant_version: u8,
759    /// Provider name used for summarization.
760    pub provider_name: String,
761    /// Fidelity violations encountered (empty on success).
762    pub violations: Vec<FidelityViolation>,
763    /// `true` when classification fell back to `ConversationTurn`.
764    #[serde(default, skip_serializing_if = "std::ops::Not::not")]
765    pub classification_fallback: bool,
766}
767
768// ── Batch assertions ──────────────────────────────────────────────────────────
769
770/// A failed batch-level compaction assertion.
771#[derive(Debug, Clone, Serialize)]
772pub struct BatchViolation {
773    /// Short label for the assertion that failed.
774    pub assertion: String,
775    /// Human-readable explanation.
776    pub detail: String,
777}
778
/// Batch-level compaction assertions for typed-page enforcement.
///
/// A per-page [`PageInvariant`] compares one page with its compacted form.
/// Batch assertions instead check aggregate properties of the whole summary
/// against the set of classified pages that was sent to the LLM.
///
/// Violations are observational only: they never block compaction. They are
/// logged and written to the audit trail.
///
/// # Examples
///
/// ```
/// use zeph_context::typed_page::BatchAssertions;
///
/// let assertions = BatchAssertions {
///     tool_names_in_batch: vec!["shell".to_string()],
///     has_memory_excerpt: false,
///     excerpt_labels: vec![],
/// };
/// // Summary that mentions the tool — all assertions pass.
/// let violations = assertions.check("shell ran and exited 0");
/// assert!(violations.is_empty());
/// ```
#[derive(Clone, Debug, Default)]
pub struct BatchAssertions {
    /// Names of the tools behind the `ToolOutput` pages in the batch.
    pub tool_names_in_batch: Vec<String>,
    /// `true` when the batch contained at least one `MemoryExcerpt` page.
    pub has_memory_excerpt: bool,
    /// Source labels of the `MemoryExcerpt` pages in the batch.
    pub excerpt_labels: Vec<String>,
}
811
812impl BatchAssertions {
813    /// Check the summary against batch-level assertions.
814    ///
815    /// Returns a list of assertion failures (empty = all pass). Failures are never fatal.
816    #[must_use]
817    pub fn check(&self, summary: &str) -> Vec<BatchViolation> {
818        let mut violations = Vec::new();
819
820        // At least one tool name from the batch must appear in the summary.
821        if !self.tool_names_in_batch.is_empty() {
822            let any_tool_mentioned = self
823                .tool_names_in_batch
824                .iter()
825                .any(|name| !name.is_empty() && summary.contains(name.as_str()));
826            if !any_tool_mentioned {
827                violations.push(BatchViolation {
828                    assertion: "tool_coverage".into(),
829                    detail: format!(
830                        "summary mentions none of the {} tool(s) in batch: {:?}",
831                        self.tool_names_in_batch.len(),
832                        self.tool_names_in_batch
833                    ),
834                });
835            }
836        }
837
838        // If memory excerpts were present, at least one source label should appear.
839        if self.has_memory_excerpt && !self.excerpt_labels.is_empty() {
840            let any_label_mentioned = self
841                .excerpt_labels
842                .iter()
843                .any(|label| !label.is_empty() && summary.contains(label.as_str()));
844            if !any_label_mentioned {
845                violations.push(BatchViolation {
846                    assertion: "excerpt_label_coverage".into(),
847                    detail: format!(
848                        "summary mentions none of the memory excerpt labels: {:?}",
849                        self.excerpt_labels
850                    ),
851                });
852            }
853        }
854
855        violations
856    }
857}
858
859// ── TypedPagesState ───────────────────────────────────────────────────────────
860
861/// Shared runtime state for typed-page compaction, created once at agent startup.
862///
863/// Bundles the invariant registry and optional audit sink so they can be shared
864/// via `Arc` across compaction calls without per-call allocation.
865pub struct TypedPagesState {
866    /// Invariant registry shared across all compaction calls.
867    pub registry: InvariantRegistry,
868    /// Optional JSONL audit sink. `None` when audit is disabled.
869    pub audit_sink: Option<CompactionAuditSink>,
870    /// Whether enforcement is `Active` (pointer-replace `SystemContext` + batch assertions).
871    /// `false` = `Observe` mode (classify and audit only, no behavioral change).
872    pub is_active: bool,
873}
874
875// ── Audit command ─────────────────────────────────────────────────────────────
876
877/// Internal command sent through the audit sink channel.
878enum AuditCommand {
879    /// Write a compaction record.
880    Record(CompactedPageRecord),
881    /// Flush all preceding records and signal completion via the oneshot.
882    Flush(tokio::sync::oneshot::Sender<()>),
883}
884
885// ── Audit sink ────────────────────────────────────────────────────────────────
886
887/// Async bounded-mpsc audit sink for compaction records.
888///
889/// The sink serialises [`CompactedPageRecord`] values to a JSONL file via a
890/// background writer task, mirroring the `zeph-tools` audit pattern. Dropped
891/// records (when the channel is full) are counted and logged.
892///
893/// # Invariant
894///
895/// [`CompactionAuditSink::flush`] sends a rendezvous sentinel through the channel
896/// and awaits the writer task's confirmation with a 100 ms timeout. Records accepted
897/// into the channel before `flush` is called are guaranteed to be written before the
898/// flush responder fires.
899///
900/// # Examples
901///
902/// ```no_run
903/// use zeph_context::typed_page::CompactionAuditSink;
904/// use std::path::Path;
905///
906/// # async fn example() {
907/// let sink = CompactionAuditSink::open(Path::new(".local/audit/compaction.jsonl"), 256)
908///     .await
909///     .unwrap();
910/// # }
911/// ```
912#[derive(Debug, Clone)]
913pub struct CompactionAuditSink {
914    tx: tokio::sync::mpsc::Sender<AuditCommand>,
915    drop_counter: Arc<std::sync::atomic::AtomicU64>,
916}
917
918impl CompactionAuditSink {
919    /// Open a new audit sink writing to `path`.
920    ///
921    /// `capacity` is the bounded channel depth; records dropped when full are counted
922    /// in the internal drop counter and logged at WARN.
923    ///
924    /// # Errors
925    ///
926    /// Returns an error when `path` cannot be opened for appending.
927    pub async fn open(path: &std::path::Path, capacity: usize) -> Result<Self, std::io::Error> {
928        use tokio::io::AsyncWriteExt as _;
929
930        if let Some(parent) = path.parent() {
931            tokio::fs::create_dir_all(parent).await?;
932        }
933        let file = tokio::fs::OpenOptions::new()
934            .create(true)
935            .append(true)
936            .open(path)
937            .await?;
938
939        let (tx, mut rx) = tokio::sync::mpsc::channel::<AuditCommand>(capacity.max(1));
940        let drop_counter = Arc::new(std::sync::atomic::AtomicU64::new(0));
941        let drop_counter_bg = drop_counter.clone();
942
943        tokio::spawn(async move {
944            let mut writer = tokio::io::BufWriter::new(file);
945            while let Some(cmd) = rx.recv().await {
946                match cmd {
947                    AuditCommand::Record(record) => match serde_json::to_string(&record) {
948                        Ok(mut line) => {
949                            line.push('\n');
950                            if let Err(e) = writer.write_all(line.as_bytes()).await {
951                                tracing::error!("compaction audit write failed: {e:#}");
952                            }
953                        }
954                        Err(e) => {
955                            tracing::error!("compaction audit serialization failed: {e:#}");
956                        }
957                    },
958                    AuditCommand::Flush(responder) => {
959                        let _ = writer.flush().await;
960                        let _ = responder.send(());
961                    }
962                }
963            }
964            // Flush remaining bytes when channel closes.
965            let _ = writer.flush().await;
966
967            let dropped = drop_counter_bg.load(std::sync::atomic::Ordering::Relaxed);
968            if dropped > 0 {
969                tracing::warn!(dropped, "compaction audit sink closed with dropped records");
970            }
971        });
972
973        Ok(Self { tx, drop_counter })
974    }
975
976    /// Send a record to the audit sink.
977    ///
978    /// If the channel is full the record is dropped and the drop counter is incremented.
979    pub fn send(&self, record: CompactedPageRecord) {
980        match self.tx.try_send(AuditCommand::Record(record)) {
981            Ok(()) => {}
982            Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
983                let prev = self
984                    .drop_counter
985                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
986                tracing::warn!(
987                    dropped_total = prev + 1,
988                    "compaction audit sink full — record dropped (best-effort audit)"
989                );
990            }
991            Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
992                tracing::error!("compaction audit sink closed unexpectedly");
993            }
994        }
995    }
996
997    /// Flush all pending records with bounded 100 ms timeout.
998    ///
999    /// Sends a `Flush` sentinel through the same channel as records, so ordering is
1000    /// preserved — the writer task responds only after all preceding records are written.
1001    /// If the writer task does not respond within 100 ms, the flush times out silently.
1002    pub async fn flush(&self) {
1003        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
1004        if self.tx.send(AuditCommand::Flush(tx)).await.is_ok() {
1005            let _ = tokio::time::timeout(Duration::from_millis(100), rx).await;
1006        }
1007    }
1008
1009    /// Number of records dropped due to a full channel.
1010    #[must_use]
1011    pub fn dropped_count(&self) -> u64 {
1012        self.drop_counter.load(std::sync::atomic::Ordering::Relaxed)
1013    }
1014}
1015
1016// ── Tests ─────────────────────────────────────────────────────────────────────
1017
1018#[cfg(test)]
1019mod tests {
1020    use super::*;
1021
1022    // ── PageId ────────────────────────────────────────────────────────────────
1023
1024    #[test]
1025    fn page_id_same_input_same_output() {
1026        let a = PageId::compute(PageType::ToolOutput, "tool:shell", b"exit_code: 0");
1027        let b = PageId::compute(PageType::ToolOutput, "tool:shell", b"exit_code: 0");
1028        assert_eq!(a, b);
1029    }
1030
1031    #[test]
1032    fn page_id_different_type_different_id() {
1033        let a = PageId::compute(PageType::ToolOutput, "tool:shell", b"body");
1034        let b = PageId::compute(PageType::ConversationTurn, "tool:shell", b"body");
1035        assert_ne!(a, b);
1036    }
1037
1038    #[test]
1039    fn page_id_starts_with_blake3_prefix() {
1040        let id = PageId::compute(PageType::SystemContext, "system:persona", b"some content");
1041        assert!(id.0.starts_with("blake3:"));
1042    }
1043
1044    // ── classify ──────────────────────────────────────────────────────────────
1045
1046    #[test]
1047    fn classify_tool_output_prefix() {
1048        assert_eq!(
1049            classify("[tool_output] shell exit_code: 0"),
1050            PageType::ToolOutput
1051        );
1052        assert_eq!(classify("[tool:shell] result"), PageType::ToolOutput);
1053    }
1054
1055    #[test]
1056    fn classify_memory_prefixes() {
1057        assert_eq!(
1058            classify("[cross-session context]\nsome recall"),
1059            PageType::MemoryExcerpt
1060        );
1061        assert_eq!(
1062            classify("[semantic recall]\n- [user] hello"),
1063            PageType::MemoryExcerpt
1064        );
1065        assert_eq!(classify("[known facts]\n- fact"), PageType::MemoryExcerpt);
1066        assert_eq!(
1067            classify("[conversation summaries]\n- 1-10: summary"),
1068            PageType::MemoryExcerpt
1069        );
1070    }
1071
1072    #[test]
1073    fn classify_system_prefixes() {
1074        assert_eq!(classify("[Persona context]\nfact"), PageType::SystemContext);
1075        assert_eq!(classify("[system prompt]"), PageType::SystemContext);
1076    }
1077
1078    #[test]
1079    fn classify_fallback_is_conversation_turn() {
1080        assert_eq!(classify("Hello, world!"), PageType::ConversationTurn);
1081        assert_eq!(classify(""), PageType::ConversationTurn);
1082    }
1083
1084    // ── detect_schema_hint ────────────────────────────────────────────────────
1085
1086    #[test]
1087    fn detect_schema_hint_json() {
1088        assert_eq!(
1089            detect_schema_hint(r#"{"key": "val"}"#, false),
1090            SchemaHint::Json
1091        );
1092        assert_eq!(detect_schema_hint("[1,2,3]", false), SchemaHint::Json);
1093    }
1094
1095    #[test]
1096    fn detect_schema_hint_diff() {
1097        assert_eq!(detect_schema_hint("--- a\n+++ b", false), SchemaHint::Diff);
1098    }
1099
1100    #[test]
1101    fn detect_schema_hint_binary() {
1102        assert_eq!(detect_schema_hint("anything", true), SchemaHint::Binary);
1103    }
1104
1105    #[test]
1106    fn detect_schema_hint_text_fallback() {
1107        assert_eq!(detect_schema_hint("plain text", false), SchemaHint::Text);
1108    }
1109
1110    // ── ToolOutputInvariant ───────────────────────────────────────────────────
1111
1112    #[test]
1113    fn tool_output_invariant_passes_when_fields_present() {
1114        let inv = ToolOutputInvariant;
1115        let page = TypedPage::new(
1116            PageType::ToolOutput,
1117            PageOrigin::ToolPair {
1118                tool_name: "shell".into(),
1119            },
1120            100,
1121            Arc::from("[tool_output] shell exit_code: 0\nsome output"),
1122            Some(SchemaHint::Text),
1123        );
1124        let compacted = CompactedPage {
1125            body: Arc::from("shell exit_status: 0\nkey: value"),
1126            tokens: 10,
1127        };
1128        assert!(inv.verify(&page, &compacted).is_ok());
1129    }
1130
1131    #[test]
1132    fn tool_output_invariant_fails_missing_tool_name() {
1133        let inv = ToolOutputInvariant;
1134        let page = TypedPage::new(
1135            PageType::ToolOutput,
1136            PageOrigin::ToolPair {
1137                tool_name: "my_tool".into(),
1138            },
1139            100,
1140            Arc::from("[tool_output] my_tool exit_code: 0"),
1141            Some(SchemaHint::Text),
1142        );
1143        let compacted = CompactedPage {
1144            body: Arc::from("exit_status: 0"),
1145            tokens: 5,
1146        };
1147        let result = inv.verify(&page, &compacted);
1148        assert!(result.is_err());
1149        let violations = result.unwrap_err();
1150        assert!(violations.iter().any(|v| v.missing_field == "tool_name"));
1151    }
1152
1153    #[test]
1154    fn tool_output_invariant_passes_for_binary() {
1155        let inv = ToolOutputInvariant;
1156        let page = TypedPage::new(
1157            PageType::ToolOutput,
1158            PageOrigin::ToolPair {
1159                tool_name: "binary_tool".into(),
1160            },
1161            100,
1162            Arc::from("<binary:1024 bytes>"),
1163            Some(SchemaHint::Binary),
1164        );
1165        let compacted = CompactedPage {
1166            body: Arc::from("<binary:1024 bytes> (archived)"),
1167            tokens: 5,
1168        };
1169        assert!(inv.verify(&page, &compacted).is_ok());
1170    }
1171
1172    // ── SystemContextInvariant ────────────────────────────────────────────────
1173
1174    #[test]
1175    fn system_context_invariant_passes_with_pointer() {
1176        let inv = SystemContextInvariant;
1177        let page = TypedPage::new(
1178            PageType::SystemContext,
1179            PageOrigin::System {
1180                key: "persona".into(),
1181            },
1182            200,
1183            Arc::from("[Persona context]\nsome persona info"),
1184            None,
1185        );
1186        let compacted = CompactedPage {
1187            body: Arc::from("[system-ptr:blake3:abcdef123456]"),
1188            tokens: 3,
1189        };
1190        assert!(inv.verify(&page, &compacted).is_ok());
1191    }
1192
1193    #[test]
1194    fn system_context_invariant_fails_without_pointer() {
1195        let inv = SystemContextInvariant;
1196        let page = TypedPage::new(
1197            PageType::SystemContext,
1198            PageOrigin::System {
1199                key: "persona".into(),
1200            },
1201            200,
1202            Arc::from("[Persona context]\nsome persona info"),
1203            None,
1204        );
1205        let compacted = CompactedPage {
1206            body: Arc::from("This is a paraphrase of persona info"),
1207            tokens: 10,
1208        };
1209        let result = inv.verify(&page, &compacted);
1210        assert!(result.is_err());
1211        let violations = result.unwrap_err();
1212        assert!(violations.iter().any(|v| v.missing_field == "pointer"));
1213    }
1214
1215    // ── InvariantRegistry ─────────────────────────────────────────────────────
1216
1217    #[test]
1218    fn registry_covers_all_page_types() {
1219        let reg = InvariantRegistry::default();
1220        for pt in [
1221            PageType::ToolOutput,
1222            PageType::ConversationTurn,
1223            PageType::MemoryExcerpt,
1224            PageType::SystemContext,
1225        ] {
1226            assert!(reg.get(pt).is_some(), "missing invariant for {pt:?}");
1227        }
1228    }
1229
1230    #[test]
1231    fn registry_returns_correct_page_type() {
1232        let reg = InvariantRegistry::default();
1233        assert_eq!(
1234            reg.get(PageType::ToolOutput).unwrap().page_type(),
1235            PageType::ToolOutput
1236        );
1237        assert_eq!(
1238            reg.get(PageType::SystemContext).unwrap().page_type(),
1239            PageType::SystemContext
1240        );
1241    }
1242
1243    // ── InvariantRegistry::enforce ────────────────────────────────────────────
1244
1245    #[test]
1246    fn enforce_ok_for_valid_system_pointer() {
1247        let reg = InvariantRegistry::default();
1248        let page = TypedPage::new(
1249            PageType::SystemContext,
1250            PageOrigin::System {
1251                key: "persona".into(),
1252            },
1253            50,
1254            Arc::from("[Persona context]\nrules"),
1255            None,
1256        );
1257        let compacted = CompactedPage {
1258            body: Arc::from("[system-ptr:blake3:aabbccdd11223344]"),
1259            tokens: 3,
1260        };
1261        assert!(reg.enforce(&page, &compacted).is_ok());
1262    }
1263
1264    #[test]
1265    fn enforce_err_for_paraphrased_system_context() {
1266        let reg = InvariantRegistry::default();
1267        let page = TypedPage::new(
1268            PageType::SystemContext,
1269            PageOrigin::System {
1270                key: "persona".into(),
1271            },
1272            50,
1273            Arc::from("[Persona context]\nrules"),
1274            None,
1275        );
1276        let compacted = CompactedPage {
1277            body: Arc::from("The persona says to be helpful."),
1278            tokens: 7,
1279        };
1280        let result = reg.enforce(&page, &compacted);
1281        assert!(result.is_err());
1282        assert!(
1283            result
1284                .unwrap_err()
1285                .iter()
1286                .any(|v| v.missing_field == "pointer")
1287        );
1288    }
1289
1290    #[test]
1291    fn enforce_ok_for_conversation_turn_with_role() {
1292        let reg = InvariantRegistry::default();
1293        let page = TypedPage::new(
1294            PageType::ConversationTurn,
1295            PageOrigin::Turn {
1296                message_id: "42".into(),
1297            },
1298            30,
1299            Arc::from("Hello from user"),
1300            None,
1301        );
1302        let compacted = CompactedPage {
1303            body: Arc::from("user asked about Rust"),
1304            tokens: 5,
1305        };
1306        assert!(reg.enforce(&page, &compacted).is_ok());
1307    }
1308
1309    // ── MemoryExcerptInvariant ────────────────────────────────────────────────
1310
1311    #[test]
1312    fn memory_excerpt_invariant_passes_when_label_present() {
1313        let inv = MemoryExcerptInvariant;
1314        let label = "semantic_recall";
1315        let page = TypedPage::new(
1316            PageType::MemoryExcerpt,
1317            PageOrigin::Excerpt {
1318                source_label: label.into(),
1319            },
1320            80,
1321            Arc::from("[semantic recall]\n- [user] hello"),
1322            None,
1323        );
1324        let compacted = CompactedPage {
1325            body: Arc::from(format!("Summary from {label}: user greeted")),
1326            tokens: 6,
1327        };
1328        assert!(inv.verify(&page, &compacted).is_ok());
1329    }
1330
1331    #[test]
1332    fn memory_excerpt_invariant_fails_when_label_missing() {
1333        let inv = MemoryExcerptInvariant;
1334        let page = TypedPage::new(
1335            PageType::MemoryExcerpt,
1336            PageOrigin::Excerpt {
1337                source_label: "graph_facts".into(),
1338            },
1339            80,
1340            Arc::from("[known facts]\n- Alice works at Acme"),
1341            None,
1342        );
1343        let compacted = CompactedPage {
1344            body: Arc::from("Alice is employed somewhere"),
1345            tokens: 5,
1346        };
1347        let result = inv.verify(&page, &compacted);
1348        assert!(result.is_err());
1349        assert!(
1350            result
1351                .unwrap_err()
1352                .iter()
1353                .any(|v| v.missing_field == "source_label")
1354        );
1355    }
1356
1357    #[test]
1358    fn memory_excerpt_invariant_passes_for_non_excerpt_origin() {
1359        let inv = MemoryExcerptInvariant;
1360        let page = TypedPage::new(
1361            PageType::MemoryExcerpt,
1362            PageOrigin::System {
1363                key: "digests".into(),
1364            },
1365            40,
1366            Arc::from("[system]"),
1367            None,
1368        );
1369        let compacted = CompactedPage {
1370            body: Arc::from("anything"),
1371            tokens: 1,
1372        };
1373        assert!(inv.verify(&page, &compacted).is_ok());
1374    }
1375
1376    // ── ConversationTurnInvariant ─────────────────────────────────────────────
1377
1378    #[test]
1379    fn conversation_turn_invariant_passes_with_role_word() {
1380        let inv = ConversationTurnInvariant;
1381        let page = TypedPage::new(
1382            PageType::ConversationTurn,
1383            PageOrigin::Turn {
1384                message_id: "1".into(),
1385            },
1386            20,
1387            Arc::from("Hello world"),
1388            None,
1389        );
1390        for body in &["user: hi", "assistant replied", "system note"] {
1391            let compacted = CompactedPage {
1392                body: Arc::from(*body),
1393                tokens: 2,
1394            };
1395            assert!(inv.verify(&page, &compacted).is_ok(), "body={body}");
1396        }
1397    }
1398
1399    #[test]
1400    fn conversation_turn_invariant_fails_without_role_word() {
1401        let inv = ConversationTurnInvariant;
1402        let page = TypedPage::new(
1403            PageType::ConversationTurn,
1404            PageOrigin::Turn {
1405                message_id: "2".into(),
1406            },
1407            20,
1408            Arc::from("some turn content"),
1409            None,
1410        );
1411        let compacted = CompactedPage {
1412            body: Arc::from("content was summarized"),
1413            tokens: 3,
1414        };
1415        let result = inv.verify(&page, &compacted);
1416        assert!(result.is_err());
1417        assert!(
1418            result
1419                .unwrap_err()
1420                .iter()
1421                .any(|v| v.missing_field == "role")
1422        );
1423    }
1424
1425    // ── CompactionAuditSink ───────────────────────────────────────────────────
1426
1427    #[tokio::test]
1428    async fn audit_sink_jsonl_roundtrip() {
1429        let dir = tempfile::tempdir().unwrap();
1430        let path = dir.path().join("audit.jsonl");
1431
1432        let sink = CompactionAuditSink::open(&path, 64).await.unwrap();
1433        let record = CompactedPageRecord {
1434            ts: "2026-04-19T00:00:00Z".into(),
1435            turn_id: "1".into(),
1436            page_id: "blake3:aabbccdd".into(),
1437            page_type: PageType::ToolOutput,
1438            origin: PageOrigin::ToolPair {
1439                tool_name: "shell".into(),
1440            },
1441            original_tokens: 100,
1442            compacted_tokens: 20,
1443            fidelity_level: "structured_summary_v1".into(),
1444            invariant_version: 1,
1445            provider_name: "test".into(),
1446            violations: vec![],
1447            classification_fallback: false,
1448        };
1449        sink.send(record);
1450
1451        // Drop the sink to close the channel and let the writer task flush.
1452        drop(sink);
1453        // Give the writer task time to finish.
1454        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1455
1456        let contents = std::fs::read_to_string(&path).unwrap();
1457        assert!(!contents.is_empty(), "audit file should not be empty");
1458        let parsed: serde_json::Value = serde_json::from_str(contents.trim()).unwrap();
1459        assert_eq!(parsed["page_type"], "tool_output");
1460        assert_eq!(parsed["turn_id"], "1");
1461        assert_eq!(parsed["provider_name"], "test");
1462    }
1463
1464    #[tokio::test]
1465    async fn audit_sink_drop_counter_increments_when_full() {
1466        let dir = tempfile::tempdir().unwrap();
1467        let path = dir.path().join("audit_full.jsonl");
1468
1469        // Capacity 1: first send fills the channel, subsequent sends are dropped.
1470        let sink = CompactionAuditSink::open(&path, 1).await.unwrap();
1471
1472        let make_record = || CompactedPageRecord {
1473            ts: "2026-04-19T00:00:00Z".into(),
1474            turn_id: "x".into(),
1475            page_id: "blake3:00".into(),
1476            page_type: PageType::ConversationTurn,
1477            origin: PageOrigin::Turn {
1478                message_id: "0".into(),
1479            },
1480            original_tokens: 10,
1481            compacted_tokens: 5,
1482            fidelity_level: "semantic_summary_v1".into(),
1483            invariant_version: 1,
1484            provider_name: "test".into(),
1485            violations: vec![],
1486            classification_fallback: false,
1487        };
1488
1489        // Send enough records to guarantee overflow.
1490        for _ in 0..10 {
1491            sink.send(make_record());
1492        }
1493
1494        assert!(
1495            sink.dropped_count() > 0,
1496            "expected at least one dropped record"
1497        );
1498    }
1499
1500    #[tokio::test]
1501    async fn audit_sink_flush_does_not_panic() {
1502        let dir = tempfile::tempdir().unwrap();
1503        let path = dir.path().join("audit_flush.jsonl");
1504        let sink = CompactionAuditSink::open(&path, 16).await.unwrap();
1505        // flush on an empty sink must not panic or deadlock.
1506        sink.flush().await;
1507    }
1508
1509    // ── classify_with_role ────────────────────────────────────────────────────
1510
1511    #[test]
1512    fn classify_with_role_system_flag_overrides_fallback() {
1513        assert_eq!(
1514            classify_with_role("You are a helpful assistant.", true),
1515            PageType::SystemContext
1516        );
1517    }
1518
1519    #[test]
1520    fn classify_with_role_prefix_wins_over_system_flag() {
1521        assert_eq!(
1522            classify_with_role("[tool_output] exit_code: 0", false),
1523            PageType::ToolOutput
1524        );
1525    }
1526
1527    #[test]
1528    fn classify_with_role_false_still_falls_back_to_conversation_turn() {
1529        assert_eq!(
1530            classify_with_role("random prose without markers", false),
1531            PageType::ConversationTurn
1532        );
1533    }
1534
1535    // ── check_json_structural_key (via ToolOutputInvariant) ───────────────────
1536
1537    #[test]
1538    fn tool_output_json_structural_check_passes_when_key_preserved() {
1539        let inv = ToolOutputInvariant;
1540        let original_body = r#"{"exit_code": 0, "stdout": "ok"}"#;
1541        let page = TypedPage::new(
1542            PageType::ToolOutput,
1543            PageOrigin::ToolPair {
1544                tool_name: "shell".into(),
1545            },
1546            50,
1547            Arc::from(original_body),
1548            Some(SchemaHint::Json),
1549        );
1550        // Compacted body references "exit_code" and "shell".
1551        let compacted = CompactedPage {
1552            body: Arc::from("shell exit_code: 0, stdout was ok"),
1553            tokens: 8,
1554        };
1555        assert!(inv.verify(&page, &compacted).is_ok());
1556    }
1557
1558    #[test]
1559    fn tool_output_json_structural_check_fails_when_no_key_preserved() {
1560        let inv = ToolOutputInvariant;
1561        let original_body = r#"{"some_field": "value", "other_field": 42}"#;
1562        let page = TypedPage::new(
1563            PageType::ToolOutput,
1564            PageOrigin::ToolPair {
1565                tool_name: "my_tool".into(),
1566            },
1567            50,
1568            Arc::from(original_body),
1569            Some(SchemaHint::Json),
1570        );
1571        // Compacted body references tool name and status but none of the JSON keys.
1572        let compacted = CompactedPage {
1573            body: Arc::from("my_tool exit_status: 0 completed successfully"),
1574            tokens: 7,
1575        };
1576        let result = inv.verify(&page, &compacted);
1577        assert!(result.is_err());
1578        let violations = result.unwrap_err();
1579        assert!(
1580            violations
1581                .iter()
1582                .any(|v| v.missing_field == "structural_key")
1583        );
1584    }
1585
1586    // ── Regression: F1 — capacity=0 must not panic ────────────────────────────
1587
1588    #[tokio::test]
1589    async fn audit_sink_capacity_zero_does_not_panic() {
1590        let dir = tempfile::tempdir().unwrap();
1591        let path = dir.path().join("cap0.jsonl");
1592        // capacity=0 used to panic in tokio::sync::mpsc::channel(0); must clamp to 1.
1593        let sink = CompactionAuditSink::open(&path, 0).await.unwrap();
1594        sink.flush().await;
1595    }
1596
1597    // ── Regression: F3 — non-ASCII body must not panic on prefix slice ────────
1598
1599    #[test]
1600    fn classify_with_role_non_ascii_body_does_not_panic() {
1601        // CJK and emoji span multiple bytes; a naive &body[..80] would panic at a
1602        // mid-character byte boundary. classify_with_role must not panic for any input.
1603        let cjk = "你好世界".repeat(20); // 80+ bytes, 4 bytes each
1604        let emoji = "🦀".repeat(30); // 120+ bytes, 4 bytes each
1605        let mixed = "abc🦀中文".repeat(15);
1606
1607        // None of these must panic:
1608        let _ = classify_with_role(&cjk, false);
1609        let _ = classify_with_role(&emoji, false);
1610        let _ = classify_with_role(&mixed, false);
1611    }
1612}