// ceres_core/sync.rs

1//! Sync service layer for portal synchronization logic.
2//!
3//! This module provides pure business logic for delta detection and sync statistics,
4//! decoupled from I/O operations and CLI orchestration.
5
6use std::sync::atomic::{AtomicUsize, Ordering};
7
/// Outcome of processing a single dataset during sync.
///
/// Delta detectors ([`DeltaDetector`]) only ever classify datasets as
/// `Unchanged`, `Updated`, or `Created`; `Failed` and `Skipped` are reserved
/// for runtime conditions in the harvest pipeline.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncOutcome {
    /// Dataset content hash matches existing - no changes needed
    Unchanged,
    /// Dataset content changed - embedding regenerated
    Updated,
    /// New dataset - first time seeing this dataset
    Created,
    /// Processing failed for this dataset
    Failed,
    /// Dataset skipped due to circuit breaker being open
    Skipped,
}
22
/// Statistics for a portal sync operation.
///
/// Counters are bumped one at a time via [`SyncStats::record`]; aggregate
/// values are derived by [`SyncStats::total`] and [`SyncStats::successful`].
#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
pub struct SyncStats {
    // Datasets whose content hash matched the stored hash.
    pub unchanged: usize,
    // Datasets whose content changed and whose embedding was regenerated.
    pub updated: usize,
    // Datasets seen for the first time.
    pub created: usize,
    // Datasets whose processing failed.
    pub failed: usize,
    /// Number of datasets skipped due to circuit breaker being open.
    ///
    /// `#[serde(default)]` makes this field deserialize to 0 when absent
    /// from the serialized input.
    #[serde(default)]
    pub skipped: usize,
}
34
35impl SyncStats {
36    /// Creates a new empty stats tracker.
37    pub fn new() -> Self {
38        Self::default()
39    }
40
41    /// Records an outcome, incrementing the appropriate counter.
42    pub fn record(&mut self, outcome: SyncOutcome) {
43        match outcome {
44            SyncOutcome::Unchanged => self.unchanged += 1,
45            SyncOutcome::Updated => self.updated += 1,
46            SyncOutcome::Created => self.created += 1,
47            SyncOutcome::Failed => self.failed += 1,
48            SyncOutcome::Skipped => self.skipped += 1,
49        }
50    }
51
52    /// Returns the total number of processed datasets.
53    pub fn total(&self) -> usize {
54        self.unchanged + self.updated + self.created + self.failed + self.skipped
55    }
56
57    /// Returns the number of successfully processed datasets.
58    pub fn successful(&self) -> usize {
59        self.unchanged + self.updated + self.created
60    }
61}
62
63// =============================================================================
64// Sync Status and Result Types (for cancellation support)
65// =============================================================================
66
/// Overall status of a sync operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncStatus {
    /// Sync completed successfully with all datasets processed.
    Completed,
    /// Sync was cancelled but partial progress was saved.
    Cancelled,
}

