1use std::ops::Deref;
2use std::ops::DerefMut;
3use std::hash::Hash;
4use std::cmp::Eq;
5use std::collections::HashMap;
6use std::borrow::Borrow;
7
8use once_cell::sync::Lazy;
9use tracing::trace;
10
11use super::EpochCounter;
12use super::Epoch;
13use super::EpochDeltaChanges;
14use super::EpochChanges;
15
16pub trait DualDiff {
17 fn diff(&self, new_value: &Self) -> ChangeFlag;
19}
20
21pub static FULL_FILTER: Lazy<ChangeFlag> = Lazy::new(ChangeFlag::all);
22
23pub static SPEC_FILTER: Lazy<ChangeFlag> = Lazy::new(|| ChangeFlag {
24 spec: true,
25 status: false,
26 meta: false,
27});
28
29pub static STATUS_FILTER: Lazy<ChangeFlag> = Lazy::new(|| ChangeFlag {
30 spec: false,
31 status: true,
32 meta: false,
33});
34pub static META_FILTER: Lazy<ChangeFlag> = Lazy::new(|| ChangeFlag {
35 spec: false,
36 status: false,
37 meta: true,
38});
39
/// Set of flags naming the parts of a value: spec, status and metadata.
///
/// The same type is used both as the result of [`DualDiff::diff`] (which parts changed)
/// and as the filter handed to [`DualEpochMap::changes_since_with_filter`] (which parts
/// the caller cares about).
///
/// The type is three plain booleans, so it is `Copy` and comparable; its `Default` value
/// has every flag cleared, the same as [`ChangeFlag::no_change`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ChangeFlag {
    /// Spec part.
    pub spec: bool,
    /// Status part.
    pub status: bool,
    /// Metadata part.
    pub meta: bool,
}

impl ChangeFlag {
    /// Returns a flag set with spec, status and meta all set.
    pub const fn all() -> Self {
        Self {
            spec: true,
            status: true,
            meta: true,
        }
    }

    /// Returns a flag set with spec, status and meta all cleared.
    #[inline]
    pub const fn no_change() -> Self {
        Self {
            spec: false,
            status: false,
            meta: false,
        }
    }

    /// Returns `true` only when all three flags are set.
    #[inline]
    pub const fn has_full_change(&self) -> bool {
        self.spec && self.status && self.meta
    }

