mockforge_core/
latency.rs

1//! Pillars: [Reality]
2//!
3//! Latency simulation and fault injection for MockForge
4
5use crate::Result;
6use rand::Rng;
7use std::collections::HashMap;
8use std::time::Duration;
9
/// Latency distribution types
///
/// Selects how [`LatencyProfile::calculate_latency`] draws a random delay.
/// Serialized in lowercase (`"fixed"`, `"normal"`, `"pareto"`).
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "lowercase")]
pub enum LatencyDistribution {
    /// Fixed latency with optional jitter (backward compatible)
    #[default]
    Fixed,
    /// Normal (Gaussian) distribution
    Normal,
    /// Pareto (power-law) distribution for heavy-tailed latency
    Pareto,
}
23
/// Latency profile configuration
///
/// Describes how much artificial delay to add to a request. The meaning of
/// `base_ms` depends on `distribution`: mean for `Normal`, scale (minimum)
/// for `Pareto`, and the center value for `Fixed`.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct LatencyProfile {
    /// Base latency in milliseconds (mean for distributions)
    pub base_ms: u64,
    /// Random jitter range in milliseconds (for fixed distribution)
    pub jitter_ms: u64,
    /// Distribution type for latency variation
    #[serde(default)]
    pub distribution: LatencyDistribution,
    /// Standard deviation for normal distribution (in milliseconds);
    /// when absent, `calculate_latency` falls back to 20% of `base_ms`
    #[serde(default)]
    pub std_dev_ms: Option<f64>,
    /// Shape parameter for pareto distribution (alpha > 0);
    /// when absent, `calculate_latency` falls back to 2.0
    #[serde(default)]
    pub pareto_shape: Option<f64>,
    /// Minimum latency bound (prevents negative values)
    #[serde(default)]
    pub min_ms: u64,
    /// Maximum latency bound (prevents extreme values)
    #[serde(default)]
    pub max_ms: Option<u64>,
    /// Tag-based latency overrides; a matching tag short-circuits the
    /// distribution and returns its fixed value directly
    pub tag_overrides: HashMap<String, u64>,
}
50
51impl Default for LatencyProfile {
52    fn default() -> Self {
53        Self {
54            base_ms: 50,   // 50ms base latency
55            jitter_ms: 20, // ±20ms jitter
56            distribution: LatencyDistribution::Fixed,
57            std_dev_ms: None,
58            pareto_shape: None,
59            min_ms: 0,
60            max_ms: None,
61            tag_overrides: HashMap::new(),
62        }
63    }
64}
65
66impl LatencyProfile {
67    /// Create a new latency profile with fixed distribution (backward compatible)
68    pub fn new(base_ms: u64, jitter_ms: u64) -> Self {
69        Self {
70            base_ms,
71            jitter_ms,
72            distribution: LatencyDistribution::Fixed,
73            std_dev_ms: None,
74            pareto_shape: None,
75            min_ms: 0,
76            max_ms: None,
77            tag_overrides: HashMap::new(),
78        }
79    }
80
81    /// Create a new latency profile with normal distribution
82    pub fn with_normal_distribution(base_ms: u64, std_dev_ms: f64) -> Self {
83        Self {
84            base_ms,
85            jitter_ms: 0, // Not used for normal distribution
86            distribution: LatencyDistribution::Normal,
87            std_dev_ms: Some(std_dev_ms),
88            pareto_shape: None,
89            min_ms: 0,
90            max_ms: None,
91            tag_overrides: HashMap::new(),
92        }
93    }
94
95    /// Create a new latency profile with pareto distribution
96    pub fn with_pareto_distribution(base_ms: u64, shape: f64) -> Self {
97        Self {
98            base_ms,
99            jitter_ms: 0, // Not used for pareto distribution
100            distribution: LatencyDistribution::Pareto,
101            std_dev_ms: None,
102            pareto_shape: Some(shape),
103            min_ms: 0,
104            max_ms: None,
105            tag_overrides: HashMap::new(),
106        }
107    }
108
109    /// Add a tag-based latency override
110    pub fn with_tag_override(mut self, tag: String, latency_ms: u64) -> Self {
111        self.tag_overrides.insert(tag, latency_ms);
112        self
113    }
114
115    /// Set minimum latency bound
116    pub fn with_min_ms(mut self, min_ms: u64) -> Self {
117        self.min_ms = min_ms;
118        self
119    }
120
121    /// Set maximum latency bound
122    pub fn with_max_ms(mut self, max_ms: u64) -> Self {
123        self.max_ms = Some(max_ms);
124        self
125    }
126
127    /// Calculate latency for a request with optional tags
128    pub fn calculate_latency(&self, tags: &[String]) -> Duration {
129        let mut rng = rand::rng();
130
131        // Check for tag overrides (use the first matching tag)
132        // Note: Tag overrides always use fixed latency for simplicity
133        if let Some(&override_ms) = tags.iter().find_map(|tag| self.tag_overrides.get(tag)) {
134            return Duration::from_millis(override_ms);
135        }
136
137        let mut latency_ms = match self.distribution {
138            LatencyDistribution::Fixed => {
139                // Original behavior: base + jitter
140                let jitter = if self.jitter_ms > 0 {
141                    rng.random_range(0..=self.jitter_ms * 2).saturating_sub(self.jitter_ms)
142                } else {
143                    0
144                };
145                self.base_ms.saturating_add(jitter)
146            }
147            LatencyDistribution::Normal => {
148                // Simple approximation of normal distribution using Box-Muller transform
149                let std_dev = self.std_dev_ms.unwrap_or((self.base_ms as f64) * 0.2);
150                let mean = self.base_ms as f64;
151
152                // Generate two uniform random numbers
153                let u1: f64 = rng.random();
154                let u2: f64 = rng.random();
155
156                // Box-Muller transform
157                let z0 = (-2.0 * u1.ln()).sqrt() * (2.0 * std::f64::consts::PI * u2).cos();
158                (mean + std_dev * z0).max(0.0) as u64
159            }
160            LatencyDistribution::Pareto => {
161                // Pareto distribution: P(x) = shape * scale^shape / x^(shape+1) for x >= scale
162                let shape = self.pareto_shape.unwrap_or(2.0);
163                let scale = self.base_ms as f64;
164
165                // Inverse CDF method for Pareto distribution
166                let u: f64 = rng.random();
167                (scale / (1.0 - u).powf(1.0 / shape)) as u64
168            }
169        };
170
171        // Apply bounds
172        latency_ms = latency_ms.max(self.min_ms);
173        if let Some(max_ms) = self.max_ms {
174            latency_ms = latency_ms.min(max_ms);
175        }
176
177        Duration::from_millis(latency_ms)
178    }
179}
180
/// Fault injection configuration
///
/// Controls probabilistic request failures: how often to fail, which
/// HTTP status codes to answer with, and optional JSON error bodies.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct FaultConfig {
    /// Probability of failure (0.0 to 1.0)
    pub failure_rate: f64,
    /// HTTP status codes to return on failure (one is picked at random);
    /// falls back to 500 when empty
    pub status_codes: Vec<u16>,
    /// Custom error responses keyed by an arbitrary label; one is picked
    /// at random when non-empty
    pub error_responses: HashMap<String, serde_json::Value>,
}
191
192impl Default for FaultConfig {
193    fn default() -> Self {
194        Self {
195            failure_rate: 0.0,
196            status_codes: vec![500, 502, 503, 504],
197            error_responses: HashMap::new(),
198        }
199    }
200}
201
202impl FaultConfig {
203    /// Create a new fault configuration
204    pub fn new(failure_rate: f64) -> Self {
205        Self {
206            failure_rate: failure_rate.clamp(0.0, 1.0),
207            ..Default::default()
208        }
209    }
210
211    /// Add a status code to the failure responses
212    pub fn with_status_code(mut self, code: u16) -> Self {
213        if !self.status_codes.contains(&code) {
214            self.status_codes.push(code);
215        }
216        self
217    }
218
219    /// Add a custom error response
220    pub fn with_error_response(mut self, key: String, response: serde_json::Value) -> Self {
221        self.error_responses.insert(key, response);
222        self
223    }
224
225    /// Determine if a failure should occur
226    pub fn should_fail(&self) -> bool {
227        if self.failure_rate <= 0.0 {
228            return false;
229        }
230        if self.failure_rate >= 1.0 {
231            return true;
232        }
233
234        let mut rng = rand::rng();
235        rng.random_bool(self.failure_rate)
236    }
237
238    /// Get a random failure response
239    pub fn get_failure_response(&self) -> (u16, Option<serde_json::Value>) {
240        let mut rng = rand::rng();
241
242        let status_code = if self.status_codes.is_empty() {
243            500
244        } else {
245            let index = rng.random_range(0..self.status_codes.len());
246            self.status_codes[index]
247        };
248
249        let error_response = if self.error_responses.is_empty() {
250            None
251        } else {
252            let keys: Vec<&String> = self.error_responses.keys().collect();
253            let key = keys[rng.random_range(0..keys.len())];
254            self.error_responses.get(key).cloned()
255        };
256
257        (status_code, error_response)
258    }
259}
260
/// Latency and fault injector
///
/// Combines a [`LatencyProfile`] (delay) with a [`FaultConfig`]
/// (probabilistic failures) behind a single on/off switch.
#[derive(Debug, Clone)]
pub struct LatencyInjector {
    /// Latency profile used to compute per-request delays
    latency_profile: LatencyProfile,
    /// Fault configuration used to decide and shape failures
    fault_config: FaultConfig,
    /// Whether injection is enabled; when false every operation is a no-op
    enabled: bool,
}
271
272impl LatencyInjector {
273    /// Create a new latency injector
274    pub fn new(latency_profile: LatencyProfile, fault_config: FaultConfig) -> Self {
275        Self {
276            latency_profile,
277            fault_config,
278            enabled: true,
279        }
280    }
281
282    /// Enable or disable injection
283    pub fn set_enabled(&mut self, enabled: bool) {
284        self.enabled = enabled;
285    }
286
287    /// Check if injection is enabled
288    pub fn is_enabled(&self) -> bool {
289        self.enabled
290    }
291
292    /// Inject latency for a request
293    pub async fn inject_latency(&self, tags: &[String]) -> Result<()> {
294        if !self.enabled {
295            return Ok(());
296        }
297
298        let latency = self.latency_profile.calculate_latency(tags);
299        if !latency.is_zero() {
300            tokio::time::sleep(latency).await;
301        }
302
303        Ok(())
304    }
305
306    /// Check if a failure should be injected
307    pub fn should_inject_failure(&self) -> bool {
308        if !self.enabled {
309            return false;
310        }
311
312        self.fault_config.should_fail()
313    }
314
315    /// Get failure response details
316    pub fn get_failure_response(&self) -> (u16, Option<serde_json::Value>) {
317        self.fault_config.get_failure_response()
318    }
319
320    /// Process a request with latency and potential fault injection
321    pub async fn process_request(
322        &self,
323        tags: &[String],
324    ) -> Result<Option<(u16, Option<serde_json::Value>)>> {
325        if !self.enabled {
326            return Ok(None);
327        }
328
329        // Inject latency first
330        self.inject_latency(tags).await?;
331
332        // Check for fault injection
333        if self.should_inject_failure() {
334            let (status, response) = self.get_failure_response();
335            return Ok(Some((status, response)));
336        }
337
338        Ok(None)
339    }
340
341    /// Update latency profile at runtime
342    ///
343    /// This allows changing the latency profile without recreating the injector.
344    /// Useful for hot-reloading reality level configurations.
345    pub fn update_profile(&mut self, profile: LatencyProfile) {
346        self.latency_profile = profile;
347    }
348
349    /// Update latency profile (async version for Arc<RwLock>)
350    ///
351    /// Convenience method for updating a latency injector wrapped in Arc<RwLock>.
352    /// This is the recommended way to update latency profiles at runtime.
353    ///
354    /// # Returns
355    /// `Ok(())` on success, or an error if the update fails.
356    pub async fn update_profile_async(
357        this: &std::sync::Arc<tokio::sync::RwLock<Self>>,
358        profile: LatencyProfile,
359    ) -> Result<()> {
360        let mut injector = this.write().await;
361        injector.update_profile(profile);
362        Ok(())
363    }
364}
365
366impl Default for LatencyInjector {
367    fn default() -> Self {
368        Self::new(LatencyProfile::default(), FaultConfig::default())
369    }
370}
371
#[cfg(test)]
mod tests {
    //! Unit tests for latency profiles, fault configuration, and the injector.
    use super::*;

