use stateright::{Checker, Model, Property};
// Upper bound on `St::head`: `Act::Churn` is only offered while `head < MAX_REV`.
// The quiescence property also uses it as the head value at which a run counts as finished.
const MAX_REV: u8 = 5;
/// One state of the model: a handful of small cursors plus a sticky hazard flag.
///
/// All cursors start at 0 and are only ever moved by the transitions in
/// `AppliedLoop::next_state`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct St {
    /// Incremented by `Act::Churn`; that action is only offered while `head < MAX_REV`.
    head: u8,
    /// Incremented by `Act::Deliver` while below `head`; rewound to the resume
    /// point by `Act::CrashRestart`.
    delivered: u8,
    /// Set to `delivered` by a successful flush (`Act::FlushOk`); no other
    /// action changes it.
    store_cursor: u8,
    /// Lower bound of the range awaiting flush: a flush is only offered while
    /// `delivered > q_from`.
    q_from: u8,
    /// Set to `delivered` by both flush outcomes and reset to the resume point
    /// on restart (the property text calls it the in-memory cursor).
    applied_mem: u8,
    /// Sticky hazard flag: set when a flush succeeds with `q_from` ahead of
    /// `store_cursor`, or when a restart resumes beyond `store_cursor`.
    /// No transition clears it.
    holes: bool,
}
/// The transitions the model can take from a state.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Act {
    /// Advance `head` by one (offered while `head < MAX_REV`).
    Churn,
    /// Advance `delivered` by one (offered while `delivered < head`).
    Deliver,
    /// Flush succeeds: store cursor, queue anchor and in-memory cursor all
    /// move to `delivered`.
    FlushOk,
    /// Flush fails: only the in-memory cursor moves (plus whatever the active
    /// `Mutation` adds).
    FlushFail,
    /// Crash and restart: `delivered`, `q_from` and `applied_mem` are rebuilt
    /// from the resume point. Always offered.
    CrashRestart,
}
/// Selects which variant of the flush / restart logic the model explores.
///
/// `Debug` is derived (like the sibling `St` and `Act` types) so values can be
/// used in `assert_eq!` and `{:?}` diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mutation {
    /// No deviation: a failed flush leaves `q_from` alone and a restart
    /// resumes from `store_cursor`.
    None,
    /// On `Act::FlushFail`, additionally advance `q_from` to `delivered`,
    /// i.e. the failed batch is not re-queued.
    DropFailedBatch,
    /// On `Act::CrashRestart`, resume from `applied_mem` instead of
    /// `store_cursor`.
    ResumeFromMemApplied,
}
/// The model under check. Its only configuration is which `Mutation` of the
/// flush / restart logic to explore; `Mutation::None` selects the invariant
/// set, any other value selects the hazard-reachability property.
#[derive(Clone)]
struct AppliedLoop {
    /// Consulted by `next_state` (on `FlushFail` and `CrashRestart`) and by
    /// `properties` to pick the property set.
    mutation: Mutation,
}
// Stateright model of a deliver / flush / crash-restart loop over a handful of
// u8 cursors. `self.mutation` switches individual transitions between the
// unmutated behaviour and two deliberately broken variants.
impl Model for AppliedLoop {
    type State = St;
    type Action = Act;

    /// Single initial state: every cursor at 0 and no holes recorded.
    fn init_states(&self) -> Vec<St> {
        vec![St {
            head: 0,
            delivered: 0,
            store_cursor: 0,
            q_from: 0,
            applied_mem: 0,
            holes: false,
        }]
    }

    /// Pushes the actions enabled in `s` onto `acts`.
    ///
    /// * `Churn` while `head < MAX_REV`.
    /// * `Deliver` while something is undelivered (`delivered < head`).
    /// * `FlushOk` / `FlushFail` while the queue is non-empty
    ///   (`delivered > q_from`); both outcomes are always offered together.
    /// * `CrashRestart` unconditionally.
    fn actions(&self, s: &St, acts: &mut Vec<Act>) {
        if s.head < MAX_REV {
            acts.push(Act::Churn);
        }
        if s.delivered < s.head {
            acts.push(Act::Deliver);
        }
        if s.delivered > s.q_from {
            acts.push(Act::FlushOk);
            acts.push(Act::FlushFail);
        }
        acts.push(Act::CrashRestart);
    }

