Skip to main content

zeph_context/
typed_page.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Typed page classification and minimum-fidelity invariants for context compaction.
5//!
6//! Every context segment entering the assembler is tagged with a [`PageType`] and
7//! wrapped in a [`TypedPage`]. The [`PageInvariant`] trait declares the fidelity
8//! contract enforced at every compaction boundary.
9//!
10//! Classification is deterministic and side-effect free — no I/O, no LLM calls.
11//!
12//! # Architecture
13//!
14//! This module lives in `zeph-context` to keep classification logic co-located with
15//! the assembler. No dependency on `zeph-memory` is introduced here.
16//!
17//! # Feature flag
18//!
19//! All typed-page functionality is gated behind the
20//! `[memory.compaction.typed_pages] enabled = true` config key. When disabled the
21//! assembler falls back to the legacy untyped path without behaviour change.
22
23use std::sync::Arc;
24use std::time::Duration;
25
26use serde::{Deserialize, Serialize};
27
28// ── PageType ──────────────────────────────────────────────────────────────────
29
30/// Classification of a context segment for compaction purposes.
31///
32/// The variant determines which [`PageInvariant`] is enforced and what shape the
33/// compacted summary must have.
34///
35/// # Invariant
36///
37/// Every [`TypedPage`] carries exactly one `PageType`. Unclassifiable segments
38/// default to [`PageType::ConversationTurn`] (see [`classify`]).
39#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
40#[serde(rename_all = "snake_case")]
41pub enum PageType {
42    /// A tool request/response pair sourced from memory or the current turn.
43    ToolOutput,
44    /// A user or assistant message that does not carry a tool role.
45    ConversationTurn,
46    /// Cross-session context, past summaries, or graph-fact recall injections.
47    MemoryExcerpt,
48    /// Session digest, persona, skill instructions, or compression guidelines.
49    SystemContext,
50}
51
52impl std::fmt::Display for PageType {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        match self {
55            Self::ToolOutput => f.write_str("tool_output"),
56            Self::ConversationTurn => f.write_str("conversation_turn"),
57            Self::MemoryExcerpt => f.write_str("memory_excerpt"),
58            Self::SystemContext => f.write_str("system_context"),
59        }
60    }
61}
62
63// ── PageOrigin ────────────────────────────────────────────────────────────────
64
/// Provenance of a [`TypedPage`], serialised into audit records.
///
/// serde uses the internally-tagged form: the snake_case variant name is
/// written to a `"kind"` field next to the variant's own field, e.g.
/// `{"kind":"tool_pair","tool_name":"shell"}`.
///
/// The variant and its string payload also form the origin key that
/// [`TypedPage::new`] hashes into the page's [`PageId`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum PageOrigin {
    /// Tool request/response pair.
    ToolPair {
        /// Name of the tool that produced this output.
        ///
        /// When non-empty, [`ToolOutputInvariant`] requires it to appear in the
        /// compacted body.
        tool_name: String,
    },
    /// User or assistant conversation turn.
    Turn {
        /// Opaque message identifier (numeric message id as string).
        message_id: String,
    },
    /// Injected from memory (cross-session, summary, graph-facts, etc.).
    Excerpt {
        /// Human-readable label identifying the memory source.
        ///
        /// [`MemoryExcerptInvariant`] requires it verbatim in the compacted body.
        source_label: String,
    },
    /// Session-level system context (persona, skills, digest).
    System {
        /// Logical key for this system context block (e.g. `"persona"`, `"skills"`).
        key: String,
    },
}
90
91// ── SchemaHint ────────────────────────────────────────────────────────────────
92
/// Body format hint for [`PageType::ToolOutput`] pages.
///
/// Stored in [`TypedPage::schema_hint`]. [`ToolOutputInvariant`] skips all
/// field checks for [`SchemaHint::Binary`] bodies, and it requires a preserved
/// top-level key for [`SchemaHint::Json`] bodies. Values normally come from
/// [`detect_schema_hint`], whose checks are heuristic.
///
/// NOTE(review): presumably the compactor also uses this hint to pick a
/// structured-summary prompt; that usage is not visible in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SchemaHint {
    /// Body looks like JSON: after leading whitespace it starts with `{` or `[`.
    /// The body is not validated by a parser.
    Json,
    /// Fallback: UTF-8 text such as log lines or prose.
    Text,
    /// Body is a unified diff (starts with `--- `, `+++ `, or `diff -`).
    Diff,
    /// Tabular text: the first line contains at least two tab characters or at
    /// least two `|` characters.
    Table,
    /// Body was flagged binary by the caller, or contains U+FFFD replacement
    /// characters (typically left by lossy UTF-8 decoding).
    Binary,
}
110
111// ── PageId ────────────────────────────────────────────────────────────────────
112
113/// Stable content-addressed identifier for a [`TypedPage`].
114///
115/// Computed as BLAKE3 over `page_type_tag || origin_tag || body_bytes`, encoded
116/// as lowercase hex (first 16 bytes = 32 hex chars). The same input always
117/// produces the same `PageId`, enabling deduplication across turns.
118///
119/// # Semantics
120///
121/// `PageId` is a **content hash**: identical source bytes (same page type, same
122/// origin key, same body) always produce the same id. This means that the same
123/// tool output appearing in two different turns produces the same `PageId`.
124/// Callers that need per-turn provenance must use `turn_id` from the audit record
125/// — `PageId` is for deduplication, not for uniqueness across turns.
126#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
127pub struct PageId(pub String);
128
129impl PageId {
130    /// Compute a [`PageId`] from the page type, origin key, and body bytes.
131    #[must_use]
132    pub fn compute(page_type: PageType, origin_key: &str, body: &[u8]) -> Self {
133        let mut hasher = blake3::Hasher::new();
134        hasher.update(page_type.to_string().as_bytes());
135        hasher.update(b"|");
136        hasher.update(origin_key.as_bytes());
137        hasher.update(b"|");
138        hasher.update(body);
139        let hash = hasher.finalize();
140        // Use first 16 bytes (128 bits) — sufficient for deduplication purposes.
141        let mut hex = String::with_capacity(32);
142        for b in &hash.as_bytes()[..16] {
143            use std::fmt::Write as _;
144            let _ = write!(hex, "{b:02x}");
145        }
146        Self(format!("blake3:{hex}"))
147    }
148}
149
150// ── TypedPage ─────────────────────────────────────────────────────────────────
151
152/// A classified context segment ready for invariant-aware compaction.
153///
154/// `TypedPage` is the unit of work passed to compaction boundaries. The
155/// [`PageId`] is content-stable: the same source bytes always produce the same
156/// id, enabling the compactor to skip already-compacted pages.
157#[derive(Debug, Clone)]
158pub struct TypedPage {
159    /// Stable content-addressed identifier.
160    pub page_id: PageId,
161    /// Classification determining which invariant applies.
162    pub page_type: PageType,
163    /// Provenance of this page (for audit records).
164    pub origin: PageOrigin,
165    /// Token count of the original body.
166    pub tokens: u32,
167    /// Body text shared across potential clones.
168    pub body: Arc<str>,
169    /// Body format hint (populated for `ToolOutput` only; `None` otherwise).
170    pub schema_hint: Option<SchemaHint>,
171}
172
173impl TypedPage {
174    /// Construct a new [`TypedPage`], computing its [`PageId`] from content.
175    #[must_use]
176    pub fn new(
177        page_type: PageType,
178        origin: PageOrigin,
179        tokens: u32,
180        body: Arc<str>,
181        schema_hint: Option<SchemaHint>,
182    ) -> Self {
183        let origin_key = origin_key_for(&origin);
184        let page_id = PageId::compute(page_type, &origin_key, body.as_bytes());
185        Self {
186            page_id,
187            page_type,
188            origin,
189            tokens,
190            body,
191            schema_hint,
192        }
193    }
194}
195
196fn origin_key_for(origin: &PageOrigin) -> String {
197    match origin {
198        PageOrigin::ToolPair { tool_name } => format!("tool:{tool_name}"),
199        PageOrigin::Turn { message_id } => format!("turn:{message_id}"),
200        PageOrigin::Excerpt { source_label } => format!("excerpt:{source_label}"),
201        PageOrigin::System { key } => format!("system:{key}"),
202    }
203}
204
205// ── FidelityContract ──────────────────────────────────────────────────────────
206
/// The set of fields that must be present in a compacted page.
///
/// Returned by [`PageInvariant::minimum_fidelity`] and checked by
/// [`PageInvariant::verify`] after summarization.
///
/// NOTE(review): `required_fields` is descriptive. The built-in `verify`
/// implementations in this module do not read it; each uses its own string
/// checks. In particular, `ToolOutputInvariant` also checks a `structural_key`
/// for JSON pages, and that key is not listed here.
#[derive(Debug, Clone)]
pub struct FidelityContract {
    /// Human-readable label for this contract version (e.g. `"structured_summary_v1"`).
    pub fidelity_level: &'static str,
    /// Schema version integer included in audit records.
    pub invariant_version: u8,
    /// Fields that must appear in the compacted body text.
    pub required_fields: &'static [&'static str],
}
220
221// ── FidelityViolation ─────────────────────────────────────────────────────────
222
/// Describes why an invariant check failed after compaction.
///
/// Per the [`PageInvariant`] contract a violation is a hard error: the
/// compacted page is dropped. Violations are serialisable so they can be
/// carried in the `violations` field of [`CompactedPageRecord`].
#[derive(Debug, Clone, Serialize)]
pub struct FidelityViolation {
    /// The field or property that was expected but missing
    /// (e.g. `"tool_name"`, `"exit_status"`, `"role"`, `"pointer"`).
    pub missing_field: String,
    /// Human-readable explanation of the violation.
    pub detail: String,
}
234
235// ── CompactedPage ─────────────────────────────────────────────────────────────
236
/// The output of a compaction attempt, passed to [`PageInvariant::verify`].
#[derive(Debug, Clone)]
pub struct CompactedPage {
    /// The summarized body text produced by the compaction provider. For
    /// `SystemContext` pages this is expected to be a pointer record starting
    /// with [`SYSTEM_POINTER_PREFIX`].
    pub body: Arc<str>,
    /// Token count of the compacted body.
    pub tokens: u32,
}
245
246// ── PageInvariant trait ───────────────────────────────────────────────────────
247
/// Minimum-fidelity contract for a single [`PageType`].
///
/// An implementor declares what a compacted page must contain
/// ([`minimum_fidelity`]) and checks that an actual compaction result honours
/// that contract ([`verify`]).
///
/// The trait is object-safe, so implementations can be stored as
/// `Box<dyn PageInvariant>` (as [`InvariantRegistry`] does) or in a
/// `HashMap<PageType, Box<dyn PageInvariant>>`. The `Send + Sync` supertraits
/// allow such trait objects to be shared across threads.
///
/// # Contract
///
/// - [`verify`] MUST NOT perform I/O or call an LLM.
/// - A failed [`verify`] means the compacted page is dropped — it is NEVER
///   injected in degraded form.
///
/// [`minimum_fidelity`]: PageInvariant::minimum_fidelity
/// [`verify`]: PageInvariant::verify
pub trait PageInvariant: Send + Sync {
    /// The page type this invariant governs.
    fn page_type(&self) -> PageType;

