1use std::sync::atomic::{AtomicUsize, Ordering};
7
/// Per-dataset result of a single sync operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncOutcome {
    /// Content matched what is already stored; nothing was done.
    Unchanged,
    /// An existing record was reprocessed with new content.
    Updated,
    /// A brand-new record was created.
    Created,
    /// Processing this dataset errored.
    Failed,
    /// The dataset was deliberately not processed.
    Skipped,
}
22
/// Counters for each [`SyncOutcome`], accumulated over one sync run.
///
/// Derives serde traits so the counters can be persisted or transmitted
/// alongside job results.
#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
pub struct SyncStats {
    /// Datasets found identical to the stored version.
    pub unchanged: usize,
    /// Datasets reprocessed because a change was detected.
    pub updated: usize,
    /// Datasets seen for the first time.
    pub created: usize,
    /// Datasets whose processing errored.
    pub failed: usize,
    /// Datasets intentionally not processed.
    /// `#[serde(default)]` keeps deserialization compatible with payloads
    /// serialized before this field existed (a missing value reads as 0).
    #[serde(default)]
    pub skipped: usize,
}
34
35impl SyncStats {
36 pub fn new() -> Self {
38 Self::default()
39 }
40
41 pub fn record(&mut self, outcome: SyncOutcome) {
43 match outcome {
44 SyncOutcome::Unchanged => self.unchanged += 1,
45 SyncOutcome::Updated => self.updated += 1,
46 SyncOutcome::Created => self.created += 1,
47 SyncOutcome::Failed => self.failed += 1,
48 SyncOutcome::Skipped => self.skipped += 1,
49 }
50 }
51
52 pub fn total(&self) -> usize {
54 self.unchanged + self.updated + self.created + self.failed + self.skipped
55 }
56
57 pub fn successful(&self) -> usize {
59 self.unchanged + self.updated + self.created
60 }
61}
62
/// Terminal state of a sync run.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncStatus {
    /// The run finished normally.
    Completed,
    /// The run was interrupted before finishing.
    Cancelled,
}

