Skip to main content

mockforge_recorder/
sync_traffic.rs

1//! Traffic-aware sync filtering
2//!
3//! This module provides functionality to filter and prioritize endpoints for sync
4//! based on usage statistics and Reality Continuum blend ratios.
5
6use crate::sync::{DetectedChange, TrafficAwareConfig};
7use chrono::{DateTime, Duration, Utc};
8use std::collections::HashMap;
9
10/// Endpoint usage statistics aggregated across all consumers
11#[derive(Debug, Clone)]
12pub struct EndpointUsageStats {
13    /// Endpoint path
14    pub endpoint: String,
15    /// HTTP method
16    pub method: String,
17    /// Total request count across all consumers
18    pub total_requests: u64,
19    /// Last usage timestamp
20    pub last_used_at: Option<DateTime<Utc>>,
21    /// Number of unique consumers
22    pub unique_consumers: usize,
23}
24
/// Priority score for an endpoint.
///
/// `score` is a weighted sum of `request_count`, `recency_score` and `reality_ratio`
/// (weights come from the traffic-aware configuration); higher scores are synced first.
#[derive(Debug, Clone, PartialEq)]
pub struct EndpointPriority {
    /// Endpoint path
    pub endpoint: String,
    /// HTTP method
    pub method: String,
    /// Priority score (higher = more important)
    pub score: f64,
    /// Request count
    pub request_count: u64,
    /// Recency score (0.0 to 1.0, 1.0 = most recent)
    pub recency_score: f64,
    /// Reality ratio (0.0 = all mock, 1.0 = all real)
    pub reality_ratio: f64,
}
41
42/// Traffic analyzer for sync operations
43pub struct TrafficAnalyzer {
44    config: TrafficAwareConfig,
45}
46
47impl TrafficAnalyzer {
48    /// Create a new traffic analyzer
49    pub fn new(config: TrafficAwareConfig) -> Self {
50        Self { config }
51    }
52
53    /// Aggregate usage statistics from database requests
54    ///
55    /// This aggregates usage from recorded requests in the database
56    pub async fn aggregate_usage_stats_from_db(
57        &self,
58        database: &crate::database::RecorderDatabase,
59    ) -> HashMap<String, EndpointUsageStats> {
60        let mut aggregated: HashMap<String, EndpointUsageStats> = HashMap::new();
61        let cutoff_time = Utc::now() - Duration::days(self.config.lookback_days as i64);
62
63        // Get recent requests from database
64        if let Ok(requests) = database.list_recent(10000).await {
65            for request in requests {
66                let key = format!("{} {}", request.method, request.path);
67
68                // Use timestamp directly (it's already DateTime<Utc>)
69                let request_time = if request.timestamp >= cutoff_time {
70                    Some(request.timestamp)
71                } else {
72                    None
73                };
74
75                let stats = aggregated.entry(key.clone()).or_insert_with(|| EndpointUsageStats {
76                    endpoint: request.path.clone(),
77                    method: request.method.clone(),
78                    total_requests: 0,
79                    last_used_at: None,
80                    unique_consumers: 0,
81                });
82
83                stats.total_requests += 1;
84
85                // Update last used time if this is more recent
86                if let Some(rt) = request_time {
87                    if stats.last_used_at.is_none_or(|last| rt > last) {
88                        stats.last_used_at = Some(rt);
89                    }
90                }
91            }
92        }
93
94        aggregated
95    }
96
97    /// Calculate priority scores for endpoints
98    pub fn calculate_priorities(
99        &self,
100        usage_stats: &HashMap<String, EndpointUsageStats>,
101        reality_ratios: &HashMap<String, f64>,
102    ) -> Vec<EndpointPriority> {
103        let mut priorities = Vec::new();
104        let now = Utc::now();
105        let lookback_duration = Duration::days(self.config.lookback_days as i64);
106        let cutoff_time = now - lookback_duration;
107
108        for stats in usage_stats.values() {
109            // Skip if last used before lookback window
110            if let Some(last_used) = stats.last_used_at {
111                if last_used < cutoff_time {
112                    continue;
113                }
114            }
115
116            // Get reality ratio (default to 0.0 if not found)
117            let reality_ratio = reality_ratios
118                .get(&format!("{} {}", stats.method, stats.endpoint))
119                .copied()
120                .unwrap_or(0.0);
121
122            // Skip endpoints with high reality ratio if configured
123            if !self.config.sync_real_endpoints && reality_ratio > 0.7 {
124                continue;
125            }
126
127            // Calculate recency score (0.0 to 1.0)
128            let recency_score = if let Some(last_used) = stats.last_used_at {
129                let age_seconds = (now - last_used).num_seconds().max(0) as f64;
130                let lookback_seconds = lookback_duration.num_seconds() as f64;
131                (1.0 - (age_seconds / lookback_seconds)).clamp(0.0, 1.0)
132            } else {
133                0.0
134            };
135
136            // Calculate priority score
137            let score = (stats.total_requests as f64 * self.config.weight_count)
138                + (recency_score * self.config.weight_recency)
139                + (reality_ratio * self.config.weight_reality);
140
141            priorities.push(EndpointPriority {
142                endpoint: stats.endpoint.clone(),
143                method: stats.method.clone(),
144                score,
145                request_count: stats.total_requests,
146                recency_score,
147                reality_ratio,
148            });
149        }
150
151        // Sort by priority score (descending)
152        priorities
153            .sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap_or(std::cmp::Ordering::Equal));
154
155        priorities
156    }
157
158    /// Filter changes based on traffic-aware configuration
159    pub fn filter_changes(
160        &self,
161        changes: &[DetectedChange],
162        priorities: &[EndpointPriority],
163    ) -> Vec<DetectedChange> {
164        if !self.config.enabled {
165            return changes.to_vec();
166        }
167
168        // Create a set of prioritized endpoints
169        let mut prioritized_endpoints = std::collections::HashSet::new();
170
171        // Apply threshold filters
172        let filtered_priorities: Vec<&EndpointPriority> =
173            if let Some(min_requests) = self.config.min_requests_threshold {
174                priorities.iter().filter(|p| p.request_count >= min_requests as u64).collect()
175            } else {
176                priorities.iter().collect()
177            };
178
179        // Apply top percentage filter
180        let selected_priorities: Vec<&EndpointPriority> =
181            if let Some(top_pct) = self.config.top_percentage {
182                let count =
183                    ((filtered_priorities.len() as f64 * top_pct / 100.0).ceil() as usize).max(1);
184                filtered_priorities.into_iter().take(count).collect()
185            } else {
186                filtered_priorities
187            };
188
189        // Build set of endpoints to sync
190        for priority in selected_priorities {
191            prioritized_endpoints.insert(format!("{} {}", priority.method, priority.endpoint));
192        }
193
194        // Filter changes to only include prioritized endpoints
195        changes
196            .iter()
197            .filter(|change| {
198                let key = format!("{} {}", change.method, change.path);
199                prioritized_endpoints.contains(&key)
200            })
201            .cloned()
202            .collect()
203    }
204
205    /// Get reality ratios for endpoints from Reality Continuum engine
206    pub async fn get_reality_ratios(
207        &self,
208        endpoints: &[(&str, &str)],
209        continuum_engine: Option<
210            &mockforge_core::reality_continuum::engine::RealityContinuumEngine,
211        >,
212    ) -> HashMap<String, f64> {
213        let mut ratios = HashMap::new();
214
215        if let Some(engine) = continuum_engine {
216            for (method, endpoint) in endpoints {
217                let ratio = engine.get_blend_ratio(endpoint).await;
218                ratios.insert(format!("{} {}", method, endpoint), ratio);
219            }
220        } else {
221            // Default to 0.0 (all mock) if no continuum engine
222            for (method, endpoint) in endpoints {
223                ratios.insert(format!("{} {}", method, endpoint), 0.0);
224            }
225        }
226
227        ratios
228    }
229}
230
#[cfg(test)]
mod tests {
    use super::*;
    use crate::database::RecorderDatabase;
    use crate::models::{Protocol, RecordedRequest};

