use std::collections::HashMap;
use serde_with::{DefaultOnNull, serde_as};
use super::{FlowSchema, Step, ValueRef, VariableSchema};
use crate::{FlowResult, ValueExpr, schema::SchemaRef};
// A flow definition: optional descriptive fields, schemas, a list of steps, an
// output value expression, optional test configuration and examples, and
// free-form metadata.
//
// Field names are (de)serialized in camelCase. Every field carries
// `#[serde(default)]`, so any of them may be absent from the input, and
// `DefaultOnNull` maps an explicit `null` to the field's default value.
//
// NOTE(review): plain `//` comments are used here rather than `///` because the
// `schemars::JsonSchema` derive copies doc comments into the generated schema,
// which `test_schema_comparison_with_flow_json` compares against a checked-in file.
#[serde_as]
#[derive(
Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Default, schemars::JsonSchema,
)]
#[serde(rename_all = "camelCase")]
pub struct Flow {
// Optional flow name; omitted from serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
// Optional description; omitted from serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
// Optional version string; omitted from serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub version: Option<String>,
// Schema information for the flow (input, output, variables and per-step
// schemas are read from it by the accessors on `Flow`). Missing or `null`
// input yields the default; not serialized when `FlowSchema::is_empty`.
#[serde(default, skip_serializing_if = "FlowSchema::is_empty")]
#[serde_as(as = "DefaultOnNull")]
#[schemars(with = "FlowSchema")]
pub schemas: FlowSchema,
// Steps of the flow. Missing or `null` input yields an empty list; an empty
// list is not serialized.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
#[serde_as(as = "DefaultOnNull")]
pub steps: Vec<Step>,
// Output value expression of the flow; not serialized when `ValueExpr::is_null`.
#[serde(default, skip_serializing_if = "ValueExpr::is_null")]
pub output: ValueExpr,
// Optional test configuration; omitted from serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub test: Option<TestConfig>,
// Optional list of example inputs; omitted from serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub examples: Option<Vec<ExampleInput>>,
// Arbitrary JSON metadata keyed by string. Missing or `null` input yields an
// empty map; an empty map is not serialized.
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
#[serde_as(as = "DefaultOnNull")]
pub metadata: HashMap<String, serde_json::Value>,
}
impl Flow {
    /// Returns a copy of this flow.
    ///
    /// Delegates to [`Clone::clone`], so every field is cloned.
    pub fn slow_clone(&self) -> Self {
        Clone::clone(self)
    }

    /// Returns the flow name, if one is set.
    pub fn name(&self) -> Option<&str> {
        self.name.as_deref()
    }

    /// Returns the flow description, if one is set.
    pub fn description(&self) -> Option<&str> {
        self.description.as_deref()
    }

    /// Returns the flow version, if one is set.
    pub fn version(&self) -> Option<&str> {
        self.version.as_deref()
    }

    /// Returns the metadata map.
    pub fn metadata(&self) -> &HashMap<String, serde_json::Value> {
        &self.metadata
    }

    /// Returns all steps as a slice.
    pub fn steps(&self) -> &[Step] {
        self.steps.as_slice()
    }

    /// Returns the directly declared examples, or an empty slice when the
    /// `examples` field is `None`.
    pub fn examples(&self) -> &[ExampleInput] {
        match &self.examples {
            Some(list) => list.as_slice(),
            None => &[],
        }
    }

    /// Returns the variables schema converted into an owned [`VariableSchema`],
    /// or `None` when no variables schema is present.
    pub fn variables(&self) -> Option<VariableSchema> {
        self.variable_schema().cloned().map(VariableSchema::from)
    }

    /// Returns a reference to the variables schema, if present.
    pub fn variable_schema(&self) -> Option<&SchemaRef> {
        self.schemas.variables.as_ref()
    }

    /// Returns the step at `index`.
    ///
    /// # Panics
    ///
    /// Panics if `index` is out of bounds.
    pub fn step(&self, index: usize) -> &Step {
        &self.steps[index]
    }

