Skip to main content

sqz_engine/
pipeline.rs

1use crate::ansi_strip::AnsiStripper;
2use crate::error::{Result, SqzError};
3use crate::preset::Preset;
4use crate::prompt_cache::PromptCacheDetector;
5use crate::stages::{
6    CollapseArraysStage, CondenseStage, CustomTransformsStage, FlattenStage, GitDiffFoldStage,
7    KeepFieldsStage, StripFieldsStage, StripNullsStage, TruncateStringsStage,
8};
9use crate::token_counter::TokenCounter;
10use crate::toon::ToonEncoder;
11use crate::types::{CompressedContent, Content, ContentType, ModelFamily, StageConfig};
12
/// Minimal per-session context passed to the pipeline's `compress` method.
///
/// Derives `Debug` and `Clone` so callers can log the context and hand copies
/// to several pipeline invocations.
#[derive(Debug, Clone)]
pub struct SessionContext {
    /// Identifier of the calling session. The pipeline currently accepts the
    /// context but does not read this field.
    pub session_id: String,
}
17
18/// The 8-stage compression pipeline orchestrator.
19pub struct CompressionPipeline {
20    stages: Vec<Box<dyn crate::stages::CompressionStage>>,
21    toon_encoder: ToonEncoder,
22    token_counter: TokenCounter,
23    #[allow(dead_code)]
24    prompt_cache_detector: PromptCacheDetector,
25}
26
27impl CompressionPipeline {
28    /// Construct the pipeline from a preset, creating all 8 built-in stages
29    /// sorted by priority.
30    pub fn new(_preset: &Preset) -> Self {
31        let mut stages: Vec<Box<dyn crate::stages::CompressionStage>> = vec![
32            Box::new(AnsiStripper),
33            Box::new(KeepFieldsStage),
34            Box::new(StripFieldsStage),
35            Box::new(CondenseStage),
36            Box::new(GitDiffFoldStage),
37            Box::new(StripNullsStage),
38            Box::new(FlattenStage),
39            Box::new(TruncateStringsStage),
40            Box::new(CollapseArraysStage),
41            Box::new(CustomTransformsStage),
42        ];
43        stages.sort_by_key(|s| s.priority());
44
45        Self {
46            stages,
47            toon_encoder: ToonEncoder,
48            token_counter: TokenCounter::new(),
49            prompt_cache_detector: PromptCacheDetector,
50        }
51    }
52
53    /// Run content through all enabled stages then apply TOON encoding if the
54    /// result is JSON.
55    pub fn compress(
56        &self,
57        input: &str,
58        _ctx: &SessionContext,
59        preset: &Preset,
60    ) -> Result<CompressedContent> {
61        let model_family = model_family_from_preset(preset);
62        let tokens_original = self.token_counter.count(input, &model_family);
63
64        let mut content = Content {
65            raw: input.to_owned(),
66            content_type: ContentType::PlainText,
67            metadata: crate::types::ContentMetadata {
68                source: None,
69                path: None,
70                language: None,
71            },
72            tokens_original,
73        };
74
75        let mut stages_applied: Vec<String> = Vec::new();
76
77        for stage in &self.stages {
78            let config = stage_config_from_preset(stage.name(), preset);
79            if config.enabled {
80                stage.process(&mut content, &config)?;
81                stages_applied.push(stage.name().to_owned());
82            }
83        }
84
85        // Apply TOON encoding if the result is JSON
86        let data = if ToonEncoder::is_json(&content.raw) {
87            let json: serde_json::Value = serde_json::from_str(&content.raw)
88                .map_err(|e| SqzError::Other(format!("pipeline: JSON parse error: {e}")))?;
89            let encoded = self.toon_encoder.encode(&json)?;
90            stages_applied.push("toon_encode".to_owned());
91            encoded
92        } else {
93            content.raw
94        };
95
96        let tokens_compressed = self.token_counter.count(&data, &model_family);
97        let compression_ratio = if tokens_original == 0 {
98            1.0
99        } else {
100            tokens_compressed as f64 / tokens_original as f64
101        };
102
103        Ok(CompressedContent {
104            data,
105            tokens_compressed,
106            tokens_original,
107            stages_applied,
108            compression_ratio,
109            provenance: crate::types::Provenance::default(),
110            verify: None,
111        })
112    }
113
114    /// Insert a plugin stage and re-sort by priority.
115    pub fn insert_stage(&mut self, stage: Box<dyn crate::stages::CompressionStage>) {
116        self.stages.push(stage);
117        self.stages.sort_by_key(|s| s.priority());
118    }
119
120    /// Rebuild stage list from a new preset (hot-reload support).
121    /// Built-in stages are recreated; plugin stages are dropped and must be
122    /// re-inserted by the caller.
123    pub fn reload_preset(&mut self, _preset: &Preset) -> Result<()> {
124        let mut stages: Vec<Box<dyn crate::stages::CompressionStage>> = vec![
125            Box::new(AnsiStripper),
126            Box::new(KeepFieldsStage),
127            Box::new(StripFieldsStage),
128            Box::new(CondenseStage),
129            Box::new(GitDiffFoldStage),
130            Box::new(StripNullsStage),
131            Box::new(FlattenStage),
132            Box::new(TruncateStringsStage),
133            Box::new(CollapseArraysStage),
134            Box::new(CustomTransformsStage),
135        ];
136        stages.sort_by_key(|s| s.priority());
137        self.stages = stages;
138        Ok(())
139    }
140}
141
142/// Derive the `ModelFamily` from the preset's model configuration.
143fn model_family_from_preset(preset: &Preset) -> ModelFamily {
144    match preset.model.family.to_lowercase().as_str() {
145        "anthropic" | "claude" => ModelFamily::AnthropicClaude,
146        "openai" | "gpt" => ModelFamily::OpenAiGpt,
147        "google" | "gemini" => ModelFamily::GoogleGemini,
148        other => ModelFamily::Local(other.to_string()),
149    }
150}
151
152/// Build a `StageConfig` for a named stage from the preset's compression config.
153fn stage_config_from_preset(name: &str, preset: &Preset) -> StageConfig {
154    let c = &preset.compression;
155    match name {
156        "ansi_strip" => StageConfig {
157            enabled: true,
158            options: serde_json::Value::Object(Default::default()),
159        },
160        "keep_fields" => {
161            if let Some(cfg) = &c.keep_fields {
162                StageConfig {
163                    enabled: cfg.enabled,
164                    options: serde_json::json!({ "fields": cfg.fields }),
165                }
166            } else {
167                StageConfig::default()
168            }
169        }
170        "strip_fields" => {
171            if let Some(cfg) = &c.strip_fields {
172                StageConfig {
173                    enabled: cfg.enabled,
174                    options: serde_json::json!({ "fields": cfg.fields }),
175                }
176            } else {
177                StageConfig::default()
178            }
179        }
180        "condense" => {
181            if let Some(cfg) = &c.condense {
182                StageConfig {
183                    enabled: cfg.enabled,
184                    options: serde_json::json!({
185                        "max_repeated_lines": cfg.max_repeated_lines
186                    }),
187                }
188            } else {
189                StageConfig::default()
190            }
191        }
192        "git_diff_fold" => {
193            if let Some(cfg) = &c.git_diff_fold {
194                StageConfig {
195                    enabled: cfg.enabled,
196                    options: serde_json::json!({
197                        "max_context_lines": cfg.max_context_lines
198                    }),
199                }
200            } else {
201                // Default: enabled with 2 context lines
202                StageConfig {
203                    enabled: true,
204                    options: serde_json::json!({ "max_context_lines": 2 }),
205                }
206            }
207        }
208        "strip_nulls" => {
209            if let Some(cfg) = &c.strip_nulls {
210                StageConfig {
211                    enabled: cfg.enabled,
212                    options: serde_json::Value::Object(Default::default()),
213                }
214            } else {
215                StageConfig::default()
216            }
217        }
218        "flatten" => {
219            if let Some(cfg) = &c.flatten {
220                StageConfig {
221                    enabled: cfg.enabled,
222                    options: serde_json::json!({ "max_depth": cfg.max_depth }),
223                }
224            } else {
225                StageConfig::default()
226            }
227        }
228        "truncate_strings" => {
229            if let Some(cfg) = &c.truncate_strings {
230                StageConfig {
231                    enabled: cfg.enabled,
232                    options: serde_json::json!({ "max_length": cfg.max_length }),
233                }
234            } else {
235                StageConfig::default()
236            }
237        }
238        "collapse_arrays" => {
239            if let Some(cfg) = &c.collapse_arrays {
240                StageConfig {
241                    enabled: cfg.enabled,
242                    options: serde_json::json!({
243                        "max_items": cfg.max_items,
244                        "summary_template": cfg.summary_template
245                    }),
246                }
247            } else {
248                StageConfig::default()
249            }
250        }
251        "custom_transforms" => {
252            if let Some(cfg) = &c.custom_transforms {
253                StageConfig {
254                    enabled: cfg.enabled,
255                    options: serde_json::Value::Object(Default::default()),
256                }
257            } else {
258                StageConfig::default()
259            }
260        }
261        _ => StageConfig::default(),
262    }
263}
264
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use crate::preset::{
        BudgetConfig, CollapseArraysConfig, CompressionConfig, CondenseConfig,
        CustomTransformsConfig, ModelConfig, PresetMeta, StripNullsConfig, TerseModeConfig,
        ToolSelectionConfig, TruncateStringsConfig,
    };
    use proptest::prelude::*;

    /// Must match `condense.max_repeated_lines` in [`default_preset`].
    const MAX_REPEATED_LINES: usize = 3;

    /// Preset shared by most tests.
    ///
    /// Enabled stages: condense (max 3 repeated lines), strip_nulls,
    /// truncate_strings (max length 500), collapse_arrays (max 5 items) and
    /// custom_transforms. Not set: keep_fields, strip_fields, git_diff_fold
    /// and flatten. Model family: "anthropic".
    fn default_preset() -> Preset {
        Preset {
            preset: PresetMeta {
                name: "test".into(),
                version: "1.0".into(),
                description: String::new(),
            },
            compression: CompressionConfig {
                stages: vec![],
                keep_fields: None,
                strip_fields: None,
                condense: Some(CondenseConfig {
                    enabled: true,
                    max_repeated_lines: 3,
                }),
                git_diff_fold: None,
                strip_nulls: Some(StripNullsConfig { enabled: true }),
                flatten: None,
                truncate_strings: Some(TruncateStringsConfig {
                    enabled: true,
                    max_length: 500,
                }),
                collapse_arrays: Some(CollapseArraysConfig {
                    enabled: true,
                    max_items: 5,
                    summary_template: "... and {remaining} more items".into(),
                }),
                custom_transforms: Some(CustomTransformsConfig { enabled: true }),
            },
            tool_selection: ToolSelectionConfig {
                max_tools: 5,
                similarity_threshold: 0.7,
                default_tools: vec![],
            },
            budget: BudgetConfig {
                warning_threshold: 0.70,
                ceiling_threshold: 0.85,
                default_window_size: 200_000,
                agents: Default::default(),
            },
            terse_mode: TerseModeConfig {
                enabled: false,
                level: crate::preset::TerseLevel::Moderate,
            },
            model: ModelConfig {
                family: "anthropic".into(),
                primary: "claude-sonnet-4-20250514".into(),
                local: String::new(),
                complexity_threshold: 0.4,
                pricing: None,
            },
        }
    }

    fn ctx() -> SessionContext {
        SessionContext {
            session_id: "test-session".into(),
        }
    }

    #[test]
    fn new_creates_pipeline_with_sorted_stages() {
        let preset = default_preset();
        let pipeline = CompressionPipeline::new(&preset);
        // Verify stages are sorted by priority
        let priorities: Vec<u32> = pipeline.stages.iter().map(|s| s.priority()).collect();
        let mut sorted = priorities.clone();
        sorted.sort();
        assert_eq!(priorities, sorted);
    }

    #[test]
    fn new_registers_all_builtin_stages() {
        let preset = default_preset();
        let pipeline = CompressionPipeline::new(&preset);
        assert_eq!(pipeline.stages.len(), 10);
    }

    #[test]
    fn compress_plain_text_passthrough() {
        let preset = default_preset();
        let pipeline = CompressionPipeline::new(&preset);
        let result = pipeline.compress("hello world", &ctx(), &preset).unwrap();
        assert_eq!(result.data, "hello world");
        assert!(!result.stages_applied.contains(&"toon_encode".to_owned()));
    }

    #[test]
    fn compress_json_applies_toon() {
        let preset = default_preset();
        let pipeline = CompressionPipeline::new(&preset);
        let json = r#"{"name":"Alice","age":30}"#;
        let result = pipeline.compress(json, &ctx(), &preset).unwrap();
        assert!(result.data.starts_with("TOON:"), "data: {}", result.data);
        assert!(result.stages_applied.contains(&"toon_encode".to_owned()));
    }

    #[test]
    fn compress_strips_nulls_from_json() {
        let preset = default_preset();
        let pipeline = CompressionPipeline::new(&preset);
        let json = r#"{"a":1,"b":null}"#;
        let result = pipeline.compress(json, &ctx(), &preset).unwrap();
        // After strip_nulls, "b" is gone; TOON encodes the result
        assert!(result.data.starts_with("TOON:"));
        // Decode and verify null is gone
        let decoded = ToonEncoder.decode(&result.data).unwrap();
        assert!(decoded.get("b").is_none());
        assert_eq!(decoded["a"], serde_json::json!(1));
    }

    #[test]
    fn compress_returns_token_counts() {
        let preset = default_preset();
        let pipeline = CompressionPipeline::new(&preset);
        let input = "a".repeat(100);
        let result = pipeline.compress(&input, &ctx(), &preset).unwrap();
        assert!(result.tokens_original > 0);
        assert!(result.tokens_compressed > 0);
    }

    #[test]
    fn compress_ratio_is_reasonable() {
        let preset = default_preset();
        let pipeline = CompressionPipeline::new(&preset);
        let result = pipeline.compress("hello", &ctx(), &preset).unwrap();
        assert!(result.compression_ratio > 0.0);
        // The ratio must be consistent with the reported token counts.
        if result.tokens_original > 0 {
            let expected = result.tokens_compressed as f64 / result.tokens_original as f64;
            assert!(
                (result.compression_ratio - expected).abs() < 1e-12,
                "ratio {} != {}/{}",
                result.compression_ratio,
                result.tokens_compressed,
                result.tokens_original
            );
        }
    }

    #[test]
    fn insert_stage_re_sorts_by_priority() {
        use crate::stages::CompressionStage;
        use crate::types::StageConfig;

        struct LowPriorityStage;
        impl CompressionStage for LowPriorityStage {
            fn name(&self) -> &str {
                "low_priority"
            }
            fn priority(&self) -> u32 {
                5 // lower than all built-in stages
            }
            fn process(
                &self,
                _content: &mut Content,
                _config: &StageConfig,
            ) -> crate::error::Result<()> {
                Ok(())
            }
        }

        let preset = default_preset();
        let mut pipeline = CompressionPipeline::new(&preset);
        pipeline.insert_stage(Box::new(LowPriorityStage));

        let priorities: Vec<u32> = pipeline.stages.iter().map(|s| s.priority()).collect();
        let mut sorted = priorities.clone();
        sorted.sort();
        assert_eq!(priorities, sorted);
        assert_eq!(pipeline.stages[0].name(), "ansi_strip");
        assert_eq!(pipeline.stages[1].name(), "low_priority");
    }

    #[test]
    fn reload_preset_rebuilds_stages() {
        let preset = default_preset();
        let mut pipeline = CompressionPipeline::new(&preset);
        let original_count = pipeline.stages.len();
        pipeline.reload_preset(&preset).unwrap();
        assert_eq!(pipeline.stages.len(), original_count);
    }

    #[test]
    fn compress_keep_fields_filters_json() {
        use crate::preset::KeepFieldsConfig;
        let mut preset = default_preset();
        preset.compression.keep_fields = Some(KeepFieldsConfig {
            enabled: true,
            fields: vec!["id".into(), "name".into()],
        });
        let pipeline = CompressionPipeline::new(&preset);
        let json = r#"{"id":1,"name":"Bob","debug":"x"}"#;
        let result = pipeline.compress(json, &ctx(), &preset).unwrap();
        let decoded = ToonEncoder.decode(&result.data).unwrap();
        assert!(decoded.get("debug").is_none());
        assert_eq!(decoded["id"], serde_json::json!(1));
    }

    #[test]
    fn compress_empty_string() {
        let preset = default_preset();
        let pipeline = CompressionPipeline::new(&preset);
        let result = pipeline.compress("", &ctx(), &preset).unwrap();
        assert_eq!(result.data, "");
        assert_eq!(result.tokens_original, 0);
        // Zero-token input must not divide by zero; the ratio is defined as 1.0.
        assert_eq!(result.compression_ratio, 1.0);
    }

    #[test]
    fn stage_config_from_preset_unknown_stage() {
        let preset = default_preset();
        let config = stage_config_from_preset("nonexistent", &preset);
        assert!(!config.enabled);
    }

    // ---------------------------------------------------------------------------
    // Property tests
    // ---------------------------------------------------------------------------

    /// Generate a significant line from a fixed set of meaningful tokens.
    fn significant_line_strategy() -> impl Strategy<Value = String> {
        prop_oneof![
            Just("error: connection refused".to_owned()),
            Just("warning: deprecated API usage".to_owned()),
            Just("failed: build step exited with code 1".to_owned()),
            Just("success: deployment complete".to_owned()),
            Just("status: all checks passed".to_owned()),
            Just("error: file not found".to_owned()),
            Just("warning: unused variable detected".to_owned()),
        ]
    }

    /// Generate a noise line (repeated decorative content).
    fn noise_line_strategy() -> impl Strategy<Value = String> {
        prop_oneof![
            Just("---".to_owned()),
            Just("Loading...".to_owned()),
            Just("================".to_owned()),
            Just("...".to_owned()),
        ]
    }

    /// Recursive strategy that generates arbitrary serde_json::Value instances.
    /// Mirrors the strategy in toon.rs tests.
    fn arb_json_value() -> impl Strategy<Value = serde_json::Value> {
        let leaf = prop_oneof![
            Just(serde_json::Value::Null),
            any::<bool>().prop_map(serde_json::Value::Bool),
            any::<i64>().prop_map(|n| serde_json::json!(n)),
            any::<f64>()
                .prop_filter("must be finite", |f| f.is_finite())
                .prop_map(|f| serde_json::json!(f)),
            ".*".prop_map(serde_json::Value::String),
        ];

        leaf.prop_recursive(4, 64, 8, |inner| {
            prop_oneof![
                prop::collection::vec(inner.clone(), 0..8).prop_map(serde_json::Value::Array),
                prop::collection::hash_map(".*", inner, 0..8)
                    .prop_map(|m| serde_json::Value::Object(m.into_iter().collect())),
            ]
        })
    }

    proptest! {
        /// **Validates: Requirements 17.1, 17.2, 13.2**
        ///
        /// Property 22: ASCII-safe output.
        ///
        /// For any JSON input, the Compression_Pipeline SHALL produce output
        /// using only ASCII-safe characters: printable ASCII (0x20–0x7E) plus
        /// standard whitespace (\t = 0x09, \n = 0x0A, \r = 0x0D).
        #[test]
        fn prop_pipeline_ascii_safe_output(v in arb_json_value()) {
            let preset = default_preset();
            let pipeline = CompressionPipeline::new(&preset);

            let json_input = serde_json::to_string(&v).expect("serialize should not fail");
            let result = pipeline.compress(&json_input, &ctx(), &preset)
                .expect("compress should not fail");

            for ch in result.data.chars() {
                let cp = ch as u32;
                let is_printable_ascii = (0x20..=0x7E).contains(&cp);
                let is_standard_whitespace = matches!(cp, 0x09 | 0x0A | 0x0D);
                prop_assert!(
                    is_printable_ascii || is_standard_whitespace,
                    "non-ASCII-safe character in output: U+{:04X} ({:?})\noutput: {:?}",
                    cp, ch, result.data
                );
            }
        }
    }

    proptest! {
        /// **Validates: Requirements 1.3**
        ///
        /// Property 1: Compression preserves semantically significant content.
        ///
        /// The input is CLI output in which significant tokens (errors,
        /// warnings, status messages) are separated by runs of one repeated
        /// noise line. For any such input, the Compression_Pipeline SHALL
        /// produce output that:
        ///   1. Contains all significant lines.
        ///   2. Never contains more than `max_repeated_lines` consecutive
        ///      copies of the noise line.
        #[test]
        fn prop_compression_preserves_significant_content(
            significant_lines in prop::collection::vec(significant_line_strategy(), 1..=5),
            noise_line in noise_line_strategy(),
            noise_repeat in 5u32..=10u32,
        ) {
            let preset = default_preset(); // condense enabled, max_repeated_lines=3
            let pipeline = CompressionPipeline::new(&preset);

            // Build interleaved input: noise, significant, noise, significant, ...
            let mut lines: Vec<String> = Vec::new();
            for sig in &significant_lines {
                for _ in 0..noise_repeat {
                    lines.push(noise_line.clone());
                }
                lines.push(sig.clone());
            }
            // Trailing noise block
            for _ in 0..noise_repeat {
                lines.push(noise_line.clone());
            }

            let input = lines.join("\n");
            let result = pipeline.compress(&input, &ctx(), &preset).unwrap();
            let output = &result.data;

            // 1. All significant lines must appear in the output.
            for sig in &significant_lines {
                prop_assert!(
                    output.contains(sig.as_str()),
                    "significant line missing from output: {:?}\noutput: {:?}",
                    sig,
                    output
                );
            }

            // 2. No consecutive run of the noise line exceeds max_repeated_lines.
            let mut max_run = 0usize;
            let mut current_run = 0usize;
            for line in output.lines() {
                if line == noise_line.as_str() {
                    current_run += 1;
                    max_run = max_run.max(current_run);
                } else {
                    current_run = 0;
                }
            }
            prop_assert!(
                max_run <= MAX_REPEATED_LINES,
                "noise line {:?} has a consecutive run of {} (max {})\noutput: {:?}",
                noise_line,
                max_run,
                MAX_REPEATED_LINES,
                output
            );
        }
    }
}