1use crate::{Error, Result};
34use chrono::{DateTime, Duration, Utc};
35use mockforge_core::time_travel_now;
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38use std::sync::Arc;
39use tokio::sync::RwLock;
40use tracing::{debug, info, warn};
41
/// Describes when a [`MutationRule`] becomes due.
///
/// Serialized as an internally tagged object: the `type` field carries the
/// lowercased variant name (`"interval"`, `"attime"`, `"fieldthreshold"`) next
/// to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum MutationTrigger {
    /// Time-based trigger: the next execution is a fixed number of seconds
    /// after the reference time used for scheduling.
    Interval {
        /// Number of seconds added to the reference time.
        duration_seconds: u64,
    },
    /// Daily trigger at a fixed wall-clock time, interpreted in UTC with the
    /// seconds component set to zero.
    AtTime {
        /// Hour of day; a value that does not form a valid time yields no
        /// scheduled execution.
        hour: u8,
        /// Minute of the hour; a value that does not form a valid time yields
        /// no scheduled execution.
        minute: u8,
    },
    /// Data-driven trigger comparing a record field against a threshold.
    ///
    /// `MutationRule::calculate_next_execution` returns `None` for this
    /// variant, so it is never time-scheduled; the threshold comparison itself
    /// is not implemented in this file.
    FieldThreshold {
        /// Name of the field to compare.
        field: String,
        /// Value the field is compared against.
        threshold: serde_json::Value,
        /// Comparison applied between the field and `threshold`.
        operator: ComparisonOperator,
    },
}
68
/// Comparison used by [`MutationTrigger::FieldThreshold`].
///
/// Serialized as the lowercased variant name (`"gt"`, `"lt"`, `"eq"`, `"gte"`,
/// `"lte"`). Nothing in this file evaluates these operators, so the meanings
/// below follow the variant names only.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ComparisonOperator {
    /// Presumably "greater than" -- confirm against the evaluator.
    Gt,
    /// Presumably "less than" -- confirm against the evaluator.
    Lt,
    /// Presumably "equal to" -- confirm against the evaluator.
    Eq,
    /// Presumably "greater than or equal to" -- confirm against the evaluator.
    Gte,
    /// Presumably "less than or equal to" -- confirm against the evaluator.
    Lte,
}
84
/// The change a [`MutationRule`] applies to each record of its entity.
///
/// Serialized as an internally tagged object: the `type` field carries the
/// lowercased variant name (`"set"`, `"increment"`, `"decrement"`,
/// `"transform"`, `"updatestatus"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum MutationOperation {
    /// Overwrite `field` with a fixed value.
    Set {
        /// Column to write.
        field: String,
        /// Value stored in the column.
        value: serde_json::Value,
    },
    /// Add `amount` to a numeric `field`; records whose field is missing or
    /// not a number are left unchanged.
    Increment {
        /// Column to update.
        field: String,
        /// Amount added to the current value.
        amount: f64,
    },
    /// Subtract `amount` from a numeric `field`; records whose field is
    /// missing or not a number are left unchanged.
    Decrement {
        /// Column to update.
        field: String,
        /// Amount subtracted from the current value.
        amount: f64,
    },
    /// Expression-based rewrite of `field`.
    ///
    /// Not implemented in this file: executing it only logs a warning and
    /// changes no data.
    Transform {
        /// Column the expression would apply to.
        field: String,
        /// Expression text; currently ignored.
        expression: String,
    },
    /// Write `status` into the record's `status` column.
    UpdateStatus {
        /// New status value.
        status: String,
    },
}
123
/// A rule that mutates the records of one entity when its trigger is due.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MutationRule {
    /// Identifier of the rule; `MutationRuleManager` uses it as the map key.
    pub id: String,
    /// Name of the entity whose records are mutated; resolved through the
    /// entity registry at execution time.
    pub entity_name: String,
    /// When the rule becomes due.
    pub trigger: MutationTrigger,
    /// What the rule does to each record.
    pub operation: MutationOperation,
    /// Whether the rule takes part in scheduling; defaults to `true` when the
    /// field is absent during deserialization.
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Optional free-form description.
    #[serde(default)]
    pub description: Option<String>,
    /// Optional per-record condition.
    ///
    /// Condition evaluation is not implemented in this file: when a condition
    /// is set, execution skips every record.
    #[serde(default)]
    pub condition: Option<String>,
    /// Time of the most recent successful execution, if any.
    #[serde(default)]
    pub last_execution: Option<DateTime<Utc>>,
    /// Time at which the rule is next due; `None` means it is not scheduled.
    #[serde(default)]
    pub next_execution: Option<DateTime<Utc>>,
    /// Number of successful executions recorded so far.
    #[serde(default)]
    pub execution_count: usize,
}
154
/// Serde default for `MutationRule::enabled`: a rule deserialized without an
/// `enabled` field is treated as enabled.
fn default_true() -> bool {
    true
}
158
159impl MutationRule {
160 pub fn new(
162 id: String,
163 entity_name: String,
164 trigger: MutationTrigger,
165 operation: MutationOperation,
166 ) -> Self {
167 Self {
168 id,
169 entity_name,
170 trigger,
171 operation,
172 enabled: true,
173 description: None,
174 condition: None,
175 last_execution: None,
176 next_execution: None,
177 execution_count: 0,
178 }
179 }
180
181 pub fn calculate_next_execution(&self, from: DateTime<Utc>) -> Option<DateTime<Utc>> {
183 if !self.enabled {
184 return None;
185 }
186
187 match &self.trigger {
188 MutationTrigger::Interval { duration_seconds } => {
189 Some(from + Duration::seconds(*duration_seconds as i64))
190 }
191 MutationTrigger::AtTime { hour, minute } => {
192 let mut next =
194 from.date_naive().and_hms_opt(*hour as u32, *minute as u32, 0)?.and_utc();
195
196 if next <= from {
198 next = next + Duration::days(1);
199 }
200
201 Some(next)
202 }
203 MutationTrigger::FieldThreshold { .. } => {
204 None
206 }
207 }
208 }
209}
210
/// Registry of [`MutationRule`]s that schedules and executes them.
pub struct MutationRuleManager {
    /// Rules keyed by rule id, behind an async read/write lock inside an `Arc`.
    rules: Arc<RwLock<HashMap<String, MutationRule>>>,
}
216
217impl MutationRuleManager {
218 pub fn new() -> Self {
220 Self {
221 rules: Arc::new(RwLock::new(HashMap::new())),
222 }
223 }
224
225 pub async fn add_rule(&self, mut rule: MutationRule) -> Result<()> {
227 let now = time_travel_now();
229 rule.next_execution = rule.calculate_next_execution(now);
230
231 let rule_id = rule.id.clone();
232
233 let mut rules = self.rules.write().await;
234 rules.insert(rule_id.clone(), rule);
235
236 info!("Added mutation rule '{}' for entity '{}'", rule_id, rules[&rule_id].entity_name);
237 Ok(())
238 }
239
240 pub async fn remove_rule(&self, rule_id: &str) -> bool {
242 let mut rules = self.rules.write().await;
243 let removed = rules.remove(rule_id).is_some();
244
245 if removed {
246 info!("Removed mutation rule '{}'", rule_id);
247 }
248
249 removed
250 }
251
252 pub async fn get_rule(&self, rule_id: &str) -> Option<MutationRule> {
254 let rules = self.rules.read().await;
255 rules.get(rule_id).cloned()
256 }
257
258 pub async fn list_rules(&self) -> Vec<MutationRule> {
260 let rules = self.rules.read().await;
261 rules.values().cloned().collect()
262 }
263
264 pub async fn list_rules_for_entity(&self, entity_name: &str) -> Vec<MutationRule> {
266 let rules = self.rules.read().await;
267 rules.values().filter(|rule| rule.entity_name == entity_name).cloned().collect()
268 }
269
270 pub async fn set_rule_enabled(&self, rule_id: &str, enabled: bool) -> Result<()> {
272 let mut rules = self.rules.write().await;
273
274 if let Some(rule) = rules.get_mut(rule_id) {
275 rule.enabled = enabled;
276
277 if enabled {
279 let now = time_travel_now();
280 rule.next_execution = rule.calculate_next_execution(now);
281 } else {
282 rule.next_execution = None;
283 }
284
285 info!("Mutation rule '{}' {}", rule_id, if enabled { "enabled" } else { "disabled" });
286 Ok(())
287 } else {
288 Err(crate::Error::generic(format!("Mutation rule '{}' not found", rule_id)))
289 }
290 }
291
292 pub async fn check_and_execute(
297 &self,
298 database: &dyn crate::database::VirtualDatabase,
299 registry: &crate::entities::EntityRegistry,
300 ) -> Result<usize> {
301 let now = time_travel_now();
302 let mut executed = 0;
303
304 let mut rules_to_execute = Vec::new();
306
307 {
308 let rules = self.rules.read().await;
309 for rule in rules.values() {
310 if !rule.enabled {
311 continue;
312 }
313
314 if let Some(next) = rule.next_execution {
315 if now >= next {
316 rules_to_execute.push(rule.id.clone());
317 }
318 }
319 }
320 }
321
322 for rule_id in rules_to_execute {
324 if let Err(e) = self.execute_rule(&rule_id, database, registry).await {
325 warn!("Error executing mutation rule '{}': {}", rule_id, e);
326 } else {
327 executed += 1;
328 }
329 }
330
331 Ok(executed)
332 }
333
334 async fn execute_rule(
336 &self,
337 rule_id: &str,
338 database: &dyn crate::database::VirtualDatabase,
339 registry: &crate::entities::EntityRegistry,
340 ) -> Result<()> {
341 let now = time_travel_now();
342
343 let rule = {
345 let rules = self.rules.read().await;
346 rules
347 .get(rule_id)
348 .ok_or_else(|| Error::generic(format!("Mutation rule '{}' not found", rule_id)))?
349 .clone()
350 };
351
352 let entity = registry
354 .get(&rule.entity_name)
355 .ok_or_else(|| Error::generic(format!("Entity '{}' not found", rule.entity_name)))?;
356
357 let table_name = entity.table_name();
358
359 let query = format!("SELECT * FROM {}", table_name);
361 let records = database.query(&query, &[]).await?;
362
363 let pk_field = entity.schema.primary_key.first().map(|s| s.as_str()).unwrap_or("id");
365
366 for record in records {
367 if let Some(ref condition) = rule.condition {
369 debug!("Condition evaluation not yet implemented, skipping record");
372 continue;
373 }
374
375 let pk_value = record
377 .get(pk_field)
378 .ok_or_else(|| Error::generic(format!("Primary key '{}' not found", pk_field)))?;
379
380 match &rule.operation {
382 MutationOperation::Set { field, value } => {
383 let update_query =
384 format!("UPDATE {} SET {} = ? WHERE {} = ?", table_name, field, pk_field);
385 database.execute(&update_query, &[value.clone(), pk_value.clone()]).await?;
386 }
387 MutationOperation::Increment { field, amount } => {
388 if let Some(current) = record.get(field) {
390 let new_value = if let Some(num) = current.as_f64() {
391 serde_json::Value::Number(
392 serde_json::Number::from_f64(num + amount)
393 .unwrap_or_else(|| serde_json::Number::from(0)),
394 )
395 } else if let Some(num) = current.as_i64() {
396 serde_json::Value::Number(serde_json::Number::from(
397 num + *amount as i64,
398 ))
399 } else {
400 continue; };
402
403 let update_query = format!(
404 "UPDATE {} SET {} = ? WHERE {} = ?",
405 table_name, field, pk_field
406 );
407 database.execute(&update_query, &[new_value, pk_value.clone()]).await?;
408 }
409 }
410 MutationOperation::Decrement { field, amount } => {
411 if let Some(current) = record.get(field) {
413 let new_value = if let Some(num) = current.as_f64() {
414 serde_json::Value::Number(
415 serde_json::Number::from_f64(num - amount)
416 .unwrap_or_else(|| serde_json::Number::from(0)),
417 )
418 } else if let Some(num) = current.as_i64() {
419 serde_json::Value::Number(serde_json::Number::from(
420 num - *amount as i64,
421 ))
422 } else {
423 continue; };
425
426 let update_query = format!(
427 "UPDATE {} SET {} = ? WHERE {} = ?",
428 table_name, field, pk_field
429 );
430 database.execute(&update_query, &[new_value, pk_value.clone()]).await?;
431 }
432 }
433 MutationOperation::Transform {
434 field,
435 expression: _,
436 } => {
437 warn!("Transform operation not yet implemented for field '{}'", field);
439 }
440 MutationOperation::UpdateStatus { status } => {
441 let update_query =
442 format!("UPDATE {} SET status = ? WHERE {} = ?", table_name, pk_field);
443 database
444 .execute(
445 &update_query,
446 &[serde_json::Value::String(status.clone()), pk_value.clone()],
447 )
448 .await?;
449 }
450 }
451 }
452
453 {
455 let mut rules = self.rules.write().await;
456 if let Some(rule) = rules.get_mut(rule_id) {
457 rule.last_execution = Some(now);
458 rule.execution_count += 1;
459
460 rule.next_execution = rule.calculate_next_execution(now);
462 }
463 }
464
465 info!("Executed mutation rule '{}' on entity '{}'", rule_id, rule.entity_name);
466 Ok(())
467 }
468}
469
/// `Default` delegates to [`MutationRuleManager::new`], yielding a manager with
/// no registered rules.
impl Default for MutationRuleManager {
    fn default() -> Self {
        Self::new()
    }
}
475
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the rule every test starts from: id `test-1`, entity `User`,
    /// firing once per hour, carrying the given operation.
    fn hourly_rule(operation: MutationOperation) -> MutationRule {
        let trigger = MutationTrigger::Interval {
            duration_seconds: 3600,
        };
        MutationRule::new("test-1".to_string(), "User".to_string(), trigger, operation)
    }

    /// Operation that bumps the `count` field by one.
    fn increment_count() -> MutationOperation {
        MutationOperation::Increment {
            field: "count".to_string(),
            amount: 1.0,
        }
    }

    // A freshly constructed rule keeps its identity and starts enabled.
    #[test]
    fn test_mutation_rule_creation() {
        let rule = hourly_rule(increment_count());

        assert_eq!(rule.id, "test-1");
        assert_eq!(rule.entity_name, "User");
        assert!(rule.enabled);
    }

    // An interval trigger schedules the next run one interval after `from`.
    #[test]
    fn test_mutation_trigger_interval() {
        let rule = hourly_rule(MutationOperation::Set {
            field: "status".to_string(),
            value: serde_json::json!("active"),
        });

        let start = Utc::now();
        let scheduled = rule.calculate_next_execution(start).unwrap();
        let elapsed_seconds = (scheduled - start).num_seconds();

        assert!((3599..=3601).contains(&elapsed_seconds));
    }

    // A rule added to the manager shows up in the listing.
    #[tokio::test]
    async fn test_mutation_rule_manager() {
        let manager = MutationRuleManager::new();

        manager.add_rule(hourly_rule(increment_count())).await.unwrap();

        let listed = manager.list_rules().await;
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].id, "test-1");
    }
}