Skip to main content

stepflow_flow/workflow/
flow.rs

1// Copyright 2025 DataStax Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4// in compliance with the License. You may obtain a copy of the License at
5//
6//     http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software distributed under the License
9// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10// or implied. See the License for the specific language governing permissions and limitations under
11// the License.
12
13use std::collections::HashMap;
14
15use serde_with::{DefaultOnNull, serde_as};
16
17use super::{FlowSchema, Step, ValueRef, VariableSchema};
18use crate::{FlowResult, ValueExpr, schema::SchemaRef};
19
/// A workflow consisting of a sequence of steps and their outputs.
///
/// A flow represents a complete workflow that can be executed. It contains:
/// - A sequence of steps to execute
/// - Named outputs that can reference step outputs
///
/// Flows should not be cloned. They should generally be stored and passed as a
/// reference or inside an `Arc`.
//
// Maintenance notes (plain `//` comments so they stay out of the generated schema):
// - The `///` docs on this type and its fields are emitted as `description`s by the
//   derived `schemars::JsonSchema`; `test_schema_comparison_with_flow_json` compares the
//   generated schema against the checked-in `schemas/flow.json`, so editing them (or the
//   field layout) changes that output and the file must be regenerated.
// - `#[serde_as]` must stay above `#[derive(...)]` so serde_with can rewrite the
//   `#[serde_as(as = ...)]` field attributes before the serde derives run.
// - `Clone` is derived (used by `Flow::slow_clone`); prefer sharing via `FlowRef`/`Arc`.
#[serde_as]
#[derive(
    Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Default, schemars::JsonSchema,
)]
#[serde(rename_all = "camelCase")]
pub struct Flow {
    /// The name of the flow.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,

    /// The description of the flow.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,

    /// The version of the flow.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub version: Option<String>,

    /// Consolidated schema information for the flow.
    /// Contains input/output schemas, step output schemas, and shared `$defs`.
    // `DefaultOnNull` makes an explicit `null` behave like a missing key (empty schema);
    // an empty `FlowSchema` is omitted when serializing.
    #[serde(default, skip_serializing_if = "FlowSchema::is_empty")]
    #[serde_as(as = "DefaultOnNull")]
    #[schemars(with = "FlowSchema")]
    pub schemas: FlowSchema,

    /// The steps to execute for the flow.
    // An explicit `null` deserializes as an empty list (`DefaultOnNull`); an empty list
    // is omitted when serializing.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[serde_as(as = "DefaultOnNull")]
    pub steps: Vec<Step>,

    /// The outputs of the flow, mapping output names to their values.
    // An explicit `null` yields a null expression (see `test_flow_all_optional_null`);
    // null outputs are omitted when serializing.
    #[serde(default, skip_serializing_if = "ValueExpr::is_null")]
    pub output: ValueExpr,

    /// Test configuration for the flow.
    // Its `cases` are also surfaced as examples by `Flow::get_all_examples`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub test: Option<TestConfig>,

    /// Example inputs for the workflow that can be used for testing and UI dropdowns.
    // Kept as `Option` so "not provided" stays distinct from an explicit empty list;
    // `Flow::examples()` flattens both cases to an empty slice.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub examples: Option<Vec<ExampleInput>>,

