stepflow_flow/workflow/
flow_schema.rs1use std::collections::HashMap;
16
17use indexmap::IndexMap;
18
19use crate::schema::SchemaRef;
20
/// Type information for a flow: shared definitions plus the schemas attached
/// to its input, output, variables, and individual steps.
///
/// The hand-written serde impls in this file map this struct to a JSON
/// Schema-shaped object: `defs` lives under the top-level `$defs` key and the
/// remaining fields live under the top-level `properties` key.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct FlowSchema {
    /// Named schemas, read from and written to the top-level `$defs` key.
    /// Presumably the target of `$ref` pointers in the other schemas —
    /// TODO confirm against `SchemaRef`.
    pub defs: HashMap<String, SchemaRef>,

    /// Schema stored under `properties.input`, if present.
    pub input: Option<SchemaRef>,

    /// Schema stored under `properties.output`, if present.
    pub output: Option<SchemaRef>,

    /// Schema stored under `properties.variables`, if present.
    pub variables: Option<SchemaRef>,

    /// Per-step schemas keyed by step id, stored under
    /// `properties.steps.properties`.
    pub steps: IndexMap<String, SchemaRef>,
}
50
51impl schemars::JsonSchema for FlowSchema {
52 fn schema_name() -> std::borrow::Cow<'static, str> {
53 "FlowSchema".into()
54 }
55
56 fn json_schema(_generator: &mut schemars::SchemaGenerator) -> schemars::Schema {
57 schemars::json_schema!({
64 "type": "object",
65 "description": "A JSON Schema object describing the flow's type information. Contains input/output schemas, variable schemas, and step output schemas wrapped in standard JSON Schema format with type, properties, and $defs.",
66 "additionalProperties": true
67 })
68 }
69}
70
71impl serde::Serialize for FlowSchema {
72 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
73 where
74 S: serde::Serializer,
75 {
76 use serde::ser::SerializeMap as _;
77
78 let mut properties = serde_json::Map::new();
80
81 if let Some(input) = &self.input {
82 properties.insert(
83 "input".to_string(),
84 serde_json::to_value(input).map_err(serde::ser::Error::custom)?,
85 );
86 }
87
88 if let Some(output) = &self.output {
89 properties.insert(
90 "output".to_string(),
91 serde_json::to_value(output).map_err(serde::ser::Error::custom)?,
92 );
93 }
94
95 if let Some(variables) = &self.variables {
96 properties.insert(
97 "variables".to_string(),
98 serde_json::to_value(variables).map_err(serde::ser::Error::custom)?,
99 );
100 }
101
102 if !self.steps.is_empty() {
103 let mut step_properties = serde_json::Map::new();
105 for (step_id, step_schema) in &self.steps {
106 step_properties.insert(
107 step_id.clone(),
108 serde_json::to_value(step_schema).map_err(serde::ser::Error::custom)?,
109 );
110 }
111
112 let steps_schema = serde_json::json!({
113 "type": "object",
114 "properties": step_properties
115 });
116 properties.insert("steps".to_string(), steps_schema);
117 }
118
119 let mut field_count = 1; if !self.defs.is_empty() {
122 field_count += 1;
123 }
124 if !properties.is_empty() {
125 field_count += 1;
126 }
127
128 let mut map = serializer.serialize_map(Some(field_count))?;
129
130 map.serialize_entry("type", "object")?;
131
132 if !self.defs.is_empty() {
133 map.serialize_entry("$defs", &self.defs)?;
134 }
135
136 if !properties.is_empty() {
137 map.serialize_entry("properties", &properties)?;
138 }
139
140 map.end()
141 }
142}
143
144impl<'de> serde::Deserialize<'de> for FlowSchema {
145 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
146 where
147 D: serde::Deserializer<'de>,
148 {
149 use serde::de::Error as _;
150
151 let value = serde_json::Value::deserialize(deserializer)?;
153
154 if value.is_null() {
156 return Ok(FlowSchema::default());
157 }
158
159 let obj = value
160 .as_object()
161 .ok_or_else(|| D::Error::custom("FlowSchema must be an object"))?;
162
163 let defs: HashMap<String, SchemaRef> = if let Some(defs_val) = obj.get("$defs") {
165 if defs_val.is_null() {
166 HashMap::new()
167 } else {
168 serde_json::from_value(defs_val.clone()).map_err(D::Error::custom)?
169 }
170 } else {
171 HashMap::new()
172 };
173
174 let properties = obj.get("properties").and_then(|p| p.as_object());
178
179 if properties.is_none() {
181 let flat_keys = ["input", "output", "variables", "steps"];
182 for key in flat_keys {
183 if obj.contains_key(key) {
184 return Err(D::Error::custom(format!(
185 "FlowSchema field \"{key}\" must be inside a \"properties\" wrapper. \
186 Expected JSON Schema format: \
187 {{\"type\": \"object\", \"properties\": {{\"{key}\": ...}}}}"
188 )));
189 }
190 }
191 }
192
193 let input: Option<SchemaRef> = if let Some(props) = properties {
194 props
195 .get("input")
196 .map(|v| serde_json::from_value(v.clone()))
197 .transpose()
198 .map_err(D::Error::custom)?
199 } else {
200 None
201 };
202
203 let output: Option<SchemaRef> = if let Some(props) = properties {
204 props
205 .get("output")
206 .map(|v| serde_json::from_value(v.clone()))
207 .transpose()
208 .map_err(D::Error::custom)?
209 } else {
210 None
211 };
212
213 let variables: Option<SchemaRef> = if let Some(props) = properties {
214 props
215 .get("variables")
216 .map(|v| serde_json::from_value(v.clone()))
217 .transpose()
218 .map_err(D::Error::custom)?
219 } else {
220 None
221 };
222
223 let steps: IndexMap<String, SchemaRef> = if let Some(props) = properties {
225 if let Some(steps_obj) = props.get("steps").and_then(|s| s.as_object()) {
226 if let Some(step_properties) =
227 steps_obj.get("properties").and_then(|p| p.as_object())
228 {
229 let mut steps_map = IndexMap::new();
230 for (step_id, step_schema) in step_properties {
231 let schema: SchemaRef = serde_json::from_value(step_schema.clone())
232 .map_err(D::Error::custom)?;
233 steps_map.insert(step_id.clone(), schema);
234 }
235 steps_map
236 } else {
237 IndexMap::new()
238 }
239 } else {
240 IndexMap::new()
241 }
242 } else {
243 IndexMap::new()
244 };
245
246 Ok(FlowSchema {
247 defs,
248 input,
249 output,
250 variables,
251 steps,
252 })
253 }
254}
255
256impl FlowSchema {
257 pub fn is_empty(&self) -> bool {
259 self.defs.is_empty()
260 && self.input.is_none()
261 && self.output.is_none()
262 && self.variables.is_none()
263 && self.steps.is_empty()
264 }
265}
266
#[cfg(test)]
mod tests {
    use super::*;
    use crate::workflow::variable_schema::VariableSchema;
    use serde_json::json;

