1use std::sync::atomic::{AtomicUsize, Ordering};
7
/// Outcome of processing a single dataset during a sync run.
///
/// Each variant has a counter of the same name in [`SyncStats`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SyncOutcome {
    /// The dataset already existed and its content did not change.
    Unchanged,
    /// The dataset already existed and was refreshed.
    Updated,
    /// The dataset was not known before and was added.
    Created,
    /// Processing the dataset failed.
    Failed,
    /// The dataset was skipped.
    Skipped,
}
22
23#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
25pub struct SyncStats {
26 pub unchanged: usize,
27 pub updated: usize,
28 pub created: usize,
29 pub failed: usize,
30 #[serde(default)]
32 pub skipped: usize,
33}
34
35impl SyncStats {
36 pub fn new() -> Self {
38 Self::default()
39 }
40
41 pub fn record(&mut self, outcome: SyncOutcome) {
43 match outcome {
44 SyncOutcome::Unchanged => self.unchanged += 1,
45 SyncOutcome::Updated => self.updated += 1,
46 SyncOutcome::Created => self.created += 1,
47 SyncOutcome::Failed => self.failed += 1,
48 SyncOutcome::Skipped => self.skipped += 1,
49 }
50 }
51
52 pub fn total(&self) -> usize {
54 self.unchanged + self.updated + self.created + self.failed + self.skipped
55 }
56
57 pub fn successful(&self) -> usize {
59 self.unchanged + self.updated + self.created
60 }
61}
62
/// Terminal state of a sync run.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncStatus {
    /// The run finished.
    Completed,
    /// The run was cancelled.
    Cancelled,
}

