miden_client_sqlite_store/
lib.rs

//! SQLite-backed `Store` implementation for miden-client.
//! This crate provides `SqliteStore` and its full implementation.
//!
//! [`SqliteStore`] enables the persistence of accounts, transactions, notes, block headers, and MMR
//! nodes using an `SQLite` database.
6
7use std::boxed::Box;
8use std::collections::{BTreeMap, BTreeSet};
9use std::path::PathBuf;
10use std::string::{String, ToString};
11use std::sync::{Arc, RwLock};
12use std::vec::Vec;
13
14use db_management::pool_manager::{Pool, SqlitePoolManager};
15use db_management::utils::{
16    apply_migrations,
17    get_setting,
18    list_setting_keys,
19    remove_setting,
20    set_setting,
21};
22use miden_client::Word;
23use miden_client::account::{
24    Account,
25    AccountCode,
26    AccountHeader,
27    AccountId,
28    AccountIdPrefix,
29    AccountStorage,
30    Address,
31};
32use miden_client::asset::{Asset, AssetVault, AssetWitness};
33use miden_client::block::BlockHeader;
34use miden_client::crypto::{InOrderIndex, MerkleStore, MmrPeaks};
35use miden_client::note::{BlockNumber, NoteScript, NoteTag, Nullifier};
36use miden_client::store::{
37    AccountRecord,
38    AccountStatus,
39    BlockRelevance,
40    InputNoteRecord,
41    NoteFilter,
42    OutputNoteRecord,
43    PartialBlockchainFilter,
44    Store,
45    StoreError,
46    TransactionFilter,
47};
48use miden_client::sync::{NoteTagRecord, StateSyncUpdate};
49use miden_client::transaction::{TransactionRecord, TransactionStoreUpdate};
50use miden_objects::account::StorageMapWitness;
51use rusqlite::Connection;
52use rusqlite::types::Value;
53use sql_error::SqlResultExt;
54
55use crate::merkle_store::{insert_asset_nodes, insert_storage_map_nodes};
56
57mod account;
58mod builder;
59mod chain_data;
60mod db_management;
61mod merkle_store;
62mod note;
63mod sql_error;
64mod sync;
65mod transaction;
66
67pub use builder::ClientBuilderSqliteExt;
68
69// SQLITE STORE
70// ================================================================================================
71
/// Represents a pool of connections with an `SQLite` database. The pool is used to interact
/// concurrently with the underlying database in a safe and efficient manner.
///
/// Alongside the pool, the store keeps an in-memory Merkle store behind a shared lock; the
/// constructor seeds it with the asset and storage map nodes of every account in the database.
///
/// Current table definitions can be found at `store.sql` migration file.
pub struct SqliteStore {
    // Connection pool from which every database interaction obtains its connection.
    pub(crate) pool: Pool,
    // Shared handle to the in-memory Merkle store; clones of this handle are moved into the
    // closures that apply updates or build asset / storage map witnesses.
    merkle_store: Arc<RwLock<MerkleStore>>,
}
80
81impl SqliteStore {
82    // CONSTRUCTORS
83    // --------------------------------------------------------------------------------------------
84
85    /// Returns a new instance of [Store] instantiated with the specified configuration options.
86    pub async fn new(database_filepath: PathBuf) -> Result<Self, StoreError> {
87        let sqlite_pool_manager = SqlitePoolManager::new(database_filepath);
88        let pool = Pool::builder(sqlite_pool_manager)
89            .build()
90            .map_err(|e| StoreError::DatabaseError(e.to_string()))?;
91
92        let conn = pool.get().await.map_err(|e| StoreError::DatabaseError(e.to_string()))?;
93
94        conn.interact(apply_migrations)
95            .await
96            .map_err(|e| StoreError::DatabaseError(e.to_string()))?
97            .map_err(|e| StoreError::DatabaseError(e.to_string()))?;
98
99        let store = SqliteStore {
100            pool,
101            merkle_store: Arc::new(RwLock::new(MerkleStore::new())),
102        };
103
104        // Initialize merkle store
105        for id in store.get_account_ids().await? {
106            let vault = store.get_account_vault(id).await?;
107            let storage = store.get_account_storage(id).await?;
108
109            let mut merkle_store =
110                store.merkle_store.write().expect("merkle_store write lock not poisoned");
111            insert_asset_nodes(&mut merkle_store, &vault);
112            insert_storage_map_nodes(&mut merkle_store, &storage);
113        }
114
115        Ok(store)
116    }
117
118    /// Interacts with the database by executing the provided function on a connection from the
119    /// pool.
120    ///
121    /// This function is a helper method which simplifies the process of making queries to the
122    /// database. It acquires a connection from the pool and executes the provided function,
123    /// returning the result.
124    async fn interact_with_connection<F, R>(&self, f: F) -> Result<R, StoreError>
125    where
126        F: FnOnce(&mut Connection) -> Result<R, StoreError> + Send + 'static,
127        R: Send + 'static,
128    {
129        self.pool
130            .get()
131            .await
132            .map_err(|err| StoreError::DatabaseError(err.to_string()))?
133            .interact(f)
134            .await
135            .map_err(|err| StoreError::DatabaseError(err.to_string()))?
136    }
137}
138
139// SQLite implementation of the Store trait
140//
141// To simplify, all implementations rely on inner SqliteStore functions that map 1:1 by name
142// This way, the actual implementations are grouped by entity types in their own sub-modules
143#[async_trait::async_trait]
144impl Store for SqliteStore {
145    fn get_current_timestamp(&self) -> Option<u64> {
146        Some(current_timestamp_u64())
147    }
148
149    async fn get_note_tags(&self) -> Result<Vec<NoteTagRecord>, StoreError> {
150        self.interact_with_connection(SqliteStore::get_note_tags).await
151    }
152
153    async fn get_unique_note_tags(&self) -> Result<BTreeSet<NoteTag>, StoreError> {
154        self.interact_with_connection(SqliteStore::get_unique_note_tags).await
155    }
156
157    async fn add_note_tag(&self, tag: NoteTagRecord) -> Result<bool, StoreError> {
158        self.interact_with_connection(move |conn| SqliteStore::add_note_tag(conn, tag))
159            .await
160    }
161
162    async fn remove_note_tag(&self, tag: NoteTagRecord) -> Result<usize, StoreError> {
163        self.interact_with_connection(move |conn| SqliteStore::remove_note_tag(conn, tag))
164            .await
165    }
166
167    async fn get_sync_height(&self) -> Result<BlockNumber, StoreError> {
168        self.interact_with_connection(SqliteStore::get_sync_height).await
169    }
170
171    async fn apply_state_sync(&self, state_sync_update: StateSyncUpdate) -> Result<(), StoreError> {
172        let merkle_store = self.merkle_store.clone();
173        self.interact_with_connection(move |conn| {
174            SqliteStore::apply_state_sync(conn, &merkle_store, state_sync_update)
175        })
176        .await
177    }
178
179    async fn get_transactions(
180        &self,
181        transaction_filter: TransactionFilter,
182    ) -> Result<Vec<TransactionRecord>, StoreError> {
183        self.interact_with_connection(move |conn| {
184            SqliteStore::get_transactions(conn, &transaction_filter)
185        })
186        .await
187    }
188
189    async fn apply_transaction(&self, tx_update: TransactionStoreUpdate) -> Result<(), StoreError> {
190        let merkle_store = self.merkle_store.clone();
191        self.interact_with_connection(move |conn| {
192            SqliteStore::apply_transaction(conn, &merkle_store, &tx_update)
193        })
194        .await
195    }
196
197    async fn get_input_notes(
198        &self,
199        filter: NoteFilter,
200    ) -> Result<Vec<InputNoteRecord>, StoreError> {
201        self.interact_with_connection(move |conn| SqliteStore::get_input_notes(conn, &filter))
202            .await
203    }
204
205    async fn get_output_notes(
206        &self,
207        note_filter: NoteFilter,
208    ) -> Result<Vec<OutputNoteRecord>, StoreError> {
209        self.interact_with_connection(move |conn| SqliteStore::get_output_notes(conn, &note_filter))
210            .await
211    }
212
213    async fn upsert_input_notes(&self, notes: &[InputNoteRecord]) -> Result<(), StoreError> {
214        let notes = notes.to_vec();
215        self.interact_with_connection(move |conn| SqliteStore::upsert_input_notes(conn, &notes))
216            .await
217    }
218
219    async fn get_note_script(&self, script_root: Word) -> Result<NoteScript, StoreError> {
220        self.interact_with_connection(move |conn| SqliteStore::get_note_script(conn, script_root))
221            .await
222    }
223
224    async fn upsert_note_scripts(&self, note_scripts: &[NoteScript]) -> Result<(), StoreError> {
225        let note_scripts = note_scripts.to_vec();
226        self.interact_with_connection(move |conn| {
227            SqliteStore::upsert_note_scripts(conn, &note_scripts)
228        })
229        .await
230    }
231
232    async fn insert_block_header(
233        &self,
234        block_header: &BlockHeader,
235        partial_blockchain_peaks: MmrPeaks,
236        has_client_notes: bool,
237    ) -> Result<(), StoreError> {
238        let block_header = block_header.clone();
239        self.interact_with_connection(move |conn| {
240            SqliteStore::insert_block_header(
241                conn,
242                &block_header,
243                &partial_blockchain_peaks,
244                has_client_notes,
245            )
246        })
247        .await
248    }
249
250    async fn prune_irrelevant_blocks(&self) -> Result<(), StoreError> {
251        self.interact_with_connection(SqliteStore::prune_irrelevant_blocks).await
252    }
253
254    async fn get_block_headers(
255        &self,
256        block_numbers: &BTreeSet<BlockNumber>,
257    ) -> Result<Vec<(BlockHeader, BlockRelevance)>, StoreError> {
258        let block_numbers = block_numbers.clone();
259        Ok(self
260            .interact_with_connection(move |conn| {
261                SqliteStore::get_block_headers(conn, &block_numbers)
262            })
263            .await?)
264    }
265
266    async fn get_tracked_block_headers(&self) -> Result<Vec<BlockHeader>, StoreError> {
267        self.interact_with_connection(SqliteStore::get_tracked_block_headers).await
268    }
269
270    async fn get_partial_blockchain_nodes(
271        &self,
272        filter: PartialBlockchainFilter,
273    ) -> Result<BTreeMap<InOrderIndex, Word>, StoreError> {
274        self.interact_with_connection(move |conn| {
275            SqliteStore::get_partial_blockchain_nodes(conn, &filter)
276        })
277        .await
278    }
279
280    async fn insert_partial_blockchain_nodes(
281        &self,
282        nodes: &[(InOrderIndex, Word)],
283    ) -> Result<(), StoreError> {
284        let nodes = nodes.to_vec();
285        self.interact_with_connection(move |conn| {
286            SqliteStore::insert_partial_blockchain_nodes(conn, &nodes)
287        })
288        .await
289    }
290
291    async fn get_partial_blockchain_peaks_by_block_num(
292        &self,
293        block_num: BlockNumber,
294    ) -> Result<MmrPeaks, StoreError> {
295        self.interact_with_connection(move |conn| {
296            SqliteStore::get_partial_blockchain_peaks_by_block_num(conn, block_num)
297        })
298        .await
299    }
300
301    async fn insert_account(
302        &self,
303        account: &Account,
304        initial_address: Address,
305    ) -> Result<(), StoreError> {
306        let cloned_account = account.clone();
307        let merkle_store = self.merkle_store.clone();
308
309        self.interact_with_connection(move |conn| {
310            SqliteStore::insert_account(conn, &merkle_store, &cloned_account, &initial_address)
311        })
312        .await
313    }
314
315    async fn update_account(&self, account: &Account) -> Result<(), StoreError> {
316        let cloned_account = account.clone();
317        let merkle_store = self.merkle_store.clone();
318
319        self.interact_with_connection(move |conn| {
320            SqliteStore::update_account(conn, &merkle_store, &cloned_account)
321        })
322        .await
323    }
324
325    async fn get_account_ids(&self) -> Result<Vec<AccountId>, StoreError> {
326        self.interact_with_connection(SqliteStore::get_account_ids).await
327    }
328
329    async fn get_account_headers(&self) -> Result<Vec<(AccountHeader, AccountStatus)>, StoreError> {
330        self.interact_with_connection(SqliteStore::get_account_headers).await
331    }
332
333    async fn get_account_header(
334        &self,
335        account_id: AccountId,
336    ) -> Result<Option<(AccountHeader, AccountStatus)>, StoreError> {
337        self.interact_with_connection(move |conn| SqliteStore::get_account_header(conn, account_id))
338            .await
339    }
340
341    async fn get_account_header_by_commitment(
342        &self,
343        account_commitment: Word,
344    ) -> Result<Option<AccountHeader>, StoreError> {
345        self.interact_with_connection(move |conn| {
346            SqliteStore::get_account_header_by_commitment(conn, account_commitment)
347        })
348        .await
349    }
350
351    async fn get_account(
352        &self,
353        account_id: AccountId,
354    ) -> Result<Option<AccountRecord>, StoreError> {
355        self.interact_with_connection(move |conn| SqliteStore::get_account(conn, account_id))
356            .await
357    }
358
359    async fn upsert_foreign_account_code(
360        &self,
361        account_id: AccountId,
362        code: AccountCode,
363    ) -> Result<(), StoreError> {
364        self.interact_with_connection(move |conn| {
365            SqliteStore::upsert_foreign_account_code(conn, account_id, &code)
366        })
367        .await
368    }
369
370    async fn get_foreign_account_code(
371        &self,
372        account_ids: Vec<AccountId>,
373    ) -> Result<BTreeMap<AccountId, AccountCode>, StoreError> {
374        self.interact_with_connection(move |conn| {
375            SqliteStore::get_foreign_account_code(conn, account_ids)
376        })
377        .await
378    }
379
380    async fn set_setting(&self, key: String, value: Vec<u8>) -> Result<(), StoreError> {
381        self.interact_with_connection(move |conn| {
382            set_setting(conn, &key, &value).into_store_error()
383        })
384        .await
385    }
386
387    async fn get_setting(&self, key: String) -> Result<Option<Vec<u8>>, StoreError> {
388        self.interact_with_connection(move |conn| get_setting(conn, &key)).await
389    }
390
391    async fn remove_setting(&self, key: String) -> Result<(), StoreError> {
392        self.interact_with_connection(move |conn| remove_setting(conn, &key)).await
393    }
394
395    async fn list_setting_keys(&self) -> Result<Vec<String>, StoreError> {
396        self.interact_with_connection(move |conn| list_setting_keys(conn)).await
397    }
398
399    async fn get_unspent_input_note_nullifiers(&self) -> Result<Vec<Nullifier>, StoreError> {
400        self.interact_with_connection(SqliteStore::get_unspent_input_note_nullifiers)
401            .await
402    }
403
404    async fn get_account_vault(&self, account_id: AccountId) -> Result<AssetVault, StoreError> {
405        self.interact_with_connection(move |conn| SqliteStore::get_account_vault(conn, account_id))
406            .await
407    }
408
409    async fn get_account_asset(
410        &self,
411        account_id: AccountId,
412        faucet_id_prefix: AccountIdPrefix,
413    ) -> Result<Option<(Asset, AssetWitness)>, StoreError> {
414        let merkle_store = self.merkle_store.clone();
415        self.interact_with_connection(move |conn| {
416            SqliteStore::get_account_asset(conn, &merkle_store, account_id, faucet_id_prefix)
417        })
418        .await
419    }
420
421    async fn get_account_storage(
422        &self,
423        account_id: AccountId,
424    ) -> Result<AccountStorage, StoreError> {
425        self.interact_with_connection(move |conn| {
426            SqliteStore::get_account_storage(conn, account_id)
427        })
428        .await
429    }
430
431    async fn get_account_map_item(
432        &self,
433        account_id: AccountId,
434        index: u8,
435        key: Word,
436    ) -> Result<(Word, StorageMapWitness), StoreError> {
437        let merkle_store = self.merkle_store.clone();
438
439        self.interact_with_connection(move |conn| {
440            SqliteStore::get_account_map_item(conn, &merkle_store, account_id, index, key)
441        })
442        .await
443    }
444
445    async fn get_addresses_by_account_id(
446        &self,
447        account_id: AccountId,
448    ) -> Result<Vec<Address>, StoreError> {
449        self.interact_with_connection(move |conn| {
450            SqliteStore::get_account_addresses(conn, account_id)
451        })
452        .await
453    }
454
455    async fn insert_address(
456        &self,
457        address: Address,
458        account_id: AccountId,
459    ) -> Result<(), StoreError> {
460        self.interact_with_connection(move |conn| {
461            let tx = conn.transaction().into_store_error()?;
462            SqliteStore::insert_address(&tx, &address, account_id)?;
463            tx.commit().into_store_error()
464        })
465        .await
466    }
467
468    async fn remove_address(
469        &self,
470        address: Address,
471        account_id: AccountId,
472    ) -> Result<(), StoreError> {
473        self.interact_with_connection(move |conn| {
474            SqliteStore::remove_address(conn, &address, account_id)
475        })
476        .await
477    }
478}
479
480// UTILS
481// ================================================================================================
482
483/// Returns the current UTC timestamp as `u64` (non-leap seconds since Unix epoch).
484pub(crate) fn current_timestamp_u64() -> u64 {
485    let now = chrono::Utc::now();
486    u64::try_from(now.timestamp()).expect("timestamp is always after epoch")
487}
488
489/// Gets a `u64` value from the database.
490///
491/// `Sqlite` uses `i64` as its internal representation format, and so when retrieving
492/// we need to make sure we cast as `u64` to get the original value
493pub fn column_value_as_u64<I: rusqlite::RowIndex>(
494    row: &rusqlite::Row<'_>,
495    index: I,
496) -> rusqlite::Result<u64> {
497    let value: i64 = row.get(index)?;
498    #[allow(
499        clippy::cast_sign_loss,
500        reason = "We store u64 as i64 as sqlite only allows the latter."
501    )]
502    Ok(value as u64)
503}
504
505/// Converts a `u64` into a [Value].
506///
507/// `Sqlite` uses `i64` as its internal representation format. Note that the `as` operator performs
508/// a lossless conversion from `u64` to `i64`.
509pub fn u64_to_value(v: u64) -> Value {
510    #[allow(
511        clippy::cast_possible_wrap,
512        reason = "We store u64 as i64 as sqlite only allows the latter."
513    )]
514    Value::Integer(v as i64)
515}
516
517// TESTS
518// ================================================================================================
519
#[cfg(test)]
pub mod tests {
    use std::boxed::Box;

