greentic_runner_host/runner/
flow_adapter.rs1use std::collections::{BTreeMap, BTreeSet};
2use std::str::FromStr;
3
4use anyhow::{Context, Result, anyhow};
5use greentic_flow::model::{FlowDoc, NodeDoc};
6use greentic_types::flow::FlowHasher;
7use greentic_types::{
8 ComponentId, Flow, FlowComponentRef, FlowId, FlowKind, FlowMetadata, InputMapping, Node,
9 NodeId, OutputMapping, Routing, TelemetryHints,
10};
11use indexmap::IndexMap;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value};
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct FlowIR {
17 pub id: String,
18 pub flow_type: String,
19 pub start: Option<String>,
20 pub parameters: Value,
21 pub nodes: IndexMap<String, NodeIR>,
22}
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct NodeIR {
26 pub component: String,
27 pub payload_expr: Value,
28 pub routes: Vec<RouteIR>,
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct RouteIR {
33 pub to: Option<String>,
34 #[serde(default)]
35 pub out: bool,
36}
37
38const FLOW_SCHEMA_VERSION: &str = "1.0";
39
40pub fn flow_doc_to_ir(doc: FlowDoc) -> Result<FlowIR> {
41 let mut nodes: IndexMap<String, NodeIR> = IndexMap::new();
42 for (id, node) in doc.nodes {
43 let (component, payload_expr) = extract_node_payload(&node)
44 .with_context(|| format!("missing component for node `{id}`"))?;
45 let routes = parse_routes(node.routing)?;
46 nodes.insert(
47 id.clone(),
48 NodeIR {
49 component,
50 payload_expr,
51 routes,
52 },
53 );
54 }
55
56 Ok(FlowIR {
57 id: doc.id,
58 flow_type: doc.flow_type,
59 start: doc.start,
60 parameters: doc.parameters,
61 nodes,
62 })
63}
64
65fn extract_node_payload(node: &NodeDoc) -> Result<(String, Value)> {
66 if let Some(operation) = node.operation.as_deref() {
67 return Ok((operation.to_string(), node.payload.clone()));
68 }
69 let Some((component, payload)) = node
70 .raw
71 .iter()
72 .next()
73 .map(|(key, value)| (key.clone(), value.clone()))
74 else {
75 return Err(anyhow!("node missing operation payload"));
76 };
77 Ok((component, payload))
78}
79
80pub fn flow_ir_to_flow(flow_ir: FlowIR) -> Result<Flow> {
81 let id = FlowId::from_str(&flow_ir.id)
82 .with_context(|| format!("invalid flow id `{}`", flow_ir.id))?;
83 let kind = map_flow_kind(&flow_ir.flow_type)?;
84
85 let mut entrypoints = BTreeMap::new();
86 if let Some(start) = &flow_ir.start {
87 entrypoints.insert("default".to_string(), Value::String(start.clone()));
88 }
89
90 let nodes = map_nodes(flow_ir.nodes)?;
91 let metadata = build_metadata(flow_ir.start, flow_ir.parameters);
92
93 Ok(Flow {
94 schema_version: FLOW_SCHEMA_VERSION.to_string(),
95 id,
96 kind,
97 entrypoints,
98 nodes,
99 metadata,
100 })
101}
102
103fn map_flow_kind(kind: &str) -> Result<FlowKind> {
104 match kind {
105 "messaging" => Ok(FlowKind::Messaging),
106 "event" | "events" => Ok(FlowKind::Event),
107 "component-config" => Ok(FlowKind::ComponentConfig),
108 "job" => Ok(FlowKind::Job),
109 "http" => Ok(FlowKind::Http),
110 other => Err(anyhow!("unknown flow kind `{other}`")),
111 }
112}
113
114fn map_nodes(nodes: IndexMap<String, NodeIR>) -> Result<IndexMap<NodeId, Node, FlowHasher>> {
115 let mut mapped: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
116 for (raw_id, node_ir) in nodes {
117 let node_id =
118 NodeId::from_str(&raw_id).with_context(|| format!("invalid node id `{raw_id}`"))?;
119 let node = map_node(node_id.clone(), node_ir)?;
120 mapped.insert(node_id, node);
121 }
122 Ok(mapped)
123}
124
125fn map_node(node_id: NodeId, node_ir: NodeIR) -> Result<Node> {
126 let component_id = ComponentId::from_str(&node_ir.component).with_context(|| {
127 format!(
128 "invalid component ref `{}` for node {}",
129 node_ir.component,
130 node_id.as_str()
131 )
132 })?;
133 let component = FlowComponentRef {
134 id: component_id,
135 pack_alias: None,
136 operation: None,
137 };
138 let routing = map_routing(&node_ir.routes)?;
139 Ok(Node {
140 id: node_id,
141 component,
142 input: InputMapping {
143 mapping: node_ir.payload_expr,
144 },
145 output: OutputMapping {
146 mapping: Value::Object(JsonMap::new()),
147 },
148 routing,
149 telemetry: TelemetryHints::default(),
150 })
151}
152
153fn map_routing(routes: &[RouteIR]) -> Result<Routing> {
154 if routes.is_empty() {
155 return Ok(Routing::End);
156 }
157
158 if routes.len() == 1 {
159 let route = &routes[0];
160 if route.out || route.to.as_deref() == Some("out") {
161 return Ok(Routing::End);
162 }
163 if let Some(to) = &route.to {
164 let node_id =
165 NodeId::from_str(to).with_context(|| format!("invalid route target `{to}`"))?;
166 return Ok(Routing::Next { node_id });
167 }
168 }
169
170 serde_json::to_value(routes)
171 .map(Routing::Custom)
172 .map_err(|err| anyhow!(err))
173}
174
175fn build_metadata(start: Option<String>, parameters: Value) -> FlowMetadata {
176 let mut extra = JsonMap::new();
177 if let Some(start) = start {
178 extra.insert("start".into(), Value::String(start));
179 }
180 if !parameters.is_null() {
181 extra.insert("parameters".into(), parameters);
182 }
183 FlowMetadata {
184 title: None,
185 description: None,
186 tags: BTreeSet::new(),
187 extra: Value::Object(extra),
188 }
189}
190
191fn parse_routes(raw: Value) -> Result<Vec<RouteIR>> {
192 if raw.is_null() {
193 return Ok(Vec::new());
194 }
195 serde_json::from_value::<Vec<RouteIR>>(raw.clone()).map_err(|err| {
196 anyhow!("failed to parse routes from node routing: {err}; value was {raw:?}")
197 })
198}