1use crate::stream_aggregator::{DtStreamAgg, DtStreamAggChunk};
13use crate::util::{prettybytes, Hhmmss};
14use anyhow as ah;
15use chrono::prelude::*;
16use disktest_rawio::{RawIo, RawIoResult, DEFAULT_SECTOR_SIZE};
17use movavg::MovAvg;
18use std::cmp::min;
19use std::path::{Path, PathBuf};
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::Arc;
22use std::thread::available_parallelism;
23use std::time::Instant;
24
25pub use crate::stream_aggregator::DtStreamType;
26
/// Number of newly processed bytes that must accumulate before `Disktest::log`
/// checks whether a periodic progress line is due.
const LOG_BYTE_THRES: u64 = 1024 * 1024;
/// Minimum number of seconds between two periodic progress lines.
const LOG_SEC_THRES: u64 = 10;
29
/// Verbosity setting used by `Disktest` and `DisktestFile`.
///
/// The variants are ordered from most verbose to most quiet, so the derived
/// ordering can be used for threshold checks such as
/// `quiet_level < DisktestQuiet::NoInfo`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum DisktestQuiet {
    /// Print everything, including periodic progress lines.
    Normal = 0,
    /// Print informational messages and final summary lines, but no
    /// periodic progress lines.
    Reduced = 1,
    /// Print no informational messages; warnings are still printed.
    NoInfo = 2,
    /// Print no warnings either.
    NoWarn = 3,
}
42
/// A file or device handle used by `Disktest`, opened lazily on first use.
///
/// Besides the raw I/O handle it tracks the region written since the last
/// seek, so that the operating system caches for that region can be dropped
/// when the handle is closed.
pub struct DisktestFile {
    /// Path passed to `RawIo::new` whenever the file is (re)opened.
    path: PathBuf,
    /// Open the file for reading.
    read: bool,
    /// Open the file for writing. Also passed as the `create` flag on open.
    write: bool,
    /// The open raw I/O handle; `None` while the file is closed.
    io: Option<RawIo>,
    /// Start offset of the region handed to `drop_file_caches` on close.
    drop_offset: u64,
    /// Length of that region: bytes written since the last seek/close.
    drop_count: u64,
    /// Controls whether the "File not closed" warning in `Drop` is printed.
    quiet_level: DisktestQuiet,
}
53
54impl DisktestFile {
55 pub fn open(path: &Path, read: bool, write: bool) -> ah::Result<DisktestFile> {
57 Ok(DisktestFile {
58 path: path.to_path_buf(),
59 read,
60 write,
61 io: None,
62 drop_offset: 0,
63 drop_count: 0,
64 quiet_level: DisktestQuiet::Normal,
65 })
66 }
67
68 fn do_open(&mut self) -> ah::Result<()> {
69 if self.io.is_none() {
70 self.io = Some(RawIo::new(&self.path, self.write, self.read, self.write)?);
71 self.drop_offset = 0;
72 self.drop_count = 0;
73 }
74 Ok(())
75 }
76
77 fn close(&mut self) -> ah::Result<()> {
79 let drop_offset = self.drop_offset;
80 let drop_count = self.drop_count;
81
82 self.drop_offset += drop_count;
83 self.drop_count = 0;
84
85 if let Some(mut io) = self.io.take() {
87 if drop_count > 0 {
89 if let Err(e) = io.drop_file_caches(drop_offset, drop_count) {
90 return Err(ah::format_err!("Cache drop error: {}", e));
91 }
92 } else {
93 io.close()?;
94 }
95 }
96 Ok(())
97 }
98
99 fn get_sector_size(&mut self) -> ah::Result<Option<u32>> {
101 self.do_open()?;
102 let io = self.io.as_ref().expect("get_sector_size: No file.");
103 Ok(io.get_sector_size())
104 }
105
106 fn seek(&mut self, offset: u64) -> ah::Result<u64> {
108 if self.drop_count > 0 {
109 self.close()?;
110 }
111 self.do_open()?;
112 match self.seek_noflush(offset) {
113 Ok(x) => {
114 self.drop_offset = offset;
115 self.drop_count = 0;
116 Ok(x)
117 }
118 other => other,
119 }
120 }
121
122 fn seek_noflush(&mut self, offset: u64) -> ah::Result<u64> {
124 self.do_open()?;
125 let io = self.io.as_mut().expect("seek: No file.");
126 io.seek(offset)
127 }
128
129 fn sync(&mut self) -> ah::Result<()> {
131 if let Some(io) = self.io.as_mut() {
132 io.sync()
133 } else {
134 Ok(())
135 }
136 }
137
138 fn read(&mut self, buffer: &mut [u8]) -> ah::Result<RawIoResult> {
140 self.do_open()?;
141 let io = self.io.as_mut().expect("read: No file.");
142 io.read(buffer)
143 }
144
145 fn write(&mut self, buffer: &[u8]) -> ah::Result<RawIoResult> {
147 self.do_open()?;
148 let io = self.io.as_mut().expect("write: No file.");
149 match io.write(buffer) {
150 Ok(res) => {
151 self.drop_count += buffer.len() as u64;
152 Ok(res)
153 }
154 Err(e) => Err(e),
155 }
156 }
157
/// Return the path this handle was created with.
fn get_path(&self) -> &PathBuf {
    &self.path
}
162}
163
164impl Drop for DisktestFile {
165 fn drop(&mut self) {
166 if self.io.is_some() {
167 if self.quiet_level < DisktestQuiet::NoWarn {
168 eprintln!("WARNING: File not closed. Closing now...");
169 }
170 if let Err(e) = self.close() {
171 panic!("Failed to drop operating system caches: {}", e);
172 }
173 }
174 }
175}
176
/// The disktest core: writes a generated data stream to a file and verifies
/// it again, printing progress information along the way.
pub struct Disktest {
    /// Source of the expected data chunks.
    stream_agg: DtStreamAgg,
    /// Optional shared flag; when it reads `true`, `write`/`verify` abort.
    abort: Option<Arc<AtomicBool>>,
    /// Bytes processed since the byte threshold for logging was last hit.
    log_count: u64,
    /// Time at which the last progress line was printed.
    log_time: Instant,
    /// Bytes processed since the last printed progress line.
    rate_count: u64,
    /// Start of the period that `rate_count` refers to.
    rate_count_start_time: Instant,
    /// Moving average fed with the periodic rate samples
    /// (presumably over a window of 5 samples; confirm against `movavg`).
    rate_avg: MovAvg<u64, u64, 5>,
    /// Start time of the current operation, reset by `log_reset`.
    begin_time: Instant,
    /// Verbosity of informational and warning output.
    quiet_level: DisktestQuiet,
}
189
190impl Disktest {
/// Value for `max_bytes` meaning "no byte limit". With this value, `write`
/// treats running out of space as a successful end of the operation.
pub const UNLIMITED: u64 = u64::MAX;
193
194 pub fn new(
214 algorithm: DtStreamType,
215 seed: &[u8],
216 round_id: u64,
217 invert_pattern: bool,
218 nr_threads: usize,
219 quiet_level: DisktestQuiet,
220 abort: Option<Arc<AtomicBool>>,
221 ) -> Disktest {
222 let nr_threads = if nr_threads == 0 {
223 if let Ok(cpus) = available_parallelism() {
224 cpus.get()
225 } else {
226 1
227 }
228 } else {
229 nr_threads
230 };
231
232 let now = Instant::now();
233 Disktest {
234 stream_agg: DtStreamAgg::new(
235 algorithm,
236 seed,
237 round_id,
238 invert_pattern,
239 nr_threads,
240 quiet_level,
241 ),
242 abort,
243 log_count: 0,
244 log_time: now,
245 rate_count: 0,
246 rate_count_start_time: now,
247 rate_avg: MovAvg::new(),
248 begin_time: now,
249 quiet_level,
250 }
251 }
252
253 fn abort_requested(&self) -> bool {
255 if let Some(abort) = &self.abort {
256 abort.load(Ordering::Relaxed)
257 } else {
258 false
259 }
260 }
261
262 fn log_reset(&mut self) {
264 let now = Instant::now();
265 self.log_count = 0;
266 self.log_time = now;
267 self.rate_count = 0;
268 self.rate_count_start_time = now;
269 self.rate_avg.reset();
270 self.begin_time = now;
271 }
272
273 fn log(&mut self, prefix: &str, inc_processed: usize, abs_processed: u64, final_step: bool) {
275 if self.quiet_level < DisktestQuiet::NoInfo {
277 self.log_count += inc_processed as u64;
281 self.rate_count += inc_processed as u64;
282 if (self.log_count >= LOG_BYTE_THRES && self.quiet_level == DisktestQuiet::Normal)
283 || final_step
284 {
285 let now = Instant::now();
287 let expired = now.duration_since(self.log_time).as_secs() >= LOG_SEC_THRES;
288
289 if (expired && self.quiet_level == DisktestQuiet::Normal) || final_step {
290 let tod = Local::now().format("%R");
291
292 let dur_elapsed = now - self.begin_time;
293
294 let rate = if final_step {
295 let elapsed_ms = dur_elapsed.as_millis();
296 if elapsed_ms > 0 {
297 Some(((abs_processed as u128 * 1000) / elapsed_ms) as u64)
298 } else {
299 None
300 }
301 } else {
302 let rate_period_ms = (now - self.rate_count_start_time).as_millis();
303 if rate_period_ms > 0 {
304 let rate = ((self.rate_count as u128 * 1000) / rate_period_ms) as u64;
305 Some(self.rate_avg.feed(rate))
306 } else {
307 None
308 }
309 };
310
311 let rate_string = if let Some(rate) = rate {
312 format!(" @ {}/s", prettybytes(rate, true, false, false))
313 } else {
314 "".to_string()
315 };
316
317 let suffix = if final_step { "." } else { " ..." };
318
319 println!(
320 "[{} / {}] {}{}{}{}",
321 tod,
322 dur_elapsed.hhmmss(),
323 prefix,
324 prettybytes(abs_processed, true, true, final_step),
325 rate_string,
326 suffix
327 );
328 self.log_time = now;
329 self.rate_count_start_time = now;
330 self.rate_count = 0;
331 }
332 self.log_count = 0;
333 }
334 }
335 }
336
337 fn init(
339 &mut self,
340 file: &mut DisktestFile,
341 prefix: &str,
342 seek: u64,
343 max_bytes: u64,
344 ) -> ah::Result<u64> {
345 file.quiet_level = self.quiet_level;
346 self.log_reset();
347
348 let sector_size = file.get_sector_size().unwrap_or(None);
349
350 if self.quiet_level < DisktestQuiet::NoInfo {
351 let sector_str = if let Some(sector_size) = sector_size.as_ref() {
352 format!(
353 " ({} sectors)",
354 prettybytes(*sector_size as _, true, false, false),
355 )
356 } else {
357 "".to_string()
358 };
359 println!(
360 "{} {}{}, starting at position {}...",
361 prefix,
362 file.get_path().display(),
363 sector_str,
364 prettybytes(seek, true, true, false)
365 );
366 }
367
368 let res = self
369 .stream_agg
370 .activate(seek, sector_size.unwrap_or(DEFAULT_SECTOR_SIZE))?;
371
372 if let Err(e) = file.seek(res.byte_offset) {
373 return Err(ah::format_err!(
374 "File seek to {} failed: {}",
375 seek,
376 e.to_string()
377 ));
378 }
379
380 if let Some(sector_size) = sector_size.as_ref() {
381 if max_bytes < u64::MAX
382 && max_bytes % *sector_size as u64 != 0
383 && self.quiet_level < DisktestQuiet::NoWarn
384 {
385 #[cfg(target_os = "windows")]
386 eprintln!("WARNING: The desired byte count of {} is not a multiple of the sector size {}. \
387 This might result in a write or read error at the very end.",
388 prettybytes(max_bytes, true, true, true),
389 prettybytes(*sector_size as u64, true, true, true));
390 }
391 }
392
393 Ok(res.chunk_size)
394 }
395
396 fn write_finalize(
398 &mut self,
399 file: &mut DisktestFile,
400 success: bool,
401 bytes_written: u64,
402 ) -> ah::Result<()> {
403 if self.quiet_level < DisktestQuiet::NoInfo {
404 println!("Writing stopped. Syncing...");
405 }
406 if let Err(e) = file.sync() {
407 return Err(ah::format_err!("Sync failed: {}", e));
408 }
409
410 self.log(
411 if success { "Done. Wrote " } else { "Wrote " },
412 0,
413 bytes_written,
414 true,
415 );
416
417 if let Err(e) = file.close() {
418 return Err(ah::format_err!(
419 "Failed to drop operating system caches: {}",
420 e
421 ));
422 }
423 if success && self.quiet_level < DisktestQuiet::NoInfo {
424 println!("Successfully dropped file caches.");
425 }
426
427 Ok(())
428 }
429
430 pub fn write(&mut self, file: DisktestFile, seek: u64, max_bytes: u64) -> ah::Result<u64> {
432 let mut file = file;
433 let mut bytes_left = max_bytes;
434 let mut bytes_written = 0u64;
435
436 let write_chunk_size = self.init(&mut file, "Writing", seek, max_bytes)?;
437 loop {
438 let chunk = self.stream_agg.wait_chunk()?;
440 let write_len = min(write_chunk_size, bytes_left) as usize;
441
442 match file.write(&chunk.get_data()[0..write_len]) {
444 Ok(RawIoResult::Ok(_)) => (),
445 Ok(RawIoResult::Enospc) => {
446 if max_bytes == Disktest::UNLIMITED {
447 self.write_finalize(&mut file, true, bytes_written)?;
448 break; }
450 let _ = self.write_finalize(&mut file, false, bytes_written);
451 return Err(ah::format_err!("Write error: Out of disk space."));
452 }
453 Err(e) => {
454 let _ = self.write_finalize(&mut file, false, bytes_written);
455 return Err(e);
456 }
457 }
458
459 bytes_written += write_len as u64;
461 bytes_left -= write_len as u64;
462 if bytes_left == 0 {
463 self.write_finalize(&mut file, true, bytes_written)?;
464 break;
465 }
466 self.log("Wrote ", write_len, bytes_written, false);
467
468 if self.abort_requested() {
469 let _ = self.write_finalize(&mut file, false, bytes_written);
470 return Err(ah::format_err!("Aborted by signal!"));
471 }
472 }
473
474 Ok(bytes_written)
475 }
476
477 fn verify_finalize(
479 &mut self,
480 file: &mut DisktestFile,
481 success: bool,
482 bytes_read: u64,
483 ) -> ah::Result<()> {
484 self.log(
485 if success {
486 "Done. Verified "
487 } else {
488 "Verified "
489 },
490 0,
491 bytes_read,
492 true,
493 );
494 if let Err(e) = file.close() {
495 return Err(ah::format_err!("Failed to close device: {}", e));
496 }
497 Ok(())
498 }
499
500 fn verify_failed(
502 &mut self,
503 file: &mut DisktestFile,
504 read_count: usize,
505 bytes_read: u64,
506 buffer: &[u8],
507 chunk: &DtStreamAggChunk,
508 ) -> ah::Error {
509 if let Err(e) = self.verify_finalize(file, false, bytes_read) {
510 if self.quiet_level < DisktestQuiet::NoWarn {
511 eprintln!("{}", e);
512 }
513 }
514 for (i, buffer_byte) in buffer.iter().enumerate().take(read_count) {
515 if *buffer_byte != chunk.get_data()[i] {
516 let pos = bytes_read + i as u64;
517 if pos >= 1024 {
518 return ah::format_err!(
519 "Data MISMATCH at {}!",
520 prettybytes(pos, true, true, true)
521 );
522 } else {
523 return ah::format_err!("Data MISMATCH at byte {}!", pos);
524 }
525 }
526 }
527 panic!("Internal error: verify_failed() no mismatch.");
528 }
529
/// Read `file` and compare its contents against the generated data stream.
///
/// * `seek` - Position at which verification starts.
/// * `max_bytes` - Maximum number of bytes to verify. Reaching the end of
///   the file earlier ends the run successfully.
///
/// Returns the number of bytes verified.
///
/// # Errors
/// Fails on initialization or generator errors, on the first data mismatch,
/// on a read error, when finalizing a successful run fails, or when an
/// abort was requested.
///
/// # Panics
/// Panics if more bytes were read than requested, or if the read returns a
/// result other than `RawIoResult::Ok`.
pub fn verify(&mut self, file: DisktestFile, seek: u64, max_bytes: u64) -> ah::Result<u64> {
    let mut file = file;
    let mut bytes_left = max_bytes;
    let mut bytes_read = 0u64;

    let readbuf_len = self.init(&mut file, "Verifying", seek, max_bytes)? as usize;
    let mut buffer = vec![0; readbuf_len];
    // Number of bytes of the current chunk that are already in `buffer`.
    let mut read_count = 0;
    // Number of bytes wanted for the current chunk.
    let mut read_len = min(readbuf_len as u64, bytes_left) as usize;

    loop {
        // Read the still missing part of the current chunk.
        match file.read(&mut buffer[read_count..read_count + (read_len - read_count)]) {
            Ok(RawIoResult::Ok(n)) => {
                read_count += n;

                // Compare once the chunk is complete, or when the end of the
                // file was hit (n == 0) with a partial chunk pending.
                assert!(read_count <= read_len);
                if read_count == read_len || (read_count > 0 && n == 0) {
                    let chunk = self.stream_agg.wait_chunk()?;
                    if buffer[..read_count] != chunk.get_data()[..read_count] {
                        return Err(self.verify_failed(
                            &mut file, read_count, bytes_read, &buffer, &chunk,
                        ));
                    }

                    // Account for the verified bytes.
                    bytes_read += read_count as u64;
                    bytes_left -= read_count as u64;
                    if bytes_left == 0 {
                        self.verify_finalize(&mut file, true, bytes_read)?;
                        break;
                    }
                    self.log("Verified ", read_count, bytes_read, false);
                    // Start over with the next chunk.
                    read_count = 0;
                    read_len = min(readbuf_len as u64, bytes_left) as usize;
                }

                // A read of zero bytes is treated as the end of the file.
                if n == 0 {
                    self.verify_finalize(&mut file, true, bytes_read)?;
                    break;
                }
            }
            Ok(_) => unreachable!(),
            Err(e) => {
                // Best-effort finalization; its error is ignored.
                let _ = self.verify_finalize(&mut file, false, bytes_read);
                return Err(ah::format_err!(
                    "Read error at {}: {}",
                    prettybytes(bytes_read, true, true, true),
                    e
                ));
            }
        };

        if self.abort_requested() {
            let _ = self.verify_finalize(&mut file, false, bytes_read);
            return Err(ah::format_err!("Aborted by signal!"));
        }
    }

    Ok(bytes_read)
}
595}
596
#[cfg(test)]
mod tests {
    use super::*;
    use crate::generator::{GeneratorChaCha12, GeneratorChaCha20, GeneratorChaCha8, GeneratorCrc};
    use std::fs::OpenOptions;
    use std::io::{Seek, SeekFrom, Write};
    use std::path::PathBuf;
    use tempfile::tempdir;

