1use std::io;
2use std::net::{SocketAddr, UdpSocket};
3use std::sync::atomic;
4use std::sync::{Arc, Mutex, mpsc};
5use std::thread;
6use std::time::{Duration, Instant};
7
8use crate::dsu::DSUFrame;
9use crate::errors::ServerError;
10use crate::{READ_ATOMIC_BOOL_ORDERING, dsu};
11
12const CLIENT_TIMEOUT: Duration = Duration::from_secs(5);
13const VERSION_TYPE: u32 = 0x100000;
14const INFO_TYPE: u32 = 0x100001;
15const DATA_TYPE: u32 = 0x100002;
16
17#[derive(Debug, Clone)]
18pub struct ServerConfig {
19 pub bind_addr: String,
21 pub port: u16,
23 pub invert_pitch: bool,
25 pub slot: u8,
27}
28#[derive(Debug)]
29struct Client {
30 addr: SocketAddr,
31 id: u32,
32 slot: u8,
33 last_seen: Instant,
34 packet_counter: u32,
35}
36
37pub struct Server {
39 main_thread_running: Arc<atomic::AtomicBool>,
40 server_thread_running: Arc<atomic::AtomicBool>,
41 clients: Arc<Mutex<Vec<Client>>>,
42 config: ServerConfig,
43 socket: UdpSocket,
44}
45
46struct SendThreadContext {
48 pub main_thread_running: Arc<atomic::AtomicBool>,
49 pub server_thread_running: Arc<atomic::AtomicBool>,
50 pub clients: Arc<Mutex<Vec<Client>>>,
51 pub config: ServerConfig,
52 pub socket: UdpSocket,
53 pub rx: mpsc::Receiver<DSUFrame>,
54}
55
56type ThreadResults = (
57 io::Result<()>,
58 Result<(), Box<dyn std::any::Any + Send + 'static>>,
59);
60
61impl Server {
62 pub fn new(
64 main_thread_running: Arc<atomic::AtomicBool>,
65 config: ServerConfig,
66 ) -> Result<Self, ServerError> {
67 let addr = format!("{}:{}", config.bind_addr, config.port);
68
69 let socket = UdpSocket::bind(&addr).map_err(ServerError::UdpSocketOperationError)?;
70
71 socket
72 .set_read_timeout(Some(Duration::from_secs(1)))
73 .map_err(ServerError::UdpSocketOperationError)?;
74
75 log::info!("CemuHook server listening on {}", addr);
76
77 let clients: Arc<Mutex<Vec<Client>>> = Arc::new(Mutex::new(Vec::new()));
78 let server_thread_running = Arc::new(atomic::AtomicBool::new(true));
79
80 Ok(Self {
81 main_thread_running,
82 server_thread_running,
83 clients,
84 config,
85 socket,
86 })
87 }
88
89 pub fn run(&self, rx: mpsc::Receiver<DSUFrame>) -> Result<ThreadResults, ServerError> {
93 let send_context = SendThreadContext {
94 main_thread_running: self.main_thread_running.clone(),
95 server_thread_running: self.server_thread_running.clone(),
96 clients: self.clients.clone(),
97 config: self.config.clone(),
98 socket: self
99 .socket
100 .try_clone()
101 .map_err(ServerError::UdpSocketCloneFailed)?,
102 rx,
103 };
104
105 let send_handle = thread::spawn(move || {
107 Self::send_loop(send_context);
108 });
109
110 let recv_result = self.recv_loop();
111 let send_result = send_handle.join();
112
113 Ok((recv_result, send_result))
114 }
115
116 fn recv_loop(&self) -> io::Result<()> {
117 let mut buf = [0u8; 256];
118
119 while self.main_thread_running.load(READ_ATOMIC_BOOL_ORDERING)
120 && self.server_thread_running.load(READ_ATOMIC_BOOL_ORDERING)
121 {
122 match self.socket.recv_from(&mut buf) {
123 Ok((msg_len, addr)) => {
124 self.process_received_message(&buf, msg_len, &addr, &self.socket);
125 }
126 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
127 self.prune_clients();
129 }
130 Err(e) if e.kind() == io::ErrorKind::Interrupted => {
131 log::debug!("UDP recv interrupted");
133 }
134 Err(e) => {
135 log::error!("UDP recv error: {:?}", e);
136 }
137 }
138 }
139
140 Ok(())
141 }
142
143 fn process_received_message(
144 &self,
145 buf: &[u8; 256],
146 msg_len: usize,
147 addr: &SocketAddr,
148 socket: &UdpSocket,
149 ) {
150 if msg_len < 20 {
151 log::trace!("Short packet ({} bytes) from {}", msg_len, addr);
152 return;
153 }
154
155 let magic = &buf[0..4];
156 if magic != b"DSUC" {
157 log::trace!("Ignoring non-DSUC packet from {}", addr);
158 return;
159 }
160
161 let client_id = u32::from_le_bytes([buf[12], buf[13], buf[14], buf[15]]);
162 let event_type = u32::from_le_bytes([buf[16], buf[17], buf[18], buf[19]]);
163
164 log::trace!(
165 "Received {} request from {} (id={})",
166 event_type_str(event_type),
167 addr,
168 client_id
169 );
170
171 match event_type {
172 VERSION_TYPE => handle_version_request(client_id, addr, socket),
173 INFO_TYPE => handle_info_request(buf, client_id, addr, socket, self.config.slot),
174 DATA_TYPE => handle_data_request(
175 buf,
176 msg_len,
177 client_id,
178 addr,
179 &self.clients,
180 self.config.slot,
181 ),
182 _ => {
183 log::trace!("Unhandled event type 0x{:06x} from {}", event_type, addr);
184 }
185 }
186 }
187
188 fn prune_clients(&self) {
189 let Ok(mut list) = self.clients.lock() else {
190 log::error!("Could not lock clients mutex... Skipping client prune.");
191 return;
192 };
193
194 let before = list.len();
195 list.retain(|c| c.last_seen.elapsed() < CLIENT_TIMEOUT);
196 let after = list.len();
197 if before != after {
198 log::info!(
199 "Pruned {} stale client(s), {} remaining",
200 before - after,
201 after
202 );
203 }
204 }
205
206 fn send_loop(context: SendThreadContext) {
207 let mut packet_buf = [0u8; 100];
208 let mut timestamp_us: u64 = 0;
209
210 loop {
211 if !context.main_thread_running.load(READ_ATOMIC_BOOL_ORDERING)
212 || !context
213 .server_thread_running
214 .load(READ_ATOMIC_BOOL_ORDERING)
215 {
216 break;
217 }
218
219 let frame = match context.rx.recv() {
220 Ok(f) => f,
221 Err(_) => {
222 log::debug!("Frame channel closed, send loop exiting");
223 context
224 .server_thread_running
225 .store(false, atomic::Ordering::SeqCst);
226 break;
227 }
228 };
229
230 let Ok(mut list) = context.clients.lock() else {
231 log::error!("Not sending data this frame, could not lock clients mutex.");
232 continue;
233 };
234
235 if list.is_empty() {
236 continue;
237 }
238
239 for client in list.iter_mut() {
240 client.packet_counter = client.packet_counter.wrapping_add(1);
241
242 dsu::write_data_event(
243 &mut packet_buf,
244 &frame,
245 client.packet_counter,
246 client.id,
247 client.slot,
248 timestamp_us,
249 context.config.invert_pitch,
250 );
251
252 log::trace!(
253 "Packet {} to {} (slot={}): accel=({:.3}, {:.3}, {:.3}) gyro=({:.1}, {:.1}, {:.1})",
254 client.packet_counter,
255 client.addr,
256 client.slot,
257 frame.accel_x,
258 frame.accel_y,
259 frame.accel_z,
260 frame.gyro_x,
261 frame.gyro_y,
262 frame.gyro_z
263 );
264
265 if let Err(e) = context.socket.send_to(&packet_buf, client.addr) {
266 log::trace!("Send error to {}: {}", client.addr, e);
267 }
268 }
269
270 timestamp_us = timestamp_us.wrapping_add(4000);
271 }
272 }
273}
274
275fn handle_version_request(client_id: u32, addr: &SocketAddr, socket: &UdpSocket) {
276 let mut version_buf = [0u8; 22];
277 dsu::write_version_response(&mut version_buf, client_id);
278 if let Err(e) = socket.send_to(&version_buf, addr) {
279 log::warn!("Failed to send version response to {}: {}", addr, e);
280 }
281}
282
283fn handle_info_request(
284 buf: &[u8; 256],
285 client_id: u32,
286 addr: &SocketAddr,
287 socket: &UdpSocket,
288 configured_slot: u8,
289) {
290 let mut info_buf = [0u8; 32];
291 let port_cnt = i32::from_le_bytes([buf[20], buf[21], buf[22], buf[23]]) as usize;
293 let requested = port_cnt.min(4);
294 for i in 0..requested {
295 let slot = buf[24 + i];
296 if slot == configured_slot {
297 dsu::write_info_response(&mut info_buf, slot, client_id, true);
298 if let Err(e) = socket.send_to(&info_buf, addr) {
299 log::warn!("Failed to send info response to {}: {}", addr, e);
300 }
301 break;
302 }
303 }
304}
305
306fn handle_data_request(
307 buf: &[u8; 256],
308 msg_len: usize,
309 client_id: u32,
310 addr: &SocketAddr,
311 clients: &Arc<Mutex<Vec<Client>>>,
312 configured_slot: u8,
313) {
314 let requested_slot = if msg_len > 21 { buf[21] } else { 0 };
317
318 if requested_slot != configured_slot {
319 log::trace!(
320 "Ignoring data request from {} for slot {} (configured slot is {})",
321 addr,
322 requested_slot,
323 configured_slot
324 );
325 return;
326 }
327
328 let Ok(mut list) = clients.lock() else {
329 log::error!("Not handling data request, could not lock clients mutex...");
330 return;
331 };
332
333 match list.iter_mut().find(|c| c.addr == *addr) {
334 Some(client) => {
335 client.last_seen = Instant::now();
336 client.id = client_id;
337 client.slot = requested_slot;
338 log::trace!("Updated existing client {} (slot={})", addr, requested_slot);
339 }
340 None => {
341 log::info!("New client subscribed: {} (slot={})", addr, requested_slot);
342 list.push(Client {
343 addr: *addr,
344 id: client_id,
345 slot: requested_slot,
346 last_seen: Instant::now(),
347 packet_counter: 0,
348 });
349 }
350 }
351}
352
353fn event_type_str(t: u32) -> &'static str {
354 match t {
355 VERSION_TYPE => "VERSION",
356 INFO_TYPE => "INFO",
357 DATA_TYPE => "DATA",
358 _ => "UNKNOWN",
359 }
360}