1#![allow(clippy::items_after_statements)]
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::num::NonZeroUsize;
5use std::rc::Rc;
6use std::vec::Vec;
7
8use miden_client::Word;
9use miden_client::block::BlockHeader;
10use miden_client::crypto::{Forest, InOrderIndex, MmrPeaks};
11use miden_client::note::BlockNumber;
12use miden_client::store::{BlockRelevance, PartialBlockchainFilter, StoreError};
13use miden_client::utils::{Deserializable, Serializable};
14use rusqlite::types::Value;
15use rusqlite::{Connection, OptionalExtension, Transaction, params, params_from_iter};
16
17use super::SqliteStore;
18use crate::sql_error::SqlResultExt;
19use crate::{insert_sql, subst};
20
/// Column values for one `block_headers` row, ready to be bound to an insert statement.
///
/// Produced by `serialize_block_header` and destructured in
/// `SqliteStore::insert_block_header_tx`.
struct SerializedBlockHeaderData {
    // Block number as returned by `BlockNumber::as_u32`.
    block_num: u32,
    // `BlockHeader::to_bytes` output.
    header: Vec<u8>,
    // `to_bytes` output of the peak slice (`&[Word]`).
    partial_blockchain_peaks: Vec<u8>,
    // Value bound to the `has_client_notes` column.
    has_client_notes: bool,
}
/// Column values read back from one `block_headers` row by `parse_block_headers_columns`.
///
/// The underscore-prefixed fields are filled from the row but are not read by
/// `parse_block_header`, which only uses `header` and `has_client_notes`.
struct SerializedBlockHeaderParts {
    // Column 0, widened from `u32` to `u64`.
    _block_num: u64,
    // Column 1: serialized block header bytes.
    header: Vec<u8>,
    // Column 2: serialized peaks bytes.
    _partial_blockchain_peaks: Vec<u8>,
    // Column 3.
    has_client_notes: bool,
}
33
/// Column values for one `partial_blockchain_nodes` row, ready to be bound to an insert
/// statement. Produced by `serialize_partial_blockchain_node`.
struct SerializedPartialBlockchainNodeData {
    // In-order index converted to `i64`.
    id: i64,
    // Hex encoding of the node (`Word::to_hex`).
    node: String,
}
/// Column values read back from one `partial_blockchain_nodes` row by
/// `parse_partial_blockchain_nodes_columns`; decoded by `parse_partial_blockchain_nodes`.
struct SerializedPartialBlockchainNodeParts {
    // Column 0, read as `u64`.
    id: u64,
    // Column 1, the node as stored text.
    node: String,
}
42
43impl SqliteStore {
44 pub(crate) fn insert_block_header(
45 conn: &mut Connection,
46 block_header: &BlockHeader,
47 partial_blockchain_peaks: &MmrPeaks,
48 has_client_notes: bool,
49 ) -> Result<(), StoreError> {
50 let tx = conn.transaction().into_store_error()?;
51
52 Self::insert_block_header_tx(
53 &tx,
54 block_header,
55 partial_blockchain_peaks,
56 has_client_notes,
57 )?;
58
59 tx.commit().into_store_error()?;
60 Ok(())
61 }
62
63 pub(crate) fn get_block_headers(
64 conn: &mut Connection,
65 block_numbers: &BTreeSet<BlockNumber>,
66 ) -> Result<Vec<(BlockHeader, BlockRelevance)>, StoreError> {
67 let block_number_list = block_numbers
68 .iter()
69 .map(|block_number| Value::Integer(i64::from(block_number.as_u32())))
70 .collect::<Vec<Value>>();
71
72 const QUERY: &str = "SELECT block_num, header, partial_blockchain_peaks, has_client_notes FROM block_headers WHERE block_num IN rarray(?)";
73
74 conn.prepare(QUERY)
75 .into_store_error()?
76 .query_map(params![Rc::new(block_number_list)], parse_block_headers_columns)
77 .into_store_error()?
78 .map(|result| {
79 let serialized_block_header_parts: SerializedBlockHeaderParts =
80 result.into_store_error()?;
81 parse_block_header(&serialized_block_header_parts)
82 })
83 .collect()
84 }
85
86 pub(crate) fn get_tracked_block_headers(
87 conn: &mut Connection,
88 ) -> Result<Vec<BlockHeader>, StoreError> {
89 const QUERY: &str = "SELECT block_num, header, partial_blockchain_peaks, has_client_notes FROM block_headers WHERE has_client_notes=true";
90 conn.prepare(QUERY)
91 .into_store_error()?
92 .query_map(params![], parse_block_headers_columns)
93 .into_store_error()?
94 .map(|result| {
95 let serialized_block_header_parts: SerializedBlockHeaderParts =
96 result.into_store_error()?;
97 parse_block_header(&serialized_block_header_parts).map(|(block, _)| block)
98 })
99 .collect()
100 }
101
102 pub(crate) fn get_tracked_block_header_numbers(
103 conn: &mut Connection,
104 ) -> Result<BTreeSet<usize>, StoreError> {
105 const QUERY: &str = "SELECT block_num FROM block_headers WHERE has_client_notes=true";
106 conn.prepare(QUERY)
107 .into_store_error()?
108 .query_map(params![], |row| row.get::<_, u32>(0))
109 .into_store_error()?
110 .map(|result| {
111 let block_num: u32 = result.into_store_error()?;
112 Ok(block_num as usize)
113 })
114 .collect()
115 }
116
117 pub(crate) fn get_partial_blockchain_nodes(
118 conn: &mut Connection,
119 filter: &PartialBlockchainFilter,
120 ) -> Result<BTreeMap<InOrderIndex, Word>, StoreError> {
121 match filter {
122 PartialBlockchainFilter::All => query_partial_blockchain_nodes(
123 conn,
124 "SELECT id, node FROM partial_blockchain_nodes",
125 params![],
126 ),
127
128 PartialBlockchainFilter::List(ids) if ids.is_empty() => Ok(BTreeMap::new()),
129 PartialBlockchainFilter::List(ids) => {
130 let id_values = ids
131 .iter()
132 .map(|id| Value::Integer(i64::try_from(id.inner()).expect("id is a valid i64")))
133 .collect::<Vec<_>>();
134
135 query_partial_blockchain_nodes(
136 conn,
137 "SELECT id, node FROM partial_blockchain_nodes WHERE id IN rarray(?)",
138 params_from_iter([Rc::new(id_values)]),
139 )
140 },
141
142 PartialBlockchainFilter::Forest(forest) if forest.is_empty() => Ok(BTreeMap::new()),
143 PartialBlockchainFilter::Forest(forest) => {
144 let max_index = i64::try_from(forest.rightmost_in_order_index().inner())
145 .expect("id is a valid i64");
146
147 query_partial_blockchain_nodes(
148 conn,
149 "SELECT id, node FROM partial_blockchain_nodes WHERE id <= ?",
150 params![max_index],
151 )
152 },
153 }
154 }
155
156 pub(crate) fn get_partial_blockchain_peaks_by_block_num(
157 conn: &mut Connection,
158 block_num: BlockNumber,
159 ) -> Result<MmrPeaks, StoreError> {
160 const QUERY: &str =
161 "SELECT partial_blockchain_peaks FROM block_headers WHERE block_num = ?";
162
163 let partial_blockchain_peaks: Option<Vec<u8>> = conn
164 .prepare(QUERY)
165 .into_store_error()?
166 .query_row(params![block_num.as_u32()], |row| row.get::<_, Vec<u8>>(0))
167 .optional()
168 .into_store_error()?;
169
170 if let Some(partial_blockchain_peaks) = partial_blockchain_peaks {
171 return parse_partial_blockchain_peaks(block_num.as_u32(), &partial_blockchain_peaks);
172 }
173
174 Ok(MmrPeaks::new(Forest::empty(), vec![])?)
175 }
176
177 pub fn insert_partial_blockchain_nodes(
178 conn: &mut Connection,
179 nodes: &[(InOrderIndex, Word)],
180 ) -> Result<(), StoreError> {
181 let tx = conn.transaction().into_store_error()?;
182
183 Self::insert_partial_blockchain_nodes_tx(&tx, nodes)?;
184 tx.commit().into_store_error()?;
185 Ok(())
186 }
187
188 pub(crate) fn insert_partial_blockchain_nodes_tx(
190 tx: &Transaction<'_>,
191 nodes: &[(InOrderIndex, Word)],
192 ) -> Result<(), StoreError> {
193 for (index, node) in nodes {
194 insert_partial_blockchain_node(tx, *index, *node)?;
195 }
196 Ok(())
197 }
198
199 pub(crate) fn insert_block_header_tx(
204 tx: &Transaction<'_>,
205 block_header: &BlockHeader,
206 partial_blockchain_peaks: &MmrPeaks,
207 has_client_notes: bool,
208 ) -> Result<(), StoreError> {
209 let partial_blockchain_peaks = partial_blockchain_peaks.peaks().to_vec();
210 let SerializedBlockHeaderData {
211 block_num,
212 header,
213 partial_blockchain_peaks,
214 has_client_notes,
215 } = serialize_block_header(block_header, &partial_blockchain_peaks, has_client_notes);
216 const QUERY: &str = insert_sql!(
217 block_headers {
218 block_num,
219 header,
220 partial_blockchain_peaks,
221 has_client_notes,
222 } | IGNORE
223 );
224 tx.execute(QUERY, params![block_num, header, partial_blockchain_peaks, has_client_notes])
225 .into_store_error()?;
226
227 set_block_header_has_client_notes(tx, u64::from(block_num), has_client_notes)?;
228 Ok(())
229 }
230
231 pub fn prune_irrelevant_blocks(conn: &mut Connection) -> Result<(), StoreError> {
234 let tx = conn.transaction().into_store_error()?;
235 let genesis: u32 = BlockNumber::GENESIS.as_u32();
236
237 let sync_block: Option<u32> = tx
238 .query_row("SELECT block_num FROM state_sync LIMIT 1", [], |r| r.get(0))
239 .optional()
240 .into_store_error()?;
241
242 if let Some(sync_height) = sync_block {
243 tx.execute(
244 r"
245 DELETE FROM block_headers
246 WHERE has_client_notes = 0
247 AND block_num > ?1
248 AND block_num < ?2
249 ",
250 rusqlite::params![genesis, sync_height],
251 )
252 .into_store_error()?;
253 }
254
255 tx.commit().into_store_error()
256 }
257}
258
259fn insert_partial_blockchain_node(
264 tx: &Transaction<'_>,
265 id: InOrderIndex,
266 node: Word,
267) -> Result<(), StoreError> {
268 let SerializedPartialBlockchainNodeData { id, node } =
269 serialize_partial_blockchain_node(id, node);
270 const QUERY: &str = insert_sql!(partial_blockchain_nodes { id, node } | IGNORE);
271 tx.execute(QUERY, params![id, node]).into_store_error()?;
272 Ok(())
273}
274
275fn query_partial_blockchain_nodes<P: rusqlite::Params>(
276 conn: &mut Connection,
277 sql: &str,
278 params: P,
279) -> Result<BTreeMap<InOrderIndex, Word>, StoreError> {
280 let mut stmt = conn.prepare_cached(sql).into_store_error()?;
281
282 stmt.query_map(params, parse_partial_blockchain_nodes_columns)
283 .into_store_error()?
284 .map(|row_res| {
285 let parts: SerializedPartialBlockchainNodeParts = row_res.into_store_error()?;
286 parse_partial_blockchain_nodes(&parts)
287 })
288 .collect()
289}
290
291fn parse_partial_blockchain_peaks(forest: u32, peaks_nodes: &[u8]) -> Result<MmrPeaks, StoreError> {
292 let mmr_peaks_nodes = Vec::<Word>::read_from_bytes(peaks_nodes)?;
293
294 MmrPeaks::new(
295 Forest::new(usize::try_from(forest).expect("u64 should fit in usize")),
296 mmr_peaks_nodes,
297 )
298 .map_err(StoreError::MmrError)
299}
300
301fn serialize_block_header(
302 block_header: &BlockHeader,
303 partial_blockchain_peaks: &[Word],
304 has_client_notes: bool,
305) -> SerializedBlockHeaderData {
306 let block_num = block_header.block_num();
307 let header = block_header.to_bytes();
308 let partial_blockchain_peaks = partial_blockchain_peaks.to_bytes();
309
310 SerializedBlockHeaderData {
311 block_num: block_num.as_u32(),
312 header,
313 partial_blockchain_peaks,
314 has_client_notes,
315 }
316}
317
318fn parse_block_headers_columns(
319 row: &rusqlite::Row<'_>,
320) -> Result<SerializedBlockHeaderParts, rusqlite::Error> {
321 let block_num: u32 = row.get(0)?;
322 let header: Vec<u8> = row.get(1)?;
323 let partial_blockchain_peaks: Vec<u8> = row.get(2)?;
324 let has_client_notes: bool = row.get(3)?;
325
326 Ok(SerializedBlockHeaderParts {
327 _block_num: u64::from(block_num),
328 header,
329 _partial_blockchain_peaks: partial_blockchain_peaks,
330 has_client_notes,
331 })
332}
333
334fn parse_block_header(
335 serialized_block_header_parts: &SerializedBlockHeaderParts,
336) -> Result<(BlockHeader, BlockRelevance), StoreError> {
337 Ok((
338 BlockHeader::read_from_bytes(&serialized_block_header_parts.header)?,
339 serialized_block_header_parts.has_client_notes.into(),
340 ))
341}
342
343fn serialize_partial_blockchain_node(
344 id: InOrderIndex,
345 node: Word,
346) -> SerializedPartialBlockchainNodeData {
347 let id = i64::try_from(id.inner()).expect("id is a valid i64");
348 let node = node.to_hex();
349 SerializedPartialBlockchainNodeData { id, node }
350}
351
352fn parse_partial_blockchain_nodes_columns(
353 row: &rusqlite::Row<'_>,
354) -> Result<SerializedPartialBlockchainNodeParts, rusqlite::Error> {
355 let id: u64 = row.get(0)?;
356 let node = row.get(1)?;
357 Ok(SerializedPartialBlockchainNodeParts { id, node })
358}
359
360fn parse_partial_blockchain_nodes(
361 serialized_partial_blockchain_node_parts: &SerializedPartialBlockchainNodeParts,
362) -> Result<(InOrderIndex, Word), StoreError> {
363 let id = InOrderIndex::new(
364 NonZeroUsize::new(
365 usize::try_from(serialized_partial_blockchain_node_parts.id)
366 .expect("id is u64, should not fail"),
367 )
368 .unwrap(),
369 );
370 let node: Word = Word::try_from(&serialized_partial_blockchain_node_parts.node)?;
371 Ok((id, node))
372}
373
374pub(crate) fn set_block_header_has_client_notes(
375 tx: &Transaction<'_>,
376 block_num: u64,
377 has_client_notes: bool,
378) -> Result<(), StoreError> {
379 const QUERY: &str = "\
381 UPDATE block_headers
382 SET has_client_notes=?
383 WHERE block_num=? AND has_client_notes=FALSE;";
384 tx.execute(QUERY, params![has_client_notes, block_num]).into_store_error()?;
385 Ok(())
386}
387
388#[cfg(test)]
389mod test {
390 use std::collections::{BTreeMap, BTreeSet};
391 use std::vec::Vec;
392
393 use miden_client::Word;
394 use miden_client::block::BlockHeader;
395 use miden_client::crypto::{Forest, InOrderIndex, MmrPeaks};
396 use miden_client::store::Store;
397 use miden_protocol::crypto::merkle::mmr::Mmr;
398 use miden_protocol::transaction::TransactionKernel;
399 use rusqlite::params;
400
401 use crate::SqliteStore;
402 use crate::tests::create_test_store;
403
404 async fn insert_dummy_block_headers(store: &mut SqliteStore) -> Vec<BlockHeader> {
405 let block_headers: Vec<BlockHeader> = (0..5)
406 .map(|block_num| {
407 BlockHeader::mock(block_num, None, None, &[], TransactionKernel.to_commitment())
408 })
409 .collect();
410
411 let block_headers_clone = block_headers.clone();
412 store
413 .interact_with_connection(move |conn| {
414 let tx = conn.transaction().unwrap();
415 let dummy_peaks = MmrPeaks::new(Forest::empty(), Vec::new()).unwrap();
416 (0..5).for_each(|block_num| {
417 SqliteStore::insert_block_header_tx(
418 &tx,
419 &block_headers_clone[block_num],
420 &dummy_peaks,
421 false,
422 )
423 .unwrap();
424 });
425 tx.commit().unwrap();
426 Ok(())
427 })
428 .await
429 .unwrap();
430
431 block_headers
432 }
433
434 #[tokio::test]
435 async fn insert_and_get_block_headers_by_number() {
436 let mut store = create_test_store().await;
437 let block_headers = insert_dummy_block_headers(&mut store).await;
438
439 let block_header = Store::get_block_header_by_num(&store, 3.into()).await.unwrap().unwrap();
440 assert_eq!(block_headers[3], block_header.0);
441 }
442
443 #[tokio::test]
444 async fn insert_and_get_block_headers_by_list() {
445 let mut store = create_test_store().await;
446 let mock_block_headers = insert_dummy_block_headers(&mut store).await;
447
448 let block_headers: Vec<BlockHeader> =
449 Store::get_block_headers(&store, &[1.into(), 3.into()].into_iter().collect())
450 .await
451 .unwrap()
452 .into_iter()
453 .map(|(block_header, _has_notes)| block_header)
454 .collect();
455 assert_eq!(
456 &[mock_block_headers[1].clone(), mock_block_headers[3].clone()],
457 &block_headers[..]
458 );
459 }
460
    // Inserts a long chain of headers with a sparse set of tracked blocks, prunes the
    // store at several increasing sync heights, and checks that the partial MMR rebuilt
    // from the store still matches the full MMR for every tracked block.
    #[tokio::test]
    async fn partial_mmr_reconstructs_after_multiple_prune() {
        let store = create_test_store().await;
        const TOTAL_BLOCKS: usize = 7300;

        // Build the mock chain and a full MMR over all header commitments.
        let tx_kernel_commitment = TransactionKernel.to_commitment();
        let block_headers: Vec<BlockHeader> = (0..TOTAL_BLOCKS)
            .map(|block_num| {
                BlockHeader::mock(
                    u32::try_from(block_num).unwrap(),
                    None,
                    None,
                    &[],
                    tx_kernel_commitment,
                )
            })
            .collect();

        let mut mmr = Mmr::default();
        for header in &block_headers {
            mmr.add(header.commitment());
        }

        // Track every 97th block plus the second-to-last block.
        let mut tracked_set: BTreeSet<usize> = (0..(TOTAL_BLOCKS - 1)).step_by(97).collect();
        tracked_set.insert(TOTAL_BLOCKS - 2);
        let tracked_blocks: Vec<usize> = tracked_set.iter().copied().collect();

        // For each tracked block, record its leaf node and every sibling along its
        // Merkle path, keyed by in-order index.
        let mut tracked_nodes: BTreeMap<InOrderIndex, Word> = BTreeMap::new();
        for &block_num in &tracked_blocks {
            let header = &block_headers[block_num];
            tracked_nodes.insert(InOrderIndex::from_leaf_pos(block_num), header.commitment());

            let proof = mmr.open(block_num).expect("valid proof");
            let mut idx = InOrderIndex::from_leaf_pos(block_num);
            for node in proof.merkle_path().nodes() {
                tracked_nodes.insert(idx.sibling(), *node);
                idx = idx.parent();
            }
        }
        let tracked_nodes: Vec<(InOrderIndex, Word)> = tracked_nodes.into_iter().collect();

        // peaks_by_block[n] holds the MMR peaks at forest size n.
        let peaks_by_block: Vec<MmrPeaks> = (0..TOTAL_BLOCKS)
            .map(|block_num| mmr.peaks_at(Forest::new(block_num)).expect("valid peaks"))
            .collect();

        // Insert all headers and the tracked nodes in a single transaction.
        store
            .interact_with_connection(move |conn| {
                let tx = conn.transaction().unwrap();
                for block_num in 0..TOTAL_BLOCKS {
                    let has_notes = tracked_set.contains(&block_num);
                    SqliteStore::insert_block_header_tx(
                        &tx,
                        &block_headers[block_num],
                        &peaks_by_block[block_num],
                        has_notes,
                    )
                    .unwrap();
                }

                SqliteStore::insert_partial_blockchain_nodes_tx(&tx, &tracked_nodes).unwrap();
                tx.commit().unwrap();
                Ok(())
            })
            .await
            .unwrap();

        let prune_heights = [
            TOTAL_BLOCKS / 5,
            (TOTAL_BLOCKS * 2) / 5,
            (TOTAL_BLOCKS * 3) / 5,
            TOTAL_BLOCKS - 1,
        ];

        // Number of headers left after the previous prune; `None` before the first one.
        let mut previous_remaining: Option<i64> = None;
        for height in prune_heights {
            let height_i64 = i64::try_from(height).expect("fits in i64");

            // Move the recorded sync height forward so the next prune has a new upper bound.
            store
                .interact_with_connection(move |conn| {
                    conn.execute("UPDATE state_sync SET block_num = ?", params![height_i64])
                        .unwrap();
                    Ok(())
                })
                .await
                .unwrap();

            store.prune_irrelevant_blocks().await.unwrap();

            // Every prune step must strictly reduce the number of stored headers.
            let remaining_headers: i64 = store
                .interact_with_connection(|conn| {
                    let count = conn
                        .query_row("SELECT COUNT(*) FROM block_headers", [], |row| row.get(0))
                        .unwrap();
                    Ok(count)
                })
                .await
                .unwrap();
            if let Some(previous) = previous_remaining {
                assert!(remaining_headers < previous);
            } else {
                assert!(remaining_headers < i64::try_from(TOTAL_BLOCKS).unwrap());
            }
            previous_remaining = Some(remaining_headers);
        }

        // The partial MMR rebuilt from the pruned store must have the same peaks as the
        // full MMR.
        let partial_mmr = Store::get_current_partial_mmr(&store).await.unwrap();
        assert_eq!(partial_mmr.peaks().hash_peaks(), mmr.peaks().hash_peaks());

        // Every tracked block must still open to the same Merkle path as in the full MMR.
        for block_num in tracked_blocks {
            let partial_proof = partial_mmr.open(block_num).expect("partial mmr query succeeds");
            assert!(partial_proof.is_some());
            assert_eq!(
                partial_proof.unwrap().merkle_path(),
                mmr.open(block_num).unwrap().merkle_path()
            );
        }
    }
590}