use lazy_static::lazy_static;
use std::{
borrow::Borrow,
cmp,
convert::TryInto,
ffi, fmt,
fmt::Display,
fs,
io::Write,
marker, mem,
ops::{Bound, RangeBounds},
path, result,
str::FromStr,
sync::{
self,
atomic::{
AtomicPtr,
Ordering::{Acquire, Release},
},
mpsc, Arc,
},
thread, time,
};
use crate::core::{Diff, Entry, Footprint, Result, Serialize};
use crate::core::{Index, IndexIter, Reader, Writer};
use crate::error::Error;
use crate::jsondata::{Json, Property};
use crate::util;
use crate::robt_entry::MEntry;
use crate::robt_index::{MBlock, ZBlock};
include!("robt_marker.rs");
struct Levels<K, V>(AtomicPtr<Arc<Vec<Snapshot<K, V>>>>)
where
K: Clone + Ord + Serialize,
V: Clone + Diff + Serialize,
<V as Diff>::D: Serialize;
impl<K, V> Levels<K, V>
where
K: Clone + Ord + Serialize,
V: Clone + Diff + Serialize,
<V as Diff>::D: Serialize,
{
fn new() -> Levels<K, V> {
Levels(AtomicPtr::new(Box::leak(Box::new(Arc::new(vec![])))))
}
fn get_snapshots(&self) -> Arc<Vec<Snapshot<K, V>>> {
unsafe { Arc::clone(self.0.load(Acquire).as_ref().unwrap()) }
}
fn compare_swap_snapshots(&self, new_snapshots: Vec<Snapshot<K, V>>) {
let _olds = unsafe { Box::from_raw(self.0.load(Acquire)) };
let new_snapshots = Box::leak(Box::new(Arc::new(new_snapshots)));
self.0.store(new_snapshots, Release);
}
}
pub(crate) struct Robt<K, V, M>
where
K: 'static + Sync + Send + Clone + Ord + Serialize + Footprint,
V: 'static + Sync + Send + Clone + Diff + Serialize + Footprint,
<V as Diff>::D: Serialize,
M: 'static + Sync + Send + Index<K, V>,
{
config: Config,
mem_ratio: f64,
disk_ratio: f64,
levels: Levels<K, V>,
todisk: MemToDisk<K, V, M>,
tocompact: DiskCompact<K, V, M>,
}
impl<K, V, M> Robt<K, V, M>
where
K: 'static + Sync + Send + Clone + Ord + Serialize + Footprint,
V: 'static + Sync + Send + Clone + Diff + Serialize + Footprint,
<V as Diff>::D: Serialize,
M: 'static + Sync + Send + Index<K, V>,
{
const MEM_RATIO: f64 = 0.2;
const DISK_RATIO: f64 = 0.5;
pub(crate) fn new(config: Config) -> Robt<K, V, M> {
Robt {
config: config.clone(),
mem_ratio: Self::MEM_RATIO,
disk_ratio: Self::DISK_RATIO,
levels: Levels::new(),
todisk: MemToDisk::new(config.clone()),
tocompact: DiskCompact::new(config.clone()),
}
}
pub(crate) fn set_mem_ratio(mut self, ratio: f64) -> Robt<K, V, M> {
self.mem_ratio = ratio;
self
}
pub(crate) fn set_disk_ratio(mut self, ratio: f64) -> Robt<K, V, M> {
self.disk_ratio = ratio;
self
}
}
impl<K, V, M> Robt<K, V, M>
where
K: 'static + Sync + Send + Clone + Ord + Serialize + Footprint,
V: 'static + Sync + Send + Clone + Diff + Serialize + Footprint,
<V as Diff>::D: Serialize,
M: 'static + Sync + Send + Index<K, V>,
{
pub(crate) fn flush_to_disk(
&mut self,
index: Arc<M>,
app_meta: Vec<u8>,
) -> Result<()> {
let _resp = self.todisk.send(Request::MemFlush {
index,
app_meta,
phantom_key: marker::PhantomData,
phantom_val: marker::PhantomData,
})?;
Ok(())
}
}
enum Request<K, V, M>
where
K: 'static + Sync + Send + Clone + Ord + Serialize + Footprint,
V: 'static + Sync + Send + Clone + Diff + Serialize + Footprint,
<V as Diff>::D: Serialize,
M: 'static + Sync + Send + Index<K, V>,
{
MemFlush {
index: Arc<M>,
app_meta: Vec<u8>,
phantom_key: marker::PhantomData<K>,
phantom_val: marker::PhantomData<V>,
},
}
enum Response {
Ok,
}
struct MemToDisk<K, V, M>
where
K: 'static + Sync + Send + Clone + Ord + Serialize + Footprint,
V: 'static + Sync + Send + Clone + Diff + Serialize + Footprint,
<V as Diff>::D: Serialize,
M: 'static + Sync + Send + Index<K, V>,
{
config: Config,
t_handle: thread::JoinHandle<Result<()>>,
tx: mpsc::SyncSender<(Request<K, V, M>, mpsc::SyncSender<Response>)>,
}
impl<K, V, M> MemToDisk<K, V, M>
where
K: 'static + Sync + Send + Clone + Ord + Serialize + Footprint,
V: 'static + Sync + Send + Clone + Diff + Serialize + Footprint,
<V as Diff>::D: Serialize,
M: 'static + Sync + Send + Index<K, V>,
{
fn new(config: Config) -> MemToDisk<K, V, M> {
let (tx, rx) = mpsc::sync_channel(1);
let conf = config.clone();
let t_handle = thread::spawn(move || thread_mem_to_disk(conf, rx));
MemToDisk {
config,
t_handle,
tx,
}
}
fn send(&mut self, req: Request<K, V, M>) -> Result<Response> {
let (tx, rx) = mpsc::sync_channel(0);
self.tx.send((req, tx))?;
Ok(rx.recv()?)
}
fn close_wait(self) -> Result<()> {
mem::drop(self.tx);
match self.t_handle.join() {
Ok(res) => res,
Err(err) => match err.downcast_ref::<String>() {
Some(msg) => Err(Error::ThreadFail(msg.to_string())),
None => Err(Error::ThreadFail("unknown error".to_string())),
},
}
}
}
fn thread_mem_to_disk<K, V, M>(
_config: Config,
_rx: mpsc::Receiver<(Request<K, V, M>, mpsc::SyncSender<Response>)>,
) -> Result<()>
where
K: 'static + Sync + Send + Clone + Ord + Serialize + Footprint,
V: 'static + Sync + Send + Clone + Diff + Serialize + Footprint,
<V as Diff>::D: Serialize,
M: 'static + Sync + Send + Index<K, V>,
{
Ok(())
}
struct DiskCompact<K, V, M>
where
K: 'static + Sync + Send + Clone + Ord + Serialize + Footprint,
V: 'static + Sync + Send + Clone + Diff + Serialize + Footprint,
<V as Diff>::D: Serialize,
M: 'static + Sync + Send + Index<K, V>,
{
config: Config,
t_handle: thread::JoinHandle<Result<()>>,
tx: mpsc::SyncSender<(Request<K, V, M>, mpsc::SyncSender<Response>)>,
}
impl<K, V, M> DiskCompact<K, V, M>
where
K: 'static + Sync + Send + Clone + Ord + Serialize + Footprint,
V: 'static + Sync + Send + Clone + Diff + Serialize + Footprint,
<V as Diff>::D: Serialize,
M: 'static + Sync + Send + Index<K, V>,
{
fn new(config: Config) -> DiskCompact<K, V, M> {
let (tx, rx) = mpsc::sync_channel(1);
let conf = config.clone();
let t_handle = thread::spawn(move || thread_disk_compact(conf, rx));
DiskCompact {
config,
t_handle,
tx,
}
}
fn send(&mut self, req: Request<K, V, M>) -> Result<Response> {
let (tx, rx) = mpsc::sync_channel(0);
self.tx.send((req, tx))?;
Ok(rx.recv()?)
}
fn close_wait(self) -> Result<()> {
mem::drop(self.tx);
match self.t_handle.join() {
Ok(res) => res,
Err(err) => match err.downcast_ref::<String>() {
Some(msg) => Err(Error::ThreadFail(msg.to_string())),
None => Err(Error::ThreadFail("unknown error".to_string())),
},
}
}
}
fn thread_disk_compact<K, V, M>(
_config: Config,
_rx: mpsc::Receiver<(Request<K, V, M>, mpsc::SyncSender<Response>)>,
) -> Result<()>
where
K: 'static + Sync + Send + Clone + Ord + Serialize + Footprint,
V: 'static + Sync + Send + Clone + Diff + Serialize + Footprint,
<V as Diff>::D: Serialize,
M: 'static + Sync + Send + Index<K, V>,
{
Ok(())
}
#[derive(Clone)]
pub struct Config {
pub z_blocksize: usize,
pub m_blocksize: usize,
pub v_blocksize: usize,
pub tomb_purge: Option<u64>,
pub delta_ok: bool,
pub vlog_file: Option<ffi::OsString>,
pub value_in_vlog: bool,
pub flush_queue_size: usize,
}
impl Default for Config {
fn default() -> Config {
Config {
z_blocksize: Self::ZBLOCKSIZE,
v_blocksize: Self::VBLOCKSIZE,
m_blocksize: Self::MBLOCKSIZE,
tomb_purge: Default::default(),
delta_ok: true,
vlog_file: Default::default(),
value_in_vlog: false,
flush_queue_size: Self::FLUSH_QUEUE_SIZE,
}
}
}
impl From<Stats> for Config {
fn from(stats: Stats) -> Config {
Config {
z_blocksize: stats.z_blocksize,
m_blocksize: stats.m_blocksize,
v_blocksize: stats.v_blocksize,
tomb_purge: Default::default(),
delta_ok: stats.delta_ok,
vlog_file: stats.vlog_file,
value_in_vlog: stats.value_in_vlog,
flush_queue_size: Self::FLUSH_QUEUE_SIZE,
}
}
}
impl Config {
pub const ZBLOCKSIZE: usize = 4 * 1024;
pub const VBLOCKSIZE: usize = 4 * 1024;
pub const MBLOCKSIZE: usize = 4 * 1024;
const MARKER_BLOCK_SIZE: usize = 1024 * 4;
const FLUSH_QUEUE_SIZE: usize = 64;
pub fn set_blocksize(&mut self, z: usize, v: usize, m: usize) -> &mut Self {
self.z_blocksize = z;
self.v_blocksize = v;
self.m_blocksize = m;
self
}
pub fn set_tombstone_purge(&mut self, before: u64) -> &mut Self {
self.tomb_purge = Some(before);
self
}
pub fn set_delta(&mut self, vlog_file: Option<ffi::OsString>) -> &mut Self {
match vlog_file {
Some(vlog_file) => {
self.delta_ok = true;
self.vlog_file = Some(vlog_file);
}
None => {
self.delta_ok = false;
}
}
self
}
pub fn set_value_log(&mut self, file: Option<ffi::OsString>) -> &mut Self {
match file {
Some(vlog_file) => {
self.value_in_vlog = true;
self.vlog_file = Some(vlog_file);
}
None => {
self.value_in_vlog = false;
}
}
self
}
pub fn set_flush_queue_size(&mut self, size: usize) -> &mut Self {
self.flush_queue_size = size;
self
}
}
impl Config {
pub(crate) fn stitch_index_file(dir: &str, name: &str) -> ffi::OsString {
let mut index_file = path::PathBuf::from(dir);
index_file.push(format!("robt-{}.indx", name));
let index_file: &ffi::OsStr = index_file.as_ref();
index_file.to_os_string()
}
pub(crate) fn stitch_vlog_file(dir: &str, name: &str) -> ffi::OsString {
let mut vlog_file = path::PathBuf::from(dir);
vlog_file.push(format!("robt-{}.vlog", name));
let vlog_file: &ffi::OsStr = vlog_file.as_ref();
vlog_file.to_os_string()
}
pub(crate) fn compute_root_block(n: usize) -> usize {
if (n % Config::MARKER_BLOCK_SIZE) == 0 {
n
} else {
((n / Config::MARKER_BLOCK_SIZE) + 1) * Config::MARKER_BLOCK_SIZE
}
}
pub fn to_index_file(&self, dir: &str, name: &str) -> ffi::OsString {
Self::stitch_index_file(&dir, &name)
}
pub fn to_value_log(&self, dir: &str, name: &str) -> Option<ffi::OsString> {
match &self.vlog_file {
Some(file) => Some(file.clone()),
None => Some(Self::stitch_vlog_file(&dir, &name)),
}
}
}
pub enum MetaItem {
Marker(Vec<u8>),
Stats(String),
AppMetadata(Vec<u8>),
Root(u64),
}
pub(crate) fn write_meta_items(
file: ffi::OsString,
items: Vec<MetaItem>,
) -> Result<u64> {
let p = path::Path::new(&file);
let mut opts = fs::OpenOptions::new();
let mut fd = opts.append(true).open(p)?;
let (mut hdr, mut block) = (vec![], vec![]);
hdr.resize(32, 0);
for (i, item) in items.into_iter().enumerate() {
match (i, item) {
(0, MetaItem::Root(fpos)) => {
hdr[0..8].copy_from_slice(&fpos.to_be_bytes());
}
(1, MetaItem::AppMetadata(md)) => {
hdr[8..16].copy_from_slice(&(md.len() as u64).to_be_bytes());
block.extend_from_slice(&md);
}
(2, MetaItem::Stats(s)) => {
hdr[16..24].copy_from_slice(&(s.len() as u64).to_be_bytes());
block.extend_from_slice(s.as_bytes());
}
(3, MetaItem::Marker(data)) => {
hdr[24..32].copy_from_slice(&(data.len() as u64).to_be_bytes());
block.extend_from_slice(&data);
}
(i, _) => panic!("unreachable arm at {}", i),
}
}
block.extend_from_slice(&hdr[..]);
let n = Config::compute_root_block(block.len());
let (shift, m) = (n - block.len(), block.len());
block.resize(n, 0);
block.copy_within(0..m, shift);
let ln = block.len();
let n = fd.write(&block)?;
fd.sync_all()?;
if n == ln {
Ok(n.try_into().unwrap())
} else {
let msg = format!("write_meta_items: {:?} {}/{}...", &file, ln, n);
Err(Error::PartialWrite(msg))
}
}
pub fn read_meta_items(
dir: &str,
name: &str,
) -> Result<Vec<MetaItem>> {
let index_file = Config::stitch_index_file(dir, name);
let m = fs::metadata(&index_file)?.len();
let mut fd = util::open_file_r(index_file.as_ref())?;
let hdr = util::read_buffer(&mut fd, m - 32, 32, "read root-block header")?;
let root = u64::from_be_bytes(hdr[..8].try_into().unwrap());
let n_md = u64::from_be_bytes(hdr[8..16].try_into().unwrap()) as usize;
let n_stats = u64::from_be_bytes(hdr[16..24].try_into().unwrap()) as usize;
let n_marker = u64::from_be_bytes(hdr[24..32].try_into().unwrap()) as usize;
let n = Config::compute_root_block(n_stats + n_md + n_marker + 32)
.try_into()
.unwrap();
let block: Vec<u8> = util::read_buffer(&mut fd, m - n, n, "read root-block")?
.into_iter()
.collect();
let mut meta_items: Vec<MetaItem> = vec![];
let z = (n as usize) - 32;
let (x, y) = (z - n_marker, z);
let marker = block[x..y].to_vec();
if marker.ne(&ROOT_MARKER.as_slice()) {
let msg = format!("unexpected marker {:?}", marker);
return Err(Error::InvalidSnapshot(msg));
}
let (x, y) = (z - n_marker - n_stats, z - n_marker);
let stats = std::str::from_utf8(&block[x..y])?.to_string();
let (x, y) = (z - n_marker - n_stats - n_md, z - n_marker - n_stats);
let app_data = block[x..y].to_vec();
meta_items.push(MetaItem::Root(root));
meta_items.push(MetaItem::AppMetadata(app_data));
meta_items.push(MetaItem::Stats(stats.clone()));
meta_items.push(MetaItem::Marker(marker.clone()));
let stats: Stats = stats.parse()?;
let at = m - n - (stats.m_blocksize as u64);
if at != root {
let msg = format!("expected root at {}, found {}", at, root);
Err(Error::InvalidSnapshot(msg))
} else {
Ok(meta_items)
}
}
impl fmt::Display for MetaItem {
fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
match self {
MetaItem::Marker(_) => write!(f, "MetaItem::Marker"),
MetaItem::AppMetadata(_) => write!(f, "MetaItem::AppMetadata"),
MetaItem::Stats(_) => write!(f, "MetaItem::Stats"),
MetaItem::Root(_) => write!(f, "MetaItem::Root"),
}
}
}
#[derive(Clone, Default, PartialEq)]
pub struct Stats {
pub z_blocksize: usize,
pub m_blocksize: usize,
pub v_blocksize: usize,
pub delta_ok: bool,
pub vlog_file: Option<ffi::OsString>,
pub value_in_vlog: bool,
pub n_count: u64,
pub n_deleted: usize,
pub seqno: u64,
pub key_mem: usize,
pub diff_mem: usize,
pub val_mem: usize,
pub z_bytes: usize,
pub m_bytes: usize,
pub v_bytes: usize,
pub padding: usize,
pub n_abytes: usize,
pub build_time: u64,
pub epoch: i128,
}
impl From<Config> for Stats {
fn from(config: Config) -> Stats {
Stats {
z_blocksize: config.z_blocksize,
m_blocksize: config.m_blocksize,
v_blocksize: config.v_blocksize,
delta_ok: config.delta_ok,
vlog_file: config.vlog_file,
value_in_vlog: config.value_in_vlog,
n_count: Default::default(),
n_deleted: Default::default(),
seqno: Default::default(),
key_mem: Default::default(),
diff_mem: Default::default(),
val_mem: Default::default(),
z_bytes: Default::default(),
v_bytes: Default::default(),
m_bytes: Default::default(),
padding: Default::default(),
n_abytes: Default::default(),
build_time: Default::default(),
epoch: Default::default(),
}
}
}
impl FromStr for Stats {
type Err = Error;
fn from_str(s: &str) -> Result<Stats> {
let js: Json = s.parse()?;
let to_usize = |key: &str| -> Result<usize> {
let n: usize = js.get(key)?.integer().unwrap().try_into().unwrap();
Ok(n)
};
let to_u64 = |key: &str| -> Result<u64> {
let n: u64 = js.get(key)?.integer().unwrap().try_into().unwrap();
Ok(n)
};
let s = js.get("/vlog_file")?.string().unwrap();
let vlog_file: Option<ffi::OsString> = match s {
s if s.len() == 0 => None,
s => Some(s.into()),
};
Ok(Stats {
z_blocksize: to_usize("/z_blocksize")?,
m_blocksize: to_usize("/m_blocksize")?,
v_blocksize: to_usize("/v_blocksize")?,
delta_ok: js.get("/delta_ok")?.boolean().unwrap(),
vlog_file: vlog_file,
value_in_vlog: js.get("/value_in_vlog")?.boolean().unwrap(),
n_count: to_u64("/n_count")?,
n_deleted: to_usize("/n_deleted")?,
seqno: to_u64("/seqno")?,
key_mem: to_usize("/key_mem")?,
diff_mem: to_usize("/diff_mem")?,
val_mem: to_usize("/val_mem")?,
z_bytes: to_usize("/z_bytes")?,
v_bytes: to_usize("/v_bytes")?,
m_bytes: to_usize("/m_bytes")?,
padding: to_usize("/padding")?,
n_abytes: to_usize("/n_abytes")?,
build_time: to_u64("/build_time")?,
epoch: js.get("/epoch")?.integer().unwrap(),
})
}
}
impl Display for Stats {
fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
let mut js = Json::new::<Vec<Property>>(vec![]);
let vlog_file = self.vlog_file.clone().unwrap_or(Default::default());
let vlog_file = match vlog_file.into_string() {
Ok(vlog_file) => vlog_file,
Err(err) => panic!(err),
};
js.set("/z_blocksize", Json::new(self.z_blocksize)).ok();
js.set("/m_blocksize", Json::new(self.m_blocksize)).ok();
js.set("/v_blocksize", Json::new(self.v_blocksize)).ok();
js.set("/delta_ok", Json::new(self.delta_ok)).ok();
js.set("/vlog_file", Json::new(vlog_file)).ok();
js.set("/value_in_vlog", Json::new(self.value_in_vlog)).ok();
js.set("/n_count", Json::new(self.n_count)).ok();
js.set("/n_deleted", Json::new(self.n_deleted)).ok();
js.set("/seqno", Json::new(self.seqno)).ok();
js.set("/key_mem", Json::new(self.key_mem)).ok();
js.set("/diff_mem", Json::new(self.diff_mem)).ok();
js.set("/val_mem", Json::new(self.val_mem)).ok();
js.set("/z_bytes", Json::new(self.z_bytes)).ok();
js.set("/v_bytes", Json::new(self.v_bytes)).ok();
js.set("/m_bytes", Json::new(self.m_bytes)).ok();
js.set("/padding", Json::new(self.padding)).ok();
js.set("/n_abytes", Json::new(self.n_abytes)).ok();
js.set("/build_time", Json::new(self.build_time)).ok();
js.set("/epoch", Json::new(self.epoch)).ok();
write!(f, "{}", js.to_string())
}
}
pub struct Builder<K, V>
where
K: Clone + Ord + Serialize,
V: Clone + Diff + Serialize,
<V as Diff>::D: Serialize,
{
config: Config,
iflusher: Flusher,
vflusher: Option<Flusher>,
stats: Stats,
phantom_key: marker::PhantomData<K>,
phantom_val: marker::PhantomData<V>,
}
impl<K, V> Builder<K, V>
where
K: Clone + Ord + Serialize,
V: Clone + Diff + Serialize,
<V as Diff>::D: Serialize,
{
pub fn initial(
dir: &str,
name: &str,
config: Config,
) -> Result<Builder<K, V>> {
let create = true;
let iflusher = {
let file = config.to_index_file(dir, name);
Flusher::new(file, config.clone(), create)?
};
let vflusher = config
.to_value_log(dir, name)
.map(|file| Flusher::new(file, config.clone(), create))
.transpose()?;
Ok(Builder {
config: config.clone(),
iflusher,
vflusher,
stats: From::from(config),
phantom_key: marker::PhantomData,
phantom_val: marker::PhantomData,
})
}
pub fn incremental(
dir: &str,
name: &str,
config: Config,
) -> Result<Builder<K, V>> {
let iflusher = {
let file = config.to_index_file(dir, name);
Flusher::new(file, config.clone(), true )?
};
let vflusher = config
.to_value_log(dir, name)
.map(|file| Flusher::new(file, config.clone(), false ))
.transpose()?;
let mut stats: Stats = From::from(config.clone());
stats.n_abytes += vflusher.as_ref().map_or(0, |vf| vf.fpos) as usize;
Ok(Builder {
config: config.clone(),
iflusher,
vflusher,
stats,
phantom_key: marker::PhantomData,
phantom_val: marker::PhantomData,
})
}
pub fn build<I>(mut self, iter: I, app_meta: Vec<u8>) -> Result<()>
where
I: Iterator<Item = Result<Entry<K, V>>>,
{
let (took, root): (u64, u64) = {
let start = time::SystemTime::now();
let root = self.build_tree(iter)?;
(
start.elapsed().unwrap().as_nanos().try_into().unwrap(),
root,
)
};
let stats: String = {
self.stats.build_time = took;
let epoch: i128 = time::UNIX_EPOCH
.elapsed()
.unwrap()
.as_nanos()
.try_into()
.unwrap();
self.stats.epoch = epoch;
self.stats.to_string()
};
let meta_items: Vec<MetaItem> = vec![
MetaItem::Root(root),
MetaItem::AppMetadata(app_meta),
MetaItem::Stats(stats),
MetaItem::Marker(ROOT_MARKER.clone()),
];
let index_file: ffi::OsString = self.iflusher.file.clone();
self.iflusher.close_wait()?;
self.vflusher.take().map(|x| x.close_wait()).transpose()?;
write_meta_items(index_file, meta_items)?;
Ok(())
}
fn build_tree<I>(&mut self, iter: I) -> Result<u64>
where
I: Iterator<Item = Result<Entry<K, V>>>,
{
struct Context<K, V>
where
K: Clone + Ord + Serialize,
V: Clone + Diff + Serialize,
<V as Diff>::D: Serialize,
{
fpos: u64,
zfpos: u64,
vfpos: u64,
z: ZBlock<K, V>,
ms: Vec<MBlock<K, V>>,
};
let mut c = {
let vfpos = self.stats.n_abytes.try_into().unwrap();
Context {
fpos: 0,
zfpos: 0,
vfpos,
z: ZBlock::new_encode(vfpos, self.config.clone()),
ms: vec![MBlock::new_encode(self.config.clone())],
}
};
for entry in iter {
let mut entry = match self.preprocess(entry?) {
Some(entry) => entry,
None => continue,
};
match c.z.insert(&entry, &mut self.stats) {
Ok(_) => (),
Err(Error::__ZBlockOverflow(_)) => {
let (zbytes, vbytes) = c.z.finalize(&mut self.stats);
c.z.flush(&mut self.iflusher, self.vflusher.as_mut())?;
c.fpos += zbytes;
c.vfpos += vbytes;
let mut m = c.ms.pop().unwrap();
match m.insertz(c.z.as_first_key(), c.zfpos) {
Ok(_) => c.ms.push(m),
Err(Error::__MBlockOverflow(_)) => {
let x = m.finalize(&mut self.stats);
m.flush(&mut self.iflusher)?;
let k = m.as_first_key();
let r = self.insertms(c.ms, c.fpos + x, k, c.fpos)?;
c.ms = r.0;
c.fpos = r.1;
m.reset();
m.insertz(c.z.as_first_key(), c.zfpos).unwrap();
c.ms.push(m)
}
Err(err) => return Err(err),
}
c.zfpos = c.fpos;
c.z.reset(c.vfpos);
c.z.insert(&entry, &mut self.stats).unwrap();
}
Err(err) => return Err(err),
};
self.postprocess(&mut entry);
}
if c.z.has_first_key() {
let (zbytes, vbytes) = c.z.finalize(&mut self.stats);
c.z.flush(&mut self.iflusher, self.vflusher.as_mut())?;
c.fpos += zbytes;
c.vfpos += vbytes;
let mut m = c.ms.pop().unwrap();
match m.insertz(c.z.as_first_key(), c.zfpos) {
Ok(_) => c.ms.push(m),
Err(Error::__MBlockOverflow(_)) => {
let x = m.finalize(&mut self.stats);
m.flush(&mut self.iflusher)?;
let mkey = m.as_first_key();
let res = self.insertms(c.ms, c.fpos + x, mkey, c.fpos)?;
c.ms = res.0;
c.fpos = res.1;
m.reset();
m.insertz(c.z.as_first_key(), c.zfpos)?;
c.ms.push(m);
}
Err(err) => return Err(err),
}
} else {
return Err(Error::EmptyIterator);
}
while let Some(mut m) = c.ms.pop() {
let is_root = m.has_first_key() && c.ms.len() == 0;
if is_root {
let x = m.finalize(&mut self.stats);
m.flush(&mut self.iflusher)?;
c.fpos += x;
} else if m.has_first_key() {
let x = m.finalize(&mut self.stats);
m.flush(&mut self.iflusher)?;
let mkey = m.as_first_key();
let res = self.insertms(c.ms, c.fpos + x, mkey, c.fpos)?;
c.ms = res.0;
c.fpos = res.1
}
}
let n: u64 = self.config.m_blocksize.try_into().unwrap();
Ok(c.fpos - n)
}
fn insertms(
&mut self,
mut ms: Vec<MBlock<K, V>>,
mut fpos: u64,
key: &K,
mfpos: u64,
) -> Result<(Vec<MBlock<K, V>>, u64)> {
let m0 = ms.pop();
let m0 = match m0 {
None => {
let mut m0 = MBlock::new_encode(self.config.clone());
m0.insertm(key, mfpos).unwrap();
m0
}
Some(mut m0) => match m0.insertm(key, mfpos) {
Ok(_) => m0,
Err(Error::__MBlockOverflow(_)) => {
let x = m0.finalize(&mut self.stats);
m0.flush(&mut self.iflusher)?;
let mkey = m0.as_first_key();
let res = self.insertms(ms, fpos + x, mkey, fpos)?;
ms = res.0;
fpos = res.1;
m0.reset();
m0.insertm(key, mfpos).unwrap();
m0
}
Err(err) => return Err(err),
},
};
ms.push(m0);
Ok((ms, fpos))
}
fn preprocess(&mut self, entry: Entry<K, V>) -> Option<Entry<K, V>> {
let entry = match self.config.tomb_purge {
Some(before) => entry.purge(Bound::Excluded(before)),
_ => Some(entry),
};
match entry {
Some(entry) => {
self.stats.seqno = cmp::max(self.stats.seqno, entry.to_seqno());
Some(entry)
}
None => None,
}
}
fn postprocess(&mut self, entry: &mut Entry<K, V>) {
self.stats.n_count += 1;
if entry.is_deleted() {
self.stats.n_deleted += 1;
}
}
}
pub(crate) struct Flusher {
file: ffi::OsString,
fpos: u64,
t_handle: thread::JoinHandle<Result<()>>,
tx: mpsc::SyncSender<Vec<u8>>,
}
impl Flusher {
fn new(
file: ffi::OsString,
config: Config,
create: bool,
) -> Result<Flusher> {
let (fd, fpos) = if create {
(util::open_file_cw(file.clone())?, Default::default())
} else {
(util::open_file_w(&file)?, fs::metadata(&file)?.len())
};
let (tx, rx) = mpsc::sync_channel(config.flush_queue_size);
let file1 = file.clone();
let t_handle = thread::spawn(move || thread_flush(file1, fd, rx));
Ok(Flusher {
file,
fpos,
t_handle,
tx,
})
}
pub(crate) fn send(&mut self, block: Vec<u8>) -> Result<()> {
self.tx.send(block)?;
Ok(())
}
fn close_wait(self) -> Result<()> {
mem::drop(self.tx);
match self.t_handle.join() {
Ok(Ok(())) => Ok(()),
Ok(Err(Error::PartialWrite(err))) => Err(Error::PartialWrite(err)),
Ok(Err(_)) => unreachable!(),
Err(err) => match err.downcast_ref::<String>() {
Some(msg) => Err(Error::ThreadFail(msg.to_string())),
None => Err(Error::ThreadFail("unknown error".to_string())),
},
}
}
}
fn thread_flush(
file: ffi::OsString,
mut fd: fs::File,
rx: mpsc::Receiver<Vec<u8>>,
) -> Result<()> {
for data in rx.iter() {
let n = fd.write(&data)?;
if n != data.len() {
let msg = format!("flusher: {:?} {}/{}...", &file, data.len(), n);
return Err(Error::PartialWrite(msg));
}
}
fd.sync_all()?;
Ok(())
}
pub struct Snapshot<K, V>
where
K: Clone + Ord + Serialize,
V: Clone + Diff + Serialize,
{
dir: String,
name: String,
meta: Vec<MetaItem>,
config: Config,
index_fd: fs::File,
vlog_fd: Option<fs::File>,
mutex: sync::Mutex<i32>,
phantom_key: marker::PhantomData<K>,
phantom_val: marker::PhantomData<V>,
}
impl<K, V> Snapshot<K, V>
where
K: Clone + Ord + Serialize,
V: Clone + Diff + Serialize,
{
pub fn open(dir: &str, name: &str) -> Result<Snapshot<K, V>> {
let meta_items = read_meta_items(dir, name)?;
let mut snap = Snapshot {
dir: dir.to_string(),
name: name.to_string(),
meta: meta_items,
config: Default::default(),
index_fd: {
let index_file = Config::stitch_index_file(dir, name);
util::open_file_r(&index_file.as_ref())?
},
vlog_fd: Default::default(),
mutex: sync::Mutex::new(0),
phantom_key: marker::PhantomData,
phantom_val: marker::PhantomData,
};
snap.config = snap.to_stats()?.into();
snap.config.vlog_file = snap.config.vlog_file.map(|vfile| {
let vfile = path::Path::new(&vfile).file_name().unwrap();
let ipath = Config::stitch_index_file(&dir, &name);
let mut vpath = path::PathBuf::new();
vpath.push(path::Path::new(&ipath).parent().unwrap());
vpath.push(vfile);
vpath.as_os_str().to_os_string()
});
snap.vlog_fd = snap
.config
.to_value_log(dir, name)
.as_ref()
.map(|s| util::open_file_r(s.as_ref()))
.transpose()?;
Ok(snap)
}
}
impl<K, V> Snapshot<K, V>
where
K: Clone + Ord + Serialize,
V: Clone + Diff + Serialize,
{
pub fn len(&self) -> usize {
self.to_stats().unwrap().n_count.try_into().unwrap()
}
pub fn to_seqno(&self) -> u64 {
self.to_stats().unwrap().seqno
}
pub fn to_app_meta(&self) -> Result<Vec<u8>> {
if let MetaItem::AppMetadata(data) = &self.meta[1] {
Ok(data.clone())
} else {
let msg = "snapshot app-metadata missing".to_string();
Err(Error::InvalidSnapshot(msg))
}
}
pub fn to_stats(&self) -> Result<Stats> {
if let MetaItem::Stats(stats) = &self.meta[2] {
Ok(stats.parse()?)
} else {
let msg = "snapshot statistics missing".to_string();
Err(Error::InvalidSnapshot(msg))
}
}
pub fn to_root(&self) -> Result<u64> {
if let MetaItem::Root(root) = self.meta[0] {
Ok(root)
} else {
Err(Error::InvalidSnapshot("snapshot root missing".to_string()))
}
}
}
impl<K, V> Index<K, V> for Snapshot<K, V>
where
K: Clone + Ord + Serialize + Footprint,
V: Clone + Diff + Serialize + Footprint,
<V as Diff>::D: Serialize,
{
type W = Snapshot<K, V>;
type R = Snapshot<K, V>;
fn make_new(&self) -> Result<Box<Self>> {
Ok(Box::new(Snapshot::open(
self.name.as_str(),
self.dir.as_str(),
)?))
}
fn to_reader(&mut self) -> Result<Self::R> {
Snapshot::open(&self.dir, &self.name)
}
fn to_writer(&mut self) -> Result<Self::W> {
panic!("write ops are not allowed in Read-Only-Btree snapshots!!");
}
}
impl<K, V> Footprint for Snapshot<K, V>
where
K: Clone + Ord + Serialize,
V: Clone + Diff + Serialize,
{
fn footprint(&self) -> isize {
let (dir, name) = (self.dir.as_str(), self.name.as_str());
let mut footprint = fs::metadata(self.config.to_index_file(dir, name))
.unwrap()
.len();
footprint += match self.config.to_value_log(dir, name) {
Some(vlog_file) => fs::metadata(vlog_file).unwrap().len(),
None => 0,
};
footprint.try_into().unwrap()
}
}
impl<K, V> Writer<K, V> for Snapshot<K, V>
where
K: Clone + Ord + Serialize + Footprint,
V: Clone + Diff + Serialize + Footprint,
{
fn set(&mut self, _key: K, _value: V) -> Result<Option<Entry<K, V>>> {
panic!("set operation not allwed on Read-Only-Btree snapshot !!");
}
fn set_cas(&mut self, _: K, _: V, _: u64) -> Result<Option<Entry<K, V>>> {
panic!("set operation not allwed on Read-Only-Btree snapshot !!");
}
fn delete<Q>(&mut self, _key: &Q) -> Result<Option<Entry<K, V>>>
where
K: Borrow<Q>,
Q: ToOwned<Owned = K> + Ord + ?Sized,
{
panic!("set operation not allwed on Read-Only-Btree snapshot !!");
}
}
impl<K, V> Reader<K, V> for Snapshot<K, V>
where
K: Clone + Ord + Serialize,
V: Clone + Diff + Serialize,
<V as Diff>::D: Clone + Serialize,
{
fn get<Q>(&self, key: &Q) -> Result<Entry<K, V>>
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
let _lock = self.mutex.lock();
let snap = unsafe {
let snap = self as *const Snapshot<K, V> as *mut Snapshot<K, V>;
snap.as_mut().unwrap()
};
let versions = false;
snap.do_get(key, versions)
}
fn iter(&self) -> Result<IndexIter<K, V>> {
let _lock = self.mutex.lock();
let snap = unsafe {
let snap = self as *const Snapshot<K, V> as *mut Snapshot<K, V>;
snap.as_mut().unwrap()
};
let mut mzs = vec![];
snap.build_fwd(snap.to_root().unwrap(), &mut mzs)?;
Ok(Iter::new(snap, mzs))
}
fn range<'a, R, Q>(&'a self, range: R) -> Result<IndexIter<K, V>>
where
K: Borrow<Q>,
R: 'a + RangeBounds<Q>,
Q: 'a + Ord + ?Sized,
{
let _lock = self.mutex.lock();
let snap = unsafe {
let snap = self as *const Snapshot<K, V> as *mut Snapshot<K, V>;
snap.as_mut().unwrap()
};
let versions = false;
snap.do_range(range, versions)
}
fn reverse<'a, R, Q>(&'a self, range: R) -> Result<IndexIter<K, V>>
where
K: Borrow<Q>,
R: 'a + RangeBounds<Q>,
Q: 'a + Ord + ?Sized,
{
let _lock = self.mutex.lock();
let snap = unsafe {
let snap = self as *const Snapshot<K, V> as *mut Snapshot<K, V>;
snap.as_mut().unwrap()
};
let versions = false;
snap.do_reverse(range, versions)
}
fn get_with_versions<Q>(&self, key: &Q) -> Result<Entry<K, V>>
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
let _lock = self.mutex.lock();
let snap = unsafe {
let snap = self as *const Snapshot<K, V> as *mut Snapshot<K, V>;
snap.as_mut().unwrap()
};
let versions = true;
snap.do_get(key, versions)
}
fn iter_with_versions(&self) -> Result<IndexIter<K, V>> {
let _lock = self.mutex.lock();
let snap = unsafe {
let snap = self as *const Snapshot<K, V> as *mut Snapshot<K, V>;
snap.as_mut().unwrap()
};
let mut mzs = vec![];
snap.build_fwd(snap.to_root().unwrap(), &mut mzs)?;
Ok(Iter::new_versions(snap, mzs))
}
fn range_with_versions<'a, R, Q>(
&'a self,
range: R,
) -> Result<IndexIter<K, V>>
where
K: Borrow<Q>,
R: 'a + RangeBounds<Q>,
Q: 'a + Ord + ?Sized,
{
let _lock = self.mutex.lock();
let snap = unsafe {
let snap = self as *const Snapshot<K, V> as *mut Snapshot<K, V>;
snap.as_mut().unwrap()
};
let versions = true;
snap.do_range(range, versions)
}
fn reverse_with_versions<'a, R, Q>(
&'a self,
range: R,
) -> Result<IndexIter<K, V>>
where
K: Borrow<Q>,
R: 'a + RangeBounds<Q>,
Q: 'a + Ord + ?Sized,
{
let _lock = self.mutex.lock();
let snap = unsafe {
let snap = self as *const Snapshot<K, V> as *mut Snapshot<K, V>;
snap.as_mut().unwrap()
};
let versions = true;
snap.do_reverse(range, versions)
}
}
impl<K, V> Snapshot<K, V>
where
K: Clone + Ord + Serialize,
V: Clone + Diff + Serialize,
<V as Diff>::D: Clone + Serialize,
{
fn get_zpos<Q>(&mut self, key: &Q, fpos: u64) -> Result<u64>
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
let fd = &mut self.index_fd;
let mblock = MBlock::<K, V>::new_decode(fd, fpos, &self.config)?;
match mblock.get(key, Bound::Unbounded, Bound::Unbounded) {
Err(Error::__LessThan) => Err(Error::KeyNotFound),
Ok(mentry) if mentry.is_zblock() => Ok(mentry.to_fpos()),
Ok(mentry) => self.get_zpos(key, mentry.to_fpos()),
Err(err) => Err(err),
}
}
fn do_get<Q>(&mut self, key: &Q, versions: bool) -> Result<Entry<K, V>>
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
let zfpos = self.get_zpos(key, self.to_root().unwrap())?;
let fd = &mut self.index_fd;
let zblock: ZBlock<K, V> = ZBlock::new_decode(fd, zfpos, &self.config)?;
match zblock.find(key, Bound::Unbounded, Bound::Unbounded) {
Ok((_, entry)) => {
if entry.as_key().borrow().eq(key) {
self.fetch(entry, versions)
} else {
Err(Error::KeyNotFound)
}
}
Err(Error::__LessThan) => Err(Error::KeyNotFound),
Err(Error::__ZBlockExhausted(_)) => Err(Error::KeyNotFound),
Err(err) => Err(err),
}
}
fn do_range<'a, R, Q>(
&'a mut self,
range: R,
versions: bool,
) -> Result<IndexIter<K, V>>
where
K: Borrow<Q>,
R: 'a + RangeBounds<Q>,
Q: 'a + Ord + ?Sized,
{
let mut mzs = vec![];
let skip_one = match range.start_bound() {
Bound::Unbounded => {
self.build_fwd(self.to_root().unwrap(), &mut mzs)?;
false
}
Bound::Included(key) => {
let entry = self.build(key, &mut mzs)?;
match key.cmp(entry.as_key().borrow()) {
cmp::Ordering::Greater => true,
_ => false,
}
}
Bound::Excluded(key) => {
let entry = self.build(key, &mut mzs)?;
match key.cmp(entry.as_key().borrow()) {
cmp::Ordering::Equal | cmp::Ordering::Greater => true,
_ => false,
}
}
};
let mut r = Range::new(self, mzs, range, versions);
if skip_one {
r.next();
}
Ok(r)
}
fn do_reverse<'a, R, Q>(
&'a mut self,
range: R,
versions: bool,
) -> Result<IndexIter<K, V>>
where
K: Borrow<Q>,
R: 'a + RangeBounds<Q>,
Q: 'a + Ord + ?Sized,
{
let mut mzs = vec![];
let skip_one = match range.end_bound() {
Bound::Unbounded => {
self.build_rev(self.to_root().unwrap(), &mut mzs)?;
false
}
Bound::Included(key) => {
let entry = self.build(&key, &mut mzs)?;
match key.cmp(entry.as_key().borrow()) {
cmp::Ordering::Less => true,
_ => false,
}
}
Bound::Excluded(key) => {
let entry = self.build(&key, &mut mzs)?;
match key.cmp(entry.as_key().borrow()) {
cmp::Ordering::Less | cmp::Ordering::Equal => true,
_ => false,
}
}
};
let mut rr = Reverse::new(self, mzs, range, versions);
if skip_one {
rr.next();
}
Ok(rr)
}
fn build_fwd(
&mut self,
mut fpos: u64,
mzs: &mut Vec<MZ<K, V>>,
) -> Result<()> {
let fd = &mut self.index_fd;
let config = &self.config;
let zfpos = loop {
let mblock = MBlock::<K, V>::new_decode(fd, fpos, config)?;
mzs.push(MZ::M { fpos, index: 0 });
let mentry = mblock.to_entry(0)?;
if mentry.is_zblock() {
break mentry.to_fpos();
}
fpos = mentry.to_fpos();
};
let zblock = ZBlock::new_decode(fd, zfpos, config)?;
mzs.push(MZ::Z { zblock, index: 0 });
Ok(())
}
fn rebuild_fwd(&mut self, mzs: &mut Vec<MZ<K, V>>) -> Result<()> {
let fd = &mut self.index_fd;
let config = &self.config;
match mzs.pop() {
None => Ok(()),
Some(MZ::M { fpos, mut index }) => {
let mblock = MBlock::<K, V>::new_decode(fd, fpos, config)?;
index += 1;
match mblock.to_entry(index) {
Ok(MEntry::DecZ { fpos: zfpos, .. }) => {
mzs.push(MZ::M { fpos, index });
let zblock = ZBlock::new_decode(fd, zfpos, config)?;
mzs.push(MZ::Z { zblock, index: 0 });
Ok(())
}
Ok(MEntry::DecM { fpos: mfpos, .. }) => {
mzs.push(MZ::M { fpos, index });
self.build_fwd(mfpos, mzs)?;
Ok(())
}
Err(Error::__MBlockExhausted(_)) => self.rebuild_fwd(mzs),
_ => unreachable!(),
}
}
Some(MZ::Z { .. }) => unreachable!(),
}
}
fn build_rev(
&mut self,
mut fpos: u64,
mzs: &mut Vec<MZ<K, V>>,
) -> Result<()> {
let fd = &mut self.index_fd;
let config = &self.config;
let zfpos = loop {
let mblock = MBlock::<K, V>::new_decode(fd, fpos, config)?;
let index = mblock.len() - 1;
mzs.push(MZ::M { fpos, index });
let mentry = mblock.to_entry(index)?;
if mentry.is_zblock() {
break mentry.to_fpos();
}
fpos = mentry.to_fpos();
};
let zblock = ZBlock::new_decode(fd, zfpos, config)?;
let index: isize = (zblock.len() - 1).try_into().unwrap();
mzs.push(MZ::Z { zblock, index });
Ok(())
}
fn rebuild_rev(&mut self, mzs: &mut Vec<MZ<K, V>>) -> Result<()> {
let fd = &mut self.index_fd;
let config = &self.config;
match mzs.pop() {
None => Ok(()),
Some(MZ::M { index: 0, .. }) => self.rebuild_rev(mzs),
Some(MZ::M { fpos, mut index }) => {
let mblock = MBlock::<K, V>::new_decode(fd, fpos, config)?;
index -= 1;
match mblock.to_entry(index) {
Ok(MEntry::DecZ { fpos: zfpos, .. }) => {
mzs.push(MZ::M { fpos, index });
let zblock = ZBlock::new_decode(fd, zfpos, config)?;
let idx: isize = (zblock.len() - 1).try_into().unwrap();
mzs.push(MZ::Z { zblock, index: idx });
Ok(())
}
Ok(MEntry::DecM { fpos: mfpos, .. }) => {
mzs.push(MZ::M { fpos, index });
self.build_rev(mfpos, mzs)?;
Ok(())
}
_ => unreachable!(),
}
}
Some(MZ::Z { .. }) => unreachable!(),
}
}
fn build<Q>(
&mut self,
key: &Q,
mzs: &mut Vec<MZ<K, V>>,
) -> Result<Entry<K, V>>
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
let mut fpos = self.to_root().unwrap();
let fd = &mut self.index_fd;
let config = &self.config;
let (from_min, to_max) = (Bound::Unbounded, Bound::Unbounded);
let zfpos = loop {
let mblock = MBlock::<K, V>::new_decode(fd, fpos, config)?;
let mentry = match mblock.find(key, from_min, to_max) {
Ok(mentry) => Ok(mentry),
Err(Error::__LessThan) => mblock.to_entry(0),
Err(err) => Err(err),
}?;
let index = mentry.to_index();
mzs.push(MZ::M { fpos, index });
if mentry.is_zblock() {
break mentry.to_fpos();
}
fpos = mentry.to_fpos();
};
let zblock = ZBlock::new_decode(fd, zfpos, config)?;
let (index, entry) = match zblock.find(key, from_min, to_max) {
Ok((index, entry)) => Ok((index, entry)),
Err(Error::__LessThan) => zblock.to_entry(0),
Err(Error::__ZBlockExhausted(index)) => {
let (_, entry) = zblock.to_entry(index)?;
Ok((index, entry))
}
Err(err) => Err(err),
}?;
mzs.push(MZ::Z {
zblock,
index: index.try_into().unwrap(),
});
Ok(entry)
}
fn fetch(
&mut self,
mut entry: Entry<K, V>,
versions: bool,
) -> Result<Entry<K, V>> {
match &mut self.vlog_fd {
Some(fd) => entry.fetch_value(fd)?,
_ => (),
}
if versions {
match &mut self.vlog_fd {
Some(fd) => entry.fetch_deltas(fd)?,
_ => (),
}
}
Ok(entry)
}
}
pub struct Iter<'a, K, V>
where
K: Clone + Ord + Serialize,
V: Clone + Diff + Serialize,
<V as Diff>::D: Clone + Serialize,
{
snap: &'a mut Snapshot<K, V>,
mzs: Vec<MZ<K, V>>,
versions: bool,
}
impl<'a, K, V> Iter<'a, K, V>
where
K: Clone + Ord + Serialize,
V: Clone + Diff + Serialize,
<V as Diff>::D: Clone + Serialize,
{
fn new(snap: &'a mut Snapshot<K, V>, mzs: Vec<MZ<K, V>>) -> Box<Self> {
Box::new(Iter {
snap,
mzs,
versions: false,
})
}
fn new_versions(
snap: &'a mut Snapshot<K, V>,
mzs: Vec<MZ<K, V>>,
) -> Box<Self> {
Box::new(Iter {
snap,
mzs,
versions: true,
})
}
}
impl<'a, K, V> Iterator for Iter<'a, K, V>
where
K: Clone + Ord + Serialize,
V: Clone + Diff + Serialize,
<V as Diff>::D: Clone + Serialize,
{
type Item = Result<Entry<K, V>>;
fn next(&mut self) -> Option<Result<Entry<K, V>>> {
match self.mzs.pop() {
None => None,
Some(mut z) => match z.next() {
Some(Ok(entry)) => {
self.mzs.push(z);
Some(self.snap.fetch(entry, self.versions))
}
Some(Err(err)) => {
self.mzs.truncate(0);
Some(Err(err))
}
None => match self.snap.rebuild_fwd(&mut self.mzs) {
Err(err) => Some(Err(err)),
Ok(_) => self.next(),
},
},
}
}
}
pub struct Range<'a, K, V, R, Q>
where
K: Clone + Ord + Borrow<Q> + Serialize,
V: Clone + Diff + Serialize,
<V as Diff>::D: Clone + Serialize,
R: RangeBounds<Q>,
Q: Ord + ?Sized,
{
snap: &'a mut Snapshot<K, V>,
mzs: Vec<MZ<K, V>>,
range: R,
high: marker::PhantomData<Q>,
versions: bool,
}
impl<'a, K, V, R, Q> Range<'a, K, V, R, Q>
where
K: Clone + Ord + Borrow<Q> + Serialize,
V: Clone + Diff + Serialize,
<V as Diff>::D: Clone + Serialize,
R: RangeBounds<Q>,
Q: Ord + ?Sized,
{
fn new(
snap: &'a mut Snapshot<K, V>,
mzs: Vec<MZ<K, V>>,
range: R,
versions: bool,
) -> Box<Self> {
Box::new(Range {
snap,
mzs,
range,
high: marker::PhantomData,
versions,
})
}
fn till_ok(&self, entry: &Entry<K, V>) -> bool {
match self.range.end_bound() {
Bound::Unbounded => true,
Bound::Included(key) => entry.as_key().borrow().le(key),
Bound::Excluded(key) => entry.as_key().borrow().lt(key),
}
}
}
impl<'a, K, V, R, Q> Iterator for Range<'a, K, V, R, Q>
where
K: Clone + Ord + Borrow<Q> + Serialize,
V: Clone + Diff + Serialize,
<V as Diff>::D: Clone + Serialize,
R: RangeBounds<Q>,
Q: Ord + ?Sized,
{
type Item = Result<Entry<K, V>>;
fn next(&mut self) -> Option<Result<Entry<K, V>>> {
match self.mzs.pop() {
None => None,
Some(mut z) => match z.next() {
Some(Ok(entry)) => {
if self.till_ok(&entry) {
self.mzs.push(z);
Some(self.snap.fetch(entry, self.versions))
} else {
self.mzs.truncate(0);
None
}
}
Some(Err(err)) => {
self.mzs.truncate(0);
Some(Err(err))
}
None => match self.snap.rebuild_fwd(&mut self.mzs) {
Err(err) => Some(Err(err)),
Ok(_) => self.next(),
},
},
}
}
}
pub struct Reverse<'a, K, V, R, Q>
where
K: Clone + Ord + Borrow<Q> + Serialize,
V: Clone + Diff + Serialize,
<V as Diff>::D: Clone + Serialize,
R: RangeBounds<Q>,
Q: Ord + ?Sized,
{
snap: &'a mut Snapshot<K, V>,
mzs: Vec<MZ<K, V>>,
range: R,
low: marker::PhantomData<Q>,
versions: bool,
}
impl<'a, K, V, R, Q> Reverse<'a, K, V, R, Q>
where
K: Clone + Ord + Borrow<Q> + Serialize,
V: Clone + Diff + Serialize,
<V as Diff>::D: Clone + Serialize,
R: RangeBounds<Q>,
Q: Ord + ?Sized,
{
fn new(
snap: &'a mut Snapshot<K, V>,
mzs: Vec<MZ<K, V>>,
range: R,
versions: bool,
) -> Box<Self> {
Box::new(Reverse {
snap,
mzs,
range,
low: marker::PhantomData,
versions,
})
}
fn till_ok(&self, entry: &Entry<K, V>) -> bool {
match self.range.start_bound() {
Bound::Unbounded => true,
Bound::Included(key) => entry.as_key().borrow().ge(key),
Bound::Excluded(key) => entry.as_key().borrow().gt(key),
}
}
}
impl<'a, K, V, R, Q> Iterator for Reverse<'a, K, V, R, Q>
where
K: Clone + Ord + Borrow<Q> + Serialize,
V: Clone + Diff + Serialize,
<V as Diff>::D: Clone + Serialize,
R: RangeBounds<Q>,
Q: Ord + ?Sized,
{
type Item = Result<Entry<K, V>>;
fn next(&mut self) -> Option<Result<Entry<K, V>>> {
match self.mzs.pop() {
None => None,
Some(mut z) => match z.next_back() {
Some(Err(err)) => {
self.mzs.truncate(0);
Some(Err(err))
}
Some(Ok(entry)) => {
if self.till_ok(&entry) {
self.mzs.push(z);
Some(self.snap.fetch(entry, self.versions))
} else {
self.mzs.truncate(0);
None
}
}
None => match self.snap.rebuild_rev(&mut self.mzs) {
Err(err) => Some(Err(err)),
Ok(_) => self.next(),
},
},
}
}
}
enum MZ<K, V>
where
K: Clone + Ord + Serialize,
V: Clone + Diff + Serialize,
<V as Diff>::D: Clone + Serialize,
{
M { fpos: u64, index: usize },
Z { zblock: ZBlock<K, V>, index: isize },
}
impl<K, V> Iterator for MZ<K, V>
where
K: Clone + Ord + Serialize,
V: Clone + Diff + Serialize,
<V as Diff>::D: Clone + Serialize,
{
type Item = Result<Entry<K, V>>;
fn next(&mut self) -> Option<Result<Entry<K, V>>> {
match self {
MZ::Z { zblock, index } => {
let undex: usize = (*index).try_into().unwrap();
match zblock.to_entry(undex) {
Ok((_, entry)) => {
*index += 1;
Some(Ok(entry))
}
Err(Error::__ZBlockExhausted(_)) => None,
Err(err) => Some(Err(err)),
}
}
MZ::M { .. } => unreachable!(),
}
}
}
impl<K, V> DoubleEndedIterator for MZ<K, V>
where
K: Clone + Ord + Serialize,
V: Clone + Diff + Serialize,
<V as Diff>::D: Clone + Serialize,
{
fn next_back(&mut self) -> Option<Result<Entry<K, V>>> {
match self {
MZ::Z { zblock, index } if *index >= 0 => {
let undex: usize = (*index).try_into().unwrap();
match zblock.to_entry(undex) {
Ok((_, entry)) => {
*index -= 1;
Some(Ok(entry))
}
Err(Error::__ZBlockExhausted(_)) => None,
Err(err) => Some(Err(err)),
}
}
MZ::Z { .. } => None,
MZ::M { .. } => unreachable!(),
}
}
}
#[cfg(test)]
#[path = "robt_test.rs"]
mod robt_test;