socket_server_mocker/
udp_server.rs1use std::net::{SocketAddr, UdpSocket};
2use std::sync::mpsc::{Receiver, Sender};
3use std::thread;
4use std::time::Duration;
5
6use crate::server_mocker::MockerOptions;
7use crate::Instruction::{
8 self, ReceiveMessageWithMaxSize, SendMessage, SendMessageDependingOnLastReceivedMessage,
9};
10use crate::ServerMockerError::{
11 self, FailedToSendUdpMessage, GotSendMessageBeforeReceiveMessage, UnableToBindListener,
12 UnableToGetLocalAddress, UnableToReadUdpStream, UnableToSetReadTimeout,
13};
14
/// Configuration of a mock UDP server.
///
/// The [`Default`] implementation binds to the IPv4 loopback address on an
/// OS-assigned port, uses 100 ms for both timeouts and accepts packets of up
/// to 65507 bytes.
#[derive(Clone, Debug)]
pub struct UdpMocker {
    /// Address the server socket is bound to when the mocker is started.
    pub socket_addr: SocketAddr,
    /// Read timeout applied to the server socket: how long a receive
    /// instruction waits for a datagram before it fails.
    pub net_timeout: Duration,
    /// How long the server thread waits for the next batch of instructions
    /// before it stops on its own.
    pub rx_timeout: Duration,
    /// Size in bytes of the buffer used by a plain `ReceiveMessage`
    /// instruction, i.e. the largest payload such an instruction can return.
    pub max_packet_size: usize,
}
27
28impl Default for UdpMocker {
29 fn default() -> Self {
30 Self {
31 socket_addr: SocketAddr::from(([127, 0, 0, 1], 0)),
32 net_timeout: Duration::from_millis(100),
33 rx_timeout: Duration::from_millis(100),
34 max_packet_size: 65507,
35 }
36 }
37}
38
39impl MockerOptions for UdpMocker {
40 fn socket_address(&self) -> SocketAddr {
41 self.socket_addr
42 }
43
44 fn net_timeout(&self) -> Duration {
45 self.net_timeout
46 }
47
48 fn run(
49 self,
50 instruction_rx: Receiver<Vec<Instruction>>,
51 message_tx: Sender<Vec<u8>>,
52 error_tx: Sender<ServerMockerError>,
53 ) -> Result<SocketAddr, ServerMockerError> {
54 let connection = UdpSocket::bind(self.socket_addr)
55 .map_err(|e| UnableToBindListener(self.socket_addr, e))?;
56 let socket_addr = connection.local_addr().map_err(UnableToGetLocalAddress)?;
57
58 thread::spawn(move || {
59 UdpServerImpl {
60 options: self,
61 connection,
62 instruction_rx,
63 message_tx,
64 error_tx,
65 }
66 .run();
67 });
68
69 Ok(socket_addr)
70 }
71}
72
73struct UdpServerImpl {
75 options: UdpMocker,
76 connection: UdpSocket,
77 instruction_rx: Receiver<Vec<Instruction>>,
78 message_tx: Sender<Vec<u8>>,
79 error_tx: Sender<ServerMockerError>,
80}
81
82impl UdpServerImpl {
84 fn run(&self) {
85 let timeout = Some(self.options.net_timeout);
86 if let Err(e) = self.connection.set_read_timeout(timeout) {
87 self.error_tx.send(UnableToSetReadTimeout(e)).unwrap();
88 return;
89 }
90
91 let mut last_received_packed_with_addr: Option<(SocketAddr, Vec<u8>)> = None;
93
94 while let Ok(instructions) = self.instruction_rx.recv_timeout(self.options.rx_timeout) {
97 for instruction in instructions {
98 match instruction {
99 SendMessage(binary_message) => {
100 if let Err(e) = self.send_packet_to_last_client(
101 &binary_message,
102 &last_received_packed_with_addr,
103 ) {
104 self.error_tx.send(e).unwrap();
105 }
106 }
107 SendMessageDependingOnLastReceivedMessage(sent_message_calculator) => {
108 let message_to_send =
110 sent_message_calculator(match last_received_packed_with_addr {
111 Some((_, ref message)) => Some(message.clone()),
112 None => None,
113 });
114 if let Some(message_to_send) = message_to_send {
115 if let Err(e) = self.send_packet_to_last_client(
116 &message_to_send,
117 &last_received_packed_with_addr,
118 ) {
119 self.error_tx.send(e).unwrap();
120 }
121 }
122 }
123 Instruction::ReceiveMessage => {
124 let received_packet_with_addr =
125 match self.receive_packet(self.options.max_packet_size) {
126 Ok(received) => received,
127 Err(e) => {
128 self.error_tx.send(e).unwrap();
129 continue;
130 }
131 };
132
133 last_received_packed_with_addr = Some((
134 received_packet_with_addr.0,
135 received_packet_with_addr.1.clone(),
136 ));
137 self.message_tx.send(received_packet_with_addr.1).unwrap();
138 }
139 ReceiveMessageWithMaxSize(max_message_size) => {
140 match self.receive_packet(max_message_size) {
141 Ok(received) => {
142 last_received_packed_with_addr =
143 Some((received.0, received.1.clone()));
144 self.message_tx.send(received.1).unwrap();
145 }
146 Err(e) => self.error_tx.send(e).unwrap(),
147 };
148 }
149 Instruction::StopExchange => {
150 return;
151 }
152 }
153 }
154 }
155 }
156
157 fn receive_packet(
158 &self,
159 max_packet_size: usize,
160 ) -> Result<(SocketAddr, Vec<u8>), ServerMockerError> {
161 let mut whole_received_packet: Vec<u8> = vec![0; max_packet_size];
162
163 let (bytes_read, packet_sender_addr) = self
164 .connection
165 .recv_from(&mut whole_received_packet)
166 .map_err(UnableToReadUdpStream)?;
167
168 whole_received_packet.truncate(bytes_read);
170
171 Ok((packet_sender_addr, whole_received_packet))
172 }
173
174 fn send_packet_to_last_client(
175 &self,
176 message_to_send: &[u8],
177 last_received_packed_with_addr: &Option<(SocketAddr, Vec<u8>)>,
178 ) -> Result<(), ServerMockerError> {
179 last_received_packed_with_addr
181 .as_ref()
182 .ok_or(GotSendMessageBeforeReceiveMessage)?;
183
184 self.connection
185 .send_to(
186 message_to_send,
187 last_received_packed_with_addr.as_ref().unwrap().0,
188 )
189 .map_err(FailedToSendUdpMessage)?;
190 Ok(())
191 }
192}