Skip to main content

zebra_state/service/
finalized_state.rs

1//! The primary implementation of the `zebra_state::Service` built upon rocksdb.
2//!
3//! Zebra's database is implemented in 4 layers:
4//! - [`FinalizedState`]: queues, validates, and commits blocks, using...
5//! - [`ZebraDb`]: reads and writes [`zebra_chain`] types to the state database, using...
6//! - [`DiskDb`]: reads and writes generic types to any column family in the database, using...
7//! - [`disk_format`]: converts types to raw database bytes.
8//!
9//! These layers allow us to split [`zebra_chain`] types for efficient database storage.
10//! They reduce the risk of data corruption bugs, runtime inconsistencies, and panics.
11//!
12//! # Correctness
13//!
//! [`crate::constants::state_database_format_version_in_code()`] must be incremented
//! each time the database format (column families, serialization, etc.) changes.
16
17use std::{
18    io::{stderr, stdout, Write},
19    sync::Arc,
20};
21
22use zebra_chain::{
23    amount::DeferredPoolBalanceChange,
24    block,
25    parallel::tree::NoteCommitmentTrees,
26    parameters::{subsidy::block_subsidy, Network},
27};
28use zebra_db::{
29    chain::BLOCK_INFO,
30    transparent::{BALANCE_BY_TRANSPARENT_ADDR, TX_LOC_BY_SPENT_OUT_LOC},
31};
32
33use crate::{
34    constants::{state_database_format_version_in_code, STATE_DATABASE_KIND},
35    error::CommitCheckpointVerifiedError,
36    request::{FinalizableBlock, FinalizedBlock, Treestate},
37    service::{check, QueuedCheckpointVerified},
38    CheckpointVerifiedBlock, Config, ValidateContextError,
39};
40
41pub mod column_family;
42
43mod disk_db;
44mod disk_format;
45mod zebra_db;
46
47#[cfg(any(test, feature = "proptest-impl"))]
48mod arbitrary;
49
50#[cfg(test)]
51mod tests;
52
53#[allow(unused_imports)]
54pub use column_family::{TypedColumnFamily, WriteTypedBatch};
55#[allow(unused_imports)]
56pub use disk_db::{DiskDb, DiskWriteBatch, ReadDisk, WriteDisk};
57#[allow(unused_imports)]
58pub use disk_format::{
59    FromDisk, IntoDisk, OutputLocation, RawBytes, TransactionIndex, TransactionLocation,
60    MAX_ON_DISK_HEIGHT,
61};
62pub use zebra_db::ZebraDb;
63
64#[cfg(any(test, feature = "proptest-impl"))]
65pub use disk_format::KV;
66
67pub use disk_format::upgrade::restorable_db_versions;
68
/// The column families supported by the running `zebra-state` database code.
///
/// Existing column families that aren't listed here are preserved when the database is opened.
///
/// These names are converted to owned strings and passed to [`ZebraDb::new`] by
/// [`FinalizedState::new_with_debug`].
pub const STATE_COLUMN_FAMILIES_IN_CODE: &[&str] = &[
    // Blocks
    "hash_by_height",
    "height_by_hash",
    "block_header_by_height",
    // Transactions
    "tx_by_loc",
    "hash_by_tx_loc",
    "tx_loc_by_hash",
    // Transparent
    BALANCE_BY_TRANSPARENT_ADDR,
    "tx_loc_by_transparent_addr_loc",
    "utxo_by_out_loc",
    "utxo_loc_by_transparent_addr_loc",
    TX_LOC_BY_SPENT_OUT_LOC,
    // Sprout
    "sprout_nullifiers",
    "sprout_anchors",
    "sprout_note_commitment_tree",
    // Sapling
    "sapling_nullifiers",
    "sapling_anchors",
    "sapling_note_commitment_tree",
    "sapling_note_commitment_subtree",
    // Orchard
    "orchard_nullifiers",
    "orchard_anchors",
    "orchard_note_commitment_tree",
    "orchard_note_commitment_subtree",
    // Chain
    "history_tree",
    "tip_chain_value_pool",
    BLOCK_INFO,
];
106
/// The finalized part of the chain state, stored in the db.
///
/// `rocksdb` allows concurrent writes through a shared reference,
/// so clones of the finalized state represent the same database instance.
/// When the final clone is dropped, the database is closed.
///
/// This is different from `NonFinalizedState::clone()`,
/// which returns an independent copy of the chains.
#[derive(Clone, Debug)]
pub struct FinalizedState {
    // Configuration
    //
    // This configuration cannot be modified after the database is initialized,
    // because some clones would have different values.
    //
    /// The configured stop height.
    ///
    /// Commit blocks to the finalized state up to this height, then exit Zebra.
    /// `None` means no stop height is configured.
    debug_stop_at_height: Option<block::Height>,

