paladin-ai-core 0.5.1

Pure domain types for the Paladin framework — zero infrastructure dependencies
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
/*
Trigger Container

A Trigger is a container that represents a request to execute an Action in response to an Event.
Triggers are created by Listener services when they detect Events that match their criteria.

The Trigger acts as a bridge between Events and Actions, containing:
- The Event that caused the trigger
- The Action to be executed
- Trigger-specific configuration and metadata
- Condition matching and filtering logic

This container is the base for more complex Trigger types that can be built in the
Application Layer for specific use cases.
*/

use crate::base::component::action::{Action, ActionStatus};
use crate::base::component::event::Event;
use crate::base::entity::message::MessagePriority;
use chrono::{DateTime, Datelike, Timelike, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use uuid::Uuid;

/// Trigger execution status.
///
/// `Pending` is the `Default` variant and the state `Trigger::new` assigns to
/// a freshly created trigger.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum TriggerStatus {
    /// Trigger has been created and is ready to be processed.
    /// A trigger also returns to this state when a failed attempt may be retried.
    #[default]
    Pending,
    /// Trigger is being processed
    Processing,
    /// Trigger has been successfully processed (action executed)
    Completed,
    /// Trigger processing failed and will not be retried
    Failed,
    /// Trigger was cancelled
    Cancelled,
    /// Trigger was skipped due to conditions not being met
    Skipped,
    /// Trigger expired before processing
    Expired,
}

/// Trigger condition for determining when to fire.
///
/// `Trigger::matches_event` evaluates `event_type_pattern`, `source_pattern`
/// and `time_conditions`. `payload_conditions` and `min_priority` are carried
/// as data but are not evaluated by that method.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TriggerCondition {
    /// Event type pattern to match; may contain `*` as a wildcard
    pub event_type_pattern: String,
    /// Source pattern to match (same pattern syntax as `event_type_pattern`);
    /// `None` accepts any source
    pub source_pattern: Option<String>,
    /// Payload conditions (JSONPath expressions); not yet evaluated by
    /// `Trigger::matches_event`
    pub payload_conditions: Vec<String>,
    /// Minimum priority level to trigger; not evaluated by
    /// `Trigger::matches_event`
    pub min_priority: Option<MessagePriority>,
    /// Time-based conditions; `None` means no time restriction
    pub time_conditions: Option<TimeCondition>,
}

/// Time-based trigger conditions.
///
/// Hours and days are evaluated against the current time in UTC.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeCondition {
    /// Only trigger during these hours, as `(start_hour, end_hour)` in
    /// 24-hour format; both bounds are inclusive
    pub active_hours: Option<(u8, u8)>,
    /// Days of week to be active (0 = Sunday, counting forward through the week)
    pub active_days: Option<Vec<u8>>,
    /// Cooldown period in seconds between triggers.
    /// NOTE(review): the matcher in this file does not enforce this yet.
    pub cooldown_seconds: Option<u64>,
}

/// Trigger configuration.
///
/// `Default` yields 3 retries, a 300 s timeout, a 3600 s TTL, normal
/// processing priority, and preservation after completion.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TriggerConfig {
    /// Maximum number of retries if action fails; retries count as exhausted
    /// once the trigger's `attempt_count` exceeds this value
    pub max_retries: u32,
    /// Timeout for action execution in seconds
    pub timeout_seconds: u64,
    /// Whether to preserve the trigger after completion
    pub preserve_after_completion: bool,
    /// Time-to-live for the trigger in seconds, measured from its creation time
    pub ttl_seconds: u64,
    /// Priority for trigger processing
    pub processing_priority: MessagePriority,
}

impl Default for TriggerConfig {
    /// Baseline processing settings: three retries, a five-minute action
    /// timeout, a one-hour TTL, normal priority, and triggers kept after
    /// completion.
    fn default() -> Self {
        const ACTION_TIMEOUT_SECS: u64 = 5 * 60;
        const TIME_TO_LIVE_SECS: u64 = 60 * 60;

        Self {
            processing_priority: MessagePriority::Normal,
            ttl_seconds: TIME_TO_LIVE_SECS,
            timeout_seconds: ACTION_TIMEOUT_SECS,
            preserve_after_completion: true,
            max_retries: 3,
        }
    }
}

