#![allow(
clippy::unwrap_used,
clippy::expect_used,
clippy::panic,
clippy::implicit_clone,
clippy::unnecessary_cast,
clippy::redundant_clone
)]
pub use crate::ids::{BranchId, FlowNodeId, FrameId, LoopInstanceId};
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum DependencyMode {
All,
Any,
}
impl DependencyMode {
pub fn as_str(&self) -> &'static str {
match self {
Self::All => "All",
Self::Any => "Any",
}
}
}
impl std::fmt::Display for DependencyMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum FlowNodeKind {
Step,
Loop,
}
impl FlowNodeKind {
pub fn as_str(&self) -> &'static str {
match self {
Self::Step => "Step",
Self::Loop => "Loop",
}
}
}
impl std::fmt::Display for FlowNodeKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum FrameScope {
Root,
Body,
}
impl FrameScope {
pub fn as_str(&self) -> &'static str {
match self {
Self::Root => "Root",
Self::Body => "Body",
}
}
}
impl std::fmt::Display for FrameScope {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum NodeRunStatus {
Pending,
Ready,
Running,
Completed,
Failed,
Skipped,
Canceled,
}
impl NodeRunStatus {
pub fn as_str(&self) -> &'static str {
match self {
Self::Pending => "Pending",
Self::Ready => "Ready",
Self::Running => "Running",
Self::Completed => "Completed",
Self::Failed => "Failed",
Self::Skipped => "Skipped",
Self::Canceled => "Canceled",
}
}
}
impl std::fmt::Display for NodeRunStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum FrameTerminalStatus {
Completed,
Failed,
Canceled,
}
impl FrameTerminalStatus {
pub fn as_str(&self) -> &'static str {
match self {
Self::Completed => "Completed",
Self::Failed => "Failed",
Self::Canceled => "Canceled",
}
}
}
impl std::fmt::Display for FrameTerminalStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}
pub trait Context {}
pub struct EmptyContext;
impl Context for EmptyContext {}
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum Phase {
Absent,
Running,
Completed,
Failed,
Canceled,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct State {
pub phase: Phase,
pub frame_id: FrameId,
pub frame_scope: FrameScope,
pub loop_instance_id: LoopInstanceId,
pub iteration: u32,
pub last_admitted_node: FlowNodeId,
pub tracked_nodes: std::collections::BTreeSet<FlowNodeId>,
pub ordered_nodes: Vec<FlowNodeId>,
pub node_kind: std::collections::BTreeMap<FlowNodeId, FlowNodeKind>,
pub node_dependencies: std::collections::BTreeMap<FlowNodeId, Vec<FlowNodeId>>,
pub node_dependency_modes: std::collections::BTreeMap<FlowNodeId, DependencyMode>,
pub node_branches: std::collections::BTreeMap<FlowNodeId, Option<BranchId>>,
pub branch_winners: std::collections::BTreeSet<BranchId>,
pub node_status: std::collections::BTreeMap<FlowNodeId, NodeRunStatus>,
pub ready_queue: Vec<FlowNodeId>,
pub output_recorded: std::collections::BTreeMap<FlowNodeId, bool>,
pub node_condition_results: std::collections::BTreeMap<FlowNodeId, Option<bool>>,
}
impl Default for State {
fn default() -> Self {
initial_state()
}
}
pub(crate) mod inputs {
use super::*;
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct StartRootFrame {
pub frame_id: FrameId,
pub tracked_nodes: std::collections::BTreeSet<FlowNodeId>,
pub ordered_nodes: Vec<FlowNodeId>,
pub node_kind: std::collections::BTreeMap<FlowNodeId, FlowNodeKind>,
pub node_dependencies: std::collections::BTreeMap<FlowNodeId, Vec<FlowNodeId>>,
pub node_dependency_modes: std::collections::BTreeMap<FlowNodeId, DependencyMode>,
pub node_branches: std::collections::BTreeMap<FlowNodeId, Option<BranchId>>,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct StartBodyFrame {
pub frame_id: FrameId,
pub loop_instance_id: LoopInstanceId,
pub iteration: u32,
pub tracked_nodes: std::collections::BTreeSet<FlowNodeId>,
pub ordered_nodes: Vec<FlowNodeId>,
pub node_kind: std::collections::BTreeMap<FlowNodeId, FlowNodeKind>,
pub node_dependencies: std::collections::BTreeMap<FlowNodeId, Vec<FlowNodeId>>,
pub node_dependency_modes: std::collections::BTreeMap<FlowNodeId, DependencyMode>,
pub node_branches: std::collections::BTreeMap<FlowNodeId, Option<BranchId>>,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct AdmitNextReadyNode {
pub node_id: FlowNodeId,
pub ready_queue: Vec<FlowNodeId>,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct CompleteNode {
pub node_id: FlowNodeId,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct RecordNodeOutput {
pub node_id: FlowNodeId,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct FailNode {
pub node_id: FlowNodeId,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SkipNode {
pub node_id: FlowNodeId,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct CancelNode {
pub node_id: FlowNodeId,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SealFrame {
pub terminal_status: FrameTerminalStatus,
}
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub(crate) enum Input {
StartRootFrame(inputs::StartRootFrame),
StartBodyFrame(inputs::StartBodyFrame),
AdmitNextReadyNode(inputs::AdmitNextReadyNode),
CompleteNode(inputs::CompleteNode),
RecordNodeOutput(inputs::RecordNodeOutput),
FailNode(inputs::FailNode),
SkipNode(inputs::SkipNode),
CancelNode(inputs::CancelNode),
SealFrame(inputs::SealFrame),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub(crate) enum InputKind {
StartRootFrame,
StartBodyFrame,
AdmitNextReadyNode,
CompleteNode,
RecordNodeOutput,
FailNode,
SkipNode,
CancelNode,
SealFrame,
}
pub mod effects {
use super::*;
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ReadyFrontierChanged {
pub frame_id: FrameId,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct AdmitStepWork {
pub frame_id: FrameId,
pub node_id: FlowNodeId,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct StartLoopNode {
pub frame_id: FrameId,
pub node_id: FlowNodeId,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PersistStepOutput {
pub frame_id: FrameId,
pub node_id: FlowNodeId,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct NodeExecutionReleased {
pub frame_id: FrameId,
pub node_id: FlowNodeId,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct RootFrameCompleted {
pub frame_id: FrameId,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct RootFrameFailed {
pub frame_id: FrameId,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct RootFrameCanceled {
pub frame_id: FrameId,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct BodyFrameCompleted {
pub frame_id: FrameId,
pub loop_instance_id: LoopInstanceId,
pub iteration: u32,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct BodyFrameFailed {
pub frame_id: FrameId,
pub loop_instance_id: LoopInstanceId,
pub iteration: u32,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct BodyFrameCanceled {
pub frame_id: FrameId,
pub loop_instance_id: LoopInstanceId,
pub iteration: u32,
}
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum Effect {
ReadyFrontierChanged(effects::ReadyFrontierChanged),
AdmitStepWork(effects::AdmitStepWork),
StartLoopNode(effects::StartLoopNode),
PersistStepOutput(effects::PersistStepOutput),
NodeExecutionReleased(effects::NodeExecutionReleased),
RootFrameCompleted(effects::RootFrameCompleted),
RootFrameFailed(effects::RootFrameFailed),
RootFrameCanceled(effects::RootFrameCanceled),
BodyFrameCompleted(effects::BodyFrameCompleted),
BodyFrameFailed(effects::BodyFrameFailed),
BodyFrameCanceled(effects::BodyFrameCanceled),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum EffectKind {
ReadyFrontierChanged,
AdmitStepWork,
StartLoopNode,
PersistStepOutput,
NodeExecutionReleased,
RootFrameCompleted,
RootFrameFailed,
RootFrameCanceled,
BodyFrameCompleted,
BodyFrameFailed,
BodyFrameCanceled,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TransitionId {
StartRootFrame,
StartBodyFrame,
AdmitNextReadyNode,
CompleteNode,
RecordNodeOutput,
FailNode,
SkipNode,
CancelNode,
SealFrame,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum GuardId {
Phase,
NodeExists,
NodeRunning,
ReadyFrontier,
Sealable,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum HelperId {
None,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct Outcome {
pub transition_id: TransitionId,
pub next_state: State,
pub effects: Vec<Effect>,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub(crate) enum TransitionError {
Refusal(TransitionRefusal),
Kernel(KernelError),
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub(crate) enum TransitionRefusal {
NoMatchingTransition {
phase: Phase,
trigger: TriggerDiscriminant,
},
GuardRejected {
rejections: Vec<GuardRejection>,
},
AmbiguousTransition {
transitions: Vec<TransitionId>,
},
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub(crate) enum TriggerDiscriminant {
Input(InputKind),
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct GuardRejection {
pub transition_id: TransitionId,
pub guard_id: GuardId,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum KernelError {
ContextViolation {
transition_id: TransitionId,
detail: String,
},
HelperEvaluation {
helper_id: HelperId,
detail: String,
},
CodegenInvariant {
detail: String,
},
}
pub mod helpers {
use super::*;
pub fn none<C: Context>(_: &State, context: &C) -> Result<(), KernelError> {
let _ = context;
Ok(())
}
}
pub fn initial_state() -> State {
State {
phase: Phase::Absent,
frame_id: FrameId::from(String::new()),
frame_scope: FrameScope::Root,
loop_instance_id: LoopInstanceId::from(String::new()),
iteration: 0,
last_admitted_node: FlowNodeId::from(String::new()),
tracked_nodes: Default::default(),
ordered_nodes: Vec::new(),
node_kind: Default::default(),
node_dependencies: Default::default(),
node_dependency_modes: Default::default(),
node_branches: Default::default(),
branch_winners: Default::default(),
node_status: Default::default(),
ready_queue: Vec::new(),
output_recorded: Default::default(),
node_condition_results: Default::default(),
}
}
fn refusal(phase: &Phase, kind: InputKind) -> TransitionError {
TransitionError::Refusal(TransitionRefusal::NoMatchingTransition {
phase: phase.clone(),
trigger: TriggerDiscriminant::Input(kind),
})
}
fn guard(transition_id: TransitionId, guard_id: GuardId) -> TransitionError {
TransitionError::Refusal(TransitionRefusal::GuardRejected {
rejections: vec![GuardRejection {
transition_id,
guard_id,
}],
})
}
#[cfg(test)]
fn is_terminal(status: NodeRunStatus) -> bool {
matches!(
status,
NodeRunStatus::Completed
| NodeRunStatus::Failed
| NodeRunStatus::Skipped
| NodeRunStatus::Canceled
)
}
#[cfg(test)]
fn terminal_status_from_node_projection(state: &State) -> Option<FrameTerminalStatus> {
if !state.ordered_nodes.iter().all(|node_id| {
state
.node_status
.get(node_id)
.copied()
.is_some_and(is_terminal)
}) {
return None;
}
if state
.node_status
.values()
.copied()
.any(|status| status == NodeRunStatus::Failed)
{
Some(FrameTerminalStatus::Failed)
} else if state
.node_status
.values()
.copied()
.any(|status| status == NodeRunStatus::Canceled)
{
Some(FrameTerminalStatus::Canceled)
} else {
Some(FrameTerminalStatus::Completed)
}
}
#[cfg(test)]
fn dependency_satisfied(status: NodeRunStatus) -> bool {
matches!(status, NodeRunStatus::Completed)
}
#[cfg(test)]
fn dependency_terminal_failure(status: NodeRunStatus) -> bool {
matches!(
status,
NodeRunStatus::Failed | NodeRunStatus::Skipped | NodeRunStatus::Canceled
)
}
#[cfg(test)]
fn refresh_ready_frontier(state: &mut State) {
state.ready_queue.clear();
for node_id in state.ordered_nodes.clone() {
let Some(status) = state.node_status.get(&node_id).copied() else {
continue;
};
if status == NodeRunStatus::Ready {
state.ready_queue.push(node_id);
continue;
}
if status != NodeRunStatus::Pending {
continue;
}
if let Some(branch) = state.node_branches.get(&node_id).and_then(|b| b.as_ref())
&& state.branch_winners.contains(branch)
{
continue;
}
let deps = state
.node_dependencies
.get(&node_id)
.cloned()
.unwrap_or_default();
let dep_mode = state
.node_dependency_modes
.get(&node_id)
.copied()
.unwrap_or(DependencyMode::All);
let dep_ok = if deps.is_empty() {
true
} else {
match dep_mode {
DependencyMode::All => deps.iter().all(|dep| {
state
.node_status
.get(dep)
.copied()
.is_some_and(dependency_satisfied)
}),
DependencyMode::Any => deps.iter().any(|dep| {
state
.node_status
.get(dep)
.copied()
.is_some_and(dependency_satisfied)
}),
}
};
if dep_ok {
state
.node_status
.insert(node_id.clone(), NodeRunStatus::Ready);
state.ready_queue.push(node_id);
}
}
}
#[cfg(test)]
fn propagate_blocked_nodes(state: &mut State) {
loop {
let mut changed = false;
for node_id in state.ordered_nodes.clone() {
let Some(status) = state.node_status.get(&node_id).copied() else {
continue;
};
if status != NodeRunStatus::Pending && status != NodeRunStatus::Ready {
continue;
}
let deps = state
.node_dependencies
.get(&node_id)
.cloned()
.unwrap_or_default();
if deps.is_empty() {
continue;
}
let dep_mode = state
.node_dependency_modes
.get(&node_id)
.copied()
.unwrap_or(DependencyMode::All);
let should_skip = match dep_mode {
DependencyMode::All => deps.iter().any(|dep| {
state
.node_status
.get(dep)
.copied()
.is_some_and(dependency_terminal_failure)
}),
DependencyMode::Any => deps.iter().all(|dep| {
state.node_status.get(dep).copied().is_some_and(|status| {
status == NodeRunStatus::Skipped || dependency_terminal_failure(status)
})
}),
};
if should_skip {
state
.node_status
.insert(node_id.clone(), NodeRunStatus::Skipped);
state.ready_queue.retain(|candidate| candidate != &node_id);
changed = true;
}
}
if !changed {
break;
}
}
}
#[cfg(test)]
fn apply_branch_winner(state: &mut State, winner_node: &FlowNodeId) {
let Some(branch) = state
.node_branches
.get(winner_node)
.and_then(std::clone::Clone::clone)
else {
return;
};
state.branch_winners.insert(branch.clone());
for node_id in state.ordered_nodes.clone() {
if node_id == *winner_node {
continue;
}
if state
.node_branches
.get(&node_id)
.and_then(|candidate| candidate.as_ref())
!= Some(&branch)
{
continue;
}
if let Some(status) = state.node_status.get(&node_id).copied()
&& !is_terminal(status)
&& status != NodeRunStatus::Running
{
state.node_status.insert(node_id, NodeRunStatus::Skipped);
}
}
}
#[cfg(test)]
#[allow(clippy::too_many_arguments)]
fn start_frame_state(
frame_id: FrameId,
frame_scope: FrameScope,
loop_instance_id: LoopInstanceId,
iteration: u32,
tracked_nodes: std::collections::BTreeSet<FlowNodeId>,
ordered_nodes: Vec<FlowNodeId>,
node_kind: std::collections::BTreeMap<FlowNodeId, FlowNodeKind>,
node_dependencies: std::collections::BTreeMap<FlowNodeId, Vec<FlowNodeId>>,
node_dependency_modes: std::collections::BTreeMap<FlowNodeId, DependencyMode>,
node_branches: std::collections::BTreeMap<FlowNodeId, Option<BranchId>>,
) -> State {
let mut state = State {
phase: Phase::Running,
frame_id,
frame_scope,
loop_instance_id,
iteration,
last_admitted_node: FlowNodeId::from(String::new()),
tracked_nodes,
ordered_nodes,
node_kind,
node_dependencies,
node_dependency_modes,
node_branches,
branch_winners: Default::default(),
node_status: Default::default(),
ready_queue: Vec::new(),
output_recorded: Default::default(),
node_condition_results: Default::default(),
};
for node_id in state.ordered_nodes.clone() {
state
.node_status
.insert(node_id.clone(), NodeRunStatus::Pending);
state.output_recorded.insert(node_id.clone(), false);
state.node_condition_results.insert(node_id, None);
}
refresh_ready_frontier(&mut state);
state
}
#[cfg(test)]
fn admit_next_ready_node(mut state: State) -> Result<Outcome, TransitionError> {
if state.phase != Phase::Running {
return Err(refusal(&state.phase, InputKind::AdmitNextReadyNode));
}
refresh_ready_frontier(&mut state);
let Some(node_id) = state.ready_queue.first().cloned() else {
return Err(guard(
TransitionId::AdmitNextReadyNode,
GuardId::ReadyFrontier,
));
};
state.ready_queue.retain(|candidate| candidate != &node_id);
state.last_admitted_node = node_id.clone();
state
.node_status
.insert(node_id.clone(), NodeRunStatus::Running);
let effect = match state.node_kind.get(&node_id).copied() {
Some(FlowNodeKind::Step) => Effect::AdmitStepWork(effects::AdmitStepWork {
frame_id: state.frame_id.clone(),
node_id,
}),
Some(FlowNodeKind::Loop) => Effect::StartLoopNode(effects::StartLoopNode {
frame_id: state.frame_id.clone(),
node_id,
}),
None => return Err(guard(TransitionId::AdmitNextReadyNode, GuardId::NodeExists)),
};
Ok(Outcome {
transition_id: TransitionId::AdmitNextReadyNode,
next_state: state,
effects: vec![effect],
})
}
#[cfg(test)]
fn apply_node_terminal(
state: &State,
node_id: FlowNodeId,
status: NodeRunStatus,
transition_id: TransitionId,
) -> Result<Outcome, TransitionError> {
if state.phase != Phase::Running {
return Err(refusal(
&state.phase,
match transition_id {
TransitionId::CompleteNode => InputKind::CompleteNode,
TransitionId::FailNode => InputKind::FailNode,
TransitionId::SkipNode => InputKind::SkipNode,
TransitionId::CancelNode => InputKind::CancelNode,
_ => unreachable!(),
},
));
}
let Some(current) = state.node_status.get(&node_id).copied() else {
return Err(guard(transition_id, GuardId::NodeExists));
};
if current != NodeRunStatus::Running {
return Err(guard(transition_id, GuardId::NodeRunning));
}
let mut next_state = state.clone();
next_state.node_status.insert(node_id.clone(), status);
if status == NodeRunStatus::Completed {
apply_branch_winner(&mut next_state, &node_id);
}
refresh_ready_frontier(&mut next_state);
propagate_blocked_nodes(&mut next_state);
refresh_ready_frontier(&mut next_state);
Ok(Outcome {
transition_id,
next_state,
effects: vec![Effect::NodeExecutionReleased(
effects::NodeExecutionReleased {
frame_id: state.frame_id.clone(),
node_id,
},
)],
})
}
#[cfg(test)]
fn seal_frame(state: &State, payload: inputs::SealFrame) -> Result<Outcome, TransitionError> {
if state.phase != Phase::Running {
return Err(refusal(&state.phase, InputKind::SealFrame));
}
if terminal_status_from_node_projection(state) != Some(payload.terminal_status) {
return Err(guard(TransitionId::SealFrame, GuardId::Sealable));
}
let mut next_state = state.clone();
let effect = match payload.terminal_status {
FrameTerminalStatus::Failed => {
next_state.phase = Phase::Failed;
match state.frame_scope {
FrameScope::Root => Effect::RootFrameFailed(effects::RootFrameFailed {
frame_id: state.frame_id.clone(),
}),
FrameScope::Body => Effect::BodyFrameFailed(effects::BodyFrameFailed {
frame_id: state.frame_id.clone(),
loop_instance_id: state.loop_instance_id.clone(),
iteration: state.iteration,
}),
}
}
FrameTerminalStatus::Canceled => {
next_state.phase = Phase::Canceled;
match state.frame_scope {
FrameScope::Root => Effect::RootFrameCanceled(effects::RootFrameCanceled {
frame_id: state.frame_id.clone(),
}),
FrameScope::Body => Effect::BodyFrameCanceled(effects::BodyFrameCanceled {
frame_id: state.frame_id.clone(),
loop_instance_id: state.loop_instance_id.clone(),
iteration: state.iteration,
}),
}
}
FrameTerminalStatus::Completed => {
next_state.phase = Phase::Completed;
match state.frame_scope {
FrameScope::Root => Effect::RootFrameCompleted(effects::RootFrameCompleted {
frame_id: state.frame_id.clone(),
}),
FrameScope::Body => Effect::BodyFrameCompleted(effects::BodyFrameCompleted {
frame_id: state.frame_id.clone(),
loop_instance_id: state.loop_instance_id.clone(),
iteration: state.iteration,
}),
}
}
};
Ok(Outcome {
transition_id: TransitionId::SealFrame,
next_state,
effects: vec![effect],
})
}
#[cfg(test)]
pub(crate) fn transition<C: Context>(
state: &State,
input: Input,
context: &C,
) -> Result<Outcome, TransitionError> {
let _ = context;
match input {
Input::StartRootFrame(payload) => {
if state.phase != Phase::Absent {
return Err(refusal(&state.phase, InputKind::StartRootFrame));
}
Ok(Outcome {
transition_id: TransitionId::StartRootFrame,
next_state: start_frame_state(
payload.frame_id,
FrameScope::Root,
LoopInstanceId::from(String::new()),
0,
payload.tracked_nodes,
payload.ordered_nodes,
payload.node_kind,
payload.node_dependencies,
payload.node_dependency_modes,
payload.node_branches,
),
effects: Vec::new(),
})
}
Input::StartBodyFrame(payload) => {
if state.phase != Phase::Absent {
return Err(refusal(&state.phase, InputKind::StartBodyFrame));
}
Ok(Outcome {
transition_id: TransitionId::StartBodyFrame,
next_state: start_frame_state(
payload.frame_id,
FrameScope::Body,
payload.loop_instance_id,
payload.iteration,
payload.tracked_nodes,
payload.ordered_nodes,
payload.node_kind,
payload.node_dependencies,
payload.node_dependency_modes,
payload.node_branches,
),
effects: Vec::new(),
})
}
Input::AdmitNextReadyNode(_) => admit_next_ready_node(state.clone()),
Input::CompleteNode(payload) => apply_node_terminal(
state,
payload.node_id,
NodeRunStatus::Completed,
TransitionId::CompleteNode,
),
Input::RecordNodeOutput(payload) => {
if state.phase != Phase::Running {
return Err(refusal(&state.phase, InputKind::RecordNodeOutput));
}
let mut next_state = state.clone();
if !next_state.node_status.contains_key(&payload.node_id) {
return Err(guard(TransitionId::RecordNodeOutput, GuardId::NodeExists));
}
next_state
.output_recorded
.insert(payload.node_id.clone(), true);
Ok(Outcome {
transition_id: TransitionId::RecordNodeOutput,
effects: vec![Effect::PersistStepOutput(effects::PersistStepOutput {
frame_id: state.frame_id.clone(),
node_id: payload.node_id,
})],
next_state,
})
}
Input::FailNode(payload) => apply_node_terminal(
state,
payload.node_id,
NodeRunStatus::Failed,
TransitionId::FailNode,
),
Input::SkipNode(payload) => apply_node_terminal(
state,
payload.node_id,
NodeRunStatus::Skipped,
TransitionId::SkipNode,
),
Input::CancelNode(payload) => apply_node_terminal(
state,
payload.node_id,
NodeRunStatus::Canceled,
TransitionId::CancelNode,
),
Input::SealFrame(payload) => seal_frame(state, payload),
}
}