client/
client.rs

1use std::num::NonZeroUsize;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::thread;
4use std::thread::JoinHandle;
5use std::time;
6use std::time::Duration;
7
8use nix::errno::Errno;
9
10use rtipc::ChannelVector;
11use rtipc::ConsumeResult;
12use rtipc::Consumer;
13use rtipc::Producer;
14use rtipc::client_connect;
15use rtipc::error::*;
16use rtipc::{ChannelConfig, QueueConfig, VectorConfig};
17
18use crate::common::CommandId;
19use crate::common::MsgCommand;
20use crate::common::MsgEvent;
21use crate::common::MsgResponse;
22use crate::common::wait_pollin;
23
24mod common;
25
26static STOP_EVENT_LISTERNER: AtomicBool = AtomicBool::new(false);
27
28fn handle_events(mut consumer: Consumer<MsgEvent>) -> Result<(), Errno> {
29    while !STOP_EVENT_LISTERNER.load(Ordering::Relaxed) {
30        let eventfd = consumer.eventfd().unwrap();
31        let ev = wait_pollin(eventfd, Duration::from_millis(10))?;
32
33        if !ev {
34            continue;
35        }
36
37        match consumer.pop() {
38            ConsumeResult::QueueError => panic!(),
39            ConsumeResult::NoMessage => return Err(Errno::EBADMSG),
40            ConsumeResult::NoNewMessage => return Err(Errno::EBADMSG),
41            ConsumeResult::Success => {
42                println!(
43                    "client received event: {}",
44                    consumer.current_message().unwrap()
45                )
46            }
47            ConsumeResult::SuccessMessagesDiscarded => {
48                println!(
49                    "client received event: {}",
50                    consumer.current_message().unwrap()
51                )
52            }
53        };
54    }
55    println!("handle_events returns");
56    Ok(())
57}
58
59struct App {
60    command: Producer<MsgCommand>,
61    response: Consumer<MsgResponse>,
62    event_listener: Option<JoinHandle<Result<(), Errno>>>,
63}
64
65impl App {
66    pub fn new(mut vec: ChannelVector) -> Self {
67        let command = vec.take_producer(0).unwrap();
68        let response = vec.take_consumer(0).unwrap();
69        let event = vec.take_consumer(1).unwrap();
70
71        let event_listener = Some(thread::spawn(move || handle_events(event)));
72
73        Self {
74            command,
75            response,
76            event_listener,
77        }
78    }
79
80    pub fn run(&mut self, cmds: &[MsgCommand]) {
81        let pause = time::Duration::from_millis(10);
82
83        for cmd in cmds {
84            self.command.current_message().clone_from(cmd);
85            self.command.force_push();
86
87            loop {
88                match self.response.pop() {
89                    ConsumeResult::QueueError => panic!(),
90                    ConsumeResult::NoMessage => {
91                        thread::sleep(pause);
92                        continue;
93                    }
94                    ConsumeResult::NoNewMessage => {
95                        thread::sleep(pause);
96                        continue;
97                    }
98                    ConsumeResult::Success => {}
99                    ConsumeResult::SuccessMessagesDiscarded => {}
100                };
101
102                println!(
103                    "client received response: {}",
104                    self.response.current_message().unwrap()
105                );
106                break;
107            }
108        }
109        thread::sleep(time::Duration::from_millis(100));
110        STOP_EVENT_LISTERNER.store(true, Ordering::Relaxed);
111        self.event_listener.take().map(|h| h.join());
112    }
113}
114
115fn main() {
116    let commands: [MsgCommand; 6] = [
117        MsgCommand {
118            id: CommandId::Hello as u32,
119            args: [1, 2, 0],
120        },
121        MsgCommand {
122            id: CommandId::SendEvent as u32,
123            args: [11, 20, 0],
124        },
125        MsgCommand {
126            id: CommandId::SendEvent as u32,
127            args: [12, 20, 1],
128        },
129        MsgCommand {
130            id: CommandId::Div as u32,
131            args: [100, 7, 0],
132        },
133        MsgCommand {
134            id: CommandId::Div as u32,
135            args: [100, 0, 0],
136        },
137        MsgCommand {
138            id: CommandId::Stop as u32,
139            args: [0, 0, 0],
140        },
141    ];
142
143    let c2s_channels: [ChannelConfig; 1] = [ChannelConfig {
144        queue: QueueConfig {
145            additional_messages: 0,
146            message_size: unsafe { NonZeroUsize::new_unchecked(size_of::<MsgCommand>()) },
147            info: b"rpc command".to_vec(),
148        },
149        eventfd: true,
150    }];
151
152    let s2c_channels: [ChannelConfig; 2] = [
153        ChannelConfig {
154            queue: QueueConfig {
155                additional_messages: 0,
156                message_size: unsafe { NonZeroUsize::new_unchecked(size_of::<MsgResponse>()) },
157                info: b"rpc response".to_vec(),
158            },
159            eventfd: false,
160        },
161        ChannelConfig {
162            queue: QueueConfig {
163                additional_messages: 10,
164                message_size: unsafe { NonZeroUsize::new_unchecked(size_of::<MsgEvent>()) },
165                info: b"rpc event".to_vec(),
166            },
167            eventfd: true,
168        },
169    ];
170
171    let vparam = VectorConfig {
172        producers: c2s_channels.to_vec(),
173        consumers: s2c_channels.to_vec(),
174        info: b"rpc example".to_vec(),
175    };
176    let vec = client_connect("rtipc.sock", vparam).unwrap();
177    let mut app = App::new(vec);
178    thread::sleep(time::Duration::from_millis(100));
179    app.run(&commands);
180}