use page::{PAGE_PAYLOAD_SIZE, PAGE_SIZE};
use pagedfile::{PagedFile, PagedFileAppender};
use format::{Envelope, Payload, Data, IndexedData, Link};
use error::HammersbaldError;
use pref::PRef;
use byteorder::{ByteOrder, BigEndian};
use std::collections::VecDeque;
pub struct DataFile {
appender: PagedFileAppender
}
impl DataFile {
pub fn new(file: Box<PagedFile>) -> Result<DataFile, HammersbaldError> {
let len = file.len()?;
if len % PAGE_SIZE as u64 != 0 {
return Err(HammersbaldError::Corrupted("data file does not end at page boundary".to_string()));
}
if len >= PAGE_SIZE as u64 {
if let Some(last) = file.read_page(PRef::from(len - PAGE_SIZE as u64))? {
let lep = last.read_pref(PAGE_PAYLOAD_SIZE);
return Ok(DataFile{appender: PagedFileAppender::new(file, PRef::from(len), lep)});
}
else {
Err(HammersbaldError::Corrupted("missing first data page".to_string()))
}
}
else {
let appender = PagedFileAppender::new(file, PRef::from(0), PRef::invalid());
return Ok(DataFile{appender})
}
}
pub fn envelopes<'a>(&'a self) -> impl Iterator<Item=(PRef, Envelope)> +'a {
EnvelopeIterator::new(&self.appender, self.appender.lep())
}
pub fn dag<'a>(&'a self, root: PRef) -> DagIterator<'a> {
DagIterator::new(&self.appender, root)
}
pub fn shutdown (&mut self) {
self.appender.shutdown()
}
pub fn get_envelope(&self, mut pref: PRef) -> Result<Envelope, HammersbaldError> {
let mut len = [0u8;3];
pref = self.appender.read(pref, &mut len)?;
let mut buf = vec!(0u8; BigEndian::read_u24(&len) as usize);
self.appender.read(pref, &mut buf)?;
Ok(Envelope::deseralize(buf))
}
pub fn append_link (&mut self, link: Link) -> Result<PRef, HammersbaldError> {
let mut payload = vec!();
Payload::Link(link).serialize(&mut payload);
let envelope = Envelope::new(payload.as_slice(), self.appender.lep());
let mut store = vec!();
envelope.serialize(&mut store);
let me = self.appender.position();
self.appender.advance();
self.appender.append(store.as_slice())?;
Ok(me)
}
pub fn append_data (&mut self, key: &[u8], data: &[u8], referred: &Vec<PRef>) -> Result<PRef, HammersbaldError> {
let rv = Data::from_referred(referred.as_slice());
let indexed = IndexedData::new(key, Data::new(data, rv.as_slice()));
let mut payload = vec!();
Payload::Indexed(indexed).serialize(&mut payload);
let envelope = Envelope::new(payload.as_slice(), self.appender.lep());
let mut store = vec!();
envelope.serialize(&mut store);
let me = self.appender.position();
self.appender.advance();
self.appender.append(store.as_slice())?;
Ok(me)
}
pub fn append_referred (&mut self, data: &[u8], referred: &Vec<PRef>) -> Result<PRef, HammersbaldError> {
let rv = Data::from_referred(referred.as_slice());
let data = Data::new(data, rv.as_slice());
let mut payload = vec!();
Payload::Referred(data).serialize(&mut payload);
let envelope = Envelope::new(payload.as_slice(), self.appender.lep());
let mut store = vec!();
envelope.serialize(&mut store);
let me = self.appender.position();
self.appender.advance();
self.appender.append(store.as_slice())?;
Ok(me)
}
pub fn truncate(&mut self, pref: u64) -> Result<(), HammersbaldError> {
self.appender.truncate (pref)
}
pub fn flush (&mut self) -> Result<(), HammersbaldError> {
self.appender.flush()
}
pub fn sync (&self) -> Result<(), HammersbaldError> {
self.appender.sync()
}
pub fn len (&self) -> Result<u64, HammersbaldError> {
self.appender.len()
}
}
pub struct EnvelopeIterator<'f> {
file: &'f PagedFileAppender,
pos: PRef
}
impl<'f> EnvelopeIterator<'f> {
pub fn new (file: &'f PagedFileAppender, pos: PRef) -> EnvelopeIterator<'f> {
EnvelopeIterator {file, pos}
}
}
impl<'f> Iterator for EnvelopeIterator<'f> {
type Item = (PRef, Envelope);
fn next(&mut self) -> Option<<Self as Iterator>::Item> {
if self.pos.is_valid() {
let mut pos = self.pos;
let start = pos;
let mut len = [0u8;3];
pos = self.file.read(pos, &mut len).unwrap();
let mut buf = vec!(0u8; BigEndian::read_u24(&len) as usize);
self.file.read(pos, &mut buf).unwrap();
let envelope = Envelope::deseralize(buf);
self.pos = envelope.previous();
return Some((start, envelope))
}
None
}
}
pub struct DagIterator<'f> {
file: &'f PagedFileAppender,
pos: PRef,
next: VecDeque<PRef>
}
impl<'f> DagIterator<'f> {
pub fn new (file: &'f PagedFileAppender, pos: PRef) -> DagIterator<'f> {
let mut next = VecDeque::new();
next.push_back(pos);
DagIterator {file, pos, next}
}
fn schedule_descending (&mut self, mut referred: Vec<PRef>) {
referred.sort_unstable_by(|a, b| {
b.cmp(a)
});
for pref in referred {
self.next.push_back(pref);
}
}
}
impl<'f> Iterator for DagIterator<'f> {
type Item = (PRef, Envelope);
fn next(&mut self) -> Option<<Self as Iterator>::Item> {
if self.pos.is_valid() {
if let Some(mut pos) = self.next.pop_front() {
let start = pos;
let mut len = [0u8; 3];
pos = self.file.read(pos, &mut len).unwrap();
let mut buf = vec!(0u8; BigEndian::read_u24(&len) as usize);
self.file.read(pos, &mut buf).unwrap();
let envelope = Envelope::deseralize(buf);
match Payload::deserialize(envelope.payload()).unwrap() {
Payload::Indexed(indexed) => self.schedule_descending(indexed.data.referred()),
Payload::Referred(referred) => self.schedule_descending(referred.referred()),
_ => {}
}
return Some((start, envelope))
}
}
None
}
}