1use std::any::Any;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use serde_json::Value;
6
7use crate::config::AgentConfigs;
8use crate::context::AgentContext;
9use crate::error::AgentError;
10use crate::modular_agent::ModularAgent;
11use crate::runtime::runtime;
12use crate::spec::AgentSpec;
13use crate::value::AgentValue;
14
/// Lifecycle state of an agent.
///
/// The `Default` value is [`AgentStatus::Init`].
#[derive(Clone, Debug, Default, PartialEq)]
pub enum AgentStatus {
    /// Initial state. `AgentData::new` assigns it, and the blanket
    /// `Agent::stop` restores it after the stop hook succeeds.
    #[default]
    Init,
    /// Assigned by the blanket `Agent::start` before the `AsAgent::start`
    /// hook runs.
    Start,
    /// Assigned by the blanket `Agent::stop` before the `AsAgent::stop` hook
    /// runs; it remains in place if that hook fails.
    Stop,
}
23
24pub(crate) enum AgentMessage {
26 Input {
28 ctx: AgentContext,
29 port: String,
30 value: AgentValue,
31 },
32
33 Config {
35 key: String,
36 value: AgentValue,
37 },
38
39 Configs {
41 configs: AgentConfigs,
42 },
43
44 Stop,
46}
47
48#[async_trait]
53pub trait Agent: Send + Sync + 'static {
54 fn new(ma: ModularAgent, id: String, spec: AgentSpec) -> Result<Self, AgentError>
56 where
57 Self: Sized;
58
59 fn ma(&self) -> &ModularAgent;
61
62 fn id(&self) -> &str;
64
65 fn status(&self) -> &AgentStatus;
67
68 fn spec(&self) -> &AgentSpec;
70
71 fn update_spec(&mut self, spec_update: &Value) -> Result<(), AgentError>;
73
74 fn def_name(&self) -> &str;
76
77 fn configs(&self) -> Result<&AgentConfigs, AgentError>;
83
84 fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError>;
86
87 fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError>;
89
90 fn get_global_configs(&self) -> Option<AgentConfigs> {
92 self.ma().get_global_configs(self.def_name())
93 }
94
95 fn preset_id(&self) -> &str;
97
98 fn set_preset_id(&mut self, preset_id: String);
100
101 async fn start(&mut self) -> Result<(), AgentError>;
105
106 async fn stop(&mut self) -> Result<(), AgentError>;
108
109 async fn process(
113 &mut self,
114 ctx: AgentContext,
115 port: String,
116 value: AgentValue,
117 ) -> Result<(), AgentError>;
118
119 fn runtime(&self) -> &tokio::runtime::Runtime {
121 runtime()
122 }
123
124 fn as_any(&self) -> &dyn Any;
125
126 fn as_any_mut(&mut self) -> &mut dyn Any;
127}
128
129impl dyn Agent {
130 pub fn as_agent<T: Agent>(&self) -> Option<&T> {
131 self.as_any().downcast_ref::<T>()
132 }
133
134 pub fn as_agent_mut<T: Agent>(&mut self) -> Option<&mut T> {
135 self.as_any_mut().downcast_mut::<T>()
136 }
137}
138
139pub struct AgentData {
144 pub ma: ModularAgent,
146
147 pub id: String,
149
150 pub spec: AgentSpec,
152
153 pub preset_id: String,
156
157 pub status: AgentStatus,
159}
160
161impl AgentData {
162 pub fn new(ma: ModularAgent, id: String, mut spec: AgentSpec) -> Self {
169 if let Some(ref mut configs) = spec.configs {
170 configs.retain(|key, _| !key.starts_with('_'));
171 }
172 Self {
173 ma,
174 id,
175 spec,
176 preset_id: String::new(),
177 status: AgentStatus::Init,
178 }
179 }
180}
181
182pub trait HasAgentData {
186 fn data(&self) -> &AgentData;
187
188 fn mut_data(&mut self) -> &mut AgentData;
189}
190
191#[async_trait]
196pub trait AsAgent: HasAgentData + Send + Sync + 'static {
197 fn new(ma: ModularAgent, id: String, spec: AgentSpec) -> Result<Self, AgentError>
199 where
200 Self: Sized;
201
202 fn configs_changed(&mut self) -> Result<(), AgentError> {
206 Ok(())
207 }
208
209 async fn start(&mut self) -> Result<(), AgentError> {
213 Ok(())
214 }
215
216 async fn stop(&mut self) -> Result<(), AgentError> {
220 Ok(())
221 }
222
223 async fn process(
227 &mut self,
228 _ctx: AgentContext,
229 _port: String,
230 _value: AgentValue,
231 ) -> Result<(), AgentError> {
232 Ok(())
233 }
234}
235
236#[async_trait]
237impl<T: AsAgent> Agent for T {
238 fn new(ma: ModularAgent, id: String, spec: AgentSpec) -> Result<Self, AgentError> {
239 let mut agent = T::new(ma, id, spec)?;
240 agent.mut_data().status = AgentStatus::Init;
241 Ok(agent)
242 }
243
244 fn ma(&self) -> &ModularAgent {
245 &self.data().ma
246 }
247
248 fn id(&self) -> &str {
249 &self.data().id
250 }
251
252 fn spec(&self) -> &AgentSpec {
253 &self.data().spec
254 }
255
256 fn update_spec(&mut self, value: &Value) -> Result<(), AgentError> {
257 self.mut_data().spec.update(value)
258 }
259
260 fn status(&self) -> &AgentStatus {
261 &self.data().status
262 }
263
264 fn def_name(&self) -> &str {
265 self.data().spec.def_name.as_str()
266 }
267
268 fn configs(&self) -> Result<&AgentConfigs, AgentError> {
269 self.data()
270 .spec
271 .configs
272 .as_ref()
273 .ok_or(AgentError::NoConfig)
274 }
275
276 fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError> {
277 if let Some(configs) = &mut self.mut_data().spec.configs {
278 configs.set(key, value);
279 self.configs_changed()?;
280 }
281 Ok(())
282 }
283
284 fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError> {
285 self.mut_data().spec.configs = Some(configs);
286 self.configs_changed()
287 }
288
289 fn preset_id(&self) -> &str {
290 &self.data().preset_id
291 }
292
293 fn set_preset_id(&mut self, preset_id: String) {
294 self.mut_data().preset_id = preset_id;
295 }
296
297 async fn start(&mut self) -> Result<(), AgentError> {
298 self.mut_data().status = AgentStatus::Start;
299
300 if let Err(e) = <T as AsAgent>::start(self).await {
301 self.ma()
302 .emit_agent_error(self.id().to_string(), e.to_string());
303 return Err(e);
304 }
305
306 Ok(())
307 }
308
309 async fn stop(&mut self) -> Result<(), AgentError> {
310 self.mut_data().status = AgentStatus::Stop;
311 <T as AsAgent>::stop(self).await?;
312 self.mut_data().status = AgentStatus::Init;
313 Ok(())
314 }
315
316 async fn process(
317 &mut self,
318 ctx: AgentContext,
319 port: String,
320 value: AgentValue,
321 ) -> Result<(), AgentError> {
322 if let Err(e) = <T as AsAgent>::process(self, ctx.clone(), port, value).await {
323 self.ma()
324 .emit_agent_error(self.id().to_string(), e.to_string());
325 self.ma()
326 .send_agent_out(
327 self.id().to_string(),
328 ctx,
329 "err".to_string(),
330 AgentValue::Error(Arc::new(e.clone())),
331 )
332 .await
333 .unwrap_or_else(|e| {
334 log::error!("Failed to send error message for {}: {}", self.id(), e);
335 });
336 return Err(e);
337 }
338 Ok(())
339 }
340
341 fn get_global_configs(&self) -> Option<AgentConfigs> {
342 self.ma().get_global_configs(self.def_name())
343 }
344
345 fn as_any(&self) -> &dyn Any {
346 self
347 }
348
349 fn as_any_mut(&mut self) -> &mut dyn Any {
350 self
351 }
352}
353
354#[doc(hidden)]
356pub fn new_agent_boxed<T: Agent>(
357 ma: ModularAgent,
358 id: String,
359 spec: AgentSpec,
360) -> Result<Box<dyn Agent>, AgentError> {
361 Ok(Box::new(T::new(ma, id, spec)?))
362}
363
364pub(crate) fn agent_new(
368 ma: ModularAgent,
369 agent_id: String,
370 mut spec: AgentSpec,
371) -> Result<Box<dyn Agent>, AgentError> {
372 let def;
373 {
374 let def_name = &spec.def_name;
375 let defs = ma.defs.lock().unwrap();
376 def = defs
377 .get(def_name)
378 .ok_or_else(|| AgentError::UnknownDefName(def_name.to_string()))?
379 .clone();
380 }
381
382 def.reconcile_spec(&mut spec);
383
384 if let Some(new_boxed) = def.new_boxed {
385 return new_boxed(ma, agent_id, spec);
386 }
387
388 match def.kind.as_str() {
389 _ => return Err(AgentError::UnknownDefKind(def.kind.to_string()).into()),
398 }
399}
400
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::AgentConfigs;
    use crate::value::AgentValue;

    /// `AgentData::new` keeps ordinary config keys and removes every key
    /// that starts with an underscore.
    #[test]
    fn test_agent_data_new_strips_prefixed_keys() {
        let ma = ModularAgent::init().unwrap();

        // Two regular entries followed by two underscore-prefixed ones.
        let mut initial = AgentConfigs::new();
        initial.set("name".into(), AgentValue::string("hello"));
        initial.set("count".into(), AgentValue::integer(10));
        initial.set("_old_key".into(), AgentValue::string("stale"));
        initial.set("_removed".into(), AgentValue::integer(42));

        let spec = AgentSpec {
            configs: Some(initial),
            ..Default::default()
        };
        let data = AgentData::new(ma.clone(), "test_id".into(), spec);
        let kept = data.spec.configs.as_ref().unwrap();

        // Regular keys survive with their values intact.
        assert_eq!(kept.get_string_or_default("name"), "hello");
        assert_eq!(kept.get_integer_or_default("count"), 10);

        // Underscore-prefixed keys are gone.
        assert!(kept.get("_old_key").is_err());
        assert!(kept.get("_removed").is_err());

        ma.quit();
    }
}