use std::borrow::Cow;
use std::fmt::{Display, Formatter};
use std::ops::Deref;
use std::path::Path;
use std::str::FromStr;
use std::sync::{Arc, LazyLock, RwLock};
use itertools::Either;
use rustc_hash::{FxHashMap, FxHashSet};
use thiserror::Error;
use url::{ParseError, Url};
use uv_auth::RealmRef;
use uv_cache_key::CanonicalUrl;
use uv_pep508::{Scheme, VerbatimUrl, VerbatimUrlError, split_scheme};
use uv_redacted::DisplaySafeUrl;
use uv_warnings::warn_user;
use crate::{Index, IndexStatusCodeStrategy, Verbatim};
/// The PyPI Simple API URL (`https://pypi.org/simple`), parsed once on first use.
static PYPI_URL: LazyLock<DisplaySafeUrl> =
LazyLock::new(|| DisplaySafeUrl::parse("https://pypi.org/simple").unwrap());
/// The fallback index, built from [`PYPI_URL`], that is used when no configured index is marked
/// as the default.
static DEFAULT_INDEX: LazyLock<Index> = LazyLock::new(|| {
Index::from_index_url(IndexUrl::Pypi(Arc::new(VerbatimUrl::from_url(
PYPI_URL.clone(),
))))
});
/// The URL of an index to use for fetching packages.
///
/// Every variant wraps the same kind of shared [`VerbatimUrl`]; the variant only records how the
/// URL was classified by the `From<VerbatimUrl>` conversion.
#[derive(Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub enum IndexUrl {
/// A URL whose raw form equals `https://pypi.org/simple`.
Pypi(Arc<VerbatimUrl>),
/// Any other URL that does not use the `file` scheme.
Url(Arc<VerbatimUrl>),
/// A URL that uses the `file` scheme.
Path(Arc<VerbatimUrl>),
}
impl IndexUrl {
pub fn parse(path: &str, root_dir: Option<&Path>) -> Result<Self, IndexUrlError> {
let url = VerbatimUrl::from_url_or_path(path, root_dir)?;
Ok(Self::from(url))
}
pub fn root(&self) -> Option<DisplaySafeUrl> {
let mut segments = self.url().path_segments()?;
let last = match segments.next_back()? {
"" => segments.next_back()?,
segment => segment,
};
if !(last.eq_ignore_ascii_case("simple") || last.eq_ignore_ascii_case("+simple")) {
return None;
}
let mut url = self.url().clone();
url.path_segments_mut().ok()?.pop_if_empty().pop();
Some(url)
}
}
/// JSON schema support for [`IndexUrl`], compiled only when the `schemars` feature is enabled.
#[cfg(feature = "schemars")]
impl schemars::JsonSchema for IndexUrl {
/// The name under which this type appears in generated schemas.
fn schema_name() -> Cow<'static, str> {
Cow::Borrowed("IndexUrl")
}
/// Describe an [`IndexUrl`] as a plain JSON string; the generator argument is unused.
fn json_schema(_generator: &mut schemars::generate::SchemaGenerator) -> schemars::Schema {
schemars::json_schema!({
"type": "string",
"description": "The URL of an index to use for fetching packages (e.g., `https://pypi.org/simple`), or a local path."
})
}
}
impl IndexUrl {
    /// Borrow the [`VerbatimUrl`] that every variant wraps.
    #[inline]
    fn inner(&self) -> &VerbatimUrl {
        // All variants carry the same payload, so a single irrefutable or-pattern suffices.
        let (Self::Pypi(url) | Self::Url(url) | Self::Path(url)) = self;
        url
    }

    /// Return the raw URL for the index.
    pub fn url(&self) -> &DisplaySafeUrl {
        let verbatim = self.inner();
        verbatim.raw()
    }

    /// Convert the index URL into an owned [`DisplaySafeUrl`].
    pub fn into_url(self) -> DisplaySafeUrl {
        let verbatim = self.inner();
        verbatim.to_url()
    }

