1use std::io;
2use std::pin::Pin;
3use std::sync::Arc;
4
5use futures::Stream;
6use futures::TryStreamExt;
7
8use tokio::io::AsyncWrite;
9
10use arrow::datatypes::DataType;
11use arrow::datatypes::Field;
12use arrow::datatypes::Fields;
13use arrow::datatypes::Schema;
14
15use arrow::array::Array;
16use arrow::array::ArrayBuilder;
17use arrow::array::StringArray;
18use arrow::array::StringBuilder;
19
20use arrow::record_batch::RecordBatch;
21
22use parquet::arrow::AsyncArrowWriter;
23use parquet::file::properties::WriterProperties;
24
25pub fn builders2batch(
26 sch: Arc<Schema>,
27 bldrs: &mut [StringBuilder],
28) -> Result<RecordBatch, io::Error> {
29 let mut arrays: Vec<Arc<dyn Array>> = Vec::with_capacity(bldrs.len());
30 for bldr in bldrs.iter_mut() {
31 let sa: StringArray = bldr.finish();
32 arrays.push(Arc::new(sa));
33 }
34 RecordBatch::try_new(sch.clone(), arrays).map_err(io::Error::other)
35}
36
37pub fn lines2splited2batch<L>(
38 sch: Arc<Schema>,
39 mut lines: L,
40 delim: char,
41 bsize: usize,
42) -> Pin<Box<dyn Stream<Item = Result<RecordBatch, io::Error>>>>
43where
44 L: Unpin + Stream<Item = Result<String, io::Error>> + 'static,
45{
46 let s: &Schema = &sch;
47 let fields: &Fields = &s.fields;
48 let sz: usize = fields.len();
49
50 let strm = async_stream::try_stream! {
51 (0 < sz).then_some(()).ok_or("no columns defined").map_err(io::Error::other)?;
52
53 let mut builders: Vec<StringBuilder> = (0..sz).map(|_| StringBuilder::new()).collect();
54
55 loop {
56 for _ in 0..bsize {
57 let oline: Option<String> = lines.try_next().await?;
58 if oline.is_none() && builders[0].is_empty(){
59 return
60 }
61
62 if oline.is_none(){
63 let rb = builders2batch(sch.clone(), &mut builders)?;
64 yield rb;
65 return
66 }
67
68 let line: String = oline.ok_or("must not be none").map_err(io::Error::other)?;
69
70 let mut splited = line.splitn(sz, delim);
71 for bldr in builders.iter_mut(){
72 let col: &str = splited.next().ok_or("column missing").map_err(io::Error::other)?;
73 bldr.append_value(col);
74 }
75 }
76
77 if builders[0].is_empty(){
78 return
79 }
80
81 let rb = builders2batch(sch.clone(), &mut builders)?;
82 yield rb;
83 }
84 };
85 Box::pin(strm)
86}
87
88pub async fn batch2parquet<W>(
89 b: &RecordBatch,
90 wtr: &mut AsyncArrowWriter<W>,
91) -> Result<(), io::Error>
92where
93 W: Unpin + Send + tokio::io::AsyncWrite,
94{
95 wtr.write(b).await.map_err(io::Error::other)
96}
97
98pub async fn write_all<B, W>(b: B, wtr: &mut AsyncArrowWriter<W>) -> Result<(), io::Error>
99where
100 B: Stream<Item = Result<RecordBatch, io::Error>>,
101 W: Unpin + Send + tokio::io::AsyncWrite,
102{
103 b.try_fold(wtr, |wtr, next| async move {
104 let rb: &RecordBatch = &next;
105 batch2parquet(rb, wtr).await?;
106 Ok(wtr)
107 })
108 .await
109 .map(|_| ())
110}
111
112pub async fn lines2splited2batch2parquet<L, W>(
113 sch: Arc<Schema>,
114 lines: L,
115 delim: char,
116 bsize: usize,
117 wtr: &mut AsyncArrowWriter<W>,
118) -> Result<(), io::Error>
119where
120 L: Unpin + Stream<Item = Result<String, io::Error>> + 'static,
121 W: Unpin + Send + tokio::io::AsyncWrite,
122{
123 let strm = lines2splited2batch(sch, lines, delim, bsize);
124 write_all(strm, wtr).await
125}
126
127pub async fn opts2lines2splited2batch2parquet<L, W>(
129 sch: Arc<Schema>,
130 lines: L,
131 delim: char,
132 bsize: usize,
133 wtr: W,
134 opts: Option<WriterProperties>,
135) -> Result<W, io::Error>
136where
137 L: Unpin + Stream<Item = Result<String, io::Error>> + 'static,
138 W: Unpin + Send + AsyncWrite,
139{
140 let mut aw = AsyncArrowWriter::try_new(wtr, sch.clone(), opts)?;
141 lines2splited2batch2parquet(sch, lines, delim, bsize, &mut aw).await?;
142 aw.flush().await?;
143 aw.finish().await?;
144 Ok(aw.into_inner())
145}
146
147pub fn fields2schema(v: Vec<Field>) -> Schema {
148 Schema::new(v)
149}
150
151pub fn colindex2field(colix: usize, nullable: bool) -> Field {
152 let name = format!("column_{colix}");
153 Field::new(name, DataType::Utf8, nullable)
154}
155
156pub fn colsz2schema(colsz: usize, nullable: bool) -> Schema {
157 let v: Vec<Field> = (0..colsz).map(|sz| colindex2field(sz, nullable)).collect();
158 fields2schema(v)
159}
160
/// Nullability flag used by `opts2stdin2lines2splited2batch2parquet2stdout_default`.
pub const NULLABLE_DEFAULT: bool = true;
/// Column delimiter used by `opts2stdin2lines2splited2batch2parquet2stdout_default`.
pub const DELIM_DEFAULT: char = ',';
/// Batch size (rows per record batch) used by
/// `opts2stdin2lines2splited2batch2parquet2stdout_default`.
pub const BATSZ_DEFAULT: usize = 1024;
164
165pub async fn opts2stdin2lines2splited2batch2parquet2stdout(
167 colsz: usize,
168 nullable: bool,
169 delim: char,
170 bsize: usize,
171 opts: Option<WriterProperties>,
172) -> Result<(), io::Error> {
173 let i = tokio::io::stdin();
174 let bi = tokio::io::BufReader::new(i);
175 let lines = tokio::io::AsyncBufReadExt::lines(bi);
176 let lstrm = tokio_stream::wrappers::LinesStream::new(lines);
177
178 let sch: Schema = colsz2schema(colsz, nullable);
179
180 let mut o = tokio::io::stdout();
181 let bw = tokio::io::BufWriter::new(&mut o);
182 let mut bw =
183 opts2lines2splited2batch2parquet(sch.into(), lstrm, delim, bsize, bw, opts).await?;
184 tokio::io::AsyncWriteExt::flush(&mut bw).await?;
185 tokio::io::AsyncWriteExt::flush(&mut o).await?;
186 Ok(())
187}
188
189pub async fn opts2stdin2lines2splited2batch2parquet2stdout_default(
190 colsz: usize,
191) -> Result<(), io::Error> {
192 opts2stdin2lines2splited2batch2parquet2stdout(
193 colsz,
194 NULLABLE_DEFAULT,
195 DELIM_DEFAULT,
196 BATSZ_DEFAULT,
197 None,
198 )
199 .await
200}
201
#[cfg(test)]
mod tests {
    use std::io;

