Skip to main content

cortex_runtime/temporal/
watch.rs

1//! Watch system — alert rules that monitor temporal data.
2
3use crate::temporal::query::TemporalQuery;
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7
8/// A watch rule that monitors a temporal query for conditions.
9#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct WatchRule {
11    /// Unique rule ID.
12    pub id: String,
13    /// Domain to watch.
14    pub domain: String,
15    /// Model type to watch (e.g., "Product").
16    pub model_type: Option<String>,
17    /// Feature dimension to monitor.
18    pub feature_dim: u8,
19    /// Condition that triggers the alert.
20    pub condition: WatchCondition,
21    /// Where to send notifications.
22    pub notify: NotifyTarget,
23    /// Whether this rule is active.
24    pub active: bool,
25    /// When this rule was created.
26    pub created_at: DateTime<Utc>,
27    /// When this rule last triggered.
28    pub last_triggered: Option<DateTime<Utc>>,
29}
30
31/// Condition that triggers a watch alert.
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub enum WatchCondition {
34    /// Feature value goes above threshold.
35    ValueAbove(f32),
36    /// Feature value goes below threshold.
37    ValueBelow(f32),
38    /// Feature changes by more than a percentage threshold.
39    ChangeByPercent(f32),
40    /// A previously unavailable item becomes available.
41    Available,
42    /// A new node of the watched type appears.
43    NewInstance,
44}
45
46/// Notification target.
47#[derive(Debug, Clone, Serialize, Deserialize)]
48pub enum NotifyTarget {
49    /// Send to a webhook URL.
50    Webhook(String),
51    /// Emit on the event bus.
52    EventBus,
53    /// Send to connected protocol agents.
54    Protocol,
55}
56
57/// Watch alert notification.
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct WatchAlert {
60    /// Rule that triggered.
61    pub rule_id: String,
62    /// Domain.
63    pub domain: String,
64    /// What was detected.
65    pub message: String,
66    /// Current value.
67    pub current_value: f32,
68    /// Previous value (if applicable).
69    pub previous_value: Option<f32>,
70    /// When the alert was generated.
71    pub timestamp: DateTime<Utc>,
72}
73
74/// Manages active watch rules.
75pub struct WatchManager {
76    /// Active rules.
77    rules: HashMap<String, WatchRule>,
78    /// Alerts generated.
79    alerts: Vec<WatchAlert>,
80}
81
82impl WatchManager {
83    /// Create a new watch manager.
84    pub fn new() -> Self {
85        Self {
86            rules: HashMap::new(),
87            alerts: Vec::new(),
88        }
89    }
90
91    /// Add a new watch rule.
92    pub fn add_rule(&mut self, rule: WatchRule) -> String {
93        let id = rule.id.clone();
94        self.rules.insert(id.clone(), rule);
95        id
96    }
97
98    /// Remove a watch rule.
99    pub fn remove_rule(&mut self, id: &str) -> bool {
100        self.rules.remove(id).is_some()
101    }
102
103    /// List all active rules.
104    pub fn list_rules(&self) -> Vec<&WatchRule> {
105        self.rules.values().collect()
106    }
107
108    /// Evaluate all rules against new data.
109    ///
110    /// Returns any alerts that were triggered.
111    pub fn evaluate(
112        &mut self,
113        domain: &str,
114        feature_dim: u8,
115        current_value: f32,
116        previous_value: f32,
117    ) -> Vec<WatchAlert> {
118        let mut triggered = Vec::new();
119
120        for rule in self.rules.values_mut() {
121            if rule.domain != domain || rule.feature_dim != feature_dim || !rule.active {
122                continue;
123            }
124
125            let alert = match &rule.condition {
126                WatchCondition::ValueAbove(threshold) => {
127                    if current_value > *threshold && previous_value <= *threshold {
128                        Some(WatchAlert {
129                            rule_id: rule.id.clone(),
130                            domain: domain.to_string(),
131                            message: format!("Value rose above {threshold}: {current_value}"),
132                            current_value,
133                            previous_value: Some(previous_value),
134                            timestamp: Utc::now(),
135                        })
136                    } else {
137                        None
138                    }
139                }
140                WatchCondition::ValueBelow(threshold) => {
141                    if current_value < *threshold && previous_value >= *threshold {
142                        Some(WatchAlert {
143                            rule_id: rule.id.clone(),
144                            domain: domain.to_string(),
145                            message: format!("Value dropped below {threshold}: {current_value}"),
146                            current_value,
147                            previous_value: Some(previous_value),
148                            timestamp: Utc::now(),
149                        })
150                    } else {
151                        None
152                    }
153                }
154                WatchCondition::ChangeByPercent(pct) => {
155                    if previous_value != 0.0 {
156                        let change = ((current_value - previous_value) / previous_value).abs();
157                        if change > *pct {
158                            Some(WatchAlert {
159                                rule_id: rule.id.clone(),
160                                domain: domain.to_string(),
161                                message: format!(
162                                    "Value changed by {:.1}% (threshold: {:.1}%)",
163                                    change * 100.0,
164                                    pct * 100.0
165                                ),
166                                current_value,
167                                previous_value: Some(previous_value),
168                                timestamp: Utc::now(),
169                            })
170                        } else {
171                            None
172                        }
173                    } else {
174                        None
175                    }
176                }
177                WatchCondition::Available => {
178                    if previous_value <= 0.0 && current_value > 0.0 {
179                        Some(WatchAlert {
180                            rule_id: rule.id.clone(),
181                            domain: domain.to_string(),
182                            message: "Item became available".to_string(),
183                            current_value,
184                            previous_value: Some(previous_value),
185                            timestamp: Utc::now(),
186                        })
187                    } else {
188                        None
189                    }
190                }
191                WatchCondition::NewInstance => None, // Handled separately
192            };
193
194            if let Some(alert) = alert {
195                rule.last_triggered = Some(Utc::now());
196                triggered.push(alert);
197            }
198        }
199
200        self.alerts.extend(triggered.clone());
201        triggered
202    }
203
204    /// Get recent alerts.
205    pub fn recent_alerts(&self, limit: usize) -> &[WatchAlert] {
206        let start = if self.alerts.len() > limit {
207            self.alerts.len() - limit
208        } else {
209            0
210        };
211        &self.alerts[start..]
212    }
213}
214
215impl Default for WatchManager {
216    fn default() -> Self {
217        Self::new()
218    }
219}
220
#[cfg(test)]
mod tests {
    use super::*;

