// ceres_core/sync.rs
1//! Sync service layer for portal synchronization logic.
2//!
3//! This module provides pure business logic for delta detection and sync statistics,
4//! decoupled from I/O operations and CLI orchestration.
5
6use std::sync::atomic::{AtomicUsize, Ordering};
7
/// Outcome of processing a single dataset during sync.
///
/// Recorded into [`SyncStats`] via [`SyncStats::record`]. `Unchanged`,
/// `Updated`, and `Created` count toward [`SyncStats::successful`];
/// `Failed` and `Skipped` do not.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncOutcome {
    /// Dataset content hash matches existing - no changes needed
    Unchanged,
    /// Dataset content changed - embedding regenerated
    Updated,
    /// New dataset - first time seeing this dataset
    Created,
    /// Processing failed for this dataset
    Failed,
    /// Dataset skipped due to circuit breaker being open
    Skipped,
}
22
/// Statistics for a portal sync operation.
///
/// Counters are incremented one outcome at a time via [`SyncStats::record`];
/// aggregate views are provided by [`SyncStats::total`] and
/// [`SyncStats::successful`].
#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
pub struct SyncStats {
    /// Datasets whose content hash matched the stored hash (no work needed).
    pub unchanged: usize,
    /// Datasets whose content changed and were reprocessed.
    pub updated: usize,
    /// Datasets seen for the first time.
    pub created: usize,
    /// Datasets whose processing failed.
    pub failed: usize,
    /// Number of datasets skipped due to circuit breaker being open.
    // `#[serde(default)]` lets deserialization succeed when the field is
    // absent — presumably for payloads written before `skipped` existed.
    #[serde(default)]
    pub skipped: usize,
}
34
35impl SyncStats {
36    /// Creates a new empty stats tracker.
37    pub fn new() -> Self {
38        Self::default()
39    }
40
41    /// Records an outcome, incrementing the appropriate counter.
42    pub fn record(&mut self, outcome: SyncOutcome) {
43        match outcome {
44            SyncOutcome::Unchanged => self.unchanged += 1,
45            SyncOutcome::Updated => self.updated += 1,
46            SyncOutcome::Created => self.created += 1,
47            SyncOutcome::Failed => self.failed += 1,
48            SyncOutcome::Skipped => self.skipped += 1,
49        }
50    }
51
52    /// Returns the total number of processed datasets.
53    pub fn total(&self) -> usize {
54        self.unchanged + self.updated + self.created + self.failed + self.skipped
55    }
56
57    /// Returns the number of successfully processed datasets.
58    pub fn successful(&self) -> usize {
59        self.unchanged + self.updated + self.created
60    }
61}
62
63// =============================================================================
64// Sync Status and Result Types (for cancellation support)
65// =============================================================================
66
/// Overall status of a sync operation.
///
/// Persisted as a string via [`SyncStatus::as_str`] ("completed" /
/// "cancelled").
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncStatus {
    /// Sync completed successfully with all datasets processed.
    Completed,
    /// Sync was cancelled but partial progress was saved.
    Cancelled,
}
75
76impl SyncStatus {
77    /// Returns the string representation for database storage.
78    pub fn as_str(&self) -> &'static str {
79        match self {
80            SyncStatus::Completed => "completed",
81            SyncStatus::Cancelled => "cancelled",
82        }
83    }
84
85    /// Returns true if the sync was completed successfully.
86    pub fn is_completed(&self) -> bool {
87        matches!(self, SyncStatus::Completed)
88    }
89
90    /// Returns true if the sync was cancelled.
91    pub fn is_cancelled(&self) -> bool {
92        matches!(self, SyncStatus::Cancelled)
93    }
94}
95
/// Result of a sync operation including status and statistics.
///
/// Constructed via [`SyncResult::completed`] or [`SyncResult::cancelled`].
#[derive(Debug, Clone)]
pub struct SyncResult {
    /// The final status of the sync operation.
    pub status: SyncStatus,
    /// Statistics about processed datasets.
    pub stats: SyncStats,
    /// Optional message providing context (e.g., cancellation reason).
    pub message: Option<String>,
}
106
107impl SyncResult {
108    /// Creates a completed sync result.
109    pub fn completed(stats: SyncStats) -> Self {
110        Self {
111            status: SyncStatus::Completed,
112            stats,
113            message: None,
114        }
115    }
116
117    /// Creates a cancelled sync result with partial statistics.
118    pub fn cancelled(stats: SyncStats) -> Self {
119        Self {
120            status: SyncStatus::Cancelled,
121            stats,
122            message: Some("Operation cancelled - partial progress saved".to_string()),
123        }
124    }
125
126    /// Returns true if the sync was fully successful.
127    pub fn is_completed(&self) -> bool {
128        self.status.is_completed()
129    }
130
131    /// Returns true if the sync was cancelled.
132    pub fn is_cancelled(&self) -> bool {
133        self.status.is_cancelled()
134    }
135}
136
/// Thread-safe wrapper for [`SyncStats`] using atomic counters.
///
/// This is useful for concurrent harvesting where multiple tasks
/// update statistics simultaneously without requiring a mutex.
///
/// # Example
///
/// ```
/// use ceres_core::sync::{AtomicSyncStats, SyncOutcome};
///
/// let stats = AtomicSyncStats::new();
///
/// // Can be safely called from multiple threads
/// stats.record(SyncOutcome::Created);
/// stats.record(SyncOutcome::Unchanged);
///
/// let snapshot = stats.to_stats();
/// assert_eq!(snapshot.created, 1);
/// assert_eq!(snapshot.unchanged, 1);
/// ```
pub struct AtomicSyncStats {
    // One atomic counter per SyncStats field; converted to a plain
    // snapshot by `to_stats`.
    unchanged: AtomicUsize,
    updated: AtomicUsize,
    created: AtomicUsize,
    failed: AtomicUsize,
    skipped: AtomicUsize,
}
164
165impl AtomicSyncStats {
166    /// Creates a new zeroed stats tracker.
167    pub fn new() -> Self {
168        Self {
169            unchanged: AtomicUsize::new(0),
170            updated: AtomicUsize::new(0),
171            created: AtomicUsize::new(0),
172            failed: AtomicUsize::new(0),
173            skipped: AtomicUsize::new(0),
174        }
175    }
176
177    /// Records an outcome, incrementing the appropriate counter atomically.
178    pub fn record(&self, outcome: SyncOutcome) {
179        match outcome {
180            SyncOutcome::Unchanged => self.unchanged.fetch_add(1, Ordering::Relaxed),
181            SyncOutcome::Updated => self.updated.fetch_add(1, Ordering::Relaxed),
182            SyncOutcome::Created => self.created.fetch_add(1, Ordering::Relaxed),
183            SyncOutcome::Failed => self.failed.fetch_add(1, Ordering::Relaxed),
184            SyncOutcome::Skipped => self.skipped.fetch_add(1, Ordering::Relaxed),
185        };
186    }
187
188    /// Converts atomic counters to a snapshot [`SyncStats`].
189    pub fn to_stats(&self) -> SyncStats {
190        SyncStats {
191            unchanged: self.unchanged.load(Ordering::Relaxed),
192            updated: self.updated.load(Ordering::Relaxed),
193            created: self.created.load(Ordering::Relaxed),
194            failed: self.failed.load(Ordering::Relaxed),
195            skipped: self.skipped.load(Ordering::Relaxed),
196        }
197    }
198}
199
200impl Default for AtomicSyncStats {
201    fn default() -> Self {
202        Self::new()
203    }
204}
205
/// Result of delta detection for a dataset.
///
/// Produced by [`DeltaDetector::needs_reprocessing`]; pairs the outcome
/// classification with a static human-readable justification.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReprocessingDecision {
    /// The outcome classification for this dataset
    pub outcome: SyncOutcome,
    /// Human-readable reason for the decision
    // NOTE(review): `is_legacy` matches on this exact string, so detector
    // implementations must keep the wording stable.
    pub reason: &'static str,
}
214
215impl ReprocessingDecision {
216    /// Returns true if this is a legacy record update (existing record without hash).
217    pub fn is_legacy(&self) -> bool {
218        self.reason == "legacy record without hash"
219    }
220}
221
/// Trait for delta detection strategies.
///
/// Implementations determine whether a dataset needs reprocessing based on
/// its existing and new content hashes. This enables different strategies
/// such as content-hash comparison, always-reprocess, or timestamp-based detection.
///
/// The `Send + Sync + Clone` bounds let a detector be shared with or
/// duplicated into concurrent tasks (presumably the concurrent harvest
/// pipeline mentioned on [`AtomicSyncStats`] — confirm against callers).
pub trait DeltaDetector: Send + Sync + Clone {
    /// Determines if a dataset needs reprocessing.
    ///
    /// Implementations must only return `Created`, `Updated`, or `Unchanged` outcomes.
    /// `Failed` and `Skipped` are reserved for runtime errors in the harvest pipeline.
    ///
    /// # Arguments
    /// * `existing_hash` - The stored content hash lookup result:
    ///   - `None` — dataset not found in the store (new dataset)
    ///   - `Some(None)` — dataset exists but has no stored hash (legacy record)
    ///   - `Some(Some(hash))` — dataset exists with the given content hash
    /// * `new_hash` - The computed content hash from the portal data
    fn needs_reprocessing(
        &self,
        existing_hash: Option<&Option<String>>,
        new_hash: &str,
    ) -> ReprocessingDecision;
}
245
/// Default delta detector using content hash comparison.
///
/// Compares the stored content hash with the newly computed hash to determine
/// if a dataset's embedding needs to be regenerated. Stateless unit struct;
/// construct directly (`ContentHashDetector`) or via `Default`.
#[derive(Debug, Clone, Default)]
pub struct ContentHashDetector;
252
253impl DeltaDetector for ContentHashDetector {
254    fn needs_reprocessing(
255        &self,
256        existing_hash: Option<&Option<String>>,
257        new_hash: &str,
258    ) -> ReprocessingDecision {
259        match existing_hash {
260            Some(Some(hash)) if hash == new_hash => {
261                // Hash matches - content unchanged
262                ReprocessingDecision {
263                    outcome: SyncOutcome::Unchanged,
264                    reason: "content hash matches",
265                }
266            }
267            Some(Some(_)) => {
268                // Hash exists but differs - content updated
269                ReprocessingDecision {
270                    outcome: SyncOutcome::Updated,
271                    reason: "content hash changed",
272                }
273            }
274            Some(None) => {
275                // Exists but no hash (legacy data) - treat as update
276                ReprocessingDecision {
277                    outcome: SyncOutcome::Updated,
278                    reason: "legacy record without hash",
279                }
280            }
281            None => {
282                // Not in existing data - new dataset
283                ReprocessingDecision {
284                    outcome: SyncOutcome::Created,
285                    reason: "new dataset",
286                }
287            }
288        }
289    }
290}
291
/// Delta detector that always triggers reprocessing.
///
/// Useful for full rebuilds where all embeddings should be regenerated
/// regardless of content changes. Stateless unit struct; the computed
/// hash is ignored entirely.
#[derive(Debug, Clone, Default)]
pub struct AlwaysReprocessDetector;
298
299impl DeltaDetector for AlwaysReprocessDetector {
300    fn needs_reprocessing(
301        &self,
302        existing_hash: Option<&Option<String>>,
303        _new_hash: &str,
304    ) -> ReprocessingDecision {
305        match existing_hash {
306            Some(_) => ReprocessingDecision {
307                outcome: SyncOutcome::Updated,
308                reason: "always reprocess strategy",
309            },
310            None => ReprocessingDecision {
311                outcome: SyncOutcome::Created,
312                reason: "new dataset",
313            },
314        }
315    }
316}
317
318/// Determines if a dataset needs reprocessing based on content hash comparison.
319///
320/// This is a convenience wrapper around [`ContentHashDetector`].
321///
322/// # Arguments
323/// * `existing_hash` - The stored content hash for this dataset (None if new dataset)
324/// * `new_hash` - The computed content hash from the portal data
325///
326/// # Returns
327/// A `ReprocessingDecision` indicating whether embedding regeneration is needed
328/// and the classification of this sync operation.
329pub fn needs_reprocessing(
330    existing_hash: Option<&Option<String>>,
331    new_hash: &str,
332) -> ReprocessingDecision {
333    ContentHashDetector.needs_reprocessing(existing_hash, new_hash)
334}
335
336// =============================================================================
337// Batch Harvest Types
338// =============================================================================
339
/// Result of harvesting a single portal in batch mode.
///
/// Constructed via [`PortalHarvestResult::success`] or
/// [`PortalHarvestResult::failure`]; aggregated by [`BatchHarvestSummary`].
#[derive(Debug, Clone)]
pub struct PortalHarvestResult {
    /// Portal name identifier.
    pub portal_name: String,
    /// Portal URL.
    pub portal_url: String,
    /// Sync statistics for this portal.
    // The `failure` constructor leaves this at its zeroed default.
    pub stats: SyncStats,
    /// Error message if harvest failed, None if successful.
    pub error: Option<String>,
}
352
353impl PortalHarvestResult {
354    /// Creates a successful harvest result.
355    pub fn success(name: String, url: String, stats: SyncStats) -> Self {
356        Self {
357            portal_name: name,
358            portal_url: url,
359            stats,
360            error: None,
361        }
362    }
363
364    /// Creates a failed harvest result.
365    pub fn failure(name: String, url: String, error: String) -> Self {
366        Self {
367            portal_name: name,
368            portal_url: url,
369            stats: SyncStats::default(),
370            error: Some(error),
371        }
372    }
373
374    /// Returns true if the harvest was successful.
375    pub fn is_success(&self) -> bool {
376        self.error.is_none()
377    }
378}
379
/// Aggregated results from batch harvesting multiple portals.
///
/// Collects one [`PortalHarvestResult`] per portal via
/// [`BatchHarvestSummary::add`] and exposes count/total aggregates.
#[derive(Debug, Clone, Default)]
pub struct BatchHarvestSummary {
    /// Results for each portal.
    pub results: Vec<PortalHarvestResult>,
}
386
387impl BatchHarvestSummary {
388    /// Creates a new empty summary.
389    pub fn new() -> Self {
390        Self::default()
391    }
392
393    /// Adds a portal harvest result.
394    pub fn add(&mut self, result: PortalHarvestResult) {
395        self.results.push(result);
396    }
397
398    /// Returns the count of successful harvests.
399    pub fn successful_count(&self) -> usize {
400        self.results.iter().filter(|r| r.is_success()).count()
401    }
402
403    /// Returns the count of failed harvests.
404    pub fn failed_count(&self) -> usize {
405        self.results.iter().filter(|r| !r.is_success()).count()
406    }
407
408    /// Returns the total number of datasets across all successful portals.
409    pub fn total_datasets(&self) -> usize {
410        self.results.iter().map(|r| r.stats.total()).sum()
411    }
412
413    /// Returns the total number of portals processed.
414    pub fn total_portals(&self) -> usize {
415        self.results.len()
416    }
417}
418
#[cfg(test)]
mod tests {
    // Pure unit tests: no I/O, no concurrency — atomics are exercised
    // single-threaded, so they check counting, not thread-safety.
    use super::*;

