1use std::sync::atomic::{AtomicUsize, Ordering};
7
/// Outcome of processing one dataset record during a sync run.
///
/// Each variant maps 1:1 onto a counter in [`SyncStats`] via
/// `SyncStats::record`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncOutcome {
    /// Existing record whose content matched; nothing was rewritten.
    Unchanged,
    /// Existing record whose content differed and was reprocessed.
    Updated,
    /// Record that did not exist before and was newly created.
    Created,
    /// Record whose processing failed.
    Failed,
    /// Record that was deliberately not processed.
    Skipped,
}
22
/// Per-outcome counters accumulated over a sync run.
///
/// Serializable so the counts can be persisted or reported; all counters
/// default to zero.
#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
pub struct SyncStats {
    pub unchanged: usize,
    pub updated: usize,
    pub created: usize,
    pub failed: usize,
    // `#[serde(default)]` lets older serialized payloads that predate this
    // field still deserialize (missing value becomes 0).
    #[serde(default)]
    pub skipped: usize,
}
34
35impl SyncStats {
36 pub fn new() -> Self {
38 Self::default()
39 }
40
41 pub fn record(&mut self, outcome: SyncOutcome) {
43 match outcome {
44 SyncOutcome::Unchanged => self.unchanged += 1,
45 SyncOutcome::Updated => self.updated += 1,
46 SyncOutcome::Created => self.created += 1,
47 SyncOutcome::Failed => self.failed += 1,
48 SyncOutcome::Skipped => self.skipped += 1,
49 }
50 }
51
52 pub fn total(&self) -> usize {
54 self.unchanged + self.updated + self.created + self.failed + self.skipped
55 }
56
57 pub fn successful(&self) -> usize {
59 self.unchanged + self.updated + self.created
60 }
61}
62
/// Terminal state of a sync run.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncStatus {
    /// The run finished normally.
    Completed,
    /// The run was stopped before finishing; partial progress may have
    /// been saved (see [`SyncResult::cancelled`]).
    Cancelled,
}
75
76impl SyncStatus {
77 pub fn as_str(&self) -> &'static str {
79 match self {
80 SyncStatus::Completed => "completed",
81 SyncStatus::Cancelled => "cancelled",
82 }
83 }
84
85 pub fn is_completed(&self) -> bool {
87 matches!(self, SyncStatus::Completed)
88 }
89
90 pub fn is_cancelled(&self) -> bool {
92 matches!(self, SyncStatus::Cancelled)
93 }
94}
95
/// Final report of a sync run: its status, the accumulated counters, and
/// an optional human-readable message.
#[derive(Debug, Clone)]
pub struct SyncResult {
    pub status: SyncStatus,
    pub stats: SyncStats,
    // `None` for completed runs; cancelled runs carry a fixed notice
    // (see `SyncResult::cancelled`).
    pub message: Option<String>,
}
106
107impl SyncResult {
108 pub fn completed(stats: SyncStats) -> Self {
110 Self {
111 status: SyncStatus::Completed,
112 stats,
113 message: None,
114 }
115 }
116
117 pub fn cancelled(stats: SyncStats) -> Self {
119 Self {
120 status: SyncStatus::Cancelled,
121 stats,
122 message: Some("Operation cancelled - partial progress saved".to_string()),
123 }
124 }
125
126 pub fn is_completed(&self) -> bool {
128 self.status.is_completed()
129 }
130
131 pub fn is_cancelled(&self) -> bool {
133 self.status.is_cancelled()
134 }
135}
136
/// Thread-safe counterpart of [`SyncStats`]: one atomic counter per
/// outcome, incremented through a shared reference so multiple workers can
/// record outcomes concurrently. Snapshot with `to_stats`.
pub struct AtomicSyncStats {
    unchanged: AtomicUsize,
    updated: AtomicUsize,
    created: AtomicUsize,
    failed: AtomicUsize,
    skipped: AtomicUsize,
}
164
165impl AtomicSyncStats {
166 pub fn new() -> Self {
168 Self {
169 unchanged: AtomicUsize::new(0),
170 updated: AtomicUsize::new(0),
171 created: AtomicUsize::new(0),
172 failed: AtomicUsize::new(0),
173 skipped: AtomicUsize::new(0),
174 }
175 }
176
177 pub fn record(&self, outcome: SyncOutcome) {
179 match outcome {
180 SyncOutcome::Unchanged => self.unchanged.fetch_add(1, Ordering::Relaxed),
181 SyncOutcome::Updated => self.updated.fetch_add(1, Ordering::Relaxed),
182 SyncOutcome::Created => self.created.fetch_add(1, Ordering::Relaxed),
183 SyncOutcome::Failed => self.failed.fetch_add(1, Ordering::Relaxed),
184 SyncOutcome::Skipped => self.skipped.fetch_add(1, Ordering::Relaxed),
185 };
186 }
187
188 pub fn to_stats(&self) -> SyncStats {
190 SyncStats {
191 unchanged: self.unchanged.load(Ordering::Relaxed),
192 updated: self.updated.load(Ordering::Relaxed),
193 created: self.created.load(Ordering::Relaxed),
194 failed: self.failed.load(Ordering::Relaxed),
195 skipped: self.skipped.load(Ordering::Relaxed),
196 }
197 }
198}
199
// `Default` simply delegates to `new` (all counters zero).
impl Default for AtomicSyncStats {
    fn default() -> Self {
        Self::new()
    }
}
205
/// Result of a delta check: whether the dataset must be re-embedded, which
/// [`SyncOutcome`] to record, and a static explanation string.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReprocessingDecision {
    pub needs_embedding: bool,
    pub outcome: SyncOutcome,
    // NOTE: `is_legacy` compares this against an exact literal, so
    // detectors must emit these reason strings verbatim.
    pub reason: &'static str,
}
216
impl ReprocessingDecision {
    /// True when the decision stems from a record stored before content
    /// hashes existed. Detected by exact match on the reason string emitted
    /// by [`ContentHashDetector`] for the `Some(None)` case.
    pub fn is_legacy(&self) -> bool {
        self.reason == "legacy record without hash"
    }
}
223
/// Strategy for deciding whether a dataset needs reprocessing/re-embedding.
///
/// `existing_hash` encodes three states (see [`ContentHashDetector`]):
/// - `None`             — no record exists for this dataset yet
/// - `Some(None)`       — a record exists but has no stored hash
/// - `Some(Some(hash))` — a record exists with a stored content hash
///
/// `new_hash` is the hash of the freshly fetched content.
pub trait DeltaDetector: Send + Sync + Clone {
    fn needs_reprocessing(
        &self,
        existing_hash: Option<&Option<String>>,
        new_hash: &str,
    ) -> ReprocessingDecision;
}
247
/// Default delta strategy: reprocess only when the stored content hash is
/// missing or differs from the new one.
#[derive(Debug, Clone, Default)]
pub struct ContentHashDetector;
254
255impl DeltaDetector for ContentHashDetector {
256 fn needs_reprocessing(
257 &self,
258 existing_hash: Option<&Option<String>>,
259 new_hash: &str,
260 ) -> ReprocessingDecision {
261 match existing_hash {
262 Some(Some(hash)) if hash == new_hash => {
263 ReprocessingDecision {
265 needs_embedding: false,
266 outcome: SyncOutcome::Unchanged,
267 reason: "content hash matches",
268 }
269 }
270 Some(Some(_)) => {
271 ReprocessingDecision {
273 needs_embedding: true,
274 outcome: SyncOutcome::Updated,
275 reason: "content hash changed",
276 }
277 }
278 Some(None) => {
279 ReprocessingDecision {
281 needs_embedding: true,
282 outcome: SyncOutcome::Updated,
283 reason: "legacy record without hash",
284 }
285 }
286 None => {
287 ReprocessingDecision {
289 needs_embedding: true,
290 outcome: SyncOutcome::Created,
291 reason: "new dataset",
292 }
293 }
294 }
295 }
296}
297
/// Delta strategy that ignores hashes and always requests reprocessing;
/// only distinguishes existing records (`Updated`) from new ones (`Created`).
#[derive(Debug, Clone, Default)]
pub struct AlwaysReprocessDetector;
304
305impl DeltaDetector for AlwaysReprocessDetector {
306 fn needs_reprocessing(
307 &self,
308 existing_hash: Option<&Option<String>>,
309 _new_hash: &str,
310 ) -> ReprocessingDecision {
311 match existing_hash {
312 Some(_) => ReprocessingDecision {
313 needs_embedding: true,
314 outcome: SyncOutcome::Updated,
315 reason: "always reprocess strategy",
316 },
317 None => ReprocessingDecision {
318 needs_embedding: true,
319 outcome: SyncOutcome::Created,
320 reason: "new dataset",
321 },
322 }
323 }
324}
325
/// Convenience wrapper that applies the default [`ContentHashDetector`]
/// strategy. See [`DeltaDetector::needs_reprocessing`] for the argument
/// semantics.
pub fn needs_reprocessing(
    existing_hash: Option<&Option<String>>,
    new_hash: &str,
) -> ReprocessingDecision {
    ContentHashDetector.needs_reprocessing(existing_hash, new_hash)
}
343
/// Outcome of harvesting a single portal: identifying info plus either
/// accumulated stats (success) or an error message (failure).
#[derive(Debug, Clone)]
pub struct PortalHarvestResult {
    pub portal_name: String,
    pub portal_url: String,
    // Zeroed when the harvest failed (see `failure`).
    pub stats: SyncStats,
    // `None` means success; `is_success` keys off this field.
    pub error: Option<String>,
}
360
361impl PortalHarvestResult {
362 pub fn success(name: String, url: String, stats: SyncStats) -> Self {
364 Self {
365 portal_name: name,
366 portal_url: url,
367 stats,
368 error: None,
369 }
370 }
371
372 pub fn failure(name: String, url: String, error: String) -> Self {
374 Self {
375 portal_name: name,
376 portal_url: url,
377 stats: SyncStats::default(),
378 error: Some(error),
379 }
380 }
381
382 pub fn is_success(&self) -> bool {
384 self.error.is_none()
385 }
386}
387
/// Aggregate view over the results of harvesting several portals.
#[derive(Debug, Clone, Default)]
pub struct BatchHarvestSummary {
    pub results: Vec<PortalHarvestResult>,
}
394
395impl BatchHarvestSummary {
396 pub fn new() -> Self {
398 Self::default()
399 }
400
401 pub fn add(&mut self, result: PortalHarvestResult) {
403 self.results.push(result);
404 }
405
406 pub fn successful_count(&self) -> usize {
408 self.results.iter().filter(|r| r.is_success()).count()
409 }
410
411 pub fn failed_count(&self) -> usize {
413 self.results.iter().filter(|r| !r.is_success()).count()
414 }
415
416 pub fn total_datasets(&self) -> usize {
418 self.results.iter().map(|r| r.stats.total()).sum()
419 }
420
421 pub fn total_portals(&self) -> usize {
423 self.results.len()
424 }
425}
426
#[cfg(test)]
mod tests {
    use super::*;

