agent_stream_kit/
flow.rs

1use std::sync::atomic::AtomicUsize;
2
3use serde::{Deserialize, Serialize};
4use serde_json::{Map, Value};
5
6use crate::FnvIndexMap;
7use crate::agent::AgentSpec;
8use crate::askit::ASKit;
9use crate::definition::{AgentDefinition, AgentDefinitions};
10use crate::error::AgentError;
11
/// Collection of [`AgentFlow`] values keyed by a `String`.
///
/// NOTE(review): the key is presumably the flow id or name — confirm against callers.
pub type AgentFlows = FnvIndexMap<String, AgentFlow>;
13
14#[derive(Clone, Debug, Serialize, Deserialize)]
15pub struct AgentFlow {
16    #[serde(skip_serializing_if = "String::is_empty")]
17    id: String,
18
19    name: String,
20
21    nodes: Vec<AgentFlowNode>,
22
23    edges: Vec<AgentFlowEdge>,
24
25    #[serde(flatten)]
26    pub extensions: FnvIndexMap<String, Value>,
27}
28
29impl AgentFlow {
30    pub fn new(name: String) -> Self {
31        Self {
32            id: new_id(),
33            name,
34            nodes: Vec::new(),
35            edges: Vec::new(),
36            extensions: FnvIndexMap::default(),
37        }
38    }
39
40    pub fn id(&self) -> &str {
41        &self.id
42    }
43
44    pub fn name(&self) -> &str {
45        &self.name
46    }
47
48    pub fn set_name(&mut self, new_name: String) {
49        self.name = new_name;
50    }
51
52    pub fn nodes(&self) -> &Vec<AgentFlowNode> {
53        &self.nodes
54    }
55
56    pub fn add_node(&mut self, node: AgentFlowNode) {
57        self.nodes.push(node);
58    }
59
60    pub fn remove_node(&mut self, node_id: &str) {
61        self.nodes.retain(|node| node.id != node_id);
62    }
63
64    pub fn set_nodes(&mut self, nodes: Vec<AgentFlowNode>) {
65        self.nodes = nodes;
66    }
67
68    pub fn edges(&self) -> &Vec<AgentFlowEdge> {
69        &self.edges
70    }
71
72    pub fn add_edge(&mut self, edge: AgentFlowEdge) {
73        self.edges.push(edge);
74    }
75
76    pub fn remove_edge(&mut self, edge_id: &str) -> Option<AgentFlowEdge> {
77        if let Some(edge) = self.edges.iter().find(|edge| edge.id == edge_id).cloned() {
78            self.edges.retain(|e| e.id != edge_id);
79            Some(edge)
80        } else {
81            None
82        }
83    }
84
85    pub fn set_edges(&mut self, edges: Vec<AgentFlowEdge>) {
86        self.edges = edges;
87    }
88
89    pub async fn start(&self, askit: &ASKit) -> Result<(), AgentError> {
90        for agent in self.nodes.iter() {
91            if !agent.enabled {
92                continue;
93            }
94            askit.start_agent(&agent.id).await.unwrap_or_else(|e| {
95                log::error!("Failed to start agent {}: {}", agent.id, e);
96            });
97        }
98        Ok(())
99    }
100
101    pub async fn stop(&self, askit: &ASKit) -> Result<(), AgentError> {
102        for agent in self.nodes.iter() {
103            if !agent.enabled {
104                continue;
105            }
106            askit.stop_agent(&agent.id).await.unwrap_or_else(|e| {
107                log::error!("Failed to stop agent {}: {}", agent.id, e);
108            });
109        }
110        Ok(())
111    }
112
113    pub fn disable_all_nodes(&mut self) {
114        for node in self.nodes.iter_mut() {
115            node.enabled = false;
116        }
117    }
118
119    pub fn to_json(&self) -> Result<String, AgentError> {
120        let json = serde_json::to_string_pretty(self)
121            .map_err(|e| AgentError::SerializationError(e.to_string()))?;
122        Ok(json)
123    }
124
125    pub fn from_json(json_str: &str) -> Result<Self, AgentError> {
126        let mut flow: AgentFlow = serde_json::from_str(json_str)
127            .map_err(|e| AgentError::SerializationError(e.to_string()))?;
128        flow.id = new_id();
129        Ok(flow)
130    }
131
132    /// Deserialize a flow with a compatibility layer for legacy node formats.
133    /// Falls back to parsing the old shape and populates spec.inputs/outputs from AgentDefinitions.
134    pub fn from_json_with_defs(
135        json_str: &str,
136        defs: &AgentDefinitions,
137    ) -> Result<Self, AgentError> {
138        match serde_json::from_str::<AgentFlow>(json_str) {
139            Ok(mut flow) => {
140                flow.id = new_id();
141                Ok(flow)
142            }
143            Err(deserialize_err) => {
144                let legacy_json: Value = serde_json::from_str(json_str).map_err(|e| {
145                    AgentError::SerializationError(format!("Failed to parse AgentFlow json: {}", e))
146                })?;
147
148                let converted_json = convert_legacy_flow(legacy_json, defs).map_err(|e| {
149                    AgentError::SerializationError(format!(
150                        "Failed to deserialize AgentFlow ({}); legacy format conversion failed: {}",
151                        deserialize_err, e
152                    ))
153                })?;
154
155                let mut flow: AgentFlow = serde_json::from_value(converted_json).map_err(|e| {
156                    AgentError::SerializationError(format!(
157                        "Failed to deserialize converted AgentFlow: {}",
158                        e
159                    ))
160                })?;
161                flow.id = new_id();
162                Ok(flow)
163            }
164        }
165    }
166}
167
168pub fn copy_sub_flow(
169    nodes: &Vec<AgentFlowNode>,
170    edges: &Vec<AgentFlowEdge>,
171) -> (Vec<AgentFlowNode>, Vec<AgentFlowEdge>) {
172    let mut new_nodes = Vec::new();
173    let mut node_id_map = FnvIndexMap::default();
174    for node in nodes {
175        let new_id = new_id();
176        node_id_map.insert(node.id.clone(), new_id.clone());
177        let mut new_node = node.clone();
178        new_node.id = new_id;
179        new_nodes.push(new_node);
180    }
181
182    let mut new_edges = Vec::new();
183    for edge in edges {
184        let Some(source) = node_id_map.get(&edge.source) else {
185            continue;
186        };
187        let Some(target) = node_id_map.get(&edge.target) else {
188            continue;
189        };
190        let mut new_edge = edge.clone();
191        new_edge.id = new_id();
192        new_edge.source = source.clone();
193        new_edge.target = target.clone();
194        new_edges.push(new_edge);
195    }
196
197    (new_nodes, new_edges)
198}
199
200// AgentFlowNode
201
/// A single node of an [`AgentFlow`].
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct AgentFlowNode {
    /// Identifier of the node. It is the id passed to `ASKit::start_agent` /
    /// `ASKit::stop_agent`, and the value edges refer to in `source`/`target`.
    pub id: String,

    /// Whether the node takes part in `AgentFlow::start` / `AgentFlow::stop`;
    /// disabled nodes are skipped by both.
    pub enabled: bool,

    /// Agent specification for this node (`AgentFlowNode::new` obtains it from
    /// `AgentDefinition::to_spec`).
    pub spec: AgentSpec,

    /// Extra keys of the node's JSON object that are not modelled by the
    /// fields above; flattened into the node on (de)serialization.
    #[serde(flatten)]
    pub extensions: FnvIndexMap<String, Value>,
}
213
214impl AgentFlowNode {
215    pub fn new(def: &AgentDefinition) -> Result<Self, AgentError> {
216        let spec = def.to_spec();
217
218        Ok(Self {
219            id: new_id(),
220            enabled: false,
221            spec,
222            extensions: FnvIndexMap::default(),
223        })
224    }
225}
226
/// Process-wide counter backing [`new_id`]; the first id handed out is `1`.
static NODE_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);

