1#![deny(clippy::all)]
7#![deny(missing_docs)]
8
9mod error;
10mod indexed_db;
11
12use async_lock::Mutex as AsyncMutex;
13use keyvaluedb::{DBKeyRef, DBKeyValueRef, DBTransaction, DBTransactionError, DBValue};
14use keyvaluedb_memorydb::{self as in_memory, InMemory};
15use send_wrapper::SendWrapper;
16use std::future::Future;
17use std::io;
18use std::pin::Pin;
19use std::sync::Arc;
20
21pub use crate::error::*;
22pub use keyvaluedb::KeyValueDB;
23
24use futures::prelude::*;
25
26use web_sys::IdbDatabase;
27
/// Lock-protected part of a `Database`: the open IndexedDB connection.
///
/// `Database` keeps this behind an `Arc<AsyncMutex<_>>`, and the `Drop`
/// implementation closes the connection.
struct DatabaseInner {
    /// IndexedDB handle, wrapped in `SendWrapper` so the containing type can be
    /// used where `Send` is required.
    indexed_db: SendWrapper<IdbDatabase>,
}
31
32impl Drop for DatabaseInner {
33 fn drop(&mut self) {
34 self.indexed_db.close();
35 }
36}
37
/// Part of a `Database` that is read without taking the connection lock.
///
/// `Database` holds it in a plain `Arc`; nothing in this file mutates the
/// fields after construction.
struct DatabaseUnlockedInner {
    /// Name the database was opened under.
    table_name: String,
    /// Version taken from the IndexedDB handle returned by the final open call.
    version: u32,
    /// Column count requested at open time; used to bounds-check column indices.
    columns: u32,
    /// In-memory copy of every column, present only when the database was
    /// opened with `memory_cached` set.
    in_memory: Option<InMemory>,
}
44
45#[derive(Clone)]
47pub struct Database {
48 unlocked_inner: Arc<DatabaseUnlockedInner>,
49 inner: Arc<AsyncMutex<DatabaseInner>>,
50}
51
52impl Database {
53 pub async fn open(
56 table_name: &str,
57 columns: u32,
58 memory_cached: bool,
59 ) -> Result<Database, error::Error> {
60 let db = indexed_db::open(table_name, None, columns)
62 .await
63 .map_err(io_err_string)?;
64
65 let db = if columns + 1 > db.columns {
71 let next_version = db.version + 1;
72 drop(db);
73 indexed_db::open(table_name, Some(next_version), columns)
74 .await
75 .map_err(io_err_string)?
76 } else {
77 db
78 };
79 let indexed_db::IndexedDB { version, inner, .. } = db;
81 let in_memory = if memory_cached {
82 let in_memory = in_memory::create(columns);
83 for column in 0..columns {
85 let mut tx = DBTransaction::new();
86 let mut stream = indexed_db::idb_cursor(&inner, column, None, None)
87 .map_err(error::Error::from)?;
88 while let Some(kv) = stream.next().await {
89 match kv {
90 Ok((key, value)) => {
91 tx.put(column, &key, &value);
92 }
93 Err(e) => {
94 return Err(e.into());
95 }
96 }
97 }
98 in_memory
100 .write(tx)
101 .await
102 .expect("writing in memory always succeeds; qed");
103 }
104 Some(in_memory)
105 } else {
106 None
107 };
108
109 Ok(Database {
110 unlocked_inner: Arc::new(DatabaseUnlockedInner {
111 table_name: table_name.to_owned(),
112 version,
113 columns,
114 in_memory,
115 }),
116 inner: Arc::new(AsyncMutex::new(DatabaseInner { indexed_db: inner })),
117 })
118 }
119
120 pub async fn delete(table_name: &str) -> io::Result<()> {
122 indexed_db::delete(table_name).await.map_err(io_err_string)
123 }
124
125 pub fn name(&self) -> String {
127 self.unlocked_inner.table_name.clone()
128 }
129
130 pub fn version(&self) -> u32 {
132 self.unlocked_inner.version
133 }
134}
135
136impl KeyValueDB for Database {
137 fn get<'a>(
138 &self,
139 col: u32,
140 key: &'a [u8],
141 ) -> Pin<Box<dyn Future<Output = io::Result<Option<DBValue>>> + Send + 'a>> {
142 let this = self.clone();
143 Box::pin(SendWrapper::new(async move {
144 if col >= this.unlocked_inner.columns {
145 return Err(io::Error::from(io::ErrorKind::NotFound));
146 }
147
148 if let Some(in_memory) = &this.unlocked_inner.in_memory {
149 in_memory.get(col, key).await
150 } else {
151 let inner = this.inner.lock().await;
152 indexed_db::idb_get(&inner.indexed_db, col, key).await
153 }
154 }))
155 }
156
157 fn delete<'a>(
158 &self,
159 col: u32,
160 key: &'a [u8],
161 ) -> Pin<Box<dyn Future<Output = io::Result<Option<DBValue>>> + Send + 'a>> {
162 let this = self.clone();
163 Box::pin(SendWrapper::new(async move {
164 if col >= this.unlocked_inner.columns {
165 return Err(io::Error::from(io::ErrorKind::NotFound));
166 }
167
168 let inner = this.inner.lock().await;
169
170 let someval = indexed_db::idb_get(&inner.indexed_db, col, key).await?;
171
172 let mut transaction = DBTransaction::new();
173 transaction.delete(col, key);
174
175 match indexed_db::idb_commit_transaction(
176 &inner.indexed_db,
177 &transaction,
178 this.unlocked_inner.columns,
179 )
180 .await
181 {
182 Ok(()) => {}
183 Err(error) => {
184 return Err(io_err_string(format!("delete failed: {:?}", error)));
185 }
186 };
187
188 if let Some(in_memory) = &this.unlocked_inner.in_memory {
189 in_memory.delete(col, key).await?;
190 }
191
192 Ok(someval)
193 }))
194 }
195
196 fn write(
197 &self,
198 transaction: DBTransaction,
199 ) -> Pin<Box<dyn Future<Output = Result<(), DBTransactionError>> + Send + 'static>> {
200 let this = self.clone();
201 Box::pin(SendWrapper::new(async move {
202 {
203 let inner = this.inner.lock().await;
204 match indexed_db::idb_commit_transaction(
205 &inner.indexed_db,
206 &transaction,
207 this.unlocked_inner.columns,
208 )
209 .await
210 {
211 Ok(()) => {}
212 Err(error) => {
213 return Err(DBTransactionError { error, transaction });
214 }
215 };
216 }
217 if let Some(in_memory) = &this.unlocked_inner.in_memory {
218 in_memory.write(transaction).await
219 } else {
220 Ok(())
221 }
222 }))
223 }
224
225 fn iter<'a, T: 'a, F: FnMut(DBKeyValueRef) -> io::Result<Option<T>> + Send + Sync + 'a>(
226 &self,
227 col: u32,
228 prefix: Option<&'a [u8]>,
229 mut f: F,
230 ) -> Pin<Box<dyn Future<Output = io::Result<Option<T>>> + Send + 'a>> {
231 let this = self.clone();
232 Box::pin(async move {
233 if col >= this.unlocked_inner.columns {
234 return Err(io::Error::from(io::ErrorKind::NotFound));
235 }
236 if let Some(in_memory) = &this.unlocked_inner.in_memory {
237 in_memory.iter(col, prefix, f).await
238 } else {
239 let inner = this.inner.lock().await;
240 let mut stream = indexed_db::idb_cursor(
241 &inner.indexed_db,
242 col,
243 None,
244 prefix.map(|p| p.to_vec()),
245 )?;
246 while let Some(kv) = stream.next().await {
247 match kv {
248 Ok((key, value)) => {
249 if let Some(out) = f((&key, &value))? {
250 return Ok(Some(out));
251 }
252 }
253 Err(e) => {
254 return Err(e);
255 }
256 }
257 }
258 Ok(None)
259 }
260 })
261 }
262
263 fn iter_keys<'a, T: 'a, F: FnMut(DBKeyRef) -> io::Result<Option<T>> + Send + Sync + 'a>(
264 &self,
265 col: u32,
266 prefix: Option<&'a [u8]>,
267 mut f: F,
268 ) -> Pin<Box<dyn Future<Output = io::Result<Option<T>>> + Send + 'a>> {
269 let this = self.clone();
270 Box::pin(async move {
271 if col >= this.unlocked_inner.columns {
272 return Err(io::Error::from(io::ErrorKind::NotFound));
273 }
274
275 if let Some(in_memory) = &this.unlocked_inner.in_memory {
276 in_memory.iter_keys(col, prefix, f).await
277 } else {
278 let inner = this.inner.lock().await;
279 let mut stream = indexed_db::idb_cursor_keys(
280 &inner.indexed_db,
281 col,
282 prefix.map(|p| p.to_vec()),
283 )?;
284 while let Some(k) = stream.next().await {
285 match k {
286 Ok(key) => {
287 if let Some(out) = f(&key)? {
288 return Ok(Some(out));
289 }
290 }
291 Err(e) => {
292 return Err(e);
293 }
294 }
295 }
296 Ok(None)
297 }
298 })
299 }
300
301 fn num_columns(&self) -> Result<u32, io::Error> {
302 Ok(self.unlocked_inner.columns)
303 }
304
305 fn num_keys(&self, col: u32) -> Pin<Box<dyn Future<Output = io::Result<u64>> + Send>> {
306 let this = self.clone();
307 Box::pin(async move {
308 if col >= this.unlocked_inner.columns {
309 return Err(io::Error::from(io::ErrorKind::NotFound));
310 }
311
312 if let Some(in_memory) = &this.unlocked_inner.in_memory {
313 in_memory.num_keys(col).await
314 } else {
315 let inner = this.inner.lock().await;
316 let mut stream = indexed_db::idb_get_key_count(&inner.indexed_db, col, None)?;
317 if let Some(v) = stream.next().await {
318 match v {
319 Ok(value) => {
320 return Ok(value as u64);
321 }
322 Err(e) => {
323 return Err(e);
324 }
325 }
326 }
327 Err(io::Error::from(io::ErrorKind::InvalidData))
328 }
329 })
330 }
331
332 fn restore(&self, _new_db: &str) -> std::io::Result<()> {
334 Err(io_err_string("Not supported yet"))
335 }
336}