Skip to main content

miden_node_store/db/models/queries/
accounts.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::num::NonZeroUsize;
3use std::ops::RangeInclusive;
4
5use diesel::prelude::{Queryable, QueryableByName};
6use diesel::query_dsl::methods::SelectDsl;
7use diesel::sqlite::Sqlite;
8use diesel::{
9    AsChangeset,
10    BoolExpressionMethods,
11    ExpressionMethods,
12    Insertable,
13    JoinOnDsl,
14    NullableExpressionMethods,
15    OptionalExtension,
16    QueryDsl,
17    RunQueryDsl,
18    Selectable,
19    SelectableHelper,
20    SqliteConnection,
21};
22use miden_node_proto::domain::account::{AccountInfo, AccountSummary};
23use miden_node_utils::limiter::{
24    MAX_RESPONSE_PAYLOAD_BYTES,
25    QueryParamAccountIdLimit,
26    QueryParamLimiter,
27};
28use miden_protocol::account::delta::AccountUpdateDetails;
29use miden_protocol::account::{
30    Account,
31    AccountCode,
32    AccountId,
33    AccountStorage,
34    AccountStorageHeader,
35    NonFungibleDeltaAction,
36    StorageMap,
37    StorageMapKey,
38    StorageSlot,
39    StorageSlotContent,
40    StorageSlotName,
41    StorageSlotType,
42};
43use miden_protocol::asset::{Asset, AssetVault, AssetVaultKey, FungibleAsset};
44use miden_protocol::block::{BlockAccountUpdate, BlockNumber};
45use miden_protocol::utils::serde::{Deserializable, Serializable};
46use miden_protocol::{Felt, Word};
47use miden_standards::account::auth::NetworkAccount;
48
49use crate::COMPONENT;
50use crate::db::models::conv::{SqlTypeConvert, nonce_to_raw_sql, raw_sql_to_nonce};
51#[cfg(test)]
52use crate::db::models::vec_raw_try_into;
53use crate::db::{AccountVaultValue, schema};
54use crate::errors::DatabaseError;
55
56mod at_block;
57pub(crate) use at_block::select_account_header_with_storage_header_at_block;
58
59mod delta;
60use delta::{
61    AccountStateForInsert,
62    PartialAccountState,
63    apply_storage_delta,
64    select_latest_vault_assets,
65    select_minimal_account_state_headers,
66    select_vault_balances_by_vault_keys,
67};
68
69#[cfg(test)]
70mod tests;
71
72type StorageMapValueRow = (i64, String, Vec<u8>, Vec<u8>);
73type StorageHeaderWithEntries =
74    (AccountStorageHeader, HashMap<StorageSlotName, BTreeMap<StorageMapKey, Word>>);
75
76// NETWORK ACCOUNT TYPE
77// ================================================================================================
78
79/// Classifies accounts for database storage based on whether they are network accounts.
80#[derive(Debug, Clone, Copy, PartialEq, Eq)]
81pub(crate) enum NetworkAccountType {
82    /// Not a network account.
83    None,
84    /// A network account.
85    Network,
86}
87
88// ACCOUNT CODE
89// ================================================================================================
90
91/// Select account code by its commitment hash from the `account_codes` table.
92///
93/// # Returns
94///
95/// The account code bytes if found, or `None` if no code exists with that commitment.
96///
97/// # Raw SQL
98///
99/// ```sql
100/// SELECT code FROM account_codes WHERE code_commitment = ?1
101/// ```
102pub(crate) fn select_account_code_by_commitment(
103    conn: &mut SqliteConnection,
104    code_commitment: Word,
105) -> Result<Option<Vec<u8>>, DatabaseError> {
106    use schema::account_codes;
107
108    let code_commitment_bytes = code_commitment.to_bytes();
109
110    let result: Option<Vec<u8>> = SelectDsl::select(
111        account_codes::table.filter(account_codes::code_commitment.eq(&code_commitment_bytes)),
112        account_codes::code,
113    )
114    .first(conn)
115    .optional()?;
116
117    Ok(result)
118}
119
120// ACCOUNT RETRIEVAL
121// ================================================================================================
122
123/// Select account by ID from the DB using the given [`SqliteConnection`].
124///
125/// # Returns
126///
127/// The latest account info, or an error.
128///
129/// # Raw SQL
130///
131/// ```sql
132/// SELECT
133///     accounts.account_id,
134///     accounts.account_commitment,
135///     accounts.block_num
136/// FROM
137///     accounts
138/// WHERE
139///     account_id = ?1
140///     AND is_latest = 1
141/// ```
142pub(crate) fn select_account(
143    conn: &mut SqliteConnection,
144    account_id: AccountId,
145) -> Result<AccountInfo, DatabaseError> {
146    let raw = SelectDsl::select(schema::accounts::table, AccountSummaryRaw::as_select())
147        .filter(schema::accounts::account_id.eq(account_id.to_bytes()))
148        .filter(schema::accounts::is_latest.eq(true))
149        .get_result::<AccountSummaryRaw>(conn)
150        .optional()?
151        .ok_or(DatabaseError::AccountNotFoundInDb(account_id))?;
152
153    let summary: AccountSummary = raw.try_into()?;
154
155    // Backfill account details from database For private accounts, we don't store full details in
156    // the database
157    let details = if account_id.is_public() {
158        Some(select_full_account(conn, account_id)?)
159    } else {
160        None
161    };
162
163    Ok(AccountInfo { summary, details })
164}
165
166/// Reconstruct full Account from database tables for the latest account state
167///
168/// This function queries the database tables to reconstruct a complete Account object:
169/// - Code from `account_codes` table
170/// - Nonce and storage header from `accounts` table
171/// - Storage map entries from `account_storage_map_values` table
172/// - Vault from `account_vault_assets` table
173///
174/// # Note
175///
176/// A stop-gap solution to retain store API and construct `AccountInfo` types.
177/// The function should ultimately be removed, and any queries be served from the
178/// `State` which contains an `SmtForest` to serve the latest and most recent
179/// historical data.
180// TODO: remove eventually once refactoring is complete
181pub(crate) fn select_full_account(
182    conn: &mut SqliteConnection,
183    account_id: AccountId,
184) -> Result<Account, DatabaseError> {
185    // Get account metadata (nonce, code_commitment) and code in a single join query
186    let joined = schema::accounts::table.inner_join(schema::account_codes::table.on(
187        schema::accounts::code_commitment.eq(schema::account_codes::code_commitment.nullable()),
188    ));
189
190    let (nonce, code_bytes): (Option<i64>, Vec<u8>) =
191        SelectDsl::select(joined, (schema::accounts::nonce, schema::account_codes::code))
192            .filter(schema::accounts::account_id.eq(account_id.to_bytes()))
193            .filter(schema::accounts::is_latest.eq(true))
194            .get_result(conn)
195            .optional()?
196            .ok_or(DatabaseError::AccountNotFoundInDb(account_id))?;
197
198    let nonce = raw_sql_to_nonce(nonce.ok_or_else(|| {
199        DatabaseError::DataCorrupted(format!("No nonce found for account {account_id}"))
200    })?);
201
202    let code = AccountCode::read_from_bytes(&code_bytes)?;
203
204    // Reconstruct storage using existing helper function
205    let storage = select_latest_account_storage(conn, account_id)?;
206
207    // Reconstruct vault from account_vault_assets table
208    let vault_entries: Vec<(Vec<u8>, Option<Vec<u8>>)> = SelectDsl::select(
209        schema::account_vault_assets::table,
210        (schema::account_vault_assets::vault_key, schema::account_vault_assets::asset),
211    )
212    .filter(schema::account_vault_assets::account_id.eq(account_id.to_bytes()))
213    .filter(schema::account_vault_assets::is_latest.eq(true))
214    .load(conn)?;
215
216    let mut assets = Vec::new();
217    for (_key_bytes, maybe_asset_bytes) in vault_entries {
218        if let Some(asset_bytes) = maybe_asset_bytes {
219            let asset = Asset::read_from_bytes(&asset_bytes)?;
220            assets.push(asset);
221        }
222    }
223
224    let vault = AssetVault::new(&assets)?;
225
226    Ok(Account::new(account_id, vault, storage, code, nonce, None)?)
227}
228
229/// Page of account commitments returned by [`select_account_commitments_paged`].
230#[derive(Debug)]
231pub struct AccountCommitmentsPage {
232    /// The account commitments in this page.
233    pub commitments: Vec<(AccountId, Word)>,
234    /// If `Some`, there are more results. Use this as the `after_account_id` for the next page.
235    pub next_cursor: Option<AccountId>,
236}
237
238/// Selects account commitments with pagination.
239///
240/// Returns up to `page_size` account commitments, starting after `after_account_id` if provided.
241/// Results are ordered by `account_id` for stable pagination.
242///
243/// # Raw SQL
244///
245/// ```sql
246/// SELECT
247///     account_id,
248///     account_commitment
249/// FROM
250///     accounts
251/// WHERE
252///     is_latest = 1
253///     AND (account_id > :after_account_id OR :after_account_id IS NULL)
254/// ORDER BY
255///     account_id ASC
256/// LIMIT :page_size + 1
257/// ```
258pub(crate) fn select_account_commitments_paged(
259    conn: &mut SqliteConnection,
260    page_size: NonZeroUsize,
261    after_account_id: Option<AccountId>,
262) -> Result<AccountCommitmentsPage, DatabaseError> {
263    // Fetch one extra to determine if there are more results
264    #[expect(clippy::cast_possible_wrap)]
265    let limit = (page_size.get() + 1) as i64;
266
267    let mut query = SelectDsl::select(
268        schema::accounts::table,
269        (schema::accounts::account_id, schema::accounts::account_commitment),
270    )
271    .filter(schema::accounts::is_latest.eq(true))
272    .order_by(schema::accounts::account_id.asc())
273    .limit(limit)
274    .into_boxed();
275
276    if let Some(cursor) = after_account_id {
277        query = query.filter(schema::accounts::account_id.gt(cursor.to_bytes()));
278    }
279
280    let raw = query.load::<(Vec<u8>, Vec<u8>)>(conn)?;
281
282    let mut commitments = Result::<Vec<_>, DatabaseError>::from_iter(raw.into_iter().map(
283        |(ref account, ref commitment)| {
284            Ok((AccountId::read_from_bytes(account)?, Word::read_from_bytes(commitment)?))
285        },
286    ))?;
287
288    // If we got more than page_size, there are more results
289    let next_cursor = if commitments.len() > page_size.get() {
290        commitments.pop(); // Remove the extra element
291        commitments.last().map(|(id, _)| *id)
292    } else {
293        None
294    };
295
296    Ok(AccountCommitmentsPage { commitments, next_cursor })
297}
298
299/// Page of public account IDs returned by [`select_public_account_ids_paged`].
300#[derive(Debug)]
301pub struct PublicAccountIdsPage {
302    /// The public account IDs in this page.
303    pub account_ids: Vec<AccountId>,
304    /// If `Some`, there are more results. Use this as the `after_account_id` for the next page.
305    pub next_cursor: Option<AccountId>,
306}
307
308/// Latest account state forest roots for a public account.
309#[derive(Debug)]
310pub struct PublicAccountStateRoots {
311    pub account_id: AccountId,
312    pub vault_root: Word,
313    pub storage_header: AccountStorageHeader,
314}
315
316/// Page of public account state roots returned by [`select_public_account_state_roots_paged`].
317#[derive(Debug)]
318pub struct PublicAccountStateRootsPage {
319    /// The public account state roots in this page.
320    pub accounts: Vec<PublicAccountStateRoots>,
321    /// If `Some`, there are more results. Use this as the `after_account_id` for the next page.
322    pub next_cursor: Option<AccountId>,
323}
324
325/// Selects public account IDs with pagination.
326///
327/// Returns up to `page_size` public account IDs, starting after `after_account_id` if provided.
328/// Results are ordered by `account_id` for stable pagination.
329///
330/// Public accounts are those with `AccountType::Public`. We identify them by checking
331/// against the store. Public accounts store their `code_commitment`, while private accounts only
332/// store the `account_commitment`.
333///
334/// # Raw SQL
335///
336/// ```sql
337/// SELECT
338///     account_id
339/// FROM
340///     accounts
341/// WHERE
342///     is_latest = 1
343///     AND code_commitment IS NOT NULL
344///     AND (account_id > :after_account_id OR :after_account_id IS NULL)
345/// ORDER BY
346///     account_id ASC
347/// LIMIT :page_size + 1
348/// ```
349pub(crate) fn select_public_account_ids_paged(
350    conn: &mut SqliteConnection,
351    page_size: NonZeroUsize,
352    after_account_id: Option<AccountId>,
353) -> Result<PublicAccountIdsPage, DatabaseError> {
354    #[expect(clippy::cast_possible_wrap)]
355    let limit = (page_size.get() + 1) as i64;
356
357    let mut query = SelectDsl::select(schema::accounts::table, schema::accounts::account_id)
358        .filter(schema::accounts::is_latest.eq(true))
359        .filter(schema::accounts::code_commitment.is_not_null())
360        .order_by(schema::accounts::account_id.asc())
361        .limit(limit)
362        .into_boxed();
363
364    if let Some(cursor) = after_account_id {
365        query = query.filter(schema::accounts::account_id.gt(cursor.to_bytes()));
366    }
367
368    let raw = query.load::<Vec<u8>>(conn)?;
369
370    let mut account_ids: Vec<AccountId> = Result::from_iter(raw.into_iter().map(|bytes| {
371        AccountId::read_from_bytes(&bytes).map_err(DatabaseError::DeserializationError)
372    }))?;
373
374    // If we got more than page_size, there are more results
375    let next_cursor = if account_ids.len() > page_size.get() {
376        account_ids.pop(); // Remove the extra element
377        account_ids.last().copied()
378    } else {
379        None
380    };
381
382    Ok(PublicAccountIdsPage { account_ids, next_cursor })
383}
384
385/// Selects public account vault roots and storage headers with pagination.
386///
387/// Returns up to `page_size` public account states, starting after `after_account_id` if provided.
388/// Results are ordered by `account_id` for stable pagination.
389///
390/// Public accounts are those with `AccountType::Public`. We identify them by checking
391/// against the store. Public accounts store their `code_commitment`, while private accounts only
392/// store the `account_commitment`.
393///
394/// # Raw SQL
395///
396/// ```sql
397/// SELECT
398///     account_id,
399///     vault_root,
400///     storage_header
401/// FROM
402///     accounts
403/// WHERE
404///     is_latest = 1
405///     AND code_commitment IS NOT NULL
406///     AND (account_id > :after_account_id OR :after_account_id IS NULL)
407/// ORDER BY
408///     account_id ASC
409/// LIMIT :page_size + 1
410/// ```
411pub(crate) fn select_public_account_state_roots_paged(
412    conn: &mut SqliteConnection,
413    page_size: NonZeroUsize,
414    after_account_id: Option<AccountId>,
415) -> Result<PublicAccountStateRootsPage, DatabaseError> {
416    #[expect(clippy::cast_possible_wrap)]
417    let limit = (page_size.get() + 1) as i64;
418
419    let mut query = SelectDsl::select(
420        schema::accounts::table,
421        (
422            schema::accounts::account_id,
423            schema::accounts::vault_root,
424            schema::accounts::storage_header,
425        ),
426    )
427    .filter(schema::accounts::is_latest.eq(true))
428    .filter(schema::accounts::code_commitment.is_not_null())
429    .order_by(schema::accounts::account_id.asc())
430    .limit(limit)
431    .into_boxed();
432
433    if let Some(cursor) = after_account_id {
434        query = query.filter(schema::accounts::account_id.gt(cursor.to_bytes()));
435    }
436
437    let raw = query.load::<(Vec<u8>, Option<Vec<u8>>, Option<Vec<u8>>)>(conn)?;
438
439    let mut accounts: Vec<PublicAccountStateRoots> = Result::from_iter(raw.into_iter().map(
440        |(account_id_bytes, vault_root_bytes, storage_header_bytes)| {
441            let account_id = AccountId::read_from_bytes(&account_id_bytes)
442                .map_err(DatabaseError::DeserializationError)?;
443            let vault_root_bytes = vault_root_bytes.ok_or_else(|| {
444                DatabaseError::DataCorrupted(format!(
445                    "public account {account_id} is missing a vault root"
446                ))
447            })?;
448            let storage_header_bytes = storage_header_bytes.ok_or_else(|| {
449                DatabaseError::DataCorrupted(format!(
450                    "public account {account_id} is missing a storage header"
451                ))
452            })?;
453
454            Ok::<_, DatabaseError>(PublicAccountStateRoots {
455                account_id,
456                vault_root: Word::read_from_bytes(&vault_root_bytes)?,
457                storage_header: AccountStorageHeader::read_from_bytes(&storage_header_bytes)?,
458            })
459        },
460    ))?;
461
462    // If we got more than page_size, there are more results.
463    let next_cursor = if accounts.len() > page_size.get() {
464        accounts.pop();
465        accounts.last().map(|account| account.account_id)
466    } else {
467        None
468    };
469
470    Ok(PublicAccountStateRootsPage { accounts, next_cursor })
471}
472
473/// Select account vault assets within a block range (inclusive).
474///
475/// # Parameters
476/// * `account_id`: Account ID to query
477/// * `block_from`: Starting block number
478/// * `block_to`: Ending block number
479/// * Response payload size: 0 <= size <= 2MB
480/// * Vault assets per response: 0 <= count <= (2MB / (2*Word + u32)) + 1
481///
482/// # Raw SQL
483///
484/// ```sql
485/// SELECT
486///     block_num,
487///     vault_key,
488///     asset
489/// FROM
490///     account_vault_assets
491/// WHERE
492///     account_id = ?1
493///     AND block_num >= ?2
494///     AND block_num <= ?3
495/// ORDER BY
496///     block_num ASC
497/// LIMIT
498///     ?4
499/// ```
500pub(crate) fn select_account_vault_assets(
501    conn: &mut SqliteConnection,
502    account_id: AccountId,
503    block_range: RangeInclusive<BlockNumber>,
504) -> Result<(BlockNumber, Vec<AccountVaultValue>), DatabaseError> {
505    use schema::account_vault_assets as t;
506    // TODO: These limits should be given by the protocol. See miden-protocol/issues/1770 for more
507    // details
508    const ROW_OVERHEAD_BYTES: usize = 2 * size_of::<Word>() + size_of::<u32>(); // key + asset + block_num
509    const MAX_ROWS: usize = MAX_RESPONSE_PAYLOAD_BYTES / ROW_OVERHEAD_BYTES;
510
511    if !account_id.is_public() {
512        return Err(DatabaseError::AccountNotPublic(account_id));
513    }
514
515    if block_range.is_empty() {
516        return Err(DatabaseError::InvalidBlockRange {
517            from: *block_range.start(),
518            to: *block_range.end(),
519        });
520    }
521
522    let raw: Vec<(i64, Vec<u8>, Option<Vec<u8>>)> =
523        SelectDsl::select(t::table, (t::block_num, t::vault_key, t::asset))
524            .filter(
525                t::account_id
526                    .eq(account_id.to_bytes())
527                    .and(t::block_num.ge(block_range.start().to_raw_sql()))
528                    .and(t::block_num.le(block_range.end().to_raw_sql())),
529            )
530            .order(t::block_num.asc())
531            .limit(i64::try_from(MAX_ROWS + 1).expect("should fit within i64"))
532            .load::<(i64, Vec<u8>, Option<Vec<u8>>)>(conn)?;
533
534    // If we got more rows than the limit, the last block may be incomplete so we drop it entirely
535    // and derive last_block_included from the remaining rows.
536    let (last_block_included, values) = if let Some(&(last_block_num, ..)) = raw.last()
537        && raw.len() > MAX_ROWS
538    {
539        let values = raw
540            .into_iter()
541            .take_while(|(bn, ..)| *bn != last_block_num)
542            .map(AccountVaultValue::from_raw_row)
543            .collect::<Result<Vec<_>, DatabaseError>>()?;
544
545        let last_block_included = values.last().map_or(*block_range.start(), |v| v.block_num);
546
547        (last_block_included, values)
548    } else {
549        (
550            *block_range.end(),
551            raw.into_iter().map(AccountVaultValue::from_raw_row).collect::<Result<_, _>>()?,
552        )
553    };
554
555    Ok((last_block_included, values))
556}
557
558/// Select all accounts from the DB using the given [`SqliteConnection`].
559///
560/// # Returns
561///
562/// A vector with accounts, or an error.
563///
564/// # Raw SQL
565///
566/// ```sql
567/// SELECT
568///     accounts.account_id,
569///     accounts.account_commitment,
570///     accounts.block_num
571/// FROM
572///     accounts
573/// WHERE
574///     is_latest = 1
575/// ORDER BY
576///     block_num ASC
577/// ```
578#[cfg(test)]
579pub(crate) fn select_all_accounts(
580    conn: &mut SqliteConnection,
581) -> Result<Vec<AccountInfo>, DatabaseError> {
582    let raw = SelectDsl::select(schema::accounts::table, AccountSummaryRaw::as_select())
583        .filter(schema::accounts::is_latest.eq(true))
584        .order_by(schema::accounts::block_num.asc())
585        .load::<AccountSummaryRaw>(conn)?;
586
587    let summaries: Vec<AccountSummary> = vec_raw_try_into(raw)?;
588
589    // Backfill account details from database
590    let account_infos = summaries
591        .into_iter()
592        .map(|summary| {
593            let account_id = summary.account_id;
594            let details = select_full_account(conn, account_id).ok();
595            AccountInfo { summary, details }
596        })
597        .collect();
598
599    Ok(account_infos)
600}
601
602#[derive(Debug, Clone, PartialEq, Eq)]
603pub struct StorageMapValue {
604    pub block_num: BlockNumber,
605    pub slot_name: StorageSlotName,
606    pub key: StorageMapKey,
607    pub value: Word,
608}
609
610#[derive(Debug, Clone, PartialEq, Eq)]
611pub struct StorageMapValuesPage {
612    /// Highest block number included in `rows`. If the page is empty, this will be `block_from`.
613    pub last_block_included: BlockNumber,
614    /// Storage map values
615    pub values: Vec<StorageMapValue>,
616}
617
618impl StorageMapValue {
619    pub fn from_raw_row(row: StorageMapValueRow) -> Result<Self, DatabaseError> {
620        let (block_num, slot_name, key, value) = row;
621        Ok(Self {
622            block_num: BlockNumber::from_raw_sql(block_num)?,
623            slot_name: StorageSlotName::from_raw_sql(slot_name)?,
624            key: StorageMapKey::read_from_bytes(&key)?,
625            value: Word::read_from_bytes(&value)?,
626        })
627    }
628}
629
630/// Select account storage map values from the DB using the given [`SqliteConnection`].
631///
632/// # Returns
633///
634/// A vector of tuples containing `(slot, key, value, is_latest)` for the given account.
635/// Each row contains one of:
636///
637/// - the historical value for a slot and key specifically on block `block_to`
638/// - the latest updated value for the slot and key combination, alongside the block number in which
639///   it was updated
640///
641/// # Raw SQL
642///
643/// ```sql
644/// SELECT
645///     block_num,
646///     slot,
647///     key,
648///     value
649/// FROM
650///     account_storage_map_values
651/// WHERE
652///     account_id = ?1
653///     AND block_num >= ?2
654///     AND block_num <= ?3
655/// ORDER BY
656///     block_num ASC
657/// LIMIT
658///     ?4
659/// ```
660/// Select account storage map values within a block range (inclusive).
661///
662/// ## Parameters
663///
664/// * `account_id`: Account ID to query
665/// * `block_range`: Range of block numbers (inclusive)
666///
667/// ## Response
668///
669/// * Response payload size: 0 <= size <= 2MB
670/// * Storage map values per response: 0 <= count <= (2MB / (2*Word + u32 + u8)) + 1
671pub(crate) fn select_account_storage_map_values_paged(
672    conn: &mut SqliteConnection,
673    account_id: AccountId,
674    block_range: RangeInclusive<BlockNumber>,
675    limit: usize,
676) -> Result<StorageMapValuesPage, DatabaseError> {
677    use schema::account_storage_map_values as t;
678
679    if !account_id.is_public() {
680        return Err(DatabaseError::AccountNotPublic(account_id));
681    }
682
683    if block_range.is_empty() {
684        return Err(DatabaseError::InvalidBlockRange {
685            from: *block_range.start(),
686            to: *block_range.end(),
687        });
688    }
689
690    let raw: Vec<StorageMapValueRow> =
691        SelectDsl::select(t::table, (t::block_num, t::slot_name, t::key, t::value))
692            .filter(
693                t::account_id
694                    .eq(account_id.to_bytes())
695                    .and(t::block_num.ge(block_range.start().to_raw_sql()))
696                    .and(t::block_num.le(block_range.end().to_raw_sql())),
697            )
698            .order(t::block_num.asc())
699            .limit(i64::try_from(limit + 1).expect("limit fits within i64"))
700            .load(conn)?;
701
702    // If we got more rows than the limit, the last block may be incomplete so we drop it entirely
703    // and derive last_block_included from the remaining rows.
704    let (last_block_included, values) = if let Some(&(last_block_num, ..)) = raw.last()
705        && raw.len() > limit
706    {
707        let values = raw
708            .into_iter()
709            .take_while(|(bn, ..)| *bn != last_block_num)
710            .map(StorageMapValue::from_raw_row)
711            .collect::<Result<Vec<_>, DatabaseError>>()?;
712
713        let last_block_included = values.last().map_or(*block_range.start(), |v| v.block_num);
714
715        (last_block_included, values)
716    } else {
717        (
718            *block_range.end(),
719            raw.into_iter()
720                .map(StorageMapValue::from_raw_row)
721                .collect::<Result<Vec<_>, _>>()?,
722        )
723    };
724
725    Ok(StorageMapValuesPage { last_block_included, values })
726}
727
728/// Select latest account storage by querying `accounts.storage_header` where `is_latest=true`
729/// and reconstructing full storage from the header plus map values from
730/// `account_storage_map_values`.
731///
732/// Attention: For large accounts it is prohibitively expensive!
733pub(crate) fn select_latest_account_storage(
734    conn: &mut SqliteConnection,
735    account_id: AccountId,
736) -> Result<AccountStorage, DatabaseError> {
737    let (storage_header, map_entries_by_slot) =
738        select_latest_account_storage_components(conn, account_id)?;
739    // Reconstruct StorageSlots from header slots + map entries
740    let slots =
741        Result::<Vec<_>, DatabaseError>::from_iter(storage_header.slots().map(|slot_header| {
742            let slot = match slot_header.slot_type() {
743                StorageSlotType::Value => {
744                    // For value slots, the header value IS the slot value
745                    StorageSlot::with_value(slot_header.name().clone(), slot_header.value())
746                },
747                StorageSlotType::Map => {
748                    // For map slots, reconstruct from map entries
749                    let entries =
750                        map_entries_by_slot.get(slot_header.name()).cloned().unwrap_or_default();
751                    let storage_map = StorageMap::with_entries(entries.into_iter())?;
752                    StorageSlot::with_map(slot_header.name().clone(), storage_map)
753                },
754            };
755            Ok(slot)
756        }))?;
757
758    Ok(AccountStorage::new(slots)?)
759}
760
761/// Fetch account storage header and all storage maps
762pub(crate) fn select_latest_account_storage_components(
763    conn: &mut SqliteConnection,
764    account_id: AccountId,
765) -> Result<StorageHeaderWithEntries, DatabaseError> {
766    let account_id_bytes = account_id.to_bytes();
767
768    // Query storage header blob for this account where is_latest = true
769    let storage_blob: Option<Vec<u8>> =
770        SelectDsl::select(schema::accounts::table, schema::accounts::storage_header)
771            .filter(schema::accounts::account_id.eq(&account_id_bytes))
772            .filter(schema::accounts::is_latest.eq(true))
773            .first(conn)
774            .optional()?
775            .flatten();
776
777    let header = match storage_blob {
778        Some(blob) => AccountStorageHeader::read_from_bytes(&blob)?,
779        None => AccountStorageHeader::new(Vec::new())?,
780    };
781
782    let entries = select_latest_storage_map_entries_all(conn, &account_id)?;
783    Ok((header, entries))
784}
785
786// TODO this is expensive and should only be called from tests
787fn select_latest_storage_map_entries_all(
788    conn: &mut SqliteConnection,
789    account_id: &AccountId,
790) -> Result<HashMap<StorageSlotName, BTreeMap<StorageMapKey, Word>>, DatabaseError> {
791    use schema::account_storage_map_values as t;
792
793    let map_values: Vec<(String, Vec<u8>, Vec<u8>)> =
794        SelectDsl::select(t::table, (t::slot_name, t::key, t::value))
795            .filter(t::account_id.eq(&account_id.to_bytes()))
796            .filter(t::is_latest.eq(true))
797            .load(conn)?;
798
799    group_storage_map_entries(map_values)
800}
801
802fn select_latest_storage_map_entries_for_slots(
803    conn: &mut SqliteConnection,
804    account_id: &AccountId,
805    slot_names: &[StorageSlotName],
806) -> Result<HashMap<StorageSlotName, BTreeMap<StorageMapKey, Word>>, DatabaseError> {
807    use schema::account_storage_map_values as t;
808
809    if slot_names.is_empty() {
810        return Ok(HashMap::new());
811    }
812
813    if let [slot_name] = slot_names {
814        let entries = select_latest_storage_map_entries_for_slot(conn, account_id, slot_name)?;
815        if entries.is_empty() {
816            return Ok(HashMap::new());
817        }
818
819        let mut map_entries = HashMap::new();
820        map_entries.insert(slot_name.clone(), entries);
821        return Ok(map_entries);
822    }
823
824    let slot_names = Vec::from_iter(slot_names.iter().cloned().map(StorageSlotName::to_raw_sql));
825    let map_values: Vec<(String, Vec<u8>, Vec<u8>)> =
826        SelectDsl::select(t::table, (t::slot_name, t::key, t::value))
827            .filter(t::account_id.eq(&account_id.to_bytes()))
828            .filter(t::is_latest.eq(true))
829            .filter(t::slot_name.eq_any(slot_names))
830            .load(conn)?;
831
832    group_storage_map_entries(map_values)
833}
834
835fn select_latest_storage_map_entries_for_slot(
836    conn: &mut SqliteConnection,
837    account_id: &AccountId,
838    slot_name: &StorageSlotName,
839) -> Result<BTreeMap<StorageMapKey, Word>, DatabaseError> {
840    use schema::account_storage_map_values as t;
841
842    let map_values: Vec<(String, Vec<u8>, Vec<u8>)> =
843        SelectDsl::select(t::table, (t::slot_name, t::key, t::value))
844            .filter(t::account_id.eq(&account_id.to_bytes()))
845            .filter(t::is_latest.eq(true))
846            .filter(t::slot_name.eq(slot_name.clone().to_raw_sql()))
847            .load(conn)?;
848
849    Ok(group_storage_map_entries(map_values)?.remove(slot_name).unwrap_or_default())
850}
851
852fn group_storage_map_entries(
853    map_values: Vec<(String, Vec<u8>, Vec<u8>)>,
854) -> Result<HashMap<StorageSlotName, BTreeMap<StorageMapKey, Word>>, DatabaseError> {
855    let mut map_entries_by_slot: HashMap<StorageSlotName, BTreeMap<StorageMapKey, Word>> =
856        HashMap::new();
857    for (slot_name_str, key_bytes, value_bytes) in map_values {
858        let slot_name: StorageSlotName = slot_name_str.parse().map_err(|_| {
859            DatabaseError::DataCorrupted(format!("Invalid slot name: {slot_name_str}"))
860        })?;
861        let key = StorageMapKey::read_from_bytes(&key_bytes)?;
862        let value = Word::read_from_bytes(&value_bytes)?;
863        map_entries_by_slot.entry(slot_name).or_default().insert(key, value);
864    }
865
866    Ok(map_entries_by_slot)
867}
868
869// ACCOUNT MUTATION
870// ================================================================================================
871
872#[derive(Queryable, Selectable)]
873#[diesel(table_name = crate::db::schema::account_vault_assets)]
874#[diesel(check_for_backend(diesel::sqlite::Sqlite))]
875pub struct AccountVaultUpdateRaw {
876    pub vault_key: Vec<u8>,
877    pub asset: Option<Vec<u8>>,
878    pub block_num: i64,
879}
880
881impl TryFrom<AccountVaultUpdateRaw> for AccountVaultValue {
882    type Error = DatabaseError;
883
884    fn try_from(raw: AccountVaultUpdateRaw) -> Result<Self, Self::Error> {
885        let vault_key = AssetVaultKey::try_from(Word::read_from_bytes(&raw.vault_key)?)?;
886        let asset = raw.asset.map(|bytes| Asset::read_from_bytes(&bytes)).transpose()?;
887        let block_num = BlockNumber::from_raw_sql(raw.block_num)?;
888
889        Ok(AccountVaultValue { block_num, vault_key, asset })
890    }
891}
892
893#[derive(Debug, Clone, PartialEq, Eq, Selectable, Queryable, QueryableByName)]
894#[diesel(table_name = schema::accounts)]
895#[diesel(check_for_backend(Sqlite))]
896pub struct AccountSummaryRaw {
897    account_id: Vec<u8>,         // AccountId,
898    account_commitment: Vec<u8>, //RpoDigest,
899    block_num: i64,              //BlockNumber,
900}
901
902impl TryInto<AccountSummary> for AccountSummaryRaw {
903    type Error = DatabaseError;
904    fn try_into(self) -> Result<AccountSummary, Self::Error> {
905        let account_id = AccountId::read_from_bytes(&self.account_id[..])?;
906        let account_commitment = Word::read_from_bytes(&self.account_commitment[..])?;
907        let block_num = BlockNumber::from_raw_sql(self.block_num)?;
908
909        Ok(AccountSummary {
910            account_id,
911            account_commitment,
912            block_num,
913        })
914    }
915}
916
917/// Insert an account vault asset row into the DB using the given [`SqliteConnection`].
918///
919/// Sets `is_latest=true` for the new row and updates any existing
920/// row with the same `(account_id, vault_key)` tuple to `is_latest=false`.
921///
922/// # Returns
923///
924/// The number of affected rows.
925pub(crate) fn insert_account_vault_asset(
926    conn: &mut SqliteConnection,
927    account_id: AccountId,
928    block_num: BlockNumber,
929    vault_key: AssetVaultKey,
930    asset: Option<Asset>,
931) -> Result<usize, DatabaseError> {
932    let record = AccountAssetRowInsert::new(&account_id, &vault_key, block_num, asset, true);
933
934    diesel::Connection::transaction(conn, |conn| {
935        // First, update any existing rows with the same (account_id, vault_key) to set
936        // is_latest=false
937        let vault_key: Word = vault_key.into();
938        let vault_key_bytes = vault_key.to_bytes();
939        let account_id_bytes = account_id.to_bytes();
940        let update_count = diesel::update(schema::account_vault_assets::table)
941            .filter(
942                schema::account_vault_assets::account_id
943                    .eq(account_id_bytes)
944                    .and(schema::account_vault_assets::vault_key.eq(vault_key_bytes))
945                    .and(schema::account_vault_assets::is_latest.eq(true)),
946            )
947            .set(schema::account_vault_assets::is_latest.eq(false))
948            .execute(conn)?;
949
950        // Insert the new latest row
951        let insert_count = diesel::insert_into(schema::account_vault_assets::table)
952            .values(record)
953            .execute(conn)?;
954
955        Ok(update_count + insert_count)
956    })
957}
958
959/// Insert an account storage map value into the DB using the given [`SqliteConnection`].
960///
961/// Sets `is_latest=true` for the new row and updates any existing
962/// row with the same `(account_id, slot_index, key)` tuple to `is_latest=false`.
963///
964/// # Returns
965///
966/// The number of affected rows.
967pub(crate) fn insert_account_storage_map_value(
968    conn: &mut SqliteConnection,
969    account_id: AccountId,
970    block_num: BlockNumber,
971    slot_name: StorageSlotName,
972    key: StorageMapKey,
973    value: Word,
974) -> Result<usize, DatabaseError> {
975    let account_id = account_id.to_bytes();
976    let key = key.to_bytes();
977    let value = value.to_bytes();
978    let slot_name = slot_name.to_raw_sql();
979    let block_num = block_num.to_raw_sql();
980
981    let update_count = diesel::update(schema::account_storage_map_values::table)
982        .filter(
983            schema::account_storage_map_values::account_id
984                .eq(&account_id)
985                .and(schema::account_storage_map_values::slot_name.eq(&slot_name))
986                .and(schema::account_storage_map_values::key.eq(&key))
987                .and(schema::account_storage_map_values::is_latest.eq(true)),
988        )
989        .set(schema::account_storage_map_values::is_latest.eq(false))
990        .execute(conn)?;
991
992    let record = AccountStorageMapRowInsert {
993        account_id,
994        key,
995        value,
996        slot_name,
997        block_num,
998        is_latest: true,
999    };
1000    let insert_count = diesel::insert_into(schema::account_storage_map_values::table)
1001        .values(record)
1002        .execute(conn)?;
1003
1004    Ok(update_count + insert_count)
1005}
1006
1007type PendingStorageInserts = Vec<(AccountId, StorageSlotName, StorageMapKey, Word)>;
1008type PendingAssetInserts = Vec<(AccountId, AssetVaultKey, Option<Asset>)>;
1009
1010fn prepare_full_account_update(
1011    update: &BlockAccountUpdate,
1012    account: Account,
1013) -> Result<(AccountStateForInsert, PendingStorageInserts, PendingAssetInserts), DatabaseError> {
1014    let account_id = account.id();
1015
1016    // sanity check the commitment of account matches the final state commitment
1017    if account.to_commitment() != update.final_state_commitment() {
1018        return Err(DatabaseError::AccountCommitmentsMismatch {
1019            calculated: account.to_commitment(),
1020            expected: update.final_state_commitment(),
1021        });
1022    }
1023
1024    // collect storage-map inserts to apply after account upsert
1025    let mut storage = Vec::new();
1026    for slot in account.storage().slots() {
1027        if let StorageSlotContent::Map(storage_map) = slot.content() {
1028            for (key, value) in storage_map.entries() {
1029                storage.push((account_id, slot.name().clone(), *key, *value));
1030            }
1031        }
1032    }
1033
1034    // collect vault-asset inserts to apply after account upsert
1035    let mut assets = Vec::new();
1036    for asset in account.vault().assets() {
1037        // Only insert assets with non-zero values for fungible assets
1038        let should_insert = match asset {
1039            Asset::Fungible(fungible) => fungible.amount().as_u64() > 0,
1040            Asset::NonFungible(_) => true,
1041        };
1042        if should_insert {
1043            assets.push((account_id, asset.vault_key(), Some(asset)));
1044        }
1045    }
1046
1047    Ok((AccountStateForInsert::FullAccount(account), storage, assets))
1048}
1049
1050/// Prepare partial delta data for account upserts and follow-up storage and vault inserts.
1051fn prepare_partial_account_update(
1052    conn: &mut SqliteConnection,
1053    update: &BlockAccountUpdate,
1054    account_id: AccountId,
1055    delta: &miden_protocol::account::delta::AccountDelta,
1056) -> Result<(AccountStateForInsert, PendingStorageInserts, PendingAssetInserts), DatabaseError> {
1057    // Build the minimal account state needed for partial delta application. Only load the storage
1058    // map entries and vault balances that will receive updates. The next line fetches the header,
1059    // which will always change unless the delta is empty.
1060    let state_headers = select_minimal_account_state_headers(conn, account_id)?;
1061
1062    // --- Process asset updates. --------------------------------- Look up balances by vault key
1063    // (which includes the callback flag), not by faucet id.
1064    let vault_keys =
1065        Vec::from_iter(delta.vault().fungible().iter().map(|(vault_key, _)| *vault_key));
1066    let prev_balances = select_vault_balances_by_vault_keys(conn, account_id, &vault_keys)?;
1067
1068    // Encode `Some` as update and `None` as removal.
1069    let mut assets = Vec::new();
1070
1071    // Update fungible assets.
1072    for (vault_key, amount_delta) in delta.vault().fungible().iter() {
1073        let faucet_id = vault_key.faucet_id();
1074        let callback_flag = vault_key.callback_flag();
1075        let prev_amount = prev_balances.get(&vault_key.to_word()).copied().unwrap_or(0);
1076        let prev_asset = FungibleAsset::new(faucet_id, prev_amount)?.with_callbacks(callback_flag);
1077        let amount_abs = amount_delta.unsigned_abs();
1078        let delta = FungibleAsset::new(faucet_id, amount_abs)?.with_callbacks(callback_flag);
1079        let new_balance = if *amount_delta < 0 {
1080            prev_asset.sub(delta)?
1081        } else {
1082            prev_asset.add(delta)?
1083        };
1084        let update_or_remove = if new_balance.amount().as_u64() == 0 {
1085            None
1086        } else {
1087            Some(Asset::from(new_balance))
1088        };
1089        assets.push((account_id, new_balance.vault_key(), update_or_remove));
1090    }
1091
1092    // Update non-fungible assets.
1093    for (asset, delta_action) in delta.vault().non_fungible().iter() {
1094        let asset_update = match delta_action {
1095            NonFungibleDeltaAction::Add => Some(Asset::NonFungible(*asset)),
1096            NonFungibleDeltaAction::Remove => None,
1097        };
1098        assets.push((account_id, asset.vault_key(), asset_update));
1099    }
1100
1101    // --- Collect storage map updates. ---------------------------
1102
1103    let mut storage = Vec::new();
1104    for (slot_name, map_delta) in delta.storage().maps() {
1105        for (key, value) in map_delta.entries() {
1106            storage.push((account_id, slot_name.clone(), *key, *value));
1107        }
1108    }
1109
1110    // First collect entries that have associated changes.
1111    let slot_names = Vec::from_iter(delta.storage().maps().filter_map(|(slot_name, map_delta)| {
1112        if map_delta.is_empty() {
1113            None
1114        } else {
1115            Some(slot_name.clone())
1116        }
1117    }));
1118
1119    let map_entries = select_latest_storage_map_entries_for_slots(conn, &account_id, &slot_names)?;
1120
1121    // Apply the delta storage to the given storage header.
1122    let new_storage_header =
1123        apply_storage_delta(&state_headers.storage_header, delta.storage(), &map_entries)?;
1124
1125    // --- Update the vault root by constructing the asset vault from DB.
1126    let new_vault_root = {
1127        let assets = select_latest_vault_assets(conn, account_id)?;
1128        let mut vault = AssetVault::new(&assets)?;
1129        vault.apply_delta(delta.vault())?;
1130        vault.root()
1131    };
1132
1133    // --- Compute updated account state for the accounts row. --- Apply nonce delta.
1134    let new_nonce_value = state_headers
1135        .nonce
1136        .as_canonical_u64()
1137        .checked_add(delta.nonce_delta().as_canonical_u64())
1138        .ok_or_else(|| {
1139            DatabaseError::DataCorrupted(format!("Nonce overflow for account {account_id}"))
1140        })?;
1141    let new_nonce = Felt::new_unchecked(new_nonce_value);
1142
1143    // Create minimal account state data for the row insert.
1144    let account_state = PartialAccountState {
1145        nonce: new_nonce,
1146        code_commitment: state_headers.code_commitment,
1147        storage_header: new_storage_header,
1148        vault_root: new_vault_root,
1149    };
1150
1151    let account_header = miden_protocol::account::AccountHeader::new(
1152        account_id,
1153        account_state.nonce,
1154        account_state.vault_root,
1155        account_state.storage_header.to_commitment(),
1156        account_state.code_commitment,
1157    );
1158
1159    if account_header.to_commitment() != update.final_state_commitment() {
1160        return Err(DatabaseError::AccountCommitmentsMismatch {
1161            calculated: account_header.to_commitment(),
1162            expected: update.final_state_commitment(),
1163        });
1164    }
1165
1166    Ok((AccountStateForInsert::PartialState(account_state), storage, assets))
1167}
1168
1169/// Returns the subset of `account_ids` whose latest committed state is a network account.
1170///
1171/// Unknown ids and non-network accounts are silently omitted.
1172pub(crate) fn select_network_accounts_subset(
1173    conn: &mut SqliteConnection,
1174    account_ids: &[AccountId],
1175) -> Result<HashSet<AccountId>, DatabaseError> {
1176    QueryParamAccountIdLimit::check(account_ids.len())?;
1177    let id_bytes: Vec<Vec<u8>> =
1178        account_ids.iter().map(miden_crypto::utils::Serializable::to_bytes).collect();
1179
1180    let rows: Vec<Vec<u8>> =
1181        SelectDsl::select(schema::accounts::table, schema::accounts::account_id)
1182            .filter(
1183                schema::accounts::account_id
1184                    .eq_any(&id_bytes)
1185                    .and(
1186                        schema::accounts::network_account_type
1187                            .eq(NetworkAccountType::Network.to_raw_sql()),
1188                    )
1189                    .and(schema::accounts::is_latest.eq(true)),
1190            )
1191            .load::<Vec<u8>>(conn)
1192            .map_err(DatabaseError::Diesel)?;
1193
1194    rows.into_iter()
1195        .map(|bytes| {
1196            AccountId::read_from_bytes(&bytes).map_err(DatabaseError::DeserializationError)
1197        })
1198        .collect()
1199}
1200
1201/// Attention: Assumes the account details are NOT null! The schema explicitly allows this though!
1202#[tracing::instrument(
1203    target = COMPONENT,
1204    skip_all,
1205    err,
1206)]
1207#[expect(clippy::too_many_lines)]
1208pub(crate) fn upsert_accounts(
1209    conn: &mut SqliteConnection,
1210    accounts: &[BlockAccountUpdate],
1211    block_num: BlockNumber,
1212) -> Result<usize, DatabaseError> {
1213    let mut count = 0;
1214    for update in accounts {
1215        let account_id = update.account_id();
1216        let account_id_bytes = account_id.to_bytes();
1217
1218        // Pull the latest row (if any) so we can carry forward `created_at_block` and the
1219        // `network_account_type` classification, both of which are fixed at account creation.
1220        let existing: Option<(i64, i32)> = QueryDsl::select(
1221            schema::accounts::table.filter(
1222                schema::accounts::account_id
1223                    .eq(&account_id_bytes)
1224                    .and(schema::accounts::is_latest.eq(true)),
1225            ),
1226            (schema::accounts::created_at_block, schema::accounts::network_account_type),
1227        )
1228        .first(conn)
1229        .optional()
1230        .map_err(DatabaseError::Diesel)?;
1231
1232        let created_at_block = match existing {
1233            Some((raw, _)) => BlockNumber::from_raw_sql(raw)?,
1234            None => block_num,
1235        };
1236
1237        // NOTE: we collect storage / asset inserts to apply them only after the account row is
1238        // written. The storage and vault tables have FKs pointing to accounts `(account_id,
1239        // block_num)`, so inserting them earlier would violate those constraints when inserting a
1240        // brand-new account.
1241        let (account_state, pending_storage_inserts, pending_asset_inserts) = match update.details()
1242        {
1243            AccountUpdateDetails::Private => (AccountStateForInsert::Private, vec![], vec![]),
1244
1245            // New account is always a full account, but also comes as an update
1246            AccountUpdateDetails::Delta(delta) if delta.is_full_state() => {
1247                let account = Account::try_from(delta)
1248                    .expect("Delta to full account always works for full state deltas");
1249                debug_assert_eq!(account_id, account.id());
1250
1251                prepare_full_account_update(update, account)?
1252            },
1253
1254            // Update of an existing account
1255            AccountUpdateDetails::Delta(delta) => {
1256                prepare_partial_account_update(conn, update, account_id, delta)?
1257            },
1258        };
1259
1260        // Inherit the classification when the account already exists; otherwise classify it once at
1261        // creation based on the new state.
1262        let network_account_type = match existing {
1263            Some((_, raw)) => NetworkAccountType::from_raw_sql(raw)?,
1264            None => match &account_state {
1265                AccountStateForInsert::FullAccount(account)
1266                    if NetworkAccount::new(account.clone()).is_ok() =>
1267                {
1268                    NetworkAccountType::Network
1269                },
1270                _ => NetworkAccountType::None,
1271            },
1272        };
1273
1274        // Insert account _code_ for full accounts (new account creation)
1275        if let AccountStateForInsert::FullAccount(ref account) = account_state {
1276            let code = account.code();
1277            let code_value = AccountCodeRowInsert {
1278                code_commitment: code.commitment().to_bytes(),
1279                code: code.to_bytes(),
1280            };
1281            diesel::insert_into(schema::account_codes::table)
1282                .values(&code_value)
1283                .on_conflict(schema::account_codes::code_commitment)
1284                .do_nothing()
1285                .execute(conn)?;
1286        }
1287
1288        // mark previous rows as non-latest and insert NEW account row
1289        diesel::update(schema::accounts::table)
1290            .filter(
1291                schema::accounts::account_id
1292                    .eq(&account_id_bytes)
1293                    .and(schema::accounts::is_latest.eq(true)),
1294            )
1295            .set(schema::accounts::is_latest.eq(false))
1296            .execute(conn)?;
1297
1298        let account_value = match &account_state {
1299            AccountStateForInsert::Private => AccountRowInsert::new_private(
1300                account_id,
1301                network_account_type,
1302                update.final_state_commitment(),
1303                block_num,
1304                created_at_block,
1305            ),
1306            AccountStateForInsert::FullAccount(account) => AccountRowInsert::new_from_account(
1307                account_id,
1308                network_account_type,
1309                update.final_state_commitment(),
1310                block_num,
1311                created_at_block,
1312                account,
1313            ),
1314            AccountStateForInsert::PartialState(state) => AccountRowInsert::new_from_partial(
1315                account_id,
1316                network_account_type,
1317                update.final_state_commitment(),
1318                block_num,
1319                created_at_block,
1320                state,
1321            ),
1322        };
1323
1324        diesel::insert_into(schema::accounts::table)
1325            .values(&account_value)
1326            .on_conflict((schema::accounts::account_id, schema::accounts::block_num))
1327            .do_update()
1328            .set(&account_value)
1329            .execute(conn)?;
1330
1331        // insert pending storage map entries TODO consider batching
1332        for (acc_id, slot_name, key, value) in pending_storage_inserts {
1333            insert_account_storage_map_value(conn, acc_id, block_num, slot_name, key, value)?;
1334        }
1335
1336        for (acc_id, vault_key, update) in pending_asset_inserts {
1337            insert_account_vault_asset(conn, acc_id, block_num, vault_key, update)?;
1338        }
1339
1340        count += 1;
1341    }
1342
1343    Ok(count)
1344}
1345
1346#[derive(Insertable, Debug, Clone)]
1347#[diesel(table_name = schema::account_codes)]
1348pub(crate) struct AccountCodeRowInsert {
1349    pub(crate) code_commitment: Vec<u8>,
1350    pub(crate) code: Vec<u8>,
1351}
1352
1353#[derive(Insertable, AsChangeset, Debug, Clone)]
1354#[diesel(table_name = schema::accounts)]
1355pub(crate) struct AccountRowInsert {
1356    pub(crate) account_id: Vec<u8>,
1357    pub(crate) network_account_type: i32,
1358    pub(crate) block_num: i64,
1359    pub(crate) account_commitment: Vec<u8>,
1360    pub(crate) code_commitment: Option<Vec<u8>>,
1361    pub(crate) nonce: Option<i64>,
1362    pub(crate) storage_header: Option<Vec<u8>>,
1363    pub(crate) vault_root: Option<Vec<u8>>,
1364    pub(crate) is_latest: bool,
1365    pub(crate) created_at_block: i64,
1366}
1367
1368impl AccountRowInsert {
1369    /// Creates an insert row for a private account (no public state).
1370    pub(crate) fn new_private(
1371        account_id: AccountId,
1372        network_account_type: NetworkAccountType,
1373        account_commitment: Word,
1374        block_num: BlockNumber,
1375        created_at_block: BlockNumber,
1376    ) -> Self {
1377        Self {
1378            account_id: account_id.to_bytes(),
1379            network_account_type: network_account_type.to_raw_sql(),
1380            account_commitment: account_commitment.to_bytes(),
1381            block_num: block_num.to_raw_sql(),
1382            nonce: None,
1383            code_commitment: None,
1384            storage_header: None,
1385            vault_root: None,
1386            is_latest: true,
1387            created_at_block: created_at_block.to_raw_sql(),
1388        }
1389    }
1390
1391    /// Creates an insert row from a full account (new account creation).
1392    fn new_from_account(
1393        account_id: AccountId,
1394        network_account_type: NetworkAccountType,
1395        account_commitment: Word,
1396        block_num: BlockNumber,
1397        created_at_block: BlockNumber,
1398        account: &Account,
1399    ) -> Self {
1400        Self {
1401            account_id: account_id.to_bytes(),
1402            network_account_type: network_account_type.to_raw_sql(),
1403            account_commitment: account_commitment.to_bytes(),
1404            block_num: block_num.to_raw_sql(),
1405            nonce: Some(nonce_to_raw_sql(account.nonce())),
1406            code_commitment: Some(account.code().commitment().to_bytes()),
1407            storage_header: Some(account.storage().to_header().to_bytes()),
1408            vault_root: Some(account.vault().root().to_bytes()),
1409            is_latest: true,
1410            created_at_block: created_at_block.to_raw_sql(),
1411        }
1412    }
1413
1414    /// Creates an insert row from a partial account state (delta update).
1415    fn new_from_partial(
1416        account_id: AccountId,
1417        network_account_type: NetworkAccountType,
1418        account_commitment: Word,
1419        block_num: BlockNumber,
1420        created_at_block: BlockNumber,
1421        state: &PartialAccountState,
1422    ) -> Self {
1423        Self {
1424            account_id: account_id.to_bytes(),
1425            network_account_type: network_account_type.to_raw_sql(),
1426            account_commitment: account_commitment.to_bytes(),
1427            block_num: block_num.to_raw_sql(),
1428            nonce: Some(nonce_to_raw_sql(state.nonce)),
1429            code_commitment: Some(state.code_commitment.to_bytes()),
1430            storage_header: Some(state.storage_header.to_bytes()),
1431            vault_root: Some(state.vault_root.to_bytes()),
1432            is_latest: true,
1433            created_at_block: created_at_block.to_raw_sql(),
1434        }
1435    }
1436}
1437
1438#[derive(Insertable, AsChangeset, Debug, Clone)]
1439#[diesel(table_name = schema::account_vault_assets)]
1440pub(crate) struct AccountAssetRowInsert {
1441    pub(crate) account_id: Vec<u8>,
1442    pub(crate) block_num: i64,
1443    pub(crate) vault_key: Vec<u8>,
1444    pub(crate) asset: Option<Vec<u8>>,
1445    pub(crate) is_latest: bool,
1446}
1447
1448impl AccountAssetRowInsert {
1449    pub(crate) fn new(
1450        account_id: &AccountId,
1451        vault_key: &AssetVaultKey,
1452        block_num: BlockNumber,
1453        asset: Option<Asset>,
1454        is_latest: bool,
1455    ) -> Self {
1456        let account_id = account_id.to_bytes();
1457        let vault_key: Word = (*vault_key).into();
1458        let vault_key = vault_key.to_bytes();
1459        let block_num = block_num.to_raw_sql();
1460        let asset = asset.map(|asset| asset.to_bytes());
1461        Self {
1462            account_id,
1463            block_num,
1464            vault_key,
1465            asset,
1466            is_latest,
1467        }
1468    }
1469}
1470
1471#[derive(Insertable, AsChangeset, Debug, Clone)]
1472#[diesel(table_name = schema::account_storage_map_values)]
1473pub(crate) struct AccountStorageMapRowInsert {
1474    pub(crate) account_id: Vec<u8>,
1475    pub(crate) block_num: i64,
1476    pub(crate) slot_name: String,
1477    pub(crate) key: Vec<u8>,
1478    pub(crate) value: Vec<u8>,
1479    pub(crate) is_latest: bool,
1480}
1481
1482// CLEANUP FUNCTIONS
1483// ================================================================================================
1484
1485/// Number of historical blocks to retain for vault assets, storage map values, and account codes.
1486/// Entries older than `chain_tip - HISTORICAL_BLOCK_RETENTION` will be deleted, except for entries
1487/// marked with `is_latest=true` which are always retained.
1488pub const HISTORICAL_BLOCK_RETENTION: u32 = 50;
1489
1490/// Clean up old entries for all accounts, deleting entries older than the retention window.
1491///
1492/// Deletes rows where `block_num < chain_tip - HISTORICAL_BLOCK_RETENTION` and `is_latest = false`
1493/// for vault assets and storage map values. Also deletes account codes that are no longer
1494/// referenced by any account row within the retention window.
1495///
1496/// # Returns
1497/// A tuple of `(vault_assets_deleted, storage_map_values_deleted, account_codes_deleted)`
1498#[tracing::instrument(
1499    target = COMPONENT,
1500    skip_all,
1501    err,
1502    fields(cutoff_block),
1503)]
1504pub(crate) fn prune_history(
1505    conn: &mut SqliteConnection,
1506    chain_tip: BlockNumber,
1507) -> Result<(usize, usize, usize), DatabaseError> {
1508    let cutoff_block = i64::from(chain_tip.as_u32().saturating_sub(HISTORICAL_BLOCK_RETENTION));
1509    tracing::Span::current().record("cutoff_block", cutoff_block);
1510    let vault_deleted = prune_account_vault_assets(conn, cutoff_block)?;
1511    let storage_deleted = prune_account_storage_map_values(conn, cutoff_block)?;
1512    let codes_deleted = prune_account_codes(conn, cutoff_block)?;
1513
1514    Ok((vault_deleted, storage_deleted, codes_deleted))
1515}
1516
1517#[tracing::instrument(
1518    target = COMPONENT,
1519    skip_all,
1520    err,
1521    fields(cutoff_block),
1522)]
1523fn prune_account_vault_assets(
1524    conn: &mut SqliteConnection,
1525    cutoff_block: i64,
1526) -> Result<usize, DatabaseError> {
1527    diesel::delete(
1528        schema::account_vault_assets::table.filter(
1529            schema::account_vault_assets::block_num
1530                .lt(cutoff_block)
1531                .and(schema::account_vault_assets::is_latest.eq(false)),
1532        ),
1533    )
1534    .execute(conn)
1535    .map_err(DatabaseError::Diesel)
1536}
1537
1538#[tracing::instrument(
1539    target = COMPONENT,
1540    skip_all,
1541    err,
1542    fields(cutoff_block),
1543)]
1544fn prune_account_storage_map_values(
1545    conn: &mut SqliteConnection,
1546    cutoff_block: i64,
1547) -> Result<usize, DatabaseError> {
1548    diesel::delete(
1549        schema::account_storage_map_values::table.filter(
1550            schema::account_storage_map_values::block_num
1551                .lt(cutoff_block)
1552                .and(schema::account_storage_map_values::is_latest.eq(false)),
1553        ),
1554    )
1555    .execute(conn)
1556    .map_err(DatabaseError::Diesel)
1557}
1558
1559/// Deletes account codes that are no longer referenced by any account row within the retention
1560/// window.
1561///
1562/// An account code is safe to delete when no `accounts` row with `block_num >= cutoff_block`
1563/// references its `code_commitment`. This covers both active accounts (`is_latest=true`) and
1564/// recent historical rows that still fall within the retention window.
1565///
1566/// The `UNION ALL` shape and explicit index selections avoid SQLite choosing
1567/// `idx_accounts_code_commitment` for the whole predicate, which is expensive when the account
1568/// history table has millions of public rows.
1569#[tracing::instrument(
1570    target = COMPONENT,
1571    skip_all,
1572    err,
1573    fields(cutoff_block),
1574)]
1575fn prune_account_codes(
1576    conn: &mut SqliteConnection,
1577    cutoff_block: i64,
1578) -> Result<usize, DatabaseError> {
1579    use diesel::sql_types::BigInt;
1580
1581    diesel::sql_query(
1582        "DELETE FROM account_codes \
1583         WHERE code_commitment NOT IN ( \
1584             SELECT DISTINCT code_commitment \
1585             FROM ( \
1586                 SELECT code_commitment \
1587                 FROM accounts INDEXED BY idx_accounts_prune_code \
1588                 WHERE code_commitment IS NOT NULL \
1589                   AND block_num >= ?1 \
1590                 UNION ALL \
1591                 SELECT code_commitment \
1592                 FROM accounts INDEXED BY idx_accounts_latest_code_commitment \
1593                 WHERE code_commitment IS NOT NULL \
1594                   AND is_latest = 1 \
1595             ) \
1596         )",
1597    )
1598    .bind::<BigInt, _>(cutoff_block)
1599    .execute(conn)
1600    .map_err(DatabaseError::Diesel)
1601}