use indexmap::IndexMap;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use crate::{
error::{FlowError, FlowErrorLocation, Result},
loader::load_ygtc_from_str,
model::{FlowDoc, NodeDoc},
};
#[derive(Debug, Clone)]
pub struct FlowIr {
pub id: String,
pub title: Option<String>,
pub description: Option<String>,
pub kind: String,
pub start: Option<String>,
pub parameters: Value,
pub tags: Vec<String>,
pub schema_version: Option<u32>,
pub entrypoints: IndexMap<String, String>,
pub meta: Option<Value>,
pub nodes: IndexMap<String, NodeIr>,
}
#[derive(Debug, Clone)]
pub struct NodeIr {
pub id: String,
pub operation: String,
pub payload: Value,
pub output: Value,
pub in_map: Option<Value>,
pub out_map: Option<Value>,
pub err_map: Option<Value>,
pub routing: Vec<Route>,
pub telemetry: Option<Value>,
}
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct Route {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub to: Option<String>,
#[serde(default, skip_serializing_if = "is_false")]
pub out: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub status: Option<String>,
#[serde(default, skip_serializing_if = "is_false")]
pub reply: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub condition: Option<String>,
}
fn is_false(value: &bool) -> bool {
!*value
}
impl FlowIr {
pub fn from_doc(doc: FlowDoc) -> Result<Self> {
let schema_version = doc.schema_version;
let entrypoints = resolve_entrypoints(&doc);
let mut nodes = IndexMap::new();
for (id, node_doc) in doc.nodes {
let (operation, payload) = extract_operation(&node_doc, &id)?;
let routing = parse_routing(&node_doc, &id)?;
let output = node_doc
.raw
.get("output")
.cloned()
.unwrap_or_else(|| Value::Object(Map::new()));
let in_map = node_doc.raw.get("in_map").cloned();
let out_map = node_doc.raw.get("out_map").cloned();
let err_map = node_doc.raw.get("err_map").cloned();
nodes.insert(
id.clone(),
NodeIr {
id: id.clone(),
operation,
payload,
output,
in_map,
out_map,
err_map,
routing,
telemetry: node_doc
.telemetry
.clone()
.and_then(|t| serde_json::to_value(t).ok()),
},
);
}
Ok(FlowIr {
id: doc.id,
title: doc.title,
description: doc.description,
kind: doc.flow_type,
start: doc.start,
parameters: doc.parameters,
tags: doc.tags,
schema_version,
entrypoints,
meta: doc.meta,
nodes,
})
}
pub fn to_doc(&self) -> Result<FlowDoc> {
let mut nodes: IndexMap<String, NodeDoc> = IndexMap::new();
for (id, node_ir) in &self.nodes {
let mut raw = IndexMap::new();
raw.insert(node_ir.operation.clone(), node_ir.payload.clone());
if !node_ir.output.is_object()
|| !node_ir
.output
.as_object()
.map(|m| m.is_empty())
.unwrap_or(false)
{
raw.insert("output".to_string(), node_ir.output.clone());
}
if let Some(in_map) = node_ir.in_map.as_ref() {
raw.insert("in_map".to_string(), in_map.clone());
}
if let Some(out_map) = node_ir.out_map.as_ref() {
raw.insert("out_map".to_string(), out_map.clone());
}
if let Some(err_map) = node_ir.err_map.as_ref() {
raw.insert("err_map".to_string(), err_map.clone());
}
let routing_value =
serde_json::to_value(&node_ir.routing).map_err(|e| FlowError::Internal {
message: format!("serialize routing for node '{id}': {e}"),
location: FlowErrorLocation::at_path(format!("nodes.{id}.routing")),
})?;
let routing_yaml = if node_ir.routing.len() == 1
&& node_ir.routing[0].out
&& node_ir.routing[0].to.is_none()
&& !node_ir.routing[0].reply
&& node_ir.routing[0].status.is_none()
&& node_ir.routing[0].condition.is_none()
{
Value::String("out".to_string())
} else if node_ir.routing.len() == 1
&& node_ir.routing[0].reply
&& node_ir.routing[0].to.is_none()
&& !node_ir.routing[0].out
&& node_ir.routing[0].status.is_none()
&& node_ir.routing[0].condition.is_none()
{
Value::String("reply".to_string())
} else {
routing_value
};
nodes.insert(
id.clone(),
NodeDoc {
routing: routing_yaml,
telemetry: node_ir
.telemetry
.as_ref()
.and_then(|t| serde_json::from_value(t.clone()).ok()),
operation: Some(node_ir.operation.clone()),
payload: node_ir.payload.clone(),
raw,
},
);
}
let mut entrypoints = IndexMap::new();
for (name, target) in &self.entrypoints {
if name == "default" {
continue;
}
entrypoints.insert(name.clone(), Value::String(target.clone()));
}
let start = self
.entrypoints
.get("default")
.cloned()
.or_else(|| self.start.clone());
Ok(FlowDoc {
id: self.id.clone(),
title: self.title.clone(),
description: self.description.clone(),
flow_type: self.kind.clone(),
start,
parameters: self.parameters.clone(),
tags: self.tags.clone(),
schema_version: self.schema_version,
entrypoints,
meta: self.meta.clone(),
nodes,
})
}
}
fn resolve_entrypoints(doc: &FlowDoc) -> IndexMap<String, String> {
let mut entries = IndexMap::new();
if let Some(start) = &doc.start {
entries.insert("default".to_string(), start.clone());
} else if doc.nodes.contains_key("in") {
entries.insert("default".to_string(), "in".to_string());
} else if let Some(first) = doc.nodes.keys().next() {
entries.insert("default".to_string(), first.clone());
}
for (k, v) in &doc.entrypoints {
if let Some(target) = v.as_str() {
entries.insert(k.clone(), target.to_string());
}
}
entries
}
fn parse_routing(node: &NodeDoc, node_id: &str) -> Result<Vec<Route>> {
if node.routing.is_null() {
return Ok(Vec::new());
}
if let Some(s) = node.routing.as_str() {
return match s {
"out" => Ok(vec![Route {
out: true,
..Route::default()
}]),
"reply" => Ok(vec![Route {
reply: true,
..Route::default()
}]),
other => Err(FlowError::Routing {
node_id: node_id.to_string(),
message: format!("unsupported routing shorthand '{other}'"),
location: FlowErrorLocation::at_path(format!("nodes.{node_id}.routing")),
}),
};
}
#[derive(serde::Deserialize)]
struct RouteDoc {
#[serde(default)]
to: Option<String>,
#[serde(default)]
out: Option<bool>,
#[serde(default)]
status: Option<String>,
#[serde(default)]
reply: Option<bool>,
#[serde(default)]
condition: Option<String>,
}
let routes: Vec<RouteDoc> =
serde_json::from_value(node.routing.clone()).map_err(|e| FlowError::Internal {
message: format!("routing decode for node '{node_id}': {e}"),
location: FlowErrorLocation::at_path(format!("nodes.{node_id}.routing")),
})?;
Ok(routes
.into_iter()
.map(|r| Route {
to: r.to,
out: r.out.unwrap_or(false),
status: r.status,
reply: r.reply.unwrap_or(false),
condition: r.condition,
})
.collect())
}
pub fn parse_flow_to_ir(yaml: &str) -> Result<FlowIr> {
let doc = load_ygtc_from_str(yaml)?;
FlowIr::from_doc(doc)
}
fn extract_operation(node: &NodeDoc, node_id: &str) -> Result<(String, Value)> {
let reserved = [
"routing",
"telemetry",
"output",
"in_map",
"out_map",
"err_map",
"retry",
"timeout",
"when",
"annotations",
"meta",
];
if let Some(exec) = node.raw.get("component.exec") {
let op = node
.raw
.get("operation")
.and_then(Value::as_str)
.or(node.operation.as_deref())
.unwrap_or("");
if op.trim().is_empty() {
return Err(FlowError::Internal {
message: format!("node '{node_id}' missing operation key"),
location: FlowErrorLocation::at_path(format!("nodes.{node_id}")),
});
}
return Ok((op.to_string(), exec.clone()));
}
let mut op_key: Option<String> = None;
let mut payload: Option<Value> = None;
for (k, v) in &node.raw {
if reserved.contains(&k.as_str()) {
continue;
}
if op_key.is_some() {
return Err(FlowError::Internal {
message: format!(
"node '{node_id}' must have exactly one operation key, found multiple"
),
location: FlowErrorLocation::at_path(format!("nodes.{node_id}")),
});
}
op_key = Some(k.clone());
payload = Some(v.clone());
}
if let (Some(k), Some(v)) = (op_key, payload) {
return Ok((k, v));
}
if let Some(op) = &node.operation {
return Ok((op.clone(), node.payload.clone()));
}
Err(FlowError::Internal {
message: format!("node '{node_id}' missing operation key"),
location: FlowErrorLocation::at_path(format!("nodes.{node_id}")),
})
}
#[cfg(test)]
mod tests {
use super::parse_flow_to_ir;
use serde_json::json;
#[test]
fn parse_and_roundtrip_preserves_alias_maps() {
let yaml = r#"
id: alias_flow
type: messaging
schema_version: 2
nodes:
start:
component.exec:
component: repo://demo/component
config:
greeting: hi
operation: run
in_map:
source: $.input
out_map:
target: $.output
err_map:
target: $.error
routing: out
"#;
let flow = parse_flow_to_ir(yaml).expect("parse flow");
let node = flow.nodes.get("start").expect("start node");
assert_eq!(node.in_map.as_ref(), Some(&json!({ "source": "$.input" })));
assert_eq!(
node.out_map.as_ref(),
Some(&json!({ "target": "$.output" }))
);
assert_eq!(node.err_map.as_ref(), Some(&json!({ "target": "$.error" })));
let doc = flow.to_doc().expect("to doc");
let raw = &doc.nodes.get("start").expect("start doc node").raw;
assert_eq!(raw.get("in_map"), Some(&json!({ "source": "$.input" })));
assert_eq!(raw.get("out_map"), Some(&json!({ "target": "$.output" })));
assert_eq!(raw.get("err_map"), Some(&json!({ "target": "$.error" })));
}
}