use {
crate::{
bytecode::{CompileMode, PythonBytecodeCompiler},
licensing::LicensedComponent,
module_util::{is_package_from_path, packages_from_module_name, resolve_path_for_module},
python_source::has_dunder_file,
},
anyhow::{anyhow, Result},
simple_file_manifest::{File, FileData},
std::{
borrow::Cow,
collections::HashMap,
hash::BuildHasher,
path::{Path, PathBuf},
},
};
#[cfg(feature = "serialization")]
use serde::{Deserialize, Serialize};
/// An optimization level for Python bytecode.
///
/// The three variants convert to and from the integers `0`, `1` and `2`
/// through the `From`/`TryFrom` implementations in this file. When the
/// `serialization` feature is enabled the variants are renamed to `0`, `1`
/// and `2` for serde.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serialization", derive(Deserialize, Serialize))]
pub enum BytecodeOptimizationLevel {
/// Optimization level 0.
#[cfg_attr(feature = "serialization", serde(rename = "0"))]
Zero,
/// Optimization level 1.
#[cfg_attr(feature = "serialization", serde(rename = "1"))]
One,
/// Optimization level 2.
#[cfg_attr(feature = "serialization", serde(rename = "2"))]
Two,
}
impl BytecodeOptimizationLevel {
pub fn to_extra_tag(&self) -> &'static str {
match self {
BytecodeOptimizationLevel::Zero => "",
BytecodeOptimizationLevel::One => ".opt-1",
BytecodeOptimizationLevel::Two => ".opt-2",
}
}
}
impl TryFrom<i32> for BytecodeOptimizationLevel {
type Error = &'static str;
fn try_from(i: i32) -> Result<Self, Self::Error> {
match i {
0 => Ok(BytecodeOptimizationLevel::Zero),
1 => Ok(BytecodeOptimizationLevel::One),
2 => Ok(BytecodeOptimizationLevel::Two),
_ => Err("unsupported bytecode optimization level"),
}
}
}
impl From<BytecodeOptimizationLevel> for i32 {
fn from(level: BytecodeOptimizationLevel) -> Self {
match level {
BytecodeOptimizationLevel::Zero => 0,
BytecodeOptimizationLevel::One => 1,
BytecodeOptimizationLevel::Two => 2,
}
}
}
/// Source code for a Python module, plus metadata describing the module.
#[derive(Clone, Debug, PartialEq)]
pub struct PythonModuleSource {
/// Module name. `.` is treated as the separator between package components.
pub name: String,
/// Data holding the module's source code.
pub source: FileData,
/// Whether this module is itself a package.
pub is_package: bool,
/// Cache tag, copied onto bytecode requests derived from this source.
pub cache_tag: String,
/// Presumably marks modules that belong to the Python standard library — confirm with producers.
pub is_stdlib: bool,
/// Presumably marks modules that belong to a test suite — confirm with producers.
pub is_test: bool,
}
impl PythonModuleSource {
    /// Returns a short human-readable description of this resource.
    pub fn description(&self) -> String {
        format!("source code for Python module {}", self.name)
    }

    /// Returns a copy of this instance whose source data went through
    /// `FileData::to_memory`.
    ///
    /// # Errors
    ///
    /// Propagates any error from converting the source data.
    pub fn to_memory(&self) -> Result<Self> {
        let source = self.source.to_memory()?;

        Ok(Self {
            name: self.name.clone(),
            source,
            is_package: self.is_package,
            cache_tag: self.cache_tag.clone(),
            is_stdlib: self.is_stdlib,
            is_test: self.is_test,
        })
    }

    /// Returns the package this module belongs to.
    ///
    /// A package is its own package. Otherwise the result is everything
    /// before the final `.`, or the full name when there is no `.`.
    pub fn package(&self) -> String {
        if self.is_package {
            return self.name.clone();
        }

        match self.name.rsplit_once('.') {
            Some((parent, _leaf)) => parent.to_string(),
            None => self.name.clone(),
        }
    }

    /// Returns the portion of the module name before the first `.`, or the
    /// whole name when it contains no `.`.
    pub fn top_level_package(&self) -> &str {
        self.name
            .split_once('.')
            .map_or(self.name.as_str(), |(top, _rest)| top)
    }

    /// Builds a request to compile this source to bytecode at `optimize_level`.
    ///
    /// All metadata and the source data are copied onto the new value.
    pub fn as_bytecode_module(
        &self,
        optimize_level: BytecodeOptimizationLevel,
    ) -> PythonModuleBytecodeFromSource {
        let name = self.name.clone();
        let source = self.source.clone();
        let cache_tag = self.cache_tag.clone();

        PythonModuleBytecodeFromSource {
            name,
            source,
            optimize_level,
            is_package: self.is_package,
            cache_tag,
            is_stdlib: self.is_stdlib,
            is_test: self.is_test,
        }
    }

