use super::*;
use crate::storage_manager::OutboundTransactionHandle;
// Declares the log facility for this file. Presumably this is the "veilid_api" target that
// the `veilid_log!` invocations below are emitted under -- confirm against the macro definition.
impl_veilid_log_facility!("veilid_api");
/// API-level handle to an open outbound DHT transaction.
///
/// Clones share one inner state through `Arc<Mutex<_>>`, so once the transaction handle has
/// been taken by `commit` or `rollback` on any clone, operations on every other clone fail
/// with a "transaction already completed" error.
#[derive(Clone)]
#[must_use]
pub struct DHTTransaction {
// API object this transaction was created from; used to reach the storage manager.
api: VeilidAPI,
// Shared state holding the (optional) transaction handle.
inner: Arc<Mutex<DHTTransactionInner>>,
}
impl fmt::Debug for DHTTransaction {
    /// Renders the transaction as a `DHTTransaction` struct with a single `handle` field.
    ///
    /// The shared inner state is locked for the duration of the formatting call so the
    /// current handle (or `None` once completed) can be read.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let state = self.inner.lock();
        let mut out = f.debug_struct("DHTTransaction");
        out.field("handle", &state.opt_transaction_handle);
        out.finish()
    }
}
impl DHTTransaction {
pub(super) fn new(api: VeilidAPI, handle: OutboundTransactionHandle) -> VeilidAPIResult<Self> {
let registry = api.core_context()?.registry();
Ok(Self {
api,
inner: Arc::new(Mutex::new(DHTTransactionInner {
registry,
opt_transaction_handle: Some(handle),
})),
})
}
pub fn api(&self) -> VeilidAPI {
self.api.clone()
}
#[must_use]
pub(crate) fn log_key(&self) -> &str {
self.api.log_key()
}
#[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key(), transaction_handle), skip(self), ret))]
pub async fn commit(self) -> VeilidAPIResult<()> {
record_duration_fut(async {
let storage_manager = self.api.core_context()?.storage_manager();
let transaction_handle = {
let mut inner = self.inner.lock();
inner
.opt_transaction_handle
.take()
.ok_or_else(|| VeilidAPIError::generic("transaction already completed"))?
};
tracing::Span::current().record("transaction_handle", transaction_handle.to_string());
veilid_log!(self debug
"DHTTransaction::commit(transaction_handle: {}", transaction_handle);
Box::pin(storage_manager.end_and_commit_transaction(transaction_handle)).await
})
.await
.inspect_err(log_veilid_api_error!(self))
}
#[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key(), transaction_handle), skip(self), ret))]
pub async fn rollback(self) -> VeilidAPIResult<()> {
record_duration_fut(async {
let storage_manager = self.api.core_context()?.storage_manager();
let transaction_handle = {
let mut inner = self.inner.lock();
inner
.opt_transaction_handle
.take()
.ok_or_else(|| VeilidAPIError::generic("transaction already completed"))?
};
tracing::Span::current().record("transaction_handle", transaction_handle.to_string());
veilid_log!(self debug
"DHTTransaction::rollback(transaction_handle: {}", transaction_handle);
Box::pin(storage_manager.rollback_transaction(transaction_handle)).await
})
.await
.inspect_err(log_veilid_api_error!(self))
}
#[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key(), transaction_handle, data = print_data(&data, Some(64))), skip(self, data), ret))]
pub async fn set(
&self,
record_key: RecordKey,
subkey: ValueSubkey,
data: Vec<u8>,
options: Option<DHTTransactionSetValueOptions>,
) -> VeilidAPIResult<Option<ValueData>> {
record_duration_fut(async {
let storage_manager = self.api.core_context()?.storage_manager();
let transaction_handle = {
let inner = self.inner.lock();
inner
.opt_transaction_handle
.clone()
.ok_or_else(|| VeilidAPIError::generic("transaction already completed"))?
};
tracing::Span::current().record("transaction_handle", transaction_handle.to_string());
veilid_log!(self debug
"DHTTransaction::set(transaction_handle: {}, key: {}, subkey: {}, data: len={}, options: {:?})", transaction_handle, record_key, subkey, data.len(), options);
storage_manager.check_record_key(&record_key)?;
Box::pin(storage_manager.transaction_set(
transaction_handle,
record_key,
subkey,
data,
options,
))
.await
}).await.inspect_err(log_veilid_api_error!(self))
}
#[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), skip(self), ret))]
pub async fn get(
&self,
record_key: RecordKey,
subkey: ValueSubkey,
) -> VeilidAPIResult<Option<ValueData>> {
record_duration_fut(async {
let storage_manager = self.api.core_context()?.storage_manager();
let transaction_handle = {
let inner = self.inner.lock();
inner
.opt_transaction_handle
.clone()
.ok_or_else(|| VeilidAPIError::generic("transaction already completed"))?
};
tracing::Span::current().record("transaction_handle", transaction_handle.to_string());
veilid_log!(self debug
"DHTTransaction::get(transaction_handle: {}, key: {}, subkey: {})", transaction_handle, record_key, subkey);
storage_manager.check_record_key(&record_key)?;
let storage_manager = self.api.core_context()?.storage_manager();
Box::pin(storage_manager.transaction_get(transaction_handle, record_key, subkey)).await
}).await.inspect_err(log_veilid_api_error!(self))
}
#[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key(), transaction_handle), skip(self), ret))]
pub async fn inspect(
&self,
record_key: RecordKey,
subkeys: Option<ValueSubkeyRangeSet>,
scope: DHTReportScope,
) -> VeilidAPIResult<DHTRecordReport> {
record_duration_fut(async {
let storage_manager = self.api.core_context()?.storage_manager();
let transaction_handle = {
let inner = self.inner.lock();
inner
.opt_transaction_handle
.clone()
.ok_or_else(|| VeilidAPIError::generic("transaction already completed"))?
};
tracing::Span::current().record("transaction_handle", transaction_handle.to_string());
veilid_log!(self debug
"DHTTransaction::inspect(transaction_handle: {}, record_key: {}, subkeys: {}, scope: {:?})", transaction_handle, record_key, subkeys.as_ref().map(|x| x.to_string()).unwrap_or_else(|| "None".to_string()), scope);
storage_manager.check_record_key(&record_key)?;
let storage_manager = self.api.core_context()?.storage_manager();
storage_manager.transaction_inspect(
transaction_handle,
record_key,
subkeys,
scope,
)
}).await.inspect_err(log_veilid_api_error!(self))
}
}
/// State shared between all clones of a `DHTTransaction`.
///
/// Its `Drop` implementation releases the transaction through the storage manager if the
/// handle is still present when the last owner goes away.
struct DHTTransactionInner {
// Component registry, kept so `Drop` can reach the storage manager.
registry: VeilidComponentRegistry,
// `Some` while the transaction is open; taken (set to `None`) by commit/rollback.
opt_transaction_handle: Option<OutboundTransactionHandle>,
}
impl Drop for DHTTransactionInner {
    /// Releases a transaction whose handle is still present, i.e. one that was neither
    /// committed nor rolled back, logging a warning first.
    fn drop(&mut self) {
        // A missing handle means the transaction was already completed: nothing to release.
        let Some(abandoned_handle) = self.opt_transaction_handle.take() else {
            return;
        };

        let registry = &self.registry;
        veilid_log!(registry warn "Dropped DHT transaction without commit or rollback");
        registry
            .storage_manager()
            .drop_transaction_sync(abandoned_handle);
    }
}