1use std::collections::HashMap;
14
15use serde_with::{DefaultOnNull, serde_as};
16
17use super::{FlowSchema, Step, ValueRef, VariableSchema};
18use crate::{FlowResult, ValueExpr, schema::SchemaRef};
19
// A workflow definition: optional descriptive fields, schemas, an ordered list
// of steps, an output expression, plus optional test configuration, examples
// and free-form metadata.
//
// Serialized with camelCase keys. Every field is optional on input
// (`#[serde(default)]`) and is omitted on output when it holds its empty/default
// value (`skip_serializing_if`).
//
// NOTE: plain `//` comments are used here on purpose. `///` doc comments on a
// type deriving `schemars::JsonSchema` are emitted as descriptions in the
// generated schema, which `test_schema_comparison_with_flow_json` compares
// against a reference file.
#[serde_as]
#[derive(
    Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Default, schemars::JsonSchema,
)]
#[serde(rename_all = "camelCase")]
pub struct Flow {
    // Optional name of the flow.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,

    // Optional free-text description of the flow.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,

    // Optional version string (e.g. "1.0.0" in the tests below).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub version: Option<String>,

    // Schemas attached to the flow (input, output, variables and per-step
    // schemas are read from it by the accessors on `Flow`).
    // `DefaultOnNull` makes an explicit `null` deserialize to the default value
    // instead of failing.
    #[serde(default, skip_serializing_if = "FlowSchema::is_empty")]
    #[serde_as(as = "DefaultOnNull")]
    #[schemars(with = "FlowSchema")]
    pub schemas: FlowSchema,

    // Steps of the flow, in declaration order. `null` is accepted and treated
    // as an empty list.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[serde_as(as = "DefaultOnNull")]
    pub steps: Vec<Step>,

    // Expression describing the flow's output; omitted from serialized output
    // when it is null.
    #[serde(default, skip_serializing_if = "ValueExpr::is_null")]
    pub output: ValueExpr,

    // Optional test configuration (test cases and related config).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub test: Option<TestConfig>,

    // Optional explicitly declared example inputs.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub examples: Option<Vec<ExampleInput>>,

