use std::ops::RangeBounds;
use std::path::{Path, PathBuf};
use std::sync::Arc;
#[cfg(feature = "ttl")]
use std::time::Duration;
use tokio::task::spawn_blocking;
#[cfg(feature = "ttl")]
use crate::Ttl;
use crate::{Emdb, EmdbStats, Error, Result};
fn join_err(err: tokio::task::JoinError) -> Error {
Error::Io(std::io::Error::other(format!("async join: {err}")))
}
async fn blocking<F, R>(f: F) -> Result<R>
where
F: FnOnce() -> Result<R> + Send + 'static,
R: Send + 'static,
{
spawn_blocking(f).await.map_err(join_err)?
}
#[cfg(feature = "ttl")]
async fn blocking_infallible<F, R>(f: F) -> Result<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
spawn_blocking(f).await.map_err(join_err)
}
const STREAM_CHANNEL_CAPACITY: usize = 64;
fn spawn_iter_stream<I, T>(iter: I) -> tokio_stream::wrappers::ReceiverStream<T>
where
I: Iterator<Item = T> + Send + 'static,
T: Send + 'static,
{
let (tx, rx) = tokio::sync::mpsc::channel::<T>(STREAM_CHANNEL_CAPACITY);
let _pump: tokio::task::JoinHandle<()> = spawn_blocking(move || {
for item in iter {
if tx.blocking_send(item).is_err() {
break;
}
}
});
tokio_stream::wrappers::ReceiverStream::new(rx)
}
#[derive(Clone, Debug)]
pub struct AsyncEmdb {
inner: Arc<Emdb>,
}
impl AsyncEmdb {
pub async fn open(path: impl AsRef<Path>) -> Result<Self> {
let path: PathBuf = path.as_ref().to_path_buf();
let emdb = blocking(move || Emdb::open(&path)).await?;
Ok(Self {
inner: Arc::new(emdb),
})
}
#[must_use]
pub fn open_in_memory() -> Self {
Self {
inner: Arc::new(Emdb::open_in_memory()),
}
}
#[must_use]
pub fn builder() -> crate::EmdbBuilder {
crate::EmdbBuilder::new()
}
#[must_use]
pub fn from_sync(emdb: Emdb) -> Self {
Self {
inner: Arc::new(emdb),
}
}
#[must_use]
pub fn sync_handle(&self) -> Arc<Emdb> {
Arc::clone(&self.inner)
}
#[must_use]
pub fn path(&self) -> &Path {
self.inner.path()
}
pub async fn insert<K, V>(&self, key: K, value: V) -> Result<()>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
let inner = Arc::clone(&self.inner);
let key = key.as_ref().to_vec();
let value = value.as_ref().to_vec();
blocking(move || inner.insert(key, value)).await
}
pub async fn insert_many<I, K, V>(&self, items: I) -> Result<()>
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
let inner = Arc::clone(&self.inner);
let owned: Vec<(Vec<u8>, Vec<u8>)> = items
.into_iter()
.map(|(k, v)| (k.as_ref().to_vec(), v.as_ref().to_vec()))
.collect();
blocking(move || inner.insert_many(owned)).await
}
pub async fn get<K>(&self, key: K) -> Result<Option<Vec<u8>>>
where
K: AsRef<[u8]>,
{
let inner = Arc::clone(&self.inner);
let key = key.as_ref().to_vec();
blocking(move || inner.get(key)).await
}
pub async fn remove<K>(&self, key: K) -> Result<Option<Vec<u8>>>
where
K: AsRef<[u8]>,
{
let inner = Arc::clone(&self.inner);
let key = key.as_ref().to_vec();
blocking(move || inner.remove(key)).await
}
pub async fn contains_key<K>(&self, key: K) -> Result<bool>
where
K: AsRef<[u8]>,
{
let inner = Arc::clone(&self.inner);
let key = key.as_ref().to_vec();
blocking(move || inner.contains_key(key)).await
}
pub async fn len(&self) -> Result<usize> {
let inner = Arc::clone(&self.inner);
blocking(move || inner.len()).await
}
pub async fn is_empty(&self) -> Result<bool> {
let inner = Arc::clone(&self.inner);
blocking(move || inner.is_empty()).await
}
pub async fn clear(&self) -> Result<()> {
let inner = Arc::clone(&self.inner);
blocking(move || inner.clear()).await
}
pub async fn flush(&self) -> Result<()> {
let inner = Arc::clone(&self.inner);
blocking(move || inner.flush()).await
}
pub async fn checkpoint(&self) -> Result<()> {
let inner = Arc::clone(&self.inner);
blocking(move || inner.checkpoint()).await
}
pub async fn stats(&self) -> Result<EmdbStats> {
let inner = Arc::clone(&self.inner);
blocking(move || inner.stats()).await
}
pub async fn backup_to(&self, target: impl AsRef<Path>) -> Result<()> {
let inner = Arc::clone(&self.inner);
let target: PathBuf = target.as_ref().to_path_buf();
blocking(move || inner.backup_to(&target)).await
}
pub async fn compact(&self) -> Result<()> {
let inner = Arc::clone(&self.inner);
blocking(move || inner.compact()).await
}
pub async fn iter(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
let inner = Arc::clone(&self.inner);
blocking(move || Ok(inner.iter()?.collect())).await
}
pub async fn keys(&self) -> Result<Vec<Vec<u8>>> {
let inner = Arc::clone(&self.inner);
blocking(move || Ok(inner.keys()?.collect())).await
}
pub async fn range<R>(&self, range: R) -> Result<Vec<(Vec<u8>, Vec<u8>)>>
where
R: RangeBounds<Vec<u8>> + Send + 'static,
{
let inner = Arc::clone(&self.inner);
blocking(move || inner.range(range)).await
}
pub async fn range_prefix<K>(&self, prefix: K) -> Result<Vec<(Vec<u8>, Vec<u8>)>>
where
K: AsRef<[u8]>,
{
let inner = Arc::clone(&self.inner);
let prefix = prefix.as_ref().to_vec();
blocking(move || inner.range_prefix(prefix)).await
}
pub async fn iter_from<K>(&self, start: K) -> Result<Vec<(Vec<u8>, Vec<u8>)>>
where
K: AsRef<[u8]>,
{
let inner = Arc::clone(&self.inner);
let start = start.as_ref().to_vec();
blocking(move || Ok(inner.iter_from(start)?.collect())).await
}
pub async fn iter_after<K>(&self, start: K) -> Result<Vec<(Vec<u8>, Vec<u8>)>>
where
K: AsRef<[u8]>,
{
let inner = Arc::clone(&self.inner);
let start = start.as_ref().to_vec();
blocking(move || Ok(inner.iter_after(start)?.collect())).await
}
pub async fn iter_stream(
&self,
) -> Result<tokio_stream::wrappers::ReceiverStream<(Vec<u8>, Vec<u8>)>> {
let inner = Arc::clone(&self.inner);
let iter = blocking(move || inner.iter()).await?;
Ok(spawn_iter_stream(iter))
}
pub async fn keys_stream(&self) -> Result<tokio_stream::wrappers::ReceiverStream<Vec<u8>>> {
let inner = Arc::clone(&self.inner);
let iter = blocking(move || inner.keys()).await?;
Ok(spawn_iter_stream(iter))
}
pub async fn range_stream<R>(
&self,
range: R,
) -> Result<tokio_stream::wrappers::ReceiverStream<(Vec<u8>, Vec<u8>)>>
where
R: RangeBounds<Vec<u8>> + Send + 'static,
{
let inner = Arc::clone(&self.inner);
let iter = blocking(move || inner.range_iter(range)).await?;
Ok(spawn_iter_stream(iter))
}
pub async fn range_prefix_stream<K>(
&self,
prefix: K,
) -> Result<tokio_stream::wrappers::ReceiverStream<(Vec<u8>, Vec<u8>)>>
where
K: AsRef<[u8]>,
{
let inner = Arc::clone(&self.inner);
let prefix = prefix.as_ref().to_vec();
let iter = blocking(move || inner.range_prefix_iter(prefix)).await?;
Ok(spawn_iter_stream(iter))
}
pub async fn iter_from_stream<K>(
&self,
start: K,
) -> Result<tokio_stream::wrappers::ReceiverStream<(Vec<u8>, Vec<u8>)>>
where
K: AsRef<[u8]>,
{
let inner = Arc::clone(&self.inner);
let start = start.as_ref().to_vec();
let iter = blocking(move || inner.iter_from(start)).await?;
Ok(spawn_iter_stream(iter))
}
pub async fn iter_after_stream<K>(
&self,
start: K,
) -> Result<tokio_stream::wrappers::ReceiverStream<(Vec<u8>, Vec<u8>)>>
where
K: AsRef<[u8]>,
{
let inner = Arc::clone(&self.inner);
let start = start.as_ref().to_vec();
let iter = blocking(move || inner.iter_after(start)).await?;
Ok(spawn_iter_stream(iter))
}
#[cfg(feature = "ttl")]
pub async fn insert_with_ttl<K, V>(&self, key: K, value: V, ttl: Ttl) -> Result<()>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
let inner = Arc::clone(&self.inner);
let key = key.as_ref().to_vec();
let value = value.as_ref().to_vec();
blocking(move || inner.insert_with_ttl(key, value, ttl)).await
}
#[cfg(feature = "ttl")]
pub async fn expires_at<K>(&self, key: K) -> Result<Option<u64>>
where
K: AsRef<[u8]>,
{
let inner = Arc::clone(&self.inner);
let key = key.as_ref().to_vec();
blocking(move || inner.expires_at(key)).await
}
#[cfg(feature = "ttl")]
pub async fn ttl<K>(&self, key: K) -> Result<Option<Duration>>
where
K: AsRef<[u8]>,
{
let inner = Arc::clone(&self.inner);
let key = key.as_ref().to_vec();
blocking(move || inner.ttl(key)).await
}
#[cfg(feature = "ttl")]
pub async fn persist<K>(&self, key: K) -> Result<bool>
where
K: AsRef<[u8]>,
{
let inner = Arc::clone(&self.inner);
let key = key.as_ref().to_vec();
blocking(move || inner.persist(key)).await
}
#[cfg(feature = "ttl")]
pub async fn sweep_expired(&self) -> Result<usize> {
let inner = Arc::clone(&self.inner);
blocking_infallible(move || inner.sweep_expired()).await
}
pub async fn namespace<N: AsRef<str>>(&self, name: N) -> Result<AsyncNamespace> {
let inner = Arc::clone(&self.inner);
let name = name.as_ref().to_owned();
let ns = blocking(move || inner.namespace(name)).await?;
Ok(AsyncNamespace::from_sync(ns))
}
pub async fn drop_namespace<N: AsRef<str>>(&self, name: N) -> Result<bool> {
let inner = Arc::clone(&self.inner);
let name = name.as_ref().to_owned();
blocking(move || inner.drop_namespace(name)).await
}
pub async fn list_namespaces(&self) -> Result<Vec<String>> {
let inner = Arc::clone(&self.inner);
blocking(move || inner.list_namespaces()).await
}
pub async fn transaction<F, T>(&self, f: F) -> Result<T>
where
F: FnOnce(&mut crate::Transaction<'_>) -> Result<T> + Send + 'static,
T: Send + 'static,
{
let inner = Arc::clone(&self.inner);
blocking(move || inner.transaction(f)).await
}
}
#[derive(Clone)]
pub struct AsyncNamespace {
inner: crate::Namespace,
}
impl std::fmt::Debug for AsyncNamespace {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AsyncNamespace")
.field("name", &self.inner.name())
.finish()
}
}
impl AsyncNamespace {
pub(crate) fn from_sync(ns: crate::Namespace) -> Self {
Self { inner: ns }
}
#[must_use]
pub fn name(&self) -> &str {
self.inner.name()
}
#[must_use]
pub fn sync_handle(&self) -> crate::Namespace {
self.inner.clone()
}
pub async fn insert<K, V>(&self, key: K, value: V) -> Result<()>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
let ns = self.inner.clone();
let key = key.as_ref().to_vec();
let value = value.as_ref().to_vec();
blocking(move || ns.insert(key, value)).await
}
pub async fn insert_many<I, K, V>(&self, items: I) -> Result<()>
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
let ns = self.inner.clone();
let owned: Vec<(Vec<u8>, Vec<u8>)> = items
.into_iter()
.map(|(k, v)| (k.as_ref().to_vec(), v.as_ref().to_vec()))
.collect();
blocking(move || ns.insert_many(owned)).await
}
pub async fn get<K>(&self, key: K) -> Result<Option<Vec<u8>>>
where
K: AsRef<[u8]>,
{
let ns = self.inner.clone();
let key = key.as_ref().to_vec();
blocking(move || ns.get(key)).await
}
pub async fn remove<K>(&self, key: K) -> Result<Option<Vec<u8>>>
where
K: AsRef<[u8]>,
{
let ns = self.inner.clone();
let key = key.as_ref().to_vec();
blocking(move || ns.remove(key)).await
}
pub async fn contains_key<K>(&self, key: K) -> Result<bool>
where
K: AsRef<[u8]>,
{
let ns = self.inner.clone();
let key = key.as_ref().to_vec();
blocking(move || ns.contains_key(key)).await
}
pub async fn len(&self) -> Result<usize> {
let ns = self.inner.clone();
blocking(move || ns.len()).await
}
pub async fn is_empty(&self) -> Result<bool> {
let ns = self.inner.clone();
blocking(move || ns.is_empty()).await
}
pub async fn clear(&self) -> Result<()> {
let ns = self.inner.clone();
blocking(move || ns.clear()).await
}
pub async fn iter(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
let ns = self.inner.clone();
blocking(move || Ok(ns.iter()?.collect())).await
}
pub async fn keys(&self) -> Result<Vec<Vec<u8>>> {
let ns = self.inner.clone();
blocking(move || Ok(ns.keys()?.collect())).await
}
pub async fn range<R>(&self, range: R) -> Result<Vec<(Vec<u8>, Vec<u8>)>>
where
R: RangeBounds<Vec<u8>> + Send + 'static,
{
let ns = self.inner.clone();
blocking(move || ns.range(range)).await
}
pub async fn range_prefix<K>(&self, prefix: K) -> Result<Vec<(Vec<u8>, Vec<u8>)>>
where
K: AsRef<[u8]>,
{
let ns = self.inner.clone();
let prefix = prefix.as_ref().to_vec();
blocking(move || ns.range_prefix(prefix)).await
}
pub async fn iter_from<K>(&self, start: K) -> Result<Vec<(Vec<u8>, Vec<u8>)>>
where
K: AsRef<[u8]>,
{
let ns = self.inner.clone();
let start = start.as_ref().to_vec();
blocking(move || Ok(ns.iter_from(start)?.collect())).await
}
pub async fn iter_after<K>(&self, start: K) -> Result<Vec<(Vec<u8>, Vec<u8>)>>
where
K: AsRef<[u8]>,
{
let ns = self.inner.clone();
let start = start.as_ref().to_vec();
blocking(move || Ok(ns.iter_after(start)?.collect())).await
}
pub async fn iter_stream(
&self,
) -> Result<tokio_stream::wrappers::ReceiverStream<(Vec<u8>, Vec<u8>)>> {
let ns = self.inner.clone();
let iter = blocking(move || ns.iter()).await?;
Ok(spawn_iter_stream(iter))
}
pub async fn keys_stream(&self) -> Result<tokio_stream::wrappers::ReceiverStream<Vec<u8>>> {
let ns = self.inner.clone();
let iter = blocking(move || ns.keys()).await?;
Ok(spawn_iter_stream(iter))
}
pub async fn range_stream<R>(
&self,
range: R,
) -> Result<tokio_stream::wrappers::ReceiverStream<(Vec<u8>, Vec<u8>)>>
where
R: RangeBounds<Vec<u8>> + Send + 'static,
{
let ns = self.inner.clone();
let iter = blocking(move || ns.range_iter(range)).await?;
Ok(spawn_iter_stream(iter))
}
pub async fn range_prefix_stream<K>(
&self,
prefix: K,
) -> Result<tokio_stream::wrappers::ReceiverStream<(Vec<u8>, Vec<u8>)>>
where
K: AsRef<[u8]>,
{
let ns = self.inner.clone();
let prefix = prefix.as_ref().to_vec();
let iter = blocking(move || ns.range_prefix_iter(prefix)).await?;
Ok(spawn_iter_stream(iter))
}
pub async fn iter_from_stream<K>(
&self,
start: K,
) -> Result<tokio_stream::wrappers::ReceiverStream<(Vec<u8>, Vec<u8>)>>
where
K: AsRef<[u8]>,
{
let ns = self.inner.clone();
let start = start.as_ref().to_vec();
let iter = blocking(move || ns.iter_from(start)).await?;
Ok(spawn_iter_stream(iter))
}
pub async fn iter_after_stream<K>(
&self,
start: K,
) -> Result<tokio_stream::wrappers::ReceiverStream<(Vec<u8>, Vec<u8>)>>
where
K: AsRef<[u8]>,
{
let ns = self.inner.clone();
let start = start.as_ref().to_vec();
let iter = blocking(move || ns.iter_after(start)).await?;
Ok(spawn_iter_stream(iter))
}
}