1#![forbid(unsafe_code)]
5#![allow(clippy::result_large_err)]
6
7pub mod add_step;
8pub mod component_catalog;
9pub mod config_flow;
10pub mod error;
11pub mod flow_bundle;
12pub mod flow_ir;
13pub mod ir;
14pub mod json_output;
15pub mod lint;
16pub mod loader;
17pub mod model;
18pub mod path_safety;
19pub mod registry;
20pub mod resolve;
21pub mod splice;
22pub mod util;
23
24pub use flow_bundle::{
25 ComponentPin, FlowBundle, NodeRef, blake3_hex, canonicalize_json, extract_component_pins,
26 load_and_validate_bundle, load_and_validate_bundle_with_flow,
27};
28pub use json_output::{JsonDiagnostic, LintJsonOutput, lint_to_stdout_json};
29pub use splice::{NEXT_NODE_PLACEHOLDER, splice_node_after};
30
31use crate::{error::Result, model::FlowDoc};
32use greentic_types::{
33 ComponentId, Flow, FlowComponentRef, FlowId, FlowKind, FlowMetadata, InputMapping, Node,
34 NodeId, OutputMapping, Routing, TelemetryHints, flow::FlowHasher,
35};
36use indexmap::IndexMap;
37use serde_json::Value;
38use std::collections::{BTreeMap, BTreeSet};
39use std::path::Path;
40
41pub fn map_flow_type(flow_type: &str) -> Result<FlowKind> {
43 match flow_type {
44 "messaging" => Ok(FlowKind::Messaging),
45 "event" | "events" => Ok(FlowKind::Event),
46 "component-config" => Ok(FlowKind::ComponentConfig),
47 "job" => Ok(FlowKind::Job),
48 "http" => Ok(FlowKind::Http),
49 other => Err(crate::error::FlowError::UnknownFlowType {
50 flow_type: other.to_string(),
51 location: crate::error::FlowErrorLocation::at_path("type"),
52 }),
53 }
54}
55
56pub fn compile_flow(doc: FlowDoc) -> Result<Flow> {
58 let kind = map_flow_type(&doc.flow_type)?;
59 let mut entrypoints = doc.entrypoints.clone();
60 if let Some(entry) = resolve_entry(&doc) {
61 entrypoints
62 .entry("default".to_string())
63 .or_insert_with(|| Value::String(entry));
64 }
65
66 let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
67 for (node_id_str, node_doc) in doc.nodes.iter() {
68 let node_id = NodeId::new(node_id_str.as_str()).map_err(|e| {
69 crate::error::FlowError::InvalidIdentifier {
70 kind: "node",
71 value: node_id_str.clone(),
72 detail: e.to_string(),
73 location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id_str}")),
74 }
75 })?;
76 let routing = compile_routing(&node_doc.routing, &doc.nodes, node_id_str)?;
77 let telemetry = node_doc
78 .telemetry
79 .as_ref()
80 .map(|t| TelemetryHints {
81 span_name: t.span_name.clone(),
82 attributes: t.attributes.clone(),
83 sampling: t.sampling.clone(),
84 })
85 .unwrap_or_default();
86 let mut op_key: Option<String> = None;
88 let mut payload: Option<Value> = None;
89 for (k, v) in &node_doc.raw {
90 op_key = Some(k.clone());
91 payload = Some(v.clone());
92 }
93 let output_mapping = node_doc
94 .raw
95 .get("output")
96 .cloned()
97 .unwrap_or_else(|| Value::Object(Default::default()));
98 let operation = op_key.ok_or_else(|| crate::error::FlowError::Internal {
99 message: format!("node '{node_id_str}' missing operation key"),
100 location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id_str}")),
101 })?;
102 let is_builtin = matches!(operation.as_str(), "questions" | "template");
103 let is_legacy = doc.schema_version.unwrap_or(1) < 2 || operation.contains('.');
104 let (component_id, op_field) = if is_builtin || is_legacy {
105 (operation.clone(), None)
106 } else {
107 ("component.exec".to_string(), Some(operation.clone()))
108 };
109 let node = Node {
110 id: node_id.clone(),
111 component: FlowComponentRef {
112 id: ComponentId::new(&component_id).unwrap(),
113 pack_alias: None,
114 operation: op_field,
115 },
116 input: InputMapping {
117 mapping: payload.unwrap_or_else(|| Value::Object(Default::default())),
118 },
119 output: OutputMapping {
120 mapping: output_mapping,
121 },
122 routing,
123 telemetry,
124 };
125 nodes.insert(node_id, node);
126 }
127
128 let flow_id =
129 FlowId::new(doc.id.as_str()).map_err(|e| crate::error::FlowError::InvalidIdentifier {
130 kind: "flow",
131 value: doc.id.clone(),
132 detail: e.to_string(),
133 location: crate::error::FlowErrorLocation::at_path("id"),
134 })?;
135
136 let entrypoints_map: BTreeMap<String, Value> = entrypoints.into_iter().collect();
137
138 Ok(Flow {
139 schema_version: "flow-v1".to_string(),
140 id: flow_id,
141 kind,
142 entrypoints: entrypoints_map,
143 nodes,
144 metadata: FlowMetadata {
145 title: doc.title,
146 description: doc.description,
147 tags: doc.tags.into_iter().collect::<BTreeSet<_>>(),
148 extra: doc.parameters,
149 },
150 })
151}
152
153pub fn compile_ygtc_str(src: &str) -> Result<Flow> {
155 let doc = loader::load_ygtc_from_str(src)?;
156 compile_flow(doc)
157}
158
159pub fn compile_ygtc_file(path: &Path) -> Result<Flow> {
161 let doc = loader::load_ygtc_from_path(path)?;
162 compile_flow(doc)
163}
164
165fn compile_routing(
166 raw: &Value,
167 nodes: &IndexMap<String, crate::model::NodeDoc>,
168 node_id: &str,
169) -> Result<Routing> {
170 #[derive(serde::Deserialize)]
171 struct RouteDoc {
172 #[serde(default)]
173 to: Option<String>,
174 #[serde(default)]
175 out: Option<bool>,
176 #[serde(default)]
177 status: Option<String>,
178 #[serde(default)]
179 reply: Option<bool>,
180 }
181
182 let routes: Vec<RouteDoc> = if raw.is_null() {
183 Vec::new()
184 } else if let Some(shorthand) = raw.as_str() {
185 match shorthand {
186 "out" => vec![RouteDoc {
187 to: None,
188 out: Some(true),
189 status: None,
190 reply: None,
191 }],
192 "reply" => vec![RouteDoc {
193 to: None,
194 out: None,
195 status: None,
196 reply: Some(true),
197 }],
198 other => {
199 return Err(crate::error::FlowError::Routing {
200 node_id: node_id.to_string(),
201 message: format!("invalid routing shorthand '{other}'"),
202 location: crate::error::FlowErrorLocation::at_path(format!(
203 "nodes.{node_id}.routing"
204 )),
205 });
206 }
207 }
208 } else {
209 serde_json::from_value(raw.clone()).map_err(|e| crate::error::FlowError::Routing {
210 node_id: node_id.to_string(),
211 message: e.to_string(),
212 location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id}.routing")),
213 })?
214 };
215
216 if routes.len() == 1 {
217 let route = &routes[0];
218 let is_out = route.out.unwrap_or(false);
219 if route.reply.unwrap_or(false) {
220 return Ok(Routing::Reply);
221 }
222 if let Some(to) = &route.to {
223 if to == "out" || is_out {
224 return Ok(Routing::End);
225 }
226 if !nodes.contains_key(to) {
227 return Err(crate::error::FlowError::MissingNode {
228 target: to.clone(),
229 node_id: node_id.to_string(),
230 location: crate::error::FlowErrorLocation::at_path(format!(
231 "nodes.{node_id}.routing"
232 )),
233 });
234 }
235 return Ok(Routing::Next {
236 node_id: NodeId::new(to.as_str()).map_err(|e| {
237 crate::error::FlowError::InvalidIdentifier {
238 kind: "node",
239 value: to.clone(),
240 detail: e.to_string(),
241 location: crate::error::FlowErrorLocation::at_path(format!(
242 "nodes.{node_id}.routing"
243 )),
244 }
245 })?,
246 });
247 }
248 if is_out {
249 return Ok(Routing::End);
250 }
251 if route.status.is_some() {
252 return Ok(Routing::Custom(raw.clone()));
254 }
255 }
256
257 if routes.is_empty() {
258 return Ok(Routing::End);
259 }
260
261 if routes.len() >= 2 {
263 use std::collections::BTreeMap;
264 let mut on_status: BTreeMap<String, NodeId> = BTreeMap::new();
265 let mut default: Option<NodeId> = None;
266 let mut any_status = false;
267 for route in &routes {
268 if route.reply.unwrap_or(false) || route.out.unwrap_or(false) {
269 return Ok(Routing::Custom(raw.clone()));
270 }
271 let to = match &route.to {
272 Some(t) => t,
273 None => return Ok(Routing::Custom(raw.clone())),
274 };
275 if !nodes.contains_key(to) {
276 return Err(crate::error::FlowError::MissingNode {
277 target: to.clone(),
278 node_id: node_id.to_string(),
279 location: crate::error::FlowErrorLocation::at_path(format!(
280 "nodes.{node_id}.routing"
281 )),
282 });
283 }
284 let to_id = NodeId::new(to.as_str()).map_err(|e| {
285 crate::error::FlowError::InvalidIdentifier {
286 kind: "node",
287 value: to.clone(),
288 detail: e.to_string(),
289 location: crate::error::FlowErrorLocation::at_path(format!(
290 "nodes.{node_id}.routing"
291 )),
292 }
293 })?;
294 if let Some(status) = &route.status {
295 any_status = true;
296 on_status.insert(status.clone(), to_id);
297 } else {
298 default = Some(to_id);
299 }
300 }
301 if any_status {
302 return Ok(Routing::Branch { on_status, default });
303 }
304 if let Some(default) = default {
305 return Ok(Routing::Branch {
306 on_status,
307 default: Some(default),
308 });
309 }
310 }
311
312 Ok(Routing::Custom(raw.clone()))
313}
314
315fn resolve_entry(doc: &FlowDoc) -> Option<String> {
316 if let Some(start) = &doc.start {
317 return Some(start.clone());
318 }
319 if doc.nodes.contains_key("in") {
320 return Some("in".to_string());
321 }
322 doc.nodes.keys().next().cloned()
323}