1use std::sync::Arc;
24use std::time::Duration;
25
26use serde::{Deserialize, Serialize};
27
28#[non_exhaustive]
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
41#[serde(rename_all = "snake_case")]
42pub enum PageType {
43 ToolOutput,
45 ConversationTurn,
47 MemoryExcerpt,
49 SystemContext,
51}
52
53impl std::fmt::Display for PageType {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 match self {
56 Self::ToolOutput => f.write_str("tool_output"),
57 Self::ConversationTurn => f.write_str("conversation_turn"),
58 Self::MemoryExcerpt => f.write_str("memory_excerpt"),
59 Self::SystemContext => f.write_str("system_context"),
60 }
61 }
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize)]
68#[serde(tag = "kind", rename_all = "snake_case")]
69#[non_exhaustive]
70pub enum PageOrigin {
71 ToolPair {
73 tool_name: String,
75 },
76 Turn {
78 message_id: String,
80 },
81 Excerpt {
83 source_label: String,
85 },
86 System {
88 key: String,
90 },
91}
92
93#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
99#[serde(rename_all = "snake_case")]
100#[non_exhaustive]
101pub enum SchemaHint {
102 Json,
104 Text,
106 Diff,
108 Table,
110 Binary,
112}
113
114#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
130pub struct PageId(pub String);
131
132impl PageId {
133 #[must_use]
135 pub fn compute(page_type: PageType, origin_key: &str, body: &[u8]) -> Self {
136 let mut hasher = blake3::Hasher::new();
137 hasher.update(page_type.to_string().as_bytes());
138 hasher.update(b"|");
139 hasher.update(origin_key.as_bytes());
140 hasher.update(b"|");
141 hasher.update(body);
142 let hash = hasher.finalize();
143 let mut hex = String::with_capacity(32);
145 for b in &hash.as_bytes()[..16] {
146 use std::fmt::Write as _;
147 let _ = write!(hex, "{b:02x}");
148 }
149 Self(format!("blake3:{hex}"))
150 }
151}
152
153#[derive(Debug, Clone)]
161pub struct TypedPage {
162 pub page_id: PageId,
164 pub page_type: PageType,
166 pub origin: PageOrigin,
168 pub tokens: u32,
170 pub body: Arc<str>,
172 pub schema_hint: Option<SchemaHint>,
174}
175
176impl TypedPage {
177 #[must_use]
179 pub fn new(
180 page_type: PageType,
181 origin: PageOrigin,
182 tokens: u32,
183 body: Arc<str>,
184 schema_hint: Option<SchemaHint>,
185 ) -> Self {
186 let origin_key = origin_key_for(&origin);
187 let page_id = PageId::compute(page_type, &origin_key, body.as_bytes());
188 Self {
189 page_id,
190 page_type,
191 origin,
192 tokens,
193 body,
194 schema_hint,
195 }
196 }
197}
198
199fn origin_key_for(origin: &PageOrigin) -> String {
200 match origin {
201 PageOrigin::ToolPair { tool_name } => format!("tool:{tool_name}"),
202 PageOrigin::Turn { message_id } => format!("turn:{message_id}"),
203 PageOrigin::Excerpt { source_label } => format!("excerpt:{source_label}"),
204 PageOrigin::System { key } => format!("system:{key}"),
205 }
206}
207
/// Minimum fidelity an invariant demands of a compacted page.
#[derive(Clone, Debug)]
pub struct FidelityContract {
    /// Name of the fidelity level, e.g. `structured_summary_v1`.
    pub fidelity_level: &'static str,
    /// Version number of the invariant that issued this contract.
    pub invariant_version: u8,
    /// Names of the fields the compacted page is expected to retain.
    pub required_fields: &'static [&'static str],
}
223
224#[derive(Debug, Clone, Serialize)]
231pub struct FidelityViolation {
232 pub missing_field: String,
234 pub detail: String,
236}
237
/// Result of compacting a page: the replacement body and its token count.
#[derive(Clone, Debug)]
pub struct CompactedPage {
    /// Compacted content that invariants inspect.
    pub body: Arc<str>,
    /// Token count of the compacted content, as reported by the caller.
    pub tokens: u32,
}
248
249pub trait PageInvariant: Send + Sync {
268 fn page_type(&self) -> PageType;
270
271 fn minimum_fidelity(&self, page: &TypedPage) -> FidelityContract;
273
274 fn verify(
281 &self,
282 original: &TypedPage,
283 compacted: &CompactedPage,
284 ) -> Result<(), Vec<FidelityViolation>>;
285}
286
287pub struct ToolOutputInvariant;
294
295impl PageInvariant for ToolOutputInvariant {
296 fn page_type(&self) -> PageType {
297 PageType::ToolOutput
298 }
299
300 fn minimum_fidelity(&self, _page: &TypedPage) -> FidelityContract {
301 FidelityContract {
302 fidelity_level: "structured_summary_v1",
303 invariant_version: 1,
304 required_fields: &["tool_name", "exit_status"],
305 }
306 }
307
308 fn verify(
309 &self,
310 original: &TypedPage,
311 compacted: &CompactedPage,
312 ) -> Result<(), Vec<FidelityViolation>> {
313 let body = compacted.body.as_ref();
314 if original.schema_hint == Some(SchemaHint::Binary) {
316 return Ok(());
317 }
318
319 let mut violations = Vec::new();
320
321 let tool_name = match &original.origin {
323 PageOrigin::ToolPair { tool_name } => tool_name.as_str(),
324 _ => "",
325 };
326 if !tool_name.is_empty() && !body.contains(tool_name) {
327 violations.push(FidelityViolation {
328 missing_field: "tool_name".into(),
329 detail: format!("compacted body does not reference tool '{tool_name}'"),
330 });
331 }
332
333 let has_status = body.contains("exit_status")
335 || body.contains("exit_code")
336 || body.contains("status:")
337 || body.contains("Status:")
338 || body.contains("exit:")
339 || body.contains("rc:");
340 if !has_status {
341 violations.push(FidelityViolation {
342 missing_field: "exit_status".into(),
343 detail: "compacted body does not contain an exit status indicator".into(),
344 });
345 }
346
347 if original.schema_hint == Some(SchemaHint::Json) {
351 let original_body = original.body.as_ref();
352 let preserved = check_json_structural_key(original_body, body);
353 if !preserved {
354 violations.push(FidelityViolation {
355 missing_field: "structural_key".into(),
356 detail: "compacted JSON tool output does not reference any top-level field \
357 name from the original output"
358 .into(),
359 });
360 }
361 }
362
363 if violations.is_empty() {
364 Ok(())
365 } else {
366 Err(violations)
367 }
368 }
369}
370
371fn check_json_structural_key(original: &str, compacted: &str) -> bool {
377 let Ok(value) = serde_json::from_str::<serde_json::Value>(original) else {
378 return true;
379 };
380 let Some(obj) = value.as_object() else {
381 return true;
382 };
383 if obj.is_empty() {
384 return true;
385 }
386 obj.keys().any(|k| compacted.contains(k.as_str()))
387}
388
389pub struct ConversationTurnInvariant;
393
394impl PageInvariant for ConversationTurnInvariant {
395 fn page_type(&self) -> PageType {
396 PageType::ConversationTurn
397 }
398
399 fn minimum_fidelity(&self, _page: &TypedPage) -> FidelityContract {
400 FidelityContract {
401 fidelity_level: "semantic_summary_v1",
402 invariant_version: 1,
403 required_fields: &["role"],
404 }
405 }
406
407 fn verify(
408 &self,
409 _original: &TypedPage,
410 compacted: &CompactedPage,
411 ) -> Result<(), Vec<FidelityViolation>> {
412 let body = compacted.body.as_ref();
413 let has_role =
414 body.contains("user") || body.contains("assistant") || body.contains("system");
415 if !has_role {
416 return Err(vec![FidelityViolation {
417 missing_field: "role".into(),
418 detail: "compacted turn does not identify a speaker role".into(),
419 }]);
420 }
421 Ok(())
422 }
423}
424
425pub struct MemoryExcerptInvariant;
429
430impl PageInvariant for MemoryExcerptInvariant {
431 fn page_type(&self) -> PageType {
432 PageType::MemoryExcerpt
433 }
434
435 fn minimum_fidelity(&self, _page: &TypedPage) -> FidelityContract {
436 FidelityContract {
437 fidelity_level: "excerpt_summary_v1",
438 invariant_version: 1,
439 required_fields: &["source_label"],
440 }
441 }
442
443 fn verify(
444 &self,
445 original: &TypedPage,
446 compacted: &CompactedPage,
447 ) -> Result<(), Vec<FidelityViolation>> {
448 let source_label = match &original.origin {
449 PageOrigin::Excerpt { source_label } => source_label.as_str(),
450 _ => return Ok(()),
451 };
452 if !compacted.body.contains(source_label) {
453 return Err(vec![FidelityViolation {
454 missing_field: "source_label".into(),
455 detail: format!("compacted excerpt does not contain source label '{source_label}'"),
456 }]);
457 }
458 Ok(())
459 }
460}
461
462pub struct SystemContextInvariant;
467
468pub const SYSTEM_POINTER_PREFIX: &str = "[system-ptr:";
470
471impl PageInvariant for SystemContextInvariant {
472 fn page_type(&self) -> PageType {
473 PageType::SystemContext
474 }
475
476 fn minimum_fidelity(&self, _page: &TypedPage) -> FidelityContract {
477 FidelityContract {
478 fidelity_level: "pointer_replace_v1",
479 invariant_version: 1,
480 required_fields: &["pointer"],
481 }
482 }
483
484 fn verify(
485 &self,
486 _original: &TypedPage,
487 compacted: &CompactedPage,
488 ) -> Result<(), Vec<FidelityViolation>> {
489 if !compacted.body.starts_with(SYSTEM_POINTER_PREFIX) {
490 return Err(vec![FidelityViolation {
491 missing_field: "pointer".into(),
492 detail: format!(
493 "SystemContext page was not pointer-replaced \
494 (body does not start with '{SYSTEM_POINTER_PREFIX}')"
495 ),
496 }]);
497 }
498 Ok(())
499 }
500}
501
502pub struct InvariantRegistry {
518 tool_output: Box<dyn PageInvariant>,
519 conversation_turn: Box<dyn PageInvariant>,
520 memory_excerpt: Box<dyn PageInvariant>,
521 system_context: Box<dyn PageInvariant>,
522}
523
524impl Default for InvariantRegistry {
525 fn default() -> Self {
526 Self {
527 tool_output: Box::new(ToolOutputInvariant),
528 conversation_turn: Box::new(ConversationTurnInvariant),
529 memory_excerpt: Box::new(MemoryExcerptInvariant),
530 system_context: Box::new(SystemContextInvariant),
531 }
532 }
533}
534
535impl InvariantRegistry {
536 #[must_use]
540 pub fn get(&self, page_type: PageType) -> Option<&dyn PageInvariant> {
541 match page_type {
542 PageType::ToolOutput => Some(self.tool_output.as_ref()),
543 PageType::ConversationTurn => Some(self.conversation_turn.as_ref()),
544 PageType::MemoryExcerpt => Some(self.memory_excerpt.as_ref()),
545 PageType::SystemContext => Some(self.system_context.as_ref()),
546 }
547 }
548
549 pub fn enforce(
560 &self,
561 original: &TypedPage,
562 compacted: &CompactedPage,
563 ) -> Result<(), Vec<FidelityViolation>> {
564 let _span = tracing::info_span!(
565 "context.compaction.typed_page",
566 page_type = %original.page_type,
567 page_id = %original.page_id.0,
568 original_tokens = original.tokens,
569 compacted_tokens = compacted.tokens,
570 )
571 .entered();
572
573 if let Some(inv) = self.get(original.page_type) {
574 inv.verify(original, compacted)
575 } else {
576 tracing::warn!(
577 page_type = %original.page_type,
578 "no invariant registered for page type — skipping verification"
579 );
580 Ok(())
581 }
582 }
583}
584
585#[must_use]
614pub fn classify(body: &str) -> PageType {
615 classify_with_role(body, false)
616}
617
618#[must_use]
638pub fn classify_with_role(body: &str, is_system_role: bool) -> PageType {
639 tracing::info_span!(
640 "context.compaction.typed_page.classify",
641 body_len = body.len()
642 )
643 .in_scope(|| classify_with_role_inner(body, is_system_role))
644}
645
646fn classify_with_role_inner(body: &str, is_system_role: bool) -> PageType {
647 const TOOL_PREFIXES: &[&str] = &["[tool_output]", "[tool:", "[Tool output]"];
649 const MEMORY_PREFIXES: &[&str] = &[
650 "[cross-session context]",
651 "[semantic recall]",
652 "[known facts]",
653 "[conversation summaries]",
654 "[past corrections]",
655 "## Relevant documents",
656 ];
657 const SYSTEM_PREFIXES: &[&str] = &[
658 "[Persona context]",
659 "[Past experience]",
660 "[Memory summary]",
661 "[system",
662 "[skill",
663 "[persona",
664 "[digest",
665 "[compression",
666 ];
667
668 let trimmed = body.trim_start();
669
670 for prefix in TOOL_PREFIXES {
671 if trimmed.starts_with(prefix) {
672 return PageType::ToolOutput;
673 }
674 }
675 for prefix in MEMORY_PREFIXES {
676 if trimmed.starts_with(prefix) {
677 return PageType::MemoryExcerpt;
678 }
679 }
680 for prefix in SYSTEM_PREFIXES {
681 if trimmed.starts_with(prefix) {
682 return PageType::SystemContext;
683 }
684 }
685
686 if is_system_role {
691 return PageType::SystemContext;
692 }
693
694 let mut prefix_end = body.len().min(80);
695 while !body.is_char_boundary(prefix_end) {
696 prefix_end -= 1;
697 }
698 tracing::warn!(
699 body_prefix = &body[..prefix_end],
700 "typed-page classification fallback to ConversationTurn"
701 );
702 PageType::ConversationTurn
703}
704
705#[must_use]
711pub fn detect_schema_hint(body: &str, is_binary: bool) -> SchemaHint {
712 if is_binary || body.contains('\u{FFFD}') {
713 return SchemaHint::Binary;
714 }
715 let trimmed = body.trim_start();
716 if trimmed.starts_with('{') || trimmed.starts_with('[') {
717 return SchemaHint::Json;
718 }
719 if trimmed.starts_with("--- ")
720 || trimmed.starts_with("+++ ")
721 || trimmed.starts_with("diff --git")
722 || trimmed.starts_with("diff -")
723 {
724 return SchemaHint::Diff;
725 }
726 let first_line = trimmed.lines().next().unwrap_or("");
728 if first_line.matches('\t').count() >= 2 || first_line.matches('|').count() >= 2 {
729 return SchemaHint::Table;
730 }
731 SchemaHint::Text
732}
733
734#[derive(Debug, Serialize)]
741pub struct CompactedPageRecord {
742 pub ts: String,
744 pub turn_id: String,
746 pub page_id: String,
748 pub page_type: PageType,
750 pub origin: PageOrigin,
752 pub original_tokens: u32,
754 pub compacted_tokens: u32,
756 pub fidelity_level: String,
758 pub invariant_version: u8,
760 pub provider_name: String,
762 pub violations: Vec<FidelityViolation>,
764 #[serde(default, skip_serializing_if = "std::ops::Not::not")]
766 pub classification_fallback: bool,
767}
768
769#[derive(Debug, Clone, Serialize)]
773pub struct BatchViolation {
774 pub assertion: String,
776 pub detail: String,
778}
779
780#[derive(Debug, Clone, Default)]
804pub struct BatchAssertions {
805 pub tool_names_in_batch: Vec<String>,
807 pub has_memory_excerpt: bool,
809 pub excerpt_labels: Vec<String>,
811}
812
813impl BatchAssertions {
814 #[must_use]
818 pub fn check(&self, summary: &str) -> Vec<BatchViolation> {
819 let mut violations = Vec::new();
820
821 if !self.tool_names_in_batch.is_empty() {
823 let any_tool_mentioned = self
824 .tool_names_in_batch
825 .iter()
826 .any(|name| !name.is_empty() && summary.contains(name.as_str()));
827 if !any_tool_mentioned {
828 violations.push(BatchViolation {
829 assertion: "tool_coverage".into(),
830 detail: format!(
831 "summary mentions none of the {} tool(s) in batch: {:?}",
832 self.tool_names_in_batch.len(),
833 self.tool_names_in_batch
834 ),
835 });
836 }
837 }
838
839 if self.has_memory_excerpt && !self.excerpt_labels.is_empty() {
841 let any_label_mentioned = self
842 .excerpt_labels
843 .iter()
844 .any(|label| !label.is_empty() && summary.contains(label.as_str()));
845 if !any_label_mentioned {
846 violations.push(BatchViolation {
847 assertion: "excerpt_label_coverage".into(),
848 detail: format!(
849 "summary mentions none of the memory excerpt labels: {:?}",
850 self.excerpt_labels
851 ),
852 });
853 }
854 }
855
856 violations
857 }
858}
859
860pub struct TypedPagesState {
867 pub registry: InvariantRegistry,
869 pub audit_sink: Option<CompactionAuditSink>,
871 pub is_active: bool,
874}
875
876enum AuditCommand {
880 Record(CompactedPageRecord),
882 Flush(tokio::sync::oneshot::Sender<()>),
884}
885
886#[derive(Debug, Clone)]
914pub struct CompactionAuditSink {
915 tx: tokio::sync::mpsc::Sender<AuditCommand>,
916 drop_counter: Arc<std::sync::atomic::AtomicU64>,
917}
918
919impl CompactionAuditSink {
920 #[tracing::instrument(name = "context.typed_page.open", skip_all)]
929 pub async fn open(path: &std::path::Path, capacity: usize) -> Result<Self, std::io::Error> {
930 use tokio::io::AsyncWriteExt as _;
931
932 if let Some(parent) = path.parent() {
933 tokio::fs::create_dir_all(parent).await?;
934 }
935 let file = tokio::fs::OpenOptions::new()
936 .create(true)
937 .append(true)
938 .open(path)
939 .await?;
940
941 let (tx, mut rx) = tokio::sync::mpsc::channel::<AuditCommand>(capacity.max(1));
942 let drop_counter = Arc::new(std::sync::atomic::AtomicU64::new(0));
943 let drop_counter_bg = drop_counter.clone();
944
945 tokio::spawn(async move {
946 let mut writer = tokio::io::BufWriter::new(file);
947 while let Some(cmd) = rx.recv().await {
948 match cmd {
949 AuditCommand::Record(record) => match serde_json::to_string(&record) {
950 Ok(mut line) => {
951 line.push('\n');
952 if let Err(e) = writer.write_all(line.as_bytes()).await {
953 tracing::error!("compaction audit write failed: {e:#}");
954 }
955 }
956 Err(e) => {
957 tracing::error!("compaction audit serialization failed: {e:#}");
958 }
959 },
960 AuditCommand::Flush(responder) => {
961 let _ = writer.flush().await;
962 let _ = responder.send(());
963 }
964 }
965 }
966 let _ = writer.flush().await;
968
969 let dropped = drop_counter_bg.load(std::sync::atomic::Ordering::Relaxed);
970 if dropped > 0 {
971 tracing::warn!(dropped, "compaction audit sink closed with dropped records");
972 }
973 });
974
975 Ok(Self { tx, drop_counter })
976 }
977
978 pub fn send(&self, record: CompactedPageRecord) {
982 match self.tx.try_send(AuditCommand::Record(record)) {
983 Ok(()) => {}
984 Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
985 let prev = self
986 .drop_counter
987 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
988 tracing::warn!(
989 dropped_total = prev + 1,
990 "compaction audit sink full — record dropped (best-effort audit)"
991 );
992 }
993 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
994 tracing::error!("compaction audit sink closed unexpectedly");
995 }
996 }
997 }
998
999 #[tracing::instrument(name = "context.typed_page.flush", skip_all)]
1005 pub async fn flush(&self) {
1006 let (tx, rx) = tokio::sync::oneshot::channel::<()>();
1007 if self.tx.send(AuditCommand::Flush(tx)).await.is_ok() {
1008 let _ = tokio::time::timeout(Duration::from_millis(100), rx).await;
1009 }
1010 }
1011
1012 #[must_use]
1014 pub fn dropped_count(&self) -> u64 {
1015 self.drop_counter.load(std::sync::atomic::Ordering::Relaxed)
1016 }
1017}
1018
/// Unit tests for page ids, classification, schema hints, invariants and the audit sink.
#[cfg(test)]
mod tests {
    use super::*;

