1use crate::sync::{DetectedChange, TrafficAwareConfig};
7use chrono::{DateTime, Duration, Utc};
8use std::collections::HashMap;
9
10#[derive(Debug, Clone)]
12pub struct EndpointUsageStats {
13 pub endpoint: String,
15 pub method: String,
17 pub total_requests: u64,
19 pub last_used_at: Option<DateTime<Utc>>,
21 pub unique_consumers: usize,
23}
24
/// Priority assigned to a single endpoint for traffic-aware sync.
///
/// `PartialEq` is derived so whole values can be compared directly; `Eq` is
/// intentionally not derived because the struct contains `f64` fields.
#[derive(Debug, Clone, PartialEq)]
pub struct EndpointPriority {
    /// Request path of the endpoint.
    pub endpoint: String,
    /// HTTP method of the endpoint.
    pub method: String,
    /// Combined priority score; higher scores sort first.
    pub score: f64,
    /// Number of requests that contributed to the score.
    pub request_count: u64,
    /// Recency component of the score.
    pub recency_score: f64,
    /// Reality (blend) ratio associated with the endpoint.
    pub reality_ratio: f64,
}
41
/// Analyzer that ranks endpoints by observed traffic and filters detected
/// changes down to the endpoints worth syncing.
pub struct TrafficAnalyzer {
    /// Settings read by the analyzer's methods (lookback window, request
    /// threshold, top percentage and score weights).
    config: TrafficAwareConfig,
}
46
47impl TrafficAnalyzer {
48 pub fn new(config: TrafficAwareConfig) -> Self {
50 Self { config }
51 }
52
53 pub async fn aggregate_usage_stats_from_db(
57 &self,
58 database: &crate::database::RecorderDatabase,
59 ) -> HashMap<String, EndpointUsageStats> {
60 let mut aggregated: HashMap<String, EndpointUsageStats> = HashMap::new();
61 let cutoff_time = Utc::now() - Duration::days(self.config.lookback_days as i64);
62
63 if let Ok(requests) = database.list_recent(10000).await {
65 for request in requests {
66 let key = format!("{} {}", request.method, request.path);
67
68 let request_time = if request.timestamp >= cutoff_time {
70 Some(request.timestamp)
71 } else {
72 None
73 };
74
75 let stats = aggregated.entry(key.clone()).or_insert_with(|| EndpointUsageStats {
76 endpoint: request.path.clone(),
77 method: request.method.clone(),
78 total_requests: 0,
79 last_used_at: None,
80 unique_consumers: 0,
81 });
82
83 stats.total_requests += 1;
84
85 if let Some(rt) = request_time {
87 if stats.last_used_at.is_none_or(|last| rt > last) {
88 stats.last_used_at = Some(rt);
89 }
90 }
91 }
92 }
93
94 aggregated
95 }
96
97 pub fn calculate_priorities(
99 &self,
100 usage_stats: &HashMap<String, EndpointUsageStats>,
101 reality_ratios: &HashMap<String, f64>,
102 ) -> Vec<EndpointPriority> {
103 let mut priorities = Vec::new();
104 let now = Utc::now();
105 let lookback_duration = Duration::days(self.config.lookback_days as i64);
106 let cutoff_time = now - lookback_duration;
107
108 for stats in usage_stats.values() {
109 if let Some(last_used) = stats.last_used_at {
111 if last_used < cutoff_time {
112 continue;
113 }
114 }
115
116 let reality_ratio = reality_ratios
118 .get(&format!("{} {}", stats.method, stats.endpoint))
119 .copied()
120 .unwrap_or(0.0);
121
122 if !self.config.sync_real_endpoints && reality_ratio > 0.7 {
124 continue;
125 }
126
127 let recency_score = if let Some(last_used) = stats.last_used_at {
129 let age_seconds = (now - last_used).num_seconds().max(0) as f64;
130 let lookback_seconds = lookback_duration.num_seconds() as f64;
131 (1.0 - (age_seconds / lookback_seconds)).clamp(0.0, 1.0)
132 } else {
133 0.0
134 };
135
136 let score = (stats.total_requests as f64 * self.config.weight_count)
138 + (recency_score * self.config.weight_recency)
139 + (reality_ratio * self.config.weight_reality);
140
141 priorities.push(EndpointPriority {
142 endpoint: stats.endpoint.clone(),
143 method: stats.method.clone(),
144 score,
145 request_count: stats.total_requests,
146 recency_score,
147 reality_ratio,
148 });
149 }
150
151 priorities
153 .sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap_or(std::cmp::Ordering::Equal));
154
155 priorities
156 }
157
158 pub fn filter_changes(
160 &self,
161 changes: &[DetectedChange],
162 priorities: &[EndpointPriority],
163 ) -> Vec<DetectedChange> {
164 if !self.config.enabled {
165 return changes.to_vec();
166 }
167
168 let mut prioritized_endpoints = std::collections::HashSet::new();
170
171 let filtered_priorities: Vec<&EndpointPriority> =
173 if let Some(min_requests) = self.config.min_requests_threshold {
174 priorities.iter().filter(|p| p.request_count >= min_requests as u64).collect()
175 } else {
176 priorities.iter().collect()
177 };
178
179 let selected_priorities: Vec<&EndpointPriority> =
181 if let Some(top_pct) = self.config.top_percentage {
182 let count =
183 ((filtered_priorities.len() as f64 * top_pct / 100.0).ceil() as usize).max(1);
184 filtered_priorities.into_iter().take(count).collect()
185 } else {
186 filtered_priorities
187 };
188
189 for priority in selected_priorities {
191 prioritized_endpoints.insert(format!("{} {}", priority.method, priority.endpoint));
192 }
193
194 changes
196 .iter()
197 .filter(|change| {
198 let key = format!("{} {}", change.method, change.path);
199 prioritized_endpoints.contains(&key)
200 })
201 .cloned()
202 .collect()
203 }
204
205 pub async fn get_reality_ratios(
207 &self,
208 endpoints: &[(&str, &str)],
209 continuum_engine: Option<
210 &mockforge_core::reality_continuum::engine::RealityContinuumEngine,
211 >,
212 ) -> HashMap<String, f64> {
213 let mut ratios = HashMap::new();
214
215 if let Some(engine) = continuum_engine {
216 for (method, endpoint) in endpoints {
217 let ratio = engine.get_blend_ratio(endpoint).await;
218 ratios.insert(format!("{} {}", method, endpoint), ratio);
219 }
220 } else {
221 for (method, endpoint) in endpoints {
223 ratios.insert(format!("{} {}", method, endpoint), 0.0);
224 }
225 }
226
227 ratios
228 }
229}
230
#[cfg(test)]
mod tests {
    use super::*;
    use crate::database::RecorderDatabase;
    use crate::models::{Protocol, RecordedRequest};

