zebra_state/service.rs
//! [`tower::Service`]s for Zebra's cached chain state.
//!
//! Zebra provides cached state access via two main services:
//! - [`StateService`]: a read-write service that writes blocks to the state,
//!   and redirects most read requests to the [`ReadStateService`].
//! - [`ReadStateService`]: a read-only service that answers from the most
//!   recent committed block.
//!
//! Most users should prefer [`ReadStateService`], unless they need to write blocks to the state.
//!
//! Zebra also provides access to the best chain tip via:
//! - [`LatestChainTip`]: a read-only channel that contains the latest committed
//!   tip.
//! - [`ChainTipChange`]: a read-only channel that can asynchronously await
//!   chain tip changes.
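//!
//! # Example
//!
//! A minimal sketch of querying the best chain tip through a read-only state handle.
//! This is a hedged illustration, not a complete setup: it assumes a mutable `read_state`
//! service handle obtained elsewhere, a tokio runtime, and elides error handling.
//!
//! ```ignore
//! use tower::{Service, ServiceExt};
//!
//! // Wait until the read service is ready, then ask for the best chain tip.
//! let response = read_state
//!     .ready()
//!     .await?
//!     .call(zebra_state::ReadRequest::Tip)
//!     .await?;
//!
//! if let zebra_state::ReadResponse::Tip(Some((height, hash))) = response {
//!     tracing::info!(?height, ?hash, "current best chain tip");
//! }
//! ```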

use std::{
    collections::HashMap,
    future::Future,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
    time::{Duration, Instant},
};

use futures::future::FutureExt;
use tokio::sync::oneshot;
use tower::{util::BoxService, Service, ServiceExt};
use tracing::{instrument, Instrument, Span};

#[cfg(any(test, feature = "proptest-impl"))]
use tower::buffer::Buffer;

use zebra_chain::{
    block::{self, CountedHeader, HeightDiff},
    diagnostic::{task::WaitForPanics, CodeTimer},
    parameters::{Network, NetworkUpgrade},
    subtree::NoteCommitmentSubtreeIndex,
};

use zebra_chain::{block::Height, serialization::ZcashSerialize};

use crate::{
    constants::{
        MAX_FIND_BLOCK_HASHES_RESULTS, MAX_FIND_BLOCK_HEADERS_RESULTS, MAX_LEGACY_CHAIN_BLOCKS,
    },
    error::{CommitBlockError, CommitCheckpointVerifiedError, InvalidateError, ReconsiderError},
    response::NonFinalizedBlocksListener,
    service::{
        block_iter::any_ancestor_blocks,
        chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
        finalized_state::{FinalizedState, ZebraDb},
        non_finalized_state::{Chain, NonFinalizedState},
        pending_utxos::PendingUtxos,
        queued_blocks::QueuedBlocks,
        watch_receiver::WatchReceiver,
    },
    BoxError, CheckpointVerifiedBlock, CommitSemanticallyVerifiedError, Config, KnownBlock,
    ReadRequest, ReadResponse, Request, Response, SemanticallyVerifiedBlock,
};

pub mod block_iter;
pub mod chain_tip;
pub mod watch_receiver;

pub mod check;

pub(crate) mod finalized_state;
pub(crate) mod non_finalized_state;
mod pending_utxos;
mod queued_blocks;
pub(crate) mod read;
mod traits;
mod write;

#[cfg(any(test, feature = "proptest-impl"))]
pub mod arbitrary;

#[cfg(test)]
mod tests;

pub use finalized_state::{OutputLocation, TransactionIndex, TransactionLocation};
use write::NonFinalizedWriteMessage;

use self::queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified, SentHashes};

pub use self::traits::{ReadState, State};

/// A read-write service for Zebra's cached blockchain state.
///
/// This service modifies and provides access to:
/// - the non-finalized state: the ~100 most recent blocks.
///   Zebra allows chain forks in the non-finalized state,
///   stores it in memory, and re-downloads it when restarted.
/// - the finalized state: older blocks that have many confirmations.
///   Zebra stores the single best chain in the finalized state,
///   and re-loads it from disk when restarted.
///
/// Read requests to this service are buffered, then processed concurrently.
/// Block write requests are buffered, then queued, then processed in order by a separate task.
///
/// Most state users can get faster read responses using the [`ReadStateService`],
/// because its requests do not share a [`tower::buffer::Buffer`] with block write requests.
///
/// To quickly get the latest block, use [`LatestChainTip`] or [`ChainTipChange`].
/// They can read the latest block directly, without queueing any requests.
#[derive(Debug)]
pub(crate) struct StateService {
    // Configuration
    //
    /// The configured Zcash network.
    network: Network,

    /// The height that we start storing UTXOs from finalized blocks.
    ///
    /// This height should be lower than the last few checkpoints,
    /// so the full verifier can verify UTXO spends from those blocks,
    /// even if they haven't been committed to the finalized state yet.
    full_verifier_utxo_lookahead: block::Height,

    // Queued Blocks
    //
    /// Queued blocks for the [`NonFinalizedState`] that arrived out of order.
    /// These blocks are awaiting their parent blocks before they can do contextual verification.
    non_finalized_state_queued_blocks: QueuedBlocks,

    /// Queued blocks for the [`FinalizedState`] that arrived out of order.
    /// These blocks are awaiting their parent blocks before they can do contextual verification.
    ///
    /// Indexed by their parent block hash.
    finalized_state_queued_blocks: HashMap<block::Hash, QueuedCheckpointVerified>,

    /// Channels to send blocks to the block write task.
    block_write_sender: write::BlockWriteSender,

    /// The [`block::Hash`] of the most recent block sent on
    /// `finalized_block_write_sender` or `non_finalized_block_write_sender`.
    ///
    /// On startup, this is:
    /// - the finalized tip, if there are stored blocks, or
    /// - the genesis block's parent hash, if the database is empty.
    ///
    /// If `invalid_block_write_reset_receiver` gets a reset, this is:
    /// - the hash of the last valid committed block (the parent of the invalid block).
    finalized_block_write_last_sent_hash: block::Hash,

    /// A set of block hashes that have been sent to the block write task.
    /// Hashes of blocks below the finalized tip height are periodically pruned.
    non_finalized_block_write_sent_hashes: SentHashes,

    /// If an invalid block is sent on `finalized_block_write_sender`
    /// or `non_finalized_block_write_sender`,
    /// this channel gets the [`block::Hash`] of the valid tip.
    //
    // TODO: add tests for finalized and non-finalized resets (#2654)
    invalid_block_write_reset_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,

    // Pending UTXO Request Tracking
    //
    /// The set of outpoints with pending requests for their associated transparent::Output.
    pending_utxos: PendingUtxos,

    /// Instant tracking the last time `pending_utxos` was pruned.
    last_prune: Instant,

    // Updating Concurrently Readable State
    //
    /// A cloneable [`ReadStateService`], used to answer concurrent read requests.
    ///
    /// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`.
    read_service: ReadStateService,

    // Metrics
    //
    /// A metric tracking the maximum height that's currently in `finalized_state_queued_blocks`
    ///
    /// Set to `f64::NAN` if `finalized_state_queued_blocks` is empty, because grafana shows NaNs
    /// as a break in the graph.
    max_finalized_queue_height: f64,
}

/// A read-only service for accessing Zebra's cached blockchain state.
///
/// This service provides read-only access to:
/// - the non-finalized state: the ~100 most recent blocks.
/// - the finalized state: older blocks that have many confirmations.
///
/// Requests to this service are processed in parallel,
/// ignoring any blocks queued by the read-write [`StateService`].
///
/// This quick response behavior is better for most state users.
/// It allows other async tasks to make progress while concurrently reading data from disk.
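///
/// # Example
///
/// A minimal sketch of a read-only query. This is a hedged illustration, not a complete
/// setup: it assumes a mutable `read_state` handle and a `block_hash` variable obtained
/// elsewhere, and elides error handling.
///
/// ```ignore
/// use tower::{Service, ServiceExt};
/// use zebra_state::{ReadRequest, ReadResponse};
///
/// // Ask how deep a block is in the best chain, if it is there at all.
/// let response = read_state
///     .ready()
///     .await?
///     .call(ReadRequest::Depth(block_hash))
///     .await?;
///
/// if let ReadResponse::Depth(depth) = response {
///     tracing::info!(?depth, "block depth in the best chain");
/// }
/// ```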
#[derive(Clone, Debug)]
pub struct ReadStateService {
    // Configuration
    //
    /// The configured Zcash network.
    network: Network,

    // Shared Concurrently Readable State
    //
    /// A watch channel with a cached copy of the [`NonFinalizedState`].
    ///
    /// This state is only updated between requests,
    /// so it might include some block data that is also on `disk`.
    non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,

    /// The shared inner on-disk database for the finalized state.
    ///
    /// RocksDB allows reads and writes via a shared reference,
    /// but [`ZebraDb`] doesn't expose any write methods or types.
    ///
    /// This chain is updated concurrently with requests,
    /// so it might include some block data that is also in `best_mem`.
    db: ZebraDb,

    /// A shared handle to a task that writes blocks to the [`NonFinalizedState`] or [`FinalizedState`],
    /// once the queues have received all their parent blocks.
    ///
    /// Used to check for panics when writing blocks.
    block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
}

impl Drop for StateService {
    fn drop(&mut self) {
        // The state service owns the state, tasks, and channels,
        // so dropping it should shut down everything.

        // Close the channels (non-blocking)
        // This makes the block write thread exit the next time it checks the channels.
        // We want to do this here so we get any errors or panics from the block write task before it shuts down.
        self.invalid_block_write_reset_receiver.close();

        std::mem::drop(self.block_write_sender.finalized.take());
        std::mem::drop(self.block_write_sender.non_finalized.take());

        self.clear_finalized_block_queue(CommitBlockError::WriteTaskExited);
        self.clear_non_finalized_block_queue(CommitBlockError::WriteTaskExited);

        // Log database metrics before shutting down
        info!("dropping the state: logging database metrics");
        self.log_db_metrics();

        // Then drop self.read_service, which checks the block write task for panics,
        // and tries to shut down the database.
    }
}

impl Drop for ReadStateService {
    fn drop(&mut self) {
        // The read state service shares the state,
        // so dropping it should check if we can shut down.

        // TODO: move this into a try_shutdown() method
        if let Some(block_write_task) = self.block_write_task.take() {
            if let Some(block_write_task_handle) = Arc::into_inner(block_write_task) {
                // We're the last database user, so we can tell it to shut down (blocking):
                // - flushes the database to disk, and
                // - drops the database, which cleans up any database tasks correctly.
                self.db.shutdown(true);

                // We are the last state with a reference to this thread, so we can
                // wait until the block write task finishes, then check for panics (blocking).
                // (We'd also like to abort the thread, but std::thread::JoinHandle can't do that.)

                // This log is verbose during tests.
                #[cfg(not(test))]
                info!("waiting for the block write task to finish");
                #[cfg(test)]
                debug!("waiting for the block write task to finish");

                // TODO: move this into a check_for_panics() method
                if let Err(thread_panic) = block_write_task_handle.join() {
                    std::panic::resume_unwind(thread_panic);
                } else {
                    debug!("shutting down the state because the block write task has finished");
                }
            }
        } else {
            // Even if we're not the last database user, try shutting it down.
            //
            // TODO: rename this to try_shutdown()?
            self.db.shutdown(false);
        }
    }
}

impl StateService {
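    /// The minimum interval between pruning runs on the pending UTXO requests in `poll_ready()`.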
    const PRUNE_INTERVAL: Duration = Duration::from_secs(30);

    /// Creates a new state service for the state `config` and `network`.
    ///
    /// Uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
    /// to work out when it is near the final checkpoint.
    ///
    /// Returns the read-write and read-only state services,
    /// and read-only watch channels for its best chain tip.
    pub async fn new(
        config: Config,
        network: &Network,
        max_checkpoint_height: block::Height,
        checkpoint_verify_concurrency_limit: usize,
    ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
        let (finalized_state, finalized_tip, timer) = {
            let config = config.clone();
            let network = network.clone();
            tokio::task::spawn_blocking(move || {
                let timer = CodeTimer::start();
                let finalized_state = FinalizedState::new(
                    &config,
                    &network,
                    #[cfg(feature = "elasticsearch")]
                    true,
                );
                timer.finish(module_path!(), line!(), "opening finalized state database");

                let timer = CodeTimer::start();
                let finalized_tip = finalized_state.db.tip_block();

                (finalized_state, finalized_tip, timer)
            })
            .await
            .expect("failed to join blocking task")
        };

        // # Correctness
        //
        // The state service must set the finalized block write sender to `None`
        // if the restored non-finalized state contains blocks above the max checkpoint height,
        // so that non-finalized blocks can be written. Otherwise, Zebra will be unable to
        // commit semantically verified blocks, and its chain sync will stall.
        //
        // The state service must not set the finalized block write sender to `None` if the
        // restored non-finalized state contains no blocks above the max checkpoint height.
        // Otherwise, unless checkpoint sync is disabled in the zebra-consensus configuration,
        // Zebra will be unable to commit checkpoint verified blocks, and its chain sync will stall.
        let is_finalized_tip_past_max_checkpoint = if let Some(tip) = &finalized_tip {
            tip.coinbase_height().expect("valid block must have height") >= max_checkpoint_height
        } else {
            false
        };
        let (non_finalized_state, non_finalized_state_sender, non_finalized_state_receiver) =
            NonFinalizedState::new(network)
                .with_backup(
                    config.non_finalized_state_backup_dir(network),
                    &finalized_state.db,
                    is_finalized_tip_past_max_checkpoint,
                )
                .await;

        let non_finalized_block_write_sent_hashes = SentHashes::new(&non_finalized_state);
        let initial_tip = non_finalized_state
            .best_tip_block()
            .map(|cv_block| cv_block.block.clone())
            .or(finalized_tip)
            .map(CheckpointVerifiedBlock::from)
            .map(ChainTipBlock::from);

        tracing::info!(chain_tip = ?initial_tip.as_ref().map(|tip| (tip.hash, tip.height)), "loaded Zebra state cache");

        let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
            ChainTipSender::new(initial_tip, network);

        let finalized_state_for_writing = finalized_state.clone();
        let should_use_finalized_block_write_sender = non_finalized_state.is_chain_set_empty();
        let (block_write_sender, invalid_block_write_reset_receiver, block_write_task) =
            write::BlockWriteSender::spawn(
                finalized_state_for_writing,
                non_finalized_state,
                chain_tip_sender,
                non_finalized_state_sender,
                should_use_finalized_block_write_sender,
            );

        let read_service = ReadStateService::new(
            &finalized_state,
            block_write_task,
            non_finalized_state_receiver,
        );

        let full_verifier_utxo_lookahead = max_checkpoint_height
            - HeightDiff::try_from(checkpoint_verify_concurrency_limit)
                .expect("fits in HeightDiff");
        let full_verifier_utxo_lookahead =
            full_verifier_utxo_lookahead.unwrap_or(block::Height::MIN);
        let non_finalized_state_queued_blocks = QueuedBlocks::default();
        let pending_utxos = PendingUtxos::default();

        let finalized_block_write_last_sent_hash =
            tokio::task::spawn_blocking(move || finalized_state.db.finalized_tip_hash())
                .await
                .expect("failed to join blocking task");

        let state = Self {
            network: network.clone(),
            full_verifier_utxo_lookahead,
            non_finalized_state_queued_blocks,
            finalized_state_queued_blocks: HashMap::new(),
            block_write_sender,
            finalized_block_write_last_sent_hash,
            non_finalized_block_write_sent_hashes,
            invalid_block_write_reset_receiver,
            pending_utxos,
            last_prune: Instant::now(),
            read_service: read_service.clone(),
            max_finalized_queue_height: f64::NAN,
        };
        timer.finish(module_path!(), line!(), "initializing state service");

        tracing::info!("starting legacy chain check");
        let timer = CodeTimer::start();

        if let (Some(tip), Some(nu5_activation_height)) = (
            {
                let read_state = state.read_service.clone();
                tokio::task::spawn_blocking(move || read_state.best_tip())
                    .await
                    .expect("task should not panic")
            },
            NetworkUpgrade::Nu5.activation_height(network),
        ) {
            if let Err(error) = check::legacy_chain(
                nu5_activation_height,
                any_ancestor_blocks(
                    &state.read_service.latest_non_finalized_state(),
                    &state.read_service.db,
                    tip.1,
                ),
                &state.network,
                MAX_LEGACY_CHAIN_BLOCKS,
            ) {
                let legacy_db_path = state.read_service.db.path().to_path_buf();
                panic!(
                    "Cached state contains a legacy chain.\n\
                     An outdated Zebra version did not know about a recent network upgrade,\n\
                     so it followed a legacy chain using outdated consensus branch rules.\n\
                     Hint: Delete your database, and restart Zebra to do a full sync.\n\
                     Database path: {legacy_db_path:?}\n\
                     Error: {error:?}",
                );
            }
        }

        tracing::info!("cached state consensus branch is valid: no legacy chain found");
        timer.finish(module_path!(), line!(), "legacy chain check");

        // Spawn a background task to periodically export RocksDB metrics to Prometheus
        let db_for_metrics = read_service.db.clone();
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(30));
            loop {
                interval.tick().await;
                db_for_metrics.export_metrics();
            }
        });

        (state, read_service, latest_chain_tip, chain_tip_change)
    }

    /// Calls the read-only state service to log RocksDB database metrics.
    pub fn log_db_metrics(&self) {
        self.read_service.db.print_db_metrics();
    }

    /// Queue a checkpoint verified block for verification and storage in the finalized state.
    ///
    /// Returns a channel receiver that provides the result of the block commit.
    fn queue_and_commit_to_finalized_state(
        &mut self,
        checkpoint_verified: CheckpointVerifiedBlock,
    ) -> oneshot::Receiver<Result<block::Hash, CommitCheckpointVerifiedError>> {
        // # Correctness & Performance
        //
        // This method must not block, access the database, or perform CPU-intensive tasks,
        // because it is called directly from the tokio executor's Future threads.

        let queued_prev_hash = checkpoint_verified.block.header.previous_block_hash;
        let queued_height = checkpoint_verified.height;

        // If we're close to the final checkpoint, make the block's UTXOs available for
        // semantic block verification, even when it is in the channel.
        if self.is_close_to_final_checkpoint(queued_height) {
            self.non_finalized_block_write_sent_hashes
                .add_finalized(&checkpoint_verified)
        }

        let (rsp_tx, rsp_rx) = oneshot::channel();
        let queued = (checkpoint_verified, rsp_tx);

        if self.block_write_sender.finalized.is_some() {
            // We're still committing checkpoint verified blocks
            if let Some(duplicate_queued) = self
                .finalized_state_queued_blocks
                .insert(queued_prev_hash, queued)
            {
                Self::send_checkpoint_verified_block_error(
                    duplicate_queued,
                    CommitBlockError::new_duplicate(
                        Some(queued_prev_hash.into()),
                        KnownBlock::Queue,
                    ),
                );
            }

            self.drain_finalized_queue_and_commit();
        } else {
            // We've finished committing checkpoint verified blocks to the finalized state,
            // so drop any repeated queued blocks, and return an error.
            //
            // TODO: track the latest sent height, and drop any blocks under that height
            //       every time we send some blocks (like QueuedSemanticallyVerifiedBlocks)
            Self::send_checkpoint_verified_block_error(
                queued,
                CommitBlockError::new_duplicate(None, KnownBlock::Finalized),
            );

            self.clear_finalized_block_queue(CommitBlockError::new_duplicate(
                None,
                KnownBlock::Finalized,
            ));
        }

        if self.finalized_state_queued_blocks.is_empty() {
            self.max_finalized_queue_height = f64::NAN;
        } else if self.max_finalized_queue_height.is_nan()
            || self.max_finalized_queue_height < queued_height.0 as f64
        {
            // if there are still blocks in the queue, then either:
            // - the new block was lower than the old maximum, and there was a gap before it,
            //   so the maximum is still the same (and we skip this code), or
            // - the new block is higher than the old maximum, and there is at least one gap
            //   between the finalized tip and the new maximum
            self.max_finalized_queue_height = queued_height.0 as f64;
        }

        metrics::gauge!("state.checkpoint.queued.max.height").set(self.max_finalized_queue_height);
        metrics::gauge!("state.checkpoint.queued.block.count")
            .set(self.finalized_state_queued_blocks.len() as f64);

        rsp_rx
    }

    /// Finds finalized state queue blocks to be committed to the state in order,
    /// removes them from the queue, and sends them to the block commit task.
    ///
    /// After queueing a finalized block, this method checks whether the newly
    /// queued block (and any of its descendants) can be committed to the state.
    ///
    /// If the block commit channel has been closed, sends an error on the result
    /// channels of the queued blocks, and drops them.
    pub fn drain_finalized_queue_and_commit(&mut self) {
        use tokio::sync::mpsc::error::{SendError, TryRecvError};

        // # Correctness & Performance
        //
        // This method must not block, access the database, or perform CPU-intensive tasks,
        // because it is called directly from the tokio executor's Future threads.

        // If a block failed, we need to start again from a valid tip.
        match self.invalid_block_write_reset_receiver.try_recv() {
            Ok(reset_tip_hash) => self.finalized_block_write_last_sent_hash = reset_tip_hash,
            Err(TryRecvError::Disconnected) => {
                info!("Block commit task closed the block reset channel. Is Zebra shutting down?");
                return;
            }
            // There are no errors, so we can just use the last block hash we sent
            Err(TryRecvError::Empty) => {}
        }

        while let Some(queued_block) = self
            .finalized_state_queued_blocks
            .remove(&self.finalized_block_write_last_sent_hash)
        {
            let last_sent_finalized_block_height = queued_block.0.height;

            self.finalized_block_write_last_sent_hash = queued_block.0.hash;

            // If we've finished sending finalized blocks, ignore any repeated blocks.
            // (Blocks can be repeated after a syncer reset.)
            if let Some(finalized_block_write_sender) = &self.block_write_sender.finalized {
                let send_result = finalized_block_write_sender.send(queued_block);

                // If the receiver is closed, we can't send any more blocks.
                if let Err(SendError(queued)) = send_result {
                    // If Zebra is shutting down, drop blocks and return an error.
                    Self::send_checkpoint_verified_block_error(
                        queued,
                        CommitBlockError::WriteTaskExited,
                    );

                    self.clear_finalized_block_queue(CommitBlockError::WriteTaskExited);
                } else {
                    metrics::gauge!("state.checkpoint.sent.block.height")
                        .set(last_sent_finalized_block_height.0 as f64);
                };
            }
        }
    }

    /// Drops all finalized state queue blocks, and sends an error on their result channels.
    fn clear_finalized_block_queue(
        &mut self,
        error: impl Into<CommitCheckpointVerifiedError> + Clone,
    ) {
        for (_hash, queued) in self.finalized_state_queued_blocks.drain() {
            Self::send_checkpoint_verified_block_error(queued, error.clone());
        }
    }

    /// Send an error on a `QueuedCheckpointVerified` block's result channel, and drop the block
    fn send_checkpoint_verified_block_error(
        queued: QueuedCheckpointVerified,
        error: impl Into<CommitCheckpointVerifiedError>,
    ) {
        let (finalized, rsp_tx) = queued;

        // The block sender might have already given up on this block,
        // so ignore any channel send errors.
        let _ = rsp_tx.send(Err(error.into()));
        std::mem::drop(finalized);
    }

    /// Drops all non-finalized state queue blocks, and sends an error on their result channels.
    fn clear_non_finalized_block_queue(
        &mut self,
        error: impl Into<CommitSemanticallyVerifiedError> + Clone,
    ) {
        for (_hash, queued) in self.non_finalized_state_queued_blocks.drain() {
            Self::send_semantically_verified_block_error(queued, error.clone());
        }
    }

    /// Send an error on a `QueuedSemanticallyVerified` block's result channel, and drop the block
    fn send_semantically_verified_block_error(
        queued: QueuedSemanticallyVerified,
        error: impl Into<CommitSemanticallyVerifiedError>,
    ) {
        let (finalized, rsp_tx) = queued;

        // The block sender might have already given up on this block,
        // so ignore any channel send errors.
        let _ = rsp_tx.send(Err(error.into()));
        std::mem::drop(finalized);
    }

    /// Queue a semantically verified block for contextual verification and check if any queued
    /// blocks are ready to be verified and committed to the state.
    ///
    /// This function encodes the logic for [committing non-finalized blocks][1]
    /// in RFC0005.
    ///
    /// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks
    #[instrument(level = "debug", skip(self, semantically_verified))]
    fn queue_and_commit_to_non_finalized_state(
        &mut self,
        semantically_verified: SemanticallyVerifiedBlock,
    ) -> oneshot::Receiver<Result<block::Hash, CommitSemanticallyVerifiedError>> {
        tracing::debug!(block = %semantically_verified.block, "queueing block for contextual verification");
        let parent_hash = semantically_verified.block.header.previous_block_hash;

        if self
            .non_finalized_block_write_sent_hashes
            .contains(&semantically_verified.hash)
        {
            let (rsp_tx, rsp_rx) = oneshot::channel();
            let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
                Some(semantically_verified.hash.into()),
                KnownBlock::WriteChannel,
            )
            .into()));
            return rsp_rx;
        }

        if self
            .read_service
            .db
            .contains_height(semantically_verified.height)
        {
            let (rsp_tx, rsp_rx) = oneshot::channel();
            let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
                Some(semantically_verified.height.into()),
                KnownBlock::Finalized,
            )
            .into()));
            return rsp_rx;
        }

        // [`Request::CommitSemanticallyVerifiedBlock`] contract: a request to commit a block which
        // has been queued but not yet committed to the state fails the older request and replaces
        // it with the newer request.
        let rsp_rx = if let Some((_, old_rsp_tx)) = self
            .non_finalized_state_queued_blocks
            .get_mut(&semantically_verified.hash)
        {
            tracing::debug!("replacing older queued request with new request");
            let (mut rsp_tx, rsp_rx) = oneshot::channel();
            std::mem::swap(old_rsp_tx, &mut rsp_tx);
            let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
                Some(semantically_verified.hash.into()),
                KnownBlock::Queue,
            )
            .into()));
            rsp_rx
        } else {
            let (rsp_tx, rsp_rx) = oneshot::channel();
            self.non_finalized_state_queued_blocks
                .queue((semantically_verified, rsp_tx));
            rsp_rx
        };

        // We've finished sending checkpoint verified blocks when:
        // - we've sent the verified block for the last checkpoint, and
        // - it has been successfully written to disk.
        //
        // We detect the last checkpoint by looking for non-finalized blocks
        // that are a child of the last block we sent.
        //
        // TODO: configure the state with the last checkpoint hash instead?
        if self.block_write_sender.finalized.is_some()
            && self
                .non_finalized_state_queued_blocks
                .has_queued_children(self.finalized_block_write_last_sent_hash)
            && self.read_service.db.finalized_tip_hash()
                == self.finalized_block_write_last_sent_hash
        {
            // Tell the block write task to stop committing checkpoint verified blocks to the finalized state,
            // and move on to committing semantically verified blocks to the non-finalized state.
            std::mem::drop(self.block_write_sender.finalized.take());
            // Remove any checkpoint-verified block hashes from `non_finalized_block_write_sent_hashes`.
            self.non_finalized_block_write_sent_hashes = SentHashes::default();
            // Mark `SentHashes` as usable by the `can_fork_chain_at()` method.
            self.non_finalized_block_write_sent_hashes
                .can_fork_chain_at_hashes = true;
            // Send blocks from non-finalized queue
            self.send_ready_non_finalized_queued(self.finalized_block_write_last_sent_hash);
            // We've finished committing checkpoint verified blocks to finalized state, so drop any repeated queued blocks.
            self.clear_finalized_block_queue(CommitBlockError::new_duplicate(
                None,
                KnownBlock::Finalized,
            ));
        } else if !self.can_fork_chain_at(&parent_hash) {
            tracing::trace!("unready to verify, returning early");
        } else if self.block_write_sender.finalized.is_none() {
            // Wait until block commit task is ready to write non-finalized blocks before dequeuing them
            self.send_ready_non_finalized_queued(parent_hash);

            let finalized_tip_height = self.read_service.db.finalized_tip_height().expect(
                "Finalized state must have at least one block before committing non-finalized state",
            );

            self.non_finalized_state_queued_blocks
                .prune_by_height(finalized_tip_height);

            self.non_finalized_block_write_sent_hashes
                .prune_by_height(finalized_tip_height);
        }

        rsp_rx
    }

    /// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks.
    fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
        self.non_finalized_block_write_sent_hashes
            .can_fork_chain_at(hash)
            || &self.read_service.db.finalized_tip_hash() == hash
    }

    /// Returns `true` if `queued_height` is near the final checkpoint.
    ///
    /// The semantic block verifier needs access to UTXOs from checkpoint verified blocks
    /// near the final checkpoint, so that it can verify blocks that spend those UTXOs.
    ///
    /// If it doesn't have the required UTXOs, some blocks will time out,
    /// but succeed after a syncer restart.
    fn is_close_to_final_checkpoint(&self, queued_height: block::Height) -> bool {
        queued_height >= self.full_verifier_utxo_lookahead
    }

    /// Sends all queued blocks whose parents have recently arrived starting from `new_parent`
    /// in breadth-first ordering to the block write task which will attempt to validate and commit them
    #[tracing::instrument(level = "debug", skip(self, new_parent))]
    fn send_ready_non_finalized_queued(&mut self, new_parent: block::Hash) {
        use tokio::sync::mpsc::error::SendError;
        if let Some(non_finalized_block_write_sender) = &self.block_write_sender.non_finalized {
            let mut new_parents: Vec<block::Hash> = vec![new_parent];

            while let Some(parent_hash) = new_parents.pop() {
                let queued_children = self
                    .non_finalized_state_queued_blocks
                    .dequeue_children(parent_hash);

                for queued_child in queued_children {
                    let (SemanticallyVerifiedBlock { hash, .. }, _) = queued_child;

                    self.non_finalized_block_write_sent_hashes
                        .add(&queued_child.0);
                    let send_result = non_finalized_block_write_sender.send(queued_child.into());

                    if let Err(SendError(NonFinalizedWriteMessage::Commit(queued))) = send_result {
                        // If Zebra is shutting down, drop blocks and return an error.
                        Self::send_semantically_verified_block_error(
                            queued,
                            CommitBlockError::WriteTaskExited,
                        );

                        self.clear_non_finalized_block_queue(CommitBlockError::WriteTaskExited);

                        return;
                    };

                    new_parents.push(hash);
                }
            }

            self.non_finalized_block_write_sent_hashes.finish_batch();
        };
    }

    /// Return the tip of the current best chain.
    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
        self.read_service.best_tip()
    }

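    /// Sends a request to the block write task to invalidate the block with `hash`
    /// in the non-finalized state.
    ///
    /// Returns a channel receiver for the result. The receiver yields an error if
    /// checkpoint verified blocks are still being committed, or if the request
    /// could not be sent to the block write task.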
    fn send_invalidate_block(
        &self,
        hash: block::Hash,
    ) -> oneshot::Receiver<Result<block::Hash, InvalidateError>> {
        let (rsp_tx, rsp_rx) = oneshot::channel();

        let Some(sender) = &self.block_write_sender.non_finalized else {
            let _ = rsp_tx.send(Err(InvalidateError::ProcessingCheckpointedBlocks));
            return rsp_rx;
        };

        if let Err(tokio::sync::mpsc::error::SendError(error)) =
            sender.send(NonFinalizedWriteMessage::Invalidate { hash, rsp_tx })
        {
            let NonFinalizedWriteMessage::Invalidate { rsp_tx, .. } = error else {
                unreachable!("should return the same Invalidate message that could not be sent");
            };

            let _ = rsp_tx.send(Err(InvalidateError::SendInvalidateRequestFailed));
        }

        rsp_rx
    }

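    /// Sends a request to the block write task to reconsider a previously invalidated
    /// block with `hash` in the non-finalized state.
    ///
    /// Returns a channel receiver for the result. The receiver yields an error if
    /// checkpoint verified blocks are still being committed, or if the request
    /// could not be sent to the block write task.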
    fn send_reconsider_block(
        &self,
        hash: block::Hash,
    ) -> oneshot::Receiver<Result<Vec<block::Hash>, ReconsiderError>> {
        let (rsp_tx, rsp_rx) = oneshot::channel();

        let Some(sender) = &self.block_write_sender.non_finalized else {
            let _ = rsp_tx.send(Err(ReconsiderError::CheckpointCommitInProgress));
            return rsp_rx;
        };

        if let Err(tokio::sync::mpsc::error::SendError(error)) =
            sender.send(NonFinalizedWriteMessage::Reconsider { hash, rsp_tx })
        {
            let NonFinalizedWriteMessage::Reconsider { rsp_tx, .. } = error else {
                unreachable!("should return the same Reconsider message that could not be sent");
            };

            let _ = rsp_tx.send(Err(ReconsiderError::ReconsiderSendFailed));
        }

        rsp_rx
    }

    /// Assert some assumptions about the semantically verified `block` before it is queued.
    fn assert_block_can_be_validated(&self, block: &SemanticallyVerifiedBlock) {
        // required by `Request::CommitSemanticallyVerifiedBlock` call
        assert!(
            block.height > self.network.mandatory_checkpoint_height(),
            "invalid semantically verified block height: the canopy checkpoint is mandatory, pre-canopy \
             blocks, and the canopy activation block, must be committed to the state as finalized \
             blocks"
        );
    }

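    /// Returns [`KnownBlock::WriteChannel`] if the block with `hash` has been sent to
    /// the block write task, or `None` if it hasn't.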
    fn known_sent_hash(&self, hash: &block::Hash) -> Option<KnownBlock> {
        self.non_finalized_block_write_sent_hashes
            .contains(hash)
            .then_some(KnownBlock::WriteChannel)
    }
}

impl ReadStateService {
    /// Creates a new read-only state service, using the provided finalized state and
    /// block write task handle.
    ///
    /// Returns the newly created read-only state service.
    pub(crate) fn new(
        finalized_state: &FinalizedState,
        block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
        non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
    ) -> Self {
        let read_service = Self {
            network: finalized_state.network(),
            db: finalized_state.db.clone(),
            non_finalized_state_receiver,
            block_write_task,
        };

        tracing::debug!("created new read-only state service");

        read_service
    }

    /// Return the tip of the current best chain.
    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
        read::best_tip(&self.latest_non_finalized_state(), &self.db)
    }

    /// Gets a clone of the latest non-finalized state from the `non_finalized_state_receiver`
    fn latest_non_finalized_state(&self) -> NonFinalizedState {
        self.non_finalized_state_receiver.cloned_watch_data()
    }

    /// Gets a clone of the latest, best non-finalized chain from the `non_finalized_state_receiver`
    #[allow(dead_code)]
    fn latest_best_chain(&self) -> Option<Arc<Chain>> {
        self.latest_non_finalized_state().best_chain().cloned()
    }

    /// Test-only access to the inner database.
    /// Can be used to modify the database without doing any consensus checks.
    #[cfg(any(test, feature = "proptest-impl"))]
    pub fn db(&self) -> &ZebraDb {
        &self.db
    }

    /// Logs RocksDB metrics using the read-only state service.
    pub fn log_db_metrics(&self) {
        self.db.print_db_metrics();
    }
}

impl Service<Request> for StateService {
    type Response = Response;
    type Error = BoxError;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Check for panics in the block write task
        let poll = self.read_service.poll_ready(cx);

        // Prune outdated UTXO requests
        let now = Instant::now();

        if self.last_prune + Self::PRUNE_INTERVAL < now {
            let tip = self.best_tip();
            let old_len = self.pending_utxos.len();

            self.pending_utxos.prune();
            self.last_prune = now;

            let new_len = self.pending_utxos.len();
            let prune_count = old_len
                .checked_sub(new_len)
                .expect("prune does not add any utxo requests");
            if prune_count > 0 {
                tracing::debug!(
                    ?old_len,
                    ?new_len,
                    ?prune_count,
                    ?tip,
                    "pruned utxo requests"
                );
            } else {
                tracing::debug!(len = ?old_len, ?tip, "no utxo requests needed pruning");
            }
        }

        poll
    }

    #[instrument(name = "state", skip(self, req))]
    fn call(&mut self, req: Request) -> Self::Future {
        req.count_metric();
        let timer = CodeTimer::start();
        let span = Span::current();

        match req {
            // Uses non_finalized_state_queued_blocks and pending_utxos in the StateService
            // Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb.
            //
            // The expected error type for this request is `CommitSemanticallyVerifiedError`.
            Request::CommitSemanticallyVerifiedBlock(semantically_verified) => {
                self.assert_block_can_be_validated(&semantically_verified);

                self.pending_utxos
                    .check_against_ordered(&semantically_verified.new_outputs);

                // # Performance
                //
                // Allow other async tasks to make progress while blocks are being verified
                // and written to disk. But wait for the blocks to finish committing,
                // so that `StateService` multi-block queries always observe a consistent state.
                //
                // Since each block is spawned into its own task,
                // there shouldn't be any other code running in the same task,
                // so we don't need to worry about blocking it:
                // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html

                let rsp_rx = tokio::task::block_in_place(move || {
                    span.in_scope(|| {
                        self.queue_and_commit_to_non_finalized_state(semantically_verified)
                    })
                });

                // TODO:
                // - check for panics in the block write task here,
                //   as well as in poll_ready()

                // The work is all done, the future just waits on a channel for the result
                timer.finish(module_path!(), line!(), "CommitSemanticallyVerifiedBlock");

                // Await the channel response, flatten the result, map receive errors to
                // `CommitSemanticallyVerifiedError::WriteTaskExited`.
                // Then flatten the nested Result and convert any errors to a BoxError.
                let span = Span::current();
                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| CommitBlockError::WriteTaskExited.into())
                        .and_then(|result| result)
                        .map_err(BoxError::from)
                        .map(Response::Committed)
                }
                .instrument(span)
                .boxed()
            }

            // Uses finalized_state_queued_blocks and pending_utxos in the StateService.
            // Accesses shared writeable state in the StateService.
            //
            // The expected error type for this request is `CommitCheckpointVerifiedError`.
            Request::CommitCheckpointVerifiedBlock(finalized) => {
                // # Consensus
                //
                // A semantic block verification could have called AwaitUtxo
                // before this checkpoint verified block arrived in the state.
                // So we need to check for pending UTXO requests sent by running
                // semantic block verifications.
                //
                // This check is redundant for most checkpoint verified blocks,
                // because semantic verification can only succeed near the final
                // checkpoint, when all the UTXOs are available for the verifying block.
                //
                // (Checkpoint block UTXOs are verified using block hash checkpoints
                // and transaction merkle tree block header commitments.)
                self.pending_utxos
                    .check_against_ordered(&finalized.new_outputs);

                // # Performance
                //
                // This method doesn't block, access the database, or perform CPU-intensive tasks,
                // so we can run it directly in the tokio executor's Future threads.
                let rsp_rx = self.queue_and_commit_to_finalized_state(finalized);

                // TODO:
                // - check for panics in the block write task here,
                //   as well as in poll_ready()

                // The work is all done, the future just waits on a channel for the result
                timer.finish(module_path!(), line!(), "CommitCheckpointVerifiedBlock");

                // Await the channel response, flatten the result, map receive errors to
                // `CommitCheckpointVerifiedError::WriteTaskExited`.
                // Then flatten the nested Result and convert any errors to a BoxError.
                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| CommitBlockError::WriteTaskExited.into())
                        .and_then(|result| result)
                        .map_err(BoxError::from)
                        .map(Response::Committed)
                }
                .instrument(span)
                .boxed()
            }

            // Uses pending_utxos and non_finalized_state_queued_blocks in the StateService.
            // If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService.
            Request::AwaitUtxo(outpoint) => {
                // Prepare the AwaitUtxo future from PendingUtxos.
                let response_fut = self.pending_utxos.queue(outpoint);
                // Only instrument `response_fut`, the ReadStateService already
                // instruments its requests with the same span.

                let response_fut = response_fut.instrument(span).boxed();

                // Check the non-finalized block queue outside the returned future,
                // so we can access mutable state fields.
                if let Some(utxo) = self.non_finalized_state_queued_blocks.utxo(&outpoint) {
                    self.pending_utxos.respond(&outpoint, utxo);

                    // We're finished, the returned future gets the UTXO from the respond() channel.
                    timer.finish(module_path!(), line!(), "AwaitUtxo/queued-non-finalized");

                    return response_fut;
                }

                // Check the sent non-finalized blocks
                if let Some(utxo) = self.non_finalized_block_write_sent_hashes.utxo(&outpoint) {
                    self.pending_utxos.respond(&outpoint, utxo);

                    // We're finished, the returned future gets the UTXO from the respond() channel.
                    timer.finish(module_path!(), line!(), "AwaitUtxo/sent-non-finalized");

                    return response_fut;
                }

                // We ignore any UTXOs in FinalizedState.finalized_state_queued_blocks,
                // because it is only used during checkpoint verification.
                //
                // This creates a rare race condition, but it doesn't seem to happen much in practice.
                // See #5126 for details.

                // Manually send a request to the ReadStateService,
                // to get UTXOs from any non-finalized chain or the finalized chain.
                let read_service = self.read_service.clone();

                // Run the request in an async block, so we can await the response.
                async move {
                    let req = ReadRequest::AnyChainUtxo(outpoint);

                    let rsp = read_service.oneshot(req).await?;

                    // Optional TODO:
                    //  - make pending_utxos.respond() async using a channel,
                    //    so we can respond to all waiting requests here
                    //
                    // This change is not required for correctness, because:
                    // - any waiting requests should have returned when the block was sent to the state
                    // - otherwise, the request returns immediately if:
                    //   - the block is in the non-finalized queue, or
                    //   - the block is in any non-finalized chain or the finalized state
                    //
                    // And if the block is in the finalized queue,
                    // that's rare enough that a retry is ok.
                    if let ReadResponse::AnyChainUtxo(Some(utxo)) = rsp {
                        // We got a UTXO, so we return the result instead of waiting for the response future.
                        timer.finish(module_path!(), line!(), "AwaitUtxo/any-chain");

                        return Ok(Response::Utxo(utxo));
                    }

                    // We're finished, but the returned future is waiting on the respond() channel.
                    timer.finish(module_path!(), line!(), "AwaitUtxo/waiting");

                    response_fut.await
                }
                .boxed()
            }

            // Used by sync, inbound, and block verifier to check if a block is already in the state
            // before downloading or validating it.
            Request::KnownBlock(hash) => {
                let timer = CodeTimer::start();

                let sent_hash_response = self.known_sent_hash(&hash);
                let read_service = self.read_service.clone();

                async move {
                    if sent_hash_response.is_some() {
                        return Ok(Response::KnownBlock(sent_hash_response));
                    };

                    let response = read::non_finalized_state_contains_block_hash(
                        &read_service.latest_non_finalized_state(),
                        hash,
                    )
                    .or_else(|| read::finalized_state_contains_block_hash(&read_service.db, hash));

                    // The work is done in the future.
                    timer.finish(module_path!(), line!(), "Request::KnownBlock");

                    Ok(Response::KnownBlock(response))
                }
                .boxed()
            }

            // The expected error type for this request is `InvalidateError`
            Request::InvalidateBlock(block_hash) => {
                let rsp_rx = tokio::task::block_in_place(move || {
                    span.in_scope(|| self.send_invalidate_block(block_hash))
                });

                // Await the channel response, flatten the result, map receive errors to
                // `InvalidateError::InvalidateRequestDropped`.
                // Then flatten the nested Result and convert any errors to a BoxError.
                let span = Span::current();
                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| InvalidateError::InvalidateRequestDropped)
                        .and_then(|result| result)
                        .map_err(BoxError::from)
                        .map(Response::Invalidated)
                }
                .instrument(span)
                .boxed()
            }

            // The expected error type for this request is `ReconsiderError`
            Request::ReconsiderBlock(block_hash) => {
                let rsp_rx = tokio::task::block_in_place(move || {
                    span.in_scope(|| self.send_reconsider_block(block_hash))
                });

                // Await the channel response, flatten the result, map receive errors to
                // `ReconsiderError::ReconsiderResponseDropped`.
                // Then flatten the nested Result and convert any errors to a BoxError.
                let span = Span::current();
                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| ReconsiderError::ReconsiderResponseDropped)
                        .and_then(|result| result)
                        .map_err(BoxError::from)
                        .map(Response::Reconsidered)
                }
                .instrument(span)
                .boxed()
            }

            // Runs concurrently using the ReadStateService
            Request::Tip
            | Request::Depth(_)
            | Request::BestChainNextMedianTimePast
            | Request::BestChainBlockHash(_)
            | Request::BlockLocator
            | Request::Transaction(_)
            | Request::AnyChainTransaction(_)
            | Request::UnspentBestChainUtxo(_)
            | Request::Block(_)
            | Request::BlockAndSize(_)
            | Request::BlockHeader(_)
            | Request::FindBlockHashes { .. }
            | Request::FindBlockHeaders { .. }
            | Request::CheckBestChainTipNullifiersAndAnchors(_) => {
                // Redirect the request to the concurrent ReadStateService
                let read_service = self.read_service.clone();

                async move {
                    let req = req
                        .try_into()
                        .expect("ReadRequest conversion should not fail");

                    let rsp = read_service.oneshot(req).await?;
                    let rsp = rsp.try_into().expect("Response conversion should not fail");

                    Ok(rsp)
                }
                .boxed()
            }

            Request::CheckBlockProposalValidity(_) => {
                // Redirect the request to the concurrent ReadStateService
                let read_service = self.read_service.clone();

                async move {
                    let req = req
                        .try_into()
                        .expect("ReadRequest conversion should not fail");

                    let rsp = read_service.oneshot(req).await?;
                    let rsp = rsp.try_into().expect("Response conversion should not fail");

                    Ok(rsp)
                }
                .boxed()
            }
        }
    }
}

impl Service<ReadRequest> for ReadStateService {
    type Response = ReadResponse;
    type Error = BoxError;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Check for panics in the block write task
        //
        // TODO: move into a check_for_panics() method
        let block_write_task = self.block_write_task.take();

        if let Some(block_write_task) = block_write_task {
            if block_write_task.is_finished() {
                if let Some(block_write_task) = Arc::into_inner(block_write_task) {
                    // We are the last state with a reference to this task, so we can propagate any panics
                    if let Err(thread_panic) = block_write_task.join() {
                        std::panic::resume_unwind(thread_panic);
                    }
                }
            } else {
                // It hasn't finished, so we need to put it back
                self.block_write_task = Some(block_write_task);
            }
        }

        self.db.check_for_panics();

        Poll::Ready(Ok(()))
    }

    #[instrument(name = "read_state", skip(self, req))]
    fn call(&mut self, req: ReadRequest) -> Self::Future {
        req.count_metric();
        let timer = CodeTimer::start();
        let span = Span::current();

        match req {
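            // Each read request clones the shared state handles it needs, runs its reads
            // on the tokio blocking thread pool via `spawn_blocking()`, and then returns
            // the result via `wait_for_panics()`, which waits for the blocking task and
            // propagates any panics from it.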
            // Used by the `getblockchaininfo` RPC.
            ReadRequest::UsageInfo => {
                let db = self.db.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        // The work is done in the future.

                        let db_size = db.size();

                        timer.finish(module_path!(), line!(), "ReadRequest::UsageInfo");

                        Ok(ReadResponse::UsageInfo(db_size))
                    })
                })
                .wait_for_panics()
            }

            // Used by the StateService.
            ReadRequest::Tip => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let tip = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::tip(non_finalized_state.best_chain(), &state.db)
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::Tip");

                        Ok(ReadResponse::Tip(tip))
                    })
                })
                .wait_for_panics()
            }

            // Used by `getblockchaininfo` RPC method.
            ReadRequest::TipPoolValues => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let tip_with_value_balance = state
                            .non_finalized_state_receiver
                            .with_watch_data(|non_finalized_state| {
                                read::tip_with_value_balance(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                )
                            });

                        // The work is done in the future.
                        // TODO: Do this in the Drop impl with the variant name?
                        timer.finish(module_path!(), line!(), "ReadRequest::TipPoolValues");

                        let (tip_height, tip_hash, value_balance) = tip_with_value_balance?
                            .ok_or(BoxError::from("no chain tip available yet"))?;

                        Ok(ReadResponse::TipPoolValues {
                            tip_height,
                            tip_hash,
                            value_balance,
                        })
                    })
                })
                .wait_for_panics()
            }

            // Used by getblock
            ReadRequest::BlockInfo(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let value_balance = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::block_info(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        // TODO: Do this in the Drop impl with the variant name?
                        timer.finish(module_path!(), line!(), "ReadRequest::BlockInfo");

                        Ok(ReadResponse::BlockInfo(value_balance))
                    })
                })
                .wait_for_panics()
            }

            // Used by the StateService.
            ReadRequest::Depth(hash) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let depth = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::depth(non_finalized_state.best_chain(), &state.db, hash)
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::Depth");

                        Ok(ReadResponse::Depth(depth))
                    })
                })
                .wait_for_panics()
            }

            // Used by the StateService.
            ReadRequest::BestChainNextMedianTimePast => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let non_finalized_state = state.latest_non_finalized_state();
                        let median_time_past =
                            read::next_median_time_past(&non_finalized_state, &state.db);

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::BestChainNextMedianTimePast",
                        );

                        Ok(ReadResponse::BestChainNextMedianTimePast(median_time_past?))
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block (raw) RPC and the StateService.
            ReadRequest::Block(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let block = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::block(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::Block");

                        Ok(ReadResponse::Block(block))
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block (raw) RPC and the StateService.
            ReadRequest::BlockAndSize(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let block_and_size = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::block_and_size(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::BlockAndSize");

                        Ok(ReadResponse::BlockAndSize(block_and_size))
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block (verbose) RPC and the StateService.
            ReadRequest::BlockHeader(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let best_chain = state.latest_best_chain();

                        let height = hash_or_height
                            .height_or_else(|hash| {
                                read::find::height_by_hash(best_chain.clone(), &state.db, hash)
                            })
                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;

                        let hash = hash_or_height
                            .hash_or_else(|height| {
                                read::find::hash_by_height(best_chain.clone(), &state.db, height)
                            })
                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;

                        let next_height = height.next()?;
                        let next_block_hash =
                            read::find::hash_by_height(best_chain.clone(), &state.db, next_height);

                        let header = read::block_header(best_chain, &state.db, height.into())
                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::BlockHeader");

                        Ok(ReadResponse::BlockHeader {
                            header,
                            hash,
                            height,
                            next_block_hash,
                        })
                    })
                })
                .wait_for_panics()
            }

            // For the get_raw_transaction RPC and the StateService.
            ReadRequest::Transaction(hash) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let response =
                            read::mined_transaction(state.latest_best_chain(), &state.db, hash);

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::Transaction");

                        Ok(ReadResponse::Transaction(response))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::AnyChainTransaction(hash) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let tx = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::any_transaction(
                                    non_finalized_state.chain_iter(),
                                    &state.db,
                                    hash,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::AnyChainTransaction");

                        Ok(ReadResponse::AnyChainTransaction(tx))
                    })
                })
                .wait_for_panics()
            }

1595            // Used by the get_block (verbose) RPC.
1596 ReadRequest::TransactionIdsForBlock(hash_or_height) => {
1597 let state = self.clone();
1598
1599 tokio::task::spawn_blocking(move || {
1600 span.in_scope(move || {
1601 let transaction_ids = state.non_finalized_state_receiver.with_watch_data(
1602 |non_finalized_state| {
1603 read::transaction_hashes_for_block(
1604 non_finalized_state.best_chain(),
1605 &state.db,
1606 hash_or_height,
1607 )
1608 },
1609 );
1610
1611 // The work is done in the future.
1612 timer.finish(
1613 module_path!(),
1614 line!(),
1615 "ReadRequest::TransactionIdsForBlock",
1616 );
1617
1618 Ok(ReadResponse::TransactionIdsForBlock(transaction_ids))
1619 })
1620 })
1621 .wait_for_panics()
1622 }
1623
1624 ReadRequest::AnyChainTransactionIdsForBlock(hash_or_height) => {
1625 let state = self.clone();
1626
1627 tokio::task::spawn_blocking(move || {
1628 span.in_scope(move || {
1629 let transaction_ids = state.non_finalized_state_receiver.with_watch_data(
1630 |non_finalized_state| {
1631 read::transaction_hashes_for_any_block(
1632 non_finalized_state.chain_iter(),
1633 &state.db,
1634 hash_or_height,
1635 )
1636 },
1637 );
1638
1639 // The work is done in the future.
1640 timer.finish(
1641 module_path!(),
1642 line!(),
1643 "ReadRequest::AnyChainTransactionIdsForBlock",
1644 );
1645
1646 Ok(ReadResponse::AnyChainTransactionIdsForBlock(
1647 transaction_ids,
1648 ))
1649 })
1650 })
1651 .wait_for_panics()
1652 }
1653
1654 #[cfg(feature = "indexer")]
1655 ReadRequest::SpendingTransactionId(spend) => {
1656 let state = self.clone();
1657
1658 tokio::task::spawn_blocking(move || {
1659 span.in_scope(move || {
1660 let spending_transaction_id = state
1661 .non_finalized_state_receiver
1662 .with_watch_data(|non_finalized_state| {
1663 read::spending_transaction_hash(
1664 non_finalized_state.best_chain(),
1665 &state.db,
1666 spend,
1667 )
1668 });
1669
1670 // The work is done in the future.
1671 timer.finish(
1672 module_path!(),
1673 line!(),
1674                            "ReadRequest::SpendingTransactionId",
1675 );
1676
1677 Ok(ReadResponse::TransactionId(spending_transaction_id))
1678 })
1679 })
1680 .wait_for_panics()
1681 }
1682
1683 ReadRequest::UnspentBestChainUtxo(outpoint) => {
1684 let state = self.clone();
1685
1686 tokio::task::spawn_blocking(move || {
1687 span.in_scope(move || {
1688 let utxo = state.non_finalized_state_receiver.with_watch_data(
1689 |non_finalized_state| {
1690 read::unspent_utxo(
1691 non_finalized_state.best_chain(),
1692 &state.db,
1693 outpoint,
1694 )
1695 },
1696 );
1697
1698 // The work is done in the future.
1699 timer.finish(module_path!(), line!(), "ReadRequest::UnspentBestChainUtxo");
1700
1701 Ok(ReadResponse::UnspentBestChainUtxo(utxo))
1702 })
1703 })
1704 .wait_for_panics()
1705 }
1706
1707 // Manually used by the StateService to implement part of AwaitUtxo.
1708 ReadRequest::AnyChainUtxo(outpoint) => {
1709 let state = self.clone();
1710
1711 tokio::task::spawn_blocking(move || {
1712 span.in_scope(move || {
1713 let utxo = state.non_finalized_state_receiver.with_watch_data(
1714 |non_finalized_state| {
1715 read::any_utxo(non_finalized_state, &state.db, outpoint)
1716 },
1717 );
1718
1719 // The work is done in the future.
1720 timer.finish(module_path!(), line!(), "ReadRequest::AnyChainUtxo");
1721
1722 Ok(ReadResponse::AnyChainUtxo(utxo))
1723 })
1724 })
1725 .wait_for_panics()
1726 }
1727
1728 // Used by the StateService.
1729 ReadRequest::BlockLocator => {
1730 let state = self.clone();
1731
1732 tokio::task::spawn_blocking(move || {
1733 span.in_scope(move || {
1734 let block_locator = state.non_finalized_state_receiver.with_watch_data(
1735 |non_finalized_state| {
1736 read::block_locator(non_finalized_state.best_chain(), &state.db)
1737 },
1738 );
1739
1740 // The work is done in the future.
1741 timer.finish(module_path!(), line!(), "ReadRequest::BlockLocator");
1742
1743 Ok(ReadResponse::BlockLocator(
1744 block_locator.unwrap_or_default(),
1745 ))
1746 })
1747 })
1748 .wait_for_panics()
1749 }
1750
1751 // Used by the StateService.
1752 ReadRequest::FindBlockHashes { known_blocks, stop } => {
1753 let state = self.clone();
1754
1755 tokio::task::spawn_blocking(move || {
1756 span.in_scope(move || {
1757 let block_hashes = state.non_finalized_state_receiver.with_watch_data(
1758 |non_finalized_state| {
1759 read::find_chain_hashes(
1760 non_finalized_state.best_chain(),
1761 &state.db,
1762 known_blocks,
1763 stop,
1764 MAX_FIND_BLOCK_HASHES_RESULTS,
1765 )
1766 },
1767 );
1768
1769 // The work is done in the future.
1770 timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHashes");
1771
1772 Ok(ReadResponse::BlockHashes(block_hashes))
1773 })
1774 })
1775 .wait_for_panics()
1776 }
1777
1778 // Used by the StateService.
1779 ReadRequest::FindBlockHeaders { known_blocks, stop } => {
1780 let state = self.clone();
1781
1782 tokio::task::spawn_blocking(move || {
1783 span.in_scope(move || {
1784 let block_headers = state.non_finalized_state_receiver.with_watch_data(
1785 |non_finalized_state| {
1786 read::find_chain_headers(
1787 non_finalized_state.best_chain(),
1788 &state.db,
1789 known_blocks,
1790 stop,
1791 MAX_FIND_BLOCK_HEADERS_RESULTS,
1792 )
1793 },
1794 );
1795
1796 let block_headers = block_headers
1797 .into_iter()
1798 .map(|header| CountedHeader { header })
1799 .collect();
1800
1801 // The work is done in the future.
1802 timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHeaders");
1803
1804 Ok(ReadResponse::BlockHeaders(block_headers))
1805 })
1806 })
1807 .wait_for_panics()
1808 }
1809
1810 ReadRequest::SaplingTree(hash_or_height) => {
1811 let state = self.clone();
1812
1813 tokio::task::spawn_blocking(move || {
1814 span.in_scope(move || {
1815 let sapling_tree = state.non_finalized_state_receiver.with_watch_data(
1816 |non_finalized_state| {
1817 read::sapling_tree(
1818 non_finalized_state.best_chain(),
1819 &state.db,
1820 hash_or_height,
1821 )
1822 },
1823 );
1824
1825 // The work is done in the future.
1826 timer.finish(module_path!(), line!(), "ReadRequest::SaplingTree");
1827
1828 Ok(ReadResponse::SaplingTree(sapling_tree))
1829 })
1830 })
1831 .wait_for_panics()
1832 }
1833
1834 ReadRequest::OrchardTree(hash_or_height) => {
1835 let state = self.clone();
1836
1837 tokio::task::spawn_blocking(move || {
1838 span.in_scope(move || {
1839 let orchard_tree = state.non_finalized_state_receiver.with_watch_data(
1840 |non_finalized_state| {
1841 read::orchard_tree(
1842 non_finalized_state.best_chain(),
1843 &state.db,
1844 hash_or_height,
1845 )
1846 },
1847 );
1848
1849 // The work is done in the future.
1850 timer.finish(module_path!(), line!(), "ReadRequest::OrchardTree");
1851
1852 Ok(ReadResponse::OrchardTree(orchard_tree))
1853 })
1854 })
1855 .wait_for_panics()
1856 }
1857
1858 ReadRequest::SaplingSubtrees { start_index, limit } => {
1859 let state = self.clone();
1860
1861 tokio::task::spawn_blocking(move || {
1862 span.in_scope(move || {
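                        // Compute the exclusive end index from the start index and the limit.
                        // `None` means no limit was provided or the addition overflowed, so the
                        // range below is left unbounded.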
1863 let end_index = limit
1864 .and_then(|limit| start_index.0.checked_add(limit.0))
1865 .map(NoteCommitmentSubtreeIndex);
1866
1867 let sapling_subtrees = state.non_finalized_state_receiver.with_watch_data(
1868 |non_finalized_state| {
1869 if let Some(end_index) = end_index {
1870 read::sapling_subtrees(
1871 non_finalized_state.best_chain(),
1872 &state.db,
1873 start_index..end_index,
1874 )
1875 } else {
1876 // If there is no end bound, just return all the trees.
1877                                    // If the end bound would overflow, we also return all the trees, because that's what
1878 // `zcashd` does. (It never calculates an end bound, so it just keeps iterating until
1879 // the trees run out.)
1880 read::sapling_subtrees(
1881 non_finalized_state.best_chain(),
1882 &state.db,
1883 start_index..,
1884 )
1885 }
1886 },
1887 );
1888
1889 // The work is done in the future.
1890 timer.finish(module_path!(), line!(), "ReadRequest::SaplingSubtrees");
1891
1892 Ok(ReadResponse::SaplingSubtrees(sapling_subtrees))
1893 })
1894 })
1895 .wait_for_panics()
1896 }
1897
1898 ReadRequest::OrchardSubtrees { start_index, limit } => {
1899 let state = self.clone();
1900
1901 tokio::task::spawn_blocking(move || {
1902 span.in_scope(move || {
1903 let end_index = limit
1904 .and_then(|limit| start_index.0.checked_add(limit.0))
1905 .map(NoteCommitmentSubtreeIndex);
1906
1907 let orchard_subtrees = state.non_finalized_state_receiver.with_watch_data(
1908 |non_finalized_state| {
1909 if let Some(end_index) = end_index {
1910 read::orchard_subtrees(
1911 non_finalized_state.best_chain(),
1912 &state.db,
1913 start_index..end_index,
1914 )
1915 } else {
1916 // If there is no end bound, just return all the trees.
1917                                    // If the end bound would overflow, we also return all the trees, because that's what
1918 // `zcashd` does. (It never calculates an end bound, so it just keeps iterating until
1919 // the trees run out.)
1920 read::orchard_subtrees(
1921 non_finalized_state.best_chain(),
1922 &state.db,
1923 start_index..,
1924 )
1925 }
1926 },
1927 );
1928
1929 // The work is done in the future.
1930 timer.finish(module_path!(), line!(), "ReadRequest::OrchardSubtrees");
1931
1932 Ok(ReadResponse::OrchardSubtrees(orchard_subtrees))
1933 })
1934 })
1935 .wait_for_panics()
1936 }
1937
1938 // For the get_address_balance RPC.
1939 ReadRequest::AddressBalance(addresses) => {
1940 let state = self.clone();
1941
1942 tokio::task::spawn_blocking(move || {
1943 span.in_scope(move || {
1944 let (balance, received) = state
1945 .non_finalized_state_receiver
1946 .with_watch_data(|non_finalized_state| {
1947 read::transparent_balance(
1948 non_finalized_state.best_chain().cloned(),
1949 &state.db,
1950 addresses,
1951 )
1952 })?;
1953
1954 // The work is done in the future.
1955 timer.finish(module_path!(), line!(), "ReadRequest::AddressBalance");
1956
1957 Ok(ReadResponse::AddressBalance { balance, received })
1958 })
1959 })
1960 .wait_for_panics()
1961 }
1962
1963 // For the get_address_tx_ids RPC.
1964 ReadRequest::TransactionIdsByAddresses {
1965 addresses,
1966 height_range,
1967 } => {
1968 let state = self.clone();
1969
1970 tokio::task::spawn_blocking(move || {
1971 span.in_scope(move || {
1972 let tx_ids = state.non_finalized_state_receiver.with_watch_data(
1973 |non_finalized_state| {
1974 read::transparent_tx_ids(
1975 non_finalized_state.best_chain(),
1976 &state.db,
1977 addresses,
1978 height_range,
1979 )
1980 },
1981 );
1982
1983 // The work is done in the future.
1984 timer.finish(
1985 module_path!(),
1986 line!(),
1987 "ReadRequest::TransactionIdsByAddresses",
1988 );
1989
1990 tx_ids.map(ReadResponse::AddressesTransactionIds)
1991 })
1992 })
1993 .wait_for_panics()
1994 }
1995
1996 // For the get_address_utxos RPC.
1997 ReadRequest::UtxosByAddresses(addresses) => {
1998 let state = self.clone();
1999
2000 tokio::task::spawn_blocking(move || {
2001 span.in_scope(move || {
2002 let utxos = state.non_finalized_state_receiver.with_watch_data(
2003 |non_finalized_state| {
2004 read::address_utxos(
2005 &state.network,
2006 non_finalized_state.best_chain(),
2007 &state.db,
2008 addresses,
2009 )
2010 },
2011 );
2012
2013 // The work is done in the future.
2014 timer.finish(module_path!(), line!(), "ReadRequest::UtxosByAddresses");
2015
2016 utxos.map(ReadResponse::AddressUtxos)
2017 })
2018 })
2019 .wait_for_panics()
2020 }
2021
2022 ReadRequest::CheckBestChainTipNullifiersAndAnchors(unmined_tx) => {
2023 let state = self.clone();
2024
2025 tokio::task::spawn_blocking(move || {
2026 span.in_scope(move || {
2027 let latest_non_finalized_best_chain =
2028 state.latest_non_finalized_state().best_chain().cloned();
2029
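                        // Reject the transaction if any of its nullifiers are already in the
                        // best chain, or if its anchors do not refer to final treestates.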
2030 check::nullifier::tx_no_duplicates_in_chain(
2031 &state.db,
2032 latest_non_finalized_best_chain.as_ref(),
2033 &unmined_tx.transaction,
2034 )?;
2035
2036 check::anchors::tx_anchors_refer_to_final_treestates(
2037 &state.db,
2038 latest_non_finalized_best_chain.as_ref(),
2039 &unmined_tx,
2040 )?;
2041
2042 // The work is done in the future.
2043 timer.finish(
2044 module_path!(),
2045 line!(),
2046 "ReadRequest::CheckBestChainTipNullifiersAndAnchors",
2047 );
2048
2049 Ok(ReadResponse::ValidBestChainTipNullifiersAndAnchors)
2050 })
2051 })
2052 .wait_for_panics()
2053 }
2054
2055 // Used by the get_block and get_block_hash RPCs.
2056 ReadRequest::BestChainBlockHash(height) => {
2057 let state = self.clone();
2058
2059 // # Performance
2060 //
2061 // Allow other async tasks to make progress while concurrently reading blocks from disk.
2062
2063 tokio::task::spawn_blocking(move || {
2064 span.in_scope(move || {
2065 let hash = state.non_finalized_state_receiver.with_watch_data(
2066 |non_finalized_state| {
2067 read::hash_by_height(
2068 non_finalized_state.best_chain(),
2069 &state.db,
2070 height,
2071 )
2072 },
2073 );
2074
2075 // The work is done in the future.
2076 timer.finish(module_path!(), line!(), "ReadRequest::BestChainBlockHash");
2077
2078 Ok(ReadResponse::BlockHash(hash))
2079 })
2080 })
2081 .wait_for_panics()
2082 }
2083
2084 // Used by get_block_template and getblockchaininfo RPCs.
2085 ReadRequest::ChainInfo => {
2086 let state = self.clone();
2087 let latest_non_finalized_state = self.latest_non_finalized_state();
2088
2089 // # Performance
2090 //
2091 // Allow other async tasks to make progress while concurrently reading blocks from disk.
2092
2093 tokio::task::spawn_blocking(move || {
2094 span.in_scope(move || {
2095 // # Correctness
2096 //
2097 // It is ok to do these lookups using multiple database calls. Finalized state updates
2098 // can only add overlapping blocks, and block hashes are unique across all chain forks.
2099 //
2100                        // If there is a large overlap between the non-finalized and finalized states,
2101                        // with the finalized tip above the non-finalized tip, then Zebra is either
2102                        // receiving a lot of blocks, or this request has been delayed for a long time.
2103 //
2104 // In that case, the `getblocktemplate` RPC will return an error because Zebra
2105 // is not synced to the tip. That check happens before the RPC makes this request.
2106 let get_block_template_info =
2107 read::difficulty::get_block_template_chain_info(
2108 &latest_non_finalized_state,
2109 &state.db,
2110 &state.network,
2111 );
2112
2113 // The work is done in the future.
2114 timer.finish(module_path!(), line!(), "ReadRequest::ChainInfo");
2115
2116 get_block_template_info.map(ReadResponse::ChainInfo)
2117 })
2118 })
2119 .wait_for_panics()
2120 }
2121
2122 // Used by getmininginfo, getnetworksolps, and getnetworkhashps RPCs.
2123 ReadRequest::SolutionRate { num_blocks, height } => {
2124 let state = self.clone();
2125
2126 // # Performance
2127 //
2128 // Allow other async tasks to make progress while concurrently reading blocks from disk.
2129
2130 tokio::task::spawn_blocking(move || {
2131 span.in_scope(move || {
2132 let latest_non_finalized_state = state.latest_non_finalized_state();
2133 // # Correctness
2134 //
2135 // It is ok to do these lookups using multiple database calls. Finalized state updates
2136 // can only add overlapping blocks, and block hashes are unique across all chain forks.
2137 //
2138 // The worst that can happen here is that the default `start_hash` will be below
2139 // the chain tip.
2140 let (tip_height, tip_hash) =
2141 match read::tip(latest_non_finalized_state.best_chain(), &state.db) {
2142 Some(tip_hash) => tip_hash,
2143 None => return Ok(ReadResponse::SolutionRate(None)),
2144 };
2145
2146 let start_hash = match height {
2147 Some(height) if height < tip_height => read::hash_by_height(
2148 latest_non_finalized_state.best_chain(),
2149 &state.db,
2150 height,
2151 ),
2152                            // Use the chain tip hash if the height is at or above the tip, or was not provided.
2153 _ => Some(tip_hash),
2154 };
2155
2156 let solution_rate = start_hash.and_then(|start_hash| {
2157 read::difficulty::solution_rate(
2158 &latest_non_finalized_state,
2159 &state.db,
2160 num_blocks,
2161 start_hash,
2162 )
2163 });
2164
2165 // The work is done in the future.
2166 timer.finish(module_path!(), line!(), "ReadRequest::SolutionRate");
2167
2168 Ok(ReadResponse::SolutionRate(solution_rate))
2169 })
2170 })
2171 .wait_for_panics()
2172 }
2173
2174 ReadRequest::CheckBlockProposalValidity(semantically_verified) => {
2175 let state = self.clone();
2176
2177 // # Performance
2178 //
2179 // Allow other async tasks to make progress while concurrently reading blocks from disk.
2180
2181 tokio::task::spawn_blocking(move || {
2182 span.in_scope(move || {
2183 tracing::debug!("attempting to validate and commit block proposal onto a cloned non-finalized state");
2184 let mut latest_non_finalized_state = state.latest_non_finalized_state();
2185
2186 // The previous block of a valid proposal must be on the best chain tip.
2187 let Some((_best_tip_height, best_tip_hash)) = read::best_tip(&latest_non_finalized_state, &state.db) else {
2188 return Err("state is empty: wait for Zebra to sync before submitting a proposal".into());
2189 };
2190
2191 if semantically_verified.block.header.previous_block_hash != best_tip_hash {
2192 return Err("proposal is not based on the current best chain tip: previous block hash must be the best chain tip".into());
2193 }
2194
2195 // This clone of the non-finalized state is dropped when this closure returns.
2196 // The non-finalized state that's used in the rest of the state (including finalizing
2197 // blocks into the db) is not mutated here.
2198 //
2199 // TODO: Convert `CommitSemanticallyVerifiedError` to a new `ValidateProposalError`?
2200 latest_non_finalized_state.disable_metrics();
2201
2202 write::validate_and_commit_non_finalized(
2203 &state.db,
2204 &mut latest_non_finalized_state,
2205 semantically_verified,
2206 )?;
2207
2208 // The work is done in the future.
2209 timer.finish(
2210 module_path!(),
2211 line!(),
2212 "ReadRequest::CheckBlockProposalValidity",
2213 );
2214
2215 Ok(ReadResponse::ValidBlockProposal)
2216 })
2217 })
2218 .wait_for_panics()
2219 }
2220
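            // Returns the serialized size of the block at the best chain tip.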
2221 ReadRequest::TipBlockSize => {
2222 let state = self.clone();
2223
2224 tokio::task::spawn_blocking(move || {
2225 span.in_scope(move || {
2226 // Get the best chain tip height.
2227 let tip_height = state
2228 .non_finalized_state_receiver
2229 .with_watch_data(|non_finalized_state| {
2230 read::tip_height(non_finalized_state.best_chain(), &state.db)
2231 })
2232 .unwrap_or(Height(0));
2233
2234 // Get the block at the best chain tip height.
2235 let block = state.non_finalized_state_receiver.with_watch_data(
2236 |non_finalized_state| {
2237 read::block(
2238 non_finalized_state.best_chain(),
2239 &state.db,
2240 tip_height.into(),
2241 )
2242 },
2243 );
2244
2245 // The work is done in the future.
2246 timer.finish(module_path!(), line!(), "ReadRequest::TipBlockSize");
2247
2248                        // Respond with the serialized size of the block, if it was found.
2249 match block {
2250 Some(b) => Ok(ReadResponse::TipBlockSize(Some(
2251 b.zcash_serialize_to_vec()?.len(),
2252 ))),
2253 None => Ok(ReadResponse::TipBlockSize(None)),
2254 }
2255 })
2256 })
2257 .wait_for_panics()
2258 }
2259
2260 ReadRequest::NonFinalizedBlocksListener => {
2261                // The non-finalized blocks listener lets the caller receive notifications
2262                // about new blocks as they are added to the non-finalized state.
2263 let non_finalized_blocks_listener = NonFinalizedBlocksListener::spawn(
2264 self.network.clone(),
2265 self.non_finalized_state_receiver.clone(),
2266 );
2267
2268 async move {
2269 timer.finish(
2270 module_path!(),
2271 line!(),
2272 "ReadRequest::NonFinalizedBlocksListener",
2273 );
2274
2275 Ok(ReadResponse::NonFinalizedBlocksListener(
2276 non_finalized_blocks_listener,
2277 ))
2278 }
2279 .boxed()
2280 }
2281 }
2282 }
2283}
2284
2285/// Initialize a state service from the provided [`Config`].
2286/// Returns a boxed state service, a read-only state service,
2287/// and receivers for state chain tip updates.
2288///
2289/// Each `network` has its own separate on-disk database.
2290///
2291/// The state uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
2292/// to work out when it is near the final checkpoint.
2293///
2294/// To share access to the state, wrap the returned service in a `Buffer`,
2295/// or clone the returned [`ReadStateService`].
2296///
2297/// It's possible to construct multiple state services in the same application (as
2298/// long as they, e.g., use different storage locations), but doing so is
2299/// probably not what you want.
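///
/// A minimal usage sketch, not compiled as a doctest: it assumes an async context,
/// and that `config`, `network`, `max_checkpoint_height`, and
/// `checkpoint_verify_concurrency_limit` are already in scope.
///
/// ```ignore
/// let (state, read_state, latest_chain_tip, chain_tip_change) = init(
///     config,
///     &network,
///     max_checkpoint_height,
///     checkpoint_verify_concurrency_limit,
/// )
/// .await;
///
/// // Share the read-write service by wrapping it in a `tower::buffer::Buffer`
/// // (the bound here is illustrative), and share the read-only service by cloning it.
/// let state = tower::buffer::Buffer::new(state, 10);
/// let another_read_state = read_state.clone();
/// ```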
2300pub async fn init(
2301 config: Config,
2302 network: &Network,
2303 max_checkpoint_height: block::Height,
2304 checkpoint_verify_concurrency_limit: usize,
2305) -> (
2306 BoxService<Request, Response, BoxError>,
2307 ReadStateService,
2308 LatestChainTip,
2309 ChainTipChange,
2310) {
2311 let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
2312 StateService::new(
2313 config,
2314 network,
2315 max_checkpoint_height,
2316 checkpoint_verify_concurrency_limit,
2317 )
2318 .await;
2319
2320 (
2321 BoxService::new(state_service),
2322 read_only_state_service,
2323 latest_chain_tip,
2324 chain_tip_change,
2325 )
2326}
2327
2328/// Initialize a read state service from the provided [`Config`].
2329/// Returns a read-only state service,
2330 /// Returns a read-only state service, the finalized state database, and a non-finalized state sender.
2331/// Each `network` has its own separate on-disk database.
2332///
2333/// To share access to the state, clone the returned [`ReadStateService`].
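///
/// A minimal sketch, not compiled as a doctest: it assumes `config` and `network`
/// are already in scope.
///
/// ```ignore
/// let (read_state, db, non_finalized_state_sender) = init_read_only(config, &network);
///
/// // Clone the read-only service to share it between tasks; new non-finalized
/// // states can be published to it through `non_finalized_state_sender`.
/// let another_read_state = read_state.clone();
/// ```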
2334pub fn init_read_only(
2335 config: Config,
2336 network: &Network,
2337) -> (
2338 ReadStateService,
2339 ZebraDb,
2340 tokio::sync::watch::Sender<NonFinalizedState>,
2341) {
2342 let finalized_state = FinalizedState::new_with_debug(
2343 &config,
2344 network,
2345 true,
2346 #[cfg(feature = "elasticsearch")]
2347 false,
2348 true,
2349 );
2350 let (non_finalized_state_sender, non_finalized_state_receiver) =
2351 tokio::sync::watch::channel(NonFinalizedState::new(network));
2352
2353 (
2354 ReadStateService::new(
2355 &finalized_state,
2356 None,
2357 WatchReceiver::new(non_finalized_state_receiver),
2358 ),
2359 finalized_state.db.clone(),
2360 non_finalized_state_sender,
2361 )
2362}
2363
2364/// Calls [`init_read_only`] with the provided [`Config`] and [`Network`] from a blocking task.
2365 /// Returns a [`tokio::task::JoinHandle`] with a read state service, the finalized state database,
/// and a non-finalized state sender.
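///
/// A minimal sketch, not compiled as a doctest: it assumes an async context with
/// `config` and `network` in scope.
///
/// ```ignore
/// let (read_state, db, non_finalized_state_sender) = spawn_init_read_only(config, &network)
///     .await
///     .expect("init_read_only task should not panic");
/// ```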
2366pub fn spawn_init_read_only(
2367 config: Config,
2368 network: &Network,
2369) -> tokio::task::JoinHandle<(
2370 ReadStateService,
2371 ZebraDb,
2372 tokio::sync::watch::Sender<NonFinalizedState>,
2373)> {
2374 let network = network.clone();
2375 tokio::task::spawn_blocking(move || init_read_only(config, &network))
2376}
2377
2378/// Returns a [`StateService`] with an ephemeral [`Config`] and a buffer with a single slot.
2379///
2380/// This can be used to create a state service for testing. See also [`init`].
2381#[cfg(any(test, feature = "proptest-impl"))]
2382pub async fn init_test(
2383 network: &Network,
2384) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
2385 // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
2386 // if we ever need to test final checkpoint sent UTXO queries
2387 let (state_service, _, _, _) =
2388 StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;
2389
2390 Buffer::new(BoxService::new(state_service), 1)
2391}
2392
2393/// Initializes a state service with an ephemeral [`Config`] and a buffer with a single slot,
2394/// then returns the read-write service, read-only service, and tip watch channels.
2395///
2396/// This can be used to create a state service for testing. See also [`init`].
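///
/// A minimal test sketch, not compiled as a doctest: it assumes a Tokio test
/// runtime and a `network` value.
///
/// ```ignore
/// let (state, read_state, latest_chain_tip, chain_tip_change) =
///     init_test_services(&network).await;
///
/// // `state` is a buffered read-write service; `read_state` answers read
/// // requests from the most recent committed block.
/// ```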
2397#[cfg(any(test, feature = "proptest-impl"))]
2398pub async fn init_test_services(
2399 network: &Network,
2400) -> (
2401 Buffer<BoxService<Request, Response, BoxError>, Request>,
2402 ReadStateService,
2403 LatestChainTip,
2404 ChainTipChange,
2405) {
2406 // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
2407 // if we ever need to test final checkpoint sent UTXO queries
2408 let (state_service, read_state_service, latest_chain_tip, chain_tip_change) =
2409 StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;
2410
2411 let state_service = Buffer::new(BoxService::new(state_service), 1);
2412
2413 (
2414 state_service,
2415 read_state_service,
2416 latest_chain_tip,
2417 chain_tip_change,
2418 )
2419}