use std::fmt::{Display, Formatter, Result};
use std::os::unix::io::AsRawFd;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use vmm_sys_util::{epoll::EventSet, eventfd::EventFd};
use crate::{EventOps, EventSubscriber, Events, MutEventSubscriber};
/// An `EventFd` paired with a `u64` count.
///
/// The count is read through `Counter::counter`; within this file it is
/// incremented by the subscribers' `process` implementations.
#[derive(Debug)]
pub struct Counter {
// Event file descriptor written by `trigger_event` and read by `clear_event`.
event_fd: EventFd,
// Running count returned by `counter()`.
counter: u64,
}
impl Display for Counter {
    /// Renders the counter as `(event_fd = <raw fd>, counter = <count>)`.
    fn fmt(&self, f: &mut Formatter<'_>) -> Result {
        let raw_fd = self.event_fd.as_raw_fd();
        let count = self.counter;
        write!(f, "(event_fd = {}, counter = {})", raw_fd, count)
    }
}
impl Counter {
    /// Creates a counter whose count starts at zero, backed by a fresh
    /// `EventFd` obtained from `EventFd::new(0)`.
    ///
    /// # Panics
    ///
    /// Panics if the `EventFd` cannot be created.
    pub fn new() -> Self {
        let event_fd = EventFd::new(0).unwrap();
        Self {
            event_fd,
            counter: 0,
        }
    }

    /// Writes `1` to the underlying event fd.
    ///
    /// This is best-effort: the outcome of the write is discarded.
    pub fn trigger_event(&mut self) {
        let _write_outcome = self.event_fd.write(1);
    }

    /// Reads from the underlying event fd.
    ///
    /// This is best-effort: whatever the read returns is discarded.
    pub fn clear_event(&self) {
        let _read_outcome = self.event_fd.read();
    }

    /// Returns the current count.
    pub fn counter(&self) -> u64 {
        let Self { counter, .. } = self;
        *counter
    }
}
impl Default for Counter {
fn default() -> Self {
Self::new()
}
}
/// Newtype wrapper around a single `Counter`.
///
/// In this file it dereferences to the wrapped `Counter` and implements
/// `MutEventSubscriber`, incrementing the wrapped count on `EventSet::IN`.
#[derive(Debug, Default)]
pub struct CounterSubscriber(Counter);
impl std::ops::Deref for CounterSubscriber {
type Target = Counter;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl std::ops::DerefMut for CounterSubscriber {
fn deref_mut(&mut self) -> &mut Counter {
&mut self.0
}
}
impl MutEventSubscriber for CounterSubscriber {
fn process(&mut self, events: Events, event_ops: &mut EventOps) {
match events.event_set() {
EventSet::IN => {
self.counter += 1;
}
EventSet::ERROR => {
eprintln!("Got error on the monitored event.");
}
EventSet::HANG_UP => {
event_ops
.remove(events)
.expect("Encountered error during cleanup.");
panic!("Cannot continue execution. Associated fd was closed.");
}
_ => {
eprintln!(
"Received spurious event from the event manager {:#?}.",
events.event_set()
);
}
}
}
fn init(&mut self, ops: &mut EventOps) {
ops.add(Events::new(&self.event_fd, EventSet::IN))
.expect("Cannot register event.");
}
}
/// A subscriber owning three `Counter`s whose events are told apart by the
/// `u32` data attached to each registration.
#[derive(Debug)]
pub struct CounterSubscriberWithData {
// Registered with data `first_data`.
counter_1: Counter,
// Registered with data `first_data + 1`.
counter_2: Counter,
// Registered with data `first_data + 2`.
counter_3: Counter,
// Base data value; `process` subtracts it from the received event data to
// select which counter to increment.
first_data: u32,
// When true, the next `process` call removes and re-adds all three
// registrations and resets this flag to false.
toggle_registry: bool,
}
impl CounterSubscriberWithData {
    /// Creates a subscriber with three fresh counters, the given base data
    /// value, and the registry toggle switched off.
    pub fn new(first_data: u32) -> Self {
        let counter_1 = Counter::new();
        let counter_2 = Counter::new();
        let counter_3 = Counter::new();
        Self {
            counter_1,
            counter_2,
            counter_3,
            first_data,
            toggle_registry: false,
        }
    }

    /// Triggers the event of each counter, in order 1, 2, 3.
    pub fn trigger_all_counters(&mut self) {
        let Self {
            counter_1,
            counter_2,
            counter_3,
            ..
        } = self;
        counter_1.trigger_event();
        counter_2.trigger_event();
        counter_3.trigger_event();
    }

    /// Returns the current counts as `[counter_1, counter_2, counter_3]`.
    pub fn get_all_counter_values(&self) -> Vec<u64> {
        [&self.counter_1, &self.counter_2, &self.counter_3]
            .iter()
            .map(|c| c.counter())
            .collect()
    }

