use std::{
error::Error,
fmt::{self, Display, Formatter},
marker::PhantomData,
mem,
ops::Range,
sync::Arc,
};
use reifydb_core::{common::CommitVersion, encoded::shape::RowShape, interface::catalog::flow::FlowNodeId};
use reifydb_type::value::Value;
pub mod batcher;
pub mod config;
pub mod event;
pub mod generator;
pub mod materialize;
pub mod oracle;
pub mod report;
pub mod runner;
pub mod schema;
pub mod strategy;
use config::{ChaosConfig, SupportedOps};
use event::ChaosBatch;
use oracle::MaterializedTable;
use report::Tolerances;
use runner::{OracleFn, RunnableChaos};
use schema::{ChaosSchema, KeyStrategy};
use strategy::{ColumnRegistry, ColumnSampler, RowContent, samplers};
use crate::{operator::FFIOperator, testing::harness::OperatorTestHarness};
/// Errors reported while turning a chaos-harness builder into a runnable chaos test.
#[derive(Debug)]
pub enum ChaosError {
	/// `update` and/or `remove` are enabled while `insert` is disabled.
	UnreachableSupportedOps,
	/// A required builder field was never supplied; carries the field's name.
	MissingField(&'static str),
	/// Schema validation failed; carries the output-key column it reported.
	OutputKeyColumnMissing(String),
	/// Input columns for which no sampler was registered.
	InputColumnsMissingSampler(Vec<String>),
	/// The underlying operator test harness failed to build; carries its debug-formatted error.
	HarnessBuild(String),
}

impl Display for ChaosError {
	fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
		match self {
			Self::UnreachableSupportedOps => f.write_str(
				"SupportedOps configuration is unreachable: enabling Update or Remove without Insert leaves the generator with no way to populate live rows",
			),
			Self::MissingField(field) => write!(f, "missing required builder field: {field}"),
			Self::OutputKeyColumnMissing(column) => {
				write!(f, "output_key column '{column}' is not present in output_shape")
			}
			Self::InputColumnsMissingSampler(columns) => {
				write!(f, "input columns without samplers: {columns:?}")
			}
			Self::HarnessBuild(detail) => write!(f, "operator harness build failed: {detail}"),
		}
	}
}

impl Error for ChaosError {}

/// Shorthand for results whose error type is [`ChaosError`].
pub type ChaosResult<T> = Result<T, ChaosError>;
/// Data-less entry point for configuring a chaos run against operator `T`.
///
/// It exists only to anchor [`ChaosHarness::builder`] on the operator type.
pub struct ChaosHarness<T: FFIOperator> {
	_phantom: PhantomData<T>,
}