impl SyncStatus {
    /// String form used for database storage
    /// (`"completed"` / `"cancelled"`).
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Completed => "completed",
            Self::Cancelled => "cancelled",
        }
    }

    /// True when the sync ran to completion.
    pub fn is_completed(&self) -> bool {
        *self == Self::Completed
    }

    /// True when the sync was cancelled before finishing.
    pub fn is_cancelled(&self) -> bool {
        *self == Self::Cancelled
    }
}
95
/// Result of a sync operation including status and statistics.
///
/// Constructed via [`SyncResult::completed`] or [`SyncResult::cancelled`].
#[derive(Debug, Clone)]
pub struct SyncResult {
    /// The final status of the sync operation.
    pub status: SyncStatus,
    /// Statistics about processed datasets (partial if cancelled).
    pub stats: SyncStats,
    /// Optional message providing context (e.g., cancellation reason).
    pub message: Option<String>,
}
106
107impl SyncResult {
108    /// Creates a completed sync result.
109    pub fn completed(stats: SyncStats) -> Self {
110        Self {
111            status: SyncStatus::Completed,
112            stats,
113            message: None,
114        }
115    }
116
117    /// Creates a cancelled sync result with partial statistics.
118    pub fn cancelled(stats: SyncStats) -> Self {
119        Self {
120            status: SyncStatus::Cancelled,
121            stats,
122            message: Some("Operation cancelled - partial progress saved".to_string()),
123        }
124    }
125
126    /// Returns true if the sync was fully successful.
127    pub fn is_completed(&self) -> bool {
128        self.status.is_completed()
129    }
130
131    /// Returns true if the sync was cancelled.
132    pub fn is_cancelled(&self) -> bool {
133        self.status.is_cancelled()
134    }
135}
136
/// Thread-safe wrapper for [`SyncStats`] using atomic counters.
///
/// This is useful for concurrent harvesting where multiple tasks
/// update statistics simultaneously without requiring a mutex.
///
/// # Example
///
/// ```
/// use ceres_core::sync::{AtomicSyncStats, SyncOutcome};
///
/// let stats = AtomicSyncStats::new();
///
/// // Can be safely called from multiple threads
/// stats.record(SyncOutcome::Created);
/// stats.record(SyncOutcome::Unchanged);
///
/// let snapshot = stats.to_stats();
/// assert_eq!(snapshot.created, 1);
/// assert_eq!(snapshot.unchanged, 1);
/// ```
// Debug is derived for parity with the other public types in this module
// (AtomicUsize itself implements Debug).
#[derive(Debug)]
pub struct AtomicSyncStats {
    unchanged: AtomicUsize,
    updated: AtomicUsize,
    created: AtomicUsize,
    failed: AtomicUsize,
    skipped: AtomicUsize,
}
164
165impl AtomicSyncStats {
166    /// Creates a new zeroed stats tracker.
167    pub fn new() -> Self {
168        Self {
169            unchanged: AtomicUsize::new(0),
170            updated: AtomicUsize::new(0),
171            created: AtomicUsize::new(0),
172            failed: AtomicUsize::new(0),
173            skipped: AtomicUsize::new(0),
174        }
175    }
176
177    /// Records an outcome, incrementing the appropriate counter atomically.
178    pub fn record(&self, outcome: SyncOutcome) {
179        match outcome {
180            SyncOutcome::Unchanged => self.unchanged.fetch_add(1, Ordering::Relaxed),
181            SyncOutcome::Updated => self.updated.fetch_add(1, Ordering::Relaxed),
182            SyncOutcome::Created => self.created.fetch_add(1, Ordering::Relaxed),
183            SyncOutcome::Failed => self.failed.fetch_add(1, Ordering::Relaxed),
184            SyncOutcome::Skipped => self.skipped.fetch_add(1, Ordering::Relaxed),
185        };
186    }
187
188    /// Converts atomic counters to a snapshot [`SyncStats`].
189    pub fn to_stats(&self) -> SyncStats {
190        SyncStats {
191            unchanged: self.unchanged.load(Ordering::Relaxed),
192            updated: self.updated.load(Ordering::Relaxed),
193            created: self.created.load(Ordering::Relaxed),
194            failed: self.failed.load(Ordering::Relaxed),
195            skipped: self.skipped.load(Ordering::Relaxed),
196        }
197    }
198}
199
200impl Default for AtomicSyncStats {
201    fn default() -> Self {
202        Self::new()
203    }
204}
205
/// Result of delta detection for a dataset.
///
/// Produced by [`DeltaDetector::needs_reprocessing`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReprocessingDecision {
    /// Whether embedding needs to be regenerated
    pub needs_embedding: bool,
    /// The outcome classification for this dataset
    pub outcome: SyncOutcome,
    /// Human-readable reason for the decision
    /// (also inspected by [`ReprocessingDecision::is_legacy`])
    pub reason: &'static str,
}
216
217impl ReprocessingDecision {
218    /// Returns true if this is a legacy record update (existing record without hash).
219    pub fn is_legacy(&self) -> bool {
220        self.reason == "legacy record without hash"
221    }
222}
223
/// Trait for delta detection strategies.
///
/// Implementations determine whether a dataset needs reprocessing based on
/// its existing and new content hashes. This enables different strategies
/// such as content-hash comparison, always-reprocess, or timestamp-based detection.
///
/// The `Send + Sync + Clone` bounds allow detectors to be shared across
/// concurrent harvest tasks.
pub trait DeltaDetector: Send + Sync + Clone {
    /// Determines if a dataset needs reprocessing.
    ///
    /// Implementations must only return `Created`, `Updated`, or `Unchanged` outcomes.
    /// `Failed` and `Skipped` are reserved for runtime errors in the harvest pipeline.
    ///
    /// # Arguments
    /// * `existing_hash` - The stored content hash lookup result:
    ///   - `None` — dataset not found in the store (new dataset)
    ///   - `Some(None)` — dataset exists but has no stored hash (legacy record)
    ///   - `Some(Some(hash))` — dataset exists with the given content hash
    /// * `new_hash` - The computed content hash from the portal data
    fn needs_reprocessing(
        &self,
        existing_hash: Option<&Option<String>>,
        new_hash: &str,
    ) -> ReprocessingDecision;
}
247
/// Default delta detector using content hash comparison.
///
/// Compares the stored content hash with the newly computed hash to determine
/// if a dataset's embedding needs to be regenerated. A zero-sized strategy
/// type; construct it directly or via `Default`.
#[derive(Debug, Clone, Default)]
pub struct ContentHashDetector;
254
255impl DeltaDetector for ContentHashDetector {
256    fn needs_reprocessing(
257        &self,
258        existing_hash: Option<&Option<String>>,
259        new_hash: &str,
260    ) -> ReprocessingDecision {
261        match existing_hash {
262            Some(Some(hash)) if hash == new_hash => {
263                // Hash matches - content unchanged
264                ReprocessingDecision {
265                    needs_embedding: false,
266                    outcome: SyncOutcome::Unchanged,
267                    reason: "content hash matches",
268                }
269            }
270            Some(Some(_)) => {
271                // Hash exists but differs - content updated
272                ReprocessingDecision {
273                    needs_embedding: true,
274                    outcome: SyncOutcome::Updated,
275                    reason: "content hash changed",
276                }
277            }
278            Some(None) => {
279                // Exists but no hash (legacy data) - treat as update
280                ReprocessingDecision {
281                    needs_embedding: true,
282                    outcome: SyncOutcome::Updated,
283                    reason: "legacy record without hash",
284                }
285            }
286            None => {
287                // Not in existing data - new dataset
288                ReprocessingDecision {
289                    needs_embedding: true,
290                    outcome: SyncOutcome::Created,
291                    reason: "new dataset",
292                }
293            }
294        }
295    }
296}
297
/// Delta detector that always triggers reprocessing.
///
/// Useful for full rebuilds where all embeddings should be regenerated
/// regardless of content changes. A zero-sized strategy type; construct it
/// directly or via `Default`.
#[derive(Debug, Clone, Default)]
pub struct AlwaysReprocessDetector;
304
305impl DeltaDetector for AlwaysReprocessDetector {
306    fn needs_reprocessing(
307        &self,
308        existing_hash: Option<&Option<String>>,
309        _new_hash: &str,
310    ) -> ReprocessingDecision {
311        match existing_hash {
312            Some(_) => ReprocessingDecision {
313                needs_embedding: true,
314                outcome: SyncOutcome::Updated,
315                reason: "always reprocess strategy",
316            },
317            None => ReprocessingDecision {
318                needs_embedding: true,
319                outcome: SyncOutcome::Created,
320                reason: "new dataset",
321            },
322        }
323    }
324}
325
326/// Determines if a dataset needs reprocessing based on content hash comparison.
327///
328/// This is a convenience wrapper around [`ContentHashDetector`].
329///
330/// # Arguments
331/// * `existing_hash` - The stored content hash for this dataset (None if new dataset)
332/// * `new_hash` - The computed content hash from the portal data
333///
334/// # Returns
335/// A `ReprocessingDecision` indicating whether embedding regeneration is needed
336/// and the classification of this sync operation.
337pub fn needs_reprocessing(
338    existing_hash: Option<&Option<String>>,
339    new_hash: &str,
340) -> ReprocessingDecision {
341    ContentHashDetector.needs_reprocessing(existing_hash, new_hash)
342}
343
344// =============================================================================
345// Batch Harvest Types
346// =============================================================================
347
/// Result of harvesting a single portal in batch mode.
///
/// Constructed via [`PortalHarvestResult::success`] or
/// [`PortalHarvestResult::failure`].
#[derive(Debug, Clone)]
pub struct PortalHarvestResult {
    /// Portal name identifier.
    pub portal_name: String,
    /// Portal URL.
    pub portal_url: String,
    /// Sync statistics for this portal (zeroed for failed harvests).
    pub stats: SyncStats,
    /// Error message if harvest failed, None if successful.
    pub error: Option<String>,
}
360
361impl PortalHarvestResult {
362    /// Creates a successful harvest result.
363    pub fn success(name: String, url: String, stats: SyncStats) -> Self {
364        Self {
365            portal_name: name,
366            portal_url: url,
367            stats,
368            error: None,
369        }
370    }
371
372    /// Creates a failed harvest result.
373    pub fn failure(name: String, url: String, error: String) -> Self {
374        Self {
375            portal_name: name,
376            portal_url: url,
377            stats: SyncStats::default(),
378            error: Some(error),
379        }
380    }
381
382    /// Returns true if the harvest was successful.
383    pub fn is_success(&self) -> bool {
384        self.error.is_none()
385    }
386}
387
/// Aggregated results from batch harvesting multiple portals.
///
/// Populate via [`BatchHarvestSummary::add`]; query with the count/total
/// accessors.
#[derive(Debug, Clone, Default)]
pub struct BatchHarvestSummary {
    /// Results for each portal, in the order they were added.
    pub results: Vec<PortalHarvestResult>,
}
394
395impl BatchHarvestSummary {
396    /// Creates a new empty summary.
397    pub fn new() -> Self {
398        Self::default()
399    }
400
401    /// Adds a portal harvest result.
402    pub fn add(&mut self, result: PortalHarvestResult) {
403        self.results.push(result);
404    }
405
406    /// Returns the count of successful harvests.
407    pub fn successful_count(&self) -> usize {
408        self.results.iter().filter(|r| r.is_success()).count()
409    }
410
411    /// Returns the count of failed harvests.
412    pub fn failed_count(&self) -> usize {
413        self.results.iter().filter(|r| !r.is_success()).count()
414    }
415
416    /// Returns the total number of datasets across all successful portals.
417    pub fn total_datasets(&self) -> usize {
418        self.results.iter().map(|r| r.stats.total()).sum()
419    }
420
421    /// Returns the total number of portals processed.
422    pub fn total_portals(&self) -> usize {
423        self.results.len()
424    }
425}
426
#[cfg(test)]
mod tests {
    //! Unit tests for sync statistics, delta detection, status/result types,
    //! and batch harvest aggregation.
    use super::*;

