1use anyhow::{anyhow, Context};
2use async_recursion::async_recursion;
3use tracing::instrument;
4
5use crate::progress;
6
7#[derive(Debug, thiserror::Error)]
8#[error("{source}")]
9pub struct Error {
10 #[source]
11 pub source: anyhow::Error,
12 pub summary: Summary,
13}
14
15impl Error {
16 #[must_use]
17 pub fn new(source: anyhow::Error, summary: Summary) -> Self {
18 Error { source, summary }
19 }
20}
21
22#[derive(Copy, Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
23pub struct Summary {
24 pub files_created: usize,
25 pub directories_created: usize,
26 pub bytes_written: u64,
27}
28
29impl std::ops::Add for Summary {
30 type Output = Self;
31 fn add(self, other: Self) -> Self {
32 Self {
33 files_created: self.files_created + other.files_created,
34 directories_created: self.directories_created + other.directories_created,
35 bytes_written: self.bytes_written + other.bytes_written,
36 }
37 }
38}
39
40impl std::fmt::Display for Summary {
41 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
42 write!(
43 f,
44 "files created: {}\n\
45 directories created: {}\n\
46 bytes written: {}",
47 self.files_created,
48 self.directories_created,
49 bytesize::ByteSize(self.bytes_written)
50 )
51 }
52}
53
54#[instrument(skip(prog_track))]
55pub async fn write_file(
56 prog_track: &'static progress::Progress,
57 path: std::path::PathBuf,
58 mut filesize: usize,
59 bufsize: usize,
60 chunk_size: u64,
61) -> Result<Summary, Error> {
62 use rand::Rng;
63 use tokio::io::AsyncWriteExt;
64 let _permit = throttle::open_file_permit().await;
65 throttle::get_file_iops_tokens(chunk_size, filesize as u64).await;
66 let _ops_guard = prog_track.ops.guard();
67 let original_filesize = filesize;
68 let mut bytes = vec![0u8; bufsize];
69 let mut file = tokio::fs::OpenOptions::new()
70 .write(true)
71 .create(true)
72 .truncate(false)
73 .open(&path)
74 .await
75 .with_context(|| format!("Error opening {:?}", &path))
76 .map_err(|err| Error::new(anyhow::Error::msg(err), Default::default()))?;
77 while filesize > 0 {
78 {
79 let mut rng = rand::thread_rng();
81 rng.fill(&mut bytes[..]);
82 }
83 let writesize = std::cmp::min(filesize, bufsize);
84 file.write_all(&bytes[..writesize])
85 .await
86 .with_context(|| format!("Error writing to {:?}", &path))
87 .map_err(|err| Error::new(anyhow::Error::msg(err), Default::default()))?;
88 filesize -= writesize;
89 }
90 prog_track.files_copied.inc();
91 prog_track.bytes_copied.add(original_filesize as u64);
92 Ok(Summary {
93 files_created: 1,
94 bytes_written: original_filesize as u64,
95 ..Default::default()
96 })
97}
98
99#[async_recursion]
100#[instrument(skip(prog_track))]
101#[allow(clippy::too_many_arguments)]
102pub async fn filegen(
103 prog_track: &'static progress::Progress,
104 root: &std::path::Path,
105 dirwidth: &[usize],
106 numfiles: usize,
107 filesize: usize,
108 writebuf: usize,
109 chunk_size: u64,
110 leaf_files: bool,
111) -> Result<Summary, Error> {
112 let numdirs = *dirwidth.first().unwrap_or(&0);
113 let mut join_set = tokio::task::JoinSet::new();
114 for i in 0..numdirs {
116 let path = root.join(format!("dir{i}"));
117 let dirwidth = dirwidth[1..].to_owned();
118 let recurse = || async move {
119 tokio::fs::create_dir(&path)
120 .await
121 .with_context(|| format!("Error creating directory {:?}", &path))
122 .map_err(|err| Error::new(anyhow::Error::msg(err), Default::default()))?;
123 prog_track.directories_created.inc();
124 let dir_summary = Summary {
125 directories_created: 1,
126 ..Default::default()
127 };
128 let recurse_summary = filegen(
129 prog_track, &path, &dirwidth, numfiles, filesize, writebuf, chunk_size, leaf_files,
130 )
131 .await?;
132 Ok(dir_summary + recurse_summary)
133 };
134 join_set.spawn(recurse());
135 }
136 let is_leaf = dirwidth.is_empty();
139 let should_generate_files = !leaf_files || is_leaf;
140 if should_generate_files {
141 for i in 0..numfiles {
142 throttle::get_ops_token().await;
146 let path = root.join(format!("file{i}"));
147 join_set.spawn(write_file(prog_track, path, filesize, writebuf, chunk_size));
148 }
149 }
150 let mut success = true;
151 let mut filegen_summary = Summary::default();
152 while let Some(res) = join_set.join_next().await {
153 match res.map_err(|err| Error::new(anyhow::Error::msg(err), Default::default()))? {
154 Ok(summary) => filegen_summary = filegen_summary + summary,
155 Err(error) => {
156 tracing::error!("filegen: {:?} failed with: {:?}", root, &error);
157 filegen_summary = filegen_summary + error.summary;
158 success = false;
159 }
160 }
161 }
162 if !success {
163 return Err(Error::new(
164 anyhow!("filegen: {:?} failed!", &root),
165 filegen_summary,
166 ));
167 }
168 Ok(filegen_summary)
169}
170
171#[cfg(test)]
172mod tests {
173 use super::*;
174 use crate::testutils;
175 use tracing_test::traced_test;
176
177 lazy_static! {
178 static ref PROGRESS: progress::Progress = progress::Progress::new();
179 }
180
181 #[tokio::test]
182 #[traced_test]
183 async fn test_basic_filegen() -> Result<(), anyhow::Error> {
184 let tmp_dir = testutils::create_temp_dir().await?;
185 let test_path = tmp_dir.as_path();
186 let summary = filegen(
188 &PROGRESS,
189 test_path,
190 &[2],
191 3,
192 100,
193 50, 0, false, )
197 .await?;
198 assert_eq!(summary.files_created, 9);
203 assert_eq!(summary.directories_created, 2);
204 assert_eq!(summary.bytes_written, 900);
205 assert!(test_path.join("file0").exists()); assert!(test_path.join("dir0").join("file0").exists());
208 assert!(test_path.join("dir0").join("file1").exists());
209 assert!(test_path.join("dir0").join("file2").exists());
210 assert!(test_path.join("dir1").join("file0").exists());
211 assert!(test_path.join("dir1").join("file1").exists());
212 assert!(test_path.join("dir1").join("file2").exists());
213 let metadata = tokio::fs::metadata(test_path.join("dir0").join("file0")).await?;
215 assert_eq!(metadata.len(), 100);
216 tokio::fs::remove_dir_all(test_path).await?;
218 Ok(())
219 }
220
221 #[tokio::test]
222 #[traced_test]
223 async fn test_nested_filegen() -> Result<(), anyhow::Error> {
224 let tmp_dir = testutils::create_temp_dir().await?;
225 let test_path = tmp_dir.as_path();
226 let summary = filegen(
228 &PROGRESS,
229 test_path,
230 &[2, 3],
231 4,
232 50,
233 25, 0, false, )
237 .await?;
238 assert_eq!(summary.files_created, 36);
243 assert_eq!(summary.directories_created, 8);
244 assert_eq!(summary.bytes_written, 1800);
245 assert!(test_path.join("file0").exists()); assert!(test_path.join("dir0").join("file0").exists()); assert!(test_path.join("dir0").join("dir0").join("file0").exists());
249 assert!(test_path.join("dir0").join("dir2").join("file3").exists());
250 assert!(test_path.join("dir1").join("dir1").join("file2").exists());
251 tokio::fs::remove_dir_all(test_path).await?;
253 Ok(())
254 }
255
256 #[tokio::test]
257 #[traced_test]
258 async fn test_deeply_nested_filegen() -> Result<(), anyhow::Error> {
259 let tmp_dir = testutils::create_temp_dir().await?;
260 let test_path = tmp_dir.as_path();
261 let summary = filegen(
263 &PROGRESS,
264 test_path,
265 &[2, 2, 2],
266 2,
267 10,
268 10, 0, false, )
272 .await?;
273 assert_eq!(summary.files_created, 30);
277 assert_eq!(summary.directories_created, 14);
278 assert_eq!(summary.bytes_written, 300);
279 assert!(test_path.join("file0").exists()); assert!(test_path
282 .join("dir0")
283 .join("dir0")
284 .join("dir0")
285 .join("file0")
286 .exists());
287 assert!(test_path
288 .join("dir1")
289 .join("dir1")
290 .join("dir1")
291 .join("file1")
292 .exists());
293 tokio::fs::remove_dir_all(test_path).await?;
295 Ok(())
296 }
297
298 #[tokio::test]
299 #[traced_test]
300 async fn test_single_file() -> Result<(), anyhow::Error> {
301 let tmp_dir = testutils::create_temp_dir().await?;
302 let test_path = tmp_dir.as_path();
303 let summary = filegen(
305 &PROGRESS,
306 test_path,
307 &[], 5, 200, 100, 0, false, )
314 .await?;
315 assert_eq!(summary.files_created, 5);
316 assert_eq!(summary.directories_created, 0);
317 assert_eq!(summary.bytes_written, 1000); for i in 0..5 {
319 let file_path = test_path.join(format!("file{i}"));
321 assert!(file_path.exists());
322 let metadata = tokio::fs::metadata(&file_path).await?;
323 assert_eq!(metadata.len(), 200);
324 }
325 tokio::fs::remove_dir_all(test_path).await?;
327 Ok(())
328 }
329
330 #[tokio::test]
331 #[traced_test]
332 async fn test_zero_files() -> Result<(), anyhow::Error> {
333 let tmp_dir = testutils::create_temp_dir().await?;
334 let test_path = tmp_dir.as_path();
335 let summary = filegen(
337 &PROGRESS,
338 test_path,
339 &[3, 2], 0, 100,
342 50,
343 0,
344 false, )
346 .await?;
347 assert_eq!(summary.files_created, 0);
349 assert_eq!(summary.directories_created, 9);
350 assert_eq!(summary.bytes_written, 0);
351 assert!(test_path.join("dir0").join("dir0").exists());
353 assert!(test_path.join("dir2").join("dir1").exists());
354 assert!(!test_path.join("dir0").join("file0").exists());
355 tokio::fs::remove_dir_all(test_path).await?;
357 Ok(())
358 }
359
360 #[tokio::test]
361 #[traced_test]
362 async fn test_leaf_files_only() -> Result<(), anyhow::Error> {
363 let tmp_dir = testutils::create_temp_dir().await?;
364 let test_path = tmp_dir.as_path();
365 let summary = filegen(
367 &PROGRESS,
368 test_path,
369 &[2, 3],
370 4,
371 50,
372 25, 0, true, )
376 .await?;
377 assert_eq!(summary.files_created, 24);
381 assert_eq!(summary.directories_created, 8);
382 assert_eq!(summary.bytes_written, 1200);
383 assert!(!test_path.join("file0").exists()); assert!(!test_path.join("dir0").join("file0").exists()); assert!(!test_path.join("dir1").join("file0").exists());
387 assert!(test_path.join("dir0").join("dir0").join("file0").exists());
389 assert!(test_path.join("dir0").join("dir0").join("file3").exists());
390 assert!(test_path.join("dir0").join("dir2").join("file0").exists());
391 assert!(test_path.join("dir1").join("dir1").join("file0").exists());
392 tokio::fs::remove_dir_all(test_path).await?;
394 Ok(())
395 }
396}