mockforge_vbr/
mutation_rules.rs

1//! Time-triggered data mutation rules
2//!
3//! This module provides a system for automatically mutating VBR entity data
4//! based on time triggers. It supports both aging-style rules (expiration-based)
5//! and arbitrary field mutations (time-triggered changes).
6//!
7//! ## Usage
8//!
9//! ```rust,no_run
10//! use mockforge_vbr::mutation_rules::{MutationRule, MutationRuleManager, MutationTrigger, MutationOperation};
11//! use mockforge_core::time_travel_now;
12//! use std::sync::Arc;
13//!
14//! let manager = MutationRuleManager::new();
15//!
16//! // Create a rule that increments a counter every hour
17//! let rule = MutationRule {
18//!     id: "hourly-counter".to_string(),
19//!     entity_name: "User".to_string(),
20//!     trigger: MutationTrigger::Interval {
21//!         duration_seconds: 3600,
22//!     },
23//!     operation: MutationOperation::Increment {
24//!         field: "login_count".to_string(),
25//!         amount: 1.0,
26//!     },
27//!     enabled: true,
28//! };
29//!
30//! manager.add_rule(rule).await;
31//! ```
32
33use crate::{Error, Result};
34use chrono::{DateTime, Duration, Utc};
35use mockforge_core::time_travel_now;
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38use std::sync::Arc;
39use tokio::sync::RwLock;
40use tracing::{debug, info, warn};
41
42/// Trigger condition for a mutation rule
43#[derive(Debug, Clone, Serialize, Deserialize)]
44#[serde(tag = "type", rename_all = "lowercase")]
45pub enum MutationTrigger {
46    /// Trigger after a duration has elapsed
47    Interval {
48        /// Duration in seconds
49        duration_seconds: u64,
50    },
51    /// Trigger at a specific time (cron-like, but simpler)
52    AtTime {
53        /// Hour (0-23)
54        hour: u8,
55        /// Minute (0-59)
56        minute: u8,
57    },
58    /// Trigger when a field value reaches a threshold
59    FieldThreshold {
60        /// Field to check
61        field: String,
62        /// Threshold value
63        threshold: serde_json::Value,
64        /// Comparison operator
65        operator: ComparisonOperator,
66    },
67}
68
69/// Comparison operator for field threshold triggers
70#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
71#[serde(rename_all = "lowercase")]
72pub enum ComparisonOperator {
73    /// Greater than
74    Gt,
75    /// Less than
76    Lt,
77    /// Equal to
78    Eq,
79    /// Greater than or equal
80    Gte,
81    /// Less than or equal
82    Lte,
83}
84
85/// Mutation operation to perform
86#[derive(Debug, Clone, Serialize, Deserialize)]
87#[serde(tag = "type", rename_all = "lowercase")]
88pub enum MutationOperation {
89    /// Set a field to a specific value
90    Set {
91        /// Field name
92        field: String,
93        /// Value to set
94        value: serde_json::Value,
95    },
96    /// Increment a numeric field
97    Increment {
98        /// Field name
99        field: String,
100        /// Amount to increment by
101        amount: f64,
102    },
103    /// Decrement a numeric field
104    Decrement {
105        /// Field name
106        field: String,
107        /// Amount to decrement by
108        amount: f64,
109    },
110    /// Transform a field using a template or expression
111    Transform {
112        /// Field name
113        field: String,
114        /// Transformation expression (e.g., "{{field}} * 2")
115        expression: String,
116    },
117    /// Update status field
118    UpdateStatus {
119        /// New status value
120        status: String,
121    },
122}
123
124/// A mutation rule definition
125#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct MutationRule {
127    /// Unique identifier for this rule
128    pub id: String,
129    /// Entity name to apply mutation to
130    pub entity_name: String,
131    /// Trigger condition
132    pub trigger: MutationTrigger,
133    /// Mutation operation
134    pub operation: MutationOperation,
135    /// Whether this rule is enabled
136    #[serde(default = "default_true")]
137    pub enabled: bool,
138    /// Optional description
139    #[serde(default)]
140    pub description: Option<String>,
141    /// Optional condition (JSONPath expression) that must be true
142    #[serde(default)]
143    pub condition: Option<String>,
144    /// Last execution time
145    #[serde(default)]
146    pub last_execution: Option<DateTime<Utc>>,
147    /// Next scheduled execution time
148    #[serde(default)]
149    pub next_execution: Option<DateTime<Utc>>,
150    /// Number of times this rule has executed
151    #[serde(default)]
152    pub execution_count: usize,
153}
154
155fn default_true() -> bool {
156    true
157}
158
159impl MutationRule {
160    /// Create a new mutation rule
161    pub fn new(
162        id: String,
163        entity_name: String,
164        trigger: MutationTrigger,
165        operation: MutationOperation,
166    ) -> Self {
167        Self {
168            id,
169            entity_name,
170            trigger,
171            operation,
172            enabled: true,
173            description: None,
174            condition: None,
175            last_execution: None,
176            next_execution: None,
177            execution_count: 0,
178        }
179    }
180
181    /// Calculate the next execution time based on the trigger
182    pub fn calculate_next_execution(&self, from: DateTime<Utc>) -> Option<DateTime<Utc>> {
183        if !self.enabled {
184            return None;
185        }
186
187        match &self.trigger {
188            MutationTrigger::Interval { duration_seconds } => {
189                Some(from + Duration::seconds(*duration_seconds as i64))
190            }
191            MutationTrigger::AtTime { hour, minute } => {
192                // Calculate next occurrence of this time
193                let mut next =
194                    from.date_naive().and_hms_opt(*hour as u32, *minute as u32, 0)?.and_utc();
195
196                // If the time has already passed today, move to tomorrow
197                if next <= from {
198                    next = next + Duration::days(1);
199                }
200
201                Some(next)
202            }
203            MutationTrigger::FieldThreshold { .. } => {
204                // Field threshold triggers are evaluated on-demand, not scheduled
205                None
206            }
207        }
208    }
209}
210
211/// Manager for mutation rules
212pub struct MutationRuleManager {
213    /// Registered mutation rules
214    rules: Arc<RwLock<HashMap<String, MutationRule>>>,
215}
216
217impl MutationRuleManager {
218    /// Create a new mutation rule manager
219    pub fn new() -> Self {
220        Self {
221            rules: Arc::new(RwLock::new(HashMap::new())),
222        }
223    }
224
225    /// Add a mutation rule
226    pub async fn add_rule(&self, mut rule: MutationRule) -> Result<()> {
227        // Calculate next execution time
228        let now = time_travel_now();
229        rule.next_execution = rule.calculate_next_execution(now);
230
231        let rule_id = rule.id.clone();
232
233        let mut rules = self.rules.write().await;
234        rules.insert(rule_id.clone(), rule);
235
236        info!("Added mutation rule '{}' for entity '{}'", rule_id, rules[&rule_id].entity_name);
237        Ok(())
238    }
239
240    /// Remove a mutation rule
241    pub async fn remove_rule(&self, rule_id: &str) -> bool {
242        let mut rules = self.rules.write().await;
243        let removed = rules.remove(rule_id).is_some();
244
245        if removed {
246            info!("Removed mutation rule '{}'", rule_id);
247        }
248
249        removed
250    }
251
252    /// Get a mutation rule by ID
253    pub async fn get_rule(&self, rule_id: &str) -> Option<MutationRule> {
254        let rules = self.rules.read().await;
255        rules.get(rule_id).cloned()
256    }
257
258    /// List all mutation rules
259    pub async fn list_rules(&self) -> Vec<MutationRule> {
260        let rules = self.rules.read().await;
261        rules.values().cloned().collect()
262    }
263
264    /// List rules for a specific entity
265    pub async fn list_rules_for_entity(&self, entity_name: &str) -> Vec<MutationRule> {
266        let rules = self.rules.read().await;
267        rules.values().filter(|rule| rule.entity_name == entity_name).cloned().collect()
268    }
269
270    /// Enable or disable a mutation rule
271    pub async fn set_rule_enabled(&self, rule_id: &str, enabled: bool) -> Result<()> {
272        let mut rules = self.rules.write().await;
273
274        if let Some(rule) = rules.get_mut(rule_id) {
275            rule.enabled = enabled;
276
277            // Recalculate next execution if enabling
278            if enabled {
279                let now = time_travel_now();
280                rule.next_execution = rule.calculate_next_execution(now);
281            } else {
282                rule.next_execution = None;
283            }
284
285            info!("Mutation rule '{}' {}", rule_id, if enabled { "enabled" } else { "disabled" });
286            Ok(())
287        } else {
288            Err(crate::Error::generic(format!("Mutation rule '{}' not found", rule_id)))
289        }
290    }
291
292    /// Check for rules that should execute now and execute them
293    ///
294    /// This should be called periodically or when time advances
295    /// to check if any rules are due for execution.
296    pub async fn check_and_execute(
297        &self,
298        database: &dyn crate::database::VirtualDatabase,
299        registry: &crate::entities::EntityRegistry,
300    ) -> Result<usize> {
301        let now = time_travel_now();
302        let mut executed = 0;
303
304        // Get rules that need to execute
305        let mut rules_to_execute = Vec::new();
306
307        {
308            let rules = self.rules.read().await;
309            for rule in rules.values() {
310                if !rule.enabled {
311                    continue;
312                }
313
314                if let Some(next) = rule.next_execution {
315                    if now >= next {
316                        rules_to_execute.push(rule.id.clone());
317                    }
318                }
319            }
320        }
321
322        // Execute rules
323        for rule_id in rules_to_execute {
324            if let Err(e) = self.execute_rule(&rule_id, database, registry).await {
325                warn!("Error executing mutation rule '{}': {}", rule_id, e);
326            } else {
327                executed += 1;
328            }
329        }
330
331        Ok(executed)
332    }
333
334    /// Execute a specific mutation rule
335    async fn execute_rule(
336        &self,
337        rule_id: &str,
338        database: &dyn crate::database::VirtualDatabase,
339        registry: &crate::entities::EntityRegistry,
340    ) -> Result<()> {
341        let now = time_travel_now();
342
343        // Get rule
344        let rule = {
345            let rules = self.rules.read().await;
346            rules
347                .get(rule_id)
348                .ok_or_else(|| Error::generic(format!("Mutation rule '{}' not found", rule_id)))?
349                .clone()
350        };
351
352        // Get entity info
353        let entity = registry
354            .get(&rule.entity_name)
355            .ok_or_else(|| Error::generic(format!("Entity '{}' not found", rule.entity_name)))?;
356
357        let table_name = entity.table_name();
358
359        // Query all records for this entity
360        let query = format!("SELECT * FROM {}", table_name);
361        let records = database.query(&query, &[]).await?;
362
363        // Apply mutation to each record
364        let pk_field = entity.schema.primary_key.first().map(|s| s.as_str()).unwrap_or("id");
365
366        for record in records {
367            // Check condition if specified
368            if let Some(ref condition) = rule.condition {
369                // TODO: Implement condition evaluation (JSONPath)
370                // For now, skip if condition is specified
371                debug!("Condition evaluation not yet implemented, skipping record");
372                continue;
373            }
374
375            // Get primary key value
376            let pk_value = record
377                .get(pk_field)
378                .ok_or_else(|| Error::generic(format!("Primary key '{}' not found", pk_field)))?;
379
380            // Apply mutation operation
381            match &rule.operation {
382                MutationOperation::Set { field, value } => {
383                    let update_query =
384                        format!("UPDATE {} SET {} = ? WHERE {} = ?", table_name, field, pk_field);
385                    database.execute(&update_query, &[value.clone(), pk_value.clone()]).await?;
386                }
387                MutationOperation::Increment { field, amount } => {
388                    // Get current value
389                    if let Some(current) = record.get(field) {
390                        let new_value = if let Some(num) = current.as_f64() {
391                            serde_json::Value::Number(
392                                serde_json::Number::from_f64(num + amount)
393                                    .unwrap_or_else(|| serde_json::Number::from(0)),
394                            )
395                        } else if let Some(num) = current.as_i64() {
396                            serde_json::Value::Number(serde_json::Number::from(
397                                num + *amount as i64,
398                            ))
399                        } else {
400                            continue; // Skip non-numeric fields
401                        };
402
403                        let update_query = format!(
404                            "UPDATE {} SET {} = ? WHERE {} = ?",
405                            table_name, field, pk_field
406                        );
407                        database.execute(&update_query, &[new_value, pk_value.clone()]).await?;
408                    }
409                }
410                MutationOperation::Decrement { field, amount } => {
411                    // Get current value
412                    if let Some(current) = record.get(field) {
413                        let new_value = if let Some(num) = current.as_f64() {
414                            serde_json::Value::Number(
415                                serde_json::Number::from_f64(num - amount)
416                                    .unwrap_or_else(|| serde_json::Number::from(0)),
417                            )
418                        } else if let Some(num) = current.as_i64() {
419                            serde_json::Value::Number(serde_json::Number::from(
420                                num - *amount as i64,
421                            ))
422                        } else {
423                            continue; // Skip non-numeric fields
424                        };
425
426                        let update_query = format!(
427                            "UPDATE {} SET {} = ? WHERE {} = ?",
428                            table_name, field, pk_field
429                        );
430                        database.execute(&update_query, &[new_value, pk_value.clone()]).await?;
431                    }
432                }
433                MutationOperation::Transform {
434                    field,
435                    expression: _,
436                } => {
437                    // TODO: Implement transformation expressions
438                    warn!("Transform operation not yet implemented for field '{}'", field);
439                }
440                MutationOperation::UpdateStatus { status } => {
441                    let update_query =
442                        format!("UPDATE {} SET status = ? WHERE {} = ?", table_name, pk_field);
443                    database
444                        .execute(
445                            &update_query,
446                            &[serde_json::Value::String(status.clone()), pk_value.clone()],
447                        )
448                        .await?;
449                }
450            }
451        }
452
453        // Update rule state
454        {
455            let mut rules = self.rules.write().await;
456            if let Some(rule) = rules.get_mut(rule_id) {
457                rule.last_execution = Some(now);
458                rule.execution_count += 1;
459
460                // Calculate next execution
461                rule.next_execution = rule.calculate_next_execution(now);
462            }
463        }
464
465        info!("Executed mutation rule '{}' on entity '{}'", rule_id, rule.entity_name);
466        Ok(())
467    }
468}
469
470impl Default for MutationRuleManager {
471    fn default() -> Self {
472        Self::new()
473    }
474}
475
476#[cfg(test)]
477mod tests {
478    use super::*;
479
480    #[test]
481    fn test_mutation_rule_creation() {
482        let rule = MutationRule::new(
483            "test-1".to_string(),
484            "User".to_string(),
485            MutationTrigger::Interval {
486                duration_seconds: 3600,
487            },
488            MutationOperation::Increment {
489                field: "count".to_string(),
490                amount: 1.0,
491            },
492        );
493
494        assert_eq!(rule.id, "test-1");
495        assert_eq!(rule.entity_name, "User");
496        assert!(rule.enabled);
497    }
498
499    #[test]
500    fn test_mutation_trigger_interval() {
501        let rule = MutationRule::new(
502            "test-1".to_string(),
503            "User".to_string(),
504            MutationTrigger::Interval {
505                duration_seconds: 3600,
506            },
507            MutationOperation::Set {
508                field: "status".to_string(),
509                value: serde_json::json!("active"),
510            },
511        );
512
513        let now = Utc::now();
514        let next = rule.calculate_next_execution(now).unwrap();
515        let duration = next - now;
516
517        // Should be approximately 1 hour
518        assert!(duration.num_seconds() >= 3599 && duration.num_seconds() <= 3601);
519    }
520
521    #[tokio::test]
522    async fn test_mutation_rule_manager() {
523        let manager = MutationRuleManager::new();
524
525        let rule = MutationRule::new(
526            "test-1".to_string(),
527            "User".to_string(),
528            MutationTrigger::Interval {
529                duration_seconds: 3600,
530            },
531            MutationOperation::Increment {
532                field: "count".to_string(),
533                amount: 1.0,
534            },
535        );
536
537        manager.add_rule(rule).await.unwrap();
538
539        let rules = manager.list_rules().await;
540        assert_eq!(rules.len(), 1);
541        assert_eq!(rules[0].id, "test-1");
542    }
543}