use super::*;
use crate::meerkat_machine_types::{
MeerkatMachineFieldlessRuntimeInternalInput,
canonical_meerkat_machine_runtime_internal_fieldless_input_variant_manifest,
canonical_meerkat_machine_runtime_internal_input_variant_manifest,
};
/// Effects produced by a single DSL transition.
///
/// Within this file, values are only built from `transition.into_effects()` after a
/// successful `dsl::MeerkatMachineMutator::apply` call.
#[derive(Debug, Clone)]
pub(crate) struct DslTransitionEffects {
// Private: constructed through `DslTransitionEffects::new`, read through `as_slice` / `Deref`.
effects: Vec<dsl::MeerkatMachineEffect>,
}
impl DslTransitionEffects {
fn new(effects: Vec<dsl::MeerkatMachineEffect>) -> Self {
Self { effects }
}
pub(crate) fn as_slice(&self) -> &[dsl::MeerkatMachineEffect] {
&self.effects
}
}
/// Applies `input` to the shared DSL authority and returns the effects of the transition.
///
/// The input is first checked by
/// `MeerkatMachine::reject_raw_fieldless_runtime_internal_dsl_input`; a rejection is
/// returned before the lock is taken.
///
/// # Errors
///
/// Returns the rejection reason from the raw-input check, or the mutator error rendered
/// through `dsl_authority::map_error` with `context`.
pub(crate) fn apply_dsl_transition_on_authority(
    authority: &crate::driver::ephemeral::SharedIngressDslAuthority,
    input: dsl::MeerkatMachineInput,
    context: &str,
) -> Result<DslTransitionEffects, String> {
    MeerkatMachine::reject_raw_fieldless_runtime_internal_dsl_input(&input)?;

    // A poisoned lock is recovered instead of propagated: the guard is taken out of
    // the poison error and used as-is.
    let mut guard = authority
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());

    match dsl::MeerkatMachineMutator::apply(&mut *guard, input) {
        Ok(transition) => Ok(DslTransitionEffects::new(transition.into_effects())),
        Err(err) => Err(dsl_authority::map_error(err, context)),
    }
}
/// Lets a `DslTransitionEffects` be used directly as a slice of effects
/// (iteration, `len`, indexing) without calling `as_slice` explicitly.
impl std::ops::Deref for DslTransitionEffects {
    type Target = [dsl::MeerkatMachineEffect];

    fn deref(&self) -> &[dsl::MeerkatMachineEffect] {
        &self.effects
    }
}
impl MeerkatMachine {
    /// Looks up the DSL authority for `session_id` and stages a typed fieldless
    /// runtime-internal input on it.
    ///
    /// # Errors
    ///
    /// Fails if the session authority cannot be obtained, or with any error from
    /// `stage_runtime_internal_dsl_transition_on_authority`.
    pub(super) async fn stage_session_runtime_internal_dsl_transition(
        &self,
        session_id: &SessionId,
        input: MeerkatMachineFieldlessRuntimeInternalInput,
    ) -> Result<StagedSessionDslInput, String> {
        let session_authority = self.session_dsl_authority(session_id).await?;
        Self::stage_runtime_internal_dsl_transition_on_authority(&session_authority, input)
    }

    /// Stages a typed fieldless runtime-internal input on `authority`.
    ///
    /// Three gates are checked, in order, before anything is applied:
    /// 1. the variant is listed in the runtime-internal input variant manifest,
    /// 2. the variant is listed in the fieldless runtime-internal variant manifest,
    /// 3. the input reports that it requires the typed runtime-internal stager.
    ///
    /// When all gates pass, the input is converted with `dsl_input()` and applied,
    /// using the variant's string form as the error context.
    ///
    /// # Errors
    ///
    /// Returns a message naming the failed gate, or the mapped mutator error.
    pub(super) fn stage_runtime_internal_dsl_transition_on_authority(
        authority: &crate::driver::ephemeral::SharedIngressDslAuthority,
        input: MeerkatMachineFieldlessRuntimeInternalInput,
    ) -> Result<StagedSessionDslInput, String> {
        let variant = input.input_variant();

        let listed_in_production_manifest =
            canonical_meerkat_machine_runtime_internal_input_variant_manifest().contains(&variant);
        if !listed_in_production_manifest {
            return Err(format!(
                "runtime-internal input {variant:?} is absent from the typed production manifest"
            ));
        }

        let listed_in_fieldless_manifest =
            canonical_meerkat_machine_runtime_internal_fieldless_input_variant_manifest()
                .contains(&variant);
        if !listed_in_fieldless_manifest {
            return Err(format!(
                "runtime-internal input {variant:?} is absent from the typed fieldless manifest"
            ));
        }

        if input.requires_typed_runtime_internal_stager() {
            // This path is itself the typed gate, so the raw-input rejection is skipped.
            Self::stage_dsl_transition_on_authority_after_typed_gate(
                authority,
                input.dsl_input(),
                variant.as_str(),
            )
        } else {
            Err(format!(
                "fieldless runtime-internal input {variant:?} is owned by {:?}, not the typed runtime-internal stager",
                input.authority()
            ))
        }
    }

