// fluvio_stream_model/epoch/epoch_map.rs

1use std::ops::Deref;
2use std::ops::DerefMut;
3use std::hash::Hash;
4use std::hash::Hasher;
5use std::cmp::Eq;
6use std::cmp::PartialEq;
7use std::fmt;
8
/// Version number used to stamp changes; a plain signed 64-bit integer.
pub type Epoch = i64;
10
11/// Keep track of changes to object using epoch
12/// for every changes to objects, epoch counter must be incremented
13#[derive(Debug, Default, Clone)]
14pub struct EpochCounter<T> {
15    epoch: Epoch,
16    inner: T,
17}
18
19impl<T> Hash for EpochCounter<T>
20where
21    T: Hash,
22{
23    fn hash<H: Hasher>(&self, state: &mut H) {
24        self.inner.hash(state);
25    }
26}
27
28impl<T> PartialEq for EpochCounter<T>
29where
30    T: PartialEq,
31{
32    fn eq(&self, other: &Self) -> bool {
33        self.inner == other.inner
34    }
35}
36
37impl<T> Eq for EpochCounter<T> where T: Eq {}
38
39impl<T> Deref for EpochCounter<T> {
40    type Target = T;
41
42    fn deref(&self) -> &Self::Target {
43        &self.inner
44    }
45}
46
47impl<T> DerefMut for EpochCounter<T> {
48    fn deref_mut(&mut self) -> &mut Self::Target {
49        &mut self.inner
50    }
51}
52
53impl<T> fmt::Display for EpochCounter<T> {
54    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
55        write!(f, "epoch: {}", self.epoch)
56    }
57}
58
59impl<T> From<T> for EpochCounter<T> {
60    fn from(inner: T) -> Self {
61        Self { epoch: 0, inner }
62    }
63}
64
65impl<T> EpochCounter<T> {
66    pub fn new(inner: T) -> Self {
67        Self { epoch: 0, inner }
68    }
69
70    pub fn new_with_epoch(inner: T, epoch: impl Into<i64>) -> Self {
71        Self {
72            epoch: epoch.into(),
73            inner,
74        }
75    }
76
77    pub fn inner(&self) -> &T {
78        &self.inner
79    }
80
81    pub fn inner_mut(&mut self) -> &mut T {
82        &mut self.inner
83    }
84
85    pub fn inner_owned(self) -> T {
86        self.inner
87    }
88
89    pub fn epoch(&self) -> Epoch {
90        self.epoch
91    }
92
93    fn set_epoch(&mut self, epoch: Epoch) {
94        self.epoch = epoch;
95    }
96
97    pub fn increment(&mut self) {
98        self.epoch += 1;
99    }
100
101    pub fn decrement(&mut self) {
102        self.epoch -= 1;
103    }
104}
105
106pub use old_map::*;
107
108mod old_map {
109
110    use std::collections::HashMap;
111    use std::hash::Hash;
112    use std::borrow::Borrow;
113
114    use super::*;
115
116    /// use epoch counter for every value in the hashmap
117    /// if value are deleted, it is moved to thrash can (deleted)
118    /// using epoch counter, level changes can be calculated
119    #[derive(Debug, Default)]
120    pub struct EpochMap<K, V> {
121        epoch: EpochCounter<()>,
122        fence: EpochCounter<()>, // last changes
123        map: HashMap<K, EpochCounter<V>>,
124        deleted: Vec<EpochCounter<V>>,
125    }
126
127    impl<K, V> Deref for EpochMap<K, V> {
128        type Target = HashMap<K, EpochCounter<V>>;
129
130        fn deref(&self) -> &Self::Target {
131            &self.map
132        }
133    }
134
135    impl<K, V> DerefMut for EpochMap<K, V> {
136        fn deref_mut(&mut self) -> &mut Self::Target {
137            &mut self.map
138        }
139    }
140
141    impl<K, V> EpochMap<K, V> {
142        pub fn increment_epoch(&mut self) {
143            self.epoch.increment();
144        }
145
146        pub fn epoch(&self) -> Epoch {
147            self.epoch.epoch()
148        }
149
150        /// fence history to current epoch,
151        /// older before fence will be lost
152        pub fn mark_fence(&mut self) {
153            self.deleted = vec![];
154            self.fence = self.epoch.clone();
155        }
156    }
157
158    impl<K, V> EpochMap<K, V>
159    where
160        K: Eq + Hash,
161    {
162        pub fn new() -> Self {
163            Self::new_with_map(HashMap::new())
164        }
165
166        pub fn new_with_map(map: HashMap<K, EpochCounter<V>>) -> Self {
167            Self {
168                epoch: EpochCounter::default(),
169                fence: EpochCounter::default(),
170                map,
171                deleted: vec![],
172            }
173        }
174
175        /// insert new value
176        /// remove history from deleted set
177        pub fn insert(&mut self, key: K, value: V) -> Option<EpochCounter<V>>
178        where
179            K: Clone,
180        {
181            let mut epoch_value: EpochCounter<V> = value.into();
182            epoch_value.set_epoch(self.epoch.epoch());
183            self.map.insert(key, epoch_value)
184        }
185
186        /// remove existing value
187        /// if successful, remove are added to history
188        pub fn remove<Q>(&mut self, k: &Q) -> Option<EpochCounter<V>>
189        where
190            K: Borrow<Q>,
191            Q: ?Sized + Hash + Eq,
192            V: Clone,
193        {
194            if let Some((_, mut old_value)) = self.map.remove_entry(k) {
195                old_value.set_epoch(self.epoch.epoch());
196                self.deleted.push(old_value.clone());
197                Some(old_value)
198            } else {
199                None
200            }
201        }
202    }
203
204    impl<K, V> EpochMap<K, V>
205    where
206        K: Clone,
207    {
208        pub fn clone_keys(&self) -> Vec<K> {
209            self.keys().cloned().collect()
210        }
211    }
212
213    impl<K, V> EpochMap<K, V>
214    where
215        V: Clone,
216        K: Clone,
217    {
218        pub fn clone_values(&self) -> Vec<V> {
219            self.values().cloned().map(|c| c.inner_owned()).collect()
220        }
221
222        /// find all changes since a epoch
223        /// if epoch is before fence, return full changes with epoch,
224        /// otherwise return delta changes
225        /// user should keep that epoch and do subsequent changes
226        pub fn changes_since<E>(&self, epoch_value: E) -> EpochChanges<V>
227        where
228            Epoch: From<E>,
229        {
230            let epoch = epoch_value.into();
231            if epoch < self.fence.epoch() {
232                return EpochChanges {
233                    epoch: self.epoch.epoch(),
234                    changes: EpochDeltaChanges::SyncAll(self.clone_values()),
235                };
236            }
237
238            if epoch == self.epoch() {
239                return EpochChanges {
240                    epoch: self.epoch.epoch(),
241                    changes: EpochDeltaChanges::empty(),
242                };
243            }
244
245            let updates = self
246                .values()
247                .filter_map(|v| {
248                    if v.epoch > epoch {
249                        Some(v.inner().clone())
250                    } else {
251                        None
252                    }
253                })
254                .collect();
255
256            let deletes = self
257                .deleted
258                .iter()
259                .filter_map(|d| {
260                    if d.epoch > epoch {
261                        Some(d.inner().clone())
262                    } else {
263                        None
264                    }
265                })
266                .collect();
267
268            EpochChanges {
269                epoch: self.epoch.epoch(),
270                changes: EpochDeltaChanges::Changes((updates, deletes)),
271            }
272        }
273    }
274
275    pub struct EpochChanges<V> {
276        // current epoch
277        pub epoch: Epoch,
278        changes: EpochDeltaChanges<V>,
279    }
280
281    impl<V: fmt::Debug> fmt::Debug for EpochChanges<V> {
282        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
283            f.debug_struct("EpochChanges")
284                .field("epoch", &self.epoch)
285                .field("changes", &self.changes)
286                .finish()
287        }
288    }
289
290    impl<V> EpochChanges<V> {
291        pub fn new(epoch: Epoch, changes: EpochDeltaChanges<V>) -> Self {
292            Self { epoch, changes }
293        }
294
295        /// current epoch
296        pub fn current_epoch(&self) -> &Epoch {
297            &self.epoch
298        }
299
300        /// return all updates regardless of sync or changes
301        /// (update,deletes)
302        pub fn parts(self) -> (Vec<V>, Vec<V>) {
303            match self.changes {
304                EpochDeltaChanges::SyncAll(all) => (all, vec![]),
305                EpochDeltaChanges::Changes(changes) => changes,
306            }
307        }
308
309        pub fn is_empty(&self) -> bool {
310            match &self.changes {
311                EpochDeltaChanges::SyncAll(all) => all.is_empty(),
312                EpochDeltaChanges::Changes(changes) => changes.0.is_empty() && changes.1.is_empty(),
313            }
314        }
315
316        /// is change contain sync all
317        pub fn is_sync_all(&self) -> bool {
318            match &self.changes {
319                EpochDeltaChanges::SyncAll(_) => true,
320                EpochDeltaChanges::Changes(_) => false,
321            }
322        }
323    }
324
325    pub enum EpochDeltaChanges<V> {
326        SyncAll(Vec<V>),
327        Changes((Vec<V>, Vec<V>)),
328    }
329
330    impl<V> EpochDeltaChanges<V> {
331        pub fn empty() -> Self {
332            Self::Changes((vec![], vec![]))
333        }
334    }
335
336    impl<V: fmt::Debug> fmt::Debug for EpochDeltaChanges<V> {
337        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
338            match self {
339                Self::SyncAll(all) => f.debug_tuple("SyncAll").field(all).finish(),
340                Self::Changes((add, del)) => {
341                    f.debug_tuple("Changes").field(add).field(del).finish()
342                }
343            }
344        }
345    }
346}
347
#[cfg(test)]
mod test {

