// fluvio_stream_model/store/dual_store.rs

1use std::sync::Arc;
2use std::fmt::Debug;
3use std::fmt::Display;
4use std::borrow::Borrow;
5use std::collections::HashMap;
6use std::hash::Hash;
7
8use tracing::trace;
9use tracing::{debug, error};
10use async_lock::RwLock;
11use async_lock::RwLockReadGuard;
12use async_lock::RwLockWriteGuard;
13
14use crate::core::{MetadataItem, Spec};
15
16use super::MetadataStoreObject;
17use super::{DualEpochMap, DualEpochCounter, Epoch, EpochChanges};
18use super::actions::LSUpdate;
19use super::event::EventPublisher;
20
21pub use listener::ChangeListener;
/// Epoch-tracked set of changes of metadata store objects.
pub type MetadataChanges<S, C> = EpochChanges<MetadataStoreObject<S, C>>;
23
/// Idempotent local memory cache of meta objects.
///
/// Only two write operations are permitted — `sync_all` and `apply_changes` —
/// and both are idempotent.
/// For reads, read guards are provided which expose the hash map API using deref.
/// Hash values are wrapped in `DualEpochCounter`, which is also deref.
/// An async lock is used to ensure reads/writes are thread safe.
#[derive(Debug)]
pub struct LocalStore<S, C>
where
    S: Spec,
    C: MetadataItem,
{
    // epoch-tracked map of key -> metadata object, guarded by an async RwLock
    store: RwLock<DualEpochMap<S::IndexKey, MetadataStoreObject<S, C>>>,
    // notifies listeners whenever the store's epoch advances
    event_publisher: Arc<EventPublisher>,
}
38
39impl<S, C> Default for LocalStore<S, C>
40where
41    S: Spec,
42    C: MetadataItem,
43{
44    fn default() -> Self {
45        Self {
46            store: RwLock::new(DualEpochMap::new()),
47            event_publisher: EventPublisher::shared(),
48        }
49    }
50}
51
52impl<S, C> LocalStore<S, C>
53where
54    S: Spec,
55    C: MetadataItem,
56{
57    /// initialize local stores with list of metadata objects
58    pub fn bulk_new(objects: Vec<impl Into<MetadataStoreObject<S, C>>>) -> Self {
59        let obj: Vec<MetadataStoreObject<S, C>> = objects.into_iter().map(|s| s.into()).collect();
60        let mut map = HashMap::new();
61        for obj in obj {
62            map.insert(obj.key.clone(), obj.into());
63        }
64        Self {
65            store: RwLock::new(DualEpochMap::new_with_map(map)),
66            event_publisher: EventPublisher::shared(),
67        }
68    }
69
70    /// create arc wrapper
71    pub fn new_shared() -> Arc<Self> {
72        Arc::new(Self::default())
73    }
74
75    /// Read guard
76    #[inline(always)]
77    pub async fn read(
78        &'_ self,
79    ) -> RwLockReadGuard<'_, DualEpochMap<S::IndexKey, MetadataStoreObject<S, C>>> {
80        self.store.read().await
81    }
82
83    /// write guard, this is private, use sync API to make changes
84    #[inline(always)]
85    async fn write(
86        &'_ self,
87    ) -> RwLockWriteGuard<'_, DualEpochMap<S::IndexKey, MetadataStoreObject<S, C>>> {
88        self.store.write().await
89    }
90
91    /// current epoch
92    pub async fn epoch(&self) -> i64 {
93        self.read().await.epoch()
94    }
95
96    /// initial epoch that should be used
97    /// store will always have epoch greater than init_epoch if there are any changes
98    pub fn init_epoch(&self) -> DualEpochCounter<()> {
99        DualEpochCounter::default()
100    }
101
102    /// copy of the value
103    pub async fn value<K>(&self, key: &K) -> Option<DualEpochCounter<MetadataStoreObject<S, C>>>
104    where
105        S::IndexKey: Borrow<K>,
106        K: ?Sized + Eq + Hash,
107    {
108        self.read().await.get(key).cloned()
109    }
110
111    /// copy spec
112    pub async fn spec<K>(&self, key: &K) -> Option<S>
113    where
114        S::IndexKey: Borrow<K>,
115        K: ?Sized + Eq + Hash,
116    {
117        self.read().await.get(key).map(|value| value.spec.clone())
118    }
119
120    /// iterate over entry
121    pub async fn find_and_do<K, F>(&self, key: &K, mut func: F) -> Option<()>
122    where
123        F: FnMut(&'_ MetadataStoreObject<S, C>),
124        K: Eq + Hash,
125        S::IndexKey: Borrow<K>,
126    {
127        if let Some(value) = self.read().await.get(key) {
128            func(value);
129            Some(())
130        } else {
131            None
132        }
133    }
134
135    pub async fn contains_key<K>(&self, key: &K) -> bool
136    where
137        S::IndexKey: Borrow<K>,
138        K: ?Sized + Eq + Hash,
139    {
140        self.read().await.contains_key(key)
141    }
142
143    pub async fn count(&self) -> usize {
144        self.read().await.len()
145    }
146
147    pub async fn clone_specs(&self) -> Vec<S> {
148        self.read()
149            .await
150            .values()
151            .map(|kv| kv.spec.clone())
152            .collect()
153    }
154
155    pub async fn clone_keys(&self) -> Vec<S::IndexKey> {
156        self.read().await.clone_keys()
157    }
158
159    pub async fn clone_values(&self) -> Vec<MetadataStoreObject<S, C>> {
160        self.read().await.clone_values()
161    }
162
163    pub fn event_publisher(&self) -> &EventPublisher {
164        &self.event_publisher
165    }
166
167    /// create new change listener
168    pub fn change_listener(self: &Arc<Self>) -> ChangeListener<S, C> {
169        ChangeListener::new(self.clone())
170    }
171
172    /// returns once there is at least one change recorded by the the event_publisher
173    pub async fn wait_for_first_change(self: &Arc<Self>) {
174        self.change_listener().listen().await;
175    }
176}
177
178impl<S, C> Display for LocalStore<S, C>
179where
180    S: Spec,
181    C: MetadataItem,
182{
183    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
184        write!(f, "{} Store", S::LABEL)
185    }
186}
187
188pub struct SyncStatus {
189    pub epoch: Epoch,
190    pub add: i32,
191    pub update_spec: i32,
192    pub update_status: i32,
193    pub update_meta: i32,
194    pub delete: i32,
195}
196
197impl SyncStatus {
198    pub fn has_spec_changes(&self) -> bool {
199        self.add > 0 || self.update_spec > 0 || self.delete > 0
200    }
201
202    pub fn has_status_changes(&self) -> bool {
203        self.update_status > 0
204    }
205}
206
impl<S, C> LocalStore<S, C>
where
    S: Spec,
    C: MetadataItem,
{
    /// Sync with incoming changes as the source of truth.
    ///
    /// Any local object whose key is not in the incoming list is deleted.
    /// After the sync operation, prior history is fenced off (`mark_fence`),
    /// so any subsequent change query returns the full list instead of deltas.
    /// Returns per-category counters plus the epoch stamped on this cycle.
    pub async fn sync_all(&self, incoming_changes: Vec<MetadataStoreObject<S, C>>) -> SyncStatus {
        let (mut add, mut update_spec, mut update_status, mut update_meta, mut delete) =
            (0, 0, 0, 0, 0);

        let mut write_guard = self.write().await;

        debug!(
            "SyncAll: <{}> epoch: {} incoming {}",
            S::LABEL,
            write_guard.epoch(),
            incoming_changes.len()
        );

        // snapshot of keys currently in the store; keys present in the incoming
        // set are pruned below, so whatever remains afterwards must be deleted
        let mut local_keys = write_guard.clone_keys();
        // start new epoch cycle
        write_guard.increment_epoch();

        for source in incoming_changes {
            let key = source.key().clone();

            // always insert, so we stamp current epoch
            if let Some(diff) = write_guard.update(key.clone(), source) {
                if diff.spec {
                    update_spec += 1;
                }
                if diff.status {
                    update_status += 1;
                }
                if diff.meta {
                    update_meta += 1;
                }
            } else {
                add += 1;
            }

            // this key exists in the incoming set, so keep it
            local_keys.retain(|n| n != &key);
        }

        // delete value that shouldn't be there
        for name in local_keys.into_iter() {
            if write_guard.contains_key(&name) {
                if write_guard.remove(&name).is_some() {
                    delete += 1;
                } else {
                    error!("delete  should never fail since key exists: {:#?}", name);
                }
            } else {
                error!("kv unexpectedly removed... skipped {:#?}", name);
            }
        }

        // fence history: change queries older than this point get a full sync
        write_guard.mark_fence();

        let epoch = write_guard.epoch();

        let status = SyncStatus {
            epoch,
            add,
            update_spec,
            update_status,
            update_meta,
            delete,
        };

        // release the write lock before notifying listeners
        drop(write_guard);

        self.event_publisher.store_change(epoch);

        debug!(
            "Sync all: <{}:{}> [add:{}, mod_spec:{}, mod_status: {}, mod_meta: {}, del:{}], ",
            S::LABEL,
            epoch,
            add,
            update_spec,
            update_status,
            update_meta,
            delete,
        );
        status
    }

    /// Apply changes to this store.
    ///
    /// If an item doesn't exist it is treated as an add; if it exists but
    /// differs it is treated as an update. The epoch is only incremented if
    /// there are actual changes, which makes this an idempotent operation:
    /// applying the same add twice results in only one epoch increase.
    /// Returns `None` when nothing changed.
    pub async fn apply_changes(&self, changes: Vec<LSUpdate<S, C>>) -> Option<SyncStatus> {
        let (mut add, mut update_spec, mut update_status, mut update_meta, mut delete) =
            (0, 0, 0, 0, 0);
        let mut write_guard = self.write().await;
        // optimistically start a new epoch; reverted below if nothing changed
        write_guard.increment_epoch();

        debug!(
            "apply changes <{}> new epoch: {}, incoming: {} items",
            S::LABEL,
            write_guard.epoch(),
            changes.len(),
        );

        // loop through items and generate add/mod actions
        for change in changes.into_iter() {
            match change {
                LSUpdate::Mod(new_kv_value) => {
                    let key = new_kv_value.key_owned();

                    if let Some(diff) = write_guard.update(key, new_kv_value) {
                        if diff.spec {
                            update_spec += 1;
                        }
                        if diff.status {
                            update_status += 1;
                        }
                        if diff.meta {
                            update_meta += 1;
                        }
                        trace!(update_spec, update_status, update_meta, "update metrics");
                    } else {
                        trace!("new");
                        // there was no existing, so this is new
                        add += 1;
                    }
                }
                LSUpdate::Delete(key) => {
                    // NOTE(review): the result of `remove` is ignored, so `delete`
                    // is counted (and the epoch kept) even if the key was absent —
                    // confirm this is intended
                    write_guard.remove(&key);
                    delete += 1;
                }
            }
        }

        // if there are no changes, we revert epoch
        if add == 0 && update_spec == 0 && update_status == 0 && delete == 0 && update_meta == 0 {
            write_guard.decrement_epoch();

            debug!(
                "Apply changes: {} no changes, reverting back epoch to: {}",
                S::LABEL,
                write_guard.epoch()
            );

            return None;
        }

        let epoch = write_guard.epoch();

        let status = SyncStatus {
            epoch,
            add,
            update_spec,
            update_status,
            update_meta,
            delete,
        };

        // release the write lock before notifying listeners
        drop(write_guard);

        debug!("notify epoch changed: {}", epoch);
        self.event_publisher.store_change(epoch);

        debug!(
            "Apply changes {} [add:{},mod_spec:{},mod_status: {},mod_update: {}, del:{},epoch: {}",
            S::LABEL,
            add,
            update_spec,
            update_status,
            update_meta,
            delete,
            epoch,
        );
        Some(status)
    }
}
388
389mod listener {
390
391    use std::fmt;
392    use std::sync::Arc;
393
394    use tracing::{trace, debug, instrument};
395
396    use crate::store::event::EventPublisher;
397    use crate::store::{
398        ChangeFlag, FULL_FILTER, META_FILTER, MetadataStoreObject, SPEC_FILTER, STATUS_FILTER,
399    };
400
401    use super::{LocalStore, Spec, MetadataItem, MetadataChanges};
402
    /// Listens for changes to the local store.
    pub struct ChangeListener<S, C>
    where
        S: Spec,
        C: MetadataItem,
    {
        // store being watched; also provides the event publisher
        store: Arc<LocalStore<S, C>>,
        // last change counter this listener has synced up to
        last_change: i64,
    }
412
413    impl<S, C> fmt::Debug for ChangeListener<S, C>
414    where
415        S: Spec,
416        C: MetadataItem,
417    {
418        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
419            write!(
420                f,
421                "{} last:{},current:{}",
422                S::LABEL,
423                self.last_change,
424                self.event_publisher().current_change()
425            )
426        }
427    }
428
    impl<S, C> ChangeListener<S, C>
    where
        S: Spec,
        C: MetadataItem,
    {
        /// Create a listener starting at change 0, so the first sync sees
        /// everything the store already contains.
        pub fn new(store: Arc<LocalStore<S, C>>) -> Self {
            Self {
                store,
                last_change: 0,
            }
        }

        // shortcut to the watched store's publisher
        #[inline]
        fn event_publisher(&self) -> &EventPublisher {
            self.store.event_publisher()
        }

        /// Check if there should be any changes.
        /// This should be done before waiting on the event listener
        /// to ensure no events are missed.
        #[inline]
        pub fn has_change(&self) -> bool {
            self.event_publisher().current_change() > self.last_change
        }

        /// Sync this listener's counter up to the publisher's current change.
        #[inline(always)]
        pub fn load_last(&mut self) {
            self.set_last_change(self.event_publisher().current_change());
        }

        /// Record that this listener has observed everything up to `updated_change`.
        #[inline(always)]
        pub fn set_last_change(&mut self, updated_change: i64) {
            self.last_change = updated_change;
        }

        /// Last change counter this listener has synced to.
        #[inline]
        pub fn last_change(&self) -> i64 {
            self.last_change
        }

        /// Current change counter of the underlying publisher.
        pub fn current_change(&self) -> i64 {
            self.event_publisher().current_change()
        }

        /// Wait until the publisher reports a change newer than `last_change`.
        ///
        /// Uses a check / register-listener / re-check sequence so a change
        /// published between the first check and listener registration is
        /// not missed.
        pub async fn listen(&self) {
            if self.has_change() {
                trace!("before has change: {}", self.last_change());
                return;
            }

            let listener = self.event_publisher().listen();

            // re-check after registering: a change may have landed in between
            if self.has_change() {
                trace!("after has change: {}", self.last_change());
                return;
            }

            trace!("waiting for publisher");

            listener.await;

            trace!("new change: {}", self.current_change());
        }

        /// Find all changes since this listener last synced.
        pub async fn sync_changes(&mut self) -> MetadataChanges<S, C> {
            self.sync_changes_with_filter(&FULL_FILTER).await
        }

        /// Find all spec related changes.
        pub async fn sync_spec_changes(&mut self) -> MetadataChanges<S, C> {
            self.sync_changes_with_filter(&SPEC_FILTER).await
        }

        /// Find all status related changes.
        pub async fn sync_status_changes(&mut self) -> MetadataChanges<S, C> {
            self.sync_changes_with_filter(&STATUS_FILTER).await
        }

        /// Find all meta related changes.
        pub async fn sync_meta_changes(&mut self) -> MetadataChanges<S, C> {
            self.sync_changes_with_filter(&META_FILTER).await
        }

        /// Collect changes since `last_change` that match `filter`, then
        /// advance `last_change` to the epoch of the returned change set.
        pub async fn sync_changes_with_filter(
            &mut self,
            filter: &ChangeFlag,
        ) -> MetadataChanges<S, C> {
            // hold the read guard only while extracting the change set
            let read_guard = self.store.read().await;
            let changes = read_guard.changes_since_with_filter(self.last_change, filter);
            drop(read_guard);
            trace!(
                "finding last status change: {}, from: {}",
                self.last_change, changes.epoch
            );

            let current_epoch = self.event_publisher().current_change();
            if changes.epoch > current_epoch {
                trace!(
                    "latest epoch: {} > spec epoch: {}",
                    changes.epoch, current_epoch
                );
            }
            self.set_last_change(changes.epoch);
            changes
        }

        /// Wait for initial loading and return all objects as a full snapshot.
        #[instrument()]
        pub async fn wait_for_initial_sync(&mut self) -> Vec<MetadataStoreObject<S, C>> {
            debug!("waiting");
            self.listen().await;

            let changes = self.sync_changes().await;
            // the initial sync must be a full snapshot (post-fence), not deltas
            assert!(changes.is_sync_all());

            debug!("finished initial sync");
            changes.parts().0
        }
    }