    #[test]
    fn test_sync_stats_default() {
        let stats = SyncStats::new();
        assert_eq!(stats.unchanged, 0);
        assert_eq!(stats.updated, 0);
        assert_eq!(stats.created, 0);
        assert_eq!(stats.failed, 0);
        assert_eq!(stats.skipped, 0);
    }

    #[test]
    fn test_sync_stats_record() {
        // One record per outcome variant; each counter should end at 1.
        let mut stats = SyncStats::new();
        stats.record(SyncOutcome::Unchanged);
        stats.record(SyncOutcome::Updated);
        stats.record(SyncOutcome::Created);
        stats.record(SyncOutcome::Failed);
        stats.record(SyncOutcome::Skipped);

        assert_eq!(stats.unchanged, 1);
        assert_eq!(stats.updated, 1);
        assert_eq!(stats.created, 1);
        assert_eq!(stats.failed, 1);
        assert_eq!(stats.skipped, 1);
    }

    #[test]
    fn test_sync_stats_total() {
        let mut stats = SyncStats::new();
        stats.unchanged = 10;
        stats.updated = 5;
        stats.created = 3;
        stats.failed = 2;
        stats.skipped = 1;

        // total() sums all five counters.
        assert_eq!(stats.total(), 21);
    }

    #[test]
    fn test_sync_stats_successful() {
        let mut stats = SyncStats::new();
        stats.unchanged = 10;
        stats.updated = 5;
        stats.created = 3;
        stats.failed = 2;
        stats.skipped = 1;

        // successful does not include failed or skipped
        assert_eq!(stats.successful(), 18);
    }

