Skip to main content

ergo_runtime/runtime/
execute.rs

1//! runtime/execute.rs — Kernel graph execution engine
2//!
3//! Purpose:
4//! - Executes a validated expanded graph against registered primitive
5//!   implementations (sources, computes, triggers, actions) within a
6//!   supplied `ExecutionContext`.
7//!
8//! Owns:
9//! - Topological execution ordering of nodes
10//! - Value propagation along wired edges
11//! - Type conversion between runtime value domains (trigger ↔ action)
12//! - Context key injection for source nodes
13//! - Parameter binding for compute and trigger nodes
14//! - Action skip/invoke gating (R.7)
15//! - Non-finite numeric output rejection (NUM-FINITE-1)
16//!
17//! Does not own:
18//! - Graph validation (see `validate.rs`)
19//! - Primitive registration or catalog ownership (see `catalog.rs`)
20//! - Episode lifecycle or replay (see `ergo-supervisor`)
21//!
22//! Connects to:
23//! - `validate.rs` — receives `ValidatedGraph` after validation
24//! - `types.rs` — uses `ExecError`, `ExecutionReport`, `Registries`
25//! - `catalog.rs` — queries primitive metadata for parameter binding
26//!
27//! Safety notes:
28//! - All execution paths must remain deterministic for replay integrity
29//! - R.7: NotEmitted triggers must be caught by should_skip_action
30//!   before reaching action value conversion; violation returns
31//!   `ExecError::ActionSkipViolation` instead of panicking
32
33use std::collections::HashMap;
34
35use crate::action::{ActionOutcome, ActionValue};
36use crate::cluster::{PrimitiveKind, ValueType};
37use crate::common::{derive_intent_id, ActionEffect, EffectWrite, IntentField, IntentRecord};
38use crate::trigger::{TriggerEvent, TriggerValue};
39
40use super::types::{
41    Endpoint, ExecError, ExecutionContext, ExecutionReport, Registries, RuntimeEvent, RuntimeValue,
42    ValidatedEdge, ValidatedGraph, ValidatedNode,
43};
44
45pub fn execute(
46    graph: &ValidatedGraph,
47    registries: &Registries,
48    ctx: &ExecutionContext,
49) -> Result<ExecutionReport, ExecError> {
50    if let Some(node) = first_intent_emitting_action(graph, registries) {
51        return Err(ExecError::IntentMetadataRequired { node });
52    }
53    execute_with_metadata(graph, registries, ctx, "graph", "event")
54}
55
56pub fn execute_with_metadata(
57    graph: &ValidatedGraph,
58    registries: &Registries,
59    ctx: &ExecutionContext,
60    graph_id: &str,
61    event_id: &str,
62) -> Result<ExecutionReport, ExecError> {
63    let mut node_outputs: HashMap<String, HashMap<String, RuntimeValue>> = HashMap::new();
64    let mut effects: Vec<ActionEffect> = Vec::new();
65
66    for node_id in &graph.topo_order {
67        let node = graph
68            .nodes
69            .get(node_id)
70            .ok_or_else(|| ExecError::MissingNode {
71                node: node_id.clone(),
72            })?;
73
74        let inputs = collect_inputs(node_id, &node.inputs, &graph.edges, &node_outputs)?;
75
76        let outputs = match node.kind {
77            PrimitiveKind::Source => execute_source(node, inputs, registries, ctx)?,
78            PrimitiveKind::Compute => execute_compute(node, inputs, registries)?,
79            PrimitiveKind::Trigger => execute_trigger(node, inputs, registries)?,
80            PrimitiveKind::Action => {
81                // R.7: Actions execute only when all trigger event inputs are Emitted.
82                // If any Event input is TriggerEvent::NotEmitted, skip execution.
83                if should_skip_action(&inputs) {
84                    produce_skipped_outputs(node)
85                } else {
86                    let (action_outputs, action_effects) =
87                        execute_action(node, inputs, registries, graph_id, event_id)?;
88                    effects.extend(action_effects);
89                    action_outputs
90                }
91            }
92        };
93
94        node_outputs.insert(node_id.clone(), outputs);
95    }
96
97    let mut outputs: HashMap<String, RuntimeValue> = HashMap::new();
98    for out in &graph.boundary_outputs {
99        if let Some(node_outs) = node_outputs.get(&out.maps_to.node_id) {
100            if let Some(val) = node_outs.get(&out.maps_to.port_name) {
101                outputs.insert(out.name.clone(), val.clone());
102            } else {
103                return Err(ExecError::MissingOutput {
104                    node: out.maps_to.node_id.clone(),
105                    output: out.maps_to.port_name.clone(),
106                });
107            }
108        } else {
109            return Err(ExecError::MissingOutput {
110                node: out.maps_to.node_id.clone(),
111                output: out.maps_to.port_name.clone(),
112            });
113        }
114    }
115
116    Ok(ExecutionReport { outputs, effects })
117}
118
119fn collect_inputs(
120    target: &str,
121    input_specs: &[crate::cluster::InputMetadata],
122    edges: &[ValidatedEdge],
123    node_outputs: &HashMap<String, HashMap<String, RuntimeValue>>,
124) -> Result<HashMap<String, RuntimeValue>, ExecError> {
125    let mut inputs: HashMap<String, RuntimeValue> = HashMap::new();
126
127    for edge in edges {
128        let Endpoint::NodePort {
129            node_id: to_node,
130            port_name: to_port,
131        } = &edge.to;
132        if to_node == target {
133            let Endpoint::NodePort {
134                node_id: from,
135                port_name: from_port,
136            } = &edge.from;
137            let outs = node_outputs
138                .get(from)
139                .ok_or_else(|| ExecError::MissingOutput {
140                    node: from.clone(),
141                    output: from_port.clone(),
142                })?;
143            let val = outs
144                .get(from_port)
145                .ok_or_else(|| ExecError::MissingOutput {
146                    node: from.clone(),
147                    output: from_port.clone(),
148                })?;
149            inputs.insert(to_port.clone(), val.clone());
150        }
151    }
152
153    // fill missing required
154    for spec in input_specs {
155        if spec.required && !inputs.contains_key(&spec.name) {
156            return Err(ExecError::MissingOutput {
157                node: target.to_string(),
158                output: spec.name.clone(),
159            });
160        }
161    }
162
163    Ok(inputs)
164}
165
166fn execute_source(
167    node: &ValidatedNode,
168    _inputs: HashMap<String, RuntimeValue>,
169    registries: &Registries,
170    ctx: &ExecutionContext,
171) -> Result<HashMap<String, RuntimeValue>, ExecError> {
172    let primitive =
173        registries
174            .sources
175            .get(&node.impl_id)
176            .ok_or_else(|| ExecError::UnknownPrimitive {
177                id: node.impl_id.clone(),
178                version: node.version.clone(),
179            })?;
180
181    let manifest = primitive.manifest();
182    let manifest_name_parameters = source_manifest_name_parameters(node, manifest);
183    for req in &manifest.requires.context {
184        // Resolve $key bindings before the required check so optional
185        // parameter-bound keys are still resolved.
186        let resolved_name =
187            crate::common::resolve_manifest_name(&req.name, &manifest_name_parameters).map_err(
188                |_| ExecError::MissingRequiredContextKey {
189                    node: node.runtime_id.clone(),
190                    key: req.name.clone(),
191                },
192            )?;
193        if !req.required {
194            continue;
195        }
196        match ctx.value(&resolved_name) {
197            None => {
198                return Err(ExecError::MissingRequiredContextKey {
199                    node: node.runtime_id.clone(),
200                    key: resolved_name,
201                });
202            }
203            Some(val) => {
204                if val.value_type() != req.ty {
205                    return Err(ExecError::ContextKeyTypeMismatch {
206                        node: node.runtime_id.clone(),
207                        key: resolved_name,
208                        expected: req.ty.clone(),
209                        got: val.value_type(),
210                    });
211                }
212            }
213        }
214    }
215
216    let mut mapped_parameters: HashMap<String, crate::source::ParameterValue> = HashMap::new();
217    for (name, val) in &node.parameters {
218        let mapped = map_to_source_parameter_value(val).ok_or_else(|| {
219            ExecError::ParameterTypeConversionFailed {
220                node: node.runtime_id.clone(),
221                parameter: name.clone(),
222            }
223        })?;
224        mapped_parameters.insert(name.clone(), mapped);
225    }
226
227    let outputs = primitive.produce(&mapped_parameters, ctx);
228    ensure_finite(&node.runtime_id, &outputs)?;
229    Ok(outputs
230        .into_iter()
231        .map(|(k, v)| (k, map_common_value(v)))
232        .collect())
233}
234
235fn execute_compute(
236    node: &ValidatedNode,
237    inputs: HashMap<String, RuntimeValue>,
238    registries: &Registries,
239) -> Result<HashMap<String, RuntimeValue>, ExecError> {
240    let primitive =
241        registries
242            .computes
243            .get(&node.impl_id)
244            .ok_or_else(|| ExecError::UnknownPrimitive {
245                id: node.impl_id.clone(),
246                version: node.version.clone(),
247            })?;
248
249    let mut mapped_inputs: HashMap<String, crate::common::Value> = HashMap::new();
250    for (name, val) in inputs {
251        let mapped = map_to_compute_value(&val).ok_or_else(|| ExecError::TypeConversionFailed {
252            node: node.runtime_id.clone(),
253            port: name.clone(),
254        })?;
255        mapped_inputs.insert(name, mapped);
256    }
257
258    let mut mapped_parameters: HashMap<String, crate::common::Value> = HashMap::new();
259    for (name, val) in &node.parameters {
260        let mapped = map_to_compute_parameter_value(val).ok_or_else(|| {
261            // X.11: If Int conversion failed, it's out of range; otherwise type mismatch
262            if let crate::cluster::ParameterValue::Int(i) = val {
263                ExecError::ParameterOutOfRange {
264                    node: node.runtime_id.clone(),
265                    parameter: name.clone(),
266                    value: *i,
267                }
268            } else {
269                ExecError::ParameterTypeConversionFailed {
270                    node: node.runtime_id.clone(),
271                    parameter: name.clone(),
272                }
273            }
274        })?;
275        mapped_parameters.insert(name.clone(), mapped);
276    }
277
278    let outputs = primitive
279        .compute(&mapped_inputs, &mapped_parameters, None)
280        .map_err(|error| ExecError::ComputeFailed {
281            node: node.runtime_id.clone(),
282            id: node.impl_id.clone(),
283            version: node.version.clone(),
284            error,
285        })?;
286    for output_name in node.outputs.keys() {
287        if !outputs.contains_key(output_name) {
288            return Err(ExecError::MissingOutput {
289                node: node.runtime_id.clone(),
290                output: output_name.clone(),
291            });
292        }
293    }
294    ensure_finite(&node.runtime_id, &outputs)?;
295    Ok(outputs
296        .into_iter()
297        .map(|(k, v)| (k, map_common_value(v)))
298        .collect())
299}
300
301fn execute_trigger(
302    node: &ValidatedNode,
303    inputs: HashMap<String, RuntimeValue>,
304    registries: &Registries,
305) -> Result<HashMap<String, RuntimeValue>, ExecError> {
306    let primitive =
307        registries
308            .triggers
309            .get(&node.impl_id)
310            .ok_or_else(|| ExecError::UnknownPrimitive {
311                id: node.impl_id.clone(),
312                version: node.version.clone(),
313            })?;
314
315    let mut mapped_inputs: HashMap<String, TriggerValue> = HashMap::new();
316    for (name, val) in inputs {
317        let mapped = map_to_trigger_value(&val).ok_or_else(|| ExecError::TypeConversionFailed {
318            node: node.runtime_id.clone(),
319            port: name.clone(),
320        })?;
321        mapped_inputs.insert(name, mapped);
322    }
323
324    let mut mapped_parameters: HashMap<String, crate::trigger::ParameterValue> = HashMap::new();
325    for (name, val) in &node.parameters {
326        let mapped = map_to_trigger_parameter_value(val).ok_or_else(|| {
327            ExecError::ParameterTypeConversionFailed {
328                node: node.runtime_id.clone(),
329                parameter: name.clone(),
330            }
331        })?;
332        mapped_parameters.insert(name.clone(), mapped);
333    }
334
335    let outputs = primitive.evaluate(&mapped_inputs, &mapped_parameters);
336    Ok(outputs
337        .into_iter()
338        .map(|(k, v)| (k, map_trigger_value(v)))
339        .collect())
340}
341
342fn execute_action(
343    node: &ValidatedNode,
344    inputs: HashMap<String, RuntimeValue>,
345    registries: &Registries,
346    graph_id: &str,
347    event_id: &str,
348) -> Result<(HashMap<String, RuntimeValue>, Vec<ActionEffect>), ExecError> {
349    let primitive =
350        registries
351            .actions
352            .get(&node.impl_id)
353            .ok_or_else(|| ExecError::UnknownPrimitive {
354                id: node.impl_id.clone(),
355                version: node.version.clone(),
356            })?;
357
358    let mut mapped_inputs: HashMap<String, ActionValue> = HashMap::new();
359    for (name, val) in &inputs {
360        let mapped = map_to_action_value(val, &node.runtime_id, name)?;
361        mapped_inputs.insert(name.clone(), mapped);
362    }
363
364    let mut mapped_parameters: HashMap<String, crate::action::ParameterValue> = HashMap::new();
365    for (name, val) in &node.parameters {
366        let mapped = map_to_action_parameter_value(val).ok_or_else(|| {
367            ExecError::ParameterTypeConversionFailed {
368                node: node.runtime_id.clone(),
369                parameter: name.clone(),
370            }
371        })?;
372        mapped_parameters.insert(name.clone(), mapped);
373    }
374
375    let outputs = primitive.execute(&mapped_inputs, &mapped_parameters);
376
377    // Build effects from manifest write declarations and v1 intent declarations.
378    let manifest = primitive.manifest();
379    let mut writes = Vec::new();
380    for spec in &manifest.effects.writes {
381        // Resolve $key via Decision 2 infrastructure
382        let resolved_name = crate::common::resolve_manifest_name(&spec.name, &node.parameters)
383            .map_err(|_| ExecError::ParameterTypeConversionFailed {
384                node: node.runtime_id.clone(),
385                parameter: spec.name.clone(),
386            })?;
387
388        // Read the write value from the action input snapshot
389        let input_val = inputs
390            .get(&spec.from_input)
391            .ok_or_else(|| ExecError::MissingOutput {
392                node: node.runtime_id.clone(),
393                output: spec.from_input.clone(),
394            })?;
395
396        let value = map_runtime_value_to_common(input_val).ok_or_else(|| {
397            ExecError::TypeConversionFailed {
398                node: node.runtime_id.clone(),
399                port: spec.from_input.clone(),
400            }
401        })?;
402
403        writes.push(EffectWrite {
404            key: resolved_name,
405            value,
406        });
407    }
408
409    let mut intents_by_kind: Vec<(String, Vec<IntentRecord>)> = Vec::new();
410    for (intent_ordinal, intent_spec) in manifest.effects.intents.iter().enumerate() {
411        let mut fields = Vec::new();
412        let mut field_values_by_name = HashMap::new();
413
414        for field_spec in &intent_spec.fields {
415            let value = match (
416                field_spec.from_input.as_ref(),
417                field_spec.from_param.as_ref(),
418            ) {
419                (Some(from_input), None) => {
420                    let input_value =
421                        mapped_inputs
422                            .get(from_input)
423                            .ok_or_else(|| ExecError::MissingOutput {
424                                node: node.runtime_id.clone(),
425                                output: from_input.clone(),
426                            })?;
427                    map_action_value_to_common(input_value).ok_or_else(|| {
428                        ExecError::TypeConversionFailed {
429                            node: node.runtime_id.clone(),
430                            port: from_input.clone(),
431                        }
432                    })?
433                }
434                (None, Some(from_param)) => {
435                    let parameter_value = mapped_parameters.get(from_param).ok_or_else(|| {
436                        ExecError::ParameterTypeConversionFailed {
437                            node: node.runtime_id.clone(),
438                            parameter: from_param.clone(),
439                        }
440                    })?;
441                    map_action_parameter_value_to_common(parameter_value).ok_or_else(|| {
442                        ExecError::ParameterTypeConversionFailed {
443                            node: node.runtime_id.clone(),
444                            parameter: from_param.clone(),
445                        }
446                    })?
447                }
448                _ => {
449                    return Err(ExecError::ParameterTypeConversionFailed {
450                        node: node.runtime_id.clone(),
451                        parameter: field_spec.name.clone(),
452                    });
453                }
454            };
455
456            field_values_by_name.insert(field_spec.name.clone(), value.clone());
457            fields.push(IntentField {
458                name: field_spec.name.clone(),
459                value,
460            });
461        }
462
463        for mirror_write in &intent_spec.mirror_writes {
464            let resolved_name =
465                crate::common::resolve_manifest_name(&mirror_write.name, &node.parameters)
466                    .map_err(|_| ExecError::ParameterTypeConversionFailed {
467                        node: node.runtime_id.clone(),
468                        parameter: mirror_write.name.clone(),
469                    })?;
470            let mirrored_value = field_values_by_name
471                .get(&mirror_write.from_field)
472                .ok_or_else(|| ExecError::MissingOutput {
473                    node: node.runtime_id.clone(),
474                    output: mirror_write.from_field.clone(),
475                })?
476                .clone();
477            writes.push(EffectWrite {
478                key: resolved_name,
479                value: mirrored_value,
480            });
481        }
482
483        let intent = IntentRecord {
484            kind: intent_spec.name.clone(),
485            intent_id: derive_intent_id(
486                graph_id,
487                event_id,
488                &node.runtime_id,
489                &intent_spec.name,
490                intent_ordinal,
491            ),
492            fields,
493        };
494
495        if let Some((_, records)) = intents_by_kind
496            .iter_mut()
497            .find(|(kind, _)| kind == &intent_spec.name)
498        {
499            records.push(intent);
500        } else {
501            intents_by_kind.push((intent_spec.name.clone(), vec![intent]));
502        }
503    }
504
505    let mut effects = Vec::new();
506    if !writes.is_empty() {
507        effects.push(ActionEffect {
508            kind: "set_context".to_string(),
509            writes,
510            intents: vec![],
511        });
512    }
513
514    for (intent_kind, intents) in intents_by_kind {
515        effects.push(ActionEffect {
516            kind: intent_kind,
517            writes: vec![],
518            intents,
519        });
520    }
521
522    let runtime_outputs = outputs
523        .into_iter()
524        .map(|(k, v)| (k, map_action_value(v)))
525        .collect();
526
527    Ok((runtime_outputs, effects))
528}
529
530/// NUM-FINITE-1: Reject non-finite numeric outputs before propagation.
531///
532/// This guard ensures that NaN, inf, and -inf cannot reach triggers or actions,
533/// preventing counterintuitive behavior (e.g., NaN comparisons always return false).
534///
535/// Called after compute and source outputs are produced, before values enter
536/// the node_outputs map.
537///
538/// See: NUM-FINITE-1 in PHASE_INVARIANTS.md
539fn ensure_finite(
540    node: &str,
541    outputs: &HashMap<String, crate::common::Value>,
542) -> Result<(), ExecError> {
543    for (port, value) in outputs {
544        match value {
545            crate::common::Value::Number(n) if !n.is_finite() => {
546                return Err(ExecError::NonFiniteOutput {
547                    node: node.to_string(),
548                    port: port.to_string(),
549                });
550            }
551            crate::common::Value::Series(values)
552                if values.iter().any(|value| !value.is_finite()) =>
553            {
554                return Err(ExecError::NonFiniteOutput {
555                    node: node.to_string(),
556                    port: port.to_string(),
557                });
558            }
559            _ => {}
560        }
561    }
562
563    Ok(())
564}
565
566fn first_intent_emitting_action(graph: &ValidatedGraph, registries: &Registries) -> Option<String> {
567    for node_id in &graph.topo_order {
568        let Some(node) = graph.nodes.get(node_id) else {
569            continue;
570        };
571        if node.kind != PrimitiveKind::Action {
572            continue;
573        }
574
575        let Some(action) = registries.actions.get(&node.impl_id) else {
576            continue;
577        };
578        if !action.manifest().effects.intents.is_empty() {
579            return Some(node.runtime_id.clone());
580        }
581    }
582    None
583}
584
585fn map_common_value(v: crate::common::Value) -> RuntimeValue {
586    match v {
587        crate::common::Value::Number(n) => RuntimeValue::Number(n),
588        crate::common::Value::Series(s) => RuntimeValue::Series(s),
589        crate::common::Value::Bool(b) => RuntimeValue::Bool(b),
590        crate::common::Value::String(s) => RuntimeValue::String(s),
591    }
592}
593
594fn map_to_compute_value(v: &RuntimeValue) -> Option<crate::common::Value> {
595    match v {
596        RuntimeValue::Number(n) => Some(crate::common::Value::Number(*n)),
597        RuntimeValue::Series(s) => Some(crate::common::Value::Series(s.clone())),
598        RuntimeValue::Bool(b) => Some(crate::common::Value::Bool(*b)),
599        _ => None,
600    }
601}
602
603/// X.11: Maximum safe integer for exact f64 representation (2^53).
604const MAX_SAFE_INT: i64 = 9_007_199_254_740_992;
605
606fn map_to_compute_parameter_value(
607    v: &crate::cluster::ParameterValue,
608) -> Option<crate::common::Value> {
609    match v {
610        crate::cluster::ParameterValue::Int(i) => {
611            // X.11: Guard against precision loss for |i| > 2^53
612            // Note: Use explicit bounds instead of i.abs() to avoid overflow panic on i64::MIN
613            if *i >= -MAX_SAFE_INT && *i <= MAX_SAFE_INT {
614                Some(crate::common::Value::Number(*i as f64))
615            } else {
616                None // Caller will produce ParameterOutOfRange with full context
617            }
618        }
619        crate::cluster::ParameterValue::Number(n) => Some(crate::common::Value::Number(*n)),
620        crate::cluster::ParameterValue::Bool(b) => Some(crate::common::Value::Bool(*b)),
621        _ => None,
622    }
623}
624
625fn map_trigger_value(v: TriggerValue) -> RuntimeValue {
626    match v {
627        TriggerValue::Number(n) => RuntimeValue::Number(n),
628        TriggerValue::Series(s) => RuntimeValue::Series(s),
629        TriggerValue::Bool(b) => RuntimeValue::Bool(b),
630        TriggerValue::Event(e) => RuntimeValue::Event(RuntimeEvent::Trigger(e)),
631    }
632}
633
634fn map_to_trigger_value(v: &RuntimeValue) -> Option<TriggerValue> {
635    match v {
636        RuntimeValue::Number(n) => Some(TriggerValue::Number(*n)),
637        RuntimeValue::Series(s) => Some(TriggerValue::Series(s.clone())),
638        RuntimeValue::Bool(b) => Some(TriggerValue::Bool(*b)),
639        RuntimeValue::Event(RuntimeEvent::Trigger(e)) => Some(TriggerValue::Event(e.clone())),
640        _ => None,
641    }
642}
643
644fn map_to_trigger_parameter_value(
645    v: &crate::cluster::ParameterValue,
646) -> Option<crate::trigger::ParameterValue> {
647    match v {
648        crate::cluster::ParameterValue::Int(i) => Some(crate::trigger::ParameterValue::Int(*i)),
649        crate::cluster::ParameterValue::Number(n) => {
650            Some(crate::trigger::ParameterValue::Number(*n))
651        }
652        crate::cluster::ParameterValue::Bool(b) => Some(crate::trigger::ParameterValue::Bool(*b)),
653        crate::cluster::ParameterValue::String(s) => {
654            Some(crate::trigger::ParameterValue::String(s.clone()))
655        }
656        crate::cluster::ParameterValue::Enum(e) => {
657            Some(crate::trigger::ParameterValue::Enum(e.clone()))
658        }
659    }
660}
661
662fn map_action_value(v: ActionValue) -> RuntimeValue {
663    match v {
664        ActionValue::Event(e) => RuntimeValue::Event(RuntimeEvent::Action(e)),
665        ActionValue::Number(n) => RuntimeValue::Number(n),
666        ActionValue::Series(s) => RuntimeValue::Series(s),
667        ActionValue::Bool(b) => RuntimeValue::Bool(b),
668        ActionValue::String(s) => RuntimeValue::String(s),
669    }
670}
671
672fn map_to_action_value(v: &RuntimeValue, node: &str, port: &str) -> Result<ActionValue, ExecError> {
673    Ok(match v {
674        RuntimeValue::Event(RuntimeEvent::Action(e)) => ActionValue::Event(e.clone()),
675        RuntimeValue::Event(RuntimeEvent::Trigger(TriggerEvent::Emitted)) => {
676            ActionValue::Event(crate::action::ActionOutcome::Attempted)
677        }
678        RuntimeValue::Event(RuntimeEvent::Trigger(TriggerEvent::NotEmitted)) => {
679            // R.7: should_skip_action() must catch NotEmitted before this point.
680            // Return an error instead of panicking to maintain kernel determinism.
681            return Err(ExecError::ActionSkipViolation {
682                node: node.to_string(),
683                port: port.to_string(),
684            });
685        }
686        RuntimeValue::Number(n) => ActionValue::Number(*n),
687        RuntimeValue::Series(s) => ActionValue::Series(s.clone()),
688        RuntimeValue::Bool(b) => ActionValue::Bool(*b),
689        RuntimeValue::String(s) => ActionValue::String(s.clone()),
690    })
691}
692
693fn map_runtime_value_to_common(v: &RuntimeValue) -> Option<crate::common::Value> {
694    match v {
695        RuntimeValue::Number(n) => Some(crate::common::Value::Number(*n)),
696        RuntimeValue::Bool(b) => Some(crate::common::Value::Bool(*b)),
697        RuntimeValue::String(s) => Some(crate::common::Value::String(s.clone())),
698        RuntimeValue::Series(s) => Some(crate::common::Value::Series(s.clone())),
699        RuntimeValue::Event(_) => None,
700    }
701}
702
703fn map_action_value_to_common(v: &ActionValue) -> Option<crate::common::Value> {
704    match v {
705        ActionValue::Number(n) => Some(crate::common::Value::Number(*n)),
706        ActionValue::Series(s) => Some(crate::common::Value::Series(s.clone())),
707        ActionValue::Bool(b) => Some(crate::common::Value::Bool(*b)),
708        ActionValue::String(s) => Some(crate::common::Value::String(s.clone())),
709        ActionValue::Event(_) => None,
710    }
711}
712
713fn map_action_parameter_value_to_common(
714    v: &crate::action::ParameterValue,
715) -> Option<crate::common::Value> {
716    match v {
717        crate::action::ParameterValue::Number(n) => Some(crate::common::Value::Number(*n)),
718        crate::action::ParameterValue::Bool(b) => Some(crate::common::Value::Bool(*b)),
719        crate::action::ParameterValue::String(s) => Some(crate::common::Value::String(s.clone())),
720        crate::action::ParameterValue::Int(_) | crate::action::ParameterValue::Enum(_) => None,
721    }
722}
723
724fn map_to_action_parameter_value(
725    v: &crate::cluster::ParameterValue,
726) -> Option<crate::action::ParameterValue> {
727    match v {
728        crate::cluster::ParameterValue::Int(i) => Some(crate::action::ParameterValue::Int(*i)),
729        crate::cluster::ParameterValue::Number(n) => {
730            Some(crate::action::ParameterValue::Number(*n))
731        }
732        crate::cluster::ParameterValue::Bool(b) => Some(crate::action::ParameterValue::Bool(*b)),
733        crate::cluster::ParameterValue::String(s) => {
734            Some(crate::action::ParameterValue::String(s.clone()))
735        }
736        crate::cluster::ParameterValue::Enum(e) => {
737            Some(crate::action::ParameterValue::Enum(e.clone()))
738        }
739    }
740}
741
742fn map_to_source_parameter_value(
743    v: &crate::cluster::ParameterValue,
744) -> Option<crate::source::ParameterValue> {
745    match v {
746        crate::cluster::ParameterValue::Int(i) => Some(crate::source::ParameterValue::Int(*i)),
747        crate::cluster::ParameterValue::Number(n) => {
748            Some(crate::source::ParameterValue::Number(*n))
749        }
750        crate::cluster::ParameterValue::Bool(b) => Some(crate::source::ParameterValue::Bool(*b)),
751        crate::cluster::ParameterValue::String(s) => {
752            Some(crate::source::ParameterValue::String(s.clone()))
753        }
754        crate::cluster::ParameterValue::Enum(e) => {
755            Some(crate::source::ParameterValue::Enum(e.clone()))
756        }
757    }
758}
759
760fn source_manifest_name_parameters(
761    node: &ValidatedNode,
762    manifest: &crate::source::SourcePrimitiveManifest,
763) -> HashMap<String, crate::cluster::ParameterValue> {
764    let mut resolved = node.parameters.clone();
765
766    for spec in &manifest.parameters {
767        if resolved.contains_key(&spec.name) {
768            continue;
769        }
770        let Some(default) = &spec.default else {
771            continue;
772        };
773        if let Some(mapped) = map_source_default_to_cluster_parameter_value(default) {
774            resolved.insert(spec.name.clone(), mapped);
775        }
776    }
777
778    resolved
779}
780
781fn map_source_default_to_cluster_parameter_value(
782    v: &crate::source::ParameterValue,
783) -> Option<crate::cluster::ParameterValue> {
784    match v {
785        crate::source::ParameterValue::Int(i) => Some(crate::cluster::ParameterValue::Int(*i)),
786        crate::source::ParameterValue::Number(n) => {
787            Some(crate::cluster::ParameterValue::Number(*n))
788        }
789        crate::source::ParameterValue::Bool(b) => Some(crate::cluster::ParameterValue::Bool(*b)),
790        crate::source::ParameterValue::String(s) => {
791            Some(crate::cluster::ParameterValue::String(s.clone()))
792        }
793        crate::source::ParameterValue::Enum(e) => {
794            Some(crate::cluster::ParameterValue::Enum(e.clone()))
795        }
796    }
797}
798
799/// R.7 gating: Returns true if any Event input is TriggerEvent::NotEmitted.
800/// Uses AND semantics: all trigger events must be Emitted for action to execute.
801fn should_skip_action(inputs: &HashMap<String, RuntimeValue>) -> bool {
802    inputs.values().any(|v| {
803        matches!(
804            v,
805            RuntimeValue::Event(RuntimeEvent::Trigger(TriggerEvent::NotEmitted))
806        )
807    })
808}
809
810/// Produce outputs for a skipped action. Event outputs get ActionOutcome::Skipped.
811fn produce_skipped_outputs(node: &ValidatedNode) -> HashMap<String, RuntimeValue> {
812    node.outputs
813        .iter()
814        .map(|(name, meta)| {
815            let value = match meta.value_type {
816                ValueType::Event => {
817                    RuntimeValue::Event(RuntimeEvent::Action(ActionOutcome::Skipped))
818                }
819                // Non-event outputs use sensible defaults (actions are terminal per F.2).
820                ValueType::Number => RuntimeValue::Number(0.0),
821                ValueType::Bool => RuntimeValue::Bool(false),
822                ValueType::String => RuntimeValue::String(String::new()),
823                ValueType::Series => RuntimeValue::Series(vec![]),
824            };
825            (name.clone(), value)
826        })
827        .collect()
828}
829
#[cfg(test)]
mod tests {
    //! Unit tests for the NUM-FINITE-1 guard (`ensure_finite`).
    //!
    //! Covers both numeric arms of the guard (`Number` and `Series`), all
    //! three non-finite values (NaN, +inf, -inf), the pass-through arm for
    //! non-numeric values, and the node/port carried by the error.

