// mockforge_recorder/sync.rs
//! Automatic sync/polling for detecting upstream API changes
//!
//! This module provides functionality to periodically poll upstream APIs,
//! compare responses with recorded fixtures, and detect changes.

use crate::{
    database::RecorderDatabase,
    diff::{ComparisonResult, ResponseComparator},
    Result,
};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::{interval, MissedTickBehavior};
use tracing::{debug, info, warn};
use uuid::Uuid;
20
/// GitOps configuration for sync.
///
/// When present on [`SyncConfig`], detected upstream changes can be routed
/// through a PR workflow (see `sync_with_gitops`) instead of being written
/// directly to fixtures.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GitOpsConfig {
    /// Whether GitOps mode is enabled
    pub enabled: bool,
    /// PR provider (GitHub or GitLab)
    pub pr_provider: String, // "github" or "gitlab"
    /// Repository owner/org
    pub repo_owner: String,
    /// Repository name
    pub repo_name: String,
    /// Base branch PRs target (default: "main")
    #[serde(default = "default_main_branch")]
    pub base_branch: String,
    /// Whether to update fixture files (default: true)
    #[serde(default = "default_true")]
    pub update_fixtures: bool,
    /// Whether to regenerate SDKs (default: false)
    #[serde(default)]
    pub regenerate_sdks: bool,
    /// Whether to update OpenAPI specs (default: true)
    #[serde(default = "default_true")]
    pub update_docs: bool,
    /// Whether to auto-merge PRs (default: false)
    #[serde(default)]
    pub auto_merge: bool,
    /// Authentication token (GitHub PAT or GitLab token).
    /// `skip_serializing` keeps the credential out of any exported/serialized
    /// config; it can still be populated when deserializing.
    #[serde(skip_serializing)]
    pub token: Option<String>,
}
51
/// Serde default for [`GitOpsConfig::base_branch`]: the conventional "main".
fn default_main_branch() -> String {
    String::from("main")
}
55
/// Traffic-aware sync configuration.
///
/// When enabled, sync cycles are filtered to prioritized endpoints based on
/// observed usage (request count, recency) and reality ratio, instead of
/// replaying every recorded request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrafficAwareConfig {
    /// Whether traffic-aware sync is enabled
    pub enabled: bool,
    /// Minimum request count threshold (only sync if > N requests)
    pub min_requests_threshold: Option<usize>,
    /// Top percentage threshold (sync top X% of endpoints)
    pub top_percentage: Option<f64>,
    /// Lookback window in days for usage statistics (default: 7)
    #[serde(default = "default_lookback_days")]
    pub lookback_days: u64,
    /// Whether to sync endpoints with high reality ratio (mostly real)
    #[serde(default)]
    pub sync_real_endpoints: bool,
    /// Weight for request count in priority calculation (default: 1.0)
    #[serde(default = "default_count_weight")]
    pub weight_count: f64,
    /// Weight for recency in priority calculation (default: 0.5)
    #[serde(default = "default_recency_weight")]
    pub weight_recency: f64,
    /// Weight for reality ratio in priority calculation (default: -0.3).
    /// NOTE(review): the negative default appears intended to deprioritize
    /// mostly-real endpoints — confirm against `TrafficAnalyzer`'s formula.
    #[serde(default = "default_reality_weight")]
    pub weight_reality: f64,
}
81
/// Serde default for [`TrafficAwareConfig::lookback_days`]: one week.
fn default_lookback_days() -> u64 {
    7 // days
}
85
/// Serde default for [`TrafficAwareConfig::weight_count`].
fn default_count_weight() -> f64 {
    1.0 // request count carries full weight by default
}
89
/// Serde default for [`TrafficAwareConfig::weight_recency`].
fn default_recency_weight() -> f64 {
    0.5 // recency counts half as much as raw request count
}
93
/// Serde default for [`TrafficAwareConfig::weight_reality`].
fn default_reality_weight() -> f64 {
    -0.3 // negative: see TrafficAwareConfig::weight_reality docs
}
97
/// Sync configuration.
///
/// Controls the background polling loop started by [`SyncService::start`] as
/// well as manual syncs via `sync_now`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncConfig {
    /// Whether sync is enabled
    pub enabled: bool,
    /// Upstream base URL to sync from; sync is a no-op without one
    pub upstream_url: Option<String>,
    /// Sync interval in seconds (default: 3600)
    pub interval_seconds: u64,
    /// Whether to automatically update fixtures when changes detected
    pub auto_update: bool,
    /// Maximum number of requests to sync per interval (default: 100)
    pub max_requests_per_sync: usize,
    /// Timeout for sync requests in seconds (default: 30)
    pub request_timeout_seconds: u64,
    /// Headers to add to sync requests; these override recorded headers on
    /// key conflicts (see `replay_to_upstream`)
    pub headers: HashMap<String, String>,
    /// Only sync GET requests (default: true)
    #[serde(default = "default_true")]
    pub sync_get_only: bool,
    /// GitOps configuration (optional)
    pub gitops_mode: Option<GitOpsConfig>,
    /// Traffic-aware sync configuration (optional)
    pub traffic_aware: Option<TrafficAwareConfig>,
}
123
/// Serde default helper: fields tagged with this default to `true`.
fn default_true() -> bool {
    true
}
127
128impl Default for SyncConfig {
129    fn default() -> Self {
130        Self {
131            enabled: false,
132            upstream_url: None,
133            interval_seconds: 3600, // 1 hour default
134            auto_update: false,
135            max_requests_per_sync: 100,
136            request_timeout_seconds: 30,
137            headers: HashMap::new(),
138            sync_get_only: true,
139            gitops_mode: None,
140            traffic_aware: None,
141        }
142    }
143}
144
/// Sync status.
///
/// Snapshot of the sync service's state; shared between the background task
/// and callers of [`SyncService::get_status`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncStatus {
    /// Whether sync is currently running
    pub is_running: bool,
    /// Last successful sync timestamp (not updated on failed syncs)
    pub last_sync: Option<chrono::DateTime<chrono::Utc>>,
    /// Number of changes detected in last sync
    pub last_changes_detected: usize,
    /// Number of fixtures updated in last sync
    pub last_fixtures_updated: usize,
    /// Last sync error (if any); cleared on the next successful sync
    pub last_error: Option<String>,
    /// Total successful syncs performed
    pub total_syncs: u64,
}
161
/// Detected change in an API response.
///
/// Produced by `sync_once` whenever the live upstream response no longer
/// matches the recorded fixture.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DetectedChange {
    /// Request ID from database
    pub request_id: String,
    /// Request method
    pub method: String,
    /// Request path
    pub path: String,
    /// Comparison result describing the differences
    pub comparison: ComparisonResult,
    /// Whether the fixture was updated (only when `auto_update` was set)
    pub updated: bool,
}
176
/// Sync service for polling upstream APIs and detecting changes.
///
/// Config and status are behind `Arc<RwLock<_>>` so the background task
/// spawned by [`SyncService::start`] and external callers can share them.
pub struct SyncService {
    // Mutable at runtime via `update_config`.
    config: Arc<RwLock<SyncConfig>>,
    database: Arc<RecorderDatabase>,
    // Written by sync runs, read by `get_status`.
    status: Arc<RwLock<SyncStatus>>,
    // Reused across all sync requests; timeout fixed at construction time.
    http_client: Client,
}
184
185impl SyncService {
186    /// Create a new sync service
187    pub fn new(config: SyncConfig, database: Arc<RecorderDatabase>) -> Self {
188        let http_client = Client::builder()
189            .timeout(Duration::from_secs(config.request_timeout_seconds))
190            .build()
191            .expect("Failed to create HTTP client");
192
193        Self {
194            config: Arc::new(RwLock::new(config)),
195            database,
196            status: Arc::new(RwLock::new(SyncStatus {
197                is_running: false,
198                last_sync: None,
199                last_changes_detected: 0,
200                last_fixtures_updated: 0,
201                last_error: None,
202                total_syncs: 0,
203            })),
204            http_client,
205        }
206    }
207
208    /// Start the sync service (runs in background)
209    pub fn start(&self) -> tokio::task::JoinHandle<()> {
210        let config = Arc::clone(&self.config);
211        let database = Arc::clone(&self.database);
212        let status = Arc::clone(&self.status);
213        let http_client = self.http_client.clone();
214
215        tokio::spawn(async move {
216            let mut interval_timer =
217                interval(Duration::from_secs(config.read().await.interval_seconds));
218            interval_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
219
220            loop {
221                interval_timer.tick().await;
222
223                let config_guard = config.read().await;
224                if !config_guard.enabled {
225                    continue;
226                }
227
228                let upstream_url = match &config_guard.upstream_url {
229                    Some(url) => url.clone(),
230                    None => {
231                        warn!("Sync enabled but no upstream_url configured");
232                        continue;
233                    }
234                };
235
236                let auto_update = config_guard.auto_update;
237                let max_requests = config_guard.max_requests_per_sync;
238                let sync_get_only = config_guard.sync_get_only;
239                let headers = config_guard.headers.clone();
240                drop(config_guard);
241
242                // Update status
243                {
244                    let mut status_guard = status.write().await;
245                    status_guard.is_running = true;
246                }
247
248                info!("Starting automatic sync from upstream: {}", upstream_url);
249
250                let config_guard = config.read().await;
251                let traffic_analyzer = config_guard
252                    .traffic_aware
253                    .as_ref()
254                    .map(|ta_config| crate::sync_traffic::TrafficAnalyzer::new(ta_config.clone()));
255                drop(config_guard);
256
257                match Self::sync_once(
258                    &http_client,
259                    &database,
260                    &upstream_url,
261                    auto_update,
262                    max_requests,
263                    sync_get_only,
264                    &headers,
265                    traffic_analyzer.as_ref(),
266                    None, // Continuum engine not available in background sync yet
267                )
268                .await
269                {
270                    Ok((changes, updated)) => {
271                        let mut status_guard = status.write().await;
272                        status_guard.is_running = false;
273                        status_guard.last_sync = Some(chrono::Utc::now());
274                        status_guard.last_changes_detected = changes.len();
275                        status_guard.last_fixtures_updated = updated;
276                        status_guard.last_error = None;
277                        status_guard.total_syncs += 1;
278
279                        if !changes.is_empty() {
280                            info!(
281                                "Sync complete: {} changes detected, {} fixtures updated",
282                                changes.len(),
283                                updated
284                            );
285                        } else {
286                            debug!("Sync complete: No changes detected");
287                        }
288                    }
289                    Err(e) => {
290                        let mut status_guard = status.write().await;
291                        status_guard.is_running = false;
292                        status_guard.last_error = Some(e.to_string());
293                        warn!("Sync failed: {}", e);
294                    }
295                }
296            }
297        })
298    }
299
    /// Perform a single sync operation.
    ///
    /// Replays recently recorded requests against `upstream_url`, compares the
    /// live responses with the stored fixtures, snapshots every detected
    /// change, and — when `auto_update` is set — overwrites the fixtures with
    /// the new responses.
    ///
    /// Returns the detected changes plus the number of fixtures actually
    /// updated.
    ///
    /// # Errors
    ///
    /// Fails only if the recorded requests cannot be listed from the database;
    /// failures on individual replays/snapshots/updates are logged and skipped.
    #[allow(clippy::too_many_arguments)]
    async fn sync_once(
        http_client: &Client,
        database: &RecorderDatabase,
        upstream_url: &str,
        auto_update: bool,
        max_requests: usize,
        sync_get_only: bool,
        headers: &HashMap<String, String>,
        traffic_analyzer: Option<&crate::sync_traffic::TrafficAnalyzer>,
        continuum_engine: Option<
            &mockforge_core::reality_continuum::engine::RealityContinuumEngine,
        >,
    ) -> Result<(Vec<DetectedChange>, usize)> {
        // Generate sync cycle ID for grouping snapshots from this sync operation
        let sync_cycle_id = format!("sync_{}", Uuid::new_v4());

        // Get recent recorded requests
        let mut recorded_requests = database.list_recent(max_requests as i32).await?;

        // Apply traffic-aware filtering if enabled
        if let Some(analyzer) = traffic_analyzer {
            // Aggregate usage stats from database
            let usage_stats = analyzer.aggregate_usage_stats_from_db(database).await;

            // Get endpoint list for reality ratio lookup
            let endpoints: Vec<(&str, &str)> =
                recorded_requests.iter().map(|r| (r.method.as_str(), r.path.as_str())).collect();

            // Get reality ratios
            let reality_ratios = analyzer.get_reality_ratios(&endpoints, continuum_engine).await;

            // Calculate priorities
            let priorities = analyzer.calculate_priorities(&usage_stats, &reality_ratios);

            // Keep only requests whose "METHOD path" key made the priority cut.
            let prioritized_endpoints: std::collections::HashSet<String> =
                priorities.iter().map(|p| format!("{} {}", p.method, p.endpoint)).collect();

            recorded_requests.retain(|req| {
                let key = format!("{} {}", req.method, req.path);
                prioritized_endpoints.contains(&key)
            });

            debug!(
                "Traffic-aware filtering: {} requests after filtering (from {} total)",
                recorded_requests.len(),
                max_requests
            );
        }

        let mut changes = Vec::new();
        let mut updated_count = 0;

        for request in recorded_requests {
            // Skip non-GET requests if configured
            if sync_get_only && request.method.to_uppercase() != "GET" {
                continue;
            }

            // Build full URL; recorded paths may already be absolute URLs.
            let full_url =
                if request.path.starts_with("http://") || request.path.starts_with("https://") {
                    request.path.clone()
                } else {
                    format!("{}{}", upstream_url.trim_end_matches('/'), request.path)
                };

            // Replay the request to upstream
            match Self::replay_to_upstream(
                http_client,
                &full_url,
                &request.method,
                &request.headers,
                headers,
            )
            .await
            {
                Ok((status, response_headers, response_body)) => {
                    // Get original exchange; requests without a stored response
                    // (or failed lookups) are silently skipped.
                    if let Ok(Some(exchange)) = database.get_exchange(&request.id).await {
                        if let Some(original_response) = exchange.response {
                            let original_headers = original_response.headers_map();
                            let original_body =
                                original_response.decoded_body().unwrap_or_default();

                            // Compare responses
                            let comparison = ResponseComparator::compare(
                                original_response.status_code,
                                &original_headers,
                                &original_body,
                                status as i32,
                                &response_headers,
                                &response_body,
                            );

                            if !comparison.matches {
                                debug!(
                                    "Change detected for {} {}: {} differences",
                                    request.method,
                                    request.path,
                                    comparison.differences.len()
                                );

                                // Create snapshot before updating fixture (Shadow Snapshot Mode)
                                let snapshot_before = crate::sync_snapshots::SnapshotData {
                                    status_code: original_response.status_code as u16,
                                    headers: original_headers.clone(),
                                    body: original_body.clone(),
                                    // None for non-JSON bodies.
                                    body_json: serde_json::from_slice(&original_body).ok(),
                                };

                                let snapshot_after = crate::sync_snapshots::SnapshotData {
                                    status_code: status,
                                    headers: response_headers.clone(),
                                    body: response_body.clone(),
                                    body_json: serde_json::from_slice(&response_body).ok(),
                                };

                                let snapshot = crate::sync_snapshots::SyncSnapshot::new(
                                    request.path.clone(),
                                    request.method.clone(),
                                    sync_cycle_id.clone(),
                                    snapshot_before,
                                    snapshot_after,
                                    comparison.clone(),
                                    request.duration_ms.map(|d| d as u64),
                                    None, // Response time after sync not available yet
                                );

                                // Store snapshot in database; snapshot failures are
                                // logged but do not abort the sync cycle.
                                if let Err(e) = database.insert_sync_snapshot(&snapshot).await {
                                    warn!(
                                        "Failed to store snapshot for {} {}: {}",
                                        request.method, request.path, e
                                    );
                                }

                                let mut updated = false;
                                if auto_update {
                                    // Update the fixture with new response
                                    match Self::update_fixture(
                                        database,
                                        &request.id,
                                        status,
                                        &response_headers,
                                        &response_body,
                                    )
                                    .await
                                    {
                                        Ok(_) => {
                                            updated = true;
                                            updated_count += 1;
                                            info!(
                                                "Updated fixture for {} {}",
                                                request.method, request.path
                                            );
                                        }
                                        Err(e) => {
                                            warn!(
                                                "Failed to update fixture for {} {}: {}",
                                                request.method, request.path, e
                                            );
                                        }
                                    }
                                }

                                changes.push(DetectedChange {
                                    request_id: request.id.clone(),
                                    method: request.method.clone(),
                                    path: request.path.clone(),
                                    comparison,
                                    updated,
                                });
                            }
                        }
                    }
                }
                Err(e) => {
                    debug!(
                        "Failed to replay {} {} to upstream: {}",
                        request.method, request.path, e
                    );
                    // Continue with other requests
                }
            }
        }

        Ok((changes, updated_count))
    }
491
492    /// Perform sync with GitOps integration
493    pub async fn sync_with_gitops(
494        &self,
495        gitops_handler: Option<&crate::sync_gitops::GitOpsSyncHandler>,
496    ) -> Result<(Vec<DetectedChange>, usize, Option<mockforge_core::pr_generation::PRResult>)> {
497        self.sync_with_gitops_and_drift(
498            gitops_handler,
499            None, // drift_evaluator
500        )
501        .await
502    }
503
    /// Perform sync with GitOps and drift budget evaluation.
    ///
    /// Runs one sync cycle with auto-update forced off (fixture changes go
    /// through the GitOps handler instead of being written directly), then
    /// optionally opens a PR for the detected changes and evaluates drift
    /// budgets over them.
    ///
    /// Returns the detected changes, the fixtures-updated count from
    /// `sync_once` (always 0 here since auto-update is off), and the PR
    /// result if the handler produced one.
    pub async fn sync_with_gitops_and_drift(
        &self,
        gitops_handler: Option<&crate::sync_gitops::GitOpsSyncHandler>,
        drift_evaluator: Option<&crate::sync_drift::SyncDriftEvaluator>,
    ) -> Result<(Vec<DetectedChange>, usize, Option<mockforge_core::pr_generation::PRResult>)> {
        let config = self.config.read().await.clone();
        let upstream_url = config.upstream_url.ok_or_else(|| {
            crate::RecorderError::InvalidFilter("No upstream_url configured".to_string())
        })?;

        // Flag the sync as in-flight for status queries.
        {
            let mut status = self.status.write().await;
            status.is_running = true;
        }

        // Generate sync cycle ID
        let sync_cycle_id = format!("sync_{}", Uuid::new_v4());

        let traffic_analyzer = config
            .traffic_aware
            .as_ref()
            .map(|ta_config| crate::sync_traffic::TrafficAnalyzer::new(ta_config.clone()));

        let result = Self::sync_once(
            &self.http_client,
            &self.database,
            &upstream_url,
            false, // Don't auto-update when GitOps is enabled
            config.max_requests_per_sync,
            config.sync_get_only,
            &config.headers,
            traffic_analyzer.as_ref(),
            None, // Continuum engine not available in sync_with_gitops yet
        )
        .await;

        // Extract changes for GitOps/drift processing; an errored sync yields
        // an empty change set so the handlers are effectively skipped.
        let (changes, _updated_count) = match &result {
            Ok((c, u)) => (c.clone(), *u),
            Err(_) => (Vec::new(), 0),
        };

        // Process changes with GitOps if enabled; handler errors are swallowed
        // via `.ok()`, leaving `pr_result` as None.
        let pr_result = if let Some(handler) = gitops_handler {
            handler
                .process_sync_changes(&self.database, &changes, &sync_cycle_id)
                .await
                .ok()
                .flatten()
        } else {
            None
        };

        // Evaluate drift budgets and create incidents if enabled
        if let Some(evaluator) = drift_evaluator {
            if let Err(e) = evaluator
                .evaluate_sync_changes(&changes, &sync_cycle_id, None, None, None)
                .await
            {
                warn!("Failed to evaluate drift budgets for sync changes: {}", e);
            }
        }

        // Record the outcome in shared status; failed syncs keep the previous
        // last_sync and do not increment total_syncs.
        {
            let mut status = self.status.write().await;
            status.is_running = false;
            match &result {
                Ok((changes, updated)) => {
                    status.last_sync = Some(chrono::Utc::now());
                    status.last_changes_detected = changes.len();
                    status.last_fixtures_updated = *updated;
                    status.last_error = None;
                    status.total_syncs += 1;
                }
                Err(e) => {
                    status.last_error = Some(e.to_string());
                }
            }
        }

        match result {
            Ok((changes, updated)) => Ok((changes, updated, pr_result)),
            Err(e) => Err(e),
        }
    }
589
    /// Replay a request to the upstream URL.
    ///
    /// `original_headers` is the recorded request's header map serialized as a
    /// JSON object; unparseable JSON is treated as an empty map.
    /// `additional_headers` are merged on top and win on key conflicts.
    ///
    /// Returns `(status, headers, body)` of the upstream response. Response
    /// header values that are not valid UTF-8 are dropped from the map.
    ///
    /// # Errors
    ///
    /// Fails for unsupported HTTP methods, transport-level request failures,
    /// or body-read failures (all wrapped as `RecorderError::InvalidFilter`).
    async fn replay_to_upstream(
        http_client: &Client,
        url: &str,
        method: &str,
        original_headers: &str,
        additional_headers: &HashMap<String, String>,
    ) -> Result<(u16, HashMap<String, String>, Vec<u8>)> {
        // Parse original headers
        let mut headers_map = HashMap::new();
        if let Ok(json) = serde_json::from_str::<HashMap<String, String>>(original_headers) {
            headers_map = json;
        }

        // Add additional headers (merge; additional headers override recorded ones)
        for (key, value) in additional_headers {
            headers_map.insert(key.clone(), value.clone());
        }

        // Build request; only a fixed set of methods is supported.
        let reqwest_method = match method.to_uppercase().as_str() {
            "GET" => reqwest::Method::GET,
            "POST" => reqwest::Method::POST,
            "PUT" => reqwest::Method::PUT,
            "DELETE" => reqwest::Method::DELETE,
            "PATCH" => reqwest::Method::PATCH,
            "HEAD" => reqwest::Method::HEAD,
            "OPTIONS" => reqwest::Method::OPTIONS,
            _ => {
                return Err(crate::RecorderError::InvalidFilter(format!(
                    "Unsupported method: {}",
                    method
                )))
            }
        };

        let mut request_builder = http_client.request(reqwest_method, url);

        // Add headers; entries that fail name/value validation are silently
        // skipped. NOTE(review): recorded headers such as Host/Content-Length
        // are replayed as-is — confirm this is intended for the new target.
        for (key, value) in &headers_map {
            if let Ok(header_name) = reqwest::header::HeaderName::from_bytes(key.as_bytes()) {
                if let Ok(header_value) = reqwest::header::HeaderValue::from_str(value) {
                    request_builder = request_builder.header(header_name, header_value);
                }
            }
        }

        // Execute request (no body is sent, even for POST/PUT/PATCH)
        let response = request_builder
            .send()
            .await
            .map_err(|e| crate::RecorderError::InvalidFilter(format!("Request failed: {}", e)))?;

        let status = response.status().as_u16();
        let mut response_headers = HashMap::new();

        for (key, value) in response.headers() {
            if let Ok(value_str) = value.to_str() {
                response_headers.insert(key.to_string(), value_str.to_string());
            }
        }

        let response_body = response
            .bytes()
            .await
            .map_err(|e| {
                crate::RecorderError::InvalidFilter(format!("Failed to read response body: {}", e))
            })?
            .to_vec();

        Ok((status, response_headers, response_body))
    }
662
663    /// Update a fixture with new response data
664    async fn update_fixture(
665        database: &RecorderDatabase,
666        request_id: &str,
667        status_code: u16,
668        headers: &HashMap<String, String>,
669        body: &[u8],
670    ) -> Result<()> {
671        // Update the response in the database
672        let headers_json = serde_json::to_string(headers)?;
673        let body_encoded = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, body);
674        let body_size = body.len() as i64;
675
676        database
677            .update_response(
678                request_id,
679                status_code as i32,
680                &headers_json,
681                &body_encoded,
682                body_size,
683            )
684            .await?;
685
686        Ok(())
687    }
688
689    /// Get current sync status
690    pub async fn get_status(&self) -> SyncStatus {
691        self.status.read().await.clone()
692    }
693
694    /// Get sync configuration
695    pub async fn get_config(&self) -> SyncConfig {
696        self.config.read().await.clone()
697    }
698
699    /// Update sync configuration
700    pub async fn update_config(&self, new_config: SyncConfig) {
701        *self.config.write().await = new_config;
702    }
703
    /// Manually trigger a sync.
    ///
    /// Runs one sync cycle immediately with the current configuration
    /// (honoring `auto_update`), updating the shared [`SyncStatus`] with the
    /// outcome.
    ///
    /// # Errors
    ///
    /// Fails if no `upstream_url` is configured or the sync itself fails.
    pub async fn sync_now(&self) -> Result<(Vec<DetectedChange>, usize)> {
        let config = self.config.read().await.clone();
        let upstream_url = config.upstream_url.ok_or_else(|| {
            crate::RecorderError::InvalidFilter("No upstream_url configured".to_string())
        })?;

        // Flag the sync as in-flight for status queries.
        {
            let mut status = self.status.write().await;
            status.is_running = true;
        }

        let traffic_analyzer = config
            .traffic_aware
            .as_ref()
            .map(|ta_config| crate::sync_traffic::TrafficAnalyzer::new(ta_config.clone()));

        let result = Self::sync_once(
            &self.http_client,
            &self.database,
            &upstream_url,
            config.auto_update,
            config.max_requests_per_sync,
            config.sync_get_only,
            &config.headers,
            traffic_analyzer.as_ref(),
            None, // Continuum engine not available in sync_now yet
        )
        .await;

        // Record the outcome; failed syncs keep the previous last_sync and do
        // not increment total_syncs.
        {
            let mut status = self.status.write().await;
            status.is_running = false;
            match &result {
                Ok((changes, updated)) => {
                    status.last_sync = Some(chrono::Utc::now());
                    status.last_changes_detected = changes.len();
                    status.last_fixtures_updated = *updated;
                    status.last_error = None;
                    status.total_syncs += 1;
                }
                Err(e) => {
                    status.last_error = Some(e.to_string());
                }
            }
        }

        result
    }
753}
754
#[cfg(test)]
mod tests {
    use super::*;

    /// Defaults should be conservative: everything off, hourly cadence.
    #[test]
    fn test_sync_config_default() {
        let config = SyncConfig::default();
        assert!(!config.enabled, "sync must be disabled by default");
        assert!(!config.auto_update, "auto-update must be disabled by default");
        assert_eq!(config.interval_seconds, 3600);
        assert_eq!(config.max_requests_per_sync, 100);
    }

    /// A freshly constructed status starts idle with zeroed counters.
    #[test]
    fn test_sync_status_creation() {
        let status = SyncStatus {
            is_running: false,
            last_sync: None,
            last_changes_detected: 0,
            last_fixtures_updated: 0,
            last_error: None,
            total_syncs: 0,
        };

        assert!(!status.is_running);
        assert_eq!(status.total_syncs, 0);
    }
}