impl SyncStatus {
    /// Stable lowercase label, suitable for logs or persistence.
    pub fn as_str(&self) -> &'static str {
        match *self {
            SyncStatus::Completed => "completed",
            SyncStatus::Cancelled => "cancelled",
        }
    }

    /// Whether the run finished normally.
    pub fn is_completed(&self) -> bool {
        *self == SyncStatus::Completed
    }

    /// Whether the run was interrupted.
    pub fn is_cancelled(&self) -> bool {
        *self == SyncStatus::Cancelled
    }
}
95
/// Outcome of an entire sync run: terminal status, counters, and an
/// optional human-readable message.
#[derive(Debug, Clone)]
pub struct SyncResult {
    /// Whether the run completed or was cancelled.
    pub status: SyncStatus,
    /// Per-outcome counters accumulated during the run.
    pub stats: SyncStats,
    /// Optional note for the caller; set by the `cancelled` constructor.
    pub message: Option<String>,
}
106
107impl SyncResult {
108 pub fn completed(stats: SyncStats) -> Self {
110 Self {
111 status: SyncStatus::Completed,
112 stats,
113 message: None,
114 }
115 }
116
117 pub fn cancelled(stats: SyncStats) -> Self {
119 Self {
120 status: SyncStatus::Cancelled,
121 stats,
122 message: Some("Operation cancelled - partial progress saved".to_string()),
123 }
124 }
125
126 pub fn is_completed(&self) -> bool {
128 self.status.is_completed()
129 }
130
131 pub fn is_cancelled(&self) -> bool {
133 self.status.is_cancelled()
134 }
135}
136
/// Thread-safe counterpart of [`SyncStats`]: counters can be incremented
/// through a shared `&self` from multiple threads, then snapshotted into a
/// plain `SyncStats` via `to_stats`.
pub struct AtomicSyncStats {
    // One atomic counter per SyncOutcome variant.
    unchanged: AtomicUsize,
    updated: AtomicUsize,
    created: AtomicUsize,
    failed: AtomicUsize,
    skipped: AtomicUsize,
}
164
165impl AtomicSyncStats {
166 pub fn new() -> Self {
168 Self {
169 unchanged: AtomicUsize::new(0),
170 updated: AtomicUsize::new(0),
171 created: AtomicUsize::new(0),
172 failed: AtomicUsize::new(0),
173 skipped: AtomicUsize::new(0),
174 }
175 }
176
177 pub fn record(&self, outcome: SyncOutcome) {
179 match outcome {
180 SyncOutcome::Unchanged => self.unchanged.fetch_add(1, Ordering::Relaxed),
181 SyncOutcome::Updated => self.updated.fetch_add(1, Ordering::Relaxed),
182 SyncOutcome::Created => self.created.fetch_add(1, Ordering::Relaxed),
183 SyncOutcome::Failed => self.failed.fetch_add(1, Ordering::Relaxed),
184 SyncOutcome::Skipped => self.skipped.fetch_add(1, Ordering::Relaxed),
185 };
186 }
187
188 pub fn to_stats(&self) -> SyncStats {
190 SyncStats {
191 unchanged: self.unchanged.load(Ordering::Relaxed),
192 updated: self.updated.load(Ordering::Relaxed),
193 created: self.created.load(Ordering::Relaxed),
194 failed: self.failed.load(Ordering::Relaxed),
195 skipped: self.skipped.load(Ordering::Relaxed),
196 }
197 }
198}
199
200impl Default for AtomicSyncStats {
201 fn default() -> Self {
202 Self::new()
203 }
204}
205
/// Verdict from a [`DeltaDetector`]: what to do with a dataset and why.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReprocessingDecision {
    /// The action chosen for the dataset.
    pub outcome: SyncOutcome,
    /// Static, human-readable justification. Note that `is_legacy` compares
    /// this string exactly, so the literal values carry meaning.
    pub reason: &'static str,
}
214
215impl ReprocessingDecision {
216 pub fn is_legacy(&self) -> bool {
218 self.reason == "legacy record without hash"
219 }
220}
221
/// Strategy for deciding whether a dataset must be (re)processed, based on
/// a stored content hash versus the hash of freshly fetched content.
pub trait DeltaDetector: Send + Sync + Clone {
    /// Classifies one dataset.
    ///
    /// `existing_hash` is doubly optional:
    /// - `None`          — no record exists at all (new dataset);
    /// - `Some(None)`    — a record exists but was stored without a hash
    ///   (legacy record);
    /// - `Some(Some(h))` — a record exists with stored hash `h` to compare
    ///   against `new_hash`.
    fn needs_reprocessing(
        &self,
        existing_hash: Option<&Option<String>>,
        new_hash: &str,
    ) -> ReprocessingDecision;
}
245
/// Default [`DeltaDetector`]: reprocesses only when the stored content hash
/// is absent or differs from the incoming one.
#[derive(Debug, Clone, Default)]
pub struct ContentHashDetector;
252
253impl DeltaDetector for ContentHashDetector {
254 fn needs_reprocessing(
255 &self,
256 existing_hash: Option<&Option<String>>,
257 new_hash: &str,
258 ) -> ReprocessingDecision {
259 match existing_hash {
260 Some(Some(hash)) if hash == new_hash => {
261 ReprocessingDecision {
263 outcome: SyncOutcome::Unchanged,
264 reason: "content hash matches",
265 }
266 }
267 Some(Some(_)) => {
268 ReprocessingDecision {
270 outcome: SyncOutcome::Updated,
271 reason: "content hash changed",
272 }
273 }
274 Some(None) => {
275 ReprocessingDecision {
277 outcome: SyncOutcome::Updated,
278 reason: "legacy record without hash",
279 }
280 }
281 None => {
282 ReprocessingDecision {
284 outcome: SyncOutcome::Created,
285 reason: "new dataset",
286 }
287 }
288 }
289 }
290}
291
/// [`DeltaDetector`] that ignores hashes entirely: every existing record is
/// reprocessed; only a missing record counts as a creation.
#[derive(Debug, Clone, Default)]
pub struct AlwaysReprocessDetector;
298
299impl DeltaDetector for AlwaysReprocessDetector {
300 fn needs_reprocessing(
301 &self,
302 existing_hash: Option<&Option<String>>,
303 _new_hash: &str,
304 ) -> ReprocessingDecision {
305 match existing_hash {
306 Some(_) => ReprocessingDecision {
307 outcome: SyncOutcome::Updated,
308 reason: "always reprocess strategy",
309 },
310 None => ReprocessingDecision {
311 outcome: SyncOutcome::Created,
312 reason: "new dataset",
313 },
314 }
315 }
316}
317
318pub fn needs_reprocessing(
330 existing_hash: Option<&Option<String>>,
331 new_hash: &str,
332) -> ReprocessingDecision {
333 ContentHashDetector.needs_reprocessing(existing_hash, new_hash)
334}
335
/// Result of harvesting a single portal within a batch run.
#[derive(Debug, Clone)]
pub struct PortalHarvestResult {
    /// Display name of the portal.
    pub portal_name: String,
    /// URL the portal was harvested from.
    pub portal_url: String,
    /// Per-dataset counters for this portal (zeroed when the harvest failed).
    pub stats: SyncStats,
    /// Error description when the harvest failed; `None` on success.
    pub error: Option<String>,
}
352
353impl PortalHarvestResult {
354 pub fn success(name: String, url: String, stats: SyncStats) -> Self {
356 Self {
357 portal_name: name,
358 portal_url: url,
359 stats,
360 error: None,
361 }
362 }
363
364 pub fn failure(name: String, url: String, error: String) -> Self {
366 Self {
367 portal_name: name,
368 portal_url: url,
369 stats: SyncStats::default(),
370 error: Some(error),
371 }
372 }
373
374 pub fn is_success(&self) -> bool {
376 self.error.is_none()
377 }
378}
379
/// Accumulates per-portal results over a batch harvest run.
#[derive(Debug, Clone, Default)]
pub struct BatchHarvestSummary {
    /// One entry per attempted portal, in insertion order.
    pub results: Vec<PortalHarvestResult>,
}
386
387impl BatchHarvestSummary {
388 pub fn new() -> Self {
390 Self::default()
391 }
392
393 pub fn add(&mut self, result: PortalHarvestResult) {
395 self.results.push(result);
396 }
397
398 pub fn successful_count(&self) -> usize {
400 self.results.iter().filter(|r| r.is_success()).count()
401 }
402
403 pub fn failed_count(&self) -> usize {
405 self.results.iter().filter(|r| !r.is_success()).count()
406 }
407
408 pub fn total_datasets(&self) -> usize {
410 self.results.iter().map(|r| r.stats.total()).sum()
411 }
412
413 pub fn total_portals(&self) -> usize {
415 self.results.len()
416 }
417}
418
/// Unit tests covering the stats counters, delta-detection strategies,
/// harvest result/summary types, and the sync status/result wrappers.
#[cfg(test)]
mod tests {
    use super::*;

