//! sqz_engine/pipeline.rs — compression pipeline orchestrator.

1use crate::ansi_strip::AnsiStripper;
2use crate::error::{Result, SqzError};
3use crate::preset::Preset;
4use crate::prompt_cache::PromptCacheDetector;
5use crate::stages::{
6    CollapseArraysStage, CondenseStage, CustomTransformsStage, FlattenStage, GitDiffFoldStage,
7    KeepFieldsStage, StripFieldsStage, StripNullsStage, TruncateStringsStage,
8};
9use crate::toon::ToonEncoder;
10use crate::types::{CompressedContent, Content, ContentType, StageConfig};
11
/// Minimal session context passed to the pipeline.
///
/// Currently carries only the session identifier; `compress` accepts it
/// but does not read it yet.
#[derive(Debug, Clone)]
pub struct SessionContext {
    /// Identifier of the session this compression run belongs to.
    pub session_id: String,
}
16
/// The multi-stage compression pipeline orchestrator.
///
/// Stages are kept sorted by ascending `priority()` and run in that order;
/// if the result is JSON it is additionally TOON-encoded.
pub struct CompressionPipeline {
    // Boxed trait objects so plugin stages can be inserted at runtime.
    stages: Vec<Box<dyn crate::stages::CompressionStage>>,
    // Encoder applied to JSON output after all stages have run.
    toon_encoder: ToonEncoder,
    // Not read anywhere in this file; presumably reserved for
    // prompt-cache-aware compression — confirm before removing.
    #[allow(dead_code)]
    prompt_cache_detector: PromptCacheDetector,
}
24
25impl CompressionPipeline {
26    /// Construct the pipeline from a preset, creating all 8 built-in stages
27    /// sorted by priority.
28    pub fn new(_preset: &Preset) -> Self {
29        let mut stages: Vec<Box<dyn crate::stages::CompressionStage>> = vec![
30            Box::new(AnsiStripper),
31            Box::new(KeepFieldsStage),
32            Box::new(StripFieldsStage),
33            Box::new(CondenseStage),
34            Box::new(GitDiffFoldStage),
35            Box::new(StripNullsStage),
36            Box::new(FlattenStage),
37            Box::new(TruncateStringsStage),
38            Box::new(CollapseArraysStage),
39            Box::new(CustomTransformsStage),
40        ];
41        stages.sort_by_key(|s| s.priority());
42
43        Self {
44            stages,
45            toon_encoder: ToonEncoder,
46            prompt_cache_detector: PromptCacheDetector,
47        }
48    }
49
50    /// Run content through all enabled stages then apply TOON encoding if the
51    /// result is JSON.
52    pub fn compress(
53        &self,
54        input: &str,
55        _ctx: &SessionContext,
56        preset: &Preset,
57    ) -> Result<CompressedContent> {
58        let tokens_original = (input.chars().count() as u32).saturating_add(3) / 4;
59
60        let mut content = Content {
61            raw: input.to_owned(),
62            content_type: ContentType::PlainText,
63            metadata: crate::types::ContentMetadata {
64                source: None,
65                path: None,
66                language: None,
67            },
68            tokens_original,
69        };
70
71        let mut stages_applied: Vec<String> = Vec::new();
72
73        for stage in &self.stages {
74            let config = stage_config_from_preset(stage.name(), preset);
75            if config.enabled {
76                stage.process(&mut content, &config)?;
77                stages_applied.push(stage.name().to_owned());
78            }
79        }
80
81        // Apply TOON encoding if the result is JSON
82        let data = if ToonEncoder::is_json(&content.raw) {
83            let json: serde_json::Value = serde_json::from_str(&content.raw)
84                .map_err(|e| SqzError::Other(format!("pipeline: JSON parse error: {e}")))?;
85            let encoded = self.toon_encoder.encode(&json)?;
86            stages_applied.push("toon_encode".to_owned());
87            encoded
88        } else {
89            content.raw
90        };
91
92        let tokens_compressed = (data.chars().count() as u32).saturating_add(3) / 4;
93        let compression_ratio = if tokens_original == 0 {
94            1.0
95        } else {
96            tokens_compressed as f64 / tokens_original as f64
97        };
98
99        Ok(CompressedContent {
100            data,
101            tokens_compressed,
102            tokens_original,
103            stages_applied,
104            compression_ratio,
105            provenance: crate::types::Provenance::default(),
106            verify: None,
107        })
108    }
109
110    /// Insert a plugin stage and re-sort by priority.
111    pub fn insert_stage(&mut self, stage: Box<dyn crate::stages::CompressionStage>) {
112        self.stages.push(stage);
113        self.stages.sort_by_key(|s| s.priority());
114    }
115
116    /// Rebuild stage list from a new preset (hot-reload support).
117    /// Built-in stages are recreated; plugin stages are dropped and must be
118    /// re-inserted by the caller.
119    pub fn reload_preset(&mut self, _preset: &Preset) -> Result<()> {
120        let mut stages: Vec<Box<dyn crate::stages::CompressionStage>> = vec![
121            Box::new(AnsiStripper),
122            Box::new(KeepFieldsStage),
123            Box::new(StripFieldsStage),
124            Box::new(CondenseStage),
125            Box::new(GitDiffFoldStage),
126            Box::new(StripNullsStage),
127            Box::new(FlattenStage),
128            Box::new(TruncateStringsStage),
129            Box::new(CollapseArraysStage),
130            Box::new(CustomTransformsStage),
131        ];
132        stages.sort_by_key(|s| s.priority());
133        self.stages = stages;
134        Ok(())
135    }
136}
137
138/// Build a `StageConfig` for a named stage from the preset's compression config.
139fn stage_config_from_preset(name: &str, preset: &Preset) -> StageConfig {
140    let c = &preset.compression;
141    match name {
142        "ansi_strip" => StageConfig {
143            enabled: true,
144            options: serde_json::Value::Object(Default::default()),
145        },
146        "keep_fields" => {
147            if let Some(cfg) = &c.keep_fields {
148                StageConfig {
149                    enabled: cfg.enabled,
150                    options: serde_json::json!({ "fields": cfg.fields }),
151                }
152            } else {
153                StageConfig::default()
154            }
155        }
156        "strip_fields" => {
157            if let Some(cfg) = &c.strip_fields {
158                StageConfig {
159                    enabled: cfg.enabled,
160                    options: serde_json::json!({ "fields": cfg.fields }),
161                }
162            } else {
163                StageConfig::default()
164            }
165        }
166        "condense" => {
167            if let Some(cfg) = &c.condense {
168                StageConfig {
169                    enabled: cfg.enabled,
170                    options: serde_json::json!({
171                        "max_repeated_lines": cfg.max_repeated_lines
172                    }),
173                }
174            } else {
175                StageConfig::default()
176            }
177        }
178        "git_diff_fold" => {
179            if let Some(cfg) = &c.git_diff_fold {
180                StageConfig {
181                    enabled: cfg.enabled,
182                    options: serde_json::json!({
183                        "max_context_lines": cfg.max_context_lines
184                    }),
185                }
186            } else {
187                // Default: enabled with 2 context lines
188                StageConfig {
189                    enabled: true,
190                    options: serde_json::json!({ "max_context_lines": 2 }),
191                }
192            }
193        }
194        "strip_nulls" => {
195            if let Some(cfg) = &c.strip_nulls {
196                StageConfig {
197                    enabled: cfg.enabled,
198                    options: serde_json::Value::Object(Default::default()),
199                }
200            } else {
201                StageConfig::default()
202            }
203        }
204        "flatten" => {
205            if let Some(cfg) = &c.flatten {
206                StageConfig {
207                    enabled: cfg.enabled,
208                    options: serde_json::json!({ "max_depth": cfg.max_depth }),
209                }
210            } else {
211                StageConfig::default()
212            }
213        }
214        "truncate_strings" => {
215            if let Some(cfg) = &c.truncate_strings {
216                StageConfig {
217                    enabled: cfg.enabled,
218                    options: serde_json::json!({ "max_length": cfg.max_length }),
219                }
220            } else {
221                StageConfig::default()
222            }
223        }
224        "collapse_arrays" => {
225            if let Some(cfg) = &c.collapse_arrays {
226                StageConfig {
227                    enabled: cfg.enabled,
228                    options: serde_json::json!({
229                        "max_items": cfg.max_items,
230                        "summary_template": cfg.summary_template
231                    }),
232                }
233            } else {
234                StageConfig::default()
235            }
236        }
237        "custom_transforms" => {
238            if let Some(cfg) = &c.custom_transforms {
239                StageConfig {
240                    enabled: cfg.enabled,
241                    options: serde_json::Value::Object(Default::default()),
242                }
243            } else {
244                StageConfig::default()
245            }
246        }
247        _ => StageConfig::default(),
248    }
249}
250
251// ---------------------------------------------------------------------------
252// Tests
253// ---------------------------------------------------------------------------
254
#[cfg(test)]
mod tests {
    use super::*;
    use crate::preset::{
        BudgetConfig, CollapseArraysConfig, CompressionConfig, CondenseConfig,
        CustomTransformsConfig, ModelConfig, PresetMeta, StripNullsConfig, TerseModeConfig,
        ToolSelectionConfig, TruncateStringsConfig,
    };
    // Moved here from the middle of the module so all imports live together.
    use proptest::prelude::*;