    /// Run the write/verify scenarios for one generator algorithm.
    ///
    /// `base_size` and `chunk_factor` are the generator's constants; they are
    /// used to size the larger test files relative to the chunk size.
    /// Every scenario uses its own file `tmp-<serial>.img` in a temporary
    /// directory.
    fn run_test(algorithm: DtStreamType, base_size: usize, chunk_factor: usize) {
        let tdir = tempdir().unwrap();
        let tdir_path = tdir.path();
        let mut serial = 0;

        let seed = vec![42, 43, 44, 45];
        let nr_threads = 2;
        let mut dt = Disktest::new(
            algorithm,
            &seed,
            0,
            false,
            nr_threads,
            DisktestQuiet::Normal,
            None,
        );

        // Path of the scenario file with the given serial number.
        let mk_filepath = |num| {
            let mut path = PathBuf::from(tdir_path);
            path.push(format!("tmp-{}.img", num));
            path
        };

        // Build a DisktestFile with an already opened read/write handle.
        let mk_file = |num, create| {
            let path = mk_filepath(num);
            let io = RawIo::new(&path, create, true, true).unwrap();
            DisktestFile {
                path,
                read: true,
                write: true,
                io: Some(io),
                drop_offset: 0,
                drop_count: 0,
                quiet_level: DisktestQuiet::Normal,
            }
        };

        // Write a few bytes and verify the whole file.
        {
            let nr_bytes = 1000;
            assert_eq!(
                dt.write(mk_file(serial, true), 0, nr_bytes).unwrap(),
                nr_bytes
            );
            assert_eq!(
                dt.verify(mk_file(serial, false), 0, u64::MAX).unwrap(),
                nr_bytes
            );
            serial += 1;
        }

        // Write a few bytes and verify only the first half.
        {
            let nr_bytes = 1000;
            assert_eq!(
                dt.write(mk_file(serial, true), 0, nr_bytes).unwrap(),
                nr_bytes
            );
            assert_eq!(
                dt.verify(mk_file(serial, false), 0, nr_bytes / 2).unwrap(),
                nr_bytes / 2
            );
            serial += 1;
        }

        // Write a larger, unaligned amount of data and verify the whole file.
        {
            let nr_bytes = (base_size * chunk_factor * nr_threads * 2 + 100) as u64;
            assert_eq!(
                dt.write(mk_file(serial, true), 0, nr_bytes).unwrap(),
                nr_bytes
            );
            assert_eq!(
                dt.verify(mk_file(serial, false), 0, u64::MAX).unwrap(),
                nr_bytes
            );
            serial += 1;
        }

        // Start from a handle that already has a length of 100 and a position
        // of 10; writing from offset 0 must still yield nr_bytes verifiable bytes.
        {
            let nr_bytes = 1000;
            {
                let mut f = mk_file(serial, true);
                f.io.as_mut().unwrap().set_len(100).unwrap();
                f.io.as_mut().unwrap().seek(10).unwrap();
                assert_eq!(dt.write(f, 0, nr_bytes).unwrap(), nr_bytes);
            }
            assert_eq!(
                dt.verify(mk_file(serial, false), 0, u64::MAX).unwrap(),
                nr_bytes
            );
            serial += 1;
        }

        // Modify the written data at offset 10 and expect a mismatch there.
        {
            let nr_bytes = 1000;
            assert_eq!(
                dt.write(mk_file(serial, true), 0, nr_bytes).unwrap(),
                nr_bytes
            );
            {
                let path = mk_filepath(serial);
                let mut file = OpenOptions::new()
                    .read(true)
                    .write(true)
                    .open(path)
                    .unwrap();
                file.seek(SeekFrom::Start(10)).unwrap();
                writeln!(&file, "X").unwrap();
            }
            match dt.verify(mk_file(serial, false), 0, nr_bytes) {
                Ok(_) => panic!("Verify of modified data did not fail!"),
                Err(e) => assert_eq!(e.to_string(), "Data MISMATCH at byte 10!"),
            }
            serial += 1;
        }

        // Verify the same file starting at a series of different offsets.
        {
            let nr_bytes = (base_size * chunk_factor * nr_threads * 10) as u64;
            assert_eq!(
                dt.write(mk_file(serial, true), 0, nr_bytes).unwrap(),
                nr_bytes
            );
            for offset in (0..nr_bytes).step_by(base_size * chunk_factor / 2) {
                let bytes_verified = dt.verify(mk_file(serial, false), offset, u64::MAX).unwrap();
                assert!(bytes_verified > 0 && bytes_verified <= nr_bytes);
            }
            serial += 1;
        }

        // Write a second time starting at an offset; verifying from 0 must
        // then cover offset + nr_bytes bytes.
        {
            let nr_bytes = (base_size * chunk_factor * nr_threads * 10) as u64;
            assert_eq!(
                dt.write(mk_file(serial, true), 0, nr_bytes).unwrap(),
                nr_bytes
            );
            let offset = (base_size * chunk_factor * nr_threads * 2) as u64;
            assert_eq!(
                dt.write(mk_file(serial, false), offset, nr_bytes).unwrap(),
                nr_bytes
            );
            assert_eq!(
                dt.verify(mk_file(serial, false), 0, u64::MAX).unwrap(),
                nr_bytes + offset
            );
        }

        tdir.close().unwrap();
    }

    /// Run all scenarios with the ChaCha8 generator.
    #[test]
    fn test_chacha8() {
        run_test(
            DtStreamType::ChaCha8,
            GeneratorChaCha8::BASE_SIZE,
            GeneratorChaCha8::DEFAULT_CHUNK_FACTOR,
        );
    }

    /// Run all scenarios with the ChaCha12 generator.
    #[test]
    fn test_chacha12() {
        run_test(
            DtStreamType::ChaCha12,
            GeneratorChaCha12::BASE_SIZE,
            GeneratorChaCha12::DEFAULT_CHUNK_FACTOR,
        );
    }

    /// Run all scenarios with the ChaCha20 generator.
    #[test]
    fn test_chacha20() {
        run_test(
            DtStreamType::ChaCha20,
            GeneratorChaCha20::BASE_SIZE,
            GeneratorChaCha20::DEFAULT_CHUNK_FACTOR,
        );
    }

    /// Run all scenarios with the CRC generator.
    #[test]
    fn test_crc() {
        run_test(
            DtStreamType::Crc,
            GeneratorCrc::BASE_SIZE,
            GeneratorCrc::DEFAULT_CHUNK_FACTOR,
        );
    }
}
797
798