#![deny(clippy::all)]
mod tools;
pub use async_sqlite::rusqlite::OpenFlags;
use async_sqlite::rusqlite::{params, OptionalExtension as _};
use async_sqlite::*;
use keyvaluedb::{
DBKeyRef, DBKeyValue, DBKeyValueRef, DBOp, DBTransaction, DBTransactionError, DBValue, IoStats,
IoStatsKind, KeyValueDB, KeyValueDBPinBoxFuture,
};
use parking_lot::Mutex;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::{
io,
path::{Path, PathBuf},
str::FromStr,
};
use tools::*;
/// Strategy used to reclaim free pages from the SQLite database file.
///
/// Applied when the database is opened and whenever [`Database::vacuum`]
/// (or the trait-level `cleanup`) runs.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum VacuumMode {
    /// Never vacuum; cleanup only truncates the WAL via a checkpoint.
    None,
    /// Use SQLite incremental auto-vacuum (`PRAGMA incremental_vacuum`).
    Incremental,
    /// Run a full `VACUUM` statement on cleanup.
    Full,
}
/// Options controlling how a [`Database`] is opened.
#[derive(Clone)]
pub struct DatabaseConfig {
    /// Number of key/value columns; must be at least 1. Each column is
    /// backed by its own SQLite table.
    pub columns: u32,
    /// SQLite open flags handed to every pooled connection.
    pub flags: OpenFlags,
    /// Number of connections kept in the pool.
    pub num_conns: usize,
    /// Free-page reclamation strategy (see [`VacuumMode`]).
    pub vacuum_mode: VacuumMode,
}
impl DatabaseConfig {
    /// Creates a configuration with the default settings (see the
    /// [`Default`] impl).
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the number of columns.
    ///
    /// # Panics
    /// Panics if `columns` is zero.
    pub fn with_columns(mut self, columns: u32) -> Self {
        assert!(columns > 0, "the number of columns must not be zero");
        self.columns = columns;
        self
    }

    /// Switches the open flags to an in-memory database.
    pub fn with_in_memory(mut self) -> Self {
        self.flags = OpenFlags::SQLITE_OPEN_READ_WRITE
            | OpenFlags::SQLITE_OPEN_CREATE
            | OpenFlags::SQLITE_OPEN_NO_MUTEX
            | OpenFlags::SQLITE_OPEN_MEMORY;
        self
    }

    /// Replaces the SQLite open flags wholesale.
    pub fn with_flags(mut self, flags: OpenFlags) -> Self {
        self.flags = flags;
        self
    }

    /// Sets the connection-pool size.
    pub fn with_num_conns(mut self, num_conns: usize) -> Self {
        self.num_conns = num_conns;
        self
    }

