Skip to main content

scdsu_core/
server.rs

1//! This module provides the CemuHook (DSU) UDP server implementation
2
3use std::io;
4use std::net::{SocketAddr, UdpSocket};
5use std::sync::atomic;
6use std::sync::{Arc, Mutex, mpsc};
7use std::thread;
8use std::time::{Duration, Instant};
9
10use crate::dsu::DSUFrame;
11use crate::errors::ServerError;
12use crate::{READ_ATOMIC_BOOL_ORDERING, dsu};
13
/// How long a subscribed client may go without being refreshed before
/// `Server::prune_clients` drops it from the client list.
const CLIENT_TIMEOUT: Duration = Duration::from_secs(5);
/// Event type (little-endian `u32` at bytes 16..20 of a request) dispatched to
/// `handle_version_request`.
const VERSION_TYPE: u32 = 0x100000;
/// Event type dispatched to `handle_info_request`.
const INFO_TYPE: u32 = 0x100001;
/// Event type dispatched to `handle_data_request`; this is what subscribes a client.
const DATA_TYPE: u32 = 0x100002;
18
/// User-facing configuration for the CemuHook (DSU) UDP [`Server`].
#[derive(Debug, Clone)]
pub struct ServerConfig {
    /// Address or host to bind to
    pub bind_addr: String,
    /// Port to listen on
    pub port: u16,
    /// Invert the y-axis values on the gyro and accelerometer
    pub invert_pitch: bool,
    /// CemuHook controller slot to report on (0-3)
    pub slot: u8,
}
30
/// A remote peer that has subscribed to motion data through a DATA request.
#[derive(Debug)]
struct Client {
    /// Source address of the subscription; data packets are sent back here and it is
    /// the key used to recognise a repeat request from the same client.
    addr: SocketAddr,
    /// Client id taken from bytes 12..16 of the client's most recent DATA request.
    id: u32,
    /// Controller slot the client asked for.
    slot: u8,
    /// When the most recent DATA request from this client was handled; compared
    /// against `CLIENT_TIMEOUT` when pruning.
    last_seen: Instant,
    /// Per-client counter, incremented (wrapping) before every data packet sent.
    packet_counter: u32,
}
39
/// CemuHook UDP Server
///
/// Owns the bound socket and the list of subscribed clients. `run` services
/// incoming requests on the calling thread while a background thread pushes
/// frames to every subscriber.
pub struct Server {
    /// Externally owned flag; the server loops stop once it reads `false`.
    main_thread_run: Arc<atomic::AtomicBool>,
    /// Internal flag, initially `true`; cleared by the send loop when the frame
    /// channel closes so that the receive loop stops as well.
    server_thread_run: Arc<atomic::AtomicBool>,
    /// Subscribed clients, shared between the receive and send loops.
    clients: Arc<Mutex<Vec<Client>>>,
    /// Configuration the server was created with.
    config: ServerConfig,
    /// Bound UDP socket used for receiving requests and sending responses.
    socket: UdpSocket,
}
48
/// CemuHook UDP Server Send thread context
///
/// Bundles everything the background send loop needs so it can be moved into the
/// spawned thread as a single value.
struct SendThreadContext {
    /// Clone of the externally owned run flag.
    pub main_thread_run: Arc<atomic::AtomicBool>,
    /// Clone of the server's internal run flag.
    pub server_thread_run: Arc<atomic::AtomicBool>,
    /// Shared handle to the subscribed-client list.
    pub clients: Arc<Mutex<Vec<Client>>>,
    /// Copy of the server configuration.
    pub config: ServerConfig,
    /// Clone of the server socket (created with `UdpSocket::try_clone`).
    pub socket: UdpSocket,
    /// Channel on which frames to broadcast arrive.
    pub rx: mpsc::Receiver<DSUFrame>,
}
58
/// Outcome of `Server::run`: the receive loop's result first, then the result of
/// joining the send thread (`Err` carries the panic payload if that thread panicked).
type ThreadResults = (
    io::Result<()>,
    Result<(), Box<dyn std::any::Any + Send + 'static>>,
);
63
64impl Server {
65    /// Attempt to create a new `Server`.
66    ///
67    /// The first argument is an [`AtomicBool`](std::sync::atomic::AtomicBool) within an `Arc<>`
68    /// that, when `false`, signals that the server should be shut down.
69    pub fn new(
70        main_thread_run: Arc<atomic::AtomicBool>,
71        config: ServerConfig,
72    ) -> Result<Self, ServerError> {
73        let addr = format!("{}:{}", config.bind_addr, config.port);
74
75        let socket = UdpSocket::bind(&addr).map_err(ServerError::UdpSocketOperationError)?;
76
77        socket
78            .set_read_timeout(Some(Duration::from_secs(1)))
79            .map_err(ServerError::UdpSocketOperationError)?;
80
81        log::info!("CemuHook server listening on {}", addr);
82
83        let clients: Arc<Mutex<Vec<Client>>> = Arc::new(Mutex::new(Vec::new()));
84        let server_thread_run = Arc::new(atomic::AtomicBool::new(true));
85
86        Ok(Self {
87            main_thread_run,
88            server_thread_run,
89            clients,
90            config,
91            socket,
92        })
93    }
94
95    /// Start the CemuHook UDP server and broadcast frames received on `rx` to all subscribed CemuHook clients
96    /// Blocks until both the Receving loop (this thread) and Send loop (background thread) complete
97    /// Returns both results on Success, Err(ServerError) if the server failed to start
98    pub fn run(&self, rx: mpsc::Receiver<DSUFrame>) -> Result<ThreadResults, ServerError> {
99        let send_context = SendThreadContext {
100            main_thread_run: self.main_thread_run.clone(),
101            server_thread_run: self.server_thread_run.clone(),
102            clients: self.clients.clone(),
103            config: self.config.clone(),
104            socket: self
105                .socket
106                .try_clone()
107                .map_err(ServerError::UdpSocketCloneFailed)?,
108            rx,
109        };
110
111        // Spawn the send thread and store the handle
112        let send_handle = thread::spawn(move || {
113            Self::send_loop(send_context);
114        });
115
116        let recv_result = self.recv_loop();
117        let send_result = send_handle.join();
118
119        Ok((recv_result, send_result))
120    }
121
122    fn recv_loop(&self) -> io::Result<()> {
123        let mut buf = [0u8; 256];
124
125        while self.main_thread_run.load(READ_ATOMIC_BOOL_ORDERING)
126            && self.server_thread_run.load(READ_ATOMIC_BOOL_ORDERING)
127        {
128            match self.socket.recv_from(&mut buf) {
129                Ok((msg_len, addr)) => {
130                    self.process_received_message(&buf, msg_len, &addr, &self.socket);
131                }
132                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
133                    // Read timeout
134                    self.prune_clients();
135                }
136                Err(e) if e.kind() == io::ErrorKind::Interrupted => {
137                    // Probable SIGINT
138                    log::debug!("UDP recv interrupted");
139                }
140                Err(e) => {
141                    log::error!("UDP recv error: {:?}", e);
142                }
143            }
144        }
145
146        Ok(())
147    }
148
149    fn process_received_message(
150        &self,
151        buf: &[u8; 256],
152        msg_len: usize,
153        addr: &SocketAddr,
154        socket: &UdpSocket,
155    ) {
156        if msg_len < 20 {
157            log::trace!("Short packet ({} bytes) from {}", msg_len, addr);
158            return;
159        }
160
161        let magic = &buf[0..4];
162        if magic != b"DSUC" {
163            log::trace!("Ignoring non-DSUC packet from {}", addr);
164            return;
165        }
166
167        let client_id = u32::from_le_bytes([buf[12], buf[13], buf[14], buf[15]]);
168        let event_type = u32::from_le_bytes([buf[16], buf[17], buf[18], buf[19]]);
169
170        log::trace!(
171            "Received {} request from {} (id={})",
172            event_type_str(event_type),
173            addr,
174            client_id
175        );
176
177        match event_type {
178            VERSION_TYPE => handle_version_request(client_id, addr, socket),
179            INFO_TYPE => handle_info_request(buf, client_id, addr, socket, self.config.slot),
180            DATA_TYPE => handle_data_request(
181                buf,
182                msg_len,
183                client_id,
184                addr,
185                &self.clients,
186                self.config.slot,
187            ),
188            _ => {
189                log::trace!("Unhandled event type 0x{:06x} from {}", event_type, addr);
190            }
191        }
192    }
193
194    fn prune_clients(&self) {
195        let Ok(mut list) = self.clients.lock() else {
196            log::error!("Could not lock clients mutex... Skipping client prune.");
197            return;
198        };
199
200        let before = list.len();
201        list.retain(|c| c.last_seen.elapsed() < CLIENT_TIMEOUT);
202        let after = list.len();
203        if before != after {
204            log::info!(
205                "Pruned {} stale client(s), {} remaining",
206                before - after,
207                after
208            );
209        }
210    }
211
212    fn send_loop(context: SendThreadContext) {
213        let mut packet_buf = [0u8; 100];
214        let mut timestamp_us: u64 = 0;
215
216        loop {
217            if !context.main_thread_run.load(READ_ATOMIC_BOOL_ORDERING)
218                || !context.server_thread_run.load(READ_ATOMIC_BOOL_ORDERING)
219            {
220                break;
221            }
222
223            let frame = match context.rx.recv() {
224                Ok(f) => f,
225                Err(_) => {
226                    log::debug!("Frame channel closed, send loop exiting");
227                    context
228                        .server_thread_run
229                        .store(false, atomic::Ordering::SeqCst);
230                    break;
231                }
232            };
233
234            let Ok(mut list) = context.clients.lock() else {
235                log::error!("Not sending data this frame, could not lock clients mutex.");
236                continue;
237            };
238
239            if list.is_empty() {
240                continue;
241            }
242
243            for client in list.iter_mut() {
244                client.packet_counter = client.packet_counter.wrapping_add(1);
245
246                dsu::write_data_event(
247                    &mut packet_buf,
248                    &frame,
249                    client.packet_counter,
250                    client.id,
251                    client.slot,
252                    timestamp_us,
253                    context.config.invert_pitch,
254                );
255
256                log::trace!(
257                    "Packet {} to {} (slot={}): accel=({:.3}, {:.3}, {:.3}) gyro=({:.1}, {:.1}, {:.1})",
258                    client.packet_counter,
259                    client.addr,
260                    client.slot,
261                    frame.accel_x,
262                    frame.accel_y,
263                    frame.accel_z,
264                    frame.gyro_x,
265                    frame.gyro_y,
266                    frame.gyro_z
267                );
268
269                if let Err(e) = context.socket.send_to(&packet_buf, client.addr) {
270                    log::trace!("Send error to {}: {}", client.addr, e);
271                }
272            }
273
274            timestamp_us = timestamp_us.wrapping_add(4000);
275        }
276    }
277}
278
279fn handle_version_request(client_id: u32, addr: &SocketAddr, socket: &UdpSocket) {
280    let mut version_buf = [0u8; 22];
281    dsu::write_version_response(&mut version_buf, client_id);
282    if let Err(e) = socket.send_to(&version_buf, addr) {
283        log::warn!("Failed to send version response to {}: {}", addr, e);
284    }
285}
286
287fn handle_info_request(
288    buf: &[u8; 256],
289    client_id: u32,
290    addr: &SocketAddr,
291    socket: &UdpSocket,
292    configured_slot: u8,
293) {
294    let mut info_buf = [0u8; 32];
295    // Parse requested slots.
296    let port_cnt = i32::from_le_bytes([buf[20], buf[21], buf[22], buf[23]]) as usize;
297    let requested = port_cnt.min(4);
298    for i in 0..requested {
299        let slot = buf[24 + i];
300        if slot == configured_slot {
301            dsu::write_info_response(&mut info_buf, slot, client_id, true);
302            if let Err(e) = socket.send_to(&info_buf, addr) {
303                log::warn!("Failed to send info response to {}: {}", addr, e);
304            }
305            break;
306        }
307    }
308}
309
310fn handle_data_request(
311    buf: &[u8; 256],
312    msg_len: usize,
313    client_id: u32,
314    addr: &SocketAddr,
315    clients: &Arc<Mutex<Vec<Client>>>,
316    configured_slot: u8,
317) {
318    // Parse requested slot from payload.
319    // CemuHook DATA request: byte 20 = flags, byte 21 = slot.
320    let requested_slot = if msg_len > 21 { buf[21] } else { 0 };
321
322    if requested_slot != configured_slot {
323        log::trace!(
324            "Ignoring data request from {} for slot {} (configured slot is {})",
325            addr,
326            requested_slot,
327            configured_slot
328        );
329        return;
330    }
331
332    let Ok(mut list) = clients.lock() else {
333        log::error!("Not handling data request, could not lock clients mutex...");
334        return;
335    };
336
337    match list.iter_mut().find(|c| c.addr == *addr) {
338        Some(client) => {
339            client.last_seen = Instant::now();
340            client.id = client_id;
341            client.slot = requested_slot;
342            log::trace!("Updated existing client {} (slot={})", addr, requested_slot);
343        }
344        None => {
345            log::info!("New client subscribed: {} (slot={})", addr, requested_slot);
346            list.push(Client {
347                addr: *addr,
348                id: client_id,
349                slot: requested_slot,
350                last_seen: Instant::now(),
351                packet_counter: 0,
352            });
353        }
354    }
355}
356
357fn event_type_str(t: u32) -> &'static str {
358    match t {
359        VERSION_TYPE => "VERSION",
360        INFO_TYPE => "INFO",
361        DATA_TYPE => "DATA",
362        _ => "UNKNOWN",
363    }
364}