    /// Resolves the path of this module's source file under `prefix`.
    ///
    /// Delegates to `resolve_path_for_module` without a bytecode tag.
    pub fn resolve_path(&self, prefix: &str) -> PathBuf {
        let no_bytecode_tag = None;
        resolve_path_for_module(prefix, &self.name, self.is_package, no_bytecode_tag)
    }

    /// Reports the result of the free function `has_dunder_file` applied to
    /// the resolved source content.
    ///
    /// # Errors
    ///
    /// Fails if the source content cannot be resolved or the check itself fails.
    pub fn has_dunder_file(&self) -> Result<bool> {
        let content = self.source.resolve_content()?;
        has_dunder_file(&content)
    }
}
/// A request to produce bytecode for a Python module by compiling its source.
#[derive(Clone, Debug, PartialEq)]
pub struct PythonModuleBytecodeFromSource {
/// Module name.
pub name: String,
/// Data holding the source code to compile.
pub source: FileData,
/// Optimization level passed to the bytecode compiler.
pub optimize_level: BytecodeOptimizationLevel,
/// Whether this module is itself a package.
pub is_package: bool,
/// Cache tag used when deriving the bytecode file path.
pub cache_tag: String,
/// Presumably marks modules that belong to the Python standard library — confirm with producers.
pub is_stdlib: bool,
/// Presumably marks modules that belong to a test suite — confirm with producers.
pub is_test: bool,
}
impl PythonModuleBytecodeFromSource {
    /// Returns a short human-readable description of this resource.
    pub fn description(&self) -> String {
        let level = i32::from(self.optimize_level);

        format!(
            "bytecode for Python module {} at O{} (compiled from source)",
            self.name, level
        )
    }

    /// Returns a copy of this instance whose source data went through
    /// `FileData::to_memory`.
    ///
    /// # Errors
    ///
    /// Propagates any error from converting the source data.
    pub fn to_memory(&self) -> Result<Self> {
        let source = self.source.to_memory()?;

        Ok(Self {
            name: self.name.clone(),
            source,
            optimize_level: self.optimize_level,
            is_package: self.is_package,
            cache_tag: self.cache_tag.clone(),
            is_stdlib: self.is_stdlib,
            is_test: self.is_test,
        })
    }

    /// Compiles the source to bytecode with `compiler`.
    ///
    /// The compiler receives the resolved source content, the module name,
    /// this instance's optimization level and `mode`.
    ///
    /// # Errors
    ///
    /// Fails if the source content cannot be resolved or compilation fails.
    pub fn compile(
        &self,
        compiler: &mut dyn PythonBytecodeCompiler,
        mode: CompileMode,
    ) -> Result<Vec<u8>> {
        let source = self.source.resolve_content()?;
        compiler.compile(&source, &self.name, self.optimize_level, mode)
    }

    /// Resolves the path of the bytecode file for this module under `prefix`.
    ///
    /// The bytecode tag is the cache tag followed by the optimization
    /// level's extra tag (empty, `.opt-1` or `.opt-2`).
    pub fn resolve_path(&self, prefix: &str) -> PathBuf {
        let bytecode_tag = format!("{}{}", self.cache_tag, self.optimize_level.to_extra_tag());
        resolve_path_for_module(prefix, &self.name, self.is_package, Some(&bytecode_tag))
    }

    /// Reports the result of the free function `has_dunder_file` applied to
    /// the resolved source content.
    ///
    /// # Errors
    ///
    /// Fails if the source content cannot be resolved or the check itself fails.
    pub fn has_dunder_file(&self) -> Result<bool> {
        let content = self.source.resolve_content()?;
        has_dunder_file(&content)
    }
}
/// Compiled bytecode for a Python module.
#[derive(Clone, Debug, PartialEq)]
pub struct PythonModuleBytecode {
/// Module name.
pub name: String,
/// Bytecode data. Private: read it with `resolve_bytecode()` and replace it
/// with `set_bytecode()`.
bytecode: FileData,
/// Optimization level of the bytecode.
pub optimize_level: BytecodeOptimizationLevel,
/// Whether this module is itself a package.
pub is_package: bool,
/// Cache tag used when deriving the bytecode file path.
pub cache_tag: String,
/// Presumably marks modules that belong to the Python standard library — confirm with producers.
pub is_stdlib: bool,
/// Presumably marks modules that belong to a test suite — confirm with producers.
pub is_test: bool,
}
impl PythonModuleBytecode {
    /// Creates an instance backed by an in-memory copy of `data`.
    ///
    /// `is_stdlib` and `is_test` start out as `false`.
    pub fn new(
        name: &str,
        optimize_level: BytecodeOptimizationLevel,
        is_package: bool,
        cache_tag: &str,
        data: &[u8],
    ) -> Self {
        let bytecode = FileData::Memory(data.to_vec());

        Self {
            name: name.to_owned(),
            bytecode,
            optimize_level,
            is_package,
            cache_tag: cache_tag.to_owned(),
            is_stdlib: false,
            is_test: false,
        }
    }