    /// Returns `true` only when none of the three flags is set.
    #[inline]
    pub const fn has_no_changes(&self) -> bool {
        !self.spec && !self.status && !self.meta
    }
}
78
79#[derive(Debug, Default, Clone)]
82pub struct DualEpochCounter<T> {
83 spec_epoch: Epoch,
84 status_epoch: Epoch,
85 meta_epoch: Epoch,
86 inner: T,
87}
88
89impl<T> DualEpochCounter<T> {
90 pub fn new(inner: T) -> Self {
91 Self {
92 spec_epoch: 0,
93 status_epoch: 0,
94 meta_epoch: 0,
95 inner,
96 }
97 }
98
99 fn set_epoch(&mut self, epoch: Epoch) {
101 self.spec_epoch = epoch;
102 self.status_epoch = epoch;
103 self.meta_epoch = epoch;
104 }
105
106 fn copy_epoch(&mut self, old: &Self) {
108 self.spec_epoch = old.spec_epoch;
109 self.status_epoch = old.status_epoch;
110 self.meta_epoch = old.meta_epoch;
111 }
112
113 #[inline]
114 pub fn spec_epoch(&self) -> Epoch {
115 self.spec_epoch
116 }
117
118 fn set_spec_epoch(&mut self, epoch: Epoch) {
119 self.spec_epoch = epoch;
120 }
121
122 #[inline]
123 pub fn status_epoch(&self) -> Epoch {
124 self.status_epoch
125 }
126
127 fn set_status_epoch(&mut self, epoch: Epoch) {
128 self.status_epoch = epoch;
129 }
130
131 #[inline]
132 pub fn meta_epoch(&self) -> Epoch {
133 self.meta_epoch
134 }
135
136 fn set_meta_epoch(&mut self, epoch: Epoch) {
137 self.meta_epoch = epoch;
138 }
139
140 #[inline]
141 pub fn inner(&self) -> &T {
142 &self.inner
143 }
144
145 pub fn inner_mut(&mut self) -> &mut T {
146 &mut self.inner
147 }
148
149 pub fn inner_owned(self) -> T {
150 self.inner
151 }
152}
153
154impl<T> Deref for DualEpochCounter<T> {
155 type Target = T;
156
157 fn deref(&self) -> &Self::Target {
158 &self.inner
159 }
160}
161
162impl<T> DerefMut for DualEpochCounter<T> {
163 fn deref_mut(&mut self) -> &mut Self::Target {
164 &mut self.inner
165 }
166}
167
168impl<T> From<T> for DualEpochCounter<T> {
169 fn from(inner: T) -> Self {
170 Self::new(inner)
171 }
172}
173
174#[derive(Debug, Default)]
176pub struct DualEpochMap<K, V> {
177 epoch: EpochCounter<()>,
178 fence: EpochCounter<()>, values: HashMap<K, DualEpochCounter<V>>,
180 deleted: Vec<DualEpochCounter<V>>,
181}
182
183impl<K, V> Deref for DualEpochMap<K, V> {
184 type Target = HashMap<K, DualEpochCounter<V>>;
185
186 fn deref(&self) -> &Self::Target {
187 &self.values
188 }
189}
190
191impl<K, V> DerefMut for DualEpochMap<K, V> {
192 fn deref_mut(&mut self) -> &mut Self::Target {
193 &mut self.values
194 }
195}
196
197impl<K, V> DualEpochMap<K, V> {
198 pub fn increment_epoch(&mut self) {
199 self.epoch.increment();
200 }
201
202 pub fn decrement_epoch(&mut self) {
203 self.epoch.decrement();
204 }
205
206 pub fn epoch(&self) -> Epoch {
207 self.epoch.epoch()
208 }
209}
210
211impl<K, V> DualEpochMap<K, V>
212where
213 V: DualDiff,
214 K: Eq + Hash,
215{
216 pub fn new() -> Self {
217 Self::new_with_map(HashMap::new())
218 }
219
220 pub fn new_with_map(values: HashMap<K, DualEpochCounter<V>>) -> Self {
221 Self {
222 epoch: EpochCounter::default(),
223 fence: EpochCounter::default(),
224 values,
225 deleted: vec![],
226 }
227 }
228
229 pub fn update(&mut self, key: K, new_value: V) -> Option<ChangeFlag>
233 where
234 K: Clone,
235 {
236 let mut new_value = DualEpochCounter::new(new_value);
237 let current_epoch = self.epoch.epoch();
238
239 trace!(current_epoch, "updating");
240
241 if let Some(existing_value) = self.values.get_mut(&key) {
243 let diff = existing_value.diff(new_value.inner());
244 trace!("existing diff: {:#?}", diff);
245 if !diff.has_no_changes() {
246 new_value.copy_epoch(existing_value);
247 if diff.spec {
248 new_value.set_spec_epoch(current_epoch);
249 }
250 if diff.status {
251 new_value.set_status_epoch(current_epoch);
252 }
253 if diff.meta {
254 new_value.set_meta_epoch(current_epoch);
255 }
256
257 *existing_value = new_value;
258 }
259
260 Some(diff)
261 } else {
262 new_value.set_epoch(current_epoch);
264 self.values.insert(key, new_value);
265 None
266 }
267 }
268
269 pub fn remove<Q>(&mut self, k: &Q) -> Option<DualEpochCounter<V>>
272 where
273 K: Borrow<Q>,
274 Q: ?Sized + Hash + Eq,
275 V: Clone,
276 {
277 if let Some((_, mut old_value)) = self.values.remove_entry(k) {
278 old_value.set_epoch(self.epoch.epoch());
279 self.deleted.push(old_value.clone());
280 Some(old_value)
281 } else {
282 None
283 }
284 }
285
286 pub fn mark_fence(&mut self) {
289 self.deleted = vec![];
290 self.fence = self.epoch.clone();
291 }
292}
293
294impl<K, V> DualEpochMap<K, V>
295where
296 K: Clone,
297{
298 pub fn clone_keys(&self) -> Vec<K> {
299 self.keys().cloned().collect()
300 }
301}
302
303impl<K, V> DualEpochMap<K, V>
304where
305 V: Clone,
306 K: Clone,
307{
308 pub fn clone_values(&self) -> Vec<V> {
309 self.values().cloned().map(|c| c.inner_owned()).collect()
310 }
311
312 pub fn spec_changes_since<E>(&self, epoch_value: E) -> EpochChanges<V>
317 where
318 Epoch: From<E>,
319 {
320 let epoch = epoch_value.into();
321 self.changes_since_with_filter(epoch, &SPEC_FILTER)
322 }
323
324 pub fn status_changes_since<E>(&self, epoch_value: E) -> EpochChanges<V>
326 where
327 Epoch: From<E>,
328 {
329 let epoch = epoch_value.into();
330 self.changes_since_with_filter(epoch, &STATUS_FILTER)
331 }
332
333 pub fn meta_changes_since<E>(&self, epoch_value: E) -> EpochChanges<V>
334 where
335 Epoch: From<E>,
336 {
337 let epoch = epoch_value.into();
338 self.changes_since_with_filter(epoch, &META_FILTER)
339 }
340
341 pub fn changes_since<E>(&self, epoch_value: E) -> EpochChanges<V>
343 where
344 Epoch: From<E>,
345 {
346 let epoch = epoch_value.into();
347
348 self.changes_since_with_filter(epoch, &FULL_FILTER)
349 }
350
351 pub fn changes_since_with_filter(&self, epoch: Epoch, filter: &ChangeFlag) -> EpochChanges<V> {
353 if epoch < self.fence.epoch() {
354 return EpochChanges::new(
355 self.epoch.epoch(),
356 EpochDeltaChanges::SyncAll(self.clone_values()),
357 );
358 }
359
360 if epoch == self.epoch() {
361 return EpochChanges::new(self.epoch.epoch(), EpochDeltaChanges::empty());
362 }
363
364 let updates: Vec<V> = self
365 .values()
366 .filter_map(|v| {
367 if filter.spec && v.spec_epoch > epoch
368 || filter.status && v.status_epoch > epoch
369 || filter.meta && v.meta_epoch > epoch
370 {
371 Some(v.inner().clone())
372 } else {
373 None
374 }
375 })
376 .collect();
377
378 let deletes = self
379 .deleted
380 .iter()
381 .filter_map(|v| {
382 if filter.spec && v.spec_epoch > epoch
383 || filter.status && v.status_epoch > epoch
384 || filter.meta && v.meta_epoch > epoch
385 {
386 Some(v.inner().clone())
387 } else {
388 None
389 }
390 })
391 .collect();
392
393 EpochChanges::new(
394 self.epoch.epoch(),
395 EpochDeltaChanges::Changes((updates, deletes)),
396 )
397 }
398}
399
#[cfg(test)]
#[cfg(feature = "fixtures")]
mod test {