    fn create_test_summary() -> crate::diff::ComparisonSummary {
        crate::diff::ComparisonSummary {
            total_differences: 0,
            added_fields: 0,
            removed_fields: 0,
            changed_fields: 0,
            type_changes: 0,
        }
    }

    fn create_test_comparison_result(matches: bool) -> crate::diff::ComparisonResult {
        crate::diff::ComparisonResult {
            matches,
            status_match: matches,
            headers_match: matches,
            body_match: matches,
            differences: vec![],
            summary: create_test_summary(),
        }
    }

    /// Builds a not-yet-applied change for `method path` whose comparison did not match.
    fn create_test_change(request_id: &str, method: &str, path: &str) -> DetectedChange {
        DetectedChange {
            request_id: request_id.to_string(),
            method: method.to_string(),
            path: path.to_string(),
            comparison: create_test_comparison_result(false),
            updated: false,
        }
    }

    /// The `GET /api/users` and `POST /api/posts` changes shared by the filter tests.
    fn create_test_changes() -> Vec<DetectedChange> {
        vec![
            create_test_change("req-1", "GET", "/api/users"),
            create_test_change("req-2", "POST", "/api/posts"),
        ]
    }

    /// Builds a priority entry with a reality ratio of 0.0.
    fn create_test_priority(
        method: &str,
        endpoint: &str,
        score: f64,
        request_count: u64,
        recency_score: f64,
    ) -> EndpointPriority {
        EndpointPriority {
            endpoint: endpoint.to_string(),
            method: method.to_string(),
            score,
            request_count,
            recency_score,
            reality_ratio: 0.0,
        }
    }

