use std::{
net::SocketAddr,
sync::atomic::{AtomicBool, Ordering},
time::{Duration, Instant},
};
use tempfile::TempDir;
use zebra_node_services::rpc_client::RpcRequestClient;
use zebra_test::prelude::*;
use crate::common::{launch::ZebradTestDirExt, test_type::TestType};
/// The minimum amount of time between two consecutive tip checks (60 seconds).
///
/// After a tip check reports that the tips differ,
/// [`wait_for_zebrad_and_lightwalletd_sync`] keeps reading lightwalletd output
/// until at least this much time has elapsed, then checks the tips again.
pub const TIP_CHECK_RATE_LIMIT: Duration = Duration::from_secs(60);
/// Waits for zebrad and lightwalletd to sync, reading the output of both
/// child processes in parallel on scoped threads.
///
/// - If `wait_for_zebrad_mempool` is `true`, first waits for a zebrad log line
///   matching `"activating mempool"`.
/// - If `wait_for_zebrad_tip` is `true`, also waits for a zebrad log line
///   matching `SYNC_FINISHED_REGEX`, and lightwalletd's tip checks only start
///   after that. Otherwise the lightwalletd tip checks start immediately.
/// - lightwalletd is considered synced once
///   [`are_zebrad_and_lightwalletd_tips_synced`] returns `Ok(true)` for
///   `zebra_rpc_address`; checks are spaced by [`TIP_CHECK_RATE_LIMIT`].
///
/// `lightwalletd_rpc_port` is not read by the function body, and `test_type`
/// is only used as a field in log messages.
///
/// Returns the `lightwalletd` and `zebrad` children, in that order, so the
/// caller can keep using them.
///
/// # Panics
///
/// The only return statement in this function is `Ok(..)`. Errors from the
/// wait closures are turned into panics by `expect` inside the spawned
/// threads, and lightwalletd output ending early fails an `assert!`.
/// `std::thread::scope` panics after all its threads have finished if any
/// automatically joined thread panicked.
#[tracing::instrument]
pub fn wait_for_zebrad_and_lightwalletd_sync<
P: ZebradTestDirExt + std::fmt::Debug + std::marker::Send + 'static,
>(
mut lightwalletd: TestChild<TempDir>,
lightwalletd_rpc_port: u16,
mut zebrad: TestChild<P>,
zebra_rpc_address: SocketAddr,
test_type: TestType,
wait_for_zebrad_mempool: bool,
wait_for_zebrad_tip: bool,
) -> Result<(TestChild<TempDir>, TestChild<P>)> {
// Completion flags shared between the threads below.
//
// When we are not waiting for zebrad's tip, zebrad starts out "finished",
// so the lightwalletd closure skips its initial wait loop.
let is_zebrad_finished = AtomicBool::new(!wait_for_zebrad_tip);
let is_lightwalletd_finished = AtomicBool::new(false);
// Shadow the flags with shared references; the closures and threads below
// all use the flags through these references.
let is_zebrad_finished = &is_zebrad_finished;
let is_lightwalletd_finished = &is_lightwalletd_finished;
// Borrow `zebrad` mutably for the zebrad closure, so the owned child can
// still be returned at the end of the function.
let zebrad_mut = &mut zebrad;
// Waits for the requested zebrad log lines, then keeps matching zebrad
// output until lightwalletd is marked as finished.
let zebrad_wait_fn = || -> Result<_> {
if wait_for_zebrad_mempool {
tracing::info!(
?test_type,
"waiting for zebrad to activate the mempool when it gets near the tip..."
);
zebrad_mut.expect_stdout_line_matches("activating mempool")?;
}
if wait_for_zebrad_tip {
tracing::info!(?test_type, "waiting for zebrad to sync to the tip...");
zebrad_mut.expect_stdout_line_matches(crate::common::sync::SYNC_FINISHED_REGEX)?;
}
// Unblock the first wait loop in the lightwalletd closure.
is_zebrad_finished.store(true, Ordering::SeqCst);
tracing::info!(
?test_type,
"zebrad is waiting for lightwalletd to sync to the tip..."
);
// The flag is only re-checked after each `expect_stdout_line_matches` call
// returns, and any error from those calls ends the closure early via `?`.
//
// NOTE(review): presumably this loop exists to keep consuming zebrad's
// output while lightwalletd syncs — confirm against `TestChild`.
while !is_lightwalletd_finished.load(Ordering::SeqCst) {
if wait_for_zebrad_tip {
zebrad_mut.expect_stdout_line_matches(crate::common::sync::SYNC_FINISHED_REGEX)?;
} else {
zebrad_mut.expect_stdout_line_matches(crate::common::sync::SYNC_PROGRESS_REGEX)?;
}
}
Ok(zebrad_mut)
};
// Borrow `lightwalletd` mutably for the lightwalletd closure, for the same
// reason as `zebrad_mut` above.
let lightwalletd_mut = &mut lightwalletd;
// Reads lightwalletd output until zebrad is marked as finished, then polls
// the tip check until the tips match.
let lightwalletd_wait_fn = || -> Result<_> {
tracing::info!(
?test_type,
"lightwalletd is waiting for zebrad to sync to the tip..."
);
// Read lightwalletd output one line at a time until zebrad is finished.
// A `false` return from `wait_for_stdout_line` fails the assertion.
while !is_zebrad_finished.load(Ordering::SeqCst) {
assert!(
lightwalletd_mut.wait_for_stdout_line(None),
"lightwalletd output unexpectedly finished early",
);
}
tracing::info!(?test_type, "waiting for lightwalletd to sync to the tip...");
// Re-run the tip check until it succeeds. Between checks, keep reading
// lightwalletd output until `TIP_CHECK_RATE_LIMIT` has elapsed; the elapsed
// time is only re-checked after each line read returns.
while !are_zebrad_and_lightwalletd_tips_synced(zebra_rpc_address, lightwalletd_mut)? {
let previous_check = Instant::now();
while previous_check.elapsed() < TIP_CHECK_RATE_LIMIT {
assert!(
lightwalletd_mut.wait_for_stdout_line(None),
"lightwalletd output unexpectedly finished early",
);
}
}
// Unblock the final wait loop in the zebrad closure.
is_lightwalletd_finished.store(true, Ordering::SeqCst);
Ok(lightwalletd_mut)
};
// Run both wait closures in parallel. The scope only returns after every
// thread spawned in it has finished.
std::thread::scope(|s| {
// Worker threads: each one sets its own finished flag as soon as its
// closure returns (with `Ok` or `Err`) and before `expect` can panic,
// so the other worker's wait loop can exit instead of waiting forever.
let zebrad_thread = s.spawn(|| {
let zebrad_result = zebrad_wait_fn();
is_zebrad_finished.store(true, Ordering::SeqCst);
zebrad_result.expect("test failed while waiting for zebrad to sync");
});
let lightwalletd_thread = s.spawn(|| {
let lightwalletd_result = lightwalletd_wait_fn();
is_lightwalletd_finished.store(true, Ordering::SeqCst);
lightwalletd_result.expect("test failed while waiting for lightwalletd to sync.");
});
// Joiner threads: if a worker panicked inside its closure, its own
// `store` above never ran. Joining the worker and then setting the flag
// here covers that case, before the panic is propagated by `expect`.
s.spawn(|| {
let zebrad_result = zebrad_thread.join();
is_zebrad_finished.store(true, Ordering::SeqCst);
zebrad_result.expect("test panicked or failed while waiting for zebrad to sync");
});
s.spawn(|| {
let lightwalletd_result = lightwalletd_thread.join();
is_lightwalletd_finished.store(true, Ordering::SeqCst);
lightwalletd_result
.expect("test panicked or failed while waiting for lightwalletd to sync");
});
});
// All borrows of the children ended with the scope, so they can be returned.
Ok((lightwalletd, zebrad))
}
/// Returns `Ok(true)` if zebrad and lightwalletd are both at the same height.
///
/// The lightwalletd tip height is derived from its logs: the next
/// `"Waiting for block: <height>"` line is parsed as JSON, the last
/// space-separated word of its `msg` field is taken as the next height
/// lightwalletd wants, and the tip is that height minus one. If no such line
/// can be read, the next height defaults to `1`, so the tip is treated as `0`.
///
/// The zebrad tip height is the `result.blocks` field of the
/// `getblockchaininfo` RPC response from `zebra_rpc_address`.
///
/// # Errors
///
/// Returns an error if the Tokio runtime can't be built, if the RPC call
/// fails, or if the RPC response is not valid JSON.
///
/// # Panics
///
/// Panics if the matched lightwalletd log line is not JSON with a string
/// `msg` field ending in an integer, or if `result.blocks` in the RPC response
/// is not an unsigned integer.
#[tracing::instrument]
pub fn are_zebrad_and_lightwalletd_tips_synced(
    zebra_rpc_address: SocketAddr,
    lightwalletd: &mut TestChild<TempDir>,
) -> Result<bool> {
    // Build the runtime first, so a build failure is reported before any
    // lightwalletd output is consumed.
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()?;

    // Read and parse the lightwalletd logs *outside* the async block:
    // this is blocking, synchronous work that should not run inside a future
    // driven by the executor.
    //
    // Only use the height from the logs if we find the line we are interested
    // in; otherwise fall back to a next height of 1.
    let lightwalletd_next_height: i32 =
        match lightwalletd.expect_stdout_line_matches("Waiting for block: [0-9]+") {
            Ok(line) => {
                let line_json: serde_json::Value = serde_json::from_str(line.as_str())
                    .expect("captured lightwalletd logs are always valid json");
                let msg = line_json["msg"]
                    .as_str()
                    .expect("`msg` field is always a valid string");

                msg.split(' ')
                    .next_back()
                    .expect("always possible to get the last word of a separated by space string")
                    .parse::<i32>()
                    .expect("the last word is always the block number so it can be parsed to i32 ")
            }
            Err(_) => 1,
        };

    // The tip is the block before the one lightwalletd is waiting for.
    //
    // A next height of 0 deliberately wraps to `u64::MAX` in this cast,
    // so it never compares equal to a real zebrad height.
    let lightwalletd_tip_height = (lightwalletd_next_height - 1) as u64;

    // Only the RPC request needs the async runtime.
    rt.block_on(async {
        let client = RpcRequestClient::new(zebra_rpc_address);
        let zebrad_blockchain_info = client
            .text_from_call("getblockchaininfo", "[]".to_string())
            .await?;
        let zebrad_blockchain_info: serde_json::Value =
            serde_json::from_str(&zebrad_blockchain_info)?;
        let zebrad_tip_height = zebrad_blockchain_info["result"]["blocks"]
            .as_u64()
            .expect("unexpected block height: doesn't fit in u64");

        if lightwalletd_tip_height != zebrad_tip_height {
            tracing::info!(
                lightwalletd_tip_height,
                zebrad_tip_height,
                zebra_rpc_address = ?zebra_rpc_address,
                "lightwalletd tip is behind Zebra tip, waiting for sync",
            );
        } else {
            tracing::debug!(
                lightwalletd_tip_height,
                zebrad_tip_height,
                zebra_rpc_address = ?zebra_rpc_address,
                "lightwalletd tip matches Zebra tip",
            );
        }

        Ok(lightwalletd_tip_height == zebrad_tip_height)
    })
}