1use crate::{
7 database::RecorderDatabase,
8 diff::{ComparisonResult, ResponseComparator},
9 Result,
10};
11use reqwest::Client;
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::RwLock;
17use tokio::time::{interval, MissedTickBehavior};
18use tracing::{debug, info, warn};
19use uuid::Uuid;
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct GitOpsConfig {
24 pub enabled: bool,
26 pub pr_provider: String, pub repo_owner: String,
30 pub repo_name: String,
32 #[serde(default = "default_main_branch")]
34 pub base_branch: String,
35 #[serde(default = "default_true")]
37 pub update_fixtures: bool,
38 #[serde(default)]
40 pub regenerate_sdks: bool,
41 #[serde(default = "default_true")]
43 pub update_docs: bool,
44 #[serde(default)]
46 pub auto_merge: bool,
47 #[serde(skip_serializing)]
49 pub token: Option<String>,
50}
51
/// Serde default for [`GitOpsConfig::base_branch`]: the branch name `"main"`.
fn default_main_branch() -> String {
    String::from("main")
}
55
/// Settings for traffic-aware sync.
///
/// When present on [`SyncConfig`], a `TrafficAnalyzer` is built from this
/// value and used to decide which recorded endpoints are replayed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrafficAwareConfig {
    /// Whether traffic-aware behaviour is turned on.
    pub enabled: bool,
    /// Optional minimum request count.
    // NOTE(review): presumably endpoints below this count are skipped — confirm in `sync_traffic`.
    pub min_requests_threshold: Option<usize>,
    /// Optional percentage of top endpoints to keep.
    // NOTE(review): value range (0–1 vs 0–100) is not visible here — confirm in `sync_traffic`.
    pub top_percentage: Option<f64>,
    /// Number of days of history to consider; defaults to 7.
    #[serde(default = "default_lookback_days")]
    pub lookback_days: u64,
    /// Whether endpoints served by the real backend are synced as well;
    /// defaults to `false`.
    #[serde(default)]
    pub sync_real_endpoints: bool,
    /// Weight of the request-count component; defaults to 1.0.
    #[serde(default = "default_count_weight")]
    pub weight_count: f64,
    /// Weight of the recency component; defaults to 0.5.
    #[serde(default = "default_recency_weight")]
    pub weight_recency: f64,
    /// Weight of the reality-ratio component; defaults to -0.3.
    #[serde(default = "default_reality_weight")]
    pub weight_reality: f64,
}
81
/// Serde default for [`TrafficAwareConfig::lookback_days`]: seven days.
fn default_lookback_days() -> u64 {
    const DAYS_IN_A_WEEK: u64 = 7;
    DAYS_IN_A_WEEK
}
85
/// Serde default for [`TrafficAwareConfig::weight_count`].
fn default_count_weight() -> f64 {
    const COUNT_WEIGHT: f64 = 1.0;
    COUNT_WEIGHT
}
89
/// Serde default for [`TrafficAwareConfig::weight_recency`].
fn default_recency_weight() -> f64 {
    const RECENCY_WEIGHT: f64 = 0.5;
    RECENCY_WEIGHT
}
93
/// Serde default for [`TrafficAwareConfig::weight_reality`]; note that it is
/// negative.
fn default_reality_weight() -> f64 {
    const REALITY_WEIGHT: f64 = -0.3;
    REALITY_WEIGHT
}
97
/// Configuration of the upstream sync service.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncConfig {
    /// Whether the background sync loop does any work on a tick.
    pub enabled: bool,
    /// Base URL of the upstream; relative recorded paths are appended to it.
    /// Sync cannot run while this is `None`.
    pub upstream_url: Option<String>,
    /// Period of the background sync loop, in seconds.
    pub interval_seconds: u64,
    /// Whether a stored response is overwritten when the upstream response
    /// differs from it.
    pub auto_update: bool,
    /// Maximum number of recent recorded requests loaded per sync.
    pub max_requests_per_sync: usize,
    /// Timeout applied to the HTTP client, in seconds.
    pub request_timeout_seconds: u64,
    /// Extra headers added to every replayed request.
    pub headers: HashMap<String, String>,
    /// When `true`, only recorded `GET` requests are replayed; defaults to
    /// `true`.
    #[serde(default = "default_true")]
    pub sync_get_only: bool,
    /// Optional GitOps settings.
    // NOTE(review): not read anywhere in this file — presumably consumed by `sync_gitops`; confirm.
    pub gitops_mode: Option<GitOpsConfig>,
    /// Optional traffic-aware settings; when set, recorded requests are
    /// filtered through a traffic analyzer before being replayed.
    pub traffic_aware: Option<TrafficAwareConfig>,
}
123
/// Serde default helper for boolean fields that should be on unless the
/// input says otherwise.
fn default_true() -> bool {
    !false
}
127
128impl Default for SyncConfig {
129 fn default() -> Self {
130 Self {
131 enabled: false,
132 upstream_url: None,
133 interval_seconds: 3600, auto_update: false,
135 max_requests_per_sync: 100,
136 request_timeout_seconds: 30,
137 headers: HashMap::new(),
138 sync_get_only: true,
139 gitops_mode: None,
140 traffic_aware: None,
141 }
142 }
143}
144
/// Snapshot of what the sync service is doing and what the last run produced.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncStatus {
    /// `true` while a sync run is in progress.
    pub is_running: bool,
    /// UTC time at which the last successful sync finished, if any.
    pub last_sync: Option<chrono::DateTime<chrono::Utc>>,
    /// Number of changes detected by the last successful sync.
    pub last_changes_detected: usize,
    /// Number of fixtures updated by the last successful sync.
    pub last_fixtures_updated: usize,
    /// Message of the most recent failure; cleared by the next success.
    pub last_error: Option<String>,
    /// Count of successfully completed syncs.
    pub total_syncs: u64,
}
161
/// One recorded request whose upstream response no longer matches the
/// recorded response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DetectedChange {
    /// Id of the recorded request.
    pub request_id: String,
    /// HTTP method of the recorded request.
    pub method: String,
    /// Path (or absolute URL) of the recorded request.
    pub path: String,
    /// Result of comparing the recorded response with the upstream response.
    pub comparison: ComparisonResult,
    /// Whether the stored response was overwritten with the upstream one.
    pub updated: bool,
}
176
/// Replays recorded requests against an upstream and reports (and optionally
/// stores) responses that differ from the recorded ones.
pub struct SyncService {
    /// Current configuration; replaceable at runtime.
    config: Arc<RwLock<SyncConfig>>,
    /// Store holding the recorded requests and responses.
    database: Arc<RecorderDatabase>,
    /// Status of the current / most recent sync run.
    status: Arc<RwLock<SyncStatus>>,
    /// HTTP client used for every replayed request.
    http_client: Client,
}
184
185impl SyncService {
186 pub fn new(config: SyncConfig, database: Arc<RecorderDatabase>) -> Self {
188 let http_client = Client::builder()
189 .timeout(Duration::from_secs(config.request_timeout_seconds))
190 .build()
191 .expect("Failed to create HTTP client");
192
193 Self {
194 config: Arc::new(RwLock::new(config)),
195 database,
196 status: Arc::new(RwLock::new(SyncStatus {
197 is_running: false,
198 last_sync: None,
199 last_changes_detected: 0,
200 last_fixtures_updated: 0,
201 last_error: None,
202 total_syncs: 0,
203 })),
204 http_client,
205 }
206 }
207
208 pub fn start(&self) -> tokio::task::JoinHandle<()> {
210 let config = Arc::clone(&self.config);
211 let database = Arc::clone(&self.database);
212 let status = Arc::clone(&self.status);
213 let http_client = self.http_client.clone();
214
215 tokio::spawn(async move {
216 let mut interval_timer =
217 interval(Duration::from_secs(config.read().await.interval_seconds));
218 interval_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
219
220 loop {
221 interval_timer.tick().await;
222
223 let config_guard = config.read().await;
224 if !config_guard.enabled {
225 continue;
226 }
227
228 let upstream_url = match &config_guard.upstream_url {
229 Some(url) => url.clone(),
230 None => {
231 warn!("Sync enabled but no upstream_url configured");
232 continue;
233 }
234 };
235
236 let auto_update = config_guard.auto_update;
237 let max_requests = config_guard.max_requests_per_sync;
238 let sync_get_only = config_guard.sync_get_only;
239 let headers = config_guard.headers.clone();
240 drop(config_guard);
241
242 {
244 let mut status_guard = status.write().await;
245 status_guard.is_running = true;
246 }
247
248 info!("Starting automatic sync from upstream: {}", upstream_url);
249
250 let config_guard = config.read().await;
251 let traffic_analyzer = config_guard
252 .traffic_aware
253 .as_ref()
254 .map(|ta_config| crate::sync_traffic::TrafficAnalyzer::new(ta_config.clone()));
255 drop(config_guard);
256
257 match Self::sync_once(
258 &http_client,
259 &database,
260 &upstream_url,
261 auto_update,
262 max_requests,
263 sync_get_only,
264 &headers,
265 traffic_analyzer.as_ref(),
266 None, )
268 .await
269 {
270 Ok((changes, updated)) => {
271 let mut status_guard = status.write().await;
272 status_guard.is_running = false;
273 status_guard.last_sync = Some(chrono::Utc::now());
274 status_guard.last_changes_detected = changes.len();
275 status_guard.last_fixtures_updated = updated;
276 status_guard.last_error = None;
277 status_guard.total_syncs += 1;
278
279 if !changes.is_empty() {
280 info!(
281 "Sync complete: {} changes detected, {} fixtures updated",
282 changes.len(),
283 updated
284 );
285 } else {
286 debug!("Sync complete: No changes detected");
287 }
288 }
289 Err(e) => {
290 let mut status_guard = status.write().await;
291 status_guard.is_running = false;
292 status_guard.last_error = Some(e.to_string());
293 warn!("Sync failed: {}", e);
294 }
295 }
296 }
297 })
298 }
299
300 #[allow(clippy::too_many_arguments)]
302 async fn sync_once(
303 http_client: &Client,
304 database: &RecorderDatabase,
305 upstream_url: &str,
306 auto_update: bool,
307 max_requests: usize,
308 sync_get_only: bool,
309 headers: &HashMap<String, String>,
310 traffic_analyzer: Option<&crate::sync_traffic::TrafficAnalyzer>,
311 continuum_engine: Option<
312 &mockforge_core::reality_continuum::engine::RealityContinuumEngine,
313 >,
314 ) -> Result<(Vec<DetectedChange>, usize)> {
315 let sync_cycle_id = format!("sync_{}", Uuid::new_v4());
317
318 let mut recorded_requests = database.list_recent(max_requests as i32).await?;
320
321 if let Some(analyzer) = traffic_analyzer {
323 let usage_stats = analyzer.aggregate_usage_stats_from_db(database).await;
325
326 let endpoints: Vec<(&str, &str)> =
328 recorded_requests.iter().map(|r| (r.method.as_str(), r.path.as_str())).collect();
329
330 let reality_ratios = analyzer.get_reality_ratios(&endpoints, continuum_engine).await;
332
333 let priorities = analyzer.calculate_priorities(&usage_stats, &reality_ratios);
335
336 let prioritized_endpoints: std::collections::HashSet<String> =
338 priorities.iter().map(|p| format!("{} {}", p.method, p.endpoint)).collect();
339
340 recorded_requests.retain(|req| {
341 let key = format!("{} {}", req.method, req.path);
342 prioritized_endpoints.contains(&key)
343 });
344
345 debug!(
346 "Traffic-aware filtering: {} requests after filtering (from {} total)",
347 recorded_requests.len(),
348 max_requests
349 );
350 }
351
352 let mut changes = Vec::new();
353 let mut updated_count = 0;
354
355 for request in recorded_requests {
356 if sync_get_only && request.method.to_uppercase() != "GET" {
358 continue;
359 }
360
361 let full_url =
363 if request.path.starts_with("http://") || request.path.starts_with("https://") {
364 request.path.clone()
365 } else {
366 format!("{}{}", upstream_url.trim_end_matches('/'), request.path)
367 };
368
369 match Self::replay_to_upstream(
371 http_client,
372 &full_url,
373 &request.method,
374 &request.headers,
375 headers,
376 )
377 .await
378 {
379 Ok((status, response_headers, response_body)) => {
380 if let Ok(Some(exchange)) = database.get_exchange(&request.id).await {
382 if let Some(original_response) = exchange.response {
383 let original_headers = original_response.headers_map();
384 let original_body =
385 original_response.decoded_body().unwrap_or_default();
386
387 let comparison = ResponseComparator::compare(
389 original_response.status_code,
390 &original_headers,
391 &original_body,
392 status as i32,
393 &response_headers,
394 &response_body,
395 );
396
397 if !comparison.matches {
398 debug!(
399 "Change detected for {} {}: {} differences",
400 request.method,
401 request.path,
402 comparison.differences.len()
403 );
404
405 let snapshot_before = crate::sync_snapshots::SnapshotData {
407 status_code: original_response.status_code as u16,
408 headers: original_headers.clone(),
409 body: original_body.clone(),
410 body_json: serde_json::from_slice(&original_body).ok(),
411 };
412
413 let snapshot_after = crate::sync_snapshots::SnapshotData {
414 status_code: status,
415 headers: response_headers.clone(),
416 body: response_body.clone(),
417 body_json: serde_json::from_slice(&response_body).ok(),
418 };
419
420 let snapshot = crate::sync_snapshots::SyncSnapshot::new(
421 request.path.clone(),
422 request.method.clone(),
423 sync_cycle_id.clone(),
424 snapshot_before,
425 snapshot_after,
426 comparison.clone(),
427 request.duration_ms.map(|d| d as u64),
428 None, );
430
431 if let Err(e) = database.insert_sync_snapshot(&snapshot).await {
433 warn!(
434 "Failed to store snapshot for {} {}: {}",
435 request.method, request.path, e
436 );
437 }
438
439 let mut updated = false;
440 if auto_update {
441 match Self::update_fixture(
443 database,
444 &request.id,
445 status,
446 &response_headers,
447 &response_body,
448 )
449 .await
450 {
451 Ok(_) => {
452 updated = true;
453 updated_count += 1;
454 info!(
455 "Updated fixture for {} {}",
456 request.method, request.path
457 );
458 }
459 Err(e) => {
460 warn!(
461 "Failed to update fixture for {} {}: {}",
462 request.method, request.path, e
463 );
464 }
465 }
466 }
467
468 changes.push(DetectedChange {
469 request_id: request.id.clone(),
470 method: request.method.clone(),
471 path: request.path.clone(),
472 comparison,
473 updated,
474 });
475 }
476 }
477 }
478 }
479 Err(e) => {
480 debug!(
481 "Failed to replay {} {} to upstream: {}",
482 request.method, request.path, e
483 );
484 }
486 }
487 }
488
489 Ok((changes, updated_count))
490 }
491
492 pub async fn sync_with_gitops(
494 &self,
495 gitops_handler: Option<&crate::sync_gitops::GitOpsSyncHandler>,
496 ) -> Result<(Vec<DetectedChange>, usize, Option<mockforge_core::pr_generation::PRResult>)> {
497 self.sync_with_gitops_and_drift(
498 gitops_handler,
499 None, )
501 .await
502 }
503
504 pub async fn sync_with_gitops_and_drift(
506 &self,
507 gitops_handler: Option<&crate::sync_gitops::GitOpsSyncHandler>,
508 drift_evaluator: Option<&crate::sync_drift::SyncDriftEvaluator>,
509 ) -> Result<(Vec<DetectedChange>, usize, Option<mockforge_core::pr_generation::PRResult>)> {
510 let config = self.config.read().await.clone();
511 let upstream_url = config.upstream_url.ok_or_else(|| {
512 crate::RecorderError::InvalidFilter("No upstream_url configured".to_string())
513 })?;
514
515 {
516 let mut status = self.status.write().await;
517 status.is_running = true;
518 }
519
520 let sync_cycle_id = format!("sync_{}", Uuid::new_v4());
522
523 let traffic_analyzer = config
524 .traffic_aware
525 .as_ref()
526 .map(|ta_config| crate::sync_traffic::TrafficAnalyzer::new(ta_config.clone()));
527
528 let result = Self::sync_once(
529 &self.http_client,
530 &self.database,
531 &upstream_url,
532 false, config.max_requests_per_sync,
534 config.sync_get_only,
535 &config.headers,
536 traffic_analyzer.as_ref(),
537 None, )
539 .await;
540
541 let (changes, _updated_count) = match &result {
542 Ok((c, u)) => (c.clone(), *u),
543 Err(_) => (Vec::new(), 0),
544 };
545
546 let pr_result = if let Some(handler) = gitops_handler {
548 handler
549 .process_sync_changes(&self.database, &changes, &sync_cycle_id)
550 .await
551 .ok()
552 .flatten()
553 } else {
554 None
555 };
556
557 if let Some(evaluator) = drift_evaluator {
559 if let Err(e) = evaluator
560 .evaluate_sync_changes(&changes, &sync_cycle_id, None, None, None)
561 .await
562 {
563 warn!("Failed to evaluate drift budgets for sync changes: {}", e);
564 }
565 }
566
567 {
568 let mut status = self.status.write().await;
569 status.is_running = false;
570 match &result {
571 Ok((changes, updated)) => {
572 status.last_sync = Some(chrono::Utc::now());
573 status.last_changes_detected = changes.len();
574 status.last_fixtures_updated = *updated;
575 status.last_error = None;
576 status.total_syncs += 1;
577 }
578 Err(e) => {
579 status.last_error = Some(e.to_string());
580 }
581 }
582 }
583
584 match result {
585 Ok((changes, updated)) => Ok((changes, updated, pr_result)),
586 Err(e) => Err(e),
587 }
588 }
589
590 async fn replay_to_upstream(
592 http_client: &Client,
593 url: &str,
594 method: &str,
595 original_headers: &str,
596 additional_headers: &HashMap<String, String>,
597 ) -> Result<(u16, HashMap<String, String>, Vec<u8>)> {
598 let mut headers_map = HashMap::new();
600 if let Ok(json) = serde_json::from_str::<HashMap<String, String>>(original_headers) {
601 headers_map = json;
602 }
603
604 for (key, value) in additional_headers {
606 headers_map.insert(key.clone(), value.clone());
607 }
608
609 let reqwest_method = match method.to_uppercase().as_str() {
611 "GET" => reqwest::Method::GET,
612 "POST" => reqwest::Method::POST,
613 "PUT" => reqwest::Method::PUT,
614 "DELETE" => reqwest::Method::DELETE,
615 "PATCH" => reqwest::Method::PATCH,
616 "HEAD" => reqwest::Method::HEAD,
617 "OPTIONS" => reqwest::Method::OPTIONS,
618 _ => {
619 return Err(crate::RecorderError::InvalidFilter(format!(
620 "Unsupported method: {}",
621 method
622 )))
623 }
624 };
625
626 let mut request_builder = http_client.request(reqwest_method, url);
627
628 for (key, value) in &headers_map {
630 if let Ok(header_name) = reqwest::header::HeaderName::from_bytes(key.as_bytes()) {
631 if let Ok(header_value) = reqwest::header::HeaderValue::from_str(value) {
632 request_builder = request_builder.header(header_name, header_value);
633 }
634 }
635 }
636
637 let response = request_builder
639 .send()
640 .await
641 .map_err(|e| crate::RecorderError::InvalidFilter(format!("Request failed: {}", e)))?;
642
643 let status = response.status().as_u16();
644 let mut response_headers = HashMap::new();
645
646 for (key, value) in response.headers() {
647 if let Ok(value_str) = value.to_str() {
648 response_headers.insert(key.to_string(), value_str.to_string());
649 }
650 }
651
652 let response_body = response
653 .bytes()
654 .await
655 .map_err(|e| {
656 crate::RecorderError::InvalidFilter(format!("Failed to read response body: {}", e))
657 })?
658 .to_vec();
659
660 Ok((status, response_headers, response_body))
661 }
662
663 async fn update_fixture(
665 database: &RecorderDatabase,
666 request_id: &str,
667 status_code: u16,
668 headers: &HashMap<String, String>,
669 body: &[u8],
670 ) -> Result<()> {
671 let headers_json = serde_json::to_string(headers)?;
673 let body_encoded = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, body);
674 let body_size = body.len() as i64;
675
676 database
677 .update_response(
678 request_id,
679 status_code as i32,
680 &headers_json,
681 &body_encoded,
682 body_size,
683 )
684 .await?;
685
686 Ok(())
687 }
688
689 pub async fn get_status(&self) -> SyncStatus {
691 self.status.read().await.clone()
692 }
693
694 pub async fn get_config(&self) -> SyncConfig {
696 self.config.read().await.clone()
697 }
698
699 pub async fn update_config(&self, new_config: SyncConfig) {
701 *self.config.write().await = new_config;
702 }
703
704 pub async fn sync_now(&self) -> Result<(Vec<DetectedChange>, usize)> {
706 let config = self.config.read().await.clone();
707 let upstream_url = config.upstream_url.ok_or_else(|| {
708 crate::RecorderError::InvalidFilter("No upstream_url configured".to_string())
709 })?;
710
711 {
712 let mut status = self.status.write().await;
713 status.is_running = true;
714 }
715
716 let traffic_analyzer = config
717 .traffic_aware
718 .as_ref()
719 .map(|ta_config| crate::sync_traffic::TrafficAnalyzer::new(ta_config.clone()));
720
721 let result = Self::sync_once(
722 &self.http_client,
723 &self.database,
724 &upstream_url,
725 config.auto_update,
726 config.max_requests_per_sync,
727 config.sync_get_only,
728 &config.headers,
729 traffic_analyzer.as_ref(),
730 None, )
732 .await;
733
734 {
735 let mut status = self.status.write().await;
736 status.is_running = false;
737 match &result {
738 Ok((changes, updated)) => {
739 status.last_sync = Some(chrono::Utc::now());
740 status.last_changes_detected = changes.len();
741 status.last_fixtures_updated = *updated;
742 status.last_error = None;
743 status.total_syncs += 1;
744 }
745 Err(e) => {
746 status.last_error = Some(e.to_string());
747 }
748 }
749 }
750
751 result
752 }
753}
754
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration is disabled, hourly, does not auto-update
    /// and loads at most 100 requests per sync.
    #[test]
    fn test_sync_config_default() {
        let SyncConfig {
            enabled,
            interval_seconds,
            auto_update,
            max_requests_per_sync,
            ..
        } = SyncConfig::default();

        assert!(!enabled);
        assert_eq!(interval_seconds, 3600);
        assert!(!auto_update);
        assert_eq!(max_requests_per_sync, 100);
    }

    /// A hand-built idle status reports that nothing is running and that no
    /// sync has completed yet.
    #[test]
    fn test_sync_status_creation() {
        let idle = SyncStatus {
            total_syncs: 0,
            last_error: None,
            last_fixtures_updated: 0,
            last_changes_detected: 0,
            last_sync: None,
            is_running: false,
        };

        assert!(!idle.is_running);
        assert_eq!(idle.total_syncs, 0);
    }
}