pipeline_script/plugin/
task.rs

1// use crate::context::{Context, ContextKey, ContextValue};
2// use crate::engine::Engine;
3// use crate::error::PipelineResult;
4// use crate::module::Module;
5// use crate::plugin::Plugin;
6// use crate::types::Value;
7// use std::sync::{Arc, Mutex, RwLock};
8// use std::thread;
9// use std::thread::JoinHandle;
10//
11//
12// pub struct TaskPlugin;
13// impl Plugin for TaskPlugin {
14//     fn apply(e: &mut Engine) {
15//         let mut m = Module::new("pipeline");
16//         m.register_pipe_function("pipeline", |ctx, args| {
17//             let pipeline_name = args.get(0).unwrap().as_string().unwrap();
18//             let blocks = args
19//                 .get(1)
20//                 .unwrap()
21//                 .as_dynamic()
22//                 .as_fn_ptr()
23//                 .unwrap()
24//                 .fn_def
25//                 .unwrap()
26//                 .body;
27//             let path: String = ctx.get_value("PIPELINE_NAME").unwrap().into();
28//             let mut ctx = Context::with_value(
29//                 ctx,
30//                 ContextKey::NativeObject("join_set".into()),
31//                 ContextValue::Native(Arc::new(
32//                     RwLock::<Vec<JoinHandle<PipelineResult<Value>>>>::new(vec![]),
33//                 )),
34//             );
35//             if path == pipeline_name || path == "*" {
36//                 for i in &blocks {
37//                     ctx.eval_stmt(i)?;
38//                 }
39//             }
40//             let join_set = ctx
41//                 .get(ContextKey::NativeObject("join_set".into()))
42//                 .unwrap()
43//                 .as_native()
44//                 .unwrap();
45//
46//             let mut binding = join_set.write().unwrap();
47//             let join_set = binding
48//                 .downcast_mut::<Vec<JoinHandle<PipelineResult<Value>>>>()
49//                 .unwrap();
50//             while !join_set.is_empty() {
51//                 let e = join_set.pop().unwrap();
52//                 e.join().unwrap().unwrap();
53//             }
54//             Ok(().into())
55//         });
56//         m.register_pipe_function("step", |ctx, args| {
57//             let task_name = args.get(0).unwrap().as_string().unwrap();
58//             let mut ptr = args.get(1).unwrap().as_dynamic().as_fn_ptr().unwrap();
59//             let active_task: String = ctx.get_value("ACTIVE_TASK").unwrap().into();
60//             if active_task == task_name || active_task.as_str() == "*" {
61//                 return ptr.call(ctx);
62//             }
63//             Ok(().into())
64//         });
65//         m.register_pipe_function("parallel", |ctx, args| {
66//             let task_name = args.get(0).unwrap().as_string().unwrap();
67//             let mut ptr = args.get(1).unwrap().as_dynamic().as_fn_ptr().unwrap();
68//             let active_task: String = ctx.get_value("ACTIVE_TASK").unwrap().into();
69//             if active_task == task_name || active_task.as_str() == "*" {
70//                 let join_set = ctx
71//                     .get(ContextKey::NativeObject("join_set".into()))
72//                     .unwrap()
73//                     .as_native()
74//                     .unwrap();
75//                 let mut binding = join_set.write().unwrap();
76//                 let join_set = binding
77//                     .downcast_mut::<Vec<JoinHandle<PipelineResult<Value>>>>()
78//                     .unwrap();
79//                 // Wrap a clone of ctx in Arc<Mutex<Context>> so it can be moved into the spawned thread
80//                 let shared_ctx = Arc::new(Mutex::new(ctx.clone()));
81//                 let join_handle = thread::spawn(move || {
82//                     let mut ctx = shared_ctx.lock().unwrap();
83//                     return ptr.call(&mut ctx);
84//                 });
85//                 join_set.push(join_handle);
86//             }
87//             Ok(().into())
88//         });
89//         e.register_module(m)
90//     }
91// }