use crate::default_retry_policy;
use crate::progress::{
default_progress_style, finished_progress_style, global_multi_progress,
ProgressBarMessageFormatter,
};
use futures::future::ready;
use futures::{stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use indicatif::ProgressBar;
use itertools::Itertools;
use miette::{IntoDiagnostic, WrapErr};
use rattler::install::{
link_package, unlink_package, InstallDriver, InstallOptions, Transaction, TransactionOperation,
};
use rattler::package_cache::PackageCache;
use rattler_conda_types::{PrefixRecord, RepoDataRecord};
use reqwest_middleware::ClientWithMiddleware;
use std::cmp::Ordering;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
pub async fn execute_transaction(
package_cache: Arc<PackageCache>,
transaction: &Transaction<PrefixRecord, RepoDataRecord>,
prefix_records: &[PrefixRecord],
target_prefix: PathBuf,
download_client: ClientWithMiddleware,
top_level_progress: ProgressBar,
) -> miette::Result<()> {
let install_driver = InstallDriver::new(100, Some(prefix_records));
let install_options = InstallOptions {
python_info: transaction.python_info.clone(),
platform: Some(transaction.platform),
..Default::default()
};
let multi_progress = global_multi_progress();
let total_packages_to_download = transaction
.operations
.iter()
.filter(|op| op.record_to_install().is_some())
.count();
let download_pb = if total_packages_to_download > 0 {
let pb = multi_progress
.insert_after(
&top_level_progress,
indicatif::ProgressBar::new(total_packages_to_download as u64),
)
.with_style(default_progress_style())
.with_finish(indicatif::ProgressFinish::WithMessage("Done!".into()))
.with_prefix("downloading");
pb.enable_steady_tick(Duration::from_millis(100));
Some(ProgressBarMessageFormatter::new(pb))
} else {
None
};
let total_operations = transaction.operations.len();
let link_pb = {
let first_pb = download_pb
.as_ref()
.map(ProgressBarMessageFormatter::progress_bar)
.unwrap_or(&top_level_progress);
let pb = multi_progress
.insert_after(
first_pb,
indicatif::ProgressBar::new(total_operations as u64),
)
.with_style(default_progress_style())
.with_finish(indicatif::ProgressFinish::WithMessage("Done!".into()))
.with_prefix("linking");
pb.enable_steady_tick(Duration::from_millis(100));
ProgressBarMessageFormatter::new(pb)
};
let sorted_operations = transaction
.operations
.iter()
.enumerate()
.sorted_unstable_by(|&(a_idx, a), &(b_idx, b)| {
match (a.record_to_install(), b.record_to_install()) {
(Some(a), Some(b)) => {
let a_size = a.package_record.size.or(a.package_record.legacy_bz2_size);
let b_size = b.package_record.size.or(b.package_record.legacy_bz2_size);
if let (Some(a_size), Some(b_size)) = (a_size, b_size) {
match a_size.cmp(&b_size) {
Ordering::Less => return Ordering::Greater,
Ordering::Greater => return Ordering::Less,
Ordering::Equal => {}
}
}
}
(Some(_), None) => {
return Ordering::Less;
}
(None, Some(_)) => {
return Ordering::Greater;
}
_ => {}
}
a_idx.cmp(&b_idx)
})
.map(|(_, op)| op);
let result = stream::iter(sorted_operations.into_iter())
.map(Ok)
.try_for_each_concurrent(50, |op| {
let target_prefix = target_prefix.clone();
let download_client = download_client.clone();
let package_cache = &package_cache;
let install_driver = &install_driver;
let download_pb = download_pb.as_ref();
let link_pb = &link_pb;
let install_options = &install_options;
async move {
execute_operation(
&target_prefix,
download_client,
package_cache,
install_driver,
download_pb,
link_pb,
op,
install_options,
)
.await
}
})
.await;
install_driver
.post_process(transaction, &target_prefix)
.into_diagnostic()?;
if let Some(download_pb) = download_pb {
download_pb.into_progress_bar().finish_and_clear();
}
link_pb.into_progress_bar().finish_and_clear();
result
}
#[allow(clippy::too_many_arguments)]
async fn execute_operation(
target_prefix: &Path,
download_client: ClientWithMiddleware,
package_cache: &PackageCache,
install_driver: &InstallDriver,
download_pb: Option<&ProgressBarMessageFormatter>,
link_pb: &ProgressBarMessageFormatter,
op: &TransactionOperation<PrefixRecord, RepoDataRecord>,
install_options: &InstallOptions,
) -> miette::Result<()> {
let install_record = op.record_to_install();
let remove_record = op.record_to_remove();
let remove_future = if let Some(remove_record) = remove_record {
link_pb
.wrap(
format!(
"removing {} {}",
&remove_record
.repodata_record
.package_record
.name
.as_source(),
&remove_record.repodata_record.package_record.version
),
remove_package_from_environment(target_prefix, remove_record),
)
.left_future()
} else {
ready(Ok(())).right_future()
};
let cached_package_dir_fut = if let Some(install_record) = install_record {
async {
let task = if let Some(pb) = download_pb {
Some(
pb.start(install_record.package_record.name.as_source().to_string())
.await,
)
} else {
None
};
let result = package_cache
.get_or_fetch_from_url_with_retry(
&install_record.package_record,
install_record.url.clone(),
download_client.clone(),
default_retry_policy(),
)
.map_ok(|cache_dir| Some((install_record.clone(), cache_dir)))
.await
.into_diagnostic()
.with_context(|| format!("failed to download package {}", install_record.url));
if let Some(task) = task {
let pb = task.finish().await;
pb.inc(1);
if pb.length() == Some(pb.position()) {
pb.set_style(finished_progress_style());
}
}
result
}
.left_future()
} else {
ready(Ok(None)).right_future()
};
let (_, install_package) = tokio::try_join!(remove_future, cached_package_dir_fut)?;
if let Some((record, package_dir)) = install_package {
link_pb
.wrap(
record.package_record.name.as_source().to_string(),
install_package_to_environment(
target_prefix,
package_dir,
record.clone(),
install_driver,
install_options,
),
)
.await?;
}
link_pb.progress_bar().inc(1);
if link_pb.progress_bar().length() == Some(link_pb.progress_bar().position()) {
link_pb.progress_bar().set_style(finished_progress_style());
}
Ok(())
}
async fn install_package_to_environment(
target_prefix: &Path,
package_dir: PathBuf,
repodata_record: RepoDataRecord,
install_driver: &InstallDriver,
install_options: &InstallOptions,
) -> miette::Result<()> {
let paths = link_package(
&package_dir,
target_prefix,
install_driver,
install_options.clone(),
)
.await
.into_diagnostic()?;
let prefix_record = PrefixRecord {
repodata_record,
package_tarball_full_path: None,
extracted_package_dir: Some(package_dir),
files: paths
.iter()
.map(|entry| entry.relative_path.clone())
.collect(),
paths_data: paths.into(),
requested_spec: None,
link: None,
};
let target_prefix = target_prefix.to_path_buf();
match tokio::task::spawn_blocking(move || {
let conda_meta_path = target_prefix.join("conda-meta");
std::fs::create_dir_all(&conda_meta_path)?;
let pkg_meta_path = conda_meta_path.join(prefix_record.file_name());
prefix_record.write_to_path(pkg_meta_path, true)
})
.await
{
Ok(result) => Ok(result.into_diagnostic()?),
Err(err) => {
if let Ok(panic) = err.try_into_panic() {
std::panic::resume_unwind(panic);
}
Ok(())
}
}
}
async fn remove_package_from_environment(
target_prefix: &Path,
package: &PrefixRecord,
) -> miette::Result<()> {
unlink_package(target_prefix, package)
.await
.into_diagnostic()
}