    #[tokio::test]
    async fn test_update_profile() {
        let mut injector =
            LatencyInjector::new(LatencyProfile::new(50, 20), FaultConfig::default());

        // Update to a new profile (no clone needed; the profile is moved)
        let new_profile = LatencyProfile::new(100, 30);
        injector.update_profile(new_profile);

        // Verify the profile was updated
        // Note: We can't directly access latency_profile, but we can test via behavior
        assert!(injector.is_enabled());
    }

    #[tokio::test]
    async fn test_update_profile_async() {
        use std::sync::Arc;
        use tokio::sync::RwLock;

        let injector = Arc::new(RwLock::new(LatencyInjector::new(
            LatencyProfile::new(50, 20),
            FaultConfig::default(),
        )));

        // Update profile using async method; the Result must not be
        // silently discarded (unused_must_use), so unwrap it.
        let new_profile = LatencyProfile::new(200, 50);
        LatencyInjector::update_profile_async(&injector, new_profile).await.unwrap();

        // Verify it still works
        assert!(injector.read().await.is_enabled());
    }

    #[test]
    fn test_latency_profile_default() {
        let profile = LatencyProfile::default();
        assert_eq!(profile.base_ms, 50);
        assert_eq!(profile.jitter_ms, 20);
        assert_eq!(profile.min_ms, 0);
        assert!(profile.max_ms.is_none());
        assert!(matches!(profile.distribution, LatencyDistribution::Fixed));
    }

