mod condition_check_reactor;
pub mod network;
mod test_rng;
use std::{
any::type_name,
collections::HashSet,
fmt::Debug,
marker::PhantomData,
sync::atomic::{AtomicU16, Ordering},
};
use futures::channel::oneshot;
use serde::{de::DeserializeOwned, Serialize};
use tempfile::TempDir;
use tokio::runtime::{self, Runtime};
use crate::{
components::Component,
effect::{EffectBuilder, Effects, Responder},
logging,
reactor::{EventQueueHandle, QueueKind, Scheduler},
};
use anyhow::Context;
pub(crate) use condition_check_reactor::ConditionCheckReactor;
pub(crate) use test_rng::TestRng;
const PORT_LOWER_BOUND: u16 = 10_000;
pub fn bincode_roundtrip<T: Serialize + DeserializeOwned + Eq + Debug>(value: &T) {
let serialized = bincode::serialize(value).unwrap();
let deserialized = bincode::deserialize(serialized.as_slice()).unwrap();
assert_eq!(*value, deserialized);
}
#[allow(clippy::assertions_on_constants)]
pub(crate) fn unused_port_on_localhost() -> u16 {
const PRIME: u16 = 54101;
const GENERATOR: u16 = 35892;
assert!(PORT_LOWER_BOUND + PRIME + 10 < u16::MAX);
static RNG_STATE: AtomicU16 = AtomicU16::new(GENERATOR);
for _ in 0..10_000 {
if let Ok(fresh_port) =
RNG_STATE.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
let new_value = (state as u32 + GENERATOR as u32) % (PRIME as u32);
Some(new_value as u16 + PORT_LOWER_BOUND)
})
{
return fresh_port;
}
}
panic!("could not generate random new port after 10_000 tries");
}
pub(crate) fn init_logging() {
logging::init()
.ok();
}
pub(crate) struct ComponentHarness<REv: 'static> {
pub(crate) rng: TestRng,
pub(crate) scheduler: &'static Scheduler<REv>,
#[allow(unused)] pub(crate) event_queue_handle: EventQueueHandle<REv>,
pub(crate) effect_builder: EffectBuilder<REv>,
pub(crate) tmp: TempDir,
pub(crate) runtime: Runtime,
}
pub(crate) struct ComponentHarnessBuilder<REv: 'static> {
rng: Option<TestRng>,
tmp: Option<TempDir>,
_phantom: PhantomData<REv>,
}
impl<REv: 'static> ComponentHarnessBuilder<REv> {
pub(crate) fn build(self) -> ComponentHarness<REv> {
self.try_build().expect("failed to build component harness")
}
pub(crate) fn on_disk(mut self, on_disk: TempDir) -> ComponentHarnessBuilder<REv> {
self.tmp = Some(on_disk);
self
}
pub(crate) fn rng(mut self, rng: TestRng) -> ComponentHarnessBuilder<REv> {
self.rng = Some(rng);
self
}
pub(crate) fn try_build(self) -> anyhow::Result<ComponentHarness<REv>> {
let tmp = match self.tmp {
Some(tmp) => tmp,
None => {
TempDir::new().context("could not create temporary directory for test harness")?
}
};
let rng = self.rng.unwrap_or_else(TestRng::new);
let scheduler = Box::leak(Box::new(Scheduler::new(QueueKind::weights())));
let event_queue_handle = EventQueueHandle::new(scheduler);
let effect_builder = EffectBuilder::new(event_queue_handle);
let runtime = runtime::Builder::new()
.threaded_scheduler()
.enable_all()
.build()
.context("build tokio runtime")?;
Ok(ComponentHarness {
rng,
scheduler,
event_queue_handle,
effect_builder,
tmp,
runtime,
})
}
}
impl<REv: 'static> ComponentHarness<REv> {
pub(crate) fn builder() -> ComponentHarnessBuilder<REv> {
ComponentHarnessBuilder {
rng: None,
tmp: None,
_phantom: PhantomData,
}
}
pub(crate) fn into_parts(self) -> (TempDir, TestRng) {
(self.tmp, self.rng)
}
pub(crate) fn is_idle(&self) -> bool {
self.scheduler.item_count() == 0
}
pub(crate) fn send_request<C, T, F>(&mut self, component: &mut C, f: F) -> T
where
C: Component<REv>,
<C as Component<REv>>::Event: Send + 'static,
T: Send + 'static,
F: FnOnce(Responder<T>) -> C::Event,
{
let (sender, receiver) = oneshot::channel();
let responder = Responder::create(sender);
let request_event = f(responder);
let returned_effects = self.send_event(component, request_event);
for effect in returned_effects {
self.runtime.spawn(effect);
}
self.runtime.block_on(receiver).unwrap_or_else(|err| {
panic!(
"request for {} channel closed with error \"{}\", this is a serious bug --- \
a component will likely be stuck from now on",
err,
type_name::<T>()
);
})
}
#[inline]
pub(crate) fn send_event<C>(&mut self, component: &mut C, ev: C::Event) -> Effects<C::Event>
where
C: Component<REv>,
{
component.handle_event(self.effect_builder, &mut self.rng, ev)
}
}
impl<REv: 'static> Default for ComponentHarness<REv> {
fn default() -> Self {
Self::builder().build()
}
}
#[test]
fn test_random_port_gen() {
const NUM_ROUNDS: usize = 40_000;
let values: HashSet<_> = (0..NUM_ROUNDS)
.map(|_| {
let port = unused_port_on_localhost();
assert!(port >= PORT_LOWER_BOUND);
port
})
.collect();
assert_eq!(values.len(), NUM_ROUNDS);
}
#[test]
fn default_works_without_panicking_for_component_harness() {
let _harness = ComponentHarness::<()>::default();
}