Skip to main content

dusk_node/database/rocksdb/
tx_events.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4//
5// Copyright (c) DUSK NETWORK. All rights reserved.
6
7use super::*;
/// Implementation of the `ConsensusStorage` trait for `DBTransaction<'_, DB>`.
9impl<DB: DBAccess> ConsensusStorage for DBTransaction<'_, DB> {
10    /// Stores a candidate block in the database.
11    ///
12    /// # Arguments
13    ///
14    /// * `b` - The block to store.
15    ///
16    /// # Returns
17    ///
18    /// Returns `Ok(())` if the block is successfully stored, or an error if the
19    /// operation fails.
20    fn store_candidate(&mut self, b: Block) -> Result<()> {
21        let mut serialized = vec![];
22        b.write(&mut serialized)?;
23
24        self.inner
25            .put_cf(self.candidates_cf, b.header().hash, serialized)?;
26
27        let key = serialize_key(b.header().height, b.header().hash)?;
28        self.inner
29            .put_cf(self.candidates_height_cf, key, b.header().hash)?;
30
31        Ok(())
32    }
33
34    /// Fetches a candidate block from the database.
35    ///
36    /// # Arguments
37    ///
38    /// * `hash` - The hash of the block to fetch.
39    ///
40    /// # Returns
41    ///
42    /// Returns `Ok(Some(block))` if the block is found, `Ok(None)` if the block
43    /// is not found, or an error if the operation fails.
44    fn candidate(&self, hash: &[u8]) -> Result<Option<Block>> {
45        if let Some(blob) = self.inner.get_cf(self.candidates_cf, hash)? {
46            let b = Block::read(&mut &blob[..])?;
47            return Ok(Some(b));
48        }
49
50        // Block not found
51        Ok(None)
52    }
53
54    fn candidate_by_iteration(
55        &self,
56        consensus_header: &ConsensusHeader,
57    ) -> Result<Option<Block>> {
58        let iter = self
59            .inner
60            .iterator_cf(self.candidates_cf, IteratorMode::Start);
61
62        for (_, blob) in iter.map(Result::unwrap) {
63            let b = Block::read(&mut &blob[..])?;
64
65            let header = b.header();
66            if header.prev_block_hash == consensus_header.prev_block_hash
67                && header.iteration == consensus_header.iteration
68            {
69                return Ok(Some(b));
70            }
71        }
72
73        Ok(None)
74    }
75
76    /// Deletes candidate-related items from the database based on a closure.
77    ///
78    /// # Arguments
79    ///
80    /// * `closure` - If the closure returns `true`, the block will be deleted.
81    ///
82    /// # Returns
83    ///
84    /// Returns `Ok(())` if the deletion is successful, or an error if the
85    /// operation fails.
86    fn delete_candidate<F>(&mut self, closure: F) -> Result<()>
87    where
88        F: FnOnce(u64) -> bool + std::marker::Copy,
89    {
90        let iter = self
91            .inner
92            .iterator_cf(self.candidates_height_cf, IteratorMode::Start);
93
94        for (key, hash) in iter.map(Result::unwrap) {
95            let (height, _) = deserialize_key(&mut &key.to_vec()[..])?;
96            if closure(height) {
97                self.inner.delete_cf(self.candidates_cf, hash)?;
98                self.inner.delete_cf(self.candidates_height_cf, key)?;
99            }
100        }
101
102        Ok(())
103    }
104
105    fn count_candidates(&self) -> usize {
106        let iter = self
107            .inner
108            .iterator_cf(self.candidates_height_cf, IteratorMode::Start);
109
110        iter.count()
111    }
112
113    /// Deletes all items from the `CF_CANDIDATES` column family.
114    ///
115    /// # Returns
116    ///
117    /// Returns `Ok(())` if the deletion is successful, or an error if the
118    /// operation fails.
119    fn clear_candidates(&mut self) -> Result<()> {
120        self.delete_candidate(|_| true)
121    }
122
123    /// Stores a ValidationResult in the database.
124    ///
125    /// # Arguments
126    ///
127    /// * `vr` - The ValidationResult to store.
128    ///
129    /// # Returns
130    ///
131    /// Returns `Ok(())` if the ValidationResult is successfully stored, or an
132    /// error if the operation fails.
133    fn store_validation_result(
134        &mut self,
135        consensus_header: &ConsensusHeader,
136        validation_result: &payload::ValidationResult,
137    ) -> Result<()> {
138        let mut serialized = vec![];
139        validation_result.write(&mut serialized)?;
140
141        let key = serialize_iter_key(consensus_header)?;
142        self.inner
143            .put_cf(self.validation_results_cf, key, serialized)?;
144
145        Ok(())
146    }
147
148    /// Fetches a ValidationResult from the database.
149    ///
150    /// # Arguments
151    ///
152    /// * `consensus_header` - The ConsensusHeader of the ValidationResult.
153    ///
154    /// # Returns
155    ///
156    /// Returns `Ok(Some(ValidationResult))` if the ValidationResult is found,
157    /// `Ok(None)` if the ValidationResult is not found, or an error if the
158    /// operation fails.
159    fn validation_result(
160        &self,
161        consensus_header: &ConsensusHeader,
162    ) -> Result<Option<payload::ValidationResult>> {
163        let key = serialize_iter_key(consensus_header)?;
164        if let Some(blob) =
165            self.inner.get_cf(self.validation_results_cf, key)?
166        {
167            let validation_result =
168                payload::ValidationResult::read(&mut &blob[..])?;
169            return Ok(Some(validation_result));
170        }
171
172        // ValidationResult not found
173        Ok(None)
174    }
175
176    /// Deletes ValidationResult items from the database based on a closure.
177    ///
178    /// # Arguments
179    ///
180    /// * `closure` - If the closure returns `true`, the ValidationResult will
181    ///   be deleted.
182    ///
183    /// # Returns
184    ///
185    /// Returns `Ok(())` if the deletion is successful, or an error if the
186    /// operation fails.
187    fn delete_validation_results<F>(&mut self, closure: F) -> Result<()>
188    where
189        F: FnOnce([u8; 32]) -> bool + std::marker::Copy,
190    {
191        let iter = self
192            .inner
193            .iterator_cf(self.validation_results_cf, IteratorMode::Start);
194
195        for (key, _) in iter.map(Result::unwrap) {
196            let (prev_block_hash, _) =
197                deserialize_iter_key(&mut &key.to_vec()[..])?;
198            if closure(prev_block_hash) {
199                self.inner.delete_cf(self.validation_results_cf, key)?;
200            }
201        }
202
203        Ok(())
204    }
205
206    fn count_validation_results(&self) -> usize {
207        let iter = self
208            .inner
209            .iterator_cf(self.validation_results_cf, IteratorMode::Start);
210
211        iter.count()
212    }
213
214    /// Deletes all items from the `CF_VALIDATION_RESULTS` column family.
215    ///
216    /// # Returns
217    ///
218    /// Returns `Ok(())` if the deletion is successful, or an error if the
219    /// operation fails.
220    fn clear_validation_results(&mut self) -> Result<()> {
221        self.delete_validation_results(|_| true)
222    }
223}
224
225impl<DB: DBAccess> Mempool for DBTransaction<'_, DB> {
226    fn store_mempool_tx(
227        &mut self,
228        tx: &LedgerTransaction,
229        timestamp: u64,
230    ) -> Result<()> {
231        // Map Hash to serialized transaction
232        let mut tx_data = vec![];
233        tx.write(&mut tx_data)?;
234
235        let hash = tx.id();
236        self.put_cf(self.mempool_cf, hash, tx_data)?;
237
238        // Add Secondary indexes //
239        // Spending Ids
240        for n in tx.to_spend_ids() {
241            let key = n.to_bytes();
242            self.put_cf(self.spending_id_cf, key, hash)?;
243        }
244
245        let timestamp = timestamp.to_be_bytes();
246
247        // Map Fee_Hash to Timestamp
248        // Key pair is used to facilitate sort-by-fee
249        // Also, the timestamp is used to remove expired transactions
250        self.put_cf(
251            self.fees_cf,
252            serialize_key(tx.gas_price(), hash)?,
253            timestamp,
254        )?;
255
256        Ok(())
257    }
258
259    fn mempool_tx(&self, hash: [u8; 32]) -> Result<Option<LedgerTransaction>> {
260        let data = self.inner.get_cf(self.mempool_cf, hash)?;
261
262        match data {
263            // None has a meaning key not found
264            None => Ok(None),
265            Some(blob) => {
266                Ok(Some(LedgerTransaction::read(&mut &blob.to_vec()[..])?))
267            }
268        }
269    }
270
271    fn mempool_tx_exists(&self, h: [u8; 32]) -> Result<bool> {
272        Ok(self.inner.get_cf(self.mempool_cf, h)?.is_some())
273    }
274
275    fn delete_mempool_tx(
276        &mut self,
277        h: [u8; 32],
278        cascade: bool,
279    ) -> Result<Vec<[u8; 32]>> {
280        let mut deleted = vec![];
281        let tx = self.mempool_tx(h)?;
282        if let Some(tx) = tx {
283            let hash = tx.id();
284
285            self.inner.delete_cf(self.mempool_cf, hash)?;
286
287            // Delete Secondary indexes
288            // Delete spendingids (nullifiers or nonce)
289            for n in tx.to_spend_ids() {
290                let key = n.to_bytes();
291                self.inner.delete_cf(self.spending_id_cf, key)?;
292            }
293
294            // Delete Fee_Hash
295            self.inner.delete_cf(
296                self.fees_cf,
297                serialize_key(tx.gas_price(), hash)?,
298            )?;
299
300            deleted.push(h);
301
302            if cascade {
303                let mut dependants = vec![];
304                // Get the next spending id (aka next nonce tx)
305                // retrieve tx_id and delete it
306                let mut next_spending_id = tx.next_spending_id();
307                while let Some(spending_id) = next_spending_id {
308                    next_spending_id = spending_id.next();
309                    let next_txs =
310                        self.mempool_txs_by_spendable_ids(&[spending_id]);
311                    if next_txs.is_empty() {
312                        break;
313                    }
314                    dependants.extend(next_txs);
315                }
316
317                // delete all dependants
318                for tx_id in dependants {
319                    let cascade_deleted =
320                        self.delete_mempool_tx(tx_id, false)?;
321                    deleted.extend(cascade_deleted);
322                }
323            }
324        }
325
326        Ok(deleted)
327    }
328
329    fn mempool_txs_by_spendable_ids(
330        &self,
331        n: &[SpendingId],
332    ) -> HashSet<[u8; 32]> {
333        n.iter()
334            .filter_map(|n| {
335                match self.inner.get_cf(self.spending_id_cf, n.to_bytes()) {
336                    Ok(Some(tx_id)) => tx_id.try_into().ok(),
337                    _ => None,
338                }
339            })
340            .collect()
341    }
342
343    fn mempool_txs_sorted_by_fee(
344        &self,
345    ) -> Box<dyn Iterator<Item = LedgerTransaction> + '_> {
346        let iter = MemPoolIterator::new(&self.inner, self.fees_cf, self);
347
348        Box::new(iter)
349    }
350
351    fn mempool_txs_ids_sorted_by_fee(
352        &self,
353    ) -> Box<dyn Iterator<Item = (u64, [u8; 32])> + '_> {
354        let iter = MemPoolFeeIterator::new(&self.inner, self.fees_cf, true);
355
356        Box::new(iter)
357    }
358
359    fn mempool_txs_ids_sorted_by_low_fee(
360        &self,
361    ) -> Box<dyn Iterator<Item = (u64, [u8; 32])> + '_> {
362        let iter = MemPoolFeeIterator::new(&self.inner, self.fees_cf, false);
363
364        Box::new(iter)
365    }
366
367    /// Get all expired transactions hashes.
368    fn mempool_expired_txs(&self, timestamp: u64) -> Result<Vec<[u8; 32]>> {
369        let mut iter = self.inner.raw_iterator_cf(self.fees_cf);
370        iter.seek_to_first();
371        let mut txs_list = vec![];
372
373        while iter.valid() {
374            if let Some(key) = iter.key() {
375                let (_, tx_id) = deserialize_key(&mut &key.to_vec()[..])?;
376
377                let tx_timestamp = u64::from_be_bytes(
378                    iter.value()
379                        .ok_or(error::RocksDbError::MissingIteratorValue)?
380                        .try_into()
381                        .map_err(|_| {
382                            error::RocksDbError::InvalidTimestampData
383                        })?,
384                );
385
386                if tx_timestamp <= timestamp {
387                    txs_list.push(tx_id);
388                }
389            }
390
391            iter.next();
392        }
393
394        Ok(txs_list)
395    }
396
397    fn mempool_txs_ids(&self) -> Result<Vec<[u8; 32]>> {
398        let mut iter = self.inner.raw_iterator_cf(self.fees_cf);
399        iter.seek_to_last();
400
401        let mut txs_list = vec![];
402
403        // Iterate all keys from the end in reverse lexicographic order
404        while iter.valid() {
405            if let Some(key) = iter.key() {
406                let (_, tx_id) = deserialize_key(&mut &key.to_vec()[..])?;
407
408                txs_list.push(tx_id);
409            }
410
411            iter.prev();
412        }
413
414        Ok(txs_list)
415    }
416
417    fn mempool_txs_count(&self) -> usize {
418        self.inner
419            .iterator_cf(self.mempool_cf, IteratorMode::Start)
420            .count()
421    }
422}
423
424pub struct MemPoolIterator<'db, DB: DBAccess, M: Mempool> {
425    iter: MemPoolFeeIterator<'db, DB>,
426    mempool: &'db M,
427}
428
429impl<'db, DB: DBAccess, M: Mempool> MemPoolIterator<'db, DB, M> {
430    fn new(
431        db: &'db rocksdb::Transaction<DB>,
432        fees_cf: &ColumnFamily,
433        mempool: &'db M,
434    ) -> Self {
435        let iter = MemPoolFeeIterator::new(db, fees_cf, true);
436        MemPoolIterator { iter, mempool }
437    }
438}
439
440impl<DB: DBAccess, M: Mempool> Iterator for MemPoolIterator<'_, DB, M> {
441    type Item = LedgerTransaction;
442    fn next(&mut self) -> Option<Self::Item> {
443        self.iter.next().and_then(|(_, tx_id)| {
444            self.mempool.mempool_tx(tx_id).ok().flatten()
445        })
446    }
447}
448
449pub struct MemPoolFeeIterator<'db, DB: DBAccess> {
450    iter: DBRawIteratorWithThreadMode<'db, rocksdb::Transaction<'db, DB>>,
451    fee_desc: bool,
452}
453
454impl<'db, DB: DBAccess> MemPoolFeeIterator<'db, DB> {
455    fn new(
456        db: &'db rocksdb::Transaction<DB>,
457        fees_cf: &ColumnFamily,
458        fee_desc: bool,
459    ) -> Self {
460        let mut iter = db.raw_iterator_cf(fees_cf);
461        if fee_desc {
462            iter.seek_to_last();
463        } else {
464            iter.seek_to_first();
465        };
466        MemPoolFeeIterator { iter, fee_desc }
467    }
468}
469
470impl<DB: DBAccess> Iterator for MemPoolFeeIterator<'_, DB> {
471    type Item = (u64, [u8; 32]);
472    fn next(&mut self) -> Option<Self::Item> {
473        match self.iter.valid() {
474            true => {
475                if let Some(key) = self.iter.key() {
476                    let (gas_price, hash) =
477                        deserialize_key(&mut &key.to_vec()[..]).ok()?;
478                    if self.fee_desc {
479                        self.iter.prev();
480                    } else {
481                        self.iter.next();
482                    }
483                    Some((gas_price, hash))
484                } else {
485                    None
486                }
487            }
488            false => None,
489        }
490    }
491}