1use std::str::FromStr;
21
22use crate::print_options::MaxRows;
23
24use arrow::csv::writer::WriterBuilder;
25use arrow::datatypes::SchemaRef;
26use arrow::json::{ArrayWriter, LineDelimitedWriter};
27use arrow::record_batch::RecordBatch;
28use arrow::util::pretty::pretty_format_batches_with_options;
29use datafusion::config::FormatOptions;
30use datafusion::error::Result;
31
32#[derive(Debug, PartialEq, Eq, clap::ValueEnum, Clone, Copy)]
34pub enum PrintFormat {
35 Csv,
36 Tsv,
37 Table,
38 Json,
39 NdJson,
40 Automatic,
41}
42
43impl FromStr for PrintFormat {
44 type Err = String;
45
46 fn from_str(s: &str) -> Result<Self, Self::Err> {
47 clap::ValueEnum::from_str(s, true)
48 }
49}
50
/// Writes `$batches` to `$writer` as JSON using the arrow JSON writer type
/// `$WRITER` (`ArrayWriter` or `LineDelimitedWriter`).
///
/// Nothing is written when `$batches` is empty. Otherwise every batch is
/// written, the JSON writer is finished, and `json_finish!` appends any
/// format-specific trailer. Errors are propagated with `?`, so this must be
/// expanded inside a function returning `Result`. The expansion itself
/// evaluates to `Ok(())`.
macro_rules! batches_to_json {
    ($WRITER: ident, $writer: expr, $batches: expr) => {{
        let batches = $batches;
        if !batches.is_empty() {
            // Reborrow so the caller's writer is usable again once the JSON
            // writer is done with it (json_finish! writes to it afterwards).
            let mut json_writer = $WRITER::new(&mut *$writer);
            batches
                .iter()
                .try_for_each(|batch| json_writer.write(batch))?;
            json_writer.finish()?;
            json_finish!($WRITER, $writer);
        }
        Ok(()) as Result<()>
    }};
}
66
/// Emits the trailer that follows a finished arrow JSON writer.
///
/// * `ArrayWriter`: a single newline after the closing `]` of the JSON array.
/// * `LineDelimitedWriter`: nothing extra. Presumably each record line
///   already ends with a newline; confirm against arrow's writer if changing.
///
/// Any other writer identifier fails to match. The `ArrayWriter` arm uses
/// `?`, so it must be expanded inside a function returning a compatible
/// `Result`.
macro_rules! json_finish {
    (LineDelimitedWriter, $writer: expr) => {{}};
    (ArrayWriter, $writer: expr) => {{
        writeln!($writer)?;
    }};
}
73
74fn print_batches_with_sep<W: std::io::Write>(
75 writer: &mut W,
76 batches: &[RecordBatch],
77 delimiter: u8,
78 with_header: bool,
79) -> Result<()> {
80 let builder = WriterBuilder::new()
81 .with_header(with_header)
82 .with_delimiter(delimiter);
83 let mut csv_writer = builder.build(writer);
84
85 for batch in batches {
86 csv_writer.write(batch)?;
87 }
88
89 Ok(())
90}
91
/// Adds a `| . |` ellipsis marker to a rendered table to show that further
/// rows were omitted.
///
/// `s` is the pretty-printed table for exactly `maxrows` data rows, as
/// prepared by `format_batches_with_maxrows`. The result is `s` with three
/// dotted lines inserted just before the closing border line.
///
/// All lines except the closing border are kept verbatim, rather than a fixed
/// `maxrows + 3` lines. A cell value or column name containing a newline is
/// rendered over several lines, and a fixed line count would silently cut
/// such rows short.
///
/// # Panics
///
/// Panics if `s` has fewer than `maxrows + 4` lines. That is the minimum for
/// a table with `maxrows` rows: top border, header, header separator, rows,
/// bottom border.
fn keep_only_maxrows(s: &str, maxrows: usize) -> String {
    let lines: Vec<&str> = s.lines().collect();
    assert!(
        lines.len() >= maxrows + 4,
        "expected a rendered table with at least {maxrows} rows"
    );

    let (last_line, body) = lines
        .split_last()
        .expect("the assertion above guarantees at least four lines");

    // The border is drawn with ASCII '+'/'-', so its byte length equals its
    // display width; the marker line is padded to the same width.
    let spaces = last_line.len().saturating_sub(4);
    let dotted_line = format!("| .{:<spaces$}|", "", spaces = spaces);

    let mut result: Vec<&str> = body.to_vec();
    result.extend(std::iter::repeat(dotted_line.as_str()).take(3));
    result.push(*last_line);

    result.join("\n")
}
108
109fn format_batches_with_maxrows<W: std::io::Write>(
110 writer: &mut W,
111 batches: &[RecordBatch],
112 maxrows: MaxRows,
113 format_options: &FormatOptions,
114) -> Result<()> {
115 let options: arrow::util::display::FormatOptions = format_options.try_into()?;
116
117 match maxrows {
118 MaxRows::Limited(maxrows) => {
119 let mut filtered_batches = Vec::new();
121 let mut row_count: usize = 0;
122 let mut over_limit = false;
123 for batch in batches {
124 if row_count + batch.num_rows() > maxrows {
125 let limit = maxrows - row_count;
127 let sliced_batch = batch.slice(0, limit);
128 filtered_batches.push(sliced_batch);
129 over_limit = true;
130 break;
131 } else {
132 filtered_batches.push(batch.clone());
133 row_count += batch.num_rows();
134 }
135 }
136
137 let formatted =
138 pretty_format_batches_with_options(&filtered_batches, &options)?;
139 if over_limit {
140 let mut formatted_str = format!("{formatted}");
141 formatted_str = keep_only_maxrows(&formatted_str, maxrows);
142 writeln!(writer, "{formatted_str}")?;
143 } else {
144 writeln!(writer, "{formatted}")?;
145 }
146 }
147 MaxRows::Unlimited => {
148 let formatted = pretty_format_batches_with_options(batches, &options)?;
149 writeln!(writer, "{formatted}")?;
150 }
151 }
152
153 Ok(())
154}
155
156impl PrintFormat {
157 pub fn print_batches<W: std::io::Write>(
159 &self,
160 writer: &mut W,
161 schema: SchemaRef,
162 batches: &[RecordBatch],
163 maxrows: MaxRows,
164 with_header: bool,
165 format_options: &FormatOptions,
166 ) -> Result<()> {
167 let batches: Vec<_> = batches
169 .iter()
170 .filter(|b| b.num_rows() > 0)
171 .cloned()
172 .collect();
173 if batches.is_empty() {
174 return self.print_empty(writer, schema, format_options);
175 }
176
177 match self {
178 Self::Csv | Self::Automatic => {
179 print_batches_with_sep(writer, &batches, b',', with_header)
180 }
181 Self::Tsv => print_batches_with_sep(writer, &batches, b'\t', with_header),
182 Self::Table => {
183 if maxrows == MaxRows::Limited(0) {
184 return Ok(());
185 }
186 format_batches_with_maxrows(writer, &batches, maxrows, format_options)
187 }
188 Self::Json => batches_to_json!(ArrayWriter, writer, &batches),
189 Self::NdJson => batches_to_json!(LineDelimitedWriter, writer, &batches),
190 }
191 }
192
193 fn print_empty<W: std::io::Write>(
195 &self,
196 writer: &mut W,
197 schema: SchemaRef,
198 format_options: &FormatOptions,
199 ) -> Result<()> {
200 match self {
201 Self::Table if !schema.fields().is_empty() => {
203 let format_options: arrow::util::display::FormatOptions =
204 format_options.try_into()?;
205
206 let empty_batch = RecordBatch::new_empty(schema);
207 let formatted =
208 pretty_format_batches_with_options(&[empty_batch], &format_options)?;
209 writeln!(writer, "{formatted}")?;
210 }
211 _ => {}
212 }
213 Ok(())
214 }
215}
216
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use insta::{allow_duplicates, assert_snapshot};

    /// With no batches, every non-table format prints nothing, while `Table`
    /// prints a header-only table for the given schema.
    #[test]
    fn print_empty() {
        for format in [
            PrintFormat::Csv,
            PrintFormat::Tsv,
            PrintFormat::Json,
            PrintFormat::NdJson,
            PrintFormat::Automatic,
        ] {
            // `run` with the default `WithHeader::Ignored` also checks that the
            // header flag makes no difference for empty output.
            let output = PrintBatchesTest::new()
                .with_format(format)
                .with_schema(three_column_schema())
                .with_batches(vec![])
                .run();
            assert_eq!(output, "")
        }

        // Table output keeps the column names even when there are no rows.
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_schema(three_column_schema())
            .with_batches(vec![])
            .run();
        assert_snapshot!(output, @r"
        +---+---+---+
        | a | b | c |
        +---+---+---+
        +---+---+---+
        ");
    }

    /// CSV without a header row; the input is split across two batches.
    #[test]
    fn print_csv_no_header() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Csv)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::No)
            .run();
        assert_snapshot!(output, @r"
        1,4,7
        2,5,8
        3,6,9
        ");
    }

    /// CSV with a header row: the header appears once even with two batches.
    #[test]
    fn print_csv_with_header() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Csv)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Yes)
            .run();
        assert_snapshot!(output, @r"
        a,b,c
        1,4,7
        2,5,8
        3,6,9
        ");
    }

    /// TSV without a header row (fields are tab-separated in the snapshot).
    #[test]
    fn print_tsv_no_header() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Tsv)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::No)
            .run();
        assert_snapshot!(output, @r"
        1	4	7
        2	5	8
        3	6	9
        ")
    }

    /// TSV with a header row (fields are tab-separated in the snapshot).
    #[test]
    fn print_tsv_with_header() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Tsv)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Yes)
            .run();
        assert_snapshot!(output, @r"
        a	b	c
        1	4	7
        2	5	8
        3	6	9
        ");
    }

    /// Table output merges both batches into one table and ignores the header
    /// flag.
    #[test]
    fn print_table() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Ignored)
            .run();
        assert_snapshot!(output, @r"
        +---+---+---+
        | a | b | c |
        +---+---+---+
        | 1 | 4 | 7 |
        | 2 | 5 | 8 |
        | 3 | 6 | 9 |
        +---+---+---+
        ");
    }
    /// JSON output is a single array across both batches; the header flag is
    /// ignored.
    #[test]
    fn print_json() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Json)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Ignored)
            .run();
        assert_snapshot!(output, @r#"[{"a":1,"b":4,"c":7},{"a":2,"b":5,"c":8},{"a":3,"b":6,"c":9}]"#);
    }

    /// Newline-delimited JSON prints one object per row; the header flag is
    /// ignored.
    #[test]
    fn print_ndjson() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::NdJson)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Ignored)
            .run();
        assert_snapshot!(output, @r#"
        {"a":1,"b":4,"c":7}
        {"a":2,"b":5,"c":8}
        {"a":3,"b":6,"c":9}
        "#);
    }

    /// `Automatic` matches CSV output without a header row.
    #[test]
    fn print_automatic_no_header() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Automatic)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::No)
            .run();
        assert_snapshot!(output, @r"
        1,4,7
        2,5,8
        3,6,9
        ");
    }
    /// `Automatic` matches CSV output with a header row.
    #[test]
    fn print_automatic_with_header() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Automatic)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Yes)
            .run();
        assert_snapshot!(output, @r"
        a,b,c
        1,4,7
        2,5,8
        3,6,9
        ");
    }

    /// No truncation marker appears when the limit is at least the row count
    /// (the batch has 3 rows).
    #[test]
    fn print_maxrows_unlimited() {
        // Unlimited, a limit above the row count, and a limit equal to it.
        for max_rows in [MaxRows::Unlimited, MaxRows::Limited(5), MaxRows::Limited(3)] {
            let output = PrintBatchesTest::new()
                .with_format(PrintFormat::Table)
                .with_schema(one_column_schema())
                .with_batches(vec![one_column_batch()])
                .with_maxrows(max_rows)
                .run();
            // Every iteration must produce this same snapshot.
            allow_duplicates! {
                assert_snapshot!(output, @r"
                +---+
                | a |
                +---+
                | 1 |
                | 2 |
                | 3 |
                +---+
                ");
            }
        }
    }

    /// A limit smaller than a single batch keeps the first rows and adds the
    /// dotted marker.
    #[test]
    fn print_maxrows_limited_one_batch() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_batches(vec![one_column_batch()])
            .with_maxrows(MaxRows::Limited(1))
            .run();
        assert_snapshot!(output, @r"
        +---+
        | a |
        +---+
        | 1 |
        | . |
        | . |
        | . |
        +---+
        ");
    }

    /// The row limit is counted across batches: 5 of the 9 rows are shown,
    /// cutting into the second batch.
    #[test]
    fn print_maxrows_limited_multi_batched() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_batches(vec![
                one_column_batch(),
                one_column_batch(),
                one_column_batch(),
            ])
            .with_maxrows(MaxRows::Limited(5))
            .run();
        assert_snapshot!(output, @r"
        +---+
        | a |
        +---+
        | 1 |
        | 2 |
        | 3 |
        | 1 |
        | 2 |
        | . |
        | . |
        | . |
        +---+
        ");
    }

    /// Zero-row batches surrounding a non-empty batch are skipped.
    #[test]
    fn test_print_batches_empty_batches() {
        let batch = one_column_batch();
        let empty_batch = RecordBatch::new_empty(batch.schema());

        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_batches(vec![empty_batch.clone(), batch, empty_batch])
            .run();
        assert_snapshot!(output, @r"
        +---+
        | a |
        +---+
        | 1 |
        | 2 |
        | 3 |
        +---+
        ");
    }

    /// A result made only of zero-row batches is treated as empty: `Table`
    /// shows the header for a non-empty schema and prints nothing for a
    /// schema without fields.
    #[test]
    fn test_print_batches_empty_batch() {
        let empty_batch = RecordBatch::new_empty(one_column_batch().schema());

        // Non-empty schema: header-only table.
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_schema(one_column_schema())
            .with_batches(vec![empty_batch])
            .with_header(WithHeader::Yes)
            .run();
        assert_snapshot!(output, @r"
        +---+
        | a |
        +---+
        +---+
        ");

        // Schema with no fields: nothing is printed.
        let empty_batch = RecordBatch::new_empty(Arc::new(Schema::empty()));
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_schema(Arc::new(Schema::empty()))
            .with_batches(vec![empty_batch])
            .with_header(WithHeader::Yes)
            .run();
        assert_eq!(output, "")
    }

    /// Builder describing one `PrintFormat::print_batches` invocation.
    #[derive(Debug)]
    struct PrintBatchesTest {
        // Output format under test.
        format: PrintFormat,
        // Schema passed to `print_batches` (only used when no rows remain).
        schema: SchemaRef,
        // Input batches.
        batches: Vec<RecordBatch>,
        // Row limit passed to `print_batches`.
        maxrows: MaxRows,
        // How the `with_header` argument is chosen (see `WithHeader`).
        with_header: WithHeader,
    }

    /// How `PrintBatchesTest::run` sets the `with_header` argument.
    #[derive(Debug, Clone)]
    enum WithHeader {
        // Pass `with_header = true`.
        Yes,
        // Pass `with_header = false`.
        No,
        // Run with both values and assert that the outputs are identical.
        Ignored,
    }

    impl PrintBatchesTest {
        /// Defaults: `Table` format, empty schema, no batches, unlimited
        /// rows, header flag expected to be ignored.
        fn new() -> Self {
            Self {
                format: PrintFormat::Table,
                schema: Arc::new(Schema::empty()),
                batches: vec![],
                maxrows: MaxRows::Unlimited,
                with_header: WithHeader::Ignored,
            }
        }

        /// Sets the output format.
        fn with_format(mut self, format: PrintFormat) -> Self {
            self.format = format;
            self
        }

        /// Sets the schema passed to `print_batches`.
        fn with_schema(mut self, schema: SchemaRef) -> Self {
            self.schema = schema;
            self
        }

        /// Sets the input batches.
        fn with_batches(mut self, batches: Vec<RecordBatch>) -> Self {
            self.batches = batches;
            self
        }

        /// Sets the row limit.
        fn with_maxrows(mut self, maxrows: MaxRows) -> Self {
            self.maxrows = maxrows;
            self
        }

        /// Sets how the header flag is chosen.
        fn with_header(mut self, with_header: WithHeader) -> Self {
            self.with_header = with_header;
            self
        }

        /// Runs the configured print and returns the output as a `String`.
        /// For `WithHeader::Ignored`, also asserts that the output does not
        /// depend on the header flag.
        fn run(self) -> String {
            match self.with_header {
                WithHeader::Yes => self.output_with_header(true),
                WithHeader::No => self.output_with_header(false),
                WithHeader::Ignored => {
                    let output = self.output_with_header(true);
                    let output_without_header = self.output_with_header(false);
                    assert_eq!(
                        output, output_without_header,
                        "Expected output to be the same with or without header"
                    );
                    output
                }
            }
        }

        /// Calls `print_batches` into an in-memory buffer with default
        /// `FormatOptions`. Panics if printing fails or the output is not
        /// UTF-8.
        fn output_with_header(&self, with_header: bool) -> String {
            let mut buffer: Vec<u8> = vec![];
            self.format
                .print_batches(
                    &mut buffer,
                    self.schema.clone(),
                    &self.batches,
                    self.maxrows,
                    with_header,
                    &FormatOptions::default(),
                )
                .unwrap();
            String::from_utf8(buffer).unwrap()
        }
    }

    /// Schema with three non-nullable `Int32` columns: `a`, `b`, `c`.
    fn three_column_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false),
        ]))
    }

    /// Three rows: `a` = 1..=3, `b` = 4..=6, `c` = 7..=9.
    fn three_column_batch() -> RecordBatch {
        RecordBatch::try_new(
            three_column_schema(),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])),
                Arc::new(Int32Array::from(vec![4, 5, 6])),
                Arc::new(Int32Array::from(vec![7, 8, 9])),
            ],
        )
        .unwrap()
    }

    /// Schema with a single non-nullable `Int32` column `a`.
    fn one_column_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]))
    }

    /// Three rows of column `a`: 1, 2, 3.
    fn one_column_batch() -> RecordBatch {
        RecordBatch::try_new(
            one_column_schema(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )
        .unwrap()
    }

    /// Splits `batch` into two slices at `num_rows / 2`. Requires more than
    /// one row, so both slices are non-empty.
    fn split_batch(batch: RecordBatch) -> Vec<RecordBatch> {
        assert!(batch.num_rows() > 1);
        let split = batch.num_rows() / 2;
        vec![
            batch.slice(0, split),
            batch.slice(split, batch.num_rows() - split),
        ]
    }
}