    /// Comparison summary that reports no differences at all.
    fn create_test_summary() -> crate::diff::ComparisonSummary {
        crate::diff::ComparisonSummary {
            total_differences: 0,
            added_fields: 0,
            removed_fields: 0,
            changed_fields: 0,
            type_changes: 0,
        }
    }

    /// Comparison result whose match flags all equal `matches`.
    fn create_test_comparison_result(matches: bool) -> crate::diff::ComparisonResult {
        crate::diff::ComparisonResult {
            matches,
            status_match: matches,
            headers_match: matches,
            body_match: matches,
            differences: vec![],
            summary: create_test_summary(),
        }
    }

    /// Detected (not yet applied) change for the given endpoint.
    fn create_test_change(request_id: &str, method: &str, path: &str) -> crate::sync::DetectedChange {
        crate::sync::DetectedChange {
            request_id: request_id.to_string(),
            method: method.to_string(),
            path: path.to_string(),
            comparison: create_test_comparison_result(false),
            updated: false,
        }
    }

    /// The two changes shared by the `filter_changes` tests.
    fn create_test_changes() -> Vec<crate::sync::DetectedChange> {
        vec![
            create_test_change("req-1", "GET", "/api/users"),
            create_test_change("req-2", "POST", "/api/posts"),
        ]
    }

