use stateright::{Checker, Model, Property};
const MAX_REV: u8 = 3;
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
struct Artifact {
node: u8,
cursor: u8,
has_key: bool,
}
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
enum ExporterPc {
Idle,
Exported(Artifact),
PayloadUp(Artifact),
Done(u8),
}
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
enum FoldStatus {
Synced,
StaleKey,
}
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
enum ImporterPc {
Start,
GotManifest(Artifact),
Imported(Artifact),
CrossCheckFailed,
FetchMissed,
Resumed(Artifact, FoldStatus),
}
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
struct St<const N: usize> {
head: u8,
delete_rev: Option<u8>,
floor: u8,
applied: [u8; N],
exporters: [ExporterPc; N],
payload: Option<Artifact>,
manifest: Option<Artifact>,
uploaded: std::collections::BTreeSet<Artifact>,
regressed: bool,
refused: bool,
importer: ImporterPc,
}
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
enum Act {
Churn,
DeleteKey,
Compact,
Apply(u8),
Export(u8),
UploadPayload(u8),
Publish(u8),
Crash(u8),
NextRound(u8),
ReadManifest,
ReadPayload,
Prune,
Retry,
Resume,
ResumeResyncDegraded,
}
#[derive(Clone, Copy, PartialEq, Eq)]
enum ResyncMode {
None,
Degrade,
FailStop,
}
#[derive(Clone, Copy, PartialEq, Eq)]
enum Mutation {
None,
LwwPointer,
PruneAgeOnly,
SilentClamp,
}
#[derive(Clone)]
struct SnapshotProtocol<const N: usize> {
pointer_swap: bool,
resync: ResyncMode,
mutation: Mutation,
max_rev: u8,
}
impl<const N: usize> SnapshotProtocol<N> {
fn shipped(resync: ResyncMode) -> Self {
Self {
pointer_swap: true,
resync,
mutation: Mutation::None,
max_rev: MAX_REV,
}
}
fn legacy(resync: ResyncMode) -> Self {
Self {
pointer_swap: false,
resync,
mutation: Mutation::None,
max_rev: MAX_REV,
}
}
fn key_present_at(s: &St<N>, cursor: u8) -> bool {
s.delete_rev.is_none_or(|d| cursor < d)
}
fn bucket_has_key(s: &St<N>) -> bool {
s.delete_rev.is_none()
}
fn resume_ok(&self, s: &St<N>, a: Artifact) -> bool {
slipstream::protocol::resume_window_ok(a.cursor as u64, s.floor as u64 + 1)
}
fn relist_only_status(s: &St<N>, a: Artifact) -> FoldStatus {
let marker_evicted = s.delete_rev.is_some_and(|d| d <= s.floor);
if a.has_key && !Self::bucket_has_key(s) && marker_evicted {
FoldStatus::StaleKey
} else {
FoldStatus::Synced
}
}
}
impl<const N: usize> Model for SnapshotProtocol<N> {
type State = St<N>;
type Action = Act;
fn init_states(&self) -> Vec<St<N>> {
vec![St {
head: 0,
delete_rev: None,
floor: 0,
applied: [0; N],
exporters: [ExporterPc::Idle; N],
payload: None,
manifest: None,
uploaded: Default::default(),
regressed: false,
refused: false,
importer: ImporterPc::Start,
}]
}
fn actions(&self, s: &St<N>, acts: &mut Vec<Act>) {
if s.head < self.max_rev {
acts.push(Act::Churn);
if s.delete_rev.is_none() {
acts.push(Act::DeleteKey);
}
}
if s.floor < s.head {
acts.push(Act::Compact);
}
for n in 0..N as u8 {
if s.applied[n as usize] < s.head {
acts.push(Act::Apply(n));
}
match s.exporters[n as usize] {
ExporterPc::Idle if s.applied[n as usize] >= 1 => acts.push(Act::Export(n)),
ExporterPc::Done(c) if s.applied[n as usize] > c => {
acts.push(Act::NextRound(n));
}
ExporterPc::Exported(_) => {
acts.push(Act::UploadPayload(n));
acts.push(Act::Crash(n));
}
ExporterPc::PayloadUp(_) => {
acts.push(Act::Publish(n));
acts.push(Act::Crash(n));
}
_ => {}
}
}
if self.pointer_swap
&& let Some(m) = s.manifest
&& s.uploaded.iter().any(|a| a.cursor < m.cursor)
{
acts.push(Act::Prune);
}
match s.importer {
ImporterPc::Start if s.manifest.is_some() => acts.push(Act::ReadManifest),
ImporterPc::GotManifest(_) => acts.push(Act::ReadPayload),
ImporterPc::CrossCheckFailed | ImporterPc::FetchMissed => acts.push(Act::Retry),
ImporterPc::Imported(a) => {
acts.push(Act::Resume);
if self.resync == ResyncMode::Degrade && !self.resume_ok(s, a) {
acts.push(Act::ResumeResyncDegraded);
}
}
_ => {}
}
}
fn next_state(&self, s: &St<N>, a: Act) -> Option<St<N>> {
let mut s = s.clone();
match a {
Act::Churn => s.head += 1,
Act::DeleteKey => {
s.head += 1;
s.delete_rev = Some(s.head);
}
Act::Compact => s.floor += 1,
Act::Apply(n) => s.applied[n as usize] += 1,
Act::Export(n) => {
let cursor = s.applied[n as usize];
s.exporters[n as usize] = ExporterPc::Exported(Artifact {
node: n,
cursor,
has_key: Self::key_present_at(&s, cursor),
});
}
Act::UploadPayload(n) => {
let ExporterPc::Exported(a) = s.exporters[n as usize] else {
return None;
};
if self.pointer_swap {
s.uploaded.insert(a);
} else {
s.payload = Some(a);
}
s.exporters[n as usize] = ExporterPc::PayloadUp(a);
}
Act::Publish(n) => {
let ExporterPc::PayloadUp(a) = s.exporters[n as usize] else {
return None;
};
if self.pointer_swap {
let observed = match s.manifest {
None => slipstream::protocol::PointerState::Absent,
Some(m) => slipstream::protocol::PointerState::Present {
rank: Some(m.cursor as u64),
},
};
let allowed = self.mutation == Mutation::LwwPointer
|| slipstream::protocol::pointer_publish_allowed(
&observed,
a.cursor as u64,
);
if allowed {
if let Some(m) = s.manifest
&& m.cursor > a.cursor
{
s.regressed = true;
}
s.manifest = Some(a);
} else {
s.refused = true;
}
} else {
if let Some(m) = s.manifest
&& m.cursor > a.cursor
{
s.regressed = true;
}
s.manifest = Some(a);
}
s.exporters[n as usize] = ExporterPc::Done(a.cursor);
}
Act::Crash(n) => {
s.exporters[n as usize] = ExporterPc::Idle;
}
Act::NextRound(n) => {
s.exporters[n as usize] = ExporterPc::Idle;
}
Act::ReadManifest => {
let m = s.manifest?;
s.importer = ImporterPc::GotManifest(m);
}
Act::ReadPayload => {
let ImporterPc::GotManifest(m) = s.importer else {
return None;
};
if self.pointer_swap {
if s.uploaded.contains(&m) {
s.importer = ImporterPc::Imported(m);
} else {
s.importer = ImporterPc::FetchMissed;
}
} else {
match s.payload {
Some(p) if p == m => s.importer = ImporterPc::Imported(p),
_ => s.importer = ImporterPc::CrossCheckFailed,
}
}
}
Act::Prune => {
let keep = s.manifest?;
if self.mutation == Mutation::PruneAgeOnly {
s.uploaded.retain(|a| *a == keep);
} else {
s.uploaded.retain(|a| {
!slipstream::protocol::payload_prunable(
Some(a.cursor as u64),
keep.cursor as u64,
*a == keep,
true,
)
});
}
}
Act::Retry => s.importer = ImporterPc::Start,
Act::Resume => {
let ImporterPc::Imported(a) = s.importer else {
return None;
};
let status = if self.resume_ok(&s, a) {
FoldStatus::Synced
} else if self.mutation == Mutation::SilentClamp {
Self::relist_only_status(&s, a)
} else if self.resync != ResyncMode::None {
FoldStatus::Synced
} else {
Self::relist_only_status(&s, a)
};
s.importer = ImporterPc::Resumed(a, status);
}
Act::ResumeResyncDegraded => {
let ImporterPc::Imported(a) = s.importer else {
return None;
};
if self.resync != ResyncMode::Degrade || self.resume_ok(&s, a) {
return None;
}
s.importer = ImporterPc::Resumed(a, Self::relist_only_status(&s, a));
}
}
Some(s)
}
fn properties(&self) -> Vec<Property<Self>> {
let mut props: Vec<Property<Self>> = Vec::new();
if self.pointer_swap {
props.push(Property::<Self>::always(
"published cursor never regresses",
|_, s| !s.regressed,
));
props.push(Property::<Self>::always(
"monotone pointer: importer never observes a cursor drop",
|_, s| match (s.importer, s.manifest) {
(ImporterPc::GotManifest(g), Some(m))
| (ImporterPc::Imported(g), Some(m))
| (ImporterPc::Resumed(g, _), Some(m)) => g.cursor <= m.cursor,
_ => true,
},
));
props.push(Property::<Self>::always(
"cross-check never fires (torn pair structurally impossible)",
|_, s| s.importer != ImporterPc::CrossCheckFailed,
));
props.push(Property::<Self>::always(
"pointer target always fetchable (write-once before publish)",
|_, s| s.manifest.is_none_or(|m| s.uploaded.contains(&m)),
));
props.push(Property::<Self>::sometimes(
"the swap refuses an older publish (clobber attempt occurs and is stopped)",
|_, s| s.refused,
));
props.push(Property::<Self>::sometimes(
"a prune racing a stale pointer read forces a detected retry",
|_, s| s.importer == ImporterPc::FetchMissed,
));
props.push(Property::<Self>::always(
"a fetch miss only happens on a stale pointer read, never the current one",
|_, s| {
s.importer != ImporterPc::FetchMissed
|| matches!(s.manifest, Some(m) if s.uploaded.contains(&m))
},
));
} else {
props.push(Property::<Self>::sometimes(
"HAZARD reachable: published artifact regresses (slow-exporter clobber)",
|_, s| s.regressed,
));
props.push(Property::<Self>::sometimes(
"HAZARD reachable: importer observes a torn pair (detected, bootstrap outage)",
|_, s| s.importer == ImporterPc::CrossCheckFailed,
));
props.push(Property::<Self>::sometimes(
"regression is non-fatal: a post-regression bootstrap still converges",
|_, s| {
s.regressed && matches!(s.importer, ImporterPc::Resumed(_, FoldStatus::Synced))
},
));
}
props.push(Property::<Self>::always(
"no mixed import: an installed fold is one exporter's exported state",
|_, s| match s.importer {
ImporterPc::Imported(a) | ImporterPc::Resumed(a, _) => {
a.node < N as u8
&& a.cursor >= 1
&& a.cursor <= s.head
&& a.has_key == s.delete_rev.is_none_or(|d| a.cursor < d)
}
_ => true,
},
));
match self.resync {
ResyncMode::FailStop => {
props.push(Property::<Self>::always(
"bootstrap never silently diverges (stale is merely stale)",
|_, s| !matches!(s.importer, ImporterPc::Resumed(_, FoldStatus::StaleKey)),
));
}
ResyncMode::Degrade => {
props.push(Property::<Self>::sometimes(
"HAZARD reachable: armed resync that degrades on error diverges silently",
|_, s| matches!(s.importer, ImporterPc::Resumed(_, FoldStatus::StaleKey)),
));
}
ResyncMode::None => {
props.push(Property::<Self>::sometimes(
"HAZARD reachable: silent stale-key divergence without resync",
|_, s| matches!(s.importer, ImporterPc::Resumed(_, FoldStatus::StaleKey)),
));
}
}
props.push(Property::<Self>::sometimes(
"a bootstrap completes and resumes synced",
|_, s| matches!(s.importer, ImporterPc::Resumed(_, FoldStatus::Synced)),
));
props.push(Property::<Self>::sometimes(
"a publisher runs a second round against its own previous publish",
|_, s| {
s.manifest.is_some_and(|m| {
matches!(
s.exporters[m.node as usize],
ExporterPc::Exported(_) | ExporterPc::PayloadUp(_)
)
})
},
));
if self.resync == ResyncMode::FailStop && self.mutation == Mutation::None {
props.push(Property::<Self>::always(
"every maximal run ends with a completed, synced bootstrap",
|m, s| {
let mut acts = Vec::new();
m.actions(s, &mut acts);
!acts.is_empty()
|| matches!(s.importer, ImporterPc::Resumed(_, FoldStatus::Synced))
},
));
}
props
}
}
fn run<const N: usize>(
model: SnapshotProtocol<N>,
label: &str,
) -> impl Checker<SnapshotProtocol<N>> {
let checker = model.checker().spawn_bfs().join();
println!(
"{label}: {} states, {} unique",
checker.state_count(),
checker.unique_state_count(),
);
checker
}
fn check<const N: usize>(model: SnapshotProtocol<N>, label: &str) {
run(model, label).assert_properties();
}
#[test]
fn shipped_protocol_pointer_swap_failstop_resync() {
check(
SnapshotProtocol::<2>::shipped(ResyncMode::FailStop),
"shipped: pointer-swap + failstop",
);
}
#[test]
fn shipped_protocol_without_resync_still_diverges() {
check(
SnapshotProtocol::<2>::shipped(ResyncMode::None),
"shipped: pointer-swap + no resync",
);
}
#[test]
fn legacy_two_register_transport_has_reachable_hazards() {
check(
SnapshotProtocol::<2>::legacy(ResyncMode::FailStop),
"legacy: two-register + failstop",
);
}
#[test]
fn legacy_degrading_resync_diverges() {
check(
SnapshotProtocol::<2>::legacy(ResyncMode::Degrade),
"legacy: degrade-on-error resync",
);
}
#[test]
fn no_resync_reader_diverges() {
check(
SnapshotProtocol::<2>::legacy(ResyncMode::None),
"legacy: no resync reader",
);
}
#[test]
fn mutation_lww_pointer_is_caught() {
let mut model = SnapshotProtocol::<2>::shipped(ResyncMode::FailStop);
model.mutation = Mutation::LwwPointer;
let checker = run(model, "mutation: lww pointer");
assert!(
checker
.discovery("published cursor never regresses")
.is_some(),
"the checker must produce a regression counterexample when the \
monotonic publish guard is removed"
);
}
#[test]
fn mutation_age_only_prune_is_caught() {
let mut model = SnapshotProtocol::<2>::shipped(ResyncMode::FailStop);
model.mutation = Mutation::PruneAgeOnly;
let checker = run(model, "mutation: age-only prune");
assert!(
checker
.discovery("pointer target always fetchable (write-once before publish)")
.is_some(),
"the checker must produce a dangling-pointer counterexample when \
prune ignores the strictly-below rule (the original design bug)"
);
}
#[test]
fn mutation_silent_clamp_is_caught() {
let mut model = SnapshotProtocol::<2>::shipped(ResyncMode::FailStop);
model.mutation = Mutation::SilentClamp;
let checker = run(model, "mutation: silent clamp");
assert!(
checker
.discovery("bootstrap never silently diverges (stale is merely stale)")
.is_some(),
"the checker must produce a silent-divergence counterexample when \
expiry detection is removed (the live NATS clamp bug class)"
);
}
#[test]
#[ignore = "deep bounds: run in release"]
fn deep_more_revisions() {
let mut model = SnapshotProtocol::<2>::shipped(ResyncMode::FailStop);
model.max_rev = 5;
check(model, "deep: 2 exporters, rev <= 5");
}
#[test]
#[ignore = "deep bounds: run in release"]
fn deep_three_exporters() {
let model = SnapshotProtocol::<3>::shipped(ResyncMode::FailStop);
check(model, "deep: 3 exporters, rev <= 3");
}
#[test]
#[ignore = "deep bounds: run in release"]
fn deep_three_exporters_legacy_hazards() {
let model = SnapshotProtocol::<3>::legacy(ResyncMode::FailStop);
check(model, "deep: 3 exporters, legacy two-register");
}