Skip to main content

common/
filegen.rs

1use anyhow::{Context, anyhow};
2use async_recursion::async_recursion;
3use tracing::instrument;
4
5use crate::progress;
6
/// Error type for filegen operations. See [`crate::error::OperationError`] for
/// logging conventions and rationale.
///
/// The [`Summary`] payload carries whatever was accounted for before the
/// failure; `filegen` fills it with the totals gathered from its child tasks.
pub type Error = crate::error::OperationError<Summary>;
10
/// Counters describing what a filegen run produced.
///
/// Summaries are combined with `+` as results from child tasks are collected.
#[derive(Copy, Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct Summary {
    /// Number of files written
    pub files_created: usize,
    /// Number of directories created
    pub directories_created: usize,
    /// Total number of bytes accounted for across all written files
    pub bytes_written: u64,
}
17
18impl std::ops::Add for Summary {
19    type Output = Self;
20    fn add(self, other: Self) -> Self {
21        Self {
22            files_created: self.files_created + other.files_created,
23            directories_created: self.directories_created + other.directories_created,
24            bytes_written: self.bytes_written + other.bytes_written,
25        }
26    }
27}
28
29impl std::fmt::Display for Summary {
30    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
31        write!(
32            f,
33            "files created: {}\n\
34            directories created: {}\n\
35            bytes written: {}",
36            self.files_created,
37            self.directories_created,
38            bytesize::ByteSize(self.bytes_written)
39        )
40    }
41}
42
/// Parameters describing the directory tree and the files that `filegen` produces.
#[derive(Debug, Clone)]
pub struct FileGenConfig {
    /// Directory under which everything is generated
    pub root: std::path::PathBuf,
    /// Number of subdirectories to create per directory, one entry per nesting level
    pub dirwidth: Vec<usize>,
    /// Number of files written into each directory that receives files
    pub numfiles: usize,
    /// Size of every generated file, in bytes
    pub filesize: usize,
    /// Size of the in-memory write buffer, in bytes
    pub writebuf: usize,
    /// Chunk size handed to the I/O throttle
    pub chunk_size: u64,
    /// When set, only leaf directories (no further nesting levels) receive files
    pub leaf_files: bool,
}

