use std::sync::Arc;
use serde::{Deserialize, Serialize};
/// Category of a context page; [`InvariantRegistry::get`] dispatches on it.
///
/// Serialized by serde using `snake_case` variant names (e.g. `tool_output`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PageType {
/// Output produced by a tool invocation.
ToolOutput,
/// A conversation message; also the fallback result of classification.
ConversationTurn,
/// An excerpt injected from memory/recall sources.
MemoryExcerpt,
/// System-level context (persona, skills, digests, ...).
SystemContext,
}
/// Renders the page type as its `snake_case` name (e.g. `tool_output`).
///
/// The rendered text is hashed by [`PageId::compute`], so changing any of these
/// strings changes every derived page id.
impl std::fmt::Display for PageType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match *self {
            Self::ToolOutput => "tool_output",
            Self::ConversationTurn => "conversation_turn",
            Self::MemoryExcerpt => "memory_excerpt",
            Self::SystemContext => "system_context",
        };
        f.write_str(name)
    }
}
/// Where a page came from.
///
/// Serialized as an internally tagged enum: a `kind` field holds the `snake_case`
/// variant name (e.g. `tool_pair`) next to the variant's own field.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum PageOrigin {
/// Page originating from a tool; identified by the tool's name.
ToolPair {
/// Name of the tool.
tool_name: String,
},
/// Page originating from a conversation turn.
Turn {
/// Identifier of the message.
message_id: String,
},
/// Page originating from a memory excerpt.
Excerpt {
/// Label of the excerpt's source.
source_label: String,
},
/// Page originating from system context.
System {
/// Key identifying the system context entry.
key: String,
},
}
/// Coarse guess at the format of a page body (see `detect_schema_hint`).
///
/// Serialized by serde using `snake_case` variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SchemaHint {
/// Body looks like JSON (starts with `{` or `[`).
Json,
/// Plain text; the fallback when nothing else matches.
Text,
/// Body looks like a diff.
Diff,
/// Body looks tabular (tab- or pipe-separated first line).
Table,
/// Body is binary or contains U+FFFD replacement characters.
Binary,
}
/// Content-derived page identifier of the form `blake3:<32 lowercase hex digits>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct PageId(pub String);

impl PageId {
    /// Derives an id from the page type, an origin key and the raw body bytes.
    ///
    /// The BLAKE3 input is `"{page_type}|{origin_key}|"` followed by `body`; only the
    /// first 16 bytes of the digest are kept and hex-encoded.
    ///
    /// NOTE(review): the `|` separator is not escaped, so an `origin_key` that itself
    /// contains `|` can yield the same hash input as a different key/body split.
    #[must_use]
    pub fn compute(page_type: PageType, origin_key: &str, body: &[u8]) -> Self {
        use std::fmt::Write as _;

        const SCHEME: &str = "blake3:";
        const DIGEST_BYTES: usize = 16;

        let type_tag = page_type.to_string();
        let mut hasher = blake3::Hasher::new();
        hasher
            .update(type_tag.as_bytes())
            .update(b"|")
            .update(origin_key.as_bytes())
            .update(b"|")
            .update(body);
        let digest = hasher.finalize();

        // Two hex digits per retained digest byte, appended straight after the scheme.
        let mut id = String::with_capacity(SCHEME.len() + DIGEST_BYTES * 2);
        id.push_str(SCHEME);
        for byte in digest.as_bytes().iter().take(DIGEST_BYTES) {
            // Writing into a `String` cannot fail.
            let _ = write!(id, "{byte:02x}");
        }
        Self(id)
    }
}
/// A page of context together with its type, origin and content-derived id.
#[derive(Debug, Clone)]
pub struct TypedPage {
/// Identifier derived from `page_type`, `origin` and `body` by `TypedPage::new`.
pub page_id: PageId,
/// Category of the page.
pub page_type: PageType,
/// Where the page came from.
pub origin: PageOrigin,
/// Token count for `body`, as supplied by the caller (not computed here).
pub tokens: u32,
/// Page text, shared via `Arc` so clones are cheap.
pub body: Arc<str>,
/// Optional format hint for `body`.
pub schema_hint: Option<SchemaHint>,
}
impl TypedPage {
    /// Builds a page, deriving its [`PageId`] from the page type, the origin key
    /// produced by `origin_key_for`, and the body bytes.
    ///
    /// `tokens` and `schema_hint` are stored as given and do not influence the id.
    #[must_use]
    pub fn new(
        page_type: PageType,
        origin: PageOrigin,
        tokens: u32,
        body: Arc<str>,
        schema_hint: Option<SchemaHint>,
    ) -> Self {
        let page_id = PageId::compute(page_type, &origin_key_for(&origin), body.as_bytes());
        Self {
            page_id,
            page_type,
            origin,
            tokens,
            body,
            schema_hint,
        }
    }
}
/// Flattens an origin into the `"<namespace>:<value>"` key that is hashed into a
/// page id (`tool:`, `turn:`, `excerpt:` or `system:` followed by the origin's field).
fn origin_key_for(origin: &PageOrigin) -> String {
    let (namespace, value) = match origin {
        PageOrigin::ToolPair { tool_name } => ("tool", tool_name),
        PageOrigin::Turn { message_id } => ("turn", message_id),
        PageOrigin::Excerpt { source_label } => ("excerpt", source_label),
        PageOrigin::System { key } => ("system", key),
    };
    format!("{namespace}:{value}")
}
/// Minimum fidelity an invariant demands of a compacted page.
#[derive(Debug, Clone)]
pub struct FidelityContract {
/// Label of the fidelity level, e.g. `structured_summary_v1`.
pub fidelity_level: &'static str,
/// Version number reported by the invariant (the built-in invariants report 1).
pub invariant_version: u8,
/// Names of the fields the compacted form is required to retain.
pub required_fields: &'static [&'static str],
}
/// One unmet requirement found while verifying a compacted page.
#[derive(Debug, Clone, Serialize)]
pub struct FidelityViolation {
/// Name of the requirement that was not met (e.g. `tool_name`, `pointer`).
pub missing_field: String,
/// Human-readable explanation of the violation.
pub detail: String,
}
/// The compacted replacement for a [`TypedPage`] body.
#[derive(Debug, Clone)]
pub struct CompactedPage {
/// Compacted text.
pub body: Arc<str>,
/// Token count of the compacted text, as supplied by whoever produced it.
pub tokens: u32,
}
/// Per-page-type rule that a compacted page must satisfy.
///
/// Implementors must be `Send + Sync`.
pub trait PageInvariant: Send + Sync {
/// The page type this invariant is meant for.
fn page_type(&self) -> PageType;
/// Describes the minimum fidelity required when compacting `page`.
fn minimum_fidelity(&self, page: &TypedPage) -> FidelityContract;
/// Checks `compacted` against `original`.
///
/// # Errors
///
/// Returns the list of violations when the compacted page does not satisfy
/// the invariant.
fn verify(
&self,
original: &TypedPage,
compacted: &CompactedPage,
) -> Result<(), Vec<FidelityViolation>>;
}
/// Invariant for [`PageType::ToolOutput`] pages.
///
/// A compacted tool output must still mention the tool's name and some exit-status
/// marker; for JSON outputs it must also mention at least one top-level key of the
/// original. Pages hinted as [`SchemaHint::Binary`] are exempt from all checks.
pub struct ToolOutputInvariant;