551}
552
#[cfg(test)]
// NOTE(review): this module is gated on feature `fixtures`, while `test_notify`
// below is gated on `fixture` — one of the two names is likely a typo (the
// mismatched module is dead code); confirm against the crate's Cargo.toml
#[cfg(feature = "fixtures")]
mod test {

    use crate::store::actions::LSUpdate;
    use crate::fixture::{TestSpec, TestStatus, DefaultTest, TestMeta};

    use super::LocalStore;

    type DefaultTestStore = LocalStore<TestSpec, TestMeta>;

    // sync_all: add/update counting, epoch stamping, full-replace semantics
    #[fluvio_future::test]
    async fn test_store_sync_all() {
        let tests = vec![DefaultTest::with_spec("t1", TestSpec::default())];
        let test_store = DefaultTestStore::default();
        assert_eq!(test_store.epoch().await, 0);

        // first sync: one brand-new object counts as an add
        let sync1 = test_store.sync_all(tests.clone()).await;
        assert_eq!(test_store.epoch().await, 1);
        assert_eq!(sync1.add, 1);
        assert_eq!(sync1.delete, 0);
        assert_eq!(sync1.update_spec, 0);
        assert_eq!(sync1.update_status, 0);

        let read_guard = test_store.read().await;
        let test1 = read_guard.get("t1").expect("t1 should exists");
        assert_eq!(test1.status_epoch(), 1);
        assert_eq!(test1.spec_epoch(), 1);
        drop(read_guard);

        // sync all with spec changes only

        let spec_changes =
            vec![DefaultTest::with_spec("t1", TestSpec { replica: 6 }).with_context(2)];
        let sync2 = test_store.sync_all(spec_changes.clone()).await;
        assert_eq!(test_store.epoch().await, 2);
        assert_eq!(sync2.add, 0);
        assert_eq!(sync2.delete, 0);
        assert_eq!(sync2.update_spec, 1);
        assert_eq!(sync2.update_status, 0);

        // apply again: no changes at all, but sync_all still advances the epoch
        let sync3 = test_store.sync_all(spec_changes.clone()).await;
        assert_eq!(test_store.epoch().await, 3);
        assert_eq!(sync3.add, 0);
        assert_eq!(sync3.delete, 0);
        assert_eq!(sync3.update_spec, 0);
        assert_eq!(sync3.update_status, 0);
    }

