use crate::stream_aggregator::{DtStreamAgg, DtStreamAggChunk};
use crate::util::{prettybytes, Hhmmss};
use anyhow as ah;
use chrono::prelude::*;
use disktest_rawio::{RawIo, RawIoResult, DEFAULT_SECTOR_SIZE};
use movavg::MovAvg;
use std::cmp::min;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::available_parallelism;
use std::time::Instant;
pub use crate::stream_aggregator::DtStreamType;
const LOG_BYTE_THRES: u64 = 1024 * 1024;
const LOG_SEC_THRES: u64 = 10;
/// Output verbosity levels, ordered from most to least verbose.
///
/// The `PartialOrd`/`Ord` derives let callers compare levels,
/// e.g. `quiet_level < DisktestQuiet::NoInfo`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum DisktestQuiet {
    /// Print all messages.
    Normal = 0,
    /// Reduced progress logging (no periodic progress lines).
    Reduced = 1,
    /// Suppress informational messages.
    NoInfo = 2,
    /// Suppress informational messages and warnings.
    NoWarn = 3,
}
/// A file or block device under test.
///
/// The underlying `RawIo` handle is created lazily on first access and
/// may be dropped and re-created to flush operating system caches for
/// the written byte range between a write and a verify pass.
pub struct DisktestFile {
    /// Path to the file or device.
    path: PathBuf,
    /// Open with read access.
    read: bool,
    /// Open with write access (also passed as the create flag on open,
    /// see `do_open()`).
    write: bool,
    /// Lazily created I/O handle; `None` while closed.
    io: Option<RawIo>,
    /// Start offset of the byte range whose OS caches still need dropping.
    drop_offset: u64,
    /// Length of the byte range whose OS caches still need dropping.
    drop_count: u64,
    /// Output verbosity.
    quiet_level: DisktestQuiet,
}
impl DisktestFile {
/// Create a `DisktestFile` handle for `path`.
///
/// The file itself is not opened here; the `RawIo` handle is created
/// lazily on first read/write/seek access. `read`/`write` select the
/// access mode of that later open.
pub fn open(path: &Path, read: bool, write: bool) -> ah::Result<DisktestFile> {
    Ok(DisktestFile {
        path: path.to_path_buf(),
        read,
        write,
        io: None,
        drop_offset: 0,
        drop_count: 0,
        quiet_level: DisktestQuiet::Normal,
    })
}
/// Ensure the underlying `RawIo` handle exists, creating it on demand.
///
/// A fresh open also resets the pending cache-drop window, since no
/// bytes have been written through the new handle yet.
fn do_open(&mut self) -> ah::Result<()> {
    if self.io.is_some() {
        return Ok(());
    }
    self.io = Some(RawIo::new(&self.path, self.write, self.read, self.write)?);
    self.drop_offset = 0;
    self.drop_count = 0;
    Ok(())
}
/// Close the file handle, if it is open.
///
/// If bytes were written since the last open/seek, the operating system
/// caches for the written range are dropped instead of performing a
/// plain close, so that a later verify reads from the medium rather
/// than from cache.
fn close(&mut self) -> ah::Result<()> {
    // Snapshot and advance the pending cache-drop window *before* any
    // early return, so the bookkeeping stays consistent on error paths.
    let drop_offset = self.drop_offset;
    let drop_count = self.drop_count;
    self.drop_offset += drop_count;
    self.drop_count = 0;
    if let Some(mut io) = self.io.take() {
        if drop_count > 0 {
            // NOTE(review): presumably drop_file_caches() also closes the
            // handle (it is consumed here either way) — confirm in RawIo.
            if let Err(e) = io.drop_file_caches(drop_offset, drop_count) {
                return Err(ah::format_err!("Cache drop error: {}", e));
            }
        } else {
            io.close()?;
        }
    }
    Ok(())
}
/// Query the device sector size (`None` if not applicable).
///
/// Opens the file first when necessary.
fn get_sector_size(&mut self) -> ah::Result<Option<u32>> {
    self.do_open()?;
    let sector_size = self
        .io
        .as_ref()
        .expect("get_sector_size: No file.")
        .get_sector_size();
    Ok(sector_size)
}
/// Seek to the absolute byte `offset`.
///
/// If bytes were written since the last open/seek, the file is closed
/// first (dropping OS caches for the written range) and then reopened.
/// On success the cache-drop window is re-anchored at `offset`.
fn seek(&mut self, offset: u64) -> ah::Result<u64> {
    if self.drop_count > 0 {
        self.close()?;
    }
    self.do_open()?;
    let pos = self.seek_noflush(offset)?;
    self.drop_offset = offset;
    self.drop_count = 0;
    Ok(pos)
}
/// Seek to `offset` without touching the pending cache-drop window.
fn seek_noflush(&mut self, offset: u64) -> ah::Result<u64> {
    self.do_open()?;
    self.io
        .as_mut()
        .expect("seek: No file.")
        .seek(offset)
}
/// Flush written data to the medium. A no-op when the file is not open.
fn sync(&mut self) -> ah::Result<()> {
    match self.io.as_mut() {
        Some(io) => io.sync(),
        None => Ok(()),
    }
}
/// Read into `buffer`, opening the file first when necessary.
fn read(&mut self, buffer: &mut [u8]) -> ah::Result<RawIoResult> {
    self.do_open()?;
    self.io
        .as_mut()
        .expect("read: No file.")
        .read(buffer)
}
/// Write `buffer`, opening the file first when necessary.
///
/// On success the written byte count is added to the pending
/// cache-drop window, so a later `close()` drops OS caches for it.
fn write(&mut self, buffer: &[u8]) -> ah::Result<RawIoResult> {
    self.do_open()?;
    let io = self.io.as_mut().expect("write: No file.");
    let res = io.write(buffer)?;
    self.drop_count += buffer.len() as u64;
    Ok(res)
}
/// The path this handle was created with.
// NOTE(review): returning `&Path` would be more idiomatic, but the
// `&PathBuf` return type is kept for interface stability.
fn get_path(&self) -> &PathBuf {
    &self.path
}
}
impl Drop for DisktestFile {
    /// Last-resort cleanup: close the file if the user forgot to.
    ///
    /// This also drops OS caches for any bytes written since the last
    /// open/seek (see `close()`).
    fn drop(&mut self) {
        if self.io.is_some() {
            if self.quiet_level < DisktestQuiet::NoWarn {
                eprintln!("WARNING: File not closed. Closing now...");
            }
            // NOTE(review): panicking inside Drop aborts the process if we
            // are already unwinding from another panic — consider logging
            // the error instead. Left as-is to preserve behavior.
            if let Err(e) = self.close() {
                panic!("Failed to drop operating system caches: {}", e);
            }
        }
    }
}
/// The disk tester: writes a pseudo random pattern stream to a
/// file/device and verifies it by reading back.
pub struct Disktest {
    /// Multi-threaded pseudo random pattern stream source.
    stream_agg: DtStreamAgg,
    /// Optional flag set externally to abort the current run.
    abort: Option<Arc<AtomicBool>>,
    /// Bytes processed since the last printed log line.
    log_count: u64,
    /// Time of the last printed log line.
    log_time: Instant,
    /// Bytes processed in the current rate measurement period.
    rate_count: u64,
    /// Start of the current rate measurement period.
    rate_count_start_time: Instant,
    /// Moving average over the last 5 rate measurements.
    rate_avg: MovAvg<u64, u64, 5>,
    /// Time the current run started (set by `log_reset()`).
    begin_time: Instant,
    /// Output verbosity.
    quiet_level: DisktestQuiet,
}
impl Disktest {
/// Byte count meaning "no limit; run until the device is full".
pub const UNLIMITED: u64 = u64::MAX;

/// Construct a new tester.
///
/// * `algorithm`: the pattern generator to use.
/// * `seed`: seed bytes for the generator.
/// * `round_id`: identifier of the current test round.
/// * `invert_pattern`: bit-invert the generated pattern.
/// * `nr_threads`: generator threads; `0` selects one per available CPU.
/// * `quiet_level`: output verbosity.
/// * `abort`: optional externally-settable abort flag.
pub fn new(
    algorithm: DtStreamType,
    seed: &[u8],
    round_id: u64,
    invert_pattern: bool,
    nr_threads: usize,
    quiet_level: DisktestQuiet,
    abort: Option<Arc<AtomicBool>>,
) -> Disktest {
    // 0 means "auto": one thread per available CPU, falling back to 1
    // if the parallelism query fails.
    let nr_threads = match nr_threads {
        0 => available_parallelism().map(|n| n.get()).unwrap_or(1),
        n => n,
    };
    let now = Instant::now();
    Disktest {
        stream_agg: DtStreamAgg::new(
            algorithm,
            seed,
            round_id,
            invert_pattern,
            nr_threads,
            quiet_level,
        ),
        abort,
        log_count: 0,
        log_time: now,
        rate_count: 0,
        rate_count_start_time: now,
        rate_avg: MovAvg::new(),
        begin_time: now,
        quiet_level,
    }
}
/// Whether an external abort was signalled via the shared flag.
fn abort_requested(&self) -> bool {
    self.abort
        .as_ref()
        .map(|flag| flag.load(Ordering::Relaxed))
        .unwrap_or(false)
}
/// Reset all progress-logging counters and timestamps to "now",
/// marking the start of a new run.
fn log_reset(&mut self) {
    let now = Instant::now();
    self.begin_time = now;
    self.log_time = now;
    self.rate_count_start_time = now;
    self.log_count = 0;
    self.rate_count = 0;
    self.rate_avg.reset();
}
/// Progress logging for the write/verify loops.
///
/// `inc_processed` is the number of bytes processed since the previous
/// call; `abs_processed` is the running total. With `final_step` set,
/// the line is printed unconditionally and the overall average rate is
/// reported instead of the moving average.
///
/// Periodic lines are printed only at `DisktestQuiet::Normal` and are
/// rate limited to at most one per `LOG_SEC_THRES` seconds, checked
/// only every `LOG_BYTE_THRES` bytes to keep the common path cheap.
fn log(&mut self, prefix: &str, inc_processed: usize, abs_processed: u64, final_step: bool) {
    if self.quiet_level < DisktestQuiet::NoInfo {
        self.log_count += inc_processed as u64;
        self.rate_count += inc_processed as u64;
        // First gate: enough bytes accumulated (cheap counter check).
        if (self.log_count >= LOG_BYTE_THRES && self.quiet_level == DisktestQuiet::Normal)
            || final_step
        {
            // Second gate: enough wall clock time elapsed.
            let now = Instant::now();
            let expired = now.duration_since(self.log_time).as_secs() >= LOG_SEC_THRES;
            if (expired && self.quiet_level == DisktestQuiet::Normal) || final_step {
                let tod = Local::now().format("%R");
                let dur_elapsed = now - self.begin_time;
                let rate = if final_step {
                    // Overall average over the whole run.
                    let elapsed_ms = dur_elapsed.as_millis();
                    if elapsed_ms > 0 {
                        Some(((abs_processed as u128 * 1000) / elapsed_ms) as u64)
                    } else {
                        None
                    }
                } else {
                    // Moving average over the recent measurement periods.
                    let rate_period_ms = (now - self.rate_count_start_time).as_millis();
                    if rate_period_ms > 0 {
                        let rate = ((self.rate_count as u128 * 1000) / rate_period_ms) as u64;
                        Some(self.rate_avg.feed(rate))
                    } else {
                        None
                    }
                };
                let rate_string = if let Some(rate) = rate {
                    format!(" @ {}/s", prettybytes(rate, true, false, false))
                } else {
                    "".to_string()
                };
                let suffix = if final_step { "." } else { " ..." };
                println!(
                    "[{} / {}] {}{}{}{}",
                    tod,
                    dur_elapsed.hhmmss(),
                    prefix,
                    prettybytes(abs_processed, true, true, final_step),
                    rate_string,
                    suffix
                );
                self.log_time = now;
                self.rate_count_start_time = now;
                self.rate_count = 0;
            }
            self.log_count = 0;
        }
    }
}
/// Common setup for `write()` and `verify()`.
///
/// Prints the start message, activates the pattern stream at `seek`
/// and seeks the file to the stream's actual starting byte offset.
/// `max_bytes` is only used for the sector alignment warning.
///
/// Returns the chunk size of the activated stream.
fn init(
    &mut self,
    file: &mut DisktestFile,
    prefix: &str,
    seek: u64,
    max_bytes: u64,
) -> ah::Result<u64> {
    file.quiet_level = self.quiet_level;
    self.log_reset();
    // Best effort: a failed sector size query degrades to "unknown".
    let sector_size = file.get_sector_size().unwrap_or(None);
    if self.quiet_level < DisktestQuiet::NoInfo {
        let sector_str = if let Some(sector_size) = sector_size.as_ref() {
            format!(
                " ({} sectors)",
                prettybytes(*sector_size as _, true, false, false),
            )
        } else {
            "".to_string()
        };
        println!(
            "{} {}{}, starting at position {}...",
            prefix,
            file.get_path().display(),
            sector_str,
            prettybytes(seek, true, true, false)
        );
    }
    let res = self
        .stream_agg
        .activate(seek, sector_size.unwrap_or(DEFAULT_SECTOR_SIZE))?;
    // The activated stream returns its own starting byte offset, which
    // may differ from the requested `seek`. Seek to that offset and
    // report *it* on failure (the previous message reported `seek`,
    // which is not the offset the seek actually used).
    if let Err(e) = file.seek(res.byte_offset) {
        return Err(ah::format_err!(
            "File seek to {} failed: {}",
            res.byte_offset,
            e
        ));
    }
    if let Some(sector_size) = sector_size.as_ref() {
        if max_bytes < u64::MAX
            && max_bytes % *sector_size as u64 != 0
            && self.quiet_level < DisktestQuiet::NoWarn
        {
            // Unaligned byte counts are only warned about on Windows;
            // on other targets this `if` body compiles to nothing.
            #[cfg(target_os = "windows")]
            eprintln!("WARNING: The desired byte count of {} is not a multiple of the sector size {}. \
                This might result in a write or read error at the very end.",
                prettybytes(max_bytes, true, true, true),
                prettybytes(*sector_size as u64, true, true, true));
        }
    }
    Ok(res.chunk_size)
}
/// Tear down after writing: sync to the medium, emit the final log
/// line and close the file (which also drops OS caches for the
/// written range).
fn write_finalize(
    &mut self,
    file: &mut DisktestFile,
    success: bool,
    bytes_written: u64,
) -> ah::Result<()> {
    if self.quiet_level < DisktestQuiet::NoInfo {
        println!("Writing stopped. Syncing...");
    }
    file.sync()
        .map_err(|e| ah::format_err!("Sync failed: {}", e))?;
    let prefix = if success { "Done. Wrote " } else { "Wrote " };
    self.log(prefix, 0, bytes_written, true);
    file.close()
        .map_err(|e| ah::format_err!("Failed to drop operating system caches: {}", e))?;
    if success && self.quiet_level < DisktestQuiet::NoInfo {
        println!("Successfully dropped file caches.");
    }
    Ok(())
}
/// Write the pseudo random pattern stream to `file`.
///
/// * `seek`: byte position to start writing at.
/// * `max_bytes`: number of bytes to write, or `Disktest::UNLIMITED`
///   to write until the device is full.
///
/// Returns the number of bytes written.
pub fn write(&mut self, mut file: DisktestFile, seek: u64, max_bytes: u64) -> ah::Result<u64> {
    let write_chunk_size = self.init(&mut file, "Writing", seek, max_bytes)?;
    let mut bytes_left = max_bytes;
    let mut bytes_written = 0u64;
    loop {
        let chunk = self.stream_agg.wait_chunk()?;
        let write_len = min(write_chunk_size, bytes_left) as usize;
        match file.write(&chunk.get_data()[0..write_len]) {
            Ok(RawIoResult::Ok(_)) => (),
            Ok(RawIoResult::Enospc) => {
                // Running out of space is an error only when a specific
                // byte count was requested; in unlimited mode it marks
                // the natural end of the device.
                if max_bytes != Disktest::UNLIMITED {
                    let _ = self.write_finalize(&mut file, false, bytes_written);
                    return Err(ah::format_err!("Write error: Out of disk space."));
                }
                self.write_finalize(&mut file, true, bytes_written)?;
                break;
            }
            Err(e) => {
                let _ = self.write_finalize(&mut file, false, bytes_written);
                return Err(e);
            }
        }
        bytes_written += write_len as u64;
        bytes_left -= write_len as u64;
        if bytes_left == 0 {
            self.write_finalize(&mut file, true, bytes_written)?;
            break;
        }
        self.log("Wrote ", write_len, bytes_written, false);
        if self.abort_requested() {
            let _ = self.write_finalize(&mut file, false, bytes_written);
            return Err(ah::format_err!("Aborted by signal!"));
        }
    }
    Ok(bytes_written)
}
/// Tear down after verifying: emit the final log line and close the file.
fn verify_finalize(
    &mut self,
    file: &mut DisktestFile,
    success: bool,
    bytes_read: u64,
) -> ah::Result<()> {
    let prefix = if success {
        "Done. Verified "
    } else {
        "Verified "
    };
    self.log(prefix, 0, bytes_read, true);
    file.close()
        .map_err(|e| ah::format_err!("Failed to close device: {}", e))
}
/// Build the error value for a verify mismatch in `buffer[..read_count]`
/// against the expected pattern in `chunk`, after finalizing the run.
///
/// Panics if no mismatching byte is found — the caller must only call
/// this after establishing that the ranges differ.
fn verify_failed(
    &mut self,
    file: &mut DisktestFile,
    read_count: usize,
    bytes_read: u64,
    buffer: &[u8],
    chunk: &DtStreamAggChunk,
) -> ah::Error {
    // Best effort finalize; the mismatch error below takes precedence.
    if let Err(e) = self.verify_finalize(file, false, bytes_read) {
        if self.quiet_level < DisktestQuiet::NoWarn {
            eprintln!("{}", e);
        }
    }
    for i in 0..read_count {
        if buffer[i] == chunk.get_data()[i] {
            continue;
        }
        let pos = bytes_read + i as u64;
        // Tiny positions are clearer as a plain byte count.
        return if pos >= 1024 {
            ah::format_err!("Data MISMATCH at {}!", prettybytes(pos, true, true, true))
        } else {
            ah::format_err!("Data MISMATCH at byte {}!", pos)
        };
    }
    panic!("Internal error: verify_failed() no mismatch.");
}
/// Read `file` back and verify it against the pseudo random stream.
///
/// * `seek`: byte position to start verifying at.
/// * `max_bytes`: number of bytes to verify, or `Disktest::UNLIMITED`
///   to verify until end of file/device.
///
/// Returns the number of bytes verified.
pub fn verify(&mut self, file: DisktestFile, seek: u64, max_bytes: u64) -> ah::Result<u64> {
    let mut file = file;
    let mut bytes_left = max_bytes;
    let mut bytes_read = 0u64;
    let readbuf_len = self.init(&mut file, "Verifying", seek, max_bytes)? as usize;
    let mut buffer = vec![0; readbuf_len];
    // `read_count` is how much of the current window is filled;
    // `read_len` is the window size (one chunk, capped by bytes_left).
    let mut read_count = 0;
    let mut read_len = min(readbuf_len as u64, bytes_left) as usize;
    loop {
        // Fill the remainder of the current window. (The original bound
        // `read_count + (read_len - read_count)` is just `read_len`.)
        match file.read(&mut buffer[read_count..read_len]) {
            Ok(RawIoResult::Ok(n)) => {
                read_count += n;
                assert!(read_count <= read_len);
                // Verify once the window is full, or on EOF with a
                // partially filled window.
                if read_count == read_len || (read_count > 0 && n == 0) {
                    let chunk = self.stream_agg.wait_chunk()?;
                    if buffer[..read_count] != chunk.get_data()[..read_count] {
                        return Err(self.verify_failed(
                            &mut file, read_count, bytes_read, &buffer, &chunk,
                        ));
                    }
                    bytes_read += read_count as u64;
                    bytes_left -= read_count as u64;
                    if bytes_left == 0 {
                        self.verify_finalize(&mut file, true, bytes_read)?;
                        break;
                    }
                    self.log("Verified ", read_count, bytes_read, false);
                    // Start the next window.
                    read_count = 0;
                    read_len = min(readbuf_len as u64, bytes_left) as usize;
                }
                if n == 0 {
                    // End of file/device reached.
                    self.verify_finalize(&mut file, true, bytes_read)?;
                    break;
                }
            }
            Ok(_) => unreachable!(),
            Err(e) => {
                let _ = self.verify_finalize(&mut file, false, bytes_read);
                return Err(ah::format_err!(
                    "Read error at {}: {}",
                    prettybytes(bytes_read, true, true, true),
                    e
                ));
            }
        };
        if self.abort_requested() {
            let _ = self.verify_finalize(&mut file, false, bytes_read);
            return Err(ah::format_err!("Aborted by signal!"));
        }
    }
    Ok(bytes_read)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::generator::{GeneratorChaCha12, GeneratorChaCha20, GeneratorChaCha8, GeneratorCrc};
    use std::fs::OpenOptions;
    use std::io::{Seek, SeekFrom, Write};
    use std::path::PathBuf;
    use tempfile::tempdir;

    /// End-to-end write/verify exercise for one generator `algorithm`.
    ///
    /// `base_size` and `chunk_factor` are the generator's output
    /// granularity parameters; they size the test files so that
    /// multi-chunk and multi-thread code paths are covered.
    fn run_test(algorithm: DtStreamType, base_size: usize, chunk_factor: usize) {
        let tdir = tempdir().unwrap();
        let tdir_path = tdir.path();
        // Each sub-test uses its own numbered image file.
        let mut serial = 0;
        let seed = vec![42, 43, 44, 45];
        let nr_threads = 2;
        let mut dt = Disktest::new(
            algorithm,
            &seed,
            0,
            false,
            nr_threads,
            DisktestQuiet::Normal,
            None,
        );
        // Path of the numbered temporary image file.
        let mk_filepath = |num| {
            let mut path = PathBuf::from(tdir_path);
            path.push(format!("tmp-{}.img", num));
            path
        };
        // Open (optionally creating) the numbered image as a DisktestFile.
        let mk_file = |num, create| {
            let path = mk_filepath(num);
            let io = RawIo::new(&path, create, true, true).unwrap();
            DisktestFile {
                path,
                read: true,
                write: true,
                io: Some(io),
                drop_offset: 0,
                drop_count: 0,
                quiet_level: DisktestQuiet::Normal,
            }
        };
        // Small write; verify the whole file (unlimited).
        {
            let nr_bytes = 1000;
            assert_eq!(
                dt.write(mk_file(serial, true), 0, nr_bytes).unwrap(),
                nr_bytes
            );
            assert_eq!(
                dt.verify(mk_file(serial, false), 0, u64::MAX).unwrap(),
                nr_bytes
            );
            serial += 1;
        }
        // Small write; verify only the first half.
        {
            let nr_bytes = 1000;
            assert_eq!(
                dt.write(mk_file(serial, true), 0, nr_bytes).unwrap(),
                nr_bytes
            );
            assert_eq!(
                dt.verify(mk_file(serial, false), 0, nr_bytes / 2).unwrap(),
                nr_bytes / 2
            );
            serial += 1;
        }
        // More data than two full multi-thread aggregate chunks, plus a
        // remainder, to exercise chunk boundary handling.
        {
            let nr_bytes = (base_size * chunk_factor * nr_threads * 2 + 100) as u64;
            assert_eq!(
                dt.write(mk_file(serial, true), 0, nr_bytes).unwrap(),
                nr_bytes
            );
            assert_eq!(
                dt.verify(mk_file(serial, false), 0, u64::MAX).unwrap(),
                nr_bytes
            );
            serial += 1;
        }
        // A pre-sized and pre-seeked file must not influence the write:
        // write() starts at offset 0 and the result is exactly nr_bytes.
        {
            let nr_bytes = 1000;
            {
                let mut f = mk_file(serial, true);
                f.io.as_mut().unwrap().set_len(100).unwrap();
                f.io.as_mut().unwrap().seek(10).unwrap();
                assert_eq!(dt.write(f, 0, nr_bytes).unwrap(), nr_bytes);
            }
            assert_eq!(
                dt.verify(mk_file(serial, false), 0, u64::MAX).unwrap(),
                nr_bytes
            );
            serial += 1;
        }
        // Corrupt one byte; verify must fail and report its position.
        {
            let nr_bytes = 1000;
            assert_eq!(
                dt.write(mk_file(serial, true), 0, nr_bytes).unwrap(),
                nr_bytes
            );
            {
                let path = mk_filepath(serial);
                let mut file = OpenOptions::new()
                    .read(true)
                    .write(true)
                    .open(path)
                    .unwrap();
                file.seek(SeekFrom::Start(10)).unwrap();
                writeln!(&file, "X").unwrap();
            }
            match dt.verify(mk_file(serial, false), 0, nr_bytes) {
                Ok(_) => panic!("Verify of modified data did not fail!"),
                Err(e) => assert_eq!(e.to_string(), "Data MISMATCH at byte 10!"),
            }
            serial += 1;
        }
        // Verify from many (also chunk-unaligned) start offsets.
        {
            let nr_bytes = (base_size * chunk_factor * nr_threads * 10) as u64;
            assert_eq!(
                dt.write(mk_file(serial, true), 0, nr_bytes).unwrap(),
                nr_bytes
            );
            for offset in (0..nr_bytes).step_by(base_size * chunk_factor / 2) {
                let bytes_verified = dt.verify(mk_file(serial, false), offset, u64::MAX).unwrap();
                assert!(bytes_verified > 0 && bytes_verified <= nr_bytes);
            }
            serial += 1;
        }
        // Overlapping second write at an offset; verify the combined length.
        {
            let nr_bytes = (base_size * chunk_factor * nr_threads * 10) as u64;
            assert_eq!(
                dt.write(mk_file(serial, true), 0, nr_bytes).unwrap(),
                nr_bytes
            );
            let offset = (base_size * chunk_factor * nr_threads * 2) as u64;
            assert_eq!(
                dt.write(mk_file(serial, false), offset, nr_bytes).unwrap(),
                nr_bytes
            );
            assert_eq!(
                dt.verify(mk_file(serial, false), 0, u64::MAX).unwrap(),
                nr_bytes + offset
            );
        }
        tdir.close().unwrap();
    }

    /// ChaCha8 generator end-to-end test.
    #[test]
    fn test_chacha8() {
        run_test(
            DtStreamType::ChaCha8,
            GeneratorChaCha8::BASE_SIZE,
            GeneratorChaCha8::DEFAULT_CHUNK_FACTOR,
        );
    }

    /// ChaCha12 generator end-to-end test.
    #[test]
    fn test_chacha12() {
        run_test(
            DtStreamType::ChaCha12,
            GeneratorChaCha12::BASE_SIZE,
            GeneratorChaCha12::DEFAULT_CHUNK_FACTOR,
        );
    }

    /// ChaCha20 generator end-to-end test.
    #[test]
    fn test_chacha20() {
        run_test(
            DtStreamType::ChaCha20,
            GeneratorChaCha20::BASE_SIZE,
            GeneratorChaCha20::DEFAULT_CHUNK_FACTOR,
        );
    }

    /// CRC generator end-to-end test.
    #[test]
    fn test_crc() {
        run_test(
            DtStreamType::Crc,
            GeneratorCrc::BASE_SIZE,
            GeneratorCrc::DEFAULT_CHUNK_FACTOR,
        );
    }
}