bitcask-rs 0.1.1

Bitcask implemented in rust.
Documentation
use core::{Key, Result, Value};
use integer_encoding::{VarInt, VarIntReader, VarIntWriter};
use io_at::Cursor;
use std::fs::{create_dir_all, remove_file, File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write, BufReader, BufWriter};
use std::path::PathBuf;
use std::hash::Hasher;
use twox_hash::XxHash;


fn xxhash32(bufs: &[&[u8]]) -> u32 {
    let mut hash = XxHash::with_seed(0);
    for buf in bufs {
        hash.write(buf)
    }
    hash.finish() as u32
}


pub type Offset = u64;

struct SegmentEntry {
    key: Key,
    value: Value,
}

impl SegmentEntry {
    pub fn new_checking_hash(key: Key, value: Value, hash: u32) -> SegmentEntry {
        let entry = Self::new(key, value);
        assert_eq!(hash, entry.compute_hash());
        entry
    }

    pub fn new(key: Key, value: Value) -> SegmentEntry {
        SegmentEntry {
            key: key,
            value: value,
        }
    }

    pub fn compute_size(&self) -> u64 {
        let key_buf = self.key.as_slice();
        let key_size_length = (key_buf.len() as u64).required_space();
        let value_buf = self.value.as_slice();
        let value_size_length = (value_buf.len() as u64).required_space();
        let hash_length = self.compute_hash().required_space();
        let size = key_size_length as u64
            + value_size_length as u64
            + key_buf.len() as u64
            + value_buf.len() as u64
            + hash_length as u64;
        size
    }

    pub fn compute_hash(&self) -> u32 {
        xxhash32(&[self.key.as_slice(), self.value.as_slice()])
    }
}

fn read_from_cursor(file: &mut BufReader<Cursor<&File>>) -> Result<SegmentEntry> {
    let key_size = file.read_varint::<u64>()?;
    debug!(target: "bitcask::segment", "get key size {}", key_size);
    let mut key_buf = vec![0; key_size as usize];
    file.read_exact(&mut key_buf)?;
    debug!(target: "bitcask::segment", "get key buf {:?}", key_buf);
    let value_size = file.read_varint::<u64>()?;
    debug!(target: "bitcask::segment", "get value size {}", value_size);
    let mut value_buf = vec![0; value_size as usize];
    file.read_exact(&mut value_buf)?;
    debug!(target: "bitcask::segment", "get value buf {:?}", value_buf);
    let hash = file.read_varint::<u32>()?;
    debug!(target: "bitcask::segment", "get hash {}", hash);
    Ok(SegmentEntry::new_checking_hash(
        key_buf,
        value_buf,
        hash
    ))
}


fn write_at_cursor(entry: &SegmentEntry, file: &mut BufWriter<Cursor<&File>>) -> Result<u64> {
    let key_buf = entry.key.as_slice();
    debug!(target: "bitcask::segment", "insert key size {}", key_buf.len());
    let _key_size_length = file.write_varint(key_buf.len() as u64)?;
    debug!(target: "bitcask::segment", "insert key buf {:?}", key_buf);
    file.write_all(key_buf)?;
    let value_buf = entry.value.as_slice();
    debug!(target: "bitcask::segment", "insert value size {}", value_buf.len());
    let _value_size_length = file.write_varint(value_buf.len() as u64)?;
    debug!(target: "bitcask::segment", "insert value buf {:?}", value_buf);
    file.write_all(value_buf)?;
    let hash = entry.compute_hash();
    debug!(target: "bitcask::segment", "insert hash {:?}", hash);
    let _hash_length = file.write_varint(hash)?;
    Ok(entry.compute_size())
}

pub struct Segment {
    file_path: PathBuf,
    pub file_id: u64,
    file: Option<File>,
    pub size: u64,
}

impl Segment {
    pub fn get_path(file_id: u64, path: &PathBuf) -> PathBuf {
        path.join(format!("{}.data", file_id))
    }

    pub fn new(file_id: u64, path: &PathBuf) -> Self {
        create_dir_all(&path).expect("create dir");
        let file_path = Self::get_path(file_id, path);
        let file = OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .read(true)
            .open(&file_path)
            .expect("open segment file");

        debug!(target: "bitcask::segment", "new segment file {:?}", &file_path);
        Segment {
            file_id,
            file_path,
            file: Some(file),
            size: 0,
        }
    }

    pub fn open(file_id: u64, path: &PathBuf) -> Self {
        let file_path = Self::get_path(file_id, path);
        let mut file = OpenOptions::new()
            .read(true)
            .open(&file_path)
            .expect("open segment file");

        let size = file.seek(SeekFrom::End(0)).expect("find file size");
        Segment {
            file_id,
            file_path: file_path.clone(),
            file: Some(file),
            size,
        }
    }

    pub fn get(&self, offset: Offset) -> Result<Option<Value>> {
        let mut file = BufReader::new(Cursor::new(self.file.as_ref().expect("get file"), offset));
        Ok(Some(read_from_cursor(&mut file)?.value))
    }

    pub fn insert(&mut self, key: Key, value: Value) -> Result<Offset> {
        let offset = self.size;
        let mut file = BufWriter::new(Cursor::new(self.file.as_ref().expect("get file"), offset));
        let entry = SegmentEntry::new(key, value);
        self.size += write_at_cursor(&entry, &mut file)?;
        Ok(offset)
    }

    pub fn destroy(&mut self) -> Result<()> {
        self.file = None;
        remove_file(&self.file_path)?;
        Ok(())
    }

    pub fn iter(&self) -> SegmentIterator {
        SegmentIterator::new(self)
    }
}

pub struct SegmentIterator<'a> {
    segment: &'a Segment,
    offset: u64,
}

impl<'a> IntoIterator for &'a Segment {
    type Item = Result<Entry>;
    type IntoIter = SegmentIterator<'a>;

    fn into_iter(self) -> <Self as IntoIterator>::IntoIter {
        SegmentIterator::new(self)
    }
}

impl<'a> SegmentIterator<'a> {
    fn new(segment: &'a Segment) -> SegmentIterator<'a> {
        SegmentIterator { segment, offset: 0 }
    }
}

pub struct Entry {
    pub offset: u64,
    pub key: Key,
    pub value: Value,
}

impl<'a> Iterator for SegmentIterator<'a> {
    type Item = Result<Entry>;

    fn next(&mut self) -> Option<<Self as Iterator>::Item> {
        if self.offset >= self.segment.size {
            return None;
        }

        debug!(target: "bitcask::segment",
               "file path: {:?}, offset: {}, size: {}",
            &self.segment.file_path,
            self.offset,
            self.segment.size
        );
        let mut file = BufReader::new(Cursor::new(self.segment.file.as_ref().expect("get file"), self.offset));
        let segment_entry = read_from_cursor(&mut file).expect("read from cursor");
        let size = segment_entry.compute_size();
        let entry = Entry {
            key: segment_entry.key,
            value: segment_entry.value,
            offset: self.offset,
        };

        self.offset += size;

        Some(Ok(entry))
    }
}