1use std::collections::HashMap;
31use std::sync::Arc;
32use std::time::Duration;
33
34use helios_fhir::FhirVersion;
35use parking_lot::RwLock;
36use serde_json::Value;
37use tokio::sync::mpsc;
38use tokio::time::sleep;
39use tracing::{debug, error, warn};
40
41use crate::core::ResourceStorage;
42use crate::error::{StorageError, StorageResult};
43use crate::tenant::{TenantContext, TenantId, TenantPermissions};
44use crate::types::StoredResource;
45
46use super::config::{RetryConfig, SyncConfig, SyncMode};
47
/// A replication event describing one mutation (or bulk transfer) that must
/// be mirrored to one or more secondary storage backends.
#[derive(Debug, Clone)]
pub enum SyncEvent {
    /// A resource was created.
    Create {
        /// FHIR resource type, e.g. "Patient".
        resource_type: String,
        /// Logical id of the resource.
        resource_id: String,
        /// Full JSON content of the resource.
        content: Value,
        /// Tenant that owns the resource.
        tenant_id: TenantId,
        /// FHIR specification version of the content.
        fhir_version: FhirVersion,
    },

    /// A resource was updated (applied to backends as an upsert).
    Update {
        /// FHIR resource type, e.g. "Patient".
        resource_type: String,
        /// Logical id of the resource.
        resource_id: String,
        /// Full JSON content of the new version.
        content: Value,
        /// Tenant that owns the resource.
        tenant_id: TenantId,
        /// Version string carried with the update; note it is not consumed
        /// by the sync path itself (see `sync_event_to_backend`).
        version: String,
        /// FHIR specification version of the content.
        fhir_version: FhirVersion,
    },

    /// A resource was deleted.
    Delete {
        /// FHIR resource type, e.g. "Patient".
        resource_type: String,
        /// Logical id of the resource.
        resource_id: String,
        /// Tenant that owns the resource.
        tenant_id: TenantId,
    },

    /// A batch of resources to be upserted into target backends.
    BulkSync {
        /// Resources to upsert, one `create_or_update` per resource.
        resources: Vec<StoredResource>,
        /// Tenant that owns the resources.
        tenant_id: TenantId,
    },
}
99
100impl SyncEvent {
101 pub fn resource_type(&self) -> &str {
103 match self {
104 SyncEvent::Create { resource_type, .. } => resource_type,
105 SyncEvent::Update { resource_type, .. } => resource_type,
106 SyncEvent::Delete { resource_type, .. } => resource_type,
107 SyncEvent::BulkSync { .. } => "bulk",
108 }
109 }
110
111 pub fn resource_id(&self) -> Option<&str> {
113 match self {
114 SyncEvent::Create { resource_id, .. } => Some(resource_id),
115 SyncEvent::Update { resource_id, .. } => Some(resource_id),
116 SyncEvent::Delete { resource_id, .. } => Some(resource_id),
117 SyncEvent::BulkSync { .. } => None,
118 }
119 }
120
121 pub fn tenant_id(&self) -> &TenantId {
123 match self {
124 SyncEvent::Create { tenant_id, .. } => tenant_id,
125 SyncEvent::Update { tenant_id, .. } => tenant_id,
126 SyncEvent::Delete { tenant_id, .. } => tenant_id,
127 SyncEvent::BulkSync { tenant_id, .. } => tenant_id,
128 }
129 }
130}
131
/// Outcome of syncing one event to one backend.
#[derive(Debug, Clone)]
pub struct SyncStatus {
    /// Identifier of the backend this status refers to.
    pub backend_id: String,

    /// Whether the sync completed — or, in asynchronous mode, was queued —
    /// successfully.
    pub success: bool,

    /// Human-readable error description when `success` is false.
    pub error: Option<String>,

    /// Number of retries recorded for this attempt (0 on success and for
    /// queued async events).
    pub retry_count: u32,

