1use mnm_core::types::{Source, SourceKind};
4use sqlx::PgPool;
5use time::OffsetDateTime;
6use uuid::Uuid;
7
8use crate::error::Result;
9
10pub async fn insert(
16 pool: &PgPool,
17 slug: &str,
18 display_name: &str,
19 kind: SourceKind,
20 origin_url: Option<&str>,
21 retention_count: i32,
22) -> Result<Uuid> {
23 let row: (Uuid,) = sqlx::query_as(
24 "INSERT INTO source (slug, display_name, kind, origin_url, retention_count) \
25 VALUES ($1, $2, $3, $4, $5) RETURNING id",
26 )
27 .bind(slug)
28 .bind(display_name)
29 .bind(
30 serde_json::to_value(kind)
31 .expect("SourceKind serializes")
32 .as_str()
33 .unwrap(),
34 )
35 .bind(origin_url)
36 .bind(retention_count)
37 .fetch_one(pool)
38 .await?;
39 Ok(row.0)
40}
41
42pub async fn get_by_slug(pool: &PgPool, slug: &str) -> Result<Source> {
48 let row = sqlx::query_as::<_, SourceRow>(
49 "SELECT id, slug, display_name, kind, origin_url, retention_count, created_at, retired_at \
50 FROM source WHERE slug = $1",
51 )
52 .bind(slug)
53 .fetch_one(pool)
54 .await?;
55 row.try_into()
56}
57
58pub async fn list_active(pool: &PgPool) -> Result<Vec<Source>> {
64 let rows = sqlx::query_as::<_, SourceRow>(
65 "SELECT id, slug, display_name, kind, origin_url, retention_count, created_at, retired_at \
66 FROM source WHERE retired_at IS NULL ORDER BY slug",
67 )
68 .fetch_all(pool)
69 .await?;
70 rows.into_iter().map(TryInto::try_into).collect()
71}
72
73#[derive(Debug, Default)]
77pub struct SourcePageQuery {
78 pub after_slug: Option<String>,
80 pub limit: i64,
82 pub created_after: Option<OffsetDateTime>,
84 pub created_before: Option<OffsetDateTime>,
86 pub kind: Option<String>,
88 pub include_retired: bool,
90}
91
92#[derive(Debug)]
94pub struct SourcePage {
95 pub sources: Vec<Source>,
97 pub total: i64,
99 pub next_after_slug: Option<String>,
101}
102
103pub async fn list_paged(pool: &PgPool, q: &SourcePageQuery) -> Result<SourcePage> {
112 fn push_filters<'a>(b: &mut sqlx::QueryBuilder<'a, sqlx::Postgres>, q: &'a SourcePageQuery) {
113 b.push(" WHERE 1=1");
114 if !q.include_retired {
115 b.push(" AND retired_at IS NULL");
116 }
117 if let Some(t) = q.created_after {
118 b.push(" AND created_at > ").push_bind(t);
119 }
120 if let Some(t) = q.created_before {
121 b.push(" AND created_at < ").push_bind(t);
122 }
123 if let Some(k) = &q.kind {
124 b.push(" AND kind = ").push_bind(k.as_str());
125 }
126 }
127
128 let mut count = sqlx::QueryBuilder::new("SELECT count(*) FROM source");
129 push_filters(&mut count, q);
130 let total: i64 = count.build_query_scalar().fetch_one(pool).await?;
131
132 let mut page = sqlx::QueryBuilder::new(
133 "SELECT id, slug, display_name, kind, origin_url, retention_count, created_at, retired_at \
134 FROM source",
135 );
136 push_filters(&mut page, q);
137 if let Some(after) = &q.after_slug {
138 page.push(" AND slug > ").push_bind(after.as_str());
139 }
140 page.push(" ORDER BY slug LIMIT ")
141 .push_bind(q.limit.saturating_add(1));
142 let rows: Vec<SourceRow> = page.build_query_as().fetch_all(pool).await?;
143
144 let page_len = usize::try_from(q.limit).unwrap_or(usize::MAX);
145 let has_more = rows.len() > page_len;
146 let sources: Vec<Source> = rows
147 .into_iter()
148 .take(page_len)
149 .map(TryInto::try_into)
150 .collect::<Result<_>>()?;
151 let next_after_slug = if has_more {
152 sources.last().map(|s| s.slug.clone())
153 } else {
154 None
155 };
156 Ok(SourcePage {
157 sources,
158 total,
159 next_after_slug,
160 })
161}
162
163pub async fn list_all(pool: &PgPool) -> Result<Vec<Source>> {
172 let rows = sqlx::query_as::<_, SourceRow>(
173 "SELECT id, slug, display_name, kind, origin_url, retention_count, created_at, retired_at \
174 FROM source ORDER BY slug",
175 )
176 .fetch_all(pool)
177 .await?;
178 rows.into_iter().map(TryInto::try_into).collect()
179}
180
181pub async fn list_active_not_on_model(pool: &PgPool, target_model_id: Uuid) -> Result<Vec<Source>> {
192 let rows = sqlx::query_as::<_, SourceRow>(
193 "SELECT s.id, s.slug, s.display_name, s.kind, s.origin_url, s.retention_count, s.created_at, s.retired_at \
194 FROM source s \
195 JOIN source_version sv ON sv.source_id = s.id AND sv.is_active = true \
196 LEFT JOIN document d ON d.source_version_id = sv.id \
197 WHERE s.retired_at IS NULL AND sv.embedding_model_id <> $1 \
198 GROUP BY s.id, s.slug, s.display_name, s.kind, s.origin_url, s.retention_count, s.created_at, s.retired_at \
199 ORDER BY MIN(CASE d.provenance->>'attribution' \
200 WHEN 'foundation' THEN 1 WHEN 'partner' THEN 2 WHEN 'third_party' THEN 3 \
201 WHEN 'community' THEN 4 ELSE 5 END) ASC NULLS LAST, s.slug ASC",
202 )
203 .bind(target_model_id)
204 .fetch_all(pool)
205 .await?;
206 rows.into_iter().map(TryInto::try_into).collect()
207}
208
/// Partial update applied by `update`.
///
/// Each field set to `None` leaves the corresponding column unchanged; the
/// derived `Default` therefore describes a no-op patch.
#[derive(Debug, Default, Clone)]
pub struct SourcePatch {
    /// New display name, or `None` to keep the current one.
    pub display_name: Option<String>,
    /// New origin URL, or `None` to keep the current one. An existing URL
    /// cannot be cleared through this field.
    pub origin_url: Option<String>,
    /// New retention count, or `None` to keep the current one.
    pub retention_count: Option<i32>,
}
222
223pub async fn update(pool: &PgPool, slug: &str, patch: SourcePatch) -> Result<Source> {
234 let row = sqlx::query_as::<_, SourceRow>(
235 "UPDATE source SET \
236 display_name = COALESCE($2, display_name), \
237 origin_url = COALESCE($3, origin_url), \
238 retention_count = COALESCE($4, retention_count) \
239 WHERE slug = $1 \
240 RETURNING id, slug, display_name, kind, origin_url, retention_count, created_at, retired_at",
241 )
242 .bind(slug)
243 .bind(patch.display_name)
244 .bind(patch.origin_url)
245 .bind(patch.retention_count)
246 .fetch_optional(pool)
247 .await?;
248 row.map_or(Err(crate::error::StoreError::NotFound), TryInto::try_into)
249}
250
251pub async fn sweep_retired(pool: &PgPool, grace_seconds: i64) -> Result<Vec<String>> {
264 let grace = grace_seconds.max(0);
265 let rows: Vec<(String,)> = sqlx::query_as(
266 "DELETE FROM source \
267 WHERE retired_at IS NOT NULL \
268 AND retired_at < now() - ($1::bigint * interval '1 second') \
269 RETURNING slug",
270 )
271 .bind(grace)
272 .fetch_all(pool)
273 .await?;
274 let mut slugs: Vec<String> = rows.into_iter().map(|(s,)| s).collect();
275 slugs.sort();
276 Ok(slugs)
277}
278
279pub async fn retire(pool: &PgPool, slug: &str) -> Result<()> {
285 let result =
286 sqlx::query("UPDATE source SET retired_at = COALESCE(retired_at, now()) WHERE slug = $1")
287 .bind(slug)
288 .execute(pool)
289 .await?;
290 if result.rows_affected() == 0 {
291 return Err(crate::error::StoreError::NotFound);
292 }
293 Ok(())
294}
295
296#[derive(sqlx::FromRow)]
297struct SourceRow {
298 id: Uuid,
299 slug: String,
300 display_name: String,
301 kind: String,
302 origin_url: Option<String>,
303 retention_count: i32,
304 created_at: OffsetDateTime,
305 retired_at: Option<OffsetDateTime>,
306}
307
308impl TryFrom<SourceRow> for Source {
309 type Error = crate::error::StoreError;
310
311 fn try_from(r: SourceRow) -> std::result::Result<Self, Self::Error> {
312 let kind: SourceKind = serde_json::from_value(serde_json::Value::String(r.kind))
313 .map_err(|e| crate::error::StoreError::Json(e.to_string()))?;
314 Ok(Self {
315 id: r.id,
316 slug: r.slug,
317 display_name: r.display_name,
318 kind,
319 origin_url: r.origin_url,
320 retention_count: r.retention_count,
321 created_at: r.created_at,
322 retired_at: r.retired_at,
323 })
324 }
325}