use crate::archetypes::ChecksumWorld;
#[cfg(all(feature = "client", feature = "replication"))]
use crate::late_join::CatchUpManager;
use crate::plugin::DeterministicReplicationPlugin;
use alloc::collections::BTreeMap;
#[cfg(feature = "server")]
use bevy_app::FixedLast;
use bevy_app::{App, Plugin, PostUpdate};
use bevy_ecs::prelude::*;
use core::hash::Hasher;
#[cfg(feature = "client")]
use lightyear_connection::client::Client;
#[cfg(feature = "server")]
use lightyear_connection::client::Connected;
use lightyear_connection::direction::NetworkDirection;
#[cfg(feature = "server")]
use lightyear_connection::server::Started;
#[cfg(feature = "server")]
use lightyear_core::id::RemoteId;
use lightyear_core::prelude::LocalTimeline;
use lightyear_core::tick::Tick;
#[cfg(feature = "client")]
use lightyear_inputs::InputChannel;
#[cfg(feature = "client")]
use lightyear_inputs::client::InputSystems;
#[cfg(feature = "server")]
use lightyear_link::server::{LinkOf, Server};
#[cfg(feature = "client")]
use lightyear_messages::plugin::MessageSystems;
use lightyear_messages::prelude::AppMessageExt;
#[cfg(feature = "client")]
use lightyear_messages::prelude::MessageSender;
#[cfg(feature = "server")]
use lightyear_messages::receive::MessageReceiver;
#[cfg(feature = "client")]
use lightyear_prediction::manager::{LastConfirmedInput, StateRollbackMetadata};
#[cfg(feature = "client")]
use lightyear_sync::prelude::{InputTimeline, IsSynced};
use serde::{Deserialize, Serialize};
#[cfg(feature = "server")]
use tracing::error;
use tracing::{debug, trace};
/// Per-tick record of computed checksums, stored as a component.
///
/// In this file it is registered as a required component of `Server`, filled by
/// `ChecksumReceivePlugin::compute_and_store_checksum`, consulted when a
/// `ChecksumMessage` arrives, and pruned by `ChecksumReceivePlugin::clean_history`.
#[derive(Component, Debug, Default)]
pub struct ChecksumHistory {
// Checksum stored for each tick, keyed by that tick.
history: BTreeMap<Tick, u64>,
}
/// Plugin (compiled only with the `client` feature) whose `Plugin` impl registers
/// `ChecksumMessage` as a client-to-server message and schedules
/// `ChecksumSendPlugin::compute_and_send_checksum` in `PostUpdate`.
#[cfg(feature = "client")]
pub struct ChecksumSendPlugin;
#[cfg(feature = "client")]
impl ChecksumSendPlugin {
    /// Computes a checksum for the tick of the client's `LastConfirmedInput` and sends
    /// it as a [`ChecksumMessage`] over `InputChannel`.
    ///
    /// Nothing is computed or sent when any of the following holds:
    /// - the last confirmed input tick is ahead of the local timeline's current tick;
    /// - (with the `replication` feature) a `CatchUpManager` on the client reports
    ///   `suppresses_checksums()`;
    /// - `StateRollbackMetadata::forced_rollback_tick()` is set.
    ///
    /// The checksum is the XOR of one SeaHash digest per (entity, component) pair
    /// yielded by `ChecksumWorld`. XOR is commutative, so the result does not depend
    /// on iteration order; note that two identical digests cancel each other out.
    ///
    /// # Panics
    ///
    /// Panics if a component listed in a checksum archetype has no entry in
    /// `hash_fns`, or if that entry has no pop-until-tick hash function.
    fn compute_and_send_checksum(
        mut world: ChecksumWorld<'_, '_, true>,
        local_timeline: Res<LocalTimeline>,
        client: Single<
            (&LastConfirmedInput, &mut MessageSender<ChecksumMessage>),
            (With<Client>, With<IsSynced<InputTimeline>>),
        >,
        #[cfg(feature = "replication")] catchup_manager: Option<
            Single<&CatchUpManager, With<Client>>,
        >,
        state_metadata: Res<StateRollbackMetadata>,
    ) {
        let current_tick = local_timeline.tick();
        let (last_confirmed_input, mut sender) = client.into_inner();
        let tick = last_confirmed_input.tick.get();

        // A confirmed tick ahead of the local timeline cannot be checksummed yet.
        if tick > current_tick {
            return;
        }
        #[cfg(feature = "replication")]
        if catchup_manager.is_some_and(|manager| manager.suppresses_checksums()) {
            return;
        }
        if state_metadata.forced_rollback_tick().is_some() {
            return;
        }

        let mut checksum = 0u64;
        world.update_archetypes();

        // SAFETY: NOTE(review) — `iter_archetypes` is an unsafe API whose contract is
        // defined on `ChecksumWorld`, outside this file; confirm its requirements hold
        // for this system's declared access.
        let archetypes = unsafe { world.iter_archetypes() };
        for (archetype, checksum_archetype) in archetypes {
            for entity in archetype.entities().iter() {
                for (component_id, storage_type) in checksum_archetype.components.iter() {
                    trace!(
                        "Adding component {:?} from entity {:?} to checksum for tick {:?}",
                        component_id,
                        entity.id(),
                        tick
                    );
                    // SAFETY: NOTE(review) — the entity, table id, storage type and
                    // component id all come from the archetype being iterated; the
                    // remaining obligations of `get_component_unchecked_mut` are
                    // defined in `lightyear_utils` and should be confirmed there.
                    let history_ptr = unsafe {
                        lightyear_utils::ecs::get_component_unchecked_mut(
                            world.world,
                            entity,
                            archetype.table_id(),
                            *storage_type,
                            *component_id,
                        )
                    };
                    let (hash_fn, pop_until_tick_and_hash_fn) =
                        world.state.hash_fns.get(component_id).expect(
                            "Component in checksum archetype must have a hash function registered",
                        );
                    // State the invariant instead of a bare `unwrap()` so a missing
                    // registration produces an actionable panic message.
                    let pop_until_tick_and_hash = pop_until_tick_and_hash_fn.expect(
                        "Component in checksum archetype must have a pop-until-tick hash function registered",
                    );
                    let mut hasher = seahash::SeaHasher::default();
                    pop_until_tick_and_hash(history_ptr, tick, &mut hasher, hash_fn.inner);
                    checksum ^= hasher.finish();
                }
            }
        }

        debug!(
            ?current_tick,
            "Computed checksum for LastConfirmedInput tick {:?}: {:016x}", tick, checksum
        );
        sender.send::<InputChannel>(ChecksumMessage { tick, checksum });
    }
}
/// Message carrying a checksum together with the tick it was computed for.
///
/// Both plugins in this file register it with `NetworkDirection::ClientToServer`.
/// The field order is left untouched so the serialized layout does not change.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct ChecksumMessage {
    /// Tick the checksum refers to.
    pub tick: Tick,
    /// XOR-combined hash value computed by the sender for `tick`.
    pub checksum: u64,
}
#[cfg(feature = "client")]
impl Plugin for ChecksumSendPlugin {
    /// Adds `DeterministicReplicationPlugin` if it is missing, makes
    /// `LastConfirmedInput` a required component of `InputTimeline`, and registers
    /// [`ChecksumMessage`] as a client-to-server message unless already registered.
    fn build(&self, app: &mut App) {
        let replication_plugin_present = app.is_plugin_added::<DeterministicReplicationPlugin>();
        if !replication_plugin_present {
            app.add_plugins(DeterministicReplicationPlugin);
        }

        app.register_required_components::<InputTimeline, LastConfirmedInput>();

        let message_registered = app.is_message_registered::<ChecksumMessage>();
        if !message_registered {
            app.register_message::<ChecksumMessage>()
                .add_direction(NetworkDirection::ClientToServer);
        }
    }

