use crate::skiplist;
use crate::skiplist::{record_size, FIRST_BINDING_SIZE};
use crate::transaction::{Cow, Env, Exclusive};
use crate::{rc, Cursor, Db, Error, MutTxn, PageItem, PageT, Representable, PAGE_SIZE_U16};
use rand::Rng;
use std;
use std::borrow::Borrow;
impl<E: Borrow<Env<Exclusive>>, T> MutTxn<E, T> {
pub fn put<R: Rng, K: Representable, V: Representable>(
&mut self,
rng: &mut R,
db: &mut Db<K, V>,
key: K,
value: V,
) -> Result<bool, Error> {
let mut cursor = Cursor::new();
if cursor.set(self, db, Some((key, Some(value))))? {
return Ok(false);
}
let leaf_pointer = cursor.pointer;
let last_allocated = self.put_cascade(rng, &mut cursor, key, value)?;
self.free_split_pages(rng, &mut cursor, leaf_pointer)?;
self.update_split_root(rng, &mut cursor, last_allocated, db)?;
debug!("put done");
Ok(true)
}
fn put_cascade<R: Rng, K: Representable, V: Representable>(
&mut self,
rng: &mut R,
cursor: &mut Cursor,
key: K,
value: V,
) -> Result<u64, Error> {
let mut last_allocated = 0;
let mut inserted_right_child = 0;
let mut inserted_key = key;
let mut inserted_value = value;
let mut inserted_left_child = 0u64;
loop {
let mut page = self.load_cow_page(cursor.current().page)?;
debug!("page: {:?}", page);
if cursor.pointer == cursor.first_rc_level {
let page = cursor.current().page;
let rc = rc(self, page)?;
if rc >= 2 {
self.set_rc(rng, page, rc - 1)?;
}
}
if skiplist::can_alloc(&page, inserted_key, inserted_value) != 0 {
self.put_can_alloc(
rng,
page,
cursor,
&mut inserted_key,
&mut inserted_left_child,
&mut inserted_right_child,
&mut inserted_value,
&mut last_allocated,
)?;
break;
} else {
debug!("cannot alloc");
self.put_split(
rng,
&mut page,
cursor,
&mut inserted_key,
&mut inserted_left_child,
&mut inserted_right_child,
&mut inserted_value,
)?;
cursor.pointer -= 1;
if cursor.pointer == 0 {
last_allocated = self.split_root(
rng,
cursor,
inserted_left_child,
inserted_right_child,
inserted_key,
inserted_value,
)?;
break;
}
}
}
Ok(last_allocated)
}
fn put_can_alloc<R: Rng, K: Representable, V: Representable>(
&mut self,
rng: &mut R,
mut page: Cow,
cursor: &mut Cursor,
inserted_key: &mut K,
inserted_left_child: &mut u64,
inserted_right_child: &mut u64,
inserted_value: &mut V,
last_allocated: &mut u64,
) -> Result<(), Error> {
debug!("can alloc");
let needs_dup = skiplist::first_free(&page) + record_size(*inserted_key, *inserted_value)
> PAGE_SIZE_U16;
match page {
Cow::MutPage(ref mut page) if cursor.pointer < cursor.first_rc_level && !needs_dup => {
debug!("does not need dup");
debug!("page right child: {:?}", cursor.current()[0]);
self.set_right_child(page, cursor.current()[0], *inserted_left_child);
self.skiplist_insert_after_cursor(
page,
rng,
&mut cursor.current_mut().cursor,
*inserted_key,
*inserted_value,
*inserted_right_child,
);
*last_allocated = page.page_offset();
}
_ => {
debug!(
"needs dup {:?} {:?} {:?}",
cursor.pointer, cursor.first_rc_level, needs_dup
);
let page = self.load_cow_page(cursor.current().page)?;
let it = Iter {
iter: skiplist::iter_all(&page).peekable(),
off: page.offset(cursor.current()[0] as isize),
next_is_extra: false,
extra_left: *inserted_left_child,
extra: (
Some((*inserted_key, *inserted_value)),
*inserted_right_child,
),
};
let incr_rc = cursor.pointer >= cursor.first_rc_level;
*last_allocated = self.spread_on_one_page(
rng,
it.map(|(b, c)| {
let fresh = c == *inserted_left_child || c == *inserted_right_child;
PageItem {
ptr: std::ptr::null(),
kv: b,
right: c,
incr_right_rc: incr_rc && !fresh,
incr_kv_rc: incr_rc,
}
}),
)?;
if cursor.pointer < cursor.first_rc_level {
unsafe {
self.free_page(cursor.current().page)
}
}
}
}
Ok(())
}
fn put_split<R: Rng, K: Representable, V: Representable>(
&mut self,
rng: &mut R,
page: &mut Cow,
cursor: &mut Cursor,
inserted_key: &mut K,
inserted_left_child: &mut u64,
inserted_right_child: &mut u64,
inserted_value: &mut V,
) -> Result<(), Error> {
let it = Iter {
iter: skiplist::iter_all(page).peekable(),
off: page.offset(cursor.current()[0] as isize),
next_is_extra: false,
extra_left: *inserted_left_child,
extra: (
Some((*inserted_key, *inserted_value)),
*inserted_right_child,
),
};
let total = skiplist::occupied(page)
+ record_size(*inserted_key, *inserted_value)
+ FIRST_BINDING_SIZE;
let (left, right, sep_key, sep_value) = if cursor.pointer >= cursor.first_rc_level {
let it = it.map(|(b, c)| {
if c == *inserted_left_child || c == *inserted_right_child {
PageItem {
ptr: std::ptr::null(),
kv: b,
right: c,
incr_right_rc: false,
incr_kv_rc: true,
}
} else {
PageItem {
ptr: std::ptr::null(),
kv: b,
right: c,
incr_right_rc: true,
incr_kv_rc: true,
}
}
});
self.spread_on_two_pages(rng, it, total)?
} else {
let it = it.map(|(b, c)| PageItem {
ptr: std::ptr::null(),
kv: b,
right: c,
incr_right_rc: false,
incr_kv_rc: false,
});
self.spread_on_two_pages(rng, it, total)?
};
*inserted_key = sep_key;
*inserted_left_child = left;
*inserted_right_child = right;
*inserted_value = sep_value;
Ok(())
}
fn update_put<R: Rng, K: Representable, V: Representable>(
&mut self,
rng: &mut R,
cursor: &mut Cursor,
) -> Result<(), Error> {
let mut page = self.load_cow_page(cursor.current().page)?;
match page {
Cow::MutPage(ref mut p) if cursor.pointer < cursor.first_rc_level => {
self.set_right_child(p, cursor.current()[0], cursor[cursor.pointer + 1].page);
cursor.current_mut().page = p.offset;
}
Cow::MutPage(_) | Cow::Page(_) => {
let incr_rc = cursor.pointer >= cursor.first_rc_level;
let current_ptr = page.offset(cursor.current()[0] as isize);
let new_page = self.spread_on_one_page::<_, K, V, _>(
rng,
skiplist::iter_all(&page).map(|(off, b, c)| {
let mut item = PageItem {
ptr: std::ptr::null(),
kv: b,
right: c,
incr_kv_rc: incr_rc,
incr_right_rc: incr_rc,
};
if off == current_ptr {
item.right = cursor[cursor.pointer + 1].page;
item.incr_right_rc = false
}
item
}),
)?;
if cursor.pointer < cursor.first_rc_level {
unsafe {
debug!("line {:?}, freeing {:?}", line!(), page.page_offset());
self.free_page(page.page_offset())
}
}
debug!("NEW_PAGE = {:?}", new_page);
cursor.current_mut().page = new_page;
}
}
Ok(())
}
fn free_split_pages<R: Rng>(
&mut self,
rng: &mut R,
cursor: &mut Cursor,
leaf_pointer: usize,
) -> Result<(), Error> {
debug!(
"freeing pages from {:?} to {:?} {:?} (inclusive)",
cursor.pointer + 1,
leaf_pointer,
cursor.first_rc_level
);
let last_free = std::cmp::min(leaf_pointer + 1, cursor.first_rc_level);
let first_free = std::cmp::min(cursor.pointer + 1, last_free);
for page_cursor in &cursor.stack[first_free..last_free] {
let rc = rc(self, page_cursor.page)?;
if rc <= 1 {
debug!("Freeing {:?}", page_cursor.page);
self.remove_rc(rng, page_cursor.page)?;
unsafe { self.free_page(page_cursor.page) }
} else {
debug!(
"Decrease RC for page {:?}, new rc: {:?}",
page_cursor.page,
rc - 1
);
self.set_rc(rng, page_cursor.page, rc - 1)?;
}
}
debug!("freed");
Ok(())
}
fn update_split_root<R: Rng, K: Representable, V: Representable>(
&mut self,
rng: &mut R,
cursor: &mut Cursor,
last_allocated: u64,
db: &mut Db<K, V>,
) -> Result<(), Error> {
if cursor.pointer > 0 {
cursor.stack[cursor.pointer].page = last_allocated;
cursor.pointer -= 1;
while cursor.pointer > 0 {
if cursor.pointer == cursor.first_rc_level {
let page = cursor.current().page;
let rc = rc(self, page)?;
if rc >= 2 {
self.set_rc(rng, page, rc - 1)?;
}
}
self.update_put::<R, K, V>(rng, cursor)?;
cursor.pointer -= 1;
}
db.0 = cursor[1].page
} else {
debug!("the root has split");
db.0 = last_allocated
}
Ok(())
}
}
struct Iter<'a, P: PageT + Sized + 'a, K: Representable, V: Representable> {
iter: std::iter::Peekable<skiplist::Iter<'a, P, K, V>>,
off: *const u8,
next_is_extra: bool,
extra_left: u64,
extra: (Option<(K, V)>, u64),
}
impl<'a, P: PageT + Sized + 'a, K: Representable, V: Representable> Iterator for Iter<'a, P, K, V> {
type Item = (Option<(K, V)>, u64);
fn next(&mut self) -> Option<Self::Item> {
if self.next_is_extra {
debug!("next_is_extra");
self.next_is_extra = false;
self.off = std::ptr::null();
Some(self.extra)
} else {
debug!("not next_is_extra");
if let Some((a, b, d)) = self.iter.next() {
debug!("{:?}", a);
if a == self.off {
self.next_is_extra = true;
Some((b, self.extra_left))
} else {
Some((b, d))
}
} else {
debug!("finished, extra = {:?}", self.off);
None
}
}
}
}