fluvio_stream_model/store/
event.rs

1use std::sync::atomic::{AtomicI64, Ordering, AtomicBool};
2use std::sync::Arc;
3
4use tracing::trace;
5use event_listener::{Event, EventListener};
6
/// Memory ordering passed to every atomic load and store in this file.
const DEFAULT_EVENT_ORDERING: Ordering = Ordering::SeqCst;
8
/// Tracks published changes with an `i64` counter and an [`Event`] used to
/// signal listeners whenever a new value is stored.
#[derive(Debug, Default)]
pub struct EventPublisher {
    /// Event signalled by `notify` and subscribed to through `listen`.
    event: Event,
    /// Most recently stored change value.
    change: AtomicI64,
}
15
16impl EventPublisher {
17    pub fn new() -> Self {
18        Self {
19            event: Event::new(),
20            change: AtomicI64::new(0),
21        }
22    }
23
24    pub fn shared() -> Arc<Self> {
25        Arc::new(Self::new())
26    }
27
28    fn notify(&self) {
29        self.event.notify(usize::MAX);
30    }
31
32    #[inline]
33    pub fn current_change(&self) -> i64 {
34        self.change.load(DEFAULT_EVENT_ORDERING)
35    }
36
37    /// stores new value and notifies any listeners
38    pub fn store_change(&self, value: i64) {
39        self.change.store(value, DEFAULT_EVENT_ORDERING);
40        self.notify()
41    }
42
43    pub fn listen(&self) -> EventListener {
44        self.event.listen()
45    }
46}
47
48pub struct SimpleEvent {
49    flag: AtomicBool,
50    event: Event,
51}
52
53impl SimpleEvent {
54    pub fn shared() -> Arc<Self> {
55        Arc::new(Self {
56            flag: AtomicBool::new(false),
57            event: Event::new(),
58        })
59    }
60
61    // is flag set
62    pub fn is_set(&self) -> bool {
63        self.flag.load(DEFAULT_EVENT_ORDERING)
64    }
65
66    pub async fn listen(&self) {
67        if self.is_set() {
68            trace!("before, flag is set");
69            return;
70        }
71
72        let listener = self.event.listen();
73
74        if self.is_set() {
75            trace!("after flag is set");
76            return;
77        }
78
79        listener.await
80    }
81
82    pub fn notify(&self) {
83        self.flag.store(true, DEFAULT_EVENT_ORDERING);
84        self.event.notify(usize::MAX);
85    }
86}
87
88/*
89#[cfg(test)]
90mod test {
91
92    use std::time::Duration;
93    use std::sync::Arc;
94    use std::sync::atomic::AtomicI64;
95    use std::sync::atomic::Ordering::SeqCst;
96
97    use tracing::debug;
98
99    use fluvio_future::task::spawn;
100    use fluvio_future::timer::sleep;
101
102    use super::ChangeListener;
103    use super::EventPublisher;
104    use super::SimpleEvent;
105
106    struct TestController {
107        change: ChangeListener,
108        shutdown: Arc<SimpleEvent>,
109        last_change: Arc<AtomicI64>,
110    }
111
112    impl TestController {
113        fn start(change: ChangeListener, shutdown: Arc<SimpleEvent>, last_change: Arc<AtomicI64>) {
114            let controller = Self {
115                change,
116                shutdown,
117                last_change,
118            };
119            spawn(controller.dispatch_loop());
120        }
121
122        async fn dispatch_loop(mut self) {
123            use tokio::select;
124
125            debug!("entering loop");
126            loop {
127                self.sync().await;
128
129                select! {
130                    _ = self.change.listen() => {
131                        debug!("listen occur");
132                        continue;
133                    },
134                    _ = self.shutdown.listen() => {
135                        debug!("shutdown");
136                        break;
137                    }
138                }
139            }
140
141            debug!("terminated, last change: {}", self.change.last_change());
142        }
143
144        /// randomly sleep to simulate some tasks
145        async fn sync(&mut self) {
146            debug!("sync start");
147            self.last_change.fetch_add(1, SeqCst);
148            sleep(Duration::from_millis(5)).await;
149            self.change.load_last(); // sync to latest
150            debug!("sync end: {}", self.change.last_change());
151        }
152    }
153
154    #[fluvio_future::test]
155    async fn test_listener()  {
156        let publisher = Arc::new(EventPublisher::new());
157        let listener = publisher.change_listener(0);
158        let shutdown = SimpleEvent::shared();
159        let last_change = Arc::new(AtomicI64::new(0));
160        TestController::start(listener, shutdown.clone(), last_change.clone());
161
162        for i in 0..5u16 {
163            sleep(Duration::from_millis(2)).await;
164            publisher.increment();
165            publisher.notify();
166            debug!("notification: {}, value: {}", i, publisher.current_change());
167        }
168
169        // wait for test controller to finish
170        sleep(Duration::from_millis(20)).await;
171
172        // shutdown and wait to finish
173        shutdown.notify();
174
175        sleep(Duration::from_millis(5)).await;
176
177        // assert_eq!(last_change.load(SeqCst), 2); // there should be 2 sync happenings
178
179    }
180}
181*/