1use crate::stream_aggregator::{DtStreamAgg, DtStreamAggChunk};
13use crate::util::{Hhmmss as _, prettybytes};
14use anyhow::{self as ah, Context as _};
15use chrono::prelude::*;
16use disktest_rawio::{DEFAULT_SECTOR_SIZE, RawIo, RawIoOsIntf as _, RawIoResult};
17use movavg::MovAvg;
18use std::cmp::min;
19use std::path::{Path, PathBuf};
20use std::sync::Arc;
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::thread::available_parallelism;
23use std::time::Instant;
24
25pub use crate::stream_aggregator::DtStreamType;
26
27const LOG_BYTE_THRES: u64 = 1024 * 1024;
28const LOG_SEC_THRES: u64 = 10;
29
/// Console output verbosity level.
///
/// Ordered: a higher level suppresses more output. Comparisons like
/// `quiet_level < DisktestQuiet::NoInfo` are used as gates throughout
/// this module.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum DisktestQuiet {
    /// Full output, including periodic progress lines.
    Normal = 0,
    /// Informational output, but no periodic progress lines.
    Reduced = 1,
    /// No informational output.
    NoInfo = 2,
    /// No informational output and no warnings.
    NoWarn = 3,
}
42
/// A file or block device handle used by the disktest core.
///
/// The OS-level handle is opened lazily on first access and pending
/// written byte ranges are tracked so the OS file caches can be
/// dropped for exactly that range on close.
pub struct DisktestFile {
    /// Path to the file or device.
    path: PathBuf,
    /// Open for reading?
    read: bool,
    /// Open for writing?
    write: bool,
    /// The underlying raw I/O handle; `None` while not (yet) open.
    io: Option<RawIo>,
    /// Byte offset where the pending cache-drop range starts.
    drop_offset: u64,
    /// Number of bytes written since the last cache drop.
    drop_count: u64,
    /// Console verbosity; copied from `Disktest` in `Disktest::init()`.
    quiet_level: DisktestQuiet,
}
53
impl DisktestFile {
    /// Create a `DisktestFile` for the given path and access mode.
    ///
    /// This does not touch the file system: the actual OS-level open is
    /// deferred to the first access (see `do_open()`).
    pub fn open(path: &Path, read: bool, write: bool) -> ah::Result<DisktestFile> {
        Ok(DisktestFile {
            path: path.to_path_buf(),
            read,
            write,
            io: None,
            drop_offset: 0,
            drop_count: 0,
            quiet_level: DisktestQuiet::Normal,
        })
    }

    /// Actually open the file, if it is not open already.
    ///
    /// Resets the cache-drop bookkeeping for the fresh handle.
    fn do_open(&mut self) -> ah::Result<()> {
        if self.io.is_none() {
            // NOTE(review): assumes RawIo::new(path, create, read, write);
            // i.e. creation is only enabled in write mode — confirm against
            // disktest_rawio.
            self.io = Some(RawIo::new(&self.path, self.write, self.read, self.write)?);
            self.drop_offset = 0;
            self.drop_count = 0;
        }
        Ok(())
    }

    /// Close the file; if bytes were written, drop the OS file caches
    /// for the written range instead of a plain close.
    fn close(&mut self) -> ah::Result<()> {
        let drop_offset = self.drop_offset;
        let drop_count = self.drop_count;

        // Advance the drop window up front (even if the drop below fails),
        // so a subsequent close cannot account the same range twice.
        self.drop_offset += drop_count;
        self.drop_count = 0;

        if let Some(mut io) = self.io.take() {
            if drop_count > 0 {
                // NOTE(review): on this path `io.close()` is never called;
                // presumably drop_file_caches() also closes/consumes the
                // handle — confirm against disktest_rawio.
                if let Err(e) = io.drop_file_caches(drop_offset, drop_count) {
                    return Err(ah::format_err!("Cache drop error: {e}"));
                }
            } else {
                io.close()?;
            }
        }
        Ok(())
    }

    /// Query the device's physical sector size, opening the file if needed.
    ///
    /// Returns `Ok(None)` if the underlying backend does not report one.
    fn get_sector_size(&mut self) -> ah::Result<Option<u32>> {
        self.do_open()?;
        let io = self.io.as_ref().expect("get_sector_size: No file.");
        Ok(io.get_sector_size())
    }

