use std::collections::{BTreeMap, HashMap};
use arrow::compute::concat;
use arrow_array::{Array, ArrayRef, RecordBatch};
use arrow_schema::{DataType, Schema};
use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use uuid::Uuid;
use crate::supertable::manifest::{
add_sum_arrays,
bloom::Bloom,
column_hll, column_min_max, column_sum,
encoding::{DecodeError, EncodeError, decode_length1_array, encode_length1_array},
hll::HllSketch,
merge_min_max_arrays,
part::{BLAKE3_DIGEST_BYTES, BLAKE3_HEX_LEN, ContentHash, PartId},
term_range::prefix_overlaps_range,
};
/// Current manifest format version, written as `"<major>.<minor>"`.
///
/// [`decode`] accepts any manifest whose major component (the text before the first `.`)
/// equals this constant's major component; the minor component is not compared.
pub const FORMAT_VERSION: &str = "1.0";
/// Top-level manifest: table-wide metadata plus one [`ManifestPartEntry`] per part.
///
/// Serialized to pretty-printed JSON by [`encode`] and parsed back by [`decode`].
#[derive(Debug, Clone)]
pub struct Manifest {
    /// Format version string; [`decode`] rejects a major version that differs from
    /// [`FORMAT_VERSION`]'s.
    pub format_version: String,
    pub manifest_id: u64,
    /// Serialized as `blake3:<hex>`.
    pub options_hash: ContentHash,
    /// Opaque schema bytes, base64-encoded in JSON.
    /// NOTE(review): presumably a serialized Arrow schema — confirm with the writer.
    pub schema: Vec<u8>,
    pub id_column: String,
    pub fts_columns: Vec<FtsColumnInfo>,
    pub vector_columns: Vec<VectorColumnInfo>,
    pub partition_strategy: PartitionStrategy,
    pub parts: Vec<ManifestPartEntry>,
}
/// A column configured for full-text search.
///
/// Serialized as `{ "column": <name> }` by the hand-written serde impls in this module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FtsColumnInfo {
    pub column: String,
}
/// Configuration of one vector column.
///
/// `dim` and `n_cent` are carried as `u64` in the JSON form.
#[derive(Debug, Clone, PartialEq)]
pub struct VectorColumnInfo {
    pub column: String,
    /// NOTE(review): presumably the vector dimensionality — confirm against the index builder.
    pub dim: usize,
    /// NOTE(review): presumably the number of centroids used by the vector index — confirm.
    pub n_cent: usize,
    /// NOTE(review): presumably the seed of a random rotation applied to vectors — confirm.
    pub rot_seed: u64,
    /// Distance metric name, stored as free-form text (not validated here).
    pub metric: String,
}
/// How rows are assigned to partitions.
///
/// In JSON this is written as an object tagged by a snake_case `kind` field.
/// NOTE(review): the per-variant descriptions below are inferred from field names; the
/// partitioning logic itself is not visible here — confirm against the writer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PartitionStrategy {
    /// Presumably buckets rows by the time value in `column`, `granularity_secs` per window.
    TimeRange {
        column: String,
        granularity_secs: i64,
    },
    /// Presumably hashes `column` into `n_buckets` buckets.
    Hash {
        column: String,
        n_buckets: u32,
    },
    /// Range partitioning on `column`; `boundaries` are raw bytes (base64 in JSON).
    ColumnRange {
        column: String,
        boundaries: Vec<Vec<u8>>,
    },
    /// Presumably buckets rows by ingestion time in `granularity_secs` windows.
    IngestionTime {
        granularity_secs: i64,
    },
}
/// One part listed in a [`Manifest`], together with per-column summaries
/// (scalar statistics, full-text summaries, vector summaries).
#[derive(Debug, Clone)]
pub struct ManifestPartEntry {
    /// Serialized as its UUID string.
    pub part_id: PartId,
    pub uri: String,
    pub n_superfiles: u64,
    pub size_bytes_compressed: u64,
    pub size_bytes_uncompressed: u64,
    /// Serialized as `blake3:<hex>`.
    pub content_hash: ContentHash,
    /// Raw partition key bytes; base64-encoded in JSON.
    pub partition_key: Vec<u8>,
    /// Serialized as two decimal strings.
    /// NOTE(review): presumably the lowest/highest id in the part; inclusivity is not
    /// established in this file.
    pub id_range: (i128, i128),
    /// Keyed by column name; written in sorted key order in JSON.
    pub scalar_stats_agg: HashMap<String, ScalarStatsAgg>,
    /// Keyed by column.
    pub fts_summary_agg: BTreeMap<String, FtsSummaryAgg>,
    /// Keyed by column.
    pub vector_summary_agg: BTreeMap<String, VectorSummaryAgg>,
}
/// Per-column scalar statistics aggregated over a part.
///
/// `min`, `max` and (when present) `sum` are single-row Arrow arrays; [`encode`] rejects
/// arrays of any other length. Optional fields are `None` when the statistic is not
/// available for the column type or was lost while merging.
#[derive(Debug, Clone)]
pub struct ScalarStatsAgg {
    pub min: ArrayRef,
    pub max: ArrayRef,
    /// `None` when unknown; a merge with an unknown side or a `u64` overflow yields `None`.
    pub null_count: Option<u64>,
    /// `None` for non-summable types, or after a merge where either side lacked a sum or
    /// the addition failed.
    pub sum: Option<ArrayRef>,
    /// Serialized `HllSketch` bytes; `None` for types without HLL support or after a merge
    /// where either side lacked a sketch or held unparseable bytes.
    pub hll: Option<Vec<u8>>,
}
impl PartialEq for ScalarStatsAgg {
    /// Field-wise equality. Arrow arrays are compared through their `ArrayData`;
    /// two absent sums are equal, a present and an absent sum are not.
    fn eq(&self, other: &Self) -> bool {
        if self.null_count != other.null_count || self.hll != other.hll {
            return false;
        }
        if self.min.to_data() != other.min.to_data() || self.max.to_data() != other.max.to_data() {
            return false;
        }
        match (self.sum.as_ref(), other.sum.as_ref()) {
            (None, None) => true,
            (Some(lhs), Some(rhs)) => lhs.to_data() == rhs.to_data(),
            _ => false,
        }
    }
}
impl ScalarStatsAgg {
    /// Builds the aggregate for a single column.
    ///
    /// Returns `None` when `column_min_max` cannot handle the column's type; in that case
    /// no aggregate exists for the column. `sum` and `hll` are individually `None` when
    /// their helpers return nothing, and `null_count` is `None` only if the count does not
    /// fit in a `u64`.
    pub fn from_column(column: &ArrayRef) -> Option<ScalarStatsAgg> {
        let (min, max) = column_min_max(column)?;
        let null_count = u64::try_from(column.null_count()).ok();
        let sum = column_sum(column);
        let hll = column_hll(column).map(|sketch| sketch.as_bytes().to_vec());
        Some(Self {
            min,
            max,
            null_count,
            sum,
            hll,
        })
    }

    /// Single-batch convenience wrapper around [`ScalarStatsAgg::from_batches`].
    pub fn from_batch(
        scalar_schema: &Schema,
        batch: &RecordBatch,
    ) -> HashMap<String, ScalarStatsAgg> {
        Self::from_batches(scalar_schema, std::slice::from_ref(&batch))
    }

    /// Aggregates every field of `scalar_schema` across all `batches`.
    ///
    /// Fields are matched to batch columns by position: field `i` reads column `i` of each
    /// batch (names are not compared). A field yields no entry when some batch has no
    /// column `i`, when the per-batch arrays cannot be concatenated (e.g. differing types),
    /// or when [`ScalarStatsAgg::from_column`] returns `None`. No batches → empty map.
    pub fn from_batches(
        scalar_schema: &Schema,
        batches: &[&RecordBatch],
    ) -> HashMap<String, ScalarStatsAgg> {
        if batches.is_empty() {
            return HashMap::new();
        }
        scalar_schema
            .fields()
            .iter()
            .enumerate()
            .filter_map(|(idx, field)| {
                let pieces = batches
                    .iter()
                    .map(|batch| batch.columns().get(idx).map(|col| col.as_ref()))
                    .collect::<Option<Vec<&dyn Array>>>()?;
                let combined = concat(&pieces).ok()?;
                let agg = Self::from_column(&combined)?;
                Some((field.name().to_string(), agg))
            })
            .collect()
    }

    /// Folds `other` into `self`.
    ///
    /// `min`/`max` take the combined extremes. The additive stats (`null_count`, `sum`,
    /// `hll`) survive only when both sides have them and combining succeeds; otherwise
    /// they become `None`.
    ///
    /// # Errors
    /// Returns [`ScalarStatsMergeError`] when `merge_min_max_arrays` cannot combine the two
    /// sides (e.g. different data types). `self` is left untouched in that case because the
    /// error is raised before any field is written.
    pub fn merge_with(&mut self, other: &ScalarStatsAgg) -> Result<(), ScalarStatsMergeError> {
        let (min, max) = merge_min_max_arrays(&self.min, &other.min, &self.max, &other.max)
            .ok_or_else(|| ScalarStatsMergeError {
                left: self.min.data_type().clone(),
                right: other.min.data_type().clone(),
            })?;
        self.min = min;
        self.max = max;
        self.null_count = self
            .null_count
            .zip(other.null_count)
            .and_then(|(lhs, rhs)| lhs.checked_add(rhs));
        self.sum = self
            .sum
            .as_ref()
            .zip(other.sum.as_ref())
            .and_then(|(lhs, rhs)| add_sum_arrays(lhs, rhs));
        let merged_hll = match (&self.hll, &other.hll) {
            (Some(lhs), Some(rhs)) => HllSketch::from_bytes(lhs)
                .zip(HllSketch::from_bytes(rhs))
                .map(|(mut acc, rhs_sketch)| {
                    acc.merge(&rhs_sketch);
                    acc.as_bytes().to_vec()
                }),
            _ => None,
        };
        self.hll = merged_hll;
        Ok(())
    }

    /// Merges the per-column table `other` into `into`.
    ///
    /// Columns only in `other` are cloned in, columns only in `into` are kept, and shared
    /// columns are merged with [`ScalarStatsAgg::merge_with`]. A shared column whose merge
    /// fails is removed from `into`, so no stale bounds are left behind.
    pub fn merge(
        into: &mut HashMap<String, ScalarStatsAgg>,
        other: &HashMap<String, ScalarStatsAgg>,
    ) {
        for (col, incoming) in other {
            let incompatible = match into.get_mut(col) {
                Some(existing) => existing.merge_with(incoming).is_err(),
                None => {
                    into.insert(col.clone(), incoming.clone());
                    false
                }
            };
            if incompatible {
                into.remove(col);
            }
        }
    }

