azalea_client/plugins/
connection.rs

1use std::{
2    fmt::Debug,
3    io::Cursor,
4    mem,
5    sync::{
6        Arc,
7        atomic::{self, AtomicBool},
8    },
9};
10
11use azalea_crypto::Aes128CfbEnc;
12use azalea_protocol::{
13    connect::{RawReadConnection, RawWriteConnection},
14    packets::{
15        ConnectionProtocol, Packet, ProtocolPacket, config::ClientboundConfigPacket,
16        game::ClientboundGamePacket, login::ClientboundLoginPacket,
17    },
18    read::{ReadPacketError, deserialize_packet},
19    write::serialize_packet,
20};
21use bevy_app::prelude::*;
22use bevy_ecs::prelude::*;
23use bevy_tasks::{IoTaskPool, futures_lite::future};
24use thiserror::Error;
25use tokio::{
26    io::AsyncWriteExt,
27    net::tcp::OwnedWriteHalf,
28    sync::mpsc::{self},
29};
30use tracing::{debug, error, info, trace};
31
32use super::packet::{
33    config::ReceiveConfigPacketEvent, game::ReceiveGamePacketEvent, login::ReceiveLoginPacketEvent,
34};
35use crate::packet::{config, game, login};
36
37pub struct ConnectionPlugin;
38impl Plugin for ConnectionPlugin {
39    fn build(&self, app: &mut App) {
40        app.add_systems(PreUpdate, (read_packets, poll_all_writer_tasks).chain());
41    }
42}
43
44pub fn read_packets(ecs: &mut World) {
45    let mut entity_and_conn_query = ecs.query::<(Entity, &mut RawConnection)>();
46    let mut conn_query = ecs.query::<&mut RawConnection>();
47
48    let mut entities_handling_packets = Vec::new();
49    let mut entities_with_injected_packets = Vec::new();
50    for (entity, mut raw_conn) in entity_and_conn_query.iter_mut(ecs) {
51        if !raw_conn.injected_clientbound_packets.is_empty() {
52            entities_with_injected_packets.push((
53                entity,
54                mem::take(&mut raw_conn.injected_clientbound_packets),
55            ));
56        }
57
58        if raw_conn.network.is_none() {
59            // no network connection, don't bother with the normal packet handling
60            continue;
61        }
62
63        entities_handling_packets.push(entity);
64    }
65
66    let mut queued_packet_events = QueuedPacketEvents::default();
67
68    // handle injected packets, see the comment on
69    // RawConnection::injected_clientbound_packets for more info
70    for (entity, raw_packets) in entities_with_injected_packets {
71        for raw_packet in raw_packets {
72            let conn = conn_query.get(ecs, entity).unwrap();
73            let state = conn.state;
74
75            trace!("Received injected packet with bytes: {raw_packet:?}");
76            if let Err(e) =
77                handle_raw_packet(ecs, &raw_packet, entity, state, &mut queued_packet_events)
78            {
79                error!("Error reading injected packet: {e}");
80            }
81        }
82    }
83
84    for entity in entities_handling_packets {
85        loop {
86            let mut conn = conn_query.get_mut(ecs, entity).unwrap();
87            let net_conn = conn.net_conn().unwrap();
88            let read_res = net_conn.reader.try_read();
89            let state = conn.state;
90            match read_res {
91                Ok(Some(raw_packet)) => {
92                    let raw_packet = Arc::<[u8]>::from(raw_packet);
93                    if let Err(e) = handle_raw_packet(
94                        ecs,
95                        &raw_packet,
96                        entity,
97                        state,
98                        &mut queued_packet_events,
99                    ) {
100                        error!("Error reading packet: {e}");
101                    }
102                }
103                Ok(None) => {
104                    // no packets available
105                    break;
106                }
107                Err(err) => {
108                    log_for_error(&err);
109
110                    if matches!(
111                        &*err,
112                        ReadPacketError::IoError { .. } | ReadPacketError::ConnectionClosed
113                    ) {
114                        info!("Server closed connection");
115                        // ungraceful disconnect :(
116                        conn.network = None;
117                        // setting this will make us send a DisconnectEvent
118                        conn.is_alive = false;
119                    }
120
121                    break;
122                }
123            }
124        }
125    }
126
127    queued_packet_events.send_events(ecs);
128}
129
130fn poll_all_writer_tasks(mut conn_query: Query<&mut RawConnection>) {
131    for mut conn in conn_query.iter_mut() {
132        if let Some(net_conn) = &mut conn.network {
133            // this needs to be done at some point every update to make sure packets are
134            // actually sent to the network
135
136            if net_conn.poll_writer().is_some() {
137                // means the writer task ended
138                conn.network = None;
139                conn.is_alive = false;
140            }
141        }
142    }
143}
144
145#[derive(Default)]
146pub struct QueuedPacketEvents {
147    login: Vec<ReceiveLoginPacketEvent>,
148    config: Vec<ReceiveConfigPacketEvent>,
149    game: Vec<ReceiveGamePacketEvent>,
150}
151impl QueuedPacketEvents {
152    fn send_events(&mut self, ecs: &mut World) {
153        ecs.send_event_batch(self.login.drain(..));
154        ecs.send_event_batch(self.config.drain(..));
155        ecs.send_event_batch(self.game.drain(..));
156    }
157}
158
159fn log_for_error(error: &ReadPacketError) {
160    if !matches!(*error, ReadPacketError::ConnectionClosed) {
161        error!("Error reading packet from Client: {error:?}");
162    }
163}
164
165/// The client's connection to the server.
166#[derive(Component)]
167pub struct RawConnection {
168    /// The network connection to the server.
169    ///
170    /// This isn't guaranteed to be present, for example during the main packet
171    /// handlers or at all times during tests.
172    ///
173    /// You shouldn't rely on this. Instead, use the events for sending packets
174    /// like [`SendPacketEvent`](crate::packet::game::SendPacketEvent) /
175    /// [`SendConfigPacketEvent`](crate::packet::config::SendConfigPacketEvent)
176    /// / [`SendLoginPacketEvent`](crate::packet::login::SendLoginPacketEvent).
177    ///
178    /// To check if we haven't disconnected from the server, use
179    /// [`Self::is_alive`].
180    pub(crate) network: Option<NetworkConnection>,
181    pub state: ConnectionProtocol,
182    pub(crate) is_alive: bool,
183
184    /// This exists for internal testing purposes and probably shouldn't be used
185    /// for normal bots. It's basically a way to make our client think it
186    /// received a packet from the server without needing to interact with the
187    /// network.
188    pub injected_clientbound_packets: Vec<Box<[u8]>>,
189}
190impl RawConnection {
191    pub fn new(
192        reader: RawReadConnection,
193        writer: RawWriteConnection,
194        state: ConnectionProtocol,
195    ) -> Self {
196        let task_pool = IoTaskPool::get();
197
198        let (network_packet_writer_tx, network_packet_writer_rx) =
199            mpsc::unbounded_channel::<Box<[u8]>>();
200
201        let writer_task =
202            task_pool.spawn(write_task(network_packet_writer_rx, writer.write_stream));
203
204        let mut conn = Self::new_networkless(state);
205        conn.network = Some(NetworkConnection {
206            reader,
207            enc_cipher: writer.enc_cipher,
208            network_packet_writer_tx,
209            writer_task,
210        });
211
212        conn
213    }
214
215    pub fn new_networkless(state: ConnectionProtocol) -> Self {
216        Self {
217            network: None,
218            state,
219            is_alive: true,
220            injected_clientbound_packets: Vec::new(),
221        }
222    }
223
224    pub fn is_alive(&self) -> bool {
225        self.is_alive
226    }
227
228    /// Write a packet to the server without emitting any events.
229    ///
230    /// This is called by the handlers for [`SendPacketEvent`],
231    /// [`SendConfigPacketEvent`], and [`SendLoginPacketEvent`].
232    ///
233    /// [`SendPacketEvent`]: crate::packet::game::SendPacketEvent
234    /// [`SendConfigPacketEvent`]: crate::packet::config::SendConfigPacketEvent
235    /// [`SendLoginPacketEvent`]: crate::packet::login::SendLoginPacketEvent
236    pub fn write<P: ProtocolPacket + Debug>(
237        &mut self,
238        packet: impl Packet<P>,
239    ) -> Result<(), WritePacketError> {
240        if let Some(network) = &mut self.network {
241            network.write(packet)?;
242        } else {
243            static WARNED: AtomicBool = AtomicBool::new(false);
244            if !WARNED.swap(true, atomic::Ordering::Relaxed) {
245                debug!(
246                    "tried to write packet to the network but there is no NetworkConnection. if you're trying to send a packet from the handler function, use self.write instead"
247                );
248            }
249        }
250        Ok(())
251    }
252
253    pub fn net_conn(&mut self) -> Option<&mut NetworkConnection> {
254        self.network.as_mut()
255    }
256}
257
258pub fn handle_raw_packet(
259    ecs: &mut World,
260    raw_packet: &[u8],
261    entity: Entity,
262    state: ConnectionProtocol,
263    queued_packet_events: &mut QueuedPacketEvents,
264) -> Result<(), Box<ReadPacketError>> {
265    let stream = &mut Cursor::new(raw_packet);
266    match state {
267        ConnectionProtocol::Handshake => {
268            unreachable!()
269        }
270        ConnectionProtocol::Game => {
271            let packet = Arc::new(deserialize_packet::<ClientboundGamePacket>(stream)?);
272            trace!("Packet: {packet:?}");
273            game::process_packet(ecs, entity, packet.as_ref());
274            queued_packet_events
275                .game
276                .push(ReceiveGamePacketEvent { entity, packet });
277        }
278        ConnectionProtocol::Status => {
279            unreachable!()
280        }
281        ConnectionProtocol::Login => {
282            let packet = Arc::new(deserialize_packet::<ClientboundLoginPacket>(stream)?);
283            trace!("Packet: {packet:?}");
284            login::process_packet(ecs, entity, &packet);
285            queued_packet_events
286                .login
287                .push(ReceiveLoginPacketEvent { entity, packet });
288        }
289        ConnectionProtocol::Configuration => {
290            let packet = Arc::new(deserialize_packet::<ClientboundConfigPacket>(stream)?);
291            trace!("Packet: {packet:?}");
292            config::process_packet(ecs, entity, &packet);
293            queued_packet_events
294                .config
295                .push(ReceiveConfigPacketEvent { entity, packet });
296        }
297    };
298
299    Ok(())
300}
301
302pub struct NetworkConnection {
303    reader: RawReadConnection,
304    // compression threshold is in the RawReadConnection
305    pub enc_cipher: Option<Aes128CfbEnc>,
306
307    pub writer_task: bevy_tasks::Task<()>,
308    /// A queue of raw TCP packets to send. These will not be modified further,
309    /// they should already be serialized and encrypted and everything before
310    /// being added here.
311    network_packet_writer_tx: mpsc::UnboundedSender<Box<[u8]>>,
312}
313impl NetworkConnection {
314    pub fn write<P: ProtocolPacket + Debug>(
315        &mut self,
316        packet: impl Packet<P>,
317    ) -> Result<(), WritePacketError> {
318        let packet = packet.into_variant();
319        let raw_packet = serialize_packet(&packet)?;
320        self.write_raw(&raw_packet)?;
321
322        Ok(())
323    }
324
325    pub fn write_raw(&mut self, raw_packet: &[u8]) -> Result<(), WritePacketError> {
326        let network_packet = azalea_protocol::write::encode_to_network_packet(
327            raw_packet,
328            self.reader.compression_threshold,
329            &mut self.enc_cipher,
330        );
331        self.network_packet_writer_tx
332            .send(network_packet.into_boxed_slice())?;
333        Ok(())
334    }
335
336    /// Makes sure packets get sent and returns Some(()) if the connection has
337    /// closed.
338    pub fn poll_writer(&mut self) -> Option<()> {
339        let poll_once_res = future::poll_once(&mut self.writer_task);
340        future::block_on(poll_once_res)
341    }
342
343    pub fn set_compression_threshold(&mut self, threshold: Option<u32>) {
344        trace!("Set compression threshold to {threshold:?}");
345        self.reader.compression_threshold = threshold;
346    }
347    /// Set the encryption key that is used to encrypt and decrypt packets. It's
348    /// the same for both reading and writing.
349    pub fn set_encryption_key(&mut self, key: [u8; 16]) {
350        trace!("Enabled protocol encryption");
351        let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
352        self.reader.dec_cipher = Some(dec_cipher);
353        self.enc_cipher = Some(enc_cipher);
354    }
355}
356
357async fn write_task(
358    mut network_packet_writer_rx: mpsc::UnboundedReceiver<Box<[u8]>>,
359    mut write_half: OwnedWriteHalf,
360) {
361    while let Some(network_packet) = network_packet_writer_rx.recv().await {
362        if let Err(e) = write_half.write_all(&network_packet).await {
363            debug!("Error writing packet to server: {e}");
364            break;
365        };
366    }
367
368    trace!("write task is done");
369}
370
371#[derive(Error, Debug)]
372pub enum WritePacketError {
373    #[error("Wrong protocol state: expected {expected:?}, got {got:?}")]
374    WrongState {
375        expected: ConnectionProtocol,
376        got: ConnectionProtocol,
377    },
378    #[error(transparent)]
379    Encoding(#[from] azalea_protocol::write::PacketEncodeError),
380    #[error(transparent)]
381    SendError {
382        #[from]
383        #[backtrace]
384        source: mpsc::error::SendError<Box<[u8]>>,
385    },
386}