    /// Sets whether the next `process` call should remove and re-add the
    /// registrations.
    pub fn set_toggle_registry(&mut self, toggle: bool) {
        self.toggle_registry = toggle;
    }
}
impl MutEventSubscriber for CounterSubscriberWithData {
    /// Handles a single event.
    ///
    /// If the registry toggle is set, it is cleared and all three counter
    /// registrations are first removed and then added again, before the
    /// incoming event is examined.
    ///
    /// The event is then dispatched on its event set:
    ///
    /// * exactly `EventSet::IN` — the event data minus `first_data` selects
    ///   the counter to increment (`0`, `1` or `2`); any other offset,
    ///   including data smaller than `first_data`, is logged as spurious;
    /// * exactly `EventSet::ERROR` — logs a message to stderr;
    /// * exactly `EventSet::HANG_UP` — removes the event and panics;
    /// * anything else — ignored.
    ///
    /// # Panics
    ///
    /// Panics if removing or re-adding a registration fails, and always on
    /// `EventSet::HANG_UP`.
    fn process(&mut self, events: Events, ops: &mut EventOps) {
        if self.toggle_registry {
            self.toggle_registry = false;

            // Remove every registration before adding any of them back.
            for registration in self.counter_registrations() {
                ops.remove(registration).expect("Cannot remove event.");
            }
            for registration in self.counter_registrations() {
                ops.add(registration).expect("Cannot register event.");
            }
        }

        match events.event_set() {
            EventSet::IN => {
                // `checked_sub` keeps data below `first_data` from underflowing
                // (a panic in overflow-checked builds); such an event is
                // reported as spurious like any other unknown offset.
                match events.data().checked_sub(self.first_data) {
                    Some(0) => self.counter_1.counter += 1,
                    Some(1) => self.counter_2.counter += 1,
                    Some(2) => self.counter_3.counter += 1,
                    _ => eprintln!("Received spurious event."),
                }
            }
            EventSet::ERROR => {
                eprintln!("Got error on the monitored event.");
            }
            EventSet::HANG_UP => {
                ops.remove(events)
                    .expect("Encountered error during cleanup.");
                panic!("Cannot continue execution. Associated fd was closed.");
            }
            _ => {}
        }
    }

    /// Registers the three counters for `EventSet::IN`, tagged with
    /// `first_data`, `first_data + 1` and `first_data + 2` respectively.
    ///
    /// # Panics
    ///
    /// Panics if any registration fails.
    fn init(&mut self, ops: &mut EventOps) {
        for registration in self.counter_registrations() {
            ops.add(registration).expect("Cannot register event.");
        }
    }
}

impl CounterSubscriberWithData {
    /// Builds the `EventSet::IN` registrations for the three counters, in
    /// order, tagged with `first_data`, `first_data + 1` and `first_data + 2`.
    fn counter_registrations(&self) -> Vec<Events> {
        vec![
            Events::with_data(&self.counter_1.event_fd, self.first_data, EventSet::IN),
            Events::with_data(
                &self.counter_2.event_fd,
                self.first_data + 1,
                EventSet::IN,
            ),
            Events::with_data(
                &self.counter_3.event_fd,
                self.first_data + 2,
                EventSet::IN,
            ),
        ]
    }
}
/// A counter that is updated through a shared reference.
///
/// The count is stored in an `AtomicU64`, which lets the `EventSubscriber`
/// implementation in this file increment it from `process(&self, ..)`.
#[derive(Debug)]
pub struct CounterInnerMutSubscriber {
// Event file descriptor written by `trigger_event` and read by `clear_event`.
event_fd: EventFd,
// Count, loaded and incremented with `Ordering::Relaxed`.
counter: AtomicU64,
}
impl Default for CounterInnerMutSubscriber {
    /// Creates a subscriber with a fresh `EventFd` (from `EventFd::new(0)`)
    /// and a count of zero.
    ///
    /// # Panics
    ///
    /// Panics if the `EventFd` cannot be created.
    fn default() -> Self {
        let event_fd = EventFd::new(0).unwrap();
        let counter = AtomicU64::new(0);
        Self { event_fd, counter }
    }
}
impl CounterInnerMutSubscriber {
    /// Writes `1` to the underlying event fd.
    ///
    /// This is best-effort: the outcome of the write is discarded.
    pub fn trigger_event(&self) {
        let _write_outcome = self.event_fd.write(1);
    }

    /// Reads from the underlying event fd.
    ///
    /// This is best-effort: whatever the read returns is discarded.
    pub fn clear_event(&self) {
        let _read_outcome = self.event_fd.read();
    }

    /// Returns the current count, loaded with `Ordering::Relaxed`.
    pub fn counter(&self) -> u64 {
        let current = self.counter.load(Ordering::Relaxed);
        current
    }
}
impl EventSubscriber for CounterInnerMutSubscriber {
fn process(&self, events: Events, ops: &mut EventOps) {
match events.event_set() {
EventSet::IN => {
self.counter.fetch_add(1, Ordering::Relaxed);
}
EventSet::ERROR => {
eprintln!("Got error on the monitored event.");
}
EventSet::HANG_UP => {
ops.remove(events)
.expect("Encountered error during cleanup.");
panic!("Cannot continue execution. Associated fd was closed.");
}
_ => {}
}
}
fn init(&self, ops: &mut EventOps) {
ops.add(Events::new(&self.event_fd, EventSet::IN))
.expect("Cannot register event.");
}
}