use std::{error::Error, fmt};
use crate::supertable::{
manifest::{list::PartitionStrategy, part::ContentHash},
options::SupertableOptions,
};
/// Computes the BLAKE3 fingerprint of a supertable configuration.
///
/// The digest is taken over a canonical byte encoding assembled in this
/// fixed order, each section introduced by a tag written with `push_tag`:
///
/// 1. `schema` — field count, then per field: name, the `Debug` rendering
///    of its data type, and one byte for nullability.
/// 2. `id_column` — the id column name.
/// 3. `fts_columns` — column count, then each column name.
/// 4. `vector_columns` — column count, then per column: name, `dim`,
///    `n_cent`, `rot_seed`, and the lower-cased `Debug` name of the metric.
/// 5. `partition_strategy` — a variant tag followed by that variant's fields.
///
/// Counts and string lengths are written as little-endian `u64` values, so
/// the order of fields and columns affects the result.
///
/// NOTE(review): the test module builds `VectorConfig` with a `rerank_codec`
/// field that is not encoded here — confirm that omission is intentional.
pub fn compute_options_hash(opts: &SupertableOptions, strategy: &PartitionStrategy) -> ContentHash {
    // Appends `value` as eight little-endian bytes.
    fn push_u64(out: &mut Vec<u8>, value: u64) {
        out.extend_from_slice(&value.to_le_bytes());
    }

    let mut encoded: Vec<u8> = Vec::with_capacity(256);

    // Section 1: schema.
    push_tag(&mut encoded, b"schema");
    let schema_fields = opts.schema.fields();
    push_u64(&mut encoded, schema_fields.len() as u64);
    for field in schema_fields {
        push_str(&mut encoded, field.name());
        push_str(&mut encoded, &format!("{:?}", field.data_type()));
        encoded.push(u8::from(field.is_nullable()));
    }

    // Section 2: id column.
    push_tag(&mut encoded, b"id_column");
    push_str(&mut encoded, &opts.id_column);

    // Section 3: full-text-search columns, in declaration order.
    push_tag(&mut encoded, b"fts_columns");
    push_u64(&mut encoded, opts.fts_columns.len() as u64);
    for fts in &opts.fts_columns {
        push_str(&mut encoded, &fts.column);
    }

    // Section 4: vector columns, in declaration order.
    push_tag(&mut encoded, b"vector_columns");
    push_u64(&mut encoded, opts.vector_columns.len() as u64);
    for vector in &opts.vector_columns {
        push_str(&mut encoded, &vector.column);
        push_u64(&mut encoded, vector.dim as u64);
        push_u64(&mut encoded, vector.n_cent as u64);
        encoded.extend_from_slice(&vector.rot_seed.to_le_bytes());
        let metric_name = format!("{:?}", vector.metric).to_lowercase();
        push_str(&mut encoded, &metric_name);
    }

    // Section 5: partition strategy. The inner tag keeps variants with
    // otherwise identical payloads from encoding to the same bytes.
    push_tag(&mut encoded, b"partition_strategy");
    match strategy {
        PartitionStrategy::TimeRange {
            column,
            granularity_secs,
        } => {
            push_tag(&mut encoded, b"time_range");
            push_str(&mut encoded, column);
            encoded.extend_from_slice(&granularity_secs.to_le_bytes());
        }
        PartitionStrategy::Hash { column, n_buckets } => {
            push_tag(&mut encoded, b"hash");
            push_str(&mut encoded, column);
            encoded.extend_from_slice(&n_buckets.to_le_bytes());
        }
        PartitionStrategy::ColumnRange { column, boundaries } => {
            push_tag(&mut encoded, b"column_range");
            push_str(&mut encoded, column);
            push_u64(&mut encoded, boundaries.len() as u64);
            for boundary in boundaries {
                // Each boundary carries its own length prefix.
                push_u64(&mut encoded, boundary.len() as u64);
                encoded.extend_from_slice(boundary);
            }
        }
        PartitionStrategy::IngestionTime { granularity_secs } => {
            push_tag(&mut encoded, b"ingestion_time");
            encoded.extend_from_slice(&granularity_secs.to_le_bytes());
        }
    }

    ContentHash(*blake3::hash(&encoded).as_bytes())
}
/// Checks that `actual` agrees with `expected`.
///
/// An all-zero `actual` is treated as a sentinel and accepted without
/// comparison (presumably meaning no hash was recorded — confirm with the
/// writer of that value). The bypass is one-sided: an all-zero `expected`
/// is compared like any other value.
///
/// # Errors
///
/// Returns [`OptionsHashMismatch`] carrying the hex form of both hashes when
/// `actual` is non-zero and differs from `expected`.
pub fn verify_options_hash(
    expected: ContentHash,
    actual: ContentHash,
) -> Result<(), OptionsHashMismatch> {
    let actual_is_unset = actual.0 == [0u8; 32];
    if actual_is_unset || expected.0 == actual.0 {
        Ok(())
    } else {
        Err(OptionsHashMismatch {
            expected: expected.to_hex(),
            actual: actual.to_hex(),
        })
    }
}
/// Error produced by `verify_options_hash` when the two hashes disagree.
///
/// Both fields are filled from `ContentHash::to_hex` by
/// `verify_options_hash`.
#[derive(Debug, Clone)]
pub struct OptionsHashMismatch {
    /// The `expected` hash; rendered after `caller=` by `Display`.
    pub expected: String,
    /// The `actual` hash; rendered after `list=` by `Display`.
    pub actual: String,
}

