use std::{fmt, sync::Arc};
use chrono::{DateTime, Utc};
use futures::TryFutureExt;
use tokio::sync::watch;
use tracing::{field, instrument};
use zebra_chain::{
block,
chain_tip::ChainTip,
parameters::{Network, NetworkUpgrade},
transaction::{self, Transaction},
};
use crate::{
request::ContextuallyVerifiedBlock, service::watch_receiver::WatchReceiver, BoxError,
CheckpointVerifiedBlock, SemanticallyVerifiedBlock,
};
use TipAction::*;
#[cfg(any(test, feature = "proptest-impl"))]
use proptest_derive::Arbitrary;
#[cfg(any(test, feature = "proptest-impl"))]
use zebra_chain::serialization::arbitrary::datetime_full;
#[cfg(test)]
mod tests;
/// The value carried by the chain tip watch channel.
///
/// The channel is created holding `None`; once a tip has been published it
/// holds `Some(ChainTipBlock)` (the sender in this file never sends `None`).
type ChainTipData = Option<ChainTipBlock>;
/// A summary of a verified block that is published as the chain tip.
///
/// Values are built from verified block types through the `From`
/// conversions implemented for this struct.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))]
pub struct ChainTipBlock {
/// The hash of the tip block.
pub hash: block::Hash,
/// The height of the tip block.
pub height: block::Height,
/// The `time` field of the tip block's header.
#[cfg_attr(
any(test, feature = "proptest-impl"),
proptest(strategy = "datetime_full()")
)]
pub time: DateTime<Utc>,
/// The transactions of the tip block, copied from `block.transactions`.
pub transactions: Vec<Arc<Transaction>>,
/// The transaction hashes supplied by the verified block.
///
/// NOTE(review): presumably one hash per entry of `transactions`, in the
/// same order; confirm against the verified block types.
pub transaction_hashes: Arc<[transaction::Hash]>,
/// The `previous_block_hash` field of the tip block's header.
pub previous_block_hash: block::Hash,
}
impl fmt::Display for ChainTipBlock {
    /// Formats a short summary of the tip: its height, its hash, and the
    /// number of transactions (not the transactions themselves).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let transaction_count = self.transactions.len();

        let mut summary = f.debug_struct("ChainTipBlock");
        summary.field("height", &self.height);
        summary.field("hash", &self.hash);
        summary.field("transactions", &transaction_count);
        summary.finish()
    }
}
impl From<ContextuallyVerifiedBlock> for ChainTipBlock {
fn from(contextually_valid: ContextuallyVerifiedBlock) -> Self {
let ContextuallyVerifiedBlock {
block,
hash,
height,
transaction_hashes,
..
} = contextually_valid;
Self {
hash,
height,
time: block.header.time,
transactions: block.transactions.clone(),
transaction_hashes,
previous_block_hash: block.header.previous_block_hash,
}
}
}
impl From<SemanticallyVerifiedBlock> for ChainTipBlock {
    /// Builds the chain tip summary of a semantically verified block.
    ///
    /// The destructuring below names every field of
    /// `SemanticallyVerifiedBlock`, so adding a field to that struct forces
    /// this conversion to be revisited.
    fn from(prepared: SemanticallyVerifiedBlock) -> Self {
        let SemanticallyVerifiedBlock {
            block,
            hash,
            height,
            transaction_hashes,
            // Not part of the chain tip summary.
            new_outputs: _,
            deferred_pool_balance_change: _,
        } = prepared;

        let time = block.header.time;
        let transactions = block.transactions.clone();
        let previous_block_hash = block.header.previous_block_hash;

        Self {
            hash,
            height,
            time,
            transactions,
            transaction_hashes,
            previous_block_hash,
        }
    }
}
impl From<CheckpointVerifiedBlock> for ChainTipBlock {
    /// Builds the chain tip summary of a checkpoint verified block by
    /// unwrapping it and converting the wrapped block.
    fn from(checkpoint_verified: CheckpointVerifiedBlock) -> Self {
        let CheckpointVerifiedBlock(wrapped_block) = checkpoint_verified;

        Self::from(wrapped_block)
    }
}
/// The sending side of the chain tip watch channel.
///
/// Publishes `ChainTipBlock` updates that are observed through
/// `LatestChainTip` and `ChainTipChange`.
#[derive(Debug)]
pub struct ChainTipSender {
/// Starts as `false` and becomes `true` the first time a non-finalized tip
/// is set. While it is `true`, finalized tip updates are ignored.
use_non_finalized_tip: bool,
/// The watch channel sender that holds the currently published tip.
sender: watch::Sender<ChainTipData>,
}
impl ChainTipSender {
    /// Creates a chain tip sender together with the two receiver types that
    /// observe it, then publishes `initial_tip` if it is `Some`.
    ///
    /// Both receivers are created before the initial tip is published.
    #[instrument(skip(initial_tip), fields(new_height, new_hash))]
    pub fn new(
        initial_tip: impl Into<Option<ChainTipBlock>>,
        network: &Network,
    ) -> (Self, LatestChainTip, ChainTipChange) {
        let initial_tip = initial_tip.into();
        Self::record_new_tip(&initial_tip);

        let (sender, receiver) = watch::channel(None);
        let mut chain_tip_sender = Self {
            use_non_finalized_tip: false,
            sender,
        };

        let latest_chain_tip = LatestChainTip::new(receiver);
        let chain_tip_change = ChainTipChange::new(latest_chain_tip.clone(), network);

        chain_tip_sender.update(initial_tip);

        (chain_tip_sender, latest_chain_tip, chain_tip_change)
    }

