use crate::catalog::{Catalog, db_err};
use orbok_core::{ChunkId, EmbeddingId, FileId, ModelId, OrbokResult, now_iso8601};
use rusqlite::params;
pub struct NewEmbedding {
pub chunk_id: ChunkId,
pub model_id: ModelId,
pub dimension: u32,
pub vector: Vec<f32>,
}
#[derive(Debug, Clone)]
pub struct EmbeddingRecord {
pub embedding_id: EmbeddingId,
pub chunk_id: ChunkId,
pub file_id: FileId,
pub vector: Vec<f32>,
}
pub struct EmbeddingRepository<'a> {
catalog: &'a Catalog,
}
impl<'a> EmbeddingRepository<'a> {
pub fn new(catalog: &'a Catalog) -> Self {
Self { catalog }
}
pub fn upsert(&self, new: &NewEmbedding) -> OrbokResult<()> {
let id = EmbeddingId::generate();
let now = now_iso8601();
let blob = orbok_models::vec_to_blob(&new.vector);
let conn = self.catalog.lock();
conn.execute(
"INSERT INTO embeddings \
(embedding_id, chunk_id, model_id, vector_format, dimension, norm, \
storage_location, vector_blob, status, created_at, updated_at) \
VALUES (?1,?2,?3,'fp32',?4,'l2','sqlite_blob',?5,'active',?6,?6) \
ON CONFLICT(chunk_id, model_id, vector_format) DO UPDATE SET \
vector_blob=?5, status='active', updated_at=?6",
params![
id.as_str(),
new.chunk_id.as_str(),
new.model_id.as_str(),
new.dimension as i64,
blob,
now,
],
)
.map_err(db_err)?;
Ok(())
}
pub fn list_active_for_scan(
&self,
model_id: &str,
dimension: u32,
) -> OrbokResult<Vec<EmbeddingRecord>> {
let conn = self.catalog.lock();
let mut stmt = conn
.prepare(
"SELECT e.embedding_id, e.chunk_id, c.file_id, e.vector_blob \
FROM embeddings e \
JOIN chunks c ON c.chunk_id = e.chunk_id \
WHERE e.model_id = ?1 AND e.dimension = ?2 \
AND e.status = 'active' AND c.chunk_status = 'active'",
)
.map_err(db_err)?;
let rows = stmt
.query_map(params![model_id, dimension as i64], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, String>(2)?,
row.get::<_, Vec<u8>>(3)?,
))
})
.map_err(db_err)?;
let mut out = Vec::new();
for row in rows {
let (emb_id, chunk_id, file_id, blob) = row.map_err(db_err)?;
let vector = orbok_models::blob_to_vec(&blob, dimension).unwrap_or_default();
out.push(EmbeddingRecord {
embedding_id: EmbeddingId::from_string(emb_id),
chunk_id: ChunkId::from_string(chunk_id),
file_id: FileId::from_string(file_id),
vector,
});
}
Ok(out)
}
pub fn mark_stale_for_model(&self, model_id: &str) -> OrbokResult<u64> {
let conn = self.catalog.lock();
let n = conn
.execute(
"UPDATE embeddings SET status='stale', updated_at=?2 WHERE model_id=?1",
params![model_id, now_iso8601()],
)
.map_err(db_err)?;
Ok(n as u64)
}
pub fn count_active(&self, model_id: &str) -> OrbokResult<u64> {
let conn = self.catalog.lock();
let n: i64 = conn
.query_row(
"SELECT COUNT(*) FROM embeddings e \
JOIN chunks c ON c.chunk_id = e.chunk_id \
WHERE e.model_id=?1 AND e.status='active' AND c.chunk_status='active'",
params![model_id],
|r| r.get(0),
)
.map_err(db_err)?;
Ok(n as u64)
}
pub fn upsert_i8(
&self,
chunk_id: &orbok_core::ChunkId,
model_id: &orbok_core::ModelId,
dimension: u32,
i8_vector: &[i8],
) -> OrbokResult<()> {
let id = orbok_core::EmbeddingId::generate();
let now = orbok_core::now_iso8601();
let blob: Vec<u8> = i8_vector.iter().map(|&b| b as u8).collect();
let conn = self.catalog.lock();
conn.execute(
"INSERT INTO embeddings \
(embedding_id, chunk_id, model_id, vector_format, dimension, norm, \
storage_location, vector_blob, status, created_at, updated_at) \
VALUES (?1,?2,?3,'int8',?4,'l2','sqlite_blob',?5,'active',?6,?6) \
ON CONFLICT(chunk_id, model_id, vector_format) DO UPDATE SET \
vector_blob=?5, status='active', updated_at=?6",
rusqlite::params![id.as_str(), chunk_id.as_str(), model_id.as_str(),
dimension as i64, blob, now],
)
.map_err(crate::catalog::db_err)?;
Ok(())
}
}