    /// Creates an instance whose bytecode lives in the file at `path`.
    ///
    /// The file is not read here. `is_package` is derived from the path via
    /// `is_package_from_path`; `is_stdlib` and `is_test` start out as `false`.
    pub fn from_path(
        name: &str,
        optimize_level: BytecodeOptimizationLevel,
        cache_tag: &str,
        path: &Path,
    ) -> Self {
        let is_package = is_package_from_path(path);

        Self {
            name: name.to_owned(),
            bytecode: FileData::Path(path.to_path_buf()),
            optimize_level,
            is_package,
            cache_tag: cache_tag.to_owned(),
            is_stdlib: false,
            is_test: false,
        }
    }

    /// Returns a short human-readable description of this resource.
    pub fn description(&self) -> String {
        let level = i32::from(self.optimize_level);

        format!("bytecode for Python module {} at O{}", self.name, level)
    }

    /// Returns a copy of this instance holding the resolved bytecode in memory.
    ///
    /// # Errors
    ///
    /// Propagates any error from `resolve_bytecode()`.
    pub fn to_memory(&self) -> Result<Self> {
        let bytecode = FileData::Memory(self.resolve_bytecode()?);

        Ok(Self {
            name: self.name.clone(),
            bytecode,
            optimize_level: self.optimize_level,
            is_package: self.is_package,
            cache_tag: self.cache_tag.clone(),
            is_stdlib: self.is_stdlib,
            is_test: self.is_test,
        })
    }

    /// Returns the bytecode bytes.
    ///
    /// In-memory data is returned unchanged. For path-backed data the file is
    /// read and its first 16 bytes are dropped (presumably the `.pyc` header —
    /// confirm against the bytecode file format).
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be read or is shorter than 16 bytes.
    pub fn resolve_bytecode(&self) -> Result<Vec<u8>> {
        match &self.bytecode {
            FileData::Memory(bytes) => Ok(bytes.clone()),
            FileData::Path(path) => {
                let raw = std::fs::read(path)?;

                // `get(16..)` is `None` exactly when fewer than 16 bytes exist.
                raw.get(16..)
                    .map(|payload| payload.to_vec())
                    .ok_or_else(|| anyhow!("bytecode file is too short"))
            }
        }
    }

    /// Replaces the stored bytecode with an in-memory copy of `data`.
    pub fn set_bytecode(&mut self, data: &[u8]) {
        let replacement = data.to_vec();
        self.bytecode = FileData::Memory(replacement);
    }

    /// Resolves the path of the bytecode file for this module under `prefix`.
    ///
    /// The bytecode tag is the cache tag followed by the optimization
    /// level's extra tag (empty, `.opt-1` or `.opt-2`).
    pub fn resolve_path(&self, prefix: &str) -> PathBuf {
        let bytecode_tag = format!("{}{}", self.cache_tag, self.optimize_level.to_extra_tag());
        resolve_path_for_module(prefix, &self.name, self.is_package, Some(&bytecode_tag))
    }
}
/// A resource file associated with a Python package.
#[derive(Clone, Debug, PartialEq)]
pub struct PythonPackageResource {
/// Dotted name of the package the resource belongs to; each `.`-separated
/// component becomes a path component in `resolve_path()`.
pub leaf_package: String,
/// Name of the resource relative to `leaf_package`.
pub relative_name: String,
/// Data holding the resource content.
pub data: FileData,
/// Presumably marks resources that belong to the Python standard library — confirm with producers.
pub is_stdlib: bool,
/// Presumably marks resources that belong to a test suite — confirm with producers.
pub is_test: bool,
}
impl PythonPackageResource {
    /// Returns a short human-readable description of this resource.
    pub fn description(&self) -> String {
        let symbolic = self.symbolic_name();
        format!("Python package resource {}", symbolic)
    }

    /// Returns a copy of this instance whose data went through
    /// `FileData::to_memory`.
    ///
    /// # Errors
    ///
    /// Propagates any error from converting the data.
    pub fn to_memory(&self) -> Result<Self> {
        let data = self.data.to_memory()?;

        Ok(Self {
            leaf_package: self.leaf_package.clone(),
            relative_name: self.relative_name.clone(),
            data,
            is_stdlib: self.is_stdlib,
            is_test: self.is_test,
        })
    }

