agent_stream_kit/
spec.rs

1use std::ops::Not;
2use std::sync::atomic::AtomicUsize;
3
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6
7use crate::FnvIndexMap;
8use crate::askit::ASKit;
9use crate::config::AgentConfigs;
10use crate::definition::{AgentConfigSpecs, AgentDefinition};
11use crate::error::AgentError;
12
13pub type AgentStreams = FnvIndexMap<String, AgentStream>;
14
15#[derive(Clone, Debug, Serialize, Deserialize)]
16pub struct AgentStream {
17    #[serde(skip_serializing_if = "String::is_empty")]
18    id: String,
19
20    name: String,
21
22    agents: Vec<AgentSpec>,
23
24    channels: Vec<ChannelSpec>,
25
26    #[serde(flatten)]
27    pub extensions: FnvIndexMap<String, Value>,
28}
29
30impl AgentStream {
31    pub fn new(name: String) -> Self {
32        Self {
33            id: new_id(),
34            name,
35            agents: Vec::new(),
36            channels: Vec::new(),
37            extensions: FnvIndexMap::default(),
38        }
39    }
40
41    pub fn id(&self) -> &str {
42        &self.id
43    }
44
45    pub fn name(&self) -> &str {
46        &self.name
47    }
48
49    pub fn set_name(&mut self, new_name: String) {
50        self.name = new_name;
51    }
52
53    pub fn agents(&self) -> &Vec<AgentSpec> {
54        &self.agents
55    }
56
57    pub fn add_agent(&mut self, agent: AgentSpec) {
58        self.agents.push(agent);
59    }
60
61    pub fn remove_agent(&mut self, agent_id: &str) {
62        self.agents.retain(|agent| agent.id != agent_id);
63    }
64
65    pub fn set_agents(&mut self, agents: Vec<AgentSpec>) {
66        self.agents = agents;
67    }
68
69    pub fn channels(&self) -> &Vec<ChannelSpec> {
70        &self.channels
71    }
72
73    pub fn add_channels(&mut self, channel: ChannelSpec) {
74        self.channels.push(channel);
75    }
76
77    pub fn remove_channel(&mut self, channel_id: &str) -> Option<ChannelSpec> {
78        if let Some(channel) = self
79            .channels
80            .iter()
81            .find(|channel| channel.id == channel_id)
82            .cloned()
83        {
84            self.channels.retain(|e| e.id != channel_id);
85            Some(channel)
86        } else {
87            None
88        }
89    }
90
91    pub fn set_channels(&mut self, channels: Vec<ChannelSpec>) {
92        self.channels = channels;
93    }
94
95    pub async fn start(&self, askit: &ASKit) -> Result<(), AgentError> {
96        for agent in self.agents.iter() {
97            if !agent.enabled {
98                continue;
99            }
100            askit.start_agent(&agent.id).await.unwrap_or_else(|e| {
101                log::error!("Failed to start agent {}: {}", agent.id, e);
102            });
103        }
104        Ok(())
105    }
106
107    pub async fn stop(&self, askit: &ASKit) -> Result<(), AgentError> {
108        for agent in self.agents.iter() {
109            if !agent.enabled {
110                continue;
111            }
112            askit.stop_agent(&agent.id).await.unwrap_or_else(|e| {
113                log::error!("Failed to stop agent {}: {}", agent.id, e);
114            });
115        }
116        Ok(())
117    }
118
119    pub fn disable_all_nodes(&mut self) {
120        for node in self.agents.iter_mut() {
121            node.enabled = false;
122        }
123    }
124
125    pub fn to_json(&self) -> Result<String, AgentError> {
126        let json = serde_json::to_string_pretty(self)
127            .map_err(|e| AgentError::SerializationError(e.to_string()))?;
128        Ok(json)
129    }
130
131    pub fn from_json(json_str: &str) -> Result<Self, AgentError> {
132        let mut stream: AgentStream = serde_json::from_str(json_str)
133            .map_err(|e| AgentError::SerializationError(e.to_string()))?;
134        stream.id = new_id();
135        Ok(stream)
136    }
137}
138
139pub fn copy_sub_stream(
140    agents: &Vec<AgentSpec>,
141    channels: &Vec<ChannelSpec>,
142) -> (Vec<AgentSpec>, Vec<ChannelSpec>) {
143    let mut new_agents = Vec::new();
144    let mut agent_id_map = FnvIndexMap::default();
145    for agent in agents {
146        let new_id = new_id();
147        agent_id_map.insert(agent.id.clone(), new_id.clone());
148        let mut new_agent = agent.clone();
149        new_agent.id = new_id;
150        new_agents.push(new_agent);
151    }
152
153    let mut new_channels = Vec::new();
154    for channel in channels {
155        let Some(source) = agent_id_map.get(&channel.source) else {
156            continue;
157        };
158        let Some(target) = agent_id_map.get(&channel.target) else {
159            continue;
160        };
161        let mut new_channel = channel.clone();
162        new_channel.id = new_id();
163        new_channel.source = source.clone();
164        new_channel.target = target.clone();
165        new_channels.push(new_channel);
166    }
167
168    (new_agents, new_channels)
169}
170
171/// Information held by each agent.
172#[derive(Debug, Clone, Serialize, Deserialize)]
173pub struct AgentSpec {
174    #[serde(skip_serializing_if = "String::is_empty", default)]
175    pub id: String,
176
177    /// Name of the AgentDefinition.
178    #[serde(skip_serializing_if = "String::is_empty", default)]
179    pub def_name: String,
180
181    /// List of input pin names.
182    #[serde(skip_serializing_if = "Option::is_none", default)]
183    pub inputs: Option<Vec<String>>,
184
185    /// List of output pin names.
186    #[serde(skip_serializing_if = "Option::is_none", default)]
187    pub outputs: Option<Vec<String>>,
188
189    /// Config values.
190    #[serde(skip_serializing_if = "Option::is_none")]
191    pub configs: Option<AgentConfigs>,
192
193    /// Config specs.
194    #[serde(skip_serializing_if = "Option::is_none")]
195    pub config_specs: Option<AgentConfigSpecs>,
196
197    #[serde(default, skip_serializing_if = "<&bool>::not")]
198    pub enabled: bool,
199
200    #[serde(flatten)]
201    pub extensions: FnvIndexMap<String, serde_json::Value>,
202}
203
204impl AgentSpec {
205    pub fn from_def(def: &AgentDefinition) -> Self {
206        let mut spec = def.to_spec();
207        spec.id = new_id();
208        spec
209    }
210}
211
/// Process-wide counter backing `new_id`; the first id handed out is `1`.
static ID_COUNTER: AtomicUsize = AtomicUsize::new(1);

/// Returns the next id as a decimal string.
///
/// Each call atomically increments `ID_COUNTER`, so ids are distinct within
/// one process (until the counter wraps around). The counter restarts at `1`
/// on every run, so ids are not unique across runs.
fn new_id() -> String {
    // Relaxed is enough here: only the atomicity of the increment matters,
    // not its ordering relative to other memory operations.
    ID_COUNTER
        .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
        .to_string()
}
219
220// ChannelSpec
221
222#[derive(Debug, Default, Serialize, Deserialize, Clone)]
223pub struct ChannelSpec {
224    pub id: String,
225    pub source: String,
226    pub source_handle: String,
227    pub target: String,
228    pub target_handle: String,
229}