fluvio_stream_model/store/event.rs
1use std::sync::atomic::{AtomicI64, Ordering, AtomicBool};
2use std::sync::Arc;
3
4use tracing::trace;
5use event_listener::{Event, EventListener};
6
/// Memory ordering used by the atomic loads and stores of `EventPublisher`
/// and `SimpleEvent` below.
const DEFAULT_EVENT_ORDERING: Ordering = Ordering::SeqCst;
8
9/// Track publishing of events by using u64 counter
10#[derive(Debug, Default)]
11pub struct EventPublisher {
12 event: Event,
13 change: AtomicI64,
14}
15
16impl EventPublisher {
17 pub fn new() -> Self {
18 Self {
19 event: Event::new(),
20 change: AtomicI64::new(0),
21 }
22 }
23
24 pub fn shared() -> Arc<Self> {
25 Arc::new(Self::new())
26 }
27
28 fn notify(&self) {
29 self.event.notify(usize::MAX);
30 }
31
32 #[inline]
33 pub fn current_change(&self) -> i64 {
34 self.change.load(DEFAULT_EVENT_ORDERING)
35 }
36
37 /// stores new value and notifies any listeners
38 pub fn store_change(&self, value: i64) {
39 self.change.store(value, DEFAULT_EVENT_ORDERING);
40 self.notify()
41 }
42
43 pub fn listen(&self) -> EventListener {
44 self.event.listen()
45 }
46}
47
48pub struct SimpleEvent {
49 flag: AtomicBool,
50 event: Event,
51}
52
53impl SimpleEvent {
54 pub fn shared() -> Arc<Self> {
55 Arc::new(Self {
56 flag: AtomicBool::new(false),
57 event: Event::new(),
58 })
59 }
60
61 // is flag set
62 pub fn is_set(&self) -> bool {
63 self.flag.load(DEFAULT_EVENT_ORDERING)
64 }
65
66 pub async fn listen(&self) {
67 if self.is_set() {
68 trace!("before, flag is set");
69 return;
70 }
71
72 let listener = self.event.listen();
73
74 if self.is_set() {
75 trace!("after flag is set");
76 return;
77 }
78
79 listener.await
80 }
81
82 pub fn notify(&self) {
83 self.flag.store(true, DEFAULT_EVENT_ORDERING);
84 self.event.notify(usize::MAX);
85 }
86}
87
88/*
89#[cfg(test)]
90mod test {
91
92 use std::time::Duration;
93 use std::sync::Arc;
94 use std::sync::atomic::AtomicI64;
95 use std::sync::atomic::Ordering::SeqCst;
96
97 use tracing::debug;
98
99 use fluvio_future::task::spawn;
100 use fluvio_future::timer::sleep;
101
102 use super::ChangeListener;
103 use super::EventPublisher;
104 use super::SimpleEvent;
105
106 struct TestController {
107 change: ChangeListener,
108 shutdown: Arc<SimpleEvent>,
109 last_change: Arc<AtomicI64>,
110 }
111
112 impl TestController {
113 fn start(change: ChangeListener, shutdown: Arc<SimpleEvent>, last_change: Arc<AtomicI64>) {
114 let controller = Self {
115 change,
116 shutdown,
117 last_change,
118 };
119 spawn(controller.dispatch_loop());
120 }
121
122 async fn dispatch_loop(mut self) {
123 use tokio::select;
124
125 debug!("entering loop");
126 loop {
127 self.sync().await;
128
129 select! {
130 _ = self.change.listen() => {
131 debug!("listen occur");
132 continue;
133 },
134 _ = self.shutdown.listen() => {
135 debug!("shutdown");
136 break;
137 }
138 }
139 }
140
141 debug!("terminated, last change: {}", self.change.last_change());
142 }
143
144 /// randomly sleep to simulate some tasks
145 async fn sync(&mut self) {
146 debug!("sync start");
147 self.last_change.fetch_add(1, SeqCst);
148 sleep(Duration::from_millis(5)).await;
149 self.change.load_last(); // sync to latest
150 debug!("sync end: {}", self.change.last_change());
151 }
152 }
153
154 #[fluvio_future::test]
155 async fn test_listener() {
156 let publisher = Arc::new(EventPublisher::new());
157 let listener = publisher.change_listener(0);
158 let shutdown = SimpleEvent::shared();
159 let last_change = Arc::new(AtomicI64::new(0));
160 TestController::start(listener, shutdown.clone(), last_change.clone());
161
162 for i in 0..5u16 {
163 sleep(Duration::from_millis(2)).await;
164 publisher.increment();
165 publisher.notify();
166 debug!("notification: {}, value: {}", i, publisher.current_change());
167 }
168
169 // wait for test controller to finish
170 sleep(Duration::from_millis(20)).await;
171
172 // shutdown and wait to finish
173 shutdown.notify();
174
175 sleep(Duration::from_millis(5)).await;
176
177 // assert_eq!(last_change.load(SeqCst), 2); // there should be 2 sync happenings
178
179 }
180}
181*/