#![allow(dead_code)]
use std::{collections::BTreeSet, convert::TryFrom, fmt, sync::Arc};
use aisle::RowFilter as AisleRowFilter;
use arrow_array::{Array, ArrayRef, RecordBatch, UInt32Array, UInt64Array};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use arrow_select::{concat::concat_batches as concat_record_batches, take::take as arrow_take};
use fusio::{
DynFs,
error::Error as FsError,
executor::Executor,
fs::OpenOptions,
path::{Path, PathPart},
};
use fusio_parquet::{reader::AsyncReader, writer::AsyncWriter};
use futures::stream::{self, BoxStream, StreamExt};
use parquet::{
arrow::{
ProjectionMask,
arrow_reader::{
ArrowReaderMetadata, ArrowReaderOptions, RowFilter as ParquetRowFilter, RowSelection,
},
async_reader::ParquetRecordBatchStreamBuilder,
async_writer::AsyncArrowWriter,
},
basic::{Compression, ZstdLevel},
errors::ParquetError,
file::{
metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader},
properties::{EnabledStatistics, WriterProperties},
},
schema::types::ColumnPath,
};
use serde::{Deserialize, Serialize};
pub use crate::ondisk::merge::{SsTableMerger, SsTableStreamBatch};
use crate::{
extractor::{KeyExtractError, KeyProjection, projection_for_columns},
id::FileId,
inmem::immutable::{
ImmutableSegment,
memtable::{DeleteSidecar, MVCC_COMMIT_COL, MvccColumns},
},
key::{KeyOwned, KeyOwnedError},
manifest::ManifestError,
mvcc::Timestamp,
ondisk::{
merge::{decode_delete_sidecar, extract_delete_key_at, extract_key_at},
scan::{ParquetStream, UnpinExec},
},
query::{Expr, ScalarValue},
};
/// Optional per-row sequence column; when present it breaks ties between rows
/// that share a key and commit timestamp during flush-time sorting.
const MVCC_SEQUENCE_COL: &str = "_sequence";
/// Data page size limit handed to the Parquet writer, in bytes (8 MiB).
const SST_DATA_PAGE_SIZE_LIMIT: usize = 8 << 20;
/// Numeric identifier of an SSTable; its raw value is also used to build the
/// table's on-disk file names.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SsTableId(u64);
impl SsTableId {
    /// Wraps a raw numeric id.
    pub fn new(raw: u64) -> Self {
        SsTableId(raw)
    }
    /// Returns the wrapped numeric id.
    pub fn raw(&self) -> u64 {
        let SsTableId(value) = self;
        *value
    }
}
/// Default merge iteration budget: effectively unlimited.
pub const DEFAULT_MERGE_ITERATION_BUDGET: usize = usize::MAX;
/// Default maximum number of rows per Parquet row group (1 Mi rows).
pub const DEFAULT_MAX_ROW_GROUP_ROWS: usize = 1 << 20;
/// Settings shared by SSTable writers and readers: data schema, storage
/// location, compression, key projection, and merge / row-group limits.
#[derive(Clone)]
pub struct SsTableConfig {
    /// Schema of the table's data columns; the writer appends the
    /// commit-timestamp column on top of it.
    schema: SchemaRef,
    /// Target level; defaults to 0.
    target_level: usize,
    /// Codec applied to written Parquet files.
    compression: SsTableCompression,
    /// Filesystem holding the table files.
    fs: Arc<dyn DynFs>,
    /// Root directory; per-level directories `L{level}` live beneath it.
    root: Path,
    /// Optional key projection (used for flush sorting and bloom-filter columns).
    key_extractor: Option<Arc<dyn KeyProjection>>,
    /// Merge iteration budget; the builder clamps it to at least 1.
    merge_iteration_budget: usize,
    /// Maximum rows per Parquet row group; the builder clamps it to at least 1.
    max_row_group_rows: usize,
}
impl SsTableConfig {
    /// Builds a config at level 0 with default compression, no key extractor,
    /// and the default merge budget and row-group size.
    pub fn new(schema: SchemaRef, fs: Arc<dyn DynFs>, root: Path) -> Self {
        SsTableConfig {
            schema,
            fs,
            root,
            target_level: 0,
            compression: SsTableCompression::default(),
            key_extractor: None,
            merge_iteration_budget: DEFAULT_MERGE_ITERATION_BUDGET,
            max_row_group_rows: DEFAULT_MAX_ROW_GROUP_ROWS,
        }
    }
    /// Returns the config with its target level replaced.
    pub fn with_target_level(self, level: usize) -> Self {
        Self {
            target_level: level,
            ..self
        }
    }
    /// Returns the config with its compression codec replaced.
    pub fn with_compression(self, compression: SsTableCompression) -> Self {
        Self {
            compression,
            ..self
        }
    }
    /// Data-column schema.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
    /// Target level.
    pub fn target_level(&self) -> usize {
        self.target_level
    }
    /// Configured compression codec.
    pub fn compression(&self) -> SsTableCompression {
        self.compression
    }
    /// Filesystem handle.
    pub fn fs(&self) -> &Arc<dyn DynFs> {
        &self.fs
    }
    /// Root directory.
    pub fn root(&self) -> &Path {
        &self.root
    }
    /// Directory `root/L{level}` for the given level.
    ///
    /// # Errors
    /// `InvalidPath` if the directory name is not a valid path component.
    pub(crate) fn level_dir(&self, level: usize) -> Result<Path, SsTableError> {
        let dir_name = format!("L{level}");
        match PathPart::parse(&dir_name) {
            Ok(part) => Ok(self.root.child(part)),
            Err(err) => Err(SsTableError::InvalidPath(err.to_string())),
        }
    }
    /// Returns the config with a key extractor installed.
    pub fn with_key_extractor(self, extractor: Arc<dyn KeyProjection>) -> Self {
        Self {
            key_extractor: Some(extractor),
            ..self
        }
    }
    /// Configured key extractor, if any.
    pub fn key_extractor(&self) -> Option<&Arc<dyn KeyProjection>> {
        self.key_extractor.as_ref()
    }
    /// Returns the config with a merge iteration budget; zero is raised to 1.
    pub fn with_merge_iteration_budget(self, budget: usize) -> Self {
        Self {
            merge_iteration_budget: budget.max(1),
            ..self
        }
    }
    /// Merge iteration budget.
    pub fn merge_iteration_budget(&self) -> usize {
        self.merge_iteration_budget
    }
    /// Returns the config with a row-group row cap; zero is raised to 1.
    pub fn with_max_row_group_rows(self, rows: usize) -> Self {
        Self {
            max_row_group_rows: rows.max(1),
            ..self
        }
    }
    /// Maximum rows per Parquet row group.
    pub fn max_row_group_rows(&self) -> usize {
        self.max_row_group_rows
    }
}
impl fmt::Debug for SsTableConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The filesystem handle is omitted, and the schema and key extractor are
        // summarised (field count / presence) rather than printed in full.
        let mut out = f.debug_struct("SsTableConfig");
        out.field("schema_fields", &self.schema.fields().len())
            .field("target_level", &self.target_level)
            .field("compression", &self.compression)
            .field("root", &self.root)
            .field("has_key_extractor", &self.key_extractor.is_some())
            .field("merge_iteration_budget", &self.merge_iteration_budget)
            .field("max_row_group_rows", &self.max_row_group_rows);
        out.finish()
    }
}
/// Compression codec applied to SSTable Parquet files.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SsTableCompression {
    /// Pages are written uncompressed.
    None,
    /// Zstandard at the codec's default level (the default choice).
    #[default]
    Zstd,
}
/// A written SSTable, represented by its descriptor.
#[derive(Debug)]
pub struct SsTable {
    descriptor: SsTableDescriptor,
}
impl SsTable {
    /// Wraps a descriptor.
    pub fn new(descriptor: SsTableDescriptor) -> Self {
        SsTable { descriptor }
    }
    /// Borrows the table's descriptor.
    pub fn descriptor(&self) -> &SsTableDescriptor {
        let Self { descriptor } = self;
        descriptor
    }
}
/// Identity, placement, and optional metadata of an SSTable: id, level,
/// statistics, originating WAL ids, and storage paths.
#[derive(Clone, Debug)]
pub struct SsTableDescriptor {
    id: SsTableId,
    level: usize,
    stats: Option<SsTableStats>,
    wal_ids: Option<Vec<FileId>>,
    data_path: Option<Path>,
    // NOTE(review): no builder in this section sets `mvcc_path`; confirm it is
    // populated elsewhere.
    mvcc_path: Option<Path>,
    delete_path: Option<Path>,
}
impl SsTableDescriptor {
    /// Creates a descriptor with only an id and a level; everything else unset.
    pub fn new(id: SsTableId, level: usize) -> Self {
        SsTableDescriptor {
            id,
            level,
            stats: None,
            wal_ids: None,
            data_path: None,
            mvcc_path: None,
            delete_path: None,
        }
    }
    /// Returns the descriptor with statistics attached.
    pub fn with_stats(self, stats: SsTableStats) -> Self {
        Self {
            stats: Some(stats),
            ..self
        }
    }
    /// Returns the descriptor with its WAL ids replaced.
    pub fn with_wal_ids(self, wal_ids: Option<Vec<FileId>>) -> Self {
        Self { wal_ids, ..self }
    }
    /// Returns the descriptor with its data path set and its delete path replaced.
    pub fn with_storage_paths(self, data_path: Path, delete_path: Option<Path>) -> Self {
        Self {
            data_path: Some(data_path),
            delete_path,
            ..self
        }
    }
    /// Table id.
    pub fn id(&self) -> &SsTableId {
        &self.id
    }
    /// Level the table belongs to.
    pub fn level(&self) -> usize {
        self.level
    }
    /// Statistics, if attached.
    pub fn stats(&self) -> Option<&SsTableStats> {
        Option::as_ref(&self.stats)
    }
    /// WAL file ids, if attached.
    pub fn wal_ids(&self) -> Option<&[FileId]> {
        self.wal_ids.as_ref().map(Vec::as_slice)
    }
    /// Path of the data Parquet file, if set.
    pub fn data_path(&self) -> Option<&Path> {
        Option::as_ref(&self.data_path)
    }
    /// Path of the MVCC file, if set.
    pub fn mvcc_path(&self) -> Option<&Path> {
        Option::as_ref(&self.mvcc_path)
    }
    /// Path of the delete-sidecar Parquet file, if set.
    pub fn delete_path(&self) -> Option<&Path> {
        Option::as_ref(&self.delete_path)
    }
}
/// Summary statistics recorded for a written SSTable.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct SsTableStats {
    /// Number of data rows staged into the table.
    pub(crate) rows: usize,
    /// Size in bytes of the data Parquet file.
    pub(crate) bytes: usize,
    /// Number of tombstone (delete) rows staged.
    pub(crate) tombstones: usize,
    /// Smallest key observed while staging.
    pub(crate) min_key: Option<KeyOwned>,
    /// Largest key observed while staging.
    pub(crate) max_key: Option<KeyOwned>,
    /// Smallest commit timestamp observed while staging.
    pub(crate) min_commit_ts: Option<Timestamp>,
    /// Largest commit timestamp observed while staging.
    pub(crate) max_commit_ts: Option<Timestamp>,
}
/// Equality that ignores every field: any two `SsTableStats` compare equal.
///
/// NOTE(review): presumably this keeps statistics from influencing equality
/// of values that embed them; confirm before relying on it.
impl PartialEq for SsTableStats {
    fn eq(&self, _: &Self) -> bool {
        true
    }
}
/// Errors produced while building, merging, or reading SSTables.
#[derive(Debug, thiserror::Error)]
pub enum SsTableError {
    /// Placeholder for operations that are not implemented.
    #[error("sstable operation has not been implemented yet")]
    Unimplemented,
    /// A flush was requested with nothing staged.
    #[error("no immutable segments available for SSTable flush")]
    NoImmutableSegments,
    /// A write was attempted after the Parquet writer was closed.
    #[error("parquet writer already closed")]
    WriterClosed,
    #[error("sstable merge received no input batches")]
    EmptyMergeInput,
    #[error("sstable merge missing output id allocator for split outputs")]
    MissingIdAllocator,
    #[error("sstable merge produced no output")]
    EmptyMergeOutput,
    /// Underlying filesystem failure.
    #[error("filesystem error: {0}")]
    Fs(#[from] FsError),
    /// Parquet (or wrapped Arrow) failure.
    #[error("parquet write error: {0}")]
    Parquet(#[from] ParquetError),
    #[error("sstable parquet file {path} missing page indexes: {reason}")]
    MissingPageIndex { path: String, reason: String },
    #[error("invalid scan selection for SST: {selection}")]
    InvalidScanSelection { selection: &'static str },
    #[error("row filter predicate unsupported: {reason}")]
    RowFilterPredicate { reason: String },
    #[error("row selection length mismatch: {reason}")]
    RowSelection { reason: String },
    /// A path component could not be parsed.
    #[error("invalid sstable path component: {0}")]
    InvalidPath(String),
    /// Data, MVCC, or delete-sidecar columns were missing or inconsistent.
    #[error("sstable sidecar stream length mismatch: {0}")]
    SidecarMismatch(&'static str),
    #[error("failure when persist into manifest: {0}")]
    Manifest(#[from] ManifestError),
    #[error("sstable key extractor not configured")]
    MissingKeyExtractor,
    #[error("key extraction failed: {0}")]
    KeyExtract(#[from] KeyExtractError),
    #[error("key materialization failed: {0}")]
    KeyOwned(#[from] KeyOwnedError),
    #[error("merge iteration budget {budget} exceeded")]
    MergeIterationBudgetExceeded { budget: usize },
}
struct StagedTableStats {
segments: usize,
rows: usize,
tombstones: usize,
min_key: Option<KeyOwned>,
max_key: Option<KeyOwned>,
min_commit_ts: Option<Timestamp>,
max_commit_ts: Option<Timestamp>,
}
impl StagedTableStats {
fn new() -> Self {
Self {
segments: 0,
rows: 0,
tombstones: 0,
min_key: None,
max_key: None,
min_commit_ts: None,
max_commit_ts: None,
}
}
fn update_key_bounds(&mut self, seg_min: &KeyOwned, seg_max: &KeyOwned) {
match self.min_key {
Some(ref existing) if existing <= seg_min => {}
_ => self.min_key = Some(seg_min.clone()),
}
match self.max_key {
Some(ref existing) if existing >= seg_max => {}
_ => self.max_key = Some(seg_max.clone()),
}
}
}
impl StagedTableStats {
fn update_commit_bounds(&mut self, seg_min: Timestamp, seg_max: Timestamp) {
match self.min_commit_ts {
Some(curr) if curr <= seg_min => {}
_ => self.min_commit_ts = Some(seg_min),
}
match self.max_commit_ts {
Some(curr) if curr >= seg_max => {}
_ => self.max_commit_ts = Some(seg_max),
}
}
}
impl fmt::Debug for StagedTableStats {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StagedTableStats")
.field("segments", &self.segments)
.field("rows", &self.rows)
.field("tombstones", &self.tombstones)
.field("has_min_key", &self.min_key.is_some())
.field("has_max_key", &self.max_key.is_some())
.field("min_commit_ts", &self.min_commit_ts)
.field("max_commit_ts", &self.max_commit_ts)
.finish()
}
}
impl SsTableStats {
fn from_staged(staged: StagedTableStats, bytes: usize) -> Self {
Self {
rows: staged.rows,
bytes,
tombstones: staged.tombstones,
min_key: staged.min_key,
max_key: staged.max_key,
min_commit_ts: staged.min_commit_ts,
max_commit_ts: staged.max_commit_ts,
}
}
}
/// One unit of staged input: a data batch that already carries the commit
/// timestamp column, plus the delete rows that accompany it.
#[derive(Clone, Debug)]
struct StagedSegment {
    /// Data rows with the commit-timestamp column appended.
    data_with_commit: RecordBatch,
    /// Delete (tombstone) rows for this segment.
    deletes: DeleteSidecar,
}
/// Accumulates staged segments and their statistics, then writes them out as a
/// single SSTable in `finish`.
pub(super) struct ParquetTableWriter {
    /// Shared table configuration.
    config: Arc<SsTableConfig>,
    /// Descriptor of the table being produced.
    descriptor: SsTableDescriptor,
    /// Running statistics over everything staged so far.
    staged: StagedTableStats,
    /// Segments in staging order.
    segments: Vec<StagedSegment>,
    /// WAL ids attached to the resulting descriptor.
    wal_ids: Option<Vec<FileId>>,
    /// How many segments came from `stage_immutable`; when more than one, the
    /// segments are merged and re-sorted before writing.
    staged_immutable_segments: usize,
}
impl ParquetTableWriter {
    /// Creates a writer for the table described by `descriptor`; nothing is
    /// written until [`Self::finish`].
    pub(super) fn new(config: Arc<SsTableConfig>, descriptor: SsTableDescriptor) -> Self {
        Self {
            config,
            descriptor,
            staged: StagedTableStats::new(),
            segments: Vec::new(),
            wal_ids: None,
            staged_immutable_segments: 0,
        }
    }

    /// Sets the WAL ids attached to the finished table's descriptor.
    pub(super) fn set_wal_ids(&mut self, wal_ids: Option<Vec<FileId>>) {
        self.wal_ids = wal_ids;
    }

    /// Stages an immutable in-memory segment.
    ///
    /// Segments with neither data rows nor delete rows are skipped. Otherwise
    /// this widens the staged key and commit-timestamp bounds, adds the row and
    /// tombstone counts, and stores the data batch with the commit-timestamp
    /// column appended.
    ///
    /// # Errors
    /// `SidecarMismatch` if the MVCC commit column length does not match the
    /// data rows, or any failure building the extended batch.
    pub(super) fn stage_immutable(
        &mut self,
        segment: &ImmutableSegment,
    ) -> Result<(), SsTableError> {
        let data_rows = segment.storage().num_rows();
        let delete_rows = segment.delete_sidecar().len();
        if data_rows == 0 && delete_rows == 0 {
            return Ok(());
        }
        let mut segment_tombstones = 0usize;
        let mut seg_min_ts: Option<Timestamp> = None;
        let mut seg_max_ts: Option<Timestamp> = None;
        for entry in segment.row_iter() {
            if entry.tombstone {
                segment_tombstones += 1;
            }
            seg_min_ts = Some(match seg_min_ts {
                Some(current) if current <= entry.commit_ts => current,
                _ => entry.commit_ts,
            });
            seg_max_ts = Some(match seg_max_ts {
                Some(current) if current >= entry.commit_ts => current,
                _ => entry.commit_ts,
            });
        }
        debug_assert!(
            seg_min_ts.is_some() && seg_max_ts.is_some(),
            "immutable segment with rows must yield commit timestamp bounds"
        );
        if let (Some(min_key), Some(max_key)) = (segment.min_key(), segment.max_key()) {
            self.staged.update_key_bounds(&min_key, &max_key);
        }
        if let (Some(min_ts), Some(max_ts)) = (seg_min_ts, seg_max_ts) {
            self.staged.update_commit_bounds(min_ts, max_ts);
        }
        self.staged.segments += 1;
        self.staged.rows += data_rows;
        self.staged.tombstones += segment_tombstones;
        let mvcc = segment.mvcc_columns().clone();
        let data_with_commit = append_commit_column(segment.storage().clone(), &mvcc)?;
        let staged_segment = StagedSegment {
            data_with_commit,
            deletes: segment.delete_sidecar().clone(),
        };
        self.segments.push(staged_segment);
        self.staged_immutable_segments += 1;
        Ok(())
    }

    /// Stages one batch from a merge stream (data rows plus optional delete rows).
    ///
    /// Key bounds are taken from the first and last row of the data batch and of
    /// the delete batch, so both batches are presumably already sorted by key —
    /// confirm with the producer. Commit bounds come from the `MVCC_COMMIT_COL`
    /// column of each batch. Stream batches are not counted in
    /// `staged_immutable_segments`, so `finish` writes them in staging order.
    ///
    /// # Errors
    /// Key extraction failures, or `SidecarMismatch` when a commit column is
    /// missing or the data commit column contains nulls.
    pub(super) fn stage_stream_batch(
        &mut self,
        batch: &SsTableStreamBatch,
        extractor: &dyn KeyProjection,
    ) -> Result<(), SsTableError> {
        let data_rows = batch.data.num_rows();
        let delete_rows = batch.delete.as_ref().map(|b| b.num_rows()).unwrap_or(0);
        if data_rows == 0 && delete_rows == 0 {
            return Ok(());
        }
        let mut segment_tombstones = 0usize;
        let mut seg_min_ts = None;
        let mut seg_max_ts = None;
        if data_rows > 0 {
            let first = extract_key_at(&batch.data, extractor, 0)?;
            let last = extract_key_at(&batch.data, extractor, data_rows - 1)?;
            self.staged.update_key_bounds(&first, &last);
            let commits = commit_ts_column(&batch.data)?;
            for idx in 0..data_rows {
                let ts = Timestamp::new(commits.value(idx));
                seg_min_ts = Some(seg_min_ts.map(|t| std::cmp::min(t, ts)).unwrap_or(ts));
                seg_max_ts = Some(seg_max_ts.map(|t| std::cmp::max(t, ts)).unwrap_or(ts));
            }
        }
        if let Some(delete_batch) = &batch.delete
            && delete_batch.num_rows() > 0
        {
            segment_tombstones += delete_batch.num_rows();
            let first = extract_delete_key_at(delete_batch, extractor, 0)?;
            let last = extract_delete_key_at(delete_batch, extractor, delete_batch.num_rows() - 1)?;
            self.staged.update_key_bounds(&first, &last);
            let commits = delete_batch
                .column_by_name(MVCC_COMMIT_COL)
                .and_then(|arr| arr.as_any().downcast_ref::<UInt64Array>())
                .ok_or(SsTableError::SidecarMismatch(
                    "delete sidecar missing commit_ts column",
                ))?;
            for idx in 0..delete_batch.num_rows() {
                let ts = Timestamp::new(commits.value(idx));
                seg_min_ts = Some(seg_min_ts.map(|t| std::cmp::min(t, ts)).unwrap_or(ts));
                seg_max_ts = Some(seg_max_ts.map(|t| std::cmp::max(t, ts)).unwrap_or(ts));
            }
        }
        if let (Some(min_ts), Some(max_ts)) = (seg_min_ts, seg_max_ts) {
            self.staged.update_commit_bounds(min_ts, max_ts);
        }
        self.staged.segments += 1;
        self.staged.rows += data_rows;
        self.staged.tombstones += segment_tombstones;
        // Validation only: rejects a data batch whose commit column has nulls.
        parse_mvcc_columns_from_data(&batch.data)?;
        let deletes = if let Some(delete_batch) = &batch.delete {
            decode_delete_sidecar(delete_batch, extractor)?
        } else {
            DeleteSidecar::empty(&extractor.key_schema())
        };
        let staged_segment = StagedSegment {
            data_with_commit: batch.data.clone(),
            deletes,
        };
        self.segments.push(staged_segment);
        Ok(())
    }

    /// Writes every staged segment into a new SSTable and returns it.
    ///
    /// When more than one immutable segment was staged, the segments are first
    /// merged and re-sorted into a single segment. The resulting descriptor
    /// carries the staged statistics (with the data file size as `bytes`), the
    /// storage paths, and the WAL ids.
    ///
    /// # Errors
    /// `NoImmutableSegments` if nothing was staged; otherwise any sort, I/O, or
    /// Parquet failure. On a write failure the partially written files are left
    /// in place.
    pub(super) async fn finish<E>(self, executor: E) -> Result<SsTable, SsTableError>
    where
        E: Executor + Clone,
    {
        if self.staged.segments == 0 {
            return Err(SsTableError::NoImmutableSegments);
        }
        let ParquetTableWriter {
            config,
            descriptor,
            staged,
            mut segments,
            wal_ids,
            staged_immutable_segments,
        } = self;
        if staged_immutable_segments > 1 {
            segments = normalize_immutable_flush_segments(segments, config.as_ref())?;
        }
        let mut ctx = WriteContext::new(Arc::clone(&config), &descriptor, executor).await?;
        for segment in &segments {
            if let Err(err) = ctx.write_segment(segment).await {
                // `WriteContext`'s `Drop` debug-asserts that both writers were
                // closed. Release them (without closing) so a write failure is
                // reported as `Err` instead of panicking in debug builds.
                ctx.data_writer = None;
                ctx.delete_writer = None;
                return Err(err);
            }
        }
        let (data_path, delete_path, data_bytes) = ctx.finish().await?;
        let stats =
            SsTableStats::from_staged(staged, usize::try_from(data_bytes).unwrap_or(usize::MAX));
        Ok(SsTable::new(
            descriptor
                .with_stats(stats)
                .with_storage_paths(data_path, delete_path)
                .with_wal_ids(wal_ids),
        ))
    }

    /// Test-only access to the staged statistics.
    #[cfg(all(test, feature = "tokio"))]
    fn plan(&self) -> &StagedTableStats {
        &self.staged
    }
}
/// Ordering record for one staged row during flush normalization. Rows are
/// sorted by `key` ascending, then `commit_ts` descending, then `sequence`
/// descending, with `row_idx` ascending as the final tiebreak.
#[derive(Debug)]
struct SortKey {
    /// Owned key of the row.
    key: KeyOwned,
    /// Commit timestamp of the row.
    commit_ts: Timestamp,
    /// Value of the `_sequence` column when the batch has one; otherwise the
    /// row's position within its own batch.
    sequence: u64,
    /// Row position in the concatenation of all staged batches.
    row_idx: u32,
}
/// Collapses several staged segments into one whose data rows and delete rows
/// are each concatenated and sorted. Zero or one segment is returned unchanged.
///
/// # Errors
/// Propagates failures from resolving the sort key or sorting either side.
fn normalize_immutable_flush_segments(
    segments: Vec<StagedSegment>,
    config: &SsTableConfig,
) -> Result<Vec<StagedSegment>, SsTableError> {
    match segments.len() {
        0 | 1 => Ok(segments),
        _ => {
            let extractor = resolve_data_sort_extractor(config, &segments)?;
            let data_with_commit = sort_data_rows(&segments, extractor.as_ref())?;
            let deletes = sort_delete_rows(&segments)?;
            Ok(vec![StagedSegment {
                data_with_commit,
                deletes,
            }])
        }
    }
}
/// Returns the key projection used to sort data rows.
///
/// The configured key extractor wins. Otherwise the projection is derived by
/// matching each column of the first segment's delete-key schema, by name, to a
/// column of that segment's data schema.
///
/// # Errors
/// `NoImmutableSegments` for an empty slice; `SidecarMismatch` when a key
/// column has no same-named data column; projection-building failures.
fn resolve_data_sort_extractor(
    config: &SsTableConfig,
    segments: &[StagedSegment],
) -> Result<Arc<dyn KeyProjection>, SsTableError> {
    if let Some(configured) = config.key_extractor() {
        return Ok(Arc::clone(configured));
    }
    let Some(first) = segments.first() else {
        return Err(SsTableError::NoImmutableSegments);
    };
    let data_schema = first.data_with_commit.schema();
    let key_schema = first.deletes.key_batch().schema();
    let key_indices = key_schema
        .fields()
        .iter()
        .map(|key_field| {
            data_schema
                .fields()
                .iter()
                .position(|field| field.name() == key_field.name())
                .ok_or(SsTableError::SidecarMismatch(
                    "failed to derive data key projection from immutable delete schema",
                ))
        })
        .collect::<Result<Vec<usize>, SsTableError>>()?;
    let projection = projection_for_columns(data_schema, key_indices)?;
    Ok(projection.into())
}
fn sort_data_rows(
segments: &[StagedSegment],
extractor: &dyn KeyProjection,
) -> Result<RecordBatch, SsTableError> {
let Some(first) = segments.first() else {
return Err(SsTableError::NoImmutableSegments);
};
let data_schema = first.data_with_commit.schema();
let mut all_batches = Vec::with_capacity(segments.len());
let mut keys = Vec::new();
let mut row_offset = 0usize;
for segment in segments {
let batch = &segment.data_with_commit;
if batch.schema().as_ref() != data_schema.as_ref() {
return Err(SsTableError::SidecarMismatch(
"staged immutable data schema mismatch",
));
}
all_batches.push(batch.clone());
if batch.num_rows() == 0 {
continue;
}
let row_indices: Vec<usize> = (0..batch.num_rows()).collect();
let key_rows = extractor.project_view(batch, &row_indices)?;
let commits = commit_ts_column(batch)?;
let sequence_col = batch
.column_by_name(MVCC_SEQUENCE_COL)
.and_then(|arr| arr.as_any().downcast_ref::<UInt64Array>());
for (row, key_row) in key_rows.into_iter().enumerate() {
let key = KeyOwned::from_key_row(&key_row)?;
let row_idx = u32::try_from(row_offset.saturating_add(row)).map_err(|_| {
SsTableError::SidecarMismatch("staged immutable row index overflowed u32")
})?;
let sequence = sequence_col.map(|col| col.value(row)).unwrap_or(row as u64);
keys.push(SortKey {
key,
commit_ts: Timestamp::new(commits.value(row)),
sequence,
row_idx,
});
}
row_offset = row_offset.saturating_add(batch.num_rows());
}
let merged = if all_batches.len() == 1 {
all_batches.pop().expect("single batch available")
} else {
concat_record_batches(&data_schema, &all_batches)
.map_err(|err| SsTableError::Parquet(ParquetError::ArrowError(err.to_string())))?
};
if keys.is_empty() {
return Ok(merged);
}
keys.sort_by(|lhs, rhs| {
lhs.key
.cmp(&rhs.key)
.then_with(|| rhs.commit_ts.cmp(&lhs.commit_ts))
.then_with(|| rhs.sequence.cmp(&lhs.sequence))
.then_with(|| lhs.row_idx.cmp(&rhs.row_idx))
});
let sorted_indices: Vec<u32> = keys.into_iter().map(|entry| entry.row_idx).collect();
take_record_batch(&merged, &sorted_indices)
}
/// Concatenates the delete rows of `segments` and reorders them like data rows:
/// key ascending, commit timestamp descending, sequence descending, then
/// concatenated row position.
///
/// Returns an empty sidecar (with the first segment's key schema) when no
/// segment has any delete rows.
///
/// # Errors
/// `NoImmutableSegments` for an empty slice. `SidecarMismatch` when key schemas
/// differ, the commit column is missing, or the total row count does not fit in
/// `u32`. Also propagates conversion, projection, concat, take, and decode
/// failures.
fn sort_delete_rows(segments: &[StagedSegment]) -> Result<DeleteSidecar, SsTableError> {
    let Some(first) = segments.first() else {
        return Err(SsTableError::NoImmutableSegments);
    };
    let key_schema = first.deletes.key_batch().schema();
    let key_columns = key_schema.fields().len();
    let mut all_batches = Vec::with_capacity(segments.len());
    for segment in segments {
        if segment.deletes.key_batch().schema().as_ref() != key_schema.as_ref() {
            return Err(SsTableError::SidecarMismatch(
                "staged immutable delete schema mismatch",
            ));
        }
        let batch = segment
            .deletes
            .to_record_batch()
            .map_err(|err| SsTableError::Parquet(ParquetError::ArrowError(err.to_string())))?;
        // `to_record_batch` already returns an owned batch; no extra clone needed.
        all_batches.push(batch);
    }
    let Some(delete_schema) = all_batches.first().map(RecordBatch::schema) else {
        return Ok(DeleteSidecar::empty(&key_schema));
    };
    // Projects the leading `key_columns` columns, i.e. this assumes the key
    // columns come first in the delete batch layout.
    let extractor = projection_for_columns(delete_schema.clone(), (0..key_columns).collect())?;
    let extractor: Arc<dyn KeyProjection> = extractor.into();
    let mut keys = Vec::new();
    let mut row_offset = 0usize;
    for batch in &all_batches {
        if batch.num_rows() == 0 {
            continue;
        }
        let row_indices: Vec<usize> = (0..batch.num_rows()).collect();
        let key_rows = extractor.project_view(batch, &row_indices)?;
        let commits = batch
            .column_by_name(MVCC_COMMIT_COL)
            .and_then(|arr| arr.as_any().downcast_ref::<UInt64Array>())
            .ok_or(SsTableError::SidecarMismatch(
                "delete sidecar missing commit_ts column",
            ))?;
        let sequence_col = batch
            .column_by_name(MVCC_SEQUENCE_COL)
            .and_then(|arr| arr.as_any().downcast_ref::<UInt64Array>());
        for (row, key_row) in key_rows.into_iter().enumerate() {
            let key = KeyOwned::from_key_row(&key_row)?;
            let row_idx = u32::try_from(row_offset.saturating_add(row)).map_err(|_| {
                SsTableError::SidecarMismatch("staged delete row index overflowed u32")
            })?;
            let sequence = sequence_col.map(|col| col.value(row)).unwrap_or(row as u64);
            keys.push(SortKey {
                key,
                commit_ts: Timestamp::new(commits.value(row)),
                sequence,
                row_idx,
            });
        }
        row_offset = row_offset.saturating_add(batch.num_rows());
    }
    // Nothing to sort: skip concatenating the (all empty) batches.
    if keys.is_empty() {
        return Ok(DeleteSidecar::empty(&key_schema));
    }
    let merged = if all_batches.len() == 1 {
        all_batches.pop().expect("single delete batch available")
    } else {
        concat_record_batches(&delete_schema, &all_batches)
            .map_err(|err| SsTableError::Parquet(ParquetError::ArrowError(err.to_string())))?
    };
    // `row_idx` is unique, so the comparator is a strict total order and an
    // unstable sort gives the same result as a stable one.
    keys.sort_unstable_by(|lhs, rhs| {
        lhs.key
            .cmp(&rhs.key)
            .then_with(|| rhs.commit_ts.cmp(&lhs.commit_ts))
            .then_with(|| rhs.sequence.cmp(&lhs.sequence))
            .then_with(|| lhs.row_idx.cmp(&rhs.row_idx))
    });
    let sorted_indices: Vec<u32> = keys.into_iter().map(|entry| entry.row_idx).collect();
    let sorted = take_record_batch(&merged, &sorted_indices)?;
    decode_delete_sidecar(&sorted, extractor.as_ref())
}
/// Column paths that should get Parquet bloom filters: the configured key
/// columns that also exist (by name) in `schema`. Empty when no key extractor
/// is configured. Key indices out of range for the config schema are ignored.
fn bloom_filter_columns(config: &SsTableConfig, schema: &SchemaRef) -> Vec<ColumnPath> {
    let extractor = match config.key_extractor() {
        Some(extractor) => extractor,
        None => return Vec::new(),
    };
    let base_fields = config.schema().fields();
    let mut columns = Vec::new();
    for &index in extractor.key_indices().iter() {
        let Some(field) = base_fields.get(index) else {
            continue;
        };
        if schema.field_with_name(field.name()).is_ok() {
            columns.push(ColumnPath::from(field.name().as_str()));
        }
    }
    columns
}
/// Parquet writer properties for SSTable files.
///
/// Settings applied:
/// - the configured codec (Zstd uses its default level);
/// - page-level statistics, with the offset index kept enabled;
/// - an [`SST_DATA_PAGE_SIZE_LIMIT`] data page size limit;
/// - the configured maximum row-group size;
/// - bloom filters on the key columns present in `schema`.
fn writer_properties(config: &SsTableConfig, schema: &SchemaRef) -> WriterProperties {
    let codec = match config.compression() {
        SsTableCompression::None => Compression::UNCOMPRESSED,
        SsTableCompression::Zstd => Compression::ZSTD(ZstdLevel::default()),
    };
    let base = WriterProperties::builder()
        .set_compression(codec)
        .set_statistics_enabled(EnabledStatistics::Page)
        .set_offset_index_disabled(false)
        .set_data_page_size_limit(SST_DATA_PAGE_SIZE_LIMIT)
        .set_max_row_group_size(config.max_row_group_rows());
    bloom_filter_columns(config, schema)
        .into_iter()
        .fold(base, |builder, column| {
            builder.set_column_bloom_filter_enabled(column, true)
        })
        .build()
}
/// Returns `batch` with a non-nullable `UInt64` commit-timestamp column
/// (`MVCC_COMMIT_COL`) appended, built from `mvcc.commit_ts`. Schema metadata
/// is preserved.
///
/// # Errors
/// `SidecarMismatch` when the commit column length differs from the batch row
/// count; `Parquet` wrapping any Arrow error from building the new batch.
fn append_commit_column(
    batch: RecordBatch,
    mvcc: &MvccColumns,
) -> Result<RecordBatch, SsTableError> {
    let row_count = batch.num_rows();
    if mvcc.commit_ts.len() != row_count {
        return Err(SsTableError::SidecarMismatch(
            "commit_ts length mismatch data rows",
        ));
    }
    let source_schema = batch.schema();
    let mut fields = source_schema.fields().to_vec();
    fields.push(Field::new(MVCC_COMMIT_COL, DataType::UInt64, false).into());
    let extended_schema = Arc::new(Schema::new_with_metadata(
        fields,
        source_schema.metadata().clone(),
    ));
    let commit_array: ArrayRef = Arc::new(UInt64Array::from_iter_values(
        mvcc.commit_ts.iter().map(|ts| ts.get()),
    ));
    let columns: Vec<ArrayRef> = batch
        .columns()
        .iter()
        .cloned()
        .chain(std::iter::once(commit_array))
        .collect();
    RecordBatch::try_new(extended_schema, columns)
        .map_err(|err| SsTableError::Parquet(ParquetError::ArrowError(err.to_string())))
}
/// Looks up the `MVCC_COMMIT_COL` column of `batch` as a `UInt64Array`.
///
/// # Errors
/// `SidecarMismatch` when the column is absent or not `UInt64`.
fn commit_ts_column(batch: &RecordBatch) -> Result<Arc<UInt64Array>, SsTableError> {
    let commits = batch
        .column_by_name(MVCC_COMMIT_COL)
        .and_then(|column| column.as_any().downcast_ref::<UInt64Array>());
    match commits {
        Some(commits) => Ok(Arc::new(commits.clone())),
        None => Err(SsTableError::SidecarMismatch(
            "commit_ts column missing or wrong type",
        )),
    }
}
/// Builds `MvccColumns` from the commit column of a data batch, marking every
/// row as a non-tombstone.
///
/// # Errors
/// `SidecarMismatch` when the commit column is missing, mistyped, or contains
/// nulls.
fn parse_mvcc_columns_from_data(batch: &RecordBatch) -> Result<MvccColumns, SsTableError> {
    let commit_array = commit_ts_column(batch)?;
    if commit_array.null_count() != 0 {
        return Err(SsTableError::SidecarMismatch("commit_ts contains nulls"));
    }
    let commits = commit_array
        .values()
        .iter()
        .map(|&raw| Timestamp::new(raw))
        .collect::<Vec<Timestamp>>();
    let tombstones = vec![false; commits.len()];
    Ok(MvccColumns::new(commits, tombstones))
}
/// In-progress write of one SSTable: the open data writer, the lazily opened
/// delete writer, and their target paths.
struct WriteContext<E>
where
    E: Executor + Clone,
{
    /// Shared table configuration (used for writer properties).
    config: Arc<SsTableConfig>,
    /// Filesystem the files are written to.
    fs: Arc<dyn DynFs>,
    /// Destination of the data Parquet file.
    data_path: Path,
    /// Destination of the delete-sidecar Parquet file.
    delete_path: Path,
    /// Data writer; `None` once taken for closing.
    data_writer: Option<AsyncArrowWriter<AsyncWriter<E>>>,
    /// Delete writer; created on the first delete batch.
    delete_writer: Option<AsyncArrowWriter<AsyncWriter<E>>>,
    /// Whether any delete batch was written (decides if a delete path is reported).
    delete_written: bool,
    /// Compression copied from the config (not read within this section).
    compression: SsTableCompression,
    /// Executor handed to the async file writers.
    executor: E,
}
impl<E> WriteContext<E>
where
E: Executor + Clone,
{
async fn new(
config: Arc<SsTableConfig>,
descriptor: &SsTableDescriptor,
executor: E,
) -> Result<Self, SsTableError> {
let fs = Arc::clone(config.fs());
let (dir_path, data_path, delete_path) =
build_table_paths(&config, descriptor.level(), descriptor.id())?;
fs.create_dir_all(&dir_path).await?;
let data_options = OpenOptions::default()
.create(true)
.write(true)
.truncate(true);
let data_file = fs.open_options(&data_path, data_options).await?;
let mut fields = config.schema().fields().to_vec();
fields.push(Field::new(MVCC_COMMIT_COL, DataType::UInt64, false).into());
let data_schema = Arc::new(Schema::new_with_metadata(
fields,
config.schema().metadata().clone(),
));
let data_properties = writer_properties(config.as_ref(), &data_schema);
let data_writer = AsyncArrowWriter::try_new(
AsyncWriter::new(data_file, executor.clone()),
data_schema,
Some(data_properties),
)?;
Ok(Self {
config: Arc::clone(&config),
fs,
data_path,
data_writer: Some(data_writer),
delete_path,
delete_writer: None,
delete_written: false,
compression: config.compression(),
executor,
})
}
async fn write_segment(&mut self, segment: &StagedSegment) -> Result<(), SsTableError> {
if let Some(writer) = self.data_writer.as_mut() {
writer.write(&segment.data_with_commit).await?;
} else {
return Err(SsTableError::WriterClosed);
}
if !segment.deletes.is_empty() {
let delete_batch = segment
.deletes
.to_record_batch()
.map_err(|err| SsTableError::Parquet(ParquetError::ArrowError(err.to_string())))?;
self.write_delete_batch(delete_batch).await?;
}
Ok(())
}
async fn write_delete_batch(&mut self, batch: RecordBatch) -> Result<(), SsTableError> {
if self.delete_writer.is_none() {
let options = OpenOptions::default()
.create(true)
.write(true)
.truncate(true);
let file = self.fs.open_options(&self.delete_path, options).await?;
let writer = AsyncArrowWriter::try_new(
AsyncWriter::new(file, self.executor.clone()),
batch.schema(),
Some(writer_properties(self.config.as_ref(), &batch.schema())),
)?;
self.delete_writer = Some(writer);
}
if let Some(writer) = self.delete_writer.as_mut() {
writer.write(&batch).await?;
self.delete_written = true;
Ok(())
} else {
Err(SsTableError::WriterClosed)
}
}
async fn finish(mut self) -> Result<(Path, Option<Path>, u64), SsTableError> {
if let Some(writer) = self.data_writer.take() {
writer.close().await?;
}
let data_path = self.data_path.clone();
if let Some(writer) = self.delete_writer.take() {
writer.close().await?;
}
let data_file = self
.fs
.open_options(&data_path, OpenOptions::default().read(true))
.await?;
let data_bytes = data_file.size().await?;
let delete_path = if self.delete_written {
Some(self.delete_path.clone())
} else {
None
};
Ok((data_path, delete_path, data_bytes))
}
}
impl<E> Drop for WriteContext<E>
where
    E: Executor + Clone,
{
    /// Debug-build check that `finish` (or an explicit release) took both
    /// writers before the context was dropped.
    fn drop(&mut self) {
        // While unwinding from another panic, a second panic in `drop` would
        // abort the process. Skip the check in that case.
        if std::thread::panicking() {
            return;
        }
        debug_assert!(
            self.data_writer.is_none() && self.delete_writer.is_none(),
            "WriteContext dropped without closing writers"
        );
    }
}
/// Computes the paths for table `id` at `level`, returned as a triple:
/// - the level directory;
/// - the data file `{id:020}.parquet`;
/// - the delete file `{id:020}.delete.parquet`.
///
/// # Errors
/// `InvalidPath` when a generated name is not a valid path component.
fn build_table_paths(
    config: &SsTableConfig,
    level: usize,
    id: &SsTableId,
) -> Result<(Path, Path, Path), SsTableError> {
    let dir_path = config.level_dir(level)?;
    let child_of_dir = |file_name: String| -> Result<Path, SsTableError> {
        let part = PathPart::parse(&file_name)
            .map_err(|err| SsTableError::InvalidPath(err.to_string()))?;
        Ok(dir_path.child(part))
    };
    let raw_id = id.raw();
    let data_path = child_of_dir(format!("{raw_id:020}.parquet"))?;
    let delete_path = child_of_dir(format!("{raw_id:020}.delete.parquet"))?;
    Ok((dir_path, data_path, delete_path))
}
/// Converts a storage path into the form recorded in the manifest: relative to
/// `root` when `storage_path` lies beneath it.
///
/// The root is tried as given and then without a leading `/`. A prefix only
/// counts when it ends on a path-component boundary, so a root of `data` does
/// not claim `data2/...`. `storage_path` is returned unchanged in three cases:
/// the root is empty, no form of the root is a component prefix, or nothing
/// remains after stripping it.
pub(crate) fn manifest_storage_path(root: &Path, storage_path: &Path) -> Path {
    let storage_raw = storage_path.as_ref();
    let root_raw = root.as_ref().trim_end_matches('/');
    if root_raw.is_empty() {
        return storage_path.clone();
    }
    let candidates = std::iter::once(root_raw).chain(root_raw.strip_prefix('/'));
    for prefix in candidates {
        let relative = storage_raw
            .strip_prefix(prefix)
            // Require a separator right after the prefix (component boundary).
            .filter(|rest| rest.starts_with('/'))
            .map(|rest| rest.trim_start_matches('/'))
            .filter(|rest| !rest.is_empty());
        if let Some(relative) = relative {
            return Path::from(relative.to_string());
        }
    }
    storage_path.clone()
}
/// Resolves a manifest-recorded path back to a storage path under `root`.
///
/// Resolution rules:
/// - The manifest path is returned unchanged when the root is the default
///   (empty) path or has no content after trimming trailing `/`.
/// - It is also returned unchanged when it already starts with the root as a
///   whole path component.
/// - If it starts with the root minus its leading `/`, a `/` is prepended.
/// - Otherwise it is joined onto the root.
///
/// Prefix matches must end on a component boundary, so a root of `L` does not
/// swallow the manifest path `L0/...`.
pub(crate) fn storage_path_from_manifest(root: &Path, manifest_path: &Path) -> Path {
    if root == &Path::default() {
        return manifest_path.clone();
    }
    let manifest_raw = manifest_path.as_ref();
    let root_raw = root.as_ref().trim_end_matches('/');
    // `true` when `path` equals `prefix` or continues it after a '/' separator.
    let under = |path: &str, prefix: &str| {
        path.strip_prefix(prefix)
            .is_some_and(|rest| rest.is_empty() || rest.starts_with('/'))
    };
    if root_raw.is_empty() || under(manifest_raw, root_raw) {
        return manifest_path.clone();
    }
    if let Some(root_without_leading_slash) = root_raw.strip_prefix('/')
        && under(manifest_raw, root_without_leading_slash)
    {
        return Path::from(format!("/{manifest_raw}"));
    }
    Path::from(format!("{root_raw}/{manifest_raw}"))
}
/// Public facade that stages immutable segments and writes them as one SSTable.
pub struct SsTableBuilder {
    writer: ParquetTableWriter,
}
impl SsTableBuilder {
    /// Creates a builder for the table described by `descriptor`.
    pub fn new(config: Arc<SsTableConfig>, descriptor: SsTableDescriptor) -> Self {
        let writer = ParquetTableWriter::new(config, descriptor);
        Self { writer }
    }
    /// Sets the WAL ids recorded on the finished table.
    pub fn set_wal_ids(&mut self, wal_ids: Option<Vec<FileId>>) {
        let writer = &mut self.writer;
        writer.set_wal_ids(wal_ids);
    }
    /// Stages an immutable segment for the flush.
    pub(crate) fn add_immutable(&mut self, segment: &ImmutableSegment) -> Result<(), SsTableError> {
        ParquetTableWriter::stage_immutable(&mut self.writer, segment)
    }
    /// Writes everything staged and returns the resulting table.
    pub async fn finish<E>(self, executor: E) -> Result<SsTable, SsTableError>
    where
        E: Executor + Clone,
    {
        let Self { writer } = self;
        writer.finish(executor).await
    }
}
#[derive(Debug)]
pub struct SsTableReader {
descriptor: SsTableDescriptor,
config: Arc<SsTableConfig>,
}
impl SsTableReader {
pub async fn open(
config: Arc<SsTableConfig>,
descriptor: SsTableDescriptor,
) -> Result<Self, SsTableError> {
Ok(Self { descriptor, config })
}
/// Streams this SST's data file together with its delete sidecar (if the descriptor has one).
///
/// Batches are paired positionally: the i-th item carries the i-th data batch and the i-th
/// delete batch. When one side runs out, the other keeps flowing:
/// - leftover delete batches are paired with an empty data batch;
/// - leftover data batches are paired with `delete: None`.
///
/// The stream ends once both sides are exhausted.
///
/// `predicate`, when given, is applied to the data file only. Only its pushdown-eligible part
/// (see `split_predicate_for_row_filter`) takes effect here, as a Parquet row filter. When the
/// config has a key extractor, the delete sidecar is projected to the key columns, the
/// commit-timestamp column, and `_sequence` if the sidecar has one.
///
/// NOTE(review): `_ts` is unused, so no MVCC visibility filtering happens in this method.
///
/// # Errors
/// Returns `SsTableError::InvalidPath` if the descriptor has no data path, and propagates
/// errors from opening either file. Read errors surface as `Err` items on the stream.
pub(crate) async fn into_stream<E>(
    self,
    _ts: Timestamp,
    predicate: Option<&Expr>,
    executor: E,
) -> Result<BoxStream<'static, Result<SsTableStreamBatch, SsTableError>>, SsTableError>
where
    E: Executor + Clone + 'static,
{
    let fs = Arc::clone(self.config.fs());
    let data_path = self
        .descriptor
        .data_path()
        .cloned()
        .ok_or_else(|| SsTableError::InvalidPath("missing data path on descriptor".into()))?;
    let data_stream = open_parquet_stream(
        Arc::clone(&fs),
        data_path,
        None,
        None,
        None,
        predicate,
        executor.clone(),
    )
    .await?;
    // Padding batches must use the schema the data stream actually yields (the file's
    // columns, including MVCC columns such as the commit timestamp), not the user-facing
    // config schema. Otherwise `data` batches from the same stream would disagree on schema.
    let data_schema = Arc::clone(data_stream.schema());

    let delete_stream = if let Some(path) = self.descriptor.delete_path() {
        let delete_path = path.clone();
        let delete_projection = if let Some(extractor) = self.config.key_extractor() {
            let mut required = BTreeSet::new();
            for field in extractor.key_schema().fields() {
                required.insert(field.name().to_string());
            }
            required.insert(MVCC_COMMIT_COL.to_string());
            Some(
                build_projection_mask_for_names(
                    Arc::clone(&fs),
                    &delete_path,
                    &required,
                    executor.clone(),
                )
                .await?,
            )
        } else {
            None
        };
        Some(
            open_parquet_stream(fs, delete_path, delete_projection, None, None, None, executor)
                .await?,
        )
    } else {
        None
    };

    let stream = stream::try_unfold(
        (data_stream, delete_stream),
        move |(mut data_stream, mut delete_stream)| {
            let data_schema = Arc::clone(&data_schema);
            async move {
                let data = data_stream
                    .next()
                    .await
                    .transpose()
                    .map_err(SsTableError::Parquet)?;
                let delete = match delete_stream.as_mut() {
                    Some(del_stream) => del_stream
                        .next()
                        .await
                        .transpose()
                        .map_err(SsTableError::Parquet)?,
                    None => None,
                };
                let data = match (data, &delete) {
                    (None, None) => return Ok(None),
                    (Some(data), _) => data,
                    // Data is exhausted but the sidecar is not: pad with an empty batch.
                    (None, Some(_)) => RecordBatch::new_empty(data_schema),
                };
                Ok(Some((
                    SsTableStreamBatch { data, delete },
                    (data_stream, delete_stream),
                )))
            }
        },
    );
    Ok(Box::pin(stream))
}
}
/// Result of splitting a predicate into a part that can run as a Parquet row filter and a
/// part that the row filter cannot handle.
///
/// `None` in either field means that part imposes no constraint (logically `true`); for
/// example, `Expr::True` splits into `None` / `None`.
#[derive(Debug, Clone)]
pub(crate) struct PredicateSplit {
    /// Portion expressible as a row filter against the target schema.
    pub(crate) pushdown: Option<Expr>,
    /// Portion that could not be pushed down and must be evaluated some other way.
    pub(crate) residual: Option<Expr>,
}
/// Splits `predicate` into a part that can be pushed into a Parquet row filter for `schema`
/// and a residual part that cannot.
///
/// Leaf predicates are pushed down whole only when their column exists with a compatible type:
/// - comparisons, `Between` and `InList` need non-null literals of exactly the column type;
/// - `StartsWith` needs a string column;
/// - `IsNull` needs only that the column exists.
///
/// Bloom-filter predicates and unknown variants always stay residual.
///
/// Compound predicates are handled as follows:
/// - `And` splits conjunct by conjunct.
/// - `Or` and `Not` are pushed down only if every child is fully pushable; otherwise the whole
///   expression stays residual.
/// - A child that reduces to "no constraint" makes an `Or` unconstrained and a `Not` false.
pub(crate) fn split_predicate_for_row_filter(
    predicate: &Expr,
    schema: &SchemaRef,
) -> PredicateSplit {
    // A leaf is either pushed down in its entirety or kept entirely as residual.
    let leaf = |pushable: bool| {
        if pushable {
            PredicateSplit {
                pushdown: Some(predicate.clone()),
                residual: None,
            }
        } else {
            PredicateSplit {
                pushdown: None,
                residual: Some(predicate.clone()),
            }
        }
    };

    match predicate {
        Expr::True => PredicateSplit {
            pushdown: None,
            residual: None,
        },
        Expr::False => PredicateSplit {
            pushdown: Some(Expr::False),
            residual: None,
        },
        Expr::Cmp { column, value, .. } => leaf(scalar_matches_column(schema, column, value)),
        Expr::Between {
            column, low, high, ..
        } => leaf(
            scalar_matches_column(schema, column, low)
                && scalar_matches_column(schema, column, high),
        ),
        Expr::InList { column, values } => leaf(scalars_match_column(schema, column, values)),
        Expr::StartsWith { column, .. } => leaf(is_string_column(schema, column)),
        Expr::IsNull { column, .. } => leaf(column_type(schema, column).is_some()),
        Expr::BloomFilterEq { .. } | Expr::BloomFilterInList { .. } => leaf(false),
        Expr::And(children) => {
            let mut pushed = Vec::with_capacity(children.len());
            let mut kept = Vec::new();
            for split in children
                .iter()
                .map(|child| split_predicate_for_row_filter(child, schema))
            {
                pushed.extend(split.pushdown);
                kept.extend(split.residual);
            }
            PredicateSplit {
                pushdown: combine_and(pushed),
                residual: combine_and(kept),
            }
        }
        Expr::Or(children) => {
            // An empty disjunction is false.
            if children.is_empty() {
                return PredicateSplit {
                    pushdown: Some(Expr::False),
                    residual: None,
                };
            }
            let mut disjuncts = Vec::with_capacity(children.len());
            for child in children {
                match split_predicate_for_row_filter(child, schema) {
                    // Any child needing residual evaluation keeps the whole OR residual.
                    PredicateSplit {
                        residual: Some(_), ..
                    } => return leaf(false),
                    // A child with no constraint at all makes the whole OR unconstrained.
                    PredicateSplit { pushdown: None, .. } => {
                        return PredicateSplit {
                            pushdown: None,
                            residual: None,
                        };
                    }
                    PredicateSplit {
                        pushdown: Some(expr),
                        ..
                    } => disjuncts.push(expr),
                }
            }
            PredicateSplit {
                pushdown: combine_or(disjuncts),
                residual: None,
            }
        }
        Expr::Not(child) => match split_predicate_for_row_filter(child, schema) {
            PredicateSplit {
                residual: Some(_), ..
            } => leaf(false),
            // Negating "no constraint" (true) yields false.
            PredicateSplit { pushdown: None, .. } => PredicateSplit {
                pushdown: Some(Expr::False),
                residual: None,
            },
            PredicateSplit {
                pushdown: Some(inner),
                ..
            } => PredicateSplit {
                pushdown: Some(Expr::Not(Box::new(inner))),
                residual: None,
            },
        },
        _ => leaf(false),
    }
}
/// Conjoins `parts`.
///
/// Returns `None` for no parts, the lone part itself for one, and `Expr::And` otherwise.
fn combine_and(mut parts: Vec<Expr>) -> Option<Expr> {
    if parts.len() > 1 {
        Some(Expr::And(parts))
    } else {
        parts.pop()
    }
}
/// Disjoins `parts`.
///
/// Returns `None` for no parts, the lone part itself for one, and `Expr::Or` otherwise.
fn combine_or(mut parts: Vec<Expr>) -> Option<Expr> {
    if parts.len() > 1 {
        Some(Expr::Or(parts))
    } else {
        parts.pop()
    }
}
fn row_filter_expr(predicate: &Expr, schema: &SchemaRef) -> Result<Option<Expr>, SsTableError> {
Ok(split_predicate_for_row_filter(predicate, schema).pushdown)
}
/// Builds a root-level projection mask selecting the top-level columns named in `required`
/// from the Parquet file at `path`.
///
/// If the file has a `_sequence` column, it is always included as well.
///
/// # Errors
/// - `SsTableError::Fs` / `SsTableError::Parquet` if the file cannot be opened or its footer
///   decoded.
/// - `SsTableError::KeyExtract(KeyExtractError::NoSuchField)` naming the first required column,
///   in sorted order, that the file lacks.
async fn build_projection_mask_for_names<E>(
    fs: Arc<dyn DynFs>,
    path: &Path,
    required: &BTreeSet<String>,
    executor: E,
) -> Result<ProjectionMask, SsTableError>
where
    E: Executor + Clone + 'static,
{
    let file = fs.open(path).await.map_err(SsTableError::Fs)?;
    let size = file.size().await.map_err(SsTableError::Fs)?;
    let mut reader = AsyncReader::new(file, size, UnpinExec(executor))
        .await
        .map_err(SsTableError::Fs)?;
    // Only the schema is needed to build a projection, so skip fetching and decoding the
    // page index. That would be an extra read of the file for data that is never used here.
    let metadata = ParquetMetaDataReader::new()
        .with_page_index_policy(PageIndexPolicy::Skip)
        .load_and_finish(&mut reader, size)
        .await
        .map_err(SsTableError::Parquet)?;
    let arrow_metadata =
        ArrowReaderMetadata::try_new(Arc::new(metadata), ArrowReaderOptions::new())
            .map_err(SsTableError::Parquet)?;
    let file_schema = arrow_metadata.schema();

    let mut remaining = required.clone();
    if file_schema
        .fields()
        .iter()
        .any(|field| field.name() == MVCC_SEQUENCE_COL)
    {
        remaining.insert(MVCC_SEQUENCE_COL.to_string());
    }
    // Arrow top-level field indices line up with Parquet root column indices.
    let root_indices: Vec<usize> = file_schema
        .fields()
        .iter()
        .enumerate()
        .filter_map(|(idx, field)| remaining.remove(field.name()).then_some(idx))
        .collect();
    if let Some(missing) = remaining.into_iter().next() {
        return Err(SsTableError::KeyExtract(KeyExtractError::NoSuchField {
            name: missing,
        }));
    }
    Ok(ProjectionMask::roots(
        arrow_metadata.parquet_schema(),
        root_indices,
    ))
}
/// Returns the data type of the first top-level field in `schema` named `column`, if any.
fn column_type<'a>(schema: &'a SchemaRef, column: &str) -> Option<&'a DataType> {
    for field in schema.fields().iter() {
        if field.name() == column {
            return Some(field.data_type());
        }
    }
    None
}
/// Reports whether literal `value` is eligible to be compared against a column of `data_type`
/// in a pushed-down row filter.
///
/// `ScalarValue::Null` never qualifies. Otherwise the scalar's own type must equal `data_type`
/// exactly; no widening or casting is attempted.
fn scalar_matches_type(value: &ScalarValue, data_type: &DataType) -> bool {
    !matches!(value, ScalarValue::Null) && value.data_type() == *data_type
}
/// Reports whether `column` exists in `schema` and `value` matches its type exactly
/// (see `scalar_matches_type`).
fn scalar_matches_column(schema: &SchemaRef, column: &str, value: &ScalarValue) -> bool {
    column_type(schema, column).is_some_and(|data_type| scalar_matches_type(value, data_type))
}
/// Reports whether `column` exists in `schema` and every literal in `values` matches its type
/// exactly (see `scalar_matches_type`).
///
/// An empty `values` slice matches any existing column.
fn scalars_match_column(schema: &SchemaRef, column: &str, values: &[ScalarValue]) -> bool {
    match column_type(schema, column) {
        Some(data_type) => values
            .iter()
            .all(|value| scalar_matches_type(value, data_type)),
        None => false,
    }
}
/// Reports whether `column` exists in `schema` with a string type (see `is_string_type`).
fn is_string_column(schema: &SchemaRef, column: &str) -> bool {
    column_type(schema, column).is_some_and(is_string_type)
}
/// Reports whether `data_type` is one of Arrow's UTF-8 string types
/// (`Utf8`, `LargeUtf8`, `Utf8View`).
fn is_string_type(data_type: &DataType) -> bool {
    [DataType::Utf8, DataType::LargeUtf8, DataType::Utf8View].contains(data_type)
}
/// Checks that `metadata` carries a page index (column index and offset index) whose shape
/// matches the file's row groups and column chunks.
///
/// A file with zero row groups may omit both indexes, but any index that is present must be
/// empty. Otherwise both indexes must exist, have one entry per row group, and have one entry
/// per column chunk in every row group.
///
/// # Errors
/// Returns `SsTableError::MissingPageIndex` carrying `path` and a description of the first
/// violated rule.
pub(crate) fn validate_page_indexes(
    path: &Path,
    metadata: &ParquetMetaData,
) -> Result<(), SsTableError> {
    let path = path.to_string();
    let fail = |reason: String| SsTableError::MissingPageIndex {
        path: path.clone(),
        reason,
    };
    let row_groups = metadata.num_row_groups();

    if row_groups == 0 {
        // Without row groups an index may be absent; if present it must describe nothing.
        let present = [
            ("column", metadata.column_index().map(|index| index.len())),
            ("offset", metadata.offset_index().map(|index| index.len())),
        ];
        for (kind, len) in present {
            if let Some(got) = len.filter(|&got| got != 0) {
                return Err(fail(format!(
                    "{kind} index row group count mismatch: expected {row_groups}, got {got}"
                )));
            }
        }
        return Ok(());
    }

    let Some(column_index) = metadata.column_index() else {
        return Err(fail("column index missing".to_string()));
    };
    let Some(offset_index) = metadata.offset_index() else {
        return Err(fail("offset index missing".to_string()));
    };
    for (kind, got) in [("column", column_index.len()), ("offset", offset_index.len())] {
        if got != row_groups {
            return Err(fail(format!(
                "{kind} index row group count mismatch: expected {row_groups}, got {got}"
            )));
        }
    }

    // Both indexes have exactly `row_groups` entries at this point, so zipping is lossless.
    let per_group = metadata
        .row_groups()
        .iter()
        .zip(column_index.iter())
        .zip(offset_index.iter())
        .enumerate();
    for (row_group_idx, ((row_group, columns), offsets)) in per_group {
        let expected_columns = row_group.num_columns();
        for (kind, got) in [("column", columns.len()), ("offset", offsets.len())] {
            if got != expected_columns {
                return Err(fail(format!(
                    "{kind} index column count mismatch at row group {row_group_idx}: expected \
                     {expected_columns}, got {got}"
                )));
            }
        }
    }
    Ok(())
}
/// Read options for `open_parquet_stream_with_metadata`.
///
/// The `Default` value leaves every option unset.
#[derive(Default)]
pub(crate) struct ParquetStreamOptions<'a> {
    /// Columns to decode; `None` selects every column (`ProjectionMask::all`).
    pub projection: Option<ProjectionMask>,
    /// Row groups to read; `None` leaves the builder's default (all row groups).
    pub row_groups: Option<Vec<usize>>,
    /// Row selection within the chosen row groups; `None` applies no selection.
    pub row_selection: Option<RowSelection>,
    /// Predicate whose pushdown-eligible part is installed as a Parquet row filter.
    pub row_filter_predicate: Option<&'a Expr>,
}
pub(crate) async fn open_parquet_stream_with_metadata<E>(
fs: Arc<dyn DynFs>,
path: Path,
metadata: Arc<ParquetMetaData>,
options: ParquetStreamOptions<'_>,
executor: E,
) -> Result<ParquetStream<E>, SsTableError>
where
E: Executor + Clone + 'static,
{
let ParquetStreamOptions {
projection,
row_groups,
row_selection,
row_filter_predicate,
} = options;
let file = fs.open(&path).await?;
let size = file.size().await.map_err(SsTableError::Fs)?;
let reader = AsyncReader::new(file, size, UnpinExec(executor))
.await
.map_err(SsTableError::Fs)?;
let options = ArrowReaderOptions::new().with_page_index(true);
let arrow_metadata =
ArrowReaderMetadata::try_new(metadata, options).map_err(SsTableError::Parquet)?;
let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(reader, arrow_metadata);
let schema = builder.schema().clone();
if let Some(predicate) = match row_filter_predicate {
Some(pred) => row_filter_expr(pred, &schema)?,
None => None,
} {
let filter = AisleRowFilter::new(predicate, builder.parquet_schema());
let row_filter = ParquetRowFilter::new(vec![Box::new(filter)]);
builder = builder.with_row_filter(row_filter);
}
let mask = projection.unwrap_or_else(ProjectionMask::all);
builder = builder.with_projection(mask);
if let Some(row_groups) = row_groups {
builder = builder.with_row_groups(row_groups);
}
if let Some(selection) = row_selection {
builder = builder.with_row_selection(selection);
}
let stream = builder.build().map_err(SsTableError::Parquet)?;
Ok(stream)
}
pub(crate) async fn open_parquet_stream<E>(
fs: Arc<dyn DynFs>,
path: Path,
projection: Option<ProjectionMask>,
row_groups: Option<Vec<usize>>,
row_selection: Option<RowSelection>,
row_filter_predicate: Option<&Expr>,
executor: E,
) -> Result<ParquetStream<E>, SsTableError>
where
E: Executor + Clone + 'static,
{
let (stream, _schema) = open_parquet_stream_with_schema(
fs,
path,
projection,
row_groups,
row_selection,
row_filter_predicate,
executor,
)
.await?;
Ok(stream)
}
pub(crate) async fn open_parquet_stream_with_schema<E>(
fs: Arc<dyn DynFs>,
path: Path,
projection: Option<ProjectionMask>,
row_groups: Option<Vec<usize>>,
row_selection: Option<RowSelection>,
row_filter_predicate: Option<&Expr>,
executor: E,
) -> Result<(ParquetStream<E>, SchemaRef), SsTableError>
where
E: Executor + Clone + 'static,
{
let file = fs.open(&path).await?;
let size = file.size().await.map_err(SsTableError::Fs)?;
let mut reader = AsyncReader::new(file, size, UnpinExec(executor))
.await
.map_err(SsTableError::Fs)?;
let metadata = ParquetMetaDataReader::new()
.with_page_index_policy(PageIndexPolicy::Optional)
.load_and_finish(&mut reader, size)
.await
.map_err(SsTableError::Parquet)?;
validate_page_indexes(&path, &metadata)?;
let options = ArrowReaderOptions::new().with_page_index(true);
let arrow_metadata =
ArrowReaderMetadata::try_new(Arc::new(metadata), options).map_err(SsTableError::Parquet)?;
let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(reader, arrow_metadata);
let schema = builder.schema().clone();
if let Some(predicate) = match row_filter_predicate {
Some(pred) => row_filter_expr(pred, &schema)?,
None => None,
} {
let filter = AisleRowFilter::new(predicate, builder.parquet_schema());
let row_filter = ParquetRowFilter::new(vec![Box::new(filter)]);
builder = builder.with_row_filter(row_filter);
}
let mask = projection.unwrap_or_else(ProjectionMask::all);
builder = builder.with_projection(mask);
if let Some(row_groups) = row_groups {
builder = builder.with_row_groups(row_groups);
}
if let Some(selection) = row_selection {
builder = builder.with_row_selection(selection);
}
let stream = builder.build().map_err(SsTableError::Parquet)?;
Ok((stream, schema))
}
pub(crate) fn take_record_batch(
batch: &RecordBatch,
indices: &[u32],
) -> Result<RecordBatch, SsTableError> {
let idx_array = UInt32Array::from(indices.to_vec());
let mut columns = Vec::with_capacity(batch.num_columns());
for col in batch.columns() {
let taken = arrow_take(col.as_ref(), &idx_array, None)
.map_err(|err| SsTableError::Parquet(ParquetError::ArrowError(err.to_string())))?;
columns.push(taken);
}
RecordBatch::try_new(batch.schema().clone(), columns)
.map_err(|err| SsTableError::Parquet(ParquetError::ArrowError(err.to_string())))
}
#[cfg(all(test, feature = "tokio"))]
mod tests {
use std::{
collections::BTreeMap,
sync::{Arc, atomic::AtomicU64},
};
use arrow_schema::{DataType, Field, Schema};
use fusio::{disk::LocalFs, dynamic::DynFs, executor::NoopExecutor, path::Path};
use typed_arrow_dyn::{DynCell, DynRow};
use super::*;
use crate::{
extractor::{KeyProjection, projection_for_field},
id::FileIdGenerator,
inmem::immutable::memtable::{ImmutableIndexEntry, ImmutableMemTable, bundle_mvcc_sidecar},
key::KeyTsViewRaw,
mvcc::Timestamp,
ondisk::merge::SsTableMergeSource,
test::build_batch,
};
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn merge_source_yields_batches_in_order() {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int32, false),
]));
let batch1 = build_batch(
Arc::clone(&schema),
vec![DynRow(vec![
Some(DynCell::Str("a".into())),
Some(DynCell::I32(1)),
])],
)
.expect("batch1");
let batch2 = build_batch(
Arc::clone(&schema),
vec![DynRow(vec![
Some(DynCell::Str("b".into())),
Some(DynCell::I32(2)),
])],
)
.expect("batch2");
let mvcc1 = MvccColumns::new(vec![Timestamp::MIN], vec![false]);
let mvcc2 = MvccColumns::new(vec![Timestamp::MIN], vec![false]);
let batch1 = append_commit_column(batch1, &mvcc1).expect("commit");
let batch2 = append_commit_column(batch2, &mvcc2).expect("commit");
let mut source = SsTableMergeSource::with_batches(vec![
SsTableStreamBatch {
data: batch1.clone(),
delete: None,
},
SsTableStreamBatch {
data: batch2.clone(),
delete: None,
},
]);
let out1 = source.next().await.expect("first").expect("batch");
let out2 = source.next().await.expect("second").expect("batch");
let out3 = source.next().await.expect("third");
assert_eq!(out1.data.num_rows(), 1);
assert_eq!(out2.data.num_rows(), 1);
assert!(out3.is_none());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn merger_combines_stats_and_wal_ids() {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int32, false),
]));
let fs: Arc<dyn DynFs> = Arc::new(LocalFs {});
let tempdir = tempfile::tempdir().expect("tempdir");
let root = Path::from(tempdir.path().to_string_lossy().to_string());
let extractor = projection_for_field(Arc::clone(&schema), 0).expect("extractor");
let extractor: Arc<dyn KeyProjection> = extractor.into();
let config = Arc::new(
SsTableConfig::new(Arc::clone(&schema), fs, root).with_key_extractor(extractor),
);
let seg_a = sample_segment(
vec![("a".to_string(), 1), ("b".to_string(), 2)],
vec![1, 2],
vec![false, false],
);
let seg_b = sample_segment(
vec![("c".to_string(), 3), ("d".to_string(), 0)],
vec![3, 4],
vec![false, true],
);
let wal_a = vec![FileIdGenerator::default().generate()];
let wal_b = vec![FileIdGenerator::default().generate(), wal_a[0]];
let mut builder_a = SsTableBuilder::new(
Arc::clone(&config),
SsTableDescriptor::new(SsTableId::new(1), 0),
);
builder_a.set_wal_ids(Some(wal_a.clone()));
builder_a.add_immutable(&seg_a).expect("stage a");
let input_a = builder_a
.finish(NoopExecutor)
.await
.expect("sst a")
.descriptor()
.clone();
let mut builder_b = SsTableBuilder::new(
Arc::clone(&config),
SsTableDescriptor::new(SsTableId::new(2), 0),
);
builder_b.set_wal_ids(Some(wal_b.clone()));
builder_b.add_immutable(&seg_b).expect("stage b");
let input_b = builder_b
.finish(NoopExecutor)
.await
.expect("sst b")
.descriptor()
.clone();
let target = SsTableDescriptor::new(SsTableId::new(9), 1);
let merger = SsTableMerger::new(config, vec![input_a, input_b], target)
.with_output_id_allocator(Arc::new(AtomicU64::new(10)));
let merged = merger.execute(NoopExecutor).await.expect("merge result");
assert_eq!(merged.len(), 1);
let descriptor = merged[0].descriptor();
let merged_stats = descriptor.stats().expect("merged stats");
assert_eq!(descriptor.id().raw(), 9);
assert_eq!(descriptor.level(), 1);
assert_eq!(merged_stats.rows, 3);
assert_eq!(merged_stats.tombstones, 1);
assert_eq!(
merged_stats
.min_key
.as_ref()
.map(|k| k.as_utf8().expect("min key utf8")),
Some("a")
);
assert_eq!(
merged_stats
.max_key
.as_ref()
.map(|k| k.as_utf8().expect("max key utf8")),
Some("d")
);
assert_eq!(merged_stats.min_commit_ts, Some(Timestamp::new(1)));
assert_eq!(merged_stats.max_commit_ts, Some(Timestamp::new(4)));
let wal_ids = descriptor.wal_ids().expect("wal ids");
assert_eq!(wal_ids.len(), 2); assert!(wal_ids.contains(&wal_a[0]));
assert!(wal_ids.contains(&wal_b[0]));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn merger_applies_latest_wins_and_deletes() {
use arrow_array::{Int32Array, StringArray};
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int32, false),
]));
let fs: Arc<dyn DynFs> = Arc::new(LocalFs {});
let tempdir = tempfile::tempdir().expect("tempdir");
let root = Path::from(tempdir.path().to_string_lossy().to_string());
let extractor = projection_for_field(Arc::clone(&schema), 0).expect("extractor");
let extractor: Arc<dyn KeyProjection> = extractor.into();
let config = Arc::new(
SsTableConfig::new(Arc::clone(&schema), fs, root).with_key_extractor(extractor),
);
let seg_old = sample_segment(
vec![("k1".to_string(), 1), ("k2".to_string(), 2)],
vec![10, 20],
vec![false, false],
);
let seg_new = sample_segment(
vec![("k1".to_string(), 0), ("k2".to_string(), 3)],
vec![30, 40],
vec![true, false],
);
let mut builder_old = SsTableBuilder::new(
Arc::clone(&config),
SsTableDescriptor::new(SsTableId::new(1), 0),
);
builder_old.add_immutable(&seg_old).expect("stage old");
let input_old = builder_old
.finish(NoopExecutor)
.await
.expect("sst old")
.descriptor()
.clone();
let mut builder_new = SsTableBuilder::new(
Arc::clone(&config),
SsTableDescriptor::new(SsTableId::new(2), 0),
);
builder_new.add_immutable(&seg_new).expect("stage new");
let input_new = builder_new
.finish(NoopExecutor)
.await
.expect("sst new")
.descriptor()
.clone();
let target = SsTableDescriptor::new(SsTableId::new(9), 1);
let merger = SsTableMerger::new(config.clone(), vec![input_old, input_new], target)
.with_output_id_allocator(Arc::new(AtomicU64::new(10)));
let merged = merger.execute(NoopExecutor).await.expect("merge result");
assert_eq!(merged.len(), 1);
let descriptor = merged[0].descriptor();
let stats = descriptor.stats().expect("stats");
assert_eq!(stats.rows, 1);
assert_eq!(stats.tombstones, 1);
assert_eq!(
stats.min_key.as_ref().expect("min key").as_utf8(),
Some("k1")
);
assert_eq!(
stats.max_key.as_ref().expect("max key").as_utf8(),
Some("k2")
);
let reader = SsTableReader::open(config, descriptor.clone())
.await
.expect("reader");
let mut stream = reader
.into_stream(Timestamp::MAX, None, NoopExecutor)
.await
.expect("stream");
let mut data_keys = Vec::new();
let mut data_vals = Vec::new();
let mut delete_keys = Vec::new();
while let Some(batch) = stream.next().await {
let batch = batch.expect("batch ok");
if batch.data.num_rows() > 0 {
let ids = batch
.data
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.expect("string ids");
let vals = batch
.data
.column(1)
.as_any()
.downcast_ref::<Int32Array>()
.expect("int vals");
for i in 0..batch.data.num_rows() {
data_keys.push(ids.value(i).to_string());
data_vals.push(vals.value(i));
}
}
if let Some(delete) = batch.delete.as_ref()
&& delete.num_rows() > 0
{
let ids = delete
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.expect("string delete ids");
for i in 0..delete.num_rows() {
delete_keys.push(ids.value(i).to_string());
}
}
}
assert_eq!(data_keys, vec!["k2"]);
assert_eq!(data_vals, vec![3]);
assert_eq!(delete_keys, vec!["k1"]);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn merger_splits_outputs_by_row_cap() {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int32, false),
]));
let fs: Arc<dyn DynFs> = Arc::new(LocalFs {});
let tempdir = tempfile::tempdir().expect("tempdir");
let root = Path::from(tempdir.path().to_string_lossy().to_string());
let extractor = projection_for_field(Arc::clone(&schema), 0).expect("extractor");
let extractor: Arc<dyn KeyProjection> = extractor.into();
let config = Arc::new(
SsTableConfig::new(Arc::clone(&schema), fs, root).with_key_extractor(extractor),
);
let seg_a = sample_segment(
vec![("a".to_string(), 1), ("b".to_string(), 2)],
vec![1, 2],
vec![false, false],
);
let seg_b = sample_segment(vec![("c".to_string(), 3)], vec![3], vec![false]);
let mut builder_a = SsTableBuilder::new(
Arc::clone(&config),
SsTableDescriptor::new(SsTableId::new(1), 0),
);
builder_a.add_immutable(&seg_a).expect("stage a");
let input_a = builder_a
.finish(NoopExecutor)
.await
.expect("sst a")
.descriptor()
.clone();
let mut builder_b = SsTableBuilder::new(
Arc::clone(&config),
SsTableDescriptor::new(SsTableId::new(2), 0),
);
builder_b.add_immutable(&seg_b).expect("stage b");
let input_b = builder_b
.finish(NoopExecutor)
.await
.expect("sst b")
.descriptor()
.clone();
let target = SsTableDescriptor::new(SsTableId::new(9), 1);
let outputs = SsTableMerger::new(config, vec![input_a, input_b], target)
.with_output_id_allocator(Arc::new(AtomicU64::new(10)))
.with_output_caps(Some(1), None)
.with_chunk_rows(1)
.execute(NoopExecutor)
.await
.expect("merge result");
assert_eq!(outputs.len(), 3);
for table in outputs {
let stats = table.descriptor().stats().expect("stats");
assert!(stats.rows <= 1);
}
}
fn test_config(schema: SchemaRef) -> Arc<SsTableConfig> {
let fs: Arc<dyn DynFs> = Arc::new(LocalFs {});
Arc::new(SsTableConfig::new(
schema,
fs,
Path::from("/tmp/tonbo-test"),
))
}
fn sample_segment(
rows: Vec<(String, i32)>,
commits: Vec<u64>,
tombstones: Vec<bool>,
) -> ImmutableMemTable {
assert_eq!(rows.len(), commits.len());
assert_eq!(rows.len(), tombstones.len());
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, true),
Field::new("v", DataType::Int32, true),
]));
let mut data_rows = Vec::new();
let mut data_commits = Vec::new();
let mut delete_rows = Vec::new();
let mut delete_commits = Vec::new();
for ((key, value), (commit, tombstone)) in
rows.into_iter().zip(commits.into_iter().zip(tombstones))
{
let ts = Timestamp::new(commit);
if tombstone {
delete_rows.push(DynRow(vec![Some(DynCell::Str(key))]));
delete_commits.push(ts);
} else {
data_rows.push(DynRow(vec![
Some(DynCell::Str(key)),
Some(DynCell::I32(value)),
]));
data_commits.push(ts);
}
}
let batch = if data_rows.is_empty() {
RecordBatch::new_empty(schema.clone())
} else {
build_batch(schema.clone(), data_rows).expect("record batch")
};
let tombstone_flags = vec![false; batch.num_rows()];
let (batch, mvcc) =
bundle_mvcc_sidecar(batch, data_commits.clone(), tombstone_flags).expect("mvcc");
let delete_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, true)]));
let delete_batch = if delete_rows.is_empty() {
RecordBatch::new_empty(delete_schema.clone())
} else {
build_batch(delete_schema.clone(), delete_rows).expect("delete batch")
};
let delete_sidecar = DeleteSidecar::new(delete_batch, delete_commits);
let extractor =
crate::extractor::projection_for_field(schema.clone(), 0).expect("extractor");
let mut composite = BTreeMap::new();
let row_indices: Vec<usize> = (0..batch.num_rows()).collect();
let key_rows = extractor
.project_view(&batch, &row_indices)
.expect("project view");
for (row, key_row) in key_rows.into_iter().enumerate() {
composite.insert(
KeyTsViewRaw::new(key_row, mvcc.commit_ts[row]),
ImmutableIndexEntry::Row(row as u32),
);
}
if !delete_sidecar.is_empty() {
let delete_schema = delete_sidecar.key_batch().schema().clone();
let indices: Vec<usize> = (0..delete_schema.fields().len()).collect();
let projection =
crate::extractor::projection_for_columns(delete_schema.clone(), indices)
.expect("identity projection");
let delete_row_indices: Vec<usize> =
(0..delete_sidecar.key_batch().num_rows()).collect();
let delete_key_rows = projection
.project_view(delete_sidecar.key_batch(), &delete_row_indices)
.expect("delete keys");
for (row, key_row) in delete_key_rows.into_iter().enumerate() {
let ts = delete_sidecar.commit_ts(row);
composite.insert(KeyTsViewRaw::new(key_row, ts), ImmutableIndexEntry::Delete);
}
}
ImmutableMemTable::new(batch, composite, mvcc, delete_sidecar)
}
#[test]
fn parquet_writer_accumulates_segment_stats() {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, true),
Field::new("v", DataType::Int32, true),
]));
let descriptor = SsTableDescriptor::new(SsTableId::new(7), 0);
let mut writer: ParquetTableWriter =
ParquetTableWriter::new(test_config(schema.clone()), descriptor);
let segment1 = sample_segment(
vec![("a".into(), 1), ("b".into(), 2), ("c".into(), 3)],
vec![30, 20, 10],
vec![false, true, false],
);
writer.stage_immutable(&segment1).expect("stage first");
let segment2 = sample_segment(
vec![("d".into(), 4), ("e".into(), 5)],
vec![25, 35],
vec![false, false],
);
writer.stage_immutable(&segment2).expect("stage second");
let plan = writer.plan();
assert_eq!(plan.segments, 2);
assert_eq!(plan.rows, 4);
assert_eq!(plan.tombstones, 1);
assert_eq!(plan.min_commit_ts, Some(Timestamp::new(10)));
assert_eq!(plan.max_commit_ts, Some(Timestamp::new(35)));
let min_key = plan
.min_key
.as_ref()
.and_then(|k| k.as_utf8())
.expect("min key");
let max_key = plan
.max_key
.as_ref()
.and_then(|k| k.as_utf8())
.expect("max key");
assert_eq!(min_key, "a");
assert_eq!(max_key, "e");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn finish_orders_rows_across_multiple_immutables() {
use arrow_array::StringArray;
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, true),
Field::new("v", DataType::Int32, true),
]));
let fs: Arc<dyn DynFs> = Arc::new(LocalFs {});
let tempdir = tempfile::tempdir().expect("tempdir");
let root = Path::from(tempdir.path().to_string_lossy().to_string());
let extractor = projection_for_field(Arc::clone(&schema), 0).expect("extractor");
let extractor: Arc<dyn KeyProjection> = extractor.into();
let config = Arc::new(
SsTableConfig::new(Arc::clone(&schema), fs, root).with_key_extractor(extractor),
);
let mut writer = ParquetTableWriter::new(
Arc::clone(&config),
SsTableDescriptor::new(SsTableId::new(77), 0),
);
let segment_old = sample_segment(
vec![("k2".to_string(), 20), ("k3".to_string(), 30)],
vec![20, 10],
vec![false, false],
);
let segment_new = sample_segment(
vec![("k1".to_string(), 40), ("k2".to_string(), 50)],
vec![30, 25],
vec![false, false],
);
writer.stage_immutable(&segment_old).expect("stage old");
writer.stage_immutable(&segment_new).expect("stage new");
let table = writer.finish(NoopExecutor).await.expect("finish");
let data_path = table.descriptor().data_path().expect("data path").clone();
let mut stream = open_parquet_stream(
Arc::clone(config.fs()),
data_path,
None,
None,
None,
None,
NoopExecutor,
)
.await
.expect("open data stream");
let mut got = Vec::new();
while let Some(batch) = stream.next().await {
let batch = batch.expect("batch");
let ids = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.expect("id column");
let commits = batch
.column_by_name(MVCC_COMMIT_COL)
.and_then(|arr| arr.as_any().downcast_ref::<UInt64Array>())
.expect("commit column");
for row in 0..batch.num_rows() {
got.push((ids.value(row).to_string(), commits.value(row)));
}
}
assert_eq!(
got,
vec![
("k1".to_string(), 30),
("k2".to_string(), 25),
("k2".to_string(), 20),
("k3".to_string(), 10),
],
"multi-immutable flush must persist rows in global key/timestamp order"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn finish_without_segments_errors() {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, true),
Field::new("v", DataType::Int32, true),
]));
let descriptor = SsTableDescriptor::new(SsTableId::new(1), 0);
let writer: ParquetTableWriter = ParquetTableWriter::new(test_config(schema), descriptor);
let result = writer.finish(NoopExecutor).await;
assert!(matches!(result, Err(SsTableError::NoImmutableSegments)));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn finish_threads_wal_ids_into_descriptor() {
use std::str::FromStr;
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, true),
Field::new("v", DataType::Int32, true),
]));
let descriptor = SsTableDescriptor::new(SsTableId::new(11), 0);
let mut writer: ParquetTableWriter =
ParquetTableWriter::new(test_config(schema.clone()), descriptor);
let wal_ids = vec![FileId::from_str("01HV6Z2Z8Q4W5X6Y7Z8A9BCDEF").expect("valid ulid")];
writer.set_wal_ids(Some(wal_ids.clone()));
let segment = sample_segment(vec![("k".into(), 1)], vec![42], vec![false]);
writer.stage_immutable(&segment).expect("stage segment");
let table = writer.finish(NoopExecutor).await.expect("finish");
let recorded = table.descriptor().wal_ids().expect("descriptor wal ids");
assert_eq!(recorded, wal_ids.as_slice());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn finish_records_data_and_sidecar_paths() {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, true),
Field::new("v", DataType::Int32, true),
]));
let descriptor = SsTableDescriptor::new(SsTableId::new(21), 0);
let mut writer: ParquetTableWriter =
ParquetTableWriter::new(test_config(schema.clone()), descriptor);
let segment = sample_segment(vec![("m".into(), 9)], vec![123], vec![false]);
writer.stage_immutable(&segment).expect("stage segment");
let table = writer.finish(NoopExecutor).await.expect("finish table");
let descriptor = table.descriptor();
let data_path = descriptor.data_path().expect("data path present");
assert!(data_path.as_ref().ends_with(".parquet"));
assert!(descriptor.delete_path().is_none());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn finish_records_delete_sidecar_when_tombstones_exist() {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, true),
Field::new("v", DataType::Int32, true),
]));
let descriptor = SsTableDescriptor::new(SsTableId::new(22), 0);
let mut writer: ParquetTableWriter =
ParquetTableWriter::new(test_config(schema.clone()), descriptor);
let segment = sample_segment(
vec![("z".into(), 1), ("tomb".into(), 0)],
vec![100, 200],
vec![false, true],
);
writer.stage_immutable(&segment).expect("stage segment");
let table = writer.finish(NoopExecutor).await.expect("finish table");
let descriptor = table.descriptor();
let delete_path = descriptor.delete_path().expect("delete sidecar present");
assert!(delete_path.as_ref().ends_with(".delete.parquet"));
}
#[test]
fn manifest_storage_path_strips_sst_root_prefix() {
let root = Path::from("/tmp/tonbo/sst");
let storage = Path::from("/tmp/tonbo/sst/L1/00000000000000000007.parquet");
assert_eq!(
manifest_storage_path(&root, &storage).as_ref(),
"L1/00000000000000000007.parquet"
);
let legacy_storage = Path::from("tmp/tonbo/sst/L1/00000000000000000007.parquet");
assert_eq!(
manifest_storage_path(&root, &legacy_storage).as_ref(),
"L1/00000000000000000007.parquet"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn open_parquet_missing_page_indexes_errors() {
    use arrow_array::StringArray;

    // A two-row batch: non-empty, so the reader is expected to demand page indexes.
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)]));
    let ids: ArrayRef = Arc::new(StringArray::from(vec!["a", "b"]));
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![ids]).expect("batch");

    let fs: Arc<dyn DynFs> = Arc::new(LocalFs {});
    let tempdir = tempfile::tempdir().expect("tempdir");
    let local_file = tempdir.path().join("no-page-index.parquet");
    let path = Path::from(local_file.to_string_lossy().into_owned());
    let expected_path = path.to_string();

    // Statistics are turned off and the offset index is disabled, so the file
    // is written without page indexes.
    let write_options = OpenOptions::default()
        .create(true)
        .write(true)
        .truncate(true);
    let file = fs
        .open_options(&path, write_options)
        .await
        .expect("open file");
    let properties = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .set_offset_index_disabled(true)
        .build();
    let sink = AsyncWriter::new(file, NoopExecutor);
    let mut writer =
        AsyncArrowWriter::try_new(sink, Arc::clone(&schema), Some(properties)).expect("writer");
    writer.write(&batch).await.expect("write");
    writer.close().await.expect("close");

    let err = open_parquet_stream_with_schema(
        Arc::clone(&fs),
        path,
        None,
        None,
        None,
        None,
        NoopExecutor,
    )
    .await
    .expect_err("expected missing page index error");

    // Any error other than `MissingPageIndex` fails the test outright.
    let (reported_path, reason) = match err {
        SsTableError::MissingPageIndex { path, reason } => (path, reason),
        other => panic!("unexpected error: {other:?}"),
    };
    assert_eq!(reported_path, expected_path);
    assert!(
        reason.contains("column index"),
        "unexpected reason: {reason}"
    );
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn open_parquet_empty_without_page_indexes_is_allowed() {
    use arrow_array::StringArray;

    // Zero-row batch: a file with no rows should open even without page indexes.
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)]));
    let no_ids: ArrayRef = Arc::new(StringArray::from(Vec::<&str>::new()));
    let empty_batch = RecordBatch::try_new(Arc::clone(&schema), vec![no_ids]).expect("batch");

    let fs: Arc<dyn DynFs> = Arc::new(LocalFs {});
    let tempdir = tempfile::tempdir().expect("tempdir");
    let local_file = tempdir.path().join("empty-no-page-index.parquet");
    let path = Path::from(local_file.to_string_lossy().into_owned());

    // Same writer settings as the non-empty case: no statistics, no offset index.
    let write_options = OpenOptions::default()
        .create(true)
        .write(true)
        .truncate(true);
    let file = fs
        .open_options(&path, write_options)
        .await
        .expect("open file");
    let properties = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .set_offset_index_disabled(true)
        .build();
    let sink = AsyncWriter::new(file, NoopExecutor);
    let mut writer =
        AsyncArrowWriter::try_new(sink, Arc::clone(&schema), Some(properties)).expect("writer");
    writer.write(&empty_batch).await.expect("write");
    writer.close().await.expect("close");

    let opened = open_parquet_stream_with_schema(
        Arc::clone(&fs),
        path,
        None,
        None,
        None,
        None,
        NoopExecutor,
    )
    .await
    .expect("empty parquet without page indexes should be readable");
    let (mut stream, stream_schema) = opened;

    // The single "id" column survives, and the stream yields nothing.
    assert_eq!(stream_schema.fields().len(), 1);
    assert!(stream.next().await.is_none());
}
}