Skip to main content

aeronet_steam/
session.rs

1//! Implementation for Steam networking sessions, shared between clients and
2//! servers.
3
4use {
5    crate::SteamworksClient,
6    aeronet_io::{
7        AeronetIoPlugin, IoSystems, Session,
8        connection::{DisconnectReason, Disconnected, UNKNOWN_DISCONNECT_REASON},
9        packet::RecvPacket,
10    },
11    bevy_app::prelude::*,
12    bevy_ecs::prelude::*,
13    bevy_platform::time::Instant,
14    bytes::Bytes,
15    core::{any::type_name, num::Saturating},
16    derive_more::{Deref, DerefMut, Display, Error},
17    steamworks::{
18        networking_sockets::{NetConnection, NetPollGroup},
19        networking_types::{NetConnectionEnd, NetworkingConnectionState, SendFlags},
20    },
21    tracing::{debug, trace, trace_span, warn},
22};
23
24pub(crate) struct SteamNetSessionPlugin;
25
26impl Plugin for SteamNetSessionPlugin {
27    fn build(&self, app: &mut App) {
28        if !app.is_plugin_added::<AeronetIoPlugin>() {
29            app.add_plugins(AeronetIoPlugin);
30        }
31
32        // We don't do Steam init like `init_authentication` or poll groups here
33        // because this plugin just adds the *capability* of using Steam IO,
34        // but we don't know for sure at runtime if we'll be using Steam here.
35        // We leave that up to the user: if they decide to spawn an entity with
36        // a `SteamNetIo`, only *then* do we do Steam init (and thus assume that
37        // a Steam client is running on the user's machine).
38
39        app.add_systems(
40            PreUpdate,
41            (poll_io, poll_messages)
42                .in_set(IoSystems::Poll)
43                .run_if(resource_exists::<PollGroup>),
44        )
45        .add_systems(PostUpdate, flush.in_set(IoSystems::Flush))
46        .add_observer(init_io);
47    }
48}
49
50/// Manages a Steam networking session's connection.
51///
52/// This may represent either an outgoing client connection (this session is
53/// connecting to a server), or an incoming client connection (this session is
54/// a child of a server that the user has spawned).
55///
56/// You should not add or remove this component directly - it is managed
57/// entirely by the client and server implementations.
58#[derive(Component)]
59pub struct SteamNetIo {
60    pub(crate) conn: NetConnection,
61    pub(crate) mtu: usize,
62}
63
64/// Error that occurs when polling a session using the [`SteamNetIo`] IO layer.
65#[derive(Debug, Display, Error)]
66pub enum SessionError {
67    /// Internal Steamworks SDK error occurred.
68    #[display("steam error")]
69    Steam,
70    /// Backend task was unexpectedly cancelled.
71    #[display("backend closed")]
72    BackendClosed,
73    /// Connection is no longer valid under the Steamworks API.
74    #[display("invalid connection")]
75    InvalidConnection,
76    /// Problem has been detected locally, i.e. a timeout, network connection
77    /// lost, etc.
78    #[display("problem detected locally")]
79    ProblemDetectedLocally,
80    /// Connection ended.
81    #[display("connection ended: {_0:?}")]
82    Ended(#[error(ignore)] NetConnectionEnd),
83}
84
85#[derive(Deref, DerefMut, Resource)]
86struct PollGroup(NetPollGroup);
87
88fn init_io(
89    trigger: On<Add, SteamNetIo>,
90    steam: Option<Res<SteamworksClient>>,
91    io: Query<&SteamNetIo>,
92    poll_group: Option<Res<PollGroup>>,
93    mut commands: Commands,
94) {
95    let steam = steam.unwrap_or_else(|| {
96        panic!(
97            "`{}` must be present before creating a Steam IO layer",
98            type_name::<Res<SteamworksClient>>()
99        )
100    });
101
102    let entity = trigger.event_target();
103    let io = io
104        .get(entity)
105        .expect("we are adding this component to this entity");
106
107    if let Some(poll_group) = poll_group {
108        io.conn.set_poll_group(&poll_group);
109    } else {
110        // we combine 2 steps into one "global steam init" step:
111        // - init authentication
112        // - create poll group
113        //
114        // this means that if the poll group is removed, we re-init authentication
115        // but this should be fine, I think
116
117        // https://github.com/cBournhonesque/lightyear/issues/243
118        steam
119            .networking_sockets()
120            .init_authentication()
121            .expect("failed to initialize steamworks authentication");
122
123        let poll_group = steam.networking_sockets().create_poll_group();
124        io.conn.set_poll_group(&poll_group);
125        commands.insert_resource(PollGroup(poll_group));
126    }
127}
128
129fn poll_io(
130    mut commands: Commands,
131    sessions: Query<(Entity, &SteamNetIo)>,
132    steam: Res<SteamworksClient>,
133) {
134    let sockets = steam.networking_sockets();
135    for (entity, io) in &sessions {
136        let Ok(info) = sockets.get_connection_info(&io.conn) else {
137            commands.trigger(Disconnected {
138                entity,
139                reason: DisconnectReason::by_error(SessionError::InvalidConnection),
140            });
141            continue;
142        };
143
144        if let Some(end_reason) = info.end_reason() {
145            let reason = match end_reason {
146                NetConnectionEnd::App(_) => DisconnectReason::by_peer(UNKNOWN_DISCONNECT_REASON),
147                reason => DisconnectReason::by_error(SessionError::Ended(reason)),
148            };
149            commands.trigger(Disconnected { entity, reason });
150            continue;
151        }
152
153        match info.state() {
154            Ok(NetworkingConnectionState::FindingRoute | NetworkingConnectionState::Connecting) => {
155            }
156            Ok(NetworkingConnectionState::Connected) => {
157                // make sure we don't replace any existing session
158                // since `Connected` could theoretically be called twice,
159                // and we may make a `Session` manually *before* receiving this event
160                let mtu = io.mtu;
161                commands
162                    .entity(entity)
163                    .entry::<Session>()
164                    .or_insert_with(move || Session::new(Instant::now(), mtu));
165            }
166            Ok(NetworkingConnectionState::ClosedByPeer) => {
167                commands.trigger(Disconnected {
168                    entity,
169                    reason: DisconnectReason::by_peer(UNKNOWN_DISCONNECT_REASON),
170                });
171            }
172            Ok(NetworkingConnectionState::None) | Err(_) => {
173                commands.trigger(Disconnected {
174                    entity,
175                    reason: DisconnectReason::by_error(SessionError::InvalidConnection),
176                });
177            }
178            Ok(NetworkingConnectionState::ProblemDetectedLocally) => {
179                commands.trigger(Disconnected {
180                    entity,
181                    reason: DisconnectReason::by_error(SessionError::ProblemDetectedLocally),
182                });
183            }
184        }
185    }
186}
187
188fn poll_messages(
189    io: Query<&SteamNetIo>,
190    mut clients: Query<&mut Session>,
191    mut poll_group: ResMut<PollGroup>,
192    mut commands: Commands,
193) {
194    const POLL_BATCH_SIZE: usize = 128;
195
196    let span = trace_span!("poll_messages");
197    let _span = span.enter();
198
199    let mut num_packets = Saturating(0);
200    let mut num_bytes = Saturating(0);
201    loop {
202        let packets = poll_group.receive_messages(POLL_BATCH_SIZE);
203        if packets.is_empty() {
204            break;
205        }
206
207        for packet in packets {
208            num_packets += 1;
209            num_bytes += packet.data().len();
210
211            let user_data = packet.connection_user_data();
212            let Some(entity) = user_data_to_entity(user_data) else {
213                warn!(
214                    "Received message on connection with user data {user_data}, which does not \
215                     map to a valid entity"
216                );
217                continue;
218            };
219            let io = match io.get(entity) {
220                Ok(io) => io,
221                Err(err) => {
222                    warn!(
223                        "Received message on {entity}, which does not have `{}`: {err:?}",
224                        type_name::<SteamNetIo>()
225                    );
226                    continue;
227                }
228            };
229
230            let update_session = |session: &mut Session| {
231                // !!! TODO: THIS IS REALLY REALLY BAD !!!
232                //
233                // From `steamworks-rs`'s `packet.data()`:
234                //
235                //     pub fn data(&self) -> &[u8] {
236                //         unsafe {
237                //             std::slice::from_raw_parts(
238                //                 (*self.message).m_pData as _,
239                //                 (*self.message).m_cbSize as usize,
240                //             )
241                //         }
242                //     }
243                //
244                // This code is UNSOUND, because the message is of length 0,
245                // this panics due to debug assertions in `std`
246                // (and in release, will fail silently, causing memory unsafety!)
247                //
248                // `steamworks-rs` maintainer is unresponsive, and there hasn't been an update
249                // in a long time (as of 28 Mar 2025). We should make a `steam-sockets` crate
250                // which provides bindings for only the Steam socket functionality, and irons
251                // out all of the issues of `steamworks-rs`.
252                //
253                // This would also let us fix a bunch of other miscellaneous issues.
254                let payload = Bytes::from(packet.data().to_vec());
255
256                session.stats.packets_recv += 1;
257                session.stats.bytes_recv += payload.len();
258                session.recv.push(RecvPacket {
259                    recv_at: Instant::now(),
260                    payload,
261                });
262            };
263
264            if let Ok(mut session) = clients.get_mut(entity) {
265                update_session(&mut session);
266            } else {
267                debug!(
268                    "Received message on connection for {entity} before it has been marked as \
269                     connected; will manually mark it as connected"
270                );
271
272                let mut session = Session::new(Instant::now(), io.mtu);
273                update_session(&mut session);
274                commands.entity(entity).insert(session);
275            }
276        }
277    }
278
279    if num_packets.0 > 0 {
280        trace!(%num_packets, %num_bytes, "Received packets");
281    }
282}
283
284fn flush(mut sessions: Query<(Entity, &mut Session, &SteamNetIo)>) {
285    for (entity, mut session, io) in &mut sessions {
286        let span = trace_span!("flush", %entity);
287        let _span = span.enter();
288
289        // explicit deref so we can access disjoint fields
290        let session = &mut *session;
291        let mut num_packets = Saturating(0);
292        let mut num_bytes = Saturating(0);
293        for packet in session.send.drain(..) {
294            if packet.is_empty() {
295                // See the big scary safety comment in `poll_messages`
296                // for why we don't allow sending empty messages.
297                //
298                // Note: a malicious client can still screw up our code
299                // by manually sending an empty message!
300                continue;
301            }
302
303            num_packets += 1;
304            session.stats.packets_sent += 1;
305
306            num_bytes += packet.len();
307            session.stats.bytes_sent += packet.len();
308
309            _ = io
310                .conn
311                .send_message(&packet, SendFlags::UNRELIABLE | SendFlags::NO_NAGLE);
312        }
313
314        if num_packets.0 > 0 {
315            trace!(%num_packets, %num_bytes, "Flushed packets");
316        }
317    }
318}
319
320#[expect(
321    clippy::cast_possible_wrap,
322    reason = "we treat the entity as an opaque identifier"
323)]
324pub(crate) const fn entity_to_user_data(entity: Entity) -> i64 {
325    entity.to_bits() as i64
326}
327
328#[expect(
329    clippy::cast_sign_loss,
330    reason = "we treat this as an opaque identifier"
331)]
332pub(crate) const fn user_data_to_entity(user_data: i64) -> Option<Entity> {
333    Entity::try_from_bits(user_data as u64)
334}