// helios_persistence/composite/sync.rs
1//! Synchronization for secondary backends.
2//!
3//! This module provides synchronization mechanisms to keep secondary backends
4//! in sync with the primary backend.
5//!
6//! # Sync Modes
7//!
8//! | Mode | Description | Latency | Consistency |
9//! |------|-------------|---------|-------------|
10//! | Synchronous | Update secondaries in same operation | Higher | Strong |
11//! | Asynchronous | Update via event queue | Lower | Eventual |
12//! | Hybrid | Sync for some, async for others | Medium | Configurable |
13//!
14//! # Example
15//!
16//! ```ignore
17//! use helios_persistence::composite::sync::{SyncManager, SyncEvent};
18//!
19//! let manager = SyncManager::new(SyncConfig::default());
20//!
21//! // Sync a create event to secondaries
22//! manager.sync(&SyncEvent::Create {
23//!     resource_type: "Patient".to_string(),
24//!     resource_id: "123".to_string(),
25//!     content: patient_json,
26//!     tenant_id: tenant.tenant_id().clone(),
27//! }, &secondaries).await?;
28//! ```
29
30use std::collections::HashMap;
31use std::sync::Arc;
32use std::time::Duration;
33
34use helios_fhir::FhirVersion;
35use parking_lot::RwLock;
36use serde_json::Value;
37use tokio::sync::mpsc;
38use tokio::time::sleep;
39use tracing::{debug, error, warn};
40
41use crate::core::ResourceStorage;
42use crate::error::{StorageError, StorageResult};
43use crate::tenant::{TenantContext, TenantId, TenantPermissions};
44use crate::types::StoredResource;
45
46use super::config::{RetryConfig, SyncConfig, SyncMode};
47
/// A synchronization event to propagate to secondary backends.
///
/// Events mirror the primary backend's write operations (create, update,
/// delete) plus a bulk form used to push many stored resources at once.
/// They are dispatched by `SyncManager::sync`, either inline or via the
/// async worker queue depending on the configured `SyncMode`.
#[derive(Debug, Clone)]
pub enum SyncEvent {
    /// Resource was created.
    Create {
        /// Resource type.
        resource_type: String,
        /// Resource ID.
        resource_id: String,
        /// Resource content.
        content: Value,
        /// Tenant ID.
        tenant_id: TenantId,
        /// FHIR version.
        fhir_version: FhirVersion,
    },

    /// Resource was updated.
    Update {
        /// Resource type.
        resource_type: String,
        /// Resource ID.
        resource_id: String,
        /// New resource content.
        content: Value,
        /// Tenant ID.
        tenant_id: TenantId,
        /// New version. Note: secondaries are written via create_or_update,
        /// so this version is informational and not enforced there.
        version: String,
        /// FHIR version.
        fhir_version: FhirVersion,
    },

    /// Resource was deleted.
    Delete {
        /// Resource type.
        resource_type: String,
        /// Resource ID.
        resource_id: String,
        /// Tenant ID.
        tenant_id: TenantId,
    },

    /// Bulk sync request.
    BulkSync {
        /// Resources to sync.
        resources: Vec<StoredResource>,
        /// Tenant ID.
        tenant_id: TenantId,
    },
}
99
100impl SyncEvent {
101    /// Returns the resource type for this event.
102    pub fn resource_type(&self) -> &str {
103        match self {
104            SyncEvent::Create { resource_type, .. } => resource_type,
105            SyncEvent::Update { resource_type, .. } => resource_type,
106            SyncEvent::Delete { resource_type, .. } => resource_type,
107            SyncEvent::BulkSync { .. } => "bulk",
108        }
109    }
110
111    /// Returns the resource ID for this event (if applicable).
112    pub fn resource_id(&self) -> Option<&str> {
113        match self {
114            SyncEvent::Create { resource_id, .. } => Some(resource_id),
115            SyncEvent::Update { resource_id, .. } => Some(resource_id),
116            SyncEvent::Delete { resource_id, .. } => Some(resource_id),
117            SyncEvent::BulkSync { .. } => None,
118        }
119    }
120
121    /// Returns the tenant ID for this event.
122    pub fn tenant_id(&self) -> &TenantId {
123        match self {
124            SyncEvent::Create { tenant_id, .. } => tenant_id,
125            SyncEvent::Update { tenant_id, .. } => tenant_id,
126            SyncEvent::Delete { tenant_id, .. } => tenant_id,
127            SyncEvent::BulkSync { tenant_id, .. } => tenant_id,
128        }
129    }
130}
131
/// Status of a sync operation.
///
/// One `SyncStatus` is produced per target backend by `SyncManager::sync`.
/// In asynchronous mode a status with `success = true` only means the event
/// was queued, not that it has been applied.
#[derive(Debug, Clone)]
pub struct SyncStatus {
    /// Backend ID.
    pub backend_id: String,