    /// Sets the vacuum strategy.
    pub fn with_vacuum_mode(mut self, vacuum_mode: VacuumMode) -> Self {
        self.vacuum_mode = vacuum_mode;
        self
    }
}
impl Default for DatabaseConfig {
    /// One column, one read/write connection on a file-backed database,
    /// and no vacuuming.
    fn default() -> Self {
        Self {
            columns: 1,
            num_conns: 1,
            vacuum_mode: VacuumMode::None,
            flags: OpenFlags::SQLITE_OPEN_READ_WRITE
                | OpenFlags::SQLITE_OPEN_CREATE
                | OpenFlags::SQLITE_OPEN_NO_MUTEX,
        }
    }
}
/// Pre-rendered SQL statement text for one backing table.
///
/// Every string embeds the table name so it can be handed straight to
/// `prepare_cached` with no per-call formatting. The fields correspond
/// one-to-one with the accessor helpers on [`Database`].
pub struct DatabaseTable {
    // Table name the statements were rendered for (kept for ownership/debugging).
    _table: String,
    str_has_value: String,
    str_has_value_like: String,
    str_get_unique_value: String,
    str_get_first_value_like: String,
    str_set_unique_value: String,
    str_remove_unique_value: String,
    str_remove_and_return_unique_value: String,
    str_remove_unique_value_like: String,
    str_iter_with_prefix: String,
    str_iter_no_prefix: String,
    str_iter_keys_with_prefix: String,
    str_iter_keys_no_prefix: String,
}
impl DatabaseTable {
    /// Renders every query template for `table` once, up front.
    ///
    /// The LIKE-based statements use `ESCAPE '\'` so callers can escape
    /// literal `%`/`_` characters when building prefix patterns.
    pub fn new(table: String) -> Self {
        Self {
            str_has_value: format!("SELECT 1 FROM {table} WHERE [key] = ? LIMIT 1"),
            str_has_value_like: format!(
                "SELECT 1 FROM {table} WHERE [key] LIKE ? ESCAPE '\\' LIMIT 1"
            ),
            str_get_unique_value: format!("SELECT value FROM {table} WHERE [key] = ? LIMIT 1"),
            str_get_first_value_like: format!(
                "SELECT key, value FROM {table} WHERE [key] LIKE ? ESCAPE '\\' LIMIT 1"
            ),
            str_set_unique_value: format!(
                "INSERT OR REPLACE INTO {table} ([key], value) VALUES(?, ?)"
            ),
            str_remove_unique_value: format!("DELETE FROM {table} WHERE [key] = ?"),
            str_remove_and_return_unique_value: format!(
                "DELETE FROM {table} WHERE [key] = ? RETURNING value"
            ),
            str_remove_unique_value_like: format!(
                "DELETE FROM {table} WHERE [key] LIKE ? ESCAPE '\\'"
            ),
            str_iter_with_prefix: format!(
                "SELECT key, value FROM {table} WHERE [key] LIKE ? ESCAPE '\\'"
            ),
            str_iter_no_prefix: format!("SELECT key, value FROM {table}"),
            str_iter_keys_with_prefix: format!(
                "SELECT key FROM {table} WHERE [key] LIKE ? ESCAPE '\\'"
            ),
            str_iter_keys_no_prefix: format!("SELECT key FROM {table}"),
            // Listed last: this moves `table`, so all the format! calls above
            // must have borrowed it already.
            _table: table,
        }
    }
}
/// Shared, immutable-after-open state: everything that does not need the
/// statistics mutex.
pub struct DatabaseUnlockedInner {
    // Filesystem path the database was opened with.
    path: PathBuf,
    // Configuration supplied to `Database::open`.
    config: DatabaseConfig,
    // Connection pool; closed on drop.
    pool: Pool,
    // Cached SQL for the `control` metadata table.
    control_table: Arc<DatabaseTable>,
    // Cached SQL for each column table, indexed by column number.
    column_tables: Vec<Arc<DatabaseTable>>,
}
impl Drop for DatabaseUnlockedInner {
    fn drop(&mut self) {
        // Best-effort pool shutdown; close errors are ignored because there
        // is no way to report them from drop.
        let _ = self.pool.close_blocking();
    }
}
/// Mutable statistics state, guarded by the mutex in [`Database`].
pub struct DatabaseInner {
    // Counters accumulated since the database was opened.
    overall_stats: IoStats,
    // Counters accumulated since the last `IoStatsKind::SincePrevious` query.
    current_stats: IoStats,
}
/// SQLite-backed implementation of [`KeyValueDB`].
///
/// Cloning is cheap: all clones share the same connection pool and the same
/// statistics counters.
#[derive(Clone)]
pub struct Database {
    unlocked_inner: Arc<DatabaseUnlockedInner>,
    inner: Arc<Mutex<DatabaseInner>>,
}
impl Database {
/// Opens (or creates) the database at `path` with the given configuration.
///
/// Builds the connection pool, applies per-database pragmas (WAL journal
/// mode, checkpoint, vacuum mode) and ensures a backing table exists for
/// every configured column.
///
/// # Panics
/// Panics if `config.columns` is zero.
///
/// # Errors
/// Any pool-creation or SQLite error is wrapped as `io::Error`.
pub fn open<P: AsRef<Path>>(path: P, config: DatabaseConfig) -> io::Result<Self> {
    assert_ne!(config.columns, 0, "number of columns must be >= 1");
    let path = PathBuf::from(path.as_ref());
    let flags = config.flags;
    let mut column_tables = vec![];
    for n in 0..config.columns {
        column_tables.push(Arc::new(DatabaseTable::new(get_column_table_name(n))))
    }
    let control_table = Arc::new(DatabaseTable::new("control".to_string()));
    let pool_builder = PoolBuilder::new()
        .path(&path)
        .flags(flags)
        .num_conns(config.num_conns);
    let pool = pool_builder.open_blocking().map_err(io::Error::other)?;
    let out = Self {
        unlocked_inner: Arc::new(DatabaseUnlockedInner {
            path,
            config,
            pool,
            control_table,
            column_tables,
        }),
        inner: Arc::new(Mutex::new(DatabaseInner {
            overall_stats: IoStats::empty(),
            current_stats: IoStats::empty(),
        })),
    };
    let vacuum_mode = out.config().vacuum_mode;
    out.conn_blocking(move |conn| {
        conn.set_prepared_statement_cache_capacity(256);
        conn.pragma_update(None, "case_sensitive_like", "ON")?;
        conn.pragma_update(None, "journal_mode", "WAL")?;
        conn.pragma_update(None, "synchronous", "normal")?;
        conn.pragma_update(None, "journal_size_limit", 6144000)?;
        conn.pragma_update(None, "wal_checkpoint", "TRUNCATE")?;
        // Map the configured vacuum mode onto SQLite's auto_vacuum setting
        // (0 = NONE, 2 = INCREMENTAL). `Full` uses explicit VACUUMs, so it
        // also wants auto_vacuum off.
        let desired: u32 = match vacuum_mode {
            VacuumMode::None | VacuumMode::Full => 0,
            VacuumMode::Incremental => 2,
        };
        let current: u32 = conn.pragma_query_value(None, "auto_vacuum", |x| x.get(0))?;
        if current != desired {
            // Per the SQLite PRAGMA docs, switching auto_vacuum on an
            // existing database only takes effect when VACUUM runs *after*
            // the pragma is set — the previous VACUUM-then-pragma order left
            // the new setting dormant until some later VACUUM.
            conn.pragma_update(None, "auto_vacuum", desired)?;
            conn.execute("VACUUM", [])?;
        }
        Ok(())
    })
    .map_err(io::Error::other)?;
    out.open_resize_columns()?;
    Ok(out)
}
/// Returns the filesystem path this database was opened with.
pub fn path(&self) -> PathBuf {
    self.unlocked_inner.path.clone()
}
/// Returns a copy of the configuration used to open the database.
pub fn config(&self) -> DatabaseConfig {
    self.unlocked_inner.config.clone()
}
/// Number of columns, taken from the configuration (not read from disk).
pub fn columns(&self) -> u32 {
    self.unlocked_inner.config.columns
}
/// Cached SQL statements for the `control` metadata table.
pub fn control_table(&self) -> Arc<DatabaseTable> {
    self.unlocked_inner.control_table.clone()
}
/// Cached SQL statements for column `col`.
///
/// # Panics
/// Panics if `col` is out of range (unchecked index).
pub fn column_table(&self, col: u32) -> Arc<DatabaseTable> {
    self.unlocked_inner.column_tables[col as usize].clone()
}
/// Runs `func` on a pooled connection, blocking the calling thread until
/// the pool has executed it.
pub fn conn_blocking<T, F>(&self, func: F) -> Result<T, Error>
where
    F: FnOnce(&rusqlite::Connection) -> Result<T, rusqlite::Error> + Send + 'static,
    T: Send + 'static,
{
    self.unlocked_inner.pool.conn_blocking(func)
}
/// Runs `func` on a pooled connection without blocking the caller.
pub async fn conn<T, F>(&self, func: F) -> Result<T, Error>
where
    F: FnOnce(&rusqlite::Connection) -> Result<T, rusqlite::Error> + Send + 'static,
    T: Send + 'static,
{
    self.unlocked_inner.pool.conn(func).await
}
/// Runs `func` with a mutable connection (needed e.g. for `Connection::transaction`).
pub async fn conn_mut<T, F>(&self, func: F) -> Result<T, Error>
where
    F: FnOnce(&mut rusqlite::Connection) -> Result<T, rusqlite::Error> + Send + 'static,
    T: Send + 'static,
{
    self.unlocked_inner.pool.conn_mut(func).await
}
/// Drops the highest-numbered column table and decrements the on-disk
/// column counter in the `control` table.
///
/// # Errors
/// Returns `QueryReturnedNoRows` if the counter is already zero.
pub fn remove_last_column(&self) -> Result<(), Error> {
    let this = self.clone();
    self.conn_blocking(move |conn| {
        let columns = Self::get_unique_value(conn, this.control_table(), "columns", 0u32)?;
        if columns == 0 {
            return Err(rusqlite::Error::QueryReturnedNoRows);
        }
        // NOTE(review): the counter update and the DROP TABLE are two
        // separate statements, not one transaction; a failure in between
        // leaves the counter out of step with the schema — confirm whether
        // this needs to be atomic.
        Self::set_unique_value(conn, this.control_table(), "columns", columns - 1)?;
        conn.execute(
            &format!("DROP TABLE {}", get_column_table_name(columns - 1)),
            [],
        )?;
        Ok(())
    })
}
/// Creates the next column table and increments the on-disk column counter.
pub fn add_column(&self) -> Result<(), Error> {
    let this = self.clone();
    self.conn_blocking(move |conn| {
        let columns = Self::get_unique_value(conn, this.control_table(), "columns", 0u32)?;
        Self::set_unique_value(conn, this.control_table(), "columns", columns + 1)?;
        Self::create_column_table(conn, columns)
    })
}
/// Returns an empty transaction, to be filled in and passed to `write`.
pub fn transaction(&self) -> DBTransaction {
    DBTransaction::new()
}
/// Reclaims free pages according to the configured [`VacuumMode`], then
/// truncates the WAL with a checkpoint.
pub async fn vacuum(&self) -> Result<(), Error> {
    // `vacuum_mode` is Copy, so it can move into the 'static pool closure;
    // this lets all three modes share a single `conn` round-trip.
    let vacuum_mode = self.config().vacuum_mode;
    self.conn(move |conn| {
        match vacuum_mode {
            VacuumMode::None => {}
            VacuumMode::Incremental => {
                conn.execute("PRAGMA incremental_vacuum", [])?;
            }
            VacuumMode::Full => {
                conn.execute("VACUUM", [])?;
            }
        }
        // All modes finish with a TRUNCATE checkpoint of the WAL.
        conn.pragma_update(None, "wal_checkpoint", "TRUNCATE")?;
        Ok(())
    })
    .await
}
fn validate_column(&self, col: u32) -> rusqlite::Result<()> {
if col >= self.columns() {
return Err(rusqlite::Error::InvalidColumnIndex(col as usize));
}
Ok(())
}
/// Creates the backing table for `column` if it does not already exist:
/// `[key]` is TEXT with a UNIQUE constraint (keys are stored in the text
/// encoding produced by `key_to_text`), values are BLOBs.
fn create_column_table(conn: &rusqlite::Connection, column: u32) -> rusqlite::Result<()> {
    conn.execute(&format!("CREATE TABLE IF NOT EXISTS {} (id INTEGER PRIMARY KEY AUTOINCREMENT, [key] TEXT UNIQUE, value BLOB)", get_column_table_name(column)), []).map(drop)
}
/// Reads the text value stored under `key` in `table` and parses it as `V`.
///
/// Returns `default` when the row is missing or the stored text fails to
/// parse. NOTE(review): the `if let Ok(..)` also swallows genuine query
/// errors into the default — confirm that is intentional.
fn get_unique_value<V>(
    conn: &rusqlite::Connection,
    table: Arc<DatabaseTable>,
    key: &str,
    default: V,
) -> rusqlite::Result<V>
where
    V: FromStr,
{
    let mut stmt = conn.prepare_cached(&table.str_get_unique_value)?;
    if let Ok(found) = stmt.query_row([key], |row| -> rusqlite::Result<String> { row.get(0) }) {
        if let Ok(v) = V::from_str(&found) {
            return Ok(v);
        }
    }
    Ok(default)
}
fn set_unique_value<V>(
conn: &rusqlite::Connection,
table: Arc<DatabaseTable>,
key: &str,
value: V,
) -> rusqlite::Result<()>
where
V: ToString,
{
let mut stmt = conn.prepare_cached(&table.str_set_unique_value)?;
let changed = stmt.execute([key, value.to_string().as_str()])?;
assert!(
changed <= 1,
"multiple changes to unique key should not occur"
);
if changed == 0 {
return Err(rusqlite::Error::QueryReturnedNoRows);
}
Ok(())
}
/// Returns `true` if a row with exactly `key` exists in `table`.
fn has_value(
    conn: &rusqlite::Connection,
    table: Arc<DatabaseTable>,
    key: &str,
) -> rusqlite::Result<bool> {
    let mut stmt = conn.prepare_cached(&table.str_has_value)?;
    stmt.exists([key])
}
/// Returns `true` if any key in `table` matches the LIKE pattern `key`
/// (escape character `\`).
fn has_value_like(
    conn: &rusqlite::Connection,
    table: Arc<DatabaseTable>,
    key: &str,
) -> rusqlite::Result<bool> {
    let mut stmt = conn.prepare_cached(&table.str_has_value_like)?;
    stmt.exists([key])
}
/// Returns the BLOB stored under `key`, or `None` if the key is absent.
fn load_unique_value_blob(
    conn: &rusqlite::Connection,
    table: Arc<DatabaseTable>,
    key: &str,
) -> rusqlite::Result<Option<Vec<u8>>> {
    let mut stmt = conn.prepare_cached(&table.str_get_unique_value)?;
    stmt.query_row([key], |row| -> rusqlite::Result<Vec<u8>> { row.get(0) })
        .optional()
}
/// Returns the first `(key, value)` row whose key matches the LIKE pattern
/// `like` (escape character `\`), or `None` if nothing matches.
fn load_first_value_blob_like(
    conn: &rusqlite::Connection,
    table: Arc<DatabaseTable>,
    like: &str,
) -> rusqlite::Result<Option<(String, Vec<u8>)>> {
    let mut stmt = conn.prepare_cached(&table.str_get_first_value_like)?;
    stmt.query_row([like], |row| -> rusqlite::Result<(String, Vec<u8>)> {
        Ok((row.get(0)?, row.get(1)?))
    })
    .optional()
}
/// Upserts `value` under `key`; a report of zero changed rows is surfaced
/// as `QueryReturnedNoRows`.
fn store_unique_value_blob(
    conn: &rusqlite::Connection,
    table: Arc<DatabaseTable>,
    key: &str,
    value: &[u8],
) -> rusqlite::Result<()> {
    let mut stmt = conn.prepare_cached(&table.str_set_unique_value)?;
    let changed = stmt.execute(params![key, value])?;
    assert!(
        changed <= 1,
        "multiple changes to unique key should not occur"
    );
    if changed == 0 {
        return Err(rusqlite::Error::QueryReturnedNoRows);
    }
    Ok(())
}
/// Deletes `key` if present; a missing key is not an error.
fn remove_unique_value_blob(
    conn: &rusqlite::Connection,
    table: Arc<DatabaseTable>,
    key: &str,
) -> rusqlite::Result<()> {
    let mut stmt = conn.prepare_cached(&table.str_remove_unique_value)?;
    let _ = stmt.execute([key])?;
    Ok(())
}
/// Deletes `key` and returns its previous value via `RETURNING`, or `None`
/// if the key was absent.
fn remove_and_return_unique_value_blob(
    conn: &rusqlite::Connection,
    table: Arc<DatabaseTable>,
    key: &str,
) -> rusqlite::Result<Option<Vec<u8>>> {
    let mut stmt = conn.prepare_cached(&table.str_remove_and_return_unique_value)?;
    stmt.query_row([key], |row| -> rusqlite::Result<Vec<u8>> { row.get(0) })
        .optional()
}
/// Deletes every key matching the LIKE pattern `like`; returns the number
/// of rows removed.
fn remove_unique_value_blob_like(
    conn: &rusqlite::Connection,
    table: Arc<DatabaseTable>,
    like: &str,
) -> rusqlite::Result<usize> {
    let mut stmt = conn.prepare_cached(&table.str_remove_unique_value_like)?;
    let changed = stmt.execute([like])?;
    Ok(changed)
}
/// Ensures the on-disk schema has at least as many column tables as the
/// configuration asks for, creating the `control` table on first open.
///
/// This only grows the schema: if the file on disk already records more
/// columns than configured, the extra tables are left untouched.
fn open_resize_columns(&self) -> io::Result<()> {
    let columns = self.columns();
    let this = self.clone();
    self.conn_blocking(move |conn| {
        conn.execute("CREATE TABLE IF NOT EXISTS control (id INTEGER PRIMARY KEY AUTOINCREMENT, [key] TEXT UNIQUE, value TEXT)", [])?;
        let on_disk_columns =
            Self::get_unique_value(conn, this.control_table(), "columns", 0u32)?;
        if columns <= on_disk_columns {
            return Ok(());
        }
        // Create only the missing tables, then record the new count.
        for cn in on_disk_columns..columns {
            Self::create_column_table(conn, cn)?;
        }
        Self::set_unique_value(
            conn,
            this.control_table(),
            "columns",
            columns,
        )?;
        Ok(())
    }).map_err(io::Error::other)
}
/// Records `count` read operations totalling `bytes` bytes in both the
/// rolling and overall counters.
fn stats_read(&self, count: usize, bytes: usize) {
    let mut inner = self.inner.lock();
    inner.current_stats.reads += count as u64;
    inner.overall_stats.reads += count as u64;
    inner.current_stats.bytes_read += bytes as u64;
    inner.overall_stats.bytes_read += bytes as u64;
}
/// Records one write per entry in `sizes` (each entry is that write's byte
/// size). No-op for an empty slice, avoiding a needless lock.
fn stats_write(&self, sizes: &[usize]) {
    if sizes.is_empty() {
        return;
    }
    let mut inner = self.inner.lock();
    for &size in sizes {
        inner.current_stats.record_write(size);
        inner.overall_stats.record_write(size);
    }
}
/// Records one committed transaction of `size` total bytes taking `duration`.
fn stats_tx_write(&self, size: usize, duration: Duration) {
    let mut inner = self.inner.lock();
    inner
        .current_stats
        .record_tx_write(size, duration.as_micros() as f64);
    inner
        .overall_stats
        .record_tx_write(size, duration.as_micros() as f64);
}
/// Records `count` single-key deletions (no-op when zero).
fn stats_delete(&self, count: usize) {
    if count == 0 {
        return;
    }
    let mut inner = self.inner.lock();
    inner.current_stats.deletes += count as u64;
    inner.overall_stats.deletes += count as u64;
}
/// Records `count` prefix deletions (no-op when zero).
fn stats_delete_prefix(&self, count: usize) {
    if count == 0 {
        return;
    }
    let mut inner = self.inner.lock();
    inner.current_stats.prefix_deletes += count as u64;
    inner.overall_stats.prefix_deletes += count as u64;
}
/// Records `count` calls to `write`.
fn stats_transaction(&self, count: usize) {
    let mut inner = self.inner.lock();
    inner.current_stats.transactions += count as u64;
    inner.overall_stats.transactions += count as u64;
}
}
impl KeyValueDB for Database {
/// Fetches the value stored under `key` in column `col`, recording the
/// lookup in the read statistics.
fn get(&self, col: u32, key: &[u8]) -> KeyValueDBPinBoxFuture<'_, io::Result<Option<DBValue>>> {
    let key_text = key_to_text(key);
    let key_len = key.len();
    Box::pin(async move {
        let that = self.clone();
        that.validate_column(col).map_err(io::Error::other)?;
        // Use the async pool accessor: `conn_blocking` inside this future
        // would stall the executor thread while the pool worker runs the
        // query (write/iter already use `conn(...).await`).
        let someval = self
            .conn(move |conn| {
                Self::load_unique_value_blob(conn, that.column_table(col), &key_text)
            })
            .await
            .map_err(io::Error::other)?;
        // A miss still counts as one read of the key's bytes.
        match &someval {
            Some(val) => self.stats_read(1, key_len + val.len()),
            None => self.stats_read(1, key_len),
        }
        Ok(someval)
    })
}
/// Removes `key` from column `col`, returning the previous value if any.
fn delete(
    &self,
    col: u32,
    key: &[u8],
) -> KeyValueDBPinBoxFuture<'_, io::Result<Option<DBValue>>> {
    let key_text = key_to_text(key);
    let key_len = key.len();
    Box::pin(async move {
        let that = self.clone();
        that.validate_column(col).map_err(io::Error::other)?;
        // Async pool accessor: `conn_blocking` inside this future would
        // block the executor thread (write/iter already use `conn`).
        self.conn(move |conn| {
            let someval = Self::remove_and_return_unique_value_blob(
                conn,
                that.column_table(col),
                &key_text,
            )?;
            // NOTE(review): the removal is recorded as a *read* (the value
            // is returned to the caller); deletes issued through `write()`
            // are counted via stats_delete instead — confirm this asymmetry
            // is intended.
            match &someval {
                Some(val) => that.stats_read(1, key_len + val.len()),
                None => that.stats_read(1, key_len),
            }
            Ok(someval)
        })
        .await
        .map_err(io::Error::other)
    })
}
/// Applies `transaction` atomically inside a single SQLite transaction.
///
/// On failure the original transaction is handed back to the caller inside
/// [`DBTransactionError`] so it can be retried.
fn write(
    &self,
    transaction: DBTransaction,
) -> KeyValueDBPinBoxFuture<'_, Result<(), DBTransactionError>> {
    // Arc lets the ops move into the 'static pool closure while a second
    // handle stays available to rebuild the error value on failure.
    let transaction = Arc::new(transaction);
    Box::pin(async move {
        self.stats_transaction(1);
        let that = self.clone();
        let transaction_clone = transaction.clone();
        self.conn_mut(move |conn| {
            // Per-insert payload sizes, fed to the write statistics.
            let mut sizes = Vec::with_capacity(transaction_clone.ops.len());
            let mut total_tx_size = 0;
            let mut deletes = 0usize;
            let mut prefix_deletes = 0usize;
            let start = Instant::now();
            let tx = conn.transaction()?;
            for op in &transaction_clone.ops {
                match op {
                    DBOp::Insert { col, key, value } => {
                        that.validate_column(*col)?;
                        Self::store_unique_value_blob(
                            &tx,
                            that.column_table(*col),
                            &key_to_text(key),
                            value,
                        )?;
                        sizes.push(key.len() + value.len());
                        total_tx_size += key.len() + value.len();
                    }
                    DBOp::Delete { col, key } => {
                        that.validate_column(*col)?;
                        Self::remove_unique_value_blob(
                            &tx,
                            that.column_table(*col),
                            &key_to_text(key),
                        )?;
                        deletes += 1;
                    }
                    DBOp::DeletePrefix { col, prefix } => {
                        that.validate_column(*col)?;
                        // Trailing "%" turns the escaped prefix into a LIKE pattern.
                        Self::remove_unique_value_blob_like(
                            &tx,
                            that.column_table(*col),
                            &(like_key_to_text(prefix) + "%"),
                        )?;
                        prefix_deletes += 1;
                    }
                }
            }
            tx.commit()?;
            let duration = Instant::now() - start;
            // Statistics are only recorded after a successful commit.
            that.stats_write(&sizes);
            that.stats_tx_write(total_tx_size, duration);
            that.stats_delete(deletes);
            that.stats_delete_prefix(prefix_deletes);
            Ok(())
        })
        .await
        .map_err(io::Error::other)
        .map_err(|error| {
            let transaction = transaction.as_ref().clone();
            DBTransactionError { error, transaction }
        })
    })
}
/// Visits every `(key, value)` pair in column `col` (optionally restricted
/// to keys starting with `prefix`), feeding each to `f` until `f` returns
/// `Some` (early exit), returns an error, or the rows are exhausted.
fn iter<
    'a,
    T: Send + 'static,
    C: Send + 'static,
    F: FnMut(&mut C, DBKeyValueRef) -> io::Result<Option<T>> + Send + Sync + 'static,
