alopex_cli/streaming/
writer.rs

1//! StreamingWriter - Streaming output controller
2//!
3//! Manages streaming output with buffer limits for non-streaming formats.
4
5use std::io::Write;
6
7use crate::error::{CliError, Result};
8use crate::models::{Column, Row};
9use crate::output::formatter::Formatter;
/// Default cap, in approximate bytes, on rows buffered for non-streaming formats
/// (table): 10 MiB.
pub const DEFAULT_BUFFER_LIMIT: usize = 10 << 20;
12
/// Outcome reported by a successful row write.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WriteStatus {
    /// The row was accepted; the caller may keep sending rows.
    Continue,
    /// The row limit had already been reached; the row was not written and
    /// no further rows will be accepted.
    LimitReached,
}
21
22/// Streaming writer for output.
23///
24/// Controls streaming output with the following behaviors:
25/// - **Streaming formats** (json, jsonl, csv, tsv): Output rows immediately.
26/// - **Non-streaming formats** (table): Buffer rows up to `buffer_limit`.
27///
28/// # Output Boundary
29///
30/// - **Output started**: When `output_started == true` (header has been written).
31/// - **Buffer overflow**: Returns an error prompting `--output json|csv|tsv` or `--limit`.
32pub struct StreamingWriter<W> {
33    /// The underlying writer.
34    writer: W,
35    /// The formatter to use for output.
36    formatter: Box<dyn Formatter>,
37    /// Column definitions (schema).
38    columns: Vec<Column>,
39    /// Optional row limit.
40    limit: Option<usize>,
41    /// Buffer limit for non-streaming formats.
42    buffer_limit: usize,
43    /// Buffer for non-streaming formats (table).
44    buffer: Vec<Row>,
45    /// Approximate buffered size in bytes.
46    buffer_bytes: usize,
47    /// Number of rows written (for limit checking).
48    written_count: usize,
49    /// Whether header has been output (output started).
50    output_started: bool,
51    /// Whether quiet mode is enabled (suppress warnings).
52    quiet: bool,
53}
54
55impl<W: Write> StreamingWriter<W> {
56    /// Create a new StreamingWriter.
57    ///
58    /// # Arguments
59    ///
60    /// * `writer` - The output writer (e.g., stdout).
61    /// * `formatter` - The formatter to use for output.
62    /// * `columns` - Column definitions for the output.
63    /// * `limit` - Optional row limit.
64    pub fn new(
65        writer: W,
66        formatter: Box<dyn Formatter>,
67        columns: Vec<Column>,
68        limit: Option<usize>,
69    ) -> Self {
70        Self {
71            writer,
72            formatter,
73            columns,
74            limit,
75            buffer_limit: DEFAULT_BUFFER_LIMIT,
76            buffer: Vec::new(),
77            buffer_bytes: 0,
78            written_count: 0,
79            output_started: false,
80            quiet: false,
81        }
82    }
83
84    /// Create a new StreamingWriter with a custom buffer limit.
85    #[allow(dead_code)]
86    pub fn with_buffer_limit(mut self, buffer_limit: usize) -> Self {
87        self.buffer_limit = buffer_limit;
88        self
89    }
90
91    /// Enable quiet mode (suppress warnings).
92    pub fn with_quiet(mut self, quiet: bool) -> Self {
93        self.quiet = quiet;
94        self
95    }
96
97    /// Check if quiet mode is enabled.
98    ///
99    /// When quiet mode is enabled, status-only output (OK messages) should be suppressed.
100    pub fn is_quiet(&self) -> bool {
101        self.quiet
102    }
103
104    /// Prepare the writer for output.
105    ///
106    /// For streaming formats (json, jsonl, csv, tsv), immediately outputs the header.
107    /// For non-streaming formats (table), defers header output.
108    ///
109    /// # Arguments
110    ///
111    /// * `row_count_hint` - Optional estimated row count (unused for buffer sizing).
112    pub fn prepare(&mut self, row_count_hint: Option<usize>) -> Result<()> {
113        let _ = row_count_hint;
114
115        // For streaming formats, output header immediately
116        if self.formatter.supports_streaming() {
117            self.formatter
118                .write_header(&mut self.writer, &self.columns)?;
119            self.output_started = true;
120        }
121
122        Ok(())
123    }
124
125    /// Write a row to the output.
126    ///
127    /// For streaming formats, the row is output immediately.
128    /// For non-streaming formats, the row is buffered.
129    ///
130    /// # Returns
131    ///
132    /// * `WriteStatus::Continue` - Row was written, continue writing.
133    /// * `WriteStatus::LimitReached` - Row limit reached, stop writing.
134    ///
135    /// # Note
136    ///
137    /// The row is taken by ownership to avoid unnecessary cloning when buffering.
138    pub fn write_row(&mut self, row: Row) -> Result<WriteStatus> {
139        // Check limit
140        if let Some(limit) = self.limit {
141            if self.written_count >= limit {
142                return Ok(WriteStatus::LimitReached);
143            }
144        }
145
146        if self.formatter.supports_streaming() {
147            // Streaming format: output immediately
148            self.formatter.write_row(&mut self.writer, &row)?;
149            self.written_count += 1;
150        } else {
151            // Non-streaming format: buffer the row
152            let row_bytes = estimate_row_bytes(&row);
153            self.buffer_bytes = self.buffer_bytes.saturating_add(row_bytes);
154            if self.buffer_bytes > self.buffer_limit {
155                return Err(CliError::InvalidArgument(
156                    "Buffer limit exceeded (~10MB). \
157                     Use --output json|csv|tsv or --limit to reduce results."
158                        .into(),
159                ));
160            }
161            self.buffer.push(row);
162            self.written_count += 1;
163        }
164
165        Ok(WriteStatus::Continue)
166    }
167
168    /// Finish output, flushing any buffered rows and writing the footer.
169    pub fn finish(&mut self) -> Result<()> {
170        // For non-streaming formats, output header if not yet done
171        if !self.output_started {
172            self.formatter
173                .write_header(&mut self.writer, &self.columns)?;
174            self.output_started = true;
175
176            // Flush any buffered rows
177            for row in self.buffer.drain(..) {
178                self.formatter.write_row(&mut self.writer, &row)?;
179            }
180        }
181
182        // Write footer
183        self.formatter.write_footer(&mut self.writer)?;
184
185        Ok(())
186    }
187
188    /// Returns the number of rows written.
189    #[allow(dead_code)]
190    pub fn written_count(&self) -> usize {
191        self.written_count
192    }
193
194    /// Returns whether output has started.
195    #[allow(dead_code)]
196    pub fn output_started(&self) -> bool {
197        self.output_started
198    }
199
200    /// Returns approximate buffered bytes.
201    #[allow(dead_code)]
202    pub fn buffered_bytes(&self) -> usize {
203        self.buffer_bytes
204    }
205}
206
207fn estimate_row_bytes(row: &Row) -> usize {
208    row.columns
209        .iter()
210        .map(estimate_value_bytes)
211        .sum::<usize>()
212        .saturating_add(row.columns.len() * 8)
213}
214
215fn estimate_value_bytes(value: &crate::models::Value) -> usize {
216    match value {
217        crate::models::Value::Null => 4,
218        crate::models::Value::Bool(_) => 1,
219        crate::models::Value::Int(_) => 8,
220        crate::models::Value::Float(_) => 8,
221        crate::models::Value::Text(text) => text.len(),
222        crate::models::Value::Bytes(bytes) => bytes.len(),
223        crate::models::Value::Vector(values) => values.len() * 4,
224    }
225}
226
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::CliError;
    use crate::models::{DataType, Value};
    use crate::output::csv::CsvFormatter;
    use crate::output::json::JsonFormatter;
    use crate::output::jsonl::JsonlFormatter;
    use crate::output::table::TableFormatter;

    /// Schema shared by every test: an integer `id` and a text `name`.
    fn test_columns() -> Vec<Column> {
        let id = Column::new("id", DataType::Int);
        let name = Column::new("name", DataType::Text);
        vec![id, name]
    }

    /// Build a row that matches `test_columns`.
    fn test_row(id: i64, name: &str) -> Row {
        let values = vec![Value::Int(id), Value::Text(name.to_string())];
        Row::new(values)
    }

    /// JSONL is a streaming format: output starts in `prepare` and rows are emitted at once.
    #[test]
    fn test_streaming_format_immediate_output() {
        let mut sink: Vec<u8> = Vec::new();
        let mut sw = StreamingWriter::new(
            &mut sink,
            Box::new(JsonlFormatter::new()),
            test_columns(),
            None,
        );

        sw.prepare(None).unwrap();
        assert!(sw.output_started());

        let outcome = sw.write_row(test_row(1, "Alice")).unwrap();
        assert_eq!(outcome, WriteStatus::Continue);
        assert_eq!(sw.written_count(), 1);

        sw.finish().unwrap();

        let text = String::from_utf8(sink).unwrap();
        assert!(text.contains("\"id\":1"));
        assert!(text.contains("\"name\":\"Alice\""));
    }

    /// The table format buffers rows; nothing is output until `finish`.
    #[test]
    fn test_non_streaming_format_buffered_output() {
        let mut sink: Vec<u8> = Vec::new();
        let mut sw = StreamingWriter::new(
            &mut sink,
            Box::new(TableFormatter::new()),
            test_columns(),
            None,
        );

        sw.prepare(None).unwrap();
        // Header not output yet
        assert!(!sw.output_started());

        let outcome = sw.write_row(test_row(1, "Alice")).unwrap();
        assert_eq!(outcome, WriteStatus::Continue);
        // Still buffering
        assert!(!sw.output_started());

        sw.finish().unwrap();
        // Now output started
        assert!(sw.output_started());

        let text = String::from_utf8(sink).unwrap();
        assert!(text.contains("id"));
        assert!(text.contains("Alice"));
    }

    /// With a limit of 2, the third row is refused and not counted.
    #[test]
    fn test_limit_enforcement() {
        let mut sink: Vec<u8> = Vec::new();
        let mut sw = StreamingWriter::new(
            &mut sink,
            Box::new(CsvFormatter::new()),
            test_columns(),
            Some(2),
        );

        sw.prepare(None).unwrap();

        let first = sw.write_row(test_row(1, "Alice")).unwrap();
        assert_eq!(first, WriteStatus::Continue);

        let second = sw.write_row(test_row(2, "Bob")).unwrap();
        assert_eq!(second, WriteStatus::Continue);

        let third = sw.write_row(test_row(3, "Charlie")).unwrap();
        assert_eq!(third, WriteStatus::LimitReached);

        assert_eq!(sw.written_count(), 2);

        sw.finish().unwrap();
    }

    /// Exceeding the buffer limit in a non-streaming format is an `InvalidArgument` error.
    #[test]
    fn test_buffer_overflow_errors() {
        let mut sink: Vec<u8> = Vec::new();
        // Very small buffer
        let mut sw = StreamingWriter::new(
            &mut sink,
            Box::new(TableFormatter::new()),
            test_columns(),
            None,
        )
        .with_buffer_limit(40);

        sw.prepare(None).unwrap();
        assert!(!sw.output_started());

        // The first row fits in the buffer; the second one does not.
        let first = sw.write_row(test_row(1, "Alice")).unwrap();
        assert_eq!(first, WriteStatus::Continue);

        let failure = sw.write_row(test_row(2, "Bob")).unwrap_err();
        assert!(matches!(failure, CliError::InvalidArgument(_)));
    }

    /// With no rows, the JSON formatter still emits array brackets.
    #[test]
    fn test_empty_output() {
        let mut sink: Vec<u8> = Vec::new();
        let mut sw = StreamingWriter::new(
            &mut sink,
            Box::new(JsonFormatter::new()),
            test_columns(),
            None,
        );

        sw.prepare(None).unwrap();
        sw.finish().unwrap();

        // JSON array format: should be valid empty array
        let text = String::from_utf8(sink).unwrap();
        assert!(text.contains('['));
        assert!(text.contains(']'));
    }

    /// CSV streams: header on `prepare`, then one line per row.
    #[test]
    fn test_csv_streaming() {
        let mut sink: Vec<u8> = Vec::new();
        let mut sw = StreamingWriter::new(
            &mut sink,
            Box::new(CsvFormatter::new()),
            test_columns(),
            None,
        );

        sw.prepare(None).unwrap();
        // CSV is streaming
        assert!(sw.output_started());

        sw.write_row(test_row(1, "Alice")).unwrap();
        sw.write_row(test_row(2, "Bob")).unwrap();
        sw.finish().unwrap();

        let text = String::from_utf8(sink).unwrap();
        assert_eq!(text, "id,name\n1,Alice\n2,Bob\n");
    }

    /// `written_count` goes up by one for each accepted row.
    #[test]
    fn test_written_count() {
        let mut sink: Vec<u8> = Vec::new();
        let mut sw = StreamingWriter::new(
            &mut sink,
            Box::new(CsvFormatter::new()),
            test_columns(),
            None,
        );

        sw.prepare(None).unwrap();
        assert_eq!(sw.written_count(), 0);

        sw.write_row(test_row(1, "Alice")).unwrap();
        assert_eq!(sw.written_count(), 1);

        sw.write_row(test_row(2, "Bob")).unwrap();
        assert_eq!(sw.written_count(), 2);

        sw.finish().unwrap();
    }

    /// A small table result is rendered in full on `finish`.
    #[test]
    fn test_table_with_small_data() {
        let mut sink: Vec<u8> = Vec::new();
        let mut sw = StreamingWriter::new(
            &mut sink,
            Box::new(TableFormatter::new()),
            test_columns(),
            None,
        );

        sw.prepare(None).unwrap();
        // Table is non-streaming
        assert!(!sw.output_started());

        sw.write_row(test_row(1, "Alice")).unwrap();
        sw.write_row(test_row(2, "Bob")).unwrap();
        sw.finish().unwrap();

        // Table output should contain the data
        let text = String::from_utf8(sink).unwrap();
        assert!(text.contains("id"));
        assert!(text.contains("name"));
        assert!(text.contains("Alice"));
        assert!(text.contains("Bob"));
    }

    /// Streaming formats never fill the internal buffer, however many rows are written.
    #[test]
    fn test_streaming_large_row_count_does_not_buffer() {
        let mut sw = StreamingWriter::new(
            std::io::sink(),
            Box::new(JsonlFormatter::new()),
            test_columns(),
            None,
        );

        sw.prepare(None).unwrap();
        for id in 0..12_000 {
            sw.write_row(test_row(id, "row")).unwrap();
        }

        assert_eq!(sw.written_count(), 12_000);
        assert!(sw.buffer.is_empty());

        sw.finish().unwrap();
    }
}