    /// Wall-clock time spent syncing (zero for queued async events).
    pub duration: Duration,
}
150
/// Coordinates replication of resource mutations to secondary storage
/// backends in synchronous, asynchronous (queued), or hybrid mode.
pub struct SyncManager {
    /// Sync behavior configuration: mode, batching, and retry policy.
    config: SyncConfig,

    /// Sender half of the async worker's queue; `None` until
    /// `start_async_worker` has been called.
    event_sender: Option<mpsc::Sender<QueuedEvent>>,

    /// Per-backend sync statistics, keyed by backend id. Shared with the
    /// background worker.
    status: Arc<RwLock<HashMap<String, BackendSyncStatus>>>,
}
162
/// Running sync statistics for a single backend.
#[derive(Debug, Clone, Default)]
pub struct BackendSyncStatus {
    /// Time of the most recent successful sync, if any.
    pub last_success: Option<std::time::Instant>,

    /// Events queued for this backend but not yet processed.
    pub pending_events: usize,

    /// Total number of events synced successfully.
    pub total_synced: u64,

    /// Total number of events that failed to sync.
    pub total_errors: u64,

    /// Health flag; set on successful syncs and `false` by default.
    pub healthy: bool,
}
181
/// Internal envelope pairing a [`SyncEvent`] with the backends it must be
/// delivered to; placed on the async worker's channel.
struct QueuedEvent {
    // The event to replicate.
    event: SyncEvent,
    // Ids of the backends that should receive this event.
    backend_ids: Vec<String>,
    // When the event was queued. Presumably intended for lag metrics;
    // currently unused (hence the allow).
    #[allow(dead_code)]
    created_at: std::time::Instant,
}
189
190impl SyncManager {
191 pub fn new(config: SyncConfig) -> Self {
193 Self {
194 config,
195 event_sender: None,
196 status: Arc::new(RwLock::new(HashMap::new())),
197 }
198 }
199
200 pub fn start_async_worker(
202 &mut self,
203 backends: HashMap<String, Arc<dyn ResourceStorage + Send + Sync>>,
204 ) -> tokio::task::JoinHandle<()> {
205 let (sender, receiver) = mpsc::channel::<QueuedEvent>(1000);
206 self.event_sender = Some(sender);
207
208 let config = self.config.clone();
209 let status = self.status.clone();
210
211 tokio::spawn(async move {
212 Self::async_worker(receiver, backends, config, status).await;
213 })
214 }
215
216 async fn async_worker(
218 mut receiver: mpsc::Receiver<QueuedEvent>,
219 backends: HashMap<String, Arc<dyn ResourceStorage + Send + Sync>>,
220 config: SyncConfig,
221 status: Arc<RwLock<HashMap<String, BackendSyncStatus>>>,
222 ) {
223 let mut batch = Vec::new();
224 let batch_timeout = Duration::from_millis(100);
225
226 loop {
227 let deadline = tokio::time::Instant::now() + batch_timeout;
229
230 loop {
231 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
232 if remaining.is_zero() || batch.len() >= config.batch_size {
233 break;
234 }
235
236 match tokio::time::timeout(remaining, receiver.recv()).await {
237 Ok(Some(event)) => batch.push(event),
238 Ok(None) => return, Err(_) => break, }
241 }
242
243 if batch.is_empty() {
244 continue;
245 }
246
247 let events: Vec<_> = std::mem::take(&mut batch);
249
250 for queued in events {
251 for backend_id in &queued.backend_ids {
252 if let Some(backend) = backends.get(backend_id) {
253 let result = Self::sync_event_to_backend(
254 &queued.event,
255 backend.as_ref(),
256 &config.retry,
257 )
258 .await;
259
260 let mut status_map = status.write();
262 let backend_status = status_map.entry(backend_id.clone()).or_default();
263
264 match result {
265 Ok(_) => {
266 backend_status.last_success = Some(std::time::Instant::now());
267 backend_status.total_synced += 1;
268 backend_status.healthy = true;
269 }
270 Err(e) => {
271 backend_status.total_errors += 1;
272 error!(
273 backend = %backend_id,
274 error = %e,
275 "Async sync failed"
276 );
277 }
278 }
279
280 if backend_status.pending_events > 0 {
281 backend_status.pending_events -= 1;
282 }
283 }
284 }
285 }
286 }
287 }
288
289 pub async fn sync(
291 &self,
292 event: &SyncEvent,
293 backends: &HashMap<String, Arc<dyn ResourceStorage + Send + Sync>>,
294 ) -> StorageResult<Vec<SyncStatus>> {
295 match self.config.mode {
296 SyncMode::Synchronous => self.sync_synchronous(event, backends).await,
297 SyncMode::Asynchronous => self.sync_asynchronous(event, backends).await,
298 SyncMode::Hybrid { sync_for_search } => {
299 let is_search_related = matches!(
301 event,
302 SyncEvent::Create { .. } | SyncEvent::Update { .. } | SyncEvent::Delete { .. }
303 );
304
305 if sync_for_search && is_search_related {
306 self.sync_synchronous(event, backends).await
307 } else {
308 self.sync_asynchronous(event, backends).await
309 }
310 }
311 }
312 }
313
314 async fn sync_synchronous(
316 &self,
317 event: &SyncEvent,
318 backends: &HashMap<String, Arc<dyn ResourceStorage + Send + Sync>>,
319 ) -> StorageResult<Vec<SyncStatus>> {
320 use tokio::task::JoinSet;
321
322 let mut tasks: JoinSet<SyncStatus> = JoinSet::new();
323 let event = event.clone();
324
325 for (backend_id, backend) in backends {
326 let event = event.clone();
327 let backend = backend.clone();
328 let backend_id = backend_id.clone();
329 let retry_config = self.config.retry.clone();
330
331 tasks.spawn(async move {
332 let start = std::time::Instant::now();
333
334 match Self::sync_event_to_backend(&event, backend.as_ref(), &retry_config).await {
335 Ok(_) => SyncStatus {
336 backend_id,
337 success: true,
338 error: None,
339 retry_count: 0,
340 duration: start.elapsed(),
341 },
342 Err(e) => SyncStatus {
343 backend_id,
344 success: false,
345 error: Some(e.to_string()),
346 retry_count: retry_config.max_retries,
347 duration: start.elapsed(),
348 },
349 }
350 });
351 }
352
353 let mut results = Vec::new();
354 while let Some(result) = tasks.join_next().await {
355 match result {
356 Ok(status) => {
357 let mut status_map = self.status.write();
359 let backend_status = status_map.entry(status.backend_id.clone()).or_default();
360
361 if status.success {
362 backend_status.last_success = Some(std::time::Instant::now());
363 backend_status.total_synced += 1;
364 backend_status.healthy = true;
365 } else {
366 backend_status.total_errors += 1;
367 }
368
369 results.push(status);
370 }
371 Err(e) => {
372 warn!(error = %e, "Sync task failed");
373 }
374 }
375 }
376
377 Ok(results)
378 }
379
380 async fn sync_asynchronous(
382 &self,
383 event: &SyncEvent,
384 backends: &HashMap<String, Arc<dyn ResourceStorage + Send + Sync>>,
385 ) -> StorageResult<Vec<SyncStatus>> {
386 if let Some(ref sender) = self.event_sender {
387 let backend_ids: Vec<_> = backends.keys().cloned().collect();
388
389 {
391 let mut status_map = self.status.write();
392 for id in &backend_ids {
393 let status = status_map.entry(id.clone()).or_default();
394 status.pending_events += 1;
395 }
396 }
397
398 sender
399 .send(QueuedEvent {
400 event: event.clone(),
401 backend_ids: backend_ids.clone(),
402 created_at: std::time::Instant::now(),
403 })
404 .await
405 .map_err(|e| {
406 StorageError::Backend(crate::error::BackendError::ConnectionFailed {
407 backend_name: "sync".to_string(),
408 message: format!("Failed to queue sync event: {}", e),
409 })
410 })?;
411
412 Ok(backend_ids
414 .into_iter()
415 .map(|id| SyncStatus {
416 backend_id: id,
417 success: true,
418 error: None,
419 retry_count: 0,
420 duration: Duration::ZERO,
421 })
422 .collect())
423 } else {
424 warn!("Async sync requested but no worker started, falling back to sync");
426 self.sync_synchronous(event, backends).await
427 }
428 }
429
430 async fn sync_event_to_backend(
432 event: &SyncEvent,
433 backend: &dyn ResourceStorage,
434 retry_config: &RetryConfig,
435 ) -> StorageResult<()> {
436 let mut delay = retry_config.initial_delay;
437 let mut attempts = 0;
438
439 loop {
440 attempts += 1;
441
442 let result = match event {
443 SyncEvent::Create {
444 resource_type,
445 content,
446 tenant_id,
447 fhir_version,
448 ..
449 } => {
450 let tenant =
451 TenantContext::new(tenant_id.clone(), TenantPermissions::full_access());
452 backend
453 .create(&tenant, resource_type, content.clone(), *fhir_version)
454 .await
455 .map(|_| ())
456 }
457 SyncEvent::Update {
458 resource_type,
459 resource_id,
460 content,
461 tenant_id,
462 fhir_version,
463 ..
464 } => {
465 let tenant =
466 TenantContext::new(tenant_id.clone(), TenantPermissions::full_access());
467
468 backend
471 .create_or_update(
472 &tenant,
473 resource_type,
474 resource_id,
475 content.clone(),
476 *fhir_version,
477 )
478 .await
479 .map(|_| ())
480 }
481 SyncEvent::Delete {
482 resource_type,
483 resource_id,
484 tenant_id,
485 } => {
486 let tenant =
487 TenantContext::new(tenant_id.clone(), TenantPermissions::full_access());
488 backend.delete(&tenant, resource_type, resource_id).await
489 }
490 SyncEvent::BulkSync {
491 resources,
492 tenant_id,
493 } => {
494 let tenant =
495 TenantContext::new(tenant_id.clone(), TenantPermissions::full_access());
496
497 for resource in resources {
498 backend
499 .create_or_update(
500 &tenant,
501 resource.resource_type(),
502 resource.id(),
503 resource.content().clone(),
504 resource.fhir_version(),
505 )
506 .await?;
507 }
508 Ok(())
509 }
510 };
511
512 match result {
513 Ok(()) => {
514 if attempts > 1 {
515 debug!(attempts = attempts, "Sync succeeded after retries");
516 }
517 return Ok(());
518 }
519 Err(e) => {
520 if attempts > retry_config.max_retries {
521 return Err(e);
522 }
523
524 warn!(
525 attempt = attempts,
526 max_retries = retry_config.max_retries,
527 delay_ms = delay.as_millis(),
528 error = %e,
529 "Sync attempt failed, retrying"
530 );
531
532 sleep(delay).await;
533 delay = std::cmp::min(
534 Duration::from_secs_f64(
535 delay.as_secs_f64() * retry_config.backoff_multiplier,
536 ),
537 retry_config.max_delay,
538 );
539 }
540 }
541 }
542 }
543
544 pub fn backend_status(&self, backend_id: &str) -> Option<BackendSyncStatus> {
546 self.status.read().get(backend_id).cloned()
547 }
548
549 pub fn all_statuses(&self) -> HashMap<String, BackendSyncStatus> {
551 self.status.read().clone()
552 }
553
554 pub fn is_healthy(&self) -> bool {
556 let _max_lag = self.config.max_read_lag_ms;
557 let status = self.status.read();
558
559 for (_, backend_status) in status.iter() {
560 if backend_status.pending_events > self.config.batch_size * 10 {
563 return false;
564 }
565 }
566
567 true
568 }
569
570 pub async fn wait_for_sync(&self, timeout: Duration) -> bool {
572 let deadline = tokio::time::Instant::now() + timeout;
573
574 while tokio::time::Instant::now() < deadline {
575 if self.is_healthy() {
576 let status = self.status.read();
577 let all_synced = status.values().all(|s| s.pending_events == 0);
578 if all_synced {
579 return true;
580 }
581 }
582 sleep(Duration::from_millis(10)).await;
583 }
584
585 false
586 }
587}
588
/// Detects drift between a primary and a secondary backend by comparing
/// resource counts.
pub struct SyncReconciler {
    // Presumably the page size for future id/content-level reconciliation;
    // currently unused (hence the allow).
    #[allow(dead_code)]
    batch_size: usize,
}
595
596impl SyncReconciler {
597 pub fn new() -> Self {
599 Self { batch_size: 100 }
600 }
601
602 pub async fn reconcile(
604 &self,
605 tenant: &TenantContext,
606 primary: &dyn ResourceStorage,
607 secondary: &dyn ResourceStorage,
608 resource_type: &str,
609 ) -> StorageResult<ReconciliationResult> {
610 let mut result = ReconciliationResult::default();
611
612 let primary_count = primary.count(tenant, Some(resource_type)).await?;
614 result.primary_count = primary_count;
615
616 let secondary_count = secondary.count(tenant, Some(resource_type)).await?;
617 result.secondary_count = secondary_count;
618
619 if primary_count != secondary_count {
627 result.differences = (primary_count as i64 - secondary_count as i64).unsigned_abs();
628 }
629
630 Ok(result)
631 }
632}
633
634impl Default for SyncReconciler {
635 fn default() -> Self {
636 Self::new()
637 }
638}
639
/// Result of comparing one resource type between primary and secondary
/// storage.
#[derive(Debug, Default)]
pub struct ReconciliationResult {
    /// Resource count in the primary backend.
    pub primary_count: u64,

