agent_rt/
context.rs

1use crate::{Node, Output, Plan, RTError, Runtime, Service, END_NODE_CODE, END_RESULT_ERROR};
2use std::any::Any;
3use std::collections::{HashMap, VecDeque};
4use std::error::Error;
5use std::fmt::{Debug, Formatter};
6use std::sync::atomic::{AtomicU8, Ordering};
7use std::sync::{Arc, Mutex};
8use wd_tools::PFErr;
9
10pub struct Context {
11    pub parent_code: Option<String>,
12    //任务流名称
13    pub code: String,
14    //状态
15    pub status: AtomicU8, //0:init 1:running, 2:success, 3:error
16    //堆栈信息
17    pub stack: Arc<Mutex<ContextStack>>,
18    //执行计划
19    pub plan: Arc<dyn Plan>,
20    //全局扩展字段
21    pub extend: Mutex<HashMap<String, Box<dyn Any + Send + Sync + 'static>>>,
22    //结束时回调
23    pub over_callback: Option<Mutex<Vec<Box<dyn FnOnce(Arc<Context>) + Send + Sync + 'static>>>>,
24    //可能存在父亲流程
25    // pub(crate) parent_ctx:Option<Arc<Context>>,
26    // pub(crate) middle:VecDeque<Arc<dyn Service>>,
27    // pub(crate) nodes:Arc<dyn ServiceLoader>,
28    // pub(crate) waker:Arc<dyn WakerWaitPool>,
29    pub(crate) runtime: Arc<Runtime>,
30}
31// impl Drop for Context{
32//     fn drop(&mut self) {
33//         self.meta.set_status(CtxStatus::SUCCESS)
34//     }
35// }
36impl Debug for Context {
37    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
38        write!(
39            f,
40            "{}",
41            format!("code:{},status:{:?}", self.code, self.status)
42        )
43    }
44}
45
46pub struct Flow {
47    pub ctx: Arc<Context>,
48    pub code: String,
49    pub node_type_id: String, //类型节点id
50    pub node_config: String,  //类型节点配置
51    //中间流程
52    pub(crate) middle: VecDeque<Arc<dyn Service>>,
53}
54
55impl Debug for Flow {
56    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
57        write!(
58            f,
59            "ctx:{:?},code:{},node_type_id:{},node_config:{}",
60            self.ctx, self.code, self.node_type_id, self.node_config
61        )
62    }
63}
64
65#[derive(Debug, Default)]
66pub struct Meta {
67    pub status: AtomicU8, //0:init 1:running, 2:success, 3:error
68    pub stack: Arc<Mutex<ContextStack>>,
69}
70
71#[derive(Debug, Eq, PartialEq)]
72pub enum CtxStatus {
73    INIT,
74    RUNNING,
75    SUCCESS,
76    ERROR,
77}
78impl From<CtxStatus> for u8 {
79    fn from(value: CtxStatus) -> Self {
80        match value {
81            CtxStatus::INIT => 0u8,
82            CtxStatus::RUNNING => 1u8,
83            CtxStatus::SUCCESS => 2u8,
84            CtxStatus::ERROR => 3u8,
85        }
86    }
87}
88impl From<u8> for CtxStatus {
89    fn from(value: u8) -> Self {
90        match value {
91            0 => CtxStatus::INIT,
92            1 => CtxStatus::RUNNING,
93            2 => CtxStatus::SUCCESS,
94            _ => CtxStatus::ERROR,
95        }
96    }
97}
98
99#[derive(Debug)]
100pub struct ContextStack {
101    //start节点会固定占用一个栈位置
102    max_stack: usize,
103    round: usize,
104    //round,parent_id,node_id -> node_id
105    stack: Vec<(usize, String, String, String)>,
106}
107
108impl Context {
109    pub fn new<C: Into<String>, P: Plan + 'static>(
110        code: C,
111        plan: P,
112        runtime: Arc<Runtime>,
113    ) -> Self {
114        Self {
115            parent_code: None,
116            code: code.into(),
117            status: AtomicU8::default(),
118            stack: Arc::new(Mutex::new(Default::default())),
119            plan: Arc::new(plan),
120            extend: Mutex::new(Default::default()),
121            over_callback: None,
122            runtime,
123        }
124    }
125    pub fn sub_ctx<C: Into<String>, P: Plan + 'static>(&self, code: C, plan: P) -> Self {
126        let parent_code = self.code.clone();
127        Self::new(code, plan, self.runtime.clone())
128            .updates(|x| x.parent_code = Some(parent_code))
129    }
130    pub fn updates(mut self, f: impl FnOnce(&mut Self)) -> Self {
131        f(&mut self);
132        self
133    }
134    pub fn set<S: Into<String>, V: Any + Send + Sync>(&self, key: S, value: V) {
135        let mut lock = self.extend.lock().unwrap();
136        lock.insert(key.into(), Box::new(value));
137    }
138    pub fn exist(&self, key: &str) -> bool {
139        let lock = self.extend.lock().unwrap();
140        lock.contains_key(key)
141    }
142    pub fn get_opt<In: 'static, Out, F: FnOnce(Option<&mut In>) -> Out>(
143        &self,
144        key: &str,
145        function: F,
146    ) -> Out {
147        let mut lock = self.extend.lock().unwrap();
148        let val = match lock.get_mut(key) {
149            Some(o) => o,
150            None => return function(None),
151        };
152        let input = match val.downcast_mut::<In>() {
153            Some(o) => o,
154            None => return function(None),
155        };
156        return function(Some(input));
157    }
158    pub fn get<In: 'static, Out, F: FnOnce(&mut In) -> Out>(
159        &self,
160        key: &str,
161        function: F,
162    ) -> Option<Out> {
163        let mut lock = self.extend.lock().unwrap();
164        let val = lock.get_mut(key)?;
165        let input = val.downcast_mut::<In>()?;
166        let out = function(input);
167        Some(out)
168    }
169    pub fn set_box<S: Into<String>>(&self, key: S, value: Box<dyn Any + Send + Sync + 'static>) {
170        let mut lock = self.extend.lock().unwrap();
171        lock.insert(key.into(), value);
172    }
173    pub fn remove<V: Any>(&self, key: &str) -> Option<V> {
174        let mut lock = self.extend.lock().unwrap();
175        let val = lock.get(key)?;
176        let opt = val.downcast_ref::<V>();
177        if opt.is_none() {
178            return None;
179        }
180        let val = lock.remove(key).unwrap();
181        let box_val: Box<V> = val.downcast().unwrap();
182        return Some(*box_val);
183    }
184    pub fn push_callback(
185        mut self,
186        function: impl FnOnce(Arc<Context>) + Send + Sync + 'static,
187    ) -> Self {
188        if self.over_callback.is_none() {
189            self.over_callback = Some(Mutex::new(vec![]));
190        }
191        if let Some(ref lock) = self.over_callback {
192            let mut lock = lock.lock().unwrap();
193            lock.push(Box::new(function));
194        }
195        self
196    }
197    pub(crate) fn exec_over_callback(self: &Arc<Self>) {
198        if let Some(ref fs) = self.over_callback {
199            let mut lock = fs.lock().unwrap();
200            while let Some(function) = lock.pop() {
201                function(self.clone())
202            }
203        }
204    }
205    pub(crate) fn at_rt_waker_waiter(&self) {
206        if let Some(waker) = self.runtime.waker.remove(self.code.as_str()) {
207            waker.waker.wake_by_ref()
208        }
209    }
210    pub fn error_over(&self, err: impl Error) {
211        let err = format!("{}", err);
212        self.set(END_RESULT_ERROR, err);
213        //fixme cas
214        self.set_status(CtxStatus::ERROR);
215    }
216    pub fn end_over<V: Any + Send + Sync>(&self, val: Option<V>) {
217        if let Some(val) = val {
218            self.set(END_NODE_CODE, val);
219        }
220        //fixme cas
221        self.set_status(CtxStatus::SUCCESS);
222    }
223    pub fn end_output<V: Any>(&self) -> anyhow::Result<V> {
224        let status = self.status();
225        return match status {
226            CtxStatus::INIT | CtxStatus::RUNNING => anyhow::anyhow!("context is not over").err(),
227            CtxStatus::SUCCESS => {
228                if let Some(s) = self.remove::<V>(END_NODE_CODE) {
229                    Ok(s)
230                } else {
231                    anyhow::anyhow!("end output type abnormal").err()
232                }
233            }
234            CtxStatus::ERROR => {
235                let err: String = self
236                    .remove(END_RESULT_ERROR)
237                    .unwrap_or("nil error".to_string());
238                anyhow::Error::msg(err).err()
239            }
240        };
241    }
242
243    pub fn spawn<V: Any + Send + Sync>(self: Arc<Self>, args: V) -> anyhow::Result<()> {
244        self.runtime.clone().spawn(self, args)
245    }
246    pub async fn block_on<Out: Any, V: Any + Send + Sync>(
247        self: Arc<Self>,
248        args: V,
249    ) -> anyhow::Result<Out> {
250        self.runtime.clone().block_on(self, args).await
251    }
252
253    pub fn set_status(&self, status: CtxStatus) {
254        //fixme cas
255        self.status.store(status.into(), Ordering::Relaxed)
256    }
257    pub fn status(&self) -> CtxStatus {
258        let status = self.status.load(Ordering::Relaxed);
259        status.into()
260    }
261    pub fn push_stack_info<T: Into<String>, P: Into<String>, C: Into<String>>(
262        &self,
263        parent_ctx_code: T,
264        prev: P,
265        next: C,
266    ) {
267        let mut lock = self.stack.lock().unwrap();
268        lock.round += 1;
269        let round = lock.round;
270        lock.stack
271            .push((round, parent_ctx_code.into(), prev.into(), next.into()));
272    }
273    pub fn set_max_stack(&self, max: usize) {
274        let mut lock = self.stack.lock().unwrap();
275        lock.max_stack = max
276    }
277    pub fn used_stack(&self) -> usize {
278        let lock = self.stack.lock().unwrap();
279        return lock.round;
280    }
281    pub fn usable_stack(&self) -> usize {
282        let lock = self.stack.lock().unwrap();
283        if lock.max_stack > lock.round {
284            lock.max_stack - lock.round
285        } else {
286            0
287        }
288    }
289}
290
291impl Flow {
292    pub fn new(node: Node, ctx: Arc<Context>, middle: VecDeque<Arc<dyn Service>>) -> Self {
293        let Node {
294            code,
295            node_type_id,
296            node_config,
297            ..
298        } = node;
299        Self {
300            ctx,
301            code,
302            node_type_id,
303            node_config,
304            middle,
305        }
306    }
307    pub async fn call(mut self) -> anyhow::Result<Output> {
308        let opt = self.middle.pop_front();
309        let n = match opt {
310            None => return RTError::FlowLastNodeNil.anyhow(),
311            Some(n) => n,
312        };
313        n.call(self).await
314    }
315}
316
317impl Meta {}
318
319impl Default for ContextStack {
320    fn default() -> Self {
321        Self {
322            max_stack: 1024,
323            round: 0,
324            stack: vec![],
325        }
326    }
327}
328
329// impl ContextStack {
330//     pub fn push_stack_info<T:Into<String>,P:Into<String>,C:Into<String>>(&mut self,parent_ctx_code:T,prev:P,next:C){
331//         self.round+=1;
332//         self.stack.push((lock.round,parent_ctx_code.into(),prev.into(),next.into()));
333//     }
334// }