Skip to main content

scdsu_core/
server.rs

1//! Provides the CemuHook (DSU) UDP server implementation.
2
3use std::io;
4use std::net::{SocketAddr, UdpSocket};
5use std::sync::atomic;
6use std::sync::{Arc, Mutex, mpsc};
7use std::thread;
8use std::time::{Duration, Instant};
9
10use crate::dsu::DSUFrame;
11use crate::errors::ServerError;
12use crate::{READ_ATOMIC_BOOL_ORDERING, dsu};
13
/// Maximum time since a client's last data request before `prune_clients` drops it.
const CLIENT_TIMEOUT: Duration = Duration::from_secs(5);
/// Event type value dispatched to `handle_version_request`.
const VERSION_TYPE: u32 = 0x100000;
/// Event type value dispatched to `handle_info_request`.
const INFO_TYPE: u32 = 0x100001;
/// Event type value dispatched to `handle_data_request`.
const DATA_TYPE: u32 = 0x100002;
18
/// DSU server configuration.
///
/// Defines configurable behavior within the DSU server.
///
/// For configuration affecting the device behavior,
/// see [`DeviceConfig`](crate::devices::DeviceConfig).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerConfig {
    /// Address or host to bind to
    pub bind_addr: String,
    /// Port to listen on
    pub port: u16,
    /// Invert the Y-axis values on the gyro and accelerometer
    pub invert_pitch: bool,
    /// CemuHook controller slot to report on (0-3)
    pub slot: u8,
    /// Specific HID device path to use instead of auto-detecting
    pub device_path: Option<String>,
}
38
/// A client that has sent a data request and receives data packets from the send loop.
#[derive(Debug)]
struct Client {
    /// Address data packets are sent to; also the key used to find an existing entry.
    addr: SocketAddr,
    /// Client id taken from the most recent data request; passed to `dsu::write_data_event`.
    id: u32,
    /// Slot taken from the most recent data request.
    slot: u8,
    /// Time of the most recent data request; compared against `CLIENT_TIMEOUT` when pruning.
    last_seen: Instant,
    /// Per-client counter, incremented (wrapping) before each data packet is written.
    packet_counter: u32,
}
47
/// CemuHook UDP Server
///
/// Created with `Server::new` and driven by `Server::run`.
pub struct Server {
    /// Shutdown flag supplied by the caller of `new`; both loops stop once it reads `false`.
    running: Arc<atomic::AtomicBool>,
    /// Internal flag, initially `true`; the send loop clears it when the frame channel closes.
    running_inner: Arc<atomic::AtomicBool>,
    /// Subscribed clients, shared between the receive loop and the send thread.
    clients: Arc<Mutex<Vec<Client>>>,
    /// Configuration this server was created with.
    config: ServerConfig,
    /// Bound UDP socket; a clone of it is handed to the send thread in `run`.
    socket: UdpSocket,
}
56
/// CemuHook UDP Server Send thread context
///
/// Everything the send thread needs, built in `Server::run` and moved into the thread.
struct SendThreadContext {
    /// Clone of the server's caller-supplied shutdown flag.
    pub running: Arc<atomic::AtomicBool>,
    /// Clone of the server's internal flag; cleared by the send loop when `rx` is closed.
    pub running_inner: Arc<atomic::AtomicBool>,
    /// Shared list of clients to send each frame to.
    pub clients: Arc<Mutex<Vec<Client>>>,
    /// Copy of the server configuration (the send loop reads `invert_pitch`).
    pub config: ServerConfig,
    /// Clone of the server socket used for sending data packets.
    pub socket: UdpSocket,
    /// Channel on which frames to broadcast arrive.
    pub rx: mpsc::Receiver<DSUFrame>,
}
66
/// Outcome of `Server::run`: the receive loop's result, followed by the result of joining
/// the send thread (`Err` carries the panic payload if that thread panicked).
type ThreadResults = (
    io::Result<()>,
    Result<(), Box<dyn std::any::Any + Send + 'static>>,
);
71
72impl Server {
73    /// Attempt to create a new `Server`.
74    ///
75    /// The first argument is an [`AtomicBool`](std::sync::atomic::AtomicBool) within an `Arc<>`
76    /// that, when `false`, signals that the server should be shut down.
77    pub fn new(
78        running: Arc<atomic::AtomicBool>,
79        config: ServerConfig,
80    ) -> Result<Self, ServerError> {
81        let addr = format!("{}:{}", config.bind_addr, config.port);
82
83        let socket = UdpSocket::bind(&addr).map_err(ServerError::UdpSocketOperationError)?;
84
85        socket
86            .set_read_timeout(Some(Duration::from_secs(1)))
87            .map_err(ServerError::UdpSocketOperationError)?;
88
89        log::info!("CemuHook server listening on {}", addr);
90
91        let clients: Arc<Mutex<Vec<Client>>> = Arc::new(Mutex::new(Vec::new()));
92        let running_inner = Arc::new(atomic::AtomicBool::new(true));
93
94        Ok(Self {
95            running,
96            running_inner,
97            clients,
98            config,
99            socket,
100        })
101    }
102
103    /// Start the CemuHook UDP server and broadcast frames received on `rx` to all subscribed CemuHook clients
104    /// Blocks until both the Receving loop (this thread) and Send loop (background thread) complete
105    /// Returns both results on Success, Err(ServerError) if the server failed to start
106    pub fn run(&self, rx: mpsc::Receiver<DSUFrame>) -> Result<ThreadResults, ServerError> {
107        let send_context = SendThreadContext {
108            running: self.running.clone(),
109            running_inner: self.running_inner.clone(),
110            clients: self.clients.clone(),
111            config: self.config.clone(),
112            socket: self
113                .socket
114                .try_clone()
115                .map_err(ServerError::UdpSocketCloneFailed)?,
116            rx,
117        };
118
119        // Spawn the send thread and store the handle
120        let send_handle = thread::spawn(move || {
121            Self::send_loop(send_context);
122        });
123
124        let recv_result = self.recv_loop();
125        let send_result = send_handle.join();
126
127        Ok((recv_result, send_result))
128    }
129
130    fn recv_loop(&self) -> io::Result<()> {
131        let mut buf = [0u8; 256];
132
133        while self.running.load(READ_ATOMIC_BOOL_ORDERING)
134            && self.running_inner.load(READ_ATOMIC_BOOL_ORDERING)
135        {
136            match self.socket.recv_from(&mut buf) {
137                Ok((msg_len, addr)) => {
138                    self.process_received_message(&buf, msg_len, &addr, &self.socket);
139                }
140                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
141                    // Read timeout
142                    self.prune_clients();
143                }
144                Err(e) if e.kind() == io::ErrorKind::Interrupted => {
145                    // Probable SIGINT
146                    log::debug!("UDP recv interrupted");
147                }
148                Err(e) => {
149                    // shut up, windows
150                    if e.raw_os_error() != Some(10060) {
151                        log::error!("UDP recv error: {:?}", e);
152                    }
153                }
154            }
155        }
156
157        Ok(())
158    }
159
160    fn process_received_message(
161        &self,
162        buf: &[u8; 256],
163        msg_len: usize,
164        addr: &SocketAddr,
165        socket: &UdpSocket,
166    ) {
167        if msg_len < 20 {
168            log::trace!("Short packet ({} bytes) from {}", msg_len, addr);
169            return;
170        }
171
172        let magic = &buf[0..4];
173        if magic != b"DSUC" {
174            log::trace!("Ignoring non-DSUC packet from {}", addr);
175            return;
176        }
177
178        let client_id = u32::from_le_bytes([buf[12], buf[13], buf[14], buf[15]]);
179        let event_type = u32::from_le_bytes([buf[16], buf[17], buf[18], buf[19]]);
180
181        log::trace!(
182            "Received {} request from {} (id={})",
183            event_type_str(event_type),
184            addr,
185            client_id
186        );
187
188        match event_type {
189            VERSION_TYPE => handle_version_request(client_id, addr, socket),
190            INFO_TYPE => handle_info_request(buf, client_id, addr, socket, self.config.slot),
191            DATA_TYPE => handle_data_request(
192                buf,
193                msg_len,
194                client_id,
195                addr,
196                &self.clients,
197                self.config.slot,
198            ),
199            _ => {
200                log::trace!("Unhandled event type 0x{:06x} from {}", event_type, addr);
201            }
202        }
203    }
204
205    fn prune_clients(&self) {
206        let Ok(mut list) = self.clients.lock() else {
207            log::error!("Could not lock clients mutex... Skipping client prune.");
208            return;
209        };
210
211        let before = list.len();
212        list.retain(|c| c.last_seen.elapsed() < CLIENT_TIMEOUT);
213        let after = list.len();
214        if before != after {
215            log::info!(
216                "Pruned {} stale client(s), {} remaining",
217                before - after,
218                after
219            );
220        }
221    }
222
223    fn send_loop(context: SendThreadContext) {
224        let mut packet_buf = [0u8; 100];
225        let mut timestamp_us: u64 = 0;
226
227        loop {
228            if !context.running.load(READ_ATOMIC_BOOL_ORDERING)
229                || !context.running_inner.load(READ_ATOMIC_BOOL_ORDERING)
230            {
231                break;
232            }
233
234            let frame = match context.rx.recv() {
235                Ok(f) => f,
236                Err(_) => {
237                    log::debug!("Reader's DSUFrame channel closed, send loop exiting.");
238                    context.running_inner.store(false, atomic::Ordering::SeqCst);
239                    break;
240                }
241            };
242
243            let Ok(mut list) = context.clients.lock() else {
244                log::error!("Not sending data this frame, could not lock clients mutex.");
245                continue;
246            };
247
248            if list.is_empty() {
249                continue;
250            }
251
252            for client in list.iter_mut() {
253                client.packet_counter = client.packet_counter.wrapping_add(1);
254
255                dsu::write_data_event(
256                    &mut packet_buf,
257                    &frame,
258                    client.packet_counter,
259                    client.id,
260                    client.slot,
261                    timestamp_us,
262                    context.config.invert_pitch,
263                );
264
265                log::trace!(
266                    "Packet {} to {} (slot={}): accel=({:.3}, {:.3}, {:.3}) gyro=({:.1}, {:.1}, {:.1})",
267                    client.packet_counter,
268                    client.addr,
269                    client.slot,
270                    frame.accel_x,
271                    frame.accel_y,
272                    frame.accel_z,
273                    frame.gyro_x,
274                    frame.gyro_y,
275                    frame.gyro_z
276                );
277
278                if let Err(e) = context.socket.send_to(&packet_buf, client.addr) {
279                    log::trace!("Send error to {}: {}", client.addr, e);
280                }
281            }
282
283            timestamp_us = timestamp_us.wrapping_add(4000);
284        }
285    }
286}
287
288fn handle_version_request(client_id: u32, addr: &SocketAddr, socket: &UdpSocket) {
289    let mut version_buf = [0u8; 22];
290    dsu::write_version_response(&mut version_buf, client_id);
291    if let Err(e) = socket.send_to(&version_buf, addr) {
292        log::warn!("Failed to send version response to {}: {}", addr, e);
293    }
294}
295
296fn handle_info_request(
297    buf: &[u8; 256],
298    client_id: u32,
299    addr: &SocketAddr,
300    socket: &UdpSocket,
301    configured_slot: u8,
302) {
303    let mut info_buf = [0u8; 32];
304    // Parse requested slots.
305    let port_cnt = i32::from_le_bytes([buf[20], buf[21], buf[22], buf[23]]) as usize;
306    let requested = port_cnt.min(4);
307    for i in 0..requested {
308        let slot = buf[24 + i];
309        if slot == configured_slot {
310            dsu::write_info_response(&mut info_buf, slot, client_id, true);
311            if let Err(e) = socket.send_to(&info_buf, addr) {
312                log::warn!("Failed to send info response to {}: {}", addr, e);
313            }
314            break;
315        }
316    }
317}
318
319fn handle_data_request(
320    buf: &[u8; 256],
321    msg_len: usize,
322    client_id: u32,
323    addr: &SocketAddr,
324    clients: &Arc<Mutex<Vec<Client>>>,
325    configured_slot: u8,
326) {
327    // Parse requested slot from payload.
328    // CemuHook DATA request: byte 20 = flags, byte 21 = slot.
329    let requested_slot = if msg_len > 21 { buf[21] } else { 0 };
330
331    if requested_slot != configured_slot {
332        log::trace!(
333            "Ignoring data request from {} for slot {} (configured slot is {})",
334            addr,
335            requested_slot,
336            configured_slot
337        );
338        return;
339    }
340
341    let Ok(mut list) = clients.lock() else {
342        log::error!("Not handling data request, could not lock clients mutex...");
343        return;
344    };
345
346    match list.iter_mut().find(|c| c.addr == *addr) {
347        Some(client) => {
348            client.last_seen = Instant::now();
349            client.id = client_id;
350            client.slot = requested_slot;
351            log::trace!("Updated existing client {} (slot={})", addr, requested_slot);
352        }
353        None => {
354            log::info!("New client subscribed: {} (slot={})", addr, requested_slot);
355            list.push(Client {
356                addr: *addr,
357                id: client_id,
358                slot: requested_slot,
359                last_seen: Instant::now(),
360                packet_counter: 0,
361            });
362        }
363    }
364}
365
366fn event_type_str(t: u32) -> &'static str {
367    match t {
368        VERSION_TYPE => "VERSION",
369        INFO_TYPE => "INFO",
370        DATA_TYPE => "DATA",
371        _ => "UNKNOWN",
372    }
373}