    /// Feature dimension the fixtures use for price.
    const PRICE_DIM: u8 = 48;
    /// Feature dimension the fixtures use for availability.
    const AVAILABILITY_DIM: u8 = 51;

    /// Build an active "Product" rule on the price dimension that notifies
    /// the event bus and has never triggered.
    fn rule_for(id: &str, domain: &str, condition: WatchCondition) -> WatchRule {
        WatchRule {
            id: id.to_string(),
            domain: domain.to_string(),
            model_type: Some("Product".to_string()),
            feature_dim: PRICE_DIM,
            condition,
            notify: NotifyTarget::EventBus,
            active: true,
            created_at: Utc::now(),
            last_triggered: None,
        }
    }

    /// The standard fixture rule: ID "test-1" on domain "shop.com".
    fn make_rule(condition: WatchCondition) -> WatchRule {
        rule_for("test-1", "shop.com", condition)
    }

    /// Build a manager that already contains the given rule.
    fn manager_with(rule: WatchRule) -> WatchManager {
        let mut manager = WatchManager::new();
        manager.add_rule(rule);
        manager
    }

    #[test]
    fn test_watch_value_below() {
        let mut manager = manager_with(make_rule(WatchCondition::ValueBelow(80.0)));

        // Price falls from 100 to 75, crossing the 80 threshold.
        let fired = manager.evaluate("shop.com", PRICE_DIM, 75.0, 100.0);
        assert_eq!(fired.len(), 1);
        assert!(fired[0].message.contains("below"));
    }

    #[test]
    fn test_watch_value_above() {
        let mut manager = manager_with(make_rule(WatchCondition::ValueAbove(150.0)));

        // Price climbs from 100 to 200, crossing the 150 threshold.
        let fired = manager.evaluate("shop.com", PRICE_DIM, 200.0, 100.0);
        assert_eq!(fired.len(), 1);
    }

