#![doc = include_str!("../README.md")]
use std::collections::BTreeMap;
use std::fs::{self, File};
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::path::Path;
use serde::{Deserialize, Serialize};
use text_embeddings::{EmbeddingModelInfo, TextEmbedderBackend};
use thiserror::Error;
use vector_analysis_index::SerializableVectorRecord;
use crate::{DocumentChunk, RetrievalIndex};
/// Errors produced while saving, loading, or rebuilding a persisted retrieval index.
#[derive(Debug, Error)]
pub enum StorageError {
/// A filesystem operation failed; converted automatically from `std::io::Error` by `?`.
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
/// JSON encoding or decoding failed. Holds the rendered `serde_json` error message;
/// errors from JSONL files are prefixed with the 1-based `line N:` position.
#[error("JSON error: {0}")]
Json(String),
/// The loaded data files disagree with the counts or dimensions recorded in the manifest.
#[error("invalid manifest: {0}")]
InvalidManifest(String),
/// The persisted state cannot be turned back into an index: the supplied embedder is
/// incompatible with the persisted one, or `RetrievalIndex::from_parts` rejected the parts.
#[error("invalid retrieval state: {0}")]
InvalidState(String),
}
/// Result alias for this module's persistence operations; the error type is [`StorageError`].
pub type Result<T> = std::result::Result<T, StorageError>;
/// Manifest entry describing one data file of a persisted index.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RetrievalFile {
/// File location; it is joined onto the index directory when saving and loading.
pub path: String,
/// Number of records in the file, as recorded when the manifest was built.
pub records: u64,
}
/// Contents of `manifest.json`: describes the files that make up a persisted index and
/// the totals that `PersistedSearchIndex::load_from_path` validates them against.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RetrievalManifest {
/// Version of the on-disk layout; `PersistedSearchIndex::from_index` writes `1`.
pub schema_version: u32,
/// Total number of chunk records; loading fails if `chunks_file` yields a different count.
pub chunk_count: u64,
/// Total number of vector records; loading fails if `vectors_file` yields a different count.
pub vector_count: u64,
/// Length every persisted vector must have; `from_index` takes it from the first vector
/// and leaves it `None` when there are no vectors. Not validated on load when `None`.
pub dimensions: Option<usize>,
/// Description of the embedder that produced the vectors; compared against the embedder
/// supplied to `PersistedSearchIndex::into_index`.
pub embedder: EmbeddingModelInfo,
/// JSONL file holding one `PersistedChunkRecord` per line.
pub chunks_file: RetrievalFile,
/// JSONL file holding one `SerializableVectorRecord` per line.
pub vectors_file: RetrievalFile,
/// JSON file holding the `PersistedCorpusMetadata`; joined onto the index directory.
pub corpus_file: String,
}
/// Lexical configuration stored in the corpus file so a reloaded index is rebuilt with
/// the same options the original index reported.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PersistedCorpusMetadata {
/// Corpus options copied from the index and handed back to `RetrievalIndex::from_parts`.
pub corpus_options: text_lexical::CorpusOptions,
/// BM25 options copied from the index and handed back to `RetrievalIndex::from_parts`.
pub bm25_options: text_lexical::Bm25Options,
}
/// One line of the chunks JSONL file: a chunk plus its raw text, when the index had one.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PersistedChunkRecord {
/// The chunk exactly as stored in the index.
pub chunk: DocumentChunk,
/// Raw text the index returned for this chunk's id; `None` when it returned nothing.
pub raw_text: Option<String>,
}
/// In-memory image of a persisted retrieval index: the manifest plus the contents of the
/// corpus, chunk, and vector files it refers to.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PersistedSearchIndex {
/// Manifest describing the files and the expected record counts.
pub manifest: RetrievalManifest,
/// Lexical options needed to rebuild the index.
pub corpus: PersistedCorpusMetadata,
/// All chunk records, in the order they are written to / read from the chunks file.
pub chunks: Vec<PersistedChunkRecord>,
/// All vector records, in the order they are written to / read from the vectors file.
pub vectors: Vec<SerializableVectorRecord>,
}
impl PersistedSearchIndex {
    /// Snapshots `index` into its persistable form.
    ///
    /// Every chunk is cloned together with the raw text the index returns for its chunk id
    /// (if any), and the vector records come from `index.vector_records()`. The manifest is
    /// derived from that data: schema version `1`, the fixed file names `chunks.jsonl`,
    /// `vectors.jsonl` and `corpus.json`, and `dimensions` set to the length of the first
    /// vector (`None` when there are no vectors).
    pub fn from_index<B: TextEmbedderBackend>(index: &RetrievalIndex<B>) -> Self {
        let chunks = index
            .chunks_iter()
            .map(|chunk| PersistedChunkRecord {
                chunk: chunk.clone(),
                raw_text: index.raw_text(&chunk.chunk_id).map(ToString::to_string),
            })
            .collect::<Vec<_>>();
        let vectors = index.vector_records();
        let embedder = index.embedder_info();
        // The totals and the per-file record counts describe the same data, so compute once.
        let chunk_count = chunks.len() as u64;
        let vector_count = vectors.len() as u64;
        Self {
            manifest: RetrievalManifest {
                schema_version: 1,
                chunk_count,
                vector_count,
                dimensions: vectors.first().map(|record| record.vector.len()),
                embedder,
                chunks_file: RetrievalFile {
                    path: "chunks.jsonl".to_string(),
                    records: chunk_count,
                },
                vectors_file: RetrievalFile {
                    path: "vectors.jsonl".to_string(),
                    records: vector_count,
                },
                corpus_file: "corpus.json".to_string(),
            },
            corpus: PersistedCorpusMetadata {
                corpus_options: index.corpus_options().clone(),
                bm25_options: index.bm25_options().clone(),
            },
            chunks,
            vectors,
        }
    }

