1use {
5 crate::SteamworksClient,
6 aeronet_io::{
7 AeronetIoPlugin, IoSystems, Session,
8 connection::{DisconnectReason, Disconnected, UNKNOWN_DISCONNECT_REASON},
9 packet::RecvPacket,
10 },
11 bevy_app::prelude::*,
12 bevy_ecs::prelude::*,
13 bevy_platform::time::Instant,
14 bytes::Bytes,
15 core::{any::type_name, num::Saturating},
16 derive_more::{Deref, DerefMut, Display, Error},
17 steamworks::{
18 networking_sockets::{NetConnection, NetPollGroup},
19 networking_types::{NetConnectionEnd, NetworkingConnectionState, SendFlags},
20 },
21 tracing::{debug, trace, trace_span, warn},
22};
23
24pub(crate) struct SteamNetSessionPlugin;
25
26impl Plugin for SteamNetSessionPlugin {
27 fn build(&self, app: &mut App) {
28 if !app.is_plugin_added::<AeronetIoPlugin>() {
29 app.add_plugins(AeronetIoPlugin);
30 }
31
32 app.add_systems(
40 PreUpdate,
41 (poll_io, poll_messages)
42 .in_set(IoSystems::Poll)
43 .run_if(resource_exists::<PollGroup>),
44 )
45 .add_systems(PostUpdate, flush.in_set(IoSystems::Flush))
46 .add_observer(init_io);
47 }
48}
49
/// Component holding the Steam networking connection that backs a session
/// entity.
///
/// Adding this component to an entity triggers the `init_io` observer, which
/// assigns the connection to the poll group used for receiving messages.
#[derive(Component)]
pub struct SteamNetIo {
    /// Steam connection handle used to query connection state and to send
    /// messages.
    pub(crate) conn: NetConnection,
    /// MTU value handed to `Session::new` when the session for this
    /// connection is created.
    pub(crate) mtu: usize,
}
63
/// Errors that this IO layer reports as the reason for a session being
/// disconnected.
#[derive(Debug, Display, Error)]
pub enum SessionError {
    /// Generic Steam error.
    // NOTE(review): not constructed in the code visible here; presumably
    // raised by the client/server modules - confirm.
    #[display("steam error")]
    Steam,
    /// The backend was closed.
    // NOTE(review): not constructed in the code visible here - confirm usage.
    #[display("backend closed")]
    BackendClosed,
    /// The connection info or state could not be read, or the connection
    /// reported the `None` state.
    #[display("invalid connection")]
    InvalidConnection,
    /// The connection entered the `ProblemDetectedLocally` state.
    #[display("problem detected locally")]
    ProblemDetectedLocally,
    /// The connection reported an end reason other than an application-level
    /// one; the original reason is carried along.
    #[display("connection ended: {_0:?}")]
    Ended(#[error(ignore)] NetConnectionEnd),
}
84
/// Resource wrapping the Steam poll group that connections managed by this
/// plugin are assigned to.
///
/// It is inserted by `init_io` the first time a `SteamNetIo` is added while no
/// poll group exists yet, and the polling systems only run while it exists.
#[derive(Deref, DerefMut, Resource)]
struct PollGroup(NetPollGroup);
87
88fn init_io(
89 trigger: On<Add, SteamNetIo>,
90 steam: Option<Res<SteamworksClient>>,
91 io: Query<&SteamNetIo>,
92 poll_group: Option<Res<PollGroup>>,
93 mut commands: Commands,
94) {
95 let steam = steam.unwrap_or_else(|| {
96 panic!(
97 "`{}` must be present before creating a Steam IO layer",
98 type_name::<Res<SteamworksClient>>()
99 )
100 });
101
102 let entity = trigger.event_target();
103 let io = io
104 .get(entity)
105 .expect("we are adding this component to this entity");
106
107 if let Some(poll_group) = poll_group {
108 io.conn.set_poll_group(&poll_group);
109 } else {
110 steam
119 .networking_sockets()
120 .init_authentication()
121 .expect("failed to initialize steamworks authentication");
122
123 let poll_group = steam.networking_sockets().create_poll_group();
124 io.conn.set_poll_group(&poll_group);
125 commands.insert_resource(PollGroup(poll_group));
126 }
127}
128
129fn poll_io(
130 mut commands: Commands,
131 sessions: Query<(Entity, &SteamNetIo)>,
132 steam: Res<SteamworksClient>,
133) {
134 let sockets = steam.networking_sockets();
135 for (entity, io) in &sessions {
136 let Ok(info) = sockets.get_connection_info(&io.conn) else {
137 commands.trigger(Disconnected {
138 entity,
139 reason: DisconnectReason::by_error(SessionError::InvalidConnection),
140 });
141 continue;
142 };
143
144 if let Some(end_reason) = info.end_reason() {
145 let reason = match end_reason {
146 NetConnectionEnd::App(_) => DisconnectReason::by_peer(UNKNOWN_DISCONNECT_REASON),
147 reason => DisconnectReason::by_error(SessionError::Ended(reason)),
148 };
149 commands.trigger(Disconnected { entity, reason });
150 continue;
151 }
152
153 match info.state() {
154 Ok(NetworkingConnectionState::FindingRoute | NetworkingConnectionState::Connecting) => {
155 }
156 Ok(NetworkingConnectionState::Connected) => {
157 let mtu = io.mtu;
161 commands
162 .entity(entity)
163 .entry::<Session>()
164 .or_insert_with(move || Session::new(Instant::now(), mtu));
165 }
166 Ok(NetworkingConnectionState::ClosedByPeer) => {
167 commands.trigger(Disconnected {
168 entity,
169 reason: DisconnectReason::by_peer(UNKNOWN_DISCONNECT_REASON),
170 });
171 }
172 Ok(NetworkingConnectionState::None) | Err(_) => {
173 commands.trigger(Disconnected {
174 entity,
175 reason: DisconnectReason::by_error(SessionError::InvalidConnection),
176 });
177 }
178 Ok(NetworkingConnectionState::ProblemDetectedLocally) => {
179 commands.trigger(Disconnected {
180 entity,
181 reason: DisconnectReason::by_error(SessionError::ProblemDetectedLocally),
182 });
183 }
184 }
185 }
186}
187
188fn poll_messages(
189 io: Query<&SteamNetIo>,
190 mut clients: Query<&mut Session>,
191 mut poll_group: ResMut<PollGroup>,
192 mut commands: Commands,
193) {
194 const POLL_BATCH_SIZE: usize = 128;
195
196 let span = trace_span!("poll_messages");
197 let _span = span.enter();
198
199 let mut num_packets = Saturating(0);
200 let mut num_bytes = Saturating(0);
201 loop {
202 let packets = poll_group.receive_messages(POLL_BATCH_SIZE);
203 if packets.is_empty() {
204 break;
205 }
206
207 for packet in packets {
208 num_packets += 1;
209 num_bytes += packet.data().len();
210
211 let user_data = packet.connection_user_data();
212 let Some(entity) = user_data_to_entity(user_data) else {
213 warn!(
214 "Received message on connection with user data {user_data}, which does not \
215 map to a valid entity"
216 );
217 continue;
218 };
219 let io = match io.get(entity) {
220 Ok(io) => io,
221 Err(err) => {
222 warn!(
223 "Received message on {entity}, which does not have `{}`: {err:?}",
224 type_name::<SteamNetIo>()
225 );
226 continue;
227 }
228 };
229
230 let update_session = |session: &mut Session| {
231 let payload = Bytes::from(packet.data().to_vec());
255
256 session.stats.packets_recv += 1;
257 session.stats.bytes_recv += payload.len();
258 session.recv.push(RecvPacket {
259 recv_at: Instant::now(),
260 payload,
261 });
262 };
263
264 if let Ok(mut session) = clients.get_mut(entity) {
265 update_session(&mut session);
266 } else {
267 debug!(
268 "Received message on connection for {entity} before it has been marked as \
269 connected; will manually mark it as connected"
270 );
271
272 let mut session = Session::new(Instant::now(), io.mtu);
273 update_session(&mut session);
274 commands.entity(entity).insert(session);
275 }
276 }
277 }
278
279 if num_packets.0 > 0 {
280 trace!(%num_packets, %num_bytes, "Received packets");
281 }
282}
283
284fn flush(mut sessions: Query<(Entity, &mut Session, &SteamNetIo)>) {
285 for (entity, mut session, io) in &mut sessions {
286 let span = trace_span!("flush", %entity);
287 let _span = span.enter();
288
289 let session = &mut *session;
291 let mut num_packets = Saturating(0);
292 let mut num_bytes = Saturating(0);
293 for packet in session.send.drain(..) {
294 if packet.is_empty() {
295 continue;
301 }
302
303 num_packets += 1;
304 session.stats.packets_sent += 1;
305
306 num_bytes += packet.len();
307 session.stats.bytes_sent += packet.len();
308
309 _ = io
310 .conn
311 .send_message(&packet, SendFlags::UNRELIABLE | SendFlags::NO_NAGLE);
312 }
313
314 if num_packets.0 > 0 {
315 trace!(%num_packets, %num_bytes, "Flushed packets");
316 }
317 }
318}
319
320#[expect(
321 clippy::cast_possible_wrap,
322 reason = "we treat the entity as an opaque identifier"
323)]
324pub(crate) const fn entity_to_user_data(entity: Entity) -> i64 {
325 entity.to_bits() as i64
326}
327
328#[expect(
329 clippy::cast_sign_loss,
330 reason = "we treat this as an opaque identifier"
331)]
332pub(crate) const fn user_data_to_entity(user_data: i64) -> Option<Entity> {
333 Entity::try_from_bits(user_data as u64)
334}