    // apply_changes: idempotence (no-op keeps epoch) and status-only updates
    #[fluvio_future::test]
    async fn test_store_update() {
        let initial_topic = DefaultTest::with_spec("t1", TestSpec::default()).with_context(2);

        let topic_store = DefaultTestStore::default();
        let _ = topic_store.sync_all(vec![initial_topic.clone()]).await;
        assert_eq!(topic_store.epoch().await, 1);

        // applying same data should result in zero changes in the store
        assert!(
            topic_store
                .apply_changes(vec![LSUpdate::Mod(initial_topic.clone())])
                .await
                .is_none()
        );
        assert_eq!(topic_store.epoch().await, 1);

        // update status should result in increased epoch
        let topic2 =
            DefaultTest::new("t1", TestSpec::default(), TestStatus { up: true }).with_context(3);
        let changes = topic_store
            .apply_changes(vec![LSUpdate::Mod(topic2)])
            .await
            .expect("some changes");
        assert_eq!(changes.update_spec, 0);
        assert_eq!(changes.update_status, 1);
        assert_eq!(topic_store.epoch().await, 2);
        assert_eq!(
            topic_store.value("t1").await.expect("t1").ctx().item().rev,
            3
        );

        // re-applying the older revision is a no-op: epoch and status unchanged

        assert_eq!(initial_topic.ctx().item().rev, 2);
        let changes = topic_store
            .apply_changes(vec![LSUpdate::Mod(initial_topic.clone())])
            .await;
        assert_eq!(topic_store.epoch().await, 2);
        assert!(changes.is_none());
        assert_eq!(
            topic_store.value("t1").await.expect("t1").status,
            TestStatus { up: true }
        );

        // re-syncing with initial topic should only cause epoch to change
        let sync_all = topic_store.sync_all(vec![initial_topic]).await;
        assert_eq!(topic_store.epoch().await, 3);
        assert_eq!(sync_all.add, 0);
        assert_eq!(sync_all.delete, 0);
        assert_eq!(sync_all.update_spec, 0);
        assert_eq!(sync_all.update_status, 0);
    }
}
657
#[cfg(test)]
// NOTE(review): gated on feature `fixture`, while `mod test` above uses
// `fixtures` — confirm which name matches the crate's Cargo.toml
#[cfg(feature = "fixture")]
mod test_notify {