    // ---- SyncStats ----

    #[test]
    fn test_sync_stats_default() {
        let stats = SyncStats::new();
        assert_eq!(stats.unchanged, 0);
        assert_eq!(stats.updated, 0);
        assert_eq!(stats.created, 0);
        assert_eq!(stats.failed, 0);
        assert_eq!(stats.skipped, 0);
    }

    #[test]
    fn test_sync_stats_record() {
        let mut stats = SyncStats::new();
        stats.record(SyncOutcome::Unchanged);
        stats.record(SyncOutcome::Updated);
        stats.record(SyncOutcome::Created);
        stats.record(SyncOutcome::Failed);
        stats.record(SyncOutcome::Skipped);

        assert_eq!(stats.unchanged, 1);
        assert_eq!(stats.updated, 1);
        assert_eq!(stats.created, 1);
        assert_eq!(stats.failed, 1);
        assert_eq!(stats.skipped, 1);
    }

    #[test]
    fn test_sync_stats_total() {
        let mut stats = SyncStats::new();
        stats.unchanged = 10;
        stats.updated = 5;
        stats.created = 3;
        stats.failed = 2;
        stats.skipped = 1;

        assert_eq!(stats.total(), 21);
    }

    #[test]
    fn test_sync_stats_successful() {
        let mut stats = SyncStats::new();
        stats.unchanged = 10;
        stats.updated = 5;
        stats.created = 3;
        stats.failed = 2;
        stats.skipped = 1;

        // successful() excludes failed and skipped: 10 + 5 + 3.
        assert_eq!(stats.successful(), 18);
    }

