use futures::stream::FuturesUnordered;
use futures::StreamExt;
use miette::IntoDiagnostic;
use rattler_conda_types::PrefixRecord;
use std::path::{Path, PathBuf};
use tokio::task::JoinHandle;
#[derive(Debug, Clone)]
pub struct Prefix {
root: PathBuf,
}
impl Prefix {
pub fn new(path: impl Into<PathBuf>) -> Self {
let root = path.into();
Self { root }
}
pub fn root(&self) -> &Path {
&self.root
}
pub async fn find_installed_packages(
&self,
concurrency_limit: Option<usize>,
) -> miette::Result<Vec<PrefixRecord>> {
let concurrency_limit = concurrency_limit.unwrap_or(100);
let mut meta_futures =
FuturesUnordered::<JoinHandle<Result<PrefixRecord, std::io::Error>>>::new();
let mut result = Vec::new();
for entry in std::fs::read_dir(self.root.join("conda-meta"))
.into_iter()
.flatten()
{
let entry = entry.into_diagnostic()?;
let path = entry.path();
if path.ends_with(".json") {
continue;
}
if meta_futures.len() >= concurrency_limit {
match meta_futures
.next()
.await
.expect("we know there are pending futures")
{
Ok(record) => result.push(record.into_diagnostic()?),
Err(e) => {
if let Ok(panic) = e.try_into_panic() {
std::panic::resume_unwind(panic);
}
return Ok(result);
}
}
}
let future = tokio::task::spawn_blocking(move || PrefixRecord::from_path(path));
meta_futures.push(future);
}
while let Some(record) = meta_futures.next().await {
match record {
Ok(record) => result.push(record.into_diagnostic()?),
Err(e) => {
if let Ok(panic) = e.try_into_panic() {
std::panic::resume_unwind(panic);
}
return Ok(result);
}
}
}
Ok(result)
}
}