    // =========================================================================
    // SyncStats tests
    // =========================================================================

    #[test]
    fn test_sync_stats_default() {
        let stats = SyncStats::new();
        assert_eq!(stats.unchanged, 0);
        assert_eq!(stats.updated, 0);
        assert_eq!(stats.created, 0);
        assert_eq!(stats.failed, 0);
        assert_eq!(stats.skipped, 0);
    }

    #[test]
    fn test_sync_stats_record() {
        let mut stats = SyncStats::new();
        stats.record(SyncOutcome::Unchanged);
        stats.record(SyncOutcome::Updated);
        stats.record(SyncOutcome::Created);
        stats.record(SyncOutcome::Failed);
        stats.record(SyncOutcome::Skipped);

        assert_eq!(stats.unchanged, 1);
        assert_eq!(stats.updated, 1);
        assert_eq!(stats.created, 1);
        assert_eq!(stats.failed, 1);
        assert_eq!(stats.skipped, 1);
    }

    #[test]
    fn test_sync_stats_total() {
        let mut stats = SyncStats::new();
        stats.unchanged = 10;
        stats.updated = 5;
        stats.created = 3;
        stats.failed = 2;
        stats.skipped = 1;

        assert_eq!(stats.total(), 21);
    }

    #[test]
    fn test_sync_stats_successful() {
        let mut stats = SyncStats::new();
        stats.unchanged = 10;
        stats.updated = 5;
        stats.created = 3;
        stats.failed = 2;
        stats.skipped = 1;

        // successful does not include failed or skipped
        assert_eq!(stats.successful(), 18);
    }

