zebra_state/service.rs
1//! [`tower::Service`]s for Zebra's cached chain state.
2//!
3//! Zebra provides cached state access via two main services:
4//! - [`StateService`]: a read-write service that writes blocks to the state,
5//! and redirects most read requests to the [`ReadStateService`].
6//! - [`ReadStateService`]: a read-only service that answers from the most
7//! recent committed block.
8//!
9//! Most users should prefer [`ReadStateService`], unless they need to write blocks to the state.
10//!
11//! Zebra also provides access to the best chain tip via:
12//! - [`LatestChainTip`]: a read-only channel that contains the latest committed
13//! tip.
14//! - [`ChainTipChange`]: a read-only channel that can asynchronously await
15//! chain tip changes.
16
17use std::{
18 collections::HashMap,
19 future::Future,
20 pin::Pin,
21 sync::Arc,
22 task::{Context, Poll},
23 time::{Duration, Instant},
24};
25
26use futures::future::FutureExt;
27use tokio::sync::oneshot;
28use tower::{util::BoxService, Service, ServiceExt};
29use tracing::{instrument, Instrument, Span};
30
31#[cfg(any(test, feature = "proptest-impl"))]
32use tower::buffer::Buffer;
33
34use zebra_chain::{
35 block::{self, CountedHeader, HeightDiff},
36 diagnostic::CodeTimer,
37 parameters::{Network, NetworkUpgrade},
38 serialization::ZcashSerialize,
39 subtree::NoteCommitmentSubtreeIndex,
40};
41
42use crate::{
43 constants::{
44 MAX_FIND_BLOCK_HASHES_RESULTS, MAX_FIND_BLOCK_HEADERS_RESULTS, MAX_LEGACY_CHAIN_BLOCKS,
45 },
46 error::{CommitBlockError, CommitCheckpointVerifiedError, InvalidateError, ReconsiderError},
47 request::TimedSpan,
48 response::NonFinalizedBlocksListener,
49 service::{
50 block_iter::any_ancestor_blocks,
51 chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
52 finalized_state::{FinalizedState, ZebraDb},
53 non_finalized_state::{Chain, NonFinalizedState},
54 pending_utxos::PendingUtxos,
55 queued_blocks::QueuedBlocks,
56 read::find,
57 watch_receiver::WatchReceiver,
58 },
59 BoxError, CheckpointVerifiedBlock, CommitSemanticallyVerifiedError, Config, KnownBlock,
60 ReadRequest, ReadResponse, Request, Response, SemanticallyVerifiedBlock,
61};
62
63pub mod block_iter;
64pub mod chain_tip;
65pub mod watch_receiver;
66
67pub mod check;
68
69pub(crate) mod finalized_state;
70pub(crate) mod non_finalized_state;
71mod pending_utxos;
72mod queued_blocks;
73pub(crate) mod read;
74mod traits;
75mod write;
76
77#[cfg(any(test, feature = "proptest-impl"))]
78pub mod arbitrary;
79
80#[cfg(test)]
81mod tests;
82
83pub use finalized_state::{OutputLocation, TransactionIndex, TransactionLocation};
84use write::NonFinalizedWriteMessage;
85
86use self::queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified, SentHashes};
87
88pub use self::traits::{ReadState, State};
89
/// A read-write service for Zebra's cached blockchain state.
///
/// This service modifies and provides access to:
/// - the non-finalized state: the most recent blocks, up to
///   [`MAX_BLOCK_REORG_HEIGHT`](crate::MAX_BLOCK_REORG_HEIGHT) of them.
///   Zebra allows chain forks in the non-finalized state,
///   stores it in memory, and re-downloads it when restarted.
/// - the finalized state: older blocks that have many confirmations.
///   Zebra stores the single best chain in the finalized state,
///   and re-loads it from disk when restarted.
///
/// Read requests to this service are buffered, then processed concurrently.
/// Block write requests are buffered, then queued, then processed in order by a separate task.
///
/// Most state users can get faster read responses using the [`ReadStateService`],
/// because its requests do not share a [`tower::buffer::Buffer`] with block write requests.
///
/// To quickly get the latest block, use [`LatestChainTip`] or [`ChainTipChange`].
/// They can read the latest block directly, without queueing any requests.
#[derive(Debug)]
pub(crate) struct StateService {
    // Configuration
    //
    /// The configured Zcash network.
    network: Network,

    /// The height that we start storing UTXOs from finalized blocks.
    ///
    /// This height should be lower than the last few checkpoints,
    /// so the full verifier can verify UTXO spends from those blocks,
    /// even if they haven't been committed to the finalized state yet.
    ///
    /// Checkpoint verified blocks at or above this height are treated as
    /// "close to the final checkpoint" when they are queued.
    full_verifier_utxo_lookahead: block::Height,

    // Queued Blocks
    //
    /// Queued blocks for the [`NonFinalizedState`] that arrived out of order.
    /// These blocks are awaiting their parent blocks before they can do contextual verification.
    non_finalized_state_queued_blocks: QueuedBlocks,

    /// Queued blocks for the [`FinalizedState`] that arrived out of order.
    /// These blocks are awaiting their parent blocks before they can do contextual verification.
    ///
    /// Indexed by their parent block hash.
    finalized_state_queued_blocks: HashMap<block::Hash, QueuedCheckpointVerified>,

    /// Channels to send blocks to the block write task.
    ///
    /// The `finalized` and `non_finalized` senders are optional:
    /// taking and dropping a sender closes that channel.
    block_write_sender: write::BlockWriteSender,

    /// The [`block::Hash`] of the most recent block sent on
    /// `block_write_sender.finalized` or `block_write_sender.non_finalized`.
    ///
    /// On startup, this is:
    /// - the finalized tip, if there are stored blocks, or
    /// - the genesis block's parent hash, if the database is empty.
    ///
    /// If `invalid_block_write_reset_receiver` gets a reset, this is:
    /// - the hash of the last valid committed block (the parent of the invalid block).
    finalized_block_write_last_sent_hash: block::Hash,

    /// A set of block hashes that have been sent to the block write task.
    /// Hashes of blocks below the finalized tip height are periodically pruned.
    non_finalized_block_write_sent_hashes: SentHashes,

    /// If an invalid block is sent on `block_write_sender.finalized`
    /// or `block_write_sender.non_finalized`,
    /// this channel gets the [`block::Hash`] of the valid tip.
    //
    // TODO: add tests for finalized and non-finalized resets (#2654)
    invalid_block_write_reset_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,

    /// Receives the hash of every non-finalized block that the write task
    /// rejected, so the corresponding entry can be removed from
    /// `non_finalized_block_write_sent_hashes`.
    ///
    /// Without this, a rejected same-hash block locks out a later honest
    /// re-delivery of a block at the same hash as a "duplicate" until restart
    /// or reorg.
    non_finalized_rejected_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,

    // Pending UTXO Request Tracking
    //
    /// The set of outpoints with pending requests for their associated transparent::Output.
    pending_utxos: PendingUtxos,

    /// Instant tracking the last time `pending_utxos` was pruned.
    last_prune: Instant,

    // Updating Concurrently Readable State
    //
    /// A cloneable [`ReadStateService`], used to answer concurrent read requests.
    ///
    /// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`.
    read_service: ReadStateService,

    // Metrics
    //
    /// A metric tracking the maximum height that's currently in `finalized_state_queued_blocks`
    ///
    /// Set to `f64::NAN` if `finalized_state_queued_blocks` is empty, because grafana shows NaNs
    /// as a break in the graph.
    max_finalized_queue_height: f64,
}
192
/// A read-only service for accessing Zebra's cached blockchain state.
///
/// This service provides read-only access to:
/// - the non-finalized state: the most recent blocks, up to
///   [`MAX_BLOCK_REORG_HEIGHT`](crate::MAX_BLOCK_REORG_HEIGHT) of them.
/// - the finalized state: older blocks that have many confirmations.
///
/// Requests to this service are processed in parallel,
/// ignoring any blocks queued by the read-write [`StateService`].
///
/// This quick response behavior is better for most state users.
/// It allows other async tasks to make progress while concurrently reading data from disk.
///
/// Dropping the last clone that holds the block write task handle shuts down
/// the database and waits for the block write task to finish.
#[derive(Clone, Debug)]
pub struct ReadStateService {
    // Configuration
    //
    /// The configured Zcash network.
    network: Network,

    // Shared Concurrently Readable State
    //
    /// A watch channel with a cached copy of the [`NonFinalizedState`].
    ///
    /// This state is only updated between requests,
    /// so it might include some block data that is also in `db`.
    non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,

    /// The shared inner on-disk database for the finalized state.
    ///
    /// RocksDB allows reads and writes via a shared reference,
    /// but [`ZebraDb`] doesn't expose any write methods or types.
    ///
    /// This chain is updated concurrently with requests,
    /// so it might include some block data that is also in the cached
    /// non-finalized state (`non_finalized_state_receiver`).
    db: ZebraDb,

