#![deny(clippy::all)]
#![deny(missing_docs)]
mod error;
mod indexed_db;
use keyvaluedb::{
DBKeyRef, DBKeyValueRef, DBTransaction, DBTransactionError, DBValue, KeyValueDBPinBoxFuture,
};
use keyvaluedb_memorydb::{self as in_memory, InMemory};
use send_wrapper::SendWrapper;
use std::io;
use std::sync::Arc;
pub use crate::error::*;
pub use keyvaluedb::KeyValueDB;
use futures::prelude::*;
use web_sys::IdbDatabase;
/// State shared by every clone of a [`Database`] handle.
///
/// The IndexedDB connection held here is closed by this type's `Drop` impl.
struct DatabaseUnlockedInner {
/// Name the database was opened under; returned by `Database::name`.
table_name: String,
/// Version reported by the opened IndexedDB handle; returned by `Database::version`.
version: u32,
/// Column count requested at open time; every column index is bounds-checked against it.
columns: u32,
/// Optional in-memory copy of the data. When present, reads are served from it
/// and writes are mirrored into it after they have been committed to IndexedDB.
in_memory: Option<InMemory>,
/// Handle to the underlying IndexedDB database.
indexed_db: SendWrapper<IdbDatabase>,
}
impl Drop for DatabaseUnlockedInner {
fn drop(&mut self) {
self.indexed_db.close();
}
}
/// Handle to a key-value database stored in IndexedDB.
///
/// Cloning the handle is cheap: all clones share one reference-counted state.
#[derive(Clone)]
pub struct Database {
/// Reference-counted state shared between all clones of this handle.
unlocked_inner: Arc<DatabaseUnlockedInner>,
}
impl Database {
pub async fn open(
table_name: &str,
columns: u32,
memory_cached: bool,
) -> Result<Database, error::Error> {
let db = indexed_db::open(table_name, None, columns)
.await
.map_err(io_err_string)?;
let db = if columns > db.columns {
let next_version = db.version + 1;
drop(db);
indexed_db::open(table_name, Some(next_version), columns)
.await
.map_err(io_err_string)?
} else {
db
};
let indexed_db::IndexedDB { version, inner, .. } = db;
let in_memory = if memory_cached {
let in_memory = in_memory::create(columns);
for column in 0..columns {
let mut tx = DBTransaction::new();
let mut stream = indexed_db::idb_cursor(&inner, column, None, None)
.map_err(error::Error::from)?;
while let Some(kv) = stream.next().await {
match kv {
Ok((key, value)) => {
tx.put(column, &key, &value);
}
Err(e) => {
return Err(e.into());
}
}
}
in_memory
.write(tx)
.await
.expect("writing in memory always succeeds; qed");
}
Some(in_memory)
} else {
None
};
Ok(Database {
unlocked_inner: Arc::new(DatabaseUnlockedInner {
table_name: table_name.to_owned(),
version,
columns,
in_memory,
indexed_db: inner,
}),
})
}
/// Asks the `indexed_db` backend to delete the database named `table_name`.
///
/// # Errors
///
/// A backend failure is converted into an [`io::Error`] via `io_err_string`.
pub async fn delete(table_name: &str) -> io::Result<()> {
    let outcome = indexed_db::delete(table_name).await;
    outcome.map_err(io_err_string)
}
/// Lists databases as `(name, version)` pairs.
///
/// With `Some(prefix)`, only names starting with `prefix` are returned;
/// with `None`, every entry reported by the backend is returned.
///
/// # Errors
///
/// A backend failure is converted into an [`io::Error`] via `io_err_string`.
pub fn list(
    opt_prefix: Option<&str>,
) -> KeyValueDBPinBoxFuture<'_, io::Result<Vec<(String, u32)>>> {
    // Take an owned copy of the prefix for use inside the `async move` block.
    let wanted = opt_prefix.map(str::to_owned);
    Box::pin(SendWrapper::new(async move {
        let mut found = indexed_db::names_with_versions()
            .await
            .map_err(io_err_string)?;
        if let Some(prefix) = wanted {
            // Filter in place; the relative order of the entries is kept.
            found.retain(|(name, _)| name.starts_with(&prefix));
        }
        Ok(found)
    }))
}
/// Returns a copy of the name this database was opened under.
pub fn name(&self) -> String {
    let state = &self.unlocked_inner;
    state.table_name.clone()
}
/// Returns the IndexedDB version recorded when this database was opened.
pub fn version(&self) -> u32 {
    let state = &self.unlocked_inner;
    state.version
}
}
impl KeyValueDB for Database {
/// Looks up `key` in column `col`.
///
/// Reads from the in-memory cache when one exists, otherwise from IndexedDB.
/// Fails with `ErrorKind::NotFound` when `col` is not a valid column index.
fn get<'a>(
    &'a self,
    col: u32,
    key: &'a [u8],
) -> KeyValueDBPinBoxFuture<'a, io::Result<Option<DBValue>>> {
    let db = self.clone();
    Box::pin(SendWrapper::new(async move {
        let state = &db.unlocked_inner;
        if col >= state.columns {
            return Err(io::Error::from(io::ErrorKind::NotFound));
        }
        match state.in_memory.as_ref() {
            Some(cache) => cache.get(col, key).await,
            None => indexed_db::idb_get(&state.indexed_db, col, key).await,
        }
    }))
}
fn delete<'a>(
&'a self,
col: u32,
key: &'a [u8],
) -> KeyValueDBPinBoxFuture<'a, io::Result<Option<DBValue>>> {
let this = self.clone();
Box::pin(SendWrapper::new(async move {
if col >= this.unlocked_inner.columns {
return Err(io::Error::from(io::ErrorKind::NotFound));
}
let someval = indexed_db::idb_get(&this.unlocked_inner.indexed_db, col, key).await?;
let mut transaction = DBTransaction::new();
transaction.delete(col, key);
match indexed_db::idb_commit_transaction(
&this.unlocked_inner.indexed_db,
&transaction,
this.unlocked_inner.columns,
)
.await
{
Ok(()) => {}
Err(error) => {
return Err(io_err_string(format!("delete failed: {:?}", error)));
}
};
if let Some(in_memory) = &this.unlocked_inner.in_memory {
in_memory.delete(col, key).await?;
}
Ok(someval)
}))
}
/// Commits `transaction` to IndexedDB, then applies it to the in-memory
/// cache when one exists.
///
/// If the IndexedDB commit fails, the transaction is handed back to the
/// caller inside the returned `DBTransactionError` and the cache is left
/// untouched.
fn write(
    &self,
    transaction: DBTransaction,
) -> KeyValueDBPinBoxFuture<'_, Result<(), DBTransactionError>> {
    let db = self.clone();
    Box::pin(SendWrapper::new(async move {
        let state = &db.unlocked_inner;
        let committed = indexed_db::idb_commit_transaction(
            &state.indexed_db,
            &transaction,
            state.columns,
        )
        .await;
        if let Err(error) = committed {
            return Err(DBTransactionError { error, transaction });
        }
        match state.in_memory.as_ref() {
            Some(cache) => cache.write(transaction).await,
            None => Ok(()),
        }
    }))
}
/// Visits the key/value pairs of column `col`, optionally restricted by
/// `prefix`, calling `f` with `context` for each pair.
///
/// Iteration stops as soon as `f` returns `Ok(Some(_))` or an error. The
/// context is returned together with the value `f` produced, or `None`
/// when every pair was visited without `f` producing one.
/// Fails with `ErrorKind::NotFound` when `col` is not a valid column index.
fn iter<
    'a,
    T: Send + 'static,
    C: Send + 'static,
    F: FnMut(&mut C, DBKeyValueRef) -> io::Result<Option<T>> + Send + Sync + 'static,
