Skip to main content

zeph_context/
typed_page.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Typed page classification and minimum-fidelity invariants for context compaction.
5//!
6//! Every context segment entering the assembler is tagged with a [`PageType`] and
7//! wrapped in a [`TypedPage`]. The [`PageInvariant`] trait declares the fidelity
8//! contract enforced at every compaction boundary.
9//!
10//! Classification is deterministic and side-effect free — no I/O, no LLM calls.
11//!
12//! # Architecture
13//!
14//! This module lives in `zeph-context` to keep classification logic co-located with
15//! the assembler. No dependency on `zeph-memory` is introduced here.
16//!
17//! # Feature flag
18//!
19//! All typed-page functionality is gated behind the
20//! `[memory.compaction.typed_pages] enabled = true` config key. When disabled the
21//! assembler falls back to the legacy untyped path without behaviour change.
22
23use std::sync::Arc;
24use std::time::Duration;
25
26use serde::{Deserialize, Serialize};
27
28// ── PageType ──────────────────────────────────────────────────────────────────
29
30/// Classification of a context segment for compaction purposes.
31///
32/// The variant determines which [`PageInvariant`] is enforced and what shape the
33/// compacted summary must have.
34///
35/// # Invariant
36///
37/// Every [`TypedPage`] carries exactly one `PageType`. Unclassifiable segments
38/// default to [`PageType::ConversationTurn`] (see [`classify`]).
39#[non_exhaustive]
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
41#[serde(rename_all = "snake_case")]
42pub enum PageType {
43    /// A tool request/response pair sourced from memory or the current turn.
44    ToolOutput,
45    /// A user or assistant message that does not carry a tool role.
46    ConversationTurn,
47    /// Cross-session context, past summaries, or graph-fact recall injections.
48    MemoryExcerpt,
49    /// Session digest, persona, skill instructions, or compression guidelines.
50    SystemContext,
51}
52
53impl std::fmt::Display for PageType {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        match self {
56            Self::ToolOutput => f.write_str("tool_output"),
57            Self::ConversationTurn => f.write_str("conversation_turn"),
58            Self::MemoryExcerpt => f.write_str("memory_excerpt"),
59            Self::SystemContext => f.write_str("system_context"),
60        }
61    }
62}
63
64// ── PageOrigin ────────────────────────────────────────────────────────────────
65
66/// Provenance of a [`TypedPage`], serialised into audit records.
67#[derive(Debug, Clone, Serialize, Deserialize)]
68#[serde(tag = "kind", rename_all = "snake_case")]
69#[non_exhaustive]
70pub enum PageOrigin {
71    /// Tool request/response pair.
72    ToolPair {
73        /// Name of the tool that produced this output.
74        tool_name: String,
75    },
76    /// User or assistant conversation turn.
77    Turn {
78        /// Opaque message identifier (numeric message id as string).
79        message_id: String,
80    },
81    /// Injected from memory (cross-session, summary, graph-facts, etc.).
82    Excerpt {
83        /// Human-readable label identifying the memory source.
84        source_label: String,
85    },
86    /// Session-level system context (persona, skills, digest).
87    System {
88        /// Logical key for this system context block (e.g. `"persona"`, `"skills"`).
89        key: String,
90    },
91}
92
93// ── SchemaHint ────────────────────────────────────────────────────────────────
94
95/// Body format hint for [`PageType::ToolOutput`] pages.
96///
97/// Used by the invariant to select the correct structured-summary prompt.
98#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
99#[serde(rename_all = "snake_case")]
100#[non_exhaustive]
101pub enum SchemaHint {
102    /// Body is valid JSON (object or array).
103    Json,
104    /// Body is UTF-8 text (log lines, prose, etc.).
105    Text,
106    /// Body is a unified diff.
107    Diff,
108    /// Body is a tab- or comma-separated table.
109    Table,
110    /// Body is non-UTF-8 binary data.
111    Binary,
112}
113
114// ── PageId ────────────────────────────────────────────────────────────────────
115
116/// Stable content-addressed identifier for a [`TypedPage`].
117///
118/// Computed as BLAKE3 over `page_type_tag || origin_tag || body_bytes`, encoded
119/// as lowercase hex (first 16 bytes = 32 hex chars). The same input always
120/// produces the same `PageId`, enabling deduplication across turns.
121///
122/// # Semantics
123///
124/// `PageId` is a **content hash**: identical source bytes (same page type, same
125/// origin key, same body) always produce the same id. This means that the same
126/// tool output appearing in two different turns produces the same `PageId`.
127/// Callers that need per-turn provenance must use `turn_id` from the audit record
128/// — `PageId` is for deduplication, not for uniqueness across turns.
129#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
130pub struct PageId(pub String);
131
132impl PageId {
133    /// Compute a [`PageId`] from the page type, origin key, and body bytes.
134    #[must_use]
135    pub fn compute(page_type: PageType, origin_key: &str, body: &[u8]) -> Self {
136        let mut hasher = blake3::Hasher::new();
137        hasher.update(page_type.to_string().as_bytes());
138        hasher.update(b"|");
139        hasher.update(origin_key.as_bytes());
140        hasher.update(b"|");
141        hasher.update(body);
142        let hash = hasher.finalize();
143        // Use first 16 bytes (128 bits) — sufficient for deduplication purposes.
144        let mut hex = String::with_capacity(32);
145        for b in &hash.as_bytes()[..16] {
146            use std::fmt::Write as _;
147            let _ = write!(hex, "{b:02x}");
148        }
149        Self(format!("blake3:{hex}"))
150    }
151}
152
153// ── TypedPage ─────────────────────────────────────────────────────────────────
154
155/// A classified context segment ready for invariant-aware compaction.
156///
157/// `TypedPage` is the unit of work passed to compaction boundaries. The
158/// [`PageId`] is content-stable: the same source bytes always produce the same
159/// id, enabling the compactor to skip already-compacted pages.
160#[derive(Debug, Clone)]
161pub struct TypedPage {
162    /// Stable content-addressed identifier.
163    pub page_id: PageId,
164    /// Classification determining which invariant applies.
165    pub page_type: PageType,
166    /// Provenance of this page (for audit records).
167    pub origin: PageOrigin,
168    /// Token count of the original body.
169    pub tokens: u32,
170    /// Body text shared across potential clones.
171    pub body: Arc<str>,
172    /// Body format hint (populated for `ToolOutput` only; `None` otherwise).
173    pub schema_hint: Option<SchemaHint>,
174}
175
176impl TypedPage {
177    /// Construct a new [`TypedPage`], computing its [`PageId`] from content.
178    #[must_use]
179    pub fn new(
180        page_type: PageType,
181        origin: PageOrigin,
182        tokens: u32,
183        body: Arc<str>,
184        schema_hint: Option<SchemaHint>,
185    ) -> Self {
186        let origin_key = origin_key_for(&origin);
187        let page_id = PageId::compute(page_type, &origin_key, body.as_bytes());
188        Self {
189            page_id,
190            page_type,
191            origin,
192            tokens,
193            body,
194            schema_hint,
195        }
196    }
197}
198
199fn origin_key_for(origin: &PageOrigin) -> String {
200    match origin {
201        PageOrigin::ToolPair { tool_name } => format!("tool:{tool_name}"),
202        PageOrigin::Turn { message_id } => format!("turn:{message_id}"),
203        PageOrigin::Excerpt { source_label } => format!("excerpt:{source_label}"),
204        PageOrigin::System { key } => format!("system:{key}"),
205    }
206}
207
208// ── FidelityContract ──────────────────────────────────────────────────────────
209
/// Describes what a compacted page is required to retain.
///
/// Produced by [`PageInvariant::minimum_fidelity`]; the matching
/// [`PageInvariant::verify`] implementation checks the compacted output.
/// The level label and version also appear in audit records.
#[derive(Debug, Clone)]
pub struct FidelityContract {
    /// Label of the contract flavour and version (e.g. `"structured_summary_v1"`).
    pub fidelity_level: &'static str,
    /// Integer schema version written into audit records.
    pub invariant_version: u8,
    /// Names of the fields the compacted body text must contain.
    pub required_fields: &'static [&'static str],
}
223
224// ── FidelityViolation ─────────────────────────────────────────────────────────
225
226/// Describes why an invariant check failed after compaction.
227///
228/// A violation is a hard error: the compacted page is dropped and an audit
229/// record with `violations` is emitted.
230#[derive(Debug, Clone, Serialize)]
231pub struct FidelityViolation {
232    /// The field or property that was expected but missing.
233    pub missing_field: String,
234    /// Human-readable explanation of the violation.
235    pub detail: String,
236}
237
238// ── CompactedPage ─────────────────────────────────────────────────────────────
239
/// Result of one compaction attempt; the input checked by [`PageInvariant::verify`].
#[derive(Debug, Clone)]
pub struct CompactedPage {
    /// Summarised body text returned by the compaction provider.
    pub body: Arc<str>,
    /// Token count of `body`.
    pub tokens: u32,
}
248
249// ── PageInvariant trait ───────────────────────────────────────────────────────
250
251/// Minimum-fidelity contract for a single [`PageType`].
252///
253/// Implementors declare what a compacted page must contain ([`minimum_fidelity`])
254/// and verify that the actual output honours the contract ([`verify`]).
255///
256/// The trait is object-safe so implementations can be stored in a
257/// `HashMap<PageType, Box<dyn PageInvariant>>` registry.
258///
259/// # Contract
260///
261/// - [`verify`] MUST NOT perform I/O or call an LLM.
262/// - A failed [`verify`] means the compacted page is dropped — it is NEVER
263///   injected in degraded form.
264///
265/// [`minimum_fidelity`]: PageInvariant::minimum_fidelity
266/// [`verify`]: PageInvariant::verify
267pub trait PageInvariant: Send + Sync {
268    /// The page type this invariant governs.
269    fn page_type(&self) -> PageType;
270
271    /// Return the fidelity contract required for a given page.
272    fn minimum_fidelity(&self, page: &TypedPage) -> FidelityContract;
273
274    /// Verify that `compacted` satisfies the fidelity contract derived from `original`.
275    ///
276    /// # Errors
277    ///
278    /// Returns a non-empty [`Vec<FidelityViolation>`] when one or more required
279    /// fields are absent from the compacted body.
280    fn verify(
281        &self,
282        original: &TypedPage,
283        compacted: &CompactedPage,
284    ) -> Result<(), Vec<FidelityViolation>>;
285}
286
287// ── Per-type invariant implementations ───────────────────────────────────────
288
289/// Invariant for [`PageType::ToolOutput`] pages.
290///
291/// The compacted body must contain the tool name, an exit/status indicator,
292/// and at least one structural key from the original output.
293pub struct ToolOutputInvariant;
294
295impl PageInvariant for ToolOutputInvariant {
296    fn page_type(&self) -> PageType {
297        PageType::ToolOutput
298    }
299
300    fn minimum_fidelity(&self, _page: &TypedPage) -> FidelityContract {
301        FidelityContract {
302            fidelity_level: "structured_summary_v1",
303            invariant_version: 1,
304            required_fields: &["tool_name", "exit_status"],
305        }
306    }
307
308    fn verify(
309        &self,
310        original: &TypedPage,
311        compacted: &CompactedPage,
312    ) -> Result<(), Vec<FidelityViolation>> {
313        let body = compacted.body.as_ref();
314        // For binary pages the body marker is injected by the compactor, skip field checks.
315        if original.schema_hint == Some(SchemaHint::Binary) {
316            return Ok(());
317        }
318
319        let mut violations = Vec::new();
320
321        // The compacted body must reference the tool name.
322        let tool_name = match &original.origin {
323            PageOrigin::ToolPair { tool_name } => tool_name.as_str(),
324            _ => "",
325        };
326        if !tool_name.is_empty() && !body.contains(tool_name) {
327            violations.push(FidelityViolation {
328                missing_field: "tool_name".into(),
329                detail: format!("compacted body does not reference tool '{tool_name}'"),
330            });
331        }
332
333        // The compacted body must contain at least one exit / status indicator.
334        let has_status = body.contains("exit_status")
335            || body.contains("exit_code")
336            || body.contains("status:")
337            || body.contains("Status:")
338            || body.contains("exit:")
339            || body.contains("rc:");
340        if !has_status {
341            violations.push(FidelityViolation {
342                missing_field: "exit_status".into(),
343                detail: "compacted body does not contain an exit status indicator".into(),
344            });
345        }
346
347        // For JSON-schema tool outputs, verify that at least one top-level JSON
348        // field name from the original body is present in the compacted body
349        // (FR-003: structural keys must be preserved, not just exit-status markers).
350        if original.schema_hint == Some(SchemaHint::Json) {
351            let original_body = original.body.as_ref();
352            let preserved = check_json_structural_key(original_body, body);
353            if !preserved {
354                violations.push(FidelityViolation {
355                    missing_field: "structural_key".into(),
356                    detail: "compacted JSON tool output does not reference any top-level field \
357                             name from the original output"
358                        .into(),
359                });
360            }
361        }
362
363        if violations.is_empty() {
364            Ok(())
365        } else {
366            Err(violations)
367        }
368    }
369}
370
371/// Check that at least one top-level JSON key from `original` appears in `compacted`.
372///
373/// Parses `original` as a JSON object and returns `true` when any top-level key
374/// string is a substring of `compacted`. Returns `true` (no violation) when
375/// `original` is not a valid JSON object — the caller already checked schema hint.
376fn check_json_structural_key(original: &str, compacted: &str) -> bool {
377    let Ok(value) = serde_json::from_str::<serde_json::Value>(original) else {
378        return true;
379    };
380    let Some(obj) = value.as_object() else {
381        return true;
382    };
383    if obj.is_empty() {
384        return true;
385    }
386    obj.keys().any(|k| compacted.contains(k.as_str()))
387}
388
389/// Invariant for [`PageType::ConversationTurn`] pages.
390///
391/// The compacted body must preserve a role indicator and some meaningful content.
392pub struct ConversationTurnInvariant;
393
394impl PageInvariant for ConversationTurnInvariant {
395    fn page_type(&self) -> PageType {
396        PageType::ConversationTurn
397    }
398
399    fn minimum_fidelity(&self, _page: &TypedPage) -> FidelityContract {
400        FidelityContract {
401            fidelity_level: "semantic_summary_v1",
402            invariant_version: 1,
403            required_fields: &["role"],
404        }
405    }
406
407    fn verify(
408        &self,
409        _original: &TypedPage,
410        compacted: &CompactedPage,
411    ) -> Result<(), Vec<FidelityViolation>> {
412        let body = compacted.body.as_ref();
413        let has_role =
414            body.contains("user") || body.contains("assistant") || body.contains("system");
415        if !has_role {
416            return Err(vec![FidelityViolation {
417                missing_field: "role".into(),
418                detail: "compacted turn does not identify a speaker role".into(),
419            }]);
420        }
421        Ok(())
422    }
423}
424
425/// Invariant for [`PageType::MemoryExcerpt`] pages.
426///
427/// The compacted body must retain the source label and a message id reference.
428pub struct MemoryExcerptInvariant;
429
430impl PageInvariant for MemoryExcerptInvariant {
431    fn page_type(&self) -> PageType {
432        PageType::MemoryExcerpt
433    }
434
435    fn minimum_fidelity(&self, _page: &TypedPage) -> FidelityContract {
436        FidelityContract {
437            fidelity_level: "excerpt_summary_v1",
438            invariant_version: 1,
439            required_fields: &["source_label"],
440        }
441    }
442
443    fn verify(
444        &self,
445        original: &TypedPage,
446        compacted: &CompactedPage,
447    ) -> Result<(), Vec<FidelityViolation>> {
448        let source_label = match &original.origin {
449            PageOrigin::Excerpt { source_label } => source_label.as_str(),
450            _ => return Ok(()),
451        };
452        if !compacted.body.contains(source_label) {
453            return Err(vec![FidelityViolation {
454                missing_field: "source_label".into(),
455                detail: format!("compacted excerpt does not contain source label '{source_label}'"),
456            }]);
457        }
458        Ok(())
459    }
460}
461
462/// Invariant for [`PageType::SystemContext`] pages.
463///
464/// System context MUST NOT be paraphrased. Compaction replaces it with a
465/// pointer record; any body other than the pointer prefix is a violation.
466pub struct SystemContextInvariant;
467
468/// Pointer prefix that the compactor writes for `SystemContext` pages.
469pub const SYSTEM_POINTER_PREFIX: &str = "[system-ptr:";
470
471impl PageInvariant for SystemContextInvariant {
472    fn page_type(&self) -> PageType {
473        PageType::SystemContext
474    }
475
476    fn minimum_fidelity(&self, _page: &TypedPage) -> FidelityContract {
477        FidelityContract {
478            fidelity_level: "pointer_replace_v1",
479            invariant_version: 1,
480            required_fields: &["pointer"],
481        }
482    }
483
484    fn verify(
485        &self,
486        _original: &TypedPage,
487        compacted: &CompactedPage,
488    ) -> Result<(), Vec<FidelityViolation>> {
489        if !compacted.body.starts_with(SYSTEM_POINTER_PREFIX) {
490            return Err(vec![FidelityViolation {
491                missing_field: "pointer".into(),
492                detail: format!(
493                    "SystemContext page was not pointer-replaced \
494                     (body does not start with '{SYSTEM_POINTER_PREFIX}')"
495                ),
496            }]);
497        }
498        Ok(())
499    }
500}
501
502// ── InvariantRegistry ─────────────────────────────────────────────────────────
503
504/// Registry mapping each [`PageType`] to its [`PageInvariant`] implementation.
505///
506/// Built once and shared via `Arc` so tests can swap in a mock registry.
507///
508/// # Examples
509///
510/// ```
511/// use zeph_context::typed_page::{InvariantRegistry, PageType};
512///
513/// let reg = InvariantRegistry::default();
514/// let inv = reg.get(PageType::ToolOutput).unwrap();
515/// assert_eq!(inv.page_type(), PageType::ToolOutput);
516/// ```
517pub struct InvariantRegistry {
518    tool_output: Box<dyn PageInvariant>,
519    conversation_turn: Box<dyn PageInvariant>,
520    memory_excerpt: Box<dyn PageInvariant>,
521    system_context: Box<dyn PageInvariant>,
522}
523
524impl Default for InvariantRegistry {
525    fn default() -> Self {
526        Self {
527            tool_output: Box::new(ToolOutputInvariant),
528            conversation_turn: Box::new(ConversationTurnInvariant),
529            memory_excerpt: Box::new(MemoryExcerptInvariant),
530            system_context: Box::new(SystemContextInvariant),
531        }
532    }
533}
534
535impl InvariantRegistry {
536    /// Look up the invariant for a given [`PageType`].
537    ///
538    /// Always returns `Some` for the four built-in variants.
539    #[must_use]
540    pub fn get(&self, page_type: PageType) -> Option<&dyn PageInvariant> {
541        match page_type {
542            PageType::ToolOutput => Some(self.tool_output.as_ref()),
543            PageType::ConversationTurn => Some(self.conversation_turn.as_ref()),
544            PageType::MemoryExcerpt => Some(self.memory_excerpt.as_ref()),
545            PageType::SystemContext => Some(self.system_context.as_ref()),
546        }
547    }
548
549    /// Verify that `compacted` satisfies the invariant for `original` at a compaction boundary.
550    ///
551    /// This is the primary entry point for the compactor — it wraps `verify()` in a
552    /// `tracing::info_span!` per NFR-009 so every compaction boundary is observable.
553    ///
554    /// Returns `Ok(())` when the invariant is satisfied, or the violation list on failure.
555    ///
556    /// # Errors
557    ///
558    /// Propagates [`FidelityViolation`]s from the registered invariant implementation.
559    pub fn enforce(
560        &self,
561        original: &TypedPage,
562        compacted: &CompactedPage,
563    ) -> Result<(), Vec<FidelityViolation>> {
564        let _span = tracing::info_span!(
565            "context.compaction.typed_page",
566            page_type = %original.page_type,
567            page_id = %original.page_id.0,
568            original_tokens = original.tokens,
569            compacted_tokens = compacted.tokens,
570        )
571        .entered();
572
573        if let Some(inv) = self.get(original.page_type) {
574            inv.verify(original, compacted)
575        } else {
576            tracing::warn!(
577                page_type = %original.page_type,
578                "no invariant registered for page type — skipping verification"
579            );
580            Ok(())
581        }
582    }
583}
584
585// ── Classification helpers ────────────────────────────────────────────────────
586
587/// Classify a context segment by examining well-known prefix markers.
588///
589/// Classification is deterministic and performs no I/O. When the input does not
590/// match any known prefix the function defaults to [`PageType::ConversationTurn`]
591/// and logs at `WARN` level per FR-008.
592///
593/// The function emits a `context.compaction.typed_page.classify` span per NFR-009
594/// so every classification is observable in traces.
595///
596/// | Source marker | Assigned [`PageType`] |
597/// |---|---|
598/// | Starts with `[tool_output]` or `[tool:` | [`PageType::ToolOutput`] |
599/// | Starts with `[cross-session context]`, `[semantic recall]`, `[known facts]`, `[conversation summaries]` | [`PageType::MemoryExcerpt`] |
600/// | Starts with `[Persona context]`, `[Past experience]`, `[Memory summary]`, `[system` | [`PageType::SystemContext`] |
601/// | Everything else | [`PageType::ConversationTurn`] |
602///
603/// # Examples
604///
605/// ```
606/// use zeph_context::typed_page::{classify, PageType};
607///
608/// assert_eq!(classify("[tool_output] exit_code: 0"), PageType::ToolOutput);
609/// assert_eq!(classify("[cross-session context]\nsome recall"), PageType::MemoryExcerpt);
610/// assert_eq!(classify("[Persona context]\nfact"), PageType::SystemContext);
611/// assert_eq!(classify("Hello, world!"), PageType::ConversationTurn);
612/// ```
613#[must_use]
614pub fn classify(body: &str) -> PageType {
615    classify_with_role(body, false)
616}
617
618/// Classify a context segment, with an explicit `is_system_role` hint.
619///
620/// When `is_system_role` is `true` the segment is classified as
621/// [`PageType::SystemContext`] without prefix matching, preventing arbitrary
622/// system messages injected by the assembler from silently falling back to
623/// `ConversationTurn` (Key Invariant: "`SystemContext` pages are never paraphrased").
624///
625/// Use this variant when the caller has access to the message `Role`.
626///
627/// # Examples
628///
629/// ```
630/// use zeph_context::typed_page::{classify_with_role, PageType};
631///
632/// // A plain system message without a known prefix is still SystemContext.
633/// assert_eq!(classify_with_role("You are a helpful assistant.", true), PageType::SystemContext);
634/// // Role hint does not override ToolOutput prefix detection.
635/// assert_eq!(classify_with_role("[tool_output] exit_code: 0", false), PageType::ToolOutput);
636/// ```
637#[must_use]
638pub fn classify_with_role(body: &str, is_system_role: bool) -> PageType {
639    tracing::info_span!(
640        "context.compaction.typed_page.classify",
641        body_len = body.len()
642    )
643    .in_scope(|| classify_with_role_inner(body, is_system_role))
644}
645
646fn classify_with_role_inner(body: &str, is_system_role: bool) -> PageType {
647    // Use the same prefix constants as the assembler for consistency.
648    const TOOL_PREFIXES: &[&str] = &["[tool_output]", "[tool:", "[Tool output]"];
649    const MEMORY_PREFIXES: &[&str] = &[
650        "[cross-session context]",
651        "[semantic recall]",
652        "[known facts]",
653        "[conversation summaries]",
654        "[past corrections]",
655        "## Relevant documents",
656    ];
657    const SYSTEM_PREFIXES: &[&str] = &[
658        "[Persona context]",
659        "[Past experience]",
660        "[Memory summary]",
661        "[system",
662        "[skill",
663        "[persona",
664        "[digest",
665        "[compression",
666    ];
667
668    let trimmed = body.trim_start();
669
670    for prefix in TOOL_PREFIXES {
671        if trimmed.starts_with(prefix) {
672            return PageType::ToolOutput;
673        }
674    }
675    for prefix in MEMORY_PREFIXES {
676        if trimmed.starts_with(prefix) {
677            return PageType::MemoryExcerpt;
678        }
679    }
680    for prefix in SYSTEM_PREFIXES {
681        if trimmed.starts_with(prefix) {
682            return PageType::SystemContext;
683        }
684    }
685
686    // When the caller signals Role::System, classify as SystemContext even if
687    // the body does not start with a known prefix.  This prevents system
688    // context injected by the assembler (e.g. plain instructions, directives)
689    // from being eligible for paraphrase.
690    if is_system_role {
691        return PageType::SystemContext;
692    }
693
694    let mut prefix_end = body.len().min(80);
695    while !body.is_char_boundary(prefix_end) {
696        prefix_end -= 1;
697    }
698    tracing::warn!(
699        body_prefix = &body[..prefix_end],
700        "typed-page classification fallback to ConversationTurn"
701    );
702    PageType::ConversationTurn
703}
704
705/// Detect [`SchemaHint`] for a [`PageType::ToolOutput`] body.
706///
707/// Returns [`SchemaHint::Binary`] when the body is not valid UTF-8 (detected via
708/// presence of replacement characters) or when the caller passes `is_binary =
709/// true`. JSON detection is heuristic (starts with `{` or `[`).
710#[must_use]
711pub fn detect_schema_hint(body: &str, is_binary: bool) -> SchemaHint {
712    if is_binary || body.contains('\u{FFFD}') {
713        return SchemaHint::Binary;
714    }
715    let trimmed = body.trim_start();
716    if trimmed.starts_with('{') || trimmed.starts_with('[') {
717        return SchemaHint::Json;
718    }
719    if trimmed.starts_with("--- ")
720        || trimmed.starts_with("+++ ")
721        || trimmed.starts_with("diff --git")
722        || trimmed.starts_with("diff -")
723    {
724        return SchemaHint::Diff;
725    }
726    // Simple table heuristic: first line contains multiple tab or pipe separators.
727    let first_line = trimmed.lines().next().unwrap_or("");
728    if first_line.matches('\t').count() >= 2 || first_line.matches('|').count() >= 2 {
729        return SchemaHint::Table;
730    }
731    SchemaHint::Text
732}
733
734// ── Audit record ──────────────────────────────────────────────────────────────
735
736/// One JSONL audit record emitted per compacted page (FR-007).
737///
738/// Written to `[memory.compaction.typed_pages] audit_path` by the audit sink
739/// before the compacted context is handed to the LLM.
740#[derive(Debug, Serialize)]
741pub struct CompactedPageRecord {
742    /// ISO-8601 timestamp when the compaction occurred.
743    pub ts: String,
744    /// Opaque turn identifier (agent turn counter as string).
745    pub turn_id: String,
746    /// Stable content-addressed page identifier.
747    pub page_id: String,
748    /// Page classification.
749    pub page_type: PageType,
750    /// Serialised page origin.
751    pub origin: PageOrigin,
752    /// Token count of the original page.
753    pub original_tokens: u32,
754    /// Token count of the compacted page.
755    pub compacted_tokens: u32,
756    /// Fidelity level label from the invariant contract.
757    pub fidelity_level: String,
758    /// Schema version integer.
759    pub invariant_version: u8,
760    /// Provider name used for summarization.
761    pub provider_name: String,
762    /// Fidelity violations encountered (empty on success).
763    pub violations: Vec<FidelityViolation>,
764    /// `true` when classification fell back to `ConversationTurn`.
765    #[serde(default, skip_serializing_if = "std::ops::Not::not")]
766    pub classification_fallback: bool,
767}
768
769// ── Batch assertions ──────────────────────────────────────────────────────────
770
771/// A failed batch-level compaction assertion.
772#[derive(Debug, Clone, Serialize)]
773pub struct BatchViolation {
774    /// Short label for the assertion that failed.
775    pub assertion: String,
776    /// Human-readable explanation.
777    pub detail: String,
778}
779
/// Inputs for the batch-level compaction assertions of typed-page enforcement.
///
/// A per-page [`PageInvariant`] compares one page with its compacted form.
/// Batch assertions instead look at the summary as a whole and compare it with
/// what is known about the set of classified pages that was sent to the LLM.
///
/// Violations are observational — they never block compaction. They are logged
/// and emitted to the audit trail.
///
/// # Examples
///
/// ```
/// use zeph_context::typed_page::BatchAssertions;
///
/// let assertions = BatchAssertions {
///     tool_names_in_batch: vec!["shell".to_string()],
///     has_memory_excerpt: false,
///     excerpt_labels: vec![],
/// };
/// // Summary that mentions the tool — all assertions pass.
/// let violations = assertions.check("shell ran and exited 0");
/// assert!(violations.is_empty());
/// ```
#[derive(Debug, Clone, Default)]
pub struct BatchAssertions {
    /// Names of the tools behind the `ToolOutput` pages of the batch.
    pub tool_names_in_batch: Vec<String>,
    /// `true` when the batch contained at least one `MemoryExcerpt` page.
    pub has_memory_excerpt: bool,
    /// Source labels of the `MemoryExcerpt` pages of the batch.
    pub excerpt_labels: Vec<String>,
}
812
813impl BatchAssertions {
814    /// Check the summary against batch-level assertions.
815    ///
816    /// Returns a list of assertion failures (empty = all pass). Failures are never fatal.
817    #[must_use]
818    pub fn check(&self, summary: &str) -> Vec<BatchViolation> {
819        let mut violations = Vec::new();
820
821        // At least one tool name from the batch must appear in the summary.
822        if !self.tool_names_in_batch.is_empty() {
823            let any_tool_mentioned = self
824                .tool_names_in_batch
825                .iter()
826                .any(|name| !name.is_empty() && summary.contains(name.as_str()));
827            if !any_tool_mentioned {
828                violations.push(BatchViolation {
829                    assertion: "tool_coverage".into(),
830                    detail: format!(
831                        "summary mentions none of the {} tool(s) in batch: {:?}",
832                        self.tool_names_in_batch.len(),
833                        self.tool_names_in_batch
834                    ),
835                });
836            }
837        }
838
839        // If memory excerpts were present, at least one source label should appear.
840        if self.has_memory_excerpt && !self.excerpt_labels.is_empty() {
841            let any_label_mentioned = self
842                .excerpt_labels
843                .iter()
844                .any(|label| !label.is_empty() && summary.contains(label.as_str()));
845            if !any_label_mentioned {
846                violations.push(BatchViolation {
847                    assertion: "excerpt_label_coverage".into(),
848                    detail: format!(
849                        "summary mentions none of the memory excerpt labels: {:?}",
850                        self.excerpt_labels
851                    ),
852                });
853            }
854        }
855
856        violations
857    }
858}
859
860// ── TypedPagesState ───────────────────────────────────────────────────────────
861
/// Shared runtime state for typed-page compaction, created once at agent startup.
///
/// Bundles the invariant registry and optional audit sink so they can be shared
/// via `Arc` across compaction calls without per-call allocation.
///
/// All fields are public and the type derives no traits of its own; it is a plain
/// bundle of an [`InvariantRegistry`], an optional [`CompactionAuditSink`], and the
/// enforcement-mode flag.
pub struct TypedPagesState {
    /// Invariant registry shared across all compaction calls.
    pub registry: InvariantRegistry,
    /// Optional JSONL audit sink. `None` when audit is disabled.
    pub audit_sink: Option<CompactionAuditSink>,
    /// Whether enforcement is `Active` (pointer-replace `SystemContext` + batch assertions).
    /// `false` = `Observe` mode (classify and audit only, no behavioral change).
    pub is_active: bool,
}
875
876// ── Audit command ─────────────────────────────────────────────────────────────
877
/// Internal command sent through the audit sink channel.
///
/// Records and flush requests travel over the same channel, so the writer task
/// handles them in the order they were enqueued.
enum AuditCommand {
    /// Write a compaction record.
    ///
    /// The writer task serialises the record to one JSON line and appends it to its
    /// buffered writer.
    Record(CompactedPageRecord),
    /// Flush all preceding records and signal completion via the oneshot.
    ///
    /// The writer task flushes its buffered writer and then sends `()` on the carried
    /// sender; a send failure (receiver already dropped) is ignored.
    Flush(tokio::sync::oneshot::Sender<()>),
}
885
886// ── Audit sink ────────────────────────────────────────────────────────────────
887
/// Async bounded-mpsc audit sink for compaction records.
///
/// The sink serialises [`CompactedPageRecord`] values to a JSONL file via a
/// background writer task, mirroring the `zeph-tools` audit pattern. Dropped
/// records (when the channel is full) are counted and logged.
///
/// The writer task buffers its output, so a record that has been accepted may not be
/// visible in the file until [`CompactionAuditSink::flush`] is called or the channel
/// closes (every clone of the sink has been dropped).
///
/// Cloning the sink is cheap: clones share the same channel and the same drop counter.
///
/// # Invariant
///
/// [`CompactionAuditSink::flush`] sends a rendezvous sentinel through the channel
/// and awaits the writer task's confirmation with a 100 ms timeout. Records accepted
/// into the channel before `flush` is called are guaranteed to be written before the
/// flush responder fires.
///
/// # Examples
///
/// ```no_run
/// use zeph_context::typed_page::CompactionAuditSink;
/// use std::path::Path;
///
/// # async fn example() {
/// let sink = CompactionAuditSink::open(Path::new(".local/audit/compaction.jsonl"), 256)
///     .await
///     .unwrap();
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct CompactionAuditSink {
    /// Sending half of the bounded command channel consumed by the writer task.
    tx: tokio::sync::mpsc::Sender<AuditCommand>,
    /// Number of records rejected because the channel was full. Shared with the
    /// writer task, which reports the total when the channel closes.
    drop_counter: Arc<std::sync::atomic::AtomicU64>,
}
918
919impl CompactionAuditSink {
920    /// Open a new audit sink writing to `path`.
921    ///
922    /// `capacity` is the bounded channel depth; records dropped when full are counted
923    /// in the internal drop counter and logged at WARN.
924    ///
925    /// # Errors
926    ///
927    /// Returns an error when `path` cannot be opened for appending.
928    #[tracing::instrument(name = "context.typed_page.open", skip_all)]
929    pub async fn open(path: &std::path::Path, capacity: usize) -> Result<Self, std::io::Error> {
930        use tokio::io::AsyncWriteExt as _;
931
932        if let Some(parent) = path.parent() {
933            tokio::fs::create_dir_all(parent).await?;
934        }
935        let file = tokio::fs::OpenOptions::new()
936            .create(true)
937            .append(true)
938            .open(path)
939            .await?;
940
941        let (tx, mut rx) = tokio::sync::mpsc::channel::<AuditCommand>(capacity.max(1));
942        let drop_counter = Arc::new(std::sync::atomic::AtomicU64::new(0));
943        let drop_counter_bg = drop_counter.clone();
944
945        tokio::spawn(async move {
946            let mut writer = tokio::io::BufWriter::new(file);
947            while let Some(cmd) = rx.recv().await {
948                match cmd {
949                    AuditCommand::Record(record) => match serde_json::to_string(&record) {
950                        Ok(mut line) => {
951                            line.push('\n');
952                            if let Err(e) = writer.write_all(line.as_bytes()).await {
953                                tracing::error!("compaction audit write failed: {e:#}");
954                            }
955                        }
956                        Err(e) => {
957                            tracing::error!("compaction audit serialization failed: {e:#}");
958                        }
959                    },
960                    AuditCommand::Flush(responder) => {
961                        let _ = writer.flush().await;
962                        let _ = responder.send(());
963                    }
964                }
965            }
966            // Flush remaining bytes when channel closes.
967            let _ = writer.flush().await;
968
969            let dropped = drop_counter_bg.load(std::sync::atomic::Ordering::Relaxed);
970            if dropped > 0 {
971                tracing::warn!(dropped, "compaction audit sink closed with dropped records");
972            }
973        });
974
975        Ok(Self { tx, drop_counter })
976    }
977
978    /// Send a record to the audit sink.
979    ///
980    /// If the channel is full the record is dropped and the drop counter is incremented.
981    pub fn send(&self, record: CompactedPageRecord) {
982        match self.tx.try_send(AuditCommand::Record(record)) {
983            Ok(()) => {}
984            Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
985                let prev = self
986                    .drop_counter
987                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
988                tracing::warn!(
989                    dropped_total = prev + 1,
990                    "compaction audit sink full — record dropped (best-effort audit)"
991                );
992            }
993            Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
994                tracing::error!("compaction audit sink closed unexpectedly");
995            }
996        }
997    }
998
999    /// Flush all pending records with bounded 100 ms timeout.
1000    ///
1001    /// Sends a `Flush` sentinel through the same channel as records, so ordering is
1002    /// preserved — the writer task responds only after all preceding records are written.
1003    /// If the writer task does not respond within 100 ms, the flush times out silently.
1004    #[tracing::instrument(name = "context.typed_page.flush", skip_all)]
1005    pub async fn flush(&self) {
1006        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
1007        if self.tx.send(AuditCommand::Flush(tx)).await.is_ok() {
1008            let _ = tokio::time::timeout(Duration::from_millis(100), rx).await;
1009        }
1010    }
1011
1012    /// Number of records dropped due to a full channel.
1013    #[must_use]
1014    pub fn dropped_count(&self) -> u64 {
1015        self.drop_counter.load(std::sync::atomic::Ordering::Relaxed)
1016    }
1017}
1018
1019// ── Tests ─────────────────────────────────────────────────────────────────────
1020
#[cfg(test)]
mod tests {
    use super::*;

