use crate::error::Result;
use crate::extract::extract_bottle;
use crate::link::link_package;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use tracing::{debug, info};
#[derive(Debug, Clone)]
pub struct ParallelConfig {
pub max_concurrent_extractions: usize,
pub max_concurrent_links: usize,
}
impl Default for ParallelConfig {
fn default() -> Self {
let cpus = std::thread::available_parallelism()
.map(|p| p.get())
.unwrap_or(4);
Self {
max_concurrent_extractions: cpus,
max_concurrent_links: 4,
}
}
}
#[derive(Debug)]
pub struct PackageInstallResult {
pub name: String,
pub install_path: PathBuf,
pub linked_files: Vec<PathBuf>,
}
pub struct ParallelInstaller {
config: ParallelConfig,
extract_semaphore: Arc<Semaphore>,
link_semaphore: Arc<Semaphore>,
}
impl ParallelInstaller {
pub fn new() -> Self {
Self::with_config(ParallelConfig::default())
}
pub fn with_config(config: ParallelConfig) -> Self {
let extract_semaphore = Arc::new(Semaphore::new(config.max_concurrent_extractions));
let link_semaphore = Arc::new(Semaphore::new(config.max_concurrent_links));
Self {
config,
extract_semaphore,
link_semaphore,
}
}
pub async fn extract_bottles(
&self,
bottles: Vec<BottleInfo>,
cellar: &Path,
) -> Result<Vec<(String, PathBuf)>> {
info!(
"Extracting {} bottles with {} concurrent workers",
bottles.len(),
self.config.max_concurrent_extractions
);
let cellar = cellar.to_path_buf();
let semaphore: Arc<Semaphore> = Arc::clone(&self.extract_semaphore);
let mut join_set = JoinSet::new();
let order: Vec<String> = bottles.iter().map(|b| b.name.clone()).collect();
for bottle in bottles {
let cellar = cellar.clone();
let semaphore = Arc::clone(&semaphore);
join_set.spawn(async move {
let _permit = semaphore.acquire().await
.map_err(|e| crate::error::Error::Other(format!("Semaphore error: {}", e)))?;
let name = bottle.name.clone();
let bottle_path = bottle.bottle_path.clone();
let cellar_clone = cellar.clone();
let install_path = tokio::task::spawn_blocking(move || {
extract_bottle(&bottle_path, &cellar_clone)
})
.await
.map_err(|e| crate::error::Error::Other(format!("Task join error: {}", e)))??;
debug!("Extracted {} to {}", name, install_path.display());
Ok::<_, crate::error::Error>((name, install_path))
});
}
let mut results: Vec<(String, PathBuf)> = Vec::new();
while let Some(result) = join_set.join_next().await {
match result {
Ok(Ok(item)) => results.push(item),
Ok(Err(e)) => return Err(e),
Err(e) => {
return Err(crate::error::Error::Other(format!(
"Task panic: {}",
e
)))
}
}
}
let mut ordered: Vec<(String, PathBuf)> = Vec::with_capacity(results.len());
for name in &order {
if let Some(pos) = results.iter().position(|(n, _)| n == name) {
ordered.push(results.remove(pos));
}
}
info!("Extracted {} bottles", ordered.len());
Ok(ordered)
}
pub async fn link_packages(
&self,
packages: Vec<LinkInfo>,
prefix: &Path,
) -> Result<Vec<(String, Vec<PathBuf>)>> {
info!(
"Linking {} packages with {} concurrent workers",
packages.len(),
self.config.max_concurrent_links
);
let prefix = prefix.to_path_buf();
let semaphore: Arc<Semaphore> = Arc::clone(&self.link_semaphore);
let mut join_set = JoinSet::new();
let order: Vec<String> = packages.iter().map(|p| p.name.clone()).collect();
for pkg in packages {
let prefix = prefix.clone();
let semaphore = Arc::clone(&semaphore);
join_set.spawn(async move {
let _permit = semaphore.acquire().await
.map_err(|e| crate::error::Error::Other(format!("Semaphore error: {}", e)))?;
let name = pkg.name.clone();
let install_path = pkg.install_path.clone();
let prefix_clone = prefix.clone();
let linked = tokio::task::spawn_blocking(move || {
link_package(&install_path, &prefix_clone)
})
.await
.map_err(|e| crate::error::Error::Other(format!("Task join error: {}", e)))??;
debug!("Linked {} ({} files)", name, linked.len());
Ok::<_, crate::error::Error>((name, linked))
});
}
let mut results: Vec<(String, Vec<PathBuf>)> = Vec::new();
while let Some(result) = join_set.join_next().await {
match result {
Ok(Ok(item)) => results.push(item),
Ok(Err(e)) => return Err(e),
Err(e) => {
return Err(crate::error::Error::Other(format!(
"Task panic: {}",
e
)))
}
}
}
let mut ordered: Vec<(String, Vec<PathBuf>)> = Vec::with_capacity(results.len());
for name in &order {
if let Some(pos) = results.iter().position(|(n, _)| n == name) {
ordered.push(results.remove(pos));
}
}
info!("Linked {} packages", ordered.len());
Ok(ordered)
}
pub async fn install_bottles(
&self,
bottles: Vec<BottleInfo>,
cellar: &Path,
prefix: &Path,
) -> Result<Vec<PackageInstallResult>> {
let extracted = self.extract_bottles(bottles, cellar).await?;
let link_infos: Vec<LinkInfo> = extracted
.iter()
.map(|(name, install_path)| LinkInfo {
name: name.clone(),
install_path: install_path.clone(),
})
.collect();
let linked = self.link_packages(link_infos, prefix).await?;
let results: Vec<PackageInstallResult> = extracted
.into_iter()
.zip(linked.into_iter())
.map(|((name, install_path), (_, linked_files))| PackageInstallResult {
name,
install_path,
linked_files,
})
.collect();
Ok(results)
}
}
impl Default for ParallelInstaller {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone)]
pub struct BottleInfo {
pub name: String,
pub bottle_path: PathBuf,
}
#[derive(Debug, Clone)]
pub struct LinkInfo {
pub name: String,
pub install_path: PathBuf,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parallel_config_default() {
let config = ParallelConfig::default();
assert!(config.max_concurrent_extractions >= 1);
assert!(config.max_concurrent_links >= 1);
}
}