use crate::InputChannel;
use crate::config::InputConfig;
use crate::input_buffer::InputBuffer;
use crate::input_message::{ActionStateSequence, InputMessage, InputTarget, PerTargetData};
use crate::plugin::InputPlugin;
#[cfg(feature = "metrics")]
use alloc::format;
use alloc::{vec, vec::Vec};
use bevy_app::{
App, FixedPostUpdate, FixedPreUpdate, Plugin, PostUpdate, RunFixedMainLoop,
RunFixedMainLoopSystem,
};
use bevy_ecs::{
entity::{Entity, MapEntities},
observer::Trigger,
query::{Has, With, Without},
resource::Resource,
schedule::{IntoScheduleConfigs, SystemSet},
system::{Commands, Query, Res, ResMut, Single, StaticSystemParam},
};
use lightyear_connection::host::HostClient;
#[cfg(feature = "interpolation")]
use lightyear_core::prelude::Tick;
use lightyear_core::prelude::{NetworkTimeline, Rollback};
use lightyear_core::tick::TickDuration;
#[cfg(feature = "interpolation")]
use lightyear_core::time::TickDelta;
use lightyear_core::timeline::LocalTimeline;
use lightyear_core::timeline::SyncEvent;
#[cfg(feature = "interpolation")]
use lightyear_interpolation::plugin::InterpolationDelay;
#[cfg(feature = "interpolation")]
use lightyear_interpolation::prelude::InterpolationTimeline;
use lightyear_messages::MessageManager;
use lightyear_messages::plugin::MessageSet;
use lightyear_messages::prelude::{MessageReceiver, MessageSender};
use lightyear_prediction::Predicted;
use lightyear_replication::components::{Confirmed, PrePredicted};
use lightyear_sync::plugin::SyncSet;
use lightyear_sync::prelude::InputTimeline;
use lightyear_sync::prelude::client::IsSynced;
use lightyear_transport::channel::ChannelKind;
use lightyear_transport::prelude::ChannelRegistry;
use tracing::{debug, error, trace};
#[derive(SystemSet, Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum InputSet {
ReceiveInputMessages,
WriteClientInputs,
BufferClientInputs,
PrepareInputMessage,
RestoreInputs,
SendInputMessage,
CleanUp,
}
pub struct ClientInputPlugin<S: ActionStateSequence> {
config: InputConfig<S::Action>,
}
impl<S: ActionStateSequence> ClientInputPlugin<S> {
pub fn new(config: InputConfig<S::Action>) -> Self {
Self { config }
}
}
impl<S: ActionStateSequence> ClientInputPlugin<S> {
fn default() -> Self {
Self::new(InputConfig::default())
}
}
impl<S: ActionStateSequence + MapEntities> Plugin for ClientInputPlugin<S> {
fn build(&self, app: &mut App) {
if !app.is_plugin_added::<InputPlugin<S>>() {
app.add_plugins(InputPlugin::<S>::default());
}
app.insert_resource(self.config);
app.init_resource::<MessageBuffer<S>>();
app.configure_sets(
RunFixedMainLoop,
InputSet::ReceiveInputMessages
.before(RunFixedMainLoopSystem::FixedMainLoop)
.after(RunFixedMainLoopSystem::BeforeFixedMainLoop),
);
app.configure_sets(
FixedPreUpdate,
(InputSet::WriteClientInputs, InputSet::BufferClientInputs).chain(),
);
app.configure_sets(
FixedPostUpdate,
(
InputSet::PrepareInputMessage,
InputSet::RestoreInputs,
)
.chain(),
);
app.configure_sets(
PostUpdate,
((
SyncSet::Sync,
InputSet::SendInputMessage,
InputSet::CleanUp,
MessageSet::Send,
)
.chain(),),
);
if self.config.rebroadcast_inputs {
app.add_systems(
RunFixedMainLoop,
receive_remote_player_input_messages::<S>.in_set(InputSet::ReceiveInputMessages),
);
}
app.add_systems(
FixedPreUpdate,
(buffer_action_state::<S>, get_action_state::<S>)
.chain()
.in_set(InputSet::BufferClientInputs),
);
app.add_systems(
FixedPostUpdate,
(
get_delayed_action_state::<S>.in_set(InputSet::RestoreInputs),
prepare_input_message::<S>.in_set(InputSet::PrepareInputMessage),
),
);
app.add_systems(
PostUpdate,
(
clean_buffers::<S>.in_set(InputSet::CleanUp),
send_input_messages::<S>.in_set(InputSet::SendInputMessage),
),
);
app.add_observer(receive_tick_events::<S>);
}
}
fn buffer_action_state<S: ActionStateSequence>(
context: StaticSystemParam<S::Context>,
sender: Single<(&InputTimeline, &LocalTimeline), Without<Rollback>>,
mut action_state_query: Query<
(Entity, &S::State, &mut InputBuffer<S::Snapshot>),
With<S::Marker>,
>,
) {
let (input_timeline, local_timeline) = sender.into_inner();
let current_tick = local_timeline.tick();
let tick = current_tick + input_timeline.input_delay() as i16;
for (entity, action_state, mut input_buffer) in action_state_query.iter_mut() {
InputBuffer::set(
&mut input_buffer,
tick,
S::to_snapshot(action_state, &context),
);
trace!(
?entity,
?current_tick,
delayed_tick = ?tick,
input_buffer = %input_buffer.as_ref(),
"set action state in input buffer",
);
#[cfg(feature = "metrics")]
{
metrics::gauge!(format!(
"inputs::{}::{}::buffer_size",
core::any::type_name::<S::Action>(),
entity
))
.set(input_buffer.len() as f64);
}
}
}
fn get_action_state<S: ActionStateSequence>(
context: StaticSystemParam<S::Context>,
sender: Single<(&LocalTimeline, &InputTimeline, Has<Rollback>), Without<HostClient>>,
mut action_state_query: Query<(Entity, &mut S::State, &InputBuffer<S::Snapshot>)>,
) {
let (local_timeline, input_timeline, is_rollback) = sender.into_inner();
let input_delay = input_timeline.input_delay() as i16;
if !is_rollback && input_delay == 0 {
return;
}
let tick = local_timeline.tick();
for (entity, mut action_state, input_buffer) in action_state_query.iter_mut() {
if let Some(snapshot) = input_buffer.get(tick) {
S::from_snapshot(action_state.as_mut(), snapshot, &context);
trace!(
?entity,
?tick,
"fetched action state from input buffer: {:?}",
input_buffer
);
}
}
}
fn get_delayed_action_state<S: ActionStateSequence>(
context: StaticSystemParam<S::Context>,
sender: Query<(&InputTimeline, &LocalTimeline, Has<Rollback>), With<IsSynced<InputTimeline>>>,
mut action_state_query: Query<
(Entity, &mut S::State, &InputBuffer<S::Snapshot>),
With<S::Marker>,
>,
) {
let Ok((input_timeline, local_timeline, is_rollback)) = sender.single() else {
return;
};
let input_delay_ticks = input_timeline.input_delay() as i16;
if is_rollback || input_delay_ticks == 0 {
return;
}
let delayed_tick = local_timeline.tick() + input_delay_ticks;
for (entity, mut action_state, input_buffer) in action_state_query.iter_mut() {
if let Some(delayed_action_state) = input_buffer.get(delayed_tick) {
S::from_snapshot(action_state.as_mut(), delayed_action_state, &context);
trace!(
?entity,
?delayed_tick,
"fetched delayed action state from input buffer: {}",
input_buffer
);
}
}
}
fn clean_buffers<S: ActionStateSequence>(
sender: Query<&LocalTimeline, (With<InputTimeline>, Without<HostClient>)>,
mut input_buffer_query: Query<&mut InputBuffer<S::Snapshot>>,
) {
let Ok(local_timeline) = sender.single() else {
return;
};
let old_tick = local_timeline.tick() - 20;
for mut input_buffer in input_buffer_query.iter_mut() {
input_buffer.pop(old_tick);
}
}
#[derive(Debug, Resource)]
pub(crate) struct MessageBuffer<S>(Vec<InputMessage<S>>);
impl<A> Default for MessageBuffer<A> {
fn default() -> Self {
Self(vec![])
}
}
fn prepare_input_message<S: ActionStateSequence>(
mut message_buffer: ResMut<MessageBuffer<S>>,
tick_duration: Res<TickDuration>,
input_config: Res<InputConfig<S::Action>>,
sender: Single<
(
&LocalTimeline,
&InputTimeline,
&MessageManager,
Has<HostClient>,
),
(With<IsSynced<InputTimeline>>, Without<Rollback>),
>,
channel_registry: Res<ChannelRegistry>,
input_buffer_query: Query<
(
Entity,
&InputBuffer<S::Snapshot>,
Option<&Predicted>,
Option<&PrePredicted>,
),
With<S::Marker>,
>,
) {
let (local_timeline, input_timeline, message_manager, is_host_client) = sender.into_inner();
if is_host_client && !input_config.rebroadcast_inputs {
return;
}
let current_tick = local_timeline.tick();
let tick = current_tick + input_timeline.input_delay() as i16;
trace!(delayed_tick = ?tick, ?current_tick, "prepare_input_message");
let input_send_interval = channel_registry
.settings(ChannelKind::of::<InputChannel>())
.unwrap()
.send_frequency;
let mut num_tick: u16 = ((input_send_interval.as_nanos() / tick_duration.as_nanos()) + 1)
.try_into()
.unwrap();
num_tick *= input_config.packet_redundancy;
let mut message = InputMessage::<S>::new(tick);
for (entity, input_buffer, predicted, pre_predicted) in input_buffer_query.iter() {
trace!(
?tick,
?entity,
"Preparing input message with buffer: {:?}",
input_buffer
);
if let Some(target) = if is_host_client {
Some(InputTarget::PrePredictedEntity(entity))
} else if pre_predicted.is_some() {
if predicted.is_none() {
continue;
}
trace!(
?tick,
"sending inputs for pre-predicted entity! Local client entity: {:?}", entity
);
Some(InputTarget::PrePredictedEntity(entity))
} else {
if let Some(confirmed) = predicted.map_or(Some(entity), |p| p.confirmed_entity) {
message_manager.entity_mapper.get_remote(confirmed).map(|server_entity|{
trace!("sending input for server entity: {:?}. local entity: {:?}, confirmed: {:?}", server_entity, entity, confirmed);
InputTarget::Entity(server_entity)
})
} else {
trace!("not sending inputs because couldnt find server entity");
None
}
} {
if let Some(state_sequence) = S::build_from_input_buffer(input_buffer, num_tick, tick) {
message.inputs.push(PerTargetData {
target,
states: state_sequence,
});
}
}
}
trace!(
?tick,
?num_tick,
"sending input message for {:?}: {:?}",
core::any::type_name::<S::Action>(),
message
);
message_buffer.0.push(message);
}
fn receive_remote_player_input_messages<S: ActionStateSequence>(
mut commands: Commands,
link: Single<
(
&MessageManager,
&mut MessageReceiver<InputMessage<S>>,
&LocalTimeline,
),
With<IsSynced<InputTimeline>>,
>,
confirmed_query: Query<&Confirmed, Without<S::Marker>>,
mut predicted_query: Query<
Option<&mut InputBuffer<S::Snapshot>>,
(With<Predicted>, Without<S::Marker>, Without<HostClient>),
>,
) {
let (manager, mut receiver, timeline) = link.into_inner();
let tick = timeline.tick();
receiver.receive().for_each(|message| {
trace!(?message.end_tick, ?message, "received remote input message for action: {:?}", core::any::type_name::<S::Action>());
for target_data in message.inputs {
let entity = match target_data.target {
InputTarget::Entity(entity) => {
manager
.entity_mapper
.get_local(entity)
}
InputTarget::PrePredictedEntity(entity) => Some(entity),
};
if let Some(entity) = entity {
debug!(
"received input message for entity: {:?}. Applying to diff buffer.",
entity
);
if let Ok(confirmed) = confirmed_query.get(entity) {
if let Some(predicted) = confirmed.predicted {
if let Ok(input_buffer) = predicted_query.get_mut(predicted) {
trace!(confirmed= ?entity, ?predicted, end_tick = ?message.end_tick, "update action diff buffer for remote player PREDICTED using input message");
if let Some(mut input_buffer) = input_buffer {
target_data.states.update_buffer(&mut input_buffer, message.end_tick);
#[cfg(feature = "metrics")]
{
let margin = input_buffer.end_tick().unwrap() - tick;
metrics::gauge!(format!(
"inputs::{}::remote_player::{}::buffer_margin",
core::any::type_name::<S::Action>(),
entity
))
.set(margin as f64);
metrics::gauge!(format!(
"inputs::{}::remote_player::{}::buffer_size",
core::any::type_name::<S::Action>(),
entity
))
.set(input_buffer.len() as f64);
}
} else {
let mut input_buffer = InputBuffer::<S::Snapshot>::default();
target_data.states.update_buffer(&mut input_buffer, message.end_tick);
commands.entity(predicted).insert((
input_buffer,
S::State::default()
));
};
}
}
} else {
error!(?entity, ?target_data.states, end_tick = ?message.end_tick, "received input message for unrecognized entity");
}
} else {
error!("received remote player input message for unrecognized entity");
}
}
});
}
fn send_input_messages<S: ActionStateSequence>(
input_config: Res<InputConfig<S::Action>>,
mut message_buffer: ResMut<MessageBuffer<S>>,
sender: Single<
(&mut MessageSender<InputMessage<S>>, Has<HostClient>),
With<IsSynced<InputTimeline>>,
>,
#[cfg(feature = "interpolation")] interpolation_query: Single<
(&InputTimeline, &InterpolationTimeline),
(
With<IsSynced<InterpolationTimeline>>,
With<IsSynced<InputTimeline>>,
),
>,
) {
let (mut sender, is_host_client) = sender.into_inner();
if is_host_client && !input_config.rebroadcast_inputs {
message_buffer.0.clear();
return;
}
trace!(
"Number of input messages to send: {:?}",
message_buffer.0.len()
);
#[cfg(feature = "interpolation")]
let interpolation_delay = {
let (input_timeline, interpolation_timeline) = interpolation_query.into_inner();
let mut delay = input_timeline.now() - interpolation_timeline.now();
if delay.is_negative() {
delay = TickDelta::from(Tick(0));
}
InterpolationDelay {
delay: delay.into(),
}
};
#[cfg_attr(
not(feature = "interpolation"),
expect(unused_mut, reason = "Used by expression behind feature flag")
)]
for mut message in message_buffer.0.drain(..) {
#[cfg(feature = "interpolation")]
if input_config.lag_compensation {
message.interpolation_delay = Some(interpolation_delay);
}
sender.send::<InputChannel>(message);
}
}
fn receive_tick_events<S: ActionStateSequence>(
trigger: Trigger<SyncEvent<InputTimeline>>,
mut message_buffer: ResMut<MessageBuffer<S>>,
mut input_buffer_query: Query<&mut InputBuffer<S::Snapshot>>,
) {
let delta = trigger.tick_delta;
for mut input_buffer in input_buffer_query.iter_mut() {
if let Some(start_tick) = input_buffer.start_tick {
input_buffer.start_tick = Some(start_tick + delta);
debug!(
"Receive tick snap event {:?}. Updating input buffer start_tick to {:?}!",
trigger.event(),
input_buffer.start_tick
);
}
}
for message in message_buffer.0.iter_mut() {
message.end_tick = message.end_tick + delta;
}
}