// lightyear_deterministic_replication 0.27.0
//
// Primitives for deterministic replication (as opposed to state replication) in
// the lightyear networking library.
use crate::mode::CatchUpMode;
use bevy_app::{App, PreUpdate};
use bevy_ecs::entity::Entity;
use bevy_ecs::prelude::*;
use bevy_replicon::client::server_mutate_ticks::ServerMutateTicks;
use bevy_replicon::prelude::RepliconTick;
use core::time::Duration;
use lightyear_connection::client::{Client, Disconnect};
use lightyear_core::prelude::LocalTimeline;
use lightyear_core::tick::Tick;
use lightyear_inputs::client::InputSystems;
use lightyear_messages::prelude::{MessageSender, RemoteEvent};
use lightyear_prediction::prelude::{
    LastConfirmedInput, PredictionManager, PredictionSystems, StateRollbackMetadata,
};
use lightyear_prediction::rollback::CatchUpGated;
use lightyear_replication::metadata::MetadataChannel;
use lightyear_replication::prelude::ReplicationSystems;
use lightyear_sync::prelude::{InputTimeline, IsSynced};
use tracing::{debug, info, warn};

use super::{CatchUpRequest, CatchUpSnapshotReady, CatchUpSystems};

/// Upper bound on how long the client waits for an in-flight catch-up request.
///
/// Running out of time is meant to be fatal (a panic) by default: a client that
/// asked for a state-based catch-up but never activated it keeps running on
/// stale state, and failing loudly beats silently emitting misleading checksums.
// NOTE(review): none of the systems in this module read this resource — confirm
// where the timeout is actually enforced.
#[derive(Resource, Clone, Copy, Debug)]
pub struct CatchUpClientTimeout {
    /// How long an outstanding catch-up request may stay unresolved.
    pub duration: Duration,
}

impl Default for CatchUpClientTimeout {
    /// Defaults to one second.
    fn default() -> Self {
        let duration = Duration::from_millis(1_000);
        Self { duration }
    }
}

/// Minimum number of local ticks between two consecutive `CatchUpRequest`s.
const CATCH_UP_REQUEST_RETRY_TICKS: i32 = 16;
/// Limit on how many `CatchUpRequest`s the client sends without receiving a
/// snapshot; going past it is treated as a fatal error.
const CATCH_UP_MAX_REQUESTS: u8 = 10;

/// Snapshot announced by the server that the client has not activated yet.
#[derive(Debug, Clone)]
pub(crate) struct PendingCatchUpSnapshot {
    /// Server tick the snapshot was taken at.
    pub(crate) server_tick: Tick,
    /// Replicon tick that must be confirmed before the snapshot is activated.
    pub(crate) replicon_tick: RepliconTick,
}

/// Per-client catch-up bookkeeping, stored on the client link entity.
///
/// It tracks everything the initial catch-up flow needs for one client:
/// request/retry state, the input coverage sent with the last request, the
/// accepted snapshot, and whether catch-up has finished.
#[derive(Component, Default, Debug)]
pub struct CatchUpManager {
    /// Set once catch-up has finished; the catch-up systems stop acting after this.
    pub(crate) completed: bool,
    /// Newest snapshot received from the server that is still waiting to be activated.
    pub(crate) pending_snapshot: Option<PendingCatchUpSnapshot>,
    /// Number of catch-up requests sent so far.
    pub(crate) requests_sent: u8,
    /// Local tick at which the latest request went out (used to throttle retries).
    pub(crate) request_sent_at_tick: Option<Tick>,
    /// Input-safe tick carried by the latest request.
    pub(crate) request_input_safe_tick: Option<Tick>,
    /// Replicon tick for which the local activation event was already triggered.
    pub(crate) last_emitted_replicon_tick: Option<RepliconTick>,
    /// Whether outgoing checksums are currently suppressed.
    pub(crate) suppress_checksums: bool,
}

impl CatchUpManager {
    /// Reports whether checksum sends are being held back because the client
    /// is knowingly running on stale deterministic state.
    pub fn suppresses_checksums(&self) -> bool {
        self.suppress_checksums
    }
}

