1use std::{
2 fmt::Debug,
3 io::Cursor,
4 mem,
5 sync::{
6 Arc,
7 atomic::{self, AtomicBool},
8 },
9};
10
11use azalea_crypto::Aes128CfbEnc;
12use azalea_protocol::{
13 connect::{RawReadConnection, RawWriteConnection},
14 packets::{
15 ConnectionProtocol, Packet, ProtocolPacket, config::ClientboundConfigPacket,
16 game::ClientboundGamePacket, login::ClientboundLoginPacket,
17 },
18 read::{ReadPacketError, deserialize_packet},
19 write::serialize_packet,
20};
21use bevy_app::prelude::*;
22use bevy_ecs::prelude::*;
23use bevy_tasks::{IoTaskPool, futures_lite::future};
24use thiserror::Error;
25use tokio::{
26 io::AsyncWriteExt,
27 net::tcp::OwnedWriteHalf,
28 sync::mpsc::{self},
29};
30use tracing::{debug, error, info, trace};
31
32use super::packet::{
33 config::ReceiveConfigPacketEvent, game::ReceiveGamePacketEvent, login::ReceiveLoginPacketEvent,
34};
35use crate::packet::{config, game, login};
36
37pub struct ConnectionPlugin;
38impl Plugin for ConnectionPlugin {
39 fn build(&self, app: &mut App) {
40 app.add_systems(PreUpdate, (read_packets, poll_all_writer_tasks).chain());
41 }
42}
43
44pub fn read_packets(ecs: &mut World) {
45 let mut entity_and_conn_query = ecs.query::<(Entity, &mut RawConnection)>();
46 let mut conn_query = ecs.query::<&mut RawConnection>();
47
48 let mut entities_handling_packets = Vec::new();
49 let mut entities_with_injected_packets = Vec::new();
50 for (entity, mut raw_conn) in entity_and_conn_query.iter_mut(ecs) {
51 if !raw_conn.injected_clientbound_packets.is_empty() {
52 entities_with_injected_packets.push((
53 entity,
54 mem::take(&mut raw_conn.injected_clientbound_packets),
55 ));
56 }
57
58 if raw_conn.network.is_none() {
59 continue;
61 }
62
63 entities_handling_packets.push(entity);
64 }
65
66 let mut queued_packet_events = QueuedPacketEvents::default();
67
68 for (entity, raw_packets) in entities_with_injected_packets {
71 for raw_packet in raw_packets {
72 let conn = conn_query.get(ecs, entity).unwrap();
73 let state = conn.state;
74
75 trace!("Received injected packet with bytes: {raw_packet:?}");
76 if let Err(e) =
77 handle_raw_packet(ecs, &raw_packet, entity, state, &mut queued_packet_events)
78 {
79 error!("Error reading injected packet: {e}");
80 }
81 }
82 }
83
84 for entity in entities_handling_packets {
85 loop {
86 let mut conn = conn_query.get_mut(ecs, entity).unwrap();
87 let net_conn = conn.net_conn().unwrap();
88 let read_res = net_conn.reader.try_read();
89 let state = conn.state;
90 match read_res {
91 Ok(Some(raw_packet)) => {
92 let raw_packet = Arc::<[u8]>::from(raw_packet);
93 if let Err(e) = handle_raw_packet(
94 ecs,
95 &raw_packet,
96 entity,
97 state,
98 &mut queued_packet_events,
99 ) {
100 error!("Error reading packet: {e}");
101 }
102 }
103 Ok(None) => {
104 break;
106 }
107 Err(err) => {
108 log_for_error(&err);
109
110 if matches!(
111 &*err,
112 ReadPacketError::IoError { .. } | ReadPacketError::ConnectionClosed
113 ) {
114 info!("Server closed connection");
115 conn.network = None;
117 conn.is_alive = false;
119 }
120
121 break;
122 }
123 }
124 }
125 }
126
127 queued_packet_events.send_events(ecs);
128}
129
130fn poll_all_writer_tasks(mut conn_query: Query<&mut RawConnection>) {
131 for mut conn in conn_query.iter_mut() {
132 if let Some(net_conn) = &mut conn.network {
133 if net_conn.poll_writer().is_some() {
137 conn.network = None;
139 conn.is_alive = false;
140 }
141 }
142 }
143}
144
145#[derive(Default)]
146pub struct QueuedPacketEvents {
147 login: Vec<ReceiveLoginPacketEvent>,
148 config: Vec<ReceiveConfigPacketEvent>,
149 game: Vec<ReceiveGamePacketEvent>,
150}
151impl QueuedPacketEvents {
152 fn send_events(&mut self, ecs: &mut World) {
153 ecs.send_event_batch(self.login.drain(..));
154 ecs.send_event_batch(self.config.drain(..));
155 ecs.send_event_batch(self.game.drain(..));
156 }
157}
158
159fn log_for_error(error: &ReadPacketError) {
160 if !matches!(*error, ReadPacketError::ConnectionClosed) {
161 error!("Error reading packet from Client: {error:?}");
162 }
163}
164
165#[derive(Component)]
167pub struct RawConnection {
168 pub(crate) network: Option<NetworkConnection>,
181 pub state: ConnectionProtocol,
182 pub(crate) is_alive: bool,
183
184 pub injected_clientbound_packets: Vec<Box<[u8]>>,
189}
190impl RawConnection {
191 pub fn new(
192 reader: RawReadConnection,
193 writer: RawWriteConnection,
194 state: ConnectionProtocol,
195 ) -> Self {
196 let task_pool = IoTaskPool::get();
197
198 let (network_packet_writer_tx, network_packet_writer_rx) =
199 mpsc::unbounded_channel::<Box<[u8]>>();
200
201 let writer_task =
202 task_pool.spawn(write_task(network_packet_writer_rx, writer.write_stream));
203
204 let mut conn = Self::new_networkless(state);
205 conn.network = Some(NetworkConnection {
206 reader,
207 enc_cipher: writer.enc_cipher,
208 network_packet_writer_tx,
209 writer_task,
210 });
211
212 conn
213 }
214
215 pub fn new_networkless(state: ConnectionProtocol) -> Self {
216 Self {
217 network: None,
218 state,
219 is_alive: true,
220 injected_clientbound_packets: Vec::new(),
221 }
222 }
223
224 pub fn is_alive(&self) -> bool {
225 self.is_alive
226 }
227
228 pub fn write<P: ProtocolPacket + Debug>(
237 &mut self,
238 packet: impl Packet<P>,
239 ) -> Result<(), WritePacketError> {
240 if let Some(network) = &mut self.network {
241 network.write(packet)?;
242 } else {
243 static WARNED: AtomicBool = AtomicBool::new(false);
244 if !WARNED.swap(true, atomic::Ordering::Relaxed) {
245 debug!(
246 "tried to write packet to the network but there is no NetworkConnection. if you're trying to send a packet from the handler function, use self.write instead"
247 );
248 }
249 }
250 Ok(())
251 }
252
253 pub fn net_conn(&mut self) -> Option<&mut NetworkConnection> {
254 self.network.as_mut()
255 }
256}
257
258pub fn handle_raw_packet(
259 ecs: &mut World,
260 raw_packet: &[u8],
261 entity: Entity,
262 state: ConnectionProtocol,
263 queued_packet_events: &mut QueuedPacketEvents,
264) -> Result<(), Box<ReadPacketError>> {
265 let stream = &mut Cursor::new(raw_packet);
266 match state {
267 ConnectionProtocol::Handshake => {
268 unreachable!()
269 }
270 ConnectionProtocol::Game => {
271 let packet = Arc::new(deserialize_packet::<ClientboundGamePacket>(stream)?);
272 trace!("Packet: {packet:?}");
273 game::process_packet(ecs, entity, packet.as_ref());
274 queued_packet_events
275 .game
276 .push(ReceiveGamePacketEvent { entity, packet });
277 }
278 ConnectionProtocol::Status => {
279 unreachable!()
280 }
281 ConnectionProtocol::Login => {
282 let packet = Arc::new(deserialize_packet::<ClientboundLoginPacket>(stream)?);
283 trace!("Packet: {packet:?}");
284 login::process_packet(ecs, entity, &packet);
285 queued_packet_events
286 .login
287 .push(ReceiveLoginPacketEvent { entity, packet });
288 }
289 ConnectionProtocol::Configuration => {
290 let packet = Arc::new(deserialize_packet::<ClientboundConfigPacket>(stream)?);
291 trace!("Packet: {packet:?}");
292 config::process_packet(ecs, entity, &packet);
293 queued_packet_events
294 .config
295 .push(ReceiveConfigPacketEvent { entity, packet });
296 }
297 };
298
299 Ok(())
300}
301
302pub struct NetworkConnection {
303 reader: RawReadConnection,
304 pub enc_cipher: Option<Aes128CfbEnc>,
306
307 pub writer_task: bevy_tasks::Task<()>,
308 network_packet_writer_tx: mpsc::UnboundedSender<Box<[u8]>>,
312}
313impl NetworkConnection {
314 pub fn write<P: ProtocolPacket + Debug>(
315 &mut self,
316 packet: impl Packet<P>,
317 ) -> Result<(), WritePacketError> {
318 let packet = packet.into_variant();
319 let raw_packet = serialize_packet(&packet)?;
320 self.write_raw(&raw_packet)?;
321
322 Ok(())
323 }
324
325 pub fn write_raw(&mut self, raw_packet: &[u8]) -> Result<(), WritePacketError> {
326 let network_packet = azalea_protocol::write::encode_to_network_packet(
327 raw_packet,
328 self.reader.compression_threshold,
329 &mut self.enc_cipher,
330 );
331 self.network_packet_writer_tx
332 .send(network_packet.into_boxed_slice())?;
333 Ok(())
334 }
335
336 pub fn poll_writer(&mut self) -> Option<()> {
339 let poll_once_res = future::poll_once(&mut self.writer_task);
340 future::block_on(poll_once_res)
341 }
342
343 pub fn set_compression_threshold(&mut self, threshold: Option<u32>) {
344 trace!("Set compression threshold to {threshold:?}");
345 self.reader.compression_threshold = threshold;
346 }
347 pub fn set_encryption_key(&mut self, key: [u8; 16]) {
350 trace!("Enabled protocol encryption");
351 let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
352 self.reader.dec_cipher = Some(dec_cipher);
353 self.enc_cipher = Some(enc_cipher);
354 }
355}
356
357async fn write_task(
358 mut network_packet_writer_rx: mpsc::UnboundedReceiver<Box<[u8]>>,
359 mut write_half: OwnedWriteHalf,
360) {
361 while let Some(network_packet) = network_packet_writer_rx.recv().await {
362 if let Err(e) = write_half.write_all(&network_packet).await {
363 debug!("Error writing packet to server: {e}");
364 break;
365 };
366 }
367
368 trace!("write task is done");
369}
370
/// Errors that can be returned when writing a packet to a connection.
#[derive(Error, Debug)]
pub enum WritePacketError {
    /// The packet belongs to a different protocol state than the connection
    /// is in. This variant is not constructed anywhere in this file.
    #[error("Wrong protocol state: expected {expected:?}, got {got:?}")]
    WrongState {
        /// The state that was expected.
        expected: ConnectionProtocol,
        /// The state that was actually encountered.
        got: ConnectionProtocol,
    },
    /// The packet could not be encoded; displays as the underlying error.
    #[error(transparent)]
    Encoding(#[from] azalea_protocol::write::PacketEncodeError),
    /// The encoded packet could not be sent over the channel to the writer
    /// task; displays as the underlying error.
    #[error(transparent)]
    SendError {
        /// The channel send error, which carries the packet that was not sent.
        #[from]
        #[backtrace]
        source: mpsc::error::SendError<Box<[u8]>>,
    },
}