fluvio_stream_model/epoch/
epoch_map.rs1use std::ops::Deref;
2use std::ops::DerefMut;
3use std::hash::Hash;
4use std::hash::Hasher;
5use std::cmp::Eq;
6use std::cmp::PartialEq;
7use std::fmt;
8
9pub type Epoch = i64;
10
11#[derive(Debug, Default, Clone)]
14pub struct EpochCounter<T> {
15 epoch: Epoch,
16 inner: T,
17}
18
19impl<T> Hash for EpochCounter<T>
20where
21 T: Hash,
22{
23 fn hash<H: Hasher>(&self, state: &mut H) {
24 self.inner.hash(state);
25 }
26}
27
28impl<T> PartialEq for EpochCounter<T>
29where
30 T: PartialEq,
31{
32 fn eq(&self, other: &Self) -> bool {
33 self.inner == other.inner
34 }
35}
36
37impl<T> Eq for EpochCounter<T> where T: Eq {}
38
39impl<T> Deref for EpochCounter<T> {
40 type Target = T;
41
42 fn deref(&self) -> &Self::Target {
43 &self.inner
44 }
45}
46
47impl<T> DerefMut for EpochCounter<T> {
48 fn deref_mut(&mut self) -> &mut Self::Target {
49 &mut self.inner
50 }
51}
52
53impl<T> fmt::Display for EpochCounter<T> {
54 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
55 write!(f, "epoch: {}", self.epoch)
56 }
57}
58
59impl<T> From<T> for EpochCounter<T> {
60 fn from(inner: T) -> Self {
61 Self { epoch: 0, inner }
62 }
63}
64
65impl<T> EpochCounter<T> {
66 pub fn new(inner: T) -> Self {
67 Self { epoch: 0, inner }
68 }
69
70 pub fn new_with_epoch(inner: T, epoch: impl Into<i64>) -> Self {
71 Self {
72 epoch: epoch.into(),
73 inner,
74 }
75 }
76
77 pub fn inner(&self) -> &T {
78 &self.inner
79 }
80
81 pub fn inner_mut(&mut self) -> &mut T {
82 &mut self.inner
83 }
84
85 pub fn inner_owned(self) -> T {
86 self.inner
87 }
88
89 pub fn epoch(&self) -> Epoch {
90 self.epoch
91 }
92
93 fn set_epoch(&mut self, epoch: Epoch) {
94 self.epoch = epoch;
95 }
96
97 pub fn increment(&mut self) {
98 self.epoch += 1;
99 }
100
101 pub fn decrement(&mut self) {
102 self.epoch -= 1;
103 }
104}
105
106pub use old_map::*;
107
108mod old_map {
109
110 use std::collections::HashMap;
111 use std::hash::Hash;
112 use std::borrow::Borrow;
113
114 use super::*;
115
116 #[derive(Debug, Default)]
120 pub struct EpochMap<K, V> {
121 epoch: EpochCounter<()>,
122 fence: EpochCounter<()>, map: HashMap<K, EpochCounter<V>>,
124 deleted: Vec<EpochCounter<V>>,
125 }
126
127 impl<K, V> Deref for EpochMap<K, V> {
128 type Target = HashMap<K, EpochCounter<V>>;
129
130 fn deref(&self) -> &Self::Target {
131 &self.map
132 }
133 }
134
135 impl<K, V> DerefMut for EpochMap<K, V> {
136 fn deref_mut(&mut self) -> &mut Self::Target {
137 &mut self.map
138 }
139 }
140
141 impl<K, V> EpochMap<K, V> {
142 pub fn increment_epoch(&mut self) {
143 self.epoch.increment();
144 }
145
146 pub fn epoch(&self) -> Epoch {
147 self.epoch.epoch()
148 }
149
150 pub fn mark_fence(&mut self) {
153 self.deleted = vec![];
154 self.fence = self.epoch.clone();
155 }
156 }
157
158 impl<K, V> EpochMap<K, V>
159 where
160 K: Eq + Hash,
161 {
162 pub fn new() -> Self {
163 Self::new_with_map(HashMap::new())
164 }
165
166 pub fn new_with_map(map: HashMap<K, EpochCounter<V>>) -> Self {
167 Self {
168 epoch: EpochCounter::default(),
169 fence: EpochCounter::default(),
170 map,
171 deleted: vec![],
172 }
173 }
174
175 pub fn insert(&mut self, key: K, value: V) -> Option<EpochCounter<V>>
178 where
179 K: Clone,
180 {
181 let mut epoch_value: EpochCounter<V> = value.into();
182 epoch_value.set_epoch(self.epoch.epoch());
183 self.map.insert(key, epoch_value)
184 }
185
186 pub fn remove<Q>(&mut self, k: &Q) -> Option<EpochCounter<V>>
189 where
190 K: Borrow<Q>,
191 Q: ?Sized + Hash + Eq,
192 V: Clone,
193 {
194 if let Some((_, mut old_value)) = self.map.remove_entry(k) {
195 old_value.set_epoch(self.epoch.epoch());
196 self.deleted.push(old_value.clone());
197 Some(old_value)
198 } else {
199 None
200 }
201 }
202 }
203
204 impl<K, V> EpochMap<K, V>
205 where
206 K: Clone,
207 {
208 pub fn clone_keys(&self) -> Vec<K> {
209 self.keys().cloned().collect()
210 }
211 }
212
213 impl<K, V> EpochMap<K, V>
214 where
215 V: Clone,
216 K: Clone,
217 {
218 pub fn clone_values(&self) -> Vec<V> {
219 self.values().cloned().map(|c| c.inner_owned()).collect()
220 }
221
222 pub fn changes_since<E>(&self, epoch_value: E) -> EpochChanges<V>
227 where
228 Epoch: From<E>,
229 {
230 let epoch = epoch_value.into();
231 if epoch < self.fence.epoch() {
232 return EpochChanges {
233 epoch: self.epoch.epoch(),
234 changes: EpochDeltaChanges::SyncAll(self.clone_values()),
235 };
236 }
237
238 if epoch == self.epoch() {
239 return EpochChanges {
240 epoch: self.epoch.epoch(),
241 changes: EpochDeltaChanges::empty(),
242 };
243 }
244
245 let updates = self
246 .values()
247 .filter_map(|v| {
248 if v.epoch > epoch {
249 Some(v.inner().clone())
250 } else {
251 None
252 }
253 })
254 .collect();
255
256 let deletes = self
257 .deleted
258 .iter()
259 .filter_map(|d| {
260 if d.epoch > epoch {
261 Some(d.inner().clone())
262 } else {
263 None
264 }
265 })
266 .collect();
267
268 EpochChanges {
269 epoch: self.epoch.epoch(),
270 changes: EpochDeltaChanges::Changes((updates, deletes)),
271 }
272 }
273 }
274
275 pub struct EpochChanges<V> {
276 pub epoch: Epoch,
278 changes: EpochDeltaChanges<V>,
279 }
280
281 impl<V: fmt::Debug> fmt::Debug for EpochChanges<V> {
282 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
283 f.debug_struct("EpochChanges")
284 .field("epoch", &self.epoch)
285 .field("changes", &self.changes)
286 .finish()
287 }
288 }
289
290 impl<V> EpochChanges<V> {
291 pub fn new(epoch: Epoch, changes: EpochDeltaChanges<V>) -> Self {
292 Self { epoch, changes }
293 }
294
295 pub fn current_epoch(&self) -> &Epoch {
297 &self.epoch
298 }
299
300 pub fn parts(self) -> (Vec<V>, Vec<V>) {
303 match self.changes {
304 EpochDeltaChanges::SyncAll(all) => (all, vec![]),
305 EpochDeltaChanges::Changes(changes) => changes,
306 }
307 }
308
309 pub fn is_empty(&self) -> bool {
310 match &self.changes {
311 EpochDeltaChanges::SyncAll(all) => all.is_empty(),
312 EpochDeltaChanges::Changes(changes) => changes.0.is_empty() && changes.1.is_empty(),
313 }
314 }
315
316 pub fn is_sync_all(&self) -> bool {
318 match &self.changes {
319 EpochDeltaChanges::SyncAll(_) => true,
320 EpochDeltaChanges::Changes(_) => false,
321 }
322 }
323 }
324
325 pub enum EpochDeltaChanges<V> {
326 SyncAll(Vec<V>),
327 Changes((Vec<V>, Vec<V>)),
328 }
329
330 impl<V> EpochDeltaChanges<V> {
331 pub fn empty() -> Self {
332 Self::Changes((vec![], vec![]))
333 }
334 }
335
336 impl<V: fmt::Debug> fmt::Debug for EpochDeltaChanges<V> {
337 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
338 match self {
339 Self::SyncAll(all) => f.debug_tuple("SyncAll").field(all).finish(),
340 Self::Changes((add, del)) => {
341 f.debug_tuple("Changes").field(add).field(del).finish()
342 }
343 }
344 }
345 }
346}
347
348#[cfg(test)]
349mod test {
350
351 use std::fmt::Display;
352
353 use serde::{Serialize, Deserialize};
354
355 use crate::core::{Spec, Status};
356 use crate::store::DefaultMetadataObject;
357
358 use super::EpochMap;
359
360 #[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
362 struct TestSpec {
363 replica: u16,
364 }
365
366 impl Spec for TestSpec {
367 const LABEL: &'static str = "Test";
368 type IndexKey = String;
369 type Owner = Self;
370 type Status = TestStatus;
371 }
372
373 #[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
374 struct TestStatus {
375 up: bool,
376 }
377
378 impl Status for TestStatus {}
379
380 impl Display for TestStatus {
381 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
382 write!(f, "{self:?}")
383 }
384 }
385
386 type DefaultTest = DefaultMetadataObject<TestSpec>;
387
388 type TestEpochMap = EpochMap<String, DefaultTest>;
389
390 #[test]
391 fn test_epoch_map_empty() {
392 let map = TestEpochMap::new();
393 assert_eq!(map.epoch(), 0);
394 }
395
396 #[test]
397 fn test_epoch_map_insert() {
398 let mut map = TestEpochMap::new();
399
400 map.increment_epoch();
404
405 let test1 = DefaultTest::with_key("t1");
406 map.insert(test1.key_owned(), test1);
407
408 assert_eq!(map.epoch(), 1);
409
410 {
412 let changes = map.changes_since(-1);
413 assert_eq!(*changes.current_epoch(), 1); assert!(changes.is_sync_all()); let (updates, deletes) = changes.parts();
417 assert_eq!(updates.len(), 1);
418 assert_eq!(deletes.len(), 0);
419 }
420
421 {
423 let changes = map.changes_since(0);
424 assert_eq!(*changes.current_epoch(), 1); assert!(!changes.is_sync_all()); let (updates, deletes) = changes.parts();
428 assert_eq!(updates.len(), 1);
429 assert_eq!(deletes.len(), 0);
430 }
431
432 {
434 let changes = map.changes_since(1);
435 assert_eq!(*changes.current_epoch(), 1); assert!(!changes.is_sync_all()); let (updates, deletes) = changes.parts();
438 assert_eq!(updates.len(), 0);
439 assert_eq!(deletes.len(), 0);
440 }
441 }
442
443 #[test]
444 fn test_epoch_map_insert_update() {
445 let mut map = TestEpochMap::new();
446
447 let test1 = DefaultTest::with_key("t1");
448 let test2 = test1.clone();
449 let test3 = DefaultTest::with_key("t2");
450
451 map.increment_epoch();
453 map.insert(test1.key_owned(), test1);
454 map.insert(test3.key_owned(), test3);
455
456 map.increment_epoch();
458 map.insert(test2.key_owned(), test2);
459
460 assert_eq!(map.epoch(), 2);
461
462 {
464 let changes = map.changes_since(0);
465 assert_eq!(*changes.current_epoch(), 2);
466 assert!(!changes.is_sync_all());
467
468 let (updates, deletes) = changes.parts();
469 assert_eq!(updates.len(), 2);
470 assert_eq!(deletes.len(), 0);
471 }
472
473 {
475 let changes = map.changes_since(1);
476 assert_eq!(*changes.current_epoch(), 2);
477 assert!(!changes.is_sync_all());
478 let (updates, deletes) = changes.parts();
479 assert_eq!(updates.len(), 1);
480 assert_eq!(deletes.len(), 0);
481 }
482 }
483}