use std::collections::HashMap;
use std::path::{Path, PathBuf};
use crate::core::reactor::Reactor;
use crate::features::storage::vector_contract::{
VectorEncoding, VectorMetric, VectorSpaceDescriptor,
};
/// Integer type of the manifest schema version stored in the file.
pub type ManifestVersion = u32;
/// Identifier returned by `Manifest::allocate_run_id`; counted separately per level.
pub type ManifestRunId = u64;
/// Schema version that `Manifest::persist` writes. `Manifest::load_with_reactor`
/// rejects files that declare a larger value.
pub const CURRENT_SCHEMA_VERSION: ManifestVersion = 5;
/// Name/field-path pair describing one registered bitmap index, as returned by
/// `Manifest::bitmap_indexes`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BitmapIndexDescriptor {
/// Name the index was registered under.
pub index_name: String,
/// Field path the index was registered for.
pub field_path: String,
}
/// Failure raised while loading, validating, or persisting a manifest.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so the error can
/// be printed with `{}`, boxed as `dyn Error`, and chained to its I/O cause.
#[derive(Debug)]
pub enum ManifestError {
    /// The underlying file operation failed.
    Io(std::io::Error),
    /// Manifest text, or a value offered for registration, was rejected; the
    /// payload is a human-readable reason.
    Parse(String),
}

impl std::fmt::Display for ManifestError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io(err) => write!(f, "manifest io error: {}", err),
            Self::Parse(reason) => write!(f, "manifest parse error: {}", reason),
        }
    }
}