    /// Writes the index into the directory `path`, creating the directory if necessary.
    ///
    /// The corpus file, chunks file, and vectors file are written under the names recorded
    /// in the manifest, followed by `manifest.json` itself.
    ///
    /// # Errors
    ///
    /// Returns [`StorageError::Io`] when a directory or file cannot be created or written
    /// and [`StorageError::Json`] when a value fails to serialize.
    pub fn save_to_path(&self, path: &Path) -> Result<()> {
        fs::create_dir_all(path)?;
        write_json(path.join(&self.manifest.corpus_file), &self.corpus)?;
        write_jsonl(path.join(&self.manifest.chunks_file.path), &self.chunks)?;
        write_jsonl(path.join(&self.manifest.vectors_file.path), &self.vectors)?;
        // The manifest goes last: if writing a data file fails in a fresh directory, no
        // manifest is left behind pointing at missing or partial files.
        write_json(path.join("manifest.json"), &self.manifest)?;
        Ok(())
    }

    /// Reads a persisted index from the directory `path` and validates it against its
    /// manifest.
    ///
    /// `manifest.json` is read first; the corpus, chunks, and vectors files are then located
    /// through the names it records.
    ///
    /// # Errors
    ///
    /// * [`StorageError::Io`] / [`StorageError::Json`] when a file cannot be read or parsed.
    /// * [`StorageError::InvalidManifest`] when the number of loaded chunks or vectors
    ///   differs from `chunk_count` / `vector_count`, or when the manifest declares
    ///   `dimensions` and some vector has a different length.
    // NOTE(review): `schema_version` and the per-file `records` counts are read but not
    // validated here — confirm whether that is intentional.
    pub fn load_from_path(path: &Path) -> Result<Self> {
        let manifest = read_json::<RetrievalManifest>(path.join("manifest.json"))?;
        let corpus = read_json::<PersistedCorpusMetadata>(path.join(&manifest.corpus_file))?;
        let chunks = read_jsonl::<PersistedChunkRecord>(path.join(&manifest.chunks_file.path))?;
        let vectors =
            read_jsonl::<SerializableVectorRecord>(path.join(&manifest.vectors_file.path))?;
        if chunks.len() as u64 != manifest.chunk_count {
            return Err(StorageError::InvalidManifest(format!(
                "manifest expected {} chunks, loaded {}",
                manifest.chunk_count,
                chunks.len()
            )));
        }
        if vectors.len() as u64 != manifest.vector_count {
            return Err(StorageError::InvalidManifest(format!(
                "manifest expected {} vectors, loaded {}",
                manifest.vector_count,
                vectors.len()
            )));
        }
        if let Some(dimensions) = manifest.dimensions {
            if vectors
                .iter()
                .any(|record| record.vector.len() != dimensions)
            {
                return Err(StorageError::InvalidManifest(format!(
                    "persisted vectors did not all match manifest dimension {dimensions}"
                )));
            }
        }
        Ok(Self {
            manifest,
            corpus,
            chunks,
            vectors,
        })
    }

