Skip to main content

keyvaluedb_web/
lib.rs

//! A key-value database for use in browsers.
//!
//! Writes data to IndexedDB and, when memory caching is enabled, also to an in-memory
//! copy that is populated from the IndexedDB on `open`.
5
6#![deny(clippy::all)]
7#![deny(missing_docs)]
8
9mod error;
10mod indexed_db;
11
12use keyvaluedb::{
13    DBKeyRef, DBKeyValueRef, DBTransaction, DBTransactionError, DBValue, KeyValueDBPinBoxFuture,
14};
15use keyvaluedb_memorydb::{self as in_memory, InMemory};
16use send_wrapper::SendWrapper;
17use std::io;
18use std::sync::Arc;
19
20pub use crate::error::*;
21pub use keyvaluedb::KeyValueDB;
22
23use futures::prelude::*;
24
25use web_sys::IdbDatabase;
26
27struct DatabaseUnlockedInner {
28    table_name: String,
29    version: u32,
30    columns: u32,
31    in_memory: Option<InMemory>,
32    indexed_db: SendWrapper<IdbDatabase>,
33}
34
35impl Drop for DatabaseUnlockedInner {
36    fn drop(&mut self) {
37        self.indexed_db.close();
38    }
39}
40
41/// Database backed by both IndexedDB and in memory implementation.
42#[derive(Clone)]
43pub struct Database {
44    unlocked_inner: Arc<DatabaseUnlockedInner>,
45}
46
47impl Database {
48    /// Opens the database with the given name,
49    /// and the specified number of columns (not including the default one).
50    pub async fn open(
51        table_name: &str,
52        columns: u32,
53        memory_cached: bool,
54    ) -> Result<Database, error::Error> {
55        // let's try to open the latest version of the db first
56        let db = indexed_db::open(table_name, None, columns)
57            .await
58            .map_err(io_err_string)?;
59
60        // If we need more column than the latest version has,
61        // then bump the version.
62        // In order to bump the version, we close the database
63        // and reopen it with a higher version than it was opened with previously.
64        // cf. https://github.com/paritytech/parity-common/pull/202#discussion_r321221751
65        let db = if columns > db.columns {
66            let next_version = db.version + 1;
67            drop(db);
68            indexed_db::open(table_name, Some(next_version), columns)
69                .await
70                .map_err(io_err_string)?
71        } else {
72            db
73        };
74        // populate the in_memory db from the IndexedDB
75        let indexed_db::IndexedDB { version, inner, .. } = db;
76        let in_memory = if memory_cached {
77            let in_memory = in_memory::create(columns);
78            // read the columns from the IndexedDB
79            for column in 0..columns {
80                let mut tx = DBTransaction::new();
81                let mut stream = indexed_db::idb_cursor(&inner, column, None, None)
82                    .map_err(error::Error::from)?;
83                while let Some(kv) = stream.next().await {
84                    match kv {
85                        Ok((key, value)) => {
86                            tx.put(column, &key, &value);
87                        }
88                        Err(e) => {
89                            return Err(e.into());
90                        }
91                    }
92                }
93                // write each column into memory
94                in_memory
95                    .write(tx)
96                    .await
97                    .expect("writing in memory always succeeds; qed");
98            }
99            Some(in_memory)
100        } else {
101            None
102        };
103
104        Ok(Database {
105            unlocked_inner: Arc::new(DatabaseUnlockedInner {
106                table_name: table_name.to_owned(),
107                version,
108                columns,
109                in_memory,
110                indexed_db: inner,
111            }),
112        })
113    }
114
115    /// Deletes the database with the given name,
116    pub async fn delete(table_name: &str) -> io::Result<()> {
117        indexed_db::delete(table_name).await.map_err(io_err_string)
118    }
119
120    /// Enumerate every IndexedDB database visible to the current origin,
121    /// optionally filtered to names starting with `opt_prefix`. Returns
122    /// `(name, version)` tuples. Backed by the `indexedDB.databases()` API.
123    pub fn list(
124        opt_prefix: Option<&str>,
125    ) -> KeyValueDBPinBoxFuture<'_, io::Result<Vec<(String, u32)>>> {
126        let opt_prefix = opt_prefix.map(|p| p.to_owned());
127        Box::pin(SendWrapper::new(async move {
128            let names = indexed_db::names_with_versions()
129                .await
130                .map_err(io_err_string)?;
131            let Some(prefix) = opt_prefix else {
132                return Ok(names);
133            };
134            Ok(names
135                .into_iter()
136                .filter(|(name, _ver)| name.starts_with(&prefix))
137                .collect())
138        }))
139    }
140
141    /// Get the database name.
142    pub fn name(&self) -> String {
143        self.unlocked_inner.table_name.clone()
144    }
145
146    /// Get the database version.
147    pub fn version(&self) -> u32 {
148        self.unlocked_inner.version
149    }
150}
151
152impl KeyValueDB for Database {
153    fn get<'a>(
154        &'a self,
155        col: u32,
156        key: &'a [u8],
157    ) -> KeyValueDBPinBoxFuture<'a, io::Result<Option<DBValue>>> {
158        let this = self.clone();
159        Box::pin(SendWrapper::new(async move {
160            if col >= this.unlocked_inner.columns {
161                return Err(io::Error::from(io::ErrorKind::NotFound));
162            }
163
164            if let Some(in_memory) = &this.unlocked_inner.in_memory {
165                in_memory.get(col, key).await
166            } else {
167                indexed_db::idb_get(&this.unlocked_inner.indexed_db, col, key).await
168            }
169        }))
170    }
171
172    fn delete<'a>(
173        &'a self,
174        col: u32,
175        key: &'a [u8],
176    ) -> KeyValueDBPinBoxFuture<'a, io::Result<Option<DBValue>>> {
177        let this = self.clone();
178        Box::pin(SendWrapper::new(async move {
179            if col >= this.unlocked_inner.columns {
180                return Err(io::Error::from(io::ErrorKind::NotFound));
181            }
182
183            let someval = indexed_db::idb_get(&this.unlocked_inner.indexed_db, col, key).await?;
184
185            let mut transaction = DBTransaction::new();
186            transaction.delete(col, key);
187
188            match indexed_db::idb_commit_transaction(
189                &this.unlocked_inner.indexed_db,
190                &transaction,
191                this.unlocked_inner.columns,
192            )
193            .await
194            {
195                Ok(()) => {}
196                Err(error) => {
197                    return Err(io_err_string(format!("delete failed: {:?}", error)));
198                }
199            };
200
201            if let Some(in_memory) = &this.unlocked_inner.in_memory {
202                in_memory.delete(col, key).await?;
203            }
204
205            Ok(someval)
206        }))
207    }
208
209    fn write(
210        &self,
211        transaction: DBTransaction,
212    ) -> KeyValueDBPinBoxFuture<'_, Result<(), DBTransactionError>> {
213        let this = self.clone();
214        Box::pin(SendWrapper::new(async move {
215            {
216                match indexed_db::idb_commit_transaction(
217                    &this.unlocked_inner.indexed_db,
218                    &transaction,
219                    this.unlocked_inner.columns,
220                )
221                .await
222                {
223                    Ok(()) => {}
224                    Err(error) => {
225                        return Err(DBTransactionError { error, transaction });
226                    }
227                };
228            }
229            if let Some(in_memory) = &this.unlocked_inner.in_memory {
230                in_memory.write(transaction).await
231            } else {
232                Ok(())
233            }
234        }))
235    }
236
237    fn iter<
238        'a,
239        T: Send + 'static,
240        C: Send + 'static,
241        F: FnMut(&mut C, DBKeyValueRef) -> io::Result<Option<T>> + Send + Sync + 'static,
242    >(
243        &'a self,
244        col: u32,
245        prefix: Option<&'a [u8]>,
246        mut context: C,
247        mut f: F,
248    ) -> KeyValueDBPinBoxFuture<'a, io::Result<(C, Option<T>)>> {
249        let this = self.clone();
250        Box::pin(async move {
251            if col >= this.unlocked_inner.columns {
252                return Err(io::Error::from(io::ErrorKind::NotFound));
253            }
254            if let Some(in_memory) = &this.unlocked_inner.in_memory {
255                in_memory.iter(col, prefix, context, f).await
256            } else {
257                let mut stream = indexed_db::idb_cursor(
258                    &this.unlocked_inner.indexed_db,
259                    col,
260                    None,
261                    prefix.map(|p| p.to_vec()),
262                )?;
263                while let Some(kv) = stream.next().await {
264                    match kv {
265                        Ok((key, value)) => {
266                            if let Some(out) = f(&mut context, (&key, &value))? {
267                                return Ok((context, Some(out)));
268                            }
269                        }
270                        Err(e) => {
271                            return Err(e);
272                        }
273                    }
274                }
275                Ok((context, None))
276            }
277        })
278    }
279
280    fn iter_keys<
281        'a,
282        T: Send + 'static,
283        C: Send + 'static,
284        F: FnMut(&mut C, DBKeyRef) -> io::Result<Option<T>> + Send + Sync + 'static,
285    >(
286        &'a self,
287        col: u32,
288        prefix: Option<&'a [u8]>,
289        mut context: C,
290        mut f: F,
291    ) -> KeyValueDBPinBoxFuture<'a, io::Result<(C, Option<T>)>> {
292        let this = self.clone();
293        Box::pin(async move {
294            if col >= this.unlocked_inner.columns {
295                return Err(io::Error::from(io::ErrorKind::NotFound));
296            }
297
298            if let Some(in_memory) = &this.unlocked_inner.in_memory {
299                in_memory.iter_keys(col, prefix, context, f).await
300            } else {
301                let mut stream = indexed_db::idb_cursor_keys(
302                    &this.unlocked_inner.indexed_db,
303                    col,
304                    prefix.map(|p| p.to_vec()),
305                )?;
306                while let Some(k) = stream.next().await {
307                    match k {
308                        Ok(key) => {
309                            if let Some(out) = f(&mut context, &key)? {
310                                return Ok((context, Some(out)));
311                            }
312                        }
313                        Err(e) => {
314                            return Err(e);
315                        }
316                    }
317                }
318                Ok((context, None))
319            }
320        })
321    }
322
323    fn num_columns(&self) -> Result<u32, io::Error> {
324        Ok(self.unlocked_inner.columns)
325    }
326
327    fn num_keys(&self, col: u32) -> KeyValueDBPinBoxFuture<'_, io::Result<u64>> {
328        let this = self.clone();
329        Box::pin(async move {
330            if col >= this.unlocked_inner.columns {
331                return Err(io::Error::from(io::ErrorKind::NotFound));
332            }
333
334            if let Some(in_memory) = &this.unlocked_inner.in_memory {
335                in_memory.num_keys(col).await
336            } else {
337                let mut stream =
338                    indexed_db::idb_get_key_count(&this.unlocked_inner.indexed_db, col, None)?;
339                if let Some(v) = stream.next().await {
340                    match v {
341                        Ok(value) => {
342                            return Ok(value as u64);
343                        }
344                        Err(e) => {
345                            return Err(e);
346                        }
347                    }
348                }
349                Err(io::Error::from(io::ErrorKind::InvalidData))
350            }
351        })
352    }
353}