    // ---- SyncStats -----------------------------------------------------

    #[test]
    fn test_sync_stats_default() {
        let stats = SyncStats::new();
        assert_eq!(stats.unchanged, 0);
        assert_eq!(stats.updated, 0);
        assert_eq!(stats.created, 0);
        assert_eq!(stats.failed, 0);
        assert_eq!(stats.skipped, 0);
    }

    #[test]
    fn test_sync_stats_record() {
        let mut stats = SyncStats::new();
        stats.record(SyncOutcome::Unchanged);
        stats.record(SyncOutcome::Updated);
        stats.record(SyncOutcome::Created);
        stats.record(SyncOutcome::Failed);
        stats.record(SyncOutcome::Skipped);

        assert_eq!(stats.unchanged, 1);
        assert_eq!(stats.updated, 1);
        assert_eq!(stats.created, 1);
        assert_eq!(stats.failed, 1);
        assert_eq!(stats.skipped, 1);
    }

    #[test]
    fn test_sync_stats_total() {
        let mut stats = SyncStats::new();
        stats.unchanged = 10;
        stats.updated = 5;
        stats.created = 3;
        stats.failed = 2;
        stats.skipped = 1;

        assert_eq!(stats.total(), 21);
    }

    #[test]
    fn test_sync_stats_successful() {
        let mut stats = SyncStats::new();
        stats.unchanged = 10;
        stats.updated = 5;
        stats.created = 3;
        stats.failed = 2;
        stats.skipped = 1;

        // successful() excludes failed and skipped.
        assert_eq!(stats.successful(), 18);
    }