    /// Whether the sync succeeded.
    pub success: bool,

    /// Error message if failed.
    pub error: Option<String>,

    /// Retry count.
    pub retry_count: u32,

    /// Duration of the operation. `Duration::ZERO` for queued (async) events.
    pub duration: Duration,
}
150
/// Synchronization manager for secondary backends.
///
/// Owns the sync configuration, the (optional) queue into the async worker,
/// and per-backend status counters shared with that worker.
pub struct SyncManager {
    /// Configuration.
    config: SyncConfig,

    /// Event queue for async mode. `None` until `start_async_worker` is
    /// called; async syncs fall back to synchronous delivery while unset.
    event_sender: Option<mpsc::Sender<QueuedEvent>>,

    /// Sync status per backend. Shared (via `Arc`) with the async worker.
    status: Arc<RwLock<HashMap<String, BackendSyncStatus>>>,
}
162
/// Status tracking for a backend.
///
/// `Default` yields an all-zero status with `healthy = false`; entries are
/// created lazily the first time a backend is synced or queued.
#[derive(Debug, Clone, Default)]
pub struct BackendSyncStatus {
    /// Last successful sync timestamp.
    pub last_success: Option<std::time::Instant>,

    /// Current sync lag (events pending).
    pub pending_events: usize,

    /// Total events synced.
    pub total_synced: u64,

    /// Total errors.
    pub total_errors: u64,