    /// Returns a mutable reference to the step at `index`.
    ///
    /// # Panics
    ///
    /// Panics with "Index out of bounds" if `index` is out of bounds.
    pub fn step_mut(&mut self, index: usize) -> &mut Step {
        let slot = self.steps.get_mut(index);
        slot.expect("Index out of bounds")
    }

    /// Returns the output value expression.
    pub fn output(&self) -> &ValueExpr {
        &self.output
    }

    /// Returns the test configuration, if present.
    pub fn test(&self) -> Option<&TestConfig> {
        self.test.as_ref()
    }

    /// Returns the test configuration mutably, if present.
    pub fn test_mut(&mut self) -> Option<&mut TestConfig> {
        self.test.as_mut()
    }

    /// Returns the flow's schema information.
    pub fn schemas(&self) -> &FlowSchema {
        &self.schemas
    }

    /// Returns the flow's schema information mutably.
    pub fn schemas_mut(&mut self) -> &mut FlowSchema {
        &mut self.schemas
    }

    /// Returns the input schema, if present.
    pub fn input_schema(&self) -> Option<&SchemaRef> {
        self.schemas.input.as_ref()
    }

    /// Replaces the input schema; passing `None` clears it.
    pub fn set_input_schema(&mut self, input_schema: Option<SchemaRef>) {
        self.schemas.input = input_schema;
    }

    /// Returns the output schema, if present.
    pub fn output_schema(&self) -> Option<&SchemaRef> {
        self.schemas.output.as_ref()
    }

    /// Replaces the output schema; passing `None` clears it.
    pub fn set_output_schema(&mut self, output_schema: Option<SchemaRef>) {
        self.schemas.output = output_schema;
    }

    /// Looks up the output schema registered under `step_id`.
    pub fn step_output_schema(&self, step_id: &str) -> Option<&SchemaRef> {
        self.schemas.steps.get(step_id)
    }

    /// Registers `step_schema` under `step_id`, replacing any previous entry.
    pub fn set_step_output_schema(&mut self, step_id: String, step_schema: SchemaRef) {
        self.schemas.steps.insert(step_id, step_schema);
    }

    /// Returns the direct examples followed by the test cases converted to
    /// examples.
    ///
    /// A test case is skipped when an entry with the same name has already
    /// been collected, whether it came from the direct examples or from an
    /// earlier test case.
    pub fn get_all_examples(&self) -> Vec<ExampleInput> {
        let mut collected = self.examples().to_vec();
        // `Option::iter` yields zero or one config, so a flow without a test
        // section contributes no cases.
        let cases = self.test.iter().flat_map(|config| config.cases.iter());
        for case in cases {
            let name_taken = collected.iter().any(|existing| existing.name == case.name);
            if !name_taken {
                collected.push(ExampleInput::from(case));
            }
        }
        collected
    }
}
/// A handle to a [`Flow`] stored behind an [`std::sync::Arc`].
///
/// The derived `Clone` clones the inner `Arc` rather than the flow itself.
#[derive(Debug, Clone, PartialEq)]
pub struct FlowRef(std::sync::Arc<Flow>);
impl FlowRef {
pub fn new(flow: Flow) -> Self {
Self(std::sync::Arc::new(flow))
}
pub fn from_arc(arc: std::sync::Arc<Flow>) -> Self {
Self(arc)
}
pub fn as_flow(&self) -> &Flow {
&self.0
}
pub fn into_arc(self) -> std::sync::Arc<Flow> {
self.0
}
pub fn as_arc(&self) -> &std::sync::Arc<Flow> {
&self.0
}
}
/// Lets a `FlowRef` be used wherever a `&Flow` is expected.
impl std::ops::Deref for FlowRef {
    type Target = Flow;

