server/
server.rs

1use nix::sys::socket::Backlog;
2
3use std::time::Duration;
4
5use rtipc::ChannelVector;
6use rtipc::ConsumeResult;
7use rtipc::Consumer;
8use rtipc::Producer;
9
10use rtipc::ProduceTryResult;
11
12use rtipc::Server;
13
14use crate::common::CommandId;
15use crate::common::MsgCommand;
16use crate::common::MsgEvent;
17use crate::common::MsgResponse;
18
19use crate::common::wait_pollin;
20
21mod common;
22
23struct App {
24    command: Consumer<MsgCommand>,
25    response: Producer<MsgResponse>,
26    event: Producer<MsgEvent>,
27}
28
29fn print_vector(vec: &ChannelVector) {
30    let vec_info = str::from_utf8(vec.info()).unwrap();
31    let cmd_info = str::from_utf8(vec.consumer_info(0).unwrap()).unwrap();
32    let rsp_info = str::from_utf8(vec.producer_info(0).unwrap()).unwrap();
33    let evt_info = str::from_utf8(vec.producer_info(1).unwrap()).unwrap();
34    println!(
35        "server received request vec={} cmd={} rsp={} evt={}",
36        vec_info, cmd_info, rsp_info, evt_info
37    );
38}
39
40impl App {
41    pub fn new(mut vec: ChannelVector) -> Self {
42        print_vector(&vec);
43        let command = vec.take_consumer(0).unwrap();
44        let response = vec.take_producer(0).unwrap();
45        let event = vec.take_producer(1).unwrap();
46
47        Self {
48            command,
49            response,
50            event,
51        }
52    }
53    fn run(&mut self) {
54        let mut run = true;
55        let mut cnt = 0;
56
57        while run {
58            let eventfd = self.command.eventfd().unwrap();
59            let _ = wait_pollin(eventfd, Duration::from_millis(10));
60            match self.command.pop() {
61                ConsumeResult::QueueError => panic!(),
62                ConsumeResult::NoMessage => continue,
63                ConsumeResult::NoNewMessage => continue,
64                ConsumeResult::Success => {}
65                ConsumeResult::SuccessMessagesDiscarded => {}
66            };
67            let cmd = self.command.current_message().unwrap();
68            self.response.current_message().id = cmd.id;
69            let args: [i32; 3] = cmd.args;
70            println!("server received command: {}", cmd);
71
72            let cmdid: CommandId = unsafe { ::std::mem::transmute(cmd.id) };
73            self.response.current_message().result = match cmdid {
74                CommandId::Hello => 0,
75                CommandId::Stop => {
76                    run = false;
77                    0
78                }
79                CommandId::SendEvent => {
80                    self.send_events(args[0] as u32, args[1] as u32, args[2] != 0)
81                }
82                CommandId::Div => {
83                    let (err, res) = self.div(args[0], args[1]);
84                    self.response.current_message().data = res;
85                    err
86                }
87            };
88            self.response.force_push();
89
90            cnt = cnt + 1;
91        }
92    }
93    fn send_events(&mut self, id: u32, num: u32, force: bool) -> i32 {
94        for i in 0..num {
95            let event = self.event.current_message();
96            event.id = id;
97            event.nr = i;
98            if force {
99                self.event.force_push();
100            } else {
101                if self.event.try_push() == ProduceTryResult::QueueFull {
102                    return i as i32;
103                }
104            }
105        }
106        num as i32
107    }
108    fn div(&mut self, a: i32, b: i32) -> (i32, i32) {
109        if b == 0 {
110            return (-1, 0);
111        } else {
112            return (0, a / b);
113        }
114    }
115}
116
117fn main() {
118    let backlog = Backlog::new(1).unwrap();
119    let server = Server::new("rtipc.sock", backlog).unwrap();
120    let vec = server.conditional_accept(|_| true).unwrap();
121    let mut app = App::new(vec);
122    app.run();
123}