1use std::io::BufRead;
2
3use crate::cli::OutputFormat;
4use crate::error::PickError;
5use crate::output;
6use crate::selector::{Expression, execute};
7
8pub fn stream_process(
13 reader: impl BufRead,
14 expression: &Expression,
15 as_json: bool,
16 as_lines: bool,
17 output_format: &OutputFormat,
18) -> Result<(), PickError> {
19 for line in reader.lines() {
20 let line = line.map_err(PickError::Io)?;
21 let trimmed = line.trim();
22 if trimmed.is_empty() {
23 continue;
24 }
25
26 let value: serde_json::Value = serde_json::from_str(trimmed).map_err(|e| {
27 PickError::ParseError("json".into(), e.to_string())
28 })?;
29
30 let results = execute(&value, expression)?;
31 if results.is_empty() {
32 continue;
33 }
34
35 let formatted = output::format_output(&results, as_json, as_lines, output_format);
36 if !formatted.is_empty() {
37 println!("{formatted}");
38 }
39 }
40
41 Ok(())
42}
43
44#[cfg(test)]
45mod tests {
46 use super::*;
47 use std::io::Cursor;
48
49 #[test]
50 fn stream_single_line() {
51 let input = Cursor::new("{\"name\": \"Alice\"}\n");
52 let expr = Expression::parse("name").unwrap();
53 let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
54 assert!(result.is_ok());
55 }
56
57 #[test]
58 fn stream_multiple_lines() {
59 let input = Cursor::new("{\"x\": 1}\n{\"x\": 2}\n{\"x\": 3}\n");
60 let expr = Expression::parse("x").unwrap();
61 let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
62 assert!(result.is_ok());
63 }
64
65 #[test]
66 fn stream_skips_empty_lines() {
67 let input = Cursor::new("\n{\"x\": 1}\n\n{\"x\": 2}\n\n");
68 let expr = Expression::parse("x").unwrap();
69 let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
70 assert!(result.is_ok());
71 }
72
73 #[test]
74 fn stream_invalid_json_error() {
75 let input = Cursor::new("not json\n");
76 let expr = Expression::parse("x").unwrap();
77 let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
78 assert!(result.is_err());
79 }
80
81 #[test]
86 fn stream_whitespace_only_lines() {
87 let input = Cursor::new(" \n \t \n{\"x\": 1}\n \n");
88 let expr = Expression::parse("x").unwrap();
89 let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
90 assert!(result.is_ok());
91 }
92
93 #[test]
94 fn stream_with_pipeline() {
95 let input = Cursor::new("{\"items\": [{\"name\": \"a\"}, {\"name\": \"b\"}]}\n");
96 let expr = Expression::parse("items[*] | name").unwrap();
97 let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
98 assert!(result.is_ok());
99 }
100
101 #[test]
102 fn stream_with_select() {
103 let input = Cursor::new(
104 "{\"price\": 50}\n{\"price\": 150}\n{\"price\": 200}\n",
105 );
106 let expr = Expression::parse("price").unwrap();
107 let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
108 assert!(result.is_ok());
109 }
110
111 #[test]
112 fn stream_with_select_filter() {
113 let input = Cursor::new(
114 "{\"name\": \"a\", \"active\": true}\n{\"name\": \"b\", \"active\": false}\n",
115 );
116 let expr = Expression::parse("select(.active) | name").unwrap();
117 let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
118 assert!(result.is_ok());
119 }
120
121 #[test]
122 fn stream_with_set() {
123 let input = Cursor::new("{\"name\": \"Alice\"}\n");
124 let expr = Expression::parse("set(.greeting, \"hello\") | greeting").unwrap();
125 let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
126 assert!(result.is_ok());
127 }
128
129 #[test]
130 fn stream_with_del() {
131 let input = Cursor::new("{\"name\": \"Alice\", \"temp\": \"x\"}\n");
132 let expr = Expression::parse("del(.temp)").unwrap();
133 let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
134 assert!(result.is_ok());
135 }
136
137 #[test]
138 fn stream_no_match_lines() {
139 let input = Cursor::new("{\"a\": 1}\n{\"a\": 2}\n");
141 let expr = Expression::parse("b").unwrap();
142 let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
144 assert!(result.is_err());
146 }
147
148 #[test]
149 fn stream_with_json_output() {
150 let input = Cursor::new("{\"name\": \"Alice\"}\n");
151 let expr = Expression::parse("name").unwrap();
152 let result = stream_process(input, &expr, true, false, &OutputFormat::Auto);
153 assert!(result.is_ok());
154 }
155
156 #[test]
157 fn stream_with_yaml_output() {
158 let input = Cursor::new("{\"name\": \"Alice\"}\n");
159 let expr = Expression::parse("").unwrap();
160 let result = stream_process(input, &expr, false, false, &OutputFormat::Yaml);
161 assert!(result.is_ok());
162 }
163
164 #[test]
165 fn stream_with_builtin() {
166 let input = Cursor::new("{\"a\": 1, \"b\": 2}\n{\"x\": 3}\n");
167 let expr = Expression::parse("keys()").unwrap();
168 let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
169 assert!(result.is_ok());
170 }
171
172 #[test]
173 fn stream_empty_input() {
174 let input = Cursor::new("");
175 let expr = Expression::parse("x").unwrap();
176 let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
177 assert!(result.is_ok());
178 }
179
180 #[test]
181 fn stream_only_empty_lines() {
182 let input = Cursor::new("\n\n\n");
183 let expr = Expression::parse("x").unwrap();
184 let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
185 assert!(result.is_ok());
186 }
187
188 #[test]
189 fn stream_with_length() {
190 let input = Cursor::new("{\"items\": [1, 2, 3]}\n{\"items\": [4, 5]}\n");
191 let expr = Expression::parse("items | length()").unwrap();
192 let result = stream_process(input, &expr, false, false, &OutputFormat::Auto);
193 assert!(result.is_ok());
194 }
195}