Skip to main content

oxigdal_workflow/scheduler/
event.rs

1//! Event-driven workflow scheduling.
2
3use crate::error::{Result, WorkflowError};
4use crate::scheduler::SchedulerConfig;
5use chrono::{DateTime, Utc};
6use dashmap::DashMap;
7use regex::Regex;
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::RwLock;
12
13/// Event trigger definition.
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct EventTrigger {
16    /// Event type/name to match.
17    pub event_type: String,
18    /// Event pattern (regex or exact match).
19    pub pattern: EventPattern,
20    /// Event filter conditions.
21    pub filters: Vec<EventFilter>,
22    /// Description of the trigger.
23    pub description: Option<String>,
24}
25
26/// Event pattern matching strategy.
27#[derive(Debug, Clone, Serialize, Deserialize)]
28#[serde(tag = "type")]
29pub enum EventPattern {
30    /// Exact match.
31    Exact {
32        /// Value to match exactly.
33        value: String,
34    },
35    /// Regex pattern.
36    Regex {
37        /// Regex pattern string.
38        pattern: String,
39    },
40    /// Prefix match.
41    Prefix {
42        /// Prefix to match.
43        prefix: String,
44    },
45    /// Suffix match.
46    Suffix {
47        /// Suffix to match.
48        suffix: String,
49    },
50    /// Contains match.
51    Contains {
52        /// Substring to search for.
53        substring: String,
54    },
55}
56
57/// Event filter for conditional matching.
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct EventFilter {
60    /// Field path to check (dot-separated).
61    pub field: String,
62    /// Filter operator.
63    pub operator: FilterOperator,
64    /// Filter value.
65    pub value: serde_json::Value,
66}
67
68/// Filter operator enumeration.
69#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
70pub enum FilterOperator {
71    /// Equal to.
72    Eq,
73    /// Not equal to.
74    Ne,
75    /// Greater than.
76    Gt,
77    /// Greater than or equal.
78    Gte,
79    /// Less than.
80    Lt,
81    /// Less than or equal.
82    Lte,
83    /// Contains (for arrays/strings).
84    Contains,
85    /// Exists (field is present).
86    Exists,
87}
88
89/// Workflow event for triggering executions.
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct WorkflowEvent {
92    /// Event ID.
93    pub id: String,
94    /// Event type.
95    pub event_type: String,
96    /// Event timestamp.
97    pub timestamp: DateTime<Utc>,
98    /// Event payload.
99    pub payload: serde_json::Value,
100    /// Event source.
101    pub source: String,
102    /// Event metadata.
103    pub metadata: HashMap<String, String>,
104}
105
106impl WorkflowEvent {
107    /// Create a new workflow event.
108    pub fn new<S: Into<String>>(event_type: S, payload: serde_json::Value) -> Self {
109        Self {
110            id: uuid::Uuid::new_v4().to_string(),
111            event_type: event_type.into(),
112            timestamp: Utc::now(),
113            payload,
114            source: "system".to_string(),
115            metadata: HashMap::new(),
116        }
117    }
118
119    /// Set the event source.
120    pub fn with_source<S: Into<String>>(mut self, source: S) -> Self {
121        self.source = source.into();
122        self
123    }
124
125    /// Add metadata to the event.
126    pub fn with_metadata<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
127        self.metadata.insert(key.into(), value.into());
128        self
129    }
130
131    /// Get a value from the payload by field path.
132    fn get_field_value(&self, field: &str) -> Option<&serde_json::Value> {
133        let parts: Vec<&str> = field.split('.').collect();
134        let mut current = &self.payload;
135
136        for part in parts {
137            current = current.get(part)?;
138        }
139
140        Some(current)
141    }
142}
143
144impl EventTrigger {
145    /// Create a new event trigger with exact matching.
146    pub fn exact<S: Into<String>>(event_type: S, value: S) -> Self {
147        Self {
148            event_type: event_type.into(),
149            pattern: EventPattern::Exact {
150                value: value.into(),
151            },
152            filters: Vec::new(),
153            description: None,
154        }
155    }
156
157    /// Create a new event trigger with regex matching.
158    pub fn regex<S: Into<String>>(event_type: S, pattern: S) -> Result<Self> {
159        let pattern_str = pattern.into();
160
161        // Validate regex
162        Regex::new(&pattern_str)
163            .map_err(|e| WorkflowError::validation(format!("Invalid regex pattern: {}", e)))?;
164
165        Ok(Self {
166            event_type: event_type.into(),
167            pattern: EventPattern::Regex {
168                pattern: pattern_str,
169            },
170            filters: Vec::new(),
171            description: None,
172        })
173    }
174
175    /// Add a filter to this trigger.
176    pub fn with_filter(mut self, filter: EventFilter) -> Self {
177        self.filters.push(filter);
178        self
179    }
180
181    /// Set the description.
182    pub fn with_description<S: Into<String>>(mut self, description: S) -> Self {
183        self.description = Some(description.into());
184        self
185    }
186
187    /// Check if this trigger matches the given event.
188    pub fn matches(&self, event: &WorkflowEvent) -> Result<bool> {
189        // Check event type
190        if event.event_type != self.event_type {
191            return Ok(false);
192        }
193
194        // Check pattern matching (if applicable)
195        // Skip pattern matching if payload is not a string or pattern is empty
196        let pattern_matches = match &self.pattern {
197            EventPattern::Exact { value } => {
198                if value.is_empty() {
199                    // Empty pattern matches any payload (useful for filter-only matching)
200                    true
201                } else {
202                    event.payload.as_str() == Some(value)
203                }
204            }
205            EventPattern::Regex { pattern } => {
206                let re = Regex::new(pattern)
207                    .map_err(|e| WorkflowError::validation(format!("Invalid regex: {}", e)))?;
208                event
209                    .payload
210                    .as_str()
211                    .map(|s| re.is_match(s))
212                    .unwrap_or(false)
213            }
214            EventPattern::Prefix { prefix } => event
215                .payload
216                .as_str()
217                .map(|s| s.starts_with(prefix))
218                .unwrap_or(false),
219            EventPattern::Suffix { suffix } => event
220                .payload
221                .as_str()
222                .map(|s| s.ends_with(suffix))
223                .unwrap_or(false),
224            EventPattern::Contains { substring } => event
225                .payload
226                .as_str()
227                .map(|s| s.contains(substring.as_str()))
228                .unwrap_or(false),
229        };
230
231        if !pattern_matches {
232            return Ok(false);
233        }
234
235        // Check all filters
236        for filter in &self.filters {
237            if !self.evaluate_filter(filter, event)? {
238                return Ok(false);
239            }
240        }
241
242        Ok(true)
243    }
244
245    /// Evaluate a single filter against an event.
246    fn evaluate_filter(&self, filter: &EventFilter, event: &WorkflowEvent) -> Result<bool> {
247        let field_value = event.get_field_value(&filter.field);
248
249        match filter.operator {
250            FilterOperator::Exists => Ok(field_value.is_some()),
251            FilterOperator::Eq => Ok(field_value == Some(&filter.value)),
252            FilterOperator::Ne => Ok(field_value != Some(&filter.value)),
253            FilterOperator::Gt => {
254                if let (Some(field), Some(value)) =
255                    (field_value.and_then(|v| v.as_f64()), filter.value.as_f64())
256                {
257                    Ok(field > value)
258                } else {
259                    Ok(false)
260                }
261            }
262            FilterOperator::Gte => {
263                if let (Some(field), Some(value)) =
264                    (field_value.and_then(|v| v.as_f64()), filter.value.as_f64())
265                {
266                    Ok(field >= value)
267                } else {
268                    Ok(false)
269                }
270            }
271            FilterOperator::Lt => {
272                if let (Some(field), Some(value)) =
273                    (field_value.and_then(|v| v.as_f64()), filter.value.as_f64())
274                {
275                    Ok(field < value)
276                } else {
277                    Ok(false)
278                }
279            }
280            FilterOperator::Lte => {
281                if let (Some(field), Some(value)) =
282                    (field_value.and_then(|v| v.as_f64()), filter.value.as_f64())
283                {
284                    Ok(field <= value)
285                } else {
286                    Ok(false)
287                }
288            }
289            FilterOperator::Contains => {
290                if let Some(field_array) = field_value.and_then(|v| v.as_array()) {
291                    Ok(field_array.contains(&filter.value))
292                } else if let (Some(field_str), Some(value_str)) =
293                    (field_value.and_then(|v| v.as_str()), filter.value.as_str())
294                {
295                    Ok(field_str.contains(value_str))
296                } else {
297                    Ok(false)
298                }
299            }
300        }
301    }
302}
303
304/// Event scheduler for managing event-driven workflow executions.
305pub struct EventScheduler {
306    /// Scheduler configuration (reserved for future enhancements).
307    _config: SchedulerConfig,
308    triggers: Arc<DashMap<String, EventTrigger>>,
309    event_queue: Arc<RwLock<Vec<WorkflowEvent>>>,
310}
311
312impl EventScheduler {
313    /// Create a new event scheduler.
314    pub fn new(config: SchedulerConfig) -> Self {
315        Self {
316            _config: config,
317            triggers: Arc::new(DashMap::new()),
318            event_queue: Arc::new(RwLock::new(Vec::new())),
319        }
320    }
321
322    /// Register a new event trigger.
323    pub async fn register_trigger(&self, trigger_id: String, trigger: EventTrigger) -> Result<()> {
324        self.triggers.insert(trigger_id, trigger);
325        Ok(())
326    }
327
328    /// Unregister an event trigger.
329    pub async fn unregister_trigger(&self, trigger_id: &str) -> Result<()> {
330        self.triggers
331            .remove(trigger_id)
332            .ok_or_else(|| WorkflowError::not_found(trigger_id))?;
333        Ok(())
334    }
335
336    /// Publish an event to the scheduler.
337    pub async fn publish_event(&self, event: WorkflowEvent) -> Result<Vec<String>> {
338        let mut matched_triggers = Vec::new();
339
340        for entry in self.triggers.iter() {
341            let (trigger_id, trigger) = (entry.key(), entry.value());
342            if trigger.matches(&event)? {
343                matched_triggers.push(trigger_id.clone());
344            }
345        }
346
347        // Add event to queue
348        let mut queue = self.event_queue.write().await;
349        queue.push(event);
350
351        // Keep queue size manageable
352        if queue.len() > 1000 {
353            queue.remove(0);
354        }
355
356        Ok(matched_triggers)
357    }
358
359    /// Get recent events.
360    pub async fn get_recent_events(&self, limit: usize) -> Vec<WorkflowEvent> {
361        let queue = self.event_queue.read().await;
362        let start = if queue.len() > limit {
363            queue.len() - limit
364        } else {
365            0
366        };
367        queue[start..].to_vec()
368    }
369
370    /// Clear event queue.
371    pub async fn clear_queue(&self) {
372        let mut queue = self.event_queue.write().await;
373        queue.clear();
374    }
375}
376
377#[cfg(test)]
378mod tests {
379    use super::*;
380
381    #[test]
382    fn test_event_trigger_exact_match() {
383        let trigger = EventTrigger::exact("test_event", "test_value");
384        let event = WorkflowEvent::new("test_event", serde_json::json!("test_value"));
385
386        assert!(trigger.matches(&event).expect("Match failed"));
387    }
388
389    #[test]
390    fn test_event_trigger_regex_match() {
391        let trigger =
392            EventTrigger::regex("test_event", r"^test_.*").expect("Failed to create trigger");
393        let event = WorkflowEvent::new("test_event", serde_json::json!("test_value"));
394
395        assert!(trigger.matches(&event).expect("Match failed"));
396    }
397
398    #[test]
399    fn test_event_filter() {
400        let filter = EventFilter {
401            field: "value".to_string(),
402            operator: FilterOperator::Gt,
403            value: serde_json::json!(10),
404        };
405
406        let trigger = EventTrigger::exact("test_event", "").with_filter(filter);
407
408        let event = WorkflowEvent::new("test_event", serde_json::json!({"value": 15}));
409
410        assert!(trigger.matches(&event).expect("Match failed"));
411    }
412
413    #[tokio::test]
414    async fn test_event_scheduler() {
415        let scheduler = EventScheduler::new(SchedulerConfig::default());
416
417        let trigger = EventTrigger::exact("test_event", "test");
418        scheduler
419            .register_trigger("trigger1".to_string(), trigger)
420            .await
421            .expect("Failed to register trigger");
422
423        let event = WorkflowEvent::new("test_event", serde_json::json!("test"));
424        let matched = scheduler
425            .publish_event(event)
426            .await
427            .expect("Failed to publish event");
428
429        assert_eq!(matched.len(), 1);
430        assert_eq!(matched[0], "trigger1");
431    }
432}