dreamwell-runtime 1.0.0

Dreamwell Runtime — cross-platform GPU-accelerated game client
Documentation
//! Reconciliation — apply canonical corrections at tick boundary.
//! When authority sends corrections, presentation interpolates through them.
//!
//! Despawns are deferred to end of tick to prevent swap_remove from
//! invalidating slot indices referenced by later events in the same batch.

use super::stage::StagingBuffer;
use crate::authority::AuthorityEvent;
use crate::mirror::{ClientMirror, MAX_MIRROR_ENTITIES};

/// Size in bytes of one entity record in a snapshot payload:
/// 3 position floats followed by 4 rotation floats, each an f32 (7 × 4 = 28).
const SNAPSHOT_ENTITY_STRIDE: usize = (3 + 4) * std::mem::size_of::<f32>();

/// Feed every staged authority event into the client mirror, then close the tick.
///
/// Meant to run once per tick boundary rather than once per rendered frame.
/// Events are consumed directly from `staging.drain()` and applied in the
/// order that iterator yields them.
pub fn reconcile(staging: &mut StagingBuffer, mirror: &mut ClientMirror) {
    for staged in staging.drain() {
        apply_event(&staged, mirror);
    }
    // Despawns queued during the loop are only flushed now, once every other
    // event of this batch has been applied; the tick counter advances last.
    mirror.flush_despawns();
    mirror.advance_tick();
}

/// Route one authority event to the handler for its variant.
///
/// Snapshot chunks and canonical events are forwarded to their dedicated
/// handlers, rejections are logged at debug level, and acknowledgements
/// leave the mirror untouched.
fn apply_event(event: &AuthorityEvent, mirror: &mut ClientMirror) {
    match event {
        AuthorityEvent::SnapshotChunk { chunk_id, total_chunks, data } => {
            apply_snapshot_chunk(*chunk_id, *total_chunks, data, mirror)
        }
        AuthorityEvent::CanonEvent { tick, event_type, payload } => {
            apply_canon_event(*tick, event_type, payload, mirror)
        }
        AuthorityEvent::Reject { seq, reason } => {
            log::debug!("authority_reject:seq={seq} reason={reason}");
        }
        // Acknowledged intent — nothing in the mirror needs to change.
        AuthorityEvent::Ack { .. } => {}
    }
}

/// Read a little-endian f32 from a byte slice at the given offset.
///
/// Returns `None` if fewer than four bytes are available at `offset`,
/// including when `offset + 4` would not fit in a `usize`.
#[inline]
fn read_f32(data: &[u8], offset: usize) -> Option<f32> {
    // `offset + 4` would panic in overflow-checked builds for offsets near
    // usize::MAX; treat that case as "out of range" like any other short read.
    let end = offset.checked_add(4)?;
    data.get(offset..end)
        .map(|b| f32::from_le_bytes([b[0], b[1], b[2], b[3]]))
}

/// Read a little-endian u32 from a byte slice at the given offset.
///
/// Returns `None` if fewer than four bytes are available at `offset`,
/// including when `offset + 4` would not fit in a `usize`.
#[inline]
fn read_u32(data: &[u8], offset: usize) -> Option<u32> {
    // `offset + 4` would panic in overflow-checked builds for offsets near
    // usize::MAX; treat that case as "out of range" like any other short read.
    let end = offset.checked_add(4)?;
    data.get(offset..end)
        .map(|b| u32::from_le_bytes([b[0], b[1], b[2], b[3]]))
}