    fn deref(&self) -> &Flow {
        self.as_flow()
    }
}
impl From<Flow> for FlowRef {
fn from(flow: Flow) -> Self {
Self::new(flow)
}
}
impl From<std::sync::Arc<Flow>> for FlowRef {
fn from(arc: std::sync::Arc<Flow>) -> Self {
Self::from_arc(arc)
}
}
impl serde::Serialize for FlowRef {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
self.0.serialize(serializer)
}
}
impl<'de> serde::Deserialize<'de> for FlowRef {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let flow = Flow::deserialize(deserializer)?;
Ok(Self::new(flow))
}
}
// Test configuration attached to a flow through `Flow::test`.
//
// Field names are (de)serialized in camelCase (e.g. `configFile`).
//
// NOTE(review): plain `//` comments are used rather than `///` because the
// `schemars::JsonSchema` derive copies doc comments into the generated schema.
#[serde_as]
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, schemars::JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct TestConfig {
// Optional string naming a configuration file — presumably a path used when
// running the tests; confirm against the test runner. Omitted when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub config_file: Option<String>,
// Optional inline configuration as arbitrary JSON. The key `stepflow_config`
// is also accepted when deserializing. Omitted when `None`.
#[serde(
default,
skip_serializing_if = "Option::is_none",
alias = "stepflow_config"
)]
pub config: Option<serde_json::Value>,
// Test cases. Missing or `null` input yields an empty list; an empty list is
// not serialized.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
#[serde_as(as = "DefaultOnNull")]
pub cases: Vec<TestCase>,
}
// A single named test case: an input value plus an optional result.
//
// NOTE(review): plain `//` comments are used rather than `///` because the
// `schemars::JsonSchema` derive copies doc comments into the generated schema.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, schemars::JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct TestCase {
// Name of the test case (required). `Flow::get_all_examples` uses it to skip
// test cases whose name is already taken.
pub name: String,
// Optional description; omitted from serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
// Input value for the test case (required).
pub input: ValueRef,
// Optional result — presumably the expected outcome of running the flow on
// `input`; confirm against the test runner. Omitted when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub output: Option<FlowResult>,
}
// A named example input for a flow.
//
// NOTE(review): plain `//` comments are used rather than `///` because the
// `schemars::JsonSchema` derive copies doc comments into the generated schema.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, schemars::JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ExampleInput {
// Name of the example (required).
pub name: String,
// Optional description; omitted from serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
// The example input value (required).
pub input: ValueRef,
}
impl From<&TestCase> for ExampleInput {
fn from(test_case: &TestCase) -> Self {
Self {
name: test_case.name.clone(),
description: test_case.description.clone(),
input: test_case.input.clone(),
}
}
}
#[cfg(test)]
mod tests {
use crate::workflow::{FlowBuilder, StepBuilder};
use super::*;
/// Parses a flow from YAML, checks the parsed fields, round-trips the flow
/// through JSON, and compares it with the same flow assembled via the builders.
#[test]
fn test_flow_from_yaml() {
    // YAML is indentation-sensitive: the input/output schemas must nest under
    // `schemas.properties`, and each step's `id`/`input` must belong to its
    // list item. Without that nesting these keys collapse into duplicate
    // top-level keys and the document no longer describes the flow asserted on
    // below.
    let yaml = r#"
        name: test
        description: test
        version: 1.0.0
        schemas:
          type: object
          properties:
            input:
              type: object
              properties:
                name:
                  type: string
                  description: The name to echo
                count:
                  type: integer
            output:
              type: object
              properties:
                s1a:
                  type: string
                s2b:
                  type: string
        steps:
          - component: /langflow/echo
            id: s1
            input:
              a: "hello world"
          - component: /mcp/foo/bar
            id: s2
            input:
              a: "hello world 2"
        output:
          s1a: { $step: s1, path: "a" }
          s2b: { $step: s2, path: a }
    "#;
    let flow: Flow = serde_yaml_ng::from_str(yaml).unwrap();

    // Expected schemas, spelled out as JSON for comparison with the parsed YAML.
    let input_schema = SchemaRef::parse_json(r#"{"type":"object","properties":{"name":{"type":"string","description":"The name to echo"},"count":{"type":"integer"}}}"#).unwrap();
    let output_schema = SchemaRef::parse_json(
        r#"{"type":"object","properties":{"s1a":{"type":"string"},"s2b":{"type":"string"}}}"#,
    )
    .unwrap();

    // Field-level checks on the parsed flow.
    assert_eq!(flow.name, Some("test".to_owned()));
    assert_eq!(flow.description, Some("test".to_owned()));
    assert_eq!(flow.version, Some("1.0.0".to_owned()));
    assert_eq!(flow.schemas.input, Some(input_schema.clone()));
    assert_eq!(flow.schemas.output, Some(output_schema.clone()));
    assert_eq!(flow.steps.len(), 2);
    assert_eq!(flow.steps[0].id, "s1");
    assert_eq!(flow.steps[0].component.path(), "/langflow/echo");
    assert_eq!(flow.steps[1].id, "s2");
    assert_eq!(flow.steps[1].component.path(), "/mcp/foo/bar");

    // JSON round trip must preserve name, step count and output expression.
    let serialized = serde_json::to_string(&flow).unwrap();
    let deserialized: Flow = serde_json::from_str(&serialized).unwrap();
    assert_eq!(flow.name, deserialized.name);
    assert_eq!(flow.steps.len(), deserialized.steps.len());
    assert_eq!(flow.output, deserialized.output);
    assert!(matches!(flow.output, ValueExpr::Object(_)));

    // The same flow assembled programmatically must serialize identically.
    let expected_flow = FlowBuilder::new()
        .name("test")
        .description("test")
        .version("1.0.0")
        .input_schema(input_schema)
        .output_schema(output_schema)
        .steps(vec![
            StepBuilder::new("s1")
                .component("/langflow/echo")
                .input_literal(serde_json::json!({
                    "a": "hello world"
                }))
                .build(),
            StepBuilder::new("s2")
                .component("/mcp/foo/bar")
                .input_literal(serde_json::json!({
                    "a": "hello world 2"
                }))
                .build(),
        ])
        .output(
            serde_json::from_value(serde_json::json!({
                "s1a": { "$step": "s1", "path": "a" },
                "s2b": { "$step": "s2", "path": "a" }
            }))
            .unwrap(),
        )
        .build();
    similar_asserts::assert_serde_eq!(&flow, &expected_flow);
}
/// `get_all_examples` must return the direct examples first, then the test
/// cases converted to examples, skipping a test case whose name is already used.
#[test]
fn test_get_all_examples() {
    use super::*;
    use serde_json::json;

    let direct_examples = vec![ExampleInput {
        name: "example1".to_string(),
        description: Some("Direct example".to_string()),
        input: ValueRef::new(json!({"input": "example"})),
    }];
    let test_cases = vec![
        TestCase {
            name: "test1".to_string(),
            description: Some("Test case as example".to_string()),
            input: ValueRef::new(json!({"input": "test"})),
            output: None,
        },
        // Shares its name with the direct example, so it must be dropped.
        TestCase {
            name: "example1".to_string(),
            description: Some("Duplicate name".to_string()),
            input: ValueRef::new(json!({"input": "duplicate"})),
            output: None,
        },
    ];

    let flow = FlowBuilder::new()
        .name("test_flow")
        .output(ValueExpr::literal(json!({})))
        .examples(direct_examples)
        .test_config(TestConfig {
            config: None,
            config_file: None,
            cases: test_cases,
        })
        .build();

    let merged = flow.get_all_examples();
    let summary: Vec<(&str, Option<&str>)> = merged
        .iter()
        .map(|example| (example.name.as_str(), example.description.as_deref()))
        .collect();
    assert_eq!(
        summary,
        vec![
            ("example1", Some("Direct example")),
            ("test1", Some("Test case as example")),
        ]
    );
}
/// Every `Flow` field given as an explicit JSON `null` must deserialize to its
/// empty/default value.
#[test]
fn test_flow_all_optional_null() {
    let payload = serde_json::json!({
        "name": null,
        "description": null,
        "version": null,
        "schemas": null,
        "steps": null,
        "output": null,
        "test": null,
        "examples": null,
        "metadata": null,
    });

    let flow = serde_json::from_value::<Flow>(payload).unwrap();

    // Optional fields come back as `None`.
    assert_eq!(flow.name, None);
    assert_eq!(flow.description, None);
    assert_eq!(flow.version, None);
    assert_eq!(flow.test, None);
    assert_eq!(flow.examples, None);

    // Defaulted fields come back empty.
    assert!(flow.schemas.is_empty());
    assert!(flow.steps.is_empty());
    assert!(flow.output.is_null());
    assert!(flow.metadata.is_empty());
}
/// Every `TestConfig` field given as an explicit JSON `null` must deserialize
/// to `None` or an empty list.
#[test]
fn test_test_config_all_optional_null() {
    let payload = serde_json::json!({
        "configFile": null,
        "config": null,
        "cases": null,
    });

    let parsed = serde_json::from_value::<TestConfig>(payload).unwrap();

    assert_eq!(parsed.config_file, None);
    assert_eq!(parsed.config, None);
    assert!(parsed.cases.is_empty());
}
/// A `TestCase` whose optional fields are explicit JSON `null`s must
/// deserialize with those fields set to `None`.
#[test]
fn test_test_case_optional_null() {
    let payload = serde_json::json!({
        "name": "my_test",
        "input": {"key": "value"},
        "description": null,
        "output": null,
    });

    let parsed = serde_json::from_value::<TestCase>(payload).unwrap();

    assert_eq!(parsed.name, "my_test");
    assert_eq!(parsed.description, None);
    assert!(parsed.output.is_none());
}
/// An `ExampleInput` whose `description` is an explicit JSON `null` must
/// deserialize with `description` set to `None`.
#[test]
fn test_example_input_optional_null() {
    let payload = serde_json::json!({
        "name": "my_example",
        "input": 42,
        "description": null,
    });

    let parsed = serde_json::from_value::<ExampleInput>(payload).unwrap();

    assert_eq!(parsed.name, "my_example");
    assert_eq!(parsed.description, None);
}
/// Compares the JSON schema generated for `Flow` with the reference file
/// `schemas/flow.json`.
///
/// When the `STEPFLOW_OVERWRITE_SCHEMA` environment variable is set, the
/// reference file is (re)written instead of compared.
#[test]
fn test_schema_comparison_with_flow_json() {
    use crate::json_schema::generate_json_schema_with_defs;
    use std::env;

    let generated_json = generate_json_schema_with_defs::<Flow>();
    let generated_schema_str = serde_json::to_string_pretty(&generated_json).unwrap();
    let flow_schema_path = format!("{}/../../../schemas/flow.json", env!("CARGO_MANIFEST_DIR"));

    // Update mode: write the freshly generated schema and stop.
    if env::var("STEPFLOW_OVERWRITE_SCHEMA").is_ok() {
        if let Some(parent) = std::path::Path::new(&flow_schema_path).parent() {
            std::fs::create_dir_all(parent).expect("Failed to create schema directory");
        }
        std::fs::write(&flow_schema_path, &generated_schema_str)
            .expect("Failed to write updated schema");
        return;
    }

    // Compare mode: the reference file must be readable and match exactly.
    let expected_schema_str = std::fs::read_to_string(&flow_schema_path).unwrap_or_else(|_| {
        panic!(
            "Flow schema file not found at {flow_schema_path}. Run 'STEPFLOW_OVERWRITE_SCHEMA=1 cargo test -p stepflow-core' to create it."
        )
    });
    assert_eq!(
        generated_schema_str, expected_schema_str,
        "Generated schema does not match the reference schema at {flow_schema_path}. \
         Run 'STEPFLOW_OVERWRITE_SCHEMA=1 cargo test -p stepflow-core' to update."
    );
}
}