bronzeflow_core/trigger/
mod.rs

1#[cfg(feature = "async_tokio")]
2pub mod tokio_trigger;
3
4use crate::runtime::Runnable;
5use crate::store::Storage;
6use crate::task::RunnableHolder;
7use bronzeflow_time::schedule_time::ScheduleTime;
8use bronzeflow_utils::info;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::{Arc, Mutex};
11use std::thread::JoinHandle;
12use std::{thread, time};
13
14pub type StopSignal = Arc<AtomicBool>;
15
16pub trait Trigger {
17    fn trigger<SG, TC>(&mut self, storage: Arc<Mutex<SG>>, trigger_caller: TriggerCallerType<TC>)
18    where
19        SG: Storage + 'static,
20        TC: TriggerCaller + 'static;
21
22    fn stop(&mut self) {
23        let is_stop = self.sig();
24        if is_stop.load(Ordering::SeqCst) {
25            return;
26        }
27        is_stop.store(true, Ordering::SeqCst);
28        self.do_stop();
29    }
30    fn do_stop(&mut self);
31
32    fn sig(&self) -> &StopSignal;
33}
34
35pub trait TriggerCaller: Send {
36    fn trigger(&self, runnable: &mut impl Runnable, report_msg: bool);
37
38    #[inline(always)]
39    fn trigger_safe<F>(&self, mut runnable: F, report_msg: bool)
40    where
41        F: Runnable + Send + Sync + 'static,
42    {
43        self.trigger(&mut runnable, report_msg)
44    }
45
46    #[inline(always)]
47    fn trigger_holder(&self, runnable: RunnableHolder, report_msg: bool) {
48        match runnable {
49            RunnableHolder::Task(task) => self.trigger_safe(task.task, report_msg),
50            RunnableHolder::Dag(dag) => dag.run_task(|task| {
51                let runner = task.as_ref().lock().unwrap().task.clone();
52                self.trigger_safe(runner, report_msg);
53            }),
54        }
55    }
56}
57
58pub type TriggerCallerType<TC> = Arc<Mutex<TC>>;
59
60pub struct ThreadTrigger {
61    join_handler: Option<JoinHandle<()>>,
62    is_stop: Arc<AtomicBool>,
63}
64
65impl Default for ThreadTrigger {
66    fn default() -> Self {
67        ThreadTrigger::new()
68    }
69}
70impl ThreadTrigger {
71    pub fn new() -> Self {
72        ThreadTrigger {
73            join_handler: None,
74            is_stop: Arc::new(AtomicBool::new(false)),
75        }
76    }
77}
78
79impl Trigger for ThreadTrigger {
80    fn trigger<SG, TC>(&mut self, storage: Arc<Mutex<SG>>, trigger_caller: TriggerCallerType<TC>)
81    where
82        SG: Storage + 'static,
83        TC: TriggerCaller + 'static,
84    {
85        let is_stop = Arc::clone(&self.is_stop);
86        let handler = thread::spawn(move || {
87            // info!("Enter loop");
88            loop {
89                if is_stop.load(Ordering::SeqCst) {
90                    info!("Stop!!!");
91                    break;
92                }
93                // info!("Hello, world");
94                let runs = { storage.lock().unwrap().load_runnable() };
95
96                // info!("Get dags size: {}", dags.len());
97                let now = ScheduleTime::from_now();
98                for mut d in runs {
99                    if let Some(ref mut tm) = d.time_holder() {
100                        if tm
101                            .lock()
102                            .unwrap()
103                            .schedule
104                            .as_mut()
105                            .unwrap()
106                            .cmp_and_to_next(&now)
107                        {
108                            trigger_caller.lock().unwrap().trigger_holder(d, true);
109                        }
110                    }
111                }
112                thread::sleep(time::Duration::from_millis(500));
113            }
114        });
115        self.join_handler = Some(handler);
116    }
117
118    fn do_stop(&mut self) {
119        if let Some(handler) = self.join_handler.take() {
120            handler.join().unwrap()
121        }
122    }
123
124    fn sig(&self) -> &StopSignal {
125        &self.is_stop
126    }
127}
128
129impl Drop for ThreadTrigger {
130    fn drop(&mut self) {
131        self.stop();
132    }
133}