    /// Schedules the checksum system in `PostUpdate`, after
    /// `InputSystems::UpdateRemoteInputTicks` and before `MessageSystems::Send`.
    fn finish(&self, app: &mut App) {
        let send_checksum = Self::compute_and_send_checksum
            .after(InputSystems::UpdateRemoteInputTicks)
            .before(MessageSystems::Send);
        app.add_systems(PostUpdate, send_checksum);
    }
}
/// Plugin (compiled only with the `server` feature) whose `Plugin` impl makes
/// `ChecksumHistory` a required component of `Server`, registers `ChecksumMessage`,
/// and schedules the systems that store, compare and prune checksums.
#[cfg(feature = "server")]
pub struct ChecksumReceivePlugin;
#[cfg(feature = "server")]
impl ChecksumReceivePlugin {
    /// Computes the checksum for the current tick of the local timeline and stores it
    /// in the started server's [`ChecksumHistory`].
    ///
    /// The checksum is the XOR of one SeaHash digest per (entity, component) pair
    /// yielded by `ChecksumWorld`, so it does not depend on iteration order.
    ///
    /// The server entity is selected with `(With<Server>, With<Started>)`, the same
    /// filter used by the other systems in this impl.
    ///
    /// # Panics
    ///
    /// Panics if a component listed in a checksum archetype has no entry in `hash_fns`.
    fn compute_and_store_checksum(
        mut world: ChecksumWorld<'_, '_, false>,
        timeline: Res<LocalTimeline>,
        server: Single<&mut ChecksumHistory, (With<Server>, With<Started>)>,
    ) {
        let tick = timeline.tick();
        let mut history = server.into_inner();
        let mut checksum = 0u64;
        world.update_archetypes();

        // SAFETY: NOTE(review) — `iter_archetypes` is an unsafe API whose contract is
        // defined on `ChecksumWorld`, outside this file; confirm its requirements hold
        // for this system's declared access.
        let archetypes = unsafe { world.iter_archetypes() };
        for (archetype, checksum_archetype) in archetypes {
            for entity in archetype.entities().iter() {
                for (component_id, storage_type) in checksum_archetype.components.iter() {
                    trace!(
                        "Adding component {:?} from entity {:?} to checksum for tick {:?}",
                        component_id,
                        entity.id(),
                        tick
                    );
                    // SAFETY: NOTE(review) — the entity, table id, storage type and
                    // component id all come from the archetype being iterated; the
                    // remaining obligations of `get_component_unchecked` are defined
                    // in `lightyear_utils` and should be confirmed there.
                    let component_ptr = unsafe {
                        lightyear_utils::ecs::get_component_unchecked(
                            world.world,
                            entity,
                            archetype.table_id(),
                            *storage_type,
                            *component_id,
                        )
                    };
                    let (hash_fn, _) = world.state.hash_fns.get(component_id).expect(
                        "Component in checksum archetype must have a hash function registered",
                    );
                    let mut hasher = seahash::SeaHasher::default();
                    hash_fn.hash_component(component_ptr, &mut hasher);
                    checksum ^= hasher.finish();
                }
            }
        }

        debug!("Computed checksum for tick {:?}: {:016x}", tick, checksum);
        history.history.insert(tick, checksum);
    }