    fn create_test_traffic_config() -> TrafficAwareConfig {
        TrafficAwareConfig {
            enabled: true,
            min_requests_threshold: Some(5),
            top_percentage: Some(50.0),
            lookback_days: 7,
            sync_real_endpoints: false,
            weight_count: 1.0,
            weight_recency: 0.5,
            weight_reality: -0.3,
        }
    }

    /// Three endpoints: used 1 hour ago, 2 days ago, and 10 days ago (outside the 7-day window).
    fn create_test_usage_stats() -> HashMap<String, EndpointUsageStats> {
        let mut stats = HashMap::new();

        stats.insert(
            "GET /api/users".to_string(),
            EndpointUsageStats {
                endpoint: "/api/users".to_string(),
                method: "GET".to_string(),
                total_requests: 100,
                last_used_at: Some(Utc::now() - Duration::hours(1)),
                unique_consumers: 5,
            },
        );

        stats.insert(
            "POST /api/posts".to_string(),
            EndpointUsageStats {
                endpoint: "/api/posts".to_string(),
                method: "POST".to_string(),
                total_requests: 50,
                last_used_at: Some(Utc::now() - Duration::days(2)),
                unique_consumers: 3,
            },
        );

        stats.insert(
            "DELETE /api/old".to_string(),
            EndpointUsageStats {
                endpoint: "/api/old".to_string(),
                method: "DELETE".to_string(),
                total_requests: 10,
                last_used_at: Some(Utc::now() - Duration::days(10)),
                unique_consumers: 1,
            },
        );

        stats
    }

    #[test]
    fn test_traffic_analyzer_creation() {
        let analyzer = TrafficAnalyzer::new(create_test_traffic_config());

        // The analyzer must keep the configuration it was built with.
        assert!(analyzer.config.enabled);
        assert_eq!(analyzer.config.min_requests_threshold, Some(5));
        assert_eq!(analyzer.config.lookback_days, 7);
        assert!(!analyzer.config.sync_real_endpoints);
    }

    #[tokio::test]
    async fn test_aggregate_usage_stats_empty_db() {
        let analyzer = TrafficAnalyzer::new(create_test_traffic_config());

        let database = RecorderDatabase::new_in_memory().await.unwrap();
        let stats = analyzer.aggregate_usage_stats_from_db(&database).await;

        assert!(stats.is_empty());
    }

    #[tokio::test]
    async fn test_aggregate_usage_stats_with_requests() {
        let analyzer = TrafficAnalyzer::new(create_test_traffic_config());

        let database = RecorderDatabase::new_in_memory().await.unwrap();

        // Insert test requests
        for i in 0..5 {
            let request = RecordedRequest {
                id: format!("req-{}", i),
                protocol: Protocol::Http,
                timestamp: Utc::now(),
                method: "GET".to_string(),
                path: "/api/users".to_string(),
                query_params: None,
                headers: "{}".to_string(),
                body: None,
                body_encoding: "utf8".to_string(),
                client_ip: None,
                trace_id: None,
                span_id: None,
                duration_ms: Some(100),
                status_code: Some(200),
                tags: None,
            };
            database.insert_request(&request).await.unwrap();
        }

        let stats = analyzer.aggregate_usage_stats_from_db(&database).await;

        assert_eq!(stats.len(), 1);
        let user_stats = stats.get("GET /api/users").unwrap();
        assert_eq!(user_stats.total_requests, 5);
        assert_eq!(user_stats.endpoint, "/api/users");
        assert_eq!(user_stats.method, "GET");
    }