    // Arbitrary JSON metadata keyed by string. `null` is accepted and treated
    // as an empty map.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    #[serde_as(as = "DefaultOnNull")]
    pub metadata: HashMap<String, serde_json::Value>,
}
75
76impl Flow {
77 pub fn slow_clone(&self) -> Self {
96 self.clone()
97 }
98
99 pub fn name(&self) -> Option<&str> {
100 self.name.as_deref()
101 }
102
103 pub fn description(&self) -> Option<&str> {
104 self.description.as_deref()
105 }
106
107 pub fn version(&self) -> Option<&str> {
108 self.version.as_deref()
109 }
110
111 pub fn metadata(&self) -> &HashMap<String, serde_json::Value> {
112 &self.metadata
113 }
114
115 pub fn steps(&self) -> &[Step] {
117 &self.steps
118 }
119
120 pub fn examples(&self) -> &[ExampleInput] {
121 self.examples.as_deref().unwrap_or(&[])
122 }
123
124 pub fn variables(&self) -> Option<VariableSchema> {
129 self.schemas().variables.clone().map(VariableSchema::from)
130 }
131
132 pub fn variable_schema(&self) -> Option<&SchemaRef> {
134 self.schemas().variables.as_ref()
135 }
136
137 pub fn step(&self, index: usize) -> &Step {
143 &self.steps[index]
144 }
145
146 pub fn step_mut(&mut self, index: usize) -> &mut Step {
152 self.steps.get_mut(index).expect("Index out of bounds")
153 }
154
155 pub fn output(&self) -> &ValueExpr {
157 &self.output
158 }
159
160 pub fn test(&self) -> Option<&TestConfig> {
161 self.test.as_ref()
162 }
163
164 pub fn test_mut(&mut self) -> Option<&mut TestConfig> {
165 self.test.as_mut()
166 }
167
168 pub fn schemas(&self) -> &FlowSchema {
170 &self.schemas
171 }
172
173 pub fn schemas_mut(&mut self) -> &mut FlowSchema {
175 &mut self.schemas
176 }
177
178 pub fn input_schema(&self) -> Option<&SchemaRef> {
180 self.schemas.input.as_ref()
181 }
182
183 pub fn set_input_schema(&mut self, input_schema: Option<SchemaRef>) {
185 self.schemas.input = input_schema;
186 }
187
188 pub fn output_schema(&self) -> Option<&SchemaRef> {
190 self.schemas.output.as_ref()
191 }
192
193 pub fn set_output_schema(&mut self, output_schema: Option<SchemaRef>) {
195 self.schemas.output = output_schema;
196 }
197
198 pub fn step_output_schema(&self, step_id: &str) -> Option<&SchemaRef> {
200 self.schemas.steps.get(step_id)
201 }
202
203 pub fn set_step_output_schema(&mut self, step_id: String, step_schema: SchemaRef) {
205 self.schemas.steps.insert(step_id, step_schema);
206 }
207
208 pub fn get_all_examples(&self) -> Vec<ExampleInput> {
210 let mut examples = self.examples().to_vec();
211
212 if let Some(test_config) = &self.test {
214 for test_case in &test_config.cases {
215 if !examples.iter().any(|ex| ex.name == test_case.name) {
217 examples.push(ExampleInput::from(test_case));
218 }
219 }
220 }
221
222 examples
223 }
224}
225
/// Shared handle to a [`Flow`].
///
/// This is a newtype over `Arc<Flow>`, so cloning a `FlowRef` clones the `Arc`
/// rather than the flow itself. Equality compares the wrapped flows.
#[derive(Debug, Clone, PartialEq)]
pub struct FlowRef(std::sync::Arc<Flow>);
232
233impl FlowRef {
234 pub fn new(flow: Flow) -> Self {
236 Self(std::sync::Arc::new(flow))
237 }
238
239 pub fn from_arc(arc: std::sync::Arc<Flow>) -> Self {
241 Self(arc)
242 }
243
244 pub fn as_flow(&self) -> &Flow {
246 &self.0
247 }
248
249 pub fn into_arc(self) -> std::sync::Arc<Flow> {
251 self.0
252 }
253
254 pub fn as_arc(&self) -> &std::sync::Arc<Flow> {
256 &self.0
257 }
258}
259
260impl std::ops::Deref for FlowRef {
261 type Target = Flow;
262
263 fn deref(&self) -> &Self::Target {
264 &self.0
265 }
266}
267
268impl From<Flow> for FlowRef {
269 fn from(flow: Flow) -> Self {
270 Self::new(flow)
271 }
272}
273
274impl From<std::sync::Arc<Flow>> for FlowRef {
275 fn from(arc: std::sync::Arc<Flow>) -> Self {
276 Self::from_arc(arc)
277 }
278}
279
280impl serde::Serialize for FlowRef {
281 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
282 where
283 S: serde::Serializer,
284 {
285 self.0.serialize(serializer)
286 }
287}
288
289impl<'de> serde::Deserialize<'de> for FlowRef {
290 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
291 where
292 D: serde::Deserializer<'de>,
293 {
294 let flow = Flow::deserialize(deserializer)?;
295 Ok(Self::new(flow))
296 }
297}
298
// Test configuration attached to a flow: optional config (inline or by file)
// plus a list of test cases. Serialized with camelCase keys.
//
// NOTE: plain `//` comments are used on purpose. `///` doc comments on a type
// deriving `schemars::JsonSchema` are emitted as descriptions in the generated
// schema.
#[serde_as]
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, schemars::JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct TestConfig {
    // Serialized as `configFile`.
    // NOTE(review): presumably a path to a config file used when running the
    // tests — confirm against the test runner.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub config_file: Option<String>,

    // Inline configuration as arbitrary JSON. The key `stepflow_config` is also
    // accepted on input via the alias.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        alias = "stepflow_config"
    )]
    pub config: Option<serde_json::Value>,

    // Test cases. An explicit `null` is accepted and treated as an empty list
    // (`DefaultOnNull`); an empty list is omitted when serializing.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[serde_as(as = "DefaultOnNull")]
    pub cases: Vec<TestCase>,
}
324
// A single named test case for a flow. Serialized with camelCase keys.
//
// NOTE: plain `//` comments are used on purpose. `///` doc comments on a type
// deriving `schemars::JsonSchema` are emitted as descriptions in the generated
// schema.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, schemars::JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct TestCase {
    // Name of the test case (required). `Flow::get_all_examples` uses it to
    // skip test cases that duplicate an existing example name.
    pub name: String,

    // Optional free-text description.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,

    // Input value for the test case (required).
    pub input: ValueRef,

    // NOTE(review): presumably the expected result of running the flow on
    // `input`; `None` when unspecified — confirm against the test runner.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub output: Option<FlowResult>,
}
343
// A named example input for a flow. Serialized with camelCase keys.
//
// NOTE: plain `//` comments are used on purpose. `///` doc comments on a type
// deriving `schemars::JsonSchema` are emitted as descriptions in the generated
// schema.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, schemars::JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ExampleInput {
    // Name of the example (required).
    pub name: String,

    // Optional free-text description.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,

    // The example input value (required).
    pub input: ValueRef,
}
358
359impl From<&TestCase> for ExampleInput {
360 fn from(test_case: &TestCase) -> Self {
361 Self {
362 name: test_case.name.clone(),
363 description: test_case.description.clone(),
364 input: test_case.input.clone(),
365 }
366 }
367}
368
// Unit tests for flow (de)serialization, example collection, and the generated
// JSON schema.
#[cfg(test)]
mod tests {
    use crate::workflow::{FlowBuilder, StepBuilder};

