zebra_state/service.rs
//! [`tower::Service`]s for Zebra's cached chain state.
//!
//! Zebra provides cached state access via two main services:
//! - [`StateService`]: a read-write service that writes blocks to the state,
//!   and redirects most read requests to the [`ReadStateService`].
//! - [`ReadStateService`]: a read-only service that answers from the most
//!   recent committed block.
//!
//! Most users should prefer [`ReadStateService`], unless they need to write blocks to the state.
//!
//! Zebra also provides access to the best chain tip via:
//! - [`LatestChainTip`]: a read-only channel that contains the latest committed
//!   tip.
//! - [`ChainTipChange`]: a read-only channel that can asynchronously await
//!   chain tip changes.
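//!
//! # Example
//!
//! A minimal sketch of starting the state services and reading the cached tip,
//! assuming the caller already has a state `config`, a `network`, and the
//! checkpoint parameters used by [`StateService::new`]:
//!
//! ```ignore
//! use zebra_chain::chain_tip::ChainTip;
//!
//! let (state, read_state, latest_chain_tip, chain_tip_change) =
//!     StateService::new(config, &network, max_checkpoint_height, checkpoint_limit).await;
//!
//! // `LatestChainTip` reads the committed tip directly, without queueing a request.
//! let tip_height = latest_chain_tip.best_tip_height();
//! ```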

use std::{
    collections::HashMap,
    convert,
    future::Future,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
    time::{Duration, Instant},
};

use futures::future::FutureExt;
use tokio::sync::oneshot;
use tower::{util::BoxService, Service, ServiceExt};
use tracing::{instrument, Instrument, Span};

#[cfg(any(test, feature = "proptest-impl"))]
use tower::buffer::Buffer;

use zebra_chain::{
    block::{self, CountedHeader, HeightDiff},
    diagnostic::{task::WaitForPanics, CodeTimer},
    parameters::{Network, NetworkUpgrade},
    subtree::NoteCommitmentSubtreeIndex,
};

use zebra_chain::{block::Height, serialization::ZcashSerialize};

use crate::{
    constants::{
        MAX_FIND_BLOCK_HASHES_RESULTS, MAX_FIND_BLOCK_HEADERS_RESULTS, MAX_LEGACY_CHAIN_BLOCKS,
    },
    error::{InvalidateError, QueueAndCommitError, ReconsiderError},
    response::NonFinalizedBlocksListener,
    service::{
        block_iter::any_ancestor_blocks,
        chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
        finalized_state::{FinalizedState, ZebraDb},
        non_finalized_state::{Chain, NonFinalizedState},
        pending_utxos::PendingUtxos,
        queued_blocks::QueuedBlocks,
        watch_receiver::WatchReceiver,
    },
    BoxError, CheckpointVerifiedBlock, CommitSemanticallyVerifiedError, Config, ReadRequest,
    ReadResponse, Request, Response, SemanticallyVerifiedBlock,
};

pub mod block_iter;
pub mod chain_tip;
pub mod watch_receiver;

pub mod check;

pub(crate) mod finalized_state;
pub(crate) mod non_finalized_state;
mod pending_utxos;
mod queued_blocks;
pub(crate) mod read;
mod write;

#[cfg(any(test, feature = "proptest-impl"))]
pub mod arbitrary;

#[cfg(test)]
mod tests;

pub use finalized_state::{OutputLocation, TransactionIndex, TransactionLocation};
use write::NonFinalizedWriteMessage;

use self::queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified, SentHashes};

/// A read-write service for Zebra's cached blockchain state.
///
/// This service modifies and provides access to:
/// - the non-finalized state: the ~100 most recent blocks.
///   Zebra allows chain forks in the non-finalized state,
///   stores it in memory, and re-downloads it when restarted.
/// - the finalized state: older blocks that have many confirmations.
///   Zebra stores the single best chain in the finalized state,
///   and re-loads it from disk when restarted.
///
/// Read requests to this service are buffered, then processed concurrently.
/// Block write requests are buffered, then queued, then processed in order by a separate task.
///
/// Most state users can get faster read responses using the [`ReadStateService`],
/// because its requests do not share a [`tower::buffer::Buffer`] with block write requests.
///
/// To quickly get the latest block, use [`LatestChainTip`] or [`ChainTipChange`].
/// They can read the latest block directly, without queueing any requests.
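///
/// # Example
///
/// A minimal sketch of committing a semantically verified block through the
/// [`tower::Service`] interface; `state` and `block` are assumed to be a running
/// [`StateService`] and a [`SemanticallyVerifiedBlock`]:
///
/// ```ignore
/// use tower::{Service, ServiceExt};
///
/// let response = state
///     .ready()
///     .await?
///     .call(Request::CommitSemanticallyVerifiedBlock(block))
///     .await?;
/// assert!(matches!(response, Response::Committed(_)));
/// ```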
#[derive(Debug)]
pub(crate) struct StateService {
    // Configuration
    //
    /// The configured Zcash network.
    network: Network,

    /// The height that we start storing UTXOs from finalized blocks.
    ///
    /// This height should be lower than the last few checkpoints,
    /// so the full verifier can verify UTXO spends from those blocks,
    /// even if they haven't been committed to the finalized state yet.
    full_verifier_utxo_lookahead: block::Height,

    // Queued Blocks
    //
    /// Queued blocks for the [`NonFinalizedState`] that arrived out of order.
    /// These blocks are awaiting their parent blocks before they can do contextual verification.
    non_finalized_state_queued_blocks: QueuedBlocks,

    /// Queued blocks for the [`FinalizedState`] that arrived out of order.
    /// These blocks are awaiting their parent blocks before they can do contextual verification.
    ///
    /// Indexed by their parent block hash.
    finalized_state_queued_blocks: HashMap<block::Hash, QueuedCheckpointVerified>,

    /// Channels to send blocks to the block write task.
    block_write_sender: write::BlockWriteSender,

    /// The [`block::Hash`] of the most recent block sent on
    /// `finalized_block_write_sender` or `non_finalized_block_write_sender`.
    ///
    /// On startup, this is:
    /// - the finalized tip, if there are stored blocks, or
    /// - the genesis block's parent hash, if the database is empty.
    ///
    /// If `invalid_block_write_reset_receiver` gets a reset, this is:
    /// - the hash of the last valid committed block (the parent of the invalid block).
    finalized_block_write_last_sent_hash: block::Hash,

    /// A set of block hashes that have been sent to the block write task.
    /// Hashes of blocks below the finalized tip height are periodically pruned.
    non_finalized_block_write_sent_hashes: SentHashes,

    /// If an invalid block is sent on `finalized_block_write_sender`
    /// or `non_finalized_block_write_sender`,
    /// this channel gets the [`block::Hash`] of the valid tip.
    //
    // TODO: add tests for finalized and non-finalized resets (#2654)
    invalid_block_write_reset_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,

    // Pending UTXO Request Tracking
    //
    /// The set of outpoints with pending requests for their associated transparent::Output.
    pending_utxos: PendingUtxos,

    /// Instant tracking the last time `pending_utxos` was pruned.
    last_prune: Instant,

    // Updating Concurrently Readable State
    //
    /// A cloneable [`ReadStateService`], used to answer concurrent read requests.
    ///
    /// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`.
    read_service: ReadStateService,

    // Metrics
    //
    /// A metric tracking the maximum height that's currently in `finalized_state_queued_blocks`
    ///
    /// Set to `f64::NAN` if `finalized_state_queued_blocks` is empty, because grafana shows NaNs
    /// as a break in the graph.
    max_finalized_queue_height: f64,
}

/// A read-only service for accessing Zebra's cached blockchain state.
///
/// This service provides read-only access to:
/// - the non-finalized state: the ~100 most recent blocks.
/// - the finalized state: older blocks that have many confirmations.
///
/// Requests to this service are processed in parallel,
/// ignoring any blocks queued by the read-write [`StateService`].
///
/// This quick response behavior is better for most state users.
/// It allows other async tasks to make progress while concurrently reading data from disk.
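///
/// # Example
///
/// A minimal sketch of a concurrent read; `read_state` is assumed to be a clone of
/// a running [`ReadStateService`]:
///
/// ```ignore
/// use tower::ServiceExt;
///
/// // Each clone answers read requests in parallel with block writes.
/// if let ReadResponse::Tip(Some((height, hash))) = read_state.oneshot(ReadRequest::Tip).await? {
///     tracing::info!(?height, ?hash, "current best chain tip");
/// }
/// ```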
#[derive(Clone, Debug)]
pub struct ReadStateService {
    // Configuration
    //
    /// The configured Zcash network.
    network: Network,

    // Shared Concurrently Readable State
    //
    /// A watch channel with a cached copy of the [`NonFinalizedState`].
    ///
    /// This state is only updated between requests,
    /// so it might include some block data that is also on `disk`.
    non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,

    /// The shared inner on-disk database for the finalized state.
    ///
    /// RocksDB allows reads and writes via a shared reference,
    /// but [`ZebraDb`] doesn't expose any write methods or types.
    ///
    /// This chain is updated concurrently with requests,
    /// so it might include some block data that is also in `best_mem`.
    db: ZebraDb,

    /// A shared handle to a task that writes blocks to the [`NonFinalizedState`] or [`FinalizedState`],
    /// once the queues have received all their parent blocks.
    ///
    /// Used to check for panics when writing blocks.
    block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
}

impl Drop for StateService {
    fn drop(&mut self) {
        // The state service owns the state, tasks, and channels,
        // so dropping it should shut down everything.

        // Close the channels (non-blocking)
        // This makes the block write thread exit the next time it checks the channels.
        // We want to do this here so we get any errors or panics from the block write task before it shuts down.
        self.invalid_block_write_reset_receiver.close();

        std::mem::drop(self.block_write_sender.finalized.take());
        std::mem::drop(self.block_write_sender.non_finalized.take());

        self.clear_finalized_block_queue(
            "dropping the state: dropped unused finalized state queue block",
        );
        self.clear_non_finalized_block_queue(CommitSemanticallyVerifiedError::WriteTaskExited);

        // Log database metrics before shutting down
        info!("dropping the state: logging database metrics");
        self.log_db_metrics();

        // Then drop self.read_service, which checks the block write task for panics,
        // and tries to shut down the database.
    }
}

impl Drop for ReadStateService {
    fn drop(&mut self) {
        // The read state service shares the state,
        // so dropping it should check if we can shut down.

        // TODO: move this into a try_shutdown() method
        if let Some(block_write_task) = self.block_write_task.take() {
            if let Some(block_write_task_handle) = Arc::into_inner(block_write_task) {
                // We're the last database user, so we can tell it to shut down (blocking):
                // - flushes the database to disk, and
                // - drops the database, which cleans up any database tasks correctly.
                self.db.shutdown(true);

                // We are the last state with a reference to this thread, so we can
                // wait until the block write task finishes, then check for panics (blocking).
                // (We'd also like to abort the thread, but std::thread::JoinHandle can't do that.)

                // This log is verbose during tests.
                #[cfg(not(test))]
                info!("waiting for the block write task to finish");
                #[cfg(test)]
                debug!("waiting for the block write task to finish");

                // TODO: move this into a check_for_panics() method
                if let Err(thread_panic) = block_write_task_handle.join() {
                    std::panic::resume_unwind(thread_panic);
                } else {
                    debug!("shutting down the state because the block write task has finished");
                }
            }
        } else {
            // Even if we're not the last database user, try shutting it down.
            //
            // TODO: rename this to try_shutdown()?
            self.db.shutdown(false);
        }
    }
}

impl StateService {
    const PRUNE_INTERVAL: Duration = Duration::from_secs(30);

    /// Creates a new state service for the state `config` and `network`.
    ///
    /// Uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
    /// to work out when it is near the final checkpoint.
    ///
    /// Returns the read-write and read-only state services,
    /// and read-only watch channels for its best chain tip.
    pub async fn new(
        config: Config,
        network: &Network,
        max_checkpoint_height: block::Height,
        checkpoint_verify_concurrency_limit: usize,
    ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
        let (finalized_state, finalized_tip, timer) = {
            let config = config.clone();
            let network = network.clone();
            tokio::task::spawn_blocking(move || {
                let timer = CodeTimer::start();
                let finalized_state = FinalizedState::new(
                    &config,
                    &network,
                    #[cfg(feature = "elasticsearch")]
                    true,
                );
                timer.finish(module_path!(), line!(), "opening finalized state database");

                let timer = CodeTimer::start();
                let finalized_tip = finalized_state.db.tip_block();

                (finalized_state, finalized_tip, timer)
            })
            .await
            .expect("failed to join blocking task")
        };

        // # Correctness
        //
        // If the restored non-finalized state contains blocks above the max checkpoint
        // height, the state service must set the finalized block write sender to `None`
        // so that non-finalized blocks can be written. Otherwise, Zebra will be unable
        // to commit semantically verified blocks, and its chain sync will stall.
        //
        // If the restored non-finalized state contains no blocks above the max checkpoint
        // height, the state service must leave the finalized block write sender in place.
        // Otherwise, unless checkpoint sync is disabled in the zebra-consensus
        // configuration, Zebra will be unable to commit checkpoint verified blocks,
        // and its chain sync will stall.
        let is_finalized_tip_past_max_checkpoint = if let Some(tip) = &finalized_tip {
            tip.coinbase_height().expect("valid block must have height") >= max_checkpoint_height
        } else {
            false
        };
        let (non_finalized_state, non_finalized_state_sender, non_finalized_state_receiver) =
            NonFinalizedState::new(network)
                .with_backup(
                    config.non_finalized_state_backup_dir(network),
                    &finalized_state.db,
                    is_finalized_tip_past_max_checkpoint,
                )
                .await;

        let non_finalized_block_write_sent_hashes = SentHashes::new(&non_finalized_state);
        let initial_tip = non_finalized_state
            .best_tip_block()
            .map(|cv_block| cv_block.block.clone())
            .or(finalized_tip)
            .map(CheckpointVerifiedBlock::from)
            .map(ChainTipBlock::from);

        tracing::info!(chain_tip = ?initial_tip.as_ref().map(|tip| (tip.hash, tip.height)), "loaded Zebra state cache");

        let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
            ChainTipSender::new(initial_tip, network);

        let finalized_state_for_writing = finalized_state.clone();
        let should_use_finalized_block_write_sender = non_finalized_state.is_chain_set_empty();
        let (block_write_sender, invalid_block_write_reset_receiver, block_write_task) =
            write::BlockWriteSender::spawn(
                finalized_state_for_writing,
                non_finalized_state,
                chain_tip_sender,
                non_finalized_state_sender,
                should_use_finalized_block_write_sender,
            );

        let read_service = ReadStateService::new(
            &finalized_state,
            block_write_task,
            non_finalized_state_receiver,
        );

        let full_verifier_utxo_lookahead = max_checkpoint_height
            - HeightDiff::try_from(checkpoint_verify_concurrency_limit)
                .expect("fits in HeightDiff");
        let full_verifier_utxo_lookahead =
            full_verifier_utxo_lookahead.unwrap_or(block::Height::MIN);
        let non_finalized_state_queued_blocks = QueuedBlocks::default();
        let pending_utxos = PendingUtxos::default();

        let finalized_block_write_last_sent_hash =
            tokio::task::spawn_blocking(move || finalized_state.db.finalized_tip_hash())
                .await
                .expect("failed to join blocking task");

        let state = Self {
            network: network.clone(),
            full_verifier_utxo_lookahead,
            non_finalized_state_queued_blocks,
            finalized_state_queued_blocks: HashMap::new(),
            block_write_sender,
            finalized_block_write_last_sent_hash,
            non_finalized_block_write_sent_hashes,
            invalid_block_write_reset_receiver,
            pending_utxos,
            last_prune: Instant::now(),
            read_service: read_service.clone(),
            max_finalized_queue_height: f64::NAN,
        };
        timer.finish(module_path!(), line!(), "initializing state service");

        tracing::info!("starting legacy chain check");
        let timer = CodeTimer::start();

        if let (Some(tip), Some(nu5_activation_height)) = (
            {
                let read_state = state.read_service.clone();
                tokio::task::spawn_blocking(move || read_state.best_tip())
                    .await
                    .expect("task should not panic")
            },
            NetworkUpgrade::Nu5.activation_height(network),
        ) {
            if let Err(error) = check::legacy_chain(
                nu5_activation_height,
                any_ancestor_blocks(
                    &state.read_service.latest_non_finalized_state(),
                    &state.read_service.db,
                    tip.1,
                ),
                &state.network,
                MAX_LEGACY_CHAIN_BLOCKS,
            ) {
                let legacy_db_path = state.read_service.db.path().to_path_buf();
                panic!(
                    "Cached state contains a legacy chain.\n\
                    An outdated Zebra version did not know about a recent network upgrade,\n\
                    so it followed a legacy chain using outdated consensus branch rules.\n\
                    Hint: Delete your database, and restart Zebra to do a full sync.\n\
                    Database path: {legacy_db_path:?}\n\
                    Error: {error:?}",
                );
            }
        }

        tracing::info!("cached state consensus branch is valid: no legacy chain found");
        timer.finish(module_path!(), line!(), "legacy chain check");

        (state, read_service, latest_chain_tip, chain_tip_change)
    }

    /// Calls the read-only state service to log RocksDB database metrics.
    pub fn log_db_metrics(&self) {
        self.read_service.db.print_db_metrics();
    }

    /// Queue a checkpoint verified block for verification and storage in the finalized state.
    ///
    /// Returns a channel receiver that provides the result of the block commit.
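    ///
    /// A sketch of how a caller might consume the receiver; awaiting the channel
    /// yields a nested result, and a dropped channel means the write task exited
    /// (illustrative, not a doctest):
    ///
    /// ```ignore
    /// let rsp_rx = self.queue_and_commit_to_finalized_state(checkpoint_verified);
    /// let committed_hash = rsp_rx
    ///     .await
    ///     .map_err(|_recv_error| "block was dropped from the queue of finalized blocks")??;
    /// ```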
    fn queue_and_commit_to_finalized_state(
        &mut self,
        checkpoint_verified: CheckpointVerifiedBlock,
    ) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
        // # Correctness & Performance
        //
        // This method must not block, access the database, or perform CPU-intensive tasks,
        // because it is called directly from the tokio executor's Future threads.

        let queued_prev_hash = checkpoint_verified.block.header.previous_block_hash;
        let queued_height = checkpoint_verified.height;

        // If we're close to the final checkpoint, make the block's UTXOs available for
        // semantic block verification, even when it is in the channel.
        if self.is_close_to_final_checkpoint(queued_height) {
            self.non_finalized_block_write_sent_hashes
                .add_finalized(&checkpoint_verified)
        }

        let (rsp_tx, rsp_rx) = oneshot::channel();
        let queued = (checkpoint_verified, rsp_tx);

        if self.block_write_sender.finalized.is_some() {
            // We're still committing checkpoint verified blocks
            if let Some(duplicate_queued) = self
                .finalized_state_queued_blocks
                .insert(queued_prev_hash, queued)
            {
                Self::send_checkpoint_verified_block_error(
                    duplicate_queued,
                    "dropping older checkpoint verified block: got newer duplicate block",
                );
            }

            self.drain_finalized_queue_and_commit();
        } else {
            // We've finished committing checkpoint verified blocks to the finalized state,
            // so drop any repeated queued blocks, and return an error.
            //
            // TODO: track the latest sent height, and drop any blocks under that height
            //       every time we send some blocks (like QueuedSemanticallyVerifiedBlocks)
            Self::send_checkpoint_verified_block_error(
                queued,
                "already finished committing checkpoint verified blocks: dropped duplicate block, \
                 block is already committed to the state",
            );

            self.clear_finalized_block_queue(
                "already finished committing checkpoint verified blocks: dropped duplicate block, \
                 block is already committed to the state",
            );
        }

        if self.finalized_state_queued_blocks.is_empty() {
            self.max_finalized_queue_height = f64::NAN;
        } else if self.max_finalized_queue_height.is_nan()
            || self.max_finalized_queue_height < queued_height.0 as f64
        {
            // if there are still blocks in the queue, then either:
            // - the new block was lower than the old maximum, and there was a gap before it,
            //   so the maximum is still the same (and we skip this code), or
            // - the new block is higher than the old maximum, and there is at least one gap
            //   between the finalized tip and the new maximum
            self.max_finalized_queue_height = queued_height.0 as f64;
        }

        metrics::gauge!("state.checkpoint.queued.max.height").set(self.max_finalized_queue_height);
        metrics::gauge!("state.checkpoint.queued.block.count")
            .set(self.finalized_state_queued_blocks.len() as f64);

        rsp_rx
    }

    /// Finds finalized state queue blocks to be committed to the state in order,
    /// removes them from the queue, and sends them to the block commit task.
    ///
    /// After queueing a finalized block, this method checks whether the newly
    /// queued block (and any of its descendants) can be committed to the state.
    ///
    /// Sends an error on the queued blocks' result channels if the block commit channel
    /// has been closed.
    pub fn drain_finalized_queue_and_commit(&mut self) {
        use tokio::sync::mpsc::error::{SendError, TryRecvError};

        // # Correctness & Performance
        //
        // This method must not block, access the database, or perform CPU-intensive tasks,
        // because it is called directly from the tokio executor's Future threads.

        // If a block failed, we need to start again from a valid tip.
        match self.invalid_block_write_reset_receiver.try_recv() {
            Ok(reset_tip_hash) => self.finalized_block_write_last_sent_hash = reset_tip_hash,
            Err(TryRecvError::Disconnected) => {
                info!("Block commit task closed the block reset channel. Is Zebra shutting down?");
                return;
            }
            // There are no errors, so we can just use the last block hash we sent
            Err(TryRecvError::Empty) => {}
        }

        while let Some(queued_block) = self
            .finalized_state_queued_blocks
            .remove(&self.finalized_block_write_last_sent_hash)
        {
            let last_sent_finalized_block_height = queued_block.0.height;

            self.finalized_block_write_last_sent_hash = queued_block.0.hash;

            // If we've finished sending finalized blocks, ignore any repeated blocks.
            // (Blocks can be repeated after a syncer reset.)
            if let Some(finalized_block_write_sender) = &self.block_write_sender.finalized {
                let send_result = finalized_block_write_sender.send(queued_block);

                // If the receiver is closed, we can't send any more blocks.
                if let Err(SendError(queued)) = send_result {
                    // If Zebra is shutting down, drop blocks and return an error.
                    Self::send_checkpoint_verified_block_error(
                        queued,
                        "block commit task exited. Is Zebra shutting down?",
                    );

                    self.clear_finalized_block_queue(
                        "block commit task exited. Is Zebra shutting down?",
                    );
                } else {
                    metrics::gauge!("state.checkpoint.sent.block.height")
                        .set(last_sent_finalized_block_height.0 as f64);
                };
            }
        }
    }

    /// Drops all finalized state queue blocks, and sends an error on their result channels.
    fn clear_finalized_block_queue(&mut self, error: impl Into<BoxError> + Clone) {
        for (_hash, queued) in self.finalized_state_queued_blocks.drain() {
            Self::send_checkpoint_verified_block_error(queued, error.clone());
        }
    }

    /// Send an error on a `QueuedCheckpointVerified` block's result channel, and drop the block
    fn send_checkpoint_verified_block_error(
        queued: QueuedCheckpointVerified,
        error: impl Into<BoxError>,
    ) {
        let (finalized, rsp_tx) = queued;

        // The block sender might have already given up on this block,
        // so ignore any channel send errors.
        let _ = rsp_tx.send(Err(error.into()));
        std::mem::drop(finalized);
    }

    /// Drops all non-finalized state queue blocks, and sends an error on their result channels.
    fn clear_non_finalized_block_queue(&mut self, error: CommitSemanticallyVerifiedError) {
        for (_hash, queued) in self.non_finalized_state_queued_blocks.drain() {
            Self::send_semantically_verified_block_error(queued, error.clone());
        }
    }

    /// Send an error on a `QueuedSemanticallyVerified` block's result channel, and drop the block
    fn send_semantically_verified_block_error(
        queued: QueuedSemanticallyVerified,
        error: CommitSemanticallyVerifiedError,
    ) {
        let (finalized, rsp_tx) = queued;

        // The block sender might have already given up on this block,
        // so ignore any channel send errors.
        let _ = rsp_tx.send(Err(error));
        std::mem::drop(finalized);
    }

    /// Queue a semantically verified block for contextual verification and check if any queued
    /// blocks are ready to be verified and committed to the state.
    ///
    /// This function encodes the logic for [committing non-finalized blocks][1]
    /// in RFC0005.
    ///
    /// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks
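    ///
    /// A sketch of how a caller might await the commit result; a dropped channel
    /// means the block write task exited (illustrative, not a doctest):
    ///
    /// ```ignore
    /// let rsp_rx = self.queue_and_commit_to_non_finalized_state(semantically_verified);
    /// let committed_hash = rsp_rx
    ///     .await
    ///     .map_err(|_recv_error| CommitSemanticallyVerifiedError::WriteTaskExited)??;
    /// ```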
636 #[instrument(level = "debug", skip(self, semantically_verified))]
637 fn queue_and_commit_to_non_finalized_state(
638 &mut self,
639 semantically_verified: SemanticallyVerifiedBlock,
640 ) -> oneshot::Receiver<Result<block::Hash, CommitSemanticallyVerifiedError>> {
641 tracing::debug!(block = %semantically_verified.block, "queueing block for contextual verification");
642 let parent_hash = semantically_verified.block.header.previous_block_hash;
643
644 if self
645 .non_finalized_block_write_sent_hashes
646 .contains(&semantically_verified.hash)
647 {
648 let (rsp_tx, rsp_rx) = oneshot::channel();
649 let _ = rsp_tx.send(Err(QueueAndCommitError::new_duplicate(
650 semantically_verified.hash,
651 )
652 .into()));
653 return rsp_rx;
654 }
655
656 if self
657 .read_service
658 .db
659 .contains_height(semantically_verified.height)
660 {
661 let (rsp_tx, rsp_rx) = oneshot::channel();
662 let _ = rsp_tx.send(Err(QueueAndCommitError::new_already_finalized(
663 semantically_verified.height,
664 )
665 .into()));
666 return rsp_rx;
667 }
668
669 // [`Request::CommitSemanticallyVerifiedBlock`] contract: a request to commit a block which
670 // has been queued but not yet committed to the state fails the older request and replaces
671 // it with the newer request.
672 let rsp_rx = if let Some((_, old_rsp_tx)) = self
673 .non_finalized_state_queued_blocks
674 .get_mut(&semantically_verified.hash)
675 {
676 tracing::debug!("replacing older queued request with new request");
677 let (mut rsp_tx, rsp_rx) = oneshot::channel();
678 std::mem::swap(old_rsp_tx, &mut rsp_tx);
679 let _ = rsp_tx.send(Err(QueueAndCommitError::new_replaced(
680 semantically_verified.hash,
681 )
682 .into()));
683 rsp_rx
684 } else {
685 let (rsp_tx, rsp_rx) = oneshot::channel();
686 self.non_finalized_state_queued_blocks
687 .queue((semantically_verified, rsp_tx));
688 rsp_rx
689 };
690
691 // We've finished sending checkpoint verified blocks when:
692 // - we've sent the verified block for the last checkpoint, and
693 // - it has been successfully written to disk.
694 //
695 // We detect the last checkpoint by looking for non-finalized blocks
696 // that are a child of the last block we sent.
697 //
698 // TODO: configure the state with the last checkpoint hash instead?
699 if self.block_write_sender.finalized.is_some()
700 && self
701 .non_finalized_state_queued_blocks
702 .has_queued_children(self.finalized_block_write_last_sent_hash)
703 && self.read_service.db.finalized_tip_hash()
704 == self.finalized_block_write_last_sent_hash
705 {
706 // Tell the block write task to stop committing checkpoint verified blocks to the finalized state,
707 // and move on to committing semantically verified blocks to the non-finalized state.
708 std::mem::drop(self.block_write_sender.finalized.take());
709 // Remove any checkpoint-verified block hashes from `non_finalized_block_write_sent_hashes`.
710 self.non_finalized_block_write_sent_hashes = SentHashes::default();
711 // Mark `SentHashes` as usable by the `can_fork_chain_at()` method.
712 self.non_finalized_block_write_sent_hashes
713 .can_fork_chain_at_hashes = true;
714 // Send blocks from non-finalized queue
715 self.send_ready_non_finalized_queued(self.finalized_block_write_last_sent_hash);
716 // We've finished committing checkpoint verified blocks to finalized state, so drop any repeated queued blocks.
717 self.clear_finalized_block_queue(
718 "already finished committing checkpoint verified blocks: dropped duplicate block, \
719 block is already committed to the state",
720 );
721 } else if !self.can_fork_chain_at(&parent_hash) {
722 tracing::trace!("unready to verify, returning early");
723 } else if self.block_write_sender.finalized.is_none() {
724 // Wait until block commit task is ready to write non-finalized blocks before dequeuing them
725 self.send_ready_non_finalized_queued(parent_hash);
726
727 let finalized_tip_height = self.read_service.db.finalized_tip_height().expect(
728 "Finalized state must have at least one block before committing non-finalized state",
729 );
730
731 self.non_finalized_state_queued_blocks
732 .prune_by_height(finalized_tip_height);
733
734 self.non_finalized_block_write_sent_hashes
735 .prune_by_height(finalized_tip_height);
736 }
737
738 rsp_rx
739 }
740
741 /// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks.
742 fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
743 self.non_finalized_block_write_sent_hashes
744 .can_fork_chain_at(hash)
745 || &self.read_service.db.finalized_tip_hash() == hash
746 }
747
748 /// Returns `true` if `queued_height` is near the final checkpoint.
749 ///
750 /// The semantic block verifier needs access to UTXOs from checkpoint verified blocks
751 /// near the final checkpoint, so that it can verify blocks that spend those UTXOs.
752 ///
753 /// If it doesn't have the required UTXOs, some blocks will time out,
754 /// but succeed after a syncer restart.
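    ///
    /// For example, with a `max_checkpoint_height` of 2,000,000 and a checkpoint verify
    /// concurrency limit of 1,000, `full_verifier_utxo_lookahead` is height 1,999,000,
    /// so UTXOs from queued blocks at or above that height stay available to the
    /// semantic verifier. (These heights are illustrative, not Zebra's actual values.)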
    fn is_close_to_final_checkpoint(&self, queued_height: block::Height) -> bool {
        queued_height >= self.full_verifier_utxo_lookahead
    }

    /// Sends all queued blocks whose parents have recently arrived, starting from `new_parent`,
    /// in breadth-first order to the block write task, which attempts to validate and commit them.
    #[tracing::instrument(level = "debug", skip(self, new_parent))]
    fn send_ready_non_finalized_queued(&mut self, new_parent: block::Hash) {
        use tokio::sync::mpsc::error::SendError;
        if let Some(non_finalized_block_write_sender) = &self.block_write_sender.non_finalized {
            let mut new_parents: Vec<block::Hash> = vec![new_parent];

            while let Some(parent_hash) = new_parents.pop() {
                let queued_children = self
                    .non_finalized_state_queued_blocks
                    .dequeue_children(parent_hash);

                for queued_child in queued_children {
                    let (SemanticallyVerifiedBlock { hash, .. }, _) = queued_child;

                    self.non_finalized_block_write_sent_hashes
                        .add(&queued_child.0);
                    let send_result = non_finalized_block_write_sender.send(queued_child.into());

                    if let Err(SendError(NonFinalizedWriteMessage::Commit(queued))) = send_result {
                        // If Zebra is shutting down, drop blocks and return an error.
                        Self::send_semantically_verified_block_error(
                            queued,
                            CommitSemanticallyVerifiedError::WriteTaskExited,
                        );

                        self.clear_non_finalized_block_queue(
                            CommitSemanticallyVerifiedError::WriteTaskExited,
                        );

                        return;
                    };

                    new_parents.push(hash);
                }
            }

            self.non_finalized_block_write_sent_hashes.finish_batch();
        };
    }

    /// Return the tip of the current best chain.
    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
        self.read_service.best_tip()
    }

    fn send_invalidate_block(
        &self,
        hash: block::Hash,
    ) -> oneshot::Receiver<Result<block::Hash, InvalidateError>> {
        let (rsp_tx, rsp_rx) = oneshot::channel();

        let Some(sender) = &self.block_write_sender.non_finalized else {
            let _ = rsp_tx.send(Err(InvalidateError::ProcessingCheckpointedBlocks));
            return rsp_rx;
        };

        if let Err(tokio::sync::mpsc::error::SendError(error)) =
            sender.send(NonFinalizedWriteMessage::Invalidate { hash, rsp_tx })
        {
            let NonFinalizedWriteMessage::Invalidate { rsp_tx, .. } = error else {
821 unreachable!("should return the same Invalidate message could not be sent");
            };

            let _ = rsp_tx.send(Err(InvalidateError::SendInvalidateRequestFailed));
        }

        rsp_rx
    }

    fn send_reconsider_block(
        &self,
        hash: block::Hash,
    ) -> oneshot::Receiver<Result<Vec<block::Hash>, ReconsiderError>> {
        let (rsp_tx, rsp_rx) = oneshot::channel();

        let Some(sender) = &self.block_write_sender.non_finalized else {
            let _ = rsp_tx.send(Err(ReconsiderError::CheckpointCommitInProgress));
            return rsp_rx;
        };

        if let Err(tokio::sync::mpsc::error::SendError(error)) =
            sender.send(NonFinalizedWriteMessage::Reconsider { hash, rsp_tx })
        {
            let NonFinalizedWriteMessage::Reconsider { rsp_tx, .. } = error else {
845 unreachable!("should return the same Reconsider message could not be sent");
            };

            let _ = rsp_tx.send(Err(ReconsiderError::ReconsiderSendFailed));
        }

        rsp_rx
    }

    /// Assert some assumptions about the semantically verified `block` before it is queued.
    fn assert_block_can_be_validated(&self, block: &SemanticallyVerifiedBlock) {
        // required by `Request::CommitSemanticallyVerifiedBlock` call
        assert!(
            block.height > self.network.mandatory_checkpoint_height(),
            "invalid semantically verified block height: the canopy checkpoint is mandatory, pre-canopy \
             blocks, and the canopy activation block, must be committed to the state as finalized \
             blocks"
        );
    }
}

impl ReadStateService {
    /// Creates a new read-only state service, using the provided finalized state and
    /// block write task handle.
    ///
    /// Returns the newly created service.
    pub(crate) fn new(
        finalized_state: &FinalizedState,
        block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
        non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
    ) -> Self {
        let read_service = Self {
            network: finalized_state.network(),
            db: finalized_state.db.clone(),
            non_finalized_state_receiver,
            block_write_task,
        };

        tracing::debug!("created new read-only state service");

        read_service
    }

    /// Return the tip of the current best chain.
    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
        read::best_tip(&self.latest_non_finalized_state(), &self.db)
    }

    /// Gets a clone of the latest non-finalized state from the `non_finalized_state_receiver`
    fn latest_non_finalized_state(&self) -> NonFinalizedState {
        self.non_finalized_state_receiver.cloned_watch_data()
    }

    /// Gets a clone of the latest, best non-finalized chain from the `non_finalized_state_receiver`
    #[allow(dead_code)]
    fn latest_best_chain(&self) -> Option<Arc<Chain>> {
        self.latest_non_finalized_state().best_chain().cloned()
    }

    /// Test-only access to the inner database.
    /// Can be used to modify the database without doing any consensus checks.
    #[cfg(any(test, feature = "proptest-impl"))]
    pub fn db(&self) -> &ZebraDb {
        &self.db
    }

    /// Logs rocksdb metrics using the read only state service.
    pub fn log_db_metrics(&self) {
        self.db.print_db_metrics();
    }
}

impl Service<Request> for StateService {
    type Response = Response;
    type Error = BoxError;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Check for panics in the block write task
        let poll = self.read_service.poll_ready(cx);

        // Prune outdated UTXO requests
        let now = Instant::now();

        if self.last_prune + Self::PRUNE_INTERVAL < now {
            let tip = self.best_tip();
            let old_len = self.pending_utxos.len();

            self.pending_utxos.prune();
            self.last_prune = now;

            let new_len = self.pending_utxos.len();
            let prune_count = old_len
                .checked_sub(new_len)
                .expect("prune does not add any utxo requests");
            if prune_count > 0 {
                tracing::debug!(
                    ?old_len,
                    ?new_len,
                    ?prune_count,
                    ?tip,
                    "pruned utxo requests"
                );
            } else {
                tracing::debug!(len = ?old_len, ?tip, "no utxo requests needed pruning");
            }
        }

        poll
    }

    #[instrument(name = "state", skip(self, req))]
    fn call(&mut self, req: Request) -> Self::Future {
        req.count_metric();
        let timer = CodeTimer::start();
        let span = Span::current();

        match req {
            // Uses non_finalized_state_queued_blocks and pending_utxos in the StateService
            // Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb.
            //
            // The expected error type for this request is `CommitSemanticallyVerifiedError`.
            Request::CommitSemanticallyVerifiedBlock(semantically_verified) => {
                self.assert_block_can_be_validated(&semantically_verified);

                self.pending_utxos
                    .check_against_ordered(&semantically_verified.new_outputs);

                // # Performance
                //
                // Allow other async tasks to make progress while blocks are being verified
                // and written to disk. But wait for the blocks to finish committing,
                // so that `StateService` multi-block queries always observe a consistent state.
                //
                // Since each block is spawned into its own task,
                // there shouldn't be any other code running in the same task,
                // so we don't need to worry about blocking it:
                // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html

                let rsp_rx = tokio::task::block_in_place(move || {
                    span.in_scope(|| {
                        self.queue_and_commit_to_non_finalized_state(semantically_verified)
                    })
                });

                // TODO:
                // - check for panics in the block write task here,
                //   as well as in poll_ready()

                // The work is all done, the future just waits on a channel for the result
                timer.finish(module_path!(), line!(), "CommitSemanticallyVerifiedBlock");

                // Await the channel response, map receive errors to
                // `CommitSemanticallyVerifiedError::WriteTaskExited`,
                // then flatten the nested Result and convert any errors to a BoxError.
                let span = Span::current();
                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| CommitSemanticallyVerifiedError::WriteTaskExited)
                        .flatten()
                        .map_err(BoxError::from)
                        .map(Response::Committed)
                }
                .instrument(span)
                .boxed()
            }

            // Uses finalized_state_queued_blocks and pending_utxos in the StateService.
            // Accesses shared writeable state in the StateService.
            Request::CommitCheckpointVerifiedBlock(finalized) => {
                // # Consensus
                //
                // A semantic block verification could have called AwaitUtxo
                // before this checkpoint verified block arrived in the state.
                // So we need to check for pending UTXO requests sent by running
                // semantic block verifications.
                //
                // This check is redundant for most checkpoint verified blocks,
                // because semantic verification can only succeed near the final
                // checkpoint, when all the UTXOs are available for the verifying block.
                //
                // (Checkpoint block UTXOs are verified using block hash checkpoints
                // and transaction merkle tree block header commitments.)
                self.pending_utxos
                    .check_against_ordered(&finalized.new_outputs);

                // # Performance
                //
                // This method doesn't block, access the database, or perform CPU-intensive tasks,
                // so we can run it directly in the tokio executor's Future threads.
                let rsp_rx = self.queue_and_commit_to_finalized_state(finalized);

                // TODO:
                // - check for panics in the block write task here,
                //   as well as in poll_ready()

                // The work is all done, the future just waits on a channel for the result
                timer.finish(module_path!(), line!(), "CommitCheckpointVerifiedBlock");

                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| {
                            BoxError::from("block was dropped from the queue of finalized blocks")
                        })
                        // TODO: replace with Result::flatten once it stabilises
                        // https://github.com/rust-lang/rust/issues/70142
                        .and_then(convert::identity)
                        .map(Response::Committed)
                }
                .instrument(span)
                .boxed()
            }

            // Uses pending_utxos and non_finalized_state_queued_blocks in the StateService.
            // If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService.
            Request::AwaitUtxo(outpoint) => {
                // Prepare the AwaitUtxo future from PendingUtxos.
                let response_fut = self.pending_utxos.queue(outpoint);
                // Only instrument `response_fut`, the ReadStateService already
                // instruments its requests with the same span.

                let response_fut = response_fut.instrument(span).boxed();

                // Check the non-finalized block queue outside the returned future,
                // so we can access mutable state fields.
                if let Some(utxo) = self.non_finalized_state_queued_blocks.utxo(&outpoint) {
                    self.pending_utxos.respond(&outpoint, utxo);

                    // We're finished, the returned future gets the UTXO from the respond() channel.
                    timer.finish(module_path!(), line!(), "AwaitUtxo/queued-non-finalized");

                    return response_fut;
                }

                // Check the sent non-finalized blocks
                if let Some(utxo) = self.non_finalized_block_write_sent_hashes.utxo(&outpoint) {
                    self.pending_utxos.respond(&outpoint, utxo);

                    // We're finished, the returned future gets the UTXO from the respond() channel.
                    timer.finish(module_path!(), line!(), "AwaitUtxo/sent-non-finalized");

                    return response_fut;
                }

                // We ignore any UTXOs in StateService.finalized_state_queued_blocks,
                // because it is only used during checkpoint verification.
                //
                // This creates a rare race condition, but it doesn't seem to happen much in practice.
                // See #5126 for details.

                // Manually send a request to the ReadStateService,
                // to get UTXOs from any non-finalized chain or the finalized chain.
                let read_service = self.read_service.clone();

                // Run the request in an async block, so we can await the response.
                async move {
                    let req = ReadRequest::AnyChainUtxo(outpoint);

                    let rsp = read_service.oneshot(req).await?;

                    // Optional TODO:
                    // - make pending_utxos.respond() async using a channel,
                    //   so we can respond to all waiting requests here
                    //
                    // This change is not required for correctness, because:
                    // - any waiting requests should have returned when the block was sent to the state
                    // - otherwise, the request returns immediately if:
                    //   - the block is in the non-finalized queue, or
                    //   - the block is in any non-finalized chain or the finalized state
                    //
                    // And if the block is in the finalized queue,
                    // that's rare enough that a retry is ok.
                    if let ReadResponse::AnyChainUtxo(Some(utxo)) = rsp {
                        // We got a UTXO, so we respond with it instead of waiting on the response future.
1123 timer.finish(module_path!(), line!(), "AwaitUtxo/any-chain");
1124
1125 return Ok(Response::Utxo(utxo));
1126 }
1127
1128 // We're finished, but the returned future is waiting on the respond() channel.
1129 timer.finish(module_path!(), line!(), "AwaitUtxo/waiting");
1130
1131 response_fut.await
1132 }
1133 .boxed()
1134 }
1135
1136 // Used by sync, inbound, and block verifier to check if a block is already in the state
1137 // before downloading or validating it.
1138 Request::KnownBlock(hash) => {
1139 let timer = CodeTimer::start();
1140
1141 let read_service = self.read_service.clone();
1142
1143 async move {
1144 let response = read::non_finalized_state_contains_block_hash(
1145 &read_service.latest_non_finalized_state(),
1146 hash,
1147 )
1148 .or_else(|| read::finalized_state_contains_block_hash(&read_service.db, hash));
1149
1150 // The work is done in the future.
1151 timer.finish(module_path!(), line!(), "Request::KnownBlock");
1152
1153 Ok(Response::KnownBlock(response))
1154 }
1155 .boxed()
1156 }
1157
1158 // The expected error type for this request is `InvalidateError`
1159 Request::InvalidateBlock(block_hash) => {
1160 let rsp_rx = tokio::task::block_in_place(move || {
1161 span.in_scope(|| self.send_invalidate_block(block_hash))
1162 });
1163
                // Await the channel response, map receive errors to
                // `InvalidateError::InvalidateRequestDropped`,
                // then flatten the nested Result and convert any errors to a BoxError.
                let span = Span::current();
                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| InvalidateError::InvalidateRequestDropped)
                        .flatten()
                        .map_err(BoxError::from)
                        .map(Response::Invalidated)
                }
                .instrument(span)
                .boxed()
            }

            // The expected error type for this request is `ReconsiderError`
            Request::ReconsiderBlock(block_hash) => {
                let rsp_rx = tokio::task::block_in_place(move || {
                    span.in_scope(|| self.send_reconsider_block(block_hash))
                });

                // Await the channel response, map receive errors to
                // `ReconsiderError::ReconsiderResponseDropped`,
                // then flatten the nested Result and convert any errors to a BoxError.
                let span = Span::current();
                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| ReconsiderError::ReconsiderResponseDropped)
                        .flatten()
                        .map_err(BoxError::from)
                        .map(Response::Reconsidered)
                }
                .instrument(span)
                .boxed()
            }

            // Runs concurrently using the ReadStateService
            Request::Tip
            | Request::Depth(_)
            | Request::BestChainNextMedianTimePast
            | Request::BestChainBlockHash(_)
            | Request::BlockLocator
            | Request::Transaction(_)
            | Request::AnyChainTransaction(_)
            | Request::UnspentBestChainUtxo(_)
            | Request::Block(_)
            | Request::BlockAndSize(_)
            | Request::BlockHeader(_)
            | Request::FindBlockHashes { .. }
            | Request::FindBlockHeaders { .. }
            | Request::CheckBestChainTipNullifiersAndAnchors(_) => {
                // Redirect the request to the concurrent ReadStateService
                let read_service = self.read_service.clone();

                async move {
                    let req = req
                        .try_into()
                        .expect("ReadRequest conversion should not fail");

                    let rsp = read_service.oneshot(req).await?;
                    let rsp = rsp.try_into().expect("Response conversion should not fail");

                    Ok(rsp)
                }
                .boxed()
            }

            Request::CheckBlockProposalValidity(_) => {
                // Redirect the request to the concurrent ReadStateService
                let read_service = self.read_service.clone();

                async move {
                    let req = req
                        .try_into()
                        .expect("ReadRequest conversion should not fail");

                    let rsp = read_service.oneshot(req).await?;
                    let rsp = rsp.try_into().expect("Response conversion should not fail");

                    Ok(rsp)
                }
                .boxed()
            }
        }
    }
}

impl Service<ReadRequest> for ReadStateService {
    type Response = ReadResponse;
    type Error = BoxError;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Check for panics in the block write task
        //
        // TODO: move into a check_for_panics() method
        let block_write_task = self.block_write_task.take();

        if let Some(block_write_task) = block_write_task {
            if block_write_task.is_finished() {
                if let Some(block_write_task) = Arc::into_inner(block_write_task) {
                    // We are the last state with a reference to this task, so we can propagate any panics
                    if let Err(thread_panic) = block_write_task.join() {
                        std::panic::resume_unwind(thread_panic);
                    }
                }
            } else {
                // It hasn't finished, so we need to put it back
                self.block_write_task = Some(block_write_task);
            }
        }

        self.db.check_for_panics();

        Poll::Ready(Ok(()))
    }

    #[instrument(name = "read_state", skip(self, req))]
    fn call(&mut self, req: ReadRequest) -> Self::Future {
        req.count_metric();
        let timer = CodeTimer::start();
        let span = Span::current();

        match req {
            // Used by the `getblockchaininfo` RPC.
            ReadRequest::UsageInfo => {
                let db = self.db.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        // The work is done in the future.

                        let db_size = db.size();

                        timer.finish(module_path!(), line!(), "ReadRequest::UsageInfo");

                        Ok(ReadResponse::UsageInfo(db_size))
                    })
                })
                .wait_for_panics()
            }

            // Used by the StateService.
            ReadRequest::Tip => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let tip = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::tip(non_finalized_state.best_chain(), &state.db)
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::Tip");

                        Ok(ReadResponse::Tip(tip))
                    })
                })
                .wait_for_panics()
            }

            // Used by `getblockchaininfo` RPC method.
            ReadRequest::TipPoolValues => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let tip_with_value_balance = state
                            .non_finalized_state_receiver
                            .with_watch_data(|non_finalized_state| {
                                read::tip_with_value_balance(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                )
                            });

                        // The work is done in the future.
                        // TODO: Do this in the Drop impl with the variant name?
                        timer.finish(module_path!(), line!(), "ReadRequest::TipPoolValues");

                        let (tip_height, tip_hash, value_balance) = tip_with_value_balance?
                            .ok_or(BoxError::from("no chain tip available yet"))?;

                        Ok(ReadResponse::TipPoolValues {
                            tip_height,
                            tip_hash,
                            value_balance,
                        })
                    })
                })
                .wait_for_panics()
            }

            // Used by getblock
            ReadRequest::BlockInfo(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let value_balance = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::block_info(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        // TODO: Do this in the Drop impl with the variant name?
                        timer.finish(module_path!(), line!(), "ReadRequest::BlockInfo");

                        Ok(ReadResponse::BlockInfo(value_balance))
                    })
                })
                .wait_for_panics()
            }

            // Used by the StateService.
            ReadRequest::Depth(hash) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let depth = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::depth(non_finalized_state.best_chain(), &state.db, hash)
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::Depth");

                        Ok(ReadResponse::Depth(depth))
                    })
                })
                .wait_for_panics()
            }

            // Used by the StateService.
            ReadRequest::BestChainNextMedianTimePast => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let non_finalized_state = state.latest_non_finalized_state();
                        let median_time_past =
                            read::next_median_time_past(&non_finalized_state, &state.db);

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::BestChainNextMedianTimePast",
                        );

                        Ok(ReadResponse::BestChainNextMedianTimePast(median_time_past?))
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block (raw) RPC and the StateService.
            ReadRequest::Block(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let block = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::block(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::Block");

                        Ok(ReadResponse::Block(block))
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block (raw) RPC and the StateService.
            ReadRequest::BlockAndSize(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let block_and_size = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::block_and_size(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::BlockAndSize");

                        Ok(ReadResponse::BlockAndSize(block_and_size))
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block (verbose) RPC and the StateService.
            ReadRequest::BlockHeader(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let best_chain = state.latest_best_chain();

                        let height = hash_or_height
                            .height_or_else(|hash| {
                                read::find::height_by_hash(best_chain.clone(), &state.db, hash)
                            })
                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;

                        let hash = hash_or_height
                            .hash_or_else(|height| {
                                read::find::hash_by_height(best_chain.clone(), &state.db, height)
                            })
                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;

                        let next_height = height.next()?;
                        let next_block_hash =
                            read::find::hash_by_height(best_chain.clone(), &state.db, next_height);

                        let header = read::block_header(best_chain, &state.db, height.into())
                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;

                        // The work is done in the future.
1510 timer.finish(module_path!(), line!(), "ReadRequest::Block");
1511
1512 Ok(ReadResponse::BlockHeader {
1513 header,
1514 hash,
1515 height,
1516 next_block_hash,
1517 })
1518 })
1519 })
1520 .wait_for_panics()
1521 }
1522
1523 // For the get_raw_transaction RPC and the StateService.
1524 ReadRequest::Transaction(hash) => {
1525 let state = self.clone();
1526
1527 tokio::task::spawn_blocking(move || {
1528 span.in_scope(move || {
1529 let response =
1530 read::mined_transaction(state.latest_best_chain(), &state.db, hash);
1531
1532 // The work is done in the future.
1533 timer.finish(module_path!(), line!(), "ReadRequest::Transaction");
1534
1535 Ok(ReadResponse::Transaction(response))
1536 })
1537 })
1538 .wait_for_panics()
1539 }
1540
1541 ReadRequest::AnyChainTransaction(hash) => {
1542 let state = self.clone();
1543
1544 tokio::task::spawn_blocking(move || {
1545 span.in_scope(move || {
1546 let tx = state.non_finalized_state_receiver.with_watch_data(
1547 |non_finalized_state| {
1548 read::any_transaction(
1549 non_finalized_state.chain_iter(),
1550 &state.db,
1551 hash,
1552 )
1553 },
1554 );
1555
1556 // The work is done in the future.
1557 timer.finish(module_path!(), line!(), "ReadRequest::AnyChainTransaction");
1558
1559 Ok(ReadResponse::AnyChainTransaction(tx))
1560 })
1561 })
1562 .wait_for_panics()
1563 }
1564
            // Used by the get_block (verbose) RPC.
            ReadRequest::TransactionIdsForBlock(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let transaction_ids = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::transaction_hashes_for_block(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::TransactionIdsForBlock",
                        );

                        Ok(ReadResponse::TransactionIdsForBlock(transaction_ids))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::AnyChainTransactionIdsForBlock(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let transaction_ids = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::transaction_hashes_for_any_block(
                                    non_finalized_state.chain_iter(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::AnyChainTransactionIdsForBlock",
                        );

                        Ok(ReadResponse::AnyChainTransactionIdsForBlock(
                            transaction_ids,
                        ))
                    })
                })
                .wait_for_panics()
            }

            #[cfg(feature = "indexer")]
            ReadRequest::SpendingTransactionId(spend) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let spending_transaction_id = state
                            .non_finalized_state_receiver
                            .with_watch_data(|non_finalized_state| {
                                read::spending_transaction_hash(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    spend,
                                )
                            });

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::SpendingTransactionId",
                        );

                        Ok(ReadResponse::TransactionId(spending_transaction_id))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::UnspentBestChainUtxo(outpoint) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let utxo = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::unspent_utxo(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    outpoint,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::UnspentBestChainUtxo");

                        Ok(ReadResponse::UnspentBestChainUtxo(utxo))
                    })
                })
                .wait_for_panics()
            }

            // Manually used by the StateService to implement part of AwaitUtxo.
            ReadRequest::AnyChainUtxo(outpoint) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let utxo = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::any_utxo(non_finalized_state, &state.db, outpoint)
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::AnyChainUtxo");

                        Ok(ReadResponse::AnyChainUtxo(utxo))
                    })
                })
                .wait_for_panics()
            }

            // Used by the StateService.
            ReadRequest::BlockLocator => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let block_locator = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::block_locator(non_finalized_state.best_chain(), &state.db)
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::BlockLocator");

                        Ok(ReadResponse::BlockLocator(
                            block_locator.unwrap_or_default(),
                        ))
                    })
                })
                .wait_for_panics()
            }

            // Used by the StateService.
            ReadRequest::FindBlockHashes { known_blocks, stop } => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let block_hashes = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::find_chain_hashes(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    known_blocks,
                                    stop,
                                    MAX_FIND_BLOCK_HASHES_RESULTS,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHashes");

                        Ok(ReadResponse::BlockHashes(block_hashes))
                    })
                })
                .wait_for_panics()
            }

            // Used by the StateService.
            ReadRequest::FindBlockHeaders { known_blocks, stop } => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let block_headers = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::find_chain_headers(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    known_blocks,
                                    stop,
                                    MAX_FIND_BLOCK_HEADERS_RESULTS,
                                )
                            },
                        );

                        let block_headers = block_headers
                            .into_iter()
                            .map(|header| CountedHeader { header })
                            .collect();

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHeaders");

                        Ok(ReadResponse::BlockHeaders(block_headers))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::SaplingTree(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let sapling_tree = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::sapling_tree(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::SaplingTree");

                        Ok(ReadResponse::SaplingTree(sapling_tree))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::OrchardTree(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let orchard_tree = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::orchard_tree(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::OrchardTree");

                        Ok(ReadResponse::OrchardTree(orchard_tree))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::SaplingSubtrees { start_index, limit } => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let end_index = limit
                            .and_then(|limit| start_index.0.checked_add(limit.0))
                            .map(NoteCommitmentSubtreeIndex);

                        let sapling_subtrees = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                if let Some(end_index) = end_index {
                                    read::sapling_subtrees(
                                        non_finalized_state.best_chain(),
                                        &state.db,
                                        start_index..end_index,
                                    )
                                } else {
                                    // If there is no end bound, or calculating it would overflow,
                                    // just return all the trees. This matches `zcashd`, which never
                                    // calculates an end bound, and keeps iterating until the trees
                                    // run out.
                                    read::sapling_subtrees(
                                        non_finalized_state.best_chain(),
                                        &state.db,
                                        start_index..,
                                    )
                                }
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::SaplingSubtrees");

                        Ok(ReadResponse::SaplingSubtrees(sapling_subtrees))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::OrchardSubtrees { start_index, limit } => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let end_index = limit
                            .and_then(|limit| start_index.0.checked_add(limit.0))
                            .map(NoteCommitmentSubtreeIndex);

                        let orchard_subtrees = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                if let Some(end_index) = end_index {
                                    read::orchard_subtrees(
                                        non_finalized_state.best_chain(),
                                        &state.db,
                                        start_index..end_index,
                                    )
                                } else {
                                    // If there is no end bound, or calculating it would overflow,
                                    // just return all the trees. This matches `zcashd`, which never
                                    // calculates an end bound, and keeps iterating until the trees
                                    // run out.
                                    read::orchard_subtrees(
                                        non_finalized_state.best_chain(),
                                        &state.db,
                                        start_index..,
                                    )
                                }
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::OrchardSubtrees");

                        Ok(ReadResponse::OrchardSubtrees(orchard_subtrees))
                    })
                })
                .wait_for_panics()
            }

            // For the get_address_balance RPC.
            ReadRequest::AddressBalance(addresses) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let (balance, received) = state
                            .non_finalized_state_receiver
                            .with_watch_data(|non_finalized_state| {
                                read::transparent_balance(
                                    non_finalized_state.best_chain().cloned(),
                                    &state.db,
                                    addresses,
                                )
                            })?;

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::AddressBalance");

                        Ok(ReadResponse::AddressBalance { balance, received })
                    })
                })
                .wait_for_panics()
            }

            // For the get_address_tx_ids RPC.
            ReadRequest::TransactionIdsByAddresses {
                addresses,
                height_range,
            } => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let tx_ids = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::transparent_tx_ids(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    addresses,
                                    height_range,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::TransactionIdsByAddresses",
                        );

                        tx_ids.map(ReadResponse::AddressesTransactionIds)
                    })
                })
                .wait_for_panics()
            }

            // For the get_address_utxos RPC.
            ReadRequest::UtxosByAddresses(addresses) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let utxos = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::address_utxos(
                                    &state.network,
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    addresses,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::UtxosByAddresses");

                        utxos.map(ReadResponse::AddressUtxos)
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::CheckBestChainTipNullifiersAndAnchors(unmined_tx) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let latest_non_finalized_best_chain =
                            state.latest_non_finalized_state().best_chain().cloned();

                        check::nullifier::tx_no_duplicates_in_chain(
                            &state.db,
                            latest_non_finalized_best_chain.as_ref(),
                            &unmined_tx.transaction,
                        )?;

                        check::anchors::tx_anchors_refer_to_final_treestates(
                            &state.db,
                            latest_non_finalized_best_chain.as_ref(),
                            &unmined_tx,
                        )?;

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::CheckBestChainTipNullifiersAndAnchors",
                        );

                        Ok(ReadResponse::ValidBestChainTipNullifiersAndAnchors)
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block and get_block_hash RPCs.
            ReadRequest::BestChainBlockHash(height) => {
                let state = self.clone();

                // # Performance
                //
                // Allow other async tasks to make progress while concurrently reading blocks from disk.

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let hash = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::hash_by_height(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::BestChainBlockHash");

                        Ok(ReadResponse::BlockHash(hash))
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block_template and get_blockchain_info RPCs.
            ReadRequest::ChainInfo => {
                let state = self.clone();
                let latest_non_finalized_state = self.latest_non_finalized_state();

                // # Performance
                //
                // Allow other async tasks to make progress while concurrently reading blocks from disk.

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        // # Correctness
                        //
                        // It is ok to do these lookups using multiple database calls. Finalized state updates
                        // can only add overlapping blocks, and block hashes are unique across all chain forks.
                        //
                        // If there is a large overlap between the non-finalized and finalized states,
                        // where the finalized tip is above the non-finalized tip, then Zebra is either
                        // receiving a lot of blocks, or this request has been delayed for a long time.
                        //
                        // In that case, the `get_block_template` RPC will return an error because Zebra
                        // is not synced to the tip. That check happens before the RPC makes this request.
                        let get_block_template_info =
                            read::difficulty::get_block_template_chain_info(
                                &latest_non_finalized_state,
                                &state.db,
                                &state.network,
                            );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::ChainInfo");

                        get_block_template_info.map(ReadResponse::ChainInfo)
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_mining_info, get_network_sol_ps, and get_network_hash_ps RPCs.
            ReadRequest::SolutionRate { num_blocks, height } => {
                let state = self.clone();

                // # Performance
                //
                // Allow other async tasks to make progress while concurrently reading blocks from disk.

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let latest_non_finalized_state = state.latest_non_finalized_state();
                        // # Correctness
                        //
                        // It is ok to do these lookups using multiple database calls. Finalized state updates
                        // can only add overlapping blocks, and block hashes are unique across all chain forks.
                        //
                        // The worst that can happen here is that the default `start_hash` will be below
                        // the chain tip.
                        let (tip_height, tip_hash) =
                            match read::tip(latest_non_finalized_state.best_chain(), &state.db) {
                                Some(tip_hash) => tip_hash,
                                None => return Ok(ReadResponse::SolutionRate(None)),
                            };

                        let start_hash = match height {
                            Some(height) if height < tip_height => read::hash_by_height(
                                latest_non_finalized_state.best_chain(),
                                &state.db,
                                height,
                            ),
                            // Use the chain tip hash if the height is above it or was not provided.
                            _ => Some(tip_hash),
                        };

                        let solution_rate = start_hash.and_then(|start_hash| {
                            read::difficulty::solution_rate(
                                &latest_non_finalized_state,
                                &state.db,
                                num_blocks,
                                start_hash,
                            )
                        });

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::SolutionRate");

                        Ok(ReadResponse::SolutionRate(solution_rate))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::CheckBlockProposalValidity(semantically_verified) => {
                let state = self.clone();

                // # Performance
                //
                // Allow other async tasks to make progress while concurrently reading blocks from disk.

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        tracing::debug!("attempting to validate and commit block proposal onto a cloned non-finalized state");
                        let mut latest_non_finalized_state = state.latest_non_finalized_state();

                        // The previous block of a valid proposal must be the current best chain tip.
                        let Some((_best_tip_height, best_tip_hash)) = read::best_tip(&latest_non_finalized_state, &state.db) else {
                            return Err("state is empty: wait for Zebra to sync before submitting a proposal".into());
                        };

                        if semantically_verified.block.header.previous_block_hash != best_tip_hash {
                            return Err("proposal is not based on the current best chain tip: previous block hash must be the best chain tip".into());
                        }

                        // This clone of the non-finalized state is dropped when this closure returns.
                        // The non-finalized state that's used in the rest of the state (including finalizing
                        // blocks into the db) is not mutated here.
                        //
                        // TODO: Convert `CommitSemanticallyVerifiedError` to a new `ValidateProposalError`?
                        latest_non_finalized_state.disable_metrics();

                        write::validate_and_commit_non_finalized(
                            &state.db,
                            &mut latest_non_finalized_state,
                            semantically_verified,
                        )?;

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::CheckBlockProposalValidity",
                        );

                        Ok(ReadResponse::ValidBlockProposal)
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::TipBlockSize => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        // Get the best chain tip height.
                        let tip_height = state
                            .non_finalized_state_receiver
                            .with_watch_data(|non_finalized_state| {
                                read::tip_height(non_finalized_state.best_chain(), &state.db)
                            })
                            .unwrap_or(Height(0));

                        // Get the block at the best chain tip height.
                        let block = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::block(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    tip_height.into(),
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::TipBlockSize");

                        // Respond with the serialized length of the block, if one was found.
                        match block {
                            Some(b) => Ok(ReadResponse::TipBlockSize(Some(
                                b.zcash_serialize_to_vec()?.len(),
                            ))),
                            None => Ok(ReadResponse::TipBlockSize(None)),
                        }
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::NonFinalizedBlocksListener => {
                // The non-finalized blocks listener notifies its subscribers about
                // new blocks as they are added to the non-finalized state.
                let non_finalized_blocks_listener = NonFinalizedBlocksListener::spawn(
                    self.network.clone(),
                    self.non_finalized_state_receiver.clone(),
                );

                async move {
                    timer.finish(
                        module_path!(),
                        line!(),
                        "ReadRequest::NonFinalizedBlocksListener",
                    );

                    Ok(ReadResponse::NonFinalizedBlocksListener(
                        non_finalized_blocks_listener,
                    ))
                }
                .boxed()
            }
        }
    }
}

/// Initialize a state service from the provided [`Config`].
/// Returns a boxed state service, a read-only state service,
/// and receivers for state chain tip updates.
///
/// Each `network` has its own separate on-disk database.
///
/// The state uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
/// to work out when it is near the final checkpoint.
///
/// To share access to the state, wrap the returned service in a `Buffer`,
/// or clone the returned [`ReadStateService`].
///
/// It's possible to construct multiple state services in the same application (as
/// long as they, e.g., use different storage locations), but doing so is
/// probably not what you want.
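///
/// # Example
///
/// A minimal usage sketch, not a tested doctest: it assumes a Tokio runtime,
/// a default `Config` that points at a usable cache directory, and illustrative
/// checkpoint arguments.
///
/// ```ignore
/// use tower::buffer::Buffer;
/// use zebra_chain::{block, parameters::Network};
///
/// // The checkpoint height and concurrency limit here are placeholders.
/// let (state, read_state, latest_chain_tip, chain_tip_change) = zebra_state::init(
///     zebra_state::Config::default(),
///     &Network::Mainnet,
///     block::Height::MAX,
///     0,
/// )
/// .await;
///
/// // Wrap the read-write service in a `Buffer` so it can be cloned and shared.
/// let state = Buffer::new(state, 10);
///
/// // The read-only service is cheaply cloneable, so it can be shared directly.
/// let read_state_clone = read_state.clone();
/// ```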
pub async fn init(
    config: Config,
    network: &Network,
    max_checkpoint_height: block::Height,
    checkpoint_verify_concurrency_limit: usize,
) -> (
    BoxService<Request, Response, BoxError>,
    ReadStateService,
    LatestChainTip,
    ChainTipChange,
) {
    let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
        StateService::new(
            config,
            network,
            max_checkpoint_height,
            checkpoint_verify_concurrency_limit,
        )
        .await;

    (
        BoxService::new(state_service),
        read_only_state_service,
        latest_chain_tip,
        chain_tip_change,
    )
}

/// Initialize a read state service from the provided [`Config`].
/// Returns a read-only state service, a handle to the underlying database,
/// and a sender for updating the service's view of the non-finalized state.
///
/// Each `network` has its own separate on-disk database.
///
/// To share access to the state, clone the returned [`ReadStateService`].
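///
/// # Example
///
/// A minimal sketch, not a tested doctest: it assumes an existing on-disk
/// state database for the configured network, and a default `Config` that
/// points at it.
///
/// ```ignore
/// use zebra_chain::parameters::Network;
///
/// let (read_state, db, non_finalized_state_sender) =
///     zebra_state::init_read_only(zebra_state::Config::default(), &Network::Mainnet);
///
/// // Clone the service to share read-only access across tasks.
/// let read_state_clone = read_state.clone();
/// ```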
pub fn init_read_only(
    config: Config,
    network: &Network,
) -> (
    ReadStateService,
    ZebraDb,
    tokio::sync::watch::Sender<NonFinalizedState>,
) {
    let finalized_state = FinalizedState::new_with_debug(
        &config,
        network,
        true,
        #[cfg(feature = "elasticsearch")]
        false,
        true,
    );
    let (non_finalized_state_sender, non_finalized_state_receiver) =
        tokio::sync::watch::channel(NonFinalizedState::new(network));

    (
        ReadStateService::new(
            &finalized_state,
            None,
            WatchReceiver::new(non_finalized_state_receiver),
        ),
        finalized_state.db.clone(),
        non_finalized_state_sender,
    )
}

/// Calls [`init_read_only`] with the provided [`Config`] and [`Network`] from a blocking task.
/// Returns a [`tokio::task::JoinHandle`] with a read state service, a database handle,
/// and a non-finalized state sender.
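///
/// # Example
///
/// A sketch of awaiting the spawned task, assuming a Tokio runtime and a
/// default `Config` (illustrative, not a tested doctest):
///
/// ```ignore
/// use zebra_chain::parameters::Network;
///
/// let (read_state, db, non_finalized_state_sender) =
///     zebra_state::spawn_init_read_only(zebra_state::Config::default(), &Network::Mainnet)
///         .await
///         .expect("blocking task should not panic");
/// ```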
pub fn spawn_init_read_only(
    config: Config,
    network: &Network,
) -> tokio::task::JoinHandle<(
    ReadStateService,
    ZebraDb,
    tokio::sync::watch::Sender<NonFinalizedState>,
)> {
    let network = network.clone();
    tokio::task::spawn_blocking(move || init_read_only(config, &network))
}

/// Returns a [`StateService`] with an ephemeral [`Config`] and a buffer with a single slot.
///
/// This can be used to create a state service for testing. See also [`init`].
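///
/// # Example
///
/// A sketch of a round-trip through the buffered test service inside an async
/// test, using a `Request::Tip` call (illustrative, not a tested doctest):
///
/// ```ignore
/// use tower::{Service, ServiceExt};
/// use zebra_chain::parameters::Network;
/// use zebra_state::{Request, Response};
///
/// let mut state = zebra_state::init_test(&Network::Mainnet).await;
///
/// let response = state
///     .ready()
///     .await
///     .expect("service should become ready")
///     .call(Request::Tip)
///     .await
///     .expect("Tip request should succeed");
///
/// assert!(matches!(response, Response::Tip(_)));
/// ```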
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test(
    network: &Network,
) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
    // if we ever need to test final checkpoint sent UTXO queries
    let (state_service, _, _, _) =
        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;

    Buffer::new(BoxService::new(state_service), 1)
}

/// Initializes a state service with an ephemeral [`Config`] and a buffer with a single slot,
/// then returns the read-write service, read-only service, and tip watch channels.
///
/// This can be used to create a state service for testing. See also [`init`].
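///
/// # Example
///
/// A sketch of destructuring the test services; the `best_tip_height` call
/// assumes the `ChainTip` trait is in scope (illustrative, not a tested doctest):
///
/// ```ignore
/// use zebra_chain::{chain_tip::ChainTip, parameters::Network};
///
/// let (state, read_state, latest_chain_tip, chain_tip_change) =
///     zebra_state::init_test_services(&Network::Mainnet).await;
///
/// // The tip watch channels start empty until a block is committed.
/// assert!(latest_chain_tip.best_tip_height().is_none());
/// ```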
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test_services(
    network: &Network,
) -> (
    Buffer<BoxService<Request, Response, BoxError>, Request>,
    ReadStateService,
    LatestChainTip,
    ChainTipChange,
) {
    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
    // if we ever need to test final checkpoint sent UTXO queries
    let (state_service, read_state_service, latest_chain_tip, chain_tip_change) =
        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;

    let state_service = Buffer::new(BoxService::new(state_service), 1);

    (
        state_service,
        read_state_service,
        latest_chain_tip,
        chain_tip_change,
    )
}