keyvaluedb-web 0.1.8

A key-value database for use in browsers
Documentation
//! A key-value database for use in browsers
//!
//! Writes always go to IndexedDB and, when the in-memory cache is enabled, to memory as well;
//! with the cache enabled, `open` also loads the whole database from IndexedDB into memory.

#![deny(clippy::all)]
#![deny(missing_docs)]

mod error;
mod indexed_db;

use keyvaluedb::{
    DBKeyRef, DBKeyValueRef, DBTransaction, DBTransactionError, DBValue, KeyValueDBPinBoxFuture,
};
use keyvaluedb_memorydb::{self as in_memory, InMemory};
use send_wrapper::SendWrapper;
use std::io;
use std::sync::Arc;

pub use crate::error::*;
pub use keyvaluedb::KeyValueDB;

use futures::prelude::*;

use web_sys::IdbDatabase;

/// State shared by every clone of `Database` (it is held behind an `Arc`).
struct DatabaseUnlockedInner {
    /// Name the database was opened with; returned by `Database::name`.
    table_name: String,
    /// Version taken from the IndexedDB handle when the database was opened.
    version: u32,
    /// Number of columns requested at open time; valid column ids are `0..columns`.
    columns: u32,
    /// In-memory copy of the data; `Some` only when opened with `memory_cached == true`.
    in_memory: Option<InMemory>,
    /// The IndexedDB connection, wrapped in `SendWrapper`
    /// (presumably because the underlying JS handle is not `Send` -- confirm).
    indexed_db: SendWrapper<IdbDatabase>,
}

impl Drop for DatabaseUnlockedInner {
    fn drop(&mut self) {
        self.indexed_db.close();
    }
}

/// Database backed by IndexedDB, optionally fronted by an in-memory implementation.
///
/// Clones share the same underlying state through an `Arc`.
#[derive(Clone)]
pub struct Database {
    /// Shared state; its `Drop` impl closes the IndexedDB connection once the last
    /// clone goes away.
    unlocked_inner: Arc<DatabaseUnlockedInner>,
}

impl Database {
    /// Opens the IndexedDB database called `table_name`, making column ids
    /// `0..columns` available.
    ///
    /// The latest existing version is opened first. If it reports fewer columns than
    /// requested, it is closed and reopened at the next version number.
    ///
    /// When `memory_cached` is `true`, every column is read out of IndexedDB into an
    /// in-memory database before this function returns; otherwise no cache is kept.
    ///
    /// # Errors
    ///
    /// Fails if IndexedDB cannot be opened, or if creating or reading a cursor fails
    /// while populating the cache.
    pub async fn open(
        table_name: &str,
        columns: u32,
        memory_cached: bool,
    ) -> Result<Database, error::Error> {
        // Start from whatever version currently exists.
        let latest = indexed_db::open(table_name, None, columns)
            .await
            .map_err(io_err_string)?;

        // Needing more columns than the latest version has means bumping the version:
        // close the handle and reopen with a version one higher than the one we got.
        // cf. https://github.com/paritytech/parity-common/pull/202#discussion_r321221751
        let opened = if columns > latest.columns {
            let bumped_version = latest.version + 1;
            drop(latest);
            indexed_db::open(table_name, Some(bumped_version), columns)
                .await
                .map_err(io_err_string)?
        } else {
            latest
        };
        let indexed_db::IndexedDB { version, inner, .. } = opened;

        // Mirror IndexedDB into memory, one column at a time, when caching is requested.
        let in_memory = if memory_cached {
            let cache = in_memory::create(columns);
            for column in 0..columns {
                // Gather the whole column into a single transaction, then apply it.
                let mut batch = DBTransaction::new();
                let mut entries = indexed_db::idb_cursor(&inner, column, None, None)
                    .map_err(error::Error::from)?;
                while let Some(entry) = entries.next().await {
                    let (key, value) = entry?;
                    batch.put(column, &key, &value);
                }
                cache
                    .write(batch)
                    .await
                    .expect("writing in memory always succeeds; qed");
            }
            Some(cache)
        } else {
            None
        };

        let shared = DatabaseUnlockedInner {
            table_name: table_name.to_owned(),
            version,
            columns,
            in_memory,
            indexed_db: inner,
        };
        Ok(Database {
            unlocked_inner: Arc::new(shared),
        })
    }

    /// Deletes the IndexedDB database with the given name.
    ///
    /// # Errors
    ///
    /// A failure reported by the IndexedDB layer is converted into an `io::Error`.
    pub async fn delete(table_name: &str) -> io::Result<()> {
        let outcome = indexed_db::delete(table_name).await;
        outcome.map_err(io_err_string)
    }