    /// Whether sync is healthy.
    pub healthy: bool,
}
181
/// Event queued for async processing.
///
/// Pairs a sync event with the backend IDs it should be applied to.
struct QueuedEvent {
    event: SyncEvent,
    backend_ids: Vec<String>,
    // Enqueue time; retained for future lag/latency metrics.
    #[allow(dead_code)]
    created_at: std::time::Instant,
}
189
190impl SyncManager {
191    /// Creates a new sync manager.
192    pub fn new(config: SyncConfig) -> Self {
193        Self {
194            config,
195            event_sender: None,
196            status: Arc::new(RwLock::new(HashMap::new())),
197        }
198    }
199
200    /// Starts the async sync worker.
201    pub fn start_async_worker(
202        &mut self,
203        backends: HashMap<String, Arc<dyn ResourceStorage + Send + Sync>>,
204    ) -> tokio::task::JoinHandle<()> {
205        let (sender, receiver) = mpsc::channel::<QueuedEvent>(1000);
206        self.event_sender = Some(sender);
207
208        let config = self.config.clone();
209        let status = self.status.clone();
210
211        tokio::spawn(async move {
212            Self::async_worker(receiver, backends, config, status).await;
213        })
214    }
215
216    /// Async worker that processes queued events.
217    async fn async_worker(
218        mut receiver: mpsc::Receiver<QueuedEvent>,
219        backends: HashMap<String, Arc<dyn ResourceStorage + Send + Sync>>,
220        config: SyncConfig,
221        status: Arc<RwLock<HashMap<String, BackendSyncStatus>>>,
222    ) {
223        let mut batch = Vec::new();
224        let batch_timeout = Duration::from_millis(100);
225
226        loop {
227            // Collect events into batches
228            let deadline = tokio::time::Instant::now() + batch_timeout;
229
230            loop {
231                let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
232                if remaining.is_zero() || batch.len() >= config.batch_size {
233                    break;
234                }
235
236                match tokio::time::timeout(remaining, receiver.recv()).await {
237                    Ok(Some(event)) => batch.push(event),
238                    Ok(None) => return, // Channel closed
239                    Err(_) => break,    // Timeout
240                }
241            }
242
243            if batch.is_empty() {
244                continue;
245            }
246
247            // Process batch
248            let events: Vec<_> = std::mem::take(&mut batch);
249
250            for queued in events {
251                for backend_id in &queued.backend_ids {
252                    if let Some(backend) = backends.get(backend_id) {
253                        let result = Self::sync_event_to_backend(
254                            &queued.event,
255                            backend.as_ref(),
256                            &config.retry,
257                        )
258                        .await;
259
260                        // Update status
261                        let mut status_map = status.write();
262                        let backend_status = status_map.entry(backend_id.clone()).or_default();
263
264                        match result {
265                            Ok(_) => {
266                                backend_status.last_success = Some(std::time::Instant::now());
267                                backend_status.total_synced += 1;
268                                backend_status.healthy = true;
269                            }
270                            Err(e) => {
271                                backend_status.total_errors += 1;
272                                error!(
273                                    backend = %backend_id,
274                                    error = %e,
275                                    "Async sync failed"
276                                );
277                            }
278                        }
279
280                        if backend_status.pending_events > 0 {
281                            backend_status.pending_events -= 1;
282                        }
283                    }
284                }
285            }
286        }
287    }
288
289    /// Synchronizes an event to secondary backends.
290    pub async fn sync(
291        &self,
292        event: &SyncEvent,
293        backends: &HashMap<String, Arc<dyn ResourceStorage + Send + Sync>>,
294    ) -> StorageResult<Vec<SyncStatus>> {
295        match self.config.mode {
296            SyncMode::Synchronous => self.sync_synchronous(event, backends).await,
297            SyncMode::Asynchronous => self.sync_asynchronous(event, backends).await,
298            SyncMode::Hybrid { sync_for_search } => {
299                // In hybrid mode, sync search-related events synchronously
300                let is_search_related = matches!(
301                    event,
302                    SyncEvent::Create { .. } | SyncEvent::Update { .. } | SyncEvent::Delete { .. }
303                );
304
305                if sync_for_search && is_search_related {
306                    self.sync_synchronous(event, backends).await
307                } else {
308                    self.sync_asynchronous(event, backends).await
309                }
310            }
311        }
312    }
313
314    /// Synchronous sync - waits for all backends.
315    async fn sync_synchronous(
316        &self,
317        event: &SyncEvent,
318        backends: &HashMap<String, Arc<dyn ResourceStorage + Send + Sync>>,
319    ) -> StorageResult<Vec<SyncStatus>> {
320        use tokio::task::JoinSet;
321
322        let mut tasks: JoinSet<SyncStatus> = JoinSet::new();
323        let event = event.clone();
324
325        for (backend_id, backend) in backends {
326            let event = event.clone();
327            let backend = backend.clone();
328            let backend_id = backend_id.clone();
329            let retry_config = self.config.retry.clone();
330
331            tasks.spawn(async move {
332                let start = std::time::Instant::now();
333
334                match Self::sync_event_to_backend(&event, backend.as_ref(), &retry_config).await {
335                    Ok(_) => SyncStatus {
336                        backend_id,
337                        success: true,
338                        error: None,
339                        retry_count: 0,
340                        duration: start.elapsed(),
341                    },
342                    Err(e) => SyncStatus {
343                        backend_id,
344                        success: false,
345                        error: Some(e.to_string()),
346                        retry_count: retry_config.max_retries,
347                        duration: start.elapsed(),
348                    },
349                }
350            });
351        }
352
353        let mut results = Vec::new();
354        while let Some(result) = tasks.join_next().await {
355            match result {
356                Ok(status) => {
357                    // Update internal status
358                    let mut status_map = self.status.write();
359                    let backend_status = status_map.entry(status.backend_id.clone()).or_default();
360
361                    if status.success {
362                        backend_status.last_success = Some(std::time::Instant::now());
363                        backend_status.total_synced += 1;
364                        backend_status.healthy = true;
365                    } else {
366                        backend_status.total_errors += 1;
367                    }
368
369                    results.push(status);
370                }
371                Err(e) => {
372                    warn!(error = %e, "Sync task failed");
373                }
374            }
375        }
376
377        Ok(results)
378    }
379
380    /// Asynchronous sync - queues events for background processing.
381    async fn sync_asynchronous(
382        &self,
383        event: &SyncEvent,
384        backends: &HashMap<String, Arc<dyn ResourceStorage + Send + Sync>>,
385    ) -> StorageResult<Vec<SyncStatus>> {
386        if let Some(ref sender) = self.event_sender {
387            let backend_ids: Vec<_> = backends.keys().cloned().collect();
388
389            // Update pending counts
390            {
391                let mut status_map = self.status.write();
392                for id in &backend_ids {
393                    let status = status_map.entry(id.clone()).or_default();
394                    status.pending_events += 1;
395                }
396            }
397
398            sender
399                .send(QueuedEvent {
400                    event: event.clone(),
401                    backend_ids: backend_ids.clone(),
402                    created_at: std::time::Instant::now(),
403                })
404                .await
405                .map_err(|e| {
406                    StorageError::Backend(crate::error::BackendError::ConnectionFailed {
407                        backend_name: "sync".to_string(),
408                        message: format!("Failed to queue sync event: {}", e),
409                    })
410                })?;
411
412            // Return pending status
413            Ok(backend_ids
414                .into_iter()
415                .map(|id| SyncStatus {
416                    backend_id: id,
417                    success: true,
418                    error: None,
419                    retry_count: 0,
420                    duration: Duration::ZERO,
421                })
422                .collect())
423        } else {
424            // No async worker, fall back to sync
425            warn!("Async sync requested but no worker started, falling back to sync");
426            self.sync_synchronous(event, backends).await
427        }
428    }
429
430    /// Syncs a single event to a backend with retries.
431    async fn sync_event_to_backend(
432        event: &SyncEvent,
433        backend: &dyn ResourceStorage,
434        retry_config: &RetryConfig,
435    ) -> StorageResult<()> {
436        let mut delay = retry_config.initial_delay;
437        let mut attempts = 0;
438
439        loop {
440            attempts += 1;
441
442            let result = match event {
443                SyncEvent::Create {
444                    resource_type,
445                    content,
446                    tenant_id,
447                    fhir_version,
448                    ..
449                } => {
450                    let tenant =
451                        TenantContext::new(tenant_id.clone(), TenantPermissions::full_access());
452                    backend
453                        .create(&tenant, resource_type, content.clone(), *fhir_version)
454                        .await
455                        .map(|_| ())
456                }
457                SyncEvent::Update {
458                    resource_type,
459                    resource_id,
460                    content,
461                    tenant_id,
462                    fhir_version,
463                    ..
464                } => {
465                    let tenant =
466                        TenantContext::new(tenant_id.clone(), TenantPermissions::full_access());
467
468                    // For secondary backends, we do a create_or_update
469                    // since we don't track versions in secondaries
470                    backend
471                        .create_or_update(
472                            &tenant,
473                            resource_type,
474                            resource_id,
475                            content.clone(),
476                            *fhir_version,
477                        )
478                        .await
479                        .map(|_| ())
480                }
481                SyncEvent::Delete {
482                    resource_type,
483                    resource_id,
484                    tenant_id,
485                } => {
486                    let tenant =
487                        TenantContext::new(tenant_id.clone(), TenantPermissions::full_access());
488                    backend.delete(&tenant, resource_type, resource_id).await
489                }
490                SyncEvent::BulkSync {
491                    resources,
492                    tenant_id,
493                } => {
494                    let tenant =
495                        TenantContext::new(tenant_id.clone(), TenantPermissions::full_access());
496
497                    for resource in resources {
498                        backend
499                            .create_or_update(
500                                &tenant,
501                                resource.resource_type(),
502                                resource.id(),
503                                resource.content().clone(),
504                                resource.fhir_version(),
505                            )
506                            .await?;
507                    }
508                    Ok(())
509                }
510            };
511
512            match result {
513                Ok(()) => {
514                    if attempts > 1 {
515                        debug!(attempts = attempts, "Sync succeeded after retries");
516                    }
517                    return Ok(());
518                }
519                Err(e) => {
520                    if attempts > retry_config.max_retries {
521                        return Err(e);
522                    }
523
524                    warn!(
525                        attempt = attempts,
526                        max_retries = retry_config.max_retries,
527                        delay_ms = delay.as_millis(),
528                        error = %e,
529                        "Sync attempt failed, retrying"
530                    );
531
532                    sleep(delay).await;
533                    delay = std::cmp::min(
534                        Duration::from_secs_f64(
535                            delay.as_secs_f64() * retry_config.backoff_multiplier,
536                        ),
537                        retry_config.max_delay,
538                    );
539                }
540            }
541        }
542    }
543
544    /// Returns the sync status for a backend.
545    pub fn backend_status(&self, backend_id: &str) -> Option<BackendSyncStatus> {
546        self.status.read().get(backend_id).cloned()
547    }
548
549    /// Returns all backend statuses.
550    pub fn all_statuses(&self) -> HashMap<String, BackendSyncStatus> {
551        self.status.read().clone()
552    }
553
554    /// Checks if all backends are healthy (no excessive lag).
555    pub fn is_healthy(&self) -> bool {
556        let _max_lag = self.config.max_read_lag_ms;
557        let status = self.status.read();
558
559        for (_, backend_status) in status.iter() {
560            // Consider unhealthy if pending events exceed threshold
561            // (rough approximation of lag)
562            if backend_status.pending_events > self.config.batch_size * 10 {
563                return false;
564            }
565        }
566
567        true
568    }
569
570    /// Waits for sync lag to be below threshold.
571    pub async fn wait_for_sync(&self, timeout: Duration) -> bool {
572        let deadline = tokio::time::Instant::now() + timeout;
573
574        while tokio::time::Instant::now() < deadline {
575            if self.is_healthy() {
576                let status = self.status.read();
577                let all_synced = status.values().all(|s| s.pending_events == 0);
578                if all_synced {
579                    return true;
580                }
581            }
582            sleep(Duration::from_millis(10)).await;
583        }
584
585        false
586    }
587}
588
/// Sync reconciliation for detecting and fixing inconsistencies.
pub struct SyncReconciler {
    /// Maximum resources to check per batch. Currently unused; reserved for
    /// the full (resource-by-resource) reconciliation implementation.
    #[allow(dead_code)]
    batch_size: usize,
}
595
596impl SyncReconciler {
597    /// Creates a new reconciler.
598    pub fn new() -> Self {
599        Self { batch_size: 100 }
600    }
601
602    /// Reconciles a secondary backend with the primary.
603    pub async fn reconcile(
604        &self,
605        tenant: &TenantContext,
606        primary: &dyn ResourceStorage,
607        secondary: &dyn ResourceStorage,
608        resource_type: &str,
609    ) -> StorageResult<ReconciliationResult> {
610        let mut result = ReconciliationResult::default();
611
612        // Get count from both
613        let primary_count = primary.count(tenant, Some(resource_type)).await?;
614        result.primary_count = primary_count;
615
616        let secondary_count = secondary.count(tenant, Some(resource_type)).await?;
617        result.secondary_count = secondary_count;
618
619        // TODO: Implement full reconciliation by:
620        // 1. Iterating through primary resources
621        // 2. Checking if they exist in secondary
622        // 3. Checking if content matches
623        // 4. Syncing any differences
624
625        // For now, just report counts
626        if primary_count != secondary_count {
627            result.differences = (primary_count as i64 - secondary_count as i64).unsigned_abs();
628        }
629
630        Ok(result)
631    }
632}
633
/// `Default` is equivalent to [`SyncReconciler::new`].
impl Default for SyncReconciler {
    fn default() -> Self {
        Self::new()
    }
}
639
/// Result of a reconciliation operation.
///
/// The per-resource lists below are populated only by the full
/// reconciliation pass, which is not yet implemented; `reconcile`
/// currently fills in the counts and `differences` only.
#[derive(Debug, Default)]
pub struct ReconciliationResult {
    /// Resource count in primary.
    pub primary_count: u64,

