mod builder;
mod pruning;
mod reader;
mod schema;
pub mod writer;
pub mod zone_map;
use std::sync::Arc;
pub(crate) use builder::StatsAccumulator;
pub use schema::MAX_IS_TRUNCATED;
pub use schema::MIN_IS_TRUNCATED;
use vortex_array::DeserializeMetadata;
use vortex_array::SerializeMetadata;
use vortex_array::dtype::DType;
use vortex_array::dtype::TryFromBytes;
use vortex_array::expr::stats::Stat;
use vortex_array::stats::as_stat_bitset_bytes;
use vortex_array::stats::stats_from_bitset_bytes;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_error::vortex_ensure_eq;
use vortex_error::vortex_panic;
use vortex_session::VortexSession;
use vortex_session::registry::ReadContext;
use crate::LayoutChildType;
use crate::LayoutEncodingRef;
use crate::LayoutId;
use crate::LayoutReaderRef;
use crate::LayoutRef;
use crate::VTable;
use crate::children::LayoutChildren;
use crate::children::OwnedLayoutChildren;
use crate::layouts::zoned::reader::ZonedReader;
use crate::layouts::zoned::schema::stats_table_dtype;
use crate::segments::SegmentId;
use crate::segments::SegmentSource;
use crate::vtable;
// Invokes the crate's `vtable!` macro for the `Zoned` layout.
// NOTE(review): presumably this declares the `Zoned` marker type that the
// `impl VTable for Zoned` below attaches to, plus the glue between
// `ZonedLayout` / `ZonedLayoutEncoding` and the generic layout traits —
// confirm against the definition of `crate::vtable!`.
vtable!(Zoned);
/// [`VTable`] implementation for the zoned layout.
///
/// A zoned layout always has exactly two children: the data (index 0) and a
/// table of zone statistics (index 1) whose dtype is derived from the data
/// dtype and the set of present stats via `stats_table_dtype`.
impl VTable for Zoned {
    type Layout = ZonedLayout;
    type Encoding = ZonedLayoutEncoding;
    type Metadata = ZonedMetadata;

    /// Returns the layout identifier, `"vortex.stats"`.
    fn id(_encoding: &Self::Encoding) -> LayoutId {
        LayoutId::new("vortex.stats")
    }

    /// Returns a reference to the (stateless) zoned encoding.
    fn encoding(_layout: &Self::Layout) -> LayoutEncodingRef {
        LayoutEncodingRef::new_ref(ZonedLayoutEncoding.as_ref())
    }

    /// Row count of the layout, taken from the data child (index 0).
    fn row_count(layout: &Self::Layout) -> u64 {
        layout.children.child_row_count(0)
    }

    /// DType of the layout, i.e. the dtype stored at construction.
    fn dtype(layout: &Self::Layout) -> &DType {
        &layout.dtype
    }

    /// Builds the serializable metadata for this layout.
    ///
    /// # Panics
    ///
    /// Panics with "Invalid zone length" if the zone length does not fit in a `u32`.
    fn metadata(layout: &Self::Layout) -> Self::Metadata {
        let zone_len = u32::try_from(layout.zone_len).vortex_expect("Invalid zone length");
        let present_stats = Arc::clone(&layout.present_stats);
        ZonedMetadata {
            zone_len,
            present_stats,
        }
    }

    /// Always empty: this layout reports no segments of its own.
    fn segment_ids(_layout: &Self::Layout) -> Vec<SegmentId> {
        Vec::new()
    }

    /// Always two: the data child and the zones child.
    fn nchildren(_layout: &Self::Layout) -> usize {
        2
    }

    /// Returns the child at `idx`: 0 is the data, 1 is the zones table.
    ///
    /// # Errors
    ///
    /// Returns an error for any other index, or if the underlying child lookup fails.
    fn child(layout: &Self::Layout, idx: usize) -> VortexResult<LayoutRef> {
        match idx {
            0 => layout.children.child(0, layout.dtype()),
            1 => {
                // The zones child is typed by the stats table schema, not the data dtype.
                let zones_dtype = stats_table_dtype(layout.dtype(), &layout.present_stats);
                layout.children.child(1, &zones_dtype)
            }
            _ => vortex_bail!("Invalid child index: {}", idx),
        }
    }

