rustvello-core 0.1.3

Core traits and types for the Rustvello distributed task library
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
//! Trigger system — backend trait and evaluation logic.
//!
//! Provides [`TriggerStore`] (async trait for persistence) and
//! [`TriggerManager`] (business logic for condition evaluation and
//! trigger firing).

use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use croner::Cron;

use crate::error::RustvelloResult;
use rustvello_proto::identifiers::TaskId;
use rustvello_proto::trigger::{
    ConditionContext, ConditionId, TriggerCondition, TriggerDefinitionDTO, TriggerDefinitionId,
    TriggerLogic, TriggerRunId, ValidCondition,
};

// ---------------------------------------------------------------------------
// TriggerStore — backend trait
// ---------------------------------------------------------------------------

/// Async persistence interface for the trigger subsystem.
///
/// Mirrors pynenc's `BaseTrigger` storage methods. Implementations must
/// be thread-safe (`Send + Sync`).
#[async_trait]
pub trait TriggerStore: Send + Sync {
    // -- Condition CRUD --

    /// Register a condition and return its deterministic ID.
    async fn register_condition(
        &self,
        condition: &TriggerCondition,
    ) -> RustvelloResult<ConditionId>;

    /// Get a condition by ID.
    async fn get_condition(&self, id: &ConditionId) -> RustvelloResult<Option<TriggerCondition>>;

    /// Get all conditions that watch a specific task.
    async fn get_conditions_for_task(
        &self,
        task_id: &TaskId,
    ) -> RustvelloResult<Vec<(ConditionId, TriggerCondition)>>;

    /// Get all cron conditions.
    async fn get_cron_conditions(&self) -> RustvelloResult<Vec<(ConditionId, TriggerCondition)>>;

    /// Get all event conditions matching an event code.
    async fn get_event_conditions(
        &self,
        event_code: &str,
    ) -> RustvelloResult<Vec<(ConditionId, TriggerCondition)>>;

    // -- Trigger definition CRUD --

    /// Register a trigger definition.
    async fn register_trigger(&self, trigger: &TriggerDefinitionDTO) -> RustvelloResult<()>;

    /// Get a trigger definition by ID.
    async fn get_trigger(
        &self,
        id: &TriggerDefinitionId,
    ) -> RustvelloResult<Option<TriggerDefinitionDTO>>;

    /// Get all trigger definitions that reference a given condition.
    async fn get_triggers_for_condition(
        &self,
        cond_id: &ConditionId,
    ) -> RustvelloResult<Vec<TriggerDefinitionDTO>>;

    /// Remove all trigger definitions for a task.
    async fn remove_triggers_for_task(&self, task_id: &TaskId) -> RustvelloResult<u32>;

    // -- Valid condition management --

    /// Record a condition that has been evaluated and found satisfied.
    async fn record_valid_condition(&self, vc: &ValidCondition) -> RustvelloResult<()>;

    /// Get all pending valid conditions.
    async fn get_valid_conditions(&self) -> RustvelloResult<Vec<ValidCondition>>;

    /// Clear valid conditions by their IDs (after processing).
    async fn clear_valid_conditions(&self, ids: &[String]) -> RustvelloResult<()>;

    // -- Cron state --

    /// Get the last cron execution time for a condition.
    async fn get_last_cron_execution(
        &self,
        cond_id: &ConditionId,
    ) -> RustvelloResult<Option<DateTime<Utc>>>;

    /// Store a cron execution time with optimistic locking.
    /// Returns `true` if the store succeeded (expected_last matched).
    async fn store_cron_execution(
        &self,
        cond_id: &ConditionId,
        time: DateTime<Utc>,
        expected_last: Option<DateTime<Utc>>,
    ) -> RustvelloResult<bool>;

    // -- Execution claims (distributed dedup) --

    /// Attempt to claim a trigger run. Returns `true` if this is first claim.
    async fn claim_trigger_run(&self, run_id: &TriggerRunId) -> RustvelloResult<bool>;

    /// Purge all trigger data.
    async fn purge(&self) -> RustvelloResult<()>;

    /// Get all registered conditions regardless of type.
    ///
    /// Every backend **must** return the complete set of stored conditions
    /// (Cron, Status, Event, Result, Exception, Composite).
    async fn get_all_conditions(&self) -> RustvelloResult<Vec<(ConditionId, TriggerCondition)>>;
}

// ---------------------------------------------------------------------------
// TriggerManager — evaluation logic
// ---------------------------------------------------------------------------

/// Business logic layer for the trigger system.
///
/// Wraps a `dyn TriggerStore` and implements condition evaluation,
/// trigger firing, and execution dedup. Modelled after pynenc's
/// `BaseTrigger` evaluation methods.
#[derive(Clone)]
pub struct TriggerManager {
    store: Arc<dyn TriggerStore>,
}

impl TriggerManager {
    pub fn new(store: Arc<dyn TriggerStore>) -> Self {
        Self { store }
    }

    /// Access the underlying store (for registration passthrough).
    pub fn store(&self) -> &Arc<dyn TriggerStore> {
        &self.store
    }