    /// Test helper: an aggregate carrying only `min`/`max`, with every optional stat unset.
    #[cfg(test)]
    pub(crate) fn from_min_max(min: ArrayRef, max: ArrayRef) -> Self {
        Self {
            min,
            max,
            null_count: None,
            sum: None,
            hll: None,
        }
    }
}
/// Returned by [`ScalarStatsAgg::merge_with`] when the two sides' min/max arrays cannot be
/// merged; carries the data types of both `min` arrays for diagnostics.
#[derive(Debug, Error)]
#[error("incompatible scalar-stats min/max types: {left:?} vs {right:?}")]
pub struct ScalarStatsMergeError {
    /// Data type of the receiver's `min`.
    left: DataType,
    /// Data type of the argument's `min`.
    right: DataType,
}
/// Full-text summary of one column across a part.
#[derive(Debug, Clone, Default)]
pub struct FtsSummaryAgg {
    /// Bloom filter over terms. `None` makes `may_contain` answer `true` for every term.
    /// Serialized as an empty string when absent.
    pub term_bloom: Option<Bloom>,
    /// Distinct-term count. Merging keeps the larger of the two counts, so after a merge it
    /// is at most the true size of the union.
    pub n_terms_distinct: u64,
    /// Byte-wise `(min, max)` term bounds. `None` makes `may_match_prefix` answer `false`.
    pub term_range: Option<(Vec<u8>, Vec<u8>)>,
}
impl PartialEq for FtsSummaryAgg {
    /// Field-wise equality; blooms are compared through their serialized bytes
    /// (`to_bytes`), and a present bloom never equals an absent one.
    fn eq(&self, other: &Self) -> bool {
        if self.n_terms_distinct != other.n_terms_distinct || self.term_range != other.term_range {
            return false;
        }
        match (&self.term_bloom, &other.term_bloom) {
            (None, None) => true,
            (Some(lhs), Some(rhs)) => lhs.to_bytes() == rhs.to_bytes(),
            _ => false,
        }
    }
}
impl FtsSummaryAgg {
    /// Folds `other` into `self`.
    ///
    /// - Blooms: OR-ed together when both exist (`None` if `union_blooms` fails, e.g. the
    ///   sizes differ); a one-sided bloom is kept as-is.
    /// - Term range: widened to cover both ranges; a one-sided range is kept as-is.
    /// - `n_terms_distinct`: the larger of the two counts.
    pub fn merge_with(&mut self, other: &FtsSummaryAgg) {
        let our_bloom = self.term_bloom.take();
        self.term_bloom = match (our_bloom, &other.term_bloom) {
            (Some(ours), Some(theirs)) => union_blooms(&ours, theirs),
            (ours @ Some(_), None) => ours,
            (None, theirs) => theirs.clone(),
        };

        let our_range = self.term_range.take();
        self.term_range = match (our_range, &other.term_range) {
            (Some((lo, hi)), Some((their_lo, their_hi))) => {
                let lo = if their_lo < &lo { their_lo.clone() } else { lo };
                let hi = if their_hi > &hi { their_hi.clone() } else { hi };
                Some((lo, hi))
            }
            (ours @ Some(_), None) => ours,
            (None, theirs) => theirs.clone(),
        };

        self.n_terms_distinct = std::cmp::max(self.n_terms_distinct, other.n_terms_distinct);
    }

    /// Merges the per-column table `other` into `into`.
    ///
    /// Columns only in `other` are cloned in. Shared columns are merged with
    /// [`FtsSummaryAgg::merge_with`]; if the merged column ends up without a bloom (neither
    /// side had one, or the union failed) the column is removed from `into`.
    pub fn merge(
        into: &mut BTreeMap<String, FtsSummaryAgg>,
        other: &BTreeMap<String, FtsSummaryAgg>,
    ) {
        for (col, incoming) in other {
            let lost_bloom = match into.get_mut(col) {
                Some(existing) => {
                    existing.merge_with(incoming);
                    existing.term_bloom.is_none()
                }
                None => {
                    into.insert(col.clone(), incoming.clone());
                    false
                }
            };
            if lost_bloom {
                into.remove(col);
            }
        }
    }

    /// Builds a summary from freshly computed parts. A term range whose bounds are both
    /// empty is stored as `None`.
    pub fn new_with_params(
        term_bloom: Bloom,
        n_terms_distinct: u32,
        term_range: (Vec<u8>, Vec<u8>),
    ) -> Self {
        let (lo, hi) = term_range;
        let term_range = if lo.is_empty() && hi.is_empty() {
            None
        } else {
            Some((lo, hi))
        };
        Self {
            term_bloom: Some(term_bloom),
            n_terms_distinct: u64::from(n_terms_distinct),
            term_range,
        }
    }

    /// `false` only when a bloom is present and rules `term` out; with no bloom the answer
    /// is always `true`.
    pub fn may_contain(&self, term: &[u8]) -> bool {
        match &self.term_bloom {
            Some(bloom) => bloom.contains(term),
            None => true,
        }
    }

    /// Whether some term starting with `prefix` could lie within the term range.
    ///
    /// NOTE(review): an absent range answers `false`, unlike `may_contain`'s absent-bloom
    /// answer. That matches `new_with_params` storing an empty range as `None`, but a
    /// summary decoded without `term_range_union` is treated the same way.
    pub fn may_match_prefix(&self, prefix: &[u8]) -> bool {
        self.term_range
            .as_ref()
            .is_some_and(|(min, max)| prefix_overlaps_range(prefix, min, max))
    }
}
/// Unions two bloom filters by OR-ing their serialized bytes.
///
/// Returns `None` when the serialized lengths differ or when `Bloom::from_bytes` rejects
/// the combined bytes.
fn union_blooms(a: &Bloom, b: &Bloom) -> Option<Bloom> {
    let lhs = a.to_bytes();
    let rhs = b.to_bytes();
    if lhs.len() != rhs.len() {
        return None;
    }
    let combined: Vec<u8> = lhs.iter().zip(rhs.iter()).map(|(x, y)| x | y).collect();
    Bloom::from_bytes(&combined)
}
/// Summary of one vector column across a part: a count-weighted mean centroid plus a
/// radius that, by construction in [`VectorSummaryAgg::merge_with`], covers every merged
/// input's own envelope (L2 distance).
///
/// States recognised by `merge_with`:
/// - `n_vectors == 0`: empty; merging into it adopts the other side verbatim.
/// - empty `centroid_envelope` with `n_vectors > 0`: "poisoned" (e.g. after a dimension
///   mismatch); it absorbs no further merges.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct VectorSummaryAgg {
    /// Centroid coordinates as little-endian `f32`, 4 bytes per dimension.
    pub centroid_envelope: Vec<u8>,
    /// Number of vectors summarized; merging saturates at `u32::MAX`.
    pub n_vectors: u32,
    /// Covering radius around the centroid, as maintained by `merge_with`.
    pub envelope_radius: f32,
}

impl VectorSummaryAgg {
    /// Folds `other` into `self`.
    ///
    /// - `other` empty (`n_vectors == 0`): no-op.
    /// - `self` poisoned (empty centroid, non-zero count): no-op.
    /// - `self` empty: becomes a copy of `other`.
    /// - differing centroid dimensions: `self` becomes poisoned (centroid cleared,
    ///   radius 0, count unchanged).
    /// - otherwise: the centroid becomes the count-weighted mean, the radius grows to
    ///   `max(dist(old_i, new) + radius_i)` over both inputs, and the counts are added,
    ///   saturating at `u32::MAX` instead of wrapping.
    pub fn merge_with(&mut self, other: &VectorSummaryAgg) {
        if other.n_vectors == 0 {
            return;
        }
        if self.centroid_envelope.is_empty() && self.n_vectors > 0 {
            return;
        }
        if self.n_vectors == 0 {
            *self = other.clone();
            return;
        }
        let self_center = decode_le_f32(&self.centroid_envelope);
        let other_center = decode_le_f32(&other.centroid_envelope);
        if self_center.len() != other_center.len() {
            self.centroid_envelope.clear();
            self.envelope_radius = 0.0;
            return;
        }
        let self_weight = self.n_vectors as f32;
        let other_weight = other.n_vectors as f32;
        let n_total = self_weight + other_weight;
        let new_center: Vec<f32> = self_center
            .iter()
            .zip(&other_center)
            .map(|(&s, &o)| (s * self_weight + o * other_weight) / n_total)
            .collect();
        // Triangle inequality: a ball around `new_center` with this radius contains both
        // input balls.
        let self_reach = l2_distance(&self_center, &new_center) + self.envelope_radius;
        let other_reach = l2_distance(&other_center, &new_center) + other.envelope_radius;
        self.centroid_envelope = encode_le_f32(&new_center);
        // Saturate: a wrapped count (e.g. u32::MAX + 1 -> 0) would make this summary look
        // empty to the next merge and discard its centroid.
        self.n_vectors = self.n_vectors.saturating_add(other.n_vectors);
        self.envelope_radius = self_reach.max(other_reach);
    }

    /// Merges the per-column table `other` into `into`: shared columns are folded with
    /// [`VectorSummaryAgg::merge_with`], columns only in `other` are cloned in.
    pub fn merge(
        into: &mut BTreeMap<String, VectorSummaryAgg>,
        other: &BTreeMap<String, VectorSummaryAgg>,
    ) {
        for (col, other_agg) in other {
            if let Some(existing) = into.get_mut(col) {
                existing.merge_with(other_agg);
            } else {
                into.insert(col.clone(), other_agg.clone());
            }
        }
    }
}

/// Decodes little-endian `f32` values; trailing bytes that do not form a full 4-byte chunk
/// are ignored.
fn decode_le_f32(bytes: &[u8]) -> Vec<f32> {
    bytes
        .chunks_exact(4)
        .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
        .collect()
}

/// Encodes `f32` values as consecutive little-endian 4-byte chunks.
fn encode_le_f32(v: &[f32]) -> Vec<u8> {
    v.iter().flat_map(|x| x.to_le_bytes()).collect()
}