    /// Priority with a reality ratio of 0.0.
    fn create_test_priority(
        method: &str,
        endpoint: &str,
        score: f64,
        request_count: u64,
        recency_score: f64,
    ) -> EndpointPriority {
        EndpointPriority {
            endpoint: endpoint.to_string(),
            method: method.to_string(),
            score,
            request_count,
            recency_score,
            reality_ratio: 0.0,
        }
    }

    fn create_test_traffic_config() -> TrafficAwareConfig {
        TrafficAwareConfig {
            enabled: true,
            min_requests_threshold: Some(5),
            top_percentage: Some(50.0),
            lookback_days: 7,
            sync_real_endpoints: false,
            weight_count: 1.0,
            weight_recency: 0.5,
            weight_reality: -0.3,
        }
    }

    /// Three endpoints: one used an hour ago, one two days ago, and one ten
    /// days ago (outside the 7-day lookback window of the test config).
    fn create_test_usage_stats() -> HashMap<String, EndpointUsageStats> {
        let now = Utc::now();
        let mut stats = HashMap::new();

        let mut add = |method: &str,
                       endpoint: &str,
                       total_requests: u64,
                       age: Duration,
                       unique_consumers: usize| {
            stats.insert(
                format!("{} {}", method, endpoint),
                EndpointUsageStats {
                    endpoint: endpoint.to_string(),
                    method: method.to_string(),
                    total_requests,
                    last_used_at: Some(now - age),
                    unique_consumers,
                },
            );
        };

        add("GET", "/api/users", 100, Duration::hours(1), 5);
        add("POST", "/api/posts", 50, Duration::days(2), 3);
        add("DELETE", "/api/old", 10, Duration::days(10), 1);

        stats
    }

    #[test]
    fn test_traffic_analyzer_creation() {
        let analyzer = TrafficAnalyzer::new(create_test_traffic_config());

        // The analyzer must hold exactly the configuration it was given.
        assert!(analyzer.config.enabled);
        assert_eq!(analyzer.config.lookback_days, 7);
        assert_eq!(analyzer.config.min_requests_threshold, Some(5));
    }

    #[tokio::test]
    async fn test_aggregate_usage_stats_empty_db() {
        let analyzer = TrafficAnalyzer::new(create_test_traffic_config());

        let database = RecorderDatabase::new_in_memory().await.unwrap();
        let stats = analyzer.aggregate_usage_stats_from_db(&database).await;

        assert!(stats.is_empty());
    }

    #[tokio::test]
    async fn test_aggregate_usage_stats_with_requests() {
        let analyzer = TrafficAnalyzer::new(create_test_traffic_config());

        let database = RecorderDatabase::new_in_memory().await.unwrap();

        for i in 0..5 {
            let request = RecordedRequest {
                id: format!("req-{}", i),
                protocol: Protocol::Http,
                timestamp: Utc::now(),
                method: "GET".to_string(),
                path: "/api/users".to_string(),
                query_params: None,
                headers: "{}".to_string(),
                body: None,
                body_encoding: "utf8".to_string(),
                client_ip: None,
                trace_id: None,
                span_id: None,
                duration_ms: Some(100),
                status_code: Some(200),
                tags: None,
            };
            database.insert_request(&request).await.unwrap();
        }

        let stats = analyzer.aggregate_usage_stats_from_db(&database).await;

        assert_eq!(stats.len(), 1);
        let user_stats = stats.get("GET /api/users").unwrap();
        assert_eq!(user_stats.total_requests, 5);
        assert_eq!(user_stats.endpoint, "/api/users");
        assert_eq!(user_stats.method, "GET");
    }

    #[test]
    fn test_calculate_priorities_basic() {
        let analyzer = TrafficAnalyzer::new(create_test_traffic_config());

        let usage_stats = create_test_usage_stats();
        let reality_ratios = HashMap::new();

        let priorities = analyzer.calculate_priorities(&usage_stats, &reality_ratios);

        assert!(!priorities.is_empty());
        // Highest request count and most recent use must rank first.
        assert_eq!(priorities[0].endpoint, "/api/users");
        assert!(priorities[0].score > 0.0);
    }