impl PageInvariant for ToolOutputInvariant {
    fn page_type(&self) -> PageType {
        PageType::ToolOutput
    }

    fn minimum_fidelity(&self, _page: &TypedPage) -> FidelityContract {
        FidelityContract {
            fidelity_level: "structured_summary_v1",
            invariant_version: 1,
            required_fields: &["tool_name", "exit_status"],
        }
    }

    /// Substring-based verification; violations are reported in the order
    /// `tool_name`, `exit_status`, `structural_key`.
    fn verify(
        &self,
        original: &TypedPage,
        compacted: &CompactedPage,
    ) -> Result<(), Vec<FidelityViolation>> {
        /// Any one of these substrings counts as an exit-status indicator.
        const STATUS_MARKERS: [&str; 6] = [
            "exit_status",
            "exit_code",
            "status:",
            "Status:",
            "exit:",
            "rc:",
        ];

        // Binary payloads carry no textual structure to preserve.
        if original.schema_hint == Some(SchemaHint::Binary) {
            return Ok(());
        }

        let summary: &str = &compacted.body;
        let mut violations = Vec::new();

        // Only a non-empty tool name from a `ToolPair` origin is enforced.
        match &original.origin {
            PageOrigin::ToolPair { tool_name }
                if !tool_name.is_empty() && !summary.contains(tool_name.as_str()) =>
            {
                violations.push(FidelityViolation {
                    missing_field: "tool_name".into(),
                    detail: format!("compacted body does not reference tool '{tool_name}'"),
                });
            }
            _ => {}
        }

        let mentions_status = STATUS_MARKERS
            .iter()
            .any(|&marker| summary.contains(marker));
        if !mentions_status {
            violations.push(FidelityViolation {
                missing_field: "exit_status".into(),
                detail: "compacted body does not contain an exit status indicator".into(),
            });
        }

        if original.schema_hint == Some(SchemaHint::Json)
            && !check_json_structural_key(&original.body, summary)
        {
            violations.push(FidelityViolation {
                missing_field: "structural_key".into(),
                detail: "compacted JSON tool output does not reference any top-level field \
                         name from the original output"
                    .into(),
            });
        }

        if violations.is_empty() {
            Ok(())
        } else {
            Err(violations)
        }
    }
}
/// Returns `true` when `compacted` mentions at least one top-level key of the JSON
/// object in `original`.
///
/// The check is vacuously satisfied (`true`) when `original` is not valid JSON, is
/// not a JSON object, or is an empty object.
fn check_json_structural_key(original: &str, compacted: &str) -> bool {
    match serde_json::from_str::<serde_json::Value>(original) {
        Ok(serde_json::Value::Object(fields)) if !fields.is_empty() => fields
            .keys()
            .any(|name| compacted.contains(name.as_str())),
        // Unparseable, non-object or empty: nothing to enforce.
        _ => true,
    }
}
/// Invariant for [`PageType::ConversationTurn`] pages: the compacted text must
/// contain one of the substrings `user`, `assistant` or `system`.
pub struct ConversationTurnInvariant;

