use rustvello_core::error::{RustvelloError, RustvelloResult};
use rustvello_core::trigger::TriggerStore;
use rustvello_proto::identifiers::TaskId;
use rustvello_proto::status::InvocationStatus;
use rustvello_proto::trigger::{
CronCondition, EventCondition, ExceptionCondition, ResultCondition, StatusCondition,
TriggerCondition, TriggerDefinitionDTO, TriggerLogic,
};
use std::str::FromStr;
use std::sync::Arc;
/// Fluent, consuming builder for a [`TriggerDefinitionDTO`].
///
/// Conditions are appended with the `on_*` methods, each of which takes the builder by value
/// and hands it back. The definition is produced by `build` (computed locally) or by
/// `build_and_register` (registered through a [`TriggerStore`]).
#[must_use]
pub struct TriggerBuilder {
/// Conditions accumulated so far, in the order the `on_*` methods were called.
conditions: Vec<TriggerCondition>,
/// Logic copied into the resulting definition; `new` starts with [`TriggerLogic::And`].
logic: TriggerLogic,
/// Optional JSON value copied into the definition's `argument_template`; set by
/// `with_static_args`, `None` otherwise.
argument_template: Option<serde_json::Value>,
}
impl TriggerBuilder {
    /// `min_interval_seconds` used by `on_cron`, which does not let the caller pick one.
    const DEFAULT_CRON_MIN_INTERVAL_SECONDS: u64 = 50;

    /// Creates a builder with no conditions, [`TriggerLogic::And`] logic and no argument
    /// template.
    pub fn new() -> Self {
        Self {
            conditions: Vec::new(),
            logic: TriggerLogic::And,
            argument_template: None,
        }
    }

    /// Adds a cron condition using the builder's default `min_interval_seconds` (50).
    ///
    /// Shorthand for [`TriggerBuilder::on_cron_with_interval`].
    ///
    /// # Errors
    ///
    /// Returns [`RustvelloError::Configuration`] when `expression` fails to parse as a cron
    /// expression.
    pub fn on_cron(self, expression: &str) -> RustvelloResult<Self> {
        self.on_cron_with_interval(expression, Self::DEFAULT_CRON_MIN_INTERVAL_SECONDS)
    }

    /// Adds a cron condition holding `expression` and `min_interval_seconds`.
    ///
    /// The expression is validated eagerly, so a malformed schedule is reported while the
    /// trigger is being built.
    ///
    /// # Errors
    ///
    /// Returns [`RustvelloError::Configuration`] when `expression` fails to parse as a cron
    /// expression; no condition is added in that case.
    pub fn on_cron_with_interval(
        mut self,
        expression: &str,
        min_interval_seconds: u64,
    ) -> RustvelloResult<Self> {
        // Parsed purely for validation: the parsed schedule is dropped and the original text
        // is what gets stored on the condition.
        croner::Cron::from_str(expression).map_err(|e| RustvelloError::Configuration {
            message: format!("invalid cron expression {:?}: {}", expression, e),
        })?;
        self.conditions.push(TriggerCondition::Cron(CronCondition {
            cron_expression: expression.to_string(),
            min_interval_seconds,
        }));
        Ok(self)
    }

    /// Adds a [`StatusCondition`] for `task_id` carrying a copy of `statuses`.
    ///
    /// The condition is created without an argument filter.
    pub fn on_status(mut self, task_id: &TaskId, statuses: &[InvocationStatus]) -> Self {
        self.conditions
            .push(TriggerCondition::Status(StatusCondition {
                task_id: task_id.clone(),
                statuses: statuses.to_vec(),
                argument_filter: None,
            }));
        self
    }

    /// Adds an [`EventCondition`] for `event_code`, without a payload filter.
    pub fn on_event(mut self, event_code: &str) -> Self {
        self.conditions
            .push(TriggerCondition::Event(EventCondition {
                event_code: event_code.to_string(),
                payload_filter: None,
            }));
        self
    }

    /// Adds a [`ResultCondition`] for `task_id`, with neither an argument filter nor a result
    /// filter.
    pub fn on_result(mut self, task_id: &TaskId) -> Self {
        self.conditions
            .push(TriggerCondition::Result(ResultCondition {
                task_id: task_id.clone(),
                argument_filter: None,
                result_filter: None,
            }));
        self
    }

    /// Alias of [`TriggerBuilder::on_result`]; it delegates to it unchanged.
    pub fn on_any_result(self, task_id: &TaskId) -> Self {
        self.on_result(task_id)
    }

    /// Adds an [`ExceptionCondition`] for `task_id` listing `exception_types` as owned strings.
    ///
    /// The condition is created without an argument filter.
    pub fn on_exception(mut self, task_id: &TaskId, exception_types: &[&str]) -> Self {
        self.conditions
            .push(TriggerCondition::Exception(ExceptionCondition {
                task_id: task_id.clone(),
                // Copy the `&str` out of the slice first so the conversion is a plain
                // `str -> String` copy instead of `ToString` on a double reference.
                exception_types: exception_types
                    .iter()
                    .copied()
                    .map(str::to_owned)
                    .collect(),
                argument_filter: None,
            }));
        self
    }

    /// Replaces the logic that will be stored in the definition.
    pub fn with_logic(mut self, logic: TriggerLogic) -> Self {
        self.logic = logic;
        self
    }

    /// Sets `args` as the definition's argument template, replacing any previous value.
    pub fn with_static_args(mut self, args: serde_json::Value) -> Self {
        self.argument_template = Some(args);
        self
    }