    use std::sync::Arc;
    use std::time::Duration;
    use std::sync::atomic::{AtomicI64, AtomicBool};
    use std::sync::atomic::Ordering::SeqCst;

    use tokio::select;
    use tracing::debug;

    use fluvio_future::task::{spawn, spawn_task, Task};
    use fluvio_future::timer::sleep;

    use crate::core::{Spec, MetadataItem};
    use crate::store::actions::LSUpdate;
    use crate::store::event::SimpleEvent;
    use crate::fixture::{TestSpec, DefaultTest, TestMeta};

    use super::LocalStore;

    type DefaultTestStore = LocalStore<TestSpec, TestMeta>;

    use super::ChangeListener;

    // Background controller that listens for spec changes and counts its
    // sync cycles in `last_change`.
    struct TestController {
        store: Arc<DefaultTestStore>,
        shutdown: Arc<SimpleEvent>,
        last_change: Arc<AtomicI64>,
    }

    impl TestController {
        // Spawn the controller's dispatch loop as a background task.
        fn start(
            ctx: Arc<DefaultTestStore>,
            shutdown: Arc<SimpleEvent>,
            last_change: Arc<AtomicI64>,
        ) {
            let controller = Self {
                store: ctx,
                shutdown,
                last_change,
            };

            spawn(controller.dispatch_loop());
        }

