pipeline_script/plugin/task.rs
1// use crate::context::{Context, ContextKey, ContextValue};
2// use crate::engine::Engine;
3// use crate::error::PipelineResult;
4// use crate::module::Module;
5// use crate::plugin::Plugin;
6// use crate::types::Value;
7// use std::sync::{Arc, Mutex, RwLock};
8// use std::thread;
9// use std::thread::JoinHandle;
10//
11//
12// pub struct TaskPlugin;
13// impl Plugin for TaskPlugin {
14// fn apply(e: &mut Engine) {
15// let mut m = Module::new("pipeline");
16// m.register_pipe_function("pipeline", |ctx, args| {
17// let pipeline_name = args.get(0).unwrap().as_string().unwrap();
18// let blocks = args
19// .get(1)
20// .unwrap()
21// .as_dynamic()
22// .as_fn_ptr()
23// .unwrap()
24// .fn_def
25// .unwrap()
26// .body;
27// let path: String = ctx.get_value("PIPELINE_NAME").unwrap().into();
28// let mut ctx = Context::with_value(
29// ctx,
30// ContextKey::NativeObject("join_set".into()),
31// ContextValue::Native(Arc::new(
32// RwLock::<Vec<JoinHandle<PipelineResult<Value>>>>::new(vec![]),
33// )),
34// );
35// if path == pipeline_name || path == "*" {
36// for i in &blocks {
37// ctx.eval_stmt(i)?;
38// }
39// }
40// let join_set = ctx
41// .get(ContextKey::NativeObject("join_set".into()))
42// .unwrap()
43// .as_native()
44// .unwrap();
45//
46// let mut binding = join_set.write().unwrap();
47// let join_set = binding
48// .downcast_mut::<Vec<JoinHandle<PipelineResult<Value>>>>()
49// .unwrap();
50// while !join_set.is_empty() {
51// let e = join_set.pop().unwrap();
52// e.join().unwrap().unwrap();
53// }
54// Ok(().into())
55// });
56// m.register_pipe_function("step", |ctx, args| {
57// let task_name = args.get(0).unwrap().as_string().unwrap();
58// let mut ptr = args.get(1).unwrap().as_dynamic().as_fn_ptr().unwrap();
59// let active_task: String = ctx.get_value("ACTIVE_TASK").unwrap().into();
60// if active_task == task_name || active_task.as_str() == "*" {
61// return ptr.call(ctx);
62// }
63// Ok(().into())
64// });
65// m.register_pipe_function("parallel", |ctx, args| {
66// let task_name = args.get(0).unwrap().as_string().unwrap();
67// let mut ptr = args.get(1).unwrap().as_dynamic().as_fn_ptr().unwrap();
68// let active_task: String = ctx.get_value("ACTIVE_TASK").unwrap().into();
69// if active_task == task_name || active_task.as_str() == "*" {
70// let join_set = ctx
71// .get(ContextKey::NativeObject("join_set".into()))
72// .unwrap()
73// .as_native()
74// .unwrap();
75// let mut binding = join_set.write().unwrap();
76// let join_set = binding
77// .downcast_mut::<Vec<JoinHandle<PipelineResult<Value>>>>()
78// .unwrap();
79// // 创建一个 Arc<Mutex<Context>> 来共享 ctx
80// let shared_ctx = Arc::new(Mutex::new(ctx.clone()));
81// let join_handle = thread::spawn(move || {
82// let mut ctx = shared_ctx.lock().unwrap();
83// return ptr.call(&mut ctx);
84// });
85// join_set.push(join_handle);
86// }
87// Ok(().into())
88// });
89// e.register_module(m)
90// }
91// }