    /// Seek to `offset`, flushing pending cache-drop state first.
    ///
    /// If bytes were written since the last drop, the file is closed
    /// (which drops the caches) and transparently re-opened. On success
    /// the cache-drop window is restarted at `offset`.
    fn seek(&mut self, offset: u64) -> ah::Result<u64> {
        if self.drop_count > 0 {
            self.close()?;
        }
        self.do_open()?;
        match self.seek_noflush(offset) {
            Ok(x) => {
                self.drop_offset = offset;
                self.drop_count = 0;
                Ok(x)
            }
            other => other,
        }
    }

    /// Seek to `offset` without touching the cache-drop bookkeeping.
    fn seek_noflush(&mut self, offset: u64) -> ah::Result<u64> {
        self.do_open()?;
        let io = self.io.as_mut().expect("seek: No file.");
        io.seek(offset)
    }

    /// Flush written data to disk. No-op if the file is not open.
    fn sync(&mut self) -> ah::Result<()> {
        if let Some(io) = self.io.as_mut() {
            io.sync()
        } else {
            Ok(())
        }
    }

    /// Read into `buffer` at the current position, opening the file if needed.
    fn read(&mut self, buffer: &mut [u8]) -> ah::Result<RawIoResult> {
        self.do_open()?;
        let io = self.io.as_mut().expect("read: No file.");
        io.read(buffer)
    }

    /// Write `buffer` at the current position, opening the file if needed.
    ///
    /// On success the written length is accounted for a later cache drop.
    fn write(&mut self, buffer: &[u8]) -> ah::Result<RawIoResult> {
        self.do_open()?;
        let io = self.io.as_mut().expect("write: No file.");
        match io.write(buffer) {
            Ok(res) => {
                // NOTE(review): the full buffer length is accounted even if
                // `res` indicates a short write (e.g. Enospc) — confirm this
                // matches the RawIo::write() contract.
                self.drop_count += buffer.len() as u64;
                Ok(res)
            }
            Err(e) => Err(e),
        }
    }

    /// The path this file was created with.
    fn get_path(&self) -> &PathBuf {
        &self.path
    }
}
163
164impl Drop for DisktestFile {
165 fn drop(&mut self) {
166 if self.io.is_some() {
167 if self.quiet_level < DisktestQuiet::NoWarn {
168 eprintln!("WARNING: File not closed. Closing now...");
169 }
170 if let Err(e) = self.close() {
171 panic!("Failed to drop operating system caches: {e}");
172 }
173 }
174 }
175}
176
/// Disktest core: the pseudo-random stream source plus progress bookkeeping.
pub struct Disktest {
    /// Aggregated pseudo-random data stream (seeded, multi-threaded).
    stream_agg: DtStreamAgg,
    /// Optional externally-set abort flag (e.g. from a signal handler).
    abort: Option<Arc<AtomicBool>>,
    /// Bytes processed since the last progress log line.
    log_count: u64,
    /// Time of the last progress log line.
    log_time: Instant,
    /// Bytes processed in the current rate-measurement period.
    rate_count: u64,
    /// Start of the current rate-measurement period.
    rate_count_start_time: Instant,
    /// Moving average (window of 5) over the measured byte rates.
    rate_avg: MovAvg<u64, u64, 5>,
    /// Time when the current write/verify run started.
    begin_time: Instant,
    /// Console verbosity level.
    quiet_level: DisktestQuiet,
}
189
impl Disktest {
    /// Byte count used to mean "no limit" for `write()` / `verify()`.
    pub const UNLIMITED: u64 = u64::MAX;

