use crate::HISTORY_DEPTH;
#[cfg(feature = "prediction")]
use crate::InputChannel;
use crate::input_buffer::InputBuffer;
use crate::input_message::{
ActionStateQueryData, ActionStateSequence, InputMessage, InputTarget, StateMut,
};
use crate::plugin::InputPlugin;
#[cfg(feature = "metrics")]
use alloc::format;
use bevy_app::{App, FixedPreUpdate, Plugin, PreUpdate};
use bevy_ecs::component::Component;
use bevy_ecs::prelude::Has;
use bevy_ecs::relationship::RelationshipTarget;
use bevy_ecs::{
entity::{Entity, MapEntities},
error::Result,
query::With,
resource::Resource,
schedule::{IntoScheduleConfigs, SystemSet},
system::{Commands, Query, Res, Single},
};
use bevy_utils::prelude::DebugName;
use core::fmt::{Debug, Formatter};
use core::time::Duration;
use lightyear_connection::client::Connected;
use lightyear_connection::host::HostServer;
use lightyear_connection::prelude::NetworkTarget;
use lightyear_connection::server::Started;
use lightyear_core::id::RemoteId;
use lightyear_core::prelude::LocalTimeline;
use lightyear_core::tick::{Tick, TickDuration};
use lightyear_link::prelude::{LinkOf, Server};
use lightyear_messages::plugin::MessageSystems;
use lightyear_messages::prelude::MessageReceiver;
use lightyear_messages::server::ServerMultiMessageSender;
use lightyear_replication::control::ControlledByRemote;
use lightyear_replication::prelude::{PreSpawned, RoomId, Rooms};
use tracing::{debug, error, trace};
/// Largest number of ticks an input message's `end_tick` may be ahead of the server tick
/// and still be accepted by `is_input_within_lookahead`.
const MAX_INPUT_LOOKAHEAD_TICKS: i32 = 64;
/// Largest number of ticks an input message's `end_tick` may be behind the server tick
/// and still be accepted by `is_input_within_lookahead`.
const MAX_INPUT_PAST_TICKS: i32 = 256;
/// Returns `true` when `end_tick` lies inside the accepted window around `server_tick`.
///
/// The window is inclusive on both ends: it spans from `MAX_INPUT_PAST_TICKS` ticks behind
/// the server tick up to `MAX_INPUT_LOOKAHEAD_TICKS` ticks ahead of it.
pub(crate) fn is_input_within_lookahead(end_tick: Tick, server_tick: Tick) -> bool {
    let accepted_offsets = -MAX_INPUT_PAST_TICKS..=MAX_INPUT_LOOKAHEAD_TICKS;
    // Signed tick distance from the server tick to the end of the message.
    accepted_offsets.contains(&(end_tick - server_tick))
}
/// Plugin that installs the server-side input handling for the sequence type `S`.
pub struct ServerInputPlugin<S> {
    /// Copied into `ServerInputConfig` when the plugin is built. When `true`, received
    /// input messages may be forwarded to other peers and messages coming from a local
    /// (host) client are accepted instead of being ignored.
    pub rebroadcast_inputs: bool,
    /// Ties the plugin to `S` without storing a value of that type.
    pub marker: core::marker::PhantomData<S>,
}

