dbx_core/engine/
automation_api.rs1use crate::automation::callable::ExecutionContext;
6use crate::automation::{EventHook, EventHookEvent, ScheduledJob, Scheduler};
7use crate::engine::Database;
8use crate::error::DbxResult;
9use std::sync::{Arc, RwLock};
10
11pub struct TriggerRegistry {
13 triggers: RwLock<Vec<Arc<EventHook>>>,
14}
15
16impl TriggerRegistry {
17 pub fn new() -> Self {
18 Self {
19 triggers: RwLock::new(Vec::new()),
20 }
21 }
22
23 pub fn register(&self, hook: Arc<EventHook>) -> DbxResult<()> {
24 let mut triggers = self
25 .triggers
26 .write()
27 .map_err(|_| crate::error::DbxError::LockPoisoned)?;
28
29 if triggers.iter().any(|t| t.name() == hook.name()) {
31 return Err(crate::error::DbxError::DuplicateCallable(
32 hook.name().to_string(),
33 ));
34 }
35
36 triggers.push(hook);
37 Ok(())
38 }
39
40 pub fn unregister(&self, name: &str) -> DbxResult<()> {
41 let mut triggers = self
42 .triggers
43 .write()
44 .map_err(|_| crate::error::DbxError::LockPoisoned)?;
45
46 let pos = triggers
47 .iter()
48 .position(|t| t.name() == name)
49 .ok_or_else(|| crate::error::DbxError::CallableNotFound(name.to_string()))?;
50
51 triggers.remove(pos);
52 Ok(())
53 }
54
55 pub fn fire(&self, ctx: &ExecutionContext, event: &EventHookEvent) -> DbxResult<Vec<String>> {
57 let triggers = self
58 .triggers
59 .read()
60 .map_err(|_| crate::error::DbxError::LockPoisoned)?;
61
62 let mut executed = Vec::new();
63
64 for hook in triggers.iter() {
65 match hook.fire(ctx, event) {
66 Ok(true) => executed.push(hook.name().to_string()),
67 Ok(false) => {} Err(e) => {
69 eprintln!("[EVENT HOOK ERROR] {}: {}", hook.name(), e);
71 }
72 }
73 }
74
75 Ok(executed)
76 }
77
78 pub fn list(&self) -> DbxResult<Vec<String>> {
79 let triggers = self
80 .triggers
81 .read()
82 .map_err(|_| crate::error::DbxError::LockPoisoned)?;
83
84 Ok(triggers.iter().map(|t| t.name().to_string()).collect())
85 }
86}
87
88impl Default for TriggerRegistry {
89 fn default() -> Self {
90 Self::new()
91 }
92}
93
94impl Database {
95 pub fn register_trigger(&self, hook: EventHook) -> DbxResult<()> {
122 let hook = Arc::new(hook);
123 self.automation_engine
125 .register(Arc::clone(&hook) as Arc<dyn crate::automation::callable::Callable>)?;
126 self.trigger_registry.register(hook)
128 }
129
130 pub fn unregister_trigger(&self, name: &str) -> DbxResult<()> {
132 self.automation_engine.unregister(name)?;
133 self.trigger_registry.unregister(name)
134 }
135
136 pub fn fire_trigger(&self, event: EventHookEvent) -> DbxResult<Vec<String>> {
140 let ctx = ExecutionContext::new(Arc::new(Database::open_in_memory()?));
141 self.trigger_registry.fire(&ctx, &event)
142 }
143
144 pub fn fire_trigger_with_ctx(
146 &self,
147 ctx: &ExecutionContext,
148 event: EventHookEvent,
149 ) -> DbxResult<Vec<String>> {
150 self.trigger_registry.fire(ctx, &event)
151 }
152
153 pub fn list_triggers(&self) -> DbxResult<Vec<String>> {
155 self.trigger_registry.list()
156 }
157
158 pub fn create_scheduler(&self) -> Scheduler {
160 Scheduler::new(Arc::clone(&self.automation_engine))
161 }
162
163 pub fn register_scheduled_job(
165 &self,
166 scheduler: &Scheduler,
167 job: ScheduledJob,
168 ) -> DbxResult<()> {
169 scheduler.register(job)
170 }
171}
172
#[cfg(test)]
mod tests {
    use super::*;
    use crate::automation::callable::{DataType, Signature, Value};
    use crate::automation::event_hook::{
        EventHook, EventHookAction, EventHookCondition, EventHookEvent, EventHookEventType,
    };
    use crate::automation::scheduler::{Schedule, ScheduleType, ScheduledJob};
    use std::sync::Mutex;
    use std::time::Duration;