    /// Stages `input` for the session and returns only the snapshot taken before the
    /// transition was applied.
    ///
    /// # Errors
    ///
    /// Propagates any error from `stage_session_dsl_transition`.
    pub(super) async fn stage_session_dsl_input(
        &self,
        session_id: &SessionId,
        input: dsl::MeerkatMachineInput,
        context: &str,
    ) -> Result<dsl::MeerkatMachineAuthoritySnapshot, String> {
        let staged = self
            .stage_session_dsl_transition(session_id, input, context)
            .await?;
        Ok(staged.previous_snapshot)
    }

    /// Looks up the DSL authority for `session_id` and stages `input` on it.
    ///
    /// # Errors
    ///
    /// Fails if the session authority cannot be obtained, or with any error from
    /// `stage_dsl_transition_on_authority`.
    pub(super) async fn stage_session_dsl_transition(
        &self,
        session_id: &SessionId,
        input: dsl::MeerkatMachineInput,
        context: &str,
    ) -> Result<StagedSessionDslInput, String> {
        let session_authority = self.session_dsl_authority(session_id).await?;
        Self::stage_dsl_transition_on_authority(&session_authority, input, context)
    }

    /// Stages `input` on `authority` after checking that it is not a raw fieldless
    /// runtime-internal input.
    ///
    /// # Errors
    ///
    /// Returns the rejection reason, or the mapped mutator error.
    pub(super) fn stage_dsl_transition_on_authority(
        authority: &crate::driver::ephemeral::SharedIngressDslAuthority,
        input: dsl::MeerkatMachineInput,
        context: &str,
    ) -> Result<StagedSessionDslInput, String> {
        match Self::reject_raw_fieldless_runtime_internal_dsl_input(&input) {
            Ok(()) => {
                Self::stage_dsl_transition_on_authority_after_typed_gate(authority, input, context)
            }
            Err(reason) => Err(reason),
        }
    }

    /// Applies `input` under the authority lock and records the snapshots taken
    /// immediately before and immediately after the transition.
    ///
    /// Callers are responsible for gating the input first; no rejection check runs here.
    ///
    /// # Errors
    ///
    /// Returns the mutator error rendered through `dsl_authority::map_error`.
    fn stage_dsl_transition_on_authority_after_typed_gate(
        authority: &crate::driver::ephemeral::SharedIngressDslAuthority,
        input: dsl::MeerkatMachineInput,
        context: &str,
    ) -> Result<StagedSessionDslInput, String> {
        // A poisoned lock is recovered rather than treated as an error.
        let mut guard = authority
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        let before = guard.snapshot();
        let transition = dsl::MeerkatMachineMutator::apply(&mut *guard, input)
            .map_err(|err| dsl_authority::map_error(err, context))?;
        let effects = DslTransitionEffects::new(transition.into_effects());
        // Taken while still holding the same guard, so it reflects this transition.
        let after = guard.snapshot();

        Ok(StagedSessionDslInput {
            previous_snapshot: before,
            committed_snapshot: after,
            effects,
        })
    }

    /// Delegates to `MeerkatMachineFieldlessRuntimeInternalInput::reject_raw_dsl_input`
    /// to decide whether `input` may be submitted as a raw DSL input.
    ///
    /// # Errors
    ///
    /// Returns the reason supplied by the delegate when the input is rejected.
    pub(super) fn reject_raw_fieldless_runtime_internal_dsl_input(
        input: &dsl::MeerkatMachineInput,
    ) -> Result<(), String> {
        MeerkatMachineFieldlessRuntimeInternalInput::reject_raw_dsl_input(input)
    }