    /// Returns `<leaf_package>:<relative_name>`.
    pub fn symbolic_name(&self) -> String {
        format!("{}:{}", self.leaf_package, self.relative_name)
    }

    /// Resolves the filesystem path of this resource under `prefix`.
    ///
    /// Each `.`-separated component of the package becomes a directory,
    /// followed by the relative resource name.
    pub fn resolve_path(&self, prefix: &str) -> PathBuf {
        let mut resolved = PathBuf::from(prefix);
        resolved.extend(self.leaf_package.split('.'));
        resolved.push(&self.relative_name);
        resolved
    }
}
/// The kind of metadata directory a package distribution resource lives in.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum PythonPackageDistributionResourceFlavor {
/// Resource lives in a `<package>-<version>.dist-info` directory.
DistInfo,
/// Resource lives in a `<package>-<version>.egg-info` directory.
EggInfo,
}
/// A file belonging to a Python package distribution's metadata directory.
#[derive(Clone, Debug, PartialEq)]
pub struct PythonPackageDistributionResource {
/// Which flavor of metadata directory holds this resource.
pub location: PythonPackageDistributionResourceFlavor,
/// Package name. It is lowercased and has `-` replaced by `_` when the
/// path is resolved.
pub package: String,
/// Version string used in the metadata directory name.
pub version: String,
/// Name of the resource within the metadata directory.
pub name: String,
/// Data holding the resource content.
pub data: FileData,
}
impl PythonPackageDistributionResource {
    /// Returns a short human-readable description of this resource.
    pub fn description(&self) -> String {
        let (package, name) = (&self.package, &self.name);

        format!(
            "Python package distribution resource {}:{}",
            package, name
        )
    }

    /// Returns a copy of this instance whose data went through
    /// `FileData::to_memory`.
    ///
    /// # Errors
    ///
    /// Propagates any error from converting the data.
    pub fn to_memory(&self) -> Result<Self> {
        let data = self.data.to_memory()?;

        Ok(Self {
            location: self.location.clone(),
            package: self.package.clone(),
            version: self.version.clone(),
            name: self.name.clone(),
            data,
        })
    }

    /// Resolves the filesystem path of this resource under `prefix`.
    ///
    /// The result is `<prefix>/<package>-<version>.<dist-info|egg-info>/<name>`,
    /// where the package name is lowercased and `-` is replaced by `_`.
    pub fn resolve_path(&self, prefix: &str) -> PathBuf {
        let normalized_package = self.package.to_lowercase().replace('-', "_");

        let metadata_dir = match self.location {
            PythonPackageDistributionResourceFlavor::DistInfo => {
                format!("{}-{}.dist-info", normalized_package, self.version)
            }
            PythonPackageDistributionResourceFlavor::EggInfo => {
                format!("{}-{}.egg-info", normalized_package, self.version)
            }
        };

        let mut resolved = PathBuf::from(prefix);
        resolved.push(metadata_dir);
        resolved.push(&self.name);
        resolved
    }
}
/// A library that something links against or otherwise depends on.
#[derive(Clone, Debug, PartialEq)]
pub struct LibraryDependency {
/// Name of the library.
pub name: String,
/// Data for the static version of the library, if available.
pub static_library: Option<FileData>,
/// File name associated with the static library, if any.
pub static_filename: Option<PathBuf>,
/// Data for the dynamic (shared) version of the library, if available.
pub dynamic_library: Option<FileData>,
/// File name associated with the dynamic library, if any.
pub dynamic_filename: Option<PathBuf>,
/// Presumably whether this is a framework-style library — confirm with producers.
pub framework: bool,
/// Presumably whether this library is provided by the system — confirm with producers.
pub system: bool,
}
impl LibraryDependency {
    /// Returns a copy of this instance in which the static and dynamic
    /// library data, when present, went through `FileData::to_memory`.
    ///
    /// # Errors
    ///
    /// Propagates any error from converting either library's data.
    pub fn to_memory(&self) -> Result<Self> {
        let static_library = self
            .static_library
            .as_ref()
            .map(|data| data.to_memory())
            .transpose()?;
        let dynamic_library = self
            .dynamic_library
            .as_ref()
            .map(|data| data.to_memory())
            .transpose()?;

        Ok(Self {
            name: self.name.clone(),
            static_library,
            static_filename: self.static_filename.clone(),
            dynamic_library,
            dynamic_filename: self.dynamic_filename.clone(),
            framework: self.framework,
            system: self.system,
        })
    }
}
/// A shared library.
#[derive(Clone, Debug, PartialEq)]
pub struct SharedLibrary {
/// Name of the library.
pub name: String,
/// Data holding the library content.
pub data: FileData,
/// File name associated with the library, if any.
pub filename: Option<PathBuf>,
}
impl TryFrom<&LibraryDependency> for SharedLibrary {
    type Error = &'static str;

