Skip to main content

rs_splited2parquet/
lib.rs

1use std::io;
2use std::pin::Pin;
3use std::sync::Arc;
4
5use futures::Stream;
6use futures::TryStreamExt;
7
8use tokio::io::AsyncWrite;
9
10use arrow::datatypes::DataType;
11use arrow::datatypes::Field;
12use arrow::datatypes::Fields;
13use arrow::datatypes::Schema;
14
15use arrow::array::Array;
16use arrow::array::ArrayBuilder;
17use arrow::array::StringArray;
18use arrow::array::StringBuilder;
19
20use arrow::record_batch::RecordBatch;
21
22use parquet::arrow::AsyncArrowWriter;
23use parquet::file::properties::WriterProperties;
24
25pub fn builders2batch(
26    sch: Arc<Schema>,
27    bldrs: &mut [StringBuilder],
28) -> Result<RecordBatch, io::Error> {
29    let mut arrays: Vec<Arc<dyn Array>> = Vec::with_capacity(bldrs.len());
30    for bldr in bldrs.iter_mut() {
31        let sa: StringArray = bldr.finish();
32        arrays.push(Arc::new(sa));
33    }
34    RecordBatch::try_new(sch.clone(), arrays).map_err(io::Error::other)
35}
36
37pub fn lines2splited2batch<L>(
38    sch: Arc<Schema>,
39    mut lines: L,
40    delim: char,
41    bsize: usize,
42) -> Pin<Box<dyn Stream<Item = Result<RecordBatch, io::Error>>>>
43where
44    L: Unpin + Stream<Item = Result<String, io::Error>> + 'static,
45{
46    let s: &Schema = &sch;
47    let fields: &Fields = &s.fields;
48    let sz: usize = fields.len();
49
50    let strm = async_stream::try_stream! {
51        (0 < sz).then_some(()).ok_or("no columns defined").map_err(io::Error::other)?;
52
53        let mut builders: Vec<StringBuilder> = (0..sz).map(|_| StringBuilder::new()).collect();
54
55        loop {
56            for _ in 0..bsize {
57                let oline: Option<String> = lines.try_next().await?;
58                if oline.is_none() && builders[0].is_empty(){
59                    return
60                }
61
62                if oline.is_none(){
63                    let rb = builders2batch(sch.clone(), &mut builders)?;
64                    yield rb;
65                    return
66                }
67
68                let line: String = oline.ok_or("must not be none").map_err(io::Error::other)?;
69
70                let mut splited = line.splitn(sz, delim);
71                for bldr in builders.iter_mut(){
72                    let col: &str = splited.next().ok_or("column missing").map_err(io::Error::other)?;
73                    bldr.append_value(col);
74                }
75            }
76
77            if builders[0].is_empty(){
78                return
79            }
80
81            let rb = builders2batch(sch.clone(), &mut builders)?;
82            yield rb;
83        }
84    };
85    Box::pin(strm)
86}
87
88pub async fn batch2parquet<W>(
89    b: &RecordBatch,
90    wtr: &mut AsyncArrowWriter<W>,
91) -> Result<(), io::Error>
92where
93    W: Unpin + Send + tokio::io::AsyncWrite,
94{
95    wtr.write(b).await.map_err(io::Error::other)
96}
97
98pub async fn write_all<B, W>(b: B, wtr: &mut AsyncArrowWriter<W>) -> Result<(), io::Error>
99where
100    B: Stream<Item = Result<RecordBatch, io::Error>>,
101    W: Unpin + Send + tokio::io::AsyncWrite,
102{
103    b.try_fold(wtr, |wtr, next| async move {
104        let rb: &RecordBatch = &next;
105        batch2parquet(rb, wtr).await?;
106        Ok(wtr)
107    })
108    .await
109    .map(|_| ())
110}
111
112pub async fn lines2splited2batch2parquet<L, W>(
113    sch: Arc<Schema>,
114    lines: L,
115    delim: char,
116    bsize: usize,
117    wtr: &mut AsyncArrowWriter<W>,
118) -> Result<(), io::Error>
119where
120    L: Unpin + Stream<Item = Result<String, io::Error>> + 'static,
121    W: Unpin + Send + tokio::io::AsyncWrite,
122{
123    let strm = lines2splited2batch(sch, lines, delim, bsize);
124    write_all(strm, wtr).await
125}
126
127/// Writes a parquet from the splited lines using the opts.
128pub async fn opts2lines2splited2batch2parquet<L, W>(
129    sch: Arc<Schema>,
130    lines: L,
131    delim: char,
132    bsize: usize,
133    wtr: W,
134    opts: Option<WriterProperties>,
135) -> Result<W, io::Error>
136where
137    L: Unpin + Stream<Item = Result<String, io::Error>> + 'static,
138    W: Unpin + Send + AsyncWrite,
139{
140    let mut aw = AsyncArrowWriter::try_new(wtr, sch.clone(), opts)?;
141    lines2splited2batch2parquet(sch, lines, delim, bsize, &mut aw).await?;
142    aw.flush().await?;
143    aw.finish().await?;
144    Ok(aw.into_inner())
145}
146
147pub fn fields2schema(v: Vec<Field>) -> Schema {
148    Schema::new(v)
149}
150
151pub fn colindex2field(colix: usize, nullable: bool) -> Field {
152    let name = format!("column_{colix}");
153    Field::new(name, DataType::Utf8, nullable)
154}
155
156pub fn colsz2schema(colsz: usize, nullable: bool) -> Schema {
157    let v: Vec<Field> = (0..colsz).map(|sz| colindex2field(sz, nullable)).collect();
158    fields2schema(v)
159}
160
/// Default nullability of the generated `column_N` fields.
pub const NULLABLE_DEFAULT: bool = true;
/// Default delimiter used to split each input line into columns.
pub const DELIM_DEFAULT: char = ',';
/// Default maximum number of rows per record batch.
pub const BATSZ_DEFAULT: usize = 1024;
164
165/// Writes a parquet from the splited stdin using the opts.
166pub async fn opts2stdin2lines2splited2batch2parquet2stdout(
167    colsz: usize,
168    nullable: bool,
169    delim: char,
170    bsize: usize,
171    opts: Option<WriterProperties>,
172) -> Result<(), io::Error> {
173    let i = tokio::io::stdin();
174    let bi = tokio::io::BufReader::new(i);
175    let lines = tokio::io::AsyncBufReadExt::lines(bi);
176    let lstrm = tokio_stream::wrappers::LinesStream::new(lines);
177
178    let sch: Schema = colsz2schema(colsz, nullable);
179
180    let mut o = tokio::io::stdout();
181    let bw = tokio::io::BufWriter::new(&mut o);
182    let mut bw =
183        opts2lines2splited2batch2parquet(sch.into(), lstrm, delim, bsize, bw, opts).await?;
184    tokio::io::AsyncWriteExt::flush(&mut bw).await?;
185    tokio::io::AsyncWriteExt::flush(&mut o).await?;
186    Ok(())
187}
188
189pub async fn opts2stdin2lines2splited2batch2parquet2stdout_default(
190    colsz: usize,
191) -> Result<(), io::Error> {
192    opts2stdin2lines2splited2batch2parquet2stdout(
193        colsz,
194        NULLABLE_DEFAULT,
195        DELIM_DEFAULT,
196        BATSZ_DEFAULT,
197        None,
198    )
199    .await
200}
201
#[cfg(test)]
mod tests {
    use std::io;

