Skip to main content

dbx_core/automation/
trigger_executor.rs

1//! Trigger Execution Engine
2//!
3//! Trigger 실행 엔진: 이벤트 발생 시 SQL Trigger 자동 실행
4
5use crate::automation::{Trigger, TriggerOperation, TriggerTiming};
6use crate::engine::Database;
7use crate::error::DbxResult;
8
9/// Trigger 실행 엔진
10pub struct TriggerExecutor {
11    /// 등록된 Trigger 목록
12    triggers: Vec<Trigger>,
13}
14
15impl TriggerExecutor {
16    /// 새 Trigger 실행 엔진 생성
17    pub fn new() -> Self {
18        Self {
19            triggers: Vec::new(),
20        }
21    }
22
23    /// Trigger 등록
24    pub fn register(&mut self, trigger: Trigger) {
25        self.triggers.push(trigger);
26    }
27
28    /// 모든 Trigger 등록
29    pub fn register_all(&mut self, triggers: Vec<Trigger>) {
30        self.triggers.extend(triggers);
31    }
32
33    /// Trigger 제거
34    pub fn unregister(&mut self, name: &str) -> bool {
35        if let Some(pos) = self.triggers.iter().position(|t| t.name == name) {
36            self.triggers.remove(pos);
37            true
38        } else {
39            false
40        }
41    }
42
43    /// 등록된 모든 Trigger 조회
44    pub fn list_triggers(&self) -> &[Trigger] {
45        &self.triggers
46    }
47
48    /// BEFORE 이벤트 처리
49    pub fn fire_before(
50        &self,
51        db: &Database,
52        operation: TriggerOperation,
53        table: &str,
54    ) -> DbxResult<()> {
55        self.fire_triggers(db, TriggerTiming::Before, operation, table)
56    }
57
58    /// AFTER 이벤트 처리
59    pub fn fire_after(
60        &self,
61        db: &Database,
62        operation: TriggerOperation,
63        table: &str,
64    ) -> DbxResult<()> {
65        self.fire_triggers(db, TriggerTiming::After, operation, table)
66    }
67
68    /// WHEN 조건 결과 평가
69    ///
70    /// RecordBatch 결과를 받아서 true/false로 평가합니다.
71    /// - 결과가 비어있으면 false
72    /// - 첨번째 행의 첨번째 열이 true/1/non-zero면 true
73    fn evaluate_condition_result(result: &[arrow::record_batch::RecordBatch]) -> bool {
74        use arrow::array::AsArray;
75
76        if result.is_empty() {
77            return false;
78        }
79
80        let batch = &result[0];
81        if batch.num_rows() == 0 || batch.num_columns() == 0 {
82            return false;
83        }
84
85        // 첨번째 열 가져오기
86        let column = batch.column(0);
87
88        // 타입에 따라 평가
89        use arrow::datatypes::DataType;
90        match column.data_type() {
91            DataType::Boolean => {
92                let bool_array = column.as_boolean();
93                bool_array.value(0)
94            }
95            DataType::Int64 | DataType::Int32 | DataType::Int16 | DataType::Int8 => {
96                let int_array = column.as_primitive::<arrow::datatypes::Int64Type>();
97                int_array.value(0) != 0
98            }
99            DataType::UInt64 | DataType::UInt32 | DataType::UInt16 | DataType::UInt8 => {
100                let uint_array = column.as_primitive::<arrow::datatypes::UInt64Type>();
101                uint_array.value(0) != 0
102            }
103            DataType::Float64 | DataType::Float32 => {
104                let float_array = column.as_primitive::<arrow::datatypes::Float64Type>();
105                float_array.value(0) != 0.0
106            }
107            DataType::Utf8 => {
108                let str_array = column.as_string::<i32>();
109                let val = str_array.value(0);
110                !val.is_empty() && val != "0" && val.to_lowercase() != "false"
111            }
112            _ => false, // 기타 타입은 false
113        }
114    }
115
116    /// Trigger 실행
117    fn fire_triggers(
118        &self,
119        db: &Database,
120        timing: TriggerTiming,
121        operation: TriggerOperation,
122        table: &str,
123    ) -> DbxResult<()> {
124        // 조건에 맞는 Trigger 필터링
125        let matching_triggers: Vec<&Trigger> = self
126            .triggers
127            .iter()
128            .filter(|t| t.timing == timing && t.operation == operation && t.table == table)
129            .collect();
130
131        // 각 Trigger의 SQL 문장 실행
132        for trigger in matching_triggers {
133            // WHEN 조건 평가
134            if let Some(ref condition) = trigger.condition {
135                // 조건 SQL 실행 (SELECT 문으로 평가)
136                match db.execute_sql(condition) {
137                    Ok(result) => {
138                        // 결과가 true인지 확인
139                        if !Self::evaluate_condition_result(&result) {
140                            #[cfg(debug_assertions)]
141                            println!(
142                                "[Trigger] Skipping '{}' - WHEN condition evaluated to false",
143                                trigger.name
144                            );
145                            continue; // 조건 false면 스킵
146                        }
147                    }
148                    Err(e) => {
149                        eprintln!(
150                            "[Trigger] Failed to evaluate WHEN condition for '{}': {}",
151                            trigger.name, e
152                        );
153                        continue; // 조건 평가 실패 시 스킵
154                    }
155                }
156            }
157
158            // Trigger body의 각 SQL 문장 실행
159            for sql in &trigger.body {
160                // 실제 SQL 실행
161                match db.execute_sql(sql) {
162                    Ok(_) => {
163                        #[cfg(debug_assertions)]
164                        println!(
165                            "[Trigger] Successfully executed '{}' on {:?} {:?}: {}",
166                            trigger.name, timing, operation, sql
167                        );
168                    }
169                    Err(e) => {
170                        // Trigger 실행 실패 시 경고 로그 출력 (에러 전파 안 함)
171                        eprintln!(
172                            "[Trigger] Failed to execute '{}': {} (SQL: {})",
173                            trigger.name, e, sql
174                        );
175                    }
176                }
177            }
178        }
179
180        Ok(())
181    }
182}
183
184impl Default for TriggerExecutor {
185    fn default() -> Self {
186        Self::new()
187    }
188}
189
190#[cfg(test)]
191mod tests {
192    use super::*;
193    use crate::automation::ForEachType;
194
195    #[test]
196    fn test_trigger_executor_register() {
197        let mut executor = TriggerExecutor::new();
198
199        let trigger = Trigger::new(
200            "test_trigger",
201            TriggerTiming::After,
202            TriggerOperation::Insert,
203            "users",
204            ForEachType::Row,
205            None,
206            vec!["INSERT INTO audit_logs VALUES (1, 'test')".to_string()],
207        );
208
209        executor.register(trigger);
210        assert_eq!(executor.list_triggers().len(), 1);
211    }
212
213    #[test]
214    fn test_trigger_executor_unregister() {
215        let mut executor = TriggerExecutor::new();
216
217        let trigger = Trigger::new(
218            "test_trigger",
219            TriggerTiming::After,
220            TriggerOperation::Insert,
221            "users",
222            ForEachType::Row,
223            None,
224            vec!["INSERT INTO audit_logs VALUES (1, 'test')".to_string()],
225        );
226
227        executor.register(trigger);
228        assert_eq!(executor.list_triggers().len(), 1);
229
230        let removed = executor.unregister("test_trigger");
231        assert!(removed);
232        assert_eq!(executor.list_triggers().len(), 0);
233    }
234
235    #[test]
236    fn test_trigger_executor_filter() {
237        let mut executor = TriggerExecutor::new();
238
239        // AFTER INSERT trigger
240        executor.register(Trigger::new(
241            "after_insert",
242            TriggerTiming::After,
243            TriggerOperation::Insert,
244            "users",
245            ForEachType::Row,
246            None,
247            vec!["INSERT INTO logs VALUES (1)".to_string()],
248        ));
249
250        // BEFORE UPDATE trigger
251        executor.register(Trigger::new(
252            "before_update",
253            TriggerTiming::Before,
254            TriggerOperation::Update,
255            "users",
256            ForEachType::Row,
257            None,
258            vec!["UPDATE stats SET count = count + 1".to_string()],
259        ));
260
261        assert_eq!(executor.list_triggers().len(), 2);
262    }
263}