azalea-client 0.16.0+mc26.1

A headless Minecraft client.
Documentation
//! Used for Minecraft's chunk batching introduced in 23w31a (1.20.2). It's used
//! for making the server spread out how often it sends us chunk packets
//! depending on our receiving speed.

use std::{
    io::Cursor,
    time::{Duration, Instant},
};

use azalea_core::position::ChunkPos;
use azalea_protocol::packets::game::{
    c_level_chunk_with_light::ClientboundLevelChunkWithLight,
    s_chunk_batch_received::ServerboundChunkBatchReceived,
};
use bevy_app::{App, Plugin, Update};
use bevy_ecs::prelude::*;
use tracing::{error, trace};

use crate::{
    inventory::InventorySystems, local_player::WorldHolder, packet::game::SendGamePacketEvent,
    respawn::perform_respawn,
};

pub struct ChunksPlugin;
impl Plugin for ChunksPlugin {
    fn build(&self, app: &mut App) {
        app.add_systems(
            Update,
            (
                handle_chunk_batch_start_event,
                handle_receive_chunk_event,
                handle_chunk_batch_finished_event,
            )
                .chain()
                .before(InventorySystems)
                .before(perform_respawn),
        )
        .add_message::<ReceiveChunkEvent>()
        .add_message::<ChunkBatchStartEvent>()
        .add_message::<ChunkBatchFinishedEvent>();
    }
}

#[derive(Message)]
pub struct ReceiveChunkEvent {
    pub entity: Entity,
    pub packet: ClientboundLevelChunkWithLight,
}

#[derive(Clone, Component, Debug)]
pub struct ChunkBatchInfo {
    pub start_time: Instant,
    pub aggregated_duration_per_chunk: Duration,
    pub old_samples_weight: u32,
}

#[derive(Message)]
pub struct ChunkBatchStartEvent {
    pub entity: Entity,
}
#[derive(Message)]
pub struct ChunkBatchFinishedEvent {
    pub entity: Entity,
    pub batch_size: u32,
}

pub fn handle_receive_chunk_event(
    mut events: MessageReader<ReceiveChunkEvent>,
    mut query: Query<&WorldHolder>,
) {
    for event in events.read() {
        let pos = ChunkPos::new(event.packet.x, event.packet.z);

        let local_player = query.get_mut(event.entity).unwrap();

        let mut world = local_player.shared.write();
        let mut partial_world = local_player.partial.write();

        // OPTIMIZATION: if we already know about the chunk from the shared world (and
        // not ourselves), then we don't need to parse it again. This is only used when
        // we have a shared world, since we check that the chunk isn't currently owned
        // by this client.
        let shared_chunk = world.chunks.get(&pos);
        let this_client_has_chunk = partial_world.chunks.limited_get(&pos).is_some();

        if !this_client_has_chunk && let Some(shared_chunk) = shared_chunk {
            trace!("Skipping parsing chunk {pos:?} because we already know about it");
            partial_world.chunks.limited_set(&pos, Some(shared_chunk));
            continue;
        }

        let heightmaps = &event.packet.chunk_data.heightmaps;

        if let Err(e) = partial_world.chunks.replace_with_packet_data(
            &pos,
            &mut Cursor::new(&event.packet.chunk_data.data),
            heightmaps,
            &mut world.chunks,
        ) {
            error!(
                "Couldn't set chunk data: {e}. World height: {}",
                world.chunks.height()
            );
        }
    }
}

impl ChunkBatchInfo {
    pub fn batch_finished(&mut self, batch_size: u32) {
        if batch_size == 0 {
            return;
        }
        let batch_duration = self.start_time.elapsed();
        let duration_per_chunk = batch_duration / batch_size;
        let clamped_duration = Duration::clamp(
            duration_per_chunk,
            self.aggregated_duration_per_chunk / 3,
            self.aggregated_duration_per_chunk * 3,
        );
        self.aggregated_duration_per_chunk =
            ((self.aggregated_duration_per_chunk * self.old_samples_weight) + clamped_duration)
                / (self.old_samples_weight + 1);
        self.old_samples_weight = u32::min(49, self.old_samples_weight + 1);
    }

    pub fn desired_chunks_per_tick(&self) -> f32 {
        (7000000. / self.aggregated_duration_per_chunk.as_nanos() as f64) as f32
    }
}

pub fn handle_chunk_batch_start_event(
    mut query: Query<&mut ChunkBatchInfo>,
    mut events: MessageReader<ChunkBatchStartEvent>,
) {
    for event in events.read() {
        if let Ok(mut chunk_batch_info) = query.get_mut(event.entity) {
            chunk_batch_info.start_time = Instant::now();
        }
    }
}

pub fn handle_chunk_batch_finished_event(
    mut query: Query<&mut ChunkBatchInfo>,
    mut events: MessageReader<ChunkBatchFinishedEvent>,
    mut commands: Commands,
) {
    for event in events.read() {
        if let Ok(mut chunk_batch_info) = query.get_mut(event.entity) {
            chunk_batch_info.batch_finished(event.batch_size);
            let desired_chunks_per_tick = chunk_batch_info.desired_chunks_per_tick();
            commands.trigger(SendGamePacketEvent::new(
                event.entity,
                ServerboundChunkBatchReceived {
                    desired_chunks_per_tick,
                },
            ));
        }
    }
}

#[derive(Clone, Debug)]
pub struct ChunkReceiveSpeedAccumulator {
    batch_sizes: Vec<u32>,
    /// as milliseconds
    batch_durations: Vec<u32>,
    index: usize,
    filled_size: usize,
}
impl ChunkReceiveSpeedAccumulator {
    pub fn new(capacity: usize) -> Self {
        Self {
            batch_sizes: vec![0; capacity],
            batch_durations: vec![0; capacity],
            index: 0,
            filled_size: 0,
        }
    }

    pub fn accumulate(&mut self, batch_size: u32, batch_duration: Duration) {
        self.batch_sizes[self.index] = batch_size;
        self.batch_durations[self.index] =
            f32::clamp(batch_duration.as_millis() as f32, 0., 15000.) as u32;
        self.index = (self.index + 1) % self.batch_sizes.len();
        if self.filled_size < self.batch_sizes.len() {
            self.filled_size += 1;
        }
    }

    pub fn get_millis_per_chunk(&self) -> f64 {
        let mut total_batch_size = 0;
        let mut total_batch_duration = 0;
        for i in 0..self.filled_size {
            total_batch_size += self.batch_sizes[i];
            total_batch_duration += self.batch_durations[i];
        }
        if total_batch_size == 0 {
            return 0.;
        }
        total_batch_duration as f64 / total_batch_size as f64
    }
}

impl Default for ChunkBatchInfo {
    fn default() -> Self {
        Self {
            start_time: Instant::now(),
            aggregated_duration_per_chunk: Duration::from_millis(2),
            old_samples_weight: 1,
        }
    }
}