use logfile::LogFile;
use tablefile::TableFile;
use datafile::{DataFile, EnvelopeIterator};
use memtable::MemTable;
use format::{Payload,Envelope};
use persistent::Persistent;
use transient::Transient;
use pref::PRef;
use error::HammersbaldError;
use byteorder::{WriteBytesExt, ReadBytesExt, BigEndian};
use std::{
io,
io::{Cursor, Read, Write}
};
/// Database handle.
///
/// Every method implemented on this type in this file forwards to the
/// wrapped `MemTable`; the struct holds no other state.
pub struct Hammersbald {
// the table that all operations of this type are delegated to
mem: MemTable
}
/// Create a database through `Persistent::new_db`.
///
/// `name`, `cached_data_pages` and `bucket_fill_target` are passed on
/// unchanged, and the result of `Persistent::new_db` is returned as is.
/// NOTE(review): `name` presumably identifies the backing files on disk;
/// confirm in the `persistent` module.
pub fn persistent(name: &str, cached_data_pages: usize, bucket_fill_target: usize) -> Result<Box<HammersbaldAPI>, HammersbaldError> {
Persistent::new_db(name, cached_data_pages,bucket_fill_target)
}
/// Create a database through `Transient::new_db`.
///
/// Only `bucket_fill_target` is configurable here: the name is passed as an
/// empty string and the number of cached data pages as `0`.
/// NOTE(review): presumably this database is not kept on disk; confirm in
/// the `transient` module.
pub fn transient(bucket_fill_target: usize) -> Result<Box<HammersbaldAPI>, HammersbaldError> {
Transient::new_db("",0,bucket_fill_target)
}
/// Public interface of the database.
///
/// Implementations must be `Send + Sync`. The notes on the individual methods
/// describe what the `Hammersbald` implementation in this file does; most of
/// the actual work happens in `MemTable`, which is not visible here.
pub trait HammersbaldAPI : Send + Sync {
/// Forwarded to `MemTable::batch` by `Hammersbald`.
/// NOTE(review): presumably ends the current batch of writes; confirm in `memtable`.
fn batch (&mut self) -> Result<(), HammersbaldError>;
/// Forwarded to `MemTable::shutdown` by `Hammersbald`. Cannot report errors.
fn shutdown (&mut self);
/// Store `data` under `key`; returns the `PRef` obtained when the data was appended.
fn put_keyed(&mut self, key: &[u8], data: &[u8]) -> Result<PRef, HammersbaldError>;
/// Look up `key`; `Ok(None)` is part of the return type, presumably meaning
/// the key is unknown. On a hit the `PRef` and a copy of the data are returned.
fn get_keyed(&self, key: &[u8]) -> Result<Option<(PRef, Vec<u8>)>, HammersbaldError>;
/// Store `data` without a key; the returned `PRef` is the only handle to it.
fn put(&mut self, data: &[u8]) -> Result<PRef, HammersbaldError>;
/// Fetch the entry at `pref` as `(key, data)`. In the `Hammersbald`
/// implementation the key is empty for data that was stored without a key.
fn get(&self, pref: PRef) -> Result<(Vec<u8>, Vec<u8>), HammersbaldError>;
/// Forwarded to `MemTable::may_have_key` by `Hammersbald`.
/// NOTE(review): the name suggests `true` may be a false positive; confirm in `memtable`.
fn may_have_key(&self, key: &[u8]) -> Result<bool, HammersbaldError>;
/// Forwarded to `MemTable::forget` by `Hammersbald`.
/// NOTE(review): presumably removes `key` from the index; confirm in `memtable`.
fn forget(&mut self, key: &[u8]) -> Result<(), HammersbaldError>;
/// Iterate over stored entries as `(PRef, key, data)` tuples.
fn iter(&self) -> HammersbaldIterator;
}
/// In-memory byte buffer for assembling data.
///
/// The type implements `std::io::Write` (writes are appended to the buffer)
/// and offers `write_ref` for adding a `PRef`.
pub struct HammersbaldDataWriter {
// bytes written so far
data: Vec<u8>
}
impl HammersbaldDataWriter {
    /// Create a writer with an empty buffer.
    pub fn new() -> HammersbaldDataWriter {
        HammersbaldDataWriter { data: Vec::new() }
    }

