use std::sync::atomic::{AtomicUsize, Ordering};
/// Outcome of handling a single item during a sync run.
///
/// Each variant corresponds to one counter in `SyncStats` / `AtomicSyncStats`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncOutcome {
/// Returned by `ContentHashDetector` when the stored hash equals the new hash.
Unchanged,
/// Returned by the detectors in this file for an existing record that must be
/// reprocessed (hash changed, no stored hash, or the always-reprocess strategy).
Updated,
/// Returned by the detectors in this file when there is no existing record.
Created,
/// Counted in the `failed` counter. No detector in this file produces it;
/// presumably set by the caller when processing fails — confirm.
Failed,
/// Counted in the `skipped` counter. No detector in this file produces it;
/// presumably set by the caller when an item is skipped — confirm.
Skipped,
}
/// Plain (non-atomic) tally of sync outcomes, one counter per `SyncOutcome` variant.
///
/// All counters start at zero via `Default`. The type is serde-serializable.
#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
pub struct SyncStats {
/// Number of `SyncOutcome::Unchanged` outcomes recorded.
pub unchanged: usize,
/// Number of `SyncOutcome::Updated` outcomes recorded.
pub updated: usize,
/// Number of `SyncOutcome::Created` outcomes recorded.
pub created: usize,
/// Number of `SyncOutcome::Failed` outcomes recorded.
pub failed: usize,
/// Number of `SyncOutcome::Skipped` outcomes recorded.
/// Falls back to the default (0) when the field is absent from deserialized input.
#[serde(default)]
pub skipped: usize,
}
impl SyncStats {
    /// Creates a tally with every counter set to zero (same as `Default`).
    pub fn new() -> Self {
        Default::default()
    }

    /// Increments the counter that corresponds to `outcome` by one.
    pub fn record(&mut self, outcome: SyncOutcome) {
        // Pick the target counter first, then bump it in a single place.
        let counter = match outcome {
            SyncOutcome::Unchanged => &mut self.unchanged,
            SyncOutcome::Updated => &mut self.updated,
            SyncOutcome::Created => &mut self.created,
            SyncOutcome::Failed => &mut self.failed,
            SyncOutcome::Skipped => &mut self.skipped,
        };
        *counter += 1;
    }

    /// Returns the sum of all five counters.
    pub fn total(&self) -> usize {
        self.successful() + self.failed + self.skipped
    }

    /// Returns the sum of the `unchanged`, `updated` and `created` counters.
    pub fn successful(&self) -> usize {
        let Self {
            unchanged,
            updated,
            created,
            ..
        } = self;
        *unchanged + *updated + *created
    }
}
/// Terminal status of a sync run, carried by `SyncResult`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncStatus {
/// Status set by `SyncResult::completed`; rendered as "completed" by `as_str`.
Completed,
/// Status set by `SyncResult::cancelled`; rendered as "cancelled" by `as_str`.
Cancelled,
}
impl SyncStatus {
pub fn as_str(&self) -> &'static str {
match self {
SyncStatus::Completed => "completed",
SyncStatus::Cancelled => "cancelled",
}
}
pub fn is_completed(&self) -> bool {
matches!(self, SyncStatus::Completed)
}
pub fn is_cancelled(&self) -> bool {
matches!(self, SyncStatus::Cancelled)
}
}
/// Result of a sync run: how it ended, what was counted, and an optional note.
#[derive(Debug, Clone)]
pub struct SyncResult {
/// Whether the run completed or was cancelled.
pub status: SyncStatus,
/// Outcome counters supplied when the result was built.
pub stats: SyncStats,
/// Optional human-readable note; `None` from `completed`, set by `cancelled`.
pub message: Option<String>,
}
impl SyncResult {
    /// Builds a result with status `Completed`, the given `stats`, and no message.
    pub fn completed(stats: SyncStats) -> Self {
        let status = SyncStatus::Completed;
        Self {
            message: None,
            stats,
            status,
        }
    }

    /// Builds a result with status `Cancelled`, the given `stats`, and a fixed
    /// message stating that partial progress was saved.
    pub fn cancelled(stats: SyncStats) -> Self {
        let note = String::from("Operation cancelled - partial progress saved");
        Self {
            message: Some(note),
            stats,
            status: SyncStatus::Cancelled,
        }
    }