        // Sync, then wait for either a new spec change or shutdown.
        async fn dispatch_loop(mut self) {
            debug!("entering loop");

            let mut spec_listener = self.store.change_listener();

            loop {
                self.sync(&mut spec_listener).await;

                select! {
                    _ = spec_listener.listen() => {
                        debug!("spec change occur: {}",spec_listener.last_change());
                        continue;
                    },
                    _ = self.shutdown.listen() => {
                        debug!("shutdown");
                        break;
                    }
                }
            }
        }

        // One sync cycle: drain spec changes, simulate work, bump counter.
        async fn sync(&mut self, spec_listener: &mut ChangeListener<TestSpec, TestMeta>) {
            debug!("sync start");
            let (update, _delete) = spec_listener.sync_spec_changes().await.parts();
            // assert!(update.len() > 0);
            debug!("changes: {}", update.len());
            sleep(Duration::from_millis(10)).await;
            debug!("sync end");
            self.last_change.fetch_add(1, SeqCst);
        }
    }

    // Smoke test: controller keeps running while topics are applied, then
    // shuts down cleanly.
    #[fluvio_future::test]
    async fn test_store_notifications() {
        let topic_store = Arc::new(DefaultTestStore::default());
        let last_change = Arc::new(AtomicI64::new(0));
        let shutdown = SimpleEvent::shared();

        TestController::start(topic_store.clone(), shutdown.clone(), last_change.clone());

        let initial_topic = DefaultTest::with_spec("t1", TestSpec::default()).with_context(2);
        let _ = topic_store.sync_all(vec![initial_topic.clone()]).await;

        for i in 0..10u16 {
            sleep(Duration::from_millis(2)).await;
            let topic_name = format!("topic{i}");
            debug!("creating topic: {}", topic_name);
            let topic = DefaultTest::with_spec(topic_name, TestSpec::default()).with_context(3);
            let _ = topic_store.apply_changes(vec![LSUpdate::Mod(topic)]).await;
        }

        // wait for controller to sync
        sleep(Duration::from_millis(100)).await;
        shutdown.notify();
        sleep(Duration::from_millis(1)).await;

        //  assert_eq!(last_change.load(SeqCst), 4);
    }
    // With no changes published, listen() must block (timer wins the race).
    #[fluvio_future::test]
    async fn test_change_listener_non_blocking() {
        let mut timer = sleep(Duration::from_millis(5));
        let store = Arc::new(DefaultTestStore::default());
        let listener = store.change_listener();

        // no events, this should timeout
        select! {

            _ = listener.listen() => {
            panic!("test failed");
            },
            _ = &mut timer => {
                // test succeeds
            }

        }
    }

