use crate::{
catmem::ring::{
Ring,
MAX_RETRIES_PUSH_EOF,
},
expect_ok,
runtime::{
fail::Fail,
limits,
memory::DemiBuffer,
poll_yield,
queue::IoQueue,
DemiRuntime,
QToken,
QType,
SharedObject,
},
};
use ::std::{
any::Any,
ops::{
Deref,
DerefMut,
},
};
pub struct CatmemQueue {
ring: Ring,
}
#[derive(Clone)]
pub struct SharedCatmemQueue(SharedObject<CatmemQueue>);
impl CatmemQueue {
pub fn create(name: &str) -> Result<Self, Fail> {
Ok(Self {
ring: Ring::create(name)?,
})
}
pub fn open(name: &str) -> Result<Self, Fail> {
Ok(Self {
ring: Ring::open(name)?,
})
}
}
impl SharedCatmemQueue {
pub fn create(name: &str) -> Result<Self, Fail> {
Ok(Self(SharedObject::new(CatmemQueue::create(name)?)))
}
pub fn open(name: &str) -> Result<Self, Fail> {
Ok(Self(SharedObject::new(CatmemQueue::open(name)?)))
}
pub fn shutdown(&mut self) -> Result<(), Fail> {
{
self.ring.prepare_close()?;
self.ring.commit();
self.ring.prepare_closed()?;
self.ring.commit();
}
Ok(())
}
pub fn close(&mut self) -> Result<(), Fail> {
{
self.ring.prepare_close()?;
match self.ring.close() {
Ok(()) => {
self.ring.commit();
},
Err(e) => {
self.ring.abort();
return Err(e);
},
}
}
self.ring.prepare_closed()?;
self.ring.commit();
Ok(())
}
pub fn async_close<F>(&mut self, coroutine_constructor: F) -> Result<QToken, Fail>
where
F: FnOnce() -> Result<QToken, Fail>,
{
self.ring.prepare_close()?;
self.do_generic_sync_control_path_call(coroutine_constructor)
}
pub async fn do_async_close(&mut self) -> Result<(), Fail> {
let mut retries: u32 = MAX_RETRIES_PUSH_EOF;
let x = loop {
if let Ok(()) = self.ring.try_close() {
break Ok(());
}
poll_yield().await;
if retries == 0 {
let cause: String = format!("failed to push EoF");
error!("push_eof(): {}", cause);
break Err(Fail::new(libc::EIO, &cause));
}
retries -= 1;
};
if x.is_err() {
self.ring.abort();
return x;
}
self.ring.commit();
Ok(())
}
pub async fn do_pop(&mut self, size: Option<usize>) -> Result<(DemiBuffer, bool), Fail> {
let size: usize = size.unwrap_or(limits::RECVBUF_SIZE_MAX);
let mut buf: DemiBuffer = DemiBuffer::new(size as u16);
let eof: bool = loop {
match self.ring.try_pop(&mut buf) {
Ok((len, eof)) => {
if eof {
self.ring.prepare_close()?;
self.ring.commit();
expect_ok!(buf.trim(size), "should be able to trim to a zero-length buffer");
} else {
expect_ok!(buf.trim(size - len), "should be able to trim down to only read bytes");
}
break eof;
},
Err(e) if DemiRuntime::should_retry(e.errno) => {
poll_yield().await;
},
Err(e) => return Err(e),
}
};
trace!("data read ({:?}/{:?} bytes, eof={:?})", buf.len(), size, eof);
Ok((buf, eof))
}
pub async fn do_push(&mut self, mut buf: DemiBuffer) -> Result<(), Fail> {
loop {
match self.ring.try_push(&buf) {
Ok(len) if len == buf.len() => {
trace!("data written ({:?}/{:?} bytes)", buf.len(), buf.len());
return Ok(());
},
Ok(len) if len < buf.len() => {
expect_ok!(buf.adjust(len), "should be able to split remaining bytes");
continue;
},
Ok(len) => unreachable!(
"should not be possible to write more than in the buffer (len={:?})",
len
),
Err(e) if DemiRuntime::should_retry(e.errno) => {
poll_yield().await;
},
Err(e) => return Err(e),
}
}
}
fn do_generic_sync_control_path_call<F>(&mut self, coroutine_constructor: F) -> Result<QToken, Fail>
where
F: FnOnce() -> Result<QToken, Fail>,
{
let qt: QToken = match coroutine_constructor() {
Ok(qt) => {
self.ring.commit();
qt
},
Err(e) => {
self.ring.abort();
return Err(e);
},
};
Ok(qt)
}
}
impl Deref for SharedCatmemQueue {
type Target = CatmemQueue;
fn deref(&self) -> &Self::Target {
self.0.deref()
}
}
impl DerefMut for SharedCatmemQueue {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0.deref_mut()
}
}
impl IoQueue for SharedCatmemQueue {
fn get_qtype(&self) -> QType {
QType::MemoryQueue
}
fn as_any_ref(&self) -> &dyn Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
fn as_any(self: Box<Self>) -> Box<dyn Any> {
self
}
}