agent_stream_kit/
agent.rs

1use std::any::Any;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use serde_json::Value;
6
7use crate::askit::ASKit;
8use crate::config::AgentConfigs;
9use crate::context::AgentContext;
10use crate::error::AgentError;
11use crate::runtime::runtime;
12use crate::spec::AgentSpec;
13use crate::value::AgentValue;
14
15#[derive(Debug, Default, Clone, PartialEq)]
16pub enum AgentStatus {
17    #[default]
18    Init,
19    Start,
20    Stop,
21}
22
23pub enum AgentMessage {
24    Input {
25        ctx: AgentContext,
26        pin: String,
27        value: AgentValue,
28    },
29    Config {
30        key: String,
31        value: AgentValue,
32    },
33    Configs {
34        configs: AgentConfigs,
35    },
36    Stop,
37}
38
39/// The core trait for all agents.
40#[async_trait]
41pub trait Agent: Send + Sync + 'static {
42    fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError>
43    where
44        Self: Sized;
45
46    fn askit(&self) -> &ASKit;
47
48    fn id(&self) -> &str;
49
50    fn status(&self) -> &AgentStatus;
51
52    fn spec(&self) -> &AgentSpec;
53
54    fn update_spec(&mut self, spec_update: &Value) -> Result<(), AgentError>;
55
56    fn def_name(&self) -> &str;
57
58    fn configs(&self) -> Result<&AgentConfigs, AgentError>;
59
60    fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError>;
61
62    fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError>;
63
64    fn get_global_configs(&self) -> Option<AgentConfigs> {
65        self.askit().get_global_configs(self.def_name())
66    }
67
68    fn stream_id(&self) -> &str;
69
70    fn set_stream_id(&mut self, stream_id: String);
71
72    async fn start(&mut self) -> Result<(), AgentError>;
73
74    async fn stop(&mut self) -> Result<(), AgentError>;
75
76    async fn process(
77        &mut self,
78        ctx: AgentContext,
79        pin: String,
80        value: AgentValue,
81    ) -> Result<(), AgentError>;
82
83    fn runtime(&self) -> &tokio::runtime::Runtime {
84        runtime()
85    }
86
87    fn as_any(&self) -> &dyn Any;
88
89    fn as_any_mut(&mut self) -> &mut dyn Any;
90}
91
92impl dyn Agent {
93    pub fn as_agent<T: Agent>(&self) -> Option<&T> {
94        self.as_any().downcast_ref::<T>()
95    }
96
97    pub fn as_agent_mut<T: Agent>(&mut self) -> Option<&mut T> {
98        self.as_any_mut().downcast_mut::<T>()
99    }
100}
101
102/// The core data structure for an agent.
103pub struct AgentData {
104    /// The ASKit instance.
105    pub askit: ASKit,
106
107    /// The unique identifier for the agent.
108    pub id: String,
109
110    /// The specification of the agent.
111    pub spec: AgentSpec,
112
113    /// The stream identifier for the agent.
114    /// Empty string when the agent does not belong to any stream.
115    pub stream_id: String,
116
117    /// The current status of the agent.
118    pub status: AgentStatus,
119}
120
121impl AgentData {
122    pub fn new(askit: ASKit, id: String, spec: AgentSpec) -> Self {
123        Self {
124            askit,
125            id,
126            spec,
127            stream_id: String::new(),
128            status: AgentStatus::Init,
129        }
130    }
131}
132
133pub trait HasAgentData {
134    fn data(&self) -> &AgentData;
135
136    fn mut_data(&mut self) -> &mut AgentData;
137}
138
139#[async_trait]
140pub trait AsAgent: HasAgentData + Send + Sync + 'static {
141    fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError>
142    where
143        Self: Sized;
144
145    fn configs_changed(&mut self) -> Result<(), AgentError> {
146        Ok(())
147    }
148
149    async fn start(&mut self) -> Result<(), AgentError> {
150        Ok(())
151    }
152
153    async fn stop(&mut self) -> Result<(), AgentError> {
154        Ok(())
155    }
156
157    async fn process(
158        &mut self,
159        _ctx: AgentContext,
160        _pin: String,
161        _value: AgentValue,
162    ) -> Result<(), AgentError> {
163        Ok(())
164    }
165}
166
167#[async_trait]
168impl<T: AsAgent> Agent for T {
169    fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError> {
170        let mut agent = T::new(askit, id, spec)?;
171        agent.mut_data().status = AgentStatus::Init;
172        Ok(agent)
173    }
174
175    fn askit(&self) -> &ASKit {
176        &self.data().askit
177    }
178
179    fn id(&self) -> &str {
180        &self.data().id
181    }
182
183    fn spec(&self) -> &AgentSpec {
184        &self.data().spec
185    }
186
187    fn update_spec(&mut self, value: &Value) -> Result<(), AgentError> {
188        self.mut_data().spec.update(value)
189    }
190
191    fn status(&self) -> &AgentStatus {
192        &self.data().status
193    }
194
195    fn def_name(&self) -> &str {
196        self.data().spec.def_name.as_str()
197    }
198
199    fn configs(&self) -> Result<&AgentConfigs, AgentError> {
200        self.data()
201            .spec
202            .configs
203            .as_ref()
204            .ok_or(AgentError::NoConfig)
205    }
206
207    fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError> {
208        if let Some(configs) = &mut self.mut_data().spec.configs {
209            configs.set(key, value);
210            self.configs_changed()?;
211        }
212        Ok(())
213    }
214
215    fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError> {
216        self.mut_data().spec.configs = Some(configs);
217        self.configs_changed()
218    }
219
220    fn stream_id(&self) -> &str {
221        &self.data().stream_id
222    }
223
224    fn set_stream_id(&mut self, stream_id: String) {
225        self.mut_data().stream_id = stream_id;
226    }
227
228    async fn start(&mut self) -> Result<(), AgentError> {
229        self.mut_data().status = AgentStatus::Start;
230
231        if let Err(e) = self.start().await {
232            self.askit()
233                .emit_agent_error(self.id().to_string(), e.to_string());
234            return Err(e);
235        }
236
237        Ok(())
238    }
239
240    async fn stop(&mut self) -> Result<(), AgentError> {
241        self.mut_data().status = AgentStatus::Stop;
242        self.stop().await?;
243        self.mut_data().status = AgentStatus::Init;
244        Ok(())
245    }
246
247    async fn process(
248        &mut self,
249        ctx: AgentContext,
250        pin: String,
251        value: AgentValue,
252    ) -> Result<(), AgentError> {
253        if let Err(e) = self.process(ctx.clone(), pin, value).await {
254            self.askit()
255                .emit_agent_error(self.id().to_string(), e.to_string());
256            self.askit()
257                .send_agent_out(
258                    self.id().to_string(),
259                    ctx,
260                    "err".to_string(),
261                    AgentValue::Error(Arc::new(e.clone())),
262                )
263                .await
264                .unwrap_or_else(|e| {
265                    log::error!("Failed to send error message for {}: {}", self.id(), e);
266                });
267            return Err(e);
268        }
269        Ok(())
270    }
271
272    fn get_global_configs(&self) -> Option<AgentConfigs> {
273        self.askit().get_global_configs(self.def_name())
274    }
275
276    fn as_any(&self) -> &dyn Any {
277        self
278    }
279
280    fn as_any_mut(&mut self) -> &mut dyn Any {
281        self
282    }
283}
284
285pub fn new_agent_boxed<T: Agent>(
286    askit: ASKit,
287    id: String,
288    spec: AgentSpec,
289) -> Result<Box<dyn Agent>, AgentError> {
290    Ok(Box::new(T::new(askit, id, spec)?))
291}
292
293pub fn agent_new(
294    askit: ASKit,
295    agent_id: String,
296    spec: AgentSpec,
297) -> Result<Box<dyn Agent>, AgentError> {
298    let def;
299    {
300        let def_name = &spec.def_name;
301        let defs = askit.defs.lock().unwrap();
302        def = defs
303            .get(def_name)
304            .ok_or_else(|| AgentError::UnknownDefName(def_name.to_string()))?
305            .clone();
306    }
307
308    if let Some(new_boxed) = def.new_boxed {
309        return new_boxed(askit, agent_id, spec);
310    }
311
312    match def.kind.as_str() {
313        // "Command" => {
314        //     return new_boxed::<super::builtins::CommandAgent>(
315        //         askit,
316        //         agent_id,
317        //         def_name.to_string(),
318        //         config,
319        //     );
320        // }
321        _ => return Err(AgentError::UnknownDefKind(def.kind.to_string()).into()),
322    }
323}