agent_stream_kit/
agent.rs

1use std::any::Any;
2
3use async_trait::async_trait;
4
5use super::askit::ASKit;
6use super::config::AgentConfigs;
7use super::context::AgentContext;
8use super::error::AgentError;
9use super::runtime::runtime;
10use super::value::AgentValue;
11
12#[derive(Debug, Default, Clone, PartialEq)]
13pub enum AgentStatus {
14    #[default]
15    Init,
16    Start,
17    Stop,
18}
19
20pub enum AgentMessage {
21    Input {
22        ctx: AgentContext,
23        pin: String,
24        value: AgentValue,
25    },
26    Config {
27        configs: AgentConfigs,
28    },
29    Stop,
30}
31
32#[async_trait]
33pub trait Agent: Send + Sync + 'static {
34    fn new(
35        askit: ASKit,
36        id: String,
37        def_name: String,
38        configs: Option<AgentConfigs>,
39    ) -> Result<Self, AgentError>
40    where
41        Self: Sized;
42
43    fn askit(&self) -> &ASKit;
44
45    fn id(&self) -> &str;
46
47    fn status(&self) -> &AgentStatus;
48
49    fn def_name(&self) -> &str;
50
51    fn configs(&self) -> Result<&AgentConfigs, AgentError>;
52
53    fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError>;
54
55    fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError>;
56
57    fn get_global_configs(&self) -> Option<AgentConfigs> {
58        self.askit().get_global_configs(self.def_name())
59    }
60
61    fn flow_id(&self) -> &str;
62
63    fn set_flow_id(&mut self, flow_id: String);
64
65    async fn start(&mut self) -> Result<(), AgentError>;
66
67    async fn stop(&mut self) -> Result<(), AgentError>;
68
69    async fn process(
70        &mut self,
71        ctx: AgentContext,
72        pin: String,
73        value: AgentValue,
74    ) -> Result<(), AgentError>;
75
76    fn runtime(&self) -> &tokio::runtime::Runtime {
77        runtime()
78    }
79
80    fn as_any(&self) -> &dyn Any;
81
82    fn as_any_mut(&mut self) -> &mut dyn Any;
83}
84
85impl dyn Agent {
86    pub fn as_agent<T: Agent>(&self) -> Option<&T> {
87        self.as_any().downcast_ref::<T>()
88    }
89
90    pub fn as_agent_mut<T: Agent>(&mut self) -> Option<&mut T> {
91        self.as_any_mut().downcast_mut::<T>()
92    }
93}
94
95pub struct AgentData {
96    pub askit: ASKit,
97
98    pub id: String,
99    pub status: AgentStatus,
100    pub def_name: String,
101    pub flow_id: String,
102    pub configs: Option<AgentConfigs>,
103}
104
105impl AgentData {
106    pub fn new(askit: ASKit, id: String, def_name: String, configs: Option<AgentConfigs>) -> Self {
107        Self {
108            askit,
109            id,
110            status: AgentStatus::Init,
111            def_name,
112            flow_id: String::new(),
113            configs,
114        }
115    }
116}
117
118pub trait HasAgentData {
119    fn data(&self) -> &AgentData;
120
121    fn mut_data(&mut self) -> &mut AgentData;
122}
123
124#[async_trait]
125pub trait AsAgent: HasAgentData + Send + Sync + 'static {
126    fn new(
127        askit: ASKit,
128        id: String,
129        def_name: String,
130        configs: Option<AgentConfigs>,
131    ) -> Result<Self, AgentError>
132    where
133        Self: Sized;
134
135    fn configs_changed(&mut self) -> Result<(), AgentError> {
136        Ok(())
137    }
138
139    async fn start(&mut self) -> Result<(), AgentError> {
140        Ok(())
141    }
142
143    async fn stop(&mut self) -> Result<(), AgentError> {
144        Ok(())
145    }
146
147    async fn process(
148        &mut self,
149        _ctx: AgentContext,
150        _pin: String,
151        _value: AgentValue,
152    ) -> Result<(), AgentError> {
153        Ok(())
154    }
155}
156
157#[async_trait]
158impl<T: AsAgent> Agent for T {
159    fn new(
160        askit: ASKit,
161        id: String,
162        def_name: String,
163        configs: Option<AgentConfigs>,
164    ) -> Result<Self, AgentError> {
165        let mut agent = T::new(askit, id, def_name, configs)?;
166        agent.mut_data().status = AgentStatus::Init;
167        Ok(agent)
168    }
169
170    fn askit(&self) -> &ASKit {
171        &self.data().askit
172    }
173
174    fn id(&self) -> &str {
175        &self.data().id
176    }
177
178    fn status(&self) -> &AgentStatus {
179        &self.data().status
180    }
181
182    fn def_name(&self) -> &str {
183        self.data().def_name.as_str()
184    }
185
186    fn configs(&self) -> Result<&AgentConfigs, AgentError> {
187        self.data().configs.as_ref().ok_or(AgentError::NoConfig)
188    }
189
190    fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError> {
191        if let Some(configs) = &mut self.mut_data().configs {
192            configs.set(key.clone(), value.clone());
193            self.configs_changed()?;
194        }
195        Ok(())
196    }
197
198    fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError> {
199        self.mut_data().configs = Some(configs);
200        self.configs_changed()
201    }
202
203    fn flow_id(&self) -> &str {
204        &self.data().flow_id
205    }
206
207    fn set_flow_id(&mut self, flow_id: String) {
208        self.mut_data().flow_id = flow_id.clone();
209    }
210
211    async fn start(&mut self) -> Result<(), AgentError> {
212        self.mut_data().status = AgentStatus::Start;
213
214        if let Err(e) = self.start().await {
215            self.askit()
216                .emit_agent_error(self.id().to_string(), e.to_string());
217            return Err(e);
218        }
219
220        Ok(())
221    }
222
223    async fn stop(&mut self) -> Result<(), AgentError> {
224        self.mut_data().status = AgentStatus::Stop;
225        self.stop().await?;
226        self.mut_data().status = AgentStatus::Init;
227        Ok(())
228    }
229
230    async fn process(
231        &mut self,
232        ctx: AgentContext,
233        pin: String,
234        value: AgentValue,
235    ) -> Result<(), AgentError> {
236        if let Err(e) = self.process(ctx, pin, value).await {
237            self.askit()
238                .emit_agent_error(self.id().to_string(), e.to_string());
239            return Err(e);
240        }
241        Ok(())
242    }
243
244    fn get_global_configs(&self) -> Option<AgentConfigs> {
245        self.askit().get_global_configs(self.def_name())
246    }
247
248    fn as_any(&self) -> &dyn Any {
249        self
250    }
251
252    fn as_any_mut(&mut self) -> &mut dyn Any {
253        self
254    }
255}
256
257pub fn new_agent_boxed<T: Agent>(
258    askit: ASKit,
259    id: String,
260    def_name: String,
261    configs: Option<AgentConfigs>,
262) -> Result<Box<dyn Agent>, AgentError> {
263    Ok(Box::new(T::new(askit, id, def_name, configs)?))
264}
265
266pub fn agent_new(
267    askit: ASKit,
268    agent_id: String,
269    def_name: &str,
270    configs: Option<AgentConfigs>,
271) -> Result<Box<dyn Agent>, AgentError> {
272    let def;
273    {
274        let defs = askit.defs.lock().unwrap();
275        def = defs
276            .get(def_name)
277            .ok_or_else(|| AgentError::UnknownDefName(def_name.to_string()))?
278            .clone();
279    }
280
281    let default_config = def.default_configs.clone();
282    let configs = match (default_config, configs) {
283        (Some(def_cfg), Some(mut cfg)) => {
284            for (k, v) in def_cfg.iter() {
285                if !cfg.contains_key(k) {
286                    cfg.set(k.clone(), v.value.clone());
287                }
288            }
289            Some(cfg)
290        }
291        (Some(def_cfg), None) => {
292            let mut cfg = AgentConfigs::default();
293            for (k, v) in def_cfg.iter() {
294                cfg.set(k.clone(), v.value.clone());
295            }
296            Some(cfg)
297        }
298        (None, Some(cfg)) => Some(cfg),
299        (None, None) => None,
300    };
301
302    if let Some(new_boxed) = def.new_boxed {
303        return new_boxed(askit, agent_id, def_name.to_string(), configs);
304    }
305
306    match def.kind.as_str() {
307        // "Command" => {
308        //     return new_boxed::<super::builtins::CommandAgent>(
309        //         askit,
310        //         agent_id,
311        //         def_name.to_string(),
312        //         config,
313        //     );
314        // }
315        _ => return Err(AgentError::UnknownDefKind(def.kind.to_string()).into()),
316    }
317}