    /// Extensible metadata for the flow that can be used by tools and frameworks.
    // An explicit `null` deserializes as an empty map (`DefaultOnNull`).
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    #[serde_as(as = "DefaultOnNull")]
    pub metadata: HashMap<String, serde_json::Value>,
}
75
76impl Flow {
77    /// Create a clone of this flow.
78    ///
79    /// **Warning**: This method performs a deep clone of the entire workflow structure,
80    /// including all steps, metadata, and configurations. This can be expensive for
81    /// large workflows.
82    ///
83    /// # Performance
84    /// - Cloning large workflows with many steps can be slow
85    /// - Consider using `Arc<Flow>` for shared ownership instead
86    /// - Only use this when you need to modify the workflow structure
87    ///
88    /// # Example
89    /// ```rust
90    /// use stepflow_core::workflow::Flow;
91    ///
92    /// let original_flow = Flow::default();
93    /// let cloned_flow = original_flow.slow_clone();
94    /// ```
95    pub fn slow_clone(&self) -> Self {
96        self.clone()
97    }
98
99    pub fn name(&self) -> Option<&str> {
100        self.name.as_deref()
101    }
102
103    pub fn description(&self) -> Option<&str> {
104        self.description.as_deref()
105    }
106
107    pub fn version(&self) -> Option<&str> {
108        self.version.as_deref()
109    }
110
111    pub fn metadata(&self) -> &HashMap<String, serde_json::Value> {
112        &self.metadata
113    }
114
115    /// Returns a reference to all steps in the flow.
116    pub fn steps(&self) -> &[Step] {
117        &self.steps
118    }
119
120    pub fn examples(&self) -> &[ExampleInput] {
121        self.examples.as_deref().unwrap_or(&[])
122    }
123
124    /// Get the variable schema for the flow.
125    ///
126    /// This constructs a `VariableSchema` from the schema definition, extracting
127    /// runtime metadata like defaults, secrets, and required variables.
128    pub fn variables(&self) -> Option<VariableSchema> {
129        self.schemas().variables.clone().map(VariableSchema::from)
130    }
131
132    /// Get a reference to the variable schema (raw SchemaRef).
133    pub fn variable_schema(&self) -> Option<&SchemaRef> {
134        self.schemas().variables.as_ref()
135    }
136
137    /// Returns a reference to the step at the given index.
138    ///
139    /// # Panics
140    ///
141    /// Panics if the index is out of bounds.
142    pub fn step(&self, index: usize) -> &Step {
143        &self.steps[index]
144    }
145
146    /// Returns a mutable reference to the step at the given index.
147    ///
148    /// # Panics
149    ///
150    /// Panics if the index is out of bounds.
151    pub fn step_mut(&mut self, index: usize) -> &mut Step {
152        self.steps.get_mut(index).expect("Index out of bounds")
153    }
154
155    /// Returns a reference to the flow's output value.
156    pub fn output(&self) -> &ValueExpr {
157        &self.output
158    }
159
160    pub fn test(&self) -> Option<&TestConfig> {
161        self.test.as_ref()
162    }
163
164    pub fn test_mut(&mut self) -> Option<&mut TestConfig> {
165        self.test.as_mut()
166    }
167
168    /// Get the flow's schema information.
169    pub fn schemas(&self) -> &FlowSchema {
170        &self.schemas
171    }
172
173    /// Get a mutable reference to the flow's schema information.
174    pub fn schemas_mut(&mut self) -> &mut FlowSchema {
175        &mut self.schemas
176    }
177
178    /// Get the flow's input schema.
179    pub fn input_schema(&self) -> Option<&SchemaRef> {
180        self.schemas.input.as_ref()
181    }
182
183    /// Set the flow's input schema.
184    pub fn set_input_schema(&mut self, input_schema: Option<SchemaRef>) {
185        self.schemas.input = input_schema;
186    }
187
188    /// Get the flow's output schema.
189    pub fn output_schema(&self) -> Option<&SchemaRef> {
190        self.schemas.output.as_ref()
191    }
192
193    /// Set the flow's output schema.
194    pub fn set_output_schema(&mut self, output_schema: Option<SchemaRef>) {
195        self.schemas.output = output_schema;
196    }
197
198    /// Get the output schema for a specific step.
199    pub fn step_output_schema(&self, step_id: &str) -> Option<&SchemaRef> {
200        self.schemas.steps.get(step_id)
201    }
202
203    /// Set the output schema for a specific step.
204    pub fn set_step_output_schema(&mut self, step_id: String, step_schema: SchemaRef) {
205        self.schemas.steps.insert(step_id, step_schema);
206    }
207
208    /// Get all example inputs, including those derived from test cases.
209    pub fn get_all_examples(&self) -> Vec<ExampleInput> {
210        let mut examples = self.examples().to_vec();
211
212        // Add examples from test cases if they exist
213        if let Some(test_config) = &self.test {
214            for test_case in &test_config.cases {
215                // Only add if there isn't already an example with the same name
216                if !examples.iter().any(|ex| ex.name == test_case.name) {
217                    examples.push(ExampleInput::from(test_case));
218                }
219            }
220        }
221
222        examples
223    }
224}
225
226/// A wrapper around `Arc<Flow>` to support poem-openapi traits.
227///
228/// This wrapper exists to work around Rust's orphan rules which prevent
229/// implementing external traits on external types like `Arc<Flow>`.
230#[derive(Debug, Clone, PartialEq)]
231pub struct FlowRef(std::sync::Arc<Flow>);
232
233impl FlowRef {
234    /// Create a new FlowRef from a Flow.
235    pub fn new(flow: Flow) -> Self {
236        Self(std::sync::Arc::new(flow))
237    }
238
239    /// Create a new FlowRef from an `Arc<Flow>`.
240    pub fn from_arc(arc: std::sync::Arc<Flow>) -> Self {
241        Self(arc)
242    }
243
244    /// Get a reference to the underlying Flow.
245    pub fn as_flow(&self) -> &Flow {
246        &self.0
247    }
248
249    /// Get the underlying `Arc<Flow>`.
250    pub fn into_arc(self) -> std::sync::Arc<Flow> {
251        self.0
252    }
253
254    /// Get a reference to the underlying `Arc<Flow>`.
255    pub fn as_arc(&self) -> &std::sync::Arc<Flow> {
256        &self.0
257    }
258}
259
260impl std::ops::Deref for FlowRef {
261    type Target = Flow;
262
263    fn deref(&self) -> &Self::Target {
264        &self.0
265    }
266}
267
268impl From<Flow> for FlowRef {
269    fn from(flow: Flow) -> Self {
270        Self::new(flow)
271    }
272}
273
274impl From<std::sync::Arc<Flow>> for FlowRef {
275    fn from(arc: std::sync::Arc<Flow>) -> Self {
276        Self::from_arc(arc)
277    }
278}
279
280impl serde::Serialize for FlowRef {
281    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
282    where
283        S: serde::Serializer,
284    {
285        self.0.serialize(serializer)
286    }
287}
288
289impl<'de> serde::Deserialize<'de> for FlowRef {
290    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
291    where
292        D: serde::Deserializer<'de>,
293    {
294        let flow = Flow::deserialize(deserializer)?;
295        Ok(Self::new(flow))
296    }
297}
298
/// Configuration for testing a workflow.
//
// The `///` docs on this type and its fields are emitted into the JSON schema derived
// via `schemars::JsonSchema` (reachable from `Flow`), so keep maintenance notes in `//`
// comments. `#[serde_as]` must stay above `#[derive(...)]`.
#[serde_as]
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, schemars::JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct TestConfig {
    /// Path to an external stepflow config file for tests.
    /// Relative paths are resolved from the workflow file's directory.
    /// Mutually exclusive with `config` - validated at runtime.
    // Serialized under the key `configFile` (camelCase rename).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub config_file: Option<String>,

    /// Inline stepflow configuration for tests.
    /// Mutually exclusive with `config_file` - validated at runtime.
    // The alias lets input use the key `stepflow_config` as well as `config`;
    // serialization always writes `config`.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        alias = "stepflow_config"
    )]
    pub config: Option<serde_json::Value>,

    /// Test cases for the workflow.
    // An explicit `null` deserializes as an empty list (`DefaultOnNull`); an empty list
    // is omitted when serializing.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[serde_as(as = "DefaultOnNull")]
    pub cases: Vec<TestCase>,
}
324
/// A single test case for a workflow.
//
// The `///` docs here are emitted into the derived JSON schema (schemars); keep
// maintenance notes in `//` comments.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, schemars::JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct TestCase {
    /// Unique identifier for the test case.
    // Required when deserializing (no serde default). Uniqueness is not enforced by
    // this type; `Flow::get_all_examples` compares it against existing example names.
    pub name: String,

    /// Optional description of what this test case verifies.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,

    /// Input data for the workflow in this test case.
    // Required when deserializing (no serde default); copied into the derived `ExampleInput`.
    pub input: ValueRef,

    /// Expected output from the workflow for this test case.
    // Missing or `null` leaves this `None`; it is not carried over to `ExampleInput`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub output: Option<FlowResult>,
}
343
/// An example input for a workflow that can be used in UI dropdowns.
//
// The `///` docs here are emitted into the derived JSON schema (schemars); keep
// maintenance notes in `//` comments.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, schemars::JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ExampleInput {
    /// Name of the example input for display purposes.
    // Required when deserializing. `Flow::get_all_examples` skips test cases whose
    // name matches one already present.
    pub name: String,

    /// Optional description of what this example demonstrates.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,

    /// The input data for this example.
    // Required when deserializing (no serde default).
    pub input: ValueRef,
}
358
359impl From<&TestCase> for ExampleInput {
360    fn from(test_case: &TestCase) -> Self {
361        Self {
362            name: test_case.name.clone(),
363            description: test_case.description.clone(),
364            input: test_case.input.clone(),
365        }
366    }
367}
368
#[cfg(test)]
mod tests {
    use crate::workflow::{FlowBuilder, StepBuilder};

