Skip to main content

pick/
streaming.rs

1use std::io::BufRead;
2
3use crate::cli::OutputFormat;
4use crate::error::PickError;
5use crate::output;
6use crate::selector::{Expression, execute};
7
8/// Process input as a stream of JSON values (one per line / JSONL).
9/// Applies the expression to each value and writes results immediately.
10///
11/// Time: O(n) total where n = input size. Memory: O(1) per line.
12pub fn stream_process(
13    reader: impl BufRead,
14    expression: &Expression,
15    as_json: bool,
16    as_lines: bool,
17    output_format: &OutputFormat,
18) -> Result<(), PickError> {
19    for line in reader.lines() {
20        let line = line.map_err(PickError::Io)?;
21        let trimmed = line.trim();
22        if trimmed.is_empty() {
23            continue;
24        }
25
26        let value: serde_json::Value = serde_json::from_str(trimmed)
27            .map_err(|e| PickError::ParseError("json".into(), e.to_string()))?;
28
29        let results = execute(&value, expression)?;
30        if results.is_empty() {
31            continue;
32        }
33
34        let formatted = output::format_output(&results, as_json, as_lines, output_format);
35        if !formatted.is_empty() {
36            println!("{formatted}");
37        }
38    }
39
40    Ok(())
41}
42
43#[cfg(test)]
44mod tests {
45    use super::*;
46    use std::io::Cursor;
47
48    #[test]
49    fn stream_single_line() {
50        let input = Cursor::new("{\"name\": \"Alice\"}\n");
51        let expr = Expression::parse("name").unwrap();
52        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
53        assert!(result.is_ok());
54    }
55
56    #[test]
57    fn stream_multiple_lines() {
58        let input = Cursor::new("{\"x\": 1}\n{\"x\": 2}\n{\"x\": 3}\n");
59        let expr = Expression::parse("x").unwrap();
60        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
61        assert!(result.is_ok());
62    }
63
64    #[test]
65    fn stream_skips_empty_lines() {
66        let input = Cursor::new("\n{\"x\": 1}\n\n{\"x\": 2}\n\n");
67        let expr = Expression::parse("x").unwrap();
68        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
69        assert!(result.is_ok());
70    }
71
72    #[test]
73    fn stream_invalid_json_error() {
74        let input = Cursor::new("not json\n");
75        let expr = Expression::parse("x").unwrap();
76        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
77        assert!(result.is_err());
78    }
79
80    // ══════════════════════════════════════════════
81    // Additional coverage tests
82    // ══════════════════════════════════════════════
83
84    #[test]
85    fn stream_whitespace_only_lines() {
86        let input = Cursor::new("   \n  \t  \n{\"x\": 1}\n   \n");
87        let expr = Expression::parse("x").unwrap();
88        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
89        assert!(result.is_ok());
90    }
91
92    #[test]
93    fn stream_with_pipeline() {
94        let input = Cursor::new("{\"items\": [{\"name\": \"a\"}, {\"name\": \"b\"}]}\n");
95        let expr = Expression::parse("items[*] | name").unwrap();
96        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
97        assert!(result.is_ok());
98    }
99
100    #[test]
101    fn stream_with_select() {
102        let input = Cursor::new("{\"price\": 50}\n{\"price\": 150}\n{\"price\": 200}\n");
103        let expr = Expression::parse("price").unwrap();
104        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
105        assert!(result.is_ok());
106    }
107
108    #[test]
109    fn stream_with_select_filter() {
110        let input = Cursor::new(
111            "{\"name\": \"a\", \"active\": true}\n{\"name\": \"b\", \"active\": false}\n",
112        );
113        let expr = Expression::parse("select(.active) | name").unwrap();
114        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
115        assert!(result.is_ok());
116    }
117
118    #[test]
119    fn stream_with_set() {
120        let input = Cursor::new("{\"name\": \"Alice\"}\n");
121        let expr = Expression::parse("set(.greeting, \"hello\") | greeting").unwrap();
122        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
123        assert!(result.is_ok());
124    }
125
126    #[test]
127    fn stream_with_del() {
128        let input = Cursor::new("{\"name\": \"Alice\", \"temp\": \"x\"}\n");
129        let expr = Expression::parse("del(.temp)").unwrap();
130        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
131        assert!(result.is_ok());
132    }
133
134    #[test]
135    fn stream_no_match_lines() {
136        // Key not found → should still be Ok (empty results are skipped)
137        let input = Cursor::new("{\"a\": 1}\n{\"a\": 2}\n");
138        let expr = Expression::parse("b").unwrap();
139        // extract will error per line but stream_process propagates errors
140        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
141        // This will error because 'b' is not found
142        assert!(result.is_err());
143    }
144
145    #[test]
146    fn stream_with_json_output() {
147        let input = Cursor::new("{\"name\": \"Alice\"}\n");
148        let expr = Expression::parse("name").unwrap();
149        let result = stream_process(input, &expr, true, false, &OutputFormat::Auto);
150        assert!(result.is_ok());
151    }
152
153    #[test]
154    fn stream_with_yaml_output() {
155        let input = Cursor::new("{\"name\": \"Alice\"}\n");
156        let expr = Expression::parse("").unwrap();
157        let result = stream_process(input, &expr, false, false, &OutputFormat::Yaml);
158        assert!(result.is_ok());
159    }
160
161    #[test]
162    fn stream_with_builtin() {
163        let input = Cursor::new("{\"a\": 1, \"b\": 2}\n{\"x\": 3}\n");
164        let expr = Expression::parse("keys()").unwrap();
165        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
166        assert!(result.is_ok());
167    }
168
169    #[test]
170    fn stream_empty_input() {
171        let input = Cursor::new("");
172        let expr = Expression::parse("x").unwrap();
173        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
174        assert!(result.is_ok());
175    }
176
177    #[test]
178    fn stream_only_empty_lines() {
179        let input = Cursor::new("\n\n\n");
180        let expr = Expression::parse("x").unwrap();
181        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
182        assert!(result.is_ok());
183    }
184
185    #[test]
186    fn stream_with_length() {
187        let input = Cursor::new("{\"items\": [1, 2, 3]}\n{\"items\": [4, 5]}\n");
188        let expr = Expression::parse("items | length()").unwrap();
189        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
190        assert!(result.is_ok());
191    }
192}