    #[test]
    fn test_needs_reprocessing_unchanged() {
        // Identical stored and new hash: no reprocessing needed.
        let hash = "abc123".to_string();
        let existing = Some(Some(hash.clone()));
        let decision = needs_reprocessing(existing.as_ref(), &hash);

        assert!(!decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Unchanged);
        assert_eq!(decision.reason, "content hash matches");
    }

    #[test]
    fn test_needs_reprocessing_updated() {
        // Differing hashes: dataset content changed.
        let old_hash = "abc123".to_string();
        let new_hash = "def456";
        let existing = Some(Some(old_hash));
        let decision = needs_reprocessing(existing.as_ref(), new_hash);

        assert!(decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Updated);
        assert_eq!(decision.reason, "content hash changed");
    }

    #[test]
    fn test_needs_reprocessing_legacy() {
        // Some(None): record exists but has no stored hash (legacy).
        let existing: Option<Option<String>> = Some(None);
        let decision = needs_reprocessing(existing.as_ref(), "new_hash");

        assert!(decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Updated);
        assert_eq!(decision.reason, "legacy record without hash");
    }

    #[test]
    fn test_needs_reprocessing_new() {
        // None: dataset not found in the store at all.
        let decision = needs_reprocessing(None, "new_hash");

        assert!(decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Created);
        assert_eq!(decision.reason, "new dataset");
    }

