#![deny(clippy::all)]
#![deny(missing_docs)]
mod error;
mod indexed_db;
use async_lock::Mutex as AsyncMutex;
use keyvaluedb::{DBKeyRef, DBKeyValueRef, DBTransaction, DBTransactionError, DBValue};
use keyvaluedb_memorydb::{self as in_memory, InMemory};
use send_wrapper::SendWrapper;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
pub use crate::error::*;
pub use keyvaluedb::KeyValueDB;
use futures::prelude::*;
use web_sys::IdbDatabase;
/// State that may only be touched while holding the database lock.
///
/// Owns the IndexedDB connection; the `Drop` impl for this type calls `close()` on it.
struct DatabaseInner {
// The `web_sys` IndexedDB handle, wrapped in `SendWrapper` so the containing
// type can be used where `Send` is required.
indexed_db: SendWrapper<IdbDatabase>,
}
impl Drop for DatabaseInner {
    /// Closes the IndexedDB connection when this value is dropped.
    fn drop(&mut self) {
        // Borrow the only field by destructuring; nothing is moved out of `self`.
        let Self { indexed_db } = self;
        indexed_db.close();
    }
}
/// Data fixed at open time that is read without taking the database lock.
struct DatabaseUnlockedInner {
// Name the database was opened with.
table_name: String,
// Version reported by the IndexedDB handle returned from `indexed_db::open`.
version: u32,
// Number of columns the caller requested in `Database::open`; column indices
// passed to the `KeyValueDB` methods are checked against this value.
columns: u32,
// In-memory copy of the data, present only when `Database::open` was called
// with `memory_cached == true`.
in_memory: Option<InMemory>,
}
/// Key-value database stored in the browser's IndexedDB, optionally mirrored
/// by an in-memory copy.
///
/// Cloning is cheap: both fields are `Arc`s, so every clone shares the same
/// connection and the same in-memory copy.
#[derive(Clone)]
pub struct Database {
// Open-time metadata and the optional in-memory copy; accessed without locking.
unlocked_inner: Arc<DatabaseUnlockedInner>,
// The IndexedDB connection, guarded by an async mutex.
inner: Arc<AsyncMutex<DatabaseInner>>,
}
impl Database {
    /// Opens the IndexedDB database named `table_name` with `columns` columns.
    ///
    /// The database is first opened without requesting a specific version. If
    /// the handle that comes back reports too few columns, that connection is
    /// closed and the database is reopened with its version bumped by one.
    ///
    /// When `memory_cached` is `true`, every key/value pair of every column is
    /// read once into an in-memory database. Reads are then served from that
    /// copy, while writes go to both IndexedDB and the copy.
    ///
    /// # Errors
    ///
    /// Returns an error if the database cannot be opened or, with
    /// `memory_cached` set, if reading the existing contents fails. In the
    /// latter case the connection is closed before the error is returned.
    pub async fn open(
        table_name: &str,
        columns: u32,
        memory_cached: bool,
    ) -> Result<Database, error::Error> {
        // NOTE(review): `None` presumably means "whatever version currently
        // exists" -- confirm against `indexed_db::open`.
        let db = indexed_db::open(table_name, None, columns)
            .await
            .map_err(io_err_string)?;

        // Overflow-free form of the former `columns + 1 > db.columns`.
        // NOTE(review): the comparison implies `db.columns` counts one store
        // more than the caller's `columns` -- confirm in `indexed_db`.
        let db = if columns >= db.columns {
            let next_version = db.version + 1;
            // Dropping the handle does not call `close()` on its own, and a
            // connection that stays open can hold up the version upgrade
            // requested below, so close it explicitly first.
            db.inner.close();
            drop(db);
            indexed_db::open(table_name, Some(next_version), columns)
                .await
                .map_err(io_err_string)?
        } else {
            db
        };

        let indexed_db::IndexedDB { version, inner, .. } = db;
        // Hand the connection to `DatabaseInner` right away: its `Drop` impl
        // closes the connection if preloading the cache fails below.
        let connection = DatabaseInner { indexed_db: inner };

        let in_memory = if memory_cached {
            let cache = in_memory::create(columns);
            for column in 0..columns {
                // Gather the whole column into one transaction, then apply it.
                let mut tx = DBTransaction::new();
                let mut stream =
                    indexed_db::idb_cursor(&connection.indexed_db, column, None, None)?;
                while let Some(kv) = stream.next().await {
                    let (key, value) = kv?;
                    tx.put(column, &key, &value);
                }
                cache
                    .write(tx)
                    .await
                    .expect("writing in memory always succeeds; qed");
            }
            Some(cache)
        } else {
            None
        };

        Ok(Database {
            unlocked_inner: Arc::new(DatabaseUnlockedInner {
                table_name: table_name.to_owned(),
                version,
                columns,
                in_memory,
            }),
            inner: Arc::new(AsyncMutex::new(connection)),
        })
    }

    /// Deletes the IndexedDB database named `table_name`.
    ///
    /// # Errors
    ///
    /// Any failure reported by `indexed_db::delete` is returned as an
    /// [`io::Error`].
    pub async fn delete(table_name: &str) -> io::Result<()> {
        indexed_db::delete(table_name).await.map_err(io_err_string)
    }

