Skip to main content

stout_install/
parallel.rs

1//! Parallel installation support
2//!
3//! Provides concurrent extraction and linking of multiple packages
4//! for faster installation times.
5
6use crate::error::Result;
7use crate::extract::extract_bottle;
8use crate::link::link_package;
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use tokio::sync::Semaphore;
12use tokio::task::JoinSet;
13use tracing::{debug, info};
14
15/// Configuration for parallel installation
16#[derive(Debug, Clone)]
17pub struct ParallelConfig {
18    /// Maximum number of concurrent extractions
19    pub max_concurrent_extractions: usize,
20    /// Maximum number of concurrent linking operations
21    pub max_concurrent_links: usize,
22}
23
24impl Default for ParallelConfig {
25    fn default() -> Self {
26        // Default to number of CPUs for extractions (CPU-bound)
27        // and 4 for links (mostly I/O bound, can conflict)
28        let cpus = std::thread::available_parallelism()
29            .map(|p| p.get())
30            .unwrap_or(4);
31        Self {
32            max_concurrent_extractions: cpus,
33            max_concurrent_links: 4,
34        }
35    }
36}
37
38/// Result of a single package installation
39#[derive(Debug)]
40pub struct PackageInstallResult {
41    /// Package name
42    pub name: String,
43    /// Path where package was installed in Cellar
44    pub install_path: PathBuf,
45    /// Linked files
46    pub linked_files: Vec<PathBuf>,
47}
48
49/// Parallel installer for multiple packages
50pub struct ParallelInstaller {
51    config: ParallelConfig,
52    extract_semaphore: Arc<Semaphore>,
53    link_semaphore: Arc<Semaphore>,
54}
55
56impl ParallelInstaller {
57    /// Create a new parallel installer with default configuration
58    pub fn new() -> Self {
59        Self::with_config(ParallelConfig::default())
60    }
61
62    /// Create with custom configuration
63    pub fn with_config(config: ParallelConfig) -> Self {
64        let extract_semaphore = Arc::new(Semaphore::new(config.max_concurrent_extractions));
65        let link_semaphore = Arc::new(Semaphore::new(config.max_concurrent_links));
66
67        Self {
68            config,
69            extract_semaphore,
70            link_semaphore,
71        }
72    }
73
74    /// Extract multiple bottles in parallel
75    ///
76    /// Returns a vector of (name, install_path) pairs in the same order as input
77    pub async fn extract_bottles(
78        &self,
79        bottles: Vec<BottleInfo>,
80        cellar: &Path,
81    ) -> Result<Vec<(String, PathBuf)>> {
82        info!(
83            "Extracting {} bottles with {} concurrent workers",
84            bottles.len(),
85            self.config.max_concurrent_extractions
86        );
87
88        let cellar = cellar.to_path_buf();
89        let semaphore: Arc<Semaphore> = Arc::clone(&self.extract_semaphore);
90        let mut join_set = JoinSet::new();
91
92        // Store order mapping
93        let order: Vec<String> = bottles.iter().map(|b| b.name.clone()).collect();
94
95        for bottle in bottles {
96            let cellar = cellar.clone();
97            let semaphore = Arc::clone(&semaphore);
98
99            join_set.spawn(async move {
100                // Acquire semaphore permit
101                let _permit = semaphore.acquire().await
102                    .map_err(|e| crate::error::Error::Other(format!("Semaphore error: {}", e)))?;
103
104                // Run blocking extraction in a spawn_blocking
105                let name = bottle.name.clone();
106                let bottle_path = bottle.bottle_path.clone();
107                let cellar_clone = cellar.clone();
108
109                let install_path = tokio::task::spawn_blocking(move || {
110                    extract_bottle(&bottle_path, &cellar_clone)
111                })
112                .await
113                .map_err(|e| crate::error::Error::Other(format!("Task join error: {}", e)))??;
114
115                debug!("Extracted {} to {}", name, install_path.display());
116                Ok::<_, crate::error::Error>((name, install_path))
117            });
118        }
119
120        // Collect results
121        let mut results: Vec<(String, PathBuf)> = Vec::new();
122        while let Some(result) = join_set.join_next().await {
123            match result {
124                Ok(Ok(item)) => results.push(item),
125                Ok(Err(e)) => return Err(e),
126                Err(e) => {
127                    return Err(crate::error::Error::Other(format!(
128                        "Task panic: {}",
129                        e
130                    )))
131                }
132            }
133        }
134
135        // Restore original order
136        let mut ordered: Vec<(String, PathBuf)> = Vec::with_capacity(results.len());
137        for name in &order {
138            if let Some(pos) = results.iter().position(|(n, _)| n == name) {
139                ordered.push(results.remove(pos));
140            }
141        }
142
143        info!("Extracted {} bottles", ordered.len());
144        Ok(ordered)
145    }
146
147    /// Link multiple packages in parallel
148    ///
149    /// Note: Linking can have conflicts if packages try to link the same file,
150    /// so we use a smaller concurrency limit by default.
151    pub async fn link_packages(
152        &self,
153        packages: Vec<LinkInfo>,
154        prefix: &Path,
155    ) -> Result<Vec<(String, Vec<PathBuf>)>> {
156        info!(
157            "Linking {} packages with {} concurrent workers",
158            packages.len(),
159            self.config.max_concurrent_links
160        );
161
162        let prefix = prefix.to_path_buf();
163        let semaphore: Arc<Semaphore> = Arc::clone(&self.link_semaphore);
164        let mut join_set = JoinSet::new();
165
166        let order: Vec<String> = packages.iter().map(|p| p.name.clone()).collect();
167
168        for pkg in packages {
169            let prefix = prefix.clone();
170            let semaphore = Arc::clone(&semaphore);
171
172            join_set.spawn(async move {
173                let _permit = semaphore.acquire().await
174                    .map_err(|e| crate::error::Error::Other(format!("Semaphore error: {}", e)))?;
175
176                let name = pkg.name.clone();
177                let install_path = pkg.install_path.clone();
178                let prefix_clone = prefix.clone();
179
180                let linked = tokio::task::spawn_blocking(move || {
181                    link_package(&install_path, &prefix_clone)
182                })
183                .await
184                .map_err(|e| crate::error::Error::Other(format!("Task join error: {}", e)))??;
185
186                debug!("Linked {} ({} files)", name, linked.len());
187                Ok::<_, crate::error::Error>((name, linked))
188            });
189        }
190
191        let mut results: Vec<(String, Vec<PathBuf>)> = Vec::new();
192        while let Some(result) = join_set.join_next().await {
193            match result {
194                Ok(Ok(item)) => results.push(item),
195                Ok(Err(e)) => return Err(e),
196                Err(e) => {
197                    return Err(crate::error::Error::Other(format!(
198                        "Task panic: {}",
199                        e
200                    )))
201                }
202            }
203        }
204
205        // Restore original order
206        let mut ordered: Vec<(String, Vec<PathBuf>)> = Vec::with_capacity(results.len());
207        for name in &order {
208            if let Some(pos) = results.iter().position(|(n, _)| n == name) {
209                ordered.push(results.remove(pos));
210            }
211        }
212
213        info!("Linked {} packages", ordered.len());
214        Ok(ordered)
215    }
216
217    /// Install multiple packages in parallel (extract then link)
218    ///
219    /// Extracts all bottles in parallel first, then links in parallel.
220    /// This two-phase approach avoids potential conflicts.
221    pub async fn install_bottles(
222        &self,
223        bottles: Vec<BottleInfo>,
224        cellar: &Path,
225        prefix: &Path,
226    ) -> Result<Vec<PackageInstallResult>> {
227        // Phase 1: Extract all bottles in parallel
228        let extracted = self.extract_bottles(bottles, cellar).await?;
229
230        // Phase 2: Link all packages in parallel
231        let link_infos: Vec<LinkInfo> = extracted
232            .iter()
233            .map(|(name, install_path)| LinkInfo {
234                name: name.clone(),
235                install_path: install_path.clone(),
236            })
237            .collect();
238
239        let linked = self.link_packages(link_infos, prefix).await?;
240
241        // Combine results
242        let results: Vec<PackageInstallResult> = extracted
243            .into_iter()
244            .zip(linked.into_iter())
245            .map(|((name, install_path), (_, linked_files))| PackageInstallResult {
246                name,
247                install_path,
248                linked_files,
249            })
250            .collect();
251
252        Ok(results)
253    }
254}
255
256impl Default for ParallelInstaller {
257    fn default() -> Self {
258        Self::new()
259    }
260}
261
262/// Information about a bottle to extract
263#[derive(Debug, Clone)]
264pub struct BottleInfo {
265    /// Package name
266    pub name: String,
267    /// Path to the downloaded bottle tarball
268    pub bottle_path: PathBuf,
269}
270
271/// Information about a package to link
272#[derive(Debug, Clone)]
273pub struct LinkInfo {
274    /// Package name
275    pub name: String,
276    /// Path to the installed package in Cellar
277    pub install_path: PathBuf,
278}
279
280#[cfg(test)]
281mod tests {
282    use super::*;
283
284    #[test]
285    fn test_parallel_config_default() {
286        let config = ParallelConfig::default();
287        assert!(config.max_concurrent_extractions >= 1);
288        assert!(config.max_concurrent_links >= 1);
289    }
290}