1use std::any::Any;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5
6use crate::askit::ASKit;
7use crate::config::AgentConfigs;
8use crate::context::AgentContext;
9use crate::error::AgentError;
10use crate::runtime::runtime;
11use crate::spec::AgentSpec;
12use crate::value::AgentValue;
13
/// Lifecycle state of an agent.
///
/// The blanket [`Agent`] implementation in this file moves an agent through
/// these states: it is `Init` after construction, `Start` once `start` has
/// been invoked, and `Stop` while `stop` is running (it returns to `Init`
/// when `stop` succeeds).
///
/// The enum is a plain tag without payload, so it is `Copy` and `Eq` in
/// addition to `Clone`/`PartialEq`; callers can compare and pass it by value.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum AgentStatus {
    /// Initial state; this is also the `Default` value.
    #[default]
    Init,
    /// Set when the agent's `start` is invoked.
    Start,
    /// Set when the agent's `stop` is invoked.
    Stop,
}
21
/// Message addressed to an agent.
///
/// NOTE(review): the code that sends and dispatches these messages is not in
/// this part of the file; the per-variant notes below describe the payloads
/// only and hedge on how they are consumed.
pub enum AgentMessage {
    /// An input for the agent. The fields mirror the arguments of
    /// [`Agent::process`] (`ctx`, `pin`, `value`) — presumably forwarded to it.
    Input {
        ctx: AgentContext,
        pin: String,
        value: AgentValue,
    },
    /// A full set of configs — presumably applied through
    /// [`Agent::set_configs`]; confirm against the dispatcher.
    Config {
        configs: AgentConfigs,
    },
    /// Carries no payload — presumably asks the agent to stop.
    Stop,
}
33
/// Interface every agent exposes to the rest of the crate.
///
/// The trait is used both generically (`T: Agent`) and as a trait object
/// (`Box<dyn Agent>`). This file provides a blanket implementation for every
/// type that implements [`AsAgent`].
#[async_trait]
pub trait Agent: Send + Sync + 'static {
    /// Builds an agent from an `ASKit` handle, an agent id and a spec.
    ///
    /// Bounded by `Self: Sized`, so it is not callable on `dyn Agent`.
    fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError>
    where
        Self: Sized;

    /// Returns the `ASKit` handle held by this agent.
    fn askit(&self) -> &ASKit;

    /// Returns the agent's id.
    fn id(&self) -> &str;

    /// Returns the agent's current lifecycle status.
    fn status(&self) -> &AgentStatus;

    /// Returns the agent's spec.
    fn spec(&self) -> &AgentSpec;

    /// Returns the name of the agent's definition; [`Agent::get_global_configs`]
    /// uses it as the lookup key.
    fn def_name(&self) -> &str;

    /// Returns the agent's configs, or an error when none are available.
    fn configs(&self) -> Result<&AgentConfigs, AgentError>;

    /// Sets a single config entry identified by `key`.
    fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError>;

    /// Sets the agent's whole config set.
    fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError>;

    /// Returns the result of `ASKit::get_global_configs` for this agent's
    /// definition name.
    fn get_global_configs(&self) -> Option<AgentConfigs> {
        self.askit().get_global_configs(self.def_name())
    }

    /// Returns the id of the stream associated with this agent.
    fn stream_id(&self) -> &str;

    /// Associates the agent with the stream identified by `stream_id`.
    fn set_stream_id(&mut self, stream_id: String);

    /// Starts the agent.
    async fn start(&mut self) -> Result<(), AgentError>;

    /// Stops the agent.
    async fn stop(&mut self) -> Result<(), AgentError>;

    /// Handles one input `value` that arrived on `pin` within context `ctx`.
    async fn process(
        &mut self,
        ctx: AgentContext,
        pin: String,
        value: AgentValue,
    ) -> Result<(), AgentError>;

    /// Returns the Tokio runtime handed out by `crate::runtime::runtime()`.
    fn runtime(&self) -> &tokio::runtime::Runtime {
        runtime()
    }

    /// Exposes the agent as `&dyn Any`; `as_agent` downcasts through this.
    fn as_any(&self) -> &dyn Any;

    /// Exposes the agent as `&mut dyn Any`; `as_agent_mut` downcasts through this.
    fn as_any_mut(&mut self) -> &mut dyn Any;
}
84
85impl dyn Agent {
86 pub fn as_agent<T: Agent>(&self) -> Option<&T> {
87 self.as_any().downcast_ref::<T>()
88 }
89
90 pub fn as_agent_mut<T: Agent>(&mut self) -> Option<&mut T> {
91 self.as_any_mut().downcast_mut::<T>()
92 }
93}
94
/// State shared by agents that implement [`HasAgentData`]; the blanket
/// [`Agent`] implementation reads and writes it through that trait.
pub struct AgentData {
    /// The `ASKit` handle the agent was created with.
    pub askit: ASKit,

    /// The agent's id.
    pub id: String,

    /// The agent's spec (its `def_name` and `configs` are read by the blanket
    /// `Agent` implementation).
    pub spec: AgentSpec,

    /// Id of the associated stream; an empty string right after
    /// [`AgentData::new`].
    pub stream_id: String,

    /// Current lifecycle status; `AgentStatus::Init` right after
    /// [`AgentData::new`].
    pub status: AgentStatus,
}
113
114impl AgentData {
115 pub fn new(askit: ASKit, id: String, spec: AgentSpec) -> Self {
116 Self {
117 askit,
118 id,
119 spec,
120 stream_id: String::new(),
121 status: AgentStatus::Init,
122 }
123 }
124}
125
/// Access to the [`AgentData`] embedded in an agent.
///
/// This is a supertrait of [`AsAgent`]; the blanket [`Agent`] implementation
/// uses it for every id/spec/status/stream-id accessor.
pub trait HasAgentData {
    /// Returns a shared reference to the agent's data.
    fn data(&self) -> &AgentData;