    /// Lists the IndexedDB databases visible to the current origin as
    /// `(name, version)` pairs. Backed by the `indexedDB.databases()` API.
    ///
    /// With `opt_prefix` set, only databases whose name starts with that prefix are
    /// returned; with `None`, every database is returned.
    pub fn list(
        opt_prefix: Option<&str>,
    ) -> KeyValueDBPinBoxFuture<'_, io::Result<Vec<(String, u32)>>> {
        // Take an owned copy so the future does not capture the borrowed argument.
        let wanted = opt_prefix.map(str::to_owned);
        Box::pin(SendWrapper::new(async move {
            let mut found = indexed_db::names_with_versions()
                .await
                .map_err(io_err_string)?;
            if let Some(prefix) = wanted {
                // Drop non-matching entries in place; order of the rest is unchanged.
                found.retain(|(name, _version)| name.starts_with(&prefix));
            }
            Ok(found)
        }))
    }

    /// Returns an owned copy of the name this database was opened with.
    pub fn name(&self) -> String {
        let shared = &self.unlocked_inner;
        String::clone(&shared.table_name)
    }

    /// Returns the database version recorded when it was opened.
    pub fn version(&self) -> u32 {
        let shared = &self.unlocked_inner;
        shared.version
    }
}

impl KeyValueDB for Database {
    /// Looks up `key` in column `col`.
    ///
    /// The lookup is served by the in-memory cache when there is one, and by
    /// IndexedDB otherwise. Fails with `ErrorKind::NotFound` when `col` is not below
    /// the number of columns the database was opened with.
    fn get<'a>(
        &'a self,
        col: u32,
        key: &'a [u8],
    ) -> KeyValueDBPinBoxFuture<'a, io::Result<Option<DBValue>>> {
        let this = self.clone();
        Box::pin(SendWrapper::new(async move {
            let shared = &this.unlocked_inner;
            if col >= shared.columns {
                return Err(io::Error::from(io::ErrorKind::NotFound));
            }

            match &shared.in_memory {
                Some(cache) => cache.get(col, key).await,
                None => indexed_db::idb_get(&shared.indexed_db, col, key).await,
            }
        }))
    }

    fn delete<'a>(
        &'a self,
        col: u32,
        key: &'a [u8],
    ) -> KeyValueDBPinBoxFuture<'a, io::Result<Option<DBValue>>> {
        let this = self.clone();
        Box::pin(SendWrapper::new(async move {
            if col >= this.unlocked_inner.columns {
                return Err(io::Error::from(io::ErrorKind::NotFound));
            }

            let someval = indexed_db::idb_get(&this.unlocked_inner.indexed_db, col, key).await?;

            let mut transaction = DBTransaction::new();
            transaction.delete(col, key);

            match indexed_db::idb_commit_transaction(
                &this.unlocked_inner.indexed_db,
                &transaction,
                this.unlocked_inner.columns,
            )
            .await
            {
                Ok(()) => {}
                Err(error) => {
                    return Err(io_err_string(format!("delete failed: {:?}", error)));
                }
            };

            if let Some(in_memory) = &this.unlocked_inner.in_memory {
                in_memory.delete(col, key).await?;
            }

            Ok(someval)
        }))
    }

    fn write(
        &self,
        transaction: DBTransaction,
    ) -> KeyValueDBPinBoxFuture<'_, Result<(), DBTransactionError>> {
        let this = self.clone();
        Box::pin(SendWrapper::new(async move {
            {
                match indexed_db::idb_commit_transaction(
                    &this.unlocked_inner.indexed_db,
                    &transaction,
                    this.unlocked_inner.columns,
                )
                .await
                {
                    Ok(()) => {}
                    Err(error) => {
                        return Err(DBTransactionError { error, transaction });
                    }
                };
            }
            if let Some(in_memory) = &this.unlocked_inner.in_memory {
                in_memory.write(transaction).await
            } else {
                Ok(())
            }
        }))
    }

    /// Iterates over the key/value pairs of column `col`, optionally restricted by
    /// `prefix`, feeding each pair to `f` along with the mutable `context`.
    ///
    /// With an in-memory cache the whole call is delegated to it. Otherwise an
    /// IndexedDB cursor is walked: iteration stops as soon as `f` returns
    /// `Ok(Some(_))` (that value is returned with the context), `f` returns an error,
    /// or the cursor yields an error. If the cursor is exhausted, `(context, None)`
    /// is returned.
    ///
    /// Fails with `ErrorKind::NotFound` when `col` is not below the number of columns
    /// the database was opened with.
    fn iter<
        'a,
        T: Send + 'static,
        C: Send + 'static,
        F: FnMut(&mut C, DBKeyValueRef) -> io::Result<Option<T>> + Send + Sync + 'static,
    >(
        &'a self,
        col: u32,
        prefix: Option<&'a [u8]>,
        mut context: C,
        mut f: F,
    ) -> KeyValueDBPinBoxFuture<'a, io::Result<(C, Option<T>)>> {
        let this = self.clone();
        Box::pin(async move {
            let shared = &this.unlocked_inner;
            if col >= shared.columns {
                return Err(io::Error::from(io::ErrorKind::NotFound));
            }

            // Cached databases answer entirely from memory.
            if let Some(cache) = &shared.in_memory {
                return cache.iter(col, prefix, context, f).await;
            }

            let mut entries = indexed_db::idb_cursor(
                &shared.indexed_db,
                col,
                None,
                prefix.map(<[u8]>::to_vec),
            )?;
            while let Some(entry) = entries.next().await {
                let (key, value) = entry?;
                if let Some(out) = f(&mut context, (&key, &value))? {
                    return Ok((context, Some(out)));
                }
            }
            Ok((context, None))
        })
    }

    /// Iterates over the keys of column `col`, optionally restricted by `prefix`,
    /// feeding each key to `f` along with the mutable `context`.
    ///
    /// With an in-memory cache the whole call is delegated to it. Otherwise an
    /// IndexedDB key cursor is walked: iteration stops as soon as `f` returns
    /// `Ok(Some(_))` (that value is returned with the context), `f` returns an error,
    /// or the cursor yields an error. If the cursor is exhausted, `(context, None)`
    /// is returned.
    ///
    /// Fails with `ErrorKind::NotFound` when `col` is not below the number of columns
    /// the database was opened with.
    fn iter_keys<
        'a,
        T: Send + 'static,
        C: Send + 'static,
        F: FnMut(&mut C, DBKeyRef) -> io::Result<Option<T>> + Send + Sync + 'static,
    >(
        &'a self,
        col: u32,
        prefix: Option<&'a [u8]>,
        mut context: C,
        mut f: F,
    ) -> KeyValueDBPinBoxFuture<'a, io::Result<(C, Option<T>)>> {
        let this = self.clone();
        Box::pin(async move {
            let shared = &this.unlocked_inner;
            if col >= shared.columns {
                return Err(io::Error::from(io::ErrorKind::NotFound));
            }

            // Cached databases answer entirely from memory.
            if let Some(cache) = &shared.in_memory {
                return cache.iter_keys(col, prefix, context, f).await;
            }

            let mut keys = indexed_db::idb_cursor_keys(
                &shared.indexed_db,
                col,
                prefix.map(<[u8]>::to_vec),
            )?;
            while let Some(item) = keys.next().await {
                let key = item?;
                if let Some(out) = f(&mut context, &key)? {
                    return Ok((context, Some(out)));
                }
            }
            Ok((context, None))
        })
    }

    /// Returns the number of columns the database was opened with; never fails.
    fn num_columns(&self) -> Result<u32, io::Error> {
        let count = self.unlocked_inner.columns;
        Ok(count)
    }

    /// Returns the number of keys stored in column `col`.
    ///
    /// Answered by the in-memory cache when there is one; otherwise the first item
    /// produced by the IndexedDB key-count stream is used. Fails with
    /// `ErrorKind::NotFound` for a column id that is out of range, and with
    /// `ErrorKind::InvalidData` if the count stream ends without yielding anything.
    fn num_keys(&self, col: u32) -> KeyValueDBPinBoxFuture<'_, io::Result<u64>> {
        let this = self.clone();
        Box::pin(async move {
            let shared = &this.unlocked_inner;
            if col >= shared.columns {
                return Err(io::Error::from(io::ErrorKind::NotFound));
            }

            if let Some(cache) = &shared.in_memory {
                return cache.num_keys(col).await;
            }

            let mut counts = indexed_db::idb_get_key_count(&shared.indexed_db, col, None)?;
            // Only the first item matters; an empty stream is treated as bad data.
            match counts.next().await {
                Some(Ok(value)) => Ok(value as u64),
                Some(Err(e)) => Err(e),
                None => Err(io::Error::from(io::ErrorKind::InvalidData)),
            }
        })
    }
}