1use crate::{
7 database::RecorderDatabase,
8 diff::{ComparisonResult, ResponseComparator},
9 Result,
10};
11use reqwest::Client;
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::RwLock;
17use tokio::time::{interval, MissedTickBehavior};
18use tracing::{debug, info, warn};
19use uuid::Uuid;
20
/// Configuration for GitOps-style sync handling, where detected fixture
/// drift is routed through pull requests (see `sync_gitops::GitOpsSyncHandler`
/// and `mockforge_core::pr_generation::PRResult`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GitOpsConfig {
    /// Master switch for GitOps mode.
    pub enabled: bool,
    /// PR provider identifier (interpreted by the GitOps handler —
    /// presumably e.g. "github"; confirm against `sync_gitops`).
    pub pr_provider: String,
    /// Owner (user or organization) of the target repository.
    pub repo_owner: String,
    /// Name of the target repository.
    pub repo_name: String,
    /// Branch pull requests are based against; defaults to "main".
    #[serde(default = "default_main_branch")]
    pub base_branch: String,
    /// Whether recorded fixtures should be updated (default: true).
    #[serde(default = "default_true")]
    pub update_fixtures: bool,
    /// Whether SDKs should be regenerated (default: false).
    #[serde(default)]
    pub regenerate_sdks: bool,
    /// Whether documentation should be updated (default: true).
    #[serde(default = "default_true")]
    pub update_docs: bool,
    /// Whether generated PRs may be merged automatically (default: false).
    #[serde(default)]
    pub auto_merge: bool,
    /// Provider API token. Deliberately never serialized back out so it
    /// cannot leak through config dumps or status endpoints.
    #[serde(skip_serializing)]
    pub token: Option<String>,
}
51
/// Serde default for [`GitOpsConfig::base_branch`].
fn default_main_branch() -> String {
    String::from("main")
}
55
/// Configuration for traffic-aware sync prioritization.
///
/// NOTE(review): field semantics are interpreted by
/// `sync_traffic::TrafficAnalyzer`; descriptions below are inferred from the
/// field names and the weight usage in `SyncService::sync_once` — confirm
/// against that module.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrafficAwareConfig {
    /// Master switch for traffic-aware filtering.
    pub enabled: bool,
    /// Minimum recorded request count for an endpoint to be considered, if set.
    pub min_requests_threshold: Option<usize>,
    /// If set, only the top fraction of endpoints (by priority score) are kept.
    pub top_percentage: Option<f64>,
    /// How many days of recorded traffic to look back over (default: 7).
    #[serde(default = "default_lookback_days")]
    pub lookback_days: u64,
    /// Whether endpoints currently served from the real upstream should also
    /// be synced (default: false).
    #[serde(default)]
    pub sync_real_endpoints: bool,
    /// Priority weight for raw request count (default: 1.0).
    #[serde(default = "default_count_weight")]
    pub weight_count: f64,
    /// Priority weight for traffic recency (default: 0.5).
    #[serde(default = "default_recency_weight")]
    pub weight_recency: f64,
    /// Priority weight applied to the endpoint's reality ratio (default: -0.3;
    /// negative — presumably deprioritizes endpoints already served with a
    /// high reality ratio; verify in `sync_traffic`).
    #[serde(default = "default_reality_weight")]
    pub weight_reality: f64,
}
81
/// Serde default for [`TrafficAwareConfig::lookback_days`]: one week.
fn default_lookback_days() -> u64 {
    7
}

/// Serde default for [`TrafficAwareConfig::weight_count`].
fn default_count_weight() -> f64 {
    1.0
}

/// Serde default for [`TrafficAwareConfig::weight_recency`].
fn default_recency_weight() -> f64 {
    0.5
}