    #[test]
    fn test_latency_profile_new() {
        let profile = LatencyProfile::new(100, 25);
        assert_eq!(profile.base_ms, 100);
        assert_eq!(profile.jitter_ms, 25);
        assert!(matches!(profile.distribution, LatencyDistribution::Fixed));
    }

    #[test]
    fn test_latency_profile_normal_distribution() {
        let profile = LatencyProfile::with_normal_distribution(100, 20.0);
        assert_eq!(profile.base_ms, 100);
        assert!(matches!(profile.distribution, LatencyDistribution::Normal));
        assert_eq!(profile.std_dev_ms, Some(20.0));
    }

    #[test]
    fn test_latency_profile_pareto_distribution() {
        let profile = LatencyProfile::with_pareto_distribution(100, 2.5);
        assert_eq!(profile.base_ms, 100);
        assert!(matches!(profile.distribution, LatencyDistribution::Pareto));
        assert_eq!(profile.pareto_shape, Some(2.5));
    }

    #[test]
    fn test_latency_profile_with_tag_override() {
        let profile = LatencyProfile::default()
            .with_tag_override("slow".to_string(), 500)
            .with_tag_override("fast".to_string(), 10);

        assert_eq!(profile.tag_overrides.get("slow"), Some(&500));
        assert_eq!(profile.tag_overrides.get("fast"), Some(&10));
    }