    /// Builds the definition for `target_task_id` without contacting any store.
    ///
    /// Condition ids are taken from [`TriggerCondition::condition_id`] in insertion order, and
    /// the trigger id from [`TriggerDefinitionDTO::compute_trigger_id`].
    ///
    /// # Errors
    ///
    /// Returns [`RustvelloError::Configuration`] when no condition has been added.
    pub fn build(self, target_task_id: &TaskId) -> RustvelloResult<TriggerDefinitionDTO> {
        self.ensure_has_conditions()?;
        let condition_ids: Vec<_> = self
            .conditions
            .iter()
            .map(TriggerCondition::condition_id)
            .collect();
        let trigger_id =
            TriggerDefinitionDTO::compute_trigger_id(target_task_id, &condition_ids, self.logic);
        Ok(TriggerDefinitionDTO {
            trigger_id,
            task_id: target_task_id.clone(),
            condition_ids,
            logic: self.logic,
            argument_template: self.argument_template,
        })
    }

    /// Registers every condition and then the trigger itself in `store`, returning the
    /// registered definition.
    ///
    /// Conditions are registered one at a time, in insertion order, and the ids returned by
    /// the store are the ones placed in the definition.
    ///
    /// # Errors
    ///
    /// Returns [`RustvelloError::Configuration`] when no condition has been added; the store
    /// is not called in that case. Any error from `store.register_condition` or
    /// `store.register_trigger` is returned as-is. This method performs no rollback, so
    /// conditions registered before a failure are left to the store.
    pub async fn build_and_register(
        self,
        target_task_id: &TaskId,
        store: &Arc<dyn TriggerStore>,
    ) -> RustvelloResult<TriggerDefinitionDTO> {
        self.ensure_has_conditions()?;
        let mut condition_ids = Vec::with_capacity(self.conditions.len());
        for cond in &self.conditions {
            let cid = store.register_condition(cond).await?;
            condition_ids.push(cid);
        }
        let trigger_id =
            TriggerDefinitionDTO::compute_trigger_id(target_task_id, &condition_ids, self.logic);
        let definition = TriggerDefinitionDTO {
            trigger_id,
            task_id: target_task_id.clone(),
            condition_ids,
            logic: self.logic,
            argument_template: self.argument_template,
        };
        store.register_trigger(&definition).await?;
        Ok(definition)
    }

    /// Returns the conditions added so far, in insertion order.
    pub fn conditions(&self) -> &[TriggerCondition] {
        &self.conditions
    }

    /// Shared precondition of `build` and `build_and_register`: at least one condition.
    fn ensure_has_conditions(&self) -> RustvelloResult<()> {
        if self.conditions.is_empty() {
            return Err(RustvelloError::Configuration {
                message: "trigger must have at least one condition".into(),
            });
        }
        Ok(())
    }
}
impl Default for TriggerBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A builder without conditions refuses to build.
    #[test]
    fn builder_empty_conditions_error() {
        let target = TaskId::new("mod", "task");
        let outcome = TriggerBuilder::new().build(&target);
        assert!(outcome.is_err());
    }

    /// One event condition yields one condition id and keeps the default `And` logic.
    #[test]
    fn builder_single_condition() {
        let target = TaskId::new("mod", "target");
        let builder = TriggerBuilder::new().on_event("payment");
        let definition = builder.build(&target).unwrap();
        assert_eq!(definition.task_id, target);
        assert_eq!(definition.condition_ids.len(), 1);
        assert_eq!(definition.logic, TriggerLogic::And);
    }

    /// Two conditions, explicit `Or` logic and static args all reach the definition.
    #[test]
    fn builder_multiple_conditions_or_logic() {
        let upstream = TaskId::new("mod", "source");
        let downstream = TaskId::new("mod", "target");
        let builder = TriggerBuilder::new()
            .on_status(&upstream, &[InvocationStatus::Success])
            .on_event("manual_trigger")
            .with_logic(TriggerLogic::Or)
            .with_static_args(serde_json::json!({"key": "value"}));
        let definition = builder.build(&downstream).unwrap();
        assert_eq!(definition.condition_ids.len(), 2);
        assert_eq!(definition.logic, TriggerLogic::Or);
        assert!(definition.argument_template.is_some());
    }

    /// A valid cron expression is accepted and contributes one condition.
    #[test]
    fn builder_cron() {
        let cleanup = TaskId::new("mod", "cleanup");
        let builder = TriggerBuilder::new().on_cron("0 * * * *").unwrap();
        let definition = builder.build(&cleanup).unwrap();
        assert_eq!(definition.condition_ids.len(), 1);
    }

    /// An unparsable cron expression is rejected when the condition is added.
    #[test]
    fn builder_cron_invalid_expression() {
        let outcome = TriggerBuilder::new().on_cron("garbage");
        assert!(outcome.is_err());
    }

    /// An exception condition with several exception types is still a single condition.
    #[test]
    fn builder_exception_condition() {
        let risky = TaskId::new("mod", "risky_task");
        let alert = TaskId::new("mod", "alert_task");
        let builder =
            TriggerBuilder::new().on_exception(&risky, &["TimeoutError", "ConnectionError"]);
        let definition = builder.build(&alert).unwrap();
        assert_eq!(definition.condition_ids.len(), 1);
    }

    /// A result condition contributes one condition.
    #[test]
    fn builder_result_condition() {
        let producer = TaskId::new("mod", "producer");
        let consumer = TaskId::new("mod", "consumer");
        let builder = TriggerBuilder::new().on_result(&producer);
        let definition = builder.build(&consumer).unwrap();
        assert_eq!(definition.condition_ids.len(), 1);
    }
}