    // =========================================================================
    // needs_reprocessing / ReprocessingDecision tests
    // =========================================================================

    #[test]
    fn test_needs_reprocessing_unchanged() {
        let hash = "abc123".to_string();
        let existing = Some(Some(hash.clone()));
        let decision = needs_reprocessing(existing.as_ref(), &hash);

        assert_eq!(decision.outcome, SyncOutcome::Unchanged);
        assert_eq!(decision.reason, "content hash matches");
    }

    #[test]
    fn test_needs_reprocessing_updated() {
        let old_hash = "abc123".to_string();
        let new_hash = "def456";
        let existing = Some(Some(old_hash));
        let decision = needs_reprocessing(existing.as_ref(), new_hash);

        assert_eq!(decision.outcome, SyncOutcome::Updated);
        assert_eq!(decision.reason, "content hash changed");
    }

    #[test]
    fn test_needs_reprocessing_legacy() {
        let existing: Option<Option<String>> = Some(None);
        let decision = needs_reprocessing(existing.as_ref(), "new_hash");

        assert_eq!(decision.outcome, SyncOutcome::Updated);
        assert_eq!(decision.reason, "legacy record without hash");
    }

    #[test]
    fn test_needs_reprocessing_new() {
        let decision = needs_reprocessing(None, "new_hash");

        assert_eq!(decision.outcome, SyncOutcome::Created);
        assert_eq!(decision.reason, "new dataset");
    }

