1#![deny(unsafe_code)]
5#![allow(clippy::result_large_err)]
6
7pub mod add_step;
8pub mod answers;
9pub mod cache;
10pub mod component_catalog;
11pub mod component_schema;
12pub mod component_setup;
13pub mod config_flow;
14pub mod contracts;
15pub mod error;
16pub mod flow_bundle;
17pub mod flow_ir;
18pub mod flow_meta;
19pub mod i18n;
20pub mod info;
21pub mod ir;
22pub mod json_output;
23pub mod lint;
24pub mod loader;
25pub mod model;
26pub mod path_safety;
27pub mod qa_runner;
28pub mod questions;
29pub mod questions_schema;
30pub mod registry;
31pub mod resolve;
32pub mod resolve_summary;
33pub mod schema_mode;
34pub mod schema_validate;
35pub mod splice;
36pub mod template;
37pub mod util;
38pub mod wizard;
39pub mod wizard_ops;
40pub mod wizard_state;
41
42pub use flow_bundle::{
43 ComponentPin, FlowBundle, NodeRef, blake3_hex, canonicalize_json, extract_component_pins,
44 load_and_validate_bundle, load_and_validate_bundle_with_flow,
45};
46pub use json_output::{JsonDiagnostic, LintJsonOutput, lint_to_stdout_json};
47pub use splice::{NEXT_NODE_PLACEHOLDER, splice_node_after};
48
49pub const SLOT_SCHEMA_METADATA_KEY: &str = "greentic.slot_schema";
51
52use crate::{error::Result, model::FlowDoc};
53use greentic_types::{
54 ComponentId, Flow, FlowComponentRef, FlowId, FlowKind, FlowMetadata, InputMapping, Node,
55 NodeId, OutputMapping, Routing, TelemetryHints, flow::FlowHasher,
56};
57use indexmap::IndexMap;
58use serde_json::Value;
59use std::collections::{BTreeMap, BTreeSet, HashSet};
60use std::path::Path;
61
62pub fn map_flow_type(flow_type: &str) -> Result<FlowKind> {
64 match flow_type {
65 "messaging" => Ok(FlowKind::Messaging),
66 "event" | "events" => Ok(FlowKind::Event),
67 "component-config" => Ok(FlowKind::ComponentConfig),
68 "job" => Ok(FlowKind::Job),
69 "http" => Ok(FlowKind::Http),
70 other => Err(crate::error::FlowError::UnknownFlowType {
71 flow_type: other.to_string(),
72 location: crate::error::FlowErrorLocation::at_path("type"),
73 }),
74 }
75}
76
77pub fn compile_flow(doc: FlowDoc) -> Result<Flow> {
79 let FlowDoc {
80 id,
81 title,
82 description,
83 flow_type,
84 start,
85 parameters,
86 tags,
87 schema_version,
88 mut entrypoints,
89 meta: _,
90 slot_schema,
91 nodes: node_docs,
92 } = doc;
93
94 let kind = map_flow_type(&flow_type)?;
95 let known_nodes: HashSet<String> = node_docs.keys().cloned().collect();
96 if let Some(entry) = start
97 .clone()
98 .or_else(|| known_nodes.contains("in").then(|| "in".to_string()))
99 .or_else(|| node_docs.keys().next().cloned())
100 {
101 entrypoints
102 .entry("default".to_string())
103 .or_insert_with(|| Value::String(entry));
104 }
105
106 let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
107 for (node_id_str, node_doc) in node_docs {
108 let node_id = NodeId::new(node_id_str.as_str()).map_err(|e| {
109 crate::error::FlowError::InvalidIdentifier {
110 kind: "node",
111 value: node_id_str.clone(),
112 detail: e.to_string(),
113 location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id_str}")),
114 }
115 })?;
116 let routing = compile_routing(&node_doc.routing, &known_nodes, node_id_str.as_str())?;
117 let telemetry = node_doc
118 .telemetry
119 .map(|t| TelemetryHints {
120 span_name: t.span_name,
121 attributes: t.attributes,
122 sampling: t.sampling,
123 })
124 .unwrap_or_default();
125 let mut op_key: Option<String> = None;
126 let mut op_sibling: Option<String> = None;
127 let mut payload: Option<Value> = None;
128 let mut input_mapping: Option<Value> = None;
129 let mut output_mapping: Option<Value> = None;
130 let mut err_mapping: Option<Value> = None;
131 for (k, v) in node_doc.raw {
132 match k.as_str() {
133 "in_map" => {
134 input_mapping = Some(v);
135 continue;
136 }
137 "out_map" | "output" => {
138 output_mapping = Some(v);
139 continue;
140 }
141 "err_map" => {
142 err_mapping = Some(v);
143 continue;
144 }
145 "operation" => {
146 op_sibling = v.as_str().map(str::to_string);
147 continue;
148 }
149 _ => {}
150 }
151 op_key = Some(k);
152 payload = Some(v);
153 }
154 let operation = op_key.ok_or_else(|| crate::error::FlowError::Internal {
155 message: format!("node '{node_id_str}' missing operation key"),
156 location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id_str}")),
157 })?;
158 let is_mcp = operation.as_str() == crate::ir::MCP_COMPONENT;
159 let is_builtin =
160 matches!(operation.as_str(), "questions" | "template") || operation.starts_with("dw.");
161 let is_legacy = schema_version.unwrap_or(1) < 2;
162 let (component_id, op_field) = if is_mcp {
163 (crate::ir::MCP_COMPONENT.to_string(), op_sibling)
168 } else if is_builtin || is_legacy {
169 (operation, op_sibling)
170 } else {
171 (
172 "component.exec".to_string(),
173 Some(op_sibling.unwrap_or(operation)),
174 )
175 };
176 let node = Node {
177 id: node_id.clone(),
178 component: FlowComponentRef {
179 id: ComponentId::new(&component_id).unwrap(),
180 pack_alias: None,
181 operation: op_field,
182 },
183 input: InputMapping {
184 mapping: input_mapping
185 .or(payload)
186 .unwrap_or_else(|| Value::Object(Default::default())),
187 },
188 output: OutputMapping {
189 mapping: output_mapping.unwrap_or_else(|| Value::Object(Default::default())),
190 },
191 err_map: err_mapping.map(|mapping| OutputMapping { mapping }),
192 routing,
193 telemetry,
194 };
195 nodes.insert(node_id, node);
196 }
197
198 let flow_id =
199 FlowId::new(id.as_str()).map_err(|e| crate::error::FlowError::InvalidIdentifier {
200 kind: "flow",
201 value: id.clone(),
202 detail: e.to_string(),
203 location: crate::error::FlowErrorLocation::at_path("id"),
204 })?;
205
206 let entrypoints_map: BTreeMap<String, Value> = entrypoints.into_iter().collect();
207
208 let mut extra = parameters;
209 if let Some(ss) = slot_schema {
210 validate_slot_schema_names(&ss)?;
211 match extra {
212 Value::Object(ref mut map) => {
213 map.insert(SLOT_SCHEMA_METADATA_KEY.to_string(), ss);
214 }
215 Value::Null => {
216 let mut map = serde_json::Map::new();
217 map.insert(SLOT_SCHEMA_METADATA_KEY.to_string(), ss);
218 extra = Value::Object(map);
219 }
220 _ => {
221 tracing::warn!(
222 flow_id = %id,
223 "slot_schema present but parameters is not an object; skipping forward into extra"
224 );
225 }
226 }
227 }
228
229 Ok(Flow {
230 schema_version: "flow-v1".to_string(),
231 id: flow_id,
232 kind,
233 entrypoints: entrypoints_map,
234 nodes,
235 metadata: FlowMetadata {
236 title,
237 description,
238 tags: tags.into_iter().collect::<BTreeSet<_>>(),
239 extra,
240 },
241 })
242}
243
244fn validate_slot_schema_names(value: &Value) -> Result<()> {
249 use crate::error::{FlowError, FlowErrorLocation, SchemaErrorDetail};
250
251 let Some(slots) = value.as_array() else {
252 return Ok(()); };
254
255 let mut seen_names: HashSet<&str> = HashSet::with_capacity(slots.len());
256
257 for (i, slot) in slots.iter().enumerate() {
258 let Some(name) = slot.get("name").and_then(Value::as_str) else {
259 continue; };
261 if !seen_names.insert(name) {
262 let path = format!("slot_schema[{i}]");
263 let message = format!("{path}: duplicate slot name '{name}'");
264 let loc_path = format!("{path}/name");
265 return Err(FlowError::Schema {
266 message: message.clone(),
267 details: vec![SchemaErrorDetail {
268 message,
269 location: FlowErrorLocation::at_path(&loc_path),
270 }],
271 location: FlowErrorLocation::at_path(loc_path),
272 });
273 }
274
275 if let Some(pat) = slot.get("pattern").and_then(Value::as_str)
277 && let Err(e) = regex::Regex::new(pat)
278 {
279 let path = format!("slot_schema[{i}]");
280 let message = format!("{path}: invalid regex pattern for slot '{name}': {e}");
281 let loc_path = format!("{path}/pattern");
282 return Err(FlowError::Schema {
283 message: message.clone(),
284 details: vec![SchemaErrorDetail {
285 message,
286 location: FlowErrorLocation::at_path(&loc_path),
287 }],
288 location: FlowErrorLocation::at_path(loc_path),
289 });
290 }
291 }
292
293 Ok(())
294}
295
296pub fn compile_ygtc_str(src: &str) -> Result<Flow> {
298 let doc = loader::load_ygtc_from_str(src)?;
299 compile_flow(doc)
300}
301
302pub fn compile_ygtc_file(path: &Path) -> Result<Flow> {
304 let doc = loader::load_ygtc_from_path(path)?;
305 compile_flow(doc)
306}
307
308fn compile_routing(raw: &Value, nodes: &HashSet<String>, node_id: &str) -> Result<Routing> {
309 #[derive(serde::Deserialize)]
310 struct RouteDoc {
311 #[serde(default)]
312 to: Option<String>,
313 #[serde(default)]
314 out: Option<bool>,
315 #[serde(default)]
316 status: Option<String>,
317 #[serde(default)]
318 reply: Option<bool>,
319 #[serde(default)]
320 condition: Option<String>,
321 }
322
323 let routes: Vec<RouteDoc> = if raw.is_null() {
324 Vec::new()
325 } else if let Some(shorthand) = raw.as_str() {
326 match shorthand {
327 "out" => vec![RouteDoc {
328 to: None,
329 out: Some(true),
330 status: None,
331 reply: None,
332 condition: None,
333 }],
334 "reply" => vec![RouteDoc {
335 to: None,
336 out: None,
337 status: None,
338 reply: Some(true),
339 condition: None,
340 }],
341 other => {
342 return Err(crate::error::FlowError::Routing {
343 node_id: node_id.to_string(),
344 message: format!("invalid routing shorthand '{other}'"),
345 location: crate::error::FlowErrorLocation::at_path(format!(
346 "nodes.{node_id}.routing"
347 )),
348 });
349 }
350 }
351 } else {
352 serde_json::from_value(raw.clone()).map_err(|e| crate::error::FlowError::Routing {
353 node_id: node_id.to_string(),
354 message: e.to_string(),
355 location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id}.routing")),
356 })?
357 };
358
359 if routes.iter().any(|r| r.condition.is_some()) {
361 for route in &routes {
363 if let Some(to) = &route.to
364 && !nodes.contains(to)
365 {
366 return Err(crate::error::FlowError::MissingNode {
367 target: to.clone(),
368 node_id: node_id.to_string(),
369 location: crate::error::FlowErrorLocation::at_path(format!(
370 "nodes.{node_id}.routing"
371 )),
372 });
373 }
374 }
375 return Ok(Routing::Custom(raw.clone()));
376 }
377
378 if routes.len() == 1 {
379 let route = &routes[0];
380 let has_condition = raw
381 .as_array()
382 .and_then(|routes| routes.first())
383 .and_then(Value::as_object)
384 .is_some_and(|route| route.contains_key("condition"));
385 let is_out = route.out.unwrap_or(false);
386 if has_condition {
387 return Ok(Routing::Custom(raw.clone()));
388 }
389 if route.reply.unwrap_or(false) {
390 return Ok(Routing::Reply);
391 }
392 if route.status.is_some() {
393 return Ok(Routing::Custom(raw.clone()));
396 }
397 if let Some(to) = &route.to {
398 if to == "out" || is_out {
399 return Ok(Routing::End);
400 }
401 if !nodes.contains(to) {
402 return Err(crate::error::FlowError::MissingNode {
403 target: to.clone(),
404 node_id: node_id.to_string(),
405 location: crate::error::FlowErrorLocation::at_path(format!(
406 "nodes.{node_id}.routing"
407 )),
408 });
409 }
410 return Ok(Routing::Next {
411 node_id: NodeId::new(to.as_str()).map_err(|e| {
412 crate::error::FlowError::InvalidIdentifier {
413 kind: "node",
414 value: to.clone(),
415 detail: e.to_string(),
416 location: crate::error::FlowErrorLocation::at_path(format!(
417 "nodes.{node_id}.routing"
418 )),
419 }
420 })?,
421 });
422 }
423 if is_out {
424 return Ok(Routing::End);
425 }
426 }
427
428 if routes.is_empty() {
429 return Ok(Routing::End);
430 }
431
432 if routes.len() >= 2 {
434 use std::collections::BTreeMap;
435 let has_condition = raw.as_array().is_some_and(|routes| {
436 routes.iter().any(|route| {
437 route
438 .as_object()
439 .is_some_and(|route| route.contains_key("condition"))
440 })
441 });
442 if has_condition {
443 return Ok(Routing::Custom(raw.clone()));
444 }
445 let mut on_status: BTreeMap<String, NodeId> = BTreeMap::new();
446 let mut default: Option<NodeId> = None;
447 let mut any_status = false;
448 for route in &routes {
449 if route.reply.unwrap_or(false) || route.out.unwrap_or(false) {
450 return Ok(Routing::Custom(raw.clone()));
451 }
452 let to = match &route.to {
453 Some(t) => t,
454 None => return Ok(Routing::Custom(raw.clone())),
455 };
456 if !nodes.contains(to) {
457 return Err(crate::error::FlowError::MissingNode {
458 target: to.clone(),
459 node_id: node_id.to_string(),
460 location: crate::error::FlowErrorLocation::at_path(format!(
461 "nodes.{node_id}.routing"
462 )),
463 });
464 }
465 let to_id = NodeId::new(to.as_str()).map_err(|e| {
466 crate::error::FlowError::InvalidIdentifier {
467 kind: "node",
468 value: to.clone(),
469 detail: e.to_string(),
470 location: crate::error::FlowErrorLocation::at_path(format!(
471 "nodes.{node_id}.routing"
472 )),
473 }
474 })?;
475 if let Some(status) = &route.status {
476 any_status = true;
477 on_status.insert(status.clone(), to_id);
478 } else {
479 default = Some(to_id);
480 }
481 }
482 if any_status {
483 return Ok(Routing::Branch { on_status, default });
484 }
485 if let Some(default) = default {
486 return Ok(Routing::Branch {
487 on_status,
488 default: Some(default),
489 });
490 }
491 }
492
493 Ok(Routing::Custom(raw.clone()))
494}
495
496#[cfg(test)]
497mod tests {
498 use super::*;
499 use crate::loader::load_ygtc_from_str;
500 use serde_json::json;
501 use tempfile::tempdir;
502
503 #[test]
504 fn map_flow_type_supports_known_aliases() {
505 assert_eq!(map_flow_type("messaging").unwrap(), FlowKind::Messaging);
506 assert_eq!(map_flow_type("events").unwrap(), FlowKind::Event);
507 assert_eq!(
508 map_flow_type("component-config").unwrap(),
509 FlowKind::ComponentConfig
510 );
511 assert!(matches!(
512 map_flow_type("unknown").unwrap_err(),
513 crate::error::FlowError::UnknownFlowType { .. }
514 ));
515 }
516
517 #[test]
518 fn compile_flow_builds_entrypoints_and_branch_routing() {
519 let yaml = r#"id: demo
520type: messaging
521nodes:
522 start:
523 qa.process: {}
524 routing:
525 - status: ok
526 to: done
527 - to: fallback
528 done:
529 template: "ok"
530 routing: out
531 fallback:
532 template: "fallback"
533 routing: reply
534"#;
535
536 let flow = compile_ygtc_str(yaml).expect("compile flow");
537 assert_eq!(flow.entrypoints.get("default"), Some(&json!("start")));
538 match flow
539 .nodes
540 .get(&NodeId::new("start").unwrap())
541 .unwrap()
542 .routing
543 .clone()
544 {
545 Routing::Branch { on_status, default } => {
546 assert_eq!(on_status.get("ok").unwrap().as_str(), "done");
547 assert_eq!(default.unwrap().as_str(), "fallback");
548 }
549 other => panic!("expected branch routing, got {other:?}"),
550 }
551 }
552
553 #[test]
554 fn compile_ygtc_file_reports_invalid_routing_targets() {
555 let dir = tempdir().unwrap();
556 let path = dir.path().join("bad.ygtc");
557 std::fs::write(
558 &path,
559 r#"id: demo
560type: messaging
561nodes:
562 start:
563 qa.process: {}
564 routing:
565 - to: missing
566"#,
567 )
568 .unwrap();
569
570 let err = compile_ygtc_file(&path).expect_err("missing routing target should fail");
571 assert!(matches!(err, crate::error::FlowError::MissingNode { .. }));
572 }
573
574 #[test]
575 fn compile_flow_rejects_invalid_routing_shorthand() {
576 let err = load_ygtc_from_str(
577 r#"id: demo
578type: messaging
579nodes:
580 start:
581 qa.process: {}
582 routing: invalid
583"#,
584 )
585 .expect_err("invalid shorthand should fail during load");
586 assert!(
589 matches!(
590 err,
591 crate::error::FlowError::Routing { .. } | crate::error::FlowError::Schema { .. }
592 ),
593 "expected Routing or Schema error, got: {err}"
594 );
595 }
596
597 #[test]
598 fn compile_preserves_single_conditional_route_as_custom() {
599 let flow = compile_ygtc_str(
600 r#"
601id: default
602schema_version: 2
603type: messaging
604start: greet
605
606nodes:
607 greet:
608 emit.response:
609 text: hi
610 routing:
611 - condition: in.input.text != ""
612 to: ask
613
614 ask:
615 emit.response:
616 text: ok
617 routing:
618 - out: true
619"#,
620 )
621 .expect("flow should compile");
622
623 let greet_id = NodeId::new("greet").expect("valid node id");
624 let greet = flow.nodes.get(&greet_id).expect("greet node exists");
625 assert!(matches!(greet.routing, Routing::Custom(_)));
626 }
627
628 #[test]
629 fn compile_preserves_multi_route_conditions_as_custom() {
630 let flow = compile_ygtc_str(
631 r#"
632id: default
633schema_version: 2
634type: messaging
635start: ask
636
637nodes:
638 ask:
639 provider.invoke:
640 provider_type: llm.openai.compat.ollama
641 op: chat
642 routing:
643 - condition: error != ""
644 to: send_error
645 - to: send_response
646
647 send_response:
648 emit.response:
649 text: ok
650 routing:
651 - out: true
652
653 send_error:
654 emit.response:
655 text: bad
656 routing:
657 - out: true
658"#,
659 )
660 .expect("flow should compile");
661
662 let ask_id = NodeId::new("ask").expect("valid node id");
663 let ask = flow.nodes.get(&ask_id).expect("ask node exists");
664 assert!(matches!(ask.routing, Routing::Custom(_)));
665 }
666
667 #[test]
668 fn compile_flow_forwards_slot_schema_into_metadata_extra() {
669 let slot_schema = json!([
670 { "name": "city", "slot_type": "string", "pattern": "^[A-Z].+" },
671 { "name": "color", "slot_type": "enum", "enum_values": ["red", "blue"] }
672 ]);
673 let doc = crate::model::FlowDoc {
674 id: "test".to_string(),
675 title: None,
676 description: None,
677 flow_type: "messaging".to_string(),
678 start: None,
679 parameters: json!({"custom_key": 42}),
680 tags: vec![],
681 schema_version: None,
682 entrypoints: Default::default(),
683 meta: None,
684 slot_schema: Some(slot_schema.clone()),
685 nodes: {
686 let mut m = indexmap::IndexMap::new();
687 m.insert(
688 "start".to_string(),
689 crate::model::NodeDoc {
690 routing: json!("out"),
691 raw: {
692 let mut r = indexmap::IndexMap::new();
693 r.insert("template".to_string(), json!("hi"));
694 r
695 },
696 ..Default::default()
697 },
698 );
699 m
700 },
701 };
702
703 let flow = compile_flow(doc).expect("compile_flow should succeed");
704 assert_eq!(
705 flow.metadata.extra[SLOT_SCHEMA_METADATA_KEY], slot_schema,
706 "slot_schema must be forwarded into metadata.extra"
707 );
708 assert_eq!(
709 flow.metadata.extra["custom_key"],
710 json!(42),
711 "original parameters must be preserved"
712 );
713 }
714
715 fn component_exec_doc_with_slots(slot_schema: Option<Value>) -> crate::model::FlowDoc {
719 crate::model::FlowDoc {
720 id: "exec-test".to_string(),
721 title: None,
722 description: None,
723 flow_type: "messaging".to_string(),
724 start: None,
725 parameters: json!({}),
726 tags: vec![],
727 schema_version: None,
728 entrypoints: Default::default(),
729 meta: None,
730 slot_schema,
731 nodes: {
732 let mut m = indexmap::IndexMap::new();
733 m.insert(
734 "run".to_string(),
735 crate::model::NodeDoc {
736 routing: json!("out"),
737 raw: {
738 let mut r = indexmap::IndexMap::new();
739 r.insert("component.exec".to_string(), json!("some.component"));
740 r
741 },
742 ..Default::default()
743 },
744 );
745 m
746 },
747 }
748 }
749
750 #[test]
751 fn compile_rejects_duplicate_slot_names() {
752 let doc = component_exec_doc_with_slots(Some(json!([
753 { "name": "city", "slot_type": "string", "pattern": "^.+" },
754 { "name": "city", "slot_type": "number" }
755 ])));
756 let msg = compile_flow(doc)
757 .expect_err("compile_flow should reject duplicate slot names")
758 .to_string();
759 assert!(
760 msg.contains("duplicate"),
761 "error should mention 'duplicate': {msg}"
762 );
763 }
764
765 #[test]
766 fn compile_rejects_invalid_regex_in_slot_pattern() {
767 let doc = component_exec_doc_with_slots(Some(json!([
768 { "name": "city", "slot_type": "string", "pattern": "(" }
769 ])));
770 let msg = compile_flow(doc)
771 .expect_err("compile_flow should reject invalid regex")
772 .to_string();
773 assert!(
774 msg.contains("invalid regex pattern"),
775 "error should mention 'invalid regex pattern': {msg}"
776 );
777 }
778
779 #[test]
780 fn compile_accepts_valid_slot_schema_via_component_exec() {
781 let doc = component_exec_doc_with_slots(Some(json!([
782 { "name": "city", "slot_type": "string", "pattern": "^[A-Z].+" },
783 { "name": "color", "slot_type": "enum", "enum_values": ["red", "blue"] },
784 { "name": "count", "slot_type": "number" },
785 { "name": "active", "slot_type": "boolean" },
786 { "name": "when", "slot_type": "date" }
787 ])));
788 let flow = compile_flow(doc).expect("valid slot_schema should compile");
789 assert!(
790 flow.metadata.extra.get(SLOT_SCHEMA_METADATA_KEY).is_some(),
791 "slot_schema must be present in metadata.extra"
792 );
793 }
794
795 #[test]
796 fn routing_shorthand_validator_rejects_invalid_string_directly() {
797 let nodes = HashSet::from(["start".to_string()]);
798 let err = compile_routing(&json!("invalid"), &nodes, "start")
799 .expect_err("invalid shorthand must be rejected");
800 assert!(
801 matches!(err, crate::error::FlowError::Routing { .. }),
802 "expected Routing error, got: {err}"
803 );
804 }
805}