use std::collections::{BTreeMap, HashMap};
use std::io;
use bincode::{deserialize, serialize};
use marble::Marble;
use serde::{Deserialize, Serialize};
type ObjectId = u64;
const INDEX_OBJECT_ID: ObjectId = 1;
#[derive(Serialize, Deserialize, Debug)]
struct Index {
pages: BTreeMap<Vec<u8>, ObjectId>,
max_pid: u64,
}
impl Default for Index {
fn default() -> Self {
Index {
pages: Default::default(),
max_pid: INDEX_OBJECT_ID + 1,
}
}
}
#[derive(Serialize, Deserialize, Debug)]
struct Page {
hi: Option<Vec<u8>>,
lo: Vec<u8>,
kvs: BTreeMap<Vec<u8>, Vec<u8>>,
}
struct Kv {
heap: Marble,
index: Index,
}
impl Kv {
fn recover(path: &str) -> io::Result<Kv> {
let heap = marble::open(path)?;
let index: Index = if let Some(index_data) = heap.read(INDEX_OBJECT_ID)? {
deserialize(&index_data).unwrap()
} else {
Index::default()
};
let mut kv = Kv { index, heap };
if kv.index.pages.is_empty() {
let genesis_page = Page {
hi: None,
lo: vec![],
kvs: BTreeMap::new(),
};
kv.allocate_leaf(genesis_page)?;
}
Ok(kv)
}
fn allocate_leaf(&mut self, leaf: Page) -> io::Result<()> {
self.index.max_pid += 1;
let object_id = self.index.max_pid;
let previous = self.index.pages.insert(leaf.lo.clone(), object_id);
assert!(previous.is_none());
let write_batch: HashMap<ObjectId, Option<Vec<u8>>> = [
(object_id, Some(serialize(&leaf).unwrap())),
(INDEX_OBJECT_ID, Some(serialize(&self.index).unwrap())),
]
.into_iter()
.collect();
self.heap.write_batch(write_batch)
}
fn pid_for_key(&self, key: Vec<u8>) -> ObjectId {
*self.index.pages.range(..=key).next_back().unwrap().1
}
fn get(&mut self, key: &Vec<u8>) -> io::Result<Option<Vec<u8>>> {
let object_id = self.pid_for_key(key.clone());
let leaf_data = self.heap.read(object_id)?.unwrap();
let leaf: Page = deserialize(&leaf_data).unwrap();
Ok(leaf.kvs.get(key).cloned())
}
fn set(&mut self, key: Vec<u8>, value: Vec<u8>) -> io::Result<Option<Vec<u8>>> {
self.mutate(key, Some(value))
}
fn remove(&mut self, key: Vec<u8>) -> io::Result<Option<Vec<u8>>> {
self.mutate(key, None)
}
fn mutate(&mut self, key: Vec<u8>, value: Option<Vec<u8>>) -> io::Result<Option<Vec<u8>>> {
let object_id = self.pid_for_key(key.clone());
let leaf_data = self.heap.read(object_id)?.unwrap();
let mut leaf: Page = deserialize(&leaf_data).unwrap();
let ret = if let Some(v) = value {
leaf.kvs.insert(key, v)
} else {
leaf.kvs.remove(&key)
};
let write_batch = [(object_id, Some(serialize(&leaf).unwrap()))];
self.heap.write_batch(write_batch)?;
let stats = self.heap.stats();
if stats.dead_objects > stats.live_objects {
self.heap.maintenance()?;
}
Ok(ret)
}
}
fn main() {
let mut kv = Kv::recover("kv_example").unwrap();
kv.set(b"yo".to_vec(), b"hi".to_vec()).unwrap();
assert_eq!(&*kv.get(&b"yo".to_vec()).unwrap().unwrap(), b"hi");
kv.remove(b"yo".to_vec()).unwrap();
assert!(&kv.get(&b"yo".to_vec()).unwrap().is_none());
}