    /// Return the fidelity contract that the compacted form of `page` must satisfy.
    fn minimum_fidelity(&self, page: &TypedPage) -> FidelityContract;

    /// Verify that `compacted` satisfies the fidelity contract derived from `original`.
    ///
    /// # Errors
    ///
    /// Returns a non-empty [`Vec<FidelityViolation>`] when one or more required
    /// fields or properties are absent from the compacted body.
    fn verify(
        &self,
        original: &TypedPage,
        compacted: &CompactedPage,
    ) -> Result<(), Vec<FidelityViolation>>;
}
283
284// ── Per-type invariant implementations ───────────────────────────────────────
285
286/// Invariant for [`PageType::ToolOutput`] pages.
287///
288/// The compacted body must contain the tool name, an exit/status indicator,
289/// and at least one structural key from the original output.
290pub struct ToolOutputInvariant;
291
292impl PageInvariant for ToolOutputInvariant {
293    fn page_type(&self) -> PageType {
294        PageType::ToolOutput
295    }
296
297    fn minimum_fidelity(&self, _page: &TypedPage) -> FidelityContract {
298        FidelityContract {
299            fidelity_level: "structured_summary_v1",
300            invariant_version: 1,
301            required_fields: &["tool_name", "exit_status"],
302        }
303    }
304
305    fn verify(
306        &self,
307        original: &TypedPage,
308        compacted: &CompactedPage,
309    ) -> Result<(), Vec<FidelityViolation>> {
310        let body = compacted.body.as_ref();
311        // For binary pages the body marker is injected by the compactor, skip field checks.
312        if original.schema_hint == Some(SchemaHint::Binary) {
313            return Ok(());
314        }
315
316        let mut violations = Vec::new();
317
318        // The compacted body must reference the tool name.
319        let tool_name = match &original.origin {
320            PageOrigin::ToolPair { tool_name } => tool_name.as_str(),
321            _ => "",
322        };
323        if !tool_name.is_empty() && !body.contains(tool_name) {
324            violations.push(FidelityViolation {
325                missing_field: "tool_name".into(),
326                detail: format!("compacted body does not reference tool '{tool_name}'"),
327            });
328        }
329
330        // The compacted body must contain at least one exit / status indicator.
331        let has_status = body.contains("exit_status")
332            || body.contains("exit_code")
333            || body.contains("status:")
334            || body.contains("Status:")
335            || body.contains("exit:")
336            || body.contains("rc:");
337        if !has_status {
338            violations.push(FidelityViolation {
339                missing_field: "exit_status".into(),
340                detail: "compacted body does not contain an exit status indicator".into(),
341            });
342        }
343
344        // For JSON-schema tool outputs, verify that at least one top-level JSON
345        // field name from the original body is present in the compacted body
346        // (FR-003: structural keys must be preserved, not just exit-status markers).
347        if original.schema_hint == Some(SchemaHint::Json) {
348            let original_body = original.body.as_ref();
349            let preserved = check_json_structural_key(original_body, body);
350            if !preserved {
351                violations.push(FidelityViolation {
352                    missing_field: "structural_key".into(),
353                    detail: "compacted JSON tool output does not reference any top-level field \
354                             name from the original output"
355                        .into(),
356                });
357            }
358        }
359
360        if violations.is_empty() {
361            Ok(())
362        } else {
363            Err(violations)
364        }
365    }
366}
367
368/// Check that at least one top-level JSON key from `original` appears in `compacted`.
369///
370/// Parses `original` as a JSON object and returns `true` when any top-level key
371/// string is a substring of `compacted`. Returns `true` (no violation) when
372/// `original` is not a valid JSON object — the caller already checked schema hint.
373fn check_json_structural_key(original: &str, compacted: &str) -> bool {
374    let Ok(value) = serde_json::from_str::<serde_json::Value>(original) else {
375        return true;
376    };
377    let Some(obj) = value.as_object() else {
378        return true;
379    };
380    if obj.is_empty() {
381        return true;
382    }
383    obj.keys().any(|k| compacted.contains(k.as_str()))
384}
385
386/// Invariant for [`PageType::ConversationTurn`] pages.
387///
388/// The compacted body must preserve a role indicator and some meaningful content.
389pub struct ConversationTurnInvariant;
390
391impl PageInvariant for ConversationTurnInvariant {
392    fn page_type(&self) -> PageType {
393        PageType::ConversationTurn
394    }
395
396    fn minimum_fidelity(&self, _page: &TypedPage) -> FidelityContract {
397        FidelityContract {
398            fidelity_level: "semantic_summary_v1",
399            invariant_version: 1,
400            required_fields: &["role"],
401        }
402    }
403
404    fn verify(
405        &self,
406        _original: &TypedPage,
407        compacted: &CompactedPage,
408    ) -> Result<(), Vec<FidelityViolation>> {
409        let body = compacted.body.as_ref();
410        let has_role =
411            body.contains("user") || body.contains("assistant") || body.contains("system");
412        if !has_role {
413            return Err(vec![FidelityViolation {
414                missing_field: "role".into(),
415                detail: "compacted turn does not identify a speaker role".into(),
416            }]);
417        }
418        Ok(())
419    }
420}
421
422/// Invariant for [`PageType::MemoryExcerpt`] pages.
423///
424/// The compacted body must retain the source label and a message id reference.
425pub struct MemoryExcerptInvariant;
426
427impl PageInvariant for MemoryExcerptInvariant {
428    fn page_type(&self) -> PageType {
429        PageType::MemoryExcerpt
430    }
431
432    fn minimum_fidelity(&self, _page: &TypedPage) -> FidelityContract {
433        FidelityContract {
434            fidelity_level: "excerpt_summary_v1",
435            invariant_version: 1,
436            required_fields: &["source_label"],
437        }
438    }
439
440    fn verify(
441        &self,
442        original: &TypedPage,
443        compacted: &CompactedPage,
444    ) -> Result<(), Vec<FidelityViolation>> {
445        let source_label = match &original.origin {
446            PageOrigin::Excerpt { source_label } => source_label.as_str(),
447            _ => return Ok(()),
448        };
449        if !compacted.body.contains(source_label) {
450            return Err(vec![FidelityViolation {
451                missing_field: "source_label".into(),
452                detail: format!("compacted excerpt does not contain source label '{source_label}'"),
453            }]);
454        }
455        Ok(())
456    }
457}
458
459/// Invariant for [`PageType::SystemContext`] pages.
460///
461/// System context MUST NOT be paraphrased. Compaction replaces it with a
462/// pointer record; any body other than the pointer prefix is a violation.
463pub struct SystemContextInvariant;
464
465/// Pointer prefix that the compactor writes for `SystemContext` pages.
466pub const SYSTEM_POINTER_PREFIX: &str = "[system-ptr:";
467
468impl PageInvariant for SystemContextInvariant {
469    fn page_type(&self) -> PageType {
470        PageType::SystemContext
471    }
472
473    fn minimum_fidelity(&self, _page: &TypedPage) -> FidelityContract {
474        FidelityContract {
475            fidelity_level: "pointer_replace_v1",
476            invariant_version: 1,
477            required_fields: &["pointer"],
478        }
479    }
480
481    fn verify(
482        &self,
483        _original: &TypedPage,
484        compacted: &CompactedPage,
485    ) -> Result<(), Vec<FidelityViolation>> {
486        if !compacted.body.starts_with(SYSTEM_POINTER_PREFIX) {
487            return Err(vec![FidelityViolation {
488                missing_field: "pointer".into(),
489                detail: format!(
490                    "SystemContext page was not pointer-replaced \
491                     (body does not start with '{SYSTEM_POINTER_PREFIX}')"
492                ),
493            }]);
494        }
495        Ok(())
496    }
497}
498
499// ── InvariantRegistry ─────────────────────────────────────────────────────────
500
501/// Registry mapping each [`PageType`] to its [`PageInvariant`] implementation.
502///
503/// Built once and shared via `Arc` so tests can swap in a mock registry.
504///
505/// # Examples
506///
507/// ```
508/// use zeph_context::typed_page::{InvariantRegistry, PageType};
509///
510/// let reg = InvariantRegistry::default();
511/// let inv = reg.get(PageType::ToolOutput).unwrap();
512/// assert_eq!(inv.page_type(), PageType::ToolOutput);
513/// ```
514pub struct InvariantRegistry {
515    tool_output: Box<dyn PageInvariant>,
516    conversation_turn: Box<dyn PageInvariant>,
517    memory_excerpt: Box<dyn PageInvariant>,
518    system_context: Box<dyn PageInvariant>,
519}
520
521impl Default for InvariantRegistry {
522    fn default() -> Self {
523        Self {
524            tool_output: Box::new(ToolOutputInvariant),
525            conversation_turn: Box::new(ConversationTurnInvariant),
526            memory_excerpt: Box::new(MemoryExcerptInvariant),
527            system_context: Box::new(SystemContextInvariant),
528        }
529    }
530}
531
532impl InvariantRegistry {
533    /// Look up the invariant for a given [`PageType`].
534    ///
535    /// Always returns `Some` for the four built-in variants.
536    #[must_use]
537    pub fn get(&self, page_type: PageType) -> Option<&dyn PageInvariant> {
538        match page_type {
539            PageType::ToolOutput => Some(self.tool_output.as_ref()),
540            PageType::ConversationTurn => Some(self.conversation_turn.as_ref()),
541            PageType::MemoryExcerpt => Some(self.memory_excerpt.as_ref()),
542            PageType::SystemContext => Some(self.system_context.as_ref()),
543        }
544    }
545
546    /// Verify that `compacted` satisfies the invariant for `original` at a compaction boundary.
547    ///
548    /// This is the primary entry point for the compactor — it wraps `verify()` in a
549    /// `tracing::info_span!` per NFR-009 so every compaction boundary is observable.
550    ///
551    /// Returns `Ok(())` when the invariant is satisfied, or the violation list on failure.
552    ///
553    /// # Errors
554    ///
555    /// Propagates [`FidelityViolation`]s from the registered invariant implementation.
556    pub fn enforce(
557        &self,
558        original: &TypedPage,
559        compacted: &CompactedPage,
560    ) -> Result<(), Vec<FidelityViolation>> {
561        let _span = tracing::info_span!(
562            "context.compaction.typed_page",
563            page_type = %original.page_type,
564            page_id = %original.page_id.0,
565            original_tokens = original.tokens,
566            compacted_tokens = compacted.tokens,
567        )
568        .entered();
569
570        if let Some(inv) = self.get(original.page_type) {
571            inv.verify(original, compacted)
572        } else {
573            tracing::warn!(
574                page_type = %original.page_type,
575                "no invariant registered for page type — skipping verification"
576            );
577            Ok(())
578        }
579    }
580}
581
582// ── Classification helpers ────────────────────────────────────────────────────
583
584/// Classify a context segment by examining well-known prefix markers.
585///
586/// Classification is deterministic and performs no I/O. When the input does not
587/// match any known prefix the function defaults to [`PageType::ConversationTurn`]
588/// and logs at `WARN` level per FR-008.
589///
590/// The function emits a `context.compaction.typed_page.classify` span per NFR-009
591/// so every classification is observable in traces.
592///
593/// | Source marker | Assigned [`PageType`] |
594/// |---|---|
595/// | Starts with `[tool_output]` or `[tool:` | [`PageType::ToolOutput`] |
596/// | Starts with `[cross-session context]`, `[semantic recall]`, `[known facts]`, `[conversation summaries]` | [`PageType::MemoryExcerpt`] |
597/// | Starts with `[Persona context]`, `[Past experience]`, `[Memory summary]`, `[system` | [`PageType::SystemContext`] |
598/// | Everything else | [`PageType::ConversationTurn`] |
599///
600/// # Examples
601///
602/// ```
603/// use zeph_context::typed_page::{classify, PageType};
604///
605/// assert_eq!(classify("[tool_output] exit_code: 0"), PageType::ToolOutput);
606/// assert_eq!(classify("[cross-session context]\nsome recall"), PageType::MemoryExcerpt);
607/// assert_eq!(classify("[Persona context]\nfact"), PageType::SystemContext);
608/// assert_eq!(classify("Hello, world!"), PageType::ConversationTurn);
609/// ```
610#[must_use]
611pub fn classify(body: &str) -> PageType {
612    classify_with_role(body, false)
613}
614
615/// Classify a context segment, with an explicit `is_system_role` hint.
616///
617/// When `is_system_role` is `true` the segment is classified as
618/// [`PageType::SystemContext`] without prefix matching, preventing arbitrary
619/// system messages injected by the assembler from silently falling back to
620/// `ConversationTurn` (Key Invariant: "`SystemContext` pages are never paraphrased").
621///
622/// Use this variant when the caller has access to the message `Role`.
623///
624/// # Examples
625///
626/// ```
627/// use zeph_context::typed_page::{classify_with_role, PageType};
628///
629/// // A plain system message without a known prefix is still SystemContext.
630/// assert_eq!(classify_with_role("You are a helpful assistant.", true), PageType::SystemContext);
631/// // Role hint does not override ToolOutput prefix detection.
632/// assert_eq!(classify_with_role("[tool_output] exit_code: 0", false), PageType::ToolOutput);
633/// ```
634#[must_use]
635pub fn classify_with_role(body: &str, is_system_role: bool) -> PageType {
636    tracing::info_span!(
637        "context.compaction.typed_page.classify",
638        body_len = body.len()
639    )
640    .in_scope(|| classify_with_role_inner(body, is_system_role))
641}
642
643fn classify_with_role_inner(body: &str, is_system_role: bool) -> PageType {
644    // Use the same prefix constants as the assembler for consistency.
645    const TOOL_PREFIXES: &[&str] = &["[tool_output]", "[tool:", "[Tool output]"];
646    const MEMORY_PREFIXES: &[&str] = &[
647        "[cross-session context]",
648        "[semantic recall]",
649        "[known facts]",
650        "[conversation summaries]",
651        "[past corrections]",
652        "## Relevant documents",
653    ];
654    const SYSTEM_PREFIXES: &[&str] = &[
655        "[Persona context]",
656        "[Past experience]",
657        "[Memory summary]",
658        "[system",
659        "[skill",
660        "[persona",
661        "[digest",
662        "[compression",
663    ];
664
665    let trimmed = body.trim_start();
666
667    for prefix in TOOL_PREFIXES {
668        if trimmed.starts_with(prefix) {
669            return PageType::ToolOutput;
670        }
671    }
672    for prefix in MEMORY_PREFIXES {
673        if trimmed.starts_with(prefix) {
674            return PageType::MemoryExcerpt;
675        }
676    }
677    for prefix in SYSTEM_PREFIXES {
678        if trimmed.starts_with(prefix) {
679            return PageType::SystemContext;
680        }
681    }
682
683    // When the caller signals Role::System, classify as SystemContext even if
684    // the body does not start with a known prefix.  This prevents system
685    // context injected by the assembler (e.g. plain instructions, directives)
686    // from being eligible for paraphrase.
687    if is_system_role {
688        return PageType::SystemContext;
689    }
690
691    let mut prefix_end = body.len().min(80);
692    while !body.is_char_boundary(prefix_end) {
693        prefix_end -= 1;
694    }
695    tracing::warn!(
696        body_prefix = &body[..prefix_end],
697        "typed-page classification fallback to ConversationTurn"
698    );
699    PageType::ConversationTurn
700}
701
702/// Detect [`SchemaHint`] for a [`PageType::ToolOutput`] body.
703///
704/// Returns [`SchemaHint::Binary`] when the body is not valid UTF-8 (detected via
705/// presence of replacement characters) or when the caller passes `is_binary =
706/// true`. JSON detection is heuristic (starts with `{` or `[`).
707#[must_use]
708pub fn detect_schema_hint(body: &str, is_binary: bool) -> SchemaHint {
709    if is_binary || body.contains('\u{FFFD}') {
710        return SchemaHint::Binary;
711    }
712    let trimmed = body.trim_start();
713    if trimmed.starts_with('{') || trimmed.starts_with('[') {
714        return SchemaHint::Json;
715    }
716    if trimmed.starts_with("--- ")
717        || trimmed.starts_with("+++ ")
718        || trimmed.starts_with("diff --git")
719        || trimmed.starts_with("diff -")
720    {
721        return SchemaHint::Diff;
722    }
723    // Simple table heuristic: first line contains multiple tab or pipe separators.
724    let first_line = trimmed.lines().next().unwrap_or("");
725    if first_line.matches('\t').count() >= 2 || first_line.matches('|').count() >= 2 {
726        return SchemaHint::Table;
727    }
728    SchemaHint::Text
729}
730
731// ── Audit record ──────────────────────────────────────────────────────────────
732
/// One JSONL audit record emitted per compacted page (FR-007).
///
/// Written to `[memory.compaction.typed_pages] audit_path` by the audit sink
/// before the compacted context is handed to the LLM. Only `Serialize` is
/// derived. serde writes fields in declaration order, so reordering them
/// changes the key order of the audit log.
#[derive(Debug, Serialize)]
pub struct CompactedPageRecord {
    /// ISO-8601 timestamp when the compaction occurred.
    pub ts: String,
    /// Opaque turn identifier (agent turn counter as string). Use this for
    /// per-turn provenance: `page_id` is a content hash and repeats across turns.
    pub turn_id: String,
    /// Stable content-addressed page identifier (presumably the inner string of
    /// a [`PageId`]).
    pub page_id: String,
    /// Page classification, serialised as its snake_case name.
    pub page_type: PageType,
    /// Serialised page origin (internally tagged with a `"kind"` field).
    pub origin: PageOrigin,
    /// Token count of the original page.
    pub original_tokens: u32,
    /// Token count of the compacted page.
    pub compacted_tokens: u32,
    /// Fidelity level label from the invariant contract.
    pub fidelity_level: String,
    /// Schema version integer.
    pub invariant_version: u8,
    /// Provider name used for summarization.
    pub provider_name: String,
    /// Fidelity violations encountered (empty on success).
    pub violations: Vec<FidelityViolation>,
    /// `true` when classification fell back to `ConversationTurn`.
    // Omitted from the output when `false`. `default` only affects
    // deserialisation, which this type does not derive, so it is inert here.
    #[serde(default, skip_serializing_if = "std::ops::Not::not")]
    pub classification_fallback: bool,
}
765
766// ── Batch assertions ──────────────────────────────────────────────────────────
767
/// A failed batch-level compaction assertion, as reported by
/// [`BatchAssertions::check`].
#[derive(Debug, Clone, Serialize)]
pub struct BatchViolation {
    /// Short label for the assertion that failed
    /// (`"tool_coverage"` or `"excerpt_label_coverage"`).
    pub assertion: String,
    /// Human-readable explanation.
    pub detail: String,
}
776
777/// Batch-level compaction assertions for typed-page enforcement.
778///
779/// Unlike per-page [`PageInvariant`] which checks one page against its compacted form,
780/// batch assertions verify aggregate properties of the entire summary against the set
781/// of classified pages that were sent to the LLM.
782///
783/// Violations are observational — they never block compaction. They are logged and
784/// emitted to the audit trail.
785///
786/// # Examples
787///
788/// ```
789/// use zeph_context::typed_page::BatchAssertions;
790///
791/// let assertions = BatchAssertions {
792///     tool_names_in_batch: vec!["shell".to_string()],
793///     has_memory_excerpt: false,
794///     excerpt_labels: vec![],
795/// };
796/// // Summary that mentions the tool — all assertions pass.
797/// let violations = assertions.check("shell ran and exited 0");
798/// assert!(violations.is_empty());
799/// ```
800#[derive(Debug, Clone, Default)]
801pub struct BatchAssertions {
802    /// Tool names collected from `ToolOutput` pages in the batch.
803    pub tool_names_in_batch: Vec<String>,
804    /// Whether any `MemoryExcerpt` pages were in the batch.
805    pub has_memory_excerpt: bool,
806    /// Source labels from `MemoryExcerpt` pages.
807    pub excerpt_labels: Vec<String>,
808}
809
810impl BatchAssertions {
811    /// Check the summary against batch-level assertions.
812    ///
813    /// Returns a list of assertion failures (empty = all pass). Failures are never fatal.
814    #[must_use]
815    pub fn check(&self, summary: &str) -> Vec<BatchViolation> {
816        let mut violations = Vec::new();
817
818        // At least one tool name from the batch must appear in the summary.
819        if !self.tool_names_in_batch.is_empty() {
820            let any_tool_mentioned = self
821                .tool_names_in_batch
822                .iter()
823                .any(|name| !name.is_empty() && summary.contains(name.as_str()));
824            if !any_tool_mentioned {
825                violations.push(BatchViolation {
826                    assertion: "tool_coverage".into(),
827                    detail: format!(
828                        "summary mentions none of the {} tool(s) in batch: {:?}",
829                        self.tool_names_in_batch.len(),
830                        self.tool_names_in_batch
831                    ),
832                });
833            }
834        }
835
836        // If memory excerpts were present, at least one source label should appear.
837        if self.has_memory_excerpt && !self.excerpt_labels.is_empty() {
838            let any_label_mentioned = self
839                .excerpt_labels
840                .iter()
841                .any(|label| !label.is_empty() && summary.contains(label.as_str()));
842            if !any_label_mentioned {
843                violations.push(BatchViolation {
844                    assertion: "excerpt_label_coverage".into(),
845                    detail: format!(
846                        "summary mentions none of the memory excerpt labels: {:?}",
847                        self.excerpt_labels
848                    ),
849                });
850            }
851        }
852
853        violations
854    }
855}
856
857// ── TypedPagesState ───────────────────────────────────────────────────────────
858
/// Shared runtime state for typed-page compaction, created once at agent startup.
///
/// Bundles the invariant registry and optional audit sink so they can be shared
/// via `Arc` across compaction calls without per-call allocation.
///
/// The struct derives no traits (not even `Clone`/`Debug`); sharing is expected to
/// happen by wrapping the whole value in an `Arc` at the call site.
pub struct TypedPagesState {
    /// Invariant registry shared across all compaction calls.
    pub registry: InvariantRegistry,
    /// Optional JSONL audit sink. `None` when audit is disabled.
    pub audit_sink: Option<CompactionAuditSink>,
    /// Whether enforcement is `Active` (pointer-replace `SystemContext` + batch assertions).
    /// `false` = `Observe` mode (classify and audit only, no behavioral change).
    // NOTE(review): the Active/Observe mapping is consumed outside this file — confirm
    // against the assembler before relying on it.
    pub is_active: bool,
}
872
873// ── Audit command ─────────────────────────────────────────────────────────────
874
/// Internal command sent through the audit sink channel.
///
/// Records and flush requests travel through the *same* FIFO channel. That is what
/// lets [`CompactionAuditSink::flush`] guarantee that every record enqueued before
/// the flush request is written before the flush is acknowledged.
enum AuditCommand {
    /// Write a compaction record.
    Record(CompactedPageRecord),
    /// Flush all preceding records and signal completion via the oneshot.
    ///
    /// The writer sends on the oneshot even if the requester already gave up waiting;
    /// a failed send is ignored.
    Flush(tokio::sync::oneshot::Sender<()>),
}
882
883// ── Audit sink ────────────────────────────────────────────────────────────────
884
/// Async bounded-mpsc audit sink for compaction records.
///
/// The sink serialises [`CompactedPageRecord`] values to a JSONL file via a
/// background writer task, mirroring the `zeph-tools` audit pattern. Dropped
/// records (when the channel is full) are counted and logged.
///
/// Cloning the sink is cheap: clones share the same channel sender and drop counter.
/// The background writer keeps running until every clone has been dropped and the
/// channel closes; it then flushes its buffer and exits.
///
/// # Invariant
///
/// [`CompactionAuditSink::flush`] sends a rendezvous sentinel through the channel
/// and waits for the writer task's confirmation with a 100 ms timeout. Records accepted
/// into the channel before `flush` is called are guaranteed to be written before the
/// flush responder fires. `flush` does not report whether the confirmation arrived in
/// time — it is best-effort by design.
///
/// # Examples
///
/// ```no_run
/// use zeph_context::typed_page::CompactionAuditSink;
/// use std::path::Path;
///
/// # async fn example() {
/// let sink = CompactionAuditSink::open(Path::new(".local/audit/compaction.jsonl"), 256)
///     .await
///     .unwrap();
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct CompactionAuditSink {
    /// Sending half of the bounded channel drained by the background writer task.
    tx: tokio::sync::mpsc::Sender<AuditCommand>,
    /// Records rejected because the channel was full. Shared with the writer task,
    /// which reports the final count when the channel closes.
    drop_counter: Arc<std::sync::atomic::AtomicU64>,
}
915
916impl CompactionAuditSink {
917    /// Open a new audit sink writing to `path`.
918    ///
919    /// `capacity` is the bounded channel depth; records dropped when full are counted
920    /// in the internal drop counter and logged at WARN.
921    ///
922    /// # Errors
923    ///
924    /// Returns an error when `path` cannot be opened for appending.
925    pub async fn open(path: &std::path::Path, capacity: usize) -> Result<Self, std::io::Error> {
926        use tokio::io::AsyncWriteExt as _;
927
928        if let Some(parent) = path.parent() {
929            tokio::fs::create_dir_all(parent).await?;
930        }
931        let file = tokio::fs::OpenOptions::new()
932            .create(true)
933            .append(true)
934            .open(path)
935            .await?;
936
937        let (tx, mut rx) = tokio::sync::mpsc::channel::<AuditCommand>(capacity.max(1));
938        let drop_counter = Arc::new(std::sync::atomic::AtomicU64::new(0));
939        let drop_counter_bg = drop_counter.clone();
940
941        tokio::spawn(async move {
942            let mut writer = tokio::io::BufWriter::new(file);
943            while let Some(cmd) = rx.recv().await {
944                match cmd {
945                    AuditCommand::Record(record) => match serde_json::to_string(&record) {
946                        Ok(mut line) => {
947                            line.push('\n');
948                            if let Err(e) = writer.write_all(line.as_bytes()).await {
949                                tracing::error!("compaction audit write failed: {e:#}");
950                            }
951                        }
952                        Err(e) => {
953                            tracing::error!("compaction audit serialization failed: {e:#}");
954                        }
955                    },
956                    AuditCommand::Flush(responder) => {
957                        let _ = writer.flush().await;
958                        let _ = responder.send(());
959                    }
960                }
961            }
962            // Flush remaining bytes when channel closes.
963            let _ = writer.flush().await;
964
965            let dropped = drop_counter_bg.load(std::sync::atomic::Ordering::Relaxed);
966            if dropped > 0 {
967                tracing::warn!(dropped, "compaction audit sink closed with dropped records");
968            }
969        });
970
971        Ok(Self { tx, drop_counter })
972    }
973
974    /// Send a record to the audit sink.
975    ///
976    /// If the channel is full the record is dropped and the drop counter is incremented.
977    pub fn send(&self, record: CompactedPageRecord) {
978        match self.tx.try_send(AuditCommand::Record(record)) {
979            Ok(()) => {}
980            Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
981                let prev = self
982                    .drop_counter
983                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
984                tracing::warn!(
985                    dropped_total = prev + 1,
986                    "compaction audit sink full — record dropped (best-effort audit)"
987                );
988            }
989            Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
990                tracing::error!("compaction audit sink closed unexpectedly");
991            }
992        }
993    }
994
995    /// Flush all pending records with bounded 100 ms timeout.
996    ///
997    /// Sends a `Flush` sentinel through the same channel as records, so ordering is
998    /// preserved — the writer task responds only after all preceding records are written.
999    /// If the writer task does not respond within 100 ms, the flush times out silently.
1000    pub async fn flush(&self) {
1001        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
1002        if self.tx.send(AuditCommand::Flush(tx)).await.is_ok() {
1003            let _ = tokio::time::timeout(Duration::from_millis(100), rx).await;
1004        }
1005    }
1006
1007    /// Number of records dropped due to a full channel.
1008    #[must_use]
1009    pub fn dropped_count(&self) -> u64 {
1010        self.drop_counter.load(std::sync::atomic::Ordering::Relaxed)
1011    }
1012}
1013
1014// ── Tests ─────────────────────────────────────────────────────────────────────
1015
#[cfg(test)]
mod tests {
    use super::*;

