#![allow(missing_docs)]
#![cfg(not(target_arch = "wasm32"))]
#![allow(clippy::needless_collect, clippy::let_underscore_must_use)]
#[allow(dead_code, unreachable_pub)]
#[path = "support/mod.rs"]
mod test_support;
use std::collections::{BTreeMap, HashSet};
use std::time::Duration;
use assert_matches::assert_matches;
use telltale_machine::buffer::BufferConfig;
use telltale_machine::coroutine::{CoroStatus, Fault};
use telltale_machine::instr::Endpoint;
use telltale_machine::model::effects::{
EffectHandler, EffectResult, SendDecision, SendDecisionInput, TopologyPerturbation,
};
use telltale_machine::model::output_condition::OutputConditionHint;
use telltale_machine::model::state::{
AuthorityArtifact, AuthorityAuditEvent, OwnershipScope, SessionStatus, SessionStore,
};
use telltale_machine::{ObsEvent, ProtocolMachine, ProtocolMachineConfig};
use telltale_types::LocalTypeR;
use test_support::PassthroughHandler;
/// Builds a code image with a single role `"A"` that executes `program`.
///
/// The global type and the local type registered for `"A"` are both `End`.
fn single_role_end_image(
    program: Vec<telltale_machine::instr::Instr>,
) -> telltale_machine::runtime::loader::CodeImage {
    use telltale_machine::runtime::loader::CodeImage;
    use telltale_types::GlobalType;

    let role = "A".to_string();
    let programs = BTreeMap::from([(role.clone(), program)]);
    let local_types = BTreeMap::from([(role, LocalTypeR::End)]);

    CodeImage {
        programs,
        global_type: GlobalType::End,
        local_types,
    }
}
/// Effect handler whose send decision always delivers a product payload made
/// of an endpoint value and the string `"secret"`.
#[derive(Debug, Clone, Copy)]
struct KnowledgePayloadHandler;

impl EffectHandler for KnowledgePayloadHandler {
    /// Returns a unit payload for every send.
    fn handle_send(
        &self,
        _role: &str,
        _partner: &str,
        _label: &str,
        _state: &[telltale_machine::coroutine::Value],
    ) -> EffectResult<telltale_machine::coroutine::Value> {
        EffectResult::success(telltale_machine::coroutine::Value::Unit)
    }

    /// Delivers `(Endpoint { input.sid, input.role }, "secret")` as the payload.
    fn send_decision(&self, input: SendDecisionInput<'_>) -> EffectResult<SendDecision> {
        use telltale_machine::coroutine::Value;

        let endpoint = Value::Endpoint(Endpoint {
            sid: input.sid,
            role: input.role.to_string(),
        });
        let fact = Value::Str("secret".to_string());
        let payload = Value::Prod(Box::new(endpoint), Box::new(fact));
        EffectResult::success(SendDecision::Deliver(payload))
    }

    /// Accepts every received payload without touching the state.
    fn handle_recv(
        &self,
        _role: &str,
        _partner: &str,
        _label: &str,
        _state: &mut Vec<telltale_machine::coroutine::Value>,
        _payload: &telltale_machine::coroutine::Value,
    ) -> EffectResult<()> {
        EffectResult::success(())
    }

    /// Picks the first offered label; fails with an invalid-input error when
    /// `labels` is empty.
    fn handle_choose(
        &self,
        _role: &str,
        _partner: &str,
        labels: &[String],
        _state: &[telltale_machine::coroutine::Value],
    ) -> EffectResult<String> {
        if let Some(first) = labels.first() {
            EffectResult::success(first.clone())
        } else {
            EffectResult::failure(
                telltale_machine::model::effects::EffectFailure::invalid_input(
                    "no labels available",
                ),
            )
        }
    }

    /// No-op step hook.
    fn step(
        &self,
        _role: &str,
        _state: &mut Vec<telltale_machine::coroutine::Value>,
    ) -> EffectResult<()> {
        EffectResult::success(())
    }
}
/// Effect handler that always supplies an output-condition hint naming the
/// `machine.custom.observable` predicate.
#[derive(Debug, Clone, Copy)]
struct HintedInvokeHandler;

impl EffectHandler for HintedInvokeHandler {
    /// Returns a unit payload for every send.
    fn handle_send(
        &self,
        _role: &str,
        _partner: &str,
        _label: &str,
        _state: &[telltale_machine::coroutine::Value],
    ) -> EffectResult<telltale_machine::coroutine::Value> {
        EffectResult::success(telltale_machine::coroutine::Value::Unit)
    }

    /// Accepts every received payload without touching the state.
    fn handle_recv(
        &self,
        _role: &str,
        _partner: &str,
        _label: &str,
        _state: &mut Vec<telltale_machine::coroutine::Value>,
        _payload: &telltale_machine::coroutine::Value,
    ) -> EffectResult<()> {
        EffectResult::success(())
    }

    /// Picks the first offered label; fails with an invalid-input error when
    /// `labels` is empty.
    fn handle_choose(
        &self,
        _role: &str,
        _partner: &str,
        labels: &[String],
        _state: &[telltale_machine::coroutine::Value],
    ) -> EffectResult<String> {
        if let Some(first) = labels.first() {
            EffectResult::success(first.clone())
        } else {
            EffectResult::failure(
                telltale_machine::model::effects::EffectFailure::invalid_input(
                    "no labels available",
                ),
            )
        }
    }

    /// No-op step hook.
    fn step(
        &self,
        _role: &str,
        _state: &mut Vec<telltale_machine::coroutine::Value>,
    ) -> EffectResult<()> {
        EffectResult::success(())
    }

    /// Always returns a hint whose witness reference encodes `sid` and `role`
    /// as `sid:<sid>:role:<role>`.
    fn output_condition_hint(
        &self,
        sid: usize,
        role: &str,
        _state: &[telltale_machine::coroutine::Value],
    ) -> Option<OutputConditionHint> {
        let witness_ref = format!("sid:{sid}:role:{role}");
        let hint = OutputConditionHint {
            predicate_ref: "machine.custom.observable".to_string(),
            witness_ref: Some(witness_ref),
        };
        Some(hint)
    }
}
/// Effect handler that reports a 20 ms timeout for site `"A"` at tick 1 and no
/// topology events at any other tick.
#[derive(Debug, Clone, Copy)]
struct TimeoutAtTickOneHandler;

