Skip to main content

pick/
streaming.rs

1use std::io::BufRead;
2
3use crate::cli::OutputFormat;
4use crate::error::PickError;
5use crate::output;
6use crate::selector::{Expression, execute};
7
8/// Process input as a stream of JSON values (one per line / JSONL).
9/// Applies the expression to each value and writes results immediately.
10///
11/// Time: O(n) total where n = input size. Memory: O(1) per line.
12pub fn stream_process(
13    reader: impl BufRead,
14    expression: &Expression,
15    as_json: bool,
16    as_lines: bool,
17    output_format: &OutputFormat,
18) -> Result<(), PickError> {
19    for line in reader.lines() {
20        let line = line.map_err(PickError::Io)?;
21        let trimmed = line.trim();
22        if trimmed.is_empty() {
23            continue;
24        }
25
26        let value: serde_json::Value = serde_json::from_str(trimmed).map_err(|e| {
27            PickError::ParseError("json".into(), e.to_string())
28        })?;
29
30        let results = execute(&value, expression)?;
31        if results.is_empty() {
32            continue;
33        }
34
35        let formatted = output::format_output(&results, as_json, as_lines, output_format);
36        if !formatted.is_empty() {
37            println!("{formatted}");
38        }
39    }
40
41    Ok(())
42}
43
44#[cfg(test)]
45mod tests {
46    use super::*;
47    use std::io::Cursor;
48
49    #[test]
50    fn stream_single_line() {
51        let input = Cursor::new("{\"name\": \"Alice\"}\n");
52        let expr = Expression::parse("name").unwrap();
53        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
54        assert!(result.is_ok());
55    }
56
57    #[test]
58    fn stream_multiple_lines() {
59        let input = Cursor::new("{\"x\": 1}\n{\"x\": 2}\n{\"x\": 3}\n");
60        let expr = Expression::parse("x").unwrap();
61        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
62        assert!(result.is_ok());
63    }
64
65    #[test]
66    fn stream_skips_empty_lines() {
67        let input = Cursor::new("\n{\"x\": 1}\n\n{\"x\": 2}\n\n");
68        let expr = Expression::parse("x").unwrap();
69        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
70        assert!(result.is_ok());
71    }
72
73    #[test]
74    fn stream_invalid_json_error() {
75        let input = Cursor::new("not json\n");
76        let expr = Expression::parse("x").unwrap();
77        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
78        assert!(result.is_err());
79    }
80
81    // ══════════════════════════════════════════════
82    // Additional coverage tests
83    // ══════════════════════════════════════════════
84
85    #[test]
86    fn stream_whitespace_only_lines() {
87        let input = Cursor::new("   \n  \t  \n{\"x\": 1}\n   \n");
88        let expr = Expression::parse("x").unwrap();
89        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
90        assert!(result.is_ok());
91    }
92
93    #[test]
94    fn stream_with_pipeline() {
95        let input = Cursor::new("{\"items\": [{\"name\": \"a\"}, {\"name\": \"b\"}]}\n");
96        let expr = Expression::parse("items[*] | name").unwrap();
97        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
98        assert!(result.is_ok());
99    }
100
101    #[test]
102    fn stream_with_select() {
103        let input = Cursor::new(
104            "{\"price\": 50}\n{\"price\": 150}\n{\"price\": 200}\n",
105        );
106        let expr = Expression::parse("price").unwrap();
107        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
108        assert!(result.is_ok());
109    }
110
111    #[test]
112    fn stream_with_select_filter() {
113        let input = Cursor::new(
114            "{\"name\": \"a\", \"active\": true}\n{\"name\": \"b\", \"active\": false}\n",
115        );
116        let expr = Expression::parse("select(.active) | name").unwrap();
117        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
118        assert!(result.is_ok());
119    }
120
121    #[test]
122    fn stream_with_set() {
123        let input = Cursor::new("{\"name\": \"Alice\"}\n");
124        let expr = Expression::parse("set(.greeting, \"hello\") | greeting").unwrap();
125        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
126        assert!(result.is_ok());
127    }
128
129    #[test]
130    fn stream_with_del() {
131        let input = Cursor::new("{\"name\": \"Alice\", \"temp\": \"x\"}\n");
132        let expr = Expression::parse("del(.temp)").unwrap();
133        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
134        assert!(result.is_ok());
135    }
136
137    #[test]
138    fn stream_no_match_lines() {
139        // Key not found → should still be Ok (empty results are skipped)
140        let input = Cursor::new("{\"a\": 1}\n{\"a\": 2}\n");
141        let expr = Expression::parse("b").unwrap();
142        // extract will error per line but stream_process propagates errors
143        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
144        // This will error because 'b' is not found
145        assert!(result.is_err());
146    }
147
148    #[test]
149    fn stream_with_json_output() {
150        let input = Cursor::new("{\"name\": \"Alice\"}\n");
151        let expr = Expression::parse("name").unwrap();
152        let result = stream_process(input, &expr, true, false, &OutputFormat::Auto);
153        assert!(result.is_ok());
154    }
155
156    #[test]
157    fn stream_with_yaml_output() {
158        let input = Cursor::new("{\"name\": \"Alice\"}\n");
159        let expr = Expression::parse("").unwrap();
160        let result = stream_process(input, &expr, false, false, &OutputFormat::Yaml);
161        assert!(result.is_ok());
162    }
163
164    #[test]
165    fn stream_with_builtin() {
166        let input = Cursor::new("{\"a\": 1, \"b\": 2}\n{\"x\": 3}\n");
167        let expr = Expression::parse("keys()").unwrap();
168        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
169        assert!(result.is_ok());
170    }
171
172    #[test]
173    fn stream_empty_input() {
174        let input = Cursor::new("");
175        let expr = Expression::parse("x").unwrap();
176        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
177        assert!(result.is_ok());
178    }
179
180    #[test]
181    fn stream_only_empty_lines() {
182        let input = Cursor::new("\n\n\n");
183        let expr = Expression::parse("x").unwrap();
184        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
185        assert!(result.is_ok());
186    }
187
188    #[test]
189    fn stream_with_length() {
190        let input = Cursor::new("{\"items\": [1, 2, 3]}\n{\"items\": [4, 5]}\n");
191        let expr = Expression::parse("items | length()").unwrap();
192        let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
193        assert!(result.is_ok());
194    }
195}