agent_stream_kit/
agent.rs

1use std::any::Any;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5
6use crate::askit::ASKit;
7use crate::config::AgentConfigs;
8use crate::context::AgentContext;
9use crate::error::AgentError;
10use crate::runtime::runtime;
11use crate::spec::AgentSpec;
12use crate::value::AgentValue;
13
14#[derive(Debug, Default, Clone, PartialEq)]
15pub enum AgentStatus {
16    #[default]
17    Init,
18    Start,
19    Stop,
20}
21
22pub enum AgentMessage {
23    Input {
24        ctx: AgentContext,
25        pin: String,
26        value: AgentValue,
27    },
28    Config {
29        key: String,
30        value: AgentValue,
31    },
32    Configs {
33        configs: AgentConfigs,
34    },
35    Stop,
36}
37
38/// The core trait for all agents.
39#[async_trait]
40pub trait Agent: Send + Sync + 'static {
41    fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError>
42    where
43        Self: Sized;
44
45    fn askit(&self) -> &ASKit;
46
47    fn id(&self) -> &str;
48
49    fn status(&self) -> &AgentStatus;
50
51    fn spec(&self) -> &AgentSpec;
52
53    fn def_name(&self) -> &str;
54
55    fn configs(&self) -> Result<&AgentConfigs, AgentError>;
56
57    fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError>;
58
59    fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError>;
60
61    fn get_global_configs(&self) -> Option<AgentConfigs> {
62        self.askit().get_global_configs(self.def_name())
63    }
64
65    fn stream_id(&self) -> &str;
66
67    fn set_stream_id(&mut self, stream_id: String);
68
69    async fn start(&mut self) -> Result<(), AgentError>;
70
71    async fn stop(&mut self) -> Result<(), AgentError>;
72
73    async fn process(
74        &mut self,
75        ctx: AgentContext,
76        pin: String,
77        value: AgentValue,
78    ) -> Result<(), AgentError>;
79
80    fn runtime(&self) -> &tokio::runtime::Runtime {
81        runtime()
82    }
83
84    fn as_any(&self) -> &dyn Any;
85
86    fn as_any_mut(&mut self) -> &mut dyn Any;
87}
88
89impl dyn Agent {
90    pub fn as_agent<T: Agent>(&self) -> Option<&T> {
91        self.as_any().downcast_ref::<T>()
92    }
93
94    pub fn as_agent_mut<T: Agent>(&mut self) -> Option<&mut T> {
95        self.as_any_mut().downcast_mut::<T>()
96    }
97}
98
99/// The core data structure for an agent.
100pub struct AgentData {
101    /// The ASKit instance.
102    pub askit: ASKit,
103
104    /// The unique identifier for the agent.
105    pub id: String,
106
107    /// The specification of the agent.
108    pub spec: AgentSpec,
109
110    /// The stream identifier for the agent.
111    /// Empty string when the agent does not belong to any stream.
112    pub stream_id: String,
113
114    /// The current status of the agent.
115    pub status: AgentStatus,
116}
117
118impl AgentData {
119    pub fn new(askit: ASKit, id: String, spec: AgentSpec) -> Self {
120        Self {
121            askit,
122            id,
123            spec,
124            stream_id: String::new(),
125            status: AgentStatus::Init,
126        }
127    }
128}
129
130pub trait HasAgentData {
131    fn data(&self) -> &AgentData;
132
133    fn mut_data(&mut self) -> &mut AgentData;
134}
135
136#[async_trait]
137pub trait AsAgent: HasAgentData + Send + Sync + 'static {
138    fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError>
139    where
140        Self: Sized;
141
142    fn configs_changed(&mut self) -> Result<(), AgentError> {
143        Ok(())
144    }
145
146    async fn start(&mut self) -> Result<(), AgentError> {
147        Ok(())
148    }
149
150    async fn stop(&mut self) -> Result<(), AgentError> {
151        Ok(())
152    }
153
154    async fn process(
155        &mut self,
156        _ctx: AgentContext,
157        _pin: String,
158        _value: AgentValue,
159    ) -> Result<(), AgentError> {
160        Ok(())
161    }
162}
163
164#[async_trait]
165impl<T: AsAgent> Agent for T {
166    fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError> {
167        let mut agent = T::new(askit, id, spec)?;
168        agent.mut_data().status = AgentStatus::Init;
169        Ok(agent)
170    }
171
172    fn askit(&self) -> &ASKit {
173        &self.data().askit
174    }
175
176    fn id(&self) -> &str {
177        &self.data().id
178    }
179
180    fn spec(&self) -> &AgentSpec {
181        &self.data().spec
182    }
183
184    fn status(&self) -> &AgentStatus {
185        &self.data().status
186    }
187
188    fn def_name(&self) -> &str {
189        self.data().spec.def_name.as_str()
190    }
191
192    fn configs(&self) -> Result<&AgentConfigs, AgentError> {
193        self.data()
194            .spec
195            .configs
196            .as_ref()
197            .ok_or(AgentError::NoConfig)
198    }
199
200    fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError> {
201        if let Some(configs) = &mut self.mut_data().spec.configs {
202            configs.set(key.clone(), value.clone());
203            self.configs_changed()?;
204        }
205        Ok(())
206    }
207
208    fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError> {
209        self.mut_data().spec.configs = Some(configs);
210        self.configs_changed()
211    }
212
213    fn stream_id(&self) -> &str {
214        &self.data().stream_id
215    }
216
217    fn set_stream_id(&mut self, stream_id: String) {
218        self.mut_data().stream_id = stream_id.clone();
219    }
220
221    async fn start(&mut self) -> Result<(), AgentError> {
222        self.mut_data().status = AgentStatus::Start;
223
224        if let Err(e) = self.start().await {
225            self.askit()
226                .emit_agent_error(self.id().to_string(), e.to_string());
227            return Err(e);
228        }
229
230        Ok(())
231    }
232
233    async fn stop(&mut self) -> Result<(), AgentError> {
234        self.mut_data().status = AgentStatus::Stop;
235        self.stop().await?;
236        self.mut_data().status = AgentStatus::Init;
237        Ok(())
238    }
239
240    async fn process(
241        &mut self,
242        ctx: AgentContext,
243        pin: String,
244        value: AgentValue,
245    ) -> Result<(), AgentError> {
246        if let Err(e) = self.process(ctx.clone(), pin, value).await {
247            self.askit()
248                .emit_agent_error(self.id().to_string(), e.to_string());
249            self.askit()
250                .send_agent_out(
251                    self.id().to_string(),
252                    ctx,
253                    "err".to_string(),
254                    AgentValue::Error(Arc::new(e.clone())),
255                )
256                .await
257                .unwrap_or_else(|e| {
258                    log::error!("Failed to send error message for {}: {}", self.id(), e);
259                });
260            return Err(e);
261        }
262        Ok(())
263    }
264
265    fn get_global_configs(&self) -> Option<AgentConfigs> {
266        self.askit().get_global_configs(self.def_name())
267    }
268
269    fn as_any(&self) -> &dyn Any {
270        self
271    }
272
273    fn as_any_mut(&mut self) -> &mut dyn Any {
274        self
275    }
276}
277
278pub fn new_agent_boxed<T: Agent>(
279    askit: ASKit,
280    id: String,
281    spec: AgentSpec,
282) -> Result<Box<dyn Agent>, AgentError> {
283    Ok(Box::new(T::new(askit, id, spec)?))
284}
285
286pub fn agent_new(
287    askit: ASKit,
288    agent_id: String,
289    spec: AgentSpec,
290) -> Result<Box<dyn Agent>, AgentError> {
291    let def;
292    {
293        let def_name = &spec.def_name;
294        let defs = askit.defs.lock().unwrap();
295        def = defs
296            .get(def_name)
297            .ok_or_else(|| AgentError::UnknownDefName(def_name.to_string()))?
298            .clone();
299    }
300
301    if let Some(new_boxed) = def.new_boxed {
302        return new_boxed(askit, agent_id, spec);
303    }
304
305    match def.kind.as_str() {
306        // "Command" => {
307        //     return new_boxed::<super::builtins::CommandAgent>(
308        //         askit,
309        //         agent_id,
310        //         def_name.to_string(),
311        //         config,
312        //     );
313        // }
314        _ => return Err(AgentError::UnknownDefKind(def.kind.to_string()).into()),
315    }
316}