    use crate::fixture::{DefaultTest, TestEpochMap};

    use super::ChangeFlag;

    // Splits a change set into (updates, deletes) and checks the length of each.
    macro_rules! assert_parts {
        ($changes:expr, $updates:expr, $deletes:expr) => {{
            let (updates, deletes) = $changes.parts();
            assert_eq!(updates.len(), $updates);
            assert_eq!(deletes.len(), $deletes);
        }};
    }

    // Checks the reported epoch and the sync-all state of a change set, then its parts.
    macro_rules! assert_changes {
        ($changes:expr, $epoch:expr, $sync_all:expr, $updates:expr, $deletes:expr) => {{
            let changes = $changes;
            assert_eq!(*changes.current_epoch(), $epoch);
            assert_eq!(changes.is_sync_all(), $sync_all);
            assert_parts!(changes, $updates, $deletes);
        }};
    }

    #[test]
    fn test_metadata_changes() {
        let everything = ChangeFlag::all();
        assert!(everything.has_full_change());
        assert!(!everything.has_no_changes());

        let nothing = ChangeFlag::no_change();
        assert!(nothing.has_no_changes());
        assert!(!nothing.has_full_change());
    }

    #[test]
    fn test_epoch_map_empty() {
        assert_eq!(TestEpochMap::new().epoch(), 0);
    }

    #[test]
    fn test_epoch_map_update_simple() {
        let mut map = TestEpochMap::new();

        map.increment_epoch();

        // first insert of a key reports no diff
        let test1 = DefaultTest::with_key("t1");
        assert!(map.update(test1.key_owned(), test1).is_none());
        assert_eq!(map.epoch(), 1);

        // asking from epoch -1: every query answers with a full sync
        assert_changes!(map.spec_changes_since(-1), 1, true, 1, 0);
        assert_changes!(map.status_changes_since(-1), 1, true, 1, 0);
        assert_changes!(map.meta_changes_since(-1), 1, true, 1, 0);
        assert_changes!(map.changes_since(-1), 1, true, 1, 0);

        // asking from epoch 0: the insert shows up as a regular change
        assert_changes!(map.spec_changes_since(0), 1, false, 1, 0);
        assert_changes!(map.status_changes_since(0), 1, false, 1, 0);
        assert_changes!(map.meta_changes_since(0), 1, false, 1, 0);
        assert_changes!(map.changes_since(0), 1, false, 1, 0);

        // asking from the current epoch: nothing to report
        assert_changes!(map.spec_changes_since(1), 1, false, 0, 0);
        assert_changes!(map.status_changes_since(1), 1, false, 0, 0);
        assert_changes!(map.meta_changes_since(1), 1, false, 0, 0);
        assert_changes!(map.changes_since(1), 1, false, 0, 0);
    }

