agent_stream_kit/
agent.rs

1use std::any::Any;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5
6use crate::askit::ASKit;
7use crate::config::AgentConfigs;
8use crate::context::AgentContext;
9use crate::error::AgentError;
10use crate::runtime::runtime;
11use crate::spec::AgentSpec;
12use crate::value::AgentValue;
13
14#[derive(Debug, Default, Clone, PartialEq)]
15pub enum AgentStatus {
16    #[default]
17    Init,
18    Start,
19    Stop,
20}
21
22pub enum AgentMessage {
23    Input {
24        ctx: AgentContext,
25        pin: String,
26        value: AgentValue,
27    },
28    Config {
29        configs: AgentConfigs,
30    },
31    Stop,
32}
33
34/// The core trait for all agents.
35#[async_trait]
36pub trait Agent: Send + Sync + 'static {
37    fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError>
38    where
39        Self: Sized;
40
41    fn askit(&self) -> &ASKit;
42
43    fn id(&self) -> &str;
44
45    fn status(&self) -> &AgentStatus;
46
47    fn spec(&self) -> &AgentSpec;
48
49    fn def_name(&self) -> &str;
50
51    fn configs(&self) -> Result<&AgentConfigs, AgentError>;
52
53    fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError>;
54
55    fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError>;
56
57    fn get_global_configs(&self) -> Option<AgentConfigs> {
58        self.askit().get_global_configs(self.def_name())
59    }
60
61    fn stream_id(&self) -> &str;
62
63    fn set_stream_id(&mut self, stream_id: String);
64
65    async fn start(&mut self) -> Result<(), AgentError>;
66
67    async fn stop(&mut self) -> Result<(), AgentError>;
68
69    async fn process(
70        &mut self,
71        ctx: AgentContext,
72        pin: String,
73        value: AgentValue,
74    ) -> Result<(), AgentError>;
75
76    fn runtime(&self) -> &tokio::runtime::Runtime {
77        runtime()
78    }
79
80    fn as_any(&self) -> &dyn Any;
81
82    fn as_any_mut(&mut self) -> &mut dyn Any;
83}
84
85impl dyn Agent {
86    pub fn as_agent<T: Agent>(&self) -> Option<&T> {
87        self.as_any().downcast_ref::<T>()
88    }
89
90    pub fn as_agent_mut<T: Agent>(&mut self) -> Option<&mut T> {
91        self.as_any_mut().downcast_mut::<T>()
92    }
93}
94
95/// The core data structure for an agent.
96pub struct AgentData {
97    /// The ASKit instance.
98    pub askit: ASKit,
99
100    /// The unique identifier for the agent.
101    pub id: String,
102
103    /// The specification of the agent.
104    pub spec: AgentSpec,
105
106    /// The stream identifier for the agent.
107    /// Empty string when the agent does not belong to any stream.
108    pub stream_id: String,
109
110    /// The current status of the agent.
111    pub status: AgentStatus,
112}
113
114impl AgentData {
115    pub fn new(askit: ASKit, id: String, spec: AgentSpec) -> Self {
116        Self {
117            askit,
118            id,
119            spec,
120            stream_id: String::new(),
121            status: AgentStatus::Init,
122        }
123    }
124}
125
126pub trait HasAgentData {
127    fn data(&self) -> &AgentData;
128
129    fn mut_data(&mut self) -> &mut AgentData;
130}
131
132#[async_trait]
133pub trait AsAgent: HasAgentData + Send + Sync + 'static {
134    fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError>
135    where
136        Self: Sized;
137
138    fn configs_changed(&mut self) -> Result<(), AgentError> {
139        Ok(())
140    }
141
142    async fn start(&mut self) -> Result<(), AgentError> {
143        Ok(())
144    }
145
146    async fn stop(&mut self) -> Result<(), AgentError> {
147        Ok(())
148    }
149
150    async fn process(
151        &mut self,
152        _ctx: AgentContext,
153        _pin: String,
154        _value: AgentValue,
155    ) -> Result<(), AgentError> {
156        Ok(())
157    }
158}
159
160#[async_trait]
161impl<T: AsAgent> Agent for T {
162    fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError> {
163        let mut agent = T::new(askit, id, spec)?;
164        agent.mut_data().status = AgentStatus::Init;
165        Ok(agent)
166    }
167
168    fn askit(&self) -> &ASKit {
169        &self.data().askit
170    }
171
172    fn id(&self) -> &str {
173        &self.data().id
174    }
175
176    fn spec(&self) -> &AgentSpec {
177        &self.data().spec
178    }
179
180    fn status(&self) -> &AgentStatus {
181        &self.data().status
182    }
183
184    fn def_name(&self) -> &str {
185        self.data().spec.def_name.as_str()
186    }
187
188    fn configs(&self) -> Result<&AgentConfigs, AgentError> {
189        self.data()
190            .spec
191            .configs
192            .as_ref()
193            .ok_or(AgentError::NoConfig)
194    }
195
196    fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError> {
197        if let Some(configs) = &mut self.mut_data().spec.configs {
198            configs.set(key.clone(), value.clone());
199            self.configs_changed()?;
200        }
201        Ok(())
202    }
203
204    fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError> {
205        self.mut_data().spec.configs = Some(configs);
206        self.configs_changed()
207    }
208
209    fn stream_id(&self) -> &str {
210        &self.data().stream_id
211    }
212
213    fn set_stream_id(&mut self, stream_id: String) {
214        self.mut_data().stream_id = stream_id.clone();
215    }
216
217    async fn start(&mut self) -> Result<(), AgentError> {
218        self.mut_data().status = AgentStatus::Start;
219
220        if let Err(e) = self.start().await {
221            self.askit()
222                .emit_agent_error(self.id().to_string(), e.to_string());
223            return Err(e);
224        }
225
226        Ok(())
227    }
228
229    async fn stop(&mut self) -> Result<(), AgentError> {
230        self.mut_data().status = AgentStatus::Stop;
231        self.stop().await?;
232        self.mut_data().status = AgentStatus::Init;
233        Ok(())
234    }
235
236    async fn process(
237        &mut self,
238        ctx: AgentContext,
239        pin: String,
240        value: AgentValue,
241    ) -> Result<(), AgentError> {
242        if let Err(e) = self.process(ctx.clone(), pin, value).await {
243            self.askit()
244                .emit_agent_error(self.id().to_string(), e.to_string());
245            self.askit()
246                .send_agent_out(
247                    self.id().to_string(),
248                    ctx,
249                    "err".to_string(),
250                    AgentValue::Error(Arc::new(e.clone())),
251                )
252                .await
253                .unwrap_or_else(|e| {
254                    log::error!("Failed to send error message for {}: {}", self.id(), e);
255                });
256            return Err(e);
257        }
258        Ok(())
259    }
260
261    fn get_global_configs(&self) -> Option<AgentConfigs> {
262        self.askit().get_global_configs(self.def_name())
263    }
264
265    fn as_any(&self) -> &dyn Any {
266        self
267    }
268
269    fn as_any_mut(&mut self) -> &mut dyn Any {
270        self
271    }
272}
273
274pub fn new_agent_boxed<T: Agent>(
275    askit: ASKit,
276    id: String,
277    spec: AgentSpec,
278) -> Result<Box<dyn Agent>, AgentError> {
279    Ok(Box::new(T::new(askit, id, spec)?))
280}
281
282pub fn agent_new(
283    askit: ASKit,
284    agent_id: String,
285    spec: AgentSpec,
286) -> Result<Box<dyn Agent>, AgentError> {
287    let def;
288    {
289        let def_name = &spec.def_name;
290        let defs = askit.defs.lock().unwrap();
291        def = defs
292            .get(def_name)
293            .ok_or_else(|| AgentError::UnknownDefName(def_name.to_string()))?
294            .clone();
295    }
296
297    if let Some(new_boxed) = def.new_boxed {
298        return new_boxed(askit, agent_id, spec);
299    }
300
301    match def.kind.as_str() {
302        // "Command" => {
303        //     return new_boxed::<super::builtins::CommandAgent>(
304        //         askit,
305        //         agent_id,
306        //         def_name.to_string(),
307        //         config,
308        //     );
309        // }
310        _ => return Err(AgentError::UnknownDefKind(def.kind.to_string()).into()),
311    }
312}