use std::io;
use std::sync::Mutex;
use crossbeam_channel::{Receiver, Sender, bounded, unbounded};
use crate::buffer::fixed::{MemoryRegion, RegionId};
use crate::wakeup::WakeHandle;
/// Control message sent to a worker over its region control channel.
///
/// Every variant carries an `ack` sender; `RegionRegistrar::broadcast` reads
/// one `io::Result<()>` per worker from the matching receiver to learn the
/// outcome of the request.
pub(crate) enum RegionControlMsg {
/// Request concerning `region` at table index `slot`.
// NOTE(review): presumably the worker installs `region` into its own
// fixed-region table at `slot`; the worker side is not visible in this file.
Register {
/// Slot index picked by the registrar from its free list.
slot: u16,
/// Region payload; the registrar sends one clone per worker.
region: MemoryRegion,
/// Channel on which the worker reports success or failure.
ack: Sender<io::Result<()>>,
},
/// Request to release whatever is held at table index `slot`.
Unregister {
/// Slot index taken from the `RegionId` being unregistered.
slot: u16,
/// Channel on which the worker reports success or failure.
ack: Sender<io::Result<()>>,
},
}
/// Receiving half of a region control channel, as produced (one per worker)
/// by `build_worker_channels`.
pub(crate) type RegionControlRx = Receiver<RegionControlMsg>;
/// Hands out slot indices for registered regions and broadcasts
/// register/unregister requests to every worker.
pub(crate) struct RegionRegistrar {
/// Mutex-guarded free-slot bookkeeping.
inner: Mutex<RegistrarInner>,
/// One control-channel sender per worker.
workers: Vec<Sender<RegionControlMsg>>,
/// Wake handles, paired by index with `workers` (the constructor
/// debug-asserts equal lengths); each is poked after a message is sent.
wake_handles: Vec<WakeHandle>,
}
/// State of the registrar that is protected by its mutex.
struct RegistrarInner {
/// Slot indices currently available; used as a stack (`pop` to allocate,
/// `push` to release).
free_slots: Vec<u16>,
}
impl RegionRegistrar {
    /// Creates a registrar that manages the slot indices `reserved..max_slots`.
    ///
    /// Slots below `reserved` are never placed on the free list, so
    /// [`register`](Self::register) never returns them. `workers` and
    /// `wake_handles` are paired by index and must have the same length
    /// (checked in debug builds only).
    pub(crate) fn new(
        max_slots: u16,
        reserved: u16,
        workers: Vec<Sender<RegionControlMsg>>,
        wake_handles: Vec<WakeHandle>,
    ) -> Self {
        debug_assert!(reserved <= max_slots);
        debug_assert_eq!(workers.len(), wake_handles.len());
        // Stored in descending order so the first `pop()` calls yield the
        // lowest indices; after releases the list behaves as a plain stack.
        let free_slots: Vec<u16> = (reserved..max_slots).rev().collect();
        RegionRegistrar {
            inner: Mutex::new(RegistrarInner { free_slots }),
            workers,
            wake_handles,
        }
    }

