use crate::base::component::action::{Action, ActionStatus};
use crate::base::component::event::Event;
use crate::base::entity::message::MessagePriority;
use chrono::{DateTime, Datelike, Timelike, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use uuid::Uuid;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum TriggerStatus {
#[default]
Pending,
Processing,
Completed,
Failed,
Cancelled,
Skipped,
Expired,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TriggerCondition {
pub event_type_pattern: String,
pub source_pattern: Option<String>,
pub payload_conditions: Vec<String>,
pub min_priority: Option<MessagePriority>,
pub time_conditions: Option<TimeCondition>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeCondition {
pub active_hours: Option<(u8, u8)>,
pub active_days: Option<Vec<u8>>,
pub cooldown_seconds: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TriggerConfig {
pub max_retries: u32,
pub timeout_seconds: u64,
pub preserve_after_completion: bool,
pub ttl_seconds: u64,
pub processing_priority: MessagePriority,
}
impl Default for TriggerConfig {
fn default() -> Self {
Self {
max_retries: 3,
timeout_seconds: 300, preserve_after_completion: true,
ttl_seconds: 3600, processing_priority: MessagePriority::Normal,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Trigger {
pub id: Uuid,
pub name: String,
pub description: String,
pub source: String,
pub target: String,
pub triggering_event: Event,
pub action: Action,
pub condition: TriggerCondition,
pub config: TriggerConfig,
pub status: TriggerStatus,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub processed_at: Option<DateTime<Utc>>,
pub attempt_count: u32,
pub worker_id: Option<String>,
pub metadata: HashMap<String, serde_json::Value>,
pub correlation_chain: Vec<Uuid>,
}
impl Trigger {
pub fn new(
name: String,
description: String,
source: String,
target: String,
triggering_event: Event,
action: Action,
condition: TriggerCondition,
) -> Self {
let now = Utc::now();
Self {
id: Uuid::new_v4(),
name,
description,
source,
target,
triggering_event,
action,
condition,
config: TriggerConfig::default(),
status: TriggerStatus::Pending,
created_at: now,
updated_at: now,
processed_at: None,
attempt_count: 0,
worker_id: None,
metadata: HashMap::new(),
correlation_chain: Vec::new(),
}
}
#[allow(clippy::too_many_arguments)]
pub fn with_config(
name: String,
description: String,
source: String,
target: String,
triggering_event: Event,
action: Action,
condition: TriggerCondition,
config: TriggerConfig,
) -> Self {
let mut trigger = Self::new(
name,
description,
source,
target,
triggering_event,
action,
condition,
);
trigger.config = config;
trigger
}
pub fn matches_event(&self, event: &Event) -> bool {
if !self.matches_pattern(&self.condition.event_type_pattern, &event.event_type) {
return false;
}
if let Some(source_pattern) = &self.condition.source_pattern
&& !self.matches_pattern(source_pattern, &event.source)
{
return false;
}
if let Some(time_cond) = &self.condition.time_conditions
&& !self.matches_time_condition(time_cond)
{
return false;
}
true
}
fn matches_pattern(&self, pattern: &str, value: &str) -> bool {
if pattern == "*" {
return true;
}
if pattern.contains('*') {
let parts: Vec<&str> = pattern.split('*').collect();
if parts.len() == 2 {
let prefix = parts[0];
let suffix = parts[1];
return value.starts_with(prefix) && value.ends_with(suffix);
}
}
pattern == value
}
fn matches_time_condition(&self, time_cond: &TimeCondition) -> bool {
let now = Utc::now();
if let Some((start_hour, end_hour)) = time_cond.active_hours {
let current_hour = now.hour() as u8;
if current_hour < start_hour || current_hour > end_hour {
return false;
}
}
if let Some(active_days) = &time_cond.active_days {
let current_day = now.weekday().num_days_from_sunday() as u8;
if !active_days.contains(¤t_day) {
return false;
}
}
true
}
pub fn can_process(&self) -> bool {
match self.status {
TriggerStatus::Pending => !self.is_expired(),
_ => false,
}
}
pub fn is_expired(&self) -> bool {
let age_seconds = Utc::now().timestamp() - self.created_at.timestamp();
age_seconds > self.config.ttl_seconds as i64
}
pub fn is_retry_exhausted(&self) -> bool {
self.attempt_count > self.config.max_retries
}
pub fn start_processing(&mut self, worker_id: String) -> Result<(), String> {
if !self.can_process() {
return Err(format!(
"Trigger cannot be processed, current status: {:?}",
self.status
));
}
if self.is_expired() {
self.status = TriggerStatus::Expired;
return Err("Trigger has expired".to_string());
}
self.status = TriggerStatus::Processing;
self.worker_id = Some(worker_id);
self.attempt_count += 1;
self.updated_at = Utc::now();
self.action.start_execution();
Ok(())
}
pub fn complete_processing(&mut self, result_data: Option<serde_json::Value>) {
self.status = TriggerStatus::Completed;
self.processed_at = Some(Utc::now());
self.updated_at = Utc::now();
self.worker_id = None;
let action_result = crate::base::component::action::ActionResult {
success: true,
duration_ms: self.processing_duration_ms(),
data: result_data,
error: None,
metadata: self.metadata.clone(),
};
self.action.complete_execution(action_result);
}
pub fn fail_processing(&mut self, error: String) -> bool {
let duration_ms = self.processing_duration_ms();
let can_retry =
self.action.fail_execution(error.clone(), duration_ms) && !self.is_retry_exhausted();
if can_retry {
self.status = TriggerStatus::Pending; } else {
self.status = TriggerStatus::Failed;
self.processed_at = Some(Utc::now());
}
self.updated_at = Utc::now();
self.worker_id = None;
can_retry
}
pub fn cancel(&mut self) {
self.status = TriggerStatus::Cancelled;
self.updated_at = Utc::now();
self.worker_id = None;
self.action.cancel();
}
pub fn skip(&mut self, reason: String) {
self.status = TriggerStatus::Skipped;
self.updated_at = Utc::now();
self.add_metadata("skip_reason".to_string(), serde_json::Value::String(reason));
}
fn processing_duration_ms(&self) -> u64 {
if let Some(processed_at) = self.processed_at {
let duration = processed_at.timestamp_millis() - self.created_at.timestamp_millis();
duration.max(0) as u64
} else {
0
}
}
pub fn add_metadata(&mut self, key: String, value: serde_json::Value) {
self.metadata.insert(key, value);
self.updated_at = Utc::now();
}
pub fn get_metadata(&self, key: &str) -> Option<&serde_json::Value> {
self.metadata.get(key)
}
pub fn add_correlation(&mut self, correlation_id: Uuid) {
if !self.correlation_chain.contains(&correlation_id) {
self.correlation_chain.push(correlation_id);
self.updated_at = Utc::now();
}
}
pub fn summary(&self) -> TriggerSummary {
TriggerSummary {
id: self.id,
name: self.name.clone(),
source: self.source.clone(),
target: self.target.clone(),
status: self.status.clone(),
event_type: self.triggering_event.event_type.clone(),
event_source: self.triggering_event.source.clone(),
action_name: self.action.name.clone(),
action_status: self.action.status.clone(),
attempt_count: self.attempt_count,
age_seconds: Utc::now().timestamp() - self.created_at.timestamp(),
processing_duration_ms: if self.status == TriggerStatus::Processing {
Some(self.processing_duration_ms())
} else {
None
},
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListenerConfig {
pub enabled: bool,
pub max_triggers_per_window: usize,
pub time_window_seconds: u64,
pub default_trigger_config: TriggerConfig,
pub batch_size: usize,
pub processing_timeout_seconds: u64,
}
impl Default for ListenerConfig {
fn default() -> Self {
Self {
enabled: true,
max_triggers_per_window: 1000,
time_window_seconds: 60,
default_trigger_config: TriggerConfig::default(),
batch_size: 10,
processing_timeout_seconds: 30,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListenerStats {
pub name: String,
pub enabled: bool,
pub events_processed: u64,
pub triggers_created: u64,
pub triggers_completed: u64,
pub triggers_failed: u64,
pub average_processing_time_ms: Option<u64>,
pub last_event_processed: Option<DateTime<Utc>>,
pub last_trigger_created: Option<DateTime<Utc>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TriggerSummary {
pub id: Uuid,
pub name: String,
pub source: String,
pub target: String,
pub status: TriggerStatus,
pub event_type: String,
pub event_source: String,
pub action_name: String,
pub action_status: ActionStatus,
pub attempt_count: u32,
pub age_seconds: i64,
pub processing_duration_ms: Option<u64>,
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn test_trigger_creation() {
let event = Event::new(
"test_event".to_string(),
json!({"test": "data"}),
"test_source".to_string(),
);
let action = Action::new(
"Test Action".to_string(),
"Test action".to_string(),
"test_source".to_string(),
"test_service".to_string(),
);
let condition = TriggerCondition {
event_type_pattern: "test_*".to_string(),
source_pattern: None,
payload_conditions: vec![],
min_priority: None,
time_conditions: None,
};
let trigger = Trigger::new(
"Test Trigger".to_string(),
"Test trigger".to_string(),
"listener_service".to_string(),
"action_service".to_string(),
event,
action,
condition,
);
assert_eq!(trigger.name, "Test Trigger");
assert_eq!(trigger.status, TriggerStatus::Pending);
assert!(trigger.can_process());
}
#[test]
fn test_trigger_event_matching() {
let event = Event::new(
"user_created".to_string(),
json!({"user_id": "123"}),
"user_service".to_string(),
);
let action = Action::default();
let condition = TriggerCondition {
event_type_pattern: "user_*".to_string(),
source_pattern: Some("user_service".to_string()),
payload_conditions: vec![],
min_priority: None,
time_conditions: None,
};
let trigger = Trigger::new(
"User Event Trigger".to_string(),
"Triggers on user events".to_string(),
"listener".to_string(),
"processor".to_string(),
event.clone(),
action,
condition,
);
assert!(trigger.matches_event(&event));
let different_event = Event::new(
"order_created".to_string(),
json!({}),
"user_service".to_string(),
);
assert!(!trigger.matches_event(&different_event));
}
#[test]
fn test_trigger_processing_lifecycle() {
let event = Event::default();
let action = Action::default();
let condition = TriggerCondition {
event_type_pattern: "*".to_string(),
source_pattern: None,
payload_conditions: vec![],
min_priority: None,
time_conditions: None,
};
let mut trigger = Trigger::new(
"Test".to_string(),
"Test".to_string(),
"listener".to_string(),
"processor".to_string(),
event,
action,
condition,
);
assert!(trigger.start_processing("worker-1".to_string()).is_ok());
assert_eq!(trigger.status, TriggerStatus::Processing);
assert_eq!(trigger.attempt_count, 1);
trigger.complete_processing(Some(json!({"result": "success"})));
assert_eq!(trigger.status, TriggerStatus::Completed);
assert!(trigger.processed_at.is_some());
}
#[test]
fn test_trigger_pattern_matching() {
let event = Event::default();
let action = Action::default();
let condition = TriggerCondition {
event_type_pattern: "*".to_string(),
source_pattern: None,
payload_conditions: vec![],
min_priority: None,
time_conditions: None,
};
let trigger = Trigger::new(
"Test".to_string(),
"Test".to_string(),
"listener".to_string(),
"processor".to_string(),
event,
action,
condition,
);
assert!(trigger.matches_pattern("*", "anything"));
assert!(trigger.matches_pattern("user_*", "user_created"));
assert!(trigger.matches_pattern("*_event", "test_event"));
assert!(!trigger.matches_pattern("user_*", "order_created"));
assert!(trigger.matches_pattern("exact_match", "exact_match"));
assert!(!trigger.matches_pattern("exact_match", "different"));
}
#[test]
fn test_listener_config_default() {
let config = ListenerConfig::default();
assert!(config.enabled);
assert_eq!(config.max_triggers_per_window, 1000);
assert_eq!(config.time_window_seconds, 60);
assert_eq!(config.batch_size, 10);
assert_eq!(config.processing_timeout_seconds, 30);
}
#[test]
fn test_listener_config_serde_round_trip() {
let config = ListenerConfig::default();
let json = serde_json::to_string(&config).unwrap();
let restored: ListenerConfig = serde_json::from_str(&json).unwrap();
assert_eq!(restored.enabled, config.enabled);
assert_eq!(
restored.max_triggers_per_window,
config.max_triggers_per_window
);
assert_eq!(restored.batch_size, config.batch_size);
}
#[test]
fn test_listener_stats_serde_round_trip() {
let stats = ListenerStats {
name: "test-listener".to_string(),
enabled: true,
events_processed: 42,
triggers_created: 10,
triggers_completed: 8,
triggers_failed: 2,
average_processing_time_ms: Some(150),
last_event_processed: None,
last_trigger_created: None,
};
let json = serde_json::to_string(&stats).unwrap();
let restored: ListenerStats = serde_json::from_str(&json).unwrap();
assert_eq!(restored.name, stats.name);
assert_eq!(restored.events_processed, stats.events_processed);
assert_eq!(restored.triggers_completed, stats.triggers_completed);
}
}