    /// Describes the role of the child at `idx`: transparent "data" or auxiliary "zones".
    ///
    /// # Panics
    ///
    /// Panics if `idx` is neither 0 nor 1.
    fn child_type(_layout: &Self::Layout, idx: usize) -> LayoutChildType {
        match idx {
            0 => LayoutChildType::Transparent("data".into()),
            1 => LayoutChildType::Auxiliary("zones".into()),
            _ => vortex_panic!("Invalid child index: {}", idx),
        }
    }

    /// Creates a [`ZonedReader`] over a clone of this layout.
    fn new_reader(
        layout: &Self::Layout,
        name: Arc<str>,
        segment_source: Arc<dyn SegmentSource>,
        session: &VortexSession,
    ) -> VortexResult<LayoutReaderRef> {
        let reader = ZonedReader::try_new(layout.clone(), name, segment_source, session.clone())?;
        Ok(Arc::new(reader))
    }

    /// Reconstructs a [`ZonedLayout`] from deserialized metadata and children.
    ///
    /// Unlike [`ZonedLayout::new`], the zone length is taken from the metadata
    /// as-is (no zero check is performed here).
    ///
    /// # Errors
    ///
    /// Returns an error unless exactly two children are supplied.
    fn build(
        _encoding: &Self::Encoding,
        dtype: &DType,
        _row_count: u64,
        metadata: &ZonedMetadata,
        _segment_ids: Vec<SegmentId>,
        children: &dyn LayoutChildren,
        _ctx: &ReadContext,
    ) -> VortexResult<Self::Layout> {
        vortex_ensure_eq!(
            children.nchildren(),
            2,
            "ZonedLayout expects exactly 2 children (data, zones)"
        );

        let zone_len = metadata.zone_len as usize;
        let present_stats = Arc::clone(&metadata.present_stats);
        Ok(ZonedLayout {
            dtype: dtype.clone(),
            children: children.to_arc(),
            zone_len,
            present_stats,
        })
    }

    /// Replaces both children of `layout` with the supplied ones.
    ///
    /// # Errors
    ///
    /// Returns an error unless exactly two children are supplied; `layout` is
    /// left untouched in that case.
    fn with_children(layout: &mut Self::Layout, children: Vec<LayoutRef>) -> VortexResult<()> {
        match children.len() {
            2 => {
                layout.children = OwnedLayoutChildren::layout_children(children);
                Ok(())
            }
            count => vortex_bail!(
                "ZonedLayout expects exactly 2 children (data, zones), got {}",
                count
            ),
        }
    }
}
/// Encoding marker type for the zoned layout.
///
/// It carries no state; it is the `Encoding` associated type of the `Zoned`
/// vtable and is handed out by reference from `VTable::encoding`.
#[derive(Debug)]
pub struct ZonedLayoutEncoding;
/// Layout that pairs a data child with a "zones" child holding a statistics table.
///
/// Instances are created either through [`ZonedLayout::new`] or by the vtable's
/// `build` function when reconstructing a layout from serialized metadata.
#[derive(Clone, Debug)]
pub struct ZonedLayout {
/// DType of the data child; this is also the dtype reported for the layout itself.
dtype: DType,
/// The two children: data at index 0, zones table at index 1.
children: Arc<dyn LayoutChildren>,
/// Zone length. Serialized as a `u32` in [`ZonedMetadata`].
/// NOTE(review): presumably the number of data rows covered by each zone — confirm.
zone_len: usize,
/// Stats present in the zones table; together with `dtype` this determines the
/// expected dtype of the zones child (see `stats_table_dtype`).
present_stats: Arc<[Stat]>,
}
impl ZonedLayout {
    /// Creates a zoned layout from a data layout and its zones (statistics) layout.
    ///
    /// The layout's dtype is the dtype of `data`. The dtype of `zones` must equal
    /// `stats_table_dtype(data.dtype(), &present_stats)`.
    ///
    /// # Panics
    ///
    /// Panics if:
    /// - `zone_len` is zero;
    /// - `zone_len` does not fit in a `u32`. The serialized metadata stores the
    ///   zone length as a `u32`, so such a layout could never produce its
    ///   metadata; rejecting it here surfaces the problem at construction time
    ///   instead of as a later "Invalid zone length" panic;
    /// - the dtype of `zones` does not match the expected stats table dtype.
    pub fn new(
        data: LayoutRef,
        zones: LayoutRef,
        zone_len: usize,
        present_stats: Arc<[Stat]>,
    ) -> Self {
        if zone_len == 0 {
            vortex_panic!("Zone length must be greater than 0");
        }
        // Fail fast: the metadata encoding only has room for a 32-bit zone length.
        if u32::try_from(zone_len).is_err() {
            vortex_panic!("Zone length {} does not fit in a u32", zone_len);
        }

        let expected_dtype = stats_table_dtype(data.dtype(), &present_stats);
        if zones.dtype() != &expected_dtype {
            vortex_panic!("Invalid zone map layout: zones dtype does not match expected dtype");
        }

        Self {
            dtype: data.dtype().clone(),
            children: OwnedLayoutChildren::layout_children(vec![data, zones]),
            zone_len,
            present_stats,
        }
    }

