use crate::error::{Result, RypeError};
use std::path::Path;
use super::InvertedIndex;
use crate::constants::{MAX_INVERTED_BUCKET_IDS, MAX_INVERTED_MINIMIZERS, PARQUET_BATCH_SIZE};
use crate::indices::sharded::ShardInfo;
impl InvertedIndex {
    /// Writes this inverted index to `path` as a single Parquet shard.
    ///
    /// The CSR layout (`minimizers`, `offsets`, `bucket_ids`) is flattened into one row per
    /// `(minimizer, bucket_id)` pair. The schema is `minimizer: UInt64 (non-null)`,
    /// `bucket_id: UInt32 (non-null)`. Rows are emitted in minimizer order, and within one
    /// minimizer in `bucket_ids` order. They are handed to the writer in record batches of at
    /// most [`PARQUET_BATCH_SIZE`] rows.
    ///
    /// An empty index still produces a valid, schema-only Parquet file with zero row groups.
    /// This way the path never keeps a stale shard from an earlier run, and
    /// [`InvertedIndex::load_shard_parquet_with_params`] reads it back as an empty index.
    ///
    /// # Arguments
    /// * `path` - destination file; created (or truncated) by this call.
    /// * `shard_id` - identifier copied into the returned [`ShardInfo`].
    /// * `options` - writer options; `ParquetWriteOptions::default()` is used when `None`.
    ///
    /// # Returns
    /// A [`ShardInfo`] with `is_last_shard == true` and `min_end == 0`. `min_start` is the
    /// first minimizer, or `0` for an empty index. The counts mirror the in-memory index (both
    /// are `0` for an empty index).
    ///
    /// # Errors
    /// Returns an I/O error if the file cannot be created, or an Arrow/Parquet error if
    /// encoding or writing fails.
    pub fn save_shard_parquet(
        &self,
        path: &Path,
        shard_id: u32,
        options: Option<&super::super::parquet::ParquetWriteOptions>,
    ) -> Result<ShardInfo> {
        use arrow::array::{ArrayRef, UInt32Builder, UInt64Builder};
        use arrow::datatypes::{DataType, Field, Schema};
        use arrow::record_batch::RecordBatch;
        use parquet::arrow::ArrowWriter;
        use std::sync::Arc;

        debug_assert!(
            self.minimizers.windows(2).all(|w| w[0] < w[1]),
            "InvertedIndex minimizers must be strictly increasing for Parquet serialization"
        );

        let opts = options.cloned().unwrap_or_default();
        let schema = Arc::new(Schema::new(vec![
            Field::new("minimizer", DataType::UInt64, false),
            Field::new("bucket_id", DataType::UInt32, false),
        ]));
        let props = opts.to_writer_properties();
        let file = std::fs::File::create(path)
            .map_err(|e| RypeError::io(path, "create Parquet shard", e))?;
        let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?;

        if self.minimizers.is_empty() {
            // Closing without writing any batch yields a schema-only file (zero row groups),
            // which the loader turns back into an empty index.
            writer.close()?;
            return Ok(ShardInfo {
                shard_id,
                min_start: 0,
                min_end: 0,
                is_last_shard: true,
                num_minimizers: 0,
                num_bucket_ids: 0,
            });
        }

        let mut minimizer_builder = UInt64Builder::with_capacity(PARQUET_BATCH_SIZE);
        let mut bucket_id_builder = UInt32Builder::with_capacity(PARQUET_BATCH_SIZE);
        let mut pairs_in_batch = 0;
        for (i, &minimizer) in self.minimizers.iter().enumerate() {
            // CSR: the bucket IDs of minimizer `i` live in `bucket_ids[offsets[i]..offsets[i + 1]]`.
            let start = self.offsets[i] as usize;
            let end = self.offsets[i + 1] as usize;
            for &bucket_id in &self.bucket_ids[start..end] {
                minimizer_builder.append_value(minimizer);
                bucket_id_builder.append_value(bucket_id);
                pairs_in_batch += 1;
                if pairs_in_batch >= PARQUET_BATCH_SIZE {
                    let minimizer_array: ArrayRef = Arc::new(minimizer_builder.finish());
                    let bucket_id_array: ArrayRef = Arc::new(bucket_id_builder.finish());
                    let batch = RecordBatch::try_new(
                        schema.clone(),
                        vec![minimizer_array, bucket_id_array],
                    )?;
                    writer.write(&batch)?;
                    // Re-create the builders so the next batch starts with full capacity.
                    minimizer_builder = UInt64Builder::with_capacity(PARQUET_BATCH_SIZE);
                    bucket_id_builder = UInt32Builder::with_capacity(PARQUET_BATCH_SIZE);
                    pairs_in_batch = 0;
                }
            }
        }
        if pairs_in_batch > 0 {
            let minimizer_array: ArrayRef = Arc::new(minimizer_builder.finish());
            let bucket_id_array: ArrayRef = Arc::new(bucket_id_builder.finish());
            let batch =
                RecordBatch::try_new(schema.clone(), vec![minimizer_array, bucket_id_array])?;
            writer.write(&batch)?;
        }
        writer.close()?;

        Ok(ShardInfo {
            shard_id,
            min_start: self.minimizers[0],
            // NOTE(review): always 0 here; presumably a sentinel for "no upper bound" on the
            // last shard — confirm against the consumers of `ShardInfo::min_end`.
            min_end: 0,
            is_last_shard: true,
            num_minimizers: self.minimizers.len(),
            num_bucket_ids: self.bucket_ids.len(),
        })
    }

