soe_protocol/sync_rt.rs
1//! A synchronous, dependency-free adapter driving a [`SoeMultiplexer`] over a
2//! blocking [`std::net::UdpSocket`]. Always available; pulls in no async runtime.
3//!
4//! The I/O-agnostic [`SoeMultiplexer`] is runtime-agnostic; this module is a thin
5//! convenience layer mirroring `TokioSoeSocket` (behind the `tokio` feature) for callers who
6//! prefer a plain blocking loop. Both types implement [`SoeSocket`].
7
8use std::io;
9use std::net::{SocketAddr, UdpSocket};
10use std::time::{Duration, Instant};
11
12use bytes::Bytes;
13
14use crate::protocol::DisconnectReason;
15use crate::socket::{SocketConfig, SocketEvent, SoeMultiplexer, SoeSocket};
16
17/// Buffer size for a single received datagram. SOE UDP lengths default to 512 and
18/// rarely exceed it.
19const RECV_BUFFER_SIZE: usize = 2048;
20
21/// A synchronous SOE socket: a [`SoeMultiplexer`] driven over a blocking
22/// [`std::net::UdpSocket`].
23///
24/// Drive it by repeatedly calling [`step`](SyncSoeSocket::step), which performs a
25/// single read-or-tick cycle and returns any [`SocketEvent`]s produced. The socket
26/// is given a read timeout equal to the tick period, so `step` returns promptly when
27/// a datagram arrives and otherwise wakes once per tick to run housekeeping.
28#[derive(Debug)]
29pub struct SyncSoeSocket {
30 mux: SoeMultiplexer<SocketAddr>,
31 socket: UdpSocket,
32 buf: Box<[u8]>,
33}
34
35impl SyncSoeSocket {
36 /// Binds a UDP socket to `local` and prepares to drive sessions, waking at least
37 /// once every `tick_period` to run housekeeping. A period of 1–10ms is typical.
38 pub fn bind(
39 local: SocketAddr,
40 config: SocketConfig,
41 tick_period: Duration,
42 ) -> io::Result<Self> {
43 let socket = UdpSocket::bind(local)?;
44 // A read timeout paces the loop: recv_from returns immediately on data, or
45 // after the tick period so heartbeats, timeouts, and resends still run.
46 socket.set_read_timeout(Some(tick_period))?;
47
48 Ok(Self {
49 mux: SoeMultiplexer::new(config),
50 socket,
51 buf: vec![0u8; RECV_BUFFER_SIZE].into_boxed_slice(),
52 })
53 }
54
55 /// Performs a single drive cycle: waits up to the tick period for an incoming
56 /// datagram, runs a session tick, flushes outgoing datagrams, and returns any
57 /// events.
58 pub fn step(&mut self) -> io::Result<Vec<SocketEvent<SocketAddr>>> {
59 match self.socket.recv_from(&mut self.buf) {
60 Ok((len, from)) => {
61 let datagram = Bytes::copy_from_slice(&self.buf[..len]);
62 self.mux.process_incoming(from, datagram, Instant::now());
63 }
64 // A read timeout surfaces as WouldBlock or TimedOut depending on the
65 // platform; both simply mean "no datagram this tick".
66 Err(e)
67 if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut => {
68 }
69 Err(e) => return Err(e),
70 }
71
72 self.mux.run_tick(Instant::now());
73
74 for (addr, datagram) in self.mux.take_outgoing() {
75 self.socket.send_to(&datagram, addr)?;
76 }
77
78 Ok(self.mux.take_events())
79 }
80}
81
82impl SoeSocket for SyncSoeSocket {
83 fn local_addr(&self) -> io::Result<SocketAddr> {
84 self.socket.local_addr()
85 }
86
87 fn session_count(&self) -> usize {
88 self.mux.session_count()
89 }
90
91 fn connect(&mut self, remote: SocketAddr) {
92 self.mux.connect(remote, Instant::now());
93 }
94
95 fn enqueue_data(&mut self, remote: &SocketAddr, data: &[u8]) -> bool {
96 self.mux.enqueue_data(remote, data)
97 }
98
99 fn terminate(&mut self, remote: &SocketAddr, reason: DisconnectReason) {
100 self.mux.terminate(remote, reason, Instant::now());
101 }
102}