    /// Compares every received [`ChecksumMessage`] with the checksum stored for the
    /// same tick on the server that the receiving link belongs to.
    ///
    /// - Equal checksums are logged at debug level.
    /// - Any difference is logged as an error. This includes a client checksum of
    ///   `0`: an XOR checksum can legitimately be zero, so it is not exempted.
    /// - A message whose tick has no stored checksum is logged at debug level and
    ///   otherwise ignored.
    fn receive_checksum_message(
        mut messages: Query<
            (&mut MessageReceiver<ChecksumMessage>, &LinkOf, &RemoteId),
            With<Connected>,
        >,
        server: Query<&ChecksumHistory, (With<Server>, With<Started>)>,
    ) {
        for (mut receiver, link_of, remote_id) in messages.iter_mut() {
            // Without a matching started server there is nothing to compare against;
            // `receive()` is deliberately not called in that case.
            let Ok(history) = server.get(link_of.server) else {
                continue;
            };
            for message in receiver.receive() {
                match history.history.get(&message.tick) {
                    None => {
                        debug!(
                            "No stored checksum for tick {:?} from client {:?}; skipping comparison",
                            message.tick, remote_id
                        );
                    }
                    Some(&expected) if expected == message.checksum => {
                        debug!(
                            "Checksum match from client {:?} at tick {:?}: {:016x}",
                            remote_id, message.tick, message.checksum
                        );
                    }
                    Some(&expected) => {
                        error!(
                            "Checksum mismatch from client {:?} at tick {:?}: expected {:016x}, got {:016x}",
                            remote_id, message.tick, expected, message.checksum
                        );
                    }
                }
            }
        }
    }

    /// Drops stored checksums whose tick is older than the current tick minus 30.
    fn clean_history(
        timeline: Res<LocalTimeline>,
        history: Single<&mut ChecksumHistory, (With<Server>, With<Started>)>,
    ) {
        let tick = timeline.tick();
        // Computed once rather than for every entry visited by `retain`.
        let oldest_kept: Tick = tick - 30;
        let mut history = history.into_inner();
        history.history.retain(|t, _| *t >= oldest_kept);
    }
}
#[cfg(feature = "server")]
impl Plugin for ChecksumReceivePlugin {
    /// Adds `DeterministicReplicationPlugin` if it is missing, makes
    /// [`ChecksumHistory`] a required component of `Server`, registers
    /// [`ChecksumMessage`] as a client-to-server message unless already registered,
    /// and schedules the comparison and pruning systems in `PostUpdate`.
    fn build(&self, app: &mut App) {
        if !app.is_plugin_added::<DeterministicReplicationPlugin>() {
            app.add_plugins(DeterministicReplicationPlugin);
        }
        app.register_required_components::<Server, ChecksumHistory>();
        if !app.is_message_registered::<ChecksumMessage>() {
            app.register_message::<ChecksumMessage>()
                .add_direction(NetworkDirection::ClientToServer);
        }
        // `clean_history` writes `ChecksumHistory` and `receive_checksum_message`
        // reads it, so leaving them unordered makes their relative order ambiguous.
        // Chain them so incoming checksums are always compared before pruning.
        app.add_systems(
            PostUpdate,
            (Self::receive_checksum_message, Self::clean_history).chain(),
        );
    }

    /// Schedules the per-tick checksum computation in `FixedLast`.
    fn finish(&self, app: &mut App) {
        app.add_systems(FixedLast, Self::compute_and_store_checksum);
    }
}