agent_stream_kit/
agent.rs

1use std::any::Any;
2
3use async_trait::async_trait;
4
5use crate::askit::ASKit;
6use crate::config::AgentConfigs;
7use crate::context::AgentContext;
8use crate::error::AgentError;
9use crate::runtime::runtime;
10use crate::spec::AgentSpec;
11use crate::value::AgentValue;
12
/// Lifecycle state of an agent.
///
/// The enum is fieldless, so it derives `Copy` and `Eq` in addition to
/// `Clone`/`PartialEq`; callers can pass it by value and compare it freely.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum AgentStatus {
    /// Initial state. This is the default, the state a freshly built
    /// `AgentData` starts in, and the state restored after a successful stop.
    #[default]
    Init,
    /// Set when the agent's start sequence begins.
    Start,
    /// Set when the agent's stop sequence begins.
    Stop,
}
20
21pub enum AgentMessage {
22    Input {
23        ctx: AgentContext,
24        pin: String,
25        value: AgentValue,
26    },
27    Config {
28        configs: AgentConfigs,
29    },
30    Stop,
31}
32
33/// The core trait for all agents.
34#[async_trait]
35pub trait Agent: Send + Sync + 'static {
36    fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError>
37    where
38        Self: Sized;
39
40    fn askit(&self) -> &ASKit;
41
42    fn id(&self) -> &str;
43
44    fn status(&self) -> &AgentStatus;
45
46    fn spec(&self) -> &AgentSpec;
47
48    fn def_name(&self) -> &str;
49
50    fn configs(&self) -> Result<&AgentConfigs, AgentError>;
51
52    fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError>;
53
54    fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError>;
55
56    fn get_global_configs(&self) -> Option<AgentConfigs> {
57        self.askit().get_global_configs(self.def_name())
58    }
59
60    fn stream_id(&self) -> &str;
61
62    fn set_stream_id(&mut self, stream_id: String);
63
64    async fn start(&mut self) -> Result<(), AgentError>;
65
66    async fn stop(&mut self) -> Result<(), AgentError>;
67
68    async fn process(
69        &mut self,
70        ctx: AgentContext,
71        pin: String,
72        value: AgentValue,
73    ) -> Result<(), AgentError>;
74
75    fn runtime(&self) -> &tokio::runtime::Runtime {
76        runtime()
77    }
78
79    fn as_any(&self) -> &dyn Any;
80
81    fn as_any_mut(&mut self) -> &mut dyn Any;
82}
83
84impl dyn Agent {
85    pub fn as_agent<T: Agent>(&self) -> Option<&T> {
86        self.as_any().downcast_ref::<T>()
87    }
88
89    pub fn as_agent_mut<T: Agent>(&mut self) -> Option<&mut T> {
90        self.as_any_mut().downcast_mut::<T>()
91    }
92}
93
94/// The core data structure for an agent.
95pub struct AgentData {
96    /// The ASKit instance.
97    pub askit: ASKit,
98
99    /// The unique identifier for the agent.
100    pub id: String,
101
102    /// The specification of the agent.
103    pub spec: AgentSpec,
104
105    /// The stream identifier for the agent.
106    /// Empty string when the agent does not belong to any stream.
107    pub stream_id: String,
108
109    /// The current status of the agent.
110    pub status: AgentStatus,
111}
112
113impl AgentData {
114    pub fn new(askit: ASKit, id: String, spec: AgentSpec) -> Self {
115        Self {
116            askit,
117            id,
118            spec,
119            stream_id: String::new(),
120            status: AgentStatus::Init,
121        }
122    }
123}
124
125pub trait HasAgentData {
126    fn data(&self) -> &AgentData;
127
128    fn mut_data(&mut self) -> &mut AgentData;
129}
130
131#[async_trait]
132pub trait AsAgent: HasAgentData + Send + Sync + 'static {
133    fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError>
134    where
135        Self: Sized;
136
137    fn configs_changed(&mut self) -> Result<(), AgentError> {
138        Ok(())
139    }
140
141    async fn start(&mut self) -> Result<(), AgentError> {
142        Ok(())
143    }
144
145    async fn stop(&mut self) -> Result<(), AgentError> {
146        Ok(())
147    }
148
149    async fn process(
150        &mut self,
151        _ctx: AgentContext,
152        _pin: String,
153        _value: AgentValue,
154    ) -> Result<(), AgentError> {
155        Ok(())
156    }
157}
158
159#[async_trait]
160impl<T: AsAgent> Agent for T {
161    fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError> {
162        let mut agent = T::new(askit, id, spec)?;
163        agent.mut_data().status = AgentStatus::Init;
164        Ok(agent)
165    }
166
167    fn askit(&self) -> &ASKit {
168        &self.data().askit
169    }
170
171    fn id(&self) -> &str {
172        &self.data().id
173    }
174
175    fn spec(&self) -> &AgentSpec {
176        &self.data().spec
177    }
178
179    fn status(&self) -> &AgentStatus {
180        &self.data().status
181    }
182
183    fn def_name(&self) -> &str {
184        self.data().spec.def_name.as_str()
185    }
186
187    fn configs(&self) -> Result<&AgentConfigs, AgentError> {
188        self.data()
189            .spec
190            .configs
191            .as_ref()
192            .ok_or(AgentError::NoConfig)
193    }
194
195    fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError> {
196        if let Some(configs) = &mut self.mut_data().spec.configs {
197            configs.set(key.clone(), value.clone());
198            self.configs_changed()?;
199        }
200        Ok(())
201    }
202
203    fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError> {
204        self.mut_data().spec.configs = Some(configs);
205        self.configs_changed()
206    }
207
208    fn stream_id(&self) -> &str {
209        &self.data().stream_id
210    }
211
212    fn set_stream_id(&mut self, stream_id: String) {
213        self.mut_data().stream_id = stream_id.clone();
214    }
215
216    async fn start(&mut self) -> Result<(), AgentError> {
217        self.mut_data().status = AgentStatus::Start;
218
219        if let Err(e) = self.start().await {
220            self.askit()
221                .emit_agent_error(self.id().to_string(), e.to_string());
222            return Err(e);
223        }
224
225        Ok(())
226    }
227
228    async fn stop(&mut self) -> Result<(), AgentError> {
229        self.mut_data().status = AgentStatus::Stop;
230        self.stop().await?;
231        self.mut_data().status = AgentStatus::Init;
232        Ok(())
233    }
234
235    async fn process(
236        &mut self,
237        ctx: AgentContext,
238        pin: String,
239        value: AgentValue,
240    ) -> Result<(), AgentError> {
241        if let Err(e) = self.process(ctx, pin, value).await {
242            self.askit()
243                .emit_agent_error(self.id().to_string(), e.to_string());
244            return Err(e);
245        }
246        Ok(())
247    }
248
249    fn get_global_configs(&self) -> Option<AgentConfigs> {
250        self.askit().get_global_configs(self.def_name())
251    }
252
253    fn as_any(&self) -> &dyn Any {
254        self
255    }
256
257    fn as_any_mut(&mut self) -> &mut dyn Any {
258        self
259    }
260}
261
262pub fn new_agent_boxed<T: Agent>(
263    askit: ASKit,
264    id: String,
265    spec: AgentSpec,
266) -> Result<Box<dyn Agent>, AgentError> {
267    Ok(Box::new(T::new(askit, id, spec)?))
268}
269
270pub fn agent_new(
271    askit: ASKit,
272    agent_id: String,
273    spec: AgentSpec,
274) -> Result<Box<dyn Agent>, AgentError> {
275    let def;
276    {
277        let def_name = &spec.def_name;
278        let defs = askit.defs.lock().unwrap();
279        def = defs
280            .get(def_name)
281            .ok_or_else(|| AgentError::UnknownDefName(def_name.to_string()))?
282            .clone();
283    }
284
285    if let Some(new_boxed) = def.new_boxed {
286        return new_boxed(askit, agent_id, spec);
287    }
288
289    match def.kind.as_str() {
290        // "Command" => {
291        //     return new_boxed::<super::builtins::CommandAgent>(
292        //         askit,
293        //         agent_id,
294        //         def_name.to_string(),
295        //         config,
296        //     );
297        // }
298        _ => return Err(AgentError::UnknownDefKind(def.kind.to_string()).into()),
299    }
300}