    // ---- fixtures -------------------------------------------------------------------

    /// Builds a compacted page from a body and token count.
    fn compacted(body: &str, tokens: u32) -> CompactedPage {
        CompactedPage {
            body: Arc::from(body),
            tokens,
        }
    }

    /// Builds a tool-output page originating from `tool_name`.
    fn tool_page(tool_name: &str, tokens: u32, body: &str, hint: SchemaHint) -> TypedPage {
        TypedPage::new(
            PageType::ToolOutput,
            PageOrigin::ToolPair {
                tool_name: tool_name.into(),
            },
            tokens,
            Arc::from(body),
            Some(hint),
        )
    }

    /// Builds a system-context page keyed `persona`.
    fn persona_page(tokens: u32, body: &str) -> TypedPage {
        TypedPage::new(
            PageType::SystemContext,
            PageOrigin::System {
                key: "persona".into(),
            },
            tokens,
            Arc::from(body),
            None,
        )
    }

    /// Builds a conversation-turn page for `message_id`.
    fn turn_page(message_id: &str, tokens: u32, body: &str) -> TypedPage {
        TypedPage::new(
            PageType::ConversationTurn,
            PageOrigin::Turn {
                message_id: message_id.into(),
            },
            tokens,
            Arc::from(body),
            None,
        )
    }