    // ---- needs_reprocessing (free function / ContentHashDetector) ------

    #[test]
    fn test_needs_reprocessing_unchanged() {
        let hash = "abc123".to_string();
        let existing = Some(Some(hash.clone()));
        let decision = needs_reprocessing(existing.as_ref(), &hash);

        assert!(!decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Unchanged);
        assert_eq!(decision.reason, "content hash matches");
    }

    #[test]
    fn test_needs_reprocessing_updated() {
        let old_hash = "abc123".to_string();
        let new_hash = "def456";
        let existing = Some(Some(old_hash));
        let decision = needs_reprocessing(existing.as_ref(), new_hash);

        assert!(decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Updated);
        assert_eq!(decision.reason, "content hash changed");
    }

    #[test]
    fn test_needs_reprocessing_legacy() {
        // Some(None): record exists but has no stored hash.
        let existing: Option<Option<String>> = Some(None);
        let decision = needs_reprocessing(existing.as_ref(), "new_hash");

        assert!(decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Updated);
        assert_eq!(decision.reason, "legacy record without hash");
    }

    #[test]
    fn test_needs_reprocessing_new() {
        let decision = needs_reprocessing(None, "new_hash");

        assert!(decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Created);
        assert_eq!(decision.reason, "new dataset");
    }