    /// Returns the number of zones, i.e. the row count of the zones child (index 1).
    ///
    /// # Panics
    ///
    /// Panics if that row count does not fit in a `usize`.
    pub fn nzones(&self) -> usize {
        usize::try_from(self.children.child_row_count(1)).vortex_expect("Invalid number of zones")
    }

    /// Returns the zone length this layout was created with.
    pub fn zone_len(&self) -> usize {
        self.zone_len
    }

    /// Returns the stats that are present in the zones table.
    pub fn present_stats(&self) -> &Arc<[Stat]> {
        &self.present_stats
    }
}
/// Serializable metadata of a zoned layout.
///
/// The byte representation is the zone length as 4 little-endian bytes,
/// followed by the stat bitset bytes describing `present_stats`.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ZonedMetadata {
/// Zone length, stored as a `u32`.
pub(super) zone_len: u32,
/// Stats present in the zones table, encoded as a bitset when serialized.
pub(super) present_stats: Arc<[Stat]>,
}
impl DeserializeMetadata for ZonedMetadata {
type Output = Self;
fn deserialize(metadata: &[u8]) -> VortexResult<Self::Output> {
vortex_ensure!(
metadata.len() >= 4,
"Zoned metadata must contain at least 4 bytes for zone length, got {}",
metadata.len()
);
let zone_len = u32::try_from_le_bytes(&metadata[0..4])?;
let present_stats: Arc<[Stat]> = stats_from_bitset_bytes(&metadata[4..]).into();
Ok(Self {
zone_len,
present_stats,
})
}
}
impl SerializeMetadata for ZonedMetadata {
    /// Encodes the metadata as the little-endian zone length followed by the
    /// stat bitset bytes for `present_stats`.
    fn serialize(self) -> Vec<u8> {
        let stat_bitset = as_stat_bitset_bytes(&self.present_stats);

        // Start from the 4 zone-length bytes, then append the bitset.
        let mut encoded = self.zone_len.to_le_bytes().to_vec();
        encoded.extend_from_slice(&stat_bitset);
        encoded
    }
}
#[cfg(test)]
mod tests {
    use std::panic;

    use rstest::rstest;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_session::registry::ReadContext;

    use super::*;
    use crate::IntoLayout;
    use crate::children::OwnedLayoutChildren;
    use crate::layouts::flat::FlatLayout;
    use crate::segments::SegmentId;

