1use std::io;
4use std::net::{SocketAddr, UdpSocket};
5use std::sync::atomic;
6use std::sync::{Arc, Mutex, mpsc};
7use std::thread;
8use std::time::{Duration, Instant};
9
10use crate::dsu::DSUFrame;
11use crate::errors::ServerError;
12use crate::{READ_ATOMIC_BOOL_ORDERING, dsu};
13
/// Maximum time a subscribed client may go without sending another data
/// request before `prune_clients` removes it from the client list.
const CLIENT_TIMEOUT: Duration = Duration::from_secs(5);
/// Event type of a request answered by `handle_version_request`.
const VERSION_TYPE: u32 = 0x0010_0000;
/// Event type of a request answered by `handle_info_request`.
const INFO_TYPE: u32 = 0x0010_0001;
/// Event type of a request handled by `handle_data_request`.
const DATA_TYPE: u32 = 0x0010_0002;
18
/// Settings the server is constructed with.
///
/// Two configurations compare equal when every field is equal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerConfig {
    /// Host or IP address the UDP socket is bound to.
    pub bind_addr: String,
    /// UDP port the socket is bound to.
    pub port: u16,
    /// Passed unchanged to `dsu::write_data_event` for every outgoing data
    /// packet.
    pub invert_pitch: bool,
    /// The only controller slot this server answers info and data requests for.
    pub slot: u8,
    /// Not read anywhere in this file.
    // NOTE(review): presumably the input device the frame reader opens — confirm.
    pub device_path: Option<String>,
}
38
/// One subscriber that receives controller data packets.
#[derive(Debug)]
struct Client {
    /// Address the data request came from; data packets are sent back here.
    /// Also the key used to recognise a returning client.
    addr: SocketAddr,
    /// Client id taken from the most recent data request.
    id: u32,
    /// Slot the client subscribed to.
    slot: u8,
    /// When the most recent data request arrived; compared against
    /// `CLIENT_TIMEOUT` when pruning.
    last_seen: Instant,
    /// Per-client counter, incremented (wrapping) before each data packet.
    packet_counter: u32,
}
47
48pub struct Server {
50 running: Arc<atomic::AtomicBool>,
51 running_inner: Arc<atomic::AtomicBool>,
52 clients: Arc<Mutex<Vec<Client>>>,
53 config: ServerConfig,
54 socket: UdpSocket,
55}
56
57struct SendThreadContext {
59 pub running: Arc<atomic::AtomicBool>,
60 pub running_inner: Arc<atomic::AtomicBool>,
61 pub clients: Arc<Mutex<Vec<Client>>>,
62 pub config: ServerConfig,
63 pub socket: UdpSocket,
64 pub rx: mpsc::Receiver<DSUFrame>,
65}
66
/// Outcome of both server loops: the receive loop's result first, then the
/// result of joining the send thread (`Err` carries the panic payload).
type ThreadResults = (io::Result<()>, thread::Result<()>);
71
72impl Server {
73 pub fn new(
78 running: Arc<atomic::AtomicBool>,
79 config: ServerConfig,
80 ) -> Result<Self, ServerError> {
81 let addr = format!("{}:{}", config.bind_addr, config.port);
82
83 let socket = UdpSocket::bind(&addr).map_err(ServerError::UdpSocketOperationError)?;
84
85 socket
86 .set_read_timeout(Some(Duration::from_secs(1)))
87 .map_err(ServerError::UdpSocketOperationError)?;
88
89 log::info!("CemuHook server listening on {}", addr);
90
91 let clients: Arc<Mutex<Vec<Client>>> = Arc::new(Mutex::new(Vec::new()));
92 let running_inner = Arc::new(atomic::AtomicBool::new(true));
93
94 Ok(Self {
95 running,
96 running_inner,
97 clients,
98 config,
99 socket,
100 })
101 }
102
103 pub fn run(&self, rx: mpsc::Receiver<DSUFrame>) -> Result<ThreadResults, ServerError> {
107 let send_context = SendThreadContext {
108 running: self.running.clone(),
109 running_inner: self.running_inner.clone(),
110 clients: self.clients.clone(),
111 config: self.config.clone(),
112 socket: self
113 .socket
114 .try_clone()
115 .map_err(ServerError::UdpSocketCloneFailed)?,
116 rx,
117 };
118
119 let send_handle = thread::spawn(move || {
121 Self::send_loop(send_context);
122 });
123
124 let recv_result = self.recv_loop();
125 let send_result = send_handle.join();
126
127 Ok((recv_result, send_result))
128 }
129
130 fn recv_loop(&self) -> io::Result<()> {
131 let mut buf = [0u8; 256];
132
133 while self.running.load(READ_ATOMIC_BOOL_ORDERING)
134 && self.running_inner.load(READ_ATOMIC_BOOL_ORDERING)
135 {
136 match self.socket.recv_from(&mut buf) {
137 Ok((msg_len, addr)) => {
138 self.process_received_message(&buf, msg_len, &addr, &self.socket);
139 }
140 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
141 self.prune_clients();
143 }
144 Err(e) if e.kind() == io::ErrorKind::Interrupted => {
145 log::debug!("UDP recv interrupted");
147 }
148 Err(e) => {
149 if e.raw_os_error() != Some(10060) {
151 log::error!("UDP recv error: {:?}", e);
152 }
153 }
154 }
155 }
156
157 Ok(())
158 }
159
160 fn process_received_message(
161 &self,
162 buf: &[u8; 256],
163 msg_len: usize,
164 addr: &SocketAddr,
165 socket: &UdpSocket,
166 ) {
167 if msg_len < 20 {
168 log::trace!("Short packet ({} bytes) from {}", msg_len, addr);
169 return;
170 }
171
172 let magic = &buf[0..4];
173 if magic != b"DSUC" {
174 log::trace!("Ignoring non-DSUC packet from {}", addr);
175 return;
176 }
177
178 let client_id = u32::from_le_bytes([buf[12], buf[13], buf[14], buf[15]]);
179 let event_type = u32::from_le_bytes([buf[16], buf[17], buf[18], buf[19]]);
180
181 log::trace!(
182 "Received {} request from {} (id={})",
183 event_type_str(event_type),
184 addr,
185 client_id
186 );
187
188 match event_type {
189 VERSION_TYPE => handle_version_request(client_id, addr, socket),
190 INFO_TYPE => handle_info_request(buf, client_id, addr, socket, self.config.slot),
191 DATA_TYPE => handle_data_request(
192 buf,
193 msg_len,
194 client_id,
195 addr,
196 &self.clients,
197 self.config.slot,
198 ),
199 _ => {
200 log::trace!("Unhandled event type 0x{:06x} from {}", event_type, addr);
201 }
202 }
203 }
204
205 fn prune_clients(&self) {
206 let Ok(mut list) = self.clients.lock() else {
207 log::error!("Could not lock clients mutex... Skipping client prune.");
208 return;
209 };
210
211 let before = list.len();
212 list.retain(|c| c.last_seen.elapsed() < CLIENT_TIMEOUT);
213 let after = list.len();
214 if before != after {
215 log::info!(
216 "Pruned {} stale client(s), {} remaining",
217 before - after,
218 after
219 );
220 }
221 }
222
223 fn send_loop(context: SendThreadContext) {
224 let mut packet_buf = [0u8; 100];
225 let mut timestamp_us: u64 = 0;
226
227 loop {
228 if !context.running.load(READ_ATOMIC_BOOL_ORDERING)
229 || !context.running_inner.load(READ_ATOMIC_BOOL_ORDERING)
230 {
231 break;
232 }
233
234 let frame = match context.rx.recv() {
235 Ok(f) => f,
236 Err(_) => {
237 log::debug!("Reader's DSUFrame channel closed, send loop exiting.");
238 context.running_inner.store(false, atomic::Ordering::SeqCst);
239 break;
240 }
241 };
242
243 let Ok(mut list) = context.clients.lock() else {
244 log::error!("Not sending data this frame, could not lock clients mutex.");
245 continue;
246 };
247
248 if list.is_empty() {
249 continue;
250 }
251
252 for client in list.iter_mut() {
253 client.packet_counter = client.packet_counter.wrapping_add(1);
254
255 dsu::write_data_event(
256 &mut packet_buf,
257 &frame,
258 client.packet_counter,
259 client.id,
260 client.slot,
261 timestamp_us,
262 context.config.invert_pitch,
263 );
264
265 log::trace!(
266 "Packet {} to {} (slot={}): accel=({:.3}, {:.3}, {:.3}) gyro=({:.1}, {:.1}, {:.1})",
267 client.packet_counter,
268 client.addr,
269 client.slot,
270 frame.accel_x,
271 frame.accel_y,
272 frame.accel_z,
273 frame.gyro_x,
274 frame.gyro_y,
275 frame.gyro_z
276 );
277
278 if let Err(e) = context.socket.send_to(&packet_buf, client.addr) {
279 log::trace!("Send error to {}: {}", client.addr, e);
280 }
281 }
282
283 timestamp_us = timestamp_us.wrapping_add(4000);
284 }
285 }
286}
287
288fn handle_version_request(client_id: u32, addr: &SocketAddr, socket: &UdpSocket) {
289 let mut version_buf = [0u8; 22];
290 dsu::write_version_response(&mut version_buf, client_id);
291 if let Err(e) = socket.send_to(&version_buf, addr) {
292 log::warn!("Failed to send version response to {}: {}", addr, e);
293 }
294}
295
296fn handle_info_request(
297 buf: &[u8; 256],
298 client_id: u32,
299 addr: &SocketAddr,
300 socket: &UdpSocket,
301 configured_slot: u8,
302) {
303 let mut info_buf = [0u8; 32];
304 let port_cnt = i32::from_le_bytes([buf[20], buf[21], buf[22], buf[23]]) as usize;
306 let requested = port_cnt.min(4);
307 for i in 0..requested {
308 let slot = buf[24 + i];
309 if slot == configured_slot {
310 dsu::write_info_response(&mut info_buf, slot, client_id, true);
311 if let Err(e) = socket.send_to(&info_buf, addr) {
312 log::warn!("Failed to send info response to {}: {}", addr, e);
313 }
314 break;
315 }
316 }
317}
318
319fn handle_data_request(
320 buf: &[u8; 256],
321 msg_len: usize,
322 client_id: u32,
323 addr: &SocketAddr,
324 clients: &Arc<Mutex<Vec<Client>>>,
325 configured_slot: u8,
326) {
327 let requested_slot = if msg_len > 21 { buf[21] } else { 0 };
330
331 if requested_slot != configured_slot {
332 log::trace!(
333 "Ignoring data request from {} for slot {} (configured slot is {})",
334 addr,
335 requested_slot,
336 configured_slot
337 );
338 return;
339 }
340
341 let Ok(mut list) = clients.lock() else {
342 log::error!("Not handling data request, could not lock clients mutex...");
343 return;
344 };
345
346 match list.iter_mut().find(|c| c.addr == *addr) {
347 Some(client) => {
348 client.last_seen = Instant::now();
349 client.id = client_id;
350 client.slot = requested_slot;
351 log::trace!("Updated existing client {} (slot={})", addr, requested_slot);
352 }
353 None => {
354 log::info!("New client subscribed: {} (slot={})", addr, requested_slot);
355 list.push(Client {
356 addr: *addr,
357 id: client_id,
358 slot: requested_slot,
359 last_seen: Instant::now(),
360 packet_counter: 0,
361 });
362 }
363 }
364}
365
366fn event_type_str(t: u32) -> &'static str {
367 match t {
368 VERSION_TYPE => "VERSION",
369 INFO_TYPE => "INFO",
370 DATA_TYPE => "DATA",
371 _ => "UNKNOWN",
372 }
373}