use std::sync::Arc;
use crate::types::{
key::InternalKey,
level::{ColumnStats, FileFormat, Level, ParquetFileMeta},
schema::{ColumnType, TableSchema},
value::Row,
MeruError, Result,
};
use bytes::Bytes;
use parquet::{
arrow::ArrowWriter,
basic::{Compression, Encoding},
file::{
properties::{WriterProperties, WriterPropertiesBuilder},
reader::FileReader,
serialized_reader::{ReadOptionsBuilder, SerializedFileReader},
},
schema::types::ColumnPath,
};
use crate::parquet::{
bloom::FastLocalBloom,
codec,
kv_index::{self, PageLocation},
};
/// Rough assumed size of one encoded row in bytes, used to convert the
/// byte-based row-group targets into row counts in
/// `target_rows_per_row_group`.
const ASSUMED_ROW_BYTES: usize = 256;
/// Target row-group size in bytes for the given LSM level.
///
/// Hot levels (L0) use small row groups; L1 is intermediate; L2 and
/// beyond plateau at the large cold-tier size.
pub fn target_row_group_bytes(level: Level) -> usize {
    const MIB: usize = 1024 * 1024;
    match level.0 {
        0 => 4 * MIB,
        1 => 32 * MIB,
        _ => 128 * MIB,
    }
}
/// Target data-page size in bytes for the given LSM level.
///
/// Follows the same hot-to-cold growth curve as the row-group targets:
/// small pages at L0, larger at L1, and a plateau for L2 and beyond.
pub fn target_data_page_bytes(level: Level) -> usize {
    const KIB: usize = 1024;
    match level.0 {
        0 => 8 * KIB,
        1 => 32 * KIB,
        _ => 128 * KIB,
    }
}
/// Applies per-column Parquet encoding and dictionary settings to `builder`
/// for the internal MeruTable columns and every user column in `schema`.
///
/// - internal key: PLAIN, dictionary disabled
/// - sequence number: DELTA_BINARY_PACKED, dictionary disabled
/// - op code: dictionary-encoded
/// - value blob (only when `format.has_value_blob()`): PLAIN, dictionary
///   disabled; user columns are then also PLAIN with dictionaries disabled
/// - otherwise user columns get type-specific encodings: delta packing for
///   ints, BYTE_STREAM_SPLIT for floats, RLE for bools, dictionary for
///   byte arrays, PLAIN for fixed-length byte arrays
fn build_column_encoding_props(
    mut builder: WriterPropertiesBuilder,
    schema: &TableSchema,
    format: FileFormat,
) -> WriterPropertiesBuilder {
    // Use the shared column-name constants from `codec` (as the seq/op
    // columns below already do) instead of repeating the string literals,
    // so this writer config cannot drift from the schema the codec emits.
    let ikey_col = ColumnPath::new(vec![crate::parquet::codec::IKEY_COLUMN_NAME.to_string()]);
    builder = builder
        .set_column_encoding(ikey_col.clone(), Encoding::PLAIN)
        .set_column_dictionary_enabled(ikey_col, false);
    let seq_col = ColumnPath::new(vec![crate::parquet::codec::SEQ_COLUMN_NAME.to_string()]);
    builder = builder
        .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
        .set_column_dictionary_enabled(seq_col, false);
    let op_col = ColumnPath::new(vec![crate::parquet::codec::OP_COLUMN_NAME.to_string()]);
    builder = builder.set_column_dictionary_enabled(op_col, true);
    if format.has_value_blob() {
        let value_col =
            ColumnPath::new(vec![crate::parquet::codec::VALUE_BLOB_COLUMN_NAME.to_string()]);
        builder = builder
            .set_column_encoding(value_col.clone(), Encoding::PLAIN)
            .set_column_dictionary_enabled(value_col, false);
        for col_def in &schema.columns {
            let col_path = ColumnPath::new(vec![col_def.name.clone()]);
            builder = builder
                .set_column_encoding(col_path.clone(), Encoding::PLAIN)
                .set_column_dictionary_enabled(col_path, false);
        }
    } else {
        for col_def in &schema.columns {
            let col_path = ColumnPath::new(vec![col_def.name.clone()]);
            match col_def.col_type {
                ColumnType::Int32 | ColumnType::Int64 => {
                    builder = builder
                        .set_column_encoding(col_path.clone(), Encoding::DELTA_BINARY_PACKED)
                        .set_column_dictionary_enabled(col_path, false);
                }
                ColumnType::Float | ColumnType::Double => {
                    // BYTE_STREAM_SPLIT groups the bytes of IEEE floats by
                    // position, which typically compresses better.
                    builder = builder
                        .set_column_encoding(col_path.clone(), Encoding::BYTE_STREAM_SPLIT)
                        .set_column_dictionary_enabled(col_path, false);
                }
                ColumnType::ByteArray => {
                    builder = builder.set_column_dictionary_enabled(col_path, true);
                }
                ColumnType::Boolean => {
                    builder = builder.set_column_encoding(col_path, Encoding::RLE);
                }
                ColumnType::FixedLenByteArray(_) => {
                    builder = builder
                        .set_column_encoding(col_path.clone(), Encoding::PLAIN)
                        .set_column_dictionary_enabled(col_path, false);
                }
            }
        }
    }
    builder
}
/// Target number of rows per row group for the given LSM level, derived
/// from the byte target via the assumed average row size, with a floor of
/// 1024 rows.
pub fn target_rows_per_row_group(level: Level) -> usize {
    let estimated_rows = target_row_group_bytes(level) / ASSUMED_ROW_BYTES;
    if estimated_rows < 1024 {
        1024
    } else {
        estimated_rows
    }
}
/// Summary statistics describing a written Parquet file.
///
/// NOTE(review): not constructed anywhere in this module — presumably
/// populated by callers elsewhere in the crate; confirm it is still used.
pub struct WriterStats {
    // Number of rows written to the file.
    pub num_rows: u64,
    // Serialized file size in bytes.
    pub file_size: u64,
    // Minimum key bound of the file (encoded bytes).
    pub key_min: Vec<u8>,
    // Maximum key bound of the file (encoded bytes).
    pub key_max: Vec<u8>,
    // Lowest sequence number present in the file.
    pub seq_min: u64,
    // Highest sequence number present in the file.
    pub seq_max: u64,
}
/// Serializes pre-sorted `(InternalKey, Row)` pairs into a Parquet file.
///
/// Returns `(file_bytes, bloom_filter_bytes, metadata)`. Empty input
/// short-circuits to empty bytes and a zeroed metadata record.
///
/// The file is written in two passes: pass 1 produces a throwaway file so
/// the page OffsetIndex can be read back and turned into a sparse
/// key-to-page index; pass 2 rewrites the same rows with that index added
/// to the footer key/value metadata.
///
/// NOTE(review): the min/max scan below takes the first key as `key_min`
/// and the last as `key_max`, i.e. it relies on `rows` being sorted
/// ascending by key, as the function name implies — confirm callers uphold
/// this.
pub fn write_sorted_rows(
    rows: Vec<(InternalKey, Row)>,
    schema: Arc<TableSchema>,
    level: Level,
    format: FileFormat,
    bloom_bits_per_key: u8,
) -> Result<(Vec<u8>, Bytes, ParquetFileMeta)> {
    if rows.is_empty() {
        // Empty input: emit an empty file alongside a placeholder meta.
        let meta = ParquetFileMeta {
            level,
            seq_min: 0,
            seq_max: 0,
            key_min: Vec::new(),
            key_max: Vec::new(),
            num_rows: 0,
            file_size: 0,
            dv_path: None,
            dv_offset: None,
            dv_length: None,
            format: Some(format),
            column_stats: None,
        };
        return Ok((Vec::new(), Bytes::new(), meta));
    }
    let estimated = rows.len();
    let arrow_schema = codec::arrow_schema(&schema, format);
    // Size the bloom filter for at least 1000 keys to avoid degenerate
    // tiny filters on small inputs.
    let mut bloom = FastLocalBloom::new(estimated.max(1000), bloom_bits_per_key);
    let mut key_min: Option<Vec<u8>> = None;
    let mut key_max: Option<Vec<u8>> = None;
    let mut seq_min = u64::MAX;
    let mut seq_max = 0u64;
    // Single scan: feed every user key into the bloom filter while
    // tracking the key range (first/last of the sorted input) and the
    // sequence-number range.
    for (ikey, _) in &rows {
        let uk = ikey.user_key_bytes().to_vec();
        bloom.add(&uk);
        if key_min.is_none() {
            key_min = Some(uk.clone());
        }
        key_max = Some(uk);
        if ikey.seq.0 < seq_min {
            seq_min = ikey.seq.0;
        }
        if ikey.seq.0 > seq_max {
            seq_max = ikey.seq.0;
        }
    }
    let meta = ParquetFileMeta {
        level,
        // seq_min stays u64::MAX only if the loop never ran; normalize to 0.
        seq_min: if seq_min == u64::MAX { 0 } else { seq_min },
        seq_max,
        key_min: key_min.unwrap_or_default(),
        key_max: key_max.unwrap_or_default(),
        num_rows: rows.len() as u64,
        // file_size is patched below once the pass-2 bytes exist.
        file_size: 0,
        dv_path: None,
        dv_offset: None,
        dv_length: None,
        format: Some(format),
        column_stats: None,
    };
    let bloom_bytes = bloom.to_bytes();
    // Footer KV metadata common to both passes: the encoded file meta plus
    // the hex-encoded bloom filter.
    let footer_kv = crate::parquet::footer::encode_footer_kv(&meta, &schema)?;
    let mut base_kv: Vec<(String, String)> = footer_kv;
    base_kv.push(("merutable.bloom".to_string(), hex::encode(&bloom_bytes)));
    // Pass 1: write once so page locations become known via the OffsetIndex.
    let pass1_bytes = arrow_write_pass(&rows, &arrow_schema, &schema, level, format, &base_kv)?;
    let kv_index_entries = extract_kv_index_entries(&rows, &pass1_bytes)?;
    let kv_index_bytes = kv_index::build(&kv_index_entries, kv_index::DEFAULT_RESTART_INTERVAL)?;
    let mut full_kv = base_kv;
    full_kv.push((
        kv_index::KV_INDEX_FOOTER_KEY.to_string(),
        hex::encode(&kv_index_bytes),
    ));
    // Pass 2: identical rows and writer properties, now with the sparse
    // kv index embedded in the footer. NOTE(review): the pass-1 page
    // offsets remain valid for the pass-2 file only if the writer lays out
    // pages deterministically for identical input/properties (footer
    // metadata is written after the data pages) — confirm.
    let pass2_bytes = arrow_write_pass(&rows, &arrow_schema, &schema, level, format, &full_kv)?;
    let mut final_meta = meta;
    final_meta.file_size = pass2_bytes.len() as u64;
    // Column stats are best-effort: extraction failure leaves None.
    final_meta.column_stats = extract_column_stats(&pass2_bytes, &schema).ok();
    Ok((pass2_bytes, bloom_bytes, final_meta))
}
/// Reads the finished Parquet bytes back and aggregates per-user-column
/// statistics (compressed size, value/null counts, min/max bounds) across
/// all row groups.
///
/// Field ids are assigned as the column's position in `schema` plus 1.
/// Schema columns not present in the Parquet file are skipped. Bounds are
/// serialized in the same encoding `bound_is_less` expects: little-endian
/// for numerics, a single 0/1 byte for bools, raw bytes for byte arrays.
///
/// NOTE(review): column chunks that carry no statistics contribute their
/// compressed bytes but no value/null counts or bounds, so `value_count`
/// can under-count — confirm this is acceptable to consumers.
fn extract_column_stats(bytes: &[u8], schema: &TableSchema) -> Result<Vec<ColumnStats>> {
    let reader = SerializedFileReader::new(Bytes::copy_from_slice(bytes))
        .map_err(|e| MeruError::Parquet(e.to_string()))?;
    let file_meta = reader.metadata();
    let parquet_schema = file_meta.file_metadata().schema_descr();
    // Leaf column name -> parquet column index.
    let mut name_to_col: std::collections::HashMap<String, usize> =
        std::collections::HashMap::new();
    for i in 0..parquet_schema.num_columns() {
        let name = parquet_schema.column(i).name().to_string();
        name_to_col.insert(name, i);
    }
    // Per-column running aggregate accumulated over the row groups.
    struct Acc {
        field_id: i32,
        col_type: ColumnType,
        compressed_bytes: u64,
        value_count: u64,
        null_count: u64,
        min_bytes: Option<Vec<u8>>,
        max_bytes: Option<Vec<u8>>,
    }
    let mut accs: Vec<(usize, Acc)> = Vec::new();
    for (idx, col_def) in schema.columns.iter().enumerate() {
        // Field ids are 1-based schema positions.
        let field_id = (idx + 1) as i32;
        if let Some(&parquet_col_idx) = name_to_col.get(&col_def.name) {
            accs.push((
                parquet_col_idx,
                Acc {
                    field_id,
                    col_type: col_def.col_type.clone(),
                    compressed_bytes: 0,
                    value_count: 0,
                    null_count: 0,
                    min_bytes: None,
                    max_bytes: None,
                },
            ));
        }
    }
    for rg_idx in 0..file_meta.num_row_groups() {
        let rg = file_meta.row_group(rg_idx);
        for (parquet_col_idx, acc) in accs.iter_mut() {
            let chunk = rg.column(*parquet_col_idx);
            // compressed_size() is signed; clamp negatives to 0 before the cast.
            acc.compressed_bytes = acc
                .compressed_bytes
                .saturating_add(chunk.compressed_size().max(0) as u64);
            if let Some(stats) = chunk.statistics() {
                let rg_rows = rg.num_rows().max(0) as u64;
                let null_count = stats.null_count_opt().unwrap_or(0);
                // Non-null value count = rows in this group minus nulls.
                let values = rg_rows.saturating_sub(null_count);
                acc.value_count = acc.value_count.saturating_add(values);
                acc.null_count = acc.null_count.saturating_add(null_count);
                use parquet::file::statistics::Statistics as PqStats;
                // Serialize the typed min/max into the byte encoding used
                // by `bound_is_less`.
                let (min_b, max_b): (Option<Vec<u8>>, Option<Vec<u8>>) = match stats {
                    PqStats::Boolean(s) => {
                        let b2b = |b: &bool| vec![if *b { 1u8 } else { 0u8 }];
                        (s.min_opt().map(b2b), s.max_opt().map(b2b))
                    }
                    PqStats::Int32(s) => (
                        s.min_opt().map(|v| v.to_le_bytes().to_vec()),
                        s.max_opt().map(|v| v.to_le_bytes().to_vec()),
                    ),
                    PqStats::Int64(s) => (
                        s.min_opt().map(|v| v.to_le_bytes().to_vec()),
                        s.max_opt().map(|v| v.to_le_bytes().to_vec()),
                    ),
                    PqStats::Float(s) => (
                        s.min_opt().map(|v| v.to_le_bytes().to_vec()),
                        s.max_opt().map(|v| v.to_le_bytes().to_vec()),
                    ),
                    PqStats::Double(s) => (
                        s.min_opt().map(|v| v.to_le_bytes().to_vec()),
                        s.max_opt().map(|v| v.to_le_bytes().to_vec()),
                    ),
                    PqStats::ByteArray(s) => (
                        s.min_opt().map(|v| v.data().to_vec()),
                        s.max_opt().map(|v| v.data().to_vec()),
                    ),
                    PqStats::FixedLenByteArray(s) => (
                        s.min_opt().map(|v| v.data().to_vec()),
                        s.max_opt().map(|v| v.data().to_vec()),
                    ),
                    _ => (None, None),
                };
                // Tighten the running min: adopt the new bound when unset
                // or strictly smaller under the column's ordering.
                match (&mut acc.min_bytes, min_b) {
                    (slot @ None, Some(v)) => *slot = Some(v),
                    (Some(cur), Some(v)) if bound_is_less(&acc.col_type, &v, cur) => {
                        *cur = v;
                    }
                    _ => {}
                }
                // Symmetric update for the running max.
                match (&mut acc.max_bytes, max_b) {
                    (slot @ None, Some(v)) => *slot = Some(v),
                    (Some(cur), Some(v)) if bound_is_less(&acc.col_type, cur, &v) => {
                        *cur = v;
                    }
                    _ => {}
                }
            }
        }
    }
    Ok(accs
        .into_iter()
        .map(|(_, a)| ColumnStats {
            field_id: a.field_id,
            compressed_bytes: a.compressed_bytes,
            value_count: a.value_count,
            null_count: a.null_count,
            lower_bound: a.min_bytes,
            upper_bound: a.max_bytes,
        })
        .collect())
}
/// Returns true iff bound `a` sorts strictly before bound `b` under the
/// natural ordering of `col_type`.
///
/// Numeric bounds are decoded from little-endian bytes; a slice of the
/// wrong width falls back to a zero value rather than panicking
/// (NOTE(review): this silently mis-orders malformed bounds — confirm
/// callers always supply exact-width encodings). Float comparison uses
/// `<`, so a NaN operand never compares as less.
fn bound_is_less(col_type: &ColumnType, a: &[u8], b: &[u8]) -> bool {
    // Little-endian decoders with a zero fallback on bad width.
    fn dec_i32(raw: &[u8]) -> i32 {
        i32::from_le_bytes(raw.try_into().unwrap_or([0u8; 4]))
    }
    fn dec_i64(raw: &[u8]) -> i64 {
        i64::from_le_bytes(raw.try_into().unwrap_or([0u8; 8]))
    }
    fn dec_f32(raw: &[u8]) -> f32 {
        f32::from_le_bytes(raw.try_into().unwrap_or([0u8; 4]))
    }
    fn dec_f64(raw: &[u8]) -> f64 {
        f64::from_le_bytes(raw.try_into().unwrap_or([0u8; 8]))
    }
    match col_type {
        ColumnType::Int32 => dec_i32(a) < dec_i32(b),
        ColumnType::Int64 => dec_i64(a) < dec_i64(b),
        ColumnType::Float => dec_f32(a) < dec_f32(b),
        ColumnType::Double => dec_f64(a) < dec_f64(b),
        // Booleans and (fixed) byte arrays compare lexicographically on
        // their raw byte encodings.
        ColumnType::Boolean | ColumnType::ByteArray | ColumnType::FixedLenByteArray(_) => a < b,
    }
}
/// Writes `rows` into an in-memory Parquet file using Snappy compression,
/// level-tuned row-group/page size limits, and the per-column encodings
/// from `build_column_encoding_props`. `kv` becomes the footer key/value
/// metadata. Returns the serialized file bytes.
fn arrow_write_pass(
    rows: &[(InternalKey, Row)],
    arrow_schema: &Arc<arrow::datatypes::Schema>,
    schema: &TableSchema,
    level: Level,
    format: FileFormat,
    kv: &[(String, String)],
) -> Result<Vec<u8>> {
    // Convert the (key, value) pairs into the Thrift KeyValue records the
    // writer embeds in the footer.
    let kv_meta: Vec<parquet::format::KeyValue> = kv
        .iter()
        .map(|(k, v)| parquet::format::KeyValue {
            key: k.clone(),
            value: Some(v.clone()),
        })
        .collect();
    let builder = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .set_max_row_group_size(target_rows_per_row_group(level))
        .set_data_page_size_limit(target_data_page_bytes(level))
        .set_key_value_metadata(Some(kv_meta));
    let builder = build_column_encoding_props(builder, schema, format);
    let props = builder.build();
    let buf: Vec<u8> = Vec::new();
    let mut writer = ArrowWriter::try_new(buf, arrow_schema.clone(), Some(props))
        .map_err(|e| MeruError::Parquet(e.to_string()))?;
    // All rows go in as one record batch; the writer splits it into row
    // groups according to max_row_group_size.
    let batch = codec::rows_to_record_batch(rows, schema, format)?;
    writer
        .write(&batch)
        .map_err(|e| MeruError::Parquet(e.to_string()))?;
    // into_inner() finalizes the file (writes the footer) and returns the
    // underlying buffer.
    writer
        .into_inner()
        .map_err(|e| MeruError::Parquet(e.to_string()))
}
/// Reads back the pass-1 Parquet bytes and builds, for every data page of
/// the internal-key column, an entry mapping the page's first internal key
/// (looked up in `rows` by global row index) to its `PageLocation`.
///
/// Requires the file's page OffsetIndex; errors if it is missing, and
/// reports corruption for any negative offset/size/row-index values in the
/// signed Thrift fields.
fn extract_kv_index_entries(
    rows: &[(InternalKey, Row)],
    pass1_bytes: &[u8],
) -> Result<Vec<(Vec<u8>, PageLocation)>> {
    let bytes = Bytes::copy_from_slice(pass1_bytes);
    // with_page_index() makes the reader parse the OffsetIndex structures.
    let opts = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(bytes, opts)
        .map_err(|e| MeruError::Parquet(e.to_string()))?;
    let metadata = reader.metadata();
    let offset_index = metadata.offset_index().ok_or_else(|| {
        MeruError::Parquet(
            "OffsetIndex missing from pass-1 Parquet file (expected ArrowWriter to emit it)".into(),
        )
    })?;
    // The internal-key column is expected at leaf index 0 — see the error
    // message below naming _merutable_ikey.
    const IKEY_COL_IDX: usize = 0;
    let mut entries: Vec<(Vec<u8>, PageLocation)> = Vec::new();
    // Global row offset of the current row group's first row.
    let mut row_group_start: u64 = 0;
    for (rg_idx, rg_offset_indexes) in offset_index.iter().enumerate() {
        let ikey_offsets = rg_offset_indexes.get(IKEY_COL_IDX).ok_or_else(|| {
            MeruError::Parquet(format!(
                "OffsetIndex missing _merutable_ikey column for row group {rg_idx}"
            ))
        })?;
        for page_loc in ikey_offsets.page_locations() {
            // The page-index fields are signed in the Thrift schema;
            // reject negatives before casting to unsigned.
            if page_loc.first_row_index < 0 {
                return Err(MeruError::Corruption(format!(
                    "negative first_row_index {} in row group {rg_idx}",
                    page_loc.first_row_index
                )));
            }
            if page_loc.offset < 0 {
                return Err(MeruError::Corruption(format!(
                    "negative page offset {} in row group {rg_idx}",
                    page_loc.offset
                )));
            }
            if page_loc.compressed_page_size < 0 {
                return Err(MeruError::Corruption(format!(
                    "negative compressed_page_size {} in row group {rg_idx}",
                    page_loc.compressed_page_size
                )));
            }
            // first_row_index is relative to the row group; translate to a
            // file-global row index to look up the page's first key.
            let global_first_row = row_group_start + page_loc.first_row_index as u64;
            let row = rows.get(global_first_row as usize).ok_or_else(|| {
                MeruError::Parquet(format!(
                    "OffsetIndex first_row_index {global_first_row} \
                     out of bounds for input row count {}",
                    rows.len()
                ))
            })?;
            entries.push((
                row.0.as_bytes().to_vec(),
                PageLocation {
                    page_offset: page_loc.offset as u64,
                    page_size: page_loc.compressed_page_size as u32,
                    first_row_index: global_first_row,
                },
            ));
        }
        // Advance the global offset by this row group's (validated) size.
        let rg_num_rows = metadata.row_group(rg_idx).num_rows();
        if rg_num_rows < 0 {
            return Err(MeruError::Corruption(format!(
                "negative num_rows {} in row group {rg_idx}",
                rg_num_rows
            )));
        }
        let rg_num_rows = rg_num_rows as u64;
        row_group_start += rg_num_rows;
    }
    Ok(entries)
}
// Unit tests: level-tuning invariants for the size targets, plus
// end-to-end writer tests exercising the footer kv-index, dual/columnar
// schemas, and the empty-input path.
#[cfg(test)]
mod tests {
    use super::*;

