1use std::io;
4use std::net::{SocketAddr, UdpSocket};
5use std::sync::atomic;
6use std::sync::{Arc, Mutex, mpsc};
7use std::thread;
8use std::time::{Duration, Instant};
9
10use crate::dsu::DSUFrame;
11use crate::errors::ServerError;
12use crate::{READ_ATOMIC_BOOL_ORDERING, dsu};
13
/// A subscribed client that has not renewed its data request within this
/// window is dropped by the prune pass.
const CLIENT_TIMEOUT: Duration = Duration::from_secs(5);
/// Event type carried by protocol-version requests.
const VERSION_TYPE: u32 = 0x0010_0000;
/// Event type carried by controller-info requests.
const INFO_TYPE: u32 = 0x0010_0001;
/// Event type carried by controller-data (subscription) requests.
const DATA_TYPE: u32 = 0x0010_0002;
18
/// User-facing settings for the server.
#[derive(Clone, Debug)]
pub struct ServerConfig {
    /// Host or IP address the UDP socket is bound to.
    pub bind_addr: String,
    /// UDP port the socket is bound to.
    pub port: u16,
    /// Passed through to the data-event encoder for every outgoing frame.
    pub invert_pitch: bool,
    /// The only controller slot this server answers for; info and data
    /// requests that name a different slot are ignored.
    pub slot: u8,
}
30
/// Book-keeping for one subscribed remote endpoint.
#[derive(Debug)]
struct Client {
    /// Where data packets for this client are sent; also the lookup key when
    /// a data request arrives.
    addr: SocketAddr,
    /// Client id taken from the most recent data request.
    id: u32,
    /// Slot named in the most recent data request.
    slot: u8,
    /// When the most recent data request was received; drives pruning.
    last_seen: Instant,
    /// Number of the last data packet sent to this client (wraps on overflow).
    packet_counter: u32,
}
39
40pub struct Server {
42 main_thread_run: Arc<atomic::AtomicBool>,
43 server_thread_run: Arc<atomic::AtomicBool>,
44 clients: Arc<Mutex<Vec<Client>>>,
45 config: ServerConfig,
46 socket: UdpSocket,
47}
48
49struct SendThreadContext {
51 pub main_thread_run: Arc<atomic::AtomicBool>,
52 pub server_thread_run: Arc<atomic::AtomicBool>,
53 pub clients: Arc<Mutex<Vec<Client>>>,
54 pub config: ServerConfig,
55 pub socket: UdpSocket,
56 pub rx: mpsc::Receiver<DSUFrame>,
57}
58
/// Outcome of one `Server::run` call: the receive loop's I/O result, followed
/// by the join result of the send thread (`Err` carries its panic payload).
type ThreadResults = (io::Result<()>, thread::Result<()>);
63
64impl Server {
65 pub fn new(
70 main_thread_run: Arc<atomic::AtomicBool>,
71 config: ServerConfig,
72 ) -> Result<Self, ServerError> {
73 let addr = format!("{}:{}", config.bind_addr, config.port);
74
75 let socket = UdpSocket::bind(&addr).map_err(ServerError::UdpSocketOperationError)?;
76
77 socket
78 .set_read_timeout(Some(Duration::from_secs(1)))
79 .map_err(ServerError::UdpSocketOperationError)?;
80
81 log::info!("CemuHook server listening on {}", addr);
82
83 let clients: Arc<Mutex<Vec<Client>>> = Arc::new(Mutex::new(Vec::new()));
84 let server_thread_run = Arc::new(atomic::AtomicBool::new(true));
85
86 Ok(Self {
87 main_thread_run,
88 server_thread_run,
89 clients,
90 config,
91 socket,
92 })
93 }
94
95 pub fn run(&self, rx: mpsc::Receiver<DSUFrame>) -> Result<ThreadResults, ServerError> {
99 let send_context = SendThreadContext {
100 main_thread_run: self.main_thread_run.clone(),
101 server_thread_run: self.server_thread_run.clone(),
102 clients: self.clients.clone(),
103 config: self.config.clone(),
104 socket: self
105 .socket
106 .try_clone()
107 .map_err(ServerError::UdpSocketCloneFailed)?,
108 rx,
109 };
110
111 let send_handle = thread::spawn(move || {
113 Self::send_loop(send_context);
114 });
115
116 let recv_result = self.recv_loop();
117 let send_result = send_handle.join();
118
119 Ok((recv_result, send_result))
120 }
121
122 fn recv_loop(&self) -> io::Result<()> {
123 let mut buf = [0u8; 256];
124
125 while self.main_thread_run.load(READ_ATOMIC_BOOL_ORDERING)
126 && self.server_thread_run.load(READ_ATOMIC_BOOL_ORDERING)
127 {
128 match self.socket.recv_from(&mut buf) {
129 Ok((msg_len, addr)) => {
130 self.process_received_message(&buf, msg_len, &addr, &self.socket);
131 }
132 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
133 self.prune_clients();
135 }
136 Err(e) if e.kind() == io::ErrorKind::Interrupted => {
137 log::debug!("UDP recv interrupted");
139 }
140 Err(e) => {
141 log::error!("UDP recv error: {:?}", e);
142 }
143 }
144 }
145
146 Ok(())
147 }
148
149 fn process_received_message(
150 &self,
151 buf: &[u8; 256],
152 msg_len: usize,
153 addr: &SocketAddr,
154 socket: &UdpSocket,
155 ) {
156 if msg_len < 20 {
157 log::trace!("Short packet ({} bytes) from {}", msg_len, addr);
158 return;
159 }
160
161 let magic = &buf[0..4];
162 if magic != b"DSUC" {
163 log::trace!("Ignoring non-DSUC packet from {}", addr);
164 return;
165 }
166
167 let client_id = u32::from_le_bytes([buf[12], buf[13], buf[14], buf[15]]);
168 let event_type = u32::from_le_bytes([buf[16], buf[17], buf[18], buf[19]]);
169
170 log::trace!(
171 "Received {} request from {} (id={})",
172 event_type_str(event_type),
173 addr,
174 client_id
175 );
176
177 match event_type {
178 VERSION_TYPE => handle_version_request(client_id, addr, socket),
179 INFO_TYPE => handle_info_request(buf, client_id, addr, socket, self.config.slot),
180 DATA_TYPE => handle_data_request(
181 buf,
182 msg_len,
183 client_id,
184 addr,
185 &self.clients,
186 self.config.slot,
187 ),
188 _ => {
189 log::trace!("Unhandled event type 0x{:06x} from {}", event_type, addr);
190 }
191 }
192 }
193
194 fn prune_clients(&self) {
195 let Ok(mut list) = self.clients.lock() else {
196 log::error!("Could not lock clients mutex... Skipping client prune.");
197 return;
198 };
199
200 let before = list.len();
201 list.retain(|c| c.last_seen.elapsed() < CLIENT_TIMEOUT);
202 let after = list.len();
203 if before != after {
204 log::info!(
205 "Pruned {} stale client(s), {} remaining",
206 before - after,
207 after
208 );
209 }
210 }
211
212 fn send_loop(context: SendThreadContext) {
213 let mut packet_buf = [0u8; 100];
214 let mut timestamp_us: u64 = 0;
215
216 loop {
217 if !context.main_thread_run.load(READ_ATOMIC_BOOL_ORDERING)
218 || !context.server_thread_run.load(READ_ATOMIC_BOOL_ORDERING)
219 {
220 break;
221 }
222
223 let frame = match context.rx.recv() {
224 Ok(f) => f,
225 Err(_) => {
226 log::debug!("Frame channel closed, send loop exiting");
227 context
228 .server_thread_run
229 .store(false, atomic::Ordering::SeqCst);
230 break;
231 }
232 };
233
234 let Ok(mut list) = context.clients.lock() else {
235 log::error!("Not sending data this frame, could not lock clients mutex.");
236 continue;
237 };
238
239 if list.is_empty() {
240 continue;
241 }
242
243 for client in list.iter_mut() {
244 client.packet_counter = client.packet_counter.wrapping_add(1);
245
246 dsu::write_data_event(
247 &mut packet_buf,
248 &frame,
249 client.packet_counter,
250 client.id,
251 client.slot,
252 timestamp_us,
253 context.config.invert_pitch,
254 );
255
256 log::trace!(
257 "Packet {} to {} (slot={}): accel=({:.3}, {:.3}, {:.3}) gyro=({:.1}, {:.1}, {:.1})",
258 client.packet_counter,
259 client.addr,
260 client.slot,
261 frame.accel_x,
262 frame.accel_y,
263 frame.accel_z,
264 frame.gyro_x,
265 frame.gyro_y,
266 frame.gyro_z
267 );
268
269 if let Err(e) = context.socket.send_to(&packet_buf, client.addr) {
270 log::trace!("Send error to {}: {}", client.addr, e);
271 }
272 }
273
274 timestamp_us = timestamp_us.wrapping_add(4000);
275 }
276 }
277}
278
279fn handle_version_request(client_id: u32, addr: &SocketAddr, socket: &UdpSocket) {
280 let mut version_buf = [0u8; 22];
281 dsu::write_version_response(&mut version_buf, client_id);
282 if let Err(e) = socket.send_to(&version_buf, addr) {
283 log::warn!("Failed to send version response to {}: {}", addr, e);
284 }
285}
286
287fn handle_info_request(
288 buf: &[u8; 256],
289 client_id: u32,
290 addr: &SocketAddr,
291 socket: &UdpSocket,
292 configured_slot: u8,
293) {
294 let mut info_buf = [0u8; 32];
295 let port_cnt = i32::from_le_bytes([buf[20], buf[21], buf[22], buf[23]]) as usize;
297 let requested = port_cnt.min(4);
298 for i in 0..requested {
299 let slot = buf[24 + i];
300 if slot == configured_slot {
301 dsu::write_info_response(&mut info_buf, slot, client_id, true);
302 if let Err(e) = socket.send_to(&info_buf, addr) {
303 log::warn!("Failed to send info response to {}: {}", addr, e);
304 }
305 break;
306 }
307 }
308}
309
310fn handle_data_request(
311 buf: &[u8; 256],
312 msg_len: usize,
313 client_id: u32,
314 addr: &SocketAddr,
315 clients: &Arc<Mutex<Vec<Client>>>,
316 configured_slot: u8,
317) {
318 let requested_slot = if msg_len > 21 { buf[21] } else { 0 };
321
322 if requested_slot != configured_slot {
323 log::trace!(
324 "Ignoring data request from {} for slot {} (configured slot is {})",
325 addr,
326 requested_slot,
327 configured_slot
328 );
329 return;
330 }
331
332 let Ok(mut list) = clients.lock() else {
333 log::error!("Not handling data request, could not lock clients mutex...");
334 return;
335 };
336
337 match list.iter_mut().find(|c| c.addr == *addr) {
338 Some(client) => {
339 client.last_seen = Instant::now();
340 client.id = client_id;
341 client.slot = requested_slot;
342 log::trace!("Updated existing client {} (slot={})", addr, requested_slot);
343 }
344 None => {
345 log::info!("New client subscribed: {} (slot={})", addr, requested_slot);
346 list.push(Client {
347 addr: *addr,
348 id: client_id,
349 slot: requested_slot,
350 last_seen: Instant::now(),
351 packet_counter: 0,
352 });
353 }
354 }
355}
356
357fn event_type_str(t: u32) -> &'static str {
358 match t {
359 VERSION_TYPE => "VERSION",
360 INFO_TYPE => "INFO",
361 DATA_TYPE => "DATA",
362 _ => "UNKNOWN",
363 }
364}