stepflow-flow 0.13.0

Stepflow workflow definition types — Flow, Step, ValueExpr, and related types.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
// Copyright 2025 DataStax Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under
// the License.

use std::collections::HashMap;

use serde_with::{DefaultOnNull, serde_as};

use super::{FlowSchema, Step, ValueRef, VariableSchema};
use crate::{FlowResult, ValueExpr, schema::SchemaRef};

/// A workflow consisting of a sequence of steps and their outputs.
///
/// A flow represents a complete workflow that can be executed. It contains:
/// - A sequence of steps to execute
/// - Named outputs that can reference step outputs
///
/// Flows should not be cloned. They should generally be stored and passed as a
/// reference or inside an `Arc`.
//
// Maintainer notes (plain `//` comments on purpose):
// - This type derives `schemars::JsonSchema`, which turns `///` doc comments into
//   schema descriptions. Editing a `///` comment on this struct or its fields
//   therefore changes the generated schema that
//   `test_schema_comparison_with_flow_json` compares against the reference file.
// - `Clone` is derived even though cloning is discouraged above; `Flow::slow_clone`
//   is the explicitly named way to request a deep copy.
// - Fields annotated with `#[serde_as(as = "DefaultOnNull")]` accept an explicit
//   `null` and fall back to their `Default` value; `Option` fields map `null` to
//   `None`. `test_flow_all_optional_null` exercises this for every field.
// - Every field uses `skip_serializing_if`, so empty / `None` / null values are
//   omitted from serialized output.
#[serde_as]
#[derive(
    Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Default, schemars::JsonSchema,
)]
#[serde(rename_all = "camelCase")]
pub struct Flow {
    /// The name of the flow.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,

    /// The description of the flow.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,

    /// The version of the flow.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub version: Option<String>,

    /// Consolidated schema information for the flow.
    /// Contains input/output schemas, step output schemas, and shared `$defs`.
    // `schemars(with = ...)` pins the schema to `FlowSchema` itself rather than the
    // `serde_as` adapter type.
    #[serde(default, skip_serializing_if = "FlowSchema::is_empty")]
    #[serde_as(as = "DefaultOnNull")]
    #[schemars(with = "FlowSchema")]
    pub schemas: FlowSchema,

    /// The steps to execute for the flow.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[serde_as(as = "DefaultOnNull")]
    pub steps: Vec<Step>,

    /// The outputs of the flow, mapping output names to their values.
    // A missing or `null` output deserializes to a value for which
    // `ValueExpr::is_null` is true (see `test_flow_all_optional_null`).
    #[serde(default, skip_serializing_if = "ValueExpr::is_null")]
    pub output: ValueExpr,

    /// Test configuration for the flow.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub test: Option<TestConfig>,

    /// Example inputs for the workflow that can be used for testing and UI dropdowns.
    // `Flow::get_all_examples` additionally derives examples from `test.cases`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub examples: Option<Vec<ExampleInput>>,

    /// Extensible metadata for the flow that can be used by tools and frameworks.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    #[serde_as(as = "DefaultOnNull")]
    pub metadata: HashMap<String, serde_json::Value>,
}

impl Flow {
    /// Create a deep copy of this flow.
    ///
    /// This simply forwards to [`Clone::clone`]; the distinct name exists so that
    /// expensive copies are easy to spot at call sites.
    ///
    /// **Warning**: This method performs a deep clone of the entire workflow structure,
    /// including all steps, metadata, and configurations. This can be expensive for
    /// large workflows.
    ///
    /// # Performance
    /// - Cloning large workflows with many steps can be slow
    /// - Consider using `Arc<Flow>` for shared ownership instead
    /// - Only use this when you need to modify the workflow structure
    ///
    /// # Example
    /// ```rust
    /// use stepflow_flow::workflow::Flow;
    ///
    /// let original_flow = Flow::default();
    /// let cloned_flow = original_flow.slow_clone();
    /// assert_eq!(original_flow, cloned_flow);
    /// ```
    pub fn slow_clone(&self) -> Self {
        self.clone()
    }

    /// Returns the flow's name, if one was set.
    pub fn name(&self) -> Option<&str> {
        self.name.as_deref()
    }

    /// Returns the flow's description, if one was set.
    pub fn description(&self) -> Option<&str> {
        self.description.as_deref()
    }

    /// Returns the flow's version string, if one was set.
    pub fn version(&self) -> Option<&str> {
        self.version.as_deref()
    }

    /// Returns the flow's metadata map.
    pub fn metadata(&self) -> &HashMap<String, serde_json::Value> {
        &self.metadata
    }

