1use crate::error::{Result, SqzError};
2use crate::toon::ToonEncoder;
3use crate::types::{Content, ContentType, StageConfig};
4
5pub trait CompressionStage: Send + Sync {
10 fn name(&self) -> &str;
11 fn priority(&self) -> u32;
12 fn process(&self, content: &mut Content, config: &StageConfig) -> Result<()>;
13}
14
15fn with_json<F>(content: &mut Content, f: F) -> Result<()>
20where
21 F: FnOnce(&mut serde_json::Value) -> Result<()>,
22{
23 if !ToonEncoder::is_json(&content.raw) {
24 return Ok(());
25 }
26 let mut value: serde_json::Value = serde_json::from_str(&content.raw)?;
27 f(&mut value)?;
28 content.raw = serde_json::to_string(&value)?;
29 Ok(())
30}
31
32pub struct KeepFieldsStage;
40
41impl CompressionStage for KeepFieldsStage {
42 fn name(&self) -> &str {
43 "keep_fields"
44 }
45
46 fn priority(&self) -> u32 {
47 10
48 }
49
50 fn process(&self, content: &mut Content, config: &StageConfig) -> Result<()> {
51 if !config.enabled {
52 return Ok(());
53 }
54 let fields: Vec<String> = match config.options.get("fields") {
55 Some(v) => serde_json::from_value(v.clone())
56 .map_err(|e| SqzError::Other(format!("keep_fields: invalid fields option: {e}")))?,
57 None => return Ok(()),
58 };
59 if fields.is_empty() {
60 return Ok(());
61 }
62 with_json(content, |value| {
63 if let serde_json::Value::Object(map) = value {
64 map.retain(|k, _| fields.contains(k));
65 }
66 Ok(())
67 })
68 }
69}
70
71pub struct StripFieldsStage;
80
81fn strip_field_path(value: &mut serde_json::Value, path: &[&str]) {
82 if path.is_empty() {
83 return;
84 }
85 if let serde_json::Value::Object(map) = value {
86 if path.len() == 1 {
87 map.remove(path[0]);
88 } else {
89 if let Some(child) = map.get_mut(path[0]) {
90 strip_field_path(child, &path[1..]);
91 }
92 }
93 }
94}
95
96impl CompressionStage for StripFieldsStage {
97 fn name(&self) -> &str {
98 "strip_fields"
99 }
100
101 fn priority(&self) -> u32 {
102 20
103 }
104
105 fn process(&self, content: &mut Content, config: &StageConfig) -> Result<()> {
106 if !config.enabled {
107 return Ok(());
108 }
109 let fields: Vec<String> = match config.options.get("fields") {
110 Some(v) => serde_json::from_value(v.clone())
111 .map_err(|e| SqzError::Other(format!("strip_fields: invalid fields option: {e}")))?,
112 None => return Ok(()),
113 };
114 if fields.is_empty() {
115 return Ok(());
116 }
117 with_json(content, |value| {
118 for field in &fields {
119 let parts: Vec<&str> = field.split('.').collect();
120 strip_field_path(value, &parts);
121 }
122 Ok(())
123 })
124 }
125}
126
127pub struct CondenseStage;
136
137impl CompressionStage for CondenseStage {
138 fn name(&self) -> &str {
139 "condense"
140 }
141
142 fn priority(&self) -> u32 {
143 30
144 }
145
146 fn process(&self, content: &mut Content, config: &StageConfig) -> Result<()> {
147 if !config.enabled {
148 return Ok(());
149 }
150 match &content.content_type {
152 ContentType::PlainText | ContentType::CliOutput { .. } => {}
153 _ => return Ok(()),
154 }
155
156 let max_repeated: u32 = config
157 .options
158 .get("max_repeated_lines")
159 .and_then(|v| v.as_u64())
160 .map(|v| v as u32)
161 .unwrap_or(3);
162
163 let mut result = Vec::new();
164 let mut current_line: Option<&str> = None;
165 let mut run_count: u32 = 0;
166
167 for line in content.raw.lines() {
168 match current_line {
169 Some(prev) if prev == line => {
170 run_count += 1;
171 if run_count <= max_repeated {
172 result.push(line);
173 }
174 }
175 _ => {
176 current_line = Some(line);
177 run_count = 1;
178 result.push(line);
179 }
180 }
181 }
182
183 let trailing_newline = content.raw.ends_with('\n');
185 content.raw = result.join("\n");
186 if trailing_newline {
187 content.raw.push('\n');
188 }
189 Ok(())
190 }
191}
192
193pub struct StripNullsStage;
201
202fn strip_nulls_recursive(value: &mut serde_json::Value) {
203 match value {
204 serde_json::Value::Object(map) => {
205 map.retain(|_, v| !v.is_null());
206 for v in map.values_mut() {
207 strip_nulls_recursive(v);
208 }
209 }
210 serde_json::Value::Array(arr) => {
211 for item in arr.iter_mut() {
212 strip_nulls_recursive(item);
213 }
214 }
215 _ => {}
216 }
217}
218
219impl CompressionStage for StripNullsStage {
220 fn name(&self) -> &str {
221 "strip_nulls"
222 }
223
224 fn priority(&self) -> u32 {
225 40
226 }
227
228 fn process(&self, content: &mut Content, config: &StageConfig) -> Result<()> {
229 if !config.enabled {
230 return Ok(());
231 }
232 with_json(content, |value| {
233 strip_nulls_recursive(value);
234 Ok(())
235 })
236 }
237}
238
239pub struct FlattenStage;
248
249fn flatten_value(
250 value: &serde_json::Value,
251 prefix: &str,
252 depth: u32,
253 max_depth: u32,
254 out: &mut serde_json::Map<String, serde_json::Value>,
255) {
256 if let serde_json::Value::Object(map) = value {
257 if depth < max_depth {
258 for (k, v) in map {
259 let new_key = if prefix.is_empty() {
260 k.clone()
261 } else {
262 format!("{prefix}.{k}")
263 };
264 flatten_value(v, &new_key, depth + 1, max_depth, out);
265 }
266 return;
267 }
268 }
269 out.insert(prefix.to_owned(), value.clone());
270}
271
272impl CompressionStage for FlattenStage {
273 fn name(&self) -> &str {
274 "flatten"
275 }
276
277 fn priority(&self) -> u32 {
278 50
279 }
280
281 fn process(&self, content: &mut Content, config: &StageConfig) -> Result<()> {
282 if !config.enabled {
283 return Ok(());
284 }
285 let max_depth: u32 = config
286 .options
287 .get("max_depth")
288 .and_then(|v| v.as_u64())
289 .map(|v| v as u32)
290 .unwrap_or(3);
291
292 with_json(content, |value| {
293 if let serde_json::Value::Object(map) = value {
294 let mut out = serde_json::Map::new();
295 for (k, v) in map.iter() {
296 flatten_value(v, k, 1, max_depth, &mut out);
297 }
298 *map = out;
299 }
300 Ok(())
301 })
302 }
303}
304
305pub struct TruncateStringsStage;
314
315fn truncate_strings_recursive(value: &mut serde_json::Value, max_length: usize) {
316 match value {
317 serde_json::Value::String(s) => {
318 if s.chars().count() > max_length {
319 let truncated: String = s.chars().take(max_length).collect();
320 *s = format!("{truncated}...");
321 }
322 }
323 serde_json::Value::Object(map) => {
324 for v in map.values_mut() {
325 truncate_strings_recursive(v, max_length);
326 }
327 }
328 serde_json::Value::Array(arr) => {
329 for item in arr.iter_mut() {
330 truncate_strings_recursive(item, max_length);
331 }
332 }
333 _ => {}
334 }
335}
336
337impl CompressionStage for TruncateStringsStage {
338 fn name(&self) -> &str {
339 "truncate_strings"
340 }
341
342 fn priority(&self) -> u32 {
343 60
344 }
345
346 fn process(&self, content: &mut Content, config: &StageConfig) -> Result<()> {
347 if !config.enabled {
348 return Ok(());
349 }
350 let max_length: usize = config
351 .options
352 .get("max_length")
353 .and_then(|v| v.as_u64())
354 .map(|v| v as usize)
355 .unwrap_or(500);
356
357 with_json(content, |value| {
358 truncate_strings_recursive(value, max_length);
359 Ok(())
360 })
361 }
362}
363
364pub struct CollapseArraysStage;
375
376fn collapse_arrays_recursive(
377 value: &mut serde_json::Value,
378 max_items: usize,
379 summary_template: &str,
380) {
381 match value {
382 serde_json::Value::Array(arr) => {
383 for item in arr.iter_mut() {
385 collapse_arrays_recursive(item, max_items, summary_template);
386 }
387 if arr.len() > max_items {
389 let remaining = arr.len() - max_items;
390 arr.truncate(max_items);
391 let summary = summary_template.replace("{remaining}", &remaining.to_string());
392 arr.push(serde_json::Value::String(summary));
393 }
394 }
395 serde_json::Value::Object(map) => {
396 for v in map.values_mut() {
397 collapse_arrays_recursive(v, max_items, summary_template);
398 }
399 }
400 _ => {}
401 }
402}
403
404impl CompressionStage for CollapseArraysStage {
405 fn name(&self) -> &str {
406 "collapse_arrays"
407 }
408
409 fn priority(&self) -> u32 {
410 70
411 }
412
413 fn process(&self, content: &mut Content, config: &StageConfig) -> Result<()> {
414 if !config.enabled {
415 return Ok(());
416 }
417 let max_items: usize = config
418 .options
419 .get("max_items")
420 .and_then(|v| v.as_u64())
421 .map(|v| v as usize)
422 .unwrap_or(5);
423 let summary_template = config
424 .options
425 .get("summary_template")
426 .and_then(|v| v.as_str())
427 .unwrap_or("... and {remaining} more items")
428 .to_owned();
429
430 with_json(content, |value| {
431 collapse_arrays_recursive(value, max_items, &summary_template);
432 Ok(())
433 })
434 }
435}
436
437pub struct CustomTransformsStage;
444
445impl CompressionStage for CustomTransformsStage {
446 fn name(&self) -> &str {
447 "custom_transforms"
448 }
449
450 fn priority(&self) -> u32 {
451 80
452 }
453
454 fn process(&self, _content: &mut Content, config: &StageConfig) -> Result<()> {
455 if !config.enabled {
456 return Ok(());
457 }
458 Ok(())
460 }
461}
462
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{ContentMetadata, ContentType};
    use serde_json::json;

    // ---- fixtures -------------------------------------------------------

    /// Builds a `Content` of the given type with empty metadata.
    fn content_of(raw: &str, content_type: ContentType) -> Content {
        Content {
            raw: raw.to_owned(),
            content_type,
            metadata: ContentMetadata {
                source: None,
                path: None,
                language: None,
            },
            tokens_original: 0,
        }
    }

    fn json_content(raw: &str) -> Content {
        content_of(raw, ContentType::Json)
    }

    fn text_content(raw: &str) -> Content {
        content_of(raw, ContentType::PlainText)
    }

    fn enabled_config(options: serde_json::Value) -> StageConfig {
        StageConfig {
            enabled: true,
            options,
        }
    }

    fn disabled_config() -> StageConfig {
        StageConfig {
            enabled: false,
            options: json!({}),
        }
    }

    /// Runs an enabled `stage` over JSON `raw` and returns the parsed output.
    fn run_json(
        stage: &dyn CompressionStage,
        raw: &str,
        options: serde_json::Value,
    ) -> serde_json::Value {
        let mut content = json_content(raw);
        stage.process(&mut content, &enabled_config(options)).unwrap();
        serde_json::from_str(&content.raw).unwrap()
    }

    /// Runs an enabled `stage` over plain-text `raw` and returns the raw output.
    fn run_text(stage: &dyn CompressionStage, raw: &str, options: serde_json::Value) -> String {
        let mut content = text_content(raw);
        stage.process(&mut content, &enabled_config(options)).unwrap();
        content.raw
    }

    /// Asserts that `stage` leaves `content.raw` byte-for-byte unchanged.
    fn assert_passthrough(stage: &dyn CompressionStage, mut content: Content, cfg: &StageConfig) {
        let expected = content.raw.clone();
        stage.process(&mut content, cfg).unwrap();
        assert_eq!(content.raw, expected);
    }

    // ---- keep_fields ----------------------------------------------------

    #[test]
    fn keep_fields_retains_specified() {
        let out = run_json(
            &KeepFieldsStage,
            r#"{"id":1,"name":"Alice","debug":"x"}"#,
            json!({"fields": ["id", "name"]}),
        );
        assert_eq!(out, json!({"id":1,"name":"Alice"}));
    }

    #[test]
    fn keep_fields_disabled_passthrough() {
        assert_passthrough(
            &KeepFieldsStage,
            json_content(r#"{"id":1,"name":"Alice"}"#),
            &disabled_config(),
        );
    }

    #[test]
    fn keep_fields_non_json_passthrough() {
        assert_passthrough(
            &KeepFieldsStage,
            text_content("not json at all"),
            &enabled_config(json!({"fields": ["id"]})),
        );
    }

    // ---- strip_fields ---------------------------------------------------

    #[test]
    fn strip_fields_removes_top_level() {
        let out = run_json(
            &StripFieldsStage,
            r#"{"id":1,"debug":"x","name":"Bob"}"#,
            json!({"fields": ["debug"]}),
        );
        assert_eq!(out, json!({"id":1,"name":"Bob"}));
    }

    #[test]
    fn strip_fields_dot_notation() {
        let out = run_json(
            &StripFieldsStage,
            r#"{"metadata":{"internal_id":"x","public":"y"},"name":"Bob"}"#,
            json!({"fields": ["metadata.internal_id"]}),
        );
        assert_eq!(out, json!({"metadata":{"public":"y"},"name":"Bob"}));
    }

    #[test]
    fn strip_fields_disabled_passthrough() {
        assert_passthrough(
            &StripFieldsStage,
            json_content(r#"{"id":1}"#),
            &disabled_config(),
        );
    }

    // ---- condense -------------------------------------------------------

    #[test]
    fn condense_collapses_repeated_lines() {
        let out = run_text(
            &CondenseStage,
            "a\na\na\na\na\nb\n",
            json!({"max_repeated_lines": 3}),
        );
        assert_eq!(out, "a\na\na\nb\n");
    }

    #[test]
    fn condense_keeps_up_to_max() {
        let out = run_text(&CondenseStage, "x\nx\nx\n", json!({"max_repeated_lines": 3}));
        assert_eq!(out, "x\nx\nx\n");
    }

    #[test]
    fn condense_disabled_passthrough() {
        assert_passthrough(
            &CondenseStage,
            text_content("a\na\na\na\n"),
            &disabled_config(),
        );
    }

    #[test]
    fn condense_skips_json() {
        assert_passthrough(
            &CondenseStage,
            json_content(r#"{"a":1}"#),
            &enabled_config(json!({"max_repeated_lines": 1})),
        );
    }

    // ---- strip_nulls ----------------------------------------------------

    #[test]
    fn strip_nulls_removes_null_fields() {
        let out = run_json(&StripNullsStage, r#"{"a":1,"b":null,"c":"x"}"#, json!({}));
        assert_eq!(out, json!({"a":1,"c":"x"}));
    }

    #[test]
    fn strip_nulls_recursive() {
        let out = run_json(&StripNullsStage, r#"{"a":{"b":null,"c":1}}"#, json!({}));
        assert_eq!(out, json!({"a":{"c":1}}));
    }

    #[test]
    fn strip_nulls_keeps_null_in_arrays() {
        let out = run_json(&StripNullsStage, r#"{"arr":[1,null,2]}"#, json!({}));
        assert_eq!(out, json!({"arr":[1,null,2]}));
    }

    #[test]
    fn strip_nulls_disabled_passthrough() {
        assert_passthrough(
            &StripNullsStage,
            json_content(r#"{"a":null}"#),
            &disabled_config(),
        );
    }

    // ---- flatten --------------------------------------------------------

    #[test]
    fn flatten_nested_object() {
        let out = run_json(&FlattenStage, r#"{"a":{"b":{"c":1}}}"#, json!({"max_depth": 3}));
        assert_eq!(out, json!({"a.b.c":1}));
    }

    #[test]
    fn flatten_respects_max_depth() {
        let out = run_json(&FlattenStage, r#"{"a":{"b":{"c":1}}}"#, json!({"max_depth": 1}));
        assert_eq!(out, json!({"a":{"b":{"c":1}}}));
    }

    #[test]
    fn flatten_disabled_passthrough() {
        assert_passthrough(
            &FlattenStage,
            json_content(r#"{"a":{"b":1}}"#),
            &disabled_config(),
        );
    }

    // ---- truncate_strings -----------------------------------------------

    #[test]
    fn truncate_strings_long_value() {
        let raw = format!(r#"{{"key":"{}"}}"#, "a".repeat(600));
        let out = run_json(&TruncateStringsStage, &raw, json!({"max_length": 500}));
        let shortened = out["key"].as_str().unwrap();
        assert!(shortened.ends_with("..."));
        // 500 kept characters plus the three-character ellipsis.
        assert_eq!(shortened.chars().count(), 503);
    }

    #[test]
    fn truncate_strings_short_value_unchanged() {
        let out = run_json(
            &TruncateStringsStage,
            r#"{"key":"hello"}"#,
            json!({"max_length": 500}),
        );
        assert_eq!(out["key"].as_str().unwrap(), "hello");
    }

    #[test]
    fn truncate_strings_disabled_passthrough() {
        let raw = format!(r#"{{"key":"{}"}}"#, "a".repeat(600));
        assert_passthrough(&TruncateStringsStage, json_content(&raw), &disabled_config());
    }

    // ---- collapse_arrays ------------------------------------------------

    #[test]
    fn collapse_arrays_truncates_long_array() {
        let out = run_json(
            &CollapseArraysStage,
            r#"{"items":[1,2,3,4,5,6,7]}"#,
            json!({
                "max_items": 5,
                "summary_template": "... and {remaining} more items"
            }),
        );
        let items = out["items"].as_array().unwrap();
        // Five kept elements plus the summary entry.
        assert_eq!(items.len(), 6);
        assert_eq!(items[5].as_str().unwrap(), "... and 2 more items");
    }

    #[test]
    fn collapse_arrays_short_array_unchanged() {
        let out = run_json(
            &CollapseArraysStage,
            r#"{"items":[1,2,3]}"#,
            json!({"max_items": 5}),
        );
        assert_eq!(out["items"].as_array().unwrap().len(), 3);
    }

    #[test]
    fn collapse_arrays_disabled_passthrough() {
        assert_passthrough(
            &CollapseArraysStage,
            json_content(r#"{"items":[1,2,3,4,5,6,7]}"#),
            &disabled_config(),
        );
    }

    // ---- custom_transforms ----------------------------------------------

    #[test]
    fn custom_transforms_is_noop() {
        assert_passthrough(
            &CustomTransformsStage,
            json_content(r#"{"a":1}"#),
            &enabled_config(json!({})),
        );
    }

    #[test]
    fn custom_transforms_disabled_passthrough() {
        assert_passthrough(
            &CustomTransformsStage,
            text_content("some text"),
            &disabled_config(),
        );
    }
}