    /// Builds an 80-token memory-excerpt page labelled `source_label`.
    fn excerpt_page(source_label: &str, body: &str) -> TypedPage {
        TypedPage::new(
            PageType::MemoryExcerpt,
            PageOrigin::Excerpt {
                source_label: source_label.into(),
            },
            80,
            Arc::from(body),
            None,
        )
    }

    /// Unwraps the error side of a verification result into its `missing_field` names.
    fn missing_fields(result: Result<(), Vec<FidelityViolation>>) -> Vec<String> {
        result
            .expect_err("expected fidelity violations")
            .into_iter()
            .map(|v| v.missing_field)
            .collect()
    }

    // ---- PageId ---------------------------------------------------------------------

    #[test]
    fn page_id_same_input_same_output() {
        let a = PageId::compute(PageType::ToolOutput, "tool:shell", b"exit_code: 0");
        let b = PageId::compute(PageType::ToolOutput, "tool:shell", b"exit_code: 0");
        assert_eq!(a, b);
    }

    #[test]
    fn page_id_different_type_different_id() {
        let a = PageId::compute(PageType::ToolOutput, "tool:shell", b"body");
        let b = PageId::compute(PageType::ConversationTurn, "tool:shell", b"body");
        assert_ne!(a, b);
    }

    #[test]
    fn page_id_starts_with_blake3_prefix() {
        let id = PageId::compute(PageType::SystemContext, "system:persona", b"some content");
        assert!(id.0.starts_with("blake3:"));
    }