/// Euclidean distance between two equal-length vectors (length checked in debug builds).
fn l2_distance(a: &[f32], b: &[f32]) -> f32 {
    debug_assert_eq!(a.len(), b.len(), "l2_distance: dim mismatch");
    a.iter()
        .zip(b.iter())
        .map(|(x, y)| {
            let d = x - y;
            d * d
        })
        .sum::<f32>()
        .sqrt()
}
/// Errors returned by [`decode`].
#[derive(Debug, Error)]
pub enum ListParseError {
    /// The input is not valid JSON or does not match the manifest's JSON shape.
    #[error("json parse failed: {0}")]
    Json(#[from] serde_json::Error),
    /// A base64 field failed to decode; `field` names the offending JSON field.
    #[error("base64 decode failed for {field}: {source}")]
    Base64 {
        field: &'static str,
        source: base64::DecodeError,
    },
    /// A hash string is not `blake3:` followed by the expected hex digest; carries the input.
    #[error("bad content_hash: {0}")]
    BadContentHash(String),
    /// `part_id` is not a valid UUID; carries the UUID parser's error message.
    #[error("bad part_id: {0}")]
    BadPartId(String),
    /// A field's text could not be parsed; carries the field name and the raw value.
    #[error("bad value for field {0}: {1:?}")]
    BadFieldValue(&'static str, String),
    /// Base64-decoded scalar-stat bytes could not be decoded as a length-1 array.
    #[error("scalar stats decode failed for {field}: {source}")]
    ScalarStats {
        field: &'static str,
        source: DecodeError,
    },
    /// Non-empty bloom bytes (length carried) were rejected by `Bloom::from_bytes`.
    #[error("invalid term bloom layout: {0} bytes")]
    InvalidBloom(usize),
    /// The manifest's major version differs from [`FORMAT_VERSION`]'s.
    #[error("incompatible major version: got {got}, supported {supported}")]
    IncompatibleMajorVersion { got: String, supported: String },
}
/// Errors returned by [`encode`].
#[derive(Debug, Error)]
pub enum ListEncodeError {
    /// JSON serialization failed.
    #[error("json encode failed: {0}")]
    Json(#[from] serde_json::Error),
    /// A scalar-stat array could not be encoded by `encode_length1_array`
    /// (for example, because it does not hold exactly one row).
    #[error("scalar stats encode failed for {field}: {source}")]
    ScalarStats {
        field: &'static str,
        source: EncodeError,
    },
}
/// JSON wire form of [`Manifest`]; binary fields are carried as strings.
#[derive(Serialize, Deserialize)]
struct ManifestDto {
    format_version: String,
    manifest_id: u64,
    /// `blake3:<hex>`.
    options_hash: String,
    /// Base64 of `Manifest::schema`.
    schema: String,
    id_column: String,
    fts_columns: Vec<FtsColumnInfo>,
    vector_columns: Vec<VectorColumnInfoDto>,
    partition_strategy: PartitionStrategyDto,
    parts: Vec<ManifestPartEntryDto>,
}
/// JSON wire form of [`VectorColumnInfo`]; the `usize` fields are carried as `u64` so the
/// wire type does not depend on the platform's pointer width.
#[derive(Serialize, Deserialize)]
struct VectorColumnInfoDto {
    column: String,
    dim: u64,
    n_cent: u64,
    rot_seed: u64,
    metric: String,
}
impl Serialize for FtsColumnInfo {
    /// Writes a one-field struct named `FtsColumnInfo`: `{ "column": <name> }`.
    fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
        use serde::ser::SerializeStruct;

        let mut state = s.serialize_struct("FtsColumnInfo", 1)?;
        state.serialize_field("column", &self.column)?;
        state.end()
    }
}
impl<'de> Deserialize<'de> for FtsColumnInfo {
    /// Reads `{ "column": <name> }` through a private derived helper struct.
    fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
        #[derive(Deserialize)]
        struct Inner {
            column: String,
        }

        let Inner { column } = Inner::deserialize(d)?;
        Ok(Self { column })
    }
}
/// JSON wire form of [`PartitionStrategy`].
///
/// Internally tagged: each object carries a `kind` field holding the snake_case variant
/// name (`"time_range"`, `"hash"`, `"column_range"`, `"ingestion_time"`).
#[derive(Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
enum PartitionStrategyDto {
    TimeRange {
        column: String,
        granularity_secs: i64,
    },
    Hash {
        column: String,
        n_buckets: u32,
    },
    ColumnRange {
        column: String,
        /// Base64 of each raw boundary.
        boundaries: Vec<String>,
    },
    IngestionTime {
        granularity_secs: i64,
    },
}
/// JSON wire form of [`ManifestPartEntry`].
#[derive(Serialize, Deserialize)]
struct ManifestPartEntryDto {
    /// UUID string.
    part_id: String,
    uri: String,
    n_superfiles: u64,
    size_bytes_compressed: u64,
    size_bytes_uncompressed: u64,
    /// `blake3:<hex>`.
    content_hash: String,
    /// Base64 of the raw partition key.
    partition_key: String,
    /// Decimal strings; presumably so the full i128 range survives JSON readers
    /// whose number type cannot represent it.
    id_range: (String, String),
    /// `BTreeMap` so keys are written in sorted order.
    scalar_stats_agg: BTreeMap<String, ScalarStatsAggDto>,
    fts_summary_agg: BTreeMap<String, FtsSummaryAggDto>,
    vector_summary_agg: BTreeMap<String, VectorSummaryAggDto>,
}
/// JSON wire form of [`ScalarStatsAgg`]. Optional stats are omitted from the JSON when
/// `None` and read back as `None` when absent.
#[derive(Serialize, Deserialize)]
struct ScalarStatsAggDto {
    /// Base64 of the single-row array written by `encode_length1_array`.
    min: String,
    /// Same encoding as `min`.
    max: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    null_count: Option<u64>,
    /// Same encoding as `min`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    sum: Option<String>,
    /// Base64 of the raw HLL sketch bytes.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    hll: Option<String>,
}
/// JSON wire form of [`FtsSummaryAgg`].
#[derive(Serialize, Deserialize)]
struct FtsSummaryAggDto {
    /// Base64 of the bloom's bytes; an empty string means "no bloom".
    term_bloom_union: String,
    n_terms_distinct: u64,
    /// Omitted when the summary has no term range.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    term_range_union: Option<TermRangeUnionDto>,
}
/// Base64-encoded `(min, max)` term bounds of an [`FtsSummaryAgg`].
#[derive(Serialize, Deserialize)]
struct TermRangeUnionDto {
    min: String,
    max: String,
}
/// JSON wire form of [`VectorSummaryAgg`].
#[derive(Serialize, Deserialize)]
struct VectorSummaryAggDto {
    /// Base64 of the little-endian `f32` centroid bytes.
    centroid_envelope: String,
    /// Defaults to 0 when the field is absent.
    #[serde(default)]
    n_vectors: u32,
    envelope_radius: f32,
}
/// Renders a content hash as `blake3:<hex>`, the form `decode_hash` parses.
fn encode_hash(h: &ContentHash) -> String {
    let hex = h.to_hex();
    format!("blake3:{hex}")
}
/// Parses a `blake3:<hex>` string (as written by `encode_hash`) into a [`ContentHash`].
///
/// # Errors
/// Returns [`ListParseError::BadContentHash`] carrying the full input when the `blake3:`
/// prefix is missing, the hex part is not exactly `BLAKE3_HEX_LEN` bytes long, or it
/// contains anything other than ASCII hex digits.
fn decode_hash(s: &str) -> Result<ContentHash, ListParseError> {
    let hex = s
        .strip_prefix("blake3:")
        .ok_or_else(|| ListParseError::BadContentHash(s.into()))?;
    // Validate the whole digest up front. A byte-length check alone lets multi-byte UTF-8
    // through, and slicing `hex` below at a non-char boundary would panic on such input;
    // `u8::from_str_radix` would also accept a leading '+' (e.g. "+f").
    if hex.len() != BLAKE3_HEX_LEN || !hex.bytes().all(|b| b.is_ascii_hexdigit()) {
        return Err(ListParseError::BadContentHash(s.into()));
    }
    let mut out = [0u8; BLAKE3_DIGEST_BYTES];
    for (i, byte) in out.iter_mut().enumerate() {
        // `hex` is pure ASCII, so every slice boundary is a char boundary; the indices stay
        // in range assuming BLAKE3_HEX_LEN == 2 * BLAKE3_DIGEST_BYTES.
        *byte = u8::from_str_radix(&hex[2 * i..2 * i + 2], 16)
            .map_err(|_| ListParseError::BadContentHash(s.into()))?;
    }
    Ok(ContentHash(out))
}
fn encode_b64(b: &[u8]) -> String {
BASE64.encode(b)
}
/// Decodes base64 text with the `STANDARD` engine, tagging failures with `field`.
///
/// # Errors
/// [`ListParseError::Base64`] when `s` is not valid base64.
fn decode_b64(s: &str, field: &'static str) -> Result<Vec<u8>, ListParseError> {
    match BASE64.decode(s) {
        Ok(bytes) => Ok(bytes),
        Err(source) => Err(ListParseError::Base64 { field, source }),
    }
}
/// Encodes a length-1 scalar-stat array for `column` and returns it as base64 text.
///
/// # Errors
/// [`ListEncodeError::ScalarStats`] tagged with `field` when `encode_length1_array` fails.
fn encode_scalar_array(
    column: &str,
    field: &'static str,
    arr: &ArrayRef,
) -> Result<String, ListEncodeError> {
    encode_length1_array(column, arr)
        .map(|bytes| encode_b64(&bytes))
        .map_err(|source| ListEncodeError::ScalarStats { field, source })
}
fn entry_to_dto(e: &ManifestPartEntry) -> Result<ManifestPartEntryDto, ListEncodeError> {
let mut scalar_stats_agg = BTreeMap::new();
for (k, v) in &e.scalar_stats_agg {
let sum = match &v.sum {
None => None,
Some(s) => Some(encode_scalar_array(k, "scalar_stats_agg.sum", s)?),
};
scalar_stats_agg.insert(
k.clone(),
ScalarStatsAggDto {
min: encode_scalar_array(k, "scalar_stats_agg.min", &v.min)?,
max: encode_scalar_array(k, "scalar_stats_agg.max", &v.max)?,
null_count: v.null_count,
sum,
hll: v.hll.as_deref().map(encode_b64),
},
);
}
Ok(ManifestPartEntryDto {
part_id: e.part_id.0.to_string(),
uri: e.uri.clone(),
n_superfiles: e.n_superfiles,
size_bytes_compressed: e.size_bytes_compressed,
size_bytes_uncompressed: e.size_bytes_uncompressed,
content_hash: encode_hash(&e.content_hash),
partition_key: encode_b64(&e.partition_key),
id_range: (e.id_range.0.to_string(), e.id_range.1.to_string()),
scalar_stats_agg,
fts_summary_agg: e
.fts_summary_agg
.iter()
.map(|(k, v)| {
(
k.clone(),
FtsSummaryAggDto {
term_bloom_union: v
.term_bloom
.as_ref()
.map(|b| encode_b64(&b.to_bytes()))
.unwrap_or_default(),
n_terms_distinct: v.n_terms_distinct,
term_range_union: v.term_range.as_ref().map(|(mn, mx)| TermRangeUnionDto {
min: encode_b64(mn),
max: encode_b64(mx),
}),
},
)
})
.collect(),
vector_summary_agg: e
.vector_summary_agg
.iter()
.map(|(k, v)| {
(
k.clone(),
VectorSummaryAggDto {
centroid_envelope: encode_b64(&v.centroid_envelope),
n_vectors: v.n_vectors,
envelope_radius: v.envelope_radius,
},
)
})
.collect(),
})
}
fn entry_from_dto(d: ManifestPartEntryDto) -> Result<ManifestPartEntry, ListParseError> {
let part_id =
PartId(Uuid::parse_str(&d.part_id).map_err(|e| ListParseError::BadPartId(e.to_string()))?);
let content_hash = decode_hash(&d.content_hash)?;
let partition_key = decode_b64(&d.partition_key, "partition_key")?;
let mut scalar_stats_agg = HashMap::new();
for (k, v) in d.scalar_stats_agg {
let min = decode_length1_array(&decode_b64(&v.min, "scalar_stats_agg.min")?).map_err(
|source| ListParseError::ScalarStats {
field: "scalar_stats_agg.min",
source,
},
)?;
let max = decode_length1_array(&decode_b64(&v.max, "scalar_stats_agg.max")?).map_err(
|source| ListParseError::ScalarStats {
field: "scalar_stats_agg.max",
source,
},
)?;
let sum = match v.sum.as_deref() {
None => None,
Some(s) => Some(
decode_length1_array(&decode_b64(s, "scalar_stats_agg.sum")?).map_err(
|source| ListParseError::ScalarStats {
field: "scalar_stats_agg.sum",
source,
},
)?,
),
};
scalar_stats_agg.insert(
k,
ScalarStatsAgg {
min,
max,
null_count: v.null_count,
sum,
hll: v
.hll
.as_deref()
.map(|s| decode_b64(s, "scalar_stats_agg.hll"))
.transpose()?,
},
);
}
let mut fts_summary_agg = BTreeMap::new();
for (k, v) in d.fts_summary_agg {
fts_summary_agg.insert(
k,
FtsSummaryAgg {
term_bloom: {
let bytes = decode_b64(&v.term_bloom_union, "term_bloom_union")?;
if bytes.is_empty() {
None
} else {
Some(
Bloom::from_bytes(&bytes)
.ok_or(ListParseError::InvalidBloom(bytes.len()))?,
)
}
},
n_terms_distinct: v.n_terms_distinct,
term_range: match v.term_range_union {
None => None,
Some(tr) => Some((
decode_b64(&tr.min, "term_range_union.min")?,
decode_b64(&tr.max, "term_range_union.max")?,
)),
},
},
);
}
let mut vector_summary_agg = BTreeMap::new();
for (k, v) in d.vector_summary_agg {
vector_summary_agg.insert(
k,
VectorSummaryAgg {
centroid_envelope: decode_b64(&v.centroid_envelope, "centroid_envelope")?,
n_vectors: v.n_vectors,
envelope_radius: v.envelope_radius,
},
);
}
Ok(ManifestPartEntry {
part_id,
uri: d.uri,
n_superfiles: d.n_superfiles,
size_bytes_compressed: d.size_bytes_compressed,
size_bytes_uncompressed: d.size_bytes_uncompressed,
content_hash,
partition_key,
id_range: {
let lo =
d.id_range.0.parse::<i128>().map_err(|_| {
ListParseError::BadFieldValue("id_range[0]", d.id_range.0.clone())
})?;
let hi =
d.id_range.1.parse::<i128>().map_err(|_| {
ListParseError::BadFieldValue("id_range[1]", d.id_range.1.clone())
})?;
(lo, hi)
},
scalar_stats_agg,
fts_summary_agg,
vector_summary_agg,
})
}
fn strategy_to_dto(s: &PartitionStrategy) -> PartitionStrategyDto {
match s {
PartitionStrategy::TimeRange {
column,
granularity_secs,
} => PartitionStrategyDto::TimeRange {
column: column.clone(),
granularity_secs: *granularity_secs,
},
PartitionStrategy::Hash { column, n_buckets } => PartitionStrategyDto::Hash {
column: column.clone(),
n_buckets: *n_buckets,
},
PartitionStrategy::ColumnRange { column, boundaries } => {
PartitionStrategyDto::ColumnRange {
column: column.clone(),
boundaries: boundaries.iter().map(|b| encode_b64(b)).collect(),
}
}
PartitionStrategy::IngestionTime { granularity_secs } => {
PartitionStrategyDto::IngestionTime {
granularity_secs: *granularity_secs,
}
}
}
}
fn strategy_from_dto(d: PartitionStrategyDto) -> Result<PartitionStrategy, ListParseError> {
Ok(match d {
PartitionStrategyDto::TimeRange {
column,
granularity_secs,
} => PartitionStrategy::TimeRange {
column,
granularity_secs,
},
PartitionStrategyDto::Hash { column, n_buckets } => {
PartitionStrategy::Hash { column, n_buckets }
}
PartitionStrategyDto::ColumnRange { column, boundaries } => {
let mut bs = Vec::with_capacity(boundaries.len());
for b in boundaries {
bs.push(decode_b64(&b, "partition_strategy.boundaries")?);
}
PartitionStrategy::ColumnRange {
column,
boundaries: bs,
}
}
PartitionStrategyDto::IngestionTime { granularity_secs } => {
PartitionStrategy::IngestionTime { granularity_secs }
}
})
}
fn list_to_dto(l: &Manifest) -> Result<ManifestDto, ListEncodeError> {
let parts = l
.parts
.iter()
.map(entry_to_dto)
.collect::<Result<Vec<_>, _>>()?;
Ok(ManifestDto {
format_version: l.format_version.clone(),
manifest_id: l.manifest_id,
options_hash: encode_hash(&l.options_hash),
schema: encode_b64(&l.schema),
id_column: l.id_column.clone(),
fts_columns: l.fts_columns.clone(),
vector_columns: l
.vector_columns
.iter()
.map(|c| VectorColumnInfoDto {
column: c.column.clone(),
dim: c.dim as u64,
n_cent: c.n_cent as u64,
rot_seed: c.rot_seed,
metric: c.metric.clone(),
})
.collect(),
partition_strategy: strategy_to_dto(&l.partition_strategy),
parts,
})
}
fn list_from_dto(d: ManifestDto) -> Result<Manifest, ListParseError> {
check_major(&d.format_version)?;
let options_hash = decode_hash(&d.options_hash)?;
let schema = decode_b64(&d.schema, "schema")?;
let mut parts = Vec::with_capacity(d.parts.len());
for entry in d.parts {
parts.push(entry_from_dto(entry)?);
}
Ok(Manifest {
format_version: d.format_version,
manifest_id: d.manifest_id,
options_hash,
schema,
id_column: d.id_column,
fts_columns: d.fts_columns,
vector_columns: d
.vector_columns
.into_iter()
.map(|c| VectorColumnInfo {
column: c.column,
dim: c.dim as usize,
n_cent: c.n_cent as usize,
rot_seed: c.rot_seed,
metric: c.metric,
})
.collect(),
partition_strategy: strategy_from_dto(d.partition_strategy)?,
parts,
})
}
pub fn encode(list: &Manifest) -> Result<Vec<u8>, ListEncodeError> {
let dto = list_to_dto(list)?;
Ok(serde_json::to_vec_pretty(&dto)?)
}
pub fn decode(bytes: &[u8]) -> Result<Manifest, ListParseError> {
let dto: ManifestDto = serde_json::from_slice(bytes)?;
list_from_dto(dto)
}
/// Accepts `fv` only when its major component matches [`FORMAT_VERSION`]'s.
///
/// The major component is the text before the first `.` (the whole string if it has no
/// dot), so `"1"`, `"1.0"` and `"1.7"` all match `"1.0"`, while `""` and `"2.0"` do not.
///
/// # Errors
/// [`ListParseError::IncompatibleMajorVersion`] carrying both full version strings.
fn check_major(fv: &str) -> Result<(), ListParseError> {
    fn major(version: &str) -> &str {
        version.split_once('.').map_or(version, |(head, _)| head)
    }

    if major(fv) == major(FORMAT_VERSION) {
        Ok(())
    } else {
        Err(ListParseError::IncompatibleMajorVersion {
            got: fv.to_string(),
            supported: FORMAT_VERSION.to_string(),
        })
    }
}
#[cfg(test)]
mod tests {
use std::{
collections::{BTreeMap, HashMap},
str::from_utf8,
sync::Arc,
};
use arrow_array::{BooleanArray, Date32Array, Int64Array, StringArray};
use arrow_schema::{DataType, Field};
use uuid::Uuid;
use super::{
super::{
bloom::BloomBuilder,
part::{ContentHash, PartId},
},
*,
};
fn agg_i64(vals: Vec<i64>) -> ScalarStatsAgg {
let arr: ArrayRef = Arc::new(Int64Array::from(vals));
ScalarStatsAgg::from_column(&arr).expect("i64 is orderable")
}
fn i64_at0(arr: &ArrayRef) -> i64 {
arr.as_any()
.downcast_ref::<Int64Array>()
.expect("int64 array")
.value(0)
}
#[test]
fn scalar_agg_from_column_computes_min_max_sum_nullcount() {
let arr: ArrayRef = Arc::new(Int64Array::from(vec![Some(3), None, Some(7), Some(1)]));
let agg = ScalarStatsAgg::from_column(&arr).expect("orderable");
assert_eq!(i64_at0(&agg.min), 1);
assert_eq!(i64_at0(&agg.max), 7);
assert_eq!(agg.null_count, Some(1));
assert_eq!(i64_at0(agg.sum.as_ref().expect("sum")), 11); assert!(agg.hll.is_some());
}
#[test]
fn scalar_agg_from_batch_builds_each_column() {
let schema = Schema::new(vec![
Field::new("x", DataType::Int64, true),
Field::new("y", DataType::Int64, true),
]);
let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![
Arc::new(Int64Array::from(vec![3, 7, 1])) as ArrayRef,
Arc::new(Int64Array::from(vec![20, 5, 9])) as ArrayRef,
],
)
.expect("batch");
let table = ScalarStatsAgg::from_batch(&schema, &batch);
assert_eq!(table.len(), 2);
assert_eq!(i64_at0(&table["x"].min), 1);
assert_eq!(i64_at0(&table["x"].max), 7);
assert_eq!(i64_at0(&table["y"].min), 5);
assert_eq!(i64_at0(&table["y"].max), 20);
}
#[test]
fn scalar_agg_from_batches_concats_then_aggregates() {
let schema = Schema::new(vec![Field::new("x", DataType::Int64, true)]);
let b1 = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![Arc::new(Int64Array::from(vec![10, 50])) as ArrayRef],
)
.expect("b1");
let b2 = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![Arc::new(Int64Array::from(vec![5, 200])) as ArrayRef],
)
.expect("b2");
let table = ScalarStatsAgg::from_batches(&schema, &[&b1, &b2]);
assert_eq!(i64_at0(&table["x"].min), 5);
assert_eq!(i64_at0(&table["x"].max), 200);
assert_eq!(i64_at0(table["x"].sum.as_ref().expect("sum")), 265);
assert!(ScalarStatsAgg::from_batches(&schema, &[]).is_empty());
}
#[test]
fn scalar_agg_merge_keeps_extremes_and_adds_additive() {
let mut a = agg_i64(vec![10, 50]); let b = agg_i64(vec![5, 30]); a.merge_with(&b).expect("same type merges");
assert_eq!(i64_at0(&a.min), 5);
assert_eq!(i64_at0(&a.max), 50);
assert_eq!(i64_at0(a.sum.as_ref().expect("sum")), 95); assert_eq!(a.null_count, Some(0));
assert!(a.hll.is_some());
}
#[test]
fn scalar_agg_merge_drops_additive_when_one_side_missing() {
let mut a = agg_i64(vec![1, 2]);
let mut b = agg_i64(vec![3, 4]);
b.sum = None;
b.null_count = None;
b.hll = None;
a.merge_with(&b).expect("same type merges");
assert_eq!(i64_at0(&a.min), 1);
assert_eq!(i64_at0(&a.max), 4);
assert!(a.sum.is_none());
assert!(a.null_count.is_none());
assert!(a.hll.is_none());
}
#[test]
fn merge_tables_unions_columns_and_merges_shared() {
let mut t1: HashMap<String, ScalarStatsAgg> = HashMap::new();
t1.insert("a".into(), agg_i64(vec![10, 50]));
let mut t2: HashMap<String, ScalarStatsAgg> = HashMap::new();
t2.insert("a".into(), agg_i64(vec![5, 30]));
t2.insert("b".into(), agg_i64(vec![100, 200]));
ScalarStatsAgg::merge(&mut t1, &t2);
assert_eq!(t1.len(), 2);
assert_eq!(i64_at0(&t1["a"].min), 5);
assert_eq!(i64_at0(&t1["a"].max), 50);
assert_eq!(i64_at0(&t1["b"].min), 100);
assert_eq!(i64_at0(&t1["b"].max), 200);
}
#[test]
fn scalar_agg_from_column_utf8_has_minmax_and_hll_but_no_sum() {
let arr: ArrayRef = Arc::new(StringArray::from(vec!["alpha", "delta", "bravo"]));
let agg = ScalarStatsAgg::from_column(&arr).expect("utf8 is orderable");
assert_eq!(agg.min.len(), 1);
assert_eq!(agg.max.len(), 1);
assert!(agg.sum.is_none(), "utf8 is not summable");
assert!(agg.hll.is_some(), "utf8 supports HLL");
assert_eq!(agg.null_count, Some(0));
}
#[test]
fn scalar_agg_from_column_boolean_has_no_sum_no_hll() {
let arr: ArrayRef = Arc::new(BooleanArray::from(vec![Some(true), None, Some(false)]));
let agg = ScalarStatsAgg::from_column(&arr).expect("bool is orderable");
assert!(agg.sum.is_none(), "bool not summable");
assert!(agg.hll.is_none(), "bool not in the HLL type set");
assert_eq!(agg.null_count, Some(1));
}
#[test]
fn scalar_agg_from_column_unorderable_type_is_none() {
let arr: ArrayRef = Arc::new(Date32Array::from(vec![1, 2, 3]));
assert!(ScalarStatsAgg::from_column(&arr).is_none());
}
#[test]
fn scalar_agg_from_batches_skips_unorderable_column() {
let schema = Schema::new(vec![Field::new("d", DataType::Date32, true)]);
let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![Arc::new(Date32Array::from(vec![1, 2])) as ArrayRef],
)
.expect("batch");
let table = ScalarStatsAgg::from_batches(&schema, &[&batch]);
assert!(table.is_empty(), "unorderable column yields no entry");
}
#[test]
fn scalar_agg_from_batches_skips_column_on_concat_type_mismatch() {
let schema = Schema::new(vec![Field::new("x", DataType::Int64, true)]);
let b1 = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, true)])),
vec![Arc::new(Int64Array::from(vec![1])) as ArrayRef],
)
.expect("b1");
let b2 = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("x", DataType::Utf8, true)])),
vec![Arc::new(StringArray::from(vec!["a"])) as ArrayRef],
)
.expect("b2");
let table = ScalarStatsAgg::from_batches(&schema, &[&b1, &b2]);
assert!(table.is_empty(), "concat type mismatch skips the column");
}
#[test]
fn scalar_agg_from_batches_skips_column_missing_from_a_batch() {
let schema = Schema::new(vec![
Field::new("x", DataType::Int64, true),
Field::new("y", DataType::Int64, true),
]);
let b1 = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![
Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
Arc::new(Int64Array::from(vec![3, 4])) as ArrayRef,
],
)
.expect("b1");
let b2 = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, true)])),
vec![Arc::new(Int64Array::from(vec![5])) as ArrayRef],
)
.expect("b2");
let table = ScalarStatsAgg::from_batches(&schema, &[&b1, &b2]);
assert!(table.contains_key("x"));
assert!(
!table.contains_key("y"),
"a column missing from a batch is skipped, not panicked"
);
}
#[test]
fn scalar_agg_merge_type_mismatch_errors_and_leaves_self_unchanged() {
let mut a = agg_i64(vec![10, 50]); let sum_present_before = a.sum.is_some();
let b = {
let arr: ArrayRef = Arc::new(StringArray::from(vec!["m", "z"]));
ScalarStatsAgg::from_column(&arr).expect("utf8")
};
assert!(a.merge_with(&b).is_err(), "incompatible types must error");
assert_eq!(i64_at0(&a.min), 10);
assert_eq!(i64_at0(&a.max), 50);
assert_eq!(
a.sum.is_some(),
sum_present_before,
"additive stats untouched on error"
);
}
#[test]
fn scalar_agg_merge_sum_overflow_drops_sum() {
let mut a = agg_i64(vec![i64::MAX]); let b = agg_i64(vec![1]); a.merge_with(&b).expect("same type merges");
assert!(a.sum.is_none(), "i64 sum overflow → None");
assert_eq!(i64_at0(&a.min), 1);
assert_eq!(i64_at0(&a.max), i64::MAX);
}
#[test]
fn scalar_agg_merge_invalid_hll_bytes_drops_hll() {
let mut a = agg_i64(vec![1, 2]); let mut b = agg_i64(vec![3, 4]);
b.hll = Some(vec![1, 2, 3]); a.merge_with(&b).expect("same type merges");
assert!(a.hll.is_none(), "unparseable HLL bytes → None");
}
#[test]
fn scalar_agg_merge_null_count_overflow_drops() {
let mut a = agg_i64(vec![1]);
a.null_count = Some(u64::MAX);
let mut b = agg_i64(vec![2]);
b.null_count = Some(1);
a.merge_with(&b).expect("same type merges");
assert!(a.null_count.is_none(), "null_count overflow → None");
}
#[test]
fn merge_tables_keeps_columns_only_in_self() {
let mut t1: HashMap<String, ScalarStatsAgg> = HashMap::new();
t1.insert("a".into(), agg_i64(vec![1, 5]));
t1.insert("c".into(), agg_i64(vec![7, 9]));
let mut t2: HashMap<String, ScalarStatsAgg> = HashMap::new();
t2.insert("a".into(), agg_i64(vec![0, 3]));
ScalarStatsAgg::merge(&mut t1, &t2);
assert_eq!(i64_at0(&t1["c"].min), 7);
assert_eq!(i64_at0(&t1["c"].max), 9);
assert_eq!(i64_at0(&t1["a"].min), 0);
assert_eq!(i64_at0(&t1["a"].max), 5);
}
#[test]
fn merge_tables_drops_shared_column_on_type_mismatch() {
let mut t1: HashMap<String, ScalarStatsAgg> = HashMap::new();
t1.insert("x".into(), agg_i64(vec![1, 10]));
let mut t2: HashMap<String, ScalarStatsAgg> = HashMap::new();
let utf8: ArrayRef = Arc::new(StringArray::from(vec!["a", "z"]));
t2.insert(
"x".into(),
ScalarStatsAgg::from_column(&utf8).expect("utf8"),
);
ScalarStatsAgg::merge(&mut t1, &t2);
assert!(
!t1.contains_key("x"),
"type-mismatched column is dropped, not kept with stale bounds"
);
}
#[test]
fn scalar_agg_partial_eq_detects_each_field() {
let base = agg_i64(vec![1, 10]);
assert_eq!(base, base.clone());
let mut d = base.clone();
d.min = Arc::new(Int64Array::from(vec![0])) as ArrayRef;
assert_ne!(base, d, "min differs");
let mut d = base.clone();
d.max = Arc::new(Int64Array::from(vec![999])) as ArrayRef;
assert_ne!(base, d, "max differs");
let mut d = base.clone();
d.null_count = Some(42);
assert_ne!(base, d, "null_count differs");
let mut d = base.clone();
d.sum = None;
assert_ne!(base, d, "sum Some vs None");
let mut d = base.clone();
d.sum = Some(Arc::new(Int64Array::from(vec![123])) as ArrayRef);
assert_ne!(base, d, "sum Some vs different Some");
let mut d = base.clone();
d.hll = Some(vec![9, 9, 9, 9]);
assert_ne!(base, d, "hll differs");
let mut a = base.clone();
a.sum = None;
let mut b = base.clone();
b.sum = None;
assert_eq!(a, b, "both sum None → equal");
}
#[test]
fn encode_rejects_non_single_row_scalar_agg() {
let mut list = empty_list();
let mut entry = rich_entry(1);
let bad: ArrayRef = Arc::new(Int64Array::from(vec![1, 2]));
entry
.scalar_stats_agg
.get_mut("ts")
.expect("ts present")
.min = bad;
list.parts = vec![entry];
let err = encode(&list).expect_err("non-length-1 min must fail encode");
assert!(
matches!(
err,
ListEncodeError::ScalarStats {
field: "scalar_stats_agg.min",
..
}
),
"got {err:?}"
);
}
/// Replacing one scalar column's base64 `min` with bytes that are not Arrow IPC makes
/// `decode` fail. The error is a `ScalarStats` variant naming `scalar_stats_agg.min`.
#[test]
fn decode_surfaces_scalar_stats_error_on_corrupt_min() {
    let encoded = encode(&rich_list(1)).expect("encode");
    let mut doc: serde_json::Value = serde_json::from_slice(&encoded).expect("json");
    // Take the first column the JSON object yields; any scalar column will do.
    let column = doc["parts"][0]["scalar_stats_agg"]
        .as_object()
        .expect("scalar_stats_agg object")
        .keys()
        .next()
        .cloned()
        .expect("at least one scalar column");
    let corrupt_min = serde_json::Value::String(BASE64.encode(b"not arrow ipc"));
    doc["parts"][0]["scalar_stats_agg"][column.as_str()]["min"] = corrupt_min;
    let tampered = serde_json::to_vec(&doc).expect("reserialize");
    let err = decode(&tampered).expect_err("corrupt min must fail decode");
    let is_min_error = matches!(
        err,
        ListParseError::ScalarStats {
            field: "scalar_stats_agg.min",
            ..
        }
    );
    assert!(is_min_error, "got {err:?}");
}
/// Minimal manifest fixture: current format version, id 0, an all-zero options hash,
/// an empty schema, no FTS/vector columns and no parts. It is hash-partitioned into 64
/// buckets on the `doc_id` id column.
fn empty_list() -> Manifest {
    let id_column = String::from("doc_id");
    Manifest {
        format_version: FORMAT_VERSION.to_string(),
        manifest_id: 0,
        options_hash: ContentHash([0; 32]),
        schema: vec![],
        fts_columns: Vec::new(),
        vector_columns: Vec::new(),
        // Evaluated before `id_column` is moved into its field below.
        partition_strategy: PartitionStrategy::Hash {
            column: id_column.clone(),
            n_buckets: 64,
        },
        id_column,
        parts: Vec::new(),
    }
}
fn rich_entry(seed: u8) -> ManifestPartEntry {
let mut scalar = HashMap::new();
for col in ["ts", "amount", "_id"] {
scalar.insert(
col.to_string(),
ScalarStatsAgg {
min: Arc::new(Int64Array::from(vec![i64::from(seed)])) as ArrayRef,
max: Arc::new(Int64Array::from(vec![i64::from(seed) + 1_000])) as ArrayRef,
null_count: Some(u64::from(seed)),
sum: Some(Arc::new(Int64Array::from(vec![i64::from(seed) * 7])) as ArrayRef),
hll: Some(vec![seed; 4]),
},
);
}
let mut fts = BTreeMap::new();
let mut title_bloom = BloomBuilder::with_n_blocks(16);
title_bloom.insert(format!("title_{seed}").as_bytes());
fts.insert(
"title".into(),
FtsSummaryAgg {
term_bloom: Some(title_bloom.finish()),
n_terms_distinct: 1_048_576,
term_range: Some((b"alpha".to_vec(), b"zulu".to_vec())),
},
);
fts.insert(
"body".into(),
FtsSummaryAgg {
term_bloom: None,
n_terms_distinct: 0,
term_range: None,
},
);
let mut vec_agg = BTreeMap::new();
vec_agg.insert(
"emb".into(),
VectorSummaryAgg {
centroid_envelope: 0.5_f32.to_le_bytes().repeat(8),
n_vectors: 3,
envelope_radius: 0.71_f32,
},
);
ManifestPartEntry {
part_id: PartId(Uuid::from_bytes([seed; 16])),
uri: format!("manifests/part-{seed:02x}.avro.zst"),
n_superfiles: 9_847,
size_bytes_compressed: 10_485_760,
size_bytes_uncompressed: 26_214_400,
content_hash: ContentHash([seed; 32]),
partition_key: vec![seed, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
id_range: (0, 245_678_901),
scalar_stats_agg: scalar,
fts_summary_agg: fts,
vector_summary_agg: vec_agg,
}
}
fn rich_list(n_parts: u8) -> Manifest {
let mut list = empty_list();
list.manifest_id = 42;
list.options_hash = ContentHash([0xab; 32]);
list.schema = vec![0x01, 0x02, 0x03, 0xff, 0xfe];
list.fts_columns = vec![
FtsColumnInfo {
column: "title".into(),
},
FtsColumnInfo {
column: "body".into(),
},
];
list.vector_columns = vec![VectorColumnInfo {
column: "emb".into(),
dim: 384,
n_cent: 64,
rot_seed: 7,
metric: "cosine".into(),
}];
list.partition_strategy = PartitionStrategy::TimeRange {
column: "ts".into(),
granularity_secs: 86_400,
};
list.parts = (0..n_parts).map(rich_entry).collect();
list
}
/// Field-by-field equality check for two part entries, with a labelled assertion per field.
///
/// Vector summaries are compared entry by entry. The envelope radius is compared by bit
/// pattern, so the check is exact for `f32`. `n_vectors` is compared as well: merges weight
/// centroids by it, so losing it in a round trip would silently skew later merges.
fn assert_entries_equal(a: &ManifestPartEntry, b: &ManifestPartEntry) {
    assert_eq!(a.part_id, b.part_id, "part_id");
    assert_eq!(a.uri, b.uri, "uri");
    assert_eq!(a.n_superfiles, b.n_superfiles, "n_superfiles");
    assert_eq!(
        a.size_bytes_compressed, b.size_bytes_compressed,
        "size_bytes_compressed"
    );
    assert_eq!(
        a.size_bytes_uncompressed, b.size_bytes_uncompressed,
        "size_bytes_uncompressed"
    );
    assert_eq!(a.content_hash, b.content_hash, "content_hash");
    assert_eq!(a.partition_key, b.partition_key, "partition_key");
    assert_eq!(a.id_range, b.id_range, "id_range");
    assert_eq!(a.scalar_stats_agg, b.scalar_stats_agg, "scalar_stats_agg");
    assert_eq!(a.fts_summary_agg, b.fts_summary_agg, "fts_summary_agg");
    assert_eq!(
        a.vector_summary_agg.len(),
        b.vector_summary_agg.len(),
        "vector_summary_agg count"
    );
    for (k, av) in &a.vector_summary_agg {
        let bv = b
            .vector_summary_agg
            .get(k)
            .unwrap_or_else(|| panic!("missing vec {k}"));
        assert_eq!(av.centroid_envelope, bv.centroid_envelope, "vec {k} env");
        // Previously unchecked: a codec that dropped the count would still pass.
        assert_eq!(av.n_vectors, bv.n_vectors, "vec {k} n_vectors");
        assert_eq!(
            av.envelope_radius.to_bits(),
            bv.envelope_radius.to_bits(),
            "vec {k} radius bits"
        );
    }
}
/// Asserts two manifests are equal: every top-level field, then each part pairwise via
/// `assert_entries_equal`, once the part counts match.
fn assert_lists_equal(a: &Manifest, b: &Manifest) {
    assert_eq!(a.format_version, b.format_version);
    assert_eq!(a.manifest_id, b.manifest_id);
    assert_eq!(a.options_hash, b.options_hash);
    assert_eq!(a.schema, b.schema);
    assert_eq!(a.id_column, b.id_column);
    assert_eq!(a.fts_columns, b.fts_columns);
    assert_eq!(a.vector_columns, b.vector_columns);
    assert_eq!(a.partition_strategy, b.partition_strategy);
    assert_eq!(a.parts.len(), b.parts.len());
    a.parts
        .iter()
        .zip(&b.parts)
        .for_each(|(lhs, rhs)| assert_entries_equal(lhs, rhs));
}
/// A manifest with no parts survives an encode → decode round trip unchanged.
#[test]
fn empty_list_roundtrip() {
    let original = empty_list();
    let decoded = decode(&encode(&original).expect("encode")).expect("decode");
    assert_lists_equal(&decoded, &original);
}
/// A fully populated five-part manifest survives an encode → decode round trip unchanged.
#[test]
fn rich_list_roundtrip_multiple_parts() {
    let original = rich_list(5);
    let decoded = decode(&encode(&original).expect("encode")).expect("decode");
    assert_lists_equal(&decoded, &original);
}
/// The `TimeRange` partition strategy (column and granularity) round-trips exactly.
#[test]
fn partition_strategy_time_range_roundtrip() {
    let list = Manifest {
        partition_strategy: PartitionStrategy::TimeRange {
            column: "event_ts".into(),
            granularity_secs: 3600,
        },
        ..empty_list()
    };
    let decoded = decode(&encode(&list).expect("encode")).expect("decode");
    assert_eq!(decoded.partition_strategy, list.partition_strategy);
}
/// The `Hash` partition strategy (column and bucket count) round-trips exactly.
#[test]
fn partition_strategy_hash_roundtrip() {
    let list = Manifest {
        partition_strategy: PartitionStrategy::Hash {
            column: "user_id".into(),
            n_buckets: 1024,
        },
        ..empty_list()
    };
    let decoded = decode(&encode(&list).expect("encode")).expect("decode");
    assert_eq!(decoded.partition_strategy, list.partition_strategy);
}
/// The `ColumnRange` partition strategy round-trips with arbitrary binary boundaries,
/// including bytes `0x00`, `0x80` and `0xff`.
#[test]
fn partition_strategy_column_range_roundtrip() {
    let boundaries = vec![
        vec![0x01, 0x02, 0x03],
        vec![0xff, 0xfe, 0xfd, 0xfc],
        vec![0x00, 0x80, 0xff],
    ];
    let list = Manifest {
        partition_strategy: PartitionStrategy::ColumnRange {
            column: "category".into(),
            boundaries,
        },
        ..empty_list()
    };
    let decoded = decode(&encode(&list).expect("encode")).expect("decode");
    assert_eq!(decoded.partition_strategy, list.partition_strategy);
}
/// A `None` term range is omitted from the JSON entirely rather than emitted as `null`.
/// A populated term range is present. The `term_bloom_union` key appears in the output,
/// and the document still decodes.
#[test]
fn term_range_union_none_is_absent_from_json() {
    let list = rich_list(1);
    let bytes = encode(&list).expect("encode");
    let s = from_utf8(&bytes).expect("utf8");
    // `doc` is the whole manifest document, not just the body column's summary.
    let doc = serde_json::from_slice::<serde_json::Value>(&bytes).expect("json");
    let fts_agg = &doc["parts"][0]["fts_summary_agg"]["body"];
    assert!(
        fts_agg.get("term_range_union").is_none(),
        // Print the summary the message names, not the entire document.
        "term_range_union must be absent in json when None; got body fts_agg = {fts_agg:#}"
    );
    let title_agg = &doc["parts"][0]["fts_summary_agg"]["title"];
    assert!(title_agg.get("term_range_union").is_some());
    assert!(s.contains("\"term_bloom_union\""));
    let _ = decode(&bytes).expect("decode still works");
}
/// Builds an FTS summary whose bloom (with `n_blocks` blocks) is filled from `terms`. Its
/// distinct-term hint is `terms.len()`, and its term range is `range` copied into owned
/// byte vectors.
fn fts_agg(terms: &[&[u8]], n_blocks: usize, range: Option<(&[u8], &[u8])>) -> FtsSummaryAgg {
    let mut builder = BloomBuilder::with_n_blocks(n_blocks);
    for &term in terms {
        builder.insert(term);
    }
    let term_range = range.map(|(lo, hi)| (lo.to_owned(), hi.to_owned()));
    FtsSummaryAgg {
        n_terms_distinct: terms.len() as u64,
        term_bloom: Some(builder.finish()),
        term_range,
    }
}
/// Merging two same-shape summaries does three things. The blooms are unioned (terms from
/// both sides match). The range widens to span both inputs. The larger distinct-term hint
/// is kept.
#[test]
fn fts_agg_merge_unions_blooms_widens_range_and_takes_max_distinct() {
    let mut merged = fts_agg(&[b"alpha"], 16, Some((b"alpha", b"mango")));
    merged.n_terms_distinct = 3;
    let incoming = fts_agg(&[b"omega"], 16, Some((b"beta", b"zulu")));
    merged.merge_with(&incoming);

    let union = merged.term_bloom.as_ref().expect("union bloom");
    assert!(union.contains(b"alpha"), "term from self survives the union");
    assert!(union.contains(b"omega"), "term from other joins the union");
    let widened = Some((b"alpha".to_vec(), b"zulu".to_vec()));
    assert_eq!(merged.term_range, widened);
    assert_eq!(
        merged.n_terms_distinct, 3,
        "distinct hint takes the larger side"
    );
}
/// A default summary is neutral in a merge. Merging it into a populated summary keeps the
/// populated bloom and range. Merging a populated summary into a default one adopts the
/// populated data.
#[test]
fn fts_agg_merge_none_side_contributes_nothing() {
    // populated <- default
    let mut populated = fts_agg(&[b"x"], 16, Some((b"a", b"m")));
    populated.merge_with(&FtsSummaryAgg::default());
    assert!(populated.term_bloom.as_ref().expect("kept").contains(b"x"));
    assert_eq!(populated.term_range, Some((b"a".to_vec(), b"m".to_vec())));

    // default <- populated
    let mut blank = FtsSummaryAgg::default();
    let incoming = fts_agg(&[b"y"], 16, Some((b"n", b"z")));
    blank.merge_with(&incoming);
    assert!(blank.term_bloom.as_ref().expect("taken").contains(b"y"));
    assert_eq!(blank.term_range, Some((b"n".to_vec(), b"z".to_vec())));
}
/// Merging blooms built with different block counts cannot be unioned, so the merged
/// summary ends up with no bloom at all.
#[test]
fn fts_agg_merge_bloom_shape_mismatch_drops_to_none() {
    let mut wide = fts_agg(&[b"a"], 16, None);
    let narrow = fts_agg(&[b"b"], 8, None);
    wide.merge_with(&narrow);
    assert!(
        wide.term_bloom.is_none(),
        "shape mismatch → no bloom info (always-keep)"
    );
}
/// `new_with_params` wraps a per-superfile bloom, distinct count and `(min, max)` pair.
/// An empty `(min, max)` pair becomes `term_range: None`, while the bloom is kept either
/// way.
#[test]
fn fts_agg_from_superfile_adapts_per_superfile_shape() {
    let mut builder = BloomBuilder::with_n_blocks(16);
    builder.insert(b"alpha");
    let agg = FtsSummaryAgg::new_with_params(
        builder.finish(),
        7,
        (b"a".to_vec(), b"z".to_vec()),
    );
    let bloom = agg.term_bloom.as_ref().expect("bloom present");
    assert!(bloom.contains(b"alpha"));
    assert_eq!(agg.n_terms_distinct, 7u64);
    assert_eq!(agg.term_range, Some((b"a".to_vec(), b"z".to_vec())));

    let empty_bloom = BloomBuilder::with_n_blocks(16).finish();
    let empty = FtsSummaryAgg::new_with_params(empty_bloom, 0, (Vec::new(), Vec::new()));
    assert_eq!(empty.term_range, None);
    assert!(empty.term_bloom.is_some());
}
/// `may_contain` answers from the bloom: inserted terms match and this absent term does
/// not. A default summary answers `true` for anything (always-keep).
#[test]
fn fts_agg_may_contain() {
    let mut builder = BloomBuilder::with_n_blocks(16);
    builder.insert(b"present");
    let agg = FtsSummaryAgg {
        term_range: None,
        n_terms_distinct: 1,
        term_bloom: Some(builder.finish()),
    };
    assert!(agg.may_contain(b"present"));
    assert!(!agg.may_contain(b"definitely-absent-term"));
    let fallback = FtsSummaryAgg::default();
    assert!(fallback.may_contain(b"anything"));
}
/// `may_match_prefix` checks the prefix against the stored term range. A prefix inside
/// `[bravo, mango]` may match, while prefixes wholly above or below cannot. A default
/// summary reports no match.
#[test]
fn fts_agg_may_match_prefix() {
    let range = (b"bravo".to_vec(), b"mango".to_vec());
    let agg = FtsSummaryAgg {
        term_range: Some(range),
        n_terms_distinct: 0,
        term_bloom: None,
    };
    assert!(agg.may_match_prefix(b"echo"), "prefix inside [bravo, mango]");
    assert!(!agg.may_match_prefix(b"zulu"), "above max → no overlap");
    assert!(!agg.may_match_prefix(b"alpha"), "below min → no overlap");
    let fallback = FtsSummaryAgg::default();
    assert!(!fallback.may_match_prefix(b"echo"));
}
/// Encoding is deterministic. Scalar stats live in a `HashMap`, whose iteration order is
/// not guaranteed across instances. Even so, two independently built copies of the same
/// manifest must encode to identical bytes.
#[test]
fn same_logical_content_produces_byte_equal_json() {
    let (first, second) = (rich_list(3), rich_list(3));
    let bytes_a = encode(&first).expect("encode a");
    let bytes_b = encode(&second).expect("encode b");
    assert_eq!(bytes_a, bytes_b, "byte-equal JSON for byte-equal input");
}
/// A manifest written with a different major format version (`2.x`) is rejected on
/// decode with `IncompatibleMajorVersion`.
#[test]
fn incompatible_major_version_rejected() {
    let list = Manifest {
        format_version: "2.0".into(),
        ..empty_list()
    };
    let bytes = encode(&list).expect("encode");
    let err = decode(&bytes).expect_err("major 2 must reject");
    let rejected = matches!(err, ListParseError::IncompatibleMajorVersion { .. });
    assert!(rejected, "expected IncompatibleMajorVersion, got {err:?}");
}
/// A newer minor version within the same major (`1.99`) is accepted, and the decoded
/// manifest reports that version string verbatim.
#[test]
fn minor_version_compatible() {
    let list = Manifest {
        format_version: "1.99".into(),
        ..empty_list()
    };
    let bytes = encode(&list).expect("encode");
    let decoded = decode(&bytes).expect("minor 99 must accept");
    assert_eq!(decoded.format_version, "1.99");
}
/// The same part entry embedded in two manifests with different ids decodes identically
/// from both. The manifests themselves stay distinguishable by `manifest_id`.
#[test]
fn part_reuse_across_versions() {
    let shared = rich_entry(99);
    let list_v42 = Manifest {
        manifest_id: 42,
        parts: vec![shared.clone()],
        ..empty_list()
    };
    let list_v43 = Manifest {
        manifest_id: 43,
        parts: vec![shared],
        ..empty_list()
    };
    let b_v42 = encode(&list_v42).expect("encode v42");
    let b_v43 = encode(&list_v43).expect("encode v43");
    let d_v42 = decode(&b_v42).expect("decode v42");
    let d_v43 = decode(&b_v43).expect("decode v43");
    assert_eq!(d_v42.parts.len(), 1);
    assert_eq!(d_v43.parts.len(), 1);
    assert_entries_equal(&d_v42.parts[0], &d_v43.parts[0]);
    assert_ne!(d_v42.manifest_id, d_v43.manifest_id);
}
/// The encoded document is a JSON object exposing every manifest field under its Rust name.
/// The options hash is rendered as a readable `blake3:<hex>` string, so tools like `jq`
/// can inspect it.
#[test]
fn json_top_level_keys_are_jq_friendly() {
    let bytes = encode(&rich_list(1)).expect("encode");
    let doc: serde_json::Value = serde_json::from_slice(&bytes).expect("json");
    let obj = doc.as_object().expect("object");
    const EXPECTED_KEYS: [&str; 9] = [
        "format_version",
        "manifest_id",
        "options_hash",
        "schema",
        "id_column",
        "fts_columns",
        "vector_columns",
        "partition_strategy",
        "parts",
    ];
    for &key in &EXPECTED_KEYS {
        assert!(obj.contains_key(key), "missing top-level key {key}");
    }
    let options_hash = obj["options_hash"].as_str().unwrap_or("");
    assert!(
        options_hash.starts_with("blake3:"),
        "options_hash should be 'blake3:<hex>' for jq-debuggability"
    );
}
/// A schema blob containing every byte value `0..=255` round-trips unchanged.
#[test]
fn binary_safe_schema_roundtrip() {
    let every_byte: Vec<u8> = (0u8..=255).collect();
    let list = Manifest {
        schema: every_byte,
        ..empty_list()
    };
    let decoded = decode(&encode(&list).expect("encode")).expect("decode");
    assert_eq!(decoded.schema, list.schema);
}
/// Prefixing the base64 `schema` string with `!!!!` (not valid base64) makes `decode`
/// fail with a typed `Base64` error rather than a generic parse failure.
#[test]
fn malformed_base64_surfaces_typed_error() {
    let list = rich_list(1);
    let bytes = encode(&list).expect("encode");
    let s = from_utf8(&bytes).expect("utf8");
    let tampered = s.replacen("\"schema\": \"", "\"schema\": \"!!!!", 1);
    // If the encoder's JSON layout changes, the replacement is a silent no-op and
    // `expect_err` below would fail confusingly; catch that here with a clear message.
    assert_ne!(tampered, s, "tamper must change the bytes");
    let err = decode(tampered.as_bytes()).expect_err("must fail");
    assert!(
        matches!(err, ListParseError::Base64 { .. }),
        "expected Base64 error, got {err:?}"
    );
}
/// Replacing an empty `term_bloom_union` string with `YWJj` fails decode with
/// `InvalidBloom(3)`. `YWJj` is valid base64 for the three bytes `abc`, but those bytes
/// are not a valid bloom. The variant's `3` matches that length (presumably the byte
/// count — confirm against `InvalidBloom`).
#[test]
fn malformed_term_bloom_surfaces_typed_error() {
    let encoded = encode(&rich_list(1)).expect("encode");
    let json = from_utf8(&encoded).expect("utf8");
    let tampered = json.replacen(
        "\"term_bloom_union\": \"\"",
        "\"term_bloom_union\": \"YWJj\"",
        1,
    );
    assert_ne!(
        tampered, json,
        "test fixture must contain an empty bloom union"
    );
    let err = decode(tampered.as_bytes()).expect_err("malformed bloom");
    let is_invalid_bloom = matches!(err, ListParseError::InvalidBloom(3));
    assert!(is_invalid_bloom, "expected InvalidBloom(3), got {err:?}");
}
/// A content hash whose `blake3:` prefix has been replaced is rejected on decode with
/// `BadContentHash`.
#[test]
fn options_hash_without_blake3_prefix_rejected() {
    let list = rich_list(1);
    let bytes = encode(&list).expect("encode");
    let s = from_utf8(&bytes).expect("utf8");
    let tampered = s.replacen("\"blake3:", "\"nothex:", 1);
    // Guard against a silent no-op replacement, consistent with the other tamper tests.
    assert_ne!(tampered, s, "tamper must change the bytes");
    let err = decode(tampered.as_bytes()).expect_err("missing prefix");
    assert!(
        matches!(err, ListParseError::BadContentHash(_)),
        "expected BadContentHash, got {err:?}"
    );
}
/// Truncating a `blake3:<hex>` hash to two hex digits is rejected with `BadContentHash`.
/// The target is part 0's all-zero content hash from `rich_entry(0)`.
#[test]
fn content_hash_wrong_hex_length_rejected() {
    let encoded = encode(&rich_list(1)).expect("encode");
    let json = from_utf8(&encoded).expect("utf8");
    let zero_hash = format!("blake3:{}", "0".repeat(BLAKE3_HEX_LEN));
    let tampered = json.replacen(&zero_hash, "blake3:00", 1);
    assert_ne!(tampered, json, "tamper must change the bytes");
    let err = decode(tampered.as_bytes()).expect_err("short hash");
    let is_bad_hash = matches!(err, ListParseError::BadContentHash(_));
    assert!(is_bad_hash, "expected BadContentHash, got {err:?}");
}
/// A non-numeric lower bound in `id_range` fails decode with `BadFieldValue("id_range[0]", _)`.
#[test]
fn non_numeric_id_range_rejected() {
    let encoded = encode(&rich_list(1)).expect("encode");
    let mut doc: serde_json::Value = serde_json::from_slice(&encoded).expect("json");
    let lower = &mut doc["parts"][0]["id_range"][0];
    *lower = serde_json::Value::String("not-an-int".into());
    let tampered = serde_json::to_vec(&doc).expect("reencode");
    let err = decode(&tampered).expect_err("bad id_range");
    let is_lower = matches!(err, ListParseError::BadFieldValue("id_range[0]", _));
    assert!(is_lower, "expected BadFieldValue, got {err:?}");
}
/// A non-numeric upper bound in `id_range` fails decode with `BadFieldValue("id_range[1]", _)`.
#[test]
fn non_numeric_id_range_upper_bound_rejected() {
    let encoded = encode(&rich_list(1)).expect("encode");
    let mut doc: serde_json::Value = serde_json::from_slice(&encoded).expect("json");
    let upper = &mut doc["parts"][0]["id_range"][1];
    *upper = serde_json::Value::String("xyz".into());
    let tampered = serde_json::to_vec(&doc).expect("reencode");
    let err = decode(&tampered).expect_err("bad id_range upper");
    let is_upper = matches!(err, ListParseError::BadFieldValue("id_range[1]", _));
    assert!(is_upper, "expected BadFieldValue, got {err:?}");
}
/// Merging two empty FTS tables yields an empty table.
#[test]
fn fts_merge_empty_into_and_empty_other() {
    let (mut into, other) = (BTreeMap::new(), BTreeMap::new());
    FtsSummaryAgg::merge(&mut into, &other);
    assert!(into.is_empty());
}
/// Merging into an empty FTS table copies the other table's column summary verbatim.
#[test]
fn fts_merge_empty_into_adopts_other() {
    let mut builder = super::super::bloom::BloomBuilder::with_n_blocks(16);
    builder.insert(b"test");
    let summary = FtsSummaryAgg {
        term_range: Some((b"apple".to_vec(), b"zebra".to_vec())),
        n_terms_distinct: 42,
        term_bloom: Some(builder.finish()),
    };
    let other = BTreeMap::from([("col1".to_string(), summary.clone())]);
    let mut into = BTreeMap::new();
    FtsSummaryAgg::merge(&mut into, &other);
    assert_eq!(into.len(), 1);
    assert_eq!(into["col1"], summary);
}
/// A column present only on the receiving side survives a merge with an empty table
/// unchanged.
#[test]
fn fts_merge_preserves_columns_only_in_into() {
    let mut builder = super::super::bloom::BloomBuilder::with_n_blocks(16);
    builder.insert(b"test");
    let summary = FtsSummaryAgg {
        term_range: Some((b"a".to_vec(), b"z".to_vec())),
        n_terms_distinct: 10,
        term_bloom: Some(builder.finish()),
    };
    let mut into = BTreeMap::from([("only_in_into".to_string(), summary.clone())]);
    let other = BTreeMap::new();
    FtsSummaryAgg::merge(&mut into, &other);
    assert_eq!(into.len(), 1);
    assert_eq!(into["only_in_into"], summary);
}
/// When both tables carry the same column with same-shape (16-block) blooms, the merge
/// keeps one entry with three properties. Its bloom holds the terms from both sides. Its
/// distinct-term hint is the larger of the two. Its term range spans both inputs.
#[test]
fn fts_merge_tables_merges_shared_columns() {
    let mut into = BTreeMap::new();
    let mut other = BTreeMap::new();
    let mut b1 = super::super::bloom::BloomBuilder::with_n_blocks(16);
    b1.insert(b"test1");
    let bloom1 = b1.finish();
    let mut b2 = super::super::bloom::BloomBuilder::with_n_blocks(16);
    b2.insert(b"test2");
    let bloom2 = b2.finish();
    let summary1 = FtsSummaryAgg {
        term_bloom: Some(bloom1),
        n_terms_distinct: 10,
        term_range: Some((b"apple".to_vec(), b"mango".to_vec())),
    };
    let summary2 = FtsSummaryAgg {
        term_bloom: Some(bloom2),
        n_terms_distinct: 15,
        term_range: Some((b"banana".to_vec(), b"zebra".to_vec())),
    };
    into.insert("shared".to_string(), summary1);
    other.insert("shared".to_string(), summary2);
    FtsSummaryAgg::merge(&mut into, &other);
    assert_eq!(into.len(), 1);
    let merged = &into["shared"];
    assert_eq!(merged.n_terms_distinct, 15);
    let range = merged.term_range.as_ref().expect("should be present");
    assert_eq!(range.0, b"apple");
    assert_eq!(range.1, b"zebra");
    // The core merge property was previously unchecked: the bloom must be a true union.
    let bloom = merged.term_bloom.as_ref().expect("union bloom");
    assert!(bloom.contains(b"test1"), "term from into survives the merge");
    assert!(bloom.contains(b"test2"), "term from other joins the merge");
}
/// A shared column whose blooms have different block counts (16 vs 8) is removed from
/// the merged table entirely.
#[test]
fn fts_merge_drops_column_on_bloom_shape_mismatch() {
    let build_bloom = |n_blocks, term: &[u8]| {
        let mut builder = super::super::bloom::BloomBuilder::with_n_blocks(n_blocks);
        builder.insert(term);
        builder.finish()
    };
    let lhs = FtsSummaryAgg {
        term_bloom: Some(build_bloom(16, b"test1")),
        n_terms_distinct: 10,
        term_range: Some((b"a".to_vec(), b"z".to_vec())),
    };
    let rhs = FtsSummaryAgg {
        term_bloom: Some(build_bloom(8, b"test2")),
        n_terms_distinct: 15,
        term_range: Some((b"a".to_vec(), b"z".to_vec())),
    };
    let mut into = BTreeMap::from([("col".to_string(), lhs)]);
    let other = BTreeMap::from([("col".to_string(), rhs)]);
    FtsSummaryAgg::merge(&mut into, &other);
    assert!(into.is_empty());
}
/// Disjoint column sets are unioned: a column from each side is present after the merge.
#[test]
fn fts_merge_union_of_columns() {
    let mut builder = super::super::bloom::BloomBuilder::with_n_blocks(16);
    builder.insert(b"test");
    let bloom = builder.finish();
    let col1 = FtsSummaryAgg {
        term_bloom: Some(bloom.clone()),
        n_terms_distinct: 10,
        term_range: Some((b"a".to_vec(), b"z".to_vec())),
    };
    let col2 = FtsSummaryAgg {
        term_bloom: Some(bloom),
        n_terms_distinct: 20,
        term_range: Some((b"a".to_vec(), b"z".to_vec())),
    };
    let mut into = BTreeMap::from([("col1".to_string(), col1)]);
    let other = BTreeMap::from([("col2".to_string(), col2)]);
    FtsSummaryAgg::merge(&mut into, &other);
    assert_eq!(into.len(), 2);
    assert!(into.contains_key("col1"));
    assert!(into.contains_key("col2"));
}
/// A shared column that has neither a bloom nor a range on either side is dropped by
/// the merge.
#[test]
fn fts_merge_with_none_blooms() {
    let blank = || FtsSummaryAgg {
        term_bloom: None,
        n_terms_distinct: 0,
        term_range: None,
    };
    let mut into = BTreeMap::from([("col".to_string(), blank())]);
    let other = BTreeMap::from([("col".to_string(), blank())]);
    FtsSummaryAgg::merge(&mut into, &other);
    assert!(into.is_empty());
}
/// Merging a default vector summary into a populated one leaves it unchanged.
#[test]
fn vector_merge_empty_other_is_noop() {
    let mut agg = VectorSummaryAgg {
        n_vectors: 5,
        envelope_radius: 0.5,
        centroid_envelope: encode_le_f32(&[1.0, 2.0, 3.0]),
    };
    let snapshot = agg.clone();
    agg.merge_with(&VectorSummaryAgg::default());
    assert_eq!(agg, snapshot, "merging empty other should be a no-op");
}
/// Merging a populated summary into a default one adopts its centroid, count and radius.
#[test]
fn vector_merge_empty_self_adopts_other() {
    let source = VectorSummaryAgg {
        n_vectors: 3,
        envelope_radius: 0.75,
        centroid_envelope: encode_le_f32(&[1.0, 2.0, 3.0]),
    };
    let mut adopted = VectorSummaryAgg::default();
    adopted.merge_with(&source);
    assert_eq!(decode_le_f32(&adopted.centroid_envelope), vec![1.0, 2.0, 3.0]);
    assert_eq!(adopted.n_vectors, 3);
    assert_eq!(adopted.envelope_radius, 0.75);
}
/// A poisoned summary (empty centroid but non-zero count) stays poisoned after a merge.
/// Its centroid remains empty and its count does not change.
#[test]
fn vector_merge_poisoned_stays_poisoned() {
    let mut poisoned = VectorSummaryAgg {
        centroid_envelope: Vec::new(),
        n_vectors: 5,
        envelope_radius: 0.0,
    };
    let healthy = VectorSummaryAgg {
        centroid_envelope: encode_le_f32(&[1.0, 2.0]),
        n_vectors: 3,
        envelope_radius: 0.5,
    };
    poisoned.merge_with(&healthy);
    assert!(poisoned.centroid_envelope.is_empty());
    assert_eq!(poisoned.n_vectors, 5, "poisoned count should not change");
}
/// Two equal-weight centroids at the origin and at (6, 6, 6) merge to (3, 3, 3), and the
/// counts add.
#[test]
fn vector_merge_weighted_mean_centroid() {
    let mut agg = VectorSummaryAgg {
        centroid_envelope: encode_le_f32(&[0.0, 0.0, 0.0]),
        n_vectors: 3,
        envelope_radius: 0.0,
    };
    let other = VectorSummaryAgg {
        centroid_envelope: encode_le_f32(&[6.0, 6.0, 6.0]),
        n_vectors: 3,
        envelope_radius: 0.0,
    };
    agg.merge_with(&other);
    let merged_center = decode_le_f32(&agg.centroid_envelope);
    // Without this, a merge that poisoned (emptied) the centroid would pass the loop
    // below vacuously.
    assert_eq!(
        merged_center.len(),
        3,
        "merged centroid keeps the input dimension"
    );
    for &c in &merged_center {
        assert!((c - 3.0).abs() < 1e-4);
    }
    assert_eq!(agg.n_vectors, 6);
}
/// Centroids are weighted by count: 1 vector at (0, 0) and 9 at (10, 10) merge to
/// (9, 9), with a combined count of 10.
#[test]
fn vector_merge_unequal_weights() {
    let mut agg = VectorSummaryAgg {
        centroid_envelope: encode_le_f32(&[0.0, 0.0]),
        n_vectors: 1,
        envelope_radius: 0.0,
    };
    let other = VectorSummaryAgg {
        centroid_envelope: encode_le_f32(&[10.0, 10.0]),
        n_vectors: 9,
        envelope_radius: 0.0,
    };
    agg.merge_with(&other);
    let merged_center = decode_le_f32(&agg.centroid_envelope);
    // Guard against a vacuous pass: an emptied centroid would skip the loop entirely.
    assert_eq!(
        merged_center.len(),
        2,
        "merged centroid keeps the input dimension"
    );
    for &c in &merged_center {
        assert!((c - 9.0).abs() < 1e-4);
    }
    assert_eq!(agg.n_vectors, 10);
}
/// Merging centroids of different dimension (3 vs 2) poisons the summary: the centroid is
/// emptied and the radius reset to 0. The count assertion here only checks that it stays
/// positive.
#[test]
fn vector_merge_dimension_mismatch_poisons() {
    let wider = VectorSummaryAgg {
        envelope_radius: 0.5,
        n_vectors: 2,
        centroid_envelope: encode_le_f32(&[1.0, 2.0, 3.0]),
    };
    let narrower = VectorSummaryAgg {
        envelope_radius: 0.4,
        n_vectors: 3,
        centroid_envelope: encode_le_f32(&[1.0, 2.0]),
    };
    let mut agg = wider;
    agg.merge_with(&narrower);
    assert!(agg.centroid_envelope.is_empty());
    assert_eq!(agg.envelope_radius, 0.0);
    assert!(agg.n_vectors > 0, "count should not change");
}
/// Two unit balls centred 10 apart on the x-axis merge to a ball at the midpoint (5, 0).
/// Its radius must be at least 5 + 1 = 6 to enclose both.
#[test]
fn vector_merge_encloses_both_balls() {
    let mut left = VectorSummaryAgg {
        centroid_envelope: encode_le_f32(&[0.0, 0.0]),
        n_vectors: 1,
        envelope_radius: 1.0,
    };
    let right = VectorSummaryAgg {
        centroid_envelope: encode_le_f32(&[10.0, 0.0]),
        n_vectors: 1,
        envelope_radius: 1.0,
    };
    left.merge_with(&right);
    let center = decode_le_f32(&left.centroid_envelope);
    assert!((center[0] - 5.0).abs() < 1e-4);
    assert!(center[1].abs() < 1e-4);
    assert!(left.envelope_radius >= 6.0 - 1e-4);
}
/// After merging two overlapping balls, every input ball lies inside the merged one. For
/// each input, its centre's distance to the merged centre plus its radius must not exceed
/// the merged radius (within tolerance).
#[test]
fn vector_merge_radius_conservative_bound() {
    let mut merged = VectorSummaryAgg {
        centroid_envelope: encode_le_f32(&[5.0, 0.0]),
        n_vectors: 2,
        envelope_radius: 3.0,
    };
    let second = VectorSummaryAgg {
        centroid_envelope: encode_le_f32(&[5.0, 4.0]),
        n_vectors: 2,
        envelope_radius: 2.0,
    };
    merged.merge_with(&second);
    let center = decode_le_f32(&merged.centroid_envelope);
    let limit = merged.envelope_radius + 1e-4;
    let reach_first = l2_distance(&[5.0, 0.0], &center) + 3.0;
    let reach_second = l2_distance(&[5.0, 4.0], &center) + 2.0;
    assert!(reach_first <= limit, "first ball should be enclosed");
    assert!(reach_second <= limit, "second ball should be enclosed");
}
/// A successful merge sums the vector counts (7 + 5 = 12).
#[test]
fn vector_merge_updates_n_vectors_count() {
    let incoming = VectorSummaryAgg {
        centroid_envelope: encode_le_f32(&[2.0]),
        n_vectors: 5,
        envelope_radius: 0.2,
    };
    let mut agg = VectorSummaryAgg {
        centroid_envelope: encode_le_f32(&[1.0]),
        n_vectors: 7,
        envelope_radius: 0.1,
    };
    agg.merge_with(&incoming);
    assert_eq!(agg.n_vectors, 12);
}
/// Merging two empty vector-summary tables yields an empty table.
#[test]
fn vector_merge_empty_into_and_empty_other() {
    let (mut into, other) = (BTreeMap::new(), BTreeMap::new());
    VectorSummaryAgg::merge(&mut into, &other);
    assert!(into.is_empty());
}
/// Merging into an empty vector table copies the other table's column summary verbatim.
#[test]
fn vector_merge_empty_into_adopts_other() {
    let summary = VectorSummaryAgg {
        n_vectors: 4,
        envelope_radius: 0.5,
        centroid_envelope: encode_le_f32(&[1.0, 2.0, 3.0]),
    };
    let other = BTreeMap::from([("vec_col".to_string(), summary.clone())]);
    let mut into = BTreeMap::new();
    VectorSummaryAgg::merge(&mut into, &other);
    assert_eq!(into.len(), 1);
    assert_eq!(into["vec_col"], summary);
}
/// A vector column present only on the receiving side survives a merge with an empty table
/// unchanged.
#[test]
fn vector_merge_preserves_columns_only_in_into() {
    let summary = VectorSummaryAgg {
        n_vectors: 2,
        envelope_radius: 0.3,
        centroid_envelope: encode_le_f32(&[1.0, 2.0]),
    };
    let mut into = BTreeMap::from([("only_in_into".to_string(), summary.clone())]);
    let other = BTreeMap::new();
    VectorSummaryAgg::merge(&mut into, &other);
    assert_eq!(into.len(), 1);
    assert_eq!(into["only_in_into"], summary);
}
/// A column shared by both tables is merged in place: the counts add (2 + 2), and the
/// centroid lands at the midpoint of (0, 0) and (10, 0).
#[test]
fn vector_merge_merges_shared_columns() {
    let near = VectorSummaryAgg {
        centroid_envelope: encode_le_f32(&[0.0, 0.0]),
        n_vectors: 2,
        envelope_radius: 1.0,
    };
    let far = VectorSummaryAgg {
        centroid_envelope: encode_le_f32(&[10.0, 0.0]),
        n_vectors: 2,
        envelope_radius: 1.0,
    };
    let mut into = BTreeMap::from([("shared".to_string(), near)]);
    let other = BTreeMap::from([("shared".to_string(), far)]);
    VectorSummaryAgg::merge(&mut into, &other);
    assert_eq!(into.len(), 1);
    let shared = &into["shared"];
    assert_eq!(shared.n_vectors, 4);
    let center = decode_le_f32(&shared.centroid_envelope);
    assert!((center[0] - 5.0).abs() < 1e-4);
}
/// At table level, a shared column with mismatched centroid dimensions is kept but
/// poisoned: its centroid is empty and its radius is 0.
#[test]
fn vector_merge_poisons_on_dimension_mismatch() {
    let three_d = VectorSummaryAgg {
        centroid_envelope: encode_le_f32(&[1.0, 2.0, 3.0]),
        n_vectors: 2,
        envelope_radius: 0.5,
    };
    let two_d = VectorSummaryAgg {
        centroid_envelope: encode_le_f32(&[1.0, 2.0]),
        n_vectors: 2,
        envelope_radius: 0.5,
    };
    let mut into = BTreeMap::from([("col".to_string(), three_d)]);
    let other = BTreeMap::from([("col".to_string(), two_d)]);
    VectorSummaryAgg::merge(&mut into, &other);
    assert_eq!(into.len(), 1);
    let poisoned = &into["col"];
    assert!(poisoned.centroid_envelope.is_empty());
    assert_eq!(poisoned.envelope_radius, 0.0);
}
/// Disjoint vector column sets are unioned: a column from each side is present after the
/// merge.
#[test]
fn vector_merge_union_of_columns() {
    let mut into = BTreeMap::from([(
        "vec1".to_string(),
        VectorSummaryAgg {
            centroid_envelope: encode_le_f32(&[1.0, 2.0]),
            n_vectors: 2,
            envelope_radius: 0.1,
        },
    )]);
    let other = BTreeMap::from([(
        "vec2".to_string(),
        VectorSummaryAgg {
            centroid_envelope: encode_le_f32(&[3.0, 4.0]),
            n_vectors: 2,
            envelope_radius: 0.2,
        },
    )]);
    VectorSummaryAgg::merge(&mut into, &other);
    assert_eq!(into.len(), 2);
    assert!(into.contains_key("vec1"));
    assert!(into.contains_key("vec2"));
}
/// A shared column whose incoming summary is the default leaves the receiving summary
/// unchanged.
#[test]
fn vector_merge_with_default_other() {
    let summary = VectorSummaryAgg {
        n_vectors: 2,
        envelope_radius: 0.5,
        centroid_envelope: encode_le_f32(&[1.0, 2.0]),
    };
    let mut into = BTreeMap::from([("col".to_string(), summary.clone())]);
    let other = BTreeMap::from([("col".to_string(), VectorSummaryAgg::default())]);
    VectorSummaryAgg::merge(&mut into, &other);
    assert_eq!(into["col"], summary);
}
/// Mixed scenario covering three columns:
/// - `vec1` is on both sides, so it is merged and its count becomes 2.
/// - `vec2` is only on the receiving side, so it is kept as-is.
/// - `vec3` is only on the incoming side, so it is adopted.
#[test]
fn vector_merge_tables_complex_scenario() {
    let mut into = BTreeMap::from([
        (
            "vec1".to_string(),
            VectorSummaryAgg {
                centroid_envelope: encode_le_f32(&[1.0, 2.0]),
                n_vectors: 1,
                envelope_radius: 0.1,
            },
        ),
        (
            "vec2".to_string(),
            VectorSummaryAgg {
                centroid_envelope: encode_le_f32(&[3.0, 4.0]),
                n_vectors: 1,
                envelope_radius: 0.2,
            },
        ),
    ]);
    let other = BTreeMap::from([
        (
            "vec1".to_string(),
            VectorSummaryAgg {
                centroid_envelope: encode_le_f32(&[1.0, 2.0]),
                n_vectors: 1,
                envelope_radius: 0.1,
            },
        ),
        (
            "vec3".to_string(),
            VectorSummaryAgg {
                centroid_envelope: encode_le_f32(&[5.0, 6.0]),
                n_vectors: 1,
                envelope_radius: 0.3,
            },
        ),
    ]);
    VectorSummaryAgg::merge(&mut into, &other);
    assert_eq!(into.len(), 3);
    assert_eq!(into["vec1"].n_vectors, 2);
    assert_eq!(into["vec2"].n_vectors, 1);
    assert_eq!(into["vec3"].n_vectors, 1);
}
}