    /// Return the index URL with any username and password removed.
    ///
    /// The URL is borrowed when it carries no credentials, and cloned only when something has to
    /// be stripped.
    pub fn without_credentials(&self) -> Cow<'_, DisplaySafeUrl> {
        let raw = self.url();
        let has_credentials = !raw.username().is_empty() || raw.password().is_some();
        if !has_credentials {
            return Cow::Borrowed(raw);
        }

        let mut stripped = raw.clone();
        // Failures to clear either component are deliberately ignored.
        let _ = stripped.set_username("");
        let _ = stripped.set_password(None);
        Cow::Owned(stripped)
    }

    /// Warn the user if this index was given as an ambiguous relative path.
    ///
    /// Only `Path` indexes that retain the originally given string are inspected; nothing is
    /// emitted when that string is already unambiguous per `is_disambiguated_path`.
    pub fn warn_on_disambiguated_relative_path(&self) {
        let Self::Path(verbatim_url) = self else {
            return;
        };
        // NOTE: the binding must stay named `path`, since the messages below capture it inline.
        let Some(path) = verbatim_url.given() else {
            return;
        };
        if is_disambiguated_path(path) {
            return;
        }

        if cfg!(windows) {
            warn_user!(
                "Relative paths passed to `--index` or `--default-index` should be disambiguated from index names (use `.\\{path}` or `./{path}`). Support for ambiguous values will be removed in the future"
            );
        } else {
            warn_user!(
                "Relative paths passed to `--index` or `--default-index` should be disambiguated from index names (use `./{path}`). Support for ambiguous values will be removed in the future"
            );
        }
    }
}
/// Formats an [`IndexUrl`] exactly as its wrapped [`VerbatimUrl`] is formatted.
impl Display for IndexUrl {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
// Forward to the inner URL, passing the formatter through unchanged.
Display::fmt(self.inner(), f)
}
}
/// The verbatim representation of an [`IndexUrl`] is that of its wrapped [`VerbatimUrl`].
impl Verbatim for IndexUrl {
fn verbatim(&self) -> Cow<'_, str> {
self.inner().verbatim()
}
}
/// Check whether `path` is unambiguously a path or URL rather than a bare name.
///
/// A value counts as disambiguated when it:
/// - starts with `./` or `../` (on Windows also `.\`, `..\`, or `/`),
/// - is an absolute path according to [`Path::is_absolute`], or
/// - has a scheme prefix that [`Scheme::parse`] recognizes.
///
/// A value with a scheme-like prefix that is not recognized is reported as ambiguous.
fn is_disambiguated_path(path: &str) -> bool {
    let has_windows_prefix = cfg!(windows)
        && (path.starts_with(".\\") || path.starts_with("..\\") || path.starts_with('/'));
    if has_windows_prefix {
        return true;
    }

    let has_relative_prefix = ["./", "../"]
        .iter()
        .any(|prefix| path.starts_with(*prefix));
    if has_relative_prefix || Path::new(path).is_absolute() {
        return true;
    }

    // Whatever is left is disambiguated only if it carries a known scheme (e.g. `file://`).
    match split_scheme(path) {
        Some((scheme, _)) => Scheme::parse(scheme).is_some(),
        None => false,
    }
}
/// Errors that can occur while constructing an [`IndexUrl`].
///
/// Every variant is `transparent`: it displays as, and exposes the source of, the wrapped error.
#[derive(Error, Debug)]
pub enum IndexUrlError {
/// An I/O error.
#[error(transparent)]
Io(#[from] std::io::Error),
/// A URL parse error from the `url` crate.
#[error(transparent)]
Url(#[from] ParseError),
/// An error raised while building the underlying [`VerbatimUrl`].
#[error(transparent)]
VerbatimUrl(#[from] VerbatimUrlError),
}
/// Parses an [`IndexUrl`] from a string with no root directory supplied.
impl FromStr for IndexUrl {
type Err = IndexUrlError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
// Equivalent to `IndexUrl::parse` with `root_dir` set to `None`.
Self::parse(s, None)
}
}
/// Serializes an [`IndexUrl`] via the credential-free form of its wrapped [`VerbatimUrl`].
impl serde::ser::Serialize for IndexUrl {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
// Go through `without_credentials` on the inner URL rather than serializing it directly.
self.inner().without_credentials().serialize(serializer)
}
}
/// Deserializes an [`IndexUrl`] from a string using its [`FromStr`] implementation.
impl<'de> serde::de::Deserialize<'de> for IndexUrl {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::de::Deserializer<'de>,
    {
        /// Visitor that accepts a string and parses it as an [`IndexUrl`].
        struct IndexUrlVisitor;

