use std::ops;
use pagecache::{Measure, M};
use super::*;
fn lower_bound_includes<'a>(
lb: &ops::Bound<Vec<u8>>,
item: &'a [u8],
) -> bool {
match lb {
ops::Bound::Included(ref start) => start.as_slice() <= item,
ops::Bound::Excluded(ref start) => start.as_slice() < item,
ops::Bound::Unbounded => true,
}
}
fn upper_bound_includes<'a>(
ub: &ops::Bound<Vec<u8>>,
item: &'a [u8],
) -> bool {
match ub {
ops::Bound::Included(ref end) => item <= end.as_ref(),
ops::Bound::Excluded(ref end) => item < end.as_ref(),
ops::Bound::Unbounded => true,
}
}
pub struct Keys<'a>(Iter<'a>);
impl<'a> Iterator for Keys<'a> {
type Item = Result<Vec<u8>, ()>;
fn next(&mut self) -> Option<Self::Item> {
self.0.next().map(|r| r.map(|(k, _v)| k))
}
}
impl<'a> DoubleEndedIterator for Keys<'a> {
fn next_back(&mut self) -> Option<Self::Item> {
self.0.next_back().map(|r| r.map(|(k, _v)| k))
}
}
pub struct Values<'a>(Iter<'a>);
impl<'a> Iterator for Values<'a> {
type Item = Result<PinnedValue, ()>;
fn next(&mut self) -> Option<Self::Item> {
self.0.next().map(|r| r.map(|(_k, v)| v))
}
}
impl<'a> DoubleEndedIterator for Values<'a> {
fn next_back(&mut self) -> Option<Self::Item> {
self.0.next_back().map(|r| r.map(|(_k, v)| v))
}
}
pub struct Iter<'a> {
pub(super) tree: &'a Tree,
pub(super) hi: ops::Bound<Vec<u8>>,
pub(super) lo: ops::Bound<Vec<u8>>,
pub(super) last_id: Option<PageId>,
pub(super) last_key: Option<Key>,
pub(super) broken: Option<Error<()>>,
pub(super) done: bool,
pub(super) guard: Guard,
pub(super) is_scan: bool,
}
impl<'a> Iter<'a> {
pub fn keys(self) -> Keys<'a> {
Keys(self)
}
pub fn values(self) -> Values<'a> {
Values(self)
}
}
impl<'a> Iterator for Iter<'a> {
type Item = Result<(Vec<u8>, PinnedValue), ()>;
fn next(&mut self) -> Option<Self::Item> {
let _measure = Measure::new(&M.tree_scan);
if self.done {
return None;
} else if let Some(broken) = self.broken.take() {
self.done = true;
return Some(Err(broken));
};
let start_bound = &self.lo;
let start: &[u8] = match start_bound {
ops::Bound::Included(ref start)
| ops::Bound::Excluded(ref start) => start.as_ref(),
ops::Bound::Unbounded => b"",
};
loop {
let value_guard = pin();
if self.last_id.is_none() {
let path_res =
self.tree.path_for_key(start, &self.guard);
if let Err(e) = path_res {
error!("iteration failed: {:?}", e);
self.done = true;
return Some(Err(e.danger_cast()));
}
let path = path_res.unwrap();
let (last_frag, _tree_ptr) =
path.last().expect("path should never be empty");
let last_node = last_frag.unwrap_base();
self.last_id = Some(last_node.id);
}
let last_id = self.last_id.unwrap();
let inclusive = match self.lo {
ops::Bound::Unbounded | ops::Bound::Included(..) => {
true
}
ops::Bound::Excluded(..) => false,
};
let res = self
.tree
.pages
.get(last_id, &self.guard)
.map(|page_get| page_get.unwrap());
if let Err(e) = res {
error!("iteration failed: {:?}", e);
self.done = true;
return Some(Err(e.danger_cast()));
}
let (frag, _ptr) = res.unwrap();
let node = frag.unwrap_base();
let leaf =
node.data.leaf_ref().expect("node should be a leaf");
let prefix = &node.lo;
let search = if inclusive && self.last_key.is_none() {
leaf.binary_search_by(|&(ref k, ref _v)| {
prefix_cmp_encoded(k, start, prefix)
})
.ok()
.or_else(|| {
binary_search_gt(leaf, |&(ref k, ref _v)| {
prefix_cmp_encoded(k, start, prefix)
})
})
} else {
let last_key = &self.last_key;
let search_key: &[u8] = if let Some(lk) = last_key {
lk.as_ref()
} else {
start
};
binary_search_gt(leaf, |&(ref k, ref _v)| {
prefix_cmp_encoded(k, search_key, prefix)
})
};
if let Some(idx) = search {
let (k, v) = &leaf[idx];
let decoded_k = prefix_decode(prefix, &k);
if !upper_bound_includes(&self.hi, &*decoded_k) {
return None;
}
self.last_key = Some(decoded_k.clone());
let ret = Ok((
decoded_k,
PinnedValue::new(&*v, value_guard),
));
return Some(ret);
}
if !node.hi.is_empty()
&& !upper_bound_includes(&self.hi, &node.hi)
{
return None;
}
match node.next {
Some(id) => self.last_id = Some(id),
None => {
assert_eq!(
node.hi,
vec![],
"if a node has no right sibling, \
it must be the upper-bound node"
);
return None;
}
}
}
}
}
impl<'a> DoubleEndedIterator for Iter<'a> {
fn next_back(&mut self) -> Option<Self::Item> {
let _measure = Measure::new(&M.tree_scan);
if self.done {
return None;
} else if let Some(broken) = self.broken.take() {
self.done = true;
return Some(Err(broken));
};
let end_bound = &self.hi;
let start_bound = &self.lo;
let (end, unbounded): (&[u8], bool) = if self.is_scan {
match start_bound {
ops::Bound::Included(ref start)
| ops::Bound::Excluded(ref start) => {
(start.as_ref(), false)
}
ops::Bound::Unbounded => (&[255; 100], true),
}
} else {
match end_bound {
ops::Bound::Included(ref start)
| ops::Bound::Excluded(ref start) => {
(start.as_ref(), false)
}
ops::Bound::Unbounded => (&[255; 100], true),
}
};
loop {
let value_guard = pin();
if self.last_id.is_none() {
let path_res =
self.tree.path_for_key(end, &self.guard);
if let Err(e) = path_res {
error!("iteration failed: {:?}", e);
self.done = true;
return Some(Err(e.danger_cast()));
}
let path = path_res.unwrap();
let (last_frag, _tree_ptr) =
path.last().expect("path should never be empty");
let mut last_node = last_frag.unwrap_base();
while unbounded && !last_node.hi.is_empty() {
let res = self
.tree
.pages
.get(last_node.next.unwrap(), &self.guard)
.map(|page_get| page_get.unwrap());
if let Err(e) = res {
error!("iteration failed: {:?}", e);
self.done = true;
return Some(Err(e.danger_cast()));
}
let (frag, _ptr) = res.unwrap();
last_node = frag.unwrap_base();
}
self.last_id = Some(last_node.id);
}
let last_id = self.last_id.unwrap();
let inclusive = match self.hi {
ops::Bound::Unbounded | ops::Bound::Included(..) => {
true
}
ops::Bound::Excluded(..) => false,
};
let res = self
.tree
.pages
.get(last_id, &self.guard)
.map(|page_get| page_get.unwrap());
if let Err(e) = res {
error!("iteration failed: {:?}", e);
self.done = true;
return Some(Err(e.danger_cast()));
}
let (frag, _ptr) = res.unwrap();
let node = frag.unwrap_base();
let leaf =
node.data.leaf_ref().expect("node should be a leaf");
let prefix = &node.lo;
let search = if inclusive && self.last_key.is_none() {
if unbounded {
if leaf.is_empty() {
None
} else {
Some(leaf.len() - 1)
}
} else {
binary_search_lub(leaf, |&(ref k, ref _v)| {
prefix_cmp_encoded(k, end, prefix)
})
}
} else {
let last_key = &self.last_key;
let search_key: &[u8] = if let Some(lk) = last_key {
lk.as_ref()
} else {
end
};
binary_search_lt(leaf, |&(ref k, ref _v)| {
prefix_cmp_encoded(k, search_key, prefix)
})
};
if let Some(idx) = search {
let (k, v) = &leaf[idx];
let decoded_k = prefix_decode(prefix, &k);
if !self.is_scan
&& !lower_bound_includes(&self.lo, &*decoded_k)
{
return None;
}
self.last_key = Some(decoded_k.clone());
let ret = Ok((
decoded_k,
PinnedValue::new(&*v, value_guard),
));
return Some(ret);
}
if !self.is_scan
&& !lower_bound_includes(&self.lo, &node.lo)
{
return None;
}
let pred = possible_predecessor(prefix)?;
let mut next_node =
match self.tree.path_for_key(pred, &self.guard) {
Err(e) => {
error!("next_back iteration failed: {:?}", e);
self.done = true;
return Some(Err(e.danger_cast()));
}
Ok(path) => path.last().unwrap().0.unwrap_base(),
};
while next_node.next != Some(node.id)
&& next_node.lo < node.lo
{
let res = self
.tree
.pages
.get(next_node.next?, &self.guard)
.map(|page_get| page_get.unwrap());
if let Err(e) = res {
error!("iteration failed: {:?}", e);
self.done = true;
return Some(Err(e.danger_cast()));
}
let (frag, _ptr) = res.unwrap();
next_node = frag.unwrap_base();
}
self.last_id = Some(next_node.id);
}
}
}
fn possible_predecessor(s: &[u8]) -> Option<Vec<u8>> {
let mut ret = s.to_vec();
match ret.pop() {
None => None,
Some(i) if i == 0 => Some(ret),
Some(i) => {
ret.push(i - 1);
for _ in 0..4 {
ret.push(255);
}
Some(ret)
}
}
}
#[test]
fn test_possible_predecessor() {
assert_eq!(possible_predecessor(b""), None);
assert_eq!(possible_predecessor(&[0]), Some(vec![]));
assert_eq!(possible_predecessor(&[0, 0]), Some(vec![0]));
assert_eq!(
possible_predecessor(&[0, 1]),
Some(vec![0, 0, 255, 255, 255, 255])
);
assert_eq!(
possible_predecessor(&[0, 2]),
Some(vec![0, 1, 255, 255, 255, 255])
);
assert_eq!(possible_predecessor(&[1, 0]), Some(vec![1]));
assert_eq!(
possible_predecessor(&[1, 1]),
Some(vec![1, 0, 255, 255, 255, 255])
);
assert_eq!(
possible_predecessor(&[155]),
Some(vec![154, 255, 255, 255, 255])
);
}