1use crate::stream_aggregator::{DtStreamAgg, DtStreamAggChunk};
13use crate::util::{prettybytes, Hhmmss};
14use anyhow as ah;
15use chrono::prelude::*;
16use disktest_rawio::{RawIo, RawIoOsIntf, RawIoResult, DEFAULT_SECTOR_SIZE};
17use movavg::MovAvg;
18use std::cmp::min;
19use std::path::{Path, PathBuf};
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::Arc;
22use std::thread::available_parallelism;
23use std::time::Instant;
24
25pub use crate::stream_aggregator::DtStreamType;
26
27const LOG_BYTE_THRES: u64 = 1024 * 1024;
28const LOG_SEC_THRES: u64 = 10;
29
30#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
32pub enum DisktestQuiet {
33 Normal = 0,
35 Reduced = 1,
37 NoInfo = 2,
39 NoWarn = 3,
41}
42
43pub struct DisktestFile {
45 path: PathBuf,
46 read: bool,
47 write: bool,
48 io: Option<RawIo>,
49 drop_offset: u64,
50 drop_count: u64,
51 quiet_level: DisktestQuiet,
52}
53
54impl DisktestFile {
55 pub fn open(path: &Path, read: bool, write: bool) -> ah::Result<DisktestFile> {
57 Ok(DisktestFile {
58 path: path.to_path_buf(),
59 read,
60 write,
61 io: None,
62 drop_offset: 0,
63 drop_count: 0,
64 quiet_level: DisktestQuiet::Normal,
65 })
66 }
67
68 fn do_open(&mut self) -> ah::Result<()> {
69 if self.io.is_none() {
70 self.io = Some(RawIo::new(&self.path, self.write, self.read, self.write)?);
71 self.drop_offset = 0;
72 self.drop_count = 0;
73 }
74 Ok(())
75 }
76
77 fn close(&mut self) -> ah::Result<()> {
79 let drop_offset = self.drop_offset;
80 let drop_count = self.drop_count;
81
82 self.drop_offset += drop_count;
83 self.drop_count = 0;
84
85 if let Some(mut io) = self.io.take() {
87 if drop_count > 0 {
89 if let Err(e) = io.drop_file_caches(drop_offset, drop_count) {
90 return Err(ah::format_err!("Cache drop error: {}", e));
91 }
92 } else {
93 io.close()?;
94 }
95 }
96 Ok(())
97 }
98
99 fn get_sector_size(&mut self) -> ah::Result<Option<u32>> {
101 self.do_open()?;
102 let io = self.io.as_ref().expect("get_sector_size: No file.");
103 Ok(io.get_sector_size())
104 }
105
106 fn seek(&mut self, offset: u64) -> ah::Result<u64> {
108 if self.drop_count > 0 {
109 self.close()?;
110 }
111 self.do_open()?;
112 match self.seek_noflush(offset) {
113 Ok(x) => {
114 self.drop_offset = offset;
115 self.drop_count = 0;
116 Ok(x)
117 }
118 other => other,
119 }
120 }
121
122 fn seek_noflush(&mut self, offset: u64) -> ah::Result<u64> {
124 self.do_open()?;
125 let io = self.io.as_mut().expect("seek: No file.");
126 io.seek(offset)
127 }
128
129 fn sync(&mut self) -> ah::Result<()> {
131 if let Some(io) = self.io.as_mut() {
132 io.sync()
133 } else {
134 Ok(())
135 }
136 }
137
138 fn read(&mut self, buffer: &mut [u8]) -> ah::Result<RawIoResult> {
140 self.do_open()?;
141 let io = self.io.as_mut().expect("read: No file.");
142 io.read(buffer)
143 }
144
145 fn write(&mut self, buffer: &[u8]) -> ah::Result<RawIoResult> {
147 self.do_open()?;
148 let io = self.io.as_mut().expect("write: No file.");
149 match io.write(buffer) {
150 Ok(res) => {
151 self.drop_count += buffer.len() as u64;
152 Ok(res)
153 }
154 Err(e) => Err(e),
155 }
156 }
157
158 fn get_path(&self) -> &PathBuf {
160 &self.path
161 }
162}
163
164impl Drop for DisktestFile {
165 fn drop(&mut self) {
166 if self.io.is_some() {
167 if self.quiet_level < DisktestQuiet::NoWarn {
168 eprintln!("WARNING: File not closed. Closing now...");
169 }
170 if let Err(e) = self.close() {
171 panic!("Failed to drop operating system caches: {}", e);
172 }
173 }
174 }
175}
176
177pub struct Disktest {
179 stream_agg: DtStreamAgg,
180 abort: Option<Arc<AtomicBool>>,
181 log_count: u64,
182 log_time: Instant,
183 rate_count: u64,
184 rate_count_start_time: Instant,
185 rate_avg: MovAvg<u64, u64, 5>,
186 begin_time: Instant,
187 quiet_level: DisktestQuiet,
188}
189
190impl Disktest {
191 pub const UNLIMITED: u64 = u64::MAX;
193
194 pub fn new(
214 algorithm: DtStreamType,
215 seed: &[u8],
216 round_id: u64,
217 invert_pattern: bool,
218 nr_threads: usize,
219 quiet_level: DisktestQuiet,
220 abort: Option<Arc<AtomicBool>>,
221 ) -> Disktest {
222 let nr_threads = if nr_threads == 0 {
223 if let Ok(cpus) = available_parallelism() {
224 cpus.get()
225 } else {
226 1
227 }
228 } else {
229 nr_threads
230 };
231
232 let now = Instant::now();
233 Disktest {
234 stream_agg: DtStreamAgg::new(
235 algorithm,
236 seed,
237 round_id,
238 invert_pattern,
239 nr_threads,
240 quiet_level,
241 ),
242 abort,
243 log_count: 0,
244 log_time: now,
245 rate_count: 0,
246 rate_count_start_time: now,
247 rate_avg: MovAvg::new(),
248 begin_time: now,
249 quiet_level,
250 }
251 }
252
253 fn abort_requested(&self) -> bool {
255 if let Some(abort) = &self.abort {
256 abort.load(Ordering::Relaxed)
257 } else {
258 false
259 }
260 }
261
262 fn log_reset(&mut self) {
264 let now = Instant::now();
265 self.log_count = 0;
266 self.log_time = now;
267 self.rate_count = 0;
268 self.rate_count_start_time = now;
269 self.rate_avg.reset();
270 self.begin_time = now;
271 }
272
273 fn log(&mut self, prefix: &str, inc_processed: usize, abs_processed: u64, final_step: bool) {
275 if self.quiet_level < DisktestQuiet::NoInfo {
277 self.log_count += inc_processed as u64;
281 self.rate_count += inc_processed as u64;
282 if (self.log_count >= LOG_BYTE_THRES && self.quiet_level == DisktestQuiet::Normal)
283 || final_step
284 {
285 let now = Instant::now();
287 let expired = now.duration_since(self.log_time).as_secs() >= LOG_SEC_THRES;
288
289 if (expired && self.quiet_level == DisktestQuiet::Normal) || final_step {
290 let tod = Local::now().format("%R");
291
292 let dur_elapsed = now - self.begin_time;
293
294 let rate = if final_step {
295 let elapsed_ms = dur_elapsed.as_millis();
296 if elapsed_ms > 0 {
297 Some(((abs_processed as u128 * 1000) / elapsed_ms) as u64)
298 } else {
299 None
300 }
301 } else {
302 let rate_period_ms = (now - self.rate_count_start_time).as_millis();
303 if rate_period_ms > 0 {
304 let rate = ((self.rate_count as u128 * 1000) / rate_period_ms) as u64;
305 Some(self.rate_avg.feed(rate))
306 } else {
307 None
308 }
309 };
310
311 let rate_string = if let Some(rate) = rate {
312 format!(" @ {}/s", prettybytes(rate, true, false, false))
313 } else {
314 "".to_string()
315 };
316
317 let suffix = if final_step { "." } else { " ..." };
318
319 println!(
320 "[{} / {}] {}{}{}{}",
321 tod,
322 dur_elapsed.hhmmss(),
323 prefix,
324 prettybytes(abs_processed, true, true, final_step),
325 rate_string,
326 suffix
327 );
328 self.log_time = now;
329 self.rate_count_start_time = now;
330 self.rate_count = 0;
331 }
332 self.log_count = 0;
333 }
334 }
335 }
336
337 fn init(
339 &mut self,
340 file: &mut DisktestFile,
341 prefix: &str,
342 seek: u64,
343 max_bytes: u64,
344 ) -> ah::Result<u64> {
345 file.quiet_level = self.quiet_level;
346 self.log_reset();
347
348 let sector_size = file.get_sector_size().unwrap_or(None);
349
350 if self.quiet_level < DisktestQuiet::NoInfo {
351 let sector_str = if let Some(sector_size) = sector_size.as_ref() {
352 format!(
353 " ({} sectors)",
354 prettybytes(*sector_size as _, true, false, false),
355 )
356 } else {
357 "".to_string()
358 };
359 println!(
360 "{} {}{}, starting at position {}...",
361 prefix,
362 file.get_path().display(),
363 sector_str,
364 prettybytes(seek, true, true, false)
365 );
366 }
367
368 let res = self
369 .stream_agg
370 .activate(seek, sector_size.unwrap_or(DEFAULT_SECTOR_SIZE))?;
371
372 if let Err(e) = file.seek(res.byte_offset) {
373 return Err(ah::format_err!("File seek to {seek} failed: {e}"));
374 }
375
376 if let Some(sector_size) = sector_size.as_ref() {
377 if max_bytes < u64::MAX
378 && max_bytes % *sector_size as u64 != 0
379 && self.quiet_level < DisktestQuiet::NoWarn
380 {
381 #[cfg(target_os = "windows")]
382 eprintln!("WARNING: The desired byte count of {} is not a multiple of the sector size {}. \
383 This might result in a write or read error at the very end.",
384 prettybytes(max_bytes, true, true, true),
385 prettybytes(*sector_size as u64, true, true, true));
386 }
387 }
388
389 Ok(res.chunk_size)
390 }
391
392 fn write_finalize(
394 &mut self,
395 file: &mut DisktestFile,
396 success: bool,
397 bytes_written: u64,
398 ) -> ah::Result<()> {
399 if self.quiet_level < DisktestQuiet::NoInfo {
400 println!("Writing stopped. Syncing...");
401 }
402 if let Err(e) = file.sync() {
403 return Err(ah::format_err!("Sync failed: {}", e));
404 }
405
406 self.log(
407 if success { "Done. Wrote " } else { "Wrote " },
408 0,
409 bytes_written,
410 true,
411 );
412
413 if let Err(e) = file.close() {
414 return Err(ah::format_err!(
415 "Failed to drop operating system caches: {}",
416 e
417 ));
418 }
419 if success && self.quiet_level < DisktestQuiet::NoInfo {
420 println!("Successfully dropped file caches.");
421 }
422
423 Ok(())
424 }
425
426 pub fn write(&mut self, file: DisktestFile, seek: u64, max_bytes: u64) -> ah::Result<u64> {
428 let mut file = file;
429 let mut bytes_left = max_bytes;
430 let mut bytes_written = 0u64;
431
432 let write_chunk_size = self.init(&mut file, "Writing", seek, max_bytes)?;
433 loop {
434 let chunk = self.stream_agg.wait_chunk()?;
436 let write_len = min(write_chunk_size, bytes_left) as usize;
437
438 match file.write(&chunk.get_data()[0..write_len]) {
440 Ok(RawIoResult::Ok(_)) => (),
441 Ok(RawIoResult::Enospc) => {
442 if max_bytes == Disktest::UNLIMITED {
443 self.write_finalize(&mut file, true, bytes_written)?;
444 break; }
446 let _ = self.write_finalize(&mut file, false, bytes_written);
447 return Err(ah::format_err!("Write error: Out of disk space."));
448 }
449 Err(e) => {
450 let _ = self.write_finalize(&mut file, false, bytes_written);
451 return Err(e);
452 }
453 }
454
455 bytes_written += write_len as u64;
457 bytes_left -= write_len as u64;
458 if bytes_left == 0 {
459 self.write_finalize(&mut file, true, bytes_written)?;
460 break;
461 }
462 self.log("Wrote ", write_len, bytes_written, false);
463
464 if self.abort_requested() {
465 let _ = self.write_finalize(&mut file, false, bytes_written);
466 return Err(ah::format_err!("Aborted by signal!"));
467 }
468 }
469
470 Ok(bytes_written)
471 }
472
473 fn verify_finalize(
475 &mut self,
476 file: &mut DisktestFile,
477 success: bool,
478 bytes_read: u64,
479 ) -> ah::Result<()> {
480 self.log(
481 if success {
482 "Done. Verified "
483 } else {
484 "Verified "
485 },
486 0,
487 bytes_read,
488 true,
489 );
490 if let Err(e) = file.close() {
491 return Err(ah::format_err!("Failed to close device: {}", e));
492 }
493 Ok(())
494 }
495
496 fn verify_failed(
498 &mut self,
499 file: &mut DisktestFile,
500 read_count: usize,
501 bytes_read: u64,
502 buffer: &[u8],
503 chunk: &DtStreamAggChunk,
504 ) -> ah::Error {
505 if let Err(e) = self.verify_finalize(file, false, bytes_read) {
506 if self.quiet_level < DisktestQuiet::NoWarn {
507 eprintln!("{}", e);
508 }
509 }
510 for (i, buffer_byte) in buffer.iter().enumerate().take(read_count) {
511 if *buffer_byte != chunk.get_data()[i] {
512 let pos = bytes_read + i as u64;
513 if pos >= 1024 {
514 return ah::format_err!(
515 "Data MISMATCH at {}!",
516 prettybytes(pos, true, true, true)
517 );
518 } else {
519 return ah::format_err!("Data MISMATCH at byte {}!", pos);
520 }
521 }
522 }
523 panic!("Internal error: verify_failed() no mismatch.");
524 }
525
526 pub fn verify(&mut self, file: DisktestFile, seek: u64, max_bytes: u64) -> ah::Result<u64> {
528 let mut file = file;
529 let mut bytes_left = max_bytes;
530 let mut bytes_read = 0u64;
531
532 let readbuf_len = self.init(&mut file, "Verifying", seek, max_bytes)? as usize;
533 let mut buffer = vec![0; readbuf_len];
534 let mut read_count = 0;
535 let mut read_len = min(readbuf_len as u64, bytes_left) as usize;
536
537 loop {
538 match file.read(&mut buffer[read_count..read_count + (read_len - read_count)]) {
540 Ok(RawIoResult::Ok(n)) => {
541 read_count += n;
542
543 assert!(read_count <= read_len);
545 if read_count == read_len || (read_count > 0 && n == 0) {
546 let chunk = self.stream_agg.wait_chunk()?;
548 if buffer[..read_count] != chunk.get_data()[..read_count] {
549 return Err(self.verify_failed(
550 &mut file, read_count, bytes_read, &buffer, &chunk,
551 ));
552 }
553
554 bytes_read += read_count as u64;
556 bytes_left -= read_count as u64;
557 if bytes_left == 0 {
558 self.verify_finalize(&mut file, true, bytes_read)?;
559 break;
560 }
561 self.log("Verified ", read_count, bytes_read, false);
562 read_count = 0;
563 read_len = min(readbuf_len as u64, bytes_left) as usize;
564 }
565
566 if n == 0 {
568 self.verify_finalize(&mut file, true, bytes_read)?;
569 break;
570 }
571 }
572 Ok(_) => unreachable!(),
573 Err(e) => {
574 let _ = self.verify_finalize(&mut file, false, bytes_read);
575 return Err(ah::format_err!(
576 "Read error at {}: {}",
577 prettybytes(bytes_read, true, true, true),
578 e
579 ));
580 }
581 };
582
583 if self.abort_requested() {
584 let _ = self.verify_finalize(&mut file, false, bytes_read);
585 return Err(ah::format_err!("Aborted by signal!"));
586 }
587 }
588
589 Ok(bytes_read)
590 }
591}
592
593#[cfg(test)]
594mod tests {
595 use super::*;
596 use crate::generator::{GeneratorChaCha12, GeneratorChaCha20, GeneratorChaCha8, GeneratorCrc};
597 use std::fs::OpenOptions;
598 use std::io::{Seek, SeekFrom, Write};
599 use std::path::PathBuf;
600 use tempfile::tempdir;
601
602 fn run_test(algorithm: DtStreamType, base_size: usize, chunk_factor: usize) {
603 let tdir = tempdir().unwrap();
604 let tdir_path = tdir.path();
605 let mut serial = 0;
606
607 let seed = vec![42, 43, 44, 45];
608 let nr_threads = 2;
609 let mut dt = Disktest::new(
610 algorithm,
611 &seed,
612 0,
613 false,
614 nr_threads,
615 DisktestQuiet::Normal,
616 None,
617 );
618
619 let mk_filepath = |num| {
620 let mut path = PathBuf::from(tdir_path);
621 path.push(format!("tmp-{}.img", num));
622 path
623 };
624
625 let mk_file = |num, create| {
626 let path = mk_filepath(num);
627 let io = RawIo::new(&path, create, true, true).unwrap();
628 DisktestFile {
629 path,
630 read: true,
631 write: true,
632 io: Some(io),
633 drop_offset: 0,
634 drop_count: 0,
635 quiet_level: DisktestQuiet::Normal,
636 }
637 };
638
639 {
641 let nr_bytes = 1000;
642 assert_eq!(
643 dt.write(mk_file(serial, true), 0, nr_bytes).unwrap(),
644 nr_bytes
645 );
646 assert_eq!(
647 dt.verify(mk_file(serial, false), 0, u64::MAX).unwrap(),
648 nr_bytes
649 );
650 serial += 1;
651 }
652
653 {
655 let nr_bytes = 1000;
656 assert_eq!(
657 dt.write(mk_file(serial, true), 0, nr_bytes).unwrap(),
658 nr_bytes
659 );
660 assert_eq!(
661 dt.verify(mk_file(serial, false), 0, nr_bytes / 2).unwrap(),
662 nr_bytes / 2
663 );
664 serial += 1;
665 }
666
667 {
669 let nr_bytes = (base_size * chunk_factor * nr_threads * 2 + 100) as u64;
670 assert_eq!(
671 dt.write(mk_file(serial, true), 0, nr_bytes).unwrap(),
672 nr_bytes
673 );
674 assert_eq!(
675 dt.verify(mk_file(serial, false), 0, u64::MAX).unwrap(),
676 nr_bytes
677 );
678 serial += 1;
679 }
680
681 {
683 let nr_bytes = 1000;
684 {
685 let mut f = mk_file(serial, true);
686 f.io.as_mut().unwrap().set_len(100).unwrap();
687 f.io.as_mut().unwrap().seek(10).unwrap();
688 assert_eq!(dt.write(f, 0, nr_bytes).unwrap(), nr_bytes);
689 }
690 assert_eq!(
691 dt.verify(mk_file(serial, false), 0, u64::MAX).unwrap(),
692 nr_bytes
693 );
694 serial += 1;
695 }
696
697 {
699 let nr_bytes = 1000;
700 assert_eq!(
701 dt.write(mk_file(serial, true), 0, nr_bytes).unwrap(),
702 nr_bytes
703 );
704 {
705 let path = mk_filepath(serial);
706 let mut file = OpenOptions::new()
707 .read(true)
708 .write(true)
709 .open(path)
710 .unwrap();
711 file.seek(SeekFrom::Start(10)).unwrap();
712 writeln!(&file, "X").unwrap();
713 }
714 match dt.verify(mk_file(serial, false), 0, nr_bytes) {
715 Ok(_) => panic!("Verify of modified data did not fail!"),
716 Err(e) => assert_eq!(e.to_string(), "Data MISMATCH at byte 10!"),
717 }
718 serial += 1;
719 }
720
721 {
723 let nr_bytes = (base_size * chunk_factor * nr_threads * 10) as u64;
724 assert_eq!(
725 dt.write(mk_file(serial, true), 0, nr_bytes).unwrap(),
726 nr_bytes
727 );
728 for offset in (0..nr_bytes).step_by(base_size * chunk_factor / 2) {
729 let bytes_verified = dt.verify(mk_file(serial, false), offset, u64::MAX).unwrap();
730 assert!(bytes_verified > 0 && bytes_verified <= nr_bytes);
731 }
732 serial += 1;
733 }
734
735 {
737 let nr_bytes = (base_size * chunk_factor * nr_threads * 10) as u64;
738 assert_eq!(
739 dt.write(mk_file(serial, true), 0, nr_bytes).unwrap(),
740 nr_bytes
741 );
742 let offset = (base_size * chunk_factor * nr_threads * 2) as u64;
743 assert_eq!(
744 dt.write(mk_file(serial, false), offset, nr_bytes).unwrap(),
745 nr_bytes
746 );
747 assert_eq!(
748 dt.verify(mk_file(serial, false), 0, u64::MAX).unwrap(),
749 nr_bytes + offset
750 );
751 }
753
754 tdir.close().unwrap();
755 }
756
757 #[test]
758 fn test_chacha8() {
759 run_test(
760 DtStreamType::ChaCha8,
761 GeneratorChaCha8::BASE_SIZE,
762 GeneratorChaCha8::DEFAULT_CHUNK_FACTOR,
763 );
764 }
765
766 #[test]
767 fn test_chacha12() {
768 run_test(
769 DtStreamType::ChaCha12,
770 GeneratorChaCha12::BASE_SIZE,
771 GeneratorChaCha12::DEFAULT_CHUNK_FACTOR,
772 );
773 }
774
775 #[test]
776 fn test_chacha20() {
777 run_test(
778 DtStreamType::ChaCha20,
779 GeneratorChaCha20::BASE_SIZE,
780 GeneratorChaCha20::DEFAULT_CHUNK_FACTOR,
781 );
782 }
783
784 #[test]
785 fn test_crc() {
786 run_test(
787 DtStreamType::Crc,
788 GeneratorCrc::BASE_SIZE,
789 GeneratorCrc::DEFAULT_CHUNK_FACTOR,
790 );
791 }
792}
793
794