1use std::any::Any;
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5
6use crate::askit::ASKit;
7use crate::config::AgentConfigs;
8use crate::context::AgentContext;
9use crate::definition::{AgentConfigSpecs, AgentDisplayConfigSpecs};
10use crate::error::AgentError;
11use crate::runtime::runtime;
12use crate::value::AgentValue;
13
14#[derive(Debug, Default, Clone, PartialEq)]
15pub enum AgentStatus {
16 #[default]
17 Init,
18 Start,
19 Stop,
20}
21
22pub enum AgentMessage {
23 Input {
24 ctx: AgentContext,
25 pin: String,
26 value: AgentValue,
27 },
28 Config {
29 configs: AgentConfigs,
30 },
31 Stop,
32}
33
34#[async_trait]
36pub trait Agent: Send + Sync + 'static {
37 fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError>
38 where
39 Self: Sized;
40
41 fn askit(&self) -> &ASKit;
42
43 fn id(&self) -> &str;
44
45 fn status(&self) -> &AgentStatus;
46
47 fn spec(&self) -> &AgentSpec;
48
49 fn def_name(&self) -> &str;
50
51 fn configs(&self) -> Result<&AgentConfigs, AgentError>;
52
53 fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError>;
54
55 fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError>;
56
57 fn get_global_configs(&self) -> Option<AgentConfigs> {
58 self.askit().get_global_configs(self.def_name())
59 }
60
61 fn flow_id(&self) -> &str;
62
63 fn set_flow_id(&mut self, flow_id: String);
64
65 async fn start(&mut self) -> Result<(), AgentError>;
66
67 async fn stop(&mut self) -> Result<(), AgentError>;
68
69 async fn process(
70 &mut self,
71 ctx: AgentContext,
72 pin: String,
73 value: AgentValue,
74 ) -> Result<(), AgentError>;
75
76 fn runtime(&self) -> &tokio::runtime::Runtime {
77 runtime()
78 }
79
80 fn as_any(&self) -> &dyn Any;
81
82 fn as_any_mut(&mut self) -> &mut dyn Any;
83}
84
85impl dyn Agent {
86 pub fn as_agent<T: Agent>(&self) -> Option<&T> {
87 self.as_any().downcast_ref::<T>()
88 }
89
90 pub fn as_agent_mut<T: Agent>(&mut self) -> Option<&mut T> {
91 self.as_any_mut().downcast_mut::<T>()
92 }
93}
94
95pub struct AgentData {
97 pub askit: ASKit,
99
100 pub id: String,
102
103 pub spec: AgentSpec,
105
106 pub flow_id: String,
109
110 pub status: AgentStatus,
112}
113
114impl AgentData {
115 pub fn new(askit: ASKit, id: String, spec: AgentSpec) -> Self {
116 Self {
117 askit,
118 id,
119 spec,
120 flow_id: String::new(),
121 status: AgentStatus::Init,
122 }
123 }
124}
125
126#[derive(Debug, Clone, Serialize, Deserialize)]
128pub struct AgentSpec {
129 pub def_name: String,
131
132 #[serde(skip_serializing_if = "Option::is_none", default)]
134 pub inputs: Option<Vec<String>>,
135
136 #[serde(skip_serializing_if = "Option::is_none", default)]
138 pub outputs: Option<Vec<String>>,
139
140 #[serde(skip_serializing_if = "Option::is_none")]
142 pub configs: Option<AgentConfigs>,
143
144 #[serde(skip_serializing_if = "Option::is_none")]
146 pub config_specs: Option<AgentConfigSpecs>,
147
148 #[serde(skip_serializing_if = "Option::is_none")]
150 pub display_config_specs: Option<AgentDisplayConfigSpecs>,
151}
152
153pub trait HasAgentData {
154 fn data(&self) -> &AgentData;
155
156 fn mut_data(&mut self) -> &mut AgentData;
157}
158
159#[async_trait]
160pub trait AsAgent: HasAgentData + Send + Sync + 'static {
161 fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError>
162 where
163 Self: Sized;
164
165 fn configs_changed(&mut self) -> Result<(), AgentError> {
166 Ok(())
167 }
168
169 async fn start(&mut self) -> Result<(), AgentError> {
170 Ok(())
171 }
172
173 async fn stop(&mut self) -> Result<(), AgentError> {
174 Ok(())
175 }
176
177 async fn process(
178 &mut self,
179 _ctx: AgentContext,
180 _pin: String,
181 _value: AgentValue,
182 ) -> Result<(), AgentError> {
183 Ok(())
184 }
185}
186
187#[async_trait]
188impl<T: AsAgent> Agent for T {
189 fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError> {
190 let mut agent = T::new(askit, id, spec)?;
191 agent.mut_data().status = AgentStatus::Init;
192 Ok(agent)
193 }
194
195 fn askit(&self) -> &ASKit {
196 &self.data().askit
197 }
198
199 fn id(&self) -> &str {
200 &self.data().id
201 }
202
203 fn spec(&self) -> &AgentSpec {
204 &self.data().spec
205 }
206
207 fn status(&self) -> &AgentStatus {
208 &self.data().status
209 }
210
211 fn def_name(&self) -> &str {
212 self.data().spec.def_name.as_str()
213 }
214
215 fn configs(&self) -> Result<&AgentConfigs, AgentError> {
216 self.data()
217 .spec
218 .configs
219 .as_ref()
220 .ok_or(AgentError::NoConfig)
221 }
222
223 fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError> {
224 if let Some(configs) = &mut self.mut_data().spec.configs {
225 configs.set(key.clone(), value.clone());
226 self.configs_changed()?;
227 }
228 Ok(())
229 }
230
231 fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError> {
232 self.mut_data().spec.configs = Some(configs);
233 self.configs_changed()
234 }
235
236 fn flow_id(&self) -> &str {
237 &self.data().flow_id
238 }
239
240 fn set_flow_id(&mut self, flow_id: String) {
241 self.mut_data().flow_id = flow_id.clone();
242 }
243
244 async fn start(&mut self) -> Result<(), AgentError> {
245 self.mut_data().status = AgentStatus::Start;
246
247 if let Err(e) = self.start().await {
248 self.askit()
249 .emit_agent_error(self.id().to_string(), e.to_string());
250 return Err(e);
251 }
252
253 Ok(())
254 }
255
256 async fn stop(&mut self) -> Result<(), AgentError> {
257 self.mut_data().status = AgentStatus::Stop;
258 self.stop().await?;
259 self.mut_data().status = AgentStatus::Init;
260 Ok(())
261 }
262
263 async fn process(
264 &mut self,
265 ctx: AgentContext,
266 pin: String,
267 value: AgentValue,
268 ) -> Result<(), AgentError> {
269 if let Err(e) = self.process(ctx, pin, value).await {
270 self.askit()
271 .emit_agent_error(self.id().to_string(), e.to_string());
272 return Err(e);
273 }
274 Ok(())
275 }
276
277 fn get_global_configs(&self) -> Option<AgentConfigs> {
278 self.askit().get_global_configs(self.def_name())
279 }
280
281 fn as_any(&self) -> &dyn Any {
282 self
283 }
284
285 fn as_any_mut(&mut self) -> &mut dyn Any {
286 self
287 }
288}
289
290pub fn new_agent_boxed<T: Agent>(
291 askit: ASKit,
292 id: String,
293 spec: AgentSpec,
294) -> Result<Box<dyn Agent>, AgentError> {
295 Ok(Box::new(T::new(askit, id, spec)?))
296}
297
298pub fn agent_new(
299 askit: ASKit,
300 agent_id: String,
301 spec: AgentSpec,
302) -> Result<Box<dyn Agent>, AgentError> {
303 let def;
304 {
305 let def_name = &spec.def_name;
306 let defs = askit.defs.lock().unwrap();
307 def = defs
308 .get(def_name)
309 .ok_or_else(|| AgentError::UnknownDefName(def_name.to_string()))?
310 .clone();
311 }
312
313 if let Some(new_boxed) = def.new_boxed {
314 return new_boxed(askit, agent_id, spec);
315 }
316
317 match def.kind.as_str() {
318 _ => return Err(AgentError::UnknownDefKind(def.kind.to_string()).into()),
327 }
328}