1use std::ops::Not;
2use std::sync::atomic::AtomicUsize;
3
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6
7use crate::FnvIndexMap;
8use crate::askit::ASKit;
9use crate::config::AgentConfigs;
10use crate::definition::{AgentConfigSpecs, AgentDefinition};
11use crate::error::AgentError;
12
13pub type AgentStreams = FnvIndexMap<String, AgentStream>;
14
15#[derive(Clone, Debug, Serialize, Deserialize)]
16pub struct AgentStream {
17 #[serde(skip_serializing_if = "String::is_empty")]
18 id: String,
19
20 name: String,
21
22 agents: Vec<AgentSpec>,
23
24 channels: Vec<ChannelSpec>,
25
26 #[serde(flatten)]
27 pub extensions: FnvIndexMap<String, Value>,
28}
29
30impl AgentStream {
31 pub fn new(name: String) -> Self {
32 Self {
33 id: new_id(),
34 name,
35 agents: Vec::new(),
36 channels: Vec::new(),
37 extensions: FnvIndexMap::default(),
38 }
39 }
40
41 pub fn id(&self) -> &str {
42 &self.id
43 }
44
45 pub fn name(&self) -> &str {
46 &self.name
47 }
48
49 pub fn set_name(&mut self, new_name: String) {
50 self.name = new_name;
51 }
52
53 pub fn agents(&self) -> &Vec<AgentSpec> {
54 &self.agents
55 }
56
57 pub fn add_agent(&mut self, agent: AgentSpec) {
58 self.agents.push(agent);
59 }
60
61 pub fn remove_agent(&mut self, agent_id: &str) {
62 self.agents.retain(|agent| agent.id != agent_id);
63 }
64
65 pub fn set_agents(&mut self, agents: Vec<AgentSpec>) {
66 self.agents = agents;
67 }
68
69 pub fn channels(&self) -> &Vec<ChannelSpec> {
70 &self.channels
71 }
72
73 pub fn add_channels(&mut self, channel: ChannelSpec) {
74 self.channels.push(channel);
75 }
76
77 pub fn remove_channel(&mut self, channel_id: &str) -> Option<ChannelSpec> {
78 if let Some(channel) = self
79 .channels
80 .iter()
81 .find(|channel| channel.id == channel_id)
82 .cloned()
83 {
84 self.channels.retain(|e| e.id != channel_id);
85 Some(channel)
86 } else {
87 None
88 }
89 }
90
91 pub fn set_channels(&mut self, channels: Vec<ChannelSpec>) {
92 self.channels = channels;
93 }
94
95 pub async fn start(&self, askit: &ASKit) -> Result<(), AgentError> {
96 for agent in self.agents.iter() {
97 if !agent.enabled {
98 continue;
99 }
100 askit.start_agent(&agent.id).await.unwrap_or_else(|e| {
101 log::error!("Failed to start agent {}: {}", agent.id, e);
102 });
103 }
104 Ok(())
105 }
106
107 pub async fn stop(&self, askit: &ASKit) -> Result<(), AgentError> {
108 for agent in self.agents.iter() {
109 if !agent.enabled {
110 continue;
111 }
112 askit.stop_agent(&agent.id).await.unwrap_or_else(|e| {
113 log::error!("Failed to stop agent {}: {}", agent.id, e);
114 });
115 }
116 Ok(())
117 }
118
119 pub fn disable_all_nodes(&mut self) {
120 for node in self.agents.iter_mut() {
121 node.enabled = false;
122 }
123 }
124
125 pub fn to_json(&self) -> Result<String, AgentError> {
126 let json = serde_json::to_string_pretty(self)
127 .map_err(|e| AgentError::SerializationError(e.to_string()))?;
128 Ok(json)
129 }
130
131 pub fn from_json(json_str: &str) -> Result<Self, AgentError> {
132 let mut stream: AgentStream = serde_json::from_str(json_str)
133 .map_err(|e| AgentError::SerializationError(e.to_string()))?;
134 stream.id = new_id();
135 Ok(stream)
136 }
137}
138
139pub fn copy_sub_stream(
140 agents: &Vec<AgentSpec>,
141 channels: &Vec<ChannelSpec>,
142) -> (Vec<AgentSpec>, Vec<ChannelSpec>) {
143 let mut new_agents = Vec::new();
144 let mut agent_id_map = FnvIndexMap::default();
145 for agent in agents {
146 let new_id = new_id();
147 agent_id_map.insert(agent.id.clone(), new_id.clone());
148 let mut new_agent = agent.clone();
149 new_agent.id = new_id;
150 new_agents.push(new_agent);
151 }
152
153 let mut new_channels = Vec::new();
154 for channel in channels {
155 let Some(source) = agent_id_map.get(&channel.source) else {
156 continue;
157 };
158 let Some(target) = agent_id_map.get(&channel.target) else {
159 continue;
160 };
161 let mut new_channel = channel.clone();
162 new_channel.id = new_id();
163 new_channel.source = source.clone();
164 new_channel.target = target.clone();
165 new_channels.push(new_channel);
166 }
167
168 (new_agents, new_channels)
169}
170
171#[derive(Debug, Clone, Serialize, Deserialize)]
173pub struct AgentSpec {
174 #[serde(skip_serializing_if = "String::is_empty", default)]
175 pub id: String,
176
177 #[serde(skip_serializing_if = "String::is_empty", default)]
179 pub def_name: String,
180
181 #[serde(skip_serializing_if = "Option::is_none", default)]
183 pub inputs: Option<Vec<String>>,
184
185 #[serde(skip_serializing_if = "Option::is_none", default)]
187 pub outputs: Option<Vec<String>>,
188
189 #[serde(skip_serializing_if = "Option::is_none")]
191 pub configs: Option<AgentConfigs>,
192
193 #[serde(skip_serializing_if = "Option::is_none")]
195 pub config_specs: Option<AgentConfigSpecs>,
196
197 #[serde(default, skip_serializing_if = "<&bool>::not")]
198 pub enabled: bool,
199
200 #[serde(flatten)]
201 pub extensions: FnvIndexMap<String, serde_json::Value>,
202}
203
204impl AgentSpec {
205 pub fn from_def(def: &AgentDefinition) -> Self {
206 let mut spec = def.to_spec();
207 spec.id = new_id();
208 spec
209 }
210}
211
212static ID_COUNTER: AtomicUsize = AtomicUsize::new(1);
213
214fn new_id() -> String {
215 return ID_COUNTER
216 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
217 .to_string();
218}
219
220#[derive(Debug, Default, Serialize, Deserialize, Clone)]
223pub struct ChannelSpec {
224 pub id: String,
225 pub source: String,
226 pub source_handle: String,
227 pub target: String,
228 pub target_handle: String,
229}