impl ProtocolMachine {
pub fn refinement_slice(
&self,
) -> Result<ProtocolMachineRefinementSlice, RefinementSliceError> {
cooperative_refinement_slice(&self.coroutines, &self.sessions, &self.sched)
}
/// Returns a clone of the stored `last_pre_dispatch_state`, or `None` if nothing is stored.
///
/// NOTE(review): presumably the slice captured just before the most recent dispatch;
/// confirm against the code that writes `last_pre_dispatch_state`.
#[must_use]
pub fn last_pre_dispatch_refinement_slice(&self) -> Option<ProtocolMachineRefinementSlice> {
self.last_pre_dispatch_state.clone()
}
/// Derives a transition refinement summary from the current runtime state.
///
/// The inputs are the coroutines, sessions, scheduler, and a borrow of the stored
/// `last_sched_step`.
///
/// # Errors
///
/// Propagates any [`RefinementSliceError`] returned by
/// `TransitionRefinementSummary::from_runtime_state`.
pub fn transition_refinement_summary(
    &self,
) -> Result<crate::refinement::TransitionRefinementSummary, RefinementSliceError> {
    let last_step = self.last_sched_step.as_ref();
    crate::refinement::TransitionRefinementSummary::from_runtime_state(
        &self.coroutines,
        &self.sessions,
        &self.sched,
        last_step,
    )
}
/// Derives the claimed runtime core bundle from the current runtime state.
///
/// The inputs are the coroutines, sessions, scheduler, and a borrow of the stored
/// `last_sched_step`.
///
/// # Errors
///
/// Propagates any [`RefinementSliceError`] returned by
/// `ClaimedRuntimeCoreBundle::from_runtime_state`.
pub fn claimed_runtime_core_bundle(
    &self,
) -> Result<crate::refinement::ClaimedRuntimeCoreBundle, RefinementSliceError> {
    let last_step = self.last_sched_step.as_ref();
    crate::refinement::ClaimedRuntimeCoreBundle::from_runtime_state(
        &self.coroutines,
        &self.sessions,
        &self.sched,
        last_step,
    )
}
/// Assembles a runtime observation bundle from the runtime state and the retained logs.
///
/// The authority audit input is the merged log produced by
/// `combined_authority_audit_log`, not the machine-level log on its own.
///
/// # Errors
///
/// Propagates any [`RefinementSliceError`] returned by
/// `RuntimeObservationBundle::from_runtime_state`.
pub fn runtime_observation_bundle(
    &self,
) -> Result<crate::refinement::RuntimeObservationBundle, RefinementSliceError> {
    // The merged log is an owned `Vec`, so it must outlive the call that borrows it.
    let merged_authority_log = self.combined_authority_audit_log();
    let last_step = self.last_sched_step.as_ref();
    crate::refinement::RuntimeObservationBundle::from_runtime_state(
        &self.coroutines,
        &self.sessions,
        &self.sched,
        last_step,
        merged_authority_log.as_slice(),
        self.delegation_audit_log.as_slice(),
        self.operation_instances.as_slice(),
        self.obs_trace.as_slice(),
        self.outstanding_effects.as_slice(),
        self.output_condition_checks.as_slice(),
        self.progress_contracts.as_slice(),
        self.progress_transitions.as_slice(),
        self.effect_exchanges.as_slice(),
    )
}
/// Merges the machine-level authority audit log with every session's authority audit log.
///
/// Machine-level records come first, followed by each session's records in the order
/// `session_ids()` yields them; sessions without a log are skipped. The merged records
/// are then sorted by `tick`, with a missing tick treated as `0`.
fn combined_authority_audit_log(&self) -> Vec<AuthorityAuditRecord> {
    let mut merged = self.authority_audit_log.as_slice().to_vec();
    for sid in self.sessions.session_ids() {
        match self.sessions.authority_audit_log(sid) {
            Some(session_records) => merged.extend_from_slice(session_records),
            None => continue,
        }
    }
    // `sort_by_key` is stable, so records sharing a tick keep their merge order.
    merged.sort_by_key(|record| record.tick.unwrap_or(0));
    merged
}
/// Reports record counts and retained-byte estimates for the machine.
///
/// `authority_audits` counts only the machine-level authority audit log; the session
/// store reports its own usage through `session_store`.
#[must_use]
pub fn memory_usage(&self) -> ProtocolMachineMemoryUsage {
    let session_store = self.sessions.memory_usage();
    // Read the session-store byte total before `session_store` is moved into the result.
    let retained_bytes = self.retained_bytes(session_store.retained_bytes.total);
    let terminal_coroutines = self
        .coroutines
        .iter()
        .filter(|coro| coro.is_terminal())
        .count();

    ProtocolMachineMemoryUsage {
        session_store,
        coroutine_records: self.coroutines.len(),
        terminal_coroutines,
        program_count: self.programs.len(),
        program_instruction_count: self.programs.instruction_count(),
        obs_events: self.obs_trace.len(),
        effect_trace_entries: self.effect_trace.len(),
        communication_artifacts: self.communication_consumption_artifacts.len(),
        output_condition_checks: self.output_condition_checks.len(),
        delegation_audits: self.delegation_audit_log.len(),
        authority_audits: self.authority_audit_log.len(),
        retained_bytes,
    }
}
fn retained_bytes(&self, session_store_bytes: usize) -> ProtocolMachineRetainedBytes {
let mut retained_bytes = ProtocolMachineRetainedBytes {
session_store: session_store_bytes,
coroutines: self.coroutines.iter().map(serialized_byte_size).sum(),
programs: serialized_byte_size(&self.programs)
.saturating_add(serialized_byte_size(&self.code)),
resource_states: serialized_byte_size(&self.resource_states),
traces: serialized_byte_size(&self.obs_trace)
.saturating_add(serialized_byte_size(&self.effect_trace))
.saturating_add(serialized_byte_size(&self.delegation_audit_log))
.saturating_add(serialized_byte_size(&self.authority_audit_log)),
replay: serialized_byte_size(&self.communication_consumption)
.saturating_add(serialized_byte_size(&self.communication_consumption_artifacts)),
output_condition_checks: serialized_byte_size(&self.output_condition_checks),
scheduler_and_control: serialized_byte_size(&self.sched)
.saturating_add(serialized_byte_size(&self.eligible_ready))
.saturating_add(serialized_byte_size(&self.paused_roles))
.saturating_add(serialized_byte_size(&self.crashed_sites))
.saturating_add(serialized_byte_size(&self.partitioned_edges))
.saturating_add(serialized_byte_size(&self.corrupted_edges))
.saturating_add(serialized_byte_size(&self.timed_out_sites))
.saturating_add(serialized_byte_size(&self.clock))
.saturating_add(serialized_byte_size(&self.last_sched_step))
.saturating_add(serialized_byte_size(&self.handler_identity_anchor))
.saturating_add(serialized_byte_size(&self.next_coro_id))
.saturating_add(serialized_byte_size(&self.next_session_id)),
symbols: serialized_byte_size(&self.role_symbols)
.saturating_add(serialized_byte_size(&self.label_symbols))
.saturating_add(serialized_byte_size(&self.handler_symbols))
.saturating_add(serialized_byte_size(&self.edge_symbols)),
guard_layer: serialized_byte_size(&self.guard_layer),
monitor: serialized_byte_size(&self.monitor),
arena: serialized_byte_size(&self.arena),
total: 0,
};
retained_bytes.total = Self::retained_bytes_total(&retained_bytes);
retained_bytes
}
/// Adds up every category of `retained_bytes` with saturating arithmetic.
///
/// The existing `total` field is not read, so the result depends only on the categories.
fn retained_bytes_total(retained_bytes: &ProtocolMachineRetainedBytes) -> usize {
    let categories = [
        retained_bytes.session_store,
        retained_bytes.coroutines,
        retained_bytes.programs,
        retained_bytes.resource_states,
        retained_bytes.traces,
        retained_bytes.replay,
        retained_bytes.output_condition_checks,
        retained_bytes.scheduler_and_control,
        retained_bytes.symbols,
        retained_bytes.guard_layer,
        retained_bytes.monitor,
        retained_bytes.arena,
    ];
    categories
        .iter()
        .fold(0usize, |sum, &bytes| sum.saturating_add(bytes))
}
/// Borrows the retained output-condition checks as a slice.
#[must_use]
pub fn output_condition_checks(&self) -> &[OutputConditionCheck] {
self.output_condition_checks.as_slice()
}
/// Borrows the retained effect trace entries as a slice.
#[must_use]
pub fn effect_trace(&self) -> &[EffectTraceEntry] {
self.effect_trace.as_slice()
}
/// Borrows the retained effect exchange records as a slice.
#[must_use]
pub fn effect_exchanges(&self) -> &[EffectExchangeRecord] {
self.effect_exchanges.as_slice()
}
/// Borrows the retained operation instances as a slice.
#[must_use]
pub fn operation_instances(&self) -> &[OperationInstance] {
self.operation_instances.as_slice()
}
/// Borrows the retained outstanding effects as a slice.
#[must_use]
pub fn outstanding_effects(&self) -> &[OutstandingEffect] {
self.outstanding_effects.as_slice()
}
/// Borrows the retained progress contracts as a slice.
#[must_use]
pub fn progress_contracts(&self) -> &[crate::semantic_objects::ProgressContract] {
self.progress_contracts.as_slice()
}
/// Borrows the retained progress transitions as a slice.
#[must_use]
pub fn progress_transitions(&self) -> &[crate::semantic_objects::ProgressTransition] {
self.progress_transitions.as_slice()
}
/// Borrows the retained delegation audit records as a slice.
#[must_use]
pub fn delegation_audit_log(&self) -> &[DelegationAuditRecord] {
self.delegation_audit_log.as_slice()
}
/// Borrows the machine-level authority audit records as a slice.
///
/// Per-session authority audit records held by `self.sessions` are not part of this
/// slice; the private `combined_authority_audit_log` helper merges both sources.
#[must_use]
pub fn authority_audit_log(&self) -> &[AuthorityAuditRecord] {
self.authority_audit_log.as_slice()
}
/// Builds the semantic audit log from the retained audit, trace and progress records.
///
/// The authority input is the merged machine-plus-session log from
/// `combined_authority_audit_log`; construction is delegated to `semantic_audit_log_v1`.
#[must_use]
pub fn semantic_audit_log(&self) -> Vec<SemanticAuditRecord> {
    let merged_authority_log = self.combined_authority_audit_log();
    semantic_audit_log_v1(
        merged_authority_log.as_slice(),
        self.delegation_audit_log.as_slice(),
        self.operation_instances.as_slice(),
        self.obs_trace.as_slice(),
        self.outstanding_effects.as_slice(),
        self.progress_contracts.as_slice(),
        self.progress_transitions.as_slice(),
    )
}
/// Builds the capability lifecycle audit log from the authority and delegation audits.
///
/// The authority input is the merged machine-plus-session log from
/// `combined_authority_audit_log`; construction is delegated to
/// `crate::capability_lifecycle_audit_log_v1`.
#[must_use]
pub fn capability_lifecycle_audit_log(
    &self,
) -> Vec<crate::ProtocolCriticalCapabilityLifecycleRecord> {
    let merged_authority_log = self.combined_authority_audit_log();
    let delegation_log = self.delegation_audit_log.as_slice();
    crate::capability_lifecycle_audit_log_v1(merged_authority_log.as_slice(), delegation_log)
}
/// Builds the machine's semantic objects from the retained audit and progress records.
///
/// The authority input is the merged machine-plus-session log from
/// `combined_authority_audit_log`; construction is delegated to
/// `protocol_machine_semantic_objects`. Each call rebuilds the result from scratch.
#[must_use]
pub fn semantic_objects(&self) -> ProtocolMachineSemanticObjects {
    let merged_authority_log = self.combined_authority_audit_log();
    protocol_machine_semantic_objects(
        merged_authority_log.as_slice(),
        self.delegation_audit_log.as_slice(),
        self.operation_instances.as_slice(),
        self.outstanding_effects.as_slice(),
        self.output_condition_checks.as_slice(),
        self.progress_contracts.as_slice(),
        self.progress_transitions.as_slice(),
    )
}
/// Returns the `publication_events` of a freshly built [`ProtocolMachineSemanticObjects`].
///
/// This rebuilds the full semantic-object set via `semantic_objects()` on every call.
#[must_use]
pub fn publication_events(&self) -> Vec<crate::semantic_objects::PublicationEvent> {
    let objects = self.semantic_objects();
    objects.publication_events
}
/// Returns the `prestate_bindings` of a freshly built [`ProtocolMachineSemanticObjects`].
///
/// This rebuilds the full semantic-object set via `semantic_objects()` on every call.
#[must_use]
pub fn prestate_bindings(&self) -> Vec<crate::semantic_objects::PrestateBinding> {
    let objects = self.semantic_objects();
    objects.prestate_bindings
}
/// Returns the `agreement_profiles` of a freshly built [`ProtocolMachineSemanticObjects`].
///
/// This rebuilds the full semantic-object set via `semantic_objects()` on every call.
#[must_use]
pub fn agreement_profiles(&self) -> Vec<crate::semantic_objects::AgreementProfile> {
    let objects = self.semantic_objects();
    objects.agreement_profiles
}
/// Returns the `agreement_contracts` of a freshly built [`ProtocolMachineSemanticObjects`].
///
/// This rebuilds the full semantic-object set via `semantic_objects()` on every call.
#[must_use]
pub fn agreement_contracts(&self) -> Vec<crate::semantic_objects::AgreementContract> {
    let objects = self.semantic_objects();
    objects.agreement_contracts
}
/// Returns the `agreement_evidence` of a freshly built [`ProtocolMachineSemanticObjects`].
///
/// This rebuilds the full semantic-object set via `semantic_objects()` on every call.
#[must_use]
pub fn agreement_evidence(&self) -> Vec<crate::semantic_objects::AgreementEvidence> {
    let objects = self.semantic_objects();
    objects.agreement_evidence
}
/// Returns the `agreement_states` of a freshly built [`ProtocolMachineSemanticObjects`].
///
/// This rebuilds the full semantic-object set via `semantic_objects()` on every call.
#[must_use]
pub fn agreement_states(&self) -> Vec<crate::semantic_objects::AgreementState> {
    let objects = self.semantic_objects();
    objects.agreement_states
}
/// Looks up the authoritative read `read_id` in freshly built semantic objects and
/// returns an owned copy of it.
///
/// # Errors
///
/// When the lookup fails, its message is wrapped with
/// `EffectFailure::contract_violation` and returned as
/// `ProtocolMachineError::HandlerError`.
pub fn require_authoritative_read(
    &self,
    read_id: &str,
) -> Result<crate::semantic_objects::AuthoritativeRead, ProtocolMachineError> {
    let objects = self.semantic_objects();
    match objects.require_authoritative_read(read_id) {
        // The lookup borrows from `objects`, which is local; hand back an owned clone.
        Ok(read) => Ok(read.clone()),
        Err(message) => Err(ProtocolMachineError::HandlerError(
            crate::effect::EffectFailure::contract_violation(message),
        )),
    }
}
/// Looks up the canonical handle `handle_id` in freshly built semantic objects and
/// returns an owned copy of it.
///
/// # Errors
///
/// When the lookup fails, its message is wrapped with
/// `EffectFailure::contract_violation` and returned as
/// `ProtocolMachineError::HandlerError`.
pub fn require_canonical_handle(
    &self,
    handle_id: &str,
) -> Result<crate::semantic_objects::CanonicalHandle, ProtocolMachineError> {
    let objects = self.semantic_objects();
    match objects.require_canonical_handle(handle_id) {
        // The lookup borrows from `objects`, which is local; hand back an owned clone.
        Ok(handle) => Ok(handle.clone()),
        Err(message) => Err(ProtocolMachineError::HandlerError(
            crate::effect::EffectFailure::contract_violation(message),
        )),
    }
}
/// Returns the hash produced by `root()` on the communication consumption state.
#[must_use]
pub fn communication_replay_root(&self) -> crate::verification::Hash {
self.communication_consumption.root()
}
/// Borrows the retained communication consumption artifacts as a slice.
#[must_use]
pub fn communication_consumption_artifacts(&self) -> &[CommunicationConsumptionArtifact] {
self.communication_consumption_artifacts.as_slice()
}
/// Calls `drain()` on the observation trace and returns the resulting events.
///
/// NOTE(review): presumably this leaves the trace empty; confirm against the
/// container's `drain` implementation.
pub fn drain_obs_trace(&mut self) -> Vec<ObsEvent> {
self.obs_trace.drain()
}
/// Calls `drain()` on the effect trace and returns the resulting entries.
///
/// NOTE(review): presumably this leaves the trace empty; confirm against the
/// container's `drain` implementation.
pub fn drain_effect_trace(&mut self) -> Vec<EffectTraceEntry> {
self.effect_trace.drain()
}
/// Calls `drain()` on the output-condition checks and returns the resulting records.
///
/// NOTE(review): presumably this leaves the retained checks empty; confirm against the
/// container's `drain` implementation.
pub fn drain_output_condition_checks(&mut self) -> Vec<OutputConditionCheck> {
self.output_condition_checks.drain()
}
/// Calls `drain()` on the delegation audit log and returns the resulting records.
///
/// NOTE(review): presumably this leaves the log empty; confirm against the
/// container's `drain` implementation.
pub fn drain_delegation_audit_log(&mut self) -> Vec<DelegationAuditRecord> {
self.delegation_audit_log.drain()
}
/// Calls `drain()` on the machine-level authority audit log and returns the resulting
/// records.
///
/// Only `self.authority_audit_log` is touched; per-session authority audit records held
/// by `self.sessions` are not drained by this method.
///
/// NOTE(review): presumably this leaves the machine-level log empty; confirm against
/// the container's `drain` implementation.
pub fn drain_authority_audit_log(&mut self) -> Vec<AuthorityAuditRecord> {
self.authority_audit_log.drain()
}
/// Calls `drain()` on the communication consumption artifacts and returns the resulting
/// records.
///
/// NOTE(review): presumably this leaves the retained artifacts empty; confirm against
/// the container's `drain` implementation.
pub fn drain_communication_consumption_artifacts(
&mut self,
) -> Vec<CommunicationConsumptionArtifact> {
self.communication_consumption_artifacts.drain()
}
/// Builds a [`CanonicalReplayFragmentV1`] from the machine's retained traces, audit
/// logs, fault state and replay configuration.
///
/// The authority input is the merged machine-plus-session log from
/// `combined_authority_audit_log`. Timed-out sites are reduced to their `until_tick`
/// value, and the communication consumption root is always supplied as `Some`.
#[must_use]
pub fn canonical_replay_fragment(&self) -> CanonicalReplayFragmentV1 {
    let merged_authority_log = self.combined_authority_audit_log();

    // Owned snapshots of the fault state; the target collection types are inferred from
    // the parameters of `canonical_replay_fragment_v1`.
    let crashed = self.crashed_sites.iter().cloned().collect();
    let partitions = self.partitioned_edges.iter().cloned().collect();
    let corruptions = self
        .corrupted_edges
        .iter()
        .map(|(edge, corruption)| (edge.clone(), *corruption))
        .collect();
    let timeouts = self
        .timed_out_sites
        .iter()
        .map(|(site, witness)| (site.clone(), witness.until_tick))
        .collect();

    let replay_root = self.communication_consumption.root();
    let consumption_artifacts = self.communication_consumption_artifacts.as_slice().to_vec();

    canonical_replay_fragment_v1(
        self.obs_trace.as_slice(),
        self.effect_trace.as_slice(),
        merged_authority_log.as_slice(),
        self.delegation_audit_log.as_slice(),
        self.operation_instances.as_slice(),
        self.outstanding_effects.as_slice(),
        self.output_condition_checks.as_slice(),
        self.progress_contracts.as_slice(),
        self.progress_transitions.as_slice(),
        crashed,
        partitions,
        corruptions,
        timeouts,
        self.config.effect_determinism_tier,
        self.config.communication_replay_mode,
        Some(replay_root),
        consumption_artifacts,
    )
}
/// Borrows the set of sites currently recorded in `crashed_sites`.
#[must_use]
pub fn crashed_sites(&self) -> &BTreeSet<SiteId> {
&self.crashed_sites
}
/// Borrows the set of site pairs currently recorded in `partitioned_edges`.
#[must_use]
pub fn partitioned_edges(&self) -> &BTreeSet<(SiteId, SiteId)> {
&self.partitioned_edges
}
/// Borrows the map from site pair to [`CorruptionType`] recorded in `corrupted_edges`.
#[must_use]
pub fn corrupted_edges(&self) -> &BTreeMap<(SiteId, SiteId), CorruptionType> {
&self.corrupted_edges
}
/// Returns a new map from each timed-out site to the `until_tick` of its timeout witness.
///
/// Use `timeout_witnesses` to borrow the full witnesses without copying.
#[must_use]
pub fn timed_out_sites(&self) -> BTreeMap<SiteId, u64> {
    let mut deadlines = BTreeMap::new();
    for (site, witness) in &self.timed_out_sites {
        deadlines.insert(site.clone(), witness.until_tick);
    }
    deadlines
}
/// Borrows the map from site to [`TimeoutWitness`] stored in `timed_out_sites`.
#[must_use]
pub fn timeout_witnesses(&self) -> &BTreeMap<SiteId, TimeoutWitness> {
&self.timed_out_sites
}
}