    use super::*;

    // Parses a complete flow from YAML, checks the individual fields, verifies
    // a JSON round-trip, and compares the result with the same flow assembled
    // through `FlowBuilder`.
    #[test]
    fn test_flow_from_yaml() {
        let yaml = r#"
        name: test
        description: test
        version: 1.0.0
        schemas:
          type: object
          properties:
            input:
              type: object
              properties:
                name:
                  type: string
                  description: The name to echo
                count:
                  type: integer
            output:
              type: object
              properties:
                s1a:
                  type: string
                s2b:
                  type: string
        steps:
          - component: /langflow/echo
            id: s1
            input:
              a: "hello world"
          - component: /mcp/foo/bar
            id: s2
            input:
              a: "hello world 2"
        output:
          s1a: { $step: s1, path: "a" }
          s2b: { $step: s2, path: a }
        "#;
        let flow: Flow = serde_yaml_ng::from_str(yaml).unwrap();
        // The schemas the YAML above is expected to produce.
        let input_schema = SchemaRef::parse_json(r#"{"type":"object","properties":{"name":{"type":"string","description":"The name to echo"},"count":{"type":"integer"}}}"#).unwrap();
        let output_schema = SchemaRef::parse_json(
            r#"{"type":"object","properties":{"s1a":{"type":"string"},"s2b":{"type":"string"}}}"#,
        )
        .unwrap();
        // Top-level fields.
        assert_eq!(flow.name, Some("test".to_owned()));
        assert_eq!(flow.description, Some("test".to_owned()));
        assert_eq!(flow.version, Some("1.0.0".to_owned()));
        assert_eq!(flow.schemas.input, Some(input_schema.clone()));
        assert_eq!(flow.schemas.output, Some(output_schema.clone()));
        assert_eq!(flow.steps.len(), 2);

        // Steps keep their declaration order, ids and component paths.
        assert_eq!(flow.steps[0].id, "s1");
        assert_eq!(flow.steps[0].component.path(), "/langflow/echo");
        assert_eq!(flow.steps[1].id, "s2");
        assert_eq!(flow.steps[1].component.path(), "/mcp/foo/bar");

        // Round-trip through JSON.
        let serialized = serde_json::to_string(&flow).unwrap();
        let deserialized: Flow = serde_json::from_str(&serialized).unwrap();
        assert_eq!(flow.name, deserialized.name);
        assert_eq!(flow.steps.len(), deserialized.steps.len());
        assert_eq!(flow.output, deserialized.output);

        // The output mapping is parsed as an object expression.
        assert!(matches!(flow.output, ValueExpr::Object(_)));

        // The same flow, built programmatically.
        let expected_flow = FlowBuilder::new()
            .name("test")
            .description("test")
            .version("1.0.0")
            .input_schema(input_schema)
            .output_schema(output_schema)
            .steps(vec![
                StepBuilder::new("s1")
                    .component("/langflow/echo")
                    .input_literal(serde_json::json!({
                        "a": "hello world"
                    }))
                    .build(),
                StepBuilder::new("s2")
                    .component("/mcp/foo/bar")
                    .input_literal(serde_json::json!({
                        "a": "hello world 2"
                    }))
                    .build(),
            ])
            .output(
                serde_json::from_value(serde_json::json!({
                    "s1a": { "$step": "s1", "path": "a" },
                    "s2b": { "$step": "s2", "path": "a" }
                }))
                .unwrap(),
            )
            .build();

        similar_asserts::assert_serde_eq!(&flow, &expected_flow);
    }

    // `get_all_examples` returns the explicit examples followed by the test
    // cases, dropping a test case whose name collides with an existing example.
    #[test]
    fn test_get_all_examples() {
        use super::*;
        use serde_json::json;

        // One explicit example and two test cases; the second test case reuses
        // the explicit example's name.
        let flow = FlowBuilder::new()
            .name("test_flow")
            .output(ValueExpr::literal(json!({})))
            .examples(vec![ExampleInput {
                name: "example1".to_string(),
                description: Some("Direct example".to_string()),
                input: ValueRef::new(json!({"input": "example"})),
            }])
            .test_config(TestConfig {
                config: None,
                config_file: None,
                cases: vec![
                    TestCase {
                        name: "test1".to_string(),
                        description: Some("Test case as example".to_string()),
                        input: ValueRef::new(json!({"input": "test"})),
                        output: None,
                    },
                    TestCase {
                        name: "example1".to_string(),
                        description: Some("Duplicate name".to_string()),
                        input: ValueRef::new(json!({"input": "duplicate"})),
                        output: None,
                    },
                ],
            })
            .build();

        let all_examples = flow.get_all_examples();

        // The duplicate-named test case is skipped, leaving two entries.
        assert_eq!(all_examples.len(), 2);

        // The explicit example comes first and keeps its own description.
        assert_eq!(all_examples[0].name, "example1");
        assert_eq!(
            all_examples[0].description,
            Some("Direct example".to_string())
        );

        // The non-colliding test case follows.
        assert_eq!(all_examples[1].name, "test1");
        assert_eq!(
            all_examples[1].description,
            Some("Test case as example".to_string())
        );
    }

    // Every `Flow` field accepts an explicit JSON `null` and falls back to its
    // empty/default value.
    #[test]
    fn test_flow_all_optional_null() {
        let json = serde_json::json!({
            "name": null,
            "description": null,
            "version": null,
            "schemas": null,
            "steps": null,
            "output": null,
            "test": null,
            "examples": null,
            "metadata": null,
        });
        let flow: Flow = serde_json::from_value(json).unwrap();
        assert!(flow.name.is_none());
        assert!(flow.description.is_none());
        assert!(flow.version.is_none());
        assert!(flow.schemas.is_empty());
        assert!(flow.steps.is_empty());
        assert!(flow.output.is_null());
        assert!(flow.test.is_none());
        assert!(flow.examples.is_none());
        assert!(flow.metadata.is_empty());
    }

    // Every `TestConfig` field accepts an explicit JSON `null`.
    #[test]
    fn test_test_config_all_optional_null() {
        let json = serde_json::json!({
            "configFile": null,
            "config": null,
            "cases": null,
        });
        let config: TestConfig = serde_json::from_value(json).unwrap();
        assert!(config.config_file.is_none());
        assert!(config.config.is_none());
        assert!(config.cases.is_empty());
    }

    // The optional `TestCase` fields accept an explicit JSON `null`.
    #[test]
    fn test_test_case_optional_null() {
        let json = serde_json::json!({
            "name": "my_test",
            "input": {"key": "value"},
            "description": null,
            "output": null,
        });
        let case: TestCase = serde_json::from_value(json).unwrap();
        assert_eq!(case.name, "my_test");
        assert!(case.description.is_none());
        assert!(case.output.is_none());
    }

    // The optional `ExampleInput` description accepts an explicit JSON `null`;
    // the input may be a non-object value.
    #[test]
    fn test_example_input_optional_null() {
        let json = serde_json::json!({
            "name": "my_example",
            "input": 42,
            "description": null,
        });
        let example: ExampleInput = serde_json::from_value(json).unwrap();
        assert_eq!(example.name, "my_example");
        assert!(example.description.is_none());
    }

    // Compares the JSON schema generated for `Flow` with the checked-in
    // reference file. When the `STEPFLOW_OVERWRITE_SCHEMA` environment variable
    // is set (to any value), the reference file is rewritten instead.
    #[test]
    fn test_schema_comparison_with_flow_json() {
        use crate::json_schema::generate_json_schema_with_defs;
        use std::env;

        let generated_json = generate_json_schema_with_defs::<Flow>();
        let generated_schema_str = serde_json::to_string_pretty(&generated_json).unwrap();

        // Reference schema location, relative to this crate's manifest directory.
        let flow_schema_path = format!("{}/../../../schemas/flow.json", env!("CARGO_MANIFEST_DIR"));
        if env::var("STEPFLOW_OVERWRITE_SCHEMA").is_ok() {
            // Update mode: create the target directory if needed, then write.
            if let Some(parent) = std::path::Path::new(&flow_schema_path).parent() {
                std::fs::create_dir_all(parent).expect("Failed to create schema directory");
            }

            std::fs::write(&flow_schema_path, &generated_schema_str)
                .expect("Failed to write updated schema");
        } else {
            // Check mode: the reference file must exist and match exactly.
            match std::fs::read_to_string(&flow_schema_path) {
                Ok(expected_schema_str) => {
                    assert_eq!(
                        generated_schema_str, expected_schema_str,
                        "Generated schema does not match the reference schema at {flow_schema_path}. \
                         Run 'STEPFLOW_OVERWRITE_SCHEMA=1 cargo test -p stepflow-core' to update."
                    );
                }
                Err(_) => {
                    panic!(
                        "Flow schema file not found at {flow_schema_path}. Run 'STEPFLOW_OVERWRITE_SCHEMA=1 cargo test -p stepflow-core' to create it."
                    );
                }
            }
        }
    }
}