    /// Loads a Parquet shard written by [`InvertedIndex::save_shard_parquet`] back into CSR form.
    ///
    /// The file is read fully into memory, and its row groups are decoded in parallel. The
    /// `(minimizer, bucket_id)` rows are then folded, in file order, into `minimizers`,
    /// `offsets`, and `bucket_ids`. `w`, `salt`, and `source_hash` are stored as given; they
    /// are not checked against the file.
    ///
    /// # Arguments
    /// * `path` - shard file to read.
    /// * `k` - k-mer size; must be 16, 32, or 64.
    /// * `w`, `salt`, `source_hash` - parameters copied into the returned index.
    ///
    /// # Returns
    /// The reconstructed index. A file with no rows (including a schema-only file) yields an
    /// empty index with `offsets == [0]`.
    ///
    /// # Errors
    /// * Validation error if `k` is not 16, 32, or 64.
    /// * I/O error if the file cannot be opened, stat'ed, or read.
    /// * Parquet/Arrow error if decoding fails.
    /// * Format error if the file has missing or mistyped columns, null values, or rows whose
    ///   minimizers are not in non-decreasing order.
    /// * Overflow error if the shard exceeds `MAX_INVERTED_BUCKET_IDS` rows or
    ///   `MAX_INVERTED_MINIMIZERS` distinct minimizers.
    pub fn load_shard_parquet_with_params(
        path: &Path,
        k: usize,
        w: usize,
        salt: u64,
        source_hash: u64,
    ) -> Result<Self> {
        use bytes::Bytes;
        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
        use rayon::prelude::*;
        use std::fs::File;
        use std::io::Read;

        if !matches!(k, 16 | 32 | 64) {
            return Err(RypeError::validation(format!(
                "Invalid K value for Parquet shard: {} (must be 16, 32, or 64)",
                k
            )));
        }

        let mut file =
            File::open(path).map_err(|e| RypeError::io(path, "open Parquet shard", e))?;
        let file_size = file
            .metadata()
            .map_err(|e| RypeError::io(path, "stat Parquet shard", e))?
            .len();
        let mut buffer = Vec::with_capacity(usize::try_from(file_size).unwrap_or(0));
        file.read_to_end(&mut buffer)
            .map_err(|e| RypeError::io(path, "read Parquet shard", e))?;
        let bytes = Bytes::from(buffer);

        let num_row_groups = ParquetRecordBatchReaderBuilder::try_new(bytes.clone())?
            .metadata()
            .num_row_groups();

        // Decode row groups in parallel. The collect preserves row-group order, and the
        // sequential collect below surfaces the first failing row group in file order.
        let row_group_results: Vec<Result<Vec<(u64, u32)>>> = (0..num_row_groups)
            .into_par_iter()
            .map(|rg_idx| decode_row_group_pairs(&bytes, rg_idx, path))
            .collect();
        let row_groups = row_group_results.into_iter().collect::<Result<Vec<_>>>()?;
        let total_pairs: usize = row_groups.iter().map(Vec::len).sum();

        if total_pairs == 0 {
            return Ok(InvertedIndex {
                k,
                w,
                salt,
                source_hash,
                minimizers: Vec::new(),
                offsets: vec![0],
                bucket_ids: Vec::new(),
            });
        }

        // Every row contributes exactly one bucket ID, so both limits can be enforced before
        // building anything. Once `total_pairs` fits in u32, every CSR offset below fits too.
        if total_pairs > MAX_INVERTED_BUCKET_IDS {
            return Err(RypeError::overflow(
                "Parquet shard bucket IDs",
                MAX_INVERTED_BUCKET_IDS,
                total_pairs,
            ));
        }
        if u32::try_from(total_pairs).is_err() {
            return Err(RypeError::format(
                path,
                format!(
                    "Parquet shard has {} rows, which exceeds the u32 CSR offset range",
                    total_pairs
                ),
            ));
        }

        let mut minimizers: Vec<u64> = Vec::with_capacity(total_pairs / 2);
        let mut offsets: Vec<u32> = Vec::with_capacity(total_pairs / 2 + 1);
        let mut bucket_ids: Vec<u32> = Vec::with_capacity(total_pairs);
        let mut prev_min: Option<u64> = None;

        // Consume the row groups one at a time so each decoded chunk is freed once folded in.
        for (row, (m, b)) in row_groups.into_iter().flatten().enumerate() {
            match prev_min {
                Some(prev) if m < prev => {
                    return Err(RypeError::format(
                        path,
                        format!(
                            "Parquet shard has unsorted minimizers at row {}: {} < {} (corrupt file?)",
                            row, m, prev
                        ),
                    ));
                }
                // Same minimizer as the previous row: extend its bucket list.
                Some(prev) if m == prev => {}
                // First row or a new minimizer: open a new CSR segment.
                _ => {
                    offsets.push(bucket_ids.len() as u32);
                    minimizers.push(m);
                }
            }
            prev_min = Some(m);
            bucket_ids.push(b);
        }
        offsets.push(bucket_ids.len() as u32);

        // Defensive invariant checks; the construction above should already guarantee both.
        if offsets.windows(2).any(|pair| pair[0] > pair[1]) {
            return Err(RypeError::format(
                path,
                "Parquet shard has invalid CSR offsets: first offset must be 0 and offsets must be monotonic",
            ));
        }
        if minimizers.windows(2).any(|pair| pair[0] >= pair[1]) {
            return Err(RypeError::format(
                path,
                "Parquet shard has duplicate minimizers after merge (corrupt file?)",
            ));
        }
        if minimizers.len() > MAX_INVERTED_MINIMIZERS {
            return Err(RypeError::overflow(
                "Parquet shard minimizers",
                MAX_INVERTED_MINIMIZERS,
                minimizers.len(),
            ));
        }

        minimizers.shrink_to_fit();
        offsets.shrink_to_fit();
        bucket_ids.shrink_to_fit();
        Ok(InvertedIndex {
            k,
            w,
            salt,
            source_hash,
            minimizers,
            offsets,
            bucket_ids,
        })
    }