/// Wires the client side of the catch-up flow into `app`.
///
/// It does four things:
/// - Installs the default `CatchUpClientTimeout` resource.
/// - Makes `CatchUpManager` a required component of every `Client`.
/// - Registers the snapshot and gating observers.
/// - Schedules the request/rollback systems in `PreUpdate`.
///
/// Both system sets run after replication receive and before the prediction
/// rollback, and only while `initial_catchup_is_active` returns true. The
/// rollback trigger additionally runs after input messages are received.
pub(crate) fn build(app: &mut App) {
    let catch_up_sets = (
        CatchUpSystems::SendCatchUpRequest,
        CatchUpSystems::TriggerCatchUpRollback.after(InputSystems::ReceiveInputMessages),
    );
    app.init_resource::<CatchUpClientTimeout>()
        .register_required_components::<Client, CatchUpManager>()
        .add_observer(on_receive_catchup_gated)
        .add_observer(receive_catch_up_snapshot_ready)
        .configure_sets(
            PreUpdate,
            catch_up_sets
                .run_if(initial_catchup_is_active)
                .after(ReplicationSystems::Receive)
                .before(PredictionSystems::Rollback),
        )
        .add_systems(
            PreUpdate,
            (
                send_catchup_request.in_set(CatchUpSystems::SendCatchUpRequest),
                trigger_snapshot_rollback.in_set(CatchUpSystems::TriggerCatchUpRollback),
            ),
        );
}

/// Run condition for the catch-up system sets.
///
/// Returns true only when all of the following hold:
/// - A `CatchUpMode` resource exists and is not `CatchUpMode::InputOnly`.
/// - The `Single` query finds a `Client` entity.
/// - That entity's `CatchUpManager` has not completed yet.
fn initial_catchup_is_active(
    mode: Option<Res<CatchUpMode>>,
    manager: Option<Single<&CatchUpManager, With<Client>>>,
) -> bool {
    let mode_allows_catch_up = mode.is_some_and(|mode| *mode != CatchUpMode::InputOnly);
    mode_allows_catch_up && manager.is_some_and(|manager| !manager.completed)
}

/// Client system: sends a [`CatchUpRequest`] once replicated input state is
/// synced and the input plugin has confirmed a rollback-safe tick across
/// remote clients.
///
/// A request is only sent while at least one entity is still [`CatchUpGated`]
/// and no snapshot is pending. It is re-sent at most every
/// `CATCH_UP_REQUEST_RETRY_TICKS` local ticks until the server answers.
///
/// [`LastConfirmedInput`] is updated by the input plugin from all registered
/// remote input buffers. This system's set is not explicitly ordered after
/// `InputSystems::ReceiveInputMessages`, so the value read here may lag by a
/// frame. Being one frame conservative is preferable to duplicating that input
/// coverage logic.
///
/// # Panics
///
/// Panics when `CATCH_UP_MAX_REQUESTS` requests have already been sent without
/// a snapshot arriving and yet another retry would be needed.
pub(crate) fn send_catchup_request(
    timeline: Res<LocalTimeline>,
    client: Single<
        (
            Entity,
            &mut CatchUpManager,
            &LastConfirmedInput,
            &mut MessageSender<CatchUpRequest>,
            Has<IsSynced<InputTimeline>>,
        ),
        With<Client>,
    >,
    awaiting: Query<Entity, With<CatchUpGated>>,
) {
    let (client_entity, mut manager, last_confirmed_input, mut sender, is_synced) =
        client.into_inner();
    if !is_synced || manager.completed {
        return;
    }
    if !last_confirmed_input.received_for_all_clients {
        return;
    }
    let Some(input_safe_tick) = last_confirmed_input.get() else {
        return;
    };
    // Nothing to request if no entity waits on catch-up, or if the server has
    // already answered and we are only waiting to activate its snapshot.
    if awaiting.is_empty() || manager.pending_snapshot.is_some() {
        return;
    }
    let local_tick = timeline.tick();
    // Throttle retries while a previous request may still be in flight.
    if manager
        .request_sent_at_tick
        .is_some_and(|sent_at_tick| local_tick - sent_at_tick < CATCH_UP_REQUEST_RETRY_TICKS)
    {
        return;
    }
    // Enforce the cap *before* sending. This ensures at most
    // `CATCH_UP_MAX_REQUESTS` requests go out and the reported count matches
    // what was actually sent. Previously an extra request was queued and then
    // abandoned by the panic.
    if manager.requests_sent >= CATCH_UP_MAX_REQUESTS {
        panic!(
            "client {client_entity:?} has sent {} CatchUpRequests but still has no pending snapshot; \
             this likely means the server is failing to respond to the request; \
             check server logs for errors and verify that the server is configured to accept catch-up requests",
            manager.requests_sent
        );
    }
    debug!(
        ?client_entity,
        ?input_safe_tick,
        previous_input_safe_tick = ?manager.request_input_safe_tick,
        "sending CatchUpRequest to server"
    );
    sender.send::<MetadataChannel>(CatchUpRequest { input_safe_tick });
    manager.requests_sent += 1;
    manager.request_sent_at_tick = Some(local_tick);
    manager.request_input_safe_tick = Some(input_safe_tick);
    // The client keeps running on stale state until the snapshot is activated.
    manager.suppress_checksums = true;
}