    // Row-group byte targets must grow from hot (L0) to cold and plateau
    // at L2+.
    #[test]
    fn row_group_bytes_grow_from_hot_to_cold() {
        let l0 = target_row_group_bytes(Level(0));
        let l1 = target_row_group_bytes(Level(1));
        let l2 = target_row_group_bytes(Level(2));
        let l3 = target_row_group_bytes(Level(3));
        assert!(l0 < l1, "L0 ({l0}) must be smaller than L1 ({l1})");
        assert!(l1 < l2, "L1 ({l1}) must be smaller than L2 ({l2})");
        assert_eq!(l2, l3, "L2+ should plateau at the cold-tier size");
    }

    // Data-page byte targets follow the same hot-to-cold growth curve.
    #[test]
    fn data_page_bytes_grow_from_hot_to_cold() {
        let l0 = target_data_page_bytes(Level(0));
        let l1 = target_data_page_bytes(Level(1));
        let l2 = target_data_page_bytes(Level(2));
        let l3 = target_data_page_bytes(Level(3));
        assert!(l0 < l1, "L0 ({l0}) must be smaller than L1 ({l1})");
        assert!(l1 < l2, "L1 ({l1}) must be smaller than L2 ({l2})");
        assert_eq!(l2, l3, "L2+ should plateau at the cold-tier size");
    }