impl PageInvariant for ConversationTurnInvariant {
    fn page_type(&self) -> PageType {
        PageType::ConversationTurn
    }

    fn minimum_fidelity(&self, _page: &TypedPage) -> FidelityContract {
        FidelityContract {
            fidelity_level: "semantic_summary_v1",
            invariant_version: 1,
            required_fields: &["role"],
        }
    }

    /// Plain, case-sensitive substring search; the original page is not consulted.
    fn verify(
        &self,
        _original: &TypedPage,
        compacted: &CompactedPage,
    ) -> Result<(), Vec<FidelityViolation>> {
        const ROLE_WORDS: [&str; 3] = ["user", "assistant", "system"];

        let summary: &str = &compacted.body;
        if ROLE_WORDS.iter().any(|&role| summary.contains(role)) {
            return Ok(());
        }
        Err(vec![FidelityViolation {
            missing_field: "role".into(),
            detail: "compacted turn does not identify a speaker role".into(),
        }])
    }
}
/// Invariant for [`PageType::MemoryExcerpt`] pages: the compacted text must still
/// contain the excerpt's source label.
pub struct MemoryExcerptInvariant;

impl PageInvariant for MemoryExcerptInvariant {
    fn page_type(&self) -> PageType {
        PageType::MemoryExcerpt
    }

    fn minimum_fidelity(&self, _page: &TypedPage) -> FidelityContract {
        FidelityContract {
            fidelity_level: "excerpt_summary_v1",
            invariant_version: 1,
            required_fields: &["source_label"],
        }
    }

    /// Pages whose origin is not [`PageOrigin::Excerpt`] pass unconditionally.
    fn verify(
        &self,
        original: &TypedPage,
        compacted: &CompactedPage,
    ) -> Result<(), Vec<FidelityViolation>> {
        let PageOrigin::Excerpt { source_label } = &original.origin else {
            return Ok(());
        };
        if compacted.body.contains(source_label.as_str()) {
            return Ok(());
        }
        Err(vec![FidelityViolation {
            missing_field: "source_label".into(),
            detail: format!("compacted excerpt does not contain source label '{source_label}'"),
        }])
    }
}
/// Invariant for [`PageType::SystemContext`] pages: the compacted body must be a
/// pointer, i.e. start with [`SYSTEM_POINTER_PREFIX`].
pub struct SystemContextInvariant;

/// Prefix that a pointer-replaced system-context body starts with.
pub const SYSTEM_POINTER_PREFIX: &str = "[system-ptr:";

impl PageInvariant for SystemContextInvariant {
    fn page_type(&self) -> PageType {
        PageType::SystemContext
    }

    fn minimum_fidelity(&self, _page: &TypedPage) -> FidelityContract {
        FidelityContract {
            fidelity_level: "pointer_replace_v1",
            invariant_version: 1,
            required_fields: &["pointer"],
        }
    }

    /// Only the prefix is checked; the original page is not consulted.
    fn verify(
        &self,
        _original: &TypedPage,
        compacted: &CompactedPage,
    ) -> Result<(), Vec<FidelityViolation>> {
        if compacted.body.starts_with(SYSTEM_POINTER_PREFIX) {
            return Ok(());
        }
        let violation = FidelityViolation {
            missing_field: "pointer".into(),
            detail: format!(
                "SystemContext page was not pointer-replaced \
                 (body does not start with '{SYSTEM_POINTER_PREFIX}')"
            ),
        };
        Err(vec![violation])
    }
}
/// Holds one boxed [`PageInvariant`] per [`PageType`] variant.
pub struct InvariantRegistry {
/// Invariant used for `PageType::ToolOutput`.
tool_output: Box<dyn PageInvariant>,
/// Invariant used for `PageType::ConversationTurn`.
conversation_turn: Box<dyn PageInvariant>,
/// Invariant used for `PageType::MemoryExcerpt`.
memory_excerpt: Box<dyn PageInvariant>,
/// Invariant used for `PageType::SystemContext`.
system_context: Box<dyn PageInvariant>,
}
impl Default for InvariantRegistry {
fn default() -> Self {
Self {
tool_output: Box::new(ToolOutputInvariant),
conversation_turn: Box::new(ConversationTurnInvariant),
memory_excerpt: Box::new(MemoryExcerptInvariant),
system_context: Box::new(SystemContextInvariant),
}
}
}
impl InvariantRegistry {
    /// Looks up the invariant registered for `page_type`.
    ///
    /// Every current [`PageType`] variant has an entry, so this presently always
    /// returns `Some`.
    #[must_use]
    pub fn get(&self, page_type: PageType) -> Option<&dyn PageInvariant> {
        let invariant: &dyn PageInvariant = match page_type {
            PageType::ToolOutput => self.tool_output.as_ref(),
            PageType::ConversationTurn => self.conversation_turn.as_ref(),
            PageType::MemoryExcerpt => self.memory_excerpt.as_ref(),
            PageType::SystemContext => self.system_context.as_ref(),
        };
        Some(invariant)
    }