    use miden_client::store::Store;
    use miden_client::testing::common::create_test_store_path;

    use super::SqliteStore;

    /// Compiles only when `T` is both `Send` and `Sync`.
    fn assert_send_sync<T: Send + Sync>() {}

    #[test]
    fn is_send_sync() {
        assert_send_sync::<SqliteStore>();
        assert_send_sync::<Box<dyn Store>>();
    }

    // Function that returns a `Send` future from a dynamic trait that must be `Sync`.
    async fn dyn_trait_send_fut(store: Box<dyn Store>) {
        // This wouldn't compile if `get_tracked_block_headers` doesn't return a `Send` future.
        let res = store.get_tracked_block_headers().await;
        assert!(res.is_ok());
    }

    #[tokio::test]
    async fn future_is_send() {
        let client = SqliteStore::new(create_test_store_path()).await.unwrap();
        let client: Box<SqliteStore> = client.into();
        // Await the join handle: a dropped handle detaches the task, so the test could end
        // before the task runs and a failed assertion inside it would go unnoticed.
        tokio::task::spawn(async move { dyn_trait_send_fut(client).await })
            .await
            .expect("spawned task should complete without panicking");
    }

    /// Creates a [`SqliteStore`] at a fresh test store path, panicking if construction fails.
    pub(crate) async fn create_test_store() -> SqliteStore {
        SqliteStore::new(create_test_store_path()).await.unwrap()
    }
}
554}