    // ── PageId ────────────────────────────────────────────────────────────────

    #[test]
    fn page_id_same_input_same_output() {
        let a = PageId::compute(PageType::ToolOutput, "tool:shell", b"exit_code: 0");
        let b = PageId::compute(PageType::ToolOutput, "tool:shell", b"exit_code: 0");
        assert_eq!(a, b);
    }

    #[test]
    fn page_id_different_type_different_id() {
        let a = PageId::compute(PageType::ToolOutput, "tool:shell", b"body");
        let b = PageId::compute(PageType::ConversationTurn, "tool:shell", b"body");
        assert_ne!(a, b);
    }

    #[test]
    fn page_id_starts_with_blake3_prefix() {
        let id = PageId::compute(PageType::SystemContext, "system:persona", b"some content");
        assert!(id.0.starts_with("blake3:"));
    }

    // ── classify ──────────────────────────────────────────────────────────────

    #[test]
    fn classify_tool_output_prefix() {
        assert_eq!(
            classify("[tool_output] shell exit_code: 0"),
            PageType::ToolOutput
        );
        assert_eq!(classify("[tool:shell] result"), PageType::ToolOutput);
    }

    #[test]
    fn classify_memory_prefixes() {
        assert_eq!(
            classify("[cross-session context]\nsome recall"),
            PageType::MemoryExcerpt
        );
        assert_eq!(
            classify("[semantic recall]\n- [user] hello"),
            PageType::MemoryExcerpt
        );
        assert_eq!(classify("[known facts]\n- fact"), PageType::MemoryExcerpt);
        assert_eq!(
            classify("[conversation summaries]\n- 1-10: summary"),
            PageType::MemoryExcerpt
        );
    }

