socket_server_mocker/
udp_server.rs

1use std::net::{SocketAddr, UdpSocket};
2use std::sync::mpsc::{Receiver, Sender};
3use std::thread;
4use std::time::Duration;
5
6use crate::server_mocker::MockerOptions;
7use crate::Instruction::{
8    self, ReceiveMessageWithMaxSize, SendMessage, SendMessageDependingOnLastReceivedMessage,
9};
10use crate::ServerMockerError::{
11    self, FailedToSendUdpMessage, GotSendMessageBeforeReceiveMessage, UnableToBindListener,
12    UnableToGetLocalAddress, UnableToReadUdpStream, UnableToSetReadTimeout,
13};
14
15/// Options for the UDP server mocker
16#[derive(Debug, Clone)]
17pub struct UdpMocker {
18    /// Socket address on which the server will listen. Will be set to `127.0.0.1:0` by default.
19    pub socket_addr: SocketAddr,
20    /// Timeout for the server to wait for a message from the client.
21    pub net_timeout: Duration,
22    /// Timeout if no more instruction is available and [`Instruction::StopExchange`] hasn't been sent
23    pub rx_timeout: Duration,
24    /// Maximum size of a UDP packet in bytes, specified in RFC 768
25    pub max_packet_size: usize,
26}
27
28impl Default for UdpMocker {
29    fn default() -> Self {
30        Self {
31            socket_addr: SocketAddr::from(([127, 0, 0, 1], 0)),
32            net_timeout: Duration::from_millis(100),
33            rx_timeout: Duration::from_millis(100),
34            max_packet_size: 65507,
35        }
36    }
37}
38
39impl MockerOptions for UdpMocker {
40    fn socket_address(&self) -> SocketAddr {
41        self.socket_addr
42    }
43
44    fn net_timeout(&self) -> Duration {
45        self.net_timeout
46    }
47
48    fn run(
49        self,
50        instruction_rx: Receiver<Vec<Instruction>>,
51        message_tx: Sender<Vec<u8>>,
52        error_tx: Sender<ServerMockerError>,
53    ) -> Result<SocketAddr, ServerMockerError> {
54        let connection = UdpSocket::bind(self.socket_addr)
55            .map_err(|e| UnableToBindListener(self.socket_addr, e))?;
56        let socket_addr = connection.local_addr().map_err(UnableToGetLocalAddress)?;
57
58        thread::spawn(move || {
59            UdpServerImpl {
60                options: self,
61                connection,
62                instruction_rx,
63                message_tx,
64                error_tx,
65            }
66            .run();
67        });
68
69        Ok(socket_addr)
70    }
71}
72
73/// TCP server mocker thread implementation
74struct UdpServerImpl {
75    options: UdpMocker,
76    connection: UdpSocket,
77    instruction_rx: Receiver<Vec<Instruction>>,
78    message_tx: Sender<Vec<u8>>,
79    error_tx: Sender<ServerMockerError>,
80}
81
82/// Specific implementation methods and constants for UDP server mocker
83impl UdpServerImpl {
84    fn run(&self) {
85        let timeout = Some(self.options.net_timeout);
86        if let Err(e) = self.connection.set_read_timeout(timeout) {
87            self.error_tx.send(UnableToSetReadTimeout(e)).unwrap();
88            return;
89        }
90
91        // Last message received with the address of the client, used to send the response
92        let mut last_received_packed_with_addr: Option<(SocketAddr, Vec<u8>)> = None;
93
94        // Timeout: if no more instruction is available and StopExchange hasn't been sent
95        // Stop server if no more instruction is available and StopExchange hasn't been sent
96        while let Ok(instructions) = self.instruction_rx.recv_timeout(self.options.rx_timeout) {
97            for instruction in instructions {
98                match instruction {
99                    SendMessage(binary_message) => {
100                        if let Err(e) = self.send_packet_to_last_client(
101                            &binary_message,
102                            &last_received_packed_with_addr,
103                        ) {
104                            self.error_tx.send(e).unwrap();
105                        }
106                    }
107                    SendMessageDependingOnLastReceivedMessage(sent_message_calculator) => {
108                        // Pass None if no message has been received yet
109                        let message_to_send =
110                            sent_message_calculator(match last_received_packed_with_addr {
111                                Some((_, ref message)) => Some(message.clone()),
112                                None => None,
113                            });
114                        if let Some(message_to_send) = message_to_send {
115                            if let Err(e) = self.send_packet_to_last_client(
116                                &message_to_send,
117                                &last_received_packed_with_addr,
118                            ) {
119                                self.error_tx.send(e).unwrap();
120                            }
121                        }
122                    }
123                    Instruction::ReceiveMessage => {
124                        let received_packet_with_addr =
125                            match self.receive_packet(self.options.max_packet_size) {
126                                Ok(received) => received,
127                                Err(e) => {
128                                    self.error_tx.send(e).unwrap();
129                                    continue;
130                                }
131                            };
132
133                        last_received_packed_with_addr = Some((
134                            received_packet_with_addr.0,
135                            received_packet_with_addr.1.clone(),
136                        ));
137                        self.message_tx.send(received_packet_with_addr.1).unwrap();
138                    }
139                    ReceiveMessageWithMaxSize(max_message_size) => {
140                        match self.receive_packet(max_message_size) {
141                            Ok(received) => {
142                                last_received_packed_with_addr =
143                                    Some((received.0, received.1.clone()));
144                                self.message_tx.send(received.1).unwrap();
145                            }
146                            Err(e) => self.error_tx.send(e).unwrap(),
147                        };
148                    }
149                    Instruction::StopExchange => {
150                        return;
151                    }
152                }
153            }
154        }
155    }
156
157    fn receive_packet(
158        &self,
159        max_packet_size: usize,
160    ) -> Result<(SocketAddr, Vec<u8>), ServerMockerError> {
161        let mut whole_received_packet: Vec<u8> = vec![0; max_packet_size];
162
163        let (bytes_read, packet_sender_addr) = self
164            .connection
165            .recv_from(&mut whole_received_packet)
166            .map_err(UnableToReadUdpStream)?;
167
168        // Remove the extra bytes
169        whole_received_packet.truncate(bytes_read);
170
171        Ok((packet_sender_addr, whole_received_packet))
172    }
173
174    fn send_packet_to_last_client(
175        &self,
176        message_to_send: &[u8],
177        last_received_packed_with_addr: &Option<(SocketAddr, Vec<u8>)>,
178    ) -> Result<(), ServerMockerError> {
179        // Last message received with the address of the client, used to send the response
180        last_received_packed_with_addr
181            .as_ref()
182            .ok_or(GotSendMessageBeforeReceiveMessage)?;
183
184        self.connection
185            .send_to(
186                message_to_send,
187                last_received_packed_with_addr.as_ref().unwrap().0,
188            )
189            .map_err(FailedToSendUdpMessage)?;
190        Ok(())
191    }
192}