    #[test]
    fn test_calculate_priorities_basic() {
        let analyzer = TrafficAnalyzer::new(create_test_traffic_config());

        let priorities =
            analyzer.calculate_priorities(&create_test_usage_stats(), &HashMap::new());

        // Should return priorities sorted by score (descending)
        assert!(!priorities.is_empty());
        assert!(priorities.windows(2).all(|w| w[0].score >= w[1].score));
        assert_eq!(priorities[0].endpoint, "/api/users");
        assert!(priorities[0].score > 0.0);
    }

    #[test]
    fn test_calculate_priorities_with_reality_ratios() {
        let analyzer = TrafficAnalyzer::new(create_test_traffic_config());

        let mut reality_ratios = HashMap::new();
        reality_ratios.insert("GET /api/users".to_string(), 0.5);
        reality_ratios.insert("POST /api/posts".to_string(), 0.2);

        let priorities = analyzer.calculate_priorities(&create_test_usage_stats(), &reality_ratios);

        // Both ratios are below the high-reality cutoff, so both endpoints must be present.
        let users = priorities
            .iter()
            .find(|p| p.endpoint == "/api/users")
            .expect("/api/users should be prioritised");
        assert_eq!(users.reality_ratio, 0.5);

        let posts = priorities
            .iter()
            .find(|p| p.endpoint == "/api/posts")
            .expect("/api/posts should be prioritised");
        assert_eq!(posts.reality_ratio, 0.2);
    }

    #[test]
    fn test_calculate_priorities_filters_old_endpoints() {
        let analyzer = TrafficAnalyzer::new(create_test_traffic_config());

        let priorities =
            analyzer.calculate_priorities(&create_test_usage_stats(), &HashMap::new());

        // Old endpoint (10 days ago, beyond lookback window) should be filtered
        assert!(!priorities.iter().any(|p| p.endpoint == "/api/old"));
    }

    #[test]
    fn test_calculate_priorities_filters_high_reality_ratio() {
        let mut config = create_test_traffic_config();
        config.sync_real_endpoints = false;
        let analyzer = TrafficAnalyzer::new(config);

        let mut reality_ratios = HashMap::new();
        reality_ratios.insert("GET /api/users".to_string(), 0.9); // High reality ratio

        let priorities = analyzer.calculate_priorities(&create_test_usage_stats(), &reality_ratios);

        // Endpoint with high reality ratio should be filtered
        assert!(!priorities.iter().any(|p| p.endpoint == "/api/users"));
    }

    #[test]
    fn test_calculate_priorities_includes_high_reality_when_enabled() {
        let mut config = create_test_traffic_config();
        config.sync_real_endpoints = true;
        let analyzer = TrafficAnalyzer::new(config);

        let mut reality_ratios = HashMap::new();
        reality_ratios.insert("GET /api/users".to_string(), 0.9);

        let priorities = analyzer.calculate_priorities(&create_test_usage_stats(), &reality_ratios);

        // Should include high reality ratio when enabled
        assert!(priorities.iter().any(|p| p.endpoint == "/api/users"));
    }

    #[test]
    fn test_calculate_priorities_recency_score() {
        let analyzer = TrafficAnalyzer::new(create_test_traffic_config());

        let priorities =
            analyzer.calculate_priorities(&create_test_usage_stats(), &HashMap::new());

        // More recent endpoint should have higher recency score
        let users_priority = priorities.iter().find(|p| p.endpoint == "/api/users").unwrap();
        let posts_priority = priorities.iter().find(|p| p.endpoint == "/api/posts").unwrap();

        assert!(users_priority.recency_score > posts_priority.recency_score);
    }

    #[test]
    fn test_filter_changes_disabled() {
        let mut config = create_test_traffic_config();
        config.enabled = false;
        let analyzer = TrafficAnalyzer::new(config);

        let changes = create_test_changes();
        let filtered = analyzer.filter_changes(&changes, &[]);

        // Should return all changes when disabled
        assert_eq!(filtered.len(), 2);
    }