    // The cold tier must be at least 8x the hot tier for both knobs.
    #[test]
    fn cold_tier_significantly_larger_than_hot() {
        let rg_ratio = target_row_group_bytes(Level(2)) / target_row_group_bytes(Level(0));
        let pg_ratio = target_data_page_bytes(Level(2)) / target_data_page_bytes(Level(0));
        assert!(
            rg_ratio >= 8,
            "row-group cold/hot ratio {rg_ratio}x is too small (need ≥8x)"
        );
        assert!(
            pg_ratio >= 8,
            "data-page cold/hot ratio {pg_ratio}x is too small (need ≥8x)"
        );
    }

    // L0 sizes stay small enough to behave like a row store.
    #[test]
    fn l0_stays_row_store_sized() {
        assert!(
            target_row_group_bytes(Level(0)) <= 8 * 1024 * 1024,
            "L0 row group must stay ≤ 8 MiB to behave like a row store"
        );
        assert!(
            target_data_page_bytes(Level(0)) <= 16 * 1024,
            "L0 data page must stay ≤ 16 KiB to behave like a row store"
        );
    }

    // Cold-tier sizes stay large enough for analytics-style scans.
    #[test]
    fn cold_tier_stays_analytics_sized() {
        assert!(
            target_row_group_bytes(Level(2)) >= 64 * 1024 * 1024,
            "Cold tier row group must stay ≥ 64 MiB for analytics scans"
        );
        assert!(
            target_data_page_bytes(Level(2)) >= 64 * 1024,
            "Cold tier data page must stay ≥ 64 KiB for compression + I/O amortization"
        );
    }

