use crate::mode::CatchUpMode;
use bevy_app::{App, PreUpdate};
use bevy_ecs::entity::Entity;
use bevy_ecs::prelude::*;
use bevy_replicon::client::server_mutate_ticks::ServerMutateTicks;
use bevy_replicon::prelude::RepliconTick;
use core::time::Duration;
use lightyear_connection::client::{Client, Disconnect};
use lightyear_core::prelude::LocalTimeline;
use lightyear_core::tick::Tick;
use lightyear_inputs::client::InputSystems;
use lightyear_messages::prelude::{MessageSender, RemoteEvent};
use lightyear_prediction::prelude::{
LastConfirmedInput, PredictionManager, PredictionSystems, StateRollbackMetadata,
};
use lightyear_prediction::rollback::CatchUpGated;
use lightyear_replication::metadata::MetadataChannel;
use lightyear_replication::prelude::ReplicationSystems;
use lightyear_sync::prelude::{InputTimeline, IsSynced};
use tracing::{debug, info, warn};
use super::{CatchUpRequest, CatchUpSnapshotReady, CatchUpSystems};
#[derive(Resource, Clone, Copy, Debug)]
pub struct CatchUpClientTimeout {
pub duration: Duration,
}
const CATCH_UP_REQUEST_RETRY_TICKS: i32 = 16;
const CATCH_UP_MAX_REQUESTS: u8 = 10;
impl Default for CatchUpClientTimeout {
fn default() -> Self {
Self {
duration: Duration::from_secs(1),
}
}
}
#[derive(Clone, Debug)]
pub(crate) struct PendingCatchUpSnapshot {
pub(crate) server_tick: Tick,
pub(crate) replicon_tick: RepliconTick,
}
#[derive(Component, Debug, Default)]
pub struct CatchUpManager {
pub(crate) completed: bool,
pub(crate) pending_snapshot: Option<PendingCatchUpSnapshot>,
pub(crate) requests_sent: u8,
pub(crate) request_sent_at_tick: Option<Tick>,
pub(crate) request_input_safe_tick: Option<Tick>,
pub(crate) last_emitted_replicon_tick: Option<RepliconTick>,
pub(crate) suppress_checksums: bool,
}
impl CatchUpManager {
pub fn suppresses_checksums(&self) -> bool {
self.suppress_checksums
}
}
pub(crate) fn build(app: &mut App) {
app.init_resource::<CatchUpClientTimeout>();
app.register_required_components::<Client, CatchUpManager>();
app.add_observer(on_receive_catchup_gated);
app.add_observer(receive_catch_up_snapshot_ready);
app.configure_sets(
PreUpdate,
(
CatchUpSystems::SendCatchUpRequest,
CatchUpSystems::TriggerCatchUpRollback.after(InputSystems::ReceiveInputMessages),
)
.run_if(initial_catchup_is_active)
.after(ReplicationSystems::Receive)
.before(PredictionSystems::Rollback),
);
app.add_systems(
PreUpdate,
(
send_catchup_request.in_set(CatchUpSystems::SendCatchUpRequest),
trigger_snapshot_rollback.in_set(CatchUpSystems::TriggerCatchUpRollback),
),
);
}
fn initial_catchup_is_active(
mode: Option<Res<CatchUpMode>>,
manager: Option<Single<&CatchUpManager, With<Client>>>,
) -> bool {
let Some(mode) = mode else {
return false;
};
*mode != CatchUpMode::InputOnly && manager.is_some_and(|manager| !manager.completed)
}
pub(crate) fn send_catchup_request(
timeline: Res<LocalTimeline>,
client: Single<
(
Entity,
&mut CatchUpManager,
&LastConfirmedInput,
&mut MessageSender<CatchUpRequest>,
Has<IsSynced<InputTimeline>>,
),
With<Client>,
>,
awaiting: Query<Entity, With<CatchUpGated>>,
) {
let (client_entity, mut manager, last_confirmed_input, mut sender, is_synced) =
client.into_inner();
if !is_synced {
return;
}
if manager.completed {
return;
}
let local_tick = timeline.tick();
if !last_confirmed_input.received_for_all_clients {
return;
}
let Some(input_safe_tick) = last_confirmed_input.get() else {
return;
};
if awaiting.is_empty() || manager.pending_snapshot.is_some() {
return;
}
if manager
.request_sent_at_tick
.is_some_and(|sent_at_tick| local_tick - sent_at_tick < CATCH_UP_REQUEST_RETRY_TICKS)
{
return;
}
debug!(
?client_entity,
?input_safe_tick,
previous_input_safe_tick = ?manager.request_input_safe_tick,
"sending CatchUpRequest to server"
);
sender.send::<MetadataChannel>(CatchUpRequest { input_safe_tick });
manager.requests_sent += 1;
if manager.requests_sent > CATCH_UP_MAX_REQUESTS {
panic!(
"client {client_entity:?} has sent {} CatchUpRequests but still has no pending snapshot; \
this likely means the server is failing to respond to the request; \
check server logs for errors and verify that the server is configured to accept catch-up requests",
manager.requests_sent
);
}
manager.request_sent_at_tick = Some(local_tick);
manager.request_input_safe_tick = Some(input_safe_tick);
manager.suppress_checksums = true;
}
fn receive_catch_up_snapshot_ready(
trigger: On<RemoteEvent<CatchUpSnapshotReady>>,
mut manager: Single<&mut CatchUpManager, With<Client>>,
gated: Query<Entity, With<CatchUpGated>>,
mut commands: Commands,
) {
if manager.completed {
return;
}
let event = &trigger.event().trigger;
debug!(
?event.server_tick,
?event.replicon_tick,
"received replicated CatchUpSnapshotReady"
);
if event.is_not_required() {
debug!("server reported catch-up is not required");
commands.trigger(event.clone());
complete_catch_up(&mut manager, &gated, &mut commands);
return;
}
if manager
.pending_snapshot
.as_mut()
.is_none_or(|pending| pending.server_tick < event.server_tick)
{
manager.pending_snapshot = Some(PendingCatchUpSnapshot {
server_tick: event.server_tick,
replicon_tick: event.replicon_tick,
});
}
}
fn on_receive_catchup_gated(
add: On<Add, CatchUpGated>,
timeline: Res<LocalTimeline>,
mut manager: Single<&mut CatchUpManager, With<Client>>,
mut commands: Commands,
) {
if !manager.completed {
manager.suppress_checksums = true;
} else {
let tick = timeline.tick();
commands.trigger(CatchUpSnapshotReady {
replicon_tick: RepliconTick::new(tick.0),
server_tick: tick,
});
commands.entity(add.entity).remove::<CatchUpGated>();
}
}
pub(crate) fn trigger_snapshot_rollback(
timeline: Res<LocalTimeline>,
manager: Single<
(
Entity,
&mut CatchUpManager,
&LastConfirmedInput,
&PredictionManager,
),
With<Client>,
>,
server_mutate_ticks: Res<ServerMutateTicks>,
mut state_metadata: ResMut<StateRollbackMetadata>,
gated: Query<Entity, With<CatchUpGated>>,
mut commands: Commands,
) {
let (client_entity, mut manager, last_confirmed_input, prediction_manager) =
manager.into_inner();
if manager.completed {
return;
}
let Some(snapshot) = manager.pending_snapshot.clone() else {
return;
};
let snapshot_replicon_tick = snapshot.replicon_tick;
let snapshot_server_tick = snapshot.server_tick;
let local_tick = timeline.tick();
let rollback_delta = local_tick - snapshot_server_tick;
if rollback_delta < 0 {
return;
}
let activation_headroom =
i32::from(manager.last_emitted_replicon_tick != Some(snapshot_replicon_tick));
let max_rollback_ticks = i32::from(prediction_manager.rollback_policy.max_rollback_ticks);
if rollback_delta + activation_headroom > max_rollback_ticks {
warn!(
?client_entity,
?local_tick,
?snapshot_server_tick,
?snapshot_replicon_tick,
rollback_delta,
max_rollback_ticks,
"disconnecting client because deterministic catch-up snapshot is too old"
);
commands.trigger(Disconnect {
entity: client_entity,
});
return;
}
if !last_confirmed_input.received_for_all_clients
|| last_confirmed_input
.get()
.is_none_or(|t| t < snapshot_server_tick)
{
return;
}
if manager.last_emitted_replicon_tick == Some(snapshot_replicon_tick) {
state_metadata.request_forced_rollback(snapshot_server_tick);
info!("Triggering catchup rollback since snapshot tick: {snapshot_server_tick:?}");
complete_catch_up(&mut manager, &gated, &mut commands);
return;
}
if !server_mutate_ticks.contains(snapshot_replicon_tick) {
return;
}
manager.last_emitted_replicon_tick = Some(snapshot_replicon_tick);
commands.trigger(CatchUpSnapshotReady {
replicon_tick: snapshot_replicon_tick,
server_tick: snapshot_server_tick,
});
}
fn complete_catch_up(
manager: &mut CatchUpManager,
gated: &Query<Entity, With<CatchUpGated>>,
commands: &mut Commands,
) {
for entity in gated.iter() {
commands.entity(entity).remove::<CatchUpGated>();
}
manager.completed = true;
manager.pending_snapshot = None;
manager.requests_sent = 0;
manager.request_sent_at_tick = None;
manager.request_input_safe_tick = None;
manager.last_emitted_replicon_tick = None;
manager.suppress_checksums = false;
}