1use std::str::FromStr;
21
22use crate::print_options::MaxRows;
23
24use arrow::csv::writer::WriterBuilder;
25use arrow::datatypes::SchemaRef;
26use arrow::json::{ArrayWriter, LineDelimitedWriter};
27use arrow::record_batch::RecordBatch;
28use arrow::util::pretty::pretty_format_batches_with_options;
29use datafusion::config::FormatOptions;
30use datafusion::error::Result;
31
/// Output formats available for printing query results.
///
/// Selectable on the command line through the `clap::ValueEnum` derive, or
/// parsed case-insensitively through the `FromStr` impl that follows.
#[derive(Debug, PartialEq, Eq, clap::ValueEnum, Clone, Copy)]
pub enum PrintFormat {
    // Comma-separated values (optionally with a header line).
    Csv,
    // Tab-separated values (optionally with a header line).
    Tsv,
    // Bordered, pretty-printed table; the only format that honours `MaxRows`.
    Table,
    // A single JSON array of row objects.
    Json,
    // Newline-delimited JSON: one row object per line.
    NdJson,
    // Currently printed exactly like `Csv`.
    Automatic,
}
42
43impl FromStr for PrintFormat {
44 type Err = String;
45
46 fn from_str(s: &str) -> Result<Self, Self::Err> {
47 clap::ValueEnum::from_str(s, true)
48 }
49}
50
// Serialises `$batches` to `$writer` with the arrow JSON writer type named by
// `$WRITER` (`ArrayWriter` or `LineDelimitedWriter` at the call sites), then
// expands `json_finish!` to append any format-specific trailer.
//
// The expansion evaluates to `Ok(()) as Result<()>`; write/finish errors are
// propagated with `?`, so they return from the *calling* function.
macro_rules! batches_to_json {
    ($WRITER: ident, $writer: expr, $batches: expr) => {{
        {
            // Nothing at all is written for an empty batch list.
            if !$batches.is_empty() {
                // Reborrow so `$writer` is still usable by `json_finish!` below.
                let mut json_writer = $WRITER::new(&mut *$writer);
                for batch in $batches {
                    json_writer.write(batch)?;
                }
                json_writer.finish()?;
                json_finish!($WRITER, $writer);
            }
        }
        Ok(()) as Result<()>
    }};
}
66
// Emits the trailer that follows a finished JSON writer: output from
// `LineDelimitedWriter` is left untouched, while output from `ArrayWriter`
// is terminated with a single newline. I/O errors propagate via `?`.
macro_rules! json_finish {
    (LineDelimitedWriter, $writer: expr) => {{}};
    (ArrayWriter, $writer: expr) => {{
        $writer.write_all(b"\n")?;
    }};
}
73
74fn print_batches_with_sep<W: std::io::Write>(
75 writer: &mut W,
76 batches: &[RecordBatch],
77 delimiter: u8,
78 with_header: bool,
79) -> Result<()> {
80 let builder = WriterBuilder::new()
81 .with_header(with_header)
82 .with_delimiter(delimiter);
83 let mut csv_writer = builder.build(writer);
84
85 for batch in batches {
86 csv_writer.write(batch)?;
87 }
88
89 Ok(())
90}
91
/// Marks a pretty-printed table as truncated by inserting three `| . |` lines
/// just above its closing border.
///
/// `s` is expected to be a table rendered from exactly `maxrows` data rows
/// (the caller slices the batches before formatting). Every line except the
/// closing border is therefore kept verbatim. The dotted lines are padded to
/// the byte width of that border.
///
/// # Panics
///
/// Panics if `s` has fewer than `maxrows + 4` lines (top border, header,
/// header separator, `maxrows` rows, bottom border).
fn keep_only_maxrows(s: &str, maxrows: usize) -> String {
    let lines: Vec<&str> = s.lines().collect();

    assert!(lines.len() >= maxrows + 4);

    // Keep everything but the bottom border rather than a fixed `maxrows + 3`
    // lines. A cell containing a newline renders over several lines, and a
    // fixed count would silently cut off some of the `maxrows` rows.
    let (last_line, body) = lines
        .split_last()
        .expect("assert above guarantees at least four lines");
    let spaces = last_line.len().saturating_sub(4);
    let dotted_line = format!("| .{:<spaces$}|", "", spaces = spaces);

    let mut result: Vec<&str> = Vec::with_capacity(lines.len() + 3);
    result.extend_from_slice(body);
    result.extend([dotted_line.as_str(); 3]);
    result.push(*last_line);

    result.join("\n")
}
108
109fn format_batches_with_maxrows<W: std::io::Write>(
110 writer: &mut W,
111 batches: &[RecordBatch],
112 maxrows: MaxRows,
113 format_options: &FormatOptions,
114) -> Result<()> {
115 let options: arrow::util::display::FormatOptions = format_options.try_into()?;
116
117 match maxrows {
118 MaxRows::Limited(maxrows) => {
119 let mut filtered_batches = Vec::new();
121 let mut row_count: usize = 0;
122 let mut over_limit = false;
123 for batch in batches {
124 if row_count + batch.num_rows() > maxrows {
125 let limit = maxrows - row_count;
127 let sliced_batch = batch.slice(0, limit);
128 filtered_batches.push(sliced_batch);
129 over_limit = true;
130 break;
131 } else {
132 filtered_batches.push(batch.clone());
133 row_count += batch.num_rows();
134 }
135 }
136
137 let formatted =
138 pretty_format_batches_with_options(&filtered_batches, &options)?;
139 if over_limit {
140 let mut formatted_str = format!("{formatted}");
141 formatted_str = keep_only_maxrows(&formatted_str, maxrows);
142 writeln!(writer, "{formatted_str}")?;
143 } else {
144 writeln!(writer, "{formatted}")?;
145 }
146 }
147 MaxRows::Unlimited => {
148 let formatted = pretty_format_batches_with_options(batches, &options)?;
149 writeln!(writer, "{formatted}")?;
150 }
151 }
152
153 Ok(())
154}
155
156impl PrintFormat {
157 pub fn print_batches<W: std::io::Write>(
159 &self,
160 writer: &mut W,
161 schema: SchemaRef,
162 batches: &[RecordBatch],
163 maxrows: MaxRows,
164 with_header: bool,
165 format_options: &FormatOptions,
166 ) -> Result<()> {
167 let batches: Vec<_> = batches
169 .iter()
170 .filter(|b| b.num_rows() > 0)
171 .cloned()
172 .collect();
173 if batches.is_empty() {
174 return self.print_empty(writer, schema, format_options);
175 }
176
177 match self {
178 Self::Csv | Self::Automatic => {
179 print_batches_with_sep(writer, &batches, b',', with_header)
180 }
181 Self::Tsv => print_batches_with_sep(writer, &batches, b'\t', with_header),
182 Self::Table => {
183 if maxrows == MaxRows::Limited(0) {
184 return Ok(());
185 }
186 format_batches_with_maxrows(writer, &batches, maxrows, format_options)
187 }
188 Self::Json => batches_to_json!(ArrayWriter, writer, &batches),
189 Self::NdJson => batches_to_json!(LineDelimitedWriter, writer, &batches),
190 }
191 }
192
193 fn print_empty<W: std::io::Write>(
195 &self,
196 writer: &mut W,
197 schema: SchemaRef,
198 format_options: &FormatOptions,
199 ) -> Result<()> {
200 match self {
201 Self::Table if !schema.fields().is_empty() => {
203 let format_options: arrow::util::display::FormatOptions =
204 format_options.try_into()?;
205
206 let empty_batch = RecordBatch::new_empty(schema);
207 let formatted =
208 pretty_format_batches_with_options(&[empty_batch], &format_options)?;
209 writeln!(writer, "{formatted}")?;
210 }
211 _ => {}
212 }
213 Ok(())
214 }
215}
216
#[cfg(test)]
mod tests {
    //! Output tests for `PrintFormat::print_batches`, driven by the
    //! `PrintBatchesTest` builder defined at the bottom of this module.