    /// Builds a shared library from the dynamic-library half of `value`.
    ///
    /// Fails with a static message when `value` has no dynamic library data.
    fn try_from(value: &LibraryDependency) -> Result<Self, Self::Error> {
        match &value.dynamic_library {
            Some(dynamic) => Ok(Self {
                name: value.name.clone(),
                data: dynamic.clone(),
                filename: value.dynamic_filename.clone(),
            }),
            None => Err("library dependency does not have a shared library"),
        }
    }
}
impl SharedLibrary {
    /// Returns a short human-readable description of this resource.
    pub fn description(&self) -> String {
        let library_name = &self.name;
        format!("shared library {}", library_name)
    }
}
/// A Python extension module and the artifacts associated with it.
#[derive(Clone, Debug, PartialEq)]
pub struct PythonExtensionModule {
/// Module name. Everything before the last `.` is the package path; the
/// final component is the stem of the module's file name.
pub name: String,
/// Presumably the name of the module's initialization function — confirm with producers.
pub init_fn: Option<String>,
/// Suffix appended to the module's leaf name to form its file name.
pub extension_file_suffix: String,
/// Data for the module as a shared library, if available.
pub shared_library: Option<FileData>,
/// Data for object files associated with this module.
pub object_file_data: Vec<FileData>,
/// Whether this module is itself a package.
pub is_package: bool,
/// Libraries this module depends on.
pub link_libraries: Vec<LibraryDependency>,
/// Presumably marks modules that belong to the Python standard library — confirm with producers.
pub is_stdlib: bool,
/// Feeds `is_minimally_required()` and `in_libpython()`; presumably whether
/// the module is built in by default — confirm with producers.
pub builtin_default: bool,
/// Feeds `is_minimally_required()`; presumably whether the module must
/// always be present — confirm with producers.
pub required: bool,
/// Variant label, compared against the preferred variant in
/// `PythonExtensionModuleVariants::choose_variant()`.
pub variant: Option<String>,
/// Licensing information for this module, if known.
pub license: Option<LicensedComponent>,
}
impl PythonExtensionModule {
    /// Returns a short human-readable description of this resource.
    pub fn description(&self) -> String {
        format!("Python extension module {}", self.name)
    }

    /// Returns a copy of this instance with all file data converted via
    /// `to_memory`.
    ///
    /// This covers the shared library, every object file and every link
    /// library. Object files used to be cloned unchanged, which left
    /// path-backed entries path-backed in the result.
    ///
    /// # Errors
    ///
    /// Propagates the first error from converting any piece of data.
    pub fn to_memory(&self) -> Result<Self> {
        let shared_library = self
            .shared_library
            .as_ref()
            .map(|data| data.to_memory())
            .transpose()?;
        let object_file_data = self
            .object_file_data
            .iter()
            .map(|data| data.to_memory())
            .collect::<Result<Vec<_>, _>>()?;
        let link_libraries = self
            .link_libraries
            .iter()
            .map(|library| library.to_memory())
            .collect::<Result<Vec<_>, _>>()?;

        Ok(Self {
            name: self.name.clone(),
            init_fn: self.init_fn.clone(),
            extension_file_suffix: self.extension_file_suffix.clone(),
            shared_library,
            object_file_data,
            is_package: self.is_package,
            link_libraries,
            is_stdlib: self.is_stdlib,
            builtin_default: self.builtin_default,
            required: self.required,
            variant: self.variant.clone(),
            license: self.license.clone(),
        })
    }

    /// Returns the file name of the extension module.
    ///
    /// This is the last `.`-separated component of the module name followed
    /// by `extension_file_suffix`.
    pub fn file_name(&self) -> String {
        let leaf = self
            .name
            .rsplit_once('.')
            .map_or(self.name.as_str(), |(_parent, leaf)| leaf);

        format!("{}{}", leaf, self.extension_file_suffix)
    }

    /// Resolves the filesystem path of this module under `prefix`.
    ///
    /// The path is `prefix`, then one directory per package component, then
    /// the module's file name.
    pub fn resolve_path(&self, prefix: &str) -> PathBuf {
        let mut path = PathBuf::from(prefix);
        path.extend(self.package_parts());
        path.push(self.file_name());
        path
    }

    /// Returns the package components of the module name.
    ///
    /// These are the `.`-separated parts before the final component; the
    /// result is empty when the name contains no `.`.
    pub fn package_parts(&self) -> Vec<String> {
        match self.name.rsplit_once('.') {
            Some((parent, _leaf)) => parent.split('.').map(str::to_string).collect(),
            None => Vec::new(),
        }
    }