impl EffectHandler for TimeoutAtTickOneHandler {
    /// Returns a unit payload for every send.
    fn handle_send(
        &self,
        _role: &str,
        _partner: &str,
        _label: &str,
        _state: &[telltale_machine::coroutine::Value],
    ) -> EffectResult<telltale_machine::coroutine::Value> {
        EffectResult::success(telltale_machine::coroutine::Value::Unit)
    }

    /// Accepts every received payload without touching the state.
    fn handle_recv(
        &self,
        _role: &str,
        _partner: &str,
        _label: &str,
        _state: &mut Vec<telltale_machine::coroutine::Value>,
        _payload: &telltale_machine::coroutine::Value,
    ) -> EffectResult<()> {
        EffectResult::success(())
    }

    /// Picks the first offered label; fails with an invalid-input error when
    /// `labels` is empty.
    fn handle_choose(
        &self,
        _role: &str,
        _partner: &str,
        labels: &[String],
        _state: &[telltale_machine::coroutine::Value],
    ) -> EffectResult<String> {
        if let Some(first) = labels.first() {
            EffectResult::success(first.clone())
        } else {
            EffectResult::failure(
                telltale_machine::model::effects::EffectFailure::invalid_input(
                    "no labels available",
                ),
            )
        }
    }

    /// No-op step hook.
    fn step(
        &self,
        _role: &str,
        _state: &mut Vec<telltale_machine::coroutine::Value>,
    ) -> EffectResult<()> {
        EffectResult::success(())
    }

