greentic_runner_host/runner/
flow_adapter.rs1use std::collections::{BTreeMap, BTreeSet};
2use std::str::FromStr;
3
4use anyhow::{Context, Result, anyhow};
5use greentic_flow::model::FlowDoc;
6use greentic_types::flow::FlowHasher;
7use greentic_types::{
8 ComponentId, Flow, FlowComponentRef, FlowId, FlowKind, FlowMetadata, InputMapping, Node,
9 NodeId, OutputMapping, Routing, TelemetryHints,
10};
11use indexmap::IndexMap;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value};
14
/// Intermediate representation of a flow, produced by [`flow_doc_to_ir`] and
/// consumed by [`flow_ir_to_flow`].
///
/// All identifiers are still plain strings at this stage; they are only
/// validated when the IR is converted into a typed `Flow`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowIR {
    /// Raw flow identifier; parsed into a `FlowId` by [`flow_ir_to_flow`].
    pub id: String,
    /// Raw flow kind name (e.g. `"messaging"`, `"job"`); mapped to a `FlowKind`
    /// by [`flow_ir_to_flow`].
    pub flow_type: String,
    /// Optional start node name. When present it becomes the `"default"`
    /// entrypoint and is also recorded under `"start"` in the flow metadata.
    pub start: Option<String>,
    /// Flow parameters; copied into the flow metadata under `"parameters"`
    /// unless the value is JSON `null`.
    pub parameters: Value,
    /// Nodes of the flow keyed by their raw node id, in insertion order.
    pub nodes: IndexMap<String, NodeIR>,
}
23
/// Intermediate representation of a single flow node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeIR {
    /// Raw component reference; parsed into a `ComponentId` when the node is
    /// converted into a typed `Node`.
    pub component: String,
    /// Node payload; used verbatim as the node's input mapping.
    pub payload_expr: Value,
    /// Outgoing routes of the node; an empty list means the flow ends here.
    pub routes: Vec<RouteIR>,
}
30
/// Intermediate representation of one outgoing route of a node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RouteIR {
    /// Target node id. The literal `"out"` is treated as "end of flow" when it
    /// is the node's only route.
    pub to: Option<String>,
    /// Marks the route as terminating the flow; defaults to `false` when the
    /// field is absent from the serialized form.
    #[serde(default)]
    pub out: bool,
}
37
/// Schema version stamped onto every `Flow` built by `flow_ir_to_flow`.
const FLOW_SCHEMA_VERSION: &str = "1.0";
39
40pub fn flow_doc_to_ir(doc: FlowDoc) -> Result<FlowIR> {
41 let mut nodes: IndexMap<String, NodeIR> = IndexMap::new();
42 for (id, node) in doc.nodes {
43 let routes = parse_routes(node.routing)?;
44 nodes.insert(
45 id.clone(),
46 NodeIR {
47 component: node.component,
48 payload_expr: node.payload,
49 routes,
50 },
51 );
52 }
53
54 Ok(FlowIR {
55 id: doc.id,
56 flow_type: doc.flow_type,
57 start: doc.start,
58 parameters: doc.parameters,
59 nodes,
60 })
61}
62
63pub fn flow_ir_to_flow(flow_ir: FlowIR) -> Result<Flow> {
64 let id = FlowId::from_str(&flow_ir.id)
65 .with_context(|| format!("invalid flow id `{}`", flow_ir.id))?;
66 let kind = map_flow_kind(&flow_ir.flow_type)?;
67
68 let mut entrypoints = BTreeMap::new();
69 if let Some(start) = &flow_ir.start {
70 entrypoints.insert("default".to_string(), Value::String(start.clone()));
71 }
72
73 let nodes = map_nodes(flow_ir.nodes)?;
74 let metadata = build_metadata(flow_ir.start, flow_ir.parameters);
75
76 Ok(Flow {
77 schema_version: FLOW_SCHEMA_VERSION.to_string(),
78 id,
79 kind,
80 entrypoints,
81 nodes,
82 metadata,
83 })
84}
85
86fn map_flow_kind(kind: &str) -> Result<FlowKind> {
87 match kind {
88 "messaging" => Ok(FlowKind::Messaging),
89 "event" | "events" => Ok(FlowKind::Event),
90 "component-config" => Ok(FlowKind::ComponentConfig),
91 "job" => Ok(FlowKind::Job),
92 "http" => Ok(FlowKind::Http),
93 other => Err(anyhow!("unknown flow kind `{other}`")),
94 }
95}
96
97fn map_nodes(nodes: IndexMap<String, NodeIR>) -> Result<IndexMap<NodeId, Node, FlowHasher>> {
98 let mut mapped: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
99 for (raw_id, node_ir) in nodes {
100 let node_id =
101 NodeId::from_str(&raw_id).with_context(|| format!("invalid node id `{raw_id}`"))?;
102 let node = map_node(node_id.clone(), node_ir)?;
103 mapped.insert(node_id, node);
104 }
105 Ok(mapped)
106}
107
108fn map_node(node_id: NodeId, node_ir: NodeIR) -> Result<Node> {
109 let component_id = ComponentId::from_str(&node_ir.component).with_context(|| {
110 format!(
111 "invalid component ref `{}` for node {}",
112 node_ir.component,
113 node_id.as_str()
114 )
115 })?;
116 let component = FlowComponentRef {
117 id: component_id,
118 pack_alias: None,
119 operation: None,
120 };
121 let routing = map_routing(&node_ir.routes)?;
122 Ok(Node {
123 id: node_id,
124 component,
125 input: InputMapping {
126 mapping: node_ir.payload_expr,
127 },
128 output: OutputMapping {
129 mapping: Value::Object(JsonMap::new()),
130 },
131 routing,
132 telemetry: TelemetryHints::default(),
133 })
134}
135
136fn map_routing(routes: &[RouteIR]) -> Result<Routing> {
137 if routes.is_empty() {
138 return Ok(Routing::End);
139 }
140
141 if routes.len() == 1 {
142 let route = &routes[0];
143 if route.out || route.to.as_deref() == Some("out") {
144 return Ok(Routing::End);
145 }
146 if let Some(to) = &route.to {
147 let node_id =
148 NodeId::from_str(to).with_context(|| format!("invalid route target `{to}`"))?;
149 return Ok(Routing::Next { node_id });
150 }
151 }
152
153 serde_json::to_value(routes)
154 .map(Routing::Custom)
155 .map_err(|err| anyhow!(err))
156}
157
158fn build_metadata(start: Option<String>, parameters: Value) -> FlowMetadata {
159 let mut extra = JsonMap::new();
160 if let Some(start) = start {
161 extra.insert("start".into(), Value::String(start));
162 }
163 if !parameters.is_null() {
164 extra.insert("parameters".into(), parameters);
165 }
166 FlowMetadata {
167 title: None,
168 description: None,
169 tags: BTreeSet::new(),
170 extra: Value::Object(extra),
171 }
172}
173
174fn parse_routes(raw: Value) -> Result<Vec<RouteIR>> {
175 if raw.is_null() {
176 return Ok(Vec::new());
177 }
178 serde_json::from_value::<Vec<RouteIR>>(raw.clone()).map_err(|err| {
179 anyhow!("failed to parse routes from node routing: {err}; value was {raw:?}")
180 })
181}