>(
    &'a self,
    col: u32,
    prefix: Option<&'a [u8]>,
    mut context: C,
    mut f: F,
) -> KeyValueDBPinBoxFuture<'a, io::Result<(C, Option<T>)>> {
    let db = self.clone();
    Box::pin(async move {
        let state = &db.unlocked_inner;
        if col >= state.columns {
            return Err(io::Error::from(io::ErrorKind::NotFound));
        }
        // The cache, when present, handles the whole iteration itself.
        if let Some(cache) = state.in_memory.as_ref() {
            return cache.iter(col, prefix, context, f).await;
        }

        let owned_prefix = prefix.map(|p| p.to_vec());
        let mut entries = indexed_db::idb_cursor(&state.indexed_db, col, None, owned_prefix)?;
        while let Some(entry) = entries.next().await {
            let (key, value) = entry?;
            if let Some(found) = f(&mut context, (&key, &value))? {
                return Ok((context, Some(found)));
            }
        }
        Ok((context, None))
    })
}
/// Visits the keys of column `col`, optionally restricted by `prefix`,
/// calling `f` with `context` for each key.
///
/// Iteration stops as soon as `f` returns `Ok(Some(_))` or an error. The
/// context is returned together with the value `f` produced, or `None`
/// when every key was visited without `f` producing one.
/// Fails with `ErrorKind::NotFound` when `col` is not a valid column index.
fn iter_keys<
    'a,
    T: Send + 'static,
    C: Send + 'static,
    F: FnMut(&mut C, DBKeyRef) -> io::Result<Option<T>> + Send + Sync + 'static,
