use {
crate::{
filesystem_scanning::PythonResourceIterator, module_util::PythonModuleSuffixes,
package_metadata::PythonPackageMetadata, resource::PythonResource,
},
anyhow::{anyhow, Context, Result},
once_cell::sync::Lazy,
simple_file_manifest::{File, FileEntry, FileManifest},
std::{borrow::Cow, io::Read, path::Path},
zip::ZipArchive,
};
/// Lazily compiled regular expression that splits a wheel file name into its parts.
///
/// Named groups: `namever` (spanning `name` and `ver`), an optional `build`
/// tag that must start with a digit, `pyver`, `abi` and `plat`. The file name
/// must end in `.whl`.
static RE_WHEEL_INFO: Lazy<regex::Regex> = Lazy::new(|| {
    let pattern = r"^(?P<namever>(?P<name>.+?)-(?P<ver>.+?))(-(?P<build>\d[^-]*))?-(?P<pyver>.+?)-(?P<abi>.+?)-(?P<plat>.+?)\.whl$";

    // The pattern is a fixed literal, so a compile failure is a programming error.
    regex::Regex::new(pattern).unwrap()
});
/// Owner-execute permission bit of a Unix file mode (octal `0o100`, decimal 64).
const S_IXUSR: u32 = 0o100;
/// A Python wheel archive whose file entries have been read into memory.
pub struct WheelArchive {
/// Non-directory entries of the archive, addressed by their path inside it.
files: FileManifest,
/// The `namever` portion captured from the wheel file name; used to derive
/// the `.dist-info` and `.data` directory names.
name_version: String,
}
impl WheelArchive {
/// Construct an instance by reading every file entry of a wheel (zip) archive.
///
/// `basename` is the wheel's file name. It is matched against
/// [`RE_WHEEL_INFO`] to obtain the name-version prefix from which the
/// `.dist-info` and `.data` directory names are later derived.
///
/// Directory entries are skipped. Each remaining entry is read fully into
/// memory and recorded with a flag that is set when the entry's Unix mode
/// has the owner-execute bit.
///
/// # Errors
///
/// Fails if `basename` does not look like a wheel file name, if the zip
/// archive cannot be opened or an entry cannot be read, or if an entry
/// cannot be added to the file manifest.
pub fn from_reader<R>(reader: R, basename: &str) -> Result<Self>
where
    R: std::io::Read + std::io::Seek,
{
    /// Upper bound, in bytes, on the capacity reserved up front for one entry.
    const MAX_PREALLOCATION: u64 = 16 * 1024 * 1024;

    let captures = RE_WHEEL_INFO
        .captures(basename)
        .ok_or_else(|| anyhow!("failed to parse wheel basename: {}", basename))?;

    let name_version = captures
        .name("namever")
        .ok_or_else(|| anyhow!("could not find name-version in wheel name"))?
        .as_str()
        .to_string();

    let mut archive = ZipArchive::new(reader)?;
    let mut files = FileManifest::default();

    for i in 0..archive.len() {
        let mut file = archive.by_index(i)?;

        if file.is_dir() {
            continue;
        }

        // The declared size comes from the archive's own headers and must not
        // be trusted: a crafted wheel could otherwise request an enormous
        // allocation. Treat it only as a bounded hint; `read_to_end` grows the
        // buffer as needed. The cast is lossless because of the cap.
        let capacity = file.size().min(MAX_PREALLOCATION) as usize;
        let mut buffer = Vec::with_capacity(capacity);
        file.read_to_end(&mut buffer)?;

        let executable = file.unix_mode().unwrap_or(0) & S_IXUSR != 0;

        files.add_file_entry(
            Path::new(file.name()),
            FileEntry::new_from_data(buffer, executable),
        )?;
    }

    Ok(Self {
        files,
        name_version,
    })
}
/// Construct an instance from a wheel file on the filesystem.
///
/// The file is opened first; the final component of `path` is then used
/// (lossily converted to UTF-8) as the wheel file name handed to
/// [`Self::from_reader`].
///
/// # Errors
///
/// Fails if the file cannot be opened, if `path` has no final component,
/// or if [`Self::from_reader`] fails.
pub fn from_path(path: &Path) -> Result<Self> {
    let handle = std::fs::File::open(path)
        .with_context(|| format!("opening {} for wheel reading", path.display()))?;

    let basename = match path.file_name() {
        Some(name) => name.to_string_lossy(),
        None => return Err(anyhow!("could not derive file name")),
    };

    Self::from_reader(std::io::BufReader::new(handle), &basename)
}
/// Name of the archive's `.dist-info` directory (name-version plus `.dist-info`).
fn dist_info_path(&self) -> String {
    [self.name_version.as_str(), ".dist-info"].concat()
}
/// Name of the archive's `.data` directory (name-version plus `.data`).
fn data_path(&self) -> String {
    [self.name_version.as_str(), ".data"].concat()
}
/// Parse the `WHEEL` file found in the archive's `.dist-info` directory.
///
/// # Errors
///
/// Fails if the file is absent from the archive, if its content cannot be
/// resolved, or if the content cannot be parsed as package metadata.
pub fn archive_metadata(&self) -> Result<PythonPackageMetadata> {
    let wheel_path = format!("{}/WHEEL", self.dist_info_path());

    match self.files.get(&wheel_path) {
        Some(entry) => {
            let content = entry.resolve_content()?;
            PythonPackageMetadata::from_metadata(&content)
        }
        None => Err(anyhow!("{} does not exist", wheel_path)),
    }
}
/// Parse the `METADATA` file found in the archive's `.dist-info` directory.
///
/// # Errors
///
/// Fails if the file is absent from the archive, if its content cannot be
/// resolved, or if the content cannot be parsed as package metadata.
pub fn metadata(&self) -> Result<PythonPackageMetadata> {
    let metadata_path = format!("{}/METADATA", self.dist_info_path());

    match self.files.get(&metadata_path) {
        Some(entry) => {
            let content = entry.resolve_content()?;
            PythonPackageMetadata::from_metadata(&content)
        }
        None => Err(anyhow!("{} does not exist", metadata_path)),
    }
}
/// Look up the first occurrence of `header` in the archive's `WHEEL` metadata.
///
/// The value is returned as an owned string.
///
/// # Errors
///
/// Fails if the `WHEEL` metadata cannot be loaded or if it has no such header.
pub fn archive_metadata_header(&self, header: &str) -> Result<Cow<str>> {
    let metadata = self.archive_metadata()?;

    match metadata.find_first_header(header) {
        Some(value) => Ok(Cow::Owned(value.to_string())),
        None => Err(anyhow!("{} not found", header)),
    }
}
/// Collect every value of `header` in the archive's `WHEEL` metadata.
///
/// Each value is returned as an owned string, in the order the metadata
/// lookup yields them.
///
/// # Errors
///
/// Fails if the `WHEEL` metadata cannot be loaded.
pub fn archive_metadata_headers(&self, header: &str) -> Result<Vec<Cow<str>>> {
    let metadata = self.archive_metadata()?;
    let found = metadata.find_all_headers(header);

    let mut values: Vec<Cow<str>> = Vec::new();
    for value in found.iter() {
        values.push(Cow::Owned(value.to_string()));
    }

    Ok(values)
}
/// Value of the first `Wheel-Version` header in the archive's `WHEEL` metadata.
///
/// # Errors
///
/// Fails if the metadata cannot be loaded or the header is missing.
pub fn wheel_version(&self) -> Result<Cow<str>> {
    const HEADER: &str = "Wheel-Version";

    self.archive_metadata_header(HEADER)
}
/// Value of the first `Generator` header in the archive's `WHEEL` metadata.
///
/// # Errors
///
/// Fails if the metadata cannot be loaded or the header is missing.
pub fn wheel_generator(&self) -> Result<Cow<str>> {
    const HEADER: &str = "Generator";

    self.archive_metadata_header(HEADER)
}
/// Whether the `Root-Is-Purelib` header of the `WHEEL` metadata is `true`.
///
/// The comparison ignores ASCII case and surrounding whitespace, so values
/// such as `True` are accepted as well; any other value yields `false`.
///
/// # Errors
///
/// Fails if the metadata cannot be loaded or the header is missing.
pub fn root_is_purelib(&self) -> Result<bool> {
    let value = self.archive_metadata_header("Root-Is-Purelib")?;

    Ok(value.trim().eq_ignore_ascii_case("true"))
}
/// Every `Tag` header value in the archive's `WHEEL` metadata.
///
/// # Errors
///
/// Fails if the metadata cannot be loaded.
pub fn tags(&self) -> Result<Vec<Cow<str>>> {
    const HEADER: &str = "Tag";

    self.archive_metadata_headers(HEADER)
}
/// Value of the first `Build` header in the archive's `WHEEL` metadata.
///
/// # Errors
///
/// Fails if the metadata cannot be loaded or the header is missing.
pub fn build(&self) -> Result<Cow<str>> {
    const HEADER: &str = "Build";

    self.archive_metadata_header(HEADER)
}
/// Every `Install-Paths-To` header value in the archive's `WHEEL` metadata.
///
/// # Errors
///
/// Fails if the metadata cannot be loaded.
pub fn install_paths_to(&self) -> Result<Vec<Cow<str>>> {
    const HEADER: &str = "Install-Paths-To";

    self.archive_metadata_headers(HEADER)
}
/// Files located inside the archive's `.dist-info` directory.
///
/// Paths are returned as stored in the archive, i.e. still carrying the
/// `.dist-info` directory as their leading component.
pub fn dist_info_files(&self) -> Vec<File> {
    let prefix = format!("{}/", self.dist_info_path());

    let mut matched = Vec::new();
    for file in self.files.iter_files() {
        if file.path().starts_with(&prefix) {
            matched.push(file);
        }
    }

    matched
}
/// Files stored under the `<key>` sub-directory of the archive's `.data` directory.
///
/// Returned paths are relative to that sub-directory, i.e. the
/// `{name}-{version}.data/{key}/` prefix is removed.
///
/// Matching and stripping are both done on path components. Slicing the
/// textual path at the byte length of the prefix (as done previously) could
/// panic for an entry whose path is exactly the sub-directory name, and left
/// stray separators when the stored path was not normalized.
fn data_paths(&self, key: &str) -> Vec<File> {
    let prefix = format!("{}.data/{}/", self.name_version, key);

    self.files
        .iter_files()
        .filter_map(|f| {
            let relative = f.path().strip_prefix(&prefix).ok()?;

            // An entry named exactly like the sub-directory has nothing left
            // after stripping; it is not a file inside it, so skip it.
            if relative.as_os_str().is_empty() {
                return None;
            }

            Some(File::new(relative, f.entry()))
        })
        .collect::<Vec<_>>()
}
/// Files under the `purelib` sub-directory of the `.data` directory, with
/// paths relative to that sub-directory.
pub fn purelib_files(&self) -> Vec<File> {
    let category = "purelib";

    self.data_paths(category)
}
/// Files under the `platlib` sub-directory of the `.data` directory, with
/// paths relative to that sub-directory.
pub fn platlib_files(&self) -> Vec<File> {
    let category = "platlib";

    self.data_paths(category)
}
/// Files under the `headers` sub-directory of the `.data` directory, with
/// paths relative to that sub-directory.
pub fn headers_files(&self) -> Vec<File> {
    let category = "headers";

    self.data_paths(category)
}
/// Files under the `scripts` sub-directory of the `.data` directory, with
/// paths relative to that sub-directory.
pub fn scripts_files(&self) -> Vec<File> {
    let category = "scripts";

    self.data_paths(category)
}
/// Files under the `data` sub-directory of the `.data` directory, with
/// paths relative to that sub-directory.
pub fn data_files(&self) -> Vec<File> {
    let category = "data";

    self.data_paths(category)
}
/// Files that live outside both the `.dist-info` and the `.data` directories.
///
/// Paths are returned exactly as stored in the archive.
pub fn regular_files(&self) -> Vec<File> {
    let dist_info_prefix = format!("{}/", self.dist_info_path());
    let data_prefix = format!("{}/", self.data_path());

    let mut regular = Vec::new();
    for file in self.files.iter_files() {
        let in_dist_info = file.path().starts_with(&dist_info_prefix);
        let in_data = file.path().starts_with(&data_prefix);

        if !in_dist_info && !in_data {
            regular.push(file);
        }
    }

    regular
}
pub fn python_resources<'a>(
&self,
cache_tag: &str,
suffixes: &PythonModuleSuffixes,
emit_files: bool,
classify_files: bool,
) -> Result<Vec<PythonResource<'a>>> {
let mut inputs = self.regular_files();
inputs.extend(self.dist_info_files());
inputs.extend(self.purelib_files());
inputs.extend(self.platlib_files());
inputs.extend(self.data_files());
PythonResourceIterator::from_data_locations(
&inputs,
cache_tag,
suffixes,
emit_files,
classify_files,
)?
.collect::<Result<Vec<_>>>()
}
}