1use anyhow::{Context, Result};
2use chrono::{DateTime, Duration, Utc};
3use sqlx::{
4 PgPool, Transaction,
5 postgres::{PgPoolOptions, Postgres},
6};
7
8use super::{
9 CacheBackend, GemVersion, QuarantineStats, VersionStatus,
10 models::{DbGemMetadataRow, PostgresCachedAssetRow, PostgresGemVersionRow, format_timestamp},
11 serialization::{hydrate_metadata_row, parse_language_rows, prepare_metadata_strings},
12 types::{AssetKey, CachedAsset, GemMetadata, IndexStats, SbomCoverage},
13};
14
/// Cache backend that runs its queries against PostgreSQL through an `sqlx` pool.
///
/// Construct one with `PostgresCacheBackend::connect`.
#[derive(Debug, Clone)]
pub struct PostgresCacheBackend {
    /// Connection pool every query issued by this backend is executed on.
    pool: PgPool,
}
19
20impl PostgresCacheBackend {
21 pub async fn connect(url: &str, max_connections: u32) -> Result<Self> {
22 let pool = PgPoolOptions::new()
23 .max_connections(max_connections.max(1))
24 .connect(url)
25 .await
26 .with_context(|| format!("connecting to postgres database {}", url))?;
27 Ok(Self { pool })
28 }
29
30 async fn touch(&self, key: &AssetKey<'_>) -> Result<()> {
31 sqlx::query(
32 r#"
33 UPDATE cached_assets
34 SET last_accessed = NOW()
35 WHERE kind = $1 AND name = $2 AND version = $3 AND
36 ((platform IS NULL AND $4 IS NULL) OR platform = $4)
37 "#,
38 )
39 .bind(key.kind.as_str())
40 .bind(key.name)
41 .bind(key.version)
42 .bind(key.platform)
43 .execute(&self.pool)
44 .await
45 .context("updating last_accessed (postgres)")?;
46 Ok(())
47 }
48
49 pub async fn fetch_gem_metadata(
50 &self,
51 name: &str,
52 version: &str,
53 platform: Option<&str>,
54 ) -> Result<Option<GemMetadata>> {
55 let record = sqlx::query_as::<_, DbGemMetadataRow>(
56 r#"
57 SELECT
58 name,
59 version,
60 platform,
61 summary,
62 description,
63 licenses,
64 authors,
65 emails,
66 homepage,
67 documentation_url,
68 changelog_url,
69 source_code_url,
70 bug_tracker_url,
71 wiki_url,
72 funding_url,
73 metadata_json,
74 dependencies_json,
75 executables_json,
76 extensions_json,
77 native_languages_json,
78 has_native_extensions,
79 has_embedded_binaries,
80 required_ruby_version,
81 required_rubygems_version,
82 rubygems_version,
83 specification_version,
84 built_at,
85 size_bytes,
86 sha256,
87 sbom_json
88 FROM gem_metadata
89 WHERE name = $1
90 AND version = $2
91 AND ((platform IS NULL AND $3 IS NULL) OR platform = $3)
92 "#,
93 )
94 .bind(name)
95 .bind(version)
96 .bind(platform)
97 .fetch_optional(&self.pool)
98 .await
99 .context("fetching gem metadata record (postgres)")?;
100
101 match record {
102 Some(row) => hydrate_metadata_row(row).map(Some),
103 None => Ok(None),
104 }
105 }
106
107 pub async fn sbom_coverage_stats(&self) -> Result<SbomCoverage> {
108 let (total, with_sbom) = sqlx::query_as::<_, (i64, i64)>(
109 r#"
110 SELECT
111 COUNT(*) as total,
112 COALESCE(
113 SUM(CASE WHEN sbom_json IS NOT NULL AND sbom_json <> ''
114 THEN 1 ELSE 0 END),
115 0
116 ) as with_sbom
117 FROM gem_metadata
118 "#,
119 )
120 .fetch_one(&self.pool)
121 .await
122 .context("querying SBOM coverage (postgres)")?;
123
124 Ok(SbomCoverage {
125 metadata_rows: total.max(0) as u64,
126 with_sbom: with_sbom.max(0) as u64,
127 })
128 }
129
130 pub async fn catalog_languages_list(&self) -> Result<Vec<String>> {
131 let rows = sqlx::query_scalar::<_, Option<String>>(
132 r#"
133 SELECT native_languages_json
134 FROM gem_metadata
135 WHERE native_languages_json IS NOT NULL AND native_languages_json <> ''
136 "#,
137 )
138 .fetch_all(&self.pool)
139 .await
140 .context("fetching native languages (postgres)")?;
141
142 parse_language_rows(rows)
143 }
144
145 async fn begin_tx(&self) -> Result<Transaction<'_, Postgres>> {
146 self.pool
147 .begin()
148 .await
149 .context("starting postgres transaction")
150 }
151
    /// Inserts `metadata` into `gem_metadata`, or overwrites the existing row when
    /// the `gem_metadata_pkey` constraint reports a conflict.
    ///
    /// On conflict every non-key column is replaced with the incoming value; the
    /// `name`, `version` and `platform` columns are left as they are.
    ///
    /// # Errors
    /// Returns an error if `prepare_metadata_strings` fails or if the statement
    /// cannot be executed.
    pub async fn upsert_gem_metadata_record(&self, metadata: &GemMetadata) -> Result<()> {
        // Pre-computed column values (the `*_json` fields and `size_bytes`) that
        // are bound below instead of the raw struct fields.
        let prepared = prepare_metadata_strings(metadata)?;

        sqlx::query(
            r#"
            INSERT INTO gem_metadata(
                name,
                version,
                platform,
                summary,
                description,
                licenses,
                authors,
                emails,
                homepage,
                documentation_url,
                changelog_url,
                source_code_url,
                bug_tracker_url,
                wiki_url,
                funding_url,
                metadata_json,
                dependencies_json,
                executables_json,
                extensions_json,
                native_languages_json,
                has_native_extensions,
                has_embedded_binaries,
                required_ruby_version,
                required_rubygems_version,
                rubygems_version,
                specification_version,
                built_at,
                size_bytes,
                sha256,
                sbom_json
            ) VALUES (
                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
                $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
                $21, $22, $23, $24, $25, $26, $27, $28, $29, $30
            )
            ON CONFLICT ON CONSTRAINT gem_metadata_pkey
            DO UPDATE SET
                summary = EXCLUDED.summary,
                description = EXCLUDED.description,
                licenses = EXCLUDED.licenses,
                authors = EXCLUDED.authors,
                emails = EXCLUDED.emails,
                homepage = EXCLUDED.homepage,
                documentation_url = EXCLUDED.documentation_url,
                changelog_url = EXCLUDED.changelog_url,
                source_code_url = EXCLUDED.source_code_url,
                bug_tracker_url = EXCLUDED.bug_tracker_url,
                wiki_url = EXCLUDED.wiki_url,
                funding_url = EXCLUDED.funding_url,
                metadata_json = EXCLUDED.metadata_json,
                dependencies_json = EXCLUDED.dependencies_json,
                executables_json = EXCLUDED.executables_json,
                extensions_json = EXCLUDED.extensions_json,
                native_languages_json = EXCLUDED.native_languages_json,
                has_native_extensions = EXCLUDED.has_native_extensions,
                has_embedded_binaries = EXCLUDED.has_embedded_binaries,
                required_ruby_version = EXCLUDED.required_ruby_version,
                required_rubygems_version = EXCLUDED.required_rubygems_version,
                rubygems_version = EXCLUDED.rubygems_version,
                specification_version = EXCLUDED.specification_version,
                built_at = EXCLUDED.built_at,
                size_bytes = EXCLUDED.size_bytes,
                sha256 = EXCLUDED.sha256,
                sbom_json = EXCLUDED.sbom_json
            "#,
        )
        // The binds are positional: their order must mirror the column list of the
        // INSERT above ($1 = name ... $30 = sbom_json).
        .bind(&metadata.name)
        .bind(&metadata.version)
        .bind(metadata.platform.as_deref())
        .bind(metadata.summary.as_deref())
        .bind(metadata.description.as_deref())
        // The `licenses`, `authors` and `emails` columns receive the prepared
        // `*_json` strings rather than the struct fields.
        .bind(prepared.licenses_json)
        .bind(prepared.authors_json)
        .bind(prepared.emails_json)
        .bind(metadata.homepage.as_deref())
        .bind(metadata.documentation_url.as_deref())
        .bind(metadata.changelog_url.as_deref())
        .bind(metadata.source_code_url.as_deref())
        .bind(metadata.bug_tracker_url.as_deref())
        .bind(metadata.wiki_url.as_deref())
        .bind(metadata.funding_url.as_deref())
        .bind(prepared.metadata_json)
        .bind(prepared.dependencies_json)
        .bind(prepared.executables_json)
        .bind(prepared.extensions_json)
        .bind(prepared.native_languages_json)
        .bind(metadata.has_native_extensions)
        .bind(metadata.has_embedded_binaries)
        .bind(metadata.required_ruby_version.as_deref())
        .bind(metadata.required_rubygems_version.as_deref())
        .bind(metadata.rubygems_version.as_deref())
        .bind(metadata.specification_version)
        .bind(metadata.built_at.as_deref())
        .bind(prepared.size_bytes)
        .bind(&metadata.sha256)
        .bind(prepared.sbom_json)
        .execute(&self.pool)
        .await
        .context("upserting gem metadata (postgres)")?;

        Ok(())
    }