    // Round-trip: serializing and then deserializing yields equal metadata for
    // each of the cases below.
    #[rstest]
    #[case(ZonedMetadata {
        zone_len: u32::MAX,
        present_stats: Arc::new([]),
    })]
    #[case::all_sorted(ZonedMetadata {
        zone_len: 314,
        present_stats: Arc::new([
            Stat::IsConstant,
            Stat::IsSorted,
            Stat::IsStrictSorted,
            Stat::Max,
            Stat::Min,
            Stat::Sum,
            Stat::NullCount,
            Stat::UncompressedSizeInBytes,
            Stat::NaNCount,
        ]),
    })]
    #[case::some_sorted(ZonedMetadata {
        zone_len: 314,
        present_stats: Arc::new([
            Stat::IsSorted,
            Stat::IsStrictSorted,
            Stat::Max,
            Stat::Min,
            Stat::Sum,
            Stat::NullCount,
            Stat::UncompressedSizeInBytes,
            Stat::NaNCount,
        ]),
    })]
    fn test_metadata_serialization(#[case] metadata: ZonedMetadata) {
        let bytes = metadata.clone().serialize();
        let round_tripped = ZonedMetadata::deserialize(&bytes).unwrap();
        assert_eq!(round_tripped, metadata);
    }

    // Stats supplied out of order come back sorted: same count, different order.
    #[test]
    fn test_deserialize_unsorted_stats() {
        let original = ZonedMetadata {
            zone_len: u32::MAX,
            present_stats: Arc::new([Stat::IsStrictSorted, Stat::IsSorted]),
        };

        let bytes = original.clone().serialize();
        let round_tripped = ZonedMetadata::deserialize(&bytes).unwrap();

        assert!(round_tripped.present_stats.is_sorted());
        assert_eq!(
            round_tripped.present_stats.len(),
            original.present_stats.len()
        );
        assert_ne!(round_tripped.present_stats, original.present_stats);
    }

    // Anything shorter than the 4-byte zone length must be rejected.
    #[rstest]
    #[case(vec![])]
    #[case(vec![0])]
    #[case(vec![0, 0])]
    #[case(vec![0, 0, 0])]
    fn test_deserialize_short_metadata_errors(#[case] metadata: Vec<u8>) {
        let outcome = ZonedMetadata::deserialize(&metadata);
        assert!(outcome.is_err());
    }

    // Short input must surface as an `Err`, never as a panic.
    #[test]
    fn test_deserialize_short_metadata_returns_error_not_panic() {
        let caught = panic::catch_unwind(|| ZonedMetadata::deserialize(&[]));
        assert!(
            caught.is_ok(),
            "deserialize should return an error, not panic"
        );
        assert!(caught.unwrap().is_err());
    }

    // A zero zone length is accepted when deserializing (back-compat), with no stats
    // when the bitset is absent.
    #[test]
    fn test_deserialize_zero_zone_len_is_allowed_for_backcompat() {
        let bytes = 0u32.to_le_bytes();
        let parsed = ZonedMetadata::deserialize(&bytes).unwrap();
        assert_eq!(parsed.zone_len, 0);
        assert!(parsed.present_stats.is_empty());
    }

    // `build` does not reject a zero zone length (back-compat), unlike `ZonedLayout::new`.
    #[test]
    fn test_build_allows_zero_zone_len_for_backcompat() -> VortexResult<()> {
        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let read_ctx = ReadContext::new([]);

        let data =
            FlatLayout::new(0, dtype.clone(), SegmentId::from(0), read_ctx.clone()).into_layout();
        let zones = FlatLayout::new(
            0,
            stats_table_dtype(&dtype, &[]),
            SegmentId::from(1),
            read_ctx,
        )
        .into_layout();
        let children = OwnedLayoutChildren::layout_children(vec![data, zones]);

        let metadata = ZonedMetadata {
            zone_len: 0,
            present_stats: Arc::new([]),
        };
        let layout = <Zoned as VTable>::build(
            &ZonedLayoutEncoding,
            &dtype,
            0,
            &metadata,
            vec![],
            children.as_ref(),
            &ReadContext::new([]),
        )?;

        assert_eq!(layout.zone_len, 0);
        Ok(())
    }

    // `build` requires exactly two children; zero children is an error.
    #[test]
    fn test_build_rejects_invalid_child_count() {
        let metadata = ZonedMetadata {
            zone_len: 3,
            present_stats: Arc::new([]),
        };
        let no_children = OwnedLayoutChildren::layout_children(vec![]);
        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);

        let outcome = <Zoned as VTable>::build(
            &ZonedLayoutEncoding,
            &dtype,
            0,
            &metadata,
            vec![],
            no_children.as_ref(),
            &ReadContext::new([]),
        );

        assert!(outcome.is_err());
    }
}