Skip to main content

rustvello_core/
trigger.rs

1//! Trigger system — backend trait and evaluation logic.
2//!
3//! Provides [`TriggerStore`] (async trait for persistence) and
4//! [`TriggerManager`] (business logic for condition evaluation and
5//! trigger firing).
6
7use std::collections::HashMap;
8use std::str::FromStr;
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use chrono::{DateTime, Utc};
13use croner::Cron;
14
15use crate::error::RustvelloResult;
16use rustvello_proto::identifiers::TaskId;
17use rustvello_proto::trigger::{
18    ConditionContext, ConditionId, TriggerCondition, TriggerDefinitionDTO, TriggerDefinitionId,
19    TriggerLogic, TriggerRunId, ValidCondition,
20};
21
22// ---------------------------------------------------------------------------
23// TriggerStore — backend trait
24// ---------------------------------------------------------------------------
25
26/// Async persistence interface for the trigger subsystem.
27///
28/// Mirrors pynenc's `BaseTrigger` storage methods. Implementations must
29/// be thread-safe (`Send + Sync`).
30#[async_trait]
31pub trait TriggerStore: Send + Sync {
32    // -- Condition CRUD --
33
34    /// Register a condition and return its deterministic ID.
35    async fn register_condition(
36        &self,
37        condition: &TriggerCondition,
38    ) -> RustvelloResult<ConditionId>;
39
40    /// Get a condition by ID.
41    async fn get_condition(&self, id: &ConditionId) -> RustvelloResult<Option<TriggerCondition>>;
42
43    /// Get all conditions that watch a specific task.
44    async fn get_conditions_for_task(
45        &self,
46        task_id: &TaskId,
47    ) -> RustvelloResult<Vec<(ConditionId, TriggerCondition)>>;
48
49    /// Get all cron conditions.
50    async fn get_cron_conditions(&self) -> RustvelloResult<Vec<(ConditionId, TriggerCondition)>>;
51
52    /// Get all event conditions matching an event code.
53    async fn get_event_conditions(
54        &self,
55        event_code: &str,
56    ) -> RustvelloResult<Vec<(ConditionId, TriggerCondition)>>;
57
58    // -- Trigger definition CRUD --
59
60    /// Register a trigger definition.
61    async fn register_trigger(&self, trigger: &TriggerDefinitionDTO) -> RustvelloResult<()>;
62
63    /// Get a trigger definition by ID.
64    async fn get_trigger(
65        &self,
66        id: &TriggerDefinitionId,
67    ) -> RustvelloResult<Option<TriggerDefinitionDTO>>;
68
69    /// Get all trigger definitions that reference a given condition.
70    async fn get_triggers_for_condition(
71        &self,
72        cond_id: &ConditionId,
73    ) -> RustvelloResult<Vec<TriggerDefinitionDTO>>;
74
75    /// Remove all trigger definitions for a task.
76    async fn remove_triggers_for_task(&self, task_id: &TaskId) -> RustvelloResult<u32>;
77
78    // -- Valid condition management --
79
80    /// Record a condition that has been evaluated and found satisfied.
81    async fn record_valid_condition(&self, vc: &ValidCondition) -> RustvelloResult<()>;
82
83    /// Get all pending valid conditions.
84    async fn get_valid_conditions(&self) -> RustvelloResult<Vec<ValidCondition>>;
85
86    /// Clear valid conditions by their IDs (after processing).
87    async fn clear_valid_conditions(&self, ids: &[String]) -> RustvelloResult<()>;
88
89    // -- Cron state --
90
91    /// Get the last cron execution time for a condition.
92    async fn get_last_cron_execution(
93        &self,
94        cond_id: &ConditionId,
95    ) -> RustvelloResult<Option<DateTime<Utc>>>;
96
97    /// Store a cron execution time with optimistic locking.
98    /// Returns `true` if the store succeeded (expected_last matched).
99    async fn store_cron_execution(
100        &self,
101        cond_id: &ConditionId,
102        time: DateTime<Utc>,
103        expected_last: Option<DateTime<Utc>>,
104    ) -> RustvelloResult<bool>;
105
106    // -- Execution claims (distributed dedup) --
107
108    /// Attempt to claim a trigger run. Returns `true` if this is first claim.
109    async fn claim_trigger_run(&self, run_id: &TriggerRunId) -> RustvelloResult<bool>;
110
111    /// Purge all trigger data.
112    async fn purge(&self) -> RustvelloResult<()>;
113
114    /// Get all registered conditions regardless of type.
115    ///
116    /// Every backend **must** return the complete set of stored conditions
117    /// (Cron, Status, Event, Result, Exception, Composite).
118    async fn get_all_conditions(&self) -> RustvelloResult<Vec<(ConditionId, TriggerCondition)>>;
119}
120
121// ---------------------------------------------------------------------------
122// TriggerManager — evaluation logic
123// ---------------------------------------------------------------------------
124
125/// Business logic layer for the trigger system.
126///
127/// Wraps a `dyn TriggerStore` and implements condition evaluation,
128/// trigger firing, and execution dedup. Modelled after pynenc's
129/// `BaseTrigger` evaluation methods.
130#[derive(Clone)]
131pub struct TriggerManager {
132    store: Arc<dyn TriggerStore>,
133}
134
135impl TriggerManager {
136    pub fn new(store: Arc<dyn TriggerStore>) -> Self {
137        Self { store }
138    }
139
140    /// Access the underlying store (for registration passthrough).
141    pub fn store(&self) -> &Arc<dyn TriggerStore> {
142        &self.store
143    }
144
145    // -- Event reporting (called by runner after task completion) --
146
147    /// Match conditions for a task against a context, record valid ones, and return them.
148    async fn evaluate_task_conditions(
149        &self,
150        task_id: &rustvello_proto::identifiers::TaskId,
151        condition_ctx: ConditionContext,
152    ) -> RustvelloResult<Vec<ValidCondition>> {
153        let conditions = self.store.get_conditions_for_task(task_id).await?;
154        let mut valid = Vec::new();
155
156        for (cond_id, cond) in &conditions {
157            if cond.is_satisfied_by(&condition_ctx) {
158                let vc = ValidCondition::new(cond_id.clone(), condition_ctx.clone());
159                self.store.record_valid_condition(&vc).await?;
160                valid.push(vc);
161            }
162        }
163
164        Ok(valid)
165    }
166
167    /// Report a status change — finds and records matching StatusConditions.
168    pub async fn report_status_change(
169        &self,
170        ctx: &rustvello_proto::trigger::StatusContext,
171    ) -> RustvelloResult<Vec<ValidCondition>> {
172        self.evaluate_task_conditions(&ctx.task_id, ConditionContext::Status(ctx.clone()))
173            .await
174    }
175
176    /// Report a successful task result — finds and records matching ResultConditions.
177    pub async fn report_result(
178        &self,
179        ctx: &rustvello_proto::trigger::ResultContext,
180    ) -> RustvelloResult<Vec<ValidCondition>> {
181        self.evaluate_task_conditions(&ctx.task_id, ConditionContext::Result(ctx.clone()))
182            .await
183    }
184
185    /// Report a task failure — finds and records matching ExceptionConditions.
186    pub async fn report_failure(
187        &self,
188        ctx: &rustvello_proto::trigger::ExceptionContext,
189    ) -> RustvelloResult<Vec<ValidCondition>> {
190        self.evaluate_task_conditions(&ctx.task_id, ConditionContext::Exception(ctx.clone()))
191            .await
192    }
193
194    /// Emit a custom event — finds and records matching EventConditions.
195    /// Returns the generated event ID.
196    pub async fn emit_event(
197        &self,
198        event_code: &str,
199        payload: serde_json::Value,
200    ) -> RustvelloResult<String> {
201        let event_id = uuid::Uuid::new_v4().to_string();
202        let event_ctx = rustvello_proto::trigger::EventContext {
203            event_id: event_id.clone(),
204            event_code: event_code.to_string(),
205            payload,
206        };
207        let condition_ctx = ConditionContext::Event(event_ctx);
208
209        let conditions = self.store.get_event_conditions(event_code).await?;
210        for (cond_id, cond) in &conditions {
211            if cond.is_satisfied_by(&condition_ctx) {
212                let vc = ValidCondition::new(cond_id.clone(), condition_ctx.clone());
213                self.store.record_valid_condition(&vc).await?;
214            }
215        }
216
217        Ok(event_id)
218    }
219
220    // -- Cron evaluation --
221
222    /// Evaluate all cron conditions against the current time.
223    ///
224    /// For each cron condition:
225    /// 1. Parse the `cron_expression` with the `croner` crate; log and skip on syntax error.
226    /// 2. Check whether the current minute matches the schedule via `is_time_matched`.
227    /// 3. Also enforce `min_interval_seconds` to prevent double-firing within the same minute.
228    /// 4. Use optimistic locking (`store_cron_execution`) across multiple runner instances.
229    pub async fn evaluate_cron_conditions(&self) -> RustvelloResult<Vec<ValidCondition>> {
230        let cron_conditions = self.store.get_cron_conditions().await?;
231        let now = Utc::now();
232        let mut valid = Vec::new();
233
234        for (cond_id, cond) in &cron_conditions {
235            if let rustvello_proto::trigger::TriggerCondition::Cron(cron) = cond {
236                // Parse and validate the cron expression.
237                let schedule = match Cron::from_str(&cron.cron_expression) {
238                    Ok(s) => s,
239                    Err(e) => {
240                        tracing::error!(
241                            "Cron condition {} has invalid expression {:?}: {}",
242                            cond_id,
243                            cron.cron_expression,
244                            e
245                        );
246                        continue;
247                    }
248                };
249
250                // Check minimum interval (prevents double-firing within the same schedule slot).
251                let last_exec = self.store.get_last_cron_execution(cond_id).await?;
252                let interval_ok = match last_exec {
253                    Some(last) => {
254                        (now - last).num_seconds()
255                            >= i64::try_from(cron.min_interval_seconds).unwrap_or(i64::MAX)
256                    }
257                    None => true,
258                };
259                if !interval_ok {
260                    continue;
261                }
262
263                // Check if the current time matches the cron schedule.
264                let matches = match schedule.is_time_matching(&now) {
265                    Ok(m) => m,
266                    Err(e) => {
267                        tracing::warn!(
268                            "Cron match check failed for condition {} (expr {:?}): {}",
269                            cond_id,
270                            cron.cron_expression,
271                            e
272                        );
273                        continue;
274                    }
275                };
276                if !matches {
277                    continue;
278                }
279
280                // Optimistic lock — only one runner claims this execution slot.
281                let claimed = self
282                    .store
283                    .store_cron_execution(cond_id, now, last_exec)
284                    .await?;
285
286                if claimed {
287                    let ctx = ConditionContext::Cron(rustvello_proto::trigger::CronContext {
288                        timestamp: now,
289                        last_execution: last_exec,
290                    });
291                    let vc = ValidCondition::new(cond_id.clone(), ctx);
292                    self.store.record_valid_condition(&vc).await?;
293                    valid.push(vc);
294                }
295            }
296        }
297
298        Ok(valid)
299    }
300
301    // -- Trigger evaluation pipeline --
302
303    /// Process all pending valid conditions and determine which triggers should fire.
304    ///
305    /// Returns a list of (trigger definition, arguments) pairs ready for invocation.
306    pub async fn evaluate_triggers(
307        &self,
308    ) -> RustvelloResult<Vec<(TriggerDefinitionDTO, serde_json::Value)>> {
309        let valid_conditions = self.store.get_valid_conditions().await?;
310        if valid_conditions.is_empty() {
311            return Ok(vec![]);
312        }
313
314        // Group valid conditions by condition_id
315        let mut by_condition: HashMap<ConditionId, Vec<&ValidCondition>> = HashMap::new();
316        for vc in &valid_conditions {
317            by_condition
318                .entry(vc.condition_id.clone())
319                .or_default()
320                .push(vc);
321        }
322
323        // Find all trigger definitions affected by these conditions
324        let mut trigger_map: HashMap<TriggerDefinitionId, TriggerDefinitionDTO> = HashMap::new();
325        for cond_id in by_condition.keys() {
326            let triggers = self.store.get_triggers_for_condition(cond_id).await?;
327            for t in triggers {
328                trigger_map.entry(t.trigger_id.clone()).or_insert(t);
329            }
330        }
331
332        let mut to_invoke = Vec::new();
333        let mut to_clear: Vec<String> = Vec::new();
334
335        for trigger in trigger_map.values() {
336            match trigger.logic {
337                TriggerLogic::And => {
338                    // All conditions must be satisfied
339                    let all_satisfied = trigger
340                        .condition_ids
341                        .iter()
342                        .all(|cid| by_condition.contains_key(cid));
343
344                    if all_satisfied {
345                        // Build a run ID from all valid condition IDs (deterministic)
346                        let mut vc_ids: Vec<String> = trigger
347                            .condition_ids
348                            .iter()
349                            .filter_map(|cid| {
350                                by_condition.get(cid).and_then(|vcs| {
351                                    vcs.first().map(|vc| vc.valid_condition_id.clone())
352                                })
353                            })
354                            .collect();
355                        vc_ids.sort();
356                        let run_id = TriggerRunId::from(format!(
357                            "run_{}_{}",
358                            trigger.trigger_id.as_str(),
359                            vc_ids.join("_")
360                        ));
361
362                        if self.store.claim_trigger_run(&run_id).await? {
363                            let args = trigger
364                                .argument_template
365                                .clone()
366                                .unwrap_or(serde_json::Value::Object(Default::default()));
367                            to_invoke.push((trigger.clone(), args));
368
369                            // Mark all valid conditions used in this trigger for clearing
370                            for cid in &trigger.condition_ids {
371                                if let Some(vcs) = by_condition.get(cid) {
372                                    for vc in vcs {
373                                        to_clear.push(vc.valid_condition_id.clone());
374                                    }
375                                }
376                            }
377                        }
378                    }
379                }
380                TriggerLogic::Or => {
381                    // Any condition is sufficient — one invocation per valid condition
382                    for cid in &trigger.condition_ids {
383                        if let Some(vcs) = by_condition.get(cid) {
384                            for vc in vcs {
385                                let run_id = TriggerRunId::from(format!(
386                                    "run_{}_{}",
387                                    trigger.trigger_id.as_str(),
388                                    vc.valid_condition_id
389                                ));
390
391                                if self.store.claim_trigger_run(&run_id).await? {
392                                    let args = trigger
393                                        .argument_template
394                                        .clone()
395                                        .unwrap_or(serde_json::Value::Object(Default::default()));
396                                    to_invoke.push((trigger.clone(), args));
397                                    to_clear.push(vc.valid_condition_id.clone());
398                                }
399                            }
400                        }
401                    }
402                }
403                _ => {
404                    // Future logic variants: treat like Or for forward-compat
405                    tracing::warn!(
406                        trigger_id = %trigger.trigger_id,
407                        logic = ?trigger.logic,
408                        "Unknown TriggerLogic variant; falling back to Or semantics"
409                    );
410                    for cid in &trigger.condition_ids {
411                        if let Some(vcs) = by_condition.get(cid) {
412                            for vc in vcs {
413                                let run_id = TriggerRunId::from(format!(
414                                    "run_{}_{}",
415                                    trigger.trigger_id.as_str(),
416                                    vc.valid_condition_id
417                                ));
418                                if self.store.claim_trigger_run(&run_id).await? {
419                                    let args = trigger
420                                        .argument_template
421                                        .clone()
422                                        .unwrap_or(serde_json::Value::Object(Default::default()));
423                                    to_invoke.push((trigger.clone(), args));
424                                    to_clear.push(vc.valid_condition_id.clone());
425                                }
426                            }
427                        }
428                    }
429                }
430            }
431        }
432
433        // Clear processed valid conditions
434        if !to_clear.is_empty() {
435            self.store.clear_valid_conditions(&to_clear).await?;
436        }
437
438        Ok(to_invoke)
439    }
440}
441
#[cfg(test)]
mod tests {
    use super::*;

    // Behavioural tests for `TriggerManager` need a concrete backend and live
    // elsewhere — see rustvello-mem tests and integration tests. The only
    // check here is the textual form of the `TriggerLogic` variants.

    #[test]
    fn trigger_logic_display() {
        let expectations = [(TriggerLogic::And, "AND"), (TriggerLogic::Or, "OR")];
        for (logic, rendered) in expectations {
            assert_eq!(logic.to_string(), rendered);
        }
    }
}