vein_adapter/cache/
sqlite.rs

1use std::path::Path;
2
3use anyhow::{Context, Result};
4use chrono::{DateTime, Duration, Utc};
5use sqlx::{SqlitePool, sqlite::SqlitePoolOptions};
6
7use super::{
8    CacheBackend, GemVersion, QuarantineStats, VersionStatus,
9    models::{CachedAssetRow, DbGemMetadataRow, GemVersionRow},
10    serialization::{hydrate_metadata_row, parse_language_rows, prepare_metadata_strings},
11    types::{AssetKey, CachedAsset, GemMetadata, IndexStats, SbomCoverage},
12};
13
/// SQLite-backed cache store; the queries in this file all run through the wrapped
/// `sqlx` connection pool.
#[derive(Debug, Clone)]
pub struct SqliteCacheBackend {
    /// Connection pool handed to every query issued by this backend.
    pub(crate) pool: SqlitePool,
}
18
19impl SqliteCacheBackend {
20    pub async fn connect(path: &Path) -> Result<Self> {
21        if let Some(parent) = path.parent() {
22            tokio::fs::create_dir_all(parent)
23                .await
24                .with_context(|| format!("creating database directory {}", parent.display()))?;
25        }
26        let conn_str = format!(
27            "sqlite://{}",
28            path.to_str().context("database path not UTF-8")?
29        );
30        let pool = SqlitePoolOptions::new()
31            .max_connections(16)
32            .connect(&conn_str)
33            .await
34            .with_context(|| format!("connecting to sqlite database {}", path.display()))?;
35        let backend = Self { pool };
36        backend.init_schema().await?;
37        Ok(backend)
38    }
39
40    pub async fn connect_memory() -> Result<Self> {
41        let pool = SqlitePoolOptions::new()
42            .max_connections(16)
43            .connect("sqlite::memory:")
44            .await
45            .context("connecting to in-memory sqlite database")?;
46        let backend = Self { pool };
47        backend.init_schema().await?;
48        Ok(backend)
49    }
50
51    async fn init_schema(&self) -> Result<()> {
52        sqlx::query(
53            r#"
54            CREATE TABLE IF NOT EXISTS cached_assets (
55                kind TEXT NOT NULL,
56                name TEXT NOT NULL,
57                version TEXT NOT NULL,
58                platform TEXT,
59                path TEXT NOT NULL,
60                sha256 TEXT NOT NULL,
61                size_bytes INTEGER NOT NULL,
62                last_accessed TIMESTAMP DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
63                PRIMARY KEY (kind, name, version, platform)
64            )
65            "#,
66        )
67        .execute(&self.pool)
68        .await
69        .context("creating cached_assets table (sqlite)")?;
70
71        sqlx::query(
72            r#"
73            CREATE TABLE IF NOT EXISTS catalog_gems (
74                name TEXT PRIMARY KEY,
75                latest_version TEXT,
76                synced_at TIMESTAMP DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
77            )
78            "#,
79        )
80        .execute(&self.pool)
81        .await
82        .context("creating catalog_gems table (sqlite)")?;
83
84        sqlx::query(
85            r#"
86            CREATE TABLE IF NOT EXISTS catalog_meta (
87                key TEXT PRIMARY KEY,
88                value TEXT NOT NULL
89            )
90            "#,
91        )
92        .execute(&self.pool)
93        .await
94        .context("creating catalog_meta table (sqlite)")?;
95
96        sqlx::query(
97            r#"
98            CREATE TABLE IF NOT EXISTS gem_metadata (
99                name TEXT NOT NULL,
100                version TEXT NOT NULL,
101                platform TEXT,
102                summary TEXT,
103                description TEXT,
104                licenses TEXT,
105                authors TEXT,
106                emails TEXT,
107                homepage TEXT,
108                documentation_url TEXT,
109                changelog_url TEXT,
110                source_code_url TEXT,
111                bug_tracker_url TEXT,
112                wiki_url TEXT,
113                funding_url TEXT,
114                metadata_json TEXT,
115                dependencies_json TEXT NOT NULL,
116                executables_json TEXT,
117                extensions_json TEXT,
118                native_languages_json TEXT,
119                has_native_extensions INTEGER NOT NULL,
120                has_embedded_binaries INTEGER NOT NULL,
121                required_ruby_version TEXT,
122                required_rubygems_version TEXT,
123                rubygems_version TEXT,
124                specification_version INTEGER,
125                built_at TEXT,
126                size_bytes INTEGER,
127                sha256 TEXT,
128                sbom_json TEXT,
129                PRIMARY KEY (name, version, platform)
130            )
131            "#,
132        )
133        .execute(&self.pool)
134        .await
135        .context("creating gem_metadata table (sqlite)")?;
136
137        Ok(())
138    }
139
140    async fn touch(&self, key: &AssetKey<'_>) -> Result<()> {
141        sqlx::query(
142            r#"
143            UPDATE cached_assets
144            SET last_accessed = strftime('%Y-%m-%dT%H:%M:%fZ','now')
145            WHERE kind = ?1 AND name = ?2 AND version = ?3 AND
146                  ((platform IS NULL AND ?4 IS NULL) OR platform = ?4)
147            "#,
148        )
149        .bind(key.kind.as_str())
150        .bind(key.name)
151        .bind(key.version)
152        .bind(key.platform)
153        .execute(&self.pool)
154        .await
155        .context("updating last_accessed")?;
156        Ok(())
157    }
158
159    pub async fn upsert_gem_metadata_record(&self, metadata: &GemMetadata) -> Result<()> {
160        let prepared = prepare_metadata_strings(metadata)?;
161
162        sqlx::query(
163            r#"
164            INSERT INTO gem_metadata(
165                name,
166                version,
167                platform,
168                summary,
169                description,
170                licenses,
171                authors,
172                emails,
173                homepage,
174                documentation_url,
175                changelog_url,
176                source_code_url,
177                bug_tracker_url,
178                wiki_url,
179                funding_url,
180                metadata_json,
181                dependencies_json,
182                executables_json,
183                extensions_json,
184                native_languages_json,
185                has_native_extensions,
186                has_embedded_binaries,
187                required_ruby_version,
188                required_rubygems_version,
189                rubygems_version,
190                specification_version,
191                built_at,
192                size_bytes,
193                sha256,
194                sbom_json
195            ) VALUES (
196                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10,
197                ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20,
198                ?21, ?22, ?23, ?24, ?25, ?26, ?27, ?28, ?29, ?30
199            )
200            ON CONFLICT(name, version, platform)
201            DO UPDATE SET
202                summary = excluded.summary,
203                description = excluded.description,
204                licenses = excluded.licenses,
205                authors = excluded.authors,
206                emails = excluded.emails,
207                homepage = excluded.homepage,
208                documentation_url = excluded.documentation_url,
209                changelog_url = excluded.changelog_url,
210                source_code_url = excluded.source_code_url,
211                bug_tracker_url = excluded.bug_tracker_url,
212                wiki_url = excluded.wiki_url,
213                funding_url = excluded.funding_url,
214                metadata_json = excluded.metadata_json,
215                dependencies_json = excluded.dependencies_json,
216                executables_json = excluded.executables_json,
217                extensions_json = excluded.extensions_json,
218                native_languages_json = excluded.native_languages_json,
219                has_native_extensions = excluded.has_native_extensions,
220                has_embedded_binaries = excluded.has_embedded_binaries,
221                required_ruby_version = excluded.required_ruby_version,
222                required_rubygems_version = excluded.required_rubygems_version,
223                rubygems_version = excluded.rubygems_version,
224                specification_version = excluded.specification_version,
225                built_at = excluded.built_at,
226                size_bytes = excluded.size_bytes,
227                sha256 = excluded.sha256,
228                sbom_json = excluded.sbom_json
229            "#,
230        )
231        .bind(&metadata.name)
232        .bind(&metadata.version)
233        .bind(metadata.platform.as_deref())
234        .bind(metadata.summary.as_deref())
235        .bind(metadata.description.as_deref())
236        .bind(prepared.licenses_json)
237        .bind(prepared.authors_json)
238        .bind(prepared.emails_json)
239        .bind(metadata.homepage.as_deref())
240        .bind(metadata.documentation_url.as_deref())
241        .bind(metadata.changelog_url.as_deref())
242        .bind(metadata.source_code_url.as_deref())
243        .bind(metadata.bug_tracker_url.as_deref())
244        .bind(metadata.wiki_url.as_deref())
245        .bind(metadata.funding_url.as_deref())
246        .bind(prepared.metadata_json)
247        .bind(prepared.dependencies_json)
248        .bind(prepared.executables_json)
249        .bind(prepared.extensions_json)
250        .bind(prepared.native_languages_json)
251        .bind(metadata.has_native_extensions)
252        .bind(metadata.has_embedded_binaries)
253        .bind(metadata.required_ruby_version.as_deref())
254        .bind(metadata.required_rubygems_version.as_deref())
255        .bind(metadata.rubygems_version.as_deref())
256        .bind(metadata.specification_version)
257        .bind(metadata.built_at.as_deref())
258        .bind(prepared.size_bytes)
259        .bind(&metadata.sha256)
260        .bind(prepared.sbom_json)
261        .execute(&self.pool)
262        .await
263        .context("upserting gem metadata")?;
264
265        Ok(())
266    }
267
268    pub async fn fetch_gem_metadata(
269        &self,
270        name: &str,
271        version: &str,
272        platform: Option<&str>,
273    ) -> Result<Option<GemMetadata>> {
274        let record = sqlx::query_as::<_, DbGemMetadataRow>(
275            r#"
276            SELECT
277                name,
278                version,
279                platform,
280                summary,
281                description,
282                licenses,
283                authors,
284                emails,
285                homepage,
286                documentation_url,
287                changelog_url,
288                source_code_url,
289                bug_tracker_url,
290                wiki_url,
291                funding_url,
292                metadata_json,
293                dependencies_json,
294                executables_json,
295                extensions_json,
296                native_languages_json,
297                has_native_extensions,
298                has_embedded_binaries,
299                required_ruby_version,
300                required_rubygems_version,
301                rubygems_version,
302                specification_version,
303                built_at,
304                size_bytes,
305                sha256,
306                sbom_json
307            FROM gem_metadata
308            WHERE name = ?1
309              AND version = ?2
310              AND ((platform IS NULL AND ?3 IS NULL) OR platform = ?3)
311            "#,
312        )
313        .bind(name)
314        .bind(version)
315        .bind(platform)
316        .fetch_optional(&self.pool)
317        .await
318        .context("fetching gem metadata record")?;
319
320        match record {
321            Some(row) => hydrate_metadata_row(row).map(Some),
322            None => Ok(None),
323        }
324    }
325
326    pub async fn sbom_coverage_stats(&self) -> Result<SbomCoverage> {
327        let (total, with_sbom) = sqlx::query_as::<_, (i64, i64)>(
328            r#"
329            SELECT
330                COUNT(*) as total,
331                COALESCE(
332                    SUM(CASE WHEN sbom_json IS NOT NULL AND sbom_json <> ''
333                        THEN 1 ELSE 0 END),
334                    0
335                ) as with_sbom
336            FROM gem_metadata
337            "#,
338        )
339        .fetch_one(&self.pool)
340        .await
341        .context("querying SBOM coverage (postgres)")?;
342
343        Ok(SbomCoverage {
344            metadata_rows: total.max(0) as u64,
345            with_sbom: with_sbom.max(0) as u64,
346        })
347    }
348}
349
350impl CacheBackend for SqliteCacheBackend {
351    async fn get(&self, key: &AssetKey<'_>) -> Result<Option<CachedAsset>> {
352        let record = sqlx::query_as::<_, CachedAssetRow>(
353            r#"
354            SELECT path, sha256, size_bytes, last_accessed
355            FROM cached_assets
356            WHERE kind = ?1 AND name = ?2 AND version = ?3 AND
357                  ((platform IS NULL AND ?4 IS NULL) OR platform = ?4)
358            "#,
359        )
360        .bind(key.kind.as_str())
361        .bind(key.name)
362        .bind(key.version)
363        .bind(key.platform)
364        .fetch_optional(&self.pool)
365        .await
366        .context("fetching cached asset")?;
367
368        if let Some(row) = record {
369            self.touch(key).await?;
370            Ok(Some(row.into()))
371        } else {
372            Ok(None)
373        }
374    }
375
376    async fn insert_or_replace(
377        &self,
378        key: &AssetKey<'_>,
379        path: &str,
380        sha256: &str,
381        size_bytes: u64,
382    ) -> Result<()> {
383        sqlx::query(
384            r#"
385            INSERT INTO cached_assets(
386                kind, name, version, platform, path, sha256, size_bytes, last_accessed
387            )
388            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, strftime('%Y-%m-%dT%H:%M:%fZ','now'))
389            ON CONFLICT(kind, name, version, platform)
390            DO UPDATE SET
391                path = excluded.path,
392                sha256 = excluded.sha256,
393                size_bytes = excluded.size_bytes,
394                last_accessed = excluded.last_accessed
395            "#,
396        )
397        .bind(key.kind.as_str())
398        .bind(key.name)
399        .bind(key.version)
400        .bind(key.platform)
401        .bind(path)
402        .bind(sha256)
403        .bind(size_bytes as i64)
404        .execute(&self.pool)
405        .await
406        .context("inserting cached asset")?;
407        Ok(())
408    }
409
410    async fn get_all_gems(&self) -> Result<Vec<(String, String)>> {
411        let rows = sqlx::query_as::<_, (String, String)>(
412            r#"
413            SELECT DISTINCT name, version
414            FROM cached_assets
415            WHERE kind = 'gem'
416            ORDER BY name, version
417            "#,
418        )
419        .fetch_all(&self.pool)
420        .await
421        .context("fetching all gems")?;
422
423        Ok(rows)
424    }
425
426    async fn stats(&self) -> Result<IndexStats> {
427        let total_assets: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM cached_assets")
428            .fetch_one(&self.pool)
429            .await
430            .context("counting cached assets")?;
431
432        let gem_assets: i64 =
433            sqlx::query_scalar("SELECT COUNT(*) FROM cached_assets WHERE kind = 'gem'")
434                .fetch_one(&self.pool)
435                .await
436                .context("counting gem assets")?;
437
438        let spec_assets: i64 =
439            sqlx::query_scalar("SELECT COUNT(*) FROM cached_assets WHERE kind = 'gemspec'")
440                .fetch_one(&self.pool)
441                .await
442                .context("counting gemspec assets")?;
443
444        let unique_gems: i64 =
445            sqlx::query_scalar("SELECT COUNT(DISTINCT name) FROM cached_assets WHERE kind = 'gem'")
446                .fetch_one(&self.pool)
447                .await
448                .context("counting unique gems")?;
449
450        let total_size_bytes: i64 =
451            sqlx::query_scalar("SELECT COALESCE(SUM(size_bytes), 0) FROM cached_assets")
452                .fetch_one(&self.pool)
453                .await
454                .context("summing cached asset sizes")?;
455
456        let last_accessed: Option<String> =
457            sqlx::query_scalar("SELECT MAX(last_accessed) FROM cached_assets")
458                .fetch_one(&self.pool)
459                .await
460                .context("fetching last access timestamp")?;
461
462        Ok(IndexStats {
463            total_assets: total_assets.max(0) as u64,
464            gem_assets: gem_assets.max(0) as u64,
465            spec_assets: spec_assets.max(0) as u64,
466            unique_gems: unique_gems.max(0) as u64,
467            total_size_bytes: total_size_bytes.max(0) as u64,
468            last_accessed,
469        })
470    }
471
472    async fn catalog_upsert_names(&self, names: &[String]) -> Result<()> {
473        if names.is_empty() {
474            return Ok(());
475        }
476        let mut tx = self.pool.begin().await?;
477        for name in names {
478            sqlx::query(
479                r#"
480                INSERT INTO catalog_gems(name, synced_at)
481                VALUES(?1, strftime('%Y-%m-%dT%H:%M:%fZ','now'))
482                ON CONFLICT(name) DO UPDATE SET synced_at = excluded.synced_at
483                "#,
484            )
485            .bind(name)
486            .execute(&mut *tx)
487            .await
488            .with_context(|| format!("upserting catalog entry {}", name))?;
489        }
490        tx.commit().await.context("committing catalog upsert")?;
491        Ok(())
492    }
493
494    async fn catalog_total(&self) -> Result<u64> {
495        let total: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM catalog_gems")
496            .fetch_one(&self.pool)
497            .await
498            .context("counting catalog gems")?;
499        Ok(total.max(0) as u64)
500    }
501
502    async fn catalog_page(&self, offset: i64, limit: i64) -> Result<Vec<String>> {
503        let rows = sqlx::query_scalar::<_, String>(
504            r#"
505            SELECT name
506            FROM catalog_gems
507            ORDER BY name
508            LIMIT ?1 OFFSET ?2
509            "#,
510        )
511        .bind(limit)
512        .bind(offset)
513        .fetch_all(&self.pool)
514        .await
515        .context("fetching catalog page")?;
516        Ok(rows)
517    }
518
519    async fn catalog_meta_get(&self, key: &str) -> Result<Option<String>> {
520        let value =
521            sqlx::query_scalar::<_, String>("SELECT value FROM catalog_meta WHERE key = ?1")
522                .bind(key)
523                .fetch_optional(&self.pool)
524                .await
525                .context("fetching catalog meta value")?;
526        Ok(value)
527    }
528
529    async fn catalog_meta_set(&self, key: &str, value: &str) -> Result<()> {
530        sqlx::query(
531            r#"
532            INSERT INTO catalog_meta(key, value)
533            VALUES(?1, ?2)
534            ON CONFLICT(key) DO UPDATE SET value = excluded.value
535            "#,
536        )
537        .bind(key)
538        .bind(value)
539        .execute(&self.pool)
540        .await
541        .context("upserting catalog meta value")?;
542        Ok(())
543    }
544
    /// Trait entry point for storing gem metadata; forwards to the inherent
    /// `upsert_gem_metadata_record`.
    async fn upsert_metadata(&self, metadata: &GemMetadata) -> Result<()> {
        self.upsert_gem_metadata_record(metadata).await
    }
