nyxd_scraper_shared/storage/
mod.rs

1// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::error::ScraperError;
5use async_trait::async_trait;
6use thiserror::Error;
7use tracing::warn;
8
9pub use crate::ParsedTransactionResponse;
10pub use crate::block_processor::types::FullBlockInformation;
11pub use tendermint::Block;
12pub use tendermint::block::{Commit, CommitSig};
13pub use tendermint_rpc::endpoint::validators;
14
15pub mod helpers;
16
17// a workaround for needing associated type (which is a no-no in dynamic dispatch)
18#[derive(Error, Debug)]
19#[error(transparent)]
20pub struct NyxdScraperStorageError(Box<dyn std::error::Error + Send + Sync>);
21
22impl NyxdScraperStorageError {
23    pub fn new<E>(error: E) -> Self
24    where
25        E: std::error::Error + Send + Sync + 'static,
26    {
27        NyxdScraperStorageError(Box::new(error))
28    }
29}
30
31#[async_trait]
32pub trait NyxdScraperStorage: Clone + Sized {
33    type StorageTransaction: NyxdScraperTransaction;
34
35    /// Either connection string (postgres) or storage path (sqlite)
36    async fn initialise(storage: &str) -> Result<Self, NyxdScraperStorageError>;
37
38    async fn begin_processing_tx(
39        &self,
40    ) -> Result<Self::StorageTransaction, NyxdScraperStorageError>;
41
42    async fn get_last_processed_height(&self) -> Result<i64, NyxdScraperStorageError>;
43
44    async fn get_pruned_height(&self) -> Result<i64, NyxdScraperStorageError>;
45
46    async fn lowest_block_height(&self) -> Result<Option<i64>, NyxdScraperStorageError>;
47
48    async fn prune_storage(
49        &self,
50        oldest_to_keep: u32,
51        current_height: u32,
52    ) -> Result<(), NyxdScraperStorageError>;
53}
54
55#[async_trait]
56pub trait NyxdScraperTransaction {
57    async fn commit(mut self) -> Result<(), NyxdScraperStorageError>;
58
59    async fn persist_validators(
60        &mut self,
61        validators: &validators::Response,
62    ) -> Result<(), NyxdScraperStorageError>;
63
64    async fn persist_block_data(
65        &mut self,
66        block: &Block,
67        total_gas: i64,
68    ) -> Result<(), NyxdScraperStorageError>;
69
70    async fn persist_commits(
71        &mut self,
72        commits: &Commit,
73        validators: &validators::Response,
74    ) -> Result<(), NyxdScraperStorageError>;
75
76    async fn persist_txs(
77        &mut self,
78        txs: &[ParsedTransactionResponse],
79    ) -> Result<(), NyxdScraperStorageError>;
80
81    async fn persist_messages(
82        &mut self,
83        txs: &[ParsedTransactionResponse],
84    ) -> Result<(), NyxdScraperStorageError>;
85
86    async fn update_last_processed(&mut self, height: i64) -> Result<(), NyxdScraperStorageError>;
87}
88
89pub async fn persist_block<Tx>(
90    block: &FullBlockInformation,
91    tx: &mut Tx,
92    store_precommits: bool,
93) -> Result<(), ScraperError>
94where
95    Tx: NyxdScraperTransaction,
96{
97    let total_gas = crate::helpers::tx_gas_sum(&block.transactions);
98
99    // SANITY CHECK: make sure the block proposer is present in the validator set
100    block.ensure_proposer()?;
101
102    tx.persist_validators(&block.validators).await?;
103
104    tx.persist_block_data(&block.block, total_gas).await?;
105
106    if store_precommits {
107        if let Some(commit) = &block.block.last_commit {
108            tx.persist_commits(commit, &block.validators).await?;
109        } else {
110            warn!("no commits for block {}", block.block.header.height)
111        }
112    }
113
114    // persist txs
115    tx.persist_txs(&block.transactions).await?;
116
117    // persist messages (inside the transactions)
118    tx.persist_messages(&block.transactions).await?;
119
120    tx.update_last_processed(block.block.header.height.into())
121        .await?;
122
123    Ok(())
124}