    /// A shared handle to a task that writes blocks to the [`NonFinalizedState`] or [`FinalizedState`],
    /// once the queues have received all their parent blocks.
    ///
    /// Used to check for panics when writing blocks.
    ///
    /// This is taken (set to `None`) when the service is dropped.
    block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
}
235
236impl Drop for StateService {
237 fn drop(&mut self) {
238 // The state service owns the state, tasks, and channels,
239 // so dropping it should shut down everything.
240
241 // Close the channels (non-blocking)
242 // This makes the block write thread exit the next time it checks the channels.
243 // We want to do this here so we get any errors or panics from the block write task before it shuts down.
244 self.invalid_block_write_reset_receiver.close();
245 self.non_finalized_rejected_receiver.close();
246
247 std::mem::drop(self.block_write_sender.finalized.take());
248 std::mem::drop(self.block_write_sender.non_finalized.take());
249
250 self.clear_finalized_block_queue(CommitBlockError::WriteTaskExited);
251 self.clear_non_finalized_block_queue(CommitBlockError::WriteTaskExited);
252
253 // Log database metrics before shutting down
254 info!("dropping the state: logging database metrics");
255 self.log_db_metrics();
256
257 // Then drop self.read_service, which checks the block write task for panics,
258 // and tries to shut down the database.
259 }
260}
261
262impl Drop for ReadStateService {
263 fn drop(&mut self) {
264 // The read state service shares the state,
265 // so dropping it should check if we can shut down.
266
267 // TODO: move this into a try_shutdown() method
268 if let Some(block_write_task) = self.block_write_task.take() {
269 if let Some(block_write_task_handle) = Arc::into_inner(block_write_task) {
270 // We're the last database user, so we can tell it to shut down (blocking):
271 // - flushes the database to disk, and
272 // - drops the database, which cleans up any database tasks correctly.
273 self.db.shutdown(true);
274
275 // We are the last state with a reference to this thread, so we can
276 // wait until the block write task finishes, then check for panics (blocking).
277 // (We'd also like to abort the thread, but std::thread::JoinHandle can't do that.)
278
279 // This log is verbose during tests.
280 #[cfg(not(test))]
281 info!("waiting for the block write task to finish");
282 #[cfg(test)]
283 debug!("waiting for the block write task to finish");
284
285 // TODO: move this into a check_for_panics() method
286 if let Err(thread_panic) = block_write_task_handle.join() {
287 std::panic::resume_unwind(thread_panic);
288 } else {
289 debug!("shutting down the state because the block write task has finished");
290 }
291 }
292 } else {
293 // Even if we're not the last database user, try shutting it down.
294 //
295 // TODO: rename this to try_shutdown()?
296 self.db.shutdown(false);
297 }
298 }
299}
300
301impl StateService {
    /// A fixed interval of 30 seconds.
    //
    // NOTE(review): this constant isn't used in the visible part of the file.
    // Presumably it is the minimum time between prunes of `pending_utxos`
    // (see the `last_prune` field) — confirm against its users.
    const PRUNE_INTERVAL: Duration = Duration::from_secs(30);
303
    /// Creates a new state service for the state `config` and `network`.
    ///
    /// Uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
    /// to work out when it is near the final checkpoint.
    ///
    /// Returns the read-write and read-only state services,
    /// and read-only watch channels for its best chain tip.
    ///
    /// This method also spawns the block write task, and a background tokio task
    /// that exports database metrics every 30 seconds.
    ///
    /// # Panics
    ///
    /// - if a blocking task spawned by this method can't be joined,
    /// - if the finalized tip block has no coinbase height,
    /// - if `checkpoint_verify_concurrency_limit` doesn't fit in a [`HeightDiff`],
    /// - if the legacy chain check fails on the cached state.
    pub async fn new(
        config: Config,
        network: &Network,
        max_checkpoint_height: block::Height,
        checkpoint_verify_concurrency_limit: usize,
    ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
        // Open the finalized state database and read its tip block on a blocking thread.
        //
        // The second timer started inside the closure is returned,
        // and finished below as "initializing state service".
        let (finalized_state, finalized_tip, timer) = {
            let config = config.clone();
            let network = network.clone();
            tokio::task::spawn_blocking(move || {
                let timer = CodeTimer::start();
                let finalized_state = FinalizedState::new(
                    &config,
                    &network,
                    #[cfg(feature = "elasticsearch")]
                    true,
                );
                timer.finish_desc("opening finalized state database");

                let timer = CodeTimer::start();
                let finalized_tip = finalized_state.db.tip_block();

                (finalized_state, finalized_tip, timer)
            })
            .await
            .expect("failed to join blocking task")
        };

        // # Correctness
        //
        // The state service must set the finalized block write sender to `None`
        // if there are blocks in the restored non-finalized state that are above
        // the max checkpoint height so that non-finalized blocks can be written, otherwise,
        // Zebra will be unable to commit semantically verified blocks, and its chain sync will stall.
        //
        // The state service must not set the finalized block write sender to `None` if there
        // aren't blocks in the restored non-finalized state that are above the max checkpoint height,
        // otherwise, unless checkpoint sync is disabled in the zebra-consensus configuration,
        // Zebra will be unable to commit checkpoint verified blocks, and its chain sync will stall.
        //
        // An empty database (no finalized tip) is never past the max checkpoint.
        let is_finalized_tip_past_max_checkpoint = if let Some(tip) = &finalized_tip {
            tip.coinbase_height().expect("valid block must have height") >= max_checkpoint_height
        } else {
            false
        };
        let backup_dir_path = config.non_finalized_state_backup_dir(network);
        let skip_backup_task = config.debug_skip_non_finalized_state_backup_task;
        let (non_finalized_state, non_finalized_state_sender, non_finalized_state_receiver) =
            NonFinalizedState::new(network)
                .with_backup(
                    backup_dir_path.clone(),
                    &finalized_state.db,
                    is_finalized_tip_past_max_checkpoint,
                    config.debug_skip_non_finalized_state_backup_task,
                )
                .await;

        let non_finalized_block_write_sent_hashes = SentHashes::new(&non_finalized_state);
        // Prefer the best tip of the non-finalized state,
        // and fall back to the finalized tip if it has none.
        let initial_tip = non_finalized_state
            .best_tip_block()
            .map(|cv_block| cv_block.block.clone())
            .or(finalized_tip)
            .map(CheckpointVerifiedBlock::from)
            .map(ChainTipBlock::from);

        tracing::info!(chain_tip = ?initial_tip.as_ref().map(|tip| (tip.hash, tip.height)), "loaded Zebra state cache");

        let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
            ChainTipSender::new(initial_tip, network);

        let finalized_state_for_writing = finalized_state.clone();
        // See the "Correctness" note above for why this depends on the non-finalized state.
        let should_use_finalized_block_write_sender = non_finalized_state.is_chain_set_empty();
        // The backup directory is only passed to the write task when `skip_backup_task` is true.
        let sync_backup_dir_path = backup_dir_path.filter(|_| skip_backup_task);
        let (
            block_write_sender,
            invalid_block_write_reset_receiver,
            non_finalized_rejected_receiver,
            block_write_task,
        ) = write::BlockWriteSender::spawn(
            finalized_state_for_writing,
            non_finalized_state,
            chain_tip_sender,
            non_finalized_state_sender,
            should_use_finalized_block_write_sender,
            sync_backup_dir_path,
        );

        let read_service = ReadStateService::new(
            &finalized_state,
            block_write_task,
            non_finalized_state_receiver,
        );

        // Start the UTXO lookahead `checkpoint_verify_concurrency_limit` blocks
        // below the max checkpoint height, falling back to the minimum height
        // if the subtraction doesn't produce a height.
        let full_verifier_utxo_lookahead = max_checkpoint_height
            - HeightDiff::try_from(checkpoint_verify_concurrency_limit)
                .expect("fits in HeightDiff");
        let full_verifier_utxo_lookahead =
            full_verifier_utxo_lookahead.unwrap_or(block::Height::MIN);
        let non_finalized_state_queued_blocks = QueuedBlocks::default();
        let pending_utxos = PendingUtxos::default();

        // This moves `finalized_state` into the blocking task,
        // so it must come after all the other uses of `finalized_state`.
        let finalized_block_write_last_sent_hash =
            tokio::task::spawn_blocking(move || finalized_state.db.finalized_tip_hash())
                .await
                .expect("failed to join blocking task");

        let state = Self {
            network: network.clone(),
            full_verifier_utxo_lookahead,
            non_finalized_state_queued_blocks,
            finalized_state_queued_blocks: HashMap::new(),
            block_write_sender,
            finalized_block_write_last_sent_hash,
            non_finalized_block_write_sent_hashes,
            invalid_block_write_reset_receiver,
            non_finalized_rejected_receiver,
            pending_utxos,
            last_prune: Instant::now(),
            read_service: read_service.clone(),
            max_finalized_queue_height: f64::NAN,
        };
        timer.finish_desc("initializing state service");

        tracing::info!("starting legacy chain check");
        let timer = CodeTimer::start();

        // The legacy chain check only runs when there is a best tip,
        // and the network has a NU5 activation height.
        if let (Some(tip), Some(nu5_activation_height)) = (
            {
                let read_state = state.read_service.clone();
                tokio::task::spawn_blocking(move || read_state.best_tip())
                    .await
                    .expect("task should not panic")
            },
            NetworkUpgrade::Nu5.activation_height(network),
        ) {
            if let Err(error) = check::legacy_chain(
                nu5_activation_height,
                any_ancestor_blocks(
                    &state.read_service.latest_non_finalized_state(),
                    &state.read_service.db,
                    tip.1,
                ),
                &state.network,
                MAX_LEGACY_CHAIN_BLOCKS,
            ) {
                let legacy_db_path = state.read_service.db.path().to_path_buf();
                panic!(
                    "Cached state contains a legacy chain.\n\
                     An outdated Zebra version did not know about a recent network upgrade,\n\
                     so it followed a legacy chain using outdated consensus branch rules.\n\
                     Hint: Delete your database, and restart Zebra to do a full sync.\n\
                     Database path: {legacy_db_path:?}\n\
                     Error: {error:?}",
                );
            }
        }

        tracing::info!("cached state consensus branch is valid: no legacy chain found");
        timer.finish_desc("legacy chain check");

        // Spawn a background task to periodically export RocksDB metrics to Prometheus
        //
        // The task's join handle is not kept, and its loop has no exit condition.
        let db_for_metrics = read_service.db.clone();
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(30));
            loop {
                interval.tick().await;
                db_for_metrics.export_metrics();
            }
        });

        (state, read_service, latest_chain_tip, chain_tip_change)
    }