    /// A preset with condense, strip_nulls, truncate_strings, collapse_arrays
    /// and custom_transforms enabled; everything else left at defaults.
    fn default_preset() -> Preset {
        Preset {
            preset: PresetMeta {
                name: "test".into(),
                version: "1.0".into(),
                description: String::new(),
            },
            compression: CompressionConfig {
                stages: vec![],
                keep_fields: None,
                strip_fields: None,
                condense: Some(CondenseConfig {
                    enabled: true,
                    max_repeated_lines: 3,
                }),
                git_diff_fold: None,
                strip_nulls: Some(StripNullsConfig { enabled: true }),
                flatten: None,
                truncate_strings: Some(TruncateStringsConfig {
                    enabled: true,
                    max_length: 500,
                }),
                collapse_arrays: Some(CollapseArraysConfig {
                    enabled: true,
                    max_items: 5,
                    summary_template: "... and {remaining} more items".into(),
                }),
                custom_transforms: Some(CustomTransformsConfig { enabled: true }),
            },
            tool_selection: ToolSelectionConfig {
                max_tools: 5,
                similarity_threshold: 0.7,
                default_tools: vec![],
            },
            budget: BudgetConfig {
                warning_threshold: 0.70,
                ceiling_threshold: 0.85,
                default_window_size: 200_000,
                agents: Default::default(),
            },
            terse_mode: TerseModeConfig {
                enabled: false,
                level: crate::preset::TerseLevel::Moderate,
            },
            model: ModelConfig {
                family: "anthropic".into(),
                primary: "claude-sonnet-4-20250514".into(),
                local: String::new(),
                complexity_threshold: 0.4,
                pricing: None,
            },
        }
    }

