use self::ConsumerState::*;
use producer::Producer;
use producer::ProducerState::*;
use internal::Err;
use std::io::SeekFrom;
/// Verdict returned by `Consumer::consume`, read by `Consumer::run` to decide
/// how to prepare the next call.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ConsumerState {
    /// `Await(consumed, needed)`: `consumed` bytes are dropped from the front
    /// of the buffered input, and the next call should be given `needed` bytes.
    Await(usize, usize),
    /// `Seek(consumed, position, needed)`: the producer is asked to seek to
    /// `position` before more input is read; `consumed` and `needed` are
    /// recorded the same way as for `Await`.
    Seek(usize, SeekFrom, usize),
    /// The consumer could not make progress; `run` leaves its bookkeeping
    /// unchanged when it sees this value.
    Incomplete,
    /// The consumer is finished; `run` calls `end` and stops.
    ConsumerDone,
    /// The consumer failed with the given error code.
    ConsumerError(Err),
}
pub trait Consumer {
fn consume(&mut self, input: &[u8]) -> ConsumerState;
fn end(&mut self);
fn run(&mut self, producer: &mut Producer) {
let mut acc: Vec<u8> = Vec::new();
let mut position = 0;
let mut should_seek = false;
let mut consumed:usize = 0;
let mut needed:usize = 0;
let mut seek_from:SeekFrom = SeekFrom::Current(0);
let mut eof = false;
let mut end = false;
loop {
if !should_seek && acc.len() - consumed >= needed {
let mut tmp = Vec::new();
tmp.push_all(&acc[consumed..acc.len()]);
acc.clear();
acc = tmp;
} else {
if should_seek {
let _ = producer.seek(seek_from);
should_seek = false;
acc.clear();
} else {
let mut tmp = Vec::new();
tmp.push_all(&acc[consumed..acc.len()]);
acc.clear();
acc = tmp;
}
loop {
let state = producer.produce();
match state {
Data(v) => {
acc.push_all(v);
position = position + v.len();
},
Eof(v) => {
if v.is_empty() {
self.end();
return
} else {
eof = true;
acc.push_all(v);
position = position + v.len();
break;
}
}
ProducerError(_) => {break;}
Continue => { continue; }
}
if acc.len() >= needed { break; }
}
}
match self.consume(&acc[0..needed]) {
ConsumerError(_) => {
},
ConsumerDone => {
end = true;
},
Seek(consumed_bytes, sf, needed_bytes) => {
seek_from = match sf {
SeekFrom::Current(i) => SeekFrom::Current(i - (acc.len() - needed) as i64),
a => a
};
should_seek = true;
consumed = consumed_bytes;
needed = needed_bytes;
},
Await(consumed_bytes, needed_bytes) => {
consumed = consumed_bytes;
needed = needed_bytes;
},
Incomplete => {
}
}
if eof {
self.end();
break;
}
if end {
self.end();
break;
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use super::ConsumerState::*;
    use producer::MemProducer;
    use internal::{Needed,IResult};
    use std::str;

    // Defines a parser function `$name` that splits off the first `$count`
    // bytes of its input, or asks for `$count` bytes when the input is
    // shorter than that.
    #[macro_export]
    macro_rules! take {
        ($name:ident $count:expr) => (
            fn $name(i: &[u8]) -> IResult<&[u8], &[u8]> {
                if i.len() >= $count {
                    IResult::Done(&i[$count..], &i[0..$count])
                } else {
                    IResult::Incomplete(Needed::Size($count))
                }
            }
        )
    }

    // Consumer that prints every 4-byte chunk it receives and counts them.
    struct TestPrintConsumer {
        counter: usize,
        ended: bool
    }

    impl TestPrintConsumer {
        fn new() -> TestPrintConsumer {
            TestPrintConsumer {
                counter: 0,
                ended: false
            }
        }
    }

    take!(take4 4);

    impl Consumer for TestPrintConsumer {
        // Requests 4 bytes at a time and declares itself done after the
        // fifth chunk.
        fn consume(&mut self, input: &[u8]) -> ConsumerState {
            let chunk = match take4(input) {
                IResult::Done(_, o) => o,
                IResult::Incomplete(_) => return Await(0, 4),
                IResult::Error(a) => return ConsumerError(a),
            };

            println!("{} -> {}", self.counter, str::from_utf8(chunk).unwrap());
            self.counter += 1;

            if self.counter > 4 {
                ConsumerDone
            } else {
                Await(4, 4)
            }
        }

        // The run is expected to stop right after the fifth chunk.
        fn end(&mut self) {
            assert_eq!(self.counter, 5);
            self.ended = true;
        }
    }

    #[test]
    fn pull() {
        let mut producer = MemProducer::new(&b"abcdefghijklmnopqrstuvwx"[..], 4);
        let mut consumer = TestPrintConsumer::new();
        consumer.run(&mut producer);
        assert!(consumer.ended);
    }
}