Skip to main content

dusk_node/database/rocksdb/
blocks.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4//
5// Copyright (c) DUSK NETWORK. All rights reserved.
6
7use super::*;
8impl<DB: DBAccess> Ledger for DBTransaction<'_, DB> {
9    fn store_block(
10        &mut self,
11        header: &Header,
12        txs: &[SpentTransaction],
13        faults: &[Fault],
14        label: Label,
15    ) -> Result<usize> {
16        // COLUMN FAMILY: CF_LEDGER_HEADER
17        // It consists of one record per block - Header record
18        // It also includes single record to store metadata - Register record
19        {
20            let cf = self.ledger_cf;
21
22            let mut buf = vec![];
23            LightBlock {
24                header: header.clone(),
25                transactions_ids: txs.iter().map(|t| t.inner.id()).collect(),
26                faults_ids: faults.iter().map(|f| f.id()).collect(),
27            }
28            .write(&mut buf)?;
29
30            self.put_cf(cf, header.hash, buf)?;
31        }
32
33        // Update metadata values
34        self.op_write(MD_HASH_KEY, header.hash)?;
35        self.op_write(MD_STATE_ROOT_KEY, header.state_hash)?;
36
37        // COLUMN FAMILY: CF_LEDGER_TXS
38        {
39            let cf = self.ledger_txs_cf;
40
41            let mut stored_blobs = Vec::with_capacity(6);
42
43            // store all block transactions
44            for tx in txs {
45                let mut d = vec![];
46
47                if tx.inner.protocol().blob().is_some() {
48                    let mut strip_tx = tx.clone();
49                    if let Some(blobs) = strip_tx.inner.strip_blobs() {
50                        for (hash, sidecar) in blobs.into_iter() {
51                            let sidecar_bytes = sidecar.to_var_bytes();
52                            self.store_blob_data(&hash, sidecar_bytes)?;
53                            stored_blobs.push(hash);
54                        }
55                    }
56                    strip_tx.write(&mut d)?;
57                } else {
58                    tx.write(&mut d)?;
59                }
60                self.put_cf(cf, tx.inner.id(), d)?;
61            }
62
63            if !stored_blobs.is_empty() {
64                // Store all blobs hashes in the ledger
65                self.store_blobs_height(header.height, &stored_blobs)?;
66            }
67        }
68
69        // COLUMN FAMILY: CF_LEDGER_FAULTS
70        {
71            let cf = self.ledger_faults_cf;
72
73            // store all block faults
74            for f in faults {
75                let mut d = vec![];
76                f.write(&mut d)?;
77                self.put_cf(cf, f.id(), d)?;
78            }
79        }
80        self.store_block_label(header.height, &header.hash, label)?;
81
82        Ok(self.get_size())
83    }
84
85    fn faults_by_block(&self, start_height: u64) -> Result<Vec<Fault>> {
86        let mut faults = vec![];
87        let mut hash = self
88            .op_read(MD_HASH_KEY)?
89            .ok_or(error::RocksDbError::CannotReadTip)?;
90
91        loop {
92            let block = self
93                .light_block(&hash)?
94                .ok_or_else(|| error::RocksDbError::cannot_read_block(&hash))?;
95
96            let block_height = block.header.height;
97
98            if block_height >= start_height {
99                hash = block.header.prev_block_hash.to_vec();
100                faults.extend(self.faults(&block.faults_ids)?);
101            } else {
102                break;
103            }
104
105            if block_height == 0 {
106                break;
107            }
108        }
109        Ok(faults)
110    }
111
112    fn store_block_label(
113        &mut self,
114        height: u64,
115        hash: &[u8; 32],
116        label: Label,
117    ) -> Result<()> {
118        // CF: HEIGHT -> (BLOCK_HASH, BLOCK_LABEL)
119        let mut buf = vec![];
120        buf.write_all(hash)?;
121        label.write(&mut buf)?;
122
123        self.put_cf(self.ledger_height_cf, height.to_le_bytes(), buf)?;
124        Ok(())
125    }
126
127    fn delete_block(&mut self, b: &Block) -> Result<()> {
128        self.inner.delete_cf(
129            self.ledger_height_cf,
130            b.header().height.to_le_bytes(),
131        )?;
132
133        for tx in b.txs() {
134            self.inner.delete_cf(self.ledger_txs_cf, tx.id())?;
135        }
136        for f in b.faults() {
137            self.inner.delete_cf(self.ledger_faults_cf, f.id())?;
138        }
139
140        self.delete_blobs_by_height(b.header().height)?;
141        self.inner.delete_cf(self.ledger_cf, b.header().hash)?;
142
143        Ok(())
144    }
145
146    fn block_exists(&self, hash: &[u8]) -> Result<bool> {
147        Ok(self.inner.get_cf(self.ledger_cf, hash)?.is_some())
148    }
149
150    fn faults(&self, faults_ids: &[[u8; 32]]) -> Result<Vec<Fault>> {
151        if faults_ids.is_empty() {
152            return Ok(vec![]);
153        }
154        let ids = faults_ids
155            .iter()
156            .map(|id| (self.ledger_faults_cf, id))
157            .collect::<Vec<_>>();
158
159        // Retrieve all faults ID with single call
160        let faults_buffer = self.inner.multi_get_cf(ids);
161
162        let mut faults = vec![];
163        for buf in faults_buffer {
164            let buf = buf?.unwrap();
165            let fault = Fault::read(&mut &buf[..])?;
166            faults.push(fault);
167        }
168
169        Ok(faults)
170    }
171
172    fn latest_block_opt(&self) -> Result<Option<LightBlock>> {
173        let Some(tip_hash) = self.op_read(MD_HASH_KEY)? else {
174            return Ok(None);
175        };
176
177        let tip_block = self
178            .light_block(&tip_hash)?
179            .ok_or(error::RocksDbError::TipBlockMissing)?;
180        Ok(Some(tip_block))
181    }
182
183    fn latest_block(&self) -> Result<LightBlock> {
184        let tip_block = self
185            .latest_block_opt()?
186            .ok_or(error::RocksDbError::TipMetadataMissing)?;
187        Ok(tip_block)
188    }
189
190    fn blob_data_by_hash(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
191        Ok(self.inner.get_cf(self.ledger_blobs_cf, hash)?)
192    }
193
194    fn store_blob_data(&self, hash: &[u8; 32], data: Vec<u8>) -> Result<()> {
195        self.inner.put_cf(self.ledger_blobs_cf, hash, data)?;
196        Ok(())
197    }
198    fn store_blobs_height(
199        &self,
200        block_height: u64,
201        blob_hashes: &[[u8; 32]],
202    ) -> Result<()> {
203        if blob_hashes.is_empty() {
204            return Ok(());
205        }
206        let blob_hashes_bytes: Vec<_> =
207            blob_hashes.iter().flat_map(|hash| hash.to_vec()).collect();
208        self.inner.put_cf(
209            self.ledger_blobs_height_cf,
210            block_height.to_be_bytes(),
211            blob_hashes_bytes,
212        )?;
213        Ok(())
214    }
215
216    fn delete_blobs_by_height(&self, block_height: u64) -> Result<()> {
217        let blobs_to_delete = self.blobs_by_height(block_height)?;
218        if let Some(blob_hashes) = blobs_to_delete {
219            for hash in blob_hashes {
220                // What happen if the blobs also exists linked to another
221                // transaction?
222                self.inner.delete_cf(self.ledger_blobs_cf, hash)?;
223            }
224            self.inner.delete_cf(
225                self.ledger_blobs_height_cf,
226                block_height.to_be_bytes(),
227            )?;
228        }
229
230        Ok(())
231    }
232
233    fn blobs_by_height(
234        &self,
235        block_height: u64,
236    ) -> Result<Option<Vec<[u8; 32]>>> {
237        let blob_hashes_bytes = self
238            .inner
239            .get_cf(self.ledger_blobs_height_cf, block_height.to_be_bytes())?;
240
241        if let Some(blob_hashes_bytes) = blob_hashes_bytes {
242            let mut blob_hashes = vec![];
243            for chunk in blob_hashes_bytes.chunks(32) {
244                let mut hash = [0u8; 32];
245                hash.copy_from_slice(chunk);
246                blob_hashes.push(hash);
247            }
248            Ok(Some(blob_hashes))
249        } else {
250            Ok(None)
251        }
252    }
253
254    fn block(&self, hash: &[u8]) -> Result<Option<Block>> {
255        match self.inner.get_cf(self.ledger_cf, hash)? {
256            Some(blob) => {
257                let record = LightBlock::read(&mut &blob[..])?;
258
259                // Retrieve all transactions buffers with single call
260                let txs_buffers = self.inner.multi_get_cf(
261                    record
262                        .transactions_ids
263                        .iter()
264                        .map(|id| (self.ledger_txs_cf, id))
265                        .collect::<Vec<(&ColumnFamily, &[u8; 32])>>(),
266                );
267
268                let mut txs = vec![];
269                for buf in txs_buffers {
270                    let buf = buf?.unwrap();
271                    let mut tx = SpentTransaction::read(&mut &buf[..])?;
272                    if let Some(blobs) = tx.inner.blob_mut() {
273                        for blob in blobs {
274                            // Retrieve blob data from the ledger
275                            let sidecar = self
276                                .blob_data_by_hash(&blob.hash)?
277                                .map(|bytes| {
278                                    BlobSidecar::from_buf(&mut &bytes[..])
279                                })
280                                .transpose()
281                                .map_err(
282                                    error::RocksDbError::blob_sidecar_parse,
283                                )?;
284                            blob.data = sidecar;
285                        }
286                    }
287                    txs.push(tx.inner);
288                }
289
290                // Retrieve all faults ID with single call
291                let faults_buffer = self.inner.multi_get_cf(
292                    record
293                        .faults_ids
294                        .iter()
295                        .map(|id| (self.ledger_faults_cf, id))
296                        .collect::<Vec<(&ColumnFamily, &[u8; 32])>>(),
297                );
298                let mut faults = vec![];
299                for buf in faults_buffer {
300                    let buf = buf?.unwrap();
301                    let fault = Fault::read(&mut &buf[..])?;
302                    faults.push(fault);
303                }
304
305                Ok(Some(
306                    Block::new(record.header, txs, faults)
307                        .expect("block should be valid"),
308                ))
309            }
310            None => Ok(None),
311        }
312    }
313
314    fn light_block(&self, hash: &[u8]) -> Result<Option<LightBlock>> {
315        match self.inner.get_cf(self.ledger_cf, hash)? {
316            Some(blob) => {
317                let record = LightBlock::read(&mut &blob[..])?;
318                Ok(Some(record))
319            }
320            None => Ok(None),
321        }
322    }
323
324    fn block_header(&self, hash: &[u8]) -> Result<Option<Header>> {
325        match self.inner.get_cf(self.ledger_cf, hash)? {
326            Some(blob) => {
327                let record = Header::read(&mut &blob[..])?;
328                Ok(Some(record))
329            }
330            None => Ok(None),
331        }
332    }
333
334    fn block_hash_by_height(&self, height: u64) -> Result<Option<[u8; 32]>> {
335        Ok(self
336            .inner
337            .get_cf(self.ledger_height_cf, height.to_le_bytes())?
338            .map(|h| {
339                const LEN: usize = 32;
340                let mut hash = [0u8; LEN];
341                hash.copy_from_slice(&h.as_slice()[0..LEN]);
342                hash
343            }))
344    }
345
346    fn ledger_tx(&self, tx_id: &[u8]) -> Result<Option<SpentTransaction>> {
347        let tx = self
348            .inner
349            .get_cf(self.ledger_txs_cf, tx_id)?
350            .map(|blob| SpentTransaction::read(&mut &blob[..]))
351            .transpose()?;
352
353        Ok(tx)
354    }
355
356    /// Returns a list of transactions from the ledger
357    ///
358    /// This function expects a list of transaction IDs that are in the ledger.
359    ///
360    /// It will return an error if any of the transaction IDs are not found in
361    /// the ledger.
362    fn ledger_txs(
363        &self,
364        tx_ids: Vec<&[u8; 32]>,
365    ) -> Result<Vec<SpentTransaction>> {
366        let cf = self.ledger_txs_cf;
367
368        let ids = tx_ids.into_iter().map(|id| (cf, id)).collect::<Vec<_>>();
369
370        let multi_get_results = self.inner.multi_get_cf(ids);
371
372        let mut spent_transactions =
373            Vec::with_capacity(multi_get_results.len());
374        for result in multi_get_results.into_iter() {
375            let opt_blob = result.map_err(std::io::Error::other)?;
376
377            let Some(blob) = opt_blob else {
378                return Err(
379                    error::RocksDbError::MissingLedgerTransaction.into()
380                );
381            };
382
383            let stx = SpentTransaction::read(&mut &blob[..])?;
384
385            spent_transactions.push(stx);
386        }
387
388        Ok(spent_transactions)
389    }
390
391    /// Returns true if the transaction exists in the
392    /// ledger
393    ///
394    /// This is a convenience method that checks if a transaction exists in the
395    /// ledger without unmarshalling the transaction
396    fn ledger_tx_exists(&self, tx_id: &[u8]) -> Result<bool> {
397        Ok(self.inner.get_cf(self.ledger_txs_cf, tx_id)?.is_some())
398    }
399
400    fn block_by_height(&self, height: u64) -> Result<Option<Block>> {
401        let hash = self.block_hash_by_height(height)?;
402        let block = match hash {
403            Some(hash) => self.block(&hash)?,
404            None => None,
405        };
406        Ok(block)
407    }
408
409    fn block_label_by_height(
410        &self,
411        height: u64,
412    ) -> Result<Option<([u8; 32], Label)>> {
413        const HASH_LEN: usize = 32;
414        Ok(self
415            .inner
416            .get_cf(self.ledger_height_cf, height.to_le_bytes())?
417            .map(|h| {
418                let mut hash = [0u8; HASH_LEN];
419                hash.copy_from_slice(&h.as_slice()[0..HASH_LEN]);
420
421                let label_buff = h[HASH_LEN..].to_vec();
422                Label::read(&mut &label_buff[..]).map(|label| (hash, label))
423            })
424            .transpose()?)
425    }
426}