use serde_json::Value;
use std::collections::{HashMap, HashSet};
use std::fs;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;
use std::process::Command;
use toml::Value as TomlValue;
use crate::config::FeludaConfig;
use crate::debug::{log, log_debug, log_error, LogLevel};
use crate::licenses::{
fetch_licenses_from_github, is_license_restrictive, LicenseCompatibility, LicenseInfo,
};
#[derive(Debug, Clone, PartialEq)]
pub struct EnvironmentMarker {
pub raw: String,
pub components: Vec<MarkerComponent>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct MarkerComponent {
pub variable: String,
pub operator: String,
pub value: String,
}
impl EnvironmentMarker {
fn parse(marker_str: &str) -> Option<Self> {
let marker_str = marker_str.trim();
if marker_str.is_empty() {
return None;
}
let raw = marker_str.to_string();
let components = parse_marker_components(marker_str);
Some(EnvironmentMarker { raw, components })
}
pub fn describe(&self) -> String {
if self.components.is_empty() {
return self.raw.clone();
}
let mut descriptions = Vec::new();
for component in &self.components {
descriptions.push(format!(
"{} {} '{}'",
component.variable, component.operator, component.value
));
}
descriptions.join(" and ")
}
#[allow(dead_code)]
pub fn applies_to_environment(&self) -> bool {
true
}
}
fn parse_marker_components(marker_str: &str) -> Vec<MarkerComponent> {
let mut components = Vec::new();
let conditions: Vec<&str> = marker_str.split(" and ").collect();
for condition in conditions {
let condition = condition.trim();
let operators = vec!["!=", "==", "<=", ">=", "<", ">", "in", "not"];
for op in operators {
if let Some(parts) = condition.split_once(op) {
let variable = parts.0.trim().to_string();
let value_part = parts.1.trim();
let value = value_part.trim_matches('\'').trim_matches('"').to_string();
if !variable.is_empty() && !value.is_empty() {
components.push(MarkerComponent {
variable,
operator: op.to_string(),
value,
});
break;
}
}
}
}
components
}
pub fn analyze_python_licenses(package_file_path: &str, config: &FeludaConfig) -> Vec<LicenseInfo> {
let mut licenses = Vec::new();
log(
LogLevel::Info,
&format!("Analyzing Python dependencies from: {package_file_path}"),
);
let known_licenses = match fetch_licenses_from_github() {
Ok(licenses) => {
log(
LogLevel::Info,
&format!("Fetched {} known licenses from GitHub", licenses.len()),
);
licenses
}
Err(err) => {
log_error("Failed to fetch licenses from GitHub", &err);
HashMap::new()
}
};
if package_file_path.ends_with("pyproject.toml") {
match fs::read_to_string(package_file_path) {
Ok(content) => match toml::from_str::<TomlValue>(&content) {
Ok(toml_config) => {
if let Some(project) = toml_config.as_table().and_then(|t| t.get("project")) {
if let Some(deps) = project
.as_table()
.and_then(|t| t.get("dependencies"))
.and_then(|d| d.as_array())
{
log(
LogLevel::Info,
&format!("Found {} Python dependencies", deps.len()),
);
log_debug("Dependencies", deps);
let mut direct_deps = Vec::new();
for dep in deps {
if let Some(dep_str) = dep.as_str() {
let (name, version) = if let Some((n, v)) = dep_str
.split_once("==")
.or_else(|| dep_str.split_once(">="))
.or_else(|| dep_str.split_once(">"))
.or_else(|| dep_str.split_once("~="))
.or_else(|| dep_str.split_once("<="))
.or_else(|| dep_str.split_once("<"))
{
(n.trim(), v.trim())
} else {
(dep_str.trim(), "latest")
};
direct_deps.push((name.to_string(), version.to_string()));
}
}
let max_depth = config.dependencies.max_depth;
log(
LogLevel::Info,
&format!("Using max dependency depth: {max_depth}"),
);
let all_deps = resolve_python_dependencies(
&direct_deps,
package_file_path,
max_depth,
);
for (name, version) in all_deps {
log(
LogLevel::Info,
&format!("Processing dependency: {name} ({version})"),
);
let license_result =
fetch_license_for_python_dependency(&name, &version);
let license = Some(license_result);
let is_restrictive = is_license_restrictive(
&license,
&known_licenses,
config.strict,
);
if is_restrictive {
log(
LogLevel::Warn,
&format!(
"Restrictive license found: {license:?} for {name}"
),
);
}
licenses.push(LicenseInfo {
name,
version,
license: license.clone(),
is_restrictive,
compatibility: LicenseCompatibility::Unknown,
osi_status: match &license {
Some(l) => crate::licenses::get_osi_status(l),
None => crate::licenses::OsiStatus::Unknown,
},
});
}
} else {
log(
LogLevel::Warn,
"Failed to find dependencies in pyproject.toml",
);
}
} else {
log(
LogLevel::Warn,
"No 'project' section found in pyproject.toml",
);
}
}
Err(err) => {
log_error("Failed to parse pyproject.toml", &err);
}
},
Err(err) => {
log_error("Failed to read pyproject.toml file", &err);
}
}
} else {
log(LogLevel::Info, "Processing requirements.txt format");
match File::open(package_file_path) {
Ok(file) => {
let reader = BufReader::new(file);
let mut direct_deps = Vec::new();
for line_result in reader.lines() {
match line_result {
Ok(line) => {
let line = line.trim();
if line.is_empty() || line.starts_with('#') {
continue;
}
if let Some((name, version)) = parse_requirement_line(line) {
direct_deps.push((name, version));
} else {
log(LogLevel::Warn, &format!("Invalid requirement line: {line}"));
}
}
Err(err) => {
log_error("Failed to read line from requirements.txt", &err);
}
}
}
log(
LogLevel::Info,
&format!(
"Found {} direct requirements in requirements.txt",
direct_deps.len()
),
);
let max_depth = config.dependencies.max_depth;
log(
LogLevel::Info,
&format!("Using max dependency depth: {max_depth}"),
);
let all_deps =
resolve_python_dependencies(&direct_deps, package_file_path, max_depth);
for (name, version) in all_deps {
log(
LogLevel::Info,
&format!("Processing dependency: {name} ({version})"),
);
let license_result = fetch_license_for_python_dependency(&name, &version);
let license = Some(license_result);
let is_restrictive =
is_license_restrictive(&license, &known_licenses, config.strict);
if is_restrictive {
log(
LogLevel::Warn,
&format!("Restrictive license found: {license:?} for {name}"),
);
}
licenses.push(LicenseInfo {
name,
version,
license: license.clone(),
is_restrictive,
compatibility: LicenseCompatibility::Unknown,
osi_status: match &license {
Some(l) => crate::licenses::get_osi_status(l),
None => crate::licenses::OsiStatus::Unknown,
},
});
}
log(
LogLevel::Info,
&format!(
"Processed {} total dependencies (including transitive)",
licenses.len()
),
);
}
Err(err) => {
log_error("Failed to open requirements.txt file", &err);
}
}
}
log(
LogLevel::Info,
&format!("Found {} Python dependencies with licenses", licenses.len()),
);
licenses
}
pub fn fetch_license_for_python_dependency(name: &str, version: &str) -> String {
if let Some(license) = get_license_from_local_site_packages(name) {
log(
LogLevel::Info,
&format!("Found license in local site-packages for {name}: {license}"),
);
return license;
}
fetch_license_from_pypi(name, version)
}
fn get_license_from_local_site_packages(package_name: &str) -> Option<String> {
let python_paths = get_python_site_packages_paths();
for site_packages in python_paths {
if let Some(license) = check_site_package_metadata(&site_packages, package_name) {
return Some(license);
}
if let Some(license) = check_site_package_license_file(&site_packages, package_name) {
return Some(license);
}
}
None
}
fn get_python_site_packages_paths() -> Vec<std::path::PathBuf> {
let mut paths = Vec::new();
if let Ok(output) = Command::new("python3")
.args([
"-c",
"import site; print('\\n'.join(site.getsitepackages()))",
])
.output()
{
if output.status.success() {
let stdout = String::from_utf8_lossy(&output.stdout);
for line in stdout.lines() {
paths.push(std::path::PathBuf::from(line.trim()));
}
}
}
if let Ok(output) = Command::new("python")
.args([
"-c",
"import site; print('\\n'.join(site.getsitepackages()))",
])
.output()
{
if output.status.success() {
let stdout = String::from_utf8_lossy(&output.stdout);
for line in stdout.lines() {
let path = std::path::PathBuf::from(line.trim());
if !paths.contains(&path) {
paths.push(path);
}
}
}
}
paths
}
fn check_site_package_metadata(site_packages: &Path, package_name: &str) -> Option<String> {
let metadata_file = site_packages
.join(format!("{package_name}.dist-info"))
.join("METADATA");
if metadata_file.exists() {
if let Ok(content) = fs::read_to_string(&metadata_file) {
for line in content.lines() {
if line.starts_with("License:") {
if let Some(license) = line.strip_prefix("License:") {
let license = license.trim();
if !license.is_empty() && license != "UNKNOWN" {
return Some(license.to_string());
}
}
}
}
}
}
let normalized_name = package_name.replace('-', "_");
let metadata_file_normalized = site_packages
.join(format!("{normalized_name}.dist-info"))
.join("METADATA");
if metadata_file_normalized.exists() {
if let Ok(content) = fs::read_to_string(&metadata_file_normalized) {
for line in content.lines() {
if line.starts_with("License:") {
if let Some(license) = line.strip_prefix("License:") {
let license = license.trim();
if !license.is_empty() && license != "UNKNOWN" {
return Some(license.to_string());
}
}
}
}
}
}
None
}
fn check_site_package_license_file(site_packages: &Path, package_name: &str) -> Option<String> {
let package_dirs = vec![
site_packages.join(package_name),
site_packages.join(package_name.replace('-', "_")),
];
let license_files = [
"LICENSE",
"LICENSE.txt",
"LICENSE.md",
"COPYING",
"COPYING.md",
];
for package_dir in package_dirs {
if !package_dir.exists() {
continue;
}
for license_file in &license_files {
let license_path = package_dir.join(license_file);
if license_path.exists() {
if let Ok(content) = fs::read_to_string(&license_path) {
if let Some(license) = detect_license_from_content(&content) {
return Some(license);
}
}
}
}
}
None
}
fn detect_license_from_content(content: &str) -> Option<String> {
let content_upper = content.to_uppercase();
let patterns = vec![
("MIT", "MIT License"),
("APACHE", "Apache License"),
("GPL", "GPL"),
("BSD", "BSD"),
("ISC", "ISC License"),
("LGPL", "LGPL"),
("UNLICENSE", "Unlicense"),
("MPL", "Mozilla Public License"),
];
for (pattern, label) in patterns {
if content_upper.contains(pattern) {
return Some(label.to_string());
}
}
None
}
fn fetch_license_from_pypi(name: &str, version: &str) -> String {
let api_url = format!("https://pypi.org/pypi/{name}/{version}/json");
log(
LogLevel::Info,
&format!("Fetching license from PyPI: {api_url}"),
);
match reqwest::blocking::get(&api_url) {
Ok(response) => {
let status = response.status();
log(
LogLevel::Info,
&format!("PyPI API response status: {status}"),
);
if status.is_success() {
match response.json::<Value>() {
Ok(json) => match json["info"]["license"].as_str() {
Some(license_str) if !license_str.is_empty() => {
log(
LogLevel::Info,
&format!("License found for {name}: {license_str}"),
);
license_str.to_string()
}
_ => {
log(
LogLevel::Warn,
&format!("No license found for {name} ({version})"),
);
format!("Unknown license for {name}: {version}")
}
},
Err(err) => {
log_error(&format!("Failed to parse JSON for {name}: {version}"), &err);
String::from("Unknown")
}
}
} else {
log(
LogLevel::Error,
&format!("Failed to fetch metadata for {name}: HTTP {status}"),
);
String::from("Unknown")
}
}
Err(err) => {
log_error(&format!("Failed to fetch metadata for {name}"), &err);
String::from("Unknown")
}
}
}
fn parse_requirement_line(line: &str) -> Option<(String, String)> {
let line = line.trim();
let (base_req, marker) = if let Some((base, marker_str)) = line.split_once(';') {
(base.trim(), EnvironmentMarker::parse(marker_str))
} else {
(line, None)
};
if let Some(marker) = &marker {
log_debug(
"Environment Marker (requirements.txt)",
&format!("Detected marker: {} -> {}", marker.raw, marker.describe()),
);
}
if let Some((name, version)) = base_req
.split_once("==")
.or_else(|| base_req.split_once(">="))
.or_else(|| base_req.split_once(">"))
.or_else(|| base_req.split_once("~="))
.or_else(|| base_req.split_once("<="))
.or_else(|| base_req.split_once("<"))
{
let name = name.trim();
let version = version
.trim()
.trim_matches('"')
.trim_matches('\'')
.replace("^", "")
.replace("~", "");
Some((name.to_string(), version))
} else {
Some((base_req.to_string(), "latest".to_string()))
}
}
fn resolve_python_dependencies(
direct_deps: &[(String, String)],
package_file_path: &str,
max_depth: u32,
) -> Vec<(String, String)> {
log(
LogLevel::Info,
&format!("Resolving Python dependencies (including transitive up to depth {max_depth})"),
);
if let Ok(uv_deps) = resolve_with_uv(package_file_path, max_depth) {
if !uv_deps.is_empty() {
log(
LogLevel::Info,
&format!(
"Resolved {} dependencies using uv (depth {})",
uv_deps.len(),
max_depth
),
);
return uv_deps;
}
}
log(
LogLevel::Info,
"Falling back to PyPI-based transitive dependency resolution",
);
resolve_with_pypi(direct_deps, max_depth)
}
fn resolve_with_uv(
package_file_path: &str,
max_depth: u32,
) -> Result<Vec<(String, String)>, String> {
let project_dir = Path::new(package_file_path)
.parent()
.ok_or("Cannot determine project directory")?;
log(
LogLevel::Info,
&format!("Attempting to resolve dependencies with uv (max depth: {max_depth})"),
);
if let Ok(output) = Command::new("uv")
.args(["lock", "--dry-run"])
.current_dir(project_dir)
.output()
{
if output.status.success() {
let lock_file = project_dir.join("uv.lock");
if lock_file.exists() {
if let Ok(deps) = parse_uv_lock(&lock_file, max_depth) {
log(
LogLevel::Info,
&format!("Resolved {} dependencies from uv.lock", deps.len()),
);
return Ok(deps);
}
}
}
}
if let Ok(output) = Command::new("uv")
.args(["pip", "compile", "--dry-run", package_file_path])
.current_dir(project_dir)
.output()
{
if output.status.success() {
let stdout_str = String::from_utf8_lossy(&output.stdout);
let deps = parse_pip_compile_output(&stdout_str);
log(
LogLevel::Info,
&format!(
"Resolved {} dependencies from pip-compile output",
deps.len()
),
);
return Ok(deps);
}
}
Err("uv resolution failed".to_string())
}
fn parse_uv_lock(lock_file: &Path, max_depth: u32) -> Result<Vec<(String, String)>, String> {
let content =
fs::read_to_string(lock_file).map_err(|e| format!("Failed to read uv.lock: {e}"))?;
let lock_data: TomlValue =
toml::from_str(&content).map_err(|e| format!("Failed to parse uv.lock: {e}"))?;
let mut deps = Vec::new();
log(
LogLevel::Info,
&format!("Parsing uv.lock with max depth {max_depth}"),
);
if let Some(packages) = lock_data.get("package").and_then(|p| p.as_array()) {
for package in packages {
if let Some(package_table) = package.as_table() {
if let (Some(name), Some(version)) = (
package_table.get("name").and_then(|n| n.as_str()),
package_table.get("version").and_then(|v| v.as_str()),
) {
deps.push((name.to_string(), version.to_string()));
}
}
}
log(
LogLevel::Info,
&format!(
"Extracted {} dependencies from uv.lock (all depths included)",
deps.len()
),
);
}
Ok(deps)
}
fn parse_pip_compile_output(output: &str) -> Vec<(String, String)> {
let mut deps = Vec::new();
for line in output.lines() {
let line = line.trim();
if line.is_empty() || line.starts_with('#') {
continue;
}
if let Some((name, version)) = parse_requirement_line(line) {
deps.push((name, version));
}
}
deps
}
fn resolve_with_pypi(direct_deps: &[(String, String)], max_depth: u32) -> Vec<(String, String)> {
let mut all_deps = HashMap::new();
let mut processed = HashSet::new();
let mut to_process: Vec<(String, String, u32)> = direct_deps
.iter()
.map(|(name, version)| (name.clone(), version.clone(), 0))
.collect();
log(
LogLevel::Info,
&format!(
"Starting PyPI-based resolution with {} direct dependencies (max depth: {})",
direct_deps.len(),
max_depth
),
);
let mut depth_stats = HashMap::new();
while let Some((name, version, depth)) = to_process.pop() {
let key = format!("{name}@{version}");
if processed.contains(&key) {
continue;
}
if depth >= max_depth {
log(
LogLevel::Info,
&format!("Skipping {name}@{version} - exceeded max depth {max_depth}"),
);
continue;
}
processed.insert(key);
all_deps.insert(name.clone(), version.clone());
*depth_stats.entry(depth).or_insert(0) += 1;
log(
LogLevel::Info,
&format!("Resolving dependencies for: {name}@{version} (depth {depth})"),
);
if let Ok(transitive_deps) = fetch_pypi_dependencies(&name, &version) {
log(
LogLevel::Info,
&format!(
"Found {} transitive dependencies for {} at depth {}",
transitive_deps.len(),
name,
depth
),
);
for (dep_name, dep_version) in transitive_deps {
let dep_key = format!("{dep_name}@{dep_version}");
if !processed.contains(&dep_key) {
to_process.push((dep_name, dep_version, depth + 1));
}
}
}
}
for depth in 0..=max_depth {
if let Some(count) = depth_stats.get(&depth) {
log(
LogLevel::Info,
&format!("Depth {depth}: {count} dependencies"),
);
}
}
log(
LogLevel::Info,
&format!(
"PyPI resolution completed. Total dependencies: {} (explored up to depth {})",
all_deps.len(),
max_depth
),
);
all_deps.into_iter().collect()
}
fn fetch_pypi_dependencies(name: &str, version: &str) -> Result<Vec<(String, String)>, String> {
let api_url = format!("https://pypi.org/pypi/{name}/{version}/json");
match reqwest::blocking::get(&api_url) {
Ok(response) => {
if response.status().is_success() {
if let Ok(json) = response.json::<Value>() {
let mut deps = Vec::new();
if let Some(requires_dist) = json["info"]["requires_dist"].as_array() {
for req in requires_dist {
if let Some(req_str) = req.as_str() {
if let Some((dep_name, dep_version)) =
parse_pypi_requirement(req_str)
{
deps.push((dep_name, dep_version));
}
}
}
}
return Ok(deps);
}
}
}
Err(err) => {
log_error(&format!("Failed to fetch dependencies for {name}"), &err);
}
}
Ok(Vec::new())
}
fn parse_pypi_requirement(req_str: &str) -> Option<(String, String)> {
let req_str = req_str.trim();
let (base_req, marker) = if let Some((base, marker_str)) = req_str.split_once(';') {
(base.trim(), EnvironmentMarker::parse(marker_str))
} else {
(req_str, None)
};
if let Some(marker) = &marker {
log_debug(
"Environment Marker",
&format!("Detected marker: {} -> {}", marker.raw, marker.describe()),
);
}
let mut chars = base_req.chars().peekable();
let mut name = String::new();
while let Some(ch) = chars.peek() {
if ">=<!~=()[".contains(*ch) || ch.is_whitespace() {
break;
}
if let Some(ch) = chars.next() {
name.push(ch);
}
}
let name = name.trim().to_string();
if name.is_empty() {
return None;
}
while let Some(ch) = chars.peek() {
if *ch == '[' {
chars.next(); for ch in chars.by_ref() {
if ch == ']' {
break;
}
}
} else {
break;
}
}
while let Some(ch) = chars.peek() {
if ch.is_whitespace() || *ch == '(' {
chars.next();
} else {
break;
}
}
let remaining: String = chars.collect();
let remaining = remaining.trim_end_matches(')').trim();
if remaining.is_empty() {
return Some((name, "latest".to_string()));
}
let constraints: Vec<&str> = remaining.split(',').collect();
let mut best_version = "latest";
for constraint in &constraints {
if let Some((_operator, version_part)) = parse_version_constraint(constraint.trim()) {
if constraint.trim().starts_with(">=") || constraint.trim().starts_with("==") {
best_version = version_part.trim();
break;
} else if best_version == "latest" {
best_version = version_part.trim();
}
}
}
Some((name, best_version.to_string()))
}
fn parse_version_constraint(constraint: &str) -> Option<(&str, &str)> {
let constraint = constraint.trim();
if let Some(version) = constraint.strip_prefix(">=") {
Some((">=", version.trim()))
} else if let Some(version) = constraint.strip_prefix("<=") {
Some(("<=", version.trim()))
} else if let Some(version) = constraint.strip_prefix("==") {
Some(("==", version.trim()))
} else if let Some(version) = constraint.strip_prefix("~=") {
Some(("~=", version.trim()))
} else if let Some(version) = constraint.strip_prefix("!=") {
Some(("!=", version.trim()))
} else if let Some(version) = constraint.strip_prefix(">") {
Some((">", version.trim()))
} else {
constraint
.strip_prefix("<")
.map(|version| ("<", version.trim()))
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[test]
fn test_analyze_python_licenses_pyproject_toml() {
let temp_dir = TempDir::new().unwrap();
let pyproject_toml_path = temp_dir.path().join("pyproject.toml");
std::fs::write(
&pyproject_toml_path,
r#"[project]
name = "test-project"
version = "0.1.0"
dependencies = [
"requests>=2.31.0",
"flask~=2.0.0"
]
"#,
)
.unwrap();
let config = FeludaConfig::default();
let result = analyze_python_licenses(pyproject_toml_path.to_str().unwrap(), &config);
assert!(!result.is_empty());
assert!(result.iter().any(|info| info.name == "requests"));
assert!(result.iter().any(|info| info.name == "flask"));
}
#[test]
fn test_analyze_python_licenses_empty_file() {
let temp_dir = TempDir::new().unwrap();
let requirements_path = temp_dir.path().join("requirements.txt");
std::fs::write(&requirements_path, "").unwrap();
let config = FeludaConfig::default();
let result = analyze_python_licenses(requirements_path.to_str().unwrap(), &config);
assert!(result.is_empty());
}
#[test]
fn test_analyze_python_licenses_invalid_format() {
let temp_dir = TempDir::new().unwrap();
let requirements_path = temp_dir.path().join("requirements.txt");
std::fs::write(&requirements_path, "# This is a comment\n\n").unwrap();
let config = FeludaConfig::default();
let result = analyze_python_licenses(requirements_path.to_str().unwrap(), &config);
assert!(result.is_empty());
}
#[test]
fn test_analyze_python_licenses_packages_without_versions() {
let temp_dir = TempDir::new().unwrap();
let requirements_path = temp_dir.path().join("requirements.txt");
std::fs::write(
&requirements_path,
"requests\nflask\n# This is a comment\nnumpy",
)
.unwrap();
let config = FeludaConfig::default();
let result = analyze_python_licenses(requirements_path.to_str().unwrap(), &config);
assert!(!result.is_empty());
assert!(result.iter().any(|info| info.name == "requests"));
assert!(result.iter().any(|info| info.name == "flask"));
assert!(result.iter().any(|info| info.name == "numpy"));
}
#[test]
fn test_fetch_license_for_python_dependency_error_handling() {
let result =
fetch_license_for_python_dependency("definitely_nonexistent_package_12345", "1.0.0");
assert!(result.contains("Unknown") || result.contains("nonexistent"));
}
#[test]
fn test_parse_requirement_line() {
assert_eq!(
parse_requirement_line("requests==2.31.0"),
Some(("requests".to_string(), "2.31.0".to_string()))
);
assert_eq!(
parse_requirement_line("flask>=2.0.0"),
Some(("flask".to_string(), "2.0.0".to_string()))
);
assert_eq!(
parse_requirement_line("django"),
Some(("django".to_string(), "latest".to_string()))
);
}
#[test]
fn test_parse_pypi_requirement() {
assert_eq!(
parse_pypi_requirement("requests>=2.20.0"),
Some(("requests".to_string(), "2.20.0".to_string()))
);
assert_eq!(
parse_pypi_requirement("typing-extensions>=3.7.4; python_version < '3.8'"),
Some(("typing-extensions".to_string(), "3.7.4".to_string()))
);
assert_eq!(
parse_pypi_requirement("flask"),
Some(("flask".to_string(), "latest".to_string()))
);
assert_eq!(
parse_pypi_requirement("urllib3 (<3,>=1.21.1)"),
Some(("urllib3".to_string(), "1.21.1".to_string()))
);
assert_eq!(
parse_pypi_requirement("chardet (<6,>=3.0.2)"),
Some(("chardet".to_string(), "3.0.2".to_string()))
);
assert_eq!(
parse_pypi_requirement("PySocks (!=1.5.7,>=1.5.6)"),
Some(("PySocks".to_string(), "1.5.6".to_string()))
);
}
#[test]
fn test_parse_environment_markers() {
let marker1 = EnvironmentMarker::parse("python_version < '3.8'");
assert!(marker1.is_some());
if let Some(m) = marker1 {
assert!(!m.components.is_empty());
assert!(m.components[0].variable.contains("python_version"));
assert_eq!(m.components[0].operator, "<");
assert_eq!(m.components[0].value, "3.8");
}
let marker2 = EnvironmentMarker::parse("sys_platform == 'win32'");
assert!(marker2.is_some());
if let Some(m) = marker2 {
assert!(!m.components.is_empty());
assert_eq!(m.components[0].operator, "==");
assert_eq!(m.components[0].value, "win32");
}
let marker3 = EnvironmentMarker::parse("python_version >= '3.6' and os_name == 'nt'");
assert!(marker3.is_some());
if let Some(m) = marker3 {
assert_eq!(m.components.len(), 2);
}
let marker4 = EnvironmentMarker::parse("");
assert!(marker4.is_none());
}
#[test]
fn test_parse_requirement_with_markers() {
assert_eq!(
parse_requirement_line("requests==2.31.0"),
Some(("requests".to_string(), "2.31.0".to_string()))
);
assert_eq!(
parse_requirement_line("django>=3.2; python_version >= '3.8'"),
Some(("django".to_string(), "3.2".to_string()))
);
assert_eq!(
parse_requirement_line("numpy; sys_platform == 'linux'"),
Some(("numpy".to_string(), "latest".to_string()))
);
assert_eq!(
parse_requirement_line("scipy>=1.7,<2.0; python_version >= '3.9'"),
Some(("scipy".to_string(), "1.7,<2.0".to_string()))
);
}
#[test]
fn test_parse_pypi_requirement_with_extras() {
assert_eq!(
parse_pypi_requirement("requests[security]>=2.20.0"),
Some(("requests".to_string(), "2.20.0".to_string()))
);
assert_eq!(
parse_pypi_requirement("package[extra1,extra2]>=1.0; python_version >= '3.7'"),
Some(("package".to_string(), "1.0".to_string()))
);
}
#[test]
fn test_marker_description() {
let marker = EnvironmentMarker::parse("python_version < '3.8' and sys_platform == 'win32'");
assert!(marker.is_some());
if let Some(m) = marker {
let description = m.describe();
assert!(description.contains("python_version"));
assert!(description.contains("sys_platform"));
}
}
#[test]
fn test_marker_applies_to_environment() {
let marker1 = EnvironmentMarker::parse("python_version < '3.8'");
assert!(marker1.unwrap().applies_to_environment());
let marker2 = EnvironmentMarker::parse("sys_platform == 'darwin'");
assert!(marker2.unwrap().applies_to_environment());
}
}