impl<S> Default for ServerInputPlugin<S> {
    /// Builds a plugin with rebroadcasting turned off.
    fn default() -> Self {
        let rebroadcast_inputs = false;
        Self {
            rebroadcast_inputs,
            marker: core::marker::PhantomData,
        }
    }
}
/// Resource holding the server input settings.
///
/// `ServerInputPlugin::build` inserts it parameterised by the sequence's `Action` type,
/// and `receive_input_message` reads it.
#[derive(Resource)]
pub struct ServerInputConfig<S> {
/// Whether received input messages are rebroadcast (mirrors the plugin's field of the same name).
pub rebroadcast_inputs: bool,
/// Ties the resource to `S` without storing a value of that type.
pub marker: core::marker::PhantomData<S>,
}
/// Deprecated alias kept for backward compatibility; it names the same type as [`InputSystems`].
#[deprecated(note = "Use InputSystems instead")]
pub type InputSet = InputSystems;
/// System sets used by the server input plugin.
#[derive(SystemSet, Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum InputSystems {
/// `PreUpdate` set for input validators; the plugin orders it after
/// `MessageSystems::Receive` and before [`InputSystems::ReceiveInputs`].
ValidateInputs,
/// `PreUpdate` set in which `receive_input_message` runs.
ReceiveInputs,
/// `FixedPreUpdate` set in which `update_action_state` runs.
UpdateActionState,
}
/// Extension trait for registering systems that validate received input messages.
pub trait InputValidationAppExt {
/// Registers `systems` as input validators and returns `self` for chaining.
///
/// The implementation for `App` in this file adds them to `PreUpdate` inside
/// `InputSystems::ValidateInputs`.
fn add_input_validator<M>(
&mut self,
systems: impl IntoScheduleConfigs<bevy_ecs::system::ScheduleSystem, M>,
) -> &mut Self;
}
impl InputValidationAppExt for App {
    /// Schedules `systems` in `PreUpdate` as members of `InputSystems::ValidateInputs`
    /// and hands the app back for further chaining.
    fn add_input_validator<M>(
        &mut self,
        systems: impl IntoScheduleConfigs<bevy_ecs::system::ScheduleSystem, M>,
    ) -> &mut Self {
        let validators = systems.in_set(InputSystems::ValidateInputs);
        // `add_systems` already returns the app, so its result is the return value.
        self.add_systems(PreUpdate, validators)
    }
}
/// Input validator that strips input targets a remote sender is not allowed to drive.
///
/// For every connected sender whose `RemoteId` is not local, each pending
/// `InputMessage<S>` keeps only:
/// - `InputTarget::Entity` entries whose entity is contained in the sender's
///   `ControlledByRemote` collection (none of them when the component is absent);
/// - every `InputTarget::PreSpawned` entry, which is kept unconditionally.
///
/// Messages themselves are never discarded, even when all of their inputs were stripped.
pub fn authorize_controlled_targets<S: ActionStateSequence>(
    mut receivers: Query<
        (
            &RemoteId,
            Option<&ControlledByRemote>,
            &mut MessageReceiver<InputMessage<S>>,
        ),
        With<Connected>,
    >,
) {
    // Local senders are skipped entirely; their messages are left untouched.
    let remote_receivers = receivers
        .iter_mut()
        .filter(|(client_id, ..)| !client_id.is_local());

    for (client_id, controlled_by_remote, mut receiver) in remote_receivers {
        receiver.retain_messages(|message| {
            let total = message.inputs.len();
            message.inputs.retain(|data| match &data.target {
                InputTarget::PreSpawned(_) => true,
                InputTarget::Entity(entity) => controlled_by_remote
                    .map(|owned| owned.collection())
                    .is_some_and(|entities| entities.contains(entity)),
            });

            let dropped = total - message.inputs.len();
            if dropped > 0 {
                trace!(
                    ?client_id,
                    dropped, "authorize_controlled_targets: stripped unauthorized input targets"
                );
            }

            // Always keep the message; only its contents are filtered.
            true
        });
    }
}
/// Component selecting where a sender's input messages are rebroadcast.
///
/// `receive_input_message` reads it (parameterised by the sequence's `Action` type) from the
/// entity that received the message.
#[derive(Component)]
pub enum InputRebroadcaster<S> {
/// Rebroadcast to the other connected entities whose `Rooms` contain this room.
Room(RoomId),
/// Rebroadcast to the given network target.
Target(NetworkTarget),
/// Only carries the type parameter `S`. `receive_input_message` treats this variant as
/// unreachable, so it must not be inserted on an entity.
Marker(core::marker::PhantomData<S>),
}
impl<S> Debug for InputRebroadcaster<S> {
    /// Formats the value as a tuple struct named after the variant. `Marker` prints the
    /// type name of `S` in place of its `PhantomData` payload.
    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::Room(room_id) => {
                let mut tuple = f.debug_tuple("Room");
                tuple.field(room_id);
                tuple.finish()
            }
            Self::Target(network_target) => {
                let mut tuple = f.debug_tuple("Target");
                tuple.field(network_target);
                tuple.finish()
            }
            Self::Marker(_) => {
                let mut tuple = f.debug_tuple("Marker");
                tuple.field(&DebugName::type_name::<S>());
                tuple.finish()
            }
        }
    }
}
impl<S> Default for InputRebroadcaster<S> {
fn default() -> Self {
Self::Target(NetworkTarget::All)
}
}
impl<S: ActionStateSequence + MapEntities> Plugin for ServerInputPlugin<S> {
    /// Wires the server-side input pipeline into `app`:
    ///
    /// 1. adds `InputPlugin<S>` unless it is already present;
    /// 2. inserts `ServerInputConfig<S::Action>` carrying this plugin's `rebroadcast_inputs`;
    /// 3. orders `MessageSystems::Receive` -> `ValidateInputs` -> `ReceiveInputs` in `PreUpdate`;
    /// 4. configures `UpdateActionState` in `FixedPreUpdate` (with the `client` feature it
    ///    runs after the client's `BufferClientInputs` set);
    /// 5. registers `receive_input_message` and `update_action_state` in those sets.
    fn build(&self, app: &mut App) {
        let shared_plugin_present = app.is_plugin_added::<InputPlugin<S>>();
        if !shared_plugin_present {
            app.add_plugins(InputPlugin::<S>::default());
        }

        let rebroadcast_inputs = self.rebroadcast_inputs;
        app.insert_resource(ServerInputConfig::<S::Action> {
            rebroadcast_inputs,
            marker: core::marker::PhantomData,
        })
        .configure_sets(
            PreUpdate,
            (
                MessageSystems::Receive,
                InputSystems::ValidateInputs,
                InputSystems::ReceiveInputs,
            )
                .chain(),
        )
        .configure_sets(FixedPreUpdate, InputSystems::UpdateActionState);

        #[cfg(feature = "client")]
        app.configure_sets(
            FixedPreUpdate,
            InputSystems::UpdateActionState.after(crate::client::InputSystems::BufferClientInputs),
        );

        app.add_systems(
            PreUpdate,
            receive_input_message::<S>.in_set(InputSystems::ReceiveInputs),
        )
        .add_systems(
            FixedPreUpdate,
            update_action_state::<S>.in_set(InputSystems::UpdateActionState),
        );
    }
}
/// Drains the `InputMessage<S>` receivers of every connected entity and applies the inputs.
///
/// For each received message, in order:
/// 1. messages from a local sender are ignored (with an error log) when rebroadcasting is
///    disabled in `ServerInputConfig`;
/// 2. messages whose `end_tick` is outside the window checked by `is_input_within_lookahead`
///    are dropped;
/// 3. with the `interpolation` feature, a provided `interpolation_delay` is inserted on the
///    receiving entity;
/// 4. with the `prediction` feature, a message that is not already marked as rebroadcast is
///    forwarded according to the receiver's optional `InputRebroadcaster`;
/// 5. every input in the message is resolved to an entity and written into that entity's
///    `InputBuffer`, inserting the buffer (plus `S::State::base_value()`) when it is missing.
///
/// Returns the first error produced by a rebroadcast send; all other paths return `Ok(())`.
fn receive_input_message<S: ActionStateSequence>(
config: Res<ServerInputConfig<S::Action>>,
server: Query<&Server>,
// `sender` is only used mutably by the `prediction`-gated rebroadcast code below.
#[cfg_attr(not(feature = "prediction"), allow(unused_mut))]
mut sender: ServerMultiMessageSender<With<Connected>>,
tick_duration: Res<TickDuration>,
rooms_query: Query<(Entity, &Rooms), With<Connected>>,
timeline: Res<LocalTimeline>,
mut receivers: Query<
(
Entity,
&LinkOf,
&mut MessageReceiver<InputMessage<S>>,
&RemoteId,
Option<&InputRebroadcaster<S::Action>>,
),
With<Connected>,
>,
// The buffer is optional so that entities without one can still be looked up.
mut query: Query<Option<&mut InputBuffer<S::Snapshot, S::Action>>>,
prespawned: Query<
(Entity, &PreSpawned),
(
With<<S::State as ActionStateQueryData>::Main>,
With<InputBuffer<S::Snapshot, S::Action>>,
),
>,
mut commands: Commands,
) -> Result {
receivers.iter_mut().try_for_each(|(client_entity, link_of, mut receiver, client_id, rebroadcaster)| {
let server_entity = link_of.server;
let tick = timeline.tick();
receiver.receive().try_for_each(|message| {
// Rebroadcasting rewrites targets and the `rebroadcast` flag, so the message must be mutable.
#[cfg(feature = "prediction")]
let mut message = message;
if client_id.is_local() && !config.rebroadcast_inputs {
error!("Received input message from HostClient for action {:?} even though rebroadcasting is disabled. Ignoring the message.", DebugName::type_name::<S::Action>().shortname());
return Ok(())
}
trace!(?tick, ?client_id, action = ?DebugName::type_name::<S::Action>().shortname(), ?message.end_tick, ?message.inputs, "received input message");
trace!(
target: "lightyear_debug::input",
kind = "server_input_message_recv",
schedule = "PreUpdate",
sample_point = "PreUpdate",
entity = ?client_entity,
server_entity = ?server_entity,
client_id = ?client_id.0,
action = ?DebugName::type_name::<S::Action>(),
local_tick = tick.0,
end_tick = message.end_tick.0,
num_targets = message.inputs.len(),
rebroadcast = message.rebroadcast,
message = ?message,
"server received input message"
);
// Drop messages that end too far in the past or future relative to the local tick.
if !is_input_within_lookahead(message.end_tick, tick) {
trace!(
?tick,
?client_id,
end_tick = ?message.end_tick,
"Dropping input message: end_tick outside [server-{}, server+{}] window",
MAX_INPUT_PAST_TICKS,
MAX_INPUT_LOOKAHEAD_TICKS,
);
return Ok(())
}
// Store the sender's reported interpolation delay on the entity that received the message.
#[cfg(feature = "interpolation")]
if let Some(interpolation_delay) = message.interpolation_delay {
commands.entity(client_entity).insert(interpolation_delay);
}
// Rebroadcast only when enabled and when the linked server entity has a `Server` component.
#[cfg(feature = "prediction")]
if config.rebroadcast_inputs && let Ok(server) = server.get(server_entity) {
// Messages already flagged as rebroadcast are not forwarded again.
if !message.rebroadcast {
// Rewrite prespawned targets to the matching local entity before forwarding
// (presumably so recipients get an entity they can map — TODO confirm).
// Targets with no matching `PreSpawned` hash are left unchanged.
for input in message.inputs.iter_mut() {
if let InputTarget::PreSpawned(hash) = input.target
&& let Some(server_e) = prespawned.iter()
.find_map(|(e, p)| p.hash.is_some_and(|h| h == hash).then_some(e))
{
input.target = InputTarget::Entity(server_e);
}
}
debug!(action = ?DebugName::type_name::<S>().shortname(), "Rebroadcast input message {message:?} from client {client_id:?} with rebroadcaster {rebroadcaster:?}");
message.rebroadcast = true;
trace!(
target: "lightyear_debug::input",
kind = "server_input_rebroadcast",
schedule = "PreUpdate",
sample_point = "PreUpdate",
entity = ?client_entity,
server_entity = ?server_entity,
client_id = ?client_id.0,
action = ?DebugName::type_name::<S::Action>(),
local_tick = tick.0,
end_tick = message.end_tick.0,
rebroadcaster = ?rebroadcaster,
num_targets = message.inputs.len(),
"server rebroadcasting input message"
);
match rebroadcaster {
// No rebroadcaster component: send to everyone except the original sender.
None => {
sender.send::<_, InputChannel>(
&message,
server,
&NetworkTarget::AllExceptSingle(client_id.0)
)?;
}
// Room rebroadcaster: send to the other connected entities that are in that room.
Some(InputRebroadcaster::Room(room)) => {
let targets: bevy_ecs::entity::EntityHashSet = rooms_query.iter()
.filter(|(e, rooms)| *e != client_entity && rooms.contains_room(*room))
.map(|(e, _)| e)
.collect();
sender.send_to_entities::<_, InputChannel>(
&message,
&targets
)?;
},
// Explicit target: forwarded as-is (the sender is not excluded here).
Some(InputRebroadcaster::Target(target)) => {
sender.send::<_, InputChannel>(
&message,
server,
target
)?;
}
// NOTE(review): this panics if an entity carries `InputRebroadcaster::Marker`, and the
// variant is publicly constructible — confirm that it can never be inserted.
Some(InputRebroadcaster::Marker(_)) => unreachable!()
}
}
}
// Apply every input of the message to the buffer of the entity it targets.
for data in message.inputs {
// Resolve the target to an entity; prespawned targets are looked up by hash.
let Some(entity) = (match data.target {
InputTarget::Entity(entity) => {
Some(entity)
},
InputTarget::PreSpawned(hash) => {
debug!(?hash, "Received input for prespawned entity");
prespawned
.iter()
.filter_map(|(e, p)| p.hash.is_some_and(|h| h == hash).then_some(e)).next()
}
}) else {
debug!(?data.states, ?data.target, end_tick = ?message.end_tick, "received input message for unrecognized entity");
continue
};
// `Err` means the entity does not exist; `Ok(None)` means it exists without an input buffer.
if let Ok(buffer) = query.get_mut(entity) {
if let Some(mut buffer) = buffer {
trace!(
"Updating InputBuffer: {} using: {:?}",
buffer.as_ref(),
data.states
);
let previous_last_remote_tick = buffer.last_remote_tick;
// Diagnostics only: detect whether the message disagrees with inputs already buffered.
// The sequence is cloned because the helper takes it by value.
if let Some((rewrite_tick, previous, incoming)) =
detect_input_history_rewrite::<S>(
data.states.clone(),
&buffer,
message.end_tick,
tick_duration.0,
)
{
// A rewrite before the current tick is logged as an error, otherwise only traced.
if rewrite_tick < tick {
error!(
target: "lightyear_debug::input",
kind = "server_input_history_rewrite",
schedule = "PreUpdate",
sample_point = "PreUpdate",
entity = ?entity,
client_entity = ?client_entity,
client_id = ?client_id.0,
action = ?DebugName::type_name::<S::Action>(),
local_tick = tick.0,
rewrite_tick = rewrite_tick.0,
end_tick = message.end_tick.0,
previous_last_remote_tick = ?previous_last_remote_tick,
already_simulated = true,
previous = ?previous,
incoming = ?incoming,
buffer_len = buffer.len(),
input_buffer = %*buffer,
"server received a different input for an already-simulated tick"
);
} else {
trace!(
target: "lightyear_debug::input",
kind = "server_input_history_rewrite",
schedule = "PreUpdate",
sample_point = "PreUpdate",
entity = ?entity,
client_entity = ?client_entity,
client_id = ?client_id.0,
action = ?DebugName::type_name::<S::Action>(),
local_tick = tick.0,
rewrite_tick = rewrite_tick.0,
end_tick = message.end_tick.0,
previous_last_remote_tick = ?previous_last_remote_tick,
already_simulated = false,
previous = ?previous,
incoming = ?incoming,
buffer_len = buffer.len(),
input_buffer = %*buffer,
"server received a different input for a future tick already covered by an earlier client input packet"
);
}
}
// Write the received inputs into the existing buffer; the optional tick returned is
// reported below when it is earlier than the current tick.
let mismatch =
data.states
.update_buffer(&mut buffer, message.end_tick, tick_duration.0);
if let Some(mismatch_tick) = mismatch
&& mismatch_tick < tick
{
error!(
target: "lightyear_debug::input",
kind = "server_late_input_mismatch",
schedule = "PreUpdate",
sample_point = "PreUpdate",
entity = ?entity,
client_entity = ?client_entity,
client_id = ?client_id.0,
action = ?DebugName::type_name::<S::Action>(),
local_tick = tick.0,
mismatch_tick = mismatch_tick.0,
end_tick = message.end_tick.0,
previous_last_remote_tick = ?previous_last_remote_tick,
last_remote_tick = ?buffer.last_remote_tick,
buffer_len = buffer.len(),
input_buffer = %*buffer,
"server received an input correction for an already-simulated tick"
);
}
trace!(
target: "lightyear_debug::input",
kind = "server_input_buffer_update",
schedule = "PreUpdate",
sample_point = "PreUpdate",
entity = ?entity,
client_id = ?client_id.0,
action = ?DebugName::type_name::<S::Action>(),
local_tick = tick.0,
end_tick = message.end_tick.0,
buffer_len = buffer.len(),
input_buffer = %*buffer,
"server updated input buffer"
);
} else {
// The entity exists but has no buffer yet: fill a fresh one and insert it via commands.
debug!("Adding InputBuffer and ActionState which are missing on the entity");
let mut buffer = InputBuffer::<S::Snapshot, S::Action>::default();
let mismatch =
data.states
.update_buffer(&mut buffer, message.end_tick, tick_duration.0);
if let Some(mismatch_tick) = mismatch
&& mismatch_tick < tick
{
error!(
target: "lightyear_debug::input",
kind = "server_late_input_mismatch",
schedule = "PreUpdate",
sample_point = "PreUpdate",
entity = ?entity,
client_entity = ?client_entity,
client_id = ?client_id.0,
action = ?DebugName::type_name::<S::Action>(),
local_tick = tick.0,
mismatch_tick = mismatch_tick.0,
end_tick = message.end_tick.0,
previous_last_remote_tick = ?None::<lightyear_core::tick::Tick>,
last_remote_tick = ?buffer.last_remote_tick,
buffer_len = buffer.len(),
input_buffer = %buffer,
"server received initial input for an already-simulated tick"
);
}
trace!(
target: "lightyear_debug::input",
kind = "server_input_buffer_insert",
schedule = "PreUpdate",
sample_point = "PreUpdate",
entity = ?entity,
client_id = ?client_id.0,
action = ?DebugName::type_name::<S::Action>(),
local_tick = tick.0,
end_tick = message.end_tick.0,
buffer_len = buffer.len(),
input_buffer = %buffer,
"server inserted input buffer"
);
// Insert the new buffer together with the base action state value.
commands.entity(entity).insert((
buffer,
S::State::base_value()
));
}
} else {
debug!(?entity, ?data.states, end_tick = ?message.end_tick, "received input message for non-existing entity");
}
}
Ok(())
})
})
}
/// Finds the first tick at which an incoming input sequence disagrees with inputs that are
/// already stored in `input_buffer`.
///
/// `states` is read as covering the `states.len()` consecutive ticks ending at `end_tick`.
/// Only ticks that are at or before the buffer's `last_remote_tick` and inside the buffer's
/// `[start_tick, end_tick()]` range are compared.
///
/// Returns `Some((tick, previous, incoming))` for the first differing tick, where `previous`
/// is the buffered value and `incoming` the value carried by the message. Returns `None`
/// when nothing differs or when the buffer has no `last_remote_tick`, `start_tick` or
/// end tick.
fn detect_input_history_rewrite<S: ActionStateSequence>(
    states: S,
    input_buffer: &InputBuffer<S::Snapshot, S::Action>,
    end_tick: lightyear_core::tick::Tick,
    tick_duration: Duration,
) -> Option<(
    lightyear_core::tick::Tick,
    Option<S::Snapshot>,
    Option<S::Snapshot>,
)> {
    let last_remote_tick = input_buffer.last_remote_tick?;
    let buffer_start_tick = input_buffer.start_tick?;
    let buffer_end_tick = input_buffer.end_tick()?;

    // Tick of the first entry in the incoming sequence.
    let first_tick = end_tick + 1 - states.len() as u32;

    // Value currently in effect while walking the compressed sequence.
    let mut incoming = None;
    for (offset, compressed) in states.get_snapshots_from_message(tick_duration).enumerate() {
        incoming = match compressed {
            crate::input_buffer::Compressed::Input(value) => Some(value),
            crate::input_buffer::Compressed::Absent => None,
            // Repeats whatever the previous entry resolved to.
            crate::input_buffer::Compressed::SameAsPrecedent => incoming,
        };

        let tick = first_tick + lightyear_core::tick::Tick(offset as u32);
        if tick <= last_remote_tick {
            let outside_buffer = tick < buffer_start_tick || tick > buffer_end_tick;
            if !outside_buffer {
                let previous = input_buffer.get(tick).cloned();
                if previous != incoming {
                    return Some((tick, previous, incoming));
                }
            }
        }
    }

    None
}
/// Applies the buffered input for the current tick to every entity's action state, then
/// trims each input buffer.
///
/// For each entity, when `get_predict(tick)` yields a snapshot it is applied through
/// `S::from_snapshot_transitions` (and, with the `metrics` feature, the buffer length is
/// published as a gauge). The buffer is then popped up to `tick - history_depth`, where the
/// depth is `HISTORY_DEPTH` when the started server has `HostServer` and `1` otherwise.
fn update_action_state<S: ActionStateSequence>(
    timeline: Res<LocalTimeline>,
    server: Single<(Entity, Has<HostServer>), With<Started>>,
    mut action_state_query: Query<(
        Entity,
        StateMut<S>,
        &mut InputBuffer<S::Snapshot, S::Action>,
    )>,
) {
    let tick = timeline.tick();
    let (server, host_client) = server.into_inner();
    // Does not depend on the entity, so pick it once up front.
    let history_depth = if host_client { HISTORY_DEPTH } else { 1 };

    action_state_query
        .iter_mut()
        .for_each(|(entity, action_state, mut input_buffer)| {
            trace!(?tick, ?server, ?input_buffer, "input buffer on server");

            if let Some(snapshot) = input_buffer.get_predict(tick) {
                S::from_snapshot_transitions(S::State::into_inner(action_state), snapshot);
                trace!(
                    ?tick,
                    ?entity,
                    "action state after update. Input Buffer: {}",
                    input_buffer.as_ref()
                );
                trace!(
                    target: "lightyear_debug::input",
                    kind = "server_update_action_state",
                    schedule = "FixedPreUpdate",
                    sample_point = "FixedPreUpdate",
                    entity = ?entity,
                    server_entity = ?server,
                    action = ?DebugName::type_name::<S::Action>(),
                    local_tick = tick.0,
                    input_tick = tick.0,
                    host_client,
                    snapshot = ?snapshot,
                    buffer_len = input_buffer.len(),
                    input_buffer = %input_buffer.as_ref(),
                    "server applied input buffer to action state"
                );
                #[cfg(feature = "metrics")]
                {
                    metrics::gauge!(format!(
                        "inputs::{}::{}::buffer_size",
                        DebugName::type_name::<S::Action>(),
                        entity
                    ))
                    .set(input_buffer.len() as f64);
                }
            }

            // Runs whether or not a snapshot was applied for this tick.
            input_buffer.pop_keeping_last(tick - history_depth);
        });
}
/// Boundary tests for `is_input_within_lookahead`.
#[cfg(test)]
mod lookahead_tests {
    use super::*;

