dbx_core/automation/
trigger_executor.rs1use crate::automation::{Trigger, TriggerOperation, TriggerTiming};
6use crate::engine::Database;
7use crate::error::DbxResult;
8
9pub struct TriggerExecutor {
11 triggers: Vec<Trigger>,
13}
14
15impl TriggerExecutor {
16 pub fn new() -> Self {
18 Self {
19 triggers: Vec::new(),
20 }
21 }
22
23 pub fn register(&mut self, trigger: Trigger) {
25 self.triggers.push(trigger);
26 }
27
28 pub fn register_all(&mut self, triggers: Vec<Trigger>) {
30 self.triggers.extend(triggers);
31 }
32
33 pub fn unregister(&mut self, name: &str) -> bool {
35 if let Some(pos) = self.triggers.iter().position(|t| t.name == name) {
36 self.triggers.remove(pos);
37 true
38 } else {
39 false
40 }
41 }
42
43 pub fn list_triggers(&self) -> &[Trigger] {
45 &self.triggers
46 }
47
48 pub fn fire_before(
50 &self,
51 db: &Database,
52 operation: TriggerOperation,
53 table: &str,
54 ) -> DbxResult<()> {
55 self.fire_triggers(db, TriggerTiming::Before, operation, table)
56 }
57
58 pub fn fire_after(
60 &self,
61 db: &Database,
62 operation: TriggerOperation,
63 table: &str,
64 ) -> DbxResult<()> {
65 self.fire_triggers(db, TriggerTiming::After, operation, table)
66 }
67
68 fn evaluate_condition_result(result: &[arrow::record_batch::RecordBatch]) -> bool {
74 use arrow::array::AsArray;
75
76 if result.is_empty() {
77 return false;
78 }
79
80 let batch = &result[0];
81 if batch.num_rows() == 0 || batch.num_columns() == 0 {
82 return false;
83 }
84
85 let column = batch.column(0);
87
88 use arrow::datatypes::DataType;
90 match column.data_type() {
91 DataType::Boolean => {
92 let bool_array = column.as_boolean();
93 bool_array.value(0)
94 }
95 DataType::Int64 | DataType::Int32 | DataType::Int16 | DataType::Int8 => {
96 let int_array = column.as_primitive::<arrow::datatypes::Int64Type>();
97 int_array.value(0) != 0
98 }
99 DataType::UInt64 | DataType::UInt32 | DataType::UInt16 | DataType::UInt8 => {
100 let uint_array = column.as_primitive::<arrow::datatypes::UInt64Type>();
101 uint_array.value(0) != 0
102 }
103 DataType::Float64 | DataType::Float32 => {
104 let float_array = column.as_primitive::<arrow::datatypes::Float64Type>();
105 float_array.value(0) != 0.0
106 }
107 DataType::Utf8 => {
108 let str_array = column.as_string::<i32>();
109 let val = str_array.value(0);
110 !val.is_empty() && val != "0" && val.to_lowercase() != "false"
111 }
112 _ => false, }
114 }
115
116 fn fire_triggers(
118 &self,
119 db: &Database,
120 timing: TriggerTiming,
121 operation: TriggerOperation,
122 table: &str,
123 ) -> DbxResult<()> {
124 let matching_triggers: Vec<&Trigger> = self
126 .triggers
127 .iter()
128 .filter(|t| t.timing == timing && t.operation == operation && t.table == table)
129 .collect();
130
131 for trigger in matching_triggers {
133 if let Some(ref condition) = trigger.condition {
135 match db.execute_sql(condition) {
137 Ok(result) => {
138 if !Self::evaluate_condition_result(&result) {
140 #[cfg(debug_assertions)]
141 println!(
142 "[Trigger] Skipping '{}' - WHEN condition evaluated to false",
143 trigger.name
144 );
145 continue; }
147 }
148 Err(e) => {
149 eprintln!(
150 "[Trigger] Failed to evaluate WHEN condition for '{}': {}",
151 trigger.name, e
152 );
153 continue; }
155 }
156 }
157
158 for sql in &trigger.body {
160 match db.execute_sql(sql) {
162 Ok(_) => {
163 #[cfg(debug_assertions)]
164 println!(
165 "[Trigger] Successfully executed '{}' on {:?} {:?}: {}",
166 trigger.name, timing, operation, sql
167 );
168 }
169 Err(e) => {
170 eprintln!(
172 "[Trigger] Failed to execute '{}': {} (SQL: {})",
173 trigger.name, e, sql
174 );
175 }
176 }
177 }
178 }
179
180 Ok(())
181 }
182}
183
184impl Default for TriggerExecutor {
185 fn default() -> Self {
186 Self::new()
187 }
188}
189
190#[cfg(test)]
191mod tests {
192 use super::*;
193 use crate::automation::ForEachType;
194
195 #[test]
196 fn test_trigger_executor_register() {
197 let mut executor = TriggerExecutor::new();
198
199 let trigger = Trigger::new(
200 "test_trigger",
201 TriggerTiming::After,
202 TriggerOperation::Insert,
203 "users",
204 ForEachType::Row,
205 None,
206 vec!["INSERT INTO audit_logs VALUES (1, 'test')".to_string()],
207 );
208
209 executor.register(trigger);
210 assert_eq!(executor.list_triggers().len(), 1);
211 }
212
213 #[test]
214 fn test_trigger_executor_unregister() {
215 let mut executor = TriggerExecutor::new();
216
217 let trigger = Trigger::new(
218 "test_trigger",
219 TriggerTiming::After,
220 TriggerOperation::Insert,
221 "users",
222 ForEachType::Row,
223 None,
224 vec!["INSERT INTO audit_logs VALUES (1, 'test')".to_string()],
225 );
226
227 executor.register(trigger);
228 assert_eq!(executor.list_triggers().len(), 1);
229
230 let removed = executor.unregister("test_trigger");
231 assert!(removed);
232 assert_eq!(executor.list_triggers().len(), 0);
233 }
234
235 #[test]
236 fn test_trigger_executor_filter() {
237 let mut executor = TriggerExecutor::new();
238
239 executor.register(Trigger::new(
241 "after_insert",
242 TriggerTiming::After,
243 TriggerOperation::Insert,
244 "users",
245 ForEachType::Row,
246 None,
247 vec!["INSERT INTO logs VALUES (1)".to_string()],
248 ));
249
250 executor.register(Trigger::new(
252 "before_update",
253 TriggerTiming::Before,
254 TriggerOperation::Update,
255 "users",
256 ForEachType::Row,
257 None,
258 vec!["UPDATE stats SET count = count + 1".to_string()],
259 ));
260
261 assert_eq!(executor.list_triggers().len(), 2);
262 }
263}