use byteorder::{ReadBytesExt, BigEndian, ByteOrder};
use std::io;
use std::io::prelude::*;
use std::collections::HashMap;
use std::str;
use flate2::read::ZlibDecoder;
use errors::{Error, Result};
use {Header, CompressionType, ByteString};
/// Magic bytes expected at the very start of the stream; `read_header` rejects
/// any input whose first three bytes differ.
const MAGIC: &'static str = "SEQ";
/// Codec class name for which `read_kv` inflates compressed values with
/// `ZlibDecoder`; any other codec is reported as an error there.
const DEFAULT_CODEC: &'static str = "org.apache.hadoop.io.compress.DefaultCodec";
/// Length in bytes of the sync marker, both in the header and where it follows
/// a `-1` record length inside the record stream.
const SYNC_SIZE: usize = 16;
/// A single key/value pair as yielded by iterating over a `Reader`.
pub type Item = (ByteString, ByteString);
/// Iterator-style reader over the key/value records of a stream that starts
/// with a `SEQ` header.
///
/// The header is parsed once when the reader is constructed; records are then
/// pulled from the underlying stream on demand through the `Iterator` impl.
pub struct Reader<R>
    where R: io::Read
{
    /// Header parsed from the beginning of the stream.
    header: Header,
    /// Underlying byte source, positioned just past the last consumed record.
    reader: R,
    /// Pending records for block-compressed input; only consulted when the
    /// header's compression type is `CompressionType::Block`.
    block_buffer: Vec<Item>,
}
impl<R> Reader<R>
    where R: io::Read
{
    /// Builds a reader on top of `r`, consuming and parsing the header first.
    ///
    /// # Panics
    ///
    /// Panics if `read_header` returns an error (for example on bad magic
    /// bytes, an unsupported version, or an I/O failure).
    pub fn new(mut r: R) -> Reader<R> {
        let parsed = read_header(&mut r);
        Reader {
            header: parsed.unwrap(),
            reader: r,
            block_buffer: Vec::new(),
        }
    }
}
fn read_header<R: io::Read>(reader: &mut R) -> Result<Header> {
let mut magic = [0; 3];
try!(reader.read(&mut magic));
if magic != MAGIC.as_bytes() {
return Err(Error::BadMagic(String::from_utf8_lossy(&magic).to_string()));
}
let mut version = [0; 1];
try!(reader.read(&mut version));
let version = version[0] as u16;
if version < 5 {
return Err(Error::VersionNotSupported(version));
}
let key_class = read_string(reader).unwrap();
let value_class = read_string(reader).unwrap();
let mut flags = [0; 2];
try!(reader.read(&mut flags));
let compression_type: CompressionType = {
let compression = flags[0] as u8;
let block_compression = flags[1] as u8;
match (compression, block_compression) {
(1, 1) => CompressionType::Block,
(1, 0) => CompressionType::Value,
(0, 0) => CompressionType::None,
_ => return Err(Error::CompressionTypeUnknown("Undefined type".to_string())),
}
};
let compression_codec = if compression_type != CompressionType::None {
Some(try!(read_string(reader)))
} else {
None
};
let pairs = reader.read_u32::<BigEndian>().unwrap();
for _ in 0..pairs {
}
let mut sync_marker = [0; SYNC_SIZE];
try!(reader.read(&mut sync_marker));
Ok(Header {
version: version,
compression_type: compression_type,
compression_codec: compression_codec,
key_class: key_class,
value_class: value_class,
metadata: HashMap::new(), sync_marker: sync_marker.to_vec(),
})
}
impl<R: io::Read> Iterator for Reader<R> {
    type Item = (ByteString, ByteString);

    /// Reads the next key/value record from the underlying stream.
    ///
    /// Each record starts with a big-endian `i32` record length. A length of
    /// `-1` means a sync marker follows; it is checked against the marker
    /// from the header before the real record length is read. Any read
    /// failure (including end of input) ends iteration by returning `None`.
    ///
    /// # Panics
    ///
    /// Panics if a sync marker in the stream differs from the header's
    /// marker, and (via `unimplemented!`) when the file is block-compressed
    /// and no buffered records are available.
    fn next(&mut self) -> Option<Item> {
        let mut kv_length = to_opt!(self.reader.read_i32::<BigEndian>()) as isize;
        if kv_length == -1 {
            let mut last_sync_marker = [0; SYNC_SIZE];
            // `read_exact`: a short `read` would leave part of the buffer
            // zeroed and trigger a spurious mismatch panic below.
            to_opt!(self.reader.read_exact(&mut last_sync_marker));
            if &last_sync_marker[..] != &self.header.sync_marker[..] {
                panic!(Error::SyncMarkerMismatch);
            }
            kv_length = to_opt!(self.reader.read_i32::<BigEndian>()) as isize;
            if kv_length == -1 {
                return None;
            }
        }

        if self.header.compression_type == CompressionType::Block {
            if self.block_buffer.is_empty() {
                // Decoding a compressed block into `block_buffer` is not
                // implemented yet.
                unimplemented!();
            } else {
                return self.block_buffer.pop();
            }
        }

        Some(to_opt!(read_kv(kv_length, &self.header, &mut self.reader)))
    }
}
fn read_kv<R: io::Read>(kv_length: isize,
header: &Header,
reader: &mut R)
-> Result<(Vec<u8>, Vec<u8>)> {
let k_length = reader.read_i32::<BigEndian>().unwrap() as usize;
let k_start = 0;
let k_end = k_start + (k_length - 0);
let v_start = k_end;
let v_end = v_start + (kv_length as usize - k_length);
let mut buffer = vec![0; kv_length as usize];
try!(reader.read(&mut buffer));
let key = buffer[k_start..k_end].to_vec();
if header.compression_type == CompressionType::Value {
if header.compression_codec == Some(DEFAULT_CODEC.to_string()) {
let mut decoder = ZlibDecoder::new(&buffer[v_start..v_end]);
let mut buf = Vec::new();
try!(decoder.read_to_end(&mut buf));
Ok((key, buf))
} else {
return Err(Error::CompressionTypeUnknown("x".to_string()));
}
} else {
let value = buffer[v_start..v_end].to_vec();
Ok((key, value))
}
}
fn read_string<R: io::Read>(reader: &mut R) -> Result<String> {
let value_length = reader.read_u8().unwrap() as usize;
let mut string = vec![0; value_length];
try!(reader.read(&mut string));
str::from_utf8(&string).map(|v| v.to_owned()).map_err(|e| Error::BadEncoding(e))
}