1use nix::sys::socket::Backlog;
2
3use std::time::Duration;
4
5use rtipc::ChannelVector;
6use rtipc::ConsumeResult;
7use rtipc::Consumer;
8use rtipc::Producer;
9
10use rtipc::ProduceTryResult;
11
12use rtipc::Server;
13
14use crate::common::CommandId;
15use crate::common::MsgCommand;
16use crate::common::MsgEvent;
17use crate::common::MsgResponse;
18
19use crate::common::wait_pollin;
20
21mod common;
22
23struct App {
24 command: Consumer<MsgCommand>,
25 response: Producer<MsgResponse>,
26 event: Producer<MsgEvent>,
27}
28
29fn print_vector(vec: &ChannelVector) {
30 let vec_info = str::from_utf8(vec.info()).unwrap();
31 let cmd_info = str::from_utf8(vec.consumer_info(0).unwrap()).unwrap();
32 let rsp_info = str::from_utf8(vec.producer_info(0).unwrap()).unwrap();
33 let evt_info = str::from_utf8(vec.producer_info(1).unwrap()).unwrap();
34 println!(
35 "server received request vec={} cmd={} rsp={} evt={}",
36 vec_info, cmd_info, rsp_info, evt_info
37 );
38}
39
40impl App {
41 pub fn new(mut vec: ChannelVector) -> Self {
42 print_vector(&vec);
43 let command = vec.take_consumer(0).unwrap();
44 let response = vec.take_producer(0).unwrap();
45 let event = vec.take_producer(1).unwrap();
46
47 Self {
48 command,
49 response,
50 event,
51 }
52 }
53 fn run(&mut self) {
54 let mut run = true;
55 let mut cnt = 0;
56
57 while run {
58 let eventfd = self.command.eventfd().unwrap();
59 let _ = wait_pollin(eventfd, Duration::from_millis(10));
60 match self.command.pop() {
61 ConsumeResult::QueueError => panic!(),
62 ConsumeResult::NoMessage => continue,
63 ConsumeResult::NoNewMessage => continue,
64 ConsumeResult::Success => {}
65 ConsumeResult::SuccessMessagesDiscarded => {}
66 };
67 let cmd = self.command.current_message().unwrap();
68 self.response.current_message().id = cmd.id;
69 let args: [i32; 3] = cmd.args;
70 println!("server received command: {}", cmd);
71
72 let cmdid: CommandId = unsafe { ::std::mem::transmute(cmd.id) };
73 self.response.current_message().result = match cmdid {
74 CommandId::Hello => 0,
75 CommandId::Stop => {
76 run = false;
77 0
78 }
79 CommandId::SendEvent => {
80 self.send_events(args[0] as u32, args[1] as u32, args[2] != 0)
81 }
82 CommandId::Div => {
83 let (err, res) = self.div(args[0], args[1]);
84 self.response.current_message().data = res;
85 err
86 }
87 };
88 self.response.force_push();
89
90 cnt = cnt + 1;
91 }
92 }
93 fn send_events(&mut self, id: u32, num: u32, force: bool) -> i32 {
94 for i in 0..num {
95 let event = self.event.current_message();
96 event.id = id;
97 event.nr = i;
98 if force {
99 self.event.force_push();
100 } else {
101 if self.event.try_push() == ProduceTryResult::QueueFull {
102 return i as i32;
103 }
104 }
105 }
106 num as i32
107 }
108 fn div(&mut self, a: i32, b: i32) -> (i32, i32) {
109 if b == 0 {
110 return (-1, 0);
111 } else {
112 return (0, a / b);
113 }
114 }
115}
116
117fn main() {
118 let backlog = Backlog::new(1).unwrap();
119 let server = Server::new("rtipc.sock", backlog).unwrap();
120 let vec = server.conditional_accept(|_| true).unwrap();
121 let mut app = App::new(vec);
122 app.run();
123}