// ceres_core/sync.rs
1//! Sync service layer for portal synchronization logic.
2//!
3//! This module provides pure business logic for delta detection and sync statistics,
4//! decoupled from I/O operations and CLI orchestration.
5
6use std::sync::atomic::{AtomicUsize, Ordering};
7
/// Outcome of processing a single dataset during sync.
///
/// Each processed dataset is classified into exactly one of these
/// variants; [`SyncStats::record`] and [`AtomicSyncStats::record`]
/// map each variant to its corresponding counter.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncOutcome {
    /// Dataset content hash matches existing - no changes needed.
    Unchanged,
    /// Dataset content changed - embedding regenerated.
    Updated,
    /// New dataset - first time seeing this dataset.
    Created,
    /// Processing failed for this dataset.
    Failed,
    /// Dataset skipped due to circuit breaker being open.
    Skipped,
}
22
/// Statistics for a portal sync operation.
///
/// One counter per [`SyncOutcome`] variant. Serde support allows the
/// stats to be persisted and reloaded between runs.
#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
pub struct SyncStats {
    // Datasets whose content hash matched the stored hash.
    pub unchanged: usize,
    // Datasets whose content changed and were reprocessed.
    pub updated: usize,
    // Datasets seen for the first time.
    pub created: usize,
    // Datasets that failed processing.
    pub failed: usize,
    /// Number of datasets skipped due to circuit breaker being open.
    // `#[serde(default)]` keeps deserialization of records written
    // before this field existed working (they default to 0).
    #[serde(default)]
    pub skipped: usize,
}
34
35impl SyncStats {
36    /// Creates a new empty stats tracker.
37    pub fn new() -> Self {
38        Self::default()
39    }
40
41    /// Records an outcome, incrementing the appropriate counter.
42    pub fn record(&mut self, outcome: SyncOutcome) {
43        match outcome {
44            SyncOutcome::Unchanged => self.unchanged += 1,
45            SyncOutcome::Updated => self.updated += 1,
46            SyncOutcome::Created => self.created += 1,
47            SyncOutcome::Failed => self.failed += 1,
48            SyncOutcome::Skipped => self.skipped += 1,
49        }
50    }
51
52    /// Returns the total number of processed datasets.
53    pub fn total(&self) -> usize {
54        self.unchanged + self.updated + self.created + self.failed + self.skipped
55    }
56
57    /// Returns the number of successfully processed datasets.
58    pub fn successful(&self) -> usize {
59        self.unchanged + self.updated + self.created
60    }
61}
62
63// =============================================================================
64// Sync Status and Result Types (for cancellation support)
65// =============================================================================
66
/// Overall status of a sync operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncStatus {
    /// Sync completed successfully with all datasets processed.
    Completed,
    /// Sync was cancelled but partial progress was saved.
    Cancelled,
}

