use crossbeam_queue::ArrayQueue;
pub use microsandbox_utils::wake_pipe::WakePipe;
use std::sync::{
Arc, Mutex,
atomic::{AtomicU64, Ordering},
};
/// Default queue capacity, in entries.
///
/// NOTE(review): not referenced in this file — presumably the value callers
/// pass to `SharedState::new` when they have no specific capacity; confirm.
pub const DEFAULT_QUEUE_CAPACITY: usize = 1024;
/// State bundle made of two bounded queues of byte buffers, three wake pipes,
/// an optional termination callback, and transmit/receive byte counters.
///
/// Every method on this type takes `&self`; mutation goes through the queues,
/// the mutex-guarded hook slot, and the atomic counters.
///
/// NOTE(review): which side produces/consumes each ring and which component
/// waits on each wake pipe is not visible in this file — confirm against the
/// users of this type.
pub struct SharedState {
/// Bounded queue of byte buffers for the transmit ("tx") direction.
pub tx_ring: ArrayQueue<Vec<u8>>,
/// Bounded queue of byte buffers for the receive ("rx") direction.
pub rx_ring: ArrayQueue<Vec<u8>>,
/// Wake pipe associated by name with the receive path
/// (presumably signalled when `rx_ring` changes — TODO confirm).
pub rx_wake: WakePipe,
/// Wake pipe associated by name with the transmit path
/// (presumably signalled when `tx_ring` changes — TODO confirm).
pub tx_wake: WakePipe,
/// Wake pipe associated by name with a proxy component
/// (its exact role is not visible here — TODO confirm).
pub proxy_wake: WakePipe,
/// Optional callback run by `trigger_termination`; `None` until
/// `set_termination_hook` installs one.
termination_hook: Mutex<Option<Arc<dyn Fn() + Send + Sync>>>,
/// Byte counters updated by `add_tx_bytes` / `add_rx_bytes`.
metrics: NetworkMetrics,
}
/// Pair of atomic byte counters, one per direction.
///
/// Both fields are private; in this file they are only touched through
/// `SharedState::add_tx_bytes`, `add_rx_bytes`, `tx_bytes` and `rx_bytes`.
pub struct NetworkMetrics {
/// Running total of bytes recorded via `SharedState::add_tx_bytes`.
tx_bytes: AtomicU64,
/// Running total of bytes recorded via `SharedState::add_rx_bytes`.
rx_bytes: AtomicU64,
}
impl SharedState {
    /// Creates a new state whose TX and RX rings each hold up to
    /// `queue_capacity` buffers.
    ///
    /// All three wake pipes are freshly created, no termination hook is
    /// installed, and both byte counters start at their default value.
    ///
    /// # Panics
    ///
    /// NOTE(review): `crossbeam_queue::ArrayQueue::new` is documented to panic
    /// when the capacity is zero, so `queue_capacity` must be non-zero —
    /// confirm against the crossbeam version in use.
    pub fn new(queue_capacity: usize) -> Self {
        Self {
            tx_ring: ArrayQueue::new(queue_capacity),
            rx_ring: ArrayQueue::new(queue_capacity),
            rx_wake: WakePipe::new(),
            tx_wake: WakePipe::new(),
            proxy_wake: WakePipe::new(),
            termination_hook: Mutex::new(None),
            metrics: NetworkMetrics::default(),
        }
    }

    /// Installs `hook` as the termination callback, replacing any previous one.
    ///
    /// A poisoned lock is recovered rather than propagated: the slot is a plain
    /// `Option<Arc<..>>`, so there is no partially-updated state to protect.
    pub fn set_termination_hook(&self, hook: Arc<dyn Fn() + Send + Sync>) {
        let previous = self
            .termination_hook
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .replace(hook);
        // The guard is a temporary of the statement above and is already
        // released here, so a destructor that panics while the old hook is
        // dropped cannot poison the mutex.
        drop(previous);
    }

    /// Runs the termination hook, if one has been installed; otherwise does
    /// nothing.
    ///
    /// The hook is cloned out of the slot and invoked after the lock has been
    /// released, so the callback may itself call
    /// [`set_termination_hook`](Self::set_termination_hook) or
    /// `trigger_termination` without deadlocking. A poisoned lock is recovered
    /// so that an earlier panic can never prevent termination from running.
    pub fn trigger_termination(&self) {
        let hook = self
            .termination_hook
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .clone();
        if let Some(hook) = hook {
            hook();
        }
    }

    /// Adds `bytes` to the transmitted-bytes counter.
    pub fn add_tx_bytes(&self, bytes: usize) {
        // `usize` is at most 64 bits wide on supported targets, so the cast is
        // lossless. Relaxed ordering suffices for a standalone counter.
        self.metrics
            .tx_bytes
            .fetch_add(bytes as u64, Ordering::Relaxed);
    }

    /// Adds `bytes` to the received-bytes counter.
    pub fn add_rx_bytes(&self, bytes: usize) {
        // See `add_tx_bytes` for why the cast and the relaxed ordering are fine.
        self.metrics
            .rx_bytes
            .fetch_add(bytes as u64, Ordering::Relaxed);
    }

    /// Returns the current value of the transmitted-bytes counter.
    pub fn tx_bytes(&self) -> u64 {
        self.metrics.tx_bytes.load(Ordering::Relaxed)
    }

    /// Returns the current value of the received-bytes counter.
    pub fn rx_bytes(&self) -> u64 {
        self.metrics.rx_bytes.load(Ordering::Relaxed)
    }
}
impl Default for NetworkMetrics {
    /// Builds a metrics record with both byte counters at zero.
    fn default() -> Self {
        // `AtomicU64::default()` yields an atomic initialised to 0.
        let tx_bytes = AtomicU64::default();
        let rx_bytes = AtomicU64::default();
        Self { tx_bytes, rx_bytes }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Buffers pushed onto the TX ring come back out in insertion order, and
    /// the ring reports empty once drained.
    #[test]
    fn shared_state_queue_push_pop() {
        let shared = SharedState::new(4);
        let first = vec![1, 2, 3];
        let second = vec![4, 5, 6];

        shared.tx_ring.push(first.clone()).unwrap();
        shared.tx_ring.push(second.clone()).unwrap();

        assert_eq!(shared.tx_ring.pop(), Some(first));
        assert_eq!(shared.tx_ring.pop(), Some(second));
        assert_eq!(shared.tx_ring.pop(), None);
    }

    /// Pushing beyond the configured capacity is rejected.
    #[test]
    fn shared_state_queue_full() {
        let shared = SharedState::new(2);

        // Fill the RX ring to its capacity of two entries.
        for byte in [1u8, 2] {
            shared.rx_ring.push(vec![byte]).unwrap();
        }

        let overflow = shared.rx_ring.push(vec![3]);
        assert!(overflow.is_err());
    }
}