1use std::any::Any;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5
6use crate::askit::ASKit;
7use crate::config::AgentConfigs;
8use crate::context::AgentContext;
9use crate::error::AgentError;
10use crate::runtime::runtime;
11use crate::spec::AgentSpec;
12use crate::value::AgentValue;
13
/// Lifecycle state of an agent.
///
/// The blanket `Agent` implementation in this file moves an agent through
/// these states: `Init` on construction, `Start` when `start` is invoked,
/// `Stop` while `stop` is running, and back to `Init` once `stop` succeeds.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub enum AgentStatus {
    /// Initial state; also the `Default` value.
    #[default]
    Init,
    /// Set when the agent's `start` is invoked.
    Start,
    /// Set when the agent's `stop` is invoked.
    Stop,
}
21
/// Messages that can be addressed to an agent.
///
/// NOTE(review): the code that produces and consumes these messages is not in
/// this file. The variants appear to mirror `Agent::process`,
/// `Agent::set_config`, `Agent::set_configs` and `Agent::stop` — confirm
/// against the dispatcher.
pub enum AgentMessage {
    /// A value for the pin named `pin`, together with its context.
    Input {
        ctx: AgentContext,
        pin: String,
        value: AgentValue,
    },
    /// A single configuration entry identified by `key`.
    Config {
        key: String,
        value: AgentValue,
    },
    /// A complete set of configurations.
    Configs {
        configs: AgentConfigs,
    },
    /// Stop request; carries no payload.
    Stop,
}
37
/// Interface of an agent instance.
///
/// The trait is used as a trait object (`Box<dyn Agent>`) elsewhere in this
/// file; `new` is therefore restricted to `Self: Sized`. Implementors must be
/// `Send + Sync + 'static`.
#[async_trait]
pub trait Agent: Send + Sync + 'static {
    /// Builds an agent from an `ASKit`, an agent id and a spec.
    ///
    /// Not callable through `dyn Agent` because of the `Self: Sized` bound.
    fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError>
    where
        Self: Sized;

    /// Returns the `ASKit` associated with this agent.
    fn askit(&self) -> &ASKit;

    /// Returns the agent's id.
    fn id(&self) -> &str;

    /// Returns the agent's current status.
    fn status(&self) -> &AgentStatus;

    /// Returns the agent's spec.
    fn spec(&self) -> &AgentSpec;

    /// Returns the name of the definition this agent was created from.
    fn def_name(&self) -> &str;

    /// Returns the agent's configs, or an error when they are unavailable.
    fn configs(&self) -> Result<&AgentConfigs, AgentError>;

    /// Sets a single config entry.
    fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError>;

    /// Replaces the agent's configs as a whole.
    fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError>;

    /// Looks up the global configs registered in the `ASKit` under this
    /// agent's definition name.
    fn get_global_configs(&self) -> Option<AgentConfigs> {
        self.askit().get_global_configs(self.def_name())
    }

    /// Returns the id of the stream associated with this agent.
    fn stream_id(&self) -> &str;

    /// Sets the id of the stream associated with this agent.
    fn set_stream_id(&mut self, stream_id: String);

    /// Starts the agent.
    async fn start(&mut self) -> Result<(), AgentError>;

    /// Stops the agent.
    async fn stop(&mut self) -> Result<(), AgentError>;

    /// Processes `value` for the pin named `pin` under the context `ctx`.
    async fn process(
        &mut self,
        ctx: AgentContext,
        pin: String,
        value: AgentValue,
    ) -> Result<(), AgentError>;

    /// Returns the Tokio runtime obtained from `crate::runtime::runtime()`.
    fn runtime(&self) -> &tokio::runtime::Runtime {
        runtime()
    }

    /// Exposes the agent as `&dyn Any` so it can be downcast.
    fn as_any(&self) -> &dyn Any;

    /// Exposes the agent as `&mut dyn Any` so it can be downcast.
    fn as_any_mut(&mut self) -> &mut dyn Any;
}
88
89impl dyn Agent {
90 pub fn as_agent<T: Agent>(&self) -> Option<&T> {
91 self.as_any().downcast_ref::<T>()
92 }
93
94 pub fn as_agent_mut<T: Agent>(&mut self) -> Option<&mut T> {
95 self.as_any_mut().downcast_mut::<T>()
96 }
97}
98
/// Common per-agent state.
///
/// Types that expose an `AgentData` through `HasAgentData` and implement
/// `AsAgent` receive the `Agent` accessors from the blanket impl in this file.
pub struct AgentData {
    /// The `ASKit` instance this agent was created with.
    pub askit: ASKit,

    /// Identifier of this agent.
    pub id: String,

    /// Spec of this agent; the definition name and the optional configs are
    /// read from it.
    pub spec: AgentSpec,

    /// Id of the associated stream; empty after `AgentData::new`.
    pub stream_id: String,

    /// Current lifecycle status; `AgentStatus::Init` after `AgentData::new`.
    pub status: AgentStatus,
}
117
118impl AgentData {
119 pub fn new(askit: ASKit, id: String, spec: AgentSpec) -> Self {
120 Self {
121 askit,
122 id,
123 spec,
124 stream_id: String::new(),
125 status: AgentStatus::Init,
126 }
127 }
128}
129
/// Access to the `AgentData` held by an agent.
pub trait HasAgentData {
    /// Returns a shared reference to the agent's data.
    fn data(&self) -> &AgentData;