    // Owned State
    //
    // Everything contained in this state must be shared by all clones, or read-only.
    //
    /// The underlying database.
    ///
    /// `rocksdb` allows reads and writes via a shared reference,
    /// so this database object can be freely cloned.
    /// The last instance that is dropped will close the underlying database.
    pub db: ZebraDb,

    #[cfg(feature = "elasticsearch")]
    /// The elasticsearch handle.
    ///
    /// `None` when elasticsearch indexing was not enabled when this state was created.
    pub elastic_db: Option<elasticsearch::Elasticsearch>,

    #[cfg(feature = "elasticsearch")]
    /// A collection of blocks to be sent to elasticsearch as a bulk.
    ///
    /// Holds pairs of JSON lines: an `index` operation line followed by the serialized block.
    ///
    /// NOTE(review): `Vec` clones are independent, so each clone of this state buffers
    /// blocks separately; this field is neither shared nor read-only — confirm that only
    /// one clone ever commits blocks.
    pub elastic_blocks: Vec<String>,
}
146
147impl FinalizedState {
148    /// Returns an on-disk database instance for `config`, `network`, and `elastic_db`.
149    /// If there is no existing database, creates a new database on disk.
150    pub fn new(
151        config: &Config,
152        network: &Network,
153        #[cfg(feature = "elasticsearch")] enable_elastic_db: bool,
154    ) -> Self {
155        Self::new_with_debug(
156            config,
157            network,
158            false,
159            #[cfg(feature = "elasticsearch")]
160            enable_elastic_db,
161            false,
162        )
163    }
164
165    /// Returns an on-disk database instance with the supplied production and debug settings.
166    /// If there is no existing database, creates a new database on disk.
167    ///
168    /// This method is intended for use in tests.
169    pub(crate) fn new_with_debug(
170        config: &Config,
171        network: &Network,
172        debug_skip_format_upgrades: bool,
173        #[cfg(feature = "elasticsearch")] enable_elastic_db: bool,
174        read_only: bool,
175    ) -> Self {
176        #[cfg(feature = "elasticsearch")]
177        let elastic_db = if enable_elastic_db {
178            use elasticsearch::{
179                auth::Credentials::Basic,
180                cert::CertificateValidation,
181                http::transport::{SingleNodeConnectionPool, TransportBuilder},
182                http::Url,
183                Elasticsearch,
184            };
185
186            let conn_pool = SingleNodeConnectionPool::new(
187                Url::parse(config.elasticsearch_url.as_str())
188                    .expect("configured elasticsearch url is invalid"),
189            );
190            let transport = TransportBuilder::new(conn_pool)
191                .cert_validation(CertificateValidation::None)
192                .auth(Basic(
193                    config.clone().elasticsearch_username,
194                    config.clone().elasticsearch_password,
195                ))
196                .build()
197                .expect("elasticsearch transport builder should not fail");
198
199            Some(Elasticsearch::new(transport))
200        } else {
201            None
202        };
203
204        let db = ZebraDb::new(
205            config,
206            STATE_DATABASE_KIND,
207            &state_database_format_version_in_code(),
208            network,
209            debug_skip_format_upgrades,
210            STATE_COLUMN_FAMILIES_IN_CODE
211                .iter()
212                .map(ToString::to_string),
213            read_only,
214        );
215
216        #[cfg(feature = "elasticsearch")]
217        let new_state = Self {
218            debug_stop_at_height: config.debug_stop_at_height.map(block::Height),
219            db,
220            elastic_db,
221            elastic_blocks: vec![],
222        };
223
224        #[cfg(not(feature = "elasticsearch"))]
225        let new_state = Self {
226            debug_stop_at_height: config.debug_stop_at_height.map(block::Height),
227            db,
228        };
229
230        // TODO: move debug_stop_at_height into a task in the start command (#3442)
231        if let Some(tip_height) = new_state.db.finalized_tip_height() {
232            if new_state.is_at_stop_height(tip_height) {
233                let debug_stop_at_height = new_state
234                    .debug_stop_at_height
235                    .expect("true from `is_at_stop_height` implies `debug_stop_at_height` is Some");
236                let tip_hash = new_state.db.finalized_tip_hash();
237
238                if tip_height > debug_stop_at_height {
239                    tracing::error!(
240                        ?debug_stop_at_height,
241                        ?tip_height,
242                        ?tip_hash,
243                        "previous state height is greater than the stop height",
244                    );
245                }
246
247                tracing::info!(
248                    ?debug_stop_at_height,
249                    ?tip_height,
250                    ?tip_hash,
251                    "state is already at the configured height"
252                );
253
254                // RocksDB can do a cleanup when column families are opened.
255                // So we want to drop it before we exit.
256                std::mem::drop(new_state);
257
258                // Drops tracing log output that's hasn't already been written to stdout
259                // since this exits before calling drop on the WorkerGuard for the logger thread.
260                // This is okay for now because this is test-only code
261                //
262                // TODO: Call ZebradApp.shutdown or drop its Tracing component before calling exit_process to flush logs to stdout
263                Self::exit_process();
264            }
265        }
266
267        new_state
268    }
269
270    /// Returns the configured network for this database.
271    pub fn network(&self) -> Network {
272        self.db.network()
273    }
274
275    /// Commit a checkpoint-verified block to the state.
276    ///
277    /// It's the caller's responsibility to ensure that blocks are committed in
278    /// order.
279    pub fn commit_finalized(
280        &mut self,
281        ordered_block: QueuedCheckpointVerified,
282        prev_note_commitment_trees: Option<NoteCommitmentTrees>,
283    ) -> Result<(CheckpointVerifiedBlock, NoteCommitmentTrees), CommitCheckpointVerifiedError> {
284        let (checkpoint_verified, rsp_tx) = ordered_block;
285        let result = self.commit_finalized_direct(
286            checkpoint_verified.clone().into(),
287            prev_note_commitment_trees,
288            "commit checkpoint-verified request",
289        );
290
291        if result.is_ok() {
292            metrics::counter!("state.checkpoint.finalized.block.count").increment(1);
293            metrics::gauge!("state.checkpoint.finalized.block.height")
294                .set(checkpoint_verified.height.0 as f64);
295
296            // This height gauge is updated for both fully verified and checkpoint blocks.
297            // These updates can't conflict, because the state makes sure that blocks
298            // are committed in order.
299            metrics::gauge!("zcash.chain.verified.block.height")
300                .set(checkpoint_verified.height.0 as f64);
301            metrics::counter!("zcash.chain.verified.block.total").increment(1);
302        } else {
303            metrics::counter!("state.checkpoint.error.block.count").increment(1);
304            metrics::gauge!("state.checkpoint.error.block.height")
305                .set(checkpoint_verified.height.0 as f64);
306        };
307
308        let _ = rsp_tx.send(result.clone().map(|(hash, _)| hash));
309
310        result.map(|(_hash, note_commitment_trees)| (checkpoint_verified, note_commitment_trees))
311    }
312
313    /// Immediately commit a `finalized` block to the finalized state.
314    ///
315    /// This can be called either by the non-finalized state (when finalizing
316    /// a block) or by the checkpoint verifier.
317    ///
318    /// Use `source` as the source of the block in log messages.
319    ///
320    /// # Errors
321    ///
322    /// - Propagates any errors from writing to the DB
323    /// - Propagates any errors from updating history and note commitment trees
324    /// - If `hashFinalSaplingRoot` / `hashLightClientRoot` / `hashBlockCommitments`
325    ///   does not match the expected value
326    #[allow(clippy::unwrap_in_result)]
327    pub fn commit_finalized_direct(
328        &mut self,
329        finalizable_block: FinalizableBlock,
330        prev_note_commitment_trees: Option<NoteCommitmentTrees>,
331        source: &str,
332    ) -> Result<(block::Hash, NoteCommitmentTrees), CommitCheckpointVerifiedError> {
333        let (height, hash, finalized, prev_note_commitment_trees) = match finalizable_block {
334            FinalizableBlock::Checkpoint {
335                checkpoint_verified,
336            } => {
337                // Checkpoint-verified blocks don't have an associated treestate, so we retrieve the
338                // treestate of the finalized tip from the database and update it for the block
339                // being committed, assuming the retrieved treestate is the parent block's
340                // treestate. Later on, this function proves this assumption by asserting that the
341                // finalized tip is the parent block of the block being committed.
342
343                let block = checkpoint_verified.block.clone();
344                let mut history_tree = self.db.history_tree();
345                let prev_note_commitment_trees = prev_note_commitment_trees
346                    .unwrap_or_else(|| self.db.note_commitment_trees_for_tip());
347
348                // Update the note commitment trees.
349                let mut note_commitment_trees = prev_note_commitment_trees.clone();
350                note_commitment_trees
351                    .update_trees_parallel(&block)
352                    .map_err(ValidateContextError::from)?;
353
354                // Check the block commitment if the history tree was not
355                // supplied by the non-finalized state. Note that we don't do
356                // this check for history trees supplied by the non-finalized
357                // state because the non-finalized state checks the block
358                // commitment.
359                //
360                // For Nu5-onward, the block hash commits only to
361                // non-authorizing data (see ZIP-244). This checks the
362                // authorizing data commitment, making sure the entire block
363                // contents were committed to. The test is done here (and not
364                // during semantic validation) because it needs the history tree
365                // root. While it _is_ checked during contextual validation,
366                // that is not called by the checkpoint verifier, and keeping a
367                // history tree there would be harder to implement.
368                //
369                // TODO: run this CPU-intensive cryptography in a parallel rayon
370                // thread, if it shows up in profiles
371                check::block_commitment_is_valid_for_chain_history(
372                    block.clone(),
373                    &self.network(),
374                    &history_tree,
375                )?;
376
377                // Update the history tree.
378                //
379                // TODO: run this CPU-intensive cryptography in a parallel rayon
380                // thread, if it shows up in profiles
381                let history_tree_mut = Arc::make_mut(&mut history_tree);
382                let sapling_root = note_commitment_trees.sapling.root();
383                let orchard_root = note_commitment_trees.orchard.root();
384                history_tree_mut
385                    .push(&self.network(), block.clone(), &sapling_root, &orchard_root)
386                    .map_err(Arc::new)
387                    .map_err(ValidateContextError::from)?;
388
389                let treestate = Treestate {
390                    note_commitment_trees,
391                    history_tree,
392                };
393
394                let height = checkpoint_verified.height;
395
396                (
397                    height,
398                    checkpoint_verified.hash,
399                    FinalizedBlock::from_checkpoint_verified(
400                        checkpoint_verified,
401                        treestate,
402                        calculate_deferred_pool_balance_change(height, &self.network()),
403                    ),
404                    Some(prev_note_commitment_trees),
405                )
406            }
407            FinalizableBlock::Contextual {
408                contextually_verified,
409                treestate,
410            } => {
411                let height = contextually_verified.height;
412                (
413                    height,
414                    contextually_verified.hash,
415                    FinalizedBlock::from_contextually_verified(
416                        contextually_verified,
417                        treestate,
418                        calculate_deferred_pool_balance_change(height, &self.network()),
419                    ),
420                    prev_note_commitment_trees,
421                )
422            }
423        };
424
425        let committed_tip_hash = self.db.finalized_tip_hash();
426        let committed_tip_height = self.db.finalized_tip_height();
427
428        // Assert that callers (including unit tests) get the chain order correct
429        if self.db.is_empty() {
430            assert_eq!(
431                committed_tip_hash, finalized.block.header.previous_block_hash,
432                "the first block added to an empty state must be a genesis block, source: {source}",
433            );
434            assert_eq!(
435                block::Height(0),
436                height,
437                "cannot commit genesis: invalid height, source: {source}",
438            );
439        } else {
440            assert_eq!(
441                committed_tip_height.expect("state must have a genesis block committed") + 1,
442                Some(height),
443                "committed block height must be 1 more than the finalized tip height, source: {source}",
444            );
445
446            assert_eq!(
447                committed_tip_hash, finalized.block.header.previous_block_hash,
448                "committed block must be a child of the finalized tip, source: {source}",
449            );
450        }
451
452        #[cfg(feature = "elasticsearch")]
453        let finalized_inner_block = finalized.block.clone();
454        let note_commitment_trees = finalized.treestate.note_commitment_trees.clone();
455
456        let result = self.db.write_block(
457            finalized,
458            prev_note_commitment_trees,
459            &self.network(),
460            source,
461        );
462
463        if result.is_ok() {
464            // Save blocks to elasticsearch if the feature is enabled.
465            #[cfg(feature = "elasticsearch")]
466            self.elasticsearch(&finalized_inner_block);
467
468            // TODO: move the stop height check to the syncer (#3442)
469            if self.is_at_stop_height(height) {
470                tracing::info!(
471                    ?height,
472                    ?hash,
473                    block_source = ?source,
474                    "stopping at configured height, flushing database to disk"
475                );
476
477                // We're just about to do a forced exit, so it's ok to do a forced db shutdown
478                self.db.shutdown(true);
479
480                // Drops tracing log output that's hasn't already been written to stdout
481                // since this exits before calling drop on the WorkerGuard for the logger thread.
482                // This is okay for now because this is test-only code
483                //
484                // TODO: Call ZebradApp.shutdown or drop its Tracing component before calling exit_process to flush logs to stdout
485                Self::exit_process();
486            }
487        }
488
489        result.map(|hash| (hash, note_commitment_trees))
490    }
491
492    #[cfg(feature = "elasticsearch")]
493    /// Store finalized blocks into an elasticsearch database.
494    ///
495    /// We use the elasticsearch bulk api to index multiple blocks at a time while we are
496    /// synchronizing the chain, when we get close to tip we index blocks one by one.
497    pub fn elasticsearch(&mut self, block: &Arc<block::Block>) {
498        if let Some(client) = self.elastic_db.clone() {
499            let block_time = block.header.time.timestamp();
500            let local_time = chrono::Utc::now().timestamp();
501
502            // Bulk size is small enough to avoid the elasticsearch 100mb content length limitation.
503            // MAX_BLOCK_BYTES = 2MB but each block use around 4.1 MB of JSON.
504            // Each block count as 2 as we send them with a operation/header line. A value of 48
505            // is 24 blocks.
506            const AWAY_FROM_TIP_BULK_SIZE: usize = 48;
507
508            // The number of blocks the bulk will have when we are in sync.
509            // A value of 2 means only 1 block as we want to insert them as soon as we get
510            // them for a real time experience. This is the same for mainnet and testnet.
511            const CLOSE_TO_TIP_BULK_SIZE: usize = 2;
512
513            // We consider in sync when the local time and the blockchain time difference is
514            // less than this number of seconds.
515            const CLOSE_TO_TIP_SECONDS: i64 = 14400; // 4 hours
516
517            let mut blocks_size_to_dump = AWAY_FROM_TIP_BULK_SIZE;
518
519            // If we are close to the tip, index one block per bulk call.
520            if local_time - block_time < CLOSE_TO_TIP_SECONDS {
521                blocks_size_to_dump = CLOSE_TO_TIP_BULK_SIZE;
522            }
523
524            // Insert the operation line.
525            let height_number = block.coinbase_height().unwrap_or(block::Height(0)).0;
526            self.elastic_blocks.push(
527                serde_json::json!({
528                    "index": {
529                        "_id": height_number.to_string().as_str()
530                    }
531                })
532                .to_string(),
533            );
534
535            // Insert the block itself.
536            self.elastic_blocks
537                .push(serde_json::json!(block).to_string());
538
539            // We are in bulk time, insert to ES all we have.
540            if self.elastic_blocks.len() >= blocks_size_to_dump {
541                let rt = tokio::runtime::Runtime::new()
542                    .expect("runtime creation for elasticsearch should not fail.");
543                let blocks = self.elastic_blocks.clone();
544                let network = self.network();
545
546                rt.block_on(async move {
547                    // Send a ping to the server to check if it is available before inserting.
548                    if client.ping().send().await.is_err() {
549                        tracing::error!("Elasticsearch is not available, skipping block indexing");
550                        return;
551                    }
552
553                    let response = client
554                        .bulk(elasticsearch::BulkParts::Index(
555                            format!("zcash_{}", network.to_string().to_lowercase()).as_str(),
556                        ))
557                        .body(blocks)
558                        .send()
559                        .await
560                        .expect("ES Request should never fail");
561
562                    // Make sure no errors ever.
563                    let response_body = response
564                        .json::<serde_json::Value>()
565                        .await
566                        .expect("ES response parsing error. Maybe we are sending more than 100 mb of data (`http.max_content_length`)");
567                    let errors = response_body["errors"].as_bool().unwrap_or(true);
568                    assert!(!errors, "{}", format!("ES error: {response_body}"));
569                });
570
571                // Clean the block storage.
572                self.elastic_blocks.clear();
573            }
574        }
575    }
576
577    /// Stop the process if `block_height` is greater than or equal to the
578    /// configured stop height.
579    fn is_at_stop_height(&self, block_height: block::Height) -> bool {
580        let debug_stop_at_height = match self.debug_stop_at_height {
581            Some(debug_stop_at_height) => debug_stop_at_height,
582            None => return false,
583        };
584
585        if block_height < debug_stop_at_height {
586            return false;
587        }
588
589        true
590    }
591
592    /// Exit the host process.
593    ///
594    /// Designed for debugging and tests.
595    ///
596    /// TODO: move the stop height check to the syncer (#3442)
597    fn exit_process() -> ! {
598        tracing::info!("exiting Zebra");
599
600        // Some OSes require a flush to send all output to the terminal.
601        // Zebra's logging doesn't depend on `tokio`, so we flush the stdlib sync streams.
602        //
603        // TODO: if this doesn't work, send an empty line as well.
604        let _ = stdout().lock().flush();
605        let _ = stderr().lock().flush();
606
607        // Give some time to logger thread to flush out any remaining lines to stdout
608        // and yield so that tests pass on MacOS
609        std::thread::sleep(std::time::Duration::from_secs(3));
610
611        // Exits before calling drop on the WorkerGuard for the logger thread,
612        // dropping any lines that haven't already been written to stdout.
613        // This is okay for now because this is test-only code
614        std::process::exit(0);
615    }
616}
617
618/// Calculates the deferred pool balance change for a given height and network.
619///
620/// Returns a deferred pool balance change of zero if it cannot be calculated.
621pub(crate) fn calculate_deferred_pool_balance_change(
622    height: block::Height,
623    network: &Network,
624) -> DeferredPoolBalanceChange {
625    if height > network.slow_start_interval() {
626        zebra_chain::parameters::subsidy::funding_stream_values(
627            height,
628            network,
629            block_subsidy(height, network).unwrap_or_default(),
630        )
631        .unwrap_or_default()
632        .remove(&zebra_chain::parameters::subsidy::FundingStreamReceiver::Deferred)
633        .unwrap_or_default()
634        .checked_sub(network.lockbox_disbursement_total_amount(height))
635        .map(DeferredPoolBalanceChange::new)
636        .unwrap_or_default()
637    } else {
638        DeferredPoolBalanceChange::zero()
639    }
640}