agent_stream_kit/
agent.rs

1use std::collections::BTreeMap;
2
3use async_trait::async_trait;
4
5use super::askit::ASKit;
6use super::config::AgentConfigs;
7use super::context::AgentContext;
8use super::error::AgentError;
9use super::runtime::runtime;
10use super::value::AgentValue;
11
/// Lifecycle state of an agent.
///
/// The type is a field-less enum, so it is `Copy` and `Eq` in addition to the
/// usual derives; callers can compare and pass it around without cloning.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum AgentStatus {
    /// Initial state. This is the `Default` value, the state assigned on
    /// construction, and the state restored after a successful stop.
    #[default]
    Init,
    /// Assigned by the blanket `Agent::start` right before the start hook runs.
    Start,
    /// Assigned by the blanket `Agent::stop` while the stop hook runs.
    Stop,
}
19
/// Messages that can be addressed to an agent.
///
/// NOTE(review): the code that sends and consumes these messages is not part
/// of this file, so the exact dispatch rules should be confirmed there.
pub enum AgentMessage {
    /// An input value together with its context and the name of the pin it
    /// belongs to — the same three items `Agent::process` receives.
    Input {
        ctx: AgentContext,
        pin: String,
        value: AgentValue,
    },
    /// A complete set of configs for the agent.
    Config {
        configs: AgentConfigs,
    },
    /// Payload-free variant; presumably asks the agent to stop — confirm
    /// against the consumer.
    Stop,
}
31
/// A named value. The blanket `Agent` implementation in this file keeps one
/// `Pin` per output pin name.
pub struct Pin {
    /// Name of the pin; the blanket `Agent::set_out_pin` also uses it as the
    /// map key under which the pin is stored.
    pub name: String,
    /// Value stored for this pin.
    pub value: AgentValue,
}
36
/// Interface of an agent: identity, status, output pins, configs, flow
/// membership, and the asynchronous `start` / `stop` / `process` operations.
///
/// Types implementing `AsAgent` (with `Send + Sync`) receive this trait
/// automatically through the blanket implementation in this file.
#[async_trait]
pub trait Agent {
    /// Constructs an agent from the owning `ASKit`, an agent id, the name of
    /// its definition, and optional initial configs.
    ///
    /// # Errors
    /// Returns an `AgentError` if construction fails.
    fn new(
        askit: ASKit,
        id: String,
        def_name: String,
        configs: Option<AgentConfigs>,
    ) -> Result<Self, AgentError>
    where
        Self: Sized;

    /// Returns the `ASKit` this agent was created with.
    fn askit(&self) -> &ASKit;

    /// Returns the agent id.
    fn id(&self) -> &str;

    /// Returns the current lifecycle status.
    fn status(&self) -> &AgentStatus;

    /// Returns the name of the definition this agent was created from.
    fn def_name(&self) -> &str;

    /// Looks up the output pin called `name`, if one is known.
    fn out_pin(&self, name: &str) -> Option<&Pin>;

    /// Stores `value` for the output pin called `name`.
    fn set_out_pin(&mut self, name: String, value: AgentValue);

    /// Returns the agent's configs.
    ///
    /// # Errors
    /// Returns an `AgentError` when no configs are available (the blanket
    /// implementation in this file reports `AgentError::NoConfig`).
    fn configs(&self) -> Result<&AgentConfigs, AgentError>;

    /// Sets a single config entry.
    fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError>;

    /// Replaces the agent's configs as a whole.
    fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError>;

    /// Returns the global configs that the `ASKit` holds for this agent's
    /// definition name, if any.
    fn get_global_configs(&self) -> Option<AgentConfigs> {
        self.askit().get_global_configs(self.def_name())
    }

    /// Returns the name of the flow this agent is associated with.
    fn flow_name(&self) -> &str;

    /// Sets the name of the flow this agent is associated with.
    fn set_flow_name(&mut self, flow_name: String);

    /// Starts the agent.
    async fn start(&mut self) -> Result<(), AgentError>;

    /// Stops the agent.
    async fn stop(&mut self) -> Result<(), AgentError>;

    /// Processes `value` for the pin called `pin` within context `ctx`.
    async fn process(
        &mut self,
        ctx: AgentContext,
        pin: String,
        value: AgentValue,
    ) -> Result<(), AgentError>;