    #[test]
    fn test_filter_changes_with_min_requests() {
        let analyzer = TrafficAnalyzer::new(create_test_traffic_config());

        let changes = create_test_changes();
        let priorities = vec![
            create_test_priority("GET", "/api/users", 100.0, 100, 0.9),
            // Below min_requests_threshold of 5
            create_test_priority("POST", "/api/posts", 50.0, 3, 0.5),
        ];

        let filtered = analyzer.filter_changes(&changes, &priorities);

        // Only /api/users should pass the threshold
        assert_eq!(filtered.len(), 1);
        assert_eq!(filtered[0].path, "/api/users");
    }

    #[test]
    fn test_filter_changes_with_top_percentage() {
        let analyzer = TrafficAnalyzer::new(create_test_traffic_config());

        let changes = create_test_changes();
        let priorities = vec![
            create_test_priority("GET", "/api/users", 100.0, 100, 0.9),
            create_test_priority("POST", "/api/posts", 50.0, 50, 0.5),
        ];

        let filtered = analyzer.filter_changes(&changes, &priorities);

        // With 50% top percentage, should get top 1 endpoint
        assert_eq!(filtered.len(), 1);
        assert_eq!(filtered[0].path, "/api/users");
    }

    #[test]
    fn test_filter_changes_empty_priorities() {
        let analyzer = TrafficAnalyzer::new(create_test_traffic_config());

        let changes = vec![create_test_change("req-1", "GET", "/api/users")];
        let filtered = analyzer.filter_changes(&changes, &[]);

        // No priorities means nothing passes filter
        assert!(filtered.is_empty());
    }

    #[test]
    fn test_endpoint_usage_stats_creation() {
        let stats = EndpointUsageStats {
            endpoint: "/api/test".to_string(),
            method: "GET".to_string(),
            total_requests: 42,
            last_used_at: Some(Utc::now()),
            unique_consumers: 3,
        };

        assert_eq!(stats.endpoint, "/api/test");
        assert_eq!(stats.method, "GET");
        assert_eq!(stats.total_requests, 42);
        assert_eq!(stats.unique_consumers, 3);
    }

    #[test]
    fn test_endpoint_priority_creation() {
        let priority = EndpointPriority {
            endpoint: "/api/test".to_string(),
            method: "POST".to_string(),
            score: 75.5,
            request_count: 100,
            recency_score: 0.8,
            reality_ratio: 0.3,
        };

        assert_eq!(priority.endpoint, "/api/test");
        assert_eq!(priority.method, "POST");
        assert_eq!(priority.score, 75.5);
        assert_eq!(priority.request_count, 100);
        assert_eq!(priority.recency_score, 0.8);
        assert_eq!(priority.reality_ratio, 0.3);
    }

    #[test]
    fn test_traffic_aware_config_serialization() {
        let config = create_test_traffic_config();
        let json = serde_json::to_string(&config).unwrap();

        assert!(json.contains("enabled"));
        assert!(json.contains("min_requests_threshold"));
        assert!(json.contains("top_percentage"));
    }

    #[test]
    fn test_traffic_aware_config_deserialization() {
        let json = r#"{
            "enabled": true,
            "min_requests_threshold": 10,
            "top_percentage": 75.0,
            "lookback_days": 14,
            "sync_real_endpoints": true,
            "weight_count": 2.0,
            "weight_recency": 1.0,
            "weight_reality": 0.5
        }"#;

        let config: TrafficAwareConfig = serde_json::from_str(json).unwrap();
        assert!(config.enabled);
        assert_eq!(config.min_requests_threshold, Some(10));
        assert_eq!(config.top_percentage, Some(75.0));
        assert_eq!(config.lookback_days, 14);
        assert!(config.sync_real_endpoints);
    }

    #[test]
    fn test_traffic_aware_config_defaults() {
        let json = r#"{
            "enabled": true
        }"#;

        let config: TrafficAwareConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.lookback_days, 7);
        assert_eq!(config.weight_count, 1.0);
        assert_eq!(config.weight_recency, 0.5);
        assert_eq!(config.weight_reality, -0.3);
        assert!(!config.sync_real_endpoints);
    }
}