1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
use std::pin::Pin;
use std::sync::atomic::{AtomicI64, Ordering, AtomicBool};
use std::sync::Arc;
use tracing::trace;
use event_listener::{Event, EventListener};
const DEFAULT_EVENT_ORDERING: Ordering = Ordering::SeqCst;
/// Track publishing of events by using u64 counter
#[derive(Debug, Default)]
pub struct EventPublisher {
event: Event,
change: AtomicI64,
}
impl EventPublisher {
pub fn new() -> Self {
Self {
event: Event::new(),
change: AtomicI64::new(0),
}
}
pub fn shared() -> Arc<Self> {
Arc::new(Self::new())
}
fn notify(&self) {
self.event.notify(usize::MAX);
}
#[inline]
pub fn current_change(&self) -> i64 {
self.change.load(DEFAULT_EVENT_ORDERING)
}
/// stores new value and notifies any listeners
pub fn store_change(&self, value: i64) {
self.change.store(value, DEFAULT_EVENT_ORDERING);
self.notify()
}
pub fn listen(&self) -> Pin<Box<EventListener>> {
self.event.listen()
}
}
pub struct SimpleEvent {
flag: AtomicBool,
event: Event,
}
impl SimpleEvent {
pub fn shared() -> Arc<Self> {
Arc::new(Self {
flag: AtomicBool::new(false),
event: Event::new(),
})
}
// is flag set
pub fn is_set(&self) -> bool {
self.flag.load(DEFAULT_EVENT_ORDERING)
}
pub async fn listen(&self) {
if self.is_set() {
trace!("before, flag is set");
return;
}
let listener = self.event.listen();
if self.is_set() {
trace!("after flag is set");
return;
}
listener.await
}
pub fn notify(&self) {
self.flag.store(true, DEFAULT_EVENT_ORDERING);
self.event.notify(usize::MAX);
}
}
/*
#[cfg(test)]
mod test {
use std::time::Duration;
use std::sync::Arc;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering::SeqCst;
use tracing::debug;
use fluvio_future::task::spawn;
use fluvio_future::timer::sleep;
use super::ChangeListener;
use super::EventPublisher;
use super::SimpleEvent;
struct TestController {
change: ChangeListener,
shutdown: Arc<SimpleEvent>,
last_change: Arc<AtomicI64>,
}
impl TestController {
fn start(change: ChangeListener, shutdown: Arc<SimpleEvent>, last_change: Arc<AtomicI64>) {
let controller = Self {
change,
shutdown,
last_change,
};
spawn(controller.dispatch_loop());
}
async fn dispatch_loop(mut self) {
use tokio::select;
debug!("entering loop");
loop {
self.sync().await;
select! {
_ = self.change.listen() => {
debug!("listen occur");
continue;
},
_ = self.shutdown.listen() => {
debug!("shutdown");
break;
}
}
}
debug!("terminated, last change: {}", self.change.last_change());
}
/// randomly sleep to simulate some tasks
async fn sync(&mut self) {
debug!("sync start");
self.last_change.fetch_add(1, SeqCst);
sleep(Duration::from_millis(5)).await;
self.change.load_last(); // sync to latest
debug!("sync end: {}", self.change.last_change());
}
}
#[fluvio_future::test]
async fn test_listener() {
let publisher = Arc::new(EventPublisher::new());
let listener = publisher.change_listener(0);
let shutdown = SimpleEvent::shared();
let last_change = Arc::new(AtomicI64::new(0));
TestController::start(listener, shutdown.clone(), last_change.clone());
for i in 0..5u16 {
sleep(Duration::from_millis(2)).await;
publisher.increment();
publisher.notify();
debug!("notification: {}, value: {}", i, publisher.current_change());
}
// wait for test controller to finish
sleep(Duration::from_millis(20)).await;
// shutdown and wait to finish
shutdown.notify();
sleep(Duration::from_millis(5)).await;
// assert_eq!(last_change.load(SeqCst), 2); // there should be 2 sync happenings
}
}
*/