use rustsim::prelude::*;
/// Minimal test agent carrying an id plus two `f32` columns, `x` and `vx`.
#[derive(Debug, Clone)]
struct Particle {
    id: AgentId,
    x: f32,
    vx: f32,
}

impl Agent for Particle {
    /// Returns the stored agent id.
    fn id(&self) -> AgentId {
        let Self { id, .. } = *self;
        id
    }
}

/// Column layout: index 0 is `x`, index 1 is `vx`.
impl SoaExtractable for Particle {
    /// Number of exported columns, derived from `column_names` so the two
    /// cannot drift apart.
    fn num_columns() -> usize {
        Self::column_names().len()
    }

    /// Column names, in column-index order.
    fn column_names() -> Vec<&'static str> {
        ["x", "vx"].to_vec()
    }

    /// Appends this particle's `x` and `vx` to columns 0 and 1 respectively.
    fn extract_row(&self, columns: &mut [Vec<f32>]) {
        let Self { x, vx, .. } = *self;
        columns[0].push(x);
        columns[1].push(vx);
    }

    /// Overwrites `x` and `vx` with the values stored at `row` in columns 0 and 1.
    fn write_back_row(&mut self, columns: &[&[f32]], row: usize) {
        let cell = |column: usize| columns[column][row];
        self.x = cell(0);
        self.vx = cell(1);
    }
}
/// Builds the shared test runtime.
///
/// The store holds four particles with ids `1..=4`; each starts at `x = id`
/// with `vx = id * 0.5`. A single CPU phase, `"integrate"`, adds column 1
/// (`vx`) into column 0 (`x`) for the first `n` rows.
fn build_runtime() -> ColumnarRuntime {
    let mut store = VecStore::new();
    let particles = (1..=4).map(|id| Particle {
        id,
        x: id as f32,
        vx: id as f32 * 0.5,
    });
    for particle in particles {
        store.insert(particle);
    }

    let mut runtime = ColumnarRuntime::upload::<Particle, _>(&store);
    runtime.add_cpu_phase("integrate", |columns, n| {
        // Split the column slice so `x` can be borrowed mutably while `vx` is read.
        let (head, tail) = columns.split_at_mut(1);
        let positions = &mut head[0];
        let velocities = &tail[0];
        (0..n).for_each(|row| positions[row] += velocities[row]);
    });
    runtime
}
/// Returns an owned copy of column 0 (`x`) from the runtime's device store.
fn x_values(control: &EngineControlPlane) -> Vec<f32> {
    let runtime = control.runtime();
    let store = runtime.device_store();
    store.column(0).to_vec()
}
/// Restoring a checkpoint rewinds the step counter and the column data.
/// Registered phases still run afterwards, so replaying from the checkpoint
/// reproduces the original trajectory.
#[test]
fn checkpoint_restore_rewinds_state_and_preserves_phases() {
    let mut control = EngineControlPlane::new(build_runtime());
    control.step().unwrap();
    let label = "after-step-1".to_string();
    let checkpoint = control.checkpoint(Some(label)).unwrap();

    // Remember where step 2 lands, then advance once more so restore has work to undo.
    control.step().unwrap();
    let expected_after_second_step = x_values(&control);
    control.step().unwrap();
    let diverged = x_values(&control);
    assert_ne!(diverged, expected_after_second_step);

    control.restore(&checkpoint).unwrap();
    assert_eq!(control.runtime().step_index(), 1);
    assert_eq!(x_values(&control), checkpoint.state.columns[0]);

    // Stepping again from the restored state must land exactly on step 2's values.
    control.step().unwrap();
    assert_eq!(x_values(&control), expected_after_second_step);
}
/// A replay recorded in `Record` mode verifies cleanly when the same
/// simulation is re-run in `Verify` mode.
#[test]
fn deterministic_replay_records_and_verifies_fingerprints() {
    let record_config = ControlPlaneConfig {
        replay_mode: ReplayMode::Record,
        ..ControlPlaneConfig::default()
    };
    let mut recorder = EngineControlPlane::with_config(build_runtime(), record_config).unwrap();
    recorder.run(3).unwrap();
    let expected = recorder.recorded_replay().to_vec();
    // Without this, a recorder that emitted nothing could let verification
    // pass vacuously against an empty expectation list.
    assert_eq!(
        expected.len(),
        3,
        "recorder should emit one fingerprint per step"
    );

    let verify_config = ControlPlaneConfig {
        replay_mode: ReplayMode::Verify { expected },
        ..ControlPlaneConfig::default()
    };
    let mut verifier = EngineControlPlane::with_config(build_runtime(), verify_config).unwrap();
    let verified = verifier.run(3).unwrap();
    assert_eq!(verified.len(), 3);
    assert_eq!(verifier.metrics().replay_fingerprints, 3);
    // Both runs start from the same runtime and execute the same phase, so
    // their final columns must agree exactly.
    assert_eq!(x_values(&verifier), x_values(&recorder));
}
/// Corrupting a recorded fingerprint makes verification fail on the next step
/// with a typed `ReplayDivergence` error that carries the diverging step index.
#[test]
fn deterministic_replay_reports_typed_divergence() {
    let record_config = ControlPlaneConfig {
        replay_mode: ReplayMode::Record,
        ..ControlPlaneConfig::default()
    };
    let mut recorder = EngineControlPlane::with_config(build_runtime(), record_config).unwrap();
    recorder.step().unwrap();
    let mut expected = recorder.recorded_replay().to_vec();
    // Flip one bit of the first fingerprint. Using `first_mut` turns an empty
    // recording into a descriptive failure instead of an index-out-of-bounds panic.
    expected
        .first_mut()
        .expect("recording one step should produce at least one fingerprint")
        .value ^= 1;

    let verify_config = ControlPlaneConfig {
        replay_mode: ReplayMode::Verify { expected },
        ..ControlPlaneConfig::default()
    };
    let mut verifier = EngineControlPlane::with_config(build_runtime(), verify_config).unwrap();
    let err = verifier.step().unwrap_err();
    // Include the actual error in the failure message; a bare `matches!` only
    // reports "assertion failed".
    assert!(
        matches!(
            err,
            EngineControlPlaneError::ReplayDivergence { step_index: 1, .. }
        ),
        "expected ReplayDivergence at step 1, got {:?}",
        err
    );
}
/// Checks three things:
/// - automatic checkpoints follow `every_steps(2)`;
/// - retention is capped by `max_retained_checkpoints`;
/// - metrics reflect the partition plan and the single registered phase.
#[test]
fn automatic_checkpoints_and_metrics_are_retained() {
    let config = ControlPlaneConfig {
        checkpoint_policy: CheckpointPolicy::every_steps(2),
        max_retained_checkpoints: 2,
        partition_plan: PartitionPlan::row_ranges(4, 2).unwrap(),
        ..ControlPlaneConfig::default()
    };
    let mut control = EngineControlPlane::with_config(build_runtime(), config).unwrap();
    control.run(5).unwrap();
    let metrics = control.metrics();
    let checkpoints: Vec<_> = control.checkpoints().collect();
    assert_eq!(metrics.steps_completed, 5);
    assert_eq!(metrics.checkpoints_created, 2);
    assert_eq!(metrics.partition_count, 2);
    assert_eq!(metrics.phases.len(), 1);
    assert_eq!(metrics.phases[0].executions, 5);
    assert_eq!(checkpoints.len(), 2);
    assert_eq!(checkpoints[0].step_index, 2);
    assert_eq!(checkpoints[1].step_index, 4);

    // Five steps create only two checkpoints, which never exceeds the cap of
    // two. Two more steps create a third checkpoint (step 6), forcing an
    // eviction. NOTE(review): assumes oldest-first eviction, leaving steps 4 and 6.
    control.run(2).unwrap();
    let retained_steps: Vec<_> = control
        .checkpoints()
        .map(|checkpoint| checkpoint.step_index)
        .collect();
    assert_eq!(control.metrics().checkpoints_created, 3);
    assert_eq!(retained_steps, vec![4, 6]);
}
/// `row_ranges(10, 3)` splits ten rows into three contiguous ranges.
/// The first partition takes the leftover row, and no workers are assigned
/// by default.
#[test]
fn partition_plan_row_ranges_are_deterministic() {
    let plan = PartitionPlan::row_ranges(10, 3).unwrap();

    let mut ranges = Vec::new();
    for assignment in plan.assignments().iter() {
        ranges.push((
            assignment.partition_id.0,
            assignment.row_start,
            assignment.row_end,
        ));
    }

    assert_eq!(ranges, vec![(0, 0, 4), (1, 4, 7), (2, 7, 10)]);
    assert_eq!(plan.partition_count(), 3);
    assert!(!plan.has_worker_assignments());
}
/// Per-partition checkpoints carry their worker assignment and their slice of
/// agent ids. Restoring them into a fresh control plane reproduces the source's
/// step index, columns and ids.
#[test]
fn partition_checkpoints_round_trip_authoritative_state() {
    let config = ControlPlaneConfig {
        partition_plan: PartitionPlan::row_ranges(4, 2)
            .unwrap()
            .with_workers(&["worker-a", "worker-b"])
            .unwrap(),
        ..ControlPlaneConfig::default()
    };
    let mut source = EngineControlPlane::with_config(build_runtime(), config.clone()).unwrap();
    source.step().unwrap();
    let partitions = source.partition_checkpoints().unwrap();
    assert_eq!(partitions.len(), 2);
    // Check both partitions' worker assignments, not just the first.
    assert_eq!(partitions[0].assignment.worker.as_deref(), Some("worker-a"));
    assert_eq!(partitions[1].assignment.worker.as_deref(), Some("worker-b"));
    assert_eq!(partitions[0].state.ids, vec![1, 2]);
    assert_eq!(partitions[1].state.ids, vec![3, 4]);

    let mut restored = EngineControlPlane::with_config(build_runtime(), config).unwrap();
    restored.restore_partitions(&partitions).unwrap();
    assert_eq!(
        restored.runtime().step_index(),
        source.runtime().step_index()
    );
    assert_eq!(x_values(&restored), x_values(&source));
    assert_eq!(
        restored.runtime().device_store().ids(),
        source.runtime().device_store().ids()
    );
}
/// A CPU-dispatched partition step runs the partition phase once per
/// partition, in the context of its assigned worker. It then writes the
/// results back into the authoritative store and updates step and partition
/// metrics.
#[test]
fn partition_step_dispatches_workers_and_reassembles_authoritative_state() {
    let workers = vec![
        PartitionWorker::cuda_device("gpu-0", 0),
        PartitionWorker::cuda_device("gpu-1", 1),
    ];
    let config = ControlPlaneConfig {
        checkpoint_policy: CheckpointPolicy::every_steps(1),
        partition_plan: PartitionPlan::row_ranges(4, 2)
            .unwrap()
            .with_workers(&["gpu-0", "gpu-1"])
            .unwrap(),
        partition_workers: workers,
        ..ControlPlaneConfig::default()
    };
    let mut control = EngineControlPlane::with_config(build_runtime(), config).unwrap();
    control.add_partition_cpu_phase("partition_integrate", |context, columns, n| {
        let (head, tail) = columns.split_at_mut(1);
        let positions = &mut head[0];
        let velocities = &tail[0];
        // Bias each row by the CUDA device ordinal so the two partitions'
        // results are distinguishable.
        let bias = if let PartitionExecutionBackend::CudaDevice { device_ordinal } =
            context.backend
        {
            device_ordinal as f32
        } else {
            0.0
        };
        (0..n).for_each(|row| positions[row] += velocities[row] + bias);
        Ok(())
    });

    let step = control.step_partitions().unwrap();
    assert_eq!(step.step_index, 0);
    assert_eq!(control.runtime().step_index(), 1);
    // Rows 0-1 (gpu-0, bias 0): 1+0.5, 2+1.0. Rows 2-3 (gpu-1, bias 1): 3+1.5+1, 4+2+1.
    assert_eq!(x_values(&control), vec![1.5, 3.0, 5.5, 7.0]);

    let reports = &step.phases;
    assert_eq!(reports.len(), 2);
    assert_eq!(reports[0].worker.as_deref(), Some("gpu-0"));
    assert_eq!(reports[1].worker.as_deref(), Some("gpu-1"));
    assert!(matches!(
        reports[1].backend,
        PartitionExecutionBackend::CudaDevice { device_ordinal: 1 }
    ));
    assert!(step.checkpoint.is_some());

    let metrics = control.metrics();
    assert_eq!(metrics.steps_completed, 1);
    assert_eq!(metrics.partition_steps_completed, 1);
    assert_eq!(metrics.partition_rows_processed, 4);
    assert_eq!(metrics.phase_count, 2);
    assert_eq!(metrics.phases.len(), 1);
    assert_eq!(metrics.phases[0].executions, 2);
}
/// An executor-driven partition step behaves like the CPU-dispatched one.
/// Each partition runs as a self-contained task on `LocalThreadedPartitionExecutor`,
/// and the results are reassembled into the authoritative store.
#[test]
fn executor_partition_step_runs_self_contained_tasks_and_reassembles_state() {
    let plan = PartitionPlan::row_ranges(4, 2)
        .unwrap()
        .with_workers(&["gpu-0", "gpu-1"])
        .unwrap();
    let config = ControlPlaneConfig {
        checkpoint_policy: CheckpointPolicy::every_steps(1),
        partition_plan: plan,
        partition_workers: vec![
            PartitionWorker::cuda_device("gpu-0", 0),
            PartitionWorker::cuda_device("gpu-1", 1),
        ],
        ..ControlPlaneConfig::default()
    };
    let mut control = EngineControlPlane::with_config(build_runtime(), config).unwrap();
    control.add_partition_executor_phase("executor_integrate", |context, columns, n| {
        let (x_col, rest) = columns.split_at_mut(1);
        let x = &mut x_col[0];
        let vx = &rest[0];
        // Bias rows by the CUDA device ordinal so each partition's output is distinguishable.
        let device_bias = match context.backend {
            PartitionExecutionBackend::CudaDevice { device_ordinal } => device_ordinal as f32,
            _ => 0.0,
        };
        for row in 0..n {
            x[row] += vx[row] + device_bias;
        }
        Ok(())
    });
    let mut executor = LocalThreadedPartitionExecutor::new();
    let step = control
        .step_partitions_with_executor(&mut executor)
        .unwrap();
    assert_eq!(step.step_index, 0);
    assert_eq!(control.runtime().step_index(), 1);
    assert_eq!(x_values(&control), vec![1.5, 3.0, 5.5, 7.0]);
    assert_eq!(step.phases.len(), 2);
    assert_eq!(step.phases[0].worker.as_deref(), Some("gpu-0"));
    assert_eq!(step.phases[1].worker.as_deref(), Some("gpu-1"));
    // Same check as the CPU-dispatched test: the gpu-1 report must carry its device backend.
    assert!(matches!(
        step.phases[1].backend,
        PartitionExecutionBackend::CudaDevice { device_ordinal: 1 }
    ));
    assert!(step.checkpoint.is_some());
    let metrics = control.metrics();
    assert_eq!(metrics.steps_completed, 1);
    assert_eq!(metrics.partition_steps_completed, 1);
    assert_eq!(metrics.partition_rows_processed, 4);
    assert_eq!(metrics.phase_count, 2);
    assert_eq!(metrics.phases.len(), 1);
    assert_eq!(metrics.phases[0].executions, 2);
}
struct RecordingEndpointClient {
endpoints: Vec<String>,
}
impl ExternalPartitionClient for RecordingEndpointClient {
fn execute_partition(
&mut self,
endpoint: &str,
task: PartitionTask,
) -> Result<PartitionTaskResult, String> {
self.endpoints.push(endpoint.to_string());
let (context, mut checkpoint, phase_base, phases) = task.into_parts();
let rows = checkpoint.state.agent_count();
let mut reports = Vec::new();
for (local_phase_index, phase) in phases.iter().enumerate() {
let start = std::time::Instant::now();
phase.run(&context, &mut checkpoint.state.columns, rows)?;
reports.push(PartitionPhaseReport {
partition_id: context.partition_id,
worker: context.worker.clone(),
phase_index: phase_base + local_phase_index,
name: phase.name(),
backend: context.backend.clone(),
rows,
elapsed_us: start.elapsed().as_micros(),
});
}
Ok(PartitionTaskResult::completed(
&context, checkpoint, reports,
))
}
}
/// `EndpointPartitionExecutor` sends each partition task to its worker's
/// external endpoint, in plan order. The phase observes the `External`
/// backend for that endpoint.
#[test]
fn endpoint_partition_executor_uses_external_worker_payloads() {
    let config = ControlPlaneConfig {
        partition_plan: PartitionPlan::row_ranges(4, 2)
            .unwrap()
            .with_workers(&["node-a", "node-b"])
            .unwrap(),
        partition_workers: vec![
            PartitionWorker::external("node-a", "grpc://node-a:7000"),
            PartitionWorker::external("node-b", "grpc://node-b:7000"),
        ],
        ..ControlPlaneConfig::default()
    };
    let mut control = EngineControlPlane::with_config(build_runtime(), config).unwrap();
    control.add_partition_executor_phase("remote_integrate", |context, columns, n| {
        let (head, tail) = columns.split_at_mut(1);
        let positions = &mut head[0];
        let velocities = &tail[0];
        // Rows handled by node-b get +10 so its output is distinguishable.
        let on_node_b = matches!(
            &context.backend,
            PartitionExecutionBackend::External { endpoint } if endpoint.contains("node-b")
        );
        let endpoint_bias = if on_node_b { 10.0 } else { 0.0 };
        (0..n).for_each(|row| positions[row] += velocities[row] + endpoint_bias);
        Ok(())
    });

    let client = RecordingEndpointClient {
        endpoints: Vec::new(),
    };
    let mut executor = EndpointPartitionExecutor::new(client);
    let step = control
        .step_partitions_with_executor(&mut executor)
        .unwrap();

    // node-a rows: 1+0.5, 2+1.0; node-b rows: 3+1.5+10, 4+2+10.
    assert_eq!(x_values(&control), vec![1.5, 3.0, 14.5, 16.0]);
    assert_eq!(step.phases.len(), 2);
    assert_eq!(
        executor.client().endpoints,
        vec!["grpc://node-a:7000", "grpc://node-b:7000"]
    );
    assert!(matches!(
        step.phases[1].backend,
        PartitionExecutionBackend::External { .. }
    ));
}
/// A plan that names a worker missing from `partition_workers` fails with
/// `MissingPartitionWorker` for that partition. The step index and columns
/// are left untouched.
#[test]
fn partition_step_reports_missing_worker_before_mutating_state() {
    let config = ControlPlaneConfig {
        partition_plan: PartitionPlan::row_ranges(4, 2)
            .unwrap()
            .with_workers(&["worker-a", "worker-b"])
            .unwrap(),
        // Only worker-a is registered, so partition 1 (worker-b) cannot be placed.
        partition_workers: vec![PartitionWorker::local_cpu("worker-a")],
        ..ControlPlaneConfig::default()
    };
    let mut control = EngineControlPlane::with_config(build_runtime(), config).unwrap();
    control.add_partition_cpu_phase("never_reaches_second_partition", |_context, columns, n| {
        for row in 0..n {
            columns[0][row] += 10.0;
        }
        Ok(())
    });
    let err = control.step_partitions().unwrap_err();
    // Include the actual error in the failure message; a bare `matches!` only
    // reports "assertion failed".
    assert!(
        matches!(
            err,
            EngineControlPlaneError::MissingPartitionWorker {
                partition: PartitionId(1),
                ..
            }
        ),
        "expected MissingPartitionWorker for partition 1, got {:?}",
        err
    );
    assert_eq!(control.runtime().step_index(), 0);
    // The phase would add 10.0 to worker-a's rows; unchanged values show nothing ran.
    assert_eq!(x_values(&control), vec![1.0, 2.0, 3.0, 4.0]);
}
/// A phase error inside one partition surfaces as `PartitionExecution`,
/// tagged with the failing partition and phase name. The step counter does
/// not advance.
#[test]
fn partition_step_reports_phase_failures_with_partition_identity() {
    let config = ControlPlaneConfig {
        partition_plan: PartitionPlan::row_ranges(4, 2).unwrap(),
        ..ControlPlaneConfig::default()
    };
    let mut control = EngineControlPlane::with_config(build_runtime(), config).unwrap();
    // Fail only in partition 1 so the error must identify that partition.
    control.add_partition_cpu_phase("fallible_partition_phase", |context, _columns, _n| {
        if context.partition_id == PartitionId(1) {
            Err("synthetic failure".to_string())
        } else {
            Ok(())
        }
    });
    let err = control.step_partitions().unwrap_err();
    // Include the actual error in the failure message; a bare `matches!` only
    // reports "assertion failed".
    assert!(
        matches!(
            err,
            EngineControlPlaneError::PartitionExecution {
                partition: PartitionId(1),
                phase: "fallible_partition_phase",
                ..
            }
        ),
        "expected PartitionExecution for partition 1 in fallible_partition_phase, got {:?}",
        err
    );
    assert_eq!(control.runtime().step_index(), 0);
}
/// Partition checkpoints supplied out of plan order are rejected with a
/// `PartitionCheckpoint` error.
#[test]
fn partition_restore_rejects_wrong_plan_order() {
    let config = ControlPlaneConfig {
        partition_plan: PartitionPlan::row_ranges(4, 2).unwrap(),
        ..ControlPlaneConfig::default()
    };
    let mut source = EngineControlPlane::with_config(build_runtime(), config.clone()).unwrap();
    let mut partitions = source.partition_checkpoints().unwrap();
    // Present partition 1 before partition 0.
    partitions.swap(0, 1);
    let mut restored = EngineControlPlane::with_config(build_runtime(), config).unwrap();
    let err = restored.restore_partitions(&partitions).unwrap_err();
    // Include the actual error in the failure message; a bare `matches!` only
    // reports "assertion failed".
    assert!(
        matches!(err, EngineControlPlaneError::PartitionCheckpoint(_)),
        "expected PartitionCheckpoint error for out-of-order partitions, got {:?}",
        err
    );
}