use std::collections::HashMap;
use elara_core::{
Event, EventType, MutationOp, NodeId, StateAtom, StateId, StateTime, StateType, VersionVector,
};
use elara_state::ReconciliationEngine;
use elara_time::TimeEngine;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
#[derive(Clone, Debug)]
pub struct FuzzerConfig {
pub node_count: usize,
pub state_count: usize,
pub event_count: usize,
pub concurrent_prob: f64,
pub reorder_prob: f64,
pub partition_prob: f64,
pub seed: u64,
}
impl Default for FuzzerConfig {
fn default() -> Self {
FuzzerConfig {
node_count: 5,
state_count: 10,
event_count: 1000,
concurrent_prob: 0.3,
reorder_prob: 0.2,
partition_prob: 0.05,
seed: 42,
}
}
}
impl FuzzerConfig {
pub fn light() -> Self {
FuzzerConfig {
node_count: 3,
state_count: 5,
event_count: 100,
concurrent_prob: 0.2,
reorder_prob: 0.1,
partition_prob: 0.0,
seed: 42,
}
}
pub fn heavy() -> Self {
FuzzerConfig {
node_count: 10,
state_count: 50,
event_count: 10000,
concurrent_prob: 0.5,
reorder_prob: 0.4,
partition_prob: 0.1,
seed: 42,
}
}
pub fn adversarial() -> Self {
FuzzerConfig {
node_count: 20,
state_count: 100,
event_count: 5000,
concurrent_prob: 0.7,
reorder_prob: 0.6,
partition_prob: 0.2,
seed: 42,
}
}
}
#[derive(Clone, Debug)]
pub struct FuzzEvent {
pub event: Event,
pub delivery_order: u64,
pub partition_id: u32,
}
pub struct FuzzNode {
pub node_id: NodeId,
pub engine: ReconciliationEngine,
pub time_engine: TimeEngine,
pub partition_id: u32,
pub events_processed: u64,
}
impl FuzzNode {
pub fn new(node_id: NodeId) -> Self {
FuzzNode {
node_id,
engine: ReconciliationEngine::new(),
time_engine: TimeEngine::new(),
partition_id: 0,
events_processed: 0,
}
}
pub fn process(&mut self, event: Event) {
let _ = self.engine.process_events(vec![event], &self.time_engine);
self.events_processed += 1;
}
pub fn get_state(&self, id: StateId) -> Option<&StateAtom> {
self.engine.field().get(id)
}
}
pub struct StateFuzzer {
config: FuzzerConfig,
nodes: HashMap<NodeId, FuzzNode>,
state_ids: Vec<StateId>,
rng: StdRng,
event_seq: u64,
current_time: StateTime,
}
impl StateFuzzer {
pub fn new(config: FuzzerConfig) -> Self {
let rng = StdRng::seed_from_u64(config.seed);
let mut nodes = HashMap::new();
for i in 0..config.node_count {
let node_id = NodeId::new(i as u64);
nodes.insert(node_id, FuzzNode::new(node_id));
}
let state_ids: Vec<StateId> = (0..config.state_count)
.map(|i| StateId::new(i as u64))
.collect();
StateFuzzer {
config,
nodes,
state_ids,
rng,
event_seq: 0,
current_time: StateTime::ZERO,
}
}
pub fn initialize_states(&mut self) {
let node_ids: Vec<NodeId> = self.nodes.keys().copied().collect();
for state_id in &self.state_ids {
let owner_idx = self.rng.gen_range(0..node_ids.len());
let owner = node_ids[owner_idx];
for node in self.nodes.values_mut() {
let atom = StateAtom::new(*state_id, StateType::Core, owner);
node.engine.field_mut().insert(atom);
}
}
}
fn generate_event(&mut self) -> FuzzEvent {
let node_ids: Vec<NodeId> = self.nodes.keys().copied().collect();
let source_idx = self.rng.gen_range(0..node_ids.len());
let source = node_ids[source_idx];
let state_idx = self.rng.gen_range(0..self.state_ids.len());
let target_state = self.state_ids[state_idx];
self.event_seq += 1;
self.current_time = StateTime::from_millis(self.current_time.as_millis() + 10);
let mutation = self.generate_mutation();
let event = Event::new(
source,
self.event_seq,
EventType::StateUpdate,
target_state,
mutation,
);
let delivery_order = if self.rng.gen::<f64>() < self.config.reorder_prob {
self.rng.gen_range(0..self.event_seq)
} else {
self.event_seq
};
let partition_id = if self.rng.gen::<f64>() < self.config.partition_prob {
self.rng.gen_range(0..3)
} else {
0
};
FuzzEvent {
event,
delivery_order,
partition_id,
}
}
fn generate_mutation(&mut self) -> MutationOp {
let mutation_type = self.rng.gen_range(0..4);
match mutation_type {
0 => {
let len = self.rng.gen_range(1..100);
let data: Vec<u8> = (0..len).map(|_| self.rng.gen()).collect();
MutationOp::Set(data)
}
1 => {
let len = self.rng.gen_range(1..50);
let data: Vec<u8> = (0..len).map(|_| self.rng.gen()).collect();
MutationOp::Append(data)
}
2 => {
let delta = self.rng.gen_range(-100..100);
MutationOp::Increment(delta)
}
_ => MutationOp::Delete,
}
}
pub fn run(&mut self) -> FuzzResult {
self.initialize_states();
let mut events: Vec<FuzzEvent> = Vec::new();
for _ in 0..self.config.event_count {
events.push(self.generate_event());
}
events.sort_by_key(|e| e.delivery_order);
for fuzz_event in events {
for node in self.nodes.values_mut() {
if fuzz_event.partition_id != 0 && node.partition_id != fuzz_event.partition_id {
continue;
}
node.process(fuzz_event.event.clone());
}
}
self.check_invariants()
}
fn check_invariants(&self) -> FuzzResult {
let mut result = FuzzResult::new();
result.convergence = self.check_convergence();
result.authority_violations = self.check_authority();
result.causality_violations = self.check_causality();
result
}
fn check_convergence(&self) -> ConvergenceResult {
let node_ids: Vec<NodeId> = self.nodes.keys().copied().collect();
if node_ids.len() < 2 {
return ConvergenceResult::Converged;
}
let reference = &self.nodes[&node_ids[0]];
let mut divergent_states = Vec::new();
for state_id in &self.state_ids {
let ref_atom = reference.get_state(*state_id);
for &node_id in &node_ids[1..] {
let node = &self.nodes[&node_id];
let atom = node.get_state(*state_id);
match (ref_atom, atom) {
(Some(r), Some(a)) => {
if r.value != a.value {
divergent_states.push(*state_id);
}
}
(None, Some(_)) | (Some(_), None) => {
divergent_states.push(*state_id);
}
(None, None) => {}
}
}
}
if divergent_states.is_empty() {
ConvergenceResult::Converged
} else {
ConvergenceResult::Diverged(divergent_states)
}
}
fn check_authority(&self) -> u32 {
0
}
fn check_causality(&self) -> u32 {
0
}
}
#[derive(Debug)]
pub enum ConvergenceResult {
Converged,
Diverged(Vec<StateId>),
}
impl ConvergenceResult {
pub fn is_converged(&self) -> bool {
matches!(self, ConvergenceResult::Converged)
}
}
#[derive(Debug)]
pub struct FuzzResult {
pub convergence: ConvergenceResult,
pub authority_violations: u32,
pub causality_violations: u32,
}
impl FuzzResult {
pub fn new() -> Self {
FuzzResult {
convergence: ConvergenceResult::Converged,
authority_violations: 0,
causality_violations: 0,
}
}
pub fn is_valid(&self) -> bool {
self.convergence.is_converged()
&& self.authority_violations == 0
&& self.causality_violations == 0
}
}
impl Default for FuzzResult {
fn default() -> Self {
Self::new()
}
}
pub mod properties {
use super::*;
pub fn source_ordering_preserved(events: &[Event]) -> bool {
let mut last_seq: HashMap<NodeId, u64> = HashMap::new();
for event in events {
if let Some(&prev) = last_seq.get(&event.source) {
if event.id.seq <= prev {
return false;
}
}
last_seq.insert(event.source, event.id.seq);
}
true
}
pub fn version_monotonic(before: &VersionVector, after: &VersionVector) -> bool {
!after.happens_before(before)
}
pub fn merge_commutative(v1: &VersionVector, v2: &VersionVector) -> bool {
let m1 = v1.merge(v2);
let m2 = v2.merge(v1);
m1 == m2
}
pub fn merge_associative(v1: &VersionVector, v2: &VersionVector, v3: &VersionVector) -> bool {
let m1 = v1.merge(v2).merge(v3);
let m2 = v1.merge(&v2.merge(v3));
m1 == m2
}
pub fn merge_idempotent(v: &VersionVector) -> bool {
let m = v.merge(v);
m == *v
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_fuzzer_light() {
let mut fuzzer = StateFuzzer::new(FuzzerConfig::light());
let result = fuzzer.run();
println!("Light fuzz result: {:?}", result);
}
#[test]
fn test_fuzzer_default() {
let mut fuzzer = StateFuzzer::new(FuzzerConfig::default());
let result = fuzzer.run();
println!("Default fuzz result: {:?}", result);
}
#[test]
fn test_version_vector_properties() {
let mut v1 = VersionVector::new();
let mut v2 = VersionVector::new();
let mut v3 = VersionVector::new();
v1.increment(NodeId::new(1));
v1.increment(NodeId::new(2));
v2.increment(NodeId::new(2));
v2.increment(NodeId::new(3));
v3.increment(NodeId::new(1));
v3.increment(NodeId::new(3));
assert!(properties::merge_commutative(&v1, &v2));
assert!(properties::merge_associative(&v1, &v2, &v3));
assert!(properties::merge_idempotent(&v1));
}
#[test]
fn test_source_ordering() {
let events = vec![
Event::new(
NodeId::new(1),
1,
EventType::StateUpdate,
StateId::new(1),
MutationOp::Set(vec![1]),
),
Event::new(
NodeId::new(1),
2,
EventType::StateUpdate,
StateId::new(1),
MutationOp::Set(vec![2]),
),
Event::new(
NodeId::new(2),
1,
EventType::StateUpdate,
StateId::new(1),
MutationOp::Set(vec![3]),
),
Event::new(
NodeId::new(1),
3,
EventType::StateUpdate,
StateId::new(1),
MutationOp::Set(vec![4]),
),
];
assert!(properties::source_ordering_preserved(&events));
}
#[test]
fn test_convergence_detection() {
let config = FuzzerConfig {
node_count: 3,
state_count: 2,
event_count: 10,
concurrent_prob: 0.0,
reorder_prob: 0.0,
partition_prob: 0.0,
seed: 42,
};
let mut fuzzer = StateFuzzer::new(config);
let result = fuzzer.run();
assert!(result.convergence.is_converged());
}
}