/// Client observer: records the catch-up snapshot metadata the server sent in
/// [`CatchUpSnapshotReady`]. Does nothing once catch-up has completed.
///
/// There are two cases:
/// - If the server reports that catch-up is not required, the event is
///   re-triggered locally right away and catch-up is completed immediately.
/// - Otherwise the snapshot is stored as pending, keeping only the one with the
///   newest server tick. `trigger_snapshot_rollback` re-triggers it locally
///   once the corresponding Replicon tick is confirmed.
fn receive_catch_up_snapshot_ready(
    trigger: On<RemoteEvent<CatchUpSnapshotReady>>,
    mut manager: Single<&mut CatchUpManager, With<Client>>,
    gated: Query<Entity, With<CatchUpGated>>,
    mut commands: Commands,
) {
    if manager.completed {
        return;
    }
    let event = &trigger.event().trigger;
    debug!(
        ?event.server_tick,
        ?event.replicon_tick,
        "received replicated CatchUpSnapshotReady"
    );
    if event.is_not_required() {
        debug!("server reported catch-up is not required");
        commands.trigger(event.clone());
        complete_catch_up(&mut manager, &gated, &mut commands);
        return;
    }
    // Only a strictly newer snapshot replaces the pending one; older or
    // duplicate announcements are ignored. Reading through `as_ref` (not
    // `as_mut`) keeps this a plain read. That way the manager is only marked
    // changed when it is actually updated.
    let is_newer = manager
        .pending_snapshot
        .as_ref()
        .is_none_or(|pending| pending.server_tick < event.server_tick);
    if is_newer {
        manager.pending_snapshot = Some(PendingCatchUpSnapshot {
            server_tick: event.server_tick,
            replicon_tick: event.replicon_tick,
        });
    }
}

/// Client observer for every newly added [`CatchUpGated`] component.
///
/// The behavior depends on whether catch-up has finished:
/// - While catch-up is still running, it only switches checksum sends off.
/// - Once catch-up has completed, the gate is resolved right away. It triggers
///   a local [`CatchUpSnapshotReady`] stamped with the current local tick, then
///   removes the marker from the entity.
fn on_receive_catchup_gated(
    add: On<Add, CatchUpGated>,
    timeline: Res<LocalTimeline>,
    mut manager: Single<&mut CatchUpManager, With<Client>>,
    mut commands: Commands,
) {
    if manager.completed {
        let now = timeline.tick();
        commands.trigger(CatchUpSnapshotReady {
            replicon_tick: RepliconTick::new(now.0),
            server_tick: now,
        });
        commands.entity(add.entity).remove::<CatchUpGated>();
        return;
    }
    manager.suppress_checksums = true;
}