    /// Returns `true` when `status` is `SyncStatus::Completed`.
    pub fn is_completed(&self) -> bool {
        matches!(self.status, SyncStatus::Completed)
    }

    /// Returns `true` when `status` is `SyncStatus::Cancelled`.
    pub fn is_cancelled(&self) -> bool {
        matches!(self.status, SyncStatus::Cancelled)
    }
}
/// Counterpart of `SyncStats` whose counters are `AtomicUsize`, so outcomes can
/// be recorded through a shared (`&self`) reference.
///
/// Derives `Debug` so that, like the other public types in this file, it can be
/// logged or embedded in other `Debug` types; the output shows each counter's
/// current value.
#[derive(Debug)]
pub struct AtomicSyncStats {
    /// Count of `SyncOutcome::Unchanged` outcomes.
    unchanged: AtomicUsize,
    /// Count of `SyncOutcome::Updated` outcomes.
    updated: AtomicUsize,
    /// Count of `SyncOutcome::Created` outcomes.
    created: AtomicUsize,
    /// Count of `SyncOutcome::Failed` outcomes.
    failed: AtomicUsize,
    /// Count of `SyncOutcome::Skipped` outcomes.
    skipped: AtomicUsize,
}
impl AtomicSyncStats {
    /// Creates a tally with every atomic counter initialised to zero.
    pub fn new() -> Self {
        let zero = || AtomicUsize::new(0);
        Self {
            unchanged: zero(),
            updated: zero(),
            created: zero(),
            failed: zero(),
            skipped: zero(),
        }
    }

    /// Atomically adds one to the counter that corresponds to `outcome`.
    ///
    /// Uses `Ordering::Relaxed`: the increment itself is atomic, but it imposes
    /// no ordering on surrounding memory operations.
    pub fn record(&self, outcome: SyncOutcome) {
        let counter = match outcome {
            SyncOutcome::Unchanged => &self.unchanged,
            SyncOutcome::Updated => &self.updated,
            SyncOutcome::Created => &self.created,
            SyncOutcome::Failed => &self.failed,
            SyncOutcome::Skipped => &self.skipped,
        };
        // The previous value returned by `fetch_add` is intentionally unused.
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Copies the current counter values into a plain `SyncStats`.
    ///
    /// Each counter is read with its own relaxed load, so the five values are
    /// not captured as one atomic snapshot.
    pub fn to_stats(&self) -> SyncStats {
        let read = |counter: &AtomicUsize| counter.load(Ordering::Relaxed);
        SyncStats {
            unchanged: read(&self.unchanged),
            updated: read(&self.updated),
            created: read(&self.created),
            failed: read(&self.failed),
            skipped: read(&self.skipped),
        }
    }
}
impl Default for AtomicSyncStats {
fn default() -> Self {
Self::new()
}
}
/// Decision returned by a `DeltaDetector`: the outcome plus the reason for it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReprocessingDecision {
/// Outcome the detector assigned to the item.
pub outcome: SyncOutcome,
/// Static explanation of the decision; `is_legacy` compares this text exactly.
pub reason: &'static str,
}
impl ReprocessingDecision {
    /// Returns `true` when `reason` is exactly `"legacy record without hash"`,
    /// the reason `ContentHashDetector` gives for a record with no stored hash.
    pub fn is_legacy(&self) -> bool {
        matches!(self.reason, "legacy record without hash")
    }
}
/// Strategy for deciding whether an incoming item needs to be (re)processed.
///
/// Implementors must also be `Send + Sync + Clone`.
pub trait DeltaDetector: Send + Sync + Clone {
/// Classifies an item from its stored hash state and its new hash.
///
/// As interpreted by the implementations in this file, `existing_hash` is:
/// - `None`: there is no existing record,
/// - `Some(None)`: a record exists but has no stored hash,
/// - `Some(Some(h))`: a record exists with stored hash `h`.
///
/// `new_hash` is the hash to compare against the stored one. Returns the
/// resulting outcome together with a static reason string.
fn needs_reprocessing(
&self,
existing_hash: Option<&Option<String>>,
new_hash: &str,
) -> ReprocessingDecision;
}
/// Detector that compares the stored content hash with the new one.
#[derive(Debug, Clone, Default)]
pub struct ContentHashDetector;

