use std::marker::PhantomData;
use noxu_bind::ByteArrayBinding;
use noxu_bind::EntryBinding;
use noxu_db::{
Cursor, CursorConfig, Database, DatabaseEntry, Get, OperationStatus,
Transaction,
};
use crate::error::{CollectionError, Result};
/// Crate-wide shared instance of [`ByteArrayBinding`].
///
/// The binding is built here as a plain unit value, so this static holds no
/// state of its own.
// NOTE(review): presumably this binding passes raw bytes through unchanged —
// confirm against `noxu_bind::ByteArrayBinding`.
pub(crate) static BYTE_ARRAY_BINDING: ByteArrayBinding = ByteArrayBinding;
/// Opens a cursor on `db`, inside `txn` when a transaction is supplied.
///
/// With `Some(txn)` the cursor is opened through `Database::open_cursor_in`;
/// with `None` it is opened through `Database::open_cursor`. `config` is
/// forwarded unchanged in both cases.
///
/// # Errors
///
/// Any error reported by the database while opening the cursor is converted
/// into this crate's error type and returned.
pub(crate) fn open_cursor<'a>(
    db: &Database,
    txn: Option<&'a Transaction>,
    config: Option<&CursorConfig>,
) -> Result<Cursor<'a>> {
    let cursor: Cursor<'a> = if let Some(active) = txn {
        db.open_cursor_in(active, config)?
    } else {
        db.open_cursor(config)?
    };
    Ok(cursor)
}
/// Looks up `key` in `db`, optionally inside `txn`, and returns an owned copy
/// of the stored bytes.
///
/// A key entry that carries no data is looked up as the empty byte string.
///
/// Returns `Ok(None)` when the database reports no value for the key.
///
/// # Errors
///
/// Any error reported by the database lookup is converted and returned.
pub(crate) fn db_get(
    db: &Database,
    txn: Option<&Transaction>,
    key: &DatabaseEntry,
) -> Result<Option<Vec<u8>>> {
    let key_bytes = key.data_opt().unwrap_or(&[]);
    let hit = if let Some(active) = txn {
        db.get_in(active, key_bytes)?
    } else {
        db.get(key_bytes)?
    };
    // Copy the value out so the caller owns it independently of the database.
    Ok(hit.map(|bytes| bytes.to_vec()))
}
/// Stores `data` under `key` in `db`, optionally inside `txn`.
///
/// A key or data entry that carries no bytes is written as the empty byte
/// string.
///
/// # Errors
///
/// Any error reported by the database write is converted and returned.
pub(crate) fn db_put(
    db: &Database,
    txn: Option<&Transaction>,
    key: &DatabaseEntry,
    data: &DatabaseEntry,
) -> Result<()> {
    let key_bytes = key.data_opt().unwrap_or(&[]);
    let value_bytes = data.data_opt().unwrap_or(&[]);
    if let Some(active) = txn {
        db.put_in(active, key_bytes, value_bytes)?;
    } else {
        db.put(key_bytes, value_bytes)?;
    }
    Ok(())
}
/// Stores `data` under `key` through the database's "no overwrite" put,
/// optionally inside `txn`.
///
/// A key or data entry that carries no bytes is passed as the empty byte
/// string.
///
/// Returns the flag produced by the underlying call unchanged.
// NOTE(review): presumably `true` means the pair was stored and `false` means
// the key already existed — confirm against `noxu_db`.
///
/// # Errors
///
/// Any error reported by the database write is converted and returned.
pub(crate) fn db_put_no_overwrite(
    db: &Database,
    txn: Option<&Transaction>,
    key: &DatabaseEntry,
    data: &DatabaseEntry,
) -> Result<bool> {
    let key_bytes = key.data_opt().unwrap_or(&[]);
    let value_bytes = data.data_opt().unwrap_or(&[]);
    let stored: bool = if let Some(active) = txn {
        db.put_no_overwrite_in(active, key_bytes, value_bytes)?
    } else {
        db.put_no_overwrite(key_bytes, value_bytes)?
    };
    Ok(stored)
}
/// Deletes `key` from `db`, optionally inside `txn`.
///
/// A key entry that carries no data is passed as the empty byte string.
///
/// Returns the flag produced by the underlying delete call unchanged.
// NOTE(review): presumably `true` means a record was removed — confirm
// against `noxu_db`.
///
/// # Errors
///
/// Any error reported by the database delete is converted and returned.
pub(crate) fn db_delete(
    db: &Database,
    txn: Option<&Transaction>,
    key: &DatabaseEntry,
) -> Result<bool> {
    let key_bytes = key.data_opt().unwrap_or(&[]);
    let removed: bool = if let Some(active) = txn {
        db.delete_in(active, key_bytes)?
    } else {
        db.delete(key_bytes)?
    };
    Ok(removed)
}
/// Serializes `key` into a fresh [`DatabaseEntry`] using `binding`.
///
/// # Errors
///
/// Returns [`CollectionError::BindingError`] carrying the binding's error
/// message when the binding fails to encode the key.
pub(crate) fn encode_key<K, KB: EntryBinding<K>>(
    binding: &KB,
    key: &K,
) -> Result<DatabaseEntry> {
    let mut encoded = DatabaseEntry::new();
    match binding.object_to_entry(key, &mut encoded) {
        Ok(_) => Ok(encoded),
        Err(cause) => Err(CollectionError::BindingError(cause.to_string())),
    }
}
/// Serializes `value` into a fresh [`DatabaseEntry`] using `binding`.
///
/// # Errors
///
/// Returns [`CollectionError::BindingError`] carrying the binding's error
/// message when the binding fails to encode the value.
pub(crate) fn encode_value<V, VB: EntryBinding<V>>(
    binding: &VB,
    value: &V,
) -> Result<DatabaseEntry> {
    let mut encoded = DatabaseEntry::new();
    let outcome = binding.object_to_entry(value, &mut encoded);
    if let Err(cause) = outcome {
        return Err(CollectionError::BindingError(cause.to_string()));
    }
    Ok(encoded)
}
/// Deserializes a key of type `K` from `entry` using `binding`.
///
/// # Errors
///
/// Returns [`CollectionError::BindingError`] carrying the binding's error
/// message when the binding fails to decode the entry.
pub(crate) fn decode_key<K, KB: EntryBinding<K>>(
    binding: &KB,
    entry: &DatabaseEntry,
) -> Result<K> {
    match binding.entry_to_object(entry) {
        Ok(decoded) => Ok(decoded),
        Err(cause) => Err(CollectionError::BindingError(cause.to_string())),
    }
}
/// Deserializes a value of type `V` from `entry` using `binding`.
///
/// # Errors
///
/// Returns [`CollectionError::BindingError`] carrying the binding's error
/// message when the binding fails to decode the entry.
pub(crate) fn decode_value<V, VB: EntryBinding<V>>(
    binding: &VB,
    entry: &DatabaseEntry,
) -> Result<V> {
    let decoded = binding
        .entry_to_object(entry)
        .map_err(|cause| CollectionError::BindingError(cause.to_string()))?;
    Ok(decoded)
}
/// Direction in which a cursor scan walks the database.
#[derive(Copy, Clone, Debug)]
pub(crate) enum ScanDirection {
/// Scans begin with `Get::First` and advance with `Get::Next`.
Forward,
/// Scans begin with `Get::Last` and advance with `Get::Prev`.
Reverse,
}
/// Optional raw-byte bound at which a scan begins.
///
/// `None` means the scan is unbounded. With `Some(bound)`, the scans in this
/// file skip records until the key is `>= bound` (forward) or `<= bound`
/// (reverse), compared as byte slices, so the bound itself is included.
pub(crate) type StartKey<'a> = Option<&'a [u8]>;
#[allow(clippy::type_complexity)]
pub(crate) fn scan_records<'a, K, V, KB, VB, T, F>(
db: &Database,
txn: Option<&'a Transaction>,
start: StartKey<'a>,
direction: ScanDirection,
key_binding: &KB,
value_binding: &VB,
mut project: F,
) -> Result<Vec<T>>
where
KB: EntryBinding<K>,
VB: EntryBinding<V>,
F: FnMut(K, V) -> T,
{
let mut out: Vec<T> = Vec::new();
let mut cursor = open_cursor(db, txn, None)?;
let mut key = DatabaseEntry::new();
let mut data = DatabaseEntry::new();
let initial_op = match direction {
ScanDirection::Forward => Get::First,
ScanDirection::Reverse => Get::Last,
};
let mut status = cursor.get(&mut key, &mut data, initial_op, None)?;
if !matches!(status, OperationStatus::Success) {
let _ = cursor.close();
return Ok(out);
}
if let Some(bound) = start {
loop {
let cur = key.data_opt().unwrap_or(&[]);
let in_range = match direction {
ScanDirection::Forward => cur >= bound,
ScanDirection::Reverse => cur <= bound,
};
if in_range {
break;
}
let step = match direction {
ScanDirection::Forward => Get::Next,
ScanDirection::Reverse => Get::Prev,
};
status = cursor.get(&mut key, &mut data, step, None)?;
if !matches!(status, OperationStatus::Success) {
let _ = cursor.close();
return Ok(out);
}
}
}
loop {
let k = decode_key(key_binding, &key)?;
let v = decode_value(value_binding, &data)?;
out.push(project(k, v));
let step = match direction {
ScanDirection::Forward => Get::Next,
ScanDirection::Reverse => Get::Prev,
};
match cursor.get(&mut key, &mut data, step, None)? {
OperationStatus::Success => continue,
_ => break,
}
}
cursor.close()?;
Ok(out)
}
/// Lazy cursor-backed scan that yields one projected item per call to `next`.
///
/// Built by `scan_iter` / `scan_iter_owned_start`. Each yielded item is a
/// `Result`, so read and decode failures surface through iteration.
pub(crate) struct ScanIter<'a, K, V, KB, VB, T, F>
where
KB: EntryBinding<K>,
VB: EntryBinding<V>,
F: FnMut(K, V) -> T,
{
// Cursor the iterator reads records from.
cursor: Cursor<'a>,
// Binding used to decode each record's key.
key_binding: &'a KB,
// Binding used to decode each record's value.
value_binding: &'a VB,
// Optional inclusive byte bound; records outside it are skipped.
start: Option<Vec<u8>>,
// Chooses First/Next versus Last/Prev cursor operations.
direction: ScanDirection,
// Maps a decoded (key, value) pair to the yielded item.
project: F,
// False until the first cursor read; selects the initial op over the step op.
started: bool,
// Set once the cursor is exhausted or an error was yielded; `next` then returns `None`.
done: bool,
// Mentions K, V and T in the type without storing values of them.
_marker: PhantomData<fn() -> (K, V, T)>,
}
impl<'a, K, V, KB, VB, T, F> ScanIter<'a, K, V, KB, VB, T, F>
where
    KB: EntryBinding<K>,
    VB: EntryBinding<V>,
    F: FnMut(K, V) -> T,
{
    /// Cursor operation used for every read after the first one.
    fn step_op(&self) -> Get {
        match self.direction {
            ScanDirection::Reverse => Get::Prev,
            ScanDirection::Forward => Get::Next,
        }
    }

    /// Cursor operation used for the very first read.
    fn initial_op(&self) -> Get {
        match self.direction {
            ScanDirection::Reverse => Get::Last,
            ScanDirection::Forward => Get::First,
        }
    }

    /// Reports whether the raw key `cur` lies inside the scan's start bound.
    ///
    /// Without a bound every key is in range. With one, a forward scan accepts
    /// keys `>=` the bound and a reverse scan accepts keys `<=` it, compared as
    /// byte slices.
    fn in_range(&self, cur: &[u8]) -> bool {
        match self.start.as_deref() {
            None => true,
            Some(bound) => match self.direction {
                ScanDirection::Forward => cur >= bound,
                ScanDirection::Reverse => cur <= bound,
            },
        }
    }
}
impl<'a, K, V, KB, VB, T, F> Iterator for ScanIter<'a, K, V, KB, VB, T, F>
where
    KB: EntryBinding<K>,
    VB: EntryBinding<V>,
    F: FnMut(K, V) -> T,
{
    type Item = Result<T>;

    /// Advances the cursor to the next in-range record and yields its
    /// projection.
    ///
    /// The first read uses the initial operation for the scan direction and
    /// every later read uses the step operation. Records outside the start
    /// bound are skipped. A cursor or decode error is yielded once as
    /// `Some(Err(..))`, after which the iterator is finished; a
    /// non-`Success` cursor status also finishes it.
    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None;
        }

        let mut key = DatabaseEntry::new();
        let mut data = DatabaseEntry::new();

        loop {
            let op = if self.started {
                self.step_op()
            } else {
                self.started = true;
                self.initial_op()
            };

            let status = match self.cursor.get(&mut key, &mut data, op, None) {
                Ok(status) => status,
                Err(e) => {
                    self.done = true;
                    return Some(Err(e.into()));
                }
            };

            // Anything other than `Success` ends the scan.
            if !matches!(status, OperationStatus::Success) {
                self.done = true;
                return None;
            }

            // Still outside the start bound: keep stepping.
            if !self.in_range(key.data_opt().unwrap_or(&[])) {
                continue;
            }

            // Key first, then value; the value is not decoded if the key fails.
            let decoded = decode_key(self.key_binding, &key)
                .and_then(|k| decode_value(self.value_binding, &data).map(|v| (k, v)));

            return Some(match decoded {
                Ok((k, v)) => Ok((self.project)(k, v)),
                Err(e) => {
                    self.done = true;
                    Err(e)
                }
            });
        }
    }
}
/// Builds a lazy [`ScanIter`] over `db` whose start bound is an owned buffer.
///
/// A cursor is opened immediately (inside `txn` when one is given); no record
/// is read until the iterator is first advanced.
///
/// # Errors
///
/// Returns the error from opening the cursor.
#[allow(clippy::type_complexity)]
pub(crate) fn scan_iter_owned_start<'a, K, V, KB, VB, T, F>(
    db: &Database,
    txn: Option<&'a Transaction>,
    start: Option<Vec<u8>>,
    direction: ScanDirection,
    key_binding: &'a KB,
    value_binding: &'a VB,
    project: F,
) -> Result<ScanIter<'a, K, V, KB, VB, T, F>>
where
    KB: EntryBinding<K>,
    VB: EntryBinding<V>,
    F: FnMut(K, V) -> T,
{
    let iter = ScanIter {
        cursor: open_cursor(db, txn, None)?,
        key_binding,
        value_binding,
        start,
        direction,
        project,
        // Nothing has been read yet, so the first `next` uses the initial op.
        started: false,
        done: false,
        _marker: PhantomData,
    };
    Ok(iter)
}
/// Builds a lazy [`ScanIter`] over `db` from a borrowed start bound.
///
/// The bound, if any, is copied into an owned buffer and the call is
/// forwarded to `scan_iter_owned_start`.
///
/// # Errors
///
/// Returns whatever `scan_iter_owned_start` returns.
#[allow(clippy::type_complexity)]
pub(crate) fn scan_iter<'a, K, V, KB, VB, T, F>(
    db: &Database,
    txn: Option<&'a Transaction>,
    start: StartKey<'a>,
    direction: ScanDirection,
    key_binding: &'a KB,
    value_binding: &'a VB,
    project: F,
) -> Result<ScanIter<'a, K, V, KB, VB, T, F>>
where
    KB: EntryBinding<K>,
    VB: EntryBinding<V>,
    F: FnMut(K, V) -> T,
{
    let owned_start: Option<Vec<u8>> = start.map(<[u8]>::to_vec);
    scan_iter_owned_start(
        db,
        txn,
        owned_start,
        direction,
        key_binding,
        value_binding,
        project,
    )
}
/// Reads the single record selected by the cursor operation `which` and
/// decodes it.
///
/// A cursor is opened on `db` (inside `txn` when given), one `get` is issued
/// with `which`, and the cursor is closed again before returning.
///
/// Returns `Ok(Some((key, value)))` when the cursor reports `Success`, and
/// `Ok(None)` for any other status.
///
/// # Errors
///
/// Returns the converted database error when opening, reading, or closing the
/// cursor fails, and [`CollectionError::BindingError`] when the record cannot
/// be decoded. On a read or decode failure the cursor is still closed,
/// best-effort, and the original error is the one returned.
pub(crate) fn cursor_endpoint<K, V, KB, VB>(
    db: &Database,
    txn: Option<&Transaction>,
    key_binding: &KB,
    value_binding: &VB,
    which: Get,
) -> Result<Option<(K, V)>>
where
    KB: EntryBinding<K>,
    VB: EntryBinding<V>,
{
    let mut cursor = open_cursor(db, txn, None)?;
    let mut key = DatabaseEntry::new();
    let mut data = DatabaseEntry::new();

    // Gather the outcome without `?` so the cursor is closed on every path.
    let fetched: Result<Option<(K, V)>> = match cursor.get(&mut key, &mut data, which, None) {
        Ok(OperationStatus::Success) => decode_key(key_binding, &key)
            .and_then(|k| decode_value(value_binding, &data).map(|v| Some((k, v)))),
        Ok(_) => Ok(None),
        Err(e) => Err(e.into()),
    };

    match fetched {
        Ok(found) => {
            cursor.close()?;
            Ok(found)
        }
        Err(e) => {
            // Keep the original failure; a close error here would only mask it.
            let _ = cursor.close();
            Err(e)
        }
    }
}