agent_stream_kit/
agent.rs

1use std::collections::BTreeMap;
2
3use async_trait::async_trait;
4
5use super::askit::ASKit;
6use super::config::AgentConfigs;
7use super::context::AgentContext;
8use super::error::AgentError;
9use super::runtime::runtime;
10use super::value::AgentValue;
11
12#[derive(Debug, Default, Clone, PartialEq)]
13pub enum AgentStatus {
14    #[default]
15    Init,
16    Start,
17    Stop,
18}
19
20pub enum AgentMessage {
21    Input {
22        ctx: AgentContext,
23        pin: String,
24        value: AgentValue,
25    },
26    Config {
27        configs: AgentConfigs,
28    },
29    Stop,
30}
31
32pub struct Pin {
33    pub name: String,
34    pub value: AgentValue,
35}
36
37#[async_trait]
38pub trait Agent {
39    fn new(
40        askit: ASKit,
41        id: String,
42        def_name: String,
43        configs: Option<AgentConfigs>,
44    ) -> Result<Self, AgentError>
45    where
46        Self: Sized;
47
48    fn askit(&self) -> &ASKit;
49
50    fn id(&self) -> &str;
51
52    fn status(&self) -> &AgentStatus;
53
54    fn def_name(&self) -> &str;
55
56    fn out_pin(&self, name: &str) -> Option<&Pin>;
57
58    fn set_out_pin(&mut self, name: String, value: AgentValue);
59
60    fn configs(&self) -> Result<&AgentConfigs, AgentError>;
61
62    fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError>;
63
64    fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError>;
65
66    fn get_global_configs(&self) -> Option<AgentConfigs> {
67        self.askit().get_global_configs(self.def_name())
68    }
69
70    fn flow_name(&self) -> &str;
71
72    fn set_flow_name(&mut self, flow_name: String);
73
74    async fn start(&mut self) -> Result<(), AgentError>;
75
76    async fn stop(&mut self) -> Result<(), AgentError>;
77
78    async fn process(
79        &mut self,
80        ctx: AgentContext,
81        pin: String,
82        value: AgentValue,
83    ) -> Result<(), AgentError>;
84
85    fn runtime(&self) -> &tokio::runtime::Runtime {
86        runtime()
87    }
88}
89
90pub struct AsAgentData {
91    pub askit: ASKit,
92
93    pub id: String,
94    pub status: AgentStatus,
95    pub def_name: String,
96    pub flow_name: String,
97    pub out_pins: Option<BTreeMap<String, Pin>>,
98    pub configs: Option<AgentConfigs>,
99}
100
101impl AsAgentData {
102    pub fn new(askit: ASKit, id: String, def_name: String, configs: Option<AgentConfigs>) -> Self {
103        Self {
104            askit,
105            id,
106            status: AgentStatus::Init,
107            def_name,
108            flow_name: String::new(),
109            out_pins: None,
110            configs,
111        }
112    }
113}
114
115#[async_trait]
116pub trait AsAgent {
117    fn new(
118        askit: ASKit,
119        id: String,
120        def_name: String,
121        configs: Option<AgentConfigs>,
122    ) -> Result<Self, AgentError>
123    where
124        Self: Sized + Send + Sync;
125
126    fn data(&self) -> &AsAgentData;
127
128    fn mut_data(&mut self) -> &mut AsAgentData;
129
130    fn configs_changed(&mut self) -> Result<(), AgentError> {
131        Ok(())
132    }
133
134    async fn start(&mut self) -> Result<(), AgentError> {
135        Ok(())
136    }
137
138    async fn stop(&mut self) -> Result<(), AgentError> {
139        Ok(())
140    }
141
142    async fn process(
143        &mut self,
144        _ctx: AgentContext,
145        _pin: String,
146        _value: AgentValue,
147    ) -> Result<(), AgentError> {
148        Ok(())
149    }
150}
151
152#[async_trait]
153impl<T: AsAgent + Send + Sync> Agent for T {
154    fn new(
155        askit: ASKit,
156        id: String,
157        def_name: String,
158        configs: Option<AgentConfigs>,
159    ) -> Result<Self, AgentError> {
160        let mut agent = T::new(askit, id, def_name, configs)?;
161        agent.mut_data().status = AgentStatus::Init;
162        Ok(agent)
163    }
164
165    fn askit(&self) -> &ASKit {
166        &self.data().askit
167    }
168
169    fn id(&self) -> &str {
170        &self.data().id
171    }
172
173    fn status(&self) -> &AgentStatus {
174        &self.data().status
175    }
176
177    fn def_name(&self) -> &str {
178        self.data().def_name.as_str()
179    }
180
181    fn out_pin(&self, name: &str) -> Option<&Pin> {
182        if let Some(out_pins) = &self.data().out_pins {
183            return out_pins.get(name);
184        }
185        None
186    }
187
188    fn set_out_pin(&mut self, name: String, value: AgentValue) {
189        if let Some(out_pins) = &mut self.mut_data().out_pins {
190            out_pins.insert(name.clone(), Pin { name, value });
191        } else {
192            let mut out_pins = BTreeMap::new();
193            out_pins.insert(name.clone(), Pin { name, value });
194            self.mut_data().out_pins = Some(out_pins);
195        }
196    }
197
198    fn configs(&self) -> Result<&AgentConfigs, AgentError> {
199        self.data().configs.as_ref().ok_or(AgentError::NoConfig)
200    }
201
202    fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError> {
203        if let Some(configs) = &mut self.mut_data().configs {
204            configs.set(key.clone(), value.clone());
205            self.configs_changed()?;
206        }
207        Ok(())
208    }
209
210    fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError> {
211        self.mut_data().configs = Some(configs);
212        self.configs_changed()
213    }
214
215    fn flow_name(&self) -> &str {
216        &self.data().flow_name
217    }
218
219    fn set_flow_name(&mut self, flow_name: String) {
220        self.mut_data().flow_name = flow_name.clone();
221    }
222
223    async fn start(&mut self) -> Result<(), AgentError> {
224        self.mut_data().status = AgentStatus::Start;
225
226        if let Err(e) = self.start().await {
227            self.askit()
228                .emit_agent_error(self.id().to_string(), e.to_string());
229            return Err(e);
230        }
231
232        Ok(())
233    }
234
235    async fn stop(&mut self) -> Result<(), AgentError> {
236        self.mut_data().status = AgentStatus::Stop;
237        self.stop().await?;
238        self.mut_data().status = AgentStatus::Init;
239        Ok(())
240    }
241
242    async fn process(
243        &mut self,
244        ctx: AgentContext,
245        pin: String,
246        value: AgentValue,
247    ) -> Result<(), AgentError> {
248        if let Err(e) = self.process(ctx, pin, value).await {
249            self.askit()
250                .emit_agent_error(self.id().to_string(), e.to_string());
251            return Err(e);
252        }
253        Ok(())
254    }
255
256    fn get_global_configs(&self) -> Option<AgentConfigs> {
257        self.askit().get_global_configs(self.def_name())
258    }
259}
260
261pub fn new_agent_boxed<T: Agent + Send + Sync + 'static>(
262    askit: ASKit,
263    id: String,
264    def_name: String,
265    configs: Option<AgentConfigs>,
266) -> Result<Box<dyn Agent + Send + Sync>, AgentError> {
267    Ok(Box::new(T::new(askit, id, def_name, configs)?))
268}
269
270pub fn agent_new(
271    askit: ASKit,
272    agent_id: String,
273    def_name: &str,
274    configs: Option<AgentConfigs>,
275) -> Result<Box<dyn Agent + Send + Sync>, AgentError> {
276    let def;
277    {
278        let defs = askit.defs.lock().unwrap();
279        def = defs
280            .get(def_name)
281            .ok_or_else(|| AgentError::UnknownDefName(def_name.to_string()))?
282            .clone();
283    }
284
285    let default_config = def.default_configs.clone();
286    let configs = match (default_config, configs) {
287        (Some(def_cfg), Some(mut cfg)) => {
288            for (k, v) in def_cfg.iter() {
289                if !cfg.contains_key(k) {
290                    cfg.set(k.clone(), v.value.clone());
291                }
292            }
293            Some(cfg)
294        }
295        (Some(def_cfg), None) => {
296            let mut cfg = AgentConfigs::default();
297            for (k, v) in def_cfg.iter() {
298                cfg.set(k.clone(), v.value.clone());
299            }
300            Some(cfg)
301        }
302        (None, Some(cfg)) => Some(cfg),
303        (None, None) => None,
304    };
305
306    if let Some(new_boxed) = def.new_boxed {
307        return new_boxed(askit, agent_id, def_name.to_string(), configs);
308    }
309
310    match def.kind.as_str() {
311        // "Command" => {
312        //     return new_boxed::<super::builtins::CommandAgent>(
313        //         askit,
314        //         agent_id,
315        //         def_name.to_string(),
316        //         config,
317        //     );
318        // }
319        _ => return Err(AgentError::UnknownDefKind(def.kind.to_string()).into()),
320    }
321}