1use std::any::Any;
2
3use async_trait::async_trait;
4
5use crate::askit::ASKit;
6use crate::config::AgentConfigs;
7use crate::context::AgentContext;
8use crate::error::AgentError;
9use crate::runtime::runtime;
10use crate::spec::AgentSpec;
11use crate::value::AgentValue;
12
/// Lifecycle state of an agent.
///
/// The blanket `Agent` implementation in this file moves an agent through
/// these states: `Init` on construction, `Start` when `start` is invoked,
/// `Stop` while `stop` is running, and back to `Init` once `stop` succeeds.
///
/// The enum is a plain fieldless tag, so it is `Copy` and `Eq` in addition to
/// `Clone` and `PartialEq`; callers can compare it and pass it by value freely.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum AgentStatus {
    /// Initial state; also the `Default` value.
    #[default]
    Init,
    /// Set when the agent's `start` is invoked.
    Start,
    /// Set when the agent's `stop` is invoked.
    Stop,
}
20
21pub enum AgentMessage {
22 Input {
23 ctx: AgentContext,
24 pin: String,
25 value: AgentValue,
26 },
27 Config {
28 configs: AgentConfigs,
29 },
30 Stop,
31}
32
33#[async_trait]
35pub trait Agent: Send + Sync + 'static {
36 fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError>
37 where
38 Self: Sized;
39
40 fn askit(&self) -> &ASKit;
41
42 fn id(&self) -> &str;
43
44 fn status(&self) -> &AgentStatus;
45
46 fn spec(&self) -> &AgentSpec;
47
48 fn def_name(&self) -> &str;
49
50 fn configs(&self) -> Result<&AgentConfigs, AgentError>;
51
52 fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError>;
53
54 fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError>;
55
56 fn get_global_configs(&self) -> Option<AgentConfigs> {
57 self.askit().get_global_configs(self.def_name())
58 }
59
60 fn stream_id(&self) -> &str;
61
62 fn set_stream_id(&mut self, stream_id: String);
63
64 async fn start(&mut self) -> Result<(), AgentError>;
65
66 async fn stop(&mut self) -> Result<(), AgentError>;
67
68 async fn process(
69 &mut self,
70 ctx: AgentContext,
71 pin: String,
72 value: AgentValue,
73 ) -> Result<(), AgentError>;
74
75 fn runtime(&self) -> &tokio::runtime::Runtime {
76 runtime()
77 }
78
79 fn as_any(&self) -> &dyn Any;
80
81 fn as_any_mut(&mut self) -> &mut dyn Any;
82}
83
84impl dyn Agent {
85 pub fn as_agent<T: Agent>(&self) -> Option<&T> {
86 self.as_any().downcast_ref::<T>()
87 }
88
89 pub fn as_agent_mut<T: Agent>(&mut self) -> Option<&mut T> {
90 self.as_any_mut().downcast_mut::<T>()
91 }
92}
93
94pub struct AgentData {
96 pub askit: ASKit,
98
99 pub id: String,
101
102 pub spec: AgentSpec,
104
105 pub stream_id: String,
108
109 pub status: AgentStatus,
111}
112
113impl AgentData {
114 pub fn new(askit: ASKit, id: String, spec: AgentSpec) -> Self {
115 Self {
116 askit,
117 id,
118 spec,
119 stream_id: String::new(),
120 status: AgentStatus::Init,
121 }
122 }
123}
124
125pub trait HasAgentData {
126 fn data(&self) -> &AgentData;
127
128 fn mut_data(&mut self) -> &mut AgentData;
129}
130
131#[async_trait]
132pub trait AsAgent: HasAgentData + Send + Sync + 'static {
133 fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError>
134 where
135 Self: Sized;
136
137 fn configs_changed(&mut self) -> Result<(), AgentError> {
138 Ok(())
139 }
140
141 async fn start(&mut self) -> Result<(), AgentError> {
142 Ok(())
143 }
144
145 async fn stop(&mut self) -> Result<(), AgentError> {
146 Ok(())
147 }
148
149 async fn process(
150 &mut self,
151 _ctx: AgentContext,
152 _pin: String,
153 _value: AgentValue,
154 ) -> Result<(), AgentError> {
155 Ok(())
156 }
157}
158
159#[async_trait]
160impl<T: AsAgent> Agent for T {
161 fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError> {
162 let mut agent = T::new(askit, id, spec)?;
163 agent.mut_data().status = AgentStatus::Init;
164 Ok(agent)
165 }
166
167 fn askit(&self) -> &ASKit {
168 &self.data().askit
169 }
170
171 fn id(&self) -> &str {
172 &self.data().id
173 }
174
175 fn spec(&self) -> &AgentSpec {
176 &self.data().spec
177 }
178
179 fn status(&self) -> &AgentStatus {
180 &self.data().status
181 }
182
183 fn def_name(&self) -> &str {
184 self.data().spec.def_name.as_str()
185 }
186
187 fn configs(&self) -> Result<&AgentConfigs, AgentError> {
188 self.data()
189 .spec
190 .configs
191 .as_ref()
192 .ok_or(AgentError::NoConfig)
193 }
194
195 fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError> {
196 if let Some(configs) = &mut self.mut_data().spec.configs {
197 configs.set(key.clone(), value.clone());
198 self.configs_changed()?;
199 }
200 Ok(())
201 }
202
203 fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError> {
204 self.mut_data().spec.configs = Some(configs);
205 self.configs_changed()
206 }
207
208 fn stream_id(&self) -> &str {
209 &self.data().stream_id
210 }
211
212 fn set_stream_id(&mut self, stream_id: String) {
213 self.mut_data().stream_id = stream_id.clone();
214 }
215
216 async fn start(&mut self) -> Result<(), AgentError> {
217 self.mut_data().status = AgentStatus::Start;
218
219 if let Err(e) = self.start().await {
220 self.askit()
221 .emit_agent_error(self.id().to_string(), e.to_string());
222 return Err(e);
223 }
224
225 Ok(())
226 }
227
228 async fn stop(&mut self) -> Result<(), AgentError> {
229 self.mut_data().status = AgentStatus::Stop;
230 self.stop().await?;
231 self.mut_data().status = AgentStatus::Init;
232 Ok(())
233 }
234
235 async fn process(
236 &mut self,
237 ctx: AgentContext,
238 pin: String,
239 value: AgentValue,
240 ) -> Result<(), AgentError> {
241 if let Err(e) = self.process(ctx, pin, value).await {
242 self.askit()
243 .emit_agent_error(self.id().to_string(), e.to_string());
244 return Err(e);
245 }
246 Ok(())
247 }
248
249 fn get_global_configs(&self) -> Option<AgentConfigs> {
250 self.askit().get_global_configs(self.def_name())
251 }
252
253 fn as_any(&self) -> &dyn Any {
254 self
255 }
256
257 fn as_any_mut(&mut self) -> &mut dyn Any {
258 self
259 }
260}
261
262pub fn new_agent_boxed<T: Agent>(
263 askit: ASKit,
264 id: String,
265 spec: AgentSpec,
266) -> Result<Box<dyn Agent>, AgentError> {
267 Ok(Box::new(T::new(askit, id, spec)?))
268}
269
270pub fn agent_new(
271 askit: ASKit,
272 agent_id: String,
273 spec: AgentSpec,
274) -> Result<Box<dyn Agent>, AgentError> {
275 let def;
276 {
277 let def_name = &spec.def_name;
278 let defs = askit.defs.lock().unwrap();
279 def = defs
280 .get(def_name)
281 .ok_or_else(|| AgentError::UnknownDefName(def_name.to_string()))?
282 .clone();
283 }
284
285 if let Some(new_boxed) = def.new_boxed {
286 return new_boxed(askit, agent_id, spec);
287 }
288
289 match def.kind.as_str() {
290 _ => return Err(AgentError::UnknownDefKind(def.kind.to_string()).into()),
299 }
300}