>(
    &'a self,
    col: u32,
    prefix: Option<&'a [u8]>,
    mut context: C,
    mut f: F,
) -> KeyValueDBPinBoxFuture<'a, io::Result<(C, Option<T>)>> {
    let db = self.clone();
    Box::pin(async move {
        let state = &db.unlocked_inner;
        if col >= state.columns {
            return Err(io::Error::from(io::ErrorKind::NotFound));
        }
        // The cache, when present, handles the whole iteration itself.
        if let Some(cache) = state.in_memory.as_ref() {
            return cache.iter_keys(col, prefix, context, f).await;
        }

        let owned_prefix = prefix.map(|p| p.to_vec());
        let mut keys = indexed_db::idb_cursor_keys(&state.indexed_db, col, owned_prefix)?;
        while let Some(item) = keys.next().await {
            let key = item?;
            if let Some(found) = f(&mut context, &key)? {
                return Ok((context, Some(found)));
            }
        }
        Ok((context, None))
    })
}
/// Returns the column count this database was opened with; never fails.
fn num_columns(&self) -> Result<u32, io::Error> {
    let total = self.unlocked_inner.columns;
    Ok(total)
}
/// Returns the number of keys stored in column `col`.
///
/// Answers from the in-memory cache when one exists; otherwise takes the
/// first item yielded by the IndexedDB key-count stream.
/// Fails with `ErrorKind::NotFound` when `col` is not a valid column index
/// and with `ErrorKind::InvalidData` when the count stream yields nothing.
fn num_keys(&self, col: u32) -> KeyValueDBPinBoxFuture<'_, io::Result<u64>> {
    let db = self.clone();
    Box::pin(async move {
        let state = &db.unlocked_inner;
        if col >= state.columns {
            return Err(io::Error::from(io::ErrorKind::NotFound));
        }
        if let Some(cache) = state.in_memory.as_ref() {
            return cache.num_keys(col).await;
        }

        let mut counts = indexed_db::idb_get_key_count(&state.indexed_db, col, None)?;
        let first = counts.next().await;
        match first {
            Some(Ok(count)) => Ok(count as u64),
            Some(Err(e)) => Err(e),
            None => Err(io::Error::from(io::ErrorKind::InvalidData)),
        }
    })
}
}