impl SyncStatus {
    /// String form used for database storage
    /// (`"completed"` or `"cancelled"`).
    pub fn as_str(&self) -> &'static str {
        if self.is_completed() {
            "completed"
        } else {
            "cancelled"
        }
    }

    /// True when the sync ran to completion.
    pub fn is_completed(&self) -> bool {
        *self == SyncStatus::Completed
    }

    /// True when the sync was cancelled part-way through.
    pub fn is_cancelled(&self) -> bool {
        *self == SyncStatus::Cancelled
    }
}
95
/// Result of a sync operation including status and statistics.
#[derive(Debug, Clone)]
pub struct SyncResult {
    /// The final status of the sync operation.
    pub status: SyncStatus,
    /// Statistics about processed datasets (partial if cancelled).
    pub stats: SyncStats,
    /// Optional message providing context (e.g., cancellation reason).
    pub message: Option<String>,
}
106
107impl SyncResult {
108    /// Creates a completed sync result.
109    pub fn completed(stats: SyncStats) -> Self {
110        Self {
111            status: SyncStatus::Completed,
112            stats,
113            message: None,
114        }
115    }
116
117    /// Creates a cancelled sync result with partial statistics.
118    pub fn cancelled(stats: SyncStats) -> Self {
119        Self {
120            status: SyncStatus::Cancelled,
121            stats,
122            message: Some("Operation cancelled - partial progress saved".to_string()),
123        }
124    }
125
126    /// Returns true if the sync was fully successful.
127    pub fn is_completed(&self) -> bool {
128        self.status.is_completed()
129    }
130
131    /// Returns true if the sync was cancelled.
132    pub fn is_cancelled(&self) -> bool {
133        self.status.is_cancelled()
134    }
135}
136
/// Thread-safe wrapper for [`SyncStats`] using atomic counters.
///
/// This is useful for concurrent harvesting where multiple tasks
/// update statistics simultaneously without requiring a mutex.
///
/// # Example
///
/// ```
/// use ceres_core::sync::{AtomicSyncStats, SyncOutcome};
///
/// let stats = AtomicSyncStats::new();
///
/// // Can be safely called from multiple threads
/// stats.record(SyncOutcome::Created);
/// stats.record(SyncOutcome::Unchanged);
///
/// let snapshot = stats.to_stats();
/// assert_eq!(snapshot.created, 1);
/// assert_eq!(snapshot.unchanged, 1);
/// ```
// Debug derived for parity with the module's other public types
// (SyncStats, SyncResult, ...); AtomicUsize is Debug, so this costs
// nothing and helps logging/diagnostics.
#[derive(Debug)]
pub struct AtomicSyncStats {
    // One atomic counter per SyncOutcome variant; see record().
    unchanged: AtomicUsize,
    updated: AtomicUsize,
    created: AtomicUsize,
    failed: AtomicUsize,
    skipped: AtomicUsize,
}
164
165impl AtomicSyncStats {
166    /// Creates a new zeroed stats tracker.
167    pub fn new() -> Self {
168        Self {
169            unchanged: AtomicUsize::new(0),
170            updated: AtomicUsize::new(0),
171            created: AtomicUsize::new(0),
172            failed: AtomicUsize::new(0),
173            skipped: AtomicUsize::new(0),
174        }
175    }
176
177    /// Records an outcome, incrementing the appropriate counter atomically.
178    pub fn record(&self, outcome: SyncOutcome) {
179        match outcome {
180            SyncOutcome::Unchanged => self.unchanged.fetch_add(1, Ordering::Relaxed),
181            SyncOutcome::Updated => self.updated.fetch_add(1, Ordering::Relaxed),
182            SyncOutcome::Created => self.created.fetch_add(1, Ordering::Relaxed),
183            SyncOutcome::Failed => self.failed.fetch_add(1, Ordering::Relaxed),
184            SyncOutcome::Skipped => self.skipped.fetch_add(1, Ordering::Relaxed),
185        };
186    }
187
188    /// Converts atomic counters to a snapshot [`SyncStats`].
189    pub fn to_stats(&self) -> SyncStats {
190        SyncStats {
191            unchanged: self.unchanged.load(Ordering::Relaxed),
192            updated: self.updated.load(Ordering::Relaxed),
193            created: self.created.load(Ordering::Relaxed),
194            failed: self.failed.load(Ordering::Relaxed),
195            skipped: self.skipped.load(Ordering::Relaxed),
196        }
197    }
198}
199
impl Default for AtomicSyncStats {
    /// Equivalent to [`AtomicSyncStats::new`]: all counters start at zero.
    fn default() -> Self {
        Self::new()
    }
}
205
/// Result of delta detection for a dataset.
///
/// Produced by [`needs_reprocessing`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReprocessingDecision {
    /// Whether embedding needs to be regenerated.
    pub needs_embedding: bool,
    /// The outcome classification for this dataset.
    pub outcome: SyncOutcome,
    /// Human-readable reason for the decision.
    // NOTE: `is_legacy()` compares against this string, so the reason
    // literals assigned in `needs_reprocessing` are load-bearing.
    pub reason: &'static str,
}
216
217impl ReprocessingDecision {
218    /// Returns true if this is a legacy record update (existing record without hash).
219    pub fn is_legacy(&self) -> bool {
220        self.reason == "legacy record without hash"
221    }
222}
223
224/// Determines if a dataset needs reprocessing based on content hash comparison.
225///
226/// # Arguments
227/// * `existing_hash` - The stored content hash for this dataset (None if new dataset)
228/// * `new_hash` - The computed content hash from the portal data
229///
230/// # Returns
231/// A `ReprocessingDecision` indicating whether embedding regeneration is needed
232/// and the classification of this sync operation.
233pub fn needs_reprocessing(
234    existing_hash: Option<&Option<String>>,
235    new_hash: &str,
236) -> ReprocessingDecision {
237    match existing_hash {
238        Some(Some(hash)) if hash == new_hash => {
239            // Hash matches - content unchanged
240            ReprocessingDecision {
241                needs_embedding: false,
242                outcome: SyncOutcome::Unchanged,
243                reason: "content hash matches",
244            }
245        }
246        Some(Some(_)) => {
247            // Hash exists but differs - content updated
248            ReprocessingDecision {
249                needs_embedding: true,
250                outcome: SyncOutcome::Updated,
251                reason: "content hash changed",
252            }
253        }
254        Some(None) => {
255            // Exists but no hash (legacy data) - treat as update
256            ReprocessingDecision {
257                needs_embedding: true,
258                outcome: SyncOutcome::Updated,
259                reason: "legacy record without hash",
260            }
261        }
262        None => {
263            // Not in existing data - new dataset
264            ReprocessingDecision {
265                needs_embedding: true,
266                outcome: SyncOutcome::Created,
267                reason: "new dataset",
268            }
269        }
270    }
271}
272
273// =============================================================================
274// Batch Harvest Types
275// =============================================================================
276
/// Result of harvesting a single portal in batch mode.
#[derive(Debug, Clone)]
pub struct PortalHarvestResult {
    /// Portal name identifier.
    pub portal_name: String,
    /// Portal URL.
    pub portal_url: String,
    /// Sync statistics for this portal (all zeros when built via
    /// [`PortalHarvestResult::failure`]).
    pub stats: SyncStats,
    /// Error message if harvest failed, None if successful.
    pub error: Option<String>,
}
289
290impl PortalHarvestResult {
291    /// Creates a successful harvest result.
292    pub fn success(name: String, url: String, stats: SyncStats) -> Self {
293        Self {
294            portal_name: name,
295            portal_url: url,
296            stats,
297            error: None,
298        }
299    }
300
301    /// Creates a failed harvest result.
302    pub fn failure(name: String, url: String, error: String) -> Self {
303        Self {
304            portal_name: name,
305            portal_url: url,
306            stats: SyncStats::default(),
307            error: Some(error),
308        }
309    }
310
311    /// Returns true if the harvest was successful.
312    pub fn is_success(&self) -> bool {
313        self.error.is_none()
314    }
315}
316
/// Aggregated results from batch harvesting multiple portals.
#[derive(Debug, Clone, Default)]
pub struct BatchHarvestSummary {
    /// Results for each portal, in the order they were added.
    pub results: Vec<PortalHarvestResult>,
}
323
324impl BatchHarvestSummary {
325    /// Creates a new empty summary.
326    pub fn new() -> Self {
327        Self::default()
328    }
329
330    /// Adds a portal harvest result.
331    pub fn add(&mut self, result: PortalHarvestResult) {
332        self.results.push(result);
333    }
334
335    /// Returns the count of successful harvests.
336    pub fn successful_count(&self) -> usize {
337        self.results.iter().filter(|r| r.is_success()).count()
338    }
339
340    /// Returns the count of failed harvests.
341    pub fn failed_count(&self) -> usize {
342        self.results.iter().filter(|r| !r.is_success()).count()
343    }
344
345    /// Returns the total number of datasets across all successful portals.
346    pub fn total_datasets(&self) -> usize {
347        self.results.iter().map(|r| r.stats.total()).sum()
348    }
349
350    /// Returns the total number of portals processed.
351    pub fn total_portals(&self) -> usize {
352        self.results.len()
353    }
354}
355
#[cfg(test)]
mod tests {
    use super::*;

