mockforge_recorder/
sync_traffic.rs1use crate::sync::{DetectedChange, TrafficAwareConfig};
7use chrono::{DateTime, Duration, Utc};
8use std::collections::HashMap;
9
10#[derive(Debug, Clone)]
12pub struct EndpointUsageStats {
13 pub endpoint: String,
15 pub method: String,
17 pub total_requests: u64,
19 pub last_used_at: Option<DateTime<Utc>>,
21 pub unique_consumers: usize,
23}
24
25#[derive(Debug, Clone)]
27pub struct EndpointPriority {
28 pub endpoint: String,
30 pub method: String,
32 pub score: f64,
34 pub request_count: u64,
36 pub recency_score: f64,
38 pub reality_ratio: f64,
40}
41
42pub struct TrafficAnalyzer {
44 config: TrafficAwareConfig,
45}
46
47impl TrafficAnalyzer {
48 pub fn new(config: TrafficAwareConfig) -> Self {
50 Self { config }
51 }
52
53 pub async fn aggregate_usage_stats_from_db(
57 &self,
58 database: &crate::database::RecorderDatabase,
59 ) -> HashMap<String, EndpointUsageStats> {
60 let mut aggregated: HashMap<String, EndpointUsageStats> = HashMap::new();
61 let cutoff_time = Utc::now() - chrono::Duration::days(self.config.lookback_days as i64);
62
63 if let Ok(requests) = database.list_recent(10000).await {
65 for request in requests {
66 let key = format!("{} {}", request.method, request.path);
67
68 let request_time = if request.timestamp >= cutoff_time {
70 Some(request.timestamp)
71 } else {
72 None
73 };
74
75 let stats = aggregated.entry(key.clone()).or_insert_with(|| EndpointUsageStats {
76 endpoint: request.path.clone(),
77 method: request.method.clone(),
78 total_requests: 0,
79 last_used_at: None,
80 unique_consumers: 0,
81 });
82
83 stats.total_requests += 1;
84
85 if let Some(rt) = request_time {
87 if stats.last_used_at.map_or(true, |last| rt > last) {
88 stats.last_used_at = Some(rt);
89 }
90 }
91 }
92 }
93
94 aggregated
95 }
96
97 pub fn calculate_priorities(
99 &self,
100 usage_stats: &HashMap<String, EndpointUsageStats>,
101 reality_ratios: &HashMap<String, f64>,
102 ) -> Vec<EndpointPriority> {
103 let mut priorities = Vec::new();
104 let now = Utc::now();
105 let lookback_duration = Duration::days(self.config.lookback_days as i64);
106 let cutoff_time = now - lookback_duration;
107
108 for (_key, stats) in usage_stats {
109 if let Some(last_used) = stats.last_used_at {
111 if last_used < cutoff_time {
112 continue;
113 }
114 }
115
116 let reality_ratio = reality_ratios
118 .get(&format!("{} {}", stats.method, stats.endpoint))
119 .copied()
120 .unwrap_or(0.0);
121
122 if !self.config.sync_real_endpoints && reality_ratio > 0.7 {
124 continue;
125 }
126
127 let recency_score = if let Some(last_used) = stats.last_used_at {
129 let age_seconds = (now - last_used).num_seconds().max(0) as f64;
130 let lookback_seconds = lookback_duration.num_seconds() as f64;
131 (1.0 - (age_seconds / lookback_seconds)).max(0.0).min(1.0)
132 } else {
133 0.0
134 };
135
136 let score = (stats.total_requests as f64 * self.config.weight_count)
138 + (recency_score * self.config.weight_recency)
139 + (reality_ratio * self.config.weight_reality);
140
141 priorities.push(EndpointPriority {
142 endpoint: stats.endpoint.clone(),
143 method: stats.method.clone(),
144 score,
145 request_count: stats.total_requests,
146 recency_score,
147 reality_ratio,
148 });
149 }
150
151 priorities
153 .sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap_or(std::cmp::Ordering::Equal));
154
155 priorities
156 }
157
158 pub fn filter_changes(
160 &self,
161 changes: &[DetectedChange],
162 priorities: &[EndpointPriority],
163 ) -> Vec<DetectedChange> {
164 if !self.config.enabled {
165 return changes.to_vec();
166 }
167
168 let mut prioritized_endpoints = std::collections::HashSet::new();
170
171 let filtered_priorities: Vec<&EndpointPriority> =
173 if let Some(min_requests) = self.config.min_requests_threshold {
174 priorities.iter().filter(|p| p.request_count >= min_requests as u64).collect()
175 } else {
176 priorities.iter().collect()
177 };
178
179 let selected_priorities: Vec<&EndpointPriority> =
181 if let Some(top_pct) = self.config.top_percentage {
182 let count =
183 ((filtered_priorities.len() as f64 * top_pct / 100.0).ceil() as usize).max(1);
184 filtered_priorities.into_iter().take(count).collect()
185 } else {
186 filtered_priorities
187 };
188
189 for priority in selected_priorities {
191 prioritized_endpoints.insert(format!("{} {}", priority.method, priority.endpoint));
192 }
193
194 changes
196 .iter()
197 .filter(|change| {
198 let key = format!("{} {}", change.method, change.path);
199 prioritized_endpoints.contains(&key)
200 })
201 .cloned()
202 .collect()
203 }
204
205 pub async fn get_reality_ratios(
207 &self,
208 endpoints: &[(&str, &str)],
209 continuum_engine: Option<
210 &mockforge_core::reality_continuum::engine::RealityContinuumEngine,
211 >,
212 ) -> HashMap<String, f64> {
213 let mut ratios = HashMap::new();
214
215 if let Some(engine) = continuum_engine {
216 for (method, endpoint) in endpoints {
217 let ratio = engine.get_blend_ratio(endpoint).await;
218 ratios.insert(format!("{} {}", method, endpoint), ratio);
219 }
220 } else {
221 for (method, endpoint) in endpoints {
223 ratios.insert(format!("{} {}", method, endpoint), 0.0);
224 }
225 }
226
227 ratios
228 }
229}