    #[test]
    fn test_is_legacy_true() {
        let existing: Option<Option<String>> = Some(None);
        let decision = needs_reprocessing(existing.as_ref(), "new_hash");

        assert!(decision.is_legacy());
    }

    #[test]
    fn test_is_legacy_false() {
        // Created and Unchanged decisions must not be classified as legacy.
        let decision = needs_reprocessing(None, "new_hash");
        assert!(!decision.is_legacy());

        let hash = "abc123".to_string();
        let existing = Some(Some(hash.clone()));
        let decision = needs_reprocessing(existing.as_ref(), &hash);
        assert!(!decision.is_legacy());
    }

    // =========================================================================
    // PortalHarvestResult tests
    // =========================================================================

    #[test]
    fn test_portal_harvest_result_success() {
        let stats = SyncStats {
            unchanged: 5,
            updated: 3,
            created: 2,
            failed: 0,
            skipped: 0,
        };
        let result = PortalHarvestResult::success(
            "test".to_string(),
            "https://example.com".to_string(),
            stats,
        );
        assert!(result.is_success());
        assert!(result.error.is_none());
        assert_eq!(result.stats.total(), 10);
        assert_eq!(result.portal_name, "test");
        assert_eq!(result.portal_url, "https://example.com");
    }

    #[test]
    fn test_portal_harvest_result_failure() {
        let result = PortalHarvestResult::failure(
            "test".to_string(),
            "https://example.com".to_string(),
            "Connection timeout".to_string(),
        );
        assert!(!result.is_success());
        assert_eq!(result.error, Some("Connection timeout".to_string()));
        // Failure constructor zeroes the stats.
        assert_eq!(result.stats.total(), 0);
    }