    #[test]
    fn test_epoch_map_update_status() {
        let mut map = TestEpochMap::new();

        let original = DefaultTest::with_key("t1");
        let mut with_status = original.clone();
        with_status.status.up = true;

        // epoch 1: initial insert
        map.increment_epoch();

        assert_eq!(original.ctx().item().rev, 0);
        assert!(map.update(original.key_owned(), original).is_none());

        // epoch 2: same key, status changed and revision bumped
        map.increment_epoch();

        let changes = map
            .update(with_status.key_owned(), with_status.next_rev())
            .expect("update");
        assert!(!changes.spec);
        assert!(changes.status);

        assert_eq!(map.epoch(), 2);

        // since epoch 0
        assert_parts!(map.spec_changes_since(0), 1, 0);
        assert_parts!(map.status_changes_since(0), 1, 0);
        assert_parts!(map.changes_since(0), 1, 0);
        assert_parts!(map.meta_changes_since(0), 1, 0);

        // since epoch 1: spec is unchanged, status and meta are not
        assert_parts!(map.spec_changes_since(1), 0, 0);
        assert_parts!(map.status_changes_since(1), 1, 0);
        assert_parts!(map.changes_since(1), 1, 0);
        assert_parts!(map.meta_changes_since(1), 1, 0);

        // since epoch 2
        assert_parts!(map.spec_changes_since(2), 0, 0);
        assert_parts!(map.status_changes_since(2), 0, 0);
        assert_parts!(map.changes_since(2), 0, 0);
        assert_parts!(map.meta_changes_since(2), 0, 0);
    }

    #[test]
    fn test_epoch_map_update_spec() {
        let mut map = TestEpochMap::new();

        let original = DefaultTest::with_key("t1");
        let mut with_spec = original.clone();
        with_spec.spec.replica = 20;

        // epoch 1: initial insert
        map.increment_epoch();

        assert!(map.update(original.key_owned(), original).is_none());

        // epoch 2: same key, spec changed and revision bumped
        map.increment_epoch();
        let changes = map
            .update(with_spec.key_owned(), with_spec.next_rev())
            .expect("update");
        assert!(changes.spec);
        assert!(!changes.status);

        assert_eq!(map.epoch(), 2);

        // since epoch 0
        assert_parts!(map.spec_changes_since(0), 1, 0);
        assert_parts!(map.status_changes_since(0), 1, 0);
        assert_parts!(map.changes_since(0), 1, 0);
        assert_parts!(map.meta_changes_since(0), 1, 0);

        // since epoch 1: status is unchanged, spec and meta are not
        assert_parts!(map.spec_changes_since(1), 1, 0);
        assert_parts!(map.status_changes_since(1), 0, 0);
        assert_parts!(map.changes_since(1), 1, 0);
        assert_parts!(map.meta_changes_since(1), 1, 0);

        // since epoch 2
        assert_parts!(map.spec_changes_since(2), 0, 0);
        assert_parts!(map.status_changes_since(2), 0, 0);
        assert_parts!(map.changes_since(2), 0, 0);
        assert_parts!(map.meta_changes_since(2), 0, 0);
    }

    #[test]
    fn test_epoch_map_update_meta() {
        let mut map = TestEpochMap::new();

        let original = DefaultTest::with_key("t1");
        let mut with_comment = original.clone();
        with_comment.ctx.item_mut().comment = "test".to_owned();

        // epoch 1: initial insert
        map.increment_epoch();

        assert!(map.update(original.key_owned(), original).is_none());

        // same revision with only the comment altered: reported as no change
        assert!(
            map.update(with_comment.key_owned(), with_comment.clone())
                .expect("update")
                .has_no_changes()
        );

        // epoch 2: revision bumped, which registers as a metadata change
        map.increment_epoch();
        let changes = map
            .update(with_comment.key_owned(), with_comment.next_rev())
            .expect("update");
        assert!(!changes.spec);
        assert!(!changes.status);
        assert!(changes.meta);

        assert_eq!(map.epoch(), 2);

        // since epoch 0
        assert_parts!(map.spec_changes_since(0), 1, 0);
        assert_parts!(map.status_changes_since(0), 1, 0);
        assert_parts!(map.changes_since(0), 1, 0);
        assert_parts!(map.meta_changes_since(0), 1, 0);

        // since epoch 1: only meta moved
        assert_parts!(map.spec_changes_since(1), 0, 0);
        assert_parts!(map.status_changes_since(1), 0, 0);
        assert_parts!(map.changes_since(1), 1, 0);
        assert_parts!(map.meta_changes_since(1), 1, 0);

        // since epoch 2
        assert_parts!(map.spec_changes_since(2), 0, 0);
        assert_parts!(map.status_changes_since(2), 0, 0);
        assert_parts!(map.changes_since(2), 0, 0);
        assert_parts!(map.meta_changes_since(2), 0, 0);
    }
}