    use arrow::array::StringArray;
    use arrow::datatypes::{DataType, Field, Schema};
    use futures::TryStreamExt;
    use futures::stream;

    use super::*;

    /// Wraps a vector of line results into a stream accepted by `lines2splited2batch`.
    fn vec_to_stream(
        v: Vec<Result<String, io::Error>>,
    ) -> impl Stream<Item = Result<String, io::Error>> {
        stream::iter(v)
    }

    /// Builds successful line results from string slices.
    fn ok_lines(v: &[&str]) -> Vec<Result<String, io::Error>> {
        v.iter().map(|&s| Ok(s.to_owned())).collect()
    }

    /// A schema with two non-nullable `Utf8` columns, `col1` and `col2`.
    fn simple_schema() -> Arc<Schema> {
        let fields = vec![
            Field::new("col1", DataType::Utf8, false),
            Field::new("col2", DataType::Utf8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Drives `lines2splited2batch` to completion, returning all batches or the first error.
    fn run(
        sch: Arc<Schema>,
        lines: Vec<Result<String, io::Error>>,
        delim: char,
        bsize: usize,
    ) -> Result<Vec<RecordBatch>, io::Error> {
        let strm = lines2splited2batch(sch, vec_to_stream(lines), delim, bsize);
        futures::executor::block_on(strm.try_collect::<Vec<_>>())
    }

    /// Returns every value of string column `ix` in `rb`.
    fn column_values(rb: &RecordBatch, ix: usize) -> Vec<&str> {
        let arr = rb
            .column(ix)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("column should be a StringArray");
        (0..arr.len()).map(|i| arr.value(i)).collect()
    }

    #[test]
    fn test_basic_split_and_batch() {
        let lines = ok_lines(&["a1,b1", "a2,b2", "a3,b3", "a4,b4", "a5,b5"]);
        let batches =
            run(simple_schema(), lines, ',', 3).expect("stream should produce record batches");

        assert_eq!(batches.len(), 2);

        let rb1 = &batches[0];
        assert_eq!(rb1.num_columns(), 2);
        assert_eq!(rb1.num_rows(), 3);
        assert_eq!(column_values(rb1, 0), ["a1", "a2", "a3"]);
        assert_eq!(column_values(rb1, 1), ["b1", "b2", "b3"]);

        let rb2 = &batches[1];
        assert_eq!(rb2.num_columns(), 2);
        assert_eq!(rb2.num_rows(), 2);
        assert_eq!(column_values(rb2, 0), ["a4", "a5"]);
        assert_eq!(column_values(rb2, 1), ["b4", "b5"]);
    }

    #[test]
    fn test_empty_input() {
        let batches = run(simple_schema(), Vec::new(), ',', 3)
            .expect("stream should produce record batches");

        assert!(batches.is_empty(), "empty input → no batches");
    }

    #[test]
    fn test_missing_column_error() {
        let res = run(simple_schema(), ok_lines(&["only_one_column"]), ',', 3);

        assert!(
            res.is_err(),
            "stream should error out when a column is missing"
        );

        let err = res.unwrap_err();
        let msg = format!("{err:?}");
        assert!(
            msg.contains("column missing"),
            "error message should mention the missing column: {msg}"
        );
    }

    #[test]
    fn test_delimiter_variation() {
        let lines = ok_lines(&["foo;bar", "baz;qux"]);
        let batches =
            run(simple_schema(), lines, ';', 10).expect("stream should produce record batches");

        assert_eq!(batches.len(), 1);
        let rb = &batches[0];
        assert_eq!(rb.num_rows(), 2);
        assert_eq!(column_values(rb, 0), ["foo", "baz"]);
        assert_eq!(column_values(rb, 1), ["bar", "qux"]);
    }

    #[test]
    fn test_exact_multiple_of_batch_size() {
        let lines = ok_lines(&["a1,b1", "a2,b2", "a3,b3", "a4,b4"]);
        let batches =
            run(simple_schema(), lines, ',', 2).expect("stream should produce record batches");

        // Input ending on a batch boundary must not produce a trailing empty batch.
        assert_eq!(batches.len(), 2);
        assert_eq!(column_values(&batches[0], 0), ["a1", "a2"]);
        assert_eq!(column_values(&batches[1], 1), ["b3", "b4"]);
    }

    #[test]
    fn test_extra_delimiters_stay_in_last_column() {
        let lines = ok_lines(&["a,b,c", "d,e"]);
        let batches =
            run(simple_schema(), lines, ',', 10).expect("stream should produce record batches");

        assert_eq!(batches.len(), 1);
        assert_eq!(column_values(&batches[0], 0), ["a", "d"]);
        assert_eq!(column_values(&batches[0], 1), ["b,c", "e"]);
    }

    #[test]
    fn test_upstream_error_propagates() {
        let lines = vec![
            Ok("a1,b1".to_string()),
            Err(io::Error::other("upstream failure")),
        ];
        let err = run(simple_schema(), lines, ',', 10)
            .expect_err("an upstream error should end the stream with that error");

        assert!(
            err.to_string().contains("upstream failure"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn test_colsz2schema_names() {
        let sch = colsz2schema(3, false);
        let names: Vec<&str> = sch.fields().iter().map(|f| f.name().as_str()).collect();

        assert_eq!(names, ["column_0", "column_1", "column_2"]);
        assert!(
            sch.fields()
                .iter()
                .all(|f| !f.is_nullable() && f.data_type() == &DataType::Utf8)
        );
    }
}