mod types;
pub use types::{
ChannelInfo, ConditionalEdgeInfo, EdgeInfo, GraphPolicySummary, GraphTopology, NodeInfo,
NodePolicySummary, RouteInfo, ValidationReport, WaitingEdgeInfo,
};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use crate::Result;
use crate::graph::builder::{END, GraphBuilder, START};
use crate::graph::compiled::CompiledGraph;
use crate::language::{Blueprint, Routing};
/// Source-independent description of one node, consumed by `build_topology`.
///
/// Every field is copied into the resulting `NodeInfo`; the boolean flags are
/// additionally mirrored into the node's `NodePolicySummary`.
struct NodePart {
/// Node identifier as a plain string.
id: String,
/// Optional kind label for the node.
kind: Option<String>,
/// When `true`, `build_topology` classifies the node's routing as `"command"`.
command_routing: bool,
/// Marks the node as a subgraph.
subgraph: bool,
/// Interrupt flag, passed through unchanged.
interrupt: bool,
/// Deferred flag, passed through unchanged.
deferred: bool,
/// Destinations the node may route to via commands; sorted and deduplicated
/// by `build_topology`.
command_destinations: Vec<String>,
/// Free-form key/value metadata, passed through unchanged.
metadata: BTreeMap<String, String>,
}
/// Raw, unsorted ingredients from which `build_topology` assembles a
/// `GraphTopology`.
struct TopologyParts {
/// Identifier of the graph.
graph_id: String,
/// Optional human-readable graph name.
name: Option<String>,
/// Recursion limit reported in the topology and its policy summary.
recursion_limit: usize,
/// Parallel-execution flag reported in the topology and its policy summary.
parallel: bool,
/// Optional concurrency cap, reported in the policy summary.
max_concurrency: Option<usize>,
/// Optional per-node timeout in milliseconds, reported in the policy summary.
node_timeout_ms: Option<u128>,
/// All nodes of the graph, in any order.
nodes: Vec<NodePart>,
/// `(from, to)` pairs. An edge whose source is `START` sets the entry node,
/// an edge whose target is `END` marks its source as a finish node.
edges: Vec<(String, String)>,
/// `(source, routes)` pairs where each route is `(label, target)`.
conditional: Vec<(String, Vec<(String, String)>)>,
/// `(target, predecessors)` pairs describing barrier (waiting) edges.
waiting: Vec<(String, Vec<String>)>,
/// `(name, reducer)` pairs describing state channels.
channels: Vec<(String, String)>,
}
fn build_topology(parts: TopologyParts) -> GraphTopology {
let TopologyParts {
graph_id,
name,
recursion_limit,
parallel,
max_concurrency,
node_timeout_ms,
nodes,
edges,
conditional,
waiting,
channels,
} = parts;
let mut waiting_edges: Vec<WaitingEdgeInfo> = waiting
.into_iter()
.map(|(target, mut predecessors)| {
predecessors.sort();
predecessors.dedup();
WaitingEdgeInfo {
target,
predecessors,
}
})
.collect();
waiting_edges.sort();
let waiting_pairs: BTreeSet<(String, String)> = waiting_edges
.iter()
.flat_map(|w| {
w.predecessors
.iter()
.map(move |p| (p.clone(), w.target.clone()))
})
.collect();
let barrier_targets: BTreeSet<String> =
waiting_edges.iter().map(|w| w.target.clone()).collect();
let mut entry: Option<String> = None;
let mut direct: Vec<EdgeInfo> = Vec::new();
let mut finish_nodes: Vec<String> = Vec::new();
for (from, to) in edges {
if from == START {
entry = Some(to);
} else if to == END {
finish_nodes.push(from);
} else if waiting_pairs.contains(&(from.clone(), to.clone())) {
} else {
direct.push(EdgeInfo { from, to });
}
}
let mut conditional_edges: Vec<ConditionalEdgeInfo> = conditional
.into_iter()
.map(|(from, routes)| {
let mut routes: Vec<RouteInfo> = routes
.into_iter()
.map(|(label, target)| RouteInfo { label, target })
.collect();
routes.sort();
ConditionalEdgeInfo { from, routes }
})
.collect();
let channels: Vec<ChannelInfo> = channels
.into_iter()
.map(|(name, reducer)| ChannelInfo { name, reducer })
.collect();
direct.sort();
conditional_edges.sort_by(|a, b| a.from.cmp(&b.from));
finish_nodes.sort();
finish_nodes.dedup();
let conditional_set: BTreeSet<&str> =
conditional_edges.iter().map(|c| c.from.as_str()).collect();
let direct_from_set: BTreeSet<&str> = direct.iter().map(|e| e.from.as_str()).collect();
let finish_set: BTreeSet<&str> = finish_nodes.iter().map(String::as_str).collect();
let mut node_infos: Vec<NodeInfo> = nodes
.into_iter()
.map(|n| {
let routing = if n.command_routing {
"command"
} else if conditional_set.contains(n.id.as_str()) {
"conditional"
} else if finish_set.contains(n.id.as_str()) {
"terminal"
} else if direct_from_set.contains(n.id.as_str()) {
"static"
} else {
"unrouted"
};
let policy = NodePolicySummary {
routing: routing.to_string(),
barrier: barrier_targets.contains(&n.id),
interrupt: n.interrupt,
deferred: n.deferred,
subgraph: n.subgraph,
};
let mut command_destinations = n.command_destinations;
command_destinations.sort();
command_destinations.dedup();
NodeInfo {
id: n.id,
kind: n.kind,
command_routing: n.command_routing,
subgraph: n.subgraph,
interrupt: n.interrupt,
deferred: n.deferred,
command_destinations,
metadata: n.metadata,
policy,
}
})
.collect();
node_infos.sort_by(|a, b| a.id.cmp(&b.id));
let policy = GraphPolicySummary {
recursion_limit,
parallel,
max_concurrency,
node_timeout_ms,
};
let validation = validate(
&node_infos,
entry.as_deref(),
&direct,
&conditional_edges,
&waiting_edges,
&finish_nodes,
);
GraphTopology {
graph_id,
name,
entry,
recursion_limit,
parallel,
nodes: node_infos,
edges: direct,
conditional_edges,
waiting_edges,
finish_nodes,
channels,
policy,
validation,
}
}
/// Checks a topology for structural problems.
///
/// Errors are reported for a missing entry node and for every reference to an
/// id that is neither a declared node nor `START`/`END` (entry, direct edges,
/// conditional edges, barriers, command destinations, finish nodes).
///
/// Warnings are reported for nodes that cannot be reached from the entry
/// (only computed when an entry exists) and for nodes whose routing policy is
/// `"unrouted"`.
///
/// Both lists are sorted and deduplicated; `ok` is `true` when there are no
/// errors.
fn validate(
    nodes: &[NodeInfo],
    entry: Option<&str>,
    direct: &[EdgeInfo],
    conditional: &[ConditionalEdgeInfo],
    waiting: &[WaitingEdgeInfo],
    finish_nodes: &[String],
) -> ValidationReport {
    let declared: BTreeSet<&str> = nodes.iter().map(|n| n.id.as_str()).collect();
    let is_known = |id: &str| id == START || id == END || declared.contains(id);

    let mut errors: Vec<String> = Vec::new();
    let mut warnings: Vec<String> = Vec::new();

    match entry {
        Some(e) if is_known(e) => {}
        Some(e) => errors.push(format!("entry node `{e}` is not declared")),
        None => errors.push(String::from("graph has no entry node (no START edge)")),
    }

    errors.extend(
        direct
            .iter()
            .filter(|edge| !is_known(edge.from.as_str()))
            .map(|edge| format!("edge source `{}` is not declared", edge.from)),
    );
    errors.extend(
        direct
            .iter()
            .filter(|edge| !is_known(edge.to.as_str()))
            .map(|edge| format!("edge target `{}` is not declared", edge.to)),
    );

    for cond in conditional {
        if !is_known(cond.from.as_str()) {
            errors.push(format!(
                "conditional source `{}` is not declared",
                cond.from
            ));
        }
        errors.extend(
            cond.routes
                .iter()
                .filter(|route| !is_known(route.target.as_str()))
                .map(|route| {
                    format!(
                        "conditional route `{}` of `{}` targets undeclared node `{}`",
                        route.label, cond.from, route.target
                    )
                }),
        );
    }

    for barrier in waiting {
        if !is_known(barrier.target.as_str()) {
            errors.push(format!(
                "barrier target `{}` is not declared",
                barrier.target
            ));
        }
        errors.extend(
            barrier
                .predecessors
                .iter()
                .filter(|pred| !is_known(pred.as_str()))
                .map(|pred| {
                    format!(
                        "barrier predecessor `{pred}` of `{}` is not declared",
                        barrier.target
                    )
                }),
        );
    }

    for node in nodes {
        errors.extend(
            node.command_destinations
                .iter()
                .filter(|dest| !is_known(dest.as_str()))
                .map(|dest| {
                    format!(
                        "command destination `{dest}` of `{}` is not declared",
                        node.id
                    )
                }),
        );
    }

    errors.extend(
        finish_nodes
            .iter()
            .filter(|f| !is_known(f.as_str()))
            .map(|f| format!("finish node `{f}` is not declared")),
    );

    if let Some(start) = entry {
        // Every way control can move from one node to another.
        let links = direct
            .iter()
            .map(|e| (e.from.as_str(), e.to.as_str()))
            .chain(conditional.iter().flat_map(|c| {
                c.routes
                    .iter()
                    .map(move |r| (c.from.as_str(), r.target.as_str()))
            }))
            .chain(waiting.iter().flat_map(|w| {
                w.predecessors
                    .iter()
                    .map(move |p| (p.as_str(), w.target.as_str()))
            }))
            .chain(nodes.iter().flat_map(|n| {
                n.command_destinations
                    .iter()
                    .map(move |d| (n.id.as_str(), d.as_str()))
            }));

        let mut successors: BTreeMap<&str, BTreeSet<&str>> = BTreeMap::new();
        for (from, to) in links {
            successors.entry(from).or_default().insert(to);
        }

        // Breadth-first walk from the entry node.
        let mut reached: BTreeSet<&str> = BTreeSet::from([start]);
        let mut frontier: VecDeque<&str> = VecDeque::from([start]);
        while let Some(current) = frontier.pop_front() {
            let Some(next) = successors.get(current) else {
                continue;
            };
            for &candidate in next {
                if reached.insert(candidate) {
                    frontier.push_back(candidate);
                }
            }
        }

        warnings.extend(
            nodes
                .iter()
                .filter(|node| !reached.contains(node.id.as_str()))
                .map(|node| format!("node `{}` is unreachable from the entry", node.id)),
        );
    }

    warnings.extend(
        nodes
            .iter()
            .filter(|node| node.policy.routing == "unrouted")
            .map(|node| {
                format!(
                    "node `{}` has no outgoing route and is not terminal",
                    node.id
                )
            }),
    );

    errors.sort();
    errors.dedup();
    warnings.sort();
    warnings.dedup();

    let ok = errors.is_empty();
    ValidationReport {
        ok,
        errors,
        warnings,
    }
}
/// Converts node ids plus their optional metadata into [`NodePart`] records.
///
/// * `ids` - the node ids to describe, in the order they should be emitted.
/// * `is_command` - tells whether a node uses command routing.
/// * `meta` - per-node metadata; nodes without an entry get `None`/`false`/empty
///   values for every metadata-derived field.
fn node_parts<'a, I>(
    ids: I,
    is_command: impl Fn(&crate::harness::ids::NodeId) -> bool,
    meta: &std::collections::HashMap<crate::harness::ids::NodeId, crate::graph::builder::NodeMeta>,
) -> Vec<NodePart>
where
    I: IntoIterator<Item = &'a crate::harness::ids::NodeId>,
{
    let mut parts: Vec<NodePart> = Vec::new();
    for id in ids {
        let part = match meta.get(id) {
            Some(found) => NodePart {
                id: id.to_string(),
                kind: found.kind.clone(),
                command_routing: is_command(id),
                subgraph: found.subgraph,
                interrupt: found.interrupt,
                deferred: found.deferred,
                command_destinations: found
                    .command_destinations
                    .iter()
                    .map(|d| d.to_string())
                    .collect(),
                metadata: found.metadata.clone(),
            },
            None => NodePart {
                id: id.to_string(),
                kind: None,
                command_routing: is_command(id),
                subgraph: false,
                interrupt: false,
                deferred: false,
                command_destinations: Vec::new(),
                metadata: BTreeMap::new(),
            },
        };
        parts.push(part);
    }
    parts
}
impl<State, Update> CompiledGraph<State, Update> {
    /// Describes this compiled graph as a [`GraphTopology`].
    ///
    /// Nodes, edges, conditional branches and barriers are read from the
    /// graph; the channel list is always empty for this source.
    pub fn topology(&self) -> GraphTopology {
        let mut edges: Vec<(String, String)> = Vec::new();
        for (from, to) in self.edges.iter() {
            edges.push((from.to_string(), to.to_string()));
        }

        let mut conditional: Vec<(String, Vec<(String, String)>)> = Vec::new();
        for (from, branch) in self.branches.iter() {
            let mut routes: Vec<(String, String)> = Vec::new();
            for (label, target) in branch.routes.iter() {
                routes.push((label.clone(), target.to_string()));
            }
            conditional.push((from.to_string(), routes));
        }

        let mut waiting: Vec<(String, Vec<String>)> = Vec::new();
        for (target, preds) in self.waiting.iter() {
            let predecessors: Vec<String> = preds.iter().map(|p| p.to_string()).collect();
            waiting.push((target.to_string(), predecessors));
        }

        let nodes = node_parts(
            self.nodes.keys(),
            |id| self.command_nodes.contains(id),
            &self.node_meta,
        );

        build_topology(TopologyParts {
            graph_id: self.graph_id().to_string(),
            name: self.name().map(|n| n.to_string()),
            recursion_limit: self.recursion_limit,
            parallel: self.parallel,
            max_concurrency: self.max_concurrency,
            node_timeout_ms: self.node_timeout.map(|timeout| timeout.as_millis()),
            nodes,
            edges,
            conditional,
            waiting,
            channels: Vec::new(),
        })
    }
}
impl<State, Update> GraphBuilder<State, Update> {
    /// Describes the graph under construction as a [`GraphTopology`].
    ///
    /// Nodes, edges, conditional branches and barriers are read from the
    /// builder; the channel list is always empty for this source.
    pub fn topology(&self) -> GraphTopology {
        let mut edges: Vec<(String, String)> = Vec::new();
        for (from, to) in self.edges.iter() {
            edges.push((from.to_string(), to.to_string()));
        }

        let mut conditional: Vec<(String, Vec<(String, String)>)> = Vec::new();
        for (from, branch) in self.branches.iter() {
            let mut routes: Vec<(String, String)> = Vec::new();
            for (label, target) in branch.routes.iter() {
                routes.push((label.clone(), target.to_string()));
            }
            conditional.push((from.to_string(), routes));
        }

        let mut waiting: Vec<(String, Vec<String>)> = Vec::new();
        for (target, preds) in self.waiting.iter() {
            let predecessors: Vec<String> = preds.iter().map(|p| p.to_string()).collect();
            waiting.push((target.to_string(), predecessors));
        }

        let nodes = node_parts(
            self.nodes.keys(),
            |id| self.command_nodes.contains(id),
            &self.node_meta,
        );

        build_topology(TopologyParts {
            graph_id: self.graph_id.to_string(),
            name: self.name.clone(),
            recursion_limit: self.recursion_limit,
            parallel: self.parallel,
            max_concurrency: self.max_concurrency,
            node_timeout_ms: self.node_timeout.map(|timeout| timeout.as_millis()),
            nodes,
            edges,
            conditional,
            waiting,
            channels: Vec::new(),
        })
    }
}
pub fn blueprint_to_topology(blueprint: &Blueprint) -> GraphTopology {
let recursion_limit = blueprint
.defaults
.iter()
.find(|(key, _)| key == "recursion_limit")
.and_then(|(_, value)| match value {
crate::language::Literal::Num(n) if *n >= 0.0 => Some(*n as usize),
_ => None,
})
.unwrap_or(0);
let nodes = blueprint
.nodes
.iter()
.map(|n| {
let subgraph = n.kind == "subgraph";
NodePart {
id: n.name.clone(),
kind: Some(n.kind.clone()),
command_routing: false,
subgraph,
interrupt: false,
deferred: false,
command_destinations: Vec::new(),
metadata: BTreeMap::new(),
}
})
.collect();
let mut edges: Vec<(String, String)> = blueprint
.edges
.iter()
.map(|e| (e.from.clone(), e.to.clone()))
.collect();
edges.push((START.to_string(), blueprint.start.clone()));
let mut conditional: Vec<(String, Vec<(String, String)>)> = Vec::new();
for node in &blueprint.nodes {
match &node.routing {
Routing::Next(target) => edges.push((node.name.clone(), target.clone())),
Routing::Terminal => edges.push((node.name.clone(), END.to_string())),
Routing::Conditional(routes) => {
conditional.push((node.name.clone(), routes.clone()));
}
}
}
let channels = blueprint
.channels
.iter()
.map(|c| (c.name.clone(), c.reducer.clone()))
.collect();
build_topology(TopologyParts {
graph_id: blueprint.graph_id.clone(),
name: None,
recursion_limit,
parallel: false,
max_concurrency: None,
node_timeout_ms: None,
nodes,
edges,
conditional,
waiting: Vec::new(),
channels,
})
}
/// Renders the topology as pretty-printed JSON.
///
/// Serialization failures are not surfaced; the empty object `{}` is returned
/// instead.
pub fn to_json(topology: &GraphTopology) -> String {
    match serde_json::to_string_pretty(topology) {
        Ok(json) => json,
        Err(_) => String::from("{}"),
    }
}
/// Parses a topology from its JSON representation.
///
/// # Errors
///
/// Returns the crate error converted from the `serde_json` failure when the
/// input cannot be deserialized into a [`GraphTopology`].
pub fn from_json(json: &str) -> Result<GraphTopology> {
    let topology: GraphTopology = serde_json::from_str(json)?;
    Ok(topology)
}
/// Renders the topology as a Mermaid `flowchart TD` diagram.
///
/// Output order: the `START`/`END` terminals, one declaration per node
/// (subgraph nodes use the `[[...]]` shape), marker classes, a blank line,
/// then the entry edge, direct edges, labelled conditional edges, dotted
/// `barrier` edges, dotted `goto` edges for command destinations and finally
/// the edges into `END`.
pub fn to_mermaid(topology: &GraphTopology) -> String {
    let mut diagram = String::new();
    for header in ["flowchart TD\n", " START([START])\n", " END([END])\n"] {
        diagram.push_str(header);
    }

    diagram.extend(topology.nodes.iter().map(|node| {
        let id = mermaid_id(&node.id);
        let label = escape_label(&node.id);
        if node.subgraph {
            format!(" {id}[[\"{label}\"]]\n")
        } else {
            format!(" {id}[\"{label}\"]\n")
        }
    }));

    emit_marker_class(&mut diagram, topology, "subgraph", |n| n.subgraph);
    emit_marker_class(&mut diagram, topology, "interrupt", |n| n.interrupt);
    emit_marker_class(&mut diagram, topology, "deferred", |n| n.deferred);
    diagram.push('\n');

    if let Some(entry) = topology.entry.as_deref() {
        diagram.push_str(&format!(" START --> {}\n", mermaid_ref(entry)));
    }

    diagram.extend(topology.edges.iter().map(|edge| {
        format!(
            " {} --> {}\n",
            mermaid_ref(&edge.from),
            mermaid_ref(&edge.to)
        )
    }));

    for cond in &topology.conditional_edges {
        let source = mermaid_ref(&cond.from);
        diagram.extend(cond.routes.iter().map(|route| {
            format!(
                " {} -- {} --> {}\n",
                source,
                escape_label(&route.label),
                mermaid_ref(&route.target)
            )
        }));
    }

    for barrier in &topology.waiting_edges {
        let target = mermaid_ref(&barrier.target);
        diagram.extend(
            barrier
                .predecessors
                .iter()
                .map(|pred| format!(" {} -. barrier .-> {}\n", mermaid_ref(pred), target)),
        );
    }

    for node in &topology.nodes {
        let source = mermaid_ref(&node.id);
        diagram.extend(
            node.command_destinations
                .iter()
                .map(|dest| format!(" {} -. goto .-> {}\n", source, mermaid_ref(dest))),
        );
    }

    diagram.extend(
        topology
            .finish_nodes
            .iter()
            .map(|node| format!(" {} --> END\n", mermaid_ref(node))),
    );

    diagram
}
/// Appends a Mermaid `classDef` for `class` plus one `class` assignment per
/// node accepted by `pred`.
///
/// Nothing is written when no node matches.
fn emit_marker_class(
    out: &mut String,
    topology: &GraphTopology,
    class: &str,
    pred: impl Fn(&NodeInfo) -> bool,
) {
    let assignments: String = topology
        .nodes
        .iter()
        .filter(|node| pred(node))
        .map(|node| format!(" class {} {class}\n", mermaid_id(&node.id)))
        .collect();

    // Every assignment line is non-empty, so an empty buffer means no match.
    if assignments.is_empty() {
        return;
    }

    out.push_str(&format!(" classDef {class} stroke-dasharray: 4 2;\n"));
    out.push_str(&assignments);
}
/// Renders a blueprint as a Mermaid diagram by first converting it to a
/// topology.
pub fn blueprint_to_mermaid(blueprint: &Blueprint) -> String {
    let topology = blueprint_to_topology(blueprint);
    to_mermaid(&topology)
}
/// Renders a blueprint as pretty-printed JSON by first converting it to a
/// topology.
pub fn blueprint_to_json(blueprint: &Blueprint) -> String {
    let topology = blueprint_to_topology(blueprint);
    to_json(&topology)
}
/// Maps a node id to the identifier used for it inside the Mermaid diagram.
///
/// The start and end sentinels (either the crate constants or the literal
/// names `START` / `END`) map to the fixed terminal ids; every other id goes
/// through [`mermaid_id`].
fn mermaid_ref(id: &str) -> String {
    let is_start = id == START || id == "START";
    if is_start {
        return String::from("START");
    }

    let is_end = id == END || id == "END";
    if is_end {
        return String::from("END");
    }

    mermaid_id(id)
}
/// Builds a Mermaid-safe identifier for a node id.
///
/// The result is `n_` followed by the id with every character that is not an
/// ASCII letter, ASCII digit or underscore replaced by `_`.
fn mermaid_id(id: &str) -> String {
    let safe_chars = id.chars().map(|ch| match ch {
        'a'..='z' | 'A'..='Z' | '0'..='9' | '_' => ch,
        _ => '_',
    });

    let mut ident = String::with_capacity(id.len() + 2);
    ident.push_str("n_");
    ident.extend(safe_chars);
    ident
}
/// Escapes a label so it can be embedded in a double-quoted Mermaid node or
/// edge label.
///
/// Each `"` is replaced by the entity `&quot;` so it cannot terminate the
/// surrounding quoted label; all other characters are returned unchanged.
fn escape_label(label: &str) -> String {
    label.replace('"', "&quot;")
}
#[cfg(test)]
mod test;