mockforge_recorder/sync.rs

//! Automatic sync/polling for detecting upstream API changes
//!
//! This module provides functionality to periodically poll upstream APIs,
//! compare responses with recorded fixtures, and detect changes.
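//!
//! A minimal usage sketch (marked `ignore`; it assumes a `RecorderDatabase`
//! handle is already available, runs inside a Tokio runtime, and omits error
//! handling):
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! // Enable sync against a hypothetical upstream and poll every 15 minutes.
//! let config = SyncConfig {
//!     enabled: true,
//!     upstream_url: Some("https://api.example.com".to_string()),
//!     interval_seconds: 900,
//!     ..Default::default()
//! };
//!
//! let service = SyncService::new(config, Arc::clone(&database));
//!
//! // Run periodic syncs in the background...
//! let handle = service.start();
//!
//! // ...or trigger one immediately and inspect the detected changes.
//! let (changes, updated) = service.sync_now().await?;
//! println!("{} changes detected, {} fixtures updated", changes.len(), updated);
//! ```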

use crate::{
    database::RecorderDatabase,
    diff::{ComparisonResult, ResponseComparator},
    Result,
};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::{interval, MissedTickBehavior};
use tracing::{debug, info, warn};
use uuid::Uuid;

/// GitOps configuration for sync
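///
/// Illustrative example (marked `ignore`): the `token` field is never written
/// back out because of `#[serde(skip_serializing)]`, and `base_branch`,
/// `update_fixtures`, and `update_docs` fall back to their serde defaults when
/// omitted from the input. All values shown are placeholders.
///
/// ```ignore
/// let config = GitOpsConfig {
///     enabled: true,
///     pr_provider: "github".to_string(),
///     repo_owner: "acme".to_string(),
///     repo_name: "api-mocks".to_string(),
///     base_branch: "main".to_string(),
///     update_fixtures: true,
///     regenerate_sdks: false,
///     update_docs: true,
///     auto_merge: false,
///     token: Some("ghp_placeholder".to_string()),
/// };
///
/// // The serialized form omits `token`.
/// assert!(!serde_json::to_string(&config).unwrap().contains("token"));
/// ```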
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GitOpsConfig {
    /// Whether GitOps mode is enabled
    pub enabled: bool,
    /// PR provider (GitHub or GitLab)
    pub pr_provider: String, // "github" or "gitlab"
    /// Repository owner/org
    pub repo_owner: String,
    /// Repository name
    pub repo_name: String,
    /// Base branch (default: main)
    #[serde(default = "default_main_branch")]
    pub base_branch: String,
    /// Whether to update fixture files
    #[serde(default = "default_true")]
    pub update_fixtures: bool,
    /// Whether to regenerate SDKs
    #[serde(default)]
    pub regenerate_sdks: bool,
    /// Whether to update OpenAPI specs
    #[serde(default = "default_true")]
    pub update_docs: bool,
    /// Whether to auto-merge PRs
    #[serde(default)]
    pub auto_merge: bool,
    /// Authentication token (GitHub PAT or GitLab token)
    #[serde(skip_serializing)]
    pub token: Option<String>,
}

fn default_main_branch() -> String {
    "main".to_string()
}

/// Traffic-aware sync configuration
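///
/// Illustrative construction (marked `ignore`): sync only endpoints seen at
/// least 10 times over the last 7 days, weighting request count highest. The
/// actual priority scoring lives in `sync_traffic::TrafficAnalyzer`; the
/// values below are placeholders, not recommendations.
///
/// ```ignore
/// let traffic = TrafficAwareConfig {
///     enabled: true,
///     min_requests_threshold: Some(10),
///     top_percentage: Some(20.0),
///     lookback_days: 7,
///     sync_real_endpoints: false,
///     weight_count: 1.0,
///     weight_recency: 0.5,
///     weight_reality: -0.3,
/// };
/// ```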
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrafficAwareConfig {
    /// Whether traffic-aware sync is enabled
    pub enabled: bool,
    /// Minimum request count threshold (only sync if > N requests)
    pub min_requests_threshold: Option<usize>,
    /// Top percentage threshold (sync top X% of endpoints)
    pub top_percentage: Option<f64>,
    /// Lookback window in days for usage statistics
    #[serde(default = "default_lookback_days")]
    pub lookback_days: u64,
    /// Whether to sync endpoints with high reality ratio (mostly real)
    #[serde(default)]
    pub sync_real_endpoints: bool,
    /// Weight for request count in priority calculation
    #[serde(default = "default_count_weight")]
    pub weight_count: f64,
    /// Weight for recency in priority calculation
    #[serde(default = "default_recency_weight")]
    pub weight_recency: f64,
    /// Weight for reality ratio in priority calculation
    #[serde(default = "default_reality_weight")]
    pub weight_reality: f64,
}

fn default_lookback_days() -> u64 {
    7
}

fn default_count_weight() -> f64 {
    1.0
}

fn default_recency_weight() -> f64 {
    0.5
}

fn default_reality_weight() -> f64 {
    -0.3
}

/// Sync configuration
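///
/// Illustrative sketch (marked `ignore`): start from `Default::default()` and
/// override only what you need. The upstream URL shown is a placeholder.
///
/// ```ignore
/// let config = SyncConfig {
///     enabled: true,
///     upstream_url: Some("https://api.example.com".to_string()),
///     interval_seconds: 1800, // 30 minutes
///     auto_update: true,
///     ..Default::default()
/// };
/// ```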
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncConfig {
    /// Whether sync is enabled
    pub enabled: bool,
    /// Upstream base URL to sync from
    pub upstream_url: Option<String>,
    /// Sync interval in seconds
    pub interval_seconds: u64,
    /// Whether to automatically update fixtures when changes detected
    pub auto_update: bool,
    /// Maximum number of requests to sync per interval
    pub max_requests_per_sync: usize,
    /// Timeout for sync requests in seconds
    pub request_timeout_seconds: u64,
    /// Headers to add to sync requests
    pub headers: HashMap<String, String>,
    /// Only sync GET requests (default: true)
    #[serde(default = "default_true")]
    pub sync_get_only: bool,
    /// GitOps configuration (optional)
    pub gitops_mode: Option<GitOpsConfig>,
    /// Traffic-aware sync configuration (optional)
    pub traffic_aware: Option<TrafficAwareConfig>,
}

fn default_true() -> bool {
    true
}

impl Default for SyncConfig {
    fn default() -> Self {
        Self {
            enabled: false,
            upstream_url: None,
            interval_seconds: 3600, // 1 hour default
            auto_update: false,
            max_requests_per_sync: 100,
            request_timeout_seconds: 30,
            headers: HashMap::new(),
            sync_get_only: true,
            gitops_mode: None,
            traffic_aware: None,
        }
    }
}

/// Sync status
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncStatus {
    /// Whether sync is currently running
    pub is_running: bool,
    /// Last sync timestamp
    pub last_sync: Option<chrono::DateTime<chrono::Utc>>,
    /// Number of changes detected in last sync
    pub last_changes_detected: usize,
    /// Number of fixtures updated in last sync
    pub last_fixtures_updated: usize,
    /// Last sync error (if any)
    pub last_error: Option<String>,
    /// Total syncs performed
    pub total_syncs: u64,
}

/// Detected change in an API response
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DetectedChange {
    /// Request ID from database
    pub request_id: String,
    /// Request method
    pub method: String,
    /// Request path
    pub path: String,
    /// Comparison result
    pub comparison: ComparisonResult,
    /// Whether fixture was updated
    pub updated: bool,
}

/// Sync service for polling upstream APIs and detecting changes
pub struct SyncService {
    config: Arc<RwLock<SyncConfig>>,
    database: Arc<RecorderDatabase>,
    status: Arc<RwLock<SyncStatus>>,
    http_client: Client,
}

impl SyncService {
    /// Create a new sync service
    pub fn new(config: SyncConfig, database: Arc<RecorderDatabase>) -> Self {
        let http_client = Client::builder()
            .timeout(Duration::from_secs(config.request_timeout_seconds))
            .build()
            .expect("Failed to create HTTP client");

        Self {
            config: Arc::new(RwLock::new(config)),
            database,
            status: Arc::new(RwLock::new(SyncStatus {
                is_running: false,
                last_sync: None,
                last_changes_detected: 0,
                last_fixtures_updated: 0,
                last_error: None,
                total_syncs: 0,
            })),
            http_client,
        }
    }

    /// Start the sync service (runs in background)
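    ///
    /// Returns the `JoinHandle` of the spawned background task; aborting the
    /// handle stops the polling loop. A brief sketch (marked `ignore`):
    ///
    /// ```ignore
    /// let handle = service.start();
    /// // ... later, stop the background sync loop.
    /// handle.abort();
    /// ```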
    pub fn start(&self) -> tokio::task::JoinHandle<()> {
        let config = Arc::clone(&self.config);
        let database = Arc::clone(&self.database);
        let status = Arc::clone(&self.status);
        let http_client = self.http_client.clone();

        tokio::spawn(async move {
            let mut interval_timer =
                interval(Duration::from_secs(config.read().await.interval_seconds));
            interval_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);

            loop {
                interval_timer.tick().await;

                let config_guard = config.read().await;
                if !config_guard.enabled {
                    continue;
                }

                let upstream_url = match &config_guard.upstream_url {
                    Some(url) => url.clone(),
                    None => {
                        warn!("Sync enabled but no upstream_url configured");
                        continue;
                    }
                };

                let auto_update = config_guard.auto_update;
                let max_requests = config_guard.max_requests_per_sync;
                let sync_get_only = config_guard.sync_get_only;
                let headers = config_guard.headers.clone();
                drop(config_guard);

                // Update status
                {
                    let mut status_guard = status.write().await;
                    status_guard.is_running = true;
                }

                info!("Starting automatic sync from upstream: {}", upstream_url);

                let config_guard = config.read().await;
                let traffic_analyzer = config_guard
                    .traffic_aware
                    .as_ref()
                    .map(|ta_config| crate::sync_traffic::TrafficAnalyzer::new(ta_config.clone()));
                drop(config_guard);

                match Self::sync_once(
                    &http_client,
                    &database,
                    &upstream_url,
                    auto_update,
                    max_requests,
                    sync_get_only,
                    &headers,
                    traffic_analyzer.as_ref(),
                    None, // Continuum engine not available in background sync yet
                )
                .await
                {
                    Ok((changes, updated)) => {
                        let mut status_guard = status.write().await;
                        status_guard.is_running = false;
                        status_guard.last_sync = Some(chrono::Utc::now());
                        status_guard.last_changes_detected = changes.len();
                        status_guard.last_fixtures_updated = updated;
                        status_guard.last_error = None;
                        status_guard.total_syncs += 1;

                        if !changes.is_empty() {
                            info!(
                                "Sync complete: {} changes detected, {} fixtures updated",
                                changes.len(),
                                updated
                            );
                        } else {
                            debug!("Sync complete: No changes detected");
                        }
                    }
                    Err(e) => {
                        let mut status_guard = status.write().await;
                        status_guard.is_running = false;
                        status_guard.last_error = Some(e.to_string());
                        warn!("Sync failed: {}", e);
                    }
                }
            }
        })
    }

    /// Perform a single sync operation
    async fn sync_once(
        http_client: &Client,
        database: &RecorderDatabase,
        upstream_url: &str,
        auto_update: bool,
        max_requests: usize,
        sync_get_only: bool,
        headers: &HashMap<String, String>,
        traffic_analyzer: Option<&crate::sync_traffic::TrafficAnalyzer>,
        continuum_engine: Option<
            &mockforge_core::reality_continuum::engine::RealityContinuumEngine,
        >,
    ) -> Result<(Vec<DetectedChange>, usize)> {
        // Generate sync cycle ID for grouping snapshots from this sync operation
        let sync_cycle_id = format!("sync_{}", Uuid::new_v4());

        // Get recent recorded requests
        let mut recorded_requests = database.list_recent(max_requests as i32).await?;

        // Apply traffic-aware filtering if enabled
        if let Some(analyzer) = traffic_analyzer {
            // Aggregate usage stats from database
            let usage_stats = analyzer.aggregate_usage_stats_from_db(database).await;

            // Get endpoint list for reality ratio lookup
            let endpoints: Vec<(&str, &str)> =
                recorded_requests.iter().map(|r| (r.method.as_str(), r.path.as_str())).collect();

            // Get reality ratios
            let reality_ratios = analyzer.get_reality_ratios(&endpoints, continuum_engine).await;

            // Calculate priorities
            let priorities = analyzer.calculate_priorities(&usage_stats, &reality_ratios);

            // Filter requests based on priorities
            let prioritized_endpoints: std::collections::HashSet<String> =
                priorities.iter().map(|p| format!("{} {}", p.method, p.endpoint)).collect();

            let requests_before_filtering = recorded_requests.len();
            recorded_requests.retain(|req| {
                let key = format!("{} {}", req.method, req.path);
                prioritized_endpoints.contains(&key)
            });

            debug!(
                "Traffic-aware filtering: {} requests after filtering (from {} total)",
                recorded_requests.len(),
                requests_before_filtering
            );
        }

        let mut changes = Vec::new();
        let mut updated_count = 0;

        for request in recorded_requests {
            // Skip non-GET requests if configured
            if sync_get_only && request.method.to_uppercase() != "GET" {
                continue;
            }

            // Build full URL
            let full_url =
                if request.path.starts_with("http://") || request.path.starts_with("https://") {
                    request.path.clone()
                } else {
                    format!("{}{}", upstream_url.trim_end_matches('/'), request.path)
                };

            // Replay the request to upstream
            match Self::replay_to_upstream(
                http_client,
                &full_url,
                &request.method,
                &request.headers,
                headers,
            )
            .await
            {
                Ok((status, response_headers, response_body)) => {
                    // Get original exchange
                    if let Ok(Some(exchange)) = database.get_exchange(&request.id).await {
                        if let Some(original_response) = exchange.response {
                            let original_headers = original_response.headers_map();
                            let original_body =
                                original_response.decoded_body().unwrap_or_default();

                            // Compare responses
                            let comparison = ResponseComparator::compare(
                                original_response.status_code as i32,
                                &original_headers,
                                &original_body,
                                status as i32,
                                &response_headers,
                                &response_body,
                            );

                            if !comparison.matches {
                                debug!(
                                    "Change detected for {} {}: {} differences",
                                    request.method,
                                    request.path,
                                    comparison.differences.len()
                                );

                                // Create snapshot before updating fixture (Shadow Snapshot Mode)
                                let snapshot_before = crate::sync_snapshots::SnapshotData {
                                    status_code: original_response.status_code as u16,
                                    headers: original_headers.clone(),
                                    body: original_body.clone(),
                                    body_json: serde_json::from_slice(&original_body).ok(),
                                };

                                let snapshot_after = crate::sync_snapshots::SnapshotData {
                                    status_code: status as u16,
                                    headers: response_headers.clone(),
                                    body: response_body.clone(),
                                    body_json: serde_json::from_slice(&response_body).ok(),
                                };

                                let snapshot = crate::sync_snapshots::SyncSnapshot::new(
                                    request.path.clone(),
                                    request.method.clone(),
                                    sync_cycle_id.clone(),
                                    snapshot_before,
                                    snapshot_after,
                                    comparison.clone(),
                                    request.duration_ms.map(|d| d as u64),
                                    None, // Response time after sync not available yet
                                );

                                // Store snapshot in database
                                if let Err(e) = database.insert_sync_snapshot(&snapshot).await {
                                    warn!(
                                        "Failed to store snapshot for {} {}: {}",
                                        request.method, request.path, e
                                    );
                                }

                                let mut updated = false;
                                if auto_update {
                                    // Update the fixture with new response
                                    match Self::update_fixture(
                                        database,
                                        &request.id,
                                        status,
                                        &response_headers,
                                        &response_body,
                                    )
                                    .await
                                    {
                                        Ok(_) => {
                                            updated = true;
                                            updated_count += 1;
                                            info!(
                                                "Updated fixture for {} {}",
                                                request.method, request.path
                                            );
                                        }
                                        Err(e) => {
                                            warn!(
                                                "Failed to update fixture for {} {}: {}",
                                                request.method, request.path, e
                                            );
                                        }
                                    }
                                }

                                changes.push(DetectedChange {
                                    request_id: request.id.clone(),
                                    method: request.method.clone(),
                                    path: request.path.clone(),
                                    comparison,
                                    updated,
                                });
                            }
                        }
                    }
                }
                Err(e) => {
                    debug!(
                        "Failed to replay {} {} to upstream: {}",
                        request.method, request.path, e
                    );
                    // Continue with other requests
                }
            }
        }

        Ok((changes, updated_count))
    }

    /// Perform sync with GitOps integration
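    ///
    /// Sketch of a manual invocation without a GitOps handler (marked
    /// `ignore`); with a handler configured, detected changes are routed into
    /// a pull request instead of being auto-applied:
    ///
    /// ```ignore
    /// let (changes, updated, pr) = service.sync_with_gitops(None).await?;
    /// if let Some(pr) = pr {
    ///     println!("opened PR: {:?}", pr);
    /// }
    /// ```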
    pub async fn sync_with_gitops(
        &self,
        gitops_handler: Option<&crate::sync_gitops::GitOpsSyncHandler>,
    ) -> Result<(Vec<DetectedChange>, usize, Option<mockforge_core::pr_generation::PRResult>)> {
        self.sync_with_gitops_and_drift(
            gitops_handler,
            None, // drift_evaluator
        )
        .await
    }

    /// Perform sync with GitOps and drift budget evaluation
    pub async fn sync_with_gitops_and_drift(
        &self,
        gitops_handler: Option<&crate::sync_gitops::GitOpsSyncHandler>,
        drift_evaluator: Option<&crate::sync_drift::SyncDriftEvaluator>,
    ) -> Result<(Vec<DetectedChange>, usize, Option<mockforge_core::pr_generation::PRResult>)> {
        let config = self.config.read().await.clone();
        let upstream_url = config.upstream_url.ok_or_else(|| {
            crate::RecorderError::InvalidFilter("No upstream_url configured".to_string())
        })?;

        {
            let mut status = self.status.write().await;
            status.is_running = true;
        }

        // Generate sync cycle ID
        let sync_cycle_id = format!("sync_{}", Uuid::new_v4());

        let traffic_analyzer = config
            .traffic_aware
            .as_ref()
            .map(|ta_config| crate::sync_traffic::TrafficAnalyzer::new(ta_config.clone()));

        let result = Self::sync_once(
            &self.http_client,
            &self.database,
            &upstream_url,
            false, // Don't auto-update when GitOps is enabled
            config.max_requests_per_sync,
            config.sync_get_only,
            &config.headers,
            traffic_analyzer.as_ref(),
            None, // Continuum engine not available in sync_with_gitops yet
        )
        .await;

        let (changes, _updated_count) = match &result {
            Ok((c, u)) => (c.clone(), *u),
            Err(_) => (Vec::new(), 0),
        };

        // Process changes with GitOps if enabled
        let pr_result = if let Some(handler) = gitops_handler {
            handler
                .process_sync_changes(&self.database, &changes, &sync_cycle_id)
                .await
                .ok()
                .flatten()
        } else {
            None
        };

        // Evaluate drift budgets and create incidents if enabled
        if let Some(evaluator) = drift_evaluator {
            if let Err(e) = evaluator
                .evaluate_sync_changes(&changes, &sync_cycle_id, None, None, None)
                .await
            {
                warn!("Failed to evaluate drift budgets for sync changes: {}", e);
            }
        }

        {
            let mut status = self.status.write().await;
            status.is_running = false;
            match &result {
                Ok((changes, updated)) => {
                    status.last_sync = Some(chrono::Utc::now());
                    status.last_changes_detected = changes.len();
                    status.last_fixtures_updated = *updated;
                    status.last_error = None;
                    status.total_syncs += 1;
                }
                Err(e) => {
                    status.last_error = Some(e.to_string());
                }
            }
        }

        match result {
            Ok((changes, updated)) => Ok((changes, updated, pr_result)),
            Err(e) => Err(e),
        }
    }

    /// Replay a request to the upstream URL
    async fn replay_to_upstream(
        http_client: &Client,
        url: &str,
        method: &str,
        original_headers: &str,
        additional_headers: &HashMap<String, String>,
    ) -> Result<(u16, HashMap<String, String>, Vec<u8>)> {
        // Parse original headers
        let mut headers_map = HashMap::new();
        if let Ok(json) = serde_json::from_str::<HashMap<String, String>>(original_headers) {
            headers_map = json;
        }

        // Add additional headers (merge)
        for (key, value) in additional_headers {
            headers_map.insert(key.clone(), value.clone());
        }

        // Build request
        let reqwest_method = match method.to_uppercase().as_str() {
            "GET" => reqwest::Method::GET,
            "POST" => reqwest::Method::POST,
            "PUT" => reqwest::Method::PUT,
            "DELETE" => reqwest::Method::DELETE,
            "PATCH" => reqwest::Method::PATCH,
            "HEAD" => reqwest::Method::HEAD,
            "OPTIONS" => reqwest::Method::OPTIONS,
            _ => {
                return Err(crate::RecorderError::InvalidFilter(format!(
                    "Unsupported method: {}",
                    method
                )))
            }
        };

        let mut request_builder = http_client.request(reqwest_method, url);

        // Add headers
        for (key, value) in &headers_map {
            if let Ok(header_name) = reqwest::header::HeaderName::from_bytes(key.as_bytes()) {
                if let Ok(header_value) = reqwest::header::HeaderValue::from_str(value) {
                    request_builder = request_builder.header(header_name, header_value);
                }
            }
        }

        // Execute request
        let response = request_builder
            .send()
            .await
            .map_err(|e| crate::RecorderError::InvalidFilter(format!("Request failed: {}", e)))?;

        let status = response.status().as_u16();
        let mut response_headers = HashMap::new();

        for (key, value) in response.headers() {
            if let Ok(value_str) = value.to_str() {
                response_headers.insert(key.to_string(), value_str.to_string());
            }
        }

        let response_body = response
            .bytes()
            .await
            .map_err(|e| {
                crate::RecorderError::InvalidFilter(format!("Failed to read response body: {}", e))
            })?
            .to_vec();

        Ok((status, response_headers, response_body))
    }

    /// Update a fixture with new response data
    async fn update_fixture(
        database: &RecorderDatabase,
        request_id: &str,
        status_code: u16,
        headers: &HashMap<String, String>,
        body: &[u8],
    ) -> Result<()> {
        // Update the response in the database
        let headers_json = serde_json::to_string(headers)?;
        let body_encoded = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, body);
        let body_size = body.len() as i64;

        database
            .update_response(
                request_id,
                status_code as i32,
                &headers_json,
                &body_encoded,
                body_size,
            )
            .await?;

        Ok(())
    }

    /// Get current sync status
    pub async fn get_status(&self) -> SyncStatus {
        self.status.read().await.clone()
    }

    /// Get sync configuration
    pub async fn get_config(&self) -> SyncConfig {
        self.config.read().await.clone()
    }

    /// Update sync configuration
    pub async fn update_config(&self, new_config: SyncConfig) {
        *self.config.write().await = new_config;
    }

    /// Manually trigger a sync
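    ///
    /// Returns the detected changes and the number of fixtures updated. A
    /// brief sketch (marked `ignore`):
    ///
    /// ```ignore
    /// let (changes, updated) = service.sync_now().await?;
    /// for change in &changes {
    ///     println!("{} {} drifted (updated: {})", change.method, change.path, change.updated);
    /// }
    /// println!("{} fixtures updated", updated);
    /// ```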
    pub async fn sync_now(&self) -> Result<(Vec<DetectedChange>, usize)> {
        let config = self.config.read().await.clone();
        let upstream_url = config.upstream_url.ok_or_else(|| {
            crate::RecorderError::InvalidFilter("No upstream_url configured".to_string())
        })?;

        {
            let mut status = self.status.write().await;
            status.is_running = true;
        }

        let traffic_analyzer = config
            .traffic_aware
            .as_ref()
            .map(|ta_config| crate::sync_traffic::TrafficAnalyzer::new(ta_config.clone()));

        let result = Self::sync_once(
            &self.http_client,
            &self.database,
            &upstream_url,
            config.auto_update,
            config.max_requests_per_sync,
            config.sync_get_only,
            &config.headers,
            traffic_analyzer.as_ref(),
            None, // Continuum engine not available in sync_now yet
        )
        .await;

        {
            let mut status = self.status.write().await;
            status.is_running = false;
            match &result {
                Ok((changes, updated)) => {
                    status.last_sync = Some(chrono::Utc::now());
                    status.last_changes_detected = changes.len();
                    status.last_fixtures_updated = *updated;
                    status.last_error = None;
                    status.total_syncs += 1;
                }
                Err(e) => {
                    status.last_error = Some(e.to_string());
                }
            }
        }

        result
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_sync_config_default() {
        let config = SyncConfig::default();
        assert!(!config.enabled);
        assert_eq!(config.interval_seconds, 3600);
        assert!(!config.auto_update);
        assert_eq!(config.max_requests_per_sync, 100);
    }
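
    // A small additional check (sketch): TrafficAwareConfig relies on serde
    // defaults for its weights and lookback window, so a minimal document
    // should deserialize with those defaults filled in. The JSON shape here
    // is illustrative.
    #[test]
    fn test_traffic_aware_config_serde_defaults() {
        let json = r#"{
            "enabled": true,
            "min_requests_threshold": 10,
            "top_percentage": null
        }"#;
        let config: TrafficAwareConfig =
            serde_json::from_str(json).expect("minimal config should deserialize");
        assert_eq!(config.lookback_days, 7);
        assert_eq!(config.weight_count, 1.0);
        assert_eq!(config.weight_recency, 0.5);
        assert_eq!(config.weight_reality, -0.3);
        assert!(!config.sync_real_endpoints);
    }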

    #[test]
    fn test_sync_status_creation() {
        let status = SyncStatus {
            is_running: false,
            last_sync: None,
            last_changes_detected: 0,
            last_fixtures_updated: 0,
            last_error: None,
            total_syncs: 0,
        };

        assert!(!status.is_running);
        assert_eq!(status.total_syncs, 0);
    }
}