use std::{
future::Future,
path::{Path, PathBuf},
sync::Arc,
};
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::sync::Semaphore;
use tracing::instrument::WithSubscriber as _;
pub fn normalize_path<T: FnOnce(&str)>(
path: &Path,
relative_to: &Path,
repo_root: &Path,
warn: T,
) -> Option<PathBuf> {
let target_path = if let Some(parent) = relative_to.parent() {
parent.join(path)
} else {
warn("couldn't get relative_to's parent");
return None;
};
let target_path = match target_path.canonicalize() {
Ok(canonical) => canonical,
Err(e) => {
warn(&format!("error occurred in canonicalize: {e:?}"));
return None;
}
};
let target_path = match target_path.strip_prefix(repo_root) {
Ok(stripped) => stripped,
Err(e) => {
warn(&format!("error occurred stripping repo root: {e:?}"));
return None;
}
};
Some(PathBuf::from(target_path))
}
pub async fn spawn_limited_concurrency<F>(max_concurrency: usize, futures: Vec<F>) -> Vec<F::Output>
where
F: Future + Send, F::Output: Send,
{
let mut results = Vec::with_capacity(futures.len());
let semaphore = Arc::new(Semaphore::new(max_concurrency));
let mut pending = FuturesUnordered::new();
for future in futures {
let sem = semaphore.clone();
pending.push(
async move {
let _permit = sem.acquire().await.unwrap();
future.await
}
.with_current_subscriber(),
);
}
while let Some(result) = pending.next().await {
results.push(result);
}
results
}