keyvaluedb_web/
lib.rs

1//! A key-value database for use in browsers
2//!
3//! Writes data both into memory and IndexedDB, optionally reads the whole database in memory
4//! from the IndexedDB on `open`.
5
6#![deny(clippy::all)]
7#![deny(missing_docs)]
8
9mod error;
10mod indexed_db;
11
12use keyvaluedb::{DBKeyRef, DBKeyValueRef, DBTransaction, DBTransactionError, DBValue};
13use keyvaluedb_memorydb::{self as in_memory, InMemory};
14use send_wrapper::SendWrapper;
15use std::future::Future;
16use std::io;
17use std::pin::Pin;
18use std::sync::Arc;
19
20pub use crate::error::*;
21pub use keyvaluedb::KeyValueDB;
22
23use futures::prelude::*;
24
25use web_sys::IdbDatabase;
26
27struct DatabaseUnlockedInner {
28    table_name: String,
29    version: u32,
30    columns: u32,
31    in_memory: Option<InMemory>,
32    indexed_db: SendWrapper<IdbDatabase>,
33}
34
35impl Drop for DatabaseUnlockedInner {
36    fn drop(&mut self) {
37        self.indexed_db.close();
38    }
39}
40
41/// Database backed by both IndexedDB and in memory implementation.
42#[derive(Clone)]
43pub struct Database {
44    unlocked_inner: Arc<DatabaseUnlockedInner>,
45}
46
47impl Database {
48    /// Opens the database with the given name,
49    /// and the specified number of columns (not including the default one).
50    pub async fn open(
51        table_name: &str,
52        columns: u32,
53        memory_cached: bool,
54    ) -> Result<Database, error::Error> {
55        // let's try to open the latest version of the db first
56        let db = indexed_db::open(table_name, None, columns)
57            .await
58            .map_err(io_err_string)?;
59
60        // If we need more column than the latest version has,
61        // then bump the version.
62        // In order to bump the version, we close the database
63        // and reopen it with a higher version than it was opened with previously.
64        // cf. https://github.com/paritytech/parity-common/pull/202#discussion_r321221751
65        let db = if columns > db.columns {
66            let next_version = db.version + 1;
67            drop(db);
68            indexed_db::open(table_name, Some(next_version), columns)
69                .await
70                .map_err(io_err_string)?
71        } else {
72            db
73        };
74        // populate the in_memory db from the IndexedDB
75        let indexed_db::IndexedDB { version, inner, .. } = db;
76        let in_memory = if memory_cached {
77            let in_memory = in_memory::create(columns);
78            // read the columns from the IndexedDB
79            for column in 0..columns {
80                let mut tx = DBTransaction::new();
81                let mut stream = indexed_db::idb_cursor(&inner, column, None, None)
82                    .map_err(error::Error::from)?;
83                while let Some(kv) = stream.next().await {
84                    match kv {
85                        Ok((key, value)) => {
86                            tx.put(column, &key, &value);
87                        }
88                        Err(e) => {
89                            return Err(e.into());
90                        }
91                    }
92                }
93                // write each column into memory
94                in_memory
95                    .write(tx)
96                    .await
97                    .expect("writing in memory always succeeds; qed");
98            }
99            Some(in_memory)
100        } else {
101            None
102        };
103
104        Ok(Database {
105            unlocked_inner: Arc::new(DatabaseUnlockedInner {
106                table_name: table_name.to_owned(),
107                version,
108                columns,
109                in_memory,
110                indexed_db: inner,
111            }),
112        })
113    }
114
115    /// Deletes the database with the given name,
116    pub async fn delete(table_name: &str) -> io::Result<()> {
117        indexed_db::delete(table_name).await.map_err(io_err_string)
118    }
119
120    /// Get the database name.
121    pub fn name(&self) -> String {
122        self.unlocked_inner.table_name.clone()
123    }
124
125    /// Get the database version.
126    pub fn version(&self) -> u32 {
127        self.unlocked_inner.version
128    }
129}
130
131impl KeyValueDB for Database {
132    fn get<'a>(
133        &'a self,
134        col: u32,
135        key: &'a [u8],
136    ) -> Pin<Box<dyn Future<Output = io::Result<Option<DBValue>>> + Send + 'a>> {
137        let this = self.clone();
138        Box::pin(SendWrapper::new(async move {
139            if col >= this.unlocked_inner.columns {
140                return Err(io::Error::from(io::ErrorKind::NotFound));
141            }
142
143            if let Some(in_memory) = &this.unlocked_inner.in_memory {
144                in_memory.get(col, key).await
145            } else {
146                indexed_db::idb_get(&this.unlocked_inner.indexed_db, col, key).await
147            }
148        }))
149    }
150
151    fn delete<'a>(
152        &'a self,
153        col: u32,
154        key: &'a [u8],
155    ) -> Pin<Box<dyn Future<Output = io::Result<Option<DBValue>>> + Send + 'a>> {
156        let this = self.clone();
157        Box::pin(SendWrapper::new(async move {
158            if col >= this.unlocked_inner.columns {
159                return Err(io::Error::from(io::ErrorKind::NotFound));
160            }
161
162            let someval = indexed_db::idb_get(&this.unlocked_inner.indexed_db, col, key).await?;
163
164            let mut transaction = DBTransaction::new();
165            transaction.delete(col, key);
166
167            match indexed_db::idb_commit_transaction(
168                &this.unlocked_inner.indexed_db,
169                &transaction,
170                this.unlocked_inner.columns,
171            )
172            .await
173            {
174                Ok(()) => {}
175                Err(error) => {
176                    return Err(io_err_string(format!("delete failed: {:?}", error)));
177                }
178            };
179
180            if let Some(in_memory) = &this.unlocked_inner.in_memory {
181                in_memory.delete(col, key).await?;
182            }
183
184            Ok(someval)
185        }))
186    }
187
188    fn write(
189        &self,
190        transaction: DBTransaction,
191    ) -> Pin<Box<dyn Future<Output = Result<(), DBTransactionError>> + Send + '_>> {
192        let this = self.clone();
193        Box::pin(SendWrapper::new(async move {
194            {
195                match indexed_db::idb_commit_transaction(
196                    &this.unlocked_inner.indexed_db,
197                    &transaction,
198                    this.unlocked_inner.columns,
199                )
200                .await
201                {
202                    Ok(()) => {}
203                    Err(error) => {
204                        return Err(DBTransactionError { error, transaction });
205                    }
206                };
207            }
208            if let Some(in_memory) = &this.unlocked_inner.in_memory {
209                in_memory.write(transaction).await
210            } else {
211                Ok(())
212            }
213        }))
214    }
215
216    fn iter<
217        'a,
218        T: Send + 'static,
219        C: Send + 'static,
220        F: FnMut(&mut C, DBKeyValueRef) -> io::Result<Option<T>> + Send + Sync + 'static,
221    >(
222        &'a self,
223        col: u32,
224        prefix: Option<&'a [u8]>,
225        mut context: C,
226        mut f: F,
227    ) -> Pin<Box<dyn Future<Output = io::Result<(C, Option<T>)>> + Send + 'a>> {
228        let this = self.clone();
229        Box::pin(async move {
230            if col >= this.unlocked_inner.columns {
231                return Err(io::Error::from(io::ErrorKind::NotFound));
232            }
233            if let Some(in_memory) = &this.unlocked_inner.in_memory {
234                in_memory.iter(col, prefix, context, f).await
235            } else {
236                let mut stream = indexed_db::idb_cursor(
237                    &this.unlocked_inner.indexed_db,
238                    col,
239                    None,
240                    prefix.map(|p| p.to_vec()),
241                )?;
242                while let Some(kv) = stream.next().await {
243                    match kv {
244                        Ok((key, value)) => {
245                            if let Some(out) = f(&mut context, (&key, &value))? {
246                                return Ok((context, Some(out)));
247                            }
248                        }
249                        Err(e) => {
250                            return Err(e);
251                        }
252                    }
253                }
254                Ok((context, None))
255            }
256        })
257    }
258
259    fn iter_keys<
260        'a,
261        T: Send + 'static,
262        C: Send + 'static,
263        F: FnMut(&mut C, DBKeyRef) -> io::Result<Option<T>> + Send + Sync + 'static,
264    >(
265        &'a self,
266        col: u32,
267        prefix: Option<&'a [u8]>,
268        mut context: C,
269        mut f: F,
270    ) -> Pin<Box<dyn Future<Output = io::Result<(C, Option<T>)>> + Send + 'a>> {
271        let this = self.clone();
272        Box::pin(async move {
273            if col >= this.unlocked_inner.columns {
274                return Err(io::Error::from(io::ErrorKind::NotFound));
275            }
276
277            if let Some(in_memory) = &this.unlocked_inner.in_memory {
278                in_memory.iter_keys(col, prefix, context, f).await
279            } else {
280                let mut stream = indexed_db::idb_cursor_keys(
281                    &this.unlocked_inner.indexed_db,
282                    col,
283                    prefix.map(|p| p.to_vec()),
284                )?;
285                while let Some(k) = stream.next().await {
286                    match k {
287                        Ok(key) => {
288                            if let Some(out) = f(&mut context, &key)? {
289                                return Ok((context, Some(out)));
290                            }
291                        }
292                        Err(e) => {
293                            return Err(e);
294                        }
295                    }
296                }
297                Ok((context, None))
298            }
299        })
300    }
301
302    fn num_columns(&self) -> Result<u32, io::Error> {
303        Ok(self.unlocked_inner.columns)
304    }
305
306    fn num_keys(&self, col: u32) -> Pin<Box<dyn Future<Output = io::Result<u64>> + Send + '_>> {
307        let this = self.clone();
308        Box::pin(async move {
309            if col >= this.unlocked_inner.columns {
310                return Err(io::Error::from(io::ErrorKind::NotFound));
311            }
312
313            if let Some(in_memory) = &this.unlocked_inner.in_memory {
314                in_memory.num_keys(col).await
315            } else {
316                let mut stream =
317                    indexed_db::idb_get_key_count(&this.unlocked_inner.indexed_db, col, None)?;
318                if let Some(v) = stream.next().await {
319                    match v {
320                        Ok(value) => {
321                            return Ok(value as u64);
322                        }
323                        Err(e) => {
324                            return Err(e);
325                        }
326                    }
327                }
328                Err(io::Error::from(io::ErrorKind::InvalidData))
329            }
330        })
331    }
332}