    // -- Event reporting (called by runner after task completion) --

    /// Match conditions for a task against a context, record valid ones, and return them.
    async fn evaluate_task_conditions(
        &self,
        task_id: &rustvello_proto::identifiers::TaskId,
        condition_ctx: ConditionContext,
    ) -> RustvelloResult<Vec<ValidCondition>> {
        let conditions = self.store.get_conditions_for_task(task_id).await?;
        let mut valid = Vec::new();

        for (cond_id, cond) in &conditions {
            if cond.is_satisfied_by(&condition_ctx) {
                let vc = ValidCondition::new(cond_id.clone(), condition_ctx.clone());
                self.store.record_valid_condition(&vc).await?;
                valid.push(vc);
            }
        }

        Ok(valid)
    }

    /// Report a status change — finds and records matching StatusConditions.
    pub async fn report_status_change(
        &self,
        ctx: &rustvello_proto::trigger::StatusContext,
    ) -> RustvelloResult<Vec<ValidCondition>> {
        self.evaluate_task_conditions(&ctx.task_id, ConditionContext::Status(ctx.clone()))
            .await
    }

    /// Report a successful task result — finds and records matching ResultConditions.
    pub async fn report_result(
        &self,
        ctx: &rustvello_proto::trigger::ResultContext,
    ) -> RustvelloResult<Vec<ValidCondition>> {
        self.evaluate_task_conditions(&ctx.task_id, ConditionContext::Result(ctx.clone()))
            .await
    }

    /// Report a task failure — finds and records matching ExceptionConditions.
    pub async fn report_failure(
        &self,
        ctx: &rustvello_proto::trigger::ExceptionContext,
    ) -> RustvelloResult<Vec<ValidCondition>> {
        self.evaluate_task_conditions(&ctx.task_id, ConditionContext::Exception(ctx.clone()))
            .await
    }

    /// Emit a custom event — finds and records matching EventConditions.
    /// Returns the generated event ID.
    pub async fn emit_event(
        &self,
        event_code: &str,
        payload: serde_json::Value,
    ) -> RustvelloResult<String> {
        let event_id = uuid::Uuid::new_v4().to_string();
        let event_ctx = rustvello_proto::trigger::EventContext {
            event_id: event_id.clone(),
            event_code: event_code.to_string(),
            payload,
        };
        let condition_ctx = ConditionContext::Event(event_ctx);

        let conditions = self.store.get_event_conditions(event_code).await?;
        for (cond_id, cond) in &conditions {
            if cond.is_satisfied_by(&condition_ctx) {
                let vc = ValidCondition::new(cond_id.clone(), condition_ctx.clone());
                self.store.record_valid_condition(&vc).await?;
            }
        }

        Ok(event_id)
    }

    // -- Cron evaluation --

    /// Evaluate all cron conditions against the current time.
    ///
    /// For each cron condition:
    /// 1. Parse the `cron_expression` with the `croner` crate; log and skip on syntax error.
    /// 2. Check whether the current minute matches the schedule via `is_time_matched`.
    /// 3. Also enforce `min_interval_seconds` to prevent double-firing within the same minute.
    /// 4. Use optimistic locking (`store_cron_execution`) across multiple runner instances.
    pub async fn evaluate_cron_conditions(&self) -> RustvelloResult<Vec<ValidCondition>> {
        let cron_conditions = self.store.get_cron_conditions().await?;
        let now = Utc::now();
        let mut valid = Vec::new();

        for (cond_id, cond) in &cron_conditions {
            if let rustvello_proto::trigger::TriggerCondition::Cron(cron) = cond {
                // Parse and validate the cron expression.
                let schedule = match Cron::from_str(&cron.cron_expression) {
                    Ok(s) => s,
                    Err(e) => {
                        tracing::error!(
                            "Cron condition {} has invalid expression {:?}: {}",
                            cond_id,
                            cron.cron_expression,
                            e
                        );
                        continue;
                    }
                };

                // Check minimum interval (prevents double-firing within the same schedule slot).
                let last_exec = self.store.get_last_cron_execution(cond_id).await?;
                let interval_ok = match last_exec {
                    Some(last) => {
                        (now - last).num_seconds()
                            >= i64::try_from(cron.min_interval_seconds).unwrap_or(i64::MAX)
                    }
                    None => true,
                };
                if !interval_ok {
                    continue;
                }

                // Check if the current time matches the cron schedule.
                let matches = match schedule.is_time_matching(&now) {
                    Ok(m) => m,
                    Err(e) => {
                        tracing::warn!(
                            "Cron match check failed for condition {} (expr {:?}): {}",
                            cond_id,
                            cron.cron_expression,
                            e
                        );
                        continue;
                    }
                };
                if !matches {
                    continue;
                }

                // Optimistic lock — only one runner claims this execution slot.
                let claimed = self
                    .store
                    .store_cron_execution(cond_id, now, last_exec)
                    .await?;

                if claimed {
                    let ctx = ConditionContext::Cron(rustvello_proto::trigger::CronContext {
                        timestamp: now,
                        last_execution: last_exec,
                    });
                    let vc = ValidCondition::new(cond_id.clone(), ctx);
                    self.store.record_valid_condition(&vc).await?;
                    valid.push(vc);
                }
            }
        }

        Ok(valid)
    }