    // Row-count targets inherit byte-target monotonicity and keep the
    // 1024-row floor.
    #[test]
    fn rows_per_row_group_monotonic_and_floored() {
        let r0 = target_rows_per_row_group(Level(0));
        let r1 = target_rows_per_row_group(Level(1));
        let r2 = target_rows_per_row_group(Level(2));
        assert!(r0 >= 1024, "row floor violated: {r0}");
        assert!(r0 < r1, "L0 rows ({r0}) must be < L1 rows ({r1})");
        assert!(r1 < r2, "L1 rows ({r1}) must be < L2 rows ({r2})");
    }

    use crate::types::{
        schema::{ColumnDef, ColumnType, TableSchema},
        sequence::{OpType, SeqNum},
        value::{FieldValue, Row},
    };
    use bytes::Bytes as BBytes;
    use crate::parquet::kv_index::{KvSparseIndex, KV_INDEX_FOOTER_KEY};

    // Two-column (id: Int64, payload: ByteArray) schema keyed on `id`,
    // shared by the writer/kv-index tests below.
    fn kv_index_test_schema() -> TableSchema {
        TableSchema {
            table_name: "kv_index_test".into(),
            columns: vec![
                ColumnDef {
                    name: "id".into(),
                    col_type: ColumnType::Int64,
                    nullable: false,
                    ..Default::default()
                },
                ColumnDef {
                    name: "payload".into(),
                    col_type: ColumnType::ByteArray,
                    nullable: false,
                    ..Default::default()
                },
            ],
            primary_key: vec![0],
            ..Default::default()
        }
    }

