1use std::str::FromStr;
21
22use crate::print_options::MaxRows;
23
24use arrow::csv::writer::WriterBuilder;
25use arrow::datatypes::SchemaRef;
26use arrow::json::{ArrayWriter, LineDelimitedWriter};
27use arrow::record_batch::RecordBatch;
28use arrow::util::pretty::pretty_format_batches_with_options;
29use datafusion::config::FormatOptions;
30use datafusion::error::Result;
31
/// Output formats in which query results (record batches) can be printed.
///
/// Parsed from the command line through the `clap::ValueEnum` derive and via
/// the `FromStr` impl below (case-insensitively).
// NOTE(review): clap's `ValueEnum` derive names values in kebab-case by
// default, so `NdJson` is presumably accepted as `nd-json` rather than
// `ndjson` unless a `#[value(rename_all = ...)]` attribute is added — confirm
// the intended CLI spelling.
#[derive(Debug, PartialEq, Eq, clap::ValueEnum, Clone, Copy)]
pub enum PrintFormat {
    // Comma-separated values (header controlled by `with_header`).
    Csv,
    // Tab-separated values (header controlled by `with_header`).
    Tsv,
    // Bordered text table; the only format that honors `MaxRows`.
    Table,
    // A single JSON array of row objects.
    Json,
    // Newline-delimited JSON: one row object per line.
    NdJson,
    // Currently printed exactly like `Csv` by `print_batches`.
    Automatic,
}
42
43impl FromStr for PrintFormat {
44 type Err = String;
45
46 fn from_str(s: &str) -> Result<Self, Self::Err> {
47 clap::ValueEnum::from_str(s, true)
48 }
49}
50
// Serializes `$batches` (a reference to a collection of `RecordBatch`) as JSON
// into `$writer`, using the arrow JSON writer type named by `$WRITER`
// (`ArrayWriter` or `LineDelimitedWriter`).
//
// Writes nothing at all when there are no batches. After the arrow writer is
// finished, `json_finish!` appends any format-specific trailer. Evaluates to
// `Result<()>`; arrow and I/O errors are propagated with `?` from the
// enclosing function.
macro_rules! batches_to_json {
    ($WRITER: ident, $writer: expr, $batches: expr) => {{
        let json_batches = $batches;
        if !json_batches.is_empty() {
            let mut json_writer = $WRITER::new(&mut *$writer);
            json_batches
                .iter()
                .try_for_each(|batch| json_writer.write(batch))?;
            json_writer.finish()?;
            json_finish!($WRITER, $writer);
        }
        Ok(()) as Result<()>
    }};
}
66
// Writes the trailer that follows a finished arrow JSON writer:
// - `ArrayWriter`: a single newline, so the JSON array ends its line.
// - `LineDelimitedWriter`: nothing.
// I/O errors are propagated with `?` from the enclosing function.
macro_rules! json_finish {
    (ArrayWriter, $writer: expr) => {{
        $writer.write_all(b"\n")?;
    }};
    (LineDelimitedWriter, $writer: expr) => {{}};
}
73
74fn print_batches_with_sep<W: std::io::Write>(
75 writer: &mut W,
76 batches: &[RecordBatch],
77 delimiter: u8,
78 with_header: bool,
79) -> Result<()> {
80 let builder = WriterBuilder::new()
81 .with_header(with_header)
82 .with_delimiter(delimiter);
83 let mut csv_writer = builder.build(writer);
84
85 for batch in batches {
86 csv_writer.write(batch)?;
87 }
88
89 Ok(())
90}
91
/// Cuts a rendered table down to its first `maxrows` data rows and marks the
/// cut with three `| . |` filler rows before the closing border.
///
/// `s` is expected to be a table as rendered by
/// `pretty_format_batches_with_options`: top border, header line, header
/// separator, then one line per data row, then the bottom border. The filler
/// rows are padded to the width of the bottom border.
///
/// # Panics
///
/// Panics if `s` has fewer than `maxrows + 4` lines.
fn keep_only_maxrows(s: &str, maxrows: usize) -> String {
    let table_lines: Vec<&str> = s.lines().collect();
    assert!(table_lines.len() >= maxrows + 4);

    let bottom_border = table_lines[table_lines.len() - 1];
    // "| ." plus the closing "|" take four columns; pad the rest with spaces.
    let pad = bottom_border.len().saturating_sub(4);
    let filler = format!("| .{:<spaces$}|", "", spaces = pad);

    // Top border, header, header separator, and the first `maxrows` rows.
    let head_len = maxrows + 3;
    table_lines[..head_len]
        .iter()
        .copied()
        .chain(std::iter::repeat(filler.as_str()).take(3))
        .chain(std::iter::once(bottom_border))
        .collect::<Vec<&str>>()
        .join("\n")
}
108
109fn format_batches_with_maxrows<W: std::io::Write>(
110 writer: &mut W,
111 batches: &[RecordBatch],
112 maxrows: MaxRows,
113 format_options: &FormatOptions,
114) -> Result<()> {
115 let options: arrow::util::display::FormatOptions = format_options.try_into()?;
116
117 match maxrows {
118 MaxRows::Limited(maxrows) => {
119 let mut filtered_batches = Vec::new();
121 let mut row_count: usize = 0;
122 let mut over_limit = false;
123 for batch in batches {
124 if row_count + batch.num_rows() > maxrows {
125 let limit = maxrows - row_count;
127 let sliced_batch = batch.slice(0, limit);
128 filtered_batches.push(sliced_batch);
129 over_limit = true;
130 break;
131 } else {
132 filtered_batches.push(batch.clone());
133 row_count += batch.num_rows();
134 }
135 }
136
137 let formatted =
138 pretty_format_batches_with_options(&filtered_batches, &options)?;
139 if over_limit {
140 let mut formatted_str = format!("{formatted}");
141 formatted_str = keep_only_maxrows(&formatted_str, maxrows);
142 writeln!(writer, "{formatted_str}")?;
143 } else {
144 writeln!(writer, "{formatted}")?;
145 }
146 }
147 MaxRows::Unlimited => {
148 let formatted = pretty_format_batches_with_options(batches, &options)?;
149 writeln!(writer, "{formatted}")?;
150 }
151 }
152
153 Ok(())
154}
155
156impl PrintFormat {
157 pub fn print_batches<W: std::io::Write>(
159 &self,
160 writer: &mut W,
161 schema: SchemaRef,
162 batches: &[RecordBatch],
163 maxrows: MaxRows,
164 with_header: bool,
165 format_options: &FormatOptions,
166 ) -> Result<()> {
167 let batches: Vec<_> = batches
169 .iter()
170 .filter(|b| b.num_rows() > 0)
171 .cloned()
172 .collect();
173 if batches.is_empty() {
174 return self.print_empty(writer, schema, format_options);
175 }
176
177 match self {
178 Self::Csv | Self::Automatic => {
179 print_batches_with_sep(writer, &batches, b',', with_header)
180 }
181 Self::Tsv => print_batches_with_sep(writer, &batches, b'\t', with_header),
182 Self::Table => {
183 if maxrows == MaxRows::Limited(0) {
184 return Ok(());
185 }
186 format_batches_with_maxrows(writer, &batches, maxrows, format_options)
187 }
188 Self::Json => batches_to_json!(ArrayWriter, writer, &batches),
189 Self::NdJson => batches_to_json!(LineDelimitedWriter, writer, &batches),
190 }
191 }
192
193 fn print_empty<W: std::io::Write>(
195 &self,
196 writer: &mut W,
197 schema: SchemaRef,
198 format_options: &FormatOptions,
199 ) -> Result<()> {
200 match self {
201 Self::Table if !schema.fields().is_empty() => {
203 let format_options: arrow::util::display::FormatOptions =
204 format_options.try_into()?;
205
206 let empty_batch = RecordBatch::new_empty(schema);
207 let formatted =
208 pretty_format_batches_with_options(&[empty_batch], &format_options)?;
209 writeln!(writer, "{formatted}")?;
210 }
211 _ => {}
212 }
213 Ok(())
214 }
215}
216
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use insta::{allow_duplicates, assert_snapshot};

    // An empty result prints nothing for every format except `Table`, which
    // still renders a header-only table for a schema with fields.
    #[test]
    fn print_empty() {
        for format in [
            PrintFormat::Csv,
            PrintFormat::Tsv,
            PrintFormat::Json,
            PrintFormat::NdJson,
            PrintFormat::Automatic,
        ] {
            // No output for empty batches, with or without a header
            // (`run()` defaults to `WithHeader::Ignored`, which checks both).
            let output = PrintBatchesTest::new()
                .with_format(format)
                .with_schema(three_column_schema())
                .with_batches(vec![])
                .run();
            assert_eq!(output, "")
        }

        // `Table` still prints the column headers for an empty result.
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_schema(three_column_schema())
            .with_batches(vec![])
            .run();
        assert_snapshot!(output, @r#"
        +---+---+---+
        | a | b | c |
        +---+---+---+
        +---+---+---+
        "#);
    }

    #[test]
    fn print_csv_no_header() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Csv)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::No)
            .run();
        assert_snapshot!(output, @r#"
        1,4,7
        2,5,8
        3,6,9
        "#);
    }

    // The header line appears once even though the input is split into two
    // batches.
    #[test]
    fn print_csv_with_header() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Csv)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Yes)
            .run();
        assert_snapshot!(output, @r#"
        a,b,c
        1,4,7
        2,5,8
        3,6,9
        "#);
    }

    #[test]
    fn print_tsv_no_header() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Tsv)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::No)
            .run();
        assert_snapshot!(output, @"
        1\t4\t7
        2\t5\t8
        3\t6\t9
        ")
    }

    #[test]
    fn print_tsv_with_header() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Tsv)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Yes)
            .run();
        assert_snapshot!(output, @"
        a\tb\tc
        1\t4\t7
        2\t5\t8
        3\t6\t9
        ");
    }

    // `Table` ignores `with_header`; `WithHeader::Ignored` asserts that.
    #[test]
    fn print_table() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Ignored)
            .run();
        assert_snapshot!(output, @r#"
        +---+---+---+
        | a | b | c |
        +---+---+---+
        | 1 | 4 | 7 |
        | 2 | 5 | 8 |
        | 3 | 6 | 9 |
        +---+---+---+
        "#);
    }
    #[test]
    fn print_json() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Json)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Ignored)
            .run();
        assert_snapshot!(output, @r#"
        [{"a":1,"b":4,"c":7},{"a":2,"b":5,"c":8},{"a":3,"b":6,"c":9}]
        "#);
    }

    #[test]
    fn print_ndjson() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::NdJson)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Ignored)
            .run();
        assert_snapshot!(output, @r#"
        {"a":1,"b":4,"c":7}
        {"a":2,"b":5,"c":8}
        {"a":3,"b":6,"c":9}
        "#);
    }

    // `Automatic` currently produces the same output as `Csv`.
    #[test]
    fn print_automatic_no_header() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Automatic)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::No)
            .run();
        assert_snapshot!(output, @r#"
        1,4,7
        2,5,8
        3,6,9
        "#);
    }
    #[test]
    fn print_automatic_with_header() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Automatic)
            .with_batches(split_batch(three_column_batch()))
            .with_header(WithHeader::Yes)
            .run();
        assert_snapshot!(output, @r#"
        a,b,c
        1,4,7
        2,5,8
        3,6,9
        "#);
    }

    // Limits at or above the row count (3) behave like `Unlimited`: no
    // truncation marker is added.
    #[test]
    fn print_maxrows_unlimited() {
        for max_rows in [MaxRows::Unlimited, MaxRows::Limited(5), MaxRows::Limited(3)] {
            let output = PrintBatchesTest::new()
                .with_format(PrintFormat::Table)
                .with_schema(one_column_schema())
                .with_batches(vec![one_column_batch()])
                .with_maxrows(max_rows)
                .run();
            allow_duplicates! {
                assert_snapshot!(output, @r#"
                +---+
                | a |
                +---+
                | 1 |
                | 2 |
                | 3 |
                +---+
                "#);
            }
        }
    }

    // A limit below the row count keeps the first rows and appends three
    // `| . |` filler rows before the bottom border.
    #[test]
    fn print_maxrows_limited_one_batch() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_batches(vec![one_column_batch()])
            .with_maxrows(MaxRows::Limited(1))
            .run();
        assert_snapshot!(output, @r#"
        +---+
        | a |
        +---+
        | 1 |
        | . |
        | . |
        | . |
        +---+
        "#);
    }

    // The limit spans batch boundaries: the first batch is kept whole and
    // the second is sliced to the remaining two rows.
    #[test]
    fn print_maxrows_limited_multi_batched() {
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_batches(vec![
                one_column_batch(),
                one_column_batch(),
                one_column_batch(),
            ])
            .with_maxrows(MaxRows::Limited(5))
            .run();
        assert_snapshot!(output, @r#"
        +---+
        | a |
        +---+
        | 1 |
        | 2 |
        | 3 |
        | 1 |
        | 2 |
        | . |
        | . |
        | . |
        +---+
        "#);
    }

    // Empty batches mixed with non-empty ones are skipped.
    #[test]
    fn test_print_batches_empty_batches() {
        let batch = one_column_batch();
        let empty_batch = RecordBatch::new_empty(batch.schema());

        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_batches(vec![empty_batch.clone(), batch, empty_batch])
            .run();
        assert_snapshot!(output, @r#"
        +---+
        | a |
        +---+
        | 1 |
        | 2 |
        | 3 |
        +---+
        "#);
    }

    // Only empty batches: a header-only table when the schema has fields,
    // nothing at all when the schema has no fields.
    #[test]
    fn test_print_batches_empty_batch() {
        let empty_batch = RecordBatch::new_empty(one_column_batch().schema());

        // Empty batch with a one-column schema prints the header.
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_schema(one_column_schema())
            .with_batches(vec![empty_batch])
            .with_header(WithHeader::Yes)
            .run();
        assert_snapshot!(output, @r#"
        +---+
        | a |
        +---+
        +---+
        "#);

        // Empty batch with an empty schema prints nothing.
        let empty_batch = RecordBatch::new_empty(Arc::new(Schema::empty()));
        let output = PrintBatchesTest::new()
            .with_format(PrintFormat::Table)
            .with_schema(Arc::new(Schema::empty()))
            .with_batches(vec![empty_batch])
            .with_header(WithHeader::Yes)
            .run();
        assert_eq!(output, "")
    }

    /// Builder describing one `PrintFormat::print_batches` invocation.
    #[derive(Debug)]
    struct PrintBatchesTest {
        format: PrintFormat,
        schema: SchemaRef,
        batches: Vec<RecordBatch>,
        maxrows: MaxRows,
        with_header: WithHeader,
    }

    /// How `PrintBatchesTest::run` treats the `with_header` flag.
    #[derive(Debug, Clone)]
    enum WithHeader {
        // Print with `with_header = true`.
        Yes,
        // Print with `with_header = false`.
        No,
        // Print both ways and assert the outputs are identical.
        Ignored,
    }

    impl PrintBatchesTest {
        /// Defaults: `Table` format, empty schema, no batches, unlimited
        /// rows, header ignored.
        fn new() -> Self {
            Self {
                format: PrintFormat::Table,
                schema: Arc::new(Schema::empty()),
                batches: vec![],
                maxrows: MaxRows::Unlimited,
                with_header: WithHeader::Ignored,
            }
        }

        /// Set the output format.
        fn with_format(mut self, format: PrintFormat) -> Self {
            self.format = format;
            self
        }

        /// Set the schema (used only when every batch is empty).
        fn with_schema(mut self, schema: SchemaRef) -> Self {
            self.schema = schema;
            self
        }

        /// Set the batches to print.
        fn with_batches(mut self, batches: Vec<RecordBatch>) -> Self {
            self.batches = batches;
            self
        }

        /// Set the row limit.
        fn with_maxrows(mut self, maxrows: MaxRows) -> Self {
            self.maxrows = maxrows;
            self
        }

        /// Set how the header flag is exercised.
        fn with_header(mut self, with_header: WithHeader) -> Self {
            self.with_header = with_header;
            self
        }

        /// Run the print and return its output. For `WithHeader::Ignored`,
        /// also assert that the header flag does not change the output.
        fn run(self) -> String {
            match self.with_header {
                WithHeader::Yes => self.output_with_header(true),
                WithHeader::No => self.output_with_header(false),
                WithHeader::Ignored => {
                    let output = self.output_with_header(true);
                    // The header flag must not affect this format.
                    let output_without_header = self.output_with_header(false);
                    assert_eq!(
                        output, output_without_header,
                        "Expected output to be the same with or without header"
                    );
                    output
                }
            }
        }

        /// Print into an in-memory buffer with default `FormatOptions` and
        /// return it as UTF-8. Panics on print or decode errors.
        fn output_with_header(&self, with_header: bool) -> String {
            let mut buffer: Vec<u8> = vec![];
            self.format
                .print_batches(
                    &mut buffer,
                    self.schema.clone(),
                    &self.batches,
                    self.maxrows,
                    with_header,
                    &FormatOptions::default(),
                )
                .unwrap();
            String::from_utf8(buffer).unwrap()
        }
    }

    /// Return a schema with three non-nullable Int32 columns `a`, `b`, `c`.
    fn three_column_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false),
        ]))
    }

    /// Return a 3-row batch: a = [1, 2, 3], b = [4, 5, 6], c = [7, 8, 9].
    fn three_column_batch() -> RecordBatch {
        RecordBatch::try_new(
            three_column_schema(),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])),
                Arc::new(Int32Array::from(vec![4, 5, 6])),
                Arc::new(Int32Array::from(vec![7, 8, 9])),
            ],
        )
        .unwrap()
    }

    /// Return a schema with a single non-nullable Int32 column `a`.
    fn one_column_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]))
    }

    /// Return a 3-row batch: a = [1, 2, 3].
    fn one_column_batch() -> RecordBatch {
        RecordBatch::try_new(
            one_column_schema(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )
        .unwrap()
    }

    /// Split `batch` into two slices at `num_rows / 2`, to exercise
    /// multi-batch output. Panics if `batch` has fewer than two rows.
    fn split_batch(batch: RecordBatch) -> Vec<RecordBatch> {
        assert!(batch.num_rows() > 1);
        let split = batch.num_rows() / 2;
        vec![
            batch.slice(0, split),
            batch.slice(split, batch.num_rows() - split),
        ]
    }
}