use kekbit::api::ReadError::*;
use kekbit::api::{EncoderHandler, Reader, Writer};
use kekbit::core::*;
use log::{error, info};
use nix::sys::wait::waitpid;
use nix::unistd::{fork, getpid, ForkResult};
use simple_logger::SimpleLogger;
use std::path::Path;
use std::process::exit;
use std::result::Result;
const ITERATIONS: u32 = 1 * 1_000_000_0;
const Q_PATH: &str = "/dev/shm";
pub fn run_writer() -> Result<(), ()> {
info!("Creating writer process ...{}", getpid());
let chunk_size = 100;
let metadata = Metadata::new(100, 1000, chunk_size * (ITERATIONS + 100), 1000, 99999999999, TickUnit::Nanos);
let mut writer = shm_writer(&Path::new(Q_PATH), &metadata, EncoderHandler::default()).unwrap();
let msg_bytes = "There are 10 kinds of people: those who know binary and those who don't".as_bytes();
let mut total = 16;
for _i in 0..ITERATIONS {
let res = writer.write(&msg_bytes);
match res {
Ok(b) => {
total += b;
}
Err(err) => {
error!("Write failed {:?}", err);
panic!("Write failed");
}
};
}
info!("We wrote {} bytes ", total);
Ok(())
}
pub fn run_reader() -> Result<(), ()> {
info!("Creating reader porcess ...{}", getpid());
let mut reader = try_shm_reader(&Path::new(Q_PATH), 1000, 2000, 200).unwrap();
let mut stop = false;
let mut msg_count = 0;
while !stop {
match reader.try_read() {
Ok(Some(_)) => msg_count += 1,
Ok(None) => (),
Err(read_err) => match read_err {
Timeout(_) => {
info!("Timeout detected by reader");
stop = true;
}
Closed => {
info!("Closed channel detected by reader");
stop = true;
}
ChannelFull | Failed => {
error!("Read failed. Will stop. So far we read {} messages", msg_count);
panic!("Read failed!!!!");
}
},
}
}
info!(
"We read {} bytes in {} messages. Channel state is {:?}",
reader.position(),
msg_count,
reader.exhausted()
);
Ok(())
}
fn main() {
SimpleLogger::new().init().unwrap();
info!("Kekbit Driver PID is {}.", getpid());
let w_pid = unsafe {
match fork() {
Ok(ForkResult::Child) => {
exit(match run_writer() {
Ok(_) => 0,
Err(err) => {
error!("error: {:?}", err);
1
}
});
}
Ok(ForkResult::Parent { child, .. }) => child,
Err(err) => {
panic!("[main] writer fork() failed: {}", err);
}
}
};
let mut rpids = Vec::new();
for _i in 0..1 {
let r_pid = unsafe {
match fork() {
Ok(ForkResult::Child) => {
exit(match run_reader() {
Ok(_) => 0,
Err(err) => {
error!("error: {:?}", err);
1
}
});
}
Ok(ForkResult::Parent { child, .. }) => child,
Err(err) => {
panic!("[main] reader fork() failed: {}", err);
}
}
};
rpids.push(r_pid);
}
for r_pid in rpids {
match waitpid(r_pid, None) {
Ok(status) => info!("[main] Reader {} completed with status {:?}", r_pid, status),
Err(err) => panic!("[main] waitpid() on reader {} failed: {}", r_pid, err),
}
}
match waitpid(w_pid, None) {
Ok(status) => info!("[main] Writer completed with status {:?}", status),
Err(err) => panic!("[main] waitpid() on writer failed: {}", err),
}
let shm_file_path = storage_path(&Path::new(Q_PATH), 1000);
if shm_file_path.exists() {
std::fs::remove_file(&shm_file_path).unwrap();
info!("Channel data file {:?} removed", &shm_file_path);
}
info!("Kekbit Driver Done!");
}