    fn ctx() -> SessionContext {
        SessionContext {
            session_id: "test-session".into(),
        }
    }

    #[test]
    fn new_creates_pipeline_with_sorted_stages() {
        let preset = default_preset();
        let pipeline = CompressionPipeline::new(&preset);
        // Verify stages are sorted by priority
        let priorities: Vec<u32> = pipeline.stages.iter().map(|s| s.priority()).collect();
        let mut sorted = priorities.clone();
        sorted.sort();
        assert_eq!(priorities, sorted);
    }

    #[test]
    fn compress_plain_text_passthrough() {
        let preset = default_preset();
        let pipeline = CompressionPipeline::new(&preset);
        let result = pipeline.compress("hello world", &ctx(), &preset).unwrap();
        assert_eq!(result.data, "hello world");
        assert!(!result.stages_applied.contains(&"toon_encode".to_owned()));
    }

    #[test]
    fn compress_json_applies_toon() {
        let preset = default_preset();
        let pipeline = CompressionPipeline::new(&preset);
        let json = r#"{"name":"Alice","age":30}"#;
        let result = pipeline.compress(json, &ctx(), &preset).unwrap();
        assert!(result.data.starts_with("TOON:"), "data: {}", result.data);
        assert!(result.stages_applied.contains(&"toon_encode".to_owned()));
    }

    #[test]
    fn compress_strips_nulls_from_json() {
        let preset = default_preset();
        let pipeline = CompressionPipeline::new(&preset);
        let json = r#"{"a":1,"b":null}"#;
        let result = pipeline.compress(json, &ctx(), &preset).unwrap();
        // After strip_nulls, "b" is gone; TOON encodes the result
        assert!(result.data.starts_with("TOON:"));
        // Decode and verify null is gone
        let decoded = ToonEncoder.decode(&result.data).unwrap();
        assert!(decoded.get("b").is_none());
        assert_eq!(decoded["a"], serde_json::json!(1));
    }