    /// Returns the name this database was opened with.
    pub fn name(&self) -> String {
        self.unlocked_inner.table_name.clone()
    }

    /// Returns the number of columns requested when the database was opened.
    ///
    /// This implementation always returns `Ok`.
    pub fn num_columns(&self) -> Result<u32, io::Error> {
        Ok(self.unlocked_inner.columns)
    }

    /// Returns the IndexedDB version of the database as it was opened.
    pub fn version(&self) -> u32 {
        self.unlocked_inner.version
    }
}
impl KeyValueDB for Database {
fn get<'a>(
&self,
col: u32,
key: &'a [u8],
) -> Pin<Box<dyn Future<Output = io::Result<Option<DBValue>>> + Send + 'a>> {
let this = self.clone();
Box::pin(SendWrapper::new(async move {
if col >= this.unlocked_inner.columns {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("No such column family: {:?}", col),
));
}
if let Some(in_memory) = &this.unlocked_inner.in_memory {
in_memory.get(col, key).await
} else {
let inner = this.inner.lock().await;
indexed_db::idb_get(&inner.indexed_db, col, key).await
}
}))
}
fn delete<'a>(
&self,
col: u32,
key: &'a [u8],
) -> Pin<Box<dyn Future<Output = io::Result<Option<DBValue>>> + Send + 'a>> {
let this = self.clone();
Box::pin(SendWrapper::new(async move {
if col >= this.unlocked_inner.columns {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("No such column family: {:?}", col),
));
}
let inner = this.inner.lock().await;
let someval = indexed_db::idb_get(&inner.indexed_db, col, key).await?;
let mut transaction = DBTransaction::new();
transaction.delete(col, key);
match indexed_db::idb_commit_transaction(
&inner.indexed_db,
&transaction,
this.unlocked_inner.columns,
)
.await
{
Ok(()) => {}
Err(error) => {
return Err(io_err_string(format!("delete failed: {:?}", error)));
}
};
if let Some(in_memory) = &this.unlocked_inner.in_memory {
in_memory.delete(col, key).await?;
}
Ok(someval)
}))
}
fn write(
&self,
transaction: DBTransaction,
) -> Pin<Box<dyn Future<Output = Result<(), DBTransactionError>> + Send + 'static>> {
let this = self.clone();
Box::pin(SendWrapper::new(async move {
{
let inner = this.inner.lock().await;
match indexed_db::idb_commit_transaction(
&inner.indexed_db,
&transaction,
this.unlocked_inner.columns,
)
.await
{
Ok(()) => {}
Err(error) => {
return Err(DBTransactionError { error, transaction });
}
};
}
if let Some(in_memory) = &this.unlocked_inner.in_memory {
in_memory.write(transaction).await
} else {
Ok(())
}
}))
}
fn iter<'a, T: 'a, F: FnMut(DBKeyValueRef) -> io::Result<Option<T>> + Send + Sync + 'a>(
&self,
col: u32,
prefix: Option<&'a [u8]>,
mut f: F,
) -> Pin<Box<dyn Future<Output = io::Result<Option<T>>> + Send + 'a>> {
let this = self.clone();
Box::pin(async move {
if col >= this.unlocked_inner.columns {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("No such column family: {:?}", col),
));
}
if let Some(in_memory) = &this.unlocked_inner.in_memory {
in_memory.iter(col, prefix, f).await
} else {
let inner = this.inner.lock().await;
let mut stream = indexed_db::idb_cursor(
&inner.indexed_db,
col,
None,
prefix.map(|p| p.to_vec()),
)?;
while let Some(kv) = stream.next().await {
match kv {
Ok((key, value)) => {
if let Some(out) = f((&key, &value))? {
return Ok(Some(out));
}
}
Err(e) => {
return Err(e);
}
}
}
Ok(None)
}
})
}
fn iter_keys<'a, T: 'a, F: FnMut(DBKeyRef) -> io::Result<Option<T>> + Send + Sync + 'a>(
&self,
col: u32,
prefix: Option<&'a [u8]>,
mut f: F,
) -> Pin<Box<dyn Future<Output = io::Result<Option<T>>> + Send + 'a>> {
let this = self.clone();
Box::pin(async move {
if col >= this.unlocked_inner.columns {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("No such column family: {:?}", col),
));
}
if let Some(in_memory) = &this.unlocked_inner.in_memory {
in_memory.iter_keys(col, prefix, f).await
} else {
let inner = this.inner.lock().await;
let mut stream = indexed_db::idb_cursor_keys(
&inner.indexed_db,
col,
prefix.map(|p| p.to_vec()),
)?;
while let Some(k) = stream.next().await {
match k {
Ok(key) => {
if let Some(out) = f(&key)? {
return Ok(Some(out));
}
}
Err(e) => {
return Err(e);
}
}
}
Ok(None)
}
})
}
fn restore(&self, _new_db: &str) -> std::io::Result<()> {
Err(io_err_string("Not supported yet"))
}
}