    use super::ensure_finite;
    use crate::common::Value;
    use std::collections::HashMap;

    /// Builds an output map with a single port named `result`.
    fn single_output(value: Value) -> HashMap<String, Value> {
        HashMap::from([("result".to_string(), value)])
    }

    /// True when `result` is the non-finite rejection for `test_node`/`result`.
    fn is_rejection(result: Result<(), super::ExecError>) -> bool {
        matches!(
            result,
            Err(super::ExecError::NonFiniteOutput { node, port })
                if node == "test_node" && port == "result"
        )
    }

    #[test]
    fn num_finite_guard_rejects_nan() {
        let outputs = single_output(Value::Number(f64::NAN));
        assert!(is_rejection(ensure_finite("test_node", &outputs)));
    }

    #[test]
    fn num_finite_guard_rejects_infinity() {
        let outputs = single_output(Value::Number(f64::INFINITY));
        assert!(is_rejection(ensure_finite("test_node", &outputs)));
    }

    #[test]
    fn num_finite_guard_rejects_negative_infinity() {
        let outputs = single_output(Value::Number(f64::NEG_INFINITY));
        assert!(is_rejection(ensure_finite("test_node", &outputs)));
    }

    #[test]
    fn num_finite_guard_allows_finite() {
        let outputs = single_output(Value::Number(42.0));
        assert!(ensure_finite("test_node", &outputs).is_ok());
    }

    #[test]
    fn num_finite_guard_rejects_series_with_non_finite_element() {
        // One bad element anywhere in the series poisons the whole output.
        for bad in [f64::NAN, f64::INFINITY, f64::NEG_INFINITY] {
            let outputs = single_output(Value::Series(vec![1.0, bad, 3.0]));
            assert!(is_rejection(ensure_finite("test_node", &outputs)));
        }
    }

    #[test]
    fn num_finite_guard_allows_finite_and_empty_series() {
        let finite = single_output(Value::Series(vec![1.0, -2.5, 0.0]));
        assert!(ensure_finite("test_node", &finite).is_ok());

        let empty = single_output(Value::Series(Vec::new()));
        assert!(ensure_finite("test_node", &empty).is_ok());
    }

    #[test]
    fn num_finite_guard_ignores_non_numeric_values() {
        let outputs = single_output(Value::Bool(true));
        assert!(ensure_finite("test_node", &outputs).is_ok());

        let no_outputs: HashMap<String, Value> = HashMap::new();
        assert!(ensure_finite("test_node", &no_outputs).is_ok());
    }
}