    // ── PageId ────────────────────────────────────────────────────────────────

    #[test]
    fn page_id_same_input_same_output() {
        let a = PageId::compute(PageType::ToolOutput, "tool:shell", b"exit_code: 0");
        let b = PageId::compute(PageType::ToolOutput, "tool:shell", b"exit_code: 0");
        assert_eq!(a, b);
    }

    #[test]
    fn page_id_different_type_different_id() {
        let a = PageId::compute(PageType::ToolOutput, "tool:shell", b"body");
        let b = PageId::compute(PageType::ConversationTurn, "tool:shell", b"body");
        assert_ne!(a, b);
    }

    #[test]
    fn page_id_starts_with_blake3_prefix() {
        let id = PageId::compute(PageType::SystemContext, "system:persona", b"some content");
        assert!(id.0.starts_with("blake3:"));
    }

    // ── classify ──────────────────────────────────────────────────────────────

    #[test]
    fn classify_tool_output_prefix() {
        assert_eq!(
            classify("[tool_output] shell exit_code: 0"),
            PageType::ToolOutput
        );
        assert_eq!(classify("[tool:shell] result"), PageType::ToolOutput);
    }

    #[test]
    fn classify_memory_prefixes() {
        assert_eq!(
            classify("[cross-session context]\nsome recall"),
            PageType::MemoryExcerpt
        );
        assert_eq!(
            classify("[semantic recall]\n- [user] hello"),
            PageType::MemoryExcerpt
        );
        assert_eq!(classify("[known facts]\n- fact"), PageType::MemoryExcerpt);
        assert_eq!(
            classify("[conversation summaries]\n- 1-10: summary"),
            PageType::MemoryExcerpt
        );
    }