    /// Borrow everything written so far.
    pub fn as_slice<'a>(&'a self) -> &'a [u8] {
        &self.data[..]
    }

    /// Append `pref` to the buffer as a 48-bit big-endian integer.
    ///
    /// # Panics
    /// The write result is unwrapped. NOTE(review): per the byteorder
    /// documentation `write_u48` also panics when the value does not fit
    /// into 48 bits; confirm that `PRef` values stay within that range.
    pub fn write_ref(&mut self, pref: PRef) {
        let raw = pref.as_u64();
        self.data.write_u48::<BigEndian>(raw).unwrap();
    }

    /// Cursor positioned at the start of the bytes written so far.
    pub fn reader<'a>(&'a self) -> Cursor<&'a [u8]> {
        let written: &'a [u8] = &self.data[..];
        Cursor::new(written)
    }
}
impl Write for HammersbaldDataWriter {
    /// Append all of `buf` to the internal buffer and report its full length
    /// as written; this never fails.
    fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
        self.data.extend_from_slice(buf);
        Ok(buf.len())
    }

    /// Nothing is buffered outside the internal vector, so there is nothing
    /// to flush.
    fn flush(&mut self) -> Result<(), io::Error> {
        Ok(())
    }
}
/// Reader over a borrowed byte slice.
///
/// The type implements `std::io::Read` and offers `read_ref` for decoding a
/// `PRef`; both consume bytes from the same cursor.
pub struct HammersbaldDataReader<'a> {
// cursor tracking the read position within the borrowed bytes
reader: Cursor<&'a [u8]>
}
impl<'a> HammersbaldDataReader<'a> {
pub fn new (data: &'a [u8]) -> HammersbaldDataReader<'a> {
HammersbaldDataReader{ reader: Cursor::new(data) }
}
pub fn read_ref (&mut self) -> Result<PRef, io::Error> {
Ok(PRef::from(self.reader.read_u48::<BigEndian>()?))
}
}
impl<'a> Read for HammersbaldDataReader<'a> {
    /// Read from the wrapped cursor into `buf`, advancing the read position
    /// by the number of bytes returned.
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
        let cursor = &mut self.reader;
        Read::read(cursor, buf)
    }
}
impl Hammersbald {
    /// Build a database on top of the given files.
    ///
    /// A `MemTable` is created from the arguments; then `recover`, `load` and
    /// `batch` are run on the new database, in that order.
    ///
    /// # Errors
    /// The first error reported by any of those three steps is returned and
    /// the remaining steps are skipped.
    pub fn new(log: LogFile, table: TableFile, data: DataFile, link: DataFile, bucket_fill_target: usize) -> Result<Hammersbald, HammersbaldError> {
        let mut db = Hammersbald {
            mem: MemTable::new(log, table, data, link, bucket_fill_target),
        };
        db.recover()?;
        db.load()?;
        db.batch()?;
        Ok(db)
    }

    /// Forwards to `MemTable::load`.
    fn load(&mut self) -> Result<(), HammersbaldError> {
        self.mem.load()
    }

    /// Forwards to `MemTable::recover`.
    fn recover(&mut self) -> Result<(), HammersbaldError> {
        self.mem.recover()
    }

    /// Iterator produced by `MemTable::slots`; each item is a borrowed list
    /// of `(u32, PRef)` pairs.
    pub fn slots<'a>(&'a self) -> impl Iterator<Item=&'a Vec<(u32, PRef)>> + 'a {
        self.mem.slots()
    }

    /// Iterator produced by `MemTable::buckets`.
    pub fn buckets<'a>(&'a self) -> impl Iterator<Item=PRef> + 'a {
        self.mem.buckets()
    }

    /// Iterator produced by `MemTable::data_envelopes`; each envelope comes
    /// with its `PRef`.
    pub fn data_envelopes<'a>(&'a self) -> impl Iterator<Item=(PRef, Envelope)> + 'a {
        self.mem.data_envelopes()
    }

    /// Iterator produced by `MemTable::link_envelopes`; each envelope comes
    /// with its `PRef`.
    pub fn link_envelopes<'a>(&'a self) -> impl Iterator<Item=(PRef, Envelope)> + 'a {
        self.mem.link_envelopes()
    }

    /// Tuple returned by `MemTable::params`.
    /// NOTE(review): the meaning of the individual fields is defined in
    /// `memtable` and cannot be told from this file.
    pub fn params(&self) -> (usize, u32, usize, u64, u64, u64, u64, u64) {
        self.mem.params()
    }
}
impl HammersbaldAPI for Hammersbald {
    /// Forwards to `MemTable::batch`.
    fn batch(&mut self) -> Result<(), HammersbaldError> {
        self.mem.batch()
    }

    /// Forwards to `MemTable::shutdown`.
    fn shutdown(&mut self) {
        self.mem.shutdown()
    }

    /// Append `data` together with `key` and register the key in the table.
    ///
    /// Returns the `PRef` obtained when the data was appended.
    ///
    /// # Errors
    /// Returns `HammersbaldError::KeyTooLong` when `key` is longer than 255
    /// bytes or `data` is 2^23 bytes or longer (the same variant is used for
    /// both limits). The limits are enforced in every build profile, so
    /// oversized input is rejected here instead of being handed to the
    /// storage layer. Errors from appending or from the table update are
    /// passed on.
    fn put_keyed(&mut self, key: &[u8], data: &[u8]) -> Result<PRef, HammersbaldError> {
        // Longest accepted key, in bytes.
        const MAX_KEY_LEN: usize = 255;
        // Data must be strictly shorter than this many bytes.
        const DATA_LEN_LIMIT: usize = 1 << 23;

        if key.len() > MAX_KEY_LEN || data.len() >= DATA_LEN_LIMIT {
            return Err(HammersbaldError::KeyTooLong);
        }
        let data_offset = self.mem.append_data(key, data)?;
        self.mem.put(key, data_offset)?;
        Ok(data_offset)
    }