    #[test]
    fn test_is_legacy_true() {
        let existing: Option<Option<String>> = Some(None);
        let decision = needs_reprocessing(existing.as_ref(), "new_hash");

        assert!(decision.is_legacy());
    }

    #[test]
    fn test_is_legacy_false() {
        let decision = needs_reprocessing(None, "new_hash");
        assert!(!decision.is_legacy());

        let hash = "abc123".to_string();
        let existing = Some(Some(hash.clone()));
        let decision = needs_reprocessing(existing.as_ref(), &hash);
        assert!(!decision.is_legacy());
    }

    // ---- PortalHarvestResult -------------------------------------------

    #[test]
    fn test_portal_harvest_result_success() {
        let stats = SyncStats {
            unchanged: 5,
            updated: 3,
            created: 2,
            failed: 0,
            skipped: 0,
        };
        let result = PortalHarvestResult::success(
            "test".to_string(),
            "https://example.com".to_string(),
            stats,
        );
        assert!(result.is_success());
        assert!(result.error.is_none());
        assert_eq!(result.stats.total(), 10);
        assert_eq!(result.portal_name, "test");
        assert_eq!(result.portal_url, "https://example.com");
    }

    #[test]
    fn test_portal_harvest_result_failure() {
        let result = PortalHarvestResult::failure(
            "test".to_string(),
            "https://example.com".to_string(),
            "Connection timeout".to_string(),
        );
        assert!(!result.is_success());
        assert_eq!(result.error, Some("Connection timeout".to_string()));
        // Failed results carry zeroed stats.
        assert_eq!(result.stats.total(), 0);
    }

    // ---- BatchHarvestSummary -------------------------------------------