    #[test]
    fn classify_system_prefixes() {
        assert_eq!(classify("[Persona context]\nfact"), PageType::SystemContext);
        assert_eq!(classify("[system prompt]"), PageType::SystemContext);
    }

    #[test]
    fn classify_fallback_is_conversation_turn() {
        assert_eq!(classify("Hello, world!"), PageType::ConversationTurn);
        assert_eq!(classify(""), PageType::ConversationTurn);
    }

    // ── detect_schema_hint ────────────────────────────────────────────────────

    #[test]
    fn detect_schema_hint_json() {
        assert_eq!(
            detect_schema_hint(r#"{"key": "val"}"#, false),
            SchemaHint::Json
        );
        assert_eq!(detect_schema_hint("[1,2,3]", false), SchemaHint::Json);
    }

    #[test]
    fn detect_schema_hint_diff() {
        assert_eq!(detect_schema_hint("--- a\n+++ b", false), SchemaHint::Diff);
    }

    #[test]
    fn detect_schema_hint_binary() {
        assert_eq!(detect_schema_hint("anything", true), SchemaHint::Binary);
    }

    #[test]
    fn detect_schema_hint_text_fallback() {
        assert_eq!(detect_schema_hint("plain text", false), SchemaHint::Text);
    }

    // ── ToolOutputInvariant ───────────────────────────────────────────────────

    #[test]
    fn tool_output_invariant_passes_when_fields_present() {
        let inv = ToolOutputInvariant;
        let page = TypedPage::new(
            PageType::ToolOutput,
            PageOrigin::ToolPair {
                tool_name: "shell".into(),
            },
            100,
            Arc::from("[tool_output] shell exit_code: 0\nsome output"),
            Some(SchemaHint::Text),
        );
        let compacted = CompactedPage {
            body: Arc::from("shell exit_status: 0\nkey: value"),
            tokens: 10,
        };
        assert!(inv.verify(&page, &compacted).is_ok());
    }

    #[test]
    fn tool_output_invariant_fails_missing_tool_name() {
        let inv = ToolOutputInvariant;
        let page = TypedPage::new(
            PageType::ToolOutput,
            PageOrigin::ToolPair {
                tool_name: "my_tool".into(),
            },
            100,
            Arc::from("[tool_output] my_tool exit_code: 0"),
            Some(SchemaHint::Text),
        );
        let compacted = CompactedPage {
            body: Arc::from("exit_status: 0"),
            tokens: 5,
        };
        let result = inv.verify(&page, &compacted);
        assert!(result.is_err());
        let violations = result.unwrap_err();
        assert!(violations.iter().any(|v| v.missing_field == "tool_name"));
    }

    #[test]
    fn tool_output_invariant_passes_for_binary() {
        let inv = ToolOutputInvariant;
        let page = TypedPage::new(
            PageType::ToolOutput,
            PageOrigin::ToolPair {
                tool_name: "binary_tool".into(),
            },
            100,
            Arc::from("<binary:1024 bytes>"),
            Some(SchemaHint::Binary),
        );
        let compacted = CompactedPage {
            body: Arc::from("<binary:1024 bytes> (archived)"),
            tokens: 5,
        };
        assert!(inv.verify(&page, &compacted).is_ok());
    }

    // ── SystemContextInvariant ────────────────────────────────────────────────

    #[test]
    fn system_context_invariant_passes_with_pointer() {
        let inv = SystemContextInvariant;
        let page = TypedPage::new(
            PageType::SystemContext,
            PageOrigin::System {
                key: "persona".into(),
            },
            200,
            Arc::from("[Persona context]\nsome persona info"),
            None,
        );
        let compacted = CompactedPage {
            body: Arc::from("[system-ptr:blake3:abcdef123456]"),
            tokens: 3,
        };
        assert!(inv.verify(&page, &compacted).is_ok());
    }

    #[test]
    fn system_context_invariant_fails_without_pointer() {
        let inv = SystemContextInvariant;
        let page = TypedPage::new(
            PageType::SystemContext,
            PageOrigin::System {
                key: "persona".into(),
            },
            200,
            Arc::from("[Persona context]\nsome persona info"),
            None,
        );
        let compacted = CompactedPage {
            body: Arc::from("This is a paraphrase of persona info"),
            tokens: 10,
        };
        let result = inv.verify(&page, &compacted);
        assert!(result.is_err());
        let violations = result.unwrap_err();
        assert!(violations.iter().any(|v| v.missing_field == "pointer"));
    }

    // ── InvariantRegistry ─────────────────────────────────────────────────────

    #[test]
    fn registry_covers_all_page_types() {
        let reg = InvariantRegistry::default();
        for pt in [
            PageType::ToolOutput,
            PageType::ConversationTurn,
            PageType::MemoryExcerpt,
            PageType::SystemContext,
        ] {
            assert!(reg.get(pt).is_some(), "missing invariant for {pt:?}");
        }
    }

    #[test]
    fn registry_returns_correct_page_type() {
        let reg = InvariantRegistry::default();
        assert_eq!(
            reg.get(PageType::ToolOutput).unwrap().page_type(),
            PageType::ToolOutput
        );
        assert_eq!(
            reg.get(PageType::SystemContext).unwrap().page_type(),
            PageType::SystemContext
        );
    }

    // ── InvariantRegistry::enforce ────────────────────────────────────────────

    #[test]
    fn enforce_ok_for_valid_system_pointer() {
        let reg = InvariantRegistry::default();
        let page = TypedPage::new(
            PageType::SystemContext,
            PageOrigin::System {
                key: "persona".into(),
            },
            50,
            Arc::from("[Persona context]\nrules"),
            None,
        );
        let compacted = CompactedPage {
            body: Arc::from("[system-ptr:blake3:aabbccdd11223344]"),
            tokens: 3,
        };
        assert!(reg.enforce(&page, &compacted).is_ok());
    }

    #[test]
    fn enforce_err_for_paraphrased_system_context() {
        let reg = InvariantRegistry::default();
        let page = TypedPage::new(
            PageType::SystemContext,
            PageOrigin::System {
                key: "persona".into(),
            },
            50,
            Arc::from("[Persona context]\nrules"),
            None,
        );
        let compacted = CompactedPage {
            body: Arc::from("The persona says to be helpful."),
            tokens: 7,
        };
        let result = reg.enforce(&page, &compacted);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .iter()
                .any(|v| v.missing_field == "pointer")
        );
    }

