Skip to main content

scdsu_core/
server.rs

1//! Provides the CemuHook (DSU) UDP server implementation.
2
3use std::io;
4use std::net::{SocketAddr, UdpSocket};
5use std::sync::atomic;
6use std::sync::{Arc, Mutex, mpsc};
7use std::thread;
8use std::time::{Duration, Instant};
9
10use crate::dsu::DSUFrame;
11use crate::errors::ServerError;
12use crate::{READ_ATOMIC_BOOL_ORDERING, dsu};
13
/// A subscribed client is dropped once this much time has passed since its last data request.
const CLIENT_TIMEOUT: Duration = Duration::from_millis(5_000);
/// DSU message type carried by a protocol-version request.
const VERSION_TYPE: u32 = 0x0010_0000;
/// DSU message type carried by a controller-info request.
const INFO_TYPE: u32 = 0x0010_0001;
/// DSU message type carried by a controller-data (subscription) request.
const DATA_TYPE: u32 = 0x0010_0002;
18
/// Settings used to bring up the CemuHook (DSU) server.
#[derive(Clone, Debug)]
pub struct ServerConfig {
    /// Host name or IP address the UDP socket binds to.
    pub bind_addr: String,
    /// UDP port the server listens on.
    pub port: u16,
    /// When `true`, the Y-axis values of the gyro and accelerometer are inverted.
    pub invert_pitch: bool,
    /// CemuHook controller slot this server reports on (0-3).
    pub slot: u8,
    /// Explicit HID device path to use instead of auto-detecting one.
    pub device_path: Option<String>,
}
33
/// One subscribed DSU client, as tracked by the server.
#[derive(Debug)]
struct Client {
    /// UDP address data packets are sent to; also the key used to find an existing subscription.
    addr: SocketAddr,
    /// Client id taken from the most recent data request.
    id: u32,
    /// Controller slot this client is subscribed to.
    slot: u8,
    /// When the most recent data request from this client arrived.
    last_seen: Instant,
    /// Per-client counter, incremented (wrapping) for every data packet sent.
    packet_counter: u32,
}
42
43/// CemuHook UDP Server
44pub struct Server {
45    running: Arc<atomic::AtomicBool>,
46    running_inner: Arc<atomic::AtomicBool>,
47    clients: Arc<Mutex<Vec<Client>>>,
48    config: ServerConfig,
49    socket: UdpSocket,
50}
51
52/// CemuHook UDP Server Send thread context
53struct SendThreadContext {
54    pub running: Arc<atomic::AtomicBool>,
55    pub running_inner: Arc<atomic::AtomicBool>,
56    pub clients: Arc<Mutex<Vec<Client>>>,
57    pub config: ServerConfig,
58    pub socket: UdpSocket,
59    pub rx: mpsc::Receiver<DSUFrame>,
60}
61
/// Outcome of both server loops: the receive loop's I/O result, followed by
/// the join result of the background send thread.
type ThreadResults = (io::Result<()>, thread::Result<()>);
66
67impl Server {
68    /// Attempt to create a new `Server`.
69    ///
70    /// The first argument is an [`AtomicBool`](std::sync::atomic::AtomicBool) within an `Arc<>`
71    /// that, when `false`, signals that the server should be shut down.
72    pub fn new(
73        running: Arc<atomic::AtomicBool>,
74        config: ServerConfig,
75    ) -> Result<Self, ServerError> {
76        let addr = format!("{}:{}", config.bind_addr, config.port);
77
78        let socket = UdpSocket::bind(&addr).map_err(ServerError::UdpSocketOperationError)?;
79
80        socket
81            .set_read_timeout(Some(Duration::from_secs(1)))
82            .map_err(ServerError::UdpSocketOperationError)?;
83
84        log::info!("CemuHook server listening on {}", addr);
85
86        let clients: Arc<Mutex<Vec<Client>>> = Arc::new(Mutex::new(Vec::new()));
87        let running_inner = Arc::new(atomic::AtomicBool::new(true));
88
89        Ok(Self {
90            running,
91            running_inner,
92            clients,
93            config,
94            socket,
95        })
96    }
97
98    /// Start the CemuHook UDP server and broadcast frames received on `rx` to all subscribed CemuHook clients
99    /// Blocks until both the Receving loop (this thread) and Send loop (background thread) complete
100    /// Returns both results on Success, Err(ServerError) if the server failed to start
101    pub fn run(&self, rx: mpsc::Receiver<DSUFrame>) -> Result<ThreadResults, ServerError> {
102        let send_context = SendThreadContext {
103            running: self.running.clone(),
104            running_inner: self.running_inner.clone(),
105            clients: self.clients.clone(),
106            config: self.config.clone(),
107            socket: self
108                .socket
109                .try_clone()
110                .map_err(ServerError::UdpSocketCloneFailed)?,
111            rx,
112        };
113
114        // Spawn the send thread and store the handle
115        let send_handle = thread::spawn(move || {
116            Self::send_loop(send_context);
117        });
118
119        let recv_result = self.recv_loop();
120        let send_result = send_handle.join();
121
122        Ok((recv_result, send_result))
123    }
124
125    fn recv_loop(&self) -> io::Result<()> {
126        let mut buf = [0u8; 256];
127
128        while self.running.load(READ_ATOMIC_BOOL_ORDERING)
129            && self.running_inner.load(READ_ATOMIC_BOOL_ORDERING)
130        {
131            match self.socket.recv_from(&mut buf) {
132                Ok((msg_len, addr)) => {
133                    self.process_received_message(&buf, msg_len, &addr, &self.socket);
134                }
135                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
136                    // Read timeout
137                    self.prune_clients();
138                }
139                Err(e) if e.kind() == io::ErrorKind::Interrupted => {
140                    // Probable SIGINT
141                    log::debug!("UDP recv interrupted");
142                }
143                Err(e) => {
144                    log::error!("UDP recv error: {:?}", e);
145                }
146            }
147        }
148
149        Ok(())
150    }
151
152    fn process_received_message(
153        &self,
154        buf: &[u8; 256],
155        msg_len: usize,
156        addr: &SocketAddr,
157        socket: &UdpSocket,
158    ) {
159        if msg_len < 20 {
160            log::trace!("Short packet ({} bytes) from {}", msg_len, addr);
161            return;
162        }
163
164        let magic = &buf[0..4];
165        if magic != b"DSUC" {
166            log::trace!("Ignoring non-DSUC packet from {}", addr);
167            return;
168        }
169
170        let client_id = u32::from_le_bytes([buf[12], buf[13], buf[14], buf[15]]);
171        let event_type = u32::from_le_bytes([buf[16], buf[17], buf[18], buf[19]]);
172
173        log::trace!(
174            "Received {} request from {} (id={})",
175            event_type_str(event_type),
176            addr,
177            client_id
178        );
179
180        match event_type {
181            VERSION_TYPE => handle_version_request(client_id, addr, socket),
182            INFO_TYPE => handle_info_request(buf, client_id, addr, socket, self.config.slot),
183            DATA_TYPE => handle_data_request(
184                buf,
185                msg_len,
186                client_id,
187                addr,
188                &self.clients,
189                self.config.slot,
190            ),
191            _ => {
192                log::trace!("Unhandled event type 0x{:06x} from {}", event_type, addr);
193            }
194        }
195    }
196
197    fn prune_clients(&self) {
198        let Ok(mut list) = self.clients.lock() else {
199            log::error!("Could not lock clients mutex... Skipping client prune.");
200            return;
201        };
202
203        let before = list.len();
204        list.retain(|c| c.last_seen.elapsed() < CLIENT_TIMEOUT);
205        let after = list.len();
206        if before != after {
207            log::info!(
208                "Pruned {} stale client(s), {} remaining",
209                before - after,
210                after
211            );
212        }
213    }
214
215    fn send_loop(context: SendThreadContext) {
216        let mut packet_buf = [0u8; 100];
217        let mut timestamp_us: u64 = 0;
218
219        loop {
220            if !context.running.load(READ_ATOMIC_BOOL_ORDERING)
221                || !context.running_inner.load(READ_ATOMIC_BOOL_ORDERING)
222            {
223                break;
224            }
225
226            let frame = match context.rx.recv() {
227                Ok(f) => f,
228                Err(_) => {
229                    log::debug!("Reader's DSUFrame channel closed, send loop exiting.");
230                    context.running_inner.store(false, atomic::Ordering::SeqCst);
231                    break;
232                }
233            };
234
235            let Ok(mut list) = context.clients.lock() else {
236                log::error!("Not sending data this frame, could not lock clients mutex.");
237                continue;
238            };
239
240            if list.is_empty() {
241                continue;
242            }
243
244            for client in list.iter_mut() {
245                client.packet_counter = client.packet_counter.wrapping_add(1);
246
247                dsu::write_data_event(
248                    &mut packet_buf,
249                    &frame,
250                    client.packet_counter,
251                    client.id,
252                    client.slot,
253                    timestamp_us,
254                    context.config.invert_pitch,
255                );
256
257                log::trace!(
258                    "Packet {} to {} (slot={}): accel=({:.3}, {:.3}, {:.3}) gyro=({:.1}, {:.1}, {:.1})",
259                    client.packet_counter,
260                    client.addr,
261                    client.slot,
262                    frame.accel_x,
263                    frame.accel_y,
264                    frame.accel_z,
265                    frame.gyro_x,
266                    frame.gyro_y,
267                    frame.gyro_z
268                );
269
270                if let Err(e) = context.socket.send_to(&packet_buf, client.addr) {
271                    log::trace!("Send error to {}: {}", client.addr, e);
272                }
273            }
274
275            timestamp_us = timestamp_us.wrapping_add(4000);
276        }
277    }
278}
279
280fn handle_version_request(client_id: u32, addr: &SocketAddr, socket: &UdpSocket) {
281    let mut version_buf = [0u8; 22];
282    dsu::write_version_response(&mut version_buf, client_id);
283    if let Err(e) = socket.send_to(&version_buf, addr) {
284        log::warn!("Failed to send version response to {}: {}", addr, e);
285    }
286}
287
288fn handle_info_request(
289    buf: &[u8; 256],
290    client_id: u32,
291    addr: &SocketAddr,
292    socket: &UdpSocket,
293    configured_slot: u8,
294) {
295    let mut info_buf = [0u8; 32];
296    // Parse requested slots.
297    let port_cnt = i32::from_le_bytes([buf[20], buf[21], buf[22], buf[23]]) as usize;
298    let requested = port_cnt.min(4);
299    for i in 0..requested {
300        let slot = buf[24 + i];
301        if slot == configured_slot {
302            dsu::write_info_response(&mut info_buf, slot, client_id, true);
303            if let Err(e) = socket.send_to(&info_buf, addr) {
304                log::warn!("Failed to send info response to {}: {}", addr, e);
305            }
306            break;
307        }
308    }
309}
310
311fn handle_data_request(
312    buf: &[u8; 256],
313    msg_len: usize,
314    client_id: u32,
315    addr: &SocketAddr,
316    clients: &Arc<Mutex<Vec<Client>>>,
317    configured_slot: u8,
318) {
319    // Parse requested slot from payload.
320    // CemuHook DATA request: byte 20 = flags, byte 21 = slot.
321    let requested_slot = if msg_len > 21 { buf[21] } else { 0 };
322
323    if requested_slot != configured_slot {
324        log::trace!(
325            "Ignoring data request from {} for slot {} (configured slot is {})",
326            addr,
327            requested_slot,
328            configured_slot
329        );
330        return;
331    }
332
333    let Ok(mut list) = clients.lock() else {
334        log::error!("Not handling data request, could not lock clients mutex...");
335        return;
336    };
337
338    match list.iter_mut().find(|c| c.addr == *addr) {
339        Some(client) => {
340            client.last_seen = Instant::now();
341            client.id = client_id;
342            client.slot = requested_slot;
343            log::trace!("Updated existing client {} (slot={})", addr, requested_slot);
344        }
345        None => {
346            log::info!("New client subscribed: {} (slot={})", addr, requested_slot);
347            list.push(Client {
348                addr: *addr,
349                id: client_id,
350                slot: requested_slot,
351                last_seen: Instant::now(),
352                packet_counter: 0,
353            });
354        }
355    }
356}
357
358fn event_type_str(t: u32) -> &'static str {
359    match t {
360        VERSION_TYPE => "VERSION",
361        INFO_TYPE => "INFO",
362        DATA_TYPE => "DATA",
363        _ => "UNKNOWN",
364    }
365}