    #[test]
    fn test_batch_harvest_summary_empty() {
        let summary = BatchHarvestSummary::new();
        assert_eq!(summary.successful_count(), 0);
        assert_eq!(summary.failed_count(), 0);
        assert_eq!(summary.total_datasets(), 0);
        assert_eq!(summary.total_portals(), 0);
    }

    #[test]
    fn test_batch_harvest_summary_mixed_results() {
        let mut summary = BatchHarvestSummary::new();

        let stats1 = SyncStats {
            unchanged: 10,
            updated: 5,
            created: 3,
            failed: 2,
            skipped: 0,
        };
        summary.add(PortalHarvestResult::success(
            "a".into(),
            "https://a.com".into(),
            stats1,
        ));

        summary.add(PortalHarvestResult::failure(
            "b".into(),
            "https://b.com".into(),
            "error".into(),
        ));

        let stats2 = SyncStats {
            unchanged: 20,
            updated: 0,
            created: 0,
            failed: 0,
            skipped: 0,
        };
        summary.add(PortalHarvestResult::success(
            "c".into(),
            "https://c.com".into(),
            stats2,
        ));

        assert_eq!(summary.total_portals(), 3);
        assert_eq!(summary.successful_count(), 2);
        assert_eq!(summary.failed_count(), 1);
        // 20 (portal a) + 0 (failed portal b) + 20 (portal c).
        assert_eq!(summary.total_datasets(), 40);
    }

    #[test]
    fn test_batch_harvest_summary_all_successful() {
        let mut summary = BatchHarvestSummary::new();

        let stats = SyncStats {
            unchanged: 5,
            updated: 0,
            created: 5,
            failed: 0,
            skipped: 0,
        };
        summary.add(PortalHarvestResult::success(
            "portal1".into(),
            "https://portal1.com".into(),
            stats,
        ));

        assert_eq!(summary.successful_count(), 1);
        assert_eq!(summary.failed_count(), 0);
        assert_eq!(summary.total_datasets(), 10);
    }

    #[test]
    fn test_batch_harvest_summary_all_failed() {
        let mut summary = BatchHarvestSummary::new();

        summary.add(PortalHarvestResult::failure(
            "portal1".into(),
            "https://portal1.com".into(),
            "error1".into(),
        ));
        summary.add(PortalHarvestResult::failure(
            "portal2".into(),
            "https://portal2.com".into(),
            "error2".into(),
        ));

        assert_eq!(summary.successful_count(), 0);
        assert_eq!(summary.failed_count(), 2);
        assert_eq!(summary.total_datasets(), 0);
        assert_eq!(summary.total_portals(), 2);
    }

    // ---- AtomicSyncStats -----------------------------------------------

    #[test]
    fn test_atomic_sync_stats_new() {
        let stats = AtomicSyncStats::new();
        let result = stats.to_stats();
        assert_eq!(result.unchanged, 0);
        assert_eq!(result.updated, 0);
        assert_eq!(result.created, 0);
        assert_eq!(result.failed, 0);
        assert_eq!(result.skipped, 0);
    }

    #[test]
    fn test_atomic_sync_stats_record() {
        let stats = AtomicSyncStats::new();
        stats.record(SyncOutcome::Unchanged);
        stats.record(SyncOutcome::Updated);
        stats.record(SyncOutcome::Created);
        stats.record(SyncOutcome::Failed);
        stats.record(SyncOutcome::Skipped);

        let result = stats.to_stats();
        assert_eq!(result.unchanged, 1);
        assert_eq!(result.updated, 1);
        assert_eq!(result.created, 1);
        assert_eq!(result.failed, 1);
        assert_eq!(result.skipped, 1);
    }

    #[test]
    fn test_atomic_sync_stats_multiple_records() {
        let stats = AtomicSyncStats::new();
        for _ in 0..10 {
            stats.record(SyncOutcome::Unchanged);
        }
        for _ in 0..5 {
            stats.record(SyncOutcome::Updated);
        }

        let result = stats.to_stats();
        assert_eq!(result.unchanged, 10);
        assert_eq!(result.updated, 5);
        assert_eq!(result.total(), 15);
        assert_eq!(result.successful(), 15);
    }

