1use std::path::Path;
2
3use anyhow::{Context, Result};
4use chrono::{DateTime, Duration, Utc};
5use sqlx::{SqlitePool, sqlite::SqlitePoolOptions};
6
7use super::{
8 CacheBackend, GemVersion, QuarantineStats, VersionStatus,
9 models::{CachedAssetRow, DbGemMetadataRow, GemVersionRow},
10 serialization::{hydrate_metadata_row, parse_language_rows, prepare_metadata_strings},
11 types::{AssetKey, CachedAsset, GemMetadata, IndexStats, SbomCoverage},
12};
13
14#[derive(Debug, Clone)]
15pub struct SqliteCacheBackend {
16 pub(crate) pool: SqlitePool,
17}
18
19impl SqliteCacheBackend {
20 pub async fn connect(path: &Path) -> Result<Self> {
21 if let Some(parent) = path.parent() {
22 tokio::fs::create_dir_all(parent)
23 .await
24 .with_context(|| format!("creating database directory {}", parent.display()))?;
25 }
26 let conn_str = format!(
27 "sqlite://{}",
28 path.to_str().context("database path not UTF-8")?
29 );
30 let pool = SqlitePoolOptions::new()
31 .max_connections(16)
32 .connect(&conn_str)
33 .await
34 .with_context(|| format!("connecting to sqlite database {}", path.display()))?;
35 let backend = Self { pool };
36 backend.init_schema().await?;
37 Ok(backend)
38 }
39
40 pub async fn connect_memory() -> Result<Self> {
41 let pool = SqlitePoolOptions::new()
42 .max_connections(16)
43 .connect("sqlite::memory:")
44 .await
45 .context("connecting to in-memory sqlite database")?;
46 let backend = Self { pool };
47 backend.init_schema().await?;
48 Ok(backend)
49 }
50
51 async fn init_schema(&self) -> Result<()> {
52 sqlx::query(
53 r#"
54 CREATE TABLE IF NOT EXISTS cached_assets (
55 kind TEXT NOT NULL,
56 name TEXT NOT NULL,
57 version TEXT NOT NULL,
58 platform TEXT,
59 path TEXT NOT NULL,
60 sha256 TEXT NOT NULL,
61 size_bytes INTEGER NOT NULL,
62 last_accessed TIMESTAMP DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
63 PRIMARY KEY (kind, name, version, platform)
64 )
65 "#,
66 )
67 .execute(&self.pool)
68 .await
69 .context("creating cached_assets table (sqlite)")?;
70
71 sqlx::query(
72 r#"
73 CREATE TABLE IF NOT EXISTS catalog_gems (
74 name TEXT PRIMARY KEY,
75 latest_version TEXT,
76 synced_at TIMESTAMP DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
77 )
78 "#,
79 )
80 .execute(&self.pool)
81 .await
82 .context("creating catalog_gems table (sqlite)")?;
83
84 sqlx::query(
85 r#"
86 CREATE TABLE IF NOT EXISTS catalog_meta (
87 key TEXT PRIMARY KEY,
88 value TEXT NOT NULL
89 )
90 "#,
91 )
92 .execute(&self.pool)
93 .await
94 .context("creating catalog_meta table (sqlite)")?;
95
96 sqlx::query(
97 r#"
98 CREATE TABLE IF NOT EXISTS gem_metadata (
99 name TEXT NOT NULL,
100 version TEXT NOT NULL,
101 platform TEXT,
102 summary TEXT,
103 description TEXT,
104 licenses TEXT,
105 authors TEXT,
106 emails TEXT,
107 homepage TEXT,
108 documentation_url TEXT,
109 changelog_url TEXT,
110 source_code_url TEXT,
111 bug_tracker_url TEXT,
112 wiki_url TEXT,
113 funding_url TEXT,
114 metadata_json TEXT,
115 dependencies_json TEXT NOT NULL,
116 executables_json TEXT,
117 extensions_json TEXT,
118 native_languages_json TEXT,
119 has_native_extensions INTEGER NOT NULL,
120 has_embedded_binaries INTEGER NOT NULL,
121 required_ruby_version TEXT,
122 required_rubygems_version TEXT,
123 rubygems_version TEXT,
124 specification_version INTEGER,
125 built_at TEXT,
126 size_bytes INTEGER,
127 sha256 TEXT,
128 sbom_json TEXT,
129 PRIMARY KEY (name, version, platform)
130 )
131 "#,
132 )
133 .execute(&self.pool)
134 .await
135 .context("creating gem_metadata table (sqlite)")?;
136
137 Ok(())
138 }
139
140 async fn touch(&self, key: &AssetKey<'_>) -> Result<()> {
141 sqlx::query(
142 r#"
143 UPDATE cached_assets
144 SET last_accessed = strftime('%Y-%m-%dT%H:%M:%fZ','now')
145 WHERE kind = ?1 AND name = ?2 AND version = ?3 AND
146 ((platform IS NULL AND ?4 IS NULL) OR platform = ?4)
147 "#,
148 )
149 .bind(key.kind.as_str())
150 .bind(key.name)
151 .bind(key.version)
152 .bind(key.platform)
153 .execute(&self.pool)
154 .await
155 .context("updating last_accessed")?;
156 Ok(())
157 }
158
159 pub async fn upsert_gem_metadata_record(&self, metadata: &GemMetadata) -> Result<()> {
160 let prepared = prepare_metadata_strings(metadata)?;
161
162 sqlx::query(
163 r#"
164 INSERT INTO gem_metadata(
165 name,
166 version,
167 platform,
168 summary,
169 description,
170 licenses,
171 authors,
172 emails,
173 homepage,
174 documentation_url,
175 changelog_url,
176 source_code_url,
177 bug_tracker_url,
178 wiki_url,
179 funding_url,
180 metadata_json,
181 dependencies_json,
182 executables_json,
183 extensions_json,
184 native_languages_json,
185 has_native_extensions,
186 has_embedded_binaries,
187 required_ruby_version,
188 required_rubygems_version,
189 rubygems_version,
190 specification_version,
191 built_at,
192 size_bytes,
193 sha256,
194 sbom_json
195 ) VALUES (
196 ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10,
197 ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20,
198 ?21, ?22, ?23, ?24, ?25, ?26, ?27, ?28, ?29, ?30
199 )
200 ON CONFLICT(name, version, platform)
201 DO UPDATE SET
202 summary = excluded.summary,
203 description = excluded.description,
204 licenses = excluded.licenses,
205 authors = excluded.authors,
206 emails = excluded.emails,
207 homepage = excluded.homepage,
208 documentation_url = excluded.documentation_url,
209 changelog_url = excluded.changelog_url,
210 source_code_url = excluded.source_code_url,
211 bug_tracker_url = excluded.bug_tracker_url,
212 wiki_url = excluded.wiki_url,
213 funding_url = excluded.funding_url,
214 metadata_json = excluded.metadata_json,
215 dependencies_json = excluded.dependencies_json,
216 executables_json = excluded.executables_json,
217 extensions_json = excluded.extensions_json,
218 native_languages_json = excluded.native_languages_json,
219 has_native_extensions = excluded.has_native_extensions,
220 has_embedded_binaries = excluded.has_embedded_binaries,
221 required_ruby_version = excluded.required_ruby_version,
222 required_rubygems_version = excluded.required_rubygems_version,
223 rubygems_version = excluded.rubygems_version,
224 specification_version = excluded.specification_version,
225 built_at = excluded.built_at,
226 size_bytes = excluded.size_bytes,
227 sha256 = excluded.sha256,
228 sbom_json = excluded.sbom_json
229 "#,
230 )
231 .bind(&metadata.name)
232 .bind(&metadata.version)
233 .bind(metadata.platform.as_deref())
234 .bind(metadata.summary.as_deref())
235 .bind(metadata.description.as_deref())
236 .bind(prepared.licenses_json)
237 .bind(prepared.authors_json)
238 .bind(prepared.emails_json)
239 .bind(metadata.homepage.as_deref())
240 .bind(metadata.documentation_url.as_deref())
241 .bind(metadata.changelog_url.as_deref())
242 .bind(metadata.source_code_url.as_deref())
243 .bind(metadata.bug_tracker_url.as_deref())
244 .bind(metadata.wiki_url.as_deref())
245 .bind(metadata.funding_url.as_deref())
246 .bind(prepared.metadata_json)
247 .bind(prepared.dependencies_json)
248 .bind(prepared.executables_json)
249 .bind(prepared.extensions_json)
250 .bind(prepared.native_languages_json)
251 .bind(metadata.has_native_extensions)
252 .bind(metadata.has_embedded_binaries)
253 .bind(metadata.required_ruby_version.as_deref())
254 .bind(metadata.required_rubygems_version.as_deref())
255 .bind(metadata.rubygems_version.as_deref())
256 .bind(metadata.specification_version)
257 .bind(metadata.built_at.as_deref())
258 .bind(prepared.size_bytes)
259 .bind(&metadata.sha256)
260 .bind(prepared.sbom_json)
261 .execute(&self.pool)
262 .await
263 .context("upserting gem metadata")?;
264
265 Ok(())
266 }
267
268 pub async fn fetch_gem_metadata(
269 &self,
270 name: &str,
271 version: &str,
272 platform: Option<&str>,
273 ) -> Result<Option<GemMetadata>> {
274 let record = sqlx::query_as::<_, DbGemMetadataRow>(
275 r#"
276 SELECT
277 name,
278 version,
279 platform,
280 summary,
281 description,
282 licenses,
283 authors,
284 emails,
285 homepage,
286 documentation_url,
287 changelog_url,
288 source_code_url,
289 bug_tracker_url,
290 wiki_url,
291 funding_url,
292 metadata_json,
293 dependencies_json,
294 executables_json,
295 extensions_json,
296 native_languages_json,
297 has_native_extensions,
298 has_embedded_binaries,
299 required_ruby_version,
300 required_rubygems_version,
301 rubygems_version,
302 specification_version,
303 built_at,
304 size_bytes,
305 sha256,
306 sbom_json
307 FROM gem_metadata
308 WHERE name = ?1
309 AND version = ?2
310 AND ((platform IS NULL AND ?3 IS NULL) OR platform = ?3)
311 "#,
312 )
313 .bind(name)
314 .bind(version)
315 .bind(platform)
316 .fetch_optional(&self.pool)
317 .await
318 .context("fetching gem metadata record")?;
319
320 match record {
321 Some(row) => hydrate_metadata_row(row).map(Some),
322 None => Ok(None),
323 }
324 }
325
326 pub async fn sbom_coverage_stats(&self) -> Result<SbomCoverage> {
327 let (total, with_sbom) = sqlx::query_as::<_, (i64, i64)>(
328 r#"
329 SELECT
330 COUNT(*) as total,
331 COALESCE(
332 SUM(CASE WHEN sbom_json IS NOT NULL AND sbom_json <> ''
333 THEN 1 ELSE 0 END),
334 0
335 ) as with_sbom
336 FROM gem_metadata
337 "#,
338 )
339 .fetch_one(&self.pool)
340 .await
341 .context("querying SBOM coverage (postgres)")?;
342
343 Ok(SbomCoverage {
344 metadata_rows: total.max(0) as u64,
345 with_sbom: with_sbom.max(0) as u64,
346 })
347 }
348}
349
350impl CacheBackend for SqliteCacheBackend {
351 async fn get(&self, key: &AssetKey<'_>) -> Result<Option<CachedAsset>> {
352 let record = sqlx::query_as::<_, CachedAssetRow>(
353 r#"
354 SELECT path, sha256, size_bytes, last_accessed
355 FROM cached_assets
356 WHERE kind = ?1 AND name = ?2 AND version = ?3 AND
357 ((platform IS NULL AND ?4 IS NULL) OR platform = ?4)
358 "#,
359 )
360 .bind(key.kind.as_str())
361 .bind(key.name)
362 .bind(key.version)
363 .bind(key.platform)
364 .fetch_optional(&self.pool)
365 .await
366 .context("fetching cached asset")?;
367
368 if let Some(row) = record {
369 self.touch(key).await?;
370 Ok(Some(row.into()))
371 } else {
372 Ok(None)
373 }
374 }
375
376 async fn insert_or_replace(
377 &self,
378 key: &AssetKey<'_>,
379 path: &str,
380 sha256: &str,
381 size_bytes: u64,
382 ) -> Result<()> {
383 sqlx::query(
384 r#"
385 INSERT INTO cached_assets(
386 kind, name, version, platform, path, sha256, size_bytes, last_accessed
387 )
388 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, strftime('%Y-%m-%dT%H:%M:%fZ','now'))
389 ON CONFLICT(kind, name, version, platform)
390 DO UPDATE SET
391 path = excluded.path,
392 sha256 = excluded.sha256,
393 size_bytes = excluded.size_bytes,
394 last_accessed = excluded.last_accessed
395 "#,
396 )
397 .bind(key.kind.as_str())
398 .bind(key.name)
399 .bind(key.version)
400 .bind(key.platform)
401 .bind(path)
402 .bind(sha256)
403 .bind(size_bytes as i64)
404 .execute(&self.pool)
405 .await
406 .context("inserting cached asset")?;
407 Ok(())
408 }
409
410 async fn get_all_gems(&self) -> Result<Vec<(String, String)>> {
411 let rows = sqlx::query_as::<_, (String, String)>(
412 r#"
413 SELECT DISTINCT name, version
414 FROM cached_assets
415 WHERE kind = 'gem'
416 ORDER BY name, version
417 "#,
418 )
419 .fetch_all(&self.pool)
420 .await
421 .context("fetching all gems")?;
422
423 Ok(rows)
424 }
425
426 async fn stats(&self) -> Result<IndexStats> {
427 let total_assets: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM cached_assets")
428 .fetch_one(&self.pool)
429 .await
430 .context("counting cached assets")?;
431
432 let gem_assets: i64 =
433 sqlx::query_scalar("SELECT COUNT(*) FROM cached_assets WHERE kind = 'gem'")
434 .fetch_one(&self.pool)
435 .await
436 .context("counting gem assets")?;
437
438 let spec_assets: i64 =
439 sqlx::query_scalar("SELECT COUNT(*) FROM cached_assets WHERE kind = 'gemspec'")
440 .fetch_one(&self.pool)
441 .await
442 .context("counting gemspec assets")?;
443
444 let unique_gems: i64 =
445 sqlx::query_scalar("SELECT COUNT(DISTINCT name) FROM cached_assets WHERE kind = 'gem'")
446 .fetch_one(&self.pool)
447 .await
448 .context("counting unique gems")?;
449
450 let total_size_bytes: i64 =
451 sqlx::query_scalar("SELECT COALESCE(SUM(size_bytes), 0) FROM cached_assets")
452 .fetch_one(&self.pool)
453 .await
454 .context("summing cached asset sizes")?;
455
456 let last_accessed: Option<String> =
457 sqlx::query_scalar("SELECT MAX(last_accessed) FROM cached_assets")
458 .fetch_one(&self.pool)
459 .await
460 .context("fetching last access timestamp")?;
461
462 Ok(IndexStats {
463 total_assets: total_assets.max(0) as u64,
464 gem_assets: gem_assets.max(0) as u64,
465 spec_assets: spec_assets.max(0) as u64,
466 unique_gems: unique_gems.max(0) as u64,
467 total_size_bytes: total_size_bytes.max(0) as u64,
468 last_accessed,
469 })
470 }
471
472 async fn catalog_upsert_names(&self, names: &[String]) -> Result<()> {
473 if names.is_empty() {
474 return Ok(());
475 }
476 let mut tx = self.pool.begin().await?;
477 for name in names {
478 sqlx::query(
479 r#"
480 INSERT INTO catalog_gems(name, synced_at)
481 VALUES(?1, strftime('%Y-%m-%dT%H:%M:%fZ','now'))
482 ON CONFLICT(name) DO UPDATE SET synced_at = excluded.synced_at
483 "#,
484 )
485 .bind(name)
486 .execute(&mut *tx)
487 .await
488 .with_context(|| format!("upserting catalog entry {}", name))?;
489 }
490 tx.commit().await.context("committing catalog upsert")?;
491 Ok(())
492 }
493
494 async fn catalog_total(&self) -> Result<u64> {
495 let total: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM catalog_gems")
496 .fetch_one(&self.pool)
497 .await
498 .context("counting catalog gems")?;
499 Ok(total.max(0) as u64)
500 }
501
502 async fn catalog_page(&self, offset: i64, limit: i64) -> Result<Vec<String>> {
503 let rows = sqlx::query_scalar::<_, String>(
504 r#"
505 SELECT name
506 FROM catalog_gems
507 ORDER BY name
508 LIMIT ?1 OFFSET ?2
509 "#,
510 )
511 .bind(limit)
512 .bind(offset)
513 .fetch_all(&self.pool)
514 .await
515 .context("fetching catalog page")?;
516 Ok(rows)
517 }
518
519 async fn catalog_meta_get(&self, key: &str) -> Result<Option<String>> {
520 let value =
521 sqlx::query_scalar::<_, String>("SELECT value FROM catalog_meta WHERE key = ?1")
522 .bind(key)
523 .fetch_optional(&self.pool)
524 .await
525 .context("fetching catalog meta value")?;
526 Ok(value)
527 }
528
529 async fn catalog_meta_set(&self, key: &str, value: &str) -> Result<()> {
530 sqlx::query(
531 r#"
532 INSERT INTO catalog_meta(key, value)
533 VALUES(?1, ?2)
534 ON CONFLICT(key) DO UPDATE SET value = excluded.value
535 "#,
536 )
537 .bind(key)
538 .bind(value)
539 .execute(&self.pool)
540 .await
541 .context("upserting catalog meta value")?;
542 Ok(())
543 }
544
545 async fn upsert_metadata(&self, metadata: &GemMetadata) -> Result<()> {
546 self.upsert_gem_metadata_record(metadata).await
547 }
548
549 async fn gem_metadata(
550 &self,
551 name: &str,
552 version: &str,
553 platform: Option<&str>,
554 ) -> Result<Option<GemMetadata>> {
555 self.fetch_gem_metadata(name, version, platform).await
556 }
557
558 async fn sbom_coverage(&self) -> Result<SbomCoverage> {
559 let (total, with_sbom) = sqlx::query_as::<_, (i64, i64)>(
560 r#"
561 SELECT
562 COUNT(*) as total,
563 COALESCE(
564 SUM(CASE WHEN sbom_json IS NOT NULL AND sbom_json <> ''
565 THEN 1 ELSE 0 END),
566 0
567 ) as with_sbom
568 FROM gem_metadata
569 "#,
570 )
571 .fetch_one(&self.pool)
572 .await
573 .context("querying SBOM coverage (sqlite)")?;
574
575 Ok(SbomCoverage {
576 metadata_rows: total.max(0) as u64,
577 with_sbom: with_sbom.max(0) as u64,
578 })
579 }
580
581 async fn catalog_languages(&self) -> Result<Vec<String>> {
582 let rows = sqlx::query_scalar::<_, Option<String>>(
583 r#"
584 SELECT native_languages_json
585 FROM gem_metadata
586 WHERE native_languages_json IS NOT NULL AND native_languages_json <> ''
587 "#,
588 )
589 .fetch_all(&self.pool)
590 .await
591 .context("fetching native languages (sqlite)")?;
592
593 parse_language_rows(rows)
594 }
595
596 async fn catalog_page_by_language(
597 &self,
598 language: &str,
599 offset: i64,
600 limit: i64,
601 ) -> Result<Vec<String>> {
602 let pattern = format!("%\"{}\"%", language);
603 let rows = sqlx::query_scalar::<_, String>(
604 r#"
605 SELECT DISTINCT name
606 FROM gem_metadata
607 WHERE native_languages_json LIKE ?1
608 ORDER BY name
609 LIMIT ?2 OFFSET ?3
610 "#,
611 )
612 .bind(pattern)
613 .bind(limit)
614 .bind(offset)
615 .fetch_all(&self.pool)
616 .await
617 .context("fetching catalog page by language (sqlite)")?;
618 Ok(rows)
619 }
620
621 async fn catalog_total_by_language(&self, language: &str) -> Result<u64> {
622 let pattern = format!("%\"{}\"%", language);
623 let total: i64 = sqlx::query_scalar(
624 r#"
625 SELECT COUNT(DISTINCT name)
626 FROM gem_metadata
627 WHERE native_languages_json LIKE ?1
628 "#,
629 )
630 .bind(pattern)
631 .fetch_one(&self.pool)
632 .await
633 .context("counting catalog gems by language (sqlite)")?;
634 Ok(total.max(0) as u64)
635 }
636
637 async fn get_gem_version(
640 &self,
641 name: &str,
642 version: &str,
643 platform: Option<&str>,
644 ) -> Result<Option<GemVersion>> {
645 let row = sqlx::query_as::<_, GemVersionRow>(
646 r#"
647 SELECT id, name, version, platform, sha256, published_at, available_after,
648 status, status_reason, upstream_yanked, created_at, updated_at
649 FROM gem_versions
650 WHERE name = ?1
651 AND version = ?2
652 AND ((platform IS NULL AND ?3 IS NULL) OR platform = ?3)
653 "#,
654 )
655 .bind(name)
656 .bind(version)
657 .bind(platform)
658 .fetch_optional(&self.pool)
659 .await
660 .context("fetching gem version (sqlite)")?;
661
662 Ok(row.map(Into::into))
663 }
664
665 async fn upsert_gem_version(&self, gem_version: &GemVersion) -> Result<()> {
666 let now = Utc::now().to_rfc3339();
667
668 sqlx::query(
669 r#"
670 INSERT INTO gem_versions (
671 name, version, platform, sha256, published_at, available_after,
672 status, status_reason, upstream_yanked, created_at, updated_at
673 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
674 ON CONFLICT (name, version, platform)
675 DO UPDATE SET
676 sha256 = excluded.sha256,
677 published_at = excluded.published_at,
678 available_after = excluded.available_after,
679 status = excluded.status,
680 status_reason = excluded.status_reason,
681 upstream_yanked = excluded.upstream_yanked,
682 updated_at = excluded.updated_at
683 "#,
684 )
685 .bind(&gem_version.name)
686 .bind(&gem_version.version)
687 .bind(gem_version.platform.as_deref())
688 .bind(gem_version.sha256.as_deref())
689 .bind(gem_version.published_at.to_rfc3339())
690 .bind(gem_version.available_after.to_rfc3339())
691 .bind(gem_version.status.to_string())
692 .bind(gem_version.status_reason.as_deref())
693 .bind(gem_version.upstream_yanked)
694 .bind(gem_version.created_at.to_rfc3339())
695 .bind(&now)
696 .execute(&self.pool)
697 .await
698 .context("upserting gem version (sqlite)")?;
699
700 Ok(())
701 }
702
703 async fn get_latest_available_version(
704 &self,
705 name: &str,
706 platform: Option<&str>,
707 now: DateTime<Utc>,
708 ) -> Result<Option<GemVersion>> {
709 let now_str = now.to_rfc3339();
710
711 let rows = sqlx::query_as::<_, GemVersionRow>(
713 r#"
714 SELECT id, name, version, platform, sha256, published_at, available_after,
715 status, status_reason, upstream_yanked, created_at, updated_at
716 FROM gem_versions
717 WHERE name = ?1
718 AND ((platform IS NULL AND ?2 IS NULL) OR platform = ?2)
719 AND upstream_yanked = FALSE
720 AND (status = 'available' OR status = 'pinned'
721 OR (status = 'quarantine' AND available_after <= ?3))
722 "#,
723 )
724 .bind(name)
725 .bind(platform)
726 .bind(&now_str)
727 .fetch_all(&self.pool)
728 .await
729 .context("fetching available versions (sqlite)")?;
730
731 let mut versions: Vec<GemVersion> = rows.into_iter().map(Into::into).collect();
733 versions.sort_by(|a, b| {
734 compare_versions(&b.version, &a.version) });
736
737 Ok(versions.into_iter().next())
738 }
739
740 async fn get_quarantined_versions(
741 &self,
742 name: &str,
743 now: DateTime<Utc>,
744 ) -> Result<Vec<GemVersion>> {
745 let now_str = now.to_rfc3339();
746
747 let rows = sqlx::query_as::<_, GemVersionRow>(
748 r#"
749 SELECT id, name, version, platform, sha256, published_at, available_after,
750 status, status_reason, upstream_yanked, created_at, updated_at
751 FROM gem_versions
752 WHERE name = ?1
753 AND status = 'quarantine'
754 AND available_after > ?2
755 ORDER BY version DESC
756 "#,
757 )
758 .bind(name)
759 .bind(&now_str)
760 .fetch_all(&self.pool)
761 .await
762 .context("fetching quarantined versions (sqlite)")?;
763
764 Ok(rows.into_iter().map(Into::into).collect())
765 }
766
767 async fn update_version_status(
768 &self,
769 name: &str,
770 version: &str,
771 platform: Option<&str>,
772 status: VersionStatus,
773 reason: Option<String>,
774 ) -> Result<()> {
775 let now = Utc::now().to_rfc3339();
776
777 sqlx::query(
778 r#"
779 UPDATE gem_versions
780 SET status = ?1, status_reason = ?2, updated_at = ?3
781 WHERE name = ?4
782 AND version = ?5
783 AND ((platform IS NULL AND ?6 IS NULL) OR platform = ?6)
784 "#,
785 )
786 .bind(status.to_string())
787 .bind(reason)
788 .bind(&now)
789 .bind(name)
790 .bind(version)
791 .bind(platform)
792 .execute(&self.pool)
793 .await
794 .context("updating version status (sqlite)")?;
795
796 Ok(())
797 }
798
799 async fn promote_expired_quarantines(&self, now: DateTime<Utc>) -> Result<u64> {
800 let now_str = now.to_rfc3339();
801
802 let result = sqlx::query(
803 r#"
804 UPDATE gem_versions
805 SET status = 'available', status_reason = 'auto-promoted', updated_at = ?1
806 WHERE status = 'quarantine'
807 AND available_after <= ?2
808 "#,
809 )
810 .bind(&now_str)
811 .bind(&now_str)
812 .execute(&self.pool)
813 .await
814 .context("promoting expired quarantines (sqlite)")?;
815
816 Ok(result.rows_affected())
817 }
818
819 async fn mark_yanked(&self, name: &str, version: &str) -> Result<()> {
820 let now = Utc::now().to_rfc3339();
821
822 sqlx::query(
823 r#"
824 UPDATE gem_versions
825 SET status = 'yanked', upstream_yanked = TRUE, updated_at = ?1
826 WHERE name = ?2 AND version = ?3
827 "#,
828 )
829 .bind(&now)
830 .bind(name)
831 .bind(version)
832 .execute(&self.pool)
833 .await
834 .context("marking version yanked (sqlite)")?;
835
836 Ok(())
837 }
838
839 async fn get_all_quarantined(&self, limit: u32, offset: u32) -> Result<Vec<GemVersion>> {
840 let rows = sqlx::query_as::<_, GemVersionRow>(
841 r#"
842 SELECT id, name, version, platform, sha256, published_at, available_after,
843 status, status_reason, upstream_yanked, created_at, updated_at
844 FROM gem_versions
845 WHERE status = 'quarantine'
846 ORDER BY available_after ASC
847 LIMIT ?1 OFFSET ?2
848 "#,
849 )
850 .bind(limit)
851 .bind(offset)
852 .fetch_all(&self.pool)
853 .await
854 .context("fetching all quarantined (sqlite)")?;
855
856 Ok(rows.into_iter().map(Into::into).collect())
857 }
858
859 async fn quarantine_stats(&self) -> Result<QuarantineStats> {
860 let now = Utc::now();
861 let today_end = (now + Duration::days(1)).to_rfc3339();
862 let week_end = (now + Duration::days(7)).to_rfc3339();
863 let now_str = now.to_rfc3339();
864
865 let (quarantined, available, yanked, pinned): (i64, i64, i64, i64) = sqlx::query_as(
866 r#"
867 SELECT
868 COALESCE(SUM(CASE WHEN status = 'quarantine' THEN 1 ELSE 0 END), 0),
869 COALESCE(SUM(CASE WHEN status = 'available' THEN 1 ELSE 0 END), 0),
870 COALESCE(SUM(CASE WHEN status = 'yanked' THEN 1 ELSE 0 END), 0),
871 COALESCE(SUM(CASE WHEN status = 'pinned' THEN 1 ELSE 0 END), 0)
872 FROM gem_versions
873 "#,
874 )
875 .fetch_one(&self.pool)
876 .await
877 .context("fetching quarantine counts (sqlite)")?;
878
879 let releasing_today: i64 = sqlx::query_scalar(
880 r#"
881 SELECT COUNT(*)
882 FROM gem_versions
883 WHERE status = 'quarantine'
884 AND available_after > ?1
885 AND available_after <= ?2
886 "#,
887 )
888 .bind(&now_str)
889 .bind(&today_end)
890 .fetch_one(&self.pool)
891 .await
892 .context("counting versions releasing today (sqlite)")?;
893
894 let releasing_week: i64 = sqlx::query_scalar(
895 r#"
896 SELECT COUNT(*)
897 FROM gem_versions
898 WHERE status = 'quarantine'
899 AND available_after > ?1
900 AND available_after <= ?2
901 "#,
902 )
903 .bind(&now_str)
904 .bind(&week_end)
905 .fetch_one(&self.pool)
906 .await
907 .context("counting versions releasing this week (sqlite)")?;
908
909 Ok(QuarantineStats {
910 total_quarantined: quarantined.max(0) as u64,
911 total_available: available.max(0) as u64,
912 total_yanked: yanked.max(0) as u64,
913 total_pinned: pinned.max(0) as u64,
914 versions_releasing_today: releasing_today.max(0) as u64,
915 versions_releasing_this_week: releasing_week.max(0) as u64,
916 })
917 }
918
919 async fn get_gem_versions_for_index(&self, name: &str) -> Result<Vec<GemVersion>> {
920 let rows = sqlx::query_as::<_, GemVersionRow>(
921 r#"
922 SELECT id, name, version, platform, sha256, published_at, available_after,
923 status, status_reason, upstream_yanked, created_at, updated_at
924 FROM gem_versions
925 WHERE name = ?1
926 ORDER BY version DESC
927 "#,
928 )
929 .bind(name)
930 .fetch_all(&self.pool)
931 .await
932 .context("fetching gem versions for index (sqlite)")?;
933
934 Ok(rows.into_iter().map(Into::into).collect())
935 }
936
937 async fn quarantine_table_exists(&self) -> Result<bool> {
938 let exists: i64 = sqlx::query_scalar(
939 r#"
940 SELECT COUNT(*)
941 FROM sqlite_master
942 WHERE type = 'table' AND name = 'gem_versions'
943 "#,
944 )
945 .fetch_one(&self.pool)
946 .await
947 .context("checking quarantine table exists (sqlite)")?;
948
949 Ok(exists > 0)
950 }
951
952 async fn run_quarantine_migrations(&self) -> Result<()> {
953 sqlx::query(
955 r#"
956 CREATE TABLE IF NOT EXISTS gem_versions (
957 id INTEGER PRIMARY KEY AUTOINCREMENT,
958 name TEXT NOT NULL,
959 version TEXT NOT NULL,
960 platform TEXT,
961 sha256 TEXT,
962 published_at TEXT NOT NULL,
963 available_after TEXT NOT NULL,
964 status TEXT NOT NULL DEFAULT 'quarantine',
965 status_reason TEXT,
966 upstream_yanked INTEGER NOT NULL DEFAULT 0,
967 created_at TEXT NOT NULL,
968 updated_at TEXT NOT NULL,
969 UNIQUE(name, version, platform)
970 )
971 "#,
972 )
973 .execute(&self.pool)
974 .await
975 .context("creating gem_versions table (sqlite)")?;
976
977 sqlx::query(
979 "CREATE INDEX IF NOT EXISTS idx_gem_versions_name ON gem_versions(name)",
980 )
981 .execute(&self.pool)
982 .await
983 .context("creating name index (sqlite)")?;
984
985 sqlx::query(
986 "CREATE INDEX IF NOT EXISTS idx_gem_versions_status ON gem_versions(status)",
987 )
988 .execute(&self.pool)
989 .await
990 .context("creating status index (sqlite)")?;
991
992 sqlx::query(
993 "CREATE INDEX IF NOT EXISTS idx_gv_available ON gem_versions(available_after)",
994 )
995 .execute(&self.pool)
996 .await
997 .context("creating available_after index (sqlite)")?;
998
999 Ok(())
1000 }
1001}
1002
1003fn compare_versions(a: &str, b: &str) -> std::cmp::Ordering {
1005 match (semver::Version::parse(a), semver::Version::parse(b)) {
1007 (Ok(va), Ok(vb)) => va.cmp(&vb),
1008 _ => a.cmp(b),
1010 }
1011}