    #[test]
    fn enforce_ok_for_conversation_turn_with_role() {
        let reg = InvariantRegistry::default();
        let page = TypedPage::new(
            PageType::ConversationTurn,
            PageOrigin::Turn {
                message_id: "42".into(),
            },
            30,
            Arc::from("Hello from user"),
            None,
        );
        let compacted = CompactedPage {
            body: Arc::from("user asked about Rust"),
            tokens: 5,
        };
        assert!(reg.enforce(&page, &compacted).is_ok());
    }

    // ── MemoryExcerptInvariant ────────────────────────────────────────────────

    #[test]
    fn memory_excerpt_invariant_passes_when_label_present() {
        let inv = MemoryExcerptInvariant;
        let label = "semantic_recall";
        let page = TypedPage::new(
            PageType::MemoryExcerpt,
            PageOrigin::Excerpt {
                source_label: label.into(),
            },
            80,
            Arc::from("[semantic recall]\n- [user] hello"),
            None,
        );
        let compacted = CompactedPage {
            body: Arc::from(format!("Summary from {label}: user greeted")),
            tokens: 6,
        };
        assert!(inv.verify(&page, &compacted).is_ok());
    }

    #[test]
    fn memory_excerpt_invariant_fails_when_label_missing() {
        let inv = MemoryExcerptInvariant;
        let page = TypedPage::new(
            PageType::MemoryExcerpt,
            PageOrigin::Excerpt {
                source_label: "graph_facts".into(),
            },
            80,
            Arc::from("[known facts]\n- Alice works at Acme"),
            None,
        );
        let compacted = CompactedPage {
            body: Arc::from("Alice is employed somewhere"),
            tokens: 5,
        };
        let result = inv.verify(&page, &compacted);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .iter()
                .any(|v| v.missing_field == "source_label")
        );
    }

    #[test]
    fn memory_excerpt_invariant_passes_for_non_excerpt_origin() {
        let inv = MemoryExcerptInvariant;
        let page = TypedPage::new(
            PageType::MemoryExcerpt,
            PageOrigin::System {
                key: "digests".into(),
            },
            40,
            Arc::from("[system]"),
            None,
        );
        let compacted = CompactedPage {
            body: Arc::from("anything"),
            tokens: 1,
        };
        assert!(inv.verify(&page, &compacted).is_ok());
    }

    // ── ConversationTurnInvariant ─────────────────────────────────────────────

    #[test]
    fn conversation_turn_invariant_passes_with_role_word() {
        let inv = ConversationTurnInvariant;
        let page = TypedPage::new(
            PageType::ConversationTurn,
            PageOrigin::Turn {
                message_id: "1".into(),
            },
            20,
            Arc::from("Hello world"),
            None,
        );
        for body in &["user: hi", "assistant replied", "system note"] {
            let compacted = CompactedPage {
                body: Arc::from(*body),
                tokens: 2,
            };
            assert!(inv.verify(&page, &compacted).is_ok(), "body={body}");
        }
    }

    #[test]
    fn conversation_turn_invariant_fails_without_role_word() {
        let inv = ConversationTurnInvariant;
        let page = TypedPage::new(
            PageType::ConversationTurn,
            PageOrigin::Turn {
                message_id: "2".into(),
            },
            20,
            Arc::from("some turn content"),
            None,
        );
        let compacted = CompactedPage {
            body: Arc::from("content was summarized"),
            tokens: 3,
        };
        let result = inv.verify(&page, &compacted);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .iter()
                .any(|v| v.missing_field == "role")
        );
    }

    // ── BatchAssertions ───────────────────────────────────────────────────────

    #[test]
    fn batch_assertions_pass_when_tool_and_label_mentioned() {
        let assertions = BatchAssertions {
            tool_names_in_batch: vec!["shell".to_string()],
            has_memory_excerpt: true,
            excerpt_labels: vec!["graph_facts".to_string()],
        };
        assert!(
            assertions
                .check("shell ran; graph_facts recalled")
                .is_empty()
        );
    }

    #[test]
    fn batch_assertions_flag_missing_tool_name() {
        let assertions = BatchAssertions {
            tool_names_in_batch: vec!["shell".to_string(), "grep".to_string()],
            ..BatchAssertions::default()
        };
        let violations = assertions.check("a command ran");
        assert_eq!(violations.len(), 1);
        assert_eq!(violations[0].assertion, "tool_coverage");
    }

    #[test]
    fn batch_assertions_flag_missing_excerpt_label() {
        let assertions = BatchAssertions {
            tool_names_in_batch: vec![],
            has_memory_excerpt: true,
            excerpt_labels: vec!["graph_facts".to_string()],
        };
        let violations = assertions.check("Alice works somewhere");
        assert_eq!(violations.len(), 1);
        assert_eq!(violations[0].assertion, "excerpt_label_coverage");
    }

    #[test]
    fn batch_assertions_ignore_labels_without_memory_excerpt() {
        let assertions = BatchAssertions {
            tool_names_in_batch: vec![],
            has_memory_excerpt: false,
            excerpt_labels: vec!["graph_facts".to_string()],
        };
        assert!(assertions.check("unrelated").is_empty());
    }

    // ── CompactionAuditSink ───────────────────────────────────────────────────

    #[tokio::test]
    async fn audit_sink_jsonl_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("audit.jsonl");

        let sink = CompactionAuditSink::open(&path, 64).await.unwrap();
        let record = CompactedPageRecord {
            ts: "2026-04-19T00:00:00Z".into(),
            turn_id: "1".into(),
            page_id: "blake3:aabbccdd".into(),
            page_type: PageType::ToolOutput,
            origin: PageOrigin::ToolPair {
                tool_name: "shell".into(),
            },
            original_tokens: 100,
            compacted_tokens: 20,
            fidelity_level: "structured_summary_v1".into(),
            invariant_version: 1,
            provider_name: "test".into(),
            violations: vec![],
            classification_fallback: false,
        };
        sink.send(record);

        // The flush sentinel travels through the same channel as the record, so once
        // it is acknowledged the record is on disk — no timing-based sleep needed.
        sink.flush().await;

        let contents = std::fs::read_to_string(&path).unwrap();
        assert!(!contents.is_empty(), "audit file should not be empty");
        let parsed: serde_json::Value = serde_json::from_str(contents.trim()).unwrap();
        assert_eq!(parsed["page_type"], "tool_output");
        assert_eq!(parsed["turn_id"], "1");
        assert_eq!(parsed["provider_name"], "test");
    }

    #[tokio::test]
    async fn audit_sink_drop_counter_increments_when_full() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("audit_full.jsonl");

        // Capacity 1: first send fills the channel, subsequent sends are dropped.
        let sink = CompactionAuditSink::open(&path, 1).await.unwrap();

        let make_record = || CompactedPageRecord {
            ts: "2026-04-19T00:00:00Z".into(),
            turn_id: "x".into(),
            page_id: "blake3:00".into(),
            page_type: PageType::ConversationTurn,
            origin: PageOrigin::Turn {
                message_id: "0".into(),
            },
            original_tokens: 10,
            compacted_tokens: 5,
            fidelity_level: "semantic_summary_v1".into(),
            invariant_version: 1,
            provider_name: "test".into(),
            violations: vec![],
            classification_fallback: false,
        };

        // Send enough records to guarantee overflow. The writer task cannot run in
        // between because this loop never yields to the runtime.
        for _ in 0..10 {
            sink.send(make_record());
        }

        assert!(
            sink.dropped_count() > 0,
            "expected at least one dropped record"
        );
    }

    #[tokio::test]
    async fn audit_sink_flush_does_not_panic() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("audit_flush.jsonl");
        let sink = CompactionAuditSink::open(&path, 16).await.unwrap();
        // flush on an empty sink must not panic or deadlock.
        sink.flush().await;
    }

    // ── classify_with_role ────────────────────────────────────────────────────

    #[test]
    fn classify_with_role_system_flag_overrides_fallback() {
        assert_eq!(
            classify_with_role("You are a helpful assistant.", true),
            PageType::SystemContext
        );
    }

    #[test]
    fn classify_with_role_prefix_wins_over_system_flag() {
        assert_eq!(
            classify_with_role("[tool_output] exit_code: 0", false),
            PageType::ToolOutput
        );
        // The actual precedence check: a recognised prefix must beat `is_system = true`.
        assert_eq!(
            classify_with_role("[tool_output] exit_code: 0", true),
            PageType::ToolOutput
        );
    }

    #[test]
    fn classify_with_role_false_still_falls_back_to_conversation_turn() {
        assert_eq!(
            classify_with_role("random prose without markers", false),
            PageType::ConversationTurn
        );
    }

    // ── check_json_structural_key (via ToolOutputInvariant) ───────────────────

    #[test]
    fn tool_output_json_structural_check_passes_when_key_preserved() {
        let inv = ToolOutputInvariant;
        let original_body = r#"{"exit_code": 0, "stdout": "ok"}"#;
        let page = TypedPage::new(
            PageType::ToolOutput,
            PageOrigin::ToolPair {
                tool_name: "shell".into(),
            },
            50,
            Arc::from(original_body),
            Some(SchemaHint::Json),
        );
        // Compacted body references "exit_code" and "shell".
        let compacted = CompactedPage {
            body: Arc::from("shell exit_code: 0, stdout was ok"),
            tokens: 8,
        };
        assert!(inv.verify(&page, &compacted).is_ok());
    }

    #[test]
    fn tool_output_json_structural_check_fails_when_no_key_preserved() {
        let inv = ToolOutputInvariant;
        let original_body = r#"{"some_field": "value", "other_field": 42}"#;
        let page = TypedPage::new(
            PageType::ToolOutput,
            PageOrigin::ToolPair {
                tool_name: "my_tool".into(),
            },
            50,
            Arc::from(original_body),
            Some(SchemaHint::Json),
        );
        // Compacted body references tool name and status but none of the JSON keys.
        let compacted = CompactedPage {
            body: Arc::from("my_tool exit_status: 0 completed successfully"),
            tokens: 7,
        };
        let result = inv.verify(&page, &compacted);
        assert!(result.is_err());
        let violations = result.unwrap_err();
        assert!(
            violations
                .iter()
                .any(|v| v.missing_field == "structural_key")
        );
    }

    // ── Regression: F1 — capacity=0 must not panic ────────────────────────────

    #[tokio::test]
    async fn audit_sink_capacity_zero_does_not_panic() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("cap0.jsonl");
        // capacity=0 used to panic in tokio::sync::mpsc::channel(0); must clamp to 1.
        let sink = CompactionAuditSink::open(&path, 0).await.unwrap();
        sink.flush().await;
    }

    // ── Regression: F3 — non-ASCII body must not panic on prefix slice ────────

    #[test]
    fn classify_with_role_non_ascii_body_does_not_panic() {
        // Multi-byte characters make a naive `&body[..80]` panic whenever byte 80 falls
        // inside a character. classify_with_role must not panic for any input.
        let cjk = "你好世界".repeat(20); // 240 bytes, 3 bytes per char: byte 80 is mid-char
        let emoji = "🦀".repeat(30); // 120 bytes, 4 bytes per char: byte 80 is a boundary
        let offset_emoji = format!("a{}", "🦀".repeat(30)); // 1-byte shift: byte 80 is mid-char
        let mixed = "abc🦀中文".repeat(15);

        // None of these must panic:
        let _ = classify_with_role(&cjk, false);
        let _ = classify_with_role(&emoji, false);
        let _ = classify_with_role(&offset_emoji, false);
        let _ = classify_with_role(&mixed, false);
    }
}