Skip to main content

veilid_core/veilid_api/
dht_transaction.rs

1use super::*;
2use crate::storage_manager::OutboundTransactionHandle;
3
4impl_veilid_log_facility!("veilid_api");
5
6///////////////////////////////////////////////////////////////////////////////////////
7
8/// DHT Transactions the way you perform multiple simulateous atomic operations over a set of DHT records.
9///
10/// DHT operations performed out of a transaction may be processed in any order, and only operate on one subkey at a time
11/// for a given record. Transactions allow you to bind a set of operations so they all succeed, or fail together, and at the same time.
12///
13/// Transactional DHT operations can only be performed when the node is online, and will error with [VeilidAPIError::TryAgain] if offline.
14///
15/// Transactions must be committed when all of their operations are registered, or rolled back if the group of operations is to be cancelled.
16#[derive(Clone)]
17#[must_use]
18pub struct DHTTransaction {
19    /// API in use
20    api: VeilidAPI,
21    /// Inner transaction
22    inner: Arc<Mutex<DHTTransactionInner>>,
23}
24
25impl fmt::Debug for DHTTransaction {
26    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27        f.debug_struct("DHTTransaction")
28            .field("handle", &self.inner.lock().opt_transaction_handle)
29            .finish()
30    }
31}
32
33impl DHTTransaction {
34    ////////////////////////////////////////////////////////////////
35
36    pub(super) fn new(api: VeilidAPI, handle: OutboundTransactionHandle) -> VeilidAPIResult<Self> {
37        let registry = api.core_context()?.registry();
38        Ok(Self {
39            api,
40            inner: Arc::new(Mutex::new(DHTTransactionInner {
41                registry,
42                opt_transaction_handle: Some(handle),
43            })),
44        })
45    }
46
47    /// Get the [VeilidAPI] object that created this [DHTTransaction].
48    pub fn api(&self) -> VeilidAPI {
49        self.api.clone()
50    }
51
52    #[must_use]
53    pub(crate) fn log_key(&self) -> &str {
54        self.api.log_key()
55    }
56
57    /// Commit the transaction
58    /// All write operations are performed atomically
59    #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key(), transaction_handle), skip(self), ret))]
60    pub async fn commit(self) -> VeilidAPIResult<()> {
61        record_duration_fut(async {
62            let storage_manager = self.api.core_context()?.storage_manager();
63            let transaction_handle = {
64                let mut inner = self.inner.lock();
65                inner
66                    .opt_transaction_handle
67                    .take()
68                    .ok_or_else(|| VeilidAPIError::generic("transaction already completed"))?
69            };
70
71            tracing::Span::current().record("transaction_handle", transaction_handle.to_string());
72
73            veilid_log!(self debug
74                "DHTTransaction::commit(transaction_handle: {}", transaction_handle);
75
76            // End and commit transaction
77            Box::pin(storage_manager.end_and_commit_transaction(transaction_handle)).await
78        })
79        .await
80        .inspect_err(log_veilid_api_error!(self))
81    }
82
83    /// Rollback the transaction
84    /// No write operations are performed,
85    #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key(), transaction_handle), skip(self), ret))]
86    pub async fn rollback(self) -> VeilidAPIResult<()> {
87        record_duration_fut(async {
88            let storage_manager = self.api.core_context()?.storage_manager();
89            let transaction_handle = {
90                let mut inner = self.inner.lock();
91                inner
92                    .opt_transaction_handle
93                    .take()
94                    .ok_or_else(|| VeilidAPIError::generic("transaction already completed"))?
95            };
96
97            tracing::Span::current().record("transaction_handle", transaction_handle.to_string());
98
99            veilid_log!(self debug
100                "DHTTransaction::rollback(transaction_handle: {}", transaction_handle);
101
102            Box::pin(storage_manager.rollback_transaction(transaction_handle)).await
103        })
104        .await
105        .inspect_err(log_veilid_api_error!(self))
106    }
107
108    /// Add a set_dht_value operation to the transaction
109    ///
110    /// * Will fail if performed offline
111    /// * Will fail if existing offline writes exist for this record key
112    ///
113    /// The writer, if specified, will override the 'default_writer' specified when the record is opened.
114    ///
115    /// Returns `None` if the value was successfully set.
116    /// Returns `Some(data)` if the value set was older than the one available on the network.
117    #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key(), transaction_handle, data = print_data(&data, Some(64))), skip(self, data), ret))]
118    pub async fn set(
119        &self,
120        record_key: RecordKey,
121        subkey: ValueSubkey,
122        data: Vec<u8>,
123        options: Option<DHTTransactionSetValueOptions>,
124    ) -> VeilidAPIResult<Option<ValueData>> {
125        record_duration_fut(async {
126            let storage_manager = self.api.core_context()?.storage_manager();
127            let transaction_handle = {
128                let inner = self.inner.lock();
129                inner
130                    .opt_transaction_handle
131                    .clone()
132                    .ok_or_else(|| VeilidAPIError::generic("transaction already completed"))?
133            };
134
135            tracing::Span::current().record("transaction_handle", transaction_handle.to_string());
136
137            veilid_log!(self debug
138                "DHTTransaction::set(transaction_handle: {}, key: {}, subkey: {}, data: len={}, options: {:?})", transaction_handle, record_key, subkey, data.len(), options);
139
140            storage_manager.check_record_key(&record_key)?;
141
142            Box::pin(storage_manager.transaction_set(
143                transaction_handle,
144                record_key,
145                subkey,
146                data,
147                options,
148            ))
149            .await
150        }).await.inspect_err(log_veilid_api_error!(self))
151    }
152
153    /// Perform a get_dht_value operation inside the transaction
154    ///
155    /// * Will fail if performed offline
156    /// * Will pull the latest value from the network, will fail if the local value is newer
157    /// * Will fail if existing offline writes exist for this record key
158    ///
159    /// Returns `None` if the value subkey has not yet been set.
160    /// Returns `Some(data)` if the value subkey has valid data.
161    #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key()), skip(self), ret))]
162    pub async fn get(
163        &self,
164        record_key: RecordKey,
165        subkey: ValueSubkey,
166    ) -> VeilidAPIResult<Option<ValueData>> {
167        record_duration_fut(async {
168            let storage_manager = self.api.core_context()?.storage_manager();
169            let transaction_handle = {
170                let inner = self.inner.lock();
171                inner
172                    .opt_transaction_handle
173                    .clone()
174                    .ok_or_else(|| VeilidAPIError::generic("transaction already completed"))?
175            };
176
177            tracing::Span::current().record("transaction_handle", transaction_handle.to_string());
178
179            veilid_log!(self debug
180                "DHTTransaction::get(transaction_handle: {}, key: {}, subkey: {})", transaction_handle, record_key, subkey);
181
182            storage_manager.check_record_key(&record_key)?;
183
184            let storage_manager = self.api.core_context()?.storage_manager();
185            Box::pin(storage_manager.transaction_get(transaction_handle, record_key, subkey)).await
186        }).await.inspect_err(log_veilid_api_error!(self))
187    }
188
189    /// Perform a inspect_dht_record operation inside the transaction
190    ///
191    /// * Does not perform any network activity, as the transaction state keeps all of the required information after the begin
192    ///
193    /// For information on arguments, see [RoutingContext::inspect_dht_record]
194    ///
195    /// Returns a DHTRecordReport with the subkey ranges that were returned that overlapped the schema, and sequence numbers for each of the subkeys in the range.
196    #[cfg_attr(feature = "instrument", instrument(target = "veilid_api", level = "debug", fields(duration, __VEILID_LOG_KEY = self.log_key(), transaction_handle), skip(self), ret))]
197    pub async fn inspect(
198        &self,
199        record_key: RecordKey,
200        subkeys: Option<ValueSubkeyRangeSet>,
201        scope: DHTReportScope,
202    ) -> VeilidAPIResult<DHTRecordReport> {
203        record_duration_fut(async {
204            let storage_manager = self.api.core_context()?.storage_manager();
205            let transaction_handle = {
206                let inner = self.inner.lock();
207                inner
208                    .opt_transaction_handle
209                    .clone()
210                    .ok_or_else(|| VeilidAPIError::generic("transaction already completed"))?
211            };
212
213            tracing::Span::current().record("transaction_handle", transaction_handle.to_string());
214
215            veilid_log!(self debug
216                "DHTTransaction::inspect(transaction_handle: {}, record_key: {}, subkeys: {}, scope: {:?})", transaction_handle, record_key, subkeys.as_ref().map(|x| x.to_string()).unwrap_or_else(|| "None".to_string()), scope);
217
218            storage_manager.check_record_key(&record_key)?;
219
220            let storage_manager = self.api.core_context()?.storage_manager();
221            storage_manager.transaction_inspect(
222                transaction_handle,
223                record_key,
224                subkeys,
225                scope,
226            )
227        }).await.inspect_err(log_veilid_api_error!(self))
228    }
229}
230//////////////////////////////////////////////////////////////////////////////////////
231
232struct DHTTransactionInner {
233    registry: VeilidComponentRegistry,
234    opt_transaction_handle: Option<OutboundTransactionHandle>,
235}
236
237impl Drop for DHTTransactionInner {
238    fn drop(&mut self) {
239        if let Some(transaction_handle) = self.opt_transaction_handle.take() {
240            let registry = &self.registry;
241            veilid_log!(registry warn "Dropped DHT transaction without commit or rollback");
242
243            let storage_manager = registry.storage_manager();
244            storage_manager.drop_transaction_sync(transaction_handle);
245        }
246    }
247}