    use std::fmt::Display;

    use serde::{Serialize, Deserialize};

    use crate::core::{Spec, Status};
    use crate::store::DefaultMetadataObject;

    use super::EpochMap;

    /// Spec type used only by these tests.
    #[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
    struct TestSpec {
        replica: u16,
    }

    impl Spec for TestSpec {
        const LABEL: &'static str = "Test";
        type IndexKey = String;
        type Owner = Self;
        type Status = TestStatus;
    }

    /// Status type paired with `TestSpec`.
    #[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
    struct TestStatus {
        up: bool,
    }

    impl Status for TestStatus {}

    // Display simply reuses the Debug rendering.
    impl Display for TestStatus {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "{self:?}")
        }
    }

    /// Metadata object built from the test spec.
    type DefaultTest = DefaultMetadataObject<TestSpec>;

    /// Epoch map keyed by string, holding test metadata objects.
    type TestEpochMap = EpochMap<String, DefaultTest>;

    /// A fresh map starts at epoch 0.
    #[test]
    fn test_epoch_map_empty() {
        let empty = TestEpochMap::new();
        assert_eq!(empty.epoch(), 0);
    }

    /// One insert at epoch 1, queried from before, at, and after the base epoch.
    #[test]
    fn test_epoch_map_insert() {
        let mut map = TestEpochMap::new();

        // This is the low-level interface: the epoch has to be advanced by hand
        // before any write (the store does this automatically).
        map.increment_epoch();

        let item = DefaultTest::with_key("t1");
        let key = item.key_owned();
        map.insert(key, item);

        assert_eq!(map.epoch(), 1);

        // an epoch before the base epoch yields a full sync
        let before_base = map.changes_since(-1);
        assert_eq!(*before_base.current_epoch(), 1);
        assert!(before_base.is_sync_all());
        let (updates, deletes) = before_base.parts();
        assert_eq!(updates.len(), 1);
        assert_eq!(deletes.len(), 0);

        // the base epoch yields a delta containing the insert
        let from_base = map.changes_since(0);
        assert_eq!(*from_base.current_epoch(), 1);
        assert!(!from_base.is_sync_all());
        let (updates, deletes) = from_base.parts();
        assert_eq!(updates.len(), 1);
        assert_eq!(deletes.len(), 0);

        // the current epoch yields an empty delta
        let from_current = map.changes_since(1);
        assert_eq!(*from_current.current_epoch(), 1);
        assert!(!from_current.is_sync_all());
        let (updates, deletes) = from_current.parts();
        assert_eq!(updates.len(), 0);
        assert_eq!(deletes.len(), 0);
    }

    /// Two inserts at epoch 1, then an update of the first key at epoch 2.
    #[test]
    fn test_epoch_map_insert_update() {
        let mut map = TestEpochMap::new();

        let first = DefaultTest::with_key("t1");
        let first_again = first.clone();
        let second = DefaultTest::with_key("t2");

        // first epoch: two new entries
        map.increment_epoch();
        let first_key = first.key_owned();
        map.insert(first_key, first);
        let second_key = second.key_owned();
        map.insert(second_key, second);

        // second epoch: overwrite the first entry
        map.increment_epoch();
        let first_again_key = first_again.key_owned();
        map.insert(first_again_key, first_again);

        assert_eq!(map.epoch(), 2);

        // from the base epoch: insert and update of the same key are
        // consolidated, so two values come back
        let from_base = map.changes_since(0);
        assert_eq!(*from_base.current_epoch(), 2);
        assert!(!from_base.is_sync_all());
        let (updates, deletes) = from_base.parts();
        assert_eq!(updates.len(), 2);
        assert_eq!(deletes.len(), 0);

        // from the middle epoch: only the entry written at epoch 2 comes back
        let from_middle = map.changes_since(1);
        assert_eq!(*from_middle.current_epoch(), 2);
        assert!(!from_middle.is_sync_all());
        let (updates, deletes) = from_middle.parts();
        assert_eq!(updates.len(), 1);
        assert_eq!(deletes.len(), 0);
    }
}