1#![deny(clippy::all)]
7#![deny(missing_docs)]
8
9mod error;
10mod indexed_db;
11
12use keyvaluedb::{DBKeyRef, DBKeyValueRef, DBTransaction, DBTransactionError, DBValue};
13use keyvaluedb_memorydb::{self as in_memory, InMemory};
14use send_wrapper::SendWrapper;
15use std::future::Future;
16use std::io;
17use std::pin::Pin;
18use std::sync::Arc;
19
20pub use crate::error::*;
21pub use keyvaluedb::KeyValueDB;
22
23use futures::prelude::*;
24
25use web_sys::IdbDatabase;
26
27struct DatabaseUnlockedInner {
28 table_name: String,
29 version: u32,
30 columns: u32,
31 in_memory: Option<InMemory>,
32 indexed_db: SendWrapper<IdbDatabase>,
33}
34
35impl Drop for DatabaseUnlockedInner {
36 fn drop(&mut self) {
37 self.indexed_db.close();
38 }
39}
40
41#[derive(Clone)]
43pub struct Database {
44 unlocked_inner: Arc<DatabaseUnlockedInner>,
45}
46
47impl Database {
48 pub async fn open(
51 table_name: &str,
52 columns: u32,
53 memory_cached: bool,
54 ) -> Result<Database, error::Error> {
55 let db = indexed_db::open(table_name, None, columns)
57 .await
58 .map_err(io_err_string)?;
59
60 let db = if columns > db.columns {
66 let next_version = db.version + 1;
67 drop(db);
68 indexed_db::open(table_name, Some(next_version), columns)
69 .await
70 .map_err(io_err_string)?
71 } else {
72 db
73 };
74 let indexed_db::IndexedDB { version, inner, .. } = db;
76 let in_memory = if memory_cached {
77 let in_memory = in_memory::create(columns);
78 for column in 0..columns {
80 let mut tx = DBTransaction::new();
81 let mut stream = indexed_db::idb_cursor(&inner, column, None, None)
82 .map_err(error::Error::from)?;
83 while let Some(kv) = stream.next().await {
84 match kv {
85 Ok((key, value)) => {
86 tx.put(column, &key, &value);
87 }
88 Err(e) => {
89 return Err(e.into());
90 }
91 }
92 }
93 in_memory
95 .write(tx)
96 .await
97 .expect("writing in memory always succeeds; qed");
98 }
99 Some(in_memory)
100 } else {
101 None
102 };
103
104 Ok(Database {
105 unlocked_inner: Arc::new(DatabaseUnlockedInner {
106 table_name: table_name.to_owned(),
107 version,
108 columns,
109 in_memory,
110 indexed_db: inner,
111 }),
112 })
113 }
114
115 pub async fn delete(table_name: &str) -> io::Result<()> {
117 indexed_db::delete(table_name).await.map_err(io_err_string)
118 }
119
120 pub fn name(&self) -> String {
122 self.unlocked_inner.table_name.clone()
123 }
124
125 pub fn version(&self) -> u32 {
127 self.unlocked_inner.version
128 }
129}
130
131impl KeyValueDB for Database {
132 fn get<'a>(
133 &'a self,
134 col: u32,
135 key: &'a [u8],
136 ) -> Pin<Box<dyn Future<Output = io::Result<Option<DBValue>>> + Send + 'a>> {
137 let this = self.clone();
138 Box::pin(SendWrapper::new(async move {
139 if col >= this.unlocked_inner.columns {
140 return Err(io::Error::from(io::ErrorKind::NotFound));
141 }
142
143 if let Some(in_memory) = &this.unlocked_inner.in_memory {
144 in_memory.get(col, key).await
145 } else {
146 indexed_db::idb_get(&this.unlocked_inner.indexed_db, col, key).await
147 }
148 }))
149 }
150
151 fn delete<'a>(
152 &'a self,
153 col: u32,
154 key: &'a [u8],
155 ) -> Pin<Box<dyn Future<Output = io::Result<Option<DBValue>>> + Send + 'a>> {
156 let this = self.clone();
157 Box::pin(SendWrapper::new(async move {
158 if col >= this.unlocked_inner.columns {
159 return Err(io::Error::from(io::ErrorKind::NotFound));
160 }
161
162 let someval = indexed_db::idb_get(&this.unlocked_inner.indexed_db, col, key).await?;
163
164 let mut transaction = DBTransaction::new();
165 transaction.delete(col, key);
166
167 match indexed_db::idb_commit_transaction(
168 &this.unlocked_inner.indexed_db,
169 &transaction,
170 this.unlocked_inner.columns,
171 )
172 .await
173 {
174 Ok(()) => {}
175 Err(error) => {
176 return Err(io_err_string(format!("delete failed: {:?}", error)));
177 }
178 };
179
180 if let Some(in_memory) = &this.unlocked_inner.in_memory {
181 in_memory.delete(col, key).await?;
182 }
183
184 Ok(someval)
185 }))
186 }
187
188 fn write(
189 &self,
190 transaction: DBTransaction,
191 ) -> Pin<Box<dyn Future<Output = Result<(), DBTransactionError>> + Send + '_>> {
192 let this = self.clone();
193 Box::pin(SendWrapper::new(async move {
194 {
195 match indexed_db::idb_commit_transaction(
196 &this.unlocked_inner.indexed_db,
197 &transaction,
198 this.unlocked_inner.columns,
199 )
200 .await
201 {
202 Ok(()) => {}
203 Err(error) => {
204 return Err(DBTransactionError { error, transaction });
205 }
206 };
207 }
208 if let Some(in_memory) = &this.unlocked_inner.in_memory {
209 in_memory.write(transaction).await
210 } else {
211 Ok(())
212 }
213 }))
214 }
215
216 fn iter<
217 'a,
218 T: Send + 'static,
219 C: Send + 'static,
220 F: FnMut(&mut C, DBKeyValueRef) -> io::Result<Option<T>> + Send + Sync + 'static,
221 >(
222 &'a self,
223 col: u32,
224 prefix: Option<&'a [u8]>,
225 mut context: C,
226 mut f: F,
227 ) -> Pin<Box<dyn Future<Output = io::Result<(C, Option<T>)>> + Send + 'a>> {
228 let this = self.clone();
229 Box::pin(async move {
230 if col >= this.unlocked_inner.columns {
231 return Err(io::Error::from(io::ErrorKind::NotFound));
232 }
233 if let Some(in_memory) = &this.unlocked_inner.in_memory {
234 in_memory.iter(col, prefix, context, f).await
235 } else {
236 let mut stream = indexed_db::idb_cursor(
237 &this.unlocked_inner.indexed_db,
238 col,
239 None,
240 prefix.map(|p| p.to_vec()),
241 )?;
242 while let Some(kv) = stream.next().await {
243 match kv {
244 Ok((key, value)) => {
245 if let Some(out) = f(&mut context, (&key, &value))? {
246 return Ok((context, Some(out)));
247 }
248 }
249 Err(e) => {
250 return Err(e);
251 }
252 }
253 }
254 Ok((context, None))
255 }
256 })
257 }
258
259 fn iter_keys<
260 'a,
261 T: Send + 'static,
262 C: Send + 'static,
263 F: FnMut(&mut C, DBKeyRef) -> io::Result<Option<T>> + Send + Sync + 'static,
264 >(
265 &'a self,
266 col: u32,
267 prefix: Option<&'a [u8]>,
268 mut context: C,
269 mut f: F,
270 ) -> Pin<Box<dyn Future<Output = io::Result<(C, Option<T>)>> + Send + 'a>> {
271 let this = self.clone();
272 Box::pin(async move {
273 if col >= this.unlocked_inner.columns {
274 return Err(io::Error::from(io::ErrorKind::NotFound));
275 }
276
277 if let Some(in_memory) = &this.unlocked_inner.in_memory {
278 in_memory.iter_keys(col, prefix, context, f).await
279 } else {
280 let mut stream = indexed_db::idb_cursor_keys(
281 &this.unlocked_inner.indexed_db,
282 col,
283 prefix.map(|p| p.to_vec()),
284 )?;
285 while let Some(k) = stream.next().await {
286 match k {
287 Ok(key) => {
288 if let Some(out) = f(&mut context, &key)? {
289 return Ok((context, Some(out)));
290 }
291 }
292 Err(e) => {
293 return Err(e);
294 }
295 }
296 }
297 Ok((context, None))
298 }
299 })
300 }
301
302 fn num_columns(&self) -> Result<u32, io::Error> {
303 Ok(self.unlocked_inner.columns)
304 }
305
306 fn num_keys(&self, col: u32) -> Pin<Box<dyn Future<Output = io::Result<u64>> + Send + '_>> {
307 let this = self.clone();
308 Box::pin(async move {
309 if col >= this.unlocked_inner.columns {
310 return Err(io::Error::from(io::ErrorKind::NotFound));
311 }
312
313 if let Some(in_memory) = &this.unlocked_inner.in_memory {
314 in_memory.num_keys(col).await
315 } else {
316 let mut stream =
317 indexed_db::idb_get_key_count(&this.unlocked_inner.indexed_db, col, None)?;
318 if let Some(v) = stream.next().await {
319 match v {
320 Ok(value) => {
321 return Ok(value as u64);
322 }
323 Err(e) => {
324 return Err(e);
325 }
326 }
327 }
328 Err(io::Error::from(io::ErrorKind::InvalidData))
329 }
330 })
331 }
332}