1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
use js_sys::{Array, ArrayBuffer, Uint8Array};
use wasm_bindgen::{closure::Closure, JsCast, JsValue};
use web_sys::{Event, IdbCursorWithValue, IdbDatabase, IdbOpenDbRequest, IdbRequest, IdbTransactionMode};
use futures::channel;
use futures::prelude::*;
use kvdb::{DBOp, DBTransaction};
use log::{debug, warn};
use std::ops::Deref;
use crate::error::Error;
/// An opened IndexedDB database together with the metadata captured by `open`
/// when the open request succeeded.
pub struct IndexedDB {
/// Database version as reported by the browser, rounded to an integer.
pub version: u32,
/// Number of object stores present in the database at the time it was opened.
pub columns: u32,
/// The underlying `IdbDatabase` handle, held inside a `SendWrapper`.
pub inner: super::SendWrapper<IdbDatabase>,
}
pub fn open(name: &str, version: Option<u32>, columns: u32) -> impl Future<Output = Result<IndexedDB, Error>> {
let (tx, rx) = channel::oneshot::channel::<IndexedDB>();
let window = match web_sys::window() {
Some(window) => window,
None => return future::Either::Right(future::err(Error::WindowNotAvailable)),
};
let idb_factory = window.indexed_db();
let idb_factory = match idb_factory {
Ok(idb_factory) => idb_factory.expect("We can't get a null pointer back; qed"),
Err(err) => return future::Either::Right(future::err(Error::NotSupported(format!("{:?}", err)))),
};
let open_request = match version {
Some(version) => idb_factory.open_with_u32(name, version).expect("TypeError is not possible with Rust; qed"),
None => idb_factory.open(name).expect("TypeError is not possible with Rust; qed"),
};
try_create_missing_stores(&open_request, columns, version);
let on_success = Closure::once(move |event: &Event| {
let target = event.target().expect("Event should have a target; qed");
let req = target.dyn_ref::<IdbRequest>().expect("Event target is IdbRequest; qed");
let result = req.result().expect("IndexedDB.onsuccess should have a valid result; qed");
assert!(result.is_instance_of::<IdbDatabase>());
let db = IdbDatabase::from(result);
let version = db.version().round() as u32;
let columns = db.object_store_names().length();
let _ = tx.send(IndexedDB { version, columns, inner: super::SendWrapper::new(db) });
});
open_request.set_onsuccess(Some(on_success.as_ref().unchecked_ref()));
on_success.forget();
future::Either::Left(rx.then(|r| future::ok(r.expect("Sender isn't dropped; qed"))))
}
/// Returns the object store name used for column `num`: the literal prefix
/// `col` followed by the decimal column index (e.g. `col0`, `col12`).
fn store_name(num: u32) -> String {
    let mut name = String::from("col");
    name.push_str(&num.to_string());
    name
}
fn store_names_js(columns: u32) -> Array {
let column_names = (0..columns).map(store_name);
let js_array = Array::new();
for name in column_names {
js_array.push(&JsValue::from(name));
}
js_array
}
fn try_create_missing_stores(req: &IdbOpenDbRequest, columns: u32, version: Option<u32>) {
let on_upgradeneeded = Closure::once(move |event: &Event| {
debug!("Upgrading or creating the database to version {:?}, columns {}", version, columns);
let target = event.target().expect("Event should have a target; qed");
let req = target.dyn_ref::<IdbRequest>().expect("Event target is IdbRequest; qed");
let result = req.result().expect("IdbRequest should have a result; qed");
let db: &IdbDatabase = result.unchecked_ref();
let previous_columns = db.object_store_names().length();
debug!("Previous version: {}, columns {}", db.version(), previous_columns);
for name in (previous_columns..=columns).map(store_name) {
let res = db.create_object_store(name.as_str());
if let Err(err) = res {
debug!("error creating object store {}: {:?}", name, err);
}
}
});
req.set_onupgradeneeded(Some(on_upgradeneeded.as_ref().unchecked_ref()));
on_upgradeneeded.forget();
}
/// Writes all operations of `txn` to IndexedDB inside a single read-write
/// transaction spanning the object stores of columns `0..columns`.
///
/// Individual `put`/`delete` requests that fail to be issued are logged with
/// `warn!` and skipped; the remaining operations are still queued.
///
/// The returned future resolves once the IndexedDB transaction has finished,
/// i.e. when it either completes or is aborted. The output carries no
/// success/failure information; failures are only reported through the log.
///
/// # Panics
///
/// Panics if the transaction or one of the object stores cannot be opened, or
/// if an operation refers to a column index that is not below `columns`.
pub fn idb_commit_transaction(idb: &IdbDatabase, txn: &DBTransaction, columns: u32) -> impl Future<Output = ()> {
    let store_names_js = store_names_js(columns);
    let mode = IdbTransactionMode::Readwrite;
    let idb_txn = idb
        .transaction_with_str_sequence_and_mode(&store_names_js, mode)
        .expect("The provided mode and store names are valid; qed");

    // Open every column's store up front so operations can index into them directly.
    let object_stores = (0..columns)
        .map(|n| {
            idb_txn
                .object_store(store_name(n).as_str())
                .expect("Object stores were created in try_create_missing_stores; qed")
        })
        .collect::<Vec<_>>();

    for op in &txn.ops {
        match op {
            DBOp::Insert { col, key, value } => {
                let column = *col as usize;
                let key_js = Uint8Array::from(key.as_ref());
                let val_js = Uint8Array::from(value.as_ref());
                let res = object_stores[column].put_with_key(val_js.as_ref(), key_js.as_ref());
                if let Err(err) = res {
                    warn!("error inserting key/values into col_{}: {:?}", column, err);
                }
            }
            DBOp::Delete { col, key } => {
                let column = *col as usize;
                let key_js = Uint8Array::from(key.as_ref());
                let res = object_stores[column].delete(key_js.as_ref());
                if let Err(err) = res {
                    warn!("error deleting key from col_{}: {:?}", column, err);
                }
            }
        }
    }

    let (tx, rx) = channel::oneshot::channel::<()>();
    // The sender is shared between the two terminal events of the transaction
    // (`complete` and `abort`). The closures below are leaked to JS, so a sender
    // owned only by the `complete` handler would never be dropped and the
    // future would stay pending forever if the transaction aborted instead.
    let notifier = std::rc::Rc::new(std::cell::RefCell::new(Some(tx)));

    let on_complete = {
        let notifier = std::rc::Rc::clone(&notifier);
        Closure::once(move || {
            if let Some(tx) = notifier.borrow_mut().take() {
                let _ = tx.send(());
            }
        })
    };
    idb_txn.set_oncomplete(Some(on_complete.as_ref().unchecked_ref()));
    on_complete.forget();

    let on_abort = Closure::once(move || {
        warn!("IndexedDB transaction was aborted");
        if let Some(tx) = notifier.borrow_mut().take() {
            let _ = tx.send(());
        }
    });
    idb_txn.set_onabort(Some(on_abort.as_ref().unchecked_ref()));
    on_abort.forget();

    // Error events bubble up from individual requests, so this handler may be
    // invoked more than once and therefore must not be a one-shot closure.
    let on_error = Closure::wrap(Box::new(move || {
        warn!("Failed to commit a transaction to IndexedDB");
    }) as Box<dyn FnMut()>);
    idb_txn.set_onerror(Some(on_error.as_ref().unchecked_ref()));
    on_error.forget();

    rx.map(|_| ())
}
/// Streams every `(key, value)` pair stored in the object store of column `col`.
///
/// A transaction is opened on that single store and a cursor is walked from
/// the first entry to the last; each entry is copied into Rust-owned byte
/// vectors and pushed into the returned stream. The stream ends when the
/// cursor is exhausted, when the key or value of an entry cannot be read, or
/// when advancing the cursor fails. Iteration stops early once the stream has
/// been dropped by the consumer.
///
/// # Panics
///
/// Panics if the transaction, the object store or the cursor cannot be opened.
pub fn idb_cursor(idb: &IdbDatabase, col: u32) -> impl Stream<Item = (Vec<u8>, Vec<u8>)> {
    let store_name = store_name(col);
    let store_name = store_name.as_str();

    let txn = idb.transaction_with_str(store_name).unwrap_or_else(|err| {
        panic!("The stores were created on open, but `{}` could not be opened: {:?}; qed", store_name, err)
    });
    let store = txn.object_store(store_name).expect("Opening a store shouldn't fail; qed");
    let cursor = store.open_cursor().expect("Opening a cursor shouldn't fail; qed");

    let (tx, rx) = channel::mpsc::unbounded();

    // Invoked once per cursor position: `continue_` re-fires `onsuccess` on the same request.
    let on_cursor = Closure::wrap(Box::new(move |event: &Event| {
        let target = event.target().expect("on_cursor should have a target; qed");
        let req = target.dyn_ref::<IdbRequest>().expect("target should be IdbRequest; qed");
        let result = req.result().expect("IdbRequest should have a result; qed");

        // A null result is how IndexedDB signals that the cursor ran past the last entry.
        if result.is_null() || result.is_undefined() {
            tx.close_channel();
            return;
        }

        let cursor: &IdbCursorWithValue = result.unchecked_ref();
        if let (Ok(key), Ok(value)) = (cursor.deref().key(), cursor.value()) {
            // Keys are read back as `ArrayBuffer`, values as `Uint8Array`.
            let k: &ArrayBuffer = key.unchecked_ref();
            let v: &Uint8Array = value.unchecked_ref();

            let mut kv = vec![0u8; k.byte_length() as usize];
            let mut vv = vec![0u8; v.byte_length() as usize];
            Uint8Array::new(k).copy_to(&mut kv[..]);
            v.copy_to(&mut vv[..]);

            if let Err(e) = tx.unbounded_send((kv, vv)) {
                warn!("on_cursor: error sending to a channel {:?}", e);
                // Nobody is listening anymore, so walking the rest of the store is pointless.
                return;
            }
            if let Err(e) = cursor.deref().continue_() {
                warn!("cursor advancement has failed {:?}", e);
                // No further success event will arrive; end the stream instead of leaving it pending.
                tx.close_channel();
            }
        } else {
            tx.close_channel();
        }
    }) as Box<dyn FnMut(&Event)>);

    cursor.set_onsuccess(Some(on_cursor.as_ref().unchecked_ref()));
    // The handler is owned by the JS side from here on.
    on_cursor.forget();

    rx
}