1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
use std::{
borrow::Borrow,
collections::{HashMap, HashSet},
path::{Path, PathBuf},
sync::{Arc, Mutex, MutexGuard},
};
use indexmap::IndexSet;
use itertools::Itertools;
use rattler_conda_types::{prefix::Prefix, prefix_record::PathType, PackageRecord, PrefixRecord};
use simple_spawn_blocking::{tokio::run_blocking_task, Cancelled};
use thiserror::Error;
use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore};
use super::{
clobber_registry::{ClobberError, ClobberMode, ClobberRegistry, ClobberedPath},
link_script::{PrePostLinkError, PrePostLinkResult},
unlink::{recursively_remove_empty_directories, UnlinkError},
Transaction, TransactionOperation,
};
use crate::install::link_script::LinkScriptError;
/// Coordinates the installation of multiple packages into a single prefix.
///
/// Packages can mostly be installed in isolation and therefore in parallel.
/// However, when a large number of packages is installed at the same time the
/// different installation tasks start competing for resources. The
/// [`InstallDriver`] makes sure that tasks don't starve each other of
/// resources and that, despite the large number of requests, the process
/// doesn't try to acquire more resources than the system has available.
pub struct InstallDriver {
/// Optional semaphore that bounds the number of concurrent IO operations.
/// When this is `None` no IO permit is handed out and no limit is enforced
/// by the driver.
io_concurrency_semaphore: Option<Arc<Semaphore>>,
/// Shared, mutex-protected registry of installed paths. It is consulted
/// during post-processing to resolve paths that were clobbered.
pub(crate) clobber_registry: Arc<Mutex<ClobberRegistry>>,
/// Whether pre-unlink and post-link scripts are executed by the driver.
execute_link_scripts: bool,
/// Decides how detected clobbering is treated during post-processing.
clobber_mode: ClobberMode,
}
impl Default for InstallDriver {
fn default() -> Self {
Self::builder()
.execute_link_scripts(false)
.with_io_concurrency_limit(100)
.finish()
}
}
/// A builder to configure a new `InstallDriver`.
///
/// Every setting is optional; calling `finish` on an untouched builder yields
/// a driver built purely from the `Default` values of the fields below.
#[derive(Debug, Default)]
pub struct InstallDriverBuilder {
/// Semaphore used to limit concurrent IO operations, if any was configured.
io_concurrency_semaphore: Option<Arc<Semaphore>>,
/// Clobber registry seeded from existing prefix records. When `None`, the
/// driver is created with a default registry instead.
clobber_registry: Option<ClobberRegistry>,
/// Whether the resulting driver executes link scripts.
execute_link_scripts: bool,
/// The clobber mode handed to the resulting driver.
clobber_mode: ClobberMode,
}
/// The result of the post-processing step.
#[derive(Debug)]
pub struct PostProcessResult {
/// The outcome of running the post-link scripts. This is `None` when the
/// driver is not allowed to execute link scripts; otherwise it holds the
/// result (success or failure) of running them.
pub post_link_result: Option<Result<PrePostLinkResult, LinkScriptError>>,
/// The paths that were clobbered during the installation process, as
/// reported by the clobber registry.
pub clobbered_paths: HashMap<PathBuf, ClobberedPath>,
}
/// An error that might have occurred during post-processing.
#[derive(Debug, Error)]
pub enum PostProcessingError {
/// Resolving the clobbered paths through the clobber registry failed.
#[error("failed to unclobber clobbered files")]
ClobberError(#[from] ClobberError),
/// Failed to determine the currently installed packages.
#[error("failed to determine the installed packages")]
FailedToDetectInstalledPackages(#[source] std::io::Error),
/// Clobbering was detected and the clobber mode is set to error. The
/// payload contains every path that is provided by multiple packages.
#[error("{} file(s) are provided by multiple packages", .0.len())]
ClobberingDetected(HashMap<PathBuf, ClobberedPath>),
}
impl InstallDriverBuilder {
/// Limits the number of IO operations that may run at the same time to
/// `limit`.
///
/// A new semaphore with `limit` permits is created for this purpose,
/// replacing any semaphore that was configured earlier. The limit keeps the
/// process from requesting more IO resources than the system can provide.
pub fn with_io_concurrency_limit(mut self, limit: usize) -> Self {
    let semaphore = Semaphore::new(limit);
    self.io_concurrency_semaphore = Some(Arc::new(semaphore));
    self
}
/// Uses an existing semaphore to limit the number of concurrent IO
/// operations.
///
/// This allows the IO budget to be shared with other users of the same
/// semaphore, so that together they don't request more IO resources than the
/// system can provide.
pub fn with_io_concurrency_semaphore(mut self, io_concurrency_semaphore: Arc<Semaphore>) -> Self {
    self.io_concurrency_semaphore = Some(io_concurrency_semaphore);
    self
}
/// Provides the prefix records of the packages that are already present in
/// the environment.
///
/// The records are used to construct the initial clobber registry of the
/// driver.
pub fn with_prefix_records<'i>(
    mut self,
    prefix_records: impl IntoIterator<Item = &'i PrefixRecord>,
) -> Self {
    let registry = ClobberRegistry::new(prefix_records);
    self.clobber_registry = Some(registry);
    self
}
/// Enables (`true`) or disables (`false`) the execution of link scripts by
/// the resulting driver.
pub fn execute_link_scripts(mut self, execute_link_scripts: bool) -> Self {
    self.execute_link_scripts = execute_link_scripts;
    self
}
/// Chooses how the driver reacts when several packages install the same
/// file path.
///
/// By default, [`ClobberMode::Rename`] is used, which renames the "losing"
/// files to a `__clobbers__/` directory. Use [`ClobberMode::Error`] to
/// instead raise an error when clobbering is detected.
pub fn clobber_mode(mut self, clobber_mode: ClobberMode) -> Self {
    self.clobber_mode = clobber_mode;
    self
}
pub fn finish(self) -> InstallDriver {
InstallDriver {
io_concurrency_semaphore: self.io_concurrency_semaphore,
clobber_registry: self
.clobber_registry
.map(Mutex::new)
.map(Arc::new)
.unwrap_or_default(),
execute_link_scripts: self.execute_link_scripts,
clobber_mode: self.clobber_mode,
}
}
}
impl InstallDriver {
/// Constructs a builder to configure a new `InstallDriver`.
pub fn builder() -> InstallDriverBuilder {
InstallDriverBuilder::default()
}
/// Returns a permit that will allow the caller to perform IO operations.
/// This is used to make sure that the system doesn't try to acquire
/// more IO resources than the system has available.
pub async fn acquire_io_permit(&self) -> Result<Option<OwnedSemaphorePermit>, AcquireError> {
match self.io_concurrency_semaphore.clone() {
None => Ok(None),
Some(semaphore) => Ok(Some(semaphore.acquire_owned().await?)),
}
}
/// Locks the clobber (paths) registry and returns the guard. The registry is
/// used to make sure that the same path is not installed twice.
///
/// # Panics
///
/// Panics if the mutex protecting the registry is poisoned.
pub fn clobber_registry(&self) -> MutexGuard<'_, ClobberRegistry> {
    let lock_result = self.clobber_registry.lock();
    lock_result.unwrap()
}
/// Performs the pre-processing that is required before any package is
/// installed. Call this before installing packages.
///
/// If link scripts are enabled, the pre-unlink scripts are run. A failure
/// to run them is only logged and results in `Ok(None)`. Afterwards the
/// system menu entries of all removed packages are removed; failures there
/// are logged as warnings and otherwise ignored.
pub fn pre_process<Old: Borrow<PrefixRecord>, New>(
    &self,
    transaction: &Transaction<Old, New>,
    target_prefix: &Path,
    reporter: Option<&dyn crate::install::installer::Reporter>,
) -> Result<Option<PrePostLinkResult>, PrePostLinkError> {
    let script_result = if self.execute_link_scripts {
        match self.run_pre_unlink_scripts(transaction, target_prefix, reporter) {
            Ok(outcome) => Some(outcome),
            Err(e) => {
                // Best effort: a failing script must not abort the transaction.
                tracing::error!("Error running pre-unlink scripts: {:?}", e);
                None
            }
        }
    } else {
        None
    };

    // Packages that get removed must also lose their menuinst entries.
    for record in transaction.removed_packages() {
        let removed: &PrefixRecord = record.borrow();
        let menus = &removed.installed_system_menus;
        if menus.is_empty() {
            continue;
        }
        if let Err(e) = rattler_menuinst::remove_menu_items(menus) {
            tracing::warn!("Failed to remove menu item: {}", e);
        }
    }

    Ok(script_result)
}
/// Executes the blocking closure `f` through `run_blocking_task` once an IO
/// permit has been acquired.
///
/// The permit is held until `f` has returned, so the number of blocking IO
/// tasks running at once never exceeds the configured limit. If the permit
/// cannot be acquired the task is reported as [`Cancelled`].
pub async fn run_blocking_io_task<T, E, F>(&self, f: F) -> Result<T, E>
where
    T: Send + 'static,
    E: Send + From<Cancelled> + 'static,
    F: FnOnce() -> Result<T, E> + Send + 'static,
{
    let permit = match self.acquire_io_permit().await {
        Ok(permit) => permit,
        Err(_err) => return Err(E::from(Cancelled)),
    };

    let task = move || {
        // Moving the permit into the task keeps it alive until `f` is done.
        let _permit = permit;
        f()
    };
    run_blocking_task(task).await
}
/// Call this after all packages have been installed to perform any post
/// processing that is required.
///
/// This function will select a winner among multiple packages that might
/// write to a single package and will also execute any
/// `post-link.sh/bat` scripts
pub fn post_process<Old: Borrow<PrefixRecord> + AsRef<New>, New: AsRef<PackageRecord>>(
&self,
transaction: &Transaction<Old, New>,
target_prefix: &Prefix,
reporter: Option<&dyn crate::install::installer::Reporter>,
) -> Result<PostProcessResult, PostProcessingError> {
let prefix_records: Vec<PrefixRecord> = PrefixRecord::collect_from_prefix(target_prefix)
.map_err(PostProcessingError::FailedToDetectInstalledPackages)?;
let required_packages =
PackageRecord::sort_topologically(prefix_records.iter().collect::<Vec<_>>());
let clobbered_paths = self
.clobber_registry()
.unclobber(&required_packages, target_prefix)?;
// If the clobber mode is set to error, return an error if any
// clobbering was detected.
if self.clobber_mode == ClobberMode::Error && !clobbered_paths.is_empty() {
return Err(PostProcessingError::ClobberingDetected(clobbered_paths));
}
self.remove_empty_directories(&transaction.operations, &prefix_records, target_prefix)
.unwrap_or_else(|e| {
tracing::warn!("Failed to remove empty directories: {} (ignored)", e);
});
let post_link_result = if self.execute_link_scripts {
Some(self.run_post_link_scripts(
transaction,
&required_packages,
target_prefix,
reporter,
))
} else {
None
};
Ok(PostProcessResult {
post_link_result,
clobbered_paths,
})
}
/// Remove all empty directories that are not part of the new prefix
/// records.
///
/// For every record that `operations` removes, the parent directories of its
/// non-directory path entries are handed to
/// `recursively_remove_empty_directories`. Directories that are explicitly
/// listed (as `PathType::Directory`) in `new_prefix_records` are passed along
/// as directories to keep.
///
/// # Errors
///
/// Returns the first [`UnlinkError`] reported by
/// `recursively_remove_empty_directories`.
pub fn remove_empty_directories<Old: Borrow<PrefixRecord>, New>(
&self,
operations: &[TransactionOperation<Old, New>],
new_prefix_records: &[PrefixRecord],
target_prefix: &Path,
) -> Result<(), UnlinkError> {
let mut keep_directories = HashSet::new();
// Find all forced directories in the prefix records. They are stored as
// paths joined onto `target_prefix`.
for record in new_prefix_records {
for paths in record.paths_data.paths.iter() {
if paths.path_type == PathType::Directory {
let path = target_prefix.join(&paths.relative_path);
keep_directories.insert(path);
}
}
}
// Find all removed directories, one removed record at a time.
for record in operations
.iter()
.filter_map(|op| op.record_to_remove().map(Borrow::borrow))
{
// Candidate directories, relative to the prefix: the parent of every
// non-directory entry of the removed record.
let mut removed_directories = HashSet::new();
for paths in record.paths_data.paths.iter() {
if paths.path_type != PathType::Directory {
if let Some(parent) = paths.relative_path.parent() {
removed_directories.insert(parent);
}
}
}
let is_python_noarch = record.repodata_record.package_record.noarch.is_python();
// Sort the directories using the ordering of `Path` (component-wise, not
// by length). A directory sorts before anything nested inside it, so
// popping from the back visits nested directories before their
// ancestors.
let mut directories: IndexSet<&Path> =
removed_directories.into_iter().sorted().collect();
while let Some(directory) = directories.pop() {
let directory_path = target_prefix.join(directory);
let removed_until = recursively_remove_empty_directories(
&directory_path,
target_prefix,
is_python_noarch,
&keep_directories,
)?;
// Intent: the directory at which removal stopped is not empty, which
// means its parent directories are not empty either, so they can be
// dropped from the set of candidates.
//
// NOTE(review): `removed_until` is never updated inside this loop, so
// only its immediate parent is ever looked up and at most one entry is
// removed. `removed_until` is presumably a path below `target_prefix`
// while `directories` holds prefix-relative paths, in which case the
// lookup would never match -- TODO confirm against
// `recursively_remove_empty_directories`.
while let Some(parent) = removed_until.parent() {
if !directories.shift_remove(parent) {
break;
}
}
}
}
Ok(())
}
}