    // Builds `n` sorted rows with key/seq derived from the index and a
    // 256-byte payload whose 8-byte chunks are stamped with the index.
    fn make_test_rows(n: usize, schema: &TableSchema) -> Vec<(InternalKey, Row)> {
        (0..n as i64)
            .map(|i| {
                let ikey = InternalKey::encode(
                    &[FieldValue::Int64(i)],
                    SeqNum(i as u64 + 1),
                    OpType::Put,
                    schema,
                )
                .unwrap();
                let mut payload_bytes = vec![0u8; 256];
                let stamp = i.to_le_bytes();
                // Stamp every 8-byte chunk (the final chunk may be short).
                for chunk in payload_bytes.chunks_mut(8) {
                    let n = chunk.len().min(8);
                    chunk[..n].copy_from_slice(&stamp[..n]);
                }
                let row = Row::new(vec![
                    Some(FieldValue::Int64(i)),
                    Some(FieldValue::Bytes(BBytes::from(payload_bytes))),
                ]);
                (ikey, row)
            })
            .collect()
    }

    // Decodes the sparse kv index from a file's footer KV metadata, if any.
    fn read_kv_index_from_file(file_bytes: &[u8]) -> Option<KvSparseIndex> {
        let bytes = BBytes::copy_from_slice(file_bytes);
        let reader = SerializedFileReader::new(bytes).ok()?;
        let kv = reader.metadata().file_metadata().key_value_metadata()?;
        let entry = kv.iter().find(|e| e.key == KV_INDEX_FOOTER_KEY)?;
        let hex_str = entry.value.as_ref()?;
        let raw = hex::decode(hex_str).ok()?;
        KvSparseIndex::from_bytes(BBytes::from(raw)).ok()
    }

