use anyhow::{anyhow, Context};
use async_recursion::async_recursion;
use tracing::instrument;
use crate::progress;
/// Error produced by file generation: the underlying failure plus a [`Summary`].
///
/// `Display` prints `source` using the alternate (`{:#}`) format.
#[derive(Debug, thiserror::Error)]
#[error("{source:#}")]
pub struct Error {
/// The underlying failure; also marked as this error's `#[source]`.
#[source]
pub source: anyhow::Error,
/// Summary attached to the failure. `filegen` stores the totals it had
/// accumulated when it reports a failed run.
pub summary: Summary,
}
impl Error {
    /// Bundles an underlying failure with the summary that should accompany it.
    #[must_use]
    pub fn new(source: anyhow::Error, summary: Summary) -> Self {
        Self { source, summary }
    }
}
/// Counters describing what a file-generation run produced.
#[derive(Copy, Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct Summary {
/// Number of files created.
pub files_created: usize,
/// Number of directories created.
pub directories_created: usize,
/// Number of bytes written; `write_file` reports the requested file size here.
pub bytes_written: u64,
}
impl std::ops::Add for Summary {
    type Output = Self;

    /// Combines two summaries by adding their counters pairwise.
    fn add(self, other: Self) -> Self {
        let files_created = self.files_created + other.files_created;
        let directories_created = self.directories_created + other.directories_created;
        let bytes_written = self.bytes_written + other.bytes_written;
        Self {
            files_created,
            directories_created,
            bytes_written,
        }
    }
}
impl std::fmt::Display for Summary {
    /// Prints the three counters on separate lines without a trailing newline;
    /// the byte count is rendered through `bytesize::ByteSize`.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let files = self.files_created;
        let directories = self.directories_created;
        let bytes = bytesize::ByteSize(self.bytes_written);
        write!(
            f,
            "files created: {}\n\
             directories created: {}\n\
             bytes written: {}",
            files, directories, bytes
        )
    }
}
/// Parameters controlling one `filegen` run.
#[derive(Debug, Clone)]
pub struct FileGenConfig {
/// Directory under which `dir{i}` / `file{i}` entries are generated.
pub root: std::path::PathBuf,
/// Number of subdirectories to create per nesting level: the first element
/// applies to `root`, the remaining elements are passed to each subdirectory.
pub dirwidth: Vec<usize>,
/// Number of files written into each directory that receives files.
pub numfiles: usize,
/// Size of every generated file, in bytes.
pub filesize: usize,
/// Size of the buffer handed to `write_file` (`bufsize`), in bytes.
pub writebuf: usize,
/// Forwarded unchanged to `throttle::get_file_iops_tokens` by `write_file`.
pub chunk_size: u64,
/// When `true`, files are only generated where `dirwidth` is empty.
pub leaf_files: bool,
}
impl FileGenConfig {
    /// Creates a configuration from the required settings.
    ///
    /// The remaining fields get fixed defaults: `writebuf` is 1 MiB,
    /// `chunk_size` is 0 and `leaf_files` is `false`.
    pub fn new(
        root: impl Into<std::path::PathBuf>,
        dirwidth: Vec<usize>,
        numfiles: usize,
        filesize: usize,
    ) -> Self {
        let root = root.into();
        // 1 MiB
        let writebuf = 1024 * 1024;
        Self {
            root,
            dirwidth,
            numfiles,
            filesize,
            writebuf,
            chunk_size: 0,
            leaf_files: false,
        }
    }
}
#[instrument(skip(prog_track))]
pub async fn write_file(
prog_track: &'static progress::Progress,
path: std::path::PathBuf,
mut filesize: usize,
bufsize: usize,
chunk_size: u64,
) -> Result<Summary, Error> {
use tokio::io::AsyncWriteExt;
let _permit = throttle::open_file_permit().await;
throttle::get_file_iops_tokens(chunk_size, filesize as u64).await;
let _ops_guard = prog_track.ops.guard();
let original_filesize = filesize;
let mut bytes = vec![0u8; bufsize];
let mut file = tokio::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(false)
.open(&path)
.await
.with_context(|| format!("Error opening {:?}", &path))
.map_err(|err| Error::new(err, Default::default()))?;
while filesize > 0 {
{
rand::fill(&mut bytes[..]);
}
let writesize = std::cmp::min(filesize, bufsize);
file.write_all(&bytes[..writesize])
.await
.with_context(|| format!("Error writing to {:?}", &path))
.map_err(|err| Error::new(err, Default::default()))?;
filesize -= writesize;
prog_track.bytes_copied.add(writesize as u64);
}
prog_track.files_copied.inc();
Ok(Summary {
files_created: 1,
bytes_written: original_filesize as u64,
..Default::default()
})
}
#[async_recursion]
#[instrument(skip(prog_track))]
pub async fn filegen(
prog_track: &'static progress::Progress,
config: &FileGenConfig,
) -> Result<Summary, Error> {
let FileGenConfig {
root,
dirwidth,
numfiles,
filesize,
writebuf,
chunk_size,
leaf_files,
} = config;
let numdirs = *dirwidth.first().unwrap_or(&0);
let mut join_set = tokio::task::JoinSet::new();
for i in 0..numdirs {
let path = root.join(format!("dir{i}"));
let next_dirwidth = dirwidth[1..].to_vec();
let recurse_config = FileGenConfig {
root: path.clone(),
dirwidth: next_dirwidth,
numfiles: *numfiles,
filesize: *filesize,
writebuf: *writebuf,
chunk_size: *chunk_size,
leaf_files: *leaf_files,
};
let recurse = || async move {
tokio::fs::create_dir(&path)
.await
.with_context(|| format!("Error creating directory {:?}", &path))
.map_err(|err| Error::new(err, Default::default()))?;
prog_track.directories_created.inc();
let dir_summary = Summary {
directories_created: 1,
..Default::default()
};
let recurse_summary = filegen(prog_track, &recurse_config).await?;
Ok(dir_summary + recurse_summary)
};
join_set.spawn(recurse());
}
let is_leaf = dirwidth.is_empty();
let should_generate_files = !leaf_files || is_leaf;
if should_generate_files {
for i in 0..*numfiles {
throttle::get_ops_token().await;
let path = root.join(format!("file{i}"));
join_set.spawn(write_file(
prog_track,
path,
*filesize,
*writebuf,
*chunk_size,
));
}
}
let mut success = true;
let mut last_error: Option<anyhow::Error> = None;
let mut filegen_summary = Summary::default();
while let Some(res) = join_set.join_next().await {
match res.map_err(|err| Error::new(err.into(), Default::default()))? {
Ok(summary) => filegen_summary = filegen_summary + summary,
Err(error) => {
tracing::error!("filegen: {:?} failed with: {:#}", root, &error);
filegen_summary = filegen_summary + error.summary;
if last_error.is_none() {
last_error = Some(error.source);
}
success = false;
}
}
}
if !success {
let error = if let Some(error) = last_error {
error.context(format!("filegen: {:?} failed!", &root))
} else {
anyhow!("filegen: {:?} failed!", &root)
};
return Err(Error::new(error, filegen_summary));
}
Ok(filegen_summary)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testutils;
    use std::os::unix::fs::PermissionsExt;
    use tracing_test::traced_test;

    /// Progress tracker shared by all tests; `filegen` requires a `'static` reference.
    static PROGRESS: std::sync::LazyLock<progress::Progress> =
        std::sync::LazyLock::new(progress::Progress::new);

    /// One level of two directories: files land in the root and in each directory.
    #[tokio::test]
    #[traced_test]
    async fn test_basic_filegen() -> Result<(), anyhow::Error> {
        let tmp_dir = testutils::create_temp_dir().await?;
        let test_path = tmp_dir.as_path();
        let config = FileGenConfig {
            root: test_path.to_path_buf(),
            dirwidth: vec![2],
            numfiles: 3,
            filesize: 100,
            writebuf: 50,
            chunk_size: 0,
            leaf_files: false,
        };
        let summary = filegen(&PROGRESS, &config).await?;
        // 3 files in the root plus 3 in each of the 2 directories, 100 bytes each.
        assert_eq!(summary.files_created, 9);
        assert_eq!(summary.directories_created, 2);
        assert_eq!(summary.bytes_written, 900);
        assert!(test_path.join("file0").exists());
        for dir in ["dir0", "dir1"] {
            for file in ["file0", "file1", "file2"] {
                let path = test_path.join(dir).join(file);
                assert!(path.exists(), "missing {:?}", path);
            }
        }
        let metadata = tokio::fs::metadata(test_path.join("dir0").join("file0")).await?;
        assert_eq!(metadata.len(), 100);
        tokio::fs::remove_dir_all(test_path).await?;
        Ok(())
    }

    /// Two levels (2 x 3): files are generated at every level of the tree.
    #[tokio::test]
    #[traced_test]
    async fn test_nested_filegen() -> Result<(), anyhow::Error> {
        let tmp_dir = testutils::create_temp_dir().await?;
        let test_path = tmp_dir.as_path();
        let config = FileGenConfig {
            root: test_path.to_path_buf(),
            dirwidth: vec![2, 3],
            numfiles: 4,
            filesize: 50,
            writebuf: 25,
            chunk_size: 0,
            leaf_files: false,
        };
        let summary = filegen(&PROGRESS, &config).await?;
        // Directories: 2 + 2 * 3 = 8. Files: 4 in each of the 1 + 2 + 6 = 9 locations.
        assert_eq!(summary.files_created, 36);
        assert_eq!(summary.directories_created, 8);
        assert_eq!(summary.bytes_written, 1800);
        assert!(test_path.join("file0").exists());
        assert!(test_path.join("dir0").join("file0").exists());
        assert!(test_path.join("dir0").join("dir0").join("file0").exists());
        assert!(test_path.join("dir0").join("dir2").join("file3").exists());
        assert!(test_path.join("dir1").join("dir1").join("file2").exists());
        tokio::fs::remove_dir_all(test_path).await?;
        Ok(())
    }

    /// Three levels of width two.
    #[tokio::test]
    #[traced_test]
    async fn test_deeply_nested_filegen() -> Result<(), anyhow::Error> {
        let tmp_dir = testutils::create_temp_dir().await?;
        let test_path = tmp_dir.as_path();
        let config = FileGenConfig {
            root: test_path.to_path_buf(),
            dirwidth: vec![2, 2, 2],
            numfiles: 2,
            filesize: 10,
            writebuf: 10,
            chunk_size: 0,
            leaf_files: false,
        };
        let summary = filegen(&PROGRESS, &config).await?;
        // Directories: 2 + 4 + 8 = 14. Files: 2 in each of the 1 + 14 = 15 locations.
        assert_eq!(summary.files_created, 30);
        assert_eq!(summary.directories_created, 14);
        assert_eq!(summary.bytes_written, 300);
        assert!(test_path.join("file0").exists());
        let deepest_first = test_path
            .join("dir0")
            .join("dir0")
            .join("dir0")
            .join("file0");
        assert!(deepest_first.exists());
        let deepest_last = test_path
            .join("dir1")
            .join("dir1")
            .join("dir1")
            .join("file1");
        assert!(deepest_last.exists());
        tokio::fs::remove_dir_all(test_path).await?;
        Ok(())
    }

    /// No directory levels at all: only files in the root, written in two pieces each.
    #[tokio::test]
    #[traced_test]
    async fn test_single_file() -> Result<(), anyhow::Error> {
        let tmp_dir = testutils::create_temp_dir().await?;
        let test_path = tmp_dir.as_path();
        let config = FileGenConfig {
            root: test_path.to_path_buf(),
            dirwidth: vec![],
            numfiles: 5,
            filesize: 200,
            writebuf: 100,
            chunk_size: 0,
            leaf_files: false,
        };
        let summary = filegen(&PROGRESS, &config).await?;
        assert_eq!(summary.files_created, 5);
        assert_eq!(summary.directories_created, 0);
        assert_eq!(summary.bytes_written, 1000);
        for i in 0..5 {
            let file_path = test_path.join(format!("file{i}"));
            assert!(file_path.exists());
            let metadata = tokio::fs::metadata(&file_path).await?;
            assert_eq!(metadata.len(), 200);
        }
        tokio::fs::remove_dir_all(test_path).await?;
        Ok(())
    }

    /// `numfiles: 0` still builds the directory tree but writes nothing.
    #[tokio::test]
    #[traced_test]
    async fn test_zero_files() -> Result<(), anyhow::Error> {
        let tmp_dir = testutils::create_temp_dir().await?;
        let test_path = tmp_dir.as_path();
        let config = FileGenConfig {
            root: test_path.to_path_buf(),
            dirwidth: vec![3, 2],
            numfiles: 0,
            filesize: 100,
            writebuf: 50,
            chunk_size: 0,
            leaf_files: false,
        };
        let summary = filegen(&PROGRESS, &config).await?;
        // Directories: 3 + 3 * 2 = 9.
        assert_eq!(summary.files_created, 0);
        assert_eq!(summary.directories_created, 9);
        assert_eq!(summary.bytes_written, 0);
        assert!(test_path.join("dir0").join("dir0").exists());
        assert!(test_path.join("dir2").join("dir1").exists());
        assert!(!test_path.join("dir0").join("file0").exists());
        tokio::fs::remove_dir_all(test_path).await?;
        Ok(())
    }

    /// With `leaf_files` set, only the deepest directories receive files.
    #[tokio::test]
    #[traced_test]
    async fn test_leaf_files_only() -> Result<(), anyhow::Error> {
        let tmp_dir = testutils::create_temp_dir().await?;
        let test_path = tmp_dir.as_path();
        let config = FileGenConfig {
            root: test_path.to_path_buf(),
            dirwidth: vec![2, 3],
            numfiles: 4,
            filesize: 50,
            writebuf: 25,
            chunk_size: 0,
            leaf_files: true,
        };
        let summary = filegen(&PROGRESS, &config).await?;
        // Only the 2 * 3 = 6 leaf directories get 4 files each.
        assert_eq!(summary.files_created, 24);
        assert_eq!(summary.directories_created, 8);
        assert_eq!(summary.bytes_written, 1200);
        // Nothing in the root or in the intermediate level.
        assert!(!test_path.join("file0").exists());
        assert!(!test_path.join("dir0").join("file0").exists());
        assert!(!test_path.join("dir1").join("file0").exists());
        // Files are present in the leaves.
        assert!(test_path.join("dir0").join("dir0").join("file0").exists());
        assert!(test_path.join("dir0").join("dir0").join("file3").exists());
        assert!(test_path.join("dir0").join("dir2").join("file0").exists());
        assert!(test_path.join("dir1").join("dir1").join("file0").exists());
        tokio::fs::remove_dir_all(test_path).await?;
        Ok(())
    }

    /// A failure inside a read-only directory must surface the OS-level cause.
    #[tokio::test]
    #[traced_test]
    async fn test_permission_error_includes_root_cause() -> Result<(), anyhow::Error> {
        let tmp_dir = testutils::create_temp_dir().await?;
        let root = tmp_dir.join("readonly");
        tokio::fs::create_dir(&root).await?;
        tokio::fs::set_permissions(&root, std::fs::Permissions::from_mode(0o555)).await?;
        let config = FileGenConfig {
            root: root.clone(),
            dirwidth: Vec::new(),
            numfiles: 1,
            filesize: 10,
            writebuf: 10,
            chunk_size: 0,
            leaf_files: false,
        };
        let result = filegen(&PROGRESS, &config).await;
        // Make the directory writable again and remove the temporary tree before
        // asserting, so nothing is left behind even when an assertion fails.
        tokio::fs::set_permissions(&root, std::fs::Permissions::from_mode(0o755)).await?;
        tokio::fs::remove_dir_all(tmp_dir.as_path()).await?;
        assert!(
            result.is_err(),
            "filegen inside read-only directory should fail"
        );
        let err = result.unwrap_err();
        let err_msg = format!("{:#}", err.source);
        assert!(
            err_msg.to_lowercase().contains("permission denied") || err_msg.contains("EACCES"),
            "Error message must include permission denied text. Got: {}",
            err_msg
        );
        Ok(())
    }
}