impl std::error::Error for ManifestError {
    /// Exposes the wrapped I/O error as the cause; parse failures have none.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Io(err) => Some(err),
            Self::Parse(_) => None,
        }
    }
}
/// Shorthand for results whose error type is `ManifestError`.
pub type Result<T> = std::result::Result<T, ManifestError>;
impl From<std::io::Error> for ManifestError {
fn from(err: std::io::Error) -> Self {
Self::Io(err)
}
}
/// In-memory view of the manifest file together with the handle used to read
/// and write it.
#[derive(Clone)]
pub struct Manifest {
/// Schema version read from the file, or `CURRENT_SCHEMA_VERSION` when the
/// manifest was created rather than loaded.
pub schema_version: ManifestVersion,
/// Value stored under the `high_water_mark=` key.
pub high_water_mark: u64,
/// Level -> most recently allocated run id for that level.
run_id_counters: HashMap<u32, ManifestRunId>,
/// Bitmap index name -> field path.
bitmap_indexes: HashMap<String, String>,
/// Space id -> registered descriptor.
vector_spaces: HashMap<u32, VectorSpaceDescriptor>,
/// Space ids marked through `retire_space`.
retired_spaces: std::collections::HashSet<u32>,
/// Persisted boolean flag; `true` when the file carries no such line.
/// NOTE(review): its exact meaning is defined by callers outside this file.
legacy_vector_raw_f32_compat: bool,
/// File the manifest is loaded from and persisted to.
path: PathBuf,
/// I/O backend used for every file access made by this type.
reactor: std::sync::Arc<dyn Reactor + Send + Sync>,
}
impl Manifest {
pub fn load_or_create(path: PathBuf) -> Result<Self> {
Self::load_or_create_with_reactor(
path,
std::sync::Arc::new(crate::core::reactor::SystemReactor),
)
}
/// Loads the manifest at `path` through `reactor`, or returns a fresh
/// in-memory manifest when the file is considered absent.
///
/// The file counts as present when `reactor.metadata_len(path)` succeeds.
/// Any failure of that call, not only "not found", selects the create path.
/// A freshly created manifest is not written to disk until
/// [`Manifest::persist`] is called.
///
/// # Errors
/// Whatever [`Manifest::load_with_reactor`] returns when the file is present.
pub fn load_or_create_with_reactor(
    path: PathBuf,
    reactor: std::sync::Arc<dyn Reactor + Send + Sync>,
) -> Result<Self> {
    let file_present = reactor.metadata_len(&path).is_ok();
    if file_present {
        return Self::load_with_reactor(&path, reactor);
    }

    // Defaults for a brand-new manifest.
    let fresh = Self {
        schema_version: CURRENT_SCHEMA_VERSION,
        high_water_mark: 0,
        run_id_counters: HashMap::new(),
        bitmap_indexes: HashMap::new(),
        vector_spaces: HashMap::new(),
        retired_spaces: std::collections::HashSet::new(),
        legacy_vector_raw_f32_compat: true,
        path,
        reactor,
    };
    Ok(fresh)
}
pub fn load(path: &Path) -> Result<Self> {
Self::load_with_reactor(
path,
std::sync::Arc::new(crate::core::reactor::SystemReactor),
)
}
pub fn load_with_reactor(
path: &Path,
reactor: std::sync::Arc<dyn Reactor + Send + Sync>,
) -> Result<Self> {
let contents = String::from_utf8(reactor.read_file(path)?)
.map_err(|_| ManifestError::Parse("invalid utf8".to_string()))?;
let mut schema_version = None;
let mut high_water_mark = None;
let mut run_id_counters = HashMap::new();
let mut bitmap_indexes = HashMap::new();
let mut vector_spaces = HashMap::new();
let mut retired_spaces = std::collections::HashSet::new();
let mut legacy_vector_raw_f32_compat = true;
for line in contents.lines() {
let line = line.trim();
if line.is_empty() {
continue;
}
if let Some(value) = line.strip_prefix("schema_version=") {
schema_version = Some(
value
.parse::<u32>()
.map_err(|_| ManifestError::Parse("invalid schema_version".to_string()))?,
);
continue;
}
if let Some(value) = line.strip_prefix("high_water_mark=") {
high_water_mark =
Some(value.parse::<u64>().map_err(|_| {
ManifestError::Parse("invalid high_water_mark".to_string())
})?);
continue;
}
if let Some(value) = line.strip_prefix("run_id.") {
let parts: Vec<&str> = value.split('=').collect();
if parts.len() != 2 {
return Err(ManifestError::Parse("invalid run_id format".to_string()));
}
let level = parts[0]
.parse::<u32>()
.map_err(|_| ManifestError::Parse("invalid run_id level".to_string()))?;
let counter = parts[1]
.parse::<u64>()
.map_err(|_| ManifestError::Parse("invalid run_id counter".to_string()))?;
run_id_counters.insert(level, counter);
continue;
}
if let Some(value) = line.strip_prefix("bitmap_index.") {
let parts: Vec<&str> = value.split('=').collect();
if parts.len() != 2 {
return Err(ManifestError::Parse(
"invalid bitmap_index format".to_string(),
));
}
let index_name = parts[0].trim();
let field_path = parts[1].trim();
if index_name.is_empty() || field_path.is_empty() {
return Err(ManifestError::Parse(
"bitmap_index requires non-empty name and field".to_string(),
));
}
bitmap_indexes.insert(index_name.to_string(), field_path.to_string());
continue;
}
if let Some(value) = line.strip_prefix("vector_space.") {
let parts: Vec<&str> = value.splitn(2, '=').collect();
if parts.len() != 2 {
return Err(ManifestError::Parse(
"invalid vector_space format".to_string(),
));
}
let space_id = parts[0]
.trim()
.parse::<u32>()
.map_err(|_| ManifestError::Parse("invalid vector_space id".to_string()))?;
let fields = parts[1].split(',').map(str::trim).collect::<Vec<_>>();
if fields.len() != 4 && fields.len() != 6 {
return Err(ManifestError::Parse(
"vector_space requires 4 fields (v4) or 6 fields (v5)".to_string(),
));
}
let metric = parse_metric(fields[0])?;
let dimension = fields[1].parse::<u16>().map_err(|_| {
ManifestError::Parse("invalid vector_space dimension".to_string())
})?;
let encoding = parse_encoding(fields[2])?;
let normalized = parse_bool_flag(fields[3])?;
let model_id = if fields.len() == 6 && !fields[4].is_empty() {
Some(fields[4].to_string())
} else {
None
};
let model_version = if fields.len() == 6 && !fields[5].is_empty() {
Some(fields[5].to_string())
} else {
None
};
vector_spaces.insert(
space_id,
VectorSpaceDescriptor {
space_id,
dimension,
metric,
encoding,
normalized,
model_id,
model_version,
},
);
continue;
}
if let Some(value) = line.strip_prefix("retired_space.") {
let space_id = value
.trim()
.parse::<u32>()
.map_err(|_| ManifestError::Parse("invalid retired_space id".to_string()))?;
retired_spaces.insert(space_id);
continue;
}
if let Some(value) = line.strip_prefix("legacy_vector_raw_f32_compat=") {
legacy_vector_raw_f32_compat = parse_bool_flag(value.trim())?;
}
}
let schema_version = schema_version
.ok_or_else(|| ManifestError::Parse("missing schema_version".to_string()))?;
if schema_version > CURRENT_SCHEMA_VERSION {
return Err(ManifestError::Parse(format!(
"unsupported schema_version {} (current {})",
schema_version, CURRENT_SCHEMA_VERSION
)));
}
Ok(Self {
schema_version,
high_water_mark: high_water_mark
.ok_or_else(|| ManifestError::Parse("missing high_water_mark".to_string()))?,
run_id_counters,
bitmap_indexes,
vector_spaces,
retired_spaces,
legacy_vector_raw_f32_compat,
path: path.to_path_buf(),
reactor,
})
}
pub fn persist(&self) -> Result<()> {
let mut lines = Vec::new();
lines.push(format!("schema_version={}", CURRENT_SCHEMA_VERSION));
lines.push(format!("high_water_mark={}", self.high_water_mark));
let mut levels: Vec<u32> = self.run_id_counters.keys().copied().collect();
levels.sort();
for level in levels {
if let Some(counter) = self.run_id_counters.get(&level) {
lines.push(format!("run_id.{}={}", level, counter));
}
}
let mut index_names: Vec<&str> =
self.bitmap_indexes.keys().map(|key| key.as_str()).collect();
index_names.sort();
for index_name in index_names {
if let Some(field_path) = self.bitmap_indexes.get(index_name) {
lines.push(format!("bitmap_index.{}={}", index_name, field_path));
}
}
let mut vector_space_ids = self.vector_spaces.keys().copied().collect::<Vec<u32>>();
vector_space_ids.sort_unstable();
for space_id in vector_space_ids {
if let Some(descriptor) = self.vector_spaces.get(&space_id) {
lines.push(format!(
"vector_space.{}={},{},{},{},{},{}",
space_id,
descriptor.metric.as_str(),
descriptor.dimension,
descriptor.encoding.as_str(),
if descriptor.normalized {
"true"
} else {
"false"
},
descriptor.model_id.as_deref().unwrap_or(""),
descriptor.model_version.as_deref().unwrap_or(""),
));
}
}
let mut retired_ids: Vec<u32> = self.retired_spaces.iter().copied().collect();
retired_ids.sort_unstable();
for space_id in retired_ids {
lines.push(format!("retired_space.{}", space_id));
}
lines.push(format!(
"legacy_vector_raw_f32_compat={}",
if self.legacy_vector_raw_f32_compat {
"true"
} else {
"false"
}
));
self.reactor
.write_file(&self.path, lines.join("\n").as_bytes())?;
Ok(())
}
/// Returns the file path this manifest is loaded from and persisted to.
pub fn path(&self) -> &Path {
    self.path.as_path()
}
/// Hands out the next run id for `level`.
///
/// Each level has its own counter. A level's first id is 1 and every call
/// returns the previous value plus one. The change is only in memory until
/// `persist` is called.
pub fn allocate_run_id(&mut self, level: u32) -> ManifestRunId {
    let slot = self.run_id_counters.entry(level).or_default();
    *slot += 1;
    *slot
}
/// Overwrites the in-memory high-water mark with `value`. No ordering check is
/// made against the previous value, and nothing is written to disk here.
pub fn set_high_water_mark(&mut self, value: u64) {
self.high_water_mark = value;
}
/// Records that bitmap index `index_name` covers `field_path`.
///
/// Both arguments are trimmed first. Registering the same name with the same
/// field path again is a no-op.
///
/// # Errors
/// Returns `ManifestError::Parse` when
/// * either value is empty after trimming,
/// * either value contains `=` or a line break (the manifest stores the pair
///   as one `bitmap_index.<name>=<path>` line and the loader accepts exactly
///   one `=`, so such a value would make the persisted file unloadable), or
/// * the name is already registered for a different field path.
pub fn register_bitmap_index(&mut self, index_name: &str, field_path: &str) -> Result<()> {
    let index_name = index_name.trim();
    let field_path = field_path.trim();
    if index_name.is_empty() || field_path.is_empty() {
        return Err(ManifestError::Parse(
            "bitmap index requires non-empty name and field path".to_string(),
        ));
    }

    // Reject anything that cannot survive a persist/load round trip.
    let breaks_line_format =
        |text: &str| text.contains(|c: char| matches!(c, '=' | '\n' | '\r'));
    if breaks_line_format(index_name) || breaks_line_format(field_path) {
        return Err(ManifestError::Parse(
            "bitmap index name and field path must not contain '=' or line breaks".to_string(),
        ));
    }

    match self.bitmap_indexes.get(index_name) {
        // Idempotent re-registration.
        Some(existing) if existing == field_path => Ok(()),
        Some(existing) => Err(ManifestError::Parse(format!(
            "bitmap index '{}' already registered for field '{}'",
            index_name, existing
        ))),
        None => {
            self.bitmap_indexes
                .insert(index_name.to_string(), field_path.to_string());
            Ok(())
        }
    }
}
/// Reports whether an index is registered under exactly `index_name`.
/// The argument is not trimmed here, whereas `register_bitmap_index` trims
/// names before storing them.
pub fn has_bitmap_index(&self, index_name: &str) -> bool {
self.bitmap_indexes.contains_key(index_name)
}
/// Lists the registered bitmap indexes, sorted by index name.
///
/// Entries whose name starts with `__` are left out of the listing.
pub fn bitmap_indexes(&self) -> Vec<BitmapIndexDescriptor> {
    let mut listing: Vec<BitmapIndexDescriptor> = Vec::new();
    for (name, field) in &self.bitmap_indexes {
        if name.starts_with("__") {
            continue;
        }
        listing.push(BitmapIndexDescriptor {
            index_name: name.clone(),
            field_path: field.clone(),
        });
    }
    listing.sort_by(|lhs, rhs| lhs.index_name.cmp(&rhs.index_name));
    listing
}
/// Registers `descriptor` under its `space_id`, or confirms that an existing
/// registration is compatible with it.
///
/// When the id is already registered, the stored descriptor is kept
/// unchanged. The call succeeds if `structural_eq` holds and the incoming
/// `model_id` is either `None` or equal to the stored one. `model_version`
/// is not compared.
///
/// # Errors
/// Returns `ManifestError::Parse` when
/// * `dimension` is 0,
/// * `model_id` or `model_version` contains `,` or a line break (the manifest
///   stores a space as one comma-separated line, so such a value would make
///   the persisted file unloadable),
/// * the id is registered with a structurally different descriptor, or
/// * a `model_id` is supplied that differs from the stored one.
pub fn register_vector_space(&mut self, descriptor: VectorSpaceDescriptor) -> Result<()> {
    if descriptor.dimension == 0 {
        return Err(ManifestError::Parse(
            "vector space dimension must be > 0".to_string(),
        ));
    }

    // Reject anything that cannot survive a persist/load round trip.
    let breaks_line_format = |label: &Option<String>| {
        label.as_deref().map_or(false, |text| {
            text.contains(|c: char| matches!(c, ',' | '\n' | '\r'))
        })
    };
    if breaks_line_format(&descriptor.model_id) || breaks_line_format(&descriptor.model_version)
    {
        return Err(ManifestError::Parse(format!(
            "vector space {} model_id/model_version must not contain ',' or line breaks",
            descriptor.space_id
        )));
    }

    if let Some(existing) = self.vector_spaces.get(&descriptor.space_id) {
        if !existing.structural_eq(&descriptor) {
            return Err(ManifestError::Parse(format!(
                "vector space {} already registered with incompatible descriptor",
                descriptor.space_id
            )));
        }
        // An absent model_id on the incoming descriptor matches any stored one.
        if descriptor.model_id.is_some() && descriptor.model_id != existing.model_id {
            return Err(ManifestError::Parse(format!(
                "vector space {} model_id mismatch: registered {:?}, got {:?}",
                descriptor.space_id, existing.model_id, descriptor.model_id
            )));
        }
        return Ok(());
    }

    self.vector_spaces.insert(descriptor.space_id, descriptor);
    Ok(())
}
/// Looks up the descriptor registered under `space_id`, if any.
pub fn vector_space(&self, space_id: u32) -> Option<&VectorSpaceDescriptor> {
self.vector_spaces.get(&space_id)
}
/// Returns clones of all registered vector space descriptors, sorted by
/// ascending `space_id`.
pub fn vector_spaces(&self) -> Vec<VectorSpaceDescriptor> {
    let mut listing: Vec<VectorSpaceDescriptor> = self.vector_spaces.values().cloned().collect();
    listing.sort_by(|lhs, rhs| lhs.space_id.cmp(&rhs.space_id));
    listing
}
/// Returns the single registered vector space that uses `metric`.
///
/// Yields `None` both when no space uses the metric and when more than one
/// does.
pub fn unique_vector_space_for_metric(
    &self,
    metric: VectorMetric,
) -> Option<VectorSpaceDescriptor> {
    let mut candidates = self
        .vector_spaces
        .values()
        .filter(|candidate| candidate.metric == metric);
    match (candidates.next(), candidates.next()) {
        (Some(only), None) => Some(only.clone()),
        _ => None,
    }
}
/// Returns the current value of the `legacy_vector_raw_f32_compat` flag.
pub fn legacy_vector_raw_f32_compat(&self) -> bool {
self.legacy_vector_raw_f32_compat
}
/// Sets the in-memory `legacy_vector_raw_f32_compat` flag; nothing is written
/// to disk here.
pub fn set_legacy_vector_raw_f32_compat(&mut self, enabled: bool) {
self.legacy_vector_raw_f32_compat = enabled;
}
/// Adds `space_id` to the retired set. The id is not checked against the
/// registered vector spaces, and retiring it twice has no further effect.
pub fn retire_space(&mut self, space_id: u32) {
self.retired_spaces.insert(space_id);
}
/// Reports whether `space_id` is in the retired set.
pub fn is_space_retired(&self, space_id: u32) -> bool {
self.retired_spaces.contains(&space_id)
}
}
/// Converts the textual metric name found in a `vector_space.` manifest line
/// into a `VectorMetric`. Matching is exact and case-sensitive; any other
/// input yields `ManifestError::Parse`.
fn parse_metric(value: &str) -> Result<VectorMetric> {
    let metric = match value {
        "cosine" => VectorMetric::Cosine,
        "euclidean" => VectorMetric::Euclidean,
        unknown => {
            return Err(ManifestError::Parse(format!(
                "unsupported vector metric '{}'",
                unknown
            )));
        }
    };
    Ok(metric)
}
/// Converts the textual encoding name found in a `vector_space.` manifest line
/// into a `VectorEncoding`. Matching is exact and case-sensitive; any other
/// input yields `ManifestError::Parse`.
fn parse_encoding(value: &str) -> Result<VectorEncoding> {
    let encoding = match value {
        "f32" => VectorEncoding::F32,
        "quantized_i8" => VectorEncoding::QuantizedI8,
        unknown => {
            return Err(ManifestError::Parse(format!(
                "unsupported vector encoding '{}'",
                unknown
            )));
        }
    };
    Ok(encoding)
}
/// Parses a manifest boolean. Only the exact strings `true` and `false` are
/// accepted; anything else yields `ManifestError::Parse`.
fn parse_bool_flag(value: &str) -> Result<bool> {
    let flag = match value {
        "true" => true,
        "false" => false,
        unknown => {
            return Err(ManifestError::Parse(format!(
                "invalid bool flag '{}'",
                unknown
            )));
        }
    };
    Ok(flag)
}
#[cfg(test)]
mod tests {
use super::*;
// Builds `<temp dir>/<name>_<pid>_<nanos since epoch>.manifest` so that each
// test works on its own file.
fn temp_manifest_path(name: &str) -> PathBuf {
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_nanos();
    let file_name = format!("{}_{}_{}.manifest", name, std::process::id(), nanos);
    std::env::temp_dir().join(file_name)
}
// Persists a freshly created manifest and checks that the high-water mark,
// a run-id counter and a model-less vector space are read back from disk.
#[test]
fn manifest_round_trip() {
let path = temp_manifest_path("manifest_round_trip");
let mut manifest = Manifest::load_or_create(path.clone()).unwrap();
manifest.set_high_water_mark(42);
let run_id = manifest.allocate_run_id(1);
assert_eq!(run_id, 1);
manifest
.register_vector_space(VectorSpaceDescriptor {
space_id: 7,
dimension: 4,
metric: VectorMetric::Cosine,
encoding: VectorEncoding::F32,
normalized: false,
model_id: None,
model_version: None,
})
.unwrap();
manifest.persist().unwrap();
let loaded = Manifest::load(&path).unwrap();
assert_eq!(loaded.schema_version, CURRENT_SCHEMA_VERSION);
assert_eq!(loaded.high_water_mark, 42);
assert_eq!(loaded.run_id_counters.get(&1).copied(), Some(1));
assert_eq!(loaded.vector_space(7).unwrap().dimension, 4);
assert_eq!(loaded.vector_space(7).unwrap().metric, VectorMetric::Cosine);
assert_eq!(loaded.vector_space(7).unwrap().model_id, None);
// Best-effort cleanup; a failed removal is ignored.
std::fs::remove_file(path).ok();
}
// Checks that model id and model version survive a persist/load cycle.
#[test]
fn manifest_v5_model_id_round_trip() {
let path = temp_manifest_path("manifest_v5_model_id");
let mut manifest = Manifest::load_or_create(path.clone()).unwrap();
manifest.set_high_water_mark(0);
manifest
.register_vector_space(VectorSpaceDescriptor {
space_id: 1,
dimension: 128,
metric: VectorMetric::Cosine,
encoding: VectorEncoding::F32,
normalized: false,
model_id: Some("text-embedding-3-small".to_string()),
model_version: Some("2024-01".to_string()),
})
.unwrap();
manifest.persist().unwrap();
let loaded = Manifest::load(&path).unwrap();
assert_eq!(loaded.schema_version, CURRENT_SCHEMA_VERSION);
let space = loaded.vector_space(1).unwrap();
assert_eq!(space.model_id.as_deref(), Some("text-embedding-3-small"));
assert_eq!(space.model_version.as_deref(), Some("2024-01"));
// Best-effort cleanup; a failed removal is ignored.
std::fs::remove_file(path).ok();
}
// A hand-written schema-4 file with the 4-field vector_space layout must load
// with no model information, and persisting it must rewrite the file at the
// current schema version.
#[test]
fn v4_manifest_loads_with_model_id_none() {
let path = temp_manifest_path("manifest_v4_compat");
std::fs::write(
&path,
"schema_version=4\nhigh_water_mark=0\nvector_space.3=cosine,64,f32,false\n",
)
.unwrap();
let loaded = Manifest::load(&path).unwrap();
assert_eq!(loaded.schema_version, 4);
let space = loaded.vector_space(3).unwrap();
assert_eq!(space.model_id, None);
assert_eq!(space.model_version, None);
assert_eq!(space.dimension, 64);
// Persisting upgrades the on-disk schema version.
loaded.persist().unwrap();
let upgraded = Manifest::load(&path).unwrap();
assert_eq!(upgraded.schema_version, CURRENT_SCHEMA_VERSION);
assert_eq!(upgraded.vector_space(3).unwrap().model_id, None);
// Best-effort cleanup; a failed removal is ignored.
std::fs::remove_file(path).ok();
}
// Re-registering an existing space with a different model id must fail, while
// re-registering it without a model id must succeed.
#[test]
fn register_vector_space_rejects_model_id_mismatch() {
let path = temp_manifest_path("manifest_model_id_mismatch");
let mut manifest = Manifest::load_or_create(path.clone()).unwrap();
manifest.set_high_water_mark(0);
manifest
.register_vector_space(VectorSpaceDescriptor {
space_id: 2,
dimension: 32,
metric: VectorMetric::Cosine,
encoding: VectorEncoding::F32,
normalized: false,
model_id: Some("model-a".to_string()),
model_version: None,
})
.unwrap();
// Same structure, different model id: must be rejected.
let err = manifest
.register_vector_space(VectorSpaceDescriptor {
space_id: 2,
dimension: 32,
metric: VectorMetric::Cosine,
encoding: VectorEncoding::F32,
normalized: false,
model_id: Some("model-b".to_string()),
model_version: None,
})
.unwrap_err();
match err {
ManifestError::Parse(msg) => assert!(msg.contains("model_id mismatch")),
ManifestError::Io(io) => panic!("unexpected io error: {io}"),
}
// Same structure, no model id: accepted against the stored registration.
manifest
.register_vector_space(VectorSpaceDescriptor {
space_id: 2,
dimension: 32,
metric: VectorMetric::Cosine,
encoding: VectorEncoding::F32,
normalized: false,
model_id: None,
model_version: None,
})
.unwrap();
// Nothing was persisted in this test, so this removal normally finds no file.
std::fs::remove_file(path).ok();
}
// Run ids start at 1 and increase by one per call, independently per level.
#[test]
fn allocate_run_id_is_monotonic_per_level() {
let path = temp_manifest_path("manifest_run_id");
let mut manifest = Manifest::load_or_create(path.clone()).unwrap();
assert_eq!(manifest.allocate_run_id(3), 1);
assert_eq!(manifest.allocate_run_id(3), 2);
assert_eq!(manifest.allocate_run_id(4), 1);
manifest.persist().unwrap();
// Best-effort cleanup; a failed removal is ignored.
std::fs::remove_file(path).ok();
}
// A file declaring a schema version above the current one must be rejected
// with a parse error that mentions the unsupported version.
#[test]
fn load_rejects_future_schema_version() {
let path = temp_manifest_path("manifest_future_schema");
std::fs::write(
&path,
format!(
"schema_version={}\nhigh_water_mark=0",
CURRENT_SCHEMA_VERSION + 1
),
)
.unwrap();
// Matched by hand because `Manifest` has no `Debug` impl for `unwrap_err`.
let err = match Manifest::load(&path) {
Ok(_) => panic!("expected future schema version to be rejected"),
Err(err) => err,
};
match err {
ManifestError::Parse(msg) => assert!(msg.contains("unsupported schema_version")),
ManifestError::Io(io) => panic!("unexpected io error: {}", io),
}
// Best-effort cleanup; a failed removal is ignored.
std::fs::remove_file(path).ok();
}
// Two registered bitmap indexes must survive a persist/load cycle and be
// listed in ascending name order.
#[test]
fn bitmap_index_round_trip_and_sorted_listing() {
let path = temp_manifest_path("manifest_bitmap_index");
let mut manifest = Manifest::load_or_create(path.clone()).unwrap();
manifest
.register_bitmap_index("idx_country", "n.country")
.unwrap();
manifest
.register_bitmap_index("idx_risk_tier", "n.risk_tier")
.unwrap();
manifest.persist().unwrap();
let loaded = Manifest::load(&path).unwrap();
let indexes = loaded.bitmap_indexes();
assert_eq!(indexes.len(), 2);
assert_eq!(indexes[0].index_name, "idx_country");
assert_eq!(indexes[0].field_path, "n.country");
assert_eq!(indexes[1].index_name, "idx_risk_tier");
assert_eq!(indexes[1].field_path, "n.risk_tier");
// Best-effort cleanup; a failed removal is ignored.
std::fs::remove_file(path).ok();
}
}