agent_stream_kit/
agent.rs

1use std::any::Any;
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5
6use crate::askit::ASKit;
7use crate::config::AgentConfigs;
8use crate::context::AgentContext;
9use crate::definition::{AgentConfigSpecs, AgentDisplayConfigSpecs};
10use crate::error::AgentError;
11use crate::runtime::runtime;
12use crate::value::AgentValue;
13
/// Lifecycle status of an agent.
///
/// The default value is [`AgentStatus::Init`].
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub enum AgentStatus {
    /// Initial status. The blanket `Agent` implementation in this file also
    /// returns to this status once a stop has completed successfully.
    #[default]
    Init,
    /// Set by the blanket `Agent` implementation when the agent is started.
    Start,
    /// Set by the blanket `Agent` implementation while the agent is stopping.
    Stop,
}
21
22pub enum AgentMessage {
23    Input {
24        ctx: AgentContext,
25        pin: String,
26        value: AgentValue,
27    },
28    Config {
29        configs: AgentConfigs,
30    },
31    Stop,
32}
33
34/// The core trait for all agents.
35#[async_trait]
36pub trait Agent: Send + Sync + 'static {
37    fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError>
38    where
39        Self: Sized;
40
41    fn askit(&self) -> &ASKit;
42
43    fn id(&self) -> &str;
44
45    fn status(&self) -> &AgentStatus;
46
47    fn spec(&self) -> &AgentSpec;
48
49    fn def_name(&self) -> &str;
50
51    fn configs(&self) -> Result<&AgentConfigs, AgentError>;
52
53    fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError>;
54
55    fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError>;
56
57    fn get_global_configs(&self) -> Option<AgentConfigs> {
58        self.askit().get_global_configs(self.def_name())
59    }
60
61    fn flow_id(&self) -> &str;
62
63    fn set_flow_id(&mut self, flow_id: String);
64
65    async fn start(&mut self) -> Result<(), AgentError>;
66
67    async fn stop(&mut self) -> Result<(), AgentError>;
68
69    async fn process(
70        &mut self,
71        ctx: AgentContext,
72        pin: String,
73        value: AgentValue,
74    ) -> Result<(), AgentError>;
75
76    fn runtime(&self) -> &tokio::runtime::Runtime {
77        runtime()
78    }
79
80    fn as_any(&self) -> &dyn Any;
81
82    fn as_any_mut(&mut self) -> &mut dyn Any;
83}
84
85impl dyn Agent {
86    pub fn as_agent<T: Agent>(&self) -> Option<&T> {
87        self.as_any().downcast_ref::<T>()
88    }
89
90    pub fn as_agent_mut<T: Agent>(&mut self) -> Option<&mut T> {
91        self.as_any_mut().downcast_mut::<T>()
92    }
93}
94
95/// The core data structure for an agent.
96pub struct AgentData {
97    /// The ASKit instance.
98    pub askit: ASKit,
99
100    /// The unique identifier for the agent.
101    pub id: String,
102
103    /// The specification of the agent.
104    pub spec: AgentSpec,
105
106    /// The flow identifier for the agent.
107    /// Empty string when the agent does not belong to any flow.
108    pub flow_id: String,
109
110    /// The current status of the agent.
111    pub status: AgentStatus,
112}
113
114impl AgentData {
115    pub fn new(askit: ASKit, id: String, spec: AgentSpec) -> Self {
116        Self {
117            askit,
118            id,
119            spec,
120            flow_id: String::new(),
121            status: AgentStatus::Init,
122        }
123    }
124}
125
126/// Information held by each agent.
127#[derive(Debug, Clone, Serialize, Deserialize)]
128pub struct AgentSpec {
129    /// Name of the AgentDefinition.
130    pub def_name: String,
131
132    /// List of input pin names.
133    #[serde(skip_serializing_if = "Option::is_none", default)]
134    pub inputs: Option<Vec<String>>,
135
136    /// List of output pin names.
137    #[serde(skip_serializing_if = "Option::is_none", default)]
138    pub outputs: Option<Vec<String>>,
139
140    /// Config values.
141    #[serde(skip_serializing_if = "Option::is_none")]
142    pub configs: Option<AgentConfigs>,
143
144    /// Config specs.
145    #[serde(skip_serializing_if = "Option::is_none")]
146    pub config_specs: Option<AgentConfigSpecs>,
147
148    /// Display config specs.
149    #[serde(skip_serializing_if = "Option::is_none")]
150    pub display_config_specs: Option<AgentDisplayConfigSpecs>,
151}
152
153pub trait HasAgentData {
154    fn data(&self) -> &AgentData;
155
156    fn mut_data(&mut self) -> &mut AgentData;
157}
158
159#[async_trait]
160pub trait AsAgent: HasAgentData + Send + Sync + 'static {
161    fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError>
162    where
163        Self: Sized;
164
165    fn configs_changed(&mut self) -> Result<(), AgentError> {
166        Ok(())
167    }
168
169    async fn start(&mut self) -> Result<(), AgentError> {
170        Ok(())
171    }
172
173    async fn stop(&mut self) -> Result<(), AgentError> {
174        Ok(())
175    }
176
177    async fn process(
178        &mut self,
179        _ctx: AgentContext,
180        _pin: String,
181        _value: AgentValue,
182    ) -> Result<(), AgentError> {
183        Ok(())
184    }
185}
186
187#[async_trait]
188impl<T: AsAgent> Agent for T {
189    fn new(askit: ASKit, id: String, spec: AgentSpec) -> Result<Self, AgentError> {
190        let mut agent = T::new(askit, id, spec)?;
191        agent.mut_data().status = AgentStatus::Init;
192        Ok(agent)
193    }
194
195    fn askit(&self) -> &ASKit {
196        &self.data().askit
197    }
198
199    fn id(&self) -> &str {
200        &self.data().id
201    }
202
203    fn spec(&self) -> &AgentSpec {
204        &self.data().spec
205    }
206
207    fn status(&self) -> &AgentStatus {
208        &self.data().status
209    }
210
211    fn def_name(&self) -> &str {
212        self.data().spec.def_name.as_str()
213    }
214
215    fn configs(&self) -> Result<&AgentConfigs, AgentError> {
216        self.data()
217            .spec
218            .configs
219            .as_ref()
220            .ok_or(AgentError::NoConfig)
221    }
222
223    fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError> {
224        if let Some(configs) = &mut self.mut_data().spec.configs {
225            configs.set(key.clone(), value.clone());
226            self.configs_changed()?;
227        }
228        Ok(())
229    }
230
231    fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError> {
232        self.mut_data().spec.configs = Some(configs);
233        self.configs_changed()
234    }
235
236    fn flow_id(&self) -> &str {
237        &self.data().flow_id
238    }
239
240    fn set_flow_id(&mut self, flow_id: String) {
241        self.mut_data().flow_id = flow_id.clone();
242    }
243
244    async fn start(&mut self) -> Result<(), AgentError> {
245        self.mut_data().status = AgentStatus::Start;
246
247        if let Err(e) = self.start().await {
248            self.askit()
249                .emit_agent_error(self.id().to_string(), e.to_string());
250            return Err(e);
251        }
252
253        Ok(())
254    }
255
256    async fn stop(&mut self) -> Result<(), AgentError> {
257        self.mut_data().status = AgentStatus::Stop;
258        self.stop().await?;
259        self.mut_data().status = AgentStatus::Init;
260        Ok(())
261    }
262
263    async fn process(
264        &mut self,
265        ctx: AgentContext,
266        pin: String,
267        value: AgentValue,
268    ) -> Result<(), AgentError> {
269        if let Err(e) = self.process(ctx, pin, value).await {
270            self.askit()
271                .emit_agent_error(self.id().to_string(), e.to_string());
272            return Err(e);
273        }
274        Ok(())
275    }
276
277    fn get_global_configs(&self) -> Option<AgentConfigs> {
278        self.askit().get_global_configs(self.def_name())
279    }
280
281    fn as_any(&self) -> &dyn Any {
282        self
283    }
284
285    fn as_any_mut(&mut self) -> &mut dyn Any {
286        self
287    }
288}
289
290pub fn new_agent_boxed<T: Agent>(
291    askit: ASKit,
292    id: String,
293    spec: AgentSpec,
294) -> Result<Box<dyn Agent>, AgentError> {
295    Ok(Box::new(T::new(askit, id, spec)?))
296}
297
298pub fn agent_new(
299    askit: ASKit,
300    agent_id: String,
301    spec: AgentSpec,
302) -> Result<Box<dyn Agent>, AgentError> {
303    let def;
304    {
305        let def_name = &spec.def_name;
306        let defs = askit.defs.lock().unwrap();
307        def = defs
308            .get(def_name)
309            .ok_or_else(|| AgentError::UnknownDefName(def_name.to_string()))?
310            .clone();
311    }
312
313    if let Some(new_boxed) = def.new_boxed {
314        return new_boxed(askit, agent_id, spec);
315    }
316
317    match def.kind.as_str() {
318        // "Command" => {
319        //     return new_boxed::<super::builtins::CommandAgent>(
320        //         askit,
321        //         agent_id,
322        //         def_name.to_string(),
323        //         config,
324        //     );
325        // }
326        _ => return Err(AgentError::UnknownDefKind(def.kind.to_string()).into()),
327    }
328}