use nix::sys::socket::Backlog;
use std::time::Duration;
use rtipc::ChannelVector;
use rtipc::ConsumeResult;
use rtipc::Consumer;
use rtipc::Producer;
use rtipc::ProduceTryResult;
use rtipc::Server;
use crate::common::CommandId;
use crate::common::MsgCommand;
use crate::common::MsgEvent;
use crate::common::MsgResponse;
use crate::common::wait_pollin;
mod common;
struct App {
command: Consumer<MsgCommand>,
response: Producer<MsgResponse>,
event: Producer<MsgEvent>,
}
fn print_vector(vec: &ChannelVector) {
let vec_info = str::from_utf8(vec.info()).unwrap();
let cmd_info = str::from_utf8(vec.consumer_info(0).unwrap()).unwrap();
let rsp_info = str::from_utf8(vec.producer_info(0).unwrap()).unwrap();
let evt_info = str::from_utf8(vec.producer_info(1).unwrap()).unwrap();
println!(
"server received request vec={} cmd={} rsp={} evt={}",
vec_info, cmd_info, rsp_info, evt_info
);
}
impl App {
pub fn new(mut vec: ChannelVector) -> Self {
print_vector(&vec);
let command = vec.take_consumer(0).unwrap();
let response = vec.take_producer(0).unwrap();
let event = vec.take_producer(1).unwrap();
Self {
command,
response,
event,
}
}
fn run(&mut self) {
let mut run = true;
let mut cnt = 0;
while run {
let eventfd = self.command.eventfd().unwrap();
let _ = wait_pollin(eventfd, Duration::from_millis(10));
match self.command.pop() {
ConsumeResult::QueueError => panic!(),
ConsumeResult::NoMessage => continue,
ConsumeResult::NoNewMessage => continue,
ConsumeResult::Success => {}
ConsumeResult::SuccessMessagesDiscarded => {}
};
let cmd = self.command.current_message().unwrap();
self.response.current_message().id = cmd.id;
let args: [i32; 3] = cmd.args;
println!("server received command: {}", cmd);
let cmdid: CommandId = unsafe { ::std::mem::transmute(cmd.id) };
self.response.current_message().result = match cmdid {
CommandId::Hello => 0,
CommandId::Stop => {
run = false;
0
}
CommandId::SendEvent => {
self.send_events(args[0] as u32, args[1] as u32, args[2] != 0)
}
CommandId::Div => {
let (err, res) = self.div(args[0], args[1]);
self.response.current_message().data = res;
err
}
};
self.response.force_push();
cnt = cnt + 1;
}
}
fn send_events(&mut self, id: u32, num: u32, force: bool) -> i32 {
for i in 0..num {
let event = self.event.current_message();
event.id = id;
event.nr = i;
if force {
self.event.force_push();
} else {
if self.event.try_push() == ProduceTryResult::QueueFull {
return i as i32;
}
}
}
num as i32
}
fn div(&mut self, a: i32, b: i32) -> (i32, i32) {
if b == 0 {
return (-1, 0);
} else {
return (0, a / b);
}
}
}
fn main() {
let backlog = Backlog::new(1).unwrap();
let server = Server::new("rtipc.sock", backlog).unwrap();
let vec = server.conditional_accept(|_| true).unwrap();
let mut app = App::new(vec);
app.run();
}