    /// Rebuilds a [`RetrievalIndex`] from this persisted state using `embedder`.
    ///
    /// # Errors
    ///
    /// Returns [`StorageError::InvalidState`] when `embedder` reports a model name or
    /// dimension count that conflicts with the persisted embedder info, or when
    /// `RetrievalIndex::from_parts` rejects the reconstructed parts.
    pub fn into_index<B: TextEmbedderBackend>(self, embedder: B) -> Result<RetrievalIndex<B>> {
        validate_embedder_compatibility(&self.manifest.embedder, &embedder.model_info())?;
        // `self` is owned, so split each record in a single pass and move the raw text into
        // the lookup map instead of cloning it; only the chunk id (the map key) is copied.
        let mut raw_text_by_chunk_id = BTreeMap::new();
        let mut chunks = Vec::with_capacity(self.chunks.len());
        for PersistedChunkRecord { chunk, raw_text } in self.chunks {
            if let Some(raw_text) = raw_text {
                // As with collecting into a map, a later duplicate id replaces an earlier one.
                raw_text_by_chunk_id.insert(chunk.chunk_id.clone(), raw_text);
            }
            chunks.push(chunk);
        }
        RetrievalIndex::from_parts(
            embedder,
            self.corpus.corpus_options,
            self.corpus.bm25_options,
            chunks,
            raw_text_by_chunk_id,
            self.vectors,
        )
        .map_err(|err| StorageError::InvalidState(err.to_string()))
    }

