zebra_state/
service.rs

1//! [`tower::Service`]s for Zebra's cached chain state.
2//!
3//! Zebra provides cached state access via two main services:
4//! - [`StateService`]: a read-write service that writes blocks to the state,
5//!   and redirects most read requests to the [`ReadStateService`].
6//! - [`ReadStateService`]: a read-only service that answers from the most
7//!   recent committed block.
8//!
9//! Most users should prefer [`ReadStateService`], unless they need to write blocks to the state.
10//!
11//! Zebra also provides access to the best chain tip via:
12//! - [`LatestChainTip`]: a read-only channel that contains the latest committed
13//!   tip.
14//! - [`ChainTipChange`]: a read-only channel that can asynchronously await
15//!   chain tip changes.
16
17use std::{
18    collections::HashMap,
19    convert,
20    future::Future,
21    pin::Pin,
22    sync::Arc,
23    task::{Context, Poll},
24    time::{Duration, Instant},
25};
26
27use futures::future::FutureExt;
28use tokio::sync::oneshot;
29use tower::{util::BoxService, Service, ServiceExt};
30use tracing::{instrument, Instrument, Span};
31
32#[cfg(any(test, feature = "proptest-impl"))]
33use tower::buffer::Buffer;
34
35use zebra_chain::{
36    block::{self, CountedHeader, HeightDiff},
37    diagnostic::{task::WaitForPanics, CodeTimer},
38    parameters::{Network, NetworkUpgrade},
39    subtree::NoteCommitmentSubtreeIndex,
40};
41
42use zebra_chain::{block::Height, serialization::ZcashSerialize};
43
44use crate::{
45    constants::{
46        MAX_FIND_BLOCK_HASHES_RESULTS, MAX_FIND_BLOCK_HEADERS_RESULTS, MAX_LEGACY_CHAIN_BLOCKS,
47    },
48    error::{InvalidateError, QueueAndCommitError, ReconsiderError},
49    response::NonFinalizedBlocksListener,
50    service::{
51        block_iter::any_ancestor_blocks,
52        chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
53        finalized_state::{FinalizedState, ZebraDb},
54        non_finalized_state::{Chain, NonFinalizedState},
55        pending_utxos::PendingUtxos,
56        queued_blocks::QueuedBlocks,
57        watch_receiver::WatchReceiver,
58    },
59    BoxError, CheckpointVerifiedBlock, CommitSemanticallyVerifiedError, Config, ReadRequest,
60    ReadResponse, Request, Response, SemanticallyVerifiedBlock,
61};
62
63pub mod block_iter;
64pub mod chain_tip;
65pub mod watch_receiver;
66
67pub mod check;
68
69pub(crate) mod finalized_state;
70pub(crate) mod non_finalized_state;
71mod pending_utxos;
72mod queued_blocks;
73pub(crate) mod read;
74mod write;
75
76#[cfg(any(test, feature = "proptest-impl"))]
77pub mod arbitrary;
78
79#[cfg(test)]
80mod tests;
81
82pub use finalized_state::{OutputLocation, TransactionIndex, TransactionLocation};
83use write::NonFinalizedWriteMessage;
84
85use self::queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified, SentHashes};
86
87/// A read-write service for Zebra's cached blockchain state.
88///
89/// This service modifies and provides access to:
90/// - the non-finalized state: the ~100 most recent blocks.
91///   Zebra allows chain forks in the non-finalized state,
92///   stores it in memory, and re-downloads it when restarted.
93/// - the finalized state: older blocks that have many confirmations.
94///   Zebra stores the single best chain in the finalized state,
95///   and re-loads it from disk when restarted.
96///
97/// Read requests to this service are buffered, then processed concurrently.
98/// Block write requests are buffered, then queued, then processed in order by a separate task.
99///
100/// Most state users can get faster read responses using the [`ReadStateService`],
101/// because its requests do not share a [`tower::buffer::Buffer`] with block write requests.
102///
103/// To quickly get the latest block, use [`LatestChainTip`] or [`ChainTipChange`].
104/// They can read the latest block directly, without queueing any requests.
105#[derive(Debug)]
106pub(crate) struct StateService {
107    // Configuration
108    //
109    /// The configured Zcash network.
110    network: Network,
111
112    /// The height that we start storing UTXOs from finalized blocks.
113    ///
114    /// This height should be lower than the last few checkpoints,
115    /// so the full verifier can verify UTXO spends from those blocks,
116    /// even if they haven't been committed to the finalized state yet.
117    full_verifier_utxo_lookahead: block::Height,
118
119    // Queued Blocks
120    //
121    /// Queued blocks for the [`NonFinalizedState`] that arrived out of order.
122    /// These blocks are awaiting their parent blocks before they can do contextual verification.
123    non_finalized_state_queued_blocks: QueuedBlocks,
124
125    /// Queued blocks for the [`FinalizedState`] that arrived out of order.
126    /// These blocks are awaiting their parent blocks before they can do contextual verification.
127    ///
128    /// Indexed by their parent block hash.
129    finalized_state_queued_blocks: HashMap<block::Hash, QueuedCheckpointVerified>,
130
131    /// Channels to send blocks to the block write task.
132    block_write_sender: write::BlockWriteSender,
133
134    /// The [`block::Hash`] of the most recent block sent on
135    /// `finalized_block_write_sender` or `non_finalized_block_write_sender`.
136    ///
137    /// On startup, this is:
138    /// - the finalized tip, if there are stored blocks, or
139    /// - the genesis block's parent hash, if the database is empty.
140    ///
141    /// If `invalid_block_write_reset_receiver` gets a reset, this is:
142    /// - the hash of the last valid committed block (the parent of the invalid block).
143    finalized_block_write_last_sent_hash: block::Hash,
144
145    /// A set of block hashes that have been sent to the block write task.
146    /// Hashes of blocks below the finalized tip height are periodically pruned.
147    non_finalized_block_write_sent_hashes: SentHashes,
148
149    /// If an invalid block is sent on `finalized_block_write_sender`
150    /// or `non_finalized_block_write_sender`,
151    /// this channel gets the [`block::Hash`] of the valid tip.
152    //
153    // TODO: add tests for finalized and non-finalized resets (#2654)
154    invalid_block_write_reset_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,
155
156    // Pending UTXO Request Tracking
157    //
158    /// The set of outpoints with pending requests for their associated transparent::Output.
159    pending_utxos: PendingUtxos,
160
161    /// Instant tracking the last time `pending_utxos` was pruned.
162    last_prune: Instant,
163
164    // Updating Concurrently Readable State
165    //
166    /// A cloneable [`ReadStateService`], used to answer concurrent read requests.
167    ///
168    /// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`.
169    read_service: ReadStateService,
170
171    // Metrics
172    //
173    /// A metric tracking the maximum height that's currently in `finalized_state_queued_blocks`
174    ///
175    /// Set to `f64::NAN` if `finalized_state_queued_blocks` is empty, because grafana shows NaNs
176    /// as a break in the graph.
177    max_finalized_queue_height: f64,
178}
179
180/// A read-only service for accessing Zebra's cached blockchain state.
181///
182/// This service provides read-only access to:
183/// - the non-finalized state: the ~100 most recent blocks.
184/// - the finalized state: older blocks that have many confirmations.
185///
186/// Requests to this service are processed in parallel,
187/// ignoring any blocks queued by the read-write [`StateService`].
188///
189/// This quick response behavior is better for most state users.
190/// It allows other async tasks to make progress while concurrently reading data from disk.
191#[derive(Clone, Debug)]
192pub struct ReadStateService {
193    // Configuration
194    //
195    /// The configured Zcash network.
196    network: Network,
197
198    // Shared Concurrently Readable State
199    //
200    /// A watch channel with a cached copy of the [`NonFinalizedState`].
201    ///
202    /// This state is only updated between requests,
203    /// so it might include some block data that is also on `disk`.
204    non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
205
206    /// The shared inner on-disk database for the finalized state.
207    ///
208    /// RocksDB allows reads and writes via a shared reference,
209    /// but [`ZebraDb`] doesn't expose any write methods or types.
210    ///
211    /// This chain is updated concurrently with requests,
212    /// so it might include some block data that is also in `best_mem`.
213    db: ZebraDb,
214
215    /// A shared handle to a task that writes blocks to the [`NonFinalizedState`] or [`FinalizedState`],
216    /// once the queues have received all their parent blocks.
217    ///
218    /// Used to check for panics when writing blocks.
219    block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
220}
221
222impl Drop for StateService {
223    fn drop(&mut self) {
224        // The state service owns the state, tasks, and channels,
225        // so dropping it should shut down everything.
226
227        // Close the channels (non-blocking)
228        // This makes the block write thread exit the next time it checks the channels.
229        // We want to do this here so we get any errors or panics from the block write task before it shuts down.
230        self.invalid_block_write_reset_receiver.close();
231
232        std::mem::drop(self.block_write_sender.finalized.take());
233        std::mem::drop(self.block_write_sender.non_finalized.take());
234
235        self.clear_finalized_block_queue(
236            "dropping the state: dropped unused finalized state queue block",
237        );
238        self.clear_non_finalized_block_queue(CommitSemanticallyVerifiedError::WriteTaskExited);
239
240        // Log database metrics before shutting down
241        info!("dropping the state: logging database metrics");
242        self.log_db_metrics();
243
244        // Then drop self.read_service, which checks the block write task for panics,
245        // and tries to shut down the database.
246    }
247}
248
249impl Drop for ReadStateService {
250    fn drop(&mut self) {
251        // The read state service shares the state,
252        // so dropping it should check if we can shut down.
253
254        // TODO: move this into a try_shutdown() method
255        if let Some(block_write_task) = self.block_write_task.take() {
256            if let Some(block_write_task_handle) = Arc::into_inner(block_write_task) {
257                // We're the last database user, so we can tell it to shut down (blocking):
258                // - flushes the database to disk, and
259                // - drops the database, which cleans up any database tasks correctly.
260                self.db.shutdown(true);
261
262                // We are the last state with a reference to this thread, so we can
263                // wait until the block write task finishes, then check for panics (blocking).
264                // (We'd also like to abort the thread, but std::thread::JoinHandle can't do that.)
265
266                // This log is verbose during tests.
267                #[cfg(not(test))]
268                info!("waiting for the block write task to finish");
269                #[cfg(test)]
270                debug!("waiting for the block write task to finish");
271
272                // TODO: move this into a check_for_panics() method
273                if let Err(thread_panic) = block_write_task_handle.join() {
274                    std::panic::resume_unwind(thread_panic);
275                } else {
276                    debug!("shutting down the state because the block write task has finished");
277                }
278            }
279        } else {
280            // Even if we're not the last database user, try shutting it down.
281            //
282            // TODO: rename this to try_shutdown()?
283            self.db.shutdown(false);
284        }
285    }
286}
287
288impl StateService {
289    const PRUNE_INTERVAL: Duration = Duration::from_secs(30);
290
291    /// Creates a new state service for the state `config` and `network`.
292    ///
293    /// Uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
294    /// to work out when it is near the final checkpoint.
295    ///
296    /// Returns the read-write and read-only state services,
297    /// and read-only watch channels for its best chain tip.
298    pub async fn new(
299        config: Config,
300        network: &Network,
301        max_checkpoint_height: block::Height,
302        checkpoint_verify_concurrency_limit: usize,
303    ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
304        let (finalized_state, finalized_tip, timer) = {
305            let config = config.clone();
306            let network = network.clone();
307            tokio::task::spawn_blocking(move || {
308                let timer = CodeTimer::start();
309                let finalized_state = FinalizedState::new(
310                    &config,
311                    &network,
312                    #[cfg(feature = "elasticsearch")]
313                    true,
314                );
315                timer.finish(module_path!(), line!(), "opening finalized state database");
316
317                let timer = CodeTimer::start();
318                let finalized_tip = finalized_state.db.tip_block();
319
320                (finalized_state, finalized_tip, timer)
321            })
322            .await
323            .expect("failed to join blocking task")
324        };
325
326        // # Correctness
327        //
328        // The state service must set the finalized block write sender to `None`
329        // if there are blocks in the restored non-finalized state that are above
330        // the max checkpoint height so that non-finalized blocks can be written, otherwise,
331        // Zebra will be unable to commit semantically verified blocks, and its chain sync will stall.
332        //
333        // The state service must not set the finalized block write sender to `None` if there
334        // aren't blocks in the restored non-finalized state that are above the max checkpoint height,
335        // otherwise, unless checkpoint sync is disabled in the zebra-consensus configuration,
336        // Zebra will be unable to commit checkpoint verified blocks, and its chain sync will stall.
337        let is_finalized_tip_past_max_checkpoint = if let Some(tip) = &finalized_tip {
338            tip.coinbase_height().expect("valid block must have height") >= max_checkpoint_height
339        } else {
340            false
341        };
342        let (non_finalized_state, non_finalized_state_sender, non_finalized_state_receiver) =
343            NonFinalizedState::new(network)
344                .with_backup(
345                    config.non_finalized_state_backup_dir(network),
346                    &finalized_state.db,
347                    is_finalized_tip_past_max_checkpoint,
348                )
349                .await;
350
351        let non_finalized_block_write_sent_hashes = SentHashes::new(&non_finalized_state);
352        let initial_tip = non_finalized_state
353            .best_tip_block()
354            .map(|cv_block| cv_block.block.clone())
355            .or(finalized_tip)
356            .map(CheckpointVerifiedBlock::from)
357            .map(ChainTipBlock::from);
358
359        tracing::info!(chain_tip = ?initial_tip.as_ref().map(|tip| (tip.hash, tip.height)), "loaded Zebra state cache");
360
361        let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
362            ChainTipSender::new(initial_tip, network);
363
364        let finalized_state_for_writing = finalized_state.clone();
365        let should_use_finalized_block_write_sender = non_finalized_state.is_chain_set_empty();
366        let (block_write_sender, invalid_block_write_reset_receiver, block_write_task) =
367            write::BlockWriteSender::spawn(
368                finalized_state_for_writing,
369                non_finalized_state,
370                chain_tip_sender,
371                non_finalized_state_sender,
372                should_use_finalized_block_write_sender,
373            );
374
375        let read_service = ReadStateService::new(
376            &finalized_state,
377            block_write_task,
378            non_finalized_state_receiver,
379        );
380
381        let full_verifier_utxo_lookahead = max_checkpoint_height
382            - HeightDiff::try_from(checkpoint_verify_concurrency_limit)
383                .expect("fits in HeightDiff");
384        let full_verifier_utxo_lookahead =
385            full_verifier_utxo_lookahead.unwrap_or(block::Height::MIN);
386        let non_finalized_state_queued_blocks = QueuedBlocks::default();
387        let pending_utxos = PendingUtxos::default();
388
389        let finalized_block_write_last_sent_hash =
390            tokio::task::spawn_blocking(move || finalized_state.db.finalized_tip_hash())
391                .await
392                .expect("failed to join blocking task");
393
394        let state = Self {
395            network: network.clone(),
396            full_verifier_utxo_lookahead,
397            non_finalized_state_queued_blocks,
398            finalized_state_queued_blocks: HashMap::new(),
399            block_write_sender,
400            finalized_block_write_last_sent_hash,
401            non_finalized_block_write_sent_hashes,
402            invalid_block_write_reset_receiver,
403            pending_utxos,
404            last_prune: Instant::now(),
405            read_service: read_service.clone(),
406            max_finalized_queue_height: f64::NAN,
407        };
408        timer.finish(module_path!(), line!(), "initializing state service");
409
410        tracing::info!("starting legacy chain check");
411        let timer = CodeTimer::start();
412
413        if let (Some(tip), Some(nu5_activation_height)) = (
414            {
415                let read_state = state.read_service.clone();
416                tokio::task::spawn_blocking(move || read_state.best_tip())
417                    .await
418                    .expect("task should not panic")
419            },
420            NetworkUpgrade::Nu5.activation_height(network),
421        ) {
422            if let Err(error) = check::legacy_chain(
423                nu5_activation_height,
424                any_ancestor_blocks(
425                    &state.read_service.latest_non_finalized_state(),
426                    &state.read_service.db,
427                    tip.1,
428                ),
429                &state.network,
430                MAX_LEGACY_CHAIN_BLOCKS,
431            ) {
432                let legacy_db_path = state.read_service.db.path().to_path_buf();
433                panic!(
434                    "Cached state contains a legacy chain.\n\
435                     An outdated Zebra version did not know about a recent network upgrade,\n\
436                     so it followed a legacy chain using outdated consensus branch rules.\n\
437                     Hint: Delete your database, and restart Zebra to do a full sync.\n\
438                     Database path: {legacy_db_path:?}\n\
439                     Error: {error:?}",
440                );
441            }
442        }
443
444        tracing::info!("cached state consensus branch is valid: no legacy chain found");
445        timer.finish(module_path!(), line!(), "legacy chain check");
446
447        (state, read_service, latest_chain_tip, chain_tip_change)
448    }
449
450    /// Call read only state service to log rocksdb database metrics.
451    pub fn log_db_metrics(&self) {
452        self.read_service.db.print_db_metrics();
453    }
454
455    /// Queue a checkpoint verified block for verification and storage in the finalized state.
456    ///
457    /// Returns a channel receiver that provides the result of the block commit.
458    fn queue_and_commit_to_finalized_state(
459        &mut self,
460        checkpoint_verified: CheckpointVerifiedBlock,
461    ) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
462        // # Correctness & Performance
463        //
464        // This method must not block, access the database, or perform CPU-intensive tasks,
465        // because it is called directly from the tokio executor's Future threads.
466
467        let queued_prev_hash = checkpoint_verified.block.header.previous_block_hash;
468        let queued_height = checkpoint_verified.height;
469
470        // If we're close to the final checkpoint, make the block's UTXOs available for
471        // semantic block verification, even when it is in the channel.
472        if self.is_close_to_final_checkpoint(queued_height) {
473            self.non_finalized_block_write_sent_hashes
474                .add_finalized(&checkpoint_verified)
475        }
476
477        let (rsp_tx, rsp_rx) = oneshot::channel();
478        let queued = (checkpoint_verified, rsp_tx);
479
480        if self.block_write_sender.finalized.is_some() {
481            // We're still committing checkpoint verified blocks
482            if let Some(duplicate_queued) = self
483                .finalized_state_queued_blocks
484                .insert(queued_prev_hash, queued)
485            {
486                Self::send_checkpoint_verified_block_error(
487                    duplicate_queued,
488                    "dropping older checkpoint verified block: got newer duplicate block",
489                );
490            }
491
492            self.drain_finalized_queue_and_commit();
493        } else {
494            // We've finished committing checkpoint verified blocks to the finalized state,
495            // so drop any repeated queued blocks, and return an error.
496            //
497            // TODO: track the latest sent height, and drop any blocks under that height
498            //       every time we send some blocks (like QueuedSemanticallyVerifiedBlocks)
499            Self::send_checkpoint_verified_block_error(
500                queued,
501                "already finished committing checkpoint verified blocks: dropped duplicate block, \
502                 block is already committed to the state",
503            );
504
505            self.clear_finalized_block_queue(
506                "already finished committing checkpoint verified blocks: dropped duplicate block, \
507                 block is already committed to the state",
508            );
509        }
510
511        if self.finalized_state_queued_blocks.is_empty() {
512            self.max_finalized_queue_height = f64::NAN;
513        } else if self.max_finalized_queue_height.is_nan()
514            || self.max_finalized_queue_height < queued_height.0 as f64
515        {
516            // if there are still blocks in the queue, then either:
517            //   - the new block was lower than the old maximum, and there was a gap before it,
518            //     so the maximum is still the same (and we skip this code), or
519            //   - the new block is higher than the old maximum, and there is at least one gap
520            //     between the finalized tip and the new maximum
521            self.max_finalized_queue_height = queued_height.0 as f64;
522        }
523
524        metrics::gauge!("state.checkpoint.queued.max.height").set(self.max_finalized_queue_height);
525        metrics::gauge!("state.checkpoint.queued.block.count")
526            .set(self.finalized_state_queued_blocks.len() as f64);
527
528        rsp_rx
529    }
530
531    /// Finds finalized state queue blocks to be committed to the state in order,
532    /// removes them from the queue, and sends them to the block commit task.
533    ///
534    /// After queueing a finalized block, this method checks whether the newly
535    /// queued block (and any of its descendants) can be committed to the state.
536    ///
537    /// Returns an error if the block commit channel has been closed.
538    pub fn drain_finalized_queue_and_commit(&mut self) {
539        use tokio::sync::mpsc::error::{SendError, TryRecvError};
540
541        // # Correctness & Performance
542        //
543        // This method must not block, access the database, or perform CPU-intensive tasks,
544        // because it is called directly from the tokio executor's Future threads.
545
546        // If a block failed, we need to start again from a valid tip.
547        match self.invalid_block_write_reset_receiver.try_recv() {
548            Ok(reset_tip_hash) => self.finalized_block_write_last_sent_hash = reset_tip_hash,
549            Err(TryRecvError::Disconnected) => {
550                info!("Block commit task closed the block reset channel. Is Zebra shutting down?");
551                return;
552            }
553            // There are no errors, so we can just use the last block hash we sent
554            Err(TryRecvError::Empty) => {}
555        }
556
557        while let Some(queued_block) = self
558            .finalized_state_queued_blocks
559            .remove(&self.finalized_block_write_last_sent_hash)
560        {
561            let last_sent_finalized_block_height = queued_block.0.height;
562
563            self.finalized_block_write_last_sent_hash = queued_block.0.hash;
564
565            // If we've finished sending finalized blocks, ignore any repeated blocks.
566            // (Blocks can be repeated after a syncer reset.)
567            if let Some(finalized_block_write_sender) = &self.block_write_sender.finalized {
568                let send_result = finalized_block_write_sender.send(queued_block);
569
570                // If the receiver is closed, we can't send any more blocks.
571                if let Err(SendError(queued)) = send_result {
572                    // If Zebra is shutting down, drop blocks and return an error.
573                    Self::send_checkpoint_verified_block_error(
574                        queued,
575                        "block commit task exited. Is Zebra shutting down?",
576                    );
577
578                    self.clear_finalized_block_queue(
579                        "block commit task exited. Is Zebra shutting down?",
580                    );
581                } else {
582                    metrics::gauge!("state.checkpoint.sent.block.height")
583                        .set(last_sent_finalized_block_height.0 as f64);
584                };
585            }
586        }
587    }
588
589    /// Drops all finalized state queue blocks, and sends an error on their result channels.
590    fn clear_finalized_block_queue(&mut self, error: impl Into<BoxError> + Clone) {
591        for (_hash, queued) in self.finalized_state_queued_blocks.drain() {
592            Self::send_checkpoint_verified_block_error(queued, error.clone());
593        }
594    }
595
596    /// Send an error on a `QueuedCheckpointVerified` block's result channel, and drop the block
597    fn send_checkpoint_verified_block_error(
598        queued: QueuedCheckpointVerified,
599        error: impl Into<BoxError>,
600    ) {
601        let (finalized, rsp_tx) = queued;
602
603        // The block sender might have already given up on this block,
604        // so ignore any channel send errors.
605        let _ = rsp_tx.send(Err(error.into()));
606        std::mem::drop(finalized);
607    }
608
609    /// Drops all non-finalized state queue blocks, and sends an error on their result channels.
610    fn clear_non_finalized_block_queue(&mut self, error: CommitSemanticallyVerifiedError) {
611        for (_hash, queued) in self.non_finalized_state_queued_blocks.drain() {
612            Self::send_semantically_verified_block_error(queued, error.clone());
613        }
614    }
615
616    /// Send an error on a `QueuedSemanticallyVerified` block's result channel, and drop the block
617    fn send_semantically_verified_block_error(
618        queued: QueuedSemanticallyVerified,
619        error: CommitSemanticallyVerifiedError,
620    ) {
621        let (finalized, rsp_tx) = queued;
622
623        // The block sender might have already given up on this block,
624        // so ignore any channel send errors.
625        let _ = rsp_tx.send(Err(error));
626        std::mem::drop(finalized);
627    }
628
629    /// Queue a semantically verified block for contextual verification and check if any queued
630    /// blocks are ready to be verified and committed to the state.
631    ///
632    /// This function encodes the logic for [committing non-finalized blocks][1]
633    /// in RFC0005.
634    ///
635    /// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks
636    #[instrument(level = "debug", skip(self, semantically_verified))]
637    fn queue_and_commit_to_non_finalized_state(
638        &mut self,
639        semantically_verified: SemanticallyVerifiedBlock,
640    ) -> oneshot::Receiver<Result<block::Hash, CommitSemanticallyVerifiedError>> {
641        tracing::debug!(block = %semantically_verified.block, "queueing block for contextual verification");
642        let parent_hash = semantically_verified.block.header.previous_block_hash;
643
644        if self
645            .non_finalized_block_write_sent_hashes
646            .contains(&semantically_verified.hash)
647        {
648            let (rsp_tx, rsp_rx) = oneshot::channel();
649            let _ = rsp_tx.send(Err(QueueAndCommitError::new_duplicate(
650                semantically_verified.hash,
651            )
652            .into()));
653            return rsp_rx;
654        }
655
656        if self
657            .read_service
658            .db
659            .contains_height(semantically_verified.height)
660        {
661            let (rsp_tx, rsp_rx) = oneshot::channel();
662            let _ = rsp_tx.send(Err(QueueAndCommitError::new_already_finalized(
663                semantically_verified.height,
664            )
665            .into()));
666            return rsp_rx;
667        }
668
669        // [`Request::CommitSemanticallyVerifiedBlock`] contract: a request to commit a block which
670        // has been queued but not yet committed to the state fails the older request and replaces
671        // it with the newer request.
672        let rsp_rx = if let Some((_, old_rsp_tx)) = self
673            .non_finalized_state_queued_blocks
674            .get_mut(&semantically_verified.hash)
675        {
676            tracing::debug!("replacing older queued request with new request");
677            let (mut rsp_tx, rsp_rx) = oneshot::channel();
678            std::mem::swap(old_rsp_tx, &mut rsp_tx);
679            let _ = rsp_tx.send(Err(QueueAndCommitError::new_replaced(
680                semantically_verified.hash,
681            )
682            .into()));
683            rsp_rx
684        } else {
685            let (rsp_tx, rsp_rx) = oneshot::channel();
686            self.non_finalized_state_queued_blocks
687                .queue((semantically_verified, rsp_tx));
688            rsp_rx
689        };
690
691        // We've finished sending checkpoint verified blocks when:
692        // - we've sent the verified block for the last checkpoint, and
693        // - it has been successfully written to disk.
694        //
695        // We detect the last checkpoint by looking for non-finalized blocks
696        // that are a child of the last block we sent.
697        //
698        // TODO: configure the state with the last checkpoint hash instead?
699        if self.block_write_sender.finalized.is_some()
700            && self
701                .non_finalized_state_queued_blocks
702                .has_queued_children(self.finalized_block_write_last_sent_hash)
703            && self.read_service.db.finalized_tip_hash()
704                == self.finalized_block_write_last_sent_hash
705        {
706            // Tell the block write task to stop committing checkpoint verified blocks to the finalized state,
707            // and move on to committing semantically verified blocks to the non-finalized state.
708            std::mem::drop(self.block_write_sender.finalized.take());
709            // Remove any checkpoint-verified block hashes from `non_finalized_block_write_sent_hashes`.
710            self.non_finalized_block_write_sent_hashes = SentHashes::default();
711            // Mark `SentHashes` as usable by the `can_fork_chain_at()` method.
712            self.non_finalized_block_write_sent_hashes
713                .can_fork_chain_at_hashes = true;
714            // Send blocks from non-finalized queue
715            self.send_ready_non_finalized_queued(self.finalized_block_write_last_sent_hash);
716            // We've finished committing checkpoint verified blocks to finalized state, so drop any repeated queued blocks.
717            self.clear_finalized_block_queue(
718                "already finished committing checkpoint verified blocks: dropped duplicate block, \
719                 block is already committed to the state",
720            );
721        } else if !self.can_fork_chain_at(&parent_hash) {
722            tracing::trace!("unready to verify, returning early");
723        } else if self.block_write_sender.finalized.is_none() {
724            // Wait until block commit task is ready to write non-finalized blocks before dequeuing them
725            self.send_ready_non_finalized_queued(parent_hash);
726
727            let finalized_tip_height = self.read_service.db.finalized_tip_height().expect(
728                "Finalized state must have at least one block before committing non-finalized state",
729            );
730
731            self.non_finalized_state_queued_blocks
732                .prune_by_height(finalized_tip_height);
733
734            self.non_finalized_block_write_sent_hashes
735                .prune_by_height(finalized_tip_height);
736        }
737
738        rsp_rx
739    }
740
741    /// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks.
742    fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
743        self.non_finalized_block_write_sent_hashes
744            .can_fork_chain_at(hash)
745            || &self.read_service.db.finalized_tip_hash() == hash
746    }
747
748    /// Returns `true` if `queued_height` is near the final checkpoint.
749    ///
750    /// The semantic block verifier needs access to UTXOs from checkpoint verified blocks
751    /// near the final checkpoint, so that it can verify blocks that spend those UTXOs.
752    ///
753    /// If it doesn't have the required UTXOs, some blocks will time out,
754    /// but succeed after a syncer restart.
755    fn is_close_to_final_checkpoint(&self, queued_height: block::Height) -> bool {
756        queued_height >= self.full_verifier_utxo_lookahead
757    }
758
759    /// Sends all queued blocks whose parents have recently arrived starting from `new_parent`
760    /// in breadth-first ordering to the block write task which will attempt to validate and commit them
761    #[tracing::instrument(level = "debug", skip(self, new_parent))]
762    fn send_ready_non_finalized_queued(&mut self, new_parent: block::Hash) {
763        use tokio::sync::mpsc::error::SendError;
764        if let Some(non_finalized_block_write_sender) = &self.block_write_sender.non_finalized {
765            let mut new_parents: Vec<block::Hash> = vec![new_parent];
766
767            while let Some(parent_hash) = new_parents.pop() {
768                let queued_children = self
769                    .non_finalized_state_queued_blocks
770                    .dequeue_children(parent_hash);
771
772                for queued_child in queued_children {
773                    let (SemanticallyVerifiedBlock { hash, .. }, _) = queued_child;
774
775                    self.non_finalized_block_write_sent_hashes
776                        .add(&queued_child.0);
777                    let send_result = non_finalized_block_write_sender.send(queued_child.into());
778
779                    if let Err(SendError(NonFinalizedWriteMessage::Commit(queued))) = send_result {
780                        // If Zebra is shutting down, drop blocks and return an error.
781                        Self::send_semantically_verified_block_error(
782                            queued,
783                            CommitSemanticallyVerifiedError::WriteTaskExited,
784                        );
785
786                        self.clear_non_finalized_block_queue(
787                            CommitSemanticallyVerifiedError::WriteTaskExited,
788                        );
789
790                        return;
791                    };
792
793                    new_parents.push(hash);
794                }
795            }
796
797            self.non_finalized_block_write_sent_hashes.finish_batch();
798        };
799    }
800
801    /// Return the tip of the current best chain.
802    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
803        self.read_service.best_tip()
804    }
805
806    fn send_invalidate_block(
807        &self,
808        hash: block::Hash,
809    ) -> oneshot::Receiver<Result<block::Hash, InvalidateError>> {
810        let (rsp_tx, rsp_rx) = oneshot::channel();
811
812        let Some(sender) = &self.block_write_sender.non_finalized else {
813            let _ = rsp_tx.send(Err(InvalidateError::ProcessingCheckpointedBlocks));
814            return rsp_rx;
815        };
816
817        if let Err(tokio::sync::mpsc::error::SendError(error)) =
818            sender.send(NonFinalizedWriteMessage::Invalidate { hash, rsp_tx })
819        {
820            let NonFinalizedWriteMessage::Invalidate { rsp_tx, .. } = error else {
821                unreachable!("should return the same Invalidate message could not be sent");
822            };
823
824            let _ = rsp_tx.send(Err(InvalidateError::SendInvalidateRequestFailed));
825        }
826
827        rsp_rx
828    }
829
830    fn send_reconsider_block(
831        &self,
832        hash: block::Hash,
833    ) -> oneshot::Receiver<Result<Vec<block::Hash>, ReconsiderError>> {
834        let (rsp_tx, rsp_rx) = oneshot::channel();
835
836        let Some(sender) = &self.block_write_sender.non_finalized else {
837            let _ = rsp_tx.send(Err(ReconsiderError::CheckpointCommitInProgress));
838            return rsp_rx;
839        };
840
841        if let Err(tokio::sync::mpsc::error::SendError(error)) =
842            sender.send(NonFinalizedWriteMessage::Reconsider { hash, rsp_tx })
843        {
844            let NonFinalizedWriteMessage::Reconsider { rsp_tx, .. } = error else {
845                unreachable!("should return the same Reconsider message could not be sent");
846            };
847
848            let _ = rsp_tx.send(Err(ReconsiderError::ReconsiderSendFailed));
849        }
850
851        rsp_rx
852    }
853
854    /// Assert some assumptions about the semantically verified `block` before it is queued.
855    fn assert_block_can_be_validated(&self, block: &SemanticallyVerifiedBlock) {
856        // required by `Request::CommitSemanticallyVerifiedBlock` call
857        assert!(
858            block.height > self.network.mandatory_checkpoint_height(),
859            "invalid semantically verified block height: the canopy checkpoint is mandatory, pre-canopy \
860            blocks, and the canopy activation block, must be committed to the state as finalized \
861            blocks"
862        );
863    }
864}
865
866impl ReadStateService {
867    /// Creates a new read-only state service, using the provided finalized state and
868    /// block write task handle.
869    ///
870    /// Returns the newly created service,
871    /// and a watch channel for updating the shared recent non-finalized chain.
872    pub(crate) fn new(
873        finalized_state: &FinalizedState,
874        block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
875        non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
876    ) -> Self {
877        let read_service = Self {
878            network: finalized_state.network(),
879            db: finalized_state.db.clone(),
880            non_finalized_state_receiver,
881            block_write_task,
882        };
883
884        tracing::debug!("created new read-only state service");
885
886        read_service
887    }
888
889    /// Return the tip of the current best chain.
890    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
891        read::best_tip(&self.latest_non_finalized_state(), &self.db)
892    }
893
894    /// Gets a clone of the latest non-finalized state from the `non_finalized_state_receiver`
895    fn latest_non_finalized_state(&self) -> NonFinalizedState {
896        self.non_finalized_state_receiver.cloned_watch_data()
897    }
898
899    /// Gets a clone of the latest, best non-finalized chain from the `non_finalized_state_receiver`
900    #[allow(dead_code)]
901    fn latest_best_chain(&self) -> Option<Arc<Chain>> {
902        self.latest_non_finalized_state().best_chain().cloned()
903    }
904
905    /// Test-only access to the inner database.
906    /// Can be used to modify the database without doing any consensus checks.
907    #[cfg(any(test, feature = "proptest-impl"))]
908    pub fn db(&self) -> &ZebraDb {
909        &self.db
910    }
911
912    /// Logs rocksdb metrics using the read only state service.
913    pub fn log_db_metrics(&self) {
914        self.db.print_db_metrics();
915    }
916}
917
918impl Service<Request> for StateService {
919    type Response = Response;
920    type Error = BoxError;
921    type Future =
922        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
923
924    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
925        // Check for panics in the block write task
926        let poll = self.read_service.poll_ready(cx);
927
928        // Prune outdated UTXO requests
929        let now = Instant::now();
930
931        if self.last_prune + Self::PRUNE_INTERVAL < now {
932            let tip = self.best_tip();
933            let old_len = self.pending_utxos.len();
934
935            self.pending_utxos.prune();
936            self.last_prune = now;
937
938            let new_len = self.pending_utxos.len();
939            let prune_count = old_len
940                .checked_sub(new_len)
941                .expect("prune does not add any utxo requests");
942            if prune_count > 0 {
943                tracing::debug!(
944                    ?old_len,
945                    ?new_len,
946                    ?prune_count,
947                    ?tip,
948                    "pruned utxo requests"
949                );
950            } else {
951                tracing::debug!(len = ?old_len, ?tip, "no utxo requests needed pruning");
952            }
953        }
954
955        poll
956    }
957
958    #[instrument(name = "state", skip(self, req))]
959    fn call(&mut self, req: Request) -> Self::Future {
960        req.count_metric();
961        let timer = CodeTimer::start();
962        let span = Span::current();
963
964        match req {
965            // Uses non_finalized_state_queued_blocks and pending_utxos in the StateService
966            // Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb.
967            //
968            // The expected error type for this request is `CommitSemanticallyVerifiedError`.
969            Request::CommitSemanticallyVerifiedBlock(semantically_verified) => {
970                self.assert_block_can_be_validated(&semantically_verified);
971
972                self.pending_utxos
973                    .check_against_ordered(&semantically_verified.new_outputs);
974
975                // # Performance
976                //
977                // Allow other async tasks to make progress while blocks are being verified
978                // and written to disk. But wait for the blocks to finish committing,
979                // so that `StateService` multi-block queries always observe a consistent state.
980                //
981                // Since each block is spawned into its own task,
982                // there shouldn't be any other code running in the same task,
983                // so we don't need to worry about blocking it:
984                // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
985
986                let rsp_rx = tokio::task::block_in_place(move || {
987                    span.in_scope(|| {
988                        self.queue_and_commit_to_non_finalized_state(semantically_verified)
989                    })
990                });
991
992                // TODO:
993                //   - check for panics in the block write task here,
994                //     as well as in poll_ready()
995
996                // The work is all done, the future just waits on a channel for the result
997                timer.finish(module_path!(), line!(), "CommitSemanticallyVerifiedBlock");
998
999                // Await the channel response, flatten the result, map receive errors to
1000                // `CommitSemanticallyVerifiedError::WriteTaskExited`.
1001                // Then flatten the nested Result and convert any errors to a BoxError.
1002                let span = Span::current();
1003                async move {
1004                    rsp_rx
1005                        .await
1006                        .map_err(|_recv_error| CommitSemanticallyVerifiedError::WriteTaskExited)
1007                        .flatten()
1008                        .map_err(BoxError::from)
1009                        .map(Response::Committed)
1010                }
1011                .instrument(span)
1012                .boxed()
1013            }
1014
1015            // Uses finalized_state_queued_blocks and pending_utxos in the StateService.
1016            // Accesses shared writeable state in the StateService.
1017            Request::CommitCheckpointVerifiedBlock(finalized) => {
1018                // # Consensus
1019                //
1020                // A semantic block verification could have called AwaitUtxo
1021                // before this checkpoint verified block arrived in the state.
1022                // So we need to check for pending UTXO requests sent by running
1023                // semantic block verifications.
1024                //
1025                // This check is redundant for most checkpoint verified blocks,
1026                // because semantic verification can only succeed near the final
1027                // checkpoint, when all the UTXOs are available for the verifying block.
1028                //
1029                // (Checkpoint block UTXOs are verified using block hash checkpoints
1030                // and transaction merkle tree block header commitments.)
1031                self.pending_utxos
1032                    .check_against_ordered(&finalized.new_outputs);
1033
1034                // # Performance
1035                //
1036                // This method doesn't block, access the database, or perform CPU-intensive tasks,
1037                // so we can run it directly in the tokio executor's Future threads.
1038                let rsp_rx = self.queue_and_commit_to_finalized_state(finalized);
1039
1040                // TODO:
1041                //   - check for panics in the block write task here,
1042                //     as well as in poll_ready()
1043
1044                // The work is all done, the future just waits on a channel for the result
1045                timer.finish(module_path!(), line!(), "CommitCheckpointVerifiedBlock");
1046
1047                async move {
1048                    rsp_rx
1049                        .await
1050                        .map_err(|_recv_error| {
1051                            BoxError::from("block was dropped from the queue of finalized blocks")
1052                        })
1053                        // TODO: replace with Result::flatten once it stabilises
1054                        // https://github.com/rust-lang/rust/issues/70142
1055                        .and_then(convert::identity)
1056                        .map(Response::Committed)
1057                }
1058                .instrument(span)
1059                .boxed()
1060            }
1061
1062            // Uses pending_utxos and non_finalized_state_queued_blocks in the StateService.
1063            // If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService.
1064            Request::AwaitUtxo(outpoint) => {
1065                // Prepare the AwaitUtxo future from PendingUxtos.
1066                let response_fut = self.pending_utxos.queue(outpoint);
1067                // Only instrument `response_fut`, the ReadStateService already
1068                // instruments its requests with the same span.
1069
1070                let response_fut = response_fut.instrument(span).boxed();
1071
1072                // Check the non-finalized block queue outside the returned future,
1073                // so we can access mutable state fields.
1074                if let Some(utxo) = self.non_finalized_state_queued_blocks.utxo(&outpoint) {
1075                    self.pending_utxos.respond(&outpoint, utxo);
1076
1077                    // We're finished, the returned future gets the UTXO from the respond() channel.
1078                    timer.finish(module_path!(), line!(), "AwaitUtxo/queued-non-finalized");
1079
1080                    return response_fut;
1081                }
1082
1083                // Check the sent non-finalized blocks
1084                if let Some(utxo) = self.non_finalized_block_write_sent_hashes.utxo(&outpoint) {
1085                    self.pending_utxos.respond(&outpoint, utxo);
1086
1087                    // We're finished, the returned future gets the UTXO from the respond() channel.
1088                    timer.finish(module_path!(), line!(), "AwaitUtxo/sent-non-finalized");
1089
1090                    return response_fut;
1091                }
1092
1093                // We ignore any UTXOs in FinalizedState.finalized_state_queued_blocks,
1094                // because it is only used during checkpoint verification.
1095                //
1096                // This creates a rare race condition, but it doesn't seem to happen much in practice.
1097                // See #5126 for details.
1098
1099                // Manually send a request to the ReadStateService,
1100                // to get UTXOs from any non-finalized chain or the finalized chain.
1101                let read_service = self.read_service.clone();
1102
1103                // Run the request in an async block, so we can await the response.
1104                async move {
1105                    let req = ReadRequest::AnyChainUtxo(outpoint);
1106
1107                    let rsp = read_service.oneshot(req).await?;
1108
1109                    // Optional TODO:
1110                    //  - make pending_utxos.respond() async using a channel,
1111                    //    so we can respond to all waiting requests here
1112                    //
1113                    // This change is not required for correctness, because:
1114                    // - any waiting requests should have returned when the block was sent to the state
1115                    // - otherwise, the request returns immediately if:
1116                    //   - the block is in the non-finalized queue, or
1117                    //   - the block is in any non-finalized chain or the finalized state
1118                    //
1119                    // And if the block is in the finalized queue,
1120                    // that's rare enough that a retry is ok.
1121                    if let ReadResponse::AnyChainUtxo(Some(utxo)) = rsp {
1122                        // We got a UTXO, so we replace the response future with the result own.
1123                        timer.finish(module_path!(), line!(), "AwaitUtxo/any-chain");
1124
1125                        return Ok(Response::Utxo(utxo));
1126                    }
1127
1128                    // We're finished, but the returned future is waiting on the respond() channel.
1129                    timer.finish(module_path!(), line!(), "AwaitUtxo/waiting");
1130
1131                    response_fut.await
1132                }
1133                .boxed()
1134            }
1135
1136            // Used by sync, inbound, and block verifier to check if a block is already in the state
1137            // before downloading or validating it.
1138            Request::KnownBlock(hash) => {
1139                let timer = CodeTimer::start();
1140
1141                let read_service = self.read_service.clone();
1142
1143                async move {
1144                    let response = read::non_finalized_state_contains_block_hash(
1145                        &read_service.latest_non_finalized_state(),
1146                        hash,
1147                    )
1148                    .or_else(|| read::finalized_state_contains_block_hash(&read_service.db, hash));
1149
1150                    // The work is done in the future.
1151                    timer.finish(module_path!(), line!(), "Request::KnownBlock");
1152
1153                    Ok(Response::KnownBlock(response))
1154                }
1155                .boxed()
1156            }
1157
1158            // The expected error type for this request is `InvalidateError`
1159            Request::InvalidateBlock(block_hash) => {
1160                let rsp_rx = tokio::task::block_in_place(move || {
1161                    span.in_scope(|| self.send_invalidate_block(block_hash))
1162                });
1163
1164                // Await the channel response, flatten the result, map receive errors to
1165                // `InvalidateError::InvalidateRequestDropped`.
1166                // Then flatten the nested Result and convert any errors to a BoxError.
1167                let span = Span::current();
1168                async move {
1169                    rsp_rx
1170                        .await
1171                        .map_err(|_recv_error| InvalidateError::InvalidateRequestDropped)
1172                        .flatten()
1173                        .map_err(BoxError::from)
1174                        .map(Response::Invalidated)
1175                }
1176                .instrument(span)
1177                .boxed()
1178            }
1179
1180            // The expected error type for this request is `ReconsiderError`
1181            Request::ReconsiderBlock(block_hash) => {
1182                let rsp_rx = tokio::task::block_in_place(move || {
1183                    span.in_scope(|| self.send_reconsider_block(block_hash))
1184                });
1185
1186                // Await the channel response, flatten the result, map receive errors to
1187                // `ReconsiderError::ReconsiderResponseDropped`.
1188                // Then flatten the nested Result and convert any errors to a BoxError.
1189                let span = Span::current();
1190                async move {
1191                    rsp_rx
1192                        .await
1193                        .map_err(|_recv_error| ReconsiderError::ReconsiderResponseDropped)
1194                        .flatten()
1195                        .map_err(BoxError::from)
1196                        .map(Response::Reconsidered)
1197                }
1198                .instrument(span)
1199                .boxed()
1200            }
1201
1202            // Runs concurrently using the ReadStateService
1203            Request::Tip
1204            | Request::Depth(_)
1205            | Request::BestChainNextMedianTimePast
1206            | Request::BestChainBlockHash(_)
1207            | Request::BlockLocator
1208            | Request::Transaction(_)
1209            | Request::AnyChainTransaction(_)
1210            | Request::UnspentBestChainUtxo(_)
1211            | Request::Block(_)
1212            | Request::BlockAndSize(_)
1213            | Request::BlockHeader(_)
1214            | Request::FindBlockHashes { .. }
1215            | Request::FindBlockHeaders { .. }
1216            | Request::CheckBestChainTipNullifiersAndAnchors(_) => {
1217                // Redirect the request to the concurrent ReadStateService
1218                let read_service = self.read_service.clone();
1219
1220                async move {
1221                    let req = req
1222                        .try_into()
1223                        .expect("ReadRequest conversion should not fail");
1224
1225                    let rsp = read_service.oneshot(req).await?;
1226                    let rsp = rsp.try_into().expect("Response conversion should not fail");
1227
1228                    Ok(rsp)
1229                }
1230                .boxed()
1231            }
1232
1233            Request::CheckBlockProposalValidity(_) => {
1234                // Redirect the request to the concurrent ReadStateService
1235                let read_service = self.read_service.clone();
1236
1237                async move {
1238                    let req = req
1239                        .try_into()
1240                        .expect("ReadRequest conversion should not fail");
1241
1242                    let rsp = read_service.oneshot(req).await?;
1243                    let rsp = rsp.try_into().expect("Response conversion should not fail");
1244
1245                    Ok(rsp)
1246                }
1247                .boxed()
1248            }
1249        }
1250    }
1251}
1252
1253impl Service<ReadRequest> for ReadStateService {
1254    type Response = ReadResponse;
1255    type Error = BoxError;
1256    type Future =
1257        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
1258
1259    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1260        // Check for panics in the block write task
1261        //
1262        // TODO: move into a check_for_panics() method
1263        let block_write_task = self.block_write_task.take();
1264
1265        if let Some(block_write_task) = block_write_task {
1266            if block_write_task.is_finished() {
1267                if let Some(block_write_task) = Arc::into_inner(block_write_task) {
1268                    // We are the last state with a reference to this task, so we can propagate any panics
1269                    if let Err(thread_panic) = block_write_task.join() {
1270                        std::panic::resume_unwind(thread_panic);
1271                    }
1272                }
1273            } else {
1274                // It hasn't finished, so we need to put it back
1275                self.block_write_task = Some(block_write_task);
1276            }
1277        }
1278
1279        self.db.check_for_panics();
1280
1281        Poll::Ready(Ok(()))
1282    }
1283
1284    #[instrument(name = "read_state", skip(self, req))]
1285    fn call(&mut self, req: ReadRequest) -> Self::Future {
1286        req.count_metric();
1287        let timer = CodeTimer::start();
1288        let span = Span::current();
1289
1290        match req {
1291            // Used by the `getblockchaininfo` RPC.
1292            ReadRequest::UsageInfo => {
1293                let db = self.db.clone();
1294
1295                tokio::task::spawn_blocking(move || {
1296                    span.in_scope(move || {
1297                        // The work is done in the future.
1298
1299                        let db_size = db.size();
1300
1301                        timer.finish(module_path!(), line!(), "ReadRequest::UsageInfo");
1302
1303                        Ok(ReadResponse::UsageInfo(db_size))
1304                    })
1305                })
1306                .wait_for_panics()
1307            }
1308
1309            // Used by the StateService.
1310            ReadRequest::Tip => {
1311                let state = self.clone();
1312
1313                tokio::task::spawn_blocking(move || {
1314                    span.in_scope(move || {
1315                        let tip = state.non_finalized_state_receiver.with_watch_data(
1316                            |non_finalized_state| {
1317                                read::tip(non_finalized_state.best_chain(), &state.db)
1318                            },
1319                        );
1320
1321                        // The work is done in the future.
1322                        timer.finish(module_path!(), line!(), "ReadRequest::Tip");
1323
1324                        Ok(ReadResponse::Tip(tip))
1325                    })
1326                })
1327                .wait_for_panics()
1328            }
1329
1330            // Used by `getblockchaininfo` RPC method.
1331            ReadRequest::TipPoolValues => {
1332                let state = self.clone();
1333
1334                tokio::task::spawn_blocking(move || {
1335                    span.in_scope(move || {
1336                        let tip_with_value_balance = state
1337                            .non_finalized_state_receiver
1338                            .with_watch_data(|non_finalized_state| {
1339                                read::tip_with_value_balance(
1340                                    non_finalized_state.best_chain(),
1341                                    &state.db,
1342                                )
1343                            });
1344
1345                        // The work is done in the future.
1346                        // TODO: Do this in the Drop impl with the variant name?
1347                        timer.finish(module_path!(), line!(), "ReadRequest::TipPoolValues");
1348
1349                        let (tip_height, tip_hash, value_balance) = tip_with_value_balance?
1350                            .ok_or(BoxError::from("no chain tip available yet"))?;
1351
1352                        Ok(ReadResponse::TipPoolValues {
1353                            tip_height,
1354                            tip_hash,
1355                            value_balance,
1356                        })
1357                    })
1358                })
1359                .wait_for_panics()
1360            }
1361
1362            // Used by getblock
1363            ReadRequest::BlockInfo(hash_or_height) => {
1364                let state = self.clone();
1365
1366                tokio::task::spawn_blocking(move || {
1367                    span.in_scope(move || {
1368                        let value_balance = state.non_finalized_state_receiver.with_watch_data(
1369                            |non_finalized_state| {
1370                                read::block_info(
1371                                    non_finalized_state.best_chain(),
1372                                    &state.db,
1373                                    hash_or_height,
1374                                )
1375                            },
1376                        );
1377
1378                        // The work is done in the future.
1379                        // TODO: Do this in the Drop impl with the variant name?
1380                        timer.finish(module_path!(), line!(), "ReadRequest::BlockInfo");
1381
1382                        Ok(ReadResponse::BlockInfo(value_balance))
1383                    })
1384                })
1385                .wait_for_panics()
1386            }
1387
1388            // Used by the StateService.
1389            ReadRequest::Depth(hash) => {
1390                let state = self.clone();
1391
1392                tokio::task::spawn_blocking(move || {
1393                    span.in_scope(move || {
1394                        let depth = state.non_finalized_state_receiver.with_watch_data(
1395                            |non_finalized_state| {
1396                                read::depth(non_finalized_state.best_chain(), &state.db, hash)
1397                            },
1398                        );
1399
1400                        // The work is done in the future.
1401                        timer.finish(module_path!(), line!(), "ReadRequest::Depth");
1402
1403                        Ok(ReadResponse::Depth(depth))
1404                    })
1405                })
1406                .wait_for_panics()
1407            }
1408
1409            // Used by the StateService.
1410            ReadRequest::BestChainNextMedianTimePast => {
1411                let state = self.clone();
1412
1413                tokio::task::spawn_blocking(move || {
1414                    span.in_scope(move || {
1415                        let non_finalized_state = state.latest_non_finalized_state();
1416                        let median_time_past =
1417                            read::next_median_time_past(&non_finalized_state, &state.db);
1418
1419                        // The work is done in the future.
1420                        timer.finish(
1421                            module_path!(),
1422                            line!(),
1423                            "ReadRequest::BestChainNextMedianTimePast",
1424                        );
1425
1426                        Ok(ReadResponse::BestChainNextMedianTimePast(median_time_past?))
1427                    })
1428                })
1429                .wait_for_panics()
1430            }
1431
1432            // Used by the get_block (raw) RPC and the StateService.
1433            ReadRequest::Block(hash_or_height) => {
1434                let state = self.clone();
1435
1436                tokio::task::spawn_blocking(move || {
1437                    span.in_scope(move || {
1438                        let block = state.non_finalized_state_receiver.with_watch_data(
1439                            |non_finalized_state| {
1440                                read::block(
1441                                    non_finalized_state.best_chain(),
1442                                    &state.db,
1443                                    hash_or_height,
1444                                )
1445                            },
1446                        );
1447
1448                        // The work is done in the future.
1449                        timer.finish(module_path!(), line!(), "ReadRequest::Block");
1450
1451                        Ok(ReadResponse::Block(block))
1452                    })
1453                })
1454                .wait_for_panics()
1455            }
1456
1457            // Used by the get_block (raw) RPC and the StateService.
1458            ReadRequest::BlockAndSize(hash_or_height) => {
1459                let state = self.clone();
1460
1461                tokio::task::spawn_blocking(move || {
1462                    span.in_scope(move || {
1463                        let block_and_size = state.non_finalized_state_receiver.with_watch_data(
1464                            |non_finalized_state| {
1465                                read::block_and_size(
1466                                    non_finalized_state.best_chain(),
1467                                    &state.db,
1468                                    hash_or_height,
1469                                )
1470                            },
1471                        );
1472
1473                        // The work is done in the future.
1474                        timer.finish(module_path!(), line!(), "ReadRequest::BlockAndSize");
1475
1476                        Ok(ReadResponse::BlockAndSize(block_and_size))
1477                    })
1478                })
1479                .wait_for_panics()
1480            }
1481
1482            // Used by the get_block (verbose) RPC and the StateService.
1483            ReadRequest::BlockHeader(hash_or_height) => {
1484                let state = self.clone();
1485
1486                tokio::task::spawn_blocking(move || {
1487                    span.in_scope(move || {
1488                        let best_chain = state.latest_best_chain();
1489
1490                        let height = hash_or_height
1491                            .height_or_else(|hash| {
1492                                read::find::height_by_hash(best_chain.clone(), &state.db, hash)
1493                            })
1494                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1495
1496                        let hash = hash_or_height
1497                            .hash_or_else(|height| {
1498                                read::find::hash_by_height(best_chain.clone(), &state.db, height)
1499                            })
1500                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1501
1502                        let next_height = height.next()?;
1503                        let next_block_hash =
1504                            read::find::hash_by_height(best_chain.clone(), &state.db, next_height);
1505
1506                        let header = read::block_header(best_chain, &state.db, height.into())
1507                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1508
1509                        // The work is done in the future.
1510                        timer.finish(module_path!(), line!(), "ReadRequest::Block");
1511
1512                        Ok(ReadResponse::BlockHeader {
1513                            header,
1514                            hash,
1515                            height,
1516                            next_block_hash,
1517                        })
1518                    })
1519                })
1520                .wait_for_panics()
1521            }
1522
1523            // For the get_raw_transaction RPC and the StateService.
1524            ReadRequest::Transaction(hash) => {
1525                let state = self.clone();
1526
1527                tokio::task::spawn_blocking(move || {
1528                    span.in_scope(move || {
1529                        let response =
1530                            read::mined_transaction(state.latest_best_chain(), &state.db, hash);
1531
1532                        // The work is done in the future.
1533                        timer.finish(module_path!(), line!(), "ReadRequest::Transaction");
1534
1535                        Ok(ReadResponse::Transaction(response))
1536                    })
1537                })
1538                .wait_for_panics()
1539            }
1540
1541            ReadRequest::AnyChainTransaction(hash) => {
1542                let state = self.clone();
1543
1544                tokio::task::spawn_blocking(move || {
1545                    span.in_scope(move || {
1546                        let tx = state.non_finalized_state_receiver.with_watch_data(
1547                            |non_finalized_state| {
1548                                read::any_transaction(
1549                                    non_finalized_state.chain_iter(),
1550                                    &state.db,
1551                                    hash,
1552                                )
1553                            },
1554                        );
1555
1556                        // The work is done in the future.
1557                        timer.finish(module_path!(), line!(), "ReadRequest::AnyChainTransaction");
1558
1559                        Ok(ReadResponse::AnyChainTransaction(tx))
1560                    })
1561                })
1562                .wait_for_panics()
1563            }
1564
1565            // Used by the getblock (verbose) RPC.
1566            ReadRequest::TransactionIdsForBlock(hash_or_height) => {
1567                let state = self.clone();
1568
1569                tokio::task::spawn_blocking(move || {
1570                    span.in_scope(move || {
1571                        let transaction_ids = state.non_finalized_state_receiver.with_watch_data(
1572                            |non_finalized_state| {
1573                                read::transaction_hashes_for_block(
1574                                    non_finalized_state.best_chain(),
1575                                    &state.db,
1576                                    hash_or_height,
1577                                )
1578                            },
1579                        );
1580
1581                        // The work is done in the future.
1582                        timer.finish(
1583                            module_path!(),
1584                            line!(),
1585                            "ReadRequest::TransactionIdsForBlock",
1586                        );
1587
1588                        Ok(ReadResponse::TransactionIdsForBlock(transaction_ids))
1589                    })
1590                })
1591                .wait_for_panics()
1592            }
1593
1594            ReadRequest::AnyChainTransactionIdsForBlock(hash_or_height) => {
1595                let state = self.clone();
1596
1597                tokio::task::spawn_blocking(move || {
1598                    span.in_scope(move || {
1599                        let transaction_ids = state.non_finalized_state_receiver.with_watch_data(
1600                            |non_finalized_state| {
1601                                read::transaction_hashes_for_any_block(
1602                                    non_finalized_state.chain_iter(),
1603                                    &state.db,
1604                                    hash_or_height,
1605                                )
1606                            },
1607                        );
1608
1609                        // The work is done in the future.
1610                        timer.finish(
1611                            module_path!(),
1612                            line!(),
1613                            "ReadRequest::AnyChainTransactionIdsForBlock",
1614                        );
1615
1616                        Ok(ReadResponse::AnyChainTransactionIdsForBlock(
1617                            transaction_ids,
1618                        ))
1619                    })
1620                })
1621                .wait_for_panics()
1622            }
1623
1624            #[cfg(feature = "indexer")]
1625            ReadRequest::SpendingTransactionId(spend) => {
1626                let state = self.clone();
1627
1628                tokio::task::spawn_blocking(move || {
1629                    span.in_scope(move || {
1630                        let spending_transaction_id = state
1631                            .non_finalized_state_receiver
1632                            .with_watch_data(|non_finalized_state| {
1633                                read::spending_transaction_hash(
1634                                    non_finalized_state.best_chain(),
1635                                    &state.db,
1636                                    spend,
1637                                )
1638                            });
1639
1640                        // The work is done in the future.
1641                        timer.finish(
1642                            module_path!(),
1643                            line!(),
1644                            "ReadRequest::TransactionIdForSpentOutPoint",
1645                        );
1646
1647                        Ok(ReadResponse::TransactionId(spending_transaction_id))
1648                    })
1649                })
1650                .wait_for_panics()
1651            }
1652
1653            ReadRequest::UnspentBestChainUtxo(outpoint) => {
1654                let state = self.clone();
1655
1656                tokio::task::spawn_blocking(move || {
1657                    span.in_scope(move || {
1658                        let utxo = state.non_finalized_state_receiver.with_watch_data(
1659                            |non_finalized_state| {
1660                                read::unspent_utxo(
1661                                    non_finalized_state.best_chain(),
1662                                    &state.db,
1663                                    outpoint,
1664                                )
1665                            },
1666                        );
1667
1668                        // The work is done in the future.
1669                        timer.finish(module_path!(), line!(), "ReadRequest::UnspentBestChainUtxo");
1670
1671                        Ok(ReadResponse::UnspentBestChainUtxo(utxo))
1672                    })
1673                })
1674                .wait_for_panics()
1675            }
1676
1677            // Manually used by the StateService to implement part of AwaitUtxo.
1678            ReadRequest::AnyChainUtxo(outpoint) => {
1679                let state = self.clone();
1680
1681                tokio::task::spawn_blocking(move || {
1682                    span.in_scope(move || {
1683                        let utxo = state.non_finalized_state_receiver.with_watch_data(
1684                            |non_finalized_state| {
1685                                read::any_utxo(non_finalized_state, &state.db, outpoint)
1686                            },
1687                        );
1688
1689                        // The work is done in the future.
1690                        timer.finish(module_path!(), line!(), "ReadRequest::AnyChainUtxo");
1691
1692                        Ok(ReadResponse::AnyChainUtxo(utxo))
1693                    })
1694                })
1695                .wait_for_panics()
1696            }
1697
1698            // Used by the StateService.
1699            ReadRequest::BlockLocator => {
1700                let state = self.clone();
1701
1702                tokio::task::spawn_blocking(move || {
1703                    span.in_scope(move || {
1704                        let block_locator = state.non_finalized_state_receiver.with_watch_data(
1705                            |non_finalized_state| {
1706                                read::block_locator(non_finalized_state.best_chain(), &state.db)
1707                            },
1708                        );
1709
1710                        // The work is done in the future.
1711                        timer.finish(module_path!(), line!(), "ReadRequest::BlockLocator");
1712
1713                        Ok(ReadResponse::BlockLocator(
1714                            block_locator.unwrap_or_default(),
1715                        ))
1716                    })
1717                })
1718                .wait_for_panics()
1719            }
1720
1721            // Used by the StateService.
1722            ReadRequest::FindBlockHashes { known_blocks, stop } => {
1723                let state = self.clone();
1724
1725                tokio::task::spawn_blocking(move || {
1726                    span.in_scope(move || {
1727                        let block_hashes = state.non_finalized_state_receiver.with_watch_data(
1728                            |non_finalized_state| {
1729                                read::find_chain_hashes(
1730                                    non_finalized_state.best_chain(),
1731                                    &state.db,
1732                                    known_blocks,
1733                                    stop,
1734                                    MAX_FIND_BLOCK_HASHES_RESULTS,
1735                                )
1736                            },
1737                        );
1738
1739                        // The work is done in the future.
1740                        timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHashes");
1741
1742                        Ok(ReadResponse::BlockHashes(block_hashes))
1743                    })
1744                })
1745                .wait_for_panics()
1746            }
1747
1748            // Used by the StateService.
1749            ReadRequest::FindBlockHeaders { known_blocks, stop } => {
1750                let state = self.clone();
1751
1752                tokio::task::spawn_blocking(move || {
1753                    span.in_scope(move || {
1754                        let block_headers = state.non_finalized_state_receiver.with_watch_data(
1755                            |non_finalized_state| {
1756                                read::find_chain_headers(
1757                                    non_finalized_state.best_chain(),
1758                                    &state.db,
1759                                    known_blocks,
1760                                    stop,
1761                                    MAX_FIND_BLOCK_HEADERS_RESULTS,
1762                                )
1763                            },
1764                        );
1765
1766                        let block_headers = block_headers
1767                            .into_iter()
1768                            .map(|header| CountedHeader { header })
1769                            .collect();
1770
1771                        // The work is done in the future.
1772                        timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHeaders");
1773
1774                        Ok(ReadResponse::BlockHeaders(block_headers))
1775                    })
1776                })
1777                .wait_for_panics()
1778            }
1779
1780            ReadRequest::SaplingTree(hash_or_height) => {
1781                let state = self.clone();
1782
1783                tokio::task::spawn_blocking(move || {
1784                    span.in_scope(move || {
1785                        let sapling_tree = state.non_finalized_state_receiver.with_watch_data(
1786                            |non_finalized_state| {
1787                                read::sapling_tree(
1788                                    non_finalized_state.best_chain(),
1789                                    &state.db,
1790                                    hash_or_height,
1791                                )
1792                            },
1793                        );
1794
1795                        // The work is done in the future.
1796                        timer.finish(module_path!(), line!(), "ReadRequest::SaplingTree");
1797
1798                        Ok(ReadResponse::SaplingTree(sapling_tree))
1799                    })
1800                })
1801                .wait_for_panics()
1802            }
1803
1804            ReadRequest::OrchardTree(hash_or_height) => {
1805                let state = self.clone();
1806
1807                tokio::task::spawn_blocking(move || {
1808                    span.in_scope(move || {
1809                        let orchard_tree = state.non_finalized_state_receiver.with_watch_data(
1810                            |non_finalized_state| {
1811                                read::orchard_tree(
1812                                    non_finalized_state.best_chain(),
1813                                    &state.db,
1814                                    hash_or_height,
1815                                )
1816                            },
1817                        );
1818
1819                        // The work is done in the future.
1820                        timer.finish(module_path!(), line!(), "ReadRequest::OrchardTree");
1821
1822                        Ok(ReadResponse::OrchardTree(orchard_tree))
1823                    })
1824                })
1825                .wait_for_panics()
1826            }
1827
1828            ReadRequest::SaplingSubtrees { start_index, limit } => {
1829                let state = self.clone();
1830
1831                tokio::task::spawn_blocking(move || {
1832                    span.in_scope(move || {
1833                        let end_index = limit
1834                            .and_then(|limit| start_index.0.checked_add(limit.0))
1835                            .map(NoteCommitmentSubtreeIndex);
1836
1837                        let sapling_subtrees = state.non_finalized_state_receiver.with_watch_data(
1838                            |non_finalized_state| {
1839                                if let Some(end_index) = end_index {
1840                                    read::sapling_subtrees(
1841                                        non_finalized_state.best_chain(),
1842                                        &state.db,
1843                                        start_index..end_index,
1844                                    )
1845                                } else {
1846                                    // If there is no end bound, just return all the trees.
1847                                    // If the end bound would overflow, just returns all the trees, because that's what
1848                                    // `zcashd` does. (It never calculates an end bound, so it just keeps iterating until
1849                                    // the trees run out.)
1850                                    read::sapling_subtrees(
1851                                        non_finalized_state.best_chain(),
1852                                        &state.db,
1853                                        start_index..,
1854                                    )
1855                                }
1856                            },
1857                        );
1858
1859                        // The work is done in the future.
1860                        timer.finish(module_path!(), line!(), "ReadRequest::SaplingSubtrees");
1861
1862                        Ok(ReadResponse::SaplingSubtrees(sapling_subtrees))
1863                    })
1864                })
1865                .wait_for_panics()
1866            }
1867
1868            ReadRequest::OrchardSubtrees { start_index, limit } => {
1869                let state = self.clone();
1870
1871                tokio::task::spawn_blocking(move || {
1872                    span.in_scope(move || {
1873                        let end_index = limit
1874                            .and_then(|limit| start_index.0.checked_add(limit.0))
1875                            .map(NoteCommitmentSubtreeIndex);
1876
1877                        let orchard_subtrees = state.non_finalized_state_receiver.with_watch_data(
1878                            |non_finalized_state| {
1879                                if let Some(end_index) = end_index {
1880                                    read::orchard_subtrees(
1881                                        non_finalized_state.best_chain(),
1882                                        &state.db,
1883                                        start_index..end_index,
1884                                    )
1885                                } else {
1886                                    // If there is no end bound, just return all the trees.
1887                                    // If the end bound would overflow, just returns all the trees, because that's what
1888                                    // `zcashd` does. (It never calculates an end bound, so it just keeps iterating until
1889                                    // the trees run out.)
1890                                    read::orchard_subtrees(
1891                                        non_finalized_state.best_chain(),
1892                                        &state.db,
1893                                        start_index..,
1894                                    )
1895                                }
1896                            },
1897                        );
1898
1899                        // The work is done in the future.
1900                        timer.finish(module_path!(), line!(), "ReadRequest::OrchardSubtrees");
1901
1902                        Ok(ReadResponse::OrchardSubtrees(orchard_subtrees))
1903                    })
1904                })
1905                .wait_for_panics()
1906            }
1907
1908            // For the get_address_balance RPC.
1909            ReadRequest::AddressBalance(addresses) => {
1910                let state = self.clone();
1911
1912                tokio::task::spawn_blocking(move || {
1913                    span.in_scope(move || {
1914                        let (balance, received) = state
1915                            .non_finalized_state_receiver
1916                            .with_watch_data(|non_finalized_state| {
1917                                read::transparent_balance(
1918                                    non_finalized_state.best_chain().cloned(),
1919                                    &state.db,
1920                                    addresses,
1921                                )
1922                            })?;
1923
1924                        // The work is done in the future.
1925                        timer.finish(module_path!(), line!(), "ReadRequest::AddressBalance");
1926
1927                        Ok(ReadResponse::AddressBalance { balance, received })
1928                    })
1929                })
1930                .wait_for_panics()
1931            }
1932
1933            // For the get_address_tx_ids RPC.
1934            ReadRequest::TransactionIdsByAddresses {
1935                addresses,
1936                height_range,
1937            } => {
1938                let state = self.clone();
1939
1940                tokio::task::spawn_blocking(move || {
1941                    span.in_scope(move || {
1942                        let tx_ids = state.non_finalized_state_receiver.with_watch_data(
1943                            |non_finalized_state| {
1944                                read::transparent_tx_ids(
1945                                    non_finalized_state.best_chain(),
1946                                    &state.db,
1947                                    addresses,
1948                                    height_range,
1949                                )
1950                            },
1951                        );
1952
1953                        // The work is done in the future.
1954                        timer.finish(
1955                            module_path!(),
1956                            line!(),
1957                            "ReadRequest::TransactionIdsByAddresses",
1958                        );
1959
1960                        tx_ids.map(ReadResponse::AddressesTransactionIds)
1961                    })
1962                })
1963                .wait_for_panics()
1964            }
1965
1966            // For the get_address_utxos RPC.
1967            ReadRequest::UtxosByAddresses(addresses) => {
1968                let state = self.clone();
1969
1970                tokio::task::spawn_blocking(move || {
1971                    span.in_scope(move || {
1972                        let utxos = state.non_finalized_state_receiver.with_watch_data(
1973                            |non_finalized_state| {
1974                                read::address_utxos(
1975                                    &state.network,
1976                                    non_finalized_state.best_chain(),
1977                                    &state.db,
1978                                    addresses,
1979                                )
1980                            },
1981                        );
1982
1983                        // The work is done in the future.
1984                        timer.finish(module_path!(), line!(), "ReadRequest::UtxosByAddresses");
1985
1986                        utxos.map(ReadResponse::AddressUtxos)
1987                    })
1988                })
1989                .wait_for_panics()
1990            }
1991
1992            ReadRequest::CheckBestChainTipNullifiersAndAnchors(unmined_tx) => {
1993                let state = self.clone();
1994
1995                tokio::task::spawn_blocking(move || {
1996                    span.in_scope(move || {
1997                        let latest_non_finalized_best_chain =
1998                            state.latest_non_finalized_state().best_chain().cloned();
1999
2000                        check::nullifier::tx_no_duplicates_in_chain(
2001                            &state.db,
2002                            latest_non_finalized_best_chain.as_ref(),
2003                            &unmined_tx.transaction,
2004                        )?;
2005
2006                        check::anchors::tx_anchors_refer_to_final_treestates(
2007                            &state.db,
2008                            latest_non_finalized_best_chain.as_ref(),
2009                            &unmined_tx,
2010                        )?;
2011
2012                        // The work is done in the future.
2013                        timer.finish(
2014                            module_path!(),
2015                            line!(),
2016                            "ReadRequest::CheckBestChainTipNullifiersAndAnchors",
2017                        );
2018
2019                        Ok(ReadResponse::ValidBestChainTipNullifiersAndAnchors)
2020                    })
2021                })
2022                .wait_for_panics()
2023            }
2024
2025            // Used by the get_block and get_block_hash RPCs.
2026            ReadRequest::BestChainBlockHash(height) => {
2027                let state = self.clone();
2028
2029                // # Performance
2030                //
2031                // Allow other async tasks to make progress while concurrently reading blocks from disk.
2032
2033                tokio::task::spawn_blocking(move || {
2034                    span.in_scope(move || {
2035                        let hash = state.non_finalized_state_receiver.with_watch_data(
2036                            |non_finalized_state| {
2037                                read::hash_by_height(
2038                                    non_finalized_state.best_chain(),
2039                                    &state.db,
2040                                    height,
2041                                )
2042                            },
2043                        );
2044
2045                        // The work is done in the future.
2046                        timer.finish(module_path!(), line!(), "ReadRequest::BestChainBlockHash");
2047
2048                        Ok(ReadResponse::BlockHash(hash))
2049                    })
2050                })
2051                .wait_for_panics()
2052            }
2053
2054            // Used by get_block_template and getblockchaininfo RPCs.
2055            ReadRequest::ChainInfo => {
2056                let state = self.clone();
2057                let latest_non_finalized_state = self.latest_non_finalized_state();
2058
2059                // # Performance
2060                //
2061                // Allow other async tasks to make progress while concurrently reading blocks from disk.
2062
2063                tokio::task::spawn_blocking(move || {
2064                    span.in_scope(move || {
2065                        // # Correctness
2066                        //
2067                        // It is ok to do these lookups using multiple database calls. Finalized state updates
2068                        // can only add overlapping blocks, and block hashes are unique across all chain forks.
2069                        //
2070                        // If there is a large overlap between the non-finalized and finalized states,
2071                        // where the finalized tip is above the non-finalized tip,
2072                        // Zebra is receiving a lot of blocks, or this request has been delayed for a long time.
2073                        //
2074                        // In that case, the `getblocktemplate` RPC will return an error because Zebra
2075                        // is not synced to the tip. That check happens before the RPC makes this request.
2076                        let get_block_template_info =
2077                            read::difficulty::get_block_template_chain_info(
2078                                &latest_non_finalized_state,
2079                                &state.db,
2080                                &state.network,
2081                            );
2082
2083                        // The work is done in the future.
2084                        timer.finish(module_path!(), line!(), "ReadRequest::ChainInfo");
2085
2086                        get_block_template_info.map(ReadResponse::ChainInfo)
2087                    })
2088                })
2089                .wait_for_panics()
2090            }
2091
2092            // Used by getmininginfo, getnetworksolps, and getnetworkhashps RPCs.
2093            ReadRequest::SolutionRate { num_blocks, height } => {
2094                let state = self.clone();
2095
2096                // # Performance
2097                //
2098                // Allow other async tasks to make progress while concurrently reading blocks from disk.
2099
2100                tokio::task::spawn_blocking(move || {
2101                    span.in_scope(move || {
2102                        let latest_non_finalized_state = state.latest_non_finalized_state();
2103                        // # Correctness
2104                        //
2105                        // It is ok to do these lookups using multiple database calls. Finalized state updates
2106                        // can only add overlapping blocks, and block hashes are unique across all chain forks.
2107                        //
2108                        // The worst that can happen here is that the default `start_hash` will be below
2109                        // the chain tip.
2110                        let (tip_height, tip_hash) =
2111                            match read::tip(latest_non_finalized_state.best_chain(), &state.db) {
2112                                Some(tip_hash) => tip_hash,
2113                                None => return Ok(ReadResponse::SolutionRate(None)),
2114                            };
2115
2116                        let start_hash = match height {
2117                            Some(height) if height < tip_height => read::hash_by_height(
2118                                latest_non_finalized_state.best_chain(),
2119                                &state.db,
2120                                height,
2121                            ),
2122                            // use the chain tip hash if height is above it or not provided.
2123                            _ => Some(tip_hash),
2124                        };
2125
2126                        let solution_rate = start_hash.and_then(|start_hash| {
2127                            read::difficulty::solution_rate(
2128                                &latest_non_finalized_state,
2129                                &state.db,
2130                                num_blocks,
2131                                start_hash,
2132                            )
2133                        });
2134
2135                        // The work is done in the future.
2136                        timer.finish(module_path!(), line!(), "ReadRequest::SolutionRate");
2137
2138                        Ok(ReadResponse::SolutionRate(solution_rate))
2139                    })
2140                })
2141                .wait_for_panics()
2142            }
2143
2144            ReadRequest::CheckBlockProposalValidity(semantically_verified) => {
2145                let state = self.clone();
2146
2147                // # Performance
2148                //
2149                // Allow other async tasks to make progress while concurrently reading blocks from disk.
2150
2151                tokio::task::spawn_blocking(move || {
2152                    span.in_scope(move || {
2153                        tracing::debug!("attempting to validate and commit block proposal onto a cloned non-finalized state");
2154                        let mut latest_non_finalized_state = state.latest_non_finalized_state();
2155
2156                        // The previous block of a valid proposal must be on the best chain tip.
2157                        let Some((_best_tip_height, best_tip_hash)) = read::best_tip(&latest_non_finalized_state, &state.db) else {
2158                            return Err("state is empty: wait for Zebra to sync before submitting a proposal".into());
2159                        };
2160
2161                        if semantically_verified.block.header.previous_block_hash != best_tip_hash {
2162                            return Err("proposal is not based on the current best chain tip: previous block hash must be the best chain tip".into());
2163                        }
2164
2165                        // This clone of the non-finalized state is dropped when this closure returns.
2166                        // The non-finalized state that's used in the rest of the state (including finalizing
2167                        // blocks into the db) is not mutated here.
2168                        //
2169                        // TODO: Convert `CommitSemanticallyVerifiedError` to a new `ValidateProposalError`?
2170                        latest_non_finalized_state.disable_metrics();
2171
2172                        write::validate_and_commit_non_finalized(
2173                            &state.db,
2174                            &mut latest_non_finalized_state,
2175                            semantically_verified,
2176                        )?;
2177
2178                        // The work is done in the future.
2179                        timer.finish(
2180                            module_path!(),
2181                            line!(),
2182                            "ReadRequest::CheckBlockProposalValidity",
2183                        );
2184
2185                        Ok(ReadResponse::ValidBlockProposal)
2186                    })
2187                })
2188                .wait_for_panics()
2189            }
2190
2191            ReadRequest::TipBlockSize => {
2192                let state = self.clone();
2193
2194                tokio::task::spawn_blocking(move || {
2195                    span.in_scope(move || {
2196                        // Get the best chain tip height.
2197                        let tip_height = state
2198                            .non_finalized_state_receiver
2199                            .with_watch_data(|non_finalized_state| {
2200                                read::tip_height(non_finalized_state.best_chain(), &state.db)
2201                            })
2202                            .unwrap_or(Height(0));
2203
2204                        // Get the block at the best chain tip height.
2205                        let block = state.non_finalized_state_receiver.with_watch_data(
2206                            |non_finalized_state| {
2207                                read::block(
2208                                    non_finalized_state.best_chain(),
2209                                    &state.db,
2210                                    tip_height.into(),
2211                                )
2212                            },
2213                        );
2214
2215                        // The work is done in the future.
2216                        timer.finish(module_path!(), line!(), "ReadRequest::TipBlockSize");
2217
2218                        // Respond with the length of the obtained block if any.
2219                        match block {
2220                            Some(b) => Ok(ReadResponse::TipBlockSize(Some(
2221                                b.zcash_serialize_to_vec()?.len(),
2222                            ))),
2223                            None => Ok(ReadResponse::TipBlockSize(None)),
2224                        }
2225                    })
2226                })
2227                .wait_for_panics()
2228            }
2229
2230            ReadRequest::NonFinalizedBlocksListener => {
2231                // The non-finalized blocks listener is used to notify the state service
2232                // about new blocks that have been added to the non-finalized state.
2233                let non_finalized_blocks_listener = NonFinalizedBlocksListener::spawn(
2234                    self.network.clone(),
2235                    self.non_finalized_state_receiver.clone(),
2236                );
2237
2238                async move {
2239                    timer.finish(
2240                        module_path!(),
2241                        line!(),
2242                        "ReadRequest::NonFinalizedBlocksListener",
2243                    );
2244
2245                    Ok(ReadResponse::NonFinalizedBlocksListener(
2246                        non_finalized_blocks_listener,
2247                    ))
2248                }
2249                .boxed()
2250            }
2251        }
2252    }
2253}
2254
2255/// Initialize a state service from the provided [`Config`].
2256/// Returns a boxed state service, a read-only state service,
2257/// and receivers for state chain tip updates.
2258///
2259/// Each `network` has its own separate on-disk database.
2260///
2261/// The state uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
2262/// to work out when it is near the final checkpoint.
2263///
2264/// To share access to the state, wrap the returned service in a `Buffer`,
2265/// or clone the returned [`ReadStateService`].
2266///
2267/// It's possible to construct multiple state services in the same application (as
2268/// long as they, e.g., use different storage locations), but doing so is
2269/// probably not what you want.
2270pub async fn init(
2271    config: Config,
2272    network: &Network,
2273    max_checkpoint_height: block::Height,
2274    checkpoint_verify_concurrency_limit: usize,
2275) -> (
2276    BoxService<Request, Response, BoxError>,
2277    ReadStateService,
2278    LatestChainTip,
2279    ChainTipChange,
2280) {
2281    let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
2282        StateService::new(
2283            config,
2284            network,
2285            max_checkpoint_height,
2286            checkpoint_verify_concurrency_limit,
2287        )
2288        .await;
2289
2290    (
2291        BoxService::new(state_service),
2292        read_only_state_service,
2293        latest_chain_tip,
2294        chain_tip_change,
2295    )
2296}
2297
2298/// Initialize a read state service from the provided [`Config`].
2299/// Returns a read-only state service,
2300///
2301/// Each `network` has its own separate on-disk database.
2302///
2303/// To share access to the state, clone the returned [`ReadStateService`].
2304pub fn init_read_only(
2305    config: Config,
2306    network: &Network,
2307) -> (
2308    ReadStateService,
2309    ZebraDb,
2310    tokio::sync::watch::Sender<NonFinalizedState>,
2311) {
2312    let finalized_state = FinalizedState::new_with_debug(
2313        &config,
2314        network,
2315        true,
2316        #[cfg(feature = "elasticsearch")]
2317        false,
2318        true,
2319    );
2320    let (non_finalized_state_sender, non_finalized_state_receiver) =
2321        tokio::sync::watch::channel(NonFinalizedState::new(network));
2322
2323    (
2324        ReadStateService::new(
2325            &finalized_state,
2326            None,
2327            WatchReceiver::new(non_finalized_state_receiver),
2328        ),
2329        finalized_state.db.clone(),
2330        non_finalized_state_sender,
2331    )
2332}
2333
2334/// Calls [`init_read_only`] with the provided [`Config`] and [`Network`] from a blocking task.
2335/// Returns a [`tokio::task::JoinHandle`] with a read state service and chain tip sender.
2336pub fn spawn_init_read_only(
2337    config: Config,
2338    network: &Network,
2339) -> tokio::task::JoinHandle<(
2340    ReadStateService,
2341    ZebraDb,
2342    tokio::sync::watch::Sender<NonFinalizedState>,
2343)> {
2344    let network = network.clone();
2345    tokio::task::spawn_blocking(move || init_read_only(config, &network))
2346}
2347
2348/// Returns a [`StateService`] with an ephemeral [`Config`] and a buffer with a single slot.
2349///
2350/// This can be used to create a state service for testing. See also [`init`].
2351#[cfg(any(test, feature = "proptest-impl"))]
2352pub async fn init_test(
2353    network: &Network,
2354) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
2355    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
2356    //       if we ever need to test final checkpoint sent UTXO queries
2357    let (state_service, _, _, _) =
2358        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;
2359
2360    Buffer::new(BoxService::new(state_service), 1)
2361}
2362
2363/// Initializes a state service with an ephemeral [`Config`] and a buffer with a single slot,
2364/// then returns the read-write service, read-only service, and tip watch channels.
2365///
2366/// This can be used to create a state service for testing. See also [`init`].
2367#[cfg(any(test, feature = "proptest-impl"))]
2368pub async fn init_test_services(
2369    network: &Network,
2370) -> (
2371    Buffer<BoxService<Request, Response, BoxError>, Request>,
2372    ReadStateService,
2373    LatestChainTip,
2374    ChainTipChange,
2375) {
2376    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
2377    //       if we ever need to test final checkpoint sent UTXO queries
2378    let (state_service, read_state_service, latest_chain_tip, chain_tip_change) =
2379        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;
2380
2381    let state_service = Buffer::new(BoxService::new(state_service), 1);
2382
2383    (
2384        state_service,
2385        read_state_service,
2386        latest_chain_tip,
2387        chain_tip_change,
2388    )
2389}