    /// Resource count in secondary.
    pub secondary_count: u64,

    /// Number of differences found (absolute count difference).
    pub differences: u64,

    /// Resources missing from secondary.
    pub missing_in_secondary: Vec<String>,

    /// Resources extra in secondary (should be deleted).
    pub extra_in_secondary: Vec<String>,

    /// Resources with content mismatch.
    pub content_mismatches: Vec<String>,
}
661
#[cfg(test)]
mod tests {
    use super::*;
    use helios_fhir::FhirVersion;

    // Accessors should surface the fields of a Create event directly.
    #[test]
    fn test_sync_event_accessors() {
        let event = SyncEvent::Create {
            resource_type: "Patient".to_string(),
            resource_id: "123".to_string(),
            content: serde_json::json!({}),
            tenant_id: TenantId::new("test"),
            fhir_version: FhirVersion::default(),
        };

        assert_eq!(event.resource_type(), "Patient");
        assert_eq!(event.resource_id(), Some("123"));
        assert_eq!(event.tenant_id().as_str(), "test");
    }

    // Default status is all-zero and starts unhealthy until a sync succeeds.
    #[test]
    fn test_sync_status_default() {
        let status = BackendSyncStatus::default();
        assert!(status.last_success.is_none());
        assert_eq!(status.pending_events, 0);
        assert_eq!(status.total_synced, 0);
        assert!(!status.healthy);
    }

    // Struct-update syntax leaves the per-resource lists empty.
    #[test]
    fn test_reconciliation_result() {
        let result = ReconciliationResult {
            primary_count: 100,
            secondary_count: 95,
            differences: 5,
            ..Default::default()
        };

        assert_eq!(result.differences, 5);
    }

    // A freshly created manager has no backends and thus no lag.
    #[test]
    fn test_sync_manager_creation() {
        let config = SyncConfig::default();
        let manager = SyncManager::new(config);
        assert!(manager.is_healthy());
    }
}