impl<T: FFIOperator> ChaosHarness<T> {
	/// Starts a new builder populated with the defaults from `ChaosHarnessBuilder::new`.
	pub fn builder() -> ChaosHarnessBuilder<T> {
		<ChaosHarnessBuilder<T> as Default>::default()
	}
}
/// Fluent builder that assembles a [`RunnableChaos`] for operator `T`.
///
/// `build` succeeds only when an input shape, an output shape, a key strategy,
/// a non-empty output key, and an oracle have all been supplied. Every setter
/// consumes and returns the builder, so discarding a setter's result silently
/// drops the configuration. `#[must_use]` turns that mistake into a warning.
#[must_use = "a ChaosHarnessBuilder does nothing until `build()` is called"]
pub struct ChaosHarnessBuilder<T: FFIOperator> {
	/// Seed copied onto the resulting `RunnableChaos`.
	seed: u64,
	/// Chaos settings, including the enabled `SupportedOps`.
	config: ChaosConfig,
	/// Node id forwarded to the operator test harness builder.
	node_id: FlowNodeId,
	/// Commit version forwarded to the operator test harness builder.
	version: CommitVersion,
	/// Key/value pairs forwarded to the operator test harness via `add_config`.
	operator_config: Vec<(String, Value)>,
	/// Input row shape (required); every column needs a registered sampler.
	input_shape: Option<RowShape>,
	/// Output row shape (required); must contain every output-key column.
	output_shape: Option<RowShape>,
	/// Key strategy for input rows (required).
	key_strategy: Option<KeyStrategy>,
	/// Output columns forming the output key; must be non-empty.
	output_key_columns: Vec<String>,
	/// Per-column samplers plus an optional row-constraint callback.
	registry: ColumnRegistry,
	/// Per-column tolerances carried through to the run.
	tolerances: Tolerances,
	/// Reference function over the generated batches (required).
	oracle: Option<OracleFn>,
	_phantom: PhantomData<T>,
}
impl<T: FFIOperator> Default for ChaosHarnessBuilder<T> {
fn default() -> Self {
Self::new()
}
}
impl<T: FFIOperator> ChaosHarnessBuilder<T> {
pub fn new() -> Self {
Self {
seed: 0,
config: ChaosConfig::default(),
node_id: FlowNodeId(1),
version: CommitVersion(1),
operator_config: Vec::new(),
input_shape: None,
output_shape: None,
key_strategy: None,
output_key_columns: Vec::new(),
registry: ColumnRegistry::new(),
tolerances: Tolerances::new(),
oracle: None,
_phantom: PhantomData,
}
}
pub fn seed(mut self, seed: u64) -> Self {
self.seed = seed;
self
}
pub fn with_chaos(mut self, config: ChaosConfig) -> Self {
self.config = config;
self
}
pub fn with_supported_ops(mut self, ops: SupportedOps) -> Self {
self.config.supported_ops = ops;
self
}
pub fn with_node_id(mut self, node_id: FlowNodeId) -> Self {
self.node_id = node_id;
self
}
pub fn with_version(mut self, version: CommitVersion) -> Self {
self.version = version;
self
}
pub fn with_config<I, K>(mut self, config: I) -> Self
where
I: IntoIterator<Item = (K, Value)>,
K: Into<String>,
{
self.operator_config = config.into_iter().map(|(k, v)| (k.into(), v)).collect();
self
}
pub fn with_input_shape(mut self, shape: RowShape) -> Self {
self.input_shape = Some(shape);
self
}
pub fn with_output_shape(mut self, shape: RowShape) -> Self {
self.output_shape = Some(shape);
self
}
pub fn with_key_strategy(mut self, key_strategy: KeyStrategy) -> Self {
self.key_strategy = Some(key_strategy);
self
}
pub fn with_output_key<I, S>(mut self, columns: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.output_key_columns = columns.into_iter().map(Into::into).collect();
self
}
pub fn with_column(mut self, name: impl Into<String>, sampler: ColumnSampler) -> Self {
self.registry.register(name, sampler);
self
}
pub fn with_row_constraints(mut self, f: impl Fn(&mut RowContent) + Send + Sync + 'static) -> Self {
self.registry.set_constraint(Arc::new(f));
self
}
pub fn with_tolerance(mut self, column: impl Into<String>, tol: f64) -> Self {
self.tolerances = mem::take(&mut self.tolerances).with(column, tol);
self
}
pub fn with_oracle<F>(mut self, f: F) -> Self
where
F: Fn(&[ChaosBatch]) -> MaterializedTable + Send + Sync + 'static,
{
self.oracle = Some(Arc::new(f));
self
}
pub fn build(self) -> ChaosResult<RunnableChaos<T>> {
validate_supported_ops(&self.config.supported_ops)?;
let input_shape = self.input_shape.ok_or(ChaosError::MissingField("input_shape"))?;
let output_shape = self.output_shape.ok_or(ChaosError::MissingField("output_shape"))?;
let key_strategy = self.key_strategy.ok_or(ChaosError::MissingField("key_strategy"))?;
if self.output_key_columns.is_empty() {
return Err(ChaosError::MissingField("output_key"));
}
let oracle = self.oracle.ok_or(ChaosError::MissingField("oracle"))?;
let schema = ChaosSchema {
input_shape,
output_shape,
key_strategy,
output_key_columns: self.output_key_columns,
};
schema.validate().map_err(ChaosError::OutputKeyColumnMissing)?;
self.registry.validate(&schema.input_shape).map_err(ChaosError::InputColumnsMissingSampler)?;
let schema = Arc::new(schema);
let mut builder =
OperatorTestHarness::<T>::builder().with_node_id(self.node_id).with_version(self.version);
for (k, v) in self.operator_config {
builder = builder.add_config(k, v);
}
let harness = builder.build().map_err(|e| ChaosError::HarnessBuild(format!("{e:?}")))?;
Ok(RunnableChaos {
seed: self.seed,
config: self.config,
schema,
registry: Arc::new(self.registry),
tolerances: self.tolerances,
oracle,
harness,
})
}
}
/// Rejects `SupportedOps` combinations that leave the generator unable to make progress.
///
/// `update` and `remove` need live rows, and only `insert` creates them. Enabling
/// either one without `insert` therefore yields
/// [`ChaosError::UnreachableSupportedOps`]. Every other combination is accepted,
/// including the one where all three are disabled.
fn validate_supported_ops(ops: &SupportedOps) -> ChaosResult<()> {
	let touches_existing_rows = ops.update || ops.remove;
	match (ops.insert, touches_existing_rows) {
		(false, true) => Err(ChaosError::UnreachableSupportedOps),
		_ => Ok(()),
	}
}
pub trait IntoColumnSampler {
fn into_sampler(self) -> ColumnSampler;
}
impl IntoColumnSampler for ColumnSampler {
fn into_sampler(self) -> ColumnSampler {
self
}
}
impl IntoColumnSampler for Range<u64> {
fn into_sampler(self) -> ColumnSampler {
samplers::u64_range(self)
}
}
impl IntoColumnSampler for Range<u32> {
fn into_sampler(self) -> ColumnSampler {
samplers::u32_range(self)
}
}
impl IntoColumnSampler for Range<i64> {
fn into_sampler(self) -> ColumnSampler {
samplers::i64_range(self)
}
}
impl IntoColumnSampler for Range<f64> {
fn into_sampler(self) -> ColumnSampler {
samplers::f64_range(self)
}
}
#[cfg(test)]
mod tests {
	use std::collections::HashMap;