    /// Returns a mutable reference to the agent's data.
    fn mut_data(&mut self) -> &mut AgentData;
}
135
/// Hooks for agent implementations that keep their state in an `AgentData`.
///
/// Every `AsAgent` type receives `Agent` through the blanket impl in this
/// file. All hooks except `new` have default bodies that do nothing and
/// return `Ok(())`.
#[async_trait]
pub trait AsAgent: HasAgentData + Send + Sync + 'static {
    /// Builds the agent from an `ASKit`, an agent id and a spec.
    fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError>
    where
        Self: Sized;

    /// Called by the blanket `Agent` impl after `set_config` has updated an
    /// existing config set and after `set_configs` has replaced it.
    fn configs_changed(&mut self) -> Result<(), AgentError> {
        Ok(())
    }

    /// Called by the blanket `Agent::start` after the status is set to `Start`.
    async fn start(&mut self) -> Result<(), AgentError> {
        Ok(())
    }

    /// Called by the blanket `Agent::stop` after the status is set to `Stop`.
    async fn stop(&mut self) -> Result<(), AgentError> {
        Ok(())
    }

    /// Called by the blanket `Agent::process` with the incoming context, pin
    /// name and value.
    async fn process(
        &mut self,
        _ctx: AgentContext,
        _pin: String,
        _value: AgentValue,
    ) -> Result<(), AgentError> {
        Ok(())
    }
}
163
164#[async_trait]
165impl<T: AsAgent> Agent for T {
166 fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError> {
167 let mut agent = T::new(askit, id, spec)?;
168 agent.mut_data().status = AgentStatus::Init;
169 Ok(agent)
170 }
171
172 fn askit(&self) -> &ASKit {
173 &self.data().askit
174 }
175
176 fn id(&self) -> &str {
177 &self.data().id
178 }
179
180 fn spec(&self) -> &AgentSpec {
181 &self.data().spec
182 }
183
184 fn status(&self) -> &AgentStatus {
185 &self.data().status
186 }
187
188 fn def_name(&self) -> &str {
189 self.data().spec.def_name.as_str()
190 }
191
192 fn configs(&self) -> Result<&AgentConfigs, AgentError> {
193 self.data()
194 .spec
195 .configs
196 .as_ref()
197 .ok_or(AgentError::NoConfig)
198 }
199
200 fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError> {
201 if let Some(configs) = &mut self.mut_data().spec.configs {
202 configs.set(key.clone(), value.clone());
203 self.configs_changed()?;
204 }
205 Ok(())
206 }
207
208 fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError> {
209 self.mut_data().spec.configs = Some(configs);
210 self.configs_changed()
211 }
212
213 fn stream_id(&self) -> &str {
214 &self.data().stream_id
215 }
216
217 fn set_stream_id(&mut self, stream_id: String) {
218 self.mut_data().stream_id = stream_id.clone();
219 }
220
221 async fn start(&mut self) -> Result<(), AgentError> {
222 self.mut_data().status = AgentStatus::Start;
223
224 if let Err(e) = self.start().await {
225 self.askit()
226 .emit_agent_error(self.id().to_string(), e.to_string());
227 return Err(e);
228 }
229
230 Ok(())
231 }
232
233 async fn stop(&mut self) -> Result<(), AgentError> {
234 self.mut_data().status = AgentStatus::Stop;
235 self.stop().await?;
236 self.mut_data().status = AgentStatus::Init;
237 Ok(())
238 }
239
240 async fn process(
241 &mut self,
242 ctx: AgentContext,
243 pin: String,
244 value: AgentValue,
245 ) -> Result<(), AgentError> {
246 if let Err(e) = self.process(ctx.clone(), pin, value).await {
247 self.askit()
248 .emit_agent_error(self.id().to_string(), e.to_string());
249 self.askit()
250 .send_agent_out(
251 self.id().to_string(),
252 ctx,
253 "err".to_string(),
254 AgentValue::Error(Arc::new(e.clone())),
255 )
256 .await
257 .unwrap_or_else(|e| {
258 log::error!("Failed to send error message for {}: {}", self.id(), e);
259 });
260 return Err(e);
261 }
262 Ok(())
263 }
264
265 fn get_global_configs(&self) -> Option<AgentConfigs> {
266 self.askit().get_global_configs(self.def_name())
267 }
268
269 fn as_any(&self) -> &dyn Any {
270 self
271 }
272
273 fn as_any_mut(&mut self) -> &mut dyn Any {
274 self
275 }
276}
277
278pub fn new_agent_boxed<T: Agent>(
279 askit: ASKit,
280 id: String,
281 spec: AgentSpec,
282) -> Result<Box<dyn Agent>, AgentError> {
283 Ok(Box::new(T::new(askit, id, spec)?))
284}
285
286pub fn agent_new(
287 askit: ASKit,
288 agent_id: String,
289 spec: AgentSpec,
290) -> Result<Box<dyn Agent>, AgentError> {
291 let def;
292 {
293 let def_name = &spec.def_name;
294 let defs = askit.defs.lock().unwrap();
295 def = defs
296 .get(def_name)
297 .ok_or_else(|| AgentError::UnknownDefName(def_name.to_string()))?
298 .clone();
299 }
300
301 if let Some(new_boxed) = def.new_boxed {
302 return new_boxed(askit, agent_id, spec);
303 }
304
305 match def.kind.as_str() {
306 _ => return Err(AgentError::UnknownDefKind(def.kind.to_string()).into()),
315 }
316}