Skip to main content

sqz_engine/
stages.rs

1use crate::error::{Result, SqzError};
2use crate::toon::ToonEncoder;
3use crate::types::{Content, ContentType, StageConfig};
4
5/// A single compression stage in the pipeline.
6///
7/// Each stage transforms `Content` in place according to its `StageConfig`.
8/// Stages must check `config.enabled` and return early (no-op) when disabled.
9pub trait CompressionStage: Send + Sync {
10    fn name(&self) -> &str;
11    fn priority(&self) -> u32;
12    fn process(&self, content: &mut Content, config: &StageConfig) -> Result<()>;
13}
14
15// ---------------------------------------------------------------------------
16// Helper: parse raw as JSON, apply a transform, serialize back
17// ---------------------------------------------------------------------------
18
19fn with_json<F>(content: &mut Content, f: F) -> Result<()>
20where
21    F: FnOnce(&mut serde_json::Value) -> Result<()>,
22{
23    if !ToonEncoder::is_json(&content.raw) {
24        return Ok(());
25    }
26    let mut value: serde_json::Value = serde_json::from_str(&content.raw)?;
27    f(&mut value)?;
28    content.raw = serde_json::to_string(&value)?;
29    Ok(())
30}
31
32// ---------------------------------------------------------------------------
33// Stage 1: keep_fields
34// ---------------------------------------------------------------------------
35
36/// For JSON content, keep only the specified top-level fields; drop all others.
37/// Config options: `fields` — array of field name strings.
38/// Non-JSON content passes through unchanged.
39pub struct KeepFieldsStage;
40
41impl CompressionStage for KeepFieldsStage {
42    fn name(&self) -> &str {
43        "keep_fields"
44    }
45
46    fn priority(&self) -> u32 {
47        10
48    }
49
50    fn process(&self, content: &mut Content, config: &StageConfig) -> Result<()> {
51        if !config.enabled {
52            return Ok(());
53        }
54        let fields: Vec<String> = match config.options.get("fields") {
55            Some(v) => serde_json::from_value(v.clone())
56                .map_err(|e| SqzError::Other(format!("keep_fields: invalid fields option: {e}")))?,
57            None => return Ok(()),
58        };
59        if fields.is_empty() {
60            return Ok(());
61        }
62        with_json(content, |value| {
63            if let serde_json::Value::Object(map) = value {
64                map.retain(|k, _| fields.contains(k));
65            }
66            Ok(())
67        })
68    }
69}
70
71// ---------------------------------------------------------------------------
72// Stage 2: strip_fields
73// ---------------------------------------------------------------------------
74
75/// For JSON content, remove specified fields by key name.
76/// Supports dot-notation for nested fields (e.g. "metadata.internal_id").
77/// Config options: `fields` — array of field path strings.
78/// Non-JSON content passes through unchanged.
79pub struct StripFieldsStage;
80
81fn strip_field_path(value: &mut serde_json::Value, path: &[&str]) {
82    if path.is_empty() {
83        return;
84    }
85    if let serde_json::Value::Object(map) = value {
86        if path.len() == 1 {
87            map.remove(path[0]);
88        } else {
89            if let Some(child) = map.get_mut(path[0]) {
90                strip_field_path(child, &path[1..]);
91            }
92        }
93    }
94}
95
96impl CompressionStage for StripFieldsStage {
97    fn name(&self) -> &str {
98        "strip_fields"
99    }
100
101    fn priority(&self) -> u32 {
102        20
103    }
104
105    fn process(&self, content: &mut Content, config: &StageConfig) -> Result<()> {
106        if !config.enabled {
107            return Ok(());
108        }
109        let fields: Vec<String> = match config.options.get("fields") {
110            Some(v) => serde_json::from_value(v.clone())
111                .map_err(|e| SqzError::Other(format!("strip_fields: invalid fields option: {e}")))?,
112            None => return Ok(()),
113        };
114        if fields.is_empty() {
115            return Ok(());
116        }
117        with_json(content, |value| {
118            for field in &fields {
119                let parts: Vec<&str> = field.split('.').collect();
120                strip_field_path(value, &parts);
121            }
122            Ok(())
123        })
124    }
125}
126
127// ---------------------------------------------------------------------------
128// Stage 3: condense
129// ---------------------------------------------------------------------------
130
131/// For plain text / CLI output, collapse runs of repeated identical lines
132/// down to at most `max_repeated_lines`.
133/// Config options: `max_repeated_lines` (u32, default 3).
134/// Non-plain-text content passes through unchanged.
135pub struct CondenseStage;
136
137impl CompressionStage for CondenseStage {
138    fn name(&self) -> &str {
139        "condense"
140    }
141
142    fn priority(&self) -> u32 {
143        30
144    }
145
146    fn process(&self, content: &mut Content, config: &StageConfig) -> Result<()> {
147        if !config.enabled {
148            return Ok(());
149        }
150        // Only apply to plain text and CLI output
151        match &content.content_type {
152            ContentType::PlainText | ContentType::CliOutput { .. } => {}
153            _ => return Ok(()),
154        }
155
156        let max_repeated: u32 = config
157            .options
158            .get("max_repeated_lines")
159            .and_then(|v| v.as_u64())
160            .map(|v| v as u32)
161            .unwrap_or(3);
162
163        let mut result = Vec::new();
164        let mut current_line: Option<&str> = None;
165        let mut run_count: u32 = 0;
166
167        for line in content.raw.lines() {
168            match current_line {
169                Some(prev) if prev == line => {
170                    run_count += 1;
171                    if run_count <= max_repeated {
172                        result.push(line);
173                    }
174                }
175                _ => {
176                    current_line = Some(line);
177                    run_count = 1;
178                    result.push(line);
179                }
180            }
181        }
182
183        // Preserve trailing newline if original had one
184        let trailing_newline = content.raw.ends_with('\n');
185        content.raw = result.join("\n");
186        if trailing_newline {
187            content.raw.push('\n');
188        }
189        Ok(())
190    }
191}
192
193// ---------------------------------------------------------------------------
194// Stage 4: strip_nulls
195// ---------------------------------------------------------------------------
196
197/// For JSON content, recursively remove all null-valued fields from objects.
198/// Arrays keep their null elements.
199/// Config options: `enabled` (bool).
200pub struct StripNullsStage;
201
202fn strip_nulls_recursive(value: &mut serde_json::Value) {
203    match value {
204        serde_json::Value::Object(map) => {
205            map.retain(|_, v| !v.is_null());
206            for v in map.values_mut() {
207                strip_nulls_recursive(v);
208            }
209        }
210        serde_json::Value::Array(arr) => {
211            for item in arr.iter_mut() {
212                strip_nulls_recursive(item);
213            }
214        }
215        _ => {}
216    }
217}
218
219impl CompressionStage for StripNullsStage {
220    fn name(&self) -> &str {
221        "strip_nulls"
222    }
223
224    fn priority(&self) -> u32 {
225        40
226    }
227
228    fn process(&self, content: &mut Content, config: &StageConfig) -> Result<()> {
229        if !config.enabled {
230            return Ok(());
231        }
232        with_json(content, |value| {
233            strip_nulls_recursive(value);
234            Ok(())
235        })
236    }
237}
238
239// ---------------------------------------------------------------------------
240// Stage 5: flatten
241// ---------------------------------------------------------------------------
242
243/// For JSON content, flatten nested objects up to `max_depth` levels using
244/// dot-notation for flattened keys (e.g. `{"a":{"b":1}}` → `{"a.b":1}`).
245/// Config options: `max_depth` (u32, default 3).
246/// Non-JSON content passes through unchanged.
247pub struct FlattenStage;
248
249fn flatten_value(
250    value: &serde_json::Value,
251    prefix: &str,
252    depth: u32,
253    max_depth: u32,
254    out: &mut serde_json::Map<String, serde_json::Value>,
255) {
256    if let serde_json::Value::Object(map) = value {
257        if depth < max_depth {
258            for (k, v) in map {
259                let new_key = if prefix.is_empty() {
260                    k.clone()
261                } else {
262                    format!("{prefix}.{k}")
263                };
264                flatten_value(v, &new_key, depth + 1, max_depth, out);
265            }
266            return;
267        }
268    }
269    out.insert(prefix.to_owned(), value.clone());
270}
271
272impl CompressionStage for FlattenStage {
273    fn name(&self) -> &str {
274        "flatten"
275    }
276
277    fn priority(&self) -> u32 {
278        50
279    }
280
281    fn process(&self, content: &mut Content, config: &StageConfig) -> Result<()> {
282        if !config.enabled {
283            return Ok(());
284        }
285        let max_depth: u32 = config
286            .options
287            .get("max_depth")
288            .and_then(|v| v.as_u64())
289            .map(|v| v as u32)
290            .unwrap_or(3);
291
292        with_json(content, |value| {
293            if let serde_json::Value::Object(map) = value {
294                let mut out = serde_json::Map::new();
295                for (k, v) in map.iter() {
296                    flatten_value(v, k, 1, max_depth, &mut out);
297                }
298                *map = out;
299            }
300            Ok(())
301        })
302    }
303}
304
305// ---------------------------------------------------------------------------
306// Stage 6: truncate_strings
307// ---------------------------------------------------------------------------
308
309/// For JSON content, truncate string values longer than `max_length` chars,
310/// appending "..." to indicate truncation.
311/// Config options: `max_length` (u32, default 500).
312/// Non-JSON content passes through unchanged.
313pub struct TruncateStringsStage;
314
315fn truncate_strings_recursive(value: &mut serde_json::Value, max_length: usize) {
316    match value {
317        serde_json::Value::String(s) => {
318            if s.chars().count() > max_length {
319                let truncated: String = s.chars().take(max_length).collect();
320                *s = format!("{truncated}...");
321            }
322        }
323        serde_json::Value::Object(map) => {
324            for v in map.values_mut() {
325                truncate_strings_recursive(v, max_length);
326            }
327        }
328        serde_json::Value::Array(arr) => {
329            for item in arr.iter_mut() {
330                truncate_strings_recursive(item, max_length);
331            }
332        }
333        _ => {}
334    }
335}
336
337impl CompressionStage for TruncateStringsStage {
338    fn name(&self) -> &str {
339        "truncate_strings"
340    }
341
342    fn priority(&self) -> u32 {
343        60
344    }
345
346    fn process(&self, content: &mut Content, config: &StageConfig) -> Result<()> {
347        if !config.enabled {
348            return Ok(());
349        }
350        let max_length: usize = config
351            .options
352            .get("max_length")
353            .and_then(|v| v.as_u64())
354            .map(|v| v as usize)
355            .unwrap_or(500);
356
357        with_json(content, |value| {
358            truncate_strings_recursive(value, max_length);
359            Ok(())
360        })
361    }
362}
363
364// ---------------------------------------------------------------------------
365// Stage 7: collapse_arrays
366// ---------------------------------------------------------------------------
367
368/// For JSON content, if an array has more than `max_items` elements, keep the
369/// first `max_items` and replace the rest with a summary string element.
370/// Config options:
371///   - `max_items` (u32, default 5)
372///   - `summary_template` (string, default "... and {remaining} more items")
373/// Non-JSON content passes through unchanged.
374pub struct CollapseArraysStage;
375
376fn collapse_arrays_recursive(
377    value: &mut serde_json::Value,
378    max_items: usize,
379    summary_template: &str,
380) {
381    match value {
382        serde_json::Value::Array(arr) => {
383            // First recurse into existing items
384            for item in arr.iter_mut() {
385                collapse_arrays_recursive(item, max_items, summary_template);
386            }
387            // Then collapse if needed
388            if arr.len() > max_items {
389                let remaining = arr.len() - max_items;
390                arr.truncate(max_items);
391                let summary = summary_template.replace("{remaining}", &remaining.to_string());
392                arr.push(serde_json::Value::String(summary));
393            }
394        }
395        serde_json::Value::Object(map) => {
396            for v in map.values_mut() {
397                collapse_arrays_recursive(v, max_items, summary_template);
398            }
399        }
400        _ => {}
401    }
402}
403
404impl CompressionStage for CollapseArraysStage {
405    fn name(&self) -> &str {
406        "collapse_arrays"
407    }
408
409    fn priority(&self) -> u32 {
410        70
411    }
412
413    fn process(&self, content: &mut Content, config: &StageConfig) -> Result<()> {
414        if !config.enabled {
415            return Ok(());
416        }
417        let max_items: usize = config
418            .options
419            .get("max_items")
420            .and_then(|v| v.as_u64())
421            .map(|v| v as usize)
422            .unwrap_or(5);
423        let summary_template = config
424            .options
425            .get("summary_template")
426            .and_then(|v| v.as_str())
427            .unwrap_or("... and {remaining} more items")
428            .to_owned();
429
430        with_json(content, |value| {
431            collapse_arrays_recursive(value, max_items, &summary_template);
432            Ok(())
433        })
434    }
435}
436
437// ---------------------------------------------------------------------------
438// Stage 8: custom_transforms
439// ---------------------------------------------------------------------------
440
441/// No-op stage that serves as the insertion point for plugin stages.
442/// Passes content through unchanged.
443pub struct CustomTransformsStage;
444
445impl CompressionStage for CustomTransformsStage {
446    fn name(&self) -> &str {
447        "custom_transforms"
448    }
449
450    fn priority(&self) -> u32 {
451        80
452    }
453
454    fn process(&self, _content: &mut Content, config: &StageConfig) -> Result<()> {
455        if !config.enabled {
456            return Ok(());
457        }
458        // No-op: plugin stages are inserted here by the pipeline
459        Ok(())
460    }
461}
462
463// ---------------------------------------------------------------------------
464// Tests
465// ---------------------------------------------------------------------------
466
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{ContentMetadata, ContentType};
    use serde_json::json;

    // --- fixtures ---

    /// Builds a `Content` of the given type with empty metadata.
    fn content_of(raw: &str, content_type: ContentType) -> Content {
        let metadata = ContentMetadata {
            source: None,
            path: None,
            language: None,
        };
        Content {
            raw: String::from(raw),
            content_type,
            metadata,
            tokens_original: 0,
        }
    }

    fn json_content(raw: &str) -> Content {
        content_of(raw, ContentType::Json)
    }

    fn text_content(raw: &str) -> Content {
        content_of(raw, ContentType::PlainText)
    }

    fn enabled_config(options: serde_json::Value) -> StageConfig {
        StageConfig {
            enabled: true,
            options,
        }
    }

    fn disabled_config() -> StageConfig {
        StageConfig {
            enabled: false,
            options: json!({}),
        }
    }

    /// Runs `stage` on `content`, failing the test if the stage returns an error.
    fn run(stage: &dyn CompressionStage, content: &mut Content, config: &StageConfig) {
        stage.process(content, config).unwrap();
    }

    /// Parses stage output back into JSON so assertions compare structure, not text.
    fn parsed(content: &Content) -> serde_json::Value {
        serde_json::from_str(&content.raw).unwrap()
    }

    /// Asserts that a disabled `stage` leaves `content` byte-for-byte unchanged.
    fn assert_disabled_passthrough(stage: &dyn CompressionStage, mut content: Content) {
        let before = content.raw.clone();
        run(stage, &mut content, &disabled_config());
        assert_eq!(content.raw, before);
    }

    // --- keep_fields ---

    #[test]
    fn keep_fields_retains_specified() {
        let mut c = json_content(r#"{"id":1,"name":"Alice","debug":"x"}"#);
        run(
            &KeepFieldsStage,
            &mut c,
            &enabled_config(json!({"fields": ["id", "name"]})),
        );
        assert_eq!(parsed(&c), json!({"id":1,"name":"Alice"}));
    }

    #[test]
    fn keep_fields_disabled_passthrough() {
        assert_disabled_passthrough(&KeepFieldsStage, json_content(r#"{"id":1,"name":"Alice"}"#));
    }

    #[test]
    fn keep_fields_non_json_passthrough() {
        let raw = "not json at all";
        let mut c = text_content(raw);
        run(&KeepFieldsStage, &mut c, &enabled_config(json!({"fields": ["id"]})));
        assert_eq!(c.raw, raw);
    }

    // --- strip_fields ---

    #[test]
    fn strip_fields_removes_top_level() {
        let mut c = json_content(r#"{"id":1,"debug":"x","name":"Bob"}"#);
        run(&StripFieldsStage, &mut c, &enabled_config(json!({"fields": ["debug"]})));
        assert_eq!(parsed(&c), json!({"id":1,"name":"Bob"}));
    }

    #[test]
    fn strip_fields_dot_notation() {
        let mut c = json_content(r#"{"metadata":{"internal_id":"x","public":"y"},"name":"Bob"}"#);
        run(
            &StripFieldsStage,
            &mut c,
            &enabled_config(json!({"fields": ["metadata.internal_id"]})),
        );
        assert_eq!(parsed(&c), json!({"metadata":{"public":"y"},"name":"Bob"}));
    }

    #[test]
    fn strip_fields_disabled_passthrough() {
        assert_disabled_passthrough(&StripFieldsStage, json_content(r#"{"id":1}"#));
    }

    // --- condense ---

    #[test]
    fn condense_collapses_repeated_lines() {
        let mut c = text_content("a\na\na\na\na\nb\n");
        run(&CondenseStage, &mut c, &enabled_config(json!({"max_repeated_lines": 3})));
        assert_eq!(c.raw, "a\na\na\nb\n");
    }

    #[test]
    fn condense_keeps_up_to_max() {
        let mut c = text_content("x\nx\nx\n");
        run(&CondenseStage, &mut c, &enabled_config(json!({"max_repeated_lines": 3})));
        assert_eq!(c.raw, "x\nx\nx\n");
    }

    #[test]
    fn condense_disabled_passthrough() {
        assert_disabled_passthrough(&CondenseStage, text_content("a\na\na\na\n"));
    }

    #[test]
    fn condense_skips_json() {
        let raw = r#"{"a":1}"#;
        let mut c = json_content(raw);
        run(&CondenseStage, &mut c, &enabled_config(json!({"max_repeated_lines": 1})));
        assert_eq!(c.raw, raw);
    }

    // --- strip_nulls ---

    #[test]
    fn strip_nulls_removes_null_fields() {
        let mut c = json_content(r#"{"a":1,"b":null,"c":"x"}"#);
        run(&StripNullsStage, &mut c, &enabled_config(json!({})));
        assert_eq!(parsed(&c), json!({"a":1,"c":"x"}));
    }

    #[test]
    fn strip_nulls_recursive() {
        let mut c = json_content(r#"{"a":{"b":null,"c":1}}"#);
        run(&StripNullsStage, &mut c, &enabled_config(json!({})));
        assert_eq!(parsed(&c), json!({"a":{"c":1}}));
    }

    #[test]
    fn strip_nulls_keeps_null_in_arrays() {
        let mut c = json_content(r#"{"arr":[1,null,2]}"#);
        run(&StripNullsStage, &mut c, &enabled_config(json!({})));
        assert_eq!(parsed(&c), json!({"arr":[1,null,2]}));
    }

    #[test]
    fn strip_nulls_disabled_passthrough() {
        assert_disabled_passthrough(&StripNullsStage, json_content(r#"{"a":null}"#));
    }

    // --- flatten ---

    #[test]
    fn flatten_nested_object() {
        let mut c = json_content(r#"{"a":{"b":{"c":1}}}"#);
        run(&FlattenStage, &mut c, &enabled_config(json!({"max_depth": 3})));
        assert_eq!(parsed(&c), json!({"a.b.c":1}));
    }

    #[test]
    fn flatten_respects_max_depth() {
        let mut c = json_content(r#"{"a":{"b":{"c":1}}}"#);
        run(&FlattenStage, &mut c, &enabled_config(json!({"max_depth": 1})));
        // With max_depth = 1 the top-level values are never descended into.
        assert_eq!(parsed(&c), json!({"a":{"b":{"c":1}}}));
    }

    #[test]
    fn flatten_disabled_passthrough() {
        assert_disabled_passthrough(&FlattenStage, json_content(r#"{"a":{"b":1}}"#));
    }

    // --- truncate_strings ---

    #[test]
    fn truncate_strings_long_value() {
        let long = "a".repeat(600);
        let mut c = json_content(&format!(r#"{{"key":"{long}"}}"#));
        run(&TruncateStringsStage, &mut c, &enabled_config(json!({"max_length": 500})));
        let out = parsed(&c);
        let s = out["key"].as_str().unwrap();
        assert!(s.ends_with("..."));
        assert_eq!(s.chars().count(), 503); // 500 kept chars + "..."
    }

    #[test]
    fn truncate_strings_short_value_unchanged() {
        let mut c = json_content(r#"{"key":"hello"}"#);
        run(&TruncateStringsStage, &mut c, &enabled_config(json!({"max_length": 500})));
        assert_eq!(parsed(&c)["key"].as_str().unwrap(), "hello");
    }

    #[test]
    fn truncate_strings_disabled_passthrough() {
        let long = "a".repeat(600);
        assert_disabled_passthrough(
            &TruncateStringsStage,
            json_content(&format!(r#"{{"key":"{long}"}}"#)),
        );
    }

    // --- collapse_arrays ---

    #[test]
    fn collapse_arrays_truncates_long_array() {
        let mut c = json_content(r#"{"items":[1,2,3,4,5,6,7]}"#);
        let cfg = enabled_config(json!({
            "max_items": 5,
            "summary_template": "... and {remaining} more items"
        }));
        run(&CollapseArraysStage, &mut c, &cfg);
        let out = parsed(&c);
        let items = out["items"].as_array().unwrap();
        assert_eq!(items.len(), 6); // 5 kept + 1 summary
        assert_eq!(items[5].as_str().unwrap(), "... and 2 more items");
    }

    #[test]
    fn collapse_arrays_short_array_unchanged() {
        let mut c = json_content(r#"{"items":[1,2,3]}"#);
        run(&CollapseArraysStage, &mut c, &enabled_config(json!({"max_items": 5})));
        assert_eq!(parsed(&c)["items"].as_array().unwrap().len(), 3);
    }

    #[test]
    fn collapse_arrays_disabled_passthrough() {
        assert_disabled_passthrough(
            &CollapseArraysStage,
            json_content(r#"{"items":[1,2,3,4,5,6,7]}"#),
        );
    }

    // --- custom_transforms ---

    #[test]
    fn custom_transforms_is_noop() {
        let raw = r#"{"a":1}"#;
        let mut c = json_content(raw);
        run(&CustomTransformsStage, &mut c, &enabled_config(json!({})));
        assert_eq!(c.raw, raw);
    }

    #[test]
    fn custom_transforms_disabled_passthrough() {
        assert_disabled_passthrough(&CustomTransformsStage, text_content("some text"));
    }
}