482
483 /// Call read only state service to log rocksdb database metrics.
484 pub fn log_db_metrics(&self) {
485 self.read_service.db.print_db_metrics();
486 }
487
488 /// Queue a checkpoint verified block for verification and storage in the finalized state.
489 ///
490 /// Returns a channel receiver that provides the result of the block commit.
491 fn queue_and_commit_to_finalized_state(
492 &mut self,
493 checkpoint_verified: CheckpointVerifiedBlock,
494 ) -> oneshot::Receiver<Result<block::Hash, CommitCheckpointVerifiedError>> {
495 // # Correctness & Performance
496 //
497 // This method must not block, access the database, or perform CPU-intensive tasks,
498 // because it is called directly from the tokio executor's Future threads.
499
500 let queued_prev_hash = checkpoint_verified.block.header.previous_block_hash;
501 let queued_height = checkpoint_verified.height;
502
503 // If we're close to the final checkpoint, make the block's UTXOs available for
504 // semantic block verification, even when it is in the channel.
505 if self.is_close_to_final_checkpoint(queued_height) {
506 self.non_finalized_block_write_sent_hashes
507 .add_finalized(&checkpoint_verified)
508 }
509
510 let (rsp_tx, rsp_rx) = oneshot::channel();
511 let queued = (checkpoint_verified, rsp_tx);
512
513 if self.block_write_sender.finalized.is_some() {
514 // We're still committing checkpoint verified blocks
515 if let Some(duplicate_queued) = self
516 .finalized_state_queued_blocks
517 .insert(queued_prev_hash, queued)
518 {
519 Self::send_checkpoint_verified_block_error(
520 duplicate_queued,
521 CommitBlockError::new_duplicate(
522 Some(queued_prev_hash.into()),
523 KnownBlock::Queue,
524 ),
525 );
526 }
527
528 self.drain_finalized_queue_and_commit();
529 } else {
530 // We've finished committing checkpoint verified blocks to the finalized state,
531 // so drop any repeated queued blocks, and return an error.
532 //
533 // TODO: track the latest sent height, and drop any blocks under that height
534 // every time we send some blocks (like QueuedSemanticallyVerifiedBlocks)
535 Self::send_checkpoint_verified_block_error(
536 queued,
537 CommitBlockError::new_duplicate(None, KnownBlock::Finalized),
538 );
539
540 self.clear_finalized_block_queue(CommitBlockError::new_duplicate(
541 None,
542 KnownBlock::Finalized,
543 ));
544 }
545
546 if self.finalized_state_queued_blocks.is_empty() {
547 self.max_finalized_queue_height = f64::NAN;
548 } else if self.max_finalized_queue_height.is_nan()
549 || self.max_finalized_queue_height < queued_height.0 as f64
550 {
551 // if there are still blocks in the queue, then either:
552 // - the new block was lower than the old maximum, and there was a gap before it,
553 // so the maximum is still the same (and we skip this code), or
554 // - the new block is higher than the old maximum, and there is at least one gap
555 // between the finalized tip and the new maximum
556 self.max_finalized_queue_height = queued_height.0 as f64;
557 }
558
559 metrics::gauge!("state.checkpoint.queued.max.height").set(self.max_finalized_queue_height);
560 metrics::gauge!("state.checkpoint.queued.block.count")
561 .set(self.finalized_state_queued_blocks.len() as f64);
562
563 rsp_rx
564 }
565
566 /// Finds finalized state queue blocks to be committed to the state in order,
567 /// removes them from the queue, and sends them to the block commit task.
568 ///
569 /// After queueing a finalized block, this method checks whether the newly
570 /// queued block (and any of its descendants) can be committed to the state.
571 ///
572 /// Returns an error if the block commit channel has been closed.
573 pub fn drain_finalized_queue_and_commit(&mut self) {
574 use tokio::sync::mpsc::error::{SendError, TryRecvError};
575
576 // # Correctness & Performance
577 //
578 // This method must not block, access the database, or perform CPU-intensive tasks,
579 // because it is called directly from the tokio executor's Future threads.
580
581 // If a block failed, we need to start again from a valid tip.
582 match self.invalid_block_write_reset_receiver.try_recv() {
583 Ok(reset_tip_hash) => self.finalized_block_write_last_sent_hash = reset_tip_hash,
584 Err(TryRecvError::Disconnected) => {
585 info!("Block commit task closed the block reset channel. Is Zebra shutting down?");
586 return;
587 }
588 // There are no errors, so we can just use the last block hash we sent
589 Err(TryRecvError::Empty) => {}
590 }
591
592 while let Some(queued_block) = self
593 .finalized_state_queued_blocks
594 .remove(&self.finalized_block_write_last_sent_hash)
595 {
596 let last_sent_finalized_block_height = queued_block.0.height;
597
598 self.finalized_block_write_last_sent_hash = queued_block.0.hash;
599
600 // If we've finished sending finalized blocks, ignore any repeated blocks.
601 // (Blocks can be repeated after a syncer reset.)
602 if let Some(finalized_block_write_sender) = &self.block_write_sender.finalized {
603 let send_result = finalized_block_write_sender.send(queued_block);
604
605 // If the receiver is closed, we can't send any more blocks.
606 if let Err(SendError(queued)) = send_result {
607 // If Zebra is shutting down, drop blocks and return an error.
608 Self::send_checkpoint_verified_block_error(
609 queued,
610 CommitBlockError::WriteTaskExited,
611 );
612
613 self.clear_finalized_block_queue(CommitBlockError::WriteTaskExited);
614 } else {
615 metrics::gauge!("state.checkpoint.sent.block.height")
616 .set(last_sent_finalized_block_height.0 as f64);
617 };
618 }
619 }
620 }
621
622 /// Drains every hash queued on `non_finalized_rejected_receiver` and
623 /// removes it from `non_finalized_block_write_sent_hashes`.
624 ///
625 /// This closes the lockout window where a rejected block keeps its hash
626 /// recorded as "sent", so a subsequent honest re-delivery of a block at
627 /// the same hash is not short-circuited as a false "duplicate".
628 ///
629 /// # Correctness & Performance
630 ///
631 /// Like the other drain methods on `StateService`, this must not block,
632 /// access the database, or perform CPU-intensive work, because it is
633 /// called directly from the tokio executor's Future threads.
634 fn drain_non_finalized_rejected_hashes(&mut self) {
635 use tokio::sync::mpsc::error::TryRecvError;
636
637 loop {
638 match self.non_finalized_rejected_receiver.try_recv() {
639 Ok(hash) => {
640 self.non_finalized_block_write_sent_hashes.remove(&hash);
641 }
642 Err(TryRecvError::Empty) => break,
643 Err(TryRecvError::Disconnected) => {
644 info!(
645 "Block commit task closed the non-finalized rejected hash channel. \
646 Is Zebra shutting down?"
647 );
648 break;
649 }
650 }
651 }
652 }
653
654 /// Drops all finalized state queue blocks, and sends an error on their result channels.
655 fn clear_finalized_block_queue(
656 &mut self,
657 error: impl Into<CommitCheckpointVerifiedError> + Clone,
658 ) {
659 for (_hash, queued) in self.finalized_state_queued_blocks.drain() {
660 Self::send_checkpoint_verified_block_error(queued, error.clone());
661 }
662 }
663
664 /// Send an error on a `QueuedCheckpointVerified` block's result channel, and drop the block
665 fn send_checkpoint_verified_block_error(
666 queued: QueuedCheckpointVerified,
667 error: impl Into<CommitCheckpointVerifiedError>,
668 ) {
669 let (finalized, rsp_tx) = queued;
670
671 // The block sender might have already given up on this block,
672 // so ignore any channel send errors.
673 let _ = rsp_tx.send(Err(error.into()));
674 std::mem::drop(finalized);
675 }
676
677 /// Drops all non-finalized state queue blocks, and sends an error on their result channels.
678 fn clear_non_finalized_block_queue(
679 &mut self,
680 error: impl Into<CommitSemanticallyVerifiedError> + Clone,
681 ) {
682 for (_hash, queued) in self.non_finalized_state_queued_blocks.drain() {
683 Self::send_semantically_verified_block_error(queued, error.clone());
684 }
685 }
686
687 /// Send an error on a `QueuedSemanticallyVerified` block's result channel, and drop the block
688 fn send_semantically_verified_block_error(
689 queued: QueuedSemanticallyVerified,
690 error: impl Into<CommitSemanticallyVerifiedError>,
691 ) {
692 let (finalized, rsp_tx) = queued;
693
694 // The block sender might have already given up on this block,
695 // so ignore any channel send errors.
696 let _ = rsp_tx.send(Err(error.into()));
697 std::mem::drop(finalized);
698 }
699
    /// Queue a semantically verified block for contextual verification and check if any queued
    /// blocks are ready to be verified and committed to the state.
    ///
    /// This function encodes the logic for [committing non-finalized blocks][1]
    /// in RFC0005.
    ///
    /// Returns a channel receiver that provides the result of the block commit.
    /// The receiver already contains a duplicate error if the block's hash has been sent
    /// to the block write task, or if its height is in the finalized state database.
    ///
    /// # Panics
    ///
    /// If non-finalized blocks are being sent to the block write task,
    /// but the finalized state has no tip height.
    ///
    /// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks
    #[instrument(level = "debug", skip(self, semantically_verified))]
    fn queue_and_commit_to_non_finalized_state(
        &mut self,
        semantically_verified: SemanticallyVerifiedBlock,
    ) -> oneshot::Receiver<Result<block::Hash, CommitSemanticallyVerifiedError>> {
        tracing::debug!(block = %semantically_verified.block, "queueing block for contextual verification");
        let parent_hash = semantically_verified.block.header.previous_block_hash;

        // Drop hashes of any blocks the write task has rejected before checking
        // the SentHashes membership below. Without this, a rejected same-hash
        // block would lock out a later honest re-delivery of a block at the
        // same hash as a false "duplicate".
        self.drain_non_finalized_rejected_hashes();

        // Reject blocks whose hash has already been sent to the block write task.
        if self
            .non_finalized_block_write_sent_hashes
            .contains(&semantically_verified.hash)
        {
            let (rsp_tx, rsp_rx) = oneshot::channel();
            let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
                Some(semantically_verified.hash.into()),
                KnownBlock::WriteChannel,
            )
            .into()));
            return rsp_rx;
        }

        // Reject blocks whose height is already in the finalized state database.
        if self
            .read_service
            .db
            .contains_height(semantically_verified.height)
        {
            let (rsp_tx, rsp_rx) = oneshot::channel();
            let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
                Some(semantically_verified.height.into()),
                KnownBlock::Finalized,
            )
            .into()));
            return rsp_rx;
        }

        // [`Request::CommitSemanticallyVerifiedBlock`] contract: a request to commit a block which
        // has been queued but not yet committed to the state fails the older request and replaces
        // it with the newer request.
        //
        // Only the response channel is replaced: the block that is already in the queue
        // stays there, and the block passed to this call is not queued again.
        let rsp_rx = if let Some((_, old_rsp_tx)) = self
            .non_finalized_state_queued_blocks
            .get_mut(&semantically_verified.hash)
        {
            tracing::debug!("replacing older queued request with new request");
            let (mut rsp_tx, rsp_rx) = oneshot::channel();
            // After the swap, `rsp_tx` is the older request's sender.
            std::mem::swap(old_rsp_tx, &mut rsp_tx);
            let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
                Some(semantically_verified.hash.into()),
                KnownBlock::Queue,
            )
            .into()));
            rsp_rx
        } else {
            let (rsp_tx, rsp_rx) = oneshot::channel();
            self.non_finalized_state_queued_blocks
                .queue((semantically_verified, rsp_tx));
            rsp_rx
        };

        // We've finished sending checkpoint verified blocks when:
        // - we've sent the verified block for the last checkpoint, and
        // - it has been successfully written to disk.
        //
        // We detect the last checkpoint by looking for non-finalized blocks
        // that are a child of the last block we sent.
        //
        // TODO: configure the state with the last checkpoint hash instead?
        if self.block_write_sender.finalized.is_some()
            && self
                .non_finalized_state_queued_blocks
                .has_queued_children(self.finalized_block_write_last_sent_hash)
            && self.read_service.db.finalized_tip_hash()
                == self.finalized_block_write_last_sent_hash
        {
            // Tell the block write task to stop committing checkpoint verified blocks to the finalized state,
            // and move on to committing semantically verified blocks to the non-finalized state.
            std::mem::drop(self.block_write_sender.finalized.take());
            // Remove any checkpoint-verified block hashes from `non_finalized_block_write_sent_hashes`.
            self.non_finalized_block_write_sent_hashes = SentHashes::default();
            // Mark `SentHashes` as usable by the `can_fork_chain_at()` method.
            self.non_finalized_block_write_sent_hashes
                .can_fork_chain_at_hashes = true;
            // Send blocks from non-finalized queue
            self.send_ready_non_finalized_queued(self.finalized_block_write_last_sent_hash);
            // We've finished committing checkpoint verified blocks to finalized state, so drop any repeated queued blocks.
            self.clear_finalized_block_queue(CommitBlockError::new_duplicate(
                None,
                KnownBlock::Finalized,
            ));
        } else if !self.can_fork_chain_at(&parent_hash) {
            // The block stays in the queue until its parent is available.
            tracing::trace!("unready to verify, returning early");
        } else if self.block_write_sender.finalized.is_none() {
            // Wait until block commit task is ready to write non-finalized blocks before dequeuing them
            self.send_ready_non_finalized_queued(parent_hash);

            let finalized_tip_height = self.read_service.db.finalized_tip_height().expect(
                "Finalized state must have at least one block before committing non-finalized state",
            );

            // Prune the queue and the sent hashes using the finalized tip height.
            self.non_finalized_state_queued_blocks
                .prune_by_height(finalized_tip_height);

            self.non_finalized_block_write_sent_hashes
                .prune_by_height(finalized_tip_height);
        }

        rsp_rx
    }
