1use std::str::FromStr;
21
22use crate::print_options::MaxRows;
23
24use arrow::csv::writer::WriterBuilder;
25use arrow::datatypes::SchemaRef;
26use arrow::json::{ArrayWriter, LineDelimitedWriter};
27use arrow::record_batch::RecordBatch;
28use arrow::util::pretty::pretty_format_batches_with_options;
29use datafusion::common::format::DEFAULT_CLI_FORMAT_OPTIONS;
30use datafusion::error::Result;
31
/// Output formats available for printing query results.
///
/// Values are parsed through the derived `clap::ValueEnum` implementation;
/// the `FromStr` impl for this type delegates to it case-insensitively.
#[derive(Debug, PartialEq, Eq, clap::ValueEnum, Clone, Copy)]
pub enum PrintFormat {
    // Comma-separated values (header row controlled by `with_header`).
    Csv,
    // Tab-separated values (header row controlled by `with_header`).
    Tsv,
    // Pretty-printed ASCII table; the only format that honours `MaxRows`.
    Table,
    // A single JSON array containing one object per row.
    Json,
    // Newline-delimited JSON: one object per line.
    NdJson,
    // NOTE(review): `print_batches` currently renders this exactly like `Csv`.
    Automatic,
}
42
43impl FromStr for PrintFormat {
44 type Err = String;
45
46 fn from_str(s: &str) -> Result<Self, Self::Err> {
47 clap::ValueEnum::from_str(s, true)
48 }
49}
50
// Serialises `$batches` to `$writer` with the arrow JSON writer type `$WRITER`
// (`ArrayWriter` or `LineDelimitedWriter`), then writes any format-specific
// trailer through `json_finish!`.
//
// When `$batches` is empty nothing at all is written (not even `[]`).
// Errors from the JSON writer or from `$writer` are propagated with `?`, so
// the macro must be used inside a function returning `Result`. The whole
// expansion evaluates to `Ok(())` typed as `Result<()>`, which lets it be used
// directly as a `match` arm value.
macro_rules! batches_to_json {
    ($WRITER: ident, $writer: expr, $batches: expr) => {{
        {
            if !$batches.is_empty() {
                // Reborrow `$writer` so it can still be used by `json_finish!`
                // after the JSON writer is done with it.
                let mut json_writer = $WRITER::new(&mut *$writer);
                for batch in $batches {
                    json_writer.write(batch)?;
                }
                // Finalise the JSON document before writing any trailer.
                json_writer.finish()?;
                json_finish!($WRITER, $writer);
            }
        }
        Ok(()) as Result<()>
    }};
}
66
// Writes the trailer that follows a finished JSON writer's output.
//
// The arm is selected by the writer type's identifier token:
// `ArrayWriter` output gets one terminating newline (the array itself is
// written on a single line); `LineDelimitedWriter` gets nothing. Any other
// identifier matches no rule and fails to compile.
macro_rules! json_finish {
    (ArrayWriter, $writer: expr) => {{
        writeln!($writer)?;
    }};
    (LineDelimitedWriter, $writer: expr) => {{}};
}
73
74fn print_batches_with_sep<W: std::io::Write>(
75 writer: &mut W,
76 batches: &[RecordBatch],
77 delimiter: u8,
78 with_header: bool,
79) -> Result<()> {
80 let builder = WriterBuilder::new()
81 .with_header(with_header)
82 .with_delimiter(delimiter);
83 let mut csv_writer = builder.build(writer);
84
85 for batch in batches {
86 csv_writer.write(batch)?;
87 }
88
89 Ok(())
90}
91
/// Marks a rendered ASCII table as truncated.
///
/// `s` is expected to be the pretty-printed table of exactly the rows that
/// should remain visible: top border, header, separator, body lines and
/// bottom border. This is what `format_batches_with_maxrows` builds after
/// slicing its input down to `maxrows` rows.
///
/// Every line except the bottom border is kept verbatim. Three `| . |`
/// placeholder lines, padded to the width of the bottom border, are then
/// inserted before that border.
///
/// All body lines are kept, rather than exactly `maxrows` of them, because a
/// cell value containing a newline renders across several table lines.
/// Cutting at `maxrows + 3` lines would silently drop part of the visible rows.
///
/// # Panics
///
/// Panics if `s` has fewer than `maxrows + 4` lines (three header lines, at
/// least one line per row, and the bottom border).
fn keep_only_maxrows(s: &str, maxrows: usize) -> String {
    let lines: Vec<&str> = s.lines().collect();

    assert!(lines.len() >= maxrows + 4);
    let (&last_line, body) = lines
        .split_last()
        .expect("the assertion above guarantees at least four lines");

    // "| ." and the closing "|" take four columns; pad the rest with spaces so
    // the placeholder lines are as wide as the bottom border. This uses byte
    // length, so it assumes the border is ASCII (e.g. `+---+`).
    let spaces = last_line.len().saturating_sub(4);
    let dotted_line = format!("| .{:<spaces$}|", "", spaces = spaces);

    let mut result: Vec<&str> = Vec::with_capacity(body.len() + 4);
    result.extend_from_slice(body);
    result.extend([dotted_line.as_str(); 3]);
    result.push(last_line);

    result.join("\n")
}
108
109fn format_batches_with_maxrows<W: std::io::Write>(
110 writer: &mut W,
111 batches: &[RecordBatch],
112 maxrows: MaxRows,
113) -> Result<()> {
114 match maxrows {
115 MaxRows::Limited(maxrows) => {
116 let mut filtered_batches = Vec::new();
118 let mut row_count: usize = 0;
119 let mut over_limit = false;
120 for batch in batches {
121 if row_count + batch.num_rows() > maxrows {
122 let limit = maxrows - row_count;
124 let sliced_batch = batch.slice(0, limit);
125 filtered_batches.push(sliced_batch);
126 over_limit = true;
127 break;
128 } else {
129 filtered_batches.push(batch.clone());
130 row_count += batch.num_rows();
131 }
132 }
133
134 let formatted = pretty_format_batches_with_options(
135 &filtered_batches,
136 &DEFAULT_CLI_FORMAT_OPTIONS,
137 )?;
138 if over_limit {
139 let mut formatted_str = format!("{}", formatted);
140 formatted_str = keep_only_maxrows(&formatted_str, maxrows);
141 writeln!(writer, "{}", formatted_str)?;
142 } else {
143 writeln!(writer, "{}", formatted)?;
144 }
145 }
146 MaxRows::Unlimited => {
147 let formatted =
148 pretty_format_batches_with_options(batches, &DEFAULT_CLI_FORMAT_OPTIONS)?;
149 writeln!(writer, "{}", formatted)?;
150 }
151 }
152
153 Ok(())
154}
155
156impl PrintFormat {
157 pub fn print_batches<W: std::io::Write>(
159 &self,
160 writer: &mut W,
161 schema: SchemaRef,
162 batches: &[RecordBatch],
163 maxrows: MaxRows,
164 with_header: bool,
165 ) -> Result<()> {
166 let batches: Vec<_> = batches
168 .iter()
169 .filter(|b| b.num_rows() > 0)
170 .cloned()
171 .collect();
172 if batches.is_empty() {
173 return self.print_empty(writer, schema);
174 }
175
176 match self {
177 Self::Csv | Self::Automatic => {
178 print_batches_with_sep(writer, &batches, b',', with_header)
179 }
180 Self::Tsv => print_batches_with_sep(writer, &batches, b'\t', with_header),
181 Self::Table => {
182 if maxrows == MaxRows::Limited(0) {
183 return Ok(());
184 }
185 format_batches_with_maxrows(writer, &batches, maxrows)
186 }
187 Self::Json => batches_to_json!(ArrayWriter, writer, &batches),
188 Self::NdJson => batches_to_json!(LineDelimitedWriter, writer, &batches),
189 }
190 }
191
192 fn print_empty<W: std::io::Write>(
194 &self,
195 writer: &mut W,
196 schema: SchemaRef,
197 ) -> Result<()> {
198 match self {
199 Self::Table if !schema.fields().is_empty() => {
201 let empty_batch = RecordBatch::new_empty(schema);
202 let formatted = pretty_format_batches_with_options(
203 &[empty_batch],
204 &DEFAULT_CLI_FORMAT_OPTIONS,
205 )?;
206 writeln!(writer, "{}", formatted)?;
207 }
208 _ => {}
209 }
210 Ok(())
211 }
212}
213
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};

    /// With no batches, every non-table format prints nothing, while `Table`
    /// prints a header-only table for a non-empty schema.
    #[test]
    fn print_empty() {
        for format in [
            PrintFormat::Csv,
            PrintFormat::Tsv,
            PrintFormat::Json,
            PrintFormat::NdJson,
            PrintFormat::Automatic,
        ] {
            PrintBatchesTest::new()
                .with_format(format)
                .with_schema(three_column_schema())
                .with_batches(vec![])
                .with_expected(&[""])
                .run();
        }

        // Table format: the header of the three-column schema, no body rows.
        #[rustfmt::skip]
        let expected = &[
            "+---+---+---+",
            "| a | b | c |",
            "+---+---+---+",
            "+---+---+---+",
        ];
        PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_schema(three_column_schema())
            .with_batches(vec![])
            .with_expected(expected)
            .run();
    }

    /// CSV across two batches, without a header row.
    #[test]
    fn print_csv_no_header() {
        #[rustfmt::skip]
        let expected = &[
            "1,4,7",
            "2,5,8",
            "3,6,9",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Csv)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::No)
            .with_expected(expected)
            .run();
    }

    /// CSV across two batches: the header appears once, before the first row.
    #[test]
    fn print_csv_with_header() {
        #[rustfmt::skip]
        let expected = &[
            "a,b,c",
            "1,4,7",
            "2,5,8",
            "3,6,9",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Csv)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Yes)
            .with_expected(expected)
            .run();
    }

    /// TSV across two batches, without a header row.
    #[test]
    fn print_tsv_no_header() {
        #[rustfmt::skip]
        let expected = &[
            "1\t4\t7",
            "2\t5\t8",
            "3\t6\t9",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Tsv)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::No)
            .with_expected(expected)
            .run();
    }

    /// TSV across two batches, with a header row.
    #[test]
    fn print_tsv_with_header() {
        #[rustfmt::skip]
        let expected = &[
            "a\tb\tc",
            "1\t4\t7",
            "2\t5\t8",
            "3\t6\t9",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Tsv)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Yes)
            .with_expected(expected)
            .run();
    }

    /// Rows from two batches are rendered as a single table; the header flag
    /// has no effect on table output.
    #[test]
    fn print_table() {
        let expected = &[
            "+---+---+---+",
            "| a | b | c |",
            "+---+---+---+",
            "| 1 | 4 | 7 |",
            "| 2 | 5 | 8 |",
            "| 3 | 6 | 9 |",
            "+---+---+---+",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Ignored)
            .with_expected(expected)
            .run();
    }

    /// JSON output is one array spanning all batches, on a single line.
    #[test]
    fn print_json() {
        let expected =
            &[r#"[{"a":1,"b":4,"c":7},{"a":2,"b":5,"c":8},{"a":3,"b":6,"c":9}]"#];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Json)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Ignored)
            .with_expected(expected)
            .run();
    }

    /// NDJSON output is one object per line.
    #[test]
    fn print_ndjson() {
        let expected = &[
            r#"{"a":1,"b":4,"c":7}"#,
            r#"{"a":2,"b":5,"c":8}"#,
            r#"{"a":3,"b":6,"c":9}"#,
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::NdJson)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Ignored)
            .with_expected(expected)
            .run();
    }

    /// `Automatic` currently produces the same output as CSV (no header).
    #[test]
    fn print_automatic_no_header() {
        #[rustfmt::skip]
        let expected = &[
            "1,4,7",
            "2,5,8",
            "3,6,9",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Automatic)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::No)
            .with_expected(expected)
            .run();
    }
    /// `Automatic` currently produces the same output as CSV (with header).
    #[test]
    fn print_automatic_with_header() {
        #[rustfmt::skip]
        let expected = &[
            "a,b,c",
            "1,4,7",
            "2,5,8",
            "3,6,9",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Automatic)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Yes)
            .with_expected(expected)
            .run();
    }

    /// A limit at or above the row count (here 5 and exactly 3 for a 3-row
    /// batch) renders the full table with no truncation marker.
    #[test]
    fn print_maxrows_unlimited() {
        #[rustfmt::skip]
        let expected = &[
            "+---+",
            "| a |",
            "+---+",
            "| 1 |",
            "| 2 |",
            "| 3 |",
            "+---+",
        ];

        for max_rows in [MaxRows::Unlimited, MaxRows::Limited(5), MaxRows::Limited(3)] {
            PrintBatchesTest::new()
                .with_format(PrintFormat::Table)
                .with_schema(one_column_schema())
                .with_batches(vec![one_column_batch()])
                .with_maxrows(max_rows)
                .with_expected(expected)
                .run();
        }
    }

    /// A limit below the row count keeps the leading rows and appends three
    /// `| . |` placeholder lines.
    #[test]
    fn print_maxrows_limited_one_batch() {
        #[rustfmt::skip]
        let expected = &[
            "+---+",
            "| a |",
            "+---+",
            "| 1 |",
            "| . |",
            "| . |",
            "| . |",
            "+---+",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_batches(vec![one_column_batch()])
            .with_maxrows(MaxRows::Limited(1))
            .with_expected(expected)
            .run();
    }

    /// With three 3-row batches and a limit of 5, the first batch is shown in
    /// full, the second is cut to 2 rows, and the third is not shown.
    #[test]
    fn print_maxrows_limited_multi_batched() {
        #[rustfmt::skip]
        let expected = &[
            "+---+",
            "| a |",
            "+---+",
            "| 1 |",
            "| 2 |",
            "| 3 |",
            "| 1 |",
            "| 2 |",
            "| . |",
            "| . |",
            "| . |",
            "+---+",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_batches(vec![
                one_column_batch(),
                one_column_batch(),
                one_column_batch(),
            ])
            .with_maxrows(MaxRows::Limited(5))
            .with_expected(expected)
            .run();
    }

    /// Zero-row batches surrounding a real batch are ignored.
    #[test]
    fn test_print_batches_empty_batches() {
        let batch = one_column_batch();
        let empty_batch = RecordBatch::new_empty(batch.schema());

        #[rustfmt::skip]
        let expected =&[
            "+---+",
            "| a |",
            "+---+",
            "| 1 |",
            "| 2 |",
            "| 3 |",
            "+---+",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_batches(vec![empty_batch.clone(), batch, empty_batch])
            .with_expected(expected)
            .run();
    }

    /// Only zero-row batches: the table shows just the header, and an empty
    /// schema produces no output at all.
    #[test]
    fn test_print_batches_empty_batch() {
        let empty_batch = RecordBatch::new_empty(one_column_batch().schema());

        #[rustfmt::skip]
        let expected =&[
            "+---+",
            "| a |",
            "+---+",
            "+---+",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_schema(one_column_schema())
            .with_batches(vec![empty_batch])
            .with_header(WithHeader::Yes)
            .with_expected(expected)
            .run();

        // Schema with no fields: nothing is printed.
        let empty_batch = RecordBatch::new_empty(Arc::new(Schema::empty()));
        let expected = &[""];
        PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_schema(Arc::new(Schema::empty()))
            .with_batches(vec![empty_batch])
            .with_header(WithHeader::Yes)
            .with_expected(expected)
            .run();
    }

    /// Builder describing one `PrintFormat::print_batches` call and its
    /// expected output lines.
    #[derive(Debug)]
    struct PrintBatchesTest {
        format: PrintFormat,
        schema: SchemaRef,
        batches: Vec<RecordBatch>,
        maxrows: MaxRows,
        with_header: WithHeader,
        // Expected output, one entry per line (trailing whitespace trimmed).
        expected: Vec<&'static str>,
    }

    /// How `with_header` is exercised by `PrintBatchesTest::output`.
    #[derive(Debug, Clone)]
    enum WithHeader {
        // Print with `with_header = true`.
        Yes,
        // Print with `with_header = false`.
        No,
        // Print both ways and assert the outputs are identical.
        Ignored,
    }

    impl PrintBatchesTest {
        /// Defaults: `Table` format, empty schema, no batches, unlimited rows,
        /// header ignored, no expected lines.
        fn new() -> Self {
            Self {
                format: PrintFormat::Table,
                schema: Arc::new(Schema::empty()),
                batches: vec![],
                maxrows: MaxRows::Unlimited,
                with_header: WithHeader::Ignored,
                expected: vec![],
            }
        }

        /// Sets the output format to test.
        fn with_format(mut self, format: PrintFormat) -> Self {
            self.format = format;
            self
        }

        /// Sets the schema passed to `print_batches` (used when no rows remain).
        fn with_schema(mut self, schema: SchemaRef) -> Self {
            self.schema = schema;
            self
        }

        /// Sets the batches to print.
        fn with_batches(mut self, batches: Vec<RecordBatch>) -> Self {
            self.batches = batches;
            self
        }

        /// Sets the row limit.
        fn with_maxrows(mut self, maxrows: MaxRows) -> Self {
            self.maxrows = maxrows;
            self
        }

        /// Sets how the header flag is exercised.
        fn with_header(mut self, with_header: WithHeader) -> Self {
            self.with_header = with_header;
            self
        }

        /// Sets the expected output lines.
        fn with_expected(mut self, expected: &[&'static str]) -> Self {
            self.expected = expected.to_vec();
            self
        }

        /// Runs the print and compares the output, split on `\n` after trimming
        /// trailing whitespace, against the expected lines.
        fn run(self) {
            let actual = self.output();
            let actual: Vec<_> = actual.trim_end().split('\n').collect();
            let expected = self.expected;
            assert_eq!(
                actual, expected,
                "\n\nactual:\n{actual:#?}\n\nexpected:\n{expected:#?}"
            );
        }

        /// Produces the printed output according to `with_header`; for
        /// `Ignored`, also asserts the header flag does not change the output.
        fn output(&self) -> String {
            match self.with_header {
                WithHeader::Yes => self.output_with_header(true),
                WithHeader::No => self.output_with_header(false),
                WithHeader::Ignored => {
                    let output = self.output_with_header(true);
                    let output_without_header = self.output_with_header(false);
                    assert_eq!(
                        output, output_without_header,
                        "Expected output to be the same with or without header"
                    );
                    output
                }
            }
        }

        /// Prints into an in-memory buffer and returns it as UTF-8 text,
        /// panicking on any print or decoding error.
        fn output_with_header(&self, with_header: bool) -> String {
            let mut buffer: Vec<u8> = vec![];
            self.format
                .print_batches(
                    &mut buffer,
                    self.schema.clone(),
                    &self.batches,
                    self.maxrows,
                    with_header,
                )
                .unwrap();
            String::from_utf8(buffer).unwrap()
        }
    }

    /// Schema with non-nullable Int32 columns `a`, `b`, `c`.
    fn three_column_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false),
        ]))
    }

    /// Three rows: a = 1..=3, b = 4..=6, c = 7..=9.
    fn three_column_batch() -> RecordBatch {
        RecordBatch::try_new(
            three_column_schema(),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])),
                Arc::new(Int32Array::from(vec![4, 5, 6])),
                Arc::new(Int32Array::from(vec![7, 8, 9])),
            ],
        )
        .unwrap()
    }

    /// Schema with a single non-nullable Int32 column `a`.
    fn one_column_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]))
    }

    /// Three rows: a = 1, 2, 3.
    fn one_column_batch() -> RecordBatch {
        RecordBatch::try_new(
            one_column_schema(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )
        .unwrap()
    }

    /// Splits `batch` into two slices at `num_rows / 2`, so tests exercise
    /// multi-batch output. Panics if the batch has fewer than two rows.
    fn split_batch(batch: RecordBatch) -> Vec<RecordBatch> {
        assert!(batch.num_rows() > 1);
        let split = batch.num_rows() / 2;
        vec![
            batch.slice(0, split),
            batch.slice(split, batch.num_rows() - split),
        ]
    }
}