    /// Signature shared by the test UDFs: one `Int` in, one `Int` out, not variadic.
    fn unary_int_signature() -> Signature {
        Signature {
            params: vec![DataType::Int],
            return_type: DataType::Int,
            is_variadic: false,
        }
    }

    /// Builds an unconditional `AfterInsert` hook on the `users` table.
    fn users_after_insert_hook(name: &'static str, action: EventHookAction) -> EventHook {
        EventHook::new(
            name,
            EventHookEventType::AfterInsert,
            "users",
            EventHookCondition::Always,
            action,
        )
    }

    /// A registered trigger shows up in `list_triggers`.
    #[test]
    fn test_register_trigger() {
        let database = Database::open_in_memory().unwrap();

        let flag = Arc::new(Mutex::new(false));
        let flag_for_hook = Arc::clone(&flag);

        let action = EventHookAction::Custom(Box::new(move |_ctx, _event| {
            *flag_for_hook.lock().unwrap() = true;
            Ok(())
        }));
        database
            .register_trigger(users_after_insert_hook("test_trigger", action))
            .unwrap();

        let names = database.list_triggers().unwrap();
        assert_eq!(names.len(), 1);
        assert!(names.contains(&"test_trigger".to_string()));
    }

    /// Unregistering a trigger removes it from `list_triggers`.
    #[test]
    fn test_unregister_trigger() {
        let database = Database::open_in_memory().unwrap();

        let action = EventHookAction::Custom(Box::new(|_ctx, _event| Ok(())));
        database
            .register_trigger(users_after_insert_hook("test_trigger", action))
            .unwrap();
        assert_eq!(database.list_triggers().unwrap().len(), 1);

        database.unregister_trigger("test_trigger").unwrap();
        assert_eq!(database.list_triggers().unwrap().len(), 0);
    }

    /// A scheduler created from the database accepts and lists a job.
    #[test]
    fn test_create_scheduler() {
        let database = Database::open_in_memory().unwrap();
        let scheduler = database.create_scheduler();

        let every_minute = Schedule::new(ScheduleType::Interval(Duration::from_secs(60)));
        scheduler
            .register(ScheduledJob::new("test_job", every_minute, "test_udf", vec![]))
            .unwrap();

        let job_names = scheduler.list().unwrap();
        assert_eq!(job_names.len(), 1);
    }

    /// Firing an event at a trigger whose action calls a UDF succeeds and
    /// leaves the trigger registered. The value computed by the hook is not
    /// asserted here.
    #[test]
    fn test_trigger_with_udf() {
        let database = Database::open_in_memory().unwrap();

        database
            .register_scalar_udf("double", unary_int_signature(), |args| {
                let x = args[0].as_i64()?;
                Ok(Value::Int(x * 2))
            })
            .unwrap();

        let result = Arc::new(Mutex::new(0i64));
        let result_for_hook = Arc::clone(&result);

        let action = EventHookAction::Custom(Box::new(move |ctx, event| {
            if let Some(id) = event.data.get("id") {
                let doubled = ctx.dbx.call_udf("double", std::slice::from_ref(id))?;
                *result_for_hook.lock().unwrap() = doubled.as_i64()?;
            }
            Ok(())
        }));
        database
            .register_trigger(users_after_insert_hook("double_trigger", action))
            .unwrap();

        let insert_event = EventHookEvent::new(EventHookEventType::AfterInsert, "users")
            .with_data("id", Value::Int(21));
        database.fire_trigger(insert_event).unwrap();

        let names = database.list_triggers().unwrap();
        assert!(!names.is_empty());
        assert!(names.contains(&"double_trigger".to_string()));
    }

    /// A one-shot job that targets a registered UDF can be scheduled and listed.
    #[test]
    fn test_scheduler_with_udf() {
        let database = Database::open_in_memory().unwrap();

        database
            .register_scalar_udf("triple", unary_int_signature(), |args| {
                let x = args[0].as_i64()?;
                Ok(Value::Int(x * 3))
            })
            .unwrap();

        let scheduler = database.create_scheduler();

        let run_once = Schedule::new(ScheduleType::Once(Duration::from_secs(0)));
        let triple_job = ScheduledJob::new("triple_job", run_once, "triple", vec![Value::Int(14)]);
        scheduler.register(triple_job).unwrap();

        let job_names = scheduler.list().unwrap();
        assert_eq!(job_names.len(), 1);
        assert!(job_names.contains(&"triple_job".to_string()));
    }
}