dusk_node/database/rocksdb/
blocks.rs1use super::*;
8impl<DB: DBAccess> Ledger for DBTransaction<'_, DB> {
9 fn store_block(
10 &mut self,
11 header: &Header,
12 txs: &[SpentTransaction],
13 faults: &[Fault],
14 label: Label,
15 ) -> Result<usize> {
16 {
20 let cf = self.ledger_cf;
21
22 let mut buf = vec![];
23 LightBlock {
24 header: header.clone(),
25 transactions_ids: txs.iter().map(|t| t.inner.id()).collect(),
26 faults_ids: faults.iter().map(|f| f.id()).collect(),
27 }
28 .write(&mut buf)?;
29
30 self.put_cf(cf, header.hash, buf)?;
31 }
32
33 self.op_write(MD_HASH_KEY, header.hash)?;
35 self.op_write(MD_STATE_ROOT_KEY, header.state_hash)?;
36
37 {
39 let cf = self.ledger_txs_cf;
40
41 let mut stored_blobs = Vec::with_capacity(6);
42
43 for tx in txs {
45 let mut d = vec![];
46
47 if tx.inner.protocol().blob().is_some() {
48 let mut strip_tx = tx.clone();
49 if let Some(blobs) = strip_tx.inner.strip_blobs() {
50 for (hash, sidecar) in blobs.into_iter() {
51 let sidecar_bytes = sidecar.to_var_bytes();
52 self.store_blob_data(&hash, sidecar_bytes)?;
53 stored_blobs.push(hash);
54 }
55 }
56 strip_tx.write(&mut d)?;
57 } else {
58 tx.write(&mut d)?;
59 }
60 self.put_cf(cf, tx.inner.id(), d)?;
61 }
62
63 if !stored_blobs.is_empty() {
64 self.store_blobs_height(header.height, &stored_blobs)?;
66 }
67 }
68
69 {
71 let cf = self.ledger_faults_cf;
72
73 for f in faults {
75 let mut d = vec![];
76 f.write(&mut d)?;
77 self.put_cf(cf, f.id(), d)?;
78 }
79 }
80 self.store_block_label(header.height, &header.hash, label)?;
81
82 Ok(self.get_size())
83 }
84
85 fn faults_by_block(&self, start_height: u64) -> Result<Vec<Fault>> {
86 let mut faults = vec![];
87 let mut hash = self
88 .op_read(MD_HASH_KEY)?
89 .ok_or(error::RocksDbError::CannotReadTip)?;
90
91 loop {
92 let block = self
93 .light_block(&hash)?
94 .ok_or_else(|| error::RocksDbError::cannot_read_block(&hash))?;
95
96 let block_height = block.header.height;
97
98 if block_height >= start_height {
99 hash = block.header.prev_block_hash.to_vec();
100 faults.extend(self.faults(&block.faults_ids)?);
101 } else {
102 break;
103 }
104
105 if block_height == 0 {
106 break;
107 }
108 }
109 Ok(faults)
110 }
111
112 fn store_block_label(
113 &mut self,
114 height: u64,
115 hash: &[u8; 32],
116 label: Label,
117 ) -> Result<()> {
118 let mut buf = vec![];
120 buf.write_all(hash)?;
121 label.write(&mut buf)?;
122
123 self.put_cf(self.ledger_height_cf, height.to_le_bytes(), buf)?;
124 Ok(())
125 }
126
127 fn delete_block(&mut self, b: &Block) -> Result<()> {
128 self.inner.delete_cf(
129 self.ledger_height_cf,
130 b.header().height.to_le_bytes(),
131 )?;
132
133 for tx in b.txs() {
134 self.inner.delete_cf(self.ledger_txs_cf, tx.id())?;
135 }
136 for f in b.faults() {
137 self.inner.delete_cf(self.ledger_faults_cf, f.id())?;
138 }
139
140 self.delete_blobs_by_height(b.header().height)?;
141 self.inner.delete_cf(self.ledger_cf, b.header().hash)?;
142
143 Ok(())
144 }
145
146 fn block_exists(&self, hash: &[u8]) -> Result<bool> {
147 Ok(self.inner.get_cf(self.ledger_cf, hash)?.is_some())
148 }
149
150 fn faults(&self, faults_ids: &[[u8; 32]]) -> Result<Vec<Fault>> {
151 if faults_ids.is_empty() {
152 return Ok(vec![]);
153 }
154 let ids = faults_ids
155 .iter()
156 .map(|id| (self.ledger_faults_cf, id))
157 .collect::<Vec<_>>();
158
159 let faults_buffer = self.inner.multi_get_cf(ids);
161
162 let mut faults = vec![];
163 for buf in faults_buffer {
164 let buf = buf?.unwrap();
165 let fault = Fault::read(&mut &buf[..])?;
166 faults.push(fault);
167 }
168
169 Ok(faults)
170 }
171
172 fn latest_block_opt(&self) -> Result<Option<LightBlock>> {
173 let Some(tip_hash) = self.op_read(MD_HASH_KEY)? else {
174 return Ok(None);
175 };
176
177 let tip_block = self
178 .light_block(&tip_hash)?
179 .ok_or(error::RocksDbError::TipBlockMissing)?;
180 Ok(Some(tip_block))
181 }
182
183 fn latest_block(&self) -> Result<LightBlock> {
184 let tip_block = self
185 .latest_block_opt()?
186 .ok_or(error::RocksDbError::TipMetadataMissing)?;
187 Ok(tip_block)
188 }
189
190 fn blob_data_by_hash(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
191 Ok(self.inner.get_cf(self.ledger_blobs_cf, hash)?)
192 }
193
194 fn store_blob_data(&self, hash: &[u8; 32], data: Vec<u8>) -> Result<()> {
195 self.inner.put_cf(self.ledger_blobs_cf, hash, data)?;
196 Ok(())
197 }
198 fn store_blobs_height(
199 &self,
200 block_height: u64,
201 blob_hashes: &[[u8; 32]],
202 ) -> Result<()> {
203 if blob_hashes.is_empty() {
204 return Ok(());
205 }
206 let blob_hashes_bytes: Vec<_> =
207 blob_hashes.iter().flat_map(|hash| hash.to_vec()).collect();
208 self.inner.put_cf(
209 self.ledger_blobs_height_cf,
210 block_height.to_be_bytes(),
211 blob_hashes_bytes,
212 )?;
213 Ok(())
214 }
215
216 fn delete_blobs_by_height(&self, block_height: u64) -> Result<()> {
217 let blobs_to_delete = self.blobs_by_height(block_height)?;
218 if let Some(blob_hashes) = blobs_to_delete {
219 for hash in blob_hashes {
220 self.inner.delete_cf(self.ledger_blobs_cf, hash)?;
223 }
224 self.inner.delete_cf(
225 self.ledger_blobs_height_cf,
226 block_height.to_be_bytes(),
227 )?;
228 }
229
230 Ok(())
231 }
232
233 fn blobs_by_height(
234 &self,
235 block_height: u64,
236 ) -> Result<Option<Vec<[u8; 32]>>> {
237 let blob_hashes_bytes = self
238 .inner
239 .get_cf(self.ledger_blobs_height_cf, block_height.to_be_bytes())?;
240
241 if let Some(blob_hashes_bytes) = blob_hashes_bytes {
242 let mut blob_hashes = vec![];
243 for chunk in blob_hashes_bytes.chunks(32) {
244 let mut hash = [0u8; 32];
245 hash.copy_from_slice(chunk);
246 blob_hashes.push(hash);
247 }
248 Ok(Some(blob_hashes))
249 } else {
250 Ok(None)
251 }
252 }
253
254 fn block(&self, hash: &[u8]) -> Result<Option<Block>> {
255 match self.inner.get_cf(self.ledger_cf, hash)? {
256 Some(blob) => {
257 let record = LightBlock::read(&mut &blob[..])?;
258
259 let txs_buffers = self.inner.multi_get_cf(
261 record
262 .transactions_ids
263 .iter()
264 .map(|id| (self.ledger_txs_cf, id))
265 .collect::<Vec<(&ColumnFamily, &[u8; 32])>>(),
266 );
267
268 let mut txs = vec![];
269 for buf in txs_buffers {
270 let buf = buf?.unwrap();
271 let mut tx = SpentTransaction::read(&mut &buf[..])?;
272 if let Some(blobs) = tx.inner.blob_mut() {
273 for blob in blobs {
274 let sidecar = self
276 .blob_data_by_hash(&blob.hash)?
277 .map(|bytes| {
278 BlobSidecar::from_buf(&mut &bytes[..])
279 })
280 .transpose()
281 .map_err(
282 error::RocksDbError::blob_sidecar_parse,
283 )?;
284 blob.data = sidecar;
285 }
286 }
287 txs.push(tx.inner);
288 }
289
290 let faults_buffer = self.inner.multi_get_cf(
292 record
293 .faults_ids
294 .iter()
295 .map(|id| (self.ledger_faults_cf, id))
296 .collect::<Vec<(&ColumnFamily, &[u8; 32])>>(),
297 );
298 let mut faults = vec![];
299 for buf in faults_buffer {
300 let buf = buf?.unwrap();
301 let fault = Fault::read(&mut &buf[..])?;
302 faults.push(fault);
303 }
304
305 Ok(Some(
306 Block::new(record.header, txs, faults)
307 .expect("block should be valid"),
308 ))
309 }
310 None => Ok(None),
311 }
312 }
313
314 fn light_block(&self, hash: &[u8]) -> Result<Option<LightBlock>> {
315 match self.inner.get_cf(self.ledger_cf, hash)? {
316 Some(blob) => {
317 let record = LightBlock::read(&mut &blob[..])?;
318 Ok(Some(record))
319 }
320 None => Ok(None),
321 }
322 }
323
324 fn block_header(&self, hash: &[u8]) -> Result<Option<Header>> {
325 match self.inner.get_cf(self.ledger_cf, hash)? {
326 Some(blob) => {
327 let record = Header::read(&mut &blob[..])?;
328 Ok(Some(record))
329 }
330 None => Ok(None),
331 }
332 }
333
334 fn block_hash_by_height(&self, height: u64) -> Result<Option<[u8; 32]>> {
335 Ok(self
336 .inner
337 .get_cf(self.ledger_height_cf, height.to_le_bytes())?
338 .map(|h| {
339 const LEN: usize = 32;
340 let mut hash = [0u8; LEN];
341 hash.copy_from_slice(&h.as_slice()[0..LEN]);
342 hash
343 }))
344 }
345
346 fn ledger_tx(&self, tx_id: &[u8]) -> Result<Option<SpentTransaction>> {
347 let tx = self
348 .inner
349 .get_cf(self.ledger_txs_cf, tx_id)?
350 .map(|blob| SpentTransaction::read(&mut &blob[..]))
351 .transpose()?;
352
353 Ok(tx)
354 }
355
356 fn ledger_txs(
363 &self,
364 tx_ids: Vec<&[u8; 32]>,
365 ) -> Result<Vec<SpentTransaction>> {
366 let cf = self.ledger_txs_cf;
367
368 let ids = tx_ids.into_iter().map(|id| (cf, id)).collect::<Vec<_>>();
369
370 let multi_get_results = self.inner.multi_get_cf(ids);
371
372 let mut spent_transactions =
373 Vec::with_capacity(multi_get_results.len());
374 for result in multi_get_results.into_iter() {
375 let opt_blob = result.map_err(std::io::Error::other)?;
376
377 let Some(blob) = opt_blob else {
378 return Err(
379 error::RocksDbError::MissingLedgerTransaction.into()
380 );
381 };
382
383 let stx = SpentTransaction::read(&mut &blob[..])?;
384
385 spent_transactions.push(stx);
386 }
387
388 Ok(spent_transactions)
389 }
390
391 fn ledger_tx_exists(&self, tx_id: &[u8]) -> Result<bool> {
397 Ok(self.inner.get_cf(self.ledger_txs_cf, tx_id)?.is_some())
398 }
399
400 fn block_by_height(&self, height: u64) -> Result<Option<Block>> {
401 let hash = self.block_hash_by_height(height)?;
402 let block = match hash {
403 Some(hash) => self.block(&hash)?,
404 None => None,
405 };
406 Ok(block)
407 }
408
409 fn block_label_by_height(
410 &self,
411 height: u64,
412 ) -> Result<Option<([u8; 32], Label)>> {
413 const HASH_LEN: usize = 32;
414 Ok(self
415 .inner
416 .get_cf(self.ledger_height_cf, height.to_le_bytes())?
417 .map(|h| {
418 let mut hash = [0u8; HASH_LEN];
419 hash.copy_from_slice(&h.as_slice()[0..HASH_LEN]);
420
421 let label_buff = h[HASH_LEN..].to_vec();
422 Label::read(&mut &label_buff[..]).map(|label| (hash, label))
423 })
424 .transpose()?)
425 }
426}