use ::anyhow::Result;
use ::core::slice;
use ::demikernel::{
demi_sgarray_t,
runtime::types::demi_opcode_t,
LibOS,
LibOSName,
QDesc,
QToken,
};
use ::std::env;
const BUFFER_SIZE: usize = 64;
const NROUNDS: u8 = 128;
fn mksga(libos: &mut LibOS, size: usize, value: u8) -> Result<demi_sgarray_t> {
let sga: demi_sgarray_t = match libos.sgaalloc(size) {
Ok(sga) => sga,
Err(e) => anyhow::bail!("failed to allocate scatter-gather array: {:?}", e),
};
if sga.sga_segs[0].sgaseg_len as usize != size {
freesga(libos, sga);
let seglen: usize = sga.sga_segs[0].sgaseg_len as usize;
anyhow::bail!(
"failed to allocate scatter-gather array: expected size={:?} allocated size={:?}",
size,
seglen
);
}
let ptr: *mut u8 = sga.sga_segs[0].sgaseg_buf as *mut u8;
let len: usize = sga.sga_segs[0].sgaseg_len as usize;
let slice: &mut [u8] = unsafe { slice::from_raw_parts_mut(ptr, len) };
slice.fill(value);
Ok(sga)
}
fn chksga(sga: &demi_sgarray_t, expected_value: u8) -> Result<usize> {
let recvbuf: &[u8] = unsafe {
slice::from_raw_parts(
sga.sga_segs[0].sgaseg_buf as *const u8,
sga.sga_segs[0].sgaseg_len as usize,
)
};
for x in &recvbuf[..] {
if *x != expected_value {
demikernel::ensure_eq!(*x, expected_value);
}
}
Ok(recvbuf.len())
}
fn freesga(libos: &mut LibOS, sga: demi_sgarray_t) {
if let Err(e) = libos.sgafree(sga) {
println!("ERROR: sgafree() failed (error={:?})", e);
println!("WARN: leaking sga");
}
}
fn push_and_wait(libos: &mut LibOS, pipeqd: QDesc, length: usize, value: u8) -> Result<()> {
let sga: demi_sgarray_t = mksga(libos, length, value)?;
let qt: QToken = match libos.push(pipeqd, &sga) {
Ok(qt) => qt,
Err(e) => {
freesga(libos, sga);
anyhow::bail!("failed to push: {:?}", e)
},
};
if let Err(e) = libos.wait(qt, None) {
freesga(libos, sga);
anyhow::bail!("failed to wait on push: {:?}", e)
}
libos.sgafree(sga)?;
Ok(())
}
fn pop_and_wait(libos: &mut LibOS, pipeqd: QDesc, expected_length: usize, expected_value: u8) -> Result<()> {
let mut nreceived: usize = 0;
while nreceived < expected_length {
let qt: QToken = libos.pop(pipeqd, None)?;
let sga: demi_sgarray_t = match libos.wait(qt, None) {
Ok(qr) if qr.qr_opcode == demi_opcode_t::DEMI_OPC_POP => unsafe { qr.qr_value.sga },
Ok(_) => anyhow::bail!("unexpected operation result"),
_ => anyhow::bail!("server failed to wait()"),
};
nreceived += match chksga(&sga, expected_value) {
Ok(len) => len,
Err(e) => {
freesga(libos, sga);
anyhow::bail!("sga contents did not match: {:?}", e)
},
};
libos.sgafree(sga)?;
}
Ok(())
}
pub struct PipeServer {
libos: LibOS,
pipeqd: QDesc,
}
impl PipeServer {
pub fn new(mut libos: LibOS, pipe_name: &str) -> Result<Self> {
let pipeqd: QDesc = match libos.create_pipe(&format!("{}", pipe_name)) {
Ok(qd) => qd,
Err(e) => {
anyhow::bail!("failed to open memory queue: {:?}", e)
},
};
return Ok(Self { libos, pipeqd });
}
pub fn run(&mut self, buffer_size: usize, nrounds: u8) -> Result<()> {
for i in 0..nrounds {
if let Err(e) = pop_and_wait(&mut self.libos, self.pipeqd, buffer_size, i) {
anyhow::bail!("failed to pop memory queue: {:?}", e);
}
if let Err(e) = push_and_wait(&mut self.libos, self.pipeqd, buffer_size, i) {
anyhow::bail!("failed to push memory queue: {:?}", e);
}
println!("pong {:?}", i);
}
Ok(())
}
fn close(&mut self, pipeqd: QDesc) {
if let Err(e) = self.libos.close(pipeqd) {
println!("ERROR: close() failed (error={:?})", e);
println!("WARN: leaking pipeqd={:?}", pipeqd);
}
}
}
impl Drop for PipeServer {
fn drop(&mut self) {
self.close(self.pipeqd);
}
}
pub struct PipeClient {
libos: LibOS,
pipeqd: QDesc,
}
impl PipeClient {
pub fn new(mut libos: LibOS, pipe_name: &str) -> Result<Self> {
let pipeqd = match libos.open_pipe(&format!("{}", pipe_name)) {
Ok(qd) => qd,
Err(e) => anyhow::bail!("failed to open memory queue: {:?}", e.cause),
};
return Ok(Self { libos, pipeqd });
}
pub fn run(&mut self, buffer_size: usize, nrounds: u8) -> Result<()> {
for i in 0..nrounds {
if let Err(e) = push_and_wait(&mut self.libos, self.pipeqd, buffer_size, i) {
anyhow::bail!("failed to pop memory queue: {:?}", e)
}
if let Err(e) = pop_and_wait(&mut self.libos, self.pipeqd, buffer_size, i) {
anyhow::bail!("failed to push memory queue: {:?}", e)
}
println!("pong {:?}", i);
}
Ok(())
}
fn close(&mut self, pipeqd: QDesc) {
if let Err(e) = self.libos.close(pipeqd) {
println!("ERROR: close() failed (error={:?})", e);
println!("WARN: leaking pipeqd={:?}", pipeqd);
}
}
}
impl Drop for PipeClient {
fn drop(&mut self) {
self.close(self.pipeqd);
}
}
fn usage(program_name: &String) {
println!("Usage: {} MODE pipe-name", program_name);
println!("Modes:\n");
println!(" --client Run program in client mode.");
println!(" --server Run program in server mode.");
}
pub fn main() -> Result<()> {
let args: Vec<String> = env::args().collect();
if args.len() >= 3 {
let libos_name: LibOSName = match LibOSName::from_env() {
Ok(libos_name) => libos_name.into(),
Err(e) => anyhow::bail!("{:?}", e),
};
let libos: LibOS = match LibOS::new(libos_name, None) {
Ok(libos) => libos,
Err(e) => anyhow::bail!("failed to initialize libos: {:?}", e),
};
let pipe_name: &str = &args[2];
if args[1] == "--server" {
let mut server: PipeServer = PipeServer::new(libos, pipe_name)?;
return server.run(BUFFER_SIZE, NROUNDS);
} else if args[1] == "--client" {
let mut client: PipeClient = PipeClient::new(libos, pipe_name)?;
return client.run(BUFFER_SIZE, NROUNDS);
}
}
usage(&args[0]);
Ok(())
}