/// Main Trigger container.
///
/// Couples the event that fired with the action to run, plus the bookkeeping
/// (status, timestamps, attempts, worker) needed to drive it through
/// processing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Trigger {
    /// Unique identifier for the trigger (random v4 UUID assigned by `new`)
    pub id: Uuid,
    /// Human-readable name
    pub name: String,
    /// Description of what this trigger does
    pub description: String,
    /// Source listener service that created this trigger
    pub source: String,
    /// Target service that will process the trigger
    pub target: String,
    /// The event that caused this trigger
    pub triggering_event: Event,
    /// The action to be executed
    pub action: Action,
    /// Trigger condition that was matched
    pub condition: TriggerCondition,
    /// Configuration for trigger processing (defaults unless built via `with_config`)
    pub config: TriggerConfig,
    /// Current processing status
    pub status: TriggerStatus,
    /// When the trigger was created (UTC); the TTL is measured from here
    pub created_at: DateTime<Utc>,
    /// When the trigger was last updated (status changes, metadata and
    /// correlation edits refresh this)
    pub updated_at: DateTime<Utc>,
    /// When the trigger was processed; set on completion or on a failure
    /// that will not be retried
    pub processed_at: Option<DateTime<Utc>>,
    /// Number of processing attempts (incremented each time processing starts)
    pub attempt_count: u32,
    /// Processing worker identifier; set while a worker is processing the
    /// trigger and cleared on completion, failure, or cancellation
    pub worker_id: Option<String>,
    /// Additional trigger-specific metadata
    pub metadata: HashMap<String, serde_json::Value>,
    /// Correlation chain for tracking related triggers
    pub correlation_chain: Vec<Uuid>,
}

impl Trigger {
    /// Build a trigger in the `Pending` state with default configuration.
    ///
    /// The trigger receives a random v4 UUID, identical `created_at` and
    /// `updated_at` timestamps (current UTC time), zero attempts, no worker,
    /// and empty metadata and correlation chain.
    pub fn new(
        name: String,
        description: String,
        source: String,
        target: String,
        triggering_event: Event,
        action: Action,
        condition: TriggerCondition,
    ) -> Self {
        let created = Utc::now();
        let id = Uuid::new_v4();

        Self {
            // Identity and routing
            id,
            name,
            description,
            source,
            target,
            // Payload
            triggering_event,
            action,
            condition,
            config: TriggerConfig::default(),
            // Lifecycle bookkeeping
            status: TriggerStatus::Pending,
            created_at: created,
            updated_at: created,
            processed_at: None,
            attempt_count: 0,
            worker_id: None,
            // Auxiliary data
            metadata: HashMap::new(),
            correlation_chain: Vec::new(),
        }
    }

    /// Create a trigger with custom configuration
    #[allow(clippy::too_many_arguments)]
    pub fn with_config(
        name: String,
        description: String,
        source: String,
        target: String,
        triggering_event: Event,
        action: Action,
        condition: TriggerCondition,
        config: TriggerConfig,
    ) -> Self {
        let mut trigger = Self::new(
            name,
            description,
            source,
            target,
            triggering_event,
            action,
            condition,
        );
        trigger.config = config;
        trigger
    }

    /// Check if the trigger matches the given event
    pub fn matches_event(&self, event: &Event) -> bool {
        // Check event type pattern
        if !self.matches_pattern(&self.condition.event_type_pattern, &event.event_type) {
            return false;
        }

        // Check source pattern
        if let Some(source_pattern) = &self.condition.source_pattern
            && !self.matches_pattern(source_pattern, &event.source)
        {
            return false;
        }

        // Check time conditions
        if let Some(time_cond) = &self.condition.time_conditions
            && !self.matches_time_condition(time_cond)
        {
            return false;
        }

        // TODO: Implement payload condition matching (JSONPath)
        // This would require a JSONPath library for complex payload filtering

        true
    }

    /// Check if a string matches a pattern where `*` matches any run of
    /// characters (including none).
    ///
    /// * A pattern without `*` must equal `value` exactly.
    /// * The literal before the first `*` must be a prefix of `value` and the
    ///   literal after the last `*` must be a suffix, and the two may not
    ///   overlap (`"ab*ba"` matches `"abba"` but not `"aba"`).
    /// * Literals between wildcards must occur in order in what remains.
    ///
    /// Any number of wildcards is supported. There is no escape syntax, so a
    /// literal `*` in `value` can only be matched by a wildcard.
    fn matches_pattern(&self, pattern: &str, value: &str) -> bool {
        // No wildcard at all: plain equality.
        let Some((prefix, rest)) = pattern.split_once('*') else {
            return pattern == value;
        };

        // Anchor the leading literal, consuming it so it cannot be reused by
        // the trailing literal.
        let Some(after_prefix) = value.strip_prefix(prefix) else {
            return false;
        };

        // Split whatever follows the first `*` into optional middle literals
        // and the trailing literal after the last `*`.
        let (middle, suffix) = match rest.rsplit_once('*') {
            Some((middle, suffix)) => (Some(middle), suffix),
            None => (None, rest),
        };

        let Some(mut remaining) = after_prefix.strip_suffix(suffix) else {
            return false;
        };

        // With both ends anchored, taking the leftmost occurrence of each
        // middle literal is sufficient for `*`-only patterns.
        if let Some(middle) = middle {
            for literal in middle.split('*') {
                match remaining.find(literal) {
                    Some(at) => remaining = &remaining[at + literal.len()..],
                    None => return false,
                }
            }
        }

        true
    }