    /// Variables nested under `properties` are parsed, and their `env_var`
    /// annotation is reachable through `VariableSchema`.
    #[test]
    fn test_deserialize_json_schema_format() {
        let document = json!({
            "type": "object",
            "properties": {
                "variables": {
                    "type": "object",
                    "properties": {
                        "API_KEY": {
                            "type": "string",
                            "env_var": "MY_API_KEY"
                        }
                    }
                }
            }
        });

        let parsed: FlowSchema = serde_json::from_value(document).unwrap();
        assert!(parsed.variables.is_some());

        let variables = VariableSchema::from(parsed.variables.unwrap());
        assert_eq!(variables.env_var_name("API_KEY"), Some("MY_API_KEY"));
    }

    /// Schema fields placed at the top level (no `properties` wrapper) are
    /// rejected with a message that names the expected layout.
    #[test]
    fn test_rejects_flat_format() {
        let flat_document = json!({
            "variables": {
                "type": "object",
                "properties": {
                    "OPENAI_API_KEY": {
                        "type": ["string", "null"],
                        "env_var": "OPENAI_API_KEY"
                    }
                }
            },
            "steps": {}
        });

        let err = serde_json::from_value::<FlowSchema>(flat_document)
            .unwrap_err()
            .to_string();
        assert!(
            err.contains("must be inside a \"properties\" wrapper"),
            "Expected clear error about properties wrapper, got: {err}"
        );
    }

    /// Extra annotations on a variable survive a serialize/deserialize cycle.
    #[test]
    fn test_roundtrip_preserves_env_var() {
        let document = json!({
            "type": "object",
            "properties": {
                "variables": {
                    "type": "object",
                    "properties": {
                        "API_KEY": {
                            "type": "string",
                            "env_var": "MY_API_KEY",
                            "is_secret": true
                        }
                    }
                }
            }
        });

        let original: FlowSchema = serde_json::from_value(document).unwrap();

        let reserialized = serde_json::to_value(&original).unwrap();
        let restored: FlowSchema = serde_json::from_value(reserialized).unwrap();
        assert!(restored.variables.is_some());

        let variables = VariableSchema::from(restored.variables.unwrap());
        assert_eq!(variables.env_var_name("API_KEY"), Some("MY_API_KEY"));
    }

    /// `{}`, `null`, and a bare `{"type": "object"}` all yield an empty schema.
    #[test]
    fn test_empty_schema_accepted() {
        let from_empty_object: FlowSchema = serde_json::from_value(json!({})).unwrap();
        assert!(from_empty_object.is_empty());

        let from_null: FlowSchema = serde_json::from_value(json!(null)).unwrap();
        assert!(from_null.is_empty());

        let from_type_only: FlowSchema =
            serde_json::from_value(json!({"type": "object"})).unwrap();
        assert!(from_type_only.is_empty());
    }
}