use crate::{Result, config::HttpConfig, cratespec::RegistrySource, error, http::HttpClient};
use backon::{BlockingRetryable, ExponentialBuilder};
use semver::Version;
use snafu::ResultExt;
use tame_index::{
Error as TameIndexError, HttpError as TameHttpError, IndexKrate, IndexLocation, IndexUrl, KrateName,
SparseIndex,
index::RemoteSparseIndex,
utils::flock::{FileLock, LockOptions},
};
/// Outcome of resolving the download URL for one specific crate version.
///
/// Produced by `RegistryClient::crate_download_url`; every "not available"
/// case has its own variant so callers can tell them apart.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum DownloadUrlLookup {
    /// The index entry and the index configuration yielded this download URL.
    Url(String),
    /// The index lookup returned no entry for the requested crate name.
    CrateNotFound,
    /// The crate exists in the index, but none of its versions equals the
    /// requested one.
    VersionNotFound,
    /// The version exists, but no download URL could be derived for it from
    /// the index configuration.
    UrlUnavailable,
}
/// One published version of a crate as listed in the registry index.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct RegistryVersionInfo {
    /// Version string, copied from the index entry's `version` field.
    pub(crate) version: String,
    /// Whether the index entry reports this version as yanked.
    pub(crate) yanked: bool,
}
/// Client for looking up crate metadata in a sparse registry index.
///
/// Constructed with `RegistryClient::new`, which resolves the index URL,
/// wraps it in a `RemoteSparseIndex` and obtains the file lock stored here.
///
/// NOTE: field order determines drop order (`remote_index` is dropped before
/// `lock`); keep it as is unless that is reviewed.
pub(crate) struct RegistryClient {
/// Sparse index handle used for both the cached and the remote lookups.
remote_index: RemoteSparseIndex,
/// Lock obtained from `LockOptions::cargo_package_lock` at construction;
/// passed by reference to every index lookup.
lock: FileLock,
/// Clone of the caller's HTTP settings; supplies the retry count and
/// backoff bounds for non-offline lookups.
http_config: HttpConfig,
}
impl RegistryClient {
pub(crate) fn new(
source: Option<&RegistrySource>,
http_client: &HttpClient,
http: &HttpConfig,
) -> Result<Self> {
let index_url = resolve_index_url(source).context(error::RegistrySnafu)?;
let index_location = IndexLocation::new(index_url);
let sparse_index = SparseIndex::new(index_location).context(error::RegistrySnafu)?;
let remote_index = RemoteSparseIndex::new(sparse_index, http_client.inner().clone());
let lock = LockOptions::cargo_package_lock(None)
.context(error::RegistrySnafu)?
.lock(|_| None)
.context(error::RegistrySnafu)?;
Ok(Self {
remote_index,
lock,
http_config: http.clone(),
})
}
pub(crate) fn crate_versions(
&self,
name: &str,
offline: bool,
) -> Result<Option<Vec<RegistryVersionInfo>>> {
let Some(krate) = self.fetch_krate(name, offline)? else {
return Ok(None);
};
Ok(Some(
krate
.versions
.iter()
.map(|v| RegistryVersionInfo {
version: v.version.to_string(),
yanked: v.is_yanked(),
})
.collect(),
))
}
pub(crate) fn crate_download_url(
&self,
name: &str,
version: &Version,
offline: bool,
) -> Result<DownloadUrlLookup> {
let Some(krate) = self.fetch_krate(name, offline)? else {
return Ok(DownloadUrlLookup::CrateNotFound);
};
let Some(index_version) = krate
.versions
.iter()
.find(|v| Version::parse(&v.version).ok().is_some_and(|ver| &ver == version))
else {
return Ok(DownloadUrlLookup::VersionNotFound);
};
let index_config = self
.remote_index
.index
.index_config()
.context(error::RegistrySnafu)?;
let Some(download_url) = index_version.download_url(&index_config) else {
return Ok(DownloadUrlLookup::UrlUnavailable);
};
Ok(DownloadUrlLookup::Url(download_url))
}
fn fetch_krate(&self, name: &str, offline: bool) -> Result<Option<IndexKrate>> {
if offline {
let krate_name = KrateName::try_from(name).context(error::RegistrySnafu)?;
return self
.remote_index
.cached_krate(krate_name, &self.lock)
.context(error::RegistrySnafu);
}
let operation = || {
let krate_name = KrateName::try_from(name)?;
self.remote_index.krate(krate_name, true, &self.lock)
};
run_with_retry(&self.http_config, name, operation).context(error::RegistrySnafu)
}
}
/// Maps the optional registry selection onto a tame_index `IndexUrl`.
///
/// * no source - `IndexUrl::crates_io` with every optional argument unset;
/// * `RegistrySource::Named` - looked up via `IndexUrl::for_registry_name`;
/// * `RegistrySource::IndexUrl` - the given URL string is used directly.
///
/// # Errors
///
/// Returns whatever error the crates.io or named-registry resolution reports;
/// the explicit-URL case cannot fail.
fn resolve_index_url(source: Option<&RegistrySource>) -> std::result::Result<IndexUrl<'_>, TameIndexError> {
    let Some(registry) = source else {
        return IndexUrl::crates_io(None, None, None);
    };

    match registry {
        RegistrySource::IndexUrl(url) => Ok(IndexUrl::from(url.as_str())),
        RegistrySource::Named(registry_name) => IndexUrl::for_registry_name(None, None, registry_name),
    }
}
/// Builds the exponential backoff policy for registry index requests.
///
/// Delays are bounded by `http.backoff_base` and `http.backoff_max`, at most
/// `http.retries` retries are made after the first attempt, and jitter is
/// enabled.
fn build_registry_backoff(http: &HttpConfig) -> ExponentialBuilder {
    let shortest_delay = http.backoff_base;
    let longest_delay = http.backoff_max;
    let retry_budget = http.retries;

    let policy = ExponentialBuilder::default();
    policy
        .with_min_delay(shortest_delay)
        .with_max_delay(longest_delay)
        .with_max_times(retry_budget)
        .with_jitter()
}
/// Runs `operation` synchronously, retrying it according to the HTTP settings.
///
/// A failed attempt is retried only when `is_retryable_tame_error` accepts
/// the error and the retry budget from `build_registry_backoff` is not yet
/// used up. Each scheduled retry is logged at debug level together with
/// `crate_name`, the delay and the error.
///
/// # Errors
///
/// Returns the error of the last attempt: immediately for a non-retryable
/// error, or after the retry budget has been exhausted.
fn run_with_retry<T, F>(
    http: &HttpConfig,
    crate_name: &str,
    operation: F,
) -> std::result::Result<T, TameIndexError>
where
    F: FnMut() -> std::result::Result<T, TameIndexError>,
{
    let backoff = build_registry_backoff(http);

    let log_retry = |err: &TameIndexError, dur: std::time::Duration| {
        tracing::debug!(
            "Sparse index request for crate '{}' failed, retrying in {:?}: {:?}",
            crate_name,
            dur,
            err
        );
    };

    operation
        .retry(backoff)
        .when(is_retryable_tame_error)
        .notify(log_retry)
        .call()
}
/// Decides whether a failed index request is worth retrying.
///
/// Only HTTP-layer errors qualify:
/// * transport errors flagged as connect, timeout or request errors;
/// * HTTP status 429 or any 5xx status.
///
/// Every other error is treated as permanent.
fn is_retryable_tame_error(err: &TameIndexError) -> bool {
    let TameIndexError::Http(http_err) = err else {
        return false;
    };

    match http_err {
        TameHttpError::Reqwest(source) => {
            source.is_connect() || source.is_timeout() || source.is_request()
        }
        TameHttpError::StatusCode { code, .. } => {
            code.is_server_error() || code.as_u16() == 429
        }
        _ => false,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::{
        sync::atomic::{AtomicUsize, Ordering},
        time::Duration,
    };
    use tame_index::external::http::StatusCode;

    /// HTTP settings with millisecond-scale backoff so retry tests stay fast.
    fn fast_http_config(retries: usize) -> HttpConfig {
        HttpConfig {
            retries,
            backoff_base: Duration::from_millis(1),
            backoff_max: Duration::from_millis(2),
            ..Default::default()
        }
    }

    /// Builds the index error reported for an HTTP response with `code`.
    fn status_error(code: StatusCode, msg: &'static str) -> TameIndexError {
        TameIndexError::Http(TameHttpError::StatusCode { code, msg })
    }

    /// Runs an operation that always fails with `code` and returns how many
    /// times it was attempted.
    fn attempts_when_always_failing(retries: usize, code: StatusCode, msg: &'static str) -> usize {
        let attempts = AtomicUsize::new(0);
        let result = run_with_retry(&fast_http_config(retries), "serde", || {
            attempts.fetch_add(1, Ordering::SeqCst);
            Err::<(), _>(status_error(code, msg))
        });
        assert!(result.is_err());
        attempts.load(Ordering::SeqCst)
    }

    #[test]
    fn test_retry_classifier_for_status_codes() {
        let rate_limited = status_error(StatusCode::TOO_MANY_REQUESTS, "rate limited");
        assert!(is_retryable_tame_error(&rate_limited));

        let server_error = status_error(StatusCode::SERVICE_UNAVAILABLE, "service unavailable");
        assert!(is_retryable_tame_error(&server_error));

        let unauthorized = status_error(StatusCode::UNAUTHORIZED, "unauthorized");
        assert!(!is_retryable_tame_error(&unauthorized));
    }

    #[test]
    fn test_retryable_error_retried_then_succeeds() {
        let attempts = AtomicUsize::new(0);
        let result = run_with_retry(&fast_http_config(2), "serde", || {
            // Fail only the very first attempt.
            match attempts.fetch_add(1, Ordering::SeqCst) {
                0 => Err(status_error(StatusCode::TOO_MANY_REQUESTS, "rate limited")),
                _ => Ok("ok"),
            }
        });
        assert_eq!(result.unwrap(), "ok");
        assert_eq!(attempts.load(Ordering::SeqCst), 2);
    }

    #[test]
    fn test_non_retryable_error_not_retried() {
        let attempts = attempts_when_always_failing(3, StatusCode::UNAUTHORIZED, "unauthorized");
        assert_eq!(attempts, 1);
    }

    #[test]
    fn test_retryable_error_exhausts_retry_budget() {
        // Two retries on top of the initial attempt.
        let attempts =
            attempts_when_always_failing(2, StatusCode::SERVICE_UNAVAILABLE, "service unavailable");
        assert_eq!(attempts, 3);
    }

    #[test]
    fn test_zero_retries_means_single_attempt() {
        let attempts =
            attempts_when_always_failing(0, StatusCode::SERVICE_UNAVAILABLE, "service unavailable");
        assert_eq!(attempts, 1);
    }
}