impl SyncStatus {
    /// Returns the lowercase name of the status: `"completed"` or `"cancelled"`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Completed => "completed",
            Self::Cancelled => "cancelled",
        }
    }

    /// Returns `true` for [`SyncStatus::Completed`].
    pub fn is_completed(&self) -> bool {
        *self == Self::Completed
    }

    /// Returns `true` for [`SyncStatus::Cancelled`].
    pub fn is_cancelled(&self) -> bool {
        *self == Self::Cancelled
    }
}
95
96#[derive(Debug, Clone)]
98pub struct SyncResult {
99 pub status: SyncStatus,
101 pub stats: SyncStats,
103 pub message: Option<String>,
105}
106
107impl SyncResult {
108 pub fn completed(stats: SyncStats) -> Self {
110 Self {
111 status: SyncStatus::Completed,
112 stats,
113 message: None,
114 }
115 }
116
117 pub fn cancelled(stats: SyncStats) -> Self {
119 Self {
120 status: SyncStatus::Cancelled,
121 stats,
122 message: Some("Operation cancelled - partial progress saved".to_string()),
123 }
124 }
125
126 pub fn is_completed(&self) -> bool {
128 self.status.is_completed()
129 }
130
131 pub fn is_cancelled(&self) -> bool {
133 self.status.is_cancelled()
134 }
135}
136
/// Counterpart of [`SyncStats`] whose counters are `AtomicUsize` values, so
/// they can be incremented through a shared reference.
///
/// `Debug` is derived so the type can be logged and embedded in other
/// `Debug` types like the rest of the public types in this module.
#[derive(Debug)]
pub struct AtomicSyncStats {
    /// Number of `Unchanged` outcomes recorded.
    unchanged: AtomicUsize,
    /// Number of `Updated` outcomes recorded.
    updated: AtomicUsize,
    /// Number of `Created` outcomes recorded.
    created: AtomicUsize,
    /// Number of `Failed` outcomes recorded.
    failed: AtomicUsize,
    /// Number of `Skipped` outcomes recorded.
    skipped: AtomicUsize,
}
164
165impl AtomicSyncStats {
166 pub fn new() -> Self {
168 Self {
169 unchanged: AtomicUsize::new(0),
170 updated: AtomicUsize::new(0),
171 created: AtomicUsize::new(0),
172 failed: AtomicUsize::new(0),
173 skipped: AtomicUsize::new(0),
174 }
175 }
176
177 pub fn record(&self, outcome: SyncOutcome) {
179 match outcome {
180 SyncOutcome::Unchanged => self.unchanged.fetch_add(1, Ordering::Relaxed),
181 SyncOutcome::Updated => self.updated.fetch_add(1, Ordering::Relaxed),
182 SyncOutcome::Created => self.created.fetch_add(1, Ordering::Relaxed),
183 SyncOutcome::Failed => self.failed.fetch_add(1, Ordering::Relaxed),
184 SyncOutcome::Skipped => self.skipped.fetch_add(1, Ordering::Relaxed),
185 };
186 }
187
188 pub fn to_stats(&self) -> SyncStats {
190 SyncStats {
191 unchanged: self.unchanged.load(Ordering::Relaxed),
192 updated: self.updated.load(Ordering::Relaxed),
193 created: self.created.load(Ordering::Relaxed),
194 failed: self.failed.load(Ordering::Relaxed),
195 skipped: self.skipped.load(Ordering::Relaxed),
196 }
197 }
198}
199
200impl Default for AtomicSyncStats {
201 fn default() -> Self {
202 Self::new()
203 }
204}
205
206#[derive(Debug, Clone, PartialEq, Eq)]
208pub struct ReprocessingDecision {
209 pub needs_embedding: bool,
211 pub outcome: SyncOutcome,
213 pub reason: &'static str,
215}
216
217impl ReprocessingDecision {
218 pub fn is_legacy(&self) -> bool {
220 self.reason == "legacy record without hash"
221 }
222}
223
224pub fn needs_reprocessing(
234 existing_hash: Option<&Option<String>>,
235 new_hash: &str,
236) -> ReprocessingDecision {
237 match existing_hash {
238 Some(Some(hash)) if hash == new_hash => {
239 ReprocessingDecision {
241 needs_embedding: false,
242 outcome: SyncOutcome::Unchanged,
243 reason: "content hash matches",
244 }
245 }
246 Some(Some(_)) => {
247 ReprocessingDecision {
249 needs_embedding: true,
250 outcome: SyncOutcome::Updated,
251 reason: "content hash changed",
252 }
253 }
254 Some(None) => {
255 ReprocessingDecision {
257 needs_embedding: true,
258 outcome: SyncOutcome::Updated,
259 reason: "legacy record without hash",
260 }
261 }
262 None => {
263 ReprocessingDecision {
265 needs_embedding: true,
266 outcome: SyncOutcome::Created,
267 reason: "new dataset",
268 }
269 }
270 }
271}
272
273#[derive(Debug, Clone)]
279pub struct PortalHarvestResult {
280 pub portal_name: String,
282 pub portal_url: String,
284 pub stats: SyncStats,
286 pub error: Option<String>,
288}
289
290impl PortalHarvestResult {
291 pub fn success(name: String, url: String, stats: SyncStats) -> Self {
293 Self {
294 portal_name: name,
295 portal_url: url,
296 stats,
297 error: None,
298 }
299 }
300
301 pub fn failure(name: String, url: String, error: String) -> Self {
303 Self {
304 portal_name: name,
305 portal_url: url,
306 stats: SyncStats::default(),
307 error: Some(error),
308 }
309 }
310
311 pub fn is_success(&self) -> bool {
313 self.error.is_none()
314 }
315}
316
317#[derive(Debug, Clone, Default)]
319pub struct BatchHarvestSummary {
320 pub results: Vec<PortalHarvestResult>,
322}
323
324impl BatchHarvestSummary {
325 pub fn new() -> Self {
327 Self::default()
328 }
329
330 pub fn add(&mut self, result: PortalHarvestResult) {
332 self.results.push(result);
333 }
334
335 pub fn successful_count(&self) -> usize {
337 self.results.iter().filter(|r| r.is_success()).count()
338 }
339
340 pub fn failed_count(&self) -> usize {
342 self.results.iter().filter(|r| !r.is_success()).count()
343 }
344
345 pub fn total_datasets(&self) -> usize {
347 self.results.iter().map(|r| r.stats.total()).sum()
348 }
349
350 pub fn total_portals(&self) -> usize {
352 self.results.len()
353 }
354}
355
#[cfg(test)]
mod tests {
    use super::*;

    /// Every outcome variant, in declaration order.
    const ALL_OUTCOMES: [SyncOutcome; 5] = [
        SyncOutcome::Unchanged,
        SyncOutcome::Updated,
        SyncOutcome::Created,
        SyncOutcome::Failed,
        SyncOutcome::Skipped,
    ];

