1use std::io;
4use std::net::{SocketAddr, UdpSocket};
5use std::sync::atomic;
6use std::sync::{Arc, Mutex, mpsc};
7use std::thread;
8use std::time::{Duration, Instant};
9
10use crate::dsu::DSUFrame;
11use crate::errors::ServerError;
12use crate::{READ_ATOMIC_BOOL_ORDERING, dsu};
13
14const CLIENT_TIMEOUT: Duration = Duration::from_secs(5);
15const VERSION_TYPE: u32 = 0x100000;
16const INFO_TYPE: u32 = 0x100001;
17const DATA_TYPE: u32 = 0x100002;
18
19#[derive(Debug, Clone)]
21pub struct ServerConfig {
22 pub bind_addr: String,
24 pub port: u16,
26 pub invert_pitch: bool,
28 pub slot: u8,
30 pub device_path: Option<String>,
32}
33
34#[derive(Debug)]
35struct Client {
36 addr: SocketAddr,
37 id: u32,
38 slot: u8,
39 last_seen: Instant,
40 packet_counter: u32,
41}
42
43pub struct Server {
45 running: Arc<atomic::AtomicBool>,
46 running_inner: Arc<atomic::AtomicBool>,
47 clients: Arc<Mutex<Vec<Client>>>,
48 config: ServerConfig,
49 socket: UdpSocket,
50}
51
52struct SendThreadContext {
54 pub running: Arc<atomic::AtomicBool>,
55 pub running_inner: Arc<atomic::AtomicBool>,
56 pub clients: Arc<Mutex<Vec<Client>>>,
57 pub config: ServerConfig,
58 pub socket: UdpSocket,
59 pub rx: mpsc::Receiver<DSUFrame>,
60}
61
62type ThreadResults = (
63 io::Result<()>,
64 Result<(), Box<dyn std::any::Any + Send + 'static>>,
65);
66
67impl Server {
68 pub fn new(
73 running: Arc<atomic::AtomicBool>,
74 config: ServerConfig,
75 ) -> Result<Self, ServerError> {
76 let addr = format!("{}:{}", config.bind_addr, config.port);
77
78 let socket = UdpSocket::bind(&addr).map_err(ServerError::UdpSocketOperationError)?;
79
80 socket
81 .set_read_timeout(Some(Duration::from_secs(1)))
82 .map_err(ServerError::UdpSocketOperationError)?;
83
84 log::info!("CemuHook server listening on {}", addr);
85
86 let clients: Arc<Mutex<Vec<Client>>> = Arc::new(Mutex::new(Vec::new()));
87 let running_inner = Arc::new(atomic::AtomicBool::new(true));
88
89 Ok(Self {
90 running,
91 running_inner,
92 clients,
93 config,
94 socket,
95 })
96 }
97
98 pub fn run(&self, rx: mpsc::Receiver<DSUFrame>) -> Result<ThreadResults, ServerError> {
102 let send_context = SendThreadContext {
103 running: self.running.clone(),
104 running_inner: self.running_inner.clone(),
105 clients: self.clients.clone(),
106 config: self.config.clone(),
107 socket: self
108 .socket
109 .try_clone()
110 .map_err(ServerError::UdpSocketCloneFailed)?,
111 rx,
112 };
113
114 let send_handle = thread::spawn(move || {
116 Self::send_loop(send_context);
117 });
118
119 let recv_result = self.recv_loop();
120 let send_result = send_handle.join();
121
122 Ok((recv_result, send_result))
123 }
124
125 fn recv_loop(&self) -> io::Result<()> {
126 let mut buf = [0u8; 256];
127
128 while self.running.load(READ_ATOMIC_BOOL_ORDERING)
129 && self.running_inner.load(READ_ATOMIC_BOOL_ORDERING)
130 {
131 match self.socket.recv_from(&mut buf) {
132 Ok((msg_len, addr)) => {
133 self.process_received_message(&buf, msg_len, &addr, &self.socket);
134 }
135 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
136 self.prune_clients();
138 }
139 Err(e) if e.kind() == io::ErrorKind::Interrupted => {
140 log::debug!("UDP recv interrupted");
142 }
143 Err(e) => {
144 log::error!("UDP recv error: {:?}", e);
145 }
146 }
147 }
148
149 Ok(())
150 }
151
152 fn process_received_message(
153 &self,
154 buf: &[u8; 256],
155 msg_len: usize,
156 addr: &SocketAddr,
157 socket: &UdpSocket,
158 ) {
159 if msg_len < 20 {
160 log::trace!("Short packet ({} bytes) from {}", msg_len, addr);
161 return;
162 }
163
164 let magic = &buf[0..4];
165 if magic != b"DSUC" {
166 log::trace!("Ignoring non-DSUC packet from {}", addr);
167 return;
168 }
169
170 let client_id = u32::from_le_bytes([buf[12], buf[13], buf[14], buf[15]]);
171 let event_type = u32::from_le_bytes([buf[16], buf[17], buf[18], buf[19]]);
172
173 log::trace!(
174 "Received {} request from {} (id={})",
175 event_type_str(event_type),
176 addr,
177 client_id
178 );
179
180 match event_type {
181 VERSION_TYPE => handle_version_request(client_id, addr, socket),
182 INFO_TYPE => handle_info_request(buf, client_id, addr, socket, self.config.slot),
183 DATA_TYPE => handle_data_request(
184 buf,
185 msg_len,
186 client_id,
187 addr,
188 &self.clients,
189 self.config.slot,
190 ),
191 _ => {
192 log::trace!("Unhandled event type 0x{:06x} from {}", event_type, addr);
193 }
194 }
195 }
196
197 fn prune_clients(&self) {
198 let Ok(mut list) = self.clients.lock() else {
199 log::error!("Could not lock clients mutex... Skipping client prune.");
200 return;
201 };
202
203 let before = list.len();
204 list.retain(|c| c.last_seen.elapsed() < CLIENT_TIMEOUT);
205 let after = list.len();
206 if before != after {
207 log::info!(
208 "Pruned {} stale client(s), {} remaining",
209 before - after,
210 after
211 );
212 }
213 }
214
215 fn send_loop(context: SendThreadContext) {
216 let mut packet_buf = [0u8; 100];
217 let mut timestamp_us: u64 = 0;
218
219 loop {
220 if !context.running.load(READ_ATOMIC_BOOL_ORDERING)
221 || !context.running_inner.load(READ_ATOMIC_BOOL_ORDERING)
222 {
223 break;
224 }
225
226 let frame = match context.rx.recv() {
227 Ok(f) => f,
228 Err(_) => {
229 log::debug!("Reader's DSUFrame channel closed, send loop exiting.");
230 context.running_inner.store(false, atomic::Ordering::SeqCst);
231 break;
232 }
233 };
234
235 let Ok(mut list) = context.clients.lock() else {
236 log::error!("Not sending data this frame, could not lock clients mutex.");
237 continue;
238 };
239
240 if list.is_empty() {
241 continue;
242 }
243
244 for client in list.iter_mut() {
245 client.packet_counter = client.packet_counter.wrapping_add(1);
246
247 dsu::write_data_event(
248 &mut packet_buf,
249 &frame,
250 client.packet_counter,
251 client.id,
252 client.slot,
253 timestamp_us,
254 context.config.invert_pitch,
255 );
256
257 log::trace!(
258 "Packet {} to {} (slot={}): accel=({:.3}, {:.3}, {:.3}) gyro=({:.1}, {:.1}, {:.1})",
259 client.packet_counter,
260 client.addr,
261 client.slot,
262 frame.accel_x,
263 frame.accel_y,
264 frame.accel_z,
265 frame.gyro_x,
266 frame.gyro_y,
267 frame.gyro_z
268 );
269
270 if let Err(e) = context.socket.send_to(&packet_buf, client.addr) {
271 log::trace!("Send error to {}: {}", client.addr, e);
272 }
273 }
274
275 timestamp_us = timestamp_us.wrapping_add(4000);
276 }
277 }
278}
279
280fn handle_version_request(client_id: u32, addr: &SocketAddr, socket: &UdpSocket) {
281 let mut version_buf = [0u8; 22];
282 dsu::write_version_response(&mut version_buf, client_id);
283 if let Err(e) = socket.send_to(&version_buf, addr) {
284 log::warn!("Failed to send version response to {}: {}", addr, e);
285 }
286}
287
288fn handle_info_request(
289 buf: &[u8; 256],
290 client_id: u32,
291 addr: &SocketAddr,
292 socket: &UdpSocket,
293 configured_slot: u8,
294) {
295 let mut info_buf = [0u8; 32];
296 let port_cnt = i32::from_le_bytes([buf[20], buf[21], buf[22], buf[23]]) as usize;
298 let requested = port_cnt.min(4);
299 for i in 0..requested {
300 let slot = buf[24 + i];
301 if slot == configured_slot {
302 dsu::write_info_response(&mut info_buf, slot, client_id, true);
303 if let Err(e) = socket.send_to(&info_buf, addr) {
304 log::warn!("Failed to send info response to {}: {}", addr, e);
305 }
306 break;
307 }
308 }
309}
310
311fn handle_data_request(
312 buf: &[u8; 256],
313 msg_len: usize,
314 client_id: u32,
315 addr: &SocketAddr,
316 clients: &Arc<Mutex<Vec<Client>>>,
317 configured_slot: u8,
318) {
319 let requested_slot = if msg_len > 21 { buf[21] } else { 0 };
322
323 if requested_slot != configured_slot {
324 log::trace!(
325 "Ignoring data request from {} for slot {} (configured slot is {})",
326 addr,
327 requested_slot,
328 configured_slot
329 );
330 return;
331 }
332
333 let Ok(mut list) = clients.lock() else {
334 log::error!("Not handling data request, could not lock clients mutex...");
335 return;
336 };
337
338 match list.iter_mut().find(|c| c.addr == *addr) {
339 Some(client) => {
340 client.last_seen = Instant::now();
341 client.id = client_id;
342 client.slot = requested_slot;
343 log::trace!("Updated existing client {} (slot={})", addr, requested_slot);
344 }
345 None => {
346 log::info!("New client subscribed: {} (slot={})", addr, requested_slot);
347 list.push(Client {
348 addr: *addr,
349 id: client_id,
350 slot: requested_slot,
351 last_seen: Instant::now(),
352 packet_counter: 0,
353 });
354 }
355 }
356}
357
358fn event_type_str(t: u32) -> &'static str {
359 match t {
360 VERSION_TYPE => "VERSION",
361 INFO_TYPE => "INFO",
362 DATA_TYPE => "DATA",
363 _ => "UNKNOWN",
364 }
365}