use anyhow::Result;
use demikernel::{
demi_sgarray_t,
runtime::types::{
demi_opcode_t,
demi_qresult_t,
},
LibOS,
QDesc,
QToken,
};
/// Server end of a named pipe, driven through a demikernel `LibOS`.
///
/// The pipe itself is only created when [`PipeServer::run`] is called; until
/// then `qd` is `None`.
pub struct PipeServer {
/// LibOS instance used for every pipe operation (create, pop, wait, close).
libos: LibOS,
/// Base pipe name; `run` appends the `:rx` suffix when creating the pipe.
pipe_name: String,
/// Descriptor of the pipe created by `run`, if any. `Drop` closes it when set.
qd: Option<QDesc>,
}
impl PipeServer {
    /// Builds a server bound to `libos` that will serve the pipe called `pipe_name`.
    ///
    /// No pipe is created here; that happens in [`PipeServer::run`]. The
    /// constructor itself never fails, the `Result` is kept for interface
    /// stability.
    pub fn new(libos: LibOS, pipe_name: String) -> Result<Self> {
        Ok(Self {
            libos,
            pipe_name,
            qd: None,
        })
    }

    /// Creates the `<pipe_name>:rx` pipe and drains it until a pop yields zero
    /// bytes (or a failed operation is reported), then closes the pipe.
    ///
    /// # Errors
    ///
    /// Propagates any error from creating the pipe, from popping/waiting on
    /// it, or from closing it. If popping fails, the descriptor stays recorded
    /// in `self.qd` so that `Drop` can still close it.
    pub fn run(&mut self) -> Result<()> {
        let qd: QDesc = self.libos.create_pipe(&format!("{}:rx", self.pipe_name))?;
        // Remember the descriptor so `Drop` can release it if we bail out early.
        self.qd = Some(qd);

        while self.pop_and_wait(qd)? > 0 {}

        // Forget the descriptor *before* closing it here: otherwise `Drop`
        // would close the same descriptor a second time once the server goes
        // out of scope.
        self.qd = None;
        self.libos.close(qd)?;
        Ok(())
    }

    /// Issues a single pop on `pipeqd`, blocks until it completes, and returns
    /// the length of the first segment of the received scatter-gather array.
    ///
    /// A failed operation is logged by `handle_fail` and reported as `Ok(0)`,
    /// which callers treat the same as "nothing more to read".
    ///
    /// # Errors
    ///
    /// Returns an error if `pop`, `wait` or `sgafree` fails, or if the
    /// completed operation is neither a pop nor a failure.
    fn pop_and_wait(&mut self, pipeqd: QDesc) -> Result<usize> {
        let qt: QToken = self.libos.pop(pipeqd, None)?;
        let qr: demi_qresult_t = self.libos.wait(qt, None)?;

        let sga: demi_sgarray_t = match qr.qr_opcode {
            // SAFETY: the result opcode is DEMI_OPC_POP; this code relies on
            // the opcode tagging `qr_value` as holding a scatter-gather array.
            demi_opcode_t::DEMI_OPC_POP => unsafe { qr.qr_value.sga },
            demi_opcode_t::DEMI_OPC_FAILED => return self.handle_fail(&qr),
            _ => anyhow::bail!("unexpected operation result"),
        };

        // Read the length before handing the buffer back to the LibOS.
        let nbytes: usize = sga.sga_segs[0].sgaseg_len as usize;
        self.libos.sgafree(sga)?;
        Ok(nbytes)
    }

    /// Logs a failed queue operation and reports it as zero bytes received.
    ///
    /// A connection reset is logged at INFO level; every other errno is logged
    /// as a warning and otherwise ignored. Always returns `Ok(0)`.
    fn handle_fail(&mut self, qr: &demi_qresult_t) -> Result<usize> {
        let qd: QDesc = qr.qr_qd.into();
        let qt: QToken = qr.qr_qt.into();
        let errno: i64 = qr.qr_ret;

        if errno == libc::ECONNRESET as i64 {
            println!("INFO: client reset connection (qd={:?})", qd);
        } else {
            println!(
                "WARN: operation failed, ignoring (qd={:?}, qt={:?}, errno={:?})",
                qd, qt, errno
            );
        }

        Ok(0)
    }
}
impl Drop for PipeServer {
    /// Closes the pipe descriptor recorded in `self.qd`, if there is one.
    ///
    /// A failing close cannot be propagated from `drop`, so it is only
    /// reported on standard output.
    fn drop(&mut self) {
        // Nothing to release when no descriptor is recorded.
        let pipeqd = match self.qd {
            Some(pipeqd) => pipeqd,
            None => return,
        };

        match self.libos.close(pipeqd) {
            Ok(_) => {},
            Err(err) => {
                println!("WARN: leaking pipeqd={:?}", pipeqd);
                println!("ERROR: close() failed (error={:?})", err);
            },
        }
    }
}