use disk_buffer::*;
use arguments::errors::{FileErr};
use super::InputIteratorErr;
use itoa;
use time;
use std::io::{self, Write, Read};
use std::path::{Path, PathBuf};
use std::str;
pub struct ETA {
pub left: u64,
pub time: u64,
pub average: u64,
}
impl ETA {
pub fn write_to_stderr(&self, completed: usize) {
let stderr = io::stderr();
let mut stderr = stderr.lock();
let _ = stderr.write(b"ETA: ");
let _ = itoa::write(&mut stderr, self.time / 1_000_000_000);
let _ = stderr.write(b"s Left: ");
let _ = itoa::write(&mut stderr, self.left);
let _ = stderr.write(b" AVG: ");
let _ = itoa::write(&mut stderr, self.average / 1_000_000_000);
let _ = stderr.write(b".");
let remainder = (self.average % 1_000_000_000) / 10_000_000;
let _ = itoa::write(&mut stderr, remainder);
if remainder < 10 { let _ = stderr.write(b"0"); }
let _ = stderr.write(b"s Completed: ");
let _ = itoa::write(&mut stderr, completed);
let _ = stderr.write(b"\n");
}
}
pub struct InputIterator<IO: Read> {
pub total_arguments: usize,
pub curr_argument: usize,
pub completed: usize,
start_time: u64,
average_time: u64,
input_buffer: InputBuffer<IO>,
}
impl<IO: Read> InputIterator<IO> {
pub fn new(path: &Path, file: IO, args: usize) -> Result<InputIterator<IO>, FileErr> {
let disk_buffer = DiskBufferReader::new(path, file);
let input_buffer = InputBuffer::new(disk_buffer)?;
Ok(InputIterator {
total_arguments: args,
curr_argument: 0,
completed: 0,
input_buffer: input_buffer,
start_time: time::precise_time_ns(),
average_time: 0,
})
}
fn buffer(&mut self) -> Result<(), InputIteratorErr> {
self.input_buffer.disk_buffer.buffer(self.input_buffer.capacity).map_err(|why| {
InputIteratorErr::FileRead(PathBuf::from(self.input_buffer.disk_buffer.path.clone()), why)
})?;
let bytes_read = self.input_buffer.disk_buffer.capacity;
self.input_buffer.start = self.input_buffer.end + 1;
count_arguments(&mut self.input_buffer, bytes_read);
self.input_buffer.index = 0;
Ok(())
}
pub fn eta(&self) -> ETA {
let left = self.total_arguments as u64 - self.completed as u64;
ETA {
left: left,
time: left * self.average_time,
average: self.average_time
}
}
pub fn next_value(&mut self, buffer: &mut String) -> Option<Result<(), InputIteratorErr>> {
if self.curr_argument == self.total_arguments {
return None
} else if self.curr_argument == self.input_buffer.end {
if let Err(err) = self.buffer() { return Some(Err(err)); }
}
let end = self.input_buffer.indices[self.input_buffer.index + 1];
let start = if self.input_buffer.index == 0 {
self.input_buffer.indices[self.input_buffer.index]
} else {
self.input_buffer.indices[self.input_buffer.index] + 1
};
match self.completed {
0 => (),
1 => self.average_time = time::precise_time_ns() - self.start_time,
_ => self.average_time = (time::precise_time_ns() - self.start_time) / self.completed as u64,
}
self.curr_argument += 1;
self.input_buffer.index += 1;
buffer.truncate(0);
unsafe { buffer.push_str(str::from_utf8_unchecked(&self.input_buffer.disk_buffer.data[start..end])); }
Some(Ok(()))
}
}
impl<IO: Read> Iterator for InputIterator<IO> {
type Item = Result<String, InputIteratorErr>;
fn next(&mut self) -> Option<Result<String, InputIteratorErr>> {
if self.curr_argument == self.total_arguments {
return None
} else if self.curr_argument == self.input_buffer.end {
if let Err(err) = self.buffer() { return Some(Err(err)); }
}
let end = self.input_buffer.indices[self.input_buffer.index + 1];
let start = if self.input_buffer.index == 0 {
self.input_buffer.indices[self.input_buffer.index]
} else {
self.input_buffer.indices[self.input_buffer.index] + 1
};
match self.completed {
0 => (),
1 => self.average_time = time::precise_time_ns() - self.start_time,
_ => self.average_time = (time::precise_time_ns() - self.start_time) / self.completed as u64,
}
self.curr_argument += 1;
self.input_buffer.index += 1;
Some(Ok(String::from_utf8_lossy(&self.input_buffer.disk_buffer.data[start..end]).into_owned()))
}
}
struct InputBuffer<IO: Read> {
index: usize,
start: usize,
end: usize,
capacity: usize,
disk_buffer: DiskBufferReader<IO>,
indices: [usize; BUFFER_SIZE / 2],
}
impl<IO: Read> InputBuffer<IO> {
fn new(mut unprocessed: DiskBufferReader<IO>) -> Result<InputBuffer<IO>, FileErr> {
unprocessed.buffer(0).map_err(|why| FileErr::Read(unprocessed.path.clone(), why))?;
let bytes_read = unprocessed.capacity;
let mut temp = InputBuffer {
index: 0,
start: 0,
end: 0,
capacity: 0,
disk_buffer: unprocessed,
indices: [0usize; BUFFER_SIZE / 2]
};
count_arguments(&mut temp, bytes_read);
Ok(temp)
}
}
fn count_arguments<IO: Read>(buffer: &mut InputBuffer<IO>, bytes_read: usize) {
let mut newlines = 1;
buffer.capacity = 0;
for (indice, _) in buffer.disk_buffer.data.iter().take(bytes_read).enumerate().filter(|&(_, byte)| *byte == b'\n') {
buffer.indices[newlines] = indice;
newlines += 1;
}
newlines -= 1;
buffer.capacity = buffer.indices[newlines];
buffer.end += newlines;
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs::File;
#[test]
fn test_input_iterator() {
let file = File::open("tests/buffer.dat").unwrap();
let iterator = InputIterator::new(Path::new("tests/buffer.dat"), file, 4096).unwrap();
assert_eq!(0, iterator.input_buffer.start);
assert_eq!(1859, iterator.input_buffer.end);
for (actual, expected) in iterator.zip((1..4096)) {
assert_eq!(actual.unwrap(), expected.to_string());
}
}
}