pub mod queue;
mod ring;
use self::queue::SharedCatmemQueue;
use crate::{
demikernel::config::Config,
expect_ok,
pal::linux::socketaddrv4_to_sockaddr,
runtime::{
fail::Fail,
limits,
memory::{
DemiBuffer,
MemoryRuntime,
},
queue::downcast_queue,
types::{
demi_opcode_t,
demi_qr_value_t,
demi_qresult_t,
demi_sgarray_t,
},
OperationResult,
SharedDemiRuntime,
SharedObject,
},
QDesc,
QToken,
};
use ::futures::FutureExt;
use ::std::{
mem,
ops::{
Deref,
DerefMut,
},
time::Duration,
};
pub struct CatmemLibOS {
runtime: SharedDemiRuntime,
}
#[derive(Clone)]
pub struct SharedCatmemLibOS(SharedObject<CatmemLibOS>);
impl CatmemLibOS {
pub fn new(runtime: SharedDemiRuntime) -> Self {
Self { runtime }
}
}
impl SharedCatmemLibOS {
pub fn new(_config: &Config, runtime: SharedDemiRuntime) -> Self {
Self(SharedObject::new(CatmemLibOS::new(runtime)))
}
pub fn create_pipe(&mut self, name: &str) -> Result<QDesc, Fail> {
trace!("create_pipe() name={:?}", name);
let qd: QDesc = self
.runtime
.alloc_queue::<SharedCatmemQueue>(SharedCatmemQueue::create(name)?);
Ok(qd)
}
pub fn open_pipe(&mut self, name: &str) -> Result<QDesc, Fail> {
trace!("open_pipe() name={:?}", name);
let qd: QDesc = self
.runtime
.alloc_queue::<SharedCatmemQueue>(SharedCatmemQueue::open(name)?);
Ok(qd)
}
pub fn shutdown(&mut self, qd: QDesc) -> Result<(), Fail> {
trace!("shutdown() qd={:?}", qd);
let mut queue: SharedCatmemQueue = self.runtime.free_queue::<SharedCatmemQueue>(&qd)?;
queue.shutdown()
}
pub fn close(&mut self, qd: QDesc) -> Result<(), Fail> {
trace!("close() qd={:?}", qd);
let mut queue: SharedCatmemQueue = self.runtime.free_queue::<SharedCatmemQueue>(&qd)?;
queue.close()
}
pub fn async_close(&mut self, qd: QDesc) -> Result<QToken, Fail> {
trace!("async_close() qd={:?}", qd);
let mut queue: SharedCatmemQueue = self.get_queue(&qd)?;
let coroutine_constructor = || -> Result<QToken, Fail> {
let coroutine = Box::pin(self.clone().close_coroutine(qd).fuse());
self.runtime
.clone()
.insert_io_coroutine("Catmem::async_close", coroutine)
};
queue.async_close(coroutine_constructor)
}
pub async fn close_coroutine(mut self, qd: QDesc) -> (QDesc, OperationResult) {
let mut queue: SharedCatmemQueue = match self.get_queue(&qd) {
Ok(queue) => queue,
Err(e) => return (qd, OperationResult::Failed(e)),
};
match queue.do_async_close().await {
Ok(()) => {
expect_ok!(self.runtime.free_queue::<SharedCatmemQueue>(&qd), "queue should exist");
(qd, OperationResult::Close)
},
Err(e) => {
warn!("async_close(): {:?}", &e);
(qd, OperationResult::Failed(e))
},
}
}
pub fn push(&mut self, qd: QDesc, sga: &demi_sgarray_t) -> Result<QToken, Fail> {
trace!("push() qd={:?}", qd);
let buf: DemiBuffer = self.clone_sgarray(sga)?;
if buf.len() == 0 {
let cause: String = format!("zero-length buffer (qd={:?})", qd);
error!("push(): {}", cause);
return Err(Fail::new(libc::EINVAL, &cause));
}
let coroutine = Box::pin(self.clone().push_coroutine(qd, buf).fuse());
self.runtime.clone().insert_io_coroutine("Catmem::push", coroutine)
}
pub async fn push_coroutine(self, qd: QDesc, buf: DemiBuffer) -> (QDesc, OperationResult) {
let mut queue: SharedCatmemQueue = match self.get_queue(&qd) {
Ok(queue) => queue,
Err(e) => return (qd, OperationResult::Failed(e)),
};
match queue.do_push(buf).await {
Ok(()) => (qd, OperationResult::Push),
Err(e) => (qd, OperationResult::Failed(e)),
}
}
pub fn pop(&mut self, qd: QDesc, size: Option<usize>) -> Result<QToken, Fail> {
trace!("pop() qd={:?}, size={:?}", qd, size);
debug_assert!(size.is_none() || ((size.unwrap() > 0) && (size.unwrap() <= limits::POP_SIZE_MAX)));
let coroutine = Box::pin(self.clone().pop_coroutine(qd, size).fuse());
self.runtime.clone().insert_io_coroutine("Catmem::pop", coroutine)
}
pub async fn pop_coroutine(self, qd: QDesc, size: Option<usize>) -> (QDesc, OperationResult) {
let mut queue: SharedCatmemQueue = match self.get_queue(&qd) {
Ok(queue) => queue,
Err(e) => return (qd, OperationResult::Failed(e)),
};
let (buf, _) = match queue.do_pop(size).await {
Ok(result) => result,
Err(e) => return (qd, OperationResult::Failed(e)),
};
(qd, OperationResult::Pop(None, buf))
}
pub fn wait_any(&mut self, qts: &[QToken], timeout: Duration) -> Result<(usize, demi_qresult_t), Fail> {
let (offset, qt, qd, result) = self.runtime.wait_any(qts, timeout)?;
Ok((offset, self.create_result(result, qd, qt)))
}
pub fn wait_next_n<Acceptor: FnMut(demi_qresult_t) -> bool>(
&mut self,
mut acceptor: Acceptor,
timeout: Duration
) -> Result<(), Fail>
{
self.runtime.clone().wait_next_n(
|qt, qd, result| acceptor(self.create_result(result, qd, qt)), timeout)
}
pub fn poll(&mut self) {
self.runtime.poll()
}
pub fn create_result(&self, result: OperationResult, qd: QDesc, qt: QToken) -> demi_qresult_t {
match result {
OperationResult::Connect => unreachable!("Memory libOSes do not support connect"),
OperationResult::Accept((_, _)) => unreachable!("Memory libOSes do not support connect"),
OperationResult::Push => demi_qresult_t {
qr_opcode: demi_opcode_t::DEMI_OPC_PUSH,
qr_qd: qd.into(),
qr_qt: qt.into(),
qr_ret: 0,
qr_value: unsafe { mem::zeroed() },
},
OperationResult::Pop(addr, bytes) => match self.into_sgarray(bytes) {
Ok(mut sga) => {
if let Some(addr) = addr {
sga.sga_addr = socketaddrv4_to_sockaddr(&addr);
}
let qr_value: demi_qr_value_t = demi_qr_value_t { sga };
demi_qresult_t {
qr_opcode: demi_opcode_t::DEMI_OPC_POP,
qr_qd: qd.into(),
qr_qt: qt.into(),
qr_ret: 0,
qr_value,
}
},
Err(e) => {
warn!("Operation Failed: {:?}", e);
demi_qresult_t {
qr_opcode: demi_opcode_t::DEMI_OPC_FAILED,
qr_qd: qd.into(),
qr_qt: qt.into(),
qr_ret: e.errno as i64,
qr_value: unsafe { mem::zeroed() },
}
},
},
OperationResult::Close => demi_qresult_t {
qr_opcode: demi_opcode_t::DEMI_OPC_CLOSE,
qr_qd: qd.into(),
qr_qt: qt.into(),
qr_ret: 0,
qr_value: unsafe { mem::zeroed() },
},
OperationResult::Failed(e) => {
warn!("Operation Failed: {:?}", e);
demi_qresult_t {
qr_opcode: demi_opcode_t::DEMI_OPC_FAILED,
qr_qd: qd.into(),
qr_qt: qt.into(),
qr_ret: e.errno as i64,
qr_value: unsafe { mem::zeroed() },
}
},
}
}
pub fn get_queue(&self, qd: &QDesc) -> Result<SharedCatmemQueue, Fail> {
Ok(self.runtime.get_qtable().get::<SharedCatmemQueue>(qd)?.clone())
}
}
impl Deref for SharedCatmemLibOS {
type Target = CatmemLibOS;
fn deref(&self) -> &Self::Target {
self.0.deref()
}
}
impl DerefMut for SharedCatmemLibOS {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0.deref_mut()
}
}
impl Drop for CatmemLibOS {
fn drop(&mut self) {
for boxed_queue in self.runtime.get_mut_qtable().drain() {
if let Ok(mut catmem_queue) = downcast_queue::<SharedCatmemQueue>(boxed_queue) {
if let Err(e) = catmem_queue.close() {
error!("push_eof() failed: {:?}", e);
warn!("leaking shared memory region");
}
}
}
}
}
impl MemoryRuntime for SharedCatmemLibOS {}