    /// Returns the Tokio runtime obtained from `super::runtime::runtime()`.
    fn runtime(&self) -> &tokio::runtime::Runtime {
        runtime()
    }
}
89
/// State shared by every `AsAgent` implementor; the blanket `Agent`
/// implementation reads and writes it through `HasAgentData`.
pub struct AsAgentData {
    /// The `ASKit` the agent was created with.
    pub askit: ASKit,

    /// Agent id.
    pub id: String,
    /// Current lifecycle status.
    pub status: AgentStatus,
    /// Name of the definition the agent was created from.
    pub def_name: String,
    /// Name of the associated flow; empty until `set_flow_name` is called.
    pub flow_name: String,
    /// Output pins keyed by pin name; stays `None` until the first pin is set.
    pub out_pins: Option<BTreeMap<String, Pin>>,
    /// Agent configs, if any were supplied.
    pub configs: Option<AgentConfigs>,
}
100
101impl AsAgentData {
102    pub fn new(askit: ASKit, id: String, def_name: String, configs: Option<AgentConfigs>) -> Self {
103        Self {
104            askit,
105            id,
106            status: AgentStatus::Init,
107            def_name,
108            flow_name: String::new(),
109            out_pins: None,
110            configs,
111        }
112    }
113}
114
/// Access to the `AsAgentData` embedded in an agent type. Required by
/// `AsAgent`, and used by the blanket `Agent` implementation for all of its
/// state handling.
pub trait HasAgentData {
    /// Returns a shared reference to the agent's data.
    fn data(&self) -> &AsAgentData;

    /// Returns a mutable reference to the agent's data.
    fn mut_data(&mut self) -> &mut AsAgentData;
}
120
/// Authoring trait for agents: provide `new` and `HasAgentData`, override the
/// hooks you need, and the blanket implementation in this file supplies the
/// full `Agent` trait. Every hook has a default that does nothing and
/// returns `Ok(())`.
#[async_trait]
pub trait AsAgent: HasAgentData {
    /// Constructs the agent from the owning `ASKit`, an agent id, the name of
    /// its definition, and optional initial configs.
    ///
    /// # Errors
    /// Returns an `AgentError` if construction fails.
    fn new(
        askit: ASKit,
        id: String,
        def_name: String,
        configs: Option<AgentConfigs>,
    ) -> Result<Self, AgentError>
    where
        Self: Sized + Send + Sync;

    /// Hook called by the blanket `Agent` implementation after it has changed
    /// the agent's configs (`set_config` on existing configs, or
    /// `set_configs`).
    fn configs_changed(&mut self) -> Result<(), AgentError> {
        Ok(())
    }

    /// Hook called by the blanket `Agent::start` after the status has been set
    /// to `AgentStatus::Start`.
    async fn start(&mut self) -> Result<(), AgentError> {
        Ok(())
    }

    /// Hook called by the blanket `Agent::stop` after the status has been set
    /// to `AgentStatus::Stop`.
    async fn stop(&mut self) -> Result<(), AgentError> {
        Ok(())
    }

