1use std::collections::HashMap;
8use std::str::FromStr;
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use chrono::{DateTime, Utc};
13use croner::Cron;
14
15use crate::error::RustvelloResult;
16use rustvello_proto::identifiers::TaskId;
17use rustvello_proto::trigger::{
18 ConditionContext, ConditionId, TriggerCondition, TriggerDefinitionDTO, TriggerDefinitionId,
19 TriggerLogic, TriggerRunId, ValidCondition,
20};
21
22#[async_trait]
31pub trait TriggerStore: Send + Sync {
32 async fn register_condition(
36 &self,
37 condition: &TriggerCondition,
38 ) -> RustvelloResult<ConditionId>;
39
40 async fn get_condition(&self, id: &ConditionId) -> RustvelloResult<Option<TriggerCondition>>;
42
43 async fn get_conditions_for_task(
45 &self,
46 task_id: &TaskId,
47 ) -> RustvelloResult<Vec<(ConditionId, TriggerCondition)>>;
48
49 async fn get_cron_conditions(&self) -> RustvelloResult<Vec<(ConditionId, TriggerCondition)>>;
51
52 async fn get_event_conditions(
54 &self,
55 event_code: &str,
56 ) -> RustvelloResult<Vec<(ConditionId, TriggerCondition)>>;
57
58 async fn register_trigger(&self, trigger: &TriggerDefinitionDTO) -> RustvelloResult<()>;
62
63 async fn get_trigger(
65 &self,
66 id: &TriggerDefinitionId,
67 ) -> RustvelloResult<Option<TriggerDefinitionDTO>>;
68
69 async fn get_triggers_for_condition(
71 &self,
72 cond_id: &ConditionId,
73 ) -> RustvelloResult<Vec<TriggerDefinitionDTO>>;
74
75 async fn remove_triggers_for_task(&self, task_id: &TaskId) -> RustvelloResult<u32>;
77
78 async fn record_valid_condition(&self, vc: &ValidCondition) -> RustvelloResult<()>;
82
83 async fn get_valid_conditions(&self) -> RustvelloResult<Vec<ValidCondition>>;
85
86 async fn clear_valid_conditions(&self, ids: &[String]) -> RustvelloResult<()>;
88
89 async fn get_last_cron_execution(
93 &self,
94 cond_id: &ConditionId,
95 ) -> RustvelloResult<Option<DateTime<Utc>>>;
96
97 async fn store_cron_execution(
100 &self,
101 cond_id: &ConditionId,
102 time: DateTime<Utc>,
103 expected_last: Option<DateTime<Utc>>,
104 ) -> RustvelloResult<bool>;
105
106 async fn claim_trigger_run(&self, run_id: &TriggerRunId) -> RustvelloResult<bool>;
110
111 async fn purge(&self) -> RustvelloResult<()>;
113
114 async fn get_all_conditions(&self) -> RustvelloResult<Vec<(ConditionId, TriggerCondition)>>;
119}
120
121#[derive(Clone)]
131pub struct TriggerManager {
132 store: Arc<dyn TriggerStore>,
133}
134
135impl TriggerManager {
136 pub fn new(store: Arc<dyn TriggerStore>) -> Self {
137 Self { store }
138 }
139
140 pub fn store(&self) -> &Arc<dyn TriggerStore> {
142 &self.store
143 }
144
145 async fn evaluate_task_conditions(
149 &self,
150 task_id: &rustvello_proto::identifiers::TaskId,
151 condition_ctx: ConditionContext,
152 ) -> RustvelloResult<Vec<ValidCondition>> {
153 let conditions = self.store.get_conditions_for_task(task_id).await?;
154 let mut valid = Vec::new();
155
156 for (cond_id, cond) in &conditions {
157 if cond.is_satisfied_by(&condition_ctx) {
158 let vc = ValidCondition::new(cond_id.clone(), condition_ctx.clone());
159 self.store.record_valid_condition(&vc).await?;
160 valid.push(vc);
161 }
162 }
163
164 Ok(valid)
165 }
166
167 pub async fn report_status_change(
169 &self,
170 ctx: &rustvello_proto::trigger::StatusContext,
171 ) -> RustvelloResult<Vec<ValidCondition>> {
172 self.evaluate_task_conditions(&ctx.task_id, ConditionContext::Status(ctx.clone()))
173 .await
174 }
175
176 pub async fn report_result(
178 &self,
179 ctx: &rustvello_proto::trigger::ResultContext,
180 ) -> RustvelloResult<Vec<ValidCondition>> {
181 self.evaluate_task_conditions(&ctx.task_id, ConditionContext::Result(ctx.clone()))
182 .await
183 }
184
185 pub async fn report_failure(
187 &self,
188 ctx: &rustvello_proto::trigger::ExceptionContext,
189 ) -> RustvelloResult<Vec<ValidCondition>> {
190 self.evaluate_task_conditions(&ctx.task_id, ConditionContext::Exception(ctx.clone()))
191 .await
192 }
193
194 pub async fn emit_event(
197 &self,
198 event_code: &str,
199 payload: serde_json::Value,
200 ) -> RustvelloResult<String> {
201 let event_id = uuid::Uuid::new_v4().to_string();
202 let event_ctx = rustvello_proto::trigger::EventContext {
203 event_id: event_id.clone(),
204 event_code: event_code.to_string(),
205 payload,
206 };
207 let condition_ctx = ConditionContext::Event(event_ctx);
208
209 let conditions = self.store.get_event_conditions(event_code).await?;
210 for (cond_id, cond) in &conditions {
211 if cond.is_satisfied_by(&condition_ctx) {
212 let vc = ValidCondition::new(cond_id.clone(), condition_ctx.clone());
213 self.store.record_valid_condition(&vc).await?;
214 }
215 }
216
217 Ok(event_id)
218 }
219
220 pub async fn evaluate_cron_conditions(&self) -> RustvelloResult<Vec<ValidCondition>> {
230 let cron_conditions = self.store.get_cron_conditions().await?;
231 let now = Utc::now();
232 let mut valid = Vec::new();
233
234 for (cond_id, cond) in &cron_conditions {
235 if let rustvello_proto::trigger::TriggerCondition::Cron(cron) = cond {
236 let schedule = match Cron::from_str(&cron.cron_expression) {
238 Ok(s) => s,
239 Err(e) => {
240 tracing::error!(
241 "Cron condition {} has invalid expression {:?}: {}",
242 cond_id,
243 cron.cron_expression,
244 e
245 );
246 continue;
247 }
248 };
249
250 let last_exec = self.store.get_last_cron_execution(cond_id).await?;
252 let interval_ok = match last_exec {
253 Some(last) => {
254 (now - last).num_seconds()
255 >= i64::try_from(cron.min_interval_seconds).unwrap_or(i64::MAX)
256 }
257 None => true,
258 };
259 if !interval_ok {
260 continue;
261 }
262
263 let matches = match schedule.is_time_matching(&now) {
265 Ok(m) => m,
266 Err(e) => {
267 tracing::warn!(
268 "Cron match check failed for condition {} (expr {:?}): {}",
269 cond_id,
270 cron.cron_expression,
271 e
272 );
273 continue;
274 }
275 };
276 if !matches {
277 continue;
278 }
279
280 let claimed = self
282 .store
283 .store_cron_execution(cond_id, now, last_exec)
284 .await?;
285
286 if claimed {
287 let ctx = ConditionContext::Cron(rustvello_proto::trigger::CronContext {
288 timestamp: now,
289 last_execution: last_exec,
290 });
291 let vc = ValidCondition::new(cond_id.clone(), ctx);
292 self.store.record_valid_condition(&vc).await?;
293 valid.push(vc);
294 }
295 }
296 }
297
298 Ok(valid)
299 }
300
301 pub async fn evaluate_triggers(
307 &self,
308 ) -> RustvelloResult<Vec<(TriggerDefinitionDTO, serde_json::Value)>> {
309 let valid_conditions = self.store.get_valid_conditions().await?;
310 if valid_conditions.is_empty() {
311 return Ok(vec![]);
312 }
313
314 let mut by_condition: HashMap<ConditionId, Vec<&ValidCondition>> = HashMap::new();
316 for vc in &valid_conditions {
317 by_condition
318 .entry(vc.condition_id.clone())
319 .or_default()
320 .push(vc);
321 }
322
323 let mut trigger_map: HashMap<TriggerDefinitionId, TriggerDefinitionDTO> = HashMap::new();
325 for cond_id in by_condition.keys() {
326 let triggers = self.store.get_triggers_for_condition(cond_id).await?;
327 for t in triggers {
328 trigger_map.entry(t.trigger_id.clone()).or_insert(t);
329 }
330 }
331
332 let mut to_invoke = Vec::new();
333 let mut to_clear: Vec<String> = Vec::new();
334
335 for trigger in trigger_map.values() {
336 match trigger.logic {
337 TriggerLogic::And => {
338 let all_satisfied = trigger
340 .condition_ids
341 .iter()
342 .all(|cid| by_condition.contains_key(cid));
343
344 if all_satisfied {
345 let mut vc_ids: Vec<String> = trigger
347 .condition_ids
348 .iter()
349 .filter_map(|cid| {
350 by_condition.get(cid).and_then(|vcs| {
351 vcs.first().map(|vc| vc.valid_condition_id.clone())
352 })
353 })
354 .collect();
355 vc_ids.sort();
356 let run_id = TriggerRunId::from(format!(
357 "run_{}_{}",
358 trigger.trigger_id.as_str(),
359 vc_ids.join("_")
360 ));
361
362 if self.store.claim_trigger_run(&run_id).await? {
363 let args = trigger
364 .argument_template
365 .clone()
366 .unwrap_or(serde_json::Value::Object(Default::default()));
367 to_invoke.push((trigger.clone(), args));
368
369 for cid in &trigger.condition_ids {
371 if let Some(vcs) = by_condition.get(cid) {
372 for vc in vcs {
373 to_clear.push(vc.valid_condition_id.clone());
374 }
375 }
376 }
377 }
378 }
379 }
380 TriggerLogic::Or => {
381 for cid in &trigger.condition_ids {
383 if let Some(vcs) = by_condition.get(cid) {
384 for vc in vcs {
385 let run_id = TriggerRunId::from(format!(
386 "run_{}_{}",
387 trigger.trigger_id.as_str(),
388 vc.valid_condition_id
389 ));
390
391 if self.store.claim_trigger_run(&run_id).await? {
392 let args = trigger
393 .argument_template
394 .clone()
395 .unwrap_or(serde_json::Value::Object(Default::default()));
396 to_invoke.push((trigger.clone(), args));
397 to_clear.push(vc.valid_condition_id.clone());
398 }
399 }
400 }
401 }
402 }
403 _ => {
404 tracing::warn!(
406 trigger_id = %trigger.trigger_id,
407 logic = ?trigger.logic,
408 "Unknown TriggerLogic variant; falling back to Or semantics"
409 );
410 for cid in &trigger.condition_ids {
411 if let Some(vcs) = by_condition.get(cid) {
412 for vc in vcs {
413 let run_id = TriggerRunId::from(format!(
414 "run_{}_{}",
415 trigger.trigger_id.as_str(),
416 vc.valid_condition_id
417 ));
418 if self.store.claim_trigger_run(&run_id).await? {
419 let args = trigger
420 .argument_template
421 .clone()
422 .unwrap_or(serde_json::Value::Object(Default::default()));
423 to_invoke.push((trigger.clone(), args));
424 to_clear.push(vc.valid_condition_id.clone());
425 }
426 }
427 }
428 }
429 }
430 }
431 }
432
433 if !to_clear.is_empty() {
435 self.store.clear_valid_conditions(&to_clear).await?;
436 }
437
438 Ok(to_invoke)
439 }
440}
441
442#[cfg(test)]
443mod tests {
444 use super::*;
445
446 #[test]
450 fn trigger_logic_display() {
451 assert_eq!(TriggerLogic::And.to_string(), "AND");
452 assert_eq!(TriggerLogic::Or.to_string(), "OR");
453 }
454}