    #[test]
    fn test_calculate_priorities_with_reality_ratios() {
        let analyzer = TrafficAnalyzer::new(create_test_traffic_config());

        let usage_stats = create_test_usage_stats();
        let mut reality_ratios = HashMap::new();
        reality_ratios.insert("GET /api/users".to_string(), 0.5);
        reality_ratios.insert("POST /api/posts".to_string(), 0.2);

        let priorities = analyzer.calculate_priorities(&usage_stats, &reality_ratios);

        // Look the endpoints up explicitly so the test cannot pass vacuously
        // when an endpoint is missing from the result.
        let users = priorities
            .iter()
            .find(|p| p.endpoint == "/api/users")
            .expect("/api/users should be prioritized");
        assert_eq!(users.reality_ratio, 0.5);

        let posts = priorities
            .iter()
            .find(|p| p.endpoint == "/api/posts")
            .expect("/api/posts should be prioritized");
        assert_eq!(posts.reality_ratio, 0.2);
    }

    #[test]
    fn test_calculate_priorities_filters_old_endpoints() {
        let analyzer = TrafficAnalyzer::new(create_test_traffic_config());

        let usage_stats = create_test_usage_stats();
        let reality_ratios = HashMap::new();

        let priorities = analyzer.calculate_priorities(&usage_stats, &reality_ratios);

        // "/api/old" was last used 10 days ago, outside the 7-day window.
        let has_old = priorities.iter().any(|p| p.endpoint == "/api/old");
        assert!(!has_old);
    }

    #[test]
    fn test_calculate_priorities_filters_high_reality_ratio() {
        let mut config = create_test_traffic_config();
        config.sync_real_endpoints = false;
        let analyzer = TrafficAnalyzer::new(config);

        let usage_stats = create_test_usage_stats();
        let mut reality_ratios = HashMap::new();
        reality_ratios.insert("GET /api/users".to_string(), 0.9);

        let priorities = analyzer.calculate_priorities(&usage_stats, &reality_ratios);

        // A ratio of 0.9 exceeds the 0.7 cut-off, so the endpoint is skipped.
        let has_high_reality = priorities.iter().any(|p| p.endpoint == "/api/users");
        assert!(!has_high_reality);
    }

    #[test]
    fn test_calculate_priorities_includes_high_reality_when_enabled() {
        let mut config = create_test_traffic_config();
        config.sync_real_endpoints = true;
        let analyzer = TrafficAnalyzer::new(config);

        let usage_stats = create_test_usage_stats();
        let mut reality_ratios = HashMap::new();
        reality_ratios.insert("GET /api/users".to_string(), 0.9);

        let priorities = analyzer.calculate_priorities(&usage_stats, &reality_ratios);

        let has_high_reality = priorities.iter().any(|p| p.endpoint == "/api/users");
        assert!(has_high_reality);
    }

    #[test]
    fn test_calculate_priorities_recency_score() {
        let analyzer = TrafficAnalyzer::new(create_test_traffic_config());

        let usage_stats = create_test_usage_stats();
        let reality_ratios = HashMap::new();

        let priorities = analyzer.calculate_priorities(&usage_stats, &reality_ratios);

        let users_priority = priorities.iter().find(|p| p.endpoint == "/api/users").unwrap();
        let posts_priority = priorities.iter().find(|p| p.endpoint == "/api/posts").unwrap();

        // Used an hour ago vs. two days ago.
        assert!(users_priority.recency_score > posts_priority.recency_score);
    }

    #[test]
    fn test_filter_changes_disabled() {
        let mut config = create_test_traffic_config();
        config.enabled = false;
        let analyzer = TrafficAnalyzer::new(config);

        let changes = create_test_changes();
        let filtered = analyzer.filter_changes(&changes, &[]);

        // With filtering disabled every change passes through untouched.
        assert_eq!(filtered.len(), 2);
    }