    /// Check if the current UTC time satisfies `time_cond`.
    ///
    /// * `active_hours = (start, end)`: both bounds are inclusive. When
    ///   `start <= end` the current hour must lie in `start..=end`. When
    ///   `start > end` the window wraps past midnight (e.g. `(22, 6)` covers
    ///   22:00 through 06:59).
    /// * `active_days`: the current weekday (0 = Sunday) must be listed.
    /// * `cooldown_seconds` is not enforced here.
    ///
    /// Returns `true` when every configured condition holds.
    fn matches_time_condition(&self, time_cond: &TimeCondition) -> bool {
        let now = Utc::now();

        // Check active hours
        if let Some((start_hour, end_hour)) = time_cond.active_hours {
            // `hour()` is always within 0..=23, so narrowing to u8 is lossless.
            let current_hour = now.hour() as u8;
            let within_window = if start_hour <= end_hour {
                (start_hour..=end_hour).contains(&current_hour)
            } else {
                // Overnight window: active from `start_hour` to midnight and
                // from midnight through `end_hour`.
                current_hour >= start_hour || current_hour <= end_hour
            };
            if !within_window {
                return false;
            }
        }

        // Check active days
        if let Some(active_days) = &time_cond.active_days {
            // `num_days_from_sunday()` is always within 0..=6.
            let current_day = now.weekday().num_days_from_sunday() as u8;
            if !active_days.contains(&current_day) {
                return false;
            }
        }

        // TODO: Check cooldown period
        // This would require tracking the last execution time

        true
    }

    /// Report whether the trigger is eligible for processing: it must be
    /// `Pending` and not yet past its TTL.
    pub fn can_process(&self) -> bool {
        // Expiry is only consulted for pending triggers.
        self.status == TriggerStatus::Pending && !self.is_expired()
    }

    /// Check if the trigger has expired.
    ///
    /// The trigger is expired once the whole seconds elapsed since
    /// `created_at` strictly exceed `config.ttl_seconds`.
    pub fn is_expired(&self) -> bool {
        let age_seconds = Utc::now().timestamp() - self.created_at.timestamp();
        // A TTL above i64::MAX can never be exceeded. Saturate rather than
        // cast, because `as i64` would wrap such a TTL to a negative number
        // and make the trigger look permanently expired.
        let ttl_seconds = i64::try_from(self.config.ttl_seconds).unwrap_or(i64::MAX);
        age_seconds > ttl_seconds
    }

    /// Report whether the trigger has used up its retries, i.e. the number
    /// of attempts made so far is greater than `config.max_retries`.
    pub fn is_retry_exhausted(&self) -> bool {
        let retry_limit = self.config.max_retries;
        retry_limit < self.attempt_count
    }

    /// Start processing the trigger on behalf of `worker_id`.
    ///
    /// On success the trigger moves to `Processing`, records the worker,
    /// increments `attempt_count`, refreshes `updated_at`, and starts the
    /// underlying action.
    ///
    /// # Errors
    ///
    /// * Returns an error naming the current status when the trigger is not
    ///   `Pending`; the trigger is left unchanged.
    /// * Returns `"Trigger has expired"` when a pending trigger is past its
    ///   TTL; the trigger is moved to `Expired` so it is not picked up again.
    pub fn start_processing(&mut self, worker_id: String) -> Result<(), String> {
        // Check the status on its own first. Folding expiry into this guard
        // would hide the expiry branch below and leave stale triggers
        // `Pending` forever.
        if self.status != TriggerStatus::Pending {
            return Err(format!(
                "Trigger cannot be processed, current status: {:?}",
                self.status
            ));
        }

        if self.is_expired() {
            self.status = TriggerStatus::Expired;
            self.updated_at = Utc::now();
            return Err("Trigger has expired".to_string());
        }

        self.status = TriggerStatus::Processing;
        self.worker_id = Some(worker_id);
        self.attempt_count += 1;
        self.updated_at = Utc::now();

        // Start the underlying action
        self.action.start_execution();

        Ok(())
    }

