Skip to main content

soe_protocol/
tokio_rt.rs

//! A [Tokio](https://tokio.rs)-based async adapter driving a [`SoeMultiplexer`]
//! over a UDP socket. Enabled by the `tokio` feature.
//!
//! The [`SoeMultiplexer`] itself is I/O- and runtime-agnostic; this module is a
//! thin, optional convenience layer for users who want a ready-made async driver.
//! Each driver owns a [`tokio::net::UdpSocket`] and interleaves socket reads with
//! periodic ticks (for heartbeats, timeouts, and reliable-data resends), flushing
//! outgoing datagrams after each step.
9
10use std::io;
11use std::net::SocketAddr;
12use std::time::Instant;
13
14use bytes::Bytes;
15use tokio::net::UdpSocket;
16use tokio::sync::mpsc;
17use tokio::task::JoinHandle;
18use tokio::time::{Duration, Interval, MissedTickBehavior, interval};
19
20use crate::protocol::DisconnectReason;
21use crate::socket::{SocketConfig, SocketEvent, SoeMultiplexer, SoeSocket};
22
/// Capacity, in bytes, of the scratch buffer a single incoming datagram is read
/// into. SOE UDP lengths default to 512 and rarely exceed that, so 2 KiB leaves
/// comfortable headroom.
const RECV_BUFFER_SIZE: usize = 2 * 1024;
26
27/// An async SOE socket: a [`SoeMultiplexer`] driven over a Tokio UDP socket.
28///
29/// Drive it by repeatedly awaiting [`step`](TokioSoeSocket::step), which performs a
30/// single read-or-tick cycle and returns any [`SocketEvent`]s produced. Sessions are
31/// initiated with [`connect`](TokioSoeSocket::connect) and data is sent with
32/// [`enqueue_data`](TokioSoeSocket::enqueue_data).
33#[derive(Debug)]
34pub struct TokioSoeSocket {
35    mux: SoeMultiplexer<SocketAddr>,
36    socket: UdpSocket,
37    tick: Interval,
38    buf: Box<[u8]>,
39}
40
41impl TokioSoeSocket {
42    /// Binds a UDP socket to `local` and prepares to drive sessions, ticking every
43    /// `tick_period`. A period of 1–10ms is typical.
44    pub async fn bind(
45        local: SocketAddr,
46        config: SocketConfig,
47        tick_period: Duration,
48    ) -> io::Result<Self> {
49        let socket = UdpSocket::bind(local).await?;
50        let mut tick = interval(tick_period);
51        // If we fall behind (e.g. while awaiting a send), don't fire a burst of
52        // catch-up ticks; a single delayed tick is enough.
53        tick.set_missed_tick_behavior(MissedTickBehavior::Delay);
54
55        Ok(Self {
56            mux: SoeMultiplexer::new(config),
57            socket,
58            tick,
59            buf: vec![0u8; RECV_BUFFER_SIZE].into_boxed_slice(),
60        })
61    }
62
63    /// Returns the local address the socket is bound to.
64    pub fn local_addr(&self) -> io::Result<SocketAddr> {
65        self.socket.local_addr()
66    }
67
68    /// Performs a single drive cycle: awaits either an incoming datagram or the next
69    /// tick, runs a session tick, flushes outgoing datagrams, and returns any events.
70    pub async fn step(&mut self) -> io::Result<Vec<SocketEvent<SocketAddr>>> {
71        tokio::select! {
72            result = self.socket.recv_from(&mut self.buf) => {
73                let (len, from) = result?;
74                let datagram = Bytes::copy_from_slice(&self.buf[..len]);
75                self.mux.process_incoming(from, datagram, Instant::now());
76            }
77            _ = self.tick.tick() => {}
78        }
79
80        self.mux.run_tick(Instant::now());
81
82        for (addr, datagram) in self.mux.take_outgoing() {
83            self.socket.send_to(&datagram, addr).await?;
84        }
85
86        Ok(self.mux.take_events())
87    }
88}
89
90impl SoeSocket for TokioSoeSocket {
91    fn local_addr(&self) -> io::Result<SocketAddr> {
92        self.socket.local_addr()
93    }
94
95    fn session_count(&self) -> usize {
96        self.mux.session_count()
97    }
98
99    fn connect(&mut self, remote: SocketAddr) {
100        self.mux.connect(remote, Instant::now());
101    }
102
103    fn enqueue_data(&mut self, remote: &SocketAddr, data: &[u8]) -> bool {
104        self.mux.enqueue_data(remote, data)
105    }
106
107    fn terminate(&mut self, remote: &SocketAddr, reason: DisconnectReason) {
108        self.mux.terminate(remote, reason, Instant::now());
109    }
110}
111
112/// A command sent from a [`SoeHandle`] to the [`TokioSoeServer`] driver loop.
113enum Command {
114    Connect(SocketAddr),
115    EnqueueData {
116        remote: SocketAddr,
117        data: Bytes,
118    },
119    Terminate {
120        remote: SocketAddr,
121        reason: DisconnectReason,
122    },
123}
124
125/// A cloneable handle for interacting with a [`TokioSoeServer`] from any task.
126///
127/// All methods are non-blocking: they post a command to the server's driver loop,
128/// which owns the socket and the [`SoeMultiplexer`]. This lets per-client game-logic
129/// tasks send reliable data and manage sessions without sharing the (necessarily
130/// single-owner) protocol state.
131///
132/// Each method returns `false` if the server's driver loop has stopped (e.g. the
133/// [`TokioSoeServer`] was dropped), in which case the command was not delivered.
134#[derive(Clone, Debug)]
135pub struct SoeHandle {
136    commands: mpsc::UnboundedSender<Command>,
137}
138
139impl SoeHandle {
140    /// Opens a client session to `remote`. The session request is sent by the driver
141    /// loop on its next cycle.
142    pub fn connect(&self, remote: SocketAddr) -> bool {
143        self.commands.send(Command::Connect(remote)).is_ok()
144    }
145
146    /// Enqueues application data to be sent reliably to `remote`.
147    ///
148    /// Returns `false` only if the driver loop has stopped; it does **not** report
149    /// whether a session for `remote` exists (that is determined asynchronously by
150    /// the loop).
151    pub fn enqueue_data(&self, remote: SocketAddr, data: impl Into<Bytes>) -> bool {
152        self.commands
153            .send(Command::EnqueueData {
154                remote,
155                data: data.into(),
156            })
157            .is_ok()
158    }
159
160    /// Terminates the session with `remote`, notifying the remote party.
161    pub fn terminate(&self, remote: SocketAddr, reason: DisconnectReason) -> bool {
162        self.commands
163            .send(Command::Terminate { remote, reason })
164            .is_ok()
165    }
166}
167
168/// An actor-style SOE server: a [`SoeMultiplexer`] driven on its own Tokio task,
169/// reachable from any task via a cloneable [`SoeHandle`].
170///
171/// This is the recommended shape for a game server. The driver task owns the UDP
172/// socket and all protocol state (sequence numbers, ciphers, reassembly), which is
173/// inherently single-owner. Application code interacts with it asynchronously:
174///
175/// * Obtain a cloneable [`SoeHandle`] with [`handle`](TokioSoeServer::handle) and
176///   share it with per-client game-logic tasks to send data or manage sessions.
177/// * Receive [`SocketEvent`]s with [`recv_event`](TokioSoeServer::recv_event) and
178///   route them (e.g. fan `DataReceived` out to the matching per-client task).
179///
180/// Because each server owns one socket and one multiplexer, scaling UDP I/O across
181/// cores later is a matter of running several servers — one per `SO_REUSEPORT`
182/// socket — and routing by client address; no change to the core is required.
183///
184/// The driver task runs until the [`TokioSoeServer`] **and** every [`SoeHandle`] are
185/// dropped, or until the event receiver is dropped.
186#[derive(Debug)]
187pub struct TokioSoeServer {
188    handle: SoeHandle,
189    events: mpsc::UnboundedReceiver<SocketEvent<SocketAddr>>,
190    local_addr: SocketAddr,
191    driver: JoinHandle<()>,
192}
193
194impl TokioSoeServer {
195    /// Binds a UDP socket to `local` and spawns the driver loop, ticking every
196    /// `tick_period`. A period of 1–10ms is typical.
197    pub async fn bind(
198        local: SocketAddr,
199        config: SocketConfig,
200        tick_period: Duration,
201    ) -> io::Result<Self> {
202        let socket = UdpSocket::bind(local).await?;
203        let local_addr = socket.local_addr()?;
204
205        let (command_tx, command_rx) = mpsc::unbounded_channel();
206        let (event_tx, event_rx) = mpsc::unbounded_channel();
207
208        let driver = tokio::spawn(drive_loop(
209            socket,
210            config,
211            tick_period,
212            command_rx,
213            event_tx,
214        ));
215
216        Ok(Self {
217            handle: SoeHandle {
218                commands: command_tx,
219            },
220            events: event_rx,
221            local_addr,
222            driver,
223        })
224    }
225
226    /// Returns the local address the server is bound to.
227    pub fn local_addr(&self) -> SocketAddr {
228        self.local_addr
229    }
230
231    /// Returns a cloneable handle for sending commands to the server from any task.
232    pub fn handle(&self) -> SoeHandle {
233        self.handle.clone()
234    }
235
236    /// Awaits the next event from the driver loop, or `None` once the loop has
237    /// stopped.
238    pub async fn recv_event(&mut self) -> Option<SocketEvent<SocketAddr>> {
239        self.events.recv().await
240    }
241
242    /// Aborts the driver task, stopping the server.
243    pub fn abort(&self) {
244        self.driver.abort();
245    }
246}
247
248/// The actor driver loop: owns the socket and multiplexer, interleaving socket
249/// reads, periodic ticks, and commands from [`SoeHandle`]s, flushing outgoing
250/// datagrams and forwarding events after each cycle.
251async fn drive_loop(
252    socket: UdpSocket,
253    config: SocketConfig,
254    tick_period: Duration,
255    mut commands: mpsc::UnboundedReceiver<Command>,
256    events: mpsc::UnboundedSender<SocketEvent<SocketAddr>>,
257) {
258    let mut mux = SoeMultiplexer::new(config);
259    let mut tick = interval(tick_period);
260    tick.set_missed_tick_behavior(MissedTickBehavior::Delay);
261    let mut buf = vec![0u8; RECV_BUFFER_SIZE].into_boxed_slice();
262
263    loop {
264        tokio::select! {
265            result = socket.recv_from(&mut buf) => {
266                match result {
267                    Ok((len, from)) => {
268                        let datagram = Bytes::copy_from_slice(&buf[..len]);
269                        mux.process_incoming(from, datagram, Instant::now());
270                    }
271                    // A transient receive error (e.g. ICMP port-unreachable surfaced
272                    // on some platforms) shouldn't kill the server; skip and continue.
273                    Err(_) => continue,
274                }
275            }
276            _ = tick.tick() => {
277                mux.run_tick(Instant::now());
278            }
279            command = commands.recv() => {
280                match command {
281                    Some(Command::Connect(remote)) => mux.connect(remote, Instant::now()),
282                    Some(Command::EnqueueData { remote, data }) => {
283                        // Fire-and-forget: if no running session exists for `remote`
284                        // the data is dropped (the handle API is intentionally async
285                        // and can't synchronously report this).
286                        let _ = mux.enqueue_data(&remote, &data);
287                    }
288                    Some(Command::Terminate { remote, reason }) => {
289                        mux.terminate(&remote, reason, Instant::now());
290                    }
291                    // All handles dropped: nothing more can drive the server.
292                    None => break,
293                }
294            }
295        }
296
297        for (addr, datagram) in mux.take_outgoing() {
298            // A send failure for one datagram shouldn't tear down every session.
299            let _ = socket.send_to(&datagram, addr).await;
300        }
301        for event in mux.take_events() {
302            // The event receiver was dropped: no one is listening, so shut down.
303            if events.send(event).is_err() {
304                return;
305            }
306        }
307    }
308}