swh_graph/compress/orc.rs

use std::path::Path;
use std::sync::Arc;

use anyhow::{Context, Result};
use ar_row::deserialize::{ArRowDeserialize, ArRowStruct};
use arrow::array::RecordBatchReader;
use arrow::datatypes::{DataType, Decimal128Type, DecimalType, Schema};
use orc_rust::arrow_reader::ArrowReaderBuilder;
use orc_rust::projection::ProjectionMask;
use orc_rust::reader::ChunkReader;
use rayon::prelude::*;

/// Number of rows per record batch read from ORC files by [`iter_arrow`] and
/// [`par_iter_arrow`].
pub(crate) const ORC_BATCH_SIZE: usize = 1024;

/// Returns one [`ArrowReaderBuilder`] per ORC file in the given subdirectory
/// of the dataset.
pub(crate) fn get_dataset_readers<P: AsRef<Path>>(
    dataset_dir: P,
    subdirectory: &str,
) -> Result<Vec<ArrowReaderBuilder<std::fs::File>>> {
    let mut dataset_dir = dataset_dir.as_ref().to_owned();
    dataset_dir.push(subdirectory);
    std::fs::read_dir(&dataset_dir)
        .with_context(|| format!("Could not list {}", dataset_dir.display()))?
        .map(|file_path| {
            let file_path = file_path
                .with_context(|| format!("Failed to list {}", dataset_dir.display()))?
                .path();
            let file = std::fs::File::open(&file_path)
                .with_context(|| format!("Could not open {}", file_path.display()))?;
            let builder = ArrowReaderBuilder::try_new(file)
                .with_context(|| format!("Could not read {}", file_path.display()))?;
            Ok(builder)
        })
        .collect()
}

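// Hedged usage sketch, not part of the original file: count the rows across
// every ORC file of one table. The dataset path and the "origin" table name
// are hypothetical; iterating the built reader yields
// `Result<RecordBatch, ArrowError>` items, as the readers below rely on.
#[allow(dead_code)]
fn example_count_rows() -> Result<usize> {
    let mut total = 0;
    for builder in get_dataset_readers("/srv/dataset/orc", "origin")? {
        for batch in builder.build() {
            total += batch.context("Could not read batch")?.num_rows();
        }
    }
    Ok(total)
}
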
/// Returns a copy of the schema where timestamp columns are replaced with
/// [`DataType::Decimal128`] columns (full precision, scale 9), so timestamps
/// are decoded as decimal seconds with nanosecond resolution instead of
/// Arrow's range-limited 64-bit nanosecond timestamps.
fn transform_schema(schema: &Schema) -> Arc<Schema> {
    Arc::new(Schema::new(
        schema
            .fields()
            .iter()
            .cloned()
            .map(|field| match field.data_type() {
                // The first Decimal128 parameter is the precision, hence
                // MAX_PRECISION rather than MAX_SCALE.
                DataType::Timestamp(_, _) => (*field)
                    .clone()
                    .with_data_type(DataType::Decimal128(Decimal128Type::MAX_PRECISION, 9)),
                _ => (*field).clone(),
            })
            .collect::<Vec<_>>(),
    ))
}

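// Illustrative sketch, not part of the original file: the effect of
// `transform_schema` on a toy schema (field names are made up).
#[allow(dead_code)]
fn example_transform_schema() {
    use arrow::datatypes::{Field, TimeUnit};

    let input = Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("date", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
    ]);
    let output = transform_schema(&input);
    // The timestamp column is now Decimal128(38, 9): whole seconds before the
    // decimal point, nanoseconds after it.
    assert_eq!(
        output.field(1).data_type(),
        &DataType::Decimal128(Decimal128Type::MAX_PRECISION, 9)
    );
}
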
/// Sequentially reads an ORC file, deserializes rows as values of type `T`,
/// and returns an iterator over the results of applying `f` to each row.
pub(crate) fn iter_arrow<R: ChunkReader, T, IntoIterU, U, F>(
    reader_builder: ArrowReaderBuilder<R>,
    mut f: F,
) -> impl Iterator<Item = U>
where
    F: FnMut(T) -> IntoIterU,
    IntoIterU: IntoIterator<Item = U>,
    T: ArRowDeserialize + ArRowStruct,
{
    // Only read the columns matching T's fields.
    let field_names = <T>::columns();
    let projection = ProjectionMask::named_roots(
        reader_builder.file_metadata().root_data_type(),
        field_names.as_slice(),
    );
    let reader_builder = reader_builder
        .with_projection(projection)
        .with_batch_size(ORC_BATCH_SIZE);

    // Decode timestamps as Decimal128 seconds (see transform_schema).
    let schema = transform_schema(&reader_builder.schema());

    let reader = reader_builder.with_schema(schema).build();

    // Fail early if the file's schema does not match T.
    T::check_datatype(&DataType::Struct(reader.schema().fields().clone()))
        .expect("Invalid data type in ORC file");

    reader.flat_map(move |chunk| {
        let chunk: arrow_array::RecordBatch =
            chunk.unwrap_or_else(|e| panic!("Could not read chunk: {e}"));
        let items: Vec<T> = T::from_record_batch(chunk).expect("Could not deserialize from arrow");
        items.into_iter().flat_map(&mut f).collect::<Vec<_>>()
    })
}

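// Hedged usage sketch, not part of the original file: stream one column out of
// a single ORC file. `ExampleRow` and the path are made up; the derive macro
// is assumed to come from the companion ar_row_derive crate, with the `Clone`
// and `Default` bounds its examples use.
#[allow(dead_code)]
fn example_iter_arrow() -> Result<()> {
    #[derive(ar_row_derive::ArRowDeserialize, Clone, Default)]
    struct ExampleRow {
        id: String,
    }

    let file = std::fs::File::open("table.orc").context("Could not open table.orc")?;
    let builder = ArrowReaderBuilder::try_new(file).context("Could not read table.orc")?;
    // The closure returns a one-element array here, but it may return an empty
    // or multi-element IntoIterator to filter or expand rows.
    for id in iter_arrow(builder, |row: ExampleRow| [row.id]) {
        println!("{id}");
    }
    Ok(())
}
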
/// Parallel version of [`iter_arrow`]: record batches are dispatched to the
/// rayon thread pool with `par_bridge`, so `f` must be thread-safe and may run
/// on several batches concurrently.
pub(crate) fn par_iter_arrow<R: ChunkReader + Send, T, IntoIterU, U: Send, F>(
    reader_builder: ArrowReaderBuilder<R>,
    f: F,
) -> impl ParallelIterator<Item = U>
where
    F: Fn(T) -> IntoIterU + Send + Sync,
    IntoIterU: IntoIterator<Item = U> + Send + Sync,
    T: ArRowDeserialize + ArRowStruct + Send,
{
    // Only read the columns matching T's fields.
    let field_names = <T>::columns();
    let projection = ProjectionMask::named_roots(
        reader_builder.file_metadata().root_data_type(),
        field_names.as_slice(),
    );
    let reader_builder = reader_builder
        .with_projection(projection)
        .with_batch_size(ORC_BATCH_SIZE);

    // Decode timestamps as Decimal128 seconds (see transform_schema).
    let schema = transform_schema(&reader_builder.schema());

    let reader = reader_builder.with_schema(schema).build();

    // Fail early if the file's schema does not match T.
    T::check_datatype(&DataType::Struct(reader.schema().fields().clone()))
        .expect("Invalid data type in ORC file");

    reader.par_bridge().flat_map_iter(move |chunk| {
        let chunk: arrow_array::RecordBatch =
            chunk.unwrap_or_else(|e| panic!("Could not read chunk: {e}"));
        let items: Vec<T> = T::from_record_batch(chunk).expect("Could not deserialize from arrow");
        items.into_iter().flat_map(&f).collect::<Vec<_>>()
    })
}
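
// Hedged end-to-end sketch, not part of the original file: fan out over every
// ORC file of a table with rayon, then over the batches inside each file.
// `ExampleOrigin`, the dataset path, and the "origin" table name are made up;
// the derive comes from the companion ar_row_derive crate, as above.
#[allow(dead_code)]
fn example_par_iter_arrow() -> Result<Vec<String>> {
    #[derive(ar_row_derive::ArRowDeserialize, Clone, Default)]
    struct ExampleOrigin {
        url: String,
    }

    let urls: Vec<String> = get_dataset_readers("/srv/dataset/orc", "origin")?
        .into_par_iter()
        .flat_map(|builder| par_iter_arrow(builder, |origin: ExampleOrigin| [origin.url]))
        .collect();
    Ok(urls)
}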