    // =========================================================================
    // SyncStats tests
    // =========================================================================

    #[test]
    fn test_sync_stats_default() {
        let stats = SyncStats::new();
        assert_eq!(stats.unchanged, 0);
        assert_eq!(stats.updated, 0);
        assert_eq!(stats.created, 0);
        assert_eq!(stats.failed, 0);
        assert_eq!(stats.skipped, 0);
    }

    #[test]
    fn test_sync_stats_record() {
        // Each outcome variant increments exactly its own counter.
        let mut stats = SyncStats::new();
        stats.record(SyncOutcome::Unchanged);
        stats.record(SyncOutcome::Updated);
        stats.record(SyncOutcome::Created);
        stats.record(SyncOutcome::Failed);
        stats.record(SyncOutcome::Skipped);

        assert_eq!(stats.unchanged, 1);
        assert_eq!(stats.updated, 1);
        assert_eq!(stats.created, 1);
        assert_eq!(stats.failed, 1);
        assert_eq!(stats.skipped, 1);
    }

    #[test]
    fn test_sync_stats_total() {
        // total() includes every counter, including failed and skipped.
        let mut stats = SyncStats::new();
        stats.unchanged = 10;
        stats.updated = 5;
        stats.created = 3;
        stats.failed = 2;
        stats.skipped = 1;

        assert_eq!(stats.total(), 21);
    }