    #[test]
    fn compress_returns_token_counts() {
        let preset = default_preset();
        let pipeline = CompressionPipeline::new(&preset);
        let input = "a".repeat(100);
        let result = pipeline.compress(&input, &ctx(), &preset).unwrap();
        assert!(result.tokens_original > 0);
        assert!(result.tokens_compressed > 0);
    }

    #[test]
    fn compress_ratio_is_reasonable() {
        let preset = default_preset();
        let pipeline = CompressionPipeline::new(&preset);
        let result = pipeline.compress("hello", &ctx(), &preset).unwrap();
        assert!(result.compression_ratio > 0.0);
    }

    #[test]
    fn insert_stage_re_sorts_by_priority() {
        use crate::stages::CompressionStage;
        use crate::types::StageConfig;

        struct LowPriorityStage;
        impl CompressionStage for LowPriorityStage {
            fn name(&self) -> &str {
                "low_priority"
            }
            fn priority(&self) -> u32 {
                5 // lower than all built-in stages
            }
            fn process(
                &self,
                _content: &mut Content,
                _config: &StageConfig,
            ) -> crate::error::Result<()> {
                Ok(())
            }
        }

        let preset = default_preset();
        let mut pipeline = CompressionPipeline::new(&preset);
        pipeline.insert_stage(Box::new(LowPriorityStage));

        let priorities: Vec<u32> = pipeline.stages.iter().map(|s| s.priority()).collect();
        let mut sorted = priorities.clone();
        sorted.sort();
        assert_eq!(priorities, sorted);
        assert_eq!(pipeline.stages[0].name(), "ansi_strip");
        assert_eq!(pipeline.stages[1].name(), "low_priority");
    }

    #[test]
    fn reload_preset_rebuilds_stages() {
        let preset = default_preset();
        let mut pipeline = CompressionPipeline::new(&preset);
        let original_count = pipeline.stages.len();
        pipeline.reload_preset(&preset).unwrap();
        assert_eq!(pipeline.stages.len(), original_count);
    }

    #[test]
    fn compress_keep_fields_filters_json() {
        use crate::preset::KeepFieldsConfig;
        let mut preset = default_preset();
        preset.compression.keep_fields = Some(KeepFieldsConfig {
            enabled: true,
            fields: vec!["id".into(), "name".into()],
        });
        let pipeline = CompressionPipeline::new(&preset);
        let json = r#"{"id":1,"name":"Bob","debug":"x"}"#;
        let result = pipeline.compress(json, &ctx(), &preset).unwrap();
        let decoded = ToonEncoder.decode(&result.data).unwrap();
        assert!(decoded.get("debug").is_none());
        assert_eq!(decoded["id"], serde_json::json!(1));
    }

    #[test]
    fn compress_empty_string() {
        let preset = default_preset();
        let pipeline = CompressionPipeline::new(&preset);
        let result = pipeline.compress("", &ctx(), &preset).unwrap();
        assert_eq!(result.data, "");
        assert_eq!(result.tokens_original, 0);
    }

    #[test]
    fn stage_config_from_preset_unknown_stage() {
        let preset = default_preset();
        let config = stage_config_from_preset("nonexistent", &preset);
        assert!(!config.enabled);
    }

    // ---------------------------------------------------------------------------
    // Property tests
    // ---------------------------------------------------------------------------

    /// Generate a significant line from a fixed set of meaningful tokens.
    fn significant_line_strategy() -> impl Strategy<Value = String> {
        prop_oneof![
            Just("error: connection refused".to_owned()),
            Just("warning: deprecated API usage".to_owned()),
            Just("failed: build step exited with code 1".to_owned()),
            Just("success: deployment complete".to_owned()),
            Just("status: all checks passed".to_owned()),
            Just("error: file not found".to_owned()),
            Just("warning: unused variable detected".to_owned()),
        ]
    }

    /// Generate a noise line (repeated decorative content).
    fn noise_line_strategy() -> impl Strategy<Value = String> {
        prop_oneof![
            Just("---".to_owned()),
            Just("Loading...".to_owned()),
            Just("================".to_owned()),
            Just("...".to_owned()),
        ]
    }

