mockforge_recorder/
sync_traffic.rs

//! Traffic-aware sync filtering
//!
//! This module provides functionality to filter and prioritize endpoints for sync
//! based on usage statistics and Reality Continuum blend ratios.
5
6use crate::sync::{DetectedChange, TrafficAwareConfig};
7use chrono::{DateTime, Duration, Utc};
8use std::collections::HashMap;
9
10/// Endpoint usage statistics aggregated across all consumers
11#[derive(Debug, Clone)]
12pub struct EndpointUsageStats {
13    /// Endpoint path
14    pub endpoint: String,
15    /// HTTP method
16    pub method: String,
17    /// Total request count across all consumers
18    pub total_requests: u64,
19    /// Last usage timestamp
20    pub last_used_at: Option<DateTime<Utc>>,
21    /// Number of unique consumers
22    pub unique_consumers: usize,
23}
24
/// Sync ranking computed for one endpoint; a larger `score` means sync it sooner.
#[derive(Debug, Clone)]
pub struct EndpointPriority {
    /// Request path identifying the endpoint.
    pub endpoint: String,
    /// HTTP method identifying the endpoint.
    pub method: String,
    /// Weighted priority; higher values rank first.
    pub score: f64,
    /// Number of requests that fed into the score.
    pub request_count: u64,
    /// Freshness in `0.0..=1.0`, where 1.0 means "used most recently".
    pub recency_score: f64,
    /// Reality Continuum blend ratio: 0.0 is fully mocked, 1.0 is fully real.
    pub reality_ratio: f64,
}
41
42/// Traffic analyzer for sync operations
43pub struct TrafficAnalyzer {
44    config: TrafficAwareConfig,
45}
46
47impl TrafficAnalyzer {
48    /// Create a new traffic analyzer
49    pub fn new(config: TrafficAwareConfig) -> Self {
50        Self { config }
51    }
52
53    /// Aggregate usage statistics from database requests
54    ///
55    /// This aggregates usage from recorded requests in the database
56    pub async fn aggregate_usage_stats_from_db(
57        &self,
58        database: &crate::database::RecorderDatabase,
59    ) -> HashMap<String, EndpointUsageStats> {
60        let mut aggregated: HashMap<String, EndpointUsageStats> = HashMap::new();
61        let cutoff_time = Utc::now() - chrono::Duration::days(self.config.lookback_days as i64);
62
63        // Get recent requests from database
64        if let Ok(requests) = database.list_recent(10000).await {
65            for request in requests {
66                let key = format!("{} {}", request.method, request.path);
67
68                // Use timestamp directly (it's already DateTime<Utc>)
69                let request_time = if request.timestamp >= cutoff_time {
70                    Some(request.timestamp)
71                } else {
72                    None
73                };
74
75                let stats = aggregated.entry(key.clone()).or_insert_with(|| EndpointUsageStats {
76                    endpoint: request.path.clone(),
77                    method: request.method.clone(),
78                    total_requests: 0,
79                    last_used_at: None,
80                    unique_consumers: 0,
81                });
82
83                stats.total_requests += 1;
84
85                // Update last used time if this is more recent
86                if let Some(rt) = request_time {
87                    if stats.last_used_at.map_or(true, |last| rt > last) {
88                        stats.last_used_at = Some(rt);
89                    }
90                }
91            }
92        }
93
94        aggregated
95    }
96
97    /// Calculate priority scores for endpoints
98    pub fn calculate_priorities(
99        &self,
100        usage_stats: &HashMap<String, EndpointUsageStats>,
101        reality_ratios: &HashMap<String, f64>,
102    ) -> Vec<EndpointPriority> {
103        let mut priorities = Vec::new();
104        let now = Utc::now();
105        let lookback_duration = Duration::days(self.config.lookback_days as i64);
106        let cutoff_time = now - lookback_duration;
107
108        for (_key, stats) in usage_stats {
109            // Skip if last used before lookback window
110            if let Some(last_used) = stats.last_used_at {
111                if last_used < cutoff_time {
112                    continue;
113                }
114            }
115
116            // Get reality ratio (default to 0.0 if not found)
117            let reality_ratio = reality_ratios
118                .get(&format!("{} {}", stats.method, stats.endpoint))
119                .copied()
120                .unwrap_or(0.0);
121
122            // Skip endpoints with high reality ratio if configured
123            if !self.config.sync_real_endpoints && reality_ratio > 0.7 {
124                continue;
125            }
126
127            // Calculate recency score (0.0 to 1.0)
128            let recency_score = if let Some(last_used) = stats.last_used_at {
129                let age_seconds = (now - last_used).num_seconds().max(0) as f64;
130                let lookback_seconds = lookback_duration.num_seconds() as f64;
131                (1.0 - (age_seconds / lookback_seconds)).max(0.0).min(1.0)
132            } else {
133                0.0
134            };
135
136            // Calculate priority score
137            let score = (stats.total_requests as f64 * self.config.weight_count)
138                + (recency_score * self.config.weight_recency)
139                + (reality_ratio * self.config.weight_reality);
140
141            priorities.push(EndpointPriority {
142                endpoint: stats.endpoint.clone(),
143                method: stats.method.clone(),
144                score,
145                request_count: stats.total_requests,
146                recency_score,
147                reality_ratio,
148            });
149        }
150
151        // Sort by priority score (descending)
152        priorities
153            .sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap_or(std::cmp::Ordering::Equal));
154
155        priorities
156    }
157
158    /// Filter changes based on traffic-aware configuration
159    pub fn filter_changes(
160        &self,
161        changes: &[DetectedChange],
162        priorities: &[EndpointPriority],
163    ) -> Vec<DetectedChange> {
164        if !self.config.enabled {
165            return changes.to_vec();
166        }
167
168        // Create a set of prioritized endpoints
169        let mut prioritized_endpoints = std::collections::HashSet::new();
170
171        // Apply threshold filters
172        let filtered_priorities: Vec<&EndpointPriority> =
173            if let Some(min_requests) = self.config.min_requests_threshold {
174                priorities.iter().filter(|p| p.request_count >= min_requests as u64).collect()
175            } else {
176                priorities.iter().collect()
177            };
178
179        // Apply top percentage filter
180        let selected_priorities: Vec<&EndpointPriority> =
181            if let Some(top_pct) = self.config.top_percentage {
182                let count =
183                    ((filtered_priorities.len() as f64 * top_pct / 100.0).ceil() as usize).max(1);
184                filtered_priorities.into_iter().take(count).collect()
185            } else {
186                filtered_priorities
187            };
188
189        // Build set of endpoints to sync
190        for priority in selected_priorities {
191            prioritized_endpoints.insert(format!("{} {}", priority.method, priority.endpoint));
192        }
193
194        // Filter changes to only include prioritized endpoints
195        changes
196            .iter()
197            .filter(|change| {
198                let key = format!("{} {}", change.method, change.path);
199                prioritized_endpoints.contains(&key)
200            })
201            .cloned()
202            .collect()
203    }
204
205    /// Get reality ratios for endpoints from Reality Continuum engine
206    pub async fn get_reality_ratios(
207        &self,
208        endpoints: &[(&str, &str)],
209        continuum_engine: Option<
210            &mockforge_core::reality_continuum::engine::RealityContinuumEngine,
211        >,
212    ) -> HashMap<String, f64> {
213        let mut ratios = HashMap::new();
214
215        if let Some(engine) = continuum_engine {
216            for (method, endpoint) in endpoints {
217                let ratio = engine.get_blend_ratio(endpoint).await;
218                ratios.insert(format!("{} {}", method, endpoint), ratio);
219            }
220        } else {
221            // Default to 0.0 (all mock) if no continuum engine
222            for (method, endpoint) in endpoints {
223                ratios.insert(format!("{} {}", method, endpoint), 0.0);
224            }
225        }
226
227        ratios
228    }
229}