    use super::*;
    use std::sync::Arc;

    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};

    // With no batches, every non-table format prints nothing visible, while
    // `Table` prints a header-only table built from the schema.
    #[test]
    fn print_empty() {
        for format in [
            PrintFormat::Csv,
            PrintFormat::Tsv,
            PrintFormat::Json,
            PrintFormat::NdJson,
            PrintFormat::Automatic,
        ] {
            PrintBatchesTest::new()
                .with_format(format)
                .with_schema(three_column_schema())
                .with_batches(vec![])
                .with_expected(&[""])
                .run();
        }

        #[rustfmt::skip]
        let expected = &[
            "+---+---+---+",
            "| a | b | c |",
            "+---+---+---+",
            "+---+---+---+",
        ];
        PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_schema(three_column_schema())
            .with_batches(vec![])
            .with_expected(expected)
            .run();
    }

    // CSV without a header line; input is split across two batches.
    #[test]
    fn print_csv_no_header() {
        #[rustfmt::skip]
        let expected = &[
            "1,4,7",
            "2,5,8",
            "3,6,9",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Csv)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::No)
            .with_expected(expected)
            .run();
    }

    // CSV with a header line; the header appears once even though the input
    // spans two batches.
    #[test]
    fn print_csv_with_header() {
        #[rustfmt::skip]
        let expected = &[
            "a,b,c",
            "1,4,7",
            "2,5,8",
            "3,6,9",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Csv)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Yes)
            .with_expected(expected)
            .run();
    }

    // TSV without a header line.
    #[test]
    fn print_tsv_no_header() {
        #[rustfmt::skip]
        let expected = &[
            "1\t4\t7",
            "2\t5\t8",
            "3\t6\t9",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Tsv)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::No)
            .with_expected(expected)
            .run();
    }

    // TSV with a header line.
    #[test]
    fn print_tsv_with_header() {
        #[rustfmt::skip]
        let expected = &[
            "a\tb\tc",
            "1\t4\t7",
            "2\t5\t8",
            "3\t6\t9",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Tsv)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Yes)
            .with_expected(expected)
            .run();
    }

    // Table output merges both batches into one table; `with_header` must
    // not affect it.
    #[test]
    fn print_table() {
        let expected = &[
            "+---+---+---+",
            "| a | b | c |",
            "+---+---+---+",
            "| 1 | 4 | 7 |",
            "| 2 | 5 | 8 |",
            "| 3 | 6 | 9 |",
            "+---+---+---+",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Ignored)
            .with_expected(expected)
            .run();
    }
    // JSON output is a single array spanning both input batches.
    #[test]
    fn print_json() {
        let expected =
            &[r#"[{"a":1,"b":4,"c":7},{"a":2,"b":5,"c":8},{"a":3,"b":6,"c":9}]"#];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Json)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Ignored)
            .with_expected(expected)
            .run();
    }

    // NDJSON output has one object per line.
    #[test]
    fn print_ndjson() {
        let expected = &[
            r#"{"a":1,"b":4,"c":7}"#,
            r#"{"a":2,"b":5,"c":8}"#,
            r#"{"a":3,"b":6,"c":9}"#,
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::NdJson)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Ignored)
            .with_expected(expected)
            .run();
    }

    // `Automatic` currently behaves like CSV (no header).
    #[test]
    fn print_automatic_no_header() {
        #[rustfmt::skip]
        let expected = &[
            "1,4,7",
            "2,5,8",
            "3,6,9",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Automatic)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::No)
            .with_expected(expected)
            .run();
    }
    // `Automatic` currently behaves like CSV (with header).
    #[test]
    fn print_automatic_with_header() {
        #[rustfmt::skip]
        let expected = &[
            "a,b,c",
            "1,4,7",
            "2,5,8",
            "3,6,9",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Automatic)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Yes)
            .with_expected(expected)
            .run();
    }

    // A row limit at or above the row count (3) prints the full table with no
    // truncation markers, the same as `Unlimited`.
    #[test]
    fn print_maxrows_unlimited() {
        #[rustfmt::skip]
        let expected = &[
            "+---+",
            "| a |",
            "+---+",
            "| 1 |",
            "| 2 |",
            "| 3 |",
            "+---+",
        ];

        for max_rows in [MaxRows::Unlimited, MaxRows::Limited(5), MaxRows::Limited(3)] {
            PrintBatchesTest::new()
                .with_format(PrintFormat::Table)
                .with_schema(one_column_schema())
                .with_batches(vec![one_column_batch()])
                .with_maxrows(max_rows)
                .with_expected(expected)
                .run();
        }
    }

    // A limit below the row count within a single batch keeps the first row
    // and adds three `| . |` marker lines.
    #[test]
    fn print_maxrows_limited_one_batch() {
        #[rustfmt::skip]
        let expected = &[
            "+---+",
            "| a |",
            "+---+",
            "| 1 |",
            "| . |",
            "| . |",
            "| . |",
            "+---+",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_batches(vec![one_column_batch()])
            .with_maxrows(MaxRows::Limited(1))
            .with_expected(expected)
            .run();
    }

    // A limit crossing a batch boundary: the first batch is kept whole and
    // the second is sliced to the remaining 2 rows.
    #[test]
    fn print_maxrows_limited_multi_batched() {
        #[rustfmt::skip]
        let expected = &[
            "+---+",
            "| a |",
            "+---+",
            "| 1 |",
            "| 2 |",
            "| 3 |",
            "| 1 |",
            "| 2 |",
            "| . |",
            "| . |",
            "| . |",
            "+---+",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_batches(vec![
                one_column_batch(),
                one_column_batch(),
                one_column_batch(),
            ])
            .with_maxrows(MaxRows::Limited(5))
            .with_expected(expected)
            .run();
    }

    // Zero-row batches mixed in with real data are skipped.
    #[test]
    fn test_print_batches_empty_batches() {
        let batch = one_column_batch();
        let empty_batch = RecordBatch::new_empty(batch.schema());

        #[rustfmt::skip]
        let expected =&[
            "+---+",
            "| a |",
            "+---+",
            "| 1 |",
            "| 2 |",
            "| 3 |",
            "+---+",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_batches(vec![empty_batch.clone(), batch, empty_batch])
            .with_expected(expected)
            .run();
    }

    // Only zero-row batches: the table falls back to a header-only render,
    // and a schema with no fields prints nothing visible.
    #[test]
    fn test_print_batches_empty_batch() {
        let empty_batch = RecordBatch::new_empty(one_column_batch().schema());

        #[rustfmt::skip]
        let expected =&[
            "+---+",
            "| a |",
            "+---+",
            "+---+",
        ];

        PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_schema(one_column_schema())
            .with_batches(vec![empty_batch])
            .with_header(WithHeader::Yes)
            .with_expected(expected)
            .run();

        let empty_batch = RecordBatch::new_empty(Arc::new(Schema::empty()));
        let expected = &[""];
        PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_schema(Arc::new(Schema::empty()))
            .with_batches(vec![empty_batch])
            .with_header(WithHeader::Yes)
            .with_expected(expected)
            .run();
    }

    /// Builder describing one `print_batches` call and its expected output.
    #[derive(Debug)]
    struct PrintBatchesTest {
        format: PrintFormat,
        schema: SchemaRef,
        batches: Vec<RecordBatch>,
        maxrows: MaxRows,
        with_header: WithHeader,
        // Expected output, one entry per line (trailing whitespace trimmed).
        expected: Vec<&'static str>,
    }

    /// How `print_batches` is invoked with respect to the header flag.
    #[derive(Debug, Clone)]
    enum WithHeader {
        // Call with `with_header = true`.
        Yes,
        // Call with `with_header = false`.
        No,
        // Call both ways and assert the outputs are identical.
        Ignored,
    }

    impl PrintBatchesTest {
        // Defaults: `Table`, empty schema, no batches, unlimited rows,
        // header flag ignored, no expected lines.
        fn new() -> Self {
            Self {
                format: PrintFormat::Table,
                schema: Arc::new(Schema::empty()),
                batches: vec![],
                maxrows: MaxRows::Unlimited,
                with_header: WithHeader::Ignored,
                expected: vec![],
            }
        }

        // Sets the output format.
        fn with_format(mut self, format: PrintFormat) -> Self {
            self.format = format;
            self
        }

        // Sets the schema passed to `print_batches` (used only for empty output).
        fn with_schema(mut self, schema: SchemaRef) -> Self {
            self.schema = schema;
            self
        }

        // Sets the input batches.
        fn with_batches(mut self, batches: Vec<RecordBatch>) -> Self {
            self.batches = batches;
            self
        }

        // Sets the row limit.
        fn with_maxrows(mut self, maxrows: MaxRows) -> Self {
            self.maxrows = maxrows;
            self
        }

        // Sets how the header flag is exercised.
        fn with_header(mut self, with_header: WithHeader) -> Self {
            self.with_header = with_header;
            self
        }

        // Sets the expected output lines.
        fn with_expected(mut self, expected: &[&'static str]) -> Self {
            self.expected = expected.to_vec();
            self
        }

        // Runs the print, trims trailing whitespace, splits on '\n' and
        // compares against the expected lines.
        fn run(self) {
            let actual = self.output();
            let actual: Vec<_> = actual.trim_end().split('\n').collect();
            let expected = self.expected;
            assert_eq!(
                actual, expected,
                "\n\nactual:\n{actual:#?}\n\nexpected:\n{expected:#?}"
            );
        }

        // Produces the output for the configured header mode; in `Ignored`
        // mode it also asserts both header settings give the same output.
        fn output(&self) -> String {
            match self.with_header {
                WithHeader::Yes => self.output_with_header(true),
                WithHeader::No => self.output_with_header(false),
                WithHeader::Ignored => {
                    let output = self.output_with_header(true);
                    let output_without_header = self.output_with_header(false);
                    assert_eq!(
                        output, output_without_header,
                        "Expected output to be the same with or without header"
                    );
                    output
                }
            }
        }

        // Calls `print_batches` into an in-memory buffer with default
        // `FormatOptions` and returns the result as UTF-8.
        fn output_with_header(&self, with_header: bool) -> String {
            let mut buffer: Vec<u8> = vec![];
            self.format
                .print_batches(
                    &mut buffer,
                    self.schema.clone(),
                    &self.batches,
                    self.maxrows,
                    with_header,
                    &FormatOptions::default(),
                )
                .unwrap();
            String::from_utf8(buffer).unwrap()
        }
    }

    // Schema with three non-nullable Int32 columns `a`, `b`, `c`.
    fn three_column_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false),
        ]))
    }

    // Three rows: a = 1..3, b = 4..6, c = 7..9.
    fn three_column_batch() -> RecordBatch {
        RecordBatch::try_new(
            three_column_schema(),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])),
                Arc::new(Int32Array::from(vec![4, 5, 6])),
                Arc::new(Int32Array::from(vec![7, 8, 9])),
            ],
        )
        .unwrap()
    }

    // Schema with a single non-nullable Int32 column `a`.
    fn one_column_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]))
    }

    // Three rows: a = 1, 2, 3.
    fn one_column_batch() -> RecordBatch {
        RecordBatch::try_new(
            one_column_schema(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )
        .unwrap()
    }

    // Splits a batch (of at least 2 rows) into two contiguous halves, to check
    // that output is identical across batch boundaries.
    fn split_batch(batch: RecordBatch) -> Vec<RecordBatch> {
        assert!(batch.num_rows() > 1);
        let split = batch.num_rows() / 2;
        vec![
            batch.slice(0, split),
            batch.slice(split, batch.num_rows() - split),
        ]
    }
}