    /// Recursive strategy that generates arbitrary serde_json::Value instances.
    /// Mirrors the strategy in toon.rs tests.
    fn arb_json_value() -> impl Strategy<Value = serde_json::Value> {
        let leaf = prop_oneof![
            Just(serde_json::Value::Null),
            any::<bool>().prop_map(serde_json::Value::Bool),
            any::<i64>().prop_map(|n| serde_json::json!(n)),
            any::<f64>()
                .prop_filter("must be finite", |f| f.is_finite())
                .prop_map(|f| serde_json::json!(f)),
            ".*".prop_map(serde_json::Value::String),
        ];

        leaf.prop_recursive(4, 64, 8, |inner| {
            prop_oneof![
                prop::collection::vec(inner.clone(), 0..8)
                    .prop_map(serde_json::Value::Array),
                prop::collection::hash_map(".*", inner, 0..8).prop_map(|m| {
                    serde_json::Value::Object(m.into_iter().collect())
                }),
            ]
        })
    }

    proptest! {
        /// **Validates: Requirements 17.1, 17.2, 13.2**
        ///
        /// Property 22: ASCII-safe output.
        ///
        /// For any JSON input, the Compression_Pipeline SHALL produce output
        /// using only ASCII-safe characters: printable ASCII (0x20–0x7E) plus
        /// standard whitespace (\t = 0x09, \n = 0x0A, \r = 0x0D).
        #[test]
        fn prop_pipeline_ascii_safe_output(v in arb_json_value()) {
            let preset = default_preset();
            let pipeline = CompressionPipeline::new(&preset);

            let json_input = serde_json::to_string(&v).expect("serialize should not fail");
            let result = pipeline.compress(&json_input, &ctx(), &preset)
                .expect("compress should not fail");

            for ch in result.data.chars() {
                let cp = ch as u32;
                let is_printable_ascii = cp >= 0x20 && cp <= 0x7E;
                let is_standard_whitespace = cp == 0x09 || cp == 0x0A || cp == 0x0D;
                prop_assert!(
                    is_printable_ascii || is_standard_whitespace,
                    "non-ASCII-safe character in output: U+{:04X} ({:?})\noutput: {:?}",
                    cp, ch, result.data
                );
            }
        }
    }

    proptest! {
        /// **Validates: Requirements 1.3**
        ///
        /// Property 1: Compression preserves semantically significant content.
        ///
        /// For any CLI output containing significant tokens (errors, warnings,
        /// status messages) mixed with noise (repeated identical lines), the
        /// Compression_Pipeline SHALL produce output that:
        ///   1. Contains all significant lines.
        ///   2. Contains each noise line at most `max_repeated_lines` times.
        #[test]
        fn prop_compression_preserves_significant_content(
            significant_lines in prop::collection::vec(significant_line_strategy(), 1..=5),
            noise_line in noise_line_strategy(),
            noise_repeat in 5u32..=10u32,
        ) {
            let preset = default_preset(); // condense enabled, max_repeated_lines=3
            let pipeline = CompressionPipeline::new(&preset);

            // Build interleaved input: noise, significant, noise, significant, ...
            let mut lines: Vec<String> = Vec::new();
            for sig in &significant_lines {
                for _ in 0..noise_repeat {
                    lines.push(noise_line.clone());
                }
                lines.push(sig.clone());
            }
            // Trailing noise block
            for _ in 0..noise_repeat {
                lines.push(noise_line.clone());
            }

            let input = lines.join("\n");
            let result = pipeline.compress(&input, &ctx(), &preset).unwrap();
            let output = &result.data;

            // 1. All significant lines must appear in the output.
            for sig in &significant_lines {
                prop_assert!(
                    output.contains(sig.as_str()),
                    "significant line missing from output: {:?}\noutput: {:?}",
                    sig,
                    output
                );
            }

            // 2. No consecutive run of the noise line exceeds max_repeated_lines (3).
            let mut max_run = 0usize;
            let mut current_run = 0usize;
            for line in output.lines() {
                if line == noise_line.as_str() {
                    current_run += 1;
                    max_run = max_run.max(current_run);
                } else {
                    current_run = 0;
                }
            }
            prop_assert!(
                max_run <= 3,
                "noise line {:?} has a consecutive run of {} (max 3)\noutput: {:?}",
                noise_line,
                max_run,
                output
            );
        }
    }
}