    #[test]
    fn classify_system_prefixes() {
        assert_eq!(classify("[Persona context]\nfact"), PageType::SystemContext);
        assert_eq!(classify("[system prompt]"), PageType::SystemContext);
    }

    #[test]
    fn classify_fallback_is_conversation_turn() {
        assert_eq!(classify("Hello, world!"), PageType::ConversationTurn);
        assert_eq!(classify(""), PageType::ConversationTurn);
    }

    // ── detect_schema_hint ────────────────────────────────────────────────────

    #[test]
    fn detect_schema_hint_json() {
        assert_eq!(
            detect_schema_hint(r#"{"key": "val"}"#, false),
            SchemaHint::Json
        );
        assert_eq!(detect_schema_hint("[1,2,3]", false), SchemaHint::Json);
    }

    #[test]
    fn detect_schema_hint_diff() {
        assert_eq!(detect_schema_hint("--- a\n+++ b", false), SchemaHint::Diff);
    }

    #[test]
    fn detect_schema_hint_binary() {
        assert_eq!(detect_schema_hint("anything", true), SchemaHint::Binary);
    }

    #[test]
    fn detect_schema_hint_text_fallback() {
        assert_eq!(detect_schema_hint("plain text", false), SchemaHint::Text);
    }

    // ── ToolOutputInvariant ───────────────────────────────────────────────────

    #[test]
    fn tool_output_invariant_passes_when_fields_present() {
        let inv = ToolOutputInvariant;
        let page = TypedPage::new(
            PageType::ToolOutput,
            PageOrigin::ToolPair {
                tool_name: "shell".into(),
            },
            100,
            Arc::from("[tool_output] shell exit_code: 0\nsome output"),
            Some(SchemaHint::Text),
        );
        let compacted = CompactedPage {
            body: Arc::from("shell exit_status: 0\nkey: value"),
            tokens: 10,
        };
        assert!(inv.verify(&page, &compacted).is_ok());
    }

    #[test]
    fn tool_output_invariant_fails_missing_tool_name() {
        let inv = ToolOutputInvariant;
        let page = TypedPage::new(
            PageType::ToolOutput,
            PageOrigin::ToolPair {
                tool_name: "my_tool".into(),
            },
            100,
            Arc::from("[tool_output] my_tool exit_code: 0"),
            Some(SchemaHint::Text),
        );
        let compacted = CompactedPage {
            body: Arc::from("exit_status: 0"),
            tokens: 5,
        };
        let result = inv.verify(&page, &compacted);
        assert!(result.is_err());
        let violations = result.unwrap_err();
        assert!(violations.iter().any(|v| v.missing_field == "tool_name"));
    }

    #[test]
    fn tool_output_invariant_passes_for_binary() {
        let inv = ToolOutputInvariant;
        let page = TypedPage::new(
            PageType::ToolOutput,
            PageOrigin::ToolPair {
                tool_name: "binary_tool".into(),
            },
            100,
            Arc::from("<binary:1024 bytes>"),
            Some(SchemaHint::Binary),
        );
        let compacted = CompactedPage {
            body: Arc::from("<binary:1024 bytes> (archived)"),
            tokens: 5,
        };
        assert!(inv.verify(&page, &compacted).is_ok());
    }

    // ── SystemContextInvariant ────────────────────────────────────────────────

    #[test]
    fn system_context_invariant_passes_with_pointer() {
        let inv = SystemContextInvariant;
        let page = TypedPage::new(
            PageType::SystemContext,
            PageOrigin::System {
                key: "persona".into(),
            },
            200,
            Arc::from("[Persona context]\nsome persona info"),
            None,
        );
        let compacted = CompactedPage {
            body: Arc::from("[system-ptr:blake3:abcdef123456]"),
            tokens: 3,
        };
        assert!(inv.verify(&page, &compacted).is_ok());
    }

    #[test]
    fn system_context_invariant_fails_without_pointer() {
        let inv = SystemContextInvariant;
        let page = TypedPage::new(
            PageType::SystemContext,
            PageOrigin::System {
                key: "persona".into(),
            },
            200,
            Arc::from("[Persona context]\nsome persona info"),
            None,
        );
        let compacted = CompactedPage {
            body: Arc::from("This is a paraphrase of persona info"),
            tokens: 10,
        };
        let result = inv.verify(&page, &compacted);
        assert!(result.is_err());
        let violations = result.unwrap_err();
        assert!(violations.iter().any(|v| v.missing_field == "pointer"));
    }

    // ── InvariantRegistry ─────────────────────────────────────────────────────

    #[test]
    fn registry_covers_all_page_types() {
        let reg = InvariantRegistry::default();
        for pt in [
            PageType::ToolOutput,
            PageType::ConversationTurn,
            PageType::MemoryExcerpt,
            PageType::SystemContext,
        ] {
            assert!(reg.get(pt).is_some(), "missing invariant for {pt:?}");
        }
    }

    #[test]
    fn registry_returns_correct_page_type() {
        let reg = InvariantRegistry::default();
        assert_eq!(
            reg.get(PageType::ToolOutput).unwrap().page_type(),
            PageType::ToolOutput
        );
        assert_eq!(
            reg.get(PageType::SystemContext).unwrap().page_type(),
            PageType::SystemContext
        );
    }

    // ── InvariantRegistry::enforce ────────────────────────────────────────────

    #[test]
    fn enforce_ok_for_valid_system_pointer() {
        let reg = InvariantRegistry::default();
        let page = TypedPage::new(
            PageType::SystemContext,
            PageOrigin::System {
                key: "persona".into(),
            },
            50,
            Arc::from("[Persona context]\nrules"),
            None,
        );
        let compacted = CompactedPage {
            body: Arc::from("[system-ptr:blake3:aabbccdd11223344]"),
            tokens: 3,
        };
        assert!(reg.enforce(&page, &compacted).is_ok());
    }

    #[test]
    fn enforce_err_for_paraphrased_system_context() {
        let reg = InvariantRegistry::default();
        let page = TypedPage::new(
            PageType::SystemContext,
            PageOrigin::System {
                key: "persona".into(),
            },
            50,
            Arc::from("[Persona context]\nrules"),
            None,
        );
        let compacted = CompactedPage {
            body: Arc::from("The persona says to be helpful."),
            tokens: 7,
        };
        let result = reg.enforce(&page, &compacted);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .iter()
                .any(|v| v.missing_field == "pointer")
        );
    }