    /// Builds a `SyncStats` from positional counters to keep fixtures short.
    fn stats_of(
        unchanged: usize,
        updated: usize,
        created: usize,
        failed: usize,
        skipped: usize,
    ) -> SyncStats {
        SyncStats {
            unchanged,
            updated,
            created,
            failed,
            skipped,
        }
    }

    /// Flattens the counters so all five can be compared in one assertion.
    fn counters(stats: &SyncStats) -> [usize; 5] {
        [
            stats.unchanged,
            stats.updated,
            stats.created,
            stats.failed,
            stats.skipped,
        ]
    }

    /// Shorthand for a successful portal result.
    fn portal_ok(name: &str, url: &str, stats: SyncStats) -> PortalHarvestResult {
        PortalHarvestResult::success(name.into(), url.into(), stats)
    }

    /// Shorthand for a failed portal result.
    fn portal_err(name: &str, url: &str, error: &str) -> PortalHarvestResult {
        PortalHarvestResult::failure(name.into(), url.into(), error.into())
    }

    // ---- SyncStats ----

    #[test]
    fn test_sync_stats_default() {
        let stats = SyncStats::new();
        assert_eq!(counters(&stats), [0; 5]);
    }

    #[test]
    fn test_sync_stats_record() {
        let mut stats = SyncStats::new();
        for outcome in ALL_OUTCOMES {
            stats.record(outcome);
        }

        assert_eq!(counters(&stats), [1; 5]);
    }

    #[test]
    fn test_sync_stats_total() {
        let stats = stats_of(10, 5, 3, 2, 1);

        assert_eq!(stats.total(), 21);
    }

    #[test]
    fn test_sync_stats_successful() {
        let stats = stats_of(10, 5, 3, 2, 1);

        assert_eq!(stats.successful(), 18);
    }

    // ---- needs_reprocessing / ReprocessingDecision ----

    #[test]
    fn test_needs_reprocessing_unchanged() {
        let stored = Some(Some(String::from("abc123")));
        let decision = needs_reprocessing(stored.as_ref(), "abc123");

        assert!(!decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Unchanged);
        assert_eq!(decision.reason, "content hash matches");
    }

    #[test]
    fn test_needs_reprocessing_updated() {
        let stored = Some(Some(String::from("abc123")));
        let decision = needs_reprocessing(stored.as_ref(), "def456");

        assert!(decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Updated);
        assert_eq!(decision.reason, "content hash changed");
    }

    #[test]
    fn test_needs_reprocessing_legacy() {
        let stored: Option<Option<String>> = Some(None);
        let decision = needs_reprocessing(stored.as_ref(), "new_hash");

        assert!(decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Updated);
        assert_eq!(decision.reason, "legacy record without hash");
    }

    #[test]
    fn test_needs_reprocessing_new() {
        let decision = needs_reprocessing(None, "new_hash");

        assert!(decision.needs_embedding);
        assert_eq!(decision.outcome, SyncOutcome::Created);
        assert_eq!(decision.reason, "new dataset");
    }

    #[test]
    fn test_is_legacy_true() {
        let stored: Option<Option<String>> = Some(None);

        assert!(needs_reprocessing(stored.as_ref(), "new_hash").is_legacy());
    }

    #[test]
    fn test_is_legacy_false() {
        // A brand-new dataset is not legacy.
        assert!(!needs_reprocessing(None, "new_hash").is_legacy());

        // Neither is an existing record whose hash matches.
        let stored = Some(Some(String::from("abc123")));
        assert!(!needs_reprocessing(stored.as_ref(), "abc123").is_legacy());
    }

    // ---- PortalHarvestResult ----

    #[test]
    fn test_portal_harvest_result_success() {
        let result = portal_ok("test", "https://example.com", stats_of(5, 3, 2, 0, 0));

        assert!(result.is_success());
        assert!(result.error.is_none());
        assert_eq!(result.stats.total(), 10);
        assert_eq!(result.portal_name, "test");
        assert_eq!(result.portal_url, "https://example.com");
    }

    #[test]
    fn test_portal_harvest_result_failure() {
        let result = portal_err("test", "https://example.com", "Connection timeout");

        assert!(!result.is_success());
        assert_eq!(result.error, Some("Connection timeout".to_string()));
        assert_eq!(result.stats.total(), 0);
    }

    // ---- BatchHarvestSummary ----