    // ---- needs_reprocessing (free function, default detector) ----

    #[test]
    fn test_needs_reprocessing_unchanged() {
        let hash = "abc123".to_string();
        let existing = Some(Some(hash.clone()));
        let decision = needs_reprocessing(existing.as_ref(), &hash);

        assert_eq!(decision.outcome, SyncOutcome::Unchanged);
        assert_eq!(decision.reason, "content hash matches");
    }

    #[test]
    fn test_needs_reprocessing_updated() {
        let old_hash = "abc123".to_string();
        let new_hash = "def456";
        let existing = Some(Some(old_hash));
        let decision = needs_reprocessing(existing.as_ref(), new_hash);

        assert_eq!(decision.outcome, SyncOutcome::Updated);
        assert_eq!(decision.reason, "content hash changed");
    }

    #[test]
    fn test_needs_reprocessing_legacy() {
        // Some(None): a record exists but has no stored hash.
        let existing: Option<Option<String>> = Some(None);
        let decision = needs_reprocessing(existing.as_ref(), "new_hash");

        assert_eq!(decision.outcome, SyncOutcome::Updated);
        assert_eq!(decision.reason, "legacy record without hash");
    }

    #[test]
    fn test_needs_reprocessing_new() {
        let decision = needs_reprocessing(None, "new_hash");

        assert_eq!(decision.outcome, SyncOutcome::Created);
        assert_eq!(decision.reason, "new dataset");
    }

    #[test]
    fn test_is_legacy_true() {
        let existing: Option<Option<String>> = Some(None);
        let decision = needs_reprocessing(existing.as_ref(), "new_hash");

        assert!(decision.is_legacy());
    }

    #[test]
    fn test_is_legacy_false() {
        let decision = needs_reprocessing(None, "new_hash");
        assert!(!decision.is_legacy());

        let hash = "abc123".to_string();
        let existing = Some(Some(hash.clone()));
        let decision = needs_reprocessing(existing.as_ref(), &hash);
        assert!(!decision.is_legacy());
    }

    // ---- PortalHarvestResult ----

    #[test]
    fn test_portal_harvest_result_success() {
        let stats = SyncStats {
            unchanged: 5,
            updated: 3,
            created: 2,
            failed: 0,
            skipped: 0,
        };
        let result = PortalHarvestResult::success(
            "test".to_string(),
            "https://example.com".to_string(),
            stats,
        );
        assert!(result.is_success());
        assert!(result.error.is_none());
        assert_eq!(result.stats.total(), 10);
        assert_eq!(result.portal_name, "test");
        assert_eq!(result.portal_url, "https://example.com");
    }

    #[test]
    fn test_portal_harvest_result_failure() {
        let result = PortalHarvestResult::failure(
            "test".to_string(),
            "https://example.com".to_string(),
            "Connection timeout".to_string(),
        );
        assert!(!result.is_success());
        assert_eq!(result.error, Some("Connection timeout".to_string()));
        // Failure results carry zeroed stats.
        assert_eq!(result.stats.total(), 0);
    }

    // ---- BatchHarvestSummary ----