    /// Whether this module has any link-library dependencies.
    pub fn requires_libraries(&self) -> bool {
        !self.link_libraries.is_empty()
    }

    /// Whether this is a stdlib module that is either built in by default
    /// or flagged as required.
    pub fn is_minimally_required(&self) -> bool {
        self.is_stdlib && (self.builtin_default || self.required)
    }

    /// Whether this is a stdlib module that is either built in by default
    /// or has no shared library data.
    pub fn in_libpython(&self) -> bool {
        self.is_stdlib && (self.builtin_default || self.shared_library.is_none())
    }

    /// Returns the portion of the module name before the first `.`, or the
    /// whole name when it contains no `.`.
    pub fn top_level_package(&self) -> &str {
        self.name
            .split_once('.')
            .map_or(self.name.as_str(), |(top, _rest)| top)
    }
}
/// An ordered collection of variants of an extension module.
///
/// The first entry is treated as the default variant.
#[derive(Clone, Debug, Default)]
pub struct PythonExtensionModuleVariants {
/// Variants in insertion order; index 0 is the default.
extensions: Vec<PythonExtensionModule>,
}
impl FromIterator<PythonExtensionModule> for PythonExtensionModuleVariants {
    /// Collects the yielded extension modules, preserving iteration order.
    fn from_iter<I: IntoIterator<Item = PythonExtensionModule>>(iter: I) -> Self {
        let extensions = iter.into_iter().collect();
        Self { extensions }
    }
}
impl PythonExtensionModuleVariants {
    /// Appends a variant to the end of the collection.
    pub fn push(&mut self, em: PythonExtensionModule) {
        self.extensions.push(em);
    }

    /// Whether the collection holds no variants.
    pub fn is_empty(&self) -> bool {
        self.extensions.is_empty()
    }

    /// Iterates over the variants in insertion order.
    pub fn iter(&self) -> impl Iterator<Item = &PythonExtensionModule> {
        self.extensions.iter()
    }

    /// Returns the default variant, which is the first one added.
    ///
    /// # Panics
    ///
    /// Panics if the collection is empty.
    pub fn default_variant(&self) -> &PythonExtensionModule {
        &self.extensions[0]
    }

