1use std::any::Any;
2
3use async_trait::async_trait;
4
5use super::askit::ASKit;
6use super::config::AgentConfigs;
7use super::context::AgentContext;
8use super::error::AgentError;
9use super::runtime::runtime;
10use super::value::AgentValue;
11
/// Lifecycle state of an agent.
///
/// The blanket [`Agent`] implementation in this file sets `Init` on construction,
/// `Start` when `start` is invoked, and `Stop` while `stop` runs (returning to `Init`
/// once the stop hook has succeeded).
///
/// The enum is fieldless, so it is `Copy`, `Eq` and `Hash` in addition to the
/// comparison by `PartialEq`; callers may compare, copy and use it as a map key freely.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AgentStatus {
    /// Initial state; also the [`Default`] value.
    #[default]
    Init,
    /// The agent has been asked to start.
    Start,
    /// The agent has been asked to stop.
    Stop,
}
19
20pub enum AgentMessage {
21 Input {
22 ctx: AgentContext,
23 pin: String,
24 value: AgentValue,
25 },
26 Config {
27 configs: AgentConfigs,
28 },
29 Stop,
30}
31
32#[async_trait]
33pub trait Agent: Send + Sync + 'static {
34 fn new(
35 askit: ASKit,
36 id: String,
37 def_name: String,
38 configs: Option<AgentConfigs>,
39 ) -> Result<Self, AgentError>
40 where
41 Self: Sized;
42
43 fn askit(&self) -> &ASKit;
44
45 fn id(&self) -> &str;
46
47 fn status(&self) -> &AgentStatus;
48
49 fn def_name(&self) -> &str;
50
51 fn configs(&self) -> Result<&AgentConfigs, AgentError>;
52
53 fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError>;
54
55 fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError>;
56
57 fn get_global_configs(&self) -> Option<AgentConfigs> {
58 self.askit().get_global_configs(self.def_name())
59 }
60
61 fn flow_id(&self) -> &str;
62
63 fn set_flow_id(&mut self, flow_id: String);
64
65 async fn start(&mut self) -> Result<(), AgentError>;
66
67 async fn stop(&mut self) -> Result<(), AgentError>;
68
69 async fn process(
70 &mut self,
71 ctx: AgentContext,
72 pin: String,
73 value: AgentValue,
74 ) -> Result<(), AgentError>;
75
76 fn runtime(&self) -> &tokio::runtime::Runtime {
77 runtime()
78 }
79
80 fn as_any(&self) -> &dyn Any;
81
82 fn as_any_mut(&mut self) -> &mut dyn Any;
83}
84
85impl dyn Agent {
86 pub fn as_agent<T: Agent>(&self) -> Option<&T> {
87 self.as_any().downcast_ref::<T>()
88 }
89
90 pub fn as_agent_mut<T: Agent>(&mut self) -> Option<&mut T> {
91 self.as_any_mut().downcast_mut::<T>()
92 }
93}
94
95pub struct AgentData {
96 pub askit: ASKit,
97
98 pub id: String,
99 pub status: AgentStatus,
100 pub def_name: String,
101 pub flow_id: String,
102 pub configs: Option<AgentConfigs>,
103}
104
105impl AgentData {
106 pub fn new(askit: ASKit, id: String, def_name: String, configs: Option<AgentConfigs>) -> Self {
107 Self {
108 askit,
109 id,
110 status: AgentStatus::Init,
111 def_name,
112 flow_id: String::new(),
113 configs,
114 }
115 }
116}
117
118pub trait HasAgentData {
119 fn data(&self) -> &AgentData;
120
121 fn mut_data(&mut self) -> &mut AgentData;
122}
123
124#[async_trait]
125pub trait AsAgent: HasAgentData + Send + Sync + 'static {
126 fn new(
127 askit: ASKit,
128 id: String,
129 def_name: String,
130 configs: Option<AgentConfigs>,
131 ) -> Result<Self, AgentError>
132 where
133 Self: Sized;
134
135 fn configs_changed(&mut self) -> Result<(), AgentError> {
136 Ok(())
137 }
138
139 async fn start(&mut self) -> Result<(), AgentError> {
140 Ok(())
141 }
142
143 async fn stop(&mut self) -> Result<(), AgentError> {
144 Ok(())
145 }
146
147 async fn process(
148 &mut self,
149 _ctx: AgentContext,
150 _pin: String,
151 _value: AgentValue,
152 ) -> Result<(), AgentError> {
153 Ok(())
154 }
155}
156
157#[async_trait]
158impl<T: AsAgent> Agent for T {
159 fn new(
160 askit: ASKit,
161 id: String,
162 def_name: String,
163 configs: Option<AgentConfigs>,
164 ) -> Result<Self, AgentError> {
165 let mut agent = T::new(askit, id, def_name, configs)?;
166 agent.mut_data().status = AgentStatus::Init;
167 Ok(agent)
168 }
169
170 fn askit(&self) -> &ASKit {
171 &self.data().askit
172 }
173
174 fn id(&self) -> &str {
175 &self.data().id
176 }
177
178 fn status(&self) -> &AgentStatus {
179 &self.data().status
180 }
181
182 fn def_name(&self) -> &str {
183 self.data().def_name.as_str()
184 }
185
186 fn configs(&self) -> Result<&AgentConfigs, AgentError> {
187 self.data().configs.as_ref().ok_or(AgentError::NoConfig)
188 }
189
190 fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError> {
191 if let Some(configs) = &mut self.mut_data().configs {
192 configs.set(key.clone(), value.clone());
193 self.configs_changed()?;
194 }
195 Ok(())
196 }
197
198 fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError> {
199 self.mut_data().configs = Some(configs);
200 self.configs_changed()
201 }
202
203 fn flow_id(&self) -> &str {
204 &self.data().flow_id
205 }
206
207 fn set_flow_id(&mut self, flow_id: String) {
208 self.mut_data().flow_id = flow_id.clone();
209 }
210
211 async fn start(&mut self) -> Result<(), AgentError> {
212 self.mut_data().status = AgentStatus::Start;
213
214 if let Err(e) = self.start().await {
215 self.askit()
216 .emit_agent_error(self.id().to_string(), e.to_string());
217 return Err(e);
218 }
219
220 Ok(())
221 }
222
223 async fn stop(&mut self) -> Result<(), AgentError> {
224 self.mut_data().status = AgentStatus::Stop;
225 self.stop().await?;
226 self.mut_data().status = AgentStatus::Init;
227 Ok(())
228 }
229
230 async fn process(
231 &mut self,
232 ctx: AgentContext,
233 pin: String,
234 value: AgentValue,
235 ) -> Result<(), AgentError> {
236 if let Err(e) = self.process(ctx, pin, value).await {
237 self.askit()
238 .emit_agent_error(self.id().to_string(), e.to_string());
239 return Err(e);
240 }
241 Ok(())
242 }
243
244 fn get_global_configs(&self) -> Option<AgentConfigs> {
245 self.askit().get_global_configs(self.def_name())
246 }
247
248 fn as_any(&self) -> &dyn Any {
249 self
250 }
251
252 fn as_any_mut(&mut self) -> &mut dyn Any {
253 self
254 }
255}
256
257pub fn new_agent_boxed<T: Agent>(
258 askit: ASKit,
259 id: String,
260 def_name: String,
261 configs: Option<AgentConfigs>,
262) -> Result<Box<dyn Agent>, AgentError> {
263 Ok(Box::new(T::new(askit, id, def_name, configs)?))
264}
265
266pub fn agent_new(
267 askit: ASKit,
268 agent_id: String,
269 def_name: &str,
270 configs: Option<AgentConfigs>,
271) -> Result<Box<dyn Agent>, AgentError> {
272 let def;
273 {
274 let defs = askit.defs.lock().unwrap();
275 def = defs
276 .get(def_name)
277 .ok_or_else(|| AgentError::UnknownDefName(def_name.to_string()))?
278 .clone();
279 }
280
281 let default_config = def.default_configs.clone();
282 let configs = match (default_config, configs) {
283 (Some(def_cfg), Some(mut cfg)) => {
284 for (k, v) in def_cfg.iter() {
285 if !cfg.contains_key(k) {
286 cfg.set(k.clone(), v.value.clone());
287 }
288 }
289 Some(cfg)
290 }
291 (Some(def_cfg), None) => {
292 let mut cfg = AgentConfigs::default();
293 for (k, v) in def_cfg.iter() {
294 cfg.set(k.clone(), v.value.clone());
295 }
296 Some(cfg)
297 }
298 (None, Some(cfg)) => Some(cfg),
299 (None, None) => None,
300 };
301
302 if let Some(new_boxed) = def.new_boxed {
303 return new_boxed(askit, agent_id, def_name.to_string(), configs);
304 }
305
306 match def.kind.as_str() {
307 _ => return Err(AgentError::UnknownDefKind(def.kind.to_string()).into()),
316 }
317}