1use crate::{Node, Output, Plan, RTError, Runtime, Service, END_NODE_CODE, END_RESULT_ERROR};
2use std::any::Any;
3use std::collections::{HashMap, VecDeque};
4use std::error::Error;
5use std::fmt::{Debug, Formatter};
6use std::sync::atomic::{AtomicU8, Ordering};
7use std::sync::{Arc, Mutex};
8use wd_tools::PFErr;
9
10pub struct Context {
11 pub parent_code: Option<String>,
12 pub code: String,
14 pub status: AtomicU8, pub stack: Arc<Mutex<ContextStack>>,
18 pub plan: Arc<dyn Plan>,
20 pub extend: Mutex<HashMap<String, Box<dyn Any + Send + Sync + 'static>>>,
22 pub over_callback: Option<Mutex<Vec<Box<dyn FnOnce(Arc<Context>) + Send + Sync + 'static>>>>,
24 pub(crate) runtime: Arc<Runtime>,
30}
31impl Debug for Context {
37 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
38 write!(
39 f,
40 "{}",
41 format!("code:{},status:{:?}", self.code, self.status)
42 )
43 }
44}
45
46pub struct Flow {
47 pub ctx: Arc<Context>,
48 pub code: String,
49 pub node_type_id: String, pub node_config: String, pub(crate) middle: VecDeque<Arc<dyn Service>>,
53}
54
55impl Debug for Flow {
56 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
57 write!(
58 f,
59 "ctx:{:?},code:{},node_type_id:{},node_config:{}",
60 self.ctx, self.code, self.node_type_id, self.node_config
61 )
62 }
63}
64
65#[derive(Debug, Default)]
66pub struct Meta {
67 pub status: AtomicU8, pub stack: Arc<Mutex<ContextStack>>,
69}
70
71#[derive(Debug, Eq, PartialEq)]
72pub enum CtxStatus {
73 INIT,
74 RUNNING,
75 SUCCESS,
76 ERROR,
77}
78impl From<CtxStatus> for u8 {
79 fn from(value: CtxStatus) -> Self {
80 match value {
81 CtxStatus::INIT => 0u8,
82 CtxStatus::RUNNING => 1u8,
83 CtxStatus::SUCCESS => 2u8,
84 CtxStatus::ERROR => 3u8,
85 }
86 }
87}
88impl From<u8> for CtxStatus {
89 fn from(value: u8) -> Self {
90 match value {
91 0 => CtxStatus::INIT,
92 1 => CtxStatus::RUNNING,
93 2 => CtxStatus::SUCCESS,
94 _ => CtxStatus::ERROR,
95 }
96 }
97}
98
99#[derive(Debug)]
100pub struct ContextStack {
101 max_stack: usize,
103 round: usize,
104 stack: Vec<(usize, String, String, String)>,
106}
107
108impl Context {
109 pub fn new<C: Into<String>, P: Plan + 'static>(
110 code: C,
111 plan: P,
112 runtime: Arc<Runtime>,
113 ) -> Self {
114 Self {
115 parent_code: None,
116 code: code.into(),
117 status: AtomicU8::default(),
118 stack: Arc::new(Mutex::new(Default::default())),
119 plan: Arc::new(plan),
120 extend: Mutex::new(Default::default()),
121 over_callback: None,
122 runtime,
123 }
124 }
125 pub fn sub_ctx<C: Into<String>, P: Plan + 'static>(&self, code: C, plan: P) -> Self {
126 let parent_code = self.code.clone();
127 Self::new(code, plan, self.runtime.clone())
128 .updates(|x| x.parent_code = Some(parent_code))
129 }
130 pub fn updates(mut self, f: impl FnOnce(&mut Self)) -> Self {
131 f(&mut self);
132 self
133 }
134 pub fn set<S: Into<String>, V: Any + Send + Sync>(&self, key: S, value: V) {
135 let mut lock = self.extend.lock().unwrap();
136 lock.insert(key.into(), Box::new(value));
137 }
138 pub fn exist(&self, key: &str) -> bool {
139 let lock = self.extend.lock().unwrap();
140 lock.contains_key(key)
141 }
142 pub fn get_opt<In: 'static, Out, F: FnOnce(Option<&mut In>) -> Out>(
143 &self,
144 key: &str,
145 function: F,
146 ) -> Out {
147 let mut lock = self.extend.lock().unwrap();
148 let val = match lock.get_mut(key) {
149 Some(o) => o,
150 None => return function(None),
151 };
152 let input = match val.downcast_mut::<In>() {
153 Some(o) => o,
154 None => return function(None),
155 };
156 return function(Some(input));
157 }
158 pub fn get<In: 'static, Out, F: FnOnce(&mut In) -> Out>(
159 &self,
160 key: &str,
161 function: F,
162 ) -> Option<Out> {
163 let mut lock = self.extend.lock().unwrap();
164 let val = lock.get_mut(key)?;
165 let input = val.downcast_mut::<In>()?;
166 let out = function(input);
167 Some(out)
168 }
169 pub fn set_box<S: Into<String>>(&self, key: S, value: Box<dyn Any + Send + Sync + 'static>) {
170 let mut lock = self.extend.lock().unwrap();
171 lock.insert(key.into(), value);
172 }
173 pub fn remove<V: Any>(&self, key: &str) -> Option<V> {
174 let mut lock = self.extend.lock().unwrap();
175 let val = lock.get(key)?;
176 let opt = val.downcast_ref::<V>();
177 if opt.is_none() {
178 return None;
179 }
180 let val = lock.remove(key).unwrap();
181 let box_val: Box<V> = val.downcast().unwrap();
182 return Some(*box_val);
183 }
184 pub fn push_callback(
185 mut self,
186 function: impl FnOnce(Arc<Context>) + Send + Sync + 'static,
187 ) -> Self {
188 if self.over_callback.is_none() {
189 self.over_callback = Some(Mutex::new(vec![]));
190 }
191 if let Some(ref lock) = self.over_callback {
192 let mut lock = lock.lock().unwrap();
193 lock.push(Box::new(function));
194 }
195 self
196 }
197 pub(crate) fn exec_over_callback(self: &Arc<Self>) {
198 if let Some(ref fs) = self.over_callback {
199 let mut lock = fs.lock().unwrap();
200 while let Some(function) = lock.pop() {
201 function(self.clone())
202 }
203 }
204 }
205 pub(crate) fn at_rt_waker_waiter(&self) {
206 if let Some(waker) = self.runtime.waker.remove(self.code.as_str()) {
207 waker.waker.wake_by_ref()
208 }
209 }
210 pub fn error_over(&self, err: impl Error) {
211 let err = format!("{}", err);
212 self.set(END_RESULT_ERROR, err);
213 self.set_status(CtxStatus::ERROR);
215 }
216 pub fn end_over<V: Any + Send + Sync>(&self, val: Option<V>) {
217 if let Some(val) = val {
218 self.set(END_NODE_CODE, val);
219 }
220 self.set_status(CtxStatus::SUCCESS);
222 }
223 pub fn end_output<V: Any>(&self) -> anyhow::Result<V> {
224 let status = self.status();
225 return match status {
226 CtxStatus::INIT | CtxStatus::RUNNING => anyhow::anyhow!("context is not over").err(),
227 CtxStatus::SUCCESS => {
228 if let Some(s) = self.remove::<V>(END_NODE_CODE) {
229 Ok(s)
230 } else {
231 anyhow::anyhow!("end output type abnormal").err()
232 }
233 }
234 CtxStatus::ERROR => {
235 let err: String = self
236 .remove(END_RESULT_ERROR)
237 .unwrap_or("nil error".to_string());
238 anyhow::Error::msg(err).err()
239 }
240 };
241 }
242
243 pub fn spawn<V: Any + Send + Sync>(self: Arc<Self>, args: V) -> anyhow::Result<()> {
244 self.runtime.clone().spawn(self, args)
245 }
246 pub async fn block_on<Out: Any, V: Any + Send + Sync>(
247 self: Arc<Self>,
248 args: V,
249 ) -> anyhow::Result<Out> {
250 self.runtime.clone().block_on(self, args).await
251 }
252
253 pub fn set_status(&self, status: CtxStatus) {
254 self.status.store(status.into(), Ordering::Relaxed)
256 }
257 pub fn status(&self) -> CtxStatus {
258 let status = self.status.load(Ordering::Relaxed);
259 status.into()
260 }
261 pub fn push_stack_info<T: Into<String>, P: Into<String>, C: Into<String>>(
262 &self,
263 parent_ctx_code: T,
264 prev: P,
265 next: C,
266 ) {
267 let mut lock = self.stack.lock().unwrap();
268 lock.round += 1;
269 let round = lock.round;
270 lock.stack
271 .push((round, parent_ctx_code.into(), prev.into(), next.into()));
272 }
273 pub fn set_max_stack(&self, max: usize) {
274 let mut lock = self.stack.lock().unwrap();
275 lock.max_stack = max
276 }
277 pub fn used_stack(&self) -> usize {
278 let lock = self.stack.lock().unwrap();
279 return lock.round;
280 }
281 pub fn usable_stack(&self) -> usize {
282 let lock = self.stack.lock().unwrap();
283 if lock.max_stack > lock.round {
284 lock.max_stack - lock.round
285 } else {
286 0
287 }
288 }
289}
290
291impl Flow {
292 pub fn new(node: Node, ctx: Arc<Context>, middle: VecDeque<Arc<dyn Service>>) -> Self {
293 let Node {
294 code,
295 node_type_id,
296 node_config,
297 ..
298 } = node;
299 Self {
300 ctx,
301 code,
302 node_type_id,
303 node_config,
304 middle,
305 }
306 }
307 pub async fn call(mut self) -> anyhow::Result<Output> {
308 let opt = self.middle.pop_front();
309 let n = match opt {
310 None => return RTError::FlowLastNodeNil.anyhow(),
311 Some(n) => n,
312 };
313 n.call(self).await
314 }
315}
316
317impl Meta {}
318
319impl Default for ContextStack {
320 fn default() -> Self {
321 Self {
322 max_stack: 1024,
323 round: 0,
324 stack: vec![],
325 }
326 }
327}
328
329