    use arrow::array::StringArray;
    use arrow::datatypes::{DataType, Field, Schema};
    use futures::TryStreamExt;
    use futures::stream;

    use super::*;

    /// Wraps pre-built line results in an in-memory stream.
    fn vec_to_stream(
        v: Vec<Result<String, io::Error>>,
    ) -> impl Stream<Item = Result<String, io::Error>> {
        stream::iter(v)
    }

    /// Schema with two non-nullable UTF-8 columns, `col1` and `col2`.
    fn simple_schema() -> Arc<Schema> {
        let schema = Schema::new(vec![
            Field::new("col1", DataType::Utf8, false),
            Field::new("col2", DataType::Utf8, false),
        ]);
        Arc::new(schema)
    }

    /// Runs `lines2splited2batch` over `lines` with the simple schema and
    /// collects every batch (or the first error).
    fn run(
        lines: Vec<Result<String, io::Error>>,
        delim: char,
        bsize: usize,
    ) -> Result<Vec<RecordBatch>, io::Error> {
        let stream = lines2splited2batch(simple_schema(), vec_to_stream(lines), delim, bsize);
        futures::executor::block_on(stream.try_collect::<Vec<_>>())
    }

    /// Builds `Ok` line results from string literals.
    fn ok_lines(raw: &[&str]) -> Vec<Result<String, io::Error>> {
        raw.iter().map(|s| Ok(s.to_string())).collect()
    }