    use super::*;

    #[test]
    fn test_flow_from_yaml() {
        let yaml = r#"
        name: test
        description: test
        version: 1.0.0
        schemas:
            type: object
            properties:
                input:
                    type: object
                    properties:
                        name:
                            type: string
                            description: The name to echo
                        count:
                            type: integer
                output:
                    type: object
                    properties:
                        s1a:
                            type: string
                        s2b:
                            type: string
        steps:
          - component: /langflow/echo
            id: s1
            input:
              a: "hello world"
          - component: /mcp/foo/bar
            id: s2
            input:
              a: "hello world 2"
        output:
            s1a: { $step: s1, path: "a" }
            s2b: { $step: s2, path: a }
        "#;
        let flow: Flow = serde_yaml_ng::from_str(yaml).unwrap();
        let input_schema = SchemaRef::parse_json(r#"{"type":"object","properties":{"name":{"type":"string","description":"The name to echo"},"count":{"type":"integer"}}}"#).unwrap();
        let output_schema = SchemaRef::parse_json(
            r#"{"type":"object","properties":{"s1a":{"type":"string"},"s2b":{"type":"string"}}}"#,
        )
        .unwrap();
        // Verify basic flow properties
        assert_eq!(flow.name, Some("test".to_owned()));
        assert_eq!(flow.description, Some("test".to_owned()));
        assert_eq!(flow.version, Some("1.0.0".to_owned()));
        assert_eq!(flow.schemas.input, Some(input_schema.clone()));
        assert_eq!(flow.schemas.output, Some(output_schema.clone()));
        assert_eq!(flow.steps.len(), 2);

        // Verify step details
        assert_eq!(flow.steps[0].id, "s1");
        assert_eq!(flow.steps[0].component.path(), "/langflow/echo");
        assert_eq!(flow.steps[1].id, "s2");
        assert_eq!(flow.steps[1].component.path(), "/mcp/foo/bar");

        // Test round-trip serialization to ensure expressions are preserved
        let serialized = serde_json::to_string(&flow).unwrap();
        let deserialized: Flow = serde_json::from_str(&serialized).unwrap();
        assert_eq!(flow.name, deserialized.name);
        assert_eq!(flow.steps.len(), deserialized.steps.len());
        assert_eq!(flow.output, deserialized.output);

        // Verify that the output contains proper expression structures
        // The output should be parsed as an Object expression containing step references
        assert!(matches!(flow.output, ValueExpr::Object(_)));

        // Test full structural equality
        let expected_flow = FlowBuilder::new()
            .name("test")
            .description("test")
            .version("1.0.0")
            .input_schema(input_schema)
            .output_schema(output_schema)
            .steps(vec![
                StepBuilder::new("s1")
                    .component("/langflow/echo")
                    .input_literal(serde_json::json!({
                        "a": "hello world"
                    }))
                    .build(),
                StepBuilder::new("s2")
                    .component("/mcp/foo/bar")
                    .input_literal(serde_json::json!({
                        "a": "hello world 2"
                    }))
                    .build(),
            ])
            .output(
                serde_json::from_value(serde_json::json!({
                    "s1a": { "$step": "s1", "path": "a" },
                    "s2b": { "$step": "s2", "path": "a" }
                }))
                .unwrap(),
            )
            .build();

        similar_asserts::assert_serde_eq!(&flow, &expected_flow);
    }

    #[test]
    fn test_get_all_examples() {
        use serde_json::json;

        // Create a flow with both examples and test cases
        let flow = FlowBuilder::new()
            .name("test_flow")
            .output(ValueExpr::literal(json!({})))
            .examples(vec![ExampleInput {
                name: "example1".to_string(),
                description: Some("Direct example".to_string()),
                input: ValueRef::new(json!({"input": "example"})),
            }])
            .test_config(TestConfig {
                config: None,
                config_file: None,
                cases: vec![
                    TestCase {
                        name: "test1".to_string(),
                        description: Some("Test case as example".to_string()),
                        input: ValueRef::new(json!({"input": "test"})),
                        output: None,
                    },
                    TestCase {
                        name: "example1".to_string(), // Duplicate name, should not be added
                        description: Some("Duplicate name".to_string()),
                        input: ValueRef::new(json!({"input": "duplicate"})),
                        output: None,
                    },
                ],
            })
            .build();

        let all_examples = flow.get_all_examples();

        // Should have 2 examples: 1 direct + 1 from test cases (duplicate name ignored)
        assert_eq!(all_examples.len(), 2);

        // Check first example (direct)
        assert_eq!(all_examples[0].name, "example1");
        assert_eq!(
            all_examples[0].description,
            Some("Direct example".to_string())
        );

        // Check second example (from test case)
        assert_eq!(all_examples[1].name, "test1");
        assert_eq!(
            all_examples[1].description,
            Some("Test case as example".to_string())
        );
    }

    #[test]
    fn test_flow_all_optional_null() {
        // All optional/defaulted fields sent as explicit null — the worst case from
        // a Python client calling model_dump() without exclude_none=True.
        let json = serde_json::json!({
            "name": null,
            "description": null,
            "version": null,
            "schemas": null,
            "steps": null,
            "output": null,
            "test": null,
            "examples": null,
            "metadata": null,
        });
        let flow: Flow = serde_json::from_value(json).unwrap();
        assert!(flow.name.is_none());
        assert!(flow.description.is_none());
        assert!(flow.version.is_none());
        assert!(flow.schemas.is_empty());
        assert!(flow.steps.is_empty());
        assert!(flow.output.is_null());
        assert!(flow.test.is_none());
        assert!(flow.examples.is_none());
        assert!(flow.metadata.is_empty());
    }

    #[test]
    fn test_test_config_all_optional_null() {
        // All optional/defaulted fields as explicit null
        let json = serde_json::json!({
            "configFile": null,
            "config": null,
            "cases": null,
        });
        let config: TestConfig = serde_json::from_value(json).unwrap();
        assert!(config.config_file.is_none());
        assert!(config.config.is_none());
        assert!(config.cases.is_empty());
    }

    #[test]
    fn test_test_case_optional_null() {
        // Optional fields in TestCase as explicit null
        let json = serde_json::json!({
            "name": "my_test",
            "input": {"key": "value"},
            "description": null,
            "output": null,
        });
        let case: TestCase = serde_json::from_value(json).unwrap();
        assert_eq!(case.name, "my_test");
        assert!(case.description.is_none());
        assert!(case.output.is_none());
    }

    #[test]
    fn test_example_input_optional_null() {
        // Optional fields in ExampleInput as explicit null
        let json = serde_json::json!({
            "name": "my_example",
            "input": 42,
            "description": null,
        });
        let example: ExampleInput = serde_json::from_value(json).unwrap();
        assert_eq!(example.name, "my_example");
        assert!(example.description.is_none());
    }

    #[test]
    fn test_schema_comparison_with_flow_json() {
        use crate::json_schema::generate_json_schema_with_defs;
        use std::env;

        // Generate schema from Rust types
        let generated_json = generate_json_schema_with_defs::<Flow>();
        let generated_schema_str = serde_json::to_string_pretty(&generated_json).unwrap();

        // Note: same relative path works for both stepflow-core and stepflow-flow
        // since both are in crates/*/
        let flow_schema_path = format!("{}/../../../schemas/flow.json", env!("CARGO_MANIFEST_DIR"));
        // Unit tests only run for the package they are compiled into, so name that
        // package in the regeneration hint instead of hard-coding one.
        let package = env!("CARGO_PKG_NAME");

        // Check if we should overwrite the reference schema or if it doesn't exist
        if env::var("STEPFLOW_OVERWRITE_SCHEMA").is_ok() {
            // Ensure the directory exists
            if let Some(parent) = std::path::Path::new(&flow_schema_path).parent() {
                std::fs::create_dir_all(parent).expect("Failed to create schema directory");
            }

            std::fs::write(&flow_schema_path, &generated_schema_str)
                .expect("Failed to write updated schema");
        } else {
            match std::fs::read_to_string(&flow_schema_path) {
                Ok(expected_schema_str) => {
                    // similar_asserts prints a line diff, which is far more readable than
                    // std's Debug dump for a large pretty-printed schema.
                    similar_asserts::assert_eq!(
                        generated_schema_str,
                        expected_schema_str,
                        "Generated schema does not match the reference schema at {flow_schema_path}. \
                         Run 'STEPFLOW_OVERWRITE_SCHEMA=1 cargo test -p {package}' to update."
                    );
                }
                Err(err) => {
                    // Report the real I/O error: the file may exist but be unreadable.
                    panic!(
                        "Failed to read flow schema file at {flow_schema_path}: {err}. \
                         Run 'STEPFLOW_OVERWRITE_SCHEMA=1 cargo test -p {package}' to create it."
                    );
                }
            }
        }
    }
}