    /// The server tick itself and the furthest allowed future tick are both accepted.
    #[test]
    fn accepts_within_forward_bound() {
        let server = Tick(1_000);
        assert!(is_input_within_lookahead(server, server));
        assert!(is_input_within_lookahead(
            server + MAX_INPUT_LOOKAHEAD_TICKS,
            server
        ));
    }

    /// One tick past the forward bound is rejected.
    #[test]
    fn rejects_beyond_forward_bound() {
        let server = Tick(1_000);
        assert!(!is_input_within_lookahead(
            server + (MAX_INPUT_LOOKAHEAD_TICKS + 1),
            server
        ));
    }

    /// The oldest allowed tick is accepted.
    #[test]
    fn accepts_within_past_bound() {
        let server = Tick(1_000);
        assert!(is_input_within_lookahead(
            server + (-MAX_INPUT_PAST_TICKS),
            server
        ));
    }

    /// One tick older than the past bound is rejected (mirror of the forward-bound test).
    #[test]
    fn rejects_beyond_past_bound() {
        let server = Tick(1_000);
        assert!(!is_input_within_lookahead(
            server + (-(MAX_INPUT_PAST_TICKS + 1)),
            server
        ));
    }

    /// A tick far in the future is rejected.
    #[test]
    fn rejects_far_future_end_tick() {
        let server = Tick(1_000);
        assert!(!is_input_within_lookahead(server + 30_000, server));
    }

    /// A tick exactly half the tick range away yields a difference of `i32::MIN`,
    /// which must be rejected as well.
    #[test]
    fn rejects_wraparound_to_i32_min() {
        let server = Tick(1_000);
        let wrapped = Tick(server.0.wrapping_add(1 << 31));
        assert_eq!(wrapped - server, i32::MIN);
        assert!(!is_input_within_lookahead(wrapped, server));
    }
}