    /// Resource count in the secondary backend.
    pub secondary_count: u64,

    /// Absolute difference between the two counts.
    pub differences: u64,

    /// Ids present in primary but not secondary (not yet populated by
    /// `SyncReconciler::reconcile`).
    pub missing_in_secondary: Vec<String>,

    /// Ids present in secondary but not primary (not yet populated).
    pub extra_in_secondary: Vec<String>,

    /// Ids whose content differs between backends (not yet populated).
    pub content_mismatches: Vec<String>,
}
661
#[cfg(test)]
mod tests {
    use super::*;
    use helios_fhir::FhirVersion;

    // Accessors on a Create event expose its type, id, and tenant.
    #[test]
    fn test_sync_event_accessors() {
        let event = SyncEvent::Create {
            resource_type: "Patient".to_string(),
            resource_id: "123".to_string(),
            content: serde_json::json!({}),
            tenant_id: TenantId::new("test"),
            fhir_version: FhirVersion::default(),
        };

        assert_eq!(event.resource_type(), "Patient");
        assert_eq!(event.resource_id(), Some("123"));
        assert_eq!(event.tenant_id().as_str(), "test");
    }

    // A default backend status starts empty, with zero counters and
    // healthy == false.
    #[test]
    fn test_sync_status_default() {
        let status = BackendSyncStatus::default();
        assert!(status.last_success.is_none());
        assert_eq!(status.pending_events, 0);
        assert_eq!(status.total_synced, 0);
        assert!(!status.healthy);
    }

    // ReconciliationResult carries the count difference as provided.
    #[test]
    fn test_reconciliation_result() {
        let result = ReconciliationResult {
            primary_count: 100,
            secondary_count: 95,
            differences: 5,
            ..Default::default()
        };

        assert_eq!(result.differences, 5);
    }

    // A freshly constructed manager has no backlog and reports healthy.
    #[test]
    fn test_sync_manager_creation() {
        let config = SyncConfig::default();
        let manager = SyncManager::new(config);
        assert!(manager.is_healthy());
    }
}