#![cfg(feature = "integration")]
#![allow(clippy::too_many_lines, clippy::doc_markdown)]
mod common;
use mnm_core::types::{SourceKind, SourceVersionStatus};
use mnm_store::entities::{embedding_model, source, source_version};
use sqlx::PgPool;
use uuid::Uuid;
/// Creates the baseline fixture rows used by the tests in this file.
///
/// Upserts the `bge-base-en-v1.5` embedding model, then inserts a
/// `SourceKind::DocsSite` source whose slug is `prefix` followed by a freshly
/// generated UUID, so every call produces a distinct slug. `retention_count`
/// is forwarded unchanged to `source::insert`.
///
/// Returns `(source_id, model_id)`.
///
/// # Panics
///
/// Panics if either store call returns an error.
async fn seed_source(pool: &PgPool, prefix: &str, retention_count: i32) -> (Uuid, Uuid) {
    let model_id = embedding_model::upsert(pool, "bge-base-en-v1.5", 1, 768, "baai")
        .await
        .unwrap();

    let unique_suffix = Uuid::new_v4();
    let slug = format!("{prefix}-{unique_suffix}");
    let inserted = source::insert(
        pool,
        &slug,
        "SV Fixture",
        SourceKind::DocsSite,
        None,
        retention_count,
    )
    .await;

    (inserted.unwrap(), model_id)
}
/// Creates `n` versions for `source_id`, one after another, finalizing each
/// version before the next one is created.
///
/// Every version receives its own content hash (`h0`, `h1`, ...). The returned
/// vector holds the version ids in creation order.
///
/// # Panics
///
/// Panics if creating or finalizing any version fails.
async fn seed_n_finalized(pool: &PgPool, source_id: Uuid, model_id: Uuid, n: usize) -> Vec<Uuid> {
    let mut created = Vec::with_capacity(n);
    for index in 0..n {
        let content_hash = format!("h{index}");
        let (version_id, _revision) = source_version::create_building(
            pool,
            source_id,
            model_id,
            None,
            "0.1.0",
            &content_hash,
        )
        .await
        .unwrap();
        source_version::finalize(pool, version_id).await.unwrap();
        created.push(version_id);
    }
    created
}
/// Backdates a single version row.
///
/// Rewrites `source_version.ingested_at` for the row with id `sv_id` to the
/// database's `now()` minus `seconds_ago` seconds.
///
/// # Panics
///
/// Panics if the `UPDATE` statement fails.
async fn age_version(pool: &PgPool, sv_id: Uuid, seconds_ago: i64) {
    let backdate = sqlx::query(
        "UPDATE source_version \
         SET ingested_at = now() - ($1::bigint * interval '1 second') \
         WHERE id = $2",
    )
    .bind(seconds_ago)
    .bind(sv_id);

    backdate.execute(pool).await.unwrap();
}
/// After three finalized versions are seeded, `list_for_source` is expected to
/// return revisions 3, 2, 1 in that order, with revision 3 flagged active and
/// revision 2 in the `Inactive` status.
#[tokio::test]
async fn list_for_source_returns_versions_newest_first() {
    let harness = common::boot().await;
    let pool = &harness.pool;
    let (source_id, model_id) = seed_source(pool, "sv-list", 5).await;
    seed_n_finalized(pool, source_id, model_id, 3).await;

    let versions = source_version::list_for_source(pool, source_id)
        .await
        .unwrap();

    // Length is checked first so the indexing below cannot go out of bounds.
    assert_eq!(versions.len(), 3);
    let (newest, middle, oldest) = (&versions[0], &versions[1], &versions[2]);

    assert_eq!(newest.revision, 3);
    assert!(newest.is_active);
    assert_eq!(middle.revision, 2);
    assert_eq!(middle.status, SourceVersionStatus::Inactive);
    assert_eq!(oldest.revision, 1);
}
/// Promoting revision 1 while revision 3 is active is expected to report
/// `(1, Some(3))`, make revision 1 the active version, and leave revision 3
/// `Inactive` with `is_active == false`.
#[tokio::test]
async fn promote_by_revision_swaps_active_with_inactive() {
    let harness = common::boot().await;
    let pool = &harness.pool;
    let (source_id, model_id) = seed_source(pool, "sv-promote", 5).await;
    seed_n_finalized(pool, source_id, model_id, 3).await;

    // The call yields a (promoted revision, demoted revision) pair.
    let swap = source_version::promote_by_revision(pool, source_id, 1)
        .await
        .unwrap();
    assert_eq!(swap.0, 1);
    assert_eq!(swap.1, Some(3));

    let now_active = source_version::get_active(pool, source_id)
        .await
        .unwrap();
    assert_eq!(now_active.revision, 1);

    let former_active = source_version::get_by_revision(pool, source_id, 3)
        .await
        .unwrap();
    assert_eq!(former_active.status, SourceVersionStatus::Inactive);
    assert!(!former_active.is_active);
}
/// Promoting the revision that is already active (revision 2 of 2) must fail
/// with `StoreError::CheckViolation` whose message mentions "active".
#[tokio::test]
async fn promote_by_revision_rejects_already_active() {
    let harness = common::boot().await;
    let pool = &harness.pool;
    let (source_id, model_id) = seed_source(pool, "sv-promote-noop", 5).await;
    seed_n_finalized(pool, source_id, model_id, 2).await;

    let outcome = source_version::promote_by_revision(pool, source_id, 2).await;
    let err = outcome.unwrap_err();

    if let mnm_store::StoreError::CheckViolation(msg) = &err {
        assert!(msg.contains("active"), "msg = {msg}");
    } else {
        // Any other error variant is a test failure; show what we got instead.
        panic!("expected CheckViolation, got {other:?}", other = err);
    }
}
/// Promoting a revision that was never created (999 on a source with no
/// versions) must fail with `StoreError::NotFound`.
#[tokio::test]
async fn promote_by_revision_404_on_unknown_revision() {
    let harness = common::boot().await;
    let (source_id, _model_id) = seed_source(&harness.pool, "sv-promote-missing", 5).await;

    let outcome = source_version::promote_by_revision(&harness.pool, source_id, 999).await;
    let err = outcome.unwrap_err();

    assert!(matches!(err, mnm_store::StoreError::NotFound), "{err:?}");
}
/// With `retention_count = 3` and five finalized versions all backdated by
/// 25 hours, a sweep using a 24-hour grace is expected to leave exactly
/// revisions 5, 4 and 3 behind.
#[tokio::test]
async fn sweep_aged_inactive_keeps_retention_window() {
    let harness = common::boot().await;
    let pool = &harness.pool;
    let (source_id, model_id) = seed_source(pool, "sv-sweep-keep", 3).await;

    // Push every version past the grace period used by the sweep below.
    for version_id in seed_n_finalized(pool, source_id, model_id, 5).await {
        age_version(pool, version_id, 25 * 60 * 60).await;
    }

    source_version::sweep_aged_inactive(pool, 24 * 60 * 60)
        .await
        .unwrap();

    let survivors = source_version::list_for_source(pool, source_id)
        .await
        .unwrap();
    let revs: Vec<i32> = survivors.iter().map(|row| row.revision).collect();
    assert_eq!(
        revs,
        vec![5, 4, 3],
        "retention_count=3 keeps the 3 newest; revs 1+2 swept",
    );
}
/// Five finalized versions on a source with `retention_count = 2` are left
/// un-aged; a sweep with a 24-hour grace is expected to report no deletions
/// for this source and to leave all five rows listed.
#[tokio::test]
async fn sweep_aged_inactive_keeps_within_grace_even_when_outside_retention() {
    let harness = common::boot().await;
    let pool = &harness.pool;
    let (source_id, model_id) = seed_source(pool, "sv-sweep-grace", 2).await;
    // Deliberately no `age_version` call: these rows keep their fresh timestamps.
    seed_n_finalized(pool, source_id, model_id, 5).await;

    let deleted = source_version::sweep_aged_inactive(pool, 24 * 60 * 60)
        .await
        .unwrap();

    // Only count entries belonging to this test's source.
    let our_count = deleted
        .iter()
        .filter(|entry| entry.0 == source_id)
        .count();
    assert_eq!(our_count, 0, "recently-ingested versions stay in place");

    let survivors = source_version::list_for_source(pool, source_id)
        .await
        .unwrap();
    assert_eq!(survivors.len(), 5);
}
/// With `retention_count = 1` and three finalized versions all backdated by
/// 25 hours (including the active one), the sweep is expected to keep
/// revision 3 active and leave it as the only listed version.
#[tokio::test]
async fn sweep_aged_inactive_never_touches_active_version() {
    let harness = common::boot().await;
    let pool = &harness.pool;
    let (source_id, model_id) = seed_source(pool, "sv-sweep-active", 1).await;

    for version_id in seed_n_finalized(pool, source_id, model_id, 3).await {
        age_version(pool, version_id, 25 * 60 * 60).await;
    }

    source_version::sweep_aged_inactive(pool, 24 * 60 * 60)
        .await
        .unwrap();

    let still_active = source_version::get_active(pool, source_id)
        .await
        .unwrap();
    assert_eq!(still_active.revision, 3, "active version must survive sweep");

    let survivors = source_version::list_for_source(pool, source_id)
        .await
        .unwrap();
    let revs: Vec<i32> = survivors.iter().map(|row| row.revision).collect();
    assert_eq!(revs, vec![3], "with retention=1 only the active version remains");
}
/// An aborted version backdated by 25 hours must still be retrievable, with
/// status `Aborted`, after `sweep_aged_inactive` runs with a 24-hour grace.
#[tokio::test]
async fn sweep_aged_inactive_leaves_aborted_versions_alone() {
    let harness = common::boot().await;
    let pool = &harness.pool;
    let (source_id, model_id) = seed_source(pool, "sv-sweep-aborted", 1).await;
    seed_n_finalized(pool, source_id, model_id, 1).await;

    // Build a second version, abort it, then push it past the grace period.
    let building = source_version::create_building(
        pool,
        source_id,
        model_id,
        None,
        "0.1.0",
        "h-aborted",
    )
    .await
    .unwrap();
    let aborted_id = building.0;
    source_version::abort(pool, aborted_id).await.unwrap();
    age_version(pool, aborted_id, 25 * 60 * 60).await;

    source_version::sweep_aged_inactive(pool, 24 * 60 * 60)
        .await
        .unwrap();

    let survivor = source_version::get_by_id(pool, aborted_id)
        .await
        .unwrap();
    assert_eq!(survivor.status, SourceVersionStatus::Aborted);
}
/// The `(source_id, revision)` pairs returned by `sweep_aged_inactive` for
/// this test's source must already be in ascending sort order.
#[tokio::test]
async fn sweep_aged_inactive_returns_pairs_sorted() {
    let harness = common::boot().await;
    let pool = &harness.pool;
    let (source_id, model_id) = seed_source(pool, "sv-sweep-sort", 1).await;

    for version_id in seed_n_finalized(pool, source_id, model_id, 4).await {
        age_version(pool, version_id, 25 * 60 * 60).await;
    }

    let deleted = source_version::sweep_aged_inactive(pool, 24 * 60 * 60)
        .await
        .unwrap();

    // Restrict the comparison to entries that belong to this test's source.
    let ours: Vec<(Uuid, i32)> = deleted
        .iter()
        .filter(|entry| entry.0 == source_id)
        .copied()
        .collect();
    let expected = {
        let mut ordered = ours.clone();
        ordered.sort();
        ordered
    };
    assert_eq!(ours, expected);
}
/// Creates a building version with the given `content_hash`, aborts it, and
/// returns its id.
///
/// When `aged_seconds_ago` is positive the row is additionally backdated by
/// that many seconds via `age_version`; zero or negative values leave the
/// timestamp untouched.
///
/// # Panics
///
/// Panics if creating, aborting, or backdating the version fails.
async fn seed_aborted_run(
    pool: &PgPool,
    source_id: Uuid,
    model_id: Uuid,
    content_hash: &str,
    aged_seconds_ago: i64,
) -> Uuid {
    let building = source_version::create_building(
        pool,
        source_id,
        model_id,
        None,
        "0.1.0",
        content_hash,
    )
    .await
    .unwrap();
    let version_id = building.0;
    source_version::abort(pool, version_id).await.unwrap();

    if aged_seconds_ago <= 0 {
        return version_id;
    }
    age_version(pool, version_id, aged_seconds_ago).await;
    version_id
}
/// An aborted version backdated by two hours must be gone (lookup by id
/// yields `StoreError::NotFound`) after `sweep_aborted` runs with a one-hour
/// grace.
#[tokio::test]
async fn sweep_aborted_hard_deletes_aged_aborted_runs() {
    let harness = common::boot().await;
    let pool = &harness.pool;
    let (source_id, model_id) = seed_source(pool, "sv-abort-aged", 5).await;
    let aborted_id = seed_aborted_run(pool, source_id, model_id, "h-aged", 2 * 60 * 60).await;

    source_version::sweep_aborted(pool, 60 * 60).await.unwrap();

    let row = source_version::get_by_id(pool, aborted_id).await;
    let is_gone = matches!(row, Err(mnm_store::StoreError::NotFound));
    assert!(is_gone, "{row:?}");
}
/// An aborted version backdated by only 30 seconds must survive a
/// `sweep_aborted` call that uses a one-hour grace.
///
/// `sweep_aborted` takes no source id and reports `(source_id, revision)`
/// pairs, so its result may contain rows that belong to other sources. The
/// "nothing of ours was deleted" check therefore matches on this test's
/// `source_id` rather than on the revision number alone; revision 1 of an
/// unrelated source showing up in the result must not fail this test.
#[tokio::test]
async fn sweep_aborted_keeps_recent_aborted_runs() {
    let h = common::boot().await;
    let (source_id, model_id) = seed_source(&h.pool, "sv-abort-recent", 5).await;
    let aborted_id = seed_aborted_run(&h.pool, source_id, model_id, "h-recent", 30).await;

    let deleted = source_version::sweep_aborted(&h.pool, 60 * 60)
        .await
        .unwrap();

    // This source owns exactly one version, so any entry carrying our source
    // id means the recent aborted run was wrongly swept.
    assert!(
        !deleted.iter().any(|(sid, _)| *sid == source_id),
        "should not include OUR rev 1: {deleted:?}",
    );

    let row = source_version::get_by_id(&h.pool, aborted_id)
        .await
        .unwrap();
    assert_eq!(row.status, SourceVersionStatus::Aborted);
}
/// Two finalized versions backdated by 25 hours must be unaffected by
/// `sweep_aborted`, even with a grace of zero: revision 2 stays active and
/// revision 1 stays `Inactive`.
#[tokio::test]
async fn sweep_aborted_does_not_touch_active_or_inactive() {
    let harness = common::boot().await;
    let pool = &harness.pool;
    let (source_id, model_id) = seed_source(pool, "sv-abort-isolated", 5).await;

    for version_id in seed_n_finalized(pool, source_id, model_id, 2).await {
        age_version(pool, version_id, 25 * 60 * 60).await;
    }

    source_version::sweep_aborted(pool, 0).await.unwrap();

    let still_active = source_version::get_active(pool, source_id)
        .await
        .unwrap();
    assert_eq!(still_active.revision, 2);

    let older = source_version::get_by_revision(pool, source_id, 1)
        .await
        .unwrap();
    assert_eq!(older.status, SourceVersionStatus::Inactive);
}
/// The `(source_id, revision)` pairs returned by `sweep_aborted` for this
/// test's source must already be in ascending sort order.
#[tokio::test]
async fn sweep_aborted_returns_pairs_sorted() {
    let harness = common::boot().await;
    let pool = &harness.pool;
    let (source_id, model_id) = seed_source(pool, "sv-abort-sort", 5).await;

    // Two aborted runs, both backdated past the one-hour grace used below.
    for content_hash in ["h-a", "h-b"] {
        seed_aborted_run(pool, source_id, model_id, content_hash, 2 * 60 * 60).await;
    }

    let deleted = source_version::sweep_aborted(pool, 60 * 60).await.unwrap();

    // Restrict the comparison to entries that belong to this test's source.
    let ours: Vec<(Uuid, i32)> = deleted
        .iter()
        .filter(|entry| entry.0 == source_id)
        .copied()
        .collect();
    let expected = {
        let mut ordered = ours.clone();
        ordered.sort();
        ordered
    };
    assert_eq!(ours, expected);
}