/// Apply one canonical event to the mirror, dispatching on `event_type`.
///
/// Recognised event types and their little-endian payload layouts:
/// - `"entity_move"`: `[slot: u32, x: f32, y: f32, z: f32]`, exactly 16 bytes
/// - `"entity_rotate"`: `[slot: u32, qx: f32, qy: f32, qz: f32, qw: f32]`, exactly 20 bytes
/// - `"entity_spawn"`: payload is not inspected
/// - `"entity_despawn"`: `[slot: u32]`, exactly 4 bytes
///
/// A payload of the wrong length is logged at warn level and dropped. A move
/// or rotate whose slot is past the end of the corresponding array is ignored
/// without logging. Any other event type is logged at trace level with `tick`.
fn apply_canon_event(tick: u64, event_type: &str, payload: &[u8], mirror: &mut ClientMirror) {
    // Slot-addressed payloads carry the slot index in their first four bytes.
    let slot_index = || read_u32(payload, 0).unwrap_or(0) as usize;
    let float_at = |offset: usize| read_f32(payload, offset).unwrap_or(0.0);

    match event_type {
        "entity_move" => {
            if payload.len() != 16 {
                log::warn!("reconcile:entity_move bad payload len={}", payload.len());
                return;
            }
            let translation = [float_at(4), float_at(8), float_at(12)];
            if let Some(position) = mirror.positions.get_mut(slot_index()) {
                *position = translation;
            }
        }
        "entity_rotate" => {
            if payload.len() != 20 {
                log::warn!("reconcile:entity_rotate bad payload len={}", payload.len());
                return;
            }
            let orientation = [float_at(4), float_at(8), float_at(12), float_at(16)];
            if let Some(rotation) = mirror.rotations.get_mut(slot_index()) {
                *rotation = orientation;
            }
        }
        "entity_spawn" => {
            // New entities are appended at the end; refuse once the cap is hit.
            let at_capacity = mirror.positions.len() >= MAX_MIRROR_ENTITIES;
            if at_capacity {
                log::warn!("reconcile:entity_spawn cap reached ({MAX_MIRROR_ENTITIES})");
                return;
            }
            mirror.positions.push([0.0, 0.0, 0.0]);
            mirror.rotations.push([0.0, 0.0, 0.0, 1.0]);
            mirror.entity_count = mirror.positions.len();
        }
        "entity_despawn" => {
            if payload.len() != 4 {
                log::warn!("reconcile:entity_despawn bad payload len={}", payload.len());
                return;
            }
            // Only queued here; removal is left to the mirror's despawn flush
            // so slot indices used by later events in the batch stay valid.
            mirror.queue_despawn(slot_index());
        }
        _ => {
            log::trace!("reconcile:unhandled event_type={event_type} tick={tick}");
        }
    }
}