>(
    &'a self,
    col: u32,
    prefix: Option<&'a [u8]>,
    context: C,
    mut f: F,
) -> KeyValueDBPinBoxFuture<'a, io::Result<(C, Option<T>)>> {
    let opt_prefix_query = prefix.map(|p| like_key_to_text(p) + "%");
    Box::pin(async move {
        if col >= self.columns() {
            return Err(io::Error::from(io::ErrorKind::NotFound));
        }
        let that = self.clone();
        // The caller's context is smuggled through the 'static pool closure
        // behind Arc<Mutex<Option<_>>> and recovered afterwards.
        let context = Arc::new(Mutex::new(Some(context)));
        let context_ref = context.clone();
        let res = self
            .conn(move |conn| {
                let mut context = context_ref.lock();
                let context = context.as_mut().unwrap();
                // Prepare/query failures are returned as the *inner*
                // io::Result so they are not conflated with pool errors.
                let mut stmt;
                let mut rows;
                if let Some(prefix_query) = opt_prefix_query {
                    stmt = match conn.prepare_cached(&that.column_table(col).str_iter_with_prefix)
                    {
                        Ok(v) => v,
                        Err(e) => return Ok(Err(io::Error::other(e))),
                    };
                    rows = match stmt.query([prefix_query]) {
                        Ok(v) => v,
                        Err(e) => return Ok(Err(io::Error::other(e))),
                    };
                } else {
                    stmt = match conn.prepare_cached(&that.column_table(col).str_iter_no_prefix) {
                        Ok(v) => v,
                        Err(e) => return Ok(Err(io::Error::other(e))),
                    };
                    rows = match stmt.query([]) {
                        Ok(v) => v,
                        Err(e) => return Ok(Err(io::Error::other(e))),
                    };
                }
                // Rows / bytes visited; recorded exactly once after the loop.
                // (The previous version also called stats_read inside the
                // early-exit branches, double-counting those reads.)
                let mut sw = 0usize;
                let mut sbw = 0usize;
                let out = loop {
                    match rows.next() {
                        Ok(Some(row)) => {
                            let kt: String = match row.get(0) {
                                Ok(v) => v,
                                Err(e) => break Err(io::Error::other(e)),
                            };
                            let v: Vec<u8> = match row.get(1) {
                                Ok(v) => v,
                                Err(e) => break Err(io::Error::other(e)),
                            };
                            let k: Vec<u8> = match text_to_key(&kt) {
                                Ok(v) => v,
                                Err(e) => {
                                    break Err(io::Error::other(format!(
                                        "SQLite row get column 0 text convert error: {:?}",
                                        e
                                    )));
                                }
                            };
                            sw += 1;
                            sbw += k.len() + v.len();
                            match f(context, (&k, &v)) {
                                Ok(None) => (),
                                Ok(Some(out)) => break Ok(Some(out)),
                                Err(e) => break Err(e),
                            }
                        }
                        Ok(None) => break Ok(None),
                        // Surface row-stepping errors instead of silently
                        // treating them as end-of-iteration.
                        Err(e) => break Err(io::Error::other(e)),
                    }
                };
                that.stats_read(sw, sbw);
                Ok(out)
            })
            .await
            .map_err(io::Error::other)?;
        let context = context.lock().take().unwrap();
        res.map(|x| (context, x))
    })
}
/// Visits every key in column `col` (optionally restricted to keys starting
/// with `prefix`), feeding each to `f` until `f` returns `Some` (early
/// exit), returns an error, or the rows are exhausted.
fn iter_keys<
    'a,
    T: Send + 'static,
    C: Send + 'static,
    F: FnMut(&mut C, DBKeyRef) -> io::Result<Option<T>> + Send + Sync + 'static,