    // =========================================================================
    // BatchHarvestSummary tests
    // =========================================================================

    #[test]
    fn test_batch_harvest_summary_empty() {
        let summary = BatchHarvestSummary::new();
        assert_eq!(summary.successful_count(), 0);
        assert_eq!(summary.failed_count(), 0);
        assert_eq!(summary.total_datasets(), 0);
        assert_eq!(summary.total_portals(), 0);
    }

    #[test]
    fn test_batch_harvest_summary_mixed_results() {
        let mut summary = BatchHarvestSummary::new();

        let stats1 = SyncStats {
            unchanged: 10,
            updated: 5,
            created: 3,
            failed: 2,
            skipped: 0,
        };
        summary.add(PortalHarvestResult::success(
            "a".into(),
            "https://a.com".into(),
            stats1,
        ));

        summary.add(PortalHarvestResult::failure(
            "b".into(),
            "https://b.com".into(),
            "error".into(),
        ));

        let stats2 = SyncStats {
            unchanged: 20,
            updated: 0,
            created: 0,
            failed: 0,
            skipped: 0,
        };
        summary.add(PortalHarvestResult::success(
            "c".into(),
            "https://c.com".into(),
            stats2,
        ));

        assert_eq!(summary.total_portals(), 3);
        assert_eq!(summary.successful_count(), 2);
        assert_eq!(summary.failed_count(), 1);
        assert_eq!(summary.total_datasets(), 40); // 20 + 20 + 0 (failed portal has 0)
    }

    #[test]
    fn test_batch_harvest_summary_all_successful() {
        let mut summary = BatchHarvestSummary::new();

        let stats = SyncStats {
            unchanged: 5,
            updated: 0,
            created: 5,
            failed: 0,
            skipped: 0,
        };
        summary.add(PortalHarvestResult::success(
            "portal1".into(),
            "https://portal1.com".into(),
            stats,
        ));

        assert_eq!(summary.successful_count(), 1);
        assert_eq!(summary.failed_count(), 0);
        assert_eq!(summary.total_datasets(), 10);
    }

    #[test]
    fn test_batch_harvest_summary_all_failed() {
        let mut summary = BatchHarvestSummary::new();

        summary.add(PortalHarvestResult::failure(
            "portal1".into(),
            "https://portal1.com".into(),
            "error1".into(),
        ));
        summary.add(PortalHarvestResult::failure(
            "portal2".into(),
            "https://portal2.com".into(),
            "error2".into(),
        ));

        assert_eq!(summary.successful_count(), 0);
        assert_eq!(summary.failed_count(), 2);
        assert_eq!(summary.total_datasets(), 0);
        assert_eq!(summary.total_portals(), 2);
    }

    // =========================================================================
    // AtomicSyncStats tests
    // =========================================================================

    #[test]
    fn test_atomic_sync_stats_new() {
        let stats = AtomicSyncStats::new();
        let result = stats.to_stats();
        assert_eq!(result.unchanged, 0);
        assert_eq!(result.updated, 0);
        assert_eq!(result.created, 0);
        assert_eq!(result.failed, 0);
        assert_eq!(result.skipped, 0);
    }

    #[test]
    fn test_atomic_sync_stats_record() {
        let stats = AtomicSyncStats::new();
        stats.record(SyncOutcome::Unchanged);
        stats.record(SyncOutcome::Updated);
        stats.record(SyncOutcome::Created);
        stats.record(SyncOutcome::Failed);
        stats.record(SyncOutcome::Skipped);

        let result = stats.to_stats();
        assert_eq!(result.unchanged, 1);
        assert_eq!(result.updated, 1);
        assert_eq!(result.created, 1);
        assert_eq!(result.failed, 1);
        assert_eq!(result.skipped, 1);
    }

    #[test]
    fn test_atomic_sync_stats_multiple_records() {
        let stats = AtomicSyncStats::new();
        for _ in 0..10 {
            stats.record(SyncOutcome::Unchanged);
        }
        for _ in 0..5 {
            stats.record(SyncOutcome::Updated);
        }

        let result = stats.to_stats();
        assert_eq!(result.unchanged, 10);
        assert_eq!(result.updated, 5);
        assert_eq!(result.total(), 15);
        assert_eq!(result.successful(), 15);
    }

