1use chrono::Utc;
7use rustvello_core::trigger::TriggerStore;
8use rustvello_proto::identifiers::TaskId;
9use rustvello_proto::status::InvocationStatus;
10use rustvello_proto::trigger::{
11 ConditionContext, ConditionId, CronCondition, EventCondition, EventContext, ExceptionCondition,
12 ResultCondition, StatusCondition, TriggerCondition, TriggerDefinitionDTO, TriggerDefinitionId,
13 TriggerLogic, TriggerRunId, ValidCondition,
14};
15
16use crate::helpers::test_task_id;
17
18pub async fn test_register_and_get_condition(store: &dyn TriggerStore) {
20 let cond = TriggerCondition::Event(EventCondition {
21 event_code: "payment".to_string(),
22 payload_filter: None,
23 });
24 let id = store.register_condition(&cond).await.unwrap();
25 let got = store.get_condition(&id).await.unwrap();
26 assert!(got.is_some());
27 assert_eq!(got.unwrap().condition_id(), id);
28}
29
30pub async fn test_get_conditions_for_task(store: &dyn TriggerStore) {
32 let task_id = test_task_id("task");
33 let cond = TriggerCondition::Status(StatusCondition {
34 task_id: task_id.clone(),
35 statuses: vec![InvocationStatus::Success],
36 argument_filter: None,
37 });
38 store.register_condition(&cond).await.unwrap();
39
40 let conds = store.get_conditions_for_task(&task_id).await.unwrap();
41 assert_eq!(conds.len(), 1);
42
43 let other = TaskId::new("mod", "other");
44 let conds = store.get_conditions_for_task(&other).await.unwrap();
45 assert!(conds.is_empty());
46}
47
48pub async fn test_get_event_conditions(store: &dyn TriggerStore) {
50 let cond = TriggerCondition::Event(EventCondition {
51 event_code: "payment".to_string(),
52 payload_filter: None,
53 });
54 store.register_condition(&cond).await.unwrap();
55
56 let got = store.get_event_conditions("payment").await.unwrap();
57 assert_eq!(got.len(), 1);
58
59 let got = store.get_event_conditions("other").await.unwrap();
60 assert!(got.is_empty());
61}
62
63pub async fn test_get_cron_conditions(store: &dyn TriggerStore) {
65 let cond = TriggerCondition::Cron(CronCondition {
66 cron_expression: "* * * * *".to_string(),
67 min_interval_seconds: 50,
68 });
69 store.register_condition(&cond).await.unwrap();
70
71 let conds = store.get_cron_conditions().await.unwrap();
72 assert_eq!(conds.len(), 1);
73}
74
75pub async fn test_register_and_get_trigger(store: &dyn TriggerStore) {
77 let task_id = test_task_id("target");
78 let cond_ids = vec![ConditionId::from("c1".to_string())];
79 let trigger_id =
80 TriggerDefinitionDTO::compute_trigger_id(&task_id, &cond_ids, TriggerLogic::Or);
81
82 let trigger = TriggerDefinitionDTO {
83 trigger_id: trigger_id.clone(),
84 task_id,
85 condition_ids: cond_ids,
86 logic: TriggerLogic::Or,
87 argument_template: Some(serde_json::json!({"key": "value"})),
88 };
89 store.register_trigger(&trigger).await.unwrap();
90
91 let got = store.get_trigger(&trigger_id).await.unwrap();
92 assert!(got.is_some());
93 let got = got.unwrap();
94 assert_eq!(got.logic, TriggerLogic::Or);
95 assert!(got.argument_template.is_some());
96}
97
98pub async fn test_get_triggers_for_condition(store: &dyn TriggerStore) {
100 let cond_id = ConditionId::from("c1".to_string());
101 let task_id = test_task_id("target");
102 let trigger = TriggerDefinitionDTO {
103 trigger_id: TriggerDefinitionDTO::compute_trigger_id(
104 &task_id,
105 &[cond_id.clone()],
106 TriggerLogic::Or,
107 ),
108 task_id,
109 condition_ids: vec![cond_id.clone()],
110 logic: TriggerLogic::Or,
111 argument_template: None,
112 };
113 store.register_trigger(&trigger).await.unwrap();
114
115 let triggers = store.get_triggers_for_condition(&cond_id).await.unwrap();
116 assert_eq!(triggers.len(), 1);
117}
118
119pub async fn test_remove_triggers_for_task(store: &dyn TriggerStore) {
121 let task_id = test_task_id("target");
122 let trigger = TriggerDefinitionDTO {
123 trigger_id: TriggerDefinitionId::from("t1".to_string()),
124 task_id: task_id.clone(),
125 condition_ids: vec![],
126 logic: TriggerLogic::And,
127 argument_template: None,
128 };
129 store.register_trigger(&trigger).await.unwrap();
130
131 let removed = store.remove_triggers_for_task(&task_id).await.unwrap();
132 assert_eq!(removed, 1);
133
134 let got = store
135 .get_trigger(&TriggerDefinitionId::from("t1".to_string()))
136 .await
137 .unwrap();
138 assert!(got.is_none());
139}
140
141pub async fn test_valid_condition_lifecycle(store: &dyn TriggerStore) {
143 let vc = ValidCondition::new(
144 ConditionId::from("c1".to_string()),
145 ConditionContext::Event(EventContext {
146 event_id: "e1".to_string(),
147 event_code: "test".to_string(),
148 payload: serde_json::json!({}),
149 }),
150 );
151 let vc_id = vc.valid_condition_id.clone();
152
153 store.record_valid_condition(&vc).await.unwrap();
154 let vcs = store.get_valid_conditions().await.unwrap();
155 assert_eq!(vcs.len(), 1);
156
157 store.clear_valid_conditions(&[vc_id]).await.unwrap();
158 let vcs = store.get_valid_conditions().await.unwrap();
159 assert!(vcs.is_empty());
160}
161
162pub async fn test_cron_execution_optimistic_lock(store: &dyn TriggerStore) {
164 let cond_id = ConditionId::from("cron1".to_string());
165 let now = Utc::now();
166
167 assert!(store
169 .store_cron_execution(&cond_id, now, None)
170 .await
171 .unwrap());
172
173 assert!(!store
175 .store_cron_execution(&cond_id, now, None)
176 .await
177 .unwrap());
178
179 let later = now + chrono::Duration::seconds(60);
181 assert!(store
182 .store_cron_execution(&cond_id, later, Some(now))
183 .await
184 .unwrap());
185}
186
187pub async fn test_claim_trigger_run_dedup(store: &dyn TriggerStore) {
189 let run_id = TriggerRunId::from("run-1".to_string());
190
191 assert!(store.claim_trigger_run(&run_id).await.unwrap());
192 assert!(!store.claim_trigger_run(&run_id).await.unwrap());
193}
194
195pub async fn test_purge_clears_all(store: &dyn TriggerStore) {
197 let cond = TriggerCondition::Event(EventCondition {
198 event_code: "test".to_string(),
199 payload_filter: None,
200 });
201 store.register_condition(&cond).await.unwrap();
202 store.purge().await.unwrap();
203
204 let got = store.get_event_conditions("test").await.unwrap();
205 assert!(got.is_empty());
206}
207
208pub async fn test_multiple_cron_conditions(store: &dyn TriggerStore) {
210 let cond1 = TriggerCondition::Cron(CronCondition {
211 cron_expression: "0 * * * *".to_string(), min_interval_seconds: 50,
213 });
214 let cond2 = TriggerCondition::Cron(CronCondition {
215 cron_expression: "*/5 * * * *".to_string(), min_interval_seconds: 240,
217 });
218 let cond3 = TriggerCondition::Cron(CronCondition {
219 cron_expression: "0 0 * * *".to_string(), min_interval_seconds: 3500,
221 });
222
223 let id1 = store.register_condition(&cond1).await.unwrap();
224 let id2 = store.register_condition(&cond2).await.unwrap();
225 let id3 = store.register_condition(&cond3).await.unwrap();
226
227 assert_ne!(id1, id2);
229 assert_ne!(id2, id3);
230
231 let crons = store.get_cron_conditions().await.unwrap();
232 assert_eq!(crons.len(), 3);
233}
234
235pub async fn test_cron_execution_history(store: &dyn TriggerStore) {
238 let cond_id = ConditionId::from("cron_hist_1".to_string());
239
240 let last = store.get_last_cron_execution(&cond_id).await.unwrap();
242 assert!(last.is_none(), "Should have no initial execution");
243
244 let now = Utc::now();
246 assert!(store
247 .store_cron_execution(&cond_id, now, None)
248 .await
249 .unwrap());
250
251 let last = store.get_last_cron_execution(&cond_id).await.unwrap();
253 assert!(last.is_some(), "Should have recorded execution");
254 let diff = (last.unwrap() - now).num_milliseconds().unsigned_abs();
255 assert!(diff < 1000, "Stored timestamp should match");
256}
257
258pub async fn test_cron_sequential_executions(store: &dyn TriggerStore) {
260 let cond_id = ConditionId::from("cron_seq_1".to_string());
261 let t1 = Utc::now() - chrono::Duration::seconds(120);
262 let t2 = Utc::now() - chrono::Duration::seconds(60);
263 let t3 = Utc::now();
264
265 assert!(store
267 .store_cron_execution(&cond_id, t1, None)
268 .await
269 .unwrap());
270
271 assert!(store
273 .store_cron_execution(&cond_id, t2, Some(t1))
274 .await
275 .unwrap());
276
277 assert!(!store
279 .store_cron_execution(&cond_id, t3, Some(t1))
280 .await
281 .unwrap());
282
283 assert!(store
285 .store_cron_execution(&cond_id, t3, Some(t2))
286 .await
287 .unwrap());
288
289 let last = store.get_last_cron_execution(&cond_id).await.unwrap();
291 assert!(last.is_some());
292 let diff = (last.unwrap() - t3).num_milliseconds().unsigned_abs();
293 assert!(diff < 1000, "Final timestamp should be t3");
294}
295
296pub async fn test_trigger_and_logic(store: &dyn TriggerStore) {
298 let task_id = test_task_id("and_target");
299 let cond1 = TriggerCondition::Event(EventCondition {
300 event_code: "evt_a".to_string(),
301 payload_filter: None,
302 });
303 let cond2 = TriggerCondition::Event(EventCondition {
304 event_code: "evt_b".to_string(),
305 payload_filter: None,
306 });
307 let id1 = store.register_condition(&cond1).await.unwrap();
308 let id2 = store.register_condition(&cond2).await.unwrap();
309
310 let trigger = TriggerDefinitionDTO {
311 trigger_id: TriggerDefinitionDTO::compute_trigger_id(
312 &task_id,
313 &[id1.clone(), id2.clone()],
314 TriggerLogic::And,
315 ),
316 task_id,
317 condition_ids: vec![id1.clone(), id2.clone()],
318 logic: TriggerLogic::And,
319 argument_template: None,
320 };
321 store.register_trigger(&trigger).await.unwrap();
322
323 let triggers1 = store.get_triggers_for_condition(&id1).await.unwrap();
325 let triggers2 = store.get_triggers_for_condition(&id2).await.unwrap();
326 assert_eq!(triggers1.len(), 1);
327 assert_eq!(triggers2.len(), 1);
328 assert_eq!(triggers1[0].logic, TriggerLogic::And);
329 assert_eq!(triggers1[0].condition_ids.len(), 2);
330}
331
332pub async fn test_get_all_conditions(store: &dyn TriggerStore) {
334 let task_id = test_task_id("all_cond_task");
336
337 let cron = TriggerCondition::Cron(CronCondition {
338 cron_expression: "0 0 * * *".to_string(),
339 min_interval_seconds: 50,
340 });
341 let event = TriggerCondition::Event(EventCondition {
342 event_code: "all_evt".to_string(),
343 payload_filter: None,
344 });
345 let status = TriggerCondition::Status(StatusCondition {
346 task_id: task_id.clone(),
347 statuses: vec![InvocationStatus::Success],
348 argument_filter: None,
349 });
350 let result = TriggerCondition::Result(ResultCondition {
351 task_id: task_id.clone(),
352 argument_filter: None,
353 result_filter: None,
354 });
355 let exception = TriggerCondition::Exception(ExceptionCondition {
356 task_id: task_id.clone(),
357 exception_types: vec!["ValueError".to_string()],
358 argument_filter: None,
359 });
360
361 let id_cron = store.register_condition(&cron).await.unwrap();
362 let id_event = store.register_condition(&event).await.unwrap();
363 let id_status = store.register_condition(&status).await.unwrap();
364 let id_result = store.register_condition(&result).await.unwrap();
365 let id_exception = store.register_condition(&exception).await.unwrap();
366
367 let all = store.get_all_conditions().await.unwrap();
368 let all_ids: std::collections::HashSet<String> =
369 all.iter().map(|(id, _)| id.as_str().to_owned()).collect();
370
371 assert!(
372 all_ids.contains(id_cron.as_str()),
373 "get_all_conditions must include Cron conditions"
374 );
375 assert!(
376 all_ids.contains(id_event.as_str()),
377 "get_all_conditions must include Event conditions"
378 );
379 assert!(
380 all_ids.contains(id_status.as_str()),
381 "get_all_conditions must include Status conditions"
382 );
383 assert!(
384 all_ids.contains(id_result.as_str()),
385 "get_all_conditions must include Result conditions"
386 );
387 assert!(
388 all_ids.contains(id_exception.as_str()),
389 "get_all_conditions must include Exception conditions"
390 );
391 assert!(
392 all.len() >= 5,
393 "Expected at least 5 conditions, got {}",
394 all.len()
395 );
396}
397
398#[macro_export]
409macro_rules! trigger_suite {
410 ($setup:expr) => {
411 #[tokio::test]
412 async fn suite_trigger_register_and_get_condition() {
413 let store = $setup;
414 $crate::trigger::test_register_and_get_condition(&store).await;
415 }
416
417 #[tokio::test]
418 async fn suite_trigger_get_conditions_for_task() {
419 let store = $setup;
420 $crate::trigger::test_get_conditions_for_task(&store).await;
421 }
422
423 #[tokio::test]
424 async fn suite_trigger_get_event_conditions() {
425 let store = $setup;
426 $crate::trigger::test_get_event_conditions(&store).await;
427 }
428
429 #[tokio::test]
430 async fn suite_trigger_get_cron_conditions() {
431 let store = $setup;
432 $crate::trigger::test_get_cron_conditions(&store).await;
433 }
434
435 #[tokio::test]
436 async fn suite_trigger_register_and_get_trigger() {
437 let store = $setup;
438 $crate::trigger::test_register_and_get_trigger(&store).await;
439 }
440
441 #[tokio::test]
442 async fn suite_trigger_get_triggers_for_condition() {
443 let store = $setup;
444 $crate::trigger::test_get_triggers_for_condition(&store).await;
445 }
446
447 #[tokio::test]
448 async fn suite_trigger_remove_triggers_for_task() {
449 let store = $setup;
450 $crate::trigger::test_remove_triggers_for_task(&store).await;
451 }
452
453 #[tokio::test]
454 async fn suite_trigger_valid_condition_lifecycle() {
455 let store = $setup;
456 $crate::trigger::test_valid_condition_lifecycle(&store).await;
457 }
458
459 #[tokio::test]
460 async fn suite_trigger_cron_execution_optimistic_lock() {
461 let store = $setup;
462 $crate::trigger::test_cron_execution_optimistic_lock(&store).await;
463 }
464
465 #[tokio::test]
466 async fn suite_trigger_claim_trigger_run_dedup() {
467 let store = $setup;
468 $crate::trigger::test_claim_trigger_run_dedup(&store).await;
469 }
470
471 #[tokio::test]
472 async fn suite_trigger_purge_clears_all() {
473 let store = $setup;
474 $crate::trigger::test_purge_clears_all(&store).await;
475 }
476
477 #[tokio::test]
478 async fn suite_trigger_multiple_cron_conditions() {
479 let store = $setup;
480 $crate::trigger::test_multiple_cron_conditions(&store).await;
481 }
482
483 #[tokio::test]
484 async fn suite_trigger_cron_execution_history() {
485 let store = $setup;
486 $crate::trigger::test_cron_execution_history(&store).await;
487 }
488
489 #[tokio::test]
490 async fn suite_trigger_cron_sequential_executions() {
491 let store = $setup;
492 $crate::trigger::test_cron_sequential_executions(&store).await;
493 }
494
495 #[tokio::test]
496 async fn suite_trigger_and_logic() {
497 let store = $setup;
498 $crate::trigger::test_trigger_and_logic(&store).await;
499 }
500
501 #[tokio::test]
502 async fn suite_trigger_get_all_conditions() {
503 let store = $setup;
504 $crate::trigger::test_get_all_conditions(&store).await;
505 }
506 };
507}
508
509#[macro_export]
514macro_rules! async_trigger_suite {
515 ($setup:expr) => {
516 #[tokio::test]
517 #[ignore = "requires Docker"]
518 async fn suite_trigger_register_and_get_condition() {
519 let (_c, store) = $setup.await;
520 $crate::trigger::test_register_and_get_condition(&store).await;
521 }
522
523 #[tokio::test]
524 #[ignore = "requires Docker"]
525 async fn suite_trigger_get_conditions_for_task() {
526 let (_c, store) = $setup.await;
527 $crate::trigger::test_get_conditions_for_task(&store).await;
528 }
529
530 #[tokio::test]
531 #[ignore = "requires Docker"]
532 async fn suite_trigger_get_event_conditions() {
533 let (_c, store) = $setup.await;
534 $crate::trigger::test_get_event_conditions(&store).await;
535 }
536
537 #[tokio::test]
538 #[ignore = "requires Docker"]
539 async fn suite_trigger_get_cron_conditions() {
540 let (_c, store) = $setup.await;
541 $crate::trigger::test_get_cron_conditions(&store).await;
542 }
543
544 #[tokio::test]
545 #[ignore = "requires Docker"]
546 async fn suite_trigger_register_and_get_trigger() {
547 let (_c, store) = $setup.await;
548 $crate::trigger::test_register_and_get_trigger(&store).await;
549 }
550
551 #[tokio::test]
552 #[ignore = "requires Docker"]
553 async fn suite_trigger_get_triggers_for_condition() {
554 let (_c, store) = $setup.await;
555 $crate::trigger::test_get_triggers_for_condition(&store).await;
556 }
557
558 #[tokio::test]
559 #[ignore = "requires Docker"]
560 async fn suite_trigger_remove_triggers_for_task() {
561 let (_c, store) = $setup.await;
562 $crate::trigger::test_remove_triggers_for_task(&store).await;
563 }
564
565 #[tokio::test]
566 #[ignore = "requires Docker"]
567 async fn suite_trigger_valid_condition_lifecycle() {
568 let (_c, store) = $setup.await;
569 $crate::trigger::test_valid_condition_lifecycle(&store).await;
570 }
571
572 #[tokio::test]
573 #[ignore = "requires Docker"]
574 async fn suite_trigger_cron_execution_optimistic_lock() {
575 let (_c, store) = $setup.await;
576 $crate::trigger::test_cron_execution_optimistic_lock(&store).await;
577 }
578
579 #[tokio::test]
580 #[ignore = "requires Docker"]
581 async fn suite_trigger_claim_trigger_run_dedup() {
582 let (_c, store) = $setup.await;
583 $crate::trigger::test_claim_trigger_run_dedup(&store).await;
584 }
585
586 #[tokio::test]
587 #[ignore = "requires Docker"]
588 async fn suite_trigger_purge_clears_all() {
589 let (_c, store) = $setup.await;
590 $crate::trigger::test_purge_clears_all(&store).await;
591 }
592
593 #[tokio::test]
594 #[ignore = "requires Docker"]
595 async fn suite_trigger_multiple_cron_conditions() {
596 let (_c, store) = $setup.await;
597 $crate::trigger::test_multiple_cron_conditions(&store).await;
598 }
599
600 #[tokio::test]
601 #[ignore = "requires Docker"]
602 async fn suite_trigger_cron_execution_history() {
603 let (_c, store) = $setup.await;
604 $crate::trigger::test_cron_execution_history(&store).await;
605 }
606
607 #[tokio::test]
608 #[ignore = "requires Docker"]
609 async fn suite_trigger_cron_sequential_executions() {
610 let (_c, store) = $setup.await;
611 $crate::trigger::test_cron_sequential_executions(&store).await;
612 }
613
614 #[tokio::test]
615 #[ignore = "requires Docker"]
616 async fn suite_trigger_and_logic() {
617 let (_c, store) = $setup.await;
618 $crate::trigger::test_trigger_and_logic(&store).await;
619 }
620
621 #[tokio::test]
622 #[ignore = "requires Docker"]
623 async fn suite_trigger_get_all_conditions() {
624 let (_c, store) = $setup.await;
625 $crate::trigger::test_get_all_conditions(&store).await;
626 }
627 };
628}