    /// Mark the trigger as `Completed` and finish the underlying action.
    ///
    /// Stamps `processed_at` and `updated_at`, releases the worker, and hands
    /// the action a successful result carrying `result_data`, the processing
    /// duration, and a copy of the trigger's metadata.
    pub fn complete_processing(&mut self, result_data: Option<serde_json::Value>) {
        self.worker_id = None;
        self.status = TriggerStatus::Completed;
        self.processed_at = Some(Utc::now());
        self.updated_at = Utc::now();

        // `processed_at` must already be set: the duration is derived from it.
        let duration_ms = self.processing_duration_ms();
        let metadata = self.metadata.clone();

        // Complete the underlying action
        self.action
            .complete_execution(crate::base::component::action::ActionResult {
                success: true,
                duration_ms,
                data: result_data,
                error: None,
                metadata,
            });
    }

    /// Fail processing with an error.
    ///
    /// Reports the failure (with the processing duration) to the underlying
    /// action. A retry is allowed only when the action's `fail_execution`
    /// returns `true` and the trigger's own retries are not exhausted.
    ///
    /// * Retry allowed: the status goes back to `Pending`.
    /// * Otherwise: the status becomes `Failed` and `processed_at` is stamped.
    ///
    /// In both cases `updated_at` is refreshed and the worker is released.
    ///
    /// Returns `true` when the trigger may be retried.
    pub fn fail_processing(&mut self, error: String) -> bool {
        let duration_ms = self.processing_duration_ms();
        // `error` is not needed afterwards, so move it instead of cloning.
        let can_retry =
            self.action.fail_execution(error, duration_ms) && !self.is_retry_exhausted();

        if can_retry {
            self.status = TriggerStatus::Pending; // Reset to pending for retry
        } else {
            self.status = TriggerStatus::Failed;
            self.processed_at = Some(Utc::now());
        }

        self.updated_at = Utc::now();
        self.worker_id = None;

        can_retry
    }

    /// Mark the trigger as `Cancelled`, release any worker, refresh
    /// `updated_at`, and cancel the underlying action.
    pub fn cancel(&mut self) {
        self.worker_id = None;
        self.status = TriggerStatus::Cancelled;
        self.updated_at = Utc::now();

        // Propagate the cancellation to the action as the final step.
        self.action.cancel();
    }

    /// Mark the trigger as `Skipped` (conditions no longer met after the
    /// initial match) and record `reason` under the `skip_reason` metadata
    /// key. `updated_at` is refreshed.
    pub fn skip(&mut self, reason: String) {
        self.status = TriggerStatus::Skipped;

        let reason_value = serde_json::Value::String(reason);
        self.metadata
            .insert("skip_reason".to_string(), reason_value);
        self.updated_at = Utc::now();
    }

    /// Get processing duration in milliseconds
    fn processing_duration_ms(&self) -> u64 {
        if let Some(processed_at) = self.processed_at {
            let duration = processed_at.timestamp_millis() - self.created_at.timestamp_millis();
            duration.max(0) as u64
        } else {
            0
        }
    }

    /// Store `value` under `key` in the trigger's metadata, replacing any
    /// existing entry, and refresh `updated_at`.
    pub fn add_metadata(&mut self, key: String, value: serde_json::Value) {
        // Any previous value under the same key is intentionally discarded.
        let _replaced = self.metadata.insert(key, value);
        self.updated_at = Utc::now();
    }

    /// Look up the metadata value stored under `key`, if any.
    pub fn get_metadata(&self, key: &str) -> Option<&serde_json::Value> {
        let entries = &self.metadata;
        entries.get(key)
    }

    /// Append `correlation_id` to the correlation chain unless it is already
    /// present. `updated_at` is refreshed only when the chain changes.
    pub fn add_correlation(&mut self, correlation_id: Uuid) {
        let already_linked = self.correlation_chain.contains(&correlation_id);
        if already_linked {
            return;
        }

        self.correlation_chain.push(correlation_id);
        self.updated_at = Utc::now();
    }