    /// Returns a reference to all steps in the flow.
    pub fn steps(&self) -> &[Step] {
        &self.steps
    }

    /// Returns the explicitly declared example inputs.
    ///
    /// An absent `examples` field is reported as an empty slice. Examples derived
    /// from test cases are not included; see [`Flow::get_all_examples`].
    pub fn examples(&self) -> &[ExampleInput] {
        self.examples.as_deref().unwrap_or(&[])
    }

    /// Get the variable schema for the flow.
    ///
    /// This constructs a `VariableSchema` from the schema definition, extracting
    /// runtime metadata like defaults, secrets, and required variables.
    ///
    /// The underlying schema is cloned on every call; use
    /// [`Flow::variable_schema`] when a borrowed `SchemaRef` is sufficient.
    pub fn variables(&self) -> Option<VariableSchema> {
        self.schemas().variables.clone().map(VariableSchema::from)
    }

    /// Get a reference to the variable schema (raw SchemaRef).
    pub fn variable_schema(&self) -> Option<&SchemaRef> {
        self.schemas().variables.as_ref()
    }

    /// Returns a reference to the step at the given index.
    ///
    /// # Panics
    ///
    /// Panics if the index is out of bounds.
    pub fn step(&self, index: usize) -> &Step {
        &self.steps[index]
    }

    /// Returns a mutable reference to the step at the given index.
    ///
    /// # Panics
    ///
    /// Panics if the index is out of bounds.
    pub fn step_mut(&mut self, index: usize) -> &mut Step {
        self.steps.get_mut(index).expect("Index out of bounds")
    }

    /// Returns a reference to the flow's output value.
    pub fn output(&self) -> &ValueExpr {
        &self.output
    }

    /// Returns the flow's test configuration, if any.
    pub fn test(&self) -> Option<&TestConfig> {
        self.test.as_ref()
    }

    /// Returns a mutable reference to the flow's test configuration, if any.
    pub fn test_mut(&mut self) -> Option<&mut TestConfig> {
        self.test.as_mut()
    }

    /// Get the flow's schema information.
    pub fn schemas(&self) -> &FlowSchema {
        &self.schemas
    }

    /// Get a mutable reference to the flow's schema information.
    pub fn schemas_mut(&mut self) -> &mut FlowSchema {
        &mut self.schemas
    }

    /// Get the flow's input schema.
    pub fn input_schema(&self) -> Option<&SchemaRef> {
        self.schemas.input.as_ref()
    }

    /// Set the flow's input schema. Passing `None` clears it.
    pub fn set_input_schema(&mut self, input_schema: Option<SchemaRef>) {
        self.schemas.input = input_schema;
    }

    /// Get the flow's output schema.
    pub fn output_schema(&self) -> Option<&SchemaRef> {
        self.schemas.output.as_ref()
    }

    /// Set the flow's output schema. Passing `None` clears it.
    pub fn set_output_schema(&mut self, output_schema: Option<SchemaRef>) {
        self.schemas.output = output_schema;
    }

    /// Get the output schema for a specific step.
    ///
    /// Returns `None` when no schema is recorded under `step_id`.
    pub fn step_output_schema(&self, step_id: &str) -> Option<&SchemaRef> {
        self.schemas.steps.get(step_id)
    }

    /// Set the output schema for a specific step, replacing any schema previously
    /// stored under the same `step_id`.
    pub fn set_step_output_schema(&mut self, step_id: String, step_schema: SchemaRef) {
        self.schemas.steps.insert(step_id, step_schema);
    }

    /// Get all example inputs, including those derived from test cases.
    ///
    /// The explicitly declared examples come first, in declaration order. Each
    /// test case is then appended as an example unless an entry with the same
    /// name is already present, so declared examples win over test cases and the
    /// first of several same-named test cases wins over later ones.
    pub fn get_all_examples(&self) -> Vec<ExampleInput> {
        let mut examples = self.examples().to_vec();

        // `Option::iter` yields zero or one `TestConfig`, so this is a no-op when
        // the flow has no test configuration.
        let test_cases = self.test.iter().flat_map(|config| &config.cases);
        for test_case in test_cases {
            // Compare against everything collected so far, including test cases
            // appended in earlier iterations.
            let name_is_new = examples.iter().all(|ex| ex.name != test_case.name);
            if name_is_new {
                examples.push(ExampleInput::from(test_case));
            }
        }

        examples
    }
}

