dusk_node/database/rocksdb/
tx_events.rs1use super::*;
8impl<DB: DBAccess> ConsensusStorage for DBTransaction<'_, DB> {
10 fn store_candidate(&mut self, b: Block) -> Result<()> {
21 let mut serialized = vec![];
22 b.write(&mut serialized)?;
23
24 self.inner
25 .put_cf(self.candidates_cf, b.header().hash, serialized)?;
26
27 let key = serialize_key(b.header().height, b.header().hash)?;
28 self.inner
29 .put_cf(self.candidates_height_cf, key, b.header().hash)?;
30
31 Ok(())
32 }
33
34 fn candidate(&self, hash: &[u8]) -> Result<Option<Block>> {
45 if let Some(blob) = self.inner.get_cf(self.candidates_cf, hash)? {
46 let b = Block::read(&mut &blob[..])?;
47 return Ok(Some(b));
48 }
49
50 Ok(None)
52 }
53
54 fn candidate_by_iteration(
55 &self,
56 consensus_header: &ConsensusHeader,
57 ) -> Result<Option<Block>> {
58 let iter = self
59 .inner
60 .iterator_cf(self.candidates_cf, IteratorMode::Start);
61
62 for (_, blob) in iter.map(Result::unwrap) {
63 let b = Block::read(&mut &blob[..])?;
64
65 let header = b.header();
66 if header.prev_block_hash == consensus_header.prev_block_hash
67 && header.iteration == consensus_header.iteration
68 {
69 return Ok(Some(b));
70 }
71 }
72
73 Ok(None)
74 }
75
76 fn delete_candidate<F>(&mut self, closure: F) -> Result<()>
87 where
88 F: FnOnce(u64) -> bool + std::marker::Copy,
89 {
90 let iter = self
91 .inner
92 .iterator_cf(self.candidates_height_cf, IteratorMode::Start);
93
94 for (key, hash) in iter.map(Result::unwrap) {
95 let (height, _) = deserialize_key(&mut &key.to_vec()[..])?;
96 if closure(height) {
97 self.inner.delete_cf(self.candidates_cf, hash)?;
98 self.inner.delete_cf(self.candidates_height_cf, key)?;
99 }
100 }
101
102 Ok(())
103 }
104
105 fn count_candidates(&self) -> usize {
106 let iter = self
107 .inner
108 .iterator_cf(self.candidates_height_cf, IteratorMode::Start);
109
110 iter.count()
111 }
112
113 fn clear_candidates(&mut self) -> Result<()> {
120 self.delete_candidate(|_| true)
121 }
122
123 fn store_validation_result(
134 &mut self,
135 consensus_header: &ConsensusHeader,
136 validation_result: &payload::ValidationResult,
137 ) -> Result<()> {
138 let mut serialized = vec![];
139 validation_result.write(&mut serialized)?;
140
141 let key = serialize_iter_key(consensus_header)?;
142 self.inner
143 .put_cf(self.validation_results_cf, key, serialized)?;
144
145 Ok(())
146 }
147
148 fn validation_result(
160 &self,
161 consensus_header: &ConsensusHeader,
162 ) -> Result<Option<payload::ValidationResult>> {
163 let key = serialize_iter_key(consensus_header)?;
164 if let Some(blob) =
165 self.inner.get_cf(self.validation_results_cf, key)?
166 {
167 let validation_result =
168 payload::ValidationResult::read(&mut &blob[..])?;
169 return Ok(Some(validation_result));
170 }
171
172 Ok(None)
174 }
175
176 fn delete_validation_results<F>(&mut self, closure: F) -> Result<()>
188 where
189 F: FnOnce([u8; 32]) -> bool + std::marker::Copy,
190 {
191 let iter = self
192 .inner
193 .iterator_cf(self.validation_results_cf, IteratorMode::Start);
194
195 for (key, _) in iter.map(Result::unwrap) {
196 let (prev_block_hash, _) =
197 deserialize_iter_key(&mut &key.to_vec()[..])?;
198 if closure(prev_block_hash) {
199 self.inner.delete_cf(self.validation_results_cf, key)?;
200 }
201 }
202
203 Ok(())
204 }
205
206 fn count_validation_results(&self) -> usize {
207 let iter = self
208 .inner
209 .iterator_cf(self.validation_results_cf, IteratorMode::Start);
210
211 iter.count()
212 }
213
214 fn clear_validation_results(&mut self) -> Result<()> {
221 self.delete_validation_results(|_| true)
222 }
223}
224
225impl<DB: DBAccess> Mempool for DBTransaction<'_, DB> {
226 fn store_mempool_tx(
227 &mut self,
228 tx: &LedgerTransaction,
229 timestamp: u64,
230 ) -> Result<()> {
231 let mut tx_data = vec![];
233 tx.write(&mut tx_data)?;
234
235 let hash = tx.id();
236 self.put_cf(self.mempool_cf, hash, tx_data)?;
237
238 for n in tx.to_spend_ids() {
241 let key = n.to_bytes();
242 self.put_cf(self.spending_id_cf, key, hash)?;
243 }
244
245 let timestamp = timestamp.to_be_bytes();
246
247 self.put_cf(
251 self.fees_cf,
252 serialize_key(tx.gas_price(), hash)?,
253 timestamp,
254 )?;
255
256 Ok(())
257 }
258
259 fn mempool_tx(&self, hash: [u8; 32]) -> Result<Option<LedgerTransaction>> {
260 let data = self.inner.get_cf(self.mempool_cf, hash)?;
261
262 match data {
263 None => Ok(None),
265 Some(blob) => {
266 Ok(Some(LedgerTransaction::read(&mut &blob.to_vec()[..])?))
267 }
268 }
269 }
270
271 fn mempool_tx_exists(&self, h: [u8; 32]) -> Result<bool> {
272 Ok(self.inner.get_cf(self.mempool_cf, h)?.is_some())
273 }
274
275 fn delete_mempool_tx(
276 &mut self,
277 h: [u8; 32],
278 cascade: bool,
279 ) -> Result<Vec<[u8; 32]>> {
280 let mut deleted = vec![];
281 let tx = self.mempool_tx(h)?;
282 if let Some(tx) = tx {
283 let hash = tx.id();
284
285 self.inner.delete_cf(self.mempool_cf, hash)?;
286
287 for n in tx.to_spend_ids() {
290 let key = n.to_bytes();
291 self.inner.delete_cf(self.spending_id_cf, key)?;
292 }
293
294 self.inner.delete_cf(
296 self.fees_cf,
297 serialize_key(tx.gas_price(), hash)?,
298 )?;
299
300 deleted.push(h);
301
302 if cascade {
303 let mut dependants = vec![];
304 let mut next_spending_id = tx.next_spending_id();
307 while let Some(spending_id) = next_spending_id {
308 next_spending_id = spending_id.next();
309 let next_txs =
310 self.mempool_txs_by_spendable_ids(&[spending_id]);
311 if next_txs.is_empty() {
312 break;
313 }
314 dependants.extend(next_txs);
315 }
316
317 for tx_id in dependants {
319 let cascade_deleted =
320 self.delete_mempool_tx(tx_id, false)?;
321 deleted.extend(cascade_deleted);
322 }
323 }
324 }
325
326 Ok(deleted)
327 }
328
329 fn mempool_txs_by_spendable_ids(
330 &self,
331 n: &[SpendingId],
332 ) -> HashSet<[u8; 32]> {
333 n.iter()
334 .filter_map(|n| {
335 match self.inner.get_cf(self.spending_id_cf, n.to_bytes()) {
336 Ok(Some(tx_id)) => tx_id.try_into().ok(),
337 _ => None,
338 }
339 })
340 .collect()
341 }
342
343 fn mempool_txs_sorted_by_fee(
344 &self,
345 ) -> Box<dyn Iterator<Item = LedgerTransaction> + '_> {
346 let iter = MemPoolIterator::new(&self.inner, self.fees_cf, self);
347
348 Box::new(iter)
349 }
350
351 fn mempool_txs_ids_sorted_by_fee(
352 &self,
353 ) -> Box<dyn Iterator<Item = (u64, [u8; 32])> + '_> {
354 let iter = MemPoolFeeIterator::new(&self.inner, self.fees_cf, true);
355
356 Box::new(iter)
357 }
358
359 fn mempool_txs_ids_sorted_by_low_fee(
360 &self,
361 ) -> Box<dyn Iterator<Item = (u64, [u8; 32])> + '_> {
362 let iter = MemPoolFeeIterator::new(&self.inner, self.fees_cf, false);
363
364 Box::new(iter)
365 }
366
367 fn mempool_expired_txs(&self, timestamp: u64) -> Result<Vec<[u8; 32]>> {
369 let mut iter = self.inner.raw_iterator_cf(self.fees_cf);
370 iter.seek_to_first();
371 let mut txs_list = vec![];
372
373 while iter.valid() {
374 if let Some(key) = iter.key() {
375 let (_, tx_id) = deserialize_key(&mut &key.to_vec()[..])?;
376
377 let tx_timestamp = u64::from_be_bytes(
378 iter.value()
379 .ok_or(error::RocksDbError::MissingIteratorValue)?
380 .try_into()
381 .map_err(|_| {
382 error::RocksDbError::InvalidTimestampData
383 })?,
384 );
385
386 if tx_timestamp <= timestamp {
387 txs_list.push(tx_id);
388 }
389 }
390
391 iter.next();
392 }
393
394 Ok(txs_list)
395 }
396
397 fn mempool_txs_ids(&self) -> Result<Vec<[u8; 32]>> {
398 let mut iter = self.inner.raw_iterator_cf(self.fees_cf);
399 iter.seek_to_last();
400
401 let mut txs_list = vec![];
402
403 while iter.valid() {
405 if let Some(key) = iter.key() {
406 let (_, tx_id) = deserialize_key(&mut &key.to_vec()[..])?;
407
408 txs_list.push(tx_id);
409 }
410
411 iter.prev();
412 }
413
414 Ok(txs_list)
415 }
416
417 fn mempool_txs_count(&self) -> usize {
418 self.inner
419 .iterator_cf(self.mempool_cf, IteratorMode::Start)
420 .count()
421 }
422}
423
424pub struct MemPoolIterator<'db, DB: DBAccess, M: Mempool> {
425 iter: MemPoolFeeIterator<'db, DB>,
426 mempool: &'db M,
427}
428
429impl<'db, DB: DBAccess, M: Mempool> MemPoolIterator<'db, DB, M> {
430 fn new(
431 db: &'db rocksdb::Transaction<DB>,
432 fees_cf: &ColumnFamily,
433 mempool: &'db M,
434 ) -> Self {
435 let iter = MemPoolFeeIterator::new(db, fees_cf, true);
436 MemPoolIterator { iter, mempool }
437 }
438}
439
440impl<DB: DBAccess, M: Mempool> Iterator for MemPoolIterator<'_, DB, M> {
441 type Item = LedgerTransaction;
442 fn next(&mut self) -> Option<Self::Item> {
443 self.iter.next().and_then(|(_, tx_id)| {
444 self.mempool.mempool_tx(tx_id).ok().flatten()
445 })
446 }
447}
448
449pub struct MemPoolFeeIterator<'db, DB: DBAccess> {
450 iter: DBRawIteratorWithThreadMode<'db, rocksdb::Transaction<'db, DB>>,
451 fee_desc: bool,
452}
453
454impl<'db, DB: DBAccess> MemPoolFeeIterator<'db, DB> {
455 fn new(
456 db: &'db rocksdb::Transaction<DB>,
457 fees_cf: &ColumnFamily,
458 fee_desc: bool,
459 ) -> Self {
460 let mut iter = db.raw_iterator_cf(fees_cf);
461 if fee_desc {
462 iter.seek_to_last();
463 } else {
464 iter.seek_to_first();
465 };
466 MemPoolFeeIterator { iter, fee_desc }
467 }
468}
469
470impl<DB: DBAccess> Iterator for MemPoolFeeIterator<'_, DB> {
471 type Item = (u64, [u8; 32]);
472 fn next(&mut self) -> Option<Self::Item> {
473 match self.iter.valid() {
474 true => {
475 if let Some(key) = self.iter.key() {
476 let (gas_price, hash) =
477 deserialize_key(&mut &key.to_vec()[..]).ok()?;
478 if self.fee_desc {
479 self.iter.prev();
480 } else {
481 self.iter.next();
482 }
483 Some((gas_price, hash))
484 } else {
485 None
486 }
487 }
488 false => None,
489 }
490 }
491}