impl fmt::Display for OptionsHashMismatch {
    /// Writes a single-line message containing both hashes, each prefixed
    /// with `blake3:`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { expected, actual } = self;
        write!(
            f,
            "options_hash mismatch: caller=blake3:{} list=blake3:{}",
            expected, actual
        )
    }
}

impl Error for OptionsHashMismatch {}
/// Appends `tag` to `buf`, preceded by its length as a single byte.
///
/// # Panics
///
/// Panics if `tag` is longer than 255 bytes. The length prefix is one byte
/// wide, so a longer tag would otherwise be written with a truncated length
/// and make the encoding ambiguous.
#[inline]
fn push_tag(buf: &mut Vec<u8>, tag: &[u8]) {
    assert!(
        tag.len() <= usize::from(u8::MAX),
        "push_tag: tag must be at most 255 bytes, got {}",
        tag.len()
    );
    // Lossless: the assertion above bounds the length to the u8 range.
    buf.push(tag.len() as u8);
    buf.extend_from_slice(tag);
}
/// Appends `s` to `buf` as its UTF-8 bytes, preceded by the byte length
/// encoded as a little-endian `u64`.
#[inline]
fn push_str(buf: &mut Vec<u8>, s: &str) {
    let payload = s.as_bytes();
    let len_prefix = (payload.len() as u64).to_le_bytes();
    buf.extend_from_slice(&len_prefix);
    buf.extend_from_slice(payload);
}
#[cfg(test)]
mod tests {
    //! Unit tests for the options-hash encoding and its verification helper.

    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema};

    use super::*;
    use crate::{
        superfile::{
            builder::{FtsConfig, VectorConfig},
            vector::{distance::Metric, rerank_codec::RerankCodec},
        },
        supertable::{
            manifest::{list::PartitionStrategy, part::ContentHash},
            options::SupertableOptions,
        },
        test_helpers::default_tokenizer,
    };

    /// Schema with a single non-nullable `title` column.
    fn schema_title_only() -> Arc<Schema> {
        Arc::new(Schema::new(vec![Field::new(
            "title",
            DataType::LargeUtf8,
            false,
        )]))
    }

    /// Schema with `title` plus a fixed-size `emb` list of `dim` floats.
    fn schema_title_emb(dim: usize) -> Arc<Schema> {
        let list_field = Field::new("item", DataType::Float32, false);
        let list_type = DataType::FixedSizeList(Arc::new(list_field), dim as i32);
        Arc::new(Schema::new(vec![
            Field::new("title", DataType::LargeUtf8, false),
            Field::new("emb", list_type, false),
        ]))
    }

    /// Options with one FTS column (`title`) and no vector columns.
    fn fts_opts() -> SupertableOptions {
        SupertableOptions::new(
            schema_title_only(),
            vec![FtsConfig {
                column: "title".into(),
            }],
            vec![],
            Some(default_tokenizer()),
        )
        .expect("opts")
    }

    /// Daily time-range partitioning on `_id`, used as the default strategy.
    fn time_range() -> PartitionStrategy {
        PartitionStrategy::TimeRange {
            column: "_id".into(),
            granularity_secs: 86_400,
        }
    }

    #[test]
    fn compute_options_hash_deterministic() {
        let h1 = compute_options_hash(&fts_opts(), &time_range());
        let h2 = compute_options_hash(&fts_opts(), &time_range());
        assert_eq!(h1.0, h2.0);
    }

    #[test]
    fn compute_options_hash_changes_with_schema() {
        let opts_a = fts_opts();
        let opts_b = SupertableOptions::new(
            Arc::new(Schema::new(vec![Field::new(
                "body",
                DataType::LargeUtf8,
                false,
            )])),
            vec![FtsConfig {
                column: "body".into(),
            }],
            vec![],
            Some(default_tokenizer()),
        )
        .expect("opts");
        let h_a = compute_options_hash(&opts_a, &time_range());
        let h_b = compute_options_hash(&opts_b, &time_range());
        assert_ne!(h_a.0, h_b.0);
    }

    #[test]
    fn compute_options_hash_changes_with_nullability() {
        let opts_a = fts_opts();
        // Identical to `fts_opts()` except that `title` is nullable.
        let opts_b = SupertableOptions::new(
            Arc::new(Schema::new(vec![Field::new(
                "title",
                DataType::LargeUtf8,
                true,
            )])),
            vec![FtsConfig {
                column: "title".into(),
            }],
            vec![],
            Some(default_tokenizer()),
        )
        .expect("opts");
        let h_a = compute_options_hash(&opts_a, &time_range());
        let h_b = compute_options_hash(&opts_b, &time_range());
        assert_ne!(h_a.0, h_b.0);
    }

    #[test]
    fn compute_options_hash_changes_with_fts_column_set() {
        let opts_a = fts_opts();
        let schema_two = Arc::new(Schema::new(vec![
            Field::new("title", DataType::LargeUtf8, false),
            Field::new("subtitle", DataType::LargeUtf8, false),
        ]));
        let opts_b = SupertableOptions::new(
            schema_two,
            vec![
                FtsConfig {
                    column: "title".into(),
                },
                FtsConfig {
                    column: "subtitle".into(),
                },
            ],
            vec![],
            Some(default_tokenizer()),
        )
        .expect("opts");
        let h_a = compute_options_hash(&opts_a, &time_range());
        let h_b = compute_options_hash(&opts_b, &time_range());
        assert_ne!(h_a.0, h_b.0);
    }

    #[test]
    fn compute_options_hash_changes_with_fts_column_order() {
        let schema_two = Arc::new(Schema::new(vec![
            Field::new("title", DataType::LargeUtf8, false),
            Field::new("subtitle", DataType::LargeUtf8, false),
        ]));
        let opts_a = SupertableOptions::new(
            schema_two.clone(),
            vec![
                FtsConfig {
                    column: "title".into(),
                },
                FtsConfig {
                    column: "subtitle".into(),
                },
            ],
            vec![],
            Some(default_tokenizer()),
        )
        .expect("opts");
        let opts_b = SupertableOptions::new(
            schema_two,
            vec![
                FtsConfig {
                    column: "subtitle".into(),
                },
                FtsConfig {
                    column: "title".into(),
                },
            ],
            vec![],
            Some(default_tokenizer()),
        )
        .expect("opts");
        let h_a = compute_options_hash(&opts_a, &time_range());
        let h_b = compute_options_hash(&opts_b, &time_range());
        assert_ne!(h_a.0, h_b.0);
    }

    #[test]
    fn compute_options_hash_changes_with_vector_columns() {
        let opts_a = fts_opts();
        let opts_b = SupertableOptions::new(
            schema_title_emb(16),
            vec![FtsConfig {
                column: "title".into(),
            }],
            vec![VectorConfig {
                column: "emb".into(),
                dim: 16,
                n_cent: 4,
                rot_seed: 0,
                metric: Metric::Cosine,
                rerank_codec: RerankCodec::default(),
            }],
            Some(default_tokenizer()),
        )
        .expect("opts");
        let h_a = compute_options_hash(&opts_a, &time_range());
        let h_b = compute_options_hash(&opts_b, &time_range());
        assert_ne!(h_a.0, h_b.0);
    }

    #[test]
    fn compute_options_hash_changes_with_vector_metric() {
        let mk = |metric: Metric| {
            SupertableOptions::new(
                schema_title_emb(16),
                vec![],
                vec![VectorConfig {
                    column: "emb".into(),
                    dim: 16,
                    n_cent: 4,
                    rot_seed: 0,
                    metric,
                    rerank_codec: RerankCodec::default(),
                }],
                Some(default_tokenizer()),
            )
            .expect("opts")
        };
        let h_a = compute_options_hash(&mk(Metric::Cosine), &time_range());
        let h_b = compute_options_hash(&mk(Metric::NegDot), &time_range());
        assert_ne!(h_a.0, h_b.0);
    }

    #[test]
    fn compute_options_hash_distinguishes_partition_strategy_variants() {
        let opts = fts_opts();
        let h_time = compute_options_hash(
            &opts,
            &PartitionStrategy::TimeRange {
                column: "_id".into(),
                granularity_secs: 86_400,
            },
        );
        let h_hash = compute_options_hash(
            &opts,
            &PartitionStrategy::Hash {
                column: "_id".into(),
                n_buckets: 16,
            },
        );
        let h_range = compute_options_hash(
            &opts,
            &PartitionStrategy::ColumnRange {
                column: "_id".into(),
                boundaries: vec![vec![1, 2, 3], vec![4, 5, 6]],
            },
        );
        assert_ne!(h_time.0, h_hash.0);
        assert_ne!(h_hash.0, h_range.0);
        assert_ne!(h_time.0, h_range.0);
    }

    #[test]
    fn compute_options_hash_partition_field_changes_propagate() {
        let opts = fts_opts();
        let h_t1 = compute_options_hash(
            &opts,
            &PartitionStrategy::TimeRange {
                column: "_id".into(),
                granularity_secs: 86_400,
            },
        );
        let h_t2 = compute_options_hash(
            &opts,
            &PartitionStrategy::TimeRange {
                column: "_id".into(),
                granularity_secs: 3600,
            },
        );
        assert_ne!(h_t1.0, h_t2.0);
        let h_h1 = compute_options_hash(
            &opts,
            &PartitionStrategy::Hash {
                column: "_id".into(),
                n_buckets: 16,
            },
        );
        let h_h2 = compute_options_hash(
            &opts,
            &PartitionStrategy::Hash {
                column: "_id".into(),
                n_buckets: 32,
            },
        );
        assert_ne!(h_h1.0, h_h2.0);
        let h_r1 = compute_options_hash(
            &opts,
            &PartitionStrategy::ColumnRange {
                column: "_id".into(),
                boundaries: vec![vec![1, 2]],
            },
        );
        let h_r2 = compute_options_hash(
            &opts,
            &PartitionStrategy::ColumnRange {
                column: "_id".into(),
                boundaries: vec![vec![1, 2], vec![3, 4]],
            },
        );
        assert_ne!(h_r1.0, h_r2.0);
    }

    /// `IngestionTime` must feed both its variant tag and its granularity
    /// into the digest.
    #[test]
    fn compute_options_hash_covers_ingestion_time() {
        let opts = fts_opts();
        let h_daily = compute_options_hash(
            &opts,
            &PartitionStrategy::IngestionTime {
                granularity_secs: 86_400,
            },
        );
        let h_hourly = compute_options_hash(
            &opts,
            &PartitionStrategy::IngestionTime {
                granularity_secs: 3600,
            },
        );
        assert_ne!(h_daily.0, h_hourly.0);
        // Same granularity as `time_range()`, different variant.
        let h_time = compute_options_hash(&opts, &time_range());
        assert_ne!(h_daily.0, h_time.0);
    }

    /// The same concatenated boundary bytes split at a different point must
    /// hash differently, which pins the per-boundary length prefix.
    #[test]
    fn compute_options_hash_column_range_boundary_framing() {
        let opts = fts_opts();
        let mk = |boundaries: Vec<Vec<u8>>| {
            compute_options_hash(
                &opts,
                &PartitionStrategy::ColumnRange {
                    column: "_id".into(),
                    boundaries,
                },
            )
        };
        let h_a = mk(vec![vec![1, 2], vec![3]]);
        let h_b = mk(vec![vec![1], vec![2, 3]]);
        assert_ne!(h_a.0, h_b.0);
    }

    #[test]
    fn verify_options_hash_accepts_matching_pair() {
        let opts = fts_opts();
        let h = compute_options_hash(&opts, &time_range());
        verify_options_hash(h, h).expect("matching pair accepted");
    }

    #[test]
    fn verify_options_hash_skips_zero_sentinel() {
        let opts = fts_opts();
        let computed = compute_options_hash(&opts, &time_range());
        let zero = ContentHash([0u8; 32]);
        verify_options_hash(computed, zero).expect("zero sentinel bypasses verification");
    }

    /// Only an all-zero `actual` bypasses verification; an all-zero
    /// `expected` against a real hash is still a mismatch.
    #[test]
    fn verify_options_hash_zero_expected_is_not_a_bypass() {
        let opts = fts_opts();
        let computed = compute_options_hash(&opts, &time_range());
        let zero = ContentHash([0u8; 32]);
        verify_options_hash(zero, computed).expect_err("zero expected must not bypass");
    }

    #[test]
    fn verify_options_hash_rejects_mismatch_with_hex_payload() {
        let h_a = ContentHash([1u8; 32]);
        let h_b = ContentHash([2u8; 32]);
        let err = verify_options_hash(h_a, h_b).expect_err("mismatch must error");
        let rendered = format!("{err}");
        assert!(
            rendered.contains("options_hash mismatch"),
            "got: {rendered}"
        );
        assert!(rendered.contains("blake3:"), "got: {rendered}");
        assert!(rendered.contains(&"01".repeat(32)), "got: {rendered}");
        assert!(rendered.contains(&"02".repeat(32)), "got: {rendered}");
    }

    #[test]
    fn options_hash_mismatch_is_error_impl() {
        let h_a = ContentHash([3u8; 32]);
        let h_b = ContentHash([4u8; 32]);
        let err = verify_options_hash(h_a, h_b).expect_err("mismatch");
        let dyn_err: Box<dyn Error> = Box::new(err);
        assert!(dyn_err.to_string().contains("options_hash mismatch"));
    }

    #[test]
    fn push_helpers_emit_length_prefixed_bytes() {
        let mut buf = Vec::new();
        push_tag(&mut buf, b"schema");
        assert_eq!(buf, vec![6u8, b's', b'c', b'h', b'e', b'm', b'a']);
        let mut buf = Vec::new();
        push_str(&mut buf, "ok");
        assert_eq!(buf, vec![2u8, 0, 0, 0, 0, 0, 0, 0, b'o', b'k']);
    }
}