use crate::engine::database::Database;
use crate::error::DbxResult;
use std::sync::Arc;
use tokio::task::spawn_blocking;
/// Async facade over a shared, synchronous [`Database`].
///
/// The handle is cheap to clone: cloning only bumps the reference count of
/// the wrapped `Arc`, so every clone talks to the same database instance.
#[derive(Clone)]
pub struct DatabaseAsync {
/// Shared handle to the underlying synchronous database.
inner: Arc<Database>,
}
impl DatabaseAsync {
pub fn new(db: Arc<Database>) -> Self {
Self { inner: db }
}
pub fn inner(&self) -> Arc<Database> {
self.inner.clone()
}
pub async fn insert(&self, table: String, key: Vec<u8>, value: Vec<u8>) -> DbxResult<()> {
let db = self.inner.clone();
spawn_blocking(move || db.insert(&table, &key, &value))
.await
.unwrap_or_else(|e| {
Err(crate::error::DbxError::InvalidOperation {
message: "Async insert thread panicked".to_string(),
context: e.to_string(),
})
})
}
pub async fn get(&self, table: String, key: Vec<u8>) -> DbxResult<Option<Vec<u8>>> {
let db = self.inner.clone();
spawn_blocking(move || db.get(&table, &key))
.await
.unwrap_or_else(|e| {
Err(crate::error::DbxError::InvalidOperation {
message: "Async get thread panicked".to_string(),
context: e.to_string(),
})
})
}
pub async fn delete(&self, table: String, key: Vec<u8>) -> DbxResult<()> {
let db = self.inner.clone();
spawn_blocking(move || db.delete(&table, &key).map(|_| ()))
.await
.unwrap_or_else(|e| {
Err(crate::error::DbxError::InvalidOperation {
message: "Async delete thread panicked".to_string(),
context: e.to_string(),
})
})
}
pub async fn insert_if_not_exists(
&self,
table: String,
key: Vec<u8>,
value: Vec<u8>,
) -> DbxResult<bool> {
let db = self.inner.clone();
spawn_blocking(move || db.insert_if_not_exists(&table, &key, &value))
.await
.unwrap_or_else(|e| {
Err(crate::error::DbxError::InvalidOperation {
message: "Async CAS thread panicked".to_string(),
context: e.to_string(),
})
})
}
pub async fn compare_and_swap(
&self,
table: String,
key: Vec<u8>,
expected: Vec<u8>,
new_value: Vec<u8>,
) -> DbxResult<bool> {
let db = self.inner.clone();
spawn_blocking(move || db.compare_and_swap(&table, &key, &expected, &new_value))
.await
.unwrap_or_else(|e| {
Err(crate::error::DbxError::InvalidOperation {
message: "Async CAS thread panicked".to_string(),
context: e.to_string(),
})
})
}
}
use crate::grid::dlm::DistributedLockManager;
use std::time::Duration;
/// Async database handle paired with a [`DistributedLockManager`].
///
/// Cloning is cheap: both fields are `Arc`s, so clones share the same
/// database and the same lock manager.
#[derive(Clone)]
pub struct GridDatabaseAsync {
/// Shared handle to the underlying synchronous database.
inner: Arc<Database>,
/// Lock manager used to acquire and release per-(table, key) locks.
dlm: Arc<DistributedLockManager>,
}
impl GridDatabaseAsync {
pub fn new(db: Arc<Database>, dlm: Arc<DistributedLockManager>) -> Self {
Self { inner: db, dlm }
}
pub async fn insert_with_lock(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<()> {
let fencing_token = self
.dlm
.acquire(table, key, 5000, Duration::from_secs(3))
.await
.map_err(|e| crate::error::DbxError::InvalidOperation {
message: "DLM acquire failed (insert)".to_string(),
context: format!("{:?}", e),
})?;
let db = self.inner.clone();
let t = table.to_string();
let k = key.to_vec();
let v = value.to_vec();
let result = spawn_blocking(move || db.insert(&t, &k, &v))
.await
.unwrap_or_else(|e| {
Err(crate::error::DbxError::InvalidOperation {
message: "Grid async insert thread panicked".to_string(),
context: e.to_string(),
})
});
self.dlm.release(table, key, fencing_token).await;
result
}
pub async fn compare_and_swap_with_lock(
&self,
table: &str,
key: &[u8],
expected: &[u8],
new_value: &[u8],
) -> DbxResult<bool> {
let fencing_token = self
.dlm
.acquire(table, key, 5000, Duration::from_secs(3))
.await
.map_err(|e| crate::error::DbxError::InvalidOperation {
message: "DLM acquire failed (CAS)".to_string(),
context: format!("{:?}", e),
})?;
let db = self.inner.clone();
let t = table.to_string();
let k = key.to_vec();
let ex = expected.to_vec();
let nv = new_value.to_vec();
let result = spawn_blocking(move || db.compare_and_swap(&t, &k, &ex, &nv))
.await
.unwrap_or_else(|e| {
Err(crate::error::DbxError::InvalidOperation {
message: "Grid async CAS thread panicked".to_string(),
context: e.to_string(),
})
});
self.dlm.release(table, key, fencing_token).await;
result
}
}