mockforge_recorder/
sync.rs

1//! Automatic sync/polling for detecting upstream API changes
2//!
3//! This module provides functionality to periodically poll upstream APIs,
4//! compare responses with recorded fixtures, and detect changes.
5
6use crate::{
7    database::RecorderDatabase,
8    diff::{ComparisonResult, ResponseComparator},
9    models::RecordedExchange,
10    Result,
11};
12use reqwest::Client;
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::RwLock;
18use tokio::time::{interval, MissedTickBehavior};
19use tracing::{debug, info, warn};
20
21/// Sync configuration
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct SyncConfig {
24    /// Whether sync is enabled
25    pub enabled: bool,
26    /// Upstream base URL to sync from
27    pub upstream_url: Option<String>,
28    /// Sync interval in seconds
29    pub interval_seconds: u64,
30    /// Whether to automatically update fixtures when changes detected
31    pub auto_update: bool,
32    /// Maximum number of requests to sync per interval
33    pub max_requests_per_sync: usize,
34    /// Timeout for sync requests in seconds
35    pub request_timeout_seconds: u64,
36    /// Headers to add to sync requests
37    pub headers: HashMap<String, String>,
38    /// Only sync GET requests (default: true)
39    #[serde(default = "default_true")]
40    pub sync_get_only: bool,
41}
42
43fn default_true() -> bool {
44    true
45}
46
47impl Default for SyncConfig {
48    fn default() -> Self {
49        Self {
50            enabled: false,
51            upstream_url: None,
52            interval_seconds: 3600, // 1 hour default
53            auto_update: false,
54            max_requests_per_sync: 100,
55            request_timeout_seconds: 30,
56            headers: HashMap::new(),
57            sync_get_only: true,
58        }
59    }
60}
61
62/// Sync status
63#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct SyncStatus {
65    /// Whether sync is currently running
66    pub is_running: bool,
67    /// Last sync timestamp
68    pub last_sync: Option<chrono::DateTime<chrono::Utc>>,
69    /// Number of changes detected in last sync
70    pub last_changes_detected: usize,
71    /// Number of fixtures updated in last sync
72    pub last_fixtures_updated: usize,
73    /// Last sync error (if any)
74    pub last_error: Option<String>,
75    /// Total syncs performed
76    pub total_syncs: u64,
77}
78
79/// Detected change in an API response
80#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct DetectedChange {
82    /// Request ID from database
83    pub request_id: String,
84    /// Request method
85    pub method: String,
86    /// Request path
87    pub path: String,
88    /// Comparison result
89    pub comparison: ComparisonResult,
90    /// Whether fixture was updated
91    pub updated: bool,
92}
93
94/// Sync service for polling upstream APIs and detecting changes
95pub struct SyncService {
96    config: Arc<RwLock<SyncConfig>>,
97    database: Arc<RecorderDatabase>,
98    status: Arc<RwLock<SyncStatus>>,
99    http_client: Client,
100}
101
102impl SyncService {
103    /// Create a new sync service
104    pub fn new(config: SyncConfig, database: Arc<RecorderDatabase>) -> Self {
105        let http_client = Client::builder()
106            .timeout(Duration::from_secs(config.request_timeout_seconds))
107            .build()
108            .expect("Failed to create HTTP client");
109
110        Self {
111            config: Arc::new(RwLock::new(config)),
112            database,
113            status: Arc::new(RwLock::new(SyncStatus {
114                is_running: false,
115                last_sync: None,
116                last_changes_detected: 0,
117                last_fixtures_updated: 0,
118                last_error: None,
119                total_syncs: 0,
120            })),
121            http_client,
122        }
123    }
124
125    /// Start the sync service (runs in background)
126    pub fn start(&self) -> tokio::task::JoinHandle<()> {
127        let config = Arc::clone(&self.config);
128        let database = Arc::clone(&self.database);
129        let status = Arc::clone(&self.status);
130        let http_client = self.http_client.clone();
131
132        tokio::spawn(async move {
133            let mut interval_timer =
134                interval(Duration::from_secs(config.read().await.interval_seconds));
135            interval_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
136
137            loop {
138                interval_timer.tick().await;
139
140                let config_guard = config.read().await;
141                if !config_guard.enabled {
142                    continue;
143                }
144
145                let upstream_url = match &config_guard.upstream_url {
146                    Some(url) => url.clone(),
147                    None => {
148                        warn!("Sync enabled but no upstream_url configured");
149                        continue;
150                    }
151                };
152
153                let auto_update = config_guard.auto_update;
154                let max_requests = config_guard.max_requests_per_sync;
155                let sync_get_only = config_guard.sync_get_only;
156                let headers = config_guard.headers.clone();
157                drop(config_guard);
158
159                // Update status
160                {
161                    let mut status_guard = status.write().await;
162                    status_guard.is_running = true;
163                }
164
165                info!("Starting automatic sync from upstream: {}", upstream_url);
166
167                match Self::sync_once(
168                    &http_client,
169                    &database,
170                    &upstream_url,
171                    auto_update,
172                    max_requests,
173                    sync_get_only,
174                    &headers,
175                )
176                .await
177                {
178                    Ok((changes, updated)) => {
179                        let mut status_guard = status.write().await;
180                        status_guard.is_running = false;
181                        status_guard.last_sync = Some(chrono::Utc::now());
182                        status_guard.last_changes_detected = changes.len();
183                        status_guard.last_fixtures_updated = updated;
184                        status_guard.last_error = None;
185                        status_guard.total_syncs += 1;
186
187                        if !changes.is_empty() {
188                            info!(
189                                "Sync complete: {} changes detected, {} fixtures updated",
190                                changes.len(),
191                                updated
192                            );
193                        } else {
194                            debug!("Sync complete: No changes detected");
195                        }
196                    }
197                    Err(e) => {
198                        let mut status_guard = status.write().await;
199                        status_guard.is_running = false;
200                        status_guard.last_error = Some(e.to_string());
201                        warn!("Sync failed: {}", e);
202                    }
203                }
204            }
205        })
206    }
207
208    /// Perform a single sync operation
209    async fn sync_once(
210        http_client: &Client,
211        database: &RecorderDatabase,
212        upstream_url: &str,
213        auto_update: bool,
214        max_requests: usize,
215        sync_get_only: bool,
216        headers: &HashMap<String, String>,
217    ) -> Result<(Vec<DetectedChange>, usize)> {
218        // Get recent recorded requests
219        let recorded_requests = database.list_recent(max_requests as i32).await?;
220
221        let mut changes = Vec::new();
222        let mut updated_count = 0;
223
224        for request in recorded_requests {
225            // Skip non-GET requests if configured
226            if sync_get_only && request.method.to_uppercase() != "GET" {
227                continue;
228            }
229
230            // Build full URL
231            let full_url =
232                if request.path.starts_with("http://") || request.path.starts_with("https://") {
233                    request.path.clone()
234                } else {
235                    format!("{}{}", upstream_url.trim_end_matches('/'), request.path)
236                };
237
238            // Replay the request to upstream
239            match Self::replay_to_upstream(
240                http_client,
241                &full_url,
242                &request.method,
243                &request.headers,
244                headers,
245            )
246            .await
247            {
248                Ok((status, response_headers, response_body)) => {
249                    // Get original exchange
250                    if let Ok(Some(exchange)) = database.get_exchange(&request.id).await {
251                        if let Some(original_response) = exchange.response {
252                            let original_headers = original_response.headers_map();
253                            let original_body =
254                                original_response.decoded_body().unwrap_or_default();
255
256                            // Compare responses
257                            let comparison = ResponseComparator::compare(
258                                original_response.status_code as i32,
259                                &original_headers,
260                                &original_body,
261                                status as i32,
262                                &response_headers,
263                                &response_body,
264                            );
265
266                            if !comparison.matches {
267                                debug!(
268                                    "Change detected for {} {}: {} differences",
269                                    request.method,
270                                    request.path,
271                                    comparison.differences.len()
272                                );
273
274                                let mut updated = false;
275                                if auto_update {
276                                    // Update the fixture with new response
277                                    match Self::update_fixture(
278                                        database,
279                                        &request.id,
280                                        status,
281                                        &response_headers,
282                                        &response_body,
283                                    )
284                                    .await
285                                    {
286                                        Ok(_) => {
287                                            updated = true;
288                                            updated_count += 1;
289                                            info!(
290                                                "Updated fixture for {} {}",
291                                                request.method, request.path
292                                            );
293                                        }
294                                        Err(e) => {
295                                            warn!(
296                                                "Failed to update fixture for {} {}: {}",
297                                                request.method, request.path, e
298                                            );
299                                        }
300                                    }
301                                }
302
303                                changes.push(DetectedChange {
304                                    request_id: request.id.clone(),
305                                    method: request.method.clone(),
306                                    path: request.path.clone(),
307                                    comparison,
308                                    updated,
309                                });
310                            }
311                        }
312                    }
313                }
314                Err(e) => {
315                    debug!(
316                        "Failed to replay {} {} to upstream: {}",
317                        request.method, request.path, e
318                    );
319                    // Continue with other requests
320                }
321            }
322        }
323
324        Ok((changes, updated_count))
325    }
326
327    /// Replay a request to the upstream URL
328    async fn replay_to_upstream(
329        http_client: &Client,
330        url: &str,
331        method: &str,
332        original_headers: &str,
333        additional_headers: &HashMap<String, String>,
334    ) -> Result<(u16, HashMap<String, String>, Vec<u8>)> {
335        // Parse original headers
336        let mut headers_map = HashMap::new();
337        if let Ok(json) = serde_json::from_str::<HashMap<String, String>>(original_headers) {
338            headers_map = json;
339        }
340
341        // Add additional headers (merge)
342        for (key, value) in additional_headers {
343            headers_map.insert(key.clone(), value.clone());
344        }
345
346        // Build request
347        let reqwest_method = match method.to_uppercase().as_str() {
348            "GET" => reqwest::Method::GET,
349            "POST" => reqwest::Method::POST,
350            "PUT" => reqwest::Method::PUT,
351            "DELETE" => reqwest::Method::DELETE,
352            "PATCH" => reqwest::Method::PATCH,
353            "HEAD" => reqwest::Method::HEAD,
354            "OPTIONS" => reqwest::Method::OPTIONS,
355            _ => {
356                return Err(crate::RecorderError::InvalidFilter(format!(
357                    "Unsupported method: {}",
358                    method
359                )))
360            }
361        };
362
363        let mut request_builder = http_client.request(reqwest_method, url);
364
365        // Add headers
366        for (key, value) in &headers_map {
367            if let Ok(header_name) = reqwest::header::HeaderName::from_bytes(key.as_bytes()) {
368                if let Ok(header_value) = reqwest::header::HeaderValue::from_str(value) {
369                    request_builder = request_builder.header(header_name, header_value);
370                }
371            }
372        }
373
374        // Execute request
375        let response = request_builder
376            .send()
377            .await
378            .map_err(|e| crate::RecorderError::InvalidFilter(format!("Request failed: {}", e)))?;
379
380        let status = response.status().as_u16();
381        let mut response_headers = HashMap::new();
382
383        for (key, value) in response.headers() {
384            if let Ok(value_str) = value.to_str() {
385                response_headers.insert(key.to_string(), value_str.to_string());
386            }
387        }
388
389        let response_body = response
390            .bytes()
391            .await
392            .map_err(|e| {
393                crate::RecorderError::InvalidFilter(format!("Failed to read response body: {}", e))
394            })?
395            .to_vec();
396
397        Ok((status, response_headers, response_body))
398    }
399
400    /// Update a fixture with new response data
401    async fn update_fixture(
402        database: &RecorderDatabase,
403        request_id: &str,
404        status_code: u16,
405        headers: &HashMap<String, String>,
406        body: &[u8],
407    ) -> Result<()> {
408        // Update the response in the database
409        let headers_json = serde_json::to_string(headers)?;
410        let body_encoded = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, body);
411        let body_size = body.len() as i64;
412
413        database
414            .update_response(
415                request_id,
416                status_code as i32,
417                &headers_json,
418                &body_encoded,
419                body_size,
420            )
421            .await?;
422
423        Ok(())
424    }
425
426    /// Get current sync status
427    pub async fn get_status(&self) -> SyncStatus {
428        self.status.read().await.clone()
429    }
430
431    /// Get sync configuration
432    pub async fn get_config(&self) -> SyncConfig {
433        self.config.read().await.clone()
434    }
435
436    /// Update sync configuration
437    pub async fn update_config(&self, new_config: SyncConfig) {
438        *self.config.write().await = new_config;
439    }
440
441    /// Manually trigger a sync
442    pub async fn sync_now(&self) -> Result<(Vec<DetectedChange>, usize)> {
443        let config = self.config.read().await.clone();
444        let upstream_url = config.upstream_url.ok_or_else(|| {
445            crate::RecorderError::InvalidFilter("No upstream_url configured".to_string())
446        })?;
447
448        {
449            let mut status = self.status.write().await;
450            status.is_running = true;
451        }
452
453        let result = Self::sync_once(
454            &self.http_client,
455            &self.database,
456            &upstream_url,
457            config.auto_update,
458            config.max_requests_per_sync,
459            config.sync_get_only,
460            &config.headers,
461        )
462        .await;
463
464        {
465            let mut status = self.status.write().await;
466            status.is_running = false;
467            match &result {
468                Ok((changes, updated)) => {
469                    status.last_sync = Some(chrono::Utc::now());
470                    status.last_changes_detected = changes.len();
471                    status.last_fixtures_updated = *updated;
472                    status.last_error = None;
473                    status.total_syncs += 1;
474                }
475                Err(e) => {
476                    status.last_error = Some(e.to_string());
477                }
478            }
479        }
480
481        result
482    }
483}
484
485#[cfg(test)]
486mod tests {
487    use super::*;
488
489    #[test]
490    fn test_sync_config_default() {
491        let config = SyncConfig::default();
492        assert!(!config.enabled);
493        assert_eq!(config.interval_seconds, 3600);
494        assert!(!config.auto_update);
495        assert_eq!(config.max_requests_per_sync, 100);
496    }
497
498    #[test]
499    fn test_sync_status_creation() {
500        let status = SyncStatus {
501            is_running: false,
502            last_sync: None,
503            last_changes_detected: 0,
504            last_fixtures_updated: 0,
505            last_error: None,
506            total_syncs: 0,
507        };
508
509        assert!(!status.is_running);
510        assert_eq!(status.total_syncs, 0);
511    }
512}