// greentic_runner_host/runner/flow_adapter.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::str::FromStr;
3
4use anyhow::{Context, Result, anyhow};
5use greentic_flow::model::{FlowDoc, NodeDoc};
6use greentic_types::flow::FlowHasher;
7use greentic_types::{
8    ComponentId, Flow, FlowComponentRef, FlowId, FlowKind, FlowMetadata, InputMapping, Node,
9    NodeId, OutputMapping, Routing, TelemetryHints,
10};
11use indexmap::IndexMap;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value};
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct FlowIR {
17    pub id: String,
18    pub flow_type: String,
19    pub start: Option<String>,
20    pub parameters: Value,
21    pub nodes: IndexMap<String, NodeIR>,
22}
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct NodeIR {
26    pub component: String,
27    pub payload_expr: Value,
28    pub routes: Vec<RouteIR>,
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct RouteIR {
33    pub to: Option<String>,
34    #[serde(default)]
35    pub out: bool,
36}
37
/// Schema version string written into every `Flow` built by `flow_ir_to_flow`.
const FLOW_SCHEMA_VERSION: &str = "1.0";
39
40pub fn flow_doc_to_ir(doc: FlowDoc) -> Result<FlowIR> {
41    let mut nodes: IndexMap<String, NodeIR> = IndexMap::new();
42    for (id, node) in doc.nodes {
43        let (component, payload_expr) = extract_node_payload(&node)
44            .with_context(|| format!("missing component for node `{id}`"))?;
45        let routes = parse_routes(node.routing)?;
46        nodes.insert(
47            id.clone(),
48            NodeIR {
49                component,
50                payload_expr,
51                routes,
52            },
53        );
54    }
55
56    Ok(FlowIR {
57        id: doc.id,
58        flow_type: doc.flow_type,
59        start: doc.start,
60        parameters: doc.parameters,
61        nodes,
62    })
63}
64
65fn extract_node_payload(node: &NodeDoc) -> Result<(String, Value)> {
66    if let Some(operation) = node.operation.as_deref() {
67        return Ok((operation.to_string(), node.payload.clone()));
68    }
69    let Some((component, payload)) = node
70        .raw
71        .iter()
72        .next()
73        .map(|(key, value)| (key.clone(), value.clone()))
74    else {
75        return Err(anyhow!("node missing operation payload"));
76    };
77    Ok((component, payload))
78}
79
80pub fn flow_ir_to_flow(flow_ir: FlowIR) -> Result<Flow> {
81    let id = FlowId::from_str(&flow_ir.id)
82        .with_context(|| format!("invalid flow id `{}`", flow_ir.id))?;
83    let kind = map_flow_kind(&flow_ir.flow_type)?;
84
85    let mut entrypoints = BTreeMap::new();
86    if let Some(start) = &flow_ir.start {
87        entrypoints.insert("default".to_string(), Value::String(start.clone()));
88    }
89
90    let nodes = map_nodes(flow_ir.nodes)?;
91    let metadata = build_metadata(flow_ir.start, flow_ir.parameters);
92
93    Ok(Flow {
94        schema_version: FLOW_SCHEMA_VERSION.to_string(),
95        id,
96        kind,
97        entrypoints,
98        nodes,
99        metadata,
100    })
101}
102
103fn map_flow_kind(kind: &str) -> Result<FlowKind> {
104    match kind {
105        "messaging" => Ok(FlowKind::Messaging),
106        "event" | "events" => Ok(FlowKind::Event),
107        "component-config" => Ok(FlowKind::ComponentConfig),
108        "job" => Ok(FlowKind::Job),
109        "http" => Ok(FlowKind::Http),
110        other => Err(anyhow!("unknown flow kind `{other}`")),
111    }
112}
113
114fn map_nodes(nodes: IndexMap<String, NodeIR>) -> Result<IndexMap<NodeId, Node, FlowHasher>> {
115    let mut mapped: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
116    for (raw_id, node_ir) in nodes {
117        let node_id =
118            NodeId::from_str(&raw_id).with_context(|| format!("invalid node id `{raw_id}`"))?;
119        let node = map_node(node_id.clone(), node_ir)?;
120        mapped.insert(node_id, node);
121    }
122    Ok(mapped)
123}
124
125fn map_node(node_id: NodeId, node_ir: NodeIR) -> Result<Node> {
126    let component_id = ComponentId::from_str(&node_ir.component).with_context(|| {
127        format!(
128            "invalid component ref `{}` for node {}",
129            node_ir.component,
130            node_id.as_str()
131        )
132    })?;
133    let component = FlowComponentRef {
134        id: component_id,
135        pack_alias: None,
136        operation: None,
137    };
138    let routing = map_routing(&node_ir.routes)?;
139    Ok(Node {
140        id: node_id,
141        component,
142        input: InputMapping {
143            mapping: node_ir.payload_expr,
144        },
145        output: OutputMapping {
146            mapping: Value::Object(JsonMap::new()),
147        },
148        routing,
149        telemetry: TelemetryHints::default(),
150    })
151}
152
153fn map_routing(routes: &[RouteIR]) -> Result<Routing> {
154    if routes.is_empty() {
155        return Ok(Routing::End);
156    }
157
158    if routes.len() == 1 {
159        let route = &routes[0];
160        if route.out || route.to.as_deref() == Some("out") {
161            return Ok(Routing::End);
162        }
163        if let Some(to) = &route.to {
164            let node_id =
165                NodeId::from_str(to).with_context(|| format!("invalid route target `{to}`"))?;
166            return Ok(Routing::Next { node_id });
167        }
168    }
169
170    serde_json::to_value(routes)
171        .map(Routing::Custom)
172        .map_err(|err| anyhow!(err))
173}
174
175fn build_metadata(start: Option<String>, parameters: Value) -> FlowMetadata {
176    let mut extra = JsonMap::new();
177    if let Some(start) = start {
178        extra.insert("start".into(), Value::String(start));
179    }
180    if !parameters.is_null() {
181        extra.insert("parameters".into(), parameters);
182    }
183    FlowMetadata {
184        title: None,
185        description: None,
186        tags: BTreeSet::new(),
187        extra: Value::Object(extra),
188    }
189}
190
191fn parse_routes(raw: Value) -> Result<Vec<RouteIR>> {
192    if raw.is_null() {
193        return Ok(Vec::new());
194    }
195    serde_json::from_value::<Vec<RouteIR>>(raw.clone()).map_err(|err| {
196        anyhow!("failed to parse routes from node routing: {err}; value was {raw:?}")
197    })
198}