1use anyhow::{Context, anyhow};
2use async_recursion::async_recursion;
3use tracing::instrument;
4
5use crate::progress;
6
/// Error type returned by the file-generation routines in this module.
///
/// It is the crate's `OperationError` specialised to [`Summary`]; the code in
/// this module reads its `source` (the underlying failure) and `summary`
/// (the work accounted for before the failure) fields.
pub type Error = crate::error::OperationError<Summary>;
10
/// Counters describing what a file-generation run produced.
///
/// Summaries from individual files and directories are combined with `+`.
#[derive(Copy, Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct Summary {
    /// Number of files written (`write_file` reports 1 per successful call).
    pub files_created: usize,
    /// Number of directories created (`filegen` reports 1 per created directory).
    pub directories_created: usize,
    /// Total number of bytes written to the generated files.
    pub bytes_written: u64,
}
17
18impl std::ops::Add for Summary {
19 type Output = Self;
20 fn add(self, other: Self) -> Self {
21 Self {
22 files_created: self.files_created + other.files_created,
23 directories_created: self.directories_created + other.directories_created,
24 bytes_written: self.bytes_written + other.bytes_written,
25 }
26 }
27}
28
29impl std::fmt::Display for Summary {
30 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
31 write!(
32 f,
33 "files created: {}\n\
34 directories created: {}\n\
35 bytes written: {}",
36 self.files_created,
37 self.directories_created,
38 bytesize::ByteSize(self.bytes_written)
39 )
40 }
41}
42
/// Parameters controlling a `filegen` run.
#[derive(Debug, Clone)]
pub struct FileGenConfig {
    /// Directory under which the `dir{i}` subdirectories and `file{i}` files are created.
    pub root: std::path::PathBuf,
    /// Number of subdirectories to create at each nesting level; the first
    /// element applies to `root`, the remainder is passed down to the children.
    pub dirwidth: Vec<usize>,
    /// Number of files generated in each directory that receives files.
    pub numfiles: usize,
    /// Size of every generated file, in bytes.
    pub filesize: usize,
    /// Size, in bytes, of the in-memory buffer used for each write call.
    pub writebuf: usize,
    /// Value forwarded to `throttle::get_file_iops_tokens` together with the file size.
    /// NOTE(review): presumably the I/O chunk size used for IOPS throttling — confirm
    /// against the `throttle` crate.
    pub chunk_size: u64,
    /// When `true`, files are generated only in directories with no further
    /// nesting levels (`dirwidth` empty); when `false`, in every directory.
    pub leaf_files: bool,
}
61
62impl FileGenConfig {
63 pub fn new(
65 root: impl Into<std::path::PathBuf>,
66 dirwidth: Vec<usize>,
67 numfiles: usize,
68 filesize: usize,
69 ) -> Self {
70 Self {
71 root: root.into(),
72 dirwidth,
73 numfiles,
74 filesize,
75 writebuf: 1024 * 1024, chunk_size: 0,
77 leaf_files: false,
78 }
79 }
80}
81
82#[instrument(skip(prog_track))]
83pub async fn write_file(
84 prog_track: &'static progress::Progress,
85 path: std::path::PathBuf,
86 mut filesize: usize,
87 bufsize: usize,
88 chunk_size: u64,
89) -> Result<Summary, Error> {
90 use tokio::io::AsyncWriteExt;
91 let _permit = throttle::open_file_permit().await;
92 throttle::get_file_iops_tokens(chunk_size, filesize as u64).await;
93 let _ops_guard = prog_track.ops.guard();
94 let original_filesize = filesize;
95 let mut bytes = vec![0u8; bufsize];
96 let mut file = crate::walk::run_metadata_probed_no_rate(
103 congestion::Side::Destination,
104 congestion::MetadataOp::OpenCreate,
105 tokio::fs::OpenOptions::new()
106 .write(true)
107 .create(true)
108 .truncate(false)
109 .open(&path),
110 )
111 .await
112 .with_context(|| format!("Error opening {:?}", &path))
113 .map_err(|err| Error::new(err, Default::default()))?;
114 while filesize > 0 {
115 {
116 rand::fill(&mut bytes[..]);
118 }
119 let writesize = std::cmp::min(filesize, bufsize);
120 file.write_all(&bytes[..writesize])
121 .await
122 .with_context(|| format!("Error writing to {:?}", &path))
123 .map_err(|err| Error::new(err, Default::default()))?;
124 filesize -= writesize;
125 prog_track.bytes_copied.add(writesize as u64);
126 }
127 prog_track.files_copied.inc();
128 Ok(Summary {
129 files_created: 1,
130 bytes_written: original_filesize as u64,
131 ..Default::default()
132 })
133}
134
135#[async_recursion]
136#[instrument(skip(prog_track))]
137pub async fn filegen(
138 prog_track: &'static progress::Progress,
139 config: &FileGenConfig,
140) -> Result<Summary, Error> {
141 let FileGenConfig {
142 root,
143 dirwidth,
144 numfiles,
145 filesize,
146 writebuf,
147 chunk_size,
148 leaf_files,
149 } = config;
150 let numdirs = *dirwidth.first().unwrap_or(&0);
151 let mut join_set = tokio::task::JoinSet::new();
152 for i in 0..numdirs {
154 let path = root.join(format!("dir{i}"));
155 let next_dirwidth = dirwidth[1..].to_vec();
156 let recurse_config = FileGenConfig {
157 root: path.clone(),
158 dirwidth: next_dirwidth,
159 numfiles: *numfiles,
160 filesize: *filesize,
161 writebuf: *writebuf,
162 chunk_size: *chunk_size,
163 leaf_files: *leaf_files,
164 };
165 let recurse = || async move {
166 crate::walk::run_metadata_probed(
170 congestion::Side::Destination,
171 congestion::MetadataOp::MkDir,
172 tokio::fs::create_dir(&path),
173 )
174 .await
175 .with_context(|| format!("Error creating directory {:?}", &path))
176 .map_err(|err| Error::new(err, Default::default()))?;
177 prog_track.directories_created.inc();
178 let dir_summary = Summary {
179 directories_created: 1,
180 ..Default::default()
181 };
182 let recurse_summary = filegen(prog_track, &recurse_config).await?;
183 Ok(dir_summary + recurse_summary)
184 };
185 join_set.spawn(recurse());
186 }
187 let is_leaf = dirwidth.is_empty();
190 let should_generate_files = !leaf_files || is_leaf;
191 if should_generate_files {
192 for i in 0..*numfiles {
193 throttle::get_ops_token().await;
197 let path = root.join(format!("file{i}"));
198 join_set.spawn(write_file(
199 prog_track,
200 path,
201 *filesize,
202 *writebuf,
203 *chunk_size,
204 ));
205 }
206 }
207 let mut success = true;
208 let mut last_error: Option<anyhow::Error> = None;
209 let mut filegen_summary = Summary::default();
210 while let Some(res) = join_set.join_next().await {
211 match res.map_err(|err| Error::new(err.into(), Default::default()))? {
212 Ok(summary) => filegen_summary = filegen_summary + summary,
213 Err(error) => {
214 tracing::error!("filegen: {:?} failed with: {:#}", root, &error);
215 filegen_summary = filegen_summary + error.summary;
216 if last_error.is_none() {
217 last_error = Some(error.source);
218 }
219 success = false;
220 }
221 }
222 }
223 if !success {
224 let error = if let Some(error) = last_error {
225 error.context(format!("filegen: {:?} failed!", &root))
226 } else {
227 anyhow!("filegen: {:?} failed!", &root)
228 };
229 return Err(Error::new(error, filegen_summary));
230 }
231 Ok(filegen_summary)
232}
233
/// Integration-style tests that run `filegen` against a temporary directory and
/// check both the returned `Summary` and the resulting on-disk layout.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testutils;
    use std::os::unix::fs::PermissionsExt;
    use tracing_test::traced_test;

    // `filegen` requires a `&'static Progress`, so all tests share one lazily created instance.
    static PROGRESS: std::sync::LazyLock<progress::Progress> =
        std::sync::LazyLock::new(progress::Progress::new);

    /// One nesting level: files appear in the root and in both subdirectories.
    #[tokio::test]
    #[traced_test]
    async fn test_basic_filegen() -> Result<(), anyhow::Error> {
        let tmp_dir = testutils::create_temp_dir().await?;
        let test_path = tmp_dir.as_path();
        let config = FileGenConfig {
            root: test_path.to_path_buf(),
            dirwidth: vec![2],
            numfiles: 3,
            filesize: 100,
            writebuf: 50,
            chunk_size: 0,
            leaf_files: false,
        };
        let summary = filegen(&PROGRESS, &config).await?;
        // 3 files in the root + 3 in each of the 2 subdirectories = 9 files of 100 bytes.
        assert_eq!(summary.files_created, 9);
        assert_eq!(summary.directories_created, 2);
        assert_eq!(summary.bytes_written, 900);
        assert!(test_path.join("file0").exists());
        assert!(test_path.join("dir0").join("file0").exists());
        assert!(test_path.join("dir0").join("file1").exists());
        assert!(test_path.join("dir0").join("file2").exists());
        assert!(test_path.join("dir1").join("file0").exists());
        assert!(test_path.join("dir1").join("file1").exists());
        assert!(test_path.join("dir1").join("file2").exists());
        // The write buffer (50) is smaller than the file, so the size check covers multi-write files.
        let metadata = tokio::fs::metadata(test_path.join("dir0").join("file0")).await?;
        assert_eq!(metadata.len(), 100);
        tokio::fs::remove_dir_all(test_path).await?;
        Ok(())
    }

    /// Two nesting levels with files at every level.
    #[tokio::test]
    #[traced_test]
    async fn test_nested_filegen() -> Result<(), anyhow::Error> {
        let tmp_dir = testutils::create_temp_dir().await?;
        let test_path = tmp_dir.as_path();
        let config = FileGenConfig {
            root: test_path.to_path_buf(),
            dirwidth: vec![2, 3],
            numfiles: 4,
            filesize: 50,
            writebuf: 25,
            chunk_size: 0,
            leaf_files: false,
        };
        let summary = filegen(&PROGRESS, &config).await?;
        // Directories: 2 + 2 * 3 = 8. Files: 4 in each of the 1 + 2 + 6 = 9 directories = 36.
        assert_eq!(summary.files_created, 36);
        assert_eq!(summary.directories_created, 8);
        assert_eq!(summary.bytes_written, 1800);
        assert!(test_path.join("file0").exists());
        assert!(test_path.join("dir0").join("file0").exists());
        assert!(test_path.join("dir0").join("dir0").join("file0").exists());
        assert!(test_path.join("dir0").join("dir2").join("file3").exists());
        assert!(test_path.join("dir1").join("dir1").join("file2").exists());
        tokio::fs::remove_dir_all(test_path).await?;
        Ok(())
    }

    /// Three nesting levels with files at every level.
    #[tokio::test]
    #[traced_test]
    async fn test_deeply_nested_filegen() -> Result<(), anyhow::Error> {
        let tmp_dir = testutils::create_temp_dir().await?;
        let test_path = tmp_dir.as_path();
        let config = FileGenConfig {
            root: test_path.to_path_buf(),
            dirwidth: vec![2, 2, 2],
            numfiles: 2,
            filesize: 10,
            writebuf: 10,
            chunk_size: 0,
            leaf_files: false,
        };
        let summary = filegen(&PROGRESS, &config).await?;
        // Directories: 2 + 4 + 8 = 14. Files: 2 in each of the 1 + 14 = 15 directories = 30.
        assert_eq!(summary.files_created, 30);
        assert_eq!(summary.directories_created, 14);
        assert_eq!(summary.bytes_written, 300);
        assert!(test_path.join("file0").exists());
        assert!(
            test_path
                .join("dir0")
                .join("dir0")
                .join("dir0")
                .join("file0")
                .exists()
        );
        assert!(
            test_path
                .join("dir1")
                .join("dir1")
                .join("dir1")
                .join("file1")
                .exists()
        );
        tokio::fs::remove_dir_all(test_path).await?;
        Ok(())
    }

    /// No nesting at all: only files directly in the root.
    #[tokio::test]
    #[traced_test]
    async fn test_single_file() -> Result<(), anyhow::Error> {
        let tmp_dir = testutils::create_temp_dir().await?;
        let test_path = tmp_dir.as_path();
        let config = FileGenConfig {
            root: test_path.to_path_buf(),
            dirwidth: vec![],
            numfiles: 5,
            filesize: 200,
            writebuf: 100,
            chunk_size: 0,
            leaf_files: false,
        };
        let summary = filegen(&PROGRESS, &config).await?;
        assert_eq!(summary.files_created, 5);
        assert_eq!(summary.directories_created, 0);
        assert_eq!(summary.bytes_written, 1000);
        for i in 0..5 {
            let file_path = test_path.join(format!("file{i}"));
            assert!(file_path.exists());
            let metadata = tokio::fs::metadata(&file_path).await?;
            assert_eq!(metadata.len(), 200);
        }
        tokio::fs::remove_dir_all(test_path).await?;
        Ok(())
    }

    /// `numfiles == 0` still builds the directory tree but writes nothing.
    #[tokio::test]
    #[traced_test]
    async fn test_zero_files() -> Result<(), anyhow::Error> {
        let tmp_dir = testutils::create_temp_dir().await?;
        let test_path = tmp_dir.as_path();
        let config = FileGenConfig {
            root: test_path.to_path_buf(),
            dirwidth: vec![3, 2],
            numfiles: 0,
            filesize: 100,
            writebuf: 50,
            chunk_size: 0,
            leaf_files: false,
        };
        let summary = filegen(&PROGRESS, &config).await?;
        assert_eq!(summary.files_created, 0);
        // Directories: 3 + 3 * 2 = 9.
        assert_eq!(summary.directories_created, 9);
        assert_eq!(summary.bytes_written, 0);
        assert!(test_path.join("dir0").join("dir0").exists());
        assert!(test_path.join("dir2").join("dir1").exists());
        assert!(!test_path.join("dir0").join("file0").exists());
        tokio::fs::remove_dir_all(test_path).await?;
        Ok(())
    }

    /// With `leaf_files` set, only the deepest directories receive files.
    #[tokio::test]
    #[traced_test]
    async fn test_leaf_files_only() -> Result<(), anyhow::Error> {
        let tmp_dir = testutils::create_temp_dir().await?;
        let test_path = tmp_dir.as_path();
        let config = FileGenConfig {
            root: test_path.to_path_buf(),
            dirwidth: vec![2, 3],
            numfiles: 4,
            filesize: 50,
            writebuf: 25,
            chunk_size: 0,
            leaf_files: true,
        };
        let summary = filegen(&PROGRESS, &config).await?;
        // Only the 2 * 3 = 6 leaf directories get files: 6 * 4 = 24 files of 50 bytes.
        assert_eq!(summary.files_created, 24);
        assert_eq!(summary.directories_created, 8);
        assert_eq!(summary.bytes_written, 1200);
        assert!(!test_path.join("file0").exists());
        assert!(!test_path.join("dir0").join("file0").exists());
        assert!(!test_path.join("dir1").join("file0").exists());
        assert!(test_path.join("dir0").join("dir0").join("file0").exists());
        assert!(test_path.join("dir0").join("dir0").join("file3").exists());
        assert!(test_path.join("dir0").join("dir2").join("file0").exists());
        assert!(test_path.join("dir1").join("dir1").join("file0").exists());
        tokio::fs::remove_dir_all(test_path).await?;
        Ok(())
    }

    /// A failure caused by a read-only directory must surface the underlying OS error text.
    ///
    /// NOTE(review): presumably this cannot pass when run as root, since root is
    /// not subject to directory permission bits — confirm the test environment is unprivileged.
    #[tokio::test]
    #[traced_test]
    async fn test_permission_error_includes_root_cause() -> Result<(), anyhow::Error> {
        let tmp_dir = testutils::create_temp_dir().await?;
        let root = tmp_dir.join("readonly");
        tokio::fs::create_dir(&root).await?;
        tokio::fs::set_permissions(&root, std::fs::Permissions::from_mode(0o555)).await?;

        let config = FileGenConfig {
            root: root.clone(),
            dirwidth: Vec::new(),
            numfiles: 1,
            filesize: 10,
            writebuf: 10,
            chunk_size: 0,
            leaf_files: false,
        };
        let result = filegen(&PROGRESS, &config).await;

        // Restore write permission and remove the temporary tree before asserting,
        // so the directory is cleaned up whether or not the assertions pass.
        tokio::fs::set_permissions(&root, std::fs::Permissions::from_mode(0o755)).await?;
        tokio::fs::remove_dir_all(tmp_dir.as_path()).await?;

        assert!(
            result.is_err(),
            "filegen inside read-only directory should fail"
        );
        let err = result.unwrap_err();
        // `{:#}` prints the whole context chain, which is where the OS error text lives.
        let err_msg = format!("{:#}", err.source);
        assert!(
            err_msg.to_lowercase().contains("permission denied") || err_msg.contains("EACCES"),
            "Error message must include permission denied text. Got: {}",
            err_msg
        );
        Ok(())
    }
}