/// Replace the mirror's contents from a snapshot chunk (initial sync or resync).
///
/// Only the single-chunk form (`chunk_id == 0`, `total_chunks == 1`) is
/// handled; anything else is logged at trace level and ignored. `data` is read
/// as consecutive `SNAPSHOT_ENTITY_STRIDE`-byte records of seven little-endian
/// f32 values: position x, y, z followed by rotation qx, qy, qz, qw. At most
/// `MAX_MIRROR_ENTITIES` records are decoded. Trailing bytes that do not form
/// a whole record trigger a warning and are otherwise skipped.
fn apply_snapshot_chunk(chunk_id: u32, total_chunks: u32, data: &[u8], mirror: &mut ClientMirror) {
    if chunk_id != 0 || total_chunks != 1 {
        log::trace!("reconcile:multi-chunk snapshot not yet supported chunk={chunk_id}/{total_chunks}");
        return;
    }

    if data.len() % SNAPSHOT_ENTITY_STRIDE != 0 {
        log::warn!(
            "reconcile:snapshot data len {} not aligned to stride {}",
            data.len(),
            SNAPSHOT_ENTITY_STRIDE
        );
    }

    let record_count = (data.len() / SNAPSHOT_ENTITY_STRIDE).min(MAX_MIRROR_ENTITIES);
    let mut positions = Vec::with_capacity(record_count);
    let mut rotations = Vec::with_capacity(record_count);

    for record in data.chunks_exact(SNAPSHOT_ENTITY_STRIDE).take(record_count) {
        // Fields are addressed by index within the record; each is 4 bytes wide.
        let field = |index: usize| read_f32(record, index * 4);
        let (Some(x), Some(y), Some(z)) = (field(0), field(1), field(2)) else { break };
        let (Some(qx), Some(qy), Some(qz), Some(qw)) = (field(3), field(4), field(5), field(6))
        else {
            break;
        };
        positions.push([x, y, z]);
        rotations.push([qx, qy, qz, qw]);
    }

    mirror.apply_snapshot(positions, rotations);
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Rotation given to every entity seeded directly into a test mirror.
    const IDENTITY_ROTATION: [f32; 4] = [0.0, 0.0, 0.0, 1.0];

    /// Build a mirror holding one entity per position, each with the identity rotation.
    fn seeded_mirror(positions: &[[f32; 3]]) -> ClientMirror {
        let mut mirror = ClientMirror::new();
        for &position in positions {
            mirror.positions.push(position);
            mirror.rotations.push(IDENTITY_ROTATION);
        }
        mirror.entity_count = positions.len();
        mirror
    }

    /// Encode the values as consecutive little-endian f32s.
    fn le_f32s(values: &[f32]) -> Vec<u8> {
        values.iter().flat_map(|value| value.to_le_bytes()).collect()
    }

    /// Encode a slot-addressed payload: little-endian u32 slot, then the f32 values.
    fn slot_payload(slot: u32, values: &[f32]) -> Vec<u8> {
        let mut bytes = slot.to_le_bytes().to_vec();
        bytes.extend(le_f32s(values));
        bytes
    }

    /// Wrap a payload in a tick-1 canonical event of the given type.
    fn canon_event(event_type: &'static str, payload: Vec<u8>) -> AuthorityEvent {
        AuthorityEvent::CanonEvent {
            tick: 1,
            event_type: event_type.into(),
            payload,
        }
    }

    #[test]
    fn reconcile_advances_tick() {
        let mut staging = StagingBuffer::new();
        let mut mirror = ClientMirror::new();
        assert_eq!(mirror.tick, 0);

        reconcile(&mut staging, &mut mirror);

        assert_eq!(mirror.tick, 1);
    }

    #[test]
    fn reconcile_entity_move() {
        let mut staging = StagingBuffer::new();
        let mut mirror = seeded_mirror(&[[0.0; 3]]);
        staging.push(canon_event("entity_move", slot_payload(0, &[5.0, 10.0, 15.0])));

        reconcile(&mut staging, &mut mirror);

        assert_eq!(mirror.positions[0], [5.0, 10.0, 15.0]);
    }

    #[test]
    fn reconcile_entity_spawn() {
        let mut staging = StagingBuffer::new();
        let mut mirror = ClientMirror::new();
        staging.push(canon_event("entity_spawn", Vec::new()));

        reconcile(&mut staging, &mut mirror);

        assert_eq!(mirror.entity_count, 1);
        assert_eq!(mirror.positions.len(), 1);
    }

    #[test]
    fn reconcile_entity_despawn_deferred() {
        let mut staging = StagingBuffer::new();
        let mut mirror = seeded_mirror(&[[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]);

        // Same tick: move entity 1 first, then despawn entity 0. If the despawn
        // were applied immediately instead of deferred, a batch ordered the
        // other way round would move the wrong entity.
        staging.push(canon_event("entity_move", slot_payload(1, &[10.0, 20.0, 30.0])));
        staging.push(canon_event("entity_despawn", slot_payload(0, &[])));

        reconcile(&mut staging, &mut mirror);

        // Entity 0 is gone and the moved entity now occupies slot 0
        // (swap_remove(0) relocates the last element there).
        assert_eq!(mirror.entity_count, 1);
        assert_eq!(mirror.positions[0], [10.0, 20.0, 30.0]);
    }

    #[test]
    fn reconcile_snapshot_chunk() {
        let mut staging = StagingBuffer::new();
        let mut mirror = ClientMirror::new();

        // One record: position (1, 2, 3) followed by rotation (0, 0, 0, 1).
        staging.push(AuthorityEvent::SnapshotChunk {
            chunk_id: 0,
            total_chunks: 1,
            data: le_f32s(&[1.0, 2.0, 3.0, 0.0, 0.0, 0.0, 1.0]),
        });

        reconcile(&mut staging, &mut mirror);

        assert_eq!(mirror.entity_count, 1);
        assert_eq!(mirror.positions[0], [1.0, 2.0, 3.0]);
    }

    #[test]
    fn reconcile_ack_noop() {
        let mut staging = StagingBuffer::new();
        let mut mirror = ClientMirror::new();
        staging.push(AuthorityEvent::Ack { seq: 42 });

        reconcile(&mut staging, &mut mirror);

        assert_eq!(mirror.entity_count, 0);
    }

    #[test]
    fn reconcile_bad_payload_len_rejected() {
        let mut staging = StagingBuffer::new();
        let mut mirror = seeded_mirror(&[[0.0; 3]]);

        // entity_move needs exactly 16 bytes; 12 is too short and must be dropped.
        staging.push(canon_event("entity_move", vec![0; 12]));

        reconcile(&mut staging, &mut mirror);

        // Position unchanged.
        assert_eq!(mirror.positions[0], [0.0, 0.0, 0.0]);
    }

    #[test]
    fn snapshot_stride_constant_matches_layout() {
        assert_eq!(SNAPSHOT_ENTITY_STRIDE, 28);
    }
}