Skip to main content

zebra_state/
service.rs

1//! [`tower::Service`]s for Zebra's cached chain state.
2//!
3//! Zebra provides cached state access via two main services:
4//! - [`StateService`]: a read-write service that writes blocks to the state,
5//!   and redirects most read requests to the [`ReadStateService`].
6//! - [`ReadStateService`]: a read-only service that answers from the most
7//!   recent committed block.
8//!
9//! Most users should prefer [`ReadStateService`], unless they need to write blocks to the state.
10//!
11//! Zebra also provides access to the best chain tip via:
12//! - [`LatestChainTip`]: a read-only channel that contains the latest committed
13//!   tip.
14//! - [`ChainTipChange`]: a read-only channel that can asynchronously await
15//!   chain tip changes.
16
17use std::{
18    collections::HashMap,
19    future::Future,
20    pin::Pin,
21    sync::Arc,
22    task::{Context, Poll},
23    time::{Duration, Instant},
24};
25
26use futures::future::FutureExt;
27use tokio::sync::oneshot;
28use tower::{util::BoxService, Service, ServiceExt};
29use tracing::{instrument, Instrument, Span};
30
31#[cfg(any(test, feature = "proptest-impl"))]
32use tower::buffer::Buffer;
33
34use zebra_chain::{
35    block::{self, CountedHeader, HeightDiff},
36    diagnostic::CodeTimer,
37    parameters::{Network, NetworkUpgrade},
38    serialization::ZcashSerialize,
39    subtree::NoteCommitmentSubtreeIndex,
40};
41
42use crate::{
43    constants::{
44        MAX_FIND_BLOCK_HASHES_RESULTS, MAX_FIND_BLOCK_HEADERS_RESULTS, MAX_LEGACY_CHAIN_BLOCKS,
45    },
46    error::{CommitBlockError, CommitCheckpointVerifiedError, InvalidateError, ReconsiderError},
47    request::TimedSpan,
48    response::NonFinalizedBlocksListener,
49    service::{
50        block_iter::any_ancestor_blocks,
51        chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
52        finalized_state::{FinalizedState, ZebraDb},
53        non_finalized_state::{Chain, NonFinalizedState},
54        pending_utxos::PendingUtxos,
55        queued_blocks::QueuedBlocks,
56        read::find,
57        watch_receiver::WatchReceiver,
58    },
59    BoxError, CheckpointVerifiedBlock, CommitSemanticallyVerifiedError, Config, KnownBlock,
60    ReadRequest, ReadResponse, Request, Response, SemanticallyVerifiedBlock,
61};
62
63pub mod block_iter;
64pub mod chain_tip;
65pub mod watch_receiver;
66
67pub mod check;
68
69pub(crate) mod finalized_state;
70pub(crate) mod non_finalized_state;
71mod pending_utxos;
72mod queued_blocks;
73pub(crate) mod read;
74mod traits;
75mod write;
76
77#[cfg(any(test, feature = "proptest-impl"))]
78pub mod arbitrary;
79
80#[cfg(test)]
81mod tests;
82
83pub use finalized_state::{OutputLocation, TransactionIndex, TransactionLocation};
84use write::NonFinalizedWriteMessage;
85
86use self::queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified, SentHashes};
87
88pub use self::traits::{ReadState, State};
89
/// A read-write service for Zebra's cached blockchain state.
///
/// This service modifies and provides access to:
/// - the non-finalized state: the ~100 most recent blocks.
///   Zebra allows chain forks in the non-finalized state,
///   stores it in memory, and re-downloads it when restarted.
/// - the finalized state: older blocks that have many confirmations.
///   Zebra stores the single best chain in the finalized state,
///   and re-loads it from disk when restarted.
///
/// NOTE(review): `StateService::new` builds the non-finalized state using
/// `with_backup`, so "re-downloads it when restarted" may be outdated — confirm.
///
/// Read requests to this service are buffered, then processed concurrently.
/// Block write requests are buffered, then queued, then processed in order by a separate task.
///
/// Most state users can get faster read responses using the [`ReadStateService`],
/// because its requests do not share a [`tower::buffer::Buffer`] with block write requests.
///
/// To quickly get the latest block, use [`LatestChainTip`] or [`ChainTipChange`].
/// They can read the latest block directly, without queueing any requests.
///
/// # Shutdown
///
/// Dropping this service closes its receivers, drops both block write senders,
/// and fails every block that is still queued with [`CommitBlockError::WriteTaskExited`].
#[derive(Debug)]
pub(crate) struct StateService {
    // Configuration
    //
    /// The configured Zcash network.
    network: Network,

    /// The height that we start storing UTXOs from finalized blocks.
    ///
    /// This height should be lower than the last few checkpoints,
    /// so the full verifier can verify UTXO spends from those blocks,
    /// even if they haven't been committed to the finalized state yet.
    ///
    /// Computed in [`StateService::new`] as the maximum checkpoint height minus the
    /// checkpoint verify concurrency limit, falling back to [`block::Height::MIN`].
    full_verifier_utxo_lookahead: block::Height,

    // Queued Blocks
    //
    /// Queued blocks for the [`NonFinalizedState`] that arrived out of order.
    /// These blocks are awaiting their parent blocks before they can do contextual verification.
    non_finalized_state_queued_blocks: QueuedBlocks,

    /// Queued blocks for the [`FinalizedState`] that arrived out of order.
    /// These blocks are awaiting their parent blocks before they can do contextual verification.
    ///
    /// Indexed by their parent block hash, so the next block to send can be found
    /// by looking up `finalized_block_write_last_sent_hash`.
    finalized_state_queued_blocks: HashMap<block::Hash, QueuedCheckpointVerified>,

    /// Channels to send blocks to the block write task.
    ///
    /// The `finalized` and `non_finalized` senders are optional,
    /// and are both dropped when this service is dropped.
    block_write_sender: write::BlockWriteSender,

    /// The [`block::Hash`] of the most recent block sent on
    /// `block_write_sender.finalized` or `block_write_sender.non_finalized`.
    ///
    /// On startup, this is:
    /// - the finalized tip, if there are stored blocks, or
    /// - the genesis block's parent hash, if the database is empty.
    ///
    /// If `invalid_block_write_reset_receiver` gets a reset, this is:
    /// - the hash of the last valid committed block (the parent of the invalid block).
    finalized_block_write_last_sent_hash: block::Hash,

    /// A set of block hashes that have been sent to the block write task.
    /// Hashes of blocks below the finalized tip height are periodically pruned.
    ///
    /// Hashes of rejected blocks are removed using `non_finalized_rejected_receiver`.
    non_finalized_block_write_sent_hashes: SentHashes,

    /// If an invalid block is sent on `block_write_sender.finalized`
    /// or `block_write_sender.non_finalized`,
    /// this channel gets the [`block::Hash`] of the valid tip.
    //
    // TODO: add tests for finalized and non-finalized resets (#2654)
    invalid_block_write_reset_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,

    /// Receives the hash of every non-finalized block that the write task
    /// rejected, so the corresponding entry can be removed from
    /// `non_finalized_block_write_sent_hashes`.
    ///
    /// Without this, a rejected same-hash block locks out a later honest
    /// re-delivery of a block at the same hash as a "duplicate" until restart
    /// or reorg.
    non_finalized_rejected_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,

    // Pending UTXO Request Tracking
    //
    /// The set of outpoints with pending requests for their associated transparent::Output.
    pending_utxos: PendingUtxos,

    /// Instant tracking the last time `pending_utxos` was pruned.
    ///
    /// Initialized to the time the service was created.
    last_prune: Instant,

    // Updating Concurrently Readable State
    //
    /// A cloneable [`ReadStateService`], used to answer concurrent read requests.
    ///
    /// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`.
    read_service: ReadStateService,

    // Metrics
    //
    /// A metric tracking the maximum height that's currently in `finalized_state_queued_blocks`.
    ///
    /// Set to `f64::NAN` if `finalized_state_queued_blocks` is empty, because grafana shows NaNs
    /// as a break in the graph.
    max_finalized_queue_height: f64,
}
191
/// A read-only service for accessing Zebra's cached blockchain state.
///
/// This service provides read-only access to:
/// - the non-finalized state: the ~100 most recent blocks.
/// - the finalized state: older blocks that have many confirmations.
///
/// Requests to this service are processed in parallel,
/// ignoring any blocks queued by the read-write [`StateService`].
///
/// This quick response behavior is better for most state users.
/// It allows other async tasks to make progress while concurrently reading data from disk.
///
/// # Shutdown
///
/// Each clone checks on drop whether it holds the last reference to the block write task.
/// The last holder shuts down the database, then joins the write thread
/// and re-raises any panic from it.
#[derive(Clone, Debug)]
pub struct ReadStateService {
    // Configuration
    //
    /// The configured Zcash network.
    network: Network,

    // Shared Concurrently Readable State
    //
    /// A watch channel with a cached copy of the [`NonFinalizedState`].
    ///
    /// This state is only updated between requests,
    /// so it might include some block data that is also in the on-disk `db`.
    non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,

    /// The shared inner on-disk database for the finalized state.
    ///
    /// RocksDB allows reads and writes via a shared reference,
    /// but [`ZebraDb`] doesn't expose any write methods or types.
    ///
    /// This chain is updated concurrently with requests,
    /// so it might include some block data that is also in the cached
    /// non-finalized state.
    //
    // NOTE(review): the previous wording referred to `disk` and `best_mem`, which are not
    // fields of this struct; presumably they meant `db` and the non-finalized state — confirm.
    db: ZebraDb,

