1use async_trait::async_trait;
2
3use crate::AgentValue;
4
5use super::askit::ASKit;
6use super::config::AgentConfigs;
7use super::context::AgentContext;
8use super::data::AgentData;
9use super::error::AgentError;
10use super::runtime::runtime;
11
/// Lifecycle state of an agent.
///
/// The blanket `Agent` implementation in this file drives the transitions:
/// an agent is `Init` after construction and after a successful stop, `Start`
/// once `Agent::start` has been called, and `Stop` while `Agent::stop` is
/// running the agent's own stop hook.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub enum AgentStatus {
    /// Initial state; also the `Default` value.
    #[default]
    Init,
    /// Set by `Agent::start` before the agent's own start hook runs.
    Start,
    /// Set by `Agent::stop` before the agent's own stop hook runs.
    Stop,
}
19
20pub enum AgentMessage {
21 Input {
22 ctx: AgentContext,
23 pin: String,
24 data: AgentData,
25 },
26 Config {
27 configs: AgentConfigs,
28 },
29 Stop,
30}
31
32#[async_trait]
33pub trait Agent {
34 fn new(
35 askit: ASKit,
36 id: String,
37 def_name: String,
38 configs: Option<AgentConfigs>,
39 ) -> Result<Self, AgentError>
40 where
41 Self: Sized;
42
43 fn askit(&self) -> &ASKit;
44
45 fn id(&self) -> &str;
46
47 fn status(&self) -> &AgentStatus;
48
49 fn def_name(&self) -> &str;
50
51 fn configs(&self) -> Result<&AgentConfigs, AgentError>;
52
53 fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError>;
54
55 fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError>;
56
57 fn get_global_configs(&self) -> Option<AgentConfigs> {
58 self.askit().get_global_configs(self.def_name())
59 }
60
61 fn flow_name(&self) -> &str;
62
63 fn set_flow_name(&mut self, flow_name: String);
64
65 fn start(&mut self) -> Result<(), AgentError>;
66
67 fn stop(&mut self) -> Result<(), AgentError>;
68
69 async fn process(
70 &mut self,
71 ctx: AgentContext,
72 pin: String,
73 data: AgentData,
74 ) -> Result<(), AgentError>;
75
76 fn runtime(&self) -> &tokio::runtime::Runtime {
77 runtime()
78 }
79}
80
81pub struct AsAgentData {
82 pub askit: ASKit,
83
84 pub id: String,
85 pub status: AgentStatus,
86 pub def_name: String,
87 pub flow_name: String,
88 pub configs: Option<AgentConfigs>,
89}
90
91impl AsAgentData {
92 pub fn new(askit: ASKit, id: String, def_name: String, configs: Option<AgentConfigs>) -> Self {
93 Self {
94 askit,
95 id,
96 status: AgentStatus::Init,
97 def_name,
98 flow_name: String::new(),
99 configs,
100 }
101 }
102}
103
104#[async_trait]
105pub trait AsAgent {
106 fn new(
107 askit: ASKit,
108 id: String,
109 def_name: String,
110 configs: Option<AgentConfigs>,
111 ) -> Result<Self, AgentError>
112 where
113 Self: Sized + Send + Sync;
114
115 fn data(&self) -> &AsAgentData;
116
117 fn mut_data(&mut self) -> &mut AsAgentData;
118
119 fn configs_changed(&mut self) -> Result<(), AgentError> {
120 Ok(())
121 }
122
123 fn start(&mut self) -> Result<(), AgentError> {
124 Ok(())
125 }
126
127 fn stop(&mut self) -> Result<(), AgentError> {
128 Ok(())
129 }
130
131 async fn process(
132 &mut self,
133 _ctx: AgentContext,
134 _pin: String,
135 _data: AgentData,
136 ) -> Result<(), AgentError> {
137 Ok(())
138 }
139}
140
141#[async_trait]
142impl<T: AsAgent + Send + Sync> Agent for T {
143 fn new(
144 askit: ASKit,
145 id: String,
146 def_name: String,
147 configs: Option<AgentConfigs>,
148 ) -> Result<Self, AgentError> {
149 let mut agent = T::new(askit, id, def_name, configs)?;
150 agent.mut_data().status = AgentStatus::Init;
151 Ok(agent)
152 }
153
154 fn askit(&self) -> &ASKit {
155 &self.data().askit
156 }
157
158 fn id(&self) -> &str {
159 &self.data().id
160 }
161
162 fn status(&self) -> &AgentStatus {
163 &self.data().status
164 }
165
166 fn def_name(&self) -> &str {
167 self.data().def_name.as_str()
168 }
169
170 fn configs(&self) -> Result<&AgentConfigs, AgentError> {
171 self.data().configs.as_ref().ok_or(AgentError::NoConfig)
172 }
173
174 fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError> {
175 if let Some(configs) = &mut self.mut_data().configs {
176 configs.set(key.clone(), value.clone());
177 self.configs_changed()?;
178 }
179 Ok(())
180 }
181
182 fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError> {
183 self.mut_data().configs = Some(configs);
184 self.configs_changed()
185 }
186
187 fn flow_name(&self) -> &str {
188 &self.data().flow_name
189 }
190
191 fn set_flow_name(&mut self, flow_name: String) {
192 self.mut_data().flow_name = flow_name.clone();
193 }
194
195 fn start(&mut self) -> Result<(), AgentError> {
196 self.mut_data().status = AgentStatus::Start;
197
198 if let Err(e) = self.start() {
199 self.askit()
200 .emit_agent_error(self.id().to_string(), e.to_string());
201 return Err(e);
202 }
203
204 Ok(())
205 }
206
207 fn stop(&mut self) -> Result<(), AgentError> {
208 self.mut_data().status = AgentStatus::Stop;
209 self.stop()?;
210 self.mut_data().status = AgentStatus::Init;
211 Ok(())
212 }
213
214 async fn process(
215 &mut self,
216 ctx: AgentContext,
217 pin: String,
218 data: AgentData,
219 ) -> Result<(), AgentError> {
220 if let Err(e) = self.process(ctx, pin, data).await {
221 self.askit()
222 .emit_agent_error(self.id().to_string(), e.to_string());
223 return Err(e);
224 }
225 Ok(())
226 }
227
228 fn get_global_configs(&self) -> Option<AgentConfigs> {
229 self.askit().get_global_configs(self.def_name())
230 }
231}
232
233pub fn new_agent_boxed<T: Agent + Send + Sync + 'static>(
234 askit: ASKit,
235 id: String,
236 def_name: String,
237 configs: Option<AgentConfigs>,
238) -> Result<Box<dyn Agent + Send + Sync>, AgentError> {
239 Ok(Box::new(T::new(askit, id, def_name, configs)?))
240}
241
242pub fn agent_new(
243 askit: ASKit,
244 agent_id: String,
245 def_name: &str,
246 configs: Option<AgentConfigs>,
247) -> Result<Box<dyn Agent + Send + Sync>, AgentError> {
248 let def;
249 {
250 let defs = askit.defs.lock().unwrap();
251 def = defs
252 .get(def_name)
253 .ok_or_else(|| AgentError::UnknownDefName(def_name.to_string()))?
254 .clone();
255 }
256
257 let default_config = def.default_configs.clone();
258 let configs = match (default_config, configs) {
259 (Some(def_cfg), Some(mut cfg)) => {
260 for (k, v) in def_cfg.iter() {
261 if !cfg.contains_key(k) {
262 cfg.set(k.clone(), v.value.clone());
263 }
264 }
265 Some(cfg)
266 }
267 (Some(def_cfg), None) => {
268 let mut cfg = AgentConfigs::default();
269 for (k, v) in def_cfg.iter() {
270 cfg.set(k.clone(), v.value.clone());
271 }
272 Some(cfg)
273 }
274 (None, Some(cfg)) => Some(cfg),
275 (None, None) => None,
276 };
277
278 if let Some(new_boxed) = def.new_boxed {
279 return new_boxed(askit, agent_id, def_name.to_string(), configs);
280 }
281
282 match def.kind.as_str() {
283 _ => return Err(AgentError::UnknownDefKind(def.kind.to_string()).into()),
292 }
293}