Skip to main content

stepflow_flow/workflow/
flow_schema.rs

1// Copyright 2025 DataStax Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4// in compliance with the License. You may obtain a copy of the License at
5//
6//     http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software distributed under the License
9// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10// or implied. See the License for the specific language governing permissions and limitations under
11// the License.
12
13//! Flow schema types for consolidated schema information.
14
15use std::collections::HashMap;
16
17use indexmap::IndexMap;
18
19use crate::schema::SchemaRef;
20
/// Consolidated schema information for a flow.
///
/// This struct contains all schema/type information for the flow in a single location,
/// allowing shared `$defs` across all schemas and avoiding duplication.
///
/// Serializes as a valid JSON Schema with `type: "object"` and flow-specific
/// properties (`input`, `output`, `variables`, `steps`) under the `properties` key.
/// `$defs` and `properties` are only emitted when they have content, so an empty
/// `FlowSchema` serializes as `{"type": "object"}`.
///
/// Deserialization accepts the same shape, treats `null` as an empty schema, and
/// rejects the "flat" form where `input`/`output`/`variables`/`steps` appear at the
/// top level without a `properties` wrapper.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct FlowSchema {
    /// Shared type definitions that can be referenced by other schemas.
    /// References use the format `#/schemas/$defs/TypeName`.
    /// Serialized under the top-level `$defs` key; omitted when empty.
    pub defs: HashMap<String, SchemaRef>,

    /// The input schema for the flow.
    /// Serialized as `properties.input`; omitted when `None`.
    pub input: Option<SchemaRef>,

    /// The output schema for the flow.
    /// Serialized as `properties.output`; omitted when `None`.
    pub output: Option<SchemaRef>,

    /// Schema for workflow variables. This is a JSON Schema object where
    /// properties define the available variables and their types.
    /// Serialized as `properties.variables`; omitted when `None`.
    pub variables: Option<SchemaRef>,

    /// Output schemas for each step, keyed by step ID.
    /// Note: Step input schemas are not included here as they are
    /// component metadata, not flow-specific schemas.
    /// Uses IndexMap to preserve insertion order for deterministic serialization.
    ///
    /// Serialized as `properties.steps`, itself an object schema of the form
    /// `{"type": "object", "properties": {<step id>: <schema>}}`; omitted when empty.
    ///
    /// NOTE(review): both the serializer and the deserializer go through
    /// `serde_json::Map`, whose key order follows insertion order only when
    /// serde_json's `preserve_order` feature is enabled — confirm that feature is on
    /// if step order must survive a round trip.
    pub steps: IndexMap<String, SchemaRef>,
}
50
51impl schemars::JsonSchema for FlowSchema {
52    fn schema_name() -> std::borrow::Cow<'static, str> {
53        "FlowSchema".into()
54    }
55
56    fn json_schema(_generator: &mut schemars::SchemaGenerator) -> schemars::Schema {
57        // Describe FlowSchema as an opaque JSON Schema object.
58        // The actual structure uses custom serde serialization that wraps fields
59        // in JSON Schema format (type/properties/$defs), which doesn't match
60        // the struct fields. By describing it as a free-form object, the OpenAPI
61        // spec and generated SDKs treat it as dict[str, Any] and pass it through
62        // without needing to understand the internal structure.
63        schemars::json_schema!({
64            "type": "object",
65            "description": "A JSON Schema object describing the flow's type information. Contains input/output schemas, variable schemas, and step output schemas wrapped in standard JSON Schema format with type, properties, and $defs.",
66            "additionalProperties": true
67        })
68    }
69}
70
71impl serde::Serialize for FlowSchema {
72    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
73    where
74        S: serde::Serializer,
75    {
76        use serde::ser::SerializeMap as _;
77
78        // Build properties object
79        let mut properties = serde_json::Map::new();
80
81        if let Some(input) = &self.input {
82            properties.insert(
83                "input".to_string(),
84                serde_json::to_value(input).map_err(serde::ser::Error::custom)?,
85            );
86        }
87
88        if let Some(output) = &self.output {
89            properties.insert(
90                "output".to_string(),
91                serde_json::to_value(output).map_err(serde::ser::Error::custom)?,
92            );
93        }
94
95        if let Some(variables) = &self.variables {
96            properties.insert(
97                "variables".to_string(),
98                serde_json::to_value(variables).map_err(serde::ser::Error::custom)?,
99            );
100        }
101
102        if !self.steps.is_empty() {
103            // Build steps as { type: object, properties: { step1: ..., step2: ... } }
104            let mut step_properties = serde_json::Map::new();
105            for (step_id, step_schema) in &self.steps {
106                step_properties.insert(
107                    step_id.clone(),
108                    serde_json::to_value(step_schema).map_err(serde::ser::Error::custom)?,
109                );
110            }
111
112            let steps_schema = serde_json::json!({
113                "type": "object",
114                "properties": step_properties
115            });
116            properties.insert("steps".to_string(), steps_schema);
117        }
118
119        // Count fields to serialize
120        let mut field_count = 1; // type is always present
121        if !self.defs.is_empty() {
122            field_count += 1;
123        }
124        if !properties.is_empty() {
125            field_count += 1;
126        }
127
128        let mut map = serializer.serialize_map(Some(field_count))?;
129
130        map.serialize_entry("type", "object")?;
131
132        if !self.defs.is_empty() {
133            map.serialize_entry("$defs", &self.defs)?;
134        }
135
136        if !properties.is_empty() {
137            map.serialize_entry("properties", &properties)?;
138        }
139
140        map.end()
141    }
142}
143
144impl<'de> serde::Deserialize<'de> for FlowSchema {
145    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
146    where
147        D: serde::Deserializer<'de>,
148    {
149        use serde::de::Error as _;
150
151        // Deserialize as a generic JSON value first
152        let value = serde_json::Value::deserialize(deserializer)?;
153
154        // Handle null as empty schema
155        if value.is_null() {
156            return Ok(FlowSchema::default());
157        }
158
159        let obj = value
160            .as_object()
161            .ok_or_else(|| D::Error::custom("FlowSchema must be an object"))?;
162
163        // Extract $defs
164        let defs: HashMap<String, SchemaRef> = if let Some(defs_val) = obj.get("$defs") {
165            if defs_val.is_null() {
166                HashMap::new()
167            } else {
168                serde_json::from_value(defs_val.clone()).map_err(D::Error::custom)?
169            }
170        } else {
171            HashMap::new()
172        };
173
174        // FlowSchema serializes as a JSON Schema: {"type": "object", "properties": {...}}
175        // Reject flat format (e.g. {"variables": ...}) — callers must send the correct
176        // JSON Schema structure.
177        let properties = obj.get("properties").and_then(|p| p.as_object());
178
179        // Detect common mistake: flat format without "properties" wrapper
180        if properties.is_none() {
181            let flat_keys = ["input", "output", "variables", "steps"];
182            for key in flat_keys {
183                if obj.contains_key(key) {
184                    return Err(D::Error::custom(format!(
185                        "FlowSchema field \"{key}\" must be inside a \"properties\" wrapper. \
186                         Expected JSON Schema format: \
187                         {{\"type\": \"object\", \"properties\": {{\"{key}\": ...}}}}"
188                    )));
189                }
190            }
191        }
192
193        let input: Option<SchemaRef> = if let Some(props) = properties {
194            props
195                .get("input")
196                .map(|v| serde_json::from_value(v.clone()))
197                .transpose()
198                .map_err(D::Error::custom)?
199        } else {
200            None
201        };
202
203        let output: Option<SchemaRef> = if let Some(props) = properties {
204            props
205                .get("output")
206                .map(|v| serde_json::from_value(v.clone()))
207                .transpose()
208                .map_err(D::Error::custom)?
209        } else {
210            None
211        };
212
213        let variables: Option<SchemaRef> = if let Some(props) = properties {
214            props
215                .get("variables")
216                .map(|v| serde_json::from_value(v.clone()))
217                .transpose()
218                .map_err(D::Error::custom)?
219        } else {
220            None
221        };
222
223        // Extract steps from properties.steps.properties
224        let steps: IndexMap<String, SchemaRef> = if let Some(props) = properties {
225            if let Some(steps_obj) = props.get("steps").and_then(|s| s.as_object()) {
226                if let Some(step_properties) =
227                    steps_obj.get("properties").and_then(|p| p.as_object())
228                {
229                    let mut steps_map = IndexMap::new();
230                    for (step_id, step_schema) in step_properties {
231                        let schema: SchemaRef = serde_json::from_value(step_schema.clone())
232                            .map_err(D::Error::custom)?;
233                        steps_map.insert(step_id.clone(), schema);
234                    }
235                    steps_map
236                } else {
237                    IndexMap::new()
238                }
239            } else {
240                IndexMap::new()
241            }
242        } else {
243            IndexMap::new()
244        };
245
246        Ok(FlowSchema {
247            defs,
248            input,
249            output,
250            variables,
251            steps,
252        })
253    }
254}
255
256impl FlowSchema {
257    /// Returns true if all fields are empty/None.
258    pub fn is_empty(&self) -> bool {
259        self.defs.is_empty()
260            && self.input.is_none()
261            && self.output.is_none()
262            && self.variables.is_none()
263            && self.steps.is_empty()
264    }
265}
266
#[cfg(test)]
mod tests {
    use super::*;
    use crate::workflow::variable_schema::VariableSchema;
    use serde_json::json;

