use futures::prelude::*;
use js_sys::{Array, ArrayBuffer, Uint8Array};
use wasm_bindgen::{closure::Closure, JsCast, JsValue};
use web_sys::{
Event, IdbCursorWithValue, IdbDatabase, IdbKeyRange, IdbOpenDbRequest, IdbRequest,
IdbTransactionMode,
};
use keyvaluedb::{DBOp, DBTransaction};
use log::{debug, warn};
use crate::error::*;
/// Handle to an opened IndexedDB database plus the metadata read from it at open time.
pub struct IndexedDB {
/// Database version; `open` fills this from the rounded `IdbDatabase::version()` value.
pub version: u32,
/// Number of columns; `open` fills this from the count of object stores in the database.
pub columns: u32,
/// The underlying browser database handle, wrapped in `SendWrapper`.
// NOTE(review): presumably the wrapper exists so the handle can be stored in `Send` types — confirm against `super::SendWrapper`.
pub inner: super::SendWrapper<IdbDatabase>,
}
pub async fn open(name: &str, version: Option<u32>, columns: u32) -> Result<IndexedDB, Error> {
let (tx, rx) = flume::bounded::<Result<IndexedDB, Error>>(1);
let tx2 = tx.clone();
{
let window = match web_sys::window() {
Some(window) => window,
None => return Err(Error::WindowNotAvailable),
};
let idb_factory = window.indexed_db();
let idb_factory = match idb_factory {
Ok(Some(idb_factory)) => idb_factory,
Ok(None) => return Err(Error::NotSupported("No factory".to_owned())),
Err(err) => return Err(Error::NotSupported(format!("{:?}", err))),
};
let open_request = match version {
Some(version) => idb_factory.open_with_u32(name, version),
None => idb_factory.open(name),
}
.map_err(|e| Error::Generic(format!("failed top open factory {}: {:?}", name, e)))?;
try_create_missing_stores(&open_request, columns, version);
let on_success = Closure::once(move |event: &Event| {
let Some(target) = event.target() else {
if let Err(e) = tx.send(Err(Error::Generic("no event target".to_owned()))) {
warn!("on_cursor: error sending to a channel {:?}", e);
}
return;
};
let Some(req) = target.dyn_ref::<IdbRequest>() else {
if let Err(e) = tx.send(Err(bad_cast_generic("target", target.into()))) {
warn!("on_cursor: error sending to a channel {:?}", e);
}
return;
};
let result = match req.result() {
Ok(v) => v,
Err(e) => {
if let Err(e) = tx.send(Err(Error::Generic(format!("{:?}", e)))) {
warn!("on_cursor: error sending to a channel {:?}", e);
}
return;
}
};
assert!(result.is_instance_of::<IdbDatabase>());
let db = IdbDatabase::from(result);
let version = db.version().round() as u32;
let columns = db.object_store_names().length();
let _ = tx.send(Ok(IndexedDB {
version,
columns,
inner: super::SendWrapper::new(db),
}));
});
open_request.set_onsuccess(Some(on_success.as_ref().unchecked_ref()));
on_success.forget();
let on_error = Closure::once(move |event: &Event| {
let _ = tx2.send(Err(Error::Generic(format!("{:?}", event.type_()))));
});
open_request.set_onerror(Some(on_error.as_ref().unchecked_ref()));
on_error.forget();
}
match rx.recv_async().await {
Ok(Ok(v)) => Ok(v),
Ok(Err(e)) => Err(e),
Err(_) => Err(Error::Canceled),
}
}
pub async fn names_with_versions() -> Result<Vec<(String, u32)>, Error> {
let window = match web_sys::window() {
Some(window) => window,
None => return Err(Error::WindowNotAvailable),
};
let idb_factory = window.indexed_db();
let idb_factory = match idb_factory {
Ok(Some(f)) => f,
Ok(None) => return Err(Error::NotSupported("No factory".to_owned())),
Err(err) => return Err(Error::NotSupported(format!("{:?}", err))),
};
let factory_js: JsValue = idb_factory.into();
let databases_fn = js_sys::Reflect::get(&factory_js, &JsValue::from_str("databases"))
.map_err(|e| Error::NotSupported(format!("indexedDB.databases not available: {:?}", e)))?;
let databases_fn: js_sys::Function = databases_fn
.dyn_into()
.map_err(|_| Error::NotSupported("indexedDB.databases is not callable".to_owned()))?;
let promise = databases_fn
.call0(&factory_js)
.map_err(|e| Error::Generic(format!("indexedDB.databases() threw: {:?}", e)))?;
let promise: js_sys::Promise = promise
.dyn_into()
.map_err(|_| Error::Generic("indexedDB.databases() did not return a Promise".to_owned()))?;
let resolved = wasm_bindgen_futures::JsFuture::from(promise)
.await
.map_err(|e| Error::Generic(format!("indexedDB.databases() rejected: {:?}", e)))?;
let array: js_sys::Array = resolved
.dyn_into()
.map_err(|_| Error::Generic("indexedDB.databases() did not resolve to Array".to_owned()))?;
let mut out = Vec::with_capacity(array.length() as usize);
for i in 0..array.length() {
let entry = array.get(i);
let name = js_sys::Reflect::get(&entry, &JsValue::from_str("name"))
.ok()
.and_then(|v| v.as_string());
let version = js_sys::Reflect::get(&entry, &JsValue::from_str("version"))
.ok()
.and_then(|v| v.as_f64())
.map(|v| v.round() as u32);
if let (Some(name), Some(version)) = (name, version) {
out.push((name, version));
}
}
Ok(out)
}
pub async fn delete(name: &str) -> Result<(), Error> {
let (tx, rx) = flume::bounded::<Result<(), Error>>(1);
let tx2 = tx.clone();
{
let window = match web_sys::window() {
Some(window) => window,
None => return Err(Error::WindowNotAvailable),
};
let idb_factory = window.indexed_db();
let idb_factory = match idb_factory {
Ok(idb_factory) => idb_factory.unwrap(),
Err(err) => return Err(Error::NotSupported(format!("{:?}", err))),
};
let delete_request = idb_factory.delete_database(name).unwrap();
let on_success = Closure::once(move |_event: &Event| {
let _ = tx.send(Ok(()));
});
delete_request.set_onsuccess(Some(on_success.as_ref().unchecked_ref()));
on_success.forget();
let on_error = Closure::once(move |event: &Event| {
let _ = tx2.send(Err(Error::Generic(format!("{:?}", event.type_()))));
});
delete_request.set_onerror(Some(on_error.as_ref().unchecked_ref()));
on_error.forget();
}
match rx.recv_async().await {
Ok(Ok(v)) => Ok(v),
Ok(Err(e)) => Err(e),
Err(_) => Err(Error::Canceled),
}
}
/// Returns the object store name used for column `num`: `"col"` followed by the
/// decimal column index (e.g. `col0`, `col1`).
fn store_name(num: u32) -> String {
    let mut name = String::from("col");
    name.push_str(&num.to_string());
    name
}
fn store_names_js(columns: u32) -> Array {
let column_names = (0..columns).map(store_name);
let js_array = Array::new();
for name in column_names {
js_array.push(&JsValue::from(name));
}
js_array
}
fn try_create_missing_stores(req: &IdbOpenDbRequest, columns: u32, version: Option<u32>) {
let on_upgradeneeded = Closure::once(move |event: &Event| {
debug!(
"Upgrading or creating the database to version {:?}, columns {}",
version, columns
);
let target = event.target().unwrap();
let req = target.dyn_ref::<IdbRequest>().unwrap();
let result = req.result().unwrap();
let db: &IdbDatabase = result.unchecked_ref();
let previous_columns = db.object_store_names().length();
debug!(
"Previous version: {}, columns {}",
db.version(),
previous_columns
);
for name in (previous_columns..columns).map(store_name) {
let res = db.create_object_store(name.as_str());
if let Err(err) = res {
debug!("error creating object store {}: {:?}", name, err);
}
}
});
req.set_onupgradeneeded(Some(on_upgradeneeded.as_ref().unchecked_ref()));
on_upgradeneeded.forget();
}
pub async fn idb_commit_transaction(
idb: &IdbDatabase,
transaction: &DBTransaction,
columns: u32,
) -> std::io::Result<()> {
let store_names_js = store_names_js(columns);
let mode = IdbTransactionMode::Readwrite;
let rx_res = || -> Result<flume::Receiver<Result<(), String>>, String> {
let idb_txn = idb
.transaction_with_str_sequence_and_mode(&store_names_js, mode)
.map_err(|_| "failed to create IndexedDB transaction".to_owned())?;
let object_stores = (0..columns)
.map(|n| idb_txn.object_store(store_name(n).as_str()).unwrap())
.collect::<Vec<_>>();
for op in &transaction.ops {
match op {
DBOp::Insert { col, key, value } => {
let column = *col as usize;
let key_js = Uint8Array::from(key.as_ref());
let val_js = Uint8Array::from(value.as_ref());
let res = object_stores[column].put_with_key(val_js.as_ref(), key_js.as_ref());
if let Err(err) = res {
return Err(format!(
"error inserting key/values into col_{}: {:?}",
column, err
));
}
}
DBOp::Delete { col, key } => {
let column = *col as usize;
let key_js = Uint8Array::from(key.as_ref());
let res = object_stores[column].delete(key_js.as_ref());
if let Err(err) = res {
return Err(format!("error deleting key from col_{}: {:?}", column, err));
}
}
DBOp::DeletePrefix { col, prefix } => {
let column = *col as usize;
let prefix_js_start = Uint8Array::from(prefix.as_ref());
let range = if let Some(end_range) = keyvaluedb::end_prefix(&prefix[..]) {
let prefix_js_end = Uint8Array::from(end_range.as_ref());
IdbKeyRange::bound_with_lower_open_and_upper_open(
prefix_js_start.as_ref(),
prefix_js_end.as_ref(),
false,
true,
)
.unwrap()
} else {
IdbKeyRange::lower_bound(prefix_js_start.as_ref()).unwrap()
};
let res = object_stores[column].delete(range.as_ref());
if let Err(err) = res {
return Err(format!(
"error deleting prefix from col_{}: {:?}",
column, err
));
}
}
}
}
let (tx, rx) = flume::bounded::<Result<(), String>>(1);
let tx2 = tx.clone();
let on_complete = Closure::once(move || {
let _ = tx2.send(Ok(()));
});
idb_txn.set_oncomplete(Some(on_complete.as_ref().unchecked_ref()));
on_complete.forget();
let tx2 = tx.clone();
let on_error = Closure::once(move || {
let _ = tx2.send(Err("Failed to commit a transaction to IndexedDB".to_owned()));
});
idb_txn.set_onerror(Some(on_error.as_ref().unchecked_ref()));
on_error.forget();
Ok(rx)
}();
let rx = match rx_res {
Ok(rx) => rx,
Err(e) => {
return Err(std::io::Error::other(e));
}
};
match rx.recv_async().await {
Ok(Ok(r)) => Ok(r),
Ok(Err(e)) => Err(std::io::Error::other(e)),
Err(_) => Err(std::io::Error::other(
"No transaction was performed on IndexedDB".to_owned(),
)),
}
}
/// Looks up `key` in column `col` and returns its value, or `None` when the
/// cursor yields no entry.
///
/// The lookup opens a cursor whose range is the key itself and takes the first
/// item the cursor stream produces.
///
/// # Errors
///
/// Propagates any error from opening the cursor or from the first stream item.
pub async fn idb_get(idb: &IdbDatabase, col: u32, key: &[u8]) -> std::io::Result<Option<Vec<u8>>> {
    let exact_key = Uint8Array::from(key);
    let mut entries = idb_cursor(idb, col, Some(&exact_key), None)?;
    match entries.next().await {
        Some(Ok((_, value))) => Ok(Some(value)),
        Some(Err(err)) => Err(err),
        None => Ok(None),
    }
}
pub fn idb_cursor(
idb: &IdbDatabase,
col: u32,
range: Option<&JsValue>,
prefix: Option<Vec<u8>>,
) -> std::io::Result<impl Stream<Item = std::io::Result<(Vec<u8>, Vec<u8>)>>> {
let store_name = store_name(col);
let store_name = store_name.as_str();
let txn = idb
.transaction_with_str(store_name)
.map_err(io_err_jsvalue)?;
let store = txn.object_store(store_name).map_err(io_err_jsvalue)?;
let cursor = match range {
Some(r) => store.open_cursor_with_range(r).map_err(io_err_jsvalue)?,
None => store.open_cursor().map_err(io_err_jsvalue)?,
};
let (tx, rx) = futures::channel::mpsc::unbounded();
let on_cursor = Closure::wrap(Box::new(move |event: &Event| {
let Some(target) = event.target() else {
if let Err(e) = tx.unbounded_send(Err(io_err_string("no event target"))) {
warn!("on_cursor: error sending to a channel {:?}", e);
}
tx.close_channel();
return;
};
let Some(req) = target.dyn_ref::<IdbRequest>() else {
if let Err(e) = tx.unbounded_send(Err(bad_cast_io_err("target", target.into()))) {
warn!("on_cursor: error sending to a channel {:?}", e);
}
tx.close_channel();
return;
};
let result = match req.result() {
Ok(v) => v,
Err(e) => {
if let Err(e) = tx.unbounded_send(Err(io_err_jsvalue(e))) {
warn!("on_cursor: error sending to a channel {:?}", e);
}
tx.close_channel();
return;
}
};
if result.is_null() || result.is_undefined() {
tx.close_channel();
return;
}
let Some(cursor) = result.dyn_ref::<IdbCursorWithValue>() else {
if let Err(e) = tx.unbounded_send(Err(bad_cast_io_err("result", result))) {
warn!("on_cursor: error sending to a channel {:?}", e);
}
tx.close_channel();
return;
};
let mut ok = false;
if let Ok(key) = cursor.key() {
let Some(k) = key.dyn_ref::<ArrayBuffer>() else {
if let Err(e) = tx.unbounded_send(Err(bad_cast_io_err("key", key))) {
warn!("on_cursor: error sending to a channel {:?}", e);
}
tx.close_channel();
return;
};
let karr = Uint8Array::new(k);
let kv = karr.to_vec();
let mut skip = false;
if let Some(prefix) = &prefix {
if !kv.starts_with(prefix) {
skip = true;
}
}
if !skip {
if let Ok(value) = cursor.value() {
let Some(v) = value.dyn_ref::<Uint8Array>() else {
if let Err(e) = tx.unbounded_send(Err(bad_cast_io_err("value", value))) {
warn!("on_cursor: error sending to a channel {:?}", e);
}
tx.close_channel();
return;
};
let vv = v.to_vec();
if let Err(e) = tx.unbounded_send(Ok((kv, vv))) {
warn!("on_cursor: error sending to a channel {:?}", e);
}
ok = true;
} else {
warn!("on_cursor: no value for key {:?}", &kv);
}
} else {
ok = true;
}
}
if !ok {
tx.close_channel();
} else {
if let Err(e) = cursor.continue_() {
warn!("cursor advancement has failed {:?}", e);
}
}
}) as Box<dyn FnMut(&Event)>);
cursor.set_onsuccess(Some(on_cursor.as_ref().unchecked_ref()));
on_cursor.forget();
Ok(rx)
}
/// Opens a cursor over the whole of column `col` and returns a stream of keys.
///
/// * `idb` - the opened database.
/// * `col` - column index; selects the object store to read.
/// * `prefix` - when `Some`, keys that do not start with it are skipped (the
///   cursor still visits them).
///
/// The stream ends when the cursor is exhausted. Keys are expected to arrive as
/// `ArrayBuffer`; anything else is reported as an error item and ends the stream.
/// Note that a value cursor (`IdbCursorWithValue`) is opened even though only the
/// keys are emitted.
///
/// # Errors
///
/// Returns an error immediately if the transaction, object store or cursor cannot
/// be created. Failures that happen later (including a failed cursor request) are
/// delivered as `Err` items on the stream, after which the stream is closed.
pub fn idb_cursor_keys(
    idb: &IdbDatabase,
    col: u32,
    prefix: Option<Vec<u8>>,
) -> std::io::Result<impl Stream<Item = std::io::Result<Vec<u8>>>> {
    let store_name = store_name(col);
    let store_name = store_name.as_str();
    let txn = idb
        .transaction_with_str(store_name)
        .map_err(io_err_jsvalue)?;
    let store = txn.object_store(store_name).map_err(io_err_jsvalue)?;
    let request = store.open_cursor().map_err(io_err_jsvalue)?;

    let (tx, rx) = futures::channel::mpsc::unbounded::<std::io::Result<Vec<u8>>>();

    // Without an error handler a failed request would never close the channel and
    // the consumer would wait on the stream forever.
    let err_tx = tx.clone();
    let on_error = Closure::wrap(Box::new(move |event: &Event| {
        let err = std::io::Error::other(format!(
            "IndexedDB cursor request failed: {}",
            event.type_()
        ));
        if let Err(e) = err_tx.unbounded_send(Err(err)) {
            warn!("on_cursor: error sending to a channel {:?}", e);
        }
        err_tx.close_channel();
    }) as Box<dyn FnMut(&Event)>);
    request.set_onerror(Some(on_error.as_ref().unchecked_ref()));
    on_error.forget();

    let on_cursor = Closure::wrap(Box::new(move |event: &Event| {
        // Delivers an error item and ends the stream.
        let fail = |err: std::io::Error| {
            if let Err(e) = tx.unbounded_send(Err(err)) {
                warn!("on_cursor: error sending to a channel {:?}", e);
            }
            tx.close_channel();
        };

        let Some(target) = event.target() else {
            fail(io_err_string("no event target"));
            return;
        };
        let Some(req) = target.dyn_ref::<IdbRequest>() else {
            fail(bad_cast_io_err("target", target.into()));
            return;
        };
        let result = match req.result() {
            Ok(v) => v,
            Err(e) => {
                fail(io_err_jsvalue(e));
                return;
            }
        };
        // A null/undefined result means the cursor is exhausted.
        if result.is_null() || result.is_undefined() {
            tx.close_channel();
            return;
        }
        let Some(cur) = result.dyn_ref::<IdbCursorWithValue>() else {
            fail(bad_cast_io_err("result", result));
            return;
        };

        let Ok(key) = cur.key() else {
            tx.close_channel();
            return;
        };
        let Some(k) = key.dyn_ref::<ArrayBuffer>() else {
            fail(bad_cast_io_err("key", key));
            return;
        };
        let kv = Uint8Array::new(k).to_vec();

        let wanted = prefix.as_ref().map_or(true, |p| kv.starts_with(p));
        if wanted {
            if let Err(e) = tx.unbounded_send(Ok(kv)) {
                // The receiver is gone; walking the rest of the store would only
                // produce one warning per remaining entry.
                warn!("on_cursor: error sending to a channel {:?}", e);
                return;
            }
        }

        if let Err(e) = cur.continue_() {
            warn!("cursor advancement has failed {:?}", e);
        }
    }) as Box<dyn FnMut(&Event)>);
    request.set_onsuccess(Some(on_cursor.as_ref().unchecked_ref()));
    on_cursor.forget();
    Ok(rx)
}
/// Counts the entries of column `col`, optionally restricted to `range`.
///
/// * `idb` - the opened database.
/// * `col` - column index; selects the object store to count.
/// * `range` - optional key or key range; `None` counts the whole store.
///
/// The returned stream yields at most one item (the count, or an error) and is
/// then closed. A null/undefined request result closes the stream without an item.
///
/// # Errors
///
/// Returns an error immediately if the transaction, object store or count request
/// cannot be created. Failures that happen later (including a failed count
/// request) are delivered as an `Err` item on the stream.
pub fn idb_get_key_count(
    idb: &IdbDatabase,
    col: u32,
    range: Option<&JsValue>,
) -> std::io::Result<impl Stream<Item = std::io::Result<usize>>> {
    let store_name = store_name(col);
    let store_name = store_name.as_str();
    let txn = idb
        .transaction_with_str(store_name)
        .map_err(io_err_jsvalue)?;
    let store = txn.object_store(store_name).map_err(io_err_jsvalue)?;
    let request = match range {
        Some(r) => store.count_with_key(r).map_err(io_err_jsvalue)?,
        None => store.count().map_err(io_err_jsvalue)?,
    };

    let (tx, rx) = futures::channel::mpsc::unbounded::<std::io::Result<usize>>();

    // Without an error handler a failed request would never close the channel and
    // the consumer would wait on the stream forever.
    let err_tx = tx.clone();
    let on_error = Closure::wrap(Box::new(move |event: &Event| {
        let err = std::io::Error::other(format!(
            "IndexedDB count request failed: {}",
            event.type_()
        ));
        if let Err(e) = err_tx.unbounded_send(Err(err)) {
            warn!("on_count: error sending to a channel {:?}", e);
        }
        err_tx.close_channel();
    }) as Box<dyn FnMut(&Event)>);
    request.set_onerror(Some(on_error.as_ref().unchecked_ref()));
    on_error.forget();

    let on_count = Closure::wrap(Box::new(move |event: &Event| {
        // Delivers an error item and ends the stream.
        let fail = |err: std::io::Error| {
            if let Err(e) = tx.unbounded_send(Err(err)) {
                warn!("on_count: error sending to a channel {:?}", e);
            }
            tx.close_channel();
        };

        let Some(target) = event.target() else {
            fail(io_err_string("no event target"));
            return;
        };
        let Some(req) = target.dyn_ref::<IdbRequest>() else {
            fail(bad_cast_io_err("target", target.into()));
            return;
        };
        let result = match req.result() {
            Ok(v) => v,
            Err(e) => {
                fail(io_err_jsvalue(e));
                return;
            }
        };
        if result.is_null() || result.is_undefined() {
            tx.close_channel();
            return;
        }
        let Some(value) = result.as_f64() else {
            fail(bad_cast_io_err("result", result));
            return;
        };

        if let Err(e) = tx.unbounded_send(Ok(value as usize)) {
            warn!("on_count: error sending to a channel {:?}", e);
        }
        tx.close_channel();
    }) as Box<dyn FnMut(&Event)>);
    request.set_onsuccess(Some(on_count.as_ref().unchecked_ref()));
    on_count.forget();
    Ok(rx)
}