    /// Applies `a` to a copy of `s` and returns the successor (always `Some`).
    fn next_state(&self, s: &St, a: Act) -> Option<St> {
        let mut s = s.clone();
        match a {
            // The guards in `actions` keep both increments within `MAX_REV`.
            Act::Churn => s.head += 1,
            Act::Deliver => s.delivered += 1,
            Act::FlushOk => {
                // Committing a range that starts beyond the store cursor
                // skips everything in between: record the hazard.
                if s.q_from > s.store_cursor {
                    s.holes = true;
                }
                s.store_cursor = s.delivered;
                s.q_from = s.delivered;
                s.applied_mem = s.delivered;
            }
            Act::FlushFail => {
                // The in-memory cursor advances even though nothing was stored.
                s.applied_mem = s.delivered;
                match self.mutation {
                    Mutation::None | Mutation::ResumeFromMemApplied => {}
                    // Mutated behaviour: the failed batch is not re-queued.
                    Mutation::DropFailedBatch => s.q_from = s.delivered,
                }
            }
            Act::CrashRestart => {
                // Exhaustive on purpose: adding a `Mutation` variant must force
                // a decision here instead of silently resuming from the store.
                let resume = match self.mutation {
                    Mutation::ResumeFromMemApplied => s.applied_mem,
                    Mutation::None | Mutation::DropFailedBatch => s.store_cursor,
                };
                // Resuming past the store cursor skips data the store never got.
                if resume > s.store_cursor {
                    s.holes = true;
                }
                s.delivered = resume;
                s.q_from = resume.max(s.store_cursor);
                s.applied_mem = resume;
            }
        }
        Some(s)
    }

    /// Properties to check, chosen by `self.mutation`.
    ///
    /// For `Mutation::None`: three `always` invariants plus two `sometimes`
    /// reachability witnesses. For any other mutation: a single `sometimes`
    /// property stating that the `holes` hazard is reachable.
    fn properties(&self) -> Vec<Property<Self>> {
        let mut props: Vec<Property<Self>> = Vec::new();
        if self.mutation == Mutation::None {
            props.push(Property::<Self>::always(
                "the store's contents are always complete up to its cursor",
                |_, s| !s.holes,
            ));
            props.push(Property::<Self>::always(
                "the re-queued range is always anchored at the store cursor",
                |_, s| s.q_from == s.store_cursor,
            ));
            props.push(Property::<Self>::always(
                "quiescence implies the store is at the head",
                |_, s| {
                    // Quiesced: head at its bound, everything delivered, queue empty.
                    let quiesced =
                        s.head == MAX_REV && s.delivered == s.head && s.q_from == s.delivered;
                    !quiesced || s.store_cursor == s.head
                },
            ));
            // NOTE(review): this predicate is also satisfied by traces that
            // contain no `FlushFail` at all (e.g. Churn, Churn, Deliver,
            // FlushOk), so on its own it does not witness the failure named in
            // the property text. Recording that would need extra state in `St`.
            props.push(Property::<Self>::sometimes(
                "a failed flush is followed by a cumulative successful commit",
                |_, s| s.store_cursor > 0 && s.applied_mem == s.store_cursor && s.head > 1,
            ));
            props.push(Property::<Self>::sometimes(
                "the in-memory cursor runs ahead of the store across a failure",
                |_, s| s.applied_mem > s.store_cursor,
            ));
        } else {
            props.push(Property::<Self>::sometimes(
                "HAZARD reachable: cursor advances over dropped data",
                |_, s| s.holes,
            ));
        }
        props
    }
}
/// Model-checks `AppliedLoop` configured with `mutation`, prints the state
/// counts prefixed by `label`, then asserts the model's properties.
///
/// Exploration uses the breadth-first checker and blocks until it finishes.
fn check(mutation: Mutation, label: &str) {
    let checker = AppliedLoop { mutation }.checker().spawn_bfs().join();
    let total = checker.state_count();
    let unique = checker.unique_state_count();
    println!("{label}: {} states, {} unique", total, unique);
    checker.assert_properties();
}
/// Checks the unmutated model (`Mutation::None`): the `always` invariants and
/// the `sometimes` witnesses declared in `properties` are asserted by `check`.
#[test]
fn shipped_flush_semantics_preserve_cursor_authority() {
    let label = "applied: shipped";
    check(Mutation::None, label);
}
/// Checks the `Mutation::DropFailedBatch` variant, for which the model only
/// declares the "HAZARD reachable" `sometimes` property: the test passes when
/// a state with `holes` set is reachable.
#[test]
fn dropping_failed_batches_corrupts_the_fold() {
    let label = "applied: drop-on-fail (pre-fix)";
    check(Mutation::DropFailedBatch, label);
}
/// Checks the `Mutation::ResumeFromMemApplied` variant, for which the model
/// only declares the "HAZARD reachable" `sometimes` property: the test passes
/// when a state with `holes` set is reachable.
#[test]
fn resuming_from_memory_cursor_corrupts_the_fold() {
    let label = "applied: resume-from-memory";
    check(Mutation::ResumeFromMemApplied, label);
}