    #[test]
    fn test_batch_harvest_summary_empty() {
        let summary = BatchHarvestSummary::new();
        assert_eq!(summary.successful_count(), 0);
        assert_eq!(summary.failed_count(), 0);
        assert_eq!(summary.total_datasets(), 0);
        assert_eq!(summary.total_portals(), 0);
    }

    #[test]
    fn test_batch_harvest_summary_mixed_results() {
        let mut summary = BatchHarvestSummary::new();

        let stats1 = SyncStats {
            unchanged: 10,
            updated: 5,
            created: 3,
            failed: 2,
            skipped: 0,
        };
        summary.add(PortalHarvestResult::success(
            "a".into(),
            "https://a.com".into(),
            stats1,
        ));

        summary.add(PortalHarvestResult::failure(
            "b".into(),
            "https://b.com".into(),
            "error".into(),
        ));

        let stats2 = SyncStats {
            unchanged: 20,
            updated: 0,
            created: 0,
            failed: 0,
            skipped: 0,
        };
        summary.add(PortalHarvestResult::success(
            "c".into(),
            "https://c.com".into(),
            stats2,
        ));

        assert_eq!(summary.total_portals(), 3);
        assert_eq!(summary.successful_count(), 2);
        assert_eq!(summary.failed_count(), 1);
        // 20 (portal a) + 0 (failed portal b) + 20 (portal c).
        assert_eq!(summary.total_datasets(), 40);
    }

    #[test]
    fn test_batch_harvest_summary_all_successful() {
        let mut summary = BatchHarvestSummary::new();

        let stats = SyncStats {
            unchanged: 5,
            updated: 0,
            created: 5,
            failed: 0,
            skipped: 0,
        };
        summary.add(PortalHarvestResult::success(
            "portal1".into(),
            "https://portal1.com".into(),
            stats,
        ));

        assert_eq!(summary.successful_count(), 1);
        assert_eq!(summary.failed_count(), 0);
        assert_eq!(summary.total_datasets(), 10);
    }

    #[test]
    fn test_batch_harvest_summary_all_failed() {
        let mut summary = BatchHarvestSummary::new();

        summary.add(PortalHarvestResult::failure(
            "portal1".into(),
            "https://portal1.com".into(),
            "error1".into(),
        ));
        summary.add(PortalHarvestResult::failure(
            "portal2".into(),
            "https://portal2.com".into(),
            "error2".into(),
        ));

        assert_eq!(summary.successful_count(), 0);
        assert_eq!(summary.failed_count(), 2);
        assert_eq!(summary.total_datasets(), 0);
        assert_eq!(summary.total_portals(), 2);
    }

    // ---- AtomicSyncStats ----

    #[test]
    fn test_atomic_sync_stats_new() {
        let stats = AtomicSyncStats::new();
        let result = stats.to_stats();
        assert_eq!(result.unchanged, 0);
        assert_eq!(result.updated, 0);
        assert_eq!(result.created, 0);
        assert_eq!(result.failed, 0);
        assert_eq!(result.skipped, 0);
    }

    #[test]
    fn test_atomic_sync_stats_record() {
        let stats = AtomicSyncStats::new();
        stats.record(SyncOutcome::Unchanged);
        stats.record(SyncOutcome::Updated);
        stats.record(SyncOutcome::Created);
        stats.record(SyncOutcome::Failed);
        stats.record(SyncOutcome::Skipped);

        let result = stats.to_stats();
        assert_eq!(result.unchanged, 1);
        assert_eq!(result.updated, 1);
        assert_eq!(result.created, 1);
        assert_eq!(result.failed, 1);
        assert_eq!(result.skipped, 1);
    }

    #[test]
    fn test_atomic_sync_stats_multiple_records() {
        let stats = AtomicSyncStats::new();
        for _ in 0..10 {
            stats.record(SyncOutcome::Unchanged);
        }
        for _ in 0..5 {
            stats.record(SyncOutcome::Updated);
        }

        let result = stats.to_stats();
        assert_eq!(result.unchanged, 10);
        assert_eq!(result.updated, 5);
        assert_eq!(result.total(), 15);
        assert_eq!(result.successful(), 15);
    }

