Skip to main content

miden_client_sqlite_store/
lib.rs

1//! SQLite-backed Store implementation for miden-client.
2//! This crate provides `SqliteStore` and its full implementation.
3//!
4//! [`SqliteStore`] enables the persistence of accounts, transactions, notes, block headers, and MMR
5//! nodes using an `SQLite` database.
6
7use std::boxed::Box;
8use std::collections::{BTreeMap, BTreeSet};
9use std::path::PathBuf;
10use std::string::{String, ToString};
11use std::sync::{Arc, RwLock};
12use std::vec::Vec;
13
14use db_management::pool_manager::{Pool, SqlitePoolManager};
15use db_management::utils::{
16    apply_migrations,
17    get_setting,
18    list_setting_keys,
19    remove_setting,
20    set_setting,
21};
22use miden_client::Word;
23use miden_client::account::{
24    Account,
25    AccountCode,
26    AccountHeader,
27    AccountId,
28    AccountStorage,
29    Address,
30    StorageMapKey,
31    StorageSlotName,
32};
33use miden_client::asset::{Asset, AssetVault, AssetWitness};
34use miden_client::block::BlockHeader;
35use miden_client::crypto::{InOrderIndex, MmrPeaks};
36use miden_client::note::{BlockNumber, NoteScript, NoteTag, Nullifier};
37use miden_client::store::{
38    AccountRecord,
39    AccountSmtForest,
40    AccountStatus,
41    AccountStorageFilter,
42    BlockRelevance,
43    InputNoteRecord,
44    NoteFilter,
45    OutputNoteRecord,
46    PartialBlockchainFilter,
47    Store,
48    StoreError,
49    TransactionFilter,
50};
51use miden_client::sync::{NoteTagRecord, StateSyncUpdate};
52use miden_client::transaction::{TransactionRecord, TransactionStoreUpdate};
53use miden_protocol::Felt;
54use miden_protocol::account::StorageMapWitness;
55use miden_protocol::asset::AssetVaultKey;
56use rusqlite::Connection;
57use rusqlite::types::Value;
58use sql_error::SqlResultExt;
59
60mod account;
61mod builder;
62mod chain_data;
63mod db_management;
64mod note;
65mod sql_error;
66mod sync;
67mod transaction;
68
69pub use builder::ClientBuilderSqliteExt;
70
71// SQLITE STORE
72// ================================================================================================
73
74/// Represents a pool of connections with an `SQLite` database. The pool is used to interact
75/// concurrently with the underlying database in a safe and efficient manner.
76///
77/// Current table definitions can be found at `store.sql` migration file.
78pub struct SqliteStore {
79    pub(crate) pool: Pool,
80    database_filepath: String,
81    smt_forest: Arc<RwLock<AccountSmtForest>>,
82}
83
84impl SqliteStore {
85    // CONSTRUCTORS
86    // --------------------------------------------------------------------------------------------
87
88    /// Returns a new instance of [Store] instantiated with the specified configuration options.
89    pub async fn new(database_filepath: PathBuf) -> Result<Self, StoreError> {
90        let database_filepath_str = database_filepath.to_string_lossy().into_owned();
91        let sqlite_pool_manager = SqlitePoolManager::new(database_filepath);
92        let pool = Pool::builder(sqlite_pool_manager)
93            .build()
94            .map_err(|e| StoreError::DatabaseError(e.to_string()))?;
95
96        let conn = pool.get().await.map_err(|e| StoreError::DatabaseError(e.to_string()))?;
97
98        conn.interact(apply_migrations)
99            .await
100            .map_err(|e| StoreError::DatabaseError(e.to_string()))?
101            .map_err(|e| StoreError::DatabaseError(e.to_string()))?;
102
103        let store = SqliteStore {
104            pool,
105            database_filepath: database_filepath_str,
106            smt_forest: Arc::new(RwLock::new(AccountSmtForest::new())),
107        };
108
109        // Initialize SMT forest
110        for id in store.get_account_ids().await? {
111            let vault = store.get_account_vault(id).await?;
112            let storage = store.get_account_storage(id, AccountStorageFilter::All).await?;
113            let header = store.get_account_header(id).await?;
114
115            let mut smt_forest = store.smt_forest.write().expect("smt write lock not poisoned");
116            if header.is_some() {
117                smt_forest.insert_and_register_account_state(id, &vault, &storage)?;
118            } else {
119                smt_forest.insert_account_state(&vault, &storage)?;
120            }
121        }
122
123        Ok(store)
124    }
125
126    /// Interacts with the database by executing the provided function on a connection from the
127    /// pool.
128    ///
129    /// This function is a helper method which simplifies the process of making queries to the
130    /// database. It acquires a connection from the pool and executes the provided function,
131    /// returning the result.
132    async fn interact_with_connection<F, R>(&self, f: F) -> Result<R, StoreError>
133    where
134        F: FnOnce(&mut Connection) -> Result<R, StoreError> + Send + 'static,
135        R: Send + 'static,
136    {
137        self.pool
138            .get()
139            .await
140            .map_err(|err| StoreError::DatabaseError(err.to_string()))?
141            .interact(f)
142            .await
143            .map_err(|err| StoreError::DatabaseError(err.to_string()))?
144    }
145}
146
147// SQLite implementation of the Store trait
148//
149// To simplify, all implementations rely on inner SqliteStore functions that map 1:1 by name
150// This way, the actual implementations are grouped by entity types in their own sub-modules
151#[async_trait::async_trait]
152impl Store for SqliteStore {
153    fn identifier(&self) -> &str {
154        &self.database_filepath
155    }
156
157    fn get_current_timestamp(&self) -> Option<u64> {
158        Some(current_timestamp_u64())
159    }
160
161    async fn get_note_tags(&self) -> Result<Vec<NoteTagRecord>, StoreError> {
162        self.interact_with_connection(SqliteStore::get_note_tags).await
163    }
164
165    async fn get_unique_note_tags(&self) -> Result<BTreeSet<NoteTag>, StoreError> {
166        self.interact_with_connection(SqliteStore::get_unique_note_tags).await
167    }
168
169    async fn add_note_tag(&self, tag: NoteTagRecord) -> Result<bool, StoreError> {
170        self.interact_with_connection(move |conn| SqliteStore::add_note_tag(conn, tag))
171            .await
172    }
173
174    async fn remove_note_tag(&self, tag: NoteTagRecord) -> Result<usize, StoreError> {
175        self.interact_with_connection(move |conn| SqliteStore::remove_note_tag(conn, tag))
176            .await
177    }
178
179    async fn get_sync_height(&self) -> Result<BlockNumber, StoreError> {
180        self.interact_with_connection(SqliteStore::get_sync_height).await
181    }
182
183    async fn apply_state_sync(&self, state_sync_update: StateSyncUpdate) -> Result<(), StoreError> {
184        let smt_forest = self.smt_forest.clone();
185        self.interact_with_connection(move |conn| {
186            SqliteStore::apply_state_sync(conn, &smt_forest, state_sync_update)
187        })
188        .await
189    }
190
191    async fn get_transactions(
192        &self,
193        transaction_filter: TransactionFilter,
194    ) -> Result<Vec<TransactionRecord>, StoreError> {
195        self.interact_with_connection(move |conn| {
196            SqliteStore::get_transactions(conn, &transaction_filter)
197        })
198        .await
199    }
200
201    async fn apply_transaction(&self, tx_update: TransactionStoreUpdate) -> Result<(), StoreError> {
202        let smt_forest = self.smt_forest.clone();
203        self.interact_with_connection(move |conn| {
204            SqliteStore::apply_transaction(conn, &smt_forest, &tx_update)
205        })
206        .await
207    }
208
209    async fn get_input_notes(
210        &self,
211        filter: NoteFilter,
212    ) -> Result<Vec<InputNoteRecord>, StoreError> {
213        self.interact_with_connection(move |conn| SqliteStore::get_input_notes(conn, &filter))
214            .await
215    }
216
217    async fn get_output_notes(
218        &self,
219        note_filter: NoteFilter,
220    ) -> Result<Vec<OutputNoteRecord>, StoreError> {
221        self.interact_with_connection(move |conn| SqliteStore::get_output_notes(conn, &note_filter))
222            .await
223    }
224
225    async fn get_input_note_by_offset(
226        &self,
227        filter: NoteFilter,
228        consumer: AccountId,
229        block_start: Option<BlockNumber>,
230        block_end: Option<BlockNumber>,
231        offset: u32,
232    ) -> Result<Option<InputNoteRecord>, StoreError> {
233        self.interact_with_connection(move |conn| {
234            SqliteStore::get_input_note_by_offset(
235                conn,
236                &filter,
237                consumer,
238                block_start,
239                block_end,
240                offset,
241            )
242        })
243        .await
244    }
245
246    async fn upsert_input_notes(&self, notes: &[InputNoteRecord]) -> Result<(), StoreError> {
247        let notes = notes.to_vec();
248        self.interact_with_connection(move |conn| SqliteStore::upsert_input_notes(conn, &notes))
249            .await
250    }
251
252    async fn get_note_script(&self, script_root: Word) -> Result<NoteScript, StoreError> {
253        self.interact_with_connection(move |conn| SqliteStore::get_note_script(conn, script_root))
254            .await
255    }
256
257    async fn upsert_note_scripts(&self, note_scripts: &[NoteScript]) -> Result<(), StoreError> {
258        let note_scripts = note_scripts.to_vec();
259        self.interact_with_connection(move |conn| {
260            SqliteStore::upsert_note_scripts(conn, &note_scripts)
261        })
262        .await
263    }
264
265    async fn insert_block_header(
266        &self,
267        block_header: &BlockHeader,
268        partial_blockchain_peaks: MmrPeaks,
269        has_client_notes: bool,
270    ) -> Result<(), StoreError> {
271        let block_header = block_header.clone();
272        self.interact_with_connection(move |conn| {
273            SqliteStore::insert_block_header(
274                conn,
275                &block_header,
276                &partial_blockchain_peaks,
277                has_client_notes,
278            )
279        })
280        .await
281    }
282
283    async fn prune_irrelevant_blocks(&self) -> Result<(), StoreError> {
284        self.interact_with_connection(SqliteStore::prune_irrelevant_blocks).await
285    }
286
287    async fn prune_account_history(
288        &self,
289        account_id: AccountId,
290        up_to_nonce: Felt,
291    ) -> Result<usize, StoreError> {
292        self.interact_with_connection(move |conn| {
293            SqliteStore::prune_account_history(conn, account_id, up_to_nonce)
294        })
295        .await
296    }
297
298    async fn get_block_headers(
299        &self,
300        block_numbers: &BTreeSet<BlockNumber>,
301    ) -> Result<Vec<(BlockHeader, BlockRelevance)>, StoreError> {
302        let block_numbers = block_numbers.clone();
303        Ok(self
304            .interact_with_connection(move |conn| {
305                SqliteStore::get_block_headers(conn, &block_numbers)
306            })
307            .await?)
308    }
309
310    async fn get_tracked_block_headers(&self) -> Result<Vec<BlockHeader>, StoreError> {
311        self.interact_with_connection(SqliteStore::get_tracked_block_headers).await
312    }
313
314    async fn get_tracked_block_header_numbers(&self) -> Result<BTreeSet<usize>, StoreError> {
315        self.interact_with_connection(SqliteStore::get_tracked_block_header_numbers)
316            .await
317    }
318
319    async fn get_partial_blockchain_nodes(
320        &self,
321        filter: PartialBlockchainFilter,
322    ) -> Result<BTreeMap<InOrderIndex, Word>, StoreError> {
323        self.interact_with_connection(move |conn| {
324            SqliteStore::get_partial_blockchain_nodes(conn, &filter)
325        })
326        .await
327    }
328
329    async fn insert_partial_blockchain_nodes(
330        &self,
331        nodes: &[(InOrderIndex, Word)],
332    ) -> Result<(), StoreError> {
333        let nodes = nodes.to_vec();
334        self.interact_with_connection(move |conn| {
335            SqliteStore::insert_partial_blockchain_nodes(conn, &nodes)
336        })
337        .await
338    }
339
340    async fn get_partial_blockchain_peaks_by_block_num(
341        &self,
342        block_num: BlockNumber,
343    ) -> Result<MmrPeaks, StoreError> {
344        self.interact_with_connection(move |conn| {
345            SqliteStore::get_partial_blockchain_peaks_by_block_num(conn, block_num)
346        })
347        .await
348    }
349
350    async fn insert_account(
351        &self,
352        account: &Account,
353        initial_address: Address,
354    ) -> Result<(), StoreError> {
355        let cloned_account = account.clone();
356        let smt_forest = self.smt_forest.clone();
357
358        self.interact_with_connection(move |conn| {
359            SqliteStore::insert_account(conn, &smt_forest, &cloned_account, &initial_address)
360        })
361        .await
362    }
363
364    async fn update_account(&self, account: &Account) -> Result<(), StoreError> {
365        let cloned_account = account.clone();
366        let smt_forest = self.smt_forest.clone();
367
368        self.interact_with_connection(move |conn| {
369            SqliteStore::update_account(conn, &smt_forest, &cloned_account)
370        })
371        .await
372    }
373
374    async fn get_account_ids(&self) -> Result<Vec<AccountId>, StoreError> {
375        self.interact_with_connection(SqliteStore::get_account_ids).await
376    }
377
378    async fn get_account_headers(&self) -> Result<Vec<(AccountHeader, AccountStatus)>, StoreError> {
379        self.interact_with_connection(SqliteStore::get_account_headers).await
380    }
381
382    async fn get_account_header(
383        &self,
384        account_id: AccountId,
385    ) -> Result<Option<(AccountHeader, AccountStatus)>, StoreError> {
386        self.interact_with_connection(move |conn| SqliteStore::get_account_header(conn, account_id))
387            .await
388    }
389
390    async fn get_account_header_by_commitment(
391        &self,
392        account_commitment: Word,
393    ) -> Result<Option<AccountHeader>, StoreError> {
394        self.interact_with_connection(move |conn| {
395            SqliteStore::get_account_header_by_commitment(conn, account_commitment)
396        })
397        .await
398    }
399
400    async fn get_account(
401        &self,
402        account_id: AccountId,
403    ) -> Result<Option<AccountRecord>, StoreError> {
404        self.interact_with_connection(move |conn| SqliteStore::get_account(conn, account_id))
405            .await
406    }
407
408    async fn get_account_code(
409        &self,
410        account_id: AccountId,
411    ) -> Result<Option<AccountCode>, StoreError> {
412        self.interact_with_connection(move |conn| {
413            SqliteStore::get_account_code_by_id(conn, account_id)
414        })
415        .await
416    }
417
418    async fn upsert_foreign_account_code(
419        &self,
420        account_id: AccountId,
421        code: AccountCode,
422    ) -> Result<(), StoreError> {
423        self.interact_with_connection(move |conn| {
424            SqliteStore::upsert_foreign_account_code(conn, account_id, &code)
425        })
426        .await
427    }
428
429    async fn get_foreign_account_code(
430        &self,
431        account_ids: Vec<AccountId>,
432    ) -> Result<BTreeMap<AccountId, AccountCode>, StoreError> {
433        self.interact_with_connection(move |conn| {
434            SqliteStore::get_foreign_account_code(conn, account_ids)
435        })
436        .await
437    }
438
439    async fn set_setting(&self, key: String, value: Vec<u8>) -> Result<(), StoreError> {
440        self.interact_with_connection(move |conn| {
441            set_setting(conn, &key, &value).into_store_error()
442        })
443        .await
444    }
445
446    async fn get_setting(&self, key: String) -> Result<Option<Vec<u8>>, StoreError> {
447        self.interact_with_connection(move |conn| get_setting(conn, &key)).await
448    }
449
450    async fn remove_setting(&self, key: String) -> Result<(), StoreError> {
451        self.interact_with_connection(move |conn| remove_setting(conn, &key)).await
452    }
453
454    async fn list_setting_keys(&self) -> Result<Vec<String>, StoreError> {
455        self.interact_with_connection(move |conn| list_setting_keys(conn)).await
456    }
457
458    async fn get_unspent_input_note_nullifiers(&self) -> Result<Vec<Nullifier>, StoreError> {
459        self.interact_with_connection(SqliteStore::get_unspent_input_note_nullifiers)
460            .await
461    }
462
463    async fn get_account_vault(&self, account_id: AccountId) -> Result<AssetVault, StoreError> {
464        self.interact_with_connection(move |conn| SqliteStore::get_account_vault(conn, account_id))
465            .await
466    }
467
468    async fn get_account_asset(
469        &self,
470        account_id: AccountId,
471        vault_key: AssetVaultKey,
472    ) -> Result<Option<(Asset, AssetWitness)>, StoreError> {
473        let smt_forest = self.smt_forest.clone();
474        self.interact_with_connection(move |conn| {
475            SqliteStore::get_account_asset(conn, &smt_forest, account_id, vault_key)
476        })
477        .await
478    }
479
480    async fn get_account_storage(
481        &self,
482        account_id: AccountId,
483        filter: AccountStorageFilter,
484    ) -> Result<AccountStorage, StoreError> {
485        self.interact_with_connection(move |conn| {
486            SqliteStore::get_account_storage(conn, account_id, &filter)
487        })
488        .await
489    }
490
491    async fn get_account_map_item(
492        &self,
493        account_id: AccountId,
494        slot_name: StorageSlotName,
495        key: StorageMapKey,
496    ) -> Result<(Word, StorageMapWitness), StoreError> {
497        let smt_forest = self.smt_forest.clone();
498
499        self.interact_with_connection(move |conn| {
500            SqliteStore::get_account_map_item(conn, &smt_forest, account_id, slot_name, key)
501        })
502        .await
503    }
504
505    async fn get_addresses_by_account_id(
506        &self,
507        account_id: AccountId,
508    ) -> Result<Vec<Address>, StoreError> {
509        self.interact_with_connection(move |conn| {
510            SqliteStore::get_account_addresses(conn, account_id)
511        })
512        .await
513    }
514
515    async fn insert_address(
516        &self,
517        address: Address,
518        account_id: AccountId,
519    ) -> Result<(), StoreError> {
520        self.interact_with_connection(move |conn| {
521            let tx = conn.transaction().into_store_error()?;
522            SqliteStore::insert_address(&tx, &address, account_id)?;
523            tx.commit().into_store_error()
524        })
525        .await
526    }
527
528    async fn remove_address(
529        &self,
530        address: Address,
531        account_id: AccountId,
532    ) -> Result<(), StoreError> {
533        self.interact_with_connection(move |conn| {
534            SqliteStore::remove_address(conn, &address, account_id)
535        })
536        .await
537    }
538
539    async fn get_minimal_partial_account(
540        &self,
541        account_id: AccountId,
542    ) -> Result<Option<AccountRecord>, StoreError> {
543        self.interact_with_connection(move |conn| {
544            SqliteStore::get_minimal_partial_account(conn, account_id)
545        })
546        .await
547    }
548}
549
550// UTILS
551// ================================================================================================
552
553/// Returns the current UTC timestamp as `u64` (non-leap seconds since Unix epoch).
554pub(crate) fn current_timestamp_u64() -> u64 {
555    let now = chrono::Utc::now();
556    u64::try_from(now.timestamp()).expect("timestamp is always after epoch")
557}
558
559/// Gets a `u64` value from the database.
560///
561/// `Sqlite` uses `i64` as its internal representation format, and so when retrieving
562/// we need to make sure we cast as `u64` to get the original value
563pub fn column_value_as_u64<I: rusqlite::RowIndex>(
564    row: &rusqlite::Row<'_>,
565    index: I,
566) -> rusqlite::Result<u64> {
567    let value: i64 = row.get(index)?;
568    #[allow(
569        clippy::cast_sign_loss,
570        reason = "We store u64 as i64 as sqlite only allows the latter."
571    )]
572    Ok(value as u64)
573}
574
575/// Converts a `u64` into a [Value].
576///
577/// `Sqlite` uses `i64` as its internal representation format. Note that the `as` operator performs
578/// a lossless conversion from `u64` to `i64`.
579pub fn u64_to_value(v: u64) -> Value {
580    #[allow(
581        clippy::cast_possible_wrap,
582        reason = "We store u64 as i64 as sqlite only allows the latter."
583    )]
584    Value::Integer(v as i64)
585}
586
587// TESTS
588// ================================================================================================
589
590#[cfg(test)]
591pub mod tests {
592    use std::boxed::Box;
593
594    use miden_client::store::Store;
595    use miden_client::testing::common::create_test_store_path;
596
597    use super::SqliteStore;
598
599    fn assert_send_sync<T: Send + Sync>() {}
600
601    #[test]
602    fn is_send_sync() {
603        assert_send_sync::<SqliteStore>();
604        assert_send_sync::<Box<dyn Store>>();
605    }
606
607    // Function that returns a `Send` future from a dynamic trait that must be `Sync`.
608    async fn dyn_trait_send_fut(store: Box<dyn Store>) {
609        // This wouldn't compile if `get_tracked_block_headers` doesn't return a `Send` future.
610        let res = store.get_tracked_block_headers().await;
611        assert!(res.is_ok());
612    }
613
614    #[tokio::test]
615    async fn future_is_send() {
616        let client = SqliteStore::new(create_test_store_path()).await.unwrap();
617        let client: Box<SqliteStore> = client.into();
618        tokio::task::spawn(async move { dyn_trait_send_fut(client).await });
619    }
620
621    pub(crate) async fn create_test_store() -> SqliteStore {
622        SqliteStore::new(create_test_store_path()).await.unwrap()
623    }
624}