use crate::transaction::LoadPage;
use crate::{Exclusive, MutTxn, PageT, Representable, PAGE_SIZE};
use std;
use std::marker::PhantomData;
const VALUE_HEADER_LEN: usize = 8;
use std::borrow::Borrow;
#[derive(Clone, Copy)]
pub enum UnsafeValue {
Direct {
p: *const u8,
len: u64,
},
Large {
offset: u64,
len: u64,
},
}
impl std::fmt::Debug for UnsafeValue {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match *self {
UnsafeValue::Direct { p, len } => {
let x = std::str::from_utf8(unsafe {
std::slice::from_raw_parts(p, std::cmp::min(len as usize, 10))
});
if let Ok(x) = x {
write!(f, "UnsafeValue::Direct({:?})", x)?
} else {
write!(f, "UnsafeValue::Direct(Invalid UTF8, len {:?})", len)?
}
}
UnsafeValue::Large { offset, len } => {
write!(f, "UnsafeValue::Large({:?}, {:?})", offset, len)?
}
}
Ok(())
}
}
pub const MAX_INLINE_SIZE: u64 = 506;
impl Representable for UnsafeValue {
fn onpage_size(&self) -> u16 {
match *self {
UnsafeValue::Direct { len, .. } => len as u16 + 8,
UnsafeValue::Large { .. } => 16,
}
}
unsafe fn write_value(&self, ptr: *mut u8) {
debug!("write_value {:?}", ptr);
match *self {
UnsafeValue::Direct { len, p } => {
assert!(len <= MAX_INLINE_SIZE);
{
let ptr = ptr as *mut u64;
*ptr = len.to_le();
}
std::ptr::copy_nonoverlapping(p, ptr.offset(8), len as usize)
}
UnsafeValue::Large { len, offset } => {
assert!(len > MAX_INLINE_SIZE);
let ptr = ptr as *mut u64;
*ptr = len.to_le();
*(ptr.offset(1)) = offset.to_le()
}
}
}
unsafe fn read_value(ptr: *const u8) -> Self {
debug!("read_value {:?}", ptr);
let len = u64::from_le(*(ptr as *const u64));
if len <= MAX_INLINE_SIZE {
UnsafeValue::Direct {
len: len,
p: ptr.offset(8),
}
} else {
let ptr = ptr as *const u64;
UnsafeValue::Large {
len: len,
offset: u64::from_le(*(ptr.offset(1))),
}
}
}
unsafe fn cmp_value<T: LoadPage>(&self, txn: &T, y: Self) -> std::cmp::Ordering {
let x = Value::from_unsafe(self, txn);
let y = Value::from_unsafe(&y, txn);
x.cmp(y)
}
fn drop_value<E: Borrow<crate::transaction::Env<Exclusive>>, T, R>(
&self,
txn: &mut MutTxn<E, T>,
_: &mut R,
) -> Result<(), super::Error> {
match *self {
UnsafeValue::Large {
mut offset,
mut len,
} => {
debug!("drop value {:?} {:?}", offset, len);
loop {
let page = txn.load_page(offset).offset(0);
if len > PAGE_SIZE as u64 {
len -= PAGE_SIZE as u64 - VALUE_HEADER_LEN as u64;
unsafe {
let next_offset = u64::from_le(*(page as *const u64));
txn.free_page(offset);
offset = next_offset
}
} else {
unsafe {
txn.free_page(offset);
}
break;
}
}
}
_ => {}
}
Ok(())
}
type PageOffsets = std::option::IntoIter<u64>;
fn page_offsets(&self) -> Self::PageOffsets {
match *self {
UnsafeValue::Large { offset, .. } => Some(offset).into_iter(),
UnsafeValue::Direct { .. } => None.into_iter(),
}
}
}
impl<'b, T: LoadPage + 'b> PartialEq<[u8]> for Value<'b, T> {
fn eq(&self, b: &[u8]) -> bool {
let a: Value<T> = self.clone();
let b: Value<T> = Value::Direct {
p: b.as_ptr(),
len: b.len() as u64,
txn: PhantomData,
};
a.cmp(b) == std::cmp::Ordering::Equal
}
}
impl<'a, 'b, T: LoadPage + 'b> PartialEq<&'a [u8]> for Value<'b, T> {
fn eq(&self, b: &&[u8]) -> bool {
let a: Value<T> = self.clone();
let b: Value<T> = Value::Direct {
p: b.as_ptr(),
len: b.len() as u64,
txn: PhantomData,
};
a.cmp(b) == std::cmp::Ordering::Equal
}
}
impl<'b, T: LoadPage + 'b> PartialEq<Value<'b, T>> for [u8] {
fn eq(&self, b: &Value<'b, T>) -> bool {
let a: Value<T> = Value::Direct {
p: self.as_ptr(),
len: self.len() as u64,
txn: PhantomData,
};
let b: Value<T> = b.clone();
a.cmp(b) == std::cmp::Ordering::Equal
}
}
impl<'a, 'b, T: LoadPage + 'b> PartialEq<Value<'b, T>> for &'a [u8] {
fn eq(&self, b: &Value<'b, T>) -> bool {
let a: Value<T> = Value::Direct {
p: self.as_ptr(),
len: self.len() as u64,
txn: PhantomData,
};
let b: Value<T> = b.clone();
a.cmp(b) == std::cmp::Ordering::Equal
}
}
impl<'a, 'b, A: LoadPage + 'a, B: LoadPage + 'b> PartialEq<Value<'a, A>> for Value<'b, B> {
fn eq(&self, b: &Value<A>) -> bool {
let a = self.clone();
let b = b.clone();
for (a, b) in a.zip(b) {
if a != b {
return false;
}
}
true
}
}
impl<'b, T: LoadPage + 'b> Eq for Value<'b, T> {}
impl<'b, T: LoadPage + 'b> std::hash::Hash for Value<'b, T> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
let s = self.clone();
for slice in s {
slice.hash(state)
}
}
}
#[derive(Clone, Copy)]
pub enum Value<'a, T: 'a> {
Direct {
p: *const u8,
len: u64,
txn: PhantomData<&'a T>,
},
Large {
txn: &'a T,
offset: u64,
len: u64,
},
}
impl<'a, T: LoadPage> std::fmt::Debug for Value<'a, T> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let it: Value<_> = self.clone();
write!(f, "Value ({:?}) {{ value: [", self.len())?;
let mut first = true;
for x in it {
if !first {
write!(f, ", {:?}", std::str::from_utf8(x))?
} else {
write!(f, "{:?}", std::str::from_utf8(x))?;
first = false;
}
}
write!(f, "] }}")?;
Ok(())
}
}
impl<'a, T: LoadPage> Iterator for Value<'a, T> {
type Item = &'a [u8];
fn next(&mut self) -> Option<&'a [u8]> {
match self {
&mut Value::Large {
ref txn,
ref mut offset,
ref mut len,
} => {
debug!("iterator: {:?}, {:?}", offset, len);
if *len == 0 {
None
} else {
if *len <= PAGE_SIZE as u64 {
unsafe {
let page = txn.load_page(*offset).offset(0);
let slice = std::slice::from_raw_parts(page.offset(0), *len as usize);
*len = 0;
Some(slice)
}
} else {
unsafe {
let page = txn.load_page(*offset).offset(0);
*offset = u64::from_le(*(page as *const u64));
let l = PAGE_SIZE - VALUE_HEADER_LEN as u32;
*len -= l as u64;
Some(std::slice::from_raw_parts(
page.offset(VALUE_HEADER_LEN as isize),
l as usize,
))
}
}
}
}
&mut Value::Direct {
ref mut len,
ref mut p,
..
} => unsafe {
if p.is_null() {
None
} else if *len <= PAGE_SIZE as u64 {
let s = std::slice::from_raw_parts(*p, *len as usize);
*p = std::ptr::null_mut();
Some(s)
} else {
let l = PAGE_SIZE as usize - VALUE_HEADER_LEN;
let s = std::slice::from_raw_parts(*p, l);
*p = p.offset(l as isize);
*len -= l as u64;
Some(s)
}
},
}
}
}
impl UnsafeValue {
pub fn len(&self) -> u64 {
match self {
&UnsafeValue::Direct { len, .. } => len,
&UnsafeValue::Large { len, .. } => len,
}
}
pub unsafe fn as_slice<'a>(&self) -> &'a [u8] {
match self {
&UnsafeValue::Direct { p, len } => std::slice::from_raw_parts(p, len as usize),
&UnsafeValue::Large { .. } => unimplemented!(),
}
}
pub fn from_slice(slice: &[u8]) -> UnsafeValue {
UnsafeValue::Direct {
p: slice.as_ptr(),
len: slice.len() as u64,
}
}
pub fn alloc_if_needed<E: Borrow<crate::transaction::Env<Exclusive>>, T>(
txn: &mut MutTxn<E, T>,
value: &[u8],
) -> Result<UnsafeValue, super::Error> {
if value.len() > MAX_INLINE_SIZE as usize {
Self::alloc_large(txn, value)
} else {
Ok(UnsafeValue::Direct {
p: value.as_ptr(),
len: value.len() as u64,
})
}
}
fn alloc_large<E: Borrow<crate::transaction::Env<Exclusive>>, T>(
txn: &mut MutTxn<E, T>,
value: &[u8],
) -> Result<UnsafeValue, super::Error> {
debug!("alloc_value");
let mut len = value.len();
let mut p_value = value.as_ptr();
let mut page = txn.alloc_page()?;
let first_page = page.page_offset();
unsafe {
loop {
if len <= PAGE_SIZE as usize {
std::ptr::copy_nonoverlapping(p_value, page.offset(0), len);
break;
} else {
std::ptr::copy_nonoverlapping(p_value, page.offset(8), PAGE_SIZE as usize - 8);
p_value = p_value.offset((PAGE_SIZE - 8) as isize);
len -= PAGE_SIZE as usize - 8;
let next_page = txn.alloc_page()?;
*(page.offset(0) as *mut u64) = next_page.page_offset().to_le();
page = next_page
}
}
}
debug_assert!(first_page > 0);
debug!("/alloc_value");
Ok(UnsafeValue::Large {
offset: first_page,
len: value.len() as u64,
})
}
}
impl<'a, T: LoadPage> Value<'a, T> {
pub fn len(&self) -> u64 {
match self {
&Value::Direct { len, .. } => len,
&Value::Large { len, .. } => len,
}
}
pub fn clone(&self) -> Value<'a, T> {
match self {
&Value::Direct { p, len, txn } => Value::Direct {
len: len,
p: p,
txn: txn,
},
&Value::Large { offset, len, txn } => Value::Large {
len: len,
offset: offset,
txn: txn,
},
}
}
pub unsafe fn from_unsafe(u: &UnsafeValue, txn: &'a T) -> Value<'a, T> {
match *u {
UnsafeValue::Direct { p, len } => Value::Direct {
p: p,
len: len,
txn: PhantomData,
},
UnsafeValue::Large { offset, len } => Value::Large {
len: len,
offset: offset,
txn: txn,
},
}
}
pub fn to_unsafe(&self) -> UnsafeValue {
match *self {
Value::Direct { p, len, .. } => {
assert!(len <= MAX_INLINE_SIZE);
UnsafeValue::Direct { p: p, len: len }
}
Value::Large { offset, len, .. } => {
assert!(len > MAX_INLINE_SIZE);
UnsafeValue::Large {
len: len,
offset: offset,
}
}
}
}
pub fn from_slice(slice: &'a [u8]) -> Value<'a, T> {
Value::Direct {
p: slice.as_ptr(),
len: slice.len() as u64,
txn: PhantomData,
}
}
pub fn as_slice(&self) -> &'a [u8] {
match self {
&Value::Direct { p, len, .. } => unsafe { std::slice::from_raw_parts(p, len as usize) },
&Value::Large { .. } => unimplemented!(),
}
}
pub fn try_as_slice(&self) -> Option<&'a [u8]> {
match self {
&Value::Direct { p, len, .. } => {
Some(unsafe { std::slice::from_raw_parts(p, len as usize) })
}
&Value::Large { .. } => None,
}
}
pub fn into_cow(&'a self) -> std::borrow::Cow<'a, [u8]> {
match self {
&Value::Direct { p, len, .. } => {
std::borrow::Cow::Borrowed(unsafe { std::slice::from_raw_parts(p, len as usize) })
}
&Value::Large { .. } => {
let mut v = Vec::new();
for chunk in self.clone() {
v.extend(chunk)
}
std::borrow::Cow::Owned(v)
}
}
}
}