    #[test]
    fn test_atomic_sync_stats_default() {
        let stats = AtomicSyncStats::default();
        let result = stats.to_stats();
        assert_eq!(result.total(), 0);
    }

    // ---- SyncStatus ----

    #[test]
    fn test_sync_status_completed() {
        let status = SyncStatus::Completed;
        assert_eq!(status.as_str(), "completed");
        assert!(status.is_completed());
        assert!(!status.is_cancelled());
    }

    #[test]
    fn test_sync_status_cancelled() {
        let status = SyncStatus::Cancelled;
        assert_eq!(status.as_str(), "cancelled");
        assert!(!status.is_completed());
        assert!(status.is_cancelled());
    }

    // ---- SyncResult ----

    #[test]
    fn test_sync_result_completed() {
        let stats = SyncStats {
            unchanged: 10,
            updated: 5,
            created: 3,
            failed: 0,
            skipped: 0,
        };
        let result = SyncResult::completed(stats);
        assert!(result.is_completed());
        assert!(!result.is_cancelled());
        assert!(result.message.is_none());
        assert_eq!(result.stats.total(), 18);
    }

    #[test]
    fn test_sync_result_cancelled() {
        let stats = SyncStats {
            unchanged: 5,
            updated: 2,
            created: 1,
            failed: 0,
            skipped: 0,
        };
        let result = SyncResult::cancelled(stats);
        assert!(!result.is_completed());
        assert!(result.is_cancelled());
        assert!(result.message.is_some());
        assert!(result.message.unwrap().contains("cancelled"));
        assert_eq!(result.stats.total(), 8);
    }

    // ---- ContentHashDetector (called through the trait) ----

    #[test]
    fn test_content_hash_detector_unchanged() {
        let detector = ContentHashDetector;
        let hash = "abc123".to_string();
        let existing = Some(Some(hash.clone()));
        let decision = detector.needs_reprocessing(existing.as_ref(), &hash);

        assert_eq!(decision.outcome, SyncOutcome::Unchanged);
    }

    #[test]
    fn test_content_hash_detector_updated() {
        let detector = ContentHashDetector;
        let existing = Some(Some("old_hash".to_string()));
        let decision = detector.needs_reprocessing(existing.as_ref(), "new_hash");

        assert_eq!(decision.outcome, SyncOutcome::Updated);
    }

    #[test]
    fn test_content_hash_detector_new() {
        let detector = ContentHashDetector;
        let decision = detector.needs_reprocessing(None, "new_hash");

        assert_eq!(decision.outcome, SyncOutcome::Created);
    }

    // ---- AlwaysReprocessDetector ----

    #[test]
    fn test_always_reprocess_detector_existing() {
        let detector = AlwaysReprocessDetector;
        let hash = "abc123".to_string();
        let existing = Some(Some(hash.clone()));
        let decision = detector.needs_reprocessing(existing.as_ref(), &hash);

        // Even a matching hash is reprocessed under this strategy.
        assert_eq!(decision.outcome, SyncOutcome::Updated);
        assert_eq!(decision.reason, "always reprocess strategy");
    }

    #[test]
    fn test_always_reprocess_detector_new() {
        let detector = AlwaysReprocessDetector;
        let decision = detector.needs_reprocessing(None, "new_hash");

        assert_eq!(decision.outcome, SyncOutcome::Created);
        assert_eq!(decision.reason, "new dataset");
    }

    #[test]
    fn test_always_reprocess_detector_legacy() {
        let detector = AlwaysReprocessDetector;
        let existing: Option<Option<String>> = Some(None);
        let decision = detector.needs_reprocessing(existing.as_ref(), "new_hash");

        // Legacy records are treated like any other existing record.
        assert_eq!(decision.outcome, SyncOutcome::Updated);
        assert_eq!(decision.reason, "always reprocess strategy");
    }
}