chaindexing/
config.rs

1use std::sync::Arc;
2
3use tokio::sync::Mutex;
4
5use crate::chain_reorg::MinConfirmationCount;
6use crate::chains::Chain;
7use crate::nodes::{self, NodeHeartbeat};
8use crate::pruning::PruningConfig;
9use crate::{ChaindexingRepo, Contract};
10
11pub enum ConfigError {
12    NoContract,
13    NoChain,
14}
15
16impl std::fmt::Debug for ConfigError {
17    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
18        match self {
19            ConfigError::NoContract => {
20                write!(f, "At least one contract is required")
21            }
22            ConfigError::NoChain => {
23                write!(f, "At least one chain is required")
24            }
25        }
26    }
27}
28
29/// Used to configure managing a chaindexing's node heartbeat
30/// to cut down JSON-RPC's (Alchemy, Infura, etc.) cost.
31#[derive(Clone, Debug)]
32pub struct OptimizationConfig {
33    pub(crate) node_heartbeat: NodeHeartbeat,
34    pub(crate) start_after_in_secs: u64,
35}
36
37impl OptimizationConfig {
38    /// Optimization starts after the seconds specified here.
39    /// This is the typically the estimated time to complete initial indexing
40    /// i.e. the estimated time in seconds for chaindexing to reach
41    /// the current block for all chains being indexed.
42    pub fn new(node_heartbeat: &NodeHeartbeat, start_after_in_secs: u64) -> Self {
43        Self {
44            node_heartbeat: node_heartbeat.clone(),
45            start_after_in_secs,
46        }
47    }
48}
49
50/// Configuration for indexing states
51#[derive(Clone, Debug)]
52pub struct Config<SharedState: Sync + Send + Clone> {
53    pub chains: Vec<Chain>,
54    pub repo: ChaindexingRepo,
55    pub contracts: Vec<Contract<SharedState>>,
56    pub(crate) min_confirmation_count: MinConfirmationCount,
57    pub blocks_per_batch: u64,
58    pub handler_rate_ms: u64,
59    pub ingestion_rate_ms: u64,
60    pub chain_concurrency: u32,
61    node_election_rate_ms: Option<u64>,
62    pub reset_count: u64,
63    pub(crate) reset_including_side_effects_count: u64,
64    pub reset_queries: Vec<String>,
65    pub shared_state: Option<Arc<Mutex<SharedState>>>,
66    pub max_concurrent_node_count: u16,
67    pub optimization_config: Option<OptimizationConfig>,
68    pub(crate) pruning_config: Option<PruningConfig>,
69}
70
71impl<SharedState: Sync + Send + Clone> Config<SharedState> {
72    pub fn new(repo: ChaindexingRepo) -> Self {
73        Self {
74            repo,
75            chains: vec![],
76            contracts: vec![],
77            min_confirmation_count: MinConfirmationCount::new(40),
78            blocks_per_batch: 450,
79            handler_rate_ms: 4_000,
80            ingestion_rate_ms: 20_000,
81            chain_concurrency: 4,
82            node_election_rate_ms: None,
83            reset_count: 0,
84            reset_including_side_effects_count: 0,
85            reset_queries: vec![],
86            shared_state: None,
87            max_concurrent_node_count: nodes::DEFAULT_MAX_CONCURRENT_NODE_COUNT,
88            optimization_config: None,
89            pruning_config: None,
90        }
91    }
92
93    // Includes chain in config
94    pub fn add_chain(mut self, chain: Chain) -> Self {
95        self.chains.push(chain);
96
97        self
98    }
99
100    // Includes contract in config
101    pub fn add_contract(mut self, contract: Contract<SharedState>) -> Self {
102        self.contracts.push(contract);
103
104        self
105    }
106
107    /// Allows managing derived app states (derived from indexed states)
108    pub fn add_reset_query(mut self, reset_query: &str) -> Self {
109        self.reset_queries.push(reset_query.to_string());
110
111        self
112    }
113
114    /// Restarts indexing from scratch for EventHandlers. SideEffectHandlers
115    /// will not run if they ran already
116    pub fn reset(mut self, count: u64) -> Self {
117        self.reset_count = count;
118
119        self
120    }
121
122    /// Restarts indexing from scratch for all Handlers. SideEffectHandlers
123    /// will RUN even if they ran already
124    pub fn reset_including_side_effects_dangerously(mut self, count: u64) -> Self {
125        self.reset_including_side_effects_count = count;
126
127        self
128    }
129
130    /// Defines the initial state for side effect handlers
131    pub fn with_initial_state(mut self, initial_state: SharedState) -> Self {
132        self.shared_state = Some(Arc::new(Mutex::new(initial_state)));
133
134        self
135    }
136
137    /// The minimum confirmation count for detecting chain-reorganizations or uncled blocks
138    pub fn with_min_confirmation_count(mut self, min_confirmation_count: u8) -> Self {
139        self.min_confirmation_count = MinConfirmationCount::new(min_confirmation_count);
140
141        self
142    }
143
144    /// Advance config: How many blocks per batch should be ingested and handled.
145    /// Default is 8_000
146    pub fn with_blocks_per_batch(mut self, blocks_per_batch: u64) -> Self {
147        self.blocks_per_batch = blocks_per_batch;
148
149        self
150    }
151
152    /// Advance config: How often should the events handlers processes run.
153    /// Default is 4_000
154    pub fn with_handler_rate_ms(mut self, handler_rate_ms: u64) -> Self {
155        self.handler_rate_ms = handler_rate_ms;
156
157        self
158    }
159
160    /// Advance config:  How often should the events ingester processes run.
161    /// Default is 20_000
162    pub fn with_ingestion_rate_ms(mut self, ingestion_rate_ms: u64) -> Self {
163        self.ingestion_rate_ms = ingestion_rate_ms;
164
165        self
166    }
167
168    /// Configures number of chain batches to be processed concurrently
169    pub fn with_chain_concurrency(mut self, chain_concurrency: u32) -> Self {
170        self.chain_concurrency = chain_concurrency;
171
172        self
173    }
174
175    pub fn with_node_election_rate_ms(mut self, node_election_rate_ms: u64) -> Self {
176        self.node_election_rate_ms = Some(node_election_rate_ms);
177
178        self
179    }
180
181    pub fn with_max_concurrent_node_count(mut self, max_concurrent_node_count: u16) -> Self {
182        self.max_concurrent_node_count = max_concurrent_node_count;
183
184        self
185    }
186
187    /// Deletes stale events and related-internal data
188    pub fn with_pruning(mut self) -> Self {
189        self.pruning_config = Some(Default::default());
190
191        self
192    }
193
194    pub fn with_prune_n_blocks_away(mut self, prune_n_blocks_away: u64) -> Self {
195        self.pruning_config = Some(PruningConfig {
196            prune_n_blocks_away,
197            ..self.pruning_config.unwrap_or_default()
198        });
199
200        self
201    }
202
203    pub fn with_prune_interval(mut self, prune_interval: u64) -> Self {
204        self.pruning_config = Some(PruningConfig {
205            prune_interval,
206            ..self.pruning_config.unwrap_or_default()
207        });
208
209        self
210    }
211
212    /// This enables optimization for indexing with the CAVEAT that you have to
213    /// manually keep chaindexing alive e.g. when a user enters certain pages
214    /// in your DApp
215    pub fn enable_optimization(mut self, optimization_config: &OptimizationConfig) -> Self {
216        self.optimization_config = Some(optimization_config.clone());
217
218        self
219    }
220    pub fn is_optimization_enabled(&self) -> bool {
221        self.optimization_config.is_some()
222    }
223
224    pub(super) fn get_node_election_rate_ms(&self) -> u64 {
225        self.node_election_rate_ms.unwrap_or(self.ingestion_rate_ms)
226    }
227
228    pub(super) fn validate(&self) -> Result<(), ConfigError> {
229        if self.contracts.is_empty() {
230            Err(ConfigError::NoContract)
231        } else if self.chains.is_empty() {
232            Err(ConfigError::NoChain)
233        } else {
234            Ok(())
235        }
236    }
237}