        impl serde::de::Visitor<'_> for IndexUrlVisitor {
            type Value = IndexUrl;

            fn expecting(&self, f: &mut Formatter) -> std::fmt::Result {
                f.write_str("a string")
            }

            fn visit_str<E: serde::de::Error>(self, v: &str) -> Result<Self::Value, E> {
                // Parse failures are reported as custom deserialization errors.
                v.parse::<IndexUrl>().map_err(E::custom)
            }
        }

        deserializer.deserialize_str(IndexUrlVisitor)
    }
}
/// Classifies a [`VerbatimUrl`] into the matching [`IndexUrl`] variant.
impl From<VerbatimUrl> for IndexUrl {
    fn from(url: VerbatimUrl) -> Self {
        // `file` URLs are always local paths, whatever they point to.
        if url.scheme() == "file" {
            return Self::Path(Arc::new(url));
        }

        // Otherwise, the PyPI URL is told apart from every other remote URL.
        let is_pypi = *url.raw() == *PYPI_URL;
        let shared = Arc::new(url);
        if is_pypi {
            Self::Pypi(shared)
        } else {
            Self::Url(shared)
        }
    }
}
/// Converts an [`IndexUrl`] into an owned [`DisplaySafeUrl`].
impl From<IndexUrl> for DisplaySafeUrl {
    fn from(index: IndexUrl) -> Self {
        // Same conversion as `IndexUrl::into_url`.
        index.into_url()
    }
}
/// Lets an [`IndexUrl`] be used wherever a `&Url` is expected.
impl Deref for IndexUrl {
type Target = Url;
fn deref(&self) -> &Self::Target {
// The `&VerbatimUrl` returned by `inner` is coerced to the `&Url` target type here.
self.inner()
}
}
/// The index locations to use for fetching packages.
///
/// Serialized with kebab-case field names; unknown fields are rejected on deserialization.
#[derive(Default, Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case", deny_unknown_fields)]
pub struct IndexLocations {
// The configured indexes, in the order they were supplied.
indexes: Vec<Index>,
// The flat indexes, exposed through `flat_indexes`.
flat_index: Vec<Index>,
// When set, `default_index`, `implicit_indexes` and `simple_indexes` yield nothing.
no_index: bool,
}
impl IndexLocations {
    /// Create an [`IndexLocations`] from its three components.
    pub fn new(indexes: Vec<Index>, flat_index: Vec<Index>, no_index: bool) -> Self {
        Self {
            no_index,
            flat_index,
            indexes,
        }
    }

    /// Combine this set of locations with additional ones.
    ///
    /// The given `indexes` and `flat_index` entries are appended after the existing entries, and
    /// the result has `no_index` set if either side had it set.
    #[must_use]
    pub fn combine(self, indexes: Vec<Index>, flat_index: Vec<Index>, no_index: bool) -> Self {
        let Self {
            indexes: mut merged_indexes,
            flat_index: mut merged_flat_index,
            no_index: already_disabled,
        } = self;
        merged_indexes.extend(indexes);
        merged_flat_index.extend(flat_index);
        Self {
            indexes: merged_indexes,
            flat_index: merged_flat_index,
            no_index: already_disabled || no_index,
        }
    }