    #[test]
    fn test_batch_harvest_summary_empty() {
        let summary = BatchHarvestSummary::new();

        assert_eq!(summary.successful_count(), 0);
        assert_eq!(summary.failed_count(), 0);
        assert_eq!(summary.total_datasets(), 0);
        assert_eq!(summary.total_portals(), 0);
    }

    #[test]
    fn test_batch_harvest_summary_mixed_results() {
        let mut summary = BatchHarvestSummary::new();
        summary.add(portal_ok("a", "https://a.com", stats_of(10, 5, 3, 2, 0)));
        summary.add(portal_err("b", "https://b.com", "error"));
        summary.add(portal_ok("c", "https://c.com", stats_of(20, 0, 0, 0, 0)));

        assert_eq!(summary.total_portals(), 3);
        assert_eq!(summary.successful_count(), 2);
        assert_eq!(summary.failed_count(), 1);
        // 20 datasets from "a", 0 from the failed "b", 20 from "c".
        assert_eq!(summary.total_datasets(), 40);
    }

    #[test]
    fn test_batch_harvest_summary_all_successful() {
        let mut summary = BatchHarvestSummary::new();
        summary.add(portal_ok(
            "portal1",
            "https://portal1.com",
            stats_of(5, 0, 5, 0, 0),
        ));

        assert_eq!(summary.successful_count(), 1);
        assert_eq!(summary.failed_count(), 0);
        assert_eq!(summary.total_datasets(), 10);
    }

    #[test]
    fn test_batch_harvest_summary_all_failed() {
        let mut summary = BatchHarvestSummary::new();
        summary.add(portal_err("portal1", "https://portal1.com", "error1"));
        summary.add(portal_err("portal2", "https://portal2.com", "error2"));

        assert_eq!(summary.successful_count(), 0);
        assert_eq!(summary.failed_count(), 2);
        assert_eq!(summary.total_datasets(), 0);
        assert_eq!(summary.total_portals(), 2);
    }

    // ---- AtomicSyncStats ----

    #[test]
    fn test_atomic_sync_stats_new() {
        let snapshot = AtomicSyncStats::new().to_stats();

        assert_eq!(counters(&snapshot), [0; 5]);
    }

    #[test]
    fn test_atomic_sync_stats_record() {
        let stats = AtomicSyncStats::new();
        for outcome in ALL_OUTCOMES {
            stats.record(outcome);
        }

        let snapshot = stats.to_stats();
        assert_eq!(counters(&snapshot), [1; 5]);
    }

    #[test]
    fn test_atomic_sync_stats_multiple_records() {
        let stats = AtomicSyncStats::new();
        (0..10).for_each(|_| stats.record(SyncOutcome::Unchanged));
        (0..5).for_each(|_| stats.record(SyncOutcome::Updated));

        let snapshot = stats.to_stats();
        assert_eq!(snapshot.unchanged, 10);
        assert_eq!(snapshot.updated, 5);
        assert_eq!(snapshot.total(), 15);
        assert_eq!(snapshot.successful(), 15);
    }

    #[test]
    fn test_atomic_sync_stats_default() {
        let snapshot = AtomicSyncStats::default().to_stats();

        assert_eq!(snapshot.total(), 0);
    }

    // ---- SyncStatus ----

    #[test]
    fn test_sync_status_completed() {
        let status = SyncStatus::Completed;

        assert_eq!(status.as_str(), "completed");
        assert!(status.is_completed());
        assert!(!status.is_cancelled());
    }

    #[test]
    fn test_sync_status_cancelled() {
        let status = SyncStatus::Cancelled;

        assert_eq!(status.as_str(), "cancelled");
        assert!(!status.is_completed());
        assert!(status.is_cancelled());
    }

    // ---- SyncResult ----

    #[test]
    fn test_sync_result_completed() {
        let result = SyncResult::completed(stats_of(10, 5, 3, 0, 0));

        assert!(result.is_completed());
        assert!(!result.is_cancelled());
        assert!(result.message.is_none());
        assert_eq!(result.stats.total(), 18);
    }

    #[test]
    fn test_sync_result_cancelled() {
        let result = SyncResult::cancelled(stats_of(5, 2, 1, 0, 0));

        assert!(!result.is_completed());
        assert!(result.is_cancelled());
        assert!(result.message.is_some());
        assert!(matches!(
            result.message.as_deref(),
            Some(text) if text.contains("cancelled")
        ));
        assert_eq!(result.stats.total(), 8);
    }
}