impl DeltaDetector for ContentHashDetector {
    /// Classifies the item by hash comparison:
    ///
    /// - no existing record: `Created` ("new dataset"),
    /// - record without a stored hash: `Updated` ("legacy record without hash"),
    /// - stored hash equal to `new_hash`: `Unchanged` ("content hash matches"),
    /// - stored hash different: `Updated` ("content hash changed").
    fn needs_reprocessing(
        &self,
        existing_hash: Option<&Option<String>>,
        new_hash: &str,
    ) -> ReprocessingDecision {
        // Borrow the stored hash as `&str` so it compares directly with `new_hash`.
        let stored = existing_hash.map(|slot| slot.as_deref());
        let (outcome, reason) = match stored {
            None => (SyncOutcome::Created, "new dataset"),
            Some(None) => (SyncOutcome::Updated, "legacy record without hash"),
            Some(Some(previous)) if previous == new_hash => {
                (SyncOutcome::Unchanged, "content hash matches")
            }
            Some(Some(_)) => (SyncOutcome::Updated, "content hash changed"),
        };
        ReprocessingDecision { outcome, reason }
    }
}
/// Detector that marks every existing record for reprocessing, ignoring hashes.
#[derive(Debug, Clone, Default)]
pub struct AlwaysReprocessDetector;

impl DeltaDetector for AlwaysReprocessDetector {
    /// Returns `Updated` ("always reprocess strategy") whenever a record exists,
    /// with or without a stored hash, and `Created` ("new dataset") otherwise.
    /// The new hash is never inspected.
    fn needs_reprocessing(
        &self,
        existing_hash: Option<&Option<String>>,
        _new_hash: &str,
    ) -> ReprocessingDecision {
        let (outcome, reason) = if existing_hash.is_some() {
            (SyncOutcome::Updated, "always reprocess strategy")
        } else {
            (SyncOutcome::Created, "new dataset")
        };
        ReprocessingDecision { outcome, reason }
    }
}
pub fn needs_reprocessing(
existing_hash: Option<&Option<String>>,
new_hash: &str,
) -> ReprocessingDecision {
ContentHashDetector.needs_reprocessing(existing_hash, new_hash)
}
/// Result of harvesting one portal: its identity, the counters, and any error.
#[derive(Debug, Clone)]
pub struct PortalHarvestResult {
/// Name of the portal, as passed to the constructor.
pub portal_name: String,
/// URL of the portal, as passed to the constructor.
pub portal_url: String,
/// Outcome counters; all zero when built with `failure`.
pub stats: SyncStats,
/// Error text; `None` means success (see `is_success`).
pub error: Option<String>,
}
impl PortalHarvestResult {
    /// Builds a successful result for portal `name` at `url` carrying `stats`;
    /// `error` is left as `None`.
    pub fn success(name: String, url: String, stats: SyncStats) -> Self {
        Self {
            error: None,
            stats,
            portal_url: url,
            portal_name: name,
        }
    }

    /// Builds a failed result for portal `name` at `url`: the counters are the
    /// all-zero default and `error` holds the given message.
    pub fn failure(name: String, url: String, error: String) -> Self {
        // Start from an empty successful result, then attach the error.
        let mut outcome = Self::success(name, url, SyncStats::default());
        outcome.error = Some(error);
        outcome
    }

    /// Returns `true` when no error is recorded.
    pub fn is_success(&self) -> bool {
        self.error.is_none()
    }
}
/// Collection of per-portal harvest results; starts empty via `Default`.
#[derive(Debug, Clone, Default)]
pub struct BatchHarvestSummary {
/// Results in the order they were added with `add`.
pub results: Vec<PortalHarvestResult>,
}
impl BatchHarvestSummary {
    /// Creates a summary with no results.
    pub fn new() -> Self {
        Self {
            results: Vec::new(),
        }
    }

    /// Appends `result` to the summary.
    pub fn add(&mut self, result: PortalHarvestResult) {
        self.results.push(result);
    }

    /// Number of results for which `is_success()` is true.
    pub fn successful_count(&self) -> usize {
        self.results
            .iter()
            .map(|entry| usize::from(entry.is_success()))
            .sum()
    }

