bronzeflow_core/trigger/
mod.rs1#[cfg(feature = "async_tokio")]
2pub mod tokio_trigger;
3
4use crate::runtime::Runnable;
5use crate::store::Storage;
6use crate::task::RunnableHolder;
7use bronzeflow_time::schedule_time::ScheduleTime;
8use bronzeflow_utils::info;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::{Arc, Mutex};
11use std::thread::JoinHandle;
12use std::{thread, time};
13
14pub type StopSignal = Arc<AtomicBool>;
15
16pub trait Trigger {
17 fn trigger<SG, TC>(&mut self, storage: Arc<Mutex<SG>>, trigger_caller: TriggerCallerType<TC>)
18 where
19 SG: Storage + 'static,
20 TC: TriggerCaller + 'static;
21
22 fn stop(&mut self) {
23 let is_stop = self.sig();
24 if is_stop.load(Ordering::SeqCst) {
25 return;
26 }
27 is_stop.store(true, Ordering::SeqCst);
28 self.do_stop();
29 }
30 fn do_stop(&mut self);
31
32 fn sig(&self) -> &StopSignal;
33}
34
35pub trait TriggerCaller: Send {
36 fn trigger(&self, runnable: &mut impl Runnable, report_msg: bool);
37
38 #[inline(always)]
39 fn trigger_safe<F>(&self, mut runnable: F, report_msg: bool)
40 where
41 F: Runnable + Send + Sync + 'static,
42 {
43 self.trigger(&mut runnable, report_msg)
44 }
45
46 #[inline(always)]
47 fn trigger_holder(&self, runnable: RunnableHolder, report_msg: bool) {
48 match runnable {
49 RunnableHolder::Task(task) => self.trigger_safe(task.task, report_msg),
50 RunnableHolder::Dag(dag) => dag.run_task(|task| {
51 let runner = task.as_ref().lock().unwrap().task.clone();
52 self.trigger_safe(runner, report_msg);
53 }),
54 }
55 }
56}
57
58pub type TriggerCallerType<TC> = Arc<Mutex<TC>>;
59
60pub struct ThreadTrigger {
61 join_handler: Option<JoinHandle<()>>,
62 is_stop: Arc<AtomicBool>,
63}
64
65impl Default for ThreadTrigger {
66 fn default() -> Self {
67 ThreadTrigger::new()
68 }
69}
70impl ThreadTrigger {
71 pub fn new() -> Self {
72 ThreadTrigger {
73 join_handler: None,
74 is_stop: Arc::new(AtomicBool::new(false)),
75 }
76 }
77}
78
79impl Trigger for ThreadTrigger {
80 fn trigger<SG, TC>(&mut self, storage: Arc<Mutex<SG>>, trigger_caller: TriggerCallerType<TC>)
81 where
82 SG: Storage + 'static,
83 TC: TriggerCaller + 'static,
84 {
85 let is_stop = Arc::clone(&self.is_stop);
86 let handler = thread::spawn(move || {
87 loop {
89 if is_stop.load(Ordering::SeqCst) {
90 info!("Stop!!!");
91 break;
92 }
93 let runs = { storage.lock().unwrap().load_runnable() };
95
96 let now = ScheduleTime::from_now();
98 for mut d in runs {
99 if let Some(ref mut tm) = d.time_holder() {
100 if tm
101 .lock()
102 .unwrap()
103 .schedule
104 .as_mut()
105 .unwrap()
106 .cmp_and_to_next(&now)
107 {
108 trigger_caller.lock().unwrap().trigger_holder(d, true);
109 }
110 }
111 }
112 thread::sleep(time::Duration::from_millis(500));
113 }
114 });
115 self.join_handler = Some(handler);
116 }
117
118 fn do_stop(&mut self) {
119 if let Some(handler) = self.join_handler.take() {
120 handler.join().unwrap()
121 }
122 }
123
124 fn sig(&self) -> &StopSignal {
125 &self.is_stop
126 }
127}
128
129impl Drop for ThreadTrigger {
130 fn drop(&mut self) {
131 self.stop();
132 }
133}