    /// Picks a variant, honoring the preferences in `variants`.
    ///
    /// `variants` maps an extension module name to the preferred variant
    /// label. If there is an entry for the default variant's name and some
    /// variant carries that label, the first such variant is returned.
    /// Otherwise the default variant is returned.
    ///
    /// # Panics
    ///
    /// Panics if the collection is empty.
    pub fn choose_variant<S: BuildHasher>(
        &self,
        variants: &HashMap<String, String, S>,
    ) -> &PythonExtensionModule {
        let fallback = self.default_variant();

        variants
            .get(&fallback.name)
            .and_then(|preferred| {
                self.extensions
                    .iter()
                    .find(|em| em.variant.as_deref() == Some(preferred.as_str()))
            })
            .unwrap_or(fallback)
    }
}
/// A Python egg file (going by the type name — confirm with producers).
#[derive(Clone, Debug, PartialEq)]
pub struct PythonEggFile {
/// Data holding the file content.
pub data: FileData,
}
impl PythonEggFile {
    /// Returns a copy of this instance whose data went through
    /// `FileData::to_memory`.
    ///
    /// # Errors
    ///
    /// Propagates any error from converting the data.
    pub fn to_memory(&self) -> Result<Self> {
        let data = self.data.to_memory()?;
        Ok(Self { data })
    }
}
/// A Python path extension file (presumably a `.pth` file — confirm with producers).
#[derive(Clone, Debug, PartialEq)]
pub struct PythonPathExtension {
/// Data holding the file content.
pub data: FileData,
}
impl PythonPathExtension {
    /// Returns a copy of this instance whose data went through
    /// `FileData::to_memory`.
    ///
    /// # Errors
    ///
    /// Propagates any error from converting the data.
    pub fn to_memory(&self) -> Result<Self> {
        let data = self.data.to_memory()?;
        Ok(Self { data })
    }
}
/// A Python resource of one of several kinds.
///
/// Every variant stores its payload in a [`Cow`], so a value may either
/// borrow an existing resource or own one. The `From` implementations in
/// this file build the borrowed form from a reference and the owned form
/// from a value.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, PartialEq)]
pub enum PythonResource<'a> {
/// Source code for a Python module.
ModuleSource(Cow<'a, PythonModuleSource>),
/// A request to compile a module's source into bytecode.
ModuleBytecodeRequest(Cow<'a, PythonModuleBytecodeFromSource>),
/// Compiled bytecode for a Python module.
ModuleBytecode(Cow<'a, PythonModuleBytecode>),
/// A resource file belonging to a Python package.
PackageResource(Cow<'a, PythonPackageResource>),
/// A file from a package distribution's metadata directory.
PackageDistributionResource(Cow<'a, PythonPackageDistributionResource>),
/// A Python extension module.
ExtensionModule(Cow<'a, PythonExtensionModule>),
/// A Python egg file.
EggFile(Cow<'a, PythonEggFile>),
/// A Python path extension file.
PathExtension(Cow<'a, PythonPathExtension>),
/// A generic file.
File(Cow<'a, File>),
}
impl<'a> PythonResource<'a> {
    /// Returns a name identifying this resource.
    ///
    /// Module-like resources return their module name. Package resources
    /// return `<leaf_package>.<relative_name>`, distribution resources return
    /// `<package>:<name>`, files return their displayed path, and egg files
    /// and path extensions return an empty string.
    pub fn full_name(&self) -> String {
        match self {
            Self::ModuleSource(m) => m.name.clone(),
            Self::ModuleBytecode(m) => m.name.clone(),
            Self::ModuleBytecodeRequest(m) => m.name.clone(),
            Self::PackageResource(resource) => {
                format!("{}.{}", resource.leaf_package, resource.relative_name)
            }
            Self::PackageDistributionResource(resource) => {
                format!("{}:{}", resource.package, resource.name)
            }
            Self::ExtensionModule(em) => em.name.clone(),
            Self::EggFile(_) | Self::PathExtension(_) => String::new(),
            Self::File(f) => f.path().display().to_string(),
        }
    }

    /// Whether this resource belongs to any of the given `packages`.
    ///
    /// A resource matches when its name equals a listed package exactly, or
    /// when a listed package appears in `packages_from_module_name` for that
    /// name. Egg files, path extensions and generic files never match.
    pub fn is_in_packages(&self, packages: &[String]) -> bool {
        let name = match self {
            Self::ModuleSource(m) => &m.name,
            Self::ModuleBytecode(m) => &m.name,
            Self::ModuleBytecodeRequest(m) => &m.name,
            Self::PackageResource(resource) => &resource.leaf_package,
            Self::PackageDistributionResource(resource) => &resource.package,
            Self::ExtensionModule(em) => &em.name,
            Self::EggFile(_) | Self::PathExtension(_) | Self::File(_) => return false,
        };

        // The package list derived from `name` does not depend on the
        // candidate being tested, so compute it once rather than per candidate.
        let containing_packages = packages_from_module_name(name);

        packages
            .iter()
            .any(|package| name == package || containing_packages.contains(package))
    }

    /// Returns an owned copy of this resource produced by the wrapped
    /// type's own `to_memory` method.
    ///
    /// # Errors
    ///
    /// Propagates any error from the wrapped resource's `to_memory`.
    pub fn to_memory(&self) -> Result<Self> {
        Ok(match self {
            Self::ModuleSource(m) => m.to_memory()?.into(),
            Self::ModuleBytecode(m) => m.to_memory()?.into(),
            Self::ModuleBytecodeRequest(m) => m.to_memory()?.into(),
            Self::PackageResource(r) => r.to_memory()?.into(),
            Self::PackageDistributionResource(r) => r.to_memory()?.into(),
            Self::ExtensionModule(m) => m.to_memory()?.into(),
            Self::EggFile(e) => e.to_memory()?.into(),
            Self::PathExtension(e) => e.to_memory()?.into(),
            Self::File(f) => f.to_memory()?.into(),
        })
    }
}
impl<'a> From<PythonModuleSource> for PythonResource<'a> {
fn from(m: PythonModuleSource) -> Self {
PythonResource::ModuleSource(Cow::Owned(m))
}
}
impl<'a> From<&'a PythonModuleSource> for PythonResource<'a> {
fn from(m: &'a PythonModuleSource) -> Self {
PythonResource::ModuleSource(Cow::Borrowed(m))
}
}
impl<'a> From<PythonModuleBytecodeFromSource> for PythonResource<'a> {
fn from(m: PythonModuleBytecodeFromSource) -> Self {
PythonResource::ModuleBytecodeRequest(Cow::Owned(m))
}
}
impl<'a> From<&'a PythonModuleBytecodeFromSource> for PythonResource<'a> {
fn from(m: &'a PythonModuleBytecodeFromSource) -> Self {
PythonResource::ModuleBytecodeRequest(Cow::Borrowed(m))
}
}
impl<'a> From<PythonModuleBytecode> for PythonResource<'a> {
fn from(m: PythonModuleBytecode) -> Self {
PythonResource::ModuleBytecode(Cow::Owned(m))
}
}
impl<'a> From<&'a PythonModuleBytecode> for PythonResource<'a> {
fn from(m: &'a PythonModuleBytecode) -> Self {
PythonResource::ModuleBytecode(Cow::Borrowed(m))
}
}
impl<'a> From<PythonPackageResource> for PythonResource<'a> {
fn from(r: PythonPackageResource) -> Self {
PythonResource::PackageResource(Cow::Owned(r))
}
}
impl<'a> From<&'a PythonPackageResource> for PythonResource<'a> {
fn from(r: &'a PythonPackageResource) -> Self {
PythonResource::PackageResource(Cow::Borrowed(r))
}
}
impl<'a> From<PythonPackageDistributionResource> for PythonResource<'a> {
fn from(r: PythonPackageDistributionResource) -> Self {
PythonResource::PackageDistributionResource(Cow::Owned(r))
}
}
impl<'a> From<&'a PythonPackageDistributionResource> for PythonResource<'a> {
fn from(r: &'a PythonPackageDistributionResource) -> Self {
PythonResource::PackageDistributionResource(Cow::Borrowed(r))
}
}
impl<'a> From<PythonExtensionModule> for PythonResource<'a> {
fn from(r: PythonExtensionModule) -> Self {
PythonResource::ExtensionModule(Cow::Owned(r))
}
}
impl<'a> From<&'a PythonExtensionModule> for PythonResource<'a> {
fn from(r: &'a PythonExtensionModule) -> Self {
PythonResource::ExtensionModule(Cow::Borrowed(r))
}
}
impl<'a> From<PythonEggFile> for PythonResource<'a> {
fn from(e: PythonEggFile) -> Self {
PythonResource::EggFile(Cow::Owned(e))
}
}
impl<'a> From<&'a PythonEggFile> for PythonResource<'a> {
fn from(e: &'a PythonEggFile) -> Self {
PythonResource::EggFile(Cow::Borrowed(e))
}
}
impl<'a> From<PythonPathExtension> for PythonResource<'a> {
fn from(e: PythonPathExtension) -> Self {
PythonResource::PathExtension(Cow::Owned(e))
}
}
impl<'a> From<&'a PythonPathExtension> for PythonResource<'a> {
fn from(e: &'a PythonPathExtension) -> Self {
PythonResource::PathExtension(Cow::Borrowed(e))
}
}
impl<'a> From<File> for PythonResource<'a> {
fn from(f: File) -> Self {
PythonResource::File(Cow::Owned(f))
}
}
impl<'a> From<&'a File> for PythonResource<'a> {
fn from(f: &'a File) -> Self {
PythonResource::File(Cow::Borrowed(f))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    const DEFAULT_CACHE_TAG: &str = "cpython-39";

    /// Builds the single-element package list used by the filter assertions.
    fn only(package: &str) -> Vec<String> {
        vec![package.to_string()]
    }

    /// Exact-name matches pass the package filter; empty and unrelated
    /// package lists do not.
    #[test]
    fn test_is_in_packages() {
        let source = PythonResource::from(PythonModuleSource {
            name: "foo".to_string(),
            source: FileData::Memory(vec![]),
            is_package: false,
            cache_tag: DEFAULT_CACHE_TAG.to_string(),
            is_stdlib: false,
            is_test: false,
        });
        assert!(source.is_in_packages(&only("foo")));
        assert!(!source.is_in_packages(&[]));
        assert!(!source.is_in_packages(&only("bar")));

        let bytecode = PythonResource::from(PythonModuleBytecode::new(
            "foo",
            BytecodeOptimizationLevel::Zero,
            false,
            DEFAULT_CACHE_TAG,
            &[],
        ));
        assert!(bytecode.is_in_packages(&only("foo")));
        assert!(!bytecode.is_in_packages(&[]));
        assert!(!bytecode.is_in_packages(&only("bar")));
    }

    /// The package name is lowercased and `-` becomes `_` in the metadata
    /// directory name, for both dist-info and egg-info flavors.
    #[test]
    fn package_distribution_resources_path_normalization() {
        let expected = |directory: &str| PathBuf::from("prefix").join(directory).join("resource.txt");

        let mut resource = PythonPackageDistributionResource {
            location: PythonPackageDistributionResourceFlavor::DistInfo,
            package: "FoO-Bar".into(),
            version: "1.0".into(),
            name: "resource.txt".into(),
            data: vec![42].into(),
        };
        assert_eq!(
            resource.resolve_path("prefix"),
            expected("foo_bar-1.0.dist-info")
        );

        resource.location = PythonPackageDistributionResourceFlavor::EggInfo;
        assert_eq!(
            resource.resolve_path("prefix"),
            expected("foo_bar-1.0.egg-info")
        );
    }
}