pub mod artifact;
pub mod freshness;
pub mod manifest;
pub mod mutable;
pub mod schema;
pub mod vectors;
pub use artifact::{ArtifactStore, LocalArtifact};
pub use freshness::{
CacheOutcome, CachePolicy, CurrentAnchor, DerivesFromEdge, StaleReason, Staleness,
};
pub use manifest::{
AnchorKind, AnchorValue, ArtifactDigest, ComputeDevice, DefinitionHash, InputAnchor,
ManifestError, MatchVerdict, Materialization, MaterializationEnv, MaterializationManifest,
ModelIdentity, ProducingDescriptor,
};
use std::path::Path;
use std::sync::Arc;
use arrow::array::Array;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::prelude::SessionContext;
use datafusion::sql::TableReference;
use tracing::warn;
use crate::catalog::result_repo::{CreateResultTableParams, ResultTableKind, ResultTableRecord};
use crate::catalog::status::ResultTableStatus;
use crate::catalog::Catalog;
use crate::config::AnnIndexConfig;
use crate::error::{JammiError, Result};
use crate::index::sidecar::SidecarIndex;
use crate::index::VectorIndex;
use crate::model_task::ModelTask;
use crate::storage::sidecar_layout::SidecarKind;
use crate::storage::{
self, JammiObjectStore, ObjectParquetWriter, Scheme, StorageRegistry, StorageUrl,
};
use crate::tenant_scope::TenantBinding;
/// Parameters describing an embedding result table to materialize.
///
/// Destructured by [`ResultStore::materialize_embedding_table`].
#[derive(Debug)]
pub struct EmbeddingTableSpec<'a> {
/// Source identifier; it is the leading component of the generated table name
/// and is written into every row of the batch.
pub source_id: &'a str,
/// Model identifier; recorded in the catalog and, in sanitized form, embedded
/// in the generated table name.
pub model_id: &'a str,
/// Optional lineage reference forwarded to the catalog record unchanged.
pub derived_from: Option<&'a str>,
/// Width every row's vector must have; rows of any other width are rejected.
pub dimensions: usize,
}
/// Name and storage locations of a result table, as returned by
/// [`ResultStore::create_table`].
#[derive(Debug)]
pub struct ResultTableInfo {
/// Generated table name (`source__task__model__timestamp_suffix`).
pub table_name: String,
/// URL of the table's Parquet object (`<root>/<table_name>.parquet`).
pub parquet_url: StorageUrl,
/// URL of the index location (`<root>/<table_name>.idx`); only set for
/// model-kind tables whose task is an embedding task.
pub index_url: Option<StorageUrl>,
}
/// Store for result tables: derives storage URLs under a root, opens object-store
/// handles for them, and keeps the catalog in step with what is on storage.
pub struct ResultStore {
// Base URL under which Parquet and index URLs are derived.
root: StorageUrl,
// Resolves a storage driver for a given URL.
registry: StorageRegistry,
// Catalog holding the result-table records.
catalog: Arc<Catalog>,
// ANN configuration handed to sidecar index construction and loading.
ann: AnnIndexConfig,
}
/// Turns a model identifier into a fragment usable inside a table name.
///
/// `/`, `:`, space and `.` are each replaced by `_`; every other character is
/// kept as-is. The result is limited to the first 64 characters (not bytes) of
/// the input.
fn sanitize_model_id(model_id: &str) -> String {
    const MAX_CHARS: usize = 64;

    let mut cleaned = String::new();
    for ch in model_id.chars().take(MAX_CHARS) {
        cleaned.push(match ch {
            '/' | ':' | ' ' | '.' => '_',
            keep => keep,
        });
    }
    cleaned
}
impl ResultStore {
/// Creates a store rooted at `<artifact_dir>/jammi_db` with a fresh
/// [`StorageRegistry`].
///
/// The directory is created (including parents) before the path is checked for
/// UTF-8 validity.
///
/// # Errors
/// Fails if the directory cannot be created, if the path is not valid UTF-8
/// (`JammiError::Config`), or if it does not parse as a storage URL.
pub fn new(artifact_dir: &Path, catalog: Arc<Catalog>, ann: AnnIndexConfig) -> Result<Self> {
    let db_dir = artifact_dir.join("jammi_db");
    std::fs::create_dir_all(&db_dir)?;

    let Some(db_dir_str) = db_dir.to_str() else {
        return Err(JammiError::Config("Non-UTF8 artifact_dir".into()));
    };
    let root = StorageUrl::parse(db_dir_str)?;

    Ok(Self {
        root,
        registry: StorageRegistry::new(),
        catalog,
        ann,
    })
}
/// Creates a store over an explicit root URL and a caller-supplied registry.
///
/// When the root uses the `File` scheme its directory tree is created up
/// front; other schemes are left untouched.
///
/// # Errors
/// Fails if the local directory cannot be created.
pub fn with_root(
    root: StorageUrl,
    registry: StorageRegistry,
    catalog: Arc<Catalog>,
    ann: AnnIndexConfig,
) -> Result<Self> {
    let is_local = root.scheme() == Scheme::File;
    if is_local {
        std::fs::create_dir_all(root.path())?;
    }
    let store = Self {
        root,
        registry,
        catalog,
        ann,
    };
    Ok(store)
}
/// Returns the shared catalog handle this store was constructed with.
pub fn catalog(&self) -> &Arc<Catalog> {
&self.catalog
}
/// Opens an object-store handle for the Parquet object at `url`.
///
/// The driver is resolved through the store's registry; the handle keeps its
/// own clone of `url`.
///
/// # Errors
/// Fails if the registry cannot provide a driver for `url`.
pub fn open_parquet(&self, url: &StorageUrl) -> Result<JammiObjectStore> {
    let store = JammiObjectStore::new(self.registry.driver_for(url, None)?, url.clone());
    Ok(store)
}
/// Opens an object-store handle for the index location at `url`.
///
/// Driver resolution is the same as for Parquet handles: the registry picks
/// the driver and the handle keeps its own clone of `url`.
///
/// # Errors
/// Fails if the registry cannot provide a driver for `url`.
pub fn open_index(&self, url: &StorageUrl) -> Result<JammiObjectStore> {
    let resolved = self.registry.driver_for(url, None)?;
    let target = url.clone();
    Ok(JammiObjectStore::new(resolved, target))
}
/// Allocates a uniquely named result table and records it in the catalog.
///
/// The table name is
/// `{source_id}__{task}__{sanitized model}__{UTC timestamp}_{8 hex chars}`.
/// The Parquet URL is `<root>/<table_name>.parquet`. An index URL
/// (`<root>/<table_name>.idx`) is allocated only for `ResultTableKind::Model`
/// tables whose task is an embedding task.
///
/// No data is written here; only URLs are derived and the catalog row created.
///
/// # Errors
/// Fails if a derived URL does not parse or the catalog insert fails.
// The argument list mirrors the catalog record's columns, hence the lint allowance.
#[allow(clippy::too_many_arguments)]
pub async fn create_table(
    &self,
    source_id: &str,
    task: ModelTask,
    kind: ResultTableKind,
    derived_from: Option<&str>,
    model_id: &str,
    dimensions: Option<i32>,
    key_column: Option<&str>,
    text_columns: Option<&str>,
) -> Result<ResultTableInfo> {
    let model_part = sanitize_model_id(model_id);
    let created_at = chrono::Utc::now();
    // Random component so that two tables created at the same instant stay distinct.
    let unique = uuid::Uuid::new_v4().simple().to_string();
    let table_name = format!(
        "{source_id}__{task_part}__{model_part}__{stamp}_{suffix}",
        task_part = task.as_db_str(),
        stamp = created_at.format("%Y%m%dT%H%M%S%9f"),
        suffix = &unique[..8],
    );

    let parquet_url = self.derive_url(&format!("{table_name}.parquet"))?;
    let wants_index = matches!(kind, ResultTableKind::Model) && task.is_embedding();
    let index_url = wants_index
        .then(|| self.derive_url(&format!("{table_name}.idx")))
        .transpose()?;

    let params = CreateResultTableParams {
        table_name: &table_name,
        source_id,
        model_id,
        task,
        kind,
        derived_from,
        parquet_path: parquet_url.as_str(),
        index_path: index_url.as_ref().map(|u| u.as_str()),
        dimensions,
        key_column,
        text_columns,
    };
    self.catalog.create_result_table(params).await?;

    Ok(ResultTableInfo {
        table_name,
        parquet_url,
        index_url,
    })
}
/// Opens a Parquet writer for `url` that writes batches of the given `schema`.
///
/// # Errors
/// Fails if the handle for `url` cannot be opened or the writer cannot be
/// created.
pub async fn open_writer(
    &self,
    url: &StorageUrl,
    schema: arrow::datatypes::SchemaRef,
) -> Result<ObjectParquetWriter> {
    let store = self.open_parquet(url)?;
    let writer = ObjectParquetWriter::open(&store, schema).await?;
    Ok(writer)
}
/// Registers the Parquet object at `url` with `ctx` under `name`.
///
/// Delegates to [`register_parquet_table`] using this store's registry.
pub async fn register_table(
&self,
ctx: &SessionContext,
name: &str,
url: &StorageUrl,
) -> Result<()> {
register_parquet_table(ctx, &self.registry, name, url).await
}
/// Seals a fully written table: computes its manifest, persists the manifest
/// sidecar, registers the table with `ctx`, and promotes the catalog row.
///
/// Steps, in order:
/// 1. Read the whole Parquet object back and digest its bytes.
/// 2. Compute the manifest from `materialization`, the digest, the process
///    run id and the current UTC time.
/// 3. Write the manifest sidecar.
/// 4. Register the table with `ctx`.
/// 5. Promote the catalog row with `rows`, the definition hash and the
///    JSON-serialised input anchors.
///
/// Returns the computed manifest.
///
/// # Errors
/// Any storage, manifest, registration, serialisation or catalog failure is
/// returned as soon as it happens; later steps are not attempted.
pub async fn finalize_with_manifest(
    &self,
    ctx: &SessionContext,
    name: &str,
    url: &StorageUrl,
    rows: usize,
    materialization: Materialization<'_>,
) -> Result<MaterializationManifest> {
    let digest = {
        let store = self.open_parquet(url)?;
        let data_path = store.data_path()?;
        let contents = store.get_bytes(&data_path).await?;
        ArtifactDigest::of_bytes(&contents)
    };

    let manifest = MaterializationManifest::compute(
        materialization.descriptor,
        materialization.env,
        materialization.inputs,
        digest,
        run_id().to_string(),
        chrono::Utc::now().to_rfc3339(),
    )
    .map_err(manifest_to_jammi)?;

    // Test-only hook (behind the `test-hooks` feature), awaited between
    // manifest computation and the sidecar write.
    #[cfg(feature = "test-hooks")]
    crate::store::mutable::test_hook::maybe_signal_materialization().await;

    self.write_materialization_sidecar(url, &manifest).await?;
    self.register_table(ctx, name, url).await?;

    let anchors_json = serde_json::to_string(&manifest.input_anchors)
        .map_err(|e| JammiError::Other(format!("serialise input anchors: {e}")))?;
    let definition_hash = manifest.definition_hash.as_str();
    self.catalog
        .promote_result_table_with_manifest(name, rows, definition_hash, &anchors_json)
        .await?;

    Ok(manifest)
}
/// Builds a result-digest input anchor for `table`.
///
/// The digest recorded in the table's materialization manifest is used when a
/// manifest exists. Otherwise the Parquet object is read in full and digested
/// on the spot.
///
/// # Errors
/// Fails if the stored Parquet path does not parse, or on storage or manifest
/// errors.
pub async fn result_digest_anchor(&self, table: &ResultTableRecord) -> Result<InputAnchor> {
    let parquet_url = StorageUrl::parse(&table.parquet_path)?;
    let recorded = self.read_materialization_manifest(&parquet_url).await?;

    let digest = if let Some(manifest) = recorded {
        manifest.artifact
    } else {
        let store = self.open_parquet(&parquet_url)?;
        let data_path = store.data_path()?;
        let contents = store.get_bytes(&data_path).await?;
        ArtifactDigest::of_bytes(&contents)
    };

    Ok(InputAnchor::result_digest(&table.table_name, &digest))
}
/// Loads the materialization manifest stored next to the Parquet object at
/// `parquet_url`.
///
/// Returns `Ok(None)` when no sidecar object exists.
///
/// # Errors
/// Fails on storage errors or when the sidecar cannot be decoded.
pub async fn read_materialization_manifest(
    &self,
    parquet_url: &StorageUrl,
) -> Result<Option<MaterializationManifest>> {
    let store = self.open_parquet(parquet_url)?;
    let sidecar_path = materialization_sidecar_path(&store)?;

    if store.exists(&sidecar_path).await? {
        let raw = store.get_bytes(&sidecar_path).await?;
        MaterializationManifest::from_json_bytes(&raw)
            .map(Some)
            .map_err(manifest_to_jammi)
    } else {
        Ok(None)
    }
}
/// Serialises `manifest` to JSON and stores it as the sidecar of the Parquet
/// object at `parquet_url`.
///
/// # Errors
/// Fails on serialisation or storage errors.
async fn write_materialization_sidecar(
    &self,
    parquet_url: &StorageUrl,
    manifest: &MaterializationManifest,
) -> Result<()> {
    let store = self.open_parquet(parquet_url)?;
    let target = materialization_sidecar_path(&store)?;
    let payload = manifest.to_json_bytes().map_err(manifest_to_jammi)?;
    store.put_bytes(&target, payload.into()).await?;
    Ok(())
}
/// Checks a stored table against its materialization manifest.
///
/// Verdicts, in the order they are decided:
/// - `MissingManifest` when no manifest sidecar exists.
/// - `Mismatch` when the digest of the Parquet bytes differs from the
///   manifest's recorded artifact digest.
/// - `Mismatch` when `expected_definition` is given and differs from the
///   manifest's definition hash.
/// - `MatchWithUnpinnedInputs` when the manifest reports unpinned inputs.
/// - `Match` otherwise.
///
/// # Errors
/// Fails if the stored Parquet path does not parse, or on storage or manifest
/// decoding errors.
pub async fn verify_materialization(
    &self,
    table: &ResultTableRecord,
    expected_definition: Option<&DefinitionHash>,
) -> Result<MatchVerdict> {
    let parquet_url = StorageUrl::parse(&table.parquet_path)?;
    let manifest = match self.read_materialization_manifest(&parquet_url).await? {
        Some(found) => found,
        None => return Ok(MatchVerdict::MissingManifest),
    };

    let store = self.open_parquet(&parquet_url)?;
    let data_path = store.data_path()?;
    let contents = store.get_bytes(&data_path).await?;
    let actual_digest = ArtifactDigest::of_bytes(&contents);
    if actual_digest != manifest.artifact {
        return Ok(MatchVerdict::Mismatch {
            expected: manifest.artifact.0,
            found: actual_digest.0,
        });
    }

    match expected_definition {
        Some(wanted) if *wanted != manifest.definition_hash => {
            return Ok(MatchVerdict::Mismatch {
                expected: wanted.0.clone(),
                found: manifest.definition_hash.0,
            });
        }
        _ => {}
    }

    let unpinned = manifest.unpinned_inputs();
    let verdict = if unpinned.is_empty() {
        MatchVerdict::Match
    } else {
        MatchVerdict::MatchWithUnpinnedInputs { unpinned }
    };
    Ok(verdict)
}
/// Runs the recovery pass (`recover_inner`) wrapped in
/// `TenantBinding::admin_scope`.
// NOTE(review): presumably the admin scope lets the pass see every tenant's
// tables — confirm against `TenantBinding`.
pub async fn recover(&self) -> Result<()> {
TenantBinding::admin_scope(self.recover_inner()).await
}
/// Reconciles every result table the catalog still lists as `Building`, then
/// checks `Ready` tables via [`Self::reconcile_ready_manifests`].
///
/// For each building table:
/// - Parquet object missing: mark the table `Failed`.
/// - Parquet present but rejected by `is_valid_parquet`: best-effort delete of
///   the Parquet object and the ANN sidecar, then mark `Failed`.
/// - Parquet valid but no materialization manifest: best-effort delete, then
///   mark `Failed`.
/// - Parquet valid with a manifest: for embedding tables, rebuild the ANN
///   sidecar (a failure is logged and does not stop recovery), then promote
///   the table with the manifest's definition hash and input anchors.
///
/// # Errors
/// A URL parse, storage or catalog error aborts the pass at the table being
/// processed. Best-effort deletions never abort it.
async fn recover_inner(&self) -> Result<()> {
    let building = self
        .catalog
        .list_result_tables_by_status(ResultTableStatus::Building)
        .await?;

    for table in building {
        let parquet_url = StorageUrl::parse(&table.parquet_path)?;
        let parquet_handle = self.open_parquet(&parquet_url)?;
        let parquet_path = parquet_handle.data_path()?;

        if !parquet_handle.exists(&parquet_path).await? {
            warn!(
                table = table.table_name,
                "Recovery: Parquet missing, marking failed"
            );
            self.catalog
                .update_result_table_status(&table.table_name, ResultTableStatus::Failed, 0)
                .await?;
            continue;
        }

        if !storage::reader::is_valid_parquet(&parquet_handle).await? {
            warn!(
                table = table.table_name,
                "Recovery: invalid Parquet, deleting and marking failed"
            );
            // Cleanup is best-effort: a failed delete must not block marking the table.
            parquet_handle.delete_if_exists(&parquet_path).await.ok();
            if let Some(ref idx) = table.index_path {
                let idx_url = StorageUrl::parse(idx)?;
                let idx_handle = self.open_index(&idx_url)?;
                storage::sidecar_layout::delete_sidecar(&idx_handle, SidecarKind::Ann)
                    .await
                    .ok();
            }
            self.catalog
                .update_result_table_status(&table.table_name, ResultTableStatus::Failed, 0)
                .await?;
            continue;
        }

        let Some(manifest) = self.read_materialization_manifest(&parquet_url).await? else {
            warn!(
                table = table.table_name,
                "Recovery: valid Parquet but no materialization manifest \
                 (torn write before manifest); deleting and marking failed"
            );
            // Cleanup is best-effort here as well.
            parquet_handle.delete_if_exists(&parquet_path).await.ok();
            if let Some(ref idx) = table.index_path {
                let idx_url = StorageUrl::parse(idx)?;
                let idx_handle = self.open_index(&idx_url)?;
                storage::sidecar_layout::delete_sidecar(&idx_handle, SidecarKind::Ann)
                    .await
                    .ok();
            }
            self.catalog
                .update_result_table_status(&table.table_name, ResultTableStatus::Failed, 0)
                .await?;
            continue;
        };

        let row_count = storage::reader::count_parquet_rows(&parquet_handle).await?;

        if table.task.is_embedding() {
            if let Some(ref idx_path) = table.index_path {
                let idx_url = StorageUrl::parse(idx_path)?;
                // A missing or negative (corrupt) catalog value must not wrap into a
                // huge width; 0 makes the rebuild a no-op.
                let dimensions = table
                    .dimensions
                    .and_then(|d| usize::try_from(d).ok())
                    .unwrap_or(0);
                if let Err(e) = self
                    .rebuild_index_from_parquet(&parquet_handle, &idx_url, dimensions)
                    .await
                {
                    warn!(
                        table = table.table_name,
                        error = %e,
                        "Recovery: failed to rebuild index, proceeding without"
                    );
                }
            }
        }

        let anchors_json = serde_json::to_string(&manifest.input_anchors)
            .map_err(|e| JammiError::Other(format!("serialise input anchors: {e}")))?;
        self.catalog
            .promote_result_table_with_manifest(
                &table.table_name,
                row_count,
                manifest.definition_hash.as_str(),
                &anchors_json,
            )
            .await?;
    }

    self.reconcile_ready_manifests().await?;
    Ok(())
}
/// Fails `Ready` tables that carry a definition hash in the catalog but have
/// lost their materialization manifest sidecar.
///
/// Tables without a recorded definition hash are skipped. For an affected
/// table the Parquet object and ANN sidecar are deleted on a best-effort
/// basis and the catalog row is marked `Failed`.
///
/// # Errors
/// A URL parse, storage or catalog error aborts the pass.
async fn reconcile_ready_manifests(&self) -> Result<()> {
    let ready_tables = self
        .catalog
        .list_result_tables_by_status(ResultTableStatus::Ready)
        .await?;

    for table in ready_tables {
        if table.definition_hash.is_none() {
            continue;
        }

        let parquet_url = StorageUrl::parse(&table.parquet_path)?;
        let store = self.open_parquet(&parquet_url)?;
        let manifest_path = materialization_sidecar_path(&store)?;
        let manifest_present = store.exists(&manifest_path).await?;

        if !manifest_present {
            warn!(
                table = table.table_name,
                "Recovery: post-contract ready table is missing its materialization \
                 manifest sidecar; deleting and marking failed"
            );

            // Cleanup is best-effort; failures are deliberately ignored.
            let data_path = store.data_path()?;
            store.delete_if_exists(&data_path).await.ok();
            if let Some(ref idx) = table.index_path {
                let idx_url = StorageUrl::parse(idx)?;
                let idx_store = self.open_index(&idx_url)?;
                storage::sidecar_layout::delete_sidecar(&idx_store, SidecarKind::Ann)
                    .await
                    .ok();
            }

            self.catalog
                .update_result_table_status(&table.table_name, ResultTableStatus::Failed, 0)
                .await?;
        }
    }

    Ok(())
}
/// Registers existing `Ready` tables with `ctx` by running
/// `load_existing_tables_inner` wrapped in `TenantBinding::admin_scope`.
pub async fn load_existing_tables(&self, ctx: &SessionContext) -> Result<()> {
TenantBinding::admin_scope(self.load_existing_tables_inner(ctx)).await
}
/// Registers every `Ready` table whose Parquet object still exists.
///
/// A table whose stored path is not a valid storage URL, or whose registration
/// fails, is logged and skipped. A table whose Parquet object is missing is
/// skipped silently.
///
/// # Errors
/// Fails if the catalog listing fails, a handle cannot be opened, or the
/// existence check fails.
async fn load_existing_tables_inner(&self, ctx: &SessionContext) -> Result<()> {
    let ready_tables = self
        .catalog
        .list_result_tables_by_status(ResultTableStatus::Ready)
        .await?;

    for table in ready_tables {
        let parsed = StorageUrl::parse(&table.parquet_path);
        let url = match parsed {
            Err(e) => {
                warn!(
                    table = table.table_name,
                    error = %e,
                    "Result-table parquet_path is not a valid storage URL"
                );
                continue;
            }
            Ok(u) => u,
        };

        let store = self.open_parquet(&url)?;
        let data_path = store.data_path()?;
        if !store.exists(&data_path).await? {
            continue;
        }

        if let Err(e) = self.register_table(ctx, &table.table_name, &url).await {
            warn!(
                table = table.table_name,
                error = %e,
                "Failed to register existing table"
            );
        }
    }

    Ok(())
}
/// Finds the `k` nearest entries to `query` in `table`.
///
/// The sidecar index is used when [`Self::resolve_search_mode`] can load one;
/// otherwise the search falls back to an exact scan of the registered table
/// through `ctx`.
///
/// Returns `(key, score)` pairs as produced by the chosen search path.
pub async fn search_vectors(
    &self,
    ctx: &SessionContext,
    table: &ResultTableRecord,
    query: &[f32],
    k: usize,
) -> Result<Vec<(String, f32)>> {
    if let Some(sidecar) = self.resolve_search_mode(table).await? {
        return sidecar.search(query, k);
    }
    crate::index::exact::exact_vector_search(ctx, &table.table_name, query, k).await
}
/// Tries to load the sidecar index for `table`.
///
/// Returns `Ok(None)` when the table has no index path, or when loading the
/// sidecar fails; a load failure is logged, not returned.
///
/// # Errors
/// Fails only if the stored index path does not parse or its handle cannot be
/// opened.
pub async fn resolve_search_mode(
    &self,
    table: &ResultTableRecord,
) -> Result<Option<SidecarIndex>> {
    let Some(idx_path) = table.index_path.as_ref() else {
        return Ok(None);
    };

    let idx_url = StorageUrl::parse(idx_path)?;
    let store = self.open_index(&idx_url)?;
    let loaded = storage::sidecar_layout::load_sidecar(&store, &self.ann).await;

    if let Err(e) = &loaded {
        warn!(
            table = table.table_name,
            error = %e,
            "Sidecar index unavailable, falling back to exact search"
        );
    }
    Ok(loaded.ok())
}
/// Persists `index` at the index location `url`.
///
/// Opens an index handle for `url` and delegates to
/// `storage::sidecar_layout::save_sidecar`.
pub async fn save_sidecar(&self, url: &StorageUrl, index: &SidecarIndex) -> Result<()> {
let handle = self.open_index(url)?;
storage::sidecar_layout::save_sidecar(&handle, index).await
}
/// Deletes a table's Parquet object and, when `index_path` is given, its ANN
/// sidecar.
///
/// Unlike the cleanup done during recovery, failures here are returned.
///
/// # Errors
/// Fails if either path does not parse, a handle cannot be opened, or a
/// deletion fails.
// NOTE(review): the materialization manifest sidecar is not removed here —
// confirm whether that is intended.
pub async fn delete_table_files(
    &self,
    parquet_path: &str,
    index_path: Option<&str>,
) -> Result<()> {
    let parquet_url = StorageUrl::parse(parquet_path)?;
    let parquet_store = self.open_parquet(&parquet_url)?;
    let data_path = parquet_store.data_path()?;
    parquet_store.delete_if_exists(&data_path).await?;

    let Some(idx) = index_path else {
        return Ok(());
    };
    let idx_url = StorageUrl::parse(idx)?;
    let idx_store = self.open_index(&idx_url)?;
    storage::sidecar_layout::delete_sidecar(&idx_store, SidecarKind::Ann).await?;
    Ok(())
}
/// Builds the URL of `name` directly under the store root.
///
/// A `/` separator is inserted only when the root does not already end in one.
///
/// # Errors
/// Fails if the joined string does not parse as a storage URL.
fn derive_url(&self, name: &str) -> Result<StorageUrl> {
    let base = self.root.as_str();
    let separator = if base.ends_with('/') { "" } else { "/" };
    let joined = format!("{base}{separator}{name}");
    Ok(StorageUrl::parse(&joined)?)
}
/// Rebuilds the ANN sidecar at `index_url` from the vectors stored in a
/// Parquet object.
///
/// Does nothing when `dimensions` is 0. Batches that lack a string `_row_id`
/// column or a fixed-size-list `vector` column are skipped. The sidecar is
/// built and saved only if at least one vector was added.
///
/// # Errors
/// Fails on read errors, when a vector's items are not `Float32`, or when the
/// index rejects an entry, cannot be built, or cannot be saved.
async fn rebuild_index_from_parquet(
    &self,
    parquet_handle: &JammiObjectStore,
    index_url: &StorageUrl,
    dimensions: usize,
) -> Result<()> {
    if dimensions == 0 {
        return Ok(());
    }

    let batches = storage::reader::read_all_record_batches(parquet_handle).await?;
    let mut index = SidecarIndex::new(dimensions, &self.ann)?;

    for batch in batches {
        let id_column = batch
            .column_by_name("_row_id")
            .and_then(|col| col.as_any().downcast_ref::<arrow::array::StringArray>());
        let vector_column = batch.column_by_name("vector").and_then(|col| {
            col.as_any()
                .downcast_ref::<arrow::array::FixedSizeListArray>()
        });
        let (Some(ids), Some(vectors)) = (id_column, vector_column) else {
            continue;
        };

        for row in 0..ids.len() {
            let items = vectors.value(row);
            let floats = items
                .as_any()
                .downcast_ref::<arrow::array::Float32Array>()
                .ok_or_else(|| JammiError::Other("Vector not Float32".into()))?;
            let embedding: Vec<f32> = floats.values().to_vec();
            index.add(ids.value(row), &embedding)?;
        }
    }

    if index.len() > 0 {
        index.build()?;
        self.save_sidecar(index_url, &index).await?;
    }
    Ok(())
}
/// Materializes `rows` as a new text-embedding result table and returns its
/// catalog record.
///
/// Steps, in order:
/// 1. Validate the input and build the Arrow batch. This happens before the
///    catalog is touched, so malformed rows do not leave a catalog row behind.
/// 2. Create the catalog row and derive the storage URLs.
/// 3. Write the Parquet object; nothing is written for empty `rows`.
/// 4. Build and save the ANN sidecar when at least one row was added.
/// 5. Finalize with the materialization manifest and promote the table.
///
/// # Errors
/// Fails if `spec.dimensions` does not fit in `i32`, if any row's vector width
/// differs from `spec.dimensions`, on storage, index or catalog failures, and
/// with `JammiError::Catalog` if the record cannot be read back afterwards.
pub async fn materialize_embedding_table(
    &self,
    ctx: &SessionContext,
    spec: EmbeddingTableSpec<'_>,
    rows: &[(String, Vec<f32>)],
    materialization: Materialization<'_>,
) -> Result<ResultTableRecord> {
    let EmbeddingTableSpec {
        source_id,
        model_id,
        derived_from,
        dimensions,
    } = spec;

    // The catalog stores the width as i32; reject widths that would be truncated.
    let catalog_dimensions = i32::try_from(dimensions).map_err(|_| {
        JammiError::Other(format!(
            "materialize: dimensions {dimensions} exceed the supported maximum of {}",
            i32::MAX
        ))
    })?;

    // Validate rows and build the batch first; neither needs the table name or URLs.
    let schema = crate::store::schema::embedding_table_schema(dimensions);
    let batch = embedding_batch(&schema, source_id, model_id, rows, dimensions)?;

    let table_info = self
        .create_table(
            source_id,
            ModelTask::TextEmbedding,
            ResultTableKind::Model,
            derived_from,
            model_id,
            Some(catalog_dimensions),
            Some("_row_id"),
            None,
        )
        .await?;

    let mut writer = self.open_writer(&table_info.parquet_url, schema).await?;
    let mut index = SidecarIndex::new(dimensions, &self.ann)?;
    if !rows.is_empty() {
        writer.write_batch(&batch).await?;
        for (key, vector) in rows {
            index.add(key, vector)?;
        }
    }
    let row_count = writer.close().await?;

    if index.len() > 0 {
        index.build()?;
        if let Some(ref index_url) = table_info.index_url {
            self.save_sidecar(index_url, &index).await?;
        }
    }

    self.finalize_with_manifest(
        ctx,
        &table_info.table_name,
        &table_info.parquet_url,
        row_count,
        materialization,
    )
    .await?;

    self.catalog
        .get_result_table(&table_info.table_name)
        .await?
        .ok_or_else(|| {
            JammiError::Catalog(format!(
                "Result table '{}' not found after materialisation",
                table_info.table_name
            ))
        })
}
}
/// Builds the Arrow record batch for an embedding table.
///
/// Columns, in order: row keys, `source_id` repeated per row, `model_id`
/// repeated per row, and the vectors as a fixed-size list of `Float32` with
/// width `dimensions`. The caller-supplied `schema` must match this layout.
///
/// # Errors
/// - `JammiError::Schema` if any row's vector length differs from
///   `dimensions`.
/// - `JammiError::Other` if `dimensions` does not fit in `i32`, or if Arrow
///   rejects the vector column or the batch.
fn embedding_batch(
    schema: &arrow::datatypes::SchemaRef,
    source_id: &str,
    model_id: &str,
    rows: &[(String, Vec<f32>)],
    dimensions: usize,
) -> Result<arrow::array::RecordBatch> {
    use arrow::array::{FixedSizeListArray, Float32Array, StringArray};
    use arrow::datatypes::{DataType, Field};

    for (key, vector) in rows {
        if vector.len() != dimensions {
            return Err(JammiError::Schema {
                table: model_id.to_string(),
                column: "vector".into(),
                expected: format!("FixedSizeList<Float32> width {dimensions}"),
                actual: format!("row '{key}' has width {}", vector.len()),
            });
        }
    }

    // Arrow's list width is i32; a plain `as` cast would silently wrap.
    let list_width = i32::try_from(dimensions).map_err(|_| {
        JammiError::Other(format!(
            "materialize: vector width {dimensions} exceeds the FixedSizeList maximum of {}",
            i32::MAX
        ))
    })?;

    let row_ids = StringArray::from_iter_values(rows.iter().map(|(k, _)| k.as_str()));
    let source_ids = StringArray::from_iter_values(rows.iter().map(|_| source_id));
    let model_ids = StringArray::from_iter_values(rows.iter().map(|_| model_id));

    // Every row was checked to hold exactly `dimensions` values, so the total is known.
    let mut flat: Vec<f32> = Vec::with_capacity(rows.len() * dimensions);
    for (_, vector) in rows {
        flat.extend_from_slice(vector);
    }

    let item = Arc::new(Field::new("item", DataType::Float32, false));
    let vectors =
        FixedSizeListArray::try_new(item, list_width, Arc::new(Float32Array::from(flat)), None)
            .map_err(|e| JammiError::Other(format!("materialize: build vector column: {e}")))?;

    arrow::array::RecordBatch::try_new(
        Arc::clone(schema),
        vec![
            Arc::new(row_ids),
            Arc::new(source_ids),
            Arc::new(model_ids),
            Arc::new(vectors),
        ],
    )
    .map_err(|e| JammiError::Other(format!("materialize: build batch: {e}")))
}
/// Returns an identifier for the current process run.
///
/// A random UUID (simple, hyphen-free form) is generated on first use and the
/// same value is returned on every later call in this process.
fn run_id() -> &'static str {
    use std::sync::OnceLock;

    static RUN_ID: OnceLock<String> = OnceLock::new();
    let id = RUN_ID.get_or_init(|| {
        let fresh = uuid::Uuid::new_v4();
        fresh.simple().to_string()
    });
    id.as_str()
}
/// Returns the object path of the materialization manifest belonging to
/// `handle`, i.e. its sibling named `materialization.json`.
///
/// # Errors
/// Fails if the handle cannot derive the sibling path.
fn materialization_sidecar_path(handle: &JammiObjectStore) -> Result<object_store::path::Path> {
    let sidecar = handle.sibling_path("materialization.json")?;
    Ok(sidecar)
}
/// Converts a [`ManifestError`] into the crate-wide [`JammiError`].
///
/// Storage and serde failures keep their payload in the matching `JammiError`
/// variant; every other manifest error becomes a `JammiError::Catalog` holding
/// its display text.
pub fn manifest_to_jammi(e: ManifestError) -> JammiError {
    match e {
        ManifestError::Storage(storage_err) => JammiError::Storage(storage_err),
        ManifestError::Serde(serde_err) => JammiError::Json(serde_err),
        unmapped => {
            let message = unmapped.to_string();
            JammiError::Catalog(message)
        }
    }
}
/// Registers the Parquet object at `url` with `ctx` as the bare table
/// `jammi.{name}`.
///
/// A driver is always resolved for `url`. For schemes other than `File` and
/// `Memory` it is also registered as the object store for that URL in the
/// context's runtime environment. The URL is checked with
/// `ListingTableUrl::parse` before the table is registered.
///
/// # Errors
/// Fails if no driver can be resolved, if the URL does not re-parse
/// (`JammiError::Config`), if `ListingTableUrl` rejects it, or if the
/// registration itself fails.
pub(crate) async fn register_parquet_table(
    ctx: &SessionContext,
    registry: &StorageRegistry,
    name: &str,
    url: &StorageUrl,
) -> Result<()> {
    use datafusion::datasource::file_format::options::ParquetReadOptions;

    let driver = registry.driver_for(url, None)?;
    let needs_store_registration = !matches!(url.scheme(), Scheme::File | Scheme::Memory);
    if needs_store_registration {
        let parsed = ::url::Url::parse(url.as_str()).map_err(|e| {
            JammiError::Config(format!("Storage URL '{url}' did not re-parse: {e}"))
        })?;
        ctx.runtime_env().register_object_store(&parsed, driver);
    }

    let qualified_name = format!("jammi.{name}");
    let table_ref = TableReference::bare(qualified_name);
    // Parsed only for validation; the value itself is not needed.
    let _ = ListingTableUrl::parse(url.as_str())?;

    let read_options = ParquetReadOptions::default();
    ctx.register_parquet(table_ref, url.as_str(), read_options)
        .await?;
    Ok(())
}