    #[test]
    fn test_sync_stats_successful() {
        let mut stats = SyncStats::new();
        stats.unchanged = 10;
        stats.updated = 5;
        stats.created = 3;
        stats.failed = 2;
        stats.skipped = 1;

        // successful does not include failed or skipped
        assert_eq!(stats.successful(), 18);
    }

    // =========================================================================
    // needs_reprocessing / ReprocessingDecision tests
    // =========================================================================

    #[test]
    fn test_needs_reprocessing_unchanged() {
        let hash = "abc123".to_string();
        let existing = Some(Some(hash.clone()));
        let decision = needs_reprocessing(existing.as_ref(), &hash);

        assert!(!decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Unchanged);
        assert_eq!(decision.reason, "content hash matches");
    }

    #[test]
    fn test_needs_reprocessing_updated() {
        let old_hash = "abc123".to_string();
        let new_hash = "def456";
        let existing = Some(Some(old_hash));
        let decision = needs_reprocessing(existing.as_ref(), new_hash);

        assert!(decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Updated);
        assert_eq!(decision.reason, "content hash changed");
    }

    #[test]
    fn test_needs_reprocessing_legacy() {
        // Some(None): record exists but predates content hashing.
        let existing: Option<Option<String>> = Some(None);
        let decision = needs_reprocessing(existing.as_ref(), "new_hash");

        assert!(decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Updated);
        assert_eq!(decision.reason, "legacy record without hash");
    }

    #[test]
    fn test_needs_reprocessing_new() {
        let decision = needs_reprocessing(None, "new_hash");

        assert!(decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Created);
        assert_eq!(decision.reason, "new dataset");
    }

    #[test]
    fn test_is_legacy_true() {
        let existing: Option<Option<String>> = Some(None);
        let decision = needs_reprocessing(existing.as_ref(), "new_hash");

        assert!(decision.is_legacy());
    }

    #[test]
    fn test_is_legacy_false() {
        // New datasets and unchanged datasets are not legacy.
        let decision = needs_reprocessing(None, "new_hash");
        assert!(!decision.is_legacy());

        let hash = "abc123".to_string();
        let existing = Some(Some(hash.clone()));
        let decision = needs_reprocessing(existing.as_ref(), &hash);
        assert!(!decision.is_legacy());
    }

    // =========================================================================
    // PortalHarvestResult tests
    // =========================================================================

    #[test]
    fn test_portal_harvest_result_success() {
        let stats = SyncStats {
            unchanged: 5,
            updated: 3,
            created: 2,
            failed: 0,
            skipped: 0,
        };
        let result = PortalHarvestResult::success(
            "test".to_string(),
            "https://example.com".to_string(),
            stats,
        );
        assert!(result.is_success());
        assert!(result.error.is_none());
        assert_eq!(result.stats.total(), 10);
        assert_eq!(result.portal_name, "test");
        assert_eq!(result.portal_url, "https://example.com");
    }

    #[test]
    fn test_portal_harvest_result_failure() {
        let result = PortalHarvestResult::failure(
            "test".to_string(),
            "https://example.com".to_string(),
            "Connection timeout".to_string(),
        );
        assert!(!result.is_success());
        assert_eq!(result.error, Some("Connection timeout".to_string()));
        assert_eq!(result.stats.total(), 0);
    }

    // =========================================================================
    // BatchHarvestSummary tests
    // =========================================================================

    #[test]
    fn test_batch_harvest_summary_empty() {
        let summary = BatchHarvestSummary::new();
        assert_eq!(summary.successful_count(), 0);
        assert_eq!(summary.failed_count(), 0);
        assert_eq!(summary.total_datasets(), 0);
        assert_eq!(summary.total_portals(), 0);
    }

    #[test]
    fn test_batch_harvest_summary_mixed_results() {
        let mut summary = BatchHarvestSummary::new();

        let stats1 = SyncStats {
            unchanged: 10,
            updated: 5,
            created: 3,
            failed: 2,
            skipped: 0,
        };
        summary.add(PortalHarvestResult::success(
            "a".into(),
            "https://a.com".into(),
            stats1,
        ));

        summary.add(PortalHarvestResult::failure(
            "b".into(),
            "https://b.com".into(),
            "error".into(),
        ));

        let stats2 = SyncStats {
            unchanged: 20,
            updated: 0,
            created: 0,
            failed: 0,
            skipped: 0,
        };
        summary.add(PortalHarvestResult::success(
            "c".into(),
            "https://c.com".into(),
            stats2,
        ));

        assert_eq!(summary.total_portals(), 3);
        assert_eq!(summary.successful_count(), 2);
        assert_eq!(summary.failed_count(), 1);
        assert_eq!(summary.total_datasets(), 40); // 20 + 20 + 0 (failed portal has 0)
    }

