1use std::sync::Arc;
2use std::fmt::Debug;
3use std::fmt::Display;
4use std::borrow::Borrow;
5use std::collections::HashMap;
6use std::hash::Hash;
7
8use tracing::trace;
9use tracing::{debug, error};
10use async_lock::RwLock;
11use async_lock::RwLockReadGuard;
12use async_lock::RwLockWriteGuard;
13
14use crate::core::{MetadataItem, Spec};
15
16use super::MetadataStoreObject;
17use super::{DualEpochMap, DualEpochCounter, Epoch, EpochChanges};
18use super::actions::LSUpdate;
19use super::event::EventPublisher;
20
21pub use listener::ChangeListener;
/// Epoch-tagged set of changes over [`MetadataStoreObject`]s, as returned by the
/// `sync_*` methods of [`ChangeListener`].
pub type MetadataChanges<S, C> = EpochChanges<MetadataStoreObject<S, C>>;
23
/// In-memory store of metadata objects for spec `S` with metadata context `C`.
///
/// Objects are kept in a [`DualEpochMap`] keyed by `S::IndexKey` behind an async
/// [`RwLock`]; new epochs are announced through the [`EventPublisher`].
#[derive(Debug)]
pub struct LocalStore<S, C>
where
    S: Spec,
    C: MetadataItem,
{
    /// Epoch-tracked map holding every stored object.
    store: RwLock<DualEpochMap<S::IndexKey, MetadataStoreObject<S, C>>>,
    /// Publisher that is handed the new epoch after `sync_all` / `apply_changes`.
    event_publisher: Arc<EventPublisher>,
}
38
39impl<S, C> Default for LocalStore<S, C>
40where
41 S: Spec,
42 C: MetadataItem,
43{
44 fn default() -> Self {
45 Self {
46 store: RwLock::new(DualEpochMap::new()),
47 event_publisher: EventPublisher::shared(),
48 }
49 }
50}
51
52impl<S, C> LocalStore<S, C>
53where
54 S: Spec,
55 C: MetadataItem,
56{
57 pub fn bulk_new(objects: Vec<impl Into<MetadataStoreObject<S, C>>>) -> Self {
59 let obj: Vec<MetadataStoreObject<S, C>> = objects.into_iter().map(|s| s.into()).collect();
60 let mut map = HashMap::new();
61 for obj in obj {
62 map.insert(obj.key.clone(), obj.into());
63 }
64 Self {
65 store: RwLock::new(DualEpochMap::new_with_map(map)),
66 event_publisher: EventPublisher::shared(),
67 }
68 }
69
70 pub fn new_shared() -> Arc<Self> {
72 Arc::new(Self::default())
73 }
74
75 #[inline(always)]
77 pub async fn read(
78 &'_ self,
79 ) -> RwLockReadGuard<'_, DualEpochMap<S::IndexKey, MetadataStoreObject<S, C>>> {
80 self.store.read().await
81 }
82
83 #[inline(always)]
85 async fn write(
86 &'_ self,
87 ) -> RwLockWriteGuard<'_, DualEpochMap<S::IndexKey, MetadataStoreObject<S, C>>> {
88 self.store.write().await
89 }
90
91 pub async fn epoch(&self) -> i64 {
93 self.read().await.epoch()
94 }
95
96 pub fn init_epoch(&self) -> DualEpochCounter<()> {
99 DualEpochCounter::default()
100 }
101
102 pub async fn value<K>(&self, key: &K) -> Option<DualEpochCounter<MetadataStoreObject<S, C>>>
104 where
105 S::IndexKey: Borrow<K>,
106 K: ?Sized + Eq + Hash,
107 {
108 self.read().await.get(key).cloned()
109 }
110
111 pub async fn spec<K>(&self, key: &K) -> Option<S>
113 where
114 S::IndexKey: Borrow<K>,
115 K: ?Sized + Eq + Hash,
116 {
117 self.read().await.get(key).map(|value| value.spec.clone())
118 }
119
120 pub async fn find_and_do<K, F>(&self, key: &K, mut func: F) -> Option<()>
122 where
123 F: FnMut(&'_ MetadataStoreObject<S, C>),
124 K: Eq + Hash,
125 S::IndexKey: Borrow<K>,
126 {
127 if let Some(value) = self.read().await.get(key) {
128 func(value);
129 Some(())
130 } else {
131 None
132 }
133 }
134
135 pub async fn contains_key<K>(&self, key: &K) -> bool
136 where
137 S::IndexKey: Borrow<K>,
138 K: ?Sized + Eq + Hash,
139 {
140 self.read().await.contains_key(key)
141 }
142
143 pub async fn count(&self) -> usize {
144 self.read().await.len()
145 }
146
147 pub async fn clone_specs(&self) -> Vec<S> {
148 self.read()
149 .await
150 .values()
151 .map(|kv| kv.spec.clone())
152 .collect()
153 }
154
155 pub async fn clone_keys(&self) -> Vec<S::IndexKey> {
156 self.read().await.clone_keys()
157 }
158
159 pub async fn clone_values(&self) -> Vec<MetadataStoreObject<S, C>> {
160 self.read().await.clone_values()
161 }
162
163 pub fn event_publisher(&self) -> &EventPublisher {
164 &self.event_publisher
165 }
166
167 pub fn change_listener(self: &Arc<Self>) -> ChangeListener<S, C> {
169 ChangeListener::new(self.clone())
170 }
171
172 pub async fn wait_for_first_change(self: &Arc<Self>) {
174 self.change_listener().listen().await;
175 }
176}
177
178impl<S, C> Display for LocalStore<S, C>
179where
180 S: Spec,
181 C: MetadataItem,
182{
183 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
184 write!(f, "{} Store", S::LABEL)
185 }
186}
187
188pub struct SyncStatus {
189 pub epoch: Epoch,
190 pub add: i32,
191 pub update_spec: i32,
192 pub update_status: i32,
193 pub update_meta: i32,
194 pub delete: i32,
195}
196
197impl SyncStatus {
198 pub fn has_spec_changes(&self) -> bool {
199 self.add > 0 || self.update_spec > 0 || self.delete > 0
200 }
201
202 pub fn has_status_changes(&self) -> bool {
203 self.update_status > 0
204 }
205}
206
207impl<S, C> LocalStore<S, C>
208where
209 S: Spec,
210 C: MetadataItem,
211{
212 pub async fn sync_all(&self, incoming_changes: Vec<MetadataStoreObject<S, C>>) -> SyncStatus {
217 let (mut add, mut update_spec, mut update_status, mut update_meta, mut delete) =
218 (0, 0, 0, 0, 0);
219
220 let mut write_guard = self.write().await;
221
222 debug!(
223 "SyncAll: <{}> epoch: {} incoming {}",
224 S::LABEL,
225 write_guard.epoch(),
226 incoming_changes.len()
227 );
228
229 let mut local_keys = write_guard.clone_keys();
230 write_guard.increment_epoch();
232
233 for source in incoming_changes {
234 let key = source.key().clone();
235
236 if let Some(diff) = write_guard.update(key.clone(), source) {
238 if diff.spec {
239 update_spec += 1;
240 }
241 if diff.status {
242 update_status += 1;
243 }
244 if diff.meta {
245 update_meta += 1;
246 }
247 } else {
248 add += 1;
249 }
250
251 local_keys.retain(|n| n != &key);
252 }
253
254 for name in local_keys.into_iter() {
256 if write_guard.contains_key(&name) {
257 if write_guard.remove(&name).is_some() {
258 delete += 1;
259 } else {
260 error!("delete should never fail since key exists: {:#?}", name);
261 }
262 } else {
263 error!("kv unexpectedly removed... skipped {:#?}", name);
264 }
265 }
266
267 write_guard.mark_fence();
268
269 let epoch = write_guard.epoch();
270
271 let status = SyncStatus {
272 epoch,
273 add,
274 update_spec,
275 update_status,
276 update_meta,
277 delete,
278 };
279
280 drop(write_guard);
281
282 self.event_publisher.store_change(epoch);
283
284 debug!(
285 "Sync all: <{}:{}> [add:{}, mod_spec:{}, mod_status: {}, mod_meta: {}, del:{}], ",
286 S::LABEL,
287 epoch,
288 add,
289 update_spec,
290 update_status,
291 update_meta,
292 delete,
293 );
294 status
295 }
296
297 pub async fn apply_changes(&self, changes: Vec<LSUpdate<S, C>>) -> Option<SyncStatus> {
304 let (mut add, mut update_spec, mut update_status, mut update_meta, mut delete) =
305 (0, 0, 0, 0, 0);
306 let mut write_guard = self.write().await;
307 write_guard.increment_epoch();
308
309 debug!(
310 "apply changes <{}> new epoch: {}, incoming: {} items",
311 S::LABEL,
312 write_guard.epoch(),
313 changes.len(),
314 );
315
316 for change in changes.into_iter() {
318 match change {
319 LSUpdate::Mod(new_kv_value) => {
320 let key = new_kv_value.key_owned();
321
322 if let Some(diff) = write_guard.update(key, new_kv_value) {
323 if diff.spec {
324 update_spec += 1;
325 }
326 if diff.status {
327 update_status += 1;
328 }
329 if diff.meta {
330 update_meta += 1;
331 }
332 trace!(update_spec, update_status, update_meta, "update metrics");
333 } else {
334 trace!("new");
335 add += 1;
337 }
338 }
339 LSUpdate::Delete(key) => {
340 write_guard.remove(&key);
341 delete += 1;
342 }
343 }
344 }
345
346 if add == 0 && update_spec == 0 && update_status == 0 && delete == 0 && update_meta == 0 {
348 write_guard.decrement_epoch();
349
350 debug!(
351 "Apply changes: {} no changes, reverting back epoch to: {}",
352 S::LABEL,
353 write_guard.epoch()
354 );
355
356 return None;
357 }
358
359 let epoch = write_guard.epoch();
360
361 let status = SyncStatus {
362 epoch,
363 add,
364 update_spec,
365 update_status,
366 update_meta,
367 delete,
368 };
369
370 drop(write_guard);
371
372 debug!("notify epoch changed: {}", epoch);
373 self.event_publisher.store_change(epoch);
374
375 debug!(
376 "Apply changes {} [add:{},mod_spec:{},mod_status: {},mod_update: {}, del:{},epoch: {}",
377 S::LABEL,
378 add,
379 update_spec,
380 update_status,
381 update_meta,
382 delete,
383 epoch,
384 );
385 Some(status)
386 }
387}
388
389mod listener {
390
391 use std::fmt;
392 use std::sync::Arc;
393
394 use tracing::{trace, debug, instrument};
395
396 use crate::store::event::EventPublisher;
397 use crate::store::{
398 ChangeFlag, FULL_FILTER, META_FILTER, MetadataStoreObject, SPEC_FILTER, STATUS_FILTER,
399 };
400
401 use super::{LocalStore, Spec, MetadataItem, MetadataChanges};
402
    /// Cursor over the change history of a [`LocalStore`].
    ///
    /// Holds a shared handle to the store and the change number up to which this
    /// listener has already consumed changes.
    pub struct ChangeListener<S, C>
    where
        S: Spec,
        C: MetadataItem,
    {
        /// Store being observed.
        store: Arc<LocalStore<S, C>>,
        /// Last change number this listener has synced to (starts at 0).
        last_change: i64,
    }
412
413 impl<S, C> fmt::Debug for ChangeListener<S, C>
414 where
415 S: Spec,
416 C: MetadataItem,
417 {
418 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
419 write!(
420 f,
421 "{} last:{},current:{}",
422 S::LABEL,
423 self.last_change,
424 self.event_publisher().current_change()
425 )
426 }
427 }
428
429 impl<S, C> ChangeListener<S, C>
430 where
431 S: Spec,
432 C: MetadataItem,
433 {
434 pub fn new(store: Arc<LocalStore<S, C>>) -> Self {
435 Self {
436 store,
437 last_change: 0,
438 }
439 }
440
441 #[inline]
442 fn event_publisher(&self) -> &EventPublisher {
443 self.store.event_publisher()
444 }
445
446 #[inline]
450 pub fn has_change(&self) -> bool {
451 self.event_publisher().current_change() > self.last_change
452 }
453
454 #[inline(always)]
456 pub fn load_last(&mut self) {
457 self.set_last_change(self.event_publisher().current_change());
458 }
459
        /// Overwrites the last change number recorded by this listener.
        #[inline(always)]
        pub fn set_last_change(&mut self, updated_change: i64) {
            self.last_change = updated_change;
        }
464
        /// Returns the last change number recorded by this listener.
        #[inline]
        pub fn last_change(&self) -> i64 {
            self.last_change
        }
469
470 pub fn current_change(&self) -> i64 {
471 self.event_publisher().current_change()
472 }
473
        /// Waits until there is a change this listener has not consumed yet.
        ///
        /// Returns immediately when `has_change()` is already true; otherwise
        /// awaits the publisher's listener. `last_change` is not modified here.
        pub async fn listen(&self) {
            // Fast path: a change is already pending.
            if self.has_change() {
                trace!("before has change: {}", self.last_change());
                return;
            }

            // Obtain the publisher's listener first, then check again —
            // presumably so that a change published between the first check and
            // this point is not missed.
            let listener = self.event_publisher().listen();

            if self.has_change() {
                trace!("after has change: {}", self.last_change());
                return;
            }

            trace!("waiting for publisher");

            listener.await;

            trace!("new change: {}", self.current_change());
        }
493
494 pub async fn sync_changes(&mut self) -> MetadataChanges<S, C> {
496 self.sync_changes_with_filter(&FULL_FILTER).await
497 }
498
499 pub async fn sync_spec_changes(&mut self) -> MetadataChanges<S, C> {
501 self.sync_changes_with_filter(&SPEC_FILTER).await
502 }
503
504 pub async fn sync_status_changes(&mut self) -> MetadataChanges<S, C> {
506 self.sync_changes_with_filter(&STATUS_FILTER).await
507 }
508
509 pub async fn sync_meta_changes(&mut self) -> MetadataChanges<S, C> {
511 self.sync_changes_with_filter(&META_FILTER).await
512 }
513
514 pub async fn sync_changes_with_filter(
516 &mut self,
517 filter: &ChangeFlag,
518 ) -> MetadataChanges<S, C> {
519 let read_guard = self.store.read().await;
520 let changes = read_guard.changes_since_with_filter(self.last_change, filter);
521 drop(read_guard);
522 trace!(
523 "finding last status change: {}, from: {}",
524 self.last_change, changes.epoch
525 );
526
527 let current_epoch = self.event_publisher().current_change();
528 if changes.epoch > current_epoch {
529 trace!(
530 "latest epoch: {} > spec epoch: {}",
531 changes.epoch, current_epoch
532 );
533 }
534 self.set_last_change(changes.epoch);
535 changes
536 }
537
538 #[instrument()]
540 pub async fn wait_for_initial_sync(&mut self) -> Vec<MetadataStoreObject<S, C>> {
541 debug!("waiting");
542 self.listen().await;
543
544 let changes = self.sync_changes().await;
545 assert!(changes.is_sync_all());
546
547 debug!("finished initial sync");
548 changes.parts().0
549 }
550 }
551}
552
// Gated on the `fixture` feature, the same feature the sibling `test_notify`
// module uses and the one matching the `crate::fixture` module these tests import.
#[cfg(test)]
#[cfg(feature = "fixture")]
mod test {

