Skip to main content

scdsu_core/
server.rs

1use std::io;
2use std::net::{SocketAddr, UdpSocket};
3use std::sync::atomic;
4use std::sync::{Arc, Mutex, mpsc};
5use std::thread;
6use std::time::{Duration, Instant};
7
8use crate::dsu::DSUFrame;
9use crate::errors::ServerError;
10use crate::{READ_ATOMIC_BOOL_ORDERING, dsu};
11
/// How long a client may go without sending a data request before `prune_clients` drops it.
const CLIENT_TIMEOUT: Duration = Duration::from_secs(5);
/// Event type value (bytes 16..20 of a request) that is dispatched to `handle_version_request`.
const VERSION_TYPE: u32 = 0x100000;
/// Event type value (bytes 16..20 of a request) that is dispatched to `handle_info_request`.
const INFO_TYPE: u32 = 0x100001;
/// Event type value (bytes 16..20 of a request) that is dispatched to `handle_data_request`.
const DATA_TYPE: u32 = 0x100002;
16
/// Settings used to construct a `Server`.
#[derive(Debug, Clone)]
pub struct ServerConfig {
    /// Address or host to bind to
    pub bind_addr: String,
    /// Port to listen on
    pub port: u16,
    /// Invert the Y-axis values on the gyro and accelerometer
    pub invert_pitch: bool,
    /// CemuHook controller slot to report on (0-3)
    pub slot: u8,
}
/// A subscribed client, as registered by `handle_data_request`.
#[derive(Debug)]
struct Client {
    /// Address the data request came from; data packets are sent back to it.
    addr: SocketAddr,
    /// Client id taken from the most recent data request.
    id: u32,
    /// Slot named in the most recent data request.
    slot: u8,
    /// When the most recent data request arrived; compared against `CLIENT_TIMEOUT` when pruning.
    last_seen: Instant,
    /// Wrapping counter, incremented once for every data packet written for this client.
    packet_counter: u32,
}
36
/// CemuHook UDP Server
pub struct Server {
    /// Flag supplied by the caller of `Server::new`; both loops stop once it reads `false`.
    main_thread_running: Arc<atomic::AtomicBool>,
    /// Starts as `true`; the send loop clears it when the frame channel closes, which also
    /// stops the receive loop.
    server_thread_running: Arc<atomic::AtomicBool>,
    /// Subscribed clients, shared between the receive loop and the send thread.
    clients: Arc<Mutex<Vec<Client>>>,
    /// Configuration this server was created with.
    config: ServerConfig,
    /// UDP socket bound in `Server::new`.
    socket: UdpSocket,
}
45
/// CemuHook UDP Server Send thread context
///
/// Built in `Server::run` from clones of the server's own fields and moved into the
/// spawned send thread.
struct SendThreadContext {
    /// Clone of the server's `main_thread_running` flag.
    pub main_thread_running: Arc<atomic::AtomicBool>,
    /// Clone of the server's `server_thread_running` flag.
    pub server_thread_running: Arc<atomic::AtomicBool>,
    /// Handle to the same client list the receive loop updates.
    pub clients: Arc<Mutex<Vec<Client>>>,
    /// Copy of the server configuration.
    pub config: ServerConfig,
    /// Clone (via `try_clone`) of the server's UDP socket, used for sending data packets.
    pub socket: UdpSocket,
    /// Source of frames to broadcast to subscribed clients.
    pub rx: mpsc::Receiver<DSUFrame>,
}
55
/// Outcome of `Server::run`: first the result of the receive loop, then the result of joining
/// the send thread (its `Err` carries the panic payload if that thread panicked).
type ThreadResults = (
    io::Result<()>,
    Result<(), Box<dyn std::any::Any + Send + 'static>>,
);
60
61impl Server {
62    /// Attempt to create a new `Server`
63    pub fn new(
64        main_thread_running: Arc<atomic::AtomicBool>,
65        config: ServerConfig,
66    ) -> Result<Self, ServerError> {
67        let addr = format!("{}:{}", config.bind_addr, config.port);
68
69        let socket = UdpSocket::bind(&addr).map_err(ServerError::UdpSocketOperationError)?;
70
71        socket
72            .set_read_timeout(Some(Duration::from_secs(1)))
73            .map_err(ServerError::UdpSocketOperationError)?;
74
75        log::info!("CemuHook server listening on {}", addr);
76
77        let clients: Arc<Mutex<Vec<Client>>> = Arc::new(Mutex::new(Vec::new()));
78        let server_thread_running = Arc::new(atomic::AtomicBool::new(true));
79
80        Ok(Self {
81            main_thread_running,
82            server_thread_running,
83            clients,
84            config,
85            socket,
86        })
87    }
88
89    /// Start the CemuHook UDP server and broadcast frames received on `rx` to all subscribed CemuHook clients
90    /// Blocks until both the Receving loop (this thread) and Send loop (background thread) complete
91    /// Returns both results on Success, Err(ServerError) if the server failed to start
92    pub fn run(&self, rx: mpsc::Receiver<DSUFrame>) -> Result<ThreadResults, ServerError> {
93        let send_context = SendThreadContext {
94            main_thread_running: self.main_thread_running.clone(),
95            server_thread_running: self.server_thread_running.clone(),
96            clients: self.clients.clone(),
97            config: self.config.clone(),
98            socket: self
99                .socket
100                .try_clone()
101                .map_err(ServerError::UdpSocketCloneFailed)?,
102            rx,
103        };
104
105        // Spawn the send thread and store the handle
106        let send_handle = thread::spawn(move || {
107            Self::send_loop(send_context);
108        });
109
110        let recv_result = self.recv_loop();
111        let send_result = send_handle.join();
112
113        Ok((recv_result, send_result))
114    }
115
116    fn recv_loop(&self) -> io::Result<()> {
117        let mut buf = [0u8; 256];
118
119        while self.main_thread_running.load(READ_ATOMIC_BOOL_ORDERING)
120            && self.server_thread_running.load(READ_ATOMIC_BOOL_ORDERING)
121        {
122            match self.socket.recv_from(&mut buf) {
123                Ok((msg_len, addr)) => {
124                    self.process_received_message(&buf, msg_len, &addr, &self.socket);
125                }
126                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
127                    // Read timeout
128                    self.prune_clients();
129                }
130                Err(e) if e.kind() == io::ErrorKind::Interrupted => {
131                    // Probable SIGINT
132                    log::debug!("UDP recv interrupted");
133                }
134                Err(e) => {
135                    log::error!("UDP recv error: {:?}", e);
136                }
137            }
138        }
139
140        Ok(())
141    }
142
143    fn process_received_message(
144        &self,
145        buf: &[u8; 256],
146        msg_len: usize,
147        addr: &SocketAddr,
148        socket: &UdpSocket,
149    ) {
150        if msg_len < 20 {
151            log::trace!("Short packet ({} bytes) from {}", msg_len, addr);
152            return;
153        }
154
155        let magic = &buf[0..4];
156        if magic != b"DSUC" {
157            log::trace!("Ignoring non-DSUC packet from {}", addr);
158            return;
159        }
160
161        let client_id = u32::from_le_bytes([buf[12], buf[13], buf[14], buf[15]]);
162        let event_type = u32::from_le_bytes([buf[16], buf[17], buf[18], buf[19]]);
163
164        log::trace!(
165            "Received {} request from {} (id={})",
166            event_type_str(event_type),
167            addr,
168            client_id
169        );
170
171        match event_type {
172            VERSION_TYPE => handle_version_request(client_id, addr, socket),
173            INFO_TYPE => handle_info_request(buf, client_id, addr, socket, self.config.slot),
174            DATA_TYPE => handle_data_request(
175                buf,
176                msg_len,
177                client_id,
178                addr,
179                &self.clients,
180                self.config.slot,
181            ),
182            _ => {
183                log::trace!("Unhandled event type 0x{:06x} from {}", event_type, addr);
184            }
185        }
186    }
187
188    fn prune_clients(&self) {
189        let Ok(mut list) = self.clients.lock() else {
190            log::error!("Could not lock clients mutex... Skipping client prune.");
191            return;
192        };
193
194        let before = list.len();
195        list.retain(|c| c.last_seen.elapsed() < CLIENT_TIMEOUT);
196        let after = list.len();
197        if before != after {
198            log::info!(
199                "Pruned {} stale client(s), {} remaining",
200                before - after,
201                after
202            );
203        }
204    }
205
206    fn send_loop(context: SendThreadContext) {
207        let mut packet_buf = [0u8; 100];
208        let mut timestamp_us: u64 = 0;
209
210        loop {
211            if !context.main_thread_running.load(READ_ATOMIC_BOOL_ORDERING)
212                || !context
213                    .server_thread_running
214                    .load(READ_ATOMIC_BOOL_ORDERING)
215            {
216                break;
217            }
218
219            let frame = match context.rx.recv() {
220                Ok(f) => f,
221                Err(_) => {
222                    log::debug!("Frame channel closed, send loop exiting");
223                    context
224                        .server_thread_running
225                        .store(false, atomic::Ordering::SeqCst);
226                    break;
227                }
228            };
229
230            let Ok(mut list) = context.clients.lock() else {
231                log::error!("Not sending data this frame, could not lock clients mutex.");
232                continue;
233            };
234
235            if list.is_empty() {
236                continue;
237            }
238
239            for client in list.iter_mut() {
240                client.packet_counter = client.packet_counter.wrapping_add(1);
241
242                dsu::write_data_event(
243                    &mut packet_buf,
244                    &frame,
245                    client.packet_counter,
246                    client.id,
247                    client.slot,
248                    timestamp_us,
249                    context.config.invert_pitch,
250                );
251
252                log::trace!(
253                    "Packet {} to {} (slot={}): accel=({:.3}, {:.3}, {:.3}) gyro=({:.1}, {:.1}, {:.1})",
254                    client.packet_counter,
255                    client.addr,
256                    client.slot,
257                    frame.accel_x,
258                    frame.accel_y,
259                    frame.accel_z,
260                    frame.gyro_x,
261                    frame.gyro_y,
262                    frame.gyro_z
263                );
264
265                if let Err(e) = context.socket.send_to(&packet_buf, client.addr) {
266                    log::trace!("Send error to {}: {}", client.addr, e);
267                }
268            }
269
270            timestamp_us = timestamp_us.wrapping_add(4000);
271        }
272    }
273}
274
275fn handle_version_request(client_id: u32, addr: &SocketAddr, socket: &UdpSocket) {
276    let mut version_buf = [0u8; 22];
277    dsu::write_version_response(&mut version_buf, client_id);
278    if let Err(e) = socket.send_to(&version_buf, addr) {
279        log::warn!("Failed to send version response to {}: {}", addr, e);
280    }
281}
282
283fn handle_info_request(
284    buf: &[u8; 256],
285    client_id: u32,
286    addr: &SocketAddr,
287    socket: &UdpSocket,
288    configured_slot: u8,
289) {
290    let mut info_buf = [0u8; 32];
291    // Parse requested slots.
292    let port_cnt = i32::from_le_bytes([buf[20], buf[21], buf[22], buf[23]]) as usize;
293    let requested = port_cnt.min(4);
294    for i in 0..requested {
295        let slot = buf[24 + i];
296        if slot == configured_slot {
297            dsu::write_info_response(&mut info_buf, slot, client_id, true);
298            if let Err(e) = socket.send_to(&info_buf, addr) {
299                log::warn!("Failed to send info response to {}: {}", addr, e);
300            }
301            break;
302        }
303    }
304}
305
306fn handle_data_request(
307    buf: &[u8; 256],
308    msg_len: usize,
309    client_id: u32,
310    addr: &SocketAddr,
311    clients: &Arc<Mutex<Vec<Client>>>,
312    configured_slot: u8,
313) {
314    // Parse requested slot from payload.
315    // CemuHook DATA request: byte 20 = flags, byte 21 = slot.
316    let requested_slot = if msg_len > 21 { buf[21] } else { 0 };
317
318    if requested_slot != configured_slot {
319        log::trace!(
320            "Ignoring data request from {} for slot {} (configured slot is {})",
321            addr,
322            requested_slot,
323            configured_slot
324        );
325        return;
326    }
327
328    let Ok(mut list) = clients.lock() else {
329        log::error!("Not handling data request, could not lock clients mutex...");
330        return;
331    };
332
333    match list.iter_mut().find(|c| c.addr == *addr) {
334        Some(client) => {
335            client.last_seen = Instant::now();
336            client.id = client_id;
337            client.slot = requested_slot;
338            log::trace!("Updated existing client {} (slot={})", addr, requested_slot);
339        }
340        None => {
341            log::info!("New client subscribed: {} (slot={})", addr, requested_slot);
342            list.push(Client {
343                addr: *addr,
344                id: client_id,
345                slot: requested_slot,
346                last_seen: Instant::now(),
347                packet_counter: 0,
348            });
349        }
350    }
351}
352
353fn event_type_str(t: u32) -> &'static str {
354    match t {
355        VERSION_TYPE => "VERSION",
356        INFO_TYPE => "INFO",
357        DATA_TYPE => "DATA",
358        _ => "UNKNOWN",
359    }
360}