/// Returns the next value of the process-wide counter as a decimal string.
///
/// Each call yields a value one greater than the previous call, so ids are
/// distinct within a running process (until the counter wraps).
///
/// NOTE(review): the counter restarts at 1 in every process, while node and
/// edge ids read from JSON are kept as stored — generated ids may therefore
/// coincide with ids loaded from a file. Confirm whether callers rely on
/// uniqueness across loaded data.
fn new_id() -> String {
    // Relaxed is enough: only the atomicity of the increment matters here,
    // no other memory is synchronized through this counter.
    NODE_ID_COUNTER
        .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
        .to_string()
}
234
235fn convert_legacy_flow(
236    mut legacy_json: Value,
237    defs: &AgentDefinitions,
238) -> Result<Value, AgentError> {
239    let Some(obj) = legacy_json.as_object_mut() else {
240        return Err(AgentError::SerializationError(
241            "AgentFlow json is not an object".to_string(),
242        ));
243    };
244
245    let Some(nodes_val) = obj.get_mut("nodes") else {
246        return Err(AgentError::SerializationError(
247            "AgentFlow json missing nodes".to_string(),
248        ));
249    };
250
251    let Some(nodes) = nodes_val.as_array_mut() else {
252        return Err(AgentError::SerializationError(
253            "AgentFlow nodes is not an array".to_string(),
254        ));
255    };
256
257    for node in nodes.iter_mut() {
258        convert_legacy_node(node, defs)?;
259    }
260
261    Ok(legacy_json)
262}
263
264fn convert_legacy_node(node_val: &mut Value, defs: &AgentDefinitions) -> Result<(), AgentError> {
265    let Some(node_obj) = node_val.as_object_mut() else {
266        return Err(AgentError::SerializationError(
267            "AgentFlow node is not an object".to_string(),
268        ));
269    };
270
271    if node_obj.contains_key("spec") {
272        return Ok(());
273    }
274
275    let def_name = node_obj
276        .get("def_name")
277        .and_then(|v| v.as_str())
278        .map(|s| s.to_string())
279        .ok_or_else(|| {
280            AgentError::SerializationError("Legacy node missing def_name".to_string())
281        })?;
282
283    let (inputs, outputs, def_display_configs) = defs
284        .get(def_name.as_str())
285        .map(|def| {
286            (
287                def.inputs.clone().unwrap_or_default(),
288                def.outputs.clone().unwrap_or_default(),
289                def.display_configs.clone(),
290            )
291        })
292        .unwrap_or((Vec::new(), Vec::new(), None));
293
294    let configs = node_obj.remove("configs").unwrap_or(Value::Null);
295    let display_configs = node_obj
296        .remove("display_configs")
297        .or_else(|| def_display_configs.and_then(|cfg| serde_json::to_value(cfg).ok()));
298
299    let mut spec_map = Map::new();
300    spec_map.insert("def_name".into(), Value::String(def_name.to_string()));
301    if !inputs.is_empty() {
302        spec_map.insert(
303            "inputs".into(),
304            serde_json::to_value(inputs).map_err(|e| {
305                AgentError::SerializationError(format!(
306                    "Failed to serialize inputs for legacy node {}: {}",
307                    def_name, e
308                ))
309            })?,
310        );
311    }
312    if !outputs.is_empty() {
313        spec_map.insert(
314            "outputs".into(),
315            serde_json::to_value(outputs).map_err(|e| {
316                AgentError::SerializationError(format!(
317                    "Failed to serialize outputs for legacy node {}: {}",
318                    def_name, e
319                ))
320            })?,
321        );
322    }
323    if !configs.is_null() {
324        spec_map.insert("configs".into(), configs);
325    }
326    if let Some(display_configs) = display_configs {
327        spec_map.insert("display_configs".into(), display_configs);
328    }
329
330    node_obj.remove("def_name");
331    node_obj.insert("spec".into(), Value::Object(spec_map));
332
333    Ok(())
334}
335
336// AgentFlowEdge
337
/// A directed connection between two nodes of an [`AgentFlow`].
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct AgentFlowEdge {
    /// Identifier of the edge; `AgentFlow::remove_edge` matches on it.
    pub id: String,
    /// Id of the node the edge starts from.
    pub source: String,
    /// NOTE(review): presumably names the output on the source node this edge
    /// is attached to — confirm against the code that consumes edges.
    pub source_handle: String,
    /// Id of the node the edge leads to.
    pub target: String,
    /// NOTE(review): presumably names the input on the target node this edge
    /// is attached to — confirm against the code that consumes edges.
    pub target_handle: String,
}