    // ---- classify -------------------------------------------------------------------

    #[test]
    fn classify_tool_output_prefix() {
        assert_eq!(
            classify("[tool_output] shell exit_code: 0"),
            PageType::ToolOutput
        );
        assert_eq!(classify("[tool:shell] result"), PageType::ToolOutput);
    }

    #[test]
    fn classify_memory_prefixes() {
        assert_eq!(
            classify("[cross-session context]\nsome recall"),
            PageType::MemoryExcerpt
        );
        assert_eq!(
            classify("[semantic recall]\n- [user] hello"),
            PageType::MemoryExcerpt
        );
        assert_eq!(classify("[known facts]\n- fact"), PageType::MemoryExcerpt);
        assert_eq!(
            classify("[conversation summaries]\n- 1-10: summary"),
            PageType::MemoryExcerpt
        );
    }

    #[test]
    fn classify_system_prefixes() {
        assert_eq!(classify("[Persona context]\nfact"), PageType::SystemContext);
        assert_eq!(classify("[system prompt]"), PageType::SystemContext);
    }

    #[test]
    fn classify_fallback_is_conversation_turn() {
        assert_eq!(classify("Hello, world!"), PageType::ConversationTurn);
        assert_eq!(classify(""), PageType::ConversationTurn);
    }

