use async_compression::tokio::bufread::XzDecoder;
use fs2::FileExt;
use std::{
error::Error,
fs::{self, OpenOptions},
};
use tempfile::tempdir_in;
use tokio::task;
use tokio_tar::Archive;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use futures::stream::TryStreamExt;
use crate::{
cellar,
client::build_client,
config::Config,
inventory,
types::{Installation, Package},
};
pub enum InstallEvent {
DownloadSize(u64), Progress(u64), }
pub async fn install<F>(
pkg: &Package,
config: &Config,
mut event_callback: Option<F>,
) -> Result<Installation, Box<dyn Error>>
where
F: FnMut(InstallEvent) + Send + 'static,
{
let shelf = config.pkgx_dir.join(&pkg.project);
fs::create_dir_all(&shelf)?;
#[cfg(windows)]
let lockfile = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true)
.open(shelf.join("lockfile"))?;
#[cfg(not(windows))]
let lockfile = OpenOptions::new()
.read(true) .open(shelf.clone())?;
task::spawn_blocking({
let lockfile = lockfile.try_clone()?;
move || {
lockfile
.lock_exclusive()
.expect("unexpected error: install locking failed");
}
})
.await?;
let dst_path = cellar::dst(pkg, config);
if dst_path.is_dir() {
FileExt::unlock(&lockfile)?;
return Ok(Installation {
path: dst_path,
pkg: pkg.clone(),
});
}
let url = inventory::get_url(pkg, config);
let client = build_client()?;
let rsp = client.get(url).send().await?.error_for_status()?;
let total_size = rsp
.content_length()
.ok_or("Failed to get content length from response")?;
if let Some(cb) = event_callback.as_mut() {
cb(InstallEvent::DownloadSize(total_size));
}
let stream = rsp.bytes_stream();
let stream = stream.inspect_ok(move |chunk| {
if let Some(cb) = event_callback.as_mut() {
cb(InstallEvent::Progress(chunk.len() as u64));
}
});
let stream = stream
.map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e))
.into_async_read();
let stream = stream.compat();
let decoder = XzDecoder::new(stream);
let temp_dir = tempdir_in(config.pkgx_dir.join(&pkg.project))?;
let mut archive = Archive::new(decoder);
archive.unpack(&temp_dir).await?;
let partial_path = format!("{}/v{}", pkg.project, pkg.version.raw);
fs::rename(temp_dir.path().join(&partial_path), &dst_path)?;
let installation = Installation {
path: dst_path,
pkg: pkg.clone(),
};
#[cfg(not(windows))]
symlink(&installation, config).await?;
FileExt::unlock(&lockfile)?;
Ok(installation)
}
#[cfg(not(windows))]
use {
libsemverator::range::Range as VersionReq, libsemverator::semver::Semver as Version,
std::collections::VecDeque, std::path::Path, std::path::PathBuf,
};
#[cfg(not(windows))]
async fn symlink(installation: &Installation, config: &Config) -> Result<(), Box<dyn Error>> {
let mut versions: VecDeque<(Version, PathBuf)> = cellar::ls(&installation.pkg.project, config)
.await?
.into_iter()
.map(|entry| (entry.pkg.version, entry.path))
.collect();
versions.make_contiguous().sort_by(|a, b| a.0.cmp(&b.0));
if versions.is_empty() {
return Err(format!("no versions for package {}", installation.pkg.project).into());
}
let shelf = installation.path.parent().unwrap();
let newest = versions.back().unwrap();
let v_mm = format!(
"{}.{}",
installation.pkg.version.major, installation.pkg.version.minor
);
let minor_range = if installation.pkg.version.major > 0 {
VersionReq::caret(&v_mm)?
} else {
VersionReq::parse(&format!(
">={},<0.{}",
v_mm,
installation.pkg.version.minor + 1
))?
};
let most_minor = versions
.iter()
.filter(|(version, _)| minor_range.satisfies(version))
.next_back()
.ok_or_else(|| {
anyhow::anyhow!(
"Could not find most minor version for {}",
installation.pkg.project
)
})?;
if most_minor.0 != installation.pkg.version {
return Ok(());
}
make_symlink(shelf, &format!("v{}", v_mm), installation).await?;
let major_range = VersionReq::parse(&format!("^{}", installation.pkg.version.major))?;
let most_major = versions
.iter()
.filter(|(version, _)| major_range.satisfies(version))
.next_back()
.ok_or_else(|| anyhow::anyhow!("Could not find most major version"))?;
if most_major.0 != installation.pkg.version {
return Ok(());
}
make_symlink(
shelf,
&format!("v{}", installation.pkg.version.major),
installation,
)
.await?;
if installation.pkg.version == newest.0 {
make_symlink(shelf, "v*", installation).await?;
}
Ok(())
}
#[cfg(not(windows))]
async fn make_symlink(
shelf: &Path,
symname: &str,
installation: &Installation,
) -> Result<(), Box<dyn Error>> {
let symlink_path = shelf.join(symname);
if symlink_path.is_symlink() {
if let Err(err) = fs::remove_file(&symlink_path) {
if err.kind() != std::io::ErrorKind::NotFound {
return Err(err.into());
}
}
}
let target = installation
.path
.file_name()
.ok_or_else(|| anyhow::anyhow!("Could not get the base name of the installation path"))?;
#[cfg(not(windows))]
std::os::unix::fs::symlink(target, &symlink_path)?;
#[cfg(windows)]
std::os::windows::fs::symlink_dir(target, symlink_path)?;
Ok(())
}