    // -- Trigger evaluation pipeline --

    /// Process all pending valid conditions and determine which triggers should fire.
    ///
    /// Returns a list of (trigger definition, arguments) pairs ready for invocation.
    pub async fn evaluate_triggers(
        &self,
    ) -> RustvelloResult<Vec<(TriggerDefinitionDTO, serde_json::Value)>> {
        let valid_conditions = self.store.get_valid_conditions().await?;
        if valid_conditions.is_empty() {
            return Ok(vec![]);
        }

        // Group valid conditions by condition_id
        let mut by_condition: HashMap<ConditionId, Vec<&ValidCondition>> = HashMap::new();
        for vc in &valid_conditions {
            by_condition
                .entry(vc.condition_id.clone())
                .or_default()
                .push(vc);
        }

        // Find all trigger definitions affected by these conditions
        let mut trigger_map: HashMap<TriggerDefinitionId, TriggerDefinitionDTO> = HashMap::new();
        for cond_id in by_condition.keys() {
            let triggers = self.store.get_triggers_for_condition(cond_id).await?;
            for t in triggers {
                trigger_map.entry(t.trigger_id.clone()).or_insert(t);
            }
        }

        let mut to_invoke = Vec::new();
        let mut to_clear: Vec<String> = Vec::new();

        for trigger in trigger_map.values() {
            match trigger.logic {
                TriggerLogic::And => {
                    // All conditions must be satisfied
                    let all_satisfied = trigger
                        .condition_ids
                        .iter()
                        .all(|cid| by_condition.contains_key(cid));

                    if all_satisfied {
                        // Build a run ID from all valid condition IDs (deterministic)
                        let mut vc_ids: Vec<String> = trigger
                            .condition_ids
                            .iter()
                            .filter_map(|cid| {
                                by_condition.get(cid).and_then(|vcs| {
                                    vcs.first().map(|vc| vc.valid_condition_id.clone())
                                })
                            })
                            .collect();
                        vc_ids.sort();
                        let run_id = TriggerRunId::from(format!(
                            "run_{}_{}",
                            trigger.trigger_id.as_str(),
                            vc_ids.join("_")
                        ));

                        if self.store.claim_trigger_run(&run_id).await? {
                            let args = trigger
                                .argument_template
                                .clone()
                                .unwrap_or(serde_json::Value::Object(Default::default()));
                            to_invoke.push((trigger.clone(), args));

                            // Mark all valid conditions used in this trigger for clearing
                            for cid in &trigger.condition_ids {
                                if let Some(vcs) = by_condition.get(cid) {
                                    for vc in vcs {
                                        to_clear.push(vc.valid_condition_id.clone());
                                    }
                                }
                            }
                        }
                    }
                }
                TriggerLogic::Or => {
                    // Any condition is sufficient — one invocation per valid condition
                    for cid in &trigger.condition_ids {
                        if let Some(vcs) = by_condition.get(cid) {
                            for vc in vcs {
                                let run_id = TriggerRunId::from(format!(
                                    "run_{}_{}",
                                    trigger.trigger_id.as_str(),
                                    vc.valid_condition_id
                                ));

                                if self.store.claim_trigger_run(&run_id).await? {
                                    let args = trigger
                                        .argument_template
                                        .clone()
                                        .unwrap_or(serde_json::Value::Object(Default::default()));
                                    to_invoke.push((trigger.clone(), args));
                                    to_clear.push(vc.valid_condition_id.clone());
                                }
                            }
                        }
                    }
                }
                _ => {
                    // Future logic variants: treat like Or for forward-compat
                    tracing::warn!(
                        trigger_id = %trigger.trigger_id,
                        logic = ?trigger.logic,
                        "Unknown TriggerLogic variant; falling back to Or semantics"
                    );
                    for cid in &trigger.condition_ids {
                        if let Some(vcs) = by_condition.get(cid) {
                            for vc in vcs {
                                let run_id = TriggerRunId::from(format!(
                                    "run_{}_{}",
                                    trigger.trigger_id.as_str(),
                                    vc.valid_condition_id
                                ));
                                if self.store.claim_trigger_run(&run_id).await? {
                                    let args = trigger
                                        .argument_template
                                        .clone()
                                        .unwrap_or(serde_json::Value::Object(Default::default()));
                                    to_invoke.push((trigger.clone(), args));
                                    to_clear.push(vc.valid_condition_id.clone());
                                }
                            }
                        }
                    }
                }
            }
        }

        // Clear processed valid conditions
        if !to_clear.is_empty() {
            self.store.clear_valid_conditions(&to_clear).await?;
        }

        Ok(to_invoke)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    // TriggerManager tests require a backend — see rustvello-mem tests
    // and integration tests. Here we just verify construction.

    #[test]
    fn trigger_logic_display() {
        assert_eq!(TriggerLogic::And.to_string(), "AND");
        assert_eq!(TriggerLogic::Or.to_string(), "OR");
    }
}