stout_install/
parallel.rs1use crate::error::Result;
7use crate::extract::{extract_bottle, relocate_bottle};
8use crate::link::link_package;
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use tokio::sync::Semaphore;
12use tokio::task::JoinSet;
13use tracing::{debug, info};
14
/// Concurrency limits used by [`ParallelInstaller`].
#[derive(Debug, Clone)]
pub struct ParallelConfig {
    /// Maximum number of bottle extractions allowed to run at the same time.
    pub max_concurrent_extractions: usize,
    /// Maximum number of link operations allowed to run at the same time.
    pub max_concurrent_links: usize,
}

impl Default for ParallelConfig {
    /// Uses one extraction slot per unit of available parallelism and a fixed
    /// number of link slots.
    fn default() -> Self {
        /// Extraction limit used when the available parallelism cannot be queried.
        const FALLBACK_EXTRACTIONS: usize = 4;
        /// Link limit used regardless of the machine's parallelism.
        const DEFAULT_LINKS: usize = 4;

        let max_concurrent_extractions =
            std::thread::available_parallelism().map_or(FALLBACK_EXTRACTIONS, |n| n.get());

        Self {
            max_concurrent_extractions,
            max_concurrent_links: DEFAULT_LINKS,
        }
    }
}
37
/// Outcome of installing a single package: where it was extracted and what was linked.
#[derive(Debug)]
pub struct PackageInstallResult {
    /// Package name, copied from the originating [`BottleInfo`].
    pub name: String,
    /// Path produced by the extraction step for this package.
    pub install_path: PathBuf,
    /// Paths reported by the link step for this package.
    pub linked_files: Vec<PathBuf>,
}
48
49pub struct ParallelInstaller {
51 config: ParallelConfig,
52 extract_semaphore: Arc<Semaphore>,
53 link_semaphore: Arc<Semaphore>,
54}
55
56impl ParallelInstaller {
57 pub fn new() -> Self {
59 Self::with_config(ParallelConfig::default())
60 }
61
62 pub fn with_config(config: ParallelConfig) -> Self {
64 let extract_semaphore = Arc::new(Semaphore::new(config.max_concurrent_extractions));
65 let link_semaphore = Arc::new(Semaphore::new(config.max_concurrent_links));
66
67 Self {
68 config,
69 extract_semaphore,
70 link_semaphore,
71 }
72 }
73
74 pub async fn extract_bottles(
78 &self,
79 bottles: Vec<BottleInfo>,
80 cellar: &Path,
81 prefix: &Path,
82 ) -> Result<Vec<(String, PathBuf)>> {
83 info!(
84 "Extracting {} bottles with {} concurrent workers",
85 bottles.len(),
86 self.config.max_concurrent_extractions
87 );
88
89 let cellar = cellar.to_path_buf();
90 let prefix = prefix.to_path_buf();
91 let semaphore: Arc<Semaphore> = Arc::clone(&self.extract_semaphore);
92 let mut join_set = JoinSet::new();
93
94 let order: Vec<String> = bottles.iter().map(|b| b.name.clone()).collect();
96
97 for bottle in bottles {
98 let cellar = cellar.clone();
99 let prefix = prefix.clone();
100 let semaphore = Arc::clone(&semaphore);
101
102 join_set.spawn(async move {
103 let _permit = semaphore
105 .acquire()
106 .await
107 .map_err(|e| crate::error::Error::Other(format!("Semaphore error: {}", e)))?;
108
109 let name = bottle.name.clone();
111 let bottle_path = bottle.bottle_path.clone();
112 let cellar_clone = cellar.clone();
113 let prefix_clone = prefix.clone();
114
115 let install_path = tokio::task::spawn_blocking(move || {
116 let path = extract_bottle(&bottle_path, &cellar_clone)?;
117 relocate_bottle(&path, &prefix_clone)?;
119 Ok::<_, crate::error::Error>(path)
120 })
121 .await
122 .map_err(|e| crate::error::Error::Other(format!("Task join error: {}", e)))??;
123
124 debug!("Extracted {} to {}", name, install_path.display());
125 Ok::<_, crate::error::Error>((name, install_path))
126 });
127 }
128
129 let mut results: Vec<(String, PathBuf)> = Vec::new();
131 while let Some(result) = join_set.join_next().await {
132 match result {
133 Ok(Ok(item)) => results.push(item),
134 Ok(Err(e)) => return Err(e),
135 Err(e) => return Err(crate::error::Error::Other(format!("Task panic: {}", e))),
136 }
137 }
138
139 let mut ordered: Vec<(String, PathBuf)> = Vec::with_capacity(results.len());
141 for name in &order {
142 if let Some(pos) = results.iter().position(|(n, _)| n == name) {
143 ordered.push(results.remove(pos));
144 }
145 }
146
147 info!("Extracted {} bottles", ordered.len());
148 Ok(ordered)
149 }
150
151 pub async fn link_packages(
156 &self,
157 packages: Vec<LinkInfo>,
158 prefix: &Path,
159 ) -> Result<Vec<(String, Vec<PathBuf>)>> {
160 info!(
161 "Linking {} packages with {} concurrent workers",
162 packages.len(),
163 self.config.max_concurrent_links
164 );
165
166 let prefix = prefix.to_path_buf();
167 let semaphore: Arc<Semaphore> = Arc::clone(&self.link_semaphore);
168 let mut join_set = JoinSet::new();
169
170 let order: Vec<String> = packages.iter().map(|p| p.name.clone()).collect();
171
172 for pkg in packages {
173 let prefix = prefix.clone();
174 let semaphore = Arc::clone(&semaphore);
175
176 join_set.spawn(async move {
177 let _permit = semaphore
178 .acquire()
179 .await
180 .map_err(|e| crate::error::Error::Other(format!("Semaphore error: {}", e)))?;
181
182 let name = pkg.name.clone();
183 let install_path = pkg.install_path.clone();
184 let prefix_clone = prefix.clone();
185
186 let linked =
187 tokio::task::spawn_blocking(move || link_package(&install_path, &prefix_clone))
188 .await
189 .map_err(|e| {
190 crate::error::Error::Other(format!("Task join error: {}", e))
191 })??;
192
193 debug!("Linked {} ({} files)", name, linked.len());
194 Ok::<_, crate::error::Error>((name, linked))
195 });
196 }
197
198 let mut results: Vec<(String, Vec<PathBuf>)> = Vec::new();
199 while let Some(result) = join_set.join_next().await {
200 match result {
201 Ok(Ok(item)) => results.push(item),
202 Ok(Err(e)) => return Err(e),
203 Err(e) => return Err(crate::error::Error::Other(format!("Task panic: {}", e))),
204 }
205 }
206
207 let mut ordered: Vec<(String, Vec<PathBuf>)> = Vec::with_capacity(results.len());
209 for name in &order {
210 if let Some(pos) = results.iter().position(|(n, _)| n == name) {
211 ordered.push(results.remove(pos));
212 }
213 }
214
215 info!("Linked {} packages", ordered.len());
216 Ok(ordered)
217 }
218
219 pub async fn install_bottles(
224 &self,
225 bottles: Vec<BottleInfo>,
226 cellar: &Path,
227 prefix: &Path,
228 ) -> Result<Vec<PackageInstallResult>> {
229 let extracted = self.extract_bottles(bottles, cellar, prefix).await?;
231
232 let link_infos: Vec<LinkInfo> = extracted
234 .iter()
235 .map(|(name, install_path)| LinkInfo {
236 name: name.clone(),
237 install_path: install_path.clone(),
238 })
239 .collect();
240
241 let linked = self.link_packages(link_infos, prefix).await?;
242
243 let results: Vec<PackageInstallResult> = extracted
245 .into_iter()
246 .zip(linked)
247 .map(
248 |((name, install_path), (_, linked_files))| PackageInstallResult {
249 name,
250 install_path,
251 linked_files,
252 },
253 )
254 .collect();
255
256 Ok(results)
257 }
258}
259
260impl Default for ParallelInstaller {
261 fn default() -> Self {
262 Self::new()
263 }
264}
265
/// Input to the extraction stage: one bottle to extract.
#[derive(Debug, Clone)]
pub struct BottleInfo {
    /// Package name, used in log messages and echoed back in the results.
    pub name: String,
    /// Path handed to `extract_bottle` as the bottle to extract.
    pub bottle_path: PathBuf,
}
274
/// Input to the link stage: one extracted package to link.
#[derive(Debug, Clone)]
pub struct LinkInfo {
    /// Package name, used in log messages and echoed back in the results.
    pub name: String,
    /// Path handed to `link_package` as the package's install location.
    pub install_path: PathBuf,
}
283
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration must allow at least one task in each stage.
    #[test]
    fn test_parallel_config_default() {
        let config = ParallelConfig::default();
        assert!(config.max_concurrent_extractions >= 1);
        assert!(config.max_concurrent_links >= 1);
    }
}