Skip to main content

miden_client_sqlite_store/
chain_data.rs

1#![allow(clippy::items_after_statements)]
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::num::NonZeroUsize;
5use std::rc::Rc;
6use std::vec::Vec;
7
8use miden_client::Word;
9use miden_client::block::BlockHeader;
10use miden_client::crypto::{Forest, InOrderIndex, MmrPeaks};
11use miden_client::note::BlockNumber;
12use miden_client::store::{BlockRelevance, PartialBlockchainFilter, StoreError};
13use miden_client::utils::{Deserializable, Serializable};
14use rusqlite::types::Value;
15use rusqlite::{Connection, OptionalExtension, Transaction, params, params_from_iter};
16
17use super::SqliteStore;
18use crate::sql_error::SqlResultExt;
19use crate::{insert_sql, subst};
20
/// Column values for a single `block_headers` row, in the form bound to the `INSERT` statement.
///
/// Produced by `serialize_block_header`.
struct SerializedBlockHeaderData {
    /// Block number of the header, as a plain `u32`.
    block_num: u32,
    /// Byte serialization of the block header.
    header: Vec<u8>,
    /// Byte serialization of the partial blockchain peaks stored alongside the header.
    partial_blockchain_peaks: Vec<u8>,
    /// Value written to the `has_client_notes` column.
    has_client_notes: bool,
}
/// Raw column values read back from a `block_headers` row.
///
/// Produced by `parse_block_headers_columns`. The underscore-prefixed fields are read from the
/// row but are not consumed by `parse_block_header`.
struct SerializedBlockHeaderParts {
    /// Value of the `block_num` column, widened to `u64`.
    _block_num: u64,
    /// Serialized block header bytes.
    header: Vec<u8>,
    /// Serialized partial blockchain peaks bytes.
    _partial_blockchain_peaks: Vec<u8>,
    /// Value of the `has_client_notes` column.
    has_client_notes: bool,
}
33
/// Column values for a single `partial_blockchain_nodes` row, in the form bound to the `INSERT`
/// statement.
///
/// Produced by `serialize_partial_blockchain_node`.
struct SerializedPartialBlockchainNodeData {
    /// In-order index of the node, converted to `i64`.
    id: i64,
    /// Hex encoding of the node value.
    node: String,
}
/// Raw column values read back from a `partial_blockchain_nodes` row.
///
/// Produced by `parse_partial_blockchain_nodes_columns`.
struct SerializedPartialBlockchainNodeParts {
    /// Value of the `id` column, read as `u64`.
    id: u64,
    /// Value of the `node` column; parsed into a `Word` by `parse_partial_blockchain_nodes`.
    node: String,
}
42
43impl SqliteStore {
44    pub(crate) fn insert_block_header(
45        conn: &mut Connection,
46        block_header: &BlockHeader,
47        partial_blockchain_peaks: &MmrPeaks,
48        has_client_notes: bool,
49    ) -> Result<(), StoreError> {
50        let tx = conn.transaction().into_store_error()?;
51
52        Self::insert_block_header_tx(
53            &tx,
54            block_header,
55            partial_blockchain_peaks,
56            has_client_notes,
57        )?;
58
59        tx.commit().into_store_error()?;
60        Ok(())
61    }
62
63    pub(crate) fn get_block_headers(
64        conn: &mut Connection,
65        block_numbers: &BTreeSet<BlockNumber>,
66    ) -> Result<Vec<(BlockHeader, BlockRelevance)>, StoreError> {
67        let block_number_list = block_numbers
68            .iter()
69            .map(|block_number| Value::Integer(i64::from(block_number.as_u32())))
70            .collect::<Vec<Value>>();
71
72        const QUERY: &str = "SELECT block_num, header, partial_blockchain_peaks, has_client_notes FROM block_headers WHERE block_num IN rarray(?)";
73
74        conn.prepare(QUERY)
75            .into_store_error()?
76            .query_map(params![Rc::new(block_number_list)], parse_block_headers_columns)
77            .into_store_error()?
78            .map(|result| {
79                let serialized_block_header_parts: SerializedBlockHeaderParts =
80                    result.into_store_error()?;
81                parse_block_header(&serialized_block_header_parts)
82            })
83            .collect()
84    }
85
86    pub(crate) fn get_tracked_block_headers(
87        conn: &mut Connection,
88    ) -> Result<Vec<BlockHeader>, StoreError> {
89        const QUERY: &str = "SELECT block_num, header, partial_blockchain_peaks, has_client_notes FROM block_headers WHERE has_client_notes=true";
90        conn.prepare(QUERY)
91            .into_store_error()?
92            .query_map(params![], parse_block_headers_columns)
93            .into_store_error()?
94            .map(|result| {
95                let serialized_block_header_parts: SerializedBlockHeaderParts =
96                    result.into_store_error()?;
97                parse_block_header(&serialized_block_header_parts).map(|(block, _)| block)
98            })
99            .collect()
100    }
101
102    pub(crate) fn get_partial_blockchain_nodes(
103        conn: &mut Connection,
104        filter: &PartialBlockchainFilter,
105    ) -> Result<BTreeMap<InOrderIndex, Word>, StoreError> {
106        match filter {
107            PartialBlockchainFilter::All => query_partial_blockchain_nodes(
108                conn,
109                "SELECT id, node FROM partial_blockchain_nodes",
110                params![],
111            ),
112
113            PartialBlockchainFilter::List(ids) if ids.is_empty() => Ok(BTreeMap::new()),
114            PartialBlockchainFilter::List(ids) => {
115                let id_values = ids
116                    .iter()
117                    .map(|id| Value::Integer(i64::try_from(id.inner()).expect("id is a valid i64")))
118                    .collect::<Vec<_>>();
119
120                query_partial_blockchain_nodes(
121                    conn,
122                    "SELECT id, node FROM partial_blockchain_nodes WHERE id IN rarray(?)",
123                    params_from_iter([Rc::new(id_values)]),
124                )
125            },
126
127            PartialBlockchainFilter::Forest(forest) if forest.is_empty() => Ok(BTreeMap::new()),
128            PartialBlockchainFilter::Forest(forest) => {
129                let max_index = i64::try_from(forest.rightmost_in_order_index().inner())
130                    .expect("id is a valid i64");
131
132                query_partial_blockchain_nodes(
133                    conn,
134                    "SELECT id, node FROM partial_blockchain_nodes WHERE id <= ?",
135                    params![max_index],
136                )
137            },
138        }
139    }
140
141    pub(crate) fn get_partial_blockchain_peaks_by_block_num(
142        conn: &mut Connection,
143        block_num: BlockNumber,
144    ) -> Result<MmrPeaks, StoreError> {
145        const QUERY: &str =
146            "SELECT partial_blockchain_peaks FROM block_headers WHERE block_num = ?";
147
148        let partial_blockchain_peaks: Option<Vec<u8>> = conn
149            .prepare(QUERY)
150            .into_store_error()?
151            .query_row(params![block_num.as_u32()], |row| row.get::<_, Vec<u8>>(0))
152            .optional()
153            .into_store_error()?;
154
155        if let Some(partial_blockchain_peaks) = partial_blockchain_peaks {
156            return parse_partial_blockchain_peaks(block_num.as_u32(), &partial_blockchain_peaks);
157        }
158
159        Ok(MmrPeaks::new(Forest::empty(), vec![])?)
160    }
161
162    pub fn insert_partial_blockchain_nodes(
163        conn: &mut Connection,
164        nodes: &[(InOrderIndex, Word)],
165    ) -> Result<(), StoreError> {
166        let tx = conn.transaction().into_store_error()?;
167
168        Self::insert_partial_blockchain_nodes_tx(&tx, nodes)?;
169        tx.commit().into_store_error()?;
170        Ok(())
171    }
172
173    /// Inserts a list of MMR authentication nodes to the Partial Blockchain nodes table.
174    pub(crate) fn insert_partial_blockchain_nodes_tx(
175        tx: &Transaction<'_>,
176        nodes: &[(InOrderIndex, Word)],
177    ) -> Result<(), StoreError> {
178        for (index, node) in nodes {
179            insert_partial_blockchain_node(tx, *index, *node)?;
180        }
181        Ok(())
182    }
183
184    /// Inserts a block header using a [`rusqlite::Transaction`].
185    ///
186    /// If the block header exists and `has_client_notes` is `true` then the `has_client_notes`
187    /// column is updated to `true` to signify that the block now contains a relevant note.
188    pub(crate) fn insert_block_header_tx(
189        tx: &Transaction<'_>,
190        block_header: &BlockHeader,
191        partial_blockchain_peaks: &MmrPeaks,
192        has_client_notes: bool,
193    ) -> Result<(), StoreError> {
194        let partial_blockchain_peaks = partial_blockchain_peaks.peaks().to_vec();
195        let SerializedBlockHeaderData {
196            block_num,
197            header,
198            partial_blockchain_peaks,
199            has_client_notes,
200        } = serialize_block_header(block_header, &partial_blockchain_peaks, has_client_notes);
201        const QUERY: &str = insert_sql!(
202            block_headers {
203                block_num,
204                header,
205                partial_blockchain_peaks,
206                has_client_notes,
207            } | IGNORE
208        );
209        tx.execute(QUERY, params![block_num, header, partial_blockchain_peaks, has_client_notes])
210            .into_store_error()?;
211
212        set_block_header_has_client_notes(tx, u64::from(block_num), has_client_notes)?;
213        Ok(())
214    }
215
216    /// Removes block headers that do not contain any client notes and aren't the genesis or last
217    /// block.
218    pub fn prune_irrelevant_blocks(conn: &mut Connection) -> Result<(), StoreError> {
219        let tx = conn.transaction().into_store_error()?;
220        let genesis: u32 = BlockNumber::GENESIS.as_u32();
221
222        let sync_block: Option<u32> = tx
223            .query_row("SELECT block_num FROM state_sync LIMIT 1", [], |r| r.get(0))
224            .optional()
225            .into_store_error()?;
226
227        if let Some(sync_height) = sync_block {
228            tx.execute(
229                r"
230            DELETE FROM block_headers
231            WHERE has_client_notes = 0
232              AND block_num > ?1
233              AND block_num < ?2
234            ",
235                rusqlite::params![genesis, sync_height],
236            )
237            .into_store_error()?;
238        }
239
240        tx.commit().into_store_error()
241    }
242}
243
244// HELPERS
245// ================================================================================================
246
247/// Inserts a node represented by its in-order index and the node value.
248fn insert_partial_blockchain_node(
249    tx: &Transaction<'_>,
250    id: InOrderIndex,
251    node: Word,
252) -> Result<(), StoreError> {
253    let SerializedPartialBlockchainNodeData { id, node } =
254        serialize_partial_blockchain_node(id, node);
255    const QUERY: &str = insert_sql!(partial_blockchain_nodes { id, node } | IGNORE);
256    tx.execute(QUERY, params![id, node]).into_store_error()?;
257    Ok(())
258}
259
260fn query_partial_blockchain_nodes<P: rusqlite::Params>(
261    conn: &mut Connection,
262    sql: &str,
263    params: P,
264) -> Result<BTreeMap<InOrderIndex, Word>, StoreError> {
265    let mut stmt = conn.prepare_cached(sql).into_store_error()?;
266
267    stmt.query_map(params, parse_partial_blockchain_nodes_columns)
268        .into_store_error()?
269        .map(|row_res| {
270            let parts: SerializedPartialBlockchainNodeParts = row_res.into_store_error()?;
271            parse_partial_blockchain_nodes(&parts)
272        })
273        .collect()
274}
275
276fn parse_partial_blockchain_peaks(forest: u32, peaks_nodes: &[u8]) -> Result<MmrPeaks, StoreError> {
277    let mmr_peaks_nodes = Vec::<Word>::read_from_bytes(peaks_nodes)?;
278
279    MmrPeaks::new(
280        Forest::new(usize::try_from(forest).expect("u64 should fit in usize")),
281        mmr_peaks_nodes,
282    )
283    .map_err(StoreError::MmrError)
284}
285
286fn serialize_block_header(
287    block_header: &BlockHeader,
288    partial_blockchain_peaks: &[Word],
289    has_client_notes: bool,
290) -> SerializedBlockHeaderData {
291    let block_num = block_header.block_num();
292    let header = block_header.to_bytes();
293    let partial_blockchain_peaks = partial_blockchain_peaks.to_bytes();
294
295    SerializedBlockHeaderData {
296        block_num: block_num.as_u32(),
297        header,
298        partial_blockchain_peaks,
299        has_client_notes,
300    }
301}
302
303fn parse_block_headers_columns(
304    row: &rusqlite::Row<'_>,
305) -> Result<SerializedBlockHeaderParts, rusqlite::Error> {
306    let block_num: u32 = row.get(0)?;
307    let header: Vec<u8> = row.get(1)?;
308    let partial_blockchain_peaks: Vec<u8> = row.get(2)?;
309    let has_client_notes: bool = row.get(3)?;
310
311    Ok(SerializedBlockHeaderParts {
312        _block_num: u64::from(block_num),
313        header,
314        _partial_blockchain_peaks: partial_blockchain_peaks,
315        has_client_notes,
316    })
317}
318
319fn parse_block_header(
320    serialized_block_header_parts: &SerializedBlockHeaderParts,
321) -> Result<(BlockHeader, BlockRelevance), StoreError> {
322    Ok((
323        BlockHeader::read_from_bytes(&serialized_block_header_parts.header)?,
324        serialized_block_header_parts.has_client_notes.into(),
325    ))
326}
327
328fn serialize_partial_blockchain_node(
329    id: InOrderIndex,
330    node: Word,
331) -> SerializedPartialBlockchainNodeData {
332    let id = i64::try_from(id.inner()).expect("id is a valid i64");
333    let node = node.to_hex();
334    SerializedPartialBlockchainNodeData { id, node }
335}
336
337fn parse_partial_blockchain_nodes_columns(
338    row: &rusqlite::Row<'_>,
339) -> Result<SerializedPartialBlockchainNodeParts, rusqlite::Error> {
340    let id: u64 = row.get(0)?;
341    let node = row.get(1)?;
342    Ok(SerializedPartialBlockchainNodeParts { id, node })
343}
344
345fn parse_partial_blockchain_nodes(
346    serialized_partial_blockchain_node_parts: &SerializedPartialBlockchainNodeParts,
347) -> Result<(InOrderIndex, Word), StoreError> {
348    let id = InOrderIndex::new(
349        NonZeroUsize::new(
350            usize::try_from(serialized_partial_blockchain_node_parts.id)
351                .expect("id is u64, should not fail"),
352        )
353        .unwrap(),
354    );
355    let node: Word = Word::try_from(&serialized_partial_blockchain_node_parts.node)?;
356    Ok((id, node))
357}
358
359pub(crate) fn set_block_header_has_client_notes(
360    tx: &Transaction<'_>,
361    block_num: u64,
362    has_client_notes: bool,
363) -> Result<(), StoreError> {
364    // Only update to change has_client_notes to true if it was false previously
365    const QUERY: &str = "\
366        UPDATE block_headers
367        SET has_client_notes=?
368        WHERE block_num=? AND has_client_notes=FALSE;";
369    tx.execute(QUERY, params![has_client_notes, block_num]).into_store_error()?;
370    Ok(())
371}
372
373#[cfg(test)]
374mod test {
375    use std::collections::{BTreeMap, BTreeSet};
376    use std::vec::Vec;
377
378    use miden_client::Word;
379    use miden_client::block::BlockHeader;
380    use miden_client::crypto::{Forest, InOrderIndex, MmrPeaks};
381    use miden_client::store::Store;
382    use miden_protocol::crypto::merkle::mmr::Mmr;
383    use miden_protocol::transaction::TransactionKernel;
384    use rusqlite::params;
385
386    use crate::SqliteStore;
387    use crate::tests::create_test_store;
388
389    async fn insert_dummy_block_headers(store: &mut SqliteStore) -> Vec<BlockHeader> {
390        let block_headers: Vec<BlockHeader> = (0..5)
391            .map(|block_num| {
392                BlockHeader::mock(block_num, None, None, &[], TransactionKernel.to_commitment())
393            })
394            .collect();
395
396        let block_headers_clone = block_headers.clone();
397        store
398            .interact_with_connection(move |conn| {
399                let tx = conn.transaction().unwrap();
400                let dummy_peaks = MmrPeaks::new(Forest::empty(), Vec::new()).unwrap();
401                (0..5).for_each(|block_num| {
402                    SqliteStore::insert_block_header_tx(
403                        &tx,
404                        &block_headers_clone[block_num],
405                        &dummy_peaks,
406                        false,
407                    )
408                    .unwrap();
409                });
410                tx.commit().unwrap();
411                Ok(())
412            })
413            .await
414            .unwrap();
415
416        block_headers
417    }
418
419    #[tokio::test]
420    async fn insert_and_get_block_headers_by_number() {
421        let mut store = create_test_store().await;
422        let block_headers = insert_dummy_block_headers(&mut store).await;
423
424        let block_header = Store::get_block_header_by_num(&store, 3.into()).await.unwrap().unwrap();
425        assert_eq!(block_headers[3], block_header.0);
426    }
427
428    #[tokio::test]
429    async fn insert_and_get_block_headers_by_list() {
430        let mut store = create_test_store().await;
431        let mock_block_headers = insert_dummy_block_headers(&mut store).await;
432
433        let block_headers: Vec<BlockHeader> =
434            Store::get_block_headers(&store, &[1.into(), 3.into()].into_iter().collect())
435                .await
436                .unwrap()
437                .into_iter()
438                .map(|(block_header, _has_notes)| block_header)
439                .collect();
440        assert_eq!(
441            &[mock_block_headers[1].clone(), mock_block_headers[3].clone()],
442            &block_headers[..]
443        );
444    }
445
    /// Tests that large stored MMRs are built consistently throughout multiple prunes.
    ///
    /// The test stores `TOTAL_BLOCKS` mock headers plus the authentication nodes of a tracked
    /// subset, prunes at four increasing sync heights, and finally checks that the partial MMR
    /// loaded from the store has the same peaks hash as the full MMR and yields the same
    /// openings for every tracked block.
    #[tokio::test]
    async fn partial_mmr_reconstructs_after_multiple_prune() {
        // Setup (mock a large MMR to work with, with a partial tracked set)
        // ----------------------------------------------------------------------------------------

        let store = create_test_store().await;
        const TOTAL_BLOCKS: usize = 7300;

        let tx_kernel_commitment = TransactionKernel.to_commitment();
        let block_headers: Vec<BlockHeader> = (0..TOTAL_BLOCKS)
            .map(|block_num| {
                BlockHeader::mock(
                    u32::try_from(block_num).unwrap(),
                    None,
                    None,
                    &[],
                    tx_kernel_commitment,
                )
            })
            .collect();

        // Full reference MMR over every header commitment.
        let mut mmr = Mmr::default();
        for header in &block_headers {
            mmr.add(header.commitment());
        }

        // Track every 97th block below `TOTAL_BLOCKS - 1`, plus block `TOTAL_BLOCKS - 2`.
        let mut tracked_set: BTreeSet<usize> = (0..(TOTAL_BLOCKS - 1)).step_by(97).collect();
        tracked_set.insert(TOTAL_BLOCKS - 2);
        let tracked_blocks: Vec<usize> = tracked_set.iter().copied().collect();

        // For each tracked block, record its leaf and every node of its opening's merkle path.
        let mut tracked_nodes: BTreeMap<InOrderIndex, Word> = BTreeMap::new();
        for &block_num in &tracked_blocks {
            let header = &block_headers[block_num];
            tracked_nodes.insert(InOrderIndex::from_leaf_pos(block_num), header.commitment());

            let proof = mmr.open(block_num).expect("valid proof");
            let mut idx = InOrderIndex::from_leaf_pos(block_num);
            // Each path node is stored at the sibling of the current index, then the walk
            // moves up to the parent.
            for node in proof.merkle_path.nodes() {
                tracked_nodes.insert(idx.sibling(), *node);
                idx = idx.parent();
            }
        }
        let tracked_nodes: Vec<(InOrderIndex, Word)> = tracked_nodes.into_iter().collect();

        // Peaks of the reference MMR at `Forest::new(block_num)`, one entry per block.
        let peaks_by_block: Vec<MmrPeaks> = (0..TOTAL_BLOCKS)
            .map(|block_num| mmr.peaks_at(Forest::new(block_num)).expect("valid peaks"))
            .collect();

        // Save blocks and nodes
        store
            .interact_with_connection(move |conn| {
                let tx = conn.transaction().unwrap();
                for block_num in 0..TOTAL_BLOCKS {
                    // Only tracked blocks are flagged as containing client notes.
                    let has_notes = tracked_set.contains(&block_num);
                    SqliteStore::insert_block_header_tx(
                        &tx,
                        &block_headers[block_num],
                        &peaks_by_block[block_num],
                        has_notes,
                    )
                    .unwrap();
                }

                SqliteStore::insert_partial_blockchain_nodes_tx(&tx, &tracked_nodes).unwrap();
                tx.commit().unwrap();
                Ok(())
            })
            .await
            .unwrap();

        // Increasing sync heights at which pruning is exercised.
        let prune_heights = [
            TOTAL_BLOCKS / 5,
            (TOTAL_BLOCKS * 2) / 5,
            (TOTAL_BLOCKS * 3) / 5,
            TOTAL_BLOCKS - 1,
        ];

        // Tests/assertions
        // ----------------------------------------------------------------------------------------

        let mut previous_remaining: Option<i64> = None;
        for height in prune_heights {
            let height_i64 = i64::try_from(height).expect("fits in i64");

            // Update sync height to simulate having synced further
            store
                .interact_with_connection(move |conn| {
                    conn.execute("UPDATE state_sync SET block_num = ?", params![height_i64])
                        .unwrap();
                    Ok(())
                })
                .await
                .unwrap();

            // Prune
            store.prune_irrelevant_blocks().await.unwrap();

            // Assert blocks
            let remaining_headers: i64 = store
                .interact_with_connection(|conn| {
                    let count = conn
                        .query_row("SELECT COUNT(*) FROM block_headers", [], |row| row.get(0))
                        .unwrap();
                    Ok(count)
                })
                .await
                .unwrap();
            // Every prune must strictly shrink the header count: relative to the previous
            // round, or to the full block count on the first round.
            if let Some(previous) = previous_remaining {
                assert!(remaining_headers < previous);
            } else {
                assert!(remaining_headers < i64::try_from(TOTAL_BLOCKS).unwrap());
            }
            previous_remaining = Some(remaining_headers);
        }

        // Try build MMR
        let partial_mmr = Store::get_current_partial_mmr(&store).await.unwrap();
        assert_eq!(partial_mmr.peaks().hash_peaks(), mmr.peaks().hash_peaks());

        // Every tracked block must still open, and its opening must match the reference MMR's.
        for block_num in tracked_blocks {
            let partial_proof = partial_mmr.open(block_num).expect("partial mmr query succeeds");
            assert!(partial_proof.is_some());
            assert_eq!(partial_proof.unwrap(), mmr.open(block_num).unwrap());
        }
    }
572}