    /// Convenience wrapper: [`Self::load_from_path`] followed by [`Self::into_index`].
    ///
    /// # Errors
    ///
    /// Propagates every error of the two underlying calls.
    pub fn load_with_embedder<B: TextEmbedderBackend>(
        path: &Path,
        embedder: B,
    ) -> Result<RetrievalIndex<B>> {
        Self::load_from_path(path)?.into_index(embedder)
    }
}
fn write_json(path: impl AsRef<Path>, value: &impl Serialize) -> Result<()> {
let file = File::create(path)?;
serde_json::to_writer_pretty(BufWriter::new(file), value).map_err(json_error)
}
fn read_json<T: for<'de> Deserialize<'de>>(path: impl AsRef<Path>) -> Result<T> {
let file = File::open(path)?;
serde_json::from_reader(BufReader::new(file)).map_err(json_error)
}
/// Writes `values` to the file at `path` as JSON Lines: one compact JSON document per
/// value, each terminated by `\n`. The file is created or truncated.
///
/// # Errors
///
/// Returns [`StorageError::Io`] when the file cannot be created, written, or flushed, and
/// [`StorageError::Json`] when a value fails to serialize.
fn write_jsonl<T: Serialize>(path: impl AsRef<Path>, values: &[T]) -> Result<()> {
    let mut sink = BufWriter::new(File::create(path)?);
    values.iter().try_for_each(|value| -> Result<()> {
        let encoded = serde_json::to_string(value).map_err(json_error)?;
        sink.write_all(encoded.as_bytes())?;
        sink.write_all(b"\n")?;
        Ok(())
    })?;
    // Surface flush failures instead of leaving them to the writer's drop.
    sink.flush()?;
    Ok(())
}
/// Reads a JSON Lines file: every line that is not blank after trimming is parsed as one
/// `T`, in file order. Blank lines are skipped but still count toward line numbers.
///
/// # Errors
///
/// Returns [`StorageError::Io`] when the file cannot be opened or a line cannot be read,
/// and [`StorageError::Json`] (prefixed with the 1-based line number) for the first line
/// that fails to parse.
fn read_jsonl<T: for<'de> Deserialize<'de>>(path: impl AsRef<Path>) -> Result<Vec<T>> {
    BufReader::new(File::open(path)?)
        .lines()
        .enumerate()
        .filter_map(|(index, line)| match line {
            Err(io_err) => Some(Err(StorageError::from(io_err))),
            Ok(text) if text.trim().is_empty() => None,
            Ok(text) => Some(
                serde_json::from_str::<T>(&text)
                    .map_err(|err| StorageError::Json(format!("line {}: {err}", index + 1))),
            ),
        })
        // Collecting into `Result` stops at the first error, like an early return would.
        .collect::<Result<Vec<T>>>()
}
/// Checks that the embedder supplied at load time can serve an index built with the
/// `persisted` embedder.
///
/// Model names are compared only when both are non-empty, and dimensions only when both
/// are greater than zero; an empty name or a zero dimension on either side skips that
/// comparison.
///
/// # Errors
///
/// Returns [`StorageError::InvalidState`] on a model-name mismatch (checked first) or a
/// dimension mismatch.
fn validate_embedder_compatibility(
    persisted: &EmbeddingModelInfo,
    current: &EmbeddingModelInfo,
) -> Result<()> {
    let names_comparable = !(persisted.model_name.is_empty() || current.model_name.is_empty());
    if names_comparable && persisted.model_name != current.model_name {
        return Err(StorageError::InvalidState(format!(
            "persisted embedder `{}` did not match provided embedder `{}`",
            persisted.model_name, current.model_name
        )));
    }

    let dimensions_comparable = persisted.dimensions > 0 && current.dimensions > 0;
    if dimensions_comparable && persisted.dimensions != current.dimensions {
        return Err(StorageError::InvalidState(format!(
            "persisted embedder dimensions {} did not match provided embedder dimensions {}",
            persisted.dimensions, current.dimensions
        )));
    }

    Ok(())
}
/// Converts a `serde_json` error into [`StorageError::Json`], keeping only its rendered
/// message.
fn json_error(error: serde_json::Error) -> StorageError {
    let message = error.to_string();
    StorageError::Json(message)
}
// Tests for the persistence layer: save/load round trip, manifest validation, and
// embedder compatibility checks.
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use crate::{HybridConfig, IngestionOptions, SearchDocument, SearchQuery};
use tempfile::tempdir;
use text_embeddings::{
DenseVector, HashedTextEmbedder, TextEmbeddingBackend, TextEmbeddingBackendKind,
TextEmbeddingConfig, TextEmbeddingMetadata,
};
use text_lexical::CorpusOptions;
use super::*;
/// Builds the hashed embedder shared by the tests: 32 dimensions with IDF enabled.
fn embedder() -> HashedTextEmbedder {
HashedTextEmbedder::new(
TextEmbeddingConfig {
dimensions: 32,
use_idf: true,
},
CorpusOptions::default(),
)
.unwrap()
}
/// Stub embedder with a configurable model name and dimension count, used to provoke
/// compatibility failures.
#[derive(Debug, Clone)]
struct NamedEmbedder {
name: String,
dimensions: usize,
}
impl TextEmbeddingBackend for NamedEmbedder {
// Ignores the text and returns a vector of `dimensions` ones.
fn embed_text(&self, _text: &str) -> video_analysis_core::Result<DenseVector> {
DenseVector::new(vec![1.0; self.dimensions])
}
// Reports the configured name and dimensions as a custom backend.
fn metadata(&self) -> TextEmbeddingMetadata {
TextEmbeddingMetadata {
backend: TextEmbeddingBackendKind::Custom,
model_name: Some(self.name.clone()),
dimensions: Some(self.dimensions),
..TextEmbeddingMetadata::default()
}
}
}
/// Saving an index and loading it back with an equivalent embedder must yield the same
/// search results as the original index.
#[test]
fn persisted_index_round_trips_with_manifest_validation() {
let mut index = RetrievalIndex::new(embedder());
index
.ingest_documents(
&[
SearchDocument {
id: "doc-1".to_string(),
title: Some("Rust Search".to_string()),
body: "Rust cargo crates enable semantic search over documentation."
.to_string(),
metadata: BTreeMap::from([("lang".to_string(), "en".to_string())]),
source: None,
provenance: Vec::new(),
annotations: Vec::new(),
},
SearchDocument {
id: "doc-2".to_string(),
title: None,
body: "Music playlists and recommendation notes.".to_string(),
metadata: BTreeMap::from([("lang".to_string(), "en".to_string())]),
source: None,
provenance: Vec::new(),
annotations: Vec::new(),
},
],
&IngestionOptions::default(),
)
.unwrap();
let dir = tempdir().unwrap();
let persisted = PersistedSearchIndex::from_index(&index);
persisted.save_to_path(dir.path()).unwrap();
let loaded = PersistedSearchIndex::load_with_embedder(dir.path(), embedder()).unwrap();
let query = SearchQuery {
text: "rust search docs".to_string(),
top_k: 2,
filter: None,
hybrid: HybridConfig::default(),
};
// The reloaded index must answer the query exactly like the in-memory original.
assert_eq!(
loaded.search(&query).unwrap(),
index.search(&query).unwrap()
);
}
/// A manifest whose `chunk_count` disagrees with the (empty) chunks file must be
/// rejected with `InvalidManifest`.
#[test]
fn malformed_manifest_is_rejected() {
let dir = tempdir().unwrap();
// The manifest claims 2 chunks and 1 vector, but both data files below are empty.
fs::write(
dir.path().join("manifest.json"),
r#"{"schema_version":1,"chunk_count":2,"vector_count":1,"dimensions":2,"embedder":{"model_name":"hashed-text-embedder","backend":"hashed","dimensions":32},"chunks_file":{"path":"chunks.jsonl","records":0},"vectors_file":{"path":"vectors.jsonl","records":0},"corpus_file":"corpus.json"}"#,
)
.unwrap();
fs::write(dir.path().join("corpus.json"), r#"{"corpus_options":{"processing":{"language":null,"lowercase":true,"normalize_unicode":true,"keep_apostrophes":true,"include_punctuation":false},"min_term_len":1,"stop_words":[],"max_terms_per_document":null},"bm25_options":{"k1":1.2,"b":0.75,"min_term_len":1,"stop_words":[]}}"#).unwrap();
fs::write(dir.path().join("chunks.jsonl"), "").unwrap();
fs::write(dir.path().join("vectors.jsonl"), "").unwrap();
let err = PersistedSearchIndex::load_from_path(dir.path()).unwrap_err();
// The chunk count is checked before the vector count, so the message mentions chunks.
assert!(
matches!(err, StorageError::InvalidManifest(message) if message.contains("chunks"))
);
}
/// Loading with an embedder whose name or dimension count differs from the persisted
/// one must fail with `InvalidState`.
#[test]
fn loading_rejects_incompatible_embedder_name_or_dimensions() {
let mut index = RetrievalIndex::new(NamedEmbedder {
name: "persisted".to_string(),
dimensions: 4,
});
index
.ingest_documents(
&[SearchDocument {
id: "doc-1".to_string(),
title: None,
body: "rust cargo crates".to_string(),
metadata: BTreeMap::new(),
source: None,
provenance: Vec::new(),
annotations: Vec::new(),
}],
&IngestionOptions::default(),
)
.unwrap();
let dir = tempdir().unwrap();
PersistedSearchIndex::from_index(&index)
.save_to_path(dir.path())
.unwrap();
// Same dimensions, different model name.
let wrong_name = PersistedSearchIndex::load_with_embedder(
dir.path(),
NamedEmbedder {
name: "other".to_string(),
dimensions: 4,
},
)
.unwrap_err();
assert!(
matches!(wrong_name, StorageError::InvalidState(message) if message.contains("embedder"))
);
// Same model name, different dimensions.
let wrong_dimensions = PersistedSearchIndex::load_with_embedder(
dir.path(),
NamedEmbedder {
name: "persisted".to_string(),
dimensions: 8,
},
)
.unwrap_err();
assert!(
matches!(wrong_dimensions, StorageError::InvalidState(message) if message.contains("dimensions"))
);
}
}