548
    /// Trait entry point for reading gem metadata; forwards to the inherent
    /// `fetch_gem_metadata` with the same `name`, `version` and `platform`.
    async fn gem_metadata(
        &self,
        name: &str,
        version: &str,
        platform: Option<&str>,
    ) -> Result<Option<GemMetadata>> {
        self.fetch_gem_metadata(name, version, platform).await
    }
557
558    async fn sbom_coverage(&self) -> Result<SbomCoverage> {
559        let (total, with_sbom) = sqlx::query_as::<_, (i64, i64)>(
560            r#"
561            SELECT
562                COUNT(*) as total,
563                COALESCE(
564                    SUM(CASE WHEN sbom_json IS NOT NULL AND sbom_json <> ''
565                        THEN 1 ELSE 0 END),
566                    0
567                ) as with_sbom
568            FROM gem_metadata
569            "#,
570        )
571        .fetch_one(&self.pool)
572        .await
573        .context("querying SBOM coverage (sqlite)")?;
574
575        Ok(SbomCoverage {
576            metadata_rows: total.max(0) as u64,
577            with_sbom: with_sbom.max(0) as u64,
578        })
579    }
580
581    async fn catalog_languages(&self) -> Result<Vec<String>> {
582        let rows = sqlx::query_scalar::<_, Option<String>>(
583            r#"
584            SELECT native_languages_json
585            FROM gem_metadata
586            WHERE native_languages_json IS NOT NULL AND native_languages_json <> ''
587            "#,
588        )
589        .fetch_all(&self.pool)
590        .await
591        .context("fetching native languages (sqlite)")?;
592
593        parse_language_rows(rows)
594    }
595
596    async fn catalog_page_by_language(
597        &self,
598        language: &str,
599        offset: i64,
600        limit: i64,
601    ) -> Result<Vec<String>> {
602        let pattern = format!("%\"{}\"%", language);
603        let rows = sqlx::query_scalar::<_, String>(
604            r#"
605            SELECT DISTINCT name
606            FROM gem_metadata
607            WHERE native_languages_json LIKE ?1
608            ORDER BY name
609            LIMIT ?2 OFFSET ?3
610            "#,
611        )
612        .bind(pattern)
613        .bind(limit)
614        .bind(offset)
615        .fetch_all(&self.pool)
616        .await
617        .context("fetching catalog page by language (sqlite)")?;
618        Ok(rows)
619    }
620
621    async fn catalog_total_by_language(&self, language: &str) -> Result<u64> {
622        let pattern = format!("%\"{}\"%", language);
623        let total: i64 = sqlx::query_scalar(
624            r#"
625            SELECT COUNT(DISTINCT name)
626            FROM gem_metadata
627            WHERE native_languages_json LIKE ?1
628            "#,
629        )
630        .bind(pattern)
631        .fetch_one(&self.pool)
632        .await
633        .context("counting catalog gems by language (sqlite)")?;
634        Ok(total.max(0) as u64)
635    }
636
637    // ==================== Quarantine Methods ====================
638
639    async fn get_gem_version(
640        &self,
641        name: &str,
642        version: &str,
643        platform: Option<&str>,
644    ) -> Result<Option<GemVersion>> {
645        let row = sqlx::query_as::<_, GemVersionRow>(
646            r#"
647            SELECT id, name, version, platform, sha256, published_at, available_after,
648                   status, status_reason, upstream_yanked, created_at, updated_at
649            FROM gem_versions
650            WHERE name = ?1
651              AND version = ?2
652              AND ((platform IS NULL AND ?3 IS NULL) OR platform = ?3)
653            "#,
654        )
655        .bind(name)
656        .bind(version)
657        .bind(platform)
658        .fetch_optional(&self.pool)
659        .await
660        .context("fetching gem version (sqlite)")?;
661
662        Ok(row.map(Into::into))
663    }
664
665    async fn upsert_gem_version(&self, gem_version: &GemVersion) -> Result<()> {
666        let now = Utc::now().to_rfc3339();
667
668        sqlx::query(
669            r#"
670            INSERT INTO gem_versions (
671                name, version, platform, sha256, published_at, available_after,
672                status, status_reason, upstream_yanked, created_at, updated_at
673            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
674            ON CONFLICT (name, version, platform)
675            DO UPDATE SET
676                sha256 = excluded.sha256,
677                published_at = excluded.published_at,
678                available_after = excluded.available_after,
679                status = excluded.status,
680                status_reason = excluded.status_reason,
681                upstream_yanked = excluded.upstream_yanked,
682                updated_at = excluded.updated_at
683            "#,
684        )
685        .bind(&gem_version.name)
686        .bind(&gem_version.version)
687        .bind(gem_version.platform.as_deref())
688        .bind(gem_version.sha256.as_deref())
689        .bind(gem_version.published_at.to_rfc3339())
690        .bind(gem_version.available_after.to_rfc3339())
691        .bind(gem_version.status.to_string())
692        .bind(gem_version.status_reason.as_deref())
693        .bind(gem_version.upstream_yanked)
694        .bind(gem_version.created_at.to_rfc3339())
695        .bind(&now)
696        .execute(&self.pool)
697        .await
698        .context("upserting gem version (sqlite)")?;
699
700        Ok(())
701    }
702
703    async fn get_latest_available_version(
704        &self,
705        name: &str,
706        platform: Option<&str>,
707        now: DateTime<Utc>,
708    ) -> Result<Option<GemVersion>> {
709        let now_str = now.to_rfc3339();
710
711        // Get all available versions and sort in Rust for proper semver comparison
712        let rows = sqlx::query_as::<_, GemVersionRow>(
713            r#"
714            SELECT id, name, version, platform, sha256, published_at, available_after,
715                   status, status_reason, upstream_yanked, created_at, updated_at
716            FROM gem_versions
717            WHERE name = ?1
718              AND ((platform IS NULL AND ?2 IS NULL) OR platform = ?2)
719              AND upstream_yanked = FALSE
720              AND (status = 'available' OR status = 'pinned'
721                   OR (status = 'quarantine' AND available_after <= ?3))
722            "#,
723        )
724        .bind(name)
725        .bind(platform)
726        .bind(&now_str)
727        .fetch_all(&self.pool)
728        .await
729        .context("fetching available versions (sqlite)")?;
730
731        // Find the latest version using semver comparison
732        let mut versions: Vec<GemVersion> = rows.into_iter().map(Into::into).collect();
733        versions.sort_by(|a, b| {
734            compare_versions(&b.version, &a.version) // Reverse for descending
735        });
736
737        Ok(versions.into_iter().next())
738    }
739
740    async fn get_quarantined_versions(
741        &self,
742        name: &str,
743        now: DateTime<Utc>,
744    ) -> Result<Vec<GemVersion>> {
745        let now_str = now.to_rfc3339();
746
747        let rows = sqlx::query_as::<_, GemVersionRow>(
748            r#"
749            SELECT id, name, version, platform, sha256, published_at, available_after,
750                   status, status_reason, upstream_yanked, created_at, updated_at
751            FROM gem_versions
752            WHERE name = ?1
753              AND status = 'quarantine'
754              AND available_after > ?2
755            ORDER BY version DESC
756            "#,
757        )
758        .bind(name)
759        .bind(&now_str)
760        .fetch_all(&self.pool)
761        .await
762        .context("fetching quarantined versions (sqlite)")?;
763
764        Ok(rows.into_iter().map(Into::into).collect())
765    }
766
767    async fn update_version_status(
768        &self,
769        name: &str,
770        version: &str,
771        platform: Option<&str>,
772        status: VersionStatus,
773        reason: Option<String>,
774    ) -> Result<()> {
775        let now = Utc::now().to_rfc3339();
776
777        sqlx::query(
778            r#"
779            UPDATE gem_versions
780            SET status = ?1, status_reason = ?2, updated_at = ?3
781            WHERE name = ?4
782              AND version = ?5
783              AND ((platform IS NULL AND ?6 IS NULL) OR platform = ?6)
784            "#,
785        )
786        .bind(status.to_string())
787        .bind(reason)
788        .bind(&now)
789        .bind(name)
790        .bind(version)
791        .bind(platform)
792        .execute(&self.pool)
793        .await
794        .context("updating version status (sqlite)")?;
795
796        Ok(())
797    }
798
799    async fn promote_expired_quarantines(&self, now: DateTime<Utc>) -> Result<u64> {
800        let now_str = now.to_rfc3339();
801
802        let result = sqlx::query(
803            r#"
804            UPDATE gem_versions
805            SET status = 'available', status_reason = 'auto-promoted', updated_at = ?1
806            WHERE status = 'quarantine'
807              AND available_after <= ?2
808            "#,
809        )
810        .bind(&now_str)
811        .bind(&now_str)
812        .execute(&self.pool)
813        .await
814        .context("promoting expired quarantines (sqlite)")?;
815
816        Ok(result.rows_affected())
817    }
818
819    async fn mark_yanked(&self, name: &str, version: &str) -> Result<()> {
820        let now = Utc::now().to_rfc3339();
821
822        sqlx::query(
823            r#"
824            UPDATE gem_versions
825            SET status = 'yanked', upstream_yanked = TRUE, updated_at = ?1
826            WHERE name = ?2 AND version = ?3
827            "#,
828        )
829        .bind(&now)
830        .bind(name)
831        .bind(version)
832        .execute(&self.pool)
833        .await
834        .context("marking version yanked (sqlite)")?;
835
836        Ok(())
837    }
838
839    async fn get_all_quarantined(&self, limit: u32, offset: u32) -> Result<Vec<GemVersion>> {
840        let rows = sqlx::query_as::<_, GemVersionRow>(
841            r#"
842            SELECT id, name, version, platform, sha256, published_at, available_after,
843                   status, status_reason, upstream_yanked, created_at, updated_at
844            FROM gem_versions
845            WHERE status = 'quarantine'
846            ORDER BY available_after ASC
847            LIMIT ?1 OFFSET ?2
848            "#,
849        )
850        .bind(limit)
851        .bind(offset)
852        .fetch_all(&self.pool)
853        .await
854        .context("fetching all quarantined (sqlite)")?;
855
856        Ok(rows.into_iter().map(Into::into).collect())
857    }
858
859    async fn quarantine_stats(&self) -> Result<QuarantineStats> {
860        let now = Utc::now();
861        let today_end = (now + Duration::days(1)).to_rfc3339();
862        let week_end = (now + Duration::days(7)).to_rfc3339();
863        let now_str = now.to_rfc3339();
864
865        let (quarantined, available, yanked, pinned): (i64, i64, i64, i64) = sqlx::query_as(
866            r#"
867            SELECT
868                COALESCE(SUM(CASE WHEN status = 'quarantine' THEN 1 ELSE 0 END), 0),
869                COALESCE(SUM(CASE WHEN status = 'available' THEN 1 ELSE 0 END), 0),
870                COALESCE(SUM(CASE WHEN status = 'yanked' THEN 1 ELSE 0 END), 0),
871                COALESCE(SUM(CASE WHEN status = 'pinned' THEN 1 ELSE 0 END), 0)
872            FROM gem_versions
873            "#,
874        )
875        .fetch_one(&self.pool)
876        .await
877        .context("fetching quarantine counts (sqlite)")?;
878
879        let releasing_today: i64 = sqlx::query_scalar(
880            r#"
881            SELECT COUNT(*)
882            FROM gem_versions
883            WHERE status = 'quarantine'
884              AND available_after > ?1
885              AND available_after <= ?2
886            "#,
887        )
888        .bind(&now_str)
889        .bind(&today_end)
890        .fetch_one(&self.pool)
891        .await
892        .context("counting versions releasing today (sqlite)")?;
893
894        let releasing_week: i64 = sqlx::query_scalar(
895            r#"
896            SELECT COUNT(*)
897            FROM gem_versions
898            WHERE status = 'quarantine'
899              AND available_after > ?1
900              AND available_after <= ?2
901            "#,
902        )
903        .bind(&now_str)
904        .bind(&week_end)
905        .fetch_one(&self.pool)
906        .await
907        .context("counting versions releasing this week (sqlite)")?;
908
909        Ok(QuarantineStats {
910            total_quarantined: quarantined.max(0) as u64,
911            total_available: available.max(0) as u64,
912            total_yanked: yanked.max(0) as u64,
913            total_pinned: pinned.max(0) as u64,
914            versions_releasing_today: releasing_today.max(0) as u64,
915            versions_releasing_this_week: releasing_week.max(0) as u64,
916        })
917    }
918
919    async fn get_gem_versions_for_index(&self, name: &str) -> Result<Vec<GemVersion>> {
920        let rows = sqlx::query_as::<_, GemVersionRow>(
921            r#"
922            SELECT id, name, version, platform, sha256, published_at, available_after,
923                   status, status_reason, upstream_yanked, created_at, updated_at
924            FROM gem_versions
925            WHERE name = ?1
926            ORDER BY version DESC
927            "#,
928        )
929        .bind(name)
930        .fetch_all(&self.pool)
931        .await
932        .context("fetching gem versions for index (sqlite)")?;
933
934        Ok(rows.into_iter().map(Into::into).collect())
935    }
936
937    async fn quarantine_table_exists(&self) -> Result<bool> {
938        let exists: i64 = sqlx::query_scalar(
939            r#"
940            SELECT COUNT(*)
941            FROM sqlite_master
942            WHERE type = 'table' AND name = 'gem_versions'
943            "#,
944        )
945        .fetch_one(&self.pool)
946        .await
947        .context("checking quarantine table exists (sqlite)")?;
948
949        Ok(exists > 0)
950    }
951
952    async fn run_quarantine_migrations(&self) -> Result<()> {
953        // Create gem_versions table
954        sqlx::query(
955            r#"
956            CREATE TABLE IF NOT EXISTS gem_versions (
957                id INTEGER PRIMARY KEY AUTOINCREMENT,
958                name TEXT NOT NULL,
959                version TEXT NOT NULL,
960                platform TEXT,
961                sha256 TEXT,
962                published_at TEXT NOT NULL,
963                available_after TEXT NOT NULL,
964                status TEXT NOT NULL DEFAULT 'quarantine',
965                status_reason TEXT,
966                upstream_yanked INTEGER NOT NULL DEFAULT 0,
967                created_at TEXT NOT NULL,
968                updated_at TEXT NOT NULL,
969                UNIQUE(name, version, platform)
970            )
971            "#,
972        )
973        .execute(&self.pool)
974        .await
975        .context("creating gem_versions table (sqlite)")?;
976
977        // Create indexes
978        sqlx::query(
979            "CREATE INDEX IF NOT EXISTS idx_gem_versions_name ON gem_versions(name)",
980        )
981        .execute(&self.pool)
982        .await
983        .context("creating name index (sqlite)")?;
984
985        sqlx::query(
986            "CREATE INDEX IF NOT EXISTS idx_gem_versions_status ON gem_versions(status)",
987        )
988        .execute(&self.pool)
989        .await
990        .context("creating status index (sqlite)")?;
991
992        sqlx::query(
993            "CREATE INDEX IF NOT EXISTS idx_gv_available ON gem_versions(available_after)",
994        )
995        .execute(&self.pool)
996        .await
997        .context("creating available_after index (sqlite)")?;
998
999        Ok(())
1000    }
1001}
1002
1003/// Compare two version strings using semver when possible.
1004fn compare_versions(a: &str, b: &str) -> std::cmp::Ordering {
1005    // Try semver parsing first
1006    match (semver::Version::parse(a), semver::Version::parse(b)) {
1007        (Ok(va), Ok(vb)) => va.cmp(&vb),
1008        // Fall back to string comparison if semver fails
1009        _ => a.cmp(b),
1010    }
1011}