1use std::collections::HashMap;
34
35use crate::action::{ActionOutcome, ActionValue};
36use crate::cluster::{PrimitiveKind, ValueType};
37use crate::common::{derive_intent_id, ActionEffect, EffectWrite, IntentField, IntentRecord};
38use crate::trigger::{TriggerEvent, TriggerValue};
39
40use super::types::{
41 Endpoint, ExecError, ExecutionContext, ExecutionReport, Registries, RuntimeEvent, RuntimeValue,
42 ValidatedEdge, ValidatedGraph, ValidatedNode,
43};
44
45pub fn execute(
46 graph: &ValidatedGraph,
47 registries: &Registries,
48 ctx: &ExecutionContext,
49) -> Result<ExecutionReport, ExecError> {
50 if let Some(node) = first_intent_emitting_action(graph, registries) {
51 return Err(ExecError::IntentMetadataRequired { node });
52 }
53 execute_with_metadata(graph, registries, ctx, "graph", "event")
54}
55
56pub fn execute_with_metadata(
57 graph: &ValidatedGraph,
58 registries: &Registries,
59 ctx: &ExecutionContext,
60 graph_id: &str,
61 event_id: &str,
62) -> Result<ExecutionReport, ExecError> {
63 let mut node_outputs: HashMap<String, HashMap<String, RuntimeValue>> = HashMap::new();
64 let mut effects: Vec<ActionEffect> = Vec::new();
65
66 for node_id in &graph.topo_order {
67 let node = graph
68 .nodes
69 .get(node_id)
70 .ok_or_else(|| ExecError::MissingNode {
71 node: node_id.clone(),
72 })?;
73
74 let inputs = collect_inputs(node_id, &node.inputs, &graph.edges, &node_outputs)?;
75
76 let outputs = match node.kind {
77 PrimitiveKind::Source => execute_source(node, inputs, registries, ctx)?,
78 PrimitiveKind::Compute => execute_compute(node, inputs, registries)?,
79 PrimitiveKind::Trigger => execute_trigger(node, inputs, registries)?,
80 PrimitiveKind::Action => {
81 if should_skip_action(&inputs) {
84 produce_skipped_outputs(node)
85 } else {
86 let (action_outputs, action_effects) =
87 execute_action(node, inputs, registries, graph_id, event_id)?;
88 effects.extend(action_effects);
89 action_outputs
90 }
91 }
92 };
93
94 node_outputs.insert(node_id.clone(), outputs);
95 }
96
97 let mut outputs: HashMap<String, RuntimeValue> = HashMap::new();
98 for out in &graph.boundary_outputs {
99 if let Some(node_outs) = node_outputs.get(&out.maps_to.node_id) {
100 if let Some(val) = node_outs.get(&out.maps_to.port_name) {
101 outputs.insert(out.name.clone(), val.clone());
102 } else {
103 return Err(ExecError::MissingOutput {
104 node: out.maps_to.node_id.clone(),
105 output: out.maps_to.port_name.clone(),
106 });
107 }
108 } else {
109 return Err(ExecError::MissingOutput {
110 node: out.maps_to.node_id.clone(),
111 output: out.maps_to.port_name.clone(),
112 });
113 }
114 }
115
116 Ok(ExecutionReport { outputs, effects })
117}
118
119fn collect_inputs(
120 target: &str,
121 input_specs: &[crate::cluster::InputMetadata],
122 edges: &[ValidatedEdge],
123 node_outputs: &HashMap<String, HashMap<String, RuntimeValue>>,
124) -> Result<HashMap<String, RuntimeValue>, ExecError> {
125 let mut inputs: HashMap<String, RuntimeValue> = HashMap::new();
126
127 for edge in edges {
128 let Endpoint::NodePort {
129 node_id: to_node,
130 port_name: to_port,
131 } = &edge.to;
132 if to_node == target {
133 let Endpoint::NodePort {
134 node_id: from,
135 port_name: from_port,
136 } = &edge.from;
137 let outs = node_outputs
138 .get(from)
139 .ok_or_else(|| ExecError::MissingOutput {
140 node: from.clone(),
141 output: from_port.clone(),
142 })?;
143 let val = outs
144 .get(from_port)
145 .ok_or_else(|| ExecError::MissingOutput {
146 node: from.clone(),
147 output: from_port.clone(),
148 })?;
149 inputs.insert(to_port.clone(), val.clone());
150 }
151 }
152
153 for spec in input_specs {
155 if spec.required && !inputs.contains_key(&spec.name) {
156 return Err(ExecError::MissingOutput {
157 node: target.to_string(),
158 output: spec.name.clone(),
159 });
160 }
161 }
162
163 Ok(inputs)
164}
165
166fn execute_source(
167 node: &ValidatedNode,
168 _inputs: HashMap<String, RuntimeValue>,
169 registries: &Registries,
170 ctx: &ExecutionContext,
171) -> Result<HashMap<String, RuntimeValue>, ExecError> {
172 let primitive =
173 registries
174 .sources
175 .get(&node.impl_id)
176 .ok_or_else(|| ExecError::UnknownPrimitive {
177 id: node.impl_id.clone(),
178 version: node.version.clone(),
179 })?;
180
181 let manifest = primitive.manifest();
182 let manifest_name_parameters = source_manifest_name_parameters(node, manifest);
183 for req in &manifest.requires.context {
184 let resolved_name =
187 crate::common::resolve_manifest_name(&req.name, &manifest_name_parameters).map_err(
188 |_| ExecError::MissingRequiredContextKey {
189 node: node.runtime_id.clone(),
190 key: req.name.clone(),
191 },
192 )?;
193 if !req.required {
194 continue;
195 }
196 match ctx.value(&resolved_name) {
197 None => {
198 return Err(ExecError::MissingRequiredContextKey {
199 node: node.runtime_id.clone(),
200 key: resolved_name,
201 });
202 }
203 Some(val) => {
204 if val.value_type() != req.ty {
205 return Err(ExecError::ContextKeyTypeMismatch {
206 node: node.runtime_id.clone(),
207 key: resolved_name,
208 expected: req.ty.clone(),
209 got: val.value_type(),
210 });
211 }
212 }
213 }
214 }
215
216 let mut mapped_parameters: HashMap<String, crate::source::ParameterValue> = HashMap::new();
217 for (name, val) in &node.parameters {
218 let mapped = map_to_source_parameter_value(val).ok_or_else(|| {
219 ExecError::ParameterTypeConversionFailed {
220 node: node.runtime_id.clone(),
221 parameter: name.clone(),
222 }
223 })?;
224 mapped_parameters.insert(name.clone(), mapped);
225 }
226
227 let outputs = primitive.produce(&mapped_parameters, ctx);
228 ensure_finite(&node.runtime_id, &outputs)?;
229 Ok(outputs
230 .into_iter()
231 .map(|(k, v)| (k, map_common_value(v)))
232 .collect())
233}
234
235fn execute_compute(
236 node: &ValidatedNode,
237 inputs: HashMap<String, RuntimeValue>,
238 registries: &Registries,
239) -> Result<HashMap<String, RuntimeValue>, ExecError> {
240 let primitive =
241 registries
242 .computes
243 .get(&node.impl_id)
244 .ok_or_else(|| ExecError::UnknownPrimitive {
245 id: node.impl_id.clone(),
246 version: node.version.clone(),
247 })?;
248
249 let mut mapped_inputs: HashMap<String, crate::common::Value> = HashMap::new();
250 for (name, val) in inputs {
251 let mapped = map_to_compute_value(&val).ok_or_else(|| ExecError::TypeConversionFailed {
252 node: node.runtime_id.clone(),
253 port: name.clone(),
254 })?;
255 mapped_inputs.insert(name, mapped);
256 }
257
258 let mut mapped_parameters: HashMap<String, crate::common::Value> = HashMap::new();
259 for (name, val) in &node.parameters {
260 let mapped = map_to_compute_parameter_value(val).ok_or_else(|| {
261 if let crate::cluster::ParameterValue::Int(i) = val {
263 ExecError::ParameterOutOfRange {
264 node: node.runtime_id.clone(),
265 parameter: name.clone(),
266 value: *i,
267 }
268 } else {
269 ExecError::ParameterTypeConversionFailed {
270 node: node.runtime_id.clone(),
271 parameter: name.clone(),
272 }
273 }
274 })?;
275 mapped_parameters.insert(name.clone(), mapped);
276 }
277
278 let outputs = primitive
279 .compute(&mapped_inputs, &mapped_parameters, None)
280 .map_err(|error| ExecError::ComputeFailed {
281 node: node.runtime_id.clone(),
282 id: node.impl_id.clone(),
283 version: node.version.clone(),
284 error,
285 })?;
286 for output_name in node.outputs.keys() {
287 if !outputs.contains_key(output_name) {
288 return Err(ExecError::MissingOutput {
289 node: node.runtime_id.clone(),
290 output: output_name.clone(),
291 });
292 }
293 }
294 ensure_finite(&node.runtime_id, &outputs)?;
295 Ok(outputs
296 .into_iter()
297 .map(|(k, v)| (k, map_common_value(v)))
298 .collect())
299}
300
301fn execute_trigger(
302 node: &ValidatedNode,
303 inputs: HashMap<String, RuntimeValue>,
304 registries: &Registries,
305) -> Result<HashMap<String, RuntimeValue>, ExecError> {
306 let primitive =
307 registries
308 .triggers
309 .get(&node.impl_id)
310 .ok_or_else(|| ExecError::UnknownPrimitive {
311 id: node.impl_id.clone(),
312 version: node.version.clone(),
313 })?;
314
315 let mut mapped_inputs: HashMap<String, TriggerValue> = HashMap::new();
316 for (name, val) in inputs {
317 let mapped = map_to_trigger_value(&val).ok_or_else(|| ExecError::TypeConversionFailed {
318 node: node.runtime_id.clone(),
319 port: name.clone(),
320 })?;
321 mapped_inputs.insert(name, mapped);
322 }
323
324 let mut mapped_parameters: HashMap<String, crate::trigger::ParameterValue> = HashMap::new();
325 for (name, val) in &node.parameters {
326 let mapped = map_to_trigger_parameter_value(val).ok_or_else(|| {
327 ExecError::ParameterTypeConversionFailed {
328 node: node.runtime_id.clone(),
329 parameter: name.clone(),
330 }
331 })?;
332 mapped_parameters.insert(name.clone(), mapped);
333 }
334
335 let outputs = primitive.evaluate(&mapped_inputs, &mapped_parameters);
336 Ok(outputs
337 .into_iter()
338 .map(|(k, v)| (k, map_trigger_value(v)))
339 .collect())
340}
341
342fn execute_action(
343 node: &ValidatedNode,
344 inputs: HashMap<String, RuntimeValue>,
345 registries: &Registries,
346 graph_id: &str,
347 event_id: &str,
348) -> Result<(HashMap<String, RuntimeValue>, Vec<ActionEffect>), ExecError> {
349 let primitive =
350 registries
351 .actions
352 .get(&node.impl_id)
353 .ok_or_else(|| ExecError::UnknownPrimitive {
354 id: node.impl_id.clone(),
355 version: node.version.clone(),
356 })?;
357
358 let mut mapped_inputs: HashMap<String, ActionValue> = HashMap::new();
359 for (name, val) in &inputs {
360 let mapped = map_to_action_value(val, &node.runtime_id, name)?;
361 mapped_inputs.insert(name.clone(), mapped);
362 }
363
364 let mut mapped_parameters: HashMap<String, crate::action::ParameterValue> = HashMap::new();
365 for (name, val) in &node.parameters {
366 let mapped = map_to_action_parameter_value(val).ok_or_else(|| {
367 ExecError::ParameterTypeConversionFailed {
368 node: node.runtime_id.clone(),
369 parameter: name.clone(),
370 }
371 })?;
372 mapped_parameters.insert(name.clone(), mapped);
373 }
374
375 let outputs = primitive.execute(&mapped_inputs, &mapped_parameters);
376
377 let manifest = primitive.manifest();
379 let mut writes = Vec::new();
380 for spec in &manifest.effects.writes {
381 let resolved_name = crate::common::resolve_manifest_name(&spec.name, &node.parameters)
383 .map_err(|_| ExecError::ParameterTypeConversionFailed {
384 node: node.runtime_id.clone(),
385 parameter: spec.name.clone(),
386 })?;
387
388 let input_val = inputs
390 .get(&spec.from_input)
391 .ok_or_else(|| ExecError::MissingOutput {
392 node: node.runtime_id.clone(),
393 output: spec.from_input.clone(),
394 })?;
395
396 let value = map_runtime_value_to_common(input_val).ok_or_else(|| {
397 ExecError::TypeConversionFailed {
398 node: node.runtime_id.clone(),
399 port: spec.from_input.clone(),
400 }
401 })?;
402
403 writes.push(EffectWrite {
404 key: resolved_name,
405 value,
406 });
407 }
408
409 let mut intents_by_kind: Vec<(String, Vec<IntentRecord>)> = Vec::new();
410 for (intent_ordinal, intent_spec) in manifest.effects.intents.iter().enumerate() {
411 let mut fields = Vec::new();
412 let mut field_values_by_name = HashMap::new();
413
414 for field_spec in &intent_spec.fields {
415 let value = match (
416 field_spec.from_input.as_ref(),
417 field_spec.from_param.as_ref(),
418 ) {
419 (Some(from_input), None) => {
420 let input_value =
421 mapped_inputs
422 .get(from_input)
423 .ok_or_else(|| ExecError::MissingOutput {
424 node: node.runtime_id.clone(),
425 output: from_input.clone(),
426 })?;
427 map_action_value_to_common(input_value).ok_or_else(|| {
428 ExecError::TypeConversionFailed {
429 node: node.runtime_id.clone(),
430 port: from_input.clone(),
431 }
432 })?
433 }
434 (None, Some(from_param)) => {
435 let parameter_value = mapped_parameters.get(from_param).ok_or_else(|| {
436 ExecError::ParameterTypeConversionFailed {
437 node: node.runtime_id.clone(),
438 parameter: from_param.clone(),
439 }
440 })?;
441 map_action_parameter_value_to_common(parameter_value).ok_or_else(|| {
442 ExecError::ParameterTypeConversionFailed {
443 node: node.runtime_id.clone(),
444 parameter: from_param.clone(),
445 }
446 })?
447 }
448 _ => {
449 return Err(ExecError::ParameterTypeConversionFailed {
450 node: node.runtime_id.clone(),
451 parameter: field_spec.name.clone(),
452 });
453 }
454 };
455
456 field_values_by_name.insert(field_spec.name.clone(), value.clone());
457 fields.push(IntentField {
458 name: field_spec.name.clone(),
459 value,
460 });
461 }
462
463 for mirror_write in &intent_spec.mirror_writes {
464 let resolved_name =
465 crate::common::resolve_manifest_name(&mirror_write.name, &node.parameters)
466 .map_err(|_| ExecError::ParameterTypeConversionFailed {
467 node: node.runtime_id.clone(),
468 parameter: mirror_write.name.clone(),
469 })?;
470 let mirrored_value = field_values_by_name
471 .get(&mirror_write.from_field)
472 .ok_or_else(|| ExecError::MissingOutput {
473 node: node.runtime_id.clone(),
474 output: mirror_write.from_field.clone(),
475 })?
476 .clone();
477 writes.push(EffectWrite {
478 key: resolved_name,
479 value: mirrored_value,
480 });
481 }
482
483 let intent = IntentRecord {
484 kind: intent_spec.name.clone(),
485 intent_id: derive_intent_id(
486 graph_id,
487 event_id,
488 &node.runtime_id,
489 &intent_spec.name,
490 intent_ordinal,
491 ),
492 fields,
493 };
494
495 if let Some((_, records)) = intents_by_kind
496 .iter_mut()
497 .find(|(kind, _)| kind == &intent_spec.name)
498 {
499 records.push(intent);
500 } else {
501 intents_by_kind.push((intent_spec.name.clone(), vec![intent]));
502 }
503 }
504
505 let mut effects = Vec::new();
506 if !writes.is_empty() {
507 effects.push(ActionEffect {
508 kind: "set_context".to_string(),
509 writes,
510 intents: vec![],
511 });
512 }
513
514 for (intent_kind, intents) in intents_by_kind {
515 effects.push(ActionEffect {
516 kind: intent_kind,
517 writes: vec![],
518 intents,
519 });
520 }
521
522 let runtime_outputs = outputs
523 .into_iter()
524 .map(|(k, v)| (k, map_action_value(v)))
525 .collect();
526
527 Ok((runtime_outputs, effects))
528}
529
530fn ensure_finite(
540 node: &str,
541 outputs: &HashMap<String, crate::common::Value>,
542) -> Result<(), ExecError> {
543 for (port, value) in outputs {
544 match value {
545 crate::common::Value::Number(n) if !n.is_finite() => {
546 return Err(ExecError::NonFiniteOutput {
547 node: node.to_string(),
548 port: port.to_string(),
549 });
550 }
551 crate::common::Value::Series(values)
552 if values.iter().any(|value| !value.is_finite()) =>
553 {
554 return Err(ExecError::NonFiniteOutput {
555 node: node.to_string(),
556 port: port.to_string(),
557 });
558 }
559 _ => {}
560 }
561 }
562
563 Ok(())
564}
565
566fn first_intent_emitting_action(graph: &ValidatedGraph, registries: &Registries) -> Option<String> {
567 for node_id in &graph.topo_order {
568 let Some(node) = graph.nodes.get(node_id) else {
569 continue;
570 };
571 if node.kind != PrimitiveKind::Action {
572 continue;
573 }
574
575 let Some(action) = registries.actions.get(&node.impl_id) else {
576 continue;
577 };
578 if !action.manifest().effects.intents.is_empty() {
579 return Some(node.runtime_id.clone());
580 }
581 }
582 None
583}
584
585fn map_common_value(v: crate::common::Value) -> RuntimeValue {
586 match v {
587 crate::common::Value::Number(n) => RuntimeValue::Number(n),
588 crate::common::Value::Series(s) => RuntimeValue::Series(s),
589 crate::common::Value::Bool(b) => RuntimeValue::Bool(b),
590 crate::common::Value::String(s) => RuntimeValue::String(s),
591 }
592}
593
594fn map_to_compute_value(v: &RuntimeValue) -> Option<crate::common::Value> {
595 match v {
596 RuntimeValue::Number(n) => Some(crate::common::Value::Number(*n)),
597 RuntimeValue::Series(s) => Some(crate::common::Value::Series(s.clone())),
598 RuntimeValue::Bool(b) => Some(crate::common::Value::Bool(*b)),
599 _ => None,
600 }
601}
602
603const MAX_SAFE_INT: i64 = 9_007_199_254_740_992;
605
606fn map_to_compute_parameter_value(
607 v: &crate::cluster::ParameterValue,
608) -> Option<crate::common::Value> {
609 match v {
610 crate::cluster::ParameterValue::Int(i) => {
611 if *i >= -MAX_SAFE_INT && *i <= MAX_SAFE_INT {
614 Some(crate::common::Value::Number(*i as f64))
615 } else {
616 None }
618 }
619 crate::cluster::ParameterValue::Number(n) => Some(crate::common::Value::Number(*n)),
620 crate::cluster::ParameterValue::Bool(b) => Some(crate::common::Value::Bool(*b)),
621 _ => None,
622 }
623}
624
625fn map_trigger_value(v: TriggerValue) -> RuntimeValue {
626 match v {
627 TriggerValue::Number(n) => RuntimeValue::Number(n),
628 TriggerValue::Series(s) => RuntimeValue::Series(s),
629 TriggerValue::Bool(b) => RuntimeValue::Bool(b),
630 TriggerValue::Event(e) => RuntimeValue::Event(RuntimeEvent::Trigger(e)),
631 }
632}
633
634fn map_to_trigger_value(v: &RuntimeValue) -> Option<TriggerValue> {
635 match v {
636 RuntimeValue::Number(n) => Some(TriggerValue::Number(*n)),
637 RuntimeValue::Series(s) => Some(TriggerValue::Series(s.clone())),
638 RuntimeValue::Bool(b) => Some(TriggerValue::Bool(*b)),
639 RuntimeValue::Event(RuntimeEvent::Trigger(e)) => Some(TriggerValue::Event(e.clone())),
640 _ => None,
641 }
642}
643
644fn map_to_trigger_parameter_value(
645 v: &crate::cluster::ParameterValue,
646) -> Option<crate::trigger::ParameterValue> {
647 match v {
648 crate::cluster::ParameterValue::Int(i) => Some(crate::trigger::ParameterValue::Int(*i)),
649 crate::cluster::ParameterValue::Number(n) => {
650 Some(crate::trigger::ParameterValue::Number(*n))
651 }
652 crate::cluster::ParameterValue::Bool(b) => Some(crate::trigger::ParameterValue::Bool(*b)),
653 crate::cluster::ParameterValue::String(s) => {
654 Some(crate::trigger::ParameterValue::String(s.clone()))
655 }
656 crate::cluster::ParameterValue::Enum(e) => {
657 Some(crate::trigger::ParameterValue::Enum(e.clone()))
658 }
659 }
660}
661
662fn map_action_value(v: ActionValue) -> RuntimeValue {
663 match v {
664 ActionValue::Event(e) => RuntimeValue::Event(RuntimeEvent::Action(e)),
665 ActionValue::Number(n) => RuntimeValue::Number(n),
666 ActionValue::Series(s) => RuntimeValue::Series(s),
667 ActionValue::Bool(b) => RuntimeValue::Bool(b),
668 ActionValue::String(s) => RuntimeValue::String(s),
669 }
670}
671
672fn map_to_action_value(v: &RuntimeValue, node: &str, port: &str) -> Result<ActionValue, ExecError> {
673 Ok(match v {
674 RuntimeValue::Event(RuntimeEvent::Action(e)) => ActionValue::Event(e.clone()),
675 RuntimeValue::Event(RuntimeEvent::Trigger(TriggerEvent::Emitted)) => {
676 ActionValue::Event(crate::action::ActionOutcome::Attempted)
677 }
678 RuntimeValue::Event(RuntimeEvent::Trigger(TriggerEvent::NotEmitted)) => {
679 return Err(ExecError::ActionSkipViolation {
682 node: node.to_string(),
683 port: port.to_string(),
684 });
685 }
686 RuntimeValue::Number(n) => ActionValue::Number(*n),
687 RuntimeValue::Series(s) => ActionValue::Series(s.clone()),
688 RuntimeValue::Bool(b) => ActionValue::Bool(*b),
689 RuntimeValue::String(s) => ActionValue::String(s.clone()),
690 })
691}
692
693fn map_runtime_value_to_common(v: &RuntimeValue) -> Option<crate::common::Value> {
694 match v {
695 RuntimeValue::Number(n) => Some(crate::common::Value::Number(*n)),
696 RuntimeValue::Bool(b) => Some(crate::common::Value::Bool(*b)),
697 RuntimeValue::String(s) => Some(crate::common::Value::String(s.clone())),
698 RuntimeValue::Series(s) => Some(crate::common::Value::Series(s.clone())),
699 RuntimeValue::Event(_) => None,
700 }
701}
702
703fn map_action_value_to_common(v: &ActionValue) -> Option<crate::common::Value> {
704 match v {
705 ActionValue::Number(n) => Some(crate::common::Value::Number(*n)),
706 ActionValue::Series(s) => Some(crate::common::Value::Series(s.clone())),
707 ActionValue::Bool(b) => Some(crate::common::Value::Bool(*b)),
708 ActionValue::String(s) => Some(crate::common::Value::String(s.clone())),
709 ActionValue::Event(_) => None,
710 }
711}
712
713fn map_action_parameter_value_to_common(
714 v: &crate::action::ParameterValue,
715) -> Option<crate::common::Value> {
716 match v {
717 crate::action::ParameterValue::Number(n) => Some(crate::common::Value::Number(*n)),
718 crate::action::ParameterValue::Bool(b) => Some(crate::common::Value::Bool(*b)),
719 crate::action::ParameterValue::String(s) => Some(crate::common::Value::String(s.clone())),
720 crate::action::ParameterValue::Int(_) | crate::action::ParameterValue::Enum(_) => None,
721 }
722}
723
724fn map_to_action_parameter_value(
725 v: &crate::cluster::ParameterValue,
726) -> Option<crate::action::ParameterValue> {
727 match v {
728 crate::cluster::ParameterValue::Int(i) => Some(crate::action::ParameterValue::Int(*i)),
729 crate::cluster::ParameterValue::Number(n) => {
730 Some(crate::action::ParameterValue::Number(*n))
731 }
732 crate::cluster::ParameterValue::Bool(b) => Some(crate::action::ParameterValue::Bool(*b)),
733 crate::cluster::ParameterValue::String(s) => {
734 Some(crate::action::ParameterValue::String(s.clone()))
735 }
736 crate::cluster::ParameterValue::Enum(e) => {
737 Some(crate::action::ParameterValue::Enum(e.clone()))
738 }
739 }
740}
741
742fn map_to_source_parameter_value(
743 v: &crate::cluster::ParameterValue,
744) -> Option<crate::source::ParameterValue> {
745 match v {
746 crate::cluster::ParameterValue::Int(i) => Some(crate::source::ParameterValue::Int(*i)),
747 crate::cluster::ParameterValue::Number(n) => {
748 Some(crate::source::ParameterValue::Number(*n))
749 }
750 crate::cluster::ParameterValue::Bool(b) => Some(crate::source::ParameterValue::Bool(*b)),
751 crate::cluster::ParameterValue::String(s) => {
752 Some(crate::source::ParameterValue::String(s.clone()))
753 }
754 crate::cluster::ParameterValue::Enum(e) => {
755 Some(crate::source::ParameterValue::Enum(e.clone()))
756 }
757 }
758}
759
760fn source_manifest_name_parameters(
761 node: &ValidatedNode,
762 manifest: &crate::source::SourcePrimitiveManifest,
763) -> HashMap<String, crate::cluster::ParameterValue> {
764 let mut resolved = node.parameters.clone();
765
766 for spec in &manifest.parameters {
767 if resolved.contains_key(&spec.name) {
768 continue;
769 }
770 let Some(default) = &spec.default else {
771 continue;
772 };
773 if let Some(mapped) = map_source_default_to_cluster_parameter_value(default) {
774 resolved.insert(spec.name.clone(), mapped);
775 }
776 }
777
778 resolved
779}
780
781fn map_source_default_to_cluster_parameter_value(
782 v: &crate::source::ParameterValue,
783) -> Option<crate::cluster::ParameterValue> {
784 match v {
785 crate::source::ParameterValue::Int(i) => Some(crate::cluster::ParameterValue::Int(*i)),
786 crate::source::ParameterValue::Number(n) => {
787 Some(crate::cluster::ParameterValue::Number(*n))
788 }
789 crate::source::ParameterValue::Bool(b) => Some(crate::cluster::ParameterValue::Bool(*b)),
790 crate::source::ParameterValue::String(s) => {
791 Some(crate::cluster::ParameterValue::String(s.clone()))
792 }
793 crate::source::ParameterValue::Enum(e) => {
794 Some(crate::cluster::ParameterValue::Enum(e.clone()))
795 }
796 }
797}
798
799fn should_skip_action(inputs: &HashMap<String, RuntimeValue>) -> bool {
802 inputs.values().any(|v| {
803 matches!(
804 v,
805 RuntimeValue::Event(RuntimeEvent::Trigger(TriggerEvent::NotEmitted))
806 )
807 })
808}
809
810fn produce_skipped_outputs(node: &ValidatedNode) -> HashMap<String, RuntimeValue> {
812 node.outputs
813 .iter()
814 .map(|(name, meta)| {
815 let value = match meta.value_type {
816 ValueType::Event => {
817 RuntimeValue::Event(RuntimeEvent::Action(ActionOutcome::Skipped))
818 }
819 ValueType::Number => RuntimeValue::Number(0.0),
821 ValueType::Bool => RuntimeValue::Bool(false),
822 ValueType::String => RuntimeValue::String(String::new()),
823 ValueType::Series => RuntimeValue::Series(vec![]),
824 };
825 (name.clone(), value)
826 })
827 .collect()
828}
829
830#[cfg(test)]
831mod tests {
832 use super::ensure_finite;
833 use crate::common::Value;
834
835 #[test]
836 fn num_finite_guard_rejects_nan() {
837 let outputs =
838 std::collections::HashMap::from([("result".to_string(), Value::Number(f64::NAN))]);
839 let result = ensure_finite("test_node", &outputs);
840 assert!(matches!(
841 result,
842 Err(super::ExecError::NonFiniteOutput { .. })
843 ));
844 }
845
846 #[test]
847 fn num_finite_guard_rejects_infinity() {
848 let outputs =
849 std::collections::HashMap::from([("result".to_string(), Value::Number(f64::INFINITY))]);
850 let result = ensure_finite("test_node", &outputs);
851 assert!(matches!(
852 result,
853 Err(super::ExecError::NonFiniteOutput { .. })
854 ));
855 }
856
857 #[test]
858 fn num_finite_guard_allows_finite() {
859 let outputs =
860 std::collections::HashMap::from([("result".to_string(), Value::Number(42.0))]);
861 let result = ensure_finite("test_node", &outputs);
862 assert!(result.is_ok());
863 }
864}