greentic_runner_host/runner/
flow_adapter.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::str::FromStr;
3
4use anyhow::{Context, Result, anyhow};
5use greentic_flow::model::FlowDoc;
6use greentic_types::flow::FlowHasher;
7use greentic_types::{
8    ComponentId, Flow, FlowComponentRef, FlowId, FlowKind, FlowMetadata, InputMapping, Node,
9    NodeId, OutputMapping, Routing, TelemetryHints,
10};
11use indexmap::IndexMap;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value};
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct FlowIR {
17    pub id: String,
18    pub flow_type: String,
19    pub start: Option<String>,
20    pub parameters: Value,
21    pub nodes: IndexMap<String, NodeIR>,
22}
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct NodeIR {
26    pub component: String,
27    pub payload_expr: Value,
28    pub routes: Vec<RouteIR>,
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct RouteIR {
33    pub to: Option<String>,
34    #[serde(default)]
35    pub out: bool,
36}
37
/// Schema version string written into every [`Flow`] built by
/// [`flow_ir_to_flow`].
const FLOW_SCHEMA_VERSION: &str = "1.0";
39
40pub fn flow_doc_to_ir(doc: FlowDoc) -> Result<FlowIR> {
41    let mut nodes: IndexMap<String, NodeIR> = IndexMap::new();
42    for (id, node) in doc.nodes {
43        let routes = parse_routes(node.routing)?;
44        nodes.insert(
45            id.clone(),
46            NodeIR {
47                component: node.component,
48                payload_expr: node.payload,
49                routes,
50            },
51        );
52    }
53
54    Ok(FlowIR {
55        id: doc.id,
56        flow_type: doc.flow_type,
57        start: doc.start,
58        parameters: doc.parameters,
59        nodes,
60    })
61}
62
63pub fn flow_ir_to_flow(flow_ir: FlowIR) -> Result<Flow> {
64    let id = FlowId::from_str(&flow_ir.id)
65        .with_context(|| format!("invalid flow id `{}`", flow_ir.id))?;
66    let kind = map_flow_kind(&flow_ir.flow_type)?;
67
68    let mut entrypoints = BTreeMap::new();
69    if let Some(start) = &flow_ir.start {
70        entrypoints.insert("default".to_string(), Value::String(start.clone()));
71    }
72
73    let nodes = map_nodes(flow_ir.nodes)?;
74    let metadata = build_metadata(flow_ir.start, flow_ir.parameters);
75
76    Ok(Flow {
77        schema_version: FLOW_SCHEMA_VERSION.to_string(),
78        id,
79        kind,
80        entrypoints,
81        nodes,
82        metadata,
83    })
84}
85
86fn map_flow_kind(kind: &str) -> Result<FlowKind> {
87    match kind {
88        "messaging" => Ok(FlowKind::Messaging),
89        "event" | "events" => Ok(FlowKind::Event),
90        "component-config" => Ok(FlowKind::ComponentConfig),
91        "job" => Ok(FlowKind::Job),
92        "http" => Ok(FlowKind::Http),
93        other => Err(anyhow!("unknown flow kind `{other}`")),
94    }
95}
96
97fn map_nodes(nodes: IndexMap<String, NodeIR>) -> Result<IndexMap<NodeId, Node, FlowHasher>> {
98    let mut mapped: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
99    for (raw_id, node_ir) in nodes {
100        let node_id =
101            NodeId::from_str(&raw_id).with_context(|| format!("invalid node id `{raw_id}`"))?;
102        let node = map_node(node_id.clone(), node_ir)?;
103        mapped.insert(node_id, node);
104    }
105    Ok(mapped)
106}
107
108fn map_node(node_id: NodeId, node_ir: NodeIR) -> Result<Node> {
109    let component_id = ComponentId::from_str(&node_ir.component).with_context(|| {
110        format!(
111            "invalid component ref `{}` for node {}",
112            node_ir.component,
113            node_id.as_str()
114        )
115    })?;
116    let component = FlowComponentRef {
117        id: component_id,
118        pack_alias: None,
119        operation: None,
120    };
121    let routing = map_routing(&node_ir.routes)?;
122    Ok(Node {
123        id: node_id,
124        component,
125        input: InputMapping {
126            mapping: node_ir.payload_expr,
127        },
128        output: OutputMapping {
129            mapping: Value::Object(JsonMap::new()),
130        },
131        routing,
132        telemetry: TelemetryHints::default(),
133    })
134}
135
136fn map_routing(routes: &[RouteIR]) -> Result<Routing> {
137    if routes.is_empty() {
138        return Ok(Routing::End);
139    }
140
141    if routes.len() == 1 {
142        let route = &routes[0];
143        if route.out || route.to.as_deref() == Some("out") {
144            return Ok(Routing::End);
145        }
146        if let Some(to) = &route.to {
147            let node_id =
148                NodeId::from_str(to).with_context(|| format!("invalid route target `{to}`"))?;
149            return Ok(Routing::Next { node_id });
150        }
151    }
152
153    serde_json::to_value(routes)
154        .map(Routing::Custom)
155        .map_err(|err| anyhow!(err))
156}
157
158fn build_metadata(start: Option<String>, parameters: Value) -> FlowMetadata {
159    let mut extra = JsonMap::new();
160    if let Some(start) = start {
161        extra.insert("start".into(), Value::String(start));
162    }
163    if !parameters.is_null() {
164        extra.insert("parameters".into(), parameters);
165    }
166    FlowMetadata {
167        title: None,
168        description: None,
169        tags: BTreeSet::new(),
170        extra: Value::Object(extra),
171    }
172}
173
174fn parse_routes(raw: Value) -> Result<Vec<RouteIR>> {
175    if raw.is_null() {
176        return Ok(Vec::new());
177    }
178    serde_json::from_value::<Vec<RouteIR>>(raw.clone()).map_err(|err| {
179        anyhow!("failed to parse routes from node routing: {err}; value was {raw:?}")
180    })
181}