use crate::{
dep_resolution,
dep_types::{Constraint, Req, ReqType, Version},
files, py_versions,
};
use crossterm::{Color, Colored};
use regex::Regex;
use std::io::{self, BufRead, BufReader, Read};
use std::str::FromStr;
use std::{
env, fs,
path::{Path, PathBuf},
process, thread, time,
};
use tar::Archive;
use xz2::read::XzDecoder;
/// Prints `message` on its own line to stdout in the given foreground
/// `color`, then resets the foreground color.
pub fn print_color(message: &str, color: Color) {
    let paint = Colored::Fg(color);
    let reset = Colored::Fg(Color::Reset);
    println!("{}{}{}", paint, message, reset);
}
/// Prints `message` in red to stdout, resets the foreground color, and then
/// terminates the process with exit status 1.
pub fn abort(message: &str) {
    let red = Colored::Fg(Color::Red);
    let reset = Colored::Fg(Color::Reset);
    println!("{}{}{}", red, message, reset);
    process::exit(1)
}
/// Returns the `(major, minor)` Python versions that have a usable virtual
/// environment under `pyflows_dir`.
///
/// Versions 2.6 through 2.9 and 3.0 through 3.12 are probed, in that order.
/// A version counts as present when `<major>.<minor>/.venv` contains either
/// `bin/python` and `bin/pip`, or `Scripts/python.exe` and `Scripts/pip.exe`.
pub fn find_venvs(pyflows_dir: &Path) -> Vec<(u32, u32)> {
    let python2 = (6..=9u32).map(|minor| (2u32, minor));
    let python3 = (0..=12u32).map(|minor| (3u32, minor));

    python2
        .chain(python3)
        .filter(|(major, minor)| {
            let venv = pyflows_dir.join(format!("{}.{}/.venv", major, minor));
            let has = |relative: &str| venv.join(relative).exists();

            (has("bin/python") && has("bin/pip"))
                || (has("Scripts/python.exe") && has("Scripts/pip.exe"))
        })
        .collect()
}
/// Returns the directory inside `vers_path`'s virtual environment that holds
/// its executables: `.venv/Scripts` on Windows, `.venv/bin` everywhere else.
///
/// Every non-Windows target shares the `bin` layout, so this also compiles on
/// targets other than Linux and macOS (previously those had no return value).
pub fn find_bin_path(vers_path: &Path) -> PathBuf {
    if cfg!(target_os = "windows") {
        vers_path.join(".venv/Scripts")
    } else {
        vers_path.join(".venv/bin")
    }
}
/// Blocks until every path in `dirs` exists.
///
/// The paths are polled up to 1000 times with a 10 ms sleep after each
/// unsuccessful poll.
///
/// # Errors
///
/// Returns an `AliasError` if some path is still missing after the last poll.
pub fn wait_for_dirs(dirs: &[PathBuf]) -> Result<(), crate::py_versions::AliasError> {
    const MAX_POLLS: u32 = 1000;
    let poll_interval = time::Duration::from_millis(10);

    let mut polls = 0;
    while polls < MAX_POLLS {
        if dirs.iter().all(|dir| dir.exists()) {
            return Ok(());
        }
        thread::sleep(poll_interval);
        polls += 1;
    }

    Err(crate::py_versions::AliasError {
        details: "Timed out attempting to create a directory".to_string(),
    })
}
/// Sets the `PYTHONPATH` environment variable of the current process to
/// `lib_path`.
///
/// # Panics
///
/// Panics if `lib_path` is not valid UTF-8.
pub fn set_pythonpath(lib_path: &Path) {
    let lib_str = lib_path
        .to_str()
        .expect("Problem converting current path to string");
    env::set_var("PYTHONPATH", lib_str);
}
/// Prints the packages found in `lib_path` (as `name == version`) followed by
/// the file names found in `lib_path/../bin`.
pub fn show_installed(lib_path: &Path) {
    let packages = find_installed(lib_path);
    let console_scripts = find_console_scripts(&lib_path.join("../bin"));

    print_color("These packages are installed:", Color::DarkBlue);
    packages.iter().for_each(|(name, version, _tops)| {
        println!(
            "{}{}{} == {}",
            Colored::Fg(Color::Cyan),
            name,
            Colored::Fg(Color::Reset),
            version
        )
    });

    print_color("\nThese console scripts are installed:", Color::DarkBlue);
    console_scripts
        .iter()
        .for_each(|script| print_color(script, Color::DarkCyan));
}
/// Lists the packages installed in `lib_path`, based on its
/// `<name>-<version>.dist-info` directories.
///
/// Each returned tuple holds the package name, its parsed version, and the
/// lines of the directory's `top_level.txt`. When `top_level.txt` cannot be
/// opened, the single fallback entry is the `.dist-info` directory name itself.
/// Returns an empty list when `lib_path` does not exist.
///
/// # Panics
///
/// Panics if `lib_path` cannot be read, if an entry's file type cannot be
/// determined, if a directory name is not valid UTF-8, or if the version part
/// of a `.dist-info` directory name cannot be parsed.
pub fn find_installed(lib_path: &Path) -> Vec<(String, Version, Vec<String>)> {
    if !lib_path.exists() {
        return vec![];
    }

    // Compile the pattern once rather than once per directory.
    let re_dist = Regex::new(r"^(.*?)-(.*?)\.dist-info$").unwrap();

    let mut package_folders = vec![];
    for entry in lib_path.read_dir().expect("Can't open lib path").flatten() {
        let is_dir = entry
            .file_type()
            .expect("Problem reading lib path file type")
            .is_dir();
        if is_dir {
            package_folders.push(entry.file_name());
        }
    }

    let mut result = vec![];
    for folder in &package_folders {
        let folder_name = folder
            .to_str()
            .expect("Problem converting folder name to string");

        // Only `.dist-info` directories describe an installed package.
        let caps = match re_dist.captures(folder_name) {
            Some(caps) => caps,
            None => continue,
        };

        let name = caps.get(1).unwrap().as_str();
        let vers = Version::from_str(
            caps.get(2)
                .expect("Problem parsing version in folder name")
                .as_str(),
        )
        .expect("Problem parsing version in package folder");

        let mut tops = vec![];
        match fs::File::open(lib_path.join(folder_name).join("top_level.txt")) {
            // Lines that fail to decode are skipped.
            Ok(f) => tops.extend(BufReader::new(f).lines().filter_map(Result::ok)),
            // NOTE(review): the fallback is the full `.dist-info` folder name,
            // not the package name — confirm this is what callers expect.
            Err(_) => tops.push(folder_name.to_owned()),
        }

        result.push((name.to_owned(), vers, tops));
    }
    result
}
/// Returns the names of the regular files directly inside `bin_path`.
///
/// Returns an empty list when `bin_path` does not exist. Entries that cannot
/// be read are skipped.
///
/// # Panics
///
/// Panics if `bin_path` cannot be opened, if an entry's file type cannot be
/// determined, or if a file name is not valid UTF-8.
pub fn find_console_scripts(bin_path: &Path) -> Vec<String> {
    if !bin_path.exists() {
        return Vec::new();
    }

    bin_path
        .read_dir()
        .expect("Trouble opening bin path")
        .filter_map(Result::ok)
        .filter(|entry| entry.file_type().unwrap().is_file())
        .map(|entry| entry.file_name().to_str().unwrap().to_owned())
        .collect()
}
/// Merges requirements given on the command line (`added`) with those already
/// in `cfg`, returning the combined list.
///
/// Steps, in order:
/// 1. Each string in `added` is parsed into a `Req`; a parse failure aborts
///    the process with an explanatory message.
/// 2. An added requirement is dropped when it equals an existing one, or when
///    it has no constraints and its lowercased name equals an existing name.
/// 3. Each remaining requirement without constraints gets a caret constraint
///    on the version returned by `dep_resolution::get_version_info`.
/// 4. Each existing requirement is replaced by the first remaining added
///    requirement with a matching name (per `compare_names`) and different
///    constraints.
/// 5. The remaining added requirements are written to `cfg_filename` and
///    appended to the result.
///
/// # Panics
///
/// Panics if the latest version of an unconstrained added package cannot be
/// obtained.
pub fn merge_reqs(added: &[String], cfg: &crate::Config, cfg_filename: &str) -> Vec<Req> {
    let mut added_reqs = vec![];
    for p in added.iter() {
        match Req::from_str(p, false) {
            Ok(r) => added_reqs.push(r),
            // Each continuation line ends with an explicit space, because `\`
            // strips the newline and all leading whitespace of the next line.
            Err(_) => abort(&format!(
                "Unable to parse this package: {}. \
                 Note that installing a specific version via the CLI is currently unsupported. \
                 If you need to specify a version, edit `pyproject.toml`",
                p
            )),
        }
    }

    // Drop requests that would not change what the config already specifies.
    let mut added_reqs_unique: Vec<Req> = added_reqs
        .into_iter()
        .filter(|ar| {
            !cfg.reqs.iter().any(|cr| {
                cr == ar
                    || (cr.name.to_lowercase() == ar.name.to_lowercase()
                        && ar.constraints.is_empty())
            })
        })
        .collect();

    // Requirements given without a version get a caret constraint.
    for added_req in added_reqs_unique
        .iter_mut()
        .filter(|req| req.constraints.is_empty())
    {
        let (_, vers, _) = dep_resolution::get_version_info(&added_req.name)
            .expect("Problem getting latest version of the package you added.");
        added_req
            .constraints
            .push(Constraint::new(ReqType::Caret, vers));
    }

    // Existing requirements, each replaced by a same-named added requirement
    // whose constraints differ.
    // NOTE(review): a replacement chosen here is also appended below, so it
    // ends up in the result twice — confirm whether callers rely on that.
    let mut result: Vec<Req> = cfg
        .reqs
        .iter()
        .map(|cr| {
            added_reqs_unique
                .iter()
                .find(|ar| compare_names(&ar.name, &cr.name) && ar.constraints != cr.constraints)
                .unwrap_or(cr)
                .clone()
        })
        .collect();

    if !added_reqs_unique.is_empty() {
        files::add_reqs_to_cfg(cfg_filename, &added_reqs_unique);
    }

    result.extend(added_reqs_unique);
    result
}
/// Normalizes a package name for comparison: lowercases it and replaces every
/// `-` with `_`.
pub fn standardize_name(name: &str) -> String {
    name.to_lowercase()
        .chars()
        .map(|ch| if ch == '-' { '_' } else { ch })
        .collect()
}
/// Returns `true` when the two package names are equal after normalization
/// with `standardize_name`.
pub fn compare_names(name1: &str, name2: &str) -> bool {
    let (left, right) = (standardize_name(name1), standardize_name(name2));
    left == right
}
/// Extracts every entry of the zip archive `file` into `out_path`.
///
/// When `rename` is `Some((old, new))`, occurrences of `old` in an entry's
/// path are replaced by `new`, except for entries whose path contains
/// `dist-info` or `egg-info`. On Unix, each entry's stored mode (if any) is
/// applied to the extracted path.
///
/// # Panics
///
/// Panics if the archive cannot be read, if an entry path is not valid UTF-8,
/// or if any file-system operation fails.
pub fn extract_zip(file: &fs::File, out_path: &Path, rename: &Option<(String, String)>) {
    let mut archive = zip::ZipArchive::new(file).unwrap();

    for idx in 0..archive.len() {
        let mut entry = archive.by_index(idx).unwrap();

        let sanitized = entry.sanitized_name();
        let sanitized_str = sanitized
            .to_str()
            .expect("Problem converting path to str");
        let is_metadata =
            sanitized_str.contains("dist-info") || sanitized_str.contains("egg-info");

        // Package metadata directories keep their original names.
        let outpath = match rename {
            Some((old, new)) if !is_metadata => out_path.join(sanitized_str.replace(old, new)),
            _ => out_path.join(&sanitized),
        };

        if entry.name().ends_with('/') {
            // Directory entry.
            fs::create_dir_all(&outpath).unwrap();
        } else {
            if let Some(parent) = outpath.parent() {
                if !parent.exists() {
                    fs::create_dir_all(parent).unwrap();
                }
            }
            let mut outfile = fs::File::create(&outpath).unwrap();
            io::copy(&mut entry, &mut outfile).unwrap();
        }

        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            if let Some(mode) = entry.unix_mode() {
                fs::set_permissions(&outpath, fs::Permissions::from_mode(mode)).unwrap();
            }
        }
    }
}
/// Decompresses the `.tar.xz` file at `archive_path` in memory and unpacks
/// the resulting tar archive into `dest`.
///
/// If unpacking fails, the process is aborted with a message naming the
/// archive.
///
/// # Panics
///
/// Panics if the archive cannot be read or decompressed, or, on the unpack
/// failure path, if `archive_path` is not valid UTF-8.
pub fn unpack_tar_xz(archive_path: &Path, dest: &Path) {
    let compressed = fs::read(archive_path).expect("Problem reading archive as bytes");

    let mut raw_tar: Vec<u8> = vec![];
    XzDecoder::new(compressed.as_slice())
        .read_to_end(&mut raw_tar)
        .expect("Problem decompressing archive");

    let unpacked = Archive::new(raw_tar.as_slice()).unpack(dest);
    if unpacked.is_err() {
        let shown_path = archive_path.to_str().unwrap();
        abort(&format!("Problem unpacking tar: {}", shown_path))
    }
}
/// Resolves the environment directory and Python version to use for a project
/// that requests `cfg_vers`.
///
/// Existing environments under `pyflows_dir` are matched on major and minor
/// version. With no match, `py_versions::create_venv` is called and the
/// version it returns is used; with exactly one match, that one is used; with
/// more than one, the process is aborted.
///
/// Returns the `<pyflows_dir>/<major>.<minor>` path together with the
/// corresponding short version.
pub fn find_venv_info(cfg_vers: &Version, pyflows_dir: &Path) -> (PathBuf, Version) {
    let matching: Vec<(u32, u32)> = find_venvs(pyflows_dir)
        .into_iter()
        .filter(|&(major, minor)| cfg_vers.major == major && cfg_vers.minor == minor)
        .collect();

    let (major, minor) = match matching.len() {
        0 => {
            let created = py_versions::create_venv(cfg_vers, pyflows_dir);
            (created.major, created.minor)
        }
        1 => matching[0],
        _ => {
            abort("Multiple compatible Python environments found\nfor this project.");
            unreachable!()
        }
    };

    (
        pyflows_dir.join(format!("{}.{}", major, minor)),
        Version::new_short(major, minor),
    )
}
/// Ensures `path` exists as a directory and removes the regular files
/// directly inside it. Subdirectories and their contents are left in place.
///
/// # Panics
///
/// Panics if the directory cannot be created or read, or if a file cannot be
/// removed.
pub fn wipe_dir(path: &Path) {
    if !path.exists() {
        fs::create_dir(path).expect("Problem creating directory");
    }

    let files = fs::read_dir(path)
        .expect("Problem reading path")
        .filter_map(Result::ok)
        .map(|entry| entry.path())
        .filter(|candidate| candidate.is_file());

    for file_path in files {
        fs::remove_file(file_path).expect("Problem removing a file");
    }
}