    /// Forwards to `MemTable::get`.
    fn get_keyed(&self, key: &[u8]) -> Result<Option<(PRef, Vec<u8>)>, HammersbaldError> {
        self.mem.get(key)
    }

    /// Append `data` without a key and return its `PRef`.
    fn put(&mut self, data: &[u8]) -> Result<PRef, HammersbaldError> {
        let data_offset = self.mem.append_referred(data)?;
        Ok(data_offset)
    }

    /// Fetch the envelope at `pref` and return its content as `(key, data)`.
    ///
    /// The key is empty for a `Referred` payload.
    ///
    /// # Errors
    /// Returns `HammersbaldError::Corrupted` when the payload is neither
    /// `Referred` nor `Indexed`; errors from fetching or deserializing the
    /// envelope are passed on.
    fn get(&self, pref: PRef) -> Result<(Vec<u8>, Vec<u8>), HammersbaldError> {
        let envelope = self.mem.get_envelope(pref)?;
        match Payload::deserialize(envelope.payload())? {
            Payload::Referred(referred) => Ok((vec![], referred.data.to_vec())),
            Payload::Indexed(indexed) => Ok((indexed.key.to_vec(), indexed.data.data.to_vec())),
            _ => Err(HammersbaldError::Corrupted("referred should point to data".to_string())),
        }
    }

    /// Forwards to `MemTable::may_have_key`.
    fn may_have_key(&self, key: &[u8]) -> Result<bool, HammersbaldError> {
        self.mem.may_have_key(key)
    }

    /// Forwards to `MemTable::forget`.
    fn forget(&mut self, key: &[u8]) -> Result<(), HammersbaldError> {
        self.mem.forget(key)
    }

    /// Iterator over the envelopes yielded by `MemTable::data_envelopes`,
    /// decoded into `(PRef, key, data)` tuples.
    fn iter(&self) -> HammersbaldIterator {
        HammersbaldIterator { ei: self.mem.data_envelopes() }
    }
}
/// Iterator over stored entries, yielding `(PRef, key, data)` tuples.
///
/// It decodes the envelopes produced by the wrapped `EnvelopeIterator`.
pub struct HammersbaldIterator<'a> {
// source of the (PRef, Envelope) pairs that get decoded
ei: EnvelopeIterator<'a>
}
impl<'a> Iterator for HammersbaldIterator<'a> {
    type Item = (PRef, Vec<u8>, Vec<u8>);

    /// Decode the next envelope into `(PRef, key, data)`.
    ///
    /// The key is empty for a `Referred` payload. Iteration ends when the
    /// underlying envelope iterator is exhausted, and also at the first
    /// payload that is neither `Indexed` nor `Referred`.
    ///
    /// # Panics
    /// Panics if an envelope's payload cannot be deserialized.
    fn next(&mut self) -> Option<<Self as Iterator>::Item> {
        let (pref, envelope) = self.ei.next()?;
        let payload = Payload::deserialize(envelope.payload()).unwrap();
        match payload {
            Payload::Indexed(indexed) => {
                Some((pref, indexed.key.to_vec(), indexed.data.data.to_vec()))
            }
            Payload::Referred(referred) => Some((pref, Vec::new(), referred.data.to_vec())),
            _ => None,
        }
    }
}
#[cfg(test)]
mod test {
    extern crate rand;
    extern crate hex;

    use transient::Transient;
    use self::rand::thread_rng;
    use std::collections::HashMap;
    use api::test::rand::RngCore;

    /// Insert two rounds of 10000 random key/data pairs. After the batch
    /// that closes each round, every pair inserted so far (including those
    /// of the earlier round) must be returned by `get_keyed` with the `PRef`
    /// reported at insertion time.
    #[test]
    fn test_two_batches () {
        let mut db = Transient::new_db("first", 1, 1).unwrap();
        let mut rng = thread_rng();
        let mut expected = HashMap::new();
        let mut key = [0x0u8; 32];
        let mut data = [0x0u8; 40];

        for _round in 0 .. 2 {
            for _ in 0 .. 10000 {
                rng.fill_bytes(&mut key);
                rng.fill_bytes(&mut data);
                let pref = db.put_keyed(&key, &data).unwrap();
                expected.insert(key, (pref, data));
            }
            db.batch().unwrap();

            for (stored_key, (stored_ref, stored_data)) in expected.iter() {
                let found = db.get_keyed(&stored_key[..]).unwrap();
                assert_eq!(found, Some((*stored_ref, stored_data.to_vec())));
            }
        }

        db.shutdown();
    }
}