use rand::Rng;
use skiplist;
use skiplist::{record_size, FIRST_BINDING_SIZE};
use std;
use transaction::Cow;
use {rc, Cursor, Db, Error, MutTxn, PageItem, PageT, Representable, PAGE_SIZE_U16};
impl<'env, T> MutTxn<'env, T> {
pub fn put<R: Rng, K: Representable, V: Representable>(
&mut self,
rng: &mut R,
db: &mut Db<K, V>,
key: K,
value: V,
) -> Result<bool, Error> {
let mut cursor = Cursor::new();
if cursor.set(self, db, Some((key, Some(value))))? {
return Ok(false);
}
let leaf_pointer = cursor.pointer;
let last_allocated;
{
let mut inserted_right_child = 0;
let mut inserted_key = key;
let mut inserted_value = value;
let mut inserted_left_child = 0u64;
loop {
let mut page = self.load_cow_page(cursor.current().page);
debug!("page: {:?}", page);
if cursor.pointer == cursor.first_rc_level {
let page = cursor.current().page;
let rc = rc(self, page);
if rc >= 2 {
try!(self.set_rc(rng, page, rc - 1));
}
}
if skiplist::can_alloc(&page, inserted_key, inserted_value) != 0 {
debug!("can alloc");
let needs_dup = skiplist::first_free(&page)
+ record_size(inserted_key, inserted_value)
> PAGE_SIZE_U16;
match page {
Cow::MutPage(ref mut page)
if cursor.pointer < cursor.first_rc_level && !needs_dup =>
{
debug!("does not need dup");
debug!("page right child: {:?}", cursor.current()[0]);
page.set_right_child(cursor.current()[0], inserted_left_child);
page.skiplist_insert_after_cursor(
rng,
&mut cursor.current_mut().cursor,
inserted_key,
inserted_value,
inserted_right_child,
);
last_allocated = page.page_offset();
}
_ => {
debug!(
"needs dup {:?} {:?} {:?}",
cursor.pointer, cursor.first_rc_level, needs_dup
);
let page = self.load_cow_page(cursor.current().page);
let it = Iter {
iter: skiplist::iter_all(&page).peekable(),
off: page.offset(cursor.current()[0] as isize),
next_is_extra: false,
extra_left: inserted_left_child,
extra: (Some((inserted_key, inserted_value)), inserted_right_child),
};
let incr_rc = cursor.pointer >= cursor.first_rc_level;
last_allocated = self.spread_on_one_page(
rng,
it.map(|(b, c)| {
let fresh =
c == inserted_left_child || c == inserted_right_child;
PageItem {
ptr: std::ptr::null(),
kv: b,
right: c,
incr_right_rc: incr_rc && !fresh,
incr_kv_rc: incr_rc,
}
}),
)?;
if cursor.pointer < cursor.first_rc_level {
unsafe {
self.free_page(cursor.current().page)
}
}
}
}
break;
} else {
debug!("cannot alloc");
let (left, right, sep_key, sep_value) = {
let it = Iter {
iter: skiplist::iter_all(&page).peekable(),
off: page.offset(cursor.current()[0] as isize),
next_is_extra: false,
extra_left: inserted_left_child,
extra: (Some((inserted_key, inserted_value)), inserted_right_child),
};
let total = skiplist::occupied(&page)
+ record_size(key, value)
+ FIRST_BINDING_SIZE;
if cursor.pointer >= cursor.first_rc_level {
let it = it.map(|(b, c)| {
if c == inserted_left_child || c == inserted_right_child {
PageItem {
ptr: std::ptr::null(),
kv: b,
right: c,
incr_right_rc: false,
incr_kv_rc: true,
}
} else {
PageItem {
ptr: std::ptr::null(),
kv: b,
right: c,
incr_right_rc: true,
incr_kv_rc: true,
}
}
});
self.spread_on_two_pages(rng, it, total)?
} else {
let it = it.map(|(b, c)| PageItem {
ptr: std::ptr::null(),
kv: b,
right: c,
incr_right_rc: false,
incr_kv_rc: false,
});
self.spread_on_two_pages(rng, it, total)?
}
};
inserted_key = sep_key;
inserted_left_child = left;
inserted_right_child = right;
inserted_value = sep_value;
cursor.pointer -= 1;
if cursor.pointer == 0 {
last_allocated = try!(self.split_root(
rng,
&mut cursor,
inserted_left_child,
inserted_right_child,
inserted_key,
inserted_value
));
break;
}
}
}
}
debug!(
"freeing pages from {:?} to {:?} {:?} (inclusive)",
cursor.pointer + 1,
leaf_pointer,
cursor.first_rc_level
);
let last_free = std::cmp::min(leaf_pointer + 1, cursor.first_rc_level);
let first_free = std::cmp::min(cursor.pointer + 1, last_free);
for page_cursor in &cursor.stack[first_free..last_free] {
let rc = rc(self, page_cursor.page);
if rc <= 1 {
debug!("Freeing {:?}", page_cursor.page);
try!(self.remove_rc(rng, page_cursor.page));
unsafe { self.free_page(page_cursor.page) }
} else {
debug!(
"Decrease RC for page {:?}, new rc: {:?}",
page_cursor.page,
rc - 1
);
try!(self.set_rc(rng, page_cursor.page, rc - 1));
}
}
if cursor.pointer > 0 {
cursor.stack[cursor.pointer].page = last_allocated;
cursor.pointer -= 1;
while cursor.pointer > 0 {
if cursor.pointer == cursor.first_rc_level {
let page = cursor.current().page;
let rc = rc(self, page);
if rc >= 2 {
try!(self.set_rc(rng, page, rc - 1));
}
}
try!(self.update_put::<R, K, V>(rng, &mut cursor));
cursor.pointer -= 1;
}
db.0 = cursor[1].page
} else {
debug!("the root has split");
db.0 = last_allocated
}
Ok(true)
}
fn update_put<R: Rng, K: Representable, V: Representable>(
&mut self,
rng: &mut R,
cursor: &mut Cursor,
) -> Result<(), Error> {
let mut page = self.load_cow_page(cursor.current().page);
match page {
Cow::MutPage(ref mut p) if cursor.pointer < cursor.first_rc_level => {
p.set_right_child(cursor.current()[0], cursor[cursor.pointer + 1].page);
cursor.current_mut().page = p.offset;
}
Cow::MutPage(_) | Cow::Page(_) => {
let incr_rc = cursor.pointer >= cursor.first_rc_level;
let current_ptr = page.offset(cursor.current()[0] as isize);
let new_page = self.spread_on_one_page::<_, K, V, _>(
rng,
skiplist::iter_all(&page).map(|(off, b, c)| {
let mut item = PageItem {
ptr: std::ptr::null(),
kv: b,
right: c,
incr_kv_rc: incr_rc,
incr_right_rc: incr_rc,
};
if off == current_ptr {
item.right = cursor[cursor.pointer + 1].page;
item.incr_right_rc = false
}
item
}),
)?;
if cursor.pointer < cursor.first_rc_level {
unsafe {
debug!("line {:?}, freeing {:?}", line!(), page.page_offset());
self.free_page(page.page_offset())
}
}
debug!("NEW_PAGE = {:?}", new_page);
cursor.current_mut().page = new_page;
}
}
Ok(())
}
}
struct Iter<'a, P: PageT + Sized + 'a, K: Representable, V: Representable> {
iter: std::iter::Peekable<skiplist::Iter<'a, P, K, V>>,
off: *const u8,
next_is_extra: bool,
extra_left: u64,
extra: (Option<(K, V)>, u64),
}
impl<'a, P: PageT + Sized + 'a, K: Representable, V: Representable> Iterator for Iter<'a, P, K, V> {
type Item = (Option<(K, V)>, u64);
fn next(&mut self) -> Option<Self::Item> {
if self.next_is_extra {
debug!("next_is_extra");
self.next_is_extra = false;
self.off = std::ptr::null();
Some(self.extra)
} else {
debug!("not next_is_extra");
if let Some((a, b, d)) = self.iter.next() {
debug!("{:?}", a);
if a == self.off {
self.next_is_extra = true;
Some((b, self.extra_left))
} else {
Some((b, d))
}
} else {
debug!("finished, extra = {:?}", self.off);
None
}
}
}
}