1use std::sync::atomic::AtomicUsize;
2
3use serde::{Deserialize, Serialize};
4use serde_json::{Map, Value};
5
6use crate::FnvIndexMap;
7use crate::agent::AgentSpec;
8use crate::askit::ASKit;
9use crate::definition::{AgentDefinition, AgentDefinitions};
10use crate::error::AgentError;
11
12pub type AgentFlows = FnvIndexMap<String, AgentFlow>;
13
14#[derive(Clone, Debug, Serialize, Deserialize)]
15pub struct AgentFlow {
16 #[serde(skip_serializing_if = "String::is_empty")]
17 id: String,
18
19 name: String,
20
21 nodes: Vec<AgentFlowNode>,
22
23 edges: Vec<AgentFlowEdge>,
24
25 #[serde(flatten)]
26 pub extensions: FnvIndexMap<String, Value>,
27}
28
29impl AgentFlow {
30 pub fn new(name: String) -> Self {
31 Self {
32 id: new_id(),
33 name,
34 nodes: Vec::new(),
35 edges: Vec::new(),
36 extensions: FnvIndexMap::default(),
37 }
38 }
39
40 pub fn id(&self) -> &str {
41 &self.id
42 }
43
44 pub fn name(&self) -> &str {
45 &self.name
46 }
47
48 pub fn set_name(&mut self, new_name: String) {
49 self.name = new_name;
50 }
51
52 pub fn nodes(&self) -> &Vec<AgentFlowNode> {
53 &self.nodes
54 }
55
56 pub fn add_node(&mut self, node: AgentFlowNode) {
57 self.nodes.push(node);
58 }
59
60 pub fn remove_node(&mut self, node_id: &str) {
61 self.nodes.retain(|node| node.id != node_id);
62 }
63
64 pub fn set_nodes(&mut self, nodes: Vec<AgentFlowNode>) {
65 self.nodes = nodes;
66 }
67
68 pub fn edges(&self) -> &Vec<AgentFlowEdge> {
69 &self.edges
70 }
71
72 pub fn add_edge(&mut self, edge: AgentFlowEdge) {
73 self.edges.push(edge);
74 }
75
76 pub fn remove_edge(&mut self, edge_id: &str) -> Option<AgentFlowEdge> {
77 if let Some(edge) = self.edges.iter().find(|edge| edge.id == edge_id).cloned() {
78 self.edges.retain(|e| e.id != edge_id);
79 Some(edge)
80 } else {
81 None
82 }
83 }
84
85 pub fn set_edges(&mut self, edges: Vec<AgentFlowEdge>) {
86 self.edges = edges;
87 }
88
89 pub async fn start(&self, askit: &ASKit) -> Result<(), AgentError> {
90 for agent in self.nodes.iter() {
91 if !agent.enabled {
92 continue;
93 }
94 askit.start_agent(&agent.id).await.unwrap_or_else(|e| {
95 log::error!("Failed to start agent {}: {}", agent.id, e);
96 });
97 }
98 Ok(())
99 }
100
101 pub async fn stop(&self, askit: &ASKit) -> Result<(), AgentError> {
102 for agent in self.nodes.iter() {
103 if !agent.enabled {
104 continue;
105 }
106 askit.stop_agent(&agent.id).await.unwrap_or_else(|e| {
107 log::error!("Failed to stop agent {}: {}", agent.id, e);
108 });
109 }
110 Ok(())
111 }
112
113 pub fn disable_all_nodes(&mut self) {
114 for node in self.nodes.iter_mut() {
115 node.enabled = false;
116 }
117 }
118
119 pub fn to_json(&self) -> Result<String, AgentError> {
120 let json = serde_json::to_string_pretty(self)
121 .map_err(|e| AgentError::SerializationError(e.to_string()))?;
122 Ok(json)
123 }
124
125 pub fn from_json(json_str: &str) -> Result<Self, AgentError> {
126 let mut flow: AgentFlow = serde_json::from_str(json_str)
127 .map_err(|e| AgentError::SerializationError(e.to_string()))?;
128 flow.id = new_id();
129 Ok(flow)
130 }
131
132 pub fn from_json_with_defs(
135 json_str: &str,
136 defs: &AgentDefinitions,
137 ) -> Result<Self, AgentError> {
138 match serde_json::from_str::<AgentFlow>(json_str) {
139 Ok(mut flow) => {
140 flow.id = new_id();
141 Ok(flow)
142 }
143 Err(deserialize_err) => {
144 let legacy_json: Value = serde_json::from_str(json_str).map_err(|e| {
145 AgentError::SerializationError(format!("Failed to parse AgentFlow json: {}", e))
146 })?;
147
148 let converted_json = convert_legacy_flow(legacy_json, defs).map_err(|e| {
149 AgentError::SerializationError(format!(
150 "Failed to deserialize AgentFlow ({}); legacy format conversion failed: {}",
151 deserialize_err, e
152 ))
153 })?;
154
155 let mut flow: AgentFlow = serde_json::from_value(converted_json).map_err(|e| {
156 AgentError::SerializationError(format!(
157 "Failed to deserialize converted AgentFlow: {}",
158 e
159 ))
160 })?;
161 flow.id = new_id();
162 Ok(flow)
163 }
164 }
165 }
166}
167
168pub fn copy_sub_flow(
169 nodes: &Vec<AgentFlowNode>,
170 edges: &Vec<AgentFlowEdge>,
171) -> (Vec<AgentFlowNode>, Vec<AgentFlowEdge>) {
172 let mut new_nodes = Vec::new();
173 let mut node_id_map = FnvIndexMap::default();
174 for node in nodes {
175 let new_id = new_id();
176 node_id_map.insert(node.id.clone(), new_id.clone());
177 let mut new_node = node.clone();
178 new_node.id = new_id;
179 new_nodes.push(new_node);
180 }
181
182 let mut new_edges = Vec::new();
183 for edge in edges {
184 let Some(source) = node_id_map.get(&edge.source) else {
185 continue;
186 };
187 let Some(target) = node_id_map.get(&edge.target) else {
188 continue;
189 };
190 let mut new_edge = edge.clone();
191 new_edge.id = new_id();
192 new_edge.source = source.clone();
193 new_edge.target = target.clone();
194 new_edges.push(new_edge);
195 }
196
197 (new_nodes, new_edges)
198}
199
200#[derive(Debug, Serialize, Deserialize, Clone)]
203pub struct AgentFlowNode {
204 pub id: String,
205
206 pub enabled: bool,
207
208 pub spec: AgentSpec,
209
210 #[serde(flatten)]
211 pub extensions: FnvIndexMap<String, Value>,
212}
213
214impl AgentFlowNode {
215 pub fn new(def: &AgentDefinition) -> Result<Self, AgentError> {
216 let spec = def.to_spec();
217
218 Ok(Self {
219 id: new_id(),
220 enabled: false,
221 spec,
222 extensions: FnvIndexMap::default(),
223 })
224 }
225}
226
227static NODE_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);
228
229fn new_id() -> String {
230 return NODE_ID_COUNTER
231 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
232 .to_string();
233}
234
235fn convert_legacy_flow(
236 mut legacy_json: Value,
237 defs: &AgentDefinitions,
238) -> Result<Value, AgentError> {
239 let Some(obj) = legacy_json.as_object_mut() else {
240 return Err(AgentError::SerializationError(
241 "AgentFlow json is not an object".to_string(),
242 ));
243 };
244
245 let Some(nodes_val) = obj.get_mut("nodes") else {
246 return Err(AgentError::SerializationError(
247 "AgentFlow json missing nodes".to_string(),
248 ));
249 };
250
251 let Some(nodes) = nodes_val.as_array_mut() else {
252 return Err(AgentError::SerializationError(
253 "AgentFlow nodes is not an array".to_string(),
254 ));
255 };
256
257 for node in nodes.iter_mut() {
258 convert_legacy_node(node, defs)?;
259 }
260
261 Ok(legacy_json)
262}
263
264fn convert_legacy_node(node_val: &mut Value, defs: &AgentDefinitions) -> Result<(), AgentError> {
265 let Some(node_obj) = node_val.as_object_mut() else {
266 return Err(AgentError::SerializationError(
267 "AgentFlow node is not an object".to_string(),
268 ));
269 };
270
271 if node_obj.contains_key("spec") {
272 return Ok(());
273 }
274
275 let def_name = node_obj
276 .get("def_name")
277 .and_then(|v| v.as_str())
278 .map(|s| s.to_string())
279 .ok_or_else(|| {
280 AgentError::SerializationError("Legacy node missing def_name".to_string())
281 })?;
282
283 let (inputs, outputs, def_display_configs) = defs
284 .get(def_name.as_str())
285 .map(|def| {
286 (
287 def.inputs.clone().unwrap_or_default(),
288 def.outputs.clone().unwrap_or_default(),
289 def.display_configs.clone(),
290 )
291 })
292 .unwrap_or((Vec::new(), Vec::new(), None));
293
294 let configs = node_obj.remove("configs").unwrap_or(Value::Null);
295 let display_configs = node_obj
296 .remove("display_configs")
297 .or_else(|| def_display_configs.and_then(|cfg| serde_json::to_value(cfg).ok()));
298
299 let mut spec_map = Map::new();
300 spec_map.insert("def_name".into(), Value::String(def_name.to_string()));
301 if !inputs.is_empty() {
302 spec_map.insert(
303 "inputs".into(),
304 serde_json::to_value(inputs).map_err(|e| {
305 AgentError::SerializationError(format!(
306 "Failed to serialize inputs for legacy node {}: {}",
307 def_name, e
308 ))
309 })?,
310 );
311 }
312 if !outputs.is_empty() {
313 spec_map.insert(
314 "outputs".into(),
315 serde_json::to_value(outputs).map_err(|e| {
316 AgentError::SerializationError(format!(
317 "Failed to serialize outputs for legacy node {}: {}",
318 def_name, e
319 ))
320 })?,
321 );
322 }
323 if !configs.is_null() {
324 spec_map.insert("configs".into(), configs);
325 }
326 if let Some(display_configs) = display_configs {
327 spec_map.insert("display_configs".into(), display_configs);
328 }
329
330 node_obj.remove("def_name");
331 node_obj.insert("spec".into(), Value::Object(spec_map));
332
333 Ok(())
334}
335
336#[derive(Debug, Default, Serialize, Deserialize, Clone)]
339pub struct AgentFlowEdge {
340 pub id: String,
341 pub source: String,
342 pub source_handle: String,
343 pub target: String,
344 pub target_handle: String,
345}