    /// Number of results for which `is_success()` is false.
    pub fn failed_count(&self) -> usize {
        self.results
            .iter()
            .map(|entry| usize::from(!entry.is_success()))
            .sum()
    }

    /// Sum of `stats.total()` over every result, successful or not.
    pub fn total_datasets(&self) -> usize {
        self.results
            .iter()
            .fold(0, |running, entry| running + entry.stats.total())
    }

    /// Number of results held, i.e. portals seen.
    pub fn total_portals(&self) -> usize {
        self.results.len()
    }
}
// Unit tests for the stats types, the delta detectors and the harvest summaries
// defined in this file. Compiled only for test builds.
#[cfg(test)]
mod tests {
use super::*;
// --- SyncStats: defaults, recording, and derived sums ---
#[test]
fn test_sync_stats_default() {
let stats = SyncStats::new();
assert_eq!(stats.unchanged, 0);
assert_eq!(stats.updated, 0);
assert_eq!(stats.created, 0);
assert_eq!(stats.failed, 0);
assert_eq!(stats.skipped, 0);
}
#[test]
fn test_sync_stats_record() {
let mut stats = SyncStats::new();
stats.record(SyncOutcome::Unchanged);
stats.record(SyncOutcome::Updated);
stats.record(SyncOutcome::Created);
stats.record(SyncOutcome::Failed);
stats.record(SyncOutcome::Skipped);
assert_eq!(stats.unchanged, 1);
assert_eq!(stats.updated, 1);
assert_eq!(stats.created, 1);
assert_eq!(stats.failed, 1);
assert_eq!(stats.skipped, 1);
}
#[test]
fn test_sync_stats_total() {
let mut stats = SyncStats::new();
stats.unchanged = 10;
stats.updated = 5;
stats.created = 3;
stats.failed = 2;
stats.skipped = 1;
// 10 + 5 + 3 + 2 + 1
assert_eq!(stats.total(), 21);
}
#[test]
fn test_sync_stats_successful() {
let mut stats = SyncStats::new();
stats.unchanged = 10;
stats.updated = 5;
stats.created = 3;
stats.failed = 2;
stats.skipped = 1;
// Only unchanged + updated + created count: 10 + 5 + 3
assert_eq!(stats.successful(), 18);
}
// --- Free function needs_reprocessing (delegates to ContentHashDetector) ---
#[test]
fn test_needs_reprocessing_unchanged() {
let hash = "abc123".to_string();
let existing = Some(Some(hash.clone()));
let decision = needs_reprocessing(existing.as_ref(), &hash);
assert_eq!(decision.outcome, SyncOutcome::Unchanged);
assert_eq!(decision.reason, "content hash matches");
}
#[test]
fn test_needs_reprocessing_updated() {
let old_hash = "abc123".to_string();
let new_hash = "def456";
let existing = Some(Some(old_hash));
let decision = needs_reprocessing(existing.as_ref(), new_hash);
assert_eq!(decision.outcome, SyncOutcome::Updated);
assert_eq!(decision.reason, "content hash changed");
}
#[test]
fn test_needs_reprocessing_legacy() {
// Some(None): a record exists but carries no stored hash.
let existing: Option<Option<String>> = Some(None);
let decision = needs_reprocessing(existing.as_ref(), "new_hash");
assert_eq!(decision.outcome, SyncOutcome::Updated);
assert_eq!(decision.reason, "legacy record without hash");
}
#[test]
fn test_needs_reprocessing_new() {
let decision = needs_reprocessing(None, "new_hash");
assert_eq!(decision.outcome, SyncOutcome::Created);
assert_eq!(decision.reason, "new dataset");
}
// --- ReprocessingDecision::is_legacy ---
#[test]
fn test_is_legacy_true() {
let existing: Option<Option<String>> = Some(None);
let decision = needs_reprocessing(existing.as_ref(), "new_hash");
assert!(decision.is_legacy());
}
#[test]
fn test_is_legacy_false() {
let decision = needs_reprocessing(None, "new_hash");
assert!(!decision.is_legacy());
let hash = "abc123".to_string();
let existing = Some(Some(hash.clone()));
let decision = needs_reprocessing(existing.as_ref(), &hash);
assert!(!decision.is_legacy());
}
// --- PortalHarvestResult constructors ---
#[test]
fn test_portal_harvest_result_success() {
let stats = SyncStats {
unchanged: 5,
updated: 3,
created: 2,
failed: 0,
skipped: 0,
};
let result = PortalHarvestResult::success(
"test".to_string(),
"https://example.com".to_string(),
stats,
);
assert!(result.is_success());
assert!(result.error.is_none());
assert_eq!(result.stats.total(), 10);
assert_eq!(result.portal_name, "test");
assert_eq!(result.portal_url, "https://example.com");
}
#[test]
fn test_portal_harvest_result_failure() {
let result = PortalHarvestResult::failure(
"test".to_string(),
"https://example.com".to_string(),
"Connection timeout".to_string(),
);
assert!(!result.is_success());
assert_eq!(result.error, Some("Connection timeout".to_string()));
// A failure carries default (all-zero) stats.
assert_eq!(result.stats.total(), 0);
}
// --- BatchHarvestSummary aggregation ---
#[test]
fn test_batch_harvest_summary_empty() {
let summary = BatchHarvestSummary::new();
assert_eq!(summary.successful_count(), 0);
assert_eq!(summary.failed_count(), 0);
assert_eq!(summary.total_datasets(), 0);
assert_eq!(summary.total_portals(), 0);
}
#[test]
fn test_batch_harvest_summary_mixed_results() {
let mut summary = BatchHarvestSummary::new();
let stats1 = SyncStats {
unchanged: 10,
updated: 5,
created: 3,
failed: 2,
skipped: 0,
};
summary.add(PortalHarvestResult::success(
"a".into(),
"https://a.com".into(),
stats1,
));
summary.add(PortalHarvestResult::failure(
"b".into(),
"https://b.com".into(),
"error".into(),
));
let stats2 = SyncStats {
unchanged: 20,
updated: 0,
created: 0,
failed: 0,
skipped: 0,
};
summary.add(PortalHarvestResult::success(
"c".into(),
"https://c.com".into(),
stats2,
));
assert_eq!(summary.total_portals(), 3);
assert_eq!(summary.successful_count(), 2);
assert_eq!(summary.failed_count(), 1);
// 20 (portal a, including its 2 failed items) + 0 (portal b) + 20 (portal c)
assert_eq!(summary.total_datasets(), 40); }
#[test]
fn test_batch_harvest_summary_all_successful() {
let mut summary = BatchHarvestSummary::new();
let stats = SyncStats {
unchanged: 5,
updated: 0,
created: 5,
failed: 0,
skipped: 0,
};
summary.add(PortalHarvestResult::success(
"portal1".into(),
"https://portal1.com".into(),
stats,
));
assert_eq!(summary.successful_count(), 1);
assert_eq!(summary.failed_count(), 0);
assert_eq!(summary.total_datasets(), 10);
}
#[test]
fn test_batch_harvest_summary_all_failed() {
let mut summary = BatchHarvestSummary::new();
summary.add(PortalHarvestResult::failure(
"portal1".into(),
"https://portal1.com".into(),
"error1".into(),
));
summary.add(PortalHarvestResult::failure(
"portal2".into(),
"https://portal2.com".into(),
"error2".into(),
));
assert_eq!(summary.successful_count(), 0);
assert_eq!(summary.failed_count(), 2);
assert_eq!(summary.total_datasets(), 0);
assert_eq!(summary.total_portals(), 2);
}
// --- AtomicSyncStats: construction, recording, snapshot ---
#[test]
fn test_atomic_sync_stats_new() {
let stats = AtomicSyncStats::new();
let result = stats.to_stats();
assert_eq!(result.unchanged, 0);
assert_eq!(result.updated, 0);
assert_eq!(result.created, 0);
assert_eq!(result.failed, 0);
assert_eq!(result.skipped, 0);
}
#[test]
fn test_atomic_sync_stats_record() {
let stats = AtomicSyncStats::new();
stats.record(SyncOutcome::Unchanged);
stats.record(SyncOutcome::Updated);
stats.record(SyncOutcome::Created);
stats.record(SyncOutcome::Failed);
stats.record(SyncOutcome::Skipped);
let result = stats.to_stats();
assert_eq!(result.unchanged, 1);
assert_eq!(result.updated, 1);
assert_eq!(result.created, 1);
assert_eq!(result.failed, 1);
assert_eq!(result.skipped, 1);
}
#[test]
fn test_atomic_sync_stats_multiple_records() {
let stats = AtomicSyncStats::new();
for _ in 0..10 {
stats.record(SyncOutcome::Unchanged);
}
for _ in 0..5 {
stats.record(SyncOutcome::Updated);
}
let result = stats.to_stats();
assert_eq!(result.unchanged, 10);
assert_eq!(result.updated, 5);
assert_eq!(result.total(), 15);
assert_eq!(result.successful(), 15);
}
#[test]
fn test_atomic_sync_stats_default() {
let stats = AtomicSyncStats::default();
let result = stats.to_stats();
assert_eq!(result.total(), 0);
}
// --- SyncStatus labels and predicates ---
#[test]
fn test_sync_status_completed() {
let status = SyncStatus::Completed;
assert_eq!(status.as_str(), "completed");
assert!(status.is_completed());
assert!(!status.is_cancelled());
}
#[test]
fn test_sync_status_cancelled() {
let status = SyncStatus::Cancelled;
assert_eq!(status.as_str(), "cancelled");
assert!(!status.is_completed());
assert!(status.is_cancelled());
}
// --- SyncResult constructors ---
#[test]
fn test_sync_result_completed() {
let stats = SyncStats {
unchanged: 10,
updated: 5,
created: 3,
failed: 0,
skipped: 0,
};
let result = SyncResult::completed(stats);
assert!(result.is_completed());
assert!(!result.is_cancelled());
assert!(result.message.is_none());
assert_eq!(result.stats.total(), 18);
}
#[test]
fn test_sync_result_cancelled() {
let stats = SyncStats {
unchanged: 5,
updated: 2,
created: 1,
failed: 0,
skipped: 0,
};
let result = SyncResult::cancelled(stats);
assert!(!result.is_completed());
assert!(result.is_cancelled());
assert!(result.message.is_some());
assert!(result.message.unwrap().contains("cancelled"));
assert_eq!(result.stats.total(), 8);
}
// --- ContentHashDetector via the DeltaDetector trait ---
#[test]
fn test_content_hash_detector_unchanged() {
let detector = ContentHashDetector;
let hash = "abc123".to_string();
let existing = Some(Some(hash.clone()));
let decision = detector.needs_reprocessing(existing.as_ref(), &hash);
assert_eq!(decision.outcome, SyncOutcome::Unchanged);
}
#[test]
fn test_content_hash_detector_updated() {
let detector = ContentHashDetector;
let existing = Some(Some("old_hash".to_string()));
let decision = detector.needs_reprocessing(existing.as_ref(), "new_hash");
assert_eq!(decision.outcome, SyncOutcome::Updated);
}
#[test]
fn test_content_hash_detector_new() {
let detector = ContentHashDetector;
let decision = detector.needs_reprocessing(None, "new_hash");
assert_eq!(decision.outcome, SyncOutcome::Created);
}
// --- AlwaysReprocessDetector: existing records are always Updated ---
#[test]
fn test_always_reprocess_detector_existing() {
let detector = AlwaysReprocessDetector;
let hash = "abc123".to_string();
let existing = Some(Some(hash.clone()));
// Even an identical hash yields Updated with this strategy.
let decision = detector.needs_reprocessing(existing.as_ref(), &hash);
assert_eq!(decision.outcome, SyncOutcome::Updated);
assert_eq!(decision.reason, "always reprocess strategy");
}
#[test]
fn test_always_reprocess_detector_new() {
let detector = AlwaysReprocessDetector;
let decision = detector.needs_reprocessing(None, "new_hash");
assert_eq!(decision.outcome, SyncOutcome::Created);
assert_eq!(decision.reason, "new dataset");
}
#[test]
fn test_always_reprocess_detector_legacy() {
let detector = AlwaysReprocessDetector;
let existing: Option<Option<String>> = Some(None);
let decision = detector.needs_reprocessing(existing.as_ref(), "new_hash");
assert_eq!(decision.outcome, SyncOutcome::Updated);
// Unlike ContentHashDetector, the reason is not the legacy one.
assert_eq!(decision.reason, "always reprocess strategy");
}
}