use std::collections::HashMap;
use indexmap::IndexMap;
use crate::schema::SchemaRef;
#[derive(Debug, Clone, PartialEq, Default)]
pub struct FlowSchema {
pub defs: HashMap<String, SchemaRef>,
pub input: Option<SchemaRef>,
pub output: Option<SchemaRef>,
pub variables: Option<SchemaRef>,
pub steps: IndexMap<String, SchemaRef>,
}
impl schemars::JsonSchema for FlowSchema {
fn schema_name() -> std::borrow::Cow<'static, str> {
"FlowSchema".into()
}
fn json_schema(_generator: &mut schemars::SchemaGenerator) -> schemars::Schema {
schemars::json_schema!({
"type": "object",
"description": "A JSON Schema object describing the flow's type information. Contains input/output schemas, variable schemas, and step output schemas wrapped in standard JSON Schema format with type, properties, and $defs.",
"additionalProperties": true
})
}
}
impl serde::Serialize for FlowSchema {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeMap as _;
let mut properties = serde_json::Map::new();
if let Some(input) = &self.input {
properties.insert(
"input".to_string(),
serde_json::to_value(input).map_err(serde::ser::Error::custom)?,
);
}
if let Some(output) = &self.output {
properties.insert(
"output".to_string(),
serde_json::to_value(output).map_err(serde::ser::Error::custom)?,
);
}
if let Some(variables) = &self.variables {
properties.insert(
"variables".to_string(),
serde_json::to_value(variables).map_err(serde::ser::Error::custom)?,
);
}
if !self.steps.is_empty() {
let mut step_properties = serde_json::Map::new();
for (step_id, step_schema) in &self.steps {
step_properties.insert(
step_id.clone(),
serde_json::to_value(step_schema).map_err(serde::ser::Error::custom)?,
);
}
let steps_schema = serde_json::json!({
"type": "object",
"properties": step_properties
});
properties.insert("steps".to_string(), steps_schema);
}
let mut field_count = 1; if !self.defs.is_empty() {
field_count += 1;
}
if !properties.is_empty() {
field_count += 1;
}
let mut map = serializer.serialize_map(Some(field_count))?;
map.serialize_entry("type", "object")?;
if !self.defs.is_empty() {
map.serialize_entry("$defs", &self.defs)?;
}
if !properties.is_empty() {
map.serialize_entry("properties", &properties)?;
}
map.end()
}
}
impl<'de> serde::Deserialize<'de> for FlowSchema {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::Error as _;
let value = serde_json::Value::deserialize(deserializer)?;
if value.is_null() {
return Ok(FlowSchema::default());
}
let obj = value
.as_object()
.ok_or_else(|| D::Error::custom("FlowSchema must be an object"))?;
let defs: HashMap<String, SchemaRef> = if let Some(defs_val) = obj.get("$defs") {
if defs_val.is_null() {
HashMap::new()
} else {
serde_json::from_value(defs_val.clone()).map_err(D::Error::custom)?
}
} else {
HashMap::new()
};
let properties = obj.get("properties").and_then(|p| p.as_object());
if properties.is_none() {
let flat_keys = ["input", "output", "variables", "steps"];
for key in flat_keys {
if obj.contains_key(key) {
return Err(D::Error::custom(format!(
"FlowSchema field \"{key}\" must be inside a \"properties\" wrapper. \
Expected JSON Schema format: \
{{\"type\": \"object\", \"properties\": {{\"{key}\": ...}}}}"
)));
}
}
}
let input: Option<SchemaRef> = if let Some(props) = properties {
props
.get("input")
.map(|v| serde_json::from_value(v.clone()))
.transpose()
.map_err(D::Error::custom)?
} else {
None
};
let output: Option<SchemaRef> = if let Some(props) = properties {
props
.get("output")
.map(|v| serde_json::from_value(v.clone()))
.transpose()
.map_err(D::Error::custom)?
} else {
None
};
let variables: Option<SchemaRef> = if let Some(props) = properties {
props
.get("variables")
.map(|v| serde_json::from_value(v.clone()))
.transpose()
.map_err(D::Error::custom)?
} else {
None
};
let steps: IndexMap<String, SchemaRef> = if let Some(props) = properties {
if let Some(steps_obj) = props.get("steps").and_then(|s| s.as_object()) {
if let Some(step_properties) =
steps_obj.get("properties").and_then(|p| p.as_object())
{
let mut steps_map = IndexMap::new();
for (step_id, step_schema) in step_properties {
let schema: SchemaRef = serde_json::from_value(step_schema.clone())
.map_err(D::Error::custom)?;
steps_map.insert(step_id.clone(), schema);
}
steps_map
} else {
IndexMap::new()
}
} else {
IndexMap::new()
}
} else {
IndexMap::new()
};
Ok(FlowSchema {
defs,
input,
output,
variables,
steps,
})
}
}
impl FlowSchema {
pub fn is_empty(&self) -> bool {
self.defs.is_empty()
&& self.input.is_none()
&& self.output.is_none()
&& self.variables.is_none()
&& self.steps.is_empty()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::workflow::variable_schema::VariableSchema;
use serde_json::json;
#[test]
fn test_deserialize_json_schema_format() {
let json = json!({
"type": "object",
"properties": {
"variables": {
"type": "object",
"properties": {
"API_KEY": {
"type": "string",
"env_var": "MY_API_KEY"
}
}
}
}
});
let schema: FlowSchema = serde_json::from_value(json).unwrap();
assert!(schema.variables.is_some());
let var_schema = VariableSchema::from(schema.variables.unwrap());
assert_eq!(var_schema.env_var_name("API_KEY"), Some("MY_API_KEY"));
}
#[test]
fn test_rejects_flat_format() {
let json = json!({
"variables": {
"type": "object",
"properties": {
"OPENAI_API_KEY": {
"type": ["string", "null"],
"env_var": "OPENAI_API_KEY"
}
}
},
"steps": {}
});
let result = serde_json::from_value::<FlowSchema>(json);
let err = result.unwrap_err().to_string();
assert!(
err.contains("must be inside a \"properties\" wrapper"),
"Expected clear error about properties wrapper, got: {err}"
);
}
#[test]
fn test_roundtrip_preserves_env_var() {
let json = json!({
"type": "object",
"properties": {
"variables": {
"type": "object",
"properties": {
"API_KEY": {
"type": "string",
"env_var": "MY_API_KEY",
"is_secret": true
}
}
}
}
});
let schema: FlowSchema = serde_json::from_value(json).unwrap();
let serialized = serde_json::to_value(&schema).unwrap();
let schema2: FlowSchema = serde_json::from_value(serialized).unwrap();
assert!(schema2.variables.is_some());
let var_schema = VariableSchema::from(schema2.variables.unwrap());
assert_eq!(var_schema.env_var_name("API_KEY"), Some("MY_API_KEY"));
}
#[test]
fn test_empty_schema_accepted() {
let empty: FlowSchema = serde_json::from_value(json!({})).unwrap();
assert!(empty.is_empty());
let null: FlowSchema = serde_json::from_value(json!(null)).unwrap();
assert!(null.is_empty());
let type_only: FlowSchema = serde_json::from_value(json!({"type": "object"})).unwrap();
assert!(type_only.is_empty());
}
}