    // A non-trivial write must embed a kv index with multiple page entries.
    #[test]
    fn writer_emits_kv_index_in_footer() {
        let schema = kv_index_test_schema();
        let rows = make_test_rows(16_384, &schema);
        let (file_bytes, _bloom, _meta) = write_sorted_rows(
            rows.clone(),
            Arc::new(schema),
            Level(0),
            crate::types::level::FileFormat::Dual,
            10,
        )
        .unwrap();
        let idx =
            read_kv_index_from_file(&file_bytes).expect("writer must emit merutable.kv_index.v1");
        assert!(
            !idx.is_empty(),
            "kv_index must contain at least one entry for a non-empty file"
        );
        assert!(
            idx.len() >= 2,
            "expected ≥2 page entries for 16k rows at L0; got {}",
            idx.len()
        );
    }

    // Predecessor property: for every input key, find_page must locate a
    // page whose first row is at or before that key's row.
    #[test]
    fn kv_index_predecessor_holds_for_every_input_key() {
        let schema = kv_index_test_schema();
        let rows = make_test_rows(16_384, &schema);
        let (file_bytes, _bloom, _meta) = write_sorted_rows(
            rows.clone(),
            Arc::new(schema),
            Level(0),
            crate::types::level::FileFormat::Dual,
            10,
        )
        .unwrap();
        let idx = read_kv_index_from_file(&file_bytes).unwrap();
        let entries: Vec<_> = idx.iter().collect();
        assert!(!entries.is_empty());
        for (row_idx, (ikey, _)) in rows.iter().enumerate() {
            let probe = ikey.as_bytes();
            let loc = idx.find_page(probe).unwrap_or_else(|| {
                panic!("kv_index find_page returned None for input row {row_idx}")
            });
            assert!(
                (loc.first_row_index as usize) <= row_idx,
                "page first_row_index {} must be ≤ probe row {}",
                loc.first_row_index,
                row_idx
            );
            assert!(
                (loc.first_row_index as usize) < rows.len(),
                "page first_row_index {} out of bounds for row count {}",
                loc.first_row_index,
                rows.len()
            );
        }
    }