    #[test]
    fn test_is_legacy_true() {
        let existing: Option<Option<String>> = Some(None);
        let decision = needs_reprocessing(existing.as_ref(), "new_hash");

        assert!(decision.is_legacy());
    }

    #[test]
    fn test_is_legacy_false() {
        let decision = needs_reprocessing(None, "new_hash");
        assert!(!decision.is_legacy());

        let hash = "abc123".to_string();
        let existing = Some(Some(hash.clone()));
        let decision = needs_reprocessing(existing.as_ref(), &hash);
        assert!(!decision.is_legacy());
    }

    // =========================================================================
    // PortalHarvestResult tests
    // =========================================================================

    #[test]
    fn test_portal_harvest_result_success() {
        let stats = SyncStats {
            unchanged: 5,
            updated: 3,
            created: 2,
            failed: 0,
            skipped: 0,
        };
        let result = PortalHarvestResult::success(
            "test".to_string(),
            "https://example.com".to_string(),
            stats,
        );
        assert!(result.is_success());
        assert!(result.error.is_none());
        assert_eq!(result.stats.total(), 10);
        assert_eq!(result.portal_name, "test");
        assert_eq!(result.portal_url, "https://example.com");
    }

    #[test]
    fn test_portal_harvest_result_failure() {
        let result = PortalHarvestResult::failure(
            "test".to_string(),
            "https://example.com".to_string(),
            "Connection timeout".to_string(),
        );
        assert!(!result.is_success());
        assert_eq!(result.error, Some("Connection timeout".to_string()));
        assert_eq!(result.stats.total(), 0);
    }