    #[test]
    fn test_atomic_sync_stats_default() {
        let stats = AtomicSyncStats::default();
        let result = stats.to_stats();
        assert_eq!(result.total(), 0);
    }

    // ---- SyncStatus ----------------------------------------------------

    #[test]
    fn test_sync_status_completed() {
        let status = SyncStatus::Completed;
        assert_eq!(status.as_str(), "completed");
        assert!(status.is_completed());
        assert!(!status.is_cancelled());
    }

    #[test]
    fn test_sync_status_cancelled() {
        let status = SyncStatus::Cancelled;
        assert_eq!(status.as_str(), "cancelled");
        assert!(!status.is_completed());
        assert!(status.is_cancelled());
    }

    // ---- SyncResult ----------------------------------------------------

    #[test]
    fn test_sync_result_completed() {
        let stats = SyncStats {
            unchanged: 10,
            updated: 5,
            created: 3,
            failed: 0,
            skipped: 0,
        };
        let result = SyncResult::completed(stats);
        assert!(result.is_completed());
        assert!(!result.is_cancelled());
        assert!(result.message.is_none());
        assert_eq!(result.stats.total(), 18);
    }

    #[test]
    fn test_sync_result_cancelled() {
        let stats = SyncStats {
            unchanged: 5,
            updated: 2,
            created: 1,
            failed: 0,
            skipped: 0,
        };
        let result = SyncResult::cancelled(stats);
        assert!(!result.is_completed());
        assert!(result.is_cancelled());
        assert!(result.message.is_some());
        assert!(result.message.unwrap().contains("cancelled"));
        assert_eq!(result.stats.total(), 8);
    }

    // ---- ContentHashDetector (trait impl directly) ---------------------

    #[test]
    fn test_content_hash_detector_unchanged() {
        let detector = ContentHashDetector;
        let hash = "abc123".to_string();
        let existing = Some(Some(hash.clone()));
        let decision = detector.needs_reprocessing(existing.as_ref(), &hash);

        assert!(!decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Unchanged);
    }

    #[test]
    fn test_content_hash_detector_updated() {
        let detector = ContentHashDetector;
        let existing = Some(Some("old_hash".to_string()));
        let decision = detector.needs_reprocessing(existing.as_ref(), "new_hash");

        assert!(decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Updated);
    }

    #[test]
    fn test_content_hash_detector_new() {
        let detector = ContentHashDetector;
        let decision = detector.needs_reprocessing(None, "new_hash");

        assert!(decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Created);
    }

    // ---- AlwaysReprocessDetector ---------------------------------------

    #[test]
    fn test_always_reprocess_detector_existing() {
        let detector = AlwaysReprocessDetector;
        let hash = "abc123".to_string();
        let existing = Some(Some(hash.clone()));
        let decision = detector.needs_reprocessing(existing.as_ref(), &hash);

        // Even a matching hash triggers reprocessing with this strategy.
        assert!(decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Updated);
        assert_eq!(decision.reason, "always reprocess strategy");
    }

    #[test]
    fn test_always_reprocess_detector_new() {
        let detector = AlwaysReprocessDetector;
        let decision = detector.needs_reprocessing(None, "new_hash");

        assert!(decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Created);
        assert_eq!(decision.reason, "new dataset");
    }

    #[test]
    fn test_always_reprocess_detector_legacy() {
        let detector = AlwaysReprocessDetector;
        // Some(None) is just "record exists" here, so it maps to Updated
        // with the always-reprocess reason (not the legacy reason).
        let existing: Option<Option<String>> = Some(None);
        let decision = detector.needs_reprocessing(existing.as_ref(), "new_hash");

        assert!(decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Updated);
        assert_eq!(decision.reason, "always reprocess strategy");
    }
}