    /// Returns all values of column `ix` as string slices; panics with `msg`
    /// if the column is not a `StringArray`.
    fn column_values<'a>(rb: &'a RecordBatch, ix: usize, msg: &str) -> Vec<&'a str> {
        let arr: &StringArray = rb
            .column(ix)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect(msg);
        (0..arr.len()).map(|i| arr.value(i)).collect()
    }

    #[test]
    fn test_basic_split_and_batch() {
        let lines = ok_lines(&["a1,b1", "a2,b2", "a3,b3", "a4,b4", "a5,b5"]);

        let batches = run(lines, ',', 3).expect("stream should produce record batches");

        assert_eq!(batches.len(), 2);

        // First batch is full: three rows.
        let first = &batches[0];
        assert_eq!(first.num_columns(), 2);
        assert_eq!(first.num_rows(), 3);
        assert_eq!(
            column_values(first, 0, "col1 should be a StringArray"),
            vec!["a1", "a2", "a3"]
        );
        assert_eq!(
            column_values(first, 1, "col2 should be a StringArray"),
            vec!["b1", "b2", "b3"]
        );

        // Second batch holds the two remaining rows.
        let second = &batches[1];
        assert_eq!(second.num_columns(), 2);
        assert_eq!(second.num_rows(), 2);
        assert_eq!(
            column_values(second, 0, "col1 should be a StringArray"),
            vec!["a4", "a5"]
        );
        assert_eq!(
            column_values(second, 1, "col2 should be a StringArray"),
            vec!["b4", "b5"]
        );
    }

    #[test]
    fn test_empty_input() {
        let batches = run(Vec::new(), ',', 3).expect("stream should produce record batches");

        assert!(batches.is_empty(), "empty input → no batches");
    }

    #[test]
    fn test_missing_column_error() {
        let res = run(ok_lines(&["only_one_column"]), ',', 3);

        assert!(
            res.is_err(),
            "stream should error out when a column is missing"
        );

        let err = res.unwrap_err();
        let msg = format!("{:?}", err);
        assert!(
            msg.contains("column missing"),
            "error message should mention the missing column: {msg}"
        );
    }

    #[test]
    fn test_delimiter_variation() {
        let batches = run(ok_lines(&["foo;bar", "baz;qux"]), ';', 10)
            .expect("stream should produce record batches");

        assert_eq!(batches.len(), 1);
        let rb = &batches[0];
        assert_eq!(rb.num_rows(), 2);

        assert_eq!(
            column_values(rb, 0, "col1 should be a StringArray"),
            vec!["foo", "baz"]
        );
        assert_eq!(
            column_values(rb, 1, "col2 should be a StringArray"),
            vec!["bar", "qux"]
        );
    }
}