/// Serde default for [`TrafficAwareConfig::weight_reality`]. Negative on
/// purpose — see the `weight_reality` field docs.
fn default_reality_weight() -> f64 {
    -0.3
}
97
/// Top-level configuration for the background sync service.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncConfig {
    /// Whether background syncing runs at all; checked on every tick.
    pub enabled: bool,
    /// Base URL of the live upstream to replay recorded requests against.
    /// When `None`, the sync loop logs a warning and skips the pass.
    pub upstream_url: Option<String>,
    /// Seconds between automatic sync passes (read once when the background
    /// task starts — see `SyncService::start`).
    pub interval_seconds: u64,
    /// Whether fixtures are overwritten automatically when drift is detected.
    pub auto_update: bool,
    /// Maximum number of recent recorded requests examined per pass.
    pub max_requests_per_sync: usize,
    /// Per-request timeout of the replay HTTP client, in seconds (applied
    /// when the client is built in `SyncService::new`).
    pub request_timeout_seconds: u64,
    /// Extra headers merged into every replayed request; these override the
    /// recorded headers on key collision (see `replay_to_upstream`).
    pub headers: HashMap<String, String>,
    /// Restrict replay to GET requests (default: true), avoiding side
    /// effects from replaying mutating methods.
    #[serde(default = "default_true")]
    pub sync_get_only: bool,
    /// Optional GitOps (PR-based) change handling.
    pub gitops_mode: Option<GitOpsConfig>,
    /// Optional traffic-aware prioritization of which endpoints to sync.
    pub traffic_aware: Option<TrafficAwareConfig>,
}
123
/// Serde default helper returning `true`, shared by several
/// `#[serde(default = "default_true")]` fields.
fn default_true() -> bool {
    true
}
127
128impl Default for SyncConfig {
129 fn default() -> Self {
130 Self {
131 enabled: false,
132 upstream_url: None,
133 interval_seconds: 3600, auto_update: false,
135 max_requests_per_sync: 100,
136 request_timeout_seconds: 30,
137 headers: HashMap::new(),
138 sync_get_only: true,
139 gitops_mode: None,
140 traffic_aware: None,
141 }
142 }
143}
144
/// Point-in-time view of the sync service, suitable for status endpoints.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncStatus {
    /// True while a sync pass is executing.
    pub is_running: bool,
    /// Completion time (UTC) of the most recent successful sync, if any.
    pub last_sync: Option<chrono::DateTime<chrono::Utc>>,
    /// Number of changes detected by the most recent successful sync.
    pub last_changes_detected: usize,
    /// Number of fixtures updated by the most recent successful sync.
    pub last_fixtures_updated: usize,
    /// Error message from the most recent failed sync; cleared on success.
    pub last_error: Option<String>,
    /// Total count of successfully completed sync passes.
    pub total_syncs: u64,
}
161
/// One recorded request whose live upstream response differed from the
/// recorded fixture during a sync pass.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DetectedChange {
    /// Id of the recorded request in the recorder database.
    pub request_id: String,
    /// HTTP method of the recorded request.
    pub method: String,
    /// Request path (may be a full absolute URL for some recordings).
    pub path: String,
    /// Detailed comparison between the recorded and live responses.
    pub comparison: ComparisonResult,
    /// Whether the stored fixture was overwritten with the live response.
    pub updated: bool,
}
176
/// Periodically replays recorded traffic against a live upstream to detect
/// drift and, optionally, refresh the stored fixtures.
pub struct SyncService {
    /// Runtime-mutable configuration, shared with the spawned background task.
    config: Arc<RwLock<SyncConfig>>,
    /// Recorder database holding recorded exchanges/fixtures.
    database: Arc<RecorderDatabase>,
    /// Shared status, readable while a sync pass is in flight.
    status: Arc<RwLock<SyncStatus>>,
    /// HTTP client used to replay requests to the upstream.
    http_client: Client,
}
184
impl SyncService {
    /// Create a sync service over an existing recorder database.
    ///
    /// Builds a dedicated HTTP client whose per-request timeout comes from
    /// `config.request_timeout_seconds`.
    ///
    /// # Panics
    /// Panics if the reqwest client cannot be constructed (e.g. TLS backend
    /// initialization failure).
    pub fn new(config: SyncConfig, database: Arc<RecorderDatabase>) -> Self {
        let http_client = Client::builder()
            .timeout(Duration::from_secs(config.request_timeout_seconds))
            .build()
            .expect("Failed to create HTTP client");

        Self {
            config: Arc::new(RwLock::new(config)),
            database,
            status: Arc::new(RwLock::new(SyncStatus {
                is_running: false,
                last_sync: None,
                last_changes_detected: 0,
                last_fixtures_updated: 0,
                last_error: None,
                total_syncs: 0,
            })),
            http_client,
        }
    }