820
821 /// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks.
822 fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
823 self.non_finalized_block_write_sent_hashes
824 .can_fork_chain_at(hash)
825 || &self.read_service.db.finalized_tip_hash() == hash
826 }
827
828 /// Returns `true` if `queued_height` is near the final checkpoint.
829 ///
830 /// The semantic block verifier needs access to UTXOs from checkpoint verified blocks
831 /// near the final checkpoint, so that it can verify blocks that spend those UTXOs.
832 ///
833 /// If it doesn't have the required UTXOs, some blocks will time out,
834 /// but succeed after a syncer restart.
835 fn is_close_to_final_checkpoint(&self, queued_height: block::Height) -> bool {
836 queued_height >= self.full_verifier_utxo_lookahead
837 }
838
839 /// Sends all queued blocks whose parents have recently arrived starting from `new_parent`
840 /// in breadth-first ordering to the block write task which will attempt to validate and commit them
841 #[tracing::instrument(level = "debug", skip(self, new_parent))]
842 fn send_ready_non_finalized_queued(&mut self, new_parent: block::Hash) {
843 use tokio::sync::mpsc::error::SendError;
844 if let Some(non_finalized_block_write_sender) = &self.block_write_sender.non_finalized {
845 let mut new_parents: Vec<block::Hash> = vec![new_parent];
846
847 while let Some(parent_hash) = new_parents.pop() {
848 let queued_children = self
849 .non_finalized_state_queued_blocks
850 .dequeue_children(parent_hash);
851
852 for queued_child in queued_children {
853 let (SemanticallyVerifiedBlock { hash, .. }, _) = queued_child;
854
855 self.non_finalized_block_write_sent_hashes
856 .add(&queued_child.0);
857 let send_result = non_finalized_block_write_sender.send(queued_child.into());
858
859 if let Err(SendError(NonFinalizedWriteMessage::Commit(queued))) = send_result {
860 // If Zebra is shutting down, drop blocks and return an error.
861 Self::send_semantically_verified_block_error(
862 queued,
863 CommitBlockError::WriteTaskExited,
864 );
865
866 self.clear_non_finalized_block_queue(CommitBlockError::WriteTaskExited);
867
868 return;
869 };
870
871 new_parents.push(hash);
872 }
873 }
874
875 self.non_finalized_block_write_sent_hashes.finish_batch();
876 };
877 }
878
879 /// Return the tip of the current best chain.
880 pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
881 self.read_service.best_tip()
882 }
883
884 fn send_invalidate_block(
885 &self,
886 hash: block::Hash,
887 ) -> oneshot::Receiver<Result<block::Hash, InvalidateError>> {
888 let (rsp_tx, rsp_rx) = oneshot::channel();
889
890 let Some(sender) = &self.block_write_sender.non_finalized else {
891 let _ = rsp_tx.send(Err(InvalidateError::ProcessingCheckpointedBlocks));
892 return rsp_rx;
893 };
894
895 if let Err(tokio::sync::mpsc::error::SendError(error)) =
896 sender.send(NonFinalizedWriteMessage::Invalidate { hash, rsp_tx })
897 {
898 let NonFinalizedWriteMessage::Invalidate { rsp_tx, .. } = error else {
899 unreachable!("should return the same Invalidate message could not be sent");
900 };
901
902 let _ = rsp_tx.send(Err(InvalidateError::SendInvalidateRequestFailed));
903 }
904
905 rsp_rx
906 }
907
908 fn send_reconsider_block(
909 &self,
910 hash: block::Hash,
911 ) -> oneshot::Receiver<Result<Vec<block::Hash>, ReconsiderError>> {
912 let (rsp_tx, rsp_rx) = oneshot::channel();
913
914 let Some(sender) = &self.block_write_sender.non_finalized else {
915 let _ = rsp_tx.send(Err(ReconsiderError::CheckpointCommitInProgress));
916 return rsp_rx;
917 };
918
919 if let Err(tokio::sync::mpsc::error::SendError(error)) =
920 sender.send(NonFinalizedWriteMessage::Reconsider { hash, rsp_tx })
921 {
922 let NonFinalizedWriteMessage::Reconsider { rsp_tx, .. } = error else {
923 unreachable!("should return the same Reconsider message could not be sent");
924 };
925
926 let _ = rsp_tx.send(Err(ReconsiderError::ReconsiderSendFailed));
927 }
928
929 rsp_rx
930 }
931
932 /// Assert some assumptions about the semantically verified `block` before it is queued.
933 fn assert_block_can_be_validated(&self, block: &SemanticallyVerifiedBlock) {
934 // required by `Request::CommitSemanticallyVerifiedBlock` call
935 assert!(
936 block.height > self.network.mandatory_checkpoint_height(),
937 "invalid semantically verified block height: the canopy checkpoint is mandatory, pre-canopy \
938 blocks, and the canopy activation block, must be committed to the state as finalized \
939 blocks"
940 );
941 }
942
943 fn known_sent_hash(&self, hash: &block::Hash) -> Option<KnownBlock> {
944 self.non_finalized_block_write_sent_hashes
945 .contains(hash)
946 .then_some(KnownBlock::WriteChannel)
947 }
948}
949
950impl ReadStateService {
951 /// Creates a new read-only state service, using the provided finalized state and
952 /// block write task handle.
953 ///
954 /// Returns the newly created service,
955 /// and a watch channel for updating the shared recent non-finalized chain.
956 pub(crate) fn new(
957 finalized_state: &FinalizedState,
958 block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
959 non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
960 ) -> Self {
961 let read_service = Self {
962 network: finalized_state.network(),
963 db: finalized_state.db.clone(),
964 non_finalized_state_receiver,
965 block_write_task,
966 };
967
968 tracing::debug!("created new read-only state service");
969
970 read_service
971 }
972
973 /// Return the tip of the current best chain.
974 pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
975 read::best_tip(&self.latest_non_finalized_state(), &self.db)
976 }
977
978 /// Gets a clone of the latest non-finalized state from the `non_finalized_state_receiver`
979 fn latest_non_finalized_state(&self) -> NonFinalizedState {
980 self.non_finalized_state_receiver.cloned_watch_data()
981 }
982
983 /// Gets a clone of the latest, best non-finalized chain from the `non_finalized_state_receiver`
984 fn latest_best_chain(&self) -> Option<Arc<Chain>> {
985 self.non_finalized_state_receiver
986 .borrow_mapped(|non_finalized_state| non_finalized_state.best_chain().cloned())
987 }
988
989 /// Test-only access to the inner database.
990 /// Can be used to modify the database without doing any consensus checks.
991 #[cfg(any(test, feature = "proptest-impl"))]
992 pub fn db(&self) -> &ZebraDb {
993 &self.db
994 }
995
996 /// Logs rocksdb metrics using the read only state service.
997 pub fn log_db_metrics(&self) {
998 self.db.print_db_metrics();
999 }
1000}
1001
1002impl Service<Request> for StateService {
1003 type Response = Response;
1004 type Error = BoxError;
1005 type Future =
1006 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
1007
1008 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1009 // Check for panics in the block write task
1010 let poll = self.read_service.poll_ready(cx);
1011
1012 // Prune outdated UTXO requests
1013 let now = Instant::now();
1014
1015 if self.last_prune + Self::PRUNE_INTERVAL < now {
1016 let tip = self.best_tip();
1017 let old_len = self.pending_utxos.len();
1018
1019 self.pending_utxos.prune();
1020 self.last_prune = now;
1021
1022 let new_len = self.pending_utxos.len();
1023 let prune_count = old_len
1024 .checked_sub(new_len)
1025 .expect("prune does not add any utxo requests");
1026 if prune_count > 0 {
1027 tracing::debug!(
1028 ?old_len,
1029 ?new_len,
1030 ?prune_count,
1031 ?tip,
1032 "pruned utxo requests"
1033 );
1034 } else {
1035 tracing::debug!(len = ?old_len, ?tip, "no utxo requests needed pruning");
1036 }
1037 }
1038
1039 poll
1040 }
1041
1042 #[instrument(name = "state", skip(self, req))]
1043 fn call(&mut self, req: Request) -> Self::Future {
1044 req.count_metric();
1045 let span = Span::current();
1046
1047 match req {
1048 // Uses non_finalized_state_queued_blocks and pending_utxos in the StateService
1049 // Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb.
1050 //
1051 // The expected error type for this request is `CommitSemanticallyVerifiedError`.
1052 Request::CommitSemanticallyVerifiedBlock(semantically_verified) => {
1053 let timer = CodeTimer::start();
1054 self.assert_block_can_be_validated(&semantically_verified);
1055
1056 self.pending_utxos
1057 .check_against_ordered(&semantically_verified.new_outputs);
1058
1059 // # Performance
1060 //
1061 // Allow other async tasks to make progress while blocks are being verified
1062 // and written to disk. But wait for the blocks to finish committing,
1063 // so that `StateService` multi-block queries always observe a consistent state.
1064 //
1065 // Since each block is spawned into its own task,
1066 // there shouldn't be any other code running in the same task,
1067 // so we don't need to worry about blocking it:
1068 // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
1069
1070 let rsp_rx = tokio::task::block_in_place(move || {
1071 span.in_scope(|| {
1072 self.queue_and_commit_to_non_finalized_state(semantically_verified)
1073 })
1074 });
1075
1076 // TODO:
1077 // - check for panics in the block write task here,
1078 // as well as in poll_ready()
1079
1080 // The work is all done, the future just waits on a channel for the result
1081 timer.finish_desc("CommitSemanticallyVerifiedBlock");
1082
1083 // Await the channel response, flatten the result, map receive errors to
1084 // `CommitSemanticallyVerifiedError::WriteTaskExited`.
1085 // Then flatten the nested Result and convert any errors to a BoxError.
1086 let span = Span::current();
1087 async move {
1088 rsp_rx
1089 .await
1090 .map_err(|_recv_error| CommitBlockError::WriteTaskExited.into())
1091 .and_then(|result| result)
1092 .map_err(BoxError::from)
1093 .map(Response::Committed)
1094 }
1095 .instrument(span)
1096 .boxed()
1097 }
1098
1099 // Uses finalized_state_queued_blocks and pending_utxos in the StateService.
1100 // Accesses shared writeable state in the StateService.
1101 //
1102 // The expected error type for this request is `CommitCheckpointVerifiedError`.
1103 Request::CommitCheckpointVerifiedBlock(finalized) => {
1104 let timer = CodeTimer::start();
1105 // # Consensus
1106 //
1107 // A semantic block verification could have called AwaitUtxo
1108 // before this checkpoint verified block arrived in the state.
1109 // So we need to check for pending UTXO requests sent by running
1110 // semantic block verifications.
1111 //
1112 // This check is redundant for most checkpoint verified blocks,
1113 // because semantic verification can only succeed near the final
1114 // checkpoint, when all the UTXOs are available for the verifying block.
1115 //
1116 // (Checkpoint block UTXOs are verified using block hash checkpoints
1117 // and transaction merkle tree block header commitments.)
1118 self.pending_utxos
1119 .check_against_ordered(&finalized.new_outputs);
1120
1121 // # Performance
1122 //
1123 // This method doesn't block, access the database, or perform CPU-intensive tasks,
1124 // so we can run it directly in the tokio executor's Future threads.
1125 let rsp_rx = self.queue_and_commit_to_finalized_state(finalized);
1126
1127 // TODO:
1128 // - check for panics in the block write task here,
1129 // as well as in poll_ready()
1130
1131 // The work is all done, the future just waits on a channel for the result
1132 timer.finish_desc("CommitCheckpointVerifiedBlock");
1133
1134 // Await the channel response, flatten the result, map receive errors to
1135 // `CommitCheckpointVerifiedError::WriteTaskExited`.
1136 // Then flatten the nested Result and convert any errors to a BoxError.
1137 async move {
1138 rsp_rx
1139 .await
1140 .map_err(|_recv_error| CommitBlockError::WriteTaskExited.into())
1141 .and_then(|result| result)
1142 .map_err(BoxError::from)
1143 .map(Response::Committed)
1144 }
1145 .instrument(span)
1146 .boxed()
1147 }
1148
1149 // Uses pending_utxos and non_finalized_state_queued_blocks in the StateService.
1150 // If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService.
1151 Request::AwaitUtxo(outpoint) => {
1152 let timer = CodeTimer::start();
1153 // Prepare the AwaitUtxo future from PendingUxtos.
1154 let response_fut = self.pending_utxos.queue(outpoint);
1155 // Only instrument `response_fut`, the ReadStateService already
1156 // instruments its requests with the same span.
1157
1158 let response_fut = response_fut.instrument(span).boxed();
1159
1160 // Check the non-finalized block queue outside the returned future,
1161 // so we can access mutable state fields.
1162 if let Some(utxo) = self.non_finalized_state_queued_blocks.utxo(&outpoint) {
1163 self.pending_utxos.respond(&outpoint, utxo);
1164
1165 // We're finished, the returned future gets the UTXO from the respond() channel.
1166 timer.finish_desc("AwaitUtxo/queued-non-finalized");
1167
1168 return response_fut;
1169 }
1170
1171 // Check the sent non-finalized blocks
1172 if let Some(utxo) = self.non_finalized_block_write_sent_hashes.utxo(&outpoint) {
1173 self.pending_utxos.respond(&outpoint, utxo);
1174
1175 // We're finished, the returned future gets the UTXO from the respond() channel.
1176 timer.finish_desc("AwaitUtxo/sent-non-finalized");
1177
1178 return response_fut;
1179 }
1180
1181 // We ignore any UTXOs in FinalizedState.finalized_state_queued_blocks,
1182 // because it is only used during checkpoint verification.
1183 //
1184 // This creates a rare race condition, but it doesn't seem to happen much in practice.
1185 // See #5126 for details.
1186
1187 // Manually send a request to the ReadStateService,
1188 // to get UTXOs from any non-finalized chain or the finalized chain.
1189 let read_service = self.read_service.clone();
1190
1191 // Run the request in an async block, so we can await the response.
1192 async move {
1193 let req = ReadRequest::AnyChainUtxo(outpoint);
1194
1195 let rsp = read_service.oneshot(req).await?;
1196
1197 // Optional TODO:
1198 // - make pending_utxos.respond() async using a channel,
1199 // so we can respond to all waiting requests here
1200 //
1201 // This change is not required for correctness, because:
1202 // - any waiting requests should have returned when the block was sent to the state
1203 // - otherwise, the request returns immediately if:
1204 // - the block is in the non-finalized queue, or
1205 // - the block is in any non-finalized chain or the finalized state
1206 //
1207 // And if the block is in the finalized queue,
1208 // that's rare enough that a retry is ok.
1209 if let ReadResponse::AnyChainUtxo(Some(utxo)) = rsp {
1210 // We got a UTXO, so we replace the response future with the result own.
1211 timer.finish_desc("AwaitUtxo/any-chain");
1212
1213 return Ok(Response::Utxo(utxo));
1214 }
1215
1216 // We're finished, but the returned future is waiting on the respond() channel.
1217 timer.finish_desc("AwaitUtxo/waiting");
1218
1219 response_fut.await
1220 }
1221 .boxed()
1222 }
1223
1224 // Used by sync, inbound, and block verifier to check if a block is already in the state
1225 // before downloading or validating it.
1226 Request::KnownBlock(hash) => {
1227 let timer = CodeTimer::start();
1228 let sent_hash_response = self.known_sent_hash(&hash);
1229 let read_service = self.read_service.clone();
1230
1231 async move {
1232 if sent_hash_response.is_some() {
1233 return Ok(Response::KnownBlock(sent_hash_response));
1234 };
1235
1236 let response = read::non_finalized_state_contains_block_hash(
1237 &read_service.latest_non_finalized_state(),
1238 hash,
1239 )
1240 // TODO: Move this to a blocking task, perhaps by moving some of this logic to the ReadStateService.
1241 .or_else(|| read::finalized_state_contains_block_hash(&read_service.db, hash));
1242
1243 timer.finish_desc("Request::KnownBlock");
1244
1245 Ok(Response::KnownBlock(response))
1246 }
1247 .boxed()
1248 }
1249
1250 // The expected error type for this request is `InvalidateError`
1251 Request::InvalidateBlock(block_hash) => {
1252 let rsp_rx = tokio::task::block_in_place(move || {
1253 span.in_scope(|| self.send_invalidate_block(block_hash))
1254 });
1255
1256 // Await the channel response, flatten the result, map receive errors to
1257 // `InvalidateError::InvalidateRequestDropped`.
1258 // Then flatten the nested Result and convert any errors to a BoxError.
1259 let span = Span::current();
1260 async move {
1261 rsp_rx
1262 .await
1263 .map_err(|_recv_error| InvalidateError::InvalidateRequestDropped)
1264 .and_then(|result| result)
1265 .map_err(BoxError::from)
1266 .map(Response::Invalidated)
1267 }
1268 .instrument(span)
1269 .boxed()
1270 }
1271
1272 // The expected error type for this request is `ReconsiderError`
1273 Request::ReconsiderBlock(block_hash) => {
1274 let rsp_rx = tokio::task::block_in_place(move || {
1275 span.in_scope(|| self.send_reconsider_block(block_hash))
1276 });
1277
1278 // Await the channel response, flatten the result, map receive errors to
1279 // `ReconsiderError::ReconsiderResponseDropped`.
1280 // Then flatten the nested Result and convert any errors to a BoxError.
1281 let span = Span::current();
1282 async move {
1283 rsp_rx
1284 .await
1285 .map_err(|_recv_error| ReconsiderError::ReconsiderResponseDropped)
1286 .and_then(|result| result)
1287 .map_err(BoxError::from)
1288 .map(Response::Reconsidered)
1289 }
1290 .instrument(span)
1291 .boxed()
1292 }
1293
1294 // Runs concurrently using the ReadStateService
1295 Request::Tip
1296 | Request::Depth(_)
1297 | Request::BestChainNextMedianTimePast
1298 | Request::BestChainBlockHash(_)
1299 | Request::BlockLocator
1300 | Request::Transaction(_)
1301 | Request::AnyChainTransaction(_)
1302 | Request::UnspentBestChainUtxo(_)
1303 | Request::Block(_)
1304 | Request::AnyChainBlock(_)
1305 | Request::BlockAndSize(_)
1306 | Request::BlockHeader(_)
1307 | Request::FindBlockHashes { .. }
1308 | Request::FindBlockHeaders { .. }
1309 | Request::CheckBestChainTipNullifiersAndAnchors(_)
1310 | Request::CheckBlockProposalValidity(_) => {
1311 // Redirect the request to the concurrent ReadStateService
1312 let read_service = self.read_service.clone();
1313
1314 async move {
1315 let req = req
1316 .try_into()
1317 .expect("ReadRequest conversion should not fail");
1318
1319 let rsp = read_service.oneshot(req).await?;
1320 let rsp = rsp.try_into().expect("Response conversion should not fail");
1321
1322 Ok(rsp)
1323 }
1324 .boxed()
1325 }
1326 }
1327 }
1328}
1329
1330impl Service<ReadRequest> for ReadStateService {
1331 type Response = ReadResponse;
1332 type Error = BoxError;
1333 type Future =
1334 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
1335
1336 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1337 // Check for panics in the block write task
1338 //
1339 // TODO: move into a check_for_panics() method
1340 let block_write_task = self.block_write_task.take();
1341
1342 if let Some(block_write_task) = block_write_task {
1343 if block_write_task.is_finished() {
1344 if let Some(block_write_task) = Arc::into_inner(block_write_task) {
1345 // We are the last state with a reference to this task, so we can propagate any panics
1346 if let Err(thread_panic) = block_write_task.join() {
1347 std::panic::resume_unwind(thread_panic);
1348 }
1349 }
1350 } else {
1351 // It hasn't finished, so we need to put it back
1352 self.block_write_task = Some(block_write_task);
1353 }
1354 }
1355
1356 self.db.check_for_panics();
1357
1358 Poll::Ready(Ok(()))
1359 }
1360
1361 #[instrument(name = "read_state", skip(self, req))]
1362 fn call(&mut self, req: ReadRequest) -> Self::Future {
1363 req.count_metric();
1364 let timer = CodeTimer::start_desc(req.variant_name());
1365 let span = Span::current();
1366 let timed_span = TimedSpan::new(timer, span);
1367 let state = self.clone();
1368
1369 if req == ReadRequest::NonFinalizedBlocksListener {
1370 // The non-finalized blocks listener is used to notify the state service
1371 // about new blocks that have been added to the non-finalized state.
1372 let non_finalized_blocks_listener = NonFinalizedBlocksListener::spawn(
1373 self.network.clone(),
1374 self.non_finalized_state_receiver.clone(),
1375 );
1376
1377 return async move {
1378 Ok(ReadResponse::NonFinalizedBlocksListener(
1379 non_finalized_blocks_listener,
1380 ))
1381 }
1382 .boxed();
1383 };
1384
1385 let request_handler = move || match req {
1386 // Used by the `getblockchaininfo` RPC.
1387 ReadRequest::UsageInfo => Ok(ReadResponse::UsageInfo(state.db.size())),
1388
1389 // Used by the StateService.
1390 ReadRequest::Tip => Ok(ReadResponse::Tip(read::tip(
1391 state.latest_best_chain(),
1392 &state.db,
1393 ))),
1394
1395 // Used by `getblockchaininfo` RPC method.
1396 ReadRequest::TipPoolValues => {
1397 let (tip_height, tip_hash, value_balance) =
1398 read::tip_with_value_balance(state.latest_best_chain(), &state.db)?
1399 .ok_or(BoxError::from("no chain tip available yet"))?;
1400
1401 Ok(ReadResponse::TipPoolValues {
1402 tip_height,
1403 tip_hash,
1404 value_balance,
1405 })
1406 }
1407
1408 // Used by getblock
1409 ReadRequest::BlockInfo(hash_or_height) => Ok(ReadResponse::BlockInfo(
1410 read::block_info(state.latest_best_chain(), &state.db, hash_or_height),
1411 )),
1412
1413 // Used by the StateService.
1414 ReadRequest::Depth(hash) => Ok(ReadResponse::Depth(read::depth(
1415 state.latest_best_chain(),
1416 &state.db,
1417 hash,
1418 ))),
1419
1420 // Used by the StateService.
1421 ReadRequest::BestChainNextMedianTimePast => {
1422 Ok(ReadResponse::BestChainNextMedianTimePast(
1423 read::next_median_time_past(&state.latest_non_finalized_state(), &state.db)?,
1424 ))
1425 }
1426
1427 // Used by the get_block (raw) RPC and the StateService.
1428 ReadRequest::Block(hash_or_height) => Ok(ReadResponse::Block(read::block(
1429 state.latest_best_chain(),
1430 &state.db,
1431 hash_or_height,
1432 ))),
1433
1434 ReadRequest::AnyChainBlock(hash_or_height) => Ok(ReadResponse::Block(read::any_block(
1435 state.latest_non_finalized_state().chain_iter(),
1436 &state.db,
1437 hash_or_height,
1438 ))),
1439
1440 // Used by the get_block (raw) RPC and the StateService.
1441 ReadRequest::BlockAndSize(hash_or_height) => Ok(ReadResponse::BlockAndSize(
1442 read::block_and_size(state.latest_best_chain(), &state.db, hash_or_height),
1443 )),
1444
1445 // Used by the get_block (verbose) RPC and the StateService.
1446 ReadRequest::BlockHeader(hash_or_height) => {
1447 let best_chain = state.latest_best_chain();
1448
1449 let height = hash_or_height
1450 .height_or_else(|hash| {
1451 read::find::height_by_hash(best_chain.clone(), &state.db, hash)
1452 })
1453 .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1454
1455 let hash = hash_or_height
1456 .hash_or_else(|height| {
1457 read::find::hash_by_height(best_chain.clone(), &state.db, height)
1458 })
1459 .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1460
1461 let next_height = height.next()?;
1462 let next_block_hash =
1463 read::find::hash_by_height(best_chain.clone(), &state.db, next_height);
1464
1465 let header = read::block_header(best_chain, &state.db, height.into())
1466 .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1467
1468 Ok(ReadResponse::BlockHeader {
1469 header,
1470 hash,
1471 height,
1472 next_block_hash,
1473 })
1474 }
1475
1476 // For the get_raw_transaction RPC and the StateService.
1477 ReadRequest::Transaction(hash) => Ok(ReadResponse::Transaction(
1478 read::mined_transaction(state.latest_best_chain(), &state.db, hash),
1479 )),
1480
1481 ReadRequest::AnyChainTransaction(hash) => {
1482 Ok(ReadResponse::AnyChainTransaction(read::any_transaction(
1483 state.latest_non_finalized_state().chain_iter(),
1484 &state.db,
1485 hash,
1486 )))
1487 }
1488
1489 // Used by the getblock (verbose) RPC.
1490 ReadRequest::TransactionIdsForBlock(hash_or_height) => Ok(
1491 ReadResponse::TransactionIdsForBlock(read::transaction_hashes_for_block(
1492 state.latest_best_chain(),
1493 &state.db,
1494 hash_or_height,
1495 )),
1496 ),
1497
1498 ReadRequest::AnyChainTransactionIdsForBlock(hash_or_height) => {
1499 Ok(ReadResponse::AnyChainTransactionIdsForBlock(
1500 read::transaction_hashes_for_any_block(
1501 state.latest_non_finalized_state().chain_iter(),
1502 &state.db,
1503 hash_or_height,
1504 ),
1505 ))
1506 }
1507
1508 #[cfg(feature = "indexer")]
1509 ReadRequest::SpendingTransactionId(spend) => Ok(ReadResponse::TransactionId(
1510 read::spending_transaction_hash(state.latest_best_chain(), &state.db, spend),
1511 )),
1512
1513 ReadRequest::UnspentBestChainUtxo(outpoint) => Ok(ReadResponse::UnspentBestChainUtxo(
1514 read::unspent_utxo(state.latest_best_chain(), &state.db, outpoint),
1515 )),
1516
1517 // Manually used by the StateService to implement part of AwaitUtxo.
1518 ReadRequest::AnyChainUtxo(outpoint) => Ok(ReadResponse::AnyChainUtxo(read::any_utxo(
1519 state.latest_non_finalized_state(),
1520 &state.db,
1521 outpoint,
1522 ))),
1523
1524 // Used by the StateService.
1525 ReadRequest::BlockLocator => Ok(ReadResponse::BlockLocator(
1526 read::block_locator(state.latest_best_chain(), &state.db).unwrap_or_default(),
1527 )),
1528
1529 // Used by the StateService.
1530 ReadRequest::FindBlockHashes { known_blocks, stop } => {
1531 Ok(ReadResponse::BlockHashes(read::find_chain_hashes(
1532 state.latest_best_chain(),
1533 &state.db,
1534 known_blocks,
1535 stop,
1536 MAX_FIND_BLOCK_HASHES_RESULTS,
1537 )))
1538 }
1539
1540 // Used by the StateService.
1541 ReadRequest::FindBlockHeaders { known_blocks, stop } => Ok(ReadResponse::BlockHeaders(
1542 read::find_chain_headers(
1543 state.latest_best_chain(),
1544 &state.db,
1545 known_blocks,
1546 stop,
1547 MAX_FIND_BLOCK_HEADERS_RESULTS,
1548 )
1549 .into_iter()
1550 .map(|header| CountedHeader { header })
1551 .collect(),
1552 )),
1553
1554 ReadRequest::SaplingTree(hash_or_height) => Ok(ReadResponse::SaplingTree(
1555 read::sapling_tree(state.latest_best_chain(), &state.db, hash_or_height),
1556 )),
1557
1558 ReadRequest::OrchardTree(hash_or_height) => Ok(ReadResponse::OrchardTree(
1559 read::orchard_tree(state.latest_best_chain(), &state.db, hash_or_height),
1560 )),
1561
1562 ReadRequest::SaplingSubtrees { start_index, limit } => {
1563 let end_index = limit
1564 .and_then(|limit| start_index.0.checked_add(limit.0))
1565 .map(NoteCommitmentSubtreeIndex);
1566
1567 let best_chain = state.latest_best_chain();
1568 let sapling_subtrees = if let Some(end_index) = end_index {
1569 read::sapling_subtrees(best_chain, &state.db, start_index..end_index)
1570 } else {
1571 // If there is no end bound, just return all the trees.
1572 // If the end bound would overflow, just returns all the trees, because that's what
1573 // `zcashd` does. (It never calculates an end bound, so it just keeps iterating until
1574 // the trees run out.)
1575 read::sapling_subtrees(best_chain, &state.db, start_index..)
1576 };
1577
1578 Ok(ReadResponse::SaplingSubtrees(sapling_subtrees))
1579 }
1580
1581 ReadRequest::OrchardSubtrees { start_index, limit } => {
1582 let end_index = limit
1583 .and_then(|limit| start_index.0.checked_add(limit.0))
1584 .map(NoteCommitmentSubtreeIndex);
1585
1586 let best_chain = state.latest_best_chain();
1587 let orchard_subtrees = if let Some(end_index) = end_index {
1588 read::orchard_subtrees(best_chain, &state.db, start_index..end_index)
1589 } else {
1590 // If there is no end bound, just return all the trees.
1591 // If the end bound would overflow, just returns all the trees, because that's what
1592 // `zcashd` does. (It never calculates an end bound, so it just keeps iterating until
1593 // the trees run out.)
1594 read::orchard_subtrees(best_chain, &state.db, start_index..)
1595 };
1596
1597 Ok(ReadResponse::OrchardSubtrees(orchard_subtrees))
1598 }
1599
1600 // For the get_address_balance RPC.
1601 ReadRequest::AddressBalance(addresses) => {
1602 let (balance, received) =
1603 read::transparent_balance(state.latest_best_chain(), &state.db, addresses)?;
1604 Ok(ReadResponse::AddressBalance { balance, received })
1605 }
1606
1607 // For the get_address_tx_ids RPC.
1608 ReadRequest::TransactionIdsByAddresses {
1609 addresses,
1610 height_range,
1611 } => read::transparent_tx_ids(
1612 state.latest_best_chain(),
1613 &state.db,
1614 addresses,
1615 height_range,
1616 )
1617 .map(ReadResponse::AddressesTransactionIds),
1618
1619 // For the get_address_utxos RPC.
1620 ReadRequest::UtxosByAddresses(addresses) => read::address_utxos(
1621 &state.network,
1622 state.latest_best_chain(),
1623 &state.db,
1624 addresses,
1625 )
1626 .map(ReadResponse::AddressUtxos),
1627
1628 ReadRequest::CheckBestChainTipNullifiersAndAnchors(unmined_tx) => {
1629 let latest_non_finalized_best_chain = state.latest_best_chain();
1630
1631 check::nullifier::tx_no_duplicates_in_chain(
1632 &state.db,
1633 latest_non_finalized_best_chain.as_ref(),
1634 &unmined_tx.transaction,
1635 )?;
1636
1637 check::anchors::tx_anchors_refer_to_final_treestates(
1638 &state.db,
1639 latest_non_finalized_best_chain.as_ref(),
1640 &unmined_tx,
1641 )?;
1642
1643 Ok(ReadResponse::ValidBestChainTipNullifiersAndAnchors)
1644 }
1645
1646 // Used by the get_block and get_block_hash RPCs.
1647 ReadRequest::BestChainBlockHash(height) => Ok(ReadResponse::BlockHash(
1648 read::hash_by_height(state.latest_best_chain(), &state.db, height),
1649 )),
1650
1651 // Used by get_block_template and getblockchaininfo RPCs.
1652 ReadRequest::ChainInfo => {
1653 // # Correctness
1654 //
1655 // It is ok to do these lookups using multiple database calls. Finalized state updates
1656 // can only add overlapping blocks, and block hashes are unique across all chain forks.
1657 //
1658 // If there is a large overlap between the non-finalized and finalized states,
1659 // where the finalized tip is above the non-finalized tip,
1660 // Zebra is receiving a lot of blocks, or this request has been delayed for a long time.
1661 //
1662 // In that case, the `getblocktemplate` RPC will return an error because Zebra
1663 // is not synced to the tip. That check happens before the RPC makes this request.
1664 read::difficulty::get_block_template_chain_info(
1665 &state.latest_non_finalized_state(),
1666 &state.db,
1667 &state.network,
1668 )
1669 .map(ReadResponse::ChainInfo)
1670 }
1671
1672 // Used by getmininginfo, getnetworksolps, and getnetworkhashps RPCs.
1673 ReadRequest::SolutionRate { num_blocks, height } => {
1674 let latest_non_finalized_state = state.latest_non_finalized_state();
1675 // # Correctness
1676 //
1677 // It is ok to do these lookups using multiple database calls. Finalized state updates
1678 // can only add overlapping blocks, and block hashes are unique across all chain forks.
1679 //
1680 // The worst that can happen here is that the default `start_hash` will be below
1681 // the chain tip.
1682 let (tip_height, tip_hash) =
1683 match read::tip(latest_non_finalized_state.best_chain(), &state.db) {
1684 Some(tip_hash) => tip_hash,
1685 None => return Ok(ReadResponse::SolutionRate(None)),
1686 };
1687
1688 let start_hash = match height {
1689 Some(height) if height < tip_height => read::hash_by_height(
1690 latest_non_finalized_state.best_chain(),
1691 &state.db,
1692 height,
1693 ),
1694 // use the chain tip hash if height is above it or not provided.
1695 _ => Some(tip_hash),
1696 };
1697
1698 let solution_rate = start_hash.and_then(|start_hash| {
1699 read::difficulty::solution_rate(
1700 &latest_non_finalized_state,
1701 &state.db,
1702 num_blocks,
1703 start_hash,
1704 )
1705 });
1706
1707 Ok(ReadResponse::SolutionRate(solution_rate))
1708 }
1709
1710 ReadRequest::CheckBlockProposalValidity(semantically_verified) => {
1711 tracing::debug!(
1712 "attempting to validate and commit block proposal \
1713 onto a cloned non-finalized state"
1714 );
1715 let mut latest_non_finalized_state = state.latest_non_finalized_state();
1716
1717 // The previous block of a valid proposal must be on the best chain tip.
1718 let Some((_best_tip_height, best_tip_hash)) =
1719 read::best_tip(&latest_non_finalized_state, &state.db)
1720 else {
1721 return Err(
1722 "state is empty: wait for Zebra to sync before submitting a proposal"
1723 .into(),
1724 );
1725 };
1726
1727 if semantically_verified.block.header.previous_block_hash != best_tip_hash {
1728 return Err("proposal is not based on the current best chain tip: \
1729 previous block hash must be the best chain tip"
1730 .into());
1731 }
1732
1733 // This clone of the non-finalized state is dropped when this closure returns.
1734 // The non-finalized state that's used in the rest of the state (including finalizing
1735 // blocks into the db) is not mutated here.
1736 //
1737 // TODO: Convert `CommitSemanticallyVerifiedError` to a new `ValidateProposalError`?
1738 latest_non_finalized_state.disable_metrics();
1739
1740 write::validate_and_commit_non_finalized(
1741 &state.db,
1742 &mut latest_non_finalized_state,
1743 semantically_verified,
1744 )?;
1745
1746 Ok(ReadResponse::ValidBlockProposal)
1747 }
1748
1749 ReadRequest::TipBlockSize => {
1750 // Respond with the length of the obtained block if any.
1751 Ok(ReadResponse::TipBlockSize(
1752 state
1753 .best_tip()
1754 .and_then(|(tip_height, _)| {
1755 read::block_info(
1756 state.latest_best_chain(),
1757 &state.db,
1758 tip_height.into(),
1759 )
1760 })
1761 .map(|info| info.size().try_into().expect("u32 should fit in usize"))
1762 .or_else(|| {
1763 find::tip_block(state.latest_best_chain(), &state.db)
1764 .map(|b| b.zcash_serialized_size())
1765 }),
1766 ))
1767 }
1768
1769 ReadRequest::NonFinalizedBlocksListener => {
1770 unreachable!("should return early");
1771 }
1772
1773 // Used by `gettxout` RPC method.
1774 ReadRequest::IsTransparentOutputSpent(outpoint) => {
1775 let is_spent = read::unspent_utxo(state.latest_best_chain(), &state.db, outpoint);
1776 Ok(ReadResponse::IsTransparentOutputSpent(is_spent.is_none()))
1777 }
1778 };
1779
1780 timed_span.spawn_blocking(request_handler)
1781 }
1782}
1783
1784/// Initialize a state service from the provided [`Config`].
1785/// Returns a boxed state service, a read-only state service,
1786/// and receivers for state chain tip updates.
1787///
1788/// Each `network` has its own separate on-disk database.
1789///
1790/// The state uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
1791/// to work out when it is near the final checkpoint.
1792///
1793/// To share access to the state, wrap the returned service in a `Buffer`,
1794/// or clone the returned [`ReadStateService`].
1795///
1796/// It's possible to construct multiple state services in the same application (as
1797/// long as they, e.g., use different storage locations), but doing so is
1798/// probably not what you want.
1799pub async fn init(
1800 config: Config,
1801 network: &Network,
1802 max_checkpoint_height: block::Height,
1803 checkpoint_verify_concurrency_limit: usize,
1804) -> (
1805 BoxService<Request, Response, BoxError>,
1806 ReadStateService,
1807 LatestChainTip,
1808 ChainTipChange,
1809) {
1810 let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
1811 StateService::new(
1812 config,
1813 network,
1814 max_checkpoint_height,
1815 checkpoint_verify_concurrency_limit,
1816 )
1817 .await;
1818
1819 (
1820 BoxService::new(state_service),
1821 read_only_state_service,
1822 latest_chain_tip,
1823 chain_tip_change,
1824 )
1825}
1826
1827/// Initialize a read state service from the provided [`Config`].
1828/// Returns a read-only state service,
1829///
1830/// Each `network` has its own separate on-disk database.
1831///
1832/// To share access to the state, clone the returned [`ReadStateService`].
1833pub fn init_read_only(
1834 config: Config,
1835 network: &Network,
1836) -> (
1837 ReadStateService,
1838 ZebraDb,
1839 tokio::sync::watch::Sender<NonFinalizedState>,
1840) {
1841 let finalized_state = FinalizedState::new_with_debug(
1842 &config,
1843 network,
1844 true,
1845 #[cfg(feature = "elasticsearch")]
1846 false,
1847 true,
1848 );
1849 let (non_finalized_state_sender, non_finalized_state_receiver) =
1850 tokio::sync::watch::channel(NonFinalizedState::new(network));
1851
1852 (
1853 ReadStateService::new(
1854 &finalized_state,
1855 None,
1856 WatchReceiver::new(non_finalized_state_receiver),
1857 ),
1858 finalized_state.db.clone(),
1859 non_finalized_state_sender,
1860 )
1861}
1862
1863/// Calls [`init_read_only`] with the provided [`Config`] and [`Network`] from a blocking task.
1864/// Returns a [`tokio::task::JoinHandle`] with a read state service and chain tip sender.
1865pub fn spawn_init_read_only(
1866 config: Config,
1867 network: &Network,
1868) -> tokio::task::JoinHandle<(
1869 ReadStateService,
1870 ZebraDb,
1871 tokio::sync::watch::Sender<NonFinalizedState>,
1872)> {
1873 let network = network.clone();
1874 tokio::task::spawn_blocking(move || init_read_only(config, &network))
1875}
1876
/// Creates a [`StateService`] from an ephemeral [`Config`], boxes it,
/// and wraps it in a [`Buffer`] with a single slot.
///
/// Intended for building a state service in tests. See also [`init`].
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test(
    network: &Network,
) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
    //       if we ever need to test final checkpoint sent UTXO queries
    let ephemeral_config = Config::ephemeral();

    // Only the read-write service is kept; the other returned handles are discarded.
    let (read_write, ..) =
        StateService::new(ephemeral_config, network, block::Height::MAX, 0).await;

    let boxed = BoxService::new(read_write);
    Buffer::new(boxed, 1)
}
1891
/// Creates a state service from an ephemeral [`Config`] and returns every handle to it.
///
/// The returned tuple contains, in order: the read-write service (boxed and wrapped in a
/// [`Buffer`] with a single slot), the [`ReadStateService`], the [`LatestChainTip`],
/// and the [`ChainTipChange`].
///
/// Intended for building a state service in tests. See also [`init`].
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test_services(
    network: &Network,
) -> (
    Buffer<BoxService<Request, Response, BoxError>, Request>,
    ReadStateService,
    LatestChainTip,
    ChainTipChange,
) {
    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
    //       if we ever need to test final checkpoint sent UTXO queries
    let ephemeral_config = Config::ephemeral();
    let (read_write, read_only, tip, tip_change) =
        StateService::new(ephemeral_config, network, block::Height::MAX, 0).await;

    // Only the read-write service is boxed and buffered; the rest are returned unchanged.
    (
        Buffer::new(BoxService::new(read_write), 1),
        read_only,
        tip,
        tip_change,
    )
}