    #[test]
    fn enforce_ok_for_conversation_turn_with_role() {
        let reg = InvariantRegistry::default();
        let page = TypedPage::new(
            PageType::ConversationTurn,
            PageOrigin::Turn {
                message_id: "42".into(),
            },
            30,
            Arc::from("Hello from user"),
            None,
        );
        let compacted = CompactedPage {
            body: Arc::from("user asked about Rust"),
            tokens: 5,
        };
        assert!(reg.enforce(&page, &compacted).is_ok());
    }

    // ── MemoryExcerptInvariant ────────────────────────────────────────────────

    #[test]
    fn memory_excerpt_invariant_passes_when_label_present() {
        let inv = MemoryExcerptInvariant;
        let label = "semantic_recall";
        let page = TypedPage::new(
            PageType::MemoryExcerpt,
            PageOrigin::Excerpt {
                source_label: label.into(),
            },
            80,
            Arc::from("[semantic recall]\n- [user] hello"),
            None,
        );
        let compacted = CompactedPage {
            body: Arc::from(format!("Summary from {label}: user greeted")),
            tokens: 6,
        };
        assert!(inv.verify(&page, &compacted).is_ok());
    }

    #[test]
    fn memory_excerpt_invariant_fails_when_label_missing() {
        let inv = MemoryExcerptInvariant;
        let page = TypedPage::new(
            PageType::MemoryExcerpt,
            PageOrigin::Excerpt {
                source_label: "graph_facts".into(),
            },
            80,
            Arc::from("[known facts]\n- Alice works at Acme"),
            None,
        );
        let compacted = CompactedPage {
            body: Arc::from("Alice is employed somewhere"),
            tokens: 5,
        };
        let result = inv.verify(&page, &compacted);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .iter()
                .any(|v| v.missing_field == "source_label")
        );
    }

    #[test]
    fn memory_excerpt_invariant_passes_for_non_excerpt_origin() {
        let inv = MemoryExcerptInvariant;
        let page = TypedPage::new(
            PageType::MemoryExcerpt,
            PageOrigin::System {
                key: "digests".into(),
            },
            40,
            Arc::from("[system]"),
            None,
        );
        let compacted = CompactedPage {
            body: Arc::from("anything"),
            tokens: 1,
        };
        assert!(inv.verify(&page, &compacted).is_ok());
    }

    // ── ConversationTurnInvariant ─────────────────────────────────────────────

    #[test]
    fn conversation_turn_invariant_passes_with_role_word() {
        let inv = ConversationTurnInvariant;
        let page = TypedPage::new(
            PageType::ConversationTurn,
            PageOrigin::Turn {
                message_id: "1".into(),
            },
            20,
            Arc::from("Hello world"),
            None,
        );
        for body in &["user: hi", "assistant replied", "system note"] {
            let compacted = CompactedPage {
                body: Arc::from(*body),
                tokens: 2,
            };
            assert!(inv.verify(&page, &compacted).is_ok(), "body={body}");
        }
    }

    #[test]
    fn conversation_turn_invariant_fails_without_role_word() {
        let inv = ConversationTurnInvariant;
        let page = TypedPage::new(
            PageType::ConversationTurn,
            PageOrigin::Turn {
                message_id: "2".into(),
            },
            20,
            Arc::from("some turn content"),
            None,
        );
        let compacted = CompactedPage {
            body: Arc::from("content was summarized"),
            tokens: 3,
        };
        let result = inv.verify(&page, &compacted);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .iter()
                .any(|v| v.missing_field == "role")
        );
    }

    // ── CompactionAuditSink ───────────────────────────────────────────────────

    #[tokio::test]
    async fn audit_sink_jsonl_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("audit.jsonl");

        let sink = CompactionAuditSink::open(&path, 64).await.unwrap();
        let record = CompactedPageRecord {
            ts: "2026-04-19T00:00:00Z".into(),
            turn_id: "1".into(),
            page_id: "blake3:aabbccdd".into(),
            page_type: PageType::ToolOutput,
            origin: PageOrigin::ToolPair {
                tool_name: "shell".into(),
            },
            original_tokens: 100,
            compacted_tokens: 20,
            fidelity_level: "structured_summary_v1".into(),
            invariant_version: 1,
            provider_name: "test".into(),
            violations: vec![],
            classification_fallback: false,
        };
        sink.send(record);

        // Drop the sink to close the channel and let the writer task flush.
        drop(sink);

        // The writer runs on a background task with no join handle, so poll (bounded to
        // roughly one second) until the newline-terminated record is visible. A single
        // fixed sleep is racy on a loaded machine.
        let mut attempts = 0;
        let contents = loop {
            let text = std::fs::read_to_string(&path).unwrap();
            if text.ends_with('\n') || attempts >= 100 {
                break text;
            }
            attempts += 1;
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        };

        assert!(!contents.is_empty(), "audit file should not be empty");
        let parsed: serde_json::Value = serde_json::from_str(contents.trim()).unwrap();
        assert_eq!(parsed["page_type"], "tool_output");
        assert_eq!(parsed["turn_id"], "1");
        assert_eq!(parsed["provider_name"], "test");
    }

    #[tokio::test]
    async fn audit_sink_drop_counter_increments_when_full() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("audit_full.jsonl");

        // Capacity 1: first send fills the channel, subsequent sends are dropped.
        let sink = CompactionAuditSink::open(&path, 1).await.unwrap();

        let make_record = || CompactedPageRecord {
            ts: "2026-04-19T00:00:00Z".into(),
            turn_id: "x".into(),
            page_id: "blake3:00".into(),
            page_type: PageType::ConversationTurn,
            origin: PageOrigin::Turn {
                message_id: "0".into(),
            },
            original_tokens: 10,
            compacted_tokens: 5,
            fidelity_level: "semantic_summary_v1".into(),
            invariant_version: 1,
            provider_name: "test".into(),
            violations: vec![],
            classification_fallback: false,
        };

        // Send enough records to guarantee overflow. There is no await point inside the
        // loop, so the writer task cannot drain the channel between sends.
        for _ in 0..10 {
            sink.send(make_record());
        }

        assert!(
            sink.dropped_count() > 0,
            "expected at least one dropped record"
        );
    }

    #[tokio::test]
    async fn audit_sink_flush_does_not_panic() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("audit_flush.jsonl");
        let sink = CompactionAuditSink::open(&path, 16).await.unwrap();
        // flush on an empty sink must not panic or deadlock.
        sink.flush().await;
    }

    // ── classify_with_role ────────────────────────────────────────────────────

    #[test]
    fn classify_with_role_system_flag_overrides_fallback() {
        assert_eq!(
            classify_with_role("You are a helpful assistant.", true),
            PageType::SystemContext
        );
    }

    #[test]
    fn classify_with_role_prefix_wins_over_system_flag() {
        // NOTE(review): the name says the prefix beats the system flag, yet the flag is
        // passed as `false`, so the flag-set case is not exercised here. Presumably this
        // should pass `true` — confirm against `classify_with_role` before changing.
        assert_eq!(
            classify_with_role("[tool_output] exit_code: 0", false),
            PageType::ToolOutput
        );
    }

    #[test]
    fn classify_with_role_false_still_falls_back_to_conversation_turn() {
        assert_eq!(
            classify_with_role("random prose without markers", false),
            PageType::ConversationTurn
        );
    }

    // ── check_json_structural_key (via ToolOutputInvariant) ───────────────────

    #[test]
    fn tool_output_json_structural_check_passes_when_key_preserved() {
        let inv = ToolOutputInvariant;
        let original_body = r#"{"exit_code": 0, "stdout": "ok"}"#;
        let page = TypedPage::new(
            PageType::ToolOutput,
            PageOrigin::ToolPair {
                tool_name: "shell".into(),
            },
            50,
            Arc::from(original_body),
            Some(SchemaHint::Json),
        );
        // Compacted body references "exit_code" and "shell".
        let compacted = CompactedPage {
            body: Arc::from("shell exit_code: 0, stdout was ok"),
            tokens: 8,
        };
        assert!(inv.verify(&page, &compacted).is_ok());
    }

    #[test]
    fn tool_output_json_structural_check_fails_when_no_key_preserved() {
        let inv = ToolOutputInvariant;
        let original_body = r#"{"some_field": "value", "other_field": 42}"#;
        let page = TypedPage::new(
            PageType::ToolOutput,
            PageOrigin::ToolPair {
                tool_name: "my_tool".into(),
            },
            50,
            Arc::from(original_body),
            Some(SchemaHint::Json),
        );
        // Compacted body references tool name and status but none of the JSON keys.
        let compacted = CompactedPage {
            body: Arc::from("my_tool exit_status: 0 completed successfully"),
            tokens: 7,
        };
        let result = inv.verify(&page, &compacted);
        assert!(result.is_err());
        let violations = result.unwrap_err();
        assert!(
            violations
                .iter()
                .any(|v| v.missing_field == "structural_key")
        );
    }

    // ── Regression: F1 — capacity=0 must not panic ────────────────────────────

    #[tokio::test]
    async fn audit_sink_capacity_zero_does_not_panic() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("cap0.jsonl");
        // capacity=0 used to panic in tokio::sync::mpsc::channel(0); must clamp to 1.
        let sink = CompactionAuditSink::open(&path, 0).await.unwrap();
        sink.flush().await;
    }

    // ── Regression: F3 — non-ASCII body must not panic on prefix slice ────────

    #[test]
    fn classify_with_role_non_ascii_body_does_not_panic() {
        // CJK and emoji span multiple bytes; a naive &body[..80] would panic at a
        // mid-character byte boundary. classify_with_role must not panic for any input.
        let cjk = "你好世界".repeat(20); // 240 bytes, 3 bytes per char: byte 80 is mid-character
        let emoji = "🦀".repeat(30); // 120 bytes, 4 bytes per char
        let mixed = "abc🦀中文".repeat(15); // 195 bytes, a mix of 1-, 3- and 4-byte chars

        // None of these must panic:
        let _ = classify_with_role(&cjk, false);
        let _ = classify_with_role(&emoji, false);
        let _ = classify_with_role(&mixed, false);
    }
}