use mnm_core::types::{Source, SourceKind};
use sqlx::PgPool;
use time::OffsetDateTime;
use uuid::Uuid;
use crate::error::Result;
pub async fn insert(
pool: &PgPool,
slug: &str,
display_name: &str,
kind: SourceKind,
origin_url: Option<&str>,
retention_count: i32,
) -> Result<Uuid> {
let row: (Uuid,) = sqlx::query_as(
"INSERT INTO source (slug, display_name, kind, origin_url, retention_count) \
VALUES ($1, $2, $3, $4, $5) RETURNING id",
)
.bind(slug)
.bind(display_name)
.bind(
serde_json::to_value(kind)
.expect("SourceKind serializes")
.as_str()
.unwrap(),
)
.bind(origin_url)
.bind(retention_count)
.fetch_one(pool)
.await?;
Ok(row.0)
}
pub async fn get_by_slug(pool: &PgPool, slug: &str) -> Result<Source> {
let row = sqlx::query_as::<_, SourceRow>(
"SELECT id, slug, display_name, kind, origin_url, retention_count, created_at, retired_at \
FROM source WHERE slug = $1",
)
.bind(slug)
.fetch_one(pool)
.await?;
row.try_into()
}
pub async fn list_active(pool: &PgPool) -> Result<Vec<Source>> {
let rows = sqlx::query_as::<_, SourceRow>(
"SELECT id, slug, display_name, kind, origin_url, retention_count, created_at, retired_at \
FROM source WHERE retired_at IS NULL ORDER BY slug",
)
.fetch_all(pool)
.await?;
rows.into_iter().map(TryInto::try_into).collect()
}
/// Filters and paging parameters consumed by `list_paged`.
///
/// The derived `Default` leaves every filter unset, excludes retired sources
/// and sets `limit` to `0`, which makes `list_paged` return no sources.
#[derive(Debug, Default)]
pub struct SourcePageQuery {
/// Cursor: when set, the page only contains rows whose slug is greater than
/// this value. It does not affect the reported total.
pub after_slug: Option<String>,
/// Maximum number of sources returned in one page.
pub limit: i64,
/// When set, keeps only rows with `created_at` strictly after this instant.
pub created_after: Option<OffsetDateTime>,
/// When set, keeps only rows with `created_at` strictly before this instant.
pub created_before: Option<OffsetDateTime>,
/// When set, keeps only rows whose `kind` column equals this string.
pub kind: Option<String>,
/// When `false`, rows with a non-null `retired_at` are excluded.
pub include_retired: bool,
}
/// One page of results produced by `list_paged`.
#[derive(Debug)]
pub struct SourcePage {
/// The sources on this page, ordered by slug.
pub sources: Vec<Source>,
/// Number of rows matching the query's filters; the `after_slug` cursor and
/// `limit` are not applied to this count.
pub total: i64,
/// Slug of the last entry in `sources` when more rows remain after this
/// page, otherwise `None`.
pub next_after_slug: Option<String>,
}
pub async fn list_paged(pool: &PgPool, q: &SourcePageQuery) -> Result<SourcePage> {
fn push_filters<'a>(b: &mut sqlx::QueryBuilder<'a, sqlx::Postgres>, q: &'a SourcePageQuery) {
b.push(" WHERE 1=1");
if !q.include_retired {
b.push(" AND retired_at IS NULL");
}
if let Some(t) = q.created_after {
b.push(" AND created_at > ").push_bind(t);
}
if let Some(t) = q.created_before {
b.push(" AND created_at < ").push_bind(t);
}
if let Some(k) = &q.kind {
b.push(" AND kind = ").push_bind(k.as_str());
}
}
let mut count = sqlx::QueryBuilder::new("SELECT count(*) FROM source");
push_filters(&mut count, q);
let total: i64 = count.build_query_scalar().fetch_one(pool).await?;
let mut page = sqlx::QueryBuilder::new(
"SELECT id, slug, display_name, kind, origin_url, retention_count, created_at, retired_at \
FROM source",
);
push_filters(&mut page, q);
if let Some(after) = &q.after_slug {
page.push(" AND slug > ").push_bind(after.as_str());
}
page.push(" ORDER BY slug LIMIT ")
.push_bind(q.limit.saturating_add(1));
let rows: Vec<SourceRow> = page.build_query_as().fetch_all(pool).await?;
let page_len = usize::try_from(q.limit).unwrap_or(usize::MAX);
let has_more = rows.len() > page_len;
let sources: Vec<Source> = rows
.into_iter()
.take(page_len)
.map(TryInto::try_into)
.collect::<Result<_>>()?;
let next_after_slug = if has_more {
sources.last().map(|s| s.slug.clone())
} else {
None
};
Ok(SourcePage {
sources,
total,
next_after_slug,
})
}
pub async fn list_all(pool: &PgPool) -> Result<Vec<Source>> {
let rows = sqlx::query_as::<_, SourceRow>(
"SELECT id, slug, display_name, kind, origin_url, retention_count, created_at, retired_at \
FROM source ORDER BY slug",
)
.fetch_all(pool)
.await?;
rows.into_iter().map(TryInto::try_into).collect()
}
pub async fn list_active_not_on_model(pool: &PgPool, target_model_id: Uuid) -> Result<Vec<Source>> {
let rows = sqlx::query_as::<_, SourceRow>(
"SELECT s.id, s.slug, s.display_name, s.kind, s.origin_url, s.retention_count, s.created_at, s.retired_at \
FROM source s \
JOIN source_version sv ON sv.source_id = s.id AND sv.is_active = true \
LEFT JOIN document d ON d.source_version_id = sv.id \
WHERE s.retired_at IS NULL AND sv.embedding_model_id <> $1 \
GROUP BY s.id, s.slug, s.display_name, s.kind, s.origin_url, s.retention_count, s.created_at, s.retired_at \
ORDER BY MIN(CASE d.provenance->>'attribution' \
WHEN 'foundation' THEN 1 WHEN 'partner' THEN 2 WHEN 'third_party' THEN 3 \
WHEN 'community' THEN 4 ELSE 5 END) ASC NULLS LAST, s.slug ASC",
)
.bind(target_model_id)
.fetch_all(pool)
.await?;
rows.into_iter().map(TryInto::try_into).collect()
}
/// Partial update applied by `update`.
///
/// Each field is bound into a `COALESCE(param, column)` expression, so a `None`
/// field leaves the stored column untouched.
#[derive(Debug, Default, Clone)]
pub struct SourcePatch {
/// New display name, or `None` to keep the current one.
pub display_name: Option<String>,
/// New origin URL, or `None` to keep the current one; because of the
/// `COALESCE`, this patch cannot reset the column to `NULL`.
pub origin_url: Option<String>,
/// New retention count, or `None` to keep the current one.
pub retention_count: Option<i32>,
}
pub async fn update(pool: &PgPool, slug: &str, patch: SourcePatch) -> Result<Source> {
let row = sqlx::query_as::<_, SourceRow>(
"UPDATE source SET \
display_name = COALESCE($2, display_name), \
origin_url = COALESCE($3, origin_url), \
retention_count = COALESCE($4, retention_count) \
WHERE slug = $1 \
RETURNING id, slug, display_name, kind, origin_url, retention_count, created_at, retired_at",
)
.bind(slug)
.bind(patch.display_name)
.bind(patch.origin_url)
.bind(patch.retention_count)
.fetch_optional(pool)
.await?;
row.map_or(Err(crate::error::StoreError::NotFound), TryInto::try_into)
}
/// Deletes sources whose `retired_at` is set and lies more than
/// `grace_seconds` seconds before the database's `now()`.
///
/// A negative `grace_seconds` is treated as `0`. Returns the slugs of the
/// deleted rows in ascending order.
///
/// # Errors
///
/// Propagates any error from executing the `DELETE`.
pub async fn sweep_retired(pool: &PgPool, grace_seconds: i64) -> Result<Vec<String>> {
    const SQL: &str = "DELETE FROM source \
         WHERE retired_at IS NOT NULL \
         AND retired_at < now() - ($1::bigint * interval '1 second') \
         RETURNING slug";

    let clamped_grace = grace_seconds.max(0);
    let deleted: Vec<(String,)> = sqlx::query_as(SQL)
        .bind(clamped_grace)
        .fetch_all(pool)
        .await?;

    // `RETURNING` gives no ordering guarantee, so sort for a stable result.
    let mut slugs: Vec<String> = deleted.into_iter().map(|row| row.0).collect();
    slugs.sort_unstable();
    Ok(slugs)
}
pub async fn retire(pool: &PgPool, slug: &str) -> Result<()> {
let result =
sqlx::query("UPDATE source SET retired_at = COALESCE(retired_at, now()) WHERE slug = $1")
.bind(slug)
.execute(pool)
.await?;
if result.rows_affected() == 0 {
return Err(crate::error::StoreError::NotFound);
}
Ok(())
}
/// Row shape selected from the `source` table by the queries in this file.
///
/// `kind` is held as a plain string here; it is parsed into `SourceKind` by the
/// `TryFrom<SourceRow> for Source` conversion.
#[derive(sqlx::FromRow)]
struct SourceRow {
id: Uuid,
slug: String,
display_name: String,
// Textual form of the source kind as stored in the `kind` column.
kind: String,
origin_url: Option<String>,
retention_count: i32,
created_at: OffsetDateTime,
// `None` while the source has not been retired.
retired_at: Option<OffsetDateTime>,
}
impl TryFrom<SourceRow> for Source {
type Error = crate::error::StoreError;
fn try_from(r: SourceRow) -> std::result::Result<Self, Self::Error> {
let kind: SourceKind = serde_json::from_value(serde_json::Value::String(r.kind))
.map_err(|e| crate::error::StoreError::Json(e.to_string()))?;
Ok(Self {
id: r.id,
slug: r.slug,
display_name: r.display_name,
kind,
origin_url: r.origin_url,
retention_count: r.retention_count,
created_at: r.created_at,
retired_at: r.retired_at,
})
}
}