use crate::*;
cfg_if! {
if #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] {
use keyvaluedb_web::*;
use keyvaluedb::*;
} else {
use keyvaluedb_sqlite::*;
use keyvaluedb::*;
}
}
// Declares "tstore" as this module's log facility.
// NOTE(review): presumably this is what the `veilid_log!` / `veilid_log_dbg!`
// invocations in this file pick up -- confirm against the macro definition.
impl_veilid_log_facility!("tstore");
/// Key material for one direction (encryption or decryption) of a table.
#[must_use]
#[derive(Debug)]
struct CryptInfo {
    /// Shared secret handed to the crypto system when (de)crypting records.
    secret: SharedSecret,
}
impl CryptInfo {
pub fn new(secret: SharedSecret) -> Self {
Self { secret }
}
}
/// State shared (behind an `Arc`) by every `TableDB` handle to the same table.
#[must_use]
pub struct TableDBUnlockedInner {
    /// Component registry returned by `TableDB::registry`.
    registry: VeilidComponentRegistry,
    /// Table name, as returned by `TableDB::table_name`.
    table: String,
    /// Underlying key/value database handle.
    database: Database,
    /// Async lock acquired by `TableDBTransaction::commit` before writing.
    commit_lock: AsyncMutex<()>,
    /// When `Some`, records are encrypted with this secret before being written.
    encrypt_info: Option<CryptInfo>,
    /// When `Some`, records are decrypted with this secret after being read.
    decrypt_info: Option<CryptInfo>,
}
impl fmt::Debug for TableDBUnlockedInner {
    /// Formats only the table name; the remaining fields are omitted from the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { table, .. } = self;
        write!(f, "TableDBUnlockedInner(table={})", table)
    }
}
/// Handle to a single table in the table store.
///
/// Cloning a `TableDB` copies `opened_column_count` and shares the same
/// `TableDBUnlockedInner` through the `Arc`.
#[derive(Debug, Clone)]
#[must_use]
pub struct TableDB {
    /// Number of columns this handle may access; operations on a column index
    /// greater than or equal to this value are rejected with an error.
    opened_column_count: u32,
    /// State shared between all handles to this table.
    unlocked_inner: Arc<TableDBUnlockedInner>,
}
impl VeilidComponentRegistryAccessor for TableDB {
    /// Returns a clone of the component registry stored in the shared inner state.
    fn registry(&self) -> VeilidComponentRegistry {
        let inner = &self.unlocked_inner;
        inner.registry.clone()
    }
}
impl TableDB {
pub(super) fn new(
table: String,
registry: VeilidComponentRegistry,
database: Database,
encryption_key: Option<SharedSecret>,
decryption_key: Option<SharedSecret>,
opened_column_count: u32,
) -> Self {
let encrypt_info = encryption_key.map(CryptInfo::new);
let decrypt_info = decryption_key.map(CryptInfo::new);
let total_columns = database.num_columns().unwrap_or_log();
Self {
opened_column_count: if opened_column_count == 0 {
total_columns
} else {
opened_column_count
},
unlocked_inner: Arc::new(TableDBUnlockedInner {
registry,
table,
database,
commit_lock: AsyncMutex::new(()),
encrypt_info,
decrypt_info,
}),
}
}
/// Builds a `TableDB` handle that shares an existing inner state.
///
/// An `opened_column_count` of `0` is replaced by the column count reported
/// by the database. The database is queried in either case.
pub(super) fn new_from_unlocked_inner(
    unlocked_inner: Arc<TableDBUnlockedInner>,
    opened_column_count: u32,
) -> Self {
    let total_columns = unlocked_inner.database.num_columns().unwrap_or_log();
    let opened_column_count = match opened_column_count {
        0 => total_columns,
        requested => requested,
    };
    Self {
        opened_column_count,
        unlocked_inner,
    }
}
/// Returns another `Arc` handle to the shared inner state.
pub(super) fn unlocked_inner(&self) -> Arc<TableDBUnlockedInner> {
    Arc::clone(&self.unlocked_inner)
}
/// Returns an owned copy of this table's name.
#[must_use]
pub fn table_name(&self) -> String {
    self.unlocked_inner.table.as_str().to_owned()
}
/// Returns the underlying database's I/O statistics of the requested `kind`.
#[cfg_attr(
    feature = "instrument",
    instrument(level = "trace", target = "tstore", skip_all)
)]
#[must_use]
pub fn io_stats(&self, kind: IoStatsKind) -> IoStats {
    let database = &self.unlocked_inner.database;
    database.io_stats(kind)
}
/// Runs the underlying database's `cleanup` operation.
///
/// The call is wrapped in `measure_debug` with a one-second duration and a
/// debug log message naming this table.
/// NOTE(review): presumably the message is emitted when the call exceeds that
/// duration -- confirm against `measure_debug`.
///
/// # Errors
/// Any database failure is converted with `VeilidAPIError::internal`.
pub async fn cleanup(&self) -> VeilidAPIResult<()> {
    let outcome = self
        .unlocked_inner
        .database
        .cleanup()
        .measure_debug(
            TimestampDuration::new_secs(1),
            veilid_log_dbg!(self, "TableDB::cleanup {}", self.table_name()),
        )
        .await;
    outcome.map_err(VeilidAPIError::internal)
}
/// Returns the column count reported by the underlying database.
///
/// This is the database's own count, not this handle's opened column count.
///
/// # Errors
/// Returns the database error converted into a `VeilidAPIError`.
#[cfg_attr(
    feature = "instrument",
    instrument(level = "trace", target = "tstore", skip_all)
)]
pub fn get_column_count(&self) -> VeilidAPIResult<u32> {
    self.unlocked_inner
        .database
        .num_columns()
        .map_err(VeilidAPIError::from)
}
/// Estimates the number of bytes needed to store one `key`/`value` record.
///
/// The estimate is a fixed overhead of 14 bytes plus twice the key length
/// plus the value length. The column argument is not used.
///
/// # Errors
/// Returns an internal error if the estimate does not fit in a `u64`.
pub fn estimate_storage_size(
    &self,
    _col: u32,
    key: &[u8],
    value: &[u8],
) -> VeilidAPIResult<u64> {
    // Sum of the constant terms of the estimate: 1 + 1 + 4 + 4 + 4.
    let fixed_overhead: usize = 1 + 1 + 4 + 4 + 4;
    let estimate = fixed_overhead + key.len() * 2 + value.len();
    u64::try_from(estimate).map_err(VeilidAPIError::internal)
}
/// Estimates the storage size of `value` once serialized to JSON.
///
/// Serializes `value` with `serde_json` and delegates to
/// `estimate_storage_size` with the resulting bytes.
///
/// # Errors
/// Returns an internal error if serialization fails, or whatever
/// `estimate_storage_size` returns.
pub fn estimate_storage_size_json<T>(
    &self,
    col: u32,
    key: &[u8],
    value: &T,
) -> VeilidAPIResult<u64>
where
    T: serde::Serialize,
{
    let encoded = serde_json::to_vec(value).map_err(VeilidAPIError::internal)?;
    self.estimate_storage_size(col, key, encoded.as_slice())
}
/// Compresses `data` (size-prepended) and, if an encryption key is configured,
/// encrypts the compressed bytes.
///
/// Without an encryption key the compressed bytes are returned as-is.
/// With one, the output buffer starts with a nonce of `nonce_length()` bytes:
/// * `keyed_nonce == true`: the nonce is the leading bytes of a hash over the
///   compressed data followed by the secret, so identical input produces an
///   identical nonce (callers in this file use this for record keys).
/// * `keyed_nonce == false`: the nonce is filled by `random_bytes`.
///
/// NOTE(review): `unwrap_or_log` is applied to the crypto-system lookup and to
/// the cipher call; presumably it logs and panics on failure -- confirm.
#[cfg_attr(
    feature = "instrument",
    instrument(level = "trace", target = "tstore", skip_all)
)]
pub(in crate::table_store) async fn maybe_encrypt(
    &self,
    data: &[u8],
    keyed_nonce: bool,
) -> Vec<u8> {
    let compressed = compress_prepend_size(data);

    // No key configured: the compressed form is what gets stored.
    let Some(ei) = &self.unlocked_inner.encrypt_info else {
        return compressed;
    };

    let crypto = self.crypto();
    let vcrypto = crypto.get_async(ei.secret.kind()).unwrap_or_log();
    let nonce_len = vcrypto.nonce_length();

    // Output layout: nonce bytes first, then room for the compressed payload.
    let mut buffer = BytesMut::zeroed(nonce_len + compressed.len());
    if keyed_nonce {
        let mut hash_input =
            BytesMut::with_capacity(compressed.len() + ei.secret.ref_value().len());
        hash_input.extend_from_slice(&compressed);
        hash_input.extend_from_slice(ei.secret.ref_value());
        let digest = vcrypto.generate_hash(hash_input.freeze()).await.value();
        buffer.as_mut()[0..nonce_len].copy_from_slice(&digest.as_ref()[0..nonce_len]);
    } else {
        random_bytes(&mut buffer[0..nonce_len]);
    }

    let nonce = Nonce::new(&buffer[0..nonce_len]);
    let encrypted = vcrypto
        .crypt_b2b_no_auth(
            Bytes::from(compressed),
            buffer,
            nonce_len,
            &nonce,
            &ei.secret,
        )
        .await
        .unwrap_or_log();
    encrypted.to_vec()
}
/// Reverses `maybe_encrypt`: decrypts `data` when a decryption key is
/// configured, then decompresses the size-prepended payload.
///
/// With a decryption key, the first `nonce_length()` bytes of `data` are used
/// as the nonce and the remainder as the ciphertext. Input that consists of a
/// nonce only yields an empty vector. Without a key, `data` is decompressed
/// directly.
///
/// # Errors
/// Returns a `std::io::Error` when `data` is shorter than the nonce (for
/// example a truncated or corrupt stored record) or when decompression fails.
///
/// NOTE(review): `unwrap_or_log` is applied to the crypto-system lookup and to
/// the cipher call; presumably it logs and panics on failure -- confirm.
#[cfg_attr(
    feature = "instrument",
    instrument(level = "trace", target = "tstore", skip_all)
)]
pub(in crate::table_store) async fn maybe_decrypt(
    &self,
    data: &[u8],
) -> std::io::Result<Vec<u8>> {
    if let Some(di) = &self.unlocked_inner.decrypt_info {
        let crypto = self.crypto();
        let vcrypto = crypto.get_async(di.secret.kind()).unwrap_or_log();
        let nonce_length = vcrypto.nonce_length();
        // Stored data can be truncated or corrupt; report that through the
        // Result instead of panicking.
        if data.len() < nonce_length {
            return Err(std::io::Error::other(format!(
                "encrypted data too short for nonce: {} < {}",
                data.len(),
                nonce_length
            )));
        }
        if data.len() == nonce_length {
            return Ok(Vec::new());
        }
        let out = BytesMut::zeroed(data.len() - nonce_length);
        let mut data = Bytes::copy_from_slice(data);
        // After the split, `data_start` holds the nonce and `data` the ciphertext.
        let data_start = data.split_to(nonce_length);
        let out = vcrypto
            .crypt_b2b_no_auth(data, out, 0, &Nonce::new(data_start.as_ref()), &di.secret)
            .await
            .unwrap_or_log();
        decompress_size_prepended(out.as_ref(), None)
            .map_err(|e| std::io::Error::other(e.to_string()))
    } else {
        decompress_size_prepended(data, None).map_err(|e| std::io::Error::other(e.to_string()))
    }
}
/// Returns every key stored in column `col`, decrypted/decompressed via
/// `maybe_decrypt`.
///
/// # Errors
/// Fails if `col` is not below the opened column count, if the database
/// iteration fails, or if any key cannot be decoded.
#[cfg_attr(
    feature = "instrument",
    instrument(level = "trace", target = "tstore", skip_all)
)]
pub async fn get_keys(&self, col: u32) -> VeilidAPIResult<Vec<Vec<u8>>> {
    if self.opened_column_count <= col {
        apibail_generic!(
            "Column exceeds opened column count {} >= {}",
            col,
            self.opened_column_count
        );
    }
    let db = self.unlocked_inner.database.clone();

    // First collect the keys exactly as stored.
    let collected: Vec<Vec<u8>> = Vec::new();
    let (stored_keys, _) = db
        .iter_keys(col, None, collected, |acc, raw_key| {
            acc.push(raw_key.clone());
            Ok(Option::<()>::None)
        })
        .await
        .map_err(VeilidAPIError::from)?;

    // Then decode each one, stopping at the first failure.
    let mut keys = Vec::with_capacity(stored_keys.len());
    for stored in &stored_keys {
        let decoded = self
            .maybe_decrypt(stored)
            .await
            .map_err(VeilidAPIError::from)?;
        keys.push(decoded);
    }
    Ok(keys)
}
/// Returns the number of keys the database reports for column `col`.
///
/// # Errors
/// Fails if `col` is not below the opened column count or if the database
/// query fails.
#[cfg_attr(
    feature = "instrument",
    instrument(level = "trace", target = "tstore", skip_all)
)]
pub async fn get_key_count(&self, col: u32) -> VeilidAPIResult<u64> {
    if self.opened_column_count <= col {
        apibail_generic!(
            "Column exceeds opened column count {} >= {}",
            col,
            self.opened_column_count
        );
    }
    let db = self.unlocked_inner.database.clone();
    db.num_keys(col).await.map_err(VeilidAPIError::from)
}
/// Starts a new transaction on this table.
///
/// The returned `TableDBTransaction` holds a clone of this handle together
/// with a fresh database transaction.
#[cfg_attr(
    feature = "instrument",
    instrument(level = "trace", target = "tstore", skip_all)
)]
#[must_use]
pub fn transact(&self) -> TableDBTransaction {
    let pending = self.unlocked_inner.database.transaction();
    let handle = self.clone();
    TableDBTransaction::new(handle, pending)
}
/// Stores `value` under `key` in column `col` as a single-operation write.
///
/// The key is encoded with a keyed nonce and the value with a random nonce
/// (see `maybe_encrypt`).
///
/// # Errors
/// Fails if `col` is not below the opened column count or if the database
/// write fails.
#[cfg_attr(
    feature = "instrument",
    instrument(level = "trace", target = "tstore", skip_all)
)]
pub async fn store(&self, col: u32, key: &[u8], value: &[u8]) -> VeilidAPIResult<()> {
    if self.opened_column_count <= col {
        apibail_generic!(
            "Column exceeds opened column count {} >= {}",
            col,
            self.opened_column_count
        );
    }
    let db = self.unlocked_inner.database.clone();
    let mut dbt = db.transaction();
    let stored_key = self.maybe_encrypt(key, true).await;
    let stored_value = self.maybe_encrypt(value, false).await;
    dbt.put(col, stored_key, stored_value);
    let written = db.write(dbt).await;
    written.map_err(VeilidAPIError::generic)
}
/// Serializes `value` to JSON and stores it under `key` in column `col`.
///
/// # Errors
/// Returns an internal error if serialization fails, or whatever `store`
/// returns.
#[cfg_attr(
    feature = "instrument",
    instrument(level = "trace", target = "tstore", skip_all)
)]
pub async fn store_json<T>(&self, col: u32, key: &[u8], value: &T) -> VeilidAPIResult<()>
where
    T: serde::Serialize,
{
    let encoded = serde_json::to_vec(value).map_err(VeilidAPIError::internal)?;
    self.store(col, key, encoded.as_slice()).await
}
/// Loads the value stored under `key` in column `col`.
///
/// Returns `Ok(None)` when the database has no entry for the key. The key is
/// encoded with a keyed nonce before lookup, and a found value is decoded
/// with `maybe_decrypt`.
///
/// # Errors
/// Fails if `col` is not below the opened column count, if the database read
/// fails, or if the stored value cannot be decoded.
#[cfg_attr(
    feature = "instrument",
    instrument(level = "trace", target = "tstore", skip_all)
)]
pub async fn load(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<Vec<u8>>> {
    if self.opened_column_count <= col {
        apibail_generic!(
            "Column exceeds opened column count {} >= {}",
            col,
            self.opened_column_count
        );
    }
    let db = self.unlocked_inner.database.clone();
    let stored_key = self.maybe_encrypt(key, true).await;
    let found = db
        .get(col, &stored_key)
        .await
        .map_err(VeilidAPIError::from)?;
    let Some(stored_value) = found else {
        return Ok(None);
    };
    let value = self
        .maybe_decrypt(&stored_value)
        .await
        .map_err(VeilidAPIError::from)?;
    Ok(Some(value))
}
/// Loads the value under `key` in column `col` and deserializes it from JSON.
///
/// Returns `Ok(None)` when `load` finds no entry.
///
/// # Errors
/// Propagates errors from `load`; returns an internal error if the stored
/// bytes are not valid JSON for `T`.
#[cfg_attr(
    feature = "instrument",
    instrument(level = "trace", target = "tstore", skip_all)
)]
pub async fn load_json<T>(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<T>>
where
    T: for<'de> serde::Deserialize<'de>,
{
    self.load(col, key)
        .await?
        .map(|bytes| serde_json::from_slice(&bytes).map_err(VeilidAPIError::internal))
        .transpose()
}
/// Deletes the entry under `key` in column `col`.
///
/// Returns the value the database hands back for the deleted key, decoded
/// with `maybe_decrypt`, or `Ok(None)` when the database returns nothing.
///
/// # Errors
/// Fails if `col` is not below the opened column count, if the database
/// delete fails, or if the returned value cannot be decoded.
#[cfg_attr(
    feature = "instrument",
    instrument(level = "trace", target = "tstore", skip_all)
)]
pub async fn delete(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<Vec<u8>>> {
    if self.opened_column_count <= col {
        apibail_generic!(
            "Column exceeds opened column count {} >= {}",
            col,
            self.opened_column_count
        );
    }
    let stored_key = self.maybe_encrypt(key, true).await;
    let db = self.unlocked_inner.database.clone();
    let removed = db
        .delete(col, &stored_key)
        .await
        .map_err(VeilidAPIError::from)?;
    let Some(stored_value) = removed else {
        return Ok(None);
    };
    let value = self
        .maybe_decrypt(&stored_value)
        .await
        .map_err(VeilidAPIError::from)?;
    Ok(Some(value))
}
/// Deletes the entry under `key` in column `col` and deserializes the value
/// returned by `delete` from JSON.
///
/// Returns `Ok(None)` when `delete` returns no value.
///
/// # Errors
/// Propagates errors from `delete`; returns an internal error if the returned
/// bytes are not valid JSON for `T`.
#[cfg_attr(
    feature = "instrument",
    instrument(level = "trace", target = "tstore", skip_all)
)]
pub async fn delete_json<T>(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<T>>
where
    T: for<'de> serde::Deserialize<'de>,
{
    let Some(bytes) = self.delete(col, key).await? else {
        return Ok(None);
    };
    let old_value = serde_json::from_slice(&bytes).map_err(VeilidAPIError::internal)?;
    Ok(Some(old_value))
}
}
/// Mutable state of a `TableDBTransaction`, kept behind a mutex.
struct TableDBTransactionInner {
    /// Registry used for logging when the transaction is dropped unfinished.
    registry: VeilidComponentRegistry,
    /// Pending database transaction; `None` once committed or rolled back.
    dbt: Option<DBTransaction>,
}
impl fmt::Debug for TableDBTransactionInner {
    /// Shows the number of queued operations, or nothing once the transaction
    /// has been completed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let detail = self
            .dbt
            .as_ref()
            .map(|dbt| format!("len={}", dbt.ops.len()))
            .unwrap_or_default();
        write!(f, "TableDBTransactionInner({})", detail)
    }
}
impl Drop for TableDBTransactionInner {
    /// Logs an error if the transaction is dropped while its operations are
    /// still pending (neither committed nor rolled back).
    fn drop(&mut self) {
        if self.dbt.is_none() {
            return;
        }
        let registry = &self.registry;
        veilid_log!(registry error "Dropped transaction without commit or rollback");
    }
}
/// A batch of store/delete operations against a `TableDB`, applied by `commit`.
///
/// Clones share the same pending operations through the `Arc<Mutex<..>>`.
#[derive(Debug, Clone)]
pub struct TableDBTransaction {
    /// Table handle used for column checks, encryption and the final write.
    db: TableDB,
    /// Shared, mutex-protected transaction state.
    inner: Arc<Mutex<TableDBTransactionInner>>,
}
impl VeilidComponentRegistryAccessor for TableDBTransaction {
    /// Returns the registry of the table this transaction belongs to.
    fn registry(&self) -> VeilidComponentRegistry {
        let table_db = &self.db;
        table_db.registry()
    }
}
impl TableDBTransaction {
/// Wraps the database transaction `dbt` for use against table `db`.
fn new(db: TableDB, dbt: DBTransaction) -> Self {
    let state = TableDBTransactionInner {
        registry: db.registry(),
        dbt: Some(dbt),
    };
    Self {
        db,
        inner: Arc::new(Mutex::new(state)),
    }
}
/// Writes all queued operations to the database.
///
/// The pending operations are taken out of the shared state first, so the
/// transaction counts as completed from that point on. An empty transaction
/// returns `Ok(())` without touching the database. Otherwise the table's
/// `commit_lock` is acquired and held for the duration of the write.
///
/// # Errors
/// Fails if the transaction was already committed or rolled back, or if the
/// database write fails. In the latter case the operations were already taken
/// out of the transaction and are lost.
#[cfg_attr(
    feature = "instrument",
    instrument(level = "trace", target = "tstore", skip_all)
)]
pub async fn commit(self) -> VeilidAPIResult<()> {
    // Keep the mutex guard confined to this block so it is released before
    // any await below.
    let taken = {
        let mut guard = self.inner.lock();
        guard.dbt.take()
    };
    let dbt = taken.ok_or_else(|| VeilidAPIError::generic("transaction already completed"))?;
    if dbt.ops.is_empty() {
        return Ok(());
    }

    let db = self.db.unlocked_inner.database.clone();
    let _commit_lock = self
        .db
        .unlocked_inner
        .commit_lock
        .lock()
        .measure_debug(
            TimestampDuration::new_ms(200),
            veilid_log_dbg!(
                self,
                "TableDBTransaction({})::commit lock",
                self.db.table_name()
            ),
        )
        .await;

    match db.write(dbt).await {
        Ok(()) => Ok(()),
        Err(e) => {
            veilid_log!(self error "commit failed, transaction lost: {:?}", e);
            Err(VeilidAPIError::generic(format!(
                "commit failed, transaction lost: {}",
                e
            )))
        }
    }
}
/// Discards all queued operations, marking the transaction as completed.
#[cfg_attr(
    feature = "instrument",
    instrument(level = "trace", target = "tstore", skip_all)
)]
pub fn rollback(self) {
    self.inner.lock().dbt = None;
}
/// Queues a store of `value` under `key` in column `col`.
///
/// Nothing is written to the database here; the operation is added to the
/// pending transaction. The key is encoded with a keyed nonce and the value
/// with a random nonce (see `TableDB::maybe_encrypt`).
///
/// # Errors
/// Fails if `col` is not below the table's opened column count or if the
/// transaction was already committed or rolled back.
#[cfg_attr(
    feature = "instrument",
    instrument(level = "trace", target = "tstore", skip_all)
)]
pub async fn store(&self, col: u32, key: &[u8], value: &[u8]) -> VeilidAPIResult<()> {
    if self.db.opened_column_count <= col {
        apibail_generic!(
            "Column exceeds opened column count {} >= {}",
            col,
            self.db.opened_column_count
        );
    }
    let stored_key = self.db.maybe_encrypt(key, true).await;
    let stored_value = self.db.maybe_encrypt(value, false).await;

    // The mutex is taken only after both awaits have finished.
    let mut guard = self.inner.lock();
    let Some(pending) = guard.dbt.as_mut() else {
        return Err(VeilidAPIError::generic(
            "store failed, transaction already completed",
        ));
    };
    pending.put_owned(col, stored_key, stored_value);
    Ok(())
}
/// Serializes `value` to JSON and queues a store of it under `key` in column `col`.
///
/// # Errors
/// Returns an internal error if serialization fails, or whatever `store`
/// returns.
#[cfg_attr(
    feature = "instrument",
    instrument(level = "trace", target = "tstore", skip_all)
)]
pub async fn store_json<T>(&self, col: u32, key: &[u8], value: &T) -> VeilidAPIResult<()>
where
    T: serde::Serialize,
{
    let encoded = serde_json::to_vec(value).map_err(VeilidAPIError::internal)?;
    self.store(col, key, encoded.as_slice()).await
}
/// Queues a delete of `key` in column `col`.
///
/// Nothing is removed from the database here; the operation is added to the
/// pending transaction. The key is encoded with a keyed nonce (see
/// `TableDB::maybe_encrypt`).
///
/// # Errors
/// Fails if `col` is not below the table's opened column count or if the
/// transaction was already committed or rolled back.
#[cfg_attr(
    feature = "instrument",
    instrument(level = "trace", target = "tstore", skip_all)
)]
pub async fn delete(&self, col: u32, key: &[u8]) -> VeilidAPIResult<()> {
    if self.db.opened_column_count <= col {
        apibail_generic!(
            "Column exceeds opened column count {} >= {}",
            col,
            self.db.opened_column_count
        );
    }
    let stored_key = self.db.maybe_encrypt(key, true).await;

    // The mutex is taken only after the await has finished.
    let mut guard = self.inner.lock();
    let Some(pending) = guard.dbt.as_mut() else {
        return Err(VeilidAPIError::generic(
            "delete failed, transaction already completed",
        ));
    };
    pending.delete_owned(col, stored_key);
    Ok(())
}
}