    // Index entries must be strictly ascending by key bytes.
    #[test]
    fn kv_index_entries_are_strictly_ascending() {
        let schema = kv_index_test_schema();
        let rows = make_test_rows(1500, &schema);
        let (file_bytes, _bloom, _meta) = write_sorted_rows(
            rows,
            Arc::new(schema),
            Level(0),
            crate::types::level::FileFormat::Dual,
            10,
        )
        .unwrap();
        let idx = read_kv_index_from_file(&file_bytes).unwrap();
        let mut prev: Option<Vec<u8>> = None;
        for (k, _) in idx.iter() {
            if let Some(ref p) = prev {
                assert!(
                    k.as_slice() > p.as_slice(),
                    "kv_index entries not strictly ascending: {p:?} ≮ {k:?}"
                );
            }
            prev = Some(k);
        }
    }

    // Dual-format (L0) files carry the value-blob column; columnar (L1)
    // files must not, while both keep the ikey and user columns.
    #[test]
    fn l0_has_value_blob_column_l1_does_not() {
        let schema = kv_index_test_schema();
        let rows = make_test_rows(64, &schema);
        let (l0_bytes, _, _) = write_sorted_rows(
            rows.clone(),
            Arc::new(schema.clone()),
            Level(0),
            crate::types::level::FileFormat::Dual,
            10,
        )
        .unwrap();
        let (l1_bytes, _, _) = write_sorted_rows(
            rows,
            Arc::new(schema),
            Level(1),
            crate::types::level::FileFormat::Columnar,
            10,
        )
        .unwrap();
        let l0_reader = SerializedFileReader::new(BBytes::from(l0_bytes)).unwrap();
        let l1_reader = SerializedFileReader::new(BBytes::from(l1_bytes)).unwrap();
        let l0_descr = l0_reader.metadata().file_metadata().schema_descr_ptr();
        let l1_descr = l1_reader.metadata().file_metadata().schema_descr_ptr();
        let l0_cols: Vec<String> = (0..l0_descr.num_columns())
            .map(|i| l0_descr.column(i).name().to_string())
            .collect();
        let l1_cols: Vec<String> = (0..l1_descr.num_columns())
            .map(|i| l1_descr.column(i).name().to_string())
            .collect();
        assert!(
            l0_cols
                .iter()
                .any(|c| c == crate::parquet::codec::IKEY_COLUMN_NAME),
            "L0 must contain ikey column; got {l0_cols:?}"
        );
        assert!(
            l0_cols
                .iter()
                .any(|c| c == crate::parquet::codec::VALUE_BLOB_COLUMN_NAME),
            "L0 must contain value blob column; got {l0_cols:?}"
        );
        assert!(
            l1_cols
                .iter()
                .any(|c| c == crate::parquet::codec::IKEY_COLUMN_NAME),
            "L1 must contain ikey column; got {l1_cols:?}"
        );
        assert!(
            !l1_cols
                .iter()
                .any(|c| c == crate::parquet::codec::VALUE_BLOB_COLUMN_NAME),
            "L1 must NOT contain value blob column; got {l1_cols:?}"
        );
        assert!(l0_cols.iter().any(|c| c == "id") && l0_cols.iter().any(|c| c == "payload"));
        assert!(l1_cols.iter().any(|c| c == "id") && l1_cols.iter().any(|c| c == "payload"));
    }

    // Empty input yields an empty file and therefore no kv index.
    #[test]
    fn empty_input_emits_no_kv_index() {
        let schema = kv_index_test_schema();
        let (file_bytes, _bloom, _meta) = write_sorted_rows(
            vec![],
            Arc::new(schema),
            Level(0),
            crate::types::level::FileFormat::Dual,
            10,
        )
        .unwrap();
        assert!(file_bytes.is_empty());
        assert!(read_kv_index_from_file(&file_bytes).is_none());
    }
}