    /// Parses `document` as a `FlowSchema`, panicking if deserialization fails.
    fn parse(document: serde_json::Value) -> FlowSchema {
        serde_json::from_value(document).unwrap()
    }

    /// The canonical JSON Schema layout is accepted and variable metadata is readable.
    #[test]
    fn test_deserialize_json_schema_format() {
        let parsed = parse(json!({
            "type": "object",
            "properties": {
                "variables": {
                    "type": "object",
                    "properties": {
                        "API_KEY": {
                            "type": "string",
                            "env_var": "MY_API_KEY"
                        }
                    }
                }
            }
        }));
        assert!(parsed.variables.is_some());

        let variables = VariableSchema::from(parsed.variables.unwrap());
        assert_eq!(variables.env_var_name("API_KEY"), Some("MY_API_KEY"));
    }

    /// Fields placed at the top level (no "properties" wrapper) are rejected with a
    /// message that names the problem.
    #[test]
    fn test_rejects_flat_format() {
        let flat = json!({
            "variables": {
                "type": "object",
                "properties": {
                    "OPENAI_API_KEY": {
                        "type": ["string", "null"],
                        "env_var": "OPENAI_API_KEY"
                    }
                }
            },
            "steps": {}
        });

        let err = serde_json::from_value::<FlowSchema>(flat)
            .unwrap_err()
            .to_string();
        assert!(
            err.contains("must be inside a \"properties\" wrapper"),
            "Expected clear error about properties wrapper, got: {err}"
        );
    }

    /// Extension keys such as `env_var` survive a serialize/deserialize round trip.
    #[test]
    fn test_roundtrip_preserves_env_var() {
        let original = parse(json!({
            "type": "object",
            "properties": {
                "variables": {
                    "type": "object",
                    "properties": {
                        "API_KEY": {
                            "type": "string",
                            "env_var": "MY_API_KEY",
                            "is_secret": true
                        }
                    }
                }
            }
        }));

        // Serialize back and deserialize again.
        let reparsed = parse(serde_json::to_value(&original).unwrap());
        assert!(reparsed.variables.is_some());

        let variables = VariableSchema::from(reparsed.variables.unwrap());
        assert_eq!(variables.env_var_name("API_KEY"), Some("MY_API_KEY"));
    }

    /// An empty object, `null`, and a bare `{"type": "object"}` all parse to an
    /// empty schema.
    #[test]
    fn test_empty_schema_accepted() {
        for document in [json!({}), json!(null), json!({"type": "object"})] {
            assert!(parse(document).is_empty());
        }
    }
}
363}