    // =========================================================================
    // BatchHarvestSummary tests
    // =========================================================================

    #[test]
    fn test_batch_harvest_summary_empty() {
        let summary = BatchHarvestSummary::new();
        assert_eq!(summary.successful_count(), 0);
        assert_eq!(summary.failed_count(), 0);
        assert_eq!(summary.total_datasets(), 0);
        assert_eq!(summary.total_portals(), 0);
    }

    #[test]
    fn test_batch_harvest_summary_mixed_results() {
        let mut summary = BatchHarvestSummary::new();

        let stats1 = SyncStats {
            unchanged: 10,
            updated: 5,
            created: 3,
            failed: 2,
            skipped: 0,
        };
        summary.add(PortalHarvestResult::success(
            "a".into(),
            "https://a.com".into(),
            stats1,
        ));

        summary.add(PortalHarvestResult::failure(
            "b".into(),
            "https://b.com".into(),
            "error".into(),
        ));

        let stats2 = SyncStats {
            unchanged: 20,
            updated: 0,
            created: 0,
            failed: 0,
            skipped: 0,
        };
        summary.add(PortalHarvestResult::success(
            "c".into(),
            "https://c.com".into(),
            stats2,
        ));

        assert_eq!(summary.total_portals(), 3);
        assert_eq!(summary.successful_count(), 2);
        assert_eq!(summary.failed_count(), 1);
        assert_eq!(summary.total_datasets(), 40); // 20 + 20 + 0 (failed portal has 0)
    }

