Skip to main content

sqz_engine/
pipeline.rs

1use crate::ansi_strip::AnsiStripper;
2use crate::error::{Result, SqzError};
3use crate::preset::Preset;
4use crate::prompt_cache::PromptCacheDetector;
5use crate::stages::{
6    CollapseArraysStage, CondenseStage, CustomTransformsStage, FlattenStage, KeepFieldsStage,
7    StripFieldsStage, StripNullsStage, TruncateStringsStage,
8};
9use crate::toon::ToonEncoder;
10use crate::types::{CompressedContent, Content, ContentType, StageConfig};
11
/// Minimal session context passed to the pipeline.
///
/// `CompressionPipeline::compress` accepts this context but does not
/// currently read it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionContext {
    /// Identifier of the session this compression request belongs to.
    pub session_id: String,
}
16
17/// The 8-stage compression pipeline orchestrator.
18pub struct CompressionPipeline {
19    stages: Vec<Box<dyn crate::stages::CompressionStage>>,
20    toon_encoder: ToonEncoder,
21    #[allow(dead_code)]
22    prompt_cache_detector: PromptCacheDetector,
23}
24
25impl CompressionPipeline {
26    /// Construct the pipeline from a preset, creating all 8 built-in stages
27    /// sorted by priority.
28    pub fn new(preset: &Preset) -> Self {
29        let mut stages: Vec<Box<dyn crate::stages::CompressionStage>> = vec![
30            Box::new(AnsiStripper),
31            Box::new(KeepFieldsStage),
32            Box::new(StripFieldsStage),
33            Box::new(CondenseStage),
34            Box::new(StripNullsStage),
35            Box::new(FlattenStage),
36            Box::new(TruncateStringsStage),
37            Box::new(CollapseArraysStage),
38            Box::new(CustomTransformsStage),
39        ];
40        stages.sort_by_key(|s| s.priority());
41
42        let _ = preset; // preset is used for config lookup in compress()
43
44        Self {
45            stages,
46            toon_encoder: ToonEncoder,
47            prompt_cache_detector: PromptCacheDetector,
48        }
49    }
50
51    /// Run content through all enabled stages then apply TOON encoding if the
52    /// result is JSON.
53    pub fn compress(
54        &self,
55        input: &str,
56        _ctx: &SessionContext,
57        preset: &Preset,
58    ) -> Result<CompressedContent> {
59        let tokens_original = (input.chars().count() as u32).saturating_add(3) / 4;
60
61        let mut content = Content {
62            raw: input.to_owned(),
63            content_type: ContentType::PlainText,
64            metadata: crate::types::ContentMetadata {
65                source: None,
66                path: None,
67                language: None,
68            },
69            tokens_original,
70        };
71
72        let mut stages_applied: Vec<String> = Vec::new();
73
74        for stage in &self.stages {
75            let config = stage_config_from_preset(stage.name(), preset);
76            if config.enabled {
77                stage.process(&mut content, &config)?;
78                stages_applied.push(stage.name().to_owned());
79            }
80        }
81
82        // Apply TOON encoding if the result is JSON
83        let data = if ToonEncoder::is_json(&content.raw) {
84            let json: serde_json::Value = serde_json::from_str(&content.raw)
85                .map_err(|e| SqzError::Other(format!("pipeline: JSON parse error: {e}")))?;
86            let encoded = self.toon_encoder.encode(&json)?;
87            stages_applied.push("toon_encode".to_owned());
88            encoded
89        } else {
90            content.raw
91        };
92
93        let tokens_compressed = (data.chars().count() as u32).saturating_add(3) / 4;
94        let compression_ratio = if tokens_original == 0 {
95            1.0
96        } else {
97            tokens_compressed as f64 / tokens_original as f64
98        };
99
100        Ok(CompressedContent {
101            data,
102            tokens_compressed,
103            tokens_original,
104            stages_applied,
105            compression_ratio,
106        })
107    }
108
109    /// Insert a plugin stage and re-sort by priority.
110    pub fn insert_stage(&mut self, stage: Box<dyn crate::stages::CompressionStage>) {
111        self.stages.push(stage);
112        self.stages.sort_by_key(|s| s.priority());
113    }
114
115    /// Rebuild stage list from a new preset (hot-reload support).
116    /// Built-in stages are recreated; plugin stages are dropped and must be
117    /// re-inserted by the caller.
118    pub fn reload_preset(&mut self, preset: &Preset) -> Result<()> {
119        let mut stages: Vec<Box<dyn crate::stages::CompressionStage>> = vec![
120            Box::new(AnsiStripper),
121            Box::new(KeepFieldsStage),
122            Box::new(StripFieldsStage),
123            Box::new(CondenseStage),
124            Box::new(StripNullsStage),
125            Box::new(FlattenStage),
126            Box::new(TruncateStringsStage),
127            Box::new(CollapseArraysStage),
128            Box::new(CustomTransformsStage),
129        ];
130        stages.sort_by_key(|s| s.priority());
131        self.stages = stages;
132        let _ = preset;
133        Ok(())
134    }
135}
136
137/// Build a `StageConfig` for a named stage from the preset's compression config.
138fn stage_config_from_preset(name: &str, preset: &Preset) -> StageConfig {
139    let c = &preset.compression;
140    match name {
141        "ansi_strip" => StageConfig {
142            enabled: true,
143            options: serde_json::Value::Object(Default::default()),
144        },
145        "keep_fields" => {
146            if let Some(cfg) = &c.keep_fields {
147                StageConfig {
148                    enabled: cfg.enabled,
149                    options: serde_json::json!({ "fields": cfg.fields }),
150                }
151            } else {
152                StageConfig::default()
153            }
154        }
155        "strip_fields" => {
156            if let Some(cfg) = &c.strip_fields {
157                StageConfig {
158                    enabled: cfg.enabled,
159                    options: serde_json::json!({ "fields": cfg.fields }),
160                }
161            } else {
162                StageConfig::default()
163            }
164        }
165        "condense" => {
166            if let Some(cfg) = &c.condense {
167                StageConfig {
168                    enabled: cfg.enabled,
169                    options: serde_json::json!({
170                        "max_repeated_lines": cfg.max_repeated_lines
171                    }),
172                }
173            } else {
174                StageConfig::default()
175            }
176        }
177        "strip_nulls" => {
178            if let Some(cfg) = &c.strip_nulls {
179                StageConfig {
180                    enabled: cfg.enabled,
181                    options: serde_json::Value::Object(Default::default()),
182                }
183            } else {
184                StageConfig::default()
185            }
186        }
187        "flatten" => {
188            if let Some(cfg) = &c.flatten {
189                StageConfig {
190                    enabled: cfg.enabled,
191                    options: serde_json::json!({ "max_depth": cfg.max_depth }),
192                }
193            } else {
194                StageConfig::default()
195            }
196        }
197        "truncate_strings" => {
198            if let Some(cfg) = &c.truncate_strings {
199                StageConfig {
200                    enabled: cfg.enabled,
201                    options: serde_json::json!({ "max_length": cfg.max_length }),
202                }
203            } else {
204                StageConfig::default()
205            }
206        }
207        "collapse_arrays" => {
208            if let Some(cfg) = &c.collapse_arrays {
209                StageConfig {
210                    enabled: cfg.enabled,
211                    options: serde_json::json!({
212                        "max_items": cfg.max_items,
213                        "summary_template": cfg.summary_template
214                    }),
215                }
216            } else {
217                StageConfig::default()
218            }
219        }
220        "custom_transforms" => {
221            if let Some(cfg) = &c.custom_transforms {
222                StageConfig {
223                    enabled: cfg.enabled,
224                    options: serde_json::Value::Object(Default::default()),
225                }
226            } else {
227                StageConfig::default()
228            }
229        }
230        _ => StageConfig::default(),
231    }
232}
233
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use crate::preset::{
        BudgetConfig, CollapseArraysConfig, CompressionConfig, CondenseConfig,
        CustomTransformsConfig, ModelConfig, PresetMeta, StripNullsConfig, TerseModeConfig,
        ToolSelectionConfig, TruncateStringsConfig,
    };

    /// Preset with condense, strip_nulls, truncate_strings, collapse_arrays and
    /// custom_transforms enabled; keep_fields, strip_fields and flatten absent.
    fn default_preset() -> Preset {
        Preset {
            preset: PresetMeta {
                name: "test".into(),
                version: "1.0".into(),
                description: String::new(),
            },
            compression: CompressionConfig {
                stages: vec![],
                keep_fields: None,
                strip_fields: None,
                condense: Some(CondenseConfig {
                    enabled: true,
                    max_repeated_lines: 3,
                }),
                strip_nulls: Some(StripNullsConfig { enabled: true }),
                flatten: None,
                truncate_strings: Some(TruncateStringsConfig {
                    enabled: true,
                    max_length: 500,
                }),
                collapse_arrays: Some(CollapseArraysConfig {
                    enabled: true,
                    max_items: 5,
                    summary_template: "... and {remaining} more items".into(),
                }),
                custom_transforms: Some(CustomTransformsConfig { enabled: true }),
            },
            tool_selection: ToolSelectionConfig {
                max_tools: 5,
                similarity_threshold: 0.7,
                default_tools: vec![],
            },
            budget: BudgetConfig {
                warning_threshold: 0.70,
                ceiling_threshold: 0.85,
                default_window_size: 200_000,
                agents: Default::default(),
            },
            terse_mode: TerseModeConfig {
                enabled: false,
                level: crate::preset::TerseLevel::Moderate,
            },
            model: ModelConfig {
                family: "anthropic".into(),
                primary: "claude-sonnet-4-20250514".into(),
                local: String::new(),
                complexity_threshold: 0.4,
                pricing: None,
            },
        }
    }

    fn ctx() -> SessionContext {
        SessionContext {
            session_id: "test-session".into(),
        }
    }

    #[test]
    fn new_creates_pipeline_with_sorted_stages() {
        let preset = default_preset();
        let pipeline = CompressionPipeline::new(&preset);
        // Verify stages are sorted by priority
        let priorities: Vec<u32> = pipeline.stages.iter().map(|s| s.priority()).collect();
        let mut sorted = priorities.clone();
        sorted.sort();
        assert_eq!(priorities, sorted);
    }

    #[test]
    fn compress_plain_text_passthrough() {
        let preset = default_preset();
        let pipeline = CompressionPipeline::new(&preset);
        let result = pipeline.compress("hello world", &ctx(), &preset).unwrap();
        assert_eq!(result.data, "hello world");
        assert!(!result.stages_applied.contains(&"toon_encode".to_owned()));
    }

    #[test]
    fn compress_json_applies_toon() {
        let preset = default_preset();
        let pipeline = CompressionPipeline::new(&preset);
        let json = r#"{"name":"Alice","age":30}"#;
        let result = pipeline.compress(json, &ctx(), &preset).unwrap();
        assert!(result.data.starts_with("TOON:"), "data: {}", result.data);
        assert!(result.stages_applied.contains(&"toon_encode".to_owned()));
    }

    #[test]
    fn compress_strips_nulls_from_json() {
        let preset = default_preset();
        let pipeline = CompressionPipeline::new(&preset);
        let json = r#"{"a":1,"b":null}"#;
        let result = pipeline.compress(json, &ctx(), &preset).unwrap();
        // After strip_nulls, "b" is gone; TOON encodes the result
        assert!(result.data.starts_with("TOON:"));
        // Decode and verify null is gone
        let decoded = ToonEncoder.decode(&result.data).unwrap();
        assert!(decoded.get("b").is_none());
        assert_eq!(decoded["a"], serde_json::json!(1));
    }

    #[test]
    fn compress_returns_token_counts() {
        let preset = default_preset();
        let pipeline = CompressionPipeline::new(&preset);
        let input = "a".repeat(100);
        let result = pipeline.compress(&input, &ctx(), &preset).unwrap();
        assert!(result.tokens_original > 0);
        assert!(result.tokens_compressed > 0);
    }

    #[test]
    fn compress_ratio_is_reasonable() {
        let preset = default_preset();
        let pipeline = CompressionPipeline::new(&preset);
        let result = pipeline.compress("hello", &ctx(), &preset).unwrap();
        assert!(result.compression_ratio > 0.0);
        // The ratio is defined as compressed / original token estimates.
        let expected = f64::from(result.tokens_compressed) / f64::from(result.tokens_original);
        assert!(
            (result.compression_ratio - expected).abs() < f64::EPSILON,
            "ratio {} != {}",
            result.compression_ratio,
            expected
        );
    }

    #[test]
    fn insert_stage_re_sorts_by_priority() {
        use crate::stages::CompressionStage;
        use crate::types::StageConfig;

        struct LowPriorityStage;
        impl CompressionStage for LowPriorityStage {
            fn name(&self) -> &str {
                "low_priority"
            }
            fn priority(&self) -> u32 {
                5 // sorts after `ansi_strip` but before every other built-in stage
            }
            fn process(
                &self,
                _content: &mut Content,
                _config: &StageConfig,
            ) -> crate::error::Result<()> {
                Ok(())
            }
        }

        let preset = default_preset();
        let mut pipeline = CompressionPipeline::new(&preset);
        pipeline.insert_stage(Box::new(LowPriorityStage));

        let priorities: Vec<u32> = pipeline.stages.iter().map(|s| s.priority()).collect();
        let mut sorted = priorities.clone();
        sorted.sort();
        assert_eq!(priorities, sorted);
        assert_eq!(pipeline.stages[0].name(), "ansi_strip");
        assert_eq!(pipeline.stages[1].name(), "low_priority");
    }

    #[test]
    fn reload_preset_rebuilds_stages() {
        let preset = default_preset();
        let mut pipeline = CompressionPipeline::new(&preset);
        let original_count = pipeline.stages.len();
        pipeline.reload_preset(&preset).unwrap();
        assert_eq!(pipeline.stages.len(), original_count);
    }

    #[test]
    fn reload_preset_drops_plugin_stages() {
        use crate::stages::CompressionStage;
        use crate::types::StageConfig;

        struct PluginStage;
        impl CompressionStage for PluginStage {
            fn name(&self) -> &str {
                "plugin"
            }
            fn priority(&self) -> u32 {
                50
            }
            fn process(
                &self,
                _content: &mut Content,
                _config: &StageConfig,
            ) -> crate::error::Result<()> {
                Ok(())
            }
        }

        let preset = default_preset();
        let mut pipeline = CompressionPipeline::new(&preset);
        let builtin_count = pipeline.stages.len();
        pipeline.insert_stage(Box::new(PluginStage));
        assert_eq!(pipeline.stages.len(), builtin_count + 1);

        // Documented contract: reload keeps only the built-in stages.
        pipeline.reload_preset(&preset).unwrap();
        assert_eq!(pipeline.stages.len(), builtin_count);
        assert!(pipeline.stages.iter().all(|s| s.name() != "plugin"));
    }

    #[test]
    fn compress_keep_fields_filters_json() {
        use crate::preset::KeepFieldsConfig;
        let mut preset = default_preset();
        preset.compression.keep_fields = Some(KeepFieldsConfig {
            enabled: true,
            fields: vec!["id".into(), "name".into()],
        });
        let pipeline = CompressionPipeline::new(&preset);
        let json = r#"{"id":1,"name":"Bob","debug":"x"}"#;
        let result = pipeline.compress(json, &ctx(), &preset).unwrap();
        let decoded = ToonEncoder.decode(&result.data).unwrap();
        assert!(decoded.get("debug").is_none());
        assert_eq!(decoded["id"], serde_json::json!(1));
    }

    #[test]
    fn compress_empty_string() {
        let preset = default_preset();
        let pipeline = CompressionPipeline::new(&preset);
        let result = pipeline.compress("", &ctx(), &preset).unwrap();
        assert_eq!(result.data, "");
        assert_eq!(result.tokens_original, 0);
        assert_eq!(result.tokens_compressed, 0);
        // Zero original tokens must not divide by zero; the ratio is pinned to 1.0.
        assert!((result.compression_ratio - 1.0).abs() < f64::EPSILON);
    }

    #[test]
    fn stage_config_from_preset_unknown_stage() {
        let preset = default_preset();
        let config = stage_config_from_preset("nonexistent", &preset);
        assert!(!config.enabled);
    }

    // ---------------------------------------------------------------------------
    // Property tests
    // ---------------------------------------------------------------------------

    use proptest::prelude::*;

    /// Generate a significant line from a fixed set of meaningful tokens.
    fn significant_line_strategy() -> impl Strategy<Value = String> {
        prop_oneof![
            Just("error: connection refused".to_owned()),
            Just("warning: deprecated API usage".to_owned()),
            Just("failed: build step exited with code 1".to_owned()),
            Just("success: deployment complete".to_owned()),
            Just("status: all checks passed".to_owned()),
            Just("error: file not found".to_owned()),
            Just("warning: unused variable detected".to_owned()),
        ]
    }

    /// Generate a noise line (repeated decorative content).
    fn noise_line_strategy() -> impl Strategy<Value = String> {
        prop_oneof![
            Just("---".to_owned()),
            Just("Loading...".to_owned()),
            Just("================".to_owned()),
            Just("...".to_owned()),
        ]
    }

    /// Recursive strategy that generates arbitrary serde_json::Value instances.
    /// Mirrors the strategy in toon.rs tests.
    fn arb_json_value() -> impl Strategy<Value = serde_json::Value> {
        let leaf = prop_oneof![
            Just(serde_json::Value::Null),
            any::<bool>().prop_map(serde_json::Value::Bool),
            any::<i64>().prop_map(|n| serde_json::json!(n)),
            any::<f64>()
                .prop_filter("must be finite", |f| f.is_finite())
                .prop_map(|f| serde_json::json!(f)),
            ".*".prop_map(serde_json::Value::String),
        ];

        leaf.prop_recursive(4, 64, 8, |inner| {
            prop_oneof![
                prop::collection::vec(inner.clone(), 0..8)
                    .prop_map(serde_json::Value::Array),
                prop::collection::hash_map(".*", inner, 0..8).prop_map(|m| {
                    serde_json::Value::Object(m.into_iter().collect())
                }),
            ]
        })
    }

    proptest! {
        /// **Validates: Requirements 17.1, 17.2, 13.2**
        ///
        /// Property 22: ASCII-safe output.
        ///
        /// For any JSON input, the Compression_Pipeline SHALL produce output
        /// using only ASCII-safe characters: printable ASCII (0x20–0x7E) plus
        /// standard whitespace (\t = 0x09, \n = 0x0A, \r = 0x0D).
        #[test]
        fn prop_pipeline_ascii_safe_output(v in arb_json_value()) {
            let preset = default_preset();
            let pipeline = CompressionPipeline::new(&preset);

            let json_input = serde_json::to_string(&v).expect("serialize should not fail");
            let result = pipeline.compress(&json_input, &ctx(), &preset)
                .expect("compress should not fail");

            for ch in result.data.chars() {
                let cp = ch as u32;
                let is_printable_ascii = (0x20..=0x7E).contains(&cp);
                let is_standard_whitespace = matches!(cp, 0x09 | 0x0A | 0x0D);
                prop_assert!(
                    is_printable_ascii || is_standard_whitespace,
                    "non-ASCII-safe character in output: U+{:04X} ({:?})\noutput: {:?}",
                    cp, ch, result.data
                );
            }
        }
    }

    proptest! {
        /// **Validates: Requirements 1.3**
        ///
        /// Property 1: Compression preserves semantically significant content.
        ///
        /// For any CLI output containing significant tokens (errors, warnings,
        /// status messages) mixed with noise (repeated identical lines), the
        /// Compression_Pipeline SHALL produce output that:
        ///   1. Contains all significant lines.
        ///   2. Contains each noise line at most `max_repeated_lines` times.
        #[test]
        fn prop_compression_preserves_significant_content(
            significant_lines in prop::collection::vec(significant_line_strategy(), 1..=5),
            noise_line in noise_line_strategy(),
            noise_repeat in 5u32..=10u32,
        ) {
            let preset = default_preset(); // condense enabled, max_repeated_lines=3
            let pipeline = CompressionPipeline::new(&preset);

            // Build interleaved input: noise, significant, noise, significant, ...
            let mut lines: Vec<String> = Vec::new();
            for sig in &significant_lines {
                for _ in 0..noise_repeat {
                    lines.push(noise_line.clone());
                }
                lines.push(sig.clone());
            }
            // Trailing noise block
            for _ in 0..noise_repeat {
                lines.push(noise_line.clone());
            }

            let input = lines.join("\n");
            let result = pipeline.compress(&input, &ctx(), &preset).unwrap();
            let output = &result.data;

            // 1. All significant lines must appear in the output.
            for sig in &significant_lines {
                prop_assert!(
                    output.contains(sig.as_str()),
                    "significant line missing from output: {:?}\noutput: {:?}",
                    sig,
                    output
                );
            }

            // 2. No consecutive run of the noise line exceeds max_repeated_lines (3).
            let mut max_run = 0usize;
            let mut current_run = 0usize;
            for line in output.lines() {
                if line == noise_line.as_str() {
                    current_run += 1;
                    max_run = max_run.max(current_run);
                } else {
                    current_run = 0;
                }
            }
            prop_assert!(
                max_run <= 3,
                "noise line {:?} has a consecutive run of {} (max 3)\noutput: {:?}",
                noise_line,
                max_run,
                output
            );
        }
    }
}