use crate::determinism::{replay_consistent, DeterminismMode};
use crate::effect::{EffectHandler, RecordingEffectHandler};
use crate::engine::{ProtocolMachine, ProtocolMachineError};
use serde::{Deserialize, Serialize};
use std::io::Cursor;
fn encode_snapshot(machine: &ProtocolMachine) -> Result<Vec<u8>, ProtocolMachineError> {
let mut bytes = Vec::new();
ciborium::into_writer(machine, &mut bytes).map_err(|e| {
ProtocolMachineError::PersistenceError(format!("integration snapshot encode failed: {e}"))
})?;
Ok(bytes)
}
fn decode_snapshot(bytes: &[u8]) -> Result<ProtocolMachine, ProtocolMachineError> {
ciborium::from_reader(Cursor::new(bytes)).map_err(|e| {
ProtocolMachineError::PersistenceError(format!("integration snapshot decode failed: {e}"))
})
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct LoadedProtocolMachineReplayConformance {
pub determinism_mode: DeterminismMode,
pub replay_consistent: bool,
pub config_mode_consistent: bool,
pub exact_trace_match: bool,
pub exact_effect_trace_match: bool,
pub recorded_effect_count: usize,
pub baseline_event_count: usize,
pub replay_event_count: usize,
}
pub fn run_loaded_protocol_machine_record_replay_conformance(
machine: &mut ProtocolMachine,
handler: &dyn EffectHandler,
max_steps: usize,
) -> Result<LoadedProtocolMachineReplayConformance, ProtocolMachineError> {
let snapshot = encode_snapshot(machine)?;
let recording = RecordingEffectHandler::new(handler);
machine.run(&recording, max_steps)?;
let recorded_effects = recording.effect_trace();
let baseline_trace = machine.trace().to_vec();
let baseline_effect_trace = machine.effect_trace().to_vec();
let determinism_mode = machine.config().determinism_mode;
let mut replay_vm = decode_snapshot(&snapshot)?;
replay_vm.run_replay(handler, &recorded_effects, max_steps)?;
let replay_trace = replay_vm.trace().to_vec();
let replay_effect_trace = replay_vm.effect_trace().to_vec();
let replay_mode_consistent = replay_consistent(
DeterminismMode::Replay,
&baseline_trace,
&replay_trace,
&baseline_effect_trace,
&replay_effect_trace,
);
let config_mode_consistent = replay_consistent(
determinism_mode,
&baseline_trace,
&replay_trace,
&baseline_effect_trace,
&replay_effect_trace,
);
Ok(LoadedProtocolMachineReplayConformance {
determinism_mode,
replay_consistent: replay_mode_consistent,
config_mode_consistent,
exact_trace_match: baseline_trace == replay_trace,
exact_effect_trace_match: baseline_effect_trace == replay_effect_trace,
recorded_effect_count: recorded_effects.len(),
baseline_event_count: baseline_trace.len(),
replay_event_count: replay_trace.len(),
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::coroutine::Value;
use crate::durable::WalSyncRequest;
use crate::effect::{
EffectFailure, EffectResult, SendDecision, SendDecisionInput, TopologyPerturbation,
};
use crate::engine::ProtocolMachineConfig;
use crate::loader::CodeImage;
use crate::output_condition::OutputConditionHint;
use std::collections::BTreeMap;
use telltale_types::{GlobalType, Label, LocalTypeR};
struct DeterministicHandler;
impl EffectHandler for DeterministicHandler {
fn handle_send(
&self,
_role: &str,
_partner: &str,
_label: &str,
_state: &[Value],
) -> EffectResult<Value> {
EffectResult::success(Value::Nat(1))
}
fn send_decision(&self, input: SendDecisionInput<'_>) -> EffectResult<SendDecision> {
EffectResult::success(SendDecision::Deliver(input.payload.unwrap_or(Value::Unit)))
}
fn handle_recv(
&self,
_role: &str,
_partner: &str,
_label: &str,
_state: &mut Vec<Value>,
_payload: &Value,
) -> EffectResult<()> {
EffectResult::success(())
}
fn handle_choose(
&self,
_role: &str,
_partner: &str,
labels: &[String],
_state: &[Value],
) -> EffectResult<String> {
match labels.first().cloned() {
Some(label) => EffectResult::success(label),
None => EffectResult::failure(EffectFailure::invalid_input("no labels available")),
}
}
fn step(&self, _role: &str, _state: &mut Vec<Value>) -> EffectResult<()> {
EffectResult::success(())
}
}
struct DeterministicInternalEffectHandler;
impl EffectHandler for DeterministicInternalEffectHandler {
fn handle_send(
&self,
_role: &str,
_partner: &str,
_label: &str,
_state: &[Value],
) -> EffectResult<Value> {
EffectResult::success(Value::Nat(1))
}
fn send_decision(&self, input: SendDecisionInput<'_>) -> EffectResult<SendDecision> {
EffectResult::success(SendDecision::Deliver(input.payload.unwrap_or(Value::Unit)))
}
fn handle_recv(
&self,
_role: &str,
_partner: &str,
_label: &str,
_state: &mut Vec<Value>,
_payload: &Value,
) -> EffectResult<()> {
EffectResult::success(())
}
fn handle_choose(
&self,
_role: &str,
_partner: &str,
labels: &[String],
_state: &[Value],
) -> EffectResult<String> {
match labels.first().cloned() {
Some(label) => EffectResult::success(label),
None => EffectResult::failure(EffectFailure::invalid_input("no labels available")),
}
}
fn step(&self, _role: &str, _state: &mut Vec<Value>) -> EffectResult<()> {
EffectResult::success(())
}
fn topology_events(&self, tick: u64) -> EffectResult<Vec<TopologyPerturbation>> {
let events = match tick {
1 => vec![TopologyPerturbation::Partition {
from: "A".to_string(),
to: "B".to_string(),
}],
2 => vec![TopologyPerturbation::Heal {
from: "A".to_string(),
to: "B".to_string(),
}],
_ => Vec::new(),
};
EffectResult::success(events)
}
fn output_condition_hint(
&self,
sid: usize,
role: &str,
_state: &[Value],
) -> Option<OutputConditionHint> {
Some(OutputConditionHint {
predicate_ref: "machine.integration.internal_effects".to_string(),
witness_ref: Some(format!("sid:{sid}:role:{role}")),
})
}
fn supports_wal_sync(&self) -> bool {
true
}
fn wal_sync(&self, _sync: &WalSyncRequest) -> EffectResult<()> {
EffectResult::success(())
}
}
fn simple_send_recv_image() -> CodeImage {
let mut local_types = BTreeMap::new();
local_types.insert(
"A".to_string(),
LocalTypeR::Send {
partner: "B".into(),
branches: vec![(Label::new("msg"), None, LocalTypeR::End)],
},
);
local_types.insert(
"B".to_string(),
LocalTypeR::Recv {
partner: "A".into(),
branches: vec![(Label::new("msg"), None, LocalTypeR::End)],
},
);
let global = GlobalType::send("A", "B", Label::new("msg"), GlobalType::End);
CodeImage::from_local_types(&local_types, &global)
}
#[test]
fn loaded_protocol_machine_harness_reports_replay_conformance() {
let image = simple_send_recv_image();
let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());
machine
.load_choreography(&image)
.expect("load choreography");
let report = run_loaded_protocol_machine_record_replay_conformance(
&mut machine,
&DeterministicHandler,
100,
)
.expect("harness run should succeed");
assert!(report.replay_consistent);
assert!(report.config_mode_consistent);
assert!(report.exact_trace_match);
assert!(report.exact_effect_trace_match);
assert!(report.recorded_effect_count > 0);
assert!(report.baseline_event_count > 0);
}
#[test]
fn loaded_protocol_machine_harness_preserves_internal_effect_replay_exactness() {
let image = simple_send_recv_image();
let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());
machine
.load_choreography(&image)
.expect("load choreography");
let report = run_loaded_protocol_machine_record_replay_conformance(
&mut machine,
&DeterministicInternalEffectHandler,
100,
)
.expect("harness run should succeed");
assert!(report.replay_consistent);
assert!(report.config_mode_consistent);
assert!(report.exact_trace_match);
assert!(report.exact_effect_trace_match);
assert!(report.recorded_effect_count > 0);
}
}