use crate::{
collections::{
concurrent_ring::ConcurrentRingBuffer,
shared_ring::SharedRingBuffer,
},
runtime::{
fail::Fail,
network::ring::{
operation::RingControlOperation,
state::RingStateMachine,
},
},
};
use ::std::ptr::copy;
const HEADER_SIZE: usize = 4;
const EOF_MESSAGE_HEADER: [u8; HEADER_SIZE] = [0xD, 0xE, 0xA, 0xD];
const REGULAR_MESSAGE_HEADER: [u8; HEADER_SIZE] = [0xB, 0xE, 0xE, 0xF];
const RING_BUFFER_CAPACITY: usize = 65536;
pub const MAX_RETRIES_PUSH_EOF: u32 = 16;
pub struct Ring {
push_buf: SharedRingBuffer<ConcurrentRingBuffer>,
pop_buf: SharedRingBuffer<ConcurrentRingBuffer>,
state_machine: RingStateMachine,
}
impl Ring {
pub fn create(name: &str) -> Result<Self, Fail> {
if name.is_empty() {
return Err(Fail::new(libc::EINVAL, "name of shared memory region cannot be empty"));
}
Ok(Self {
push_buf: SharedRingBuffer::create(&format!("{}:tx", name), RING_BUFFER_CAPACITY)?,
pop_buf: SharedRingBuffer::create(&format!("{}:rx", name), RING_BUFFER_CAPACITY)?,
state_machine: RingStateMachine::new(),
})
}
pub fn open(name: &str) -> Result<Self, Fail> {
if name.is_empty() {
return Err(Fail::new(libc::EINVAL, "name of shared memory region cannot be empty"));
}
Ok(Self {
push_buf: SharedRingBuffer::open(&format!("{}:rx", name), RING_BUFFER_CAPACITY)?,
pop_buf: SharedRingBuffer::open(&format!("{}:tx", name), RING_BUFFER_CAPACITY)?,
state_machine: RingStateMachine::new(),
})
}
pub fn try_pop(&mut self, buf: &mut [u8]) -> Result<(usize, bool), Fail> {
self.state_machine.may_pop()?;
let mut msg: Vec<u8> = vec![0; buf.len() + HEADER_SIZE];
let msg_len: usize = self.pop_buf.try_pop(&mut msg)? - HEADER_SIZE;
if msg_len > 0 {
debug_assert_eq!(REGULAR_MESSAGE_HEADER, msg[0..HEADER_SIZE]);
unsafe {
let buf_ptr: *mut u8 = buf.as_mut_ptr();
let msg_ptr: *const u8 = msg.as_ptr();
copy(msg_ptr.add(HEADER_SIZE), buf_ptr, msg_len);
};
Ok((msg_len, false))
} else {
debug_assert_eq!(EOF_MESSAGE_HEADER, msg[0..HEADER_SIZE]);
Ok((0, true))
}
}
pub fn try_push(&mut self, buf: &[u8]) -> Result<usize, Fail> {
self.state_machine.may_push()?;
let mut msg: Vec<u8> = REGULAR_MESSAGE_HEADER.to_vec();
msg.append(&mut buf.to_vec());
Ok(self.push_buf.try_push(&msg)? - HEADER_SIZE)
}
pub fn close(&mut self) -> Result<(), Fail> {
for _ in 0..MAX_RETRIES_PUSH_EOF {
match self.try_close() {
Ok(()) => return Ok(()),
Err(_) => continue,
}
}
let cause: String = format!("failed to push EoF");
error!("push_eof(): {}", cause);
Err(Fail::new(libc::EIO, &cause))
}
pub fn try_close(&mut self) -> Result<(), Fail> {
match self.push_buf.try_push(&EOF_MESSAGE_HEADER) {
Ok(len) => {
debug_assert_eq!(len, HEADER_SIZE);
Ok(())
},
Err(_) => {
let cause: String = format!("failed to push EoF");
error!("try_close(): {:?}", &cause);
Err(Fail::new(libc::EAGAIN, &cause))
},
}
}
pub fn prepare_close(&mut self) -> Result<(), Fail> {
self.state_machine.prepare(RingControlOperation::Close)
}
pub fn prepare_closed(&mut self) -> Result<(), Fail> {
self.state_machine.prepare(RingControlOperation::Closed)
}
pub fn commit(&mut self) {
self.state_machine.commit();
}
pub fn abort(&mut self) {
self.state_machine.abort();
}
}