    /// Allocates a slot and asks every worker to register `region` in it.
    ///
    /// # Errors
    ///
    /// Returns an error when no slot is free, when a worker's control channel
    /// is disconnected, or when any worker reports a failure (the first
    /// reported error is returned). On failure the slot goes back to the free
    /// list.
    ///
    /// NOTE(review): workers that acknowledged success before another worker
    /// failed are not rolled back here; confirm the worker side tolerates a
    /// later `Register` on the same slot.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub(crate) fn register(&self, region: MemoryRegion) -> io::Result<RegionId> {
        let slot = {
            let mut inner = self.inner.lock().expect("registrar mutex poisoned");
            inner
                .free_slots
                .pop()
                .ok_or_else(|| io::Error::other("registered-region table is full"))?
        };
        let outcome = self.broadcast(|ack| RegionControlMsg::Register {
            slot,
            region: region.clone(),
            ack,
        });
        match outcome {
            Ok(()) => Ok(RegionId(slot)),
            Err(e) => {
                self.return_slot(slot);
                Err(e)
            }
        }
    }

    /// Asks every worker to unregister the region identified by `id` and, on
    /// success, makes its slot available again.
    ///
    /// # Errors
    ///
    /// Returns an error when `id` is the `UNREGISTERED` sentinel, when the slot
    /// is already on the free list (double unregister, or an id that was never
    /// handed out), or when the broadcast fails. When the broadcast fails the
    /// slot is deliberately *not* recycled, since some worker may still hold
    /// it.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub(crate) fn unregister(&self, id: RegionId) -> io::Result<()> {
        if id == RegionId::UNREGISTERED {
            return Err(io::Error::other("cannot unregister UNREGISTERED sentinel"));
        }
        let slot = id.0;
        // A slot that is already free cannot be unregistered again. Without
        // this guard a repeated call would push the slot twice and two later
        // registrations would be handed the same index.
        let already_free = {
            let inner = self.inner.lock().expect("registrar mutex poisoned");
            inner.free_slots.contains(&slot)
        };
        if already_free {
            return Err(io::Error::other("region slot is not currently registered"));
        }
        self.broadcast(|ack| RegionControlMsg::Unregister { slot, ack })?;
        // `return_slot` also refuses duplicates, which covers two threads
        // racing to unregister the same id past the check above.
        self.return_slot(slot);
        Ok(())
    }

    /// Sends one message (built by `build`) to each worker, wakes it, and
    /// waits for every worker that received a message to acknowledge.
    ///
    /// Sending stops at the first disconnected worker, but acknowledgements
    /// from workers that were already messaged are still collected before
    /// returning, so no worker is still processing the request once the
    /// caller sees the result.
    ///
    /// # Errors
    ///
    /// * "worker control channel disconnected" if a send failed;
    /// * otherwise the first error reported by a worker, or a "worker
    ///   terminated" error if a worker dropped its ack sender without
    ///   replying.
    fn broadcast(
        &self,
        mut build: impl FnMut(Sender<io::Result<()>>) -> RegionControlMsg,
    ) -> io::Result<()> {
        // Capacity equals the worker count, so a worker's single ack send
        // never blocks.
        let (ack_tx, ack_rx) = bounded(self.workers.len());
        let mut sent = 0usize;
        let mut send_failed = false;
        for (tx, wake) in self.workers.iter().zip(&self.wake_handles) {
            if tx.send(build(ack_tx.clone())).is_err() {
                // The rejected message (and its ack sender) is dropped here.
                send_failed = true;
                break;
            }
            sent += 1;
            wake.wake();
        }
        // Drop our own sender so `recv` reports disconnection once every
        // worker-held sender is gone, instead of blocking forever.
        drop(ack_tx);

        let mut first_err: Option<io::Error> = None;
        for _ in 0..sent {
            let reply = ack_rx.recv().unwrap_or_else(|_| {
                Err(io::Error::other(
                    "worker terminated before acknowledging region update",
                ))
            });
            if let Err(e) = reply {
                if first_err.is_none() {
                    first_err = Some(e);
                }
            }
        }

        if send_failed {
            return Err(io::Error::other("worker control channel disconnected"));
        }
        match first_err {
            Some(e) => Err(e),
            None => Ok(()),
        }
    }

    /// Puts `slot` back on the free list unless it is already there.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    fn return_slot(&self, slot: u16) {
        let mut inner = self.inner.lock().expect("registrar mutex poisoned");
        // Keep the free list duplicate-free: a duplicate would let two
        // registrations share one slot.
        if !inner.free_slots.contains(&slot) {
            inner.free_slots.push(slot);
        }
    }
}
/// Creates `num_workers` unbounded control channels.
///
/// Returns the sending halves and the receiving halves as two vectors in
/// matching order: element `i` of each vector belongs to the same channel.
pub(crate) fn build_worker_channels(
    num_workers: usize,
) -> (Vec<Sender<RegionControlMsg>>, Vec<RegionControlRx>) {
    (0..num_workers)
        .map(|_| unbounded::<RegionControlMsg>())
        .unzip()
}