// zebra_state/service/finalized_state.rs

//! The primary implementation of the `zebra_state::Service` built upon rocksdb.
//!
//! Zebra's database is implemented in 4 layers:
//! - [`FinalizedState`]: queues, validates, and commits blocks, using...
//! - [`ZebraDb`]: reads and writes [`zebra_chain`] types to the state database, using...
//! - [`DiskDb`]: reads and writes generic types to any column family in the database, using...
//! - [`disk_format`]: converts types to raw database bytes.
//!
//! These layers allow us to split [`zebra_chain`] types for efficient database storage.
//! They reduce the risk of data corruption bugs, runtime inconsistencies, and panics.
//!
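//! As a minimal usage sketch (assuming the `elasticsearch` compile-time
//! feature is disabled, so [`FinalizedState::new`] takes only a config and a
//! network; setup of those values is omitted):
//!
//! ```ignore
//! let state = FinalizedState::new(&config, &network);
//! // `state.db` is the `ZebraDb` layer, which wraps `DiskDb`.
//! let tip_height = state.db.finalized_tip_height();
//! ```
//!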
//! # Correctness
//!
//! [`crate::constants::state_database_format_version_in_code()`] must be incremented
//! each time the database format (column families, serialization, etc.) changes.

use std::{
    io::{stderr, stdout, Write},
    sync::Arc,
};

use zebra_chain::{block, parallel::tree::NoteCommitmentTrees, parameters::Network};
use zebra_db::{
    chain::BLOCK_INFO,
    transparent::{BALANCE_BY_TRANSPARENT_ADDR, TX_LOC_BY_SPENT_OUT_LOC},
};

use crate::{
    constants::{state_database_format_version_in_code, STATE_DATABASE_KIND},
    error::CommitCheckpointVerifiedError,
    request::{FinalizableBlock, FinalizedBlock, Treestate},
    service::{check, QueuedCheckpointVerified},
    CheckpointVerifiedBlock, Config, ValidateContextError,
};

pub mod column_family;

mod disk_db;
mod disk_format;
mod zebra_db;

#[cfg(any(test, feature = "proptest-impl"))]
mod arbitrary;

#[cfg(test)]
mod tests;

#[allow(unused_imports)]
pub use column_family::{TypedColumnFamily, WriteTypedBatch};
#[allow(unused_imports)]
pub use disk_db::{DiskDb, DiskWriteBatch, ReadDisk, WriteDisk};
#[allow(unused_imports)]
pub use disk_format::{
    FromDisk, IntoDisk, OutputLocation, RawBytes, TransactionIndex, TransactionLocation,
    MAX_ON_DISK_HEIGHT,
};
pub use zebra_db::ZebraDb;

#[cfg(any(test, feature = "proptest-impl"))]
pub use disk_format::KV;

pub use disk_format::upgrade::restorable_db_versions;

/// The column families supported by the running `zebra-state` database code.
///
/// Existing column families that aren't listed here are preserved when the database is opened.
pub const STATE_COLUMN_FAMILIES_IN_CODE: &[&str] = &[
    // Blocks
    "hash_by_height",
    "height_by_hash",
    "block_header_by_height",
    // Transactions
    "tx_by_loc",
    "hash_by_tx_loc",
    "tx_loc_by_hash",
    // Transparent
    BALANCE_BY_TRANSPARENT_ADDR,
    "tx_loc_by_transparent_addr_loc",
    "utxo_by_out_loc",
    "utxo_loc_by_transparent_addr_loc",
    TX_LOC_BY_SPENT_OUT_LOC,
    // Sprout
    "sprout_nullifiers",
    "sprout_anchors",
    "sprout_note_commitment_tree",
    // Sapling
    "sapling_nullifiers",
    "sapling_anchors",
    "sapling_note_commitment_tree",
    "sapling_note_commitment_subtree",
    // Orchard
    "orchard_nullifiers",
    "orchard_anchors",
    "orchard_note_commitment_tree",
    "orchard_note_commitment_subtree",
    // Chain
    "history_tree",
    "tip_chain_value_pool",
    BLOCK_INFO,
];
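
// These column family names are passed to `ZebraDb::new` as `String`s when the
// database is opened; see the call in `new_with_debug` below.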

/// The finalized part of the chain state, stored in the db.
///
/// `rocksdb` allows concurrent writes through a shared reference,
/// so clones of the finalized state represent the same database instance.
/// When the final clone is dropped, the database is closed.
///
/// This is different from `NonFinalizedState::clone()`,
/// which returns an independent copy of the chains.
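///
/// For example (a sketch; `state` is an existing [`FinalizedState`]):
///
/// ```ignore
/// let reader = state.clone();
/// // `reader.db` and `state.db` share the same rocksdb instance;
/// // the database is only closed when both clones are dropped.
/// let tip = reader.db.finalized_tip_height();
/// ```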
#[derive(Clone, Debug)]
pub struct FinalizedState {
    // Configuration
    //
    // This configuration cannot be modified after the database is initialized,
    // because some clones would have different values.
    //
    /// The configured stop height.
    ///
    /// Commit blocks to the finalized state up to this height, then exit Zebra.
    debug_stop_at_height: Option<block::Height>,

    // Owned State
    //
    // Everything contained in this state must be shared by all clones, or read-only.
    //
    /// The underlying database.
    ///
    /// `rocksdb` allows reads and writes via a shared reference,
    /// so this database object can be freely cloned.
    /// The last instance that is dropped will close the underlying database.
    pub db: ZebraDb,

    #[cfg(feature = "elasticsearch")]
    /// The elasticsearch handle.
    pub elastic_db: Option<elasticsearch::Elasticsearch>,

    #[cfg(feature = "elasticsearch")]
    /// A collection of blocks to be sent to elasticsearch as a bulk.
    pub elastic_blocks: Vec<String>,
}

impl FinalizedState {
    /// Returns an on-disk database instance for `config`, `network`, and `enable_elastic_db`.
    /// If there is no existing database, creates a new database on disk.
    pub fn new(
        config: &Config,
        network: &Network,
        #[cfg(feature = "elasticsearch")] enable_elastic_db: bool,
    ) -> Self {
        Self::new_with_debug(
            config,
            network,
            false,
            #[cfg(feature = "elasticsearch")]
            enable_elastic_db,
            false,
        )
    }

    /// Returns an on-disk database instance with the supplied production and debug settings.
    /// If there is no existing database, creates a new database on disk.
    ///
    /// This method is intended for use in tests.
    pub(crate) fn new_with_debug(
        config: &Config,
        network: &Network,
        debug_skip_format_upgrades: bool,
        #[cfg(feature = "elasticsearch")] enable_elastic_db: bool,
        read_only: bool,
    ) -> Self {
        #[cfg(feature = "elasticsearch")]
        let elastic_db = if enable_elastic_db {
            use elasticsearch::{
                auth::Credentials::Basic,
                cert::CertificateValidation,
                http::transport::{SingleNodeConnectionPool, TransportBuilder},
                http::Url,
                Elasticsearch,
            };

            let conn_pool = SingleNodeConnectionPool::new(
                Url::parse(config.elasticsearch_url.as_str())
                    .expect("configured elasticsearch url is invalid"),
            );
            let transport = TransportBuilder::new(conn_pool)
                .cert_validation(CertificateValidation::None)
                .auth(Basic(
                    config.elasticsearch_username.clone(),
                    config.elasticsearch_password.clone(),
                ))
                .build()
                .expect("elasticsearch transport builder should not fail");

            Some(Elasticsearch::new(transport))
        } else {
            None
        };

        let db = ZebraDb::new(
            config,
            STATE_DATABASE_KIND,
            &state_database_format_version_in_code(),
            network,
            debug_skip_format_upgrades,
            STATE_COLUMN_FAMILIES_IN_CODE
                .iter()
                .map(ToString::to_string),
            read_only,
        );

        #[cfg(feature = "elasticsearch")]
        let new_state = Self {
            debug_stop_at_height: config.debug_stop_at_height.map(block::Height),
            db,
            elastic_db,
            elastic_blocks: vec![],
        };

        #[cfg(not(feature = "elasticsearch"))]
        let new_state = Self {
            debug_stop_at_height: config.debug_stop_at_height.map(block::Height),
            db,
        };

        // TODO: move debug_stop_at_height into a task in the start command (#3442)
        if let Some(tip_height) = new_state.db.finalized_tip_height() {
            if new_state.is_at_stop_height(tip_height) {
                let debug_stop_at_height = new_state
                    .debug_stop_at_height
                    .expect("true from `is_at_stop_height` implies `debug_stop_at_height` is Some");
                let tip_hash = new_state.db.finalized_tip_hash();

                if tip_height > debug_stop_at_height {
                    tracing::error!(
                        ?debug_stop_at_height,
                        ?tip_height,
                        ?tip_hash,
                        "previous state height is greater than the stop height",
                    );
                }

                tracing::info!(
                    ?debug_stop_at_height,
                    ?tip_height,
                    ?tip_hash,
                    "state is already at the configured height"
                );

                // RocksDB can do cleanup when column families are opened,
                // so we want to drop the database before we exit.
                std::mem::drop(new_state);

                // Drops any tracing log output that hasn't already been written to stdout,
                // since this exits before calling drop on the WorkerGuard for the logger thread.
                // This is okay for now because this is test-only code.
                //
                // TODO: Call ZebradApp.shutdown or drop its Tracing component before calling exit_process to flush logs to stdout
                Self::exit_process();
            }
        }

        new_state
    }

    /// Returns the configured network for this database.
    pub fn network(&self) -> Network {
        self.db.network()
    }

    /// Commit a checkpoint-verified block to the state.
    ///
    /// It's the caller's responsibility to ensure that blocks are committed in
    /// order.
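    ///
    /// A minimal sketch of that contract (`queued_blocks` is a hypothetical
    /// iterator yielding queued blocks in height order):
    ///
    /// ```ignore
    /// for queued in queued_blocks {
    ///     // Each call commits the block at `finalized_tip_height + 1`.
    ///     let (block, trees) = state.commit_finalized(queued, None)?;
    /// }
    /// ```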
    pub fn commit_finalized(
        &mut self,
        ordered_block: QueuedCheckpointVerified,
        prev_note_commitment_trees: Option<NoteCommitmentTrees>,
    ) -> Result<(CheckpointVerifiedBlock, NoteCommitmentTrees), CommitCheckpointVerifiedError> {
        let (checkpoint_verified, rsp_tx) = ordered_block;
        let result = self.commit_finalized_direct(
            checkpoint_verified.clone().into(),
            prev_note_commitment_trees,
            "commit checkpoint-verified request",
        );

        if result.is_ok() {
            metrics::counter!("state.checkpoint.finalized.block.count").increment(1);
            metrics::gauge!("state.checkpoint.finalized.block.height")
                .set(checkpoint_verified.height.0 as f64);

            // This height gauge is updated for both fully verified and checkpoint blocks.
            // These updates can't conflict, because the state makes sure that blocks
            // are committed in order.
            metrics::gauge!("zcash.chain.verified.block.height")
                .set(checkpoint_verified.height.0 as f64);
            metrics::counter!("zcash.chain.verified.block.total").increment(1);
        } else {
            metrics::counter!("state.checkpoint.error.block.count").increment(1);
            metrics::gauge!("state.checkpoint.error.block.height")
                .set(checkpoint_verified.height.0 as f64);
        }

        let _ = rsp_tx.send(result.clone().map(|(hash, _)| hash));

        result.map(|(_hash, note_commitment_trees)| (checkpoint_verified, note_commitment_trees))
    }

    /// Immediately commit a `finalized` block to the finalized state.
    ///
    /// This can be called either by the non-finalized state (when finalizing
    /// a block) or by the checkpoint verifier.
    ///
    /// Use `source` as the source of the block in log messages.
    ///
    /// # Errors
    ///
    /// - Propagates any errors from writing to the DB
    /// - Propagates any errors from updating history and note commitment trees
    /// - If `hashFinalSaplingRoot` / `hashLightClientRoot` / `hashBlockCommitments`
    ///   does not match the expected value
    #[allow(clippy::unwrap_in_result)]
    pub fn commit_finalized_direct(
        &mut self,
        finalizable_block: FinalizableBlock,
        prev_note_commitment_trees: Option<NoteCommitmentTrees>,
        source: &str,
    ) -> Result<(block::Hash, NoteCommitmentTrees), CommitCheckpointVerifiedError> {
        let (height, hash, finalized, prev_note_commitment_trees) = match finalizable_block {
            FinalizableBlock::Checkpoint {
                checkpoint_verified,
            } => {
                // Checkpoint-verified blocks don't have an associated treestate, so we retrieve the
                // treestate of the finalized tip from the database and update it for the block
                // being committed, assuming the retrieved treestate is the parent block's
                // treestate. Later on, this function proves this assumption by asserting that the
                // finalized tip is the parent block of the block being committed.

                let block = checkpoint_verified.block.clone();
                let mut history_tree = self.db.history_tree();
                let prev_note_commitment_trees = prev_note_commitment_trees
                    .unwrap_or_else(|| self.db.note_commitment_trees_for_tip());

                // Update the note commitment trees.
                let mut note_commitment_trees = prev_note_commitment_trees.clone();
                note_commitment_trees
                    .update_trees_parallel(&block)
                    .map_err(ValidateContextError::from)?;

                // Check the block commitment if the history tree was not
                // supplied by the non-finalized state. Note that we don't do
                // this check for history trees supplied by the non-finalized
                // state because the non-finalized state checks the block
                // commitment.
                //
                // From NU5 onward, the block hash commits only to
                // non-authorizing data (see ZIP-244). This checks the
                // authorizing data commitment, making sure the entire block
                // contents were committed to. The check is done here (and not
                // during semantic validation) because it needs the history tree
                // root. While it _is_ checked during contextual validation,
                // that is not called by the checkpoint verifier, and keeping a
                // history tree there would be harder to implement.
                //
                // TODO: run this CPU-intensive cryptography in a parallel rayon
                // thread, if it shows up in profiles
                check::block_commitment_is_valid_for_chain_history(
                    block.clone(),
                    &self.network(),
                    &history_tree,
                )?;

                // Update the history tree.
                //
                // TODO: run this CPU-intensive cryptography in a parallel rayon
                // thread, if it shows up in profiles
                let history_tree_mut = Arc::make_mut(&mut history_tree);
                let sapling_root = note_commitment_trees.sapling.root();
                let orchard_root = note_commitment_trees.orchard.root();
                history_tree_mut
                    .push(&self.network(), block.clone(), &sapling_root, &orchard_root)
                    .map_err(Arc::new)
                    .map_err(ValidateContextError::from)?;

                let treestate = Treestate {
                    note_commitment_trees,
                    history_tree,
                };

                (
                    checkpoint_verified.height,
                    checkpoint_verified.hash,
                    FinalizedBlock::from_checkpoint_verified(checkpoint_verified, treestate),
                    Some(prev_note_commitment_trees),
                )
            }
            FinalizableBlock::Contextual {
                contextually_verified,
                treestate,
            } => (
                contextually_verified.height,
                contextually_verified.hash,
                FinalizedBlock::from_contextually_verified(contextually_verified, treestate),
                prev_note_commitment_trees,
            ),
        };

        let committed_tip_hash = self.db.finalized_tip_hash();
        let committed_tip_height = self.db.finalized_tip_height();

        // Assert that callers (including unit tests) get the chain order correct
        if self.db.is_empty() {
            assert_eq!(
                committed_tip_hash, finalized.block.header.previous_block_hash,
                "the first block added to an empty state must be a genesis block, source: {source}",
            );
            assert_eq!(
                block::Height(0),
                height,
                "cannot commit genesis: invalid height, source: {source}",
            );
        } else {
            assert_eq!(
                committed_tip_height.expect("state must have a genesis block committed") + 1,
                Some(height),
                "committed block height must be 1 more than the finalized tip height, source: {source}",
            );

            assert_eq!(
                committed_tip_hash, finalized.block.header.previous_block_hash,
                "committed block must be a child of the finalized tip, source: {source}",
            );
        }

        #[cfg(feature = "elasticsearch")]
        let finalized_inner_block = finalized.block.clone();
        let note_commitment_trees = finalized.treestate.note_commitment_trees.clone();

        let result = self.db.write_block(
            finalized,
            prev_note_commitment_trees,
            &self.network(),
            source,
        );

        if result.is_ok() {
            // Save blocks to elasticsearch if the feature is enabled.
            #[cfg(feature = "elasticsearch")]
            self.elasticsearch(&finalized_inner_block);

            // TODO: move the stop height check to the syncer (#3442)
            if self.is_at_stop_height(height) {
                tracing::info!(
                    ?height,
                    ?hash,
                    block_source = ?source,
                    "stopping at configured height, flushing database to disk"
                );

                // We're just about to do a forced exit, so it's okay to do a forced db shutdown
                self.db.shutdown(true);

                // Drops any tracing log output that hasn't already been written to stdout,
                // since this exits before calling drop on the WorkerGuard for the logger thread.
                // This is okay for now because this is test-only code.
                //
                // TODO: Call ZebradApp.shutdown or drop its Tracing component before calling exit_process to flush logs to stdout
                Self::exit_process();
            }
        }

        result.map(|hash| (hash, note_commitment_trees))
    }

    #[cfg(feature = "elasticsearch")]
    /// Store finalized blocks into an elasticsearch database.
    ///
    /// We use the elasticsearch bulk API to index multiple blocks at a time while we are
    /// synchronizing the chain; when we get close to the tip, we index blocks one by one.
    pub fn elasticsearch(&mut self, block: &Arc<block::Block>) {
        if let Some(client) = self.elastic_db.clone() {
            let block_time = block.header.time.timestamp();
            let local_time = chrono::Utc::now().timestamp();

            // The bulk size is small enough to avoid the elasticsearch 100 MB content length
            // limitation. `MAX_BLOCK_BYTES` is 2 MB, but each block uses around 4.1 MB as JSON.
            // Each block counts as 2 entries, because we send an operation/header line with it,
            // so a value of 48 is 24 blocks: about 24 * 4.1 MB = 98.4 MB, just under the limit.
            const AWAY_FROM_TIP_BULK_SIZE: usize = 48;

            // The number of entries the bulk will have when we are in sync.
            // A value of 2 means only 1 block, because we want to insert blocks as soon as we
            // get them, for a real-time experience. This is the same for mainnet and testnet.
            const CLOSE_TO_TIP_BULK_SIZE: usize = 2;

            // We consider Zebra to be in sync when the difference between the local time and
            // the block time is less than this number of seconds.
            const CLOSE_TO_TIP_SECONDS: i64 = 14400; // 4 hours

            let mut blocks_size_to_dump = AWAY_FROM_TIP_BULK_SIZE;

            // If we are close to the tip, index one block per bulk call.
            if local_time - block_time < CLOSE_TO_TIP_SECONDS {
                blocks_size_to_dump = CLOSE_TO_TIP_BULK_SIZE;
            }

            // Insert the operation line.
            let height_number = block.coinbase_height().unwrap_or(block::Height(0)).0;
            self.elastic_blocks.push(
                serde_json::json!({
                    "index": {
                        "_id": height_number.to_string().as_str()
                    }
                })
                .to_string(),
            );

            // Insert the block itself.
            self.elastic_blocks
                .push(serde_json::json!(block).to_string());
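
            // Together, the two pushes above form one action/document pair in
            // the bulk (NDJSON) request body, e.g. (illustrative values only):
            //
            //   {"index":{"_id":"1000"}}
            //   {"header":{...},"transactions":[...],...}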

            // The bulk is large enough, so send everything we have to elasticsearch.
            if self.elastic_blocks.len() >= blocks_size_to_dump {
                let rt = tokio::runtime::Runtime::new()
                    .expect("runtime creation for elasticsearch should not fail");
                let blocks = self.elastic_blocks.clone();
                let network = self.network();

                rt.block_on(async move {
                    // Send a ping to the server to check if it is available before inserting.
                    if client.ping().send().await.is_err() {
                        tracing::error!("Elasticsearch is not available, skipping block indexing");
                        return;
                    }

                    let response = client
                        .bulk(elasticsearch::BulkParts::Index(
                            format!("zcash_{}", network.to_string().to_lowercase()).as_str(),
                        ))
                        .body(blocks)
                        .send()
                        .await
                        .expect("ES request should never fail");

                    // Panic if the bulk response reports any errors.
                    let response_body = response
                        .json::<serde_json::Value>()
                        .await
                        .expect("ES response parsing error. Maybe we are sending more than 100 MB of data (`http.max_content_length`)");
                    let errors = response_body["errors"].as_bool().unwrap_or(true);
                    assert!(!errors, "ES error: {response_body}");
                });

                // Clear the block storage.
                self.elastic_blocks.clear();
            }
        }
    }

    /// Returns `true` if `block_height` is greater than or equal to the
    /// configured stop height.
    fn is_at_stop_height(&self, block_height: block::Height) -> bool {
        let debug_stop_at_height = match self.debug_stop_at_height {
            Some(debug_stop_at_height) => debug_stop_at_height,
            None => return false,
        };

        block_height >= debug_stop_at_height
    }

    /// Exit the host process.
    ///
    /// Designed for debugging and tests.
    ///
    /// TODO: move the stop height check to the syncer (#3442)
    fn exit_process() -> ! {
        tracing::info!("exiting Zebra");

        // Some OSes require a flush to send all output to the terminal.
        // Zebra's logging doesn't depend on `tokio`, so we flush the stdlib sync streams.
        //
        // TODO: if this doesn't work, send an empty line as well.
        let _ = stdout().lock().flush();
        let _ = stderr().lock().flush();

        // Give the logger thread some time to flush any remaining lines to stdout,
        // and yield so that tests pass on macOS.
        std::thread::sleep(std::time::Duration::from_secs(3));

        // Exits before calling drop on the WorkerGuard for the logger thread,
        // dropping any lines that haven't already been written to stdout.
        // This is okay for now because this is test-only code.
        std::process::exit(0);
    }
}