    #[test]
    fn test_wait_for_first_change_assumptions() {
        let topic_store = Arc::new(DefaultTestStore::default());

        // wait_for_first_change() requires that ChangeListener is initialized with current_change = 0
        assert_eq!(0, topic_store.change_listener().current_change())
    }

    // Listeners started before AND after a change must all be released.
    #[fluvio_future::test]
    async fn test_change_listener() {
        let topic_store = Arc::new(DefaultTestStore::default());
        let last_change = Arc::new(AtomicI64::new(0));
        let shutdown = SimpleEvent::shared();
        let topic_name = "topic";
        let initial_topic = DefaultTest::with_spec(topic_name, TestSpec::default());
        let has_been_updated = Arc::new(AtomicBool::default());

        // Start a batch of listeners before changes have been made.
        let jh = start_batch_of_test_listeners(topic_store.clone(), has_been_updated.clone());
        TestController::start(topic_store.clone(), shutdown.clone(), last_change.clone());

        // set flag that we are about to update, then do the update
        has_been_updated.store(true, std::sync::atomic::Ordering::Relaxed);
        let _ = topic_store.sync_all(vec![initial_topic.clone()]).await;

        // make sure that every listener got notified and returned
        for j in jh {
            j.await
        }

        // Test batch again with a store that already has updates
        let jh = start_batch_of_test_listeners(topic_store.clone(), has_been_updated.clone());
        // make sure that every listener got notified and returned
        for j in jh {
            j.await
        }

        // update with apply_changes
        let topic = DefaultTest::with_spec(topic_name, TestSpec::default());

        let _ = topic_store.apply_changes(vec![LSUpdate::Mod(topic)]).await;

        // Test batch again to make sure returns correctly after an apply_changes
        let jh = start_batch_of_test_listeners(topic_store, has_been_updated);
        // make sure that every listener got notified and returned
        for j in jh {
            j.await
        }

        // wait for controller to sync
        sleep(Duration::from_millis(100)).await;
        shutdown.notify();
        sleep(Duration::from_millis(1)).await;
    }

    // Spawn 10 tasks that each wait for the store's first change.
    fn start_batch_of_test_listeners(
        store: Arc<LocalStore<TestSpec, TestMeta>>,
        has_been_updated: Arc<AtomicBool>,
    ) -> Vec<Task<()>> {
        (0..10u32)
            .map(|_| {
                let store = store.clone();

                spawn_task(listener_thread(store, has_been_updated.clone()))
            })
            .collect()
    }

    async fn listener_thread<S, C>(store: Arc<LocalStore<S, C>>, has_been_updated: Arc<AtomicBool>)
    where
        S: Spec,
        C: MetadataItem,
    {
        store.wait_for_first_change().await;
        // Make sure that we never return before we update the store
        assert!(has_been_updated.load(std::sync::atomic::Ordering::Relaxed));
    }
}