    /// Returns `true` if this value equals the default configuration, i.e. there are no indexes,
    /// no flat indexes, and `no_index` is unset.
    pub fn is_none(&self) -> bool {
        // Destructure exhaustively so a newly added field has to be considered here as well.
        let Self {
            indexes,
            flat_index,
            no_index,
        } = self;
        indexes.is_empty() && flat_index.is_empty() && !*no_index
    }
}
/// Returns `true` if two [`IndexUrl`]s refer to the same index.
///
/// The URLs must agree both on their realm and on their canonical form.
fn is_same_index(a: &IndexUrl, b: &IndexUrl) -> bool {
    let (first, second) = (a.url(), b.url());

    // Realm comparison comes first; canonicalization only runs when the realms agree.
    if RealmRef::from(&**second) != RealmRef::from(&**first) {
        return false;
    }
    CanonicalUrl::new(first) == CanonicalUrl::new(second)
}
impl<'a> IndexLocations {
    /// Return the default [`Index`].
    ///
    /// Returns `None` when `no_index` is set. Otherwise this is the first index flagged as
    /// `default` (ignoring entries whose name repeats an earlier one), or the built-in PyPI
    /// index if no such entry exists.
    pub fn default_index(&'a self) -> Option<&'a Index> {
        if self.no_index {
            return None;
        }

        let mut seen_names = FxHashSet::default();
        let configured_default = self
            .indexes
            .iter()
            .filter(move |index| match &index.name {
                Some(name) => seen_names.insert(name),
                None => true,
            })
            .find(|index| index.default);
        Some(configured_default.unwrap_or_else(|| &*DEFAULT_INDEX))
    }

    /// Return an iterator over the implicit indexes: those that are neither `default` nor
    /// `explicit`, after dropping entries whose name repeats an earlier one.
    ///
    /// Yields nothing when `no_index` is set.
    pub fn implicit_indexes(&'a self) -> impl Iterator<Item = &'a Index> + 'a {
        if self.no_index {
            return Either::Left(std::iter::empty());
        }

        let mut seen_names = FxHashSet::default();
        Either::Right(
            self.indexes
                .iter()
                .filter(move |index| match &index.name {
                    Some(name) => seen_names.insert(name),
                    None => true,
                })
                .filter(|index| !(index.default || index.explicit)),
        )
    }

    /// Return an iterator over all non-explicit indexes: the implicit indexes followed by the
    /// default index (if any).
    pub fn indexes(&'a self) -> impl Iterator<Item = &'a Index> + 'a {
        let implicit = self.implicit_indexes();
        let default = self.default_index();
        // The default index can itself be explicit, so the filter covers the whole chain.
        implicit.chain(default).filter(|index| !index.explicit)
    }

    /// Return an iterator over every configured index, dropping entries whose name repeats an
    /// earlier one.
    ///
    /// Yields nothing when `no_index` is set.
    pub fn simple_indexes(&'a self) -> impl Iterator<Item = &'a Index> + 'a {
        if self.no_index {
            return Either::Left(std::iter::empty());
        }

        let mut seen_names = FxHashSet::default();
        Either::Right(self.indexes.iter().filter(move |index| match &index.name {
            Some(name) => seen_names.insert(name),
            None => true,
        }))
    }

    /// Return an iterator over the flat indexes.
    pub fn flat_indexes(&'a self) -> impl Iterator<Item = &'a Index> + 'a {
        let flat = &self.flat_index;
        flat.iter()
    }

    /// Return the `no_index` flag.
    pub fn no_index(&self) -> bool {
        self.no_index
    }

    /// Clone the configured indexes and the `no_index` flag into an [`IndexUrls`].
    pub fn index_urls(&'a self) -> IndexUrls {
        let Self {
            indexes, no_index, ..
        } = self;
        IndexUrls {
            indexes: indexes.clone(),
            no_index: *no_index,
        }
    }

    /// Return the indexes that are allowed to be used, in reverse order of definition.
    ///
    /// With `no_index` set, only the flat indexes are returned. Otherwise the configured indexes
    /// and flat indexes are merged, entries whose name repeats an earlier one are dropped, only
    /// the first `default` entry is kept, and the built-in PyPI index is appended when no
    /// `default` entry was seen. The collected list is then reversed.
    pub fn allowed_indexes(&'a self) -> Vec<&'a Index> {
        if self.no_index {
            return self.flat_index.iter().rev().collect();
        }

        let mut seen_names = FxHashSet::default();
        let mut has_default = false;
        let mut allowed: Vec<&'a Index> = self
            .indexes
            .iter()
            .chain(&self.flat_index)
            .filter(|index| match &index.name {
                Some(name) => seen_names.insert(name),
                None => true,
            })
            .filter(|index| {
                if !index.default {
                    return true;
                }
                // Only the first default entry survives.
                let is_first_default = !has_default;
                has_default = true;
                is_first_default
            })
            .collect();

        if !has_default {
            allowed.push(&*DEFAULT_INDEX);
        }
        allowed.reverse();
        allowed
    }

    /// Return an iterator over every known index, without any de-duplication.
    ///
    /// With `no_index` set, this is just the flat indexes in reverse. Otherwise it is the
    /// built-in PyPI index, then the flat indexes in reverse, then the configured indexes in
    /// reverse.
    pub fn known_indexes(&'a self) -> impl Iterator<Item = &'a Index> {
        let flat_reversed = self.flat_index.iter().rev();
        if self.no_index {
            return Either::Left(flat_reversed);
        }

        let configured_reversed = self.indexes.iter().rev();
        Either::Right(
            std::iter::once(&*DEFAULT_INDEX)
                .chain(flat_reversed)
                .chain(configured_reversed),
        )
    }

    /// Return the Simple API cache-control value of the first configured index that matches
    /// `url`, or `None` if no index matches or the match has no such value.
    pub fn simple_api_cache_control_for(&self, url: &IndexUrl) -> Option<&str> {
        self.indexes
            .iter()
            .find(|index| is_same_index(index.url(), url))
            .and_then(|index| index.simple_api_cache_control())
    }

    /// Return the artifact cache-control value of the first configured index that matches
    /// `url`, or `None` if no index matches or the match has no such value.
    pub fn artifact_cache_control_for(&self, url: &IndexUrl) -> Option<&str> {
        self.indexes
            .iter()
            .find(|index| is_same_index(index.url(), url))
            .and_then(|index| index.artifact_cache_control())
    }
}
impl From<&IndexLocations> for uv_auth::Indexes {
fn from(index_locations: &IndexLocations) -> Self {
Self::from_indexes(index_locations.allowed_indexes().into_iter().map(|index| {
let mut url = index.url().url().clone();
url.set_username("").ok();
url.set_password(None).ok();
let mut root_url = index.url().root().unwrap_or_else(|| url.clone());
root_url.set_username("").ok();
root_url.set_password(None).ok();
uv_auth::Index {
url,
root_url,
auth_policy: index.authenticate,
}
}))
}
}
/// The index URLs to use for fetching packages.
///
/// Produced by `IndexLocations::index_urls` or directly via `IndexUrls::from_indexes`.
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct IndexUrls {
// The configured indexes, in the order they were supplied.
indexes: Vec<Index>,
// When set, `default_index`, `implicit_indexes` and `defined_indexes` yield nothing.
no_index: bool,
}
impl<'a> IndexUrls {
    /// Create an [`IndexUrls`] from a list of indexes, with `no_index` unset.
    pub fn from_indexes(indexes: Vec<Index>) -> Self {
        let no_index = false;
        Self { indexes, no_index }
    }

    /// Return the default [`Index`].
    ///
    /// Returns `None` when `no_index` is set. Otherwise this is the first index flagged as
    /// `default` (ignoring entries whose name repeats an earlier one), or the built-in PyPI
    /// index if no such entry exists.
    fn default_index(&'a self) -> Option<&'a Index> {
        if self.no_index {
            return None;
        }

        let mut seen_names = FxHashSet::default();
        let configured_default = self
            .indexes
            .iter()
            .filter(move |index| match &index.name {
                Some(name) => seen_names.insert(name),
                None => true,
            })
            .find(|index| index.default);
        Some(configured_default.unwrap_or_else(|| &*DEFAULT_INDEX))
    }

    /// Return an iterator over the implicit indexes: those that are neither `default` nor
    /// `explicit`, after dropping entries whose name repeats an earlier one.
    ///
    /// Yields nothing when `no_index` is set.
    fn implicit_indexes(&'a self) -> impl Iterator<Item = &'a Index> + 'a {
        if self.no_index {
            return Either::Left(std::iter::empty());
        }

        let mut seen_names = FxHashSet::default();
        Either::Right(
            self.indexes
                .iter()
                .filter(move |index| match &index.name {
                    Some(name) => seen_names.insert(name),
                    None => true,
                })
                .filter(|index| !(index.default || index.explicit)),
        )
    }

    /// Return an iterator over all non-explicit indexes: the implicit indexes followed by the
    /// default index (if any), skipping any entry whose raw URL was already yielded.
    pub fn indexes(&'a self) -> impl Iterator<Item = &'a Index> + 'a {
        let mut seen_urls = FxHashSet::default();
        let implicit = self.implicit_indexes();
        let default = self.default_index();
        implicit
            .chain(default)
            // Explicit entries are rejected before their URL is recorded as seen.
            .filter(move |index| !index.explicit && seen_urls.insert(index.raw_url()))
    }

    /// Return an iterator over all user-defined indexes, with non-default entries first and
    /// default entries last; each group keeps its definition order.
    ///
    /// Entries whose name repeats an earlier one are dropped. Unlike `indexes`, explicit entries
    /// are included and the built-in PyPI index is never added. Yields nothing when `no_index`
    /// is set.
    pub fn defined_indexes(&'a self) -> impl Iterator<Item = &'a Index> + 'a {
        if self.no_index {
            return Either::Left(std::iter::empty());
        }

        let mut seen_names = FxHashSet::default();
        let mut non_default = Vec::new();
        let mut default = Vec::new();
        for index in &self.indexes {
            let is_repeated_name = index
                .name
                .as_ref()
                .is_some_and(|name| !seen_names.insert(name));
            if is_repeated_name {
                continue;
            }
            if index.default {
                default.push(index);
            } else {
                non_default.push(index);
            }
        }

        Either::Right(non_default.into_iter().chain(default))
    }

    /// Return the `no_index` flag.
    pub fn no_index(&self) -> bool {
        self.no_index
    }

    /// Return the status code strategy of the first configured index that matches `url`, or
    /// [`IndexStatusCodeStrategy::Default`] if no index matches.
    pub fn status_code_strategy_for(&self, url: &IndexUrl) -> IndexStatusCodeStrategy {
        self.indexes
            .iter()
            .find(|index| is_same_index(index.url(), url))
            .map_or(IndexStatusCodeStrategy::Default, |index| {
                index.status_code_strategy()
            })
    }

    /// Return the Simple API cache-control value of the first configured index that matches
    /// `url`, or `None` if no index matches or the match has no such value.
    pub fn simple_api_cache_control_for(&self, url: &IndexUrl) -> Option<&str> {
        self.indexes
            .iter()
            .find(|index| is_same_index(index.url(), url))
            .and_then(|index| index.simple_api_cache_control())
    }

    /// Return the artifact cache-control value of the first configured index that matches
    /// `url`, or `None` if no index matches or the match has no such value.
    pub fn artifact_cache_control_for(&self, url: &IndexUrl) -> Option<&str> {
        self.indexes
            .iter()
            .find(|index| is_same_index(index.url(), url))
            .and_then(|index| index.artifact_cache_control())
    }
}
// Per-index capability bits stored in `IndexCapabilities`. Note that the declared bit values
// are not in ascending order: `UNAUTHORIZED` is bit 2 and `FORBIDDEN` is bit 1.
bitflags::bitflags! {
#[derive(Debug, Copy, Clone)]
struct Flags: u8 {
/// Set by `IndexCapabilities::set_no_range_requests`; while set, `supports_range_requests`
/// reports `false` for the index.
const NO_RANGE_REQUESTS = 1;
/// Set by `IndexCapabilities::set_unauthorized`.
const UNAUTHORIZED = 1 << 2;
/// Set by `IndexCapabilities::set_forbidden`.
const FORBIDDEN = 1 << 1;
}
}
/// A map from [`IndexUrl`] to the capability flags recorded for that index.
///
/// The map lives behind an `Arc<RwLock<..>>`, so clones of this value share the same underlying
/// state.
#[derive(Debug, Default, Clone)]
pub struct IndexCapabilities(Arc<RwLock<FxHashMap<IndexUrl, Flags>>>);
impl IndexCapabilities {
pub fn supports_range_requests(&self, index_url: &IndexUrl) -> bool {
!self
.0
.read()
.unwrap()
.get(index_url)
.is_some_and(|flags| flags.intersects(Flags::NO_RANGE_REQUESTS))
}
pub fn set_no_range_requests(&self, index_url: IndexUrl) {
self.0
.write()
.unwrap()
.entry(index_url)
.or_insert(Flags::empty())
.insert(Flags::NO_RANGE_REQUESTS);
}
pub fn unauthorized(&self, index_url: &IndexUrl) -> bool {
self.0
.read()
.unwrap()
.get(index_url)
.is_some_and(|flags| flags.intersects(Flags::UNAUTHORIZED))
}
pub fn set_unauthorized(&self, index_url: IndexUrl) {
self.0
.write()
.unwrap()
.entry(index_url)
.or_insert(Flags::empty())
.insert(Flags::UNAUTHORIZED);
}
pub fn forbidden(&self, index_url: &IndexUrl) -> bool {
self.0
.read()
.unwrap()
.get(index_url)
.is_some_and(|flags| flags.intersects(Flags::FORBIDDEN))
}
pub fn set_forbidden(&self, index_url: IndexUrl) {
self.0
.write()
.unwrap()
.entry(index_url)
.or_insert(Flags::empty())
.insert(Flags::FORBIDDEN);
}
}
// Unit tests for path disambiguation and for per-index cache-control lookups.
#[cfg(test)]
mod tests {
use super::*;
use crate::{IndexCacheControl, IndexFormat, IndexName};
use uv_small_str::SmallString;
// Absolute paths and explicitly relative paths (`./`, `../`) are disambiguated; the
// drive-letter and backslash forms are only checked on Windows.
#[test]
fn test_index_url_parse_valid_paths() {
assert!(is_disambiguated_path("/absolute/path"));
assert!(is_disambiguated_path("./relative/path"));
assert!(is_disambiguated_path("../../relative/path"));
if cfg!(windows) {
assert!(is_disambiguated_path("C:/absolute/path"));
assert!(is_disambiguated_path(".\\relative\\path"));
assert!(is_disambiguated_path("..\\..\\relative\\path"));
}
}
// Bare names and relative paths without a leading `./` are ambiguous.
#[test]
fn test_index_url_parse_ambiguous_paths() {
assert!(!is_disambiguated_path("index"));
assert!(!is_disambiguated_path("relative/path"));
}
// Values with a recognized scheme are disambiguated.
#[test]
fn test_index_url_parse_with_schemes() {
assert!(is_disambiguated_path("file:///absolute/path"));
assert!(is_disambiguated_path("https://registry.com/simple/"));
assert!(is_disambiguated_path(
"git+https://github.com/example/repo.git"
));
}
// Cache-control lookups return the configured values for a matching index, and `None` both
// for an index without a cache-control setting and for a URL that matches no index.
#[test]
fn test_cache_control_lookup() {
use std::str::FromStr;
use uv_small_str::SmallString;
use crate::IndexFormat;
use crate::index_name::IndexName;
let indexes = vec![
// An index with explicit API and file cache-control values.
Index {
name: Some(IndexName::from_str("index1").unwrap()),
url: IndexUrl::from_str("https://index1.example.com/simple").unwrap(),
cache_control: Some(crate::IndexCacheControl {
api: Some(SmallString::from("max-age=300")),
files: Some(SmallString::from("max-age=1800")),
}),
explicit: false,
default: false,
origin: None,
format: IndexFormat::Simple,
publish_url: None,
authenticate: uv_auth::AuthPolicy::default(),
ignore_error_codes: None,
},
// An index with no cache-control setting at all.
Index {
name: Some(IndexName::from_str("index2").unwrap()),
url: IndexUrl::from_str("https://index2.example.com/simple").unwrap(),
cache_control: None,
explicit: false,
default: false,
origin: None,
format: IndexFormat::Simple,
publish_url: None,
authenticate: uv_auth::AuthPolicy::default(),
ignore_error_codes: None,
},
];
let index_urls = IndexUrls::from_indexes(indexes);
// Matching index with configured values.
let url1 = IndexUrl::from_str("https://index1.example.com/simple").unwrap();
assert_eq!(
index_urls.simple_api_cache_control_for(&url1),
Some("max-age=300")
);
assert_eq!(
index_urls.artifact_cache_control_for(&url1),
Some("max-age=1800")
);
// Matching index without configured values.
let url2 = IndexUrl::from_str("https://index2.example.com/simple").unwrap();
assert_eq!(index_urls.simple_api_cache_control_for(&url2), None);
assert_eq!(index_urls.artifact_cache_control_for(&url2), None);
// URL that matches no configured index.
let url3 = IndexUrl::from_str("https://index3.example.com/simple").unwrap();
assert_eq!(index_urls.simple_api_cache_control_for(&url3), None);
assert_eq!(index_urls.artifact_cache_control_for(&url3), None);
}
// An index at `download.pytorch.org` with no cache-control setting still reports an artifact
// cache-control value, while its Simple API cache-control stays `None`. The same holds through
// both `IndexUrls` and `IndexLocations`.
#[test]
fn test_pytorch_default_cache_control() {
let indexes = vec![Index {
name: Some(IndexName::from_str("pytorch").unwrap()),
url: IndexUrl::from_str("https://download.pytorch.org/whl/cu118").unwrap(),
cache_control: None, explicit: false,
default: false,
origin: None,
format: IndexFormat::Simple,
publish_url: None,
authenticate: uv_auth::AuthPolicy::default(),
ignore_error_codes: None,
}];
let index_urls = IndexUrls::from_indexes(indexes.clone());
let index_locations = IndexLocations::new(indexes, Vec::new(), false);
let pytorch_url = IndexUrl::from_str("https://download.pytorch.org/whl/cu118").unwrap();
assert_eq!(index_urls.simple_api_cache_control_for(&pytorch_url), None);
assert_eq!(
index_urls.artifact_cache_control_for(&pytorch_url),
Some("max-age=365000000, immutable, public")
);
assert_eq!(
index_locations.simple_api_cache_control_for(&pytorch_url),
None
);
assert_eq!(
index_locations.artifact_cache_control_for(&pytorch_url),
Some("max-age=365000000, immutable, public")
);
}
// Explicit cache-control values on a `download.pytorch.org` index are returned as configured,
// through both `IndexUrls` and `IndexLocations`.
#[test]
fn test_pytorch_user_override_cache_control() {
let indexes = vec![Index {
name: Some(IndexName::from_str("pytorch").unwrap()),
url: IndexUrl::from_str("https://download.pytorch.org/whl/cu118").unwrap(),
cache_control: Some(IndexCacheControl {
api: Some(SmallString::from("no-cache")),
files: Some(SmallString::from("max-age=3600")),
}),
explicit: false,
default: false,
origin: None,
format: IndexFormat::Simple,
publish_url: None,
authenticate: uv_auth::AuthPolicy::default(),
ignore_error_codes: None,
}];
let index_urls = IndexUrls::from_indexes(indexes.clone());
let index_locations = IndexLocations::new(indexes, Vec::new(), false);
let pytorch_url = IndexUrl::from_str("https://download.pytorch.org/whl/cu118").unwrap();
assert_eq!(
index_urls.simple_api_cache_control_for(&pytorch_url),
Some("no-cache")
);
assert_eq!(
index_urls.artifact_cache_control_for(&pytorch_url),
Some("max-age=3600")
);
assert_eq!(
index_locations.simple_api_cache_control_for(&pytorch_url),
Some("no-cache")
);
assert_eq!(
index_locations.artifact_cache_control_for(&pytorch_url),
Some("max-age=3600")
);
}
}