1use std::collections::BTreeMap;
2
3use async_trait::async_trait;
4
5use super::askit::ASKit;
6use super::config::AgentConfigs;
7use super::context::AgentContext;
8use super::error::AgentError;
9use super::runtime::runtime;
10use super::value::AgentValue;
11
12#[derive(Debug, Default, Clone, PartialEq)]
13pub enum AgentStatus {
14 #[default]
15 Init,
16 Start,
17 Stop,
18}
19
20pub enum AgentMessage {
21 Input {
22 ctx: AgentContext,
23 pin: String,
24 value: AgentValue,
25 },
26 Config {
27 configs: AgentConfigs,
28 },
29 Stop,
30}
31
32pub struct Pin {
33 pub name: String,
34 pub value: AgentValue,
35}
36
37#[async_trait]
38pub trait Agent {
39 fn new(
40 askit: ASKit,
41 id: String,
42 def_name: String,
43 configs: Option<AgentConfigs>,
44 ) -> Result<Self, AgentError>
45 where
46 Self: Sized;
47
48 fn askit(&self) -> &ASKit;
49
50 fn id(&self) -> &str;
51
52 fn status(&self) -> &AgentStatus;
53
54 fn def_name(&self) -> &str;
55
56 fn out_pin(&self, name: &str) -> Option<&Pin>;
57
58 fn set_out_pin(&mut self, name: String, value: AgentValue);
59
60 fn configs(&self) -> Result<&AgentConfigs, AgentError>;
61
62 fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError>;
63
64 fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError>;
65
66 fn get_global_configs(&self) -> Option<AgentConfigs> {
67 self.askit().get_global_configs(self.def_name())
68 }
69
70 fn flow_name(&self) -> &str;
71
72 fn set_flow_name(&mut self, flow_name: String);
73
74 async fn start(&mut self) -> Result<(), AgentError>;
75
76 async fn stop(&mut self) -> Result<(), AgentError>;
77
78 async fn process(
79 &mut self,
80 ctx: AgentContext,
81 pin: String,
82 value: AgentValue,
83 ) -> Result<(), AgentError>;
84
85 fn runtime(&self) -> &tokio::runtime::Runtime {
86 runtime()
87 }
88}
89
90pub struct AsAgentData {
91 pub askit: ASKit,
92
93 pub id: String,
94 pub status: AgentStatus,
95 pub def_name: String,
96 pub flow_name: String,
97 pub out_pins: Option<BTreeMap<String, Pin>>,
98 pub configs: Option<AgentConfigs>,
99}
100
101impl AsAgentData {
102 pub fn new(askit: ASKit, id: String, def_name: String, configs: Option<AgentConfigs>) -> Self {
103 Self {
104 askit,
105 id,
106 status: AgentStatus::Init,
107 def_name,
108 flow_name: String::new(),
109 out_pins: None,
110 configs,
111 }
112 }
113}
114
115#[async_trait]
116pub trait AsAgent {
117 fn new(
118 askit: ASKit,
119 id: String,
120 def_name: String,
121 configs: Option<AgentConfigs>,
122 ) -> Result<Self, AgentError>
123 where
124 Self: Sized + Send + Sync;
125
126 fn data(&self) -> &AsAgentData;
127
128 fn mut_data(&mut self) -> &mut AsAgentData;
129
130 fn configs_changed(&mut self) -> Result<(), AgentError> {
131 Ok(())
132 }
133
134 async fn start(&mut self) -> Result<(), AgentError> {
135 Ok(())
136 }
137
138 async fn stop(&mut self) -> Result<(), AgentError> {
139 Ok(())
140 }
141
142 async fn process(
143 &mut self,
144 _ctx: AgentContext,
145 _pin: String,
146 _value: AgentValue,
147 ) -> Result<(), AgentError> {
148 Ok(())
149 }
150}
151
152#[async_trait]
153impl<T: AsAgent + Send + Sync> Agent for T {
154 fn new(
155 askit: ASKit,
156 id: String,
157 def_name: String,
158 configs: Option<AgentConfigs>,
159 ) -> Result<Self, AgentError> {
160 let mut agent = T::new(askit, id, def_name, configs)?;
161 agent.mut_data().status = AgentStatus::Init;
162 Ok(agent)
163 }
164
165 fn askit(&self) -> &ASKit {
166 &self.data().askit
167 }
168
169 fn id(&self) -> &str {
170 &self.data().id
171 }
172
173 fn status(&self) -> &AgentStatus {
174 &self.data().status
175 }
176
177 fn def_name(&self) -> &str {
178 self.data().def_name.as_str()
179 }
180
181 fn out_pin(&self, name: &str) -> Option<&Pin> {
182 if let Some(out_pins) = &self.data().out_pins {
183 return out_pins.get(name);
184 }
185 None
186 }
187
188 fn set_out_pin(&mut self, name: String, value: AgentValue) {
189 if let Some(out_pins) = &mut self.mut_data().out_pins {
190 out_pins.insert(name.clone(), Pin { name, value });
191 } else {
192 let mut out_pins = BTreeMap::new();
193 out_pins.insert(name.clone(), Pin { name, value });
194 self.mut_data().out_pins = Some(out_pins);
195 }
196 }
197
198 fn configs(&self) -> Result<&AgentConfigs, AgentError> {
199 self.data().configs.as_ref().ok_or(AgentError::NoConfig)
200 }
201
202 fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError> {
203 if let Some(configs) = &mut self.mut_data().configs {
204 configs.set(key.clone(), value.clone());
205 self.configs_changed()?;
206 }
207 Ok(())
208 }
209
210 fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError> {
211 self.mut_data().configs = Some(configs);
212 self.configs_changed()
213 }
214
215 fn flow_name(&self) -> &str {
216 &self.data().flow_name
217 }
218
219 fn set_flow_name(&mut self, flow_name: String) {
220 self.mut_data().flow_name = flow_name.clone();
221 }
222
223 async fn start(&mut self) -> Result<(), AgentError> {
224 self.mut_data().status = AgentStatus::Start;
225
226 if let Err(e) = self.start().await {
227 self.askit()
228 .emit_agent_error(self.id().to_string(), e.to_string());
229 return Err(e);
230 }
231
232 Ok(())
233 }
234
235 async fn stop(&mut self) -> Result<(), AgentError> {
236 self.mut_data().status = AgentStatus::Stop;
237 self.stop().await?;
238 self.mut_data().status = AgentStatus::Init;
239 Ok(())
240 }
241
242 async fn process(
243 &mut self,
244 ctx: AgentContext,
245 pin: String,
246 value: AgentValue,
247 ) -> Result<(), AgentError> {
248 if let Err(e) = self.process(ctx, pin, value).await {
249 self.askit()
250 .emit_agent_error(self.id().to_string(), e.to_string());
251 return Err(e);
252 }
253 Ok(())
254 }
255
256 fn get_global_configs(&self) -> Option<AgentConfigs> {
257 self.askit().get_global_configs(self.def_name())
258 }
259}
260
261pub fn new_agent_boxed<T: Agent + Send + Sync + 'static>(
262 askit: ASKit,
263 id: String,
264 def_name: String,
265 configs: Option<AgentConfigs>,
266) -> Result<Box<dyn Agent + Send + Sync>, AgentError> {
267 Ok(Box::new(T::new(askit, id, def_name, configs)?))
268}
269
270pub fn agent_new(
271 askit: ASKit,
272 agent_id: String,
273 def_name: &str,
274 configs: Option<AgentConfigs>,
275) -> Result<Box<dyn Agent + Send + Sync>, AgentError> {
276 let def;
277 {
278 let defs = askit.defs.lock().unwrap();
279 def = defs
280 .get(def_name)
281 .ok_or_else(|| AgentError::UnknownDefName(def_name.to_string()))?
282 .clone();
283 }
284
285 let default_config = def.default_configs.clone();
286 let configs = match (default_config, configs) {
287 (Some(def_cfg), Some(mut cfg)) => {
288 for (k, v) in def_cfg.iter() {
289 if !cfg.contains_key(k) {
290 cfg.set(k.clone(), v.value.clone());
291 }
292 }
293 Some(cfg)
294 }
295 (Some(def_cfg), None) => {
296 let mut cfg = AgentConfigs::default();
297 for (k, v) in def_cfg.iter() {
298 cfg.set(k.clone(), v.value.clone());
299 }
300 Some(cfg)
301 }
302 (None, Some(cfg)) => Some(cfg),
303 (None, None) => None,
304 };
305
306 if let Some(new_boxed) = def.new_boxed {
307 return new_boxed(askit, agent_id, def_name.to_string(), configs);
308 }
309
310 match def.kind.as_str() {
311 _ => return Err(AgentError::UnknownDefKind(def.kind.to_string()).into()),
320 }
321}