    /// Returns an exclusive reference to the agent's data.
    fn mut_data(&mut self) -> &mut AgentData;
}
131
/// Hooks a concrete agent type provides; every `AsAgent` type receives the
/// full [`Agent`] interface through the blanket implementation in this file.
///
/// Only `new` is required. All other hooks default to doing nothing and
/// returning `Ok(())`.
#[async_trait]
pub trait AsAgent: HasAgentData + Send + Sync + 'static {
    /// Builds the agent from an `ASKit` handle, an agent id and a spec.
    fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError>
    where
        Self: Sized;

    /// Called by the blanket `Agent` implementation after `set_config`
    /// updated an entry and after `set_configs` replaced the config set.
    fn configs_changed(&mut self) -> Result<(), AgentError> {
        Ok(())
    }

    /// Called by the blanket `Agent::start` after the status was set to
    /// `AgentStatus::Start`.
    async fn start(&mut self) -> Result<(), AgentError> {
        Ok(())
    }

    /// Called by the blanket `Agent::stop` after the status was set to
    /// `AgentStatus::Stop`.
    async fn stop(&mut self) -> Result<(), AgentError> {
        Ok(())
    }

    /// Called by the blanket `Agent::process` with the input it received.
    /// An `Err` returned from here is reported by that wrapper.
    async fn process(
        &mut self,
        _ctx: AgentContext,
        _pin: String,
        _value: AgentValue,
    ) -> Result<(), AgentError> {
        Ok(())
    }
}
159
160#[async_trait]
161impl<T: AsAgent> Agent for T {
162 fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError> {
163 let mut agent = T::new(askit, id, spec)?;
164 agent.mut_data().status = AgentStatus::Init;
165 Ok(agent)
166 }
167
168 fn askit(&self) -> &ASKit {
169 &self.data().askit
170 }
171
172 fn id(&self) -> &str {
173 &self.data().id
174 }
175
176 fn spec(&self) -> &AgentSpec {
177 &self.data().spec
178 }
179
180 fn status(&self) -> &AgentStatus {
181 &self.data().status
182 }
183
184 fn def_name(&self) -> &str {
185 self.data().spec.def_name.as_str()
186 }
187
188 fn configs(&self) -> Result<&AgentConfigs, AgentError> {
189 self.data()
190 .spec
191 .configs
192 .as_ref()
193 .ok_or(AgentError::NoConfig)
194 }
195
196 fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError> {
197 if let Some(configs) = &mut self.mut_data().spec.configs {
198 configs.set(key.clone(), value.clone());
199 self.configs_changed()?;
200 }
201 Ok(())
202 }
203
204 fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError> {
205 self.mut_data().spec.configs = Some(configs);
206 self.configs_changed()
207 }
208
209 fn stream_id(&self) -> &str {
210 &self.data().stream_id
211 }
212
213 fn set_stream_id(&mut self, stream_id: String) {
214 self.mut_data().stream_id = stream_id.clone();
215 }
216
217 async fn start(&mut self) -> Result<(), AgentError> {
218 self.mut_data().status = AgentStatus::Start;
219
220 if let Err(e) = self.start().await {
221 self.askit()
222 .emit_agent_error(self.id().to_string(), e.to_string());
223 return Err(e);
224 }
225
226 Ok(())
227 }
228
229 async fn stop(&mut self) -> Result<(), AgentError> {
230 self.mut_data().status = AgentStatus::Stop;
231 self.stop().await?;
232 self.mut_data().status = AgentStatus::Init;
233 Ok(())
234 }
235
236 async fn process(
237 &mut self,
238 ctx: AgentContext,
239 pin: String,
240 value: AgentValue,
241 ) -> Result<(), AgentError> {
242 if let Err(e) = self.process(ctx.clone(), pin, value).await {
243 self.askit()
244 .emit_agent_error(self.id().to_string(), e.to_string());
245 self.askit()
246 .send_agent_out(
247 self.id().to_string(),
248 ctx,
249 "err".to_string(),
250 AgentValue::Error(Arc::new(e.clone())),
251 )
252 .await
253 .unwrap_or_else(|e| {
254 log::error!("Failed to send error message for {}: {}", self.id(), e);
255 });
256 return Err(e);
257 }
258 Ok(())
259 }
260
261 fn get_global_configs(&self) -> Option<AgentConfigs> {
262 self.askit().get_global_configs(self.def_name())
263 }
264
265 fn as_any(&self) -> &dyn Any {
266 self
267 }
268
269 fn as_any_mut(&mut self) -> &mut dyn Any {
270 self
271 }
272}
273
274pub fn new_agent_boxed<T: Agent>(
275 askit: ASKit,
276 id: String,
277 spec: AgentSpec,
278) -> Result<Box<dyn Agent>, AgentError> {
279 Ok(Box::new(T::new(askit, id, spec)?))
280}
281
282pub fn agent_new(
283 askit: ASKit,
284 agent_id: String,
285 spec: AgentSpec,
286) -> Result<Box<dyn Agent>, AgentError> {
287 let def;
288 {
289 let def_name = &spec.def_name;
290 let defs = askit.defs.lock().unwrap();
291 def = defs
292 .get(def_name)
293 .ok_or_else(|| AgentError::UnknownDefName(def_name.to_string()))?
294 .clone();
295 }
296
297 if let Some(new_boxed) = def.new_boxed {
298 return new_boxed(askit, agent_id, spec);
299 }
300
301 match def.kind.as_str() {
302 _ => return Err(AgentError::UnknownDefKind(def.kind.to_string()).into()),
311 }
312}