260}
261
262impl CacheBackend for PostgresCacheBackend {
263 async fn get(&self, key: &AssetKey<'_>) -> Result<Option<CachedAsset>> {
264 let record = sqlx::query_as::<_, PostgresCachedAssetRow>(
265 r#"
266 SELECT path, sha256, size_bytes, last_accessed
267 FROM cached_assets
268 WHERE kind = $1 AND name = $2 AND version = $3 AND
269 ((platform IS NULL AND $4 IS NULL) OR platform = $4)
270 "#,
271 )
272 .bind(key.kind.as_str())
273 .bind(key.name)
274 .bind(key.version)
275 .bind(key.platform)
276 .fetch_optional(&self.pool)
277 .await
278 .context("fetching cached asset (postgres)")?;
279
280 if let Some(row) = record {
281 self.touch(key).await?;
282 Ok(Some(row.into()))
283 } else {
284 Ok(None)
285 }
286 }
287
288 async fn insert_or_replace(
289 &self,
290 key: &AssetKey<'_>,
291 path: &str,
292 sha256: &str,
293 size_bytes: u64,
294 ) -> Result<()> {
295 sqlx::query(
296 r#"
297 INSERT INTO cached_assets(
298 kind, name, version, platform, path, sha256, size_bytes, last_accessed
299 )
300 VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
301 ON CONFLICT ON CONSTRAINT cached_assets_unique
302 DO UPDATE SET
303 path = EXCLUDED.path,
304 sha256 = EXCLUDED.sha256,
305 size_bytes = EXCLUDED.size_bytes,
306 last_accessed = EXCLUDED.last_accessed
307 "#,
308 )
309 .bind(key.kind.as_str())
310 .bind(key.name)
311 .bind(key.version)
312 .bind(key.platform)
313 .bind(path)
314 .bind(sha256)
315 .bind(size_bytes as i64)
316 .execute(&self.pool)
317 .await
318 .context("inserting cached asset (postgres)")?;
319 Ok(())
320 }
321
322 async fn get_all_gems(&self) -> Result<Vec<(String, String)>> {
323 let rows = sqlx::query_as::<_, (String, String)>(
324 r#"
325 SELECT DISTINCT name, version
326 FROM cached_assets
327 WHERE kind = 'gem'
328 ORDER BY name, version
329 "#,
330 )
331 .fetch_all(&self.pool)
332 .await
333 .context("fetching all gems (postgres)")?;
334
335 Ok(rows)
336 }
337
338 async fn stats(&self) -> Result<IndexStats> {
339 let total_assets: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM cached_assets")
340 .fetch_one(&self.pool)
341 .await
342 .context("counting cached assets (postgres)")?;
343
344 let gem_assets: i64 =
345 sqlx::query_scalar("SELECT COUNT(*) FROM cached_assets WHERE kind = 'gem'")
346 .fetch_one(&self.pool)
347 .await
348 .context("counting gem assets (postgres)")?;
349
350 let spec_assets: i64 =
351 sqlx::query_scalar("SELECT COUNT(*) FROM cached_assets WHERE kind = 'gemspec'")
352 .fetch_one(&self.pool)
353 .await
354 .context("counting gemspec assets (postgres)")?;
355
356 let unique_gems: i64 =
357 sqlx::query_scalar("SELECT COUNT(DISTINCT name) FROM cached_assets WHERE kind = 'gem'")
358 .fetch_one(&self.pool)
359 .await
360 .context("counting unique gems (postgres)")?;
361
362 let total_size_bytes: i64 =
363 sqlx::query_scalar("SELECT COALESCE(SUM(size_bytes), 0) FROM cached_assets")
364 .fetch_one(&self.pool)
365 .await
366 .context("summing cached asset sizes (postgres)")?;
367
368 let last_accessed: Option<DateTime<Utc>> =
369 sqlx::query_scalar("SELECT MAX(last_accessed) FROM cached_assets")
370 .fetch_one(&self.pool)
371 .await
372 .context("fetching last access timestamp (postgres)")?;
373
374 Ok(IndexStats {
375 total_assets: total_assets.max(0) as u64,
376 gem_assets: gem_assets.max(0) as u64,
377 spec_assets: spec_assets.max(0) as u64,
378 unique_gems: unique_gems.max(0) as u64,
379 total_size_bytes: total_size_bytes.max(0) as u64,
380 last_accessed: last_accessed.map(format_timestamp),
381 })
382 }
383
384 async fn catalog_upsert_names(&self, names: &[String]) -> Result<()> {
385 if names.is_empty() {
386 return Ok(());
387 }
388 let mut tx = self.begin_tx().await?;
389 for name in names {
390 sqlx::query(
391 r#"
392 INSERT INTO catalog_gems(name, synced_at)
393 VALUES($1, NOW())
394 ON CONFLICT(name) DO UPDATE SET synced_at = EXCLUDED.synced_at
395 "#,
396 )
397 .bind(name)
398 .execute(&mut *tx)
399 .await
400 .with_context(|| format!("upserting catalog entry {} (postgres)", name))?;
401 }
402 tx.commit()
403 .await
404 .context("committing catalog upsert (postgres)")?;
405 Ok(())
406 }
407
408 async fn catalog_total(&self) -> Result<u64> {
409 let total: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM catalog_gems")
410 .fetch_one(&self.pool)
411 .await
412 .context("counting catalog gems (postgres)")?;
413 Ok(total.max(0) as u64)
414 }
415
416 async fn catalog_page(&self, offset: i64, limit: i64) -> Result<Vec<String>> {
417 let rows = sqlx::query_scalar::<_, String>(
418 r#"
419 SELECT name
420 FROM catalog_gems
421 ORDER BY name
422 LIMIT $1 OFFSET $2
423 "#,
424 )
425 .bind(limit)
426 .bind(offset)
427 .fetch_all(&self.pool)
428 .await
429 .context("fetching catalog page (postgres)")?;
430 Ok(rows)
431 }
432
433 async fn catalog_meta_get(&self, key: &str) -> Result<Option<String>> {
434 let value =
435 sqlx::query_scalar::<_, String>("SELECT value FROM catalog_meta WHERE key = $1")
436 .bind(key)
437 .fetch_optional(&self.pool)
438 .await
439 .context("fetching catalog meta value (postgres)")?;
440 Ok(value)
441 }
442
443 async fn catalog_meta_set(&self, key: &str, value: &str) -> Result<()> {
444 sqlx::query(
445 r#"
446 INSERT INTO catalog_meta(key, value)
447 VALUES($1, $2)
448 ON CONFLICT(key) DO UPDATE SET value = EXCLUDED.value
449 "#,
450 )
451 .bind(key)
452 .bind(value)
453 .execute(&self.pool)
454 .await
455 .context("upserting catalog meta value (postgres)")?;
456 Ok(())
457 }
458
459 async fn upsert_metadata(&self, metadata: &GemMetadata) -> Result<()> {
460 self.upsert_gem_metadata_record(metadata).await
461 }
462
463 async fn gem_metadata(
464 &self,
465 name: &str,
466 version: &str,
467 platform: Option<&str>,
468 ) -> Result<Option<GemMetadata>> {
469 self.fetch_gem_metadata(name, version, platform).await
470 }
471
472 async fn sbom_coverage(&self) -> Result<SbomCoverage> {
473 self.sbom_coverage_stats().await
474 }
475
476 async fn catalog_languages(&self) -> Result<Vec<String>> {
477 self.catalog_languages_list().await
478 }
479
480 async fn catalog_page_by_language(
481 &self,
482 language: &str,
483 offset: i64,
484 limit: i64,
485 ) -> Result<Vec<String>> {
486 let pattern = format!("%\"{}\"%", language);
487 let rows = sqlx::query_scalar::<_, String>(
488 r#"
489 SELECT DISTINCT name
490 FROM gem_metadata
491 WHERE native_languages_json LIKE $1
492 ORDER BY name
493 LIMIT $2 OFFSET $3
494 "#,
495 )
496 .bind(pattern)
497 .bind(limit)
498 .bind(offset)
499 .fetch_all(&self.pool)
500 .await
501 .context("fetching catalog page by language (postgres)")?;
502 Ok(rows)
503 }
504
505 async fn catalog_total_by_language(&self, language: &str) -> Result<u64> {
506 let pattern = format!("%\"{}\"%", language);
507 let total: i64 = sqlx::query_scalar(
508 r#"
509 SELECT COUNT(DISTINCT name)
510 FROM gem_metadata
511 WHERE native_languages_json LIKE $1
512 "#,
513 )
514 .bind(pattern)
515 .fetch_one(&self.pool)
516 .await
517 .context("counting catalog gems by language (postgres)")?;
518 Ok(total.max(0) as u64)
519 }
520
521 async fn get_gem_version(
524 &self,
525 name: &str,
526 version: &str,
527 platform: Option<&str>,
528 ) -> Result<Option<GemVersion>> {
529 let row = sqlx::query_as::<_, PostgresGemVersionRow>(
530 r#"
531 SELECT id, name, version, platform, sha256, published_at, available_after,
532 status, status_reason, upstream_yanked, created_at, updated_at
533 FROM gem_versions
534 WHERE name = $1
535 AND version = $2
536 AND ((platform IS NULL AND $3 IS NULL) OR platform = $3)
537 "#,
538 )
539 .bind(name)
540 .bind(version)
541 .bind(platform)
542 .fetch_optional(&self.pool)
543 .await
544 .context("fetching gem version (postgres)")?;
545
546 Ok(row.map(Into::into))
547 }
548
549 async fn upsert_gem_version(&self, gem_version: &GemVersion) -> Result<()> {
550 sqlx::query(
551 r#"
552 INSERT INTO gem_versions (
553 name, version, platform, sha256, published_at, available_after,
554 status, status_reason, upstream_yanked, created_at, updated_at
555 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW())
556 ON CONFLICT ON CONSTRAINT gem_versions_unique
557 DO UPDATE SET
558 sha256 = EXCLUDED.sha256,
559 published_at = EXCLUDED.published_at,
560 available_after = EXCLUDED.available_after,
561 status = EXCLUDED.status,
562 status_reason = EXCLUDED.status_reason,
563 upstream_yanked = EXCLUDED.upstream_yanked,
564 updated_at = NOW()
565 "#,
566 )
567 .bind(&gem_version.name)
568 .bind(&gem_version.version)
569 .bind(gem_version.platform.as_deref())
570 .bind(gem_version.sha256.as_deref())
571 .bind(gem_version.published_at)
572 .bind(gem_version.available_after)
573 .bind(gem_version.status.to_string())
574 .bind(gem_version.status_reason.as_deref())
575 .bind(gem_version.upstream_yanked)
576 .bind(gem_version.created_at)
577 .execute(&self.pool)
578 .await
579 .context("upserting gem version (postgres)")?;
580
581 Ok(())
582 }
583
584 async fn get_latest_available_version(
585 &self,
586 name: &str,
587 platform: Option<&str>,
588 now: DateTime<Utc>,
589 ) -> Result<Option<GemVersion>> {
590 let rows = sqlx::query_as::<_, PostgresGemVersionRow>(
592 r#"
593 SELECT id, name, version, platform, sha256, published_at, available_after,
594 status, status_reason, upstream_yanked, created_at, updated_at
595 FROM gem_versions
596 WHERE name = $1
597 AND ((platform IS NULL AND $2 IS NULL) OR platform = $2)
598 AND upstream_yanked = FALSE
599 AND (status = 'available' OR status = 'pinned'
600 OR (status = 'quarantine' AND available_after <= $3))
601 "#,
602 )
603 .bind(name)
604 .bind(platform)
605 .bind(now)
606 .fetch_all(&self.pool)
607 .await
608 .context("fetching available versions (postgres)")?;
609
610 let mut versions: Vec<GemVersion> = rows.into_iter().map(Into::into).collect();
612 versions.sort_by(|a, b| compare_versions(&b.version, &a.version));
613
614 Ok(versions.into_iter().next())
615 }
616
617 async fn get_quarantined_versions(
618 &self,
619 name: &str,
620 now: DateTime<Utc>,
621 ) -> Result<Vec<GemVersion>> {
622 let rows = sqlx::query_as::<_, PostgresGemVersionRow>(
623 r#"
624 SELECT id, name, version, platform, sha256, published_at, available_after,
625 status, status_reason, upstream_yanked, created_at, updated_at
626 FROM gem_versions
627 WHERE name = $1
628 AND status = 'quarantine'
629 AND available_after > $2
630 ORDER BY version DESC
631 "#,
632 )
633 .bind(name)
634 .bind(now)
635 .fetch_all(&self.pool)
636 .await
637 .context("fetching quarantined versions (postgres)")?;
638
639 Ok(rows.into_iter().map(Into::into).collect())
640 }
641
642 async fn update_version_status(
643 &self,
644 name: &str,
645 version: &str,
646 platform: Option<&str>,
647 status: VersionStatus,
648 reason: Option<String>,
649 ) -> Result<()> {
650 sqlx::query(
651 r#"
652 UPDATE gem_versions
653 SET status = $1, status_reason = $2, updated_at = NOW()
654 WHERE name = $3
655 AND version = $4
656 AND ((platform IS NULL AND $5 IS NULL) OR platform = $5)
657 "#,
658 )
659 .bind(status.to_string())
660 .bind(reason)
661 .bind(name)
662 .bind(version)
663 .bind(platform)
664 .execute(&self.pool)
665 .await
666 .context("updating version status (postgres)")?;
667
668 Ok(())
669 }
670
671 async fn promote_expired_quarantines(&self, now: DateTime<Utc>) -> Result<u64> {
672 let result = sqlx::query(
673 r#"
674 UPDATE gem_versions
675 SET status = 'available', status_reason = 'auto-promoted', updated_at = NOW()
676 WHERE status = 'quarantine'
677 AND available_after <= $1
678 "#,
679 )
680 .bind(now)
681 .execute(&self.pool)
682 .await
683 .context("promoting expired quarantines (postgres)")?;
684
685 Ok(result.rows_affected())
686 }
687
688 async fn mark_yanked(&self, name: &str, version: &str) -> Result<()> {
689 sqlx::query(
690 r#"
691 UPDATE gem_versions
692 SET status = 'yanked', upstream_yanked = TRUE, updated_at = NOW()
693 WHERE name = $1 AND version = $2
694 "#,
695 )
696 .bind(name)
697 .bind(version)
698 .execute(&self.pool)
699 .await
700 .context("marking version yanked (postgres)")?;
701
702 Ok(())
703 }
704
705 async fn get_all_quarantined(&self, limit: u32, offset: u32) -> Result<Vec<GemVersion>> {
706 let rows = sqlx::query_as::<_, PostgresGemVersionRow>(
707 r#"
708 SELECT id, name, version, platform, sha256, published_at, available_after,
709 status, status_reason, upstream_yanked, created_at, updated_at
710 FROM gem_versions
711 WHERE status = 'quarantine'
712 ORDER BY available_after ASC
713 LIMIT $1 OFFSET $2
714 "#,
715 )
716 .bind(limit as i64)
717 .bind(offset as i64)
718 .fetch_all(&self.pool)
719 .await
720 .context("fetching all quarantined (postgres)")?;
721
722 Ok(rows.into_iter().map(Into::into).collect())
723 }
724
725 async fn quarantine_stats(&self) -> Result<QuarantineStats> {
726 let now = Utc::now();
727 let today_end = now + Duration::days(1);
728 let week_end = now + Duration::days(7);
729
730 let (quarantined, available, yanked, pinned): (i64, i64, i64, i64) = sqlx::query_as(
731 r#"
732 SELECT
733 COALESCE(SUM(CASE WHEN status = 'quarantine' THEN 1 ELSE 0 END), 0),
734 COALESCE(SUM(CASE WHEN status = 'available' THEN 1 ELSE 0 END), 0),
735 COALESCE(SUM(CASE WHEN status = 'yanked' THEN 1 ELSE 0 END), 0),
736 COALESCE(SUM(CASE WHEN status = 'pinned' THEN 1 ELSE 0 END), 0)
737 FROM gem_versions
738 "#,
739 )
740 .fetch_one(&self.pool)
741 .await
742 .context("fetching quarantine counts (postgres)")?;
743
744 let releasing_today: i64 = sqlx::query_scalar(
745 r#"
746 SELECT COUNT(*)
747 FROM gem_versions
748 WHERE status = 'quarantine'
749 AND available_after > $1
750 AND available_after <= $2
751 "#,
752 )
753 .bind(now)
754 .bind(today_end)
755 .fetch_one(&self.pool)
756 .await
757 .context("counting versions releasing today (postgres)")?;
758
759 let releasing_week: i64 = sqlx::query_scalar(
760 r#"
761 SELECT COUNT(*)
762 FROM gem_versions
763 WHERE status = 'quarantine'
764 AND available_after > $1
765 AND available_after <= $2
766 "#,
767 )
768 .bind(now)
769 .bind(week_end)
770 .fetch_one(&self.pool)
771 .await
772 .context("counting versions releasing this week (postgres)")?;
773
774 Ok(QuarantineStats {
775 total_quarantined: quarantined.max(0) as u64,
776 total_available: available.max(0) as u64,
777 total_yanked: yanked.max(0) as u64,
778 total_pinned: pinned.max(0) as u64,
779 versions_releasing_today: releasing_today.max(0) as u64,
780 versions_releasing_this_week: releasing_week.max(0) as u64,
781 })
782 }
783
784 async fn get_gem_versions_for_index(&self, name: &str) -> Result<Vec<GemVersion>> {
785 let rows = sqlx::query_as::<_, PostgresGemVersionRow>(
786 r#"
787 SELECT id, name, version, platform, sha256, published_at, available_after,
788 status, status_reason, upstream_yanked, created_at, updated_at
789 FROM gem_versions
790 WHERE name = $1
791 ORDER BY version DESC
792 "#,
793 )
794 .bind(name)
795 .fetch_all(&self.pool)
796 .await
797 .context("fetching gem versions for index (postgres)")?;
798
799 Ok(rows.into_iter().map(Into::into).collect())
800 }
801
802 async fn quarantine_table_exists(&self) -> Result<bool> {
803 let exists: bool = sqlx::query_scalar(
804 r#"
805 SELECT EXISTS (
806 SELECT FROM information_schema.tables
807 WHERE table_name = 'gem_versions'
808 )
809 "#,
810 )
811 .fetch_one(&self.pool)
812 .await
813 .context("checking quarantine table exists (postgres)")?;
814
815 Ok(exists)
816 }
817
818 async fn run_quarantine_migrations(&self) -> Result<()> {
819 sqlx::query(
821 r#"
822 CREATE TABLE IF NOT EXISTS gem_versions (
823 id BIGSERIAL PRIMARY KEY,
824 name TEXT NOT NULL,
825 version TEXT NOT NULL,
826 platform TEXT,
827 sha256 TEXT,
828 published_at TIMESTAMPTZ NOT NULL,
829 available_after TIMESTAMPTZ NOT NULL,
830 status TEXT NOT NULL DEFAULT 'quarantine',
831 status_reason TEXT,
832 upstream_yanked BOOLEAN NOT NULL DEFAULT FALSE,
833 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
834 updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
835 CONSTRAINT gem_versions_unique UNIQUE (name, version, platform)
836 )
837 "#,
838 )
839 .execute(&self.pool)
840 .await
841 .context("creating gem_versions table (postgres)")?;
842
843 sqlx::query(
845 "CREATE INDEX IF NOT EXISTS idx_gem_versions_name ON gem_versions(name)",
846 )
847 .execute(&self.pool)
848 .await
849 .context("creating name index (postgres)")?;
850
851 sqlx::query(
852 "CREATE INDEX IF NOT EXISTS idx_gem_versions_status ON gem_versions(status)",
853 )
854 .execute(&self.pool)
855 .await
856 .context("creating status index (postgres)")?;
857
858 sqlx::query(
859 "CREATE INDEX IF NOT EXISTS idx_gv_available ON gem_versions(available_after)",
860 )
861 .execute(&self.pool)
862 .await
863 .context("creating available_after index (postgres)")?;
864
865 Ok(())
866 }
867}
868
/// Orders two gem version strings by RubyGems precedence.
///
/// Each version is split into numeric and alphabetic segments (`"1.0.0.rc1"`
/// becomes `1, 0, 0, "rc", 1`); any other character acts as a separator, and a
/// `-` additionally introduces a `"pre"` segment, mirroring how RubyGems
/// normalises dashes. Trailing zero segments are dropped from the release part and
/// from the prerelease part, so `"1.0"` and `"1.0.0"` compare equal. Segments are
/// then compared pairwise, with a missing segment counting as `0`:
///
/// * number vs number — numerically (arbitrary length, no overflow),
/// * text vs text — by string comparison,
/// * text vs number — text sorts lower, which places prereleases before the
///   corresponding release.
///
/// Unlike a semver parse with a lexicographic fallback, this handles versions that
/// do not have exactly three components (`"1.10" > "1.9"`) and defines a single
/// consistent ordering for every pair of inputs, which `sort_by` requires.
fn compare_versions(a: &str, b: &str) -> std::cmp::Ordering {
    use std::cmp::Ordering;

    #[derive(Clone, Copy, PartialEq, Eq)]
    enum Segment<'v> {
        /// Decimal digits with leading zeros removed; the empty string is zero.
        Number(&'v str),
        /// A run of ASCII letters.
        Text(&'v str),
    }

    const ZERO: Segment<'static> = Segment::Number("");

    /// Drops trailing zero segments from `part`.
    fn trim_trailing_zeros<'s, 'v>(part: &'s [Segment<'v>]) -> &'s [Segment<'v>] {
        let keep = part
            .iter()
            .rposition(|segment| *segment != ZERO)
            .map_or(0, |last| last + 1);
        &part[..keep]
    }

    /// Tokenises `version` and strips trailing zeros from each half.
    fn canonical_segments(version: &str) -> Vec<Segment<'_>> {
        let bytes = version.as_bytes();
        let mut raw = Vec::new();
        let mut pos = 0;

        while pos < bytes.len() {
            let start = pos;
            if bytes[pos].is_ascii_digit() {
                while pos < bytes.len() && bytes[pos].is_ascii_digit() {
                    pos += 1;
                }
                // Slicing is safe: both ends sit next to ASCII bytes, which are
                // always UTF-8 character boundaries.
                raw.push(Segment::Number(version[start..pos].trim_start_matches('0')));
            } else if bytes[pos].is_ascii_alphabetic() {
                while pos < bytes.len() && bytes[pos].is_ascii_alphabetic() {
                    pos += 1;
                }
                raw.push(Segment::Text(&version[start..pos]));
            } else {
                if bytes[pos] == b'-' {
                    raw.push(Segment::Text("pre"));
                }
                pos += 1;
            }
        }

        // The release part is everything before the first text segment.
        let split = raw
            .iter()
            .position(|segment| matches!(segment, Segment::Text(_)))
            .unwrap_or(raw.len());
        let (release, prerelease) = raw.split_at(split);

        let mut canonical = trim_trailing_zeros(release).to_vec();
        canonical.extend_from_slice(trim_trailing_zeros(prerelease));
        canonical
    }

    let lhs = canonical_segments(a);
    let rhs = canonical_segments(b);

    for index in 0..lhs.len().max(rhs.len()) {
        let left = lhs.get(index).copied().unwrap_or(ZERO);
        let right = rhs.get(index).copied().unwrap_or(ZERO);

        let ordering = match (left, right) {
            (Segment::Text(_), Segment::Number(_)) => Ordering::Less,
            (Segment::Number(_), Segment::Text(_)) => Ordering::Greater,
            (Segment::Text(x), Segment::Text(y)) => x.cmp(y),
            // Leading zeros are stripped, so a longer digit string is larger and
            // equal lengths compare digit by digit.
            (Segment::Number(x), Segment::Number(y)) => {
                x.len().cmp(&y.len()).then_with(|| x.cmp(y))
            }
        };

        if ordering != Ordering::Equal {
            return ordering;
        }
    }

    Ordering::Equal
}