    /// Returns `(row_group_index, min, max)` for the first column of every row group in `path`.
    ///
    /// Bounds come from the column chunk's INT64 statistics, reinterpreted as `u64`. When the
    /// statistics, or either bound, are missing, the range falls back to the conservative
    /// `0` / `u64::MAX`. The same fallback applies when the row group has no columns.
    ///
    /// # Errors
    /// Returns an I/O error if the file cannot be opened, or a Parquet error if its footer
    /// cannot be parsed.
    pub fn get_parquet_row_group_stats(path: &Path) -> Result<Vec<(usize, u64, u64)>> {
        use parquet::file::reader::FileReader;
        use parquet::file::serialized_reader::SerializedFileReader;
        use parquet::file::statistics::Statistics;
        use std::fs::File;

        let file = File::open(path).map_err(|e| RypeError::io(path, "open Parquet shard", e))?;
        let reader = SerializedFileReader::new(file)?;

        let stats = reader
            .metadata()
            .row_groups()
            .iter()
            .enumerate()
            .map(|(rg_idx, rg_meta)| {
                let first_column_stats =
                    rg_meta.columns().first().and_then(|col| col.statistics());
                let (rg_min, rg_max) = match first_column_stats {
                    Some(Statistics::Int64(s)) => (
                        s.min_opt().map_or(0, |&v| v as u64),
                        s.max_opt().map_or(u64::MAX, |&v| v as u64),
                    ),
                    _ => (0, u64::MAX),
                };
                (rg_idx, rg_min, rg_max)
            })
            .collect();
        Ok(stats)
    }
}