    #[test]
    fn test_latency_profile_with_bounds() {
        let profile = LatencyProfile::default().with_min_ms(10).with_max_ms(1000);

        assert_eq!(profile.min_ms, 10);
        assert_eq!(profile.max_ms, Some(1000));
    }

    #[test]
    fn test_calculate_latency_with_tag_override() {
        let profile = LatencyProfile::default().with_tag_override("slow".to_string(), 500);

        let tags = vec!["slow".to_string()];
        let latency = profile.calculate_latency(&tags);
        assert_eq!(latency, Duration::from_millis(500));
    }

    #[test]
    fn test_calculate_latency_fixed_distribution() {
        // With zero jitter, fixed latency is deterministic
        let profile = LatencyProfile::new(100, 0);
        let tags = Vec::new();
        let latency = profile.calculate_latency(&tags);
        assert_eq!(latency, Duration::from_millis(100));
    }

    #[test]
    fn test_calculate_latency_respects_min_bound() {
        let profile = LatencyProfile::new(10, 0).with_min_ms(50);
        let tags = Vec::new();
        let latency = profile.calculate_latency(&tags);
        assert!(latency >= Duration::from_millis(50));
    }

    #[test]
    fn test_calculate_latency_respects_max_bound() {
        let profile = LatencyProfile::with_pareto_distribution(100, 2.0).with_max_ms(200);

        for _ in 0..100 {
            let latency = profile.calculate_latency(&[]);
            assert!(latency <= Duration::from_millis(200));
        }
    }

    #[test]
    fn test_fault_config_default() {
        let config = FaultConfig::default();
        assert_eq!(config.failure_rate, 0.0);
        assert!(!config.status_codes.is_empty());
        assert!(config.error_responses.is_empty());
    }

    #[test]
    fn test_fault_config_new() {
        let config = FaultConfig::new(0.5);
        assert_eq!(config.failure_rate, 0.5);
    }

    #[test]
    fn test_fault_config_clamps_failure_rate() {
        let config = FaultConfig::new(1.5);
        assert_eq!(config.failure_rate, 1.0);

        let config = FaultConfig::new(-0.5);
        assert_eq!(config.failure_rate, 0.0);
    }