>(
    &'a self,
    col: u32,
    prefix: Option<&'a [u8]>,
    context: C,
    mut f: F,
) -> KeyValueDBPinBoxFuture<'a, io::Result<(C, Option<T>)>> {
    let opt_prefix_query = prefix.map(|p| like_key_to_text(p) + "%");
    Box::pin(async move {
        if col >= self.columns() {
            return Err(io::Error::from(io::ErrorKind::NotFound));
        }
        let that = self.clone();
        // Context crosses the 'static pool closure via Arc<Mutex<Option<_>>>.
        let context = Arc::new(Mutex::new(Some(context)));
        let context_ref = context.clone();
        let res = self
            .conn(move |conn| {
                let mut context = context_ref.lock();
                let context = context.as_mut().unwrap();
                let mut stmt;
                let mut rows;
                if let Some(prefix_query) = opt_prefix_query {
                    stmt = match conn
                        .prepare_cached(&that.column_table(col).str_iter_keys_with_prefix)
                    {
                        Ok(v) => v,
                        Err(e) => return Ok(Err(io::Error::other(e))),
                    };
                    rows = match stmt.query([prefix_query]) {
                        Ok(v) => v,
                        Err(e) => return Ok(Err(io::Error::other(e))),
                    };
                } else {
                    stmt = match conn
                        .prepare_cached(&that.column_table(col).str_iter_keys_no_prefix)
                    {
                        Ok(v) => v,
                        Err(e) => return Ok(Err(io::Error::other(e))),
                    };
                    rows = match stmt.query([]) {
                        Ok(v) => v,
                        Err(e) => return Ok(Err(io::Error::other(e))),
                    };
                }
                // Rows / bytes visited; recorded exactly once after the loop
                // (the previous version double-counted the early-exit paths).
                let mut sw = 0usize;
                let mut sbw = 0usize;
                let out = loop {
                    match rows.next() {
                        Ok(Some(row)) => {
                            let kt: String = match row.get(0) {
                                Ok(v) => v,
                                Err(e) => break Err(io::Error::other(e)),
                            };
                            let k: Vec<u8> = match text_to_key(&kt) {
                                Ok(v) => v,
                                Err(e) => {
                                    break Err(io::Error::other(format!(
                                        "SQLite row get column 0 text convert error: {:?}",
                                        e
                                    )));
                                }
                            };
                            sw += 1;
                            sbw += k.len();
                            match f(context, &k) {
                                Ok(None) => (),
                                Ok(Some(out)) => break Ok(Some(out)),
                                Err(e) => break Err(e),
                            }
                        }
                        Ok(None) => break Ok(None),
                        // Surface row-stepping errors instead of silently
                        // treating them as end-of-iteration.
                        Err(e) => break Err(io::Error::other(e)),
                    }
                };
                that.stats_read(sw, sbw);
                Ok(out)
            })
            .await
            .map_err(io::Error::other)?;
        let context = context.lock().take().unwrap();
        res.map(|x| (context, x))
    })
}
/// Returns a snapshot of the I/O statistics.
///
/// `Overall` copies the since-open counters; `SincePrevious` drains the
/// rolling window, resetting it to empty.
fn io_stats(&self, kind: IoStatsKind) -> IoStats {
    // Wall-clock time elapsed since the given unix timestamp (µs),
    // saturating at zero on clock skew or a pre-epoch system clock.
    fn duration_since(timestamp_microseconds: u64) -> Duration {
        let now_micros = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|time| time.as_micros() as u64)
            .unwrap_or(0);
        Duration::from_micros(now_micros.saturating_sub(timestamp_microseconds))
    }
    let mut inner = self.inner.lock();
    let mut stats = match kind {
        IoStatsKind::Overall => inner.overall_stats.clone(),
        IoStatsKind::SincePrevious => {
            std::mem::replace(&mut inner.current_stats, IoStats::empty())
        }
    };
    stats.span = duration_since(stats.started);
    stats
}
/// Number of columns recorded in the on-disk `control` table (maintained
/// by `add_column`, `remove_last_column` and `open_resize_columns`).
fn num_columns(&self) -> io::Result<u32> {
    let this = self.clone();
    self.conn_blocking(move |conn| {
        Self::get_unique_value(conn, this.control_table(), "columns", 0u32)
    })
    .map_err(io::Error::other)
}
/// Counts the rows in column `col`'s backing table.
///
/// Any SQLite error — including a nonexistent column table — is mapped to
/// `io::ErrorKind::NotFound`.
fn num_keys(&self, col: u32) -> KeyValueDBPinBoxFuture<'_, io::Result<u64>> {
    let this = self.clone();
    Box::pin(async move {
        this.conn(move |conn| {
            conn.query_row(
                &format!("SELECT Count(*) FROM {}", get_column_table_name(col)),
                [],
                |row| -> rusqlite::Result<u64> { row.get(0) },
            )
        })
        .await
        .map_err(|_| io::Error::from(io::ErrorKind::NotFound))
    })
}
/// Returns whether `key` exists in column `col`.
fn has_key<'a>(
    &'a self,
    col: u32,
    key: &'a [u8],
) -> KeyValueDBPinBoxFuture<'a, io::Result<bool>> {
    let key_text = key_to_text(key);
    let key_len = key.len();
    Box::pin(async move {
        let that = self.clone();
        that.validate_column(col).map_err(io::Error::other)?;
        // Async pool accessor: `conn_blocking` inside this future would
        // block the executor thread.
        let someval = self
            .conn(move |conn| Self::has_value(conn, that.column_table(col), &key_text))
            .await
            .map_err(io::Error::other)?;
        self.stats_read(1, key_len);
        Ok(someval)
    })
}
/// Returns whether any key in column `col` starts with `prefix`.
fn has_prefix<'a>(
    &'a self,
    col: u32,
    prefix: &'a [u8],
) -> KeyValueDBPinBoxFuture<'a, io::Result<bool>> {
    let prefix_len = prefix.len();
    // Trailing "%" turns the escaped prefix into a LIKE pattern.
    let prefix_text = like_key_to_text(prefix) + "%";
    Box::pin(async move {
        let that = self.clone();
        that.validate_column(col).map_err(io::Error::other)?;
        // Async pool accessor: `conn_blocking` inside this future would
        // block the executor thread.
        let someval = self
            .conn(move |conn| {
                Self::has_value_like(conn, that.column_table(col), &prefix_text)
            })
            .await
            .map_err(io::Error::other)?;
        self.stats_read(1, prefix_len);
        Ok(someval)
    })
}
/// Returns the first `(key, value)` pair in column `col` whose key starts
/// with `prefix`, or `None` if nothing matches.
fn first_with_prefix<'a>(
    &'a self,
    col: u32,
    prefix: &'a [u8],
) -> KeyValueDBPinBoxFuture<'a, io::Result<Option<DBKeyValue>>> {
    let prefix_len = prefix.len();
    // Trailing "%" turns the escaped prefix into a LIKE pattern.
    let like = like_key_to_text(prefix) + "%";
    Box::pin(async move {
        let that = self.clone();
        that.validate_column(col).map_err(io::Error::other)?;
        // Async pool accessor: `conn_blocking` inside this future would
        // block the executor thread.
        let someval = self
            .conn(move |conn| {
                Self::load_first_value_blob_like(conn, that.column_table(col), &like)
            })
            .await
            .map_err(io::Error::other)?;
        self.stats_read(1, prefix_len);
        // Decode the stored text key back into bytes before returning.
        match someval {
            Some((kt, val)) => match text_to_key(&kt) {
                Err(e) => Err(io::Error::other(format!(
                    "SQLite row get column 0 text convert error: {:?}",
                    e
                ))),
                Ok(k) => Ok(Some((k, val))),
            },
            None => Ok(None),
        }
    })
}
/// Runs the configured vacuum strategy (see [`Database::vacuum`]).
fn cleanup(&self) -> KeyValueDBPinBoxFuture<'_, io::Result<()>> {
    Box::pin(async { self.vacuum().await.map_err(io::Error::other) })
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use keyvaluedb_shared_tests as st;
    use tempfile::Builder as TempfileBuilder;
    // Creates a file-backed database with `columns` columns at a fresh
    // temporary path.
    //
    // NOTE(review): `tempfile()` returns a NamedTempFile that is dropped at
    // the end of the expression chain, deleting the file; only its (formerly
    // unique) path is kept, and the database is created fresh at that path.
    // There is a small window in which another process could reuse the name.
    fn create(columns: u32) -> io::Result<Database> {
        let tempfile = TempfileBuilder::new()
            .prefix("")
            .tempfile()?
            .path()
            .to_path_buf();
        let config = DatabaseConfig::new().with_columns(columns);
        Database::open(tempfile, config)
    }
    // Same as `create`, but with an explicit vacuum mode.
    fn create_vacuum_mode(columns: u32, vacuum_mode: VacuumMode) -> io::Result<Database> {
        let tempfile = TempfileBuilder::new()
            .prefix("")
            .tempfile()?
            .path()
            .to_path_buf();
        let config = DatabaseConfig::new()
            .with_columns(columns)
            .with_vacuum_mode(vacuum_mode);
        Database::open(tempfile, config)
    }
    #[tokio::test]
    async fn get_fails_with_non_existing_column() -> io::Result<()> {
        let db = create(1)?;
        st::test_get_fails_with_non_existing_column(db).await
    }
    #[tokio::test]
    async fn num_keys() -> io::Result<()> {
        let db = create(1)?;
        st::test_num_keys(db).await
    }
    #[tokio::test]
    async fn put_and_get() -> io::Result<()> {
        let db = create(1)?;
        st::test_put_and_get(db).await
    }
    #[tokio::test]
    async fn delete_and_get() -> io::Result<()> {
        let db = create(1)?;
        st::test_delete_and_get(db).await
    }
    #[tokio::test]
    async fn delete_and_get_single() -> io::Result<()> {
        let db = create(1)?;
        st::test_delete_and_get_single(db).await
    }
    #[tokio::test]
    async fn delete_prefix() -> io::Result<()> {
        let db = create(st::DELETE_PREFIX_NUM_COLUMNS)?;
        st::test_delete_prefix(db).await
    }
    #[tokio::test]
    async fn iter() -> io::Result<()> {
        let db = create(1)?;
        st::test_iter(db).await
    }
    #[tokio::test]
    async fn iter_keys() -> io::Result<()> {
        let db = create(1)?;
        st::test_iter_keys(db).await
    }
    #[tokio::test]
    async fn iter_with_prefix() -> io::Result<()> {
        let db = create(1)?;
        st::test_iter_with_prefix(db).await
    }
    #[tokio::test]
    async fn complex() -> io::Result<()> {
        let db = create(1)?;
        st::test_complex(db).await
    }
    // Exercises cleanup under every vacuum mode, including repeated re-opens
    // of the same file with different modes.
    #[tokio::test]
    async fn cleanup() -> io::Result<()> {
        let db = create(1)?;
        st::test_cleanup(db).await?;
        let db = create_vacuum_mode(1, VacuumMode::None)?;
        st::test_cleanup(db).await?;
        let db = create_vacuum_mode(1, VacuumMode::Incremental)?;
        st::test_cleanup(db).await?;
        let db = create_vacuum_mode(1, VacuumMode::Full)?;
        st::test_cleanup(db).await?;
        let tempfile = TempfileBuilder::new()
            .prefix("")
            .tempfile()?
            .path()
            .to_path_buf();
        let config = DatabaseConfig::new().with_vacuum_mode(VacuumMode::None);
        let db = Database::open(tempfile.clone(), config)?;
        st::test_cleanup(db).await?;
        let config = DatabaseConfig::new().with_vacuum_mode(VacuumMode::Incremental);
        let db = Database::open(tempfile.clone(), config)?;
        st::test_cleanup(db).await?;
        let config = DatabaseConfig::new().with_vacuum_mode(VacuumMode::Full);
        let db = Database::open(tempfile.clone(), config)?;
        st::test_cleanup(db).await?;
        let config = DatabaseConfig::new().with_vacuum_mode(VacuumMode::None);
        let db = Database::open(tempfile, config)?;
        st::test_cleanup(db).await?;
        Ok(())
    }
    #[tokio::test]
    async fn stats() -> io::Result<()> {
        let db = create(st::IO_STATS_NUM_COLUMNS)?;
        st::test_io_stats(db).await
    }
    #[tokio::test]
    #[should_panic]
    async fn db_config_with_zero_columns() {
        let _cfg = DatabaseConfig::new().with_columns(0);
    }
    #[tokio::test]
    #[should_panic]
    async fn open_db_with_zero_columns() {
        let cfg = DatabaseConfig::new().with_columns(0);
        let _db = Database::open("", cfg);
    }
    // Grows a 1-column database to 5 columns, then re-opens with a 5-column
    // config to confirm the on-disk count persisted.
    #[tokio::test]
    async fn add_columns() {
        let config_1 = DatabaseConfig::default();
        let config_5 = DatabaseConfig::new().with_columns(5);
        let tempfile = TempfileBuilder::new()
            .prefix("")
            .tempfile()
            .unwrap()
            .path()
            .to_path_buf();
        {
            let db = Database::open(&tempfile, config_1).unwrap();
            assert_eq!(db.num_columns().unwrap(), 1);
            for i in 2..=5 {
                db.add_column().unwrap();
                assert_eq!(db.num_columns().unwrap(), i);
            }
        }
        {
            let db = Database::open(&tempfile, config_5).unwrap();
            assert_eq!(db.num_columns().unwrap(), 5);
        }
    }
    // Shrinks a 5-column database to 1 column, then re-opens with a 1-column
    // config to confirm the on-disk count persisted.
    #[tokio::test]
    async fn remove_columns() {
        let config_1 = DatabaseConfig::default();
        let config_5 = DatabaseConfig::new().with_columns(5);
        let tempfile = TempfileBuilder::new()
            .prefix("drop_columns")
            .tempfile()
            .unwrap()
            .path()
            .to_path_buf();
        {
            let db = Database::open(&tempfile, config_5).expect("open with 5 columns");
            assert_eq!(db.num_columns().unwrap(), 5);
            for i in (1..5).rev() {
                db.remove_last_column().unwrap();
                assert_eq!(db.num_columns().unwrap(), i);
            }
        }
        {
            let db = Database::open(&tempfile, config_1).unwrap();
            assert_eq!(db.num_columns().unwrap(), 1);
        }
    }
    #[tokio::test]
    async fn test_num_keys() {
        let tempfile = TempfileBuilder::new()
            .prefix("")
            .tempfile()
            .unwrap()
            .path()
            .to_path_buf();
        let config = DatabaseConfig::new().with_columns(1);
        let db = Database::open(tempfile, config).unwrap();
        assert_eq!(
            db.num_keys(0).await.unwrap(),
            0,
            "database is empty after creation"
        );
        let key1 = b"beef";
        let mut batch = db.transaction();
        batch.put(0, key1, key1);
        db.write(batch).await.unwrap();
        assert_eq!(
            db.num_keys(0).await.unwrap(),
            1,
            "adding a key increases the count"
        );
    }
}