    use crate::store::actions::LSUpdate;
    use crate::fixture::{TestSpec, TestStatus, DefaultTest, TestMeta};

    use super::LocalStore;

    type DefaultTestStore = LocalStore<TestSpec, TestMeta>;

    /// `sync_all` always advances the epoch and reports adds / spec updates.
    #[fluvio_future::test]
    async fn test_store_sync_all() {
        let tests = vec![DefaultTest::with_spec("t1", TestSpec::default())];
        let test_store = DefaultTestStore::default();
        assert_eq!(test_store.epoch().await, 0);

        // First sync: the object is new.
        let sync1 = test_store.sync_all(tests.clone()).await;
        assert_eq!(test_store.epoch().await, 1);
        assert_eq!(sync1.add, 1);
        assert_eq!(sync1.delete, 0);
        assert_eq!(sync1.update_spec, 0);
        assert_eq!(sync1.update_status, 0);

        let read_guard = test_store.read().await;
        let test1 = read_guard.get("t1").expect("t1 should exists");
        assert_eq!(test1.status_epoch(), 1);
        assert_eq!(test1.spec_epoch(), 1);
        drop(read_guard);

        // Second sync: same key with a different spec counts as a spec update.
        let spec_changes =
            vec![DefaultTest::with_spec("t1", TestSpec { replica: 6 }).with_context(2)];
        let sync2 = test_store.sync_all(spec_changes.clone()).await;
        assert_eq!(test_store.epoch().await, 2);
        assert_eq!(sync2.add, 0);
        assert_eq!(sync2.delete, 0);
        assert_eq!(sync2.update_spec, 1);
        assert_eq!(sync2.update_status, 0);

        // Third sync: identical content, nothing is counted but the epoch still
        // advances.
        let sync3 = test_store.sync_all(spec_changes.clone()).await;
        assert_eq!(test_store.epoch().await, 3);
        assert_eq!(sync3.add, 0);
        assert_eq!(sync3.delete, 0);
        assert_eq!(sync3.update_spec, 0);
        assert_eq!(sync3.update_status, 0);
    }