    #[test]
    fn test_atomic_sync_stats_default() {
        let stats = AtomicSyncStats::default();
        let result = stats.to_stats();
        assert_eq!(result.total(), 0);
    }

    // =========================================================================
    // SyncStatus tests
    // =========================================================================

    #[test]
    fn test_sync_status_completed() {
        let status = SyncStatus::Completed;
        assert_eq!(status.as_str(), "completed");
        assert!(status.is_completed());
        assert!(!status.is_cancelled());
    }

    #[test]
    fn test_sync_status_cancelled() {
        let status = SyncStatus::Cancelled;
        assert_eq!(status.as_str(), "cancelled");
        assert!(!status.is_completed());
        assert!(status.is_cancelled());
    }

    // =========================================================================
    // SyncResult tests
    // =========================================================================

    #[test]
    fn test_sync_result_completed() {
        let stats = SyncStats {
            unchanged: 10,
            updated: 5,
            created: 3,
            failed: 0,
            skipped: 0,
        };
        let result = SyncResult::completed(stats);
        assert!(result.is_completed());
        assert!(!result.is_cancelled());
        assert!(result.message.is_none());
        assert_eq!(result.stats.total(), 18);
    }

    #[test]
    fn test_sync_result_cancelled() {
        let stats = SyncStats {
            unchanged: 5,
            updated: 2,
            created: 1,
            failed: 0,
            skipped: 0,
        };
        let result = SyncResult::cancelled(stats);
        assert!(!result.is_completed());
        assert!(result.is_cancelled());
        assert!(result.message.is_some());
        assert!(result.message.unwrap().contains("cancelled"));
        assert_eq!(result.stats.total(), 8);
    }

    // =========================================================================
    // DeltaDetector trait tests
    // =========================================================================

    #[test]
    fn test_content_hash_detector_unchanged() {
        let detector = ContentHashDetector;
        let hash = "abc123".to_string();
        let existing = Some(Some(hash.clone()));
        let decision = detector.needs_reprocessing(existing.as_ref(), &hash);

        assert!(!decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Unchanged);
    }

    #[test]
    fn test_content_hash_detector_updated() {
        let detector = ContentHashDetector;
        let existing = Some(Some("old_hash".to_string()));
        let decision = detector.needs_reprocessing(existing.as_ref(), "new_hash");

        assert!(decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Updated);
    }

    #[test]
    fn test_content_hash_detector_new() {
        let detector = ContentHashDetector;
        let decision = detector.needs_reprocessing(None, "new_hash");

        assert!(decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Created);
    }

    #[test]
    fn test_always_reprocess_detector_existing() {
        // Even a matching hash triggers reprocessing under this strategy.
        let detector = AlwaysReprocessDetector;
        let hash = "abc123".to_string();
        let existing = Some(Some(hash.clone()));
        let decision = detector.needs_reprocessing(existing.as_ref(), &hash);

        assert!(decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Updated);
        assert_eq!(decision.reason, "always reprocess strategy");
    }

    #[test]
    fn test_always_reprocess_detector_new() {
        let detector = AlwaysReprocessDetector;
        let decision = detector.needs_reprocessing(None, "new_hash");

        assert!(decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Created);
        assert_eq!(decision.reason, "new dataset");
    }

    #[test]
    fn test_always_reprocess_detector_legacy() {
        // Legacy (Some(None)) is classified as Updated, not legacy-specific,
        // under the always-reprocess strategy.
        let detector = AlwaysReprocessDetector;
        let existing: Option<Option<String>> = Some(None);
        let decision = detector.needs_reprocessing(existing.as_ref(), "new_hash");

        assert!(decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Updated);
        assert_eq!(decision.reason, "always reprocess strategy");
    }
}