    #[test]
    fn test_batch_harvest_summary_all_successful() {
        let mut summary = BatchHarvestSummary::new();

        let stats = SyncStats {
            unchanged: 5,
            updated: 0,
            created: 5,
            failed: 0,
            skipped: 0,
        };
        summary.add(PortalHarvestResult::success(
            "portal1".into(),
            "https://portal1.com".into(),
            stats,
        ));

        assert_eq!(summary.successful_count(), 1);
        assert_eq!(summary.failed_count(), 0);
        assert_eq!(summary.total_datasets(), 10);
    }

    #[test]
    fn test_batch_harvest_summary_all_failed() {
        let mut summary = BatchHarvestSummary::new();

        summary.add(PortalHarvestResult::failure(
            "portal1".into(),
            "https://portal1.com".into(),
            "error1".into(),
        ));
        summary.add(PortalHarvestResult::failure(
            "portal2".into(),
            "https://portal2.com".into(),
            "error2".into(),
        ));

        assert_eq!(summary.successful_count(), 0);
        assert_eq!(summary.failed_count(), 2);
        assert_eq!(summary.total_datasets(), 0);
        assert_eq!(summary.total_portals(), 2);
    }

    // =========================================================================
    // AtomicSyncStats tests
    // =========================================================================

    #[test]
    fn test_atomic_sync_stats_new() {
        let stats = AtomicSyncStats::new();
        let result = stats.to_stats();
        assert_eq!(result.unchanged, 0);
        assert_eq!(result.updated, 0);
        assert_eq!(result.created, 0);
        assert_eq!(result.failed, 0);
        assert_eq!(result.skipped, 0);
    }

    #[test]
    fn test_atomic_sync_stats_record() {
        let stats = AtomicSyncStats::new();
        stats.record(SyncOutcome::Unchanged);
        stats.record(SyncOutcome::Updated);
        stats.record(SyncOutcome::Created);
        stats.record(SyncOutcome::Failed);
        stats.record(SyncOutcome::Skipped);

        let result = stats.to_stats();
        assert_eq!(result.unchanged, 1);
        assert_eq!(result.updated, 1);
        assert_eq!(result.created, 1);
        assert_eq!(result.failed, 1);
        assert_eq!(result.skipped, 1);
    }

    #[test]
    fn test_atomic_sync_stats_multiple_records() {
        let stats = AtomicSyncStats::new();
        for _ in 0..10 {
            stats.record(SyncOutcome::Unchanged);
        }
        for _ in 0..5 {
            stats.record(SyncOutcome::Updated);
        }

        let result = stats.to_stats();
        assert_eq!(result.unchanged, 10);
        assert_eq!(result.updated, 5);
        assert_eq!(result.total(), 15);
        assert_eq!(result.successful(), 15);
    }

    #[test]
    fn test_atomic_sync_stats_default() {
        let stats = AtomicSyncStats::default();
        let result = stats.to_stats();
        assert_eq!(result.total(), 0);
    }

    // =========================================================================
    // SyncStatus tests
    // =========================================================================

    #[test]
    fn test_sync_status_completed() {
        let status = SyncStatus::Completed;
        assert_eq!(status.as_str(), "completed");
        assert!(status.is_completed());
        assert!(!status.is_cancelled());
    }

    #[test]
    fn test_sync_status_cancelled() {
        let status = SyncStatus::Cancelled;
        assert_eq!(status.as_str(), "cancelled");
        assert!(!status.is_completed());
        assert!(status.is_cancelled());
    }

    // =========================================================================
    // SyncResult tests
    // =========================================================================

    #[test]
    fn test_sync_result_completed() {
        let stats = SyncStats {
            unchanged: 10,
            updated: 5,
            created: 3,
            failed: 0,
            skipped: 0,
        };
        let result = SyncResult::completed(stats);
        assert!(result.is_completed());
        assert!(!result.is_cancelled());
        assert!(result.message.is_none());
        assert_eq!(result.stats.total(), 18);
    }

    #[test]
    fn test_sync_result_cancelled() {
        let stats = SyncStats {
            unchanged: 5,
            updated: 2,
            created: 1,
            failed: 0,
            skipped: 0,
        };
        let result = SyncResult::cancelled(stats);
        assert!(!result.is_completed());
        assert!(result.is_cancelled());
        assert!(result.message.is_some());
        assert!(result.message.unwrap().contains("cancelled"));
        assert_eq!(result.stats.total(), 8);
    }
}