    /// `apply_changes` only advances the epoch when something actually changed.
    #[fluvio_future::test]
    async fn test_store_update() {
        let initial_topic = DefaultTest::with_spec("t1", TestSpec::default()).with_context(2);

        let topic_store = DefaultTestStore::default();
        let _ = topic_store.sync_all(vec![initial_topic.clone()]).await;
        assert_eq!(topic_store.epoch().await, 1);

        // Applying the identical object is a no-op: no status, epoch unchanged.
        assert!(
            topic_store
                .apply_changes(vec![LSUpdate::Mod(initial_topic.clone())])
                .await
                .is_none()
        );
        assert_eq!(topic_store.epoch().await, 1);

        // A changed status with a newer context revision is applied.
        let topic2 =
            DefaultTest::new("t1", TestSpec::default(), TestStatus { up: true }).with_context(3);
        let changes = topic_store
            .apply_changes(vec![LSUpdate::Mod(topic2)])
            .await
            .expect("some changes");
        assert_eq!(changes.update_spec, 0);
        assert_eq!(changes.update_status, 1);
        assert_eq!(topic_store.epoch().await, 2);
        assert_eq!(
            topic_store.value("t1").await.expect("t1").ctx().item().rev,
            3
        );

        // Re-applying the older revision (rev 2) must not overwrite the store.
        assert_eq!(initial_topic.ctx().item().rev, 2);
        let changes = topic_store
            .apply_changes(vec![LSUpdate::Mod(initial_topic.clone())])
            .await;
        assert_eq!(topic_store.epoch().await, 2);
        assert!(changes.is_none());
        assert_eq!(
            topic_store.value("t1").await.expect("t1").status,
            TestStatus { up: true }
        );

        // `sync_all` with the older object: epoch advances, nothing is counted.
        let sync_all = topic_store.sync_all(vec![initial_topic]).await;
        assert_eq!(topic_store.epoch().await, 3);
        assert_eq!(sync_all.add, 0);
        assert_eq!(sync_all.delete, 0);
        assert_eq!(sync_all.update_spec, 0);
        assert_eq!(sync_all.update_status, 0);
    }
}
657
658#[cfg(test)]
659#[cfg(feature = "fixture")]
660mod test_notify {
661
662 use std::sync::Arc;
663 use std::time::Duration;
664 use std::sync::atomic::{AtomicI64, AtomicBool};
665 use std::sync::atomic::Ordering::SeqCst;
666
667 use tokio::select;
668 use tracing::debug;
669
670 use fluvio_future::task::{spawn, spawn_task, Task};
671 use fluvio_future::timer::sleep;
672
673 use crate::core::{Spec, MetadataItem};
674 use crate::store::actions::LSUpdate;
675 use crate::store::event::SimpleEvent;
676 use crate::fixture::{TestSpec, DefaultTest, TestMeta};
677
678 use super::LocalStore;
679
680 type DefaultTestStore = LocalStore<TestSpec, TestMeta>;
681
682 use super::ChangeListener;
683
    /// Minimal controller used by the notification tests: it follows a store's
    /// spec changes until told to shut down.
    struct TestController {
        /// Store whose changes are consumed.
        store: Arc<DefaultTestStore>,
        /// Event that ends the dispatch loop.
        shutdown: Arc<SimpleEvent>,
        /// Counter incremented once per completed `sync`.
        last_change: Arc<AtomicI64>,
    }
689
    impl TestController {
        /// Builds a controller and spawns its dispatch loop as a background task.
        fn start(
            ctx: Arc<DefaultTestStore>,
            shutdown: Arc<SimpleEvent>,
            last_change: Arc<AtomicI64>,
        ) {
            let controller = Self {
                store: ctx,
                shutdown,
                last_change,
            };

            spawn(controller.dispatch_loop());
        }

        /// Syncs, then waits for either the next store change or shutdown;
        /// repeats until shutdown is signalled.
        async fn dispatch_loop(mut self) {
            debug!("entering loop");

            let mut spec_listener = self.store.change_listener();

            loop {
                self.sync(&mut spec_listener).await;

                select! {
                    _ = spec_listener.listen() => {
                        debug!("spec change occur: {}",spec_listener.last_change());
                        continue;
                    },
                    _ = self.shutdown.listen() => {
                        debug!("shutdown");
                        break;
                    }
                }
            }
        }

        /// Consumes pending spec changes, sleeps 10 ms, then increments the
        /// sync counter.
        async fn sync(&mut self, spec_listener: &mut ChangeListener<TestSpec, TestMeta>) {
            debug!("sync start");
            let (update, _delete) = spec_listener.sync_spec_changes().await.parts();
            debug!("changes: {}", update.len());
            // Simulated work, so store updates can arrive while a sync is running.
            sleep(Duration::from_millis(10)).await;
            debug!("sync end");
            self.last_change.fetch_add(1, SeqCst);
        }
    }
736
    /// Runs a controller against a store that receives one `sync_all` followed
    /// by ten single-object `apply_changes`, then shuts the controller down.
    /// The test makes no assertions; it passes when nothing panics or hangs.
    #[fluvio_future::test]
    async fn test_store_notifications() {
        let topic_store = Arc::new(DefaultTestStore::default());
        let last_change = Arc::new(AtomicI64::new(0));
        let shutdown = SimpleEvent::shared();

        TestController::start(topic_store.clone(), shutdown.clone(), last_change.clone());

        let initial_topic = DefaultTest::with_spec("t1", TestSpec::default()).with_context(2);
        let _ = topic_store.sync_all(vec![initial_topic.clone()]).await;

        for i in 0..10u16 {
            // Updates are spaced 2 ms apart, shorter than the controller's 10 ms sync.
            sleep(Duration::from_millis(2)).await;
            let topic_name = format!("topic{i}");
            debug!("creating topic: {}", topic_name);
            let topic = DefaultTest::with_spec(topic_name, TestSpec::default()).with_context(3);
            let _ = topic_store.apply_changes(vec![LSUpdate::Mod(topic)]).await;
        }

        // Give the controller time to catch up before shutting it down.
        sleep(Duration::from_millis(100)).await;
        shutdown.notify();
        sleep(Duration::from_millis(1)).await;

    }
    /// With no change published, `listen()` must stay pending: the 5 ms timer
    /// has to win the `select!`.
    #[fluvio_future::test]
    async fn test_change_listener_non_blocking() {
        let mut timer = sleep(Duration::from_millis(5));
        let store = Arc::new(DefaultTestStore::default());
        let listener = store.change_listener();

        select! {

            _ = listener.listen() => {
                panic!("test failed");
            },
            _ = &mut timer => {
                // Timer fired first: the listener did not resolve, as expected.
            }

        }
    }
781
782 #[test]
783 fn test_wait_for_first_change_assumptions() {
784 let topic_store = Arc::new(DefaultTestStore::default());
785
786 assert_eq!(0, topic_store.change_listener().current_change())
788 }
789
    /// `wait_for_first_change` must not complete before the first store change,
    /// and must complete promptly for listeners created afterwards.
    #[fluvio_future::test]
    async fn test_change_listener() {
        let topic_store = Arc::new(DefaultTestStore::default());
        let last_change = Arc::new(AtomicI64::new(0));
        let shutdown = SimpleEvent::shared();
        let topic_name = "topic";
        let initial_topic = DefaultTest::with_spec(topic_name, TestSpec::default());
        let has_been_updated = Arc::new(AtomicBool::default());

        // First batch starts waiting before any change exists.
        let jh = start_batch_of_test_listeners(topic_store.clone(), has_been_updated.clone());
        TestController::start(topic_store.clone(), shutdown.clone(), last_change.clone());

        // Set the flag before the change so woken listeners observe `true`.
        has_been_updated.store(true, std::sync::atomic::Ordering::Relaxed);
        let _ = topic_store.sync_all(vec![initial_topic.clone()]).await;

        for j in jh {
            j.await
        }

        // Second batch: a change already exists, so these must finish too.
        let jh = start_batch_of_test_listeners(topic_store.clone(), has_been_updated.clone());
        for j in jh {
            j.await
        }

        let topic = DefaultTest::with_spec(topic_name, TestSpec::default());

        let _ = topic_store.apply_changes(vec![LSUpdate::Mod(topic)]).await;

        // Third batch, started after the `apply_changes` call.
        let jh = start_batch_of_test_listeners(topic_store, has_been_updated);
        for j in jh {
            j.await
        }

        sleep(Duration::from_millis(100)).await;
        shutdown.notify();
        sleep(Duration::from_millis(1)).await;
    }
836
837 fn start_batch_of_test_listeners(
838 store: Arc<LocalStore<TestSpec, TestMeta>>,
839 has_been_updated: Arc<AtomicBool>,
840 ) -> Vec<Task<()>> {
841 (0..10u32)
842 .map(|_| {
844 let store = store.clone();
845
846 spawn_task(listener_thread(store, has_been_updated.clone()))
847 })
848 .collect()
849 }
850
    /// Waits for the store's first change, then asserts that the test had
    /// already set `has_been_updated`, i.e. the wait did not return early.
    async fn listener_thread<S, C>(store: Arc<LocalStore<S, C>>, has_been_updated: Arc<AtomicBool>)
    where
        S: Spec,
        C: MetadataItem,
    {
        store.wait_for_first_change().await;
        assert!(has_been_updated.load(std::sync::atomic::Ordering::Relaxed));
    }
860}