freighter_pg_index/
lib.rs

1use anyhow::Context;
2use async_trait::async_trait;
3use deadpool_postgres::tokio_postgres::{IsolationLevel, NoTls, Row, Statement};
4use deadpool_postgres::{Pool, Runtime};
5use freighter_api_types::index::request::{ListQuery, Publish};
6use freighter_api_types::index::response::{
7    CompletedPublication, CrateVersion, Dependency, ListAll, ListAllCrateEntry,
8    ListAllCrateVersion, SearchResults, SearchResultsEntry, SearchResultsMeta,
9};
10use freighter_api_types::index::{IndexError, IndexProvider, IndexResult};
11use futures_util::StreamExt;
12use metrics::histogram;
13use postgres_types::ToSql;
14use semver::{Version, VersionReq};
15use serde::Deserialize;
16use std::cmp::Ordering;
17use std::collections::HashMap;
18use std::future::Future;
19use std::pin::Pin;
20use std::time::Instant;
21
22pub struct PgIndexProvider {
23    pool: Pool,
24}
25
26impl PgIndexProvider {
27    pub fn new(config: Config) -> IndexResult<Self> {
28        let pool = config
29            .index_db
30            .create_pool(Some(Runtime::Tokio1), NoTls)
31            .context("Failed to create db pool")?;
32
33        Ok(Self { pool })
34    }
35
36    async fn yank_inner(&self, crate_name: &str, version: &Version, val: bool) -> IndexResult<()> {
37        let client = self.pool.get().await.unwrap();
38
39        let statement = client
40            .prepare_cached(include_str!("../sql/set-yank.sql"))
41            .await
42            .context("Failed to prepare yank/unyank statement")?;
43
44        let rows = client
45            .query(&statement, &[&crate_name, &version.to_string(), &val])
46            .await
47            .context("Failed to execute yank/unyank query")?;
48
49        assert!(rows.len() <= 1);
50
51        if rows.len() == 1 {
52            Ok(())
53        } else {
54            Err(IndexError::Conflict(
55                "Tried to set yank status to an identical status".to_string(),
56            ))
57        }
58    }
59}
60
61#[derive(Deserialize)]
62pub struct Config {
63    pub index_db: deadpool_postgres::Config,
64}
65
66#[async_trait]
67impl IndexProvider for PgIndexProvider {
68    type Config = Config;
69
70    async fn healthcheck(&self) -> anyhow::Result<()> {
71        let _ = self.pool.get().await?;
72        Ok(())
73    }
74
75    async fn get_sparse_entry(&self, crate_name: &str) -> IndexResult<Vec<CrateVersion>> {
76        let client = self.pool.get().await.map_err(anyhow::Error::from)?;
77
78        // prepare these at once to take advantage of pipelining
79        let (existential_statement, versions_statement, features_statement, dependencies_statement) =
80            tokio::try_join!(
81                client.prepare_cached(include_str!("../sql/sparse-index/get-crate.sql")),
82                client.prepare_cached(include_str!("../sql/sparse-index/get-versions.sql")),
83                client.prepare_cached(include_str!("../sql/sparse-index/get-features.sql")),
84                client.prepare_cached(include_str!("../sql/sparse-index/get-dependencies.sql"))
85            )
86            .context("Failed to prepare transaction")?;
87
88        let mut existential_rows = client
89            .query(&existential_statement, &[&crate_name])
90            .await
91            .context("Failed to query for crate existence")?;
92
93        assert!(existential_rows.len() < 2);
94
95        #[allow(clippy::single_match_else)]
96        match existential_rows.pop() {
97            Some(crate_row) => {
98                let id: i32 = crate_row.get("id");
99
100                // this is a major hotpath
101                let version_rows = client
102                    .query(&versions_statement, &[&id])
103                    .await
104                    .context("Failed to query versions")?;
105
106                let mut versions = Vec::with_capacity(version_rows.len());
107
108                // drive them all concurrently to improve pipelining
109                let mut version_queries = futures_util::stream::FuturesUnordered::new();
110
111                // using a function like this can often make rustc a bit smarter about what it captures and generates
112                async fn query_version(
113                    version_row: Row,
114                    client: &deadpool_postgres::Client,
115                    features_statement: &Statement,
116                    dependencies_statement: &Statement,
117                ) -> anyhow::Result<(Row, Vec<Row>, Vec<Row>)> {
118                    let version_id: i32 = version_row.get("id");
119
120                    // this shouldn't be necessary but it is nonetheless
121                    let version_id_query = [&version_id as &(dyn ToSql + Sync)];
122
123                    // pipeline the queries here too
124                    let (features_row, dependencies_row) = tokio::try_join!(
125                        client.query(features_statement, &version_id_query),
126                        client.query(dependencies_statement, &version_id_query)
127                    )
128                    .context("Failed to query features or dependencies for crate")?;
129
130                    Ok((version_row, features_row, dependencies_row))
131                }
132
133                for version_row in version_rows {
134                    version_queries.push(query_version(
135                        version_row,
136                        &client,
137                        &features_statement,
138                        &dependencies_statement,
139                    ));
140                }
141
142                while let Some(query_res) = version_queries.next().await {
143                    let (version_row, feature_rows, dependency_rows) = query_res?;
144
145                    let mut features = HashMap::with_capacity(feature_rows.len());
146                    let mut deps = Vec::with_capacity(dependency_rows.len());
147
148                    for feature_row in feature_rows {
149                        features.insert(feature_row.get("name"), feature_row.get("values"));
150                    }
151
152                    for deps_row in dependency_rows {
153                        let registry: Option<String> = deps_row.get("registry");
154
155                        deps.push(Dependency {
156                            name: deps_row.get("name"),
157                            req: VersionReq::parse(deps_row.get("req"))
158                                .context("Failed to parse dependency version req in db")?,
159                            features: deps_row.get("features"),
160                            optional: deps_row.get("optional"),
161                            default_features: deps_row.get("default_features"),
162                            target: deps_row.get("target"),
163                            kind: deps_row.get("kind"),
164                            registry: registry.filter(|x| !x.is_empty()),
165                            package: deps_row.get("package"),
166                        });
167                    }
168
169                    versions.push(CrateVersion {
170                        name: crate_name.to_string(),
171                        vers: Version::parse(version_row.get("version"))
172                            .context("Failed to parse crate version in db")?,
173                        deps,
174                        cksum: version_row.get("cksum"),
175                        features,
176                        yanked: version_row.get("yanked"),
177                        links: version_row.get("links"),
178                        v: 2,
179                        // todo maybe scrap
180                        features2: HashMap::new(),
181                    });
182                }
183
184                Ok(versions)
185            }
186            None => {
187                tracing::warn!("Returning 404 for crate index");
188                Err(IndexError::NotFound)
189            }
190        }
191    }
192
193    async fn confirm_existence(&self, crate_name: &str, version: &Version) -> IndexResult<bool> {
194        let client = self.pool.get().await.unwrap();
195
196        let statement = client
197            .prepare_cached(include_str!("../sql/confirm-existence.sql"))
198            .await
199            .context("Failed to prepare confirm existence statement")?;
200
201        let rows: Vec<Row> = client
202            .query(&statement, &[&crate_name, &version.to_string()])
203            .await
204            .context("Failed to execute existential confirmation query")?;
205
206        if let Some(row) = rows.first() {
207            Ok(row.get("yanked"))
208        } else {
209            Err(IndexError::NotFound)
210        }
211    }
212
213    async fn yank_crate(&self, crate_name: &str, version: &Version) -> IndexResult<()> {
214        self.yank_inner(crate_name, version, true).await
215    }
216
217    async fn unyank_crate(&self, crate_name: &str, version: &Version) -> IndexResult<()> {
218        self.yank_inner(crate_name, version, false).await
219    }
220
221    async fn search(&self, query_string: &str, limit: usize) -> IndexResult<SearchResults> {
222        let client = self.pool.get().await.unwrap();
223
224        let statement = client
225            .prepare_cached(include_str!("../sql/search.sql"))
226            .await
227            .context("Failed to prepare search statement")?;
228
229        let mut rows: Vec<Row> = client
230            .query(&statement, &[&query_string])
231            .await
232            .context("Failed to execute search query")?;
233
234        // return the client immediately to the pool in case sorting takes longer than we'd like
235        drop(client);
236
237        // we can't scale the DB as easily as we can this server, so let's sort in here
238        // warning: may be expensive!
239        //
240        // sort first by version in descending order, then by name alphabetically
241        rows.sort_unstable_by(|a, b| {
242            match a.get::<_, i64>("count").cmp(&b.get::<_, i64>("count")) {
243                Ordering::Less => Ordering::Greater,
244                Ordering::Equal => a.get::<_, String>("name").cmp(&b.get::<_, String>("name")),
245                Ordering::Greater => Ordering::Less,
246            }
247        });
248
249        let total = rows.len();
250
251        // also might be expensive
252        let crates = rows.iter().take(limit).map(search_row_to_entry).collect();
253
254        let meta = SearchResultsMeta { total };
255
256        Ok(SearchResults { crates, meta })
257    }
258
259    // this one has a lot of optimization headroom, and is thus perfect for experiments
260    // sadly it does not matter, as this will never be as slow for the user as compiling the crate
261    #[allow(clippy::too_many_lines)]
262    async fn publish(
263        &self,
264        version: &Publish,
265        checksum: &str,
266        end_step: Pin<&mut (dyn Future<Output = IndexResult<()>> + Send)>,
267    ) -> IndexResult<CompletedPublication> {
268        let startup_timer = Instant::now();
269
270        let mut client = self
271            .pool
272            .get()
273            .await
274            .context("Failed to get client from pool")?;
275
276        let transaction = client
277            .build_transaction()
278            .isolation_level(IsolationLevel::ReadCommitted)
279            .start()
280            .await
281            .context("Failed to create publication transaction")?;
282
283        let (
284            get_or_insert_crate_statement,
285            insert_version_statement,
286            insert_dependency_statement,
287            insert_features_statement,
288            update_crate_statement,
289            get_crate_keywords_statement,
290            get_crate_categories_statement,
291            insert_keyword_statement,
292            insert_category_statement,
293            insert_crate_keyword_statement,
294            insert_crate_category_statement,
295            remove_crate_keyword_statement,
296            remove_crate_category_statement,
297        ) = tokio::try_join!(
298            transaction.prepare_cached(include_str!("../sql/publish/get-or-insert-crate.sql")),
299            transaction.prepare_cached(include_str!("../sql/publish/insert-version.sql")),
300            transaction.prepare_cached(include_str!("../sql/publish/insert-dependency.sql")),
301            transaction.prepare_cached(include_str!("../sql/publish/insert-features.sql")),
302            transaction.prepare_cached(include_str!("../sql/publish/update-crate.sql")),
303            transaction.prepare_cached(include_str!("../sql/publish/get-crate-keywords.sql")),
304            transaction.prepare_cached(include_str!("../sql/publish/get-crate-categories.sql")),
305            transaction.prepare_cached(include_str!("../sql/publish/insert-keyword.sql")),
306            transaction.prepare_cached(include_str!("../sql/publish/insert-category.sql")),
307            transaction.prepare_cached(include_str!("../sql/publish/insert-crate-keyword.sql")),
308            transaction.prepare_cached(include_str!("../sql/publish/insert-crate-category.sql")),
309            transaction.prepare_cached(include_str!("../sql/publish/remove-crate-keyword.sql")),
310            transaction.prepare_cached(include_str!("../sql/publish/remove-crate-category.sql")),
311        )
312        .context("Failed to prepare statements for publish transaction")?;
313
314        histogram!(
315            "freighter_publish_component_duration_seconds",
316            "component" => "startup"
317        )
318        .record(startup_timer.elapsed());
319
320        let crate_timer = Instant::now();
321
322        let crate_row = transaction
323            .query_one(&get_or_insert_crate_statement, &[&version.name])
324            .await
325            .context("Crate get or insert failed")?;
326
327        let crate_id: i32 = crate_row.get("id");
328
329        // postgres will replace the whole row anyways, so lets just be slightly more convenient
330        if version.description != crate_row.get("description")
331            || version.documentation != crate_row.get("documentation")
332            || version.homepage != crate_row.get("homepage")
333            || version.repository != crate_row.get("repository")
334        {
335            transaction
336                .query(
337                    &update_crate_statement,
338                    &[
339                        &crate_id,
340                        &version.description,
341                        &version.documentation,
342                        &version.homepage,
343                        &version.repository,
344                    ],
345                )
346                .await
347                .context("Failed to update crate with new information")?;
348        }
349
350        histogram!(
351            "publish_component_duration_seconds",
352            "component" => "crate"
353        )
354        .record(crate_timer.elapsed());
355
356        let get_keycat_timer = Instant::now();
357
358        let crate_keywords = transaction
359            .query(&get_crate_keywords_statement, &[&crate_id])
360            .await
361            .context("Failed to fetch crate keywords")?
362            .iter()
363            .map(|x| x.get("name"))
364            .collect::<Vec<String>>();
365
366        let crate_categories = transaction
367            .query(&get_crate_categories_statement, &[&crate_id])
368            .await
369            .context("Failed to fetch crate categories")?
370            .iter()
371            .map(|x| x.get("name"))
372            .collect::<Vec<String>>();
373
374        histogram!(
375            "publish_component_duration_seconds",
376            "component" => "get_keycat"
377        )
378        .record(get_keycat_timer.elapsed());
379
380        // add missing keywords and categories
381
382        let add_to_keycat_timer = Instant::now();
383
384        for k in &version.keywords {
385            if !crate_keywords.contains(k) {
386                let keyword_id: i32 = transaction
387                    .query_one(&insert_keyword_statement, &[k])
388                    .await
389                    .context("Failed to insert keyword")?
390                    .get("id");
391
392                transaction
393                    .query(&insert_crate_keyword_statement, &[&crate_id, &keyword_id])
394                    .await
395                    .context("Failed to insert crate_keyword")?;
396            }
397        }
398
399        for c in &version.categories {
400            if !crate_categories.contains(c) {
401                let category_id: i32 = transaction
402                    .query_one(&insert_category_statement, &[c])
403                    .await
404                    .context("Failed to insert category")?
405                    .get("id");
406
407                transaction
408                    .query(&insert_crate_category_statement, &[&crate_id, &category_id])
409                    .await
410                    .context("Failed to insert crate_category")?;
411            }
412        }
413
414        histogram!(
415            "publish_component_duration_seconds",
416            "component" => "add_to_keycat"
417        )
418        .record(add_to_keycat_timer.elapsed());
419
420        // prune unneeded keywords and categories
421
422        let prune_keycat_timer = Instant::now();
423
424        for k in &crate_keywords {
425            if !version.keywords.contains(k) {
426                transaction
427                    .query(&remove_crate_keyword_statement, &[&crate_id, k])
428                    .await
429                    .context("Failed to remove crate_keyword")?;
430            }
431        }
432
433        for c in &crate_categories {
434            if !version.categories.contains(c) {
435                transaction
436                    .query(&remove_crate_category_statement, &[&crate_id, c])
437                    .await
438                    .context("Failed to remove crate_category")?;
439            }
440        }
441
442        histogram!(
443            "publish_component_duration_seconds",
444            "component" => "prune_keycat"
445        )
446        .record(prune_keycat_timer.elapsed());
447
448        let insert_version_timer = Instant::now();
449
450        let insert_version_row = transaction
451            .query_one(
452                &insert_version_statement,
453                &[
454                    &crate_id,
455                    &version.vers.to_string(),
456                    &checksum,
457                    &false,
458                    &version.links,
459                ],
460            )
461            .await
462            .map_err(|_| IndexError::Conflict("Failed to insert version".to_owned()))?;
463
464        histogram!(
465            "publish_component_duration_seconds",
466            "component" => "insert_version"
467        )
468        .record(insert_version_timer.elapsed());
469
470        let insert_dependencies_timer = Instant::now();
471
472        let version_id: i32 = insert_version_row.get("id");
473
474        for dependency in &version.deps {
475            transaction
476                .query_one(
477                    &insert_dependency_statement,
478                    &[
479                        &dependency.name,
480                        &dependency.registry.as_ref().unwrap_or(&String::new()),
481                        &version_id,
482                        &dependency.version_req.to_string(),
483                        &dependency.features,
484                        &dependency.optional,
485                        &dependency.default_features,
486                        &dependency.target,
487                        &dependency.kind,
488                        &dependency.explicit_name_in_toml,
489                    ],
490                )
491                .await
492                .context("Failed to insert dependency")?;
493        }
494
495        histogram!(
496            "publish_component_duration_seconds",
497            "component" => "insert_dependencies"
498        )
499        .record(insert_dependencies_timer.elapsed());
500
501        let insert_features_timer = Instant::now();
502
503        for feature in &version.features {
504            transaction
505                .query_one(
506                    &insert_features_statement,
507                    &[&version_id, &feature.0, &feature.1],
508                )
509                .await
510                .context("Failed to insert feature")?;
511        }
512
513        histogram!(
514            "publish_component_duration_seconds",
515            "component" => "insert_features"
516        )
517        .record(insert_features_timer.elapsed());
518
519        let end_step_timer = Instant::now();
520
521        end_step
522            .await
523            .context("Failed to execute end step in index upload transaction")?;
524
525        histogram!(
526            "publish_component_duration_seconds",
527            "component" => "end_step"
528        )
529        .record(end_step_timer.elapsed());
530
531        let commit_timer = Instant::now();
532
533        transaction
534            .commit()
535            .await
536            .context("Failed to commit transaction")?;
537
538        histogram!(
539            "publish_component_duration_seconds",
540            "component" => "commit"
541        )
542        .record(commit_timer.elapsed());
543
544        Ok(CompletedPublication { warnings: None })
545    }
546
547    async fn list(&self, pagination: &ListQuery) -> IndexResult<ListAll> {
548        let client = self.pool.get().await.unwrap();
549
550        let statement = client
551            .prepare_cached(include_str!("../sql/list.sql"))
552            .await
553            .context("Failed to prepare search statement")?;
554
555        let mut rows: Vec<Row> = client
556            .query(&statement, &[])
557            .await
558            .context("Failed to execute search query")?;
559
560        // return the client immediately to the pool in case sorting takes longer than we'd like
561        drop(client);
562
563        // we can't scale the DB as easily as we can this server, so let's sort in here
564        // warning: may be expensive!
565        rows.sort_unstable_by_key(|r| r.get::<_, String>("name"));
566
567        let crates = if let ListQuery {
568            per_page: Some(per_page),
569            page,
570        } = pagination
571        {
572            rows.chunks(*per_page)
573                .nth(page.unwrap_or_default())
574                .unwrap_or(&[])
575                .iter()
576                .map(list_row_to_entry)
577                .collect()
578        } else {
579            rows.iter().map(list_row_to_entry).collect()
580        };
581
582        let list_all = ListAll { results: crates };
583
584        Ok(list_all)
585    }
586}
587
588fn list_row_to_entry(row: &Row) -> ListAllCrateEntry {
589    let versions: Vec<String> = row.get("versions");
590
591    // we should never receive 0 versions from our query
592    let versions = versions
593        .iter()
594        .map(|s| ListAllCrateVersion {
595            version: Version::parse(s).unwrap(),
596        })
597        .collect();
598
599    ListAllCrateEntry {
600        name: row.get("name"),
601        versions,
602        description: row.try_get("description").unwrap_or_default(),
603        created_at: row.get("created_at"),
604        updated_at: row.get("updated_at"),
605        homepage: row.get("homepage"),
606        repository: row.get("repository"),
607        documentation: row.get("documentation"),
608        keywords: row.get::<_, Option<Vec<_>>>("keywords").unwrap_or_default(),
609        categories: row
610            .get::<_, Option<Vec<_>>>("categories")
611            .unwrap_or_default(),
612    }
613}
614
615fn search_row_to_entry(row: &Row) -> SearchResultsEntry {
616    let versions: Vec<String> = row.get("versions");
617
618    // we should never receive 0 versions from our query
619    let max_version = versions
620        .iter()
621        .map(|s| Version::parse(s).unwrap())
622        .max()
623        .unwrap();
624
625    SearchResultsEntry {
626        name: row.get("name"),
627        max_version,
628        description: row.try_get("description").unwrap_or_default(),
629    }
630}