Skip to main content

rustvello_test_suite/
trigger.rs

1//! Shared trigger store test definitions.
2//!
3//! Each function tests a specific behavior of the [`TriggerStore`] trait.
4//! Backend crates call these with their concrete implementation.
5
6use chrono::Utc;
7use rustvello_core::trigger::TriggerStore;
8use rustvello_proto::identifiers::TaskId;
9use rustvello_proto::status::InvocationStatus;
10use rustvello_proto::trigger::{
11    ConditionContext, ConditionId, CronCondition, EventCondition, EventContext, ExceptionCondition,
12    ResultCondition, StatusCondition, TriggerCondition, TriggerDefinitionDTO, TriggerDefinitionId,
13    TriggerLogic, TriggerRunId, ValidCondition,
14};
15
16use crate::helpers::test_task_id;
17
18/// Register a condition and retrieve it by ID.
19pub async fn test_register_and_get_condition(store: &dyn TriggerStore) {
20    let cond = TriggerCondition::Event(EventCondition {
21        event_code: "payment".to_string(),
22        payload_filter: None,
23    });
24    let id = store.register_condition(&cond).await.unwrap();
25    let got = store.get_condition(&id).await.unwrap();
26    assert!(got.is_some());
27    assert_eq!(got.unwrap().condition_id(), id);
28}
29
30/// Query conditions by source task.
31pub async fn test_get_conditions_for_task(store: &dyn TriggerStore) {
32    let task_id = test_task_id("task");
33    let cond = TriggerCondition::Status(StatusCondition {
34        task_id: task_id.clone(),
35        statuses: vec![InvocationStatus::Success],
36        argument_filter: None,
37    });
38    store.register_condition(&cond).await.unwrap();
39
40    let conds = store.get_conditions_for_task(&task_id).await.unwrap();
41    assert_eq!(conds.len(), 1);
42
43    let other = TaskId::new("mod", "other");
44    let conds = store.get_conditions_for_task(&other).await.unwrap();
45    assert!(conds.is_empty());
46}
47
48/// Query event conditions by event code.
49pub async fn test_get_event_conditions(store: &dyn TriggerStore) {
50    let cond = TriggerCondition::Event(EventCondition {
51        event_code: "payment".to_string(),
52        payload_filter: None,
53    });
54    store.register_condition(&cond).await.unwrap();
55
56    let got = store.get_event_conditions("payment").await.unwrap();
57    assert_eq!(got.len(), 1);
58
59    let got = store.get_event_conditions("other").await.unwrap();
60    assert!(got.is_empty());
61}
62
63/// Query cron conditions.
64pub async fn test_get_cron_conditions(store: &dyn TriggerStore) {
65    let cond = TriggerCondition::Cron(CronCondition {
66        cron_expression: "* * * * *".to_string(),
67        min_interval_seconds: 50,
68    });
69    store.register_condition(&cond).await.unwrap();
70
71    let conds = store.get_cron_conditions().await.unwrap();
72    assert_eq!(conds.len(), 1);
73}
74
75/// Register a trigger definition and retrieve it.
76pub async fn test_register_and_get_trigger(store: &dyn TriggerStore) {
77    let task_id = test_task_id("target");
78    let cond_ids = vec![ConditionId::from("c1".to_string())];
79    let trigger_id =
80        TriggerDefinitionDTO::compute_trigger_id(&task_id, &cond_ids, TriggerLogic::Or);
81
82    let trigger = TriggerDefinitionDTO {
83        trigger_id: trigger_id.clone(),
84        task_id,
85        condition_ids: cond_ids,
86        logic: TriggerLogic::Or,
87        argument_template: Some(serde_json::json!({"key": "value"})),
88    };
89    store.register_trigger(&trigger).await.unwrap();
90
91    let got = store.get_trigger(&trigger_id).await.unwrap();
92    assert!(got.is_some());
93    let got = got.unwrap();
94    assert_eq!(got.logic, TriggerLogic::Or);
95    assert!(got.argument_template.is_some());
96}
97
98/// Query triggers linked to a condition.
99pub async fn test_get_triggers_for_condition(store: &dyn TriggerStore) {
100    let cond_id = ConditionId::from("c1".to_string());
101    let task_id = test_task_id("target");
102    let trigger = TriggerDefinitionDTO {
103        trigger_id: TriggerDefinitionDTO::compute_trigger_id(
104            &task_id,
105            &[cond_id.clone()],
106            TriggerLogic::Or,
107        ),
108        task_id,
109        condition_ids: vec![cond_id.clone()],
110        logic: TriggerLogic::Or,
111        argument_template: None,
112    };
113    store.register_trigger(&trigger).await.unwrap();
114
115    let triggers = store.get_triggers_for_condition(&cond_id).await.unwrap();
116    assert_eq!(triggers.len(), 1);
117}
118
119/// Remove all triggers for a task.
120pub async fn test_remove_triggers_for_task(store: &dyn TriggerStore) {
121    let task_id = test_task_id("target");
122    let trigger = TriggerDefinitionDTO {
123        trigger_id: TriggerDefinitionId::from("t1".to_string()),
124        task_id: task_id.clone(),
125        condition_ids: vec![],
126        logic: TriggerLogic::And,
127        argument_template: None,
128    };
129    store.register_trigger(&trigger).await.unwrap();
130
131    let removed = store.remove_triggers_for_task(&task_id).await.unwrap();
132    assert_eq!(removed, 1);
133
134    let got = store
135        .get_trigger(&TriggerDefinitionId::from("t1".to_string()))
136        .await
137        .unwrap();
138    assert!(got.is_none());
139}
140
141/// Record, retrieve, and clear valid conditions.
142pub async fn test_valid_condition_lifecycle(store: &dyn TriggerStore) {
143    let vc = ValidCondition::new(
144        ConditionId::from("c1".to_string()),
145        ConditionContext::Event(EventContext {
146            event_id: "e1".to_string(),
147            event_code: "test".to_string(),
148            payload: serde_json::json!({}),
149        }),
150    );
151    let vc_id = vc.valid_condition_id.clone();
152
153    store.record_valid_condition(&vc).await.unwrap();
154    let vcs = store.get_valid_conditions().await.unwrap();
155    assert_eq!(vcs.len(), 1);
156
157    store.clear_valid_conditions(&[vc_id]).await.unwrap();
158    let vcs = store.get_valid_conditions().await.unwrap();
159    assert!(vcs.is_empty());
160}
161
162/// Cron execution with optimistic locking.
163pub async fn test_cron_execution_optimistic_lock(store: &dyn TriggerStore) {
164    let cond_id = ConditionId::from("cron1".to_string());
165    let now = Utc::now();
166
167    // First store succeeds (no previous execution)
168    assert!(store
169        .store_cron_execution(&cond_id, now, None)
170        .await
171        .unwrap());
172
173    // Second store with wrong expected value fails
174    assert!(!store
175        .store_cron_execution(&cond_id, now, None)
176        .await
177        .unwrap());
178
179    // Store with correct expected value succeeds
180    let later = now + chrono::Duration::seconds(60);
181    assert!(store
182        .store_cron_execution(&cond_id, later, Some(now))
183        .await
184        .unwrap());
185}
186
187/// Claim trigger run deduplication.
188pub async fn test_claim_trigger_run_dedup(store: &dyn TriggerStore) {
189    let run_id = TriggerRunId::from("run-1".to_string());
190
191    assert!(store.claim_trigger_run(&run_id).await.unwrap());
192    assert!(!store.claim_trigger_run(&run_id).await.unwrap());
193}
194
195/// Purge clears all trigger data.
196pub async fn test_purge_clears_all(store: &dyn TriggerStore) {
197    let cond = TriggerCondition::Event(EventCondition {
198        event_code: "test".to_string(),
199        payload_filter: None,
200    });
201    store.register_condition(&cond).await.unwrap();
202    store.purge().await.unwrap();
203
204    let got = store.get_event_conditions("test").await.unwrap();
205    assert!(got.is_empty());
206}
207
208/// Multiple cron conditions can coexist and be retrieved.
209pub async fn test_multiple_cron_conditions(store: &dyn TriggerStore) {
210    let cond1 = TriggerCondition::Cron(CronCondition {
211        cron_expression: "0 * * * *".to_string(), // every hour
212        min_interval_seconds: 50,
213    });
214    let cond2 = TriggerCondition::Cron(CronCondition {
215        cron_expression: "*/5 * * * *".to_string(), // every 5 minutes
216        min_interval_seconds: 240,
217    });
218    let cond3 = TriggerCondition::Cron(CronCondition {
219        cron_expression: "0 0 * * *".to_string(), // daily at midnight
220        min_interval_seconds: 3500,
221    });
222
223    let id1 = store.register_condition(&cond1).await.unwrap();
224    let id2 = store.register_condition(&cond2).await.unwrap();
225    let id3 = store.register_condition(&cond3).await.unwrap();
226
227    // All three should be distinct
228    assert_ne!(id1, id2);
229    assert_ne!(id2, id3);
230
231    let crons = store.get_cron_conditions().await.unwrap();
232    assert_eq!(crons.len(), 3);
233}
234
235/// Cron execution history: get_last_cron_execution returns None initially,
236/// then the stored value.
237pub async fn test_cron_execution_history(store: &dyn TriggerStore) {
238    let cond_id = ConditionId::from("cron_hist_1".to_string());
239
240    // Initially no execution
241    let last = store.get_last_cron_execution(&cond_id).await.unwrap();
242    assert!(last.is_none(), "Should have no initial execution");
243
244    // Store an execution
245    let now = Utc::now();
246    assert!(store
247        .store_cron_execution(&cond_id, now, None)
248        .await
249        .unwrap());
250
251    // Now last execution should be set
252    let last = store.get_last_cron_execution(&cond_id).await.unwrap();
253    assert!(last.is_some(), "Should have recorded execution");
254    let diff = (last.unwrap() - now).num_milliseconds().unsigned_abs();
255    assert!(diff < 1000, "Stored timestamp should match");
256}
257
258/// Sequential cron executions with correct optimistic lock chaining.
259pub async fn test_cron_sequential_executions(store: &dyn TriggerStore) {
260    let cond_id = ConditionId::from("cron_seq_1".to_string());
261    let t1 = Utc::now() - chrono::Duration::seconds(120);
262    let t2 = Utc::now() - chrono::Duration::seconds(60);
263    let t3 = Utc::now();
264
265    // First execution (no previous)
266    assert!(store
267        .store_cron_execution(&cond_id, t1, None)
268        .await
269        .unwrap());
270
271    // Second with correct expected (t1)
272    assert!(store
273        .store_cron_execution(&cond_id, t2, Some(t1))
274        .await
275        .unwrap());
276
277    // Third with wrong expected (t1 instead of t2) → rejected
278    assert!(!store
279        .store_cron_execution(&cond_id, t3, Some(t1))
280        .await
281        .unwrap());
282
283    // Third with correct expected (t2)
284    assert!(store
285        .store_cron_execution(&cond_id, t3, Some(t2))
286        .await
287        .unwrap());
288
289    // Verify final state
290    let last = store.get_last_cron_execution(&cond_id).await.unwrap();
291    assert!(last.is_some());
292    let diff = (last.unwrap() - t3).num_milliseconds().unsigned_abs();
293    assert!(diff < 1000, "Final timestamp should be t3");
294}
295
296/// AND trigger logic requires all conditions to be valid.
297pub async fn test_trigger_and_logic(store: &dyn TriggerStore) {
298    let task_id = test_task_id("and_target");
299    let cond1 = TriggerCondition::Event(EventCondition {
300        event_code: "evt_a".to_string(),
301        payload_filter: None,
302    });
303    let cond2 = TriggerCondition::Event(EventCondition {
304        event_code: "evt_b".to_string(),
305        payload_filter: None,
306    });
307    let id1 = store.register_condition(&cond1).await.unwrap();
308    let id2 = store.register_condition(&cond2).await.unwrap();
309
310    let trigger = TriggerDefinitionDTO {
311        trigger_id: TriggerDefinitionDTO::compute_trigger_id(
312            &task_id,
313            &[id1.clone(), id2.clone()],
314            TriggerLogic::And,
315        ),
316        task_id,
317        condition_ids: vec![id1.clone(), id2.clone()],
318        logic: TriggerLogic::And,
319        argument_template: None,
320    };
321    store.register_trigger(&trigger).await.unwrap();
322
323    // Verify both conditions are linked
324    let triggers1 = store.get_triggers_for_condition(&id1).await.unwrap();
325    let triggers2 = store.get_triggers_for_condition(&id2).await.unwrap();
326    assert_eq!(triggers1.len(), 1);
327    assert_eq!(triggers2.len(), 1);
328    assert_eq!(triggers1[0].logic, TriggerLogic::And);
329    assert_eq!(triggers1[0].condition_ids.len(), 2);
330}
331
332/// `get_all_conditions` returns every condition type, not just cron.
333pub async fn test_get_all_conditions(store: &dyn TriggerStore) {
334    // Register one condition per type (Cron, Event, Status, Result, Exception)
335    let task_id = test_task_id("all_cond_task");
336
337    let cron = TriggerCondition::Cron(CronCondition {
338        cron_expression: "0 0 * * *".to_string(),
339        min_interval_seconds: 50,
340    });
341    let event = TriggerCondition::Event(EventCondition {
342        event_code: "all_evt".to_string(),
343        payload_filter: None,
344    });
345    let status = TriggerCondition::Status(StatusCondition {
346        task_id: task_id.clone(),
347        statuses: vec![InvocationStatus::Success],
348        argument_filter: None,
349    });
350    let result = TriggerCondition::Result(ResultCondition {
351        task_id: task_id.clone(),
352        argument_filter: None,
353        result_filter: None,
354    });
355    let exception = TriggerCondition::Exception(ExceptionCondition {
356        task_id: task_id.clone(),
357        exception_types: vec!["ValueError".to_string()],
358        argument_filter: None,
359    });
360
361    let id_cron = store.register_condition(&cron).await.unwrap();
362    let id_event = store.register_condition(&event).await.unwrap();
363    let id_status = store.register_condition(&status).await.unwrap();
364    let id_result = store.register_condition(&result).await.unwrap();
365    let id_exception = store.register_condition(&exception).await.unwrap();
366
367    let all = store.get_all_conditions().await.unwrap();
368    let all_ids: std::collections::HashSet<String> =
369        all.iter().map(|(id, _)| id.as_str().to_owned()).collect();
370
371    assert!(
372        all_ids.contains(id_cron.as_str()),
373        "get_all_conditions must include Cron conditions"
374    );
375    assert!(
376        all_ids.contains(id_event.as_str()),
377        "get_all_conditions must include Event conditions"
378    );
379    assert!(
380        all_ids.contains(id_status.as_str()),
381        "get_all_conditions must include Status conditions"
382    );
383    assert!(
384        all_ids.contains(id_result.as_str()),
385        "get_all_conditions must include Result conditions"
386    );
387    assert!(
388        all_ids.contains(id_exception.as_str()),
389        "get_all_conditions must include Exception conditions"
390    );
391    assert!(
392        all.len() >= 5,
393        "Expected at least 5 conditions, got {}",
394        all.len()
395    );
396}
397
/// Macro to generate all trigger suite tests for a given setup expression.
///
/// `$setup` is an expression that builds the store under test. It is
/// re-evaluated inside every generated test, so each test works on the value
/// produced by its own evaluation of `$setup`. One `#[tokio::test]` function
/// named `suite_trigger_<check>` is generated per shared check in
/// `$crate::trigger`.
///
/// The `@cases` arm is an implementation detail used to stamp out the
/// individual tests from a name table; callers should only use the
/// single-expression form.
///
/// # Example
///
/// ```rust,ignore
/// use rustvello_test_suite::trigger_suite;
/// use rustvello_mem::trigger::MemTriggerStore;
///
/// trigger_suite!(MemTriggerStore::new());
/// ```
#[macro_export]
macro_rules! trigger_suite {
    // Internal: expand every `generated_fn => shared_check` pair into a test.
    // This arm must come first so that `@cases …` is never parsed as `$setup:expr`.
    (@cases $setup:expr; $($suite_fn:ident => $check:ident),+ $(,)?) => {
        $(
            #[tokio::test]
            async fn $suite_fn() {
                let store = $setup;
                $crate::trigger::$check(&store).await;
            }
        )+
    };

    // Public entry point.
    ($setup:expr) => {
        $crate::trigger_suite! {
            @cases $setup;
            suite_trigger_register_and_get_condition => test_register_and_get_condition,
            suite_trigger_get_conditions_for_task => test_get_conditions_for_task,
            suite_trigger_get_event_conditions => test_get_event_conditions,
            suite_trigger_get_cron_conditions => test_get_cron_conditions,
            suite_trigger_register_and_get_trigger => test_register_and_get_trigger,
            suite_trigger_get_triggers_for_condition => test_get_triggers_for_condition,
            suite_trigger_remove_triggers_for_task => test_remove_triggers_for_task,
            suite_trigger_valid_condition_lifecycle => test_valid_condition_lifecycle,
            suite_trigger_cron_execution_optimistic_lock => test_cron_execution_optimistic_lock,
            suite_trigger_claim_trigger_run_dedup => test_claim_trigger_run_dedup,
            suite_trigger_purge_clears_all => test_purge_clears_all,
            suite_trigger_multiple_cron_conditions => test_multiple_cron_conditions,
            suite_trigger_cron_execution_history => test_cron_execution_history,
            suite_trigger_cron_sequential_executions => test_cron_sequential_executions,
            suite_trigger_and_logic => test_trigger_and_logic,
            suite_trigger_get_all_conditions => test_get_all_conditions,
        }
    };
}
508
/// Async-setup variant of `trigger_suite!` for testcontainers backends.
///
/// `$setup` is an async expression returning `(_guard, trigger_store)`. It is
/// awaited inside every generated test; the first tuple element is bound for
/// the whole test body and the second is passed to the shared check.
/// Tests are `#[ignore = "requires Docker"]`.
///
/// The `@cases` arm is an implementation detail used to stamp out the
/// individual tests from a name table; callers should only use the
/// single-expression form.
#[macro_export]
macro_rules! async_trigger_suite {
    // Internal: expand every `generated_fn => shared_check` pair into a test.
    // This arm must come first so that `@cases …` is never parsed as `$setup:expr`.
    (@cases $setup:expr; $($suite_fn:ident => $check:ident),+ $(,)?) => {
        $(
            #[tokio::test]
            #[ignore = "requires Docker"]
            async fn $suite_fn() {
                let (_c, store) = $setup.await;
                $crate::trigger::$check(&store).await;
            }
        )+
    };

    // Public entry point.
    ($setup:expr) => {
        $crate::async_trigger_suite! {
            @cases $setup;
            suite_trigger_register_and_get_condition => test_register_and_get_condition,
            suite_trigger_get_conditions_for_task => test_get_conditions_for_task,
            suite_trigger_get_event_conditions => test_get_event_conditions,
            suite_trigger_get_cron_conditions => test_get_cron_conditions,
            suite_trigger_register_and_get_trigger => test_register_and_get_trigger,
            suite_trigger_get_triggers_for_condition => test_get_triggers_for_condition,
            suite_trigger_remove_triggers_for_task => test_remove_triggers_for_task,
            suite_trigger_valid_condition_lifecycle => test_valid_condition_lifecycle,
            suite_trigger_cron_execution_optimistic_lock => test_cron_execution_optimistic_lock,
            suite_trigger_claim_trigger_run_dedup => test_claim_trigger_run_dedup,
            suite_trigger_purge_clears_all => test_purge_clears_all,
            suite_trigger_multiple_cron_conditions => test_multiple_cron_conditions,
            suite_trigger_cron_execution_history => test_cron_execution_history,
            suite_trigger_cron_sequential_executions => test_cron_sequential_executions,
            suite_trigger_and_logic => test_trigger_and_logic,
            suite_trigger_get_all_conditions => test_get_all_conditions,
        }
    };
}