    /// Build a monitoring snapshot of the trigger.
    ///
    /// Identity, routing, status, and event/action descriptors are copied from
    /// the trigger. `age_seconds` is the whole seconds since `created_at`.
    /// `processing_duration_ms` is populated only while the status is
    /// `Processing`.
    pub fn summary(&self) -> TriggerSummary {
        let event = &self.triggering_event;
        let action = &self.action;

        let age_seconds = Utc::now().timestamp() - self.created_at.timestamp();
        let processing_duration_ms = matches!(self.status, TriggerStatus::Processing)
            .then(|| self.processing_duration_ms());

        TriggerSummary {
            id: self.id,
            name: self.name.clone(),
            source: self.source.clone(),
            target: self.target.clone(),
            status: self.status.clone(),
            event_type: event.event_type.clone(),
            event_source: event.source.clone(),
            action_name: action.name.clone(),
            action_status: action.status.clone(),
            attempt_count: self.attempt_count,
            age_seconds,
            processing_duration_ms,
        }
    }
}

/// Configuration for an event listener.
///
/// Plain config struct with `Default` impl and no port references.
/// `Default` enables the listener with 1000 triggers per 60-second window,
/// a batch size of 10, and a 30-second processing timeout.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListenerConfig {
    /// Whether the listener is active
    pub enabled: bool,
    /// Maximum number of triggers to create per time window
    /// (the window length is `time_window_seconds`)
    pub max_triggers_per_window: usize,
    /// Time window duration in seconds
    pub time_window_seconds: u64,
    /// Default trigger configuration
    pub default_trigger_config: TriggerConfig,
    /// Batch processing settings
    /// (presumably the number of items handled per batch — confirm against the listener implementation)
    pub batch_size: usize,
    /// Processing timeout in seconds
    pub processing_timeout_seconds: u64,
}

impl Default for ListenerConfig {
    /// Baseline listener settings: enabled, at most 1000 triggers per
    /// one-minute window, batches of 10, a 30-second processing timeout, and
    /// the default trigger configuration.
    fn default() -> Self {
        const WINDOW_SECS: u64 = 60;
        const WINDOW_TRIGGER_CAP: usize = 1000;

        Self {
            enabled: true,
            time_window_seconds: WINDOW_SECS,
            max_triggers_per_window: WINDOW_TRIGGER_CAP,
            batch_size: 10,
            processing_timeout_seconds: 30,
            default_trigger_config: TriggerConfig::default(),
        }
    }
}

/// Runtime statistics for an event listener.
///
/// Plain data type with `Serialize`/`Deserialize` — suitable for core container layer.
/// Nothing in this file populates it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListenerStats {
    /// Listener name
    pub name: String,
    /// Whether the listener is currently enabled
    pub enabled: bool,
    /// Total events processed
    pub events_processed: u64,
    /// Total triggers created
    pub triggers_created: u64,
    /// Triggers successfully completed
    pub triggers_completed: u64,
    /// Triggers that failed
    pub triggers_failed: u64,
    /// Average event-to-trigger processing time in milliseconds;
    /// `None` when no average is available
    pub average_processing_time_ms: Option<u64>,
    /// Timestamp (UTC) of the last processed event, if any
    pub last_event_processed: Option<DateTime<Utc>>,
    /// Timestamp (UTC) of the last created trigger, if any
    pub last_trigger_created: Option<DateTime<Utc>>,
}