/// A wrapper around `Arc<Flow>` to support poem-openapi traits.
///
/// This wrapper exists to work around Rust's orphan rules which prevent
/// implementing external traits on external types like `Arc<Flow>`.
///
/// Cloning a `FlowRef` clones the inner `Arc`, not the [`Flow`] itself. The
/// wrapper dereferences to [`Flow`], and its serde implementations delegate to
/// the wrapped flow, so it serializes exactly like a plain `Flow`.
#[derive(Debug, Clone, PartialEq)]
pub struct FlowRef(std::sync::Arc<Flow>);

impl FlowRef {
    /// Create a new FlowRef from a Flow.
    pub fn new(flow: Flow) -> Self {
        Self(std::sync::Arc::new(flow))
    }

    /// Create a new FlowRef from an `Arc<Flow>`.
    pub fn from_arc(arc: std::sync::Arc<Flow>) -> Self {
        Self(arc)
    }

    /// Get a reference to the underlying Flow.
    pub fn as_flow(&self) -> &Flow {
        &self.0
    }

    /// Get the underlying `Arc<Flow>`.
    pub fn into_arc(self) -> std::sync::Arc<Flow> {
        self.0
    }

    /// Get a reference to the underlying `Arc<Flow>`.
    pub fn as_arc(&self) -> &std::sync::Arc<Flow> {
        &self.0
    }
}

// Allows `Flow` methods to be called directly on a `FlowRef`.
impl std::ops::Deref for FlowRef {
    type Target = Flow;

    fn deref(&self) -> &Flow {
        self.as_flow()
    }
}

impl From<Flow> for FlowRef {
    fn from(flow: Flow) -> Self {
        Self::new(flow)
    }
}

impl From<std::sync::Arc<Flow>> for FlowRef {
    fn from(arc: std::sync::Arc<Flow>) -> Self {
        Self::from_arc(arc)
    }
}

impl serde::Serialize for FlowRef {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        self.0.serialize(serializer)
    }
}

impl<'de> serde::Deserialize<'de> for FlowRef {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let flow = Flow::deserialize(deserializer)?;
        Ok(Self::new(flow))
    }
}

/// Configuration for testing a workflow.
//
// Maintainer notes (plain `//` comments on purpose):
// - This type derives `schemars::JsonSchema`, which turns `///` doc comments into
//   schema descriptions, so editing them changes the generated flow schema.
// - `rename_all = "camelCase"` means `config_file` is read and written as
//   `configFile` (see `test_test_config_all_optional_null`).
// - Nothing in this struct enforces the "mutually exclusive" rule documented on
//   `config_file` / `config`; per those docs it is validated at runtime.
#[serde_as]
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, schemars::JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct TestConfig {
    /// Path to an external stepflow config file for tests.
    /// Relative paths are resolved from the workflow file's directory.
    /// Mutually exclusive with `config` - validated at runtime.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub config_file: Option<String>,

    /// Inline stepflow configuration for tests.
    /// Mutually exclusive with `config_file` - validated at runtime.
    // The `alias` additionally accepts the key `stepflow_config` when
    // deserializing; serialization always writes `config`.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        alias = "stepflow_config"
    )]
    pub config: Option<serde_json::Value>,

    /// Test cases for the workflow.
    // `DefaultOnNull` turns an explicit `null` into an empty list.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[serde_as(as = "DefaultOnNull")]
    pub cases: Vec<TestCase>,
}

/// A single test case for a workflow.
//
// Maintainer notes (plain `//` comments on purpose):
// - This type derives `schemars::JsonSchema`, which turns `///` doc comments into
//   schema descriptions, so editing them changes the generated flow schema.
// - `Flow::get_all_examples` converts test cases into `ExampleInput`s and uses
//   `name` to skip cases whose name is already taken.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, schemars::JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct TestCase {
    /// Unique identifier for the test case.
    pub name: String,

    /// Optional description of what this test case verifies.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,

    /// Input data for the workflow in this test case.
    pub input: ValueRef,

    /// Expected output from the workflow for this test case.
    // Not carried over when the case is converted into an `ExampleInput`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub output: Option<FlowResult>,
}

/// An example input for a workflow that can be used in UI dropdowns.
//
// Maintainer notes (plain `//` comments on purpose):
// - This type derives `schemars::JsonSchema`, which turns `///` doc comments into
//   schema descriptions, so editing them changes the generated flow schema.
// - `From<&TestCase>` builds an example by cloning a test case's `name`,
//   `description` and `input`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, schemars::JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ExampleInput {
    /// Name of the example input for display purposes.
    pub name: String,

    /// Optional description of what this example demonstrates.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,

    /// The input data for this example.
    pub input: ValueRef,
}

