// modular_agent_core/agent.rs

1use std::any::Any;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use serde_json::Value;
6
7use crate::config::AgentConfigs;
8use crate::context::AgentContext;
9use crate::error::AgentError;
10use crate::modular_agent::ModularAgent;
11use crate::runtime::runtime;
12use crate::spec::AgentSpec;
13use crate::value::AgentValue;
14
/// The lifecycle status of an agent.
///
/// A freshly created agent is in [`AgentStatus::Init`], which is also the
/// `Default` value.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub enum AgentStatus {
    /// Initial state; the agent has been created but is not running.
    #[default]
    Init,
    /// The agent has been started.
    Start,
    /// The agent is being stopped.
    Stop,
}
23
24/// Internal messages sent to agents.
25pub(crate) enum AgentMessage {
26    /// Input value received on a port.
27    Input {
28        ctx: AgentContext,
29        port: String,
30        value: AgentValue,
31    },
32
33    /// Configuration value update.
34    Config {
35        key: String,
36        value: AgentValue,
37    },
38
39    /// Full configuration update.
40    Configs {
41        configs: AgentConfigs,
42    },
43
44    /// Stop the agent.
45    Stop,
46}
47
48/// The core trait for all agents.
49///
50/// All agents implement this trait. Defines lifecycle management,
51/// configuration access, and message processing.
52#[async_trait]
53pub trait Agent: Send + Sync + 'static {
54    /// Constructs a new agent instance.
55    fn new(ma: ModularAgent, id: String, spec: AgentSpec) -> Result<Self, AgentError>
56    where
57        Self: Sized;
58
59    /// Returns the `ModularAgent`.
60    fn ma(&self) -> &ModularAgent;
61
62    /// Returns the unique agent ID.
63    fn id(&self) -> &str;
64
65    /// Returns the current lifecycle status.
66    fn status(&self) -> &AgentStatus;
67
68    /// Returns the agent specification.
69    fn spec(&self) -> &AgentSpec;
70
71    /// Updates the agent specification.
72    fn update_spec(&mut self, spec_update: &Value) -> Result<(), AgentError>;
73
74    /// Returns the agent definition name.
75    fn def_name(&self) -> &str;
76
77    /// Returns the agent's configuration.
78    ///
79    /// # Errors
80    ///
81    /// Returns `NoConfig` if no configuration is available.
82    fn configs(&self) -> Result<&AgentConfigs, AgentError>;
83
84    /// Sets a configuration value.
85    fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError>;
86
87    /// Sets the entire configuration.
88    fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError>;
89
90    /// Gets global configuration for this agent.
91    fn get_global_configs(&self) -> Option<AgentConfigs> {
92        self.ma().get_global_configs(self.def_name())
93    }
94
95    /// Returns the preset ID this agent belongs to.
96    fn preset_id(&self) -> &str;
97
98    /// Sets the preset ID.
99    fn set_preset_id(&mut self, preset_id: String);
100
101    /// Starts the agent.
102    ///
103    /// Called when the workflow starts. Use for initialization and initial output.
104    async fn start(&mut self) -> Result<(), AgentError>;
105
106    /// Stops the agent.
107    async fn stop(&mut self) -> Result<(), AgentError>;
108
109    /// Processes an input message.
110    ///
111    /// Called when the agent receives a value on an input port.
112    async fn process(
113        &mut self,
114        ctx: AgentContext,
115        port: String,
116        value: AgentValue,
117    ) -> Result<(), AgentError>;
118
119    /// Returns the tokio runtime.
120    fn runtime(&self) -> &tokio::runtime::Runtime {
121        runtime()
122    }
123
124    fn as_any(&self) -> &dyn Any;
125
126    fn as_any_mut(&mut self) -> &mut dyn Any;
127}
128
129impl dyn Agent {
130    pub fn as_agent<T: Agent>(&self) -> Option<&T> {
131        self.as_any().downcast_ref::<T>()
132    }
133
134    pub fn as_agent_mut<T: Agent>(&mut self) -> Option<&mut T> {
135        self.as_any_mut().downcast_mut::<T>()
136    }
137}
138
139/// Core data structure for an agent.
140///
141/// Used by agents implementing `AsAgent` to store common state.
142/// The `#[modular_agent]` macro generates a struct with this as a field.
143pub struct AgentData {
144    /// The ModularAgent instance.
145    pub ma: ModularAgent,
146
147    /// The unique identifier for this agent.
148    pub id: String,
149
150    /// The specification of the agent (definition, config, etc.).
151    pub spec: AgentSpec,
152
153    /// The preset identifier for the agent.
154    /// Empty string when the agent does not belong to any preset.
155    pub preset_id: String,
156
157    /// The current lifecycle status of the agent.
158    pub status: AgentStatus,
159}
160
161impl AgentData {
162    /// Creates a new `AgentData` instance.
163    ///
164    /// Removes any `_`-prefixed config keys that were preserved by
165    /// `AgentDefinition::reconcile_spec()` for lazy migration.
166    /// Agents can read these keys from the `spec` parameter in `AsAgent::new()`
167    /// before calling this method.
168    pub fn new(ma: ModularAgent, id: String, mut spec: AgentSpec) -> Self {
169        if let Some(ref mut configs) = spec.configs {
170            configs.retain(|key, _| !key.starts_with('_'));
171        }
172        Self {
173            ma,
174            id,
175            spec,
176            preset_id: String::new(),
177            status: AgentStatus::Init,
178        }
179    }
180}
181
182/// Trait for types that contain `AgentData`.
183///
184/// Required by `AsAgent`. Usually implemented automatically via `#[modular_agent]` macro.
185pub trait HasAgentData {
186    fn data(&self) -> &AgentData;
187
188    fn mut_data(&mut self) -> &mut AgentData;
189}
190
191/// Simplified trait for implementing custom agents.
192///
193/// Implement this trait instead of `Agent` directly.
194/// The `Agent` trait is automatically implemented for all types that implement `AsAgent`.
195#[async_trait]
196pub trait AsAgent: HasAgentData + Send + Sync + 'static {
197    /// Constructs a new agent instance.
198    fn new(ma: ModularAgent, id: String, spec: AgentSpec) -> Result<Self, AgentError>
199    where
200        Self: Sized;
201
202    /// Called when configuration values change.
203    ///
204    /// Override to react to configuration changes at runtime.
205    fn configs_changed(&mut self) -> Result<(), AgentError> {
206        Ok(())
207    }
208
209    /// Called when the agent starts.
210    ///
211    /// Override for initialization logic or to emit initial values.
212    async fn start(&mut self) -> Result<(), AgentError> {
213        Ok(())
214    }
215
216    /// Called when the agent stops.
217    ///
218    /// Override for cleanup logic.
219    async fn stop(&mut self) -> Result<(), AgentError> {
220        Ok(())
221    }
222
223    /// Processes an input message.
224    ///
225    /// Override to implement the agent's main logic.
226    async fn process(
227        &mut self,
228        _ctx: AgentContext,
229        _port: String,
230        _value: AgentValue,
231    ) -> Result<(), AgentError> {
232        Ok(())
233    }
234}
235
236#[async_trait]
237impl<T: AsAgent> Agent for T {
238    fn new(ma: ModularAgent, id: String, spec: AgentSpec) -> Result<Self, AgentError> {
239        let mut agent = T::new(ma, id, spec)?;
240        agent.mut_data().status = AgentStatus::Init;
241        Ok(agent)
242    }
243
244    fn ma(&self) -> &ModularAgent {
245        &self.data().ma
246    }
247
248    fn id(&self) -> &str {
249        &self.data().id
250    }
251
252    fn spec(&self) -> &AgentSpec {
253        &self.data().spec
254    }
255
256    fn update_spec(&mut self, value: &Value) -> Result<(), AgentError> {
257        self.mut_data().spec.update(value)
258    }
259
260    fn status(&self) -> &AgentStatus {
261        &self.data().status
262    }
263
264    fn def_name(&self) -> &str {
265        self.data().spec.def_name.as_str()
266    }
267
268    fn configs(&self) -> Result<&AgentConfigs, AgentError> {
269        self.data()
270            .spec
271            .configs
272            .as_ref()
273            .ok_or(AgentError::NoConfig)
274    }
275
276    fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError> {
277        if let Some(configs) = &mut self.mut_data().spec.configs {
278            configs.set(key, value);
279            self.configs_changed()?;
280        }
281        Ok(())
282    }
283
284    fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError> {
285        self.mut_data().spec.configs = Some(configs);
286        self.configs_changed()
287    }
288
289    fn preset_id(&self) -> &str {
290        &self.data().preset_id
291    }
292
293    fn set_preset_id(&mut self, preset_id: String) {
294        self.mut_data().preset_id = preset_id;
295    }
296
297    async fn start(&mut self) -> Result<(), AgentError> {
298        self.mut_data().status = AgentStatus::Start;
299
300        if let Err(e) = <T as AsAgent>::start(self).await {
301            self.ma()
302                .emit_agent_error(self.id().to_string(), e.to_string());
303            return Err(e);
304        }
305
306        Ok(())
307    }
308
309    async fn stop(&mut self) -> Result<(), AgentError> {
310        self.mut_data().status = AgentStatus::Stop;
311        <T as AsAgent>::stop(self).await?;
312        self.mut_data().status = AgentStatus::Init;
313        Ok(())
314    }
315
316    async fn process(
317        &mut self,
318        ctx: AgentContext,
319        port: String,
320        value: AgentValue,
321    ) -> Result<(), AgentError> {
322        if let Err(e) = <T as AsAgent>::process(self, ctx.clone(), port, value).await {
323            self.ma()
324                .emit_agent_error(self.id().to_string(), e.to_string());
325            self.ma()
326                .send_agent_out(
327                    self.id().to_string(),
328                    ctx,
329                    "err".to_string(),
330                    AgentValue::Error(Arc::new(e.clone())),
331                )
332                .await
333                .unwrap_or_else(|e| {
334                    log::error!("Failed to send error message for {}: {}", self.id(), e);
335                });
336            return Err(e);
337        }
338        Ok(())
339    }
340
341    fn get_global_configs(&self) -> Option<AgentConfigs> {
342        self.ma().get_global_configs(self.def_name())
343    }
344
345    fn as_any(&self) -> &dyn Any {
346        self
347    }
348
349    fn as_any_mut(&mut self) -> &mut dyn Any {
350        self
351    }
352}
353
354/// Creates a boxed agent instance from a concrete type.
355#[doc(hidden)]
356pub fn new_agent_boxed<T: Agent>(
357    ma: ModularAgent,
358    id: String,
359    spec: AgentSpec,
360) -> Result<Box<dyn Agent>, AgentError> {
361    Ok(Box::new(T::new(ma, id, spec)?))
362}
363
364/// Creates an agent based on its definition.
365///
366/// Looks up the agent definition by name and calls the appropriate constructor.
367pub(crate) fn agent_new(
368    ma: ModularAgent,
369    agent_id: String,
370    mut spec: AgentSpec,
371) -> Result<Box<dyn Agent>, AgentError> {
372    let def;
373    {
374        let def_name = &spec.def_name;
375        let defs = ma.defs.lock().unwrap();
376        def = defs
377            .get(def_name)
378            .ok_or_else(|| AgentError::UnknownDefName(def_name.to_string()))?
379            .clone();
380    }
381
382    def.reconcile_spec(&mut spec);
383
384    if let Some(new_boxed) = def.new_boxed {
385        return new_boxed(ma, agent_id, spec);
386    }
387
388    match def.kind.as_str() {
389        // "Command" => {
390        //     return new_boxed::<super::builtins::CommandAgent>(
391        //         ma,
392        //         agent_id,
393        //         def_name.to_string(),
394        //         config,
395        //     );
396        // }
397        _ => return Err(AgentError::UnknownDefKind(def.kind.to_string()).into()),
398    }
399}
400
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::AgentConfigs;
    use crate::value::AgentValue;

    /// `AgentData::new` must drop `_`-prefixed config keys and keep the others.
    #[test]
    fn test_agent_data_new_strips_prefixed_keys() {
        let ma = ModularAgent::init().unwrap();

        // Two regular keys and two `_`-prefixed keys.
        let mut initial = AgentConfigs::new();
        initial.set(String::from("name"), AgentValue::string("hello"));
        initial.set(String::from("count"), AgentValue::integer(10));
        initial.set(String::from("_old_key"), AgentValue::string("stale"));
        initial.set(String::from("_removed"), AgentValue::integer(42));

        let spec = AgentSpec {
            configs: Some(initial),
            ..Default::default()
        };

        let data = AgentData::new(ma.clone(), String::from("test_id"), spec);
        let remaining = data.spec.configs.as_ref().unwrap();

        // Regular keys survive untouched.
        assert_eq!(remaining.get_string_or_default("name"), "hello");
        assert_eq!(remaining.get_integer_or_default("count"), 10);

        // Prefixed keys are gone.
        assert!(remaining.get("_old_key").is_err());
        assert!(remaining.get("_removed").is_err());

        ma.quit();
    }
}