1use crate::{
7 database::RecorderDatabase,
8 diff::{ComparisonResult, ResponseComparator},
9 models::RecordedExchange,
10 Result,
11};
12use reqwest::Client;
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::RwLock;
18use tokio::time::{interval, MissedTickBehavior};
19use tracing::{debug, info, warn};
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct SyncConfig {
24 pub enabled: bool,
26 pub upstream_url: Option<String>,
28 pub interval_seconds: u64,
30 pub auto_update: bool,
32 pub max_requests_per_sync: usize,
34 pub request_timeout_seconds: u64,
36 pub headers: HashMap<String, String>,
38 #[serde(default = "default_true")]
40 pub sync_get_only: bool,
41}
42
/// Serde default provider for boolean fields that should be switched on when omitted.
fn default_true() -> bool {
    const ON_WHEN_OMITTED: bool = true;
    ON_WHEN_OMITTED
}
46
47impl Default for SyncConfig {
48 fn default() -> Self {
49 Self {
50 enabled: false,
51 upstream_url: None,
52 interval_seconds: 3600, auto_update: false,
54 max_requests_per_sync: 100,
55 request_timeout_seconds: 30,
56 headers: HashMap::new(),
57 sync_get_only: true,
58 }
59 }
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct SyncStatus {
65 pub is_running: bool,
67 pub last_sync: Option<chrono::DateTime<chrono::Utc>>,
69 pub last_changes_detected: usize,
71 pub last_fixtures_updated: usize,
73 pub last_error: Option<String>,
75 pub total_syncs: u64,
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct DetectedChange {
82 pub request_id: String,
84 pub method: String,
86 pub path: String,
88 pub comparison: ComparisonResult,
90 pub updated: bool,
92}
93
94pub struct SyncService {
96 config: Arc<RwLock<SyncConfig>>,
97 database: Arc<RecorderDatabase>,
98 status: Arc<RwLock<SyncStatus>>,
99 http_client: Client,
100}
101
102impl SyncService {
103 pub fn new(config: SyncConfig, database: Arc<RecorderDatabase>) -> Self {
105 let http_client = Client::builder()
106 .timeout(Duration::from_secs(config.request_timeout_seconds))
107 .build()
108 .expect("Failed to create HTTP client");
109
110 Self {
111 config: Arc::new(RwLock::new(config)),
112 database,
113 status: Arc::new(RwLock::new(SyncStatus {
114 is_running: false,
115 last_sync: None,
116 last_changes_detected: 0,
117 last_fixtures_updated: 0,
118 last_error: None,
119 total_syncs: 0,
120 })),
121 http_client,
122 }
123 }
124
125 pub fn start(&self) -> tokio::task::JoinHandle<()> {
127 let config = Arc::clone(&self.config);
128 let database = Arc::clone(&self.database);
129 let status = Arc::clone(&self.status);
130 let http_client = self.http_client.clone();
131
132 tokio::spawn(async move {
133 let mut interval_timer =
134 interval(Duration::from_secs(config.read().await.interval_seconds));
135 interval_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
136
137 loop {
138 interval_timer.tick().await;
139
140 let config_guard = config.read().await;
141 if !config_guard.enabled {
142 continue;
143 }
144
145 let upstream_url = match &config_guard.upstream_url {
146 Some(url) => url.clone(),
147 None => {
148 warn!("Sync enabled but no upstream_url configured");
149 continue;
150 }
151 };
152
153 let auto_update = config_guard.auto_update;
154 let max_requests = config_guard.max_requests_per_sync;
155 let sync_get_only = config_guard.sync_get_only;
156 let headers = config_guard.headers.clone();
157 drop(config_guard);
158
159 {
161 let mut status_guard = status.write().await;
162 status_guard.is_running = true;
163 }
164
165 info!("Starting automatic sync from upstream: {}", upstream_url);
166
167 match Self::sync_once(
168 &http_client,
169 &database,
170 &upstream_url,
171 auto_update,
172 max_requests,
173 sync_get_only,
174 &headers,
175 )
176 .await
177 {
178 Ok((changes, updated)) => {
179 let mut status_guard = status.write().await;
180 status_guard.is_running = false;
181 status_guard.last_sync = Some(chrono::Utc::now());
182 status_guard.last_changes_detected = changes.len();
183 status_guard.last_fixtures_updated = updated;
184 status_guard.last_error = None;
185 status_guard.total_syncs += 1;
186
187 if !changes.is_empty() {
188 info!(
189 "Sync complete: {} changes detected, {} fixtures updated",
190 changes.len(),
191 updated
192 );
193 } else {
194 debug!("Sync complete: No changes detected");
195 }
196 }
197 Err(e) => {
198 let mut status_guard = status.write().await;
199 status_guard.is_running = false;
200 status_guard.last_error = Some(e.to_string());
201 warn!("Sync failed: {}", e);
202 }
203 }
204 }
205 })
206 }
207
208 async fn sync_once(
210 http_client: &Client,
211 database: &RecorderDatabase,
212 upstream_url: &str,
213 auto_update: bool,
214 max_requests: usize,
215 sync_get_only: bool,
216 headers: &HashMap<String, String>,
217 ) -> Result<(Vec<DetectedChange>, usize)> {
218 let recorded_requests = database.list_recent(max_requests as i32).await?;
220
221 let mut changes = Vec::new();
222 let mut updated_count = 0;
223
224 for request in recorded_requests {
225 if sync_get_only && request.method.to_uppercase() != "GET" {
227 continue;
228 }
229
230 let full_url =
232 if request.path.starts_with("http://") || request.path.starts_with("https://") {
233 request.path.clone()
234 } else {
235 format!("{}{}", upstream_url.trim_end_matches('/'), request.path)
236 };
237
238 match Self::replay_to_upstream(
240 http_client,
241 &full_url,
242 &request.method,
243 &request.headers,
244 headers,
245 )
246 .await
247 {
248 Ok((status, response_headers, response_body)) => {
249 if let Ok(Some(exchange)) = database.get_exchange(&request.id).await {
251 if let Some(original_response) = exchange.response {
252 let original_headers = original_response.headers_map();
253 let original_body =
254 original_response.decoded_body().unwrap_or_default();
255
256 let comparison = ResponseComparator::compare(
258 original_response.status_code as i32,
259 &original_headers,
260 &original_body,
261 status as i32,
262 &response_headers,
263 &response_body,
264 );
265
266 if !comparison.matches {
267 debug!(
268 "Change detected for {} {}: {} differences",
269 request.method,
270 request.path,
271 comparison.differences.len()
272 );
273
274 let mut updated = false;
275 if auto_update {
276 match Self::update_fixture(
278 database,
279 &request.id,
280 status,
281 &response_headers,
282 &response_body,
283 )
284 .await
285 {
286 Ok(_) => {
287 updated = true;
288 updated_count += 1;
289 info!(
290 "Updated fixture for {} {}",
291 request.method, request.path
292 );
293 }
294 Err(e) => {
295 warn!(
296 "Failed to update fixture for {} {}: {}",
297 request.method, request.path, e
298 );
299 }
300 }
301 }
302
303 changes.push(DetectedChange {
304 request_id: request.id.clone(),
305 method: request.method.clone(),
306 path: request.path.clone(),
307 comparison,
308 updated,
309 });
310 }
311 }
312 }
313 }
314 Err(e) => {
315 debug!(
316 "Failed to replay {} {} to upstream: {}",
317 request.method, request.path, e
318 );
319 }
321 }
322 }
323
324 Ok((changes, updated_count))
325 }
326
327 async fn replay_to_upstream(
329 http_client: &Client,
330 url: &str,
331 method: &str,
332 original_headers: &str,
333 additional_headers: &HashMap<String, String>,
334 ) -> Result<(u16, HashMap<String, String>, Vec<u8>)> {
335 let mut headers_map = HashMap::new();
337 if let Ok(json) = serde_json::from_str::<HashMap<String, String>>(original_headers) {
338 headers_map = json;
339 }
340
341 for (key, value) in additional_headers {
343 headers_map.insert(key.clone(), value.clone());
344 }
345
346 let reqwest_method = match method.to_uppercase().as_str() {
348 "GET" => reqwest::Method::GET,
349 "POST" => reqwest::Method::POST,
350 "PUT" => reqwest::Method::PUT,
351 "DELETE" => reqwest::Method::DELETE,
352 "PATCH" => reqwest::Method::PATCH,
353 "HEAD" => reqwest::Method::HEAD,
354 "OPTIONS" => reqwest::Method::OPTIONS,
355 _ => {
356 return Err(crate::RecorderError::InvalidFilter(format!(
357 "Unsupported method: {}",
358 method
359 )))
360 }
361 };
362
363 let mut request_builder = http_client.request(reqwest_method, url);
364
365 for (key, value) in &headers_map {
367 if let Ok(header_name) = reqwest::header::HeaderName::from_bytes(key.as_bytes()) {
368 if let Ok(header_value) = reqwest::header::HeaderValue::from_str(value) {
369 request_builder = request_builder.header(header_name, header_value);
370 }
371 }
372 }
373
374 let response = request_builder
376 .send()
377 .await
378 .map_err(|e| crate::RecorderError::InvalidFilter(format!("Request failed: {}", e)))?;
379
380 let status = response.status().as_u16();
381 let mut response_headers = HashMap::new();
382
383 for (key, value) in response.headers() {
384 if let Ok(value_str) = value.to_str() {
385 response_headers.insert(key.to_string(), value_str.to_string());
386 }
387 }
388
389 let response_body = response
390 .bytes()
391 .await
392 .map_err(|e| {
393 crate::RecorderError::InvalidFilter(format!("Failed to read response body: {}", e))
394 })?
395 .to_vec();
396
397 Ok((status, response_headers, response_body))
398 }
399
400 async fn update_fixture(
402 database: &RecorderDatabase,
403 request_id: &str,
404 status_code: u16,
405 headers: &HashMap<String, String>,
406 body: &[u8],
407 ) -> Result<()> {
408 let headers_json = serde_json::to_string(headers)?;
410 let body_encoded = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, body);
411 let body_size = body.len() as i64;
412
413 database
414 .update_response(
415 request_id,
416 status_code as i32,
417 &headers_json,
418 &body_encoded,
419 body_size,
420 )
421 .await?;
422
423 Ok(())
424 }
425
426 pub async fn get_status(&self) -> SyncStatus {
428 self.status.read().await.clone()
429 }
430
431 pub async fn get_config(&self) -> SyncConfig {
433 self.config.read().await.clone()
434 }
435
436 pub async fn update_config(&self, new_config: SyncConfig) {
438 *self.config.write().await = new_config;
439 }
440
441 pub async fn sync_now(&self) -> Result<(Vec<DetectedChange>, usize)> {
443 let config = self.config.read().await.clone();
444 let upstream_url = config.upstream_url.ok_or_else(|| {
445 crate::RecorderError::InvalidFilter("No upstream_url configured".to_string())
446 })?;
447
448 {
449 let mut status = self.status.write().await;
450 status.is_running = true;
451 }
452
453 let result = Self::sync_once(
454 &self.http_client,
455 &self.database,
456 &upstream_url,
457 config.auto_update,
458 config.max_requests_per_sync,
459 config.sync_get_only,
460 &config.headers,
461 )
462 .await;
463
464 {
465 let mut status = self.status.write().await;
466 status.is_running = false;
467 match &result {
468 Ok((changes, updated)) => {
469 status.last_sync = Some(chrono::Utc::now());
470 status.last_changes_detected = changes.len();
471 status.last_fixtures_updated = *updated;
472 status.last_error = None;
473 status.total_syncs += 1;
474 }
475 Err(e) => {
476 status.last_error = Some(e.to_string());
477 }
478 }
479 }
480
481 result
482 }
483}
484
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration is disabled, hourly, report-only and capped at 100.
    #[test]
    fn test_sync_config_default() {
        let SyncConfig {
            enabled,
            interval_seconds,
            auto_update,
            max_requests_per_sync,
            ..
        } = SyncConfig::default();

        assert!(!enabled);
        assert_eq!(interval_seconds, 3600);
        assert!(!auto_update);
        assert_eq!(max_requests_per_sync, 100);
    }

    /// A freshly built status reports an idle service with no completed syncs.
    #[test]
    fn test_sync_status_creation() {
        let idle = SyncStatus {
            is_running: false,
            last_sync: None,
            last_changes_detected: 0,
            last_fixtures_updated: 0,
            last_error: None,
            total_syncs: 0,
        };

        assert!(matches!(idle, SyncStatus { is_running: false, .. }));
        assert_eq!(idle.total_syncs, 0);
    }
}