1use anyhow::Context;
2use async_trait::async_trait;
3use deadpool_postgres::tokio_postgres::{IsolationLevel, NoTls, Row, Statement};
4use deadpool_postgres::{Pool, Runtime};
5use freighter_api_types::index::request::{ListQuery, Publish};
6use freighter_api_types::index::response::{
7 CompletedPublication, CrateVersion, Dependency, ListAll, ListAllCrateEntry,
8 ListAllCrateVersion, SearchResults, SearchResultsEntry, SearchResultsMeta,
9};
10use freighter_api_types::index::{IndexError, IndexProvider, IndexResult};
11use futures_util::StreamExt;
12use metrics::histogram;
13use postgres_types::ToSql;
14use semver::{Version, VersionReq};
15use serde::Deserialize;
16use std::cmp::Ordering;
17use std::collections::HashMap;
18use std::future::Future;
19use std::pin::Pin;
20use std::time::Instant;
21
/// Index provider that stores crate index data in PostgreSQL.
///
/// All queries go through a deadpool-managed connection pool; statements are prepared with
/// `prepare_cached`, so each pooled connection reuses its prepared statements.
pub struct PgIndexProvider {
    /// Connection pool for the index database, built from [`Config::index_db`].
    pool: Pool,
}
25
26impl PgIndexProvider {
27 pub fn new(config: Config) -> IndexResult<Self> {
28 let pool = config
29 .index_db
30 .create_pool(Some(Runtime::Tokio1), NoTls)
31 .context("Failed to create db pool")?;
32
33 Ok(Self { pool })
34 }
35
36 async fn yank_inner(&self, crate_name: &str, version: &Version, val: bool) -> IndexResult<()> {
37 let client = self.pool.get().await.unwrap();
38
39 let statement = client
40 .prepare_cached(include_str!("../sql/set-yank.sql"))
41 .await
42 .context("Failed to prepare yank/unyank statement")?;
43
44 let rows = client
45 .query(&statement, &[&crate_name, &version.to_string(), &val])
46 .await
47 .context("Failed to execute yank/unyank query")?;
48
49 assert!(rows.len() <= 1);
50
51 if rows.len() == 1 {
52 Ok(())
53 } else {
54 Err(IndexError::Conflict(
55 "Tried to set yank status to an identical status".to_string(),
56 ))
57 }
58 }
59}
60
/// Configuration for [`PgIndexProvider`], deserialized with serde.
#[derive(Deserialize)]
pub struct Config {
    /// deadpool-postgres settings used by `PgIndexProvider::new` to build the index
    /// database connection pool.
    pub index_db: deadpool_postgres::Config,
}
65
66#[async_trait]
67impl IndexProvider for PgIndexProvider {
68 type Config = Config;
69
70 async fn healthcheck(&self) -> anyhow::Result<()> {
71 let _ = self.pool.get().await?;
72 Ok(())
73 }
74
75 async fn get_sparse_entry(&self, crate_name: &str) -> IndexResult<Vec<CrateVersion>> {
76 let client = self.pool.get().await.map_err(anyhow::Error::from)?;
77
78 let (existential_statement, versions_statement, features_statement, dependencies_statement) =
80 tokio::try_join!(
81 client.prepare_cached(include_str!("../sql/sparse-index/get-crate.sql")),
82 client.prepare_cached(include_str!("../sql/sparse-index/get-versions.sql")),
83 client.prepare_cached(include_str!("../sql/sparse-index/get-features.sql")),
84 client.prepare_cached(include_str!("../sql/sparse-index/get-dependencies.sql"))
85 )
86 .context("Failed to prepare transaction")?;
87
88 let mut existential_rows = client
89 .query(&existential_statement, &[&crate_name])
90 .await
91 .context("Failed to query for crate existence")?;
92
93 assert!(existential_rows.len() < 2);
94
95 #[allow(clippy::single_match_else)]
96 match existential_rows.pop() {
97 Some(crate_row) => {
98 let id: i32 = crate_row.get("id");
99
100 let version_rows = client
102 .query(&versions_statement, &[&id])
103 .await
104 .context("Failed to query versions")?;
105
106 let mut versions = Vec::with_capacity(version_rows.len());
107
108 let mut version_queries = futures_util::stream::FuturesUnordered::new();
110
111 async fn query_version(
113 version_row: Row,
114 client: &deadpool_postgres::Client,
115 features_statement: &Statement,
116 dependencies_statement: &Statement,
117 ) -> anyhow::Result<(Row, Vec<Row>, Vec<Row>)> {
118 let version_id: i32 = version_row.get("id");
119
120 let version_id_query = [&version_id as &(dyn ToSql + Sync)];
122
123 let (features_row, dependencies_row) = tokio::try_join!(
125 client.query(features_statement, &version_id_query),
126 client.query(dependencies_statement, &version_id_query)
127 )
128 .context("Failed to query features or dependencies for crate")?;
129
130 Ok((version_row, features_row, dependencies_row))
131 }
132
133 for version_row in version_rows {
134 version_queries.push(query_version(
135 version_row,
136 &client,
137 &features_statement,
138 &dependencies_statement,
139 ));
140 }
141
142 while let Some(query_res) = version_queries.next().await {
143 let (version_row, feature_rows, dependency_rows) = query_res?;
144
145 let mut features = HashMap::with_capacity(feature_rows.len());
146 let mut deps = Vec::with_capacity(dependency_rows.len());
147
148 for feature_row in feature_rows {
149 features.insert(feature_row.get("name"), feature_row.get("values"));
150 }
151
152 for deps_row in dependency_rows {
153 let registry: Option<String> = deps_row.get("registry");
154
155 deps.push(Dependency {
156 name: deps_row.get("name"),
157 req: VersionReq::parse(deps_row.get("req"))
158 .context("Failed to parse dependency version req in db")?,
159 features: deps_row.get("features"),
160 optional: deps_row.get("optional"),
161 default_features: deps_row.get("default_features"),
162 target: deps_row.get("target"),
163 kind: deps_row.get("kind"),
164 registry: registry.filter(|x| !x.is_empty()),
165 package: deps_row.get("package"),
166 });
167 }
168
169 versions.push(CrateVersion {
170 name: crate_name.to_string(),
171 vers: Version::parse(version_row.get("version"))
172 .context("Failed to parse crate version in db")?,
173 deps,
174 cksum: version_row.get("cksum"),
175 features,
176 yanked: version_row.get("yanked"),
177 links: version_row.get("links"),
178 v: 2,
179 features2: HashMap::new(),
181 });
182 }
183
184 Ok(versions)
185 }
186 None => {
187 tracing::warn!("Returning 404 for crate index");
188 Err(IndexError::NotFound)
189 }
190 }
191 }
192
193 async fn confirm_existence(&self, crate_name: &str, version: &Version) -> IndexResult<bool> {
194 let client = self.pool.get().await.unwrap();
195
196 let statement = client
197 .prepare_cached(include_str!("../sql/confirm-existence.sql"))
198 .await
199 .context("Failed to prepare confirm existence statement")?;
200
201 let rows: Vec<Row> = client
202 .query(&statement, &[&crate_name, &version.to_string()])
203 .await
204 .context("Failed to execute existential confirmation query")?;
205
206 if let Some(row) = rows.first() {
207 Ok(row.get("yanked"))
208 } else {
209 Err(IndexError::NotFound)
210 }
211 }
212
213 async fn yank_crate(&self, crate_name: &str, version: &Version) -> IndexResult<()> {
214 self.yank_inner(crate_name, version, true).await
215 }
216
217 async fn unyank_crate(&self, crate_name: &str, version: &Version) -> IndexResult<()> {
218 self.yank_inner(crate_name, version, false).await
219 }
220
221 async fn search(&self, query_string: &str, limit: usize) -> IndexResult<SearchResults> {
222 let client = self.pool.get().await.unwrap();
223
224 let statement = client
225 .prepare_cached(include_str!("../sql/search.sql"))
226 .await
227 .context("Failed to prepare search statement")?;
228
229 let mut rows: Vec<Row> = client
230 .query(&statement, &[&query_string])
231 .await
232 .context("Failed to execute search query")?;
233
234 drop(client);
236
237 rows.sort_unstable_by(|a, b| {
242 match a.get::<_, i64>("count").cmp(&b.get::<_, i64>("count")) {
243 Ordering::Less => Ordering::Greater,
244 Ordering::Equal => a.get::<_, String>("name").cmp(&b.get::<_, String>("name")),
245 Ordering::Greater => Ordering::Less,
246 }
247 });
248
249 let total = rows.len();
250
251 let crates = rows.iter().take(limit).map(search_row_to_entry).collect();
253
254 let meta = SearchResultsMeta { total };
255
256 Ok(SearchResults { crates, meta })
257 }
258
259 #[allow(clippy::too_many_lines)]
262 async fn publish(
263 &self,
264 version: &Publish,
265 checksum: &str,
266 end_step: Pin<&mut (dyn Future<Output = IndexResult<()>> + Send)>,
267 ) -> IndexResult<CompletedPublication> {
268 let startup_timer = Instant::now();
269
270 let mut client = self
271 .pool
272 .get()
273 .await
274 .context("Failed to get client from pool")?;
275
276 let transaction = client
277 .build_transaction()
278 .isolation_level(IsolationLevel::ReadCommitted)
279 .start()
280 .await
281 .context("Failed to create publication transaction")?;
282
283 let (
284 get_or_insert_crate_statement,
285 insert_version_statement,
286 insert_dependency_statement,
287 insert_features_statement,
288 update_crate_statement,
289 get_crate_keywords_statement,
290 get_crate_categories_statement,
291 insert_keyword_statement,
292 insert_category_statement,
293 insert_crate_keyword_statement,
294 insert_crate_category_statement,
295 remove_crate_keyword_statement,
296 remove_crate_category_statement,
297 ) = tokio::try_join!(
298 transaction.prepare_cached(include_str!("../sql/publish/get-or-insert-crate.sql")),
299 transaction.prepare_cached(include_str!("../sql/publish/insert-version.sql")),
300 transaction.prepare_cached(include_str!("../sql/publish/insert-dependency.sql")),
301 transaction.prepare_cached(include_str!("../sql/publish/insert-features.sql")),
302 transaction.prepare_cached(include_str!("../sql/publish/update-crate.sql")),
303 transaction.prepare_cached(include_str!("../sql/publish/get-crate-keywords.sql")),
304 transaction.prepare_cached(include_str!("../sql/publish/get-crate-categories.sql")),
305 transaction.prepare_cached(include_str!("../sql/publish/insert-keyword.sql")),
306 transaction.prepare_cached(include_str!("../sql/publish/insert-category.sql")),
307 transaction.prepare_cached(include_str!("../sql/publish/insert-crate-keyword.sql")),
308 transaction.prepare_cached(include_str!("../sql/publish/insert-crate-category.sql")),
309 transaction.prepare_cached(include_str!("../sql/publish/remove-crate-keyword.sql")),
310 transaction.prepare_cached(include_str!("../sql/publish/remove-crate-category.sql")),
311 )
312 .context("Failed to prepare statements for publish transaction")?;
313
314 histogram!(
315 "freighter_publish_component_duration_seconds",
316 "component" => "startup"
317 )
318 .record(startup_timer.elapsed());
319
320 let crate_timer = Instant::now();
321
322 let crate_row = transaction
323 .query_one(&get_or_insert_crate_statement, &[&version.name])
324 .await
325 .context("Crate get or insert failed")?;
326
327 let crate_id: i32 = crate_row.get("id");
328
329 if version.description != crate_row.get("description")
331 || version.documentation != crate_row.get("documentation")
332 || version.homepage != crate_row.get("homepage")
333 || version.repository != crate_row.get("repository")
334 {
335 transaction
336 .query(
337 &update_crate_statement,
338 &[
339 &crate_id,
340 &version.description,
341 &version.documentation,
342 &version.homepage,
343 &version.repository,
344 ],
345 )
346 .await
347 .context("Failed to update crate with new information")?;
348 }
349
350 histogram!(
351 "publish_component_duration_seconds",
352 "component" => "crate"
353 )
354 .record(crate_timer.elapsed());
355
356 let get_keycat_timer = Instant::now();
357
358 let crate_keywords = transaction
359 .query(&get_crate_keywords_statement, &[&crate_id])
360 .await
361 .context("Failed to fetch crate keywords")?
362 .iter()
363 .map(|x| x.get("name"))
364 .collect::<Vec<String>>();
365
366 let crate_categories = transaction
367 .query(&get_crate_categories_statement, &[&crate_id])
368 .await
369 .context("Failed to fetch crate categories")?
370 .iter()
371 .map(|x| x.get("name"))
372 .collect::<Vec<String>>();
373
374 histogram!(
375 "publish_component_duration_seconds",
376 "component" => "get_keycat"
377 )
378 .record(get_keycat_timer.elapsed());
379
380 let add_to_keycat_timer = Instant::now();
383
384 for k in &version.keywords {
385 if !crate_keywords.contains(k) {
386 let keyword_id: i32 = transaction
387 .query_one(&insert_keyword_statement, &[k])
388 .await
389 .context("Failed to insert keyword")?
390 .get("id");
391
392 transaction
393 .query(&insert_crate_keyword_statement, &[&crate_id, &keyword_id])
394 .await
395 .context("Failed to insert crate_keyword")?;
396 }
397 }
398
399 for c in &version.categories {
400 if !crate_categories.contains(c) {
401 let category_id: i32 = transaction
402 .query_one(&insert_category_statement, &[c])
403 .await
404 .context("Failed to insert category")?
405 .get("id");
406
407 transaction
408 .query(&insert_crate_category_statement, &[&crate_id, &category_id])
409 .await
410 .context("Failed to insert crate_category")?;
411 }
412 }
413
414 histogram!(
415 "publish_component_duration_seconds",
416 "component" => "add_to_keycat"
417 )
418 .record(add_to_keycat_timer.elapsed());
419
420 let prune_keycat_timer = Instant::now();
423
424 for k in &crate_keywords {
425 if !version.keywords.contains(k) {
426 transaction
427 .query(&remove_crate_keyword_statement, &[&crate_id, k])
428 .await
429 .context("Failed to remove crate_keyword")?;
430 }
431 }
432
433 for c in &crate_categories {
434 if !version.categories.contains(c) {
435 transaction
436 .query(&remove_crate_category_statement, &[&crate_id, c])
437 .await
438 .context("Failed to remove crate_category")?;
439 }
440 }
441
442 histogram!(
443 "publish_component_duration_seconds",
444 "component" => "prune_keycat"
445 )
446 .record(prune_keycat_timer.elapsed());
447
448 let insert_version_timer = Instant::now();
449
450 let insert_version_row = transaction
451 .query_one(
452 &insert_version_statement,
453 &[
454 &crate_id,
455 &version.vers.to_string(),
456 &checksum,
457 &false,
458 &version.links,
459 ],
460 )
461 .await
462 .map_err(|_| IndexError::Conflict("Failed to insert version".to_owned()))?;
463
464 histogram!(
465 "publish_component_duration_seconds",
466 "component" => "insert_version"
467 )
468 .record(insert_version_timer.elapsed());
469
470 let insert_dependencies_timer = Instant::now();
471
472 let version_id: i32 = insert_version_row.get("id");
473
474 for dependency in &version.deps {
475 transaction
476 .query_one(
477 &insert_dependency_statement,
478 &[
479 &dependency.name,
480 &dependency.registry.as_ref().unwrap_or(&String::new()),
481 &version_id,
482 &dependency.version_req.to_string(),
483 &dependency.features,
484 &dependency.optional,
485 &dependency.default_features,
486 &dependency.target,
487 &dependency.kind,
488 &dependency.explicit_name_in_toml,
489 ],
490 )
491 .await
492 .context("Failed to insert dependency")?;
493 }
494
495 histogram!(
496 "publish_component_duration_seconds",
497 "component" => "insert_dependencies"
498 )
499 .record(insert_dependencies_timer.elapsed());
500
501 let insert_features_timer = Instant::now();
502
503 for feature in &version.features {
504 transaction
505 .query_one(
506 &insert_features_statement,
507 &[&version_id, &feature.0, &feature.1],
508 )
509 .await
510 .context("Failed to insert feature")?;
511 }
512
513 histogram!(
514 "publish_component_duration_seconds",
515 "component" => "insert_features"
516 )
517 .record(insert_features_timer.elapsed());
518
519 let end_step_timer = Instant::now();
520
521 end_step
522 .await
523 .context("Failed to execute end step in index upload transaction")?;
524
525 histogram!(
526 "publish_component_duration_seconds",
527 "component" => "end_step"
528 )
529 .record(end_step_timer.elapsed());
530
531 let commit_timer = Instant::now();
532
533 transaction
534 .commit()
535 .await
536 .context("Failed to commit transaction")?;
537
538 histogram!(
539 "publish_component_duration_seconds",
540 "component" => "commit"
541 )
542 .record(commit_timer.elapsed());
543
544 Ok(CompletedPublication { warnings: None })
545 }
546
547 async fn list(&self, pagination: &ListQuery) -> IndexResult<ListAll> {
548 let client = self.pool.get().await.unwrap();
549
550 let statement = client
551 .prepare_cached(include_str!("../sql/list.sql"))
552 .await
553 .context("Failed to prepare search statement")?;
554
555 let mut rows: Vec<Row> = client
556 .query(&statement, &[])
557 .await
558 .context("Failed to execute search query")?;
559
560 drop(client);
562
563 rows.sort_unstable_by_key(|r| r.get::<_, String>("name"));
566
567 let crates = if let ListQuery {
568 per_page: Some(per_page),
569 page,
570 } = pagination
571 {
572 rows.chunks(*per_page)
573 .nth(page.unwrap_or_default())
574 .unwrap_or(&[])
575 .iter()
576 .map(list_row_to_entry)
577 .collect()
578 } else {
579 rows.iter().map(list_row_to_entry).collect()
580 };
581
582 let list_all = ListAll { results: crates };
583
584 Ok(list_all)
585 }
586}
587
588fn list_row_to_entry(row: &Row) -> ListAllCrateEntry {
589 let versions: Vec<String> = row.get("versions");
590
591 let versions = versions
593 .iter()
594 .map(|s| ListAllCrateVersion {
595 version: Version::parse(s).unwrap(),
596 })
597 .collect();
598
599 ListAllCrateEntry {
600 name: row.get("name"),
601 versions,
602 description: row.try_get("description").unwrap_or_default(),
603 created_at: row.get("created_at"),
604 updated_at: row.get("updated_at"),
605 homepage: row.get("homepage"),
606 repository: row.get("repository"),
607 documentation: row.get("documentation"),
608 keywords: row.get::<_, Option<Vec<_>>>("keywords").unwrap_or_default(),
609 categories: row
610 .get::<_, Option<Vec<_>>>("categories")
611 .unwrap_or_default(),
612 }
613}
614
615fn search_row_to_entry(row: &Row) -> SearchResultsEntry {
616 let versions: Vec<String> = row.get("versions");
617
618 let max_version = versions
620 .iter()
621 .map(|s| Version::parse(s).unwrap())
622 .max()
623 .unwrap();
624
625 SearchResultsEntry {
626 name: row.get("name"),
627 max_version,
628 description: row.try_get("description").unwrap_or_default(),
629 }
630}