1use std::any::Any;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use serde_json::Value;
6
7use crate::askit::ASKit;
8use crate::config::AgentConfigs;
9use crate::context::AgentContext;
10use crate::error::AgentError;
11use crate::runtime::runtime;
12use crate::spec::AgentSpec;
13use crate::value::AgentValue;
14
/// Lifecycle state of an agent.
///
/// As used by the blanket `Agent` implementation in this file: an agent is `Init` after
/// construction, is marked `Start` when `start` is invoked, is marked `Stop` while `stop` runs,
/// and goes back to `Init` once stopping has succeeded.
///
/// The enum is a plain tag, so it is `Copy` and `Eq` in addition to `Clone` and `PartialEq`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum AgentStatus {
    /// Initial (and default) state; also the state after a successful stop.
    #[default]
    Init,
    /// `start` has been requested.
    Start,
    /// `stop` has been requested and has not (yet) completed successfully.
    Stop,
}
22
23pub enum AgentMessage {
24 Input {
25 ctx: AgentContext,
26 pin: String,
27 value: AgentValue,
28 },
29 Config {
30 key: String,
31 value: AgentValue,
32 },
33 Configs {
34 configs: AgentConfigs,
35 },
36 Stop,
37}
38
39#[async_trait]
41pub trait Agent: Send + Sync + 'static {
42 fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError>
43 where
44 Self: Sized;
45
46 fn askit(&self) -> &ASKit;
47
48 fn id(&self) -> &str;
49
50 fn status(&self) -> &AgentStatus;
51
52 fn spec(&self) -> &AgentSpec;
53
54 fn update_spec(&mut self, spec_update: &Value) -> Result<(), AgentError>;
55
56 fn def_name(&self) -> &str;
57
58 fn configs(&self) -> Result<&AgentConfigs, AgentError>;
59
60 fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError>;
61
62 fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError>;
63
64 fn get_global_configs(&self) -> Option<AgentConfigs> {
65 self.askit().get_global_configs(self.def_name())
66 }
67
68 fn stream_id(&self) -> &str;
69
70 fn set_stream_id(&mut self, stream_id: String);
71
72 async fn start(&mut self) -> Result<(), AgentError>;
73
74 async fn stop(&mut self) -> Result<(), AgentError>;
75
76 async fn process(
77 &mut self,
78 ctx: AgentContext,
79 pin: String,
80 value: AgentValue,
81 ) -> Result<(), AgentError>;
82
83 fn runtime(&self) -> &tokio::runtime::Runtime {
84 runtime()
85 }
86
87 fn as_any(&self) -> &dyn Any;
88
89 fn as_any_mut(&mut self) -> &mut dyn Any;
90}
91
92impl dyn Agent {
93 pub fn as_agent<T: Agent>(&self) -> Option<&T> {
94 self.as_any().downcast_ref::<T>()
95 }
96
97 pub fn as_agent_mut<T: Agent>(&mut self) -> Option<&mut T> {
98 self.as_any_mut().downcast_mut::<T>()
99 }
100}
101
102pub struct AgentData {
104 pub askit: ASKit,
106
107 pub id: String,
109
110 pub spec: AgentSpec,
112
113 pub stream_id: String,
116
117 pub status: AgentStatus,
119}
120
121impl AgentData {
122 pub fn new(askit: ASKit, id: String, spec: AgentSpec) -> Self {
123 Self {
124 askit,
125 id,
126 spec,
127 stream_id: String::new(),
128 status: AgentStatus::Init,
129 }
130 }
131}
132
133pub trait HasAgentData {
134 fn data(&self) -> &AgentData;
135
136 fn mut_data(&mut self) -> &mut AgentData;
137}
138
139#[async_trait]
140pub trait AsAgent: HasAgentData + Send + Sync + 'static {
141 fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError>
142 where
143 Self: Sized;
144
145 fn configs_changed(&mut self) -> Result<(), AgentError> {
146 Ok(())
147 }
148
149 async fn start(&mut self) -> Result<(), AgentError> {
150 Ok(())
151 }
152
153 async fn stop(&mut self) -> Result<(), AgentError> {
154 Ok(())
155 }
156
157 async fn process(
158 &mut self,
159 _ctx: AgentContext,
160 _pin: String,
161 _value: AgentValue,
162 ) -> Result<(), AgentError> {
163 Ok(())
164 }
165}
166
167#[async_trait]
168impl<T: AsAgent> Agent for T {
169 fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError> {
170 let mut agent = T::new(askit, id, spec)?;
171 agent.mut_data().status = AgentStatus::Init;
172 Ok(agent)
173 }
174
175 fn askit(&self) -> &ASKit {
176 &self.data().askit
177 }
178
179 fn id(&self) -> &str {
180 &self.data().id
181 }
182
183 fn spec(&self) -> &AgentSpec {
184 &self.data().spec
185 }
186
187 fn update_spec(&mut self, value: &Value) -> Result<(), AgentError> {
188 self.mut_data().spec.update(value)
189 }
190
191 fn status(&self) -> &AgentStatus {
192 &self.data().status
193 }
194
195 fn def_name(&self) -> &str {
196 self.data().spec.def_name.as_str()
197 }
198
199 fn configs(&self) -> Result<&AgentConfigs, AgentError> {
200 self.data()
201 .spec
202 .configs
203 .as_ref()
204 .ok_or(AgentError::NoConfig)
205 }
206
207 fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError> {
208 if let Some(configs) = &mut self.mut_data().spec.configs {
209 configs.set(key, value);
210 self.configs_changed()?;
211 }
212 Ok(())
213 }
214
215 fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError> {
216 self.mut_data().spec.configs = Some(configs);
217 self.configs_changed()
218 }
219
220 fn stream_id(&self) -> &str {
221 &self.data().stream_id
222 }
223
224 fn set_stream_id(&mut self, stream_id: String) {
225 self.mut_data().stream_id = stream_id;
226 }
227
228 async fn start(&mut self) -> Result<(), AgentError> {
229 self.mut_data().status = AgentStatus::Start;
230
231 if let Err(e) = self.start().await {
232 self.askit()
233 .emit_agent_error(self.id().to_string(), e.to_string());
234 return Err(e);
235 }
236
237 Ok(())
238 }
239
240 async fn stop(&mut self) -> Result<(), AgentError> {
241 self.mut_data().status = AgentStatus::Stop;
242 self.stop().await?;
243 self.mut_data().status = AgentStatus::Init;
244 Ok(())
245 }
246
247 async fn process(
248 &mut self,
249 ctx: AgentContext,
250 pin: String,
251 value: AgentValue,
252 ) -> Result<(), AgentError> {
253 if let Err(e) = self.process(ctx.clone(), pin, value).await {
254 self.askit()
255 .emit_agent_error(self.id().to_string(), e.to_string());
256 self.askit()
257 .send_agent_out(
258 self.id().to_string(),
259 ctx,
260 "err".to_string(),
261 AgentValue::Error(Arc::new(e.clone())),
262 )
263 .await
264 .unwrap_or_else(|e| {
265 log::error!("Failed to send error message for {}: {}", self.id(), e);
266 });
267 return Err(e);
268 }
269 Ok(())
270 }
271
272 fn get_global_configs(&self) -> Option<AgentConfigs> {
273 self.askit().get_global_configs(self.def_name())
274 }
275
276 fn as_any(&self) -> &dyn Any {
277 self
278 }
279
280 fn as_any_mut(&mut self) -> &mut dyn Any {
281 self
282 }
283}
284
285pub fn new_agent_boxed<T: Agent>(
286 askit: ASKit,
287 id: String,
288 spec: AgentSpec,
289) -> Result<Box<dyn Agent>, AgentError> {
290 Ok(Box::new(T::new(askit, id, spec)?))
291}
292
293pub fn agent_new(
294 askit: ASKit,
295 agent_id: String,
296 spec: AgentSpec,
297) -> Result<Box<dyn Agent>, AgentError> {
298 let def;
299 {
300 let def_name = &spec.def_name;
301 let defs = askit.defs.lock().unwrap();
302 def = defs
303 .get(def_name)
304 .ok_or_else(|| AgentError::UnknownDefName(def_name.to_string()))?
305 .clone();
306 }
307
308 if let Some(new_boxed) = def.new_boxed {
309 return new_boxed(askit, agent_id, spec);
310 }
311
312 match def.kind.as_str() {
313 _ => return Err(AgentError::UnknownDefKind(def.kind.to_string()).into()),
322 }
323}