Skip to main content

zebra_state/
service.rs

1//! [`tower::Service`]s for Zebra's cached chain state.
2//!
3//! Zebra provides cached state access via two main services:
4//! - [`StateService`]: a read-write service that writes blocks to the state,
5//!   and redirects most read requests to the [`ReadStateService`].
6//! - [`ReadStateService`]: a read-only service that answers from the most
7//!   recent committed block.
8//!
9//! Most users should prefer [`ReadStateService`], unless they need to write blocks to the state.
10//!
11//! Zebra also provides access to the best chain tip via:
12//! - [`LatestChainTip`]: a read-only channel that contains the latest committed
13//!   tip.
14//! - [`ChainTipChange`]: a read-only channel that can asynchronously await
15//!   chain tip changes.
16
17use std::{
18    collections::HashMap,
19    future::Future,
20    pin::Pin,
21    sync::Arc,
22    task::{Context, Poll},
23    time::{Duration, Instant},
24};
25
26use futures::future::FutureExt;
27use tokio::sync::oneshot;
28use tower::{util::BoxService, Service, ServiceExt};
29use tracing::{instrument, Instrument, Span};
30
31#[cfg(any(test, feature = "proptest-impl"))]
32use tower::buffer::Buffer;
33
34use zebra_chain::{
35    block::{self, CountedHeader, HeightDiff},
36    diagnostic::{task::WaitForPanics, CodeTimer},
37    parameters::{Network, NetworkUpgrade},
38    subtree::NoteCommitmentSubtreeIndex,
39};
40
41use zebra_chain::{block::Height, serialization::ZcashSerialize};
42
43use crate::{
44    constants::{
45        MAX_FIND_BLOCK_HASHES_RESULTS, MAX_FIND_BLOCK_HEADERS_RESULTS, MAX_LEGACY_CHAIN_BLOCKS,
46    },
47    error::{CommitBlockError, CommitCheckpointVerifiedError, InvalidateError, ReconsiderError},
48    response::NonFinalizedBlocksListener,
49    service::{
50        block_iter::any_ancestor_blocks,
51        chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
52        finalized_state::{FinalizedState, ZebraDb},
53        non_finalized_state::{Chain, NonFinalizedState},
54        pending_utxos::PendingUtxos,
55        queued_blocks::QueuedBlocks,
56        watch_receiver::WatchReceiver,
57    },
58    BoxError, CheckpointVerifiedBlock, CommitSemanticallyVerifiedError, Config, KnownBlock,
59    ReadRequest, ReadResponse, Request, Response, SemanticallyVerifiedBlock,
60};
61
62pub mod block_iter;
63pub mod chain_tip;
64pub mod watch_receiver;
65
66pub mod check;
67
68pub(crate) mod finalized_state;
69pub(crate) mod non_finalized_state;
70mod pending_utxos;
71mod queued_blocks;
72pub(crate) mod read;
73mod traits;
74mod write;
75
76#[cfg(any(test, feature = "proptest-impl"))]
77pub mod arbitrary;
78
79#[cfg(test)]
80mod tests;
81
82pub use finalized_state::{OutputLocation, TransactionIndex, TransactionLocation};
83use write::NonFinalizedWriteMessage;
84
85use self::queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified, SentHashes};
86
87pub use self::traits::{ReadState, State};
88
89/// A read-write service for Zebra's cached blockchain state.
90///
91/// This service modifies and provides access to:
92/// - the non-finalized state: the ~100 most recent blocks.
93///   Zebra allows chain forks in the non-finalized state,
94///   stores it in memory, and re-downloads it when restarted.
95/// - the finalized state: older blocks that have many confirmations.
96///   Zebra stores the single best chain in the finalized state,
97///   and re-loads it from disk when restarted.
98///
99/// Read requests to this service are buffered, then processed concurrently.
100/// Block write requests are buffered, then queued, then processed in order by a separate task.
101///
102/// Most state users can get faster read responses using the [`ReadStateService`],
103/// because its requests do not share a [`tower::buffer::Buffer`] with block write requests.
104///
105/// To quickly get the latest block, use [`LatestChainTip`] or [`ChainTipChange`].
106/// They can read the latest block directly, without queueing any requests.
107#[derive(Debug)]
108pub(crate) struct StateService {
109    // Configuration
110    //
111    /// The configured Zcash network.
112    network: Network,
113
114    /// The height that we start storing UTXOs from finalized blocks.
115    ///
116    /// This height should be lower than the last few checkpoints,
117    /// so the full verifier can verify UTXO spends from those blocks,
118    /// even if they haven't been committed to the finalized state yet.
119    full_verifier_utxo_lookahead: block::Height,
120
121    // Queued Blocks
122    //
123    /// Queued blocks for the [`NonFinalizedState`] that arrived out of order.
124    /// These blocks are awaiting their parent blocks before they can do contextual verification.
125    non_finalized_state_queued_blocks: QueuedBlocks,
126
127    /// Queued blocks for the [`FinalizedState`] that arrived out of order.
128    /// These blocks are awaiting their parent blocks before they can do contextual verification.
129    ///
130    /// Indexed by their parent block hash.
131    finalized_state_queued_blocks: HashMap<block::Hash, QueuedCheckpointVerified>,
132
133    /// Channels to send blocks to the block write task.
134    block_write_sender: write::BlockWriteSender,
135
136    /// The [`block::Hash`] of the most recent block sent on
137    /// `finalized_block_write_sender` or `non_finalized_block_write_sender`.
138    ///
139    /// On startup, this is:
140    /// - the finalized tip, if there are stored blocks, or
141    /// - the genesis block's parent hash, if the database is empty.
142    ///
143    /// If `invalid_block_write_reset_receiver` gets a reset, this is:
144    /// - the hash of the last valid committed block (the parent of the invalid block).
145    finalized_block_write_last_sent_hash: block::Hash,
146
147    /// A set of block hashes that have been sent to the block write task.
148    /// Hashes of blocks below the finalized tip height are periodically pruned.
149    non_finalized_block_write_sent_hashes: SentHashes,
150
151    /// If an invalid block is sent on `finalized_block_write_sender`
152    /// or `non_finalized_block_write_sender`,
153    /// this channel gets the [`block::Hash`] of the valid tip.
154    //
155    // TODO: add tests for finalized and non-finalized resets (#2654)
156    invalid_block_write_reset_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,
157
158    // Pending UTXO Request Tracking
159    //
160    /// The set of outpoints with pending requests for their associated transparent::Output.
161    pending_utxos: PendingUtxos,
162
163    /// Instant tracking the last time `pending_utxos` was pruned.
164    last_prune: Instant,
165
166    // Updating Concurrently Readable State
167    //
168    /// A cloneable [`ReadStateService`], used to answer concurrent read requests.
169    ///
170    /// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`.
171    read_service: ReadStateService,
172
173    // Metrics
174    //
175    /// A metric tracking the maximum height that's currently in `finalized_state_queued_blocks`
176    ///
177    /// Set to `f64::NAN` if `finalized_state_queued_blocks` is empty, because grafana shows NaNs
178    /// as a break in the graph.
179    max_finalized_queue_height: f64,
180}
181
182/// A read-only service for accessing Zebra's cached blockchain state.
183///
184/// This service provides read-only access to:
185/// - the non-finalized state: the ~100 most recent blocks.
186/// - the finalized state: older blocks that have many confirmations.
187///
188/// Requests to this service are processed in parallel,
189/// ignoring any blocks queued by the read-write [`StateService`].
190///
191/// This quick response behavior is better for most state users.
192/// It allows other async tasks to make progress while concurrently reading data from disk.
193#[derive(Clone, Debug)]
194pub struct ReadStateService {
195    // Configuration
196    //
197    /// The configured Zcash network.
198    network: Network,
199
200    // Shared Concurrently Readable State
201    //
202    /// A watch channel with a cached copy of the [`NonFinalizedState`].
203    ///
204    /// This state is only updated between requests,
205    /// so it might include some block data that is also on `disk`.
206    non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
207
208    /// The shared inner on-disk database for the finalized state.
209    ///
210    /// RocksDB allows reads and writes via a shared reference,
211    /// but [`ZebraDb`] doesn't expose any write methods or types.
212    ///
213    /// This chain is updated concurrently with requests,
214    /// so it might include some block data that is also in `best_mem`.
215    db: ZebraDb,
216
217    /// A shared handle to a task that writes blocks to the [`NonFinalizedState`] or [`FinalizedState`],
218    /// once the queues have received all their parent blocks.
219    ///
220    /// Used to check for panics when writing blocks.
221    block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
222}
223
224impl Drop for StateService {
225    fn drop(&mut self) {
226        // The state service owns the state, tasks, and channels,
227        // so dropping it should shut down everything.
228
229        // Close the channels (non-blocking)
230        // This makes the block write thread exit the next time it checks the channels.
231        // We want to do this here so we get any errors or panics from the block write task before it shuts down.
232        self.invalid_block_write_reset_receiver.close();
233
234        std::mem::drop(self.block_write_sender.finalized.take());
235        std::mem::drop(self.block_write_sender.non_finalized.take());
236
237        self.clear_finalized_block_queue(CommitBlockError::WriteTaskExited);
238        self.clear_non_finalized_block_queue(CommitBlockError::WriteTaskExited);
239
240        // Log database metrics before shutting down
241        info!("dropping the state: logging database metrics");
242        self.log_db_metrics();
243
244        // Then drop self.read_service, which checks the block write task for panics,
245        // and tries to shut down the database.
246    }
247}
248
249impl Drop for ReadStateService {
250    fn drop(&mut self) {
251        // The read state service shares the state,
252        // so dropping it should check if we can shut down.
253
254        // TODO: move this into a try_shutdown() method
255        if let Some(block_write_task) = self.block_write_task.take() {
256            if let Some(block_write_task_handle) = Arc::into_inner(block_write_task) {
257                // We're the last database user, so we can tell it to shut down (blocking):
258                // - flushes the database to disk, and
259                // - drops the database, which cleans up any database tasks correctly.
260                self.db.shutdown(true);
261
262                // We are the last state with a reference to this thread, so we can
263                // wait until the block write task finishes, then check for panics (blocking).
264                // (We'd also like to abort the thread, but std::thread::JoinHandle can't do that.)
265
266                // This log is verbose during tests.
267                #[cfg(not(test))]
268                info!("waiting for the block write task to finish");
269                #[cfg(test)]
270                debug!("waiting for the block write task to finish");
271
272                // TODO: move this into a check_for_panics() method
273                if let Err(thread_panic) = block_write_task_handle.join() {
274                    std::panic::resume_unwind(thread_panic);
275                } else {
276                    debug!("shutting down the state because the block write task has finished");
277                }
278            }
279        } else {
280            // Even if we're not the last database user, try shutting it down.
281            //
282            // TODO: rename this to try_shutdown()?
283            self.db.shutdown(false);
284        }
285    }
286}
287
288impl StateService {
289    const PRUNE_INTERVAL: Duration = Duration::from_secs(30);
290
291    /// Creates a new state service for the state `config` and `network`.
292    ///
293    /// Uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
294    /// to work out when it is near the final checkpoint.
295    ///
296    /// Returns the read-write and read-only state services,
297    /// and read-only watch channels for its best chain tip.
298    pub async fn new(
299        config: Config,
300        network: &Network,
301        max_checkpoint_height: block::Height,
302        checkpoint_verify_concurrency_limit: usize,
303    ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
304        let (finalized_state, finalized_tip, timer) = {
305            let config = config.clone();
306            let network = network.clone();
307            tokio::task::spawn_blocking(move || {
308                let timer = CodeTimer::start();
309                let finalized_state = FinalizedState::new(
310                    &config,
311                    &network,
312                    #[cfg(feature = "elasticsearch")]
313                    true,
314                );
315                timer.finish(module_path!(), line!(), "opening finalized state database");
316
317                let timer = CodeTimer::start();
318                let finalized_tip = finalized_state.db.tip_block();
319
320                (finalized_state, finalized_tip, timer)
321            })
322            .await
323            .expect("failed to join blocking task")
324        };
325
326        // # Correctness
327        //
328        // The state service must set the finalized block write sender to `None`
329        // if there are blocks in the restored non-finalized state that are above
330        // the max checkpoint height so that non-finalized blocks can be written, otherwise,
331        // Zebra will be unable to commit semantically verified blocks, and its chain sync will stall.
332        //
333        // The state service must not set the finalized block write sender to `None` if there
334        // aren't blocks in the restored non-finalized state that are above the max checkpoint height,
335        // otherwise, unless checkpoint sync is disabled in the zebra-consensus configuration,
336        // Zebra will be unable to commit checkpoint verified blocks, and its chain sync will stall.
337        let is_finalized_tip_past_max_checkpoint = if let Some(tip) = &finalized_tip {
338            tip.coinbase_height().expect("valid block must have height") >= max_checkpoint_height
339        } else {
340            false
341        };
342        let (non_finalized_state, non_finalized_state_sender, non_finalized_state_receiver) =
343            NonFinalizedState::new(network)
344                .with_backup(
345                    config.non_finalized_state_backup_dir(network),
346                    &finalized_state.db,
347                    is_finalized_tip_past_max_checkpoint,
348                )
349                .await;
350
351        let non_finalized_block_write_sent_hashes = SentHashes::new(&non_finalized_state);
352        let initial_tip = non_finalized_state
353            .best_tip_block()
354            .map(|cv_block| cv_block.block.clone())
355            .or(finalized_tip)
356            .map(CheckpointVerifiedBlock::from)
357            .map(ChainTipBlock::from);
358
359        tracing::info!(chain_tip = ?initial_tip.as_ref().map(|tip| (tip.hash, tip.height)), "loaded Zebra state cache");
360
361        let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
362            ChainTipSender::new(initial_tip, network);
363
364        let finalized_state_for_writing = finalized_state.clone();
365        let should_use_finalized_block_write_sender = non_finalized_state.is_chain_set_empty();
366        let (block_write_sender, invalid_block_write_reset_receiver, block_write_task) =
367            write::BlockWriteSender::spawn(
368                finalized_state_for_writing,
369                non_finalized_state,
370                chain_tip_sender,
371                non_finalized_state_sender,
372                should_use_finalized_block_write_sender,
373            );
374
375        let read_service = ReadStateService::new(
376            &finalized_state,
377            block_write_task,
378            non_finalized_state_receiver,
379        );
380
381        let full_verifier_utxo_lookahead = max_checkpoint_height
382            - HeightDiff::try_from(checkpoint_verify_concurrency_limit)
383                .expect("fits in HeightDiff");
384        let full_verifier_utxo_lookahead =
385            full_verifier_utxo_lookahead.unwrap_or(block::Height::MIN);
386        let non_finalized_state_queued_blocks = QueuedBlocks::default();
387        let pending_utxos = PendingUtxos::default();
388
389        let finalized_block_write_last_sent_hash =
390            tokio::task::spawn_blocking(move || finalized_state.db.finalized_tip_hash())
391                .await
392                .expect("failed to join blocking task");
393
394        let state = Self {
395            network: network.clone(),
396            full_verifier_utxo_lookahead,
397            non_finalized_state_queued_blocks,
398            finalized_state_queued_blocks: HashMap::new(),
399            block_write_sender,
400            finalized_block_write_last_sent_hash,
401            non_finalized_block_write_sent_hashes,
402            invalid_block_write_reset_receiver,
403            pending_utxos,
404            last_prune: Instant::now(),
405            read_service: read_service.clone(),
406            max_finalized_queue_height: f64::NAN,
407        };
408        timer.finish(module_path!(), line!(), "initializing state service");
409
410        tracing::info!("starting legacy chain check");
411        let timer = CodeTimer::start();
412
413        if let (Some(tip), Some(nu5_activation_height)) = (
414            {
415                let read_state = state.read_service.clone();
416                tokio::task::spawn_blocking(move || read_state.best_tip())
417                    .await
418                    .expect("task should not panic")
419            },
420            NetworkUpgrade::Nu5.activation_height(network),
421        ) {
422            if let Err(error) = check::legacy_chain(
423                nu5_activation_height,
424                any_ancestor_blocks(
425                    &state.read_service.latest_non_finalized_state(),
426                    &state.read_service.db,
427                    tip.1,
428                ),
429                &state.network,
430                MAX_LEGACY_CHAIN_BLOCKS,
431            ) {
432                let legacy_db_path = state.read_service.db.path().to_path_buf();
433                panic!(
434                    "Cached state contains a legacy chain.\n\
435                     An outdated Zebra version did not know about a recent network upgrade,\n\
436                     so it followed a legacy chain using outdated consensus branch rules.\n\
437                     Hint: Delete your database, and restart Zebra to do a full sync.\n\
438                     Database path: {legacy_db_path:?}\n\
439                     Error: {error:?}",
440                );
441            }
442        }
443
444        tracing::info!("cached state consensus branch is valid: no legacy chain found");
445        timer.finish(module_path!(), line!(), "legacy chain check");
446
447        // Spawn a background task to periodically export RocksDB metrics to Prometheus
448        let db_for_metrics = read_service.db.clone();
449        tokio::spawn(async move {
450            let mut interval = tokio::time::interval(Duration::from_secs(30));
451            loop {
452                interval.tick().await;
453                db_for_metrics.export_metrics();
454            }
455        });
456
457        (state, read_service, latest_chain_tip, chain_tip_change)
458    }
459
460    /// Call read only state service to log rocksdb database metrics.
461    pub fn log_db_metrics(&self) {
462        self.read_service.db.print_db_metrics();
463    }
464
465    /// Queue a checkpoint verified block for verification and storage in the finalized state.
466    ///
467    /// Returns a channel receiver that provides the result of the block commit.
468    fn queue_and_commit_to_finalized_state(
469        &mut self,
470        checkpoint_verified: CheckpointVerifiedBlock,
471    ) -> oneshot::Receiver<Result<block::Hash, CommitCheckpointVerifiedError>> {
472        // # Correctness & Performance
473        //
474        // This method must not block, access the database, or perform CPU-intensive tasks,
475        // because it is called directly from the tokio executor's Future threads.
476
477        let queued_prev_hash = checkpoint_verified.block.header.previous_block_hash;
478        let queued_height = checkpoint_verified.height;
479
480        // If we're close to the final checkpoint, make the block's UTXOs available for
481        // semantic block verification, even when it is in the channel.
482        if self.is_close_to_final_checkpoint(queued_height) {
483            self.non_finalized_block_write_sent_hashes
484                .add_finalized(&checkpoint_verified)
485        }
486
487        let (rsp_tx, rsp_rx) = oneshot::channel();
488        let queued = (checkpoint_verified, rsp_tx);
489
490        if self.block_write_sender.finalized.is_some() {
491            // We're still committing checkpoint verified blocks
492            if let Some(duplicate_queued) = self
493                .finalized_state_queued_blocks
494                .insert(queued_prev_hash, queued)
495            {
496                Self::send_checkpoint_verified_block_error(
497                    duplicate_queued,
498                    CommitBlockError::new_duplicate(
499                        Some(queued_prev_hash.into()),
500                        KnownBlock::Queue,
501                    ),
502                );
503            }
504
505            self.drain_finalized_queue_and_commit();
506        } else {
507            // We've finished committing checkpoint verified blocks to the finalized state,
508            // so drop any repeated queued blocks, and return an error.
509            //
510            // TODO: track the latest sent height, and drop any blocks under that height
511            //       every time we send some blocks (like QueuedSemanticallyVerifiedBlocks)
512            Self::send_checkpoint_verified_block_error(
513                queued,
514                CommitBlockError::new_duplicate(None, KnownBlock::Finalized),
515            );
516
517            self.clear_finalized_block_queue(CommitBlockError::new_duplicate(
518                None,
519                KnownBlock::Finalized,
520            ));
521        }
522
523        if self.finalized_state_queued_blocks.is_empty() {
524            self.max_finalized_queue_height = f64::NAN;
525        } else if self.max_finalized_queue_height.is_nan()
526            || self.max_finalized_queue_height < queued_height.0 as f64
527        {
528            // if there are still blocks in the queue, then either:
529            //   - the new block was lower than the old maximum, and there was a gap before it,
530            //     so the maximum is still the same (and we skip this code), or
531            //   - the new block is higher than the old maximum, and there is at least one gap
532            //     between the finalized tip and the new maximum
533            self.max_finalized_queue_height = queued_height.0 as f64;
534        }
535
536        metrics::gauge!("state.checkpoint.queued.max.height").set(self.max_finalized_queue_height);
537        metrics::gauge!("state.checkpoint.queued.block.count")
538            .set(self.finalized_state_queued_blocks.len() as f64);
539
540        rsp_rx
541    }
542
543    /// Finds finalized state queue blocks to be committed to the state in order,
544    /// removes them from the queue, and sends them to the block commit task.
545    ///
546    /// After queueing a finalized block, this method checks whether the newly
547    /// queued block (and any of its descendants) can be committed to the state.
548    ///
549    /// Returns an error if the block commit channel has been closed.
550    pub fn drain_finalized_queue_and_commit(&mut self) {
551        use tokio::sync::mpsc::error::{SendError, TryRecvError};
552
553        // # Correctness & Performance
554        //
555        // This method must not block, access the database, or perform CPU-intensive tasks,
556        // because it is called directly from the tokio executor's Future threads.
557
558        // If a block failed, we need to start again from a valid tip.
559        match self.invalid_block_write_reset_receiver.try_recv() {
560            Ok(reset_tip_hash) => self.finalized_block_write_last_sent_hash = reset_tip_hash,
561            Err(TryRecvError::Disconnected) => {
562                info!("Block commit task closed the block reset channel. Is Zebra shutting down?");
563                return;
564            }
565            // There are no errors, so we can just use the last block hash we sent
566            Err(TryRecvError::Empty) => {}
567        }
568
569        while let Some(queued_block) = self
570            .finalized_state_queued_blocks
571            .remove(&self.finalized_block_write_last_sent_hash)
572        {
573            let last_sent_finalized_block_height = queued_block.0.height;
574
575            self.finalized_block_write_last_sent_hash = queued_block.0.hash;
576
577            // If we've finished sending finalized blocks, ignore any repeated blocks.
578            // (Blocks can be repeated after a syncer reset.)
579            if let Some(finalized_block_write_sender) = &self.block_write_sender.finalized {
580                let send_result = finalized_block_write_sender.send(queued_block);
581
582                // If the receiver is closed, we can't send any more blocks.
583                if let Err(SendError(queued)) = send_result {
584                    // If Zebra is shutting down, drop blocks and return an error.
585                    Self::send_checkpoint_verified_block_error(
586                        queued,
587                        CommitBlockError::WriteTaskExited,
588                    );
589
590                    self.clear_finalized_block_queue(CommitBlockError::WriteTaskExited);
591                } else {
592                    metrics::gauge!("state.checkpoint.sent.block.height")
593                        .set(last_sent_finalized_block_height.0 as f64);
594                };
595            }
596        }
597    }
598
599    /// Drops all finalized state queue blocks, and sends an error on their result channels.
600    fn clear_finalized_block_queue(
601        &mut self,
602        error: impl Into<CommitCheckpointVerifiedError> + Clone,
603    ) {
604        for (_hash, queued) in self.finalized_state_queued_blocks.drain() {
605            Self::send_checkpoint_verified_block_error(queued, error.clone());
606        }
607    }
608
609    /// Send an error on a `QueuedCheckpointVerified` block's result channel, and drop the block
610    fn send_checkpoint_verified_block_error(
611        queued: QueuedCheckpointVerified,
612        error: impl Into<CommitCheckpointVerifiedError>,
613    ) {
614        let (finalized, rsp_tx) = queued;
615
616        // The block sender might have already given up on this block,
617        // so ignore any channel send errors.
618        let _ = rsp_tx.send(Err(error.into()));
619        std::mem::drop(finalized);
620    }
621
622    /// Drops all non-finalized state queue blocks, and sends an error on their result channels.
623    fn clear_non_finalized_block_queue(
624        &mut self,
625        error: impl Into<CommitSemanticallyVerifiedError> + Clone,
626    ) {
627        for (_hash, queued) in self.non_finalized_state_queued_blocks.drain() {
628            Self::send_semantically_verified_block_error(queued, error.clone());
629        }
630    }
631
632    /// Send an error on a `QueuedSemanticallyVerified` block's result channel, and drop the block
633    fn send_semantically_verified_block_error(
634        queued: QueuedSemanticallyVerified,
635        error: impl Into<CommitSemanticallyVerifiedError>,
636    ) {
637        let (finalized, rsp_tx) = queued;
638
639        // The block sender might have already given up on this block,
640        // so ignore any channel send errors.
641        let _ = rsp_tx.send(Err(error.into()));
642        std::mem::drop(finalized);
643    }
644
645    /// Queue a semantically verified block for contextual verification and check if any queued
646    /// blocks are ready to be verified and committed to the state.
647    ///
648    /// This function encodes the logic for [committing non-finalized blocks][1]
649    /// in RFC0005.
650    ///
651    /// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks
652    #[instrument(level = "debug", skip(self, semantically_verified))]
653    fn queue_and_commit_to_non_finalized_state(
654        &mut self,
655        semantically_verified: SemanticallyVerifiedBlock,
656    ) -> oneshot::Receiver<Result<block::Hash, CommitSemanticallyVerifiedError>> {
657        tracing::debug!(block = %semantically_verified.block, "queueing block for contextual verification");
658        let parent_hash = semantically_verified.block.header.previous_block_hash;
659
660        if self
661            .non_finalized_block_write_sent_hashes
662            .contains(&semantically_verified.hash)
663        {
664            let (rsp_tx, rsp_rx) = oneshot::channel();
665            let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
666                Some(semantically_verified.hash.into()),
667                KnownBlock::WriteChannel,
668            )
669            .into()));
670            return rsp_rx;
671        }
672
673        if self
674            .read_service
675            .db
676            .contains_height(semantically_verified.height)
677        {
678            let (rsp_tx, rsp_rx) = oneshot::channel();
679            let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
680                Some(semantically_verified.height.into()),
681                KnownBlock::Finalized,
682            )
683            .into()));
684            return rsp_rx;
685        }
686
687        // [`Request::CommitSemanticallyVerifiedBlock`] contract: a request to commit a block which
688        // has been queued but not yet committed to the state fails the older request and replaces
689        // it with the newer request.
690        let rsp_rx = if let Some((_, old_rsp_tx)) = self
691            .non_finalized_state_queued_blocks
692            .get_mut(&semantically_verified.hash)
693        {
694            tracing::debug!("replacing older queued request with new request");
695            let (mut rsp_tx, rsp_rx) = oneshot::channel();
696            std::mem::swap(old_rsp_tx, &mut rsp_tx);
697            let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
698                Some(semantically_verified.hash.into()),
699                KnownBlock::Queue,
700            )
701            .into()));
702            rsp_rx
703        } else {
704            let (rsp_tx, rsp_rx) = oneshot::channel();
705            self.non_finalized_state_queued_blocks
706                .queue((semantically_verified, rsp_tx));
707            rsp_rx
708        };
709
710        // We've finished sending checkpoint verified blocks when:
711        // - we've sent the verified block for the last checkpoint, and
712        // - it has been successfully written to disk.
713        //
714        // We detect the last checkpoint by looking for non-finalized blocks
715        // that are a child of the last block we sent.
716        //
717        // TODO: configure the state with the last checkpoint hash instead?
718        if self.block_write_sender.finalized.is_some()
719            && self
720                .non_finalized_state_queued_blocks
721                .has_queued_children(self.finalized_block_write_last_sent_hash)
722            && self.read_service.db.finalized_tip_hash()
723                == self.finalized_block_write_last_sent_hash
724        {
725            // Tell the block write task to stop committing checkpoint verified blocks to the finalized state,
726            // and move on to committing semantically verified blocks to the non-finalized state.
727            std::mem::drop(self.block_write_sender.finalized.take());
728            // Remove any checkpoint-verified block hashes from `non_finalized_block_write_sent_hashes`.
729            self.non_finalized_block_write_sent_hashes = SentHashes::default();
730            // Mark `SentHashes` as usable by the `can_fork_chain_at()` method.
731            self.non_finalized_block_write_sent_hashes
732                .can_fork_chain_at_hashes = true;
733            // Send blocks from non-finalized queue
734            self.send_ready_non_finalized_queued(self.finalized_block_write_last_sent_hash);
735            // We've finished committing checkpoint verified blocks to finalized state, so drop any repeated queued blocks.
736            self.clear_finalized_block_queue(CommitBlockError::new_duplicate(
737                None,
738                KnownBlock::Finalized,
739            ));
740        } else if !self.can_fork_chain_at(&parent_hash) {
741            tracing::trace!("unready to verify, returning early");
742        } else if self.block_write_sender.finalized.is_none() {
743            // Wait until block commit task is ready to write non-finalized blocks before dequeuing them
744            self.send_ready_non_finalized_queued(parent_hash);
745
746            let finalized_tip_height = self.read_service.db.finalized_tip_height().expect(
747                "Finalized state must have at least one block before committing non-finalized state",
748            );
749
750            self.non_finalized_state_queued_blocks
751                .prune_by_height(finalized_tip_height);
752
753            self.non_finalized_block_write_sent_hashes
754                .prune_by_height(finalized_tip_height);
755        }
756
757        rsp_rx
758    }
759
760    /// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks.
761    fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
762        self.non_finalized_block_write_sent_hashes
763            .can_fork_chain_at(hash)
764            || &self.read_service.db.finalized_tip_hash() == hash
765    }
766
767    /// Returns `true` if `queued_height` is near the final checkpoint.
768    ///
769    /// The semantic block verifier needs access to UTXOs from checkpoint verified blocks
770    /// near the final checkpoint, so that it can verify blocks that spend those UTXOs.
771    ///
772    /// If it doesn't have the required UTXOs, some blocks will time out,
773    /// but succeed after a syncer restart.
774    fn is_close_to_final_checkpoint(&self, queued_height: block::Height) -> bool {
775        queued_height >= self.full_verifier_utxo_lookahead
776    }
777
778    /// Sends all queued blocks whose parents have recently arrived starting from `new_parent`
779    /// in breadth-first ordering to the block write task which will attempt to validate and commit them
780    #[tracing::instrument(level = "debug", skip(self, new_parent))]
781    fn send_ready_non_finalized_queued(&mut self, new_parent: block::Hash) {
782        use tokio::sync::mpsc::error::SendError;
783        if let Some(non_finalized_block_write_sender) = &self.block_write_sender.non_finalized {
784            let mut new_parents: Vec<block::Hash> = vec![new_parent];
785
786            while let Some(parent_hash) = new_parents.pop() {
787                let queued_children = self
788                    .non_finalized_state_queued_blocks
789                    .dequeue_children(parent_hash);
790
791                for queued_child in queued_children {
792                    let (SemanticallyVerifiedBlock { hash, .. }, _) = queued_child;
793
794                    self.non_finalized_block_write_sent_hashes
795                        .add(&queued_child.0);
796                    let send_result = non_finalized_block_write_sender.send(queued_child.into());
797
798                    if let Err(SendError(NonFinalizedWriteMessage::Commit(queued))) = send_result {
799                        // If Zebra is shutting down, drop blocks and return an error.
800                        Self::send_semantically_verified_block_error(
801                            queued,
802                            CommitBlockError::WriteTaskExited,
803                        );
804
805                        self.clear_non_finalized_block_queue(CommitBlockError::WriteTaskExited);
806
807                        return;
808                    };
809
810                    new_parents.push(hash);
811                }
812            }
813
814            self.non_finalized_block_write_sent_hashes.finish_batch();
815        };
816    }
817
818    /// Return the tip of the current best chain.
819    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
820        self.read_service.best_tip()
821    }
822
823    fn send_invalidate_block(
824        &self,
825        hash: block::Hash,
826    ) -> oneshot::Receiver<Result<block::Hash, InvalidateError>> {
827        let (rsp_tx, rsp_rx) = oneshot::channel();
828
829        let Some(sender) = &self.block_write_sender.non_finalized else {
830            let _ = rsp_tx.send(Err(InvalidateError::ProcessingCheckpointedBlocks));
831            return rsp_rx;
832        };
833
834        if let Err(tokio::sync::mpsc::error::SendError(error)) =
835            sender.send(NonFinalizedWriteMessage::Invalidate { hash, rsp_tx })
836        {
837            let NonFinalizedWriteMessage::Invalidate { rsp_tx, .. } = error else {
838                unreachable!("should return the same Invalidate message could not be sent");
839            };
840
841            let _ = rsp_tx.send(Err(InvalidateError::SendInvalidateRequestFailed));
842        }
843
844        rsp_rx
845    }
846
847    fn send_reconsider_block(
848        &self,
849        hash: block::Hash,
850    ) -> oneshot::Receiver<Result<Vec<block::Hash>, ReconsiderError>> {
851        let (rsp_tx, rsp_rx) = oneshot::channel();
852
853        let Some(sender) = &self.block_write_sender.non_finalized else {
854            let _ = rsp_tx.send(Err(ReconsiderError::CheckpointCommitInProgress));
855            return rsp_rx;
856        };
857
858        if let Err(tokio::sync::mpsc::error::SendError(error)) =
859            sender.send(NonFinalizedWriteMessage::Reconsider { hash, rsp_tx })
860        {
861            let NonFinalizedWriteMessage::Reconsider { rsp_tx, .. } = error else {
862                unreachable!("should return the same Reconsider message could not be sent");
863            };
864
865            let _ = rsp_tx.send(Err(ReconsiderError::ReconsiderSendFailed));
866        }
867
868        rsp_rx
869    }
870
871    /// Assert some assumptions about the semantically verified `block` before it is queued.
872    fn assert_block_can_be_validated(&self, block: &SemanticallyVerifiedBlock) {
873        // required by `Request::CommitSemanticallyVerifiedBlock` call
874        assert!(
875            block.height > self.network.mandatory_checkpoint_height(),
876            "invalid semantically verified block height: the canopy checkpoint is mandatory, pre-canopy \
877            blocks, and the canopy activation block, must be committed to the state as finalized \
878            blocks"
879        );
880    }
881
882    fn known_sent_hash(&self, hash: &block::Hash) -> Option<KnownBlock> {
883        self.non_finalized_block_write_sent_hashes
884            .contains(hash)
885            .then_some(KnownBlock::WriteChannel)
886    }
887}
888
889impl ReadStateService {
890    /// Creates a new read-only state service, using the provided finalized state and
891    /// block write task handle.
892    ///
893    /// Returns the newly created service,
894    /// and a watch channel for updating the shared recent non-finalized chain.
895    pub(crate) fn new(
896        finalized_state: &FinalizedState,
897        block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
898        non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
899    ) -> Self {
900        let read_service = Self {
901            network: finalized_state.network(),
902            db: finalized_state.db.clone(),
903            non_finalized_state_receiver,
904            block_write_task,
905        };
906
907        tracing::debug!("created new read-only state service");
908
909        read_service
910    }
911
912    /// Return the tip of the current best chain.
913    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
914        read::best_tip(&self.latest_non_finalized_state(), &self.db)
915    }
916
917    /// Gets a clone of the latest non-finalized state from the `non_finalized_state_receiver`
918    fn latest_non_finalized_state(&self) -> NonFinalizedState {
919        self.non_finalized_state_receiver.cloned_watch_data()
920    }
921
922    /// Gets a clone of the latest, best non-finalized chain from the `non_finalized_state_receiver`
923    #[allow(dead_code)]
924    fn latest_best_chain(&self) -> Option<Arc<Chain>> {
925        self.latest_non_finalized_state().best_chain().cloned()
926    }
927
928    /// Test-only access to the inner database.
929    /// Can be used to modify the database without doing any consensus checks.
930    #[cfg(any(test, feature = "proptest-impl"))]
931    pub fn db(&self) -> &ZebraDb {
932        &self.db
933    }
934
935    /// Logs rocksdb metrics using the read only state service.
936    pub fn log_db_metrics(&self) {
937        self.db.print_db_metrics();
938    }
939}
940
941impl Service<Request> for StateService {
942    type Response = Response;
943    type Error = BoxError;
944    type Future =
945        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
946
947    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
948        // Check for panics in the block write task
949        let poll = self.read_service.poll_ready(cx);
950
951        // Prune outdated UTXO requests
952        let now = Instant::now();
953
954        if self.last_prune + Self::PRUNE_INTERVAL < now {
955            let tip = self.best_tip();
956            let old_len = self.pending_utxos.len();
957
958            self.pending_utxos.prune();
959            self.last_prune = now;
960
961            let new_len = self.pending_utxos.len();
962            let prune_count = old_len
963                .checked_sub(new_len)
964                .expect("prune does not add any utxo requests");
965            if prune_count > 0 {
966                tracing::debug!(
967                    ?old_len,
968                    ?new_len,
969                    ?prune_count,
970                    ?tip,
971                    "pruned utxo requests"
972                );
973            } else {
974                tracing::debug!(len = ?old_len, ?tip, "no utxo requests needed pruning");
975            }
976        }
977
978        poll
979    }
980
981    #[instrument(name = "state", skip(self, req))]
982    fn call(&mut self, req: Request) -> Self::Future {
983        req.count_metric();
984        let timer = CodeTimer::start();
985        let span = Span::current();
986
987        match req {
988            // Uses non_finalized_state_queued_blocks and pending_utxos in the StateService
989            // Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb.
990            //
991            // The expected error type for this request is `CommitSemanticallyVerifiedError`.
992            Request::CommitSemanticallyVerifiedBlock(semantically_verified) => {
993                self.assert_block_can_be_validated(&semantically_verified);
994
995                self.pending_utxos
996                    .check_against_ordered(&semantically_verified.new_outputs);
997
998                // # Performance
999                //
1000                // Allow other async tasks to make progress while blocks are being verified
1001                // and written to disk. But wait for the blocks to finish committing,
1002                // so that `StateService` multi-block queries always observe a consistent state.
1003                //
1004                // Since each block is spawned into its own task,
1005                // there shouldn't be any other code running in the same task,
1006                // so we don't need to worry about blocking it:
1007                // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
1008
1009                let rsp_rx = tokio::task::block_in_place(move || {
1010                    span.in_scope(|| {
1011                        self.queue_and_commit_to_non_finalized_state(semantically_verified)
1012                    })
1013                });
1014
1015                // TODO:
1016                //   - check for panics in the block write task here,
1017                //     as well as in poll_ready()
1018
1019                // The work is all done, the future just waits on a channel for the result
1020                timer.finish(module_path!(), line!(), "CommitSemanticallyVerifiedBlock");
1021
1022                // Await the channel response, flatten the result, map receive errors to
1023                // `CommitSemanticallyVerifiedError::WriteTaskExited`.
1024                // Then flatten the nested Result and convert any errors to a BoxError.
1025                let span = Span::current();
1026                async move {
1027                    rsp_rx
1028                        .await
1029                        .map_err(|_recv_error| CommitBlockError::WriteTaskExited.into())
1030                        .and_then(|result| result)
1031                        .map_err(BoxError::from)
1032                        .map(Response::Committed)
1033                }
1034                .instrument(span)
1035                .boxed()
1036            }
1037
1038            // Uses finalized_state_queued_blocks and pending_utxos in the StateService.
1039            // Accesses shared writeable state in the StateService.
1040            //
1041            // The expected error type for this request is `CommitCheckpointVerifiedError`.
1042            Request::CommitCheckpointVerifiedBlock(finalized) => {
1043                // # Consensus
1044                //
1045                // A semantic block verification could have called AwaitUtxo
1046                // before this checkpoint verified block arrived in the state.
1047                // So we need to check for pending UTXO requests sent by running
1048                // semantic block verifications.
1049                //
1050                // This check is redundant for most checkpoint verified blocks,
1051                // because semantic verification can only succeed near the final
1052                // checkpoint, when all the UTXOs are available for the verifying block.
1053                //
1054                // (Checkpoint block UTXOs are verified using block hash checkpoints
1055                // and transaction merkle tree block header commitments.)
1056                self.pending_utxos
1057                    .check_against_ordered(&finalized.new_outputs);
1058
1059                // # Performance
1060                //
1061                // This method doesn't block, access the database, or perform CPU-intensive tasks,
1062                // so we can run it directly in the tokio executor's Future threads.
1063                let rsp_rx = self.queue_and_commit_to_finalized_state(finalized);
1064
1065                // TODO:
1066                //   - check for panics in the block write task here,
1067                //     as well as in poll_ready()
1068
1069                // The work is all done, the future just waits on a channel for the result
1070                timer.finish(module_path!(), line!(), "CommitCheckpointVerifiedBlock");
1071
1072                // Await the channel response, flatten the result, map receive errors to
1073                // `CommitCheckpointVerifiedError::WriteTaskExited`.
1074                // Then flatten the nested Result and convert any errors to a BoxError.
1075                async move {
1076                    rsp_rx
1077                        .await
1078                        .map_err(|_recv_error| CommitBlockError::WriteTaskExited.into())
1079                        .and_then(|result| result)
1080                        .map_err(BoxError::from)
1081                        .map(Response::Committed)
1082                }
1083                .instrument(span)
1084                .boxed()
1085            }
1086
1087            // Uses pending_utxos and non_finalized_state_queued_blocks in the StateService.
1088            // If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService.
1089            Request::AwaitUtxo(outpoint) => {
1090                // Prepare the AwaitUtxo future from PendingUxtos.
1091                let response_fut = self.pending_utxos.queue(outpoint);
1092                // Only instrument `response_fut`, the ReadStateService already
1093                // instruments its requests with the same span.
1094
1095                let response_fut = response_fut.instrument(span).boxed();
1096
1097                // Check the non-finalized block queue outside the returned future,
1098                // so we can access mutable state fields.
1099                if let Some(utxo) = self.non_finalized_state_queued_blocks.utxo(&outpoint) {
1100                    self.pending_utxos.respond(&outpoint, utxo);
1101
1102                    // We're finished, the returned future gets the UTXO from the respond() channel.
1103                    timer.finish(module_path!(), line!(), "AwaitUtxo/queued-non-finalized");
1104
1105                    return response_fut;
1106                }
1107
1108                // Check the sent non-finalized blocks
1109                if let Some(utxo) = self.non_finalized_block_write_sent_hashes.utxo(&outpoint) {
1110                    self.pending_utxos.respond(&outpoint, utxo);
1111
1112                    // We're finished, the returned future gets the UTXO from the respond() channel.
1113                    timer.finish(module_path!(), line!(), "AwaitUtxo/sent-non-finalized");
1114
1115                    return response_fut;
1116                }
1117
1118                // We ignore any UTXOs in FinalizedState.finalized_state_queued_blocks,
1119                // because it is only used during checkpoint verification.
1120                //
1121                // This creates a rare race condition, but it doesn't seem to happen much in practice.
1122                // See #5126 for details.
1123
1124                // Manually send a request to the ReadStateService,
1125                // to get UTXOs from any non-finalized chain or the finalized chain.
1126                let read_service = self.read_service.clone();
1127
1128                // Run the request in an async block, so we can await the response.
1129                async move {
1130                    let req = ReadRequest::AnyChainUtxo(outpoint);
1131
1132                    let rsp = read_service.oneshot(req).await?;
1133
1134                    // Optional TODO:
1135                    //  - make pending_utxos.respond() async using a channel,
1136                    //    so we can respond to all waiting requests here
1137                    //
1138                    // This change is not required for correctness, because:
1139                    // - any waiting requests should have returned when the block was sent to the state
1140                    // - otherwise, the request returns immediately if:
1141                    //   - the block is in the non-finalized queue, or
1142                    //   - the block is in any non-finalized chain or the finalized state
1143                    //
1144                    // And if the block is in the finalized queue,
1145                    // that's rare enough that a retry is ok.
1146                    if let ReadResponse::AnyChainUtxo(Some(utxo)) = rsp {
1147                        // We got a UTXO, so we replace the response future with the result own.
1148                        timer.finish(module_path!(), line!(), "AwaitUtxo/any-chain");
1149
1150                        return Ok(Response::Utxo(utxo));
1151                    }
1152
1153                    // We're finished, but the returned future is waiting on the respond() channel.
1154                    timer.finish(module_path!(), line!(), "AwaitUtxo/waiting");
1155
1156                    response_fut.await
1157                }
1158                .boxed()
1159            }
1160
1161            // Used by sync, inbound, and block verifier to check if a block is already in the state
1162            // before downloading or validating it.
1163            Request::KnownBlock(hash) => {
1164                let timer = CodeTimer::start();
1165
1166                let sent_hash_response = self.known_sent_hash(&hash);
1167                let read_service = self.read_service.clone();
1168
1169                async move {
1170                    if sent_hash_response.is_some() {
1171                        return Ok(Response::KnownBlock(sent_hash_response));
1172                    };
1173
1174                    let response = read::non_finalized_state_contains_block_hash(
1175                        &read_service.latest_non_finalized_state(),
1176                        hash,
1177                    )
1178                    .or_else(|| read::finalized_state_contains_block_hash(&read_service.db, hash));
1179
1180                    // The work is done in the future.
1181                    timer.finish(module_path!(), line!(), "Request::KnownBlock");
1182
1183                    Ok(Response::KnownBlock(response))
1184                }
1185                .boxed()
1186            }
1187
1188            // The expected error type for this request is `InvalidateError`
1189            Request::InvalidateBlock(block_hash) => {
1190                let rsp_rx = tokio::task::block_in_place(move || {
1191                    span.in_scope(|| self.send_invalidate_block(block_hash))
1192                });
1193
1194                // Await the channel response, flatten the result, map receive errors to
1195                // `InvalidateError::InvalidateRequestDropped`.
1196                // Then flatten the nested Result and convert any errors to a BoxError.
1197                let span = Span::current();
1198                async move {
1199                    rsp_rx
1200                        .await
1201                        .map_err(|_recv_error| InvalidateError::InvalidateRequestDropped)
1202                        .and_then(|result| result)
1203                        .map_err(BoxError::from)
1204                        .map(Response::Invalidated)
1205                }
1206                .instrument(span)
1207                .boxed()
1208            }
1209
1210            // The expected error type for this request is `ReconsiderError`
1211            Request::ReconsiderBlock(block_hash) => {
1212                let rsp_rx = tokio::task::block_in_place(move || {
1213                    span.in_scope(|| self.send_reconsider_block(block_hash))
1214                });
1215
1216                // Await the channel response, flatten the result, map receive errors to
1217                // `ReconsiderError::ReconsiderResponseDropped`.
1218                // Then flatten the nested Result and convert any errors to a BoxError.
1219                let span = Span::current();
1220                async move {
1221                    rsp_rx
1222                        .await
1223                        .map_err(|_recv_error| ReconsiderError::ReconsiderResponseDropped)
1224                        .and_then(|result| result)
1225                        .map_err(BoxError::from)
1226                        .map(Response::Reconsidered)
1227                }
1228                .instrument(span)
1229                .boxed()
1230            }
1231
1232            // Runs concurrently using the ReadStateService
1233            Request::Tip
1234            | Request::Depth(_)
1235            | Request::BestChainNextMedianTimePast
1236            | Request::BestChainBlockHash(_)
1237            | Request::BlockLocator
1238            | Request::Transaction(_)
1239            | Request::AnyChainTransaction(_)
1240            | Request::UnspentBestChainUtxo(_)
1241            | Request::Block(_)
1242            | Request::BlockAndSize(_)
1243            | Request::BlockHeader(_)
1244            | Request::FindBlockHashes { .. }
1245            | Request::FindBlockHeaders { .. }
1246            | Request::CheckBestChainTipNullifiersAndAnchors(_) => {
1247                // Redirect the request to the concurrent ReadStateService
1248                let read_service = self.read_service.clone();
1249
1250                async move {
1251                    let req = req
1252                        .try_into()
1253                        .expect("ReadRequest conversion should not fail");
1254
1255                    let rsp = read_service.oneshot(req).await?;
1256                    let rsp = rsp.try_into().expect("Response conversion should not fail");
1257
1258                    Ok(rsp)
1259                }
1260                .boxed()
1261            }
1262
1263            Request::CheckBlockProposalValidity(_) => {
1264                // Redirect the request to the concurrent ReadStateService
1265                let read_service = self.read_service.clone();
1266
1267                async move {
1268                    let req = req
1269                        .try_into()
1270                        .expect("ReadRequest conversion should not fail");
1271
1272                    let rsp = read_service.oneshot(req).await?;
1273                    let rsp = rsp.try_into().expect("Response conversion should not fail");
1274
1275                    Ok(rsp)
1276                }
1277                .boxed()
1278            }
1279        }
1280    }
1281}
1282
1283impl Service<ReadRequest> for ReadStateService {
1284    type Response = ReadResponse;
1285    type Error = BoxError;
1286    type Future =
1287        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
1288
1289    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1290        // Check for panics in the block write task
1291        //
1292        // TODO: move into a check_for_panics() method
1293        let block_write_task = self.block_write_task.take();
1294
1295        if let Some(block_write_task) = block_write_task {
1296            if block_write_task.is_finished() {
1297                if let Some(block_write_task) = Arc::into_inner(block_write_task) {
1298                    // We are the last state with a reference to this task, so we can propagate any panics
1299                    if let Err(thread_panic) = block_write_task.join() {
1300                        std::panic::resume_unwind(thread_panic);
1301                    }
1302                }
1303            } else {
1304                // It hasn't finished, so we need to put it back
1305                self.block_write_task = Some(block_write_task);
1306            }
1307        }
1308
1309        self.db.check_for_panics();
1310
1311        Poll::Ready(Ok(()))
1312    }
1313
1314    #[instrument(name = "read_state", skip(self, req))]
1315    fn call(&mut self, req: ReadRequest) -> Self::Future {
1316        req.count_metric();
1317        let timer = CodeTimer::start();
1318        let span = Span::current();
1319
1320        match req {
1321            // Used by the `getblockchaininfo` RPC.
1322            ReadRequest::UsageInfo => {
1323                let db = self.db.clone();
1324
1325                tokio::task::spawn_blocking(move || {
1326                    span.in_scope(move || {
1327                        // The work is done in the future.
1328
1329                        let db_size = db.size();
1330
1331                        timer.finish(module_path!(), line!(), "ReadRequest::UsageInfo");
1332
1333                        Ok(ReadResponse::UsageInfo(db_size))
1334                    })
1335                })
1336                .wait_for_panics()
1337            }
1338
1339            // Used by the StateService.
1340            ReadRequest::Tip => {
1341                let state = self.clone();
1342
1343                tokio::task::spawn_blocking(move || {
1344                    span.in_scope(move || {
1345                        let tip = state.non_finalized_state_receiver.with_watch_data(
1346                            |non_finalized_state| {
1347                                read::tip(non_finalized_state.best_chain(), &state.db)
1348                            },
1349                        );
1350
1351                        // The work is done in the future.
1352                        timer.finish(module_path!(), line!(), "ReadRequest::Tip");
1353
1354                        Ok(ReadResponse::Tip(tip))
1355                    })
1356                })
1357                .wait_for_panics()
1358            }
1359
1360            // Used by `getblockchaininfo` RPC method.
1361            ReadRequest::TipPoolValues => {
1362                let state = self.clone();
1363
1364                tokio::task::spawn_blocking(move || {
1365                    span.in_scope(move || {
1366                        let tip_with_value_balance = state
1367                            .non_finalized_state_receiver
1368                            .with_watch_data(|non_finalized_state| {
1369                                read::tip_with_value_balance(
1370                                    non_finalized_state.best_chain(),
1371                                    &state.db,
1372                                )
1373                            });
1374
1375                        // The work is done in the future.
1376                        // TODO: Do this in the Drop impl with the variant name?
1377                        timer.finish(module_path!(), line!(), "ReadRequest::TipPoolValues");
1378
1379                        let (tip_height, tip_hash, value_balance) = tip_with_value_balance?
1380                            .ok_or(BoxError::from("no chain tip available yet"))?;
1381
1382                        Ok(ReadResponse::TipPoolValues {
1383                            tip_height,
1384                            tip_hash,
1385                            value_balance,
1386                        })
1387                    })
1388                })
1389                .wait_for_panics()
1390            }
1391
1392            // Used by getblock
1393            ReadRequest::BlockInfo(hash_or_height) => {
1394                let state = self.clone();
1395
1396                tokio::task::spawn_blocking(move || {
1397                    span.in_scope(move || {
1398                        let value_balance = state.non_finalized_state_receiver.with_watch_data(
1399                            |non_finalized_state| {
1400                                read::block_info(
1401                                    non_finalized_state.best_chain(),
1402                                    &state.db,
1403                                    hash_or_height,
1404                                )
1405                            },
1406                        );
1407
1408                        // The work is done in the future.
1409                        // TODO: Do this in the Drop impl with the variant name?
1410                        timer.finish(module_path!(), line!(), "ReadRequest::BlockInfo");
1411
1412                        Ok(ReadResponse::BlockInfo(value_balance))
1413                    })
1414                })
1415                .wait_for_panics()
1416            }
1417
1418            // Used by the StateService.
1419            ReadRequest::Depth(hash) => {
1420                let state = self.clone();
1421
1422                tokio::task::spawn_blocking(move || {
1423                    span.in_scope(move || {
1424                        let depth = state.non_finalized_state_receiver.with_watch_data(
1425                            |non_finalized_state| {
1426                                read::depth(non_finalized_state.best_chain(), &state.db, hash)
1427                            },
1428                        );
1429
1430                        // The work is done in the future.
1431                        timer.finish(module_path!(), line!(), "ReadRequest::Depth");
1432
1433                        Ok(ReadResponse::Depth(depth))
1434                    })
1435                })
1436                .wait_for_panics()
1437            }
1438
1439            // Used by the StateService.
1440            ReadRequest::BestChainNextMedianTimePast => {
1441                let state = self.clone();
1442
1443                tokio::task::spawn_blocking(move || {
1444                    span.in_scope(move || {
1445                        let non_finalized_state = state.latest_non_finalized_state();
1446                        let median_time_past =
1447                            read::next_median_time_past(&non_finalized_state, &state.db);
1448
1449                        // The work is done in the future.
1450                        timer.finish(
1451                            module_path!(),
1452                            line!(),
1453                            "ReadRequest::BestChainNextMedianTimePast",
1454                        );
1455
1456                        Ok(ReadResponse::BestChainNextMedianTimePast(median_time_past?))
1457                    })
1458                })
1459                .wait_for_panics()
1460            }
1461
1462            // Used by the get_block (raw) RPC and the StateService.
1463            ReadRequest::Block(hash_or_height) => {
1464                let state = self.clone();
1465
1466                tokio::task::spawn_blocking(move || {
1467                    span.in_scope(move || {
1468                        let block = state.non_finalized_state_receiver.with_watch_data(
1469                            |non_finalized_state| {
1470                                read::block(
1471                                    non_finalized_state.best_chain(),
1472                                    &state.db,
1473                                    hash_or_height,
1474                                )
1475                            },
1476                        );
1477
1478                        // The work is done in the future.
1479                        timer.finish(module_path!(), line!(), "ReadRequest::Block");
1480
1481                        Ok(ReadResponse::Block(block))
1482                    })
1483                })
1484                .wait_for_panics()
1485            }
1486
1487            // Used by the get_block (raw) RPC and the StateService.
1488            ReadRequest::BlockAndSize(hash_or_height) => {
1489                let state = self.clone();
1490
1491                tokio::task::spawn_blocking(move || {
1492                    span.in_scope(move || {
1493                        let block_and_size = state.non_finalized_state_receiver.with_watch_data(
1494                            |non_finalized_state| {
1495                                read::block_and_size(
1496                                    non_finalized_state.best_chain(),
1497                                    &state.db,
1498                                    hash_or_height,
1499                                )
1500                            },
1501                        );
1502
1503                        // The work is done in the future.
1504                        timer.finish(module_path!(), line!(), "ReadRequest::BlockAndSize");
1505
1506                        Ok(ReadResponse::BlockAndSize(block_and_size))
1507                    })
1508                })
1509                .wait_for_panics()
1510            }
1511
1512            // Used by the get_block (verbose) RPC and the StateService.
1513            ReadRequest::BlockHeader(hash_or_height) => {
1514                let state = self.clone();
1515
1516                tokio::task::spawn_blocking(move || {
1517                    span.in_scope(move || {
1518                        let best_chain = state.latest_best_chain();
1519
1520                        let height = hash_or_height
1521                            .height_or_else(|hash| {
1522                                read::find::height_by_hash(best_chain.clone(), &state.db, hash)
1523                            })
1524                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1525
1526                        let hash = hash_or_height
1527                            .hash_or_else(|height| {
1528                                read::find::hash_by_height(best_chain.clone(), &state.db, height)
1529                            })
1530                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1531
1532                        let next_height = height.next()?;
1533                        let next_block_hash =
1534                            read::find::hash_by_height(best_chain.clone(), &state.db, next_height);
1535
1536                        let header = read::block_header(best_chain, &state.db, height.into())
1537                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1538
1539                        // The work is done in the future.
1540                        timer.finish(module_path!(), line!(), "ReadRequest::Block");
1541
1542                        Ok(ReadResponse::BlockHeader {
1543                            header,
1544                            hash,
1545                            height,
1546                            next_block_hash,
1547                        })
1548                    })
1549                })
1550                .wait_for_panics()
1551            }
1552
1553            // For the get_raw_transaction RPC and the StateService.
1554            ReadRequest::Transaction(hash) => {
1555                let state = self.clone();
1556
1557                tokio::task::spawn_blocking(move || {
1558                    span.in_scope(move || {
1559                        let response =
1560                            read::mined_transaction(state.latest_best_chain(), &state.db, hash);
1561
1562                        // The work is done in the future.
1563                        timer.finish(module_path!(), line!(), "ReadRequest::Transaction");
1564
1565                        Ok(ReadResponse::Transaction(response))
1566                    })
1567                })
1568                .wait_for_panics()
1569            }
1570
1571            ReadRequest::AnyChainTransaction(hash) => {
1572                let state = self.clone();
1573
1574                tokio::task::spawn_blocking(move || {
1575                    span.in_scope(move || {
1576                        let tx = state.non_finalized_state_receiver.with_watch_data(
1577                            |non_finalized_state| {
1578                                read::any_transaction(
1579                                    non_finalized_state.chain_iter(),
1580                                    &state.db,
1581                                    hash,
1582                                )
1583                            },
1584                        );
1585
1586                        // The work is done in the future.
1587                        timer.finish(module_path!(), line!(), "ReadRequest::AnyChainTransaction");
1588
1589                        Ok(ReadResponse::AnyChainTransaction(tx))
1590                    })
1591                })
1592                .wait_for_panics()
1593            }
1594
1595            // Used by the getblock (verbose) RPC.
1596            ReadRequest::TransactionIdsForBlock(hash_or_height) => {
1597                let state = self.clone();
1598
1599                tokio::task::spawn_blocking(move || {
1600                    span.in_scope(move || {
1601                        let transaction_ids = state.non_finalized_state_receiver.with_watch_data(
1602                            |non_finalized_state| {
1603                                read::transaction_hashes_for_block(
1604                                    non_finalized_state.best_chain(),
1605                                    &state.db,
1606                                    hash_or_height,
1607                                )
1608                            },
1609                        );
1610
1611                        // The work is done in the future.
1612                        timer.finish(
1613                            module_path!(),
1614                            line!(),
1615                            "ReadRequest::TransactionIdsForBlock",
1616                        );
1617
1618                        Ok(ReadResponse::TransactionIdsForBlock(transaction_ids))
1619                    })
1620                })
1621                .wait_for_panics()
1622            }
1623
1624            ReadRequest::AnyChainTransactionIdsForBlock(hash_or_height) => {
1625                let state = self.clone();
1626
1627                tokio::task::spawn_blocking(move || {
1628                    span.in_scope(move || {
1629                        let transaction_ids = state.non_finalized_state_receiver.with_watch_data(
1630                            |non_finalized_state| {
1631                                read::transaction_hashes_for_any_block(
1632                                    non_finalized_state.chain_iter(),
1633                                    &state.db,
1634                                    hash_or_height,
1635                                )
1636                            },
1637                        );
1638
1639                        // The work is done in the future.
1640                        timer.finish(
1641                            module_path!(),
1642                            line!(),
1643                            "ReadRequest::AnyChainTransactionIdsForBlock",
1644                        );
1645
1646                        Ok(ReadResponse::AnyChainTransactionIdsForBlock(
1647                            transaction_ids,
1648                        ))
1649                    })
1650                })
1651                .wait_for_panics()
1652            }
1653
1654            #[cfg(feature = "indexer")]
1655            ReadRequest::SpendingTransactionId(spend) => {
1656                let state = self.clone();
1657
1658                tokio::task::spawn_blocking(move || {
1659                    span.in_scope(move || {
1660                        let spending_transaction_id = state
1661                            .non_finalized_state_receiver
1662                            .with_watch_data(|non_finalized_state| {
1663                                read::spending_transaction_hash(
1664                                    non_finalized_state.best_chain(),
1665                                    &state.db,
1666                                    spend,
1667                                )
1668                            });
1669
1670                        // The work is done in the future.
1671                        timer.finish(
1672                            module_path!(),
1673                            line!(),
1674                            "ReadRequest::TransactionIdForSpentOutPoint",
1675                        );
1676
1677                        Ok(ReadResponse::TransactionId(spending_transaction_id))
1678                    })
1679                })
1680                .wait_for_panics()
1681            }
1682
1683            ReadRequest::UnspentBestChainUtxo(outpoint) => {
1684                let state = self.clone();
1685
1686                tokio::task::spawn_blocking(move || {
1687                    span.in_scope(move || {
1688                        let utxo = state.non_finalized_state_receiver.with_watch_data(
1689                            |non_finalized_state| {
1690                                read::unspent_utxo(
1691                                    non_finalized_state.best_chain(),
1692                                    &state.db,
1693                                    outpoint,
1694                                )
1695                            },
1696                        );
1697
1698                        // The work is done in the future.
1699                        timer.finish(module_path!(), line!(), "ReadRequest::UnspentBestChainUtxo");
1700
1701                        Ok(ReadResponse::UnspentBestChainUtxo(utxo))
1702                    })
1703                })
1704                .wait_for_panics()
1705            }
1706
1707            // Manually used by the StateService to implement part of AwaitUtxo.
1708            ReadRequest::AnyChainUtxo(outpoint) => {
1709                let state = self.clone();
1710
1711                tokio::task::spawn_blocking(move || {
1712                    span.in_scope(move || {
1713                        let utxo = state.non_finalized_state_receiver.with_watch_data(
1714                            |non_finalized_state| {
1715                                read::any_utxo(non_finalized_state, &state.db, outpoint)
1716                            },
1717                        );
1718
1719                        // The work is done in the future.
1720                        timer.finish(module_path!(), line!(), "ReadRequest::AnyChainUtxo");
1721
1722                        Ok(ReadResponse::AnyChainUtxo(utxo))
1723                    })
1724                })
1725                .wait_for_panics()
1726            }
1727
1728            // Used by the StateService.
1729            ReadRequest::BlockLocator => {
1730                let state = self.clone();
1731
1732                tokio::task::spawn_blocking(move || {
1733                    span.in_scope(move || {
1734                        let block_locator = state.non_finalized_state_receiver.with_watch_data(
1735                            |non_finalized_state| {
1736                                read::block_locator(non_finalized_state.best_chain(), &state.db)
1737                            },
1738                        );
1739
1740                        // The work is done in the future.
1741                        timer.finish(module_path!(), line!(), "ReadRequest::BlockLocator");
1742
1743                        Ok(ReadResponse::BlockLocator(
1744                            block_locator.unwrap_or_default(),
1745                        ))
1746                    })
1747                })
1748                .wait_for_panics()
1749            }
1750
1751            // Used by the StateService.
1752            ReadRequest::FindBlockHashes { known_blocks, stop } => {
1753                let state = self.clone();
1754
1755                tokio::task::spawn_blocking(move || {
1756                    span.in_scope(move || {
1757                        let block_hashes = state.non_finalized_state_receiver.with_watch_data(
1758                            |non_finalized_state| {
1759                                read::find_chain_hashes(
1760                                    non_finalized_state.best_chain(),
1761                                    &state.db,
1762                                    known_blocks,
1763                                    stop,
1764                                    MAX_FIND_BLOCK_HASHES_RESULTS,
1765                                )
1766                            },
1767                        );
1768
1769                        // The work is done in the future.
1770                        timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHashes");
1771
1772                        Ok(ReadResponse::BlockHashes(block_hashes))
1773                    })
1774                })
1775                .wait_for_panics()
1776            }
1777
1778            // Used by the StateService.
1779            ReadRequest::FindBlockHeaders { known_blocks, stop } => {
1780                let state = self.clone();
1781
1782                tokio::task::spawn_blocking(move || {
1783                    span.in_scope(move || {
1784                        let block_headers = state.non_finalized_state_receiver.with_watch_data(
1785                            |non_finalized_state| {
1786                                read::find_chain_headers(
1787                                    non_finalized_state.best_chain(),
1788                                    &state.db,
1789                                    known_blocks,
1790                                    stop,
1791                                    MAX_FIND_BLOCK_HEADERS_RESULTS,
1792                                )
1793                            },
1794                        );
1795
1796                        let block_headers = block_headers
1797                            .into_iter()
1798                            .map(|header| CountedHeader { header })
1799                            .collect();
1800
1801                        // The work is done in the future.
1802                        timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHeaders");
1803
1804                        Ok(ReadResponse::BlockHeaders(block_headers))
1805                    })
1806                })
1807                .wait_for_panics()
1808            }
1809
1810            ReadRequest::SaplingTree(hash_or_height) => {
1811                let state = self.clone();
1812
1813                tokio::task::spawn_blocking(move || {
1814                    span.in_scope(move || {
1815                        let sapling_tree = state.non_finalized_state_receiver.with_watch_data(
1816                            |non_finalized_state| {
1817                                read::sapling_tree(
1818                                    non_finalized_state.best_chain(),
1819                                    &state.db,
1820                                    hash_or_height,
1821                                )
1822                            },
1823                        );
1824
1825                        // The work is done in the future.
1826                        timer.finish(module_path!(), line!(), "ReadRequest::SaplingTree");
1827
1828                        Ok(ReadResponse::SaplingTree(sapling_tree))
1829                    })
1830                })
1831                .wait_for_panics()
1832            }
1833
1834            ReadRequest::OrchardTree(hash_or_height) => {
1835                let state = self.clone();
1836
1837                tokio::task::spawn_blocking(move || {
1838                    span.in_scope(move || {
1839                        let orchard_tree = state.non_finalized_state_receiver.with_watch_data(
1840                            |non_finalized_state| {
1841                                read::orchard_tree(
1842                                    non_finalized_state.best_chain(),
1843                                    &state.db,
1844                                    hash_or_height,
1845                                )
1846                            },
1847                        );
1848
1849                        // The work is done in the future.
1850                        timer.finish(module_path!(), line!(), "ReadRequest::OrchardTree");
1851
1852                        Ok(ReadResponse::OrchardTree(orchard_tree))
1853                    })
1854                })
1855                .wait_for_panics()
1856            }
1857
1858            ReadRequest::SaplingSubtrees { start_index, limit } => {
1859                let state = self.clone();
1860
1861                tokio::task::spawn_blocking(move || {
1862                    span.in_scope(move || {
1863                        let end_index = limit
1864                            .and_then(|limit| start_index.0.checked_add(limit.0))
1865                            .map(NoteCommitmentSubtreeIndex);
1866
1867                        let sapling_subtrees = state.non_finalized_state_receiver.with_watch_data(
1868                            |non_finalized_state| {
1869                                if let Some(end_index) = end_index {
1870                                    read::sapling_subtrees(
1871                                        non_finalized_state.best_chain(),
1872                                        &state.db,
1873                                        start_index..end_index,
1874                                    )
1875                                } else {
1876                                    // If there is no end bound, just return all the trees.
1877                                    // If the end bound would overflow, just returns all the trees, because that's what
1878                                    // `zcashd` does. (It never calculates an end bound, so it just keeps iterating until
1879                                    // the trees run out.)
1880                                    read::sapling_subtrees(
1881                                        non_finalized_state.best_chain(),
1882                                        &state.db,
1883                                        start_index..,
1884                                    )
1885                                }
1886                            },
1887                        );
1888
1889                        // The work is done in the future.
1890                        timer.finish(module_path!(), line!(), "ReadRequest::SaplingSubtrees");
1891
1892                        Ok(ReadResponse::SaplingSubtrees(sapling_subtrees))
1893                    })
1894                })
1895                .wait_for_panics()
1896            }
1897
1898            ReadRequest::OrchardSubtrees { start_index, limit } => {
1899                let state = self.clone();
1900
1901                tokio::task::spawn_blocking(move || {
1902                    span.in_scope(move || {
1903                        let end_index = limit
1904                            .and_then(|limit| start_index.0.checked_add(limit.0))
1905                            .map(NoteCommitmentSubtreeIndex);
1906
1907                        let orchard_subtrees = state.non_finalized_state_receiver.with_watch_data(
1908                            |non_finalized_state| {
1909                                if let Some(end_index) = end_index {
1910                                    read::orchard_subtrees(
1911                                        non_finalized_state.best_chain(),
1912                                        &state.db,
1913                                        start_index..end_index,
1914                                    )
1915                                } else {
1916                                    // If there is no end bound, just return all the trees.
1917                                    // If the end bound would overflow, just returns all the trees, because that's what
1918                                    // `zcashd` does. (It never calculates an end bound, so it just keeps iterating until
1919                                    // the trees run out.)
1920                                    read::orchard_subtrees(
1921                                        non_finalized_state.best_chain(),
1922                                        &state.db,
1923                                        start_index..,
1924                                    )
1925                                }
1926                            },
1927                        );
1928
1929                        // The work is done in the future.
1930                        timer.finish(module_path!(), line!(), "ReadRequest::OrchardSubtrees");
1931
1932                        Ok(ReadResponse::OrchardSubtrees(orchard_subtrees))
1933                    })
1934                })
1935                .wait_for_panics()
1936            }
1937
1938            // For the get_address_balance RPC.
1939            ReadRequest::AddressBalance(addresses) => {
1940                let state = self.clone();
1941
1942                tokio::task::spawn_blocking(move || {
1943                    span.in_scope(move || {
1944                        let (balance, received) = state
1945                            .non_finalized_state_receiver
1946                            .with_watch_data(|non_finalized_state| {
1947                                read::transparent_balance(
1948                                    non_finalized_state.best_chain().cloned(),
1949                                    &state.db,
1950                                    addresses,
1951                                )
1952                            })?;
1953
1954                        // The work is done in the future.
1955                        timer.finish(module_path!(), line!(), "ReadRequest::AddressBalance");
1956
1957                        Ok(ReadResponse::AddressBalance { balance, received })
1958                    })
1959                })
1960                .wait_for_panics()
1961            }
1962
1963            // For the get_address_tx_ids RPC.
1964            ReadRequest::TransactionIdsByAddresses {
1965                addresses,
1966                height_range,
1967            } => {
1968                let state = self.clone();
1969
1970                tokio::task::spawn_blocking(move || {
1971                    span.in_scope(move || {
1972                        let tx_ids = state.non_finalized_state_receiver.with_watch_data(
1973                            |non_finalized_state| {
1974                                read::transparent_tx_ids(
1975                                    non_finalized_state.best_chain(),
1976                                    &state.db,
1977                                    addresses,
1978                                    height_range,
1979                                )
1980                            },
1981                        );
1982
1983                        // The work is done in the future.
1984                        timer.finish(
1985                            module_path!(),
1986                            line!(),
1987                            "ReadRequest::TransactionIdsByAddresses",
1988                        );
1989
1990                        tx_ids.map(ReadResponse::AddressesTransactionIds)
1991                    })
1992                })
1993                .wait_for_panics()
1994            }
1995
1996            // For the get_address_utxos RPC.
1997            ReadRequest::UtxosByAddresses(addresses) => {
1998                let state = self.clone();
1999
2000                tokio::task::spawn_blocking(move || {
2001                    span.in_scope(move || {
2002                        let utxos = state.non_finalized_state_receiver.with_watch_data(
2003                            |non_finalized_state| {
2004                                read::address_utxos(
2005                                    &state.network,
2006                                    non_finalized_state.best_chain(),
2007                                    &state.db,
2008                                    addresses,
2009                                )
2010                            },
2011                        );
2012
2013                        // The work is done in the future.
2014                        timer.finish(module_path!(), line!(), "ReadRequest::UtxosByAddresses");
2015
2016                        utxos.map(ReadResponse::AddressUtxos)
2017                    })
2018                })
2019                .wait_for_panics()
2020            }
2021
2022            ReadRequest::CheckBestChainTipNullifiersAndAnchors(unmined_tx) => {
2023                let state = self.clone();
2024
2025                tokio::task::spawn_blocking(move || {
2026                    span.in_scope(move || {
2027                        let latest_non_finalized_best_chain =
2028                            state.latest_non_finalized_state().best_chain().cloned();
2029
2030                        check::nullifier::tx_no_duplicates_in_chain(
2031                            &state.db,
2032                            latest_non_finalized_best_chain.as_ref(),
2033                            &unmined_tx.transaction,
2034                        )?;
2035
2036                        check::anchors::tx_anchors_refer_to_final_treestates(
2037                            &state.db,
2038                            latest_non_finalized_best_chain.as_ref(),
2039                            &unmined_tx,
2040                        )?;
2041
2042                        // The work is done in the future.
2043                        timer.finish(
2044                            module_path!(),
2045                            line!(),
2046                            "ReadRequest::CheckBestChainTipNullifiersAndAnchors",
2047                        );
2048
2049                        Ok(ReadResponse::ValidBestChainTipNullifiersAndAnchors)
2050                    })
2051                })
2052                .wait_for_panics()
2053            }
2054
2055            // Used by the get_block and get_block_hash RPCs.
2056            ReadRequest::BestChainBlockHash(height) => {
2057                let state = self.clone();
2058
2059                // # Performance
2060                //
2061                // Allow other async tasks to make progress while concurrently reading blocks from disk.
2062
2063                tokio::task::spawn_blocking(move || {
2064                    span.in_scope(move || {
2065                        let hash = state.non_finalized_state_receiver.with_watch_data(
2066                            |non_finalized_state| {
2067                                read::hash_by_height(
2068                                    non_finalized_state.best_chain(),
2069                                    &state.db,
2070                                    height,
2071                                )
2072                            },
2073                        );
2074
2075                        // The work is done in the future.
2076                        timer.finish(module_path!(), line!(), "ReadRequest::BestChainBlockHash");
2077
2078                        Ok(ReadResponse::BlockHash(hash))
2079                    })
2080                })
2081                .wait_for_panics()
2082            }
2083
2084            // Used by get_block_template and getblockchaininfo RPCs.
2085            ReadRequest::ChainInfo => {
2086                let state = self.clone();
2087                let latest_non_finalized_state = self.latest_non_finalized_state();
2088
2089                // # Performance
2090                //
2091                // Allow other async tasks to make progress while concurrently reading blocks from disk.
2092
2093                tokio::task::spawn_blocking(move || {
2094                    span.in_scope(move || {
2095                        // # Correctness
2096                        //
2097                        // It is ok to do these lookups using multiple database calls. Finalized state updates
2098                        // can only add overlapping blocks, and block hashes are unique across all chain forks.
2099                        //
2100                        // If there is a large overlap between the non-finalized and finalized states,
2101                        // where the finalized tip is above the non-finalized tip,
2102                        // Zebra is receiving a lot of blocks, or this request has been delayed for a long time.
2103                        //
2104                        // In that case, the `getblocktemplate` RPC will return an error because Zebra
2105                        // is not synced to the tip. That check happens before the RPC makes this request.
2106                        let get_block_template_info =
2107                            read::difficulty::get_block_template_chain_info(
2108                                &latest_non_finalized_state,
2109                                &state.db,
2110                                &state.network,
2111                            );
2112
2113                        // The work is done in the future.
2114                        timer.finish(module_path!(), line!(), "ReadRequest::ChainInfo");
2115
2116                        get_block_template_info.map(ReadResponse::ChainInfo)
2117                    })
2118                })
2119                .wait_for_panics()
2120            }
2121
2122            // Used by getmininginfo, getnetworksolps, and getnetworkhashps RPCs.
2123            ReadRequest::SolutionRate { num_blocks, height } => {
2124                let state = self.clone();
2125
2126                // # Performance
2127                //
2128                // Allow other async tasks to make progress while concurrently reading blocks from disk.
2129
2130                tokio::task::spawn_blocking(move || {
2131                    span.in_scope(move || {
2132                        let latest_non_finalized_state = state.latest_non_finalized_state();
2133                        // # Correctness
2134                        //
2135                        // It is ok to do these lookups using multiple database calls. Finalized state updates
2136                        // can only add overlapping blocks, and block hashes are unique across all chain forks.
2137                        //
2138                        // The worst that can happen here is that the default `start_hash` will be below
2139                        // the chain tip.
2140                        let (tip_height, tip_hash) =
2141                            match read::tip(latest_non_finalized_state.best_chain(), &state.db) {
2142                                Some(tip_hash) => tip_hash,
2143                                None => return Ok(ReadResponse::SolutionRate(None)),
2144                            };
2145
2146                        let start_hash = match height {
2147                            Some(height) if height < tip_height => read::hash_by_height(
2148                                latest_non_finalized_state.best_chain(),
2149                                &state.db,
2150                                height,
2151                            ),
2152                            // use the chain tip hash if height is above it or not provided.
2153                            _ => Some(tip_hash),
2154                        };
2155
2156                        let solution_rate = start_hash.and_then(|start_hash| {
2157                            read::difficulty::solution_rate(
2158                                &latest_non_finalized_state,
2159                                &state.db,
2160                                num_blocks,
2161                                start_hash,
2162                            )
2163                        });
2164
2165                        // The work is done in the future.
2166                        timer.finish(module_path!(), line!(), "ReadRequest::SolutionRate");
2167
2168                        Ok(ReadResponse::SolutionRate(solution_rate))
2169                    })
2170                })
2171                .wait_for_panics()
2172            }
2173
2174            ReadRequest::CheckBlockProposalValidity(semantically_verified) => {
2175                let state = self.clone();
2176
2177                // # Performance
2178                //
2179                // Allow other async tasks to make progress while concurrently reading blocks from disk.
2180
2181                tokio::task::spawn_blocking(move || {
2182                    span.in_scope(move || {
2183                        tracing::debug!("attempting to validate and commit block proposal onto a cloned non-finalized state");
2184                        let mut latest_non_finalized_state = state.latest_non_finalized_state();
2185
2186                        // The previous block of a valid proposal must be on the best chain tip.
2187                        let Some((_best_tip_height, best_tip_hash)) = read::best_tip(&latest_non_finalized_state, &state.db) else {
2188                            return Err("state is empty: wait for Zebra to sync before submitting a proposal".into());
2189                        };
2190
2191                        if semantically_verified.block.header.previous_block_hash != best_tip_hash {
2192                            return Err("proposal is not based on the current best chain tip: previous block hash must be the best chain tip".into());
2193                        }
2194
2195                        // This clone of the non-finalized state is dropped when this closure returns.
2196                        // The non-finalized state that's used in the rest of the state (including finalizing
2197                        // blocks into the db) is not mutated here.
2198                        //
2199                        // TODO: Convert `CommitSemanticallyVerifiedError` to a new `ValidateProposalError`?
2200                        latest_non_finalized_state.disable_metrics();
2201
2202                        write::validate_and_commit_non_finalized(
2203                            &state.db,
2204                            &mut latest_non_finalized_state,
2205                            semantically_verified,
2206                        )?;
2207
2208                        // The work is done in the future.
2209                        timer.finish(
2210                            module_path!(),
2211                            line!(),
2212                            "ReadRequest::CheckBlockProposalValidity",
2213                        );
2214
2215                        Ok(ReadResponse::ValidBlockProposal)
2216                    })
2217                })
2218                .wait_for_panics()
2219            }
2220
2221            ReadRequest::TipBlockSize => {
2222                let state = self.clone();
2223
2224                tokio::task::spawn_blocking(move || {
2225                    span.in_scope(move || {
2226                        // Get the best chain tip height.
2227                        let tip_height = state
2228                            .non_finalized_state_receiver
2229                            .with_watch_data(|non_finalized_state| {
2230                                read::tip_height(non_finalized_state.best_chain(), &state.db)
2231                            })
2232                            .unwrap_or(Height(0));
2233
2234                        // Get the block at the best chain tip height.
2235                        let block = state.non_finalized_state_receiver.with_watch_data(
2236                            |non_finalized_state| {
2237                                read::block(
2238                                    non_finalized_state.best_chain(),
2239                                    &state.db,
2240                                    tip_height.into(),
2241                                )
2242                            },
2243                        );
2244
2245                        // The work is done in the future.
2246                        timer.finish(module_path!(), line!(), "ReadRequest::TipBlockSize");
2247
2248                        // Respond with the length of the obtained block if any.
2249                        match block {
2250                            Some(b) => Ok(ReadResponse::TipBlockSize(Some(
2251                                b.zcash_serialize_to_vec()?.len(),
2252                            ))),
2253                            None => Ok(ReadResponse::TipBlockSize(None)),
2254                        }
2255                    })
2256                })
2257                .wait_for_panics()
2258            }
2259
2260            ReadRequest::NonFinalizedBlocksListener => {
2261                // The non-finalized blocks listener is used to notify the state service
2262                // about new blocks that have been added to the non-finalized state.
2263                let non_finalized_blocks_listener = NonFinalizedBlocksListener::spawn(
2264                    self.network.clone(),
2265                    self.non_finalized_state_receiver.clone(),
2266                );
2267
2268                async move {
2269                    timer.finish(
2270                        module_path!(),
2271                        line!(),
2272                        "ReadRequest::NonFinalizedBlocksListener",
2273                    );
2274
2275                    Ok(ReadResponse::NonFinalizedBlocksListener(
2276                        non_finalized_blocks_listener,
2277                    ))
2278                }
2279                .boxed()
2280            }
2281        }
2282    }
2283}
2284
2285/// Initialize a state service from the provided [`Config`].
2286/// Returns a boxed state service, a read-only state service,
2287/// and receivers for state chain tip updates.
2288///
2289/// Each `network` has its own separate on-disk database.
2290///
2291/// The state uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
2292/// to work out when it is near the final checkpoint.
2293///
2294/// To share access to the state, wrap the returned service in a `Buffer`,
2295/// or clone the returned [`ReadStateService`].
2296///
2297/// It's possible to construct multiple state services in the same application (as
2298/// long as they, e.g., use different storage locations), but doing so is
2299/// probably not what you want.
2300pub async fn init(
2301    config: Config,
2302    network: &Network,
2303    max_checkpoint_height: block::Height,
2304    checkpoint_verify_concurrency_limit: usize,
2305) -> (
2306    BoxService<Request, Response, BoxError>,
2307    ReadStateService,
2308    LatestChainTip,
2309    ChainTipChange,
2310) {
2311    let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
2312        StateService::new(
2313            config,
2314            network,
2315            max_checkpoint_height,
2316            checkpoint_verify_concurrency_limit,
2317        )
2318        .await;
2319
2320    (
2321        BoxService::new(state_service),
2322        read_only_state_service,
2323        latest_chain_tip,
2324        chain_tip_change,
2325    )
2326}
2327
2328/// Initialize a read state service from the provided [`Config`].
2329/// Returns a read-only state service,
2330///
2331/// Each `network` has its own separate on-disk database.
2332///
2333/// To share access to the state, clone the returned [`ReadStateService`].
2334pub fn init_read_only(
2335    config: Config,
2336    network: &Network,
2337) -> (
2338    ReadStateService,
2339    ZebraDb,
2340    tokio::sync::watch::Sender<NonFinalizedState>,
2341) {
2342    let finalized_state = FinalizedState::new_with_debug(
2343        &config,
2344        network,
2345        true,
2346        #[cfg(feature = "elasticsearch")]
2347        false,
2348        true,
2349    );
2350    let (non_finalized_state_sender, non_finalized_state_receiver) =
2351        tokio::sync::watch::channel(NonFinalizedState::new(network));
2352
2353    (
2354        ReadStateService::new(
2355            &finalized_state,
2356            None,
2357            WatchReceiver::new(non_finalized_state_receiver),
2358        ),
2359        finalized_state.db.clone(),
2360        non_finalized_state_sender,
2361    )
2362}
2363
2364/// Calls [`init_read_only`] with the provided [`Config`] and [`Network`] from a blocking task.
2365/// Returns a [`tokio::task::JoinHandle`] with a read state service and chain tip sender.
2366pub fn spawn_init_read_only(
2367    config: Config,
2368    network: &Network,
2369) -> tokio::task::JoinHandle<(
2370    ReadStateService,
2371    ZebraDb,
2372    tokio::sync::watch::Sender<NonFinalizedState>,
2373)> {
2374    let network = network.clone();
2375    tokio::task::spawn_blocking(move || init_read_only(config, &network))
2376}
2377
/// Creates a [`StateService`] from an ephemeral [`Config`], boxes it, and wraps it
/// in a [`Buffer`] with a single slot.
///
/// The service is created with a max checkpoint height of [`block::Height::MAX`]
/// and a checkpoint verify concurrency limit of `0`. The read-only service and the
/// chain tip channels created alongside it are discarded.
///
/// This can be used to create a state service for testing. See also [`init`].
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test(
    network: &Network,
) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
    //       if we ever need to test final checkpoint sent UTXO queries
    let (read_write_service, ..) =
        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;

    let boxed_service = BoxService::new(read_write_service);

    Buffer::new(boxed_service, 1)
}
2392
/// Creates the state services from an ephemeral [`Config`] for use in tests.
///
/// Returns, in order:
/// - the read-write service, boxed and wrapped in a [`Buffer`] with a single slot,
/// - the [`ReadStateService`],
/// - the [`LatestChainTip`], and
/// - the [`ChainTipChange`].
///
/// The services are created with a max checkpoint height of [`block::Height::MAX`]
/// and a checkpoint verify concurrency limit of `0`.
///
/// This can be used to create a state service for testing. See also [`init`].
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test_services(
    network: &Network,
) -> (
    Buffer<BoxService<Request, Response, BoxError>, Request>,
    ReadStateService,
    LatestChainTip,
    ChainTipChange,
) {
    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
    //       if we ever need to test final checkpoint sent UTXO queries
    let (read_write, read_only, latest_tip, tip_change) =
        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;

    (
        Buffer::new(BoxService::new(read_write), 1),
        read_only,
        latest_tip,
        tip_change,
    )
}