Skip to main content

zebra_state/
service.rs

1//! [`tower::Service`]s for Zebra's cached chain state.
2//!
3//! Zebra provides cached state access via two main services:
4//! - [`StateService`]: a read-write service that writes blocks to the state,
5//!   and redirects most read requests to the [`ReadStateService`].
6//! - [`ReadStateService`]: a read-only service that answers from the most
7//!   recent committed block.
8//!
9//! Most users should prefer [`ReadStateService`], unless they need to write blocks to the state.
10//!
11//! Zebra also provides access to the best chain tip via:
12//! - [`LatestChainTip`]: a read-only channel that contains the latest committed
13//!   tip.
14//! - [`ChainTipChange`]: a read-only channel that can asynchronously await
15//!   chain tip changes.
16
17use std::{
18    collections::HashMap,
19    future::Future,
20    pin::Pin,
21    sync::Arc,
22    task::{Context, Poll},
23    time::{Duration, Instant},
24};
25
26use futures::future::FutureExt;
27use tokio::sync::oneshot;
28use tower::{util::BoxService, Service, ServiceExt};
29use tracing::{instrument, Instrument, Span};
30
31#[cfg(any(test, feature = "proptest-impl"))]
32use tower::buffer::Buffer;
33
34use zebra_chain::{
35    block::{self, CountedHeader, HeightDiff},
36    diagnostic::CodeTimer,
37    parameters::{Network, NetworkUpgrade},
38    serialization::ZcashSerialize,
39    subtree::NoteCommitmentSubtreeIndex,
40};
41
42use crate::{
43    constants::{
44        MAX_FIND_BLOCK_HASHES_RESULTS, MAX_FIND_BLOCK_HEADERS_RESULTS, MAX_LEGACY_CHAIN_BLOCKS,
45    },
46    error::{CommitBlockError, CommitCheckpointVerifiedError, InvalidateError, ReconsiderError},
47    request::TimedSpan,
48    response::NonFinalizedBlocksListener,
49    service::{
50        block_iter::any_ancestor_blocks,
51        chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
52        finalized_state::{FinalizedState, ZebraDb},
53        non_finalized_state::{Chain, NonFinalizedState},
54        pending_utxos::PendingUtxos,
55        queued_blocks::QueuedBlocks,
56        read::find,
57        watch_receiver::WatchReceiver,
58    },
59    BoxError, CheckpointVerifiedBlock, CommitSemanticallyVerifiedError, Config, KnownBlock,
60    ReadRequest, ReadResponse, Request, Response, SemanticallyVerifiedBlock,
61};
62
63pub mod block_iter;
64pub mod chain_tip;
65pub mod watch_receiver;
66
67pub mod check;
68
69pub(crate) mod finalized_state;
70pub(crate) mod non_finalized_state;
71mod pending_utxos;
72mod queued_blocks;
73pub(crate) mod read;
74mod traits;
75mod write;
76
77#[cfg(any(test, feature = "proptest-impl"))]
78pub mod arbitrary;
79
80#[cfg(test)]
81mod tests;
82
83pub use finalized_state::{OutputLocation, TransactionIndex, TransactionLocation};
84use write::NonFinalizedWriteMessage;
85
86use self::queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified, SentHashes};
87
88pub use self::traits::{ReadState, State};
89
90/// A read-write service for Zebra's cached blockchain state.
91///
92/// This service modifies and provides access to:
93/// - the non-finalized state: the most recent blocks, up to
94///   [`MAX_BLOCK_REORG_HEIGHT`](crate::MAX_BLOCK_REORG_HEIGHT) of them.
95///   Zebra allows chain forks in the non-finalized state,
96///   stores it in memory, and re-downloads it when restarted.
97/// - the finalized state: older blocks that have many confirmations.
98///   Zebra stores the single best chain in the finalized state,
99///   and re-loads it from disk when restarted.
100///
101/// Read requests to this service are buffered, then processed concurrently.
102/// Block write requests are buffered, then queued, then processed in order by a separate task.
103///
104/// Most state users can get faster read responses using the [`ReadStateService`],
105/// because its requests do not share a [`tower::buffer::Buffer`] with block write requests.
106///
107/// To quickly get the latest block, use [`LatestChainTip`] or [`ChainTipChange`].
108/// They can read the latest block directly, without queueing any requests.
109#[derive(Debug)]
110pub(crate) struct StateService {
111    // Configuration
112    //
113    /// The configured Zcash network.
114    network: Network,
115
116    /// The height that we start storing UTXOs from finalized blocks.
117    ///
118    /// This height should be lower than the last few checkpoints,
119    /// so the full verifier can verify UTXO spends from those blocks,
120    /// even if they haven't been committed to the finalized state yet.
121    full_verifier_utxo_lookahead: block::Height,
122
123    // Queued Blocks
124    //
125    /// Queued blocks for the [`NonFinalizedState`] that arrived out of order.
126    /// These blocks are awaiting their parent blocks before they can do contextual verification.
127    non_finalized_state_queued_blocks: QueuedBlocks,
128
129    /// Queued blocks for the [`FinalizedState`] that arrived out of order.
130    /// These blocks are awaiting their parent blocks before they can do contextual verification.
131    ///
132    /// Indexed by their parent block hash.
133    finalized_state_queued_blocks: HashMap<block::Hash, QueuedCheckpointVerified>,
134
135    /// Channels to send blocks to the block write task.
136    block_write_sender: write::BlockWriteSender,
137
138    /// The [`block::Hash`] of the most recent block sent on
139    /// `finalized_block_write_sender` or `non_finalized_block_write_sender`.
140    ///
141    /// On startup, this is:
142    /// - the finalized tip, if there are stored blocks, or
143    /// - the genesis block's parent hash, if the database is empty.
144    ///
145    /// If `invalid_block_write_reset_receiver` gets a reset, this is:
146    /// - the hash of the last valid committed block (the parent of the invalid block).
147    finalized_block_write_last_sent_hash: block::Hash,
148
149    /// A set of block hashes that have been sent to the block write task.
150    /// Hashes of blocks below the finalized tip height are periodically pruned.
151    non_finalized_block_write_sent_hashes: SentHashes,
152
153    /// If an invalid block is sent on `finalized_block_write_sender`
154    /// or `non_finalized_block_write_sender`,
155    /// this channel gets the [`block::Hash`] of the valid tip.
156    //
157    // TODO: add tests for finalized and non-finalized resets (#2654)
158    invalid_block_write_reset_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,
159
160    /// Receives the hash of every non-finalized block that the write task
161    /// rejected, so the corresponding entry can be removed from
162    /// `non_finalized_block_write_sent_hashes`.
163    ///
164    /// Without this, a rejected same-hash block locks out a later honest
165    /// re-delivery of a block at the same hash as a "duplicate" until restart
166    /// or reorg.
167    non_finalized_rejected_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,
168
169    // Pending UTXO Request Tracking
170    //
171    /// The set of outpoints with pending requests for their associated transparent::Output.
172    pending_utxos: PendingUtxos,
173
174    /// Instant tracking the last time `pending_utxos` was pruned.
175    last_prune: Instant,
176
177    // Updating Concurrently Readable State
178    //
179    /// A cloneable [`ReadStateService`], used to answer concurrent read requests.
180    ///
181    /// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`.
182    read_service: ReadStateService,
183
184    // Metrics
185    //
186    /// A metric tracking the maximum height that's currently in `finalized_state_queued_blocks`
187    ///
188    /// Set to `f64::NAN` if `finalized_state_queued_blocks` is empty, because grafana shows NaNs
189    /// as a break in the graph.
190    max_finalized_queue_height: f64,
191}
192
193/// A read-only service for accessing Zebra's cached blockchain state.
194///
195/// This service provides read-only access to:
196/// - the non-finalized state: the most recent blocks, up to
197///   [`MAX_BLOCK_REORG_HEIGHT`](crate::MAX_BLOCK_REORG_HEIGHT) of them.
198/// - the finalized state: older blocks that have many confirmations.
199///
200/// Requests to this service are processed in parallel,
201/// ignoring any blocks queued by the read-write [`StateService`].
202///
203/// This quick response behavior is better for most state users.
204/// It allows other async tasks to make progress while concurrently reading data from disk.
205#[derive(Clone, Debug)]
206pub struct ReadStateService {
207    // Configuration
208    //
209    /// The configured Zcash network.
210    network: Network,
211
212    // Shared Concurrently Readable State
213    //
214    /// A watch channel with a cached copy of the [`NonFinalizedState`].
215    ///
216    /// This state is only updated between requests,
217    /// so it might include some block data that is also on `disk`.
218    non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
219
220    /// The shared inner on-disk database for the finalized state.
221    ///
222    /// RocksDB allows reads and writes via a shared reference,
223    /// but [`ZebraDb`] doesn't expose any write methods or types.
224    ///
225    /// This chain is updated concurrently with requests,
226    /// so it might include some block data that is also in `best_mem`.
227    db: ZebraDb,
228
229    /// A shared handle to a task that writes blocks to the [`NonFinalizedState`] or [`FinalizedState`],
230    /// once the queues have received all their parent blocks.
231    ///
232    /// Used to check for panics when writing blocks.
233    block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
234}
235
236impl Drop for StateService {
237    fn drop(&mut self) {
238        // The state service owns the state, tasks, and channels,
239        // so dropping it should shut down everything.
240
241        // Close the channels (non-blocking)
242        // This makes the block write thread exit the next time it checks the channels.
243        // We want to do this here so we get any errors or panics from the block write task before it shuts down.
244        self.invalid_block_write_reset_receiver.close();
245        self.non_finalized_rejected_receiver.close();
246
247        std::mem::drop(self.block_write_sender.finalized.take());
248        std::mem::drop(self.block_write_sender.non_finalized.take());
249
250        self.clear_finalized_block_queue(CommitBlockError::WriteTaskExited);
251        self.clear_non_finalized_block_queue(CommitBlockError::WriteTaskExited);
252
253        // Log database metrics before shutting down
254        info!("dropping the state: logging database metrics");
255        self.log_db_metrics();
256
257        // Then drop self.read_service, which checks the block write task for panics,
258        // and tries to shut down the database.
259    }
260}
261
262impl Drop for ReadStateService {
263    fn drop(&mut self) {
264        // The read state service shares the state,
265        // so dropping it should check if we can shut down.
266
267        // TODO: move this into a try_shutdown() method
268        if let Some(block_write_task) = self.block_write_task.take() {
269            if let Some(block_write_task_handle) = Arc::into_inner(block_write_task) {
270                // We're the last database user, so we can tell it to shut down (blocking):
271                // - flushes the database to disk, and
272                // - drops the database, which cleans up any database tasks correctly.
273                self.db.shutdown(true);
274
275                // We are the last state with a reference to this thread, so we can
276                // wait until the block write task finishes, then check for panics (blocking).
277                // (We'd also like to abort the thread, but std::thread::JoinHandle can't do that.)
278
279                // This log is verbose during tests.
280                #[cfg(not(test))]
281                info!("waiting for the block write task to finish");
282                #[cfg(test)]
283                debug!("waiting for the block write task to finish");
284
285                // TODO: move this into a check_for_panics() method
286                if let Err(thread_panic) = block_write_task_handle.join() {
287                    std::panic::resume_unwind(thread_panic);
288                } else {
289                    debug!("shutting down the state because the block write task has finished");
290                }
291            }
292        } else {
293            // Even if we're not the last database user, try shutting it down.
294            //
295            // TODO: rename this to try_shutdown()?
296            self.db.shutdown(false);
297        }
298    }
299}
300
301impl StateService {
/// The pruning interval: 30 seconds.
///
/// NOTE(review): presumably the minimum time between two prunes of `pending_utxos`
/// (see `last_prune`); the code that reads this constant is not visible here, so confirm.
const PRUNE_INTERVAL: Duration = Duration::from_secs(30);
303
304    /// Creates a new state service for the state `config` and `network`.
305    ///
306    /// Uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
307    /// to work out when it is near the final checkpoint.
308    ///
309    /// Returns the read-write and read-only state services,
310    /// and read-only watch channels for its best chain tip.
311    pub async fn new(
312        config: Config,
313        network: &Network,
314        max_checkpoint_height: block::Height,
315        checkpoint_verify_concurrency_limit: usize,
316    ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
317        let (finalized_state, finalized_tip, timer) = {
318            let config = config.clone();
319            let network = network.clone();
320            tokio::task::spawn_blocking(move || {
321                let timer = CodeTimer::start();
322                let finalized_state = FinalizedState::new(
323                    &config,
324                    &network,
325                    #[cfg(feature = "elasticsearch")]
326                    true,
327                );
328                timer.finish_desc("opening finalized state database");
329
330                let timer = CodeTimer::start();
331                let finalized_tip = finalized_state.db.tip_block();
332
333                (finalized_state, finalized_tip, timer)
334            })
335            .await
336            .expect("failed to join blocking task")
337        };
338
339        // # Correctness
340        //
341        // The state service must set the finalized block write sender to `None`
342        // if there are blocks in the restored non-finalized state that are above
343        // the max checkpoint height so that non-finalized blocks can be written, otherwise,
344        // Zebra will be unable to commit semantically verified blocks, and its chain sync will stall.
345        //
346        // The state service must not set the finalized block write sender to `None` if there
347        // aren't blocks in the restored non-finalized state that are above the max checkpoint height,
348        // otherwise, unless checkpoint sync is disabled in the zebra-consensus configuration,
349        // Zebra will be unable to commit checkpoint verified blocks, and its chain sync will stall.
350        let is_finalized_tip_past_max_checkpoint = if let Some(tip) = &finalized_tip {
351            tip.coinbase_height().expect("valid block must have height") >= max_checkpoint_height
352        } else {
353            false
354        };
355        let backup_dir_path = config.non_finalized_state_backup_dir(network);
356        let skip_backup_task = config.debug_skip_non_finalized_state_backup_task;
357        let (non_finalized_state, non_finalized_state_sender, non_finalized_state_receiver) =
358            NonFinalizedState::new(network)
359                .with_backup(
360                    backup_dir_path.clone(),
361                    &finalized_state.db,
362                    is_finalized_tip_past_max_checkpoint,
363                    config.debug_skip_non_finalized_state_backup_task,
364                )
365                .await;
366
367        let non_finalized_block_write_sent_hashes = SentHashes::new(&non_finalized_state);
368        let initial_tip = non_finalized_state
369            .best_tip_block()
370            .map(|cv_block| cv_block.block.clone())
371            .or(finalized_tip)
372            .map(CheckpointVerifiedBlock::from)
373            .map(ChainTipBlock::from);
374
375        tracing::info!(chain_tip = ?initial_tip.as_ref().map(|tip| (tip.hash, tip.height)), "loaded Zebra state cache");
376
377        let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
378            ChainTipSender::new(initial_tip, network);
379
380        let finalized_state_for_writing = finalized_state.clone();
381        let should_use_finalized_block_write_sender = non_finalized_state.is_chain_set_empty();
382        let sync_backup_dir_path = backup_dir_path.filter(|_| skip_backup_task);
383        let (
384            block_write_sender,
385            invalid_block_write_reset_receiver,
386            non_finalized_rejected_receiver,
387            block_write_task,
388        ) = write::BlockWriteSender::spawn(
389            finalized_state_for_writing,
390            non_finalized_state,
391            chain_tip_sender,
392            non_finalized_state_sender,
393            should_use_finalized_block_write_sender,
394            sync_backup_dir_path,
395        );
396
397        let read_service = ReadStateService::new(
398            &finalized_state,
399            block_write_task,
400            non_finalized_state_receiver,
401        );
402
403        let full_verifier_utxo_lookahead = max_checkpoint_height
404            - HeightDiff::try_from(checkpoint_verify_concurrency_limit)
405                .expect("fits in HeightDiff");
406        let full_verifier_utxo_lookahead =
407            full_verifier_utxo_lookahead.unwrap_or(block::Height::MIN);
408        let non_finalized_state_queued_blocks = QueuedBlocks::default();
409        let pending_utxos = PendingUtxos::default();
410
411        let finalized_block_write_last_sent_hash =
412            tokio::task::spawn_blocking(move || finalized_state.db.finalized_tip_hash())
413                .await
414                .expect("failed to join blocking task");
415
416        let state = Self {
417            network: network.clone(),
418            full_verifier_utxo_lookahead,
419            non_finalized_state_queued_blocks,
420            finalized_state_queued_blocks: HashMap::new(),
421            block_write_sender,
422            finalized_block_write_last_sent_hash,
423            non_finalized_block_write_sent_hashes,
424            invalid_block_write_reset_receiver,
425            non_finalized_rejected_receiver,
426            pending_utxos,
427            last_prune: Instant::now(),
428            read_service: read_service.clone(),
429            max_finalized_queue_height: f64::NAN,
430        };
431        timer.finish_desc("initializing state service");
432
433        tracing::info!("starting legacy chain check");
434        let timer = CodeTimer::start();
435
436        if let (Some(tip), Some(nu5_activation_height)) = (
437            {
438                let read_state = state.read_service.clone();
439                tokio::task::spawn_blocking(move || read_state.best_tip())
440                    .await
441                    .expect("task should not panic")
442            },
443            NetworkUpgrade::Nu5.activation_height(network),
444        ) {
445            if let Err(error) = check::legacy_chain(
446                nu5_activation_height,
447                any_ancestor_blocks(
448                    &state.read_service.latest_non_finalized_state(),
449                    &state.read_service.db,
450                    tip.1,
451                ),
452                &state.network,
453                MAX_LEGACY_CHAIN_BLOCKS,
454            ) {
455                let legacy_db_path = state.read_service.db.path().to_path_buf();
456                panic!(
457                    "Cached state contains a legacy chain.\n\
458                     An outdated Zebra version did not know about a recent network upgrade,\n\
459                     so it followed a legacy chain using outdated consensus branch rules.\n\
460                     Hint: Delete your database, and restart Zebra to do a full sync.\n\
461                     Database path: {legacy_db_path:?}\n\
462                     Error: {error:?}",
463                );
464            }
465        }
466
467        tracing::info!("cached state consensus branch is valid: no legacy chain found");
468        timer.finish_desc("legacy chain check");
469
470        // Spawn a background task to periodically export RocksDB metrics to Prometheus
471        let db_for_metrics = read_service.db.clone();
472        tokio::spawn(async move {
473            let mut interval = tokio::time::interval(Duration::from_secs(30));
474            loop {
475                interval.tick().await;
476                db_for_metrics.export_metrics();
477            }
478        });
479
480        (state, read_service, latest_chain_tip, chain_tip_change)
481    }
482
483    /// Call read only state service to log rocksdb database metrics.
484    pub fn log_db_metrics(&self) {
485        self.read_service.db.print_db_metrics();
486    }
487
488    /// Queue a checkpoint verified block for verification and storage in the finalized state.
489    ///
490    /// Returns a channel receiver that provides the result of the block commit.
491    fn queue_and_commit_to_finalized_state(
492        &mut self,
493        checkpoint_verified: CheckpointVerifiedBlock,
494    ) -> oneshot::Receiver<Result<block::Hash, CommitCheckpointVerifiedError>> {
495        // # Correctness & Performance
496        //
497        // This method must not block, access the database, or perform CPU-intensive tasks,
498        // because it is called directly from the tokio executor's Future threads.
499
500        let queued_prev_hash = checkpoint_verified.block.header.previous_block_hash;
501        let queued_height = checkpoint_verified.height;
502
503        // If we're close to the final checkpoint, make the block's UTXOs available for
504        // semantic block verification, even when it is in the channel.
505        if self.is_close_to_final_checkpoint(queued_height) {
506            self.non_finalized_block_write_sent_hashes
507                .add_finalized(&checkpoint_verified)
508        }
509
510        let (rsp_tx, rsp_rx) = oneshot::channel();
511        let queued = (checkpoint_verified, rsp_tx);
512
513        if self.block_write_sender.finalized.is_some() {
514            // We're still committing checkpoint verified blocks
515            if let Some(duplicate_queued) = self
516                .finalized_state_queued_blocks
517                .insert(queued_prev_hash, queued)
518            {
519                Self::send_checkpoint_verified_block_error(
520                    duplicate_queued,
521                    CommitBlockError::new_duplicate(
522                        Some(queued_prev_hash.into()),
523                        KnownBlock::Queue,
524                    ),
525                );
526            }
527
528            self.drain_finalized_queue_and_commit();
529        } else {
530            // We've finished committing checkpoint verified blocks to the finalized state,
531            // so drop any repeated queued blocks, and return an error.
532            //
533            // TODO: track the latest sent height, and drop any blocks under that height
534            //       every time we send some blocks (like QueuedSemanticallyVerifiedBlocks)
535            Self::send_checkpoint_verified_block_error(
536                queued,
537                CommitBlockError::new_duplicate(None, KnownBlock::Finalized),
538            );
539
540            self.clear_finalized_block_queue(CommitBlockError::new_duplicate(
541                None,
542                KnownBlock::Finalized,
543            ));
544        }
545
546        if self.finalized_state_queued_blocks.is_empty() {
547            self.max_finalized_queue_height = f64::NAN;
548        } else if self.max_finalized_queue_height.is_nan()
549            || self.max_finalized_queue_height < queued_height.0 as f64
550        {
551            // if there are still blocks in the queue, then either:
552            //   - the new block was lower than the old maximum, and there was a gap before it,
553            //     so the maximum is still the same (and we skip this code), or
554            //   - the new block is higher than the old maximum, and there is at least one gap
555            //     between the finalized tip and the new maximum
556            self.max_finalized_queue_height = queued_height.0 as f64;
557        }
558
559        metrics::gauge!("state.checkpoint.queued.max.height").set(self.max_finalized_queue_height);
560        metrics::gauge!("state.checkpoint.queued.block.count")
561            .set(self.finalized_state_queued_blocks.len() as f64);
562
563        rsp_rx
564    }
565
566    /// Finds finalized state queue blocks to be committed to the state in order,
567    /// removes them from the queue, and sends them to the block commit task.
568    ///
569    /// After queueing a finalized block, this method checks whether the newly
570    /// queued block (and any of its descendants) can be committed to the state.
571    ///
572    /// Returns an error if the block commit channel has been closed.
573    pub fn drain_finalized_queue_and_commit(&mut self) {
574        use tokio::sync::mpsc::error::{SendError, TryRecvError};
575
576        // # Correctness & Performance
577        //
578        // This method must not block, access the database, or perform CPU-intensive tasks,
579        // because it is called directly from the tokio executor's Future threads.
580
581        // If a block failed, we need to start again from a valid tip.
582        match self.invalid_block_write_reset_receiver.try_recv() {
583            Ok(reset_tip_hash) => self.finalized_block_write_last_sent_hash = reset_tip_hash,
584            Err(TryRecvError::Disconnected) => {
585                info!("Block commit task closed the block reset channel. Is Zebra shutting down?");
586                return;
587            }
588            // There are no errors, so we can just use the last block hash we sent
589            Err(TryRecvError::Empty) => {}
590        }
591
592        while let Some(queued_block) = self
593            .finalized_state_queued_blocks
594            .remove(&self.finalized_block_write_last_sent_hash)
595        {
596            let last_sent_finalized_block_height = queued_block.0.height;
597
598            self.finalized_block_write_last_sent_hash = queued_block.0.hash;
599
600            // If we've finished sending finalized blocks, ignore any repeated blocks.
601            // (Blocks can be repeated after a syncer reset.)
602            if let Some(finalized_block_write_sender) = &self.block_write_sender.finalized {
603                let send_result = finalized_block_write_sender.send(queued_block);
604
605                // If the receiver is closed, we can't send any more blocks.
606                if let Err(SendError(queued)) = send_result {
607                    // If Zebra is shutting down, drop blocks and return an error.
608                    Self::send_checkpoint_verified_block_error(
609                        queued,
610                        CommitBlockError::WriteTaskExited,
611                    );
612
613                    self.clear_finalized_block_queue(CommitBlockError::WriteTaskExited);
614                } else {
615                    metrics::gauge!("state.checkpoint.sent.block.height")
616                        .set(last_sent_finalized_block_height.0 as f64);
617                };
618            }
619        }
620    }
621
622    /// Drains every hash queued on `non_finalized_rejected_receiver` and
623    /// removes it from `non_finalized_block_write_sent_hashes`.
624    ///
625    /// This closes the lockout window where a rejected block keeps its hash
626    /// recorded as "sent", so a subsequent honest re-delivery of a block at
627    /// the same hash is not short-circuited as a false "duplicate".
628    ///
629    /// # Correctness & Performance
630    ///
631    /// Like the other drain methods on `StateService`, this must not block,
632    /// access the database, or perform CPU-intensive work, because it is
633    /// called directly from the tokio executor's Future threads.
634    fn drain_non_finalized_rejected_hashes(&mut self) {
635        use tokio::sync::mpsc::error::TryRecvError;
636
637        loop {
638            match self.non_finalized_rejected_receiver.try_recv() {
639                Ok(hash) => {
640                    self.non_finalized_block_write_sent_hashes.remove(&hash);
641                }
642                Err(TryRecvError::Empty) => break,
643                Err(TryRecvError::Disconnected) => {
644                    info!(
645                        "Block commit task closed the non-finalized rejected hash channel. \
646                         Is Zebra shutting down?"
647                    );
648                    break;
649                }
650            }
651        }
652    }
653
654    /// Drops all finalized state queue blocks, and sends an error on their result channels.
655    fn clear_finalized_block_queue(
656        &mut self,
657        error: impl Into<CommitCheckpointVerifiedError> + Clone,
658    ) {
659        for (_hash, queued) in self.finalized_state_queued_blocks.drain() {
660            Self::send_checkpoint_verified_block_error(queued, error.clone());
661        }
662    }
663
664    /// Send an error on a `QueuedCheckpointVerified` block's result channel, and drop the block
665    fn send_checkpoint_verified_block_error(
666        queued: QueuedCheckpointVerified,
667        error: impl Into<CommitCheckpointVerifiedError>,
668    ) {
669        let (finalized, rsp_tx) = queued;
670
671        // The block sender might have already given up on this block,
672        // so ignore any channel send errors.
673        let _ = rsp_tx.send(Err(error.into()));
674        std::mem::drop(finalized);
675    }
676
677    /// Drops all non-finalized state queue blocks, and sends an error on their result channels.
678    fn clear_non_finalized_block_queue(
679        &mut self,
680        error: impl Into<CommitSemanticallyVerifiedError> + Clone,
681    ) {
682        for (_hash, queued) in self.non_finalized_state_queued_blocks.drain() {
683            Self::send_semantically_verified_block_error(queued, error.clone());
684        }
685    }
686
687    /// Send an error on a `QueuedSemanticallyVerified` block's result channel, and drop the block
688    fn send_semantically_verified_block_error(
689        queued: QueuedSemanticallyVerified,
690        error: impl Into<CommitSemanticallyVerifiedError>,
691    ) {
692        let (finalized, rsp_tx) = queued;
693
694        // The block sender might have already given up on this block,
695        // so ignore any channel send errors.
696        let _ = rsp_tx.send(Err(error.into()));
697        std::mem::drop(finalized);
698    }
699
700    /// Queue a semantically verified block for contextual verification and check if any queued
701    /// blocks are ready to be verified and committed to the state.
702    ///
703    /// This function encodes the logic for [committing non-finalized blocks][1]
704    /// in RFC0005.
705    ///
706    /// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks
707    #[instrument(level = "debug", skip(self, semantically_verified))]
708    fn queue_and_commit_to_non_finalized_state(
709        &mut self,
710        semantically_verified: SemanticallyVerifiedBlock,
711    ) -> oneshot::Receiver<Result<block::Hash, CommitSemanticallyVerifiedError>> {
712        tracing::debug!(block = %semantically_verified.block, "queueing block for contextual verification");
713        let parent_hash = semantically_verified.block.header.previous_block_hash;
714
715        // Drop hashes of any blocks the write task has rejected before checking
716        // the SentHashes membership below. Without this, a rejected same-hash
717        // block would lock out a later honest re-delivery of a block at the
718        // same hash as a false "duplicate".
719        self.drain_non_finalized_rejected_hashes();
720
721        if self
722            .non_finalized_block_write_sent_hashes
723            .contains(&semantically_verified.hash)
724        {
725            let (rsp_tx, rsp_rx) = oneshot::channel();
726            let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
727                Some(semantically_verified.hash.into()),
728                KnownBlock::WriteChannel,
729            )
730            .into()));
731            return rsp_rx;
732        }
733
734        if self
735            .read_service
736            .db
737            .contains_height(semantically_verified.height)
738        {
739            let (rsp_tx, rsp_rx) = oneshot::channel();
740            let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
741                Some(semantically_verified.height.into()),
742                KnownBlock::Finalized,
743            )
744            .into()));
745            return rsp_rx;
746        }
747
748        // [`Request::CommitSemanticallyVerifiedBlock`] contract: a request to commit a block which
749        // has been queued but not yet committed to the state fails the older request and replaces
750        // it with the newer request.
751        let rsp_rx = if let Some((_, old_rsp_tx)) = self
752            .non_finalized_state_queued_blocks
753            .get_mut(&semantically_verified.hash)
754        {
755            tracing::debug!("replacing older queued request with new request");
756            let (mut rsp_tx, rsp_rx) = oneshot::channel();
757            std::mem::swap(old_rsp_tx, &mut rsp_tx);
758            let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
759                Some(semantically_verified.hash.into()),
760                KnownBlock::Queue,
761            )
762            .into()));
763            rsp_rx
764        } else {
765            let (rsp_tx, rsp_rx) = oneshot::channel();
766            self.non_finalized_state_queued_blocks
767                .queue((semantically_verified, rsp_tx));
768            rsp_rx
769        };
770
771        // We've finished sending checkpoint verified blocks when:
772        // - we've sent the verified block for the last checkpoint, and
773        // - it has been successfully written to disk.
774        //
775        // We detect the last checkpoint by looking for non-finalized blocks
776        // that are a child of the last block we sent.
777        //
778        // TODO: configure the state with the last checkpoint hash instead?
779        if self.block_write_sender.finalized.is_some()
780            && self
781                .non_finalized_state_queued_blocks
782                .has_queued_children(self.finalized_block_write_last_sent_hash)
783            && self.read_service.db.finalized_tip_hash()
784                == self.finalized_block_write_last_sent_hash
785        {
786            // Tell the block write task to stop committing checkpoint verified blocks to the finalized state,
787            // and move on to committing semantically verified blocks to the non-finalized state.
788            std::mem::drop(self.block_write_sender.finalized.take());
789            // Remove any checkpoint-verified block hashes from `non_finalized_block_write_sent_hashes`.
790            self.non_finalized_block_write_sent_hashes = SentHashes::default();
791            // Mark `SentHashes` as usable by the `can_fork_chain_at()` method.
792            self.non_finalized_block_write_sent_hashes
793                .can_fork_chain_at_hashes = true;
794            // Send blocks from non-finalized queue
795            self.send_ready_non_finalized_queued(self.finalized_block_write_last_sent_hash);
796            // We've finished committing checkpoint verified blocks to finalized state, so drop any repeated queued blocks.
797            self.clear_finalized_block_queue(CommitBlockError::new_duplicate(
798                None,
799                KnownBlock::Finalized,
800            ));
801        } else if !self.can_fork_chain_at(&parent_hash) {
802            tracing::trace!("unready to verify, returning early");
803        } else if self.block_write_sender.finalized.is_none() {
804            // Wait until block commit task is ready to write non-finalized blocks before dequeuing them
805            self.send_ready_non_finalized_queued(parent_hash);
806
807            let finalized_tip_height = self.read_service.db.finalized_tip_height().expect(
808                "Finalized state must have at least one block before committing non-finalized state",
809            );
810
811            self.non_finalized_state_queued_blocks
812                .prune_by_height(finalized_tip_height);
813
814            self.non_finalized_block_write_sent_hashes
815                .prune_by_height(finalized_tip_height);
816        }
817
818        rsp_rx
819    }
820
821    /// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks.
822    fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
823        self.non_finalized_block_write_sent_hashes
824            .can_fork_chain_at(hash)
825            || &self.read_service.db.finalized_tip_hash() == hash
826    }
827
828    /// Returns `true` if `queued_height` is near the final checkpoint.
829    ///
830    /// The semantic block verifier needs access to UTXOs from checkpoint verified blocks
831    /// near the final checkpoint, so that it can verify blocks that spend those UTXOs.
832    ///
833    /// If it doesn't have the required UTXOs, some blocks will time out,
834    /// but succeed after a syncer restart.
835    fn is_close_to_final_checkpoint(&self, queued_height: block::Height) -> bool {
836        queued_height >= self.full_verifier_utxo_lookahead
837    }
838
839    /// Sends all queued blocks whose parents have recently arrived starting from `new_parent`
840    /// in breadth-first ordering to the block write task which will attempt to validate and commit them
841    #[tracing::instrument(level = "debug", skip(self, new_parent))]
842    fn send_ready_non_finalized_queued(&mut self, new_parent: block::Hash) {
843        use tokio::sync::mpsc::error::SendError;
844        if let Some(non_finalized_block_write_sender) = &self.block_write_sender.non_finalized {
845            let mut new_parents: Vec<block::Hash> = vec![new_parent];
846
847            while let Some(parent_hash) = new_parents.pop() {
848                let queued_children = self
849                    .non_finalized_state_queued_blocks
850                    .dequeue_children(parent_hash);
851
852                for queued_child in queued_children {
853                    let (SemanticallyVerifiedBlock { hash, .. }, _) = queued_child;
854
855                    self.non_finalized_block_write_sent_hashes
856                        .add(&queued_child.0);
857                    let send_result = non_finalized_block_write_sender.send(queued_child.into());
858
859                    if let Err(SendError(NonFinalizedWriteMessage::Commit(queued))) = send_result {
860                        // If Zebra is shutting down, drop blocks and return an error.
861                        Self::send_semantically_verified_block_error(
862                            queued,
863                            CommitBlockError::WriteTaskExited,
864                        );
865
866                        self.clear_non_finalized_block_queue(CommitBlockError::WriteTaskExited);
867
868                        return;
869                    };
870
871                    new_parents.push(hash);
872                }
873            }
874
875            self.non_finalized_block_write_sent_hashes.finish_batch();
876        };
877    }
878
879    /// Return the tip of the current best chain.
880    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
881        self.read_service.best_tip()
882    }
883
884    fn send_invalidate_block(
885        &self,
886        hash: block::Hash,
887    ) -> oneshot::Receiver<Result<block::Hash, InvalidateError>> {
888        let (rsp_tx, rsp_rx) = oneshot::channel();
889
890        let Some(sender) = &self.block_write_sender.non_finalized else {
891            let _ = rsp_tx.send(Err(InvalidateError::ProcessingCheckpointedBlocks));
892            return rsp_rx;
893        };
894
895        if let Err(tokio::sync::mpsc::error::SendError(error)) =
896            sender.send(NonFinalizedWriteMessage::Invalidate { hash, rsp_tx })
897        {
898            let NonFinalizedWriteMessage::Invalidate { rsp_tx, .. } = error else {
899                unreachable!("should return the same Invalidate message could not be sent");
900            };
901
902            let _ = rsp_tx.send(Err(InvalidateError::SendInvalidateRequestFailed));
903        }
904
905        rsp_rx
906    }
907
908    fn send_reconsider_block(
909        &self,
910        hash: block::Hash,
911    ) -> oneshot::Receiver<Result<Vec<block::Hash>, ReconsiderError>> {
912        let (rsp_tx, rsp_rx) = oneshot::channel();
913
914        let Some(sender) = &self.block_write_sender.non_finalized else {
915            let _ = rsp_tx.send(Err(ReconsiderError::CheckpointCommitInProgress));
916            return rsp_rx;
917        };
918
919        if let Err(tokio::sync::mpsc::error::SendError(error)) =
920            sender.send(NonFinalizedWriteMessage::Reconsider { hash, rsp_tx })
921        {
922            let NonFinalizedWriteMessage::Reconsider { rsp_tx, .. } = error else {
923                unreachable!("should return the same Reconsider message could not be sent");
924            };
925
926            let _ = rsp_tx.send(Err(ReconsiderError::ReconsiderSendFailed));
927        }
928
929        rsp_rx
930    }
931
932    /// Assert some assumptions about the semantically verified `block` before it is queued.
933    fn assert_block_can_be_validated(&self, block: &SemanticallyVerifiedBlock) {
934        // required by `Request::CommitSemanticallyVerifiedBlock` call
935        assert!(
936            block.height > self.network.mandatory_checkpoint_height(),
937            "invalid semantically verified block height: the canopy checkpoint is mandatory, pre-canopy \
938            blocks, and the canopy activation block, must be committed to the state as finalized \
939            blocks"
940        );
941    }
942
943    fn known_sent_hash(&self, hash: &block::Hash) -> Option<KnownBlock> {
944        self.non_finalized_block_write_sent_hashes
945            .contains(hash)
946            .then_some(KnownBlock::WriteChannel)
947    }
948}
949
950impl ReadStateService {
951    /// Creates a new read-only state service, using the provided finalized state and
952    /// block write task handle.
953    ///
954    /// Returns the newly created service,
955    /// and a watch channel for updating the shared recent non-finalized chain.
956    pub(crate) fn new(
957        finalized_state: &FinalizedState,
958        block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
959        non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
960    ) -> Self {
961        let read_service = Self {
962            network: finalized_state.network(),
963            db: finalized_state.db.clone(),
964            non_finalized_state_receiver,
965            block_write_task,
966        };
967
968        tracing::debug!("created new read-only state service");
969
970        read_service
971    }
972
973    /// Return the tip of the current best chain.
974    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
975        read::best_tip(&self.latest_non_finalized_state(), &self.db)
976    }
977
978    /// Gets a clone of the latest non-finalized state from the `non_finalized_state_receiver`
979    fn latest_non_finalized_state(&self) -> NonFinalizedState {
980        self.non_finalized_state_receiver.cloned_watch_data()
981    }
982
983    /// Gets a clone of the latest, best non-finalized chain from the `non_finalized_state_receiver`
984    fn latest_best_chain(&self) -> Option<Arc<Chain>> {
985        self.non_finalized_state_receiver
986            .borrow_mapped(|non_finalized_state| non_finalized_state.best_chain().cloned())
987    }
988
989    /// Test-only access to the inner database.
990    /// Can be used to modify the database without doing any consensus checks.
991    #[cfg(any(test, feature = "proptest-impl"))]
992    pub fn db(&self) -> &ZebraDb {
993        &self.db
994    }
995
996    /// Logs rocksdb metrics using the read only state service.
997    pub fn log_db_metrics(&self) {
998        self.db.print_db_metrics();
999    }
1000}
1001
1002impl Service<Request> for StateService {
1003    type Response = Response;
1004    type Error = BoxError;
1005    type Future =
1006        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
1007
1008    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1009        // Check for panics in the block write task
1010        let poll = self.read_service.poll_ready(cx);
1011
1012        // Prune outdated UTXO requests
1013        let now = Instant::now();
1014
1015        if self.last_prune + Self::PRUNE_INTERVAL < now {
1016            let tip = self.best_tip();
1017            let old_len = self.pending_utxos.len();
1018
1019            self.pending_utxos.prune();
1020            self.last_prune = now;
1021
1022            let new_len = self.pending_utxos.len();
1023            let prune_count = old_len
1024                .checked_sub(new_len)
1025                .expect("prune does not add any utxo requests");
1026            if prune_count > 0 {
1027                tracing::debug!(
1028                    ?old_len,
1029                    ?new_len,
1030                    ?prune_count,
1031                    ?tip,
1032                    "pruned utxo requests"
1033                );
1034            } else {
1035                tracing::debug!(len = ?old_len, ?tip, "no utxo requests needed pruning");
1036            }
1037        }
1038
1039        poll
1040    }
1041
1042    #[instrument(name = "state", skip(self, req))]
1043    fn call(&mut self, req: Request) -> Self::Future {
1044        req.count_metric();
1045        let span = Span::current();
1046
1047        match req {
1048            // Uses non_finalized_state_queued_blocks and pending_utxos in the StateService
1049            // Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb.
1050            //
1051            // The expected error type for this request is `CommitSemanticallyVerifiedError`.
1052            Request::CommitSemanticallyVerifiedBlock(semantically_verified) => {
1053                let timer = CodeTimer::start();
1054                self.assert_block_can_be_validated(&semantically_verified);
1055
1056                self.pending_utxos
1057                    .check_against_ordered(&semantically_verified.new_outputs);
1058
1059                // # Performance
1060                //
1061                // Allow other async tasks to make progress while blocks are being verified
1062                // and written to disk. But wait for the blocks to finish committing,
1063                // so that `StateService` multi-block queries always observe a consistent state.
1064                //
1065                // Since each block is spawned into its own task,
1066                // there shouldn't be any other code running in the same task,
1067                // so we don't need to worry about blocking it:
1068                // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
1069
1070                let rsp_rx = tokio::task::block_in_place(move || {
1071                    span.in_scope(|| {
1072                        self.queue_and_commit_to_non_finalized_state(semantically_verified)
1073                    })
1074                });
1075
1076                // TODO:
1077                //   - check for panics in the block write task here,
1078                //     as well as in poll_ready()
1079
1080                // The work is all done, the future just waits on a channel for the result
1081                timer.finish_desc("CommitSemanticallyVerifiedBlock");
1082
1083                // Await the channel response, flatten the result, map receive errors to
1084                // `CommitSemanticallyVerifiedError::WriteTaskExited`.
1085                // Then flatten the nested Result and convert any errors to a BoxError.
1086                let span = Span::current();
1087                async move {
1088                    rsp_rx
1089                        .await
1090                        .map_err(|_recv_error| CommitBlockError::WriteTaskExited.into())
1091                        .and_then(|result| result)
1092                        .map_err(BoxError::from)
1093                        .map(Response::Committed)
1094                }
1095                .instrument(span)
1096                .boxed()
1097            }
1098
1099            // Uses finalized_state_queued_blocks and pending_utxos in the StateService.
1100            // Accesses shared writeable state in the StateService.
1101            //
1102            // The expected error type for this request is `CommitCheckpointVerifiedError`.
1103            Request::CommitCheckpointVerifiedBlock(finalized) => {
1104                let timer = CodeTimer::start();
1105                // # Consensus
1106                //
1107                // A semantic block verification could have called AwaitUtxo
1108                // before this checkpoint verified block arrived in the state.
1109                // So we need to check for pending UTXO requests sent by running
1110                // semantic block verifications.
1111                //
1112                // This check is redundant for most checkpoint verified blocks,
1113                // because semantic verification can only succeed near the final
1114                // checkpoint, when all the UTXOs are available for the verifying block.
1115                //
1116                // (Checkpoint block UTXOs are verified using block hash checkpoints
1117                // and transaction merkle tree block header commitments.)
1118                self.pending_utxos
1119                    .check_against_ordered(&finalized.new_outputs);
1120
1121                // # Performance
1122                //
1123                // This method doesn't block, access the database, or perform CPU-intensive tasks,
1124                // so we can run it directly in the tokio executor's Future threads.
1125                let rsp_rx = self.queue_and_commit_to_finalized_state(finalized);
1126
1127                // TODO:
1128                //   - check for panics in the block write task here,
1129                //     as well as in poll_ready()
1130
1131                // The work is all done, the future just waits on a channel for the result
1132                timer.finish_desc("CommitCheckpointVerifiedBlock");
1133
1134                // Await the channel response, flatten the result, map receive errors to
1135                // `CommitCheckpointVerifiedError::WriteTaskExited`.
1136                // Then flatten the nested Result and convert any errors to a BoxError.
1137                async move {
1138                    rsp_rx
1139                        .await
1140                        .map_err(|_recv_error| CommitBlockError::WriteTaskExited.into())
1141                        .and_then(|result| result)
1142                        .map_err(BoxError::from)
1143                        .map(Response::Committed)
1144                }
1145                .instrument(span)
1146                .boxed()
1147            }
1148
1149            // Uses pending_utxos and non_finalized_state_queued_blocks in the StateService.
1150            // If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService.
1151            Request::AwaitUtxo(outpoint) => {
1152                let timer = CodeTimer::start();
1153                // Prepare the AwaitUtxo future from PendingUxtos.
1154                let response_fut = self.pending_utxos.queue(outpoint);
1155                // Only instrument `response_fut`, the ReadStateService already
1156                // instruments its requests with the same span.
1157
1158                let response_fut = response_fut.instrument(span).boxed();
1159
1160                // Check the non-finalized block queue outside the returned future,
1161                // so we can access mutable state fields.
1162                if let Some(utxo) = self.non_finalized_state_queued_blocks.utxo(&outpoint) {
1163                    self.pending_utxos.respond(&outpoint, utxo);
1164
1165                    // We're finished, the returned future gets the UTXO from the respond() channel.
1166                    timer.finish_desc("AwaitUtxo/queued-non-finalized");
1167
1168                    return response_fut;
1169                }
1170
1171                // Check the sent non-finalized blocks
1172                if let Some(utxo) = self.non_finalized_block_write_sent_hashes.utxo(&outpoint) {
1173                    self.pending_utxos.respond(&outpoint, utxo);
1174
1175                    // We're finished, the returned future gets the UTXO from the respond() channel.
1176                    timer.finish_desc("AwaitUtxo/sent-non-finalized");
1177
1178                    return response_fut;
1179                }
1180
1181                // We ignore any UTXOs in FinalizedState.finalized_state_queued_blocks,
1182                // because it is only used during checkpoint verification.
1183                //
1184                // This creates a rare race condition, but it doesn't seem to happen much in practice.
1185                // See #5126 for details.
1186
1187                // Manually send a request to the ReadStateService,
1188                // to get UTXOs from any non-finalized chain or the finalized chain.
1189                let read_service = self.read_service.clone();
1190
1191                // Run the request in an async block, so we can await the response.
1192                async move {
1193                    let req = ReadRequest::AnyChainUtxo(outpoint);
1194
1195                    let rsp = read_service.oneshot(req).await?;
1196
1197                    // Optional TODO:
1198                    //  - make pending_utxos.respond() async using a channel,
1199                    //    so we can respond to all waiting requests here
1200                    //
1201                    // This change is not required for correctness, because:
1202                    // - any waiting requests should have returned when the block was sent to the state
1203                    // - otherwise, the request returns immediately if:
1204                    //   - the block is in the non-finalized queue, or
1205                    //   - the block is in any non-finalized chain or the finalized state
1206                    //
1207                    // And if the block is in the finalized queue,
1208                    // that's rare enough that a retry is ok.
1209                    if let ReadResponse::AnyChainUtxo(Some(utxo)) = rsp {
1210                        // We got a UTXO, so we replace the response future with the result own.
1211                        timer.finish_desc("AwaitUtxo/any-chain");
1212
1213                        return Ok(Response::Utxo(utxo));
1214                    }
1215
1216                    // We're finished, but the returned future is waiting on the respond() channel.
1217                    timer.finish_desc("AwaitUtxo/waiting");
1218
1219                    response_fut.await
1220                }
1221                .boxed()
1222            }
1223
1224            // Used by sync, inbound, and block verifier to check if a block is already in the state
1225            // before downloading or validating it.
1226            Request::KnownBlock(hash) => {
1227                let timer = CodeTimer::start();
1228                let sent_hash_response = self.known_sent_hash(&hash);
1229                let read_service = self.read_service.clone();
1230
1231                async move {
1232                    if sent_hash_response.is_some() {
1233                        return Ok(Response::KnownBlock(sent_hash_response));
1234                    };
1235
1236                    let response = read::non_finalized_state_contains_block_hash(
1237                        &read_service.latest_non_finalized_state(),
1238                        hash,
1239                    )
1240                    // TODO: Move this to a blocking task, perhaps by moving some of this logic to the ReadStateService.
1241                    .or_else(|| read::finalized_state_contains_block_hash(&read_service.db, hash));
1242
1243                    timer.finish_desc("Request::KnownBlock");
1244
1245                    Ok(Response::KnownBlock(response))
1246                }
1247                .boxed()
1248            }
1249
1250            // The expected error type for this request is `InvalidateError`
1251            Request::InvalidateBlock(block_hash) => {
1252                let rsp_rx = tokio::task::block_in_place(move || {
1253                    span.in_scope(|| self.send_invalidate_block(block_hash))
1254                });
1255
1256                // Await the channel response, flatten the result, map receive errors to
1257                // `InvalidateError::InvalidateRequestDropped`.
1258                // Then flatten the nested Result and convert any errors to a BoxError.
1259                let span = Span::current();
1260                async move {
1261                    rsp_rx
1262                        .await
1263                        .map_err(|_recv_error| InvalidateError::InvalidateRequestDropped)
1264                        .and_then(|result| result)
1265                        .map_err(BoxError::from)
1266                        .map(Response::Invalidated)
1267                }
1268                .instrument(span)
1269                .boxed()
1270            }
1271
1272            // The expected error type for this request is `ReconsiderError`
1273            Request::ReconsiderBlock(block_hash) => {
1274                let rsp_rx = tokio::task::block_in_place(move || {
1275                    span.in_scope(|| self.send_reconsider_block(block_hash))
1276                });
1277
1278                // Await the channel response, flatten the result, map receive errors to
1279                // `ReconsiderError::ReconsiderResponseDropped`.
1280                // Then flatten the nested Result and convert any errors to a BoxError.
1281                let span = Span::current();
1282                async move {
1283                    rsp_rx
1284                        .await
1285                        .map_err(|_recv_error| ReconsiderError::ReconsiderResponseDropped)
1286                        .and_then(|result| result)
1287                        .map_err(BoxError::from)
1288                        .map(Response::Reconsidered)
1289                }
1290                .instrument(span)
1291                .boxed()
1292            }
1293
1294            // Runs concurrently using the ReadStateService
1295            Request::Tip
1296            | Request::Depth(_)
1297            | Request::BestChainNextMedianTimePast
1298            | Request::BestChainBlockHash(_)
1299            | Request::BlockLocator
1300            | Request::Transaction(_)
1301            | Request::AnyChainTransaction(_)
1302            | Request::UnspentBestChainUtxo(_)
1303            | Request::Block(_)
1304            | Request::AnyChainBlock(_)
1305            | Request::BlockAndSize(_)
1306            | Request::BlockHeader(_)
1307            | Request::FindBlockHashes { .. }
1308            | Request::FindBlockHeaders { .. }
1309            | Request::CheckBestChainTipNullifiersAndAnchors(_)
1310            | Request::CheckBlockProposalValidity(_) => {
1311                // Redirect the request to the concurrent ReadStateService
1312                let read_service = self.read_service.clone();
1313
1314                async move {
1315                    let req = req
1316                        .try_into()
1317                        .expect("ReadRequest conversion should not fail");
1318
1319                    let rsp = read_service.oneshot(req).await?;
1320                    let rsp = rsp.try_into().expect("Response conversion should not fail");
1321
1322                    Ok(rsp)
1323                }
1324                .boxed()
1325            }
1326        }
1327    }
1328}
1329
1330impl Service<ReadRequest> for ReadStateService {
1331    type Response = ReadResponse;
1332    type Error = BoxError;
1333    type Future =
1334        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
1335
1336    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1337        // Check for panics in the block write task
1338        //
1339        // TODO: move into a check_for_panics() method
1340        let block_write_task = self.block_write_task.take();
1341
1342        if let Some(block_write_task) = block_write_task {
1343            if block_write_task.is_finished() {
1344                if let Some(block_write_task) = Arc::into_inner(block_write_task) {
1345                    // We are the last state with a reference to this task, so we can propagate any panics
1346                    if let Err(thread_panic) = block_write_task.join() {
1347                        std::panic::resume_unwind(thread_panic);
1348                    }
1349                }
1350            } else {
1351                // It hasn't finished, so we need to put it back
1352                self.block_write_task = Some(block_write_task);
1353            }
1354        }
1355
1356        self.db.check_for_panics();
1357
1358        Poll::Ready(Ok(()))
1359    }
1360
1361    #[instrument(name = "read_state", skip(self, req))]
1362    fn call(&mut self, req: ReadRequest) -> Self::Future {
1363        req.count_metric();
1364        let timer = CodeTimer::start_desc(req.variant_name());
1365        let span = Span::current();
1366        let timed_span = TimedSpan::new(timer, span);
1367        let state = self.clone();
1368
1369        if req == ReadRequest::NonFinalizedBlocksListener {
1370            // The non-finalized blocks listener is used to notify the state service
1371            // about new blocks that have been added to the non-finalized state.
1372            let non_finalized_blocks_listener = NonFinalizedBlocksListener::spawn(
1373                self.network.clone(),
1374                self.non_finalized_state_receiver.clone(),
1375            );
1376
1377            return async move {
1378                Ok(ReadResponse::NonFinalizedBlocksListener(
1379                    non_finalized_blocks_listener,
1380                ))
1381            }
1382            .boxed();
1383        };
1384
1385        let request_handler = move || match req {
1386            // Used by the `getblockchaininfo` RPC.
1387            ReadRequest::UsageInfo => Ok(ReadResponse::UsageInfo(state.db.size())),
1388
1389            // Used by the StateService.
1390            ReadRequest::Tip => Ok(ReadResponse::Tip(read::tip(
1391                state.latest_best_chain(),
1392                &state.db,
1393            ))),
1394
1395            // Used by `getblockchaininfo` RPC method.
1396            ReadRequest::TipPoolValues => {
1397                let (tip_height, tip_hash, value_balance) =
1398                    read::tip_with_value_balance(state.latest_best_chain(), &state.db)?
1399                        .ok_or(BoxError::from("no chain tip available yet"))?;
1400
1401                Ok(ReadResponse::TipPoolValues {
1402                    tip_height,
1403                    tip_hash,
1404                    value_balance,
1405                })
1406            }
1407
1408            // Used by getblock
1409            ReadRequest::BlockInfo(hash_or_height) => Ok(ReadResponse::BlockInfo(
1410                read::block_info(state.latest_best_chain(), &state.db, hash_or_height),
1411            )),
1412
1413            // Used by the StateService.
1414            ReadRequest::Depth(hash) => Ok(ReadResponse::Depth(read::depth(
1415                state.latest_best_chain(),
1416                &state.db,
1417                hash,
1418            ))),
1419
1420            // Used by the StateService.
1421            ReadRequest::BestChainNextMedianTimePast => {
1422                Ok(ReadResponse::BestChainNextMedianTimePast(
1423                    read::next_median_time_past(&state.latest_non_finalized_state(), &state.db)?,
1424                ))
1425            }
1426
1427            // Used by the get_block (raw) RPC and the StateService.
1428            ReadRequest::Block(hash_or_height) => Ok(ReadResponse::Block(read::block(
1429                state.latest_best_chain(),
1430                &state.db,
1431                hash_or_height,
1432            ))),
1433
1434            ReadRequest::AnyChainBlock(hash_or_height) => Ok(ReadResponse::Block(read::any_block(
1435                state.latest_non_finalized_state().chain_iter(),
1436                &state.db,
1437                hash_or_height,
1438            ))),
1439
1440            // Used by the get_block (raw) RPC and the StateService.
1441            ReadRequest::BlockAndSize(hash_or_height) => Ok(ReadResponse::BlockAndSize(
1442                read::block_and_size(state.latest_best_chain(), &state.db, hash_or_height),
1443            )),
1444
1445            // Used by the get_block (verbose) RPC and the StateService.
1446            ReadRequest::BlockHeader(hash_or_height) => {
1447                let best_chain = state.latest_best_chain();
1448
1449                let height = hash_or_height
1450                    .height_or_else(|hash| {
1451                        read::find::height_by_hash(best_chain.clone(), &state.db, hash)
1452                    })
1453                    .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1454
1455                let hash = hash_or_height
1456                    .hash_or_else(|height| {
1457                        read::find::hash_by_height(best_chain.clone(), &state.db, height)
1458                    })
1459                    .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1460
1461                let next_height = height.next()?;
1462                let next_block_hash =
1463                    read::find::hash_by_height(best_chain.clone(), &state.db, next_height);
1464
1465                let header = read::block_header(best_chain, &state.db, height.into())
1466                    .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1467
1468                Ok(ReadResponse::BlockHeader {
1469                    header,
1470                    hash,
1471                    height,
1472                    next_block_hash,
1473                })
1474            }
1475
1476            // For the get_raw_transaction RPC and the StateService.
1477            ReadRequest::Transaction(hash) => Ok(ReadResponse::Transaction(
1478                read::mined_transaction(state.latest_best_chain(), &state.db, hash),
1479            )),
1480
1481            ReadRequest::AnyChainTransaction(hash) => {
1482                Ok(ReadResponse::AnyChainTransaction(read::any_transaction(
1483                    state.latest_non_finalized_state().chain_iter(),
1484                    &state.db,
1485                    hash,
1486                )))
1487            }
1488
1489            // Used by the getblock (verbose) RPC.
1490            ReadRequest::TransactionIdsForBlock(hash_or_height) => Ok(
1491                ReadResponse::TransactionIdsForBlock(read::transaction_hashes_for_block(
1492                    state.latest_best_chain(),
1493                    &state.db,
1494                    hash_or_height,
1495                )),
1496            ),
1497
1498            ReadRequest::AnyChainTransactionIdsForBlock(hash_or_height) => {
1499                Ok(ReadResponse::AnyChainTransactionIdsForBlock(
1500                    read::transaction_hashes_for_any_block(
1501                        state.latest_non_finalized_state().chain_iter(),
1502                        &state.db,
1503                        hash_or_height,
1504                    ),
1505                ))
1506            }
1507
1508            #[cfg(feature = "indexer")]
1509            ReadRequest::SpendingTransactionId(spend) => Ok(ReadResponse::TransactionId(
1510                read::spending_transaction_hash(state.latest_best_chain(), &state.db, spend),
1511            )),
1512
1513            ReadRequest::UnspentBestChainUtxo(outpoint) => Ok(ReadResponse::UnspentBestChainUtxo(
1514                read::unspent_utxo(state.latest_best_chain(), &state.db, outpoint),
1515            )),
1516
1517            // Manually used by the StateService to implement part of AwaitUtxo.
1518            ReadRequest::AnyChainUtxo(outpoint) => Ok(ReadResponse::AnyChainUtxo(read::any_utxo(
1519                state.latest_non_finalized_state(),
1520                &state.db,
1521                outpoint,
1522            ))),
1523
1524            // Used by the StateService.
1525            ReadRequest::BlockLocator => Ok(ReadResponse::BlockLocator(
1526                read::block_locator(state.latest_best_chain(), &state.db).unwrap_or_default(),
1527            )),
1528
1529            // Used by the StateService.
1530            ReadRequest::FindBlockHashes { known_blocks, stop } => {
1531                Ok(ReadResponse::BlockHashes(read::find_chain_hashes(
1532                    state.latest_best_chain(),
1533                    &state.db,
1534                    known_blocks,
1535                    stop,
1536                    MAX_FIND_BLOCK_HASHES_RESULTS,
1537                )))
1538            }
1539
1540            // Used by the StateService.
1541            ReadRequest::FindBlockHeaders { known_blocks, stop } => Ok(ReadResponse::BlockHeaders(
1542                read::find_chain_headers(
1543                    state.latest_best_chain(),
1544                    &state.db,
1545                    known_blocks,
1546                    stop,
1547                    MAX_FIND_BLOCK_HEADERS_RESULTS,
1548                )
1549                .into_iter()
1550                .map(|header| CountedHeader { header })
1551                .collect(),
1552            )),
1553
1554            ReadRequest::SaplingTree(hash_or_height) => Ok(ReadResponse::SaplingTree(
1555                read::sapling_tree(state.latest_best_chain(), &state.db, hash_or_height),
1556            )),
1557
1558            ReadRequest::OrchardTree(hash_or_height) => Ok(ReadResponse::OrchardTree(
1559                read::orchard_tree(state.latest_best_chain(), &state.db, hash_or_height),
1560            )),
1561
1562            ReadRequest::SaplingSubtrees { start_index, limit } => {
1563                let end_index = limit
1564                    .and_then(|limit| start_index.0.checked_add(limit.0))
1565                    .map(NoteCommitmentSubtreeIndex);
1566
1567                let best_chain = state.latest_best_chain();
1568                let sapling_subtrees = if let Some(end_index) = end_index {
1569                    read::sapling_subtrees(best_chain, &state.db, start_index..end_index)
1570                } else {
1571                    // If there is no end bound, just return all the trees.
1572                    // If the end bound would overflow, just returns all the trees, because that's what
1573                    // `zcashd` does. (It never calculates an end bound, so it just keeps iterating until
1574                    // the trees run out.)
1575                    read::sapling_subtrees(best_chain, &state.db, start_index..)
1576                };
1577
1578                Ok(ReadResponse::SaplingSubtrees(sapling_subtrees))
1579            }
1580
1581            ReadRequest::OrchardSubtrees { start_index, limit } => {
1582                let end_index = limit
1583                    .and_then(|limit| start_index.0.checked_add(limit.0))
1584                    .map(NoteCommitmentSubtreeIndex);
1585
1586                let best_chain = state.latest_best_chain();
1587                let orchard_subtrees = if let Some(end_index) = end_index {
1588                    read::orchard_subtrees(best_chain, &state.db, start_index..end_index)
1589                } else {
1590                    // If there is no end bound, just return all the trees.
1591                    // If the end bound would overflow, just returns all the trees, because that's what
1592                    // `zcashd` does. (It never calculates an end bound, so it just keeps iterating until
1593                    // the trees run out.)
1594                    read::orchard_subtrees(best_chain, &state.db, start_index..)
1595                };
1596
1597                Ok(ReadResponse::OrchardSubtrees(orchard_subtrees))
1598            }
1599
1600            // For the get_address_balance RPC.
1601            ReadRequest::AddressBalance(addresses) => {
1602                let (balance, received) =
1603                    read::transparent_balance(state.latest_best_chain(), &state.db, addresses)?;
1604                Ok(ReadResponse::AddressBalance { balance, received })
1605            }
1606
1607            // For the get_address_tx_ids RPC.
1608            ReadRequest::TransactionIdsByAddresses {
1609                addresses,
1610                height_range,
1611            } => read::transparent_tx_ids(
1612                state.latest_best_chain(),
1613                &state.db,
1614                addresses,
1615                height_range,
1616            )
1617            .map(ReadResponse::AddressesTransactionIds),
1618
1619            // For the get_address_utxos RPC.
1620            ReadRequest::UtxosByAddresses(addresses) => read::address_utxos(
1621                &state.network,
1622                state.latest_best_chain(),
1623                &state.db,
1624                addresses,
1625            )
1626            .map(ReadResponse::AddressUtxos),
1627
1628            ReadRequest::CheckBestChainTipNullifiersAndAnchors(unmined_tx) => {
1629                let latest_non_finalized_best_chain = state.latest_best_chain();
1630
1631                check::nullifier::tx_no_duplicates_in_chain(
1632                    &state.db,
1633                    latest_non_finalized_best_chain.as_ref(),
1634                    &unmined_tx.transaction,
1635                )?;
1636
1637                check::anchors::tx_anchors_refer_to_final_treestates(
1638                    &state.db,
1639                    latest_non_finalized_best_chain.as_ref(),
1640                    &unmined_tx,
1641                )?;
1642
1643                Ok(ReadResponse::ValidBestChainTipNullifiersAndAnchors)
1644            }
1645
1646            // Used by the get_block and get_block_hash RPCs.
1647            ReadRequest::BestChainBlockHash(height) => Ok(ReadResponse::BlockHash(
1648                read::hash_by_height(state.latest_best_chain(), &state.db, height),
1649            )),
1650
1651            // Used by get_block_template and getblockchaininfo RPCs.
1652            ReadRequest::ChainInfo => {
1653                // # Correctness
1654                //
1655                // It is ok to do these lookups using multiple database calls. Finalized state updates
1656                // can only add overlapping blocks, and block hashes are unique across all chain forks.
1657                //
1658                // If there is a large overlap between the non-finalized and finalized states,
1659                // where the finalized tip is above the non-finalized tip,
1660                // Zebra is receiving a lot of blocks, or this request has been delayed for a long time.
1661                //
1662                // In that case, the `getblocktemplate` RPC will return an error because Zebra
1663                // is not synced to the tip. That check happens before the RPC makes this request.
1664                read::difficulty::get_block_template_chain_info(
1665                    &state.latest_non_finalized_state(),
1666                    &state.db,
1667                    &state.network,
1668                )
1669                .map(ReadResponse::ChainInfo)
1670            }
1671
1672            // Used by getmininginfo, getnetworksolps, and getnetworkhashps RPCs.
1673            ReadRequest::SolutionRate { num_blocks, height } => {
1674                let latest_non_finalized_state = state.latest_non_finalized_state();
1675                // # Correctness
1676                //
1677                // It is ok to do these lookups using multiple database calls. Finalized state updates
1678                // can only add overlapping blocks, and block hashes are unique across all chain forks.
1679                //
1680                // The worst that can happen here is that the default `start_hash` will be below
1681                // the chain tip.
1682                let (tip_height, tip_hash) =
1683                    match read::tip(latest_non_finalized_state.best_chain(), &state.db) {
1684                        Some(tip_hash) => tip_hash,
1685                        None => return Ok(ReadResponse::SolutionRate(None)),
1686                    };
1687
1688                let start_hash = match height {
1689                    Some(height) if height < tip_height => read::hash_by_height(
1690                        latest_non_finalized_state.best_chain(),
1691                        &state.db,
1692                        height,
1693                    ),
1694                    // use the chain tip hash if height is above it or not provided.
1695                    _ => Some(tip_hash),
1696                };
1697
1698                let solution_rate = start_hash.and_then(|start_hash| {
1699                    read::difficulty::solution_rate(
1700                        &latest_non_finalized_state,
1701                        &state.db,
1702                        num_blocks,
1703                        start_hash,
1704                    )
1705                });
1706
1707                Ok(ReadResponse::SolutionRate(solution_rate))
1708            }
1709
1710            ReadRequest::CheckBlockProposalValidity(semantically_verified) => {
1711                tracing::debug!(
1712                    "attempting to validate and commit block proposal \
1713                         onto a cloned non-finalized state"
1714                );
1715                let mut latest_non_finalized_state = state.latest_non_finalized_state();
1716
1717                // The previous block of a valid proposal must be on the best chain tip.
1718                let Some((_best_tip_height, best_tip_hash)) =
1719                    read::best_tip(&latest_non_finalized_state, &state.db)
1720                else {
1721                    return Err(
1722                        "state is empty: wait for Zebra to sync before submitting a proposal"
1723                            .into(),
1724                    );
1725                };
1726
1727                if semantically_verified.block.header.previous_block_hash != best_tip_hash {
1728                    return Err("proposal is not based on the current best chain tip: \
1729                                    previous block hash must be the best chain tip"
1730                        .into());
1731                }
1732
1733                // This clone of the non-finalized state is dropped when this closure returns.
1734                // The non-finalized state that's used in the rest of the state (including finalizing
1735                // blocks into the db) is not mutated here.
1736                //
1737                // TODO: Convert `CommitSemanticallyVerifiedError` to a new `ValidateProposalError`?
1738                latest_non_finalized_state.disable_metrics();
1739
1740                write::validate_and_commit_non_finalized(
1741                    &state.db,
1742                    &mut latest_non_finalized_state,
1743                    semantically_verified,
1744                )?;
1745
1746                Ok(ReadResponse::ValidBlockProposal)
1747            }
1748
1749            ReadRequest::TipBlockSize => {
1750                // Respond with the length of the obtained block if any.
1751                Ok(ReadResponse::TipBlockSize(
1752                    state
1753                        .best_tip()
1754                        .and_then(|(tip_height, _)| {
1755                            read::block_info(
1756                                state.latest_best_chain(),
1757                                &state.db,
1758                                tip_height.into(),
1759                            )
1760                        })
1761                        .map(|info| info.size().try_into().expect("u32 should fit in usize"))
1762                        .or_else(|| {
1763                            find::tip_block(state.latest_best_chain(), &state.db)
1764                                .map(|b| b.zcash_serialized_size())
1765                        }),
1766                ))
1767            }
1768
1769            ReadRequest::NonFinalizedBlocksListener => {
1770                unreachable!("should return early");
1771            }
1772
1773            // Used by `gettxout` RPC method.
1774            ReadRequest::IsTransparentOutputSpent(outpoint) => {
1775                let is_spent = read::unspent_utxo(state.latest_best_chain(), &state.db, outpoint);
1776                Ok(ReadResponse::IsTransparentOutputSpent(is_spent.is_none()))
1777            }
1778        };
1779
1780        timed_span.spawn_blocking(request_handler)
1781    }
1782}
1783
1784/// Initialize a state service from the provided [`Config`].
1785/// Returns a boxed state service, a read-only state service,
1786/// and receivers for state chain tip updates.
1787///
1788/// Each `network` has its own separate on-disk database.
1789///
1790/// The state uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
1791/// to work out when it is near the final checkpoint.
1792///
1793/// To share access to the state, wrap the returned service in a `Buffer`,
1794/// or clone the returned [`ReadStateService`].
1795///
1796/// It's possible to construct multiple state services in the same application (as
1797/// long as they, e.g., use different storage locations), but doing so is
1798/// probably not what you want.
1799pub async fn init(
1800    config: Config,
1801    network: &Network,
1802    max_checkpoint_height: block::Height,
1803    checkpoint_verify_concurrency_limit: usize,
1804) -> (
1805    BoxService<Request, Response, BoxError>,
1806    ReadStateService,
1807    LatestChainTip,
1808    ChainTipChange,
1809) {
1810    let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
1811        StateService::new(
1812            config,
1813            network,
1814            max_checkpoint_height,
1815            checkpoint_verify_concurrency_limit,
1816        )
1817        .await;
1818
1819    (
1820        BoxService::new(state_service),
1821        read_only_state_service,
1822        latest_chain_tip,
1823        chain_tip_change,
1824    )
1825}
1826
1827/// Initialize a read state service from the provided [`Config`].
1828/// Returns a read-only state service,
1829///
1830/// Each `network` has its own separate on-disk database.
1831///
1832/// To share access to the state, clone the returned [`ReadStateService`].
1833pub fn init_read_only(
1834    config: Config,
1835    network: &Network,
1836) -> (
1837    ReadStateService,
1838    ZebraDb,
1839    tokio::sync::watch::Sender<NonFinalizedState>,
1840) {
1841    let finalized_state = FinalizedState::new_with_debug(
1842        &config,
1843        network,
1844        true,
1845        #[cfg(feature = "elasticsearch")]
1846        false,
1847        true,
1848    );
1849    let (non_finalized_state_sender, non_finalized_state_receiver) =
1850        tokio::sync::watch::channel(NonFinalizedState::new(network));
1851
1852    (
1853        ReadStateService::new(
1854            &finalized_state,
1855            None,
1856            WatchReceiver::new(non_finalized_state_receiver),
1857        ),
1858        finalized_state.db.clone(),
1859        non_finalized_state_sender,
1860    )
1861}
1862
1863/// Calls [`init_read_only`] with the provided [`Config`] and [`Network`] from a blocking task.
1864/// Returns a [`tokio::task::JoinHandle`] with a read state service and chain tip sender.
1865pub fn spawn_init_read_only(
1866    config: Config,
1867    network: &Network,
1868) -> tokio::task::JoinHandle<(
1869    ReadStateService,
1870    ZebraDb,
1871    tokio::sync::watch::Sender<NonFinalizedState>,
1872)> {
1873    let network = network.clone();
1874    tokio::task::spawn_blocking(move || init_read_only(config, &network))
1875}
1876
/// Creates a [`StateService`] from an ephemeral [`Config`], boxed and wrapped in a
/// buffer with a single slot.
///
/// Intended for creating a state service in tests. See also [`init`].
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test(
    network: &Network,
) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
    //       if we ever need to test final checkpoint sent UTXO queries
    let new_services = StateService::new(Config::ephemeral(), network, block::Height::MAX, 0);

    // Only the read-write service is kept; the other returned handles are discarded.
    let (read_write_service, ..) = new_services.await;
    let boxed_service = BoxService::new(read_write_service);

    Buffer::new(boxed_service, 1)
}
1891
/// Creates a state service from an ephemeral [`Config`] and returns every handle to it.
///
/// The returned tuple contains the read-write service (boxed and wrapped in a buffer
/// with a single slot), the read-only service, and the tip watch channels.
///
/// Intended for creating a state service in tests. See also [`init`].
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test_services(
    network: &Network,
) -> (
    Buffer<BoxService<Request, Response, BoxError>, Request>,
    ReadStateService,
    LatestChainTip,
    ChainTipChange,
) {
    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
    //       if we ever need to test final checkpoint sent UTXO queries
    let new_services = StateService::new(Config::ephemeral(), network, block::Height::MAX, 0);
    let (read_write_service, read_service, tip_receiver, tip_change_receiver) =
        new_services.await;

    let buffered_service = Buffer::new(BoxService::new(read_write_service), 1);

    (
        buffered_service,
        read_service,
        tip_receiver,
        tip_change_receiver,
    )
}