use crate::iterate::DbIterator;
use crate::logic::{DbLogic, EntryRef};
use crate::tasks::{TaskManager, TaskType};
use crate::{Error, Key, Params, StartMode, Value, WriteBatch, WriteOptions};
use std::sync::Arc;
/// Public handle to the database.
///
/// Bundles the shared database logic with the task manager that the methods
/// of this type wake up when a read or write reports that compaction is needed.
pub struct Database {
// Shared database logic; the task manager receives a clone of this `Arc` at construction.
inner: Arc<DbLogic>,
// Task manager (presumably running the compaction tasks in the background);
// `terminate` is called on it when the `Database` is dropped.
tasks: Arc<TaskManager>,
}
impl Database {
    /// Creates a database using `Params::default()`.
    ///
    /// Shorthand for [`Database::new_with_params`] with default parameters.
    ///
    /// # Errors
    /// Propagates any error returned by [`Database::new_with_params`].
    pub async fn new(mode: StartMode) -> Result<Self, Error> {
        Self::new_with_params(mode, Params::default()).await
    }

    /// Creates a database with explicit parameters.
    ///
    /// Builds the shared `DbLogic` first and then a `TaskManager` that is given
    /// a clone of the logic handle together with `params.compaction_concurrency`.
    ///
    /// # Errors
    /// Returns the error from `DbLogic::new` if it fails.
    pub async fn new_with_params(mode: StartMode, params: Params) -> Result<Self, Error> {
        // Read this before `params` is moved into `DbLogic::new`.
        let compaction_concurrency = params.compaction_concurrency;
        let inner = Arc::new(DbLogic::new(mode, params).await?);
        let tasks = Arc::new(TaskManager::new(Arc::clone(&inner), compaction_concurrency).await);
        Ok(Self { inner, tasks })
    }

    /// Looks up `key` and returns whatever entry `DbLogic::get` yields for it.
    ///
    /// If the lookup reports that compaction is needed, the level compaction
    /// task is woken up before the result is returned.
    ///
    /// NOTE(review): `Ok(None)` presumably means the key is absent — confirm
    /// against `DbLogic::get`.
    ///
    /// # Errors
    /// Propagates any error returned by `DbLogic::get`.
    #[tracing::instrument(skip(self, key))]
    pub async fn get(&self, key: &[u8]) -> Result<Option<EntryRef>, Error> {
        let (needs_compaction, data) = self.inner.get(key).await?;
        if needs_compaction {
            self.tasks.wake_up(&TaskType::LevelCompaction);
        }
        Ok(data)
    }

    /// Deletes `key` using `WriteOptions::default()`.
    ///
    /// Shorthand for [`Database::delete_opts`].
    ///
    /// # Errors
    /// Propagates any error returned by [`Database::delete_opts`].
    #[tracing::instrument(skip(self, key))]
    pub async fn delete(&self, key: Key) -> Result<(), Error> {
        // Delegate instead of duplicating the batch construction.
        self.delete_opts(key, &WriteOptions::default()).await
    }

    /// Forwards to `DbLogic::synchronize`.
    ///
    /// NOTE(review): presumably makes previously written data durable —
    /// confirm against `DbLogic::synchronize`.
    ///
    /// # Errors
    /// Propagates any error returned by `DbLogic::synchronize`.
    pub async fn synchronize(&self) -> Result<(), Error> {
        self.inner.synchronize().await
    }

    /// Deletes `key` by submitting a single-entry [`WriteBatch`] with `opts`.
    ///
    /// # Errors
    /// Propagates any error returned by [`Database::write_opts`].
    pub async fn delete_opts(&self, key: Key, opts: &WriteOptions) -> Result<(), Error> {
        let mut batch = WriteBatch::new();
        batch.delete(key);
        self.write_opts(batch, opts).await
    }

    /// Stores `value` under `key` using `WriteOptions::new()`.
    ///
    /// Shorthand for [`Database::put_opts`].
    ///
    /// # Errors
    /// Propagates any error returned by [`Database::put_opts`].
    pub async fn put(&self, key: Key, value: Value) -> Result<(), Error> {
        const OPTS: WriteOptions = WriteOptions::new();
        self.put_opts(key, value, &OPTS).await
    }

    /// Stores `value` under `key` by submitting a single-entry [`WriteBatch`]
    /// with `opts`.
    ///
    /// The payload arguments are skipped from the tracing span, matching
    /// `get`, `delete` and `write_opts`, so keys and values are not formatted
    /// into trace output.
    ///
    /// # Errors
    /// Propagates any error returned by [`Database::write_opts`].
    #[tracing::instrument(skip(self, key, value, opts))]
    pub async fn put_opts(&self, key: Key, value: Value, opts: &WriteOptions) -> Result<(), Error> {
        let mut batch = WriteBatch::new();
        batch.put(key, value);
        self.write_opts(batch, opts).await
    }

