stout_install/
parallel.rs1use crate::error::Result;
7use crate::extract::extract_bottle;
8use crate::link::link_package;
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use tokio::sync::Semaphore;
12use tokio::task::JoinSet;
13use tracing::{debug, info};
14
/// Concurrency limits used by the parallel installer.
///
/// Each value is used as the permit count of one semaphore created in
/// `ParallelInstaller::with_config`.
#[derive(Debug, Clone)]
pub struct ParallelConfig {
    /// Maximum number of bottle extractions allowed to run at the same time.
    pub max_concurrent_extractions: usize,
    /// Maximum number of package link operations allowed to run at the same time.
    pub max_concurrent_links: usize,
}

impl Default for ParallelConfig {
    /// Builds the default limits.
    ///
    /// Extraction concurrency follows `std::thread::available_parallelism`;
    /// if that query returns an error, 4 is used. Link concurrency is fixed at 4.
    fn default() -> Self {
        let detected = std::thread::available_parallelism().map_or(4, |count| count.get());

        ParallelConfig {
            max_concurrent_extractions: detected,
            max_concurrent_links: 4,
        }
    }
}
37
/// Outcome of installing a single package via `ParallelInstaller::install_bottles`.
#[derive(Debug)]
pub struct PackageInstallResult {
    /// Package name, copied from the `BottleInfo` that was installed.
    pub name: String,
    /// Path returned by the extraction step for this package.
    pub install_path: PathBuf,
    /// Paths returned by the link step for this package.
    pub linked_files: Vec<PathBuf>,
}
48
49pub struct ParallelInstaller {
51 config: ParallelConfig,
52 extract_semaphore: Arc<Semaphore>,
53 link_semaphore: Arc<Semaphore>,
54}
55
56impl ParallelInstaller {
57 pub fn new() -> Self {
59 Self::with_config(ParallelConfig::default())
60 }
61
62 pub fn with_config(config: ParallelConfig) -> Self {
64 let extract_semaphore = Arc::new(Semaphore::new(config.max_concurrent_extractions));
65 let link_semaphore = Arc::new(Semaphore::new(config.max_concurrent_links));
66
67 Self {
68 config,
69 extract_semaphore,
70 link_semaphore,
71 }
72 }
73
74 pub async fn extract_bottles(
78 &self,
79 bottles: Vec<BottleInfo>,
80 cellar: &Path,
81 ) -> Result<Vec<(String, PathBuf)>> {
82 info!(
83 "Extracting {} bottles with {} concurrent workers",
84 bottles.len(),
85 self.config.max_concurrent_extractions
86 );
87
88 let cellar = cellar.to_path_buf();
89 let semaphore: Arc<Semaphore> = Arc::clone(&self.extract_semaphore);
90 let mut join_set = JoinSet::new();
91
92 let order: Vec<String> = bottles.iter().map(|b| b.name.clone()).collect();
94
95 for bottle in bottles {
96 let cellar = cellar.clone();
97 let semaphore = Arc::clone(&semaphore);
98
99 join_set.spawn(async move {
100 let _permit = semaphore.acquire().await
102 .map_err(|e| crate::error::Error::Other(format!("Semaphore error: {}", e)))?;
103
104 let name = bottle.name.clone();
106 let bottle_path = bottle.bottle_path.clone();
107 let cellar_clone = cellar.clone();
108
109 let install_path = tokio::task::spawn_blocking(move || {
110 extract_bottle(&bottle_path, &cellar_clone)
111 })
112 .await
113 .map_err(|e| crate::error::Error::Other(format!("Task join error: {}", e)))??;
114
115 debug!("Extracted {} to {}", name, install_path.display());
116 Ok::<_, crate::error::Error>((name, install_path))
117 });
118 }
119
120 let mut results: Vec<(String, PathBuf)> = Vec::new();
122 while let Some(result) = join_set.join_next().await {
123 match result {
124 Ok(Ok(item)) => results.push(item),
125 Ok(Err(e)) => return Err(e),
126 Err(e) => {
127 return Err(crate::error::Error::Other(format!(
128 "Task panic: {}",
129 e
130 )))
131 }
132 }
133 }
134
135 let mut ordered: Vec<(String, PathBuf)> = Vec::with_capacity(results.len());
137 for name in &order {
138 if let Some(pos) = results.iter().position(|(n, _)| n == name) {
139 ordered.push(results.remove(pos));
140 }
141 }
142
143 info!("Extracted {} bottles", ordered.len());
144 Ok(ordered)
145 }
146
147 pub async fn link_packages(
152 &self,
153 packages: Vec<LinkInfo>,
154 prefix: &Path,
155 ) -> Result<Vec<(String, Vec<PathBuf>)>> {
156 info!(
157 "Linking {} packages with {} concurrent workers",
158 packages.len(),
159 self.config.max_concurrent_links
160 );
161
162 let prefix = prefix.to_path_buf();
163 let semaphore: Arc<Semaphore> = Arc::clone(&self.link_semaphore);
164 let mut join_set = JoinSet::new();
165
166 let order: Vec<String> = packages.iter().map(|p| p.name.clone()).collect();
167
168 for pkg in packages {
169 let prefix = prefix.clone();
170 let semaphore = Arc::clone(&semaphore);
171
172 join_set.spawn(async move {
173 let _permit = semaphore.acquire().await
174 .map_err(|e| crate::error::Error::Other(format!("Semaphore error: {}", e)))?;
175
176 let name = pkg.name.clone();
177 let install_path = pkg.install_path.clone();
178 let prefix_clone = prefix.clone();
179
180 let linked = tokio::task::spawn_blocking(move || {
181 link_package(&install_path, &prefix_clone)
182 })
183 .await
184 .map_err(|e| crate::error::Error::Other(format!("Task join error: {}", e)))??;
185
186 debug!("Linked {} ({} files)", name, linked.len());
187 Ok::<_, crate::error::Error>((name, linked))
188 });
189 }
190
191 let mut results: Vec<(String, Vec<PathBuf>)> = Vec::new();
192 while let Some(result) = join_set.join_next().await {
193 match result {
194 Ok(Ok(item)) => results.push(item),
195 Ok(Err(e)) => return Err(e),
196 Err(e) => {
197 return Err(crate::error::Error::Other(format!(
198 "Task panic: {}",
199 e
200 )))
201 }
202 }
203 }
204
205 let mut ordered: Vec<(String, Vec<PathBuf>)> = Vec::with_capacity(results.len());
207 for name in &order {
208 if let Some(pos) = results.iter().position(|(n, _)| n == name) {
209 ordered.push(results.remove(pos));
210 }
211 }
212
213 info!("Linked {} packages", ordered.len());
214 Ok(ordered)
215 }
216
217 pub async fn install_bottles(
222 &self,
223 bottles: Vec<BottleInfo>,
224 cellar: &Path,
225 prefix: &Path,
226 ) -> Result<Vec<PackageInstallResult>> {
227 let extracted = self.extract_bottles(bottles, cellar).await?;
229
230 let link_infos: Vec<LinkInfo> = extracted
232 .iter()
233 .map(|(name, install_path)| LinkInfo {
234 name: name.clone(),
235 install_path: install_path.clone(),
236 })
237 .collect();
238
239 let linked = self.link_packages(link_infos, prefix).await?;
240
241 let results: Vec<PackageInstallResult> = extracted
243 .into_iter()
244 .zip(linked.into_iter())
245 .map(|((name, install_path), (_, linked_files))| PackageInstallResult {
246 name,
247 install_path,
248 linked_files,
249 })
250 .collect();
251
252 Ok(results)
253 }
254}
255
256impl Default for ParallelInstaller {
257 fn default() -> Self {
258 Self::new()
259 }
260}
261
/// Input to `ParallelInstaller::extract_bottles`: one bottle to extract.
#[derive(Debug, Clone)]
pub struct BottleInfo {
    /// Package name; returned alongside the extraction result and used in log output.
    pub name: String,
    /// Path handed to `extract_bottle` as the bottle to extract.
    pub bottle_path: PathBuf,
}
270
/// Input to `ParallelInstaller::link_packages`: one extracted package to link.
#[derive(Debug, Clone)]
pub struct LinkInfo {
    /// Package name; returned alongside the link result and used in log output.
    pub name: String,
    /// Path handed to `link_package` as the package's install location.
    pub install_path: PathBuf,
}
279
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration must allow at least one worker per phase.
    #[test]
    fn test_parallel_config_default() {
        let ParallelConfig {
            max_concurrent_extractions,
            max_concurrent_links,
        } = ParallelConfig::default();

        assert!(max_concurrent_extractions >= 1);
        assert!(max_concurrent_links >= 1);
    }
}