1use std::collections::BTreeMap;
2
3use async_trait::async_trait;
4
5use super::askit::ASKit;
6use super::config::AgentConfigs;
7use super::context::AgentContext;
8use super::error::AgentError;
9use super::runtime::runtime;
10use super::value::AgentValue;
11
12#[derive(Debug, Default, Clone, PartialEq)]
13pub enum AgentStatus {
14 #[default]
15 Init,
16 Start,
17 Stop,
18}
19
20pub enum AgentMessage {
21 Input {
22 ctx: AgentContext,
23 pin: String,
24 value: AgentValue,
25 },
26 Config {
27 configs: AgentConfigs,
28 },
29 Stop,
30}
31
32pub struct Pin {
33 pub name: String,
34 pub value: AgentValue,
35}
36
37#[async_trait]
38pub trait Agent {
39 fn new(
40 askit: ASKit,
41 id: String,
42 def_name: String,
43 configs: Option<AgentConfigs>,
44 ) -> Result<Self, AgentError>
45 where
46 Self: Sized;
47
48 fn askit(&self) -> &ASKit;
49
50 fn id(&self) -> &str;
51
52 fn status(&self) -> &AgentStatus;
53
54 fn def_name(&self) -> &str;
55
56 fn out_pin(&self, name: &str) -> Option<&Pin>;
57
58 fn set_out_pin(&mut self, name: String, value: AgentValue);
59
60 fn configs(&self) -> Result<&AgentConfigs, AgentError>;
61
62 fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError>;
63
64 fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError>;
65
66 fn get_global_configs(&self) -> Option<AgentConfigs> {
67 self.askit().get_global_configs(self.def_name())
68 }
69
70 fn flow_name(&self) -> &str;
71
72 fn set_flow_name(&mut self, flow_name: String);
73
74 async fn start(&mut self) -> Result<(), AgentError>;
75
76 async fn stop(&mut self) -> Result<(), AgentError>;
77
78 async fn process(
79 &mut self,
80 ctx: AgentContext,
81 pin: String,
82 value: AgentValue,
83 ) -> Result<(), AgentError>;
84
85 fn runtime(&self) -> &tokio::runtime::Runtime {
86 runtime()
87 }
88}
89
90pub struct AsAgentData {
91 pub askit: ASKit,
92
93 pub id: String,
94 pub status: AgentStatus,
95 pub def_name: String,
96 pub flow_name: String,
97 pub out_pins: Option<BTreeMap<String, Pin>>,
98 pub configs: Option<AgentConfigs>,
99}
100
101impl AsAgentData {
102 pub fn new(askit: ASKit, id: String, def_name: String, configs: Option<AgentConfigs>) -> Self {
103 Self {
104 askit,
105 id,
106 status: AgentStatus::Init,
107 def_name,
108 flow_name: String::new(),
109 out_pins: None,
110 configs,
111 }
112 }
113}
114
115pub trait HasAgentData {
116 fn data(&self) -> &AsAgentData;
117
118 fn mut_data(&mut self) -> &mut AsAgentData;
119}
120
121#[async_trait]
122pub trait AsAgent: HasAgentData {
123 fn new(
124 askit: ASKit,
125 id: String,
126 def_name: String,
127 configs: Option<AgentConfigs>,
128 ) -> Result<Self, AgentError>
129 where
130 Self: Sized + Send + Sync;
131
132 fn configs_changed(&mut self) -> Result<(), AgentError> {
133 Ok(())
134 }
135
136 async fn start(&mut self) -> Result<(), AgentError> {
137 Ok(())
138 }
139
140 async fn stop(&mut self) -> Result<(), AgentError> {
141 Ok(())
142 }
143
144 async fn process(
145 &mut self,
146 _ctx: AgentContext,
147 _pin: String,
148 _value: AgentValue,
149 ) -> Result<(), AgentError> {
150 Ok(())
151 }
152}
153
154#[async_trait]
155impl<T: AsAgent + Send + Sync> Agent for T {
156 fn new(
157 askit: ASKit,
158 id: String,
159 def_name: String,
160 configs: Option<AgentConfigs>,
161 ) -> Result<Self, AgentError> {
162 let mut agent = T::new(askit, id, def_name, configs)?;
163 agent.mut_data().status = AgentStatus::Init;
164 Ok(agent)
165 }
166
167 fn askit(&self) -> &ASKit {
168 &self.data().askit
169 }
170
171 fn id(&self) -> &str {
172 &self.data().id
173 }
174
175 fn status(&self) -> &AgentStatus {
176 &self.data().status
177 }
178
179 fn def_name(&self) -> &str {
180 self.data().def_name.as_str()
181 }
182
183 fn out_pin(&self, name: &str) -> Option<&Pin> {
184 if let Some(out_pins) = &self.data().out_pins {
185 return out_pins.get(name);
186 }
187 None
188 }
189
190 fn set_out_pin(&mut self, name: String, value: AgentValue) {
191 if let Some(out_pins) = &mut self.mut_data().out_pins {
192 out_pins.insert(name.clone(), Pin { name, value });
193 } else {
194 let mut out_pins = BTreeMap::new();
195 out_pins.insert(name.clone(), Pin { name, value });
196 self.mut_data().out_pins = Some(out_pins);
197 }
198 }
199
200 fn configs(&self) -> Result<&AgentConfigs, AgentError> {
201 self.data().configs.as_ref().ok_or(AgentError::NoConfig)
202 }
203
204 fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError> {
205 if let Some(configs) = &mut self.mut_data().configs {
206 configs.set(key.clone(), value.clone());
207 self.configs_changed()?;
208 }
209 Ok(())
210 }
211
212 fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError> {
213 self.mut_data().configs = Some(configs);
214 self.configs_changed()
215 }
216
217 fn flow_name(&self) -> &str {
218 &self.data().flow_name
219 }
220
221 fn set_flow_name(&mut self, flow_name: String) {
222 self.mut_data().flow_name = flow_name.clone();
223 }
224
225 async fn start(&mut self) -> Result<(), AgentError> {
226 self.mut_data().status = AgentStatus::Start;
227
228 if let Err(e) = self.start().await {
229 self.askit()
230 .emit_agent_error(self.id().to_string(), e.to_string());
231 return Err(e);
232 }
233
234 Ok(())
235 }
236
237 async fn stop(&mut self) -> Result<(), AgentError> {
238 self.mut_data().status = AgentStatus::Stop;
239 self.stop().await?;
240 self.mut_data().status = AgentStatus::Init;
241 Ok(())
242 }
243
244 async fn process(
245 &mut self,
246 ctx: AgentContext,
247 pin: String,
248 value: AgentValue,
249 ) -> Result<(), AgentError> {
250 if let Err(e) = self.process(ctx, pin, value).await {
251 self.askit()
252 .emit_agent_error(self.id().to_string(), e.to_string());
253 return Err(e);
254 }
255 Ok(())
256 }
257
258 fn get_global_configs(&self) -> Option<AgentConfigs> {
259 self.askit().get_global_configs(self.def_name())
260 }
261}
262
263pub fn new_agent_boxed<T: Agent + Send + Sync + 'static>(
264 askit: ASKit,
265 id: String,
266 def_name: String,
267 configs: Option<AgentConfigs>,
268) -> Result<Box<dyn Agent + Send + Sync>, AgentError> {
269 Ok(Box::new(T::new(askit, id, def_name, configs)?))
270}
271
272pub fn agent_new(
273 askit: ASKit,
274 agent_id: String,
275 def_name: &str,
276 configs: Option<AgentConfigs>,
277) -> Result<Box<dyn Agent + Send + Sync>, AgentError> {
278 let def;
279 {
280 let defs = askit.defs.lock().unwrap();
281 def = defs
282 .get(def_name)
283 .ok_or_else(|| AgentError::UnknownDefName(def_name.to_string()))?
284 .clone();
285 }
286
287 let default_config = def.default_configs.clone();
288 let configs = match (default_config, configs) {
289 (Some(def_cfg), Some(mut cfg)) => {
290 for (k, v) in def_cfg.iter() {
291 if !cfg.contains_key(k) {
292 cfg.set(k.clone(), v.value.clone());
293 }
294 }
295 Some(cfg)
296 }
297 (Some(def_cfg), None) => {
298 let mut cfg = AgentConfigs::default();
299 for (k, v) in def_cfg.iter() {
300 cfg.set(k.clone(), v.value.clone());
301 }
302 Some(cfg)
303 }
304 (None, Some(cfg)) => Some(cfg),
305 (None, None) => None,
306 };
307
308 if let Some(new_boxed) = def.new_boxed {
309 return new_boxed(askit, agent_id, def_name.to_string(), configs);
310 }
311
312 match def.kind.as_str() {
313 _ => return Err(AgentError::UnknownDefKind(def.kind.to_string()).into()),
322 }
323}