use crate::config::{InputConfig, SharedInputConfig};
use crate::input_buffer::InputBuffer;
use crate::input_message::{
ActionStateQueryData, ActionStateSequence, InputMessage, InputSnapshot, InputTarget,
PerTargetData, StateMut, StateRef,
};
use crate::plugin::InputPlugin;
use crate::{HISTORY_DEPTH, InputChannel};
#[cfg(feature = "metrics")]
use alloc::format;
use alloc::{vec, vec::Vec};
use bevy_app::{
App, FixedPostUpdate, FixedPreUpdate, Plugin, PostUpdate, PreUpdate, RunFixedMainLoopSystems,
};
use bevy_ecs::entity::MapEntities;
use bevy_ecs::prelude::*;
use bevy_time::{Real, Time, Timer, TimerMode};
use bevy_utils::prelude::DebugName;
use lightyear_connection::host::HostClient;
use lightyear_core::prelude::*;
use lightyear_core::tick::TickDuration;
#[cfg(feature = "interpolation")]
use lightyear_core::time::TickDelta;
use lightyear_connection::client::Client;
#[cfg(feature = "interpolation")]
use lightyear_interpolation::prelude::*;
use lightyear_messages::plugin::MessageSystems;
#[cfg(feature = "prediction")]
use lightyear_messages::prelude::MessageReceiver;
use lightyear_messages::prelude::MessageSender;
use lightyear_prediction::prelude::*;
use lightyear_replication::prelude::{ControlledBy, PreSpawned};
use lightyear_sync::plugin::SyncSystems;
use lightyear_sync::prelude::client::IsSynced;
use lightyear_sync::prelude::{InputTimeline, InputTimelineConfig};
use lightyear_transport::prelude::ChannelRegistry;
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};
/// Backwards-compatible alias for [`InputSystems`].
///
/// Kept only so code that still names `InputSet` keeps compiling; new code should use
/// [`InputSystems`] directly.
#[deprecated(note = "Use InputSystems instead")]
pub type InputSet = InputSystems;
/// System sets used to order the client-side input systems registered by [`ClientInputPlugin`].
#[derive(SystemSet, Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum InputSystems {
/// Configured in `PreUpdate`. With the `prediction` feature and `rebroadcast_inputs` enabled it
/// holds the system that reads input messages of remote players.
ReceiveInputMessages,
/// Configured in `FixedPreUpdate`, chained before [`InputSystems::BufferClientInputs`].
/// No system in this file is added to it — presumably user code writes inputs here (confirm).
WriteClientInputs,
/// Configured in `FixedPreUpdate`; holds the systems that store the local action state in the
/// input buffer and then read the action state for the current tick back out of it.
BufferClientInputs,
/// Configured in `PostUpdate`, first in the chain; holds the system that builds the outgoing
/// input message.
PrepareInputMessage,
/// Configured in `FixedPostUpdate`; holds the system that restores the delayed action state
/// when input delay is active.
RestoreInputs,
/// Configured in `PostUpdate` after `SyncSystems::Sync`, only with the `prediction` feature and
/// `rebroadcast_inputs` enabled; holds the system that updates the last confirmed input tick.
UpdateRemoteInputTicks,
/// Configured in `PostUpdate` after `SyncSystems::Sync`; holds the system that hands buffered
/// input messages to the message sender.
SendInputMessage,
/// Configured in `PostUpdate` after [`InputSystems::SendInputMessage`]; holds the system that
/// drops old entries from the input buffers.
CleanUp,
}
/// Bevy plugin that registers the client-side input systems for the action-state sequence `S`.
pub struct ClientInputPlugin<S: ActionStateSequence> {
/// Input configuration; inserted as a resource when the plugin is built, and its
/// `rebroadcast_inputs` flag decides whether the remote-input systems are registered.
config: InputConfig<S::Action>,
}
impl<S: ActionStateSequence> ClientInputPlugin<S> {
/// Creates the plugin with an explicit input configuration.
pub fn new(config: InputConfig<S::Action>) -> Self {
Self { config }
}
}
/// Builds the plugin with the default [`InputConfig`].
///
/// Implemented through the standard `Default` trait (rather than a private inherent
/// `default` method) so that `ClientInputPlugin::<S>::default()` is callable by users of the
/// plugin and the type satisfies `T: Default` bounds.
impl<S: ActionStateSequence> Default for ClientInputPlugin<S> {
    fn default() -> Self {
        Self::new(InputConfig::default())
    }
}
// Registers the client-side input resources, system-set ordering, systems and observer for `S`.
impl<S: ActionStateSequence + MapEntities> Plugin for ClientInputPlugin<S> {
// Wires everything into `app`:
// - adds the shared `InputPlugin<S>` if it is missing,
// - inserts the configuration and message-buffer resources,
// - configures the `InputSystems` sets in their schedules,
// - adds the local-input systems, and (with `prediction` + `rebroadcast_inputs`) the
//   remote-input systems,
// - registers the observer that reacts to timeline sync events.
fn build(&self, app: &mut App) {
// Only add the shared input plugin once, even if several client/server plugins need it.
if !app.is_plugin_added::<InputPlugin<S>>() {
app.add_plugins(InputPlugin::<S>::default());
}
app.init_resource::<SharedInputConfig>();
// Expose this plugin's configuration to the systems below as a resource.
app.insert_resource(self.config);
app.init_resource::<MessageBuffer<S>>();
// NOTE(review): these ordering constraints reference `RunFixedMainLoopSystems` sets while
// being configured in `PreUpdate` — confirm they have the intended effect in that schedule.
app.configure_sets(
PreUpdate,
InputSystems::ReceiveInputMessages
.before(RunFixedMainLoopSystems::FixedMainLoop)
.after(RunFixedMainLoopSystems::BeforeFixedMainLoop),
);
// Inputs are written first, then buffered.
app.configure_sets(
FixedPreUpdate,
(
InputSystems::WriteClientInputs,
InputSystems::BufferClientInputs,
)
.chain(),
);
app.configure_sets(FixedPostUpdate, InputSystems::RestoreInputs);
// PostUpdate pipeline: prepare the message, let the timelines sync, send the buffered
// messages, clean up the buffers, and only then let the messaging layer flush.
app.configure_sets(
PostUpdate,
((
InputSystems::PrepareInputMessage,
SyncSystems::Sync,
InputSystems::SendInputMessage,
InputSystems::CleanUp,
MessageSystems::Send,
)
.chain(),),
);
// Remote-player input handling is only registered when inputs are rebroadcast and the
// `prediction` feature is compiled in.
#[cfg(feature = "prediction")]
if self.config.rebroadcast_inputs {
// Remote inputs must be read after messages are received and before the rollback check.
app.configure_sets(
PreUpdate,
InputSystems::ReceiveInputMessages
.after(MessageSystems::Receive)
.before(RollbackSystems::Check),
);
app.add_systems(
PreUpdate,
receive_remote_player_input_messages::<S>
.in_set(InputSystems::ReceiveInputMessages),
);
app.configure_sets(
PostUpdate,
InputSystems::UpdateRemoteInputTicks.after(SyncSystems::Sync),
);
app.add_systems(
PostUpdate,
update_last_confirmed_input::<S>.in_set(InputSystems::UpdateRemoteInputTicks),
);
}
// Buffer the local action state first, then read the state for the current tick back.
app.add_systems(
FixedPreUpdate,
(buffer_action_state::<S>, get_action_state::<S>)
.chain()
.in_set(InputSystems::BufferClientInputs),
);
app.add_systems(
FixedPostUpdate,
(
get_delayed_action_state::<S>.in_set(InputSystems::RestoreInputs),
),
);
// The relative order of these three systems comes from the set chain configured above,
// not from their position in this tuple.
app.add_systems(
PostUpdate,
(
prepare_input_message::<S>.in_set(InputSystems::PrepareInputMessage),
clean_buffers::<S>.in_set(InputSystems::CleanUp),
send_input_messages::<S>.in_set(InputSystems::SendInputMessage),
),
);
// Shift buffered ticks when the input timeline reports a sync event.
app.add_observer(receive_tick_events::<S>);
}
}
fn buffer_action_state<S: ActionStateSequence>(
local_timeline: Res<LocalTimeline>,
input_timeline: Single<
(Entity, &InputTimeline),
(
With<Client>,
With<IsSynced<InputTimeline>>,
Without<Rollback>,
),
>,
mut action_state_query: Query<
(
Entity,
StateRef<S>,
&mut InputBuffer<S::Snapshot, S::Action>,
Option<&ControlledBy>,
),
(With<S::Marker>, Allow<PredictionDisable>),
>,
) {
let (client_entity, input_timeline) = input_timeline.into_inner();
let current_tick = local_timeline.tick();
let tick = current_tick + input_timeline.input_delay() as i32;
for (entity, action_state, mut input_buffer, controlled_by) in action_state_query.iter_mut() {
if controlled_by.is_some_and(|controlled_by| controlled_by.owner != client_entity) {
continue;
}
input_buffer.set(tick, S::to_snapshot(action_state));
trace!(
?entity,
?current_tick,
delayed_tick = ?tick,
input_buffer = %input_buffer.as_ref(),
"set action state in input buffer",
);
trace!(
target: "lightyear_debug::input",
kind = "buffer_action_state",
schedule = "FixedPreUpdate",
sample_point = "FixedPreUpdate",
entity = ?entity,
action = ?DebugName::type_name::<S::Action>(),
local_tick = current_tick.0,
input_tick = tick.0,
buffer_len = input_buffer.len(),
input_buffer = %input_buffer.as_ref(),
"buffered local action state"
);
#[cfg(feature = "metrics")]
{
metrics::gauge!(format!(
"inputs::{}::{}::buffer_size",
core::any::type_name::<S::Action>(),
entity
))
.set(input_buffer.len() as f64);
}
}
}
/// Loads the action state for the current local tick out of each entity's input buffer.
///
/// - Local entities (those with `S::Marker`) are skipped when there is no input delay and we are
///   not in a rollback, so their live action state is left untouched.
/// - If the buffer has a snapshot for the tick, it is applied to the action state.
/// - Otherwise, for non-local entities when `rebroadcast_inputs` is enabled, the current action
///   state is snapshotted, `decay_tick` is applied to it, and the result is both applied and
///   written back into the buffer for this tick.
fn get_action_state<S: ActionStateSequence>(
tick_duration: Res<TickDuration>,
config: Res<InputConfig<S::Action>>,
local_timeline: Res<LocalTimeline>,
sender: Query<
(&InputTimeline, &InputTimelineConfig, Has<Rollback>),
(With<Client>, Without<HostClient>),
>,
mut action_state_query: Query<
(
Entity,
StateMut<S>,
&mut InputBuffer<S::Snapshot, S::Action>,
Has<S::Marker>,
),
Allow<PredictionDisable>,
>,
) {
// Nothing to do unless exactly one non-host client entity matches.
let Ok((input_timeline, input_config, is_rollback)) = sender.single() else {
return;
};
let input_delay = input_timeline.input_delay() as i32;
let tick = local_timeline.tick();
// The configuration can opt out of touching action states during rollbacks.
if is_rollback && config.ignore_rollbacks {
return;
}
for (entity, action_state, mut input_buffer, is_local) in action_state_query.iter_mut() {
// Outside of rollback, a local entity without input delay is not read back from the buffer.
if !is_rollback && is_local && input_delay == 0 {
continue;
}
if let Some(snapshot) = input_buffer.get(tick) {
S::from_snapshot(S::State::into_inner(action_state), snapshot);
trace!(
?entity,
?tick,
?is_local,
?snapshot,
"fetched action state from input buffer: {:?}",
input_buffer
);
trace!(
target: "lightyear_debug::input",
kind = "get_action_state",
schedule = "FixedPreUpdate",
sample_point = "FixedPreUpdate",
entity = ?entity,
action = ?DebugName::type_name::<S::Action>(),
local_tick = tick.0,
input_tick = tick.0,
is_local,
is_rollback,
snapshot = ?snapshot,
buffer_len = input_buffer.len(),
input_buffer = %*input_buffer,
"restored action state from input buffer"
);
} else if !is_local && config.rebroadcast_inputs {
// A missing remote input is only logged as an error in lockstep mode; the decay
// fallback below still runs.
if input_config.is_lockstep() {
error!("We are in lockstep mode but didn't receive an input for tick {tick:?}!");
}
// No remote input for this tick: derive one from the current action state.
let mut snapshot = S::to_snapshot(S::State::as_read_only(&action_state));
snapshot.decay_tick(tick_duration.0);
trace!(
?entity,
?tick,
"Action = {}, For remote input; no input for tick so we decay the ActionState to: {:?}",
DebugName::type_name::<S::Action>(),
snapshot
);
trace!(
target: "lightyear_debug::input",
kind = "decay_missing_remote_action_state",
schedule = "FixedPreUpdate",
sample_point = "FixedPreUpdate",
entity = ?entity,
action = ?DebugName::type_name::<S::Action>(),
local_tick = tick.0,
input_tick = tick.0,
is_rollback,
snapshot = ?snapshot,
buffer_len = input_buffer.len(),
"decayed missing remote action state"
);
// Apply the decayed state and record it in the buffer for this tick.
S::from_snapshot(S::State::into_inner(action_state), &snapshot);
input_buffer.set(tick, snapshot);
}
}
}
/// Restores the action state stored for the delayed input tick when input delay is active.
///
/// Does nothing during a rollback, when the input delay is zero, or when there is not exactly
/// one synced client entity. Entities whose [`ControlledBy`] owner is another client entity are
/// skipped, as are entities whose buffer has no entry at the delayed tick.
fn get_delayed_action_state<S: ActionStateSequence>(
    timeline: Res<LocalTimeline>,
    sender: Query<
        (Entity, &InputTimeline, Has<Rollback>),
        (With<Client>, With<IsSynced<InputTimeline>>),
    >,
    mut action_state_query: Query<
        (
            Entity,
            StateMut<S>,
            &InputBuffer<S::Snapshot, S::Action>,
            Option<&ControlledBy>,
        ),
        (With<S::Marker>, Allow<PredictionDisable>),
    >,
) {
    let Ok((client_entity, input_timeline, is_rollback)) = sender.single() else {
        return;
    };
    if is_rollback {
        return;
    }
    let input_delay_ticks = input_timeline.input_delay() as i32;
    if input_delay_ticks == 0 {
        return;
    }

    let tick = timeline.tick();
    let delayed_tick = tick + input_delay_ticks;

    for (entity, action_state, input_buffer, controlled_by) in &mut action_state_query {
        let owned_by_other_client =
            controlled_by.is_some_and(|controlled_by| controlled_by.owner != client_entity);
        if owned_by_other_client {
            continue;
        }
        let Some(delayed_action_state) = input_buffer.get(delayed_tick) else {
            continue;
        };

        S::from_snapshot(S::State::into_inner(action_state), delayed_action_state);
        trace!(
            ?entity,
            ?delayed_tick,
            "fetched delayed action state from input buffer: {}",
            input_buffer
        );
        trace!(
            target: "lightyear_debug::input",
            kind = "get_delayed_action_state",
            schedule = "RunFixedMainLoop",
            sample_point = "RunFixedMainLoop",
            entity = ?entity,
            action = ?DebugName::type_name::<S::Action>(),
            local_tick = tick.0,
            input_tick = delayed_tick.0,
            snapshot = ?delayed_action_state,
            buffer_len = input_buffer.len(),
            input_buffer = %input_buffer,
            "restored delayed action state"
        );
    }
}
/// Drops input-buffer entries that are older than the retained history window.
///
/// Runs only when exactly one non-host client entity with an [`InputTimeline`] exists. The
/// window size comes from [`input_history_depth`], measured back from the current local tick.
fn clean_buffers<S: ActionStateSequence>(
    timeline: Res<LocalTimeline>,
    sender: Query<(), (With<Client>, With<InputTimeline>, Without<HostClient>)>,
    prediction_manager: Option<Single<&PredictionManager, With<Client>>>,
    mut input_buffer_query: Query<
        &mut InputBuffer<S::Snapshot, S::Action>,
        Allow<PredictionDisable>,
    >,
) {
    let Ok(_) = sender.single() else {
        return;
    };

    let history_depth = input_history_depth(prediction_manager.as_deref().copied());
    let old_tick = timeline.tick() - history_depth;

    input_buffer_query.iter_mut().for_each(|mut input_buffer| {
        input_buffer.pop(old_tick);
    });
}
/// Returns how many ticks of input history must be kept in the input buffers.
///
/// With a prediction manager this is `max_rollback_ticks + 1`, but never less than
/// `HISTORY_DEPTH`; without one it is exactly `HISTORY_DEPTH`.
fn input_history_depth(prediction_manager: Option<&PredictionManager>) -> u32 {
    match prediction_manager {
        Some(manager) => {
            let rollback_window = u32::from(manager.rollback_policy.max_rollback_ticks) + 1;
            rollback_window.max(HISTORY_DEPTH)
        }
        None => HISTORY_DEPTH,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The retained history must span the rollback window (plus one tick) whenever that is
    /// larger than `HISTORY_DEPTH`, and fall back to `HISTORY_DEPTH` otherwise.
    #[test]
    fn input_history_depth_covers_prediction_rollback_window() {
        // Without a prediction manager only the base depth applies.
        assert_eq!(input_history_depth(None), HISTORY_DEPTH);

        // A large rollback window wins over the base depth.
        let rollback_policy = RollbackPolicy {
            max_rollback_ticks: 100,
            ..Default::default()
        };
        let mut manager = PredictionManager {
            rollback_policy,
            ..Default::default()
        };
        assert_eq!(input_history_depth(Some(&manager)), 101);

        // A small rollback window is clamped up to the base depth.
        manager.rollback_policy.max_rollback_ticks = 5;
        assert_eq!(input_history_depth(Some(&manager)), HISTORY_DEPTH);
    }
}
/// Input messages that have been prepared but not yet handed to the message sender.
///
/// Filled by `prepare_input_message`, drained by `send_input_messages`, and adjusted by
/// `receive_tick_events` when the timeline is re-synced.
#[derive(Debug, Resource)]
pub(crate) struct MessageBuffer<S>(Vec<InputMessage<S>>);

impl<S> Default for MessageBuffer<S> {
    /// Starts with no pending messages.
    fn default() -> Self {
        MessageBuffer(Vec::new())
    }
}
/// Builds one [`InputMessage`] from the local input buffers and queues it in [`MessageBuffer`].
///
/// - When `send_interval` is non-zero, a repeating timer driven by real time throttles how often
///   a message is built.
/// - Host clients are skipped, unless the `prediction` feature is enabled and
///   `rebroadcast_inputs` is set.
/// - The message ends at `local tick + input delay` and, per target, carries the states built
///   from the buffer over `num_ticks` ticks, where
///   `num_ticks = (send_interval / tick_duration + 1) * packet_redundancy`.
/// - Entities controlled by another client entity are skipped; prespawned entities with a hash
///   are addressed by that hash instead of by entity.
///
/// # Panics
/// Panics if `send_interval / tick_duration + 1` does not fit in a `u32`, and presumably if the
/// tick duration is zero nanoseconds (integer division) — confirm against `TickDuration`.
fn prepare_input_message<S: ActionStateSequence>(
mut message_buffer: ResMut<MessageBuffer<S>>,
tick_duration: Res<TickDuration>,
timeline: Res<LocalTimeline>,
input_config: Res<InputConfig<S::Action>>,
sender: Single<
(Entity, &InputTimeline, Has<HostClient>),
(
With<Client>,
With<IsSynced<InputTimeline>>,
Without<Rollback>,
),
>,
_channel_registry: Res<ChannelRegistry>,
input_buffer_query: Query<
(
Entity,
&InputBuffer<S::Snapshot, S::Action>,
Option<&PreSpawned>,
Option<&ControlledBy>,
),
(With<S::Marker>, Allow<PredictionDisable>),
>,
real_time: Res<Time<Real>>,
mut send_timer: Local<Option<Timer>>,
) {
// Throttle: the timer is created lazily from the configured interval and ticked with real time.
if !input_config.send_interval.is_zero() {
let timer = send_timer
.get_or_insert_with(|| Timer::new(input_config.send_interval, TimerMode::Repeating));
timer.tick(real_time.delta());
if !timer.is_finished() {
return;
}
}
let (client_entity, input_timeline, is_host_client) = sender.into_inner();
// Without prediction a host client never sends input messages.
#[cfg(not(feature = "prediction"))]
if is_host_client {
return;
}
// With prediction a host client only sends when inputs are rebroadcast.
#[cfg(feature = "prediction")]
if is_host_client && !input_config.rebroadcast_inputs {
return;
}
let current_tick = timeline.tick();
// The message ends at the delayed input tick, matching where inputs were buffered.
let tick = current_tick + input_timeline.input_delay() as i32;
trace!(delayed_tick = ?tick, ?current_tick, "prepare_input_message");
trace!(
target: "lightyear_debug::input",
kind = "prepare_input_message_start",
schedule = "PostUpdate",
sample_point = "PostUpdate",
action = ?DebugName::type_name::<S::Action>(),
local_tick = current_tick.0,
input_tick = tick.0,
is_host_client,
"preparing input message"
);
// Number of ticks covered by one send interval (rounded up by one), multiplied by the
// configured redundancy factor.
let mut num_ticks: u32 = ((input_config.send_interval.as_nanos() / tick_duration.as_nanos())
+ 1)
.try_into()
.unwrap();
num_ticks *= input_config.packet_redundancy as u32;
let mut message = InputMessage::<S>::new(tick);
for (entity, input_buffer, pre_spawned, controlled_by) in input_buffer_query.iter() {
// Only send inputs for entities this client owns (or that have no owner recorded).
if controlled_by.is_some_and(|controlled_by| controlled_by.owner != client_entity) {
continue;
}
trace!(
?tick,
?entity,
"Preparing input message with buffer: {:?}",
input_buffer
);
// Prespawned entities with a hash are addressed by hash, everything else by entity.
let target = if let Some(prespawned) = pre_spawned
&& let Some(hash) = prespawned.hash
{
debug!(?hash, ?entity, "Sending input for prespawned entity");
InputTarget::PreSpawned(hash)
} else {
InputTarget::Entity(entity)
};
// Targets for which no sequence can be built from the buffer are left out of the message.
if let Some(state_sequence) = S::build_from_input_buffer(input_buffer, num_ticks, tick) {
trace!(
target: "lightyear_debug::input",
kind = "prepare_input_message_target",
schedule = "PostUpdate",
sample_point = "PostUpdate",
entity = ?entity,
action = ?DebugName::type_name::<S::Action>(),
local_tick = current_tick.0,
input_tick = tick.0,
num_ticks = num_ticks,
buffer_len = input_buffer.len(),
target = ?target,
states = ?state_sequence,
"added target data to input message"
);
message.inputs.push(PerTargetData {
target,
states: state_sequence,
});
}
}
debug!(
?tick,
?num_ticks,
?is_host_client,
"sending input message for {:?}: {}",
DebugName::type_name::<S::Action>().shortname(),
message
);
trace!(
target: "lightyear_debug::input",
kind = "prepare_input_message_finish",
schedule = "PostUpdate",
sample_point = "PostUpdate",
action = ?DebugName::type_name::<S::Action>(),
local_tick = current_tick.0,
input_tick = tick.0,
end_tick = tick.0,
num_ticks = num_ticks,
num_targets = message.inputs.len(),
is_host_client,
message = ?message,
"prepared input message"
);
// The message is queued even when it carries no targets.
message_buffer.0.push(message);
}
/// Reads input messages of remote players and merges them into the matching input buffers.
///
/// For every target in every received message:
/// - the target is resolved to an entity (directly, or by looking up the prespawn hash);
///   unresolved targets are logged and skipped,
/// - if the entity already has an input buffer, messages whose `end_tick` is not newer than the
///   buffer's `last_remote_tick` are ignored, otherwise the buffer is updated,
/// - if the entity has no input buffer yet, a fresh buffer is filled from the message and
///   inserted together with an action state initialised from the buffer's last entry.
///
/// When at least one target was applied and the client has a `LastConfirmedInput`, its
/// `received_any_messages` flag is set.
#[cfg(feature = "prediction")]
fn receive_remote_player_input_messages<S: ActionStateSequence>(
mut commands: Commands,
tick_duration: Res<TickDuration>,
timeline: Res<LocalTimeline>,
link: Single<
(
&mut MessageReceiver<InputMessage<S>>,
Option<&LastConfirmedInput>,
&PredictionManager,
),
(
With<Client>,
With<IsSynced<InputTimeline>>,
Without<HostClient>,
),
>,
mut predicted_query: Query<
Option<&mut InputBuffer<S::Snapshot, S::Action>>,
(Without<S::Marker>, Allow<PredictionDisable>),
>,
prespawned: Query<(Entity, &PreSpawned)>,
) {
let (mut receiver, last_confirmed_input, prediction_manager) = link.into_inner();
let tick = timeline.tick();
// Becomes true once any target of any message was actually applied to a buffer.
let mut received_relevant_input = false;
receiver.receive().for_each(|message| {
trace!(?message.end_tick, ?message, "received remote input message for action: {:?}", DebugName::type_name::<S::Action>());
trace!(
target: "lightyear_debug::input",
kind = "remote_input_message_recv",
schedule = "PreUpdate",
sample_point = "PreUpdate",
action = ?DebugName::type_name::<S::Action>(),
local_tick = tick.0,
end_tick = message.end_tick.0,
num_targets = message.inputs.len(),
rebroadcast = message.rebroadcast,
message = ?message,
"received remote player input message"
);
for target_data in message.inputs {
// Resolve the message target to a local entity; prespawn hashes are matched by a linear
// scan over all prespawned entities.
let Some(entity) = (match target_data.target {
InputTarget::Entity(entity) => {
Some(entity)
}
InputTarget::PreSpawned(hash) => {
prespawned
.iter()
.find_map(|(e, p)| p.hash.is_some_and(|h| h == hash).then_some(e))
}
}) else {
// Rebroadcast messages for unknown targets are treated as stale and only logged at debug.
if message.rebroadcast {
debug!(
target = ?target_data.target,
end_tick = ?message.end_tick,
"ignored stale remote player input message for unmapped entity"
);
} else {
warn!("Could not find entity in entity_map for remote player input message {:?}", target_data.target);
}
continue;
};
debug!(
?tick, ?message.end_tick,
"received remote client input message for entity: {:?}. Applying to diff buffer.",
entity
);
trace!(
target: "lightyear_debug::input",
kind = "remote_input_target",
schedule = "PreUpdate",
sample_point = "PreUpdate",
entity = ?entity,
action = ?DebugName::type_name::<S::Action>(),
local_tick = tick.0,
end_tick = message.end_tick.0,
target = ?target_data.target,
states = ?target_data.states,
"applying remote input target data"
);
// The query excludes entities with `S::Marker`, so locally controlled entities never match.
let Ok(input_buffer) = predicted_query.get_mut(entity) else {
if message.rebroadcast {
debug!(
?entity,
?target_data.states,
end_tick = ?message.end_tick,
"ignored stale remote player input message for unrecognized entity"
);
} else {
error!(?entity, ?target_data.states, end_tick = ?message.end_tick, "received input message for unrecognized entity");
}
continue
};
trace!(predicted=?entity, end_tick = ?message.end_tick, "update action diff buffer for remote player PREDICTED using input message");
if let Some(mut input_buffer) = input_buffer {
// Skip messages that do not extend past what was already received for this entity.
if input_buffer.last_remote_tick.is_some_and(|t| t >= message.end_tick) {
trace!("Ignoring input message because our current last_remote_tick {:?} is more recent than the remote_end_tick {:?}", input_buffer.last_remote_tick, message.end_tick);
trace!(
target: "lightyear_debug::input",
kind = "remote_input_ignored_stale",
schedule = "PreUpdate",
sample_point = "PreUpdate",
entity = ?entity,
action = ?DebugName::type_name::<S::Action>(),
local_tick = tick.0,
end_tick = message.end_tick.0,
last_remote_tick = ?input_buffer.last_remote_tick,
"ignored stale remote input message"
);
continue
}
received_relevant_input = true;
update_buffer_from_remote_player_message::<S>(
target_data.states,
&mut input_buffer,
tick,
message.end_tick,
entity,
prediction_manager,
*tick_duration
);
} else {
// First input seen for this entity: build the buffer locally and insert it, together with
// an action state, through commands.
let mut input_buffer = InputBuffer::<S::Snapshot, S::Action>::default();
received_relevant_input = true;
update_buffer_from_remote_player_message::<S>(
target_data.states,
&mut input_buffer,
tick,
message.end_tick,
entity,
prediction_manager,
*tick_duration
);
// Start from the base action state and overwrite it with the newest buffered snapshot, if any.
let mut action_state = S::State::base_value();
if let Some(last) = input_buffer.get_last() {
S::from_snapshot(S::State::as_mut(&mut action_state), last);
}
commands.entity(entity).insert((
input_buffer,
action_state,
));
};
}
});
// Record that remote input was received this frame.
if let Some(last_confirmed_input) = last_confirmed_input
&& received_relevant_input
{
last_confirmed_input
.received_any_messages
.store(true, bevy_platform::sync::atomic::Ordering::Relaxed);
}
}
/// Lowers the client's last confirmed input tick based on the remote input buffers.
///
/// In lockstep mode the confirmed tick is lowered towards the current local tick and nothing
/// else happens. Otherwise every remote buffer (entities without `S::Marker`) contributes its
/// `last_remote_tick`, and `received_for_all_clients` ends up `true` only if every such buffer
/// has one.
#[cfg(feature = "prediction")]
fn update_last_confirmed_input<S: ActionStateSequence>(
    timeline: Res<LocalTimeline>,
    last_confirmed_input: Single<
        (&mut LastConfirmedInput, &InputTimelineConfig),
        (With<Client>, With<IsSynced<InputTimeline>>),
    >,
    predicted_query: Query<
        &InputBuffer<S::Snapshot, S::Action>,
        (Without<S::Marker>, Allow<PredictionDisable>),
    >,
) {
    let tick = timeline.tick();
    let (mut last_confirmed_input, input_config) = last_confirmed_input.into_inner();

    if input_config.is_lockstep() {
        last_confirmed_input.tick.set_if_lower(tick);
        return;
    }

    // Assume every remote client has reported until a buffer proves otherwise.
    last_confirmed_input.received_for_all_clients = true;
    for buffer in predicted_query.iter() {
        match buffer.last_remote_tick {
            Some(end_tick) => {
                last_confirmed_input.tick.set_if_lower(end_tick);
            }
            None => {
                last_confirmed_input.received_for_all_clients = false;
            }
        }
    }

    trace!(
        target: "lightyear_debug::input",
        kind = "last_confirmed_input",
        schedule = "PostUpdate",
        sample_point = "PostUpdate",
        action = ?DebugName::type_name::<S::Action>(),
        local_tick = tick.0,
        confirmed_tick = last_confirmed_input.tick.get().0,
        "updated LastConfirmedInput"
    );
}
/// Merges a remote player's input `sequence` into `input_buffer` and records mispredictions.
///
/// `sequence.update_buffer` is asked to apply the sequence ending at `end_tick`. When it reports
/// a mismatch tick:
/// - if input rollback is in `RollbackMode::Check` and the mismatch is not in the future
///   (`mismatch <= tick`), the prediction manager's earliest-mismatch tracker is flagged and
///   lowered to that tick;
/// - with the `metrics` feature, receive/margin/size metrics are emitted. The margin gauge is
///   only written when the buffer actually has a `last_remote_tick`, so enabling metrics can
///   never panic this system.
///
/// The resulting buffer state is always traced, mismatch or not.
#[cfg(feature = "prediction")]
fn update_buffer_from_remote_player_message<S: ActionStateSequence>(
    sequence: S,
    input_buffer: &mut InputBuffer<S::Snapshot, S::Action>,
    tick: Tick,
    end_tick: Tick,
    entity: Entity,
    prediction_manager: &PredictionManager,
    tick_duration: TickDuration,
) {
    if let Some(mismatch) = sequence.update_buffer(input_buffer, end_tick, tick_duration.0) {
        trace!(
            target: "lightyear_debug::input",
            kind = "remote_input_buffer_update",
            schedule = "PreUpdate",
            sample_point = "PreUpdate",
            entity = ?entity,
            action = ?DebugName::type_name::<S::Action>(),
            local_tick = tick.0,
            end_tick = end_tick.0,
            mismatch_tick = mismatch.0,
            buffer_len = input_buffer.len(),
            last_remote_tick = ?input_buffer.last_remote_tick,
            input_buffer = %input_buffer,
            "updated remote input buffer with mismatch"
        );
        // Only mismatches at or before the current tick affect already-simulated state.
        if let RollbackMode::Check = prediction_manager.rollback_policy.input
            && mismatch <= tick
        {
            debug!(
                ?entity,
                ?tick,
                ?end_tick,
                ?mismatch,
                "Mismatch detected for remote player input message!",
            );
            prediction_manager
                .earliest_mismatch_input
                .has_mismatches
                .store(true, bevy_platform::sync::atomic::Ordering::Relaxed);
            prediction_manager
                .earliest_mismatch_input
                .tick
                .set_if_lower(mismatch);
        }
        #[cfg(feature = "metrics")]
        {
            metrics::counter!(format!(
                "inputs::{}::remote_player::receive",
                DebugName::type_name::<S::Action>(),
            ))
            .increment(1);
            // Metrics must never take the client down: skip the margin gauge instead of
            // unwrapping when no remote tick has been recorded.
            if let Some(last_remote_tick) = input_buffer.last_remote_tick {
                let margin = last_remote_tick - tick;
                metrics::gauge!(format!(
                    "inputs::{}::remote_player::{}::buffer_margin",
                    DebugName::type_name::<S::Action>(),
                    entity
                ))
                .set(margin as f64);
            }
            metrics::gauge!(format!(
                "inputs::{}::remote_player::{}::buffer_size",
                DebugName::type_name::<S::Action>(),
                entity
            ))
            .set(input_buffer.len() as f64);
        }
    }
    trace!(
        target: "lightyear_debug::input",
        kind = "remote_input_buffer_state",
        schedule = "PreUpdate",
        sample_point = "PreUpdate",
        entity = ?entity,
        action = ?DebugName::type_name::<S::Action>(),
        local_tick = tick.0,
        end_tick = end_tick.0,
        buffer_len = input_buffer.len(),
        last_remote_tick = ?input_buffer.last_remote_tick,
        input_buffer = %input_buffer,
        "remote input buffer state after message"
    );
}
/// Drains [`MessageBuffer`] and sends every queued input message on `InputChannel`.
///
/// Host clients discard the queued messages instead of sending them, unless the `prediction`
/// feature is enabled and `rebroadcast_inputs` is set. With the `interpolation` feature and
/// `lag_compensation` enabled, each message is stamped with the current interpolation delay
/// (input timeline time minus interpolation timeline time, clamped to be non-negative).
fn send_input_messages<S: ActionStateSequence>(
input_config: Res<InputConfig<S::Action>>,
mut message_buffer: ResMut<MessageBuffer<S>>,
sender: Single<
(&mut MessageSender<InputMessage<S>>, Has<HostClient>),
(With<Client>, With<IsSynced<InputTimeline>>),
>,
#[cfg(feature = "interpolation")] interpolation_query: Single<
(&InputTimeline, &InterpolationTimeline),
(
With<Client>,
With<IsSynced<InterpolationTimeline>>,
With<IsSynced<InputTimeline>>,
),
>,
) {
let (mut sender, is_host_client) = sender.into_inner();
// Without prediction a host client never sends inputs; drop whatever was queued.
#[cfg(not(feature = "prediction"))]
if is_host_client {
message_buffer.0.clear();
return;
}
// With prediction a host client only sends when inputs are rebroadcast.
#[cfg(feature = "prediction")]
if is_host_client && !input_config.rebroadcast_inputs {
message_buffer.0.clear();
return;
}
trace!(
"Number of input messages to send: {:?}",
message_buffer.0.len()
);
trace!(
target: "lightyear_debug::input",
kind = "send_input_messages",
schedule = "PostUpdate",
sample_point = "PostUpdate",
action = ?DebugName::type_name::<S::Action>(),
num_messages = message_buffer.0.len(),
is_host_client,
"sending buffered input messages"
);
// Computed once and shared by every message sent in this run.
#[cfg(feature = "interpolation")]
let interpolation_delay = {
let (input_timeline, interpolation_timeline) = interpolation_query.into_inner();
let mut delay = input_timeline.now() - interpolation_timeline.now();
// A negative delay is clamped to zero.
if delay.is_negative() {
delay = TickDelta::from(Tick(0));
}
InterpolationDelay {
delay: delay.into(),
}
};
// `message` is only mutated when the `interpolation` feature is enabled.
#[cfg_attr(
not(feature = "interpolation"),
expect(unused_mut, reason = "Used by expression behind feature flag")
)]
for mut message in message_buffer.0.drain(..) {
#[cfg(feature = "interpolation")]
if input_config.lag_compensation {
message.interpolation_delay = Some(interpolation_delay);
}
sender.send::<InputChannel>(message);
}
}
fn receive_tick_events<S: ActionStateSequence>(
trigger: On<SyncEvent<InputTimelineConfig>>,
mut message_buffer: ResMut<MessageBuffer<S>>,
clients: Query<(), With<Client>>,
mut input_buffer_query: Query<
(
&mut InputBuffer<S::Snapshot, S::Action>,
Option<&ControlledBy>,
),
Allow<PredictionDisable>,
>,
) {
if clients.get(trigger.entity).is_err() {
return;
}
let delta = trigger.tick_delta;
for (mut input_buffer, controlled_by) in input_buffer_query.iter_mut() {
if controlled_by.is_some_and(|controlled_by| controlled_by.owner != trigger.entity) {
continue;
}
if let Some(start_tick) = input_buffer.start_tick {
input_buffer.start_tick = Some(start_tick + delta);
debug!(
"Receive tick snap event {:?}. Updating input buffer start_tick to {:?}!",
trigger.event(),
input_buffer.start_tick
);
}
if let Some(last_remote_tick) = input_buffer.last_remote_tick {
input_buffer.last_remote_tick = Some(last_remote_tick + delta);
}
}
for message in message_buffer.0.iter_mut() {
message.end_tick = message.end_tick + delta;
}
}