    /// Creates an iterator without key bounds.
    ///
    /// Both bounds are passed as `None` to `DbLogic::prepare_iter`, and the
    /// iterator is built with the same `false` flag as [`Database::range_iter`].
    pub async fn iter(&self) -> DbIterator {
        let (mem_iters, table_iters, min_key, max_key) = self.inner.prepare_iter(None, None).await;
        DbIterator::new(
            mem_iters,
            table_iters,
            min_key,
            max_key,
            false,
            #[cfg(feature = "wisckey")]
            self.inner.get_value_log(),
        )
    }

    /// Creates an iterator bounded by `min_key` and `max_key`.
    ///
    /// NOTE(review): whether each bound is inclusive or exclusive is decided by
    /// `DbLogic::prepare_iter` / `DbIterator` — confirm there.
    pub async fn range_iter(&self, min_key: &[u8], max_key: &[u8]) -> DbIterator {
        let (mem_iters, table_iters, min_key, max_key) =
            self.inner.prepare_iter(Some(min_key), Some(max_key)).await;
        DbIterator::new(
            mem_iters,
            table_iters,
            min_key,
            max_key,
            false,
            #[cfg(feature = "wisckey")]
            self.inner.get_value_log(),
        )
    }

    /// Creates a bounded iterator via `DbLogic::prepare_reverse_iter`.
    ///
    /// Note the argument order: `max_key` comes first, then `min_key`. This is
    /// the only constructor that passes `true` as the flag to `DbIterator::new`
    /// (presumably selecting reverse iteration order — confirm in `DbIterator`).
    ///
    /// NOTE(review): bound inclusivity is decided by
    /// `DbLogic::prepare_reverse_iter` / `DbIterator` — confirm there.
    pub async fn reverse_range_iter(&self, max_key: &[u8], min_key: &[u8]) -> DbIterator {
        let (mem_iters, table_iters, min_key, max_key) = self
            .inner
            .prepare_reverse_iter(Some(max_key), Some(min_key))
            .await;
        DbIterator::new(
            mem_iters,
            table_iters,
            // The prepared bounds are converted to owned vectors for the iterator.
            min_key.map(|k| k.to_vec()),
            max_key.map(|k| k.to_vec()),
            true,
            #[cfg(feature = "wisckey")]
            self.inner.get_value_log(),
        )
    }

    /// Applies `write_batch` using `WriteOptions::new()`.
    ///
    /// Shorthand for [`Database::write_opts`].
    ///
    /// # Errors
    /// Propagates any error returned by [`Database::write_opts`].
    pub async fn write(&self, write_batch: WriteBatch) -> Result<(), Error> {
        const OPTS: WriteOptions = WriteOptions::new();
        self.write_opts(write_batch, &OPTS).await
    }

    /// Applies `write_batch` with `opts` through `DbLogic::write_opts`.
    ///
    /// If the write reports that compaction is needed, the memtable compaction
    /// task is woken up before returning.
    ///
    /// # Errors
    /// Propagates any error returned by `DbLogic::write_opts`; in that case no
    /// task is woken up.
    #[tracing::instrument(skip(self, write_batch, opts))]
    pub async fn write_opts(
        &self,
        write_batch: WriteBatch,
        opts: &WriteOptions,
    ) -> Result<(), Error> {
        let needs_compaction = self.inner.write_opts(write_batch, opts).await?;
        if needs_compaction {
            self.tasks.wake_up(&TaskType::MemtableCompaction);
        }
        Ok(())
    }

    /// Stops the database logic and then all tasks of the task manager.
    ///
    /// # Errors
    /// If `DbLogic::stop` fails, its error is returned immediately and
    /// `TaskManager::stop_all` is not called. Otherwise the result of
    /// `TaskManager::stop_all` is returned.
    pub async fn stop(&self) -> Result<(), Error> {
        self.inner.stop().await?;
        self.tasks.stop_all().await
    }
}
impl Drop for Database {
/// Calls `terminate` on the task manager when the handle is dropped.
///
/// NOTE(review): unlike `Database::stop`, nothing is awaited here and the
/// inner logic's `stop` is not called — confirm that callers are expected to
/// call `stop` themselves for a graceful shutdown.
fn drop(&mut self) {
self.tasks.terminate();
}
}