    /// Applies `input` for the session using the
    /// `CommittedEffectDispatchFailure::PreserveCommittedDslState` policy.
    ///
    /// Returns the pre-transition snapshot together with the transition's effects.
    ///
    /// # Errors
    ///
    /// See `apply_session_dsl_input_with_dispatch_failure`.
    pub(super) async fn apply_session_dsl_input(
        &self,
        session_id: &SessionId,
        input: dsl::MeerkatMachineInput,
        context: &str,
    ) -> Result<(dsl::MeerkatMachineAuthoritySnapshot, DslTransitionEffects), String> {
        let on_dispatch_failure = CommittedEffectDispatchFailure::PreserveCommittedDslState;
        self.apply_session_dsl_input_with_dispatch_failure(
            session_id,
            input,
            context,
            on_dispatch_failure,
        )
        .await
    }

    /// Applies `input` to the session's DSL authority, then dispatches routed signals
    /// derived from the resulting effects.
    ///
    /// Returns the pre-transition snapshot together with the transition's effects.
    ///
    /// # Errors
    ///
    /// Fails when the input is rejected as a raw fieldless runtime-internal input, when
    /// the session authority cannot be obtained, when the mutator refuses the input, or
    /// when dispatch fails. In the dispatch case no rollback is attempted here: the
    /// error is returned after the transition has already been applied.
    pub(super) async fn apply_session_dsl_input_with_dispatch_failure(
        &self,
        session_id: &SessionId,
        input: dsl::MeerkatMachineInput,
        context: &str,
        dispatch_failure: CommittedEffectDispatchFailure,
    ) -> Result<(dsl::MeerkatMachineAuthoritySnapshot, DslTransitionEffects), String> {
        Self::reject_raw_fieldless_runtime_internal_dsl_input(&input)?;
        let session_authority = self.session_dsl_authority(session_id).await?;

        // The guard lives only inside this block so it is released before the
        // `.await` on dispatch below.
        let (previous_snapshot, effects) = {
            let mut guard = session_authority
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            let before = guard.snapshot();
            let transition = dsl::MeerkatMachineMutator::apply(&mut *guard, input)
                .map_err(|err| dsl_authority::map_error(err, context))?;
            (before, DslTransitionEffects::new(transition.into_effects()))
        };

        if let Err(error) = self.dispatch_routed_signals_from_effects(&effects).await {
            // Irrefutable pattern: this stops compiling if the policy enum gains
            // another variant, forcing this branch to be revisited.
            let CommittedEffectDispatchFailure::PreserveCommittedDslState = dispatch_failure;
            let message =
                format!("DSL authority ({context}): committed effect dispatch failed: {error}");
            return Err(message);
        }

        Ok((previous_snapshot, effects))
    }

    /// Same flow as `apply_session_dsl_input_with_dispatch_failure`, but every failure
    /// is reported as a `dsl_authority::DslTransitionRefusal` instead of a `String`.
    ///
    /// Returns the pre-transition snapshot together with the transition's effects.
    ///
    /// # Errors
    ///
    /// - `routed_raw_internal_input_rejected`: the raw-input check rejected `input`.
    /// - `session_authority_unavailable`: the session authority lookup failed.
    /// - the refusal built by `dsl_authority::refusal` when the mutator refuses the input.
    /// - `committed_effect_dispatch_failed`: dispatch failed after the transition was applied.
    pub(super) async fn apply_routed_session_dsl_input(
        &self,
        session_id: &SessionId,
        input: dsl::MeerkatMachineInput,
        context: &str,
    ) -> Result<
        (dsl::MeerkatMachineAuthoritySnapshot, DslTransitionEffects),
        dsl_authority::DslTransitionRefusal,
    > {
        Self::reject_raw_fieldless_runtime_internal_dsl_input(&input).map_err(|reason| {
            dsl_authority::DslTransitionRefusal::other("routed_raw_internal_input_rejected", reason)
        })?;

        let session_authority = match self.session_dsl_authority(session_id).await {
            Ok(found) => found,
            Err(reason) => {
                return Err(dsl_authority::DslTransitionRefusal::other(
                    "session_authority_unavailable",
                    reason,
                ));
            }
        };

        // The guard lives only inside this block so it is released before the
        // `.await` on dispatch below.
        let (previous_snapshot, effects) = {
            let mut guard = session_authority
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            let before = guard.snapshot();
            let transition = dsl::MeerkatMachineMutator::apply(&mut *guard, input)
                .map_err(|err| dsl_authority::refusal(err, context))?;
            (before, DslTransitionEffects::new(transition.into_effects()))
        };

        if let Err(error) = self.dispatch_routed_signals_from_effects(&effects).await {
            let detail =
                format!("DSL authority ({context}): committed effect dispatch failed: {error}");
            return Err(dsl_authority::DslTransitionRefusal::other(
                "committed_effect_dispatch_failed",
                detail,
            ));
        }

        Ok((previous_snapshot, effects))
    }
}