1use super::*;
2use crate::storage_manager::OutboundTransactionHandle;
3
4impl_veilid_log_facility!("veilid_api");
5
6#[derive(Clone)]
17#[must_use]
18pub struct DHTTransaction {
19 api: VeilidAPI,
21 inner: Arc<Mutex<DHTTransactionInner>>,
23}
24
25impl fmt::Debug for DHTTransaction {
26 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27 f.debug_struct("DHTTransaction")
28 .field("handle", &self.inner.lock().opt_transaction_handle)
29 .finish()
30 }
31}
32
33impl DHTTransaction {
34 pub(super) fn new(api: VeilidAPI, handle: OutboundTransactionHandle) -> VeilidAPIResult<Self> {
37 let registry = api.core_context()?.registry();
38 Ok(Self {
39 api,
40 inner: Arc::new(Mutex::new(DHTTransactionInner {
41 registry,
42 opt_transaction_handle: Some(handle),
43 })),
44 })
45 }
46
47 pub fn api(&self) -> VeilidAPI {
49 self.api.clone()
50 }
51
52 #[must_use]
53 pub(crate) fn log_key(&self) -> &str {
54 self.api.log_key()
55 }
56
57 #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key(), transaction_handle), skip(self), ret))]
60 pub async fn commit(self) -> VeilidAPIResult<()> {
61 record_duration_fut(async {
62 let storage_manager = self.api.core_context()?.storage_manager();
63 let transaction_handle = {
64 let mut inner = self.inner.lock();
65 inner
66 .opt_transaction_handle
67 .take()
68 .ok_or_else(|| VeilidAPIError::generic("transaction already completed"))?
69 };
70
71 tracing::Span::current().record("transaction_handle", transaction_handle.to_string());
72
73 veilid_log!(self debug
74 "DHTTransaction::commit(transaction_handle: {}", transaction_handle);
75
76 Box::pin(storage_manager.end_and_commit_transaction(transaction_handle)).await
78 })
79 .await
80 .inspect_err(log_veilid_api_error!(self))
81 }
82
83 #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key(), transaction_handle), skip(self), ret))]
86 pub async fn rollback(self) -> VeilidAPIResult<()> {
87 record_duration_fut(async {
88 let storage_manager = self.api.core_context()?.storage_manager();
89 let transaction_handle = {
90 let mut inner = self.inner.lock();
91 inner
92 .opt_transaction_handle
93 .take()
94 .ok_or_else(|| VeilidAPIError::generic("transaction already completed"))?
95 };
96
97 tracing::Span::current().record("transaction_handle", transaction_handle.to_string());
98
99 veilid_log!(self debug
100 "DHTTransaction::rollback(transaction_handle: {}", transaction_handle);
101
102 Box::pin(storage_manager.rollback_transaction(transaction_handle)).await
103 })
104 .await
105 .inspect_err(log_veilid_api_error!(self))
106 }
107
108 #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key(), transaction_handle, data = print_data(&data, Some(64))), skip(self, data), ret))]
118 pub async fn set(
119 &self,
120 record_key: RecordKey,
121 subkey: ValueSubkey,
122 data: Vec<u8>,
123 options: Option<DHTTransactionSetValueOptions>,
124 ) -> VeilidAPIResult<Option<ValueData>> {
125 record_duration_fut(async {
126 let storage_manager = self.api.core_context()?.storage_manager();
127 let transaction_handle = {
128 let inner = self.inner.lock();
129 inner
130 .opt_transaction_handle
131 .clone()
132 .ok_or_else(|| VeilidAPIError::generic("transaction already completed"))?
133 };
134
135 tracing::Span::current().record("transaction_handle", transaction_handle.to_string());
136
137 veilid_log!(self debug
138 "DHTTransaction::set(transaction_handle: {}, key: {}, subkey: {}, data: len={}, options: {:?})", transaction_handle, record_key, subkey, data.len(), options);
139
140 storage_manager.check_record_key(&record_key)?;
141
142 Box::pin(storage_manager.transaction_set(
143 transaction_handle,
144 record_key,
145 subkey,
146 data,
147 options,
148 ))
149 .await
150 }).await.inspect_err(log_veilid_api_error!(self))
151 }
152
153 #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), skip(self), ret))]
162 pub async fn get(
163 &self,
164 record_key: RecordKey,
165 subkey: ValueSubkey,
166 ) -> VeilidAPIResult<Option<ValueData>> {
167 record_duration_fut(async {
168 let storage_manager = self.api.core_context()?.storage_manager();
169 let transaction_handle = {
170 let inner = self.inner.lock();
171 inner
172 .opt_transaction_handle
173 .clone()
174 .ok_or_else(|| VeilidAPIError::generic("transaction already completed"))?
175 };
176
177 tracing::Span::current().record("transaction_handle", transaction_handle.to_string());
178
179 veilid_log!(self debug
180 "DHTTransaction::get(transaction_handle: {}, key: {}, subkey: {})", transaction_handle, record_key, subkey);
181
182 storage_manager.check_record_key(&record_key)?;
183
184 let storage_manager = self.api.core_context()?.storage_manager();
185 Box::pin(storage_manager.transaction_get(transaction_handle, record_key, subkey)).await
186 }).await.inspect_err(log_veilid_api_error!(self))
187 }
188
189 #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key(), transaction_handle), skip(self), ret))]
197 pub async fn inspect(
198 &self,
199 record_key: RecordKey,
200 subkeys: Option<ValueSubkeyRangeSet>,
201 scope: DHTReportScope,
202 ) -> VeilidAPIResult<DHTRecordReport> {
203 record_duration_fut(async {
204 let storage_manager = self.api.core_context()?.storage_manager();
205 let transaction_handle = {
206 let inner = self.inner.lock();
207 inner
208 .opt_transaction_handle
209 .clone()
210 .ok_or_else(|| VeilidAPIError::generic("transaction already completed"))?
211 };
212
213 tracing::Span::current().record("transaction_handle", transaction_handle.to_string());
214
215 veilid_log!(self debug
216 "DHTTransaction::inspect(transaction_handle: {}, record_key: {}, subkeys: {}, scope: {:?})", transaction_handle, record_key, subkeys.as_ref().map(|x| x.to_string()).unwrap_or_else(|| "None".to_string()), scope);
217
218 storage_manager.check_record_key(&record_key)?;
219
220 let storage_manager = self.api.core_context()?.storage_manager();
221 storage_manager.transaction_inspect(
222 transaction_handle,
223 record_key,
224 subkeys,
225 scope,
226 )
227 }).await.inspect_err(log_veilid_api_error!(self))
228 }
229}
230struct DHTTransactionInner {
233 registry: VeilidComponentRegistry,
234 opt_transaction_handle: Option<OutboundTransactionHandle>,
235}
236
237impl Drop for DHTTransactionInner {
238 fn drop(&mut self) {
239 if let Some(transaction_handle) = self.opt_transaction_handle.take() {
240 let registry = &self.registry;
241 veilid_log!(registry warn "Dropped DHT transaction without commit or rollback");
242
243 let storage_manager = registry.storage_manager();
244 storage_manager.drop_transaction_sync(transaction_handle);
245 }
246 }
247}