soe_protocol/tokio_rt.rs
1//! A [Tokio](https://tokio.rs)-based async adapter driving a [`SoeMultiplexer`]
2//! over a UDP socket. Enabled by the `tokio` feature.
3//!
4//! The I/O-agnostic [`SoeMultiplexer`] is runtime-agnostic; this module is a thin,
5//! optional convenience layer for users who want a ready-made async driver. It owns
6//! a [`tokio::net::UdpSocket`] and interleaves socket reads with periodic ticks
7//! (for heartbeats, timeouts, and reliable-data resends), flushing outgoing
8//! datagrams after each step.
9
10use std::io;
11use std::net::SocketAddr;
12use std::time::Instant;
13
14use bytes::Bytes;
15use tokio::net::UdpSocket;
16use tokio::sync::mpsc;
17use tokio::task::JoinHandle;
18use tokio::time::{Duration, Interval, MissedTickBehavior, interval};
19
20use crate::protocol::DisconnectReason;
21use crate::socket::{SocketConfig, SocketEvent, SoeMultiplexer, SoeSocket};
22
23/// Buffer size for a single received datagram. SOE UDP lengths default to 512 and
24/// rarely exceed it.
25const RECV_BUFFER_SIZE: usize = 2048;
26
27/// An async SOE socket: a [`SoeMultiplexer`] driven over a Tokio UDP socket.
28///
29/// Drive it by repeatedly awaiting [`step`](TokioSoeSocket::step), which performs a
30/// single read-or-tick cycle and returns any [`SocketEvent`]s produced. Sessions are
31/// initiated with [`connect`](TokioSoeSocket::connect) and data is sent with
32/// [`enqueue_data`](TokioSoeSocket::enqueue_data).
33#[derive(Debug)]
34pub struct TokioSoeSocket {
35 mux: SoeMultiplexer<SocketAddr>,
36 socket: UdpSocket,
37 tick: Interval,
38 buf: Box<[u8]>,
39}
40
41impl TokioSoeSocket {
42 /// Binds a UDP socket to `local` and prepares to drive sessions, ticking every
43 /// `tick_period`. A period of 1–10ms is typical.
44 pub async fn bind(
45 local: SocketAddr,
46 config: SocketConfig,
47 tick_period: Duration,
48 ) -> io::Result<Self> {
49 let socket = UdpSocket::bind(local).await?;
50 let mut tick = interval(tick_period);
51 // If we fall behind (e.g. while awaiting a send), don't fire a burst of
52 // catch-up ticks; a single delayed tick is enough.
53 tick.set_missed_tick_behavior(MissedTickBehavior::Delay);
54
55 Ok(Self {
56 mux: SoeMultiplexer::new(config),
57 socket,
58 tick,
59 buf: vec![0u8; RECV_BUFFER_SIZE].into_boxed_slice(),
60 })
61 }
62
63 /// Returns the local address the socket is bound to.
64 pub fn local_addr(&self) -> io::Result<SocketAddr> {
65 self.socket.local_addr()
66 }
67
68 /// Performs a single drive cycle: awaits either an incoming datagram or the next
69 /// tick, runs a session tick, flushes outgoing datagrams, and returns any events.
70 pub async fn step(&mut self) -> io::Result<Vec<SocketEvent<SocketAddr>>> {
71 tokio::select! {
72 result = self.socket.recv_from(&mut self.buf) => {
73 let (len, from) = result?;
74 let datagram = Bytes::copy_from_slice(&self.buf[..len]);
75 self.mux.process_incoming(from, datagram, Instant::now());
76 }
77 _ = self.tick.tick() => {}
78 }
79
80 self.mux.run_tick(Instant::now());
81
82 for (addr, datagram) in self.mux.take_outgoing() {
83 self.socket.send_to(&datagram, addr).await?;
84 }
85
86 Ok(self.mux.take_events())
87 }
88}
89
90impl SoeSocket for TokioSoeSocket {
91 fn local_addr(&self) -> io::Result<SocketAddr> {
92 self.socket.local_addr()
93 }
94
95 fn session_count(&self) -> usize {
96 self.mux.session_count()
97 }
98
99 fn connect(&mut self, remote: SocketAddr) {
100 self.mux.connect(remote, Instant::now());
101 }
102
103 fn enqueue_data(&mut self, remote: &SocketAddr, data: &[u8]) -> bool {
104 self.mux.enqueue_data(remote, data)
105 }
106
107 fn terminate(&mut self, remote: &SocketAddr, reason: DisconnectReason) {
108 self.mux.terminate(remote, reason, Instant::now());
109 }
110}
111
112/// A command sent from a [`SoeHandle`] to the [`TokioSoeServer`] driver loop.
113enum Command {
114 Connect(SocketAddr),
115 EnqueueData {
116 remote: SocketAddr,
117 data: Bytes,
118 },
119 Terminate {
120 remote: SocketAddr,
121 reason: DisconnectReason,
122 },
123}
124
125/// A cloneable handle for interacting with a [`TokioSoeServer`] from any task.
126///
127/// All methods are non-blocking: they post a command to the server's driver loop,
128/// which owns the socket and the [`SoeMultiplexer`]. This lets per-client game-logic
129/// tasks send reliable data and manage sessions without sharing the (necessarily
130/// single-owner) protocol state.
131///
132/// Each method returns `false` if the server's driver loop has stopped (e.g. the
133/// [`TokioSoeServer`] was dropped), in which case the command was not delivered.
134#[derive(Clone, Debug)]
135pub struct SoeHandle {
136 commands: mpsc::UnboundedSender<Command>,
137}
138
139impl SoeHandle {
140 /// Opens a client session to `remote`. The session request is sent by the driver
141 /// loop on its next cycle.
142 pub fn connect(&self, remote: SocketAddr) -> bool {
143 self.commands.send(Command::Connect(remote)).is_ok()
144 }
145
146 /// Enqueues application data to be sent reliably to `remote`.
147 ///
148 /// Returns `false` only if the driver loop has stopped; it does **not** report
149 /// whether a session for `remote` exists (that is determined asynchronously by
150 /// the loop).
151 pub fn enqueue_data(&self, remote: SocketAddr, data: impl Into<Bytes>) -> bool {
152 self.commands
153 .send(Command::EnqueueData {
154 remote,
155 data: data.into(),
156 })
157 .is_ok()
158 }
159
160 /// Terminates the session with `remote`, notifying the remote party.
161 pub fn terminate(&self, remote: SocketAddr, reason: DisconnectReason) -> bool {
162 self.commands
163 .send(Command::Terminate { remote, reason })
164 .is_ok()
165 }
166}
167
168/// An actor-style SOE server: a [`SoeMultiplexer`] driven on its own Tokio task,
169/// reachable from any task via a cloneable [`SoeHandle`].
170///
171/// This is the recommended shape for a game server. The driver task owns the UDP
172/// socket and all protocol state (sequence numbers, ciphers, reassembly), which is
173/// inherently single-owner. Application code interacts with it asynchronously:
174///
175/// * Obtain a cloneable [`SoeHandle`] with [`handle`](TokioSoeServer::handle) and
176/// share it with per-client game-logic tasks to send data or manage sessions.
177/// * Receive [`SocketEvent`]s with [`recv_event`](TokioSoeServer::recv_event) and
178/// route them (e.g. fan `DataReceived` out to the matching per-client task).
179///
180/// Because each server owns one socket and one multiplexer, scaling UDP I/O across
181/// cores later is a matter of running several servers — one per `SO_REUSEPORT`
182/// socket — and routing by client address; no change to the core is required.
183///
184/// The driver task runs until the [`TokioSoeServer`] **and** every [`SoeHandle`] are
185/// dropped, or until the event receiver is dropped.
186#[derive(Debug)]
187pub struct TokioSoeServer {
188 handle: SoeHandle,
189 events: mpsc::UnboundedReceiver<SocketEvent<SocketAddr>>,
190 local_addr: SocketAddr,
191 driver: JoinHandle<()>,
192}
193
194impl TokioSoeServer {
195 /// Binds a UDP socket to `local` and spawns the driver loop, ticking every
196 /// `tick_period`. A period of 1–10ms is typical.
197 pub async fn bind(
198 local: SocketAddr,
199 config: SocketConfig,
200 tick_period: Duration,
201 ) -> io::Result<Self> {
202 let socket = UdpSocket::bind(local).await?;
203 let local_addr = socket.local_addr()?;
204
205 let (command_tx, command_rx) = mpsc::unbounded_channel();
206 let (event_tx, event_rx) = mpsc::unbounded_channel();
207
208 let driver = tokio::spawn(drive_loop(
209 socket,
210 config,
211 tick_period,
212 command_rx,
213 event_tx,
214 ));
215
216 Ok(Self {
217 handle: SoeHandle {
218 commands: command_tx,
219 },
220 events: event_rx,
221 local_addr,
222 driver,
223 })
224 }
225
226 /// Returns the local address the server is bound to.
227 pub fn local_addr(&self) -> SocketAddr {
228 self.local_addr
229 }
230
231 /// Returns a cloneable handle for sending commands to the server from any task.
232 pub fn handle(&self) -> SoeHandle {
233 self.handle.clone()
234 }
235
236 /// Awaits the next event from the driver loop, or `None` once the loop has
237 /// stopped.
238 pub async fn recv_event(&mut self) -> Option<SocketEvent<SocketAddr>> {
239 self.events.recv().await
240 }
241
242 /// Aborts the driver task, stopping the server.
243 pub fn abort(&self) {
244 self.driver.abort();
245 }
246}
247
248/// The actor driver loop: owns the socket and multiplexer, interleaving socket
249/// reads, periodic ticks, and commands from [`SoeHandle`]s, flushing outgoing
250/// datagrams and forwarding events after each cycle.
251async fn drive_loop(
252 socket: UdpSocket,
253 config: SocketConfig,
254 tick_period: Duration,
255 mut commands: mpsc::UnboundedReceiver<Command>,
256 events: mpsc::UnboundedSender<SocketEvent<SocketAddr>>,
257) {
258 let mut mux = SoeMultiplexer::new(config);
259 let mut tick = interval(tick_period);
260 tick.set_missed_tick_behavior(MissedTickBehavior::Delay);
261 let mut buf = vec![0u8; RECV_BUFFER_SIZE].into_boxed_slice();
262
263 loop {
264 tokio::select! {
265 result = socket.recv_from(&mut buf) => {
266 match result {
267 Ok((len, from)) => {
268 let datagram = Bytes::copy_from_slice(&buf[..len]);
269 mux.process_incoming(from, datagram, Instant::now());
270 }
271 // A transient receive error (e.g. ICMP port-unreachable surfaced
272 // on some platforms) shouldn't kill the server; skip and continue.
273 Err(_) => continue,
274 }
275 }
276 _ = tick.tick() => {
277 mux.run_tick(Instant::now());
278 }
279 command = commands.recv() => {
280 match command {
281 Some(Command::Connect(remote)) => mux.connect(remote, Instant::now()),
282 Some(Command::EnqueueData { remote, data }) => {
283 // Fire-and-forget: if no running session exists for `remote`
284 // the data is dropped (the handle API is intentionally async
285 // and can't synchronously report this).
286 let _ = mux.enqueue_data(&remote, &data);
287 }
288 Some(Command::Terminate { remote, reason }) => {
289 mux.terminate(&remote, reason, Instant::now());
290 }
291 // All handles dropped: nothing more can drive the server.
292 None => break,
293 }
294 }
295 }
296
297 for (addr, datagram) in mux.take_outgoing() {
298 // A send failure for one datagram shouldn't tear down every session.
299 let _ = socket.send_to(&datagram, addr).await;
300 }
301 for event in mux.take_events() {
302 // The event receiver was dropped: no one is listening, so shut down.
303 if events.send(event).is_err() {
304 return;
305 }
306 }
307 }
308}