use std::{collections::BTreeSet, rc::Rc, str::FromStr, sync::OnceLock};
use crate::{
repo::{DataStore, PinModeRequirement, PinStore, References},
Error, PinKind, PinMode,
};
use futures::{channel::oneshot, stream::BoxStream, SinkExt, StreamExt, TryStreamExt};
use idb::{
Database, DatabaseEvent, Factory, ObjectStore, ObjectStoreParams, Transaction, TransactionMode,
};
use ipld_core::cid::Cid;
use send_wrapper::SendWrapper;
use wasm_bindgen_futures::wasm_bindgen::JsValue;
/// Base name of the IndexedDB database. `IdbDataStore::new` uses it as-is when
/// no namespace is supplied and appends `-<namespace>` otherwise.
const NAMESPACE: &str = "rust-datastore-store";
#[derive(Debug)]
pub struct IdbDataStore {
factory: send_wrapper::SendWrapper<Rc<Factory>>,
database: OnceLock<send_wrapper::SendWrapper<Rc<Database>>>,
namespace: String,
}
impl IdbDataStore {
pub fn new(namespace: Option<String>) -> Self {
let namespace = match namespace {
Some(ns) => format!("{NAMESPACE}-{ns}"),
None => NAMESPACE.to_string(),
};
let factory = Factory::new().unwrap();
Self {
factory: SendWrapper::new(Rc::new(factory)),
database: OnceLock::new(),
namespace,
}
}
pub fn get_db(&self) -> &send_wrapper::SendWrapper<Rc<Database>> {
self.database.get().expect("initialized")
}
}
impl DataStore for IdbDataStore {
/// Opens the database named by `self.namespace` and stores the handle.
///
/// When the open request triggers an upgrade, the `datastore` and `pinstore`
/// object stores are created. The work runs on a task spawned with
/// `spawn_local`; its outcome is handed back over a oneshot channel.
/// If a handle is already stored, the newly opened one is discarded.
async fn init(&self) -> Result<(), Error> {
    let (done_tx, done_rx) = oneshot::channel();
    let db_name = self.namespace.clone();
    let factory = self.factory.clone();
    wasm_bindgen_futures::spawn_local(async move {
        let opened = async {
            let mut open_request = factory.open(&db_name, None)?;
            open_request.on_upgrade_needed(|event| {
                let db = event.database().unwrap();
                for store_name in ["datastore", "pinstore"] {
                    db.create_object_store(store_name, ObjectStoreParams::new())
                        .unwrap();
                }
            });
            let database = open_request.await?;
            Ok::<_, Box<dyn std::error::Error>>(SendWrapper::new(Rc::new(database)))
        }
        .await;
        let _ = done_tx.send(opened.map_err(|e| anyhow::anyhow!("{e}")));
    });
    let database = done_rx.await??;
    self.database.get_or_init(|| database);
    Ok(())
}
/// Reports whether the `datastore` object store holds a value under `key`.
///
/// The lookup runs in a read-only transaction on a `spawn_local` task and the
/// result comes back over a oneshot channel.
async fn contains(&self, key: &[u8]) -> Result<bool, Error> {
    let (reply_tx, reply_rx) = oneshot::channel();
    let db = self.get_db().to_owned();
    let raw_key = key.to_owned();
    wasm_bindgen_futures::spawn_local(async move {
        let lookup = async {
            let txn = db.transaction(&["datastore"], TransactionMode::ReadOnly)?;
            let blocks = txn.object_store("datastore")?;
            let js_key = serde_wasm_bindgen::to_value(&raw_key)?;
            let found = blocks.get(js_key)?.await?.is_some();
            txn.await?;
            Ok::<_, Box<dyn std::error::Error>>(found)
        };
        let outcome = lookup.await.map_err(|e| anyhow::anyhow!("{e}"));
        let _ = reply_tx.send(outcome);
    });
    reply_rx.await?
}
/// Fetches the bytes stored under `key` in the `datastore` object store.
///
/// Returns `Ok(None)` when no value exists. A stored value that cannot be
/// decoded as a byte vector is also reported as `None` rather than an error.
async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
    let (reply_tx, reply_rx) = oneshot::channel();
    let db = self.get_db().to_owned();
    let raw_key = key.to_owned();
    wasm_bindgen_futures::spawn_local(async move {
        let fetch = async {
            let txn = db.transaction(&["datastore"], TransactionMode::ReadOnly)?;
            let blocks = txn.object_store("datastore")?;
            let js_key = serde_wasm_bindgen::to_value(&raw_key)?;
            let bytes = match blocks.get(js_key)?.await? {
                Some(js_val) => serde_wasm_bindgen::from_value::<Vec<u8>>(js_val).ok(),
                None => None,
            };
            txn.await?;
            Ok::<_, Box<dyn std::error::Error>>(bytes)
        };
        let outcome = fetch.await.map_err(|e| anyhow::anyhow!("{e}"));
        let _ = reply_tx.send(outcome);
    });
    reply_rx.await?
}
/// Stores `value` under `key` in the `datastore` object store, replacing any
/// existing entry, and commits the read-write transaction.
async fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> {
    let (reply_tx, reply_rx) = oneshot::channel();
    let db = self.get_db().to_owned();
    let raw_key = key.to_owned();
    let raw_value = value.to_owned();
    wasm_bindgen_futures::spawn_local(async move {
        let write = async {
            let txn = db.transaction(&["datastore"], TransactionMode::ReadWrite)?;
            let blocks = txn.object_store("datastore")?;
            let js_key = serde_wasm_bindgen::to_value(&raw_key)?;
            let js_value = serde_wasm_bindgen::to_value(&raw_value)?;
            blocks.put(&js_value, Some(&js_key))?.await?;
            txn.commit()?.await?;
            Ok::<_, Box<dyn std::error::Error>>(())
        };
        let outcome = write.await.map_err(|e| anyhow::anyhow!("{e}"));
        let _ = reply_tx.send(outcome);
    });
    reply_rx.await?
}
/// Deletes the entry stored under `key` from the `datastore` object store and
/// commits the read-write transaction.
async fn remove(&self, key: &[u8]) -> Result<(), Error> {
    let (reply_tx, reply_rx) = oneshot::channel();
    let db = self.get_db().to_owned();
    let raw_key = key.to_owned();
    wasm_bindgen_futures::spawn_local(async move {
        let erase = async {
            let txn = db.transaction(&["datastore"], TransactionMode::ReadWrite)?;
            let blocks = txn.object_store("datastore")?;
            let js_key = serde_wasm_bindgen::to_value(&raw_key)?;
            blocks.delete(js_key)?.await?;
            txn.commit()?.await?;
            Ok::<_, Box<dyn std::error::Error>>(())
        };
        let outcome = erase.await.map_err(|e| anyhow::anyhow!("{e}"));
        let _ = reply_tx.send(outcome);
    });
    reply_rx.await?
}
/// Streams every `(key, value)` pair held in the `datastore` object store.
///
/// Keys and values are read with `get_all_keys` / `get_all` inside a single
/// read-only transaction and paired positionally *before* decoding. A pair
/// whose key or value cannot be decoded as bytes is skipped on its own, so a
/// single bad entry can no longer shift every later key onto the wrong value.
///
/// The stream item type has no error variant, so a failing IndexedDB call
/// simply ends the stream early instead of panicking in the spawned task.
async fn iter(&self) -> BoxStream<'static, (Vec<u8>, Vec<u8>)> {
    let database = self.get_db().clone();
    let (mut tx, rx) = futures::channel::mpsc::channel(10);
    wasm_bindgen_futures::spawn_local(async move {
        // Errors cannot be reported through this stream; dropping `tx` on the
        // way out closes it for the consumer.
        let _ = async {
            let transaction =
                database.transaction(&["datastore"], TransactionMode::ReadOnly)?;
            let store = transaction.object_store("datastore")?;
            let keys = store.get_all_keys(None, None)?.await?;
            let values = store.get_all(None, None)?.await?;

            let pairs = keys.into_iter().zip(values).filter_map(|(key, value)| {
                let key = serde_wasm_bindgen::from_value::<Vec<u8>>(key).ok()?;
                let value = serde_wasm_bindgen::from_value::<Vec<u8>>(value).ok()?;
                Some((key, value))
            });
            for kv in pairs {
                // A send error means the receiver is gone; nothing left to do.
                if tx.send(kv).await.is_err() {
                    break;
                }
            }
            Ok::<_, Box<dyn std::error::Error>>(())
        }
        .await;
    });
    rx.boxed()
}
}
impl PinStore for IdbDataStore {
async fn is_pinned(&self, cid: &Cid) -> Result<bool, Error> {
let cid = cid.to_owned();
let database = self.get_db().to_owned();
let (tx, rx) = oneshot::channel();
wasm_bindgen_futures::spawn_local(async move {
let res = async {
let transaction = database.transaction(&["pinstore"], TransactionMode::ReadOnly)?;
let store = transaction.object_store("pinstore")?;
let key = JsValue::from_str(&cid.to_string());
let val = store.get(key)?.await?;
transaction.await?;
Ok::<_, Box<dyn std::error::Error>>(val.is_some())
}
.await
.map_err(|e| anyhow::anyhow!("{e}"));
_ = tx.send(res);
});
rx.await?
}
/// Records a direct pin for `target`.
///
/// * Already pinned directly: succeeds without writing anything.
/// * Already pinned recursively: fails with "already pinned recursively".
/// * Pinned indirectly: the indirect record is deleted and replaced by the
///   direct one.
///
/// Serialization failures are returned as errors, matching the other pin
/// operations, instead of panicking inside the spawned task.
async fn insert_direct_pin(&self, target: &Cid) -> Result<(), Error> {
    let target = target.to_owned();
    let db = self.get_db().to_owned();
    let (tx, rx) = oneshot::channel();
    wasm_bindgen_futures::spawn_local(async move {
        let res = async {
            let transaction = db.transaction(&["pinstore"], TransactionMode::ReadWrite)?;
            let store = transaction.object_store("pinstore")?;
            match get_pinned_mode(&transaction, &store, &target).await? {
                Some((PinMode::Direct, _)) => return Ok(()),
                Some((PinMode::Recursive, _)) => {
                    return Err(anyhow::anyhow!("already pinned recursively").into());
                }
                Some((PinMode::Indirect, key)) => {
                    let key = serde_wasm_bindgen::to_value(&key)?;
                    store.delete(key)?.await?;
                }
                None => {}
            }
            let direct_key = get_pin_key(&target, &PinMode::Direct);
            let key = serde_wasm_bindgen::to_value(&direct_key)?;
            let val = serde_wasm_bindgen::to_value(direct_value())?;
            store.put(&val, Some(&key))?.await?;
            transaction.commit()?.await?;
            Ok::<_, Box<dyn std::error::Error>>(())
        }
        .await
        .map_err(|e| anyhow::anyhow!("{e}"));
        _ = tx.send(res);
    });
    rx.await?
}
/// Records a recursive pin for `target` and indirect pins for the blocks it
/// references.
///
/// `referenced` is drained into a set first; an error from that stream is
/// returned before any write happens. If `target` is already pinned
/// recursively the call succeeds without changes. An existing direct or
/// indirect record for `target` is replaced by the recursive one. Referenced
/// blocks that already carry a pin of any mode are left untouched; the others
/// get an indirect record whose value is `target`'s CID string.
async fn insert_recursive_pin(
    &self,
    target: &Cid,
    referenced: References<'_>,
) -> Result<(), Error> {
    let set = referenced.try_collect::<BTreeSet<_>>().await?;
    let target = target.to_owned();
    let db = self.get_db().to_owned();
    let (tx, rx) = oneshot::channel();
    wasm_bindgen_futures::spawn_local(async move {
        let res = async {
            let transaction = db.transaction(&["pinstore"], TransactionMode::ReadWrite)?;
            let store = transaction.object_store("pinstore")?;
            match get_pinned_mode(&transaction, &store, &target).await? {
                Some((PinMode::Recursive, _)) => return Ok(()),
                Some((PinMode::Direct, key)) | Some((PinMode::Indirect, key)) => {
                    let key = serde_wasm_bindgen::to_value(&key)?;
                    store.delete(key)?.await?;
                }
                None => {}
            }
            let recursive_key = get_pin_key(&target, &PinMode::Recursive);
            let key = serde_wasm_bindgen::to_value(&recursive_key)?;
            let val = serde_wasm_bindgen::to_value(recursive_value())?;
            store.put(&val, Some(&key))?.await?;

            // Every indirect record stores the same value, so serialize it once.
            let target_value = serde_wasm_bindgen::to_value(&indirect_value(&target))?;
            for cid in &set {
                if get_pinned_mode(&transaction, &store, cid).await?.is_some() {
                    continue;
                }
                let indirect_key =
                    serde_wasm_bindgen::to_value(&get_pin_key(cid, &PinMode::Indirect))?;
                store.put(&target_value, Some(&indirect_key))?.await?;
            }
            transaction.commit()?.await?;
            Ok::<_, Box<dyn std::error::Error>>(())
        }
        .await
        .map_err(|e| anyhow::anyhow!("{e}"));
        _ = tx.send(res);
    });
    rx.await?
}
/// Deletes the direct pin record of `target`.
///
/// Fails with "not pinned or pinned indirectly" when `target` has no pin
/// record or only an indirect one. Otherwise the `pin.d.<cid>` key is deleted
/// and the transaction committed.
async fn remove_direct_pin(&self, target: &Cid) -> Result<(), Error> {
    let target = target.to_owned();
    let db = self.get_db().to_owned();
    let (tx, rx) = oneshot::channel();
    wasm_bindgen_futures::spawn_local(async move {
        let res = async {
            let transaction = db.transaction(&["pinstore"], TransactionMode::ReadWrite)?;
            let store = transaction.object_store("pinstore")?;
            if is_not_pinned_or_pinned_indirectly(&transaction, &store, &target).await? {
                return Err(anyhow::anyhow!("not pinned or pinned indirectly").into());
            }
            let key = get_pin_key(&target, &PinMode::Direct);
            let key = serde_wasm_bindgen::to_value(&key)?;
            store.delete(key)?.await?;
            transaction.commit()?.await?;
            Ok::<_, Box<dyn std::error::Error>>(())
        }
        .await
        .map_err(|e| anyhow::anyhow!("{e}"));
        _ = tx.send(res);
    });
    rx.await?
}
/// Deletes the recursive pin record of `target` and the indirect records of
/// the blocks it references.
///
/// `referenced` is drained into a set first; an error from that stream is
/// returned before any write happens. Fails with
/// "not pinned or pinned indirectly" when `target` has no pin record or only
/// an indirect one. Referenced blocks that hold their own direct or recursive
/// pin keep it; only indirect records are removed.
async fn remove_recursive_pin(
    &self,
    target: &Cid,
    referenced: References<'_>,
) -> Result<(), Error> {
    let set = referenced.try_collect::<BTreeSet<_>>().await?;
    let target = target.to_owned();
    let db = self.get_db().to_owned();
    let (tx, rx) = oneshot::channel();
    wasm_bindgen_futures::spawn_local(async move {
        let res = async {
            let transaction = db.transaction(&["pinstore"], TransactionMode::ReadWrite)?;
            let store = transaction.object_store("pinstore")?;
            if is_not_pinned_or_pinned_indirectly(&transaction, &store, &target).await? {
                return Err(anyhow::anyhow!("not pinned or pinned indirectly").into());
            }
            let key = get_pin_key(&target, &PinMode::Recursive);
            let key = serde_wasm_bindgen::to_value(&key)?;
            store.delete(key)?.await?;
            for cid in &set {
                if let Some((PinMode::Indirect, key)) =
                    get_pinned_mode(&transaction, &store, cid).await?
                {
                    let key = serde_wasm_bindgen::to_value(&key)?;
                    store.delete(key)?.await?;
                }
            }
            transaction.commit()?.await?;
            Ok::<_, Box<dyn std::error::Error>>(())
        }
        .await
        .map_err(|e| anyhow::anyhow!("{e}"));
        _ = tx.send(res);
    });
    rx.await?
}
/// Streams every pin record as `(cid, mode)`, optionally restricted to the
/// mode given in `requirement`.
///
/// Keys are expected in the form `pin.<d|r|i>.<cid>`. A key that does not
/// match that shape, or carries an unknown mode letter, yields one `Err` item
/// and ends the stream. Keys that are not strings are skipped. A failure to
/// open the transaction or read the keys is delivered as an `Err` item rather
/// than panicking inside the spawned task.
async fn list(
    &self,
    requirement: Option<PinMode>,
) -> BoxStream<'static, Result<(Cid, PinMode), Error>> {
    let db = self.get_db().to_owned();
    let (mut tx, rx) = futures::channel::mpsc::channel(1);
    let requirement = PinModeRequirement::from(requirement);
    wasm_bindgen_futures::spawn_local(async move {
        let outcome = async {
            let transaction = db.transaction(&["pinstore"], TransactionMode::ReadOnly)?;
            let store = transaction.object_store("pinstore")?;
            let keys = store.get_all_keys(None, None)?.await?;
            for k in keys.into_iter().filter_map(|val| val.as_string()) {
                let k = k.as_bytes();
                if !k.starts_with(b"pin.") || k.len() < 7 {
                    _ = tx.send(Err(anyhow::anyhow!("invalid pin: {:?}", k))).await;
                    return Ok(());
                }
                // Byte 4 is the mode letter; the CID text starts at byte 6.
                let mode = match k[4] {
                    b'd' => PinMode::Direct,
                    b'r' => PinMode::Recursive,
                    b'i' => PinMode::Indirect,
                    x => {
                        _ = tx
                            .send(Err(anyhow::anyhow!("invalid pinmode: {}", x as char)))
                            .await;
                        return Ok(());
                    }
                };
                if !requirement.matches(&mode) {
                    continue;
                }
                let entry = std::str::from_utf8(&k[6..])
                    .map_err(Error::from)
                    .and_then(|x| Cid::from_str(x).map_err(Error::from))
                    .map_err(|e| e.context("failed to read pin:".to_string()))
                    .map(move |cid| (cid, mode));
                // A send error means the receiver is gone; stop producing.
                if tx.send(entry).await.is_err() {
                    return Ok(());
                }
            }
            Ok::<_, Box<dyn std::error::Error>>(())
        }
        .await;
        if let Err(e) = outcome {
            _ = tx.send(Err(anyhow::anyhow!("{e}"))).await;
        }
    });
    rx.boxed()
}
/// Looks up the pin kind of each CID in `ids`, keeping only those whose mode
/// satisfies `requirement`. Results follow the order of `ids`; unpinned or
/// non-matching CIDs are omitted.
///
/// Recursive pins are reported as `PinKind::Recursive(0)`. For an indirect
/// pin the stored value is the CID string of the recursive pin it came from
/// (it is written from `indirect_value`, which returns a `String`), so it is
/// read back as a string. Decoding it as a byte vector fails for a string
/// value and would silently drop every indirect pin from the result. An
/// indirect record whose value is missing or not a string is omitted; a
/// string that does not parse as a CID is an error.
async fn query(
    &self,
    ids: Vec<Cid>,
    requirement: Option<PinMode>,
) -> Result<Vec<(Cid, PinKind<Cid>)>, Error> {
    let requirement = PinModeRequirement::from(requirement);
    let db = self.get_db().to_owned();
    let (tx, rx) = oneshot::channel();
    wasm_bindgen_futures::spawn_local(async move {
        let res = async {
            let transaction = db.transaction(&["pinstore"], TransactionMode::ReadOnly)?;
            let store = transaction.object_store("pinstore")?;
            let mut found = Vec::with_capacity(ids.len());
            for id in ids {
                let Some((pin_mode, key)) =
                    get_pinned_mode(&transaction, &store, &id).await?
                else {
                    continue;
                };
                if !requirement.matches(&pin_mode) {
                    continue;
                }
                let kind = match pin_mode {
                    PinMode::Direct => PinKind::Direct,
                    PinMode::Recursive => PinKind::Recursive(0),
                    PinMode::Indirect => {
                        let key = serde_wasm_bindgen::to_value(&key)?;
                        let Some(root) =
                            store.get(key)?.await?.and_then(|root| root.as_string())
                        else {
                            continue;
                        };
                        let source = cid_from_indirect_value(root.as_bytes()).map_err(|e| {
                            e.context(format!(
                                "failed to read indirect pin source: {:?}",
                                root.as_str(),
                            ))
                        })?;
                        PinKind::IndirectFrom(source)
                    }
                };
                found.push((id, kind));
            }
            Ok::<_, Box<dyn std::error::Error>>(found)
        }
        .await
        .map_err(|e| anyhow::anyhow!("{e}"));
        _ = tx.send(res);
    });
    rx.await?
}
}
/// Value stored alongside a direct pin key: an empty byte slice.
fn direct_value() -> &'static [u8] {
    &[]
}
/// Value stored alongside a recursive pin key: an empty byte slice.
fn recursive_value() -> &'static [u8] {
    &[]
}
/// Value stored alongside an indirect pin key: the textual form of the CID
/// passed in.
fn indirect_value(recursively_pinned: &Cid) -> String {
    format!("{recursively_pinned}")
}
fn cid_from_indirect_value(bytes: &[u8]) -> Result<Cid, Error> {
std::str::from_utf8(bytes)
.map_err(Error::from)
.and_then(|s| Cid::from_str(s).map_err(Error::from))
}
/// Maps a pin mode to the single-letter tag used inside pin keys:
/// `d` (direct), `r` (recursive), `i` (indirect).
fn pin_mode_literal(pin_mode: &PinMode) -> &'static str {
    match *pin_mode {
        PinMode::Recursive => "r",
        PinMode::Indirect => "i",
        PinMode::Direct => "d",
    }
}
/// Builds the pin store key for `cid` under `pin_mode`, in the form
/// `pin.<mode letter>.<cid>`.
fn get_pin_key(cid: &Cid, pin_mode: &PinMode) -> String {
    let mode = pin_mode_literal(pin_mode);
    format!("pin.{mode}.{cid}")
}
/// Finds the pin record of `block`, probing the direct, recursive and indirect
/// keys in that order.
///
/// Returns the first mode whose key has a stored value, together with that
/// key, or `None` when no key is present. The transaction argument is unused.
async fn get_pinned_mode(
    _: &Transaction,
    tree: &ObjectStore,
    block: &Cid,
) -> Result<Option<(PinMode, String)>, Box<dyn std::error::Error>> {
    for mode in [PinMode::Direct, PinMode::Recursive, PinMode::Indirect] {
        let key = get_pin_key(block, &mode);
        let lookup = serde_wasm_bindgen::to_value(&key)?;
        if tree.get(lookup)?.await?.is_some() {
            return Ok(Some((mode, key)));
        }
    }
    Ok(None)
}
/// Returns `true` when `block` has no pin record or only an indirect one, and
/// `false` when it is pinned directly or recursively.
async fn is_not_pinned_or_pinned_indirectly(
    tx: &Transaction,
    tree: &ObjectStore,
    block: &Cid,
) -> Result<bool, Box<dyn std::error::Error>> {
    let current = get_pinned_mode(tx, tree, block).await?;
    Ok(matches!(current, None | Some((PinMode::Indirect, _))))
}