Skip to main content

stepflow_flow/values/
value_expr.rs

1// Copyright 2025 DataStax Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4// in compliance with the License. You may obtain a copy of the License at
5//
6//     http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software distributed under the License
9// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10// or implied. See the License for the specific language governing permissions and limitations under
11// the License.
12
13use bit_set::BitSet;
14
15use super::{JsonPath, PathPart, StepContext, ValueRef};
16use crate::FlowResult;
17use serde_json::Value;
18
19/// A value expression that can contain literal data or references to other values.
20///
21/// This is the unified type for all workflow inputs, outputs, and data flow.
22/// Expressions can be:
23/// - References to other values (`$step`, `$input`, `$variable`)
24/// - Composable structures (arrays and objects containing expressions)
25/// - Literal values (null, bool, number, string - any primitive JSON value)
26/// - Escaped literals (`$literal`) to prevent expansion
27//
28// Serialization and deserialization are implemented in expr_serde.rs
29// JsonSchema is manually implemented to match the actual wire format
30#[derive(Debug, Clone, Eq, PartialEq, Hash)]
31pub enum ValueExpr {
32    /// Step reference: `{ $step: "step_id", path: "optional.path" }`
33    Step { step: String, path: JsonPath },
34
35    /// Workflow input: `{ $input: "path" }` where path can be "$" for root
36    Input {
37        input: JsonPath, // The path is the value (supports shorthand)
38    },
39
40    /// Variable: `{ $variable: "$.var.path", default: ... }`
41    Variable {
42        variable: JsonPath, // JSONPath including variable name and path
43        default: Option<Box<ValueExpr>>,
44    },
45
46    /// Escape hatch: `{ $literal: {...} }` - prevents recursive parsing
47    EscapedLiteral { literal: serde_json::Value },
48
49    /// Conditional expression: `{ $if: <condition>, then: <expr>, else?: <expr> }`
50    /// Returns `then` value if condition is truthy, otherwise `else` value (defaults to null)
51    If {
52        condition: Box<ValueExpr>,
53        then: Box<ValueExpr>,
54        else_expr: Option<Box<ValueExpr>>,
55    },
56
57    /// Coalesce: `{ $coalesce: [<expr1>, <expr2>, ...] }`
58    /// Returns first non-skipped, non-null value from the list
59    Coalesce { values: Vec<ValueExpr> },
60
61    /// JSON array where each element can be an expression
62    Array(Vec<ValueExpr>),
63
64    /// JSON object where each value can be an expression
65    /// Uses Vec instead of Map for efficiency and to enable hashability
66    Object(Vec<(String, ValueExpr)>),
67
68    /// Literal JSON value (null, bool, number, string)
69    /// Note: Literal objects and arrays are parsed as Object/Array variants
70    Literal(serde_json::Value),
71}
72
73impl ValueExpr {
74    /// Create a step reference expression
75    pub fn step(step_id: impl Into<String>, path: JsonPath) -> Self {
76        ValueExpr::Step {
77            step: step_id.into(),
78            path,
79        }
80    }
81
82    /// Create a step reference without a path (for compatibility with old code)
83    pub fn step_output(step_id: impl Into<String>) -> Self {
84        ValueExpr::Step {
85            step: step_id.into(),
86            path: JsonPath::default(),
87        }
88    }
89
90    /// Create a workflow input reference expression
91    pub fn workflow_input(path: JsonPath) -> Self {
92        ValueExpr::Input { input: path }
93    }
94
95    /// Create a variable reference expression
96    pub fn variable(name: impl Into<String>, default: Option<Box<ValueExpr>>) -> Self {
97        ValueExpr::Variable {
98            variable: JsonPath::from(name.into()),
99            default,
100        }
101    }
102
103    /// Create a literal value expression from a serde_json::Value
104    ///
105    /// Arrays and objects are recursively converted to composable ValueExpr structures.
106    /// Primitives (null, bool, number, string) become Literal variants.
107    pub fn literal(value: serde_json::Value) -> Self {
108        match value {
109            Value::Array(arr) => {
110                let exprs = arr.into_iter().map(ValueExpr::literal).collect();
111                ValueExpr::Array(exprs)
112            }
113            Value::Object(obj) => {
114                let exprs = obj
115                    .into_iter()
116                    .map(|(k, v)| (k, ValueExpr::literal(v)))
117                    .collect();
118                ValueExpr::Object(exprs)
119            }
120            // All primitives (null, bool, number, string) become Literal
121            primitive => ValueExpr::Literal(primitive),
122        }
123    }
124
125    /// Create an array value expression
126    pub fn array(values: Vec<ValueExpr>) -> Self {
127        ValueExpr::Array(values)
128    }
129
130    /// Create an object value expression from key-value pairs
131    pub fn object(values: Vec<(String, ValueExpr)>) -> Self {
132        ValueExpr::Object(values)
133    }
134
135    /// Create an escaped literal expression
136    pub fn escaped_literal(value: serde_json::Value) -> Self {
137        ValueExpr::EscapedLiteral { literal: value }
138    }
139
140    /// Create a conditional expression
141    pub fn if_expr(condition: ValueExpr, then: ValueExpr, else_expr: Option<ValueExpr>) -> Self {
142        ValueExpr::If {
143            condition: Box::new(condition),
144            then: Box::new(then),
145            else_expr: else_expr.map(Box::new),
146        }
147    }
148
149    /// Create a coalesce expression
150    pub fn coalesce(values: Vec<ValueExpr>) -> Self {
151        ValueExpr::Coalesce { values }
152    }
153
154    /// Create a null literal expression
155    pub fn null() -> Self {
156        ValueExpr::Literal(serde_json::Value::Null)
157    }
158
159    /// Check if this expression is a null literal
160    pub fn is_null(&self) -> bool {
161        matches!(self, ValueExpr::Literal(serde_json::Value::Null))
162    }
163
164    /// Returns the set of step indices needed to evaluate this expression.
165    ///
166    /// An empty set means the expression is ready to be fully resolved.
167    /// This method evaluates lazily - for conditional expressions like `$if`,
168    /// it only returns the condition's dependencies until the condition can
169    /// be evaluated, then returns the appropriate branch's dependencies.
170    ///
171    /// # Arguments
172    /// * `ctx` - Context providing step completion state and results
173    ///
174    /// # Example
175    /// For `{ $if: { $step: foo }, then: { $step: bar }, else: { $step: baz } }`:
176    /// 1. First call (foo not complete): returns `{foo_index}`
177    /// 2. After foo completes (truthy): returns `{bar_index}`
178    /// 3. After bar completes: returns `{}` (ready!)
179    pub fn needed_steps(&self, ctx: &impl StepContext) -> BitSet {
180        /// Collect needed steps into a mutable BitSet.
181        /// Returns `true` if we should stop early (for short-circuit evaluation).
182        fn collect<C: StepContext>(expr: &ValueExpr, ctx: &C, needed: &mut BitSet) -> bool {
183            match expr {
184                ValueExpr::Step { step, .. } => {
185                    if let Some(idx) = ctx.step_index(step)
186                        && !ctx.is_completed(idx)
187                    {
188                        needed.insert(idx);
189                    }
190                    false
191                }
192
193                ValueExpr::Input { .. } | ValueExpr::Variable { .. } => false,
194
195                ValueExpr::Literal(_) | ValueExpr::EscapedLiteral { .. } => false,
196
197                ValueExpr::If {
198                    condition,
199                    then,
200                    else_expr,
201                } => {
202                    // First collect condition's needs
203                    let before = needed.len();
204                    collect(condition, ctx, needed);
205                    if needed.len() > before {
206                        // Condition has unmet dependencies - stop here
207                        return true;
208                    }
209
210                    // Condition is ready - evaluate to determine which branch
211                    let cond_result = condition.resolve(ctx);
212                    if is_truthy(&cond_result) {
213                        collect(then, ctx, needed)
214                    } else if let Some(else_e) = else_expr {
215                        collect(else_e, ctx, needed)
216                    } else {
217                        false
218                    }
219                }
220
221                ValueExpr::Coalesce { values } => {
222                    for value in values {
223                        let before = needed.len();
224                        collect(value, ctx, needed);
225                        if needed.len() > before {
226                            // This value has unmet dependencies - stop here
227                            return true;
228                        }
229
230                        // Value is ready - evaluate to decide if we should continue
231                        let result = value.resolve(ctx);
232                        match &result {
233                            FlowResult::Success(v) if !v.as_ref().is_null() => {
234                                // Found non-null value - done
235                                return true;
236                            }
237                            FlowResult::Failed(_) => {
238                                // Error - done (will propagate during resolution)
239                                return true;
240                            }
241                            _ => {
242                                // Null or skipped - continue to next value
243                                continue;
244                            }
245                        }
246                    }
247                    false
248                }
249
250                ValueExpr::Array(items) => {
251                    for item in items {
252                        collect(item, ctx, needed);
253                    }
254                    false
255                }
256
257                ValueExpr::Object(fields) => {
258                    for (_, value) in fields {
259                        collect(value, ctx, needed);
260                    }
261                    false
262                }
263            }
264        }
265
266        let mut needed = BitSet::new();
267        collect(self, ctx, &mut needed);
268        needed
269    }
270
271    /// Resolve this expression using the provided context.
272    ///
273    /// This should only be called when `needed_steps()` returns an empty set,
274    /// meaning all required step results are available in the context.
275    ///
276    /// The context provides access to:
277    /// - Step results (`$step` references)
278    /// - Workflow input (`$input` references)
279    /// - Variables (`$variable` references)
280    pub fn resolve(&self, ctx: &impl StepContext) -> FlowResult {
281        match self {
282            ValueExpr::Step { step, path } => {
283                let Some(idx) = ctx.step_index(step) else {
284                    return FlowResult::Failed(crate::FlowError::new(
285                        crate::TaskErrorCode::ExpressionFailure,
286                        format!("Unknown step: {}", step),
287                    ));
288                };
289
290                let Some(result) = ctx.get_result(idx) else {
291                    return FlowResult::Failed(crate::FlowError::new(
292                        crate::TaskErrorCode::OrchestratorError,
293                        format!("Step {} not completed", step),
294                    ));
295                };
296
297                // Apply path if needed
298                match result {
299                    // If the step returned null, propagate null (even if path is specified)
300                    // This enables $coalesce to work with skipped steps that return null
301                    FlowResult::Success(value) if value.as_ref().is_null() => {
302                        FlowResult::Success(value.clone())
303                    }
304                    FlowResult::Success(value) if !path.is_empty() => {
305                        if let Some(sub_value) = value.resolve_json_path(path) {
306                            FlowResult::Success(sub_value)
307                        } else {
308                            FlowResult::Failed(crate::FlowError::new(
309                                crate::TaskErrorCode::ExpressionFailure,
310                                format!("Path {} not found", path),
311                            ))
312                        }
313                    }
314                    other => other.clone(),
315                }
316            }
317
318            ValueExpr::Input { input: path } => {
319                let Some(input_value) = ctx.get_input() else {
320                    return FlowResult::Failed(crate::FlowError::new(
321                        crate::TaskErrorCode::OrchestratorError,
322                        "Workflow input not available in context",
323                    ));
324                };
325
326                // Apply path if provided
327                if path.is_empty() {
328                    FlowResult::Success(input_value.clone())
329                } else if let Some(sub_value) = input_value.resolve_json_path(path) {
330                    FlowResult::Success(sub_value)
331                } else {
332                    FlowResult::Failed(crate::FlowError::new(
333                        crate::TaskErrorCode::ExpressionFailure,
334                        format!("Input path {} not found", path),
335                    ))
336                }
337            }
338
339            ValueExpr::Variable { variable, default } => {
340                // Parse the variable name and path from the JsonPath
341                let parts = variable.parts();
342                if parts.is_empty() {
343                    return FlowResult::Failed(crate::FlowError::new(
344                        crate::TaskErrorCode::OrchestratorError,
345                        "Variable path is empty",
346                    ));
347                }
348
349                // First part is the variable name
350                let var_name = match &parts[0] {
351                    PathPart::Field(name) | PathPart::IndexStr(name) => name.as_str(),
352                    PathPart::Index(_) => {
353                        return FlowResult::Failed(crate::FlowError::new(
354                            crate::TaskErrorCode::OrchestratorError,
355                            "Variable name must be a string",
356                        ));
357                    }
358                };
359
360                // Try to get the variable value
361                if let Some(var_value) = ctx.get_variable(var_name) {
362                    // Apply remaining path if any
363                    if parts.len() > 1 {
364                        let sub_path = JsonPath::from_parts(parts[1..].to_vec());
365                        if let Some(sub_value) = var_value.resolve_json_path(&sub_path) {
366                            return FlowResult::Success(sub_value);
367                        } else {
368                            // Path not found - try default
369                        }
370                    } else {
371                        return FlowResult::Success(var_value);
372                    }
373                }
374
375                // Variable not found or path failed - try default if available
376                if let Some(default_expr) = default {
377                    log::debug!("Variable '{}' not found, using default", var_name);
378                    return default_expr.resolve(ctx);
379                }
380
381                // No variable and no default - error
382                FlowResult::Failed(crate::FlowError::new(
383                    crate::TaskErrorCode::ExpressionFailure,
384                    format!("Undefined variable: {}", var_name),
385                ))
386            }
387
388            ValueExpr::Literal(value) => FlowResult::Success(ValueRef::new(value.clone())),
389
390            ValueExpr::EscapedLiteral { literal } => {
391                FlowResult::Success(ValueRef::new(literal.clone()))
392            }
393
394            ValueExpr::If {
395                condition,
396                then,
397                else_expr,
398            } => {
399                let cond_result = condition.resolve(ctx);
400                if is_truthy(&cond_result) {
401                    then.resolve(ctx)
402                } else if let Some(else_e) = else_expr {
403                    else_e.resolve(ctx)
404                } else {
405                    FlowResult::Success(ValueRef::new(serde_json::Value::Null))
406                }
407            }
408
409            ValueExpr::Coalesce { values } => {
410                for value in values {
411                    let result = value.resolve(ctx);
412                    match &result {
413                        FlowResult::Success(v) if !v.as_ref().is_null() => {
414                            return result;
415                        }
416                        FlowResult::Failed(_) => {
417                            return result;
418                        }
419                        _ => continue,
420                    }
421                }
422                FlowResult::Success(ValueRef::new(serde_json::Value::Null))
423            }
424
425            ValueExpr::Array(items) => {
426                let mut result_array = Vec::new();
427                for item in items {
428                    match item.resolve(ctx) {
429                        FlowResult::Success(value) => {
430                            result_array.push(value.as_ref().clone());
431                        }
432                        other => return other,
433                    }
434                }
435                FlowResult::Success(ValueRef::new(serde_json::Value::Array(result_array)))
436            }
437
438            ValueExpr::Object(fields) => {
439                let mut result_map = serde_json::Map::new();
440                for (k, v) in fields {
441                    match v.resolve(ctx) {
442                        FlowResult::Success(value) => {
443                            result_map.insert(k.clone(), value.as_ref().clone());
444                        }
445                        other => return other,
446                    }
447                }
448                FlowResult::Success(ValueRef::new(serde_json::Value::Object(result_map)))
449            }
450        }
451    }
452}
453
454/// Check if a FlowResult is truthy for conditional evaluation.
455///
456/// - `Success` with non-null, non-false value is truthy
457/// - `Success` with null or false is falsy
458/// - `Failed` is treated as falsy (condition evaluation failed)
459fn is_truthy(result: &FlowResult) -> bool {
460    match result {
461        FlowResult::Success(value) => match value.as_ref() {
462            serde_json::Value::Null => false,
463            serde_json::Value::Bool(b) => *b,
464            _ => true,
465        },
466        FlowResult::Failed(_) => false,
467    }
468}
469
470impl Default for ValueExpr {
471    fn default() -> Self {
472        ValueExpr::null()
473    }
474}
475
476impl schemars::JsonSchema for ValueExpr {
477    fn schema_name() -> std::borrow::Cow<'static, str> {
478        "ValueExpr".into()
479    }
480
481    fn json_schema(_generator: &mut schemars::SchemaGenerator) -> schemars::Schema {
482        serde_json::json!({
483            "description": "A value expression: any JSON value (null, boolean, number, string, array, or object). Objects with reserved $-prefixed keys are interpreted as expression references: {\"$step\": \"id\", \"path\"?: \"...\"}, {\"$input\": \"path\"}, {\"$variable\": \"path\", \"default\"?: ValueExpr}, {\"$literal\": value}, {\"$if\": cond, \"then\": expr, \"else\"?: expr}, {\"$coalesce\": [expr, ...]}. See https://stepflow.org/docs/flows/expressions for details.",
484            "externalDocs": {
485                "description": "Expressions documentation",
486                "url": "https://stepflow.org/docs/flows/expressions"
487            }
488        })
489        .try_into()
490        .expect("ValueExpr schema is valid")
491    }
492}
493
494#[cfg(test)]
495mod tests {
496    use super::*;
497    use crate::values::Secrets;
498    use serde_json::json;
499
500    #[test]
501    fn test_step_constructor() {
502        let expr = ValueExpr::step("my_step", JsonPath::default());
503        assert_eq!(
504            expr,
505            ValueExpr::Step {
506                step: "my_step".to_string(),
507                path: JsonPath::default()
508            }
509        );
510    }
511
512    #[test]
513    fn test_input_constructor() {
514        let expr = ValueExpr::workflow_input(JsonPath::from("field"));
515        assert_eq!(
516            expr,
517            ValueExpr::Input {
518                input: JsonPath::from("field")
519            }
520        );
521    }
522
523    #[test]
524    fn test_variable_constructor() {
525        let expr = ValueExpr::variable("my_var", None);
526        assert_eq!(
527            expr,
528            ValueExpr::Variable {
529                variable: JsonPath::from("my_var"),
530                default: None
531            }
532        );
533    }
534
535    #[test]
536    fn test_variable_with_default() {
537        let default_expr = Box::new(ValueExpr::literal(json!("default_value")));
538        let expr = ValueExpr::variable("my_var", Some(default_expr.clone()));
539        assert_eq!(
540            expr,
541            ValueExpr::Variable {
542                variable: JsonPath::from("my_var"),
543                default: Some(default_expr)
544            }
545        );
546    }
547
548    #[test]
549    fn test_literal_primitives() {
550        // Null
551        assert_eq!(
552            ValueExpr::literal(json!(null)),
553            ValueExpr::Literal(json!(null))
554        );
555
556        // Bool
557        assert_eq!(
558            ValueExpr::literal(json!(true)),
559            ValueExpr::Literal(json!(true))
560        );
561        assert_eq!(
562            ValueExpr::literal(json!(false)),
563            ValueExpr::Literal(json!(false))
564        );
565
566        // Number
567        assert_eq!(ValueExpr::literal(json!(42)), ValueExpr::Literal(json!(42)));
568        assert_eq!(
569            ValueExpr::literal(json!(3.25)),
570            ValueExpr::Literal(json!(3.25))
571        );
572
573        // String
574        assert_eq!(
575            ValueExpr::literal(json!("hello")),
576            ValueExpr::Literal(json!("hello"))
577        );
578    }
579
580    #[test]
581    fn test_literal_composable_structures() {
582        // Array - should be converted to Array variant
583        let arr_expr = ValueExpr::literal(json!([1, 2, 3]));
584        match arr_expr {
585            ValueExpr::Array(arr) => {
586                assert_eq!(arr.len(), 3);
587                assert_eq!(arr[0], ValueExpr::Literal(json!(1)));
588                assert_eq!(arr[1], ValueExpr::Literal(json!(2)));
589                assert_eq!(arr[2], ValueExpr::Literal(json!(3)));
590            }
591            _ => panic!("Expected Array variant"),
592        }
593
594        // Object - should be converted to Object variant
595        let obj_expr = ValueExpr::literal(json!({"a": 1, "b": "hello"}));
596        match obj_expr {
597            ValueExpr::Object(obj) => {
598                assert_eq!(obj.len(), 2);
599                // Vec is unordered in terms of what we guarantee, but serde_json preserves order
600                assert!(
601                    obj.iter()
602                        .any(|(k, v)| k == "a" && *v == ValueExpr::Literal(json!(1)))
603                );
604                assert!(
605                    obj.iter()
606                        .any(|(k, v)| k == "b" && *v == ValueExpr::Literal(json!("hello")))
607                );
608            }
609            _ => panic!("Expected Object variant"),
610        }
611    }
612
613    #[test]
614    fn test_array_constructor() {
615        let arr = ValueExpr::array(vec![
616            ValueExpr::literal(json!(1)),
617            ValueExpr::literal(json!("two")),
618        ]);
619        assert_eq!(
620            arr,
621            ValueExpr::Array(vec![
622                ValueExpr::Literal(json!(1)),
623                ValueExpr::Literal(json!("two"))
624            ])
625        );
626    }
627
628    #[test]
629    fn test_object_constructor() {
630        let obj = ValueExpr::object(vec![
631            ("a".to_string(), ValueExpr::literal(json!(1))),
632            ("b".to_string(), ValueExpr::literal(json!("hello"))),
633        ]);
634        assert_eq!(
635            obj,
636            ValueExpr::Object(vec![
637                ("a".to_string(), ValueExpr::Literal(json!(1))),
638                ("b".to_string(), ValueExpr::Literal(json!("hello")))
639            ])
640        );
641    }
642
643    #[test]
644    fn test_escaped_literal() {
645        let expr = ValueExpr::escaped_literal(json!({"step": "foo"}));
646        assert_eq!(
647            expr,
648            ValueExpr::EscapedLiteral {
649                literal: json!({"step": "foo"})
650            }
651        );
652    }
653
654    #[test]
655    fn test_composable_with_references() {
656        // Array of mixed expressions and literals
657        let arr = ValueExpr::Array(vec![
658            ValueExpr::step("step1", JsonPath::default()),
659            ValueExpr::literal(json!("literal_string")),
660            ValueExpr::workflow_input(JsonPath::from("field")),
661        ]);
662
663        match &arr {
664            ValueExpr::Array(v) => {
665                assert_eq!(v.len(), 3);
666                assert!(matches!(&v[0], ValueExpr::Step { .. }));
667                assert!(matches!(&v[1], ValueExpr::Literal(_)));
668                assert!(matches!(&v[2], ValueExpr::Input { .. }));
669            }
670            _ => panic!("Expected Array"),
671        }
672
673        // Object with mixed expressions
674        let obj = ValueExpr::Object(vec![
675            (
676                "ref".to_string(),
677                ValueExpr::step("step1", JsonPath::default()),
678            ),
679            ("lit".to_string(), ValueExpr::literal(json!("value"))),
680            (
681                "input".to_string(),
682                ValueExpr::workflow_input(JsonPath::from("x")),
683            ),
684        ]);
685
686        match &obj {
687            ValueExpr::Object(fields) => {
688                assert_eq!(fields.len(), 3);
689                assert!(matches!(&fields[0].1, ValueExpr::Step { .. }));
690                assert!(matches!(&fields[1].1, ValueExpr::Literal(_)));
691                assert!(matches!(&fields[2].1, ValueExpr::Input { .. }));
692            }
693            _ => panic!("Expected Object"),
694        }
695    }
696
697    // ========== Tests for needed_steps() ==========
698
699    /// Mock StepContext for testing
700    struct MockStepContext {
701        step_names: Vec<String>,
702        completed: BitSet,
703        results: Vec<Option<FlowResult>>,
704        input: Option<ValueRef>,
705    }
706
707    impl MockStepContext {
708        fn new(step_names: Vec<&str>) -> Self {
709            let len = step_names.len();
710            Self {
711                step_names: step_names.into_iter().map(|s| s.to_string()).collect(),
712                completed: BitSet::new(),
713                results: vec![None; len],
714                input: None,
715            }
716        }
717
718        #[allow(dead_code)]
719        fn with_input(step_names: Vec<&str>, input: serde_json::Value) -> Self {
720            let mut ctx = Self::new(step_names);
721            ctx.input = Some(ValueRef::new(input));
722            ctx
723        }
724
725        fn complete_step(&mut self, name: &str, result: FlowResult) {
726            if let Some(idx) = self.step_names.iter().position(|s| s == name) {
727                self.completed.insert(idx);
728                self.results[idx] = Some(result);
729            }
730        }
731    }
732
733    impl StepContext for MockStepContext {
734        fn step_index(&self, step_id: &str) -> Option<usize> {
735            self.step_names.iter().position(|s| s == step_id)
736        }
737
738        fn is_completed(&self, step_index: usize) -> bool {
739            self.completed.contains(step_index)
740        }
741
742        fn get_result(&self, step_index: usize) -> Option<&FlowResult> {
743            self.results.get(step_index).and_then(|r| r.as_ref())
744        }
745
746        fn get_input(&self) -> Option<&ValueRef> {
747            self.input.as_ref()
748        }
749
750        fn get_variable(&self, _name: &str) -> Option<ValueRef> {
751            None // Mock doesn't support variables
752        }
753
754        fn get_variable_secrets(&self, _name: &str) -> Secrets {
755            Secrets::empty().clone()
756        }
757    }
758
759    #[test]
760    fn test_needed_steps_literal() {
761        let ctx = MockStepContext::new(vec!["step1"]);
762        let expr = ValueExpr::literal(json!(42));
763        let needs = expr.needed_steps(&ctx);
764        assert!(needs.is_empty(), "Literals should need no steps");
765    }
766
767    #[test]
768    fn test_needed_steps_step_not_completed() {
769        let ctx = MockStepContext::new(vec!["step1", "step2"]);
770        let expr = ValueExpr::step("step1", JsonPath::default());
771        let needs = expr.needed_steps(&ctx);
772        assert!(needs.contains(0), "Should need step1 (index 0)");
773        assert!(!needs.contains(1), "Should not need step2");
774    }
775
776    #[test]
777    fn test_needed_steps_step_completed() {
778        let mut ctx = MockStepContext::new(vec!["step1"]);
779        ctx.complete_step("step1", FlowResult::Success(ValueRef::new(json!(42))));
780
781        let expr = ValueExpr::step("step1", JsonPath::default());
782        let needs = expr.needed_steps(&ctx);
783        assert!(needs.is_empty(), "Completed step should need nothing");
784    }
785
786    #[test]
787    fn test_needed_steps_if_condition_not_ready() {
788        let ctx = MockStepContext::new(vec!["cond", "then_step", "else_step"]);
789
790        // { $if: { $step: cond }, then: { $step: then_step }, else: { $step: else_step } }
791        let expr = ValueExpr::if_expr(
792            ValueExpr::step("cond", JsonPath::default()),
793            ValueExpr::step("then_step", JsonPath::default()),
794            Some(ValueExpr::step("else_step", JsonPath::default())),
795        );
796
797        let needs = expr.needed_steps(&ctx);
798        assert!(needs.contains(0), "Should need condition step");
799        assert!(!needs.contains(1), "Should NOT need then_step yet");
800        assert!(!needs.contains(2), "Should NOT need else_step yet");
801    }
802
803    #[test]
804    fn test_needed_steps_if_condition_true() {
805        let mut ctx = MockStepContext::new(vec!["cond", "then_step", "else_step"]);
806        ctx.complete_step("cond", FlowResult::Success(ValueRef::new(json!(true))));
807
808        let expr = ValueExpr::if_expr(
809            ValueExpr::step("cond", JsonPath::default()),
810            ValueExpr::step("then_step", JsonPath::default()),
811            Some(ValueExpr::step("else_step", JsonPath::default())),
812        );
813
814        let needs = expr.needed_steps(&ctx);
815        assert!(!needs.contains(0), "Should not need condition (completed)");
816        assert!(
817            needs.contains(1),
818            "Should need then_step (condition was true)"
819        );
820        assert!(!needs.contains(2), "Should NOT need else_step");
821    }
822
823    #[test]
824    fn test_needed_steps_if_condition_false() {
825        let mut ctx = MockStepContext::new(vec!["cond", "then_step", "else_step"]);
826        ctx.complete_step("cond", FlowResult::Success(ValueRef::new(json!(false))));
827
828        let expr = ValueExpr::if_expr(
829            ValueExpr::step("cond", JsonPath::default()),
830            ValueExpr::step("then_step", JsonPath::default()),
831            Some(ValueExpr::step("else_step", JsonPath::default())),
832        );
833
834        let needs = expr.needed_steps(&ctx);
835        assert!(!needs.contains(0), "Should not need condition (completed)");
836        assert!(!needs.contains(1), "Should NOT need then_step");
837        assert!(
838            needs.contains(2),
839            "Should need else_step (condition was false)"
840        );
841    }
842
843    #[test]
844    fn test_needed_steps_if_fully_ready() {
845        let mut ctx = MockStepContext::new(vec!["cond", "then_step", "else_step"]);
846        ctx.complete_step("cond", FlowResult::Success(ValueRef::new(json!(true))));
847        ctx.complete_step(
848            "then_step",
849            FlowResult::Success(ValueRef::new(json!("result"))),
850        );
851
852        let expr = ValueExpr::if_expr(
853            ValueExpr::step("cond", JsonPath::default()),
854            ValueExpr::step("then_step", JsonPath::default()),
855            Some(ValueExpr::step("else_step", JsonPath::default())),
856        );
857
858        let needs = expr.needed_steps(&ctx);
859        assert!(needs.is_empty(), "All needed steps completed - ready");
860    }
861
862    #[test]
863    fn test_needed_steps_coalesce_first_value_not_ready() {
864        let ctx = MockStepContext::new(vec!["step1", "step2"]);
865
866        let expr = ValueExpr::coalesce(vec![
867            ValueExpr::step("step1", JsonPath::default()),
868            ValueExpr::step("step2", JsonPath::default()),
869        ]);
870
871        let needs = expr.needed_steps(&ctx);
872        assert!(needs.contains(0), "Should need step1 first");
873        assert!(!needs.contains(1), "Should NOT need step2 yet");
874    }
875
876    #[test]
877    fn test_needed_steps_coalesce_first_value_null() {
878        let mut ctx = MockStepContext::new(vec!["step1", "step2"]);
879        ctx.complete_step("step1", FlowResult::Success(ValueRef::new(json!(null))));
880
881        let expr = ValueExpr::coalesce(vec![
882            ValueExpr::step("step1", JsonPath::default()),
883            ValueExpr::step("step2", JsonPath::default()),
884        ]);
885
886        let needs = expr.needed_steps(&ctx);
887        assert!(!needs.contains(0), "step1 completed (null)");
888        assert!(needs.contains(1), "Should now need step2");
889    }
890
891    #[test]
892    fn test_needed_steps_coalesce_first_value_success() {
893        let mut ctx = MockStepContext::new(vec!["step1", "step2"]);
894        ctx.complete_step("step1", FlowResult::Success(ValueRef::new(json!("value"))));
895
896        let expr = ValueExpr::coalesce(vec![
897            ValueExpr::step("step1", JsonPath::default()),
898            ValueExpr::step("step2", JsonPath::default()),
899        ]);
900
901        let needs = expr.needed_steps(&ctx);
902        assert!(needs.is_empty(), "Found non-null - no more steps needed");
903    }
904
905    #[test]
906    fn test_needed_steps_coalesce_null_continues() {
907        let mut ctx = MockStepContext::new(vec!["step1", "step2"]);
908        // Step completed with null value (equivalent to old "skipped")
909        ctx.complete_step("step1", FlowResult::Success(ValueRef::new(json!(null))));
910
911        let expr = ValueExpr::coalesce(vec![
912            ValueExpr::step("step1", JsonPath::default()),
913            ValueExpr::step("step2", JsonPath::default()),
914        ]);
915
916        let needs = expr.needed_steps(&ctx);
917        assert!(!needs.contains(0), "step1 completed (null)");
918        assert!(
919            needs.contains(1),
920            "Should now need step2 (coalesce continues on null)"
921        );
922    }
923
924    #[test]
925    fn test_needed_steps_array_union() {
926        let ctx = MockStepContext::new(vec!["step1", "step2", "step3"]);
927
928        let expr = ValueExpr::array(vec![
929            ValueExpr::step("step1", JsonPath::default()),
930            ValueExpr::step("step2", JsonPath::default()),
931            ValueExpr::literal(json!("literal")),
932        ]);
933
934        let needs = expr.needed_steps(&ctx);
935        assert!(needs.contains(0), "Should need step1");
936        assert!(needs.contains(1), "Should need step2");
937        assert!(!needs.contains(2), "Should not need step3 (not referenced)");
938    }
939
940    #[test]
941    fn test_needed_steps_object_union() {
942        let ctx = MockStepContext::new(vec!["step1", "step2"]);
943
944        let expr = ValueExpr::object(vec![
945            (
946                "a".to_string(),
947                ValueExpr::step("step1", JsonPath::default()),
948            ),
949            (
950                "b".to_string(),
951                ValueExpr::step("step2", JsonPath::default()),
952            ),
953        ]);
954
955        let needs = expr.needed_steps(&ctx);
956        assert!(needs.contains(0), "Should need step1");
957        assert!(needs.contains(1), "Should need step2");
958    }
959
960    #[test]
961    fn test_needed_steps_nested_if_in_coalesce() {
962        let ctx = MockStepContext::new(vec!["cond", "then_step", "fallback"]);
963
964        // { $coalesce: [{ $if: { $step: cond }, then: { $step: then_step } }, { $step: fallback }] }
965        let expr = ValueExpr::coalesce(vec![
966            ValueExpr::if_expr(
967                ValueExpr::step("cond", JsonPath::default()),
968                ValueExpr::step("then_step", JsonPath::default()),
969                None,
970            ),
971            ValueExpr::step("fallback", JsonPath::default()),
972        ]);
973
974        // First: need condition for the $if
975        let needs = expr.needed_steps(&ctx);
976        assert!(needs.contains(0), "Should need cond first");
977        assert!(!needs.contains(1), "Should NOT need then_step yet");
978        assert!(!needs.contains(2), "Should NOT need fallback yet");
979    }
980
981    #[test]
982    fn test_needed_steps_nested_if_condition_false_returns_null() {
983        let mut ctx = MockStepContext::new(vec!["cond", "then_step", "fallback"]);
984        ctx.complete_step("cond", FlowResult::Success(ValueRef::new(json!(false))));
985
986        // $if with no else returns null when condition is false
987        let expr = ValueExpr::coalesce(vec![
988            ValueExpr::if_expr(
989                ValueExpr::step("cond", JsonPath::default()),
990                ValueExpr::step("then_step", JsonPath::default()),
991                None, // No else - returns null
992            ),
993            ValueExpr::step("fallback", JsonPath::default()),
994        ]);
995
996        let needs = expr.needed_steps(&ctx);
997        // Condition is false, $if returns null, coalesce moves to fallback
998        assert!(!needs.contains(0), "Condition completed");
999        assert!(!needs.contains(1), "then_step not needed (condition false)");
1000        assert!(needs.contains(2), "Should need fallback now");
1001    }
1002
1003    /// Regression test for #866: integer literals in step inputs must survive
1004    /// value resolution without being coerced to floats.
1005    #[test]
1006    fn test_resolve_object_preserves_integer_types() {
1007        let ctx = MockStepContext::new(vec![]);
1008
1009        // Simulate a step input like: { "duration_ms": 10, "name": "test" }
1010        let expr = ValueExpr::Object(vec![
1011            ("duration_ms".to_string(), ValueExpr::Literal(json!(10))),
1012            ("name".to_string(), ValueExpr::Literal(json!("test"))),
1013        ]);
1014
1015        let result = expr.resolve(&ctx);
1016        let value = result.success().unwrap();
1017        let duration = value.as_ref().get("duration_ms").unwrap();
1018
1019        assert!(
1020            duration.is_u64(),
1021            "Integer literal should remain u64 after resolution, got {:?}",
1022            duration
1023        );
1024        assert_eq!(duration.as_u64(), Some(10));
1025    }
1026
1027    #[test]
1028    fn test_resolve_literal() {
1029        let ctx = MockStepContext::new(vec![]);
1030        let expr = ValueExpr::literal(json!(42));
1031        let result = expr.resolve(&ctx);
1032        assert_eq!(result.success().unwrap().as_ref(), &json!(42));
1033    }
1034
1035    #[test]
1036    fn test_resolve_step() {
1037        let mut ctx = MockStepContext::new(vec!["step1"]);
1038        ctx.complete_step(
1039            "step1",
1040            FlowResult::Success(ValueRef::new(json!({"value": 42}))),
1041        );
1042
1043        let expr = ValueExpr::step("step1", JsonPath::default());
1044        let result = expr.resolve(&ctx);
1045        assert_eq!(result.success().unwrap().as_ref(), &json!({"value": 42}));
1046    }
1047
1048    #[test]
1049    fn test_resolve_step_with_path() {
1050        let mut ctx = MockStepContext::new(vec!["step1"]);
1051        ctx.complete_step(
1052            "step1",
1053            FlowResult::Success(ValueRef::new(json!({"value": 42}))),
1054        );
1055
1056        let expr = ValueExpr::step("step1", JsonPath::from("value"));
1057        let result = expr.resolve(&ctx);
1058        assert_eq!(result.success().unwrap().as_ref(), &json!(42));
1059    }
1060
1061    #[test]
1062    fn test_resolve_if_true() {
1063        let mut ctx = MockStepContext::new(vec!["cond", "then_step"]);
1064        ctx.complete_step("cond", FlowResult::Success(ValueRef::new(json!(true))));
1065        ctx.complete_step(
1066            "then_step",
1067            FlowResult::Success(ValueRef::new(json!("then_value"))),
1068        );
1069
1070        let expr = ValueExpr::if_expr(
1071            ValueExpr::step("cond", JsonPath::default()),
1072            ValueExpr::step("then_step", JsonPath::default()),
1073            Some(ValueExpr::literal(json!("else_value"))),
1074        );
1075
1076        let result = expr.resolve(&ctx);
1077        assert_eq!(result.success().unwrap().as_ref(), &json!("then_value"));
1078    }
1079
1080    #[test]
1081    fn test_resolve_if_false() {
1082        let mut ctx = MockStepContext::new(vec!["cond"]);
1083        ctx.complete_step("cond", FlowResult::Success(ValueRef::new(json!(false))));
1084
1085        let expr = ValueExpr::if_expr(
1086            ValueExpr::step("cond", JsonPath::default()),
1087            ValueExpr::literal(json!("then_value")),
1088            Some(ValueExpr::literal(json!("else_value"))),
1089        );
1090
1091        let result = expr.resolve(&ctx);
1092        assert_eq!(result.success().unwrap().as_ref(), &json!("else_value"));
1093    }
1094
1095    #[test]
1096    fn test_resolve_coalesce() {
1097        let mut ctx = MockStepContext::new(vec!["step1", "step2"]);
1098        ctx.complete_step("step1", FlowResult::Success(ValueRef::new(json!(null))));
1099        ctx.complete_step("step2", FlowResult::Success(ValueRef::new(json!("value"))));
1100
1101        let expr = ValueExpr::coalesce(vec![
1102            ValueExpr::step("step1", JsonPath::default()),
1103            ValueExpr::step("step2", JsonPath::default()),
1104        ]);
1105
1106        let result = expr.resolve(&ctx);
1107        assert_eq!(result.success().unwrap().as_ref(), &json!("value"));
1108    }
1109
1110    #[test]
1111    fn test_is_truthy() {
1112        // Truthy values
1113        assert!(is_truthy(&FlowResult::Success(ValueRef::new(json!(true)))));
1114        assert!(is_truthy(&FlowResult::Success(ValueRef::new(json!(1)))));
1115        assert!(is_truthy(&FlowResult::Success(ValueRef::new(json!("str")))));
1116        assert!(is_truthy(&FlowResult::Success(ValueRef::new(json!([])))));
1117        assert!(is_truthy(&FlowResult::Success(ValueRef::new(json!({})))));
1118
1119        // Falsy values
1120        assert!(!is_truthy(&FlowResult::Success(ValueRef::new(json!(null)))));
1121        assert!(!is_truthy(&FlowResult::Success(ValueRef::new(json!(
1122            false
1123        )))));
1124        assert!(!is_truthy(&FlowResult::Failed(crate::FlowError::new(
1125            crate::TaskErrorCode::OrchestratorError,
1126            "error",
1127        ))));
1128    }
1129}