zebra_state/
service.rs

1//! [`tower::Service`]s for Zebra's cached chain state.
2//!
3//! Zebra provides cached state access via two main services:
4//! - [`StateService`]: a read-write service that writes blocks to the state,
5//!   and redirects most read requests to the [`ReadStateService`].
6//! - [`ReadStateService`]: a read-only service that answers from the most
7//!   recent committed block.
8//!
9//! Most users should prefer [`ReadStateService`], unless they need to write blocks to the state.
10//!
11//! Zebra also provides access to the best chain tip via:
12//! - [`LatestChainTip`]: a read-only channel that contains the latest committed
13//!   tip.
14//! - [`ChainTipChange`]: a read-only channel that can asynchronously await
15//!   chain tip changes.
16
17use std::{
18    collections::HashMap,
19    convert,
20    future::Future,
21    pin::Pin,
22    sync::Arc,
23    task::{Context, Poll},
24    time::{Duration, Instant},
25};
26
27use futures::future::FutureExt;
28use tokio::sync::oneshot;
29use tower::{util::BoxService, Service, ServiceExt};
30use tracing::{instrument, Instrument, Span};
31
32#[cfg(any(test, feature = "proptest-impl"))]
33use tower::buffer::Buffer;
34
35use zebra_chain::{
36    block::{self, CountedHeader, HeightDiff},
37    diagnostic::{task::WaitForPanics, CodeTimer},
38    parameters::{Network, NetworkUpgrade},
39    subtree::NoteCommitmentSubtreeIndex,
40};
41
42use zebra_chain::{block::Height, serialization::ZcashSerialize};
43
44use crate::{
45    constants::{
46        MAX_FIND_BLOCK_HASHES_RESULTS, MAX_FIND_BLOCK_HEADERS_RESULTS, MAX_LEGACY_CHAIN_BLOCKS,
47    },
48    error::{InvalidateError, QueueAndCommitError, ReconsiderError},
49    response::NonFinalizedBlocksListener,
50    service::{
51        block_iter::any_ancestor_blocks,
52        chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
53        finalized_state::{FinalizedState, ZebraDb},
54        non_finalized_state::{Chain, NonFinalizedState},
55        pending_utxos::PendingUtxos,
56        queued_blocks::QueuedBlocks,
57        watch_receiver::WatchReceiver,
58    },
59    BoxError, CheckpointVerifiedBlock, CommitSemanticallyVerifiedError, Config, ReadRequest,
60    ReadResponse, Request, Response, SemanticallyVerifiedBlock,
61};
62
63pub mod block_iter;
64pub mod chain_tip;
65pub mod watch_receiver;
66
67pub mod check;
68
69pub(crate) mod finalized_state;
70pub(crate) mod non_finalized_state;
71mod pending_utxos;
72mod queued_blocks;
73pub(crate) mod read;
74mod traits;
75mod write;
76
77#[cfg(any(test, feature = "proptest-impl"))]
78pub mod arbitrary;
79
80#[cfg(test)]
81mod tests;
82
83pub use finalized_state::{OutputLocation, TransactionIndex, TransactionLocation};
84use write::NonFinalizedWriteMessage;
85
86use self::queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified, SentHashes};
87
88pub use self::traits::{ReadState, State};
89
90/// A read-write service for Zebra's cached blockchain state.
91///
92/// This service modifies and provides access to:
93/// - the non-finalized state: the ~100 most recent blocks.
94///   Zebra allows chain forks in the non-finalized state,
95///   stores it in memory, and re-downloads it when restarted.
96/// - the finalized state: older blocks that have many confirmations.
97///   Zebra stores the single best chain in the finalized state,
98///   and re-loads it from disk when restarted.
99///
100/// Read requests to this service are buffered, then processed concurrently.
101/// Block write requests are buffered, then queued, then processed in order by a separate task.
102///
103/// Most state users can get faster read responses using the [`ReadStateService`],
104/// because its requests do not share a [`tower::buffer::Buffer`] with block write requests.
105///
106/// To quickly get the latest block, use [`LatestChainTip`] or [`ChainTipChange`].
107/// They can read the latest block directly, without queueing any requests.
108#[derive(Debug)]
109pub(crate) struct StateService {
110    // Configuration
111    //
112    /// The configured Zcash network.
113    network: Network,
114
115    /// The height that we start storing UTXOs from finalized blocks.
116    ///
117    /// This height should be lower than the last few checkpoints,
118    /// so the full verifier can verify UTXO spends from those blocks,
119    /// even if they haven't been committed to the finalized state yet.
120    full_verifier_utxo_lookahead: block::Height,
121
122    // Queued Blocks
123    //
124    /// Queued blocks for the [`NonFinalizedState`] that arrived out of order.
125    /// These blocks are awaiting their parent blocks before they can do contextual verification.
126    non_finalized_state_queued_blocks: QueuedBlocks,
127
128    /// Queued blocks for the [`FinalizedState`] that arrived out of order.
129    /// These blocks are awaiting their parent blocks before they can do contextual verification.
130    ///
131    /// Indexed by their parent block hash.
132    finalized_state_queued_blocks: HashMap<block::Hash, QueuedCheckpointVerified>,
133
134    /// Channels to send blocks to the block write task.
135    block_write_sender: write::BlockWriteSender,
136
137    /// The [`block::Hash`] of the most recent block sent on
138    /// `finalized_block_write_sender` or `non_finalized_block_write_sender`.
139    ///
140    /// On startup, this is:
141    /// - the finalized tip, if there are stored blocks, or
142    /// - the genesis block's parent hash, if the database is empty.
143    ///
144    /// If `invalid_block_write_reset_receiver` gets a reset, this is:
145    /// - the hash of the last valid committed block (the parent of the invalid block).
146    finalized_block_write_last_sent_hash: block::Hash,
147
148    /// A set of block hashes that have been sent to the block write task.
149    /// Hashes of blocks below the finalized tip height are periodically pruned.
150    non_finalized_block_write_sent_hashes: SentHashes,
151
152    /// If an invalid block is sent on `finalized_block_write_sender`
153    /// or `non_finalized_block_write_sender`,
154    /// this channel gets the [`block::Hash`] of the valid tip.
155    //
156    // TODO: add tests for finalized and non-finalized resets (#2654)
157    invalid_block_write_reset_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,
158
159    // Pending UTXO Request Tracking
160    //
161    /// The set of outpoints with pending requests for their associated transparent::Output.
162    pending_utxos: PendingUtxos,
163
164    /// Instant tracking the last time `pending_utxos` was pruned.
165    last_prune: Instant,
166
167    // Updating Concurrently Readable State
168    //
169    /// A cloneable [`ReadStateService`], used to answer concurrent read requests.
170    ///
171    /// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`.
172    read_service: ReadStateService,
173
174    // Metrics
175    //
176    /// A metric tracking the maximum height that's currently in `finalized_state_queued_blocks`
177    ///
178    /// Set to `f64::NAN` if `finalized_state_queued_blocks` is empty, because grafana shows NaNs
179    /// as a break in the graph.
180    max_finalized_queue_height: f64,
181}
182
183/// A read-only service for accessing Zebra's cached blockchain state.
184///
185/// This service provides read-only access to:
186/// - the non-finalized state: the ~100 most recent blocks.
187/// - the finalized state: older blocks that have many confirmations.
188///
189/// Requests to this service are processed in parallel,
190/// ignoring any blocks queued by the read-write [`StateService`].
191///
192/// This quick response behavior is better for most state users.
193/// It allows other async tasks to make progress while concurrently reading data from disk.
194#[derive(Clone, Debug)]
195pub struct ReadStateService {
196    // Configuration
197    //
198    /// The configured Zcash network.
199    network: Network,
200
201    // Shared Concurrently Readable State
202    //
203    /// A watch channel with a cached copy of the [`NonFinalizedState`].
204    ///
205    /// This state is only updated between requests,
206    /// so it might include some block data that is also on `disk`.
207    non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
208
209    /// The shared inner on-disk database for the finalized state.
210    ///
211    /// RocksDB allows reads and writes via a shared reference,
212    /// but [`ZebraDb`] doesn't expose any write methods or types.
213    ///
214    /// This chain is updated concurrently with requests,
215    /// so it might include some block data that is also in `best_mem`.
216    db: ZebraDb,
217
218    /// A shared handle to a task that writes blocks to the [`NonFinalizedState`] or [`FinalizedState`],
219    /// once the queues have received all their parent blocks.
220    ///
221    /// Used to check for panics when writing blocks.
222    block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
223}
224
225impl Drop for StateService {
226    fn drop(&mut self) {
227        // The state service owns the state, tasks, and channels,
228        // so dropping it should shut down everything.
229
230        // Close the channels (non-blocking)
231        // This makes the block write thread exit the next time it checks the channels.
232        // We want to do this here so we get any errors or panics from the block write task before it shuts down.
233        self.invalid_block_write_reset_receiver.close();
234
235        std::mem::drop(self.block_write_sender.finalized.take());
236        std::mem::drop(self.block_write_sender.non_finalized.take());
237
238        self.clear_finalized_block_queue(
239            "dropping the state: dropped unused finalized state queue block",
240        );
241        self.clear_non_finalized_block_queue(CommitSemanticallyVerifiedError::WriteTaskExited);
242
243        // Log database metrics before shutting down
244        info!("dropping the state: logging database metrics");
245        self.log_db_metrics();
246
247        // Then drop self.read_service, which checks the block write task for panics,
248        // and tries to shut down the database.
249    }
250}
251
252impl Drop for ReadStateService {
253    fn drop(&mut self) {
254        // The read state service shares the state,
255        // so dropping it should check if we can shut down.
256
257        // TODO: move this into a try_shutdown() method
258        if let Some(block_write_task) = self.block_write_task.take() {
259            if let Some(block_write_task_handle) = Arc::into_inner(block_write_task) {
260                // We're the last database user, so we can tell it to shut down (blocking):
261                // - flushes the database to disk, and
262                // - drops the database, which cleans up any database tasks correctly.
263                self.db.shutdown(true);
264
265                // We are the last state with a reference to this thread, so we can
266                // wait until the block write task finishes, then check for panics (blocking).
267                // (We'd also like to abort the thread, but std::thread::JoinHandle can't do that.)
268
269                // This log is verbose during tests.
270                #[cfg(not(test))]
271                info!("waiting for the block write task to finish");
272                #[cfg(test)]
273                debug!("waiting for the block write task to finish");
274
275                // TODO: move this into a check_for_panics() method
276                if let Err(thread_panic) = block_write_task_handle.join() {
277                    std::panic::resume_unwind(thread_panic);
278                } else {
279                    debug!("shutting down the state because the block write task has finished");
280                }
281            }
282        } else {
283            // Even if we're not the last database user, try shutting it down.
284            //
285            // TODO: rename this to try_shutdown()?
286            self.db.shutdown(false);
287        }
288    }
289}
290
291impl StateService {
292    const PRUNE_INTERVAL: Duration = Duration::from_secs(30);
293
294    /// Creates a new state service for the state `config` and `network`.
295    ///
296    /// Uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
297    /// to work out when it is near the final checkpoint.
298    ///
299    /// Returns the read-write and read-only state services,
300    /// and read-only watch channels for its best chain tip.
301    pub async fn new(
302        config: Config,
303        network: &Network,
304        max_checkpoint_height: block::Height,
305        checkpoint_verify_concurrency_limit: usize,
306    ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
307        let (finalized_state, finalized_tip, timer) = {
308            let config = config.clone();
309            let network = network.clone();
310            tokio::task::spawn_blocking(move || {
311                let timer = CodeTimer::start();
312                let finalized_state = FinalizedState::new(
313                    &config,
314                    &network,
315                    #[cfg(feature = "elasticsearch")]
316                    true,
317                );
318                timer.finish(module_path!(), line!(), "opening finalized state database");
319
320                let timer = CodeTimer::start();
321                let finalized_tip = finalized_state.db.tip_block();
322
323                (finalized_state, finalized_tip, timer)
324            })
325            .await
326            .expect("failed to join blocking task")
327        };
328
329        // # Correctness
330        //
331        // The state service must set the finalized block write sender to `None`
332        // if there are blocks in the restored non-finalized state that are above
333        // the max checkpoint height so that non-finalized blocks can be written, otherwise,
334        // Zebra will be unable to commit semantically verified blocks, and its chain sync will stall.
335        //
336        // The state service must not set the finalized block write sender to `None` if there
337        // aren't blocks in the restored non-finalized state that are above the max checkpoint height,
338        // otherwise, unless checkpoint sync is disabled in the zebra-consensus configuration,
339        // Zebra will be unable to commit checkpoint verified blocks, and its chain sync will stall.
340        let is_finalized_tip_past_max_checkpoint = if let Some(tip) = &finalized_tip {
341            tip.coinbase_height().expect("valid block must have height") >= max_checkpoint_height
342        } else {
343            false
344        };
345        let (non_finalized_state, non_finalized_state_sender, non_finalized_state_receiver) =
346            NonFinalizedState::new(network)
347                .with_backup(
348                    config.non_finalized_state_backup_dir(network),
349                    &finalized_state.db,
350                    is_finalized_tip_past_max_checkpoint,
351                )
352                .await;
353
354        let non_finalized_block_write_sent_hashes = SentHashes::new(&non_finalized_state);
355        let initial_tip = non_finalized_state
356            .best_tip_block()
357            .map(|cv_block| cv_block.block.clone())
358            .or(finalized_tip)
359            .map(CheckpointVerifiedBlock::from)
360            .map(ChainTipBlock::from);
361
362        tracing::info!(chain_tip = ?initial_tip.as_ref().map(|tip| (tip.hash, tip.height)), "loaded Zebra state cache");
363
364        let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
365            ChainTipSender::new(initial_tip, network);
366
367        let finalized_state_for_writing = finalized_state.clone();
368        let should_use_finalized_block_write_sender = non_finalized_state.is_chain_set_empty();
369        let (block_write_sender, invalid_block_write_reset_receiver, block_write_task) =
370            write::BlockWriteSender::spawn(
371                finalized_state_for_writing,
372                non_finalized_state,
373                chain_tip_sender,
374                non_finalized_state_sender,
375                should_use_finalized_block_write_sender,
376            );
377
378        let read_service = ReadStateService::new(
379            &finalized_state,
380            block_write_task,
381            non_finalized_state_receiver,
382        );
383
384        let full_verifier_utxo_lookahead = max_checkpoint_height
385            - HeightDiff::try_from(checkpoint_verify_concurrency_limit)
386                .expect("fits in HeightDiff");
387        let full_verifier_utxo_lookahead =
388            full_verifier_utxo_lookahead.unwrap_or(block::Height::MIN);
389        let non_finalized_state_queued_blocks = QueuedBlocks::default();
390        let pending_utxos = PendingUtxos::default();
391
392        let finalized_block_write_last_sent_hash =
393            tokio::task::spawn_blocking(move || finalized_state.db.finalized_tip_hash())
394                .await
395                .expect("failed to join blocking task");
396
397        let state = Self {
398            network: network.clone(),
399            full_verifier_utxo_lookahead,
400            non_finalized_state_queued_blocks,
401            finalized_state_queued_blocks: HashMap::new(),
402            block_write_sender,
403            finalized_block_write_last_sent_hash,
404            non_finalized_block_write_sent_hashes,
405            invalid_block_write_reset_receiver,
406            pending_utxos,
407            last_prune: Instant::now(),
408            read_service: read_service.clone(),
409            max_finalized_queue_height: f64::NAN,
410        };
411        timer.finish(module_path!(), line!(), "initializing state service");
412
413        tracing::info!("starting legacy chain check");
414        let timer = CodeTimer::start();
415
416        if let (Some(tip), Some(nu5_activation_height)) = (
417            {
418                let read_state = state.read_service.clone();
419                tokio::task::spawn_blocking(move || read_state.best_tip())
420                    .await
421                    .expect("task should not panic")
422            },
423            NetworkUpgrade::Nu5.activation_height(network),
424        ) {
425            if let Err(error) = check::legacy_chain(
426                nu5_activation_height,
427                any_ancestor_blocks(
428                    &state.read_service.latest_non_finalized_state(),
429                    &state.read_service.db,
430                    tip.1,
431                ),
432                &state.network,
433                MAX_LEGACY_CHAIN_BLOCKS,
434            ) {
435                let legacy_db_path = state.read_service.db.path().to_path_buf();
436                panic!(
437                    "Cached state contains a legacy chain.\n\
438                     An outdated Zebra version did not know about a recent network upgrade,\n\
439                     so it followed a legacy chain using outdated consensus branch rules.\n\
440                     Hint: Delete your database, and restart Zebra to do a full sync.\n\
441                     Database path: {legacy_db_path:?}\n\
442                     Error: {error:?}",
443                );
444            }
445        }
446
447        tracing::info!("cached state consensus branch is valid: no legacy chain found");
448        timer.finish(module_path!(), line!(), "legacy chain check");
449
450        (state, read_service, latest_chain_tip, chain_tip_change)
451    }
452
453    /// Call read only state service to log rocksdb database metrics.
454    pub fn log_db_metrics(&self) {
455        self.read_service.db.print_db_metrics();
456    }
457
458    /// Queue a checkpoint verified block for verification and storage in the finalized state.
459    ///
460    /// Returns a channel receiver that provides the result of the block commit.
461    fn queue_and_commit_to_finalized_state(
462        &mut self,
463        checkpoint_verified: CheckpointVerifiedBlock,
464    ) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
465        // # Correctness & Performance
466        //
467        // This method must not block, access the database, or perform CPU-intensive tasks,
468        // because it is called directly from the tokio executor's Future threads.
469
470        let queued_prev_hash = checkpoint_verified.block.header.previous_block_hash;
471        let queued_height = checkpoint_verified.height;
472
473        // If we're close to the final checkpoint, make the block's UTXOs available for
474        // semantic block verification, even when it is in the channel.
475        if self.is_close_to_final_checkpoint(queued_height) {
476            self.non_finalized_block_write_sent_hashes
477                .add_finalized(&checkpoint_verified)
478        }
479
480        let (rsp_tx, rsp_rx) = oneshot::channel();
481        let queued = (checkpoint_verified, rsp_tx);
482
483        if self.block_write_sender.finalized.is_some() {
484            // We're still committing checkpoint verified blocks
485            if let Some(duplicate_queued) = self
486                .finalized_state_queued_blocks
487                .insert(queued_prev_hash, queued)
488            {
489                Self::send_checkpoint_verified_block_error(
490                    duplicate_queued,
491                    "dropping older checkpoint verified block: got newer duplicate block",
492                );
493            }
494
495            self.drain_finalized_queue_and_commit();
496        } else {
497            // We've finished committing checkpoint verified blocks to the finalized state,
498            // so drop any repeated queued blocks, and return an error.
499            //
500            // TODO: track the latest sent height, and drop any blocks under that height
501            //       every time we send some blocks (like QueuedSemanticallyVerifiedBlocks)
502            Self::send_checkpoint_verified_block_error(
503                queued,
504                "already finished committing checkpoint verified blocks: dropped duplicate block, \
505                 block is already committed to the state",
506            );
507
508            self.clear_finalized_block_queue(
509                "already finished committing checkpoint verified blocks: dropped duplicate block, \
510                 block is already committed to the state",
511            );
512        }
513
514        if self.finalized_state_queued_blocks.is_empty() {
515            self.max_finalized_queue_height = f64::NAN;
516        } else if self.max_finalized_queue_height.is_nan()
517            || self.max_finalized_queue_height < queued_height.0 as f64
518        {
519            // if there are still blocks in the queue, then either:
520            //   - the new block was lower than the old maximum, and there was a gap before it,
521            //     so the maximum is still the same (and we skip this code), or
522            //   - the new block is higher than the old maximum, and there is at least one gap
523            //     between the finalized tip and the new maximum
524            self.max_finalized_queue_height = queued_height.0 as f64;
525        }
526
527        metrics::gauge!("state.checkpoint.queued.max.height").set(self.max_finalized_queue_height);
528        metrics::gauge!("state.checkpoint.queued.block.count")
529            .set(self.finalized_state_queued_blocks.len() as f64);
530
531        rsp_rx
532    }
533
534    /// Finds finalized state queue blocks to be committed to the state in order,
535    /// removes them from the queue, and sends them to the block commit task.
536    ///
537    /// After queueing a finalized block, this method checks whether the newly
538    /// queued block (and any of its descendants) can be committed to the state.
539    ///
540    /// Returns an error if the block commit channel has been closed.
541    pub fn drain_finalized_queue_and_commit(&mut self) {
542        use tokio::sync::mpsc::error::{SendError, TryRecvError};
543
544        // # Correctness & Performance
545        //
546        // This method must not block, access the database, or perform CPU-intensive tasks,
547        // because it is called directly from the tokio executor's Future threads.
548
549        // If a block failed, we need to start again from a valid tip.
550        match self.invalid_block_write_reset_receiver.try_recv() {
551            Ok(reset_tip_hash) => self.finalized_block_write_last_sent_hash = reset_tip_hash,
552            Err(TryRecvError::Disconnected) => {
553                info!("Block commit task closed the block reset channel. Is Zebra shutting down?");
554                return;
555            }
556            // There are no errors, so we can just use the last block hash we sent
557            Err(TryRecvError::Empty) => {}
558        }
559
560        while let Some(queued_block) = self
561            .finalized_state_queued_blocks
562            .remove(&self.finalized_block_write_last_sent_hash)
563        {
564            let last_sent_finalized_block_height = queued_block.0.height;
565
566            self.finalized_block_write_last_sent_hash = queued_block.0.hash;
567
568            // If we've finished sending finalized blocks, ignore any repeated blocks.
569            // (Blocks can be repeated after a syncer reset.)
570            if let Some(finalized_block_write_sender) = &self.block_write_sender.finalized {
571                let send_result = finalized_block_write_sender.send(queued_block);
572
573                // If the receiver is closed, we can't send any more blocks.
574                if let Err(SendError(queued)) = send_result {
575                    // If Zebra is shutting down, drop blocks and return an error.
576                    Self::send_checkpoint_verified_block_error(
577                        queued,
578                        "block commit task exited. Is Zebra shutting down?",
579                    );
580
581                    self.clear_finalized_block_queue(
582                        "block commit task exited. Is Zebra shutting down?",
583                    );
584                } else {
585                    metrics::gauge!("state.checkpoint.sent.block.height")
586                        .set(last_sent_finalized_block_height.0 as f64);
587                };
588            }
589        }
590    }
591
592    /// Drops all finalized state queue blocks, and sends an error on their result channels.
593    fn clear_finalized_block_queue(&mut self, error: impl Into<BoxError> + Clone) {
594        for (_hash, queued) in self.finalized_state_queued_blocks.drain() {
595            Self::send_checkpoint_verified_block_error(queued, error.clone());
596        }
597    }
598
599    /// Send an error on a `QueuedCheckpointVerified` block's result channel, and drop the block
600    fn send_checkpoint_verified_block_error(
601        queued: QueuedCheckpointVerified,
602        error: impl Into<BoxError>,
603    ) {
604        let (finalized, rsp_tx) = queued;
605
606        // The block sender might have already given up on this block,
607        // so ignore any channel send errors.
608        let _ = rsp_tx.send(Err(error.into()));
609        std::mem::drop(finalized);
610    }
611
612    /// Drops all non-finalized state queue blocks, and sends an error on their result channels.
613    fn clear_non_finalized_block_queue(&mut self, error: CommitSemanticallyVerifiedError) {
614        for (_hash, queued) in self.non_finalized_state_queued_blocks.drain() {
615            Self::send_semantically_verified_block_error(queued, error.clone());
616        }
617    }
618
619    /// Send an error on a `QueuedSemanticallyVerified` block's result channel, and drop the block
620    fn send_semantically_verified_block_error(
621        queued: QueuedSemanticallyVerified,
622        error: CommitSemanticallyVerifiedError,
623    ) {
624        let (finalized, rsp_tx) = queued;
625
626        // The block sender might have already given up on this block,
627        // so ignore any channel send errors.
628        let _ = rsp_tx.send(Err(error));
629        std::mem::drop(finalized);
630    }
631
632    /// Queue a semantically verified block for contextual verification and check if any queued
633    /// blocks are ready to be verified and committed to the state.
634    ///
635    /// This function encodes the logic for [committing non-finalized blocks][1]
636    /// in RFC0005.
637    ///
638    /// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks
639    #[instrument(level = "debug", skip(self, semantically_verified))]
640    fn queue_and_commit_to_non_finalized_state(
641        &mut self,
642        semantically_verified: SemanticallyVerifiedBlock,
643    ) -> oneshot::Receiver<Result<block::Hash, CommitSemanticallyVerifiedError>> {
644        tracing::debug!(block = %semantically_verified.block, "queueing block for contextual verification");
645        let parent_hash = semantically_verified.block.header.previous_block_hash;
646
647        if self
648            .non_finalized_block_write_sent_hashes
649            .contains(&semantically_verified.hash)
650        {
651            let (rsp_tx, rsp_rx) = oneshot::channel();
652            let _ = rsp_tx.send(Err(QueueAndCommitError::new_duplicate(
653                semantically_verified.hash,
654            )
655            .into()));
656            return rsp_rx;
657        }
658
659        if self
660            .read_service
661            .db
662            .contains_height(semantically_verified.height)
663        {
664            let (rsp_tx, rsp_rx) = oneshot::channel();
665            let _ = rsp_tx.send(Err(QueueAndCommitError::new_already_finalized(
666                semantically_verified.height,
667            )
668            .into()));
669            return rsp_rx;
670        }
671
672        // [`Request::CommitSemanticallyVerifiedBlock`] contract: a request to commit a block which
673        // has been queued but not yet committed to the state fails the older request and replaces
674        // it with the newer request.
675        let rsp_rx = if let Some((_, old_rsp_tx)) = self
676            .non_finalized_state_queued_blocks
677            .get_mut(&semantically_verified.hash)
678        {
679            tracing::debug!("replacing older queued request with new request");
680            let (mut rsp_tx, rsp_rx) = oneshot::channel();
681            std::mem::swap(old_rsp_tx, &mut rsp_tx);
682            let _ = rsp_tx.send(Err(QueueAndCommitError::new_replaced(
683                semantically_verified.hash,
684            )
685            .into()));
686            rsp_rx
687        } else {
688            let (rsp_tx, rsp_rx) = oneshot::channel();
689            self.non_finalized_state_queued_blocks
690                .queue((semantically_verified, rsp_tx));
691            rsp_rx
692        };
693
694        // We've finished sending checkpoint verified blocks when:
695        // - we've sent the verified block for the last checkpoint, and
696        // - it has been successfully written to disk.
697        //
698        // We detect the last checkpoint by looking for non-finalized blocks
699        // that are a child of the last block we sent.
700        //
701        // TODO: configure the state with the last checkpoint hash instead?
702        if self.block_write_sender.finalized.is_some()
703            && self
704                .non_finalized_state_queued_blocks
705                .has_queued_children(self.finalized_block_write_last_sent_hash)
706            && self.read_service.db.finalized_tip_hash()
707                == self.finalized_block_write_last_sent_hash
708        {
709            // Tell the block write task to stop committing checkpoint verified blocks to the finalized state,
710            // and move on to committing semantically verified blocks to the non-finalized state.
711            std::mem::drop(self.block_write_sender.finalized.take());
712            // Remove any checkpoint-verified block hashes from `non_finalized_block_write_sent_hashes`.
713            self.non_finalized_block_write_sent_hashes = SentHashes::default();
714            // Mark `SentHashes` as usable by the `can_fork_chain_at()` method.
715            self.non_finalized_block_write_sent_hashes
716                .can_fork_chain_at_hashes = true;
717            // Send blocks from non-finalized queue
718            self.send_ready_non_finalized_queued(self.finalized_block_write_last_sent_hash);
719            // We've finished committing checkpoint verified blocks to finalized state, so drop any repeated queued blocks.
720            self.clear_finalized_block_queue(
721                "already finished committing checkpoint verified blocks: dropped duplicate block, \
722                 block is already committed to the state",
723            );
724        } else if !self.can_fork_chain_at(&parent_hash) {
725            tracing::trace!("unready to verify, returning early");
726        } else if self.block_write_sender.finalized.is_none() {
727            // Wait until block commit task is ready to write non-finalized blocks before dequeuing them
728            self.send_ready_non_finalized_queued(parent_hash);
729
730            let finalized_tip_height = self.read_service.db.finalized_tip_height().expect(
731                "Finalized state must have at least one block before committing non-finalized state",
732            );
733
734            self.non_finalized_state_queued_blocks
735                .prune_by_height(finalized_tip_height);
736
737            self.non_finalized_block_write_sent_hashes
738                .prune_by_height(finalized_tip_height);
739        }
740
741        rsp_rx
742    }
743
744    /// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks.
745    fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
746        self.non_finalized_block_write_sent_hashes
747            .can_fork_chain_at(hash)
748            || &self.read_service.db.finalized_tip_hash() == hash
749    }
750
751    /// Returns `true` if `queued_height` is near the final checkpoint.
752    ///
753    /// The semantic block verifier needs access to UTXOs from checkpoint verified blocks
754    /// near the final checkpoint, so that it can verify blocks that spend those UTXOs.
755    ///
756    /// If it doesn't have the required UTXOs, some blocks will time out,
757    /// but succeed after a syncer restart.
758    fn is_close_to_final_checkpoint(&self, queued_height: block::Height) -> bool {
759        queued_height >= self.full_verifier_utxo_lookahead
760    }
761
762    /// Sends all queued blocks whose parents have recently arrived starting from `new_parent`
763    /// in breadth-first ordering to the block write task which will attempt to validate and commit them
764    #[tracing::instrument(level = "debug", skip(self, new_parent))]
765    fn send_ready_non_finalized_queued(&mut self, new_parent: block::Hash) {
766        use tokio::sync::mpsc::error::SendError;
767        if let Some(non_finalized_block_write_sender) = &self.block_write_sender.non_finalized {
768            let mut new_parents: Vec<block::Hash> = vec![new_parent];
769
770            while let Some(parent_hash) = new_parents.pop() {
771                let queued_children = self
772                    .non_finalized_state_queued_blocks
773                    .dequeue_children(parent_hash);
774
775                for queued_child in queued_children {
776                    let (SemanticallyVerifiedBlock { hash, .. }, _) = queued_child;
777
778                    self.non_finalized_block_write_sent_hashes
779                        .add(&queued_child.0);
780                    let send_result = non_finalized_block_write_sender.send(queued_child.into());
781
782                    if let Err(SendError(NonFinalizedWriteMessage::Commit(queued))) = send_result {
783                        // If Zebra is shutting down, drop blocks and return an error.
784                        Self::send_semantically_verified_block_error(
785                            queued,
786                            CommitSemanticallyVerifiedError::WriteTaskExited,
787                        );
788
789                        self.clear_non_finalized_block_queue(
790                            CommitSemanticallyVerifiedError::WriteTaskExited,
791                        );
792
793                        return;
794                    };
795
796                    new_parents.push(hash);
797                }
798            }
799
800            self.non_finalized_block_write_sent_hashes.finish_batch();
801        };
802    }
803
804    /// Return the tip of the current best chain.
805    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
806        self.read_service.best_tip()
807    }
808
809    fn send_invalidate_block(
810        &self,
811        hash: block::Hash,
812    ) -> oneshot::Receiver<Result<block::Hash, InvalidateError>> {
813        let (rsp_tx, rsp_rx) = oneshot::channel();
814
815        let Some(sender) = &self.block_write_sender.non_finalized else {
816            let _ = rsp_tx.send(Err(InvalidateError::ProcessingCheckpointedBlocks));
817            return rsp_rx;
818        };
819
820        if let Err(tokio::sync::mpsc::error::SendError(error)) =
821            sender.send(NonFinalizedWriteMessage::Invalidate { hash, rsp_tx })
822        {
823            let NonFinalizedWriteMessage::Invalidate { rsp_tx, .. } = error else {
824                unreachable!("should return the same Invalidate message could not be sent");
825            };
826
827            let _ = rsp_tx.send(Err(InvalidateError::SendInvalidateRequestFailed));
828        }
829
830        rsp_rx
831    }
832
833    fn send_reconsider_block(
834        &self,
835        hash: block::Hash,
836    ) -> oneshot::Receiver<Result<Vec<block::Hash>, ReconsiderError>> {
837        let (rsp_tx, rsp_rx) = oneshot::channel();
838
839        let Some(sender) = &self.block_write_sender.non_finalized else {
840            let _ = rsp_tx.send(Err(ReconsiderError::CheckpointCommitInProgress));
841            return rsp_rx;
842        };
843
844        if let Err(tokio::sync::mpsc::error::SendError(error)) =
845            sender.send(NonFinalizedWriteMessage::Reconsider { hash, rsp_tx })
846        {
847            let NonFinalizedWriteMessage::Reconsider { rsp_tx, .. } = error else {
848                unreachable!("should return the same Reconsider message could not be sent");
849            };
850
851            let _ = rsp_tx.send(Err(ReconsiderError::ReconsiderSendFailed));
852        }
853
854        rsp_rx
855    }
856
857    /// Assert some assumptions about the semantically verified `block` before it is queued.
858    fn assert_block_can_be_validated(&self, block: &SemanticallyVerifiedBlock) {
859        // required by `Request::CommitSemanticallyVerifiedBlock` call
860        assert!(
861            block.height > self.network.mandatory_checkpoint_height(),
862            "invalid semantically verified block height: the canopy checkpoint is mandatory, pre-canopy \
863            blocks, and the canopy activation block, must be committed to the state as finalized \
864            blocks"
865        );
866    }
867}
868
869impl ReadStateService {
870    /// Creates a new read-only state service, using the provided finalized state and
871    /// block write task handle.
872    ///
873    /// Returns the newly created service,
874    /// and a watch channel for updating the shared recent non-finalized chain.
875    pub(crate) fn new(
876        finalized_state: &FinalizedState,
877        block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
878        non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
879    ) -> Self {
880        let read_service = Self {
881            network: finalized_state.network(),
882            db: finalized_state.db.clone(),
883            non_finalized_state_receiver,
884            block_write_task,
885        };
886
887        tracing::debug!("created new read-only state service");
888
889        read_service
890    }
891
892    /// Return the tip of the current best chain.
893    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
894        read::best_tip(&self.latest_non_finalized_state(), &self.db)
895    }
896
897    /// Gets a clone of the latest non-finalized state from the `non_finalized_state_receiver`
898    fn latest_non_finalized_state(&self) -> NonFinalizedState {
899        self.non_finalized_state_receiver.cloned_watch_data()
900    }
901
902    /// Gets a clone of the latest, best non-finalized chain from the `non_finalized_state_receiver`
903    #[allow(dead_code)]
904    fn latest_best_chain(&self) -> Option<Arc<Chain>> {
905        self.latest_non_finalized_state().best_chain().cloned()
906    }
907
908    /// Test-only access to the inner database.
909    /// Can be used to modify the database without doing any consensus checks.
910    #[cfg(any(test, feature = "proptest-impl"))]
911    pub fn db(&self) -> &ZebraDb {
912        &self.db
913    }
914
915    /// Logs rocksdb metrics using the read only state service.
916    pub fn log_db_metrics(&self) {
917        self.db.print_db_metrics();
918    }
919}
920
921impl Service<Request> for StateService {
922    type Response = Response;
923    type Error = BoxError;
924    type Future =
925        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
926
927    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
928        // Check for panics in the block write task
929        let poll = self.read_service.poll_ready(cx);
930
931        // Prune outdated UTXO requests
932        let now = Instant::now();
933
934        if self.last_prune + Self::PRUNE_INTERVAL < now {
935            let tip = self.best_tip();
936            let old_len = self.pending_utxos.len();
937
938            self.pending_utxos.prune();
939            self.last_prune = now;
940
941            let new_len = self.pending_utxos.len();
942            let prune_count = old_len
943                .checked_sub(new_len)
944                .expect("prune does not add any utxo requests");
945            if prune_count > 0 {
946                tracing::debug!(
947                    ?old_len,
948                    ?new_len,
949                    ?prune_count,
950                    ?tip,
951                    "pruned utxo requests"
952                );
953            } else {
954                tracing::debug!(len = ?old_len, ?tip, "no utxo requests needed pruning");
955            }
956        }
957
958        poll
959    }
960
961    #[instrument(name = "state", skip(self, req))]
962    fn call(&mut self, req: Request) -> Self::Future {
963        req.count_metric();
964        let timer = CodeTimer::start();
965        let span = Span::current();
966
967        match req {
968            // Uses non_finalized_state_queued_blocks and pending_utxos in the StateService
969            // Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb.
970            //
971            // The expected error type for this request is `CommitSemanticallyVerifiedError`.
972            Request::CommitSemanticallyVerifiedBlock(semantically_verified) => {
973                self.assert_block_can_be_validated(&semantically_verified);
974
975                self.pending_utxos
976                    .check_against_ordered(&semantically_verified.new_outputs);
977
978                // # Performance
979                //
980                // Allow other async tasks to make progress while blocks are being verified
981                // and written to disk. But wait for the blocks to finish committing,
982                // so that `StateService` multi-block queries always observe a consistent state.
983                //
984                // Since each block is spawned into its own task,
985                // there shouldn't be any other code running in the same task,
986                // so we don't need to worry about blocking it:
987                // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
988
989                let rsp_rx = tokio::task::block_in_place(move || {
990                    span.in_scope(|| {
991                        self.queue_and_commit_to_non_finalized_state(semantically_verified)
992                    })
993                });
994
995                // TODO:
996                //   - check for panics in the block write task here,
997                //     as well as in poll_ready()
998
999                // The work is all done, the future just waits on a channel for the result
1000                timer.finish(module_path!(), line!(), "CommitSemanticallyVerifiedBlock");
1001
1002                // Await the channel response, flatten the result, map receive errors to
1003                // `CommitSemanticallyVerifiedError::WriteTaskExited`.
1004                // Then flatten the nested Result and convert any errors to a BoxError.
1005                let span = Span::current();
1006                async move {
1007                    rsp_rx
1008                        .await
1009                        .map_err(|_recv_error| CommitSemanticallyVerifiedError::WriteTaskExited)
1010                        .flatten()
1011                        .map_err(BoxError::from)
1012                        .map(Response::Committed)
1013                }
1014                .instrument(span)
1015                .boxed()
1016            }
1017
1018            // Uses finalized_state_queued_blocks and pending_utxos in the StateService.
1019            // Accesses shared writeable state in the StateService.
1020            Request::CommitCheckpointVerifiedBlock(finalized) => {
1021                // # Consensus
1022                //
1023                // A semantic block verification could have called AwaitUtxo
1024                // before this checkpoint verified block arrived in the state.
1025                // So we need to check for pending UTXO requests sent by running
1026                // semantic block verifications.
1027                //
1028                // This check is redundant for most checkpoint verified blocks,
1029                // because semantic verification can only succeed near the final
1030                // checkpoint, when all the UTXOs are available for the verifying block.
1031                //
1032                // (Checkpoint block UTXOs are verified using block hash checkpoints
1033                // and transaction merkle tree block header commitments.)
1034                self.pending_utxos
1035                    .check_against_ordered(&finalized.new_outputs);
1036
1037                // # Performance
1038                //
1039                // This method doesn't block, access the database, or perform CPU-intensive tasks,
1040                // so we can run it directly in the tokio executor's Future threads.
1041                let rsp_rx = self.queue_and_commit_to_finalized_state(finalized);
1042
1043                // TODO:
1044                //   - check for panics in the block write task here,
1045                //     as well as in poll_ready()
1046
1047                // The work is all done, the future just waits on a channel for the result
1048                timer.finish(module_path!(), line!(), "CommitCheckpointVerifiedBlock");
1049
1050                async move {
1051                    rsp_rx
1052                        .await
1053                        .map_err(|_recv_error| {
1054                            BoxError::from("block was dropped from the queue of finalized blocks")
1055                        })
1056                        // TODO: replace with Result::flatten once it stabilises
1057                        // https://github.com/rust-lang/rust/issues/70142
1058                        .and_then(convert::identity)
1059                        .map(Response::Committed)
1060                }
1061                .instrument(span)
1062                .boxed()
1063            }
1064
1065            // Uses pending_utxos and non_finalized_state_queued_blocks in the StateService.
1066            // If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService.
1067            Request::AwaitUtxo(outpoint) => {
1068                // Prepare the AwaitUtxo future from PendingUxtos.
1069                let response_fut = self.pending_utxos.queue(outpoint);
1070                // Only instrument `response_fut`, the ReadStateService already
1071                // instruments its requests with the same span.
1072
1073                let response_fut = response_fut.instrument(span).boxed();
1074
1075                // Check the non-finalized block queue outside the returned future,
1076                // so we can access mutable state fields.
1077                if let Some(utxo) = self.non_finalized_state_queued_blocks.utxo(&outpoint) {
1078                    self.pending_utxos.respond(&outpoint, utxo);
1079
1080                    // We're finished, the returned future gets the UTXO from the respond() channel.
1081                    timer.finish(module_path!(), line!(), "AwaitUtxo/queued-non-finalized");
1082
1083                    return response_fut;
1084                }
1085
1086                // Check the sent non-finalized blocks
1087                if let Some(utxo) = self.non_finalized_block_write_sent_hashes.utxo(&outpoint) {
1088                    self.pending_utxos.respond(&outpoint, utxo);
1089
1090                    // We're finished, the returned future gets the UTXO from the respond() channel.
1091                    timer.finish(module_path!(), line!(), "AwaitUtxo/sent-non-finalized");
1092
1093                    return response_fut;
1094                }
1095
1096                // We ignore any UTXOs in FinalizedState.finalized_state_queued_blocks,
1097                // because it is only used during checkpoint verification.
1098                //
1099                // This creates a rare race condition, but it doesn't seem to happen much in practice.
1100                // See #5126 for details.
1101
1102                // Manually send a request to the ReadStateService,
1103                // to get UTXOs from any non-finalized chain or the finalized chain.
1104                let read_service = self.read_service.clone();
1105
1106                // Run the request in an async block, so we can await the response.
1107                async move {
1108                    let req = ReadRequest::AnyChainUtxo(outpoint);
1109
1110                    let rsp = read_service.oneshot(req).await?;
1111
1112                    // Optional TODO:
1113                    //  - make pending_utxos.respond() async using a channel,
1114                    //    so we can respond to all waiting requests here
1115                    //
1116                    // This change is not required for correctness, because:
1117                    // - any waiting requests should have returned when the block was sent to the state
1118                    // - otherwise, the request returns immediately if:
1119                    //   - the block is in the non-finalized queue, or
1120                    //   - the block is in any non-finalized chain or the finalized state
1121                    //
1122                    // And if the block is in the finalized queue,
1123                    // that's rare enough that a retry is ok.
1124                    if let ReadResponse::AnyChainUtxo(Some(utxo)) = rsp {
1125                        // We got a UTXO, so we replace the response future with the result own.
1126                        timer.finish(module_path!(), line!(), "AwaitUtxo/any-chain");
1127
1128                        return Ok(Response::Utxo(utxo));
1129                    }
1130
1131                    // We're finished, but the returned future is waiting on the respond() channel.
1132                    timer.finish(module_path!(), line!(), "AwaitUtxo/waiting");
1133
1134                    response_fut.await
1135                }
1136                .boxed()
1137            }
1138
1139            // Used by sync, inbound, and block verifier to check if a block is already in the state
1140            // before downloading or validating it.
1141            Request::KnownBlock(hash) => {
1142                let timer = CodeTimer::start();
1143
1144                let read_service = self.read_service.clone();
1145
1146                async move {
1147                    let response = read::non_finalized_state_contains_block_hash(
1148                        &read_service.latest_non_finalized_state(),
1149                        hash,
1150                    )
1151                    .or_else(|| read::finalized_state_contains_block_hash(&read_service.db, hash));
1152
1153                    // The work is done in the future.
1154                    timer.finish(module_path!(), line!(), "Request::KnownBlock");
1155
1156                    Ok(Response::KnownBlock(response))
1157                }
1158                .boxed()
1159            }
1160
1161            // The expected error type for this request is `InvalidateError`
1162            Request::InvalidateBlock(block_hash) => {
1163                let rsp_rx = tokio::task::block_in_place(move || {
1164                    span.in_scope(|| self.send_invalidate_block(block_hash))
1165                });
1166
1167                // Await the channel response, flatten the result, map receive errors to
1168                // `InvalidateError::InvalidateRequestDropped`.
1169                // Then flatten the nested Result and convert any errors to a BoxError.
1170                let span = Span::current();
1171                async move {
1172                    rsp_rx
1173                        .await
1174                        .map_err(|_recv_error| InvalidateError::InvalidateRequestDropped)
1175                        .flatten()
1176                        .map_err(BoxError::from)
1177                        .map(Response::Invalidated)
1178                }
1179                .instrument(span)
1180                .boxed()
1181            }
1182
1183            // The expected error type for this request is `ReconsiderError`
1184            Request::ReconsiderBlock(block_hash) => {
1185                let rsp_rx = tokio::task::block_in_place(move || {
1186                    span.in_scope(|| self.send_reconsider_block(block_hash))
1187                });
1188
1189                // Await the channel response, flatten the result, map receive errors to
1190                // `ReconsiderError::ReconsiderResponseDropped`.
1191                // Then flatten the nested Result and convert any errors to a BoxError.
1192                let span = Span::current();
1193                async move {
1194                    rsp_rx
1195                        .await
1196                        .map_err(|_recv_error| ReconsiderError::ReconsiderResponseDropped)
1197                        .flatten()
1198                        .map_err(BoxError::from)
1199                        .map(Response::Reconsidered)
1200                }
1201                .instrument(span)
1202                .boxed()
1203            }
1204
1205            // Runs concurrently using the ReadStateService
1206            Request::Tip
1207            | Request::Depth(_)
1208            | Request::BestChainNextMedianTimePast
1209            | Request::BestChainBlockHash(_)
1210            | Request::BlockLocator
1211            | Request::Transaction(_)
1212            | Request::AnyChainTransaction(_)
1213            | Request::UnspentBestChainUtxo(_)
1214            | Request::Block(_)
1215            | Request::BlockAndSize(_)
1216            | Request::BlockHeader(_)
1217            | Request::FindBlockHashes { .. }
1218            | Request::FindBlockHeaders { .. }
1219            | Request::CheckBestChainTipNullifiersAndAnchors(_) => {
1220                // Redirect the request to the concurrent ReadStateService
1221                let read_service = self.read_service.clone();
1222
1223                async move {
1224                    let req = req
1225                        .try_into()
1226                        .expect("ReadRequest conversion should not fail");
1227
1228                    let rsp = read_service.oneshot(req).await?;
1229                    let rsp = rsp.try_into().expect("Response conversion should not fail");
1230
1231                    Ok(rsp)
1232                }
1233                .boxed()
1234            }
1235
1236            Request::CheckBlockProposalValidity(_) => {
1237                // Redirect the request to the concurrent ReadStateService
1238                let read_service = self.read_service.clone();
1239
1240                async move {
1241                    let req = req
1242                        .try_into()
1243                        .expect("ReadRequest conversion should not fail");
1244
1245                    let rsp = read_service.oneshot(req).await?;
1246                    let rsp = rsp.try_into().expect("Response conversion should not fail");
1247
1248                    Ok(rsp)
1249                }
1250                .boxed()
1251            }
1252        }
1253    }
1254}
1255
1256impl Service<ReadRequest> for ReadStateService {
1257    type Response = ReadResponse;
1258    type Error = BoxError;
1259    type Future =
1260        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
1261
1262    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1263        // Check for panics in the block write task
1264        //
1265        // TODO: move into a check_for_panics() method
1266        let block_write_task = self.block_write_task.take();
1267
1268        if let Some(block_write_task) = block_write_task {
1269            if block_write_task.is_finished() {
1270                if let Some(block_write_task) = Arc::into_inner(block_write_task) {
1271                    // We are the last state with a reference to this task, so we can propagate any panics
1272                    if let Err(thread_panic) = block_write_task.join() {
1273                        std::panic::resume_unwind(thread_panic);
1274                    }
1275                }
1276            } else {
1277                // It hasn't finished, so we need to put it back
1278                self.block_write_task = Some(block_write_task);
1279            }
1280        }
1281
1282        self.db.check_for_panics();
1283
1284        Poll::Ready(Ok(()))
1285    }
1286
1287    #[instrument(name = "read_state", skip(self, req))]
1288    fn call(&mut self, req: ReadRequest) -> Self::Future {
1289        req.count_metric();
1290        let timer = CodeTimer::start();
1291        let span = Span::current();
1292
1293        match req {
1294            // Used by the `getblockchaininfo` RPC.
1295            ReadRequest::UsageInfo => {
1296                let db = self.db.clone();
1297
1298                tokio::task::spawn_blocking(move || {
1299                    span.in_scope(move || {
1300                        // The work is done in the future.
1301
1302                        let db_size = db.size();
1303
1304                        timer.finish(module_path!(), line!(), "ReadRequest::UsageInfo");
1305
1306                        Ok(ReadResponse::UsageInfo(db_size))
1307                    })
1308                })
1309                .wait_for_panics()
1310            }
1311
1312            // Used by the StateService.
1313            ReadRequest::Tip => {
1314                let state = self.clone();
1315
1316                tokio::task::spawn_blocking(move || {
1317                    span.in_scope(move || {
1318                        let tip = state.non_finalized_state_receiver.with_watch_data(
1319                            |non_finalized_state| {
1320                                read::tip(non_finalized_state.best_chain(), &state.db)
1321                            },
1322                        );
1323
1324                        // The work is done in the future.
1325                        timer.finish(module_path!(), line!(), "ReadRequest::Tip");
1326
1327                        Ok(ReadResponse::Tip(tip))
1328                    })
1329                })
1330                .wait_for_panics()
1331            }
1332
1333            // Used by `getblockchaininfo` RPC method.
1334            ReadRequest::TipPoolValues => {
1335                let state = self.clone();
1336
1337                tokio::task::spawn_blocking(move || {
1338                    span.in_scope(move || {
1339                        let tip_with_value_balance = state
1340                            .non_finalized_state_receiver
1341                            .with_watch_data(|non_finalized_state| {
1342                                read::tip_with_value_balance(
1343                                    non_finalized_state.best_chain(),
1344                                    &state.db,
1345                                )
1346                            });
1347
1348                        // The work is done in the future.
1349                        // TODO: Do this in the Drop impl with the variant name?
1350                        timer.finish(module_path!(), line!(), "ReadRequest::TipPoolValues");
1351
1352                        let (tip_height, tip_hash, value_balance) = tip_with_value_balance?
1353                            .ok_or(BoxError::from("no chain tip available yet"))?;
1354
1355                        Ok(ReadResponse::TipPoolValues {
1356                            tip_height,
1357                            tip_hash,
1358                            value_balance,
1359                        })
1360                    })
1361                })
1362                .wait_for_panics()
1363            }
1364
1365            // Used by getblock
1366            ReadRequest::BlockInfo(hash_or_height) => {
1367                let state = self.clone();
1368
1369                tokio::task::spawn_blocking(move || {
1370                    span.in_scope(move || {
1371                        let value_balance = state.non_finalized_state_receiver.with_watch_data(
1372                            |non_finalized_state| {
1373                                read::block_info(
1374                                    non_finalized_state.best_chain(),
1375                                    &state.db,
1376                                    hash_or_height,
1377                                )
1378                            },
1379                        );
1380
1381                        // The work is done in the future.
1382                        // TODO: Do this in the Drop impl with the variant name?
1383                        timer.finish(module_path!(), line!(), "ReadRequest::BlockInfo");
1384
1385                        Ok(ReadResponse::BlockInfo(value_balance))
1386                    })
1387                })
1388                .wait_for_panics()
1389            }
1390
1391            // Used by the StateService.
1392            ReadRequest::Depth(hash) => {
1393                let state = self.clone();
1394
1395                tokio::task::spawn_blocking(move || {
1396                    span.in_scope(move || {
1397                        let depth = state.non_finalized_state_receiver.with_watch_data(
1398                            |non_finalized_state| {
1399                                read::depth(non_finalized_state.best_chain(), &state.db, hash)
1400                            },
1401                        );
1402
1403                        // The work is done in the future.
1404                        timer.finish(module_path!(), line!(), "ReadRequest::Depth");
1405
1406                        Ok(ReadResponse::Depth(depth))
1407                    })
1408                })
1409                .wait_for_panics()
1410            }
1411
1412            // Used by the StateService.
1413            ReadRequest::BestChainNextMedianTimePast => {
1414                let state = self.clone();
1415
1416                tokio::task::spawn_blocking(move || {
1417                    span.in_scope(move || {
1418                        let non_finalized_state = state.latest_non_finalized_state();
1419                        let median_time_past =
1420                            read::next_median_time_past(&non_finalized_state, &state.db);
1421
1422                        // The work is done in the future.
1423                        timer.finish(
1424                            module_path!(),
1425                            line!(),
1426                            "ReadRequest::BestChainNextMedianTimePast",
1427                        );
1428
1429                        Ok(ReadResponse::BestChainNextMedianTimePast(median_time_past?))
1430                    })
1431                })
1432                .wait_for_panics()
1433            }
1434
1435            // Used by the get_block (raw) RPC and the StateService.
1436            ReadRequest::Block(hash_or_height) => {
1437                let state = self.clone();
1438
1439                tokio::task::spawn_blocking(move || {
1440                    span.in_scope(move || {
1441                        let block = state.non_finalized_state_receiver.with_watch_data(
1442                            |non_finalized_state| {
1443                                read::block(
1444                                    non_finalized_state.best_chain(),
1445                                    &state.db,
1446                                    hash_or_height,
1447                                )
1448                            },
1449                        );
1450
1451                        // The work is done in the future.
1452                        timer.finish(module_path!(), line!(), "ReadRequest::Block");
1453
1454                        Ok(ReadResponse::Block(block))
1455                    })
1456                })
1457                .wait_for_panics()
1458            }
1459
1460            // Used by the get_block (raw) RPC and the StateService.
1461            ReadRequest::BlockAndSize(hash_or_height) => {
1462                let state = self.clone();
1463
1464                tokio::task::spawn_blocking(move || {
1465                    span.in_scope(move || {
1466                        let block_and_size = state.non_finalized_state_receiver.with_watch_data(
1467                            |non_finalized_state| {
1468                                read::block_and_size(
1469                                    non_finalized_state.best_chain(),
1470                                    &state.db,
1471                                    hash_or_height,
1472                                )
1473                            },
1474                        );
1475
1476                        // The work is done in the future.
1477                        timer.finish(module_path!(), line!(), "ReadRequest::BlockAndSize");
1478
1479                        Ok(ReadResponse::BlockAndSize(block_and_size))
1480                    })
1481                })
1482                .wait_for_panics()
1483            }
1484
1485            // Used by the get_block (verbose) RPC and the StateService.
1486            ReadRequest::BlockHeader(hash_or_height) => {
1487                let state = self.clone();
1488
1489                tokio::task::spawn_blocking(move || {
1490                    span.in_scope(move || {
1491                        let best_chain = state.latest_best_chain();
1492
1493                        let height = hash_or_height
1494                            .height_or_else(|hash| {
1495                                read::find::height_by_hash(best_chain.clone(), &state.db, hash)
1496                            })
1497                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1498
1499                        let hash = hash_or_height
1500                            .hash_or_else(|height| {
1501                                read::find::hash_by_height(best_chain.clone(), &state.db, height)
1502                            })
1503                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1504
1505                        let next_height = height.next()?;
1506                        let next_block_hash =
1507                            read::find::hash_by_height(best_chain.clone(), &state.db, next_height);
1508
1509                        let header = read::block_header(best_chain, &state.db, height.into())
1510                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1511
1512                        // The work is done in the future.
1513                        timer.finish(module_path!(), line!(), "ReadRequest::Block");
1514
1515                        Ok(ReadResponse::BlockHeader {
1516                            header,
1517                            hash,
1518                            height,
1519                            next_block_hash,
1520                        })
1521                    })
1522                })
1523                .wait_for_panics()
1524            }
1525
1526            // For the get_raw_transaction RPC and the StateService.
1527            ReadRequest::Transaction(hash) => {
1528                let state = self.clone();
1529
1530                tokio::task::spawn_blocking(move || {
1531                    span.in_scope(move || {
1532                        let response =
1533                            read::mined_transaction(state.latest_best_chain(), &state.db, hash);
1534
1535                        // The work is done in the future.
1536                        timer.finish(module_path!(), line!(), "ReadRequest::Transaction");
1537
1538                        Ok(ReadResponse::Transaction(response))
1539                    })
1540                })
1541                .wait_for_panics()
1542            }
1543
1544            ReadRequest::AnyChainTransaction(hash) => {
1545                let state = self.clone();
1546
1547                tokio::task::spawn_blocking(move || {
1548                    span.in_scope(move || {
1549                        let tx = state.non_finalized_state_receiver.with_watch_data(
1550                            |non_finalized_state| {
1551                                read::any_transaction(
1552                                    non_finalized_state.chain_iter(),
1553                                    &state.db,
1554                                    hash,
1555                                )
1556                            },
1557                        );
1558
1559                        // The work is done in the future.
1560                        timer.finish(module_path!(), line!(), "ReadRequest::AnyChainTransaction");
1561
1562                        Ok(ReadResponse::AnyChainTransaction(tx))
1563                    })
1564                })
1565                .wait_for_panics()
1566            }
1567
1568            // Used by the getblock (verbose) RPC.
1569            ReadRequest::TransactionIdsForBlock(hash_or_height) => {
1570                let state = self.clone();
1571
1572                tokio::task::spawn_blocking(move || {
1573                    span.in_scope(move || {
1574                        let transaction_ids = state.non_finalized_state_receiver.with_watch_data(
1575                            |non_finalized_state| {
1576                                read::transaction_hashes_for_block(
1577                                    non_finalized_state.best_chain(),
1578                                    &state.db,
1579                                    hash_or_height,
1580                                )
1581                            },
1582                        );
1583
1584                        // The work is done in the future.
1585                        timer.finish(
1586                            module_path!(),
1587                            line!(),
1588                            "ReadRequest::TransactionIdsForBlock",
1589                        );
1590
1591                        Ok(ReadResponse::TransactionIdsForBlock(transaction_ids))
1592                    })
1593                })
1594                .wait_for_panics()
1595            }
1596
1597            ReadRequest::AnyChainTransactionIdsForBlock(hash_or_height) => {
1598                let state = self.clone();
1599
1600                tokio::task::spawn_blocking(move || {
1601                    span.in_scope(move || {
1602                        let transaction_ids = state.non_finalized_state_receiver.with_watch_data(
1603                            |non_finalized_state| {
1604                                read::transaction_hashes_for_any_block(
1605                                    non_finalized_state.chain_iter(),
1606                                    &state.db,
1607                                    hash_or_height,
1608                                )
1609                            },
1610                        );
1611
1612                        // The work is done in the future.
1613                        timer.finish(
1614                            module_path!(),
1615                            line!(),
1616                            "ReadRequest::AnyChainTransactionIdsForBlock",
1617                        );
1618
1619                        Ok(ReadResponse::AnyChainTransactionIdsForBlock(
1620                            transaction_ids,
1621                        ))
1622                    })
1623                })
1624                .wait_for_panics()
1625            }
1626
1627            #[cfg(feature = "indexer")]
1628            ReadRequest::SpendingTransactionId(spend) => {
1629                let state = self.clone();
1630
1631                tokio::task::spawn_blocking(move || {
1632                    span.in_scope(move || {
1633                        let spending_transaction_id = state
1634                            .non_finalized_state_receiver
1635                            .with_watch_data(|non_finalized_state| {
1636                                read::spending_transaction_hash(
1637                                    non_finalized_state.best_chain(),
1638                                    &state.db,
1639                                    spend,
1640                                )
1641                            });
1642
1643                        // The work is done in the future.
1644                        timer.finish(
1645                            module_path!(),
1646                            line!(),
1647                            "ReadRequest::TransactionIdForSpentOutPoint",
1648                        );
1649
1650                        Ok(ReadResponse::TransactionId(spending_transaction_id))
1651                    })
1652                })
1653                .wait_for_panics()
1654            }
1655
1656            ReadRequest::UnspentBestChainUtxo(outpoint) => {
1657                let state = self.clone();
1658
1659                tokio::task::spawn_blocking(move || {
1660                    span.in_scope(move || {
1661                        let utxo = state.non_finalized_state_receiver.with_watch_data(
1662                            |non_finalized_state| {
1663                                read::unspent_utxo(
1664                                    non_finalized_state.best_chain(),
1665                                    &state.db,
1666                                    outpoint,
1667                                )
1668                            },
1669                        );
1670
1671                        // The work is done in the future.
1672                        timer.finish(module_path!(), line!(), "ReadRequest::UnspentBestChainUtxo");
1673
1674                        Ok(ReadResponse::UnspentBestChainUtxo(utxo))
1675                    })
1676                })
1677                .wait_for_panics()
1678            }
1679
1680            // Manually used by the StateService to implement part of AwaitUtxo.
1681            ReadRequest::AnyChainUtxo(outpoint) => {
1682                let state = self.clone();
1683
1684                tokio::task::spawn_blocking(move || {
1685                    span.in_scope(move || {
1686                        let utxo = state.non_finalized_state_receiver.with_watch_data(
1687                            |non_finalized_state| {
1688                                read::any_utxo(non_finalized_state, &state.db, outpoint)
1689                            },
1690                        );
1691
1692                        // The work is done in the future.
1693                        timer.finish(module_path!(), line!(), "ReadRequest::AnyChainUtxo");
1694
1695                        Ok(ReadResponse::AnyChainUtxo(utxo))
1696                    })
1697                })
1698                .wait_for_panics()
1699            }
1700
1701            // Used by the StateService.
1702            ReadRequest::BlockLocator => {
1703                let state = self.clone();
1704
1705                tokio::task::spawn_blocking(move || {
1706                    span.in_scope(move || {
1707                        let block_locator = state.non_finalized_state_receiver.with_watch_data(
1708                            |non_finalized_state| {
1709                                read::block_locator(non_finalized_state.best_chain(), &state.db)
1710                            },
1711                        );
1712
1713                        // The work is done in the future.
1714                        timer.finish(module_path!(), line!(), "ReadRequest::BlockLocator");
1715
1716                        Ok(ReadResponse::BlockLocator(
1717                            block_locator.unwrap_or_default(),
1718                        ))
1719                    })
1720                })
1721                .wait_for_panics()
1722            }
1723
1724            // Used by the StateService.
1725            ReadRequest::FindBlockHashes { known_blocks, stop } => {
1726                let state = self.clone();
1727
1728                tokio::task::spawn_blocking(move || {
1729                    span.in_scope(move || {
1730                        let block_hashes = state.non_finalized_state_receiver.with_watch_data(
1731                            |non_finalized_state| {
1732                                read::find_chain_hashes(
1733                                    non_finalized_state.best_chain(),
1734                                    &state.db,
1735                                    known_blocks,
1736                                    stop,
1737                                    MAX_FIND_BLOCK_HASHES_RESULTS,
1738                                )
1739                            },
1740                        );
1741
1742                        // The work is done in the future.
1743                        timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHashes");
1744
1745                        Ok(ReadResponse::BlockHashes(block_hashes))
1746                    })
1747                })
1748                .wait_for_panics()
1749            }
1750
1751            // Used by the StateService.
1752            ReadRequest::FindBlockHeaders { known_blocks, stop } => {
1753                let state = self.clone();
1754
1755                tokio::task::spawn_blocking(move || {
1756                    span.in_scope(move || {
1757                        let block_headers = state.non_finalized_state_receiver.with_watch_data(
1758                            |non_finalized_state| {
1759                                read::find_chain_headers(
1760                                    non_finalized_state.best_chain(),
1761                                    &state.db,
1762                                    known_blocks,
1763                                    stop,
1764                                    MAX_FIND_BLOCK_HEADERS_RESULTS,
1765                                )
1766                            },
1767                        );
1768
1769                        let block_headers = block_headers
1770                            .into_iter()
1771                            .map(|header| CountedHeader { header })
1772                            .collect();
1773
1774                        // The work is done in the future.
1775                        timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHeaders");
1776
1777                        Ok(ReadResponse::BlockHeaders(block_headers))
1778                    })
1779                })
1780                .wait_for_panics()
1781            }
1782
1783            ReadRequest::SaplingTree(hash_or_height) => {
1784                let state = self.clone();
1785
1786                tokio::task::spawn_blocking(move || {
1787                    span.in_scope(move || {
1788                        let sapling_tree = state.non_finalized_state_receiver.with_watch_data(
1789                            |non_finalized_state| {
1790                                read::sapling_tree(
1791                                    non_finalized_state.best_chain(),
1792                                    &state.db,
1793                                    hash_or_height,
1794                                )
1795                            },
1796                        );
1797
1798                        // The work is done in the future.
1799                        timer.finish(module_path!(), line!(), "ReadRequest::SaplingTree");
1800
1801                        Ok(ReadResponse::SaplingTree(sapling_tree))
1802                    })
1803                })
1804                .wait_for_panics()
1805            }
1806
1807            ReadRequest::OrchardTree(hash_or_height) => {
1808                let state = self.clone();
1809
1810                tokio::task::spawn_blocking(move || {
1811                    span.in_scope(move || {
1812                        let orchard_tree = state.non_finalized_state_receiver.with_watch_data(
1813                            |non_finalized_state| {
1814                                read::orchard_tree(
1815                                    non_finalized_state.best_chain(),
1816                                    &state.db,
1817                                    hash_or_height,
1818                                )
1819                            },
1820                        );
1821
1822                        // The work is done in the future.
1823                        timer.finish(module_path!(), line!(), "ReadRequest::OrchardTree");
1824
1825                        Ok(ReadResponse::OrchardTree(orchard_tree))
1826                    })
1827                })
1828                .wait_for_panics()
1829            }
1830
1831            ReadRequest::SaplingSubtrees { start_index, limit } => {
1832                let state = self.clone();
1833
1834                tokio::task::spawn_blocking(move || {
1835                    span.in_scope(move || {
1836                        let end_index = limit
1837                            .and_then(|limit| start_index.0.checked_add(limit.0))
1838                            .map(NoteCommitmentSubtreeIndex);
1839
1840                        let sapling_subtrees = state.non_finalized_state_receiver.with_watch_data(
1841                            |non_finalized_state| {
1842                                if let Some(end_index) = end_index {
1843                                    read::sapling_subtrees(
1844                                        non_finalized_state.best_chain(),
1845                                        &state.db,
1846                                        start_index..end_index,
1847                                    )
1848                                } else {
1849                                    // If there is no end bound, just return all the trees.
1850                                    // If the end bound would overflow, just returns all the trees, because that's what
1851                                    // `zcashd` does. (It never calculates an end bound, so it just keeps iterating until
1852                                    // the trees run out.)
1853                                    read::sapling_subtrees(
1854                                        non_finalized_state.best_chain(),
1855                                        &state.db,
1856                                        start_index..,
1857                                    )
1858                                }
1859                            },
1860                        );
1861
1862                        // The work is done in the future.
1863                        timer.finish(module_path!(), line!(), "ReadRequest::SaplingSubtrees");
1864
1865                        Ok(ReadResponse::SaplingSubtrees(sapling_subtrees))
1866                    })
1867                })
1868                .wait_for_panics()
1869            }
1870
1871            ReadRequest::OrchardSubtrees { start_index, limit } => {
1872                let state = self.clone();
1873
1874                tokio::task::spawn_blocking(move || {
1875                    span.in_scope(move || {
1876                        let end_index = limit
1877                            .and_then(|limit| start_index.0.checked_add(limit.0))
1878                            .map(NoteCommitmentSubtreeIndex);
1879
1880                        let orchard_subtrees = state.non_finalized_state_receiver.with_watch_data(
1881                            |non_finalized_state| {
1882                                if let Some(end_index) = end_index {
1883                                    read::orchard_subtrees(
1884                                        non_finalized_state.best_chain(),
1885                                        &state.db,
1886                                        start_index..end_index,
1887                                    )
1888                                } else {
1889                                    // If there is no end bound, just return all the trees.
1890                                    // If the end bound would overflow, just returns all the trees, because that's what
1891                                    // `zcashd` does. (It never calculates an end bound, so it just keeps iterating until
1892                                    // the trees run out.)
1893                                    read::orchard_subtrees(
1894                                        non_finalized_state.best_chain(),
1895                                        &state.db,
1896                                        start_index..,
1897                                    )
1898                                }
1899                            },
1900                        );
1901
1902                        // The work is done in the future.
1903                        timer.finish(module_path!(), line!(), "ReadRequest::OrchardSubtrees");
1904
1905                        Ok(ReadResponse::OrchardSubtrees(orchard_subtrees))
1906                    })
1907                })
1908                .wait_for_panics()
1909            }
1910
1911            // For the get_address_balance RPC.
1912            ReadRequest::AddressBalance(addresses) => {
1913                let state = self.clone();
1914
1915                tokio::task::spawn_blocking(move || {
1916                    span.in_scope(move || {
1917                        let (balance, received) = state
1918                            .non_finalized_state_receiver
1919                            .with_watch_data(|non_finalized_state| {
1920                                read::transparent_balance(
1921                                    non_finalized_state.best_chain().cloned(),
1922                                    &state.db,
1923                                    addresses,
1924                                )
1925                            })?;
1926
1927                        // The work is done in the future.
1928                        timer.finish(module_path!(), line!(), "ReadRequest::AddressBalance");
1929
1930                        Ok(ReadResponse::AddressBalance { balance, received })
1931                    })
1932                })
1933                .wait_for_panics()
1934            }
1935
1936            // For the get_address_tx_ids RPC.
1937            ReadRequest::TransactionIdsByAddresses {
1938                addresses,
1939                height_range,
1940            } => {
1941                let state = self.clone();
1942
1943                tokio::task::spawn_blocking(move || {
1944                    span.in_scope(move || {
1945                        let tx_ids = state.non_finalized_state_receiver.with_watch_data(
1946                            |non_finalized_state| {
1947                                read::transparent_tx_ids(
1948                                    non_finalized_state.best_chain(),
1949                                    &state.db,
1950                                    addresses,
1951                                    height_range,
1952                                )
1953                            },
1954                        );
1955
1956                        // The work is done in the future.
1957                        timer.finish(
1958                            module_path!(),
1959                            line!(),
1960                            "ReadRequest::TransactionIdsByAddresses",
1961                        );
1962
1963                        tx_ids.map(ReadResponse::AddressesTransactionIds)
1964                    })
1965                })
1966                .wait_for_panics()
1967            }
1968
1969            // For the get_address_utxos RPC.
1970            ReadRequest::UtxosByAddresses(addresses) => {
1971                let state = self.clone();
1972
1973                tokio::task::spawn_blocking(move || {
1974                    span.in_scope(move || {
1975                        let utxos = state.non_finalized_state_receiver.with_watch_data(
1976                            |non_finalized_state| {
1977                                read::address_utxos(
1978                                    &state.network,
1979                                    non_finalized_state.best_chain(),
1980                                    &state.db,
1981                                    addresses,
1982                                )
1983                            },
1984                        );
1985
1986                        // The work is done in the future.
1987                        timer.finish(module_path!(), line!(), "ReadRequest::UtxosByAddresses");
1988
1989                        utxos.map(ReadResponse::AddressUtxos)
1990                    })
1991                })
1992                .wait_for_panics()
1993            }
1994
1995            ReadRequest::CheckBestChainTipNullifiersAndAnchors(unmined_tx) => {
1996                let state = self.clone();
1997
1998                tokio::task::spawn_blocking(move || {
1999                    span.in_scope(move || {
2000                        let latest_non_finalized_best_chain =
2001                            state.latest_non_finalized_state().best_chain().cloned();
2002
2003                        check::nullifier::tx_no_duplicates_in_chain(
2004                            &state.db,
2005                            latest_non_finalized_best_chain.as_ref(),
2006                            &unmined_tx.transaction,
2007                        )?;
2008
2009                        check::anchors::tx_anchors_refer_to_final_treestates(
2010                            &state.db,
2011                            latest_non_finalized_best_chain.as_ref(),
2012                            &unmined_tx,
2013                        )?;
2014
2015                        // The work is done in the future.
2016                        timer.finish(
2017                            module_path!(),
2018                            line!(),
2019                            "ReadRequest::CheckBestChainTipNullifiersAndAnchors",
2020                        );
2021
2022                        Ok(ReadResponse::ValidBestChainTipNullifiersAndAnchors)
2023                    })
2024                })
2025                .wait_for_panics()
2026            }
2027
2028            // Used by the get_block and get_block_hash RPCs.
2029            ReadRequest::BestChainBlockHash(height) => {
2030                let state = self.clone();
2031
2032                // # Performance
2033                //
2034                // Allow other async tasks to make progress while concurrently reading blocks from disk.
2035
2036                tokio::task::spawn_blocking(move || {
2037                    span.in_scope(move || {
2038                        let hash = state.non_finalized_state_receiver.with_watch_data(
2039                            |non_finalized_state| {
2040                                read::hash_by_height(
2041                                    non_finalized_state.best_chain(),
2042                                    &state.db,
2043                                    height,
2044                                )
2045                            },
2046                        );
2047
2048                        // The work is done in the future.
2049                        timer.finish(module_path!(), line!(), "ReadRequest::BestChainBlockHash");
2050
2051                        Ok(ReadResponse::BlockHash(hash))
2052                    })
2053                })
2054                .wait_for_panics()
2055            }
2056
2057            // Used by get_block_template and getblockchaininfo RPCs.
2058            ReadRequest::ChainInfo => {
2059                let state = self.clone();
2060                let latest_non_finalized_state = self.latest_non_finalized_state();
2061
2062                // # Performance
2063                //
2064                // Allow other async tasks to make progress while concurrently reading blocks from disk.
2065
2066                tokio::task::spawn_blocking(move || {
2067                    span.in_scope(move || {
2068                        // # Correctness
2069                        //
2070                        // It is ok to do these lookups using multiple database calls. Finalized state updates
2071                        // can only add overlapping blocks, and block hashes are unique across all chain forks.
2072                        //
2073                        // If there is a large overlap between the non-finalized and finalized states,
2074                        // where the finalized tip is above the non-finalized tip,
2075                        // Zebra is receiving a lot of blocks, or this request has been delayed for a long time.
2076                        //
2077                        // In that case, the `getblocktemplate` RPC will return an error because Zebra
2078                        // is not synced to the tip. That check happens before the RPC makes this request.
2079                        let get_block_template_info =
2080                            read::difficulty::get_block_template_chain_info(
2081                                &latest_non_finalized_state,
2082                                &state.db,
2083                                &state.network,
2084                            );
2085
2086                        // The work is done in the future.
2087                        timer.finish(module_path!(), line!(), "ReadRequest::ChainInfo");
2088
2089                        get_block_template_info.map(ReadResponse::ChainInfo)
2090                    })
2091                })
2092                .wait_for_panics()
2093            }
2094
2095            // Used by getmininginfo, getnetworksolps, and getnetworkhashps RPCs.
2096            ReadRequest::SolutionRate { num_blocks, height } => {
2097                let state = self.clone();
2098
2099                // # Performance
2100                //
2101                // Allow other async tasks to make progress while concurrently reading blocks from disk.
2102
2103                tokio::task::spawn_blocking(move || {
2104                    span.in_scope(move || {
2105                        let latest_non_finalized_state = state.latest_non_finalized_state();
2106                        // # Correctness
2107                        //
2108                        // It is ok to do these lookups using multiple database calls. Finalized state updates
2109                        // can only add overlapping blocks, and block hashes are unique across all chain forks.
2110                        //
2111                        // The worst that can happen here is that the default `start_hash` will be below
2112                        // the chain tip.
2113                        let (tip_height, tip_hash) =
2114                            match read::tip(latest_non_finalized_state.best_chain(), &state.db) {
2115                                Some(tip_hash) => tip_hash,
2116                                None => return Ok(ReadResponse::SolutionRate(None)),
2117                            };
2118
2119                        let start_hash = match height {
2120                            Some(height) if height < tip_height => read::hash_by_height(
2121                                latest_non_finalized_state.best_chain(),
2122                                &state.db,
2123                                height,
2124                            ),
2125                            // use the chain tip hash if height is above it or not provided.
2126                            _ => Some(tip_hash),
2127                        };
2128
2129                        let solution_rate = start_hash.and_then(|start_hash| {
2130                            read::difficulty::solution_rate(
2131                                &latest_non_finalized_state,
2132                                &state.db,
2133                                num_blocks,
2134                                start_hash,
2135                            )
2136                        });
2137
2138                        // The work is done in the future.
2139                        timer.finish(module_path!(), line!(), "ReadRequest::SolutionRate");
2140
2141                        Ok(ReadResponse::SolutionRate(solution_rate))
2142                    })
2143                })
2144                .wait_for_panics()
2145            }
2146
2147            ReadRequest::CheckBlockProposalValidity(semantically_verified) => {
2148                let state = self.clone();
2149
2150                // # Performance
2151                //
2152                // Allow other async tasks to make progress while concurrently reading blocks from disk.
2153
2154                tokio::task::spawn_blocking(move || {
2155                    span.in_scope(move || {
2156                        tracing::debug!("attempting to validate and commit block proposal onto a cloned non-finalized state");
2157                        let mut latest_non_finalized_state = state.latest_non_finalized_state();
2158
2159                        // The previous block of a valid proposal must be on the best chain tip.
2160                        let Some((_best_tip_height, best_tip_hash)) = read::best_tip(&latest_non_finalized_state, &state.db) else {
2161                            return Err("state is empty: wait for Zebra to sync before submitting a proposal".into());
2162                        };
2163
2164                        if semantically_verified.block.header.previous_block_hash != best_tip_hash {
2165                            return Err("proposal is not based on the current best chain tip: previous block hash must be the best chain tip".into());
2166                        }
2167
2168                        // This clone of the non-finalized state is dropped when this closure returns.
2169                        // The non-finalized state that's used in the rest of the state (including finalizing
2170                        // blocks into the db) is not mutated here.
2171                        //
2172                        // TODO: Convert `CommitSemanticallyVerifiedError` to a new `ValidateProposalError`?
2173                        latest_non_finalized_state.disable_metrics();
2174
2175                        write::validate_and_commit_non_finalized(
2176                            &state.db,
2177                            &mut latest_non_finalized_state,
2178                            semantically_verified,
2179                        )?;
2180
2181                        // The work is done in the future.
2182                        timer.finish(
2183                            module_path!(),
2184                            line!(),
2185                            "ReadRequest::CheckBlockProposalValidity",
2186                        );
2187
2188                        Ok(ReadResponse::ValidBlockProposal)
2189                    })
2190                })
2191                .wait_for_panics()
2192            }
2193
2194            ReadRequest::TipBlockSize => {
2195                let state = self.clone();
2196
2197                tokio::task::spawn_blocking(move || {
2198                    span.in_scope(move || {
2199                        // Get the best chain tip height.
2200                        let tip_height = state
2201                            .non_finalized_state_receiver
2202                            .with_watch_data(|non_finalized_state| {
2203                                read::tip_height(non_finalized_state.best_chain(), &state.db)
2204                            })
2205                            .unwrap_or(Height(0));
2206
2207                        // Get the block at the best chain tip height.
2208                        let block = state.non_finalized_state_receiver.with_watch_data(
2209                            |non_finalized_state| {
2210                                read::block(
2211                                    non_finalized_state.best_chain(),
2212                                    &state.db,
2213                                    tip_height.into(),
2214                                )
2215                            },
2216                        );
2217
2218                        // The work is done in the future.
2219                        timer.finish(module_path!(), line!(), "ReadRequest::TipBlockSize");
2220
2221                        // Respond with the length of the obtained block if any.
2222                        match block {
2223                            Some(b) => Ok(ReadResponse::TipBlockSize(Some(
2224                                b.zcash_serialize_to_vec()?.len(),
2225                            ))),
2226                            None => Ok(ReadResponse::TipBlockSize(None)),
2227                        }
2228                    })
2229                })
2230                .wait_for_panics()
2231            }
2232
2233            ReadRequest::NonFinalizedBlocksListener => {
2234                // The non-finalized blocks listener is used to notify the state service
2235                // about new blocks that have been added to the non-finalized state.
2236                let non_finalized_blocks_listener = NonFinalizedBlocksListener::spawn(
2237                    self.network.clone(),
2238                    self.non_finalized_state_receiver.clone(),
2239                );
2240
2241                async move {
2242                    timer.finish(
2243                        module_path!(),
2244                        line!(),
2245                        "ReadRequest::NonFinalizedBlocksListener",
2246                    );
2247
2248                    Ok(ReadResponse::NonFinalizedBlocksListener(
2249                        non_finalized_blocks_listener,
2250                    ))
2251                }
2252                .boxed()
2253            }
2254        }
2255    }
2256}
2257
2258/// Initialize a state service from the provided [`Config`].
2259/// Returns a boxed state service, a read-only state service,
2260/// and receivers for state chain tip updates.
2261///
2262/// Each `network` has its own separate on-disk database.
2263///
2264/// The state uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
2265/// to work out when it is near the final checkpoint.
2266///
2267/// To share access to the state, wrap the returned service in a `Buffer`,
2268/// or clone the returned [`ReadStateService`].
2269///
2270/// It's possible to construct multiple state services in the same application (as
2271/// long as they, e.g., use different storage locations), but doing so is
2272/// probably not what you want.
2273pub async fn init(
2274    config: Config,
2275    network: &Network,
2276    max_checkpoint_height: block::Height,
2277    checkpoint_verify_concurrency_limit: usize,
2278) -> (
2279    BoxService<Request, Response, BoxError>,
2280    ReadStateService,
2281    LatestChainTip,
2282    ChainTipChange,
2283) {
2284    let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
2285        StateService::new(
2286            config,
2287            network,
2288            max_checkpoint_height,
2289            checkpoint_verify_concurrency_limit,
2290        )
2291        .await;
2292
2293    (
2294        BoxService::new(state_service),
2295        read_only_state_service,
2296        latest_chain_tip,
2297        chain_tip_change,
2298    )
2299}
2300
2301/// Initialize a read state service from the provided [`Config`].
2302/// Returns a read-only state service,
2303///
2304/// Each `network` has its own separate on-disk database.
2305///
2306/// To share access to the state, clone the returned [`ReadStateService`].
2307pub fn init_read_only(
2308    config: Config,
2309    network: &Network,
2310) -> (
2311    ReadStateService,
2312    ZebraDb,
2313    tokio::sync::watch::Sender<NonFinalizedState>,
2314) {
2315    let finalized_state = FinalizedState::new_with_debug(
2316        &config,
2317        network,
2318        true,
2319        #[cfg(feature = "elasticsearch")]
2320        false,
2321        true,
2322    );
2323    let (non_finalized_state_sender, non_finalized_state_receiver) =
2324        tokio::sync::watch::channel(NonFinalizedState::new(network));
2325
2326    (
2327        ReadStateService::new(
2328            &finalized_state,
2329            None,
2330            WatchReceiver::new(non_finalized_state_receiver),
2331        ),
2332        finalized_state.db.clone(),
2333        non_finalized_state_sender,
2334    )
2335}
2336
2337/// Calls [`init_read_only`] with the provided [`Config`] and [`Network`] from a blocking task.
2338/// Returns a [`tokio::task::JoinHandle`] with a read state service and chain tip sender.
2339pub fn spawn_init_read_only(
2340    config: Config,
2341    network: &Network,
2342) -> tokio::task::JoinHandle<(
2343    ReadStateService,
2344    ZebraDb,
2345    tokio::sync::watch::Sender<NonFinalizedState>,
2346)> {
2347    let network = network.clone();
2348    tokio::task::spawn_blocking(move || init_read_only(config, &network))
2349}
2350
2351/// Returns a [`StateService`] with an ephemeral [`Config`] and a buffer with a single slot.
2352///
2353/// This can be used to create a state service for testing. See also [`init`].
2354#[cfg(any(test, feature = "proptest-impl"))]
2355pub async fn init_test(
2356    network: &Network,
2357) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
2358    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
2359    //       if we ever need to test final checkpoint sent UTXO queries
2360    let (state_service, _, _, _) =
2361        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;
2362
2363    Buffer::new(BoxService::new(state_service), 1)
2364}
2365
2366/// Initializes a state service with an ephemeral [`Config`] and a buffer with a single slot,
2367/// then returns the read-write service, read-only service, and tip watch channels.
2368///
2369/// This can be used to create a state service for testing. See also [`init`].
2370#[cfg(any(test, feature = "proptest-impl"))]
2371pub async fn init_test_services(
2372    network: &Network,
2373) -> (
2374    Buffer<BoxService<Request, Response, BoxError>, Request>,
2375    ReadStateService,
2376    LatestChainTip,
2377    ChainTipChange,
2378) {
2379    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
2380    //       if we ever need to test final checkpoint sent UTXO queries
2381    let (state_service, read_state_service, latest_chain_tip, chain_tip_change) =
2382        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;
2383
2384    let state_service = Buffer::new(BoxService::new(state_service), 1);
2385
2386    (
2387        state_service,
2388        read_state_service,
2389        latest_chain_tip,
2390        chain_tip_change,
2391    )
2392}