zebra_state/service.rs
//! [`tower::Service`]s for Zebra's cached chain state.
//!
//! Zebra provides cached state access via two main services:
//! - [`StateService`]: a read-write service that writes blocks to the state,
//!   and redirects most read requests to the [`ReadStateService`].
//! - [`ReadStateService`]: a read-only service that answers from the most
//!   recent committed block.
//!
//! Most users should prefer [`ReadStateService`], unless they need to write blocks to the state.
//!
//! Zebra also provides access to the best chain tip via:
//! - [`LatestChainTip`]: a read-only channel that contains the latest committed
//!   tip.
//! - [`ChainTipChange`]: a read-only channel that can asynchronously await
//!   chain tip changes.
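//!
//! For example, a caller holding a [`ReadStateService`] could query the best
//! chain tip like this (a minimal sketch, assuming `read_state` was returned
//! by [`StateService::new`]; error handling is elided):
//!
//! ```ignore
//! use tower::ServiceExt;
//!
//! // `oneshot` waits for the service to be ready, then sends one request.
//! // Clones are cheap, so each request can use its own clone.
//! let rsp = read_state.clone().oneshot(ReadRequest::Tip).await?;
//! if let ReadResponse::Tip(Some((height, hash))) = rsp {
//!     tracing::info!(?height, ?hash, "current best chain tip");
//! }
//! ```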

use std::{
    collections::HashMap,
    convert,
    future::Future,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
    time::{Duration, Instant},
};

use futures::future::FutureExt;
use tokio::sync::oneshot;
use tower::{util::BoxService, Service, ServiceExt};
use tracing::{instrument, Instrument, Span};

#[cfg(any(test, feature = "proptest-impl"))]
use tower::buffer::Buffer;

use zebra_chain::{
    block::{self, CountedHeader, HeightDiff},
    diagnostic::{task::WaitForPanics, CodeTimer},
    parameters::{Network, NetworkUpgrade},
    subtree::NoteCommitmentSubtreeIndex,
};

use zebra_chain::{block::Height, serialization::ZcashSerialize};

use crate::{
    constants::{
        MAX_FIND_BLOCK_HASHES_RESULTS, MAX_FIND_BLOCK_HEADERS_RESULTS, MAX_LEGACY_CHAIN_BLOCKS,
    },
    error::{InvalidateError, QueueAndCommitError, ReconsiderError},
    response::NonFinalizedBlocksListener,
    service::{
        block_iter::any_ancestor_blocks,
        chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
        finalized_state::{FinalizedState, ZebraDb},
        non_finalized_state::{Chain, NonFinalizedState},
        pending_utxos::PendingUtxos,
        queued_blocks::QueuedBlocks,
        watch_receiver::WatchReceiver,
    },
    BoxError, CheckpointVerifiedBlock, CommitSemanticallyVerifiedError, Config, ReadRequest,
    ReadResponse, Request, Response, SemanticallyVerifiedBlock,
};

pub mod block_iter;
pub mod chain_tip;
pub mod watch_receiver;

pub mod check;

pub(crate) mod finalized_state;
pub(crate) mod non_finalized_state;
mod pending_utxos;
mod queued_blocks;
pub(crate) mod read;
mod traits;
mod write;

#[cfg(any(test, feature = "proptest-impl"))]
pub mod arbitrary;

#[cfg(test)]
mod tests;

pub use finalized_state::{OutputLocation, TransactionIndex, TransactionLocation};
use write::NonFinalizedWriteMessage;

use self::queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified, SentHashes};

pub use self::traits::{ReadState, State};

/// A read-write service for Zebra's cached blockchain state.
///
/// This service modifies and provides access to:
/// - the non-finalized state: the ~100 most recent blocks.
///   Zebra allows chain forks in the non-finalized state,
///   stores it in memory, and re-downloads it when restarted.
/// - the finalized state: older blocks that have many confirmations.
///   Zebra stores the single best chain in the finalized state,
///   and re-loads it from disk when restarted.
///
/// Read requests to this service are buffered, then processed concurrently.
/// Block write requests are buffered, then queued, then processed in order by a separate task.
///
/// Most state users can get faster read responses using the [`ReadStateService`],
/// because its requests do not share a [`tower::buffer::Buffer`] with block write requests.
///
/// To quickly get the latest block, use [`LatestChainTip`] or [`ChainTipChange`].
/// They can read the latest block directly, without queueing any requests.
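///
/// A minimal sketch (assuming `latest_chain_tip` was returned by
/// [`StateService::new`], and that the `ChainTip` trait is in scope for
/// `best_tip_height`):
///
/// ```ignore
/// // Reads the cached tip from a watch channel, without queueing a request.
/// if let Some(height) = latest_chain_tip.best_tip_height() {
///     tracing::info!(?height, "latest committed best chain tip");
/// }
/// ```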
#[derive(Debug)]
pub(crate) struct StateService {
    // Configuration
    //
    /// The configured Zcash network.
    network: Network,

    /// The height at which we start storing UTXOs from finalized blocks.
    ///
    /// This height should be lower than the last few checkpoints,
    /// so the full verifier can verify UTXO spends from those blocks,
    /// even if they haven't been committed to the finalized state yet.
    full_verifier_utxo_lookahead: block::Height,

    // Queued Blocks
    //
    /// Queued blocks for the [`NonFinalizedState`] that arrived out of order.
    /// These blocks are awaiting their parent blocks before they can do contextual verification.
    non_finalized_state_queued_blocks: QueuedBlocks,

    /// Queued blocks for the [`FinalizedState`] that arrived out of order.
    /// These blocks are awaiting their parent blocks before they can do contextual verification.
    ///
    /// Indexed by their parent block hash.
    finalized_state_queued_blocks: HashMap<block::Hash, QueuedCheckpointVerified>,

    /// Channels to send blocks to the block write task.
    block_write_sender: write::BlockWriteSender,

    /// The [`block::Hash`] of the most recent block sent on
    /// the finalized or non-finalized block write channels.
    ///
    /// On startup, this is:
    /// - the finalized tip, if there are stored blocks, or
    /// - the genesis block's parent hash, if the database is empty.
    ///
    /// If `invalid_block_write_reset_receiver` gets a reset, this is:
    /// - the hash of the last valid committed block (the parent of the invalid block).
    finalized_block_write_last_sent_hash: block::Hash,

    /// A set of block hashes that have been sent to the block write task.
    /// Hashes of blocks below the finalized tip height are periodically pruned.
    non_finalized_block_write_sent_hashes: SentHashes,

    /// If an invalid block is sent on the finalized or non-finalized block
    /// write channels, this channel gets the [`block::Hash`] of the valid tip.
    //
    // TODO: add tests for finalized and non-finalized resets (#2654)
    invalid_block_write_reset_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,

    // Pending UTXO Request Tracking
    //
    /// The set of outpoints with pending requests for their associated transparent::Output.
    pending_utxos: PendingUtxos,

    /// Instant tracking the last time `pending_utxos` was pruned.
    last_prune: Instant,

    // Updating Concurrently Readable State
    //
    /// A cloneable [`ReadStateService`], used to answer concurrent read requests.
    ///
    /// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`.
    read_service: ReadStateService,

    // Metrics
    //
    /// A metric tracking the maximum height that's currently in `finalized_state_queued_blocks`.
    ///
    /// Set to `f64::NAN` if `finalized_state_queued_blocks` is empty, because Grafana shows NaNs
    /// as a break in the graph.
    max_finalized_queue_height: f64,
}

/// A read-only service for accessing Zebra's cached blockchain state.
///
/// This service provides read-only access to:
/// - the non-finalized state: the ~100 most recent blocks.
/// - the finalized state: older blocks that have many confirmations.
///
/// Requests to this service are processed in parallel,
/// ignoring any blocks queued by the read-write [`StateService`].
///
/// This quick response behavior is better for most state users.
/// It allows other async tasks to make progress while concurrently reading data from disk.
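///
/// A minimal sketch of concurrent use (assuming `read_state` is a
/// [`ReadStateService`] and `hash` is a hypothetical [`block::Hash`];
/// clones share the same database handle and non-finalized state channel):
///
/// ```ignore
/// use tower::ServiceExt;
///
/// let read_state2 = read_state.clone();
/// tokio::spawn(async move {
///     // Each clone can serve read requests in parallel with other clones.
///     let depth = read_state2.oneshot(ReadRequest::Depth(hash)).await;
/// });
/// ```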
#[derive(Clone, Debug)]
pub struct ReadStateService {
    // Configuration
    //
    /// The configured Zcash network.
    network: Network,

    // Shared Concurrently Readable State
    //
    /// A watch channel with a cached copy of the [`NonFinalizedState`].
    ///
    /// This state is only updated between requests,
    /// so it might include some block data that is also in `db`.
    non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,

    /// The shared inner on-disk database for the finalized state.
    ///
    /// RocksDB allows reads and writes via a shared reference,
    /// but [`ZebraDb`] doesn't expose any write methods or types.
    ///
    /// This chain is updated concurrently with requests,
    /// so it might include some block data that is also in the non-finalized state.
    db: ZebraDb,

    /// A shared handle to a task that writes blocks to the [`NonFinalizedState`] or [`FinalizedState`],
    /// once the queues have received all their parent blocks.
    ///
    /// Used to check for panics when writing blocks.
    block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
}

impl Drop for StateService {
    fn drop(&mut self) {
        // The state service owns the state, tasks, and channels,
        // so dropping it should shut down everything.

        // Close the channels (non-blocking).
        // This makes the block write thread exit the next time it checks the channels.
        // We want to do this here so we get any errors or panics from the block write task before it shuts down.
        self.invalid_block_write_reset_receiver.close();

        std::mem::drop(self.block_write_sender.finalized.take());
        std::mem::drop(self.block_write_sender.non_finalized.take());

        self.clear_finalized_block_queue(
            "dropping the state: dropped unused finalized state queue block",
        );
        self.clear_non_finalized_block_queue(CommitSemanticallyVerifiedError::WriteTaskExited);

        // Log database metrics before shutting down
        info!("dropping the state: logging database metrics");
        self.log_db_metrics();

        // Then drop self.read_service, which checks the block write task for panics,
        // and tries to shut down the database.
    }
}

impl Drop for ReadStateService {
    fn drop(&mut self) {
        // The read state service shares the state,
        // so dropping it should check if we can shut down.

        // TODO: move this into a try_shutdown() method
        if let Some(block_write_task) = self.block_write_task.take() {
            if let Some(block_write_task_handle) = Arc::into_inner(block_write_task) {
                // We're the last database user, so we can tell it to shut down (blocking):
                // - flushes the database to disk, and
                // - drops the database, which cleans up any database tasks correctly.
                self.db.shutdown(true);

                // We are the last state with a reference to this thread, so we can
                // wait until the block write task finishes, then check for panics (blocking).
                // (We'd also like to abort the thread, but std::thread::JoinHandle can't do that.)

                // This log is verbose during tests.
                #[cfg(not(test))]
                info!("waiting for the block write task to finish");
                #[cfg(test)]
                debug!("waiting for the block write task to finish");

                // TODO: move this into a check_for_panics() method
                if let Err(thread_panic) = block_write_task_handle.join() {
                    std::panic::resume_unwind(thread_panic);
                } else {
                    debug!("shutting down the state because the block write task has finished");
                }
            }
        } else {
            // Even if we're not the last database user, try shutting it down.
            //
            // TODO: rename this to try_shutdown()?
            self.db.shutdown(false);
        }
    }
}

impl StateService {
    const PRUNE_INTERVAL: Duration = Duration::from_secs(30);

    /// Creates a new state service for the state `config` and `network`.
    ///
    /// Uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
    /// to work out when it is near the final checkpoint.
    ///
    /// Returns the read-write and read-only state services,
    /// and read-only watch channels for its best chain tip.
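    ///
    /// A minimal usage sketch (hypothetical `config`, `network`,
    /// `max_checkpoint_height`, and `limit` values):
    ///
    /// ```ignore
    /// let (state, read_state, latest_chain_tip, chain_tip_change) =
    ///     StateService::new(config, &network, max_checkpoint_height, limit).await;
    /// // `state` accepts block commits; the other three handles are read-only.
    /// ```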
    pub async fn new(
        config: Config,
        network: &Network,
        max_checkpoint_height: block::Height,
        checkpoint_verify_concurrency_limit: usize,
    ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
        let (finalized_state, finalized_tip, timer) = {
            let config = config.clone();
            let network = network.clone();
            tokio::task::spawn_blocking(move || {
                let timer = CodeTimer::start();
                let finalized_state = FinalizedState::new(
                    &config,
                    &network,
                    #[cfg(feature = "elasticsearch")]
                    true,
                );
                timer.finish(module_path!(), line!(), "opening finalized state database");

                let timer = CodeTimer::start();
                let finalized_tip = finalized_state.db.tip_block();

                (finalized_state, finalized_tip, timer)
            })
            .await
            .expect("failed to join blocking task")
        };

        // # Correctness
        //
        // If there are blocks in the restored non-finalized state above the max checkpoint
        // height, the state service must set the finalized block write sender to `None`, so
        // that non-finalized blocks can be written. Otherwise, Zebra will be unable to commit
        // semantically verified blocks, and its chain sync will stall.
        //
        // If there are no such blocks, the state service must not set the finalized block
        // write sender to `None`. Otherwise, unless checkpoint sync is disabled in the
        // zebra-consensus configuration, Zebra will be unable to commit checkpoint verified
        // blocks, and its chain sync will stall.
        let is_finalized_tip_past_max_checkpoint = if let Some(tip) = &finalized_tip {
            tip.coinbase_height().expect("valid block must have height") >= max_checkpoint_height
        } else {
            false
        };
        let (non_finalized_state, non_finalized_state_sender, non_finalized_state_receiver) =
            NonFinalizedState::new(network)
                .with_backup(
                    config.non_finalized_state_backup_dir(network),
                    &finalized_state.db,
                    is_finalized_tip_past_max_checkpoint,
                )
                .await;

        let non_finalized_block_write_sent_hashes = SentHashes::new(&non_finalized_state);
        let initial_tip = non_finalized_state
            .best_tip_block()
            .map(|cv_block| cv_block.block.clone())
            .or(finalized_tip)
            .map(CheckpointVerifiedBlock::from)
            .map(ChainTipBlock::from);

        tracing::info!(chain_tip = ?initial_tip.as_ref().map(|tip| (tip.hash, tip.height)), "loaded Zebra state cache");

        let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
            ChainTipSender::new(initial_tip, network);

        let finalized_state_for_writing = finalized_state.clone();
        let should_use_finalized_block_write_sender = non_finalized_state.is_chain_set_empty();
        let (block_write_sender, invalid_block_write_reset_receiver, block_write_task) =
            write::BlockWriteSender::spawn(
                finalized_state_for_writing,
                non_finalized_state,
                chain_tip_sender,
                non_finalized_state_sender,
                should_use_finalized_block_write_sender,
            );

        let read_service = ReadStateService::new(
            &finalized_state,
            block_write_task,
            non_finalized_state_receiver,
        );

        let full_verifier_utxo_lookahead = max_checkpoint_height
            - HeightDiff::try_from(checkpoint_verify_concurrency_limit)
                .expect("fits in HeightDiff");
        let full_verifier_utxo_lookahead =
            full_verifier_utxo_lookahead.unwrap_or(block::Height::MIN);
        let non_finalized_state_queued_blocks = QueuedBlocks::default();
        let pending_utxos = PendingUtxos::default();

        let finalized_block_write_last_sent_hash =
            tokio::task::spawn_blocking(move || finalized_state.db.finalized_tip_hash())
                .await
                .expect("failed to join blocking task");

        let state = Self {
            network: network.clone(),
            full_verifier_utxo_lookahead,
            non_finalized_state_queued_blocks,
            finalized_state_queued_blocks: HashMap::new(),
            block_write_sender,
            finalized_block_write_last_sent_hash,
            non_finalized_block_write_sent_hashes,
            invalid_block_write_reset_receiver,
            pending_utxos,
            last_prune: Instant::now(),
            read_service: read_service.clone(),
            max_finalized_queue_height: f64::NAN,
        };
        timer.finish(module_path!(), line!(), "initializing state service");

        tracing::info!("starting legacy chain check");
        let timer = CodeTimer::start();

        if let (Some(tip), Some(nu5_activation_height)) = (
            {
                let read_state = state.read_service.clone();
                tokio::task::spawn_blocking(move || read_state.best_tip())
                    .await
                    .expect("task should not panic")
            },
            NetworkUpgrade::Nu5.activation_height(network),
        ) {
            if let Err(error) = check::legacy_chain(
                nu5_activation_height,
                any_ancestor_blocks(
                    &state.read_service.latest_non_finalized_state(),
                    &state.read_service.db,
                    tip.1,
                ),
                &state.network,
                MAX_LEGACY_CHAIN_BLOCKS,
            ) {
                let legacy_db_path = state.read_service.db.path().to_path_buf();
                panic!(
                    "Cached state contains a legacy chain.\n\
                     An outdated Zebra version did not know about a recent network upgrade,\n\
                     so it followed a legacy chain using outdated consensus branch rules.\n\
                     Hint: Delete your database, and restart Zebra to do a full sync.\n\
                     Database path: {legacy_db_path:?}\n\
                     Error: {error:?}",
                );
            }
        }

        tracing::info!("cached state consensus branch is valid: no legacy chain found");
        timer.finish(module_path!(), line!(), "legacy chain check");

        (state, read_service, latest_chain_tip, chain_tip_change)
    }

    /// Calls the read-only state service to log RocksDB database metrics.
    pub fn log_db_metrics(&self) {
        self.read_service.db.print_db_metrics();
    }

    /// Queues a checkpoint verified block for verification and storage in the finalized state.
    ///
    /// Returns a channel receiver that provides the result of the block commit.
    fn queue_and_commit_to_finalized_state(
        &mut self,
        checkpoint_verified: CheckpointVerifiedBlock,
    ) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
        // # Correctness & Performance
        //
        // This method must not block, access the database, or perform CPU-intensive tasks,
        // because it is called directly from the tokio executor's Future threads.

        let queued_prev_hash = checkpoint_verified.block.header.previous_block_hash;
        let queued_height = checkpoint_verified.height;

        // If we're close to the final checkpoint, make the block's UTXOs available for
        // semantic block verification, even when it is in the channel.
        if self.is_close_to_final_checkpoint(queued_height) {
            self.non_finalized_block_write_sent_hashes
                .add_finalized(&checkpoint_verified)
        }

        let (rsp_tx, rsp_rx) = oneshot::channel();
        let queued = (checkpoint_verified, rsp_tx);

        if self.block_write_sender.finalized.is_some() {
            // We're still committing checkpoint verified blocks
            if let Some(duplicate_queued) = self
                .finalized_state_queued_blocks
                .insert(queued_prev_hash, queued)
            {
                Self::send_checkpoint_verified_block_error(
                    duplicate_queued,
                    "dropping older checkpoint verified block: got newer duplicate block",
                );
            }

            self.drain_finalized_queue_and_commit();
        } else {
            // We've finished committing checkpoint verified blocks to the finalized state,
            // so drop any repeated queued blocks, and return an error.
            //
            // TODO: track the latest sent height, and drop any blocks under that height
            //       every time we send some blocks (like QueuedSemanticallyVerifiedBlocks)
            Self::send_checkpoint_verified_block_error(
                queued,
                "already finished committing checkpoint verified blocks: dropped duplicate block, \
                 block is already committed to the state",
            );

            self.clear_finalized_block_queue(
                "already finished committing checkpoint verified blocks: dropped duplicate block, \
                 block is already committed to the state",
            );
        }

        if self.finalized_state_queued_blocks.is_empty() {
            self.max_finalized_queue_height = f64::NAN;
        } else if self.max_finalized_queue_height.is_nan()
            || self.max_finalized_queue_height < queued_height.0 as f64
        {
            // if there are still blocks in the queue, then either:
            // - the new block was lower than the old maximum, and there was a gap before it,
            //   so the maximum is still the same (and we skip this code), or
            // - the new block is higher than the old maximum, and there is at least one gap
            //   between the finalized tip and the new maximum
            self.max_finalized_queue_height = queued_height.0 as f64;
        }

        metrics::gauge!("state.checkpoint.queued.max.height").set(self.max_finalized_queue_height);
        metrics::gauge!("state.checkpoint.queued.block.count")
            .set(self.finalized_state_queued_blocks.len() as f64);

        rsp_rx
    }

    /// Finds finalized state queue blocks to be committed to the state in order,
    /// removes them from the queue, and sends them to the block commit task.
    ///
    /// After queueing a finalized block, this method checks whether the newly
    /// queued block (and any of its descendants) can be committed to the state.
    ///
    /// If the block commit channel has been closed, drops the queued blocks and
    /// sends an error on their result channels.
    pub fn drain_finalized_queue_and_commit(&mut self) {
        use tokio::sync::mpsc::error::{SendError, TryRecvError};

        // # Correctness & Performance
        //
        // This method must not block, access the database, or perform CPU-intensive tasks,
        // because it is called directly from the tokio executor's Future threads.

        // If a block failed, we need to start again from a valid tip.
        match self.invalid_block_write_reset_receiver.try_recv() {
            Ok(reset_tip_hash) => self.finalized_block_write_last_sent_hash = reset_tip_hash,
            Err(TryRecvError::Disconnected) => {
                info!("Block commit task closed the block reset channel. Is Zebra shutting down?");
                return;
            }
            // There are no errors, so we can just use the last block hash we sent
            Err(TryRecvError::Empty) => {}
        }

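        // For example: if the queue maps parent hashes { a => block_b, b => block_c },
        // and the last sent hash is `a`, this loop sends `block_b` and then `block_c`,
        // because sending `block_b` makes `b` the new last sent hash.
        // (Hypothetical hashes, for illustration only.)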
        while let Some(queued_block) = self
            .finalized_state_queued_blocks
            .remove(&self.finalized_block_write_last_sent_hash)
        {
            let last_sent_finalized_block_height = queued_block.0.height;

            self.finalized_block_write_last_sent_hash = queued_block.0.hash;

            // If we've finished sending finalized blocks, ignore any repeated blocks.
            // (Blocks can be repeated after a syncer reset.)
            if let Some(finalized_block_write_sender) = &self.block_write_sender.finalized {
                let send_result = finalized_block_write_sender.send(queued_block);

                // If the receiver is closed, we can't send any more blocks.
                if let Err(SendError(queued)) = send_result {
                    // If Zebra is shutting down, drop blocks and return an error.
                    Self::send_checkpoint_verified_block_error(
                        queued,
                        "block commit task exited. Is Zebra shutting down?",
                    );

                    self.clear_finalized_block_queue(
                        "block commit task exited. Is Zebra shutting down?",
                    );
                } else {
                    metrics::gauge!("state.checkpoint.sent.block.height")
                        .set(last_sent_finalized_block_height.0 as f64);
                };
            }
        }
    }

    /// Drops all finalized state queue blocks, and sends an error on their result channels.
    fn clear_finalized_block_queue(&mut self, error: impl Into<BoxError> + Clone) {
        for (_hash, queued) in self.finalized_state_queued_blocks.drain() {
            Self::send_checkpoint_verified_block_error(queued, error.clone());
        }
    }

    /// Sends an error on a `QueuedCheckpointVerified` block's result channel, and drops the block.
    fn send_checkpoint_verified_block_error(
        queued: QueuedCheckpointVerified,
        error: impl Into<BoxError>,
    ) {
        let (finalized, rsp_tx) = queued;

        // The block sender might have already given up on this block,
        // so ignore any channel send errors.
        let _ = rsp_tx.send(Err(error.into()));
        std::mem::drop(finalized);
    }

    /// Drops all non-finalized state queue blocks, and sends an error on their result channels.
    fn clear_non_finalized_block_queue(&mut self, error: CommitSemanticallyVerifiedError) {
        for (_hash, queued) in self.non_finalized_state_queued_blocks.drain() {
            Self::send_semantically_verified_block_error(queued, error.clone());
        }
    }

    /// Sends an error on a `QueuedSemanticallyVerified` block's result channel, and drops the block.
    fn send_semantically_verified_block_error(
        queued: QueuedSemanticallyVerified,
        error: CommitSemanticallyVerifiedError,
    ) {
        let (finalized, rsp_tx) = queued;

        // The block sender might have already given up on this block,
        // so ignore any channel send errors.
        let _ = rsp_tx.send(Err(error));
        std::mem::drop(finalized);
    }

    /// Queues a semantically verified block for contextual verification, and checks if any queued
    /// blocks are ready to be verified and committed to the state.
    ///
    /// This function encodes the logic for [committing non-finalized blocks][1]
    /// in RFC0005.
    ///
    /// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks
    #[instrument(level = "debug", skip(self, semantically_verified))]
    fn queue_and_commit_to_non_finalized_state(
        &mut self,
        semantically_verified: SemanticallyVerifiedBlock,
    ) -> oneshot::Receiver<Result<block::Hash, CommitSemanticallyVerifiedError>> {
        tracing::debug!(block = %semantically_verified.block, "queueing block for contextual verification");
        let parent_hash = semantically_verified.block.header.previous_block_hash;

        if self
            .non_finalized_block_write_sent_hashes
            .contains(&semantically_verified.hash)
        {
            let (rsp_tx, rsp_rx) = oneshot::channel();
            let _ = rsp_tx.send(Err(QueueAndCommitError::new_duplicate(
                semantically_verified.hash,
            )
            .into()));
            return rsp_rx;
        }

        if self
            .read_service
            .db
            .contains_height(semantically_verified.height)
        {
            let (rsp_tx, rsp_rx) = oneshot::channel();
            let _ = rsp_tx.send(Err(QueueAndCommitError::new_already_finalized(
                semantically_verified.height,
            )
            .into()));
            return rsp_rx;
        }

        // [`Request::CommitSemanticallyVerifiedBlock`] contract: a request to commit a block which
        // has been queued but not yet committed to the state fails the older request and replaces
        // it with the newer request.
        let rsp_rx = if let Some((_, old_rsp_tx)) = self
            .non_finalized_state_queued_blocks
            .get_mut(&semantically_verified.hash)
        {
            tracing::debug!("replacing older queued request with new request");
            let (mut rsp_tx, rsp_rx) = oneshot::channel();
            std::mem::swap(old_rsp_tx, &mut rsp_tx);
            let _ = rsp_tx.send(Err(QueueAndCommitError::new_replaced(
                semantically_verified.hash,
            )
            .into()));
            rsp_rx
        } else {
            let (rsp_tx, rsp_rx) = oneshot::channel();
            self.non_finalized_state_queued_blocks
                .queue((semantically_verified, rsp_tx));
            rsp_rx
        };

        // We've finished sending checkpoint verified blocks when:
        // - we've sent the verified block for the last checkpoint, and
        // - it has been successfully written to disk.
        //
        // We detect the last checkpoint by looking for non-finalized blocks
        // that are a child of the last block we sent.
        //
        // TODO: configure the state with the last checkpoint hash instead?
        if self.block_write_sender.finalized.is_some()
            && self
                .non_finalized_state_queued_blocks
                .has_queued_children(self.finalized_block_write_last_sent_hash)
            && self.read_service.db.finalized_tip_hash()
                == self.finalized_block_write_last_sent_hash
        {
            // Tell the block write task to stop committing checkpoint verified blocks to the
            // finalized state, and move on to committing semantically verified blocks to the
            // non-finalized state.
            std::mem::drop(self.block_write_sender.finalized.take());
            // Remove any checkpoint-verified block hashes from `non_finalized_block_write_sent_hashes`.
            self.non_finalized_block_write_sent_hashes = SentHashes::default();
            // Mark `SentHashes` as usable by the `can_fork_chain_at()` method.
            self.non_finalized_block_write_sent_hashes
                .can_fork_chain_at_hashes = true;
            // Send blocks from the non-finalized queue.
            self.send_ready_non_finalized_queued(self.finalized_block_write_last_sent_hash);
            // We've finished committing checkpoint verified blocks to the finalized state,
            // so drop any repeated queued blocks.
            self.clear_finalized_block_queue(
                "already finished committing checkpoint verified blocks: dropped duplicate block, \
                 block is already committed to the state",
            );
        } else if !self.can_fork_chain_at(&parent_hash) {
            tracing::trace!("unready to verify, returning early");
        } else if self.block_write_sender.finalized.is_none() {
            // Wait until the block commit task is ready to write non-finalized blocks
            // before dequeuing them.
            self.send_ready_non_finalized_queued(parent_hash);

            let finalized_tip_height = self.read_service.db.finalized_tip_height().expect(
                "Finalized state must have at least one block before committing non-finalized state",
            );

            self.non_finalized_state_queued_blocks
                .prune_by_height(finalized_tip_height);

            self.non_finalized_block_write_sent_hashes
                .prune_by_height(finalized_tip_height);
        }

        rsp_rx
    }

    /// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks.
    fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
        self.non_finalized_block_write_sent_hashes
            .can_fork_chain_at(hash)
            || &self.read_service.db.finalized_tip_hash() == hash
    }

    /// Returns `true` if `queued_height` is near the final checkpoint.
    ///
    /// The semantic block verifier needs access to UTXOs from checkpoint verified blocks
    /// near the final checkpoint, so that it can verify blocks that spend those UTXOs.
    ///
    /// If it doesn't have the required UTXOs, some blocks will time out,
    /// but succeed after a syncer restart.
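    ///
    /// For example, with a hypothetical `max_checkpoint_height` of 2,000,000 and a
    /// `checkpoint_verify_concurrency_limit` of 1,000, `full_verifier_utxo_lookahead`
    /// is height 1,999,000, so UTXOs are tracked for any queued block at or above
    /// that height.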
    fn is_close_to_final_checkpoint(&self, queued_height: block::Height) -> bool {
        queued_height >= self.full_verifier_utxo_lookahead
    }

    /// Sends all queued blocks whose parents have recently arrived, starting from `new_parent`,
    /// in parent-before-child order, to the block write task, which will attempt to validate
    /// and commit them.
    #[tracing::instrument(level = "debug", skip(self, new_parent))]
    fn send_ready_non_finalized_queued(&mut self, new_parent: block::Hash) {
        use tokio::sync::mpsc::error::SendError;
        if let Some(non_finalized_block_write_sender) = &self.block_write_sender.non_finalized {
            let mut new_parents: Vec<block::Hash> = vec![new_parent];

            while let Some(parent_hash) = new_parents.pop() {
                let queued_children = self
                    .non_finalized_state_queued_blocks
                    .dequeue_children(parent_hash);

                for queued_child in queued_children {
                    let (SemanticallyVerifiedBlock { hash, .. }, _) = queued_child;

                    self.non_finalized_block_write_sent_hashes
                        .add(&queued_child.0);
                    let send_result = non_finalized_block_write_sender.send(queued_child.into());

                    if let Err(SendError(NonFinalizedWriteMessage::Commit(queued))) = send_result {
                        // If Zebra is shutting down, drop blocks and return an error.
                        Self::send_semantically_verified_block_error(
                            queued,
                            CommitSemanticallyVerifiedError::WriteTaskExited,
                        );

                        self.clear_non_finalized_block_queue(
                            CommitSemanticallyVerifiedError::WriteTaskExited,
                        );

                        return;
                    };

                    new_parents.push(hash);
                }
            }

            self.non_finalized_block_write_sent_hashes.finish_batch();
        };
    }

    /// Returns the tip of the current best chain.
    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
        self.read_service.best_tip()
    }

    fn send_invalidate_block(
        &self,
        hash: block::Hash,
    ) -> oneshot::Receiver<Result<block::Hash, InvalidateError>> {
        let (rsp_tx, rsp_rx) = oneshot::channel();

        let Some(sender) = &self.block_write_sender.non_finalized else {
            let _ = rsp_tx.send(Err(InvalidateError::ProcessingCheckpointedBlocks));
            return rsp_rx;
        };

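        // `mpsc::error::SendError` returns the unsent message, so if the send fails,
        // we can recover `rsp_tx` from the returned message and use it to report the
        // failure to the caller.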
        if let Err(tokio::sync::mpsc::error::SendError(error)) =
            sender.send(NonFinalizedWriteMessage::Invalidate { hash, rsp_tx })
        {
            let NonFinalizedWriteMessage::Invalidate { rsp_tx, .. } = error else {
                unreachable!("send should return the same Invalidate message that could not be sent");
            };

            let _ = rsp_tx.send(Err(InvalidateError::SendInvalidateRequestFailed));
        }

        rsp_rx
    }

    fn send_reconsider_block(
        &self,
        hash: block::Hash,
    ) -> oneshot::Receiver<Result<Vec<block::Hash>, ReconsiderError>> {
        let (rsp_tx, rsp_rx) = oneshot::channel();

        let Some(sender) = &self.block_write_sender.non_finalized else {
            let _ = rsp_tx.send(Err(ReconsiderError::CheckpointCommitInProgress));
            return rsp_rx;
        };

        if let Err(tokio::sync::mpsc::error::SendError(error)) =
            sender.send(NonFinalizedWriteMessage::Reconsider { hash, rsp_tx })
        {
            let NonFinalizedWriteMessage::Reconsider { rsp_tx, .. } = error else {
                unreachable!("send should return the same Reconsider message that could not be sent");
            };

            let _ = rsp_tx.send(Err(ReconsiderError::ReconsiderSendFailed));
        }

        rsp_rx
    }

    /// Asserts some assumptions about the semantically verified `block` before it is queued.
    fn assert_block_can_be_validated(&self, block: &SemanticallyVerifiedBlock) {
        // required by the `Request::CommitSemanticallyVerifiedBlock` contract
        assert!(
            block.height > self.network.mandatory_checkpoint_height(),
            "invalid semantically verified block height: the Canopy checkpoint is mandatory, so \
             pre-Canopy blocks, and the Canopy activation block, must be committed to the state \
             as finalized blocks"
        );
    }
}

impl ReadStateService {
    /// Creates a new read-only state service, using the provided finalized state and
    /// block write task handle.
    ///
    /// Returns the newly created service.
    pub(crate) fn new(
        finalized_state: &FinalizedState,
        block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
        non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
    ) -> Self {
        let read_service = Self {
            network: finalized_state.network(),
            db: finalized_state.db.clone(),
            non_finalized_state_receiver,
            block_write_task,
        };

        tracing::debug!("created new read-only state service");

        read_service
    }

    /// Returns the tip of the current best chain.
    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
        read::best_tip(&self.latest_non_finalized_state(), &self.db)
    }

    /// Gets a clone of the latest non-finalized state from the `non_finalized_state_receiver`.
    fn latest_non_finalized_state(&self) -> NonFinalizedState {
        self.non_finalized_state_receiver.cloned_watch_data()
    }

    /// Gets a clone of the latest, best non-finalized chain from the `non_finalized_state_receiver`.
    #[allow(dead_code)]
    fn latest_best_chain(&self) -> Option<Arc<Chain>> {
        self.latest_non_finalized_state().best_chain().cloned()
    }

    /// Test-only access to the inner database.
    /// Can be used to modify the database without doing any consensus checks.
    #[cfg(any(test, feature = "proptest-impl"))]
    pub fn db(&self) -> &ZebraDb {
        &self.db
    }

    /// Logs RocksDB metrics using the read-only state service.
    pub fn log_db_metrics(&self) {
        self.db.print_db_metrics();
    }
}

impl Service<Request> for StateService {
    type Response = Response;
    type Error = BoxError;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Check for panics in the block write task
        let poll = self.read_service.poll_ready(cx);

        // Prune outdated UTXO requests
        let now = Instant::now();

        if self.last_prune + Self::PRUNE_INTERVAL < now {
            let tip = self.best_tip();
            let old_len = self.pending_utxos.len();

            self.pending_utxos.prune();
            self.last_prune = now;

            let new_len = self.pending_utxos.len();
            let prune_count = old_len
                .checked_sub(new_len)
                .expect("prune does not add any utxo requests");
            if prune_count > 0 {
                tracing::debug!(
                    ?old_len,
                    ?new_len,
                    ?prune_count,
                    ?tip,
                    "pruned utxo requests"
                );
            } else {
                tracing::debug!(len = ?old_len, ?tip, "no utxo requests needed pruning");
            }
        }

        poll
    }

    #[instrument(name = "state", skip(self, req))]
    fn call(&mut self, req: Request) -> Self::Future {
        req.count_metric();
        let timer = CodeTimer::start();
        let span = Span::current();

        match req {
            // Uses non_finalized_state_queued_blocks and pending_utxos in the StateService.
            // Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb.
            //
            // The expected error type for this request is `CommitSemanticallyVerifiedError`.
            Request::CommitSemanticallyVerifiedBlock(semantically_verified) => {
                self.assert_block_can_be_validated(&semantically_verified);

                self.pending_utxos
                    .check_against_ordered(&semantically_verified.new_outputs);

                // # Performance
                //
                // Allow other async tasks to make progress while blocks are being verified
                // and written to disk. But wait for the blocks to finish committing,
                // so that `StateService` multi-block queries always observe a consistent state.
                //
                // Since each block is spawned into its own task,
                // there shouldn't be any other code running in the same task,
                // so we don't need to worry about blocking it:
                // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html

                let rsp_rx = tokio::task::block_in_place(move || {
                    span.in_scope(|| {
                        self.queue_and_commit_to_non_finalized_state(semantically_verified)
                    })
                });

                // TODO:
                // - check for panics in the block write task here,
                //   as well as in poll_ready()

                // The work is all done, the future just waits on a channel for the result
                timer.finish(module_path!(), line!(), "CommitSemanticallyVerifiedBlock");

                // Await the channel response, mapping receive errors to
                // `CommitSemanticallyVerifiedError::WriteTaskExited`,
                // then flatten the nested Result and convert any errors to a BoxError.
                let span = Span::current();
                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| CommitSemanticallyVerifiedError::WriteTaskExited)
                        .flatten()
                        .map_err(BoxError::from)
                        .map(Response::Committed)
                }
                .instrument(span)
                .boxed()
            }

            // Uses finalized_state_queued_blocks and pending_utxos in the StateService.
            // Accesses shared writeable state in the StateService.
            Request::CommitCheckpointVerifiedBlock(finalized) => {
                // # Consensus
                //
                // A semantic block verification could have called AwaitUtxo
                // before this checkpoint verified block arrived in the state.
                // So we need to check for pending UTXO requests sent by running
                // semantic block verifications.
                //
                // This check is redundant for most checkpoint verified blocks,
                // because semantic verification can only succeed near the final
                // checkpoint, when all the UTXOs are available for the verifying block.
                //
                // (Checkpoint block UTXOs are verified using block hash checkpoints
                // and transaction merkle tree block header commitments.)
                self.pending_utxos
                    .check_against_ordered(&finalized.new_outputs);

                // # Performance
                //
                // This method doesn't block, access the database, or perform CPU-intensive tasks,
                // so we can run it directly in the tokio executor's Future threads.
                let rsp_rx = self.queue_and_commit_to_finalized_state(finalized);

                // TODO:
                // - check for panics in the block write task here,
                //   as well as in poll_ready()

                // The work is all done, the future just waits on a channel for the result
                timer.finish(module_path!(), line!(), "CommitCheckpointVerifiedBlock");

                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| {
                            BoxError::from("block was dropped from the queue of finalized blocks")
                        })
                        // TODO: replace with Result::flatten once it stabilises
                        //       https://github.com/rust-lang/rust/issues/70142
                        .and_then(convert::identity)
                        .map(Response::Committed)
                }
                .instrument(span)
                .boxed()
            }

            // Uses pending_utxos and non_finalized_state_queued_blocks in the StateService.
            // If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService.
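            //
            // The lookup order is:
            // 1. blocks still queued for the non-finalized state,
            // 2. blocks already sent to the block write task,
            // 3. any non-finalized chain or the finalized state, via the ReadStateService.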
            Request::AwaitUtxo(outpoint) => {
                // Prepare the AwaitUtxo future from PendingUtxos.
                let response_fut = self.pending_utxos.queue(outpoint);
                // Only instrument `response_fut`, the ReadStateService already
                // instruments its requests with the same span.

                let response_fut = response_fut.instrument(span).boxed();

                // Check the non-finalized block queue outside the returned future,
                // so we can access mutable state fields.
                if let Some(utxo) = self.non_finalized_state_queued_blocks.utxo(&outpoint) {
                    self.pending_utxos.respond(&outpoint, utxo);

                    // We're finished, the returned future gets the UTXO from the respond() channel.
                    timer.finish(module_path!(), line!(), "AwaitUtxo/queued-non-finalized");

                    return response_fut;
                }

                // Check the sent non-finalized blocks
                if let Some(utxo) = self.non_finalized_block_write_sent_hashes.utxo(&outpoint) {
                    self.pending_utxos.respond(&outpoint, utxo);

                    // We're finished, the returned future gets the UTXO from the respond() channel.
                    timer.finish(module_path!(), line!(), "AwaitUtxo/sent-non-finalized");

                    return response_fut;
                }

                // We ignore any UTXOs in FinalizedState.finalized_state_queued_blocks,
                // because it is only used during checkpoint verification.
                //
                // This creates a rare race condition, but it doesn't seem to happen much in practice.
                // See #5126 for details.

                // Manually send a request to the ReadStateService,
                // to get UTXOs from any non-finalized chain or the finalized chain.
                let read_service = self.read_service.clone();

                // Run the request in an async block, so we can await the response.
                async move {
                    let req = ReadRequest::AnyChainUtxo(outpoint);

                    let rsp = read_service.oneshot(req).await?;

                    // Optional TODO:
                    //  - make pending_utxos.respond() async using a channel,
                    //    so we can respond to all waiting requests here
                    //
                    // This change is not required for correctness, because:
                    // - any waiting requests should have returned when the block was sent to the state
                    // - otherwise, the request returns immediately if:
                    //   - the block is in the non-finalized queue, or
                    //   - the block is in any non-finalized chain or the finalized state
                    //
                    // And if the block is in the finalized queue,
                    // that's rare enough that a retry is ok.
                    if let ReadResponse::AnyChainUtxo(Some(utxo)) = rsp {
                        // We got a UTXO, so we replace the response future with the UTXO we found.
                        timer.finish(module_path!(), line!(), "AwaitUtxo/any-chain");

                        return Ok(Response::Utxo(utxo));
                    }

                    // We're finished, but the returned future is waiting on the respond() channel.
                    timer.finish(module_path!(), line!(), "AwaitUtxo/waiting");

                    response_fut.await
                }
                .boxed()
            }

            // Used by sync, inbound, and the block verifier to check if a block is already in
            // the state before downloading or validating it.
            Request::KnownBlock(hash) => {
                let timer = CodeTimer::start();

                let read_service = self.read_service.clone();

                async move {
                    let response = read::non_finalized_state_contains_block_hash(
                        &read_service.latest_non_finalized_state(),
                        hash,
                    )
                    .or_else(|| read::finalized_state_contains_block_hash(&read_service.db, hash));

                    // The work is done in the future.
                    timer.finish(module_path!(), line!(), "Request::KnownBlock");

                    Ok(Response::KnownBlock(response))
                }
                .boxed()
            }

            // The expected error type for this request is `InvalidateError`.
            Request::InvalidateBlock(block_hash) => {
                let rsp_rx = tokio::task::block_in_place(move || {
                    span.in_scope(|| self.send_invalidate_block(block_hash))
                });

                // Await the channel response, mapping receive errors to
                // `InvalidateError::InvalidateRequestDropped`,
                // then flatten the nested Result and convert any errors to a BoxError.
                let span = Span::current();
                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| InvalidateError::InvalidateRequestDropped)
                        .flatten()
                        .map_err(BoxError::from)
                        .map(Response::Invalidated)
                }
                .instrument(span)
                .boxed()
            }

            // The expected error type for this request is `ReconsiderError`.
            Request::ReconsiderBlock(block_hash) => {
                let rsp_rx = tokio::task::block_in_place(move || {
                    span.in_scope(|| self.send_reconsider_block(block_hash))
                });

                // Await the channel response, mapping receive errors to
                // `ReconsiderError::ReconsiderResponseDropped`,
                // then flatten the nested Result and convert any errors to a BoxError.
                let span = Span::current();
                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| ReconsiderError::ReconsiderResponseDropped)
                        .flatten()
                        .map_err(BoxError::from)
                        .map(Response::Reconsidered)
                }
                .instrument(span)
                .boxed()
            }

            // Runs concurrently using the ReadStateService
            Request::Tip
            | Request::Depth(_)
            | Request::BestChainNextMedianTimePast
            | Request::BestChainBlockHash(_)
            | Request::BlockLocator
            | Request::Transaction(_)
            | Request::AnyChainTransaction(_)
            | Request::UnspentBestChainUtxo(_)
            | Request::Block(_)
            | Request::BlockAndSize(_)
            | Request::BlockHeader(_)
            | Request::FindBlockHashes { .. }
            | Request::FindBlockHeaders { .. }
            | Request::CheckBestChainTipNullifiersAndAnchors(_) => {
                // Redirect the request to the concurrent ReadStateService
                let read_service = self.read_service.clone();

                async move {
                    let req = req
                        .try_into()
                        .expect("ReadRequest conversion should not fail");

                    let rsp = read_service.oneshot(req).await?;
                    let rsp = rsp.try_into().expect("Response conversion should not fail");

                    Ok(rsp)
                }
                .boxed()
            }

            Request::CheckBlockProposalValidity(_) => {
                // Redirect the request to the concurrent ReadStateService
                let read_service = self.read_service.clone();

                async move {
                    let req = req
                        .try_into()
                        .expect("ReadRequest conversion should not fail");

                    let rsp = read_service.oneshot(req).await?;
                    let rsp = rsp.try_into().expect("Response conversion should not fail");

                    Ok(rsp)
                }
                .boxed()
            }
        }
    }
}

impl Service<ReadRequest> for ReadStateService {
    type Response = ReadResponse;
    type Error = BoxError;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Check for panics in the block write task
        //
        // TODO: move into a check_for_panics() method
        let block_write_task = self.block_write_task.take();

        if let Some(block_write_task) = block_write_task {
            if block_write_task.is_finished() {
                if let Some(block_write_task) = Arc::into_inner(block_write_task) {
                    // We are the last state with a reference to this task, so we can propagate any panics
                    if let Err(thread_panic) = block_write_task.join() {
                        std::panic::resume_unwind(thread_panic);
                    }
                }
            } else {
                // It hasn't finished, so we need to put it back
                self.block_write_task = Some(block_write_task);
            }
        }

        self.db.check_for_panics();

        Poll::Ready(Ok(()))
    }

    #[instrument(name = "read_state", skip(self, req))]
    fn call(&mut self, req: ReadRequest) -> Self::Future {
        req.count_metric();
        let timer = CodeTimer::start();
        let span = Span::current();

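        // Each read request below clones the state handle, then runs the database and
        // non-finalized state work inside `tokio::task::spawn_blocking`, so RocksDB reads
        // never block the async executor. `wait_for_panics()` awaits the blocking task,
        // propagating any panic from it.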
1293 match req {
1294 // Used by the `getblockchaininfo` RPC.
1295 ReadRequest::UsageInfo => {
1296 let db = self.db.clone();
1297
1298 tokio::task::spawn_blocking(move || {
1299 span.in_scope(move || {
1300 // The work is done in the future.
1301
1302 let db_size = db.size();
1303
1304 timer.finish(module_path!(), line!(), "ReadRequest::UsageInfo");
1305
1306 Ok(ReadResponse::UsageInfo(db_size))
1307 })
1308 })
1309 .wait_for_panics()
1310 }
1311
1312 // Used by the StateService.
1313 ReadRequest::Tip => {
1314 let state = self.clone();
1315
1316 tokio::task::spawn_blocking(move || {
1317 span.in_scope(move || {
1318 let tip = state.non_finalized_state_receiver.with_watch_data(
1319 |non_finalized_state| {
1320 read::tip(non_finalized_state.best_chain(), &state.db)
1321 },
1322 );
1323
1324 // The work is done in the future.
1325 timer.finish(module_path!(), line!(), "ReadRequest::Tip");
1326
1327 Ok(ReadResponse::Tip(tip))
1328 })
1329 })
1330 .wait_for_panics()
1331 }
1332
1333 // Used by `getblockchaininfo` RPC method.
1334 ReadRequest::TipPoolValues => {
1335 let state = self.clone();
1336
1337 tokio::task::spawn_blocking(move || {
1338 span.in_scope(move || {
1339 let tip_with_value_balance = state
1340 .non_finalized_state_receiver
1341 .with_watch_data(|non_finalized_state| {
1342 read::tip_with_value_balance(
1343 non_finalized_state.best_chain(),
1344 &state.db,
1345 )
1346 });
1347
1348 // The work is done in the future.
1349 // TODO: Do this in the Drop impl with the variant name?
1350 timer.finish(module_path!(), line!(), "ReadRequest::TipPoolValues");
1351
1352 let (tip_height, tip_hash, value_balance) = tip_with_value_balance?
1353 .ok_or(BoxError::from("no chain tip available yet"))?;
1354
1355 Ok(ReadResponse::TipPoolValues {
1356 tip_height,
1357 tip_hash,
1358 value_balance,
1359 })
1360 })
1361 })
1362 .wait_for_panics()
1363 }
1364
1365 // Used by getblock
1366 ReadRequest::BlockInfo(hash_or_height) => {
1367 let state = self.clone();
1368
1369 tokio::task::spawn_blocking(move || {
1370 span.in_scope(move || {
1371 let value_balance = state.non_finalized_state_receiver.with_watch_data(
1372 |non_finalized_state| {
1373 read::block_info(
1374 non_finalized_state.best_chain(),
1375 &state.db,
1376 hash_or_height,
1377 )
1378 },
1379 );
1380
1381 // The work is done in the future.
1382 // TODO: Do this in the Drop impl with the variant name?
1383 timer.finish(module_path!(), line!(), "ReadRequest::BlockInfo");
1384
1385 Ok(ReadResponse::BlockInfo(value_balance))
1386 })
1387 })
1388 .wait_for_panics()
1389 }
1390
1391 // Used by the StateService.
1392 ReadRequest::Depth(hash) => {
1393 let state = self.clone();
1394
1395 tokio::task::spawn_blocking(move || {
1396 span.in_scope(move || {
1397 let depth = state.non_finalized_state_receiver.with_watch_data(
1398 |non_finalized_state| {
1399 read::depth(non_finalized_state.best_chain(), &state.db, hash)
1400 },
1401 );
1402
1403 // The work is done in the future.
1404 timer.finish(module_path!(), line!(), "ReadRequest::Depth");
1405
1406 Ok(ReadResponse::Depth(depth))
1407 })
1408 })
1409 .wait_for_panics()
1410 }
1411
1412 // Used by the StateService.
1413 ReadRequest::BestChainNextMedianTimePast => {
1414 let state = self.clone();
1415
1416 tokio::task::spawn_blocking(move || {
1417 span.in_scope(move || {
1418 let non_finalized_state = state.latest_non_finalized_state();
1419 let median_time_past =
1420 read::next_median_time_past(&non_finalized_state, &state.db);
1421
1422 // The work is done in the future.
1423 timer.finish(
1424 module_path!(),
1425 line!(),
1426 "ReadRequest::BestChainNextMedianTimePast",
1427 );
1428
1429 Ok(ReadResponse::BestChainNextMedianTimePast(median_time_past?))
1430 })
1431 })
1432 .wait_for_panics()
1433 }
1434
1435 // Used by the get_block (raw) RPC and the StateService.
1436 ReadRequest::Block(hash_or_height) => {
1437 let state = self.clone();
1438
1439 tokio::task::spawn_blocking(move || {
1440 span.in_scope(move || {
1441 let block = state.non_finalized_state_receiver.with_watch_data(
1442 |non_finalized_state| {
1443 read::block(
1444 non_finalized_state.best_chain(),
1445 &state.db,
1446 hash_or_height,
1447 )
1448 },
1449 );
1450
1451 // The work is done in the future.
1452 timer.finish(module_path!(), line!(), "ReadRequest::Block");
1453
1454 Ok(ReadResponse::Block(block))
1455 })
1456 })
1457 .wait_for_panics()
1458 }
1459
1460 // Used by the get_block (raw) RPC and the StateService.
1461 ReadRequest::BlockAndSize(hash_or_height) => {
1462 let state = self.clone();
1463
1464 tokio::task::spawn_blocking(move || {
1465 span.in_scope(move || {
1466 let block_and_size = state.non_finalized_state_receiver.with_watch_data(
1467 |non_finalized_state| {
1468 read::block_and_size(
1469 non_finalized_state.best_chain(),
1470 &state.db,
1471 hash_or_height,
1472 )
1473 },
1474 );
1475
1476 // The work is done in the future.
1477 timer.finish(module_path!(), line!(), "ReadRequest::BlockAndSize");
1478
1479 Ok(ReadResponse::BlockAndSize(block_and_size))
1480 })
1481 })
1482 .wait_for_panics()
1483 }
1484
1485 // Used by the get_block (verbose) RPC and the StateService.
1486 ReadRequest::BlockHeader(hash_or_height) => {
1487 let state = self.clone();
1488
1489 tokio::task::spawn_blocking(move || {
1490 span.in_scope(move || {
1491 let best_chain = state.latest_best_chain();
1492
1493 let height = hash_or_height
1494 .height_or_else(|hash| {
1495 read::find::height_by_hash(best_chain.clone(), &state.db, hash)
1496 })
1497 .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1498
1499 let hash = hash_or_height
1500 .hash_or_else(|height| {
1501 read::find::hash_by_height(best_chain.clone(), &state.db, height)
1502 })
1503 .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1504
1505 let next_height = height.next()?;
1506 let next_block_hash =
1507 read::find::hash_by_height(best_chain.clone(), &state.db, next_height);
1508
1509 let header = read::block_header(best_chain, &state.db, height.into())
1510 .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1511
1512 // The work is done in the future.
1513 timer.finish(module_path!(), line!(), "ReadRequest::Block");
1514
1515 Ok(ReadResponse::BlockHeader {
1516 header,
1517 hash,
1518 height,
1519 next_block_hash,
1520 })
1521 })
1522 })
1523 .wait_for_panics()
1524 }
1525
1526 // For the get_raw_transaction RPC and the StateService.
1527 ReadRequest::Transaction(hash) => {
1528 let state = self.clone();
1529
1530 tokio::task::spawn_blocking(move || {
1531 span.in_scope(move || {
1532 let response =
1533 read::mined_transaction(state.latest_best_chain(), &state.db, hash);
1534
1535 // The work is done in the future.
1536 timer.finish(module_path!(), line!(), "ReadRequest::Transaction");
1537
1538 Ok(ReadResponse::Transaction(response))
1539 })
1540 })
1541 .wait_for_panics()
1542 }
1543
1544 ReadRequest::AnyChainTransaction(hash) => {
1545 let state = self.clone();
1546
1547 tokio::task::spawn_blocking(move || {
1548 span.in_scope(move || {
1549 let tx = state.non_finalized_state_receiver.with_watch_data(
1550 |non_finalized_state| {
1551 read::any_transaction(
1552 non_finalized_state.chain_iter(),
1553 &state.db,
1554 hash,
1555 )
1556 },
1557 );
1558
1559 // The work is done in the future.
1560 timer.finish(module_path!(), line!(), "ReadRequest::AnyChainTransaction");
1561
1562 Ok(ReadResponse::AnyChainTransaction(tx))
1563 })
1564 })
1565 .wait_for_panics()
1566 }
1567
            // Used by the get_block (verbose) RPC.
            ReadRequest::TransactionIdsForBlock(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let transaction_ids = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::transaction_hashes_for_block(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::TransactionIdsForBlock",
                        );

                        Ok(ReadResponse::TransactionIdsForBlock(transaction_ids))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::AnyChainTransactionIdsForBlock(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let transaction_ids = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::transaction_hashes_for_any_block(
                                    non_finalized_state.chain_iter(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::AnyChainTransactionIdsForBlock",
                        );

                        Ok(ReadResponse::AnyChainTransactionIdsForBlock(
                            transaction_ids,
                        ))
                    })
                })
                .wait_for_panics()
            }

            #[cfg(feature = "indexer")]
            ReadRequest::SpendingTransactionId(spend) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let spending_transaction_id = state
                            .non_finalized_state_receiver
                            .with_watch_data(|non_finalized_state| {
                                read::spending_transaction_hash(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    spend,
                                )
                            });

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::SpendingTransactionId",
                        );

                        Ok(ReadResponse::TransactionId(spending_transaction_id))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::UnspentBestChainUtxo(outpoint) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let utxo = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::unspent_utxo(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    outpoint,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::UnspentBestChainUtxo");

                        Ok(ReadResponse::UnspentBestChainUtxo(utxo))
                    })
                })
                .wait_for_panics()
            }

            // Manually used by the StateService to implement part of AwaitUtxo.
            ReadRequest::AnyChainUtxo(outpoint) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let utxo = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::any_utxo(non_finalized_state, &state.db, outpoint)
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::AnyChainUtxo");

                        Ok(ReadResponse::AnyChainUtxo(utxo))
                    })
                })
                .wait_for_panics()
            }

            // Used by the StateService.
            ReadRequest::BlockLocator => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let block_locator = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::block_locator(non_finalized_state.best_chain(), &state.db)
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::BlockLocator");

                        Ok(ReadResponse::BlockLocator(
                            block_locator.unwrap_or_default(),
                        ))
                    })
                })
                .wait_for_panics()
            }

            // Used by the StateService.
            ReadRequest::FindBlockHashes { known_blocks, stop } => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let block_hashes = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::find_chain_hashes(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    known_blocks,
                                    stop,
                                    MAX_FIND_BLOCK_HASHES_RESULTS,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHashes");

                        Ok(ReadResponse::BlockHashes(block_hashes))
                    })
                })
                .wait_for_panics()
            }

            // Used by the StateService.
            ReadRequest::FindBlockHeaders { known_blocks, stop } => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let block_headers = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::find_chain_headers(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    known_blocks,
                                    stop,
                                    MAX_FIND_BLOCK_HEADERS_RESULTS,
                                )
                            },
                        );

                        let block_headers = block_headers
                            .into_iter()
                            .map(|header| CountedHeader { header })
                            .collect();

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHeaders");

                        Ok(ReadResponse::BlockHeaders(block_headers))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::SaplingTree(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let sapling_tree = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::sapling_tree(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::SaplingTree");

                        Ok(ReadResponse::SaplingTree(sapling_tree))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::OrchardTree(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let orchard_tree = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::orchard_tree(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::OrchardTree");

                        Ok(ReadResponse::OrchardTree(orchard_tree))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::SaplingSubtrees { start_index, limit } => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let end_index = limit
                            .and_then(|limit| start_index.0.checked_add(limit.0))
                            .map(NoteCommitmentSubtreeIndex);
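                        // For example (hypothetical values): `start_index = 5` with `limit = 3`
                        // gives the bounded range `5..8` below. If the checked add overflows,
                        // `end_index` is `None`, and the unbounded `start_index..` branch runs.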

                        let sapling_subtrees = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                if let Some(end_index) = end_index {
                                    read::sapling_subtrees(
                                        non_finalized_state.best_chain(),
                                        &state.db,
                                        start_index..end_index,
                                    )
                                } else {
                                    // If there is no end bound, or the end bound would overflow,
                                    // just return all the trees. That matches `zcashd`, which
                                    // never calculates an end bound, and just keeps iterating
                                    // until the trees run out.
                                    read::sapling_subtrees(
                                        non_finalized_state.best_chain(),
                                        &state.db,
                                        start_index..,
                                    )
                                }
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::SaplingSubtrees");

                        Ok(ReadResponse::SaplingSubtrees(sapling_subtrees))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::OrchardSubtrees { start_index, limit } => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let end_index = limit
                            .and_then(|limit| start_index.0.checked_add(limit.0))
                            .map(NoteCommitmentSubtreeIndex);

                        let orchard_subtrees = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                if let Some(end_index) = end_index {
                                    read::orchard_subtrees(
                                        non_finalized_state.best_chain(),
                                        &state.db,
                                        start_index..end_index,
                                    )
                                } else {
                                    // If there is no end bound, or the end bound would overflow,
                                    // just return all the trees. That matches `zcashd`, which
                                    // never calculates an end bound, and just keeps iterating
                                    // until the trees run out.
                                    read::orchard_subtrees(
                                        non_finalized_state.best_chain(),
                                        &state.db,
                                        start_index..,
                                    )
                                }
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::OrchardSubtrees");

                        Ok(ReadResponse::OrchardSubtrees(orchard_subtrees))
                    })
                })
                .wait_for_panics()
            }

            // For the get_address_balance RPC.
            ReadRequest::AddressBalance(addresses) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let (balance, received) = state
                            .non_finalized_state_receiver
                            .with_watch_data(|non_finalized_state| {
                                read::transparent_balance(
                                    non_finalized_state.best_chain().cloned(),
                                    &state.db,
                                    addresses,
                                )
                            })?;

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::AddressBalance");

                        Ok(ReadResponse::AddressBalance { balance, received })
                    })
                })
                .wait_for_panics()
            }

            // For the get_address_tx_ids RPC.
            ReadRequest::TransactionIdsByAddresses {
                addresses,
                height_range,
            } => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let tx_ids = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::transparent_tx_ids(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    addresses,
                                    height_range,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::TransactionIdsByAddresses",
                        );

                        tx_ids.map(ReadResponse::AddressesTransactionIds)
                    })
                })
                .wait_for_panics()
            }

            // For the get_address_utxos RPC.
            ReadRequest::UtxosByAddresses(addresses) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let utxos = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::address_utxos(
                                    &state.network,
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    addresses,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::UtxosByAddresses");

                        utxos.map(ReadResponse::AddressUtxos)
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::CheckBestChainTipNullifiersAndAnchors(unmined_tx) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let latest_non_finalized_best_chain =
                            state.latest_non_finalized_state().best_chain().cloned();

                        check::nullifier::tx_no_duplicates_in_chain(
                            &state.db,
                            latest_non_finalized_best_chain.as_ref(),
                            &unmined_tx.transaction,
                        )?;

                        check::anchors::tx_anchors_refer_to_final_treestates(
                            &state.db,
                            latest_non_finalized_best_chain.as_ref(),
                            &unmined_tx,
                        )?;

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::CheckBestChainTipNullifiersAndAnchors",
                        );

                        Ok(ReadResponse::ValidBestChainTipNullifiersAndAnchors)
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block and get_block_hash RPCs.
            ReadRequest::BestChainBlockHash(height) => {
                let state = self.clone();

                // # Performance
                //
                // Allow other async tasks to make progress while concurrently reading blocks from disk.

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let hash = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::hash_by_height(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::BestChainBlockHash");

                        Ok(ReadResponse::BlockHash(hash))
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block_template and get_blockchain_info RPCs.
            ReadRequest::ChainInfo => {
                let state = self.clone();
                let latest_non_finalized_state = self.latest_non_finalized_state();

                // # Performance
                //
                // Allow other async tasks to make progress while concurrently reading blocks from disk.

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        // # Correctness
                        //
                        // It is ok to do these lookups using multiple database calls. Finalized state updates
                        // can only add overlapping blocks, and block hashes are unique across all chain forks.
                        //
                        // If there is a large overlap between the non-finalized and finalized states,
                        // where the finalized tip is above the non-finalized tip, then Zebra is either
                        // receiving a lot of blocks, or this request has been delayed for a long time.
                        //
                        // In that case, the `getblocktemplate` RPC will return an error because Zebra
                        // is not synced to the tip. That check happens before the RPC makes this request.
                        let get_block_template_info =
                            read::difficulty::get_block_template_chain_info(
                                &latest_non_finalized_state,
                                &state.db,
                                &state.network,
                            );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::ChainInfo");

                        get_block_template_info.map(ReadResponse::ChainInfo)
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_mining_info, get_network_sol_ps, and get_network_hash_ps RPCs.
            ReadRequest::SolutionRate { num_blocks, height } => {
                let state = self.clone();

                // # Performance
                //
                // Allow other async tasks to make progress while concurrently reading blocks from disk.

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let latest_non_finalized_state = state.latest_non_finalized_state();

                        // # Correctness
                        //
                        // It is ok to do these lookups using multiple database calls. Finalized state updates
                        // can only add overlapping blocks, and block hashes are unique across all chain forks.
                        //
                        // The worst that can happen here is that the default `start_hash` will be below
                        // the chain tip.
                        let (tip_height, tip_hash) =
                            match read::tip(latest_non_finalized_state.best_chain(), &state.db) {
                                Some(tip_hash) => tip_hash,
                                None => return Ok(ReadResponse::SolutionRate(None)),
                            };

                        let start_hash = match height {
                            Some(height) if height < tip_height => read::hash_by_height(
                                latest_non_finalized_state.best_chain(),
                                &state.db,
                                height,
                            ),
                            // Use the chain tip hash if the height is above it or not provided.
                            _ => Some(tip_hash),
                        };

                        let solution_rate = start_hash.and_then(|start_hash| {
                            read::difficulty::solution_rate(
                                &latest_non_finalized_state,
                                &state.db,
                                num_blocks,
                                start_hash,
                            )
                        });

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::SolutionRate");

                        Ok(ReadResponse::SolutionRate(solution_rate))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::CheckBlockProposalValidity(semantically_verified) => {
                let state = self.clone();

                // # Performance
                //
                // Allow other async tasks to make progress while concurrently reading blocks from disk.

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        tracing::debug!("attempting to validate and commit block proposal onto a cloned non-finalized state");
                        let mut latest_non_finalized_state = state.latest_non_finalized_state();

                        // The previous block of a valid proposal must be the current best chain tip.
                        let Some((_best_tip_height, best_tip_hash)) =
                            read::best_tip(&latest_non_finalized_state, &state.db)
                        else {
                            return Err("state is empty: wait for Zebra to sync before submitting a proposal".into());
                        };

                        if semantically_verified.block.header.previous_block_hash != best_tip_hash {
                            return Err("proposal is not based on the current best chain tip: previous block hash must be the best chain tip".into());
                        }

                        // This clone of the non-finalized state is dropped when this closure returns.
                        // The non-finalized state that's used in the rest of the state (including
                        // finalizing blocks into the db) is not mutated here.
                        //
                        // TODO: Convert `CommitSemanticallyVerifiedError` to a new `ValidateProposalError`?
                        latest_non_finalized_state.disable_metrics();

                        write::validate_and_commit_non_finalized(
                            &state.db,
                            &mut latest_non_finalized_state,
                            semantically_verified,
                        )?;

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::CheckBlockProposalValidity",
                        );

                        Ok(ReadResponse::ValidBlockProposal)
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::TipBlockSize => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        // Get the best chain tip height.
                        let tip_height = state
                            .non_finalized_state_receiver
                            .with_watch_data(|non_finalized_state| {
                                read::tip_height(non_finalized_state.best_chain(), &state.db)
                            })
                            .unwrap_or(Height(0));

                        // Get the block at the best chain tip height.
                        let block = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::block(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    tip_height.into(),
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::TipBlockSize");

                        // Respond with the serialized size of the block, if one was found.
                        match block {
                            Some(b) => Ok(ReadResponse::TipBlockSize(Some(
                                b.zcash_serialize_to_vec()?.len(),
                            ))),
                            None => Ok(ReadResponse::TipBlockSize(None)),
                        }
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::NonFinalizedBlocksListener => {
                // The non-finalized blocks listener is used to notify the state service
                // about new blocks that have been added to the non-finalized state.
                let non_finalized_blocks_listener = NonFinalizedBlocksListener::spawn(
                    self.network.clone(),
                    self.non_finalized_state_receiver.clone(),
                );

                async move {
                    timer.finish(
                        module_path!(),
                        line!(),
                        "ReadRequest::NonFinalizedBlocksListener",
                    );

                    Ok(ReadResponse::NonFinalizedBlocksListener(
                        non_finalized_blocks_listener,
                    ))
                }
                .boxed()
            }
        }
    }
}

/// Initialize a state service from the provided [`Config`].
/// Returns a boxed state service, a read-only state service,
/// and receivers for state chain tip updates.
///
/// Each `network` has its own separate on-disk database.
///
/// The state uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
/// to work out when it is near the final checkpoint.
///
/// To share access to the state, wrap the returned service in a `Buffer`,
/// or clone the returned [`ReadStateService`].
///
/// It's possible to construct multiple state services in the same application (as
/// long as they, e.g., use different storage locations), but doing so is
/// probably not what you want.
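///
/// # Example
///
/// A minimal startup sketch (not compiled as a doc test), assuming a Tokio
/// runtime, a default `Config`, and placeholder checkpoint arguments:
///
/// ```ignore
/// let (state, read_state, latest_chain_tip, chain_tip_change) =
///     init(Config::default(), &Network::Mainnet, block::Height::MAX, 0).await;
///
/// // Share the read-write service by wrapping it in a buffer,
/// // and the read-only service by cloning it.
/// let state = tower::buffer::Buffer::new(state, 10);
/// let read_state_2 = read_state.clone();
/// ```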
pub async fn init(
    config: Config,
    network: &Network,
    max_checkpoint_height: block::Height,
    checkpoint_verify_concurrency_limit: usize,
) -> (
    BoxService<Request, Response, BoxError>,
    ReadStateService,
    LatestChainTip,
    ChainTipChange,
) {
    let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
        StateService::new(
            config,
            network,
            max_checkpoint_height,
            checkpoint_verify_concurrency_limit,
        )
        .await;

    (
        BoxService::new(state_service),
        read_only_state_service,
        latest_chain_tip,
        chain_tip_change,
    )
}

/// Initialize a read state service from the provided [`Config`].
/// Returns a read-only state service, the finalized state database,
/// and a sender for updating the non-finalized state.
///
/// Each `network` has its own separate on-disk database.
///
/// To share access to the state, clone the returned [`ReadStateService`].
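///
/// # Example
///
/// A minimal sketch (not compiled as a doc test), assuming a default `Config`:
///
/// ```ignore
/// let (read_state, db, non_finalized_state_sender) =
///     init_read_only(Config::default(), &Network::Mainnet);
///
/// // Reads reflect the finalized database, plus whatever `NonFinalizedState`
/// // is sent through `non_finalized_state_sender`.
/// non_finalized_state_sender.send(NonFinalizedState::new(&Network::Mainnet))?;
/// ```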
pub fn init_read_only(
    config: Config,
    network: &Network,
) -> (
    ReadStateService,
    ZebraDb,
    tokio::sync::watch::Sender<NonFinalizedState>,
) {
    let finalized_state = FinalizedState::new_with_debug(
        &config,
        network,
        true,
        #[cfg(feature = "elasticsearch")]
        false,
        true,
    );
    let (non_finalized_state_sender, non_finalized_state_receiver) =
        tokio::sync::watch::channel(NonFinalizedState::new(network));

    (
        ReadStateService::new(
            &finalized_state,
            None,
            WatchReceiver::new(non_finalized_state_receiver),
        ),
        finalized_state.db.clone(),
        non_finalized_state_sender,
    )
}

/// Calls [`init_read_only`] with the provided [`Config`] and [`Network`] from a blocking task.
/// Returns a [`tokio::task::JoinHandle`] with a read state service, the finalized state
/// database, and a sender for the non-finalized state.
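///
/// # Example
///
/// A minimal sketch (not compiled as a doc test), awaiting the blocking task
/// from inside a Tokio runtime:
///
/// ```ignore
/// let (read_state, db, non_finalized_state_sender) =
///     spawn_init_read_only(Config::default(), &Network::Mainnet).await?;
/// ```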
pub fn spawn_init_read_only(
    config: Config,
    network: &Network,
) -> tokio::task::JoinHandle<(
    ReadStateService,
    ZebraDb,
    tokio::sync::watch::Sender<NonFinalizedState>,
)> {
    let network = network.clone();
    tokio::task::spawn_blocking(move || init_read_only(config, &network))
}

/// Returns a [`StateService`] with an ephemeral [`Config`] and a buffer with a single slot.
///
/// This can be used to create a state service for testing. See also [`init`].
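///
/// # Example
///
/// A minimal test sketch (not compiled as a doc test), assuming a Tokio test runtime:
///
/// ```ignore
/// // `ready` comes from `tower::ServiceExt`.
/// let mut state = init_test(&Network::Mainnet).await;
/// let response = state.ready().await?.call(Request::Tip).await?;
/// ```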
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test(
    network: &Network,
) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
    // if we ever need to test final checkpoint sent UTXO queries
    let (state_service, _, _, _) =
        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;

    Buffer::new(BoxService::new(state_service), 1)
}

/// Initializes a state service with an ephemeral [`Config`] and a buffer with a single slot,
/// then returns the read-write service, read-only service, and tip watch channels.
///
/// This can be used to create a state service for testing. See also [`init`].
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test_services(
    network: &Network,
) -> (
    Buffer<BoxService<Request, Response, BoxError>, Request>,
    ReadStateService,
    LatestChainTip,
    ChainTipChange,
) {
    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
    // if we ever need to test final checkpoint sent UTXO queries
    let (state_service, read_state_service, latest_chain_tip, chain_tip_change) =
        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;

    let state_service = Buffer::new(BoxService::new(state_service), 1);

    (
        state_service,
        read_state_service,
        latest_chain_tip,
        chain_tip_change,
    )
}