    /// Verifies `compacted` against the invariant registered for the original's
    /// page type, inside a `context.compaction.typed_page` tracing span.
    ///
    /// If no invariant is registered, a warning is logged and verification is skipped.
    ///
    /// # Errors
    ///
    /// Returns the violations reported by the invariant's `verify`.
    pub fn enforce(
        &self,
        original: &TypedPage,
        compacted: &CompactedPage,
    ) -> Result<(), Vec<FidelityViolation>> {
        let span = tracing::info_span!(
            "context.compaction.typed_page",
            page_type = %original.page_type,
            page_id = %original.page_id.0,
            original_tokens = original.tokens,
            compacted_tokens = compacted.tokens,
        );
        // Keep the span entered for the duration of the verification.
        let _entered = span.entered();

        match self.get(original.page_type) {
            Some(invariant) => invariant.verify(original, compacted),
            None => {
                tracing::warn!(
                    page_type = %original.page_type,
                    "no invariant registered for page type — skipping verification"
                );
                Ok(())
            }
        }
    }
}
/// Classifies `body` by its leading marker, without a system-role hint.
///
/// Equivalent to `classify_with_role(body, false)`.
#[must_use]
pub fn classify(body: &str) -> PageType {
    const IS_SYSTEM_ROLE: bool = false;
    classify_with_role(body, IS_SYSTEM_ROLE)
}
/// Classifies `body`, treating it as system context when no known prefix matches
/// and `is_system_role` is `true`.
///
/// Runs inside a `context.compaction.typed_page.classify` tracing span that records
/// the body length in bytes.
#[must_use]
pub fn classify_with_role(body: &str, is_system_role: bool) -> PageType {
    let span = tracing::info_span!(
        "context.compaction.typed_page.classify",
        body_len = body.len()
    );
    span.in_scope(|| classify_with_role_inner(body, is_system_role))
}
/// Classifies `body` by its leading marker, falling back on the caller-supplied role.
///
/// Leading whitespace is ignored. The prefix tables are consulted in the order
/// tool, memory, system, and the first match wins. When no prefix matches, the
/// result is [`PageType::SystemContext`] if `is_system_role` is set; otherwise a
/// warning carrying at most the first 80 bytes of `body` is logged and
/// [`PageType::ConversationTurn`] is returned.
fn classify_with_role_inner(body: &str, is_system_role: bool) -> PageType {
    const TOOL_PREFIXES: &[&str] = &["[tool_output]", "[tool:", "[Tool output]"];
    const MEMORY_PREFIXES: &[&str] = &[
        "[cross-session context]",
        "[semantic recall]",
        "[known facts]",
        "[conversation summaries]",
        "[past corrections]",
        "## Relevant documents",
    ];
    const SYSTEM_PREFIXES: &[&str] = &[
        "[Persona context]",
        "[Past experience]",
        "[Memory summary]",
        "[system",
        "[skill",
        "[persona",
        "[digest",
        "[compression",
    ];
    /// Upper bound, in bytes, on the body excerpt attached to the fallback warning.
    const LOG_PREFIX_MAX_BYTES: usize = 80;

    /// Longest prefix of `text` that is at most `max_bytes` long and ends on a
    /// UTF-8 character boundary.
    fn prefix_on_char_boundary(text: &str, max_bytes: usize) -> &str {
        let mut end = text.len().min(max_bytes);
        // Slicing inside a multi-byte character panics, so back off to the nearest
        // boundary. Index 0 is always a boundary, which guarantees termination.
        while !text.is_char_boundary(end) {
            end -= 1;
        }
        &text[..end]
    }

    let trimmed = body.trim_start();
    let starts_with_any =
        |prefixes: &[&str]| prefixes.iter().any(|&prefix| trimmed.starts_with(prefix));

    if starts_with_any(TOOL_PREFIXES) {
        return PageType::ToolOutput;
    }
    if starts_with_any(MEMORY_PREFIXES) {
        return PageType::MemoryExcerpt;
    }
    if starts_with_any(SYSTEM_PREFIXES) {
        return PageType::SystemContext;
    }
    // An explicit system role only matters once no textual marker matched.
    if is_system_role {
        return PageType::SystemContext;
    }

    tracing::warn!(
        body_prefix = prefix_on_char_boundary(body, LOG_PREFIX_MAX_BYTES),
        "typed-page classification fallback to ConversationTurn"
    );
    PageType::ConversationTurn
}
/// Guesses the format of `body` from cheap textual heuristics.
///
/// Checks, in order:
/// 1. `is_binary` or any U+FFFD replacement character → [`SchemaHint::Binary`];
/// 2. first non-whitespace character is `{` or `[` → [`SchemaHint::Json`];
/// 3. trimmed body starts with `--- `, `+++ `, `diff --git` or `diff -` →
///    [`SchemaHint::Diff`];
/// 4. first line holds at least two tabs or at least two `|` → [`SchemaHint::Table`];
/// 5. otherwise [`SchemaHint::Text`].
#[must_use]
pub fn detect_schema_hint(body: &str, is_binary: bool) -> SchemaHint {
    const DIFF_MARKERS: [&str; 4] = ["--- ", "+++ ", "diff --git", "diff -"];
    const MIN_SEPARATORS: usize = 2;

    if is_binary || body.contains('\u{FFFD}') {
        return SchemaHint::Binary;
    }

    let content = body.trim_start();
    if matches!(content.chars().next(), Some('{' | '[')) {
        return SchemaHint::Json;
    }
    if DIFF_MARKERS.iter().any(|&marker| content.starts_with(marker)) {
        return SchemaHint::Diff;
    }

    let header = content.lines().next().unwrap_or("");
    let occurrences = |separator: char| header.chars().filter(|&c| c == separator).count();
    if occurrences('\t') >= MIN_SEPARATORS || occurrences('|') >= MIN_SEPARATORS {
        SchemaHint::Table
    } else {
        SchemaHint::Text
    }
}
/// One audit record describing the compaction of a single page; serialized as a
/// JSON line by [`CompactionAuditSink`].
#[derive(Debug, Serialize)]
pub struct CompactedPageRecord {
/// Timestamp string supplied by the caller (tests use RFC 3339-style values).
pub ts: String,
/// Identifier of the turn the record belongs to.
pub turn_id: String,
/// Page identifier as a string (e.g. `blake3:...`).
pub page_id: String,
/// Type of the compacted page.
pub page_type: PageType,
/// Origin of the compacted page.
pub origin: PageOrigin,
/// Token count before compaction.
pub original_tokens: u32,
/// Token count after compaction.
pub compacted_tokens: u32,
/// Fidelity level label (e.g. `structured_summary_v1`).
pub fidelity_level: String,
/// Version number of the invariant involved.
pub invariant_version: u8,
/// Name of the provider, as supplied by the caller.
pub provider_name: String,
/// Violations found during verification; empty when none.
pub violations: Vec<FidelityViolation>,
/// Omitted from the serialized output when `false`.
// presumably set when classification fell back to ConversationTurn — confirm with caller
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub classification_fallback: bool,
}
/// Handle for queueing [`CompactedPageRecord`]s to a background writer task.
///
/// Clones share the same channel and the same drop counter.
#[derive(Debug, Clone)]
pub struct CompactionAuditSink {
/// Sending half of the bounded channel consumed by the writer task.
tx: tokio::sync::mpsc::Sender<CompactedPageRecord>,
/// Number of records dropped because the channel was full.
drop_counter: Arc<std::sync::atomic::AtomicU64>,
}
impl CompactionAuditSink {
pub async fn open(path: &std::path::Path, capacity: usize) -> Result<Self, std::io::Error> {
use tokio::io::AsyncWriteExt as _;
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let file = tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)
.await?;
let (tx, mut rx) = tokio::sync::mpsc::channel::<CompactedPageRecord>(capacity);
let drop_counter = Arc::new(std::sync::atomic::AtomicU64::new(0));
let drop_counter_bg = drop_counter.clone();
tokio::spawn(async move {
let mut writer = tokio::io::BufWriter::new(file);
while let Some(record) = rx.recv().await {
match serde_json::to_string(&record) {
Ok(mut line) => {
line.push('\n');
if let Err(e) = writer.write_all(line.as_bytes()).await {
tracing::error!("compaction audit write failed: {e:#}");
}
}
Err(e) => {
tracing::error!("compaction audit serialization failed: {e:#}");
}
}
}
let _ = writer.flush().await;
let dropped = drop_counter_bg.load(std::sync::atomic::Ordering::Relaxed);
if dropped > 0 {
tracing::warn!(dropped, "compaction audit sink closed with dropped records");
}
});
Ok(Self { tx, drop_counter })
}
pub fn send(&self, record: CompactedPageRecord) {
match self.tx.try_send(record) {
Ok(()) => {}
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
let prev = self
.drop_counter
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
tracing::warn!(
dropped_total = prev + 1,
"compaction audit sink full — record dropped (best-effort audit)"
);
}
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
tracing::error!("compaction audit sink closed unexpectedly");
}
}
}
pub async fn flush(&self) {
tokio::task::yield_now().await;
}
#[must_use]
pub fn dropped_count(&self) -> u64 {
self.drop_counter.load(std::sync::atomic::Ordering::Relaxed)
}
}
#[cfg(test)]
mod tests {
use super::*;
/// `PageId::compute` is deterministic for identical inputs.
#[test]
fn page_id_same_input_same_output() {
let a = PageId::compute(PageType::ToolOutput, "tool:shell", b"exit_code: 0");
let b = PageId::compute(PageType::ToolOutput, "tool:shell", b"exit_code: 0");
assert_eq!(a, b);
}
/// The page type participates in the hash: same key and body, different type, different id.
#[test]
fn page_id_different_type_different_id() {
let a = PageId::compute(PageType::ToolOutput, "tool:shell", b"body");
let b = PageId::compute(PageType::ConversationTurn, "tool:shell", b"body");
assert_ne!(a, b);
}
/// Ids carry the `blake3:` scheme prefix.
#[test]
fn page_id_starts_with_blake3_prefix() {
let id = PageId::compute(PageType::SystemContext, "system:persona", b"some content");
assert!(id.0.starts_with("blake3:"));
}
/// Bodies starting with `[tool_output]` or `[tool:` classify as tool output.
#[test]
fn classify_tool_output_prefix() {
assert_eq!(
classify("[tool_output] shell exit_code: 0"),
PageType::ToolOutput
);
assert_eq!(classify("[tool:shell] result"), PageType::ToolOutput);
}
/// Known memory markers classify as memory excerpts.
#[test]
fn classify_memory_prefixes() {
assert_eq!(
classify("[cross-session context]\nsome recall"),
PageType::MemoryExcerpt
);
assert_eq!(
classify("[semantic recall]\n- [user] hello"),
PageType::MemoryExcerpt
);
assert_eq!(classify("[known facts]\n- fact"), PageType::MemoryExcerpt);
assert_eq!(
classify("[conversation summaries]\n- 1-10: summary"),
PageType::MemoryExcerpt
);
}
/// Known system markers classify as system context.
#[test]
fn classify_system_prefixes() {
assert_eq!(classify("[Persona context]\nfact"), PageType::SystemContext);
assert_eq!(classify("[system prompt]"), PageType::SystemContext);
}
/// Unmarked (including empty) bodies fall back to conversation turn.
#[test]
fn classify_fallback_is_conversation_turn() {
assert_eq!(classify("Hello, world!"), PageType::ConversationTurn);
assert_eq!(classify(""), PageType::ConversationTurn);
}
/// Bodies starting with `{` or `[` are hinted as JSON.
#[test]
fn detect_schema_hint_json() {
assert_eq!(
detect_schema_hint(r#"{"key": "val"}"#, false),
SchemaHint::Json
);
assert_eq!(detect_schema_hint("[1,2,3]", false), SchemaHint::Json);
}
/// A body starting with `--- ` is hinted as a diff.
#[test]
fn detect_schema_hint_diff() {
assert_eq!(detect_schema_hint("--- a\n+++ b", false), SchemaHint::Diff);
}
/// The `is_binary` flag forces the binary hint regardless of content.
#[test]
fn detect_schema_hint_binary() {
assert_eq!(detect_schema_hint("anything", true), SchemaHint::Binary);
}
/// Content matching no heuristic is hinted as plain text.
#[test]
fn detect_schema_hint_text_fallback() {
assert_eq!(detect_schema_hint("plain text", false), SchemaHint::Text);
}
/// A compacted body naming the tool and an exit status satisfies the invariant.
#[test]
fn tool_output_invariant_passes_when_fields_present() {
let inv = ToolOutputInvariant;
let page = TypedPage::new(
PageType::ToolOutput,
PageOrigin::ToolPair {
tool_name: "shell".into(),
},
100,
Arc::from("[tool_output] shell exit_code: 0\nsome output"),
Some(SchemaHint::Text),
);
let compacted = CompactedPage {
body: Arc::from("shell exit_status: 0\nkey: value"),
tokens: 10,
};
assert!(inv.verify(&page, &compacted).is_ok());
}
/// Omitting the tool name from the compacted body yields a `tool_name` violation.
#[test]
fn tool_output_invariant_fails_missing_tool_name() {
let inv = ToolOutputInvariant;
let page = TypedPage::new(
PageType::ToolOutput,
PageOrigin::ToolPair {
tool_name: "my_tool".into(),
},
100,
Arc::from("[tool_output] my_tool exit_code: 0"),
Some(SchemaHint::Text),
);
let compacted = CompactedPage {
body: Arc::from("exit_status: 0"),
tokens: 5,
};
let result = inv.verify(&page, &compacted);
assert!(result.is_err());
let violations = result.unwrap_err();
assert!(violations.iter().any(|v| v.missing_field == "tool_name"));
}
/// Binary-hinted pages bypass all tool-output checks.
#[test]
fn tool_output_invariant_passes_for_binary() {
let inv = ToolOutputInvariant;
let page = TypedPage::new(
PageType::ToolOutput,
PageOrigin::ToolPair {
tool_name: "binary_tool".into(),
},
100,
Arc::from("<binary:1024 bytes>"),
Some(SchemaHint::Binary),
);
let compacted = CompactedPage {
body: Arc::from("<binary:1024 bytes> (archived)"),
tokens: 5,
};
assert!(inv.verify(&page, &compacted).is_ok());
}
/// A compacted body starting with the system pointer prefix is accepted.
#[test]
fn system_context_invariant_passes_with_pointer() {
let inv = SystemContextInvariant;
let page = TypedPage::new(
PageType::SystemContext,
PageOrigin::System {
key: "persona".into(),
},
200,
Arc::from("[Persona context]\nsome persona info"),
None,
);
let compacted = CompactedPage {
body: Arc::from("[system-ptr:blake3:abcdef123456]"),
tokens: 3,
};
assert!(inv.verify(&page, &compacted).is_ok());
}
/// A paraphrased (non-pointer) system context yields a `pointer` violation.
#[test]
fn system_context_invariant_fails_without_pointer() {
let inv = SystemContextInvariant;
let page = TypedPage::new(
PageType::SystemContext,
PageOrigin::System {
key: "persona".into(),
},
200,
Arc::from("[Persona context]\nsome persona info"),
None,
);
let compacted = CompactedPage {
body: Arc::from("This is a paraphrase of persona info"),
tokens: 10,
};
let result = inv.verify(&page, &compacted);
assert!(result.is_err());
let violations = result.unwrap_err();
assert!(violations.iter().any(|v| v.missing_field == "pointer"));
}
/// The default registry has an invariant for every page type.
#[test]
fn registry_covers_all_page_types() {
let reg = InvariantRegistry::default();
for pt in [
PageType::ToolOutput,
PageType::ConversationTurn,
PageType::MemoryExcerpt,
PageType::SystemContext,
] {
assert!(reg.get(pt).is_some(), "missing invariant for {pt:?}");
}
}
/// The invariant returned for a page type reports that same page type.
#[test]
fn registry_returns_correct_page_type() {
let reg = InvariantRegistry::default();
assert_eq!(
reg.get(PageType::ToolOutput).unwrap().page_type(),
PageType::ToolOutput
);
assert_eq!(
reg.get(PageType::SystemContext).unwrap().page_type(),
PageType::SystemContext
);
}
/// `enforce` accepts a pointer-replaced system context page.
#[test]
fn enforce_ok_for_valid_system_pointer() {
let reg = InvariantRegistry::default();
let page = TypedPage::new(
PageType::SystemContext,
PageOrigin::System {
key: "persona".into(),
},
50,
Arc::from("[Persona context]\nrules"),
None,
);
let compacted = CompactedPage {
body: Arc::from("[system-ptr:blake3:aabbccdd11223344]"),
tokens: 3,
};
assert!(reg.enforce(&page, &compacted).is_ok());
}
/// `enforce` rejects a paraphrased system context page with a `pointer` violation.
#[test]
fn enforce_err_for_paraphrased_system_context() {
let reg = InvariantRegistry::default();
let page = TypedPage::new(
PageType::SystemContext,
PageOrigin::System {
key: "persona".into(),
},
50,
Arc::from("[Persona context]\nrules"),
None,
);
let compacted = CompactedPage {
body: Arc::from("The persona says to be helpful."),
tokens: 7,
};
let result = reg.enforce(&page, &compacted);
assert!(result.is_err());
assert!(
result
.unwrap_err()
.iter()
.any(|v| v.missing_field == "pointer")
);
}
/// `enforce` accepts a compacted conversation turn that mentions a role word.
#[test]
fn enforce_ok_for_conversation_turn_with_role() {
let reg = InvariantRegistry::default();
let page = TypedPage::new(
PageType::ConversationTurn,
PageOrigin::Turn {
message_id: "42".into(),
},
30,
Arc::from("Hello from user"),
None,
);
let compacted = CompactedPage {
body: Arc::from("user asked about Rust"),
tokens: 5,
};
assert!(reg.enforce(&page, &compacted).is_ok());
}
/// A compacted excerpt that contains the source label passes.
#[test]
fn memory_excerpt_invariant_passes_when_label_present() {
let inv = MemoryExcerptInvariant;
let label = "semantic_recall";
let page = TypedPage::new(
PageType::MemoryExcerpt,
PageOrigin::Excerpt {
source_label: label.into(),
},
80,
Arc::from("[semantic recall]\n- [user] hello"),
None,
);
let compacted = CompactedPage {
body: Arc::from(format!("Summary from {label}: user greeted")),
tokens: 6,
};
assert!(inv.verify(&page, &compacted).is_ok());
}
/// A compacted excerpt without the source label yields a `source_label` violation.
#[test]
fn memory_excerpt_invariant_fails_when_label_missing() {
let inv = MemoryExcerptInvariant;
let page = TypedPage::new(
PageType::MemoryExcerpt,
PageOrigin::Excerpt {
source_label: "graph_facts".into(),
},
80,
Arc::from("[known facts]\n- Alice works at Acme"),
None,
);
let compacted = CompactedPage {
body: Arc::from("Alice is employed somewhere"),
tokens: 5,
};
let result = inv.verify(&page, &compacted);
assert!(result.is_err());
assert!(
result
.unwrap_err()
.iter()
.any(|v| v.missing_field == "source_label")
);
}
/// Pages whose origin is not an excerpt pass the memory-excerpt invariant unconditionally.
#[test]
fn memory_excerpt_invariant_passes_for_non_excerpt_origin() {
let inv = MemoryExcerptInvariant;
let page = TypedPage::new(
PageType::MemoryExcerpt,
PageOrigin::System {
key: "digests".into(),
},
40,
Arc::from("[system]"),
None,
);
let compacted = CompactedPage {
body: Arc::from("anything"),
tokens: 1,
};
assert!(inv.verify(&page, &compacted).is_ok());
}
/// Each of the role words `user`, `assistant` and `system` satisfies the invariant.
#[test]
fn conversation_turn_invariant_passes_with_role_word() {
let inv = ConversationTurnInvariant;
let page = TypedPage::new(
PageType::ConversationTurn,
PageOrigin::Turn {
message_id: "1".into(),
},
20,
Arc::from("Hello world"),
None,
);
for body in &["user: hi", "assistant replied", "system note"] {
let compacted = CompactedPage {
body: Arc::from(*body),
tokens: 2,
};
assert!(inv.verify(&page, &compacted).is_ok(), "body={body}");
}
}
/// A compacted turn mentioning no role word yields a `role` violation.
#[test]
fn conversation_turn_invariant_fails_without_role_word() {
let inv = ConversationTurnInvariant;
let page = TypedPage::new(
PageType::ConversationTurn,
PageOrigin::Turn {
message_id: "2".into(),
},
20,
Arc::from("some turn content"),
None,
);
let compacted = CompactedPage {
body: Arc::from("content was summarized"),
tokens: 3,
};
let result = inv.verify(&page, &compacted);
assert!(result.is_err());
assert!(
result
.unwrap_err()
.iter()
.any(|v| v.missing_field == "role")
);
}
/// A record sent through the sink ends up as one parseable JSON line in the file.
#[tokio::test]
async fn audit_sink_jsonl_roundtrip() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("audit.jsonl");
let sink = CompactionAuditSink::open(&path, 64).await.unwrap();
let record = CompactedPageRecord {
ts: "2026-04-19T00:00:00Z".into(),
turn_id: "1".into(),
page_id: "blake3:aabbccdd".into(),
page_type: PageType::ToolOutput,
origin: PageOrigin::ToolPair {
tool_name: "shell".into(),
},
original_tokens: 100,
compacted_tokens: 20,
fidelity_level: "structured_summary_v1".into(),
invariant_version: 1,
provider_name: "test".into(),
violations: vec![],
classification_fallback: false,
};
sink.send(record);
// Dropping the only sender closes the channel so the writer task drains and exits.
drop(sink);
// NOTE(review): relies on a fixed delay for the background task to finish writing;
// may be timing-sensitive on a heavily loaded machine.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let contents = std::fs::read_to_string(&path).unwrap();
assert!(!contents.is_empty(), "audit file should not be empty");
let parsed: serde_json::Value = serde_json::from_str(contents.trim()).unwrap();
assert_eq!(parsed["page_type"], "tool_output");
assert_eq!(parsed["turn_id"], "1");
assert_eq!(parsed["provider_name"], "test");
}
/// Sending more records than the queue can hold increments the drop counter.
#[tokio::test]
async fn audit_sink_drop_counter_increments_when_full() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("audit_full.jsonl");
let sink = CompactionAuditSink::open(&path, 1).await.unwrap();
let make_record = || CompactedPageRecord {
ts: "2026-04-19T00:00:00Z".into(),
turn_id: "x".into(),
page_id: "blake3:00".into(),
page_type: PageType::ConversationTurn,
origin: PageOrigin::Turn {
message_id: "0".into(),
},
original_tokens: 10,
compacted_tokens: 5,
fidelity_level: "semantic_summary_v1".into(),
invariant_version: 1,
provider_name: "test".into(),
violations: vec![],
classification_fallback: false,
};
// No await between sends, so the writer task cannot drain the capacity-1 queue.
for _ in 0..10 {
sink.send(make_record());
}
assert!(
sink.dropped_count() > 0,
"expected at least one dropped record"
);
}
/// Smoke test: `flush` on a freshly opened sink completes without panicking.
#[tokio::test]
async fn audit_sink_flush_does_not_panic() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("audit_flush.jsonl");
let sink = CompactionAuditSink::open(&path, 16).await.unwrap();
sink.flush().await;
}
/// With no matching prefix, the system-role flag turns the fallback into system context.
#[test]
fn classify_with_role_system_flag_overrides_fallback() {
assert_eq!(
classify_with_role("You are a helpful assistant.", true),
PageType::SystemContext
);
}
/// A recognised prefix takes precedence even when the system-role flag is set.
#[test]
fn classify_with_role_prefix_wins_over_system_flag() {
    // The flag must be `true` here; with `false` the test would not exercise the
    // precedence it is named after.
    assert_eq!(
        classify_with_role("[tool_output] exit_code: 0", true),
        PageType::ToolOutput
    );
}
/// Without the system-role flag, unmarked prose falls back to conversation turn.
#[test]
fn classify_with_role_false_still_falls_back_to_conversation_turn() {
assert_eq!(
classify_with_role("random prose without markers", false),
PageType::ConversationTurn
);
}
/// JSON tool output passes when the compacted body mentions a top-level key of the original.
#[test]
fn tool_output_json_structural_check_passes_when_key_preserved() {
let inv = ToolOutputInvariant;
let original_body = r#"{"exit_code": 0, "stdout": "ok"}"#;
let page = TypedPage::new(
PageType::ToolOutput,
PageOrigin::ToolPair {
tool_name: "shell".into(),
},
50,
Arc::from(original_body),
Some(SchemaHint::Json),
);
let compacted = CompactedPage {
body: Arc::from("shell exit_code: 0, stdout was ok"),
tokens: 8,
};
assert!(inv.verify(&page, &compacted).is_ok());
}
/// JSON tool output fails with `structural_key` when no top-level key survives compaction.
#[test]
fn tool_output_json_structural_check_fails_when_no_key_preserved() {
let inv = ToolOutputInvariant;
let original_body = r#"{"some_field": "value", "other_field": 42}"#;
let page = TypedPage::new(
PageType::ToolOutput,
PageOrigin::ToolPair {
tool_name: "my_tool".into(),
},
50,
Arc::from(original_body),
Some(SchemaHint::Json),
);
let compacted = CompactedPage {
body: Arc::from("my_tool exit_status: 0 completed successfully"),
tokens: 7,
};
let result = inv.verify(&page, &compacted);
assert!(result.is_err());
let violations = result.unwrap_err();
assert!(
violations
.iter()
.any(|v| v.missing_field == "structural_key")
);
}
}