    /// Construct a new disktest core.
    ///
    /// * `algorithm`: The stream generator algorithm to use.
    /// * `seed`: User supplied seed bytes for the generator.
    /// * `round_id`: Identifier of the current test round.
    /// * `invert_pattern`: Invert the generated bit pattern.
    /// * `nr_threads`: Number of generator threads; `0` auto-detects the
    ///   number of available CPUs (falling back to 1).
    /// * `quiet_level`: Console verbosity.
    /// * `abort`: Optional flag; setting it requests an abort of a
    ///   running write/verify loop.
    pub fn new(
        algorithm: DtStreamType,
        seed: &[u8],
        round_id: u64,
        invert_pattern: bool,
        nr_threads: usize,
        quiet_level: DisktestQuiet,
        abort: Option<Arc<AtomicBool>>,
    ) -> Disktest {
        // nr_threads == 0 means: use one thread per available CPU.
        let nr_threads = if nr_threads == 0 {
            if let Ok(cpus) = available_parallelism() {
                cpus.get()
            } else {
                1
            }
        } else {
            nr_threads
        };

        let now = Instant::now();
        Disktest {
            stream_agg: DtStreamAgg::new(
                algorithm,
                seed,
                round_id,
                invert_pattern,
                nr_threads,
                quiet_level,
            ),
            abort,
            log_count: 0,
            log_time: now,
            rate_count: 0,
            rate_count_start_time: now,
            rate_avg: MovAvg::new(),
            begin_time: now,
            quiet_level,
        }
    }

    /// Check whether an abort has been requested via the abort flag.
    fn abort_requested(&self) -> bool {
        if let Some(abort) = &self.abort {
            abort.load(Ordering::Relaxed)
        } else {
            false
        }
    }

    /// Reset all progress and rate bookkeeping for a fresh run.
    fn log_reset(&mut self) {
        let now = Instant::now();
        self.log_count = 0;
        self.log_time = now;
        self.rate_count = 0;
        self.rate_count_start_time = now;
        self.rate_avg.reset();
        self.begin_time = now;
    }

    /// Print a progress line to the console.
    ///
    /// * `prefix`: Text printed before the byte count (e.g. `"Wrote "`).
    /// * `inc_processed`: Bytes processed since the previous call.
    /// * `abs_processed`: Total bytes processed so far.
    /// * `final_step`: This is the final summary line; always printed
    ///   (unless verbosity is `NoInfo` or higher).
    ///
    /// Intermediate lines are rate limited: at least `LOG_BYTE_THRES`
    /// bytes *and* `LOG_SEC_THRES` seconds must have passed, and they
    /// are only printed at `DisktestQuiet::Normal` verbosity.
    fn log(&mut self, prefix: &str, inc_processed: usize, abs_processed: u64, final_step: bool) {
        if self.quiet_level < DisktestQuiet::NoInfo {
            self.log_count += inc_processed as u64;
            self.rate_count += inc_processed as u64;
            if (self.log_count >= LOG_BYTE_THRES && self.quiet_level == DisktestQuiet::Normal)
                || final_step
            {
                let now = Instant::now();
                let expired = now.duration_since(self.log_time).as_secs() >= LOG_SEC_THRES;

                if (expired && self.quiet_level == DisktestQuiet::Normal) || final_step {
                    // Wall-clock time of day (hour:minute) for the line prefix.
                    let tod = Local::now().format("%R");

                    let dur_elapsed = now - self.begin_time;

                    // Final line: overall average rate over the whole run.
                    // Intermediate line: current period's rate, smoothed by
                    // the moving average. checked_div() yields None when the
                    // elapsed time rounds down to 0 ms.
                    let rate = if final_step {
                        let elapsed_ms = dur_elapsed.as_millis();
                        (u128::from(abs_processed) * 1000)
                            .checked_div(elapsed_ms)
                            .map(|rate| u64::try_from(rate).unwrap_or(u64::MAX))
                    } else {
                        let rate_period_ms = (now - self.rate_count_start_time).as_millis();
                        (u128::from(self.rate_count) * 1000)
                            .checked_div(rate_period_ms)
                            .map(|rate| self.rate_avg.feed(u64::try_from(rate).unwrap_or(u64::MAX)))
                    };

                    let rate_string = if let Some(rate) = rate {
                        format!(" @ {}/s", prettybytes(rate, true, false, false))
                    } else {
                        "".to_string()
                    };

                    let suffix = if final_step { "." } else { " ..." };

                    println!(
                        "[{} / {}] {}{}{}{}",
                        tod,
                        dur_elapsed.hhmmss(),
                        prefix,
                        prettybytes(abs_processed, true, true, final_step),
                        rate_string,
                        suffix
                    );
                    self.log_time = now;
                    self.rate_count_start_time = now;
                    self.rate_count = 0;
                }
                self.log_count = 0;
            }
        }
    }