    // ---- detect_schema_hint ---------------------------------------------------------

    #[test]
    fn detect_schema_hint_json() {
        assert_eq!(
            detect_schema_hint(r#"{"key": "val"}"#, false),
            SchemaHint::Json
        );
        assert_eq!(detect_schema_hint("[1,2,3]", false), SchemaHint::Json);
    }

    #[test]
    fn detect_schema_hint_diff() {
        assert_eq!(detect_schema_hint("--- a\n+++ b", false), SchemaHint::Diff);
    }

    #[test]
    fn detect_schema_hint_binary() {
        assert_eq!(detect_schema_hint("anything", true), SchemaHint::Binary);
    }

    #[test]
    fn detect_schema_hint_text_fallback() {
        assert_eq!(detect_schema_hint("plain text", false), SchemaHint::Text);
    }

    // ---- ToolOutputInvariant --------------------------------------------------------

    #[test]
    fn tool_output_invariant_passes_when_fields_present() {
        let page = tool_page(
            "shell",
            100,
            "[tool_output] shell exit_code: 0\nsome output",
            SchemaHint::Text,
        );
        let summary = compacted("shell exit_status: 0\nkey: value", 10);
        assert!(ToolOutputInvariant.verify(&page, &summary).is_ok());
    }

    #[test]
    fn tool_output_invariant_fails_missing_tool_name() {
        let page = tool_page(
            "my_tool",
            100,
            "[tool_output] my_tool exit_code: 0",
            SchemaHint::Text,
        );
        let summary = compacted("exit_status: 0", 5);
        let fields = missing_fields(ToolOutputInvariant.verify(&page, &summary));
        assert!(fields.iter().any(|f| f == "tool_name"));
    }

    #[test]
    fn tool_output_invariant_passes_for_binary() {
        let page = tool_page("binary_tool", 100, "<binary:1024 bytes>", SchemaHint::Binary);
        let summary = compacted("<binary:1024 bytes> (archived)", 5);
        assert!(ToolOutputInvariant.verify(&page, &summary).is_ok());
    }

    // ---- SystemContextInvariant -----------------------------------------------------

    #[test]
    fn system_context_invariant_passes_with_pointer() {
        let page = persona_page(200, "[Persona context]\nsome persona info");
        let summary = compacted("[system-ptr:blake3:abcdef123456]", 3);
        assert!(SystemContextInvariant.verify(&page, &summary).is_ok());
    }

    #[test]
    fn system_context_invariant_fails_without_pointer() {
        let page = persona_page(200, "[Persona context]\nsome persona info");
        let summary = compacted("This is a paraphrase of persona info", 10);
        let fields = missing_fields(SystemContextInvariant.verify(&page, &summary));
        assert!(fields.iter().any(|f| f == "pointer"));
    }

    // ---- InvariantRegistry ----------------------------------------------------------

    #[test]
    fn registry_covers_all_page_types() {
        let reg = InvariantRegistry::default();
        for pt in [
            PageType::ToolOutput,
            PageType::ConversationTurn,
            PageType::MemoryExcerpt,
            PageType::SystemContext,
        ] {
            assert!(reg.get(pt).is_some(), "missing invariant for {pt:?}");
        }
    }

    #[test]
    fn registry_returns_correct_page_type() {
        let reg = InvariantRegistry::default();
        assert_eq!(
            reg.get(PageType::ToolOutput).unwrap().page_type(),
            PageType::ToolOutput
        );
        assert_eq!(
            reg.get(PageType::SystemContext).unwrap().page_type(),
            PageType::SystemContext
        );
    }

    #[test]
    fn enforce_ok_for_valid_system_pointer() {
        let reg = InvariantRegistry::default();
        let page = persona_page(50, "[Persona context]\nrules");
        let summary = compacted("[system-ptr:blake3:aabbccdd11223344]", 3);
        assert!(reg.enforce(&page, &summary).is_ok());
    }

    #[test]
    fn enforce_err_for_paraphrased_system_context() {
        let reg = InvariantRegistry::default();
        let page = persona_page(50, "[Persona context]\nrules");
        let summary = compacted("The persona says to be helpful.", 7);
        let fields = missing_fields(reg.enforce(&page, &summary));
        assert!(fields.iter().any(|f| f == "pointer"));
    }

    #[test]
    fn enforce_ok_for_conversation_turn_with_role() {
        let reg = InvariantRegistry::default();
        let page = turn_page("42", 30, "Hello from user");
        let summary = compacted("user asked about Rust", 5);
        assert!(reg.enforce(&page, &summary).is_ok());
    }

    // ---- MemoryExcerptInvariant -----------------------------------------------------

    #[test]
    fn memory_excerpt_invariant_passes_when_label_present() {
        let label = "semantic_recall";
        let page = excerpt_page(label, "[semantic recall]\n- [user] hello");
        let summary = compacted(&format!("Summary from {label}: user greeted"), 6);
        assert!(MemoryExcerptInvariant.verify(&page, &summary).is_ok());
    }

    #[test]
    fn memory_excerpt_invariant_fails_when_label_missing() {
        let page = excerpt_page("graph_facts", "[known facts]\n- Alice works at Acme");
        let summary = compacted("Alice is employed somewhere", 5);
        let fields = missing_fields(MemoryExcerptInvariant.verify(&page, &summary));
        assert!(fields.iter().any(|f| f == "source_label"));
    }

    #[test]
    fn memory_excerpt_invariant_passes_for_non_excerpt_origin() {
        // A mismatched origin carries no label, so there is nothing to verify.
        let page = TypedPage::new(
            PageType::MemoryExcerpt,
            PageOrigin::System {
                key: "digests".into(),
            },
            40,
            Arc::from("[system]"),
            None,
        );
        let summary = compacted("anything", 1);
        assert!(MemoryExcerptInvariant.verify(&page, &summary).is_ok());
    }

    // ---- ConversationTurnInvariant --------------------------------------------------

    #[test]
    fn conversation_turn_invariant_passes_with_role_word() {
        let page = turn_page("1", 20, "Hello world");
        for body in ["user: hi", "assistant replied", "system note"] {
            let summary = compacted(body, 2);
            assert!(
                ConversationTurnInvariant.verify(&page, &summary).is_ok(),
                "body={body}"
            );
        }
    }

    #[test]
    fn conversation_turn_invariant_fails_without_role_word() {
        let page = turn_page("2", 20, "some turn content");
        let summary = compacted("content was summarized", 3);
        let fields = missing_fields(ConversationTurnInvariant.verify(&page, &summary));
        assert!(fields.iter().any(|f| f == "role"));
    }

    // ---- CompactionAuditSink --------------------------------------------------------

    #[tokio::test]
    async fn audit_sink_jsonl_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("audit.jsonl");

        let sink = CompactionAuditSink::open(&path, 64).await.unwrap();
        let record = CompactedPageRecord {
            ts: "2026-04-19T00:00:00Z".into(),
            turn_id: "1".into(),
            page_id: "blake3:aabbccdd".into(),
            page_type: PageType::ToolOutput,
            origin: PageOrigin::ToolPair {
                tool_name: "shell".into(),
            },
            original_tokens: 100,
            compacted_tokens: 20,
            fidelity_level: "structured_summary_v1".into(),
            invariant_version: 1,
            provider_name: "test".into(),
            violations: vec![],
            classification_fallback: false,
        };
        sink.send(record);

        // The flush command is queued behind the record, so waiting for its
        // acknowledgement is more reliable than sleeping for a fixed interval.
        sink.flush().await;
        drop(sink);

        let contents = std::fs::read_to_string(&path).unwrap();
        assert!(!contents.is_empty(), "audit file should not be empty");
        let parsed: serde_json::Value = serde_json::from_str(contents.trim()).unwrap();
        assert_eq!(parsed["page_type"], "tool_output");
        assert_eq!(parsed["turn_id"], "1");
        assert_eq!(parsed["provider_name"], "test");
    }

