1use std::num::NonZeroUsize;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::thread;
4use std::thread::JoinHandle;
5use std::time;
6use std::time::Duration;
7
8use nix::errno::Errno;
9
10use rtipc::ChannelVector;
11use rtipc::ConsumeResult;
12use rtipc::Consumer;
13use rtipc::Producer;
14use rtipc::client_connect;
15use rtipc::error::*;
16use rtipc::{ChannelConfig, QueueConfig, VectorConfig};
17
18use crate::common::CommandId;
19use crate::common::MsgCommand;
20use crate::common::MsgEvent;
21use crate::common::MsgResponse;
22use crate::common::wait_pollin;
23
24mod common;
25
/// Process-wide stop flag for the event-listener thread.
///
/// Starts out `false`. `handle_events` polls it at the top of every loop
/// iteration and leaves its loop once it reads `true`; `App::run` sets it
/// after all commands have been processed.
static STOP_EVENT_LISTERNER: AtomicBool = AtomicBool::new(false);
27
28fn handle_events(mut consumer: Consumer<MsgEvent>) -> Result<(), Errno> {
29 while !STOP_EVENT_LISTERNER.load(Ordering::Relaxed) {
30 let eventfd = consumer.eventfd().unwrap();
31 let ev = wait_pollin(eventfd, Duration::from_millis(10))?;
32
33 if !ev {
34 continue;
35 }
36
37 match consumer.pop() {
38 ConsumeResult::QueueError => panic!(),
39 ConsumeResult::NoMessage => return Err(Errno::EBADMSG),
40 ConsumeResult::NoNewMessage => return Err(Errno::EBADMSG),
41 ConsumeResult::Success => {
42 println!(
43 "client received event: {}",
44 consumer.current_message().unwrap()
45 )
46 }
47 ConsumeResult::SuccessMessagesDiscarded => {
48 println!(
49 "client received event: {}",
50 consumer.current_message().unwrap()
51 )
52 }
53 };
54 }
55 println!("handle_events returns");
56 Ok(())
57}
58
59struct App {
60 command: Producer<MsgCommand>,
61 response: Consumer<MsgResponse>,
62 event_listener: Option<JoinHandle<Result<(), Errno>>>,
63}
64
65impl App {
66 pub fn new(mut vec: ChannelVector) -> Self {
67 let command = vec.take_producer(0).unwrap();
68 let response = vec.take_consumer(0).unwrap();
69 let event = vec.take_consumer(1).unwrap();
70
71 let event_listener = Some(thread::spawn(move || handle_events(event)));
72
73 Self {
74 command,
75 response,
76 event_listener,
77 }
78 }
79
80 pub fn run(&mut self, cmds: &[MsgCommand]) {
81 let pause = time::Duration::from_millis(10);
82
83 for cmd in cmds {
84 self.command.current_message().clone_from(cmd);
85 self.command.force_push();
86
87 loop {
88 match self.response.pop() {
89 ConsumeResult::QueueError => panic!(),
90 ConsumeResult::NoMessage => {
91 thread::sleep(pause);
92 continue;
93 }
94 ConsumeResult::NoNewMessage => {
95 thread::sleep(pause);
96 continue;
97 }
98 ConsumeResult::Success => {}
99 ConsumeResult::SuccessMessagesDiscarded => {}
100 };
101
102 println!(
103 "client received response: {}",
104 self.response.current_message().unwrap()
105 );
106 break;
107 }
108 }
109 thread::sleep(time::Duration::from_millis(100));
110 STOP_EVENT_LISTERNER.store(true, Ordering::Relaxed);
111 self.event_listener.take().map(|h| h.join());
112 }
113}
114
115fn main() {
116 let commands: [MsgCommand; 6] = [
117 MsgCommand {
118 id: CommandId::Hello as u32,
119 args: [1, 2, 0],
120 },
121 MsgCommand {
122 id: CommandId::SendEvent as u32,
123 args: [11, 20, 0],
124 },
125 MsgCommand {
126 id: CommandId::SendEvent as u32,
127 args: [12, 20, 1],
128 },
129 MsgCommand {
130 id: CommandId::Div as u32,
131 args: [100, 7, 0],
132 },
133 MsgCommand {
134 id: CommandId::Div as u32,
135 args: [100, 0, 0],
136 },
137 MsgCommand {
138 id: CommandId::Stop as u32,
139 args: [0, 0, 0],
140 },
141 ];
142
143 let c2s_channels: [ChannelConfig; 1] = [ChannelConfig {
144 queue: QueueConfig {
145 additional_messages: 0,
146 message_size: unsafe { NonZeroUsize::new_unchecked(size_of::<MsgCommand>()) },
147 info: b"rpc command".to_vec(),
148 },
149 eventfd: true,
150 }];
151
152 let s2c_channels: [ChannelConfig; 2] = [
153 ChannelConfig {
154 queue: QueueConfig {
155 additional_messages: 0,
156 message_size: unsafe { NonZeroUsize::new_unchecked(size_of::<MsgResponse>()) },
157 info: b"rpc response".to_vec(),
158 },
159 eventfd: false,
160 },
161 ChannelConfig {
162 queue: QueueConfig {
163 additional_messages: 10,
164 message_size: unsafe { NonZeroUsize::new_unchecked(size_of::<MsgEvent>()) },
165 info: b"rpc event".to_vec(),
166 },
167 eventfd: true,
168 },
169 ];
170
171 let vparam = VectorConfig {
172 producers: c2s_channels.to_vec(),
173 consumers: s2c_channels.to_vec(),
174 info: b"rpc example".to_vec(),
175 };
176 let vec = client_connect("rtipc.sock", vparam).unwrap();
177 let mut app = App::new(vec);
178 thread::sleep(time::Duration::from_millis(100));
179 app.run(&commands);
180}