use super::stage::StagingBuffer;
use crate::authority::AuthorityEvent;
use crate::mirror::{ClientMirror, MAX_MIRROR_ENTITIES};
/// Byte length of one entity record in a snapshot chunk: three `f32` position
/// components followed by four `f32` rotation components.
const SNAPSHOT_ENTITY_STRIDE: usize = (3 + 4) * std::mem::size_of::<f32>();
/// Applies every staged authority event to `mirror`, then finishes the tick.
///
/// Events are applied in the order `staging.drain()` yields them. Queued
/// despawns are flushed only after the whole batch has been applied
/// (presumably so slot indices stay stable while the batch runs — confirm
/// against `ClientMirror::flush_despawns`), and the mirror tick is advanced
/// last, even when no events were staged.
pub fn reconcile(staging: &mut StagingBuffer, mirror: &mut ClientMirror) {
    let pending = staging.drain();
    for incoming in pending {
        apply_event(&incoming, mirror);
    }

    mirror.flush_despawns();
    mirror.advance_tick();
}
/// Routes a single authority event to the handler for its variant.
///
/// Snapshot chunks and canon events mutate `mirror` through their dedicated
/// handlers; rejections are only logged at debug level; acknowledgements are
/// ignored here.
fn apply_event(event: &AuthorityEvent, mirror: &mut ClientMirror) {
    match event {
        AuthorityEvent::SnapshotChunk {
            chunk_id,
            total_chunks,
            data,
        } => apply_snapshot_chunk(*chunk_id, *total_chunks, data, mirror),
        AuthorityEvent::CanonEvent {
            tick,
            event_type,
            payload,
        } => apply_canon_event(*tick, event_type, payload, mirror),
        AuthorityEvent::Reject { seq, reason } => {
            log::debug!("authority_reject:seq={seq} reason={reason}")
        }
        // Nothing in the mirror changes on an acknowledgement.
        AuthorityEvent::Ack { .. } => {}
    }
}
/// Reads a little-endian `f32` from `data` starting at byte `offset`.
///
/// Returns `None` when fewer than four bytes are available at `offset`,
/// including when `offset + 4` is not representable as a `usize`.
#[inline]
fn read_f32(data: &[u8], offset: usize) -> Option<f32> {
    // `checked_add` turns an overflowing end index into `None` instead of a
    // panic (overflow-checked builds) or a wrapped range (release builds).
    let end = offset.checked_add(4)?;
    let bytes = data.get(offset..end)?;
    Some(f32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
}
/// Reads a little-endian `u32` from `data` starting at byte `offset`.
///
/// Returns `None` when fewer than four bytes are available at `offset`,
/// including when `offset + 4` is not representable as a `usize`.
#[inline]
fn read_u32(data: &[u8], offset: usize) -> Option<u32> {
    // `checked_add` turns an overflowing end index into `None` instead of a
    // panic (overflow-checked builds) or a wrapped range (release builds).
    let end = offset.checked_add(4)?;
    let bytes = data.get(offset..end)?;
    Some(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
}
/// Applies one canonical event, identified by `event_type`, to `mirror`.
///
/// Payload layouts (all fields little-endian):
/// - `"entity_move"`: `u32` slot followed by three `f32` values (16 bytes).
/// - `"entity_rotate"`: `u32` slot followed by four `f32` values (20 bytes).
/// - `"entity_spawn"`: payload is not read; appends a zero position and a
///   `[0, 0, 0, 1]` rotation unless `MAX_MIRROR_ENTITIES` is already reached.
/// - `"entity_despawn"`: `u32` slot (4 bytes), handed to `queue_despawn`.
///
/// A payload of the wrong length is logged at warn level and dropped. A move
/// or rotate whose slot is out of range is ignored without logging. Any other
/// `event_type` is logged at trace level together with `tick`.
fn apply_canon_event(tick: u64, event_type: &str, payload: &[u8], mirror: &mut ClientMirror) {
    // Field decoders. The zero fallbacks are only a formality: each arm checks
    // the exact payload length before reading, so the reads stay in bounds.
    let slot_at = |offset: usize| read_u32(payload, offset).unwrap_or(0) as usize;
    let float_at = |offset: usize| read_f32(payload, offset).unwrap_or(0.0);

    match event_type {
        "entity_move" => {
            if payload.len() == 16 {
                let slot = slot_at(0);
                let position = [float_at(4), float_at(8), float_at(12)];
                if let Some(target) = mirror.positions.get_mut(slot) {
                    *target = position;
                }
            } else {
                log::warn!("reconcile:entity_move bad payload len={}", payload.len());
            }
        }
        "entity_rotate" => {
            if payload.len() == 20 {
                let slot = slot_at(0);
                let rotation = [float_at(4), float_at(8), float_at(12), float_at(16)];
                if let Some(target) = mirror.rotations.get_mut(slot) {
                    *target = rotation;
                }
            } else {
                log::warn!("reconcile:entity_rotate bad payload len={}", payload.len());
            }
        }
        "entity_spawn" => {
            if mirror.positions.len() < MAX_MIRROR_ENTITIES {
                mirror.positions.push([0.0; 3]);
                mirror.rotations.push([0.0, 0.0, 0.0, 1.0]);
                mirror.entity_count = mirror.positions.len();
            } else {
                log::warn!("reconcile:entity_spawn cap reached ({MAX_MIRROR_ENTITIES})");
            }
        }
        "entity_despawn" => {
            if payload.len() == 4 {
                mirror.queue_despawn(slot_at(0));
            } else {
                log::warn!("reconcile:entity_despawn bad payload len={}", payload.len());
            }
        }
        _ => {
            log::trace!("reconcile:unhandled event_type={event_type} tick={tick}");
        }
    }
}
/// Replaces the mirror contents from a single-chunk snapshot.
///
/// Only the case `chunk_id == 0 && total_chunks == 1` is handled; anything
/// else is logged at trace level and ignored. `data` is read as consecutive
/// records of `SNAPSHOT_ENTITY_STRIDE` bytes, each holding seven little-endian
/// `f32` values: x, y, z, then qx, qy, qz, qw. At most `MAX_MIRROR_ENTITIES`
/// records are decoded. A trailing partial record triggers a warning and is
/// dropped, while the complete records before it are still applied.
fn apply_snapshot_chunk(chunk_id: u32, total_chunks: u32, data: &[u8], mirror: &mut ClientMirror) {
    let is_single_chunk = chunk_id == 0 && total_chunks == 1;
    if !is_single_chunk {
        log::trace!("reconcile:multi-chunk snapshot not yet supported chunk={chunk_id}/{total_chunks}");
        return;
    }

    if data.len() % SNAPSHOT_ENTITY_STRIDE != 0 {
        log::warn!(
            "reconcile:snapshot data len {} not aligned to stride {}",
            data.len(),
            SNAPSHOT_ENTITY_STRIDE
        );
    }

    let record_count = (data.len() / SNAPSHOT_ENTITY_STRIDE).min(MAX_MIRROR_ENTITIES);
    let mut positions = Vec::with_capacity(record_count);
    let mut rotations = Vec::with_capacity(record_count);

    // `chunks_exact` yields only whole records, so a partial tail is skipped.
    for record in data.chunks_exact(SNAPSHOT_ENTITY_STRIDE).take(record_count) {
        let field = |index: usize| read_f32(record, index * 4);
        let (Some(x), Some(y), Some(z), Some(qx), Some(qy), Some(qz), Some(qw)) = (
            field(0),
            field(1),
            field(2),
            field(3),
            field(4),
            field(5),
            field(6),
        ) else {
            break;
        };
        positions.push([x, y, z]);
        rotations.push([qx, qy, qz, qw]);
    }

    mirror.apply_snapshot(positions, rotations);
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Rotation assigned to every entity seeded by `seeded_mirror`.
    const IDENTITY_ROTATION: [f32; 4] = [0.0, 0.0, 0.0, 1.0];

    /// Builds a mirror holding one entity per given position.
    fn seeded_mirror(positions: &[[f32; 3]]) -> ClientMirror {
        let mut mirror = ClientMirror::new();
        for &position in positions {
            mirror.positions.push(position);
            mirror.rotations.push(IDENTITY_ROTATION);
        }
        mirror.entity_count = positions.len();
        mirror
    }

    /// Encodes `values` as consecutive little-endian `f32`s.
    fn le_f32s(values: &[f32]) -> Vec<u8> {
        let mut bytes = Vec::with_capacity(values.len() * 4);
        for value in values {
            bytes.extend_from_slice(&value.to_le_bytes());
        }
        bytes
    }

    /// Encodes a little-endian `u32` slot followed by `floats`.
    fn slot_payload(slot: u32, floats: &[f32]) -> Vec<u8> {
        let mut bytes = slot.to_le_bytes().to_vec();
        bytes.extend(le_f32s(floats));
        bytes
    }

    /// Wraps `payload` in a tick-1 canon event of the given type.
    fn canon_event(event_type: &'static str, payload: Vec<u8>) -> AuthorityEvent {
        AuthorityEvent::CanonEvent {
            tick: 1,
            event_type: event_type.into(),
            payload,
        }
    }

    /// Stages `events` in order and runs one reconcile pass over `mirror`.
    fn reconcile_events(events: Vec<AuthorityEvent>, mirror: &mut ClientMirror) {
        let mut staging = StagingBuffer::new();
        for event in events {
            staging.push(event);
        }
        reconcile(&mut staging, mirror);
    }

    #[test]
    fn reconcile_advances_tick() {
        let mut mirror = ClientMirror::new();
        assert_eq!(mirror.tick, 0);

        reconcile_events(Vec::new(), &mut mirror);

        assert_eq!(mirror.tick, 1);
    }

    #[test]
    fn reconcile_entity_move() {
        let mut mirror = seeded_mirror(&[[0.0; 3]]);
        let events = vec![canon_event(
            "entity_move",
            slot_payload(0, &[5.0, 10.0, 15.0]),
        )];

        reconcile_events(events, &mut mirror);

        assert_eq!(mirror.positions[0], [5.0, 10.0, 15.0]);
    }

    #[test]
    fn reconcile_entity_spawn() {
        let mut mirror = ClientMirror::new();

        reconcile_events(vec![canon_event("entity_spawn", Vec::new())], &mut mirror);

        assert_eq!(mirror.entity_count, 1);
        assert_eq!(mirror.positions.len(), 1);
    }

    #[test]
    fn reconcile_entity_despawn_deferred() {
        let mut mirror = seeded_mirror(&[[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]);
        // The move targets slot 1 and is staged before the despawn of slot 0.
        let events = vec![
            canon_event("entity_move", slot_payload(1, &[10.0, 20.0, 30.0])),
            canon_event("entity_despawn", slot_payload(0, &[])),
        ];

        reconcile_events(events, &mut mirror);

        assert_eq!(mirror.entity_count, 1);
        assert_eq!(mirror.positions[0], [10.0, 20.0, 30.0]);
    }

    #[test]
    fn reconcile_snapshot_chunk() {
        let mut mirror = ClientMirror::new();
        // One record: position (1, 2, 3) followed by rotation (0, 0, 0, 1).
        let snapshot = AuthorityEvent::SnapshotChunk {
            chunk_id: 0,
            total_chunks: 1,
            data: le_f32s(&[1.0, 2.0, 3.0, 0.0, 0.0, 0.0, 1.0]),
        };

        reconcile_events(vec![snapshot], &mut mirror);

        assert_eq!(mirror.entity_count, 1);
        assert_eq!(mirror.positions[0], [1.0, 2.0, 3.0]);
    }

    #[test]
    fn reconcile_ack_noop() {
        let mut mirror = ClientMirror::new();

        reconcile_events(vec![AuthorityEvent::Ack { seq: 42 }], &mut mirror);

        assert_eq!(mirror.entity_count, 0);
    }

    #[test]
    fn reconcile_bad_payload_len_rejected() {
        let mut mirror = seeded_mirror(&[[0.0; 3]]);
        // 12 bytes is four short of the 16-byte entity_move payload.
        let events = vec![canon_event("entity_move", vec![0; 12])];

        reconcile_events(events, &mut mirror);

        assert_eq!(mirror.positions[0], [0.0, 0.0, 0.0]);
    }

    #[test]
    fn snapshot_stride_constant_matches_layout() {
        assert_eq!(SNAPSHOT_ENTITY_STRIDE, 28);
    }
}