    /// Spawn the background sync loop and return its join handle.
    ///
    /// The loop ticks on a fixed interval and skips missed ticks instead of
    /// bursting (`MissedTickBehavior::Skip`).
    ///
    /// NOTE(review): `interval_seconds` is read once when the task starts, so
    /// later `update_config` changes to the interval take effect only after a
    /// restart; `enabled`, `upstream_url`, and the other fields ARE re-read on
    /// every tick. Confirm this split is intended.
    pub fn start(&self) -> tokio::task::JoinHandle<()> {
        let config = Arc::clone(&self.config);
        let database = Arc::clone(&self.database);
        let status = Arc::clone(&self.status);
        let http_client = self.http_client.clone();

        tokio::spawn(async move {
            let mut interval_timer =
                interval(Duration::from_secs(config.read().await.interval_seconds));
            interval_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);

            loop {
                interval_timer.tick().await;

                // Re-read config each tick so enable/disable takes effect
                // without restarting the task.
                let config_guard = config.read().await;
                if !config_guard.enabled {
                    continue;
                }

                let upstream_url = match &config_guard.upstream_url {
                    Some(url) => url.clone(),
                    None => {
                        warn!("Sync enabled but no upstream_url configured");
                        continue;
                    }
                };

                // Copy out everything sync_once needs, then release the read
                // lock before the potentially long network work.
                let auto_update = config_guard.auto_update;
                let max_requests = config_guard.max_requests_per_sync;
                let sync_get_only = config_guard.sync_get_only;
                let headers = config_guard.headers.clone();
                drop(config_guard);

                {
                    let mut status_guard = status.write().await;
                    status_guard.is_running = true;
                }

                info!("Starting automatic sync from upstream: {}", upstream_url);

                // Build a traffic analyzer only when traffic-aware sync is
                // configured.
                let config_guard = config.read().await;
                let traffic_analyzer = config_guard
                    .traffic_aware
                    .as_ref()
                    .map(|ta_config| crate::sync_traffic::TrafficAnalyzer::new(ta_config.clone()));
                drop(config_guard);

                match Self::sync_once(
                    &http_client,
                    &database,
                    &upstream_url,
                    auto_update,
                    max_requests,
                    sync_get_only,
                    &headers,
                    traffic_analyzer.as_ref(),
                    None, // no reality-continuum engine in the background loop
                )
                .await
                {
                    Ok((changes, updated)) => {
                        // Success: record stats and clear any previous error.
                        let mut status_guard = status.write().await;
                        status_guard.is_running = false;
                        status_guard.last_sync = Some(chrono::Utc::now());
                        status_guard.last_changes_detected = changes.len();
                        status_guard.last_fixtures_updated = updated;
                        status_guard.last_error = None;
                        status_guard.total_syncs += 1;

                        if !changes.is_empty() {
                            info!(
                                "Sync complete: {} changes detected, {} fixtures updated",
                                changes.len(),
                                updated
                            );
                        } else {
                            debug!("Sync complete: No changes detected");
                        }
                    }
                    Err(e) => {
                        // Failure: keep last_sync untouched, surface the error.
                        let mut status_guard = status.write().await;
                        status_guard.is_running = false;
                        status_guard.last_error = Some(e.to_string());
                        warn!("Sync failed: {}", e);
                    }
                }
            }
        })
    }

    /// Run one sync pass: replay recent recorded requests against the
    /// upstream, diff live responses against the recorded ones, persist a
    /// snapshot for every difference, and (when `auto_update`) overwrite the
    /// stored fixture.
    ///
    /// Returns the detected changes and the number of fixtures actually
    /// updated. Fails only on database errors; individual replay failures are
    /// logged at debug level and skipped.
    async fn sync_once(
        http_client: &Client,
        database: &RecorderDatabase,
        upstream_url: &str,
        auto_update: bool,
        max_requests: usize,
        sync_get_only: bool,
        headers: &HashMap<String, String>,
        traffic_analyzer: Option<&crate::sync_traffic::TrafficAnalyzer>,
        continuum_engine: Option<
            &mockforge_core::reality_continuum::engine::RealityContinuumEngine,
        >,
    ) -> Result<(Vec<DetectedChange>, usize)> {
        // One cycle id shared by every snapshot written during this pass.
        let sync_cycle_id = format!("sync_{}", Uuid::new_v4());

        let mut recorded_requests = database.list_recent(max_requests as i32).await?;

        // Traffic-aware mode: keep only the endpoints the analyzer prioritizes.
        if let Some(analyzer) = traffic_analyzer {
            let usage_stats = analyzer.aggregate_usage_stats_from_db(database).await;

            let endpoints: Vec<(&str, &str)> =
                recorded_requests.iter().map(|r| (r.method.as_str(), r.path.as_str())).collect();

            let reality_ratios = analyzer.get_reality_ratios(&endpoints, continuum_engine).await;

            let priorities = analyzer.calculate_priorities(&usage_stats, &reality_ratios);

            // "METHOD path" keys for O(1) membership checks in retain().
            let prioritized_endpoints: std::collections::HashSet<String> =
                priorities.iter().map(|p| format!("{} {}", p.method, p.endpoint)).collect();

            recorded_requests.retain(|req| {
                let key = format!("{} {}", req.method, req.path);
                prioritized_endpoints.contains(&key)
            });

            debug!(
                "Traffic-aware filtering: {} requests after filtering (from {} total)",
                recorded_requests.len(),
                max_requests
            );
        }

        let mut changes = Vec::new();
        let mut updated_count = 0;

        for request in recorded_requests {
            // Replaying non-GET methods can have side effects upstream, so
            // they are skipped unless explicitly allowed.
            if sync_get_only && request.method.to_uppercase() != "GET" {
                continue;
            }

            // Recorded paths may already be absolute URLs; otherwise join
            // them onto the upstream base (avoiding a double slash).
            let full_url =
                if request.path.starts_with("http://") || request.path.starts_with("https://") {
                    request.path.clone()
                } else {
                    format!("{}{}", upstream_url.trim_end_matches('/'), request.path)
                };

            match Self::replay_to_upstream(
                http_client,
                &full_url,
                &request.method,
                &request.headers,
                headers,
            )
            .await
            {
                Ok((status, response_headers, response_body)) => {
                    // Only requests with a stored response can be diffed.
                    if let Ok(Some(exchange)) = database.get_exchange(&request.id).await {
                        if let Some(original_response) = exchange.response {
                            let original_headers = original_response.headers_map();
                            let original_body =
                                original_response.decoded_body().unwrap_or_default();

                            let comparison = ResponseComparator::compare(
                                original_response.status_code as i32,
                                &original_headers,
                                &original_body,
                                status as i32,
                                &response_headers,
                                &response_body,
                            );

                            if !comparison.matches {
                                debug!(
                                    "Change detected for {} {}: {} differences",
                                    request.method,
                                    request.path,
                                    comparison.differences.len()
                                );

                                // Persist before/after snapshots so the change
                                // stays reviewable even after a fixture update.
                                // body_json is best-effort: None for non-JSON.
                                let snapshot_before = crate::sync_snapshots::SnapshotData {
                                    status_code: original_response.status_code as u16,
                                    headers: original_headers.clone(),
                                    body: original_body.clone(),
                                    body_json: serde_json::from_slice(&original_body).ok(),
                                };

                                let snapshot_after = crate::sync_snapshots::SnapshotData {
                                    status_code: status as u16,
                                    headers: response_headers.clone(),
                                    body: response_body.clone(),
                                    body_json: serde_json::from_slice(&response_body).ok(),
                                };

                                let snapshot = crate::sync_snapshots::SyncSnapshot::new(
                                    request.path.clone(),
                                    request.method.clone(),
                                    sync_cycle_id.clone(),
                                    snapshot_before,
                                    snapshot_after,
                                    comparison.clone(),
                                    request.duration_ms.map(|d| d as u64),
                                    None, // final arg unused here — confirm meaning against SyncSnapshot::new
                                );

                                // Snapshot storage failure is non-fatal; the
                                // change is still reported.
                                if let Err(e) = database.insert_sync_snapshot(&snapshot).await {
                                    warn!(
                                        "Failed to store snapshot for {} {}: {}",
                                        request.method, request.path, e
                                    );
                                }

                                let mut updated = false;
                                if auto_update {
                                    match Self::update_fixture(
                                        database,
                                        &request.id,
                                        status,
                                        &response_headers,
                                        &response_body,
                                    )
                                    .await
                                    {
                                        Ok(_) => {
                                            updated = true;
                                            updated_count += 1;
                                            info!(
                                                "Updated fixture for {} {}",
                                                request.method, request.path
                                            );
                                        }
                                        Err(e) => {
                                            warn!(
                                                "Failed to update fixture for {} {}: {}",
                                                request.method, request.path, e
                                            );
                                        }
                                    }
                                }

                                changes.push(DetectedChange {
                                    request_id: request.id.clone(),
                                    method: request.method.clone(),
                                    path: request.path.clone(),
                                    comparison,
                                    updated,
                                });
                            }
                        }
                    }
                }
                Err(e) => {
                    // Unreachable upstream / bad method for one request is not
                    // a pass-level failure.
                    debug!(
                        "Failed to replay {} {} to upstream: {}",
                        request.method, request.path, e
                    );
                }
            }
        }

        Ok((changes, updated_count))
    }

    /// Convenience wrapper: GitOps sync without a drift evaluator.
    pub async fn sync_with_gitops(
        &self,
        gitops_handler: Option<&crate::sync_gitops::GitOpsSyncHandler>,
    ) -> Result<(Vec<DetectedChange>, usize, Option<mockforge_core::pr_generation::PRResult>)> {
        self.sync_with_gitops_and_drift(
            gitops_handler,
            None, // no drift evaluator
        )
        .await
    }

    /// Run one sync pass, then hand detected changes to the GitOps handler
    /// (producing an optional PR) and the drift evaluator.
    ///
    /// Fixture auto-update is forced off for this path: changes flow through
    /// the GitOps handler instead of being written directly.
    ///
    /// NOTE(review): the `sync_cycle_id` generated here is distinct from the
    /// one `sync_once` generates internally for its snapshots, so snapshot
    /// cycle ids won't correlate with GitOps/drift cycle ids — confirm
    /// whether they should be unified.
    pub async fn sync_with_gitops_and_drift(
        &self,
        gitops_handler: Option<&crate::sync_gitops::GitOpsSyncHandler>,
        drift_evaluator: Option<&crate::sync_drift::SyncDriftEvaluator>,
    ) -> Result<(Vec<DetectedChange>, usize, Option<mockforge_core::pr_generation::PRResult>)> {
        let config = self.config.read().await.clone();
        let upstream_url = config.upstream_url.ok_or_else(|| {
            crate::RecorderError::InvalidFilter("No upstream_url configured".to_string())
        })?;

        {
            let mut status = self.status.write().await;
            status.is_running = true;
        }

        let sync_cycle_id = format!("sync_{}", Uuid::new_v4());

        let traffic_analyzer = config
            .traffic_aware
            .as_ref()
            .map(|ta_config| crate::sync_traffic::TrafficAnalyzer::new(ta_config.clone()));

        let result = Self::sync_once(
            &self.http_client,
            &self.database,
            &upstream_url,
            false, // auto_update off: GitOps handler owns fixture changes
            config.max_requests_per_sync,
            config.sync_get_only,
            &config.headers,
            traffic_analyzer.as_ref(),
            None, // no reality-continuum engine
        )
        .await;

        // Extract changes even on error so downstream handlers get an empty
        // list rather than being skipped silently.
        let (changes, _updated_count) = match &result {
            Ok((c, u)) => (c.clone(), *u),
            Err(_) => (Vec::new(), 0),
        };

        // GitOps failures are swallowed (.ok()); the sync result is still
        // reported below.
        let pr_result = if let Some(handler) = gitops_handler {
            handler
                .process_sync_changes(&self.database, &changes, &sync_cycle_id)
                .await
                .ok()
                .flatten()
        } else {
            None
        };

        // Drift-budget evaluation is best-effort as well.
        if let Some(evaluator) = drift_evaluator {
            if let Err(e) = evaluator
                .evaluate_sync_changes(&changes, &sync_cycle_id, None, None, None)
                .await
            {
                warn!("Failed to evaluate drift budgets for sync changes: {}", e);
            }
        }

        {
            let mut status = self.status.write().await;
            status.is_running = false;
            match &result {
                Ok((changes, updated)) => {
                    status.last_sync = Some(chrono::Utc::now());
                    status.last_changes_detected = changes.len();
                    status.last_fixtures_updated = *updated;
                    status.last_error = None;
                    status.total_syncs += 1;
                }
                Err(e) => {
                    status.last_error = Some(e.to_string());
                }
            }
        }

        match result {
            Ok((changes, updated)) => Ok((changes, updated, pr_result)),
            Err(e) => Err(e),
        }
    }

    /// Replay a single recorded request to `url` and return
    /// `(status, headers, body)` of the live response.
    ///
    /// `original_headers` is the recorded headers as a JSON object string;
    /// malformed JSON is silently ignored (replay proceeds with only the
    /// additional headers). `additional_headers` override recorded ones on
    /// key collision. Headers that fail reqwest name/value validation are
    /// silently dropped.
    ///
    /// NOTE(review): no request body is ever sent, so replaying POST/PUT/PATCH
    /// (possible when `sync_get_only` is off) replays them body-less — confirm
    /// this is acceptable.
    async fn replay_to_upstream(
        http_client: &Client,
        url: &str,
        method: &str,
        original_headers: &str,
        additional_headers: &HashMap<String, String>,
    ) -> Result<(u16, HashMap<String, String>, Vec<u8>)> {
        let mut headers_map = HashMap::new();
        if let Ok(json) = serde_json::from_str::<HashMap<String, String>>(original_headers) {
            headers_map = json;
        }

        for (key, value) in additional_headers {
            headers_map.insert(key.clone(), value.clone());
        }

        let reqwest_method = match method.to_uppercase().as_str() {
            "GET" => reqwest::Method::GET,
            "POST" => reqwest::Method::POST,
            "PUT" => reqwest::Method::PUT,
            "DELETE" => reqwest::Method::DELETE,
            "PATCH" => reqwest::Method::PATCH,
            "HEAD" => reqwest::Method::HEAD,
            "OPTIONS" => reqwest::Method::OPTIONS,
            _ => {
                return Err(crate::RecorderError::InvalidFilter(format!(
                    "Unsupported method: {}",
                    method
                )))
            }
        };

        let mut request_builder = http_client.request(reqwest_method, url);

        for (key, value) in &headers_map {
            if let Ok(header_name) = reqwest::header::HeaderName::from_bytes(key.as_bytes()) {
                if let Ok(header_value) = reqwest::header::HeaderValue::from_str(value) {
                    request_builder = request_builder.header(header_name, header_value);
                }
            }
        }

        let response = request_builder
            .send()
            .await
            .map_err(|e| crate::RecorderError::InvalidFilter(format!("Request failed: {}", e)))?;

        let status = response.status().as_u16();
        let mut response_headers = HashMap::new();

        // Non-UTF-8 header values are dropped rather than failing the replay.
        for (key, value) in response.headers() {
            if let Ok(value_str) = value.to_str() {
                response_headers.insert(key.to_string(), value_str.to_string());
            }
        }

        let response_body = response
            .bytes()
            .await
            .map_err(|e| {
                crate::RecorderError::InvalidFilter(format!("Failed to read response body: {}", e))
            })?
            .to_vec();

        Ok((status, response_headers, response_body))
    }

    /// Overwrite the stored response for `request_id` with the live one.
    ///
    /// Headers are stored as a JSON object and the body as standard base64,
    /// matching how `replay_to_upstream`/the recorder read them back.
    async fn update_fixture(
        database: &RecorderDatabase,
        request_id: &str,
        status_code: u16,
        headers: &HashMap<String, String>,
        body: &[u8],
    ) -> Result<()> {
        let headers_json = serde_json::to_string(headers)?;
        let body_encoded = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, body);
        let body_size = body.len() as i64;

        database
            .update_response(
                request_id,
                status_code as i32,
                &headers_json,
                &body_encoded,
                body_size,
            )
            .await?;

        Ok(())
    }

    /// Snapshot of the current sync status.
    pub async fn get_status(&self) -> SyncStatus {
        self.status.read().await.clone()
    }

    /// Snapshot of the current configuration.
    pub async fn get_config(&self) -> SyncConfig {
        self.config.read().await.clone()
    }

    /// Replace the configuration wholesale.
    ///
    /// NOTE(review): this does not rebuild `http_client`, so a changed
    /// `request_timeout_seconds` has no effect until the service is
    /// recreated; the running interval timer is likewise unaffected.
    pub async fn update_config(&self, new_config: SyncConfig) {
        *self.config.write().await = new_config;
    }

    /// Run a single sync pass immediately with the current configuration,
    /// updating the shared status exactly like the background loop does.
    pub async fn sync_now(&self) -> Result<(Vec<DetectedChange>, usize)> {
        let config = self.config.read().await.clone();
        let upstream_url = config.upstream_url.ok_or_else(|| {
            crate::RecorderError::InvalidFilter("No upstream_url configured".to_string())
        })?;

        {
            let mut status = self.status.write().await;
            status.is_running = true;
        }

        let traffic_analyzer = config
            .traffic_aware
            .as_ref()
            .map(|ta_config| crate::sync_traffic::TrafficAnalyzer::new(ta_config.clone()));

        let result = Self::sync_once(
            &self.http_client,
            &self.database,
            &upstream_url,
            config.auto_update,
            config.max_requests_per_sync,
            config.sync_get_only,
            &config.headers,
            traffic_analyzer.as_ref(),
            None, // no reality-continuum engine
        )
        .await;

        {
            let mut status = self.status.write().await;
            status.is_running = false;
            match &result {
                Ok((changes, updated)) => {
                    status.last_sync = Some(chrono::Utc::now());
                    status.last_changes_detected = changes.len();
                    status.last_fixtures_updated = *updated;
                    status.last_error = None;
                    status.total_syncs += 1;
                }
                Err(e) => {
                    status.last_error = Some(e.to_string());
                }
            }
        }

        result
    }
}
753
#[cfg(test)]
mod tests {
    use super::*;