    #[test]
    fn test_fault_config_with_status_code() {
        let config = FaultConfig::default().with_status_code(400).with_status_code(404);

        assert!(config.status_codes.contains(&400));
        assert!(config.status_codes.contains(&404));
    }

    #[test]
    fn test_fault_config_with_error_response() {
        let response = serde_json::json!({"error": "test"});
        let config =
            FaultConfig::default().with_error_response("test".to_string(), response.clone());

        assert_eq!(config.error_responses.get("test"), Some(&response));
    }

    #[test]
    fn test_fault_config_should_fail_zero_rate() {
        let config = FaultConfig::new(0.0);
        assert!(!config.should_fail());
    }

    #[test]
    fn test_fault_config_should_fail_full_rate() {
        let config = FaultConfig::new(1.0);
        assert!(config.should_fail());
    }

    #[test]
    fn test_fault_config_should_fail_probabilistic() {
        let config = FaultConfig::new(0.5);
        let mut failures = 0;
        let iterations = 1000;

        for _ in 0..iterations {
            if config.should_fail() {
                failures += 1;
            }
        }

        // Should be roughly 50% with some tolerance (0.4..0.6 is >6 sigma
        // for 1000 Bernoulli(0.5) trials, so this is effectively non-flaky)
        let failure_rate = failures as f64 / iterations as f64;
        assert!(failure_rate > 0.4 && failure_rate < 0.6);
    }

    #[test]
    fn test_fault_config_get_failure_response() {
        let config = FaultConfig::new(1.0).with_status_code(502);

        let (status, _) = config.get_failure_response();
        assert!(config.status_codes.contains(&status));
    }

    #[test]
    fn test_latency_injector_new() {
        let injector = LatencyInjector::new(LatencyProfile::default(), FaultConfig::default());
        assert!(injector.is_enabled());
    }

    #[test]
    fn test_latency_injector_enable_disable() {
        let mut injector = LatencyInjector::default();
        assert!(injector.is_enabled());

        injector.set_enabled(false);
        assert!(!injector.is_enabled());

        injector.set_enabled(true);
        assert!(injector.is_enabled());
    }

    #[tokio::test]
    async fn test_latency_injector_inject_latency() {
        let injector = LatencyInjector::new(LatencyProfile::new(10, 0), FaultConfig::default());

        let start = std::time::Instant::now();
        injector.inject_latency(&[]).await.unwrap();
        let elapsed = start.elapsed();

        assert!(elapsed >= Duration::from_millis(8));
    }

    #[tokio::test]
    async fn test_latency_injector_disabled_no_latency() {
        let mut injector =
            LatencyInjector::new(LatencyProfile::new(100, 0), FaultConfig::default());
        injector.set_enabled(false);

        let start = std::time::Instant::now();
        injector.inject_latency(&[]).await.unwrap();
        let elapsed = start.elapsed();

        assert!(elapsed < Duration::from_millis(10));
    }

    #[test]
    fn test_latency_injector_should_inject_failure() {
        let injector = LatencyInjector::new(LatencyProfile::default(), FaultConfig::new(1.0));

        assert!(injector.should_inject_failure());
    }

    #[test]
    fn test_latency_injector_disabled_no_failure() {
        let mut injector = LatencyInjector::new(LatencyProfile::default(), FaultConfig::new(1.0));
        injector.set_enabled(false);

        assert!(!injector.should_inject_failure());
    }

    #[tokio::test]
    async fn test_latency_injector_process_request_no_failure() {
        let injector = LatencyInjector::new(LatencyProfile::new(10, 0), FaultConfig::new(0.0));

        let result = injector.process_request(&[]).await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_latency_injector_process_request_with_failure() {
        let fault_config = FaultConfig {
            failure_rate: 1.0,
            status_codes: vec![503], // Set to only one status code
            ..Default::default()
        };

        let injector = LatencyInjector::new(LatencyProfile::new(10, 0), fault_config);

        let result = injector.process_request(&[]).await.unwrap();
        assert!(result.is_some());

        let (status, _) = result.unwrap();
        assert_eq!(status, 503);
    }

    #[tokio::test]
    async fn test_latency_injector_process_request_disabled() {
        let mut injector = LatencyInjector::new(LatencyProfile::new(100, 0), FaultConfig::new(1.0));
        injector.set_enabled(false);

        let result = injector.process_request(&[]).await.unwrap();
        assert!(result.is_none());
    }
}