use crate::{
Error, ReadOptions, WriteBatch,
db::{DB, DBAccess},
ffi,
};
use libc::{c_char, c_uchar, size_t};
use std::mem::ManuallyDrop;
use std::{marker::PhantomData, slice};
pub type DBRawIterator<'a> = DBRawIteratorWithThreadMode<'a, DB>;
pub struct DBRawIteratorWithThreadMode<'a, D: DBAccess> {
inner: std::ptr::NonNull<ffi::rocksdb_iterator_t>,
readopts: ReadOptions,
db: PhantomData<&'a D>,
}
impl<'a, D: DBAccess> DBRawIteratorWithThreadMode<'a, D> {
pub(crate) fn new(db: &D, readopts: ReadOptions) -> Self {
let inner = unsafe { db.create_iterator(&readopts) };
Self::from_inner(inner, readopts)
}
pub(crate) fn new_cf(
db: &'a D,
cf_handle: *mut ffi::rocksdb_column_family_handle_t,
readopts: ReadOptions,
) -> Self {
let inner = unsafe { db.create_iterator_cf(cf_handle, &readopts) };
Self::from_inner(inner, readopts)
}
pub(crate) fn from_inner(inner: *mut ffi::rocksdb_iterator_t, readopts: ReadOptions) -> Self {
let inner = std::ptr::NonNull::new(inner).unwrap();
Self {
inner,
readopts,
db: PhantomData,
}
}
pub(crate) fn into_inner(self) -> (std::ptr::NonNull<ffi::rocksdb_iterator_t>, ReadOptions) {
let value = ManuallyDrop::new(self);
let inner = unsafe { std::ptr::read(&raw const value.inner) };
let readopts = unsafe { std::ptr::read(&raw const value.readopts) };
(inner, readopts)
}
pub fn valid(&self) -> bool {
unsafe { ffi::rocksdb_iter_valid(self.inner.as_ptr()) != 0 }
}
pub fn status(&self) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_iter_get_error(self.inner.as_ptr()));
}
Ok(())
}
pub fn refresh(&mut self) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_iter_refresh(self.inner.as_ptr()));
}
Ok(())
}
pub fn seek_to_first(&mut self) {
unsafe {
ffi::rocksdb_iter_seek_to_first(self.inner.as_ptr());
}
}
pub fn seek_to_last(&mut self) {
unsafe {
ffi::rocksdb_iter_seek_to_last(self.inner.as_ptr());
}
}
pub fn seek<K: AsRef<[u8]>>(&mut self, key: K) {
let key = key.as_ref();
unsafe {
ffi::rocksdb_iter_seek(
self.inner.as_ptr(),
key.as_ptr() as *const c_char,
key.len() as size_t,
);
}
}
pub fn seek_for_prev<K: AsRef<[u8]>>(&mut self, key: K) {
let key = key.as_ref();
unsafe {
ffi::rocksdb_iter_seek_for_prev(
self.inner.as_ptr(),
key.as_ptr() as *const c_char,
key.len() as size_t,
);
}
}
pub fn next(&mut self) {
if self.valid() {
unsafe {
ffi::rocksdb_iter_next(self.inner.as_ptr());
}
}
}
pub fn prev(&mut self) {
if self.valid() {
unsafe {
ffi::rocksdb_iter_prev(self.inner.as_ptr());
}
}
}
pub fn key(&self) -> Option<&[u8]> {
if self.valid() {
Some(unsafe { self.key_impl() })
} else {
None
}
}
pub fn value(&self) -> Option<&[u8]> {
if self.valid() {
Some(unsafe { self.value_impl() })
} else {
None
}
}
pub fn item(&self) -> Option<(&[u8], &[u8])> {
if self.valid() {
Some(unsafe { (self.key_impl(), self.value_impl()) })
} else {
None
}
}
unsafe fn key_impl(&self) -> &[u8] {
unsafe {
let slice = ffi::rocksdb_iter_key_slice(self.inner.as_ptr());
slice::from_raw_parts(slice.data as *const c_uchar, slice.size)
}
}
unsafe fn value_impl(&self) -> &[u8] {
unsafe {
let slice = ffi::rocksdb_iter_value_slice(self.inner.as_ptr());
slice::from_raw_parts(slice.data as *const c_uchar, slice.size)
}
}
pub unsafe fn timestamp(&self) -> &[u8] {
unsafe {
let slice = ffi::rocksdb_iter_timestamp_slice(self.inner.as_ptr());
slice::from_raw_parts(slice.data as *const c_uchar, slice.size)
}
}
}
impl<D: DBAccess> Drop for DBRawIteratorWithThreadMode<'_, D> {
fn drop(&mut self) {
unsafe {
ffi::rocksdb_iter_destroy(self.inner.as_ptr());
}
}
}
unsafe impl<D: DBAccess> Send for DBRawIteratorWithThreadMode<'_, D> {}
unsafe impl<D: DBAccess> Sync for DBRawIteratorWithThreadMode<'_, D> {}
pub type DBIterator<'a> = DBIteratorWithThreadMode<'a, DB>;
pub struct DBIteratorWithThreadMode<'a, D: DBAccess> {
raw: DBRawIteratorWithThreadMode<'a, D>,
direction: Direction,
done: bool,
}
#[derive(Copy, Clone)]
pub enum Direction {
Forward,
Reverse,
}
pub type KVBytes = (Box<[u8]>, Box<[u8]>);
#[derive(Copy, Clone)]
pub enum IteratorMode<'a> {
Start,
End,
From(&'a [u8], Direction),
}
impl<'a, D: DBAccess> DBIteratorWithThreadMode<'a, D> {
pub(crate) fn new(db: &D, readopts: ReadOptions, mode: IteratorMode) -> Self {
Self::from_raw(DBRawIteratorWithThreadMode::new(db, readopts), mode)
}
pub(crate) fn new_cf(
db: &'a D,
cf_handle: *mut ffi::rocksdb_column_family_handle_t,
readopts: ReadOptions,
mode: IteratorMode,
) -> Self {
Self::from_raw(
DBRawIteratorWithThreadMode::new_cf(db, cf_handle, readopts),
mode,
)
}
fn from_raw(raw: DBRawIteratorWithThreadMode<'a, D>, mode: IteratorMode) -> Self {
let mut rv = DBIteratorWithThreadMode {
raw,
direction: Direction::Forward, done: false,
};
rv.set_mode(mode);
rv
}
pub fn set_mode(&mut self, mode: IteratorMode) {
self.done = false;
self.direction = match mode {
IteratorMode::Start => {
self.raw.seek_to_first();
Direction::Forward
}
IteratorMode::End => {
self.raw.seek_to_last();
Direction::Reverse
}
IteratorMode::From(key, Direction::Forward) => {
self.raw.seek(key);
Direction::Forward
}
IteratorMode::From(key, Direction::Reverse) => {
self.raw.seek_for_prev(key);
Direction::Reverse
}
};
}
pub fn refresh(&mut self, mode: IteratorMode) -> Result<(), Error> {
self.raw.refresh()?;
self.set_mode(mode);
Ok(())
}
}
impl<D: DBAccess> Iterator for DBIteratorWithThreadMode<'_, D> {
type Item = Result<KVBytes, Error>;
fn next(&mut self) -> Option<Result<KVBytes, Error>> {
if self.done {
None
} else if let Some((key, value)) = self.raw.item() {
let item = (Box::from(key), Box::from(value));
match self.direction {
Direction::Forward => self.raw.next(),
Direction::Reverse => self.raw.prev(),
}
Some(Ok(item))
} else {
self.done = true;
self.raw.status().err().map(Result::Err)
}
}
}
impl<D: DBAccess> std::iter::FusedIterator for DBIteratorWithThreadMode<'_, D> {}
impl<'a, D: DBAccess> Into<DBRawIteratorWithThreadMode<'a, D>> for DBIteratorWithThreadMode<'a, D> {
fn into(self) -> DBRawIteratorWithThreadMode<'a, D> {
self.raw
}
}
pub struct DBWALIterator {
pub(crate) inner: *mut ffi::rocksdb_wal_iterator_t,
pub(crate) start_seq_number: u64,
}
impl DBWALIterator {
pub fn valid(&self) -> bool {
unsafe { ffi::rocksdb_wal_iter_valid(self.inner) != 0 }
}
pub fn status(&self) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_wal_iter_status(self.inner));
}
Ok(())
}
}
impl Iterator for DBWALIterator {
type Item = Result<(u64, WriteBatch), Error>;
fn next(&mut self) -> Option<Self::Item> {
if !self.valid() {
return None;
}
let mut seq: u64 = 0;
let mut batch = WriteBatch {
inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &raw mut seq) },
};
while seq <= self.start_seq_number {
unsafe {
ffi::rocksdb_wal_iter_next(self.inner);
}
if !self.valid() {
return None;
}
batch = WriteBatch {
inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &raw mut seq) },
};
}
if !self.valid() {
return self.status().err().map(Result::Err);
}
unsafe {
ffi::rocksdb_wal_iter_next(self.inner);
}
Some(Ok((seq, batch)))
}
}
impl Drop for DBWALIterator {
fn drop(&mut self) {
unsafe {
ffi::rocksdb_wal_iter_destroy(self.inner);
}
}
}