keyvaluedb_web/
lib.rs

1//! A key-value database for use in browsers
2//!
3//! Writes data both into memory and IndexedDB, optionally reads the whole database in memory
4//! from the IndexedDB on `open`.
5
6#![deny(clippy::all)]
7#![deny(missing_docs)]
8
9mod error;
10mod indexed_db;
11
12use async_lock::Mutex as AsyncMutex;
13use keyvaluedb::{DBKeyRef, DBKeyValueRef, DBTransaction, DBTransactionError, DBValue};
14use keyvaluedb_memorydb::{self as in_memory, InMemory};
15use send_wrapper::SendWrapper;
16use std::future::Future;
17use std::io;
18use std::pin::Pin;
19use std::sync::Arc;
20
21pub use crate::error::*;
22pub use keyvaluedb::KeyValueDB;
23
24use futures::prelude::*;
25
26use web_sys::IdbDatabase;
27
28struct DatabaseInner {
29    indexed_db: SendWrapper<IdbDatabase>,
30}
31
32impl Drop for DatabaseInner {
33    fn drop(&mut self) {
34        self.indexed_db.close();
35    }
36}
37
38struct DatabaseUnlockedInner {
39    table_name: String,
40    version: u32,
41    columns: u32,
42    in_memory: Option<InMemory>,
43}
44
45/// Database backed by both IndexedDB and in memory implementation.
46#[derive(Clone)]
47pub struct Database {
48    unlocked_inner: Arc<DatabaseUnlockedInner>,
49    inner: Arc<AsyncMutex<DatabaseInner>>,
50}
51
52impl Database {
53    /// Opens the database with the given name,
54    /// and the specified number of columns (not including the default one).
55    pub async fn open(
56        table_name: &str,
57        columns: u32,
58        memory_cached: bool,
59    ) -> Result<Database, error::Error> {
60        // let's try to open the latest version of the db first
61        let db = indexed_db::open(table_name, None, columns)
62            .await
63            .map_err(io_err_string)?;
64
65        // If we need more column than the latest version has,
66        // then bump the version (+ 1 for the default column).
67        // In order to bump the version, we close the database
68        // and reopen it with a higher version than it was opened with previously.
69        // cf. https://github.com/paritytech/parity-common/pull/202#discussion_r321221751
70        let db = if columns + 1 > db.columns {
71            let next_version = db.version + 1;
72            drop(db);
73            indexed_db::open(table_name, Some(next_version), columns)
74                .await
75                .map_err(io_err_string)?
76        } else {
77            db
78        };
79        // populate the in_memory db from the IndexedDB
80        let indexed_db::IndexedDB { version, inner, .. } = db;
81        let in_memory = if memory_cached {
82            let in_memory = in_memory::create(columns);
83            // read the columns from the IndexedDB
84            for column in 0..columns {
85                let mut tx = DBTransaction::new();
86                let mut stream = indexed_db::idb_cursor(&inner, column, None, None)
87                    .map_err(error::Error::from)?;
88                while let Some(kv) = stream.next().await {
89                    match kv {
90                        Ok((key, value)) => {
91                            tx.put(column, &key, &value);
92                        }
93                        Err(e) => {
94                            return Err(e.into());
95                        }
96                    }
97                }
98                // write each column into memory
99                in_memory
100                    .write(tx)
101                    .await
102                    .expect("writing in memory always succeeds; qed");
103            }
104            Some(in_memory)
105        } else {
106            None
107        };
108
109        Ok(Database {
110            unlocked_inner: Arc::new(DatabaseUnlockedInner {
111                table_name: table_name.to_owned(),
112                version,
113                columns,
114                in_memory,
115            }),
116            inner: Arc::new(AsyncMutex::new(DatabaseInner { indexed_db: inner })),
117        })
118    }
119
120    /// Deletes the database with the given name,
121    pub async fn delete(table_name: &str) -> io::Result<()> {
122        indexed_db::delete(table_name).await.map_err(io_err_string)
123    }
124
125    /// Get the database name.
126    pub fn name(&self) -> String {
127        self.unlocked_inner.table_name.clone()
128    }
129
130    /// Get the database version.
131    pub fn version(&self) -> u32 {
132        self.unlocked_inner.version
133    }
134}
135
136impl KeyValueDB for Database {
137    fn get<'a>(
138        &self,
139        col: u32,
140        key: &'a [u8],
141    ) -> Pin<Box<dyn Future<Output = io::Result<Option<DBValue>>> + Send + 'a>> {
142        let this = self.clone();
143        Box::pin(SendWrapper::new(async move {
144            if col >= this.unlocked_inner.columns {
145                return Err(io::Error::from(io::ErrorKind::NotFound));
146            }
147
148            if let Some(in_memory) = &this.unlocked_inner.in_memory {
149                in_memory.get(col, key).await
150            } else {
151                let inner = this.inner.lock().await;
152                indexed_db::idb_get(&inner.indexed_db, col, key).await
153            }
154        }))
155    }
156
157    fn delete<'a>(
158        &self,
159        col: u32,
160        key: &'a [u8],
161    ) -> Pin<Box<dyn Future<Output = io::Result<Option<DBValue>>> + Send + 'a>> {
162        let this = self.clone();
163        Box::pin(SendWrapper::new(async move {
164            if col >= this.unlocked_inner.columns {
165                return Err(io::Error::from(io::ErrorKind::NotFound));
166            }
167
168            let inner = this.inner.lock().await;
169
170            let someval = indexed_db::idb_get(&inner.indexed_db, col, key).await?;
171
172            let mut transaction = DBTransaction::new();
173            transaction.delete(col, key);
174
175            match indexed_db::idb_commit_transaction(
176                &inner.indexed_db,
177                &transaction,
178                this.unlocked_inner.columns,
179            )
180            .await
181            {
182                Ok(()) => {}
183                Err(error) => {
184                    return Err(io_err_string(format!("delete failed: {:?}", error)));
185                }
186            };
187
188            if let Some(in_memory) = &this.unlocked_inner.in_memory {
189                in_memory.delete(col, key).await?;
190            }
191
192            Ok(someval)
193        }))
194    }
195
196    fn write(
197        &self,
198        transaction: DBTransaction,
199    ) -> Pin<Box<dyn Future<Output = Result<(), DBTransactionError>> + Send + 'static>> {
200        let this = self.clone();
201        Box::pin(SendWrapper::new(async move {
202            {
203                let inner = this.inner.lock().await;
204                match indexed_db::idb_commit_transaction(
205                    &inner.indexed_db,
206                    &transaction,
207                    this.unlocked_inner.columns,
208                )
209                .await
210                {
211                    Ok(()) => {}
212                    Err(error) => {
213                        return Err(DBTransactionError { error, transaction });
214                    }
215                };
216            }
217            if let Some(in_memory) = &this.unlocked_inner.in_memory {
218                in_memory.write(transaction).await
219            } else {
220                Ok(())
221            }
222        }))
223    }
224
225    fn iter<'a, T: 'a, F: FnMut(DBKeyValueRef) -> io::Result<Option<T>> + Send + Sync + 'a>(
226        &self,
227        col: u32,
228        prefix: Option<&'a [u8]>,
229        mut f: F,
230    ) -> Pin<Box<dyn Future<Output = io::Result<Option<T>>> + Send + 'a>> {
231        let this = self.clone();
232        Box::pin(async move {
233            if col >= this.unlocked_inner.columns {
234                return Err(io::Error::from(io::ErrorKind::NotFound));
235            }
236            if let Some(in_memory) = &this.unlocked_inner.in_memory {
237                in_memory.iter(col, prefix, f).await
238            } else {
239                let inner = this.inner.lock().await;
240                let mut stream = indexed_db::idb_cursor(
241                    &inner.indexed_db,
242                    col,
243                    None,
244                    prefix.map(|p| p.to_vec()),
245                )?;
246                while let Some(kv) = stream.next().await {
247                    match kv {
248                        Ok((key, value)) => {
249                            if let Some(out) = f((&key, &value))? {
250                                return Ok(Some(out));
251                            }
252                        }
253                        Err(e) => {
254                            return Err(e);
255                        }
256                    }
257                }
258                Ok(None)
259            }
260        })
261    }
262
263    fn iter_keys<'a, T: 'a, F: FnMut(DBKeyRef) -> io::Result<Option<T>> + Send + Sync + 'a>(
264        &self,
265        col: u32,
266        prefix: Option<&'a [u8]>,
267        mut f: F,
268    ) -> Pin<Box<dyn Future<Output = io::Result<Option<T>>> + Send + 'a>> {
269        let this = self.clone();
270        Box::pin(async move {
271            if col >= this.unlocked_inner.columns {
272                return Err(io::Error::from(io::ErrorKind::NotFound));
273            }
274
275            if let Some(in_memory) = &this.unlocked_inner.in_memory {
276                in_memory.iter_keys(col, prefix, f).await
277            } else {
278                let inner = this.inner.lock().await;
279                let mut stream = indexed_db::idb_cursor_keys(
280                    &inner.indexed_db,
281                    col,
282                    prefix.map(|p| p.to_vec()),
283                )?;
284                while let Some(k) = stream.next().await {
285                    match k {
286                        Ok(key) => {
287                            if let Some(out) = f(&key)? {
288                                return Ok(Some(out));
289                            }
290                        }
291                        Err(e) => {
292                            return Err(e);
293                        }
294                    }
295                }
296                Ok(None)
297            }
298        })
299    }
300
301    fn num_columns(&self) -> Result<u32, io::Error> {
302        Ok(self.unlocked_inner.columns)
303    }
304
305    fn num_keys(&self, col: u32) -> Pin<Box<dyn Future<Output = io::Result<u64>> + Send>> {
306        let this = self.clone();
307        Box::pin(async move {
308            if col >= this.unlocked_inner.columns {
309                return Err(io::Error::from(io::ErrorKind::NotFound));
310            }
311
312            if let Some(in_memory) = &this.unlocked_inner.in_memory {
313                in_memory.num_keys(col).await
314            } else {
315                let inner = this.inner.lock().await;
316                let mut stream = indexed_db::idb_get_key_count(&inner.indexed_db, col, None)?;
317                if let Some(v) = stream.next().await {
318                    match v {
319                        Ok(value) => {
320                            return Ok(value as u64);
321                        }
322                        Err(e) => {
323                            return Err(e);
324                        }
325                    }
326                }
327                Err(io::Error::from(io::ErrorKind::InvalidData))
328            }
329        })
330    }
331
332    // NOTE: not supported
333    fn restore(&self, _new_db: &str) -> std::io::Result<()> {
334        Err(io_err_string("Not supported yet"))
335    }
336}