    #[test]
    fn test_watch_no_trigger() {
        let mut manager = manager_with(make_rule(WatchCondition::ValueBelow(80.0)));

        // Both observations remain above the threshold.
        let fired = manager.evaluate("shop.com", PRICE_DIM, 100.0, 95.0);
        assert!(fired.is_empty());
    }

    #[test]
    fn test_watch_change_by_percent() {
        // Threshold of 0.1 is a fraction, i.e. 10%.
        let mut manager = manager_with(make_rule(WatchCondition::ChangeByPercent(0.1)));

        // 100 -> 80 is a 20% change.
        let fired = manager.evaluate("shop.com", PRICE_DIM, 80.0, 100.0);
        assert_eq!(fired.len(), 1);
    }

    #[test]
    fn test_watch_available() {
        let availability_rule = WatchRule {
            feature_dim: AVAILABILITY_DIM,
            ..make_rule(WatchCondition::Available)
        };
        let mut manager = manager_with(availability_rule);

        let fired = manager.evaluate("shop.com", AVAILABILITY_DIM, 1.0, 0.0);
        assert_eq!(fired.len(), 1);
        assert!(fired[0].message.contains("available"));
    }

    #[test]
    fn test_watch_manage_rules() {
        let mut manager = manager_with(make_rule(WatchCondition::ValueBelow(80.0)));
        assert_eq!(manager.list_rules().len(), 1);

        manager.remove_rule("test-1");
        assert!(manager.list_rules().is_empty());
    }

    // ── v4 Test Suite: Phase 3D — Watch/Alert System ──

    #[test]
    fn test_v4_watch_create_and_list() {
        let mut manager = WatchManager::new();
        manager.add_rule(rule_for(
            "watch-1",
            "amazon.com",
            WatchCondition::ValueBelow(50.0),
        ));
        manager.add_rule(rule_for(
            "watch-2",
            "amazon.com",
            WatchCondition::ValueAbove(1000.0),
        ));

        let listed = manager.list_rules();
        assert_eq!(listed.len(), 2);
        for expected_id in ["watch-1", "watch-2"] {
            assert!(listed.iter().any(|r| r.id == expected_id));
        }
    }

    #[test]
    fn test_v4_watch_remove_and_verify() {
        let mut manager = manager_with(rule_for(
            "to-remove",
            "test.com",
            WatchCondition::ValueBelow(1.0),
        ));

        assert_eq!(manager.list_rules().len(), 1);
        manager.remove_rule("to-remove");
        assert!(manager.list_rules().is_empty());

        // Removing an unknown ID must not panic.
        manager.remove_rule("non-existent");
    }

    #[test]
    fn test_v4_watch_unrealistic_threshold_no_trigger() {
        // Price below $0.01 — should NOT trigger for normal data
        let mut manager = manager_with(rule_for(
            "unrealistic",
            "amazon.com",
            WatchCondition::ValueBelow(0.01),
        ));

        let fired = manager.evaluate("amazon.com", PRICE_DIM, 100.0, 95.0);
        assert!(
            fired.is_empty(),
            "unrealistic threshold should not trigger"
        );
    }

    #[test]
    fn test_v4_watch_realistic_trigger() {
        let mut manager = manager_with(rule_for(
            "price-drop",
            "amazon.com",
            WatchCondition::ValueBelow(80.0),
        ));

        // Price drops to 75 — should trigger
        let fired = manager.evaluate("amazon.com", PRICE_DIM, 75.0, 100.0);
        assert_eq!(fired.len(), 1);

        // The alert carries the ID of the rule that produced it.
        assert_eq!(fired[0].rule_id, "price-drop");
    }

    #[test]
    fn test_v4_watch_recent_alerts() {
        let mut manager = manager_with(rule_for(
            "test-alert",
            "test.com",
            WatchCondition::ValueAbove(100.0),
        ));

        // Trigger an alert
        let fired = manager.evaluate("test.com", PRICE_DIM, 150.0, 50.0);
        assert!(!fired.is_empty());

        // The alert log should now contain it.
        let recent = manager.recent_alerts(10);
        assert!(
            !recent.is_empty(),
            "recent alerts should include triggered alert"
        );
    }
}