    /// Defaults must stay conservative: disabled, hourly, GET-only,
    /// no auto-update, and no optional subsystems configured.
    #[test]
    fn test_sync_config_default() {
        let config = SyncConfig::default();
        assert!(!config.enabled);
        assert_eq!(config.interval_seconds, 3600);
        assert!(!config.auto_update);
        assert_eq!(config.max_requests_per_sync, 100);
        assert_eq!(config.request_timeout_seconds, 30);
        assert!(config.sync_get_only);
        assert!(config.headers.is_empty());
        assert!(config.upstream_url.is_none());
        assert!(config.gitops_mode.is_none());
        assert!(config.traffic_aware.is_none());
    }

    #[test]
    fn test_sync_status_creation() {
        let status = SyncStatus {
            is_running: false,
            last_sync: None,
            last_changes_detected: 0,
            last_fixtures_updated: 0,
            last_error: None,
            total_syncs: 0,
        };

        assert!(!status.is_running);
        assert_eq!(status.total_syncs, 0);
    }

    /// The serde default helpers must agree with the documented defaults;
    /// a silent change here would alter deserialization behavior.
    #[test]
    fn test_serde_default_helpers() {
        assert!(default_true());
        assert_eq!(default_main_branch(), "main");
        assert_eq!(default_lookback_days(), 7);
        assert_eq!(default_count_weight(), 1.0);
        assert_eq!(default_recency_weight(), 0.5);
        assert_eq!(default_reality_weight(), -0.3);
    }
}