use std::path::{Path, PathBuf};
use std::sync::Arc;
use obj_core::Document;
use obj_core::{Id, Result};
use crate::asynchronous::collection::AsyncCollection;
use crate::asynchronous::query::AsyncQuery;
use crate::{Config, Db, DbStat, IntegrityReport, ReadTxn, WriteTxn};
#[derive(Clone, Debug)]
pub struct AsyncDb {
inner: Arc<Db>,
}
impl AsyncDb {
#[must_use]
pub fn from_blocking(db: Db) -> Self {
Self {
inner: Arc::new(db),
}
}
pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
Self::open_with(path, Config::default()).await
}
pub async fn open_with<P: AsRef<Path>>(path: P, config: Config) -> Result<Self> {
let path_buf: PathBuf = path.as_ref().to_path_buf();
unblock(move || Db::open_with(path_buf, config).map(Self::from_blocking)).await
}
pub async fn memory() -> Result<Self> {
Self::memory_with(Config::default()).await
}
pub async fn memory_with(config: Config) -> Result<Self> {
unblock(move || Db::memory_with(config).map(Self::from_blocking)).await
}
pub async fn open_readonly<P: AsRef<Path>>(path: P) -> Result<Self> {
let path_buf: PathBuf = path.as_ref().to_path_buf();
unblock(move || Db::open_readonly(path_buf).map(Self::from_blocking)).await
}
#[must_use]
pub fn as_blocking(&self) -> &Db {
&self.inner
}
pub async fn insert<T>(&self, doc: T) -> Result<Id>
where
T: Document + Send + 'static,
{
let inner = Arc::clone(&self.inner);
unblock(move || inner.insert(doc)).await
}
pub async fn get<T>(&self, id: Id) -> Result<Option<T>>
where
T: Document + Send + 'static,
{
let inner = Arc::clone(&self.inner);
unblock(move || inner.get::<T>(id)).await
}
pub async fn update<T, F>(&self, id: Id, f: F) -> Result<()>
where
T: Document + Send + 'static,
F: FnOnce(&mut T) + Send + 'static,
{
let inner = Arc::clone(&self.inner);
unblock(move || inner.update::<T, F>(id, f)).await
}
pub async fn delete<T>(&self, id: Id) -> Result<bool>
where
T: Document + Send + 'static,
{
let inner = Arc::clone(&self.inner);
unblock(move || inner.delete::<T>(id)).await
}
pub async fn upsert<T>(&self, id: Id, doc: T) -> Result<()>
where
T: Document + Send + 'static,
{
let inner = Arc::clone(&self.inner);
unblock(move || inner.upsert(id, doc)).await
}
pub async fn find_unique<T, K>(&self, index_name: &str, key: K) -> Result<Option<T>>
where
T: Document + Send + 'static,
K: Into<obj_core::codec::Dynamic> + Send + 'static,
{
let inner = Arc::clone(&self.inner);
let name = index_name.to_owned();
unblock(move || inner.find_unique::<T>(&name, key)).await
}
pub async fn all<T>(&self) -> Result<Vec<T>>
where
T: Document + Send + 'static,
{
let inner = Arc::clone(&self.inner);
unblock(move || inner.all::<T>()).await
}
pub async fn transaction<R, F>(&self, body: F) -> Result<R>
where
R: Send + 'static,
F: FnOnce(&mut WriteTxn<'_>) -> Result<R> + Send + 'static,
{
let inner = Arc::clone(&self.inner);
unblock(move || inner.transaction(body)).await
}
pub async fn read_transaction<R, F>(&self, body: F) -> Result<R>
where
R: Send + 'static,
F: FnOnce(&ReadTxn<'_>) -> Result<R> + Send + 'static,
{
let inner = Arc::clone(&self.inner);
unblock(move || inner.read_transaction(body)).await
}
pub async fn backup_to<P: AsRef<Path>>(&self, dest: P) -> Result<()> {
let inner = Arc::clone(&self.inner);
let dest_buf: PathBuf = dest.as_ref().to_path_buf();
unblock(move || inner.backup_to(dest_buf)).await
}
pub async fn attach<P>(&mut self, path: P, namespace: impl Into<String>) -> Result<()>
where
P: AsRef<Path>,
{
let path_buf: PathBuf = path.as_ref().to_path_buf();
let namespace = namespace.into();
self.with_mut_db(move |db| db.attach(&path_buf, namespace))
.await
}
pub async fn detach(&mut self, namespace: &str) -> Result<()> {
let namespace = namespace.to_owned();
self.with_mut_db(move |db| db.detach(&namespace)).await
}
async fn with_mut_db<F, R>(&mut self, f: F) -> Result<R>
where
F: FnOnce(&mut Db) -> Result<R> + Send + 'static,
R: Send + 'static,
{
let sentinel = match Db::memory() {
Ok(db) => Arc::new(db),
Err(e) => return Err(e),
};
let original = std::mem::replace(&mut self.inner, sentinel);
let mut db = match Arc::try_unwrap(original) {
Ok(db) => db,
Err(arc) => {
self.inner = arc;
return Err(obj_core::Error::Busy {
kind: obj_core::LockKind::WriterInProcess,
});
}
};
let (db_back, result) = unblock(move || {
let result = f(&mut db);
(db, result)
})
.await;
self.inner = Arc::new(db_back);
result
}
pub async fn integrity_check(&self) -> Result<IntegrityReport> {
let inner = Arc::clone(&self.inner);
unblock(move || inner.integrity_check()).await
}
pub async fn stat(&self) -> Result<DbStat> {
let inner = Arc::clone(&self.inner);
unblock(move || inner.stat()).await
}
#[must_use]
pub fn collection<T>(&self, name: impl Into<String>) -> AsyncCollection<T>
where
T: Document + Send + 'static,
{
AsyncCollection::lazy(Arc::clone(&self.inner), name.into())
}
#[must_use]
pub fn query<T>(&self) -> AsyncQuery<T>
where
T: Document + Send + 'static,
{
AsyncQuery::new(Arc::clone(&self.inner))
}
}
pub(crate) async fn unblock<F, R>(f: F) -> R
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
#[cfg(feature = "tracing")]
let span = tracing::Span::current();
blocking::unblock(move || {
#[cfg(feature = "tracing")]
let _guard = span.enter();
f()
})
.await
}