    #[tokio::test]
    async fn audit_sink_drop_counter_increments_when_full() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("audit_full.jsonl");

        let sink = CompactionAuditSink::open(&path, 1).await.unwrap();

        let make_record = || CompactedPageRecord {
            ts: "2026-04-19T00:00:00Z".into(),
            turn_id: "x".into(),
            page_id: "blake3:00".into(),
            page_type: PageType::ConversationTurn,
            origin: PageOrigin::Turn {
                message_id: "0".into(),
            },
            original_tokens: 10,
            compacted_tokens: 5,
            fidelity_level: "semantic_summary_v1".into(),
            invariant_version: 1,
            provider_name: "test".into(),
            violations: vec![],
            classification_fallback: false,
        };

        // No await between sends, so the writer cannot drain the single-slot channel.
        for _ in 0..10 {
            sink.send(make_record());
        }

        assert!(
            sink.dropped_count() > 0,
            "expected at least one dropped record"
        );
    }

    #[tokio::test]
    async fn audit_sink_flush_does_not_panic() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("audit_flush.jsonl");
        let sink = CompactionAuditSink::open(&path, 16).await.unwrap();
        sink.flush().await;
    }

    #[tokio::test]
    async fn audit_sink_capacity_zero_does_not_panic() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("cap0.jsonl");
        let sink = CompactionAuditSink::open(&path, 0).await.unwrap();
        sink.flush().await;
    }

    // ---- classify_with_role ---------------------------------------------------------

    #[test]
    fn classify_with_role_system_flag_overrides_fallback() {
        assert_eq!(
            classify_with_role("You are a helpful assistant.", true),
            PageType::SystemContext
        );
    }

    #[test]
    fn classify_with_role_prefix_wins_over_system_flag() {
        // The flag must be set, otherwise the precedence in the test name is not exercised.
        assert_eq!(
            classify_with_role("[tool_output] exit_code: 0", true),
            PageType::ToolOutput
        );
    }

    #[test]
    fn classify_with_role_false_still_falls_back_to_conversation_turn() {
        assert_eq!(
            classify_with_role("random prose without markers", false),
            PageType::ConversationTurn
        );
    }

    #[test]
    fn classify_with_role_non_ascii_body_does_not_panic() {
        // Each body is longer than the 80-byte log prefix and places multi-byte
        // characters around the cut-off point.
        let cjk = "你好世界".repeat(20);
        let emoji = "🦀".repeat(30);
        let mixed = "abc🦀中文".repeat(15);

        for body in [&cjk, &emoji, &mixed] {
            assert_eq!(classify_with_role(body, false), PageType::ConversationTurn);
        }
    }

    // ---- JSON structural key check --------------------------------------------------

    #[test]
    fn tool_output_json_structural_check_passes_when_key_preserved() {
        let page = tool_page(
            "shell",
            50,
            r#"{"exit_code": 0, "stdout": "ok"}"#,
            SchemaHint::Json,
        );
        let summary = compacted("shell exit_code: 0, stdout was ok", 8);
        assert!(ToolOutputInvariant.verify(&page, &summary).is_ok());
    }

    #[test]
    fn tool_output_json_structural_check_fails_when_no_key_preserved() {
        let page = tool_page(
            "my_tool",
            50,
            r#"{"some_field": "value", "other_field": 42}"#,
            SchemaHint::Json,
        );
        let summary = compacted("my_tool exit_status: 0 completed successfully", 7);
        let fields = missing_fields(ToolOutputInvariant.verify(&page, &summary));
        assert!(fields.iter().any(|f| f == "structural_key"));
    }
}