impl From<&TestCase> for ExampleInput {
    fn from(test_case: &TestCase) -> Self {
        Self {
            name: test_case.name.clone(),
            description: test_case.description.clone(),
            input: test_case.input.clone(),
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::workflow::{FlowBuilder, StepBuilder};

    use super::*;

    /// Parses a complete flow from YAML and checks it against a flow assembled
    /// with the builders, including a JSON round trip.
    #[test]
    fn test_flow_from_yaml() {
        let yaml = r#"
        name: test
        description: test
        version: 1.0.0
        schemas:
            type: object
            properties:
                input:
                    type: object
                    properties:
                        name:
                            type: string
                            description: The name to echo
                        count:
                            type: integer
                output:
                    type: object
                    properties:
                        s1a:
                            type: string
                        s2b:
                            type: string
        steps:
          - component: /langflow/echo
            id: s1
            input:
              a: "hello world"
          - component: /mcp/foo/bar
            id: s2
            input:
              a: "hello world 2"
        output:
            s1a: { $step: s1, path: "a" }
            s2b: { $step: s2, path: a }
        "#;
        let flow: Flow = serde_yaml_ng::from_str(yaml).unwrap();
        let input_schema = SchemaRef::parse_json(r#"{"type":"object","properties":{"name":{"type":"string","description":"The name to echo"},"count":{"type":"integer"}}}"#).unwrap();
        let output_schema = SchemaRef::parse_json(
            r#"{"type":"object","properties":{"s1a":{"type":"string"},"s2b":{"type":"string"}}}"#,
        )
        .unwrap();
        // Verify basic flow properties
        assert_eq!(flow.name, Some("test".to_owned()));
        assert_eq!(flow.description, Some("test".to_owned()));
        assert_eq!(flow.version, Some("1.0.0".to_owned()));
        assert_eq!(flow.schemas.input, Some(input_schema.clone()));
        assert_eq!(flow.schemas.output, Some(output_schema.clone()));
        assert_eq!(flow.steps.len(), 2);

        // Verify step details
        assert_eq!(flow.steps[0].id, "s1");
        assert_eq!(flow.steps[0].component.path(), "/langflow/echo");
        assert_eq!(flow.steps[1].id, "s2");
        assert_eq!(flow.steps[1].component.path(), "/mcp/foo/bar");

        // Test round-trip serialization to ensure expressions are preserved
        let serialized = serde_json::to_string(&flow).unwrap();
        let deserialized: Flow = serde_json::from_str(&serialized).unwrap();
        assert_eq!(flow.name, deserialized.name);
        assert_eq!(flow.steps.len(), deserialized.steps.len());
        assert_eq!(flow.output, deserialized.output);

        // Verify that the output contains proper expression structures
        // The output should be parsed as an Object expression containing step references
        assert!(matches!(flow.output, ValueExpr::Object(_)));

        // Test full structural equality
        let expected_flow = FlowBuilder::new()
            .name("test")
            .description("test")
            .version("1.0.0")
            .input_schema(input_schema)
            .output_schema(output_schema)
            .steps(vec![
                StepBuilder::new("s1")
                    .component("/langflow/echo")
                    .input_literal(serde_json::json!({
                        "a": "hello world"
                    }))
                    .build(),
                StepBuilder::new("s2")
                    .component("/mcp/foo/bar")
                    .input_literal(serde_json::json!({
                        "a": "hello world 2"
                    }))
                    .build(),
            ])
            .output(
                serde_json::from_value(serde_json::json!({
                    "s1a": { "$step": "s1", "path": "a" },
                    "s2b": { "$step": "s2", "path": "a" }
                }))
                .unwrap(),
            )
            .build();

        similar_asserts::assert_serde_eq!(&flow, &expected_flow);
    }

    /// `get_all_examples` returns declared examples first, then test cases whose
    /// names are not already taken.
    #[test]
    fn test_get_all_examples() {
        use serde_json::json;

        // Create a flow with both examples and test cases
        let flow = FlowBuilder::new()
            .name("test_flow")
            .output(ValueExpr::literal(json!({})))
            .examples(vec![ExampleInput {
                name: "example1".to_string(),
                description: Some("Direct example".to_string()),
                input: ValueRef::new(json!({"input": "example"})),
            }])
            .test_config(TestConfig {
                config: None,
                config_file: None,
                cases: vec![
                    TestCase {
                        name: "test1".to_string(),
                        description: Some("Test case as example".to_string()),
                        input: ValueRef::new(json!({"input": "test"})),
                        output: None,
                    },
                    TestCase {
                        name: "example1".to_string(), // Duplicate name, should not be added
                        description: Some("Duplicate name".to_string()),
                        input: ValueRef::new(json!({"input": "duplicate"})),
                        output: None,
                    },
                ],
            })
            .build();

        let all_examples = flow.get_all_examples();

        // Should have 2 examples: 1 direct + 1 from test cases (duplicate name ignored)
        assert_eq!(all_examples.len(), 2);

        // Check first example (direct)
        assert_eq!(all_examples[0].name, "example1");
        assert_eq!(
            all_examples[0].description,
            Some("Direct example".to_string())
        );

        // Check second example (from test case)
        assert_eq!(all_examples[1].name, "test1");
        assert_eq!(
            all_examples[1].description,
            Some("Test case as example".to_string())
        );
    }

    #[test]
    fn test_flow_all_optional_null() {
        // All optional/defaulted fields sent as explicit null — the worst case from
        // a Python client calling model_dump() without exclude_none=True.
        let json = serde_json::json!({
            "name": null,
            "description": null,
            "version": null,
            "schemas": null,
            "steps": null,
            "output": null,
            "test": null,
            "examples": null,
            "metadata": null,
        });
        let flow: Flow = serde_json::from_value(json).unwrap();
        assert!(flow.name.is_none());
        assert!(flow.description.is_none());
        assert!(flow.version.is_none());
        assert!(flow.schemas.is_empty());
        assert!(flow.steps.is_empty());
        assert!(flow.output.is_null());
        assert!(flow.test.is_none());
        assert!(flow.examples.is_none());
        assert!(flow.metadata.is_empty());
    }

    #[test]
    fn test_test_config_all_optional_null() {
        // All optional/defaulted fields as explicit null
        let json = serde_json::json!({
            "configFile": null,
            "config": null,
            "cases": null,
        });
        let config: TestConfig = serde_json::from_value(json).unwrap();
        assert!(config.config_file.is_none());
        assert!(config.config.is_none());
        assert!(config.cases.is_empty());
    }

    #[test]
    fn test_test_case_optional_null() {
        // Optional fields in TestCase as explicit null
        let json = serde_json::json!({
            "name": "my_test",
            "input": {"key": "value"},
            "description": null,
            "output": null,
        });
        let case: TestCase = serde_json::from_value(json).unwrap();
        assert_eq!(case.name, "my_test");
        assert!(case.description.is_none());
        assert!(case.output.is_none());
    }

    #[test]
    fn test_example_input_optional_null() {
        // Optional fields in ExampleInput as explicit null
        let json = serde_json::json!({
            "name": "my_example",
            "input": 42,
            "description": null,
        });
        let example: ExampleInput = serde_json::from_value(json).unwrap();
        assert_eq!(example.name, "my_example");
        assert!(example.description.is_none());
    }

    /// Compares the JSON schema generated from `Flow` with the checked-in
    /// reference file, or rewrites that file when `STEPFLOW_OVERWRITE_SCHEMA` is set.
    #[test]
    fn test_schema_comparison_with_flow_json() {
        use crate::json_schema::generate_json_schema_with_defs;
        use std::env;

        // Generate schema from Rust types
        let generated_json = generate_json_schema_with_defs::<Flow>();
        let generated_schema_str = serde_json::to_string_pretty(&generated_json).unwrap();

        let flow_schema_path = format!("{}/../../../schemas/flow.json", env!("CARGO_MANIFEST_DIR"));
        // Note: same relative path works for both stepflow-core and stepflow-flow
        // since both are in crates/*/
        // Overwrite the reference schema when explicitly requested via the env var
        if env::var("STEPFLOW_OVERWRITE_SCHEMA").is_ok() {
            // Ensure the directory exists
            if let Some(parent) = std::path::Path::new(&flow_schema_path).parent() {
                std::fs::create_dir_all(parent).expect("Failed to create schema directory");
            }

            std::fs::write(&flow_schema_path, &generated_schema_str)
                .expect("Failed to write updated schema");
        } else {
            match std::fs::read_to_string(&flow_schema_path) {
                Ok(expected_schema_str) => {
                    // Use similar_asserts for better diff output when schemas don't match
                    similar_asserts::assert_eq!(
                        generated_schema_str,
                        expected_schema_str,
                        "Generated schema does not match the reference schema at {flow_schema_path}. \
                         Run 'STEPFLOW_OVERWRITE_SCHEMA=1 cargo test -p stepflow-flow' to update."
                    );
                }
                Err(err) => {
                    // Report the underlying I/O error: the file may be missing, but
                    // it may also be unreadable for another reason.
                    panic!(
                        "Failed to read flow schema file at {flow_schema_path}: {err}. Run 'STEPFLOW_OVERWRITE_SCHEMA=1 cargo test -p stepflow-flow' to create it."
                    );
                }
            }
        }
    }
}