use crate::inner::Inner;
use crate::options::DatabaseOptions;
#[cfg(not(target_arch = "wasm32"))]
use crate::options::{
DEFAULT_CLEANUP_INTERVAL, DEFAULT_GC_FULL_SCAN_FREQUENCY, DEFAULT_GC_INTERVAL,
};
#[cfg(not(target_arch = "wasm32"))]
use crate::persistence::Persistence;
use crate::pool::Pool;
use crate::pool::DEFAULT_POOL_SIZE;
use crate::tx::Transaction;
use std::ops::Deref;
use std::sync::atomic::{fence, Ordering};
use std::sync::Arc;
use std::time::Duration;
pub struct Database {
inner: Arc<Inner>,
pool: Arc<Pool>,
#[cfg(not(target_arch = "wasm32"))]
persistence: Option<Persistence>,
#[cfg(not(target_arch = "wasm32"))]
gc_interval: Duration,
#[cfg(not(target_arch = "wasm32"))]
gc_full_scan_frequency: u64,
#[cfg(not(target_arch = "wasm32"))]
cleanup_interval: Duration,
}
impl Default for Database {
fn default() -> Self {
let inner = Arc::new(Inner::default());
let pool = Pool::new(inner.clone(), DEFAULT_POOL_SIZE);
Database {
inner,
pool,
#[cfg(not(target_arch = "wasm32"))]
persistence: None,
#[cfg(not(target_arch = "wasm32"))]
gc_interval: DEFAULT_GC_INTERVAL,
#[cfg(not(target_arch = "wasm32"))]
gc_full_scan_frequency: DEFAULT_GC_FULL_SCAN_FREQUENCY,
#[cfg(not(target_arch = "wasm32"))]
cleanup_interval: DEFAULT_CLEANUP_INTERVAL,
}
}
}
impl Drop for Database {
fn drop(&mut self) {
self.shutdown();
}
}
impl Deref for Database {
type Target = Inner;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl Database {
pub fn new() -> Self {
Self::new_with_options(DatabaseOptions::default())
}
pub fn new_with_options(opts: DatabaseOptions) -> Self {
let inner = Arc::new(Inner::new(&opts));
let pool = Pool::new(inner.clone(), opts.pool_size);
let db = Database {
inner,
pool,
#[cfg(not(target_arch = "wasm32"))]
persistence: None,
#[cfg(not(target_arch = "wasm32"))]
gc_interval: opts.gc_interval,
#[cfg(not(target_arch = "wasm32"))]
gc_full_scan_frequency: opts.gc_full_scan_frequency,
#[cfg(not(target_arch = "wasm32"))]
cleanup_interval: opts.cleanup_interval,
};
#[cfg(not(target_arch = "wasm32"))]
{
if opts.enable_cleanup {
db.initialise_cleanup_worker();
}
if opts.enable_gc {
db.initialise_garbage_worker();
}
}
db
}
#[cfg(not(target_arch = "wasm32"))]
pub fn new_with_persistence(
opts: DatabaseOptions,
persistence_opts: crate::PersistenceOptions,
) -> std::io::Result<Self> {
let inner = Arc::new(Inner::new(&opts));
let pool = Pool::new(inner.clone(), opts.pool_size);
let persist = Persistence::new_with_options(persistence_opts, inner.clone())
.map_err(std::io::Error::other)?;
inner.persistence.write().replace(Arc::new(persist.clone()));
let db = Database {
inner,
pool,
persistence: Some(persist),
gc_interval: opts.gc_interval,
gc_full_scan_frequency: opts.gc_full_scan_frequency,
cleanup_interval: opts.cleanup_interval,
};
if opts.enable_cleanup {
db.initialise_cleanup_worker();
}
if opts.enable_gc {
db.initialise_garbage_worker();
}
Ok(db)
}
pub fn with_gc(self) -> Self {
*self.garbage_collection_epoch.write() = None;
self
}
pub fn with_gc_history(self, history: Duration) -> Self {
*self.garbage_collection_epoch.write() = Some(history);
self
}
pub fn transaction(&self, write: bool) -> Transaction {
self.pool.get(write)
}
#[cfg(not(target_arch = "wasm32"))]
pub fn persistence(&self) -> Option<&Persistence> {
self.persistence.as_ref()
}
pub fn run_cleanup(&self) {
let oldest = self.earliest_active_commit(u64::MAX);
if oldest != u64::MAX {
self.transaction_commit_queue.range(..oldest).for_each(|e| {
e.remove();
});
}
}
pub fn run_gc(&self) {
let cleanup_ts = self.compute_cleanup_ts();
self.run_gc_dirty_inner(cleanup_ts);
self.run_gc_full(cleanup_ts);
}
pub fn run_gc_dirty(&self) {
let cleanup_ts = self.compute_cleanup_ts();
self.run_gc_dirty_inner(cleanup_ts);
}
fn compute_cleanup_ts(&self) -> u64 {
let now = self.oracle.current_time_ns();
let history = self.garbage_collection_epoch.read().unwrap_or_default().as_nanos();
let history_cutoff = now.saturating_sub(history as u64);
let earliest_tx = self.earliest_active_version(now);
let oracle_now = self.oracle.inner.timestamp.load(Ordering::SeqCst);
let proposed = history_cutoff.min(earliest_tx).min(oracle_now);
self.gc_floor.fetch_max(proposed, Ordering::SeqCst);
fence(Ordering::SeqCst);
let earliest_after = self.earliest_active_version(now);
proposed.min(earliest_after)
}
fn run_gc_dirty_inner(&self, cleanup_ts: u64) {
let mut iter = self.datastore.raw_iter_mut();
while let Some(key) = self.gc_dirty_keys.pop() {
if iter.seek_exact(&key) {
let (_, versions) = iter.next().expect("seek_exact returned true");
if versions.gc_older_versions(cleanup_ts) == 0 {
iter.remove_here();
}
}
}
}
fn run_gc_full(&self, cleanup_ts: u64) {
let mut iter = self.datastore.raw_iter_mut();
iter.seek_to_first();
while let Some((_, versions)) = iter.next() {
if versions.gc_older_versions(cleanup_ts) == 0 {
iter.remove_here();
}
}
}
fn shutdown(&self) {
#[cfg(not(target_arch = "wasm32"))]
{
if let Some(ref persistence) = self.persistence {
persistence.background_threads_enabled.store(false, Ordering::Release);
if let Some(handle) = persistence.fsync_handle.write().take() {
handle.thread().unpark();
let _ = handle.join();
}
if let Some(handle) = persistence.snapshot_handle.write().take() {
handle.thread().unpark();
let _ = handle.join();
}
if let Some(handle) = persistence.appender_handle.write().take() {
handle.thread().unpark();
let _ = handle.join();
}
}
}
self.background_threads_enabled.store(false, Ordering::Relaxed);
#[cfg(not(target_arch = "wasm32"))]
{
if let Some(handle) = self.transaction_cleanup_handle.write().take() {
handle.thread().unpark();
let _ = handle.join();
}
if let Some(handle) = self.garbage_collection_handle.write().take() {
handle.thread().unpark();
let _ = handle.join();
}
}
}
#[cfg(not(target_arch = "wasm32"))]
fn initialise_cleanup_worker(&self) {
let db = self.inner.clone();
if db.transaction_cleanup_handle.read().is_none() {
let interval = self.cleanup_interval;
let handle = std::thread::spawn(move || {
while db.background_threads_enabled.load(Ordering::Relaxed) {
std::thread::park_timeout(interval);
if !db.background_threads_enabled.load(Ordering::Relaxed) {
break;
}
let oldest = db.earliest_active_commit(u64::MAX);
if oldest != u64::MAX {
db.transaction_commit_queue.range(..oldest).for_each(|e| {
e.remove();
});
}
}
});
*self.inner.transaction_cleanup_handle.write() = Some(handle);
}
}
#[cfg(not(target_arch = "wasm32"))]
fn initialise_garbage_worker(&self) {
let db = self.inner.clone();
if db.garbage_collection_handle.read().is_none() {
let interval = self.gc_interval;
let full_scan_frequency = self.gc_full_scan_frequency.max(1);
let handle = std::thread::spawn(move || {
let mut cycle: u64 = 0;
while db.background_threads_enabled.load(Ordering::Relaxed) {
std::thread::park_timeout(interval);
if !db.background_threads_enabled.load(Ordering::Relaxed) {
break;
}
let cleanup_ts = {
let now = db.oracle.current_time_ns();
let history =
db.garbage_collection_epoch.read().unwrap_or_default().as_nanos();
let history_cutoff = now.saturating_sub(history as u64);
let earliest_tx = db.earliest_active_version(now);
let oracle_now = db.oracle.inner.timestamp.load(Ordering::SeqCst);
let proposed = history_cutoff.min(earliest_tx).min(oracle_now);
db.gc_floor.fetch_max(proposed, Ordering::SeqCst);
fence(Ordering::SeqCst);
let earliest_after = db.earliest_active_version(now);
proposed.min(earliest_after)
};
while let Some(key) = db.gc_dirty_keys.pop() {
let mut iter = db.datastore.raw_iter_mut();
if iter.seek_exact(&key) {
let (_, versions) = iter.next().expect("seek_exact returned true");
if versions.gc_older_versions(cleanup_ts) == 0 {
iter.remove_here();
}
}
}
cycle += 1;
if cycle.is_multiple_of(full_scan_frequency) {
let mut iter = db.datastore.raw_iter_mut();
iter.seek_to_first();
while let Some((_, versions)) = iter.next() {
if versions.gc_older_versions(cleanup_ts) == 0 {
iter.remove_here();
}
}
}
}
});
*self.inner.garbage_collection_handle.write() = Some(handle);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn begin_tx() {
let db = Database::new();
db.transaction(false);
}
#[test]
fn finished_tx_not_writeable() {
let db = Database::new();
let mut tx = db.transaction(true);
let res = tx.cancel();
assert!(res.is_ok());
let res = tx.put("test", "something");
assert!(res.is_err());
let res = tx.set("test", "something");
assert!(res.is_err());
let res = tx.del("test");
assert!(res.is_err());
let res = tx.commit();
assert!(res.is_err());
let res = tx.cancel();
assert!(res.is_err());
}
#[test]
fn cancelled_tx_is_cancelled() {
let db = Database::new();
let mut tx = db.transaction(true);
tx.put("test", "something").unwrap();
let res = tx.exists("test").unwrap();
assert!(res);
let res = tx.get("test").unwrap();
assert_eq!(res.as_deref(), Some(b"something" as &[u8]));
let res = tx.cancel();
assert!(res.is_ok());
let mut tx = db.transaction(false);
let res = tx.exists("test").unwrap();
assert!(!res);
let res = tx.get("test").unwrap();
assert_eq!(res, None);
let res = tx.cancel();
assert!(res.is_ok());
}
#[test]
fn committed_tx_is_committed() {
let db = Database::new();
let mut tx = db.transaction(true);
tx.put("test", "something").unwrap();
let res = tx.exists("test").unwrap();
assert!(res);
let res = tx.get("test").unwrap();
assert_eq!(res.as_deref(), Some(b"something" as &[u8]));
let res = tx.commit();
assert!(res.is_ok());
let mut tx = db.transaction(false);
let res = tx.exists("test").unwrap();
assert!(res);
let res = tx.get("test").unwrap();
assert_eq!(res.as_deref(), Some(b"something" as &[u8]));
let res = tx.cancel();
assert!(res.is_ok());
}
#[test]
fn multiple_concurrent_readers() {
let db = Database::new();
let mut tx = db.transaction(true);
tx.put("test", "something").unwrap();
let res = tx.exists("test").unwrap();
assert!(res);
let res = tx.get("test").unwrap();
assert_eq!(res.as_deref(), Some(b"something" as &[u8]));
let res = tx.commit();
assert!(res.is_ok());
let mut tx1 = db.transaction(false);
let res = tx1.exists("test").unwrap();
assert!(res);
let res = tx1.exists("temp").unwrap();
assert!(!res);
let mut tx2 = db.transaction(false);
let res = tx2.exists("test").unwrap();
assert!(res);
let res = tx2.exists("temp").unwrap();
assert!(!res);
let res = tx1.cancel();
assert!(res.is_ok());
let res = tx2.cancel();
assert!(res.is_ok());
}
#[test]
fn multiple_concurrent_operators() {
let db = Database::new();
let mut tx = db.transaction(true);
tx.put("test", "something").unwrap();
let res = tx.exists("test").unwrap();
assert!(res);
let res = tx.get("test").unwrap();
assert_eq!(res.as_deref(), Some(b"something" as &[u8]));
let res = tx.commit();
assert!(res.is_ok());
let mut tx1 = db.transaction(false);
let res = tx1.exists("test").unwrap();
assert!(res);
let res = tx1.exists("temp").unwrap();
assert!(!res);
let mut txw = db.transaction(true);
txw.put("temp", "other").unwrap();
let res = txw.exists("test").unwrap();
assert!(res);
let res = txw.exists("temp").unwrap();
assert!(res);
let res = txw.commit();
assert!(res.is_ok());
let mut tx2 = db.transaction(false);
let res = tx2.exists("test").unwrap();
assert!(res);
let res = tx2.exists("temp").unwrap();
assert!(res);
let res = tx1.exists("temp").unwrap();
assert!(!res);
let res = tx1.cancel();
assert!(res.is_ok());
let res = tx2.cancel();
assert!(res.is_ok());
}
#[test]
fn iterate_keys_forward() {
let db = Database::new();
let mut tx = db.transaction(true);
tx.put("a", "a").unwrap();
tx.put("b", "b").unwrap();
tx.put("c", "c").unwrap();
tx.put("d", "d").unwrap();
tx.put("e", "e").unwrap();
tx.put("f", "f").unwrap();
tx.put("g", "g").unwrap();
tx.put("h", "h").unwrap();
tx.put("i", "i").unwrap();
tx.put("j", "j").unwrap();
tx.put("k", "k").unwrap();
tx.put("l", "l").unwrap();
tx.put("m", "m").unwrap();
tx.put("n", "n").unwrap();
tx.put("o", "o").unwrap();
let res = tx.keys("c".."z", None, Some(10)).unwrap();
assert_eq!(res.len(), 10);
assert_eq!(res[0].as_ref(), b"c");
assert_eq!(res[1], "d");
assert_eq!(res[2], "e");
assert_eq!(res[3], "f");
assert_eq!(res[4], "g");
assert_eq!(res[5], "h");
assert_eq!(res[6], "i");
assert_eq!(res[7], "j");
assert_eq!(res[8], "k");
assert_eq!(res[9], "l");
let res = tx.commit();
assert!(res.is_ok());
let mut tx = db.transaction(false);
let res = tx.keys("c".."z", None, Some(10)).unwrap();
assert_eq!(res.len(), 10);
assert_eq!(res[0].as_ref(), b"c");
assert_eq!(res[1], "d");
assert_eq!(res[2], "e");
assert_eq!(res[3], "f");
assert_eq!(res[4], "g");
assert_eq!(res[5], "h");
assert_eq!(res[6], "i");
assert_eq!(res[7], "j");
assert_eq!(res[8], "k");
assert_eq!(res[9], "l");
let res = tx.cancel();
assert!(res.is_ok());
let mut tx = db.transaction(false);
let res = tx.keys("c".."z", Some(3), Some(10)).unwrap();
assert_eq!(res.len(), 10);
assert_eq!(res[0], "f");
assert_eq!(res[1], "g");
assert_eq!(res[2], "h");
assert_eq!(res[3], "i");
assert_eq!(res[4], "j");
assert_eq!(res[5], "k");
assert_eq!(res[6], "l");
assert_eq!(res[7], "m");
assert_eq!(res[8], "n");
assert_eq!(res[9], "o");
let res = tx.cancel();
assert!(res.is_ok());
}
#[test]
fn iterate_keys_reverse() {
let db = Database::new();
let mut tx = db.transaction(true);
tx.put("a", "a").unwrap();
tx.put("b", "b").unwrap();
tx.put("c", "c").unwrap();
tx.put("d", "d").unwrap();
tx.put("e", "e").unwrap();
tx.put("f", "f").unwrap();
tx.put("g", "g").unwrap();
tx.put("h", "h").unwrap();
tx.put("i", "i").unwrap();
tx.put("j", "j").unwrap();
tx.put("k", "k").unwrap();
tx.put("l", "l").unwrap();
tx.put("m", "m").unwrap();
tx.put("n", "n").unwrap();
tx.put("o", "o").unwrap();
let res = tx.keys_reverse("c".."z", None, Some(10)).unwrap();
assert_eq!(res.len(), 10);
assert_eq!(res[0], "o");
assert_eq!(res[1], "n");
assert_eq!(res[2], "m");
assert_eq!(res[3], "l");
assert_eq!(res[4], "k");
assert_eq!(res[5], "j");
assert_eq!(res[6], "i");
assert_eq!(res[7], "h");
assert_eq!(res[8], "g");
assert_eq!(res[9], "f");
let res = tx.commit();
assert!(res.is_ok());
let mut tx = db.transaction(false);
let res = tx.keys_reverse("c".."z", None, Some(10)).unwrap();
assert_eq!(res.len(), 10);
assert_eq!(res[0], "o");
assert_eq!(res[1], "n");
assert_eq!(res[2], "m");
assert_eq!(res[3], "l");
assert_eq!(res[4], "k");
assert_eq!(res[5], "j");
assert_eq!(res[6], "i");
assert_eq!(res[7], "h");
assert_eq!(res[8], "g");
assert_eq!(res[9], "f");
let res = tx.cancel();
assert!(res.is_ok());
let mut tx = db.transaction(false);
let res = tx.keys_reverse("c".."z", Some(3), Some(10)).unwrap();
assert_eq!(res.len(), 10);
assert_eq!(res[0], "l");
assert_eq!(res[1], "k");
assert_eq!(res[2], "j");
assert_eq!(res[3], "i");
assert_eq!(res[4], "h");
assert_eq!(res[5], "g");
assert_eq!(res[6], "f");
assert_eq!(res[7], "e");
assert_eq!(res[8], "d");
assert_eq!(res[9], "c");
let res = tx.cancel();
assert!(res.is_ok());
}
#[test]
fn iterate_keys_values_forward() {
let db = Database::new();
let mut tx = db.transaction(true);
tx.put("a", "a").unwrap();
tx.put("b", "b").unwrap();
tx.put("c", "c").unwrap();
tx.put("d", "d").unwrap();
tx.put("e", "e").unwrap();
tx.put("f", "f").unwrap();
tx.put("g", "g").unwrap();
tx.put("h", "h").unwrap();
tx.put("i", "i").unwrap();
tx.put("j", "j").unwrap();
tx.put("k", "k").unwrap();
tx.put("l", "l").unwrap();
tx.put("m", "m").unwrap();
tx.put("n", "n").unwrap();
tx.put("o", "o").unwrap();
let res = tx.scan("c".."z", None, Some(10)).unwrap();
assert_eq!(res.len(), 10);
assert_eq!(res[0].0.as_ref(), b"c");
assert_eq!(res[0].1.as_ref(), b"c");
assert_eq!(res[1].0.as_ref(), b"d");
assert_eq!(res[1].1.as_ref(), b"d");
assert_eq!(res[2].0.as_ref(), b"e");
assert_eq!(res[3].0.as_ref(), b"f");
assert_eq!(res[4].0.as_ref(), b"g");
assert_eq!(res[5].0.as_ref(), b"h");
assert_eq!(res[6].0.as_ref(), b"i");
assert_eq!(res[7].0.as_ref(), b"j");
assert_eq!(res[8].0.as_ref(), b"k");
assert_eq!(res[9].0.as_ref(), b"l");
let res = tx.commit();
assert!(res.is_ok());
let mut tx = db.transaction(false);
let res = tx.scan("c".."z", None, Some(10)).unwrap();
assert_eq!(res.len(), 10);
assert_eq!(res[0].0.as_ref(), b"c");
assert_eq!(res[0].1.as_ref(), b"c");
assert_eq!(res[1].0.as_ref(), b"d");
assert_eq!(res[1].1.as_ref(), b"d");
assert_eq!(res[2].0.as_ref(), b"e");
assert_eq!(res[3].0.as_ref(), b"f");
assert_eq!(res[4].0.as_ref(), b"g");
assert_eq!(res[5].0.as_ref(), b"h");
assert_eq!(res[6].0.as_ref(), b"i");
assert_eq!(res[7].0.as_ref(), b"j");
assert_eq!(res[8].0.as_ref(), b"k");
assert_eq!(res[9].0.as_ref(), b"l");
let res = tx.cancel();
assert!(res.is_ok());
let mut tx = db.transaction(false);
let res = tx.scan("c".."z", Some(3), Some(10)).unwrap();
assert_eq!(res.len(), 10);
assert_eq!(res[0].0.as_ref(), b"f");
assert_eq!(res[1].0.as_ref(), b"g");
assert_eq!(res[2].0.as_ref(), b"h");
assert_eq!(res[3].0.as_ref(), b"i");
assert_eq!(res[4].0.as_ref(), b"j");
assert_eq!(res[5].0.as_ref(), b"k");
assert_eq!(res[6].0.as_ref(), b"l");
assert_eq!(res[7].0.as_ref(), b"m");
assert_eq!(res[8].0.as_ref(), b"n");
assert_eq!(res[9].0.as_ref(), b"o");
let res = tx.cancel();
assert!(res.is_ok());
}
#[test]
fn iterate_keys_values_reverse() {
let db = Database::new();
let mut tx = db.transaction(true);
tx.put("a", "a").unwrap();
tx.put("b", "b").unwrap();
tx.put("c", "c").unwrap();
tx.put("d", "d").unwrap();
tx.put("e", "e").unwrap();
tx.put("f", "f").unwrap();
tx.put("g", "g").unwrap();
tx.put("h", "h").unwrap();
tx.put("i", "i").unwrap();
tx.put("j", "j").unwrap();
tx.put("k", "k").unwrap();
tx.put("l", "l").unwrap();
tx.put("m", "m").unwrap();
tx.put("n", "n").unwrap();
tx.put("o", "o").unwrap();
let res = tx.scan_reverse("c".."z", None, Some(10)).unwrap();
assert_eq!(res.len(), 10);
assert_eq!(res[0].0.as_ref(), b"o");
assert_eq!(res[1].0.as_ref(), b"n");
assert_eq!(res[2].0.as_ref(), b"m");
assert_eq!(res[3].0.as_ref(), b"l");
assert_eq!(res[4].0.as_ref(), b"k");
assert_eq!(res[5].0.as_ref(), b"j");
assert_eq!(res[6].0.as_ref(), b"i");
assert_eq!(res[7].0.as_ref(), b"h");
assert_eq!(res[8].0.as_ref(), b"g");
assert_eq!(res[9].0.as_ref(), b"f");
let res = tx.commit();
assert!(res.is_ok());
let mut tx = db.transaction(false);
let res = tx.scan_reverse("c".."z", None, Some(10)).unwrap();
assert_eq!(res.len(), 10);
assert_eq!(res[0].0.as_ref(), b"o");
assert_eq!(res[1].0.as_ref(), b"n");
assert_eq!(res[2].0.as_ref(), b"m");
assert_eq!(res[3].0.as_ref(), b"l");
assert_eq!(res[4].0.as_ref(), b"k");
assert_eq!(res[5].0.as_ref(), b"j");
assert_eq!(res[6].0.as_ref(), b"i");
assert_eq!(res[7].0.as_ref(), b"h");
assert_eq!(res[8].0.as_ref(), b"g");
assert_eq!(res[9].0.as_ref(), b"f");
let res = tx.cancel();
assert!(res.is_ok());
let mut tx = db.transaction(false);
let res = tx.scan_reverse("c".."z", Some(3), Some(10)).unwrap();
assert_eq!(res.len(), 10);
assert_eq!(res[0].0.as_ref(), b"l");
assert_eq!(res[1].0.as_ref(), b"k");
assert_eq!(res[2].0.as_ref(), b"j");
assert_eq!(res[3].0.as_ref(), b"i");
assert_eq!(res[4].0.as_ref(), b"h");
assert_eq!(res[5].0.as_ref(), b"g");
assert_eq!(res[6].0.as_ref(), b"f");
assert_eq!(res[7].0.as_ref(), b"e");
assert_eq!(res[8].0.as_ref(), b"d");
assert_eq!(res[9].0.as_ref(), b"c");
let res = tx.cancel();
assert!(res.is_ok());
}
#[test]
fn count_keys_values() {
let db = Database::new();
let mut tx = db.transaction(true);
tx.put("a", "a").unwrap();
tx.put("b", "b").unwrap();
tx.put("c", "c").unwrap();
tx.put("d", "d").unwrap();
tx.put("e", "e").unwrap();
tx.put("f", "f").unwrap();
tx.put("g", "g").unwrap();
tx.put("h", "h").unwrap();
tx.put("i", "i").unwrap();
tx.put("j", "j").unwrap();
tx.put("k", "k").unwrap();
tx.put("l", "l").unwrap();
tx.put("m", "m").unwrap();
tx.put("n", "n").unwrap();
tx.put("o", "o").unwrap();
let res = tx.total("c".."z", None, Some(10)).unwrap();
assert_eq!(res, 10);
let res = tx.commit();
assert!(res.is_ok());
let mut tx = db.transaction(false);
let res = tx.total("c".."z", Some(3), Some(10)).unwrap();
assert_eq!(res, 10);
let res = tx.cancel();
assert!(res.is_ok());
}
#[test]
fn cursor_forward_iteration() {
let db = Database::new();
let mut tx = db.transaction(true);
tx.put("a", "1").unwrap();
tx.put("b", "2").unwrap();
tx.put("c", "3").unwrap();
tx.put("d", "4").unwrap();
tx.put("e", "5").unwrap();
let res = tx.commit();
assert!(res.is_ok());
let mut tx = db.transaction(false);
let mut cursor = tx.cursor("a".."z").unwrap();
cursor.seek_to_first();
assert!(cursor.valid());
assert_eq!(cursor.key().unwrap().as_ref(), b"a");
assert_eq!(cursor.value().unwrap().as_ref(), b"1");
cursor.next();
assert!(cursor.valid());
assert_eq!(cursor.key().unwrap().as_ref(), b"b");
cursor.next(); cursor.next(); cursor.next(); assert!(cursor.valid());
assert_eq!(cursor.key().unwrap().as_ref(), b"e");
cursor.next();
assert!(!cursor.valid());
drop(cursor);
let res = tx.cancel();
assert!(res.is_ok());
}
#[test]
fn cursor_reverse_iteration() {
let db = Database::new();
let mut tx = db.transaction(true);
tx.put("a", "1").unwrap();
tx.put("b", "2").unwrap();
tx.put("c", "3").unwrap();
tx.put("d", "4").unwrap();
tx.put("e", "5").unwrap();
let res = tx.commit();
assert!(res.is_ok());
let mut tx = db.transaction(false);
let mut cursor = tx.cursor("a".."z").unwrap();
cursor.seek_to_last();
assert!(cursor.valid());
assert_eq!(cursor.key().unwrap().as_ref(), b"e");
assert_eq!(cursor.value().unwrap().as_ref(), b"5");
cursor.prev();
assert!(cursor.valid());
assert_eq!(cursor.key().unwrap().as_ref(), b"d");
cursor.prev(); cursor.prev(); cursor.prev(); assert!(cursor.valid());
assert_eq!(cursor.key().unwrap().as_ref(), b"a");
cursor.prev();
assert!(!cursor.valid());
drop(cursor);
let res = tx.cancel();
assert!(res.is_ok());
}
#[test]
fn cursor_seek_operations() {
let db = Database::new();
let mut tx = db.transaction(true);
tx.put("a", "1").unwrap();
tx.put("c", "3").unwrap();
tx.put("e", "5").unwrap();
tx.put("g", "7").unwrap();
let res = tx.commit();
assert!(res.is_ok());
let mut tx = db.transaction(false);
let mut cursor = tx.cursor("a".."z").unwrap();
cursor.seek("c");
assert!(cursor.valid());
assert_eq!(cursor.key().unwrap().as_ref(), b"c");
cursor.seek("d");
assert!(cursor.valid());
assert_eq!(cursor.key().unwrap().as_ref(), b"e");
cursor.seek_for_prev("e");
assert!(cursor.valid());
assert_eq!(cursor.key().unwrap().as_ref(), b"c");
cursor.seek("z");
assert!(!cursor.valid());
drop(cursor);
let res = tx.cancel();
assert!(res.is_ok());
}
#[test]
fn cursor_bidirectional_switch() {
let db = Database::new();
let mut tx = db.transaction(true);
tx.put("a", "1").unwrap();
tx.put("b", "2").unwrap();
tx.put("c", "3").unwrap();
tx.put("d", "4").unwrap();
tx.put("e", "5").unwrap();
let res = tx.commit();
assert!(res.is_ok());
let mut tx = db.transaction(false);
let mut cursor = tx.cursor("a".."z").unwrap();
cursor.seek_to_first();
assert_eq!(cursor.key().unwrap().as_ref(), b"a");
cursor.next();
assert_eq!(cursor.key().unwrap().as_ref(), b"b");
cursor.next();
assert_eq!(cursor.key().unwrap().as_ref(), b"c");
cursor.prev();
assert!(cursor.valid());
assert_eq!(cursor.key().unwrap().as_ref(), b"b");
cursor.next();
assert!(cursor.valid());
assert_eq!(cursor.key().unwrap().as_ref(), b"c");
drop(cursor);
let res = tx.cancel();
assert!(res.is_ok());
}
#[test]
fn keys_iterator_forward() {
let db = Database::new();
let mut tx = db.transaction(true);
tx.put("a", "1").unwrap();
tx.put("b", "2").unwrap();
tx.put("c", "3").unwrap();
tx.put("d", "4").unwrap();
tx.put("e", "5").unwrap();
let res = tx.commit();
assert!(res.is_ok());
let mut tx = db.transaction(false);
let keys: Vec<_> = tx.keys_iter("b".."e").unwrap().collect();
assert_eq!(keys.len(), 3);
assert_eq!(keys[0].as_ref(), b"b");
assert_eq!(keys[1].as_ref(), b"c");
assert_eq!(keys[2].as_ref(), b"d");
let res = tx.cancel();
assert!(res.is_ok());
}
#[test]
fn keys_iterator_reverse() {
let db = Database::new();
let mut tx = db.transaction(true);
tx.put("a", "1").unwrap();
tx.put("b", "2").unwrap();
tx.put("c", "3").unwrap();
tx.put("d", "4").unwrap();
tx.put("e", "5").unwrap();
let res = tx.commit();
assert!(res.is_ok());
let mut tx = db.transaction(false);
let keys: Vec<_> = tx.keys_iter_reverse("b".."e").unwrap().collect();
assert_eq!(keys.len(), 3);
assert_eq!(keys[0].as_ref(), b"d");
assert_eq!(keys[1].as_ref(), b"c");
assert_eq!(keys[2].as_ref(), b"b");
let res = tx.cancel();
assert!(res.is_ok());
}
#[test]
fn keys_iterator_with_take() {
let db = Database::new();
let mut tx = db.transaction(true);
tx.put("a", "1").unwrap();
tx.put("b", "2").unwrap();
tx.put("c", "3").unwrap();
tx.put("d", "4").unwrap();
tx.put("e", "5").unwrap();
tx.put("f", "6").unwrap();
tx.put("g", "7").unwrap();
let res = tx.commit();
assert!(res.is_ok());
let mut tx = db.transaction(false);
let keys: Vec<_> = tx.keys_iter("a".."z").unwrap().take(3).collect();
assert_eq!(keys.len(), 3);
assert_eq!(keys[0].as_ref(), b"a");
assert_eq!(keys[1].as_ref(), b"b");
assert_eq!(keys[2].as_ref(), b"c");
let res = tx.cancel();
assert!(res.is_ok());
}
#[test]
fn keys_iterator_with_skip() {
let db = Database::new();
let mut tx = db.transaction(true);
tx.put("a", "1").unwrap();
tx.put("b", "2").unwrap();
tx.put("c", "3").unwrap();
tx.put("d", "4").unwrap();
tx.put("e", "5").unwrap();
let res = tx.commit();
assert!(res.is_ok());
let mut tx = db.transaction(false);
let keys: Vec<_> = tx.keys_iter("a".."z").unwrap().skip(2).collect();
assert_eq!(keys.len(), 3);
assert_eq!(keys[0].as_ref(), b"c");
assert_eq!(keys[1].as_ref(), b"d");
assert_eq!(keys[2].as_ref(), b"e");
let res = tx.cancel();
assert!(res.is_ok());
}
#[test]
fn scan_iterator_forward() {
let db = Database::new();
let mut tx = db.transaction(true);
tx.put("a", "1").unwrap();
tx.put("b", "2").unwrap();
tx.put("c", "3").unwrap();
let res = tx.commit();
assert!(res.is_ok());
let mut tx = db.transaction(false);
let pairs: Vec<_> = tx.scan_iter("a".."z").unwrap().collect();
assert_eq!(pairs.len(), 3);
assert_eq!(pairs[0].0.as_ref(), b"a");
assert_eq!(pairs[0].1.as_ref(), b"1");
assert_eq!(pairs[1].0.as_ref(), b"b");
assert_eq!(pairs[1].1.as_ref(), b"2");
assert_eq!(pairs[2].0.as_ref(), b"c");
assert_eq!(pairs[2].1.as_ref(), b"3");
let res = tx.cancel();
assert!(res.is_ok());
}
#[test]
fn scan_iterator_reverse() {
let db = Database::new();
let mut tx = db.transaction(true);
tx.put("a", "1").unwrap();
tx.put("b", "2").unwrap();
tx.put("c", "3").unwrap();
let res = tx.commit();
assert!(res.is_ok());
let mut tx = db.transaction(false);
let pairs: Vec<_> = tx.scan_iter_reverse("a".."z").unwrap().collect();
assert_eq!(pairs.len(), 3);
assert_eq!(pairs[0].0.as_ref(), b"c");
assert_eq!(pairs[0].1.as_ref(), b"3");
assert_eq!(pairs[1].0.as_ref(), b"b");
assert_eq!(pairs[1].1.as_ref(), b"2");
assert_eq!(pairs[2].0.as_ref(), b"a");
assert_eq!(pairs[2].1.as_ref(), b"1");
let res = tx.cancel();
assert!(res.is_ok());
}
#[test]
fn scan_iterator_with_take() {
let db = Database::new();
let mut tx = db.transaction(true);
tx.put("a", "1").unwrap();
tx.put("b", "2").unwrap();
tx.put("c", "3").unwrap();
tx.put("d", "4").unwrap();
tx.put("e", "5").unwrap();
let res = tx.commit();
assert!(res.is_ok());
let mut tx = db.transaction(false);
let pairs: Vec<_> = tx.scan_iter("a".."z").unwrap().take(2).collect();
assert_eq!(pairs.len(), 2);
assert_eq!(pairs[0].0.as_ref(), b"a");
assert_eq!(pairs[1].0.as_ref(), b"b");
let res = tx.cancel();
assert!(res.is_ok());
}
#[test]
fn iterator_sees_uncommitted_writes() {
let db = Database::new();
let mut tx = db.transaction(true);
tx.put("a", "1").unwrap();
tx.put("b", "2").unwrap();
let keys: Vec<_> = tx.keys_iter("a".."z").unwrap().collect();
assert_eq!(keys.len(), 2);
assert_eq!(keys[0].as_ref(), b"a");
assert_eq!(keys[1].as_ref(), b"b");
let res = tx.commit();
assert!(res.is_ok());
}
#[test]
fn cursor_handles_deleted_entries() {
let db = Database::new();
let mut tx = db.transaction(true);
tx.put("a", "1").unwrap();
tx.put("b", "2").unwrap();
tx.put("c", "3").unwrap();
let res = tx.commit();
assert!(res.is_ok());
let mut tx = db.transaction(true);
tx.del("b").unwrap();
let keys: Vec<_> = tx.keys_iter("a".."z").unwrap().collect();
assert_eq!(keys.len(), 2);
assert_eq!(keys[0].as_ref(), b"a");
assert_eq!(keys[1].as_ref(), b"c");
let res = tx.cancel();
assert!(res.is_ok());
}
}