    /// Hook called by the blanket `Agent::process` with the context, pin name
    /// and value it received.
    async fn process(
        &mut self,
        _ctx: AgentContext,
        _pin: String,
        _value: AgentValue,
    ) -> Result<(), AgentError> {
        Ok(())
    }
}
153
154#[async_trait]
155impl<T: AsAgent + Send + Sync> Agent for T {
156    fn new(
157        askit: ASKit,
158        id: String,
159        def_name: String,
160        configs: Option<AgentConfigs>,
161    ) -> Result<Self, AgentError> {
162        let mut agent = T::new(askit, id, def_name, configs)?;
163        agent.mut_data().status = AgentStatus::Init;
164        Ok(agent)
165    }
166
167    fn askit(&self) -> &ASKit {
168        &self.data().askit
169    }
170
171    fn id(&self) -> &str {
172        &self.data().id
173    }
174
175    fn status(&self) -> &AgentStatus {
176        &self.data().status
177    }
178
179    fn def_name(&self) -> &str {
180        self.data().def_name.as_str()
181    }
182
183    fn out_pin(&self, name: &str) -> Option<&Pin> {
184        if let Some(out_pins) = &self.data().out_pins {
185            return out_pins.get(name);
186        }
187        None
188    }
189
190    fn set_out_pin(&mut self, name: String, value: AgentValue) {
191        if let Some(out_pins) = &mut self.mut_data().out_pins {
192            out_pins.insert(name.clone(), Pin { name, value });
193        } else {
194            let mut out_pins = BTreeMap::new();
195            out_pins.insert(name.clone(), Pin { name, value });
196            self.mut_data().out_pins = Some(out_pins);
197        }
198    }
199
200    fn configs(&self) -> Result<&AgentConfigs, AgentError> {
201        self.data().configs.as_ref().ok_or(AgentError::NoConfig)
202    }
203
204    fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError> {
205        if let Some(configs) = &mut self.mut_data().configs {
206            configs.set(key.clone(), value.clone());
207            self.configs_changed()?;
208        }
209        Ok(())
210    }
211
212    fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError> {
213        self.mut_data().configs = Some(configs);
214        self.configs_changed()
215    }
216
217    fn flow_name(&self) -> &str {
218        &self.data().flow_name
219    }
220
221    fn set_flow_name(&mut self, flow_name: String) {
222        self.mut_data().flow_name = flow_name.clone();
223    }
224
225    async fn start(&mut self) -> Result<(), AgentError> {
226        self.mut_data().status = AgentStatus::Start;
227
228        if let Err(e) = self.start().await {
229            self.askit()
230                .emit_agent_error(self.id().to_string(), e.to_string());
231            return Err(e);
232        }
233
234        Ok(())
235    }
236
237    async fn stop(&mut self) -> Result<(), AgentError> {
238        self.mut_data().status = AgentStatus::Stop;
239        self.stop().await?;
240        self.mut_data().status = AgentStatus::Init;
241        Ok(())
242    }
243
244    async fn process(
245        &mut self,
246        ctx: AgentContext,
247        pin: String,
248        value: AgentValue,
249    ) -> Result<(), AgentError> {
250        if let Err(e) = self.process(ctx, pin, value).await {
251            self.askit()
252                .emit_agent_error(self.id().to_string(), e.to_string());
253            return Err(e);
254        }
255        Ok(())
256    }
257
258    fn get_global_configs(&self) -> Option<AgentConfigs> {
259        self.askit().get_global_configs(self.def_name())
260    }
261}
262
263pub fn new_agent_boxed<T: Agent + Send + Sync + 'static>(
264    askit: ASKit,
265    id: String,
266    def_name: String,
267    configs: Option<AgentConfigs>,
268) -> Result<Box<dyn Agent + Send + Sync>, AgentError> {
269    Ok(Box::new(T::new(askit, id, def_name, configs)?))
270}
271
272pub fn agent_new(
273    askit: ASKit,
274    agent_id: String,
275    def_name: &str,
276    configs: Option<AgentConfigs>,
277) -> Result<Box<dyn Agent + Send + Sync>, AgentError> {
278    let def;
279    {
280        let defs = askit.defs.lock().unwrap();
281        def = defs
282            .get(def_name)
283            .ok_or_else(|| AgentError::UnknownDefName(def_name.to_string()))?
284            .clone();
285    }
286
287    let default_config = def.default_configs.clone();
288    let configs = match (default_config, configs) {
289        (Some(def_cfg), Some(mut cfg)) => {
290            for (k, v) in def_cfg.iter() {
291                if !cfg.contains_key(k) {
292                    cfg.set(k.clone(), v.value.clone());
293                }
294            }
295            Some(cfg)
296        }
297        (Some(def_cfg), None) => {
298            let mut cfg = AgentConfigs::default();
299            for (k, v) in def_cfg.iter() {
300                cfg.set(k.clone(), v.value.clone());
301            }
302            Some(cfg)
303        }
304        (None, Some(cfg)) => Some(cfg),
305        (None, None) => None,
306    };
307
308    if let Some(new_boxed) = def.new_boxed {
309        return new_boxed(askit, agent_id, def_name.to_string(), configs);
310    }
311
312    match def.kind.as_str() {
313        // "Command" => {
314        //     return new_boxed::<super::builtins::CommandAgent>(
315        //         askit,
316        //         agent_id,
317        //         def_name.to_string(),
318        //         config,
319        //     );
320        // }
321        _ => return Err(AgentError::UnknownDefKind(def.kind.to_string()).into()),
322    }
323}