    /// Returns another sender for the same channel whose
    /// `use_non_finalized_tip` flag starts as `false`, regardless of this
    /// sender's flag.
    pub fn finalized_sender(&self) -> Self {
        let sender = self.sender.clone();

        Self {
            use_non_finalized_tip: false,
            sender,
        }
    }

    /// Publishes `new_tip` as the tip, unless this sender has already
    /// switched to non-finalized tips.
    ///
    /// The span fields are recorded even when the update is skipped.
    #[instrument(
        skip(self, new_tip),
        fields(old_use_non_finalized_tip, old_height, old_hash, new_height, new_hash)
    )]
    pub fn set_finalized_tip(&mut self, new_tip: impl Into<Option<ChainTipBlock>> + Clone) {
        let new_tip = new_tip.into();
        self.record_fields(&new_tip);

        if self.use_non_finalized_tip {
            return;
        }

        self.update(new_tip);
    }

    /// Publishes `new_tip` as the tip and switches this sender to
    /// non-finalized tips.
    ///
    /// A `None` tip is ignored: the flag and the published tip stay as they
    /// are. The span fields are recorded either way.
    #[instrument(
        skip(self, new_tip),
        fields(old_use_non_finalized_tip, old_height, old_hash, new_height, new_hash)
    )]
    pub fn set_best_non_finalized_tip(
        &mut self,
        new_tip: impl Into<Option<ChainTipBlock>> + Clone,
    ) {
        let new_tip = new_tip.into();
        self.record_fields(&new_tip);

        if new_tip.is_none() {
            return;
        }

        self.use_non_finalized_tip = true;
        self.update(new_tip);
    }

    /// Sends `new_tip` through the channel when it is `Some` and its hash
    /// differs from the hash of the currently published tip (or when no tip
    /// is published yet). `None` is never sent.
    fn update(&mut self, new_tip: Option<ChainTipBlock>) {
        // The guard returned by `borrow()` must stay a temporary of this
        // statement, so that it is dropped before `send` is called below.
        // (tokio documents that outstanding borrows hold a read lock on the
        // channel value.)
        let active_hash = self.sender.borrow().as_ref().map(|tip| tip.hash);
        let new_hash = new_tip.as_ref().map(|tip| tip.hash);

        let needs_update = new_hash.is_some() && new_hash != active_hash;
        if !needs_update {
            return;
        }

        // The result of `send` is deliberately discarded.
        let _ = self.sender.send(new_tip);
    }

    /// Records the `new_height` and `new_hash` fields on the current span.
    fn record_new_tip(new_tip: &Option<ChainTipBlock>) {
        let span = tracing::Span::current();

        Self::record_tip(&span, "new", new_tip);
    }

    /// Records the new tip, the currently published tip, and the current
    /// value of `use_non_finalized_tip` on the current span.
    fn record_fields(&self, new_tip: &Option<ChainTipBlock>) {
        let span = tracing::Span::current();
        // This guard is held until the end of the function.
        let old_tip = self.sender.borrow();

        Self::record_tip(&span, "new", new_tip);
        Self::record_tip(&span, "old", &old_tip);

        span.record(
            "old_use_non_finalized_tip",
            field::debug(self.use_non_finalized_tip),
        );
    }

    /// Records the height and hash of `tip` on `span`, in the fields named
    /// `{prefix}_height` and `{prefix}_hash`. A missing tip is recorded as
    /// `None`.
    fn record_tip(span: &tracing::Span, prefix: &str, tip: &Option<ChainTipBlock>) {
        let (height, hash) = match tip {
            Some(block) => (Some(block.height), Some(block.hash)),
            None => (None, None),
        };

        let height_field = format!("{prefix}_height");
        let hash_field = format!("{prefix}_hash");

        span.record(height_field.as_str(), field::debug(height));
        span.record(hash_field.as_str(), field::debug(hash));
    }
}
/// A cloneable handle for reading the most recently published chain tip.
///
/// Implements `ChainTip` by reading the `ChainTipData` held by its receiver.
#[derive(Clone, Debug)]
pub struct LatestChainTip {
/// The receiving side of the chain tip watch channel.
receiver: WatchReceiver<ChainTipData>,
}
impl LatestChainTip {
fn new(receiver: watch::Receiver<ChainTipData>) -> Self {
Self {
receiver: WatchReceiver::new(receiver),
}
}
fn with_chain_tip_block<U, F>(&self, f: F) -> Option<U>
where
F: FnOnce(&ChainTipBlock) -> U,
{
let span = tracing::Span::current();
let register_span_fields = |chain_tip_block: Option<&ChainTipBlock>| {
span.record(
"height",
tracing::field::debug(chain_tip_block.map(|block| block.height)),
);
span.record(
"hash",
tracing::field::debug(chain_tip_block.map(|block| block.hash)),
);
span.record(
"time",
tracing::field::debug(chain_tip_block.map(|block| block.time)),
);
span.record(
"previous_hash",
tracing::field::debug(chain_tip_block.map(|block| block.previous_block_hash)),
);
span.record(
"transaction_count",
tracing::field::debug(chain_tip_block.map(|block| block.transaction_hashes.len())),
);
};
self.receiver.with_watch_data(|chain_tip_block| {
register_span_fields(chain_tip_block.as_ref());
chain_tip_block.as_ref().map(f)
})
}
}
impl ChainTip for LatestChainTip {
#[instrument(skip(self))]
fn best_tip_height(&self) -> Option<block::Height> {
self.with_chain_tip_block(|block| block.height)
}
#[instrument(skip(self))]
fn best_tip_hash(&self) -> Option<block::Hash> {
self.with_chain_tip_block(|block| block.hash)
}
#[instrument(skip(self))]
fn best_tip_height_and_hash(&self) -> Option<(block::Height, block::Hash)> {
self.with_chain_tip_block(|block| (block.height, block.hash))
}
#[instrument(skip(self))]
fn best_tip_block_time(&self) -> Option<DateTime<Utc>> {
self.with_chain_tip_block(|block| block.time)
}
#[instrument(skip(self))]
fn best_tip_height_and_block_time(&self) -> Option<(block::Height, DateTime<Utc>)> {
self.with_chain_tip_block(|block| (block.height, block.time))
}
#[instrument(skip(self))]
fn best_tip_mined_transaction_ids(&self) -> Arc<[transaction::Hash]> {
self.with_chain_tip_block(|block| block.transaction_hashes.clone())
.unwrap_or_else(|| Arc::new([]))
}
#[instrument(skip(self))]
async fn best_tip_changed(&mut self) -> Result<(), BoxError> {
self.receiver.changed().err_into().await
}
fn mark_best_tip_seen(&mut self) {
self.receiver.mark_as_seen();
}
}
/// Tracks chain tip changes and turns each new tip into a `TipAction`.
#[derive(Debug)]
pub struct ChainTipChange {
/// The handle used to read the current tip and wait for changes.
latest_chain_tip: LatestChainTip,
/// The hash of the tip block used for the most recent action, or the hash
/// set through `mark_last_change_hash`.
///
/// `None` for a newly created or cloned instance.
last_change_hash: Option<block::Hash>,
/// The network passed to `NetworkUpgrade::is_activation_height` when
/// choosing between a grow and a reset.
network: Network,
}
/// The action that a chain tip change represents for its consumer.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TipAction {
/// The new tip block's previous block hash matched the previously reported
/// tip hash, and `NetworkUpgrade::is_activation_height` was false for its
/// height.
Grow {
/// The new tip block.
block: ChainTipBlock,
},
/// Every other tip change: the new tip does not follow the previously
/// reported tip (or nothing was reported yet), or
/// `NetworkUpgrade::is_activation_height` was true for its height.
Reset {
/// The height of the new tip block.
height: block::Height,
/// The hash of the new tip block.
hash: block::Hash,
},
}
impl ChainTipChange {
    /// Waits until the tip changes to a block whose hash differs from the
    /// last reported hash, then returns the matching `TipAction` and
    /// remembers that block's hash.
    ///
    /// # Errors
    ///
    /// Returns the `RecvError` produced while waiting on the receiver.
    ///
    /// # Panics
    ///
    /// See [`Self::action`].
    #[instrument(
        skip(self),
        fields(
            last_change_hash = ?self.last_change_hash,
            network = ?self.network,
        ))]
    pub async fn wait_for_tip_change(&mut self) -> Result<TipAction, watch::error::RecvError> {
        let block = self.tip_block_change().await?;

        // Copy the hash first, so the block can be moved into `action`
        // instead of being cloned.
        let block_hash = block.hash;
        let action = self.action(block);
        self.last_change_hash = Some(block_hash);

        Ok(action)
    }

    /// Returns the `TipAction` for the current tip without waiting, and
    /// remembers that tip's hash.
    ///
    /// Returns `None` when there is no tip, or when the current tip's hash
    /// equals the last reported hash.
    #[instrument(
        skip(self),
        fields(
            last_change_hash = ?self.last_change_hash,
            network = ?self.network,
        ))]
    pub fn last_tip_change(&mut self) -> Option<TipAction> {
        let block = self.changed_tip_block()?;

        let block_hash = block.hash;
        let tip_action = self.action(block);
        self.last_change_hash = Some(block_hash);

        Some(tip_action)
    }

    /// Overrides the last reported hash with `hash`, so that a tip with this
    /// hash is treated as already reported.
    pub fn mark_last_change_hash(&mut self, hash: block::Hash) {
        self.last_change_hash = Some(hash);
    }

    /// Chooses the action for a changed tip `block`.
    ///
    /// The result is a grow only when the block's previous block hash equals
    /// the last reported hash and `NetworkUpgrade::is_activation_height` is
    /// false for the block height. Everything else is a reset, including the
    /// first block seen by a new or cloned instance, whose last reported
    /// hash is `None`.
    ///
    /// # Panics
    ///
    /// Panics if `block.hash` equals the last reported hash; callers filter
    /// out unchanged tips before calling this method.
    fn action(&self, block: ChainTipBlock) -> TipAction {
        assert!(
            Some(block.hash) != self.last_change_hash,
            "ChainTipSender and ChainTipChange ignore unchanged tips"
        );

        let extends_last_change = Some(block.previous_block_hash) == self.last_change_hash;

        if extends_last_change
            && !NetworkUpgrade::is_activation_height(&self.network, block.height)
        {
            TipAction::grow_with(block)
        } else {
            TipAction::reset_with(block)
        }
    }

    /// Creates a `ChainTipChange` with no last reported hash and its own
    /// copy of `network`.
    fn new(latest_chain_tip: LatestChainTip, network: &Network) -> Self {
        Self {
            latest_chain_tip,
            last_change_hash: None,
            network: network.clone(),
        }
    }

    /// Returns a clone of the current tip block when its hash differs from
    /// the last reported hash; returns `None` for a missing or unchanged tip.
    fn changed_tip_block(&self) -> Option<ChainTipBlock> {
        self.latest_chain_tip
            .with_chain_tip_block(|block| {
                (Some(block.hash) != self.last_change_hash).then(|| block.clone())
            })
            .flatten()
    }

    /// Waits for change notifications until the current tip is a block whose
    /// hash differs from the last reported hash, and returns that block.
    ///
    /// Notifications that leave the tip missing or unchanged are skipped.
    async fn tip_block_change(&mut self) -> Result<ChainTipBlock, watch::error::RecvError> {
        loop {
            self.latest_chain_tip.receiver.changed().await?;

            if let Some(block) = self.changed_tip_block() {
                return Ok(block);
            }
        }
    }

    /// Returns a clone of the `LatestChainTip` handle used by this instance.
    pub fn latest_chain_tip(&self) -> LatestChainTip {
        self.latest_chain_tip.clone()
    }
}
impl Clone for ChainTipChange {
    /// Clones the tip handle and the network, but not the last reported
    /// hash: the clone starts with `last_change_hash` set to `None`.
    fn clone(&self) -> Self {
        let Self {
            latest_chain_tip,
            last_change_hash: _,
            network,
        } = self;

        Self {
            latest_chain_tip: latest_chain_tip.clone(),
            last_change_hash: None,
            network: network.clone(),
        }
    }
}
impl TipAction {
pub fn is_reset(&self) -> bool {
matches!(self, Reset { .. })
}
pub fn best_tip_hash(&self) -> block::Hash {
match self {
Grow { block } => block.hash,
Reset { hash, .. } => *hash,
}
}
pub fn best_tip_height(&self) -> block::Height {
match self {
Grow { block } => block.height,
Reset { height, .. } => *height,
}
}
pub fn best_tip_hash_and_height(&self) -> (block::Hash, block::Height) {
match self {
Grow { block } => (block.hash, block.height),
Reset { hash, height } => (*hash, *height),
}
}
pub(crate) fn grow_with(block: ChainTipBlock) -> Self {
Grow { block }
}
pub(crate) fn reset_with(block: ChainTipBlock) -> Self {
Reset {
height: block.height,
hash: block.hash,
}
}
#[cfg(test)]
pub(crate) fn into_reset(self) -> Self {
match self {
Grow { block } => Reset {
height: block.height,
hash: block.hash,
},
reset @ Reset { .. } => reset,
}
}
}