oxigdal_workflow/scheduler/
event.rs1use crate::error::{Result, WorkflowError};
4use crate::scheduler::SchedulerConfig;
5use chrono::{DateTime, Utc};
6use dashmap::DashMap;
7use regex::Regex;
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::RwLock;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct EventTrigger {
16 pub event_type: String,
18 pub pattern: EventPattern,
20 pub filters: Vec<EventFilter>,
22 pub description: Option<String>,
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
28#[serde(tag = "type")]
29pub enum EventPattern {
30 Exact {
32 value: String,
34 },
35 Regex {
37 pattern: String,
39 },
40 Prefix {
42 prefix: String,
44 },
45 Suffix {
47 suffix: String,
49 },
50 Contains {
52 substring: String,
54 },
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct EventFilter {
60 pub field: String,
62 pub operator: FilterOperator,
64 pub value: serde_json::Value,
66}
67
68#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
70pub enum FilterOperator {
71 Eq,
73 Ne,
75 Gt,
77 Gte,
79 Lt,
81 Lte,
83 Contains,
85 Exists,
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct WorkflowEvent {
92 pub id: String,
94 pub event_type: String,
96 pub timestamp: DateTime<Utc>,
98 pub payload: serde_json::Value,
100 pub source: String,
102 pub metadata: HashMap<String, String>,
104}
105
106impl WorkflowEvent {
107 pub fn new<S: Into<String>>(event_type: S, payload: serde_json::Value) -> Self {
109 Self {
110 id: uuid::Uuid::new_v4().to_string(),
111 event_type: event_type.into(),
112 timestamp: Utc::now(),
113 payload,
114 source: "system".to_string(),
115 metadata: HashMap::new(),
116 }
117 }
118
119 pub fn with_source<S: Into<String>>(mut self, source: S) -> Self {
121 self.source = source.into();
122 self
123 }
124
125 pub fn with_metadata<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
127 self.metadata.insert(key.into(), value.into());
128 self
129 }
130
131 fn get_field_value(&self, field: &str) -> Option<&serde_json::Value> {
133 let parts: Vec<&str> = field.split('.').collect();
134 let mut current = &self.payload;
135
136 for part in parts {
137 current = current.get(part)?;
138 }
139
140 Some(current)
141 }
142}
143
144impl EventTrigger {
145 pub fn exact<S: Into<String>>(event_type: S, value: S) -> Self {
147 Self {
148 event_type: event_type.into(),
149 pattern: EventPattern::Exact {
150 value: value.into(),
151 },
152 filters: Vec::new(),
153 description: None,
154 }
155 }
156
157 pub fn regex<S: Into<String>>(event_type: S, pattern: S) -> Result<Self> {
159 let pattern_str = pattern.into();
160
161 Regex::new(&pattern_str)
163 .map_err(|e| WorkflowError::validation(format!("Invalid regex pattern: {}", e)))?;
164
165 Ok(Self {
166 event_type: event_type.into(),
167 pattern: EventPattern::Regex {
168 pattern: pattern_str,
169 },
170 filters: Vec::new(),
171 description: None,
172 })
173 }
174
175 pub fn with_filter(mut self, filter: EventFilter) -> Self {
177 self.filters.push(filter);
178 self
179 }
180
181 pub fn with_description<S: Into<String>>(mut self, description: S) -> Self {
183 self.description = Some(description.into());
184 self
185 }
186
187 pub fn matches(&self, event: &WorkflowEvent) -> Result<bool> {
189 if event.event_type != self.event_type {
191 return Ok(false);
192 }
193
194 let pattern_matches = match &self.pattern {
197 EventPattern::Exact { value } => {
198 if value.is_empty() {
199 true
201 } else {
202 event.payload.as_str() == Some(value)
203 }
204 }
205 EventPattern::Regex { pattern } => {
206 let re = Regex::new(pattern)
207 .map_err(|e| WorkflowError::validation(format!("Invalid regex: {}", e)))?;
208 event
209 .payload
210 .as_str()
211 .map(|s| re.is_match(s))
212 .unwrap_or(false)
213 }
214 EventPattern::Prefix { prefix } => event
215 .payload
216 .as_str()
217 .map(|s| s.starts_with(prefix))
218 .unwrap_or(false),
219 EventPattern::Suffix { suffix } => event
220 .payload
221 .as_str()
222 .map(|s| s.ends_with(suffix))
223 .unwrap_or(false),
224 EventPattern::Contains { substring } => event
225 .payload
226 .as_str()
227 .map(|s| s.contains(substring.as_str()))
228 .unwrap_or(false),
229 };
230
231 if !pattern_matches {
232 return Ok(false);
233 }
234
235 for filter in &self.filters {
237 if !self.evaluate_filter(filter, event)? {
238 return Ok(false);
239 }
240 }
241
242 Ok(true)
243 }
244
245 fn evaluate_filter(&self, filter: &EventFilter, event: &WorkflowEvent) -> Result<bool> {
247 let field_value = event.get_field_value(&filter.field);
248
249 match filter.operator {
250 FilterOperator::Exists => Ok(field_value.is_some()),
251 FilterOperator::Eq => Ok(field_value == Some(&filter.value)),
252 FilterOperator::Ne => Ok(field_value != Some(&filter.value)),
253 FilterOperator::Gt => {
254 if let (Some(field), Some(value)) =
255 (field_value.and_then(|v| v.as_f64()), filter.value.as_f64())
256 {
257 Ok(field > value)
258 } else {
259 Ok(false)
260 }
261 }
262 FilterOperator::Gte => {
263 if let (Some(field), Some(value)) =
264 (field_value.and_then(|v| v.as_f64()), filter.value.as_f64())
265 {
266 Ok(field >= value)
267 } else {
268 Ok(false)
269 }
270 }
271 FilterOperator::Lt => {
272 if let (Some(field), Some(value)) =
273 (field_value.and_then(|v| v.as_f64()), filter.value.as_f64())
274 {
275 Ok(field < value)
276 } else {
277 Ok(false)
278 }
279 }
280 FilterOperator::Lte => {
281 if let (Some(field), Some(value)) =
282 (field_value.and_then(|v| v.as_f64()), filter.value.as_f64())
283 {
284 Ok(field <= value)
285 } else {
286 Ok(false)
287 }
288 }
289 FilterOperator::Contains => {
290 if let Some(field_array) = field_value.and_then(|v| v.as_array()) {
291 Ok(field_array.contains(&filter.value))
292 } else if let (Some(field_str), Some(value_str)) =
293 (field_value.and_then(|v| v.as_str()), filter.value.as_str())
294 {
295 Ok(field_str.contains(value_str))
296 } else {
297 Ok(false)
298 }
299 }
300 }
301 }
302}
303
304pub struct EventScheduler {
306 _config: SchedulerConfig,
308 triggers: Arc<DashMap<String, EventTrigger>>,
309 event_queue: Arc<RwLock<Vec<WorkflowEvent>>>,
310}
311
312impl EventScheduler {
313 pub fn new(config: SchedulerConfig) -> Self {
315 Self {
316 _config: config,
317 triggers: Arc::new(DashMap::new()),
318 event_queue: Arc::new(RwLock::new(Vec::new())),
319 }
320 }
321
322 pub async fn register_trigger(&self, trigger_id: String, trigger: EventTrigger) -> Result<()> {
324 self.triggers.insert(trigger_id, trigger);
325 Ok(())
326 }
327
328 pub async fn unregister_trigger(&self, trigger_id: &str) -> Result<()> {
330 self.triggers
331 .remove(trigger_id)
332 .ok_or_else(|| WorkflowError::not_found(trigger_id))?;
333 Ok(())
334 }
335
336 pub async fn publish_event(&self, event: WorkflowEvent) -> Result<Vec<String>> {
338 let mut matched_triggers = Vec::new();
339
340 for entry in self.triggers.iter() {
341 let (trigger_id, trigger) = (entry.key(), entry.value());
342 if trigger.matches(&event)? {
343 matched_triggers.push(trigger_id.clone());
344 }
345 }
346
347 let mut queue = self.event_queue.write().await;
349 queue.push(event);
350
351 if queue.len() > 1000 {
353 queue.remove(0);
354 }
355
356 Ok(matched_triggers)
357 }
358
359 pub async fn get_recent_events(&self, limit: usize) -> Vec<WorkflowEvent> {
361 let queue = self.event_queue.read().await;
362 let start = if queue.len() > limit {
363 queue.len() - limit
364 } else {
365 0
366 };
367 queue[start..].to_vec()
368 }
369
370 pub async fn clear_queue(&self) {
372 let mut queue = self.event_queue.write().await;
373 queue.clear();
374 }
375}
376
377#[cfg(test)]
378mod tests {
379 use super::*;
380
381 #[test]
382 fn test_event_trigger_exact_match() {
383 let trigger = EventTrigger::exact("test_event", "test_value");
384 let event = WorkflowEvent::new("test_event", serde_json::json!("test_value"));
385
386 assert!(trigger.matches(&event).expect("Match failed"));
387 }
388
389 #[test]
390 fn test_event_trigger_regex_match() {
391 let trigger =
392 EventTrigger::regex("test_event", r"^test_.*").expect("Failed to create trigger");
393 let event = WorkflowEvent::new("test_event", serde_json::json!("test_value"));
394
395 assert!(trigger.matches(&event).expect("Match failed"));
396 }
397
398 #[test]
399 fn test_event_filter() {
400 let filter = EventFilter {
401 field: "value".to_string(),
402 operator: FilterOperator::Gt,
403 value: serde_json::json!(10),
404 };
405
406 let trigger = EventTrigger::exact("test_event", "").with_filter(filter);
407
408 let event = WorkflowEvent::new("test_event", serde_json::json!({"value": 15}));
409
410 assert!(trigger.matches(&event).expect("Match failed"));
411 }
412
413 #[tokio::test]
414 async fn test_event_scheduler() {
415 let scheduler = EventScheduler::new(SchedulerConfig::default());
416
417 let trigger = EventTrigger::exact("test_event", "test");
418 scheduler
419 .register_trigger("trigger1".to_string(), trigger)
420 .await
421 .expect("Failed to register trigger");
422
423 let event = WorkflowEvent::new("test_event", serde_json::json!("test"));
424 let matched = scheduler
425 .publish_event(event)
426 .await
427 .expect("Failed to publish event");
428
429 assert_eq!(matched.len(), 1);
430 assert_eq!(matched[0], "trigger1");
431 }
432}