azalea_client/plugins/
join.rs1use std::sync::Arc;
2
3use azalea_entity::{LocalEntity, indexing::EntityUuidIndex};
4use azalea_protocol::{
5 address::ResolvedAddr,
6 common::client_information::ClientInformation,
7 connect::{Connection, ConnectionError, Proxy},
8 packets::{
9 ClientIntention, ConnectionProtocol, PROTOCOL_VERSION,
10 handshake::ServerboundIntention,
11 login::{ClientboundLoginPacket, ServerboundHello, ServerboundLoginPacket},
12 },
13};
14use azalea_world::Instance;
15use bevy_app::prelude::*;
16use bevy_ecs::prelude::*;
17use bevy_tasks::{IoTaskPool, Task, futures_lite::future};
18use parking_lot::RwLock;
19use tokio::sync::mpsc;
20use tracing::{debug, warn};
21
22use super::events::LocalPlayerEvents;
23use crate::{
24 Account, LocalPlayerBundle,
25 connection::RawConnection,
26 packet::login::{InLoginState, SendLoginPacketEvent},
27};
28
29pub struct JoinPlugin;
31impl Plugin for JoinPlugin {
32 fn build(&self, app: &mut App) {
33 app.add_message::<StartJoinServerEvent>()
34 .add_message::<ConnectionFailedEvent>()
35 .add_systems(
36 Update,
37 (
38 handle_start_join_server_event.before(super::login::poll_auth_task),
39 poll_create_connection_task,
40 )
41 .chain(),
42 );
43 }
44}
45
46#[derive(Debug, Message)]
51pub struct StartJoinServerEvent {
52 pub account: Account,
53 pub connect_opts: ConnectOpts,
54 pub event_sender: Option<mpsc::UnboundedSender<crate::Event>>,
55
56 pub start_join_callback_tx: Option<mpsc::UnboundedSender<Entity>>,
58}
59
60#[derive(Clone, Component, Debug)]
65pub struct ConnectOpts {
66 pub address: ResolvedAddr,
67 pub server_proxy: Option<Proxy>,
69 pub sessionserver_proxy: Option<Proxy>,
78}
79
80#[derive(Message)]
87pub struct ConnectionFailedEvent {
88 pub entity: Entity,
89 pub error: ConnectionError,
90}
91
92pub fn handle_start_join_server_event(
93 mut commands: Commands,
94 mut events: MessageReader<StartJoinServerEvent>,
95 mut entity_uuid_index: ResMut<EntityUuidIndex>,
96 connection_query: Query<&RawConnection>,
97) {
98 for event in events.read() {
99 let uuid = event.account.uuid_or_offline();
100 let entity = if let Some(entity) = entity_uuid_index.get(&uuid) {
101 debug!("Reusing entity {entity:?} for client");
102
103 if let Ok(conn) = connection_query.get(entity)
105 && conn.is_alive()
106 {
107 if let Some(start_join_callback_tx) = &event.start_join_callback_tx {
108 warn!(
109 "Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event but replying with Ok."
110 );
111 let _ = start_join_callback_tx.send(entity);
112 } else {
113 warn!(
114 "Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event."
115 );
116 }
117 return;
118 }
119
120 entity
121 } else {
122 let entity = commands.spawn_empty().id();
123 debug!("Created new entity {entity:?} for client");
124 entity_uuid_index.insert(uuid, entity);
126 entity
127 };
128
129 if let Some(start_join_callback) = &event.start_join_callback_tx {
130 let _ = start_join_callback.send(entity);
131 }
132
133 let mut entity_mut = commands.entity(entity);
134
135 entity_mut.insert((
136 event.account.to_owned(),
138 LocalEntity,
141 ClientInformation::default(),
143 event.connect_opts.clone(),
145 ));
149
150 if let Some(event_sender) = &event.event_sender {
151 entity_mut.insert(LocalPlayerEvents(event_sender.clone()));
154 }
155
156 let task_pool = IoTaskPool::get();
157 let connect_opts = event.connect_opts.clone();
158 let task = task_pool.spawn(async_compat::Compat::new(
159 create_conn_and_send_intention_packet(connect_opts),
160 ));
161
162 entity_mut.insert(CreateConnectionTask(task));
163 }
164}
165
166async fn create_conn_and_send_intention_packet(
167 opts: ConnectOpts,
168) -> Result<LoginConn, ConnectionError> {
169 let mut conn = if let Some(proxy) = opts.server_proxy {
170 Connection::new_with_proxy(&opts.address.socket, proxy).await?
171 } else {
172 Connection::new(&opts.address.socket).await?
173 };
174
175 conn.write(ServerboundIntention {
176 protocol_version: PROTOCOL_VERSION,
177 hostname: opts.address.server.host.clone(),
178 port: opts.address.server.port,
179 intention: ClientIntention::Login,
180 })
181 .await?;
182
183 let conn = conn.login();
184
185 Ok(conn)
186}
187
188type LoginConn = Connection<ClientboundLoginPacket, ServerboundLoginPacket>;
189
190#[derive(Component)]
191pub struct CreateConnectionTask(pub Task<Result<LoginConn, ConnectionError>>);
192
193pub fn poll_create_connection_task(
194 mut commands: Commands,
195 mut query: Query<(Entity, &mut CreateConnectionTask, &Account)>,
196 mut connection_failed_events: MessageWriter<ConnectionFailedEvent>,
197) {
198 for (entity, mut task, account) in query.iter_mut() {
199 if let Some(poll_res) = future::block_on(future::poll_once(&mut task.0)) {
200 let mut entity_mut = commands.entity(entity);
201 entity_mut.remove::<CreateConnectionTask>();
202 let conn = match poll_res {
203 Ok(conn) => conn,
204 Err(error) => {
205 warn!("failed to create connection: {error}");
206 connection_failed_events.write(ConnectionFailedEvent { entity, error });
207 return;
208 }
209 };
210
211 let (read_conn, write_conn) = conn.into_split();
212 let (read_conn, write_conn) = (read_conn.raw, write_conn.raw);
213
214 let instance = Instance::default();
215 let instance_holder = crate::local_player::InstanceHolder::new(
216 entity,
217 Arc::new(RwLock::new(instance)),
220 );
221
222 entity_mut.insert((
223 LocalPlayerBundle {
225 raw_connection: RawConnection::new(
226 read_conn,
227 write_conn,
228 ConnectionProtocol::Login,
229 ),
230 instance_holder,
231 metadata: azalea_entity::metadata::PlayerMetadataBundle::default(),
232 },
233 InLoginState,
234 ));
235
236 commands.trigger(SendLoginPacketEvent::new(
237 entity,
238 ServerboundHello {
239 name: account.username.clone(),
240 profile_id: account.uuid_or_offline(),
241 },
242 ));
243 }
244 }
245}