#[mockall_double::double]
use crate::dep_resolution::res;
use crate::dep_resolution::WarehouseRelease;
use crate::dep_types::Extras;
use crate::{
commands,
dep_types::{Constraint, DependencyError, Req, ReqType, Version},
files,
install::{self, PackageType},
py_versions, CliConfig,
};
use ini::Ini;
use regex::Regex;
use serde::Deserialize;
use std::io::{self, BufRead, BufReader, Read, Write};
use std::str::FromStr;
use std::{
collections::HashMap,
env, fs,
path::{Path, PathBuf},
process, thread, time,
};
use tar::Archive;
use termcolor::{Color, ColorSpec, StandardStream, WriteColor};
use xz2::read::XzDecoder;
#[derive(Debug)]
pub struct Paths {
pub bin: PathBuf,
pub lib: PathBuf,
pub entry_pt: PathBuf,
pub cache: PathBuf,
}
#[derive(Debug, Default)]
pub struct Metadata {
pub name: String,
pub summary: Option<String>,
pub version: Version,
pub author: Option<String>,
pub author_email: Option<String>,
pub license: Option<String>,
pub keywords: Vec<String>,
pub platform: Option<String>,
pub requires_dist: Vec<Req>,
}
#[derive(Copy, Clone, Debug, Deserialize, PartialEq)]
pub enum Os {
Linux32,
Linux,
Windows32,
Windows,
Mac,
Any,
}
impl FromStr for Os {
type Err = DependencyError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let re_linux32 = Regex::new(r"(many)?linux.*i686").unwrap();
let re_linux = Regex::new(r"((many)?linux.*|cygwin|(open)?bsd6*)").unwrap();
let re_win = Regex::new(r"^win(dows|_amd64)?").unwrap();
let re_mac = Regex::new(r"(macosx.*|darwin|.*mac.*)").unwrap();
Ok(match s {
x if re_linux32.is_match(x) => Self::Linux32,
x if re_linux.is_match(x) => Self::Linux,
"win32" => Self::Windows32,
x if re_win.is_match(x) => Self::Windows,
x if re_mac.is_match(x) => Self::Mac,
"any" => Self::Any,
_ => {
return Err(DependencyError::new(&format!("Problem parsing Os: {}", s)));
}
})
}
}
pub fn print_color(message: &str, color: Color) {
if let Err(_e) = print_color_res(message, color) {
panic!("Error printing in color")
}
}
fn print_color_res(message: &str, color: Color) -> io::Result<()> {
let mut stdout = StandardStream::stdout(CliConfig::current().color_choice);
stdout.set_color(ColorSpec::new().set_fg(Some(color)))?;
writeln!(&mut stdout, "{}", message)?;
stdout.reset()?;
Ok(())
}
pub fn print_color_(message: &str, color: Color) {
if let Err(_e) = print_color_res_(message, color) {
panic!("Error printing in color")
}
}
fn print_color_res_(message: &str, color: Color) -> io::Result<()> {
let mut stdout = StandardStream::stdout(CliConfig::current().color_choice);
stdout.set_color(ColorSpec::new().set_fg(Some(color)))?;
write!(&mut stdout, "{}", message)?;
stdout.reset()?;
Ok(())
}
pub fn abort(message: &str) {
print_color(message, Color::Red);
process::exit(1)
}
pub fn find_venvs(pypackages_dir: &Path) -> Vec<(u32, u32)> {
let py_versions: &[(u32, u32)] = &[
(2, 6),
(2, 7),
(2, 8),
(2, 9),
(3, 0),
(3, 1),
(3, 2),
(3, 3),
(3, 4),
(3, 5),
(3, 6),
(3, 7),
(3, 8),
(3, 9),
(3, 10),
(3, 11),
(3, 12),
];
let mut result = vec![];
for (maj, mi) in py_versions.iter() {
let venv_path = pypackages_dir.join(&format!("{}.{}/.venv", maj, mi));
if venv_path.join("bin/python").exists() || venv_path.join("Scripts/python.exe").exists() {
result.push((*maj, *mi))
}
}
result
}
pub fn find_bin_path(vers_path: &Path) -> PathBuf {
#[cfg(target_os = "windows")]
return vers_path.join(".venv/Scripts");
#[cfg(target_os = "linux")]
return vers_path.join(".venv/bin");
#[cfg(target_os = "macos")]
return vers_path.join(".venv/bin");
}
pub fn wait_for_dirs(dirs: &[PathBuf]) -> Result<(), crate::py_versions::AliasError> {
let timeout = 1000; for _ in 0..timeout {
let mut all_created = true;
for dir in dirs {
if !dir.exists() {
all_created = false;
}
}
if all_created {
return Ok(());
}
thread::sleep(time::Duration::from_millis(10));
}
Err(crate::py_versions::AliasError {
details: "Timed out attempting to create a directory".to_string(),
})
}
pub fn set_pythonpath(paths: &[PathBuf]) {
let formatted_paths = paths
.iter()
.map(|p| p.to_str().unwrap())
.collect::<Vec<&str>>()
.join(":");
env::set_var("PYTHONPATH", formatted_paths);
}
pub fn show_installed(lib_path: &Path, path_reqs: &[Req]) {
let installed = find_installed(lib_path);
let scripts = find_console_scripts(&lib_path.join("../bin"));
if installed.is_empty() {
print_color("No packages are installed.", Color::Blue); } else {
print_color("These packages are installed:", Color::Blue); for (name, version, _tops) in installed {
print_color_(&name, Color::Cyan);
print_color(&format!("=={}", version.to_string_color()), Color::White);
}
for req in path_reqs {
print_color_(&req.name, Color::Cyan);
print_color(
&format!(", at path: {}", req.path.as_ref().unwrap()),
Color::White,
);
}
}
if scripts.is_empty() {
print_color("\nNo console scripts are installed.", Color::Blue); } else {
print_color("\nThese console scripts are installed:", Color::Blue); for script in scripts {
print_color(&script, Color::Cyan); }
}
}
pub fn find_installed(lib_path: &Path) -> Vec<(String, Version, Vec<String>)> {
if !lib_path.exists() {
return vec![];
}
let mut result = vec![];
for folder_name in &find_folders(&lib_path) {
let re_dist = Regex::new(r"^(.*?)-(.*?)\.dist-info$").unwrap();
if let Some(caps) = re_dist.captures(folder_name) {
let name = caps.get(1).unwrap().as_str();
let vers = Version::from_str(
caps.get(2)
.expect("Problem parsing version in folder name")
.as_str(),
)
.expect("Problem parsing version in package folder");
let top_level = lib_path.join(folder_name).join("top_level.txt");
let mut tops = vec![];
match fs::File::open(top_level) {
Ok(f) => {
for line in BufReader::new(f).lines().flatten() {
tops.push(line);
}
}
Err(_) => tops.push(folder_name.to_owned()),
}
result.push((name.to_owned(), vers, tops));
}
}
result
}
pub fn find_console_scripts(bin_path: &Path) -> Vec<String> {
let mut result = vec![];
if !bin_path.exists() {
return vec![];
}
for entry in bin_path
.read_dir()
.expect("Trouble opening bin path")
.flatten()
{
if entry.file_type().unwrap().is_file() {
result.push(entry.file_name().to_str().unwrap().to_owned())
}
}
result
}
pub fn merge_reqs(
added: &[String],
dev: bool,
cfg: &crate::Config,
cfg_path: &Path,
) -> (Vec<Req>, Vec<Req>) {
let mut added_reqs = vec![];
for p in added.iter() {
let trimmed = p.replace(',', "");
match Req::from_str(&trimmed, false) {
Ok(r) => added_reqs.push(r),
Err(_) => abort(&format!("Unable to parse this package: {}. \
Note that installing a specific version via the CLI is currently unsupported. If you need to specify a version,\
edit `pyproject.toml`", &p)),
}
}
let existing = if dev { &cfg.dev_reqs } else { &cfg.reqs };
let mut added_reqs_unique: Vec<Req> = added_reqs
.into_iter()
.filter(|ar| {
let mut add = true;
for cr in existing.iter() {
if cr == ar
|| (cr.name.to_lowercase() == ar.name.to_lowercase()
&& ar.constraints.is_empty())
{
add = false;
break;
}
}
add
})
.collect();
for added_req in &mut added_reqs_unique {
if added_req.constraints.is_empty() {
let (_, vers, _) = if let Ok(r) = res::get_version_info(
&added_req.name,
Some(Req::new_with_extras(
added_req.name.clone(),
vec![Constraint::new_any()],
Extras::new_py(Constraint::new(
ReqType::Exact,
cfg.py_version.clone().unwrap_or_else(Version::new_any),
)),
)),
) {
r
} else {
abort("Problem getting latest version of the package you added. Is it spelled correctly? Is the internet OK?");
unreachable!()
};
added_req.constraints.push(Constraint::new(
ReqType::Caret,
vers,
));
}
}
let mut result = vec![];
for cr in existing.iter() {
let mut replaced = false;
for added_req in &added_reqs_unique {
if compare_names(&added_req.name, &cr.name) && added_req.constraints != cr.constraints {
result.push(added_req.clone());
replaced = true;
break;
}
}
if !replaced {
result.push(cr.clone());
}
}
result.append(&mut added_reqs_unique.clone());
if dev {
if !added_reqs_unique.is_empty() {
files::add_reqs_to_cfg(&cfg_path, &[], &added_reqs_unique);
}
(cfg.reqs.clone(), result)
} else {
if !added_reqs_unique.is_empty() {
files::add_reqs_to_cfg(&cfg_path, &added_reqs_unique, &[]);
}
(result, cfg.dev_reqs.clone())
}
}
pub fn standardize_name(name: &str) -> String {
name.to_lowercase().replace('-', "_").replace('.', "_")
}
pub fn compare_names(name1: &str, name2: &str) -> bool {
standardize_name(name1) == standardize_name(name2)
}
pub fn extract_zip(file: &fs::File, out_path: &Path, rename: &Option<(String, String)>) {
let mut archive = if let Ok(a) = zip::ZipArchive::new(file) {
a
} else {
abort(&format!(
"Problem reading the wheel archive: {:?}. Is it corrupted?",
&file
));
unreachable!()
};
for i in 0..archive.len() {
let mut file = archive.by_index(i).unwrap();
let file_str2 = file.enclosed_name().unwrap();
let file_str = file_str2.to_str().expect("Problem converting path to str");
let extracted_file = if !file_str.contains("dist-info") && !file_str.contains("egg-info") {
match rename {
Some((old, new)) => PathBuf::from_str(
file.enclosed_name()
.unwrap()
.to_str()
.unwrap()
.to_owned()
.replace(old, new)
.as_str(),
),
None => PathBuf::from_str(file.enclosed_name().unwrap().to_str().unwrap()),
}
} else {
PathBuf::from_str(file.enclosed_name().unwrap().to_str().unwrap())
};
let outpath = out_path.join(extracted_file.unwrap());
if (&*file.name()).ends_with('/') {
fs::create_dir_all(&outpath).unwrap();
} else {
if let Some(p) = outpath.parent() {
if !p.exists() {
fs::create_dir_all(&p).unwrap();
}
}
let mut outfile = fs::File::create(&outpath).unwrap();
io::copy(&mut file, &mut outfile).unwrap();
}
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
if let Some(mode) = file.unix_mode() {
fs::set_permissions(&outpath, fs::Permissions::from_mode(mode)).unwrap();
}
}
}
}
pub fn unpack_tar_xz(archive_path: &Path, dest: &Path) {
let archive_bytes = fs::read(archive_path).expect("Problem reading archive as bytes");
let mut tar: Vec<u8> = Vec::new();
let mut decompressor = XzDecoder::new(&archive_bytes[..]);
if decompressor.read_to_end(&mut tar).is_err() {
abort(&format!(
"Problem decompressing the archive: {:?}. This may be due to a failed downoad. \
Try deleting it, then trying again. Note that Pyflow will only install officially-released \
Python versions. If you'd like to use a pre-release, you must install it manually.",
archive_path
))
}
let mut archive = Archive::new(&tar[..]);
if archive.unpack(dest).is_err() {
abort(&format!(
"Problem unpacking tar: {}",
archive_path.to_str().unwrap()
))
}
}
pub fn find_or_create_venv(
cfg_vers: &Version,
pypackages_dir: &Path,
pyflow_dir: &Path,
dep_cache_path: &Path,
) -> (PathBuf, Version) {
let venvs = find_venvs(pypackages_dir);
let compatible_venvs: Vec<&(u32, u32)> = venvs
.iter()
.filter(|(ma, mi)| cfg_vers.major == Some(*ma) && cfg_vers.minor == Some(*mi))
.collect();
let vers_path;
let py_vers;
match compatible_venvs.len() {
0 => {
let vers =
py_versions::create_venv(cfg_vers, pypackages_dir, pyflow_dir, dep_cache_path);
vers_path = pypackages_dir.join(vers.to_string_med());
py_vers = Version::new_opt(vers.major, vers.minor, None); }
1 => {
vers_path = pypackages_dir.join(&format!(
"{}.{}",
compatible_venvs[0].0, compatible_venvs[0].1
));
py_vers = Version::new_short(compatible_venvs[0].0, compatible_venvs[0].1);
}
_ => {
abort(
"Multiple compatible Python environments found
for this project.",
);
unreachable!()
}
}
#[cfg(target_os = "windows")]
{
(vers_path, py_vers)
}
#[cfg(target_os = "linux")]
{
let vers_path = fs::canonicalize(vers_path);
let vers_path = match vers_path {
Ok(path) => path,
Err(error) => {
abort(&format!(
"Problem converting path to absolute path: {:?}",
error
));
unreachable!()
}
};
(vers_path, py_vers)
}
#[cfg(target_os = "macos")]
{
let vers_path = fs::canonicalize(vers_path);
let vers_path = match vers_path {
Ok(path) => path,
Err(error) => {
abort(&format!(
"Problem converting path to absolute path: {:?}",
error
));
unreachable!()
}
};
(vers_path, py_vers)
}
}
pub fn fallible_v_parse(vers: &str) -> Version {
let vers = vers.replace(" ", "").replace("\n", "").replace("\r", "");
if let Ok(v) = Version::from_str(&vers) {
v
} else {
abort("Problem parsing the Python version you entered. It should look like this: 3.7 or 3.7.1");
unreachable!()
}
}
pub fn prompt_list<T: Clone + ToString>(
init_msg: &str,
type_: &str,
items: &[(String, T)],
show_item: bool,
) -> (String, T) {
print_color(init_msg, Color::Magenta);
for (i, (name, content)) in items.iter().enumerate() {
if show_item {
println!("{}: {}: {}", i + 1, name, content.to_string())
} else {
println!("{}: {}", i + 1, name)
}
}
let mut mapping = HashMap::new();
for (i, item) in items.iter().enumerate() {
mapping.insert(i + 1, item);
}
let mut input = String::new();
io::stdin()
.read_line(&mut input)
.expect("Problem reading input");
let input = input
.chars()
.next()
.expect("Problem parsing input")
.to_string()
.parse::<usize>();
let input = if let Ok(ip) = input {
ip
} else {
abort("Please try again; enter a number like 1 or 2 .");
unreachable!()
};
let (name, content) = if let Some(r) = mapping.get(&input) {
r
} else {
abort(&format!(
"Can't find the {} associated with that number. Is it in the list above?",
type_
));
unreachable!()
};
(name.to_string(), content.clone())
}
fn os_from_wheel_fname(filename: &str) -> Result<Os, DependencyError> {
let re = Regex::new(r"^(?:.*?-)+(.*).whl$").unwrap();
if let Some(caps) = re.captures(filename) {
let parsed = caps.get(1).unwrap().as_str();
return Ok(
Os::from_str(parsed).unwrap_or_else(|_| panic!("Problem parsing Os: {}", parsed))
);
}
Err(DependencyError::new("Problem parsing os from wheel name"))
}
pub fn find_best_release(
data: &[WarehouseRelease],
name: &str,
version: &Version,
os: Os,
python_vers: &Version,
) -> (WarehouseRelease, PackageType) {
let mut compatible_releases = vec![];
let mut source_releases = vec![];
for rel in data.iter() {
let mut compatible = true;
match rel.packagetype.as_ref() {
"bdist_wheel" => {
if let Some(py_ver) = &rel.requires_python {
let py_constrs = Constraint::from_str_multiple(py_ver)
.expect("Problem parsing constraint from requires_python");
for constr in &py_constrs {
if !constr.is_compatible(python_vers) {
compatible = false;
}
}
}
let wheel_os =
os_from_wheel_fname(&rel.filename).expect("Problem getting os from wheel name");
if wheel_os != os && wheel_os != Os::Any {
compatible = false;
}
if let Ok(constrs) = Constraint::from_wh_py_vers(&rel.python_version) {
let mut compat_py_v = false;
for constr in &constrs {
if constr.is_compatible(python_vers) {
compat_py_v = true;
}
}
if !compat_py_v {
compatible = false;
}
} else {
println!(
"Unable to match python version from python_version: {}",
&rel.python_version
)
};
if compatible {
compatible_releases.push(rel.clone());
}
}
"sdist" => source_releases.push(rel.clone()),
"bdist_wininst" | "bdist_msi" | "bdist_egg" => (), _ => {
println!("Found surprising package type: {}", rel.packagetype);
continue;
}
}
}
let best_release;
let package_type;
if compatible_releases.is_empty() {
if source_releases.is_empty() {
abort(&format!(
"Unable to find a compatible release for {}: {}",
name,
version.to_string_color()
));
unreachable!()
} else {
best_release = source_releases[0].clone();
package_type = install::PackageType::Source;
}
} else {
best_release = compatible_releases[0].clone();
package_type = install::PackageType::Wheel;
}
(best_release, package_type)
}
pub fn get_git_author() -> Vec<String> {
let gitcfg = directories::BaseDirs::new()
.unwrap()
.home_dir()
.join(".gitconfig");
if !gitcfg.exists() {
return vec![];
}
let conf = Ini::load_from_file(gitcfg).expect("Could not read ~/.gitconfig");
let user = conf.section(Some("user".to_owned()));
if let Some(user) = user {
let name: String = user.get("name").unwrap_or(&String::from("")).to_string();
let email: String = user.get("email").unwrap_or(&String::from("")).to_string();
vec![format!("{} <{}>", name, email)]
} else {
vec![]
}
}
pub fn find_first_file(path: &Path) -> PathBuf {
{
for entry in path
.read_dir()
.expect("Trouble reading the directory when finding the first file.")
.flatten()
{
if entry.file_type().unwrap().is_file() {
return entry.path();
}
}
abort(&format!(
"Problem the first file in the directory: {:?}",
path
));
unreachable!()
};
}
pub fn open_archive(path: &Path) -> fs::File {
if let Ok(f) = fs::File::open(&path) {
f
} else {
abort(&format!(
"Problem opening the archive file: {:?}. Was there a problem while
downloading it?",
&path
));
unreachable!()
}
}
pub fn parse_metadata(path: &Path) -> Metadata {
let re = |key: &str| Regex::new(&format!(r"^{}:\s*(.*)$", key)).unwrap();
let mut result = Metadata::default();
let data = fs::read_to_string(path).expect("Problem reading METADATA");
for line in data.lines() {
if let Some(caps) = re("Version").captures(line) {
let val = caps.get(1).unwrap().as_str();
result.version =
Version::from_str(val).expect("Problem parsing version from `METADATA`");
}
if let Some(caps) = re("Requires-Dist").captures(line) {
let val = caps.get(1).unwrap().as_str();
let req =
Req::from_str(val, true).expect("Problem parsing requirement from `METADATA`");
result.requires_dist.push(req);
}
}
result
}
pub fn find_folders(path: &Path) -> Vec<String> {
let mut result = vec![];
for entry in path.read_dir().expect("Can't open lib path").flatten() {
if entry
.file_type()
.expect("Problem reading lib path file type")
.is_dir()
{
result.push(
entry
.file_name()
.to_str()
.expect("Problem converting folder name to string")
.to_owned(),
);
}
}
result
}
fn default_python() -> Version {
match commands::find_py_version("python") {
Some(x) => x,
None => Version::new_short(3, 9),
}
}
pub fn prompt_py_vers() -> Version {
print_color(
"Please enter the Python version for this project: (eg: 3.8)",
Color::Magenta,
);
let default_ver = default_python();
print!("Default [{}]:", default_ver);
std::io::stdout().flush().unwrap();
let mut input = String::new();
io::stdin()
.read_line(&mut input)
.expect("Unable to read user input for version");
input.pop(); let input = input.replace("\n", "").replace("\r", "");
if !input.is_empty() {
fallible_v_parse(&input)
} else {
default_ver
}
}
pub fn find_dont_uninstall(reqs: &[Req], dev_reqs: &[Req]) -> Vec<String> {
let mut result: Vec<String> = reqs
.iter()
.filter_map(|r| {
if r.git.is_some() || r.path.is_some() {
Some(r.name.to_owned())
} else {
None
}
})
.collect();
for r in dev_reqs {
if r.git.is_some() || r.path.is_some() {
result.push(r.name.to_owned());
}
}
result
}
pub(crate) fn check_command_output(output: &process::Output, msg: &str) {
check_command_output_with(output, |s| panic!("{}: {}", msg, s));
}
pub(crate) fn check_command_output_with(output: &process::Output, f: impl Fn(&str)) {
if !output.status.success() {
let stderr =
std::str::from_utf8(&output.stderr).expect("building string from command output");
f(&stderr)
}
}
pub fn canon_join(path: &Path, extend: &str) -> PathBuf {
let ex_path = Path::new(extend);
let canon = match ex_path.canonicalize() {
Ok(c) => c,
Err(e) => {
abort(&format!("{}\n\"{}\"", e, extend));
unreachable!()
}
};
let mut new_path = path.to_path_buf();
for comp in canon.components() {
new_path = match comp {
std::path::Component::Normal(c) => new_path.join(c),
_ => new_path.join(""),
}
}
new_path
}
#[cfg(test)]
mod tests {
use rstest::rstest;
use super::*;
use crate::dep_types;
#[test]
fn dummy_test() {}
#[rstest(
input,
expected,
case("manylinux1_i686", Ok(Os::Linux32)),
case("manylinux2010_i686", Ok(Os::Linux32)),
case("manylinux2014_i686", Ok(Os::Linux32)),
case("cygwin", Ok(Os::Linux)),
case("linux", Ok(Os::Linux)),
case("linux2", Ok(Os::Linux)),
case("manylinux1_x86_64", Ok(Os::Linux)),
case("manylinux2010_x86_64", Ok(Os::Linux)),
case("manylinux2014_aarch64", Ok(Os::Linux)),
case("manylinux2014_ppc64le", Ok(Os::Linux)),
case("manylinux2014_x86_64", Ok(Os::Linux)),
case("win32", Ok(Os::Windows32)),
case("windows", Ok(Os::Windows)),
case("win", Ok(Os::Windows)),
case("win_amd64", Ok(Os::Windows)),
case("macosx_10_6_intel", Ok(Os::Mac)),
case("darwin", Ok(Os::Mac)),
case("openbsd6", Ok(Os::Linux)),
case("any", Ok(Os::Any)),
case("some other bsd name", Ok(Os::Linux)),
case("some other mac name", Ok(Os::Mac))
)]
fn test_os_from_str(input: &str, expected: Result<Os, dep_types::DependencyError>) {
assert_eq!(Os::from_str(input), expected);
}
}