Skip to main content

stout_install/
parallel.rs

1//! Parallel installation support
2//!
3//! Provides concurrent extraction and linking of multiple packages
4//! for faster installation times.
5
6use crate::error::Result;
7use crate::extract::{extract_bottle, relocate_bottle};
8use crate::link::link_package;
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use tokio::sync::Semaphore;
12use tokio::task::JoinSet;
13use tracing::{debug, info};
14
/// Concurrency limits for the two phases of a parallel installation.
///
/// The limits are plain worker counts; they are turned into semaphore
/// permits by [`ParallelInstaller::with_config`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParallelConfig {
    /// Maximum number of bottles extracted at the same time.
    pub max_concurrent_extractions: usize,
    /// Maximum number of packages linked at the same time.
    pub max_concurrent_links: usize,
}

impl Default for ParallelConfig {
    /// Builds the default limits.
    ///
    /// Extraction is treated as CPU-bound, so it defaults to the parallelism
    /// reported by [`std::thread::available_parallelism`]. Linking is mostly
    /// I/O-bound and packages can conflict with each other, so it gets a small
    /// fixed limit.
    fn default() -> Self {
        /// Extraction workers to use when the available parallelism cannot be queried.
        const FALLBACK_EXTRACTIONS: usize = 4;
        /// Fixed number of concurrent link operations.
        const DEFAULT_LINKS: usize = 4;

        let max_concurrent_extractions =
            std::thread::available_parallelism().map_or(FALLBACK_EXTRACTIONS, |n| n.get());

        Self {
            max_concurrent_extractions,
            max_concurrent_links: DEFAULT_LINKS,
        }
    }
}
37
/// Outcome of installing a single package (extraction followed by linking).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PackageInstallResult {
    /// Package name
    pub name: String,
    /// Path where the package was installed in the Cellar
    pub install_path: PathBuf,
    /// Files reported by the link step for this package
    pub linked_files: Vec<PathBuf>,
}
48
49/// Parallel installer for multiple packages
50pub struct ParallelInstaller {
51    config: ParallelConfig,
52    extract_semaphore: Arc<Semaphore>,
53    link_semaphore: Arc<Semaphore>,
54}
55
56impl ParallelInstaller {
57    /// Create a new parallel installer with default configuration
58    pub fn new() -> Self {
59        Self::with_config(ParallelConfig::default())
60    }
61
62    /// Create with custom configuration
63    pub fn with_config(config: ParallelConfig) -> Self {
64        let extract_semaphore = Arc::new(Semaphore::new(config.max_concurrent_extractions));
65        let link_semaphore = Arc::new(Semaphore::new(config.max_concurrent_links));
66
67        Self {
68            config,
69            extract_semaphore,
70            link_semaphore,
71        }
72    }
73
74    /// Extract multiple bottles in parallel
75    ///
76    /// Returns a vector of (name, install_path) pairs in the same order as input
77    pub async fn extract_bottles(
78        &self,
79        bottles: Vec<BottleInfo>,
80        cellar: &Path,
81        prefix: &Path,
82    ) -> Result<Vec<(String, PathBuf)>> {
83        info!(
84            "Extracting {} bottles with {} concurrent workers",
85            bottles.len(),
86            self.config.max_concurrent_extractions
87        );
88
89        let cellar = cellar.to_path_buf();
90        let prefix = prefix.to_path_buf();
91        let semaphore: Arc<Semaphore> = Arc::clone(&self.extract_semaphore);
92        let mut join_set = JoinSet::new();
93
94        // Store order mapping
95        let order: Vec<String> = bottles.iter().map(|b| b.name.clone()).collect();
96
97        for bottle in bottles {
98            let cellar = cellar.clone();
99            let prefix = prefix.clone();
100            let semaphore = Arc::clone(&semaphore);
101
102            join_set.spawn(async move {
103                // Acquire semaphore permit
104                let _permit = semaphore
105                    .acquire()
106                    .await
107                    .map_err(|e| crate::error::Error::Other(format!("Semaphore error: {}", e)))?;
108
109                // Run blocking extraction in a spawn_blocking
110                let name = bottle.name.clone();
111                let bottle_path = bottle.bottle_path.clone();
112                let cellar_clone = cellar.clone();
113                let prefix_clone = prefix.clone();
114
115                let install_path = tokio::task::spawn_blocking(move || {
116                    let path = extract_bottle(&bottle_path, &cellar_clone)?;
117                    // Relocate Homebrew placeholders to actual paths
118                    relocate_bottle(&path, &prefix_clone)?;
119                    Ok::<_, crate::error::Error>(path)
120                })
121                .await
122                .map_err(|e| crate::error::Error::Other(format!("Task join error: {}", e)))??;
123
124                debug!("Extracted {} to {}", name, install_path.display());
125                Ok::<_, crate::error::Error>((name, install_path))
126            });
127        }
128
129        // Collect results
130        let mut results: Vec<(String, PathBuf)> = Vec::new();
131        while let Some(result) = join_set.join_next().await {
132            match result {
133                Ok(Ok(item)) => results.push(item),
134                Ok(Err(e)) => return Err(e),
135                Err(e) => return Err(crate::error::Error::Other(format!("Task panic: {}", e))),
136            }
137        }
138
139        // Restore original order
140        let mut ordered: Vec<(String, PathBuf)> = Vec::with_capacity(results.len());
141        for name in &order {
142            if let Some(pos) = results.iter().position(|(n, _)| n == name) {
143                ordered.push(results.remove(pos));
144            }
145        }
146
147        info!("Extracted {} bottles", ordered.len());
148        Ok(ordered)
149    }
150
151    /// Link multiple packages in parallel
152    ///
153    /// Note: Linking can have conflicts if packages try to link the same file,
154    /// so we use a smaller concurrency limit by default.
155    pub async fn link_packages(
156        &self,
157        packages: Vec<LinkInfo>,
158        prefix: &Path,
159    ) -> Result<Vec<(String, Vec<PathBuf>)>> {
160        info!(
161            "Linking {} packages with {} concurrent workers",
162            packages.len(),
163            self.config.max_concurrent_links
164        );
165
166        let prefix = prefix.to_path_buf();
167        let semaphore: Arc<Semaphore> = Arc::clone(&self.link_semaphore);
168        let mut join_set = JoinSet::new();
169
170        let order: Vec<String> = packages.iter().map(|p| p.name.clone()).collect();
171
172        for pkg in packages {
173            let prefix = prefix.clone();
174            let semaphore = Arc::clone(&semaphore);
175
176            join_set.spawn(async move {
177                let _permit = semaphore
178                    .acquire()
179                    .await
180                    .map_err(|e| crate::error::Error::Other(format!("Semaphore error: {}", e)))?;
181
182                let name = pkg.name.clone();
183                let install_path = pkg.install_path.clone();
184                let prefix_clone = prefix.clone();
185
186                let linked =
187                    tokio::task::spawn_blocking(move || link_package(&install_path, &prefix_clone))
188                        .await
189                        .map_err(|e| {
190                            crate::error::Error::Other(format!("Task join error: {}", e))
191                        })??;
192
193                debug!("Linked {} ({} files)", name, linked.len());
194                Ok::<_, crate::error::Error>((name, linked))
195            });
196        }
197
198        let mut results: Vec<(String, Vec<PathBuf>)> = Vec::new();
199        while let Some(result) = join_set.join_next().await {
200            match result {
201                Ok(Ok(item)) => results.push(item),
202                Ok(Err(e)) => return Err(e),
203                Err(e) => return Err(crate::error::Error::Other(format!("Task panic: {}", e))),
204            }
205        }
206
207        // Restore original order
208        let mut ordered: Vec<(String, Vec<PathBuf>)> = Vec::with_capacity(results.len());
209        for name in &order {
210            if let Some(pos) = results.iter().position(|(n, _)| n == name) {
211                ordered.push(results.remove(pos));
212            }
213        }
214
215        info!("Linked {} packages", ordered.len());
216        Ok(ordered)
217    }
218
219    /// Install multiple packages in parallel (extract then link)
220    ///
221    /// Extracts all bottles in parallel first, then links in parallel.
222    /// This two-phase approach avoids potential conflicts.
223    pub async fn install_bottles(
224        &self,
225        bottles: Vec<BottleInfo>,
226        cellar: &Path,
227        prefix: &Path,
228    ) -> Result<Vec<PackageInstallResult>> {
229        // Phase 1: Extract all bottles in parallel
230        let extracted = self.extract_bottles(bottles, cellar, prefix).await?;
231
232        // Phase 2: Link all packages in parallel
233        let link_infos: Vec<LinkInfo> = extracted
234            .iter()
235            .map(|(name, install_path)| LinkInfo {
236                name: name.clone(),
237                install_path: install_path.clone(),
238            })
239            .collect();
240
241        let linked = self.link_packages(link_infos, prefix).await?;
242
243        // Combine results
244        let results: Vec<PackageInstallResult> = extracted
245            .into_iter()
246            .zip(linked)
247            .map(
248                |((name, install_path), (_, linked_files))| PackageInstallResult {
249                    name,
250                    install_path,
251                    linked_files,
252                },
253            )
254            .collect();
255
256        Ok(results)
257    }
258}
259
260impl Default for ParallelInstaller {
261    fn default() -> Self {
262        Self::new()
263    }
264}
265
/// Describes one downloaded bottle that should be extracted.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BottleInfo {
    /// Package name
    pub name: String,
    /// Path to the downloaded bottle tarball
    pub bottle_path: PathBuf,
}
274
/// Describes one extracted package that should be linked.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LinkInfo {
    /// Package name
    pub name: String,
    /// Path to the installed package in Cellar
    pub install_path: PathBuf,
}
283
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration must allow at least one worker per phase.
    #[test]
    fn test_parallel_config_default() {
        let ParallelConfig {
            max_concurrent_extractions,
            max_concurrent_links,
        } = ParallelConfig::default();

        assert!(max_concurrent_extractions >= 1);
        assert!(max_concurrent_links >= 1);
    }
}