mod condition_check_reactor;
mod fake_transaction_acceptor;
pub(crate) mod filter_reactor;
pub(crate) mod network;
pub(crate) mod test_clock;
use std::{
any::type_name,
fmt::Debug,
fs,
io::Write,
marker::PhantomData,
ops::Range,
sync::atomic::{AtomicU16, Ordering},
time,
};
use anyhow::Context;
use assert_json_diff::{assert_json_eq, assert_json_matches_no_panic, CompareMode, Config};
use derive_more::From;
use futures::channel::oneshot;
use once_cell::sync::Lazy;
use rand::Rng;
use serde_json::Value;
use tempfile::TempDir;
use tokio::runtime::{self, Runtime};
use tracing::{debug, warn};
use casper_types::testing::TestRng;
use crate::{
components::Component,
effect::{
announcements::{ControlAnnouncement, FatalAnnouncement},
requests::NetworkRequest,
EffectBuilder, Effects, Responder,
},
logging,
protocol::Message,
reactor::{EventQueueHandle, QueueKind, ReactorEvent, Scheduler},
};
pub(crate) use condition_check_reactor::ConditionCheckReactor;
pub(crate) use fake_transaction_acceptor::FakeTransactionAcceptor;
const FATAL_GRACE_TIME: time::Duration = time::Duration::from_secs(3);
#[cfg(not(target_os = "windows"))]
const TEST_PORT_RANGE: Range<u16> = {
29000..29997
};
#[cfg(target_os = "windows")]
const TEST_PORT_RANGE: Range<u16> = 60001..60998;
const TEST_PORT_STRIDE: u16 = 29;
pub(crate) const LARGE_WASM_LANE_ID: u8 = 3;
macro_rules! map {
() => { std::collections::BTreeMap::new() };
( $first_key:expr => $first_value:expr $( , $key:expr => $value:expr )* $(,)? ) => {{
let mut map = std::collections::BTreeMap::new();
assert!(map.insert($first_key, $first_value).is_none());
$(
assert!(map.insert($key, $value).is_none());
)*
map
}};
}
macro_rules! set {
() => { std::collections::BTreeSet::new() };
( $first_value:expr $( , $value:expr )* $(,)? ) => {{
let mut set = std::collections::BTreeSet::new();
assert!(set.insert($first_value));
$(
assert!(set.insert($value));
)*
set
}}
}
pub(crate) use map;
pub(crate) use set;
pub(crate) fn unused_port_on_localhost() -> u16 {
static NEXT_PORT: Lazy<AtomicU16> = Lazy::new(|| {
rand::thread_rng()
.gen_range(TEST_PORT_RANGE.start..(TEST_PORT_RANGE.start + TEST_PORT_STRIDE))
.into()
});
NEXT_PORT.fetch_add(TEST_PORT_STRIDE, Ordering::SeqCst)
}
pub(crate) fn init_logging() {
logging::init()
.ok();
}
pub(crate) struct ComponentHarness<REv: 'static> {
pub(crate) rng: TestRng,
pub(crate) scheduler: &'static Scheduler<REv>,
pub(crate) effect_builder: EffectBuilder<REv>,
pub(crate) tmp: TempDir,
pub(crate) runtime: Runtime,
}
pub(crate) struct ComponentHarnessBuilder<REv: 'static> {
rng: Option<TestRng>,
tmp: Option<TempDir>,
_phantom: PhantomData<REv>,
}
impl<REv: 'static + Debug> ComponentHarnessBuilder<REv> {
pub(crate) fn build(self) -> ComponentHarness<REv> {
self.try_build().expect("failed to build component harness")
}
pub(crate) fn on_disk(mut self, on_disk: TempDir) -> ComponentHarnessBuilder<REv> {
self.tmp = Some(on_disk);
self
}
pub(crate) fn rng(mut self, rng: TestRng) -> ComponentHarnessBuilder<REv> {
self.rng = Some(rng);
self
}
pub(crate) fn try_build(self) -> anyhow::Result<ComponentHarness<REv>> {
let tmp = match self.tmp {
Some(tmp) => tmp,
None => {
TempDir::new().context("could not create temporary directory for test harness")?
}
};
let rng = self.rng.unwrap_or_default();
let scheduler = Box::leak(Box::new(Scheduler::new(QueueKind::weights(), None)));
let event_queue_handle = EventQueueHandle::without_shutdown(scheduler);
let effect_builder = EffectBuilder::new(event_queue_handle);
let runtime = runtime::Builder::new_multi_thread()
.enable_all()
.build()
.context("build tokio runtime")?;
Ok(ComponentHarness {
rng,
scheduler,
effect_builder,
tmp,
runtime,
})
}
}
impl<REv: 'static> ComponentHarness<REv> {
pub(crate) fn builder() -> ComponentHarnessBuilder<REv> {
ComponentHarnessBuilder {
rng: None,
tmp: None,
_phantom: PhantomData,
}
}
pub(crate) fn into_parts(self) -> (TempDir, TestRng) {
(self.tmp, self.rng)
}
pub(crate) fn is_idle(&self) -> bool {
self.scheduler.item_count() == 0
}
pub(crate) fn send_request<C, T, F>(&mut self, component: &mut C, f: F) -> T
where
C: Component<REv>,
<C as Component<REv>>::Event: Send + 'static,
T: Send + 'static,
F: FnOnce(Responder<T>) -> C::Event,
REv: ReactorEvent,
{
let (sender, receiver) = oneshot::channel();
let responder = Responder::without_shutdown(sender);
let request_event = f(responder);
let returned_effects = self.send_event(component, request_event);
let mut join_handles = Vec::new();
for effect in returned_effects {
join_handles.push(self.runtime.spawn(effect));
}
self.runtime.block_on(receiver).unwrap_or_else(|err| {
let join_all = async {
for handle in join_handles {
if let Err(err) = handle.await {
warn!("Join error while waiting for an effect to finish: {}", err);
};
}
};
if let Err(_timeout) = self.runtime.block_on(async move {
tokio::time::timeout(FATAL_GRACE_TIME, join_all).await
}) {
warn!(grace_time=?FATAL_GRACE_TIME, "while a responder was dropped in a unit test, \
I waited for all other pending effects to complete in case the output of a \
`fatal!` was among them but none of them completed");
}
for _ in 0..(self.scheduler.item_count()) {
let ((_ancestor, ev), _queue_kind) = self.runtime.block_on(self.scheduler.pop());
if !ev.is_control() {
debug!(?ev, "ignoring event while looking for a fatal");
continue;
}
match ev.try_into_control().unwrap() {
ControlAnnouncement::ShutdownDueToUserRequest { .. } => {
panic!("a control announcement requesting a shutdown due to user request was received")
}
ControlAnnouncement::ShutdownForUpgrade { .. } => {
panic!("a control announcement requesting a shutdown for upgrade was received")
}
ControlAnnouncement::ShutdownAfterCatchingUp { .. } => {
panic!("a control announcement requesting a shutdown after catching up was received")
}
fatal @ ControlAnnouncement::FatalError { .. } => {
panic!(
"a control announcement requesting a fatal error was received: {}",
fatal
)
}
ControlAnnouncement::QueueDumpRequest { .. } => {
panic!("queue dumps are not supported in the test harness")
}
ControlAnnouncement::ActivateFailpoint { .. } => {
panic!("currently no failpoint activations implemented in test harness")
},
}
}
panic!(
"request for {} channel closed with return value \"{}\" in unit test harness",
type_name::<T>(),
err,
);
})
}
#[inline]
pub(crate) fn send_event<C>(&mut self, component: &mut C, ev: C::Event) -> Effects<C::Event>
where
C: Component<REv>,
{
component.handle_event(self.effect_builder, &mut self.rng, ev)
}
}
impl<REv: 'static + Debug> Default for ComponentHarness<REv> {
fn default() -> Self {
Self::builder().build()
}
}
#[derive(Debug, From)]
pub(crate) enum UnitTestEvent {
#[from]
ControlAnnouncement(ControlAnnouncement),
#[from]
FatalAnnouncement(FatalAnnouncement),
#[from]
NetworkRequest(#[allow(dead_code)] NetworkRequest<Message>),
}
impl ReactorEvent for UnitTestEvent {
fn is_control(&self) -> bool {
match self {
UnitTestEvent::ControlAnnouncement(_) | UnitTestEvent::FatalAnnouncement(_) => true,
UnitTestEvent::NetworkRequest(_) => false,
}
}
fn try_into_control(self) -> Option<ControlAnnouncement> {
match self {
UnitTestEvent::ControlAnnouncement(ctrl_ann) => Some(ctrl_ann),
UnitTestEvent::FatalAnnouncement(FatalAnnouncement { file, line, msg }) => {
Some(ControlAnnouncement::FatalError { file, line, msg })
}
UnitTestEvent::NetworkRequest(_) => None,
}
}
}
pub(crate) async fn advance_time(duration: time::Duration) {
tokio::time::pause();
tokio::time::advance(duration).await;
tokio::time::resume();
debug!("advanced time by {} secs", duration.as_secs());
}
pub fn assert_schema(schema_path: String, actual_schema: String) {
let expected_schema = fs::read_to_string(&schema_path).unwrap();
let expected_schema: Value = serde_json::from_str(&expected_schema).unwrap();
let mut temp_file = tempfile::Builder::new()
.suffix(".json")
.tempfile_in(env!("OUT_DIR"))
.unwrap();
temp_file.write_all(actual_schema.as_bytes()).unwrap();
let actual_schema: Value = serde_json::from_str(&actual_schema).unwrap();
let (_file, temp_file_path) = temp_file.keep().unwrap();
let result = assert_json_matches_no_panic(
&actual_schema,
&expected_schema,
Config::new(CompareMode::Strict),
);
assert_eq!(
result,
Ok(()),
"schema does not match:\nexpected:\n{}\nactual:\n{}\n",
schema_path,
temp_file_path.display()
);
assert_json_eq!(actual_schema, expected_schema);
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use super::{unused_port_on_localhost, ComponentHarness};
#[test]
fn default_works_without_panicking_for_component_harness() {
let _harness = ComponentHarness::<()>::default();
}
#[test]
fn can_generate_at_least_100_unused_ports() {
let ports: HashSet<u16> = (0..100).map(|_| unused_port_on_localhost()).collect();
assert_eq!(ports.len(), 100);
}
}