agent_stream_kit/
agent.rs

1use async_trait::async_trait;
2
3use crate::AgentValue;
4
5use super::askit::ASKit;
6use super::config::AgentConfigs;
7use super::context::AgentContext;
8use super::data::AgentData;
9use super::error::AgentError;
10use super::runtime::runtime;
11
/// Lifecycle state of an agent.
///
/// Every variant is a plain tag without payload, so the type is cheap to copy
/// and can be compared for full equality, hashed and used as a map/set key.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AgentStatus {
    /// Initial state; this is also the `Default` value.
    #[default]
    Init,
    /// The agent has been asked to start.
    Start,
    /// The agent has been asked to stop.
    Stop,
}
19
20pub enum AgentMessage {
21    Input {
22        ctx: AgentContext,
23        pin: String,
24        data: AgentData,
25    },
26    Config {
27        configs: AgentConfigs,
28    },
29    Stop,
30}
31
32#[async_trait]
33pub trait Agent {
34    fn new(
35        askit: ASKit,
36        id: String,
37        def_name: String,
38        configs: Option<AgentConfigs>,
39    ) -> Result<Self, AgentError>
40    where
41        Self: Sized;
42
43    fn askit(&self) -> &ASKit;
44
45    fn id(&self) -> &str;
46
47    fn status(&self) -> &AgentStatus;
48
49    fn def_name(&self) -> &str;
50
51    fn configs(&self) -> Result<&AgentConfigs, AgentError>;
52
53    fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError>;
54
55    fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError>;
56
57    fn get_global_configs(&self) -> Option<AgentConfigs> {
58        self.askit().get_global_configs(self.def_name())
59    }
60
61    fn flow_name(&self) -> &str;
62
63    fn set_flow_name(&mut self, flow_name: String);
64
65    fn start(&mut self) -> Result<(), AgentError>;
66
67    fn stop(&mut self) -> Result<(), AgentError>;
68
69    async fn process(
70        &mut self,
71        ctx: AgentContext,
72        pin: String,
73        data: AgentData,
74    ) -> Result<(), AgentError>;
75
76    fn runtime(&self) -> &tokio::runtime::Runtime {
77        runtime()
78    }
79}
80
81pub struct AsAgentData {
82    pub askit: ASKit,
83
84    pub id: String,
85    pub status: AgentStatus,
86    pub def_name: String,
87    pub flow_name: String,
88    pub configs: Option<AgentConfigs>,
89}
90
91impl AsAgentData {
92    pub fn new(askit: ASKit, id: String, def_name: String, configs: Option<AgentConfigs>) -> Self {
93        Self {
94            askit,
95            id,
96            status: AgentStatus::Init,
97            def_name,
98            flow_name: String::new(),
99            configs,
100        }
101    }
102}
103
104#[async_trait]
105pub trait AsAgent {
106    fn new(
107        askit: ASKit,
108        id: String,
109        def_name: String,
110        configs: Option<AgentConfigs>,
111    ) -> Result<Self, AgentError>
112    where
113        Self: Sized + Send + Sync;
114
115    fn data(&self) -> &AsAgentData;
116
117    fn mut_data(&mut self) -> &mut AsAgentData;
118
119    fn configs_changed(&mut self) -> Result<(), AgentError> {
120        Ok(())
121    }
122
123    fn start(&mut self) -> Result<(), AgentError> {
124        Ok(())
125    }
126
127    fn stop(&mut self) -> Result<(), AgentError> {
128        Ok(())
129    }
130
131    async fn process(
132        &mut self,
133        _ctx: AgentContext,
134        _pin: String,
135        _data: AgentData,
136    ) -> Result<(), AgentError> {
137        Ok(())
138    }
139}
140
141#[async_trait]
142impl<T: AsAgent + Send + Sync> Agent for T {
143    fn new(
144        askit: ASKit,
145        id: String,
146        def_name: String,
147        configs: Option<AgentConfigs>,
148    ) -> Result<Self, AgentError> {
149        let mut agent = T::new(askit, id, def_name, configs)?;
150        agent.mut_data().status = AgentStatus::Init;
151        Ok(agent)
152    }
153
154    fn askit(&self) -> &ASKit {
155        &self.data().askit
156    }
157
158    fn id(&self) -> &str {
159        &self.data().id
160    }
161
162    fn status(&self) -> &AgentStatus {
163        &self.data().status
164    }
165
166    fn def_name(&self) -> &str {
167        self.data().def_name.as_str()
168    }
169
170    fn configs(&self) -> Result<&AgentConfigs, AgentError> {
171        self.data().configs.as_ref().ok_or(AgentError::NoConfig)
172    }
173
174    fn set_config(&mut self, key: String, value: AgentValue) -> Result<(), AgentError> {
175        if let Some(configs) = &mut self.mut_data().configs {
176            configs.set(key.clone(), value.clone());
177            self.configs_changed()?;
178        }
179        Ok(())
180    }
181
182    fn set_configs(&mut self, configs: AgentConfigs) -> Result<(), AgentError> {
183        self.mut_data().configs = Some(configs);
184        self.configs_changed()
185    }
186
187    fn flow_name(&self) -> &str {
188        &self.data().flow_name
189    }
190
191    fn set_flow_name(&mut self, flow_name: String) {
192        self.mut_data().flow_name = flow_name.clone();
193    }
194
195    fn start(&mut self) -> Result<(), AgentError> {
196        self.mut_data().status = AgentStatus::Start;
197
198        if let Err(e) = self.start() {
199            self.askit()
200                .emit_agent_error(self.id().to_string(), e.to_string());
201            return Err(e);
202        }
203
204        Ok(())
205    }
206
207    fn stop(&mut self) -> Result<(), AgentError> {
208        self.mut_data().status = AgentStatus::Stop;
209        self.stop()?;
210        self.mut_data().status = AgentStatus::Init;
211        Ok(())
212    }
213
214    async fn process(
215        &mut self,
216        ctx: AgentContext,
217        pin: String,
218        data: AgentData,
219    ) -> Result<(), AgentError> {
220        if let Err(e) = self.process(ctx, pin, data).await {
221            self.askit()
222                .emit_agent_error(self.id().to_string(), e.to_string());
223            return Err(e);
224        }
225        Ok(())
226    }
227
228    fn get_global_configs(&self) -> Option<AgentConfigs> {
229        self.askit().get_global_configs(self.def_name())
230    }
231}
232
233pub fn new_agent_boxed<T: Agent + Send + Sync + 'static>(
234    askit: ASKit,
235    id: String,
236    def_name: String,
237    configs: Option<AgentConfigs>,
238) -> Result<Box<dyn Agent + Send + Sync>, AgentError> {
239    Ok(Box::new(T::new(askit, id, def_name, configs)?))
240}
241
242pub fn agent_new(
243    askit: ASKit,
244    agent_id: String,
245    def_name: &str,
246    configs: Option<AgentConfigs>,
247) -> Result<Box<dyn Agent + Send + Sync>, AgentError> {
248    let def;
249    {
250        let defs = askit.defs.lock().unwrap();
251        def = defs
252            .get(def_name)
253            .ok_or_else(|| AgentError::UnknownDefName(def_name.to_string()))?
254            .clone();
255    }
256
257    let default_config = def.default_configs.clone();
258    let configs = match (default_config, configs) {
259        (Some(def_cfg), Some(mut cfg)) => {
260            for (k, v) in def_cfg.iter() {
261                if !cfg.contains_key(k) {
262                    cfg.set(k.clone(), v.value.clone());
263                }
264            }
265            Some(cfg)
266        }
267        (Some(def_cfg), None) => {
268            let mut cfg = AgentConfigs::default();
269            for (k, v) in def_cfg.iter() {
270                cfg.set(k.clone(), v.value.clone());
271            }
272            Some(cfg)
273        }
274        (None, Some(cfg)) => Some(cfg),
275        (None, None) => None,
276    };
277
278    if let Some(new_boxed) = def.new_boxed {
279        return new_boxed(askit, agent_id, def_name.to_string(), configs);
280    }
281
282    match def.kind.as_str() {
283        // "Command" => {
284        //     return new_boxed::<super::builtins::CommandAgent>(
285        //         askit,
286        //         agent_id,
287        //         def_name.to_string(),
288        //         config,
289        //     );
290        // }
291        _ => return Err(AgentError::UnknownDefKind(def.kind.to_string()).into()),
292    }
293}