Skip to main content

mnm_store/entities/
source.rs

1//! `source` entity queries.
2
3use mnm_core::types::{Source, SourceKind};
4use sqlx::PgPool;
5use time::OffsetDateTime;
6use uuid::Uuid;
7
8use crate::error::Result;
9
10/// Insert a new source, returning its newly-minted id.
11///
12/// # Errors
13///
14/// Returns [`crate::error::StoreError::UniqueViolation`] if `slug` already exists.
15pub async fn insert(
16    pool: &PgPool,
17    slug: &str,
18    display_name: &str,
19    kind: SourceKind,
20    origin_url: Option<&str>,
21    retention_count: i32,
22) -> Result<Uuid> {
23    let row: (Uuid,) = sqlx::query_as(
24        "INSERT INTO source (slug, display_name, kind, origin_url, retention_count) \
25         VALUES ($1, $2, $3, $4, $5) RETURNING id",
26    )
27    .bind(slug)
28    .bind(display_name)
29    .bind(
30        serde_json::to_value(kind)
31            .expect("SourceKind serializes")
32            .as_str()
33            .unwrap(),
34    )
35    .bind(origin_url)
36    .bind(retention_count)
37    .fetch_one(pool)
38    .await?;
39    Ok(row.0)
40}
41
42/// Fetch one source by slug.
43///
44/// # Errors
45///
46/// Returns [`crate::error::StoreError::NotFound`] if slug does not exist.
47pub async fn get_by_slug(pool: &PgPool, slug: &str) -> Result<Source> {
48    let row = sqlx::query_as::<_, SourceRow>(
49        "SELECT id, slug, display_name, kind, origin_url, retention_count, created_at, retired_at \
50         FROM source WHERE slug = $1",
51    )
52    .bind(slug)
53    .fetch_one(pool)
54    .await?;
55    row.try_into()
56}
57
58/// List all non-retired sources, ordered by slug.
59///
60/// # Errors
61///
62/// Returns [`crate::error::StoreError::Database`] on driver failure.
63pub async fn list_active(pool: &PgPool) -> Result<Vec<Source>> {
64    let rows = sqlx::query_as::<_, SourceRow>(
65        "SELECT id, slug, display_name, kind, origin_url, retention_count, created_at, retired_at \
66         FROM source WHERE retired_at IS NULL ORDER BY slug",
67    )
68    .fetch_all(pool)
69    .await?;
70    rows.into_iter().map(TryInto::try_into).collect()
71}
72
73/// Filterable, keyset-paginated source listing. Page key is `slug` (unique,
74/// matches the existing ORDER BY). Returns one page plus the total row count
75/// for the same filter set (ignoring the cursor).
76#[derive(Debug, Default)]
77pub struct SourcePageQuery {
78    /// Resume after this slug (exclusive). `None` = first page.
79    pub after_slug: Option<String>,
80    /// Page size (validated by the route: 1..=100).
81    pub limit: i64,
82    /// Only sources created strictly after this instant.
83    pub created_after: Option<OffsetDateTime>,
84    /// Only sources created strictly before this instant.
85    pub created_before: Option<OffsetDateTime>,
86    /// Only sources of this kind (wire string, e.g. "docs_site").
87    pub kind: Option<String>,
88    /// Include retired sources (default false = active only).
89    pub include_retired: bool,
90}
91
92/// One page of sources plus pagination facts.
93#[derive(Debug)]
94pub struct SourcePage {
95    /// The page rows, ordered by slug.
96    pub sources: Vec<Source>,
97    /// Total rows matching the filters (cursor ignored).
98    pub total: i64,
99    /// Slug to resume from when more rows exist.
100    pub next_after_slug: Option<String>,
101}
102
103/// Run a [`SourcePageQuery`].
104///
105/// Extreme `limit` values are clamped rather than panicking (the `+ 1`
106/// look-ahead row saturates at `i64::MAX`).
107///
108/// # Errors
109///
110/// Returns [`crate::error::StoreError::Database`] on driver failure.
111pub async fn list_paged(pool: &PgPool, q: &SourcePageQuery) -> Result<SourcePage> {
112    fn push_filters<'a>(b: &mut sqlx::QueryBuilder<'a, sqlx::Postgres>, q: &'a SourcePageQuery) {
113        b.push(" WHERE 1=1");
114        if !q.include_retired {
115            b.push(" AND retired_at IS NULL");
116        }
117        if let Some(t) = q.created_after {
118            b.push(" AND created_at > ").push_bind(t);
119        }
120        if let Some(t) = q.created_before {
121            b.push(" AND created_at < ").push_bind(t);
122        }
123        if let Some(k) = &q.kind {
124            b.push(" AND kind = ").push_bind(k.as_str());
125        }
126    }
127
128    let mut count = sqlx::QueryBuilder::new("SELECT count(*) FROM source");
129    push_filters(&mut count, q);
130    let total: i64 = count.build_query_scalar().fetch_one(pool).await?;
131
132    let mut page = sqlx::QueryBuilder::new(
133        "SELECT id, slug, display_name, kind, origin_url, retention_count, created_at, retired_at \
134         FROM source",
135    );
136    push_filters(&mut page, q);
137    if let Some(after) = &q.after_slug {
138        page.push(" AND slug > ").push_bind(after.as_str());
139    }
140    page.push(" ORDER BY slug LIMIT ")
141        .push_bind(q.limit.saturating_add(1));
142    let rows: Vec<SourceRow> = page.build_query_as().fetch_all(pool).await?;
143
144    let page_len = usize::try_from(q.limit).unwrap_or(usize::MAX);
145    let has_more = rows.len() > page_len;
146    let sources: Vec<Source> = rows
147        .into_iter()
148        .take(page_len)
149        .map(TryInto::try_into)
150        .collect::<Result<_>>()?;
151    let next_after_slug = if has_more {
152        sources.last().map(|s| s.slug.clone())
153    } else {
154        None
155    };
156    Ok(SourcePage {
157        sources,
158        total,
159        next_after_slug,
160    })
161}
162
163/// List every source row, including retired ones, ordered by slug.
164///
165/// Distinct from [`list_active`] which filters out `retired_at IS NOT NULL`.
166/// Used by admin-tier endpoints that need full operator visibility.
167///
168/// # Errors
169///
170/// Returns [`crate::error::StoreError::Database`] on driver failure.
171pub async fn list_all(pool: &PgPool) -> Result<Vec<Source>> {
172    let rows = sqlx::query_as::<_, SourceRow>(
173        "SELECT id, slug, display_name, kind, origin_url, retention_count, created_at, retired_at \
174         FROM source ORDER BY slug",
175    )
176    .fetch_all(pool)
177    .await?;
178    rows.into_iter().map(TryInto::try_into).collect()
179}
180
181/// Sources whose ACTIVE version is NOT on `target_model_id`, ordered by best
182/// (lowest-rank) document attribution then slug. Foundation(1) → Partner(2) →
183/// ThirdParty(3) → Community(4) → Unknown(5).
184///
185/// Only non-retired sources with an active `source_version` are considered.
186/// Sources whose active version has no documents at all sort as Unknown (rank 5).
187///
188/// # Errors
189///
190/// Returns a store error on driver failure or a malformed `kind`.
191pub async fn list_active_not_on_model(pool: &PgPool, target_model_id: Uuid) -> Result<Vec<Source>> {
192    let rows = sqlx::query_as::<_, SourceRow>(
193        "SELECT s.id, s.slug, s.display_name, s.kind, s.origin_url, s.retention_count, s.created_at, s.retired_at \
194         FROM source s \
195         JOIN source_version sv ON sv.source_id = s.id AND sv.is_active = true \
196         LEFT JOIN document d ON d.source_version_id = sv.id \
197         WHERE s.retired_at IS NULL AND sv.embedding_model_id <> $1 \
198         GROUP BY s.id, s.slug, s.display_name, s.kind, s.origin_url, s.retention_count, s.created_at, s.retired_at \
199         ORDER BY MIN(CASE d.provenance->>'attribution' \
200                        WHEN 'foundation' THEN 1 WHEN 'partner' THEN 2 WHEN 'third_party' THEN 3 \
201                        WHEN 'community' THEN 4 ELSE 5 END) ASC NULLS LAST, s.slug ASC",
202    )
203    .bind(target_model_id)
204    .fetch_all(pool)
205    .await?;
206    rows.into_iter().map(TryInto::try_into).collect()
207}
208
/// Sparse patch applied by [`update`]: `Some(value)` overwrites the column,
/// `None` leaves it untouched.
#[derive(Debug, Default, Clone)]
pub struct SourcePatch {
    /// New display label, when set.
    pub display_name: Option<String>,
    /// New origin URL, when set. `None` means "no change"; this patch has no
    /// way to clear the column back to NULL (there is no `Some(None)`-style
    /// "unset" value, and an empty string is stored as an empty string).
    pub origin_url: Option<String>,
    /// New retention count, when set. Per [`update`]'s error contract, values
    /// outside `[1, 50]` are rejected with a check violation, not clamped, so
    /// callers should validate before calling.
    pub retention_count: Option<i32>,
}
222
223/// Apply a sparse patch to one source by slug, returning the updated row.
224///
225/// Uses `COALESCE` per-column so a single `UPDATE ... RETURNING *` covers all
226/// patch shapes — no dynamic SQL.
227///
228/// # Errors
229///
230/// Returns [`crate::error::StoreError::NotFound`] if `slug` is unknown.
231/// Returns [`crate::error::StoreError::CheckViolation`] when `retention_count`
232/// falls outside `[1, 50]`.
233pub async fn update(pool: &PgPool, slug: &str, patch: SourcePatch) -> Result<Source> {
234    let row = sqlx::query_as::<_, SourceRow>(
235        "UPDATE source SET \
236            display_name = COALESCE($2, display_name), \
237            origin_url = COALESCE($3, origin_url), \
238            retention_count = COALESCE($4, retention_count) \
239         WHERE slug = $1 \
240         RETURNING id, slug, display_name, kind, origin_url, retention_count, created_at, retired_at",
241    )
242    .bind(slug)
243    .bind(patch.display_name)
244    .bind(patch.origin_url)
245    .bind(patch.retention_count)
246    .fetch_optional(pool)
247    .await?;
248    row.map_or(Err(crate::error::StoreError::NotFound), TryInto::try_into)
249}
250
251/// Hard-delete sources whose `retired_at` is older than `grace_seconds`.
252///
253/// Cascades through `source_version` → `chunk` / `document` / `node` /
254/// `package` via the existing `ON DELETE CASCADE` foreign keys, so one
255/// DELETE removes the whole subtree. Returns the slugs that were deleted
256/// (sorted ascending for stable logging).
257///
258/// # Errors
259///
260/// Returns [`crate::error::StoreError::Database`] on driver failure. A
261/// `grace_seconds` value of 0 means "delete anything currently retired";
262/// negative values are clamped to 0.
263pub async fn sweep_retired(pool: &PgPool, grace_seconds: i64) -> Result<Vec<String>> {
264    let grace = grace_seconds.max(0);
265    let rows: Vec<(String,)> = sqlx::query_as(
266        "DELETE FROM source \
267         WHERE retired_at IS NOT NULL \
268           AND retired_at < now() - ($1::bigint * interval '1 second') \
269         RETURNING slug",
270    )
271    .bind(grace)
272    .fetch_all(pool)
273    .await?;
274    let mut slugs: Vec<String> = rows.into_iter().map(|(s,)| s).collect();
275    slugs.sort();
276    Ok(slugs)
277}
278
279/// Mark a source as retired (idempotent: setting an already-retired row is a no-op).
280///
281/// # Errors
282///
283/// Returns [`crate::error::StoreError::NotFound`] if `slug` is unknown.
284pub async fn retire(pool: &PgPool, slug: &str) -> Result<()> {
285    let result =
286        sqlx::query("UPDATE source SET retired_at = COALESCE(retired_at, now()) WHERE slug = $1")
287            .bind(slug)
288            .execute(pool)
289            .await?;
290    if result.rows_affected() == 0 {
291        return Err(crate::error::StoreError::NotFound);
292    }
293    Ok(())
294}
295
296#[derive(sqlx::FromRow)]
297struct SourceRow {
298    id: Uuid,
299    slug: String,
300    display_name: String,
301    kind: String,
302    origin_url: Option<String>,
303    retention_count: i32,
304    created_at: OffsetDateTime,
305    retired_at: Option<OffsetDateTime>,
306}
307
308impl TryFrom<SourceRow> for Source {
309    type Error = crate::error::StoreError;
310
311    fn try_from(r: SourceRow) -> std::result::Result<Self, Self::Error> {
312        let kind: SourceKind = serde_json::from_value(serde_json::Value::String(r.kind))
313            .map_err(|e| crate::error::StoreError::Json(e.to_string()))?;
314        Ok(Self {
315            id: r.id,
316            slug: r.slug,
317            display_name: r.display_name,
318            kind,
319            origin_url: r.origin_url,
320            retention_count: r.retention_count,
321            created_at: r.created_at,
322            retired_at: r.retired_at,
323        })
324    }
325}