vein_adapter/cache/
postgres.rs

1use anyhow::{Context, Result};
2use chrono::{DateTime, Duration, Utc};
3use sqlx::{
4    PgPool, Transaction,
5    postgres::{PgPoolOptions, Postgres},
6};
7
8use super::{
9    CacheBackend, GemVersion, QuarantineStats, VersionStatus,
10    models::{DbGemMetadataRow, PostgresCachedAssetRow, PostgresGemVersionRow, format_timestamp},
11    serialization::{hydrate_metadata_row, parse_language_rows, prepare_metadata_strings},
12    types::{AssetKey, CachedAsset, GemMetadata, IndexStats, SbomCoverage},
13};
14
15#[derive(Debug, Clone)]
16pub struct PostgresCacheBackend {
17    pool: PgPool,
18}
19
20impl PostgresCacheBackend {
21    pub async fn connect(url: &str, max_connections: u32) -> Result<Self> {
22        let pool = PgPoolOptions::new()
23            .max_connections(max_connections.max(1))
24            .connect(url)
25            .await
26            .with_context(|| format!("connecting to postgres database {}", url))?;
27        Ok(Self { pool })
28    }
29
30    async fn touch(&self, key: &AssetKey<'_>) -> Result<()> {
31        sqlx::query(
32            r#"
33            UPDATE cached_assets
34            SET last_accessed = NOW()
35            WHERE kind = $1 AND name = $2 AND version = $3 AND
36                  ((platform IS NULL AND $4 IS NULL) OR platform = $4)
37            "#,
38        )
39        .bind(key.kind.as_str())
40        .bind(key.name)
41        .bind(key.version)
42        .bind(key.platform)
43        .execute(&self.pool)
44        .await
45        .context("updating last_accessed (postgres)")?;
46        Ok(())
47    }
48
49    pub async fn fetch_gem_metadata(
50        &self,
51        name: &str,
52        version: &str,
53        platform: Option<&str>,
54    ) -> Result<Option<GemMetadata>> {
55        let record = sqlx::query_as::<_, DbGemMetadataRow>(
56            r#"
57            SELECT
58                name,
59                version,
60                platform,
61                summary,
62                description,
63                licenses,
64                authors,
65                emails,
66                homepage,
67                documentation_url,
68                changelog_url,
69                source_code_url,
70                bug_tracker_url,
71                wiki_url,
72                funding_url,
73                metadata_json,
74                dependencies_json,
75                executables_json,
76                extensions_json,
77                native_languages_json,
78                has_native_extensions,
79                has_embedded_binaries,
80                required_ruby_version,
81                required_rubygems_version,
82                rubygems_version,
83                specification_version,
84                built_at,
85                size_bytes,
86                sha256,
87                sbom_json
88            FROM gem_metadata
89            WHERE name = $1
90              AND version = $2
91              AND ((platform IS NULL AND $3 IS NULL) OR platform = $3)
92            "#,
93        )
94        .bind(name)
95        .bind(version)
96        .bind(platform)
97        .fetch_optional(&self.pool)
98        .await
99        .context("fetching gem metadata record (postgres)")?;
100
101        match record {
102            Some(row) => hydrate_metadata_row(row).map(Some),
103            None => Ok(None),
104        }
105    }
106
107    pub async fn sbom_coverage_stats(&self) -> Result<SbomCoverage> {
108        let (total, with_sbom) = sqlx::query_as::<_, (i64, i64)>(
109            r#"
110            SELECT
111                COUNT(*) as total,
112                COALESCE(
113                    SUM(CASE WHEN sbom_json IS NOT NULL AND sbom_json <> ''
114                        THEN 1 ELSE 0 END),
115                    0
116                ) as with_sbom
117            FROM gem_metadata
118            "#,
119        )
120        .fetch_one(&self.pool)
121        .await
122        .context("querying SBOM coverage (postgres)")?;
123
124        Ok(SbomCoverage {
125            metadata_rows: total.max(0) as u64,
126            with_sbom: with_sbom.max(0) as u64,
127        })
128    }
129
130    pub async fn catalog_languages_list(&self) -> Result<Vec<String>> {
131        let rows = sqlx::query_scalar::<_, Option<String>>(
132            r#"
133            SELECT native_languages_json
134            FROM gem_metadata
135            WHERE native_languages_json IS NOT NULL AND native_languages_json <> ''
136            "#,
137        )
138        .fetch_all(&self.pool)
139        .await
140        .context("fetching native languages (postgres)")?;
141
142        parse_language_rows(rows)
143    }
144
145    async fn begin_tx(&self) -> Result<Transaction<'_, Postgres>> {
146        self.pool
147            .begin()
148            .await
149            .context("starting postgres transaction")
150    }
151
152    pub async fn upsert_gem_metadata_record(&self, metadata: &GemMetadata) -> Result<()> {
153        let prepared = prepare_metadata_strings(metadata)?;
154
155        sqlx::query(
156            r#"
157            INSERT INTO gem_metadata(
158                name,
159                version,
160                platform,
161                summary,
162                description,
163                licenses,
164                authors,
165                emails,
166                homepage,
167                documentation_url,
168                changelog_url,
169                source_code_url,
170                bug_tracker_url,
171                wiki_url,
172                funding_url,
173                metadata_json,
174                dependencies_json,
175                executables_json,
176                extensions_json,
177                native_languages_json,
178                has_native_extensions,
179                has_embedded_binaries,
180                required_ruby_version,
181                required_rubygems_version,
182                rubygems_version,
183                specification_version,
184                built_at,
185                size_bytes,
186                sha256,
187                sbom_json
188            ) VALUES (
189                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
190                $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
191                $21, $22, $23, $24, $25, $26, $27, $28, $29, $30
192            )
193            ON CONFLICT ON CONSTRAINT gem_metadata_pkey
194            DO UPDATE SET
195                summary = EXCLUDED.summary,
196                description = EXCLUDED.description,
197                licenses = EXCLUDED.licenses,
198                authors = EXCLUDED.authors,
199                emails = EXCLUDED.emails,
200                homepage = EXCLUDED.homepage,
201                documentation_url = EXCLUDED.documentation_url,
202                changelog_url = EXCLUDED.changelog_url,
203                source_code_url = EXCLUDED.source_code_url,
204                bug_tracker_url = EXCLUDED.bug_tracker_url,
205                wiki_url = EXCLUDED.wiki_url,
206                funding_url = EXCLUDED.funding_url,
207                metadata_json = EXCLUDED.metadata_json,
208                dependencies_json = EXCLUDED.dependencies_json,
209                executables_json = EXCLUDED.executables_json,
210                extensions_json = EXCLUDED.extensions_json,
211                native_languages_json = EXCLUDED.native_languages_json,
212                has_native_extensions = EXCLUDED.has_native_extensions,
213                has_embedded_binaries = EXCLUDED.has_embedded_binaries,
214                required_ruby_version = EXCLUDED.required_ruby_version,
215                required_rubygems_version = EXCLUDED.required_rubygems_version,
216                rubygems_version = EXCLUDED.rubygems_version,
217                specification_version = EXCLUDED.specification_version,
218                built_at = EXCLUDED.built_at,
219                size_bytes = EXCLUDED.size_bytes,
220                sha256 = EXCLUDED.sha256,
221                sbom_json = EXCLUDED.sbom_json
222            "#,
223        )
224        .bind(&metadata.name)
225        .bind(&metadata.version)
226        .bind(metadata.platform.as_deref())
227        .bind(metadata.summary.as_deref())
228        .bind(metadata.description.as_deref())
229        .bind(prepared.licenses_json)
230        .bind(prepared.authors_json)
231        .bind(prepared.emails_json)
232        .bind(metadata.homepage.as_deref())
233        .bind(metadata.documentation_url.as_deref())
234        .bind(metadata.changelog_url.as_deref())
235        .bind(metadata.source_code_url.as_deref())
236        .bind(metadata.bug_tracker_url.as_deref())
237        .bind(metadata.wiki_url.as_deref())
238        .bind(metadata.funding_url.as_deref())
239        .bind(prepared.metadata_json)
240        .bind(prepared.dependencies_json)
241        .bind(prepared.executables_json)
242        .bind(prepared.extensions_json)
243        .bind(prepared.native_languages_json)
244        .bind(metadata.has_native_extensions)
245        .bind(metadata.has_embedded_binaries)
246        .bind(metadata.required_ruby_version.as_deref())
247        .bind(metadata.required_rubygems_version.as_deref())
248        .bind(metadata.rubygems_version.as_deref())
249        .bind(metadata.specification_version)
250        .bind(metadata.built_at.as_deref())
251        .bind(prepared.size_bytes)
252        .bind(&metadata.sha256)
253        .bind(prepared.sbom_json)
254        .execute(&self.pool)
255        .await
256        .context("upserting gem metadata (postgres)")?;
257
258        Ok(())
259    }
260}
261
262impl CacheBackend for PostgresCacheBackend {
263    async fn get(&self, key: &AssetKey<'_>) -> Result<Option<CachedAsset>> {
264        let record = sqlx::query_as::<_, PostgresCachedAssetRow>(
265            r#"
266            SELECT path, sha256, size_bytes, last_accessed
267            FROM cached_assets
268            WHERE kind = $1 AND name = $2 AND version = $3 AND
269                  ((platform IS NULL AND $4 IS NULL) OR platform = $4)
270            "#,
271        )
272        .bind(key.kind.as_str())
273        .bind(key.name)
274        .bind(key.version)
275        .bind(key.platform)
276        .fetch_optional(&self.pool)
277        .await
278        .context("fetching cached asset (postgres)")?;
279
280        if let Some(row) = record {
281            self.touch(key).await?;
282            Ok(Some(row.into()))
283        } else {
284            Ok(None)
285        }
286    }
287
288    async fn insert_or_replace(
289        &self,
290        key: &AssetKey<'_>,
291        path: &str,
292        sha256: &str,
293        size_bytes: u64,
294    ) -> Result<()> {
295        sqlx::query(
296            r#"
297            INSERT INTO cached_assets(
298                kind, name, version, platform, path, sha256, size_bytes, last_accessed
299            )
300            VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
301            ON CONFLICT ON CONSTRAINT cached_assets_unique
302            DO UPDATE SET
303                path = EXCLUDED.path,
304                sha256 = EXCLUDED.sha256,
305                size_bytes = EXCLUDED.size_bytes,
306                last_accessed = EXCLUDED.last_accessed
307            "#,
308        )
309        .bind(key.kind.as_str())
310        .bind(key.name)
311        .bind(key.version)
312        .bind(key.platform)
313        .bind(path)
314        .bind(sha256)
315        .bind(size_bytes as i64)
316        .execute(&self.pool)
317        .await
318        .context("inserting cached asset (postgres)")?;
319        Ok(())
320    }
321
322    async fn get_all_gems(&self) -> Result<Vec<(String, String)>> {
323        let rows = sqlx::query_as::<_, (String, String)>(
324            r#"
325            SELECT DISTINCT name, version
326            FROM cached_assets
327            WHERE kind = 'gem'
328            ORDER BY name, version
329            "#,
330        )
331        .fetch_all(&self.pool)
332        .await
333        .context("fetching all gems (postgres)")?;
334
335        Ok(rows)
336    }
337
338    async fn stats(&self) -> Result<IndexStats> {
339        let total_assets: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM cached_assets")
340            .fetch_one(&self.pool)
341            .await
342            .context("counting cached assets (postgres)")?;
343
344        let gem_assets: i64 =
345            sqlx::query_scalar("SELECT COUNT(*) FROM cached_assets WHERE kind = 'gem'")
346                .fetch_one(&self.pool)
347                .await
348                .context("counting gem assets (postgres)")?;
349
350        let spec_assets: i64 =
351            sqlx::query_scalar("SELECT COUNT(*) FROM cached_assets WHERE kind = 'gemspec'")
352                .fetch_one(&self.pool)
353                .await
354                .context("counting gemspec assets (postgres)")?;
355
356        let unique_gems: i64 =
357            sqlx::query_scalar("SELECT COUNT(DISTINCT name) FROM cached_assets WHERE kind = 'gem'")
358                .fetch_one(&self.pool)
359                .await
360                .context("counting unique gems (postgres)")?;
361
362        let total_size_bytes: i64 =
363            sqlx::query_scalar("SELECT COALESCE(SUM(size_bytes), 0) FROM cached_assets")
364                .fetch_one(&self.pool)
365                .await
366                .context("summing cached asset sizes (postgres)")?;
367
368        let last_accessed: Option<DateTime<Utc>> =
369            sqlx::query_scalar("SELECT MAX(last_accessed) FROM cached_assets")
370                .fetch_one(&self.pool)
371                .await
372                .context("fetching last access timestamp (postgres)")?;
373
374        Ok(IndexStats {
375            total_assets: total_assets.max(0) as u64,
376            gem_assets: gem_assets.max(0) as u64,
377            spec_assets: spec_assets.max(0) as u64,
378            unique_gems: unique_gems.max(0) as u64,
379            total_size_bytes: total_size_bytes.max(0) as u64,
380            last_accessed: last_accessed.map(format_timestamp),
381        })
382    }
383
384    async fn catalog_upsert_names(&self, names: &[String]) -> Result<()> {
385        if names.is_empty() {
386            return Ok(());
387        }
388        let mut tx = self.begin_tx().await?;
389        for name in names {
390            sqlx::query(
391                r#"
392                INSERT INTO catalog_gems(name, synced_at)
393                VALUES($1, NOW())
394                ON CONFLICT(name) DO UPDATE SET synced_at = EXCLUDED.synced_at
395                "#,
396            )
397            .bind(name)
398            .execute(&mut *tx)
399            .await
400            .with_context(|| format!("upserting catalog entry {} (postgres)", name))?;
401        }
402        tx.commit()
403            .await
404            .context("committing catalog upsert (postgres)")?;
405        Ok(())
406    }
407
408    async fn catalog_total(&self) -> Result<u64> {
409        let total: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM catalog_gems")
410            .fetch_one(&self.pool)
411            .await
412            .context("counting catalog gems (postgres)")?;
413        Ok(total.max(0) as u64)
414    }
415
416    async fn catalog_page(&self, offset: i64, limit: i64) -> Result<Vec<String>> {
417        let rows = sqlx::query_scalar::<_, String>(
418            r#"
419            SELECT name
420            FROM catalog_gems
421            ORDER BY name
422            LIMIT $1 OFFSET $2
423            "#,
424        )
425        .bind(limit)
426        .bind(offset)
427        .fetch_all(&self.pool)
428        .await
429        .context("fetching catalog page (postgres)")?;
430        Ok(rows)
431    }
432
433    async fn catalog_meta_get(&self, key: &str) -> Result<Option<String>> {
434        let value =
435            sqlx::query_scalar::<_, String>("SELECT value FROM catalog_meta WHERE key = $1")
436                .bind(key)
437                .fetch_optional(&self.pool)
438                .await
439                .context("fetching catalog meta value (postgres)")?;
440        Ok(value)
441    }
442
443    async fn catalog_meta_set(&self, key: &str, value: &str) -> Result<()> {
444        sqlx::query(
445            r#"
446            INSERT INTO catalog_meta(key, value)
447            VALUES($1, $2)
448            ON CONFLICT(key) DO UPDATE SET value = EXCLUDED.value
449            "#,
450        )
451        .bind(key)
452        .bind(value)
453        .execute(&self.pool)
454        .await
455        .context("upserting catalog meta value (postgres)")?;
456        Ok(())
457    }
458
459    async fn upsert_metadata(&self, metadata: &GemMetadata) -> Result<()> {
460        self.upsert_gem_metadata_record(metadata).await
461    }
462
463    async fn gem_metadata(
464        &self,
465        name: &str,
466        version: &str,
467        platform: Option<&str>,
468    ) -> Result<Option<GemMetadata>> {
469        self.fetch_gem_metadata(name, version, platform).await
470    }
471
472    async fn sbom_coverage(&self) -> Result<SbomCoverage> {
473        self.sbom_coverage_stats().await
474    }
475
476    async fn catalog_languages(&self) -> Result<Vec<String>> {
477        self.catalog_languages_list().await
478    }
479
480    async fn catalog_page_by_language(
481        &self,
482        language: &str,
483        offset: i64,
484        limit: i64,
485    ) -> Result<Vec<String>> {
486        let pattern = format!("%\"{}\"%", language);
487        let rows = sqlx::query_scalar::<_, String>(
488            r#"
489            SELECT DISTINCT name
490            FROM gem_metadata
491            WHERE native_languages_json LIKE $1
492            ORDER BY name
493            LIMIT $2 OFFSET $3
494            "#,
495        )
496        .bind(pattern)
497        .bind(limit)
498        .bind(offset)
499        .fetch_all(&self.pool)
500        .await
501        .context("fetching catalog page by language (postgres)")?;
502        Ok(rows)
503    }
504
505    async fn catalog_total_by_language(&self, language: &str) -> Result<u64> {
506        let pattern = format!("%\"{}\"%", language);
507        let total: i64 = sqlx::query_scalar(
508            r#"
509            SELECT COUNT(DISTINCT name)
510            FROM gem_metadata
511            WHERE native_languages_json LIKE $1
512            "#,
513        )
514        .bind(pattern)
515        .fetch_one(&self.pool)
516        .await
517        .context("counting catalog gems by language (postgres)")?;
518        Ok(total.max(0) as u64)
519    }
520
521    // ==================== Quarantine Methods ====================
522
523    async fn get_gem_version(
524        &self,
525        name: &str,
526        version: &str,
527        platform: Option<&str>,
528    ) -> Result<Option<GemVersion>> {
529        let row = sqlx::query_as::<_, PostgresGemVersionRow>(
530            r#"
531            SELECT id, name, version, platform, sha256, published_at, available_after,
532                   status, status_reason, upstream_yanked, created_at, updated_at
533            FROM gem_versions
534            WHERE name = $1
535              AND version = $2
536              AND ((platform IS NULL AND $3 IS NULL) OR platform = $3)
537            "#,
538        )
539        .bind(name)
540        .bind(version)
541        .bind(platform)
542        .fetch_optional(&self.pool)
543        .await
544        .context("fetching gem version (postgres)")?;
545
546        Ok(row.map(Into::into))
547    }
548
549    async fn upsert_gem_version(&self, gem_version: &GemVersion) -> Result<()> {
550        sqlx::query(
551            r#"
552            INSERT INTO gem_versions (
553                name, version, platform, sha256, published_at, available_after,
554                status, status_reason, upstream_yanked, created_at, updated_at
555            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW())
556            ON CONFLICT ON CONSTRAINT gem_versions_unique
557            DO UPDATE SET
558                sha256 = EXCLUDED.sha256,
559                published_at = EXCLUDED.published_at,
560                available_after = EXCLUDED.available_after,
561                status = EXCLUDED.status,
562                status_reason = EXCLUDED.status_reason,
563                upstream_yanked = EXCLUDED.upstream_yanked,
564                updated_at = NOW()
565            "#,
566        )
567        .bind(&gem_version.name)
568        .bind(&gem_version.version)
569        .bind(gem_version.platform.as_deref())
570        .bind(gem_version.sha256.as_deref())
571        .bind(gem_version.published_at)
572        .bind(gem_version.available_after)
573        .bind(gem_version.status.to_string())
574        .bind(gem_version.status_reason.as_deref())
575        .bind(gem_version.upstream_yanked)
576        .bind(gem_version.created_at)
577        .execute(&self.pool)
578        .await
579        .context("upserting gem version (postgres)")?;
580
581        Ok(())
582    }
583
584    async fn get_latest_available_version(
585        &self,
586        name: &str,
587        platform: Option<&str>,
588        now: DateTime<Utc>,
589    ) -> Result<Option<GemVersion>> {
590        // Get all available versions and sort in Rust for proper semver comparison
591        let rows = sqlx::query_as::<_, PostgresGemVersionRow>(
592            r#"
593            SELECT id, name, version, platform, sha256, published_at, available_after,
594                   status, status_reason, upstream_yanked, created_at, updated_at
595            FROM gem_versions
596            WHERE name = $1
597              AND ((platform IS NULL AND $2 IS NULL) OR platform = $2)
598              AND upstream_yanked = FALSE
599              AND (status = 'available' OR status = 'pinned'
600                   OR (status = 'quarantine' AND available_after <= $3))
601            "#,
602        )
603        .bind(name)
604        .bind(platform)
605        .bind(now)
606        .fetch_all(&self.pool)
607        .await
608        .context("fetching available versions (postgres)")?;
609
610        // Find the latest version using semver comparison
611        let mut versions: Vec<GemVersion> = rows.into_iter().map(Into::into).collect();
612        versions.sort_by(|a, b| compare_versions(&b.version, &a.version));
613
614        Ok(versions.into_iter().next())
615    }
616
617    async fn get_quarantined_versions(
618        &self,
619        name: &str,
620        now: DateTime<Utc>,
621    ) -> Result<Vec<GemVersion>> {
622        let rows = sqlx::query_as::<_, PostgresGemVersionRow>(
623            r#"
624            SELECT id, name, version, platform, sha256, published_at, available_after,
625                   status, status_reason, upstream_yanked, created_at, updated_at
626            FROM gem_versions
627            WHERE name = $1
628              AND status = 'quarantine'
629              AND available_after > $2
630            ORDER BY version DESC
631            "#,
632        )
633        .bind(name)
634        .bind(now)
635        .fetch_all(&self.pool)
636        .await
637        .context("fetching quarantined versions (postgres)")?;
638
639        Ok(rows.into_iter().map(Into::into).collect())
640    }
641
642    async fn update_version_status(
643        &self,
644        name: &str,
645        version: &str,
646        platform: Option<&str>,
647        status: VersionStatus,
648        reason: Option<String>,
649    ) -> Result<()> {
650        sqlx::query(
651            r#"
652            UPDATE gem_versions
653            SET status = $1, status_reason = $2, updated_at = NOW()
654            WHERE name = $3
655              AND version = $4
656              AND ((platform IS NULL AND $5 IS NULL) OR platform = $5)
657            "#,
658        )
659        .bind(status.to_string())
660        .bind(reason)
661        .bind(name)
662        .bind(version)
663        .bind(platform)
664        .execute(&self.pool)
665        .await
666        .context("updating version status (postgres)")?;
667
668        Ok(())
669    }
670
671    async fn promote_expired_quarantines(&self, now: DateTime<Utc>) -> Result<u64> {
672        let result = sqlx::query(
673            r#"
674            UPDATE gem_versions
675            SET status = 'available', status_reason = 'auto-promoted', updated_at = NOW()
676            WHERE status = 'quarantine'
677              AND available_after <= $1
678            "#,
679        )
680        .bind(now)
681        .execute(&self.pool)
682        .await
683        .context("promoting expired quarantines (postgres)")?;
684
685        Ok(result.rows_affected())
686    }
687
688    async fn mark_yanked(&self, name: &str, version: &str) -> Result<()> {
689        sqlx::query(
690            r#"
691            UPDATE gem_versions
692            SET status = 'yanked', upstream_yanked = TRUE, updated_at = NOW()
693            WHERE name = $1 AND version = $2
694            "#,
695        )
696        .bind(name)
697        .bind(version)
698        .execute(&self.pool)
699        .await
700        .context("marking version yanked (postgres)")?;
701
702        Ok(())
703    }
704
705    async fn get_all_quarantined(&self, limit: u32, offset: u32) -> Result<Vec<GemVersion>> {
706        let rows = sqlx::query_as::<_, PostgresGemVersionRow>(
707            r#"
708            SELECT id, name, version, platform, sha256, published_at, available_after,
709                   status, status_reason, upstream_yanked, created_at, updated_at
710            FROM gem_versions
711            WHERE status = 'quarantine'
712            ORDER BY available_after ASC
713            LIMIT $1 OFFSET $2
714            "#,
715        )
716        .bind(limit as i64)
717        .bind(offset as i64)
718        .fetch_all(&self.pool)
719        .await
720        .context("fetching all quarantined (postgres)")?;
721
722        Ok(rows.into_iter().map(Into::into).collect())
723    }
724
725    async fn quarantine_stats(&self) -> Result<QuarantineStats> {
726        let now = Utc::now();
727        let today_end = now + Duration::days(1);
728        let week_end = now + Duration::days(7);
729
730        let (quarantined, available, yanked, pinned): (i64, i64, i64, i64) = sqlx::query_as(
731            r#"
732            SELECT
733                COALESCE(SUM(CASE WHEN status = 'quarantine' THEN 1 ELSE 0 END), 0),
734                COALESCE(SUM(CASE WHEN status = 'available' THEN 1 ELSE 0 END), 0),
735                COALESCE(SUM(CASE WHEN status = 'yanked' THEN 1 ELSE 0 END), 0),
736                COALESCE(SUM(CASE WHEN status = 'pinned' THEN 1 ELSE 0 END), 0)
737            FROM gem_versions
738            "#,
739        )
740        .fetch_one(&self.pool)
741        .await
742        .context("fetching quarantine counts (postgres)")?;
743
744        let releasing_today: i64 = sqlx::query_scalar(
745            r#"
746            SELECT COUNT(*)
747            FROM gem_versions
748            WHERE status = 'quarantine'
749              AND available_after > $1
750              AND available_after <= $2
751            "#,
752        )
753        .bind(now)
754        .bind(today_end)
755        .fetch_one(&self.pool)
756        .await
757        .context("counting versions releasing today (postgres)")?;
758
759        let releasing_week: i64 = sqlx::query_scalar(
760            r#"
761            SELECT COUNT(*)
762            FROM gem_versions
763            WHERE status = 'quarantine'
764              AND available_after > $1
765              AND available_after <= $2
766            "#,
767        )
768        .bind(now)
769        .bind(week_end)
770        .fetch_one(&self.pool)
771        .await
772        .context("counting versions releasing this week (postgres)")?;
773
774        Ok(QuarantineStats {
775            total_quarantined: quarantined.max(0) as u64,
776            total_available: available.max(0) as u64,
777            total_yanked: yanked.max(0) as u64,
778            total_pinned: pinned.max(0) as u64,
779            versions_releasing_today: releasing_today.max(0) as u64,
780            versions_releasing_this_week: releasing_week.max(0) as u64,
781        })
782    }
783
784    async fn get_gem_versions_for_index(&self, name: &str) -> Result<Vec<GemVersion>> {
785        let rows = sqlx::query_as::<_, PostgresGemVersionRow>(
786            r#"
787            SELECT id, name, version, platform, sha256, published_at, available_after,
788                   status, status_reason, upstream_yanked, created_at, updated_at
789            FROM gem_versions
790            WHERE name = $1
791            ORDER BY version DESC
792            "#,
793        )
794        .bind(name)
795        .fetch_all(&self.pool)
796        .await
797        .context("fetching gem versions for index (postgres)")?;
798
799        Ok(rows.into_iter().map(Into::into).collect())
800    }
801
802    async fn quarantine_table_exists(&self) -> Result<bool> {
803        let exists: bool = sqlx::query_scalar(
804            r#"
805            SELECT EXISTS (
806                SELECT FROM information_schema.tables
807                WHERE table_name = 'gem_versions'
808            )
809            "#,
810        )
811        .fetch_one(&self.pool)
812        .await
813        .context("checking quarantine table exists (postgres)")?;
814
815        Ok(exists)
816    }
817
818    async fn run_quarantine_migrations(&self) -> Result<()> {
819        // Create gem_versions table
820        sqlx::query(
821            r#"
822            CREATE TABLE IF NOT EXISTS gem_versions (
823                id BIGSERIAL PRIMARY KEY,
824                name TEXT NOT NULL,
825                version TEXT NOT NULL,
826                platform TEXT,
827                sha256 TEXT,
828                published_at TIMESTAMPTZ NOT NULL,
829                available_after TIMESTAMPTZ NOT NULL,
830                status TEXT NOT NULL DEFAULT 'quarantine',
831                status_reason TEXT,
832                upstream_yanked BOOLEAN NOT NULL DEFAULT FALSE,
833                created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
834                updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
835                CONSTRAINT gem_versions_unique UNIQUE (name, version, platform)
836            )
837            "#,
838        )
839        .execute(&self.pool)
840        .await
841        .context("creating gem_versions table (postgres)")?;
842
843        // Create indexes
844        sqlx::query(
845            "CREATE INDEX IF NOT EXISTS idx_gem_versions_name ON gem_versions(name)",
846        )
847        .execute(&self.pool)
848        .await
849        .context("creating name index (postgres)")?;
850
851        sqlx::query(
852            "CREATE INDEX IF NOT EXISTS idx_gem_versions_status ON gem_versions(status)",
853        )
854        .execute(&self.pool)
855        .await
856        .context("creating status index (postgres)")?;
857
858        sqlx::query(
859            "CREATE INDEX IF NOT EXISTS idx_gv_available ON gem_versions(available_after)",
860        )
861        .execute(&self.pool)
862        .await
863        .context("creating available_after index (postgres)")?;
864
865        Ok(())
866    }
867}
868
/// Compare two gem version strings segment by segment.
///
/// The ordering is modelled on RubyGems' `Gem::Version`:
/// * a version is split into runs of ASCII digits and runs of ASCII letters; any other
///   character is a separator, and `-` additionally introduces a `pre` segment;
/// * digit runs compare numerically (arbitrary length), letter runs compare as strings,
///   and a letter run always sorts below a number, so `1.0.0.pre` < `1.0.0`;
/// * trailing zero segments are insignificant, so `1.0` equals `1.0.0`.
///
/// A single rule set is applied to every input, which makes this a total order and
/// therefore safe to use with `sort_by`. Mixing semver ordering with a raw string
/// fallback is not: it yields cycles such as `2.0.0 < 10.0.0 < 1.9 < 2.0.0`.
fn compare_versions(a: &str, b: &str) -> std::cmp::Ordering {
    use std::cmp::Ordering;

    /// One component of a version string.
    #[derive(PartialEq, Eq)]
    enum Segment<'s> {
        /// Run of ASCII letters such as `pre` or `rc`.
        Text(&'s str),
        /// Run of ASCII digits with leading zeros removed (zero is the empty string).
        Number(&'s str),
    }

    /// Splits `version` into segments and drops insignificant trailing zeros from both
    /// the leading numeric part and the part that starts at the first text segment.
    fn canonical_segments(version: &str) -> Vec<Segment<'_>> {
        let mut segments = Vec::new();
        let mut rest = version;
        while let Some(first) = rest.chars().next() {
            if first.is_ascii_digit() {
                let end = rest
                    .find(|c: char| !c.is_ascii_digit())
                    .unwrap_or(rest.len());
                let (digits, tail) = rest.split_at(end);
                segments.push(Segment::Number(digits.trim_start_matches('0')));
                rest = tail;
            } else if first.is_ascii_alphabetic() {
                let end = rest
                    .find(|c: char| !c.is_ascii_alphabetic())
                    .unwrap_or(rest.len());
                let (word, tail) = rest.split_at(end);
                segments.push(Segment::Text(word));
                rest = tail;
            } else {
                if first == '-' {
                    segments.push(Segment::Text("pre"));
                }
                rest = &rest[first.len_utf8()..];
            }
        }

        let zero = Segment::Number("");
        let first_text = segments
            .iter()
            .position(|segment| matches!(segment, Segment::Text(_)))
            .unwrap_or(segments.len());
        let mut suffix = segments.split_off(first_text);
        while segments.last() == Some(&zero) {
            segments.pop();
        }
        while suffix.last() == Some(&zero) {
            suffix.pop();
        }
        segments.append(&mut suffix);
        segments
    }

    let lhs = canonical_segments(a);
    let rhs = canonical_segments(b);
    // The shorter version is padded with zeros for the comparison.
    let zero = Segment::Number("");

    for index in 0..lhs.len().max(rhs.len()) {
        let left = lhs.get(index).unwrap_or(&zero);
        let right = rhs.get(index).unwrap_or(&zero);
        let ordering = match (left, right) {
            // With leading zeros stripped, a longer digit run is the larger number.
            (Segment::Number(x), Segment::Number(y)) => {
                x.len().cmp(&y.len()).then_with(|| x.cmp(y))
            }
            (Segment::Text(x), Segment::Text(y)) => x.cmp(y),
            (Segment::Text(_), Segment::Number(_)) => Ordering::Less,
            (Segment::Number(_), Segment::Text(_)) => Ordering::Greater,
        };
        if ordering != Ordering::Equal {
            return ordering;
        }
    }

    Ordering::Equal
}