common/
filegen.rs

1use anyhow::{anyhow, Context};
2use async_recursion::async_recursion;
3use tracing::instrument;
4
5use crate::progress;
6
7#[derive(Debug, thiserror::Error)]
8#[error("{source}")]
9pub struct Error {
10    #[source]
11    pub source: anyhow::Error,
12    pub summary: Summary,
13}
14
15impl Error {
16    #[must_use]
17    pub fn new(source: anyhow::Error, summary: Summary) -> Self {
18        Error { source, summary }
19    }
20}
21
22#[derive(Copy, Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
23pub struct Summary {
24    pub files_created: usize,
25    pub directories_created: usize,
26    pub bytes_written: u64,
27}
28
29impl std::ops::Add for Summary {
30    type Output = Self;
31    fn add(self, other: Self) -> Self {
32        Self {
33            files_created: self.files_created + other.files_created,
34            directories_created: self.directories_created + other.directories_created,
35            bytes_written: self.bytes_written + other.bytes_written,
36        }
37    }
38}
39
40impl std::fmt::Display for Summary {
41    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
42        write!(
43            f,
44            "files created: {}\n\
45            directories created: {}\n\
46            bytes written: {}",
47            self.files_created,
48            self.directories_created,
49            bytesize::ByteSize(self.bytes_written)
50        )
51    }
52}
53
54#[instrument(skip(prog_track))]
55pub async fn write_file(
56    prog_track: &'static progress::Progress,
57    path: std::path::PathBuf,
58    mut filesize: usize,
59    bufsize: usize,
60    chunk_size: u64,
61) -> Result<Summary, Error> {
62    use rand::Rng;
63    use tokio::io::AsyncWriteExt;
64    let _permit = throttle::open_file_permit().await;
65    throttle::get_file_iops_tokens(chunk_size, filesize as u64).await;
66    let _ops_guard = prog_track.ops.guard();
67    let original_filesize = filesize;
68    let mut bytes = vec![0u8; bufsize];
69    let mut file = tokio::fs::OpenOptions::new()
70        .write(true)
71        .create(true)
72        .truncate(false)
73        .open(&path)
74        .await
75        .with_context(|| format!("Error opening {:?}", &path))
76        .map_err(|err| Error::new(anyhow::Error::msg(err), Default::default()))?;
77    while filesize > 0 {
78        {
79            // make sure rng falls out of scope before await
80            let mut rng = rand::thread_rng();
81            rng.fill(&mut bytes[..]);
82        }
83        let writesize = std::cmp::min(filesize, bufsize);
84        file.write_all(&bytes[..writesize])
85            .await
86            .with_context(|| format!("Error writing to {:?}", &path))
87            .map_err(|err| Error::new(anyhow::Error::msg(err), Default::default()))?;
88        filesize -= writesize;
89    }
90    prog_track.files_copied.inc();
91    prog_track.bytes_copied.add(original_filesize as u64);
92    Ok(Summary {
93        files_created: 1,
94        bytes_written: original_filesize as u64,
95        ..Default::default()
96    })
97}
98
99#[async_recursion]
100#[instrument(skip(prog_track))]
101#[allow(clippy::too_many_arguments)]
102pub async fn filegen(
103    prog_track: &'static progress::Progress,
104    root: &std::path::Path,
105    dirwidth: &[usize],
106    numfiles: usize,
107    filesize: usize,
108    writebuf: usize,
109    chunk_size: u64,
110    leaf_files: bool,
111) -> Result<Summary, Error> {
112    let numdirs = *dirwidth.first().unwrap_or(&0);
113    let mut join_set = tokio::task::JoinSet::new();
114    // generate directories and recurse into them
115    for i in 0..numdirs {
116        let path = root.join(format!("dir{i}"));
117        let dirwidth = dirwidth[1..].to_owned();
118        let recurse = || async move {
119            tokio::fs::create_dir(&path)
120                .await
121                .with_context(|| format!("Error creating directory {:?}", &path))
122                .map_err(|err| Error::new(anyhow::Error::msg(err), Default::default()))?;
123            prog_track.directories_created.inc();
124            let dir_summary = Summary {
125                directories_created: 1,
126                ..Default::default()
127            };
128            let recurse_summary = filegen(
129                prog_track, &path, &dirwidth, numfiles, filesize, writebuf, chunk_size, leaf_files,
130            )
131            .await?;
132            Ok(dir_summary + recurse_summary)
133        };
134        join_set.spawn(recurse());
135    }
136    // generate files (only if we're not in leaf_files mode, or if we are a leaf directory)
137    // a directory is a leaf when dirwidth is empty (no more subdirectories to create)
138    let is_leaf = dirwidth.is_empty();
139    let should_generate_files = !leaf_files || is_leaf;
140    if should_generate_files {
141        for i in 0..numfiles {
142            // it's better to await the token here so that we throttle how many tasks we spawn. the
143            // ops-throttle will never cause a deadlock (unlike max-open-files limit) so it's safe to
144            // do here.
145            throttle::get_ops_token().await;
146            let path = root.join(format!("file{i}"));
147            join_set.spawn(write_file(prog_track, path, filesize, writebuf, chunk_size));
148        }
149    }
150    let mut success = true;
151    let mut filegen_summary = Summary::default();
152    while let Some(res) = join_set.join_next().await {
153        match res.map_err(|err| Error::new(anyhow::Error::msg(err), Default::default()))? {
154            Ok(summary) => filegen_summary = filegen_summary + summary,
155            Err(error) => {
156                tracing::error!("filegen: {:?} failed with: {:?}", root, &error);
157                filegen_summary = filegen_summary + error.summary;
158                success = false;
159            }
160        }
161    }
162    if !success {
163        return Err(Error::new(
164            anyhow!("filegen: {:?} failed!", &root),
165            filegen_summary,
166        ));
167    }
168    Ok(filegen_summary)
169}
170
171#[cfg(test)]
172mod tests {
173    use super::*;
174    use crate::testutils;
175    use tracing_test::traced_test;
176
177    lazy_static! {
178        static ref PROGRESS: progress::Progress = progress::Progress::new();
179    }
180
181    #[tokio::test]
182    #[traced_test]
183    async fn test_basic_filegen() -> Result<(), anyhow::Error> {
184        let tmp_dir = testutils::create_temp_dir().await?;
185        let test_path = tmp_dir.as_path();
186        // generate 2 subdirectories with 3 files per directory (including root)
187        let summary = filegen(
188            &PROGRESS,
189            test_path,
190            &[2],
191            3,
192            100,
193            50,    // buffer size
194            0,     // chunk_size
195            false, // leaf_files
196        )
197        .await?;
198        // verify summary
199        // files: 3 (in root) + 3 (in dir0) + 3 (in dir1) = 9 files
200        // directories: 2 (dir0, dir1)
201        // bytes: 100 bytes × 9 files = 900 bytes
202        assert_eq!(summary.files_created, 9);
203        assert_eq!(summary.directories_created, 2);
204        assert_eq!(summary.bytes_written, 900);
205        // verify files were actually created
206        assert!(test_path.join("file0").exists()); // root level files
207        assert!(test_path.join("dir0").join("file0").exists());
208        assert!(test_path.join("dir0").join("file1").exists());
209        assert!(test_path.join("dir0").join("file2").exists());
210        assert!(test_path.join("dir1").join("file0").exists());
211        assert!(test_path.join("dir1").join("file1").exists());
212        assert!(test_path.join("dir1").join("file2").exists());
213        // verify file sizes
214        let metadata = tokio::fs::metadata(test_path.join("dir0").join("file0")).await?;
215        assert_eq!(metadata.len(), 100);
216        // cleanup
217        tokio::fs::remove_dir_all(test_path).await?;
218        Ok(())
219    }
220
221    #[tokio::test]
222    #[traced_test]
223    async fn test_nested_filegen() -> Result<(), anyhow::Error> {
224        let tmp_dir = testutils::create_temp_dir().await?;
225        let test_path = tmp_dir.as_path();
226        // generate nested structure: 2 top-level dirs, each with 3 subdirs, 4 files per dir, 50 bytes each
227        let summary = filegen(
228            &PROGRESS,
229            test_path,
230            &[2, 3],
231            4,
232            50,
233            25,    // buffer size
234            0,     // chunk_size
235            false, // leaf_files
236        )
237        .await?;
238        // calculate expected values:
239        // directories: 2 top-level + (2 × 3) subdirs = 8 total
240        // files: 4 (in root) + 4×2 (in dir0, dir1) + 4×2×3 (in all leaf dirs) = 4 + 8 + 24 = 36 files
241        // bytes: 50 bytes × 36 files = 1800 bytes
242        assert_eq!(summary.files_created, 36);
243        assert_eq!(summary.directories_created, 8);
244        assert_eq!(summary.bytes_written, 1800);
245        // spot check some files exist
246        assert!(test_path.join("file0").exists()); // root files
247        assert!(test_path.join("dir0").join("file0").exists()); // top-level dir files
248        assert!(test_path.join("dir0").join("dir0").join("file0").exists());
249        assert!(test_path.join("dir0").join("dir2").join("file3").exists());
250        assert!(test_path.join("dir1").join("dir1").join("file2").exists());
251        // cleanup
252        tokio::fs::remove_dir_all(test_path).await?;
253        Ok(())
254    }
255
256    #[tokio::test]
257    #[traced_test]
258    async fn test_deeply_nested_filegen() -> Result<(), anyhow::Error> {
259        let tmp_dir = testutils::create_temp_dir().await?;
260        let test_path = tmp_dir.as_path();
261        // generate 3 levels: 2,2,2 with 2 files each, 10 bytes per file
262        let summary = filegen(
263            &PROGRESS,
264            test_path,
265            &[2, 2, 2],
266            2,
267            10,
268            10,    // buffer size
269            0,     // chunk_size
270            false, // leaf_files
271        )
272        .await?;
273        // directories: 2 + (2×2) + (2×2×2) = 2 + 4 + 8 = 14 dirs
274        // files: 2 (root) + 2×2 (level 1) + 2×2×2 (level 2) + 2×2×2×2 (level 3) = 2 + 4 + 8 + 16 = 30 files
275        // bytes: 10 bytes × 30 files = 300 bytes
276        assert_eq!(summary.files_created, 30);
277        assert_eq!(summary.directories_created, 14);
278        assert_eq!(summary.bytes_written, 300);
279        // verify deep nesting works
280        assert!(test_path.join("file0").exists()); // root files
281        assert!(test_path
282            .join("dir0")
283            .join("dir0")
284            .join("dir0")
285            .join("file0")
286            .exists());
287        assert!(test_path
288            .join("dir1")
289            .join("dir1")
290            .join("dir1")
291            .join("file1")
292            .exists());
293        // cleanup
294        tokio::fs::remove_dir_all(test_path).await?;
295        Ok(())
296    }
297
298    #[tokio::test]
299    #[traced_test]
300    async fn test_single_file() -> Result<(), anyhow::Error> {
301        let tmp_dir = testutils::create_temp_dir().await?;
302        let test_path = tmp_dir.as_path();
303        // generate just files, no directories
304        let summary = filegen(
305            &PROGRESS,
306            test_path,
307            &[],   // no subdirectories
308            5,     // 5 files
309            200,   // 200 bytes each
310            100,   // buffer size
311            0,     // chunk_size
312            false, // leaf_files
313        )
314        .await?;
315        assert_eq!(summary.files_created, 5);
316        assert_eq!(summary.directories_created, 0);
317        assert_eq!(summary.bytes_written, 1000); // 200 × 5
318        for i in 0..5 {
319            // verify files
320            let file_path = test_path.join(format!("file{i}"));
321            assert!(file_path.exists());
322            let metadata = tokio::fs::metadata(&file_path).await?;
323            assert_eq!(metadata.len(), 200);
324        }
325        // cleanup
326        tokio::fs::remove_dir_all(test_path).await?;
327        Ok(())
328    }
329
330    #[tokio::test]
331    #[traced_test]
332    async fn test_zero_files() -> Result<(), anyhow::Error> {
333        let tmp_dir = testutils::create_temp_dir().await?;
334        let test_path = tmp_dir.as_path();
335        // generate only directories, no files
336        let summary = filegen(
337            &PROGRESS,
338            test_path,
339            &[3, 2], // 3 top-level, each with 2 subdirs
340            0,       // 0 files
341            100,
342            50,
343            0,
344            false, // leaf_files
345        )
346        .await?;
347        // directories: 3 + (3×2) = 9 dirs
348        assert_eq!(summary.files_created, 0);
349        assert_eq!(summary.directories_created, 9);
350        assert_eq!(summary.bytes_written, 0);
351        // verify directories exist but no files
352        assert!(test_path.join("dir0").join("dir0").exists());
353        assert!(test_path.join("dir2").join("dir1").exists());
354        assert!(!test_path.join("dir0").join("file0").exists());
355        // cleanup
356        tokio::fs::remove_dir_all(test_path).await?;
357        Ok(())
358    }
359
360    #[tokio::test]
361    #[traced_test]
362    async fn test_leaf_files_only() -> Result<(), anyhow::Error> {
363        let tmp_dir = testutils::create_temp_dir().await?;
364        let test_path = tmp_dir.as_path();
365        // generate with leaf_files=true, meaning files only in deepest directories
366        let summary = filegen(
367            &PROGRESS,
368            test_path,
369            &[2, 3],
370            4,
371            50,
372            25,   // buffer size
373            0,    // chunk_size
374            true, // leaf_files - only generate files in leaf directories
375        )
376        .await?;
377        // directories: 2 top-level + (2 × 3) subdirs = 8 total
378        // files: ONLY in leaf dirs (6 leaf dirs) × 4 files each = 24 files
379        // bytes: 50 bytes × 24 files = 1200 bytes
380        assert_eq!(summary.files_created, 24);
381        assert_eq!(summary.directories_created, 8);
382        assert_eq!(summary.bytes_written, 1200);
383        // verify NO files in root or intermediate directories
384        assert!(!test_path.join("file0").exists()); // no root files
385        assert!(!test_path.join("dir0").join("file0").exists()); // no intermediate files
386        assert!(!test_path.join("dir1").join("file0").exists());
387        // verify files ONLY in leaf directories
388        assert!(test_path.join("dir0").join("dir0").join("file0").exists());
389        assert!(test_path.join("dir0").join("dir0").join("file3").exists());
390        assert!(test_path.join("dir0").join("dir2").join("file0").exists());
391        assert!(test_path.join("dir1").join("dir1").join("file0").exists());
392        // cleanup
393        tokio::fs::remove_dir_all(test_path).await?;
394        Ok(())
395    }
396}