    #[test]
    fn test_batch_harvest_summary_all_successful() {
        let mut summary = BatchHarvestSummary::new();

        let stats = SyncStats {
            unchanged: 5,
            updated: 0,
            created: 5,
            failed: 0,
            skipped: 0,
        };
        summary.add(PortalHarvestResult::success(
            "portal1".into(),
            "https://portal1.com".into(),
            stats,
        ));

        assert_eq!(summary.successful_count(), 1);
        assert_eq!(summary.failed_count(), 0);
        assert_eq!(summary.total_datasets(), 10);
    }

    #[test]
    fn test_batch_harvest_summary_all_failed() {
        let mut summary = BatchHarvestSummary::new();

        summary.add(PortalHarvestResult::failure(
            "portal1".into(),
            "https://portal1.com".into(),
            "error1".into(),
        ));
        summary.add(PortalHarvestResult::failure(
            "portal2".into(),
            "https://portal2.com".into(),
            "error2".into(),
        ));

        assert_eq!(summary.successful_count(), 0);
        assert_eq!(summary.failed_count(), 2);
        assert_eq!(summary.total_datasets(), 0);
        assert_eq!(summary.total_portals(), 2);
    }

    // =========================================================================
    // AtomicSyncStats tests
    // =========================================================================

    #[test]
    fn test_atomic_sync_stats_new() {
        let stats = AtomicSyncStats::new();
        let result = stats.to_stats();
        assert_eq!(result.unchanged, 0);
        assert_eq!(result.updated, 0);
        assert_eq!(result.created, 0);
        assert_eq!(result.failed, 0);
        assert_eq!(result.skipped, 0);
    }

    #[test]
    fn test_atomic_sync_stats_record() {
        let stats = AtomicSyncStats::new();
        stats.record(SyncOutcome::Unchanged);
        stats.record(SyncOutcome::Updated);
        stats.record(SyncOutcome::Created);
        stats.record(SyncOutcome::Failed);
        stats.record(SyncOutcome::Skipped);

        let result = stats.to_stats();
        assert_eq!(result.unchanged, 1);
        assert_eq!(result.updated, 1);
        assert_eq!(result.created, 1);
        assert_eq!(result.failed, 1);
        assert_eq!(result.skipped, 1);
    }

    #[test]
    fn test_atomic_sync_stats_multiple_records() {
        let stats = AtomicSyncStats::new();
        for _ in 0..10 {
            stats.record(SyncOutcome::Unchanged);
        }
        for _ in 0..5 {
            stats.record(SyncOutcome::Updated);
        }

        let result = stats.to_stats();
        assert_eq!(result.unchanged, 10);
        assert_eq!(result.updated, 5);
        assert_eq!(result.total(), 15);
        assert_eq!(result.successful(), 15);
    }