    /// Initialize a write or verify run.
    ///
    /// Resets the progress state, queries the device sector size,
    /// activates the stream aggregator at byte position `seek` and seeks
    /// the file to the (possibly adjusted) stream position.
    ///
    /// Returns the chunk size to use for subsequent I/O.
    fn init(
        &mut self,
        file: &mut DisktestFile,
        prefix: &str,
        seek: u64,
        max_bytes: u64,
    ) -> ah::Result<u64> {
        file.quiet_level = self.quiet_level;
        self.log_reset();

        // Best effort: treat a failed sector size query as "unknown".
        let sector_size = file.get_sector_size().unwrap_or(None);

        if self.quiet_level < DisktestQuiet::NoInfo {
            let sector_str = if let Some(sector_size) = sector_size.as_ref() {
                format!(
                    " ({} sectors)",
                    prettybytes((*sector_size).into(), true, false, false),
                )
            } else {
                "".to_string()
            };
            println!(
                "{} {}{}, starting at position {}...",
                prefix,
                file.get_path().display(),
                sector_str,
                prettybytes(seek, true, true, false)
            );
        }

        // Activate the stream at the requested position; the aggregator
        // returns the actual byte offset to seek the file to.
        let res = self
            .stream_agg
            .activate(seek, sector_size.unwrap_or(DEFAULT_SECTOR_SIZE))?;

        if let Err(e) = file.seek(res.byte_offset) {
            return Err(ah::format_err!("File seek to {seek} failed: {e}"));
        }

        if let Some(sector_size) = sector_size.as_ref() {
            if max_bytes < u64::MAX
                && max_bytes % u64::from(*sector_size) != 0
                && self.quiet_level < DisktestQuiet::NoWarn
            {
                // The warning is only emitted on Windows; on all other
                // targets this `if` body compiles to nothing.
                #[cfg(target_os = "windows")]
                eprintln!(
                    "WARNING: The desired byte count of {} is not a multiple of the sector size {}. \
                    This might result in a write or read error at the very end.",
                    prettybytes(max_bytes, true, true, true),
                    prettybytes(u64::from(*sector_size), true, true, true)
                );
            }
        }

        Ok(res.chunk_size)
    }

    /// Finish a write run: sync, print the summary line and close the
    /// file (which drops the OS caches for the written range).
    fn write_finalize(
        &mut self,
        file: &mut DisktestFile,
        success: bool,
        bytes_written: u64,
    ) -> ah::Result<()> {
        if self.quiet_level < DisktestQuiet::NoInfo {
            println!("Writing stopped. Syncing...");
        }
        if let Err(e) = file.sync() {
            return Err(ah::format_err!("Sync failed: {e}"));
        }

        self.log(
            if success { "Done. Wrote " } else { "Wrote " },
            0,
            bytes_written,
            true,
        );

        if let Err(e) = file.close() {
            return Err(ah::format_err!(
                "Failed to drop operating system caches: {e}"
            ));
        }
        if success && self.quiet_level < DisktestQuiet::NoInfo {
            println!("Successfully dropped file caches.");
        }

        Ok(())
    }

