swh_graph/compress/orc.rs

// Copyright (C) 2023-2024  The Software Heritage developers
// See the AUTHORS file at the top-level directory of this distribution
// License: GNU General Public License version 3, or any later version
// See top-level LICENSE file for more information

//! Readers for the ORC dataset.

use std::path::Path;
use std::sync::Arc;

use anyhow::{Context, Result};
use ar_row::deserialize::{ArRowDeserialize, ArRowStruct};
use arrow::array::RecordBatchReader;
use arrow::datatypes::{DataType, Decimal128Type, DecimalType, Schema};
use orc_rust::arrow_reader::ArrowReaderBuilder;
use orc_rust::projection::ProjectionMask;
use orc_rust::reader::ChunkReader;
use rayon::prelude::*;

/// Batch size used when reading ORC files.
///
/// The value was computed experimentally to minimize both run time and memory
/// usage, by running `swh-graph-extract extract-nodes` on the 2023-09-06
/// dataset on Software Heritage's Maxxi computer (Xeon Gold 6342 CPU
/// @ 2.80GHz, 96 threads, 4TB RAM).
pub(crate) const ORC_BATCH_SIZE: usize = 1024;

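/// Returns one [`ArrowReaderBuilder`] per ORC file in the given subdirectory
/// of the dataset directory.
///
/// # Example
///
/// A minimal usage sketch (the dataset path and the `origin` subdirectory
/// name are illustrative, not part of this module):
///
/// ```ignore
/// for builder in get_dataset_readers("/srv/dataset/orc", "origin")? {
///     // Each builder can then be turned into an iterator of rows with
///     // `iter_arrow` or `par_iter_arrow` (defined below).
/// }
/// ```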
pub(crate) fn get_dataset_readers<P: AsRef<Path>>(
    dataset_dir: P,
    subdirectory: &str,
) -> Result<Vec<ArrowReaderBuilder<std::fs::File>>> {
    let mut dataset_dir = dataset_dir.as_ref().to_owned();
    dataset_dir.push(subdirectory);
    std::fs::read_dir(&dataset_dir)
        .with_context(|| format!("Could not list {}", dataset_dir.display()))?
        .map(|file_path| {
            let file_path = file_path
                .with_context(|| format!("Failed to list {}", dataset_dir.display()))?
                .path();
            let file = std::fs::File::open(&file_path)
                .with_context(|| format!("Could not open {}", file_path.display()))?;
            let builder = ArrowReaderBuilder::try_new(file)
                .with_context(|| format!("Could not read {}", file_path.display()))?;
            Ok(builder)
        })
        .collect()
}

/// Transforms a schema inferred by orc-rust into one that can be used to read
/// the Software Heritage dataset.
///
/// Specifically, timestamps must not be read at Arrow's default nanosecond
/// precision, as ORC timestamps (i64 seconds plus nanoseconds) would overflow
/// Arrow's internal representation (a single i64).
/// SWH's data model only allows precision up to the microsecond
/// (<https://docs.softwareheritage.org/devel/apidoc/swh.model.model.html#swh.model.model.Timestamp>),
/// and swh-storage in practice stores timestamps as a single i64, so reading
/// at microsecond precision should lose nothing and never overflow.
/// However, some hand-crafted dates in the wild overflow even an i64 of
/// microseconds, so timestamps are read as [`DataType::Decimal128`] instead,
/// which covers the full range.
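///
/// # Example
///
/// A minimal sketch of the effect on a timestamp column (the field name is
/// illustrative):
///
/// ```ignore
/// use arrow::datatypes::{Field, TimeUnit};
///
/// let input = Schema::new(vec![Field::new(
///     "date",
///     DataType::Timestamp(TimeUnit::Nanosecond, None),
///     true,
/// )]);
/// let output = transform_schema(&input);
/// // The timestamp column is now a decimal number of seconds with
/// // 9 fractional digits, i.e. nanosecond precision over an i128 range:
/// assert_eq!(
///     output.field(0).data_type(),
///     &DataType::Decimal128(Decimal128Type::MAX_SCALE as u8, 9),
/// );
/// ```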
fn transform_schema(schema: &Schema) -> Arc<Schema> {
    Arc::new(Schema::new(
        schema
            .fields()
            .iter()
            .cloned()
            .map(|field| match field.data_type() {
                DataType::Timestamp(_, _) => (*field)
                    .clone()
                    .with_data_type(DataType::Decimal128(Decimal128Type::MAX_SCALE as _, 9)),
                _ => (*field).clone(),
            })
            .collect::<Vec<_>>(),
    ))
}

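/// Reads all rows of an ORC file as values of type `T` and applies `f` to
/// each of them, sequentially.
///
/// # Example
///
/// A minimal sketch, assuming a hypothetical `origin` table with a `url`
/// column; the row type derives the `ar_row` deserialization traits (here via
/// the `ar_row_derive` crate):
///
/// ```ignore
/// #[derive(Default, Clone, ar_row_derive::ArRowDeserialize)]
/// struct Origin {
///     url: String,
/// }
///
/// for builder in get_dataset_readers("/srv/dataset/orc", "origin")? {
///     for url in iter_arrow(builder, |origin: Origin| [origin.url]) {
///         println!("{}", url);
///     }
/// }
/// ```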
pub(crate) fn iter_arrow<R: ChunkReader, T, IntoIterU, U, F>(
    reader_builder: ArrowReaderBuilder<R>,
    mut f: F,
) -> impl Iterator<Item = U>
where
    F: FnMut(T) -> IntoIterU,
    IntoIterU: IntoIterator<Item = U>,
    T: ArRowDeserialize + ArRowStruct,
{
    // Only read the columns matching the fields of T.
    let field_names = <T>::columns();
    let projection = ProjectionMask::named_roots(
        reader_builder.file_metadata().root_data_type(),
        field_names.as_slice(),
    );
    let reader_builder = reader_builder
        .with_projection(projection)
        .with_batch_size(ORC_BATCH_SIZE);

    let schema = transform_schema(&reader_builder.schema());

    let reader = reader_builder.with_schema(schema).build();

    T::check_datatype(&DataType::Struct(reader.schema().fields().clone()))
        .expect("Invalid data type in ORC file");

    reader.flat_map(move |chunk| {
        let chunk: arrow_array::RecordBatch =
            chunk.unwrap_or_else(|e| panic!("Could not read chunk: {e}"));
        let items: Vec<T> = T::from_record_batch(chunk).expect("Could not deserialize from arrow");
        items.into_iter().flat_map(&mut f).collect::<Vec<_>>()
    })
}

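/// Parallel version of [`iter_arrow`]: reads all rows of an ORC file as
/// values of type `T` and applies `f` to each of them, using Rayon to spread
/// record batches across threads. Items are yielded in no particular order.
///
/// # Example
///
/// The same sketch as for [`iter_arrow`], with the hypothetical `Origin` row
/// type; the closure must now be `Fn` rather than `FnMut`:
///
/// ```ignore
/// use rayon::prelude::*;
///
/// for builder in get_dataset_readers("/srv/dataset/orc", "origin")? {
///     par_iter_arrow(builder, |origin: Origin| [origin.url])
///         .for_each(|url| println!("{}", url));
/// }
/// ```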
pub(crate) fn par_iter_arrow<R: ChunkReader + Send, T, IntoIterU, U: Send, F>(
    reader_builder: ArrowReaderBuilder<R>,
    f: F,
) -> impl ParallelIterator<Item = U>
where
    F: Fn(T) -> IntoIterU + Send + Sync,
    IntoIterU: IntoIterator<Item = U> + Send + Sync,
    T: ArRowDeserialize + ArRowStruct + Send,
{
    // Only read the columns matching the fields of T.
    let field_names = <T>::columns();
    let projection = ProjectionMask::named_roots(
        reader_builder.file_metadata().root_data_type(),
        field_names.as_slice(),
    );
    let reader_builder = reader_builder
        .with_projection(projection)
        .with_batch_size(ORC_BATCH_SIZE);

    let schema = transform_schema(&reader_builder.schema());

    let reader = reader_builder.with_schema(schema).build();

    T::check_datatype(&DataType::Struct(reader.schema().fields().clone()))
        .expect("Invalid data type in ORC file");

    reader.par_bridge().flat_map_iter(move |chunk| {
        let chunk: arrow_array::RecordBatch =
            chunk.unwrap_or_else(|e| panic!("Could not read chunk: {e}"));
        let items: Vec<T> = T::from_record_batch(chunk).expect("Could not deserialize from arrow");
        items.into_iter().flat_map(&f).collect::<Vec<_>>()
    })
}