    /// A shared handle to a task that writes blocks to the [`NonFinalizedState`] or [`FinalizedState`],
    /// once the queues have received all their parent blocks.
    ///
    /// Used to check for panics when writing blocks.
    ///
    /// This is taken (set to `None`) when the service is dropped.
    block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
}
233
234impl Drop for StateService {
235    fn drop(&mut self) {
236        // The state service owns the state, tasks, and channels,
237        // so dropping it should shut down everything.
238
239        // Close the channels (non-blocking)
240        // This makes the block write thread exit the next time it checks the channels.
241        // We want to do this here so we get any errors or panics from the block write task before it shuts down.
242        self.invalid_block_write_reset_receiver.close();
243        self.non_finalized_rejected_receiver.close();
244
245        std::mem::drop(self.block_write_sender.finalized.take());
246        std::mem::drop(self.block_write_sender.non_finalized.take());
247
248        self.clear_finalized_block_queue(CommitBlockError::WriteTaskExited);
249        self.clear_non_finalized_block_queue(CommitBlockError::WriteTaskExited);
250
251        // Log database metrics before shutting down
252        info!("dropping the state: logging database metrics");
253        self.log_db_metrics();
254
255        // Then drop self.read_service, which checks the block write task for panics,
256        // and tries to shut down the database.
257    }
258}
259
260impl Drop for ReadStateService {
261    fn drop(&mut self) {
262        // The read state service shares the state,
263        // so dropping it should check if we can shut down.
264
265        // TODO: move this into a try_shutdown() method
266        if let Some(block_write_task) = self.block_write_task.take() {
267            if let Some(block_write_task_handle) = Arc::into_inner(block_write_task) {
268                // We're the last database user, so we can tell it to shut down (blocking):
269                // - flushes the database to disk, and
270                // - drops the database, which cleans up any database tasks correctly.
271                self.db.shutdown(true);
272
273                // We are the last state with a reference to this thread, so we can
274                // wait until the block write task finishes, then check for panics (blocking).
275                // (We'd also like to abort the thread, but std::thread::JoinHandle can't do that.)
276
277                // This log is verbose during tests.
278                #[cfg(not(test))]
279                info!("waiting for the block write task to finish");
280                #[cfg(test)]
281                debug!("waiting for the block write task to finish");
282
283                // TODO: move this into a check_for_panics() method
284                if let Err(thread_panic) = block_write_task_handle.join() {
285                    std::panic::resume_unwind(thread_panic);
286                } else {
287                    debug!("shutting down the state because the block write task has finished");
288                }
289            }
290        } else {
291            // Even if we're not the last database user, try shutting it down.
292            //
293            // TODO: rename this to try_shutdown()?
294            self.db.shutdown(false);
295        }
296    }
297}
298
299impl StateService {
    /// A 30 second interval.
    ///
    /// NOTE(review): presumably the minimum time between prunes of `pending_utxos`
    /// (compare with the `last_prune` field); confirm against the code that reads it.
    const PRUNE_INTERVAL: Duration = Duration::from_secs(30);
301
    /// Creates a new state service for the state `config` and `network`.
    ///
    /// Uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
    /// to work out when it is near the final checkpoint.
    ///
    /// Returns the read-write and read-only state services,
    /// and read-only watch channels for its best chain tip.
    ///
    /// Also spawns a background tokio task that exports database metrics every 30 seconds.
    ///
    /// # Panics
    ///
    /// - if a blocking task used during initialization fails to join,
    /// - if the finalized tip block has no coinbase height,
    /// - if `checkpoint_verify_concurrency_limit` does not fit in a [`HeightDiff`],
    /// - if the legacy chain check returns an error for the cached state.
    pub async fn new(
        config: Config,
        network: &Network,
        max_checkpoint_height: block::Height,
        checkpoint_verify_concurrency_limit: usize,
    ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
        // Open the database and read its tip block on a blocking thread,
        // so this async function doesn't block the executor while doing it.
        let (finalized_state, finalized_tip, timer) = {
            let config = config.clone();
            let network = network.clone();
            tokio::task::spawn_blocking(move || {
                let timer = CodeTimer::start();
                let finalized_state = FinalizedState::new(
                    &config,
                    &network,
                    #[cfg(feature = "elasticsearch")]
                    true,
                );
                timer.finish_desc("opening finalized state database");

                // This second timer is returned to the caller,
                // and finished after the state service has been constructed.
                let timer = CodeTimer::start();
                let finalized_tip = finalized_state.db.tip_block();

                (finalized_state, finalized_tip, timer)
            })
            .await
            .expect("failed to join blocking task")
        };

        // # Correctness
        //
        // The state service must set the finalized block write sender to `None`
        // if there are blocks in the restored non-finalized state that are above
        // the max checkpoint height so that non-finalized blocks can be written, otherwise,
        // Zebra will be unable to commit semantically verified blocks, and its chain sync will stall.
        //
        // The state service must not set the finalized block write sender to `None` if there
        // aren't blocks in the restored non-finalized state that are above the max checkpoint height,
        // otherwise, unless checkpoint sync is disabled in the zebra-consensus configuration,
        // Zebra will be unable to commit checkpoint verified blocks, and its chain sync will stall.
        let is_finalized_tip_past_max_checkpoint = if let Some(tip) = &finalized_tip {
            tip.coinbase_height().expect("valid block must have height") >= max_checkpoint_height
        } else {
            // An empty database has no tip, so it can't be past the max checkpoint.
            false
        };
        let backup_dir_path = config.non_finalized_state_backup_dir(network);
        let skip_backup_task = config.debug_skip_non_finalized_state_backup_task;
        let (non_finalized_state, non_finalized_state_sender, non_finalized_state_receiver) =
            NonFinalizedState::new(network)
                .with_backup(
                    backup_dir_path.clone(),
                    &finalized_state.db,
                    is_finalized_tip_past_max_checkpoint,
                    config.debug_skip_non_finalized_state_backup_task,
                )
                .await;

        let non_finalized_block_write_sent_hashes = SentHashes::new(&non_finalized_state);
        // Prefer the non-finalized best tip, and fall back to the finalized tip.
        // This is `None` if both states are empty.
        let initial_tip = non_finalized_state
            .best_tip_block()
            .map(|cv_block| cv_block.block.clone())
            .or(finalized_tip)
            .map(CheckpointVerifiedBlock::from)
            .map(ChainTipBlock::from);

        tracing::info!(chain_tip = ?initial_tip.as_ref().map(|tip| (tip.hash, tip.height)), "loaded Zebra state cache");

        let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
            ChainTipSender::new(initial_tip, network);

        let finalized_state_for_writing = finalized_state.clone();
        // See the correctness note above: the finalized sender is only used
        // when the restored non-finalized state has no chains.
        let should_use_finalized_block_write_sender = non_finalized_state.is_chain_set_empty();
        // Only keep the backup directory path when the backup task is skipped.
        // NOTE(review): presumably the write task then writes backups itself — confirm in `write`.
        let sync_backup_dir_path = backup_dir_path.filter(|_| skip_backup_task);
        let (
            block_write_sender,
            invalid_block_write_reset_receiver,
            non_finalized_rejected_receiver,
            block_write_task,
        ) = write::BlockWriteSender::spawn(
            finalized_state_for_writing,
            non_finalized_state,
            chain_tip_sender,
            non_finalized_state_sender,
            should_use_finalized_block_write_sender,
            sync_backup_dir_path,
        );

        let read_service = ReadStateService::new(
            &finalized_state,
            block_write_task,
            non_finalized_state_receiver,
        );

        // The height subtraction is checked: if it has no valid result,
        // fall back to the minimum height.
        let full_verifier_utxo_lookahead = max_checkpoint_height
            - HeightDiff::try_from(checkpoint_verify_concurrency_limit)
                .expect("fits in HeightDiff");
        let full_verifier_utxo_lookahead =
            full_verifier_utxo_lookahead.unwrap_or(block::Height::MIN);
        let non_finalized_state_queued_blocks = QueuedBlocks::default();
        let pending_utxos = PendingUtxos::default();

        // This moves `finalized_state` into the blocking task,
        // so it can't be used after this point.
        let finalized_block_write_last_sent_hash =
            tokio::task::spawn_blocking(move || finalized_state.db.finalized_tip_hash())
                .await
                .expect("failed to join blocking task");

        let state = Self {
            network: network.clone(),
            full_verifier_utxo_lookahead,
            non_finalized_state_queued_blocks,
            finalized_state_queued_blocks: HashMap::new(),
            block_write_sender,
            finalized_block_write_last_sent_hash,
            non_finalized_block_write_sent_hashes,
            invalid_block_write_reset_receiver,
            non_finalized_rejected_receiver,
            pending_utxos,
            last_prune: Instant::now(),
            read_service: read_service.clone(),
            max_finalized_queue_height: f64::NAN,
        };
        timer.finish_desc("initializing state service");

        tracing::info!("starting legacy chain check");
        let timer = CodeTimer::start();

        // The check only runs when the state has a best tip,
        // and the network has an NU5 activation height.
        if let (Some(tip), Some(nu5_activation_height)) = (
            {
                let read_state = state.read_service.clone();
                tokio::task::spawn_blocking(move || read_state.best_tip())
                    .await
                    .expect("task should not panic")
            },
            NetworkUpgrade::Nu5.activation_height(network),
        ) {
            if let Err(error) = check::legacy_chain(
                nu5_activation_height,
                any_ancestor_blocks(
                    &state.read_service.latest_non_finalized_state(),
                    &state.read_service.db,
                    tip.1,
                ),
                &state.network,
                MAX_LEGACY_CHAIN_BLOCKS,
            ) {
                // A legacy chain can't be repaired in place, so stop with instructions for the user.
                let legacy_db_path = state.read_service.db.path().to_path_buf();
                panic!(
                    "Cached state contains a legacy chain.\n\
                     An outdated Zebra version did not know about a recent network upgrade,\n\
                     so it followed a legacy chain using outdated consensus branch rules.\n\
                     Hint: Delete your database, and restart Zebra to do a full sync.\n\
                     Database path: {legacy_db_path:?}\n\
                     Error: {error:?}",
                );
            }
        }

        tracing::info!("cached state consensus branch is valid: no legacy chain found");
        timer.finish_desc("legacy chain check");

        // Spawn a background task to periodically export RocksDB metrics to Prometheus
        //
        // The task loops forever, and owns its own clone of the database handle.
        let db_for_metrics = read_service.db.clone();
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(30));
            loop {
                interval.tick().await;
                db_for_metrics.export_metrics();
            }
        });

        (state, read_service, latest_chain_tip, chain_tip_change)
    }