    /// Write the pseudo-random pattern to the file.
    ///
    /// * `file`: The file to write to (consumed; closed on all exit paths).
    /// * `seek`: Byte offset to start writing at.
    /// * `max_bytes`: Number of bytes to write, or [`Disktest::UNLIMITED`]
    ///   to write until the device is full.
    ///
    /// Returns the number of bytes written.
    pub fn write(&mut self, file: DisktestFile, seek: u64, max_bytes: u64) -> ah::Result<u64> {
        let mut file = file;
        let mut bytes_left = max_bytes;
        let mut bytes_written = 0_u64;

        let write_chunk_size = self.init(&mut file, "Writing", seek, max_bytes)?;
        loop {
            // Fetch the next pseudo-random chunk from the aggregator.
            let chunk = self.stream_agg.wait_chunk()?;
            let this_chunk = min(write_chunk_size, bytes_left);
            let write_len = usize::try_from(this_chunk).unwrap_or(usize::MAX);

            match file.write(&chunk.get_data()[0..write_len]) {
                Ok(RawIoResult::Ok(_)) => (),
                Ok(RawIoResult::Enospc) => {
                    // Running out of space is expected (and a success)
                    // when no explicit byte limit was given.
                    if max_bytes == Disktest::UNLIMITED {
                        self.write_finalize(&mut file, true, bytes_written)?;
                        break;
                    }
                    let _ = self.write_finalize(&mut file, false, bytes_written);
                    return Err(ah::format_err!("Write error: Out of disk space."));
                }
                Err(e) => {
                    let _ = self.write_finalize(&mut file, false, bytes_written);
                    return Err(e);
                }
            }

            bytes_written += write_len as u64;
            bytes_left -= write_len as u64;
            if bytes_left == 0 {
                self.write_finalize(&mut file, true, bytes_written)?;
                break;
            }
            self.log("Wrote ", write_len, bytes_written, false);

            if self.abort_requested() {
                let _ = self.write_finalize(&mut file, false, bytes_written);
                return Err(ah::format_err!("Aborted by signal!"));
            }
        }

        Ok(bytes_written)
    }

    /// Finish a verify run: print the summary line and close the file.
    fn verify_finalize(
        &mut self,
        file: &mut DisktestFile,
        success: bool,
        bytes_read: u64,
    ) -> ah::Result<()> {
        self.log(
            if success {
                "Done. Verified "
            } else {
                "Verified "
            },
            0,
            bytes_read,
            true,
        );
        if let Err(e) = file.close() {
            return Err(ah::format_err!("Failed to close device: {e}"));
        }
        Ok(())
    }

    /// Build the error for a verification data mismatch.
    ///
    /// Scans `buffer` against the expected `chunk` data to find the first
    /// mismatching byte and reports its absolute byte position (pretty
    /// printed from 1 KiB upwards, as a plain byte count below that).
    ///
    /// Panics if no mismatch exists; the caller guarantees there is one.
    fn verify_failed(
        &mut self,
        file: &mut DisktestFile,
        read_count: usize,
        bytes_read: u64,
        buffer: &[u8],
        chunk: &DtStreamAggChunk,
    ) -> ah::Error {
        // Best-effort finalize; its error must not mask the mismatch error.
        if let Err(e) = self.verify_finalize(file, false, bytes_read) {
            if self.quiet_level < DisktestQuiet::NoWarn {
                eprintln!("{e}");
            }
        }
        for (i, buffer_byte) in buffer.iter().enumerate().take(read_count) {
            if *buffer_byte != chunk.get_data()[i] {
                let pos = bytes_read + i as u64;
                if pos >= 1024 {
                    return ah::format_err!(
                        "Data MISMATCH at {}!",
                        prettybytes(pos, true, true, true)
                    );
                }
                return ah::format_err!("Data MISMATCH at byte {pos}!");
            }
        }
        panic!("Internal error: verify_failed() no mismatch.");
    }

