Skip to main content

miden_client_sqlite_store/
chain_data.rs

1#![allow(clippy::items_after_statements)]
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::num::NonZeroUsize;
5use std::rc::Rc;
6use std::vec::Vec;
7
8use miden_client::Word;
9use miden_client::block::BlockHeader;
10use miden_client::crypto::{Forest, InOrderIndex, MmrPeaks};
11use miden_client::note::BlockNumber;
12use miden_client::store::{BlockRelevance, PartialBlockchainFilter, StoreError};
13use miden_client::utils::{Deserializable, Serializable};
14use rusqlite::types::Value;
15use rusqlite::{Connection, OptionalExtension, Transaction, params, params_from_iter};
16
17use super::SqliteStore;
18use crate::sql_error::SqlResultExt;
19use crate::{insert_sql, subst};
20
21struct SerializedBlockHeaderData {
22    block_num: u32,
23    header: Vec<u8>,
24    partial_blockchain_peaks: Vec<u8>,
25    has_client_notes: bool,
26}
27struct SerializedBlockHeaderParts {
28    _block_num: u64,
29    header: Vec<u8>,
30    _partial_blockchain_peaks: Vec<u8>,
31    has_client_notes: bool,
32}
33
34struct SerializedPartialBlockchainNodeData {
35    id: i64,
36    node: String,
37}
38struct SerializedPartialBlockchainNodeParts {
39    id: u64,
40    node: String,
41}
42
43impl SqliteStore {
44    pub(crate) fn insert_block_header(
45        conn: &mut Connection,
46        block_header: &BlockHeader,
47        partial_blockchain_peaks: &MmrPeaks,
48        has_client_notes: bool,
49    ) -> Result<(), StoreError> {
50        let tx = conn.transaction().into_store_error()?;
51
52        Self::insert_block_header_tx(
53            &tx,
54            block_header,
55            partial_blockchain_peaks,
56            has_client_notes,
57        )?;
58
59        tx.commit().into_store_error()?;
60        Ok(())
61    }
62
63    pub(crate) fn get_block_headers(
64        conn: &mut Connection,
65        block_numbers: &BTreeSet<BlockNumber>,
66    ) -> Result<Vec<(BlockHeader, BlockRelevance)>, StoreError> {
67        let block_number_list = block_numbers
68            .iter()
69            .map(|block_number| Value::Integer(i64::from(block_number.as_u32())))
70            .collect::<Vec<Value>>();
71
72        const QUERY: &str = "SELECT block_num, header, partial_blockchain_peaks, has_client_notes FROM block_headers WHERE block_num IN rarray(?)";
73
74        conn.prepare(QUERY)
75            .into_store_error()?
76            .query_map(params![Rc::new(block_number_list)], parse_block_headers_columns)
77            .into_store_error()?
78            .map(|result| {
79                let serialized_block_header_parts: SerializedBlockHeaderParts =
80                    result.into_store_error()?;
81                parse_block_header(&serialized_block_header_parts)
82            })
83            .collect()
84    }
85
86    pub(crate) fn get_tracked_block_headers(
87        conn: &mut Connection,
88    ) -> Result<Vec<BlockHeader>, StoreError> {
89        const QUERY: &str = "SELECT block_num, header, partial_blockchain_peaks, has_client_notes FROM block_headers WHERE has_client_notes=true";
90        conn.prepare(QUERY)
91            .into_store_error()?
92            .query_map(params![], parse_block_headers_columns)
93            .into_store_error()?
94            .map(|result| {
95                let serialized_block_header_parts: SerializedBlockHeaderParts =
96                    result.into_store_error()?;
97                parse_block_header(&serialized_block_header_parts).map(|(block, _)| block)
98            })
99            .collect()
100    }
101
102    pub(crate) fn get_tracked_block_header_numbers(
103        conn: &mut Connection,
104    ) -> Result<BTreeSet<usize>, StoreError> {
105        const QUERY: &str = "SELECT block_num FROM block_headers WHERE has_client_notes=true";
106        conn.prepare(QUERY)
107            .into_store_error()?
108            .query_map(params![], |row| row.get::<_, u32>(0))
109            .into_store_error()?
110            .map(|result| {
111                let block_num: u32 = result.into_store_error()?;
112                Ok(block_num as usize)
113            })
114            .collect()
115    }
116
117    pub(crate) fn get_partial_blockchain_nodes(
118        conn: &mut Connection,
119        filter: &PartialBlockchainFilter,
120    ) -> Result<BTreeMap<InOrderIndex, Word>, StoreError> {
121        match filter {
122            PartialBlockchainFilter::All => query_partial_blockchain_nodes(
123                conn,
124                "SELECT id, node FROM partial_blockchain_nodes",
125                params![],
126            ),
127
128            PartialBlockchainFilter::List(ids) if ids.is_empty() => Ok(BTreeMap::new()),
129            PartialBlockchainFilter::List(ids) => {
130                let id_values = ids
131                    .iter()
132                    .map(|id| Value::Integer(i64::try_from(id.inner()).expect("id is a valid i64")))
133                    .collect::<Vec<_>>();
134
135                query_partial_blockchain_nodes(
136                    conn,
137                    "SELECT id, node FROM partial_blockchain_nodes WHERE id IN rarray(?)",
138                    params_from_iter([Rc::new(id_values)]),
139                )
140            },
141
142            PartialBlockchainFilter::Forest(forest) if forest.is_empty() => Ok(BTreeMap::new()),
143            PartialBlockchainFilter::Forest(forest) => {
144                let max_index = i64::try_from(forest.rightmost_in_order_index().inner())
145                    .expect("id is a valid i64");
146
147                query_partial_blockchain_nodes(
148                    conn,
149                    "SELECT id, node FROM partial_blockchain_nodes WHERE id <= ?",
150                    params![max_index],
151                )
152            },
153        }
154    }
155
156    pub(crate) fn get_partial_blockchain_peaks_by_block_num(
157        conn: &mut Connection,
158        block_num: BlockNumber,
159    ) -> Result<MmrPeaks, StoreError> {
160        const QUERY: &str =
161            "SELECT partial_blockchain_peaks FROM block_headers WHERE block_num = ?";
162
163        let partial_blockchain_peaks: Option<Vec<u8>> = conn
164            .prepare(QUERY)
165            .into_store_error()?
166            .query_row(params![block_num.as_u32()], |row| row.get::<_, Vec<u8>>(0))
167            .optional()
168            .into_store_error()?;
169
170        if let Some(partial_blockchain_peaks) = partial_blockchain_peaks {
171            return parse_partial_blockchain_peaks(block_num.as_u32(), &partial_blockchain_peaks);
172        }
173
174        Ok(MmrPeaks::new(Forest::empty(), vec![])?)
175    }
176
177    pub fn insert_partial_blockchain_nodes(
178        conn: &mut Connection,
179        nodes: &[(InOrderIndex, Word)],
180    ) -> Result<(), StoreError> {
181        let tx = conn.transaction().into_store_error()?;
182
183        Self::insert_partial_blockchain_nodes_tx(&tx, nodes)?;
184        tx.commit().into_store_error()?;
185        Ok(())
186    }
187
188    /// Inserts a list of MMR authentication nodes to the Partial Blockchain nodes table.
189    pub(crate) fn insert_partial_blockchain_nodes_tx(
190        tx: &Transaction<'_>,
191        nodes: &[(InOrderIndex, Word)],
192    ) -> Result<(), StoreError> {
193        for (index, node) in nodes {
194            insert_partial_blockchain_node(tx, *index, *node)?;
195        }
196        Ok(())
197    }
198
199    /// Inserts a block header using a [`rusqlite::Transaction`].
200    ///
201    /// If the block header exists and `has_client_notes` is `true` then the `has_client_notes`
202    /// column is updated to `true` to signify that the block now contains a relevant note.
203    pub(crate) fn insert_block_header_tx(
204        tx: &Transaction<'_>,
205        block_header: &BlockHeader,
206        partial_blockchain_peaks: &MmrPeaks,
207        has_client_notes: bool,
208    ) -> Result<(), StoreError> {
209        let partial_blockchain_peaks = partial_blockchain_peaks.peaks().to_vec();
210        let SerializedBlockHeaderData {
211            block_num,
212            header,
213            partial_blockchain_peaks,
214            has_client_notes,
215        } = serialize_block_header(block_header, &partial_blockchain_peaks, has_client_notes);
216        const QUERY: &str = insert_sql!(
217            block_headers {
218                block_num,
219                header,
220                partial_blockchain_peaks,
221                has_client_notes,
222            } | IGNORE
223        );
224        tx.execute(QUERY, params![block_num, header, partial_blockchain_peaks, has_client_notes])
225            .into_store_error()?;
226
227        set_block_header_has_client_notes(tx, u64::from(block_num), has_client_notes)?;
228        Ok(())
229    }
230
231    /// Removes block headers that do not contain any client notes and aren't the genesis or last
232    /// block.
233    pub fn prune_irrelevant_blocks(conn: &mut Connection) -> Result<(), StoreError> {
234        let tx = conn.transaction().into_store_error()?;
235        let genesis: u32 = BlockNumber::GENESIS.as_u32();
236
237        let sync_block: Option<u32> = tx
238            .query_row("SELECT block_num FROM state_sync LIMIT 1", [], |r| r.get(0))
239            .optional()
240            .into_store_error()?;
241
242        if let Some(sync_height) = sync_block {
243            tx.execute(
244                r"
245            DELETE FROM block_headers
246            WHERE has_client_notes = 0
247              AND block_num > ?1
248              AND block_num < ?2
249            ",
250                rusqlite::params![genesis, sync_height],
251            )
252            .into_store_error()?;
253        }
254
255        tx.commit().into_store_error()
256    }
257}
258
259// HELPERS
260// ================================================================================================
261
262/// Inserts a node represented by its in-order index and the node value.
263fn insert_partial_blockchain_node(
264    tx: &Transaction<'_>,
265    id: InOrderIndex,
266    node: Word,
267) -> Result<(), StoreError> {
268    let SerializedPartialBlockchainNodeData { id, node } =
269        serialize_partial_blockchain_node(id, node);
270    const QUERY: &str = insert_sql!(partial_blockchain_nodes { id, node } | IGNORE);
271    tx.execute(QUERY, params![id, node]).into_store_error()?;
272    Ok(())
273}
274
275fn query_partial_blockchain_nodes<P: rusqlite::Params>(
276    conn: &mut Connection,
277    sql: &str,
278    params: P,
279) -> Result<BTreeMap<InOrderIndex, Word>, StoreError> {
280    let mut stmt = conn.prepare_cached(sql).into_store_error()?;
281
282    stmt.query_map(params, parse_partial_blockchain_nodes_columns)
283        .into_store_error()?
284        .map(|row_res| {
285            let parts: SerializedPartialBlockchainNodeParts = row_res.into_store_error()?;
286            parse_partial_blockchain_nodes(&parts)
287        })
288        .collect()
289}
290
291fn parse_partial_blockchain_peaks(forest: u32, peaks_nodes: &[u8]) -> Result<MmrPeaks, StoreError> {
292    let mmr_peaks_nodes = Vec::<Word>::read_from_bytes(peaks_nodes)?;
293
294    MmrPeaks::new(
295        Forest::new(usize::try_from(forest).expect("u64 should fit in usize")),
296        mmr_peaks_nodes,
297    )
298    .map_err(StoreError::MmrError)
299}
300
301fn serialize_block_header(
302    block_header: &BlockHeader,
303    partial_blockchain_peaks: &[Word],
304    has_client_notes: bool,
305) -> SerializedBlockHeaderData {
306    let block_num = block_header.block_num();
307    let header = block_header.to_bytes();
308    let partial_blockchain_peaks = partial_blockchain_peaks.to_bytes();
309
310    SerializedBlockHeaderData {
311        block_num: block_num.as_u32(),
312        header,
313        partial_blockchain_peaks,
314        has_client_notes,
315    }
316}
317
318fn parse_block_headers_columns(
319    row: &rusqlite::Row<'_>,
320) -> Result<SerializedBlockHeaderParts, rusqlite::Error> {
321    let block_num: u32 = row.get(0)?;
322    let header: Vec<u8> = row.get(1)?;
323    let partial_blockchain_peaks: Vec<u8> = row.get(2)?;
324    let has_client_notes: bool = row.get(3)?;
325
326    Ok(SerializedBlockHeaderParts {
327        _block_num: u64::from(block_num),
328        header,
329        _partial_blockchain_peaks: partial_blockchain_peaks,
330        has_client_notes,
331    })
332}
333
334fn parse_block_header(
335    serialized_block_header_parts: &SerializedBlockHeaderParts,
336) -> Result<(BlockHeader, BlockRelevance), StoreError> {
337    Ok((
338        BlockHeader::read_from_bytes(&serialized_block_header_parts.header)?,
339        serialized_block_header_parts.has_client_notes.into(),
340    ))
341}
342
343fn serialize_partial_blockchain_node(
344    id: InOrderIndex,
345    node: Word,
346) -> SerializedPartialBlockchainNodeData {
347    let id = i64::try_from(id.inner()).expect("id is a valid i64");
348    let node = node.to_hex();
349    SerializedPartialBlockchainNodeData { id, node }
350}
351
352fn parse_partial_blockchain_nodes_columns(
353    row: &rusqlite::Row<'_>,
354) -> Result<SerializedPartialBlockchainNodeParts, rusqlite::Error> {
355    let id: u64 = row.get(0)?;
356    let node = row.get(1)?;
357    Ok(SerializedPartialBlockchainNodeParts { id, node })
358}
359
360fn parse_partial_blockchain_nodes(
361    serialized_partial_blockchain_node_parts: &SerializedPartialBlockchainNodeParts,
362) -> Result<(InOrderIndex, Word), StoreError> {
363    let id = InOrderIndex::new(
364        NonZeroUsize::new(
365            usize::try_from(serialized_partial_blockchain_node_parts.id)
366                .expect("id is u64, should not fail"),
367        )
368        .unwrap(),
369    );
370    let node: Word = Word::try_from(&serialized_partial_blockchain_node_parts.node)?;
371    Ok((id, node))
372}
373
374pub(crate) fn set_block_header_has_client_notes(
375    tx: &Transaction<'_>,
376    block_num: u64,
377    has_client_notes: bool,
378) -> Result<(), StoreError> {
379    // Only update to change has_client_notes to true if it was false previously
380    const QUERY: &str = "\
381        UPDATE block_headers
382        SET has_client_notes=?
383        WHERE block_num=? AND has_client_notes=FALSE;";
384    tx.execute(QUERY, params![has_client_notes, block_num]).into_store_error()?;
385    Ok(())
386}
387
388#[cfg(test)]
389mod test {
390    use std::collections::{BTreeMap, BTreeSet};
391    use std::vec::Vec;
392
393    use miden_client::Word;
394    use miden_client::block::BlockHeader;
395    use miden_client::crypto::{Forest, InOrderIndex, MmrPeaks};
396    use miden_client::store::Store;
397    use miden_protocol::crypto::merkle::mmr::Mmr;
398    use miden_protocol::transaction::TransactionKernel;
399    use rusqlite::params;
400
401    use crate::SqliteStore;
402    use crate::tests::create_test_store;
403
404    async fn insert_dummy_block_headers(store: &mut SqliteStore) -> Vec<BlockHeader> {
405        let block_headers: Vec<BlockHeader> = (0..5)
406            .map(|block_num| {
407                BlockHeader::mock(block_num, None, None, &[], TransactionKernel.to_commitment())
408            })
409            .collect();
410
411        let block_headers_clone = block_headers.clone();
412        store
413            .interact_with_connection(move |conn| {
414                let tx = conn.transaction().unwrap();
415                let dummy_peaks = MmrPeaks::new(Forest::empty(), Vec::new()).unwrap();
416                (0..5).for_each(|block_num| {
417                    SqliteStore::insert_block_header_tx(
418                        &tx,
419                        &block_headers_clone[block_num],
420                        &dummy_peaks,
421                        false,
422                    )
423                    .unwrap();
424                });
425                tx.commit().unwrap();
426                Ok(())
427            })
428            .await
429            .unwrap();
430
431        block_headers
432    }
433
434    #[tokio::test]
435    async fn insert_and_get_block_headers_by_number() {
436        let mut store = create_test_store().await;
437        let block_headers = insert_dummy_block_headers(&mut store).await;
438
439        let block_header = Store::get_block_header_by_num(&store, 3.into()).await.unwrap().unwrap();
440        assert_eq!(block_headers[3], block_header.0);
441    }
442
443    #[tokio::test]
444    async fn insert_and_get_block_headers_by_list() {
445        let mut store = create_test_store().await;
446        let mock_block_headers = insert_dummy_block_headers(&mut store).await;
447
448        let block_headers: Vec<BlockHeader> =
449            Store::get_block_headers(&store, &[1.into(), 3.into()].into_iter().collect())
450                .await
451                .unwrap()
452                .into_iter()
453                .map(|(block_header, _has_notes)| block_header)
454                .collect();
455        assert_eq!(
456            &[mock_block_headers[1].clone(), mock_block_headers[3].clone()],
457            &block_headers[..]
458        );
459    }
460
461    /// Tests that large stored MMRs are built consistently throughout multiple prunes
462    #[tokio::test]
463    async fn partial_mmr_reconstructs_after_multiple_prune() {
464        // Setup (mock a large MMR to work with, with a partial tracked set)
465        // ----------------------------------------------------------------------------------------
466
467        let store = create_test_store().await;
468        const TOTAL_BLOCKS: usize = 7300;
469
470        let tx_kernel_commitment = TransactionKernel.to_commitment();
471        let block_headers: Vec<BlockHeader> = (0..TOTAL_BLOCKS)
472            .map(|block_num| {
473                BlockHeader::mock(
474                    u32::try_from(block_num).unwrap(),
475                    None,
476                    None,
477                    &[],
478                    tx_kernel_commitment,
479                )
480            })
481            .collect();
482
483        let mut mmr = Mmr::default();
484        for header in &block_headers {
485            mmr.add(header.commitment());
486        }
487
488        let mut tracked_set: BTreeSet<usize> = (0..(TOTAL_BLOCKS - 1)).step_by(97).collect();
489        tracked_set.insert(TOTAL_BLOCKS - 2);
490        let tracked_blocks: Vec<usize> = tracked_set.iter().copied().collect();
491
492        let mut tracked_nodes: BTreeMap<InOrderIndex, Word> = BTreeMap::new();
493        for &block_num in &tracked_blocks {
494            let header = &block_headers[block_num];
495            tracked_nodes.insert(InOrderIndex::from_leaf_pos(block_num), header.commitment());
496
497            let proof = mmr.open(block_num).expect("valid proof");
498            let mut idx = InOrderIndex::from_leaf_pos(block_num);
499            for node in proof.merkle_path().nodes() {
500                tracked_nodes.insert(idx.sibling(), *node);
501                idx = idx.parent();
502            }
503        }
504        let tracked_nodes: Vec<(InOrderIndex, Word)> = tracked_nodes.into_iter().collect();
505
506        let peaks_by_block: Vec<MmrPeaks> = (0..TOTAL_BLOCKS)
507            .map(|block_num| mmr.peaks_at(Forest::new(block_num)).expect("valid peaks"))
508            .collect();
509
510        // Save blocks and nodes
511        store
512            .interact_with_connection(move |conn| {
513                let tx = conn.transaction().unwrap();
514                for block_num in 0..TOTAL_BLOCKS {
515                    let has_notes = tracked_set.contains(&block_num);
516                    SqliteStore::insert_block_header_tx(
517                        &tx,
518                        &block_headers[block_num],
519                        &peaks_by_block[block_num],
520                        has_notes,
521                    )
522                    .unwrap();
523                }
524
525                SqliteStore::insert_partial_blockchain_nodes_tx(&tx, &tracked_nodes).unwrap();
526                tx.commit().unwrap();
527                Ok(())
528            })
529            .await
530            .unwrap();
531
532        let prune_heights = [
533            TOTAL_BLOCKS / 5,
534            (TOTAL_BLOCKS * 2) / 5,
535            (TOTAL_BLOCKS * 3) / 5,
536            TOTAL_BLOCKS - 1,
537        ];
538
539        // Tests/assertions
540        // ----------------------------------------------------------------------------------------
541
542        let mut previous_remaining: Option<i64> = None;
543        for height in prune_heights {
544            let height_i64 = i64::try_from(height).expect("fits in i64");
545
546            // Update sync height to simulate having synced further
547            store
548                .interact_with_connection(move |conn| {
549                    conn.execute("UPDATE state_sync SET block_num = ?", params![height_i64])
550                        .unwrap();
551                    Ok(())
552                })
553                .await
554                .unwrap();
555
556            // Prune
557            store.prune_irrelevant_blocks().await.unwrap();
558
559            // Assert blocks
560            let remaining_headers: i64 = store
561                .interact_with_connection(|conn| {
562                    let count = conn
563                        .query_row("SELECT COUNT(*) FROM block_headers", [], |row| row.get(0))
564                        .unwrap();
565                    Ok(count)
566                })
567                .await
568                .unwrap();
569            if let Some(previous) = previous_remaining {
570                assert!(remaining_headers < previous);
571            } else {
572                assert!(remaining_headers < i64::try_from(TOTAL_BLOCKS).unwrap());
573            }
574            previous_remaining = Some(remaining_headers);
575        }
576
577        // Try build MMR
578        let partial_mmr = Store::get_current_partial_mmr(&store).await.unwrap();
579        assert_eq!(partial_mmr.peaks().hash_peaks(), mmr.peaks().hash_peaks());
580
581        for block_num in tracked_blocks {
582            let partial_proof = partial_mmr.open(block_num).expect("partial mmr query succeeds");
583            assert!(partial_proof.is_some());
584            assert_eq!(
585                partial_proof.unwrap().merkle_path(),
586                mmr.open(block_num).unwrap().merkle_path()
587            );
588        }
589    }
590}