1#![deny(clippy::all)]
7#![deny(missing_docs)]
8
9mod error;
10mod indexed_db;
11
12use keyvaluedb::{
13 DBKeyRef, DBKeyValueRef, DBTransaction, DBTransactionError, DBValue, KeyValueDBPinBoxFuture,
14};
15use keyvaluedb_memorydb::{self as in_memory, InMemory};
16use send_wrapper::SendWrapper;
17use std::io;
18use std::sync::Arc;
19
20pub use crate::error::*;
21pub use keyvaluedb::KeyValueDB;
22
23use futures::prelude::*;
24
25use web_sys::IdbDatabase;
26
27struct DatabaseUnlockedInner {
28 table_name: String,
29 version: u32,
30 columns: u32,
31 in_memory: Option<InMemory>,
32 indexed_db: SendWrapper<IdbDatabase>,
33}
34
35impl Drop for DatabaseUnlockedInner {
36 fn drop(&mut self) {
37 self.indexed_db.close();
38 }
39}
40
41#[derive(Clone)]
43pub struct Database {
44 unlocked_inner: Arc<DatabaseUnlockedInner>,
45}
46
47impl Database {
48 pub async fn open(
51 table_name: &str,
52 columns: u32,
53 memory_cached: bool,
54 ) -> Result<Database, error::Error> {
55 let db = indexed_db::open(table_name, None, columns)
57 .await
58 .map_err(io_err_string)?;
59
60 let db = if columns > db.columns {
66 let next_version = db.version + 1;
67 drop(db);
68 indexed_db::open(table_name, Some(next_version), columns)
69 .await
70 .map_err(io_err_string)?
71 } else {
72 db
73 };
74 let indexed_db::IndexedDB { version, inner, .. } = db;
76 let in_memory = if memory_cached {
77 let in_memory = in_memory::create(columns);
78 for column in 0..columns {
80 let mut tx = DBTransaction::new();
81 let mut stream = indexed_db::idb_cursor(&inner, column, None, None)
82 .map_err(error::Error::from)?;
83 while let Some(kv) = stream.next().await {
84 match kv {
85 Ok((key, value)) => {
86 tx.put(column, &key, &value);
87 }
88 Err(e) => {
89 return Err(e.into());
90 }
91 }
92 }
93 in_memory
95 .write(tx)
96 .await
97 .expect("writing in memory always succeeds; qed");
98 }
99 Some(in_memory)
100 } else {
101 None
102 };
103
104 Ok(Database {
105 unlocked_inner: Arc::new(DatabaseUnlockedInner {
106 table_name: table_name.to_owned(),
107 version,
108 columns,
109 in_memory,
110 indexed_db: inner,
111 }),
112 })
113 }
114
115 pub async fn delete(table_name: &str) -> io::Result<()> {
117 indexed_db::delete(table_name).await.map_err(io_err_string)
118 }
119
120 pub fn list(
124 opt_prefix: Option<&str>,
125 ) -> KeyValueDBPinBoxFuture<'_, io::Result<Vec<(String, u32)>>> {
126 let opt_prefix = opt_prefix.map(|p| p.to_owned());
127 Box::pin(SendWrapper::new(async move {
128 let names = indexed_db::names_with_versions()
129 .await
130 .map_err(io_err_string)?;
131 let Some(prefix) = opt_prefix else {
132 return Ok(names);
133 };
134 Ok(names
135 .into_iter()
136 .filter(|(name, _ver)| name.starts_with(&prefix))
137 .collect())
138 }))
139 }
140
141 pub fn name(&self) -> String {
143 self.unlocked_inner.table_name.clone()
144 }
145
146 pub fn version(&self) -> u32 {
148 self.unlocked_inner.version
149 }
150}
151
152impl KeyValueDB for Database {
153 fn get<'a>(
154 &'a self,
155 col: u32,
156 key: &'a [u8],
157 ) -> KeyValueDBPinBoxFuture<'a, io::Result<Option<DBValue>>> {
158 let this = self.clone();
159 Box::pin(SendWrapper::new(async move {
160 if col >= this.unlocked_inner.columns {
161 return Err(io::Error::from(io::ErrorKind::NotFound));
162 }
163
164 if let Some(in_memory) = &this.unlocked_inner.in_memory {
165 in_memory.get(col, key).await
166 } else {
167 indexed_db::idb_get(&this.unlocked_inner.indexed_db, col, key).await
168 }
169 }))
170 }
171
172 fn delete<'a>(
173 &'a self,
174 col: u32,
175 key: &'a [u8],
176 ) -> KeyValueDBPinBoxFuture<'a, io::Result<Option<DBValue>>> {
177 let this = self.clone();
178 Box::pin(SendWrapper::new(async move {
179 if col >= this.unlocked_inner.columns {
180 return Err(io::Error::from(io::ErrorKind::NotFound));
181 }
182
183 let someval = indexed_db::idb_get(&this.unlocked_inner.indexed_db, col, key).await?;
184
185 let mut transaction = DBTransaction::new();
186 transaction.delete(col, key);
187
188 match indexed_db::idb_commit_transaction(
189 &this.unlocked_inner.indexed_db,
190 &transaction,
191 this.unlocked_inner.columns,
192 )
193 .await
194 {
195 Ok(()) => {}
196 Err(error) => {
197 return Err(io_err_string(format!("delete failed: {:?}", error)));
198 }
199 };
200
201 if let Some(in_memory) = &this.unlocked_inner.in_memory {
202 in_memory.delete(col, key).await?;
203 }
204
205 Ok(someval)
206 }))
207 }
208
209 fn write(
210 &self,
211 transaction: DBTransaction,
212 ) -> KeyValueDBPinBoxFuture<'_, Result<(), DBTransactionError>> {
213 let this = self.clone();
214 Box::pin(SendWrapper::new(async move {
215 {
216 match indexed_db::idb_commit_transaction(
217 &this.unlocked_inner.indexed_db,
218 &transaction,
219 this.unlocked_inner.columns,
220 )
221 .await
222 {
223 Ok(()) => {}
224 Err(error) => {
225 return Err(DBTransactionError { error, transaction });
226 }
227 };
228 }
229 if let Some(in_memory) = &this.unlocked_inner.in_memory {
230 in_memory.write(transaction).await
231 } else {
232 Ok(())
233 }
234 }))
235 }
236
237 fn iter<
238 'a,
239 T: Send + 'static,
240 C: Send + 'static,
241 F: FnMut(&mut C, DBKeyValueRef) -> io::Result<Option<T>> + Send + Sync + 'static,
242 >(
243 &'a self,
244 col: u32,
245 prefix: Option<&'a [u8]>,
246 mut context: C,
247 mut f: F,
248 ) -> KeyValueDBPinBoxFuture<'a, io::Result<(C, Option<T>)>> {
249 let this = self.clone();
250 Box::pin(async move {
251 if col >= this.unlocked_inner.columns {
252 return Err(io::Error::from(io::ErrorKind::NotFound));
253 }
254 if let Some(in_memory) = &this.unlocked_inner.in_memory {
255 in_memory.iter(col, prefix, context, f).await
256 } else {
257 let mut stream = indexed_db::idb_cursor(
258 &this.unlocked_inner.indexed_db,
259 col,
260 None,
261 prefix.map(|p| p.to_vec()),
262 )?;
263 while let Some(kv) = stream.next().await {
264 match kv {
265 Ok((key, value)) => {
266 if let Some(out) = f(&mut context, (&key, &value))? {
267 return Ok((context, Some(out)));
268 }
269 }
270 Err(e) => {
271 return Err(e);
272 }
273 }
274 }
275 Ok((context, None))
276 }
277 })
278 }
279
280 fn iter_keys<
281 'a,
282 T: Send + 'static,
283 C: Send + 'static,
284 F: FnMut(&mut C, DBKeyRef) -> io::Result<Option<T>> + Send + Sync + 'static,
285 >(
286 &'a self,
287 col: u32,
288 prefix: Option<&'a [u8]>,
289 mut context: C,
290 mut f: F,
291 ) -> KeyValueDBPinBoxFuture<'a, io::Result<(C, Option<T>)>> {
292 let this = self.clone();
293 Box::pin(async move {
294 if col >= this.unlocked_inner.columns {
295 return Err(io::Error::from(io::ErrorKind::NotFound));
296 }
297
298 if let Some(in_memory) = &this.unlocked_inner.in_memory {
299 in_memory.iter_keys(col, prefix, context, f).await
300 } else {
301 let mut stream = indexed_db::idb_cursor_keys(
302 &this.unlocked_inner.indexed_db,
303 col,
304 prefix.map(|p| p.to_vec()),
305 )?;
306 while let Some(k) = stream.next().await {
307 match k {
308 Ok(key) => {
309 if let Some(out) = f(&mut context, &key)? {
310 return Ok((context, Some(out)));
311 }
312 }
313 Err(e) => {
314 return Err(e);
315 }
316 }
317 }
318 Ok((context, None))
319 }
320 })
321 }
322
323 fn num_columns(&self) -> Result<u32, io::Error> {
324 Ok(self.unlocked_inner.columns)
325 }
326
327 fn num_keys(&self, col: u32) -> KeyValueDBPinBoxFuture<'_, io::Result<u64>> {
328 let this = self.clone();
329 Box::pin(async move {
330 if col >= this.unlocked_inner.columns {
331 return Err(io::Error::from(io::ErrorKind::NotFound));
332 }
333
334 if let Some(in_memory) = &this.unlocked_inner.in_memory {
335 in_memory.num_keys(col).await
336 } else {
337 let mut stream =
338 indexed_db::idb_get_key_count(&this.unlocked_inner.indexed_db, col, None)?;
339 if let Some(v) = stream.next().await {
340 match v {
341 Ok(value) => {
342 return Ok(value as u64);
343 }
344 Err(e) => {
345 return Err(e);
346 }
347 }
348 }
349 Err(io::Error::from(io::ErrorKind::InvalidData))
350 }
351 })
352 }
353}