    /// Verify the pseudo-random pattern previously written to the file.
    ///
    /// * `file`: The file to read from (consumed; closed on all exit paths).
    /// * `seek`: Byte offset to start verifying at.
    /// * `max_bytes`: Number of bytes to verify, or [`Disktest::UNLIMITED`]
    ///   to verify until end of file.
    ///
    /// Returns the number of bytes verified.
    pub fn verify(&mut self, file: DisktestFile, seek: u64, max_bytes: u64) -> ah::Result<u64> {
        let mut file = file;
        let mut bytes_left = max_bytes;
        let mut bytes_read = 0_u64;

        let readbuf_len = self.init(&mut file, "Verifying", seek, max_bytes)?;
        let readbuf_len =
            usize::try_from(readbuf_len).context("Number of bytes overflows usize")?;
        let mut buffer = vec![0; readbuf_len];
        // Number of bytes accumulated in `buffer` so far.
        let mut read_count = 0;
        let read_len = min(readbuf_len as u64, bytes_left);
        let mut read_len = usize::try_from(read_len).context("Number of bytes overflows usize")?;

        loop {
            // Fill the buffer up to read_len; the slice upper bound below
            // is equivalent to `read_len`.
            match file.read(&mut buffer[read_count..read_count + (read_len - read_count)]) {
                Ok(RawIoResult::Ok(n)) => {
                    read_count += n;

                    assert!(read_count <= read_len);
                    // Compare once a full chunk was accumulated, or at EOF
                    // (n == 0) with a partial chunk pending.
                    if read_count == read_len || (read_count > 0 && n == 0) {
                        let chunk = self.stream_agg.wait_chunk()?;
                        if buffer[..read_count] != chunk.get_data()[..read_count] {
                            return Err(self.verify_failed(
                                &mut file, read_count, bytes_read, &buffer, &chunk,
                            ));
                        }

                        bytes_read += u64::try_from(read_count).context("u64 overflow")?;
                        bytes_left -= u64::try_from(read_count).context("u64 overflow")?;
                        if bytes_left == 0 {
                            self.verify_finalize(&mut file, true, bytes_read)?;
                            break;
                        }
                        self.log("Verified ", read_count, bytes_read, false);
                        read_count = 0;
                        read_len = usize::try_from(min(readbuf_len as u64, bytes_left))
                            .context("Number of bytes overflows usize")?;
                    }

                    // End of file reached.
                    if n == 0 {
                        self.verify_finalize(&mut file, true, bytes_read)?;
                        break;
                    }
                }
                Ok(_) => unreachable!(),
                Err(e) => {
                    let _ = self.verify_finalize(&mut file, false, bytes_read);
                    return Err(ah::format_err!(
                        "Read error at {}: {}",
                        prettybytes(bytes_read, true, true, true),
                        e
                    ));
                }
            }

            if self.abort_requested() {
                let _ = self.verify_finalize(&mut file, false, bytes_read);
                return Err(ah::format_err!("Aborted by signal!"));
            }
        }

        Ok(bytes_read)
    }
}
592
#[cfg(test)]
mod tests {
    use super::*;
    use crate::generator::{GeneratorChaCha8, GeneratorChaCha12, GeneratorChaCha20, GeneratorCrc};
    use std::fs::OpenOptions;
    use std::io::{Seek as _, SeekFrom, Write as _};
    use std::path::PathBuf;
    use tempfile::tempdir;

