pub mod mutable;
pub mod schema;
pub mod vectors;
use std::path::Path;
use std::sync::Arc;
use arrow::array::Array;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::prelude::SessionContext;
use tracing::warn;
use crate::catalog::result_repo::{CreateResultTableParams, ResultTableRecord};
use crate::catalog::status::ResultTableStatus;
use crate::catalog::Catalog;
use crate::error::{JammiError, Result};
use crate::index::sidecar::SidecarIndex;
use crate::index::VectorIndex;
use crate::model_task::ModelTask;
use crate::storage::{
self, JammiObjectStore, ObjectParquetWriter, Scheme, StorageRegistry, StorageUrl,
};
/// Name and storage locations of a result table, as returned by
/// [`ResultStore::create_table`].
#[derive(Debug)]
pub struct ResultTableInfo {
/// Generated table name; also the stem of the Parquet and index file names.
pub table_name: String,
/// Location of the table's Parquet file.
pub parquet_url: StorageUrl,
/// Location of the sidecar index; `create_table` only sets this for
/// embedding tasks and leaves it `None` otherwise.
pub index_url: Option<StorageUrl>,
}
/// Manages result tables: derives their file URLs, records them in the
/// catalog, registers their Parquet files with DataFusion, and loads or
/// rebuilds their sidecar indexes.
pub struct ResultStore {
/// Base URL under which every Parquet and index file URL is derived.
root: StorageUrl,
/// Resolves a storage driver for a given URL.
registry: StorageRegistry,
/// Shared catalog holding the result-table records and their status.
catalog: Arc<Catalog>,
}
/// Turns a model identifier into a fragment usable inside a table name.
///
/// Every `/`, `:`, space and `.` becomes `_`; all other characters pass
/// through unchanged. The result is capped at 64 characters (Unicode scalar
/// values, not bytes).
fn sanitize_model_id(model_id: &str) -> String {
    let mut cleaned = String::new();
    for ch in model_id.chars().take(64) {
        cleaned.push(match ch {
            '/' | ':' | ' ' | '.' => '_',
            other => other,
        });
    }
    cleaned
}
impl ResultStore {
/// Creates a store rooted at `<artifact_dir>/jammi_db` with a fresh
/// [`StorageRegistry`].
///
/// The directory is created on disk (including missing parents) before its
/// path is turned into the root [`StorageUrl`].
///
/// # Errors
/// Fails if the directory cannot be created, if the joined path is not valid
/// UTF-8 (`JammiError::Config`), or if the path does not parse as a storage
/// URL.
pub fn new(artifact_dir: &Path, catalog: Arc<Catalog>) -> Result<Self> {
    let db_dir = artifact_dir.join("jammi_db");
    std::fs::create_dir_all(&db_dir)?;

    let Some(db_dir_str) = db_dir.to_str() else {
        return Err(JammiError::Config("Non-UTF8 artifact_dir".into()));
    };
    let root = StorageUrl::parse(db_dir_str)?;

    Ok(Self {
        root,
        registry: StorageRegistry::new(),
        catalog,
    })
}
/// Creates a store over an explicit root URL and a caller-supplied registry.
///
/// When the root uses the `File` scheme its directory is created up front;
/// roots with any other scheme are used as given, with no setup performed
/// here.
///
/// # Errors
/// Fails only if a `File` root directory cannot be created.
pub fn with_root(
    root: StorageUrl,
    registry: StorageRegistry,
    catalog: Arc<Catalog>,
) -> Result<Self> {
    if root.scheme() == Scheme::File {
        std::fs::create_dir_all(root.path())?;
    }

    let store = Self {
        root,
        registry,
        catalog,
    };
    Ok(store)
}
/// Builds an object-store handle for a Parquet file URL, using the driver
/// the registry resolves for that URL.
///
/// # Errors
/// Fails if the registry cannot provide a driver for `url`.
pub fn open_parquet(&self, url: &StorageUrl) -> Result<JammiObjectStore> {
    let handle = JammiObjectStore::new(self.registry.driver_for(url, None)?, url.clone());
    Ok(handle)
}
/// Builds an object-store handle for a sidecar index URL, using the driver
/// the registry resolves for that URL.
///
/// # Errors
/// Fails if the registry cannot provide a driver for `url`.
pub fn open_index(&self, url: &StorageUrl) -> Result<JammiObjectStore> {
    let index_driver = self.registry.driver_for(url, None)?;
    let target = url.clone();
    Ok(JammiObjectStore::new(index_driver, target))
}
/// Allocates a new result table: generates a unique name, derives its file
/// URLs under the store root, and records it in the catalog.
///
/// The name has the shape
/// `{source_id}__{task}__{sanitized model id}__{UTC timestamp}_{8 hex chars}`.
/// An index URL (`<name>.idx`) is derived only for embedding tasks; the
/// Parquet URL (`<name>.parquet`) is always derived. No file is written here.
///
/// # Errors
/// Fails if a derived URL does not parse or if the catalog insert fails.
pub async fn create_table(
    &self,
    source_id: &str,
    task: ModelTask,
    model_id: &str,
    dimensions: Option<i32>,
    key_column: Option<&str>,
    text_columns: Option<&str>,
) -> Result<ResultTableInfo> {
    let table_name = {
        let model_part = sanitize_model_id(model_id);
        let stamp = chrono::Utc::now().format("%Y%m%dT%H%M%S%9f");
        // The random suffix keeps names unique even if two tables are
        // created with the same timestamp.
        let random = uuid::Uuid::new_v4().simple().to_string();
        let short_random = &random[..8];
        let task_part = task.as_db_str();
        format!("{source_id}__{task_part}__{model_part}__{stamp}_{short_random}")
    };

    let parquet_url = self.derive_url(&format!("{table_name}.parquet"))?;
    let index_url = task
        .is_embedding()
        .then(|| self.derive_url(&format!("{table_name}.idx")))
        .transpose()?;

    let params = CreateResultTableParams {
        table_name: &table_name,
        source_id,
        model_id,
        task,
        parquet_path: parquet_url.as_str(),
        index_path: index_url.as_ref().map(|u| u.as_str()),
        dimensions,
        key_column,
        text_columns,
    };
    self.catalog.create_result_table(params).await?;

    Ok(ResultTableInfo {
        table_name,
        parquet_url,
        index_url,
    })
}
/// Opens a Parquet writer for `url` that writes batches of the given schema.
///
/// # Errors
/// Fails if no driver can be resolved for `url` or if the writer cannot be
/// opened.
pub async fn open_writer(
    &self,
    url: &StorageUrl,
    schema: arrow::datatypes::SchemaRef,
) -> Result<ObjectParquetWriter> {
    let store = self.open_parquet(url)?;
    let writer = ObjectParquetWriter::open(&store, schema).await?;
    Ok(writer)
}
/// Registers the Parquet file at `url` with the session as table
/// `jammi.<name>`, using this store's registry to resolve the driver.
///
/// Thin wrapper over the module-level `register_parquet_table`.
pub async fn register_table(
    &self,
    ctx: &SessionContext,
    name: &str,
    url: &StorageUrl,
) -> Result<()> {
    let registry = &self.registry;
    register_parquet_table(ctx, registry, name, url).await
}
/// Publishes a finished table: registers it with the session, then marks it
/// `Ready` in the catalog with its row count.
///
/// The catalog is only updated after registration succeeds, so a table that
/// cannot be registered keeps its previous status.
pub async fn finalize(
    &self,
    ctx: &SessionContext,
    name: &str,
    url: &StorageUrl,
    rows: usize,
) -> Result<()> {
    self.register_table(ctx, name, url).await?;

    let catalog = &self.catalog;
    catalog
        .update_result_table_status(name, ResultTableStatus::Ready, rows)
        .await?;
    Ok(())
}
/// Resolves every table the catalog still lists as `Building`.
///
/// For each such table, one of three outcomes applies:
/// - **Parquet missing** — the table is marked `Failed` with 0 rows.
/// - **Parquet present but invalid** — the Parquet file and any sidecar
///   index are deleted on a best-effort basis (failures are logged, not
///   returned) and the table is marked `Failed` with 0 rows.
/// - **Parquet valid** — rows are counted, the sidecar index is rebuilt for
///   embedding tables that have an index path (a rebuild failure is logged
///   and ignored), and the table is marked `Ready`.
///
/// # Errors
/// Returns the first catalog, URL-parsing, or storage error encountered;
/// tables after the failing one are left untouched.
pub async fn recover(&self) -> Result<()> {
    let building = self
        .catalog
        .list_result_tables_by_status(ResultTableStatus::Building)
        .await?;

    for table in building {
        let parquet_url = StorageUrl::parse(&table.parquet_path)?;
        let parquet_handle = self.open_parquet(&parquet_url)?;
        let parquet_path = parquet_handle.data_path()?;

        if !parquet_handle.exists(&parquet_path).await? {
            warn!(
                table = table.table_name,
                "Recovery: Parquet missing, marking failed"
            );
            self.catalog
                .update_result_table_status(&table.table_name, ResultTableStatus::Failed, 0)
                .await?;
            continue;
        }

        if !storage::reader::is_valid_parquet(&parquet_handle).await? {
            warn!(
                table = table.table_name,
                "Recovery: invalid Parquet, deleting and marking failed"
            );
            // Cleanup is best-effort: a leftover file must not stop the table
            // from being marked failed, but the failure should be visible.
            if let Err(e) = parquet_handle.delete_if_exists(&parquet_path).await {
                warn!(
                    table = table.table_name,
                    error = %e,
                    "Recovery: failed to delete invalid Parquet"
                );
            }
            if let Some(idx) = &table.index_path {
                let idx_url = StorageUrl::parse(idx)?;
                let idx_handle = self.open_index(&idx_url)?;
                if let Err(e) = storage::sidecar_layout::delete_sidecar(&idx_handle).await {
                    warn!(
                        table = table.table_name,
                        error = %e,
                        "Recovery: failed to delete sidecar index"
                    );
                }
            }
            self.catalog
                .update_result_table_status(&table.table_name, ResultTableStatus::Failed, 0)
                .await?;
            continue;
        }

        let row_count = storage::reader::count_parquet_rows(&parquet_handle).await?;

        if table.task.is_embedding() {
            if let Some(idx_path) = &table.index_path {
                let idx_url = StorageUrl::parse(idx_path)?;
                // A missing, zero, or negative stored dimension maps to 0,
                // which makes the rebuild a no-op. A plain `as usize` cast
                // would turn a negative value into an enormous dimension.
                let dimensions = table
                    .dimensions
                    .filter(|d| *d > 0)
                    .map_or(0, |d| d as usize);
                if let Err(e) = self
                    .rebuild_index_from_parquet(&parquet_handle, &idx_url, dimensions)
                    .await
                {
                    warn!(
                        table = table.table_name,
                        error = %e,
                        "Recovery: failed to rebuild index, proceeding without"
                    );
                }
            }
        }

        self.catalog
            .update_result_table_status(&table.table_name, ResultTableStatus::Ready, row_count)
            .await?;
    }
    Ok(())
}
/// Registers every `Ready` table from the catalog with the session.
///
/// Per-table problems are logged and skipped rather than returned: a
/// `parquet_path` that does not parse as a storage URL, a Parquet file that
/// no longer exists, or a registration failure.
///
/// # Errors
/// Fails if the catalog query fails, or if resolving a storage driver,
/// computing the object path, or the existence check returns an error.
pub async fn load_existing_tables(&self, ctx: &SessionContext) -> Result<()> {
    let ready = self
        .catalog
        .list_result_tables_by_status(ResultTableStatus::Ready)
        .await?;

    for table in ready {
        let url = match StorageUrl::parse(&table.parquet_path) {
            Ok(u) => u,
            Err(e) => {
                warn!(
                    table = table.table_name,
                    error = %e,
                    "Result-table parquet_path is not a valid storage URL"
                );
                continue;
            }
        };

        let handle = self.open_parquet(&url)?;
        let path = handle.data_path()?;
        if !handle.exists(&path).await? {
            // The catalog says Ready but the data is gone. Skip the table,
            // and log it so it does not vanish from the session unnoticed.
            warn!(
                table = table.table_name,
                parquet_path = %table.parquet_path,
                "Ready result table has no Parquet file, skipping registration"
            );
            continue;
        }

        if let Err(e) = self.register_table(ctx, &table.table_name, &url).await {
            warn!(
                table = table.table_name,
                error = %e,
                "Failed to register existing table"
            );
        }
    }
    Ok(())
}
/// Returns the `k` best matches for `query` in `table` as `(id, score)`
/// pairs.
///
/// Uses the table's sidecar index when one can be loaded; otherwise runs an
/// exact search against the table registered under `table.table_name`.
pub async fn search_vectors(
    &self,
    ctx: &SessionContext,
    table: &ResultTableRecord,
    query: &[f32],
    k: usize,
) -> Result<Vec<(String, f32)>> {
    if let Some(sidecar) = self.resolve_search_mode(table).await? {
        return sidecar.search(query, k);
    }
    crate::index::exact::exact_vector_search(ctx, &table.table_name, query, k).await
}
/// Loads the sidecar index for `table`, if it has one that can be used.
///
/// Returns `Ok(None)` when the table has no index path, and also when the
/// sidecar fails to load (the failure is logged so callers can fall back to
/// exact search).
///
/// # Errors
/// Fails if the index path does not parse as a storage URL or no driver can
/// be resolved for it.
pub async fn resolve_search_mode(
    &self,
    table: &ResultTableRecord,
) -> Result<Option<SidecarIndex>> {
    let idx_path = match &table.index_path {
        Some(path) => path,
        None => return Ok(None),
    };

    let idx_url = StorageUrl::parse(idx_path)?;
    let handle = self.open_index(&idx_url)?;

    storage::sidecar_layout::load_sidecar(&handle)
        .await
        .map(Some)
        .or_else(|e| {
            warn!(
                table = table.table_name,
                error = %e,
                "Sidecar index unavailable, falling back to exact search"
            );
            Ok(None)
        })
}
/// Persists `index` as the sidecar stored at `url`.
///
/// # Errors
/// Fails if no driver can be resolved for `url` or if saving fails.
pub async fn save_sidecar(&self, url: &StorageUrl, index: &SidecarIndex) -> Result<()> {
    let index_store = self.open_index(url)?;
    storage::sidecar_layout::save_sidecar(&index_store, index).await
}
/// Deletes a table's Parquet file and, when `index_path` is given, its
/// sidecar index.
///
/// The Parquet delete goes through `delete_if_exists`. The sidecar is only
/// touched after the Parquet delete has succeeded.
///
/// # Errors
/// Fails if either path does not parse as a storage URL, a driver cannot be
/// resolved, or a delete operation fails.
pub async fn delete_table_files(
    &self,
    parquet_path: &str,
    index_path: Option<&str>,
) -> Result<()> {
    let parquet_url = StorageUrl::parse(parquet_path)?;
    let parquet_store = self.open_parquet(&parquet_url)?;
    let object_path = parquet_store.data_path()?;
    parquet_store.delete_if_exists(&object_path).await?;

    let Some(idx) = index_path else {
        return Ok(());
    };
    let idx_url = StorageUrl::parse(idx)?;
    let idx_store = self.open_index(&idx_url)?;
    storage::sidecar_layout::delete_sidecar(&idx_store).await?;
    Ok(())
}
/// Builds the URL of `name` directly under the store root.
///
/// Exactly one `/` is inserted between root and name, unless the root
/// already ends with one.
///
/// # Errors
/// Fails if the joined string does not parse as a storage URL.
fn derive_url(&self, name: &str) -> Result<StorageUrl> {
    let base = self.root.as_str();
    let separator = if base.ends_with('/') { "" } else { "/" };
    let joined = format!("{base}{separator}{name}");
    Ok(StorageUrl::parse(&joined)?)
}
/// Rebuilds a sidecar index from the `_row_id` and `vector` columns of a
/// Parquet file and saves it at `index_url`.
///
/// Does nothing when `dimensions` is 0. Batches that lack a string `_row_id`
/// column or a fixed-size-list `vector` column are skipped, as are rows
/// where either value is null. The index is built and saved only if at
/// least one vector was added.
///
/// # Errors
/// Fails if the Parquet file cannot be read, the index cannot be created,
/// a vector's child array is not `Float32`, or adding, building or saving
/// the index fails.
async fn rebuild_index_from_parquet(
    &self,
    parquet_handle: &JammiObjectStore,
    index_url: &StorageUrl,
    dimensions: usize,
) -> Result<()> {
    if dimensions == 0 {
        return Ok(());
    }

    let batches = storage::reader::read_all_record_batches(parquet_handle).await?;
    let mut index = SidecarIndex::new(dimensions)?;

    for batch in batches {
        let row_ids = batch
            .column_by_name("_row_id")
            .and_then(|c| c.as_any().downcast_ref::<arrow::array::StringArray>());
        let vectors = batch.column_by_name("vector").and_then(|c| {
            c.as_any()
                .downcast_ref::<arrow::array::FixedSizeListArray>()
        });
        let (Some(ids), Some(vecs)) = (row_ids, vectors) else {
            continue;
        };

        for i in 0..ids.len() {
            // `value(i)` on a null slot yields an empty id or unspecified
            // vector contents; such rows must not end up in the index.
            if ids.is_null(i) || vecs.is_null(i) {
                continue;
            }
            let row_id = ids.value(i);
            let v = vecs.value(i);
            let float_arr = v
                .as_any()
                .downcast_ref::<arrow::array::Float32Array>()
                .ok_or_else(|| JammiError::Other("Vector not Float32".into()))?;
            // Copy the row's values in one go instead of element by element.
            let vec: Vec<f32> = float_arr.values().to_vec();
            index.add(row_id, &vec)?;
        }
    }

    if index.len() > 0 {
        index.build()?;
        self.save_sidecar(index_url, &index).await?;
    }
    Ok(())
}
}
/// Registers the Parquet file at `url` with `ctx` as table `jammi.<name>`.
///
/// Steps, in order:
/// 1. Resolve a driver for `url` from `registry`.
/// 2. For any scheme other than `File` or `Memory`, register that driver as
///    the session's object store for the URL.
/// 3. Ensure a `jammi` schema exists in the session's default catalog.
/// 4. Check that the URL is accepted by `ListingTableUrl`, then register the
///    Parquet file with default read options.
///
/// # Errors
/// Fails if no driver can be resolved, the URL does not re-parse, the
/// default catalog is missing, or DataFusion rejects the URL or the
/// registration. A failure to register the `jammi` schema is ignored.
pub(crate) async fn register_parquet_table(
    ctx: &SessionContext,
    registry: &StorageRegistry,
    name: &str,
    url: &StorageUrl,
) -> Result<()> {
    use datafusion::catalog::MemorySchemaProvider;
    use datafusion::datasource::file_format::options::ParquetReadOptions;

    let driver = registry.driver_for(url, None)?;
    match url.scheme() {
        // These schemes need no explicit object-store registration here.
        Scheme::File | Scheme::Memory => {}
        _ => {
            let parsed = ::url::Url::parse(url.as_str()).map_err(|e| {
                JammiError::Config(format!("Storage URL '{url}' did not re-parse: {e}"))
            })?;
            ctx.runtime_env().register_object_store(&parsed, driver);
        }
    }

    let default_catalog_name = ctx.state().config_options().catalog.default_catalog.clone();
    let Some(default_catalog) = ctx.catalog(&default_catalog_name) else {
        return Err(JammiError::Other("Default catalog not found".into()));
    };
    if default_catalog.schema("jammi").is_none() {
        let _ = default_catalog.register_schema("jammi", Arc::new(MemorySchemaProvider::new()));
    }

    // Surface a malformed URL as an error before attempting registration.
    let _ = ListingTableUrl::parse(url.as_str())?;

    let table_ref = format!("jammi.{name}");
    let read_options = ParquetReadOptions::default();
    ctx.register_parquet(&table_ref, url.as_str(), read_options)
        .await?;
    Ok(())
}