#![allow(dead_code)]
use std::{
iter,
path::{Path, PathBuf},
time::Duration,
};
use color_eyre::eyre::{eyre, Result};
use semver::Version;
use tower::{util::BoxService, Service};
use zebra_chain::{
block::{self, Block, Height},
parameters::Network,
serialization::ZcashDeserializeInto,
};
use zebra_node_services::rpc_client::RpcRequestClient;
use zebra_state::{ChainTipChange, LatestChainTip, MAX_BLOCK_REORG_HEIGHT};
use zebra_test::command::TestChild;
use crate::common::{
launch::spawn_zebrad_for_rpc_with_opts,
sync::{check_sync_logs_until, MempoolBehavior, SYNC_FINISHED_REGEX},
test_type::TestType,
};
pub const DATABASE_FORMAT_CHECK_INTERVAL: Duration = Duration::from_secs(5 * 60);
pub const DATABASE_FORMAT_UPGRADE_IS_LONG: bool = false;
pub type BoxStateService =
BoxService<zebra_state::Request, zebra_state::Response, zebra_state::BoxError>;
#[tracing::instrument(skip(zebrad))]
pub fn wait_for_state_version_message<T>(zebrad: &mut TestChild<T>) -> Result<String> {
tracing::info!(
zebrad = ?zebrad.cmd,
"launched zebrad, waiting for zebrad to open the state database..."
);
zebrad.expect_stdout_line_matches(
"(creating new database with the current format)|\
(trying to open older database format)|\
(trying to open newer database format)|\
(trying to open current database format)",
)
}
#[tracing::instrument(skip(zebrad))]
pub fn wait_for_state_version_upgrade<T>(
zebrad: &mut TestChild<T>,
state_version_message: &str,
required_version: Version,
extra_required_log_regexes: impl IntoIterator<Item = String> + std::fmt::Debug,
) -> Result<()> {
tracing::info!(
zebrad = ?zebrad.cmd,
%state_version_message,
%required_version,
?extra_required_log_regexes,
"waiting for zebrad state upgrade..."
);
if state_version_message.contains("launching upgrade task") {
let upgrade_pattern = format!(
"marked database format as upgraded.*format_upgrade_version.*=.*{required_version}"
);
let extra_required_log_regexes = extra_required_log_regexes.into_iter();
let required_logs: Vec<String> = iter::once(upgrade_pattern)
.chain(extra_required_log_regexes)
.collect();
let upgrade_messages = zebrad.expect_stdout_line_matches_all_unordered(&required_logs)?;
tracing::info!(
zebrad = ?zebrad.cmd,
%state_version_message,
%required_version,
?required_logs,
?upgrade_messages,
"zebrad state has been upgraded"
);
} else {
let required_logs: Vec<String> = extra_required_log_regexes.into_iter().collect();
let upgrade_messages = zebrad.expect_stdout_line_matches_all_unordered(&required_logs)?;
tracing::info!(
zebrad = ?zebrad.cmd,
%state_version_message,
%required_version,
?required_logs,
?upgrade_messages,
"no zebrad upgrade needed"
);
}
Ok(())
}
#[tracing::instrument(skip(cache_dir))]
pub async fn start_state_service_with_cache_dir(
network: &Network,
cache_dir: impl Into<PathBuf>,
) -> Result<(
BoxStateService,
impl Service<
zebra_state::ReadRequest,
Response = zebra_state::ReadResponse,
Error = zebra_state::BoxError,
>,
LatestChainTip,
ChainTipChange,
)> {
let config = zebra_state::Config {
cache_dir: cache_dir.into(),
..zebra_state::Config::default()
};
Ok(zebra_state::init(config, network, Height::MAX, 0).await)
}
#[tracing::instrument]
pub async fn load_finalized_tip_height_from_state_directory(
network: &Network,
state_path: &Path,
) -> Result<block::Height> {
let config = zebra_state::Config {
cache_dir: state_path.to_path_buf(),
..zebra_state::Config::default()
};
let network = network.clone();
let (_read_state, db, _sender) =
tokio::task::spawn_blocking(move || zebra_state::init_read_only(config, &network))
.await
.map_err(|e| eyre!("Blocking task failed while loading state: {e}"))?;
let finalized_tip_height = db
.finalized_tip_height()
.ok_or_else(|| eyre!("State directory doesn't have a finalized tip block"))?;
Ok(finalized_tip_height)
}
pub async fn future_blocks(
network: &Network,
test_type: TestType,
test_name: &str,
max_num_blocks: u32,
) -> Result<Vec<Block>> {
let blocks: Vec<Block> = raw_future_blocks(network, test_type, test_name, max_num_blocks)
.await?
.into_iter()
.map(hex::decode)
.map(|block_bytes| {
block_bytes
.expect("getblock rpc calls in get_raw_future_blocks should return valid hexdata")
.zcash_deserialize_into()
.expect("decoded hex data from getblock rpc calls should deserialize into blocks")
})
.collect();
Ok(blocks)
}
pub async fn raw_future_blocks(
network: &Network,
test_type: TestType,
test_name: &str,
max_num_blocks: u32,
) -> Result<Vec<String>> {
assert!(max_num_blocks > 0);
let max_num_blocks = max_num_blocks.min(MAX_BLOCK_REORG_HEIGHT);
let mut raw_blocks = Vec::with_capacity(max_num_blocks as usize);
assert!(
test_type.needs_zebra_cached_state() && test_type.needs_zebra_rpc_server(),
"raw_future_blocks needs zebra cached state and rpc server"
);
let should_sync = true;
let (zebrad, zebra_rpc_address) =
spawn_zebrad_for_rpc_with_opts(network.clone(), test_name, test_type, should_sync, false)?
.ok_or_else(|| eyre!("raw_future_blocks requires a cached state"))?;
let rpc_address = zebra_rpc_address.expect("test type must have RPC port");
let mut zebrad = check_sync_logs_until(
zebrad,
network,
SYNC_FINISHED_REGEX,
MempoolBehavior::ShouldAutomaticallyActivate,
true,
)?;
let rpc_client = RpcRequestClient::new(rpc_address);
let blockchain_info: serde_json::Value = serde_json::from_str(
&rpc_client
.text_from_call("getblockchaininfo", "[]".to_string())
.await?,
)?;
let tip_height: u32 = blockchain_info["result"]["blocks"]
.as_u64()
.expect("unexpected block height: doesn't fit in u64")
.try_into()
.expect("unexpected block height: doesn't fit in u32");
let estimated_finalized_tip_height = tip_height - MAX_BLOCK_REORG_HEIGHT;
tracing::info!(
?tip_height,
?estimated_finalized_tip_height,
"got tip height from blockchaininfo",
);
for block_height in (0..max_num_blocks).map(|idx| idx + estimated_finalized_tip_height) {
let raw_block: serde_json::Value = serde_json::from_str(
&rpc_client
.text_from_call("getblock", format!(r#"["{block_height}", 0]"#))
.await?,
)?;
raw_blocks.push((
block_height,
raw_block["result"]
.as_str()
.expect("unexpected getblock result: not a string")
.to_string(),
));
}
zebrad.kill(true)?;
zebrad.wait_with_output()?;
std::thread::sleep(Duration::from_secs(3));
let zebrad_state_path = test_type
.zebrad_state_path(test_name)
.expect("already checked that there is a cached state path");
let Height(finalized_tip_height) =
load_finalized_tip_height_from_state_directory(network, zebrad_state_path.as_ref()).await?;
tracing::info!(
?finalized_tip_height,
non_finalized_tip_height = ?tip_height,
?estimated_finalized_tip_height,
"got finalized tip height from state directory"
);
let raw_future_blocks = raw_blocks
.into_iter()
.filter_map(|(height, raw_block)| height.gt(&finalized_tip_height).then_some(raw_block))
.collect();
Ok(raw_future_blocks)
}