    /// Run the full write/verify scenario battery for one algorithm.
    ///
    /// * `base_size` / `chunk_factor`: the generator's output sizing,
    ///   used to pick byte counts around the chunk boundaries.
    fn run_test(algorithm: DtStreamType, base_size: usize, chunk_factor: usize) {
        let tdir = tempdir().unwrap();
        let tdir_path = tdir.path();
        // Serial number used to give each scenario its own temp file.
        let mut serial = 0;

        let seed = vec![42, 43, 44, 45];
        let nr_threads = 2;
        let mut dt = Disktest::new(
            algorithm,
            &seed,
            0,
            false,
            nr_threads,
            DisktestQuiet::Normal,
            None,
        );

        // Path of the temp file for the given serial number.
        let mk_filepath = |num| {
            let mut path = PathBuf::from(tdir_path);
            path.push(format!("tmp-{num}.img"));
            path
        };

        // Construct a DisktestFile (optionally creating the file) with an
        // already-open RawIo handle, bypassing DisktestFile::open().
        let mk_file = |num, create| {
            let path = mk_filepath(num);
            let io = RawIo::new(&path, create, true, true).unwrap();
            DisktestFile {
                path,
                read: true,
                write: true,
                io: Some(io),
                drop_offset: 0,
                drop_count: 0,
                quiet_level: DisktestQuiet::Normal,
            }
        };

        // Scenario: small write, then verify without a byte limit.
        {
            let nr_bytes = 1000;
            assert_eq!(
                dt.write(mk_file(serial, true), 0, nr_bytes).unwrap(),
                nr_bytes
            );
            assert_eq!(
                dt.verify(mk_file(serial, false), 0, u64::MAX).unwrap(),
                nr_bytes
            );
            serial += 1;
        }

        // Scenario: verify only the first half of what was written.
        {
            let nr_bytes = 1000;
            assert_eq!(
                dt.write(mk_file(serial, true), 0, nr_bytes).unwrap(),
                nr_bytes
            );
            assert_eq!(
                dt.verify(mk_file(serial, false), 0, nr_bytes / 2).unwrap(),
                nr_bytes / 2
            );
            serial += 1;
        }

        // Scenario: byte count slightly past two full aggregator rounds.
        {
            let nr_bytes = (base_size * chunk_factor * nr_threads * 2 + 100) as u64;
            assert_eq!(
                dt.write(mk_file(serial, true), 0, nr_bytes).unwrap(),
                nr_bytes
            );
            assert_eq!(
                dt.verify(mk_file(serial, false), 0, u64::MAX).unwrap(),
                nr_bytes
            );
            serial += 1;
        }

        // Scenario: pre-sized file with a displaced file position; write()
        // must still produce exactly nr_bytes starting at offset 0.
        {
            let nr_bytes = 1000;
            {
                let mut f = mk_file(serial, true);
                f.io.as_mut().unwrap().set_len(100).unwrap();
                f.io.as_mut().unwrap().seek(10).unwrap();
                assert_eq!(dt.write(f, 0, nr_bytes).unwrap(), nr_bytes);
            }
            assert_eq!(
                dt.verify(mk_file(serial, false), 0, u64::MAX).unwrap(),
                nr_bytes
            );
            serial += 1;
        }

        // Scenario: corrupt one byte at offset 10 and expect verify to
        // fail with the exact mismatch message.
        {
            let nr_bytes = 1000;
            assert_eq!(
                dt.write(mk_file(serial, true), 0, nr_bytes).unwrap(),
                nr_bytes
            );
            {
                let path = mk_filepath(serial);
                let mut file = OpenOptions::new()
                    .read(true)
                    .write(true)
                    .open(path)
                    .unwrap();
                file.seek(SeekFrom::Start(10)).unwrap();
                writeln!(&file, "X").unwrap();
            }
            match dt.verify(mk_file(serial, false), 0, nr_bytes) {
                Ok(_) => panic!("Verify of modified data did not fail!"),
                Err(e) => assert_eq!(e.to_string(), "Data MISMATCH at byte 10!"),
            }
            serial += 1;
        }

        // Scenario: verify from many different start offsets.
        {
            let nr_bytes = (base_size * chunk_factor * nr_threads * 10) as u64;
            assert_eq!(
                dt.write(mk_file(serial, true), 0, nr_bytes).unwrap(),
                nr_bytes
            );
            for offset in (0..nr_bytes).step_by(base_size * chunk_factor / 2) {
                let bytes_verified = dt.verify(mk_file(serial, false), offset, u64::MAX).unwrap();
                assert!(bytes_verified > 0 && bytes_verified <= nr_bytes);
            }
            serial += 1;
        }

        // Scenario: overwrite part of the file starting at a non-zero
        // offset, then verify the whole (now longer) file.
        {
            let nr_bytes = (base_size * chunk_factor * nr_threads * 10) as u64;
            assert_eq!(
                dt.write(mk_file(serial, true), 0, nr_bytes).unwrap(),
                nr_bytes
            );
            let offset = (base_size * chunk_factor * nr_threads * 2) as u64;
            assert_eq!(
                dt.write(mk_file(serial, false), offset, nr_bytes).unwrap(),
                nr_bytes
            );
            assert_eq!(
                dt.verify(mk_file(serial, false), 0, u64::MAX).unwrap(),
                nr_bytes + offset
            );
        }

        tdir.close().unwrap();
    }

    #[test]
    fn test_chacha8() {
        run_test(
            DtStreamType::ChaCha8,
            GeneratorChaCha8::BASE_SIZE,
            GeneratorChaCha8::DEFAULT_CHUNK_FACTOR,
        );
    }

    #[test]
    fn test_chacha12() {
        run_test(
            DtStreamType::ChaCha12,
            GeneratorChaCha12::BASE_SIZE,
            GeneratorChaCha12::DEFAULT_CHUNK_FACTOR,
        );
    }

    #[test]
    fn test_chacha20() {
        run_test(
            DtStreamType::ChaCha20,
            GeneratorChaCha20::BASE_SIZE,
            GeneratorChaCha20::DEFAULT_CHUNK_FACTOR,
        );
    }

    #[test]
    fn test_crc() {
        run_test(
            DtStreamType::Crc,
            GeneratorCrc::BASE_SIZE,
            GeneratorCrc::DEFAULT_CHUNK_FACTOR,
        );
    }
}
793
794