Skip to main content

rustvello_proto/trigger/
condition.rs

1//! Trigger condition types and polymorphic condition enum.
2
3use serde::{Deserialize, Serialize};
4use std::collections::BTreeMap;
5
6use crate::identifiers::TaskId;
7use crate::status::InvocationStatus;
8
9use super::context::ConditionContext;
10use super::filter::{argument_filter_id, result_filter_id};
11use super::filter::{check_argument_filter, check_payload_filter, check_result_filter};
12use super::ConditionId;
13
14// ---------------------------------------------------------------------------
15// Condition types
16// ---------------------------------------------------------------------------
17
18/// A condition that fires on a cron schedule.
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct CronCondition {
21    /// Cron expression (e.g. "0 * * * *").
22    pub cron_expression: String,
23    /// Minimum seconds between firings (default 50, prevents double-fire).
24    pub min_interval_seconds: u64,
25}
26
27/// A condition that fires on invocation status changes.
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct StatusCondition {
30    /// The task to watch.
31    pub task_id: TaskId,
32    /// Which statuses trigger the condition.
33    pub statuses: Vec<InvocationStatus>,
34    /// Optional argument subset match filter (None = match any).
35    #[serde(default, skip_serializing_if = "Option::is_none")]
36    pub argument_filter: Option<BTreeMap<String, serde_json::Value>>,
37}
38
39/// A condition that fires on custom application events.
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct EventCondition {
42    /// Event code to match.
43    pub event_code: String,
44    /// Optional payload subset match filter (None = match any).
45    #[serde(default, skip_serializing_if = "Option::is_none")]
46    pub payload_filter: Option<BTreeMap<String, serde_json::Value>>,
47}
48
49/// A condition that fires on successful task completion.
50#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct ResultCondition {
52    /// The task whose results to watch.
53    pub task_id: TaskId,
54    /// Optional argument subset match filter (None = match any).
55    #[serde(default, skip_serializing_if = "Option::is_none")]
56    pub argument_filter: Option<BTreeMap<String, serde_json::Value>>,
57    /// Optional result value equality filter (None = match any result).
58    #[serde(default, skip_serializing_if = "Option::is_none")]
59    pub result_filter: Option<serde_json::Value>,
60}
61
62/// A condition that fires on task failure with specific error types.
63#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct ExceptionCondition {
65    /// The task whose failures to watch.
66    pub task_id: TaskId,
67    /// Error type names to match (empty = match all errors).
68    pub exception_types: Vec<String>,
69    /// Optional argument subset match filter (None = match any).
70    #[serde(default, skip_serializing_if = "Option::is_none")]
71    pub argument_filter: Option<BTreeMap<String, serde_json::Value>>,
72}
73
74// ---------------------------------------------------------------------------
75// Polymorphic condition enum
76// ---------------------------------------------------------------------------
77
78/// Logic for combining conditions inside a `CompositeCondition`.
79#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
80#[non_exhaustive]
81pub enum CompositeLogic {
82    And,
83    Or,
84}
85
86/// A composite condition that combines children with AND/OR logic.
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct CompositeCondition {
89    pub logic: CompositeLogic,
90    pub children: Vec<TriggerCondition>,
91}
92
93/// A trigger condition — polymorphic via enum.
94///
95/// Each variant maps 1:1 to a pynenc TriggerCondition subclass.
96#[derive(Debug, Clone, Serialize, Deserialize)]
97#[non_exhaustive]
98pub enum TriggerCondition {
99    Cron(CronCondition),
100    Status(StatusCondition),
101    Event(EventCondition),
102    Result(ResultCondition),
103    Exception(ExceptionCondition),
104    Composite(CompositeCondition),
105}
106
107impl std::fmt::Display for TriggerCondition {
108    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109        match self {
110            Self::Cron(c) => write!(f, "Cron({})", c.cron_expression),
111            Self::Status(c) => write!(f, "Status(task={})", c.task_id),
112            Self::Event(c) => write!(f, "Event({})", c.event_code),
113            Self::Result(c) => write!(f, "Result(task={})", c.task_id),
114            Self::Exception(c) => write!(f, "Exception(task={})", c.task_id),
115            Self::Composite(c) => {
116                write!(f, "Composite({:?}, {} children)", c.logic, c.children.len())
117            }
118        }
119    }
120}
121
122impl TriggerCondition {
123    /// Compute a deterministic condition ID matching pynenc's format.
124    ///
125    /// Status values use the lowercase StrEnum representation (e.g. `"success"`,
126    /// `"concurrency_controlled"`) — produced by lowercasing the UPPER_SNAKE
127    /// `Display` output of `InvocationStatus`.
128    pub fn condition_id(&self) -> ConditionId {
129        match self {
130            Self::Cron(c) => ConditionId(format!("cron_{}", c.cron_expression)),
131            Self::Status(c) => {
132                let mut statuses: Vec<String> = c
133                    .statuses
134                    .iter()
135                    .map(|s| s.to_string().to_lowercase())
136                    .collect();
137                statuses.sort();
138                let statuses_str = statuses.join("_");
139                let filter_id = argument_filter_id(&c.argument_filter);
140                ConditionId(format!(
141                    "condition#{}#{}#{}",
142                    c.task_id, statuses_str, filter_id
143                ))
144            }
145            Self::Event(c) => {
146                let filter_id = argument_filter_id(&c.payload_filter);
147                ConditionId(format!("event#{}#{}", c.event_code, filter_id))
148            }
149            Self::Result(c) => {
150                // Build the inner status condition_id (always SUCCESS)
151                let status_cond = Self::Status(StatusCondition {
152                    task_id: c.task_id.clone(),
153                    statuses: vec![InvocationStatus::Success],
154                    argument_filter: c.argument_filter.clone(),
155                });
156                let base_id = status_cond.condition_id();
157                let rf_id = result_filter_id(&c.result_filter);
158                ConditionId(format!("{}_result_{}", base_id.0, rf_id))
159            }
160            Self::Exception(c) => {
161                // Build the inner status condition_id (always FAILED)
162                let status_cond = Self::Status(StatusCondition {
163                    task_id: c.task_id.clone(),
164                    statuses: vec![InvocationStatus::Failed],
165                    argument_filter: c.argument_filter.clone(),
166                });
167                let base_id = status_cond.condition_id();
168                let exception_str = if c.exception_types.is_empty() {
169                    "any".to_string()
170                } else {
171                    let mut types = c.exception_types.clone();
172                    types.sort();
173                    types.join("_")
174                };
175                ConditionId(format!("{}_exception_{}", base_id.0, exception_str))
176            }
177            Self::Composite(c) => {
178                let mut child_ids: Vec<String> =
179                    c.children.iter().map(|ch| ch.condition_id().0).collect();
180                child_ids.sort();
181                let logic = format!("{:?}", c.logic).to_lowercase();
182                ConditionId(format!("composite#{}#{}", logic, child_ids.join(",")))
183            }
184        }
185    }
186
187    /// Returns the task IDs this condition watches (empty for Cron/Event/Composite).
188    pub fn source_task_ids(&self) -> Vec<TaskId> {
189        match self {
190            Self::Cron(_) | Self::Event(_) => vec![],
191            Self::Status(c) => vec![c.task_id.clone()],
192            Self::Result(c) => vec![c.task_id.clone()],
193            Self::Exception(c) => vec![c.task_id.clone()],
194            Self::Composite(c) => {
195                let mut ids: Vec<TaskId> =
196                    c.children.iter().flat_map(Self::source_task_ids).collect();
197                ids.sort_by_key(std::string::ToString::to_string);
198                ids.dedup_by(|a, b| a.to_string() == b.to_string());
199                ids
200            }
201        }
202    }
203
204    /// Check if this condition is satisfied by the given context.
205    pub fn is_satisfied_by(&self, ctx: &ConditionContext) -> bool {
206        match (self, ctx) {
207            (Self::Cron(_), ConditionContext::Cron(_)) => {
208                // Cron satisfaction is handled externally via schedule checks
209                true
210            }
211            (Self::Status(cond), ConditionContext::Status(ctx)) => {
212                cond.task_id == ctx.task_id
213                    && cond.statuses.contains(&ctx.status)
214                    && check_argument_filter(&cond.argument_filter, &ctx.arguments)
215            }
216            (Self::Event(cond), ConditionContext::Event(ctx)) => {
217                cond.event_code == ctx.event_code
218                    && check_payload_filter(&cond.payload_filter, &ctx.payload)
219            }
220            (Self::Result(cond), ConditionContext::Result(ctx)) => {
221                cond.task_id == ctx.task_id
222                    && check_argument_filter(&cond.argument_filter, &ctx.arguments)
223                    && check_result_filter(&cond.result_filter, &ctx.result)
224            }
225            (Self::Exception(cond), ConditionContext::Exception(ctx)) => {
226                cond.task_id == ctx.task_id
227                    && (cond.exception_types.is_empty()
228                        || cond.exception_types.contains(&ctx.error_type))
229                    && check_argument_filter(&cond.argument_filter, &ctx.arguments)
230            }
231            (Self::Composite(cond), ctx) => match cond.logic {
232                CompositeLogic::And => cond.children.iter().all(|c| c.is_satisfied_by(ctx)),
233                CompositeLogic::Or => cond.children.iter().any(|c| c.is_satisfied_by(ctx)),
234            },
235            _ => false, // mismatched condition/context types
236        }
237    }
238}