zebra_state/service.rs
1//! [`tower::Service`]s for Zebra's cached chain state.
2//!
3//! Zebra provides cached state access via two main services:
4//! - [`StateService`]: a read-write service that writes blocks to the state,
5//! and redirects most read requests to the [`ReadStateService`].
6//! - [`ReadStateService`]: a read-only service that answers from the most
7//! recent committed block.
8//!
9//! Most users should prefer [`ReadStateService`], unless they need to write blocks to the state.
10//!
11//! Zebra also provides access to the best chain tip via:
12//! - [`LatestChainTip`]: a read-only channel that contains the latest committed
13//! tip.
14//! - [`ChainTipChange`]: a read-only channel that can asynchronously await
15//! chain tip changes.
16
17use std::{
18 collections::HashMap,
19 future::Future,
20 pin::Pin,
21 sync::Arc,
22 task::{Context, Poll},
23 time::{Duration, Instant},
24};
25
26use futures::future::FutureExt;
27use tokio::sync::oneshot;
28use tower::{util::BoxService, Service, ServiceExt};
29use tracing::{instrument, Instrument, Span};
30
31#[cfg(any(test, feature = "proptest-impl"))]
32use tower::buffer::Buffer;
33
34use zebra_chain::{
35 block::{self, CountedHeader, HeightDiff},
36 diagnostic::CodeTimer,
37 parameters::{Network, NetworkUpgrade},
38 serialization::ZcashSerialize,
39 subtree::NoteCommitmentSubtreeIndex,
40};
41
42use crate::{
43 constants::{
44 MAX_FIND_BLOCK_HASHES_RESULTS, MAX_FIND_BLOCK_HEADERS_RESULTS, MAX_LEGACY_CHAIN_BLOCKS,
45 },
46 error::{CommitBlockError, CommitCheckpointVerifiedError, InvalidateError, ReconsiderError},
47 request::TimedSpan,
48 response::NonFinalizedBlocksListener,
49 service::{
50 block_iter::any_ancestor_blocks,
51 chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
52 finalized_state::{FinalizedState, ZebraDb},
53 non_finalized_state::{Chain, NonFinalizedState},
54 pending_utxos::PendingUtxos,
55 queued_blocks::QueuedBlocks,
56 read::find,
57 watch_receiver::WatchReceiver,
58 },
59 BoxError, CheckpointVerifiedBlock, CommitSemanticallyVerifiedError, Config, KnownBlock,
60 ReadRequest, ReadResponse, Request, Response, SemanticallyVerifiedBlock,
61};
62
63pub mod block_iter;
64pub mod chain_tip;
65pub mod watch_receiver;
66
67pub mod check;
68
69pub(crate) mod finalized_state;
70pub(crate) mod non_finalized_state;
71mod pending_utxos;
72mod queued_blocks;
73pub(crate) mod read;
74mod traits;
75mod write;
76
77#[cfg(any(test, feature = "proptest-impl"))]
78pub mod arbitrary;
79
80#[cfg(test)]
81mod tests;
82
83pub use finalized_state::{OutputLocation, TransactionIndex, TransactionLocation};
84use write::NonFinalizedWriteMessage;
85
86use self::queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified, SentHashes};
87
88pub use self::traits::{ReadState, State};
89
90/// A read-write service for Zebra's cached blockchain state.
91///
92/// This service modifies and provides access to:
93/// - the non-finalized state: the ~100 most recent blocks.
94/// Zebra allows chain forks in the non-finalized state,
95/// stores it in memory, and re-downloads it when restarted.
96/// - the finalized state: older blocks that have many confirmations.
97/// Zebra stores the single best chain in the finalized state,
98/// and re-loads it from disk when restarted.
99///
100/// Read requests to this service are buffered, then processed concurrently.
101/// Block write requests are buffered, then queued, then processed in order by a separate task.
102///
103/// Most state users can get faster read responses using the [`ReadStateService`],
104/// because its requests do not share a [`tower::buffer::Buffer`] with block write requests.
105///
106/// To quickly get the latest block, use [`LatestChainTip`] or [`ChainTipChange`].
107/// They can read the latest block directly, without queueing any requests.
108#[derive(Debug)]
109pub(crate) struct StateService {
110 // Configuration
111 //
112 /// The configured Zcash network.
113 network: Network,
114
115 /// The height that we start storing UTXOs from finalized blocks.
116 ///
117 /// This height should be lower than the last few checkpoints,
118 /// so the full verifier can verify UTXO spends from those blocks,
119 /// even if they haven't been committed to the finalized state yet.
120 full_verifier_utxo_lookahead: block::Height,
121
122 // Queued Blocks
123 //
124 /// Queued blocks for the [`NonFinalizedState`] that arrived out of order.
125 /// These blocks are awaiting their parent blocks before they can do contextual verification.
126 non_finalized_state_queued_blocks: QueuedBlocks,
127
128 /// Queued blocks for the [`FinalizedState`] that arrived out of order.
129 /// These blocks are awaiting their parent blocks before they can do contextual verification.
130 ///
131 /// Indexed by their parent block hash.
132 finalized_state_queued_blocks: HashMap<block::Hash, QueuedCheckpointVerified>,
133
134 /// Channels to send blocks to the block write task.
135 block_write_sender: write::BlockWriteSender,
136
137 /// The [`block::Hash`] of the most recent block sent on
138 /// `finalized_block_write_sender` or `non_finalized_block_write_sender`.
139 ///
140 /// On startup, this is:
141 /// - the finalized tip, if there are stored blocks, or
142 /// - the genesis block's parent hash, if the database is empty.
143 ///
144 /// If `invalid_block_write_reset_receiver` gets a reset, this is:
145 /// - the hash of the last valid committed block (the parent of the invalid block).
146 finalized_block_write_last_sent_hash: block::Hash,
147
148 /// A set of block hashes that have been sent to the block write task.
149 /// Hashes of blocks below the finalized tip height are periodically pruned.
150 non_finalized_block_write_sent_hashes: SentHashes,
151
152 /// If an invalid block is sent on `finalized_block_write_sender`
153 /// or `non_finalized_block_write_sender`,
154 /// this channel gets the [`block::Hash`] of the valid tip.
155 //
156 // TODO: add tests for finalized and non-finalized resets (#2654)
157 invalid_block_write_reset_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,
158
159 /// Receives the hash of every non-finalized block that the write task
160 /// rejected, so the corresponding entry can be removed from
161 /// `non_finalized_block_write_sent_hashes`.
162 ///
163 /// Without this, a rejected same-hash block locks out a later honest
164 /// re-delivery of a block at the same hash as a "duplicate" until restart
165 /// or reorg.
166 non_finalized_rejected_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,
167
168 // Pending UTXO Request Tracking
169 //
170 /// The set of outpoints with pending requests for their associated transparent::Output.
171 pending_utxos: PendingUtxos,
172
173 /// Instant tracking the last time `pending_utxos` was pruned.
174 last_prune: Instant,
175
176 // Updating Concurrently Readable State
177 //
178 /// A cloneable [`ReadStateService`], used to answer concurrent read requests.
179 ///
180 /// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`.
181 read_service: ReadStateService,
182
183 // Metrics
184 //
185 /// A metric tracking the maximum height that's currently in `finalized_state_queued_blocks`
186 ///
187 /// Set to `f64::NAN` if `finalized_state_queued_blocks` is empty, because grafana shows NaNs
188 /// as a break in the graph.
189 max_finalized_queue_height: f64,
190}
191
192/// A read-only service for accessing Zebra's cached blockchain state.
193///
194/// This service provides read-only access to:
195/// - the non-finalized state: the ~100 most recent blocks.
196/// - the finalized state: older blocks that have many confirmations.
197///
198/// Requests to this service are processed in parallel,
199/// ignoring any blocks queued by the read-write [`StateService`].
200///
201/// This quick response behavior is better for most state users.
202/// It allows other async tasks to make progress while concurrently reading data from disk.
203#[derive(Clone, Debug)]
204pub struct ReadStateService {
205 // Configuration
206 //
207 /// The configured Zcash network.
208 network: Network,
209
210 // Shared Concurrently Readable State
211 //
212 /// A watch channel with a cached copy of the [`NonFinalizedState`].
213 ///
214 /// This state is only updated between requests,
215 /// so it might include some block data that is also on `disk`.
216 non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
217
218 /// The shared inner on-disk database for the finalized state.
219 ///
220 /// RocksDB allows reads and writes via a shared reference,
221 /// but [`ZebraDb`] doesn't expose any write methods or types.
222 ///
223 /// This chain is updated concurrently with requests,
224 /// so it might include some block data that is also in `best_mem`.
225 db: ZebraDb,
226
227 /// A shared handle to a task that writes blocks to the [`NonFinalizedState`] or [`FinalizedState`],
228 /// once the queues have received all their parent blocks.
229 ///
230 /// Used to check for panics when writing blocks.
231 block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
232}
233
234impl Drop for StateService {
235 fn drop(&mut self) {
236 // The state service owns the state, tasks, and channels,
237 // so dropping it should shut down everything.
238
239 // Close the channels (non-blocking)
240 // This makes the block write thread exit the next time it checks the channels.
241 // We want to do this here so we get any errors or panics from the block write task before it shuts down.
242 self.invalid_block_write_reset_receiver.close();
243 self.non_finalized_rejected_receiver.close();
244
245 std::mem::drop(self.block_write_sender.finalized.take());
246 std::mem::drop(self.block_write_sender.non_finalized.take());
247
248 self.clear_finalized_block_queue(CommitBlockError::WriteTaskExited);
249 self.clear_non_finalized_block_queue(CommitBlockError::WriteTaskExited);
250
251 // Log database metrics before shutting down
252 info!("dropping the state: logging database metrics");
253 self.log_db_metrics();
254
255 // Then drop self.read_service, which checks the block write task for panics,
256 // and tries to shut down the database.
257 }
258}
259
260impl Drop for ReadStateService {
261 fn drop(&mut self) {
262 // The read state service shares the state,
263 // so dropping it should check if we can shut down.
264
265 // TODO: move this into a try_shutdown() method
266 if let Some(block_write_task) = self.block_write_task.take() {
267 if let Some(block_write_task_handle) = Arc::into_inner(block_write_task) {
268 // We're the last database user, so we can tell it to shut down (blocking):
269 // - flushes the database to disk, and
270 // - drops the database, which cleans up any database tasks correctly.
271 self.db.shutdown(true);
272
273 // We are the last state with a reference to this thread, so we can
274 // wait until the block write task finishes, then check for panics (blocking).
275 // (We'd also like to abort the thread, but std::thread::JoinHandle can't do that.)
276
277 // This log is verbose during tests.
278 #[cfg(not(test))]
279 info!("waiting for the block write task to finish");
280 #[cfg(test)]
281 debug!("waiting for the block write task to finish");
282
283 // TODO: move this into a check_for_panics() method
284 if let Err(thread_panic) = block_write_task_handle.join() {
285 std::panic::resume_unwind(thread_panic);
286 } else {
287 debug!("shutting down the state because the block write task has finished");
288 }
289 }
290 } else {
291 // Even if we're not the last database user, try shutting it down.
292 //
293 // TODO: rename this to try_shutdown()?
294 self.db.shutdown(false);
295 }
296 }
297}
298
299impl StateService {
300 const PRUNE_INTERVAL: Duration = Duration::from_secs(30);
301
302 /// Creates a new state service for the state `config` and `network`.
303 ///
304 /// Uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
305 /// to work out when it is near the final checkpoint.
306 ///
307 /// Returns the read-write and read-only state services,
308 /// and read-only watch channels for its best chain tip.
309 pub async fn new(
310 config: Config,
311 network: &Network,
312 max_checkpoint_height: block::Height,
313 checkpoint_verify_concurrency_limit: usize,
314 ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
315 let (finalized_state, finalized_tip, timer) = {
316 let config = config.clone();
317 let network = network.clone();
318 tokio::task::spawn_blocking(move || {
319 let timer = CodeTimer::start();
320 let finalized_state = FinalizedState::new(
321 &config,
322 &network,
323 #[cfg(feature = "elasticsearch")]
324 true,
325 );
326 timer.finish_desc("opening finalized state database");
327
328 let timer = CodeTimer::start();
329 let finalized_tip = finalized_state.db.tip_block();
330
331 (finalized_state, finalized_tip, timer)
332 })
333 .await
334 .expect("failed to join blocking task")
335 };
336
337 // # Correctness
338 //
339 // The state service must set the finalized block write sender to `None`
340 // if there are blocks in the restored non-finalized state that are above
341 // the max checkpoint height so that non-finalized blocks can be written, otherwise,
342 // Zebra will be unable to commit semantically verified blocks, and its chain sync will stall.
343 //
344 // The state service must not set the finalized block write sender to `None` if there
345 // aren't blocks in the restored non-finalized state that are above the max checkpoint height,
346 // otherwise, unless checkpoint sync is disabled in the zebra-consensus configuration,
347 // Zebra will be unable to commit checkpoint verified blocks, and its chain sync will stall.
348 let is_finalized_tip_past_max_checkpoint = if let Some(tip) = &finalized_tip {
349 tip.coinbase_height().expect("valid block must have height") >= max_checkpoint_height
350 } else {
351 false
352 };
353 let backup_dir_path = config.non_finalized_state_backup_dir(network);
354 let skip_backup_task = config.debug_skip_non_finalized_state_backup_task;
355 let (non_finalized_state, non_finalized_state_sender, non_finalized_state_receiver) =
356 NonFinalizedState::new(network)
357 .with_backup(
358 backup_dir_path.clone(),
359 &finalized_state.db,
360 is_finalized_tip_past_max_checkpoint,
361 config.debug_skip_non_finalized_state_backup_task,
362 )
363 .await;
364
365 let non_finalized_block_write_sent_hashes = SentHashes::new(&non_finalized_state);
366 let initial_tip = non_finalized_state
367 .best_tip_block()
368 .map(|cv_block| cv_block.block.clone())
369 .or(finalized_tip)
370 .map(CheckpointVerifiedBlock::from)
371 .map(ChainTipBlock::from);
372
373 tracing::info!(chain_tip = ?initial_tip.as_ref().map(|tip| (tip.hash, tip.height)), "loaded Zebra state cache");
374
375 let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
376 ChainTipSender::new(initial_tip, network);
377
378 let finalized_state_for_writing = finalized_state.clone();
379 let should_use_finalized_block_write_sender = non_finalized_state.is_chain_set_empty();
380 let sync_backup_dir_path = backup_dir_path.filter(|_| skip_backup_task);
381 let (
382 block_write_sender,
383 invalid_block_write_reset_receiver,
384 non_finalized_rejected_receiver,
385 block_write_task,
386 ) = write::BlockWriteSender::spawn(
387 finalized_state_for_writing,
388 non_finalized_state,
389 chain_tip_sender,
390 non_finalized_state_sender,
391 should_use_finalized_block_write_sender,
392 sync_backup_dir_path,
393 );
394
395 let read_service = ReadStateService::new(
396 &finalized_state,
397 block_write_task,
398 non_finalized_state_receiver,
399 );
400
401 let full_verifier_utxo_lookahead = max_checkpoint_height
402 - HeightDiff::try_from(checkpoint_verify_concurrency_limit)
403 .expect("fits in HeightDiff");
404 let full_verifier_utxo_lookahead =
405 full_verifier_utxo_lookahead.unwrap_or(block::Height::MIN);
406 let non_finalized_state_queued_blocks = QueuedBlocks::default();
407 let pending_utxos = PendingUtxos::default();
408
409 let finalized_block_write_last_sent_hash =
410 tokio::task::spawn_blocking(move || finalized_state.db.finalized_tip_hash())
411 .await
412 .expect("failed to join blocking task");
413
414 let state = Self {
415 network: network.clone(),
416 full_verifier_utxo_lookahead,
417 non_finalized_state_queued_blocks,
418 finalized_state_queued_blocks: HashMap::new(),
419 block_write_sender,
420 finalized_block_write_last_sent_hash,
421 non_finalized_block_write_sent_hashes,
422 invalid_block_write_reset_receiver,
423 non_finalized_rejected_receiver,
424 pending_utxos,
425 last_prune: Instant::now(),
426 read_service: read_service.clone(),
427 max_finalized_queue_height: f64::NAN,
428 };
429 timer.finish_desc("initializing state service");
430
431 tracing::info!("starting legacy chain check");
432 let timer = CodeTimer::start();
433
434 if let (Some(tip), Some(nu5_activation_height)) = (
435 {
436 let read_state = state.read_service.clone();
437 tokio::task::spawn_blocking(move || read_state.best_tip())
438 .await
439 .expect("task should not panic")
440 },
441 NetworkUpgrade::Nu5.activation_height(network),
442 ) {
443 if let Err(error) = check::legacy_chain(
444 nu5_activation_height,
445 any_ancestor_blocks(
446 &state.read_service.latest_non_finalized_state(),
447 &state.read_service.db,
448 tip.1,
449 ),
450 &state.network,
451 MAX_LEGACY_CHAIN_BLOCKS,
452 ) {
453 let legacy_db_path = state.read_service.db.path().to_path_buf();
454 panic!(
455 "Cached state contains a legacy chain.\n\
456 An outdated Zebra version did not know about a recent network upgrade,\n\
457 so it followed a legacy chain using outdated consensus branch rules.\n\
458 Hint: Delete your database, and restart Zebra to do a full sync.\n\
459 Database path: {legacy_db_path:?}\n\
460 Error: {error:?}",
461 );
462 }
463 }
464
465 tracing::info!("cached state consensus branch is valid: no legacy chain found");
466 timer.finish_desc("legacy chain check");
467
468 // Spawn a background task to periodically export RocksDB metrics to Prometheus
469 let db_for_metrics = read_service.db.clone();
470 tokio::spawn(async move {
471 let mut interval = tokio::time::interval(Duration::from_secs(30));
472 loop {
473 interval.tick().await;
474 db_for_metrics.export_metrics();
475 }
476 });
477
478 (state, read_service, latest_chain_tip, chain_tip_change)
479 }
480
481 /// Call read only state service to log rocksdb database metrics.
482 pub fn log_db_metrics(&self) {
483 self.read_service.db.print_db_metrics();
484 }
485
486 /// Queue a checkpoint verified block for verification and storage in the finalized state.
487 ///
488 /// Returns a channel receiver that provides the result of the block commit.
489 fn queue_and_commit_to_finalized_state(
490 &mut self,
491 checkpoint_verified: CheckpointVerifiedBlock,
492 ) -> oneshot::Receiver<Result<block::Hash, CommitCheckpointVerifiedError>> {
493 // # Correctness & Performance
494 //
495 // This method must not block, access the database, or perform CPU-intensive tasks,
496 // because it is called directly from the tokio executor's Future threads.
497
498 let queued_prev_hash = checkpoint_verified.block.header.previous_block_hash;
499 let queued_height = checkpoint_verified.height;
500
501 // If we're close to the final checkpoint, make the block's UTXOs available for
502 // semantic block verification, even when it is in the channel.
503 if self.is_close_to_final_checkpoint(queued_height) {
504 self.non_finalized_block_write_sent_hashes
505 .add_finalized(&checkpoint_verified)
506 }
507
508 let (rsp_tx, rsp_rx) = oneshot::channel();
509 let queued = (checkpoint_verified, rsp_tx);
510
511 if self.block_write_sender.finalized.is_some() {
512 // We're still committing checkpoint verified blocks
513 if let Some(duplicate_queued) = self
514 .finalized_state_queued_blocks
515 .insert(queued_prev_hash, queued)
516 {
517 Self::send_checkpoint_verified_block_error(
518 duplicate_queued,
519 CommitBlockError::new_duplicate(
520 Some(queued_prev_hash.into()),
521 KnownBlock::Queue,
522 ),
523 );
524 }
525
526 self.drain_finalized_queue_and_commit();
527 } else {
528 // We've finished committing checkpoint verified blocks to the finalized state,
529 // so drop any repeated queued blocks, and return an error.
530 //
531 // TODO: track the latest sent height, and drop any blocks under that height
532 // every time we send some blocks (like QueuedSemanticallyVerifiedBlocks)
533 Self::send_checkpoint_verified_block_error(
534 queued,
535 CommitBlockError::new_duplicate(None, KnownBlock::Finalized),
536 );
537
538 self.clear_finalized_block_queue(CommitBlockError::new_duplicate(
539 None,
540 KnownBlock::Finalized,
541 ));
542 }
543
544 if self.finalized_state_queued_blocks.is_empty() {
545 self.max_finalized_queue_height = f64::NAN;
546 } else if self.max_finalized_queue_height.is_nan()
547 || self.max_finalized_queue_height < queued_height.0 as f64
548 {
549 // if there are still blocks in the queue, then either:
550 // - the new block was lower than the old maximum, and there was a gap before it,
551 // so the maximum is still the same (and we skip this code), or
552 // - the new block is higher than the old maximum, and there is at least one gap
553 // between the finalized tip and the new maximum
554 self.max_finalized_queue_height = queued_height.0 as f64;
555 }
556
557 metrics::gauge!("state.checkpoint.queued.max.height").set(self.max_finalized_queue_height);
558 metrics::gauge!("state.checkpoint.queued.block.count")
559 .set(self.finalized_state_queued_blocks.len() as f64);
560
561 rsp_rx
562 }
563
564 /// Finds finalized state queue blocks to be committed to the state in order,
565 /// removes them from the queue, and sends them to the block commit task.
566 ///
567 /// After queueing a finalized block, this method checks whether the newly
568 /// queued block (and any of its descendants) can be committed to the state.
569 ///
570 /// Returns an error if the block commit channel has been closed.
571 pub fn drain_finalized_queue_and_commit(&mut self) {
572 use tokio::sync::mpsc::error::{SendError, TryRecvError};
573
574 // # Correctness & Performance
575 //
576 // This method must not block, access the database, or perform CPU-intensive tasks,
577 // because it is called directly from the tokio executor's Future threads.
578
579 // If a block failed, we need to start again from a valid tip.
580 match self.invalid_block_write_reset_receiver.try_recv() {
581 Ok(reset_tip_hash) => self.finalized_block_write_last_sent_hash = reset_tip_hash,
582 Err(TryRecvError::Disconnected) => {
583 info!("Block commit task closed the block reset channel. Is Zebra shutting down?");
584 return;
585 }
586 // There are no errors, so we can just use the last block hash we sent
587 Err(TryRecvError::Empty) => {}
588 }
589
590 while let Some(queued_block) = self
591 .finalized_state_queued_blocks
592 .remove(&self.finalized_block_write_last_sent_hash)
593 {
594 let last_sent_finalized_block_height = queued_block.0.height;
595
596 self.finalized_block_write_last_sent_hash = queued_block.0.hash;
597
598 // If we've finished sending finalized blocks, ignore any repeated blocks.
599 // (Blocks can be repeated after a syncer reset.)
600 if let Some(finalized_block_write_sender) = &self.block_write_sender.finalized {
601 let send_result = finalized_block_write_sender.send(queued_block);
602
603 // If the receiver is closed, we can't send any more blocks.
604 if let Err(SendError(queued)) = send_result {
605 // If Zebra is shutting down, drop blocks and return an error.
606 Self::send_checkpoint_verified_block_error(
607 queued,
608 CommitBlockError::WriteTaskExited,
609 );
610
611 self.clear_finalized_block_queue(CommitBlockError::WriteTaskExited);
612 } else {
613 metrics::gauge!("state.checkpoint.sent.block.height")
614 .set(last_sent_finalized_block_height.0 as f64);
615 };
616 }
617 }
618 }
619
620 /// Drains every hash queued on `non_finalized_rejected_receiver` and
621 /// removes it from `non_finalized_block_write_sent_hashes`.
622 ///
623 /// This closes the lockout window where a rejected block keeps its hash
624 /// recorded as "sent", so a subsequent honest re-delivery of a block at
625 /// the same hash is not short-circuited as a false "duplicate".
626 ///
627 /// # Correctness & Performance
628 ///
629 /// Like the other drain methods on `StateService`, this must not block,
630 /// access the database, or perform CPU-intensive work, because it is
631 /// called directly from the tokio executor's Future threads.
632 fn drain_non_finalized_rejected_hashes(&mut self) {
633 use tokio::sync::mpsc::error::TryRecvError;
634
635 loop {
636 match self.non_finalized_rejected_receiver.try_recv() {
637 Ok(hash) => {
638 self.non_finalized_block_write_sent_hashes.remove(&hash);
639 }
640 Err(TryRecvError::Empty) => break,
641 Err(TryRecvError::Disconnected) => {
642 info!(
643 "Block commit task closed the non-finalized rejected hash channel. \
644 Is Zebra shutting down?"
645 );
646 break;
647 }
648 }
649 }
650 }
651
652 /// Drops all finalized state queue blocks, and sends an error on their result channels.
653 fn clear_finalized_block_queue(
654 &mut self,
655 error: impl Into<CommitCheckpointVerifiedError> + Clone,
656 ) {
657 for (_hash, queued) in self.finalized_state_queued_blocks.drain() {
658 Self::send_checkpoint_verified_block_error(queued, error.clone());
659 }
660 }
661
662 /// Send an error on a `QueuedCheckpointVerified` block's result channel, and drop the block
663 fn send_checkpoint_verified_block_error(
664 queued: QueuedCheckpointVerified,
665 error: impl Into<CommitCheckpointVerifiedError>,
666 ) {
667 let (finalized, rsp_tx) = queued;
668
669 // The block sender might have already given up on this block,
670 // so ignore any channel send errors.
671 let _ = rsp_tx.send(Err(error.into()));
672 std::mem::drop(finalized);
673 }
674
675 /// Drops all non-finalized state queue blocks, and sends an error on their result channels.
676 fn clear_non_finalized_block_queue(
677 &mut self,
678 error: impl Into<CommitSemanticallyVerifiedError> + Clone,
679 ) {
680 for (_hash, queued) in self.non_finalized_state_queued_blocks.drain() {
681 Self::send_semantically_verified_block_error(queued, error.clone());
682 }
683 }
684
685 /// Send an error on a `QueuedSemanticallyVerified` block's result channel, and drop the block
686 fn send_semantically_verified_block_error(
687 queued: QueuedSemanticallyVerified,
688 error: impl Into<CommitSemanticallyVerifiedError>,
689 ) {
690 let (finalized, rsp_tx) = queued;
691
692 // The block sender might have already given up on this block,
693 // so ignore any channel send errors.
694 let _ = rsp_tx.send(Err(error.into()));
695 std::mem::drop(finalized);
696 }
697
698 /// Queue a semantically verified block for contextual verification and check if any queued
699 /// blocks are ready to be verified and committed to the state.
700 ///
701 /// This function encodes the logic for [committing non-finalized blocks][1]
702 /// in RFC0005.
703 ///
704 /// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks
705 #[instrument(level = "debug", skip(self, semantically_verified))]
706 fn queue_and_commit_to_non_finalized_state(
707 &mut self,
708 semantically_verified: SemanticallyVerifiedBlock,
709 ) -> oneshot::Receiver<Result<block::Hash, CommitSemanticallyVerifiedError>> {
710 tracing::debug!(block = %semantically_verified.block, "queueing block for contextual verification");
711 let parent_hash = semantically_verified.block.header.previous_block_hash;
712
713 // Drop hashes of any blocks the write task has rejected before checking
714 // the SentHashes membership below. Without this, a rejected same-hash
715 // block would lock out a later honest re-delivery of a block at the
716 // same hash as a false "duplicate".
717 self.drain_non_finalized_rejected_hashes();
718
719 if self
720 .non_finalized_block_write_sent_hashes
721 .contains(&semantically_verified.hash)
722 {
723 let (rsp_tx, rsp_rx) = oneshot::channel();
724 let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
725 Some(semantically_verified.hash.into()),
726 KnownBlock::WriteChannel,
727 )
728 .into()));
729 return rsp_rx;
730 }
731
732 if self
733 .read_service
734 .db
735 .contains_height(semantically_verified.height)
736 {
737 let (rsp_tx, rsp_rx) = oneshot::channel();
738 let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
739 Some(semantically_verified.height.into()),
740 KnownBlock::Finalized,
741 )
742 .into()));
743 return rsp_rx;
744 }
745
746 // [`Request::CommitSemanticallyVerifiedBlock`] contract: a request to commit a block which
747 // has been queued but not yet committed to the state fails the older request and replaces
748 // it with the newer request.
749 let rsp_rx = if let Some((_, old_rsp_tx)) = self
750 .non_finalized_state_queued_blocks
751 .get_mut(&semantically_verified.hash)
752 {
753 tracing::debug!("replacing older queued request with new request");
754 let (mut rsp_tx, rsp_rx) = oneshot::channel();
755 std::mem::swap(old_rsp_tx, &mut rsp_tx);
756 let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
757 Some(semantically_verified.hash.into()),
758 KnownBlock::Queue,
759 )
760 .into()));
761 rsp_rx
762 } else {
763 let (rsp_tx, rsp_rx) = oneshot::channel();
764 self.non_finalized_state_queued_blocks
765 .queue((semantically_verified, rsp_tx));
766 rsp_rx
767 };
768
769 // We've finished sending checkpoint verified blocks when:
770 // - we've sent the verified block for the last checkpoint, and
771 // - it has been successfully written to disk.
772 //
773 // We detect the last checkpoint by looking for non-finalized blocks
774 // that are a child of the last block we sent.
775 //
776 // TODO: configure the state with the last checkpoint hash instead?
777 if self.block_write_sender.finalized.is_some()
778 && self
779 .non_finalized_state_queued_blocks
780 .has_queued_children(self.finalized_block_write_last_sent_hash)
781 && self.read_service.db.finalized_tip_hash()
782 == self.finalized_block_write_last_sent_hash
783 {
784 // Tell the block write task to stop committing checkpoint verified blocks to the finalized state,
785 // and move on to committing semantically verified blocks to the non-finalized state.
786 std::mem::drop(self.block_write_sender.finalized.take());
787 // Remove any checkpoint-verified block hashes from `non_finalized_block_write_sent_hashes`.
788 self.non_finalized_block_write_sent_hashes = SentHashes::default();
789 // Mark `SentHashes` as usable by the `can_fork_chain_at()` method.
790 self.non_finalized_block_write_sent_hashes
791 .can_fork_chain_at_hashes = true;
792 // Send blocks from non-finalized queue
793 self.send_ready_non_finalized_queued(self.finalized_block_write_last_sent_hash);
794 // We've finished committing checkpoint verified blocks to finalized state, so drop any repeated queued blocks.
795 self.clear_finalized_block_queue(CommitBlockError::new_duplicate(
796 None,
797 KnownBlock::Finalized,
798 ));
799 } else if !self.can_fork_chain_at(&parent_hash) {
800 tracing::trace!("unready to verify, returning early");
801 } else if self.block_write_sender.finalized.is_none() {
802 // Wait until block commit task is ready to write non-finalized blocks before dequeuing them
803 self.send_ready_non_finalized_queued(parent_hash);
804
805 let finalized_tip_height = self.read_service.db.finalized_tip_height().expect(
806 "Finalized state must have at least one block before committing non-finalized state",
807 );
808
809 self.non_finalized_state_queued_blocks
810 .prune_by_height(finalized_tip_height);
811
812 self.non_finalized_block_write_sent_hashes
813 .prune_by_height(finalized_tip_height);
814 }
815
816 rsp_rx
817 }
818
819 /// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks.
820 fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
821 self.non_finalized_block_write_sent_hashes
822 .can_fork_chain_at(hash)
823 || &self.read_service.db.finalized_tip_hash() == hash
824 }
825
826 /// Returns `true` if `queued_height` is near the final checkpoint.
827 ///
828 /// The semantic block verifier needs access to UTXOs from checkpoint verified blocks
829 /// near the final checkpoint, so that it can verify blocks that spend those UTXOs.
830 ///
831 /// If it doesn't have the required UTXOs, some blocks will time out,
832 /// but succeed after a syncer restart.
833 fn is_close_to_final_checkpoint(&self, queued_height: block::Height) -> bool {
834 queued_height >= self.full_verifier_utxo_lookahead
835 }
836
837 /// Sends all queued blocks whose parents have recently arrived starting from `new_parent`
838 /// in breadth-first ordering to the block write task which will attempt to validate and commit them
839 #[tracing::instrument(level = "debug", skip(self, new_parent))]
840 fn send_ready_non_finalized_queued(&mut self, new_parent: block::Hash) {
841 use tokio::sync::mpsc::error::SendError;
842 if let Some(non_finalized_block_write_sender) = &self.block_write_sender.non_finalized {
843 let mut new_parents: Vec<block::Hash> = vec![new_parent];
844
845 while let Some(parent_hash) = new_parents.pop() {
846 let queued_children = self
847 .non_finalized_state_queued_blocks
848 .dequeue_children(parent_hash);
849
850 for queued_child in queued_children {
851 let (SemanticallyVerifiedBlock { hash, .. }, _) = queued_child;
852
853 self.non_finalized_block_write_sent_hashes
854 .add(&queued_child.0);
855 let send_result = non_finalized_block_write_sender.send(queued_child.into());
856
857 if let Err(SendError(NonFinalizedWriteMessage::Commit(queued))) = send_result {
858 // If Zebra is shutting down, drop blocks and return an error.
859 Self::send_semantically_verified_block_error(
860 queued,
861 CommitBlockError::WriteTaskExited,
862 );
863
864 self.clear_non_finalized_block_queue(CommitBlockError::WriteTaskExited);
865
866 return;
867 };
868
869 new_parents.push(hash);
870 }
871 }
872
873 self.non_finalized_block_write_sent_hashes.finish_batch();
874 };
875 }
876
877 /// Return the tip of the current best chain.
878 pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
879 self.read_service.best_tip()
880 }
881
882 fn send_invalidate_block(
883 &self,
884 hash: block::Hash,
885 ) -> oneshot::Receiver<Result<block::Hash, InvalidateError>> {
886 let (rsp_tx, rsp_rx) = oneshot::channel();
887
888 let Some(sender) = &self.block_write_sender.non_finalized else {
889 let _ = rsp_tx.send(Err(InvalidateError::ProcessingCheckpointedBlocks));
890 return rsp_rx;
891 };
892
893 if let Err(tokio::sync::mpsc::error::SendError(error)) =
894 sender.send(NonFinalizedWriteMessage::Invalidate { hash, rsp_tx })
895 {
896 let NonFinalizedWriteMessage::Invalidate { rsp_tx, .. } = error else {
897 unreachable!("should return the same Invalidate message could not be sent");
898 };
899
900 let _ = rsp_tx.send(Err(InvalidateError::SendInvalidateRequestFailed));
901 }
902
903 rsp_rx
904 }
905
906 fn send_reconsider_block(
907 &self,
908 hash: block::Hash,
909 ) -> oneshot::Receiver<Result<Vec<block::Hash>, ReconsiderError>> {
910 let (rsp_tx, rsp_rx) = oneshot::channel();
911
912 let Some(sender) = &self.block_write_sender.non_finalized else {
913 let _ = rsp_tx.send(Err(ReconsiderError::CheckpointCommitInProgress));
914 return rsp_rx;
915 };
916
917 if let Err(tokio::sync::mpsc::error::SendError(error)) =
918 sender.send(NonFinalizedWriteMessage::Reconsider { hash, rsp_tx })
919 {
920 let NonFinalizedWriteMessage::Reconsider { rsp_tx, .. } = error else {
921 unreachable!("should return the same Reconsider message could not be sent");
922 };
923
924 let _ = rsp_tx.send(Err(ReconsiderError::ReconsiderSendFailed));
925 }
926
927 rsp_rx
928 }
929
930 /// Assert some assumptions about the semantically verified `block` before it is queued.
931 fn assert_block_can_be_validated(&self, block: &SemanticallyVerifiedBlock) {
932 // required by `Request::CommitSemanticallyVerifiedBlock` call
933 assert!(
934 block.height > self.network.mandatory_checkpoint_height(),
935 "invalid semantically verified block height: the canopy checkpoint is mandatory, pre-canopy \
936 blocks, and the canopy activation block, must be committed to the state as finalized \
937 blocks"
938 );
939 }
940
941 fn known_sent_hash(&self, hash: &block::Hash) -> Option<KnownBlock> {
942 self.non_finalized_block_write_sent_hashes
943 .contains(hash)
944 .then_some(KnownBlock::WriteChannel)
945 }
946}
947
948impl ReadStateService {
949 /// Creates a new read-only state service, using the provided finalized state and
950 /// block write task handle.
951 ///
952 /// Returns the newly created service,
953 /// and a watch channel for updating the shared recent non-finalized chain.
954 pub(crate) fn new(
955 finalized_state: &FinalizedState,
956 block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
957 non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
958 ) -> Self {
959 let read_service = Self {
960 network: finalized_state.network(),
961 db: finalized_state.db.clone(),
962 non_finalized_state_receiver,
963 block_write_task,
964 };
965
966 tracing::debug!("created new read-only state service");
967
968 read_service
969 }
970
971 /// Return the tip of the current best chain.
972 pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
973 read::best_tip(&self.latest_non_finalized_state(), &self.db)
974 }
975
976 /// Gets a clone of the latest non-finalized state from the `non_finalized_state_receiver`
977 fn latest_non_finalized_state(&self) -> NonFinalizedState {
978 self.non_finalized_state_receiver.cloned_watch_data()
979 }
980
981 /// Gets a clone of the latest, best non-finalized chain from the `non_finalized_state_receiver`
982 fn latest_best_chain(&self) -> Option<Arc<Chain>> {
983 self.non_finalized_state_receiver
984 .borrow_mapped(|non_finalized_state| non_finalized_state.best_chain().cloned())
985 }
986
987 /// Test-only access to the inner database.
988 /// Can be used to modify the database without doing any consensus checks.
989 #[cfg(any(test, feature = "proptest-impl"))]
990 pub fn db(&self) -> &ZebraDb {
991 &self.db
992 }
993
994 /// Logs rocksdb metrics using the read only state service.
995 pub fn log_db_metrics(&self) {
996 self.db.print_db_metrics();
997 }
998}
999
1000impl Service<Request> for StateService {
1001 type Response = Response;
1002 type Error = BoxError;
1003 type Future =
1004 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
1005
1006 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1007 // Check for panics in the block write task
1008 let poll = self.read_service.poll_ready(cx);
1009
1010 // Prune outdated UTXO requests
1011 let now = Instant::now();
1012
1013 if self.last_prune + Self::PRUNE_INTERVAL < now {
1014 let tip = self.best_tip();
1015 let old_len = self.pending_utxos.len();
1016
1017 self.pending_utxos.prune();
1018 self.last_prune = now;
1019
1020 let new_len = self.pending_utxos.len();
1021 let prune_count = old_len
1022 .checked_sub(new_len)
1023 .expect("prune does not add any utxo requests");
1024 if prune_count > 0 {
1025 tracing::debug!(
1026 ?old_len,
1027 ?new_len,
1028 ?prune_count,
1029 ?tip,
1030 "pruned utxo requests"
1031 );
1032 } else {
1033 tracing::debug!(len = ?old_len, ?tip, "no utxo requests needed pruning");
1034 }
1035 }
1036
1037 poll
1038 }
1039
1040 #[instrument(name = "state", skip(self, req))]
1041 fn call(&mut self, req: Request) -> Self::Future {
1042 req.count_metric();
1043 let span = Span::current();
1044
1045 match req {
1046 // Uses non_finalized_state_queued_blocks and pending_utxos in the StateService
1047 // Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb.
1048 //
1049 // The expected error type for this request is `CommitSemanticallyVerifiedError`.
1050 Request::CommitSemanticallyVerifiedBlock(semantically_verified) => {
1051 let timer = CodeTimer::start();
1052 self.assert_block_can_be_validated(&semantically_verified);
1053
1054 self.pending_utxos
1055 .check_against_ordered(&semantically_verified.new_outputs);
1056
1057 // # Performance
1058 //
1059 // Allow other async tasks to make progress while blocks are being verified
1060 // and written to disk. But wait for the blocks to finish committing,
1061 // so that `StateService` multi-block queries always observe a consistent state.
1062 //
1063 // Since each block is spawned into its own task,
1064 // there shouldn't be any other code running in the same task,
1065 // so we don't need to worry about blocking it:
1066 // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
1067
1068 let rsp_rx = tokio::task::block_in_place(move || {
1069 span.in_scope(|| {
1070 self.queue_and_commit_to_non_finalized_state(semantically_verified)
1071 })
1072 });
1073
1074 // TODO:
1075 // - check for panics in the block write task here,
1076 // as well as in poll_ready()
1077
1078 // The work is all done, the future just waits on a channel for the result
1079 timer.finish_desc("CommitSemanticallyVerifiedBlock");
1080
1081 // Await the channel response, flatten the result, map receive errors to
1082 // `CommitSemanticallyVerifiedError::WriteTaskExited`.
1083 // Then flatten the nested Result and convert any errors to a BoxError.
1084 let span = Span::current();
1085 async move {
1086 rsp_rx
1087 .await
1088 .map_err(|_recv_error| CommitBlockError::WriteTaskExited.into())
1089 .and_then(|result| result)
1090 .map_err(BoxError::from)
1091 .map(Response::Committed)
1092 }
1093 .instrument(span)
1094 .boxed()
1095 }
1096
1097 // Uses finalized_state_queued_blocks and pending_utxos in the StateService.
1098 // Accesses shared writeable state in the StateService.
1099 //
1100 // The expected error type for this request is `CommitCheckpointVerifiedError`.
1101 Request::CommitCheckpointVerifiedBlock(finalized) => {
1102 let timer = CodeTimer::start();
1103 // # Consensus
1104 //
1105 // A semantic block verification could have called AwaitUtxo
1106 // before this checkpoint verified block arrived in the state.
1107 // So we need to check for pending UTXO requests sent by running
1108 // semantic block verifications.
1109 //
1110 // This check is redundant for most checkpoint verified blocks,
1111 // because semantic verification can only succeed near the final
1112 // checkpoint, when all the UTXOs are available for the verifying block.
1113 //
1114 // (Checkpoint block UTXOs are verified using block hash checkpoints
1115 // and transaction merkle tree block header commitments.)
1116 self.pending_utxos
1117 .check_against_ordered(&finalized.new_outputs);
1118
1119 // # Performance
1120 //
1121 // This method doesn't block, access the database, or perform CPU-intensive tasks,
1122 // so we can run it directly in the tokio executor's Future threads.
1123 let rsp_rx = self.queue_and_commit_to_finalized_state(finalized);
1124
1125 // TODO:
1126 // - check for panics in the block write task here,
1127 // as well as in poll_ready()
1128
1129 // The work is all done, the future just waits on a channel for the result
1130 timer.finish_desc("CommitCheckpointVerifiedBlock");
1131
1132 // Await the channel response, flatten the result, map receive errors to
1133 // `CommitCheckpointVerifiedError::WriteTaskExited`.
1134 // Then flatten the nested Result and convert any errors to a BoxError.
1135 async move {
1136 rsp_rx
1137 .await
1138 .map_err(|_recv_error| CommitBlockError::WriteTaskExited.into())
1139 .and_then(|result| result)
1140 .map_err(BoxError::from)
1141 .map(Response::Committed)
1142 }
1143 .instrument(span)
1144 .boxed()
1145 }
1146
1147 // Uses pending_utxos and non_finalized_state_queued_blocks in the StateService.
1148 // If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService.
1149 Request::AwaitUtxo(outpoint) => {
1150 let timer = CodeTimer::start();
1151 // Prepare the AwaitUtxo future from PendingUxtos.
1152 let response_fut = self.pending_utxos.queue(outpoint);
1153 // Only instrument `response_fut`, the ReadStateService already
1154 // instruments its requests with the same span.
1155
1156 let response_fut = response_fut.instrument(span).boxed();
1157
1158 // Check the non-finalized block queue outside the returned future,
1159 // so we can access mutable state fields.
1160 if let Some(utxo) = self.non_finalized_state_queued_blocks.utxo(&outpoint) {
1161 self.pending_utxos.respond(&outpoint, utxo);
1162
1163 // We're finished, the returned future gets the UTXO from the respond() channel.
1164 timer.finish_desc("AwaitUtxo/queued-non-finalized");
1165
1166 return response_fut;
1167 }
1168
1169 // Check the sent non-finalized blocks
1170 if let Some(utxo) = self.non_finalized_block_write_sent_hashes.utxo(&outpoint) {
1171 self.pending_utxos.respond(&outpoint, utxo);
1172
1173 // We're finished, the returned future gets the UTXO from the respond() channel.
1174 timer.finish_desc("AwaitUtxo/sent-non-finalized");
1175
1176 return response_fut;
1177 }
1178
1179 // We ignore any UTXOs in FinalizedState.finalized_state_queued_blocks,
1180 // because it is only used during checkpoint verification.
1181 //
1182 // This creates a rare race condition, but it doesn't seem to happen much in practice.
1183 // See #5126 for details.
1184
1185 // Manually send a request to the ReadStateService,
1186 // to get UTXOs from any non-finalized chain or the finalized chain.
1187 let read_service = self.read_service.clone();
1188
1189 // Run the request in an async block, so we can await the response.
1190 async move {
1191 let req = ReadRequest::AnyChainUtxo(outpoint);
1192
1193 let rsp = read_service.oneshot(req).await?;
1194
1195 // Optional TODO:
1196 // - make pending_utxos.respond() async using a channel,
1197 // so we can respond to all waiting requests here
1198 //
1199 // This change is not required for correctness, because:
1200 // - any waiting requests should have returned when the block was sent to the state
1201 // - otherwise, the request returns immediately if:
1202 // - the block is in the non-finalized queue, or
1203 // - the block is in any non-finalized chain or the finalized state
1204 //
1205 // And if the block is in the finalized queue,
1206 // that's rare enough that a retry is ok.
1207 if let ReadResponse::AnyChainUtxo(Some(utxo)) = rsp {
1208 // We got a UTXO, so we replace the response future with the result own.
1209 timer.finish_desc("AwaitUtxo/any-chain");
1210
1211 return Ok(Response::Utxo(utxo));
1212 }
1213
1214 // We're finished, but the returned future is waiting on the respond() channel.
1215 timer.finish_desc("AwaitUtxo/waiting");
1216
1217 response_fut.await
1218 }
1219 .boxed()
1220 }
1221
1222 // Used by sync, inbound, and block verifier to check if a block is already in the state
1223 // before downloading or validating it.
1224 Request::KnownBlock(hash) => {
1225 let timer = CodeTimer::start();
1226 let sent_hash_response = self.known_sent_hash(&hash);
1227 let read_service = self.read_service.clone();
1228
1229 async move {
1230 if sent_hash_response.is_some() {
1231 return Ok(Response::KnownBlock(sent_hash_response));
1232 };
1233
1234 let response = read::non_finalized_state_contains_block_hash(
1235 &read_service.latest_non_finalized_state(),
1236 hash,
1237 )
1238 // TODO: Move this to a blocking task, perhaps by moving some of this logic to the ReadStateService.
1239 .or_else(|| read::finalized_state_contains_block_hash(&read_service.db, hash));
1240
1241 timer.finish_desc("Request::KnownBlock");
1242
1243 Ok(Response::KnownBlock(response))
1244 }
1245 .boxed()
1246 }
1247
1248 // The expected error type for this request is `InvalidateError`
1249 Request::InvalidateBlock(block_hash) => {
1250 let rsp_rx = tokio::task::block_in_place(move || {
1251 span.in_scope(|| self.send_invalidate_block(block_hash))
1252 });
1253
1254 // Await the channel response, flatten the result, map receive errors to
1255 // `InvalidateError::InvalidateRequestDropped`.
1256 // Then flatten the nested Result and convert any errors to a BoxError.
1257 let span = Span::current();
1258 async move {
1259 rsp_rx
1260 .await
1261 .map_err(|_recv_error| InvalidateError::InvalidateRequestDropped)
1262 .and_then(|result| result)
1263 .map_err(BoxError::from)
1264 .map(Response::Invalidated)
1265 }
1266 .instrument(span)
1267 .boxed()
1268 }
1269
1270 // The expected error type for this request is `ReconsiderError`
1271 Request::ReconsiderBlock(block_hash) => {
1272 let rsp_rx = tokio::task::block_in_place(move || {
1273 span.in_scope(|| self.send_reconsider_block(block_hash))
1274 });
1275
1276 // Await the channel response, flatten the result, map receive errors to
1277 // `ReconsiderError::ReconsiderResponseDropped`.
1278 // Then flatten the nested Result and convert any errors to a BoxError.
1279 let span = Span::current();
1280 async move {
1281 rsp_rx
1282 .await
1283 .map_err(|_recv_error| ReconsiderError::ReconsiderResponseDropped)
1284 .and_then(|result| result)
1285 .map_err(BoxError::from)
1286 .map(Response::Reconsidered)
1287 }
1288 .instrument(span)
1289 .boxed()
1290 }
1291
1292 // Runs concurrently using the ReadStateService
1293 Request::Tip
1294 | Request::Depth(_)
1295 | Request::BestChainNextMedianTimePast
1296 | Request::BestChainBlockHash(_)
1297 | Request::BlockLocator
1298 | Request::Transaction(_)
1299 | Request::AnyChainTransaction(_)
1300 | Request::UnspentBestChainUtxo(_)
1301 | Request::Block(_)
1302 | Request::AnyChainBlock(_)
1303 | Request::BlockAndSize(_)
1304 | Request::BlockHeader(_)
1305 | Request::FindBlockHashes { .. }
1306 | Request::FindBlockHeaders { .. }
1307 | Request::CheckBestChainTipNullifiersAndAnchors(_)
1308 | Request::CheckBlockProposalValidity(_) => {
1309 // Redirect the request to the concurrent ReadStateService
1310 let read_service = self.read_service.clone();
1311
1312 async move {
1313 let req = req
1314 .try_into()
1315 .expect("ReadRequest conversion should not fail");
1316
1317 let rsp = read_service.oneshot(req).await?;
1318 let rsp = rsp.try_into().expect("Response conversion should not fail");
1319
1320 Ok(rsp)
1321 }
1322 .boxed()
1323 }
1324 }
1325 }
1326}
1327
1328impl Service<ReadRequest> for ReadStateService {
1329 type Response = ReadResponse;
1330 type Error = BoxError;
1331 type Future =
1332 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
1333
1334 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1335 // Check for panics in the block write task
1336 //
1337 // TODO: move into a check_for_panics() method
1338 let block_write_task = self.block_write_task.take();
1339
1340 if let Some(block_write_task) = block_write_task {
1341 if block_write_task.is_finished() {
1342 if let Some(block_write_task) = Arc::into_inner(block_write_task) {
1343 // We are the last state with a reference to this task, so we can propagate any panics
1344 if let Err(thread_panic) = block_write_task.join() {
1345 std::panic::resume_unwind(thread_panic);
1346 }
1347 }
1348 } else {
1349 // It hasn't finished, so we need to put it back
1350 self.block_write_task = Some(block_write_task);
1351 }
1352 }
1353
1354 self.db.check_for_panics();
1355
1356 Poll::Ready(Ok(()))
1357 }
1358
1359 #[instrument(name = "read_state", skip(self, req))]
1360 fn call(&mut self, req: ReadRequest) -> Self::Future {
1361 req.count_metric();
1362 let timer = CodeTimer::start_desc(req.variant_name());
1363 let span = Span::current();
1364 let timed_span = TimedSpan::new(timer, span);
1365 let state = self.clone();
1366
1367 if req == ReadRequest::NonFinalizedBlocksListener {
1368 // The non-finalized blocks listener is used to notify the state service
1369 // about new blocks that have been added to the non-finalized state.
1370 let non_finalized_blocks_listener = NonFinalizedBlocksListener::spawn(
1371 self.network.clone(),
1372 self.non_finalized_state_receiver.clone(),
1373 );
1374
1375 return async move {
1376 Ok(ReadResponse::NonFinalizedBlocksListener(
1377 non_finalized_blocks_listener,
1378 ))
1379 }
1380 .boxed();
1381 };
1382
1383 let request_handler = move || match req {
1384 // Used by the `getblockchaininfo` RPC.
1385 ReadRequest::UsageInfo => Ok(ReadResponse::UsageInfo(state.db.size())),
1386
1387 // Used by the StateService.
1388 ReadRequest::Tip => Ok(ReadResponse::Tip(read::tip(
1389 state.latest_best_chain(),
1390 &state.db,
1391 ))),
1392
1393 // Used by `getblockchaininfo` RPC method.
1394 ReadRequest::TipPoolValues => {
1395 let (tip_height, tip_hash, value_balance) =
1396 read::tip_with_value_balance(state.latest_best_chain(), &state.db)?
1397 .ok_or(BoxError::from("no chain tip available yet"))?;
1398
1399 Ok(ReadResponse::TipPoolValues {
1400 tip_height,
1401 tip_hash,
1402 value_balance,
1403 })
1404 }
1405
1406 // Used by getblock
1407 ReadRequest::BlockInfo(hash_or_height) => Ok(ReadResponse::BlockInfo(
1408 read::block_info(state.latest_best_chain(), &state.db, hash_or_height),
1409 )),
1410
1411 // Used by the StateService.
1412 ReadRequest::Depth(hash) => Ok(ReadResponse::Depth(read::depth(
1413 state.latest_best_chain(),
1414 &state.db,
1415 hash,
1416 ))),
1417
1418 // Used by the StateService.
1419 ReadRequest::BestChainNextMedianTimePast => {
1420 Ok(ReadResponse::BestChainNextMedianTimePast(
1421 read::next_median_time_past(&state.latest_non_finalized_state(), &state.db)?,
1422 ))
1423 }
1424
1425 // Used by the get_block (raw) RPC and the StateService.
1426 ReadRequest::Block(hash_or_height) => Ok(ReadResponse::Block(read::block(
1427 state.latest_best_chain(),
1428 &state.db,
1429 hash_or_height,
1430 ))),
1431
1432 ReadRequest::AnyChainBlock(hash_or_height) => Ok(ReadResponse::Block(read::any_block(
1433 state.latest_non_finalized_state().chain_iter(),
1434 &state.db,
1435 hash_or_height,
1436 ))),
1437
1438 // Used by the get_block (raw) RPC and the StateService.
1439 ReadRequest::BlockAndSize(hash_or_height) => Ok(ReadResponse::BlockAndSize(
1440 read::block_and_size(state.latest_best_chain(), &state.db, hash_or_height),
1441 )),
1442
1443 // Used by the get_block (verbose) RPC and the StateService.
1444 ReadRequest::BlockHeader(hash_or_height) => {
1445 let best_chain = state.latest_best_chain();
1446
1447 let height = hash_or_height
1448 .height_or_else(|hash| {
1449 read::find::height_by_hash(best_chain.clone(), &state.db, hash)
1450 })
1451 .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1452
1453 let hash = hash_or_height
1454 .hash_or_else(|height| {
1455 read::find::hash_by_height(best_chain.clone(), &state.db, height)
1456 })
1457 .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1458
1459 let next_height = height.next()?;
1460 let next_block_hash =
1461 read::find::hash_by_height(best_chain.clone(), &state.db, next_height);
1462
1463 let header = read::block_header(best_chain, &state.db, height.into())
1464 .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1465
1466 Ok(ReadResponse::BlockHeader {
1467 header,
1468 hash,
1469 height,
1470 next_block_hash,
1471 })
1472 }
1473
1474 // For the get_raw_transaction RPC and the StateService.
1475 ReadRequest::Transaction(hash) => Ok(ReadResponse::Transaction(
1476 read::mined_transaction(state.latest_best_chain(), &state.db, hash),
1477 )),
1478
1479 ReadRequest::AnyChainTransaction(hash) => {
1480 Ok(ReadResponse::AnyChainTransaction(read::any_transaction(
1481 state.latest_non_finalized_state().chain_iter(),
1482 &state.db,
1483 hash,
1484 )))
1485 }
1486
1487 // Used by the getblock (verbose) RPC.
1488 ReadRequest::TransactionIdsForBlock(hash_or_height) => Ok(
1489 ReadResponse::TransactionIdsForBlock(read::transaction_hashes_for_block(
1490 state.latest_best_chain(),
1491 &state.db,
1492 hash_or_height,
1493 )),
1494 ),
1495
1496 ReadRequest::AnyChainTransactionIdsForBlock(hash_or_height) => {
1497 Ok(ReadResponse::AnyChainTransactionIdsForBlock(
1498 read::transaction_hashes_for_any_block(
1499 state.latest_non_finalized_state().chain_iter(),
1500 &state.db,
1501 hash_or_height,
1502 ),
1503 ))
1504 }
1505
1506 #[cfg(feature = "indexer")]
1507 ReadRequest::SpendingTransactionId(spend) => Ok(ReadResponse::TransactionId(
1508 read::spending_transaction_hash(state.latest_best_chain(), &state.db, spend),
1509 )),
1510
1511 ReadRequest::UnspentBestChainUtxo(outpoint) => Ok(ReadResponse::UnspentBestChainUtxo(
1512 read::unspent_utxo(state.latest_best_chain(), &state.db, outpoint),
1513 )),
1514
1515 // Manually used by the StateService to implement part of AwaitUtxo.
1516 ReadRequest::AnyChainUtxo(outpoint) => Ok(ReadResponse::AnyChainUtxo(read::any_utxo(
1517 state.latest_non_finalized_state(),
1518 &state.db,
1519 outpoint,
1520 ))),
1521
1522 // Used by the StateService.
1523 ReadRequest::BlockLocator => Ok(ReadResponse::BlockLocator(
1524 read::block_locator(state.latest_best_chain(), &state.db).unwrap_or_default(),
1525 )),
1526
1527 // Used by the StateService.
1528 ReadRequest::FindBlockHashes { known_blocks, stop } => {
1529 Ok(ReadResponse::BlockHashes(read::find_chain_hashes(
1530 state.latest_best_chain(),
1531 &state.db,
1532 known_blocks,
1533 stop,
1534 MAX_FIND_BLOCK_HASHES_RESULTS,
1535 )))
1536 }
1537
1538 // Used by the StateService.
1539 ReadRequest::FindBlockHeaders { known_blocks, stop } => Ok(ReadResponse::BlockHeaders(
1540 read::find_chain_headers(
1541 state.latest_best_chain(),
1542 &state.db,
1543 known_blocks,
1544 stop,
1545 MAX_FIND_BLOCK_HEADERS_RESULTS,
1546 )
1547 .into_iter()
1548 .map(|header| CountedHeader { header })
1549 .collect(),
1550 )),
1551
1552 ReadRequest::SaplingTree(hash_or_height) => Ok(ReadResponse::SaplingTree(
1553 read::sapling_tree(state.latest_best_chain(), &state.db, hash_or_height),
1554 )),
1555
1556 ReadRequest::OrchardTree(hash_or_height) => Ok(ReadResponse::OrchardTree(
1557 read::orchard_tree(state.latest_best_chain(), &state.db, hash_or_height),
1558 )),
1559
1560 ReadRequest::SaplingSubtrees { start_index, limit } => {
1561 let end_index = limit
1562 .and_then(|limit| start_index.0.checked_add(limit.0))
1563 .map(NoteCommitmentSubtreeIndex);
1564
1565 let best_chain = state.latest_best_chain();
1566 let sapling_subtrees = if let Some(end_index) = end_index {
1567 read::sapling_subtrees(best_chain, &state.db, start_index..end_index)
1568 } else {
1569 // If there is no end bound, just return all the trees.
1570 // If the end bound would overflow, just returns all the trees, because that's what
1571 // `zcashd` does. (It never calculates an end bound, so it just keeps iterating until
1572 // the trees run out.)
1573 read::sapling_subtrees(best_chain, &state.db, start_index..)
1574 };
1575
1576 Ok(ReadResponse::SaplingSubtrees(sapling_subtrees))
1577 }
1578
1579 ReadRequest::OrchardSubtrees { start_index, limit } => {
1580 let end_index = limit
1581 .and_then(|limit| start_index.0.checked_add(limit.0))
1582 .map(NoteCommitmentSubtreeIndex);
1583
1584 let best_chain = state.latest_best_chain();
1585 let orchard_subtrees = if let Some(end_index) = end_index {
1586 read::orchard_subtrees(best_chain, &state.db, start_index..end_index)
1587 } else {
1588 // If there is no end bound, just return all the trees.
1589 // If the end bound would overflow, just returns all the trees, because that's what
1590 // `zcashd` does. (It never calculates an end bound, so it just keeps iterating until
1591 // the trees run out.)
1592 read::orchard_subtrees(best_chain, &state.db, start_index..)
1593 };
1594
1595 Ok(ReadResponse::OrchardSubtrees(orchard_subtrees))
1596 }
1597
1598 // For the get_address_balance RPC.
1599 ReadRequest::AddressBalance(addresses) => {
1600 let (balance, received) =
1601 read::transparent_balance(state.latest_best_chain(), &state.db, addresses)?;
1602 Ok(ReadResponse::AddressBalance { balance, received })
1603 }
1604
1605 // For the get_address_tx_ids RPC.
1606 ReadRequest::TransactionIdsByAddresses {
1607 addresses,
1608 height_range,
1609 } => read::transparent_tx_ids(
1610 state.latest_best_chain(),
1611 &state.db,
1612 addresses,
1613 height_range,
1614 )
1615 .map(ReadResponse::AddressesTransactionIds),
1616
1617 // For the get_address_utxos RPC.
1618 ReadRequest::UtxosByAddresses(addresses) => read::address_utxos(
1619 &state.network,
1620 state.latest_best_chain(),
1621 &state.db,
1622 addresses,
1623 )
1624 .map(ReadResponse::AddressUtxos),
1625
1626 ReadRequest::CheckBestChainTipNullifiersAndAnchors(unmined_tx) => {
1627 let latest_non_finalized_best_chain = state.latest_best_chain();
1628
1629 check::nullifier::tx_no_duplicates_in_chain(
1630 &state.db,
1631 latest_non_finalized_best_chain.as_ref(),
1632 &unmined_tx.transaction,
1633 )?;
1634
1635 check::anchors::tx_anchors_refer_to_final_treestates(
1636 &state.db,
1637 latest_non_finalized_best_chain.as_ref(),
1638 &unmined_tx,
1639 )?;
1640
1641 Ok(ReadResponse::ValidBestChainTipNullifiersAndAnchors)
1642 }
1643
1644 // Used by the get_block and get_block_hash RPCs.
1645 ReadRequest::BestChainBlockHash(height) => Ok(ReadResponse::BlockHash(
1646 read::hash_by_height(state.latest_best_chain(), &state.db, height),
1647 )),
1648
1649 // Used by get_block_template and getblockchaininfo RPCs.
1650 ReadRequest::ChainInfo => {
1651 // # Correctness
1652 //
1653 // It is ok to do these lookups using multiple database calls. Finalized state updates
1654 // can only add overlapping blocks, and block hashes are unique across all chain forks.
1655 //
1656 // If there is a large overlap between the non-finalized and finalized states,
1657 // where the finalized tip is above the non-finalized tip,
1658 // Zebra is receiving a lot of blocks, or this request has been delayed for a long time.
1659 //
1660 // In that case, the `getblocktemplate` RPC will return an error because Zebra
1661 // is not synced to the tip. That check happens before the RPC makes this request.
1662 read::difficulty::get_block_template_chain_info(
1663 &state.latest_non_finalized_state(),
1664 &state.db,
1665 &state.network,
1666 )
1667 .map(ReadResponse::ChainInfo)
1668 }
1669
1670 // Used by getmininginfo, getnetworksolps, and getnetworkhashps RPCs.
1671 ReadRequest::SolutionRate { num_blocks, height } => {
1672 let latest_non_finalized_state = state.latest_non_finalized_state();
1673 // # Correctness
1674 //
1675 // It is ok to do these lookups using multiple database calls. Finalized state updates
1676 // can only add overlapping blocks, and block hashes are unique across all chain forks.
1677 //
1678 // The worst that can happen here is that the default `start_hash` will be below
1679 // the chain tip.
1680 let (tip_height, tip_hash) =
1681 match read::tip(latest_non_finalized_state.best_chain(), &state.db) {
1682 Some(tip_hash) => tip_hash,
1683 None => return Ok(ReadResponse::SolutionRate(None)),
1684 };
1685
1686 let start_hash = match height {
1687 Some(height) if height < tip_height => read::hash_by_height(
1688 latest_non_finalized_state.best_chain(),
1689 &state.db,
1690 height,
1691 ),
1692 // use the chain tip hash if height is above it or not provided.
1693 _ => Some(tip_hash),
1694 };
1695
1696 let solution_rate = start_hash.and_then(|start_hash| {
1697 read::difficulty::solution_rate(
1698 &latest_non_finalized_state,
1699 &state.db,
1700 num_blocks,
1701 start_hash,
1702 )
1703 });
1704
1705 Ok(ReadResponse::SolutionRate(solution_rate))
1706 }
1707
1708 ReadRequest::CheckBlockProposalValidity(semantically_verified) => {
1709 tracing::debug!(
1710 "attempting to validate and commit block proposal \
1711 onto a cloned non-finalized state"
1712 );
1713 let mut latest_non_finalized_state = state.latest_non_finalized_state();
1714
1715 // The previous block of a valid proposal must be on the best chain tip.
1716 let Some((_best_tip_height, best_tip_hash)) =
1717 read::best_tip(&latest_non_finalized_state, &state.db)
1718 else {
1719 return Err(
1720 "state is empty: wait for Zebra to sync before submitting a proposal"
1721 .into(),
1722 );
1723 };
1724
1725 if semantically_verified.block.header.previous_block_hash != best_tip_hash {
1726 return Err("proposal is not based on the current best chain tip: \
1727 previous block hash must be the best chain tip"
1728 .into());
1729 }
1730
1731 // This clone of the non-finalized state is dropped when this closure returns.
1732 // The non-finalized state that's used in the rest of the state (including finalizing
1733 // blocks into the db) is not mutated here.
1734 //
1735 // TODO: Convert `CommitSemanticallyVerifiedError` to a new `ValidateProposalError`?
1736 latest_non_finalized_state.disable_metrics();
1737
1738 write::validate_and_commit_non_finalized(
1739 &state.db,
1740 &mut latest_non_finalized_state,
1741 semantically_verified,
1742 )?;
1743
1744 Ok(ReadResponse::ValidBlockProposal)
1745 }
1746
1747 ReadRequest::TipBlockSize => {
1748 // Respond with the length of the obtained block if any.
1749 Ok(ReadResponse::TipBlockSize(
1750 state
1751 .best_tip()
1752 .and_then(|(tip_height, _)| {
1753 read::block_info(
1754 state.latest_best_chain(),
1755 &state.db,
1756 tip_height.into(),
1757 )
1758 })
1759 .map(|info| info.size().try_into().expect("u32 should fit in usize"))
1760 .or_else(|| {
1761 find::tip_block(state.latest_best_chain(), &state.db)
1762 .map(|b| b.zcash_serialized_size())
1763 }),
1764 ))
1765 }
1766
1767 ReadRequest::NonFinalizedBlocksListener => {
1768 unreachable!("should return early");
1769 }
1770
1771 // Used by `gettxout` RPC method.
1772 ReadRequest::IsTransparentOutputSpent(outpoint) => {
1773 let is_spent = read::unspent_utxo(state.latest_best_chain(), &state.db, outpoint);
1774 Ok(ReadResponse::IsTransparentOutputSpent(is_spent.is_none()))
1775 }
1776 };
1777
1778 timed_span.spawn_blocking(request_handler)
1779 }
1780}
1781
1782/// Initialize a state service from the provided [`Config`].
1783/// Returns a boxed state service, a read-only state service,
1784/// and receivers for state chain tip updates.
1785///
1786/// Each `network` has its own separate on-disk database.
1787///
1788/// The state uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
1789/// to work out when it is near the final checkpoint.
1790///
1791/// To share access to the state, wrap the returned service in a `Buffer`,
1792/// or clone the returned [`ReadStateService`].
1793///
1794/// It's possible to construct multiple state services in the same application (as
1795/// long as they, e.g., use different storage locations), but doing so is
1796/// probably not what you want.
1797pub async fn init(
1798 config: Config,
1799 network: &Network,
1800 max_checkpoint_height: block::Height,
1801 checkpoint_verify_concurrency_limit: usize,
1802) -> (
1803 BoxService<Request, Response, BoxError>,
1804 ReadStateService,
1805 LatestChainTip,
1806 ChainTipChange,
1807) {
1808 let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
1809 StateService::new(
1810 config,
1811 network,
1812 max_checkpoint_height,
1813 checkpoint_verify_concurrency_limit,
1814 )
1815 .await;
1816
1817 (
1818 BoxService::new(state_service),
1819 read_only_state_service,
1820 latest_chain_tip,
1821 chain_tip_change,
1822 )
1823}
1824
1825/// Initialize a read state service from the provided [`Config`].
1826/// Returns a read-only state service,
1827///
1828/// Each `network` has its own separate on-disk database.
1829///
1830/// To share access to the state, clone the returned [`ReadStateService`].
1831pub fn init_read_only(
1832 config: Config,
1833 network: &Network,
1834) -> (
1835 ReadStateService,
1836 ZebraDb,
1837 tokio::sync::watch::Sender<NonFinalizedState>,
1838) {
1839 let finalized_state = FinalizedState::new_with_debug(
1840 &config,
1841 network,
1842 true,
1843 #[cfg(feature = "elasticsearch")]
1844 false,
1845 true,
1846 );
1847 let (non_finalized_state_sender, non_finalized_state_receiver) =
1848 tokio::sync::watch::channel(NonFinalizedState::new(network));
1849
1850 (
1851 ReadStateService::new(
1852 &finalized_state,
1853 None,
1854 WatchReceiver::new(non_finalized_state_receiver),
1855 ),
1856 finalized_state.db.clone(),
1857 non_finalized_state_sender,
1858 )
1859}
1860
1861/// Calls [`init_read_only`] with the provided [`Config`] and [`Network`] from a blocking task.
1862/// Returns a [`tokio::task::JoinHandle`] with a read state service and chain tip sender.
1863pub fn spawn_init_read_only(
1864 config: Config,
1865 network: &Network,
1866) -> tokio::task::JoinHandle<(
1867 ReadStateService,
1868 ZebraDb,
1869 tokio::sync::watch::Sender<NonFinalizedState>,
1870)> {
1871 let network = network.clone();
1872 tokio::task::spawn_blocking(move || init_read_only(config, &network))
1873}
1874
1875/// Returns a [`StateService`] with an ephemeral [`Config`] and a buffer with a single slot.
1876///
1877/// This can be used to create a state service for testing. See also [`init`].
1878#[cfg(any(test, feature = "proptest-impl"))]
1879pub async fn init_test(
1880 network: &Network,
1881) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
1882 // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
1883 // if we ever need to test final checkpoint sent UTXO queries
1884 let (state_service, _, _, _) =
1885 StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;
1886
1887 Buffer::new(BoxService::new(state_service), 1)
1888}
1889
1890/// Initializes a state service with an ephemeral [`Config`] and a buffer with a single slot,
1891/// then returns the read-write service, read-only service, and tip watch channels.
1892///
1893/// This can be used to create a state service for testing. See also [`init`].
1894#[cfg(any(test, feature = "proptest-impl"))]
1895pub async fn init_test_services(
1896 network: &Network,
1897) -> (
1898 Buffer<BoxService<Request, Response, BoxError>, Request>,
1899 ReadStateService,
1900 LatestChainTip,
1901 ChainTipChange,
1902) {
1903 // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
1904 // if we ever need to test final checkpoint sent UTXO queries
1905 let (state_service, read_state_service, latest_chain_tip, chain_tip_change) =
1906 StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;
1907
1908 let state_service = Buffer::new(BoxService::new(state_service), 1);
1909
1910 (
1911 state_service,
1912 read_state_service,
1913 latest_chain_tip,
1914 chain_tip_change,
1915 )
1916}