	use reifydb_abi::operator::capabilities::CAPABILITY_ALL_STANDARD;
	use reifydb_core::encoded::shape::{RowShape, RowShapeField};
	use reifydb_type::value::{row_number::RowNumber, r#type::Type};

	use super::{config::BatchSizeDist, *};
	use crate::{
		error::Result,
		operator::{
			FFIOperator, FFIOperatorMetadata, change::BorrowedChange, column::operator::OperatorColumn,
			context::OperatorContext,
		},
	};

	/// Operator that accepts any config and ignores all input. It gives the builder
	/// a concrete `T: FFIOperator` to construct.
	struct NoOpOperator;

	impl FFIOperatorMetadata for NoOpOperator {
		const NAME: &'static str = "noop";
		const API: u32 = 1;
		const VERSION: &'static str = "1.0.0";
		const DESCRIPTION: &'static str = "no-op operator for chaos builder tests";
		const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
		const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
		const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
	}

	impl FFIOperator for NoOpOperator {
		fn new(_operator_id: FlowNodeId, _config: &HashMap<String, Value>) -> Result<Self> {
			Ok(Self)
		}

		fn apply(&mut self, _ctx: &mut OperatorContext, _input: BorrowedChange<'_>) -> Result<()> {
			Ok(())
		}

		fn pull(&mut self, _ctx: &mut OperatorContext, _row_numbers: &[RowNumber]) -> Result<()> {
			Ok(())
		}
	}

	/// Builds a `RowShape` of unconstrained fields from `(name, type)` pairs.
	fn shape(fields: &[(&str, Type)]) -> RowShape {
		RowShape::new(fields.iter().map(|(n, t)| RowShapeField::unconstrained(*n, t.clone())).collect())
	}

	#[test]
	fn types_compile() {
		let _ = ChaosConfig::default();
		let _ = SupportedOps::default();
		let _ = SupportedOps::insert_only();
		let _ = SupportedOps::no_remove();
		let _ = SupportedOps::no_update();
		let _ = BatchSizeDist::default();
		let _ = BatchSizeDist::Constant(1);
		let _ = BatchSizeDist::Uniform {
			min: 1,
			max: 10,
		};
		let _ = BatchSizeDist::Geometric(0.4);
		let _ = MaterializedTable::empty();
	}

	#[test]
	fn unreachable_supported_ops_rejected() {
		let bad = SupportedOps {
			insert: false,
			update: true,
			remove: false,
		};
		assert!(matches!(validate_supported_ops(&bad), Err(ChaosError::UnreachableSupportedOps)));
		let also_bad = SupportedOps {
			insert: false,
			update: false,
			remove: true,
		};
		assert!(matches!(validate_supported_ops(&also_bad), Err(ChaosError::UnreachableSupportedOps)));
	}

	#[test]
	fn reachable_supported_ops_accepted() {
		assert!(validate_supported_ops(&SupportedOps::all()).is_ok());
		assert!(validate_supported_ops(&SupportedOps::insert_only()).is_ok());
		assert!(validate_supported_ops(&SupportedOps::no_remove()).is_ok());
		assert!(validate_supported_ops(&SupportedOps::no_update()).is_ok());
	}

	#[test]
	fn empty_supported_ops_is_accepted() {
		// The validator only forbids update/remove without insert, so it does not
		// reject a configuration with every op disabled.
		let none = SupportedOps {
			insert: false,
			update: false,
			remove: false,
		};
		assert!(validate_supported_ops(&none).is_ok());
	}

	/// Builder with every required field set and a sampler for each input column.
	fn well_formed_builder() -> ChaosHarnessBuilder<NoOpOperator> {
		ChaosHarness::<NoOpOperator>::builder()
			.with_input_shape(shape(&[("k", Type::Uint8), ("v", Type::Float8)]))
			.with_output_shape(shape(&[("k", Type::Uint8), ("v", Type::Float8)]))
			.with_key_strategy(KeyStrategy::Sequential)
			.with_output_key(["k"])
			.with_column("k", samplers::u64_range(1..1000))
			.with_column("v", samplers::f64_range(0.0..1.0))
			.with_oracle(|_| MaterializedTable::empty())
	}

	#[test]
	fn build_accepts_well_formed_builder() {
		assert!(well_formed_builder().build().is_ok(), "expected well-formed builder to succeed");
	}

	/// Unwraps the error from `build()`, panicking with `label` if it unexpectedly succeeded.
	fn expect_build_err(result: ChaosResult<RunnableChaos<NoOpOperator>>, label: &str) -> ChaosError {
		match result {
			Ok(_) => panic!("expected error from build(): {label}"),
			Err(e) => e,
		}
	}

	#[test]
	fn build_rejects_unreachable_supported_ops() {
		// `build()` must surface the ops check, not only the free-standing validator.
		let result = well_formed_builder()
			.with_supported_ops(SupportedOps {
				insert: false,
				update: true,
				remove: false,
			})
			.build();
		let err = expect_build_err(result, "unreachable supported ops");
		assert!(matches!(err, ChaosError::UnreachableSupportedOps), "{err:?}");
	}

	#[test]
	fn build_rejects_typoed_output_key() {
		let err =
			expect_build_err(well_formed_builder().with_output_key(["typo"]).build(), "typo'd output_key");
		match err {
			ChaosError::OutputKeyColumnMissing(col) => assert_eq!(col, "typo"),
			other => panic!("expected OutputKeyColumnMissing(\"typo\"), got {other:?}"),
		}
	}

	#[test]
	fn build_rejects_input_columns_without_samplers() {
		let result = ChaosHarness::<NoOpOperator>::builder()
			.with_input_shape(shape(&[("k", Type::Uint8), ("v", Type::Float8), ("missing", Type::Int8)]))
			.with_output_shape(shape(&[("k", Type::Uint8)]))
			.with_key_strategy(KeyStrategy::Sequential)
			.with_output_key(["k"])
			.with_column("k", samplers::u64_range(1..1000))
			.with_column("v", samplers::f64_range(0.0..1.0))
			.with_oracle(|_| MaterializedTable::empty())
			.build();
		match expect_build_err(result, "missing sampler") {
			ChaosError::InputColumnsMissingSampler(cols) => {
				assert_eq!(cols, vec!["missing".to_string()]);
			}
			other => panic!("expected InputColumnsMissingSampler, got {other:?}"),
		}
	}

	#[test]
	fn build_rejects_missing_required_fields() {
		let err = expect_build_err(ChaosHarness::<NoOpOperator>::builder().build(), "no input_shape");
		assert!(matches!(err, ChaosError::MissingField("input_shape")), "{err:?}");
		let err = expect_build_err(
			ChaosHarness::<NoOpOperator>::builder().with_input_shape(shape(&[("k", Type::Uint8)])).build(),
			"no output_shape",
		);
		assert!(matches!(err, ChaosError::MissingField("output_shape")), "{err:?}");
		let err = expect_build_err(
			ChaosHarness::<NoOpOperator>::builder()
				.with_input_shape(shape(&[("k", Type::Uint8)]))
				.with_output_shape(shape(&[("k", Type::Uint8)]))
				.build(),
			"no key_strategy",
		);
		assert!(matches!(err, ChaosError::MissingField("key_strategy")), "{err:?}");
		let err = expect_build_err(
			ChaosHarness::<NoOpOperator>::builder()
				.with_input_shape(shape(&[("k", Type::Uint8)]))
				.with_output_shape(shape(&[("k", Type::Uint8)]))
				.with_key_strategy(KeyStrategy::Sequential)
				.build(),
			"no output_key",
		);
		assert!(matches!(err, ChaosError::MissingField("output_key")), "{err:?}");
		let err = expect_build_err(
			ChaosHarness::<NoOpOperator>::builder()
				.with_input_shape(shape(&[("k", Type::Uint8)]))
				.with_output_shape(shape(&[("k", Type::Uint8)]))
				.with_key_strategy(KeyStrategy::Sequential)
				.with_output_key(["k"])
				.with_column("k", samplers::u64_range(1..1000))
				.build(),
			"no oracle",
		);
		assert!(matches!(err, ChaosError::MissingField("oracle")), "{err:?}");
	}
}