1use std::io::BufRead;
2
3use crate::cli::OutputFormat;
4use crate::error::PickError;
5use crate::output;
6use crate::selector::{Expression, execute};
7
8pub fn stream_process(
13 reader: impl BufRead,
14 expression: &Expression,
15 as_json: bool,
16 as_lines: bool,
17 output_format: &OutputFormat,
18) -> Result<(), PickError> {
19 for line in reader.lines() {
20 let line = line.map_err(PickError::Io)?;
21 let trimmed = line.trim();
22 if trimmed.is_empty() {
23 continue;
24 }
25
26 let value: serde_json::Value = serde_json::from_str(trimmed)
27 .map_err(|e| PickError::ParseError("json".into(), e.to_string()))?;
28
29 let results = execute(&value, expression)?;
30 if results.is_empty() {
31 continue;
32 }
33
34 let formatted = output::format_output(&results, as_json, as_lines, output_format);
35 if !formatted.is_empty() {
36 println!("{formatted}");
37 }
38 }
39
40 Ok(())
41}
42
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Parses `selector`, streams `input` through [`stream_process`] with the
    /// given output options (`as_lines` is always `false`), and returns the outcome.
    fn run_with(
        input: &str,
        selector: &str,
        as_json: bool,
        output_format: &OutputFormat,
    ) -> Result<(), PickError> {
        let expr = Expression::parse(selector).unwrap();
        stream_process(Cursor::new(input), &expr, as_json, false, output_format)
    }

    /// Shorthand for the common case: plain output with `OutputFormat::Auto`.
    fn run(input: &str, selector: &str) -> Result<(), PickError> {
        run_with(input, selector, false, &OutputFormat::Auto)
    }

    #[test]
    fn stream_single_line() {
        assert!(run("{\"name\": \"Alice\"}\n", "name").is_ok());
    }

    #[test]
    fn stream_multiple_lines() {
        assert!(run("{\"x\": 1}\n{\"x\": 2}\n{\"x\": 3}\n", "x").is_ok());
    }

    #[test]
    fn stream_skips_empty_lines() {
        // Blank lines before, between and after records must not break the stream.
        assert!(run("\n{\"x\": 1}\n\n{\"x\": 2}\n\n", "x").is_ok());
    }

    #[test]
    fn stream_invalid_json_error() {
        // A non-blank line that is not JSON aborts processing with an error.
        assert!(run("not json\n", "x").is_err());
    }

    #[test]
    fn stream_whitespace_only_lines() {
        // Lines made only of spaces/tabs are treated like empty lines.
        assert!(run(" \n \t \n{\"x\": 1}\n \n", "x").is_ok());
    }

    #[test]
    fn stream_with_pipeline() {
        let input = "{\"items\": [{\"name\": \"a\"}, {\"name\": \"b\"}]}\n";
        assert!(run(input, "items[*] | name").is_ok());
    }

    #[test]
    fn stream_with_select() {
        // Despite the name, this drives a plain `price` selector over several records.
        let input = "{\"price\": 50}\n{\"price\": 150}\n{\"price\": 200}\n";
        assert!(run(input, "price").is_ok());
    }

    #[test]
    fn stream_with_select_filter() {
        let input =
            "{\"name\": \"a\", \"active\": true}\n{\"name\": \"b\", \"active\": false}\n";
        assert!(run(input, "select(.active) | name").is_ok());
    }

    #[test]
    fn stream_with_set() {
        let outcome = run(
            "{\"name\": \"Alice\"}\n",
            "set(.greeting, \"hello\") | greeting",
        );
        assert!(outcome.is_ok());
    }

    #[test]
    fn stream_with_del() {
        assert!(run("{\"name\": \"Alice\", \"temp\": \"x\"}\n", "del(.temp)").is_ok());
    }

    #[test]
    fn stream_no_match_lines() {
        // Pins the current contract: selecting a key that no record contains is
        // reported as an error rather than silently producing no output.
        assert!(run("{\"a\": 1}\n{\"a\": 2}\n", "b").is_err());
    }

    #[test]
    fn stream_with_json_output() {
        let outcome = run_with("{\"name\": \"Alice\"}\n", "name", true, &OutputFormat::Auto);
        assert!(outcome.is_ok());
    }

    #[test]
    fn stream_with_yaml_output() {
        // Empty selector; presumably selects the whole record (not verifiable from here).
        let outcome = run_with("{\"name\": \"Alice\"}\n", "", false, &OutputFormat::Yaml);
        assert!(outcome.is_ok());
    }

    #[test]
    fn stream_with_builtin() {
        assert!(run("{\"a\": 1, \"b\": 2}\n{\"x\": 3}\n", "keys()").is_ok());
    }

    #[test]
    fn stream_empty_input() {
        // No lines at all: nothing to process, nothing to fail.
        assert!(run("", "x").is_ok());
    }

    #[test]
    fn stream_only_empty_lines() {
        assert!(run("\n\n\n", "x").is_ok());
    }

    #[test]
    fn stream_with_length() {
        let input = "{\"items\": [1, 2, 3]}\n{\"items\": [4, 5]}\n";
        assert!(run(input, "items | length()").is_ok());
    }
}