    #[test]
    fn test_atomic_sync_stats_default() {
        let stats = AtomicSyncStats::default();
        let result = stats.to_stats();
        assert_eq!(result.total(), 0);
    }

    // =========================================================================
    // SyncStatus tests
    // =========================================================================

    #[test]
    fn test_sync_status_completed() {
        let status = SyncStatus::Completed;
        assert_eq!(status.as_str(), "completed");
        assert!(status.is_completed());
        assert!(!status.is_cancelled());
    }

    #[test]
    fn test_sync_status_cancelled() {
        let status = SyncStatus::Cancelled;
        assert_eq!(status.as_str(), "cancelled");
        assert!(!status.is_completed());
        assert!(status.is_cancelled());
    }

    // =========================================================================
    // SyncResult tests
    // =========================================================================

    #[test]
    fn test_sync_result_completed() {
        let stats = SyncStats {
            unchanged: 10,
            updated: 5,
            created: 3,
            failed: 0,
            skipped: 0,
        };
        let result = SyncResult::completed(stats);
        assert!(result.is_completed());
        assert!(!result.is_cancelled());
        assert!(result.message.is_none());
        assert_eq!(result.stats.total(), 18);
    }

    #[test]
    fn test_sync_result_cancelled() {
        let stats = SyncStats {
            unchanged: 5,
            updated: 2,
            created: 1,
            failed: 0,
            skipped: 0,
        };
        let result = SyncResult::cancelled(stats);
        assert!(!result.is_completed());
        assert!(result.is_cancelled());
        assert!(result.message.is_some());
        assert!(result.message.unwrap().contains("cancelled"));
        assert_eq!(result.stats.total(), 8);
    }

    // =========================================================================
    // DeltaDetector trait tests
    // =========================================================================

    #[test]
    fn test_content_hash_detector_unchanged() {
        let detector = ContentHashDetector;
        let hash = "abc123".to_string();
        let existing = Some(Some(hash.clone()));
        let decision = detector.needs_reprocessing(existing.as_ref(), &hash);

        assert_eq!(decision.outcome, SyncOutcome::Unchanged);
    }

    #[test]
    fn test_content_hash_detector_updated() {
        let detector = ContentHashDetector;
        let existing = Some(Some("old_hash".to_string()));
        let decision = detector.needs_reprocessing(existing.as_ref(), "new_hash");

        assert_eq!(decision.outcome, SyncOutcome::Updated);
    }

    #[test]
    fn test_content_hash_detector_new() {
        let detector = ContentHashDetector;
        let decision = detector.needs_reprocessing(None, "new_hash");

        assert_eq!(decision.outcome, SyncOutcome::Created);
    }

    #[test]
    fn test_always_reprocess_detector_existing() {
        let detector = AlwaysReprocessDetector;
        let hash = "abc123".to_string();
        let existing = Some(Some(hash.clone()));
        let decision = detector.needs_reprocessing(existing.as_ref(), &hash);

        assert_eq!(decision.outcome, SyncOutcome::Updated);
        assert_eq!(decision.reason, "always reprocess strategy");
    }

    #[test]
    fn test_always_reprocess_detector_new() {
        let detector = AlwaysReprocessDetector;
        let decision = detector.needs_reprocessing(None, "new_hash");

        assert_eq!(decision.outcome, SyncOutcome::Created);
        assert_eq!(decision.reason, "new dataset");
    }

    #[test]
    fn test_always_reprocess_detector_legacy() {
        let detector = AlwaysReprocessDetector;
        let existing: Option<Option<String>> = Some(None);
        let decision = detector.needs_reprocessing(existing.as_ref(), "new_hash");

        assert_eq!(decision.outcome, SyncOutcome::Updated);
        assert_eq!(decision.reason, "always reprocess strategy");
    }
}