impl FileGenConfig {
    /// Builds a configuration from the tree shape and file parameters.
    ///
    /// The remaining knobs start at their defaults: a 1 MiB write buffer, a
    /// `chunk_size` of zero and files generated at every directory level.
    pub fn new(
        root: impl Into<std::path::PathBuf>,
        dirwidth: Vec<usize>,
        numfiles: usize,
        filesize: usize,
    ) -> Self {
        const DEFAULT_WRITEBUF: usize = 1 << 20; // 1 MiB
        let root = root.into();
        Self {
            root,
            dirwidth,
            numfiles,
            filesize,
            writebuf: DEFAULT_WRITEBUF,
            chunk_size: 0,
            leaf_files: false,
        }
    }
}
81
82#[instrument(skip(prog_track))]
83pub async fn write_file(
84    prog_track: &'static progress::Progress,
85    path: std::path::PathBuf,
86    mut filesize: usize,
87    bufsize: usize,
88    chunk_size: u64,
89) -> Result<Summary, Error> {
90    use tokio::io::AsyncWriteExt;
91    let _permit = throttle::open_file_permit().await;
92    throttle::get_file_iops_tokens(chunk_size, filesize as u64).await;
93    let _ops_guard = prog_track.ops.guard();
94    let original_filesize = filesize;
95    let mut bytes = vec![0u8; bufsize];
96    // The file open is the single metadata syscall in this path; wrap it
97    // with the cwnd permit + probe so filegen participates in the same
98    // adaptive control loop as copy/rm/link. Use the `_no_rate` variant
99    // because filegen gates the ops-throttle at task-spawn time (see
100    // `filegen` below) — going through the rate-gating helper here
101    // would consume two tokens per file and halve the effective rate.
102    let mut file = crate::walk::run_metadata_probed_no_rate(
103        congestion::Side::Destination,
104        congestion::MetadataOp::OpenCreate,
105        tokio::fs::OpenOptions::new()
106            .write(true)
107            .create(true)
108            .truncate(false)
109            .open(&path),
110    )
111    .await
112    .with_context(|| format!("Error opening {:?}", &path))
113    .map_err(|err| Error::new(err, Default::default()))?;
114    while filesize > 0 {
115        {
116            // make sure rng falls out of scope before await
117            rand::fill(&mut bytes[..]);
118        }
119        let writesize = std::cmp::min(filesize, bufsize);
120        file.write_all(&bytes[..writesize])
121            .await
122            .with_context(|| format!("Error writing to {:?}", &path))
123            .map_err(|err| Error::new(err, Default::default()))?;
124        filesize -= writesize;
125        prog_track.bytes_copied.add(writesize as u64);
126    }
127    prog_track.files_copied.inc();
128    Ok(Summary {
129        files_created: 1,
130        bytes_written: original_filesize as u64,
131        ..Default::default()
132    })
133}
134
135#[async_recursion]
136#[instrument(skip(prog_track))]
137pub async fn filegen(
138    prog_track: &'static progress::Progress,
139    config: &FileGenConfig,
140) -> Result<Summary, Error> {
141    let FileGenConfig {
142        root,
143        dirwidth,
144        numfiles,
145        filesize,
146        writebuf,
147        chunk_size,
148        leaf_files,
149    } = config;
150    let numdirs = *dirwidth.first().unwrap_or(&0);
151    let mut join_set = tokio::task::JoinSet::new();
152    // generate directories and recurse into them
153    for i in 0..numdirs {
154        let path = root.join(format!("dir{i}"));
155        let next_dirwidth = dirwidth[1..].to_vec();
156        let recurse_config = FileGenConfig {
157            root: path.clone(),
158            dirwidth: next_dirwidth,
159            numfiles: *numfiles,
160            filesize: *filesize,
161            writebuf: *writebuf,
162            chunk_size: *chunk_size,
163            leaf_files: *leaf_files,
164        };
165        let recurse = || async move {
166            // Bracket the create_dir metadata syscall with the cwnd permit
167            // + probe so filegen participates in the same adaptive control
168            // loop as copy/rm/link.
169            crate::walk::run_metadata_probed(
170                congestion::Side::Destination,
171                congestion::MetadataOp::MkDir,
172                tokio::fs::create_dir(&path),
173            )
174            .await
175            .with_context(|| format!("Error creating directory {:?}", &path))
176            .map_err(|err| Error::new(err, Default::default()))?;
177            prog_track.directories_created.inc();
178            let dir_summary = Summary {
179                directories_created: 1,
180                ..Default::default()
181            };
182            let recurse_summary = filegen(prog_track, &recurse_config).await?;
183            Ok(dir_summary + recurse_summary)
184        };
185        join_set.spawn(recurse());
186    }
187    // generate files (only if we're not in leaf_files mode, or if we are a leaf directory)
188    // a directory is a leaf when dirwidth is empty (no more subdirectories to create)
189    let is_leaf = dirwidth.is_empty();
190    let should_generate_files = !leaf_files || is_leaf;
191    if should_generate_files {
192        for i in 0..*numfiles {
193            // it's better to await the token here so that we throttle how many tasks we spawn. the
194            // ops-throttle will never cause a deadlock (unlike max-open-files limit) so it's safe to
195            // do here.
196            throttle::get_ops_token().await;
197            let path = root.join(format!("file{i}"));
198            join_set.spawn(write_file(
199                prog_track,
200                path,
201                *filesize,
202                *writebuf,
203                *chunk_size,
204            ));
205        }
206    }
207    let mut success = true;
208    let mut last_error: Option<anyhow::Error> = None;
209    let mut filegen_summary = Summary::default();
210    while let Some(res) = join_set.join_next().await {
211        match res.map_err(|err| Error::new(err.into(), Default::default()))? {
212            Ok(summary) => filegen_summary = filegen_summary + summary,
213            Err(error) => {
214                tracing::error!("filegen: {:?} failed with: {:#}", root, &error);
215                filegen_summary = filegen_summary + error.summary;
216                if last_error.is_none() {
217                    last_error = Some(error.source);
218                }
219                success = false;
220            }
221        }
222    }
223    if !success {
224        let error = if let Some(error) = last_error {
225            error.context(format!("filegen: {:?} failed!", &root))
226        } else {
227            anyhow!("filegen: {:?} failed!", &root)
228        };
229        return Err(Error::new(error, filegen_summary));
230    }
231    Ok(filegen_summary)
232}
233
234#[cfg(test)]
235mod tests {
236    use super::*;
237    use crate::testutils;
238    use std::os::unix::fs::PermissionsExt;
239    use tracing_test::traced_test;
240
    // Progress tracker shared by every test in this module. `filegen` takes a
    // `&'static` reference, hence the lazily initialized static.
    static PROGRESS: std::sync::LazyLock<progress::Progress> =
        std::sync::LazyLock::new(progress::Progress::new);
243
244    #[tokio::test]
245    #[traced_test]
246    async fn test_basic_filegen() -> Result<(), anyhow::Error> {
247        let tmp_dir = testutils::create_temp_dir().await?;
248        let test_path = tmp_dir.as_path();
249        // generate 2 subdirectories with 3 files per directory (including root)
250        let config = FileGenConfig {
251            root: test_path.to_path_buf(),
252            dirwidth: vec![2],
253            numfiles: 3,
254            filesize: 100,
255            writebuf: 50,
256            chunk_size: 0,
257            leaf_files: false,
258        };
259        let summary = filegen(&PROGRESS, &config).await?;
260        // verify summary
261        // files: 3 (in root) + 3 (in dir0) + 3 (in dir1) = 9 files
262        // directories: 2 (dir0, dir1)
263        // bytes: 100 bytes × 9 files = 900 bytes
264        assert_eq!(summary.files_created, 9);
265        assert_eq!(summary.directories_created, 2);
266        assert_eq!(summary.bytes_written, 900);
267        // verify files were actually created
268        assert!(test_path.join("file0").exists()); // root level files
269        assert!(test_path.join("dir0").join("file0").exists());
270        assert!(test_path.join("dir0").join("file1").exists());
271        assert!(test_path.join("dir0").join("file2").exists());
272        assert!(test_path.join("dir1").join("file0").exists());
273        assert!(test_path.join("dir1").join("file1").exists());
274        assert!(test_path.join("dir1").join("file2").exists());
275        // verify file sizes
276        let metadata = tokio::fs::metadata(test_path.join("dir0").join("file0")).await?;
277        assert_eq!(metadata.len(), 100);
278        // cleanup
279        tokio::fs::remove_dir_all(test_path).await?;
280        Ok(())
281    }
282
283    #[tokio::test]
284    #[traced_test]
285    async fn test_nested_filegen() -> Result<(), anyhow::Error> {
286        let tmp_dir = testutils::create_temp_dir().await?;
287        let test_path = tmp_dir.as_path();
288        // generate nested structure: 2 top-level dirs, each with 3 subdirs, 4 files per dir, 50 bytes each
289        let config = FileGenConfig {
290            root: test_path.to_path_buf(),
291            dirwidth: vec![2, 3],
292            numfiles: 4,
293            filesize: 50,
294            writebuf: 25,
295            chunk_size: 0,
296            leaf_files: false,
297        };
298        let summary = filegen(&PROGRESS, &config).await?;
299        // calculate expected values:
300        // directories: 2 top-level + (2 × 3) subdirs = 8 total
301        // files: 4 (in root) + 4×2 (in dir0, dir1) + 4×2×3 (in all leaf dirs) = 4 + 8 + 24 = 36 files
302        // bytes: 50 bytes × 36 files = 1800 bytes
303        assert_eq!(summary.files_created, 36);
304        assert_eq!(summary.directories_created, 8);
305        assert_eq!(summary.bytes_written, 1800);
306        // spot check some files exist
307        assert!(test_path.join("file0").exists()); // root files
308        assert!(test_path.join("dir0").join("file0").exists()); // top-level dir files
309        assert!(test_path.join("dir0").join("dir0").join("file0").exists());
310        assert!(test_path.join("dir0").join("dir2").join("file3").exists());
311        assert!(test_path.join("dir1").join("dir1").join("file2").exists());
312        // cleanup
313        tokio::fs::remove_dir_all(test_path).await?;
314        Ok(())
315    }
316
317    #[tokio::test]
318    #[traced_test]
319    async fn test_deeply_nested_filegen() -> Result<(), anyhow::Error> {
320        let tmp_dir = testutils::create_temp_dir().await?;
321        let test_path = tmp_dir.as_path();
322        // generate 3 levels: 2,2,2 with 2 files each, 10 bytes per file
323        let config = FileGenConfig {
324            root: test_path.to_path_buf(),
325            dirwidth: vec![2, 2, 2],
326            numfiles: 2,
327            filesize: 10,
328            writebuf: 10,
329            chunk_size: 0,
330            leaf_files: false,
331        };
332        let summary = filegen(&PROGRESS, &config).await?;
333        // directories: 2 + (2×2) + (2×2×2) = 2 + 4 + 8 = 14 dirs
334        // files: 2 (root) + 2×2 (level 1) + 2×2×2 (level 2) + 2×2×2×2 (level 3) = 2 + 4 + 8 + 16 = 30 files
335        // bytes: 10 bytes × 30 files = 300 bytes
336        assert_eq!(summary.files_created, 30);
337        assert_eq!(summary.directories_created, 14);
338        assert_eq!(summary.bytes_written, 300);
339        // verify deep nesting works
340        assert!(test_path.join("file0").exists()); // root files
341        assert!(
342            test_path
343                .join("dir0")
344                .join("dir0")
345                .join("dir0")
346                .join("file0")
347                .exists()
348        );
349        assert!(
350            test_path
351                .join("dir1")
352                .join("dir1")
353                .join("dir1")
354                .join("file1")
355                .exists()
356        );
357        // cleanup
358        tokio::fs::remove_dir_all(test_path).await?;
359        Ok(())
360    }
361
362    #[tokio::test]
363    #[traced_test]
364    async fn test_single_file() -> Result<(), anyhow::Error> {
365        let tmp_dir = testutils::create_temp_dir().await?;
366        let test_path = tmp_dir.as_path();
367        // generate just files, no directories
368        let config = FileGenConfig {
369            root: test_path.to_path_buf(),
370            dirwidth: vec![],
371            numfiles: 5,
372            filesize: 200,
373            writebuf: 100,
374            chunk_size: 0,
375            leaf_files: false,
376        };
377        let summary = filegen(&PROGRESS, &config).await?;
378        assert_eq!(summary.files_created, 5);
379        assert_eq!(summary.directories_created, 0);
380        assert_eq!(summary.bytes_written, 1000); // 200 × 5
381        for i in 0..5 {
382            // verify files
383            let file_path = test_path.join(format!("file{i}"));
384            assert!(file_path.exists());
385            let metadata = tokio::fs::metadata(&file_path).await?;
386            assert_eq!(metadata.len(), 200);
387        }
388        // cleanup
389        tokio::fs::remove_dir_all(test_path).await?;
390        Ok(())
391    }
392
393    #[tokio::test]
394    #[traced_test]
395    async fn test_zero_files() -> Result<(), anyhow::Error> {
396        let tmp_dir = testutils::create_temp_dir().await?;
397        let test_path = tmp_dir.as_path();
398        // generate only directories, no files
399        let config = FileGenConfig {
400            root: test_path.to_path_buf(),
401            dirwidth: vec![3, 2],
402            numfiles: 0,
403            filesize: 100,
404            writebuf: 50,
405            chunk_size: 0,
406            leaf_files: false,
407        };
408        let summary = filegen(&PROGRESS, &config).await?;
409        // directories: 3 + (3×2) = 9 dirs
410        assert_eq!(summary.files_created, 0);
411        assert_eq!(summary.directories_created, 9);
412        assert_eq!(summary.bytes_written, 0);
413        // verify directories exist but no files
414        assert!(test_path.join("dir0").join("dir0").exists());
415        assert!(test_path.join("dir2").join("dir1").exists());
416        assert!(!test_path.join("dir0").join("file0").exists());
417        // cleanup
418        tokio::fs::remove_dir_all(test_path).await?;
419        Ok(())
420    }
421
422    #[tokio::test]
423    #[traced_test]
424    async fn test_leaf_files_only() -> Result<(), anyhow::Error> {
425        let tmp_dir = testutils::create_temp_dir().await?;
426        let test_path = tmp_dir.as_path();
427        // generate with leaf_files=true, meaning files only in deepest directories
428        let config = FileGenConfig {
429            root: test_path.to_path_buf(),
430            dirwidth: vec![2, 3],
431            numfiles: 4,
432            filesize: 50,
433            writebuf: 25,
434            chunk_size: 0,
435            leaf_files: true,
436        };
437        let summary = filegen(&PROGRESS, &config).await?;
438        // directories: 2 top-level + (2 × 3) subdirs = 8 total
439        // files: ONLY in leaf dirs (6 leaf dirs) × 4 files each = 24 files
440        // bytes: 50 bytes × 24 files = 1200 bytes
441        assert_eq!(summary.files_created, 24);
442        assert_eq!(summary.directories_created, 8);
443        assert_eq!(summary.bytes_written, 1200);
444        // verify NO files in root or intermediate directories
445        assert!(!test_path.join("file0").exists()); // no root files
446        assert!(!test_path.join("dir0").join("file0").exists()); // no intermediate files
447        assert!(!test_path.join("dir1").join("file0").exists());
448        // verify files ONLY in leaf directories
449        assert!(test_path.join("dir0").join("dir0").join("file0").exists());
450        assert!(test_path.join("dir0").join("dir0").join("file3").exists());
451        assert!(test_path.join("dir0").join("dir2").join("file0").exists());
452        assert!(test_path.join("dir1").join("dir1").join("file0").exists());
453        // cleanup
454        tokio::fs::remove_dir_all(test_path).await?;
455        Ok(())
456    }
457
458    #[tokio::test]
459    #[traced_test]
460    async fn test_permission_error_includes_root_cause() -> Result<(), anyhow::Error> {
461        let tmp_dir = testutils::create_temp_dir().await?;
462        let root = tmp_dir.join("readonly");
463        tokio::fs::create_dir(&root).await?;
464        tokio::fs::set_permissions(&root, std::fs::Permissions::from_mode(0o555)).await?;
465
466        let config = FileGenConfig {
467            root: root.clone(),
468            dirwidth: Vec::new(),
469            numfiles: 1,
470            filesize: 10,
471            writebuf: 10,
472            chunk_size: 0,
473            leaf_files: false,
474        };
475        let result = filegen(&PROGRESS, &config).await;
476
477        // restore permissions to allow cleanup
478        tokio::fs::set_permissions(&root, std::fs::Permissions::from_mode(0o755)).await?;
479
480        assert!(
481            result.is_err(),
482            "filegen inside read-only directory should fail"
483        );
484        let err = result.unwrap_err();
485        let err_msg = format!("{:#}", err.source);
486        assert!(
487            err_msg.to_lowercase().contains("permission denied") || err_msg.contains("EACCES"),
488            "Error message must include permission denied text. Got: {}",
489            err_msg
490        );
491        Ok(())
492    }
493}