1use arrow::csv::writer::WriterBuilder;
20use arrow::json::{ArrayWriter, LineDelimitedWriter};
21use datafusion::arrow::record_batch::RecordBatch;
22use datafusion::arrow::util::pretty;
23use datafusion::error::{DataFusionError, Result};
24use std::str::FromStr;
25
/// Output formats supported for printing query results in the CLI.
///
/// Values are parsed case-insensitively from command-line arguments via the
/// `clap::ArgEnum` derive (see the `FromStr` impl in this module).
#[derive(Debug, PartialEq, Eq, clap::ArgEnum, Clone)]
pub enum PrintFormat {
    /// Comma-separated values with a header row.
    Csv,
    /// Tab-separated values with a header row.
    Tsv,
    /// Human-readable table rendered by Arrow's pretty printer.
    Table,
    /// A single JSON array containing one object per row.
    Json,
    /// Newline-delimited JSON: one object per line.
    NdJson,
}
35
36impl FromStr for PrintFormat {
37 type Err = String;
38
39 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
40 clap::ArgEnum::from_str(s, true)
41 }
42}
43
/// Serializes a slice of `RecordBatch`es to a JSON `String` using the given
/// arrow JSON writer type (`ArrayWriter` for a JSON array, or
/// `LineDelimitedWriter` for newline-delimited JSON).
///
/// Expands to an expression of type `String`. Write and UTF-8 errors are
/// propagated with `?` (mapped to `DataFusionError::Execution` for the UTF-8
/// case), so this macro may only be used inside a function whose error type
/// accepts those conversions.
macro_rules! batches_to_json {
    ($WRITER: ident, $batches: expr) => {{
        let mut bytes = vec![];
        {
            // Inner scope ends the writer's mutable borrow of `bytes`
            // before the buffer is converted into a String below.
            let mut writer = $WRITER::new(&mut bytes);
            writer.write_batches($batches)?;
            writer.finish()?;
        }
        String::from_utf8(bytes).map_err(|e| DataFusionError::Execution(e.to_string()))?
    }};
}
55
56fn print_batches_with_sep(batches: &[RecordBatch], delimiter: u8) -> Result<String> {
57 let mut bytes = vec![];
58 {
59 let builder = WriterBuilder::new()
60 .has_headers(true)
61 .with_delimiter(delimiter);
62 let mut writer = builder.build(&mut bytes);
63 for batch in batches {
64 writer.write(batch)?;
65 }
66 }
67 let formatted =
68 String::from_utf8(bytes).map_err(|e| DataFusionError::Execution(e.to_string()))?;
69 Ok(formatted)
70}
71
72impl PrintFormat {
73 pub fn print_batches(&self, batches: &[RecordBatch]) -> Result<()> {
75 match self {
76 Self::Csv => println!("{}", print_batches_with_sep(batches, b',')?),
77 Self::Tsv => println!("{}", print_batches_with_sep(batches, b'\t')?),
78 Self::Table => pretty::print_batches(batches)?,
79 Self::Json => println!("{}", batches_to_json!(ArrayWriter, batches)),
80 Self::NdJson => {
81 println!("{}", batches_to_json!(LineDelimitedWriter, batches))
82 }
83 }
84 Ok(())
85 }
86}
87
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion::from_slice::FromSlice;
    use std::sync::Arc;

    /// Builds a 3-row batch with Int32 columns `a`, `b`, `c`
    /// (column-major values: a = 1..3, b = 4..6, c = 7..9).
    fn three_column_batch() -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false),
        ]));
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from_slice(&[1, 2, 3])),
                Arc::new(Int32Array::from_slice(&[4, 5, 6])),
                Arc::new(Int32Array::from_slice(&[7, 8, 9])),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_print_batches_with_sep() {
        // An empty batch list serializes to the empty string (no header).
        let empty: Vec<RecordBatch> = vec![];
        assert_eq!("", print_batches_with_sep(&empty, b',').unwrap());

        let batches = vec![three_column_batch()];
        let csv = print_batches_with_sep(&batches, b',').unwrap();
        assert_eq!("a,b,c\n1,4,7\n2,5,8\n3,6,9\n", csv);
    }

    #[test]
    fn test_print_batches_to_json_empty() -> Result<()> {
        // Both JSON writers produce an empty string for no batches.
        let empty: Vec<RecordBatch> = vec![];
        assert_eq!("", batches_to_json!(ArrayWriter, &empty));
        assert_eq!("", batches_to_json!(LineDelimitedWriter, &empty));

        let batches = vec![three_column_batch()];

        let json = batches_to_json!(ArrayWriter, &batches);
        assert_eq!(
            "[{\"a\":1,\"b\":4,\"c\":7},{\"a\":2,\"b\":5,\"c\":8},{\"a\":3,\"b\":6,\"c\":9}]",
            json
        );

        let ndjson = batches_to_json!(LineDelimitedWriter, &batches);
        assert_eq!(
            "{\"a\":1,\"b\":4,\"c\":7}\n{\"a\":2,\"b\":5,\"c\":8}\n{\"a\":3,\"b\":6,\"c\":9}\n",
            ndjson
        );
        Ok(())
    }
}