#[derive(Debug, PartialEq, Eq)]
pub(crate) enum ChainEvent {
Pending,
AllOpsComplete,
Complete { bytes_sent: u32, error: Option<i32> },
}
pub(crate) struct ChainState {
total_sqes: u16,
cqes_received: u16,
pub(crate) bytes_sent: u32,
total_bytes: u32,
broken: bool,
pub(crate) first_error: Option<i32>,
zc_notifs_pending: u16,
}
impl ChainState {
fn is_complete(&self) -> bool {
self.cqes_received == self.total_sqes && self.zc_notifs_pending == 0
}
fn to_event(&self) -> ChainEvent {
if self.is_complete() {
ChainEvent::Complete {
bytes_sent: self.bytes_sent,
error: self.first_error,
}
} else if self.cqes_received == self.total_sqes {
ChainEvent::AllOpsComplete
} else {
ChainEvent::Pending
}
}
}
pub(crate) struct SendChainTable {
chains: Vec<Option<ChainState>>,
}
impl SendChainTable {
pub fn new(max_connections: u32) -> Self {
let mut chains = Vec::with_capacity(max_connections as usize);
chains.resize_with(max_connections as usize, || None);
SendChainTable { chains }
}
pub fn start(&mut self, conn_index: u32, total_sqes: u16, total_bytes: u32) {
let slot = &mut self.chains[conn_index as usize];
debug_assert!(
slot.is_none(),
"starting chain on conn {conn_index} with existing active chain"
);
*slot = Some(ChainState {
total_sqes,
cqes_received: 0,
bytes_sent: 0,
total_bytes,
broken: false,
first_error: None,
zc_notifs_pending: 0,
});
}
#[inline]
pub fn is_active(&self, conn_index: u32) -> bool {
self.chains
.get(conn_index as usize)
.map(|s| s.is_some())
.unwrap_or(false)
}
pub fn on_operation_cqe(&mut self, conn_index: u32, result: i32) -> ChainEvent {
let chain = match &mut self.chains[conn_index as usize] {
Some(c) => c,
None => return ChainEvent::Pending,
};
chain.cqes_received += 1;
if result >= 0 {
chain.bytes_sent += result as u32;
} else {
chain.broken = true;
if chain.first_error.is_none() {
chain.first_error = Some(result);
}
}
chain.to_event()
}
pub fn inc_zc_notif(&mut self, conn_index: u32) {
if let Some(ref mut chain) = self.chains[conn_index as usize] {
chain.zc_notifs_pending += 1;
}
}
pub fn on_notif_cqe(&mut self, conn_index: u32) -> ChainEvent {
let chain = match &mut self.chains[conn_index as usize] {
Some(c) => c,
None => return ChainEvent::Pending,
};
debug_assert!(
chain.zc_notifs_pending > 0,
"ZC notif underflow on conn {conn_index}"
);
chain.zc_notifs_pending -= 1;
chain.to_event()
}
pub fn take(&mut self, conn_index: u32) -> Option<ChainState> {
self.chains[conn_index as usize].take()
}
pub fn cancel(&mut self, conn_index: u32) -> u32 {
if let Some(chain) = self.chains[conn_index as usize].take() {
chain.total_bytes
} else {
0
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn single_sqe_chain() {
let mut table = SendChainTable::new(16);
table.start(0, 1, 100);
assert!(table.is_active(0));
let event = table.on_operation_cqe(0, 100);
assert_eq!(
event,
ChainEvent::Complete {
bytes_sent: 100,
error: None
}
);
let state = table.take(0);
assert!(state.is_some());
assert!(!table.is_active(0));
}
#[test]
fn multi_sqe_chain_success() {
let mut table = SendChainTable::new(16);
table.start(0, 3, 300);
let event = table.on_operation_cqe(0, 100);
assert_eq!(event, ChainEvent::Pending);
let event = table.on_operation_cqe(0, 100);
assert_eq!(event, ChainEvent::Pending);
let event = table.on_operation_cqe(0, 100);
assert_eq!(
event,
ChainEvent::Complete {
bytes_sent: 300,
error: None
}
);
}
#[test]
fn chain_with_error_and_cancel() {
let mut table = SendChainTable::new(16);
table.start(0, 3, 300);
let event = table.on_operation_cqe(0, 100);
assert_eq!(event, ChainEvent::Pending);
let event = table.on_operation_cqe(0, -libc::EIO);
assert_eq!(event, ChainEvent::Pending);
let event = table.on_operation_cqe(0, -libc::ECANCELED);
assert_eq!(
event,
ChainEvent::Complete {
bytes_sent: 100,
error: Some(-libc::EIO)
}
);
}
#[test]
fn chain_with_zc_notifs() {
let mut table = SendChainTable::new(16);
table.start(0, 2, 200);
table.inc_zc_notif(0);
let event = table.on_operation_cqe(0, 100);
assert_eq!(event, ChainEvent::Pending);
let event = table.on_operation_cqe(0, 100);
assert_eq!(event, ChainEvent::AllOpsComplete);
let event = table.on_notif_cqe(0);
assert_eq!(
event,
ChainEvent::Complete {
bytes_sent: 200,
error: None
}
);
}
#[test]
fn cancel_active_chain() {
let mut table = SendChainTable::new(16);
table.start(0, 5, 500);
assert!(table.is_active(0));
let total = table.cancel(0);
assert_eq!(total, 500);
assert!(!table.is_active(0));
}
#[test]
fn cancel_no_chain() {
let mut table = SendChainTable::new(16);
let total = table.cancel(0);
assert_eq!(total, 0);
}
#[test]
fn inactive_connection() {
let table = SendChainTable::new(16);
assert!(!table.is_active(0));
assert!(!table.is_active(15));
}
}