1use std::io::Write;
6
7use crate::error::{CliError, Result};
8use crate::models::{Column, Row};
9use crate::output::formatter::Formatter;
/// Default ceiling, in estimated bytes, on rows held in memory for
/// formatters that cannot stream: 10 MiB.
pub const DEFAULT_BUFFER_LIMIT: usize = 10 * 1024 * 1024;
12
/// Outcome of offering a single row to `StreamingWriter::write_row`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteStatus {
    /// The row was accepted (written or buffered); the caller may offer more rows.
    Continue,
    /// The configured row limit had already been reached, so the row was not accepted.
    LimitReached,
}
21
/// Renders rows through a [`Formatter`], writing them straight to `writer`
/// when the formatter supports streaming and buffering them in memory
/// (up to `buffer_limit` estimated bytes) when it does not.
pub struct StreamingWriter<W> {
    /// Destination that the formatter writes into.
    writer: W,
    /// Renders the header, each row, and the footer.
    formatter: Box<dyn Formatter>,
    /// Column metadata handed to the formatter when the header is written.
    columns: Vec<Column>,
    /// Maximum number of rows `write_row` accepts; `None` means no cap.
    limit: Option<usize>,
    /// Ceiling on the estimated size of buffered rows, in bytes.
    buffer_limit: usize,
    /// Rows held back for non-streaming formatters until `finish`.
    buffer: Vec<Row>,
    /// Running size estimate, in bytes, for the buffered rows.
    buffer_bytes: usize,
    /// Number of rows accepted so far (written or buffered).
    written_count: usize,
    /// Whether the header has already been written to `writer`.
    output_started: bool,
    /// Quiet-mode flag; stored and reported by `is_quiet`, but not consulted
    /// by the writer itself in this file.
    quiet: bool,
}
54
55impl<W: Write> StreamingWriter<W> {
56 pub fn new(
65 writer: W,
66 formatter: Box<dyn Formatter>,
67 columns: Vec<Column>,
68 limit: Option<usize>,
69 ) -> Self {
70 Self {
71 writer,
72 formatter,
73 columns,
74 limit,
75 buffer_limit: DEFAULT_BUFFER_LIMIT,
76 buffer: Vec::new(),
77 buffer_bytes: 0,
78 written_count: 0,
79 output_started: false,
80 quiet: false,
81 }
82 }
83
84 #[allow(dead_code)]
86 pub fn with_buffer_limit(mut self, buffer_limit: usize) -> Self {
87 self.buffer_limit = buffer_limit;
88 self
89 }
90
91 pub fn with_quiet(mut self, quiet: bool) -> Self {
93 self.quiet = quiet;
94 self
95 }
96
97 pub fn is_quiet(&self) -> bool {
101 self.quiet
102 }
103
104 pub fn prepare(&mut self, row_count_hint: Option<usize>) -> Result<()> {
113 let _ = row_count_hint;
114
115 if self.formatter.supports_streaming() {
117 self.formatter
118 .write_header(&mut self.writer, &self.columns)?;
119 self.output_started = true;
120 }
121
122 Ok(())
123 }
124
125 pub fn write_row(&mut self, row: Row) -> Result<WriteStatus> {
139 if let Some(limit) = self.limit {
141 if self.written_count >= limit {
142 return Ok(WriteStatus::LimitReached);
143 }
144 }
145
146 if self.formatter.supports_streaming() {
147 self.formatter.write_row(&mut self.writer, &row)?;
149 self.written_count += 1;
150 } else {
151 let row_bytes = estimate_row_bytes(&row);
153 self.buffer_bytes = self.buffer_bytes.saturating_add(row_bytes);
154 if self.buffer_bytes > self.buffer_limit {
155 return Err(CliError::InvalidArgument(
156 "Buffer limit exceeded (~10MB). \
157 Use --output json|csv|tsv or --limit to reduce results."
158 .into(),
159 ));
160 }
161 self.buffer.push(row);
162 self.written_count += 1;
163 }
164
165 Ok(WriteStatus::Continue)
166 }
167
168 pub fn finish(&mut self) -> Result<()> {
170 if !self.output_started {
172 self.formatter
173 .write_header(&mut self.writer, &self.columns)?;
174 self.output_started = true;
175
176 for row in self.buffer.drain(..) {
178 self.formatter.write_row(&mut self.writer, &row)?;
179 }
180 }
181
182 self.formatter.write_footer(&mut self.writer)?;
184
185 Ok(())
186 }
187
188 #[allow(dead_code)]
190 pub fn written_count(&self) -> usize {
191 self.written_count
192 }
193
194 #[allow(dead_code)]
196 pub fn output_started(&self) -> bool {
197 self.output_started
198 }
199
200 #[allow(dead_code)]
202 pub fn buffered_bytes(&self) -> usize {
203 self.buffer_bytes
204 }
205}
206
207fn estimate_row_bytes(row: &Row) -> usize {
208 row.columns
209 .iter()
210 .map(estimate_value_bytes)
211 .sum::<usize>()
212 .saturating_add(row.columns.len() * 8)
213}
214
215fn estimate_value_bytes(value: &crate::models::Value) -> usize {
216 match value {
217 crate::models::Value::Null => 4,
218 crate::models::Value::Bool(_) => 1,
219 crate::models::Value::Int(_) => 8,
220 crate::models::Value::Float(_) => 8,
221 crate::models::Value::Text(text) => text.len(),
222 crate::models::Value::Bytes(bytes) => bytes.len(),
223 crate::models::Value::Vector(values) => values.len() * 4,
224 }
225}
226
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::CliError;
    use crate::models::{DataType, Value};
    use crate::output::csv::CsvFormatter;
    use crate::output::json::JsonFormatter;
    use crate::output::jsonl::JsonlFormatter;
    use crate::output::table::TableFormatter;

    /// Two-column schema (`id`, `name`) shared by every test.
    fn sample_columns() -> Vec<Column> {
        vec![
            Column::new("id", DataType::Int),
            Column::new("name", DataType::Text),
        ]
    }

    /// Builds a row matching [`sample_columns`].
    fn make_row(id: i64, name: &str) -> Row {
        Row::new(vec![Value::Int(id), Value::Text(name.to_string())])
    }

    /// Wires `formatter` to an in-memory sink using the sample schema.
    fn writer_over(
        sink: &mut Vec<u8>,
        formatter: Box<dyn Formatter>,
        limit: Option<usize>,
    ) -> StreamingWriter<&mut Vec<u8>> {
        StreamingWriter::new(sink, formatter, sample_columns(), limit)
    }

    /// Decodes captured output, panicking if it is not valid UTF-8.
    fn into_text(bytes: Vec<u8>) -> String {
        String::from_utf8(bytes).unwrap()
    }

    #[test]
    fn test_streaming_format_immediate_output() {
        let mut output = Vec::new();
        let mut writer = writer_over(&mut output, Box::new(JsonlFormatter::new()), None);

        writer.prepare(None).unwrap();
        assert!(writer.output_started());

        assert_eq!(
            writer.write_row(make_row(1, "Alice")).unwrap(),
            WriteStatus::Continue
        );
        assert_eq!(writer.written_count(), 1);

        writer.finish().unwrap();

        let rendered = into_text(output);
        for needle in ["\"id\":1", "\"name\":\"Alice\""] {
            assert!(rendered.contains(needle));
        }
    }

    #[test]
    fn test_non_streaming_format_buffered_output() {
        let mut output = Vec::new();
        let mut writer = writer_over(&mut output, Box::new(TableFormatter::new()), None);

        // Nothing is emitted by `prepare` for a buffering formatter.
        writer.prepare(None).unwrap();
        assert!(!writer.output_started());

        // Rows are held back as well.
        assert_eq!(
            writer.write_row(make_row(1, "Alice")).unwrap(),
            WriteStatus::Continue
        );
        assert!(!writer.output_started());

        // Only `finish` produces output.
        writer.finish().unwrap();
        assert!(writer.output_started());

        let rendered = into_text(output);
        for needle in ["id", "Alice"] {
            assert!(rendered.contains(needle));
        }
    }

    #[test]
    fn test_limit_enforcement() {
        let mut output = Vec::new();
        let mut writer = writer_over(&mut output, Box::new(CsvFormatter::new()), Some(2));

        writer.prepare(None).unwrap();

        let expectations = [
            (make_row(1, "Alice"), WriteStatus::Continue),
            (make_row(2, "Bob"), WriteStatus::Continue),
            (make_row(3, "Charlie"), WriteStatus::LimitReached),
        ];
        for (row, expected) in expectations {
            assert_eq!(writer.write_row(row).unwrap(), expected);
        }

        assert_eq!(writer.written_count(), 2);

        writer.finish().unwrap();
    }

    #[test]
    fn test_buffer_overflow_errors() {
        let mut output = Vec::new();
        let mut writer =
            writer_over(&mut output, Box::new(TableFormatter::new()), None).with_buffer_limit(40);

        writer.prepare(None).unwrap();
        assert!(!writer.output_started());

        // The first row fits under the 40-byte ceiling; the second does not.
        assert_eq!(
            writer.write_row(make_row(1, "Alice")).unwrap(),
            WriteStatus::Continue
        );
        let err = writer.write_row(make_row(2, "Bob")).unwrap_err();
        assert!(matches!(err, CliError::InvalidArgument(_)));
    }

    #[test]
    fn test_empty_output() {
        let mut output = Vec::new();
        let mut writer = writer_over(&mut output, Box::new(JsonFormatter::new()), None);

        writer.prepare(None).unwrap();
        writer.finish().unwrap();

        let rendered = into_text(output);
        for bracket in ['[', ']'] {
            assert!(rendered.contains(bracket));
        }
    }

    #[test]
    fn test_csv_streaming() {
        let mut output = Vec::new();
        let mut writer = writer_over(&mut output, Box::new(CsvFormatter::new()), None);

        writer.prepare(None).unwrap();
        assert!(writer.output_started());

        for (id, name) in [(1, "Alice"), (2, "Bob")] {
            writer.write_row(make_row(id, name)).unwrap();
        }
        writer.finish().unwrap();

        assert_eq!(into_text(output), "id,name\n1,Alice\n2,Bob\n");
    }

    #[test]
    fn test_written_count() {
        let mut output = Vec::new();
        let mut writer = writer_over(&mut output, Box::new(CsvFormatter::new()), None);

        writer.prepare(None).unwrap();

        let mut expected = 0;
        assert_eq!(writer.written_count(), expected);
        for (id, name) in [(1, "Alice"), (2, "Bob")] {
            writer.write_row(make_row(id, name)).unwrap();
            expected += 1;
            assert_eq!(writer.written_count(), expected);
        }

        writer.finish().unwrap();
    }

    #[test]
    fn test_table_with_small_data() {
        let mut output = Vec::new();
        let mut writer = writer_over(&mut output, Box::new(TableFormatter::new()), None);

        writer.prepare(None).unwrap();
        assert!(!writer.output_started());

        for (id, name) in [(1, "Alice"), (2, "Bob")] {
            writer.write_row(make_row(id, name)).unwrap();
        }
        writer.finish().unwrap();

        let rendered = into_text(output);
        for needle in ["id", "name", "Alice", "Bob"] {
            assert!(rendered.contains(needle));
        }
    }

    #[test]
    fn test_streaming_large_row_count_does_not_buffer() {
        let mut writer = StreamingWriter::new(
            std::io::sink(),
            Box::new(JsonlFormatter::new()),
            sample_columns(),
            None,
        );

        writer.prepare(None).unwrap();
        for id in 0..12_000_i64 {
            writer.write_row(make_row(id, "row")).unwrap();
        }

        assert_eq!(writer.written_count(), 12_000);
        assert!(writer.buffer.is_empty());

        writer.finish().unwrap();
    }
}