480
481    /// Call read only state service to log rocksdb database metrics.
482    pub fn log_db_metrics(&self) {
483        self.read_service.db.print_db_metrics();
484    }
485
486    /// Queue a checkpoint verified block for verification and storage in the finalized state.
487    ///
488    /// Returns a channel receiver that provides the result of the block commit.
489    fn queue_and_commit_to_finalized_state(
490        &mut self,
491        checkpoint_verified: CheckpointVerifiedBlock,
492    ) -> oneshot::Receiver<Result<block::Hash, CommitCheckpointVerifiedError>> {
493        // # Correctness & Performance
494        //
495        // This method must not block, access the database, or perform CPU-intensive tasks,
496        // because it is called directly from the tokio executor's Future threads.
497
498        let queued_prev_hash = checkpoint_verified.block.header.previous_block_hash;
499        let queued_height = checkpoint_verified.height;
500
501        // If we're close to the final checkpoint, make the block's UTXOs available for
502        // semantic block verification, even when it is in the channel.
503        if self.is_close_to_final_checkpoint(queued_height) {
504            self.non_finalized_block_write_sent_hashes
505                .add_finalized(&checkpoint_verified)
506        }
507
508        let (rsp_tx, rsp_rx) = oneshot::channel();
509        let queued = (checkpoint_verified, rsp_tx);
510
511        if self.block_write_sender.finalized.is_some() {
512            // We're still committing checkpoint verified blocks
513            if let Some(duplicate_queued) = self
514                .finalized_state_queued_blocks
515                .insert(queued_prev_hash, queued)
516            {
517                Self::send_checkpoint_verified_block_error(
518                    duplicate_queued,
519                    CommitBlockError::new_duplicate(
520                        Some(queued_prev_hash.into()),
521                        KnownBlock::Queue,
522                    ),
523                );
524            }
525
526            self.drain_finalized_queue_and_commit();
527        } else {
528            // We've finished committing checkpoint verified blocks to the finalized state,
529            // so drop any repeated queued blocks, and return an error.
530            //
531            // TODO: track the latest sent height, and drop any blocks under that height
532            //       every time we send some blocks (like QueuedSemanticallyVerifiedBlocks)
533            Self::send_checkpoint_verified_block_error(
534                queued,
535                CommitBlockError::new_duplicate(None, KnownBlock::Finalized),
536            );
537
538            self.clear_finalized_block_queue(CommitBlockError::new_duplicate(
539                None,
540                KnownBlock::Finalized,
541            ));
542        }
543
544        if self.finalized_state_queued_blocks.is_empty() {
545            self.max_finalized_queue_height = f64::NAN;
546        } else if self.max_finalized_queue_height.is_nan()
547            || self.max_finalized_queue_height < queued_height.0 as f64
548        {
549            // if there are still blocks in the queue, then either:
550            //   - the new block was lower than the old maximum, and there was a gap before it,
551            //     so the maximum is still the same (and we skip this code), or
552            //   - the new block is higher than the old maximum, and there is at least one gap
553            //     between the finalized tip and the new maximum
554            self.max_finalized_queue_height = queued_height.0 as f64;
555        }
556
557        metrics::gauge!("state.checkpoint.queued.max.height").set(self.max_finalized_queue_height);
558        metrics::gauge!("state.checkpoint.queued.block.count")
559            .set(self.finalized_state_queued_blocks.len() as f64);
560
561        rsp_rx
562    }
563
564    /// Finds finalized state queue blocks to be committed to the state in order,
565    /// removes them from the queue, and sends them to the block commit task.
566    ///
567    /// After queueing a finalized block, this method checks whether the newly
568    /// queued block (and any of its descendants) can be committed to the state.
569    ///
570    /// Returns an error if the block commit channel has been closed.
571    pub fn drain_finalized_queue_and_commit(&mut self) {
572        use tokio::sync::mpsc::error::{SendError, TryRecvError};
573
574        // # Correctness & Performance
575        //
576        // This method must not block, access the database, or perform CPU-intensive tasks,
577        // because it is called directly from the tokio executor's Future threads.
578
579        // If a block failed, we need to start again from a valid tip.
580        match self.invalid_block_write_reset_receiver.try_recv() {
581            Ok(reset_tip_hash) => self.finalized_block_write_last_sent_hash = reset_tip_hash,
582            Err(TryRecvError::Disconnected) => {
583                info!("Block commit task closed the block reset channel. Is Zebra shutting down?");
584                return;
585            }
586            // There are no errors, so we can just use the last block hash we sent
587            Err(TryRecvError::Empty) => {}
588        }
589
590        while let Some(queued_block) = self
591            .finalized_state_queued_blocks
592            .remove(&self.finalized_block_write_last_sent_hash)
593        {
594            let last_sent_finalized_block_height = queued_block.0.height;
595
596            self.finalized_block_write_last_sent_hash = queued_block.0.hash;
597
598            // If we've finished sending finalized blocks, ignore any repeated blocks.
599            // (Blocks can be repeated after a syncer reset.)
600            if let Some(finalized_block_write_sender) = &self.block_write_sender.finalized {
601                let send_result = finalized_block_write_sender.send(queued_block);
602
603                // If the receiver is closed, we can't send any more blocks.
604                if let Err(SendError(queued)) = send_result {
605                    // If Zebra is shutting down, drop blocks and return an error.
606                    Self::send_checkpoint_verified_block_error(
607                        queued,
608                        CommitBlockError::WriteTaskExited,
609                    );
610
611                    self.clear_finalized_block_queue(CommitBlockError::WriteTaskExited);
612                } else {
613                    metrics::gauge!("state.checkpoint.sent.block.height")
614                        .set(last_sent_finalized_block_height.0 as f64);
615                };
616            }
617        }
618    }
619
620    /// Drains every hash queued on `non_finalized_rejected_receiver` and
621    /// removes it from `non_finalized_block_write_sent_hashes`.
622    ///
623    /// This closes the lockout window where a rejected block keeps its hash
624    /// recorded as "sent", so a subsequent honest re-delivery of a block at
625    /// the same hash is not short-circuited as a false "duplicate".
626    ///
627    /// # Correctness & Performance
628    ///
629    /// Like the other drain methods on `StateService`, this must not block,
630    /// access the database, or perform CPU-intensive work, because it is
631    /// called directly from the tokio executor's Future threads.
632    fn drain_non_finalized_rejected_hashes(&mut self) {
633        use tokio::sync::mpsc::error::TryRecvError;
634
635        loop {
636            match self.non_finalized_rejected_receiver.try_recv() {
637                Ok(hash) => {
638                    self.non_finalized_block_write_sent_hashes.remove(&hash);
639                }
640                Err(TryRecvError::Empty) => break,
641                Err(TryRecvError::Disconnected) => {
642                    info!(
643                        "Block commit task closed the non-finalized rejected hash channel. \
644                         Is Zebra shutting down?"
645                    );
646                    break;
647                }
648            }
649        }
650    }
651
652    /// Drops all finalized state queue blocks, and sends an error on their result channels.
653    fn clear_finalized_block_queue(
654        &mut self,
655        error: impl Into<CommitCheckpointVerifiedError> + Clone,
656    ) {
657        for (_hash, queued) in self.finalized_state_queued_blocks.drain() {
658            Self::send_checkpoint_verified_block_error(queued, error.clone());
659        }
660    }
661
662    /// Send an error on a `QueuedCheckpointVerified` block's result channel, and drop the block
663    fn send_checkpoint_verified_block_error(
664        queued: QueuedCheckpointVerified,
665        error: impl Into<CommitCheckpointVerifiedError>,
666    ) {
667        let (finalized, rsp_tx) = queued;
668
669        // The block sender might have already given up on this block,
670        // so ignore any channel send errors.
671        let _ = rsp_tx.send(Err(error.into()));
672        std::mem::drop(finalized);
673    }
674
675    /// Drops all non-finalized state queue blocks, and sends an error on their result channels.
676    fn clear_non_finalized_block_queue(
677        &mut self,
678        error: impl Into<CommitSemanticallyVerifiedError> + Clone,
679    ) {
680        for (_hash, queued) in self.non_finalized_state_queued_blocks.drain() {
681            Self::send_semantically_verified_block_error(queued, error.clone());
682        }
683    }
684
685    /// Send an error on a `QueuedSemanticallyVerified` block's result channel, and drop the block
686    fn send_semantically_verified_block_error(
687        queued: QueuedSemanticallyVerified,
688        error: impl Into<CommitSemanticallyVerifiedError>,
689    ) {
690        let (finalized, rsp_tx) = queued;
691
692        // The block sender might have already given up on this block,
693        // so ignore any channel send errors.
694        let _ = rsp_tx.send(Err(error.into()));
695        std::mem::drop(finalized);
696    }
697
698    /// Queue a semantically verified block for contextual verification and check if any queued
699    /// blocks are ready to be verified and committed to the state.
700    ///
701    /// This function encodes the logic for [committing non-finalized blocks][1]
702    /// in RFC0005.
703    ///
704    /// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks
705    #[instrument(level = "debug", skip(self, semantically_verified))]
706    fn queue_and_commit_to_non_finalized_state(
707        &mut self,
708        semantically_verified: SemanticallyVerifiedBlock,
709    ) -> oneshot::Receiver<Result<block::Hash, CommitSemanticallyVerifiedError>> {
710        tracing::debug!(block = %semantically_verified.block, "queueing block for contextual verification");
711        let parent_hash = semantically_verified.block.header.previous_block_hash;
712
713        // Drop hashes of any blocks the write task has rejected before checking
714        // the SentHashes membership below. Without this, a rejected same-hash
715        // block would lock out a later honest re-delivery of a block at the
716        // same hash as a false "duplicate".
717        self.drain_non_finalized_rejected_hashes();
718
719        if self
720            .non_finalized_block_write_sent_hashes
721            .contains(&semantically_verified.hash)
722        {
723            let (rsp_tx, rsp_rx) = oneshot::channel();
724            let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
725                Some(semantically_verified.hash.into()),
726                KnownBlock::WriteChannel,
727            )
728            .into()));
729            return rsp_rx;
730        }
731
732        if self
733            .read_service
734            .db
735            .contains_height(semantically_verified.height)
736        {
737            let (rsp_tx, rsp_rx) = oneshot::channel();
738            let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
739                Some(semantically_verified.height.into()),
740                KnownBlock::Finalized,
741            )
742            .into()));
743            return rsp_rx;
744        }
745
746        // [`Request::CommitSemanticallyVerifiedBlock`] contract: a request to commit a block which
747        // has been queued but not yet committed to the state fails the older request and replaces
748        // it with the newer request.
749        let rsp_rx = if let Some((_, old_rsp_tx)) = self
750            .non_finalized_state_queued_blocks
751            .get_mut(&semantically_verified.hash)
752        {
753            tracing::debug!("replacing older queued request with new request");
754            let (mut rsp_tx, rsp_rx) = oneshot::channel();
755            std::mem::swap(old_rsp_tx, &mut rsp_tx);
756            let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
757                Some(semantically_verified.hash.into()),
758                KnownBlock::Queue,
759            )
760            .into()));
761            rsp_rx
762        } else {
763            let (rsp_tx, rsp_rx) = oneshot::channel();
764            self.non_finalized_state_queued_blocks
765                .queue((semantically_verified, rsp_tx));
766            rsp_rx
767        };
768
769        // We've finished sending checkpoint verified blocks when:
770        // - we've sent the verified block for the last checkpoint, and
771        // - it has been successfully written to disk.
772        //
773        // We detect the last checkpoint by looking for non-finalized blocks
774        // that are a child of the last block we sent.
775        //
776        // TODO: configure the state with the last checkpoint hash instead?
777        if self.block_write_sender.finalized.is_some()
778            && self
779                .non_finalized_state_queued_blocks
780                .has_queued_children(self.finalized_block_write_last_sent_hash)
781            && self.read_service.db.finalized_tip_hash()
782                == self.finalized_block_write_last_sent_hash
783        {
784            // Tell the block write task to stop committing checkpoint verified blocks to the finalized state,
785            // and move on to committing semantically verified blocks to the non-finalized state.
786            std::mem::drop(self.block_write_sender.finalized.take());
787            // Remove any checkpoint-verified block hashes from `non_finalized_block_write_sent_hashes`.
788            self.non_finalized_block_write_sent_hashes = SentHashes::default();
789            // Mark `SentHashes` as usable by the `can_fork_chain_at()` method.
790            self.non_finalized_block_write_sent_hashes
791                .can_fork_chain_at_hashes = true;
792            // Send blocks from non-finalized queue
793            self.send_ready_non_finalized_queued(self.finalized_block_write_last_sent_hash);
794            // We've finished committing checkpoint verified blocks to finalized state, so drop any repeated queued blocks.
795            self.clear_finalized_block_queue(CommitBlockError::new_duplicate(
796                None,
797                KnownBlock::Finalized,
798            ));
799        } else if !self.can_fork_chain_at(&parent_hash) {
800            tracing::trace!("unready to verify, returning early");
801        } else if self.block_write_sender.finalized.is_none() {
802            // Wait until block commit task is ready to write non-finalized blocks before dequeuing them
803            self.send_ready_non_finalized_queued(parent_hash);
804
805            let finalized_tip_height = self.read_service.db.finalized_tip_height().expect(
806                "Finalized state must have at least one block before committing non-finalized state",
807            );
808
809            self.non_finalized_state_queued_blocks
810                .prune_by_height(finalized_tip_height);
811
812            self.non_finalized_block_write_sent_hashes
813                .prune_by_height(finalized_tip_height);
814        }
815
816        rsp_rx
817    }
818
819    /// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks.
820    fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
821        self.non_finalized_block_write_sent_hashes
822            .can_fork_chain_at(hash)
823            || &self.read_service.db.finalized_tip_hash() == hash
824    }
825
826    /// Returns `true` if `queued_height` is near the final checkpoint.
827    ///
828    /// The semantic block verifier needs access to UTXOs from checkpoint verified blocks
829    /// near the final checkpoint, so that it can verify blocks that spend those UTXOs.
830    ///
831    /// If it doesn't have the required UTXOs, some blocks will time out,
832    /// but succeed after a syncer restart.
833    fn is_close_to_final_checkpoint(&self, queued_height: block::Height) -> bool {
834        queued_height >= self.full_verifier_utxo_lookahead
835    }
836
837    /// Sends all queued blocks whose parents have recently arrived starting from `new_parent`
838    /// in breadth-first ordering to the block write task which will attempt to validate and commit them
839    #[tracing::instrument(level = "debug", skip(self, new_parent))]
840    fn send_ready_non_finalized_queued(&mut self, new_parent: block::Hash) {
841        use tokio::sync::mpsc::error::SendError;
842        if let Some(non_finalized_block_write_sender) = &self.block_write_sender.non_finalized {
843            let mut new_parents: Vec<block::Hash> = vec![new_parent];
844
845            while let Some(parent_hash) = new_parents.pop() {
846                let queued_children = self
847                    .non_finalized_state_queued_blocks
848                    .dequeue_children(parent_hash);
849
850                for queued_child in queued_children {
851                    let (SemanticallyVerifiedBlock { hash, .. }, _) = queued_child;
852
853                    self.non_finalized_block_write_sent_hashes
854                        .add(&queued_child.0);
855                    let send_result = non_finalized_block_write_sender.send(queued_child.into());
856
857                    if let Err(SendError(NonFinalizedWriteMessage::Commit(queued))) = send_result {
858                        // If Zebra is shutting down, drop blocks and return an error.
859                        Self::send_semantically_verified_block_error(
860                            queued,
861                            CommitBlockError::WriteTaskExited,
862                        );
863
864                        self.clear_non_finalized_block_queue(CommitBlockError::WriteTaskExited);
865
866                        return;
867                    };
868
869                    new_parents.push(hash);
870                }
871            }
872
873            self.non_finalized_block_write_sent_hashes.finish_batch();
874        };
875    }
876
877    /// Return the tip of the current best chain.
878    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
879        self.read_service.best_tip()
880    }
881
882    fn send_invalidate_block(
883        &self,
884        hash: block::Hash,
885    ) -> oneshot::Receiver<Result<block::Hash, InvalidateError>> {
886        let (rsp_tx, rsp_rx) = oneshot::channel();
887
888        let Some(sender) = &self.block_write_sender.non_finalized else {
889            let _ = rsp_tx.send(Err(InvalidateError::ProcessingCheckpointedBlocks));
890            return rsp_rx;
891        };
892
893        if let Err(tokio::sync::mpsc::error::SendError(error)) =
894            sender.send(NonFinalizedWriteMessage::Invalidate { hash, rsp_tx })
895        {
896            let NonFinalizedWriteMessage::Invalidate { rsp_tx, .. } = error else {
897                unreachable!("should return the same Invalidate message could not be sent");
898            };
899
900            let _ = rsp_tx.send(Err(InvalidateError::SendInvalidateRequestFailed));
901        }
902
903        rsp_rx
904    }
905
906    fn send_reconsider_block(
907        &self,
908        hash: block::Hash,
909    ) -> oneshot::Receiver<Result<Vec<block::Hash>, ReconsiderError>> {
910        let (rsp_tx, rsp_rx) = oneshot::channel();
911
912        let Some(sender) = &self.block_write_sender.non_finalized else {
913            let _ = rsp_tx.send(Err(ReconsiderError::CheckpointCommitInProgress));
914            return rsp_rx;
915        };
916
917        if let Err(tokio::sync::mpsc::error::SendError(error)) =
918            sender.send(NonFinalizedWriteMessage::Reconsider { hash, rsp_tx })
919        {
920            let NonFinalizedWriteMessage::Reconsider { rsp_tx, .. } = error else {
921                unreachable!("should return the same Reconsider message could not be sent");
922            };
923
924            let _ = rsp_tx.send(Err(ReconsiderError::ReconsiderSendFailed));
925        }
926
927        rsp_rx
928    }
929
930    /// Assert some assumptions about the semantically verified `block` before it is queued.
931    fn assert_block_can_be_validated(&self, block: &SemanticallyVerifiedBlock) {
932        // required by `Request::CommitSemanticallyVerifiedBlock` call
933        assert!(
934            block.height > self.network.mandatory_checkpoint_height(),
935            "invalid semantically verified block height: the canopy checkpoint is mandatory, pre-canopy \
936            blocks, and the canopy activation block, must be committed to the state as finalized \
937            blocks"
938        );
939    }
940
941    fn known_sent_hash(&self, hash: &block::Hash) -> Option<KnownBlock> {
942        self.non_finalized_block_write_sent_hashes
943            .contains(hash)
944            .then_some(KnownBlock::WriteChannel)
945    }
946}
947
948impl ReadStateService {
949    /// Creates a new read-only state service, using the provided finalized state and
950    /// block write task handle.
951    ///
952    /// Returns the newly created service,
953    /// and a watch channel for updating the shared recent non-finalized chain.
954    pub(crate) fn new(
955        finalized_state: &FinalizedState,
956        block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
957        non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
958    ) -> Self {
959        let read_service = Self {
960            network: finalized_state.network(),
961            db: finalized_state.db.clone(),
962            non_finalized_state_receiver,
963            block_write_task,
964        };
965
966        tracing::debug!("created new read-only state service");
967
968        read_service
969    }
970
971    /// Return the tip of the current best chain.
972    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
973        read::best_tip(&self.latest_non_finalized_state(), &self.db)
974    }
975
976    /// Gets a clone of the latest non-finalized state from the `non_finalized_state_receiver`
977    fn latest_non_finalized_state(&self) -> NonFinalizedState {
978        self.non_finalized_state_receiver.cloned_watch_data()
979    }
980
981    /// Gets a clone of the latest, best non-finalized chain from the `non_finalized_state_receiver`
982    fn latest_best_chain(&self) -> Option<Arc<Chain>> {
983        self.non_finalized_state_receiver
984            .borrow_mapped(|non_finalized_state| non_finalized_state.best_chain().cloned())
985    }
986
987    /// Test-only access to the inner database.
988    /// Can be used to modify the database without doing any consensus checks.
989    #[cfg(any(test, feature = "proptest-impl"))]
990    pub fn db(&self) -> &ZebraDb {
991        &self.db
992    }
993
994    /// Logs rocksdb metrics using the read only state service.
995    pub fn log_db_metrics(&self) {
996        self.db.print_db_metrics();
997    }
998}
999
1000impl Service<Request> for StateService {
1001    type Response = Response;
1002    type Error = BoxError;
1003    type Future =
1004        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
1005
1006    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1007        // Check for panics in the block write task
1008        let poll = self.read_service.poll_ready(cx);
1009
1010        // Prune outdated UTXO requests
1011        let now = Instant::now();
1012
1013        if self.last_prune + Self::PRUNE_INTERVAL < now {
1014            let tip = self.best_tip();
1015            let old_len = self.pending_utxos.len();
1016
1017            self.pending_utxos.prune();
1018            self.last_prune = now;
1019
1020            let new_len = self.pending_utxos.len();
1021            let prune_count = old_len
1022                .checked_sub(new_len)
1023                .expect("prune does not add any utxo requests");
1024            if prune_count > 0 {
1025                tracing::debug!(
1026                    ?old_len,
1027                    ?new_len,
1028                    ?prune_count,
1029                    ?tip,
1030                    "pruned utxo requests"
1031                );
1032            } else {
1033                tracing::debug!(len = ?old_len, ?tip, "no utxo requests needed pruning");
1034            }
1035        }
1036
1037        poll
1038    }
1039
1040    #[instrument(name = "state", skip(self, req))]
1041    fn call(&mut self, req: Request) -> Self::Future {
1042        req.count_metric();
1043        let span = Span::current();
1044
1045        match req {
1046            // Uses non_finalized_state_queued_blocks and pending_utxos in the StateService
1047            // Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb.
1048            //
1049            // The expected error type for this request is `CommitSemanticallyVerifiedError`.
1050            Request::CommitSemanticallyVerifiedBlock(semantically_verified) => {
1051                let timer = CodeTimer::start();
1052                self.assert_block_can_be_validated(&semantically_verified);
1053
1054                self.pending_utxos
1055                    .check_against_ordered(&semantically_verified.new_outputs);
1056
1057                // # Performance
1058                //
1059                // Allow other async tasks to make progress while blocks are being verified
1060                // and written to disk. But wait for the blocks to finish committing,
1061                // so that `StateService` multi-block queries always observe a consistent state.
1062                //
1063                // Since each block is spawned into its own task,
1064                // there shouldn't be any other code running in the same task,
1065                // so we don't need to worry about blocking it:
1066                // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
1067
1068                let rsp_rx = tokio::task::block_in_place(move || {
1069                    span.in_scope(|| {
1070                        self.queue_and_commit_to_non_finalized_state(semantically_verified)
1071                    })
1072                });
1073
1074                // TODO:
1075                //   - check for panics in the block write task here,
1076                //     as well as in poll_ready()
1077
1078                // The work is all done, the future just waits on a channel for the result
1079                timer.finish_desc("CommitSemanticallyVerifiedBlock");
1080
1081                // Await the channel response, flatten the result, map receive errors to
1082                // `CommitSemanticallyVerifiedError::WriteTaskExited`.
1083                // Then flatten the nested Result and convert any errors to a BoxError.
1084                let span = Span::current();
1085                async move {
1086                    rsp_rx
1087                        .await
1088                        .map_err(|_recv_error| CommitBlockError::WriteTaskExited.into())
1089                        .and_then(|result| result)
1090                        .map_err(BoxError::from)
1091                        .map(Response::Committed)
1092                }
1093                .instrument(span)
1094                .boxed()
1095            }
1096
1097            // Uses finalized_state_queued_blocks and pending_utxos in the StateService.
1098            // Accesses shared writeable state in the StateService.
1099            //
1100            // The expected error type for this request is `CommitCheckpointVerifiedError`.
1101            Request::CommitCheckpointVerifiedBlock(finalized) => {
1102                let timer = CodeTimer::start();
1103                // # Consensus
1104                //
1105                // A semantic block verification could have called AwaitUtxo
1106                // before this checkpoint verified block arrived in the state.
1107                // So we need to check for pending UTXO requests sent by running
1108                // semantic block verifications.
1109                //
1110                // This check is redundant for most checkpoint verified blocks,
1111                // because semantic verification can only succeed near the final
1112                // checkpoint, when all the UTXOs are available for the verifying block.
1113                //
1114                // (Checkpoint block UTXOs are verified using block hash checkpoints
1115                // and transaction merkle tree block header commitments.)
1116                self.pending_utxos
1117                    .check_against_ordered(&finalized.new_outputs);
1118
1119                // # Performance
1120                //
1121                // This method doesn't block, access the database, or perform CPU-intensive tasks,
1122                // so we can run it directly in the tokio executor's Future threads.
1123                let rsp_rx = self.queue_and_commit_to_finalized_state(finalized);
1124
1125                // TODO:
1126                //   - check for panics in the block write task here,
1127                //     as well as in poll_ready()
1128
1129                // The work is all done, the future just waits on a channel for the result
1130                timer.finish_desc("CommitCheckpointVerifiedBlock");
1131
1132                // Await the channel response, flatten the result, map receive errors to
1133                // `CommitCheckpointVerifiedError::WriteTaskExited`.
1134                // Then flatten the nested Result and convert any errors to a BoxError.
1135                async move {
1136                    rsp_rx
1137                        .await
1138                        .map_err(|_recv_error| CommitBlockError::WriteTaskExited.into())
1139                        .and_then(|result| result)
1140                        .map_err(BoxError::from)
1141                        .map(Response::Committed)
1142                }
1143                .instrument(span)
1144                .boxed()
1145            }
1146
1147            // Uses pending_utxos and non_finalized_state_queued_blocks in the StateService.
1148            // If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService.
1149            Request::AwaitUtxo(outpoint) => {
1150                let timer = CodeTimer::start();
1151                // Prepare the AwaitUtxo future from PendingUxtos.
1152                let response_fut = self.pending_utxos.queue(outpoint);
1153                // Only instrument `response_fut`, the ReadStateService already
1154                // instruments its requests with the same span.
1155
1156                let response_fut = response_fut.instrument(span).boxed();
1157
1158                // Check the non-finalized block queue outside the returned future,
1159                // so we can access mutable state fields.
1160                if let Some(utxo) = self.non_finalized_state_queued_blocks.utxo(&outpoint) {
1161                    self.pending_utxos.respond(&outpoint, utxo);
1162
1163                    // We're finished, the returned future gets the UTXO from the respond() channel.
1164                    timer.finish_desc("AwaitUtxo/queued-non-finalized");
1165
1166                    return response_fut;
1167                }
1168
1169                // Check the sent non-finalized blocks
1170                if let Some(utxo) = self.non_finalized_block_write_sent_hashes.utxo(&outpoint) {
1171                    self.pending_utxos.respond(&outpoint, utxo);
1172
1173                    // We're finished, the returned future gets the UTXO from the respond() channel.
1174                    timer.finish_desc("AwaitUtxo/sent-non-finalized");
1175
1176                    return response_fut;
1177                }
1178
1179                // We ignore any UTXOs in FinalizedState.finalized_state_queued_blocks,
1180                // because it is only used during checkpoint verification.
1181                //
1182                // This creates a rare race condition, but it doesn't seem to happen much in practice.
1183                // See #5126 for details.
1184
1185                // Manually send a request to the ReadStateService,
1186                // to get UTXOs from any non-finalized chain or the finalized chain.
1187                let read_service = self.read_service.clone();
1188
1189                // Run the request in an async block, so we can await the response.
1190                async move {
1191                    let req = ReadRequest::AnyChainUtxo(outpoint);
1192
1193                    let rsp = read_service.oneshot(req).await?;
1194
1195                    // Optional TODO:
1196                    //  - make pending_utxos.respond() async using a channel,
1197                    //    so we can respond to all waiting requests here
1198                    //
1199                    // This change is not required for correctness, because:
1200                    // - any waiting requests should have returned when the block was sent to the state
1201                    // - otherwise, the request returns immediately if:
1202                    //   - the block is in the non-finalized queue, or
1203                    //   - the block is in any non-finalized chain or the finalized state
1204                    //
1205                    // And if the block is in the finalized queue,
1206                    // that's rare enough that a retry is ok.
1207                    if let ReadResponse::AnyChainUtxo(Some(utxo)) = rsp {
1208                        // We got a UTXO, so we replace the response future with the result own.
1209                        timer.finish_desc("AwaitUtxo/any-chain");
1210
1211                        return Ok(Response::Utxo(utxo));
1212                    }
1213
1214                    // We're finished, but the returned future is waiting on the respond() channel.
1215                    timer.finish_desc("AwaitUtxo/waiting");
1216
1217                    response_fut.await
1218                }
1219                .boxed()
1220            }
1221
1222            // Used by sync, inbound, and block verifier to check if a block is already in the state
1223            // before downloading or validating it.
1224            Request::KnownBlock(hash) => {
1225                let timer = CodeTimer::start();
1226                let sent_hash_response = self.known_sent_hash(&hash);
1227                let read_service = self.read_service.clone();
1228
1229                async move {
1230                    if sent_hash_response.is_some() {
1231                        return Ok(Response::KnownBlock(sent_hash_response));
1232                    };
1233
1234                    let response = read::non_finalized_state_contains_block_hash(
1235                        &read_service.latest_non_finalized_state(),
1236                        hash,
1237                    )
1238                    // TODO: Move this to a blocking task, perhaps by moving some of this logic to the ReadStateService.
1239                    .or_else(|| read::finalized_state_contains_block_hash(&read_service.db, hash));
1240
1241                    timer.finish_desc("Request::KnownBlock");
1242
1243                    Ok(Response::KnownBlock(response))
1244                }
1245                .boxed()
1246            }
1247
1248            // The expected error type for this request is `InvalidateError`
1249            Request::InvalidateBlock(block_hash) => {
1250                let rsp_rx = tokio::task::block_in_place(move || {
1251                    span.in_scope(|| self.send_invalidate_block(block_hash))
1252                });
1253
1254                // Await the channel response, flatten the result, map receive errors to
1255                // `InvalidateError::InvalidateRequestDropped`.
1256                // Then flatten the nested Result and convert any errors to a BoxError.
1257                let span = Span::current();
1258                async move {
1259                    rsp_rx
1260                        .await
1261                        .map_err(|_recv_error| InvalidateError::InvalidateRequestDropped)
1262                        .and_then(|result| result)
1263                        .map_err(BoxError::from)
1264                        .map(Response::Invalidated)
1265                }
1266                .instrument(span)
1267                .boxed()
1268            }
1269
1270            // The expected error type for this request is `ReconsiderError`
1271            Request::ReconsiderBlock(block_hash) => {
1272                let rsp_rx = tokio::task::block_in_place(move || {
1273                    span.in_scope(|| self.send_reconsider_block(block_hash))
1274                });
1275
1276                // Await the channel response, flatten the result, map receive errors to
1277                // `ReconsiderError::ReconsiderResponseDropped`.
1278                // Then flatten the nested Result and convert any errors to a BoxError.
1279                let span = Span::current();
1280                async move {
1281                    rsp_rx
1282                        .await
1283                        .map_err(|_recv_error| ReconsiderError::ReconsiderResponseDropped)
1284                        .and_then(|result| result)
1285                        .map_err(BoxError::from)
1286                        .map(Response::Reconsidered)
1287                }
1288                .instrument(span)
1289                .boxed()
1290            }
1291
1292            // Runs concurrently using the ReadStateService
1293            Request::Tip
1294            | Request::Depth(_)
1295            | Request::BestChainNextMedianTimePast
1296            | Request::BestChainBlockHash(_)
1297            | Request::BlockLocator
1298            | Request::Transaction(_)
1299            | Request::AnyChainTransaction(_)
1300            | Request::UnspentBestChainUtxo(_)
1301            | Request::Block(_)
1302            | Request::AnyChainBlock(_)
1303            | Request::BlockAndSize(_)
1304            | Request::BlockHeader(_)
1305            | Request::FindBlockHashes { .. }
1306            | Request::FindBlockHeaders { .. }
1307            | Request::CheckBestChainTipNullifiersAndAnchors(_)
1308            | Request::CheckBlockProposalValidity(_) => {
1309                // Redirect the request to the concurrent ReadStateService
1310                let read_service = self.read_service.clone();
1311
1312                async move {
1313                    let req = req
1314                        .try_into()
1315                        .expect("ReadRequest conversion should not fail");
1316
1317                    let rsp = read_service.oneshot(req).await?;
1318                    let rsp = rsp.try_into().expect("Response conversion should not fail");
1319
1320                    Ok(rsp)
1321                }
1322                .boxed()
1323            }
1324        }
1325    }
1326}
1327
1328impl Service<ReadRequest> for ReadStateService {
1329    type Response = ReadResponse;
1330    type Error = BoxError;
1331    type Future =
1332        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
1333
1334    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1335        // Check for panics in the block write task
1336        //
1337        // TODO: move into a check_for_panics() method
1338        let block_write_task = self.block_write_task.take();
1339
1340        if let Some(block_write_task) = block_write_task {
1341            if block_write_task.is_finished() {
1342                if let Some(block_write_task) = Arc::into_inner(block_write_task) {
1343                    // We are the last state with a reference to this task, so we can propagate any panics
1344                    if let Err(thread_panic) = block_write_task.join() {
1345                        std::panic::resume_unwind(thread_panic);
1346                    }
1347                }
1348            } else {
1349                // It hasn't finished, so we need to put it back
1350                self.block_write_task = Some(block_write_task);
1351            }
1352        }
1353
1354        self.db.check_for_panics();
1355
1356        Poll::Ready(Ok(()))
1357    }
1358
1359    #[instrument(name = "read_state", skip(self, req))]
1360    fn call(&mut self, req: ReadRequest) -> Self::Future {
1361        req.count_metric();
1362        let timer = CodeTimer::start_desc(req.variant_name());
1363        let span = Span::current();
1364        let timed_span = TimedSpan::new(timer, span);
1365        let state = self.clone();
1366
1367        if req == ReadRequest::NonFinalizedBlocksListener {
1368            // The non-finalized blocks listener is used to notify the state service
1369            // about new blocks that have been added to the non-finalized state.
1370            let non_finalized_blocks_listener = NonFinalizedBlocksListener::spawn(
1371                self.network.clone(),
1372                self.non_finalized_state_receiver.clone(),
1373            );
1374
1375            return async move {
1376                Ok(ReadResponse::NonFinalizedBlocksListener(
1377                    non_finalized_blocks_listener,
1378                ))
1379            }
1380            .boxed();
1381        };
1382
1383        let request_handler = move || match req {
1384            // Used by the `getblockchaininfo` RPC.
1385            ReadRequest::UsageInfo => Ok(ReadResponse::UsageInfo(state.db.size())),
1386
1387            // Used by the StateService.
1388            ReadRequest::Tip => Ok(ReadResponse::Tip(read::tip(
1389                state.latest_best_chain(),
1390                &state.db,
1391            ))),
1392
1393            // Used by `getblockchaininfo` RPC method.
1394            ReadRequest::TipPoolValues => {
1395                let (tip_height, tip_hash, value_balance) =
1396                    read::tip_with_value_balance(state.latest_best_chain(), &state.db)?
1397                        .ok_or(BoxError::from("no chain tip available yet"))?;
1398
1399                Ok(ReadResponse::TipPoolValues {
1400                    tip_height,
1401                    tip_hash,
1402                    value_balance,
1403                })
1404            }
1405
1406            // Used by getblock
1407            ReadRequest::BlockInfo(hash_or_height) => Ok(ReadResponse::BlockInfo(
1408                read::block_info(state.latest_best_chain(), &state.db, hash_or_height),
1409            )),
1410
1411            // Used by the StateService.
1412            ReadRequest::Depth(hash) => Ok(ReadResponse::Depth(read::depth(
1413                state.latest_best_chain(),
1414                &state.db,
1415                hash,
1416            ))),
1417
1418            // Used by the StateService.
1419            ReadRequest::BestChainNextMedianTimePast => {
1420                Ok(ReadResponse::BestChainNextMedianTimePast(
1421                    read::next_median_time_past(&state.latest_non_finalized_state(), &state.db)?,
1422                ))
1423            }
1424
1425            // Used by the get_block (raw) RPC and the StateService.
1426            ReadRequest::Block(hash_or_height) => Ok(ReadResponse::Block(read::block(
1427                state.latest_best_chain(),
1428                &state.db,
1429                hash_or_height,
1430            ))),
1431
1432            ReadRequest::AnyChainBlock(hash_or_height) => Ok(ReadResponse::Block(read::any_block(
1433                state.latest_non_finalized_state().chain_iter(),
1434                &state.db,
1435                hash_or_height,
1436            ))),
1437
1438            // Used by the get_block (raw) RPC and the StateService.
1439            ReadRequest::BlockAndSize(hash_or_height) => Ok(ReadResponse::BlockAndSize(
1440                read::block_and_size(state.latest_best_chain(), &state.db, hash_or_height),
1441            )),
1442
1443            // Used by the get_block (verbose) RPC and the StateService.
1444            ReadRequest::BlockHeader(hash_or_height) => {
1445                let best_chain = state.latest_best_chain();
1446
1447                let height = hash_or_height
1448                    .height_or_else(|hash| {
1449                        read::find::height_by_hash(best_chain.clone(), &state.db, hash)
1450                    })
1451                    .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1452
1453                let hash = hash_or_height
1454                    .hash_or_else(|height| {
1455                        read::find::hash_by_height(best_chain.clone(), &state.db, height)
1456                    })
1457                    .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1458
1459                let next_height = height.next()?;
1460                let next_block_hash =
1461                    read::find::hash_by_height(best_chain.clone(), &state.db, next_height);
1462
1463                let header = read::block_header(best_chain, &state.db, height.into())
1464                    .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1465
1466                Ok(ReadResponse::BlockHeader {
1467                    header,
1468                    hash,
1469                    height,
1470                    next_block_hash,
1471                })
1472            }
1473
1474            // For the get_raw_transaction RPC and the StateService.
1475            ReadRequest::Transaction(hash) => Ok(ReadResponse::Transaction(
1476                read::mined_transaction(state.latest_best_chain(), &state.db, hash),
1477            )),
1478
1479            ReadRequest::AnyChainTransaction(hash) => {
1480                Ok(ReadResponse::AnyChainTransaction(read::any_transaction(
1481                    state.latest_non_finalized_state().chain_iter(),
1482                    &state.db,
1483                    hash,
1484                )))
1485            }
1486
1487            // Used by the getblock (verbose) RPC.
1488            ReadRequest::TransactionIdsForBlock(hash_or_height) => Ok(
1489                ReadResponse::TransactionIdsForBlock(read::transaction_hashes_for_block(
1490                    state.latest_best_chain(),
1491                    &state.db,
1492                    hash_or_height,
1493                )),
1494            ),
1495
1496            ReadRequest::AnyChainTransactionIdsForBlock(hash_or_height) => {
1497                Ok(ReadResponse::AnyChainTransactionIdsForBlock(
1498                    read::transaction_hashes_for_any_block(
1499                        state.latest_non_finalized_state().chain_iter(),
1500                        &state.db,
1501                        hash_or_height,
1502                    ),
1503                ))
1504            }
1505
1506            #[cfg(feature = "indexer")]
1507            ReadRequest::SpendingTransactionId(spend) => Ok(ReadResponse::TransactionId(
1508                read::spending_transaction_hash(state.latest_best_chain(), &state.db, spend),
1509            )),
1510
1511            ReadRequest::UnspentBestChainUtxo(outpoint) => Ok(ReadResponse::UnspentBestChainUtxo(
1512                read::unspent_utxo(state.latest_best_chain(), &state.db, outpoint),
1513            )),
1514
1515            // Manually used by the StateService to implement part of AwaitUtxo.
1516            ReadRequest::AnyChainUtxo(outpoint) => Ok(ReadResponse::AnyChainUtxo(read::any_utxo(
1517                state.latest_non_finalized_state(),
1518                &state.db,
1519                outpoint,
1520            ))),
1521
1522            // Used by the StateService.
1523            ReadRequest::BlockLocator => Ok(ReadResponse::BlockLocator(
1524                read::block_locator(state.latest_best_chain(), &state.db).unwrap_or_default(),
1525            )),
1526
1527            // Used by the StateService.
1528            ReadRequest::FindBlockHashes { known_blocks, stop } => {
1529                Ok(ReadResponse::BlockHashes(read::find_chain_hashes(
1530                    state.latest_best_chain(),
1531                    &state.db,
1532                    known_blocks,
1533                    stop,
1534                    MAX_FIND_BLOCK_HASHES_RESULTS,
1535                )))
1536            }
1537
1538            // Used by the StateService.
1539            ReadRequest::FindBlockHeaders { known_blocks, stop } => Ok(ReadResponse::BlockHeaders(
1540                read::find_chain_headers(
1541                    state.latest_best_chain(),
1542                    &state.db,
1543                    known_blocks,
1544                    stop,
1545                    MAX_FIND_BLOCK_HEADERS_RESULTS,
1546                )
1547                .into_iter()
1548                .map(|header| CountedHeader { header })
1549                .collect(),
1550            )),
1551
1552            ReadRequest::SaplingTree(hash_or_height) => Ok(ReadResponse::SaplingTree(
1553                read::sapling_tree(state.latest_best_chain(), &state.db, hash_or_height),
1554            )),
1555
1556            ReadRequest::OrchardTree(hash_or_height) => Ok(ReadResponse::OrchardTree(
1557                read::orchard_tree(state.latest_best_chain(), &state.db, hash_or_height),
1558            )),
1559
1560            ReadRequest::SaplingSubtrees { start_index, limit } => {
1561                let end_index = limit
1562                    .and_then(|limit| start_index.0.checked_add(limit.0))
1563                    .map(NoteCommitmentSubtreeIndex);
1564
1565                let best_chain = state.latest_best_chain();
1566                let sapling_subtrees = if let Some(end_index) = end_index {
1567                    read::sapling_subtrees(best_chain, &state.db, start_index..end_index)
1568                } else {
1569                    // If there is no end bound, just return all the trees.
1570                    // If the end bound would overflow, just returns all the trees, because that's what
1571                    // `zcashd` does. (It never calculates an end bound, so it just keeps iterating until
1572                    // the trees run out.)
1573                    read::sapling_subtrees(best_chain, &state.db, start_index..)
1574                };
1575
1576                Ok(ReadResponse::SaplingSubtrees(sapling_subtrees))
1577            }
1578
1579            ReadRequest::OrchardSubtrees { start_index, limit } => {
1580                let end_index = limit
1581                    .and_then(|limit| start_index.0.checked_add(limit.0))
1582                    .map(NoteCommitmentSubtreeIndex);
1583
1584                let best_chain = state.latest_best_chain();
1585                let orchard_subtrees = if let Some(end_index) = end_index {
1586                    read::orchard_subtrees(best_chain, &state.db, start_index..end_index)
1587                } else {
1588                    // If there is no end bound, just return all the trees.
1589                    // If the end bound would overflow, just returns all the trees, because that's what
1590                    // `zcashd` does. (It never calculates an end bound, so it just keeps iterating until
1591                    // the trees run out.)
1592                    read::orchard_subtrees(best_chain, &state.db, start_index..)
1593                };
1594
1595                Ok(ReadResponse::OrchardSubtrees(orchard_subtrees))
1596            }
1597
1598            // For the get_address_balance RPC.
1599            ReadRequest::AddressBalance(addresses) => {
1600                let (balance, received) =
1601                    read::transparent_balance(state.latest_best_chain(), &state.db, addresses)?;
1602                Ok(ReadResponse::AddressBalance { balance, received })
1603            }
1604
1605            // For the get_address_tx_ids RPC.
1606            ReadRequest::TransactionIdsByAddresses {
1607                addresses,
1608                height_range,
1609            } => read::transparent_tx_ids(
1610                state.latest_best_chain(),
1611                &state.db,
1612                addresses,
1613                height_range,
1614            )
1615            .map(ReadResponse::AddressesTransactionIds),
1616
1617            // For the get_address_utxos RPC.
1618            ReadRequest::UtxosByAddresses(addresses) => read::address_utxos(
1619                &state.network,
1620                state.latest_best_chain(),
1621                &state.db,
1622                addresses,
1623            )
1624            .map(ReadResponse::AddressUtxos),
1625
1626            ReadRequest::CheckBestChainTipNullifiersAndAnchors(unmined_tx) => {
1627                let latest_non_finalized_best_chain = state.latest_best_chain();
1628
1629                check::nullifier::tx_no_duplicates_in_chain(
1630                    &state.db,
1631                    latest_non_finalized_best_chain.as_ref(),
1632                    &unmined_tx.transaction,
1633                )?;
1634
1635                check::anchors::tx_anchors_refer_to_final_treestates(
1636                    &state.db,
1637                    latest_non_finalized_best_chain.as_ref(),
1638                    &unmined_tx,
1639                )?;
1640
1641                Ok(ReadResponse::ValidBestChainTipNullifiersAndAnchors)
1642            }
1643
1644            // Used by the get_block and get_block_hash RPCs.
1645            ReadRequest::BestChainBlockHash(height) => Ok(ReadResponse::BlockHash(
1646                read::hash_by_height(state.latest_best_chain(), &state.db, height),
1647            )),
1648
1649            // Used by get_block_template and getblockchaininfo RPCs.
1650            ReadRequest::ChainInfo => {
1651                // # Correctness
1652                //
1653                // It is ok to do these lookups using multiple database calls. Finalized state updates
1654                // can only add overlapping blocks, and block hashes are unique across all chain forks.
1655                //
1656                // If there is a large overlap between the non-finalized and finalized states,
1657                // where the finalized tip is above the non-finalized tip,
1658                // Zebra is receiving a lot of blocks, or this request has been delayed for a long time.
1659                //
1660                // In that case, the `getblocktemplate` RPC will return an error because Zebra
1661                // is not synced to the tip. That check happens before the RPC makes this request.
1662                read::difficulty::get_block_template_chain_info(
1663                    &state.latest_non_finalized_state(),
1664                    &state.db,
1665                    &state.network,
1666                )
1667                .map(ReadResponse::ChainInfo)
1668            }
1669
1670            // Used by getmininginfo, getnetworksolps, and getnetworkhashps RPCs.
1671            ReadRequest::SolutionRate { num_blocks, height } => {
1672                let latest_non_finalized_state = state.latest_non_finalized_state();
1673                // # Correctness
1674                //
1675                // It is ok to do these lookups using multiple database calls. Finalized state updates
1676                // can only add overlapping blocks, and block hashes are unique across all chain forks.
1677                //
1678                // The worst that can happen here is that the default `start_hash` will be below
1679                // the chain tip.
1680                let (tip_height, tip_hash) =
1681                    match read::tip(latest_non_finalized_state.best_chain(), &state.db) {
1682                        Some(tip_hash) => tip_hash,
1683                        None => return Ok(ReadResponse::SolutionRate(None)),
1684                    };
1685
1686                let start_hash = match height {
1687                    Some(height) if height < tip_height => read::hash_by_height(
1688                        latest_non_finalized_state.best_chain(),
1689                        &state.db,
1690                        height,
1691                    ),
1692                    // use the chain tip hash if height is above it or not provided.
1693                    _ => Some(tip_hash),
1694                };
1695
1696                let solution_rate = start_hash.and_then(|start_hash| {
1697                    read::difficulty::solution_rate(
1698                        &latest_non_finalized_state,
1699                        &state.db,
1700                        num_blocks,
1701                        start_hash,
1702                    )
1703                });
1704
1705                Ok(ReadResponse::SolutionRate(solution_rate))
1706            }
1707
1708            ReadRequest::CheckBlockProposalValidity(semantically_verified) => {
1709                tracing::debug!(
1710                    "attempting to validate and commit block proposal \
1711                         onto a cloned non-finalized state"
1712                );
1713                let mut latest_non_finalized_state = state.latest_non_finalized_state();
1714
1715                // The previous block of a valid proposal must be on the best chain tip.
1716                let Some((_best_tip_height, best_tip_hash)) =
1717                    read::best_tip(&latest_non_finalized_state, &state.db)
1718                else {
1719                    return Err(
1720                        "state is empty: wait for Zebra to sync before submitting a proposal"
1721                            .into(),
1722                    );
1723                };
1724
1725                if semantically_verified.block.header.previous_block_hash != best_tip_hash {
1726                    return Err("proposal is not based on the current best chain tip: \
1727                                    previous block hash must be the best chain tip"
1728                        .into());
1729                }
1730
1731                // This clone of the non-finalized state is dropped when this closure returns.
1732                // The non-finalized state that's used in the rest of the state (including finalizing
1733                // blocks into the db) is not mutated here.
1734                //
1735                // TODO: Convert `CommitSemanticallyVerifiedError` to a new `ValidateProposalError`?
1736                latest_non_finalized_state.disable_metrics();
1737
1738                write::validate_and_commit_non_finalized(
1739                    &state.db,
1740                    &mut latest_non_finalized_state,
1741                    semantically_verified,
1742                )?;
1743
1744                Ok(ReadResponse::ValidBlockProposal)
1745            }
1746
1747            ReadRequest::TipBlockSize => {
1748                // Respond with the length of the obtained block if any.
1749                Ok(ReadResponse::TipBlockSize(
1750                    state
1751                        .best_tip()
1752                        .and_then(|(tip_height, _)| {
1753                            read::block_info(
1754                                state.latest_best_chain(),
1755                                &state.db,
1756                                tip_height.into(),
1757                            )
1758                        })
1759                        .map(|info| info.size().try_into().expect("u32 should fit in usize"))
1760                        .or_else(|| {
1761                            find::tip_block(state.latest_best_chain(), &state.db)
1762                                .map(|b| b.zcash_serialized_size())
1763                        }),
1764                ))
1765            }
1766
1767            ReadRequest::NonFinalizedBlocksListener => {
1768                unreachable!("should return early");
1769            }
1770
1771            // Used by `gettxout` RPC method.
1772            ReadRequest::IsTransparentOutputSpent(outpoint) => {
1773                let is_spent = read::unspent_utxo(state.latest_best_chain(), &state.db, outpoint);
1774                Ok(ReadResponse::IsTransparentOutputSpent(is_spent.is_none()))
1775            }
1776        };
1777
1778        timed_span.spawn_blocking(request_handler)
1779    }
1780}
1781
1782/// Initialize a state service from the provided [`Config`].
1783/// Returns a boxed state service, a read-only state service,
1784/// and receivers for state chain tip updates.
1785///
1786/// Each `network` has its own separate on-disk database.
1787///
1788/// The state uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
1789/// to work out when it is near the final checkpoint.
1790///
1791/// To share access to the state, wrap the returned service in a `Buffer`,
1792/// or clone the returned [`ReadStateService`].
1793///
1794/// It's possible to construct multiple state services in the same application (as
1795/// long as they, e.g., use different storage locations), but doing so is
1796/// probably not what you want.
1797pub async fn init(
1798    config: Config,
1799    network: &Network,
1800    max_checkpoint_height: block::Height,
1801    checkpoint_verify_concurrency_limit: usize,
1802) -> (
1803    BoxService<Request, Response, BoxError>,
1804    ReadStateService,
1805    LatestChainTip,
1806    ChainTipChange,
1807) {
1808    let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
1809        StateService::new(
1810            config,
1811            network,
1812            max_checkpoint_height,
1813            checkpoint_verify_concurrency_limit,
1814        )
1815        .await;
1816
1817    (
1818        BoxService::new(state_service),
1819        read_only_state_service,
1820        latest_chain_tip,
1821        chain_tip_change,
1822    )
1823}
1824
1825/// Initialize a read state service from the provided [`Config`].
1826/// Returns a read-only state service,
1827///
1828/// Each `network` has its own separate on-disk database.
1829///
1830/// To share access to the state, clone the returned [`ReadStateService`].
1831pub fn init_read_only(
1832    config: Config,
1833    network: &Network,
1834) -> (
1835    ReadStateService,
1836    ZebraDb,
1837    tokio::sync::watch::Sender<NonFinalizedState>,
1838) {
1839    let finalized_state = FinalizedState::new_with_debug(
1840        &config,
1841        network,
1842        true,
1843        #[cfg(feature = "elasticsearch")]
1844        false,
1845        true,
1846    );
1847    let (non_finalized_state_sender, non_finalized_state_receiver) =
1848        tokio::sync::watch::channel(NonFinalizedState::new(network));
1849
1850    (
1851        ReadStateService::new(
1852            &finalized_state,
1853            None,
1854            WatchReceiver::new(non_finalized_state_receiver),
1855        ),
1856        finalized_state.db.clone(),
1857        non_finalized_state_sender,
1858    )
1859}
1860
1861/// Calls [`init_read_only`] with the provided [`Config`] and [`Network`] from a blocking task.
1862/// Returns a [`tokio::task::JoinHandle`] with a read state service and chain tip sender.
1863pub fn spawn_init_read_only(
1864    config: Config,
1865    network: &Network,
1866) -> tokio::task::JoinHandle<(
1867    ReadStateService,
1868    ZebraDb,
1869    tokio::sync::watch::Sender<NonFinalizedState>,
1870)> {
1871    let network = network.clone();
1872    tokio::task::spawn_blocking(move || init_read_only(config, &network))
1873}
1874
/// Creates a [`StateService`] from an ephemeral [`Config`], boxed and wrapped in a
/// [`Buffer`] with a single slot.
///
/// Intended for building a state service in tests. See also [`init`].
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test(
    network: &Network,
) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
    //       if we ever need to test final checkpoint sent UTXO queries
    //
    // Only the read-write service is kept; the other returned values are discarded.
    let (read_write_service, ..) =
        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;

    let boxed_service = BoxService::new(read_write_service);
    Buffer::new(boxed_service, 1)
}
1889
/// Creates a state service from an ephemeral [`Config`] and returns it together with
/// its companion services.
///
/// Returns, in order: the read-write service (boxed and wrapped in a [`Buffer`] with a
/// single slot), the [`ReadStateService`], and the [`LatestChainTip`] and
/// [`ChainTipChange`] tip watch channels.
///
/// Intended for building state services in tests. See also [`init`].
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test_services(
    network: &Network,
) -> (
    Buffer<BoxService<Request, Response, BoxError>, Request>,
    ReadStateService,
    LatestChainTip,
    ChainTipChange,
) {
    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
    //       if we ever need to test final checkpoint sent UTXO queries
    let (read_write_service, read_service, tip_receiver, tip_change_receiver) =
        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;

    (
        Buffer::new(BoxService::new(read_write_service), 1),
        read_service,
        tip_receiver,
        tip_change_receiver,
    )
}