use crate::{
cli::{
reporters::run_with_reporter,
style::{INFO_STYLE, SUCCESS_STYLE},
},
util::remove_empty_dir,
};
use anyhow::Context as _;
use async_stream::try_stream;
use clap::Args;
use fs_err::tokio as fs;
use futures::{future::BoxFuture, FutureExt as _, Stream, StreamExt as _};
use pesde::{
source::fs::{FsEntry, PackageFs},
Project,
};
use std::{
collections::{HashMap, HashSet},
future::Future,
path::{Path, PathBuf},
};
use tokio::task::JoinSet;
// Argument struct for the prune command. It declares no fields, so the command takes no
// options of its own; all behaviour lives in `PruneCommand::run`.
//
// NOTE(review): written as a plain comment rather than a `///` doc comment because clap's
// derive macros read doc comments as help text — confirm before converting it.
#[derive(Debug, Args)]
pub struct PruneCommand;
/// Opens `dir` and exposes its entries as an asynchronous stream.
///
/// A failure to open the directory is reported through the outer `Result`. A failure while
/// advancing the listing is delivered as an `Err` item of the stream instead.
async fn read_dir_stream(
    dir: &Path,
) -> std::io::Result<impl Stream<Item = std::io::Result<fs::DirEntry>>> {
    let mut listing = fs::read_dir(dir).await?;

    Ok(try_stream! {
        loop {
            match listing.next_entry().await? {
                Some(dir_entry) => yield dir_entry,
                // `None` marks the end of the directory listing.
                None => break,
            }
        }
    })
}
/// Returns the hard-link count of the file at `path`.
///
/// On Unix the count is read from the file's metadata (`nlink`). On Windows the file is
/// opened and the count is taken from `nNumberOfLinks` as reported by
/// `GetFileInformationByHandle`. Any other platform is rejected at compile time.
///
/// # Errors
///
/// Returns an error if the metadata cannot be read (Unix) or if any of the Win32 calls
/// fails (Windows).
///
/// # Panics
///
/// On Windows, panics if the blocking task cannot be joined (`unwrap` on the join result).
// Every supported `cfg` branch below ends in `return`, which makes the trailing `bail!`
// unreachable there; the `allow` silences the resulting warning.
#[allow(unreachable_code)]
async fn get_nlinks(path: &Path) -> anyhow::Result<u64> {
#[cfg(unix)]
{
use std::os::unix::fs::MetadataExt as _;
let metadata = fs::metadata(path).await?;
return Ok(metadata.nlink());
}
#[cfg(windows)]
{
use std::os::windows::ffi::OsStrExt as _;
use windows::{
core::PWSTR,
Win32::{
Foundation::CloseHandle,
Storage::FileSystem::{
CreateFileW, GetFileInformationByHandle, FILE_ATTRIBUTE_NORMAL,
FILE_GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING,
},
},
};
// The blocking closure must be `'static`, so it gets its own owned copy of the path.
let path = path.to_path_buf();
// The Win32 calls are synchronous, so they run on tokio's blocking thread pool.
// SAFETY: the path is encoded as UTF-16 with a trailing NUL appended. The buffer is a
// temporary inside the `let handle` statement, so it stays alive for the whole
// `CreateFileW` call. The handle is only used after `CreateFileW` succeeded and is closed
// before the closure returns.
return tokio::task::spawn_blocking(move || unsafe {
let handle = CreateFileW(
PWSTR(
path.as_os_str()
.encode_wide()
.chain(std::iter::once(0))
.collect::<Vec<_>>()
.as_mut_ptr(),
),
FILE_GENERIC_READ.0,
FILE_SHARE_READ,
None,
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL,
None,
)?;
let mut info =
windows::Win32::Storage::FileSystem::BY_HANDLE_FILE_INFORMATION::default();
// Keep the query result aside and close the handle first, so the handle is not leaked
// when the query itself failed.
let res = GetFileInformationByHandle(handle, &mut info);
CloseHandle(handle)?;
res?;
Ok(info.nNumberOfLinks as u64)
})
.await
.unwrap();
}
#[cfg(not(any(unix, windows)))]
{
compile_error!("unsupported platform");
}
// Fallback return value; see the note on the `allow` attribute above.
anyhow::bail!("unsupported platform")
}
/// Newtype around [`JoinSet`] that can be used as the target of a `collect` call which needs
/// a `Default + Extend` collection (in this file: collecting a stream of futures).
///
/// The wrapped set is reached through the public-in-module tuple field `.0`.
#[derive(Debug)]
struct ExtendJoinSet<T: Send + 'static>(JoinSet<T>);
/// Extending the set spawns every supplied future as a task on the wrapped [`JoinSet`].
impl<T, F> Extend<F> for ExtendJoinSet<T>
where
    T: Send + 'static,
    F: Future<Output = T> + Send + 'static,
{
    fn extend<I: IntoIterator<Item = F>>(&mut self, iter: I) {
        iter.into_iter().for_each(|future| {
            // The handle returned by `spawn` is not needed; results are read back through
            // the join set itself.
            self.0.spawn(future);
        });
    }
}
impl<T: Send + 'static> Default for ExtendJoinSet<T> {
fn default() -> Self {
Self(JoinSet::new())
}
}
async fn discover_cas_packages(cas_dir: &Path) -> anyhow::Result<HashMap<PathBuf, PackageFs>> {
fn read_entry(
entry: fs::DirEntry,
) -> BoxFuture<'static, anyhow::Result<HashMap<PathBuf, PackageFs>>> {
async move {
if entry
.metadata()
.await
.context("failed to read entry metadata")?
.is_dir()
{
let mut tasks = read_dir_stream(&entry.path())
.await
.context("failed to read entry directory")?
.map(|entry| async move {
read_entry(entry.context("failed to read inner cas index dir entry")?).await
})
.collect::<ExtendJoinSet<Result<_, anyhow::Error>>>()
.await
.0;
let mut res = HashMap::new();
while let Some(entry) = tasks.join_next().await {
res.extend(entry.unwrap()?);
}
return Ok(res);
}
let contents = fs::read_to_string(entry.path()).await?;
let fs = toml::from_str(&contents).context("failed to deserialize PackageFs")?;
Ok(HashMap::from([(entry.path(), fs)]))
}
.boxed()
}
let mut tasks = ["index", "wally_index", "git_index"]
.into_iter()
.map(|index| cas_dir.join(index))
.map(|index| async move {
let mut res = HashMap::new();
let tasks = match read_dir_stream(&index).await {
Ok(tasks) => tasks,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(res),
Err(e) => return Err(e).context("failed to read cas index directory"),
};
let mut tasks = tasks
.map(|entry| async move {
read_entry(entry.context("failed to read cas index dir entry")?).await
})
.collect::<ExtendJoinSet<Result<_, anyhow::Error>>>()
.await
.0;
while let Some(task) = tasks.join_next().await {
res.extend(task.unwrap()?);
}
Ok(res)
})
.collect::<JoinSet<Result<_, anyhow::Error>>>();
let mut cas_entries = HashMap::new();
while let Some(task) = tasks.join_next().await {
cas_entries.extend(task.unwrap()?);
}
Ok(cas_entries)
}
/// Deletes every CAS file whose hard-link count is at most one and reports what was deleted.
///
/// Only entries of `cas_dir` whose name is valid UTF-8 and exactly two bytes long are
/// treated as hash-prefix directories; everything else is skipped. Inside each prefix
/// directory every file is inspected concurrently: a file with more than one link is kept
/// (presumably because something else still hard-links it — confirm against the linking
/// code), any other file is removed and its parent directory is handed to
/// `remove_empty_dir`.
///
/// Returns the set of removed hashes, each formed as prefix directory name followed by the
/// file name. A missing `cas_dir` yields an empty set.
///
/// # Errors
///
/// Fails if a directory cannot be listed, a link count cannot be read, a file cannot be
/// removed, or a file inside a prefix directory has a non-UTF-8 name.
///
/// # Panics
///
/// Panics if one of the spawned tasks cannot be joined (`unwrap` on the join result).
async fn remove_hashes(cas_dir: &Path) -> anyhow::Result<HashSet<String>> {
    let mut removed = HashSet::new();

    let prefix_dirs = match read_dir_stream(cas_dir).await {
        Ok(stream) => stream,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(removed),
        Err(e) => return Err(e).context("failed to read cas directory"),
    };

    let mut prefix_tasks = prefix_dirs
        .map(|cas_entry| async move {
            let cas_entry = cas_entry.context("failed to read cas dir entry")?;

            let prefix = cas_entry.file_name();
            let Some(prefix) = prefix.to_str() else {
                return Ok(None);
            };
            if prefix.len() != 2 {
                return Ok(None);
            }

            let mut file_tasks = read_dir_stream(&cas_entry.path())
                .await
                .context("failed to read hash directory")?
                .map(|hash_entry| {
                    // Each spawned task must be `'static`, so it gets its own copy.
                    let prefix = prefix.to_string();
                    async move {
                        let hash_entry = hash_entry.context("failed to read hash dir entry")?;

                        // A non-UTF-8 name is reported as a regular error rather than a
                        // panic inside the spawned task.
                        let file_name = hash_entry.file_name();
                        let suffix = file_name.to_str().context("non-UTF-8 hash")?;
                        let hash = format!("{prefix}{suffix}");

                        let path = hash_entry.path();
                        let nlinks = get_nlinks(&path)
                            .await
                            .context("failed to count file usage")?;
                        if nlinks > 1 {
                            return Ok(None);
                        }

                        fs::remove_file(&path)
                            .await
                            .context("failed to remove unused file")?;
                        if let Some(parent) = path.parent() {
                            remove_empty_dir(parent).await?;
                        }

                        Ok(Some(hash))
                    }
                })
                .collect::<ExtendJoinSet<anyhow::Result<Option<String>>>>()
                .await
                .0;

            let mut removed_in_prefix = HashSet::new();
            while let Some(joined) = file_tasks.join_next().await {
                if let Some(hash) = joined.unwrap()? {
                    removed_in_prefix.insert(hash);
                }
            }
            Ok(Some(removed_in_prefix))
        })
        .collect::<ExtendJoinSet<anyhow::Result<Option<HashSet<String>>>>>()
        .await
        .0;

    while let Some(joined) = prefix_tasks.join_next().await {
        if let Some(removed_in_prefix) = joined.unwrap()? {
            removed.extend(removed_in_prefix);
        }
    }

    Ok(removed)
}
impl PruneCommand {
pub async fn run(self, project: Project) -> anyhow::Result<()> {
let (cas_entries, removed_hashes) = run_with_reporter(|_, root_progress, _| async {
let root_progress = root_progress;
root_progress.reset();
root_progress.set_message("discover packages");
let cas_entries = discover_cas_packages(project.cas_dir()).await?;
root_progress.reset();
root_progress.set_message("remove unused files");
let removed_hashes = remove_hashes(project.cas_dir()).await?;
Ok::<_, anyhow::Error>((cas_entries, removed_hashes))
})
.await?;
let mut tasks = JoinSet::new();
let mut removed_packages = 0usize;
'entry: for (path, fs) in cas_entries {
let PackageFs::Cas(entries) = fs else {
continue;
};
for entry in entries.into_values() {
let FsEntry::File(hash) = entry else {
continue;
};
if removed_hashes.contains(&hash) {
let cas_dir = project.cas_dir().to_path_buf();
tasks.spawn(async move {
fs::remove_file(&path)
.await
.context("failed to remove unused file")?;
let mut path = &*path;
while let Some(parent) = path.parent() {
if parent == cas_dir {
break;
}
remove_empty_dir(parent).await?;
path = parent;
}
Ok::<_, anyhow::Error>(())
});
removed_packages += 1;
continue 'entry;
}
}
}
while let Some(task) = tasks.join_next().await {
task.unwrap()?;
}
println!(
"{} removed {} unused packages and {} individual files!",
SUCCESS_STYLE.apply_to("done!"),
INFO_STYLE.apply_to(removed_packages),
INFO_STYLE.apply_to(removed_hashes.len())
);
Ok(())
}
}