/// Decodes one row group of a shard into `(minimizer, bucket_id)` pairs, in file order.
///
/// Expects column 0 to be a `UInt64` minimizer column and column 1 a `UInt32` bucket ID
/// column, both without nulls, matching the schema written by
/// [`InvertedIndex::save_shard_parquet`]. Any deviation is reported as a format error for
/// `path` rather than a panic.
fn decode_row_group_pairs(
    bytes: &bytes::Bytes,
    rg_idx: usize,
    path: &Path,
) -> Result<Vec<(u64, u32)>> {
    use arrow::array::{Array, UInt32Array, UInt64Array};
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

    let reader = ParquetRecordBatchReaderBuilder::try_new(bytes.clone())?
        .with_row_groups(vec![rg_idx])
        .build()?;

    let mut pairs = Vec::new();
    for batch in reader {
        let batch = batch?;
        if batch.num_columns() < 2 {
            return Err(RypeError::format(
                path,
                format!(
                    "Expected minimizer and bucket_id columns, found {} column(s)",
                    batch.num_columns()
                ),
            ));
        }
        let min_col = batch
            .column(0)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| RypeError::format(path, "Expected UInt64Array for minimizer column"))?;
        let bid_col = batch
            .column(1)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .ok_or_else(|| RypeError::format(path, "Expected UInt32Array for bucket_id column"))?;
        // Null slots would otherwise be read back as whatever sits in the value buffer.
        if min_col.null_count() > 0 || bid_col.null_count() > 0 {
            return Err(RypeError::format(
                path,
                "Parquet shard contains null minimizer or bucket_id values (corrupt file?)",
            ));
        }
        pairs.extend(
            min_col
                .values()
                .iter()
                .copied()
                .zip(bid_col.values().iter().copied()),
        );
    }
    Ok(pairs)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::IndexMetadata;
    use anyhow::Result;
    use std::collections::HashMap;
    use tempfile::TempDir;

    /// Builds an `InvertedIndex` from `(bucket_id, bucket_name, minimizers)` triples through
    /// `InvertedIndex::build_from_bucket_map`, with an `IndexMetadata` that matches them.
    fn build_test_inverted_index(
        k: usize,
        w: usize,
        salt: u64,
        buckets: Vec<(u32, &str, Vec<u64>)>,
    ) -> InvertedIndex {
        let mut bucket_map: HashMap<u32, Vec<u64>> = HashMap::new();
        let mut bucket_names: HashMap<u32, String> = HashMap::new();
        let mut bucket_minimizer_counts: HashMap<u32, usize> = HashMap::new();
        for (id, name, mins) in buckets {
            bucket_minimizer_counts.insert(id, mins.len());
            bucket_map.insert(id, mins);
            bucket_names.insert(id, name.to_string());
        }
        let metadata = IndexMetadata {
            k,
            w,
            salt,
            bucket_names,
            bucket_sources: HashMap::new(),
            bucket_minimizer_counts,
            largest_shard_entries: 0,
            bucket_file_stats: None,
        };
        InvertedIndex::build_from_bucket_map(k, w, salt, &bucket_map, &metadata)
    }

    /// Loads the shard at `path` using the same parameters `inverted` was built with.
    fn reload(path: &Path, inverted: &InvertedIndex) -> Result<InvertedIndex> {
        Ok(InvertedIndex::load_shard_parquet_with_params(
            path,
            inverted.k,
            inverted.w,
            inverted.salt,
            inverted.source_hash,
        )?)
    }

    #[test]
    fn test_inverted_parquet_roundtrip() -> Result<()> {
        let tmp = TempDir::new()?;
        let path = tmp.path().join("shard.0.parquet");
        let inverted = build_test_inverted_index(
            64,
            50,
            0xABCD,
            vec![
                (1, "A", vec![100, 200, 300]),
                (2, "B", vec![200, 300, 400]),
                (3, "C", vec![500, 600]),
            ],
        );
        let original_minimizers = inverted.minimizers().to_vec();
        let original_bucket_ids = inverted.bucket_ids().to_vec();

        let shard_info = inverted.save_shard_parquet(&path, 0, None)?;
        assert_eq!(shard_info.shard_id, 0);
        assert_eq!(shard_info.num_minimizers, inverted.num_minimizers());
        assert_eq!(shard_info.num_bucket_ids, inverted.num_bucket_entries());

        let loaded = reload(&path, &inverted)?;
        assert_eq!(loaded.k, 64);
        assert_eq!(loaded.w, 50);
        assert_eq!(loaded.salt, 0xABCD);
        assert_eq!(loaded.num_minimizers(), inverted.num_minimizers());
        assert_eq!(loaded.num_bucket_entries(), inverted.num_bucket_entries());
        assert_eq!(loaded.minimizers(), original_minimizers.as_slice());
        assert_eq!(loaded.bucket_ids(), original_bucket_ids.as_slice());

        let hits = loaded.get_bucket_hits(&[200, 300, 500]);
        assert_eq!(hits.get(&1), Some(&2));
        assert_eq!(hits.get(&2), Some(&2));
        assert_eq!(hits.get(&3), Some(&1));
        Ok(())
    }

    #[test]
    fn test_inverted_parquet_empty() -> Result<()> {
        let tmp = TempDir::new()?;
        let path = tmp.path().join("empty.parquet");
        let inverted = build_test_inverted_index(64, 50, 0, vec![]);
        let shard_info = inverted.save_shard_parquet(&path, 0, None)?;
        assert_eq!(shard_info.num_minimizers, 0);
        assert_eq!(shard_info.num_bucket_ids, 0);
        Ok(())
    }

    #[test]
    fn test_inverted_parquet_large_values() -> Result<()> {
        let tmp = TempDir::new()?;
        let path = tmp.path().join("large.parquet");
        // Includes values above i64::MAX to exercise the unsigned round trip.
        let minimizers: Vec<u64> = vec![
            1,
            1000,
            1_000_000,
            1_000_000_000,
            1_000_000_000_000,
            u64::MAX / 2,
            u64::MAX - 100,
            u64::MAX - 1,
        ];
        let inverted =
            build_test_inverted_index(64, 50, 0x12345678, vec![(1, "test", minimizers.clone())]);
        inverted.save_shard_parquet(&path, 0, None)?;
        let loaded = reload(&path, &inverted)?;
        for &m in &minimizers {
            let found = loaded.minimizers().binary_search(&m).is_ok();
            assert!(found, "Large minimizer {} lost", m);
        }
        Ok(())
    }

    #[test]
    fn test_inverted_parquet_direct_load() -> Result<()> {
        let tmp = TempDir::new()?;
        let parquet_path = tmp.path().join("shard.parquet");
        let inverted = build_test_inverted_index(64, 50, 0, vec![(1, "A", vec![100, 200, 300])]);
        inverted.save_shard_parquet(&parquet_path, 0, None)?;
        let loaded = reload(&parquet_path, &inverted)?;
        assert_eq!(loaded.num_minimizers(), inverted.num_minimizers());
        Ok(())
    }

    #[test]
    fn test_inverted_parquet_many_buckets() -> Result<()> {
        let tmp = TempDir::new()?;
        let path = tmp.path().join("many_buckets.parquet");
        let shared: Vec<u64> = vec![100, 200, 300, 400, 500];
        // Own the names here so the borrowed `&str`s outlive `buckets` without leaking memory.
        let names: Vec<String> = (0..50u32).map(|i| format!("bucket_{}", i)).collect();
        let buckets: Vec<(u32, &str, Vec<u64>)> = (0..50u32)
            .zip(&names)
            .map(|(i, name)| {
                let mut mins = shared.clone();
                mins.push(1000 + u64::from(i));
                mins.sort_unstable();
                (i, name.as_str(), mins)
            })
            .collect();
        let inverted = build_test_inverted_index(64, 50, 0xBEEF, buckets);
        inverted.save_shard_parquet(&path, 0, None)?;
        let loaded = reload(&path, &inverted)?;

        let hits = loaded.get_bucket_hits(&[200]);
        assert_eq!(hits.len(), 50);
        for i in 0..50 {
            assert_eq!(hits.get(&i), Some(&1));
        }
        Ok(())
    }

    #[test]
    fn test_inverted_parquet_row_group_stats_cover_minimizers() -> Result<()> {
        let tmp = TempDir::new()?;
        let path = tmp.path().join("stats.parquet");
        let inverted = build_test_inverted_index(64, 50, 0, vec![(1, "A", vec![100, 200, 300])]);
        inverted.save_shard_parquet(&path, 0, None)?;

        let stats = InvertedIndex::get_parquet_row_group_stats(&path)?;
        assert!(!stats.is_empty());
        for (expected_idx, &(rg_idx, rg_min, rg_max)) in stats.iter().enumerate() {
            assert_eq!(rg_idx, expected_idx);
            assert!(rg_min <= rg_max);
        }
        // Exact or fallback bounds alike must cover every stored minimizer.
        let overall_min = stats.iter().map(|s| s.1).min().expect("stats is non-empty");
        let overall_max = stats.iter().map(|s| s.2).max().expect("stats is non-empty");
        assert!(overall_min <= 100);
        assert!(overall_max >= 300);
        Ok(())
    }
}