1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
use std::time::SystemTime;
use futures::future::BoxFuture;
use tokio::fs::File;
use tokio::io::BufReader;
use tokio_util::compat::*;
use arrow2::error::Result;
use arrow2::io::parquet::read::{self, RowGroupDeserializer};
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
let start = SystemTime::now();
use std::env;
let args: Vec<String> = env::args().collect();
let file_path = Box::new(args[1].clone());
// # Read metadata
let mut reader = BufReader::new(File::open(file_path.as_ref()).await?).compat();
// this operation is usually done before reading the data, during planning.
// This is a mix of IO and CPU-bounded tasks but both of them are O(1)
let metadata = read::read_metadata_async(&mut reader).await?;
let schema = read::infer_schema(&metadata)?;
// This factory yields one file descriptor per column and is used to read columns concurrently.
// They do not need to be buffered since we execute exactly 1 seek and 1 read on them.
let factory = || {
Box::pin(async { Ok(File::open(file_path.clone().as_ref()).await?.compat()) })
as BoxFuture<_>
};
// This is the row group loop. Groups can be skipped based on the statistics they carry.
for row_group in &metadata.row_groups {
// A row group is consumed in two steps: the first step is to read the (compressed)
// columns into memory, which is IO-bounded.
let column_chunks = read::read_columns_many_async(
factory,
row_group,
schema.fields.clone(),
None,
None,
None,
)
.await?;
// the second step is to iterate over the columns in chunks.
// this operation is CPU-bounded and should be sent to a separate thread pool (e.g. `tokio_rayon`) to not block
// the runtime.
// Furthermore, this operation is trivially paralellizable e.g. via rayon, as each iterator
// can be advanced in parallel (parallel decompression and deserialization).
let chunks = RowGroupDeserializer::new(column_chunks, row_group.num_rows(), None);
for maybe_chunk in chunks {
let chunk = maybe_chunk?;
println!("{}", chunk.len());
}
}
println!("took: {} ms", start.elapsed().unwrap().as_millis());
Ok(())
}