    /// Emits a single `Timeout` perturbation for site `"A"` when `tick == 1`;
    /// otherwise the returned list is empty.
    fn topology_events(&self, tick: u64) -> EffectResult<Vec<TopologyPerturbation>> {
        let events = if tick == 1 {
            vec![TopologyPerturbation::Timeout {
                site: "A".to_string(),
                duration: Duration::from_millis(20),
            }]
        } else {
            Vec::new()
        };
        EffectResult::success(events)
    }
}
/// After loading, `A` has a send type and `B` a receive type; after running to
/// completion neither endpoint has a local type left.
#[test]
fn test_lean_session_coherent() {
    let image = test_support::simple_send_recv_image("A", "B", "msg");
    let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());
    let sid = machine.load_choreography(&image).unwrap();

    let sender = Endpoint {
        sid,
        role: "A".into(),
    };
    let receiver = Endpoint {
        sid,
        role: "B".into(),
    };

    // Freshly loaded: both roles still carry their initial local types.
    assert_matches!(
        machine.sessions().lookup_type(&sender),
        Some(LocalTypeR::Send { .. })
    );
    assert_matches!(
        machine.sessions().lookup_type(&receiver),
        Some(LocalTypeR::Recv { .. })
    );

    machine.run(&PassthroughHandler, 100).unwrap();

    // Finished: no local type remains for either endpoint.
    for endpoint in [&sender, &receiver] {
        assert!(machine.sessions().lookup_type(endpoint).is_none());
    }
}
/// Two choreographies loaded into one machine get distinct session ids, and
/// role `A` resolves to a local type in each of them.
#[test]
fn test_lean_session_ns_disjoint() {
    let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());

    let first_image = test_support::simple_send_recv_image("A", "B", "msg");
    let second_image = test_support::simple_send_recv_image("A", "B", "data");
    let first_sid = machine.load_choreography(&first_image).unwrap();
    let second_sid = machine.load_choreography(&second_image).unwrap();

    let first_a = Endpoint {
        sid: first_sid,
        role: "A".into(),
    };
    let second_a = Endpoint {
        sid: second_sid,
        role: "A".into(),
    };

    let sessions = machine.sessions();
    assert!(sessions.lookup_type(&first_a).is_some());
    assert!(sessions.lookup_type(&second_a).is_some());
    assert_ne!(first_sid, second_sid);
}
/// A single step never increases the number of local types tracked for the
/// session, which starts at two (one per role).
#[test]
fn test_lean_conservation_inv_preserved() {
    let image = test_support::simple_send_recv_image("A", "B", "msg");
    let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());
    let sid = machine.load_choreography(&image).unwrap();

    let types_before = machine.sessions().get(sid).unwrap().local_types.len();
    assert_eq!(types_before, 2);

    // The step outcome itself is irrelevant here; only the type count matters.
    let _ = machine.step(&PassthroughHandler);

    let types_after = machine.sessions().get(sid).unwrap().local_types.len();
    assert!(types_after <= types_before);
}
/// Closing a freshly opened session leaves every buffer empty.
#[test]
fn test_lean_close_empty() {
    let roles = vec!["A".into(), "B".into()];
    let mut store = SessionStore::new();
    let sid = store.open(roles, &BufferConfig::default(), &BTreeMap::new());

    store.close(sid).unwrap();

    let closed = store.get(sid).unwrap();
    assert!(closed.buffers.values().all(|buf| buf.is_empty()));
}
/// Once the simple send/receive protocol has run to completion, the session
/// has no local types left.
#[test]
fn test_lean_leave_preserves_coherent() {
    let image = test_support::simple_send_recv_image("A", "B", "msg");
    let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());
    let sid = machine.load_choreography(&image).unwrap();

    machine.run(&PassthroughHandler, 100).unwrap();

    let remaining = &machine.sessions().get(sid).unwrap().local_types;
    assert!(remaining.is_empty());
}
/// Labels received on the `A -> B` edge must be a prefix of the labels sent on
/// that edge, in the same order.
#[test]
fn test_lean_transport_fifo() {
    let image = test_support::recursive_send_recv_image("A", "B", "msg");
    let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());
    let sid = machine.load_choreography(&image).unwrap();
    let handler = PassthroughHandler;

    // The run result is deliberately ignored: only the trace produced so far
    // is inspected (presumably the recursive protocol need not finish within
    // the step budget — confirm against `recursive_send_recv_image`).
    let _ = machine.run(&handler, 200);

    let mut sent_order: Vec<String> = Vec::new();
    let mut recv_order: Vec<String> = Vec::new();
    for event in machine.trace() {
        match event {
            ObsEvent::Sent {
                session,
                from,
                to,
                label,
                ..
            } if *session == sid && from == "A" && to == "B" => {
                sent_order.push(label.clone());
            }
            ObsEvent::Received {
                session,
                from,
                to,
                label,
                ..
            } if *session == sid && from == "A" && to == "B" => {
                recv_order.push(label.clone());
            }
            _ => {}
        }
    }

    // Guard before pairing: more receives than sends is itself a violation and
    // would otherwise surface as an opaque out-of-bounds panic.
    assert!(
        recv_order.len() <= sent_order.len(),
        "received {} A->B messages but only {} were sent",
        recv_order.len(),
        sent_order.len()
    );
    for (i, (received, sent)) in recv_order.iter().zip(&sent_order).enumerate() {
        assert_eq!(received, sent, "FIFO violated at index {i}");
    }
}
/// The session never records more `Received` events than `Sent` events.
#[test]
fn test_lean_transport_no_dup() {
    let image = test_support::simple_send_recv_image("A", "B", "msg");
    let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());
    let sid = machine.load_choreography(&image).unwrap();
    machine.run(&PassthroughHandler, 100).unwrap();

    // Tally both event kinds for this session in a single pass.
    let mut sent_count = 0_usize;
    let mut recv_count = 0_usize;
    for event in machine.trace() {
        match event {
            ObsEvent::Sent { session, .. } if *session == sid => sent_count += 1,
            ObsEvent::Received { session, .. } if *session == sid => recv_count += 1,
            _ => {}
        }
    }

    assert!(recv_count <= sent_count, "more receives than sends");
}
/// Every `(from, to, label)` triple that was received in the session must also
/// appear among the triples that were sent.
#[test]
fn test_lean_transport_no_create() {
    let image = test_support::simple_send_recv_image("A", "B", "msg");
    let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());
    let sid = machine.load_choreography(&image).unwrap();
    machine.run(&PassthroughHandler, 100).unwrap();

    let trace = machine.trace();
    let sent_edges: Vec<(String, String, String)> = trace
        .iter()
        .filter_map(|event| match event {
            ObsEvent::Sent {
                session,
                from,
                to,
                label,
                ..
            } if *session == sid => Some((from.clone(), to.clone(), label.clone())),
            _ => None,
        })
        .collect();
    let recv_edges: Vec<(String, String, String)> = trace
        .iter()
        .filter_map(|event| match event {
            ObsEvent::Received {
                session,
                from,
                to,
                label,
                ..
            } if *session == sid => Some((from.clone(), to.clone(), label.clone())),
            _ => None,
        })
        .collect();

    for r in &recv_edges {
        assert!(sent_edges.contains(r), "received without prior send: {r:?}");
    }
}
/// Replacing the signer of a queued `A -> B` message with an all-zero key must
/// make the subsequent run fault with `VerificationFailed`.
#[test]
fn test_lean_send_receive_signature_verification() {
    let image = test_support::simple_send_recv_image("A", "B", "msg");
    let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());
    let sid = machine.load_choreography(&image).unwrap();
    let handler = PassthroughHandler;

    // Step (at most eight times) until a message is queued on the A -> B edge.
    let has_payload = (0..8).any(|_| {
        let _ = machine.step(&handler).expect("step before tamper");
        machine
            .sessions()
            .get(sid)
            .expect("session exists")
            .has_message("A", "B")
    });
    assert!(has_payload, "expected queued A->B payload before tamper");

    // Pull the signed message out, swap its signer, and put it back.
    let sess = machine.sessions_mut().get_mut(sid).expect("session exists");
    let mut signed = sess
        .recv_signed("A", "B")
        .expect("signed payload must exist");
    signed.signature.signer = telltale_machine::verification::VerifyingKey([0_u8; 32]);
    let _ = sess
        .send_signed("A", "B", &signed)
        .expect("re-enqueue tampered payload");

    assert_matches!(
        machine.run(&handler, 32),
        Err(telltale_machine::ProtocolMachineError::Fault {
            fault: Fault::VerificationFailed { .. },
            ..
        })
    );
}
/// The sequence of labels in `Offered` events equals the sequence of labels in
/// `Chose` events for the session, and at least one offer is observed.
#[test]
fn test_lean_offer_choose_label_alignment() {
    let image = test_support::choice_image("A", "B", &["yes", "no"]);
    let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());
    let sid = machine.load_choreography(&image).unwrap();
    machine.run(&PassthroughHandler, 100).unwrap();

    // Split the trace into the two label streams in one pass.
    let mut offered: Vec<String> = Vec::new();
    let mut chose: Vec<String> = Vec::new();
    for event in machine.trace() {
        match event {
            ObsEvent::Offered { edge, label, .. } if edge.sid == sid => {
                offered.push(label.clone());
            }
            ObsEvent::Chose { edge, label, .. } if edge.sid == sid => {
                chose.push(label.clone());
            }
            _ => {}
        }
    }

    assert!(!offered.is_empty(), "expected offered events");
    assert_eq!(offered, chose, "offered/chose label traces diverged");
}
/// A session is `Active` with empty buffers right after `open`, and `Closed`
/// with empty buffers after `close`.
#[test]
fn test_lean_open_close_session_state_transitions() {
    use telltale_machine::buffer::BufferConfig;
    use telltale_types::{Label, LocalTypeR};

    let sender_type = LocalTypeR::Send {
        partner: "B".into(),
        branches: vec![(Label::new("msg"), None, LocalTypeR::End)],
    };
    let receiver_type = LocalTypeR::Recv {
        partner: "A".into(),
        branches: vec![(Label::new("msg"), None, LocalTypeR::End)],
    };
    let local_types = BTreeMap::from([
        ("A".to_string(), sender_type),
        ("B".to_string(), receiver_type),
    ]);

    let mut store = SessionStore::new();
    let roles = vec!["A".to_string(), "B".to_string()];
    let sid = store.open(roles, &BufferConfig::default(), &local_types);

    // State immediately after opening.
    let opened = store.get(sid).expect("session exists after open");
    assert_eq!(opened.status, SessionStatus::Active);
    assert!(
        opened.buffers.values().all(|buf| buf.is_empty()),
        "newly opened session must start with empty buffers"
    );

    store.close(sid).expect("close session");

    // State after closing.
    let closed = store.get(sid).expect("session exists after close");
    assert_eq!(closed.status, SessionStatus::Closed);
    assert!(
        closed.buffers.values().all(|buf| buf.is_empty()),
        "expected drained buffers after close"
    );
}
/// A `Transfer` executed by coroutine 0 emits a `Transferred` event towards
/// coroutine 1 and moves ownership of endpoint `A` from the former to the
/// latter.
#[test]
fn test_lean_transfer_endpoint_movement() {
    use telltale_machine::instr::{ImmValue, Instr};
    use telltale_machine::runtime::loader::CodeImage;
    use telltale_types::GlobalType;

    // Role A loads the target coroutine id into register 1 and transfers.
    let program_a = vec![
        Instr::Set {
            dst: 1,
            val: ImmValue::Nat(1),
        },
        Instr::Transfer {
            endpoint: 0,
            target: 1,
            bundle: 2,
        },
        Instr::Halt,
    ];
    let program_b = vec![Instr::Halt];

    let image = CodeImage {
        programs: BTreeMap::from([
            ("A".to_string(), program_a),
            ("B".to_string(), program_b),
        ]),
        global_type: GlobalType::End,
        local_types: BTreeMap::from([
            ("A".to_string(), LocalTypeR::End),
            ("B".to_string(), LocalTypeR::End),
        ]),
    };

    let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());
    let sid = machine.load_choreography(&image).unwrap();
    machine.run(&PassthroughHandler, 100).unwrap();

    let saw_transfer = machine.trace().iter().any(
        |e| matches!(e, ObsEvent::Transferred { session, from, to, .. }
            if *session == sid && *from == 0 && *to == 1),
    );
    assert!(
        saw_transfer,
        "expected transferred event from coro 0 to coro 1"
    );

    let ep_a = Endpoint {
        sid,
        role: "A".to_string(),
    };
    let coros = machine.session_coroutines(sid);
    let source = coros
        .iter()
        .find(|c| c.id == 0)
        .expect("source coroutine exists");
    let target = coros
        .iter()
        .find(|c| c.id == 1)
        .expect("target coroutine exists");

    assert!(
        !source.owned_endpoints.contains(&ep_a),
        "source should no longer own transferred endpoint"
    );
    assert!(
        target.owned_endpoints.contains(&ep_a),
        "target should own transferred endpoint"
    );
}
/// After `B` receives the payload produced by `KnowledgePayloadHandler`, the
/// `Tag` and `Check` instructions emit `Tagged`/`Checked` events and leave
/// `true` in `B`'s register 4.
#[test]
fn test_lean_tag_check_epistemic_behavior() {
    use telltale_machine::coroutine::Value;
    use telltale_machine::instr::{ImmValue, Instr};

    let program_a = vec![Instr::Send { chan: 0, val: 1 }, Instr::Halt];
    let program_b = vec![
        Instr::Receive { chan: 0, dst: 1 },
        Instr::Tag { fact: 1, dst: 2 },
        Instr::Set {
            dst: 3,
            val: ImmValue::Str("Observer".to_string()),
        },
        Instr::Check {
            knowledge: 1,
            target: 3,
            dst: 4,
        },
        Instr::Halt,
    ];

    // Start from the stock image and override both role programs.
    let mut image = test_support::simple_send_recv_image("A", "B", "msg");
    image.programs.insert("A".to_string(), program_a);
    image.programs.insert("B".to_string(), program_b);

    let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());
    let sid = machine.load_choreography(&image).unwrap();
    machine.run(&KnowledgePayloadHandler, 100).unwrap();

    let trace = machine.trace();
    let tagged_secret = trace.iter().any(
        |e| matches!(e, ObsEvent::Tagged { session, fact, .. }
            if *session == sid && fact == "secret"),
    );
    assert!(tagged_secret, "expected tagged event with transported fact");

    let check_permitted = trace.iter().any(
        |e| matches!(e, ObsEvent::Checked { session, target, permitted, .. }
            if *session == sid && target == "Observer" && *permitted),
    );
    assert!(check_permitted, "expected permitted checked event");

    let receiver = machine.coroutine(1).expect("B coroutine exists");
    assert_eq!(receiver.regs[4], Value::Bool(true));
}
/// An `Acquire` followed by a `Release` on layer `"auth"` produces both an
/// `Acquired` and a `Released` event for that layer.
#[test]
fn test_lean_acquire_release_guard_behavior() {
    use telltale_machine::instr::Instr;

    let program = vec![
        Instr::Acquire {
            layer: "auth".to_string(),
            dst: 1,
        },
        Instr::Release {
            layer: "auth".to_string(),
            evidence: 1,
        },
        Instr::Halt,
    ];
    let image = single_role_end_image(program);

    let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());
    machine.load_choreography(&image).unwrap();
    machine.run(&PassthroughHandler, 100).unwrap();

    let trace = machine.trace();
    let acquired = trace
        .iter()
        .any(|e| matches!(e, ObsEvent::Acquired { layer, .. } if layer == "auth"));
    assert!(acquired, "expected acquired event");

    let released = trace
        .iter()
        .any(|e| matches!(e, ObsEvent::Released { layer, .. } if layer == "auth"));
    assert!(released, "expected released event");
}
/// An `Invoke` run with `HintedInvokeHandler` under an allow-list policy emits
/// an `Invoked` event and records a passing output-condition check carrying
/// the handler's predicate and witness references.
#[test]
fn test_lean_invoke_and_output_condition_hint_behavior() {
    use telltale_machine::instr::{Instr, InvokeAction};
    use telltale_machine::model::output_condition::OutputConditionPolicy;

    let program = vec![
        Instr::Invoke {
            action: InvokeAction::Reg(0),
        },
        Instr::Halt,
    ];
    let image = single_role_end_image(program);

    // Only the handler's custom predicate is allow-listed.
    let allowed = vec!["machine.custom.observable".to_string()];
    let cfg = ProtocolMachineConfig {
        output_condition_policy: OutputConditionPolicy::PredicateAllowList(allowed),
        ..ProtocolMachineConfig::default()
    };

    let mut machine = ProtocolMachine::new(cfg);
    machine.load_choreography(&image).unwrap();
    machine.run(&HintedInvokeHandler, 100).unwrap();

    let invoked = machine
        .trace()
        .iter()
        .any(|e| matches!(e, ObsEvent::Invoked { .. }));
    assert!(invoked, "expected invoked event");

    let checks = machine.output_condition_checks();
    assert!(!checks.is_empty(), "expected output-condition checks");

    let first = &checks[0];
    assert_eq!(first.meta.predicate_ref, "machine.custom.observable");
    assert_eq!(
        first.meta.witness_ref.as_deref(),
        Some("sid:0:role:A"),
        "expected witness ref from handler hint"
    );
    assert!(
        first.passed,
        "allowlist policy should accept custom predicate"
    );
}
/// Writing to register 99 makes the first step fail, and the trace records a
/// `FailureBranchEntered` event before the `Faulted` event.
#[test]
fn test_lean_failure_branch_and_terminal_fault_ordering() {
    use telltale_machine::instr::{ImmValue, Instr};

    let program = vec![
        Instr::Set {
            dst: 99,
            val: ImmValue::Nat(7),
        },
        Instr::Halt,
    ];
    let image = single_role_end_image(program);

    let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());
    let sid = machine.load_choreography(&image).unwrap();

    let outcome = machine.step(&PassthroughHandler);
    assert!(outcome.is_err(), "register-bounds violation must fault");

    let trace = machine.trace();
    let failure_idx = trace
        .iter()
        .position(|event| {
            matches!(event, ObsEvent::FailureBranchEntered { session, .. } if *session == sid)
        })
        .expect("failure branch event");
    let fault_idx = trace
        .iter()
        .position(|event| matches!(event, ObsEvent::Faulted { .. }))
        .expect("faulted event");

    assert!(
        failure_idx < fault_idx,
        "failure-branch event must precede terminal fault event"
    );
}
/// Checks two things on an owned single-role session:
/// 1. a timeout reported by the handler yields a `Stuck` step, a
///    `TimeoutIssued` event, an issued-timeout audit record and an outstanding
///    `topology_event` effect;
/// 2. cancelling an abandoned transfer records `CancellationRequested`,
///    `Cancelled` and `SessionTerminal` in that order.
#[test]
fn test_lean_authority_timeout_and_cancellation_trace_behavior() {
    use telltale_machine::instr::Instr;

    let image = single_role_end_image(vec![Instr::Halt]);
    let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());
    let owned = machine
        .load_choreography_owned(&image, "owner/a")
        .expect("load owned choreography");

    // --- Timeout ingress -------------------------------------------------
    let step = machine
        .step(&TimeoutAtTickOneHandler)
        .expect("timeout ingress should not fault");
    assert!(
        matches!(step, telltale_machine::StepResult::Stuck),
        "timeout ingress should block scheduling"
    );

    let timeout_traced = machine
        .trace()
        .iter()
        .any(|event| matches!(event, ObsEvent::TimeoutIssued { site, .. } if site == "A"));
    assert!(timeout_traced, "expected explicit timeout event");

    let timeout_audited = machine.authority_audit_log().iter().any(|record| {
        matches!(
            (&record.artifact, record.event),
            (AuthorityArtifact::Timeout(_), AuthorityAuditEvent::Issued)
        )
    });
    assert!(
        timeout_audited,
        "expected issued timeout witness in authority audit log"
    );

    let effect_outstanding = machine
        .semantic_objects()
        .outstanding_effects
        .iter()
        .any(|effect| effect.effect_kind == "topology_event");
    assert!(
        effect_outstanding,
        "timeout ingress should create explicit outstanding-effect state"
    );

    // --- Abandoned transfer cancellation ---------------------------------
    let sid = owned.session_id();
    let receipt = owned
        .begin_transfer(&mut machine, "owner/b", OwnershipScope::Session)
        .expect("stage transfer");
    let cancellation = owned
        .cancel_abandoned_transfer(&mut machine, &receipt)
        .expect("cancel abandoned transfer");

    let trace = machine.trace();
    let requested_idx = trace
        .iter()
        .position(|event| {
            matches!(event, ObsEvent::CancellationRequested { session, witness_id, .. }
                if *session == sid && *witness_id == cancellation.witness_id)
        })
        .expect("cancellation requested event");
    let cancelled_idx = trace
        .iter()
        .position(|event| {
            matches!(event, ObsEvent::Cancelled { session, witness_id, .. }
                if *session == sid && *witness_id == cancellation.witness_id)
        })
        .expect("cancelled event");
    let terminal_idx = trace
        .iter()
        .position(|event| {
            matches!(event, ObsEvent::SessionTerminal { session, .. }
                if *session == sid)
        })
        .expect("session terminal event");

    let ordered = requested_idx < cancelled_idx && cancelled_idx < terminal_idx;
    assert!(
        ordered,
        "cancellation request, completion, and terminal reason must stay ordered"
    );
}
/// A readiness witness with an altered predicate is rejected as
/// `InvalidWitness`, a genuine witness can be consumed once, and a second
/// consumption is rejected as `WitnessConsumed`; the audit log reflects the
/// consumption and at least two rejections.
#[test]
fn test_lean_authority_evidence_rejection_behavior() {
    use telltale_machine::model::state::{OwnershipError, ReadinessWitness};

    let local_types = BTreeMap::from([
        ("A".to_string(), LocalTypeR::End),
        ("B".to_string(), LocalTypeR::End),
    ]);
    let mut store = SessionStore::new();
    let sid = store.open(
        vec!["A".into(), "B".into()],
        &BufferConfig::default(),
        &local_types,
    );

    let owner = store
        .claim_ownership(sid, "owner/a", OwnershipScope::Session)
        .expect("claim ownership");
    let witness = store
        .issue_readiness_witness(&owner, "commit.ready")
        .expect("issue readiness witness");

    // Same witness, different predicate reference.
    let forged = ReadinessWitness {
        predicate_ref: "forged.ready".to_string(),
        ..witness.clone()
    };
    let forged_err = store
        .consume_readiness_witness(&owner, &forged)
        .expect_err("forged witness must fail");
    assert!(
        matches!(forged_err, OwnershipError::InvalidWitness { .. }),
        "forged witness must fail with InvalidWitness"
    );

    // First consumption succeeds, the second must not.
    store
        .consume_readiness_witness(&owner, &witness)
        .expect("consume readiness witness once");
    let reused = store
        .consume_readiness_witness(&owner, &witness)
        .expect_err("reused witness must fail");
    assert!(
        matches!(reused, OwnershipError::WitnessConsumed { .. }),
        "double-consume must fail with WitnessConsumed"
    );

    let audit = store.authority_audit_log(sid).expect("authority audit log");
    let consumed_logged = audit
        .iter()
        .any(|record| record.event == AuthorityAuditEvent::Consumed);
    assert!(consumed_logged, "expected consumed readiness audit record");

    let rejected = audit
        .iter()
        .filter(|record| record.event == AuthorityAuditEvent::Rejected)
        .count();
    assert!(
        rejected >= 2,
        "expected rejection audit records for reused and forged evidence"
    );
}
/// `Spawn` creates exactly one child coroutine, `Jump` skips the instruction
/// that would write 999, both coroutines terminate, and the child ends with
/// the parent's value 42 in register 2.
#[test]
fn test_lean_control_and_spawn_behavior() {
    use telltale_machine::coroutine::Value;
    use telltale_machine::instr::{ImmValue, Instr};

    let program = vec![
        // 0: parent value to hand to the child
        Instr::Set {
            dst: 1,
            val: ImmValue::Nat(42),
        },
        // 1: child starts at instruction 6
        Instr::Spawn {
            target: 6,
            args: vec![1],
        },
        // 2: jump over instruction 3
        Instr::Jump { target: 4 },
        // 3
        Instr::Set {
            dst: 2,
            val: ImmValue::Nat(999),
        },
        // 4
        Instr::Yield,
        // 5
        Instr::Halt,
        // 6
        Instr::Move { dst: 2, src: 0 },
        // 7
        Instr::Halt,
    ];
    let image = single_role_end_image(program);

    let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());
    let sid = machine.load_choreography(&image).unwrap();
    machine.run(&PassthroughHandler, 100).unwrap();

    let coros = machine.session_coroutines(sid);
    assert_eq!(coros.len(), 2, "spawn should create one child coroutine");

    let all_terminal = coros.iter().all(|c| c.is_terminal());
    assert!(all_terminal, "both coroutines should halt");

    let child = coros
        .iter()
        .find(|c| c.id == 1)
        .expect("spawned child coroutine exists");
    assert_eq!(
        child.regs[2],
        Value::Nat(42),
        "spawn arg copy + move should preserve parent value"
    );
}
/// With speculation enabled, a fork/join followed by a fork/abort produces
/// `Forked` (ghost 9), `Joined` and `Aborted` events.
#[test]
fn test_lean_fork_join_abort_speculation_behavior() {
    use telltale_machine::instr::{ImmValue, Instr};

    let program = vec![
        Instr::Set {
            dst: 1,
            val: ImmValue::Nat(9),
        },
        Instr::Fork { ghost: 1 },
        Instr::Join,
        Instr::Fork { ghost: 1 },
        Instr::Abort,
        Instr::Halt,
    ];
    let image = single_role_end_image(program);

    let mut machine = ProtocolMachine::new(ProtocolMachineConfig {
        speculation_enabled: true,
        ..ProtocolMachineConfig::default()
    });
    machine.load_choreography(&image).unwrap();
    machine.run(&PassthroughHandler, 100).unwrap();

    let trace = machine.trace();
    let forked = trace
        .iter()
        .any(|e| matches!(e, ObsEvent::Forked { ghost, .. } if *ghost == 9));
    assert!(forked, "expected forked event");

    let joined = trace.iter().any(|e| matches!(e, ObsEvent::Joined { .. }));
    assert!(joined, "expected joined event");

    let aborted = trace.iter().any(|e| matches!(e, ObsEvent::Aborted { .. }));
    assert!(aborted, "expected aborted event");
}
/// Executing `Fork` under the default configuration faults with
/// `Fault::Speculation`.
#[test]
fn test_lean_fork_requires_speculation_enabled() {
    use telltale_machine::instr::{ImmValue, Instr};
    use telltale_machine::ProtocolMachineError;

    let program = vec![
        Instr::Set {
            dst: 1,
            val: ImmValue::Nat(3),
        },
        Instr::Fork { ghost: 1 },
    ];
    let image = single_role_end_image(program);

    let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());
    machine.load_choreography(&image).unwrap();

    assert_matches!(
        machine.run(&PassthroughHandler, 16),
        Err(ProtocolMachineError::Fault {
            fault: Fault::Speculation { .. },
            ..
        })
    );
}
/// A `Join` with no preceding `Fork` faults with `Fault::Speculation`, even
/// when speculation is enabled.
#[test]
fn test_lean_join_requires_active_speculation() {
    use telltale_machine::instr::Instr;
    use telltale_machine::ProtocolMachineError;

    let image = single_role_end_image(vec![Instr::Join]);

    let mut machine = ProtocolMachine::new(ProtocolMachineConfig {
        speculation_enabled: true,
        ..ProtocolMachineConfig::default()
    });
    machine.load_choreography(&image).unwrap();

    assert_matches!(
        machine.run(&PassthroughHandler, 16),
        Err(ProtocolMachineError::Fault {
            fault: Fault::Speculation { .. },
            ..
        })
    );
}
/// An `Abort` with no preceding `Fork` faults with `Fault::Speculation`, even
/// when speculation is enabled.
#[test]
fn test_lean_abort_requires_active_speculation() {
    use telltale_machine::instr::Instr;
    use telltale_machine::ProtocolMachineError;

    let image = single_role_end_image(vec![Instr::Abort]);

    let mut machine = ProtocolMachine::new(ProtocolMachineConfig {
        speculation_enabled: true,
        ..ProtocolMachineConfig::default()
    });
    machine.load_choreography(&image).unwrap();

    assert_matches!(
        machine.run(&PassthroughHandler, 16),
        Err(ProtocolMachineError::Fault {
            fault: Fault::Speculation { .. },
            ..
        })
    );
}
/// The `Abort` step clears the coroutine's speculation state, appends an
/// `Aborted` event, and leaves the effect trace and the crashed / partitioned /
/// corrupted / timed-out sets untouched. Two independent runs must produce the
/// same canonical replay fragment.
#[test]
fn test_lean_abort_policy_is_deterministic_and_scoped() {
    use telltale_machine::instr::{ImmValue, Instr};
    use telltale_machine::StepResult;

    let image = single_role_end_image(vec![
        Instr::Set {
            dst: 1,
            val: ImmValue::Nat(7),
        },
        Instr::Fork { ghost: 1 },
        Instr::Abort,
        Instr::Halt,
    ]);

    let run_once = || {
        let mut machine = ProtocolMachine::new(ProtocolMachineConfig {
            speculation_enabled: true,
            ..ProtocolMachineConfig::default()
        });
        let sid = machine.load_choreography(&image).unwrap();

        // Execute `Set` and `Fork`; both must report `Continue`.
        for _ in 0..2 {
            assert_matches!(
                machine.step(&PassthroughHandler),
                Ok(StepResult::Continue)
            );
        }

        // Snapshot everything the abort step is expected to leave alone.
        let before_effect_len = machine.effect_trace().len();
        let before_crashed = machine.crashed_sites().clone();
        let before_partitioned = machine.partitioned_edges().clone();
        let before_corrupted = machine.corrupted_edges().clone();
        let before_timed_out = machine.timed_out_sites().clone();
        let before_trace_len = machine.trace().len();

        // Execute `Abort`.
        assert_matches!(
            machine.step(&PassthroughHandler),
            Ok(StepResult::Continue)
        );

        let coros = machine.session_coroutines(sid);
        assert_eq!(
            coros.len(),
            1,
            "single-role fixture should keep one coroutine"
        );
        assert!(
            coros[0].spec_state.is_none(),
            "abort should clear speculation state"
        );
        assert_eq!(
            machine.effect_trace().len(),
            before_effect_len,
            "abort should not mutate effect trace"
        );
        assert_eq!(machine.crashed_sites(), &before_crashed);
        assert_eq!(machine.partitioned_edges(), &before_partitioned);
        assert_eq!(machine.corrupted_edges(), &before_corrupted);
        assert_eq!(machine.timed_out_sites(), before_timed_out);
        assert!(
            machine.trace().len() > before_trace_len,
            "abort step should append at least one observable event"
        );

        let abort_recorded = machine.trace()[before_trace_len..]
            .iter()
            .any(|event| matches!(event, ObsEvent::Aborted { .. }));
        assert!(abort_recorded, "abort step should append an Aborted event");

        machine.run(&PassthroughHandler, 16).unwrap();
        machine.canonical_replay_fragment()
    };

    let first = run_once();
    let second = run_once();
    assert_eq!(
        first, second,
        "abort policy should be deterministic for fixed initial state and handler"
    );
}
/// The set of `sent:<label>` / `recv:<label>` observations is the same under
/// the cooperative and round-robin scheduling policies.
#[test]
fn test_lean_schedule_confluence() {
    use telltale_machine::SchedPolicy;

    let image = test_support::simple_send_recv_image("A", "B", "msg");

    // Runs the image under `policy` and returns the set of send/receive tags.
    let observe = |policy: SchedPolicy| -> HashSet<String> {
        let mut machine = ProtocolMachine::new(ProtocolMachineConfig {
            sched_policy: policy,
            ..ProtocolMachineConfig::default()
        });
        machine.load_choreography(&image).unwrap();
        machine.run(&PassthroughHandler, 100).unwrap();

        let mut tags = HashSet::new();
        for event in machine.trace() {
            match event {
                ObsEvent::Sent { label, .. } => {
                    tags.insert(format!("sent:{label}"));
                }
                ObsEvent::Received { label, .. } => {
                    tags.insert(format!("recv:{label}"));
                }
                _ => {}
            }
        }
        tags
    };

    let coop_events = observe(SchedPolicy::Cooperative);
    let rr_events = observe(SchedPolicy::RoundRobin);
    assert_eq!(coop_events, rr_events);
}
/// Under both the cooperative and the round-robin policy, every coroutine of
/// the session reaches a terminal state.
#[test]
fn test_lean_cooperative_refines_concurrent() {
    use telltale_machine::SchedPolicy;

    let image = test_support::simple_send_recv_image("A", "B", "msg");

    // Returns whether all session coroutines are terminal after running.
    let all_terminate = |policy: SchedPolicy| -> bool {
        let mut machine = ProtocolMachine::new(ProtocolMachineConfig {
            sched_policy: policy,
            ..ProtocolMachineConfig::default()
        });
        let sid = machine.load_choreography(&image).unwrap();
        machine.run(&PassthroughHandler, 100).unwrap();

        let coros = machine.session_coroutines(sid);
        coros.iter().all(|c| c.is_terminal())
    };

    assert!(all_terminate(SchedPolicy::Cooperative));
    assert!(all_terminate(SchedPolicy::RoundRobin));
}
/// Running the simple send/receive protocol records no `Faulted` event.
#[test]
fn test_lean_monitor_sound_send() {
    let image = test_support::simple_send_recv_image("A", "B", "msg");
    let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());
    machine.load_choreography(&image).unwrap();
    machine.run(&PassthroughHandler, 100).unwrap();

    let fault_count = machine
        .trace()
        .iter()
        .filter(|e| matches!(e, ObsEvent::Faulted { .. }))
        .count();
    assert!(fault_count == 0);
}
/// Running the simple send/receive protocol records no `Faulted` event.
#[test]
fn test_lean_monitor_sound_recv() {
    let image = test_support::simple_send_recv_image("A", "B", "msg");
    let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());
    machine.load_choreography(&image).unwrap();
    machine.run(&PassthroughHandler, 100).unwrap();

    let fault_count = machine
        .trace()
        .iter()
        .filter(|e| matches!(e, ObsEvent::Faulted { .. }))
        .count();
    assert!(fault_count == 0);
}
/// Running the two-label choice protocol records no `Faulted` event.
#[test]
fn test_lean_monitor_sound_choose() {
    let image = test_support::choice_image("A", "B", &["yes", "no"]);
    let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());
    machine.load_choreography(&image).unwrap();
    machine.run(&PassthroughHandler, 100).unwrap();

    let fault_count = machine
        .trace()
        .iter()
        .filter(|e| matches!(e, ObsEvent::Faulted { .. }))
        .count();
    assert!(fault_count == 0);
}
/// Running the two-label choice protocol records no `Faulted` event.
#[test]
fn test_lean_monitor_sound_offer() {
    let image = test_support::choice_image("A", "B", &["yes", "no"]);
    let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());
    machine.load_choreography(&image).unwrap();
    machine.run(&PassthroughHandler, 100).unwrap();

    let fault_count = machine
        .trace()
        .iter()
        .filter(|e| matches!(e, ObsEvent::Faulted { .. }))
        .count();
    assert!(fault_count == 0);
}
/// At every point in the trace, the number of `Received` events seen so far
/// for the session never exceeds the number of `Sent` events seen so far.
#[test]
fn test_lean_causal_consistency() {
    let image = test_support::simple_send_recv_image("A", "B", "msg");
    let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());
    let sid = machine.load_choreography(&image).unwrap();
    machine.run(&PassthroughHandler, 100).unwrap();

    let mut sends_so_far = 0_usize;
    let mut recvs_so_far = 0_usize;
    for event in machine.trace() {
        match event {
            ObsEvent::Sent { session, .. } if *session == sid => {
                sends_so_far += 1;
            }
            ObsEvent::Received { session, .. } if *session == sid => {
                recvs_so_far += 1;
                // Checked on every receive so a violation is caught where it occurs.
                assert!(recvs_so_far <= sends_so_far, "received before sent");
            }
            _ => {}
        }
    }
}
/// On the `A -> B` edge the received labels form a prefix of the sent labels:
/// there are no more receives than sends, and they agree position by position.
#[test]
fn test_lean_fifo_consistency() {
    let image = test_support::recursive_send_recv_image("A", "B", "msg");
    let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());
    let sid = machine.load_choreography(&image).unwrap();

    // The run result is intentionally discarded; only the trace is examined.
    let _ = machine.run(&PassthroughHandler, 200);

    let trace = machine.trace();
    let sent_labels: Vec<String> = trace
        .iter()
        .filter_map(|event| match event {
            ObsEvent::Sent {
                session,
                from,
                to,
                label,
                ..
            } if *session == sid && from == "A" && to == "B" => Some(label.clone()),
            _ => None,
        })
        .collect();
    let recv_labels: Vec<String> = trace
        .iter()
        .filter_map(|event| match event {
            ObsEvent::Received {
                session,
                from,
                to,
                label,
                ..
            } if *session == sid && from == "A" && to == "B" => Some(label.clone()),
            _ => None,
        })
        .collect();

    assert!(recv_labels.len() <= sent_labels.len());
    // Safe to pair up: the length check above guarantees no receive is dropped.
    for (received, sent) in recv_labels.iter().zip(&sent_labels) {
        assert_eq!(received, sent);
    }
}
/// Running the simple send/receive protocol must record at least one each of
/// `Opened`, `Sent`, `Received` and `Halted`.
///
/// NOTE(review): despite the name, this test does not reject unexpected event
/// kinds; it only checks that the expected ones are present. A previous loop
/// over the trace matched every variant (including a wildcard) with an empty
/// arm and therefore verified nothing, so it has been removed.
#[test]
fn test_lean_no_phantom_events() {
    let image = test_support::simple_send_recv_image("A", "B", "msg");
    let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());
    machine.load_choreography(&image).unwrap();
    let handler = PassthroughHandler;
    machine.run(&handler, 100).unwrap();

    let trace = machine.trace();
    let has_opened = trace.iter().any(|e| matches!(e, ObsEvent::Opened { .. }));
    let has_sent = trace.iter().any(|e| matches!(e, ObsEvent::Sent { .. }));
    let has_recv = trace.iter().any(|e| matches!(e, ObsEvent::Received { .. }));
    let has_halted = trace.iter().any(|e| matches!(e, ObsEvent::Halted { .. }));

    assert!(has_opened, "expected an Opened event in the trace");
    assert!(has_sent, "expected a Sent event in the trace");
    assert!(has_recv, "expected a Received event in the trace");
    assert!(has_halted, "expected a Halted event in the trace");
}
/// Before each of up to 50 steps, every `Ready` coroutine with id in `0..10`
/// must have a program counter strictly below its program length.
#[test]
fn test_lean_wf_pc_bounds() {
    use telltale_machine::StepResult;

    let image = test_support::simple_send_recv_image("A", "B", "msg");
    let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());
    machine.load_choreography(&image).unwrap();
    let handler = PassthroughHandler;

    for _ in 0..50 {
        for coro in (0..10).filter_map(|coro_id| machine.coroutine(coro_id)) {
            if coro.status != CoroStatus::Ready {
                continue;
            }
            let program_len = machine
                .coroutine_program_len(coro.id)
                .expect("ready coroutine must reference a valid program");
            assert!(
                coro.pc < program_len,
                "PC {} out of bounds for program len {}",
                coro.pc,
                program_len
            );
        }

        // Keep going only while the machine reports `Continue`; `AllDone`,
        // `Stuck` and step errors all end the loop.
        if !matches!(machine.step(&handler), Ok(StepResult::Continue)) {
            break;
        }
    }
}
/// Right after loading, no endpoint is owned by more than one coroutine of the
/// session.
#[test]
fn test_lean_endpoint_ownership_unique() {
    let image = test_support::simple_send_recv_image("A", "B", "msg");
    let mut machine = ProtocolMachine::new(ProtocolMachineConfig::default());
    let sid = machine.load_choreography(&image).unwrap();

    let mut claimed = HashSet::new();
    for coro in machine.session_coroutines(sid) {
        for ep in &coro.owned_endpoints {
            // `insert` returns false when the endpoint was already claimed.
            let newly_claimed = claimed.insert(ep.clone());
            assert!(newly_claimed, "duplicate endpoint ownership: {ep:?}");
        }
    }
}