use mnm_core::types::{SourceVersion, SourceVersionStatus};
use sqlx::PgPool;
use time::OffsetDateTime;
use uuid::Uuid;
use crate::error::Result;
/// Inserts a new `source_version` row in the `building` state for `source_id`.
///
/// The revision is computed inside the same statement as one more than the
/// highest revision already stored for that source, or `1` when the source
/// has no versions yet.
///
/// Returns the `(id, revision)` of the freshly inserted row.
///
/// # Errors
///
/// Propagates any error raised while executing the insert.
pub async fn create_building(
    pool: &PgPool,
    source_id: Uuid,
    embedding_model_id: Uuid,
    code_embedding_model_id: Option<Uuid>,
    ingest_cli_version: &str,
    content_hash: &str,
) -> Result<(Uuid, i32)> {
    // The SELECT is an aggregate without GROUP BY, so it yields exactly one
    // row even when the source has no versions; that is what makes
    // `fetch_one` appropriate below.
    let insert_sql =
        "INSERT INTO source_version (source_id, revision, status, embedding_model_id, \
         code_embedding_model_id, ingest_cli_version, content_hash) \
         SELECT $1, COALESCE(MAX(revision), 0) + 1, 'building', $2, $3, $4, $5 \
         FROM source_version WHERE source_id = $1 \
         RETURNING id, revision";

    let created = sqlx::query_as::<_, (Uuid, i32)>(insert_sql)
        .bind(source_id)
        .bind(embedding_model_id)
        .bind(code_embedding_model_id)
        .bind(ingest_cli_version)
        .bind(content_hash)
        .fetch_one(pool)
        .await?;

    Ok(created)
}
/// Promotes a `building` source version to `active` inside one transaction.
///
/// Steps, in order:
/// 1. Lock the target row with `FOR UPDATE` and read its source, revision and status.
/// 2. Reject the call unless the status is `building`.
/// 3. Flip any other active version of the same source to `inactive`.
/// 4. Mark the target row active and stamp `ingested_at` with `now()`.
///
/// Returns `(promoted_revision, demoted_revision)`, where the second element
/// is `None` when no other version of the source was active.
///
/// # Errors
///
/// Returns `StoreError::CheckViolation` when the row is not in the `building`
/// state; the transaction is then dropped without being committed. Any
/// database error (including the target row not being found) is propagated.
pub async fn finalize(pool: &PgPool, source_version_id: Uuid) -> Result<(i32, Option<i32>)> {
    let mut txn = pool.begin().await?;

    let (source_id, promoted_revision, status): (Uuid, i32, String) = sqlx::query_as(
        "SELECT source_id, revision, status FROM source_version WHERE id = $1 FOR UPDATE",
    )
    .bind(source_version_id)
    .fetch_one(&mut *txn)
    .await?;

    if status != "building" {
        return Err(crate::error::StoreError::CheckViolation(format!(
            "source_version {source_version_id} is not in building state (current: {status})"
        )));
    }

    // Demote the currently active sibling (if any) before activating the
    // target row; the statement order of the original is preserved.
    let demoted_revision = sqlx::query_as::<_, (i32,)>(
        "UPDATE source_version SET is_active = false, status = 'inactive' \
         WHERE source_id = $1 AND is_active = true AND id <> $2 \
         RETURNING revision",
    )
    .bind(source_id)
    .bind(source_version_id)
    .fetch_optional(&mut *txn)
    .await?
    .map(|(revision,)| revision);

    sqlx::query(
        "UPDATE source_version SET is_active = true, status = 'active', ingested_at = now() \
         WHERE id = $1",
    )
    .bind(source_version_id)
    .execute(&mut *txn)
    .await?;

    txn.commit().await?;
    Ok((promoted_revision, demoted_revision))
}
pub async fn abort(pool: &PgPool, source_version_id: Uuid) -> Result<()> {
let r = sqlx::query(
"UPDATE source_version SET status = 'aborted' WHERE id = $1 AND status = 'building'",
)
.bind(source_version_id)
.execute(pool)
.await?;
if r.rows_affected() == 0 {
return Err(crate::error::StoreError::NotFound);
}
Ok(())
}
pub async fn retire(pool: &PgPool, source_version_id: Uuid) -> Result<()> {
let r = sqlx::query(
"UPDATE source_version SET status = 'retired', is_active = false, retired_at = now() \
WHERE id = $1",
)
.bind(source_version_id)
.execute(pool)
.await?;
if r.rows_affected() == 0 {
return Err(crate::error::StoreError::NotFound);
}
Ok(())
}
pub async fn get_active(pool: &PgPool, source_id: Uuid) -> Result<SourceVersion> {
let row = sqlx::query_as::<_, SourceVersionRow>(
"SELECT id, source_id, revision, status, is_active, ingested_at, ingest_cli_version, \
embedding_model_id, code_embedding_model_id, content_hash, notes, retired_at \
FROM source_version WHERE source_id = $1 AND is_active = true",
)
.bind(source_id)
.fetch_one(pool)
.await?;
row.try_into()
}
pub async fn get_by_id(pool: &PgPool, id: Uuid) -> Result<SourceVersion> {
let row = sqlx::query_as::<_, SourceVersionRow>(
"SELECT id, source_id, revision, status, is_active, ingested_at, ingest_cli_version, \
embedding_model_id, code_embedding_model_id, content_hash, notes, retired_at \
FROM source_version WHERE id = $1",
)
.bind(id)
.fetch_one(pool)
.await?;
row.try_into()
}
pub async fn list_for_source(pool: &PgPool, source_id: Uuid) -> Result<Vec<SourceVersion>> {
let rows = sqlx::query_as::<_, SourceVersionRow>(
"SELECT id, source_id, revision, status, is_active, ingested_at, ingest_cli_version, \
embedding_model_id, code_embedding_model_id, content_hash, notes, retired_at \
FROM source_version WHERE source_id = $1 ORDER BY revision DESC",
)
.bind(source_id)
.fetch_all(pool)
.await?;
rows.into_iter().map(TryInto::try_into).collect()
}
/// Re-activates an `inactive` version of a source, identified by revision number.
///
/// Runs in one transaction: the target row is locked with `FOR UPDATE`, any
/// other active version of the same source is flipped to `inactive`, and the
/// target is then marked active with `retired_at` cleared.
///
/// Returns `(revision, demoted_revision)`, where the second element is `None`
/// when no other version of the source was active.
///
/// # Errors
///
/// Returns `StoreError::CheckViolation` when the target is in any state other
/// than `inactive`; the transaction is then dropped without being committed.
/// Database errors (including the target row not being found) are propagated.
pub async fn promote_by_revision(
    pool: &PgPool,
    source_id: Uuid,
    revision: i32,
) -> Result<(i32, Option<i32>)> {
    let mut txn = pool.begin().await?;

    let (target_id, status): (Uuid, String) = sqlx::query_as(
        "SELECT id, status FROM source_version \
         WHERE source_id = $1 AND revision = $2 FOR UPDATE",
    )
    .bind(source_id)
    .bind(revision)
    .fetch_one(&mut *txn)
    .await?;

    if status != "inactive" {
        return Err(crate::error::StoreError::CheckViolation(format!(
            "source_version revision {revision} is in `{status}` — \
             only `inactive` versions can be promoted (active is already current)"
        )));
    }

    // Demote the currently active sibling (if any) before activating the
    // target row; the statement order of the original is preserved.
    let demoted_revision = sqlx::query_as::<_, (i32,)>(
        "UPDATE source_version SET is_active = false, status = 'inactive' \
         WHERE source_id = $1 AND is_active = true AND id <> $2 \
         RETURNING revision",
    )
    .bind(source_id)
    .bind(target_id)
    .fetch_optional(&mut *txn)
    .await?
    .map(|(demoted,)| demoted);

    sqlx::query(
        "UPDATE source_version SET is_active = true, status = 'active', retired_at = NULL \
         WHERE id = $1",
    )
    .bind(target_id)
    .execute(&mut *txn)
    .await?;

    txn.commit().await?;
    Ok((revision, demoted_revision))
}
/// Deletes old `inactive`/`retired` versions that fall outside each source's
/// retention window.
///
/// Within every source, versions in the `active`, `inactive` or `retired`
/// state are ranked by revision, newest first. A row is deleted when all of
/// the following hold:
/// - its rank exceeds the owning source's `retention_count`,
/// - its status is `inactive` or `retired`,
/// - its `ingested_at` is more than `grace_seconds` in the past.
///
/// Negative `grace_seconds` values are treated as `0`.
///
/// Returns the deleted `(source_id, revision)` pairs in ascending order.
///
/// # Errors
///
/// Propagates any error raised while executing the delete.
pub async fn sweep_aged_inactive(pool: &PgPool, grace_seconds: i64) -> Result<Vec<(Uuid, i32)>> {
    let sweep_sql = "WITH ranked AS ( \
         SELECT sv.id, sv.source_id, sv.revision, sv.ingested_at, sv.status, \
         ROW_NUMBER() OVER ( \
         PARTITION BY sv.source_id ORDER BY sv.revision DESC \
         ) AS rn \
         FROM source_version sv \
         WHERE sv.status IN ('active','inactive','retired') \
         ), \
         eligible AS ( \
         SELECT r.id, r.source_id, r.revision \
         FROM ranked r \
         JOIN source s ON s.id = r.source_id \
         WHERE r.rn > s.retention_count \
         AND r.status IN ('inactive','retired') \
         AND r.ingested_at < now() - ($1::bigint * interval '1 second') \
         ) \
         DELETE FROM source_version \
         WHERE id IN (SELECT id FROM eligible) \
         RETURNING source_id, revision";

    let mut swept: Vec<(Uuid, i32)> = sqlx::query_as(sweep_sql)
        .bind(grace_seconds.max(0))
        .fetch_all(pool)
        .await?;

    // Sort so callers get a stable, order-independent result.
    swept.sort();
    Ok(swept)
}
/// Deletes `aborted` versions whose `ingested_at` is more than
/// `grace_seconds` in the past.
///
/// Negative `grace_seconds` values are treated as `0`.
///
/// Returns the deleted `(source_id, revision)` pairs in ascending order.
///
/// # Errors
///
/// Propagates any error raised while executing the delete.
pub async fn sweep_aborted(pool: &PgPool, grace_seconds: i64) -> Result<Vec<(Uuid, i32)>> {
    let sweep_sql = "DELETE FROM source_version \
         WHERE status = 'aborted' \
         AND ingested_at < now() - ($1::bigint * interval '1 second') \
         RETURNING source_id, revision";

    let mut swept: Vec<(Uuid, i32)> = sqlx::query_as(sweep_sql)
        .bind(grace_seconds.max(0))
        .fetch_all(pool)
        .await?;

    // Sort so callers get a stable, order-independent result.
    swept.sort();
    Ok(swept)
}
pub async fn get_by_revision(
pool: &PgPool,
source_id: Uuid,
revision: i32,
) -> Result<SourceVersion> {
let row = sqlx::query_as::<_, SourceVersionRow>(
"SELECT id, source_id, revision, status, is_active, ingested_at, ingest_cli_version, \
embedding_model_id, code_embedding_model_id, content_hash, notes, retired_at \
FROM source_version WHERE source_id = $1 AND revision = $2",
)
.bind(source_id)
.bind(revision)
.fetch_one(pool)
.await?;
row.try_into()
}
/// Raw row shape decoded from `SELECT`s over the `source_version` table.
///
/// Field names mirror the selected column names. The row is converted into
/// the domain type `SourceVersion` through the `TryFrom` impl in this file.
#[derive(sqlx::FromRow)]
struct SourceVersionRow {
id: Uuid,
source_id: Uuid,
revision: i32,
// Kept as plain text here; parsed into `SourceVersionStatus` during conversion.
status: String,
is_active: bool,
ingested_at: OffsetDateTime,
ingest_cli_version: String,
embedding_model_id: Uuid,
code_embedding_model_id: Option<Uuid>,
content_hash: String,
notes: Option<String>,
retired_at: Option<OffsetDateTime>,
}
impl TryFrom<SourceVersionRow> for SourceVersion {
type Error = crate::error::StoreError;
fn try_from(r: SourceVersionRow) -> std::result::Result<Self, Self::Error> {
let status: SourceVersionStatus =
serde_json::from_value(serde_json::Value::String(r.status))
.map_err(|e| crate::error::StoreError::Json(e.to_string()))?;
Ok(Self {
id: r.id,
source_id: r.source_id,
revision: r.revision,
status,
is_active: r.is_active,
ingested_at: r.ingested_at,
ingest_cli_version: r.ingest_cli_version,
embedding_model_id: r.embedding_model_id,
code_embedding_model_id: r.code_embedding_model_id,
content_hash: r.content_hash,
notes: r.notes,
retired_at: r.retired_at,
})
}
}