/// Summary information for trigger monitoring.
///
/// A flat snapshot produced by `Trigger::summary`; values are copied from the
/// trigger at the moment the summary is built.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TriggerSummary {
    /// Identifier of the summarized trigger
    pub id: Uuid,
    /// Trigger name
    pub name: String,
    /// Listener service that created the trigger
    pub source: String,
    /// Service that will process the trigger
    pub target: String,
    /// Trigger status at snapshot time
    pub status: TriggerStatus,
    /// Type of the triggering event
    pub event_type: String,
    /// Source of the triggering event
    pub event_source: String,
    /// Name of the action to execute
    pub action_name: String,
    /// Status of the action at snapshot time
    pub action_status: ActionStatus,
    /// Number of processing attempts made so far
    pub attempt_count: u32,
    /// Whole seconds between the trigger's creation and the snapshot
    pub age_seconds: i64,
    /// Processing duration in milliseconds; `None` unless the trigger was
    /// `Processing` when the summary was taken
    pub processing_duration_ms: Option<u64>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Build a condition that filters only on event type and, optionally,
    /// on source; every other criterion is left unset.
    fn condition_for(event_type_pattern: &str, source_pattern: Option<&str>) -> TriggerCondition {
        TriggerCondition {
            event_type_pattern: event_type_pattern.to_string(),
            source_pattern: source_pattern.map(str::to_string),
            payload_conditions: Vec::new(),
            min_priority: None,
            time_conditions: None,
        }
    }

    /// Build a trigger over default event/action values that accepts any event type.
    fn catch_all_trigger() -> Trigger {
        Trigger::new(
            "Test".to_string(),
            "Test".to_string(),
            "listener".to_string(),
            "processor".to_string(),
            Event::default(),
            Action::default(),
            condition_for("*", None),
        )
    }

    #[test]
    fn test_trigger_creation() {
        let trigger = Trigger::new(
            "Test Trigger".to_string(),
            "Test trigger".to_string(),
            "listener_service".to_string(),
            "action_service".to_string(),
            Event::new(
                "test_event".to_string(),
                json!({"test": "data"}),
                "test_source".to_string(),
            ),
            Action::new(
                "Test Action".to_string(),
                "Test action".to_string(),
                "test_source".to_string(),
                "test_service".to_string(),
            ),
            condition_for("test_*", None),
        );

        assert_eq!(trigger.name, "Test Trigger");
        assert_eq!(trigger.status, TriggerStatus::Pending);
        assert!(trigger.can_process());
    }

    #[test]
    fn test_trigger_event_matching() {
        let user_event = Event::new(
            "user_created".to_string(),
            json!({"user_id": "123"}),
            "user_service".to_string(),
        );

        let trigger = Trigger::new(
            "User Event Trigger".to_string(),
            "Triggers on user events".to_string(),
            "listener".to_string(),
            "processor".to_string(),
            user_event.clone(),
            Action::default(),
            condition_for("user_*", Some("user_service")),
        );

        // The event the trigger was built from must match.
        assert!(trigger.matches_event(&user_event));

        // Same source but a different event type must not match.
        let order_event = Event::new(
            "order_created".to_string(),
            json!({}),
            "user_service".to_string(),
        );
        assert!(!trigger.matches_event(&order_event));
    }

    #[test]
    fn test_trigger_processing_lifecycle() {
        let mut trigger = catch_all_trigger();

        // Pending -> Processing
        let started = trigger.start_processing("worker-1".to_string());
        assert!(started.is_ok());
        assert_eq!(trigger.status, TriggerStatus::Processing);
        assert_eq!(trigger.attempt_count, 1);

        // Processing -> Completed
        trigger.complete_processing(Some(json!({"result": "success"})));
        assert_eq!(trigger.status, TriggerStatus::Completed);
        assert!(trigger.processed_at.is_some());
    }

    #[test]
    fn test_trigger_pattern_matching() {
        let trigger = catch_all_trigger();

        // (pattern, value, expected result)
        let cases = [
            // Wildcard matching
            ("*", "anything", true),
            ("user_*", "user_created", true),
            ("*_event", "test_event", true),
            ("user_*", "order_created", false),
            // Exact matching
            ("exact_match", "exact_match", true),
            ("exact_match", "different", false),
        ];

        for (pattern, value, expected) in cases {
            assert_eq!(
                trigger.matches_pattern(pattern, value),
                expected,
                "pattern {pattern:?} against value {value:?}"
            );
        }
    }

    #[test]
    fn test_listener_config_default() {
        let ListenerConfig {
            enabled,
            max_triggers_per_window,
            time_window_seconds,
            batch_size,
            processing_timeout_seconds,
            ..
        } = ListenerConfig::default();

        assert!(enabled);
        assert_eq!(max_triggers_per_window, 1000);
        assert_eq!(time_window_seconds, 60);
        assert_eq!(batch_size, 10);
        assert_eq!(processing_timeout_seconds, 30);
    }

    #[test]
    fn test_listener_config_serde_round_trip() {
        let original = ListenerConfig::default();

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ListenerConfig = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.enabled, original.enabled);
        assert_eq!(
            decoded.max_triggers_per_window,
            original.max_triggers_per_window
        );
        assert_eq!(decoded.batch_size, original.batch_size);
    }

    #[test]
    fn test_listener_stats_serde_round_trip() {
        let original = ListenerStats {
            name: "test-listener".to_string(),
            enabled: true,
            events_processed: 42,
            triggers_created: 10,
            triggers_completed: 8,
            triggers_failed: 2,
            average_processing_time_ms: Some(150),
            last_event_processed: None,
            last_trigger_created: None,
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ListenerStats = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.name, original.name);
        assert_eq!(decoded.events_processed, original.events_processed);
        assert_eq!(decoded.triggers_completed, original.triggers_completed);
    }
}