    #[test]
    fn test_filter_changes_with_min_requests() {
        let analyzer = TrafficAnalyzer::new(create_test_traffic_config());

        let changes = create_test_changes();
        let priorities = vec![
            create_test_priority("GET", "/api/users", 100.0, 100, 0.9),
            // Only 3 requests: below the threshold of 5.
            create_test_priority("POST", "/api/posts", 50.0, 3, 0.5),
        ];

        let filtered = analyzer.filter_changes(&changes, &priorities);

        assert_eq!(filtered.len(), 1);
        assert_eq!(filtered[0].path, "/api/users");
    }

    #[test]
    fn test_filter_changes_with_top_percentage() {
        let analyzer = TrafficAnalyzer::new(create_test_traffic_config());

        let changes = create_test_changes();
        let priorities = vec![
            create_test_priority("GET", "/api/users", 100.0, 100, 0.9),
            create_test_priority("POST", "/api/posts", 50.0, 50, 0.5),
        ];

        let filtered = analyzer.filter_changes(&changes, &priorities);

        // Top 50% of two endpoints is exactly one: the highest-scoring one.
        assert_eq!(filtered.len(), 1);
        assert_eq!(filtered[0].path, "/api/users");
    }

    #[test]
    fn test_filter_changes_empty_priorities() {
        let analyzer = TrafficAnalyzer::new(create_test_traffic_config());

        let changes = vec![create_test_change("req-1", "GET", "/api/users")];
        let filtered = analyzer.filter_changes(&changes, &[]);

        // No prioritized endpoints means nothing is selected.
        assert!(filtered.is_empty());
    }

    #[test]
    fn test_endpoint_usage_stats_creation() {
        let stats = EndpointUsageStats {
            endpoint: "/api/test".to_string(),
            method: "GET".to_string(),
            total_requests: 42,
            last_used_at: Some(Utc::now()),
            unique_consumers: 3,
        };

        assert_eq!(stats.endpoint, "/api/test");
        assert_eq!(stats.method, "GET");
        assert_eq!(stats.total_requests, 42);
        assert_eq!(stats.unique_consumers, 3);
    }

    #[test]
    fn test_endpoint_priority_creation() {
        let priority = EndpointPriority {
            endpoint: "/api/test".to_string(),
            method: "POST".to_string(),
            score: 75.5,
            request_count: 100,
            recency_score: 0.8,
            reality_ratio: 0.3,
        };

        assert_eq!(priority.endpoint, "/api/test");
        assert_eq!(priority.method, "POST");
        assert_eq!(priority.score, 75.5);
        assert_eq!(priority.request_count, 100);
        assert_eq!(priority.recency_score, 0.8);
        assert_eq!(priority.reality_ratio, 0.3);
    }

    #[test]
    fn test_traffic_aware_config_serialization() {
        let config = create_test_traffic_config();
        let json = serde_json::to_string(&config).unwrap();

        assert!(json.contains("enabled"));
        assert!(json.contains("min_requests_threshold"));
        assert!(json.contains("top_percentage"));
    }

    #[test]
    fn test_traffic_aware_config_deserialization() {
        let json = r#"{
            "enabled": true,
            "min_requests_threshold": 10,
            "top_percentage": 75.0,
            "lookback_days": 14,
            "sync_real_endpoints": true,
            "weight_count": 2.0,
            "weight_recency": 1.0,
            "weight_reality": 0.5
        }"#;

        let config: TrafficAwareConfig = serde_json::from_str(json).unwrap();
        assert!(config.enabled);
        assert_eq!(config.min_requests_threshold, Some(10));
        assert_eq!(config.top_percentage, Some(75.0));
        assert_eq!(config.lookback_days, 14);
        assert!(config.sync_real_endpoints);
    }

    #[test]
    fn test_traffic_aware_config_defaults() {
        let json = r#"{
            "enabled": true
        }"#;

        let config: TrafficAwareConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.lookback_days, 7);
        assert_eq!(config.weight_count, 1.0);
        assert_eq!(config.weight_recency, 0.5);
        assert_eq!(config.weight_reality, -0.3);
        assert!(!config.sync_real_endpoints);
    }
}