/// Client system: activates the pending catch-up snapshot and then requests a
/// forced rollback from its server tick. It only proceeds after confirming the
/// snapshot tick has been fully received (by checking [`ServerMutateTicks`]).
///
/// Activation spans two passes of this system:
/// 1. Once inputs are confirmed up to the snapshot's server tick and
///    [`ServerMutateTicks`] contains the snapshot's Replicon tick, that tick is
///    recorded as emitted. [`CatchUpSnapshotReady`] is then triggered locally.
/// 2. On a later pass, with the snapshot already emitted, a forced rollback to
///    the snapshot's server tick is requested through [`StateRollbackMetadata`].
///    Catch-up is then completed: [`CatchUpGated`] markers are removed and
///    checksum suppression is cleared.
///
/// If the snapshot no longer fits the prediction rollback window, a
/// [`Disconnect`] is triggered for the client instead.
pub(crate) fn trigger_snapshot_rollback(
    timeline: Res<LocalTimeline>,
    manager: Single<
        (
            Entity,
            &mut CatchUpManager,
            &LastConfirmedInput,
            &PredictionManager,
        ),
        With<Client>,
    >,
    server_mutate_ticks: Res<ServerMutateTicks>,
    mut state_metadata: ResMut<StateRollbackMetadata>,
    gated: Query<Entity, With<CatchUpGated>>,
    mut commands: Commands,
) {
    let (client_entity, mut manager, last_confirmed_input, prediction_manager) =
        manager.into_inner();
    if manager.completed {
        return;
    }
    let Some(snapshot) = manager.pending_snapshot.clone() else {
        return;
    };
    let snapshot_replicon_tick = snapshot.replicon_tick;
    let snapshot_server_tick = snapshot.server_tick;
    let local_tick = timeline.tick();
    // How many ticks we would have to roll back to reach the snapshot. A
    // negative value means the snapshot is ahead of the local timeline, so wait.
    let rollback_delta = local_tick - snapshot_server_tick;
    if rollback_delta < 0 {
        return;
    }
    // Local activation observers run before the forced rollback is requested,
    // so the snapshot must still fit the rollback window on the next pass.
    let activation_headroom =
        i32::from(manager.last_emitted_replicon_tick != Some(snapshot_replicon_tick));
    let max_rollback_ticks = i32::from(prediction_manager.rollback_policy.max_rollback_ticks);
    if rollback_delta + activation_headroom > max_rollback_ticks {
        // NOTE(review): nothing is recorded on the manager here. If this system
        // runs again before the client is torn down, the Disconnect is
        // re-triggered — confirm that is harmless.
        warn!(
            ?client_entity,
            ?local_tick,
            ?snapshot_server_tick,
            ?snapshot_replicon_tick,
            rollback_delta,
            max_rollback_ticks,
            "disconnecting client because deterministic catch-up snapshot is too old"
        );
        commands.trigger(Disconnect {
            entity: client_entity,
        });
        return;
    }
    // Wait until inputs from all remote clients are confirmed at least up to
    // the snapshot's server tick.
    if !last_confirmed_input.received_for_all_clients
        || last_confirmed_input
            .get()
            .is_none_or(|t| t < snapshot_server_tick)
    {
        return;
    }
    // Second pass: the local activation event was already emitted for this
    // snapshot, so request the forced rollback and finish catch-up.
    if manager.last_emitted_replicon_tick == Some(snapshot_replicon_tick) {
        state_metadata.request_forced_rollback(snapshot_server_tick);
        info!("Triggering catchup rollback since snapshot tick: {snapshot_server_tick:?}");
        complete_catch_up(&mut manager, &gated, &mut commands);
        return;
    }
    // First pass: wait until Replicon has confirmed the snapshot's mutate tick,
    // then remember it and emit the activation event locally.
    if !server_mutate_ticks.contains(snapshot_replicon_tick) {
        return;
    }
    manager.last_emitted_replicon_tick = Some(snapshot_replicon_tick);
    commands.trigger(CatchUpSnapshotReady {
        replicon_tick: snapshot_replicon_tick,
        server_tick: snapshot_server_tick,
    });
}

fn complete_catch_up(
    manager: &mut CatchUpManager,
    gated: &Query<Entity, With<CatchUpGated>>,
    commands: &mut Commands,
) {
    for entity in gated.iter() {
        commands.entity(entity).remove::<CatchUpGated>();
    }
    manager.completed = true;
    manager.pending_snapshot = None;
    manager.requests_sent = 0;
    manager.request_sent_at_tick = None;
    manager.request_input_safe_tick = None;
    manager.last_emitted_replicon_tick = None;
    manager.suppress_checksums = false;
}