pub mod diff_index;
use core::iter;
use alloc::collections::vec_deque::{self, VecDeque};
use bevy::{
ecs::{change_detection::Tick, component::Mutable},
platform::collections::HashMap,
prelude::*,
};
use serde::{Deserialize, Serialize, Serializer, de::DeserializeOwned, ser::SerializeSeq};
use crate::shared::replication::storage::ReplicationStorage;
use diff_index::DiffIndex;
/// A mutable component that can be updated incrementally by applying diffs.
///
/// The component itself must be (de)serializable in addition to its
/// [`Diffable::Diff`] type; [`ComponentDelta::Snapshot`] carries the whole value.
pub trait Diffable: Component<Mutability = Mutable> + Serialize + DeserializeOwned + Sized {
/// Incremental change that can be applied to the component via [`Diffable::apply_diff`].
type Diff: Serialize + DeserializeOwned + Send + Sync + 'static;
/// Maximum number of diffs retained by [`DiffHistory`] for this component.
///
/// Must not exceed `DiffIndex::MAX_NEWER_DISTANCE`; this is checked with a
/// debug assertion each time a diff is recorded.
const HISTORY_LEN: usize = 64;
/// Applies `diff` to `self`.
///
/// # Errors
///
/// Implementation-defined. The extension traits in this module propagate
/// the error to their caller.
fn apply_diff(&mut self, diff: &Self::Diff) -> Result<()>;
}
/// Extension trait for applying a [`Diffable`] diff to a component of an entity.
///
/// Implemented in this file for `EntityWorldMut`, where the applied diff is
/// also recorded in the entity's [`DiffHistory`].
pub trait EntityDiffExt {
/// Applies `diff` to the entity's component `C`.
///
/// # Errors
///
/// Returns an error if the diff could not be applied.
fn apply_diff<C: Diffable>(&mut self, diff: C::Diff) -> Result<()>;
}
impl EntityDiffExt for EntityWorldMut<'_> {
    /// Applies `diff` to the entity's `C` and records it in the entity's
    /// [`DiffHistory`] stored inside [`ReplicationStorage`].
    ///
    /// # Errors
    ///
    /// Returns an error if the entity has no `C`, or if
    /// [`Diffable::apply_diff`] fails. In the latter case nothing is recorded.
    fn apply_diff<C: Diffable>(&mut self, diff: C::Diff) -> Result<()> {
        let entity = self.id();
        let Some(mut component) = self.get_mut::<C>() else {
            return Err(format!("`{entity}` doesn't have `{}`", ShortName::of::<C>()).into());
        };

        // Sample the change tick on both sides of the mutation so the history
        // can tell whether the component was modified outside of this method.
        let tick_before = component.last_changed();
        component.apply_diff(&diff)?;
        let tick_after = component.last_changed();

        self.resource_mut::<ReplicationStorage>()
            .get_or_default::<DiffHistory<C>>(entity)
            .record(diff, tick_before, tick_after);

        Ok(())
    }
}
/// Extension trait for applying a [`Diffable`] diff to a component through commands.
///
/// Implemented in this file for `EntityCommands`.
pub trait EntityCommandsDiffExt {
/// Queues the application of `diff` to the entity's component `C`.
///
/// Returns `self` to allow chaining.
fn apply_diff<C: Diffable>(&mut self, diff: C::Diff) -> &mut Self;
}
impl EntityCommandsDiffExt for EntityCommands<'_> {
    /// Queues an entity command that calls [`EntityDiffExt::apply_diff`]
    /// with `diff` once the command is applied.
    fn apply_diff<C: Diffable>(&mut self, diff: C::Diff) -> &mut Self {
        self.queue(move |mut target: EntityWorldMut| {
            // The result is returned from the command as-is.
            target.apply_diff::<C>(diff)
        })
    }
}
/// Extension trait for applying a [`Diffable`] diff to a resource.
///
/// Implemented in this file for `World`.
pub trait WorldDiffExt {
/// Applies `diff` to the resource `R`.
///
/// # Errors
///
/// Returns an error if the diff could not be applied.
fn apply_resource_diff<R: Resource + Diffable>(&mut self, diff: R::Diff) -> Result<()>;
}
impl WorldDiffExt for World {
    /// Looks up the entity that holds resource `R` and applies `diff` to it
    /// via [`EntityDiffExt::apply_diff`].
    ///
    /// # Errors
    ///
    /// Returns an error if no entity is registered for `R`, or if applying
    /// the diff to that entity fails.
    fn apply_resource_diff<R: Resource + Diffable>(&mut self, diff: R::Diff) -> Result<()> {
        let resource_entity = self
            .component_id::<R>()
            .and_then(|id| self.resource_entities().get(id));

        match resource_entity {
            Some(entity) => self.entity_mut(entity).apply_diff::<R>(diff),
            None => Err(format!("missing resource `{}`", ShortName::of::<R>()).into()),
        }
    }
}
/// Extension trait for applying a [`Diffable`] diff to a resource through commands.
///
/// Implemented in this file for `Commands`.
pub trait CommandsDiffExt {
/// Queues the application of `diff` to the resource `R`.
///
/// Returns `self` to allow chaining.
fn apply_resource_diff<R: Resource + Diffable>(&mut self, diff: R::Diff) -> &mut Self;
}
impl CommandsDiffExt for Commands<'_, '_> {
    /// Queues a command that calls [`WorldDiffExt::apply_resource_diff`]
    /// with `diff` once the command is applied.
    fn apply_resource_diff<R: Resource + Diffable>(&mut self, diff: R::Diff) -> &mut Self {
        // The command receives the whole world, not a single entity.
        let command = move |world: &mut World| world.apply_resource_diff::<R>(diff);
        self.queue(command);
        self
    }
}
/// Bounded log of the most recent diffs applied to a [`Diffable`] component.
///
/// The log is cleared whenever the component's change tick no longer matches
/// the tick remembered here, i.e. when the component was changed by something
/// other than a recorded diff.
#[derive(Debug, Clone)]
pub struct DiffHistory<C: Diffable> {
// Index that will be assigned to the next recorded change; the current
// index is `next_index - 1`.
next_index: DiffIndex,
// Component change tick as of the last recorded diff or the last
// `diffs_after` call. `None` until either of them happens.
last_changed: Option<Tick>,
// Retained diffs, oldest first, trimmed to at most `C::HISTORY_LEN` entries.
diffs: VecDeque<C::Diff>,
}
impl<C: Diffable> DiffHistory<C> {
    /// Appends `diff` to the history, or resets the history if the component
    /// was changed by something other than a recorded diff.
    ///
    /// `before_diff` and `after_diff` are the component's change ticks sampled
    /// right before and right after the diff was applied. When `before_diff`
    /// doesn't match the remembered tick, the history is cleared, `diff` is
    /// dropped, and the index advances by two instead of one.
    fn record(&mut self, diff: C::Diff, before_diff: Tick, after_diff: Tick) {
        debug_assert!(
            C::HISTORY_LEN <= DiffIndex::MAX_NEWER_DISTANCE as usize,
            "`{}::HISTORY_LEN` cannot exceed {}",
            ShortName::of::<C>(),
            DiffIndex::MAX_NEWER_DISTANCE
        );

        // A fresh history (no remembered tick) is always considered in sync.
        let in_sync = self.last_changed.is_none_or(|tick| tick == before_diff);
        self.last_changed = Some(after_diff);

        if !in_sync {
            self.diffs.clear();
            self.next_index += 2;
            return;
        }

        self.next_index += 1;
        self.diffs.push_back(diff);

        // Drop the oldest entries so at most `HISTORY_LEN` diffs are retained.
        while self.diffs.len() > C::HISTORY_LEN {
            self.diffs.pop_front();
        }
    }

    /// Returns the current index together with the retained diffs that
    /// follow `cursor`.
    ///
    /// `last_changed` is the component's current change tick. If it differs
    /// from the remembered tick (or no tick was remembered yet), the history
    /// is cleared, a new index is allocated and no diffs are returned.
    ///
    /// No diffs are returned either when `cursor` is `None`, or when the
    /// distance from `cursor` to the current index is not smaller than the
    /// number of retained diffs.
    pub fn diffs_after(
        &mut self,
        cursor: Option<DiffIndex>,
        last_changed: Tick,
    ) -> (DiffIndex, DiffIter<'_, C::Diff>) {
        if self.last_changed != Some(last_changed) {
            self.diffs.clear();
            self.last_changed = Some(last_changed);
            let allocated = self.next_index;
            self.next_index += 1;
            return (allocated, DiffIter::empty(&self.diffs));
        }

        let current = self.current_index();
        let retained = self.diffs.len();

        // Starting at `retained` yields an empty iterator.
        let start = match cursor {
            Some(cursor) => {
                let missing = current.distance_after(cursor) as usize;
                if missing < retained {
                    retained - missing
                } else {
                    retained
                }
            }
            None => retained,
        };

        (current, DiffIter::new(&self.diffs, start))
    }

    /// Returns the index of the most recently recorded change,
    /// which is the index preceding the next one to be assigned.
    pub fn current_index(&self) -> DiffIndex {
        self.next_index - 1
    }
}
impl<C: Diffable> Default for DiffHistory<C> {
fn default() -> Self {
Self {
last_changed: None,
next_index: DiffIndex::default(),
diffs: Default::default(),
}
}
}
/// Owned, deserializable update for a [`Diffable`] component.
///
/// Has the same variants and fields, in the same order, as
/// [`ComponentDeltaRef`], which is the borrowed, serializable form.
#[derive(Deserialize)]
#[serde(bound(deserialize = "C: Diffable"))]
pub enum ComponentDelta<C: Diffable> {
/// The full component value.
Snapshot {
/// NOTE(review): presumably the diff index that `component` corresponds to — confirm against the sender.
index: DiffIndex,
/// The complete component value.
component: C,
},
/// A run of consecutive diffs.
Diffs {
/// NOTE(review): presumably the index of the last element of `diffs`,
/// matching the `last_index` parameter of [`DiffBuffer::push`] — confirm against the sender.
index: DiffIndex,
/// The diffs themselves.
diffs: Vec<C::Diff>,
},
}
/// Borrowed, serializable update for a [`Diffable`] component.
///
/// Has the same variants and fields, in the same order, as [`ComponentDelta`],
/// which is the owned, deserializable form.
#[derive(Serialize)]
pub enum ComponentDeltaRef<'a, C: Diffable> {
/// The full component value.
Snapshot {
/// NOTE(review): presumably the diff index that `component` corresponds to — confirm against the caller.
index: DiffIndex,
/// Reference to the complete component value.
component: &'a C,
},
/// A run of consecutive diffs.
Diffs {
/// NOTE(review): presumably the index of the last diff yielded by `diffs` — confirm against the caller.
index: DiffIndex,
/// Borrowed diffs; serialized as a sequence.
diffs: DiffIter<'a, C::Diff>,
},
}
/// Borrowed iterator over the tail of a diff queue that serializes as a sequence.
///
/// Dereferences to the wrapped [`vec_deque::Iter`].
#[must_use]
#[derive(Deref)]
pub struct DiffIter<'a, T>(vec_deque::Iter<'a, T>);
impl<'a, T> DiffIter<'a, T> {
    /// Creates an iterator over `diffs[start..]`.
    fn new(diffs: &'a VecDeque<T>, start: usize) -> Self {
        let tail = diffs.range(start..);
        Self(tail)
    }

    /// Creates an iterator that borrows `diffs` but yields nothing.
    fn empty(diffs: &'a VecDeque<T>) -> Self {
        // Starting right past the last element produces an empty range.
        Self::new(diffs, diffs.len())
    }
}
impl<T: Serialize> Serialize for DiffIter<'_, T> {
    /// Serializes the remaining elements as a sequence with a known length.
    ///
    /// Works on a clone of the wrapped iterator, so `self` is not advanced.
    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        let mut remaining = self.0.clone();
        let mut seq = serializer.serialize_seq(Some(remaining.len()))?;
        remaining.try_for_each(|diff| seq.serialize_element(diff))?;
        seq.end()
    }
}
/// Component that buffers received diffs until they can be released in index order.
#[derive(Component, Debug)]
pub struct DiffBuffer<C: Diffable> {
// Index of the most recent diff yielded by `drain_ready` or passed to
// `set_last_applied`. `None` if neither has happened yet.
last_applied: Option<DiffIndex>,
// Buffered diffs keyed by their index.
pending: HashMap<DiffIndex, C::Diff>,
}
impl<C: Diffable> DiffBuffer<C> {
    /// Sets the last applied index to `last_applied` and discards all buffered diffs.
    pub fn set_last_applied(&mut self, last_applied: DiffIndex) {
        self.last_applied = Some(last_applied);
        self.pending.clear();
    }

    /// Buffers a run of consecutive diffs whose last element has index `last_index`.
    ///
    /// Indices are assigned backwards from `last_index`. Diffs whose index is
    /// not newer than the last applied one are dropped; a diff with an index
    /// that is already buffered replaces the buffered one.
    ///
    /// At most `u16::MAX + 1` trailing diffs are considered. Anything beyond
    /// that would wrap around the index space and overwrite newer entries
    /// with older diffs, so the excess is ignored.
    pub fn push(&mut self, last_index: DiffIndex, diffs: Vec<C::Diff>) {
        // Pairing with a `u16` range keeps the offset in range without a
        // truncating cast and stops the loop before offsets would wrap.
        for (diff, offset) in diffs.into_iter().rev().zip(0..=u16::MAX) {
            let index = last_index - offset;
            if self
                .last_applied
                .is_none_or(|last_applied| index.is_newer_than(last_applied))
            {
                self.pending.insert(index, diff);
            }
        }
    }

    /// Returns an iterator that removes and yields buffered diffs in index
    /// order, starting right after the last applied index (or at index `0`
    /// if nothing was applied yet).
    ///
    /// Iteration stops at the first missing index. Each yielded diff advances
    /// the last applied index, so diffs are only consumed as far as the
    /// iterator is driven.
    pub fn drain_ready(&mut self) -> impl Iterator<Item = C::Diff> + '_ {
        iter::from_fn(move || {
            let index = self.last_applied.map_or(DiffIndex::new(0), |i| i + 1);
            let diff = self.pending.remove(&index)?;
            self.last_applied = Some(index);
            Some(diff)
        })
    }

    /// Returns the index of the last applied diff, if any.
    pub fn last_applied(&self) -> Option<DiffIndex> {
        self.last_applied
    }
}
impl<C: Diffable> Default for DiffBuffer<C> {
fn default() -> Self {
Self {
last_applied: None,
pending: Default::default(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
// `TooLongHistory::HISTORY_LEN` is `u16::MAX`, which is expected to trip the
// debug assertion in `DiffHistory::record`.
#[test]
#[should_panic]
fn too_long_history() {
let mut history = DiffHistory::<TooLongHistory>::default();
history.record((), Tick::new(0), Tick::new(1));
}
// The first recorded diff gets index 0 and is retained.
#[test]
fn history_recording() {
let mut history = DiffHistory::<Value>::default();
history.record(ValueDiff::Add(0), Tick::new(0), Tick::new(1));
assert_eq!(history.current_index().get(), 0);
assert_eq!(history.diffs, [ValueDiff::Add(0)]);
}
// `Value::HISTORY_LEN` is 3, so recording a fourth diff drops the oldest one.
#[test]
fn history_trimming() {
let mut history = DiffHistory::<Value>::default();
history.record(ValueDiff::Add(0), Tick::new(0), Tick::new(1));
history.record(ValueDiff::Add(1), Tick::new(1), Tick::new(2));
history.record(ValueDiff::Add(2), Tick::new(2), Tick::new(3));
history.record(ValueDiff::Add(3), Tick::new(3), Tick::new(4));
assert_eq!(history.current_index().get(), 3);
assert_eq!(
history.diffs,
[ValueDiff::Add(1), ValueDiff::Add(2), ValueDiff::Add(3)],
"history should retain at most `HISTORY_LEN` diffs"
);
}
// The second record passes a `before_diff` tick (10) that doesn't match the
// tick remembered from the first record (1), which resets the history and
// advances the index by two.
#[test]
fn history_reset_on_record() {
let mut history = DiffHistory::<Value>::default();
history.record(ValueDiff::Add(0), Tick::new(0), Tick::new(1));
history.record(ValueDiff::Add(1), Tick::new(10), Tick::new(11));
assert_eq!(history.current_index().get(), 2);
assert!(
history.diffs.is_empty(),
"should reset if the recorded tick differs"
);
}
// After four records the history holds diffs 1..=3 and the current index is 3.
#[test]
fn history_diffs_after() {
let mut history = DiffHistory::<Value>::default();
history.record(ValueDiff::Add(0), Tick::new(0), Tick::new(1));
history.record(ValueDiff::Add(1), Tick::new(1), Tick::new(2));
history.record(ValueDiff::Add(2), Tick::new(2), Tick::new(3));
history.record(ValueDiff::Add(3), Tick::new(3), Tick::new(4));
// No cursor: nothing is returned.
let (index, diffs) = history.diffs_after(None, Tick::new(4));
assert_eq!(index.get(), 3);
assert_eq!(diffs.len(), 0);
// Cursor 0 is three indices behind, which equals the retained length.
let (index, diffs) = history.diffs_after(Some(DiffIndex::new(0)), Tick::new(4));
assert_eq!(index.get(), 3);
assert_eq!(
diffs.len(),
0,
"shouldn't return diffs for indices outside of the history"
);
// Cursor 1 is two indices behind: the last two diffs are returned.
let (index, diffs) = history.diffs_after(Some(DiffIndex::new(1)), Tick::new(4));
assert_eq!(index.get(), 3);
assert_eq!(
diffs.0.copied().collect::<Vec<_>>(),
[ValueDiff::Add(2), ValueDiff::Add(3)]
);
// Cursor equal to the current index: nothing is missing.
let (index, diffs) = history.diffs_after(Some(DiffIndex::new(3)), Tick::new(4));
assert_eq!(index.get(), 3);
assert_eq!(diffs.len(), 0);
}
// Querying with a tick (3) that differs from the remembered one (2) resets
// the history and allocates a new index.
#[test]
fn history_reset_on_diffs_after() {
let mut history = DiffHistory::<Value>::default();
history.record(ValueDiff::Add(0), Tick::new(0), Tick::new(1));
history.record(ValueDiff::Add(1), Tick::new(1), Tick::new(2));
let (index, diffs) = history.diffs_after(Some(DiffIndex::new(0)), Tick::new(3));
assert_eq!(index.get(), 2);
assert_eq!(diffs.len(), 0);
assert!(
history.diffs.is_empty(),
"should reset if the recorded tick differs"
);
// Same tick as the previous call: no further reset, index stays the same.
let (index, diffs) = history.diffs_after(Some(DiffIndex::new(1)), Tick::new(3));
assert_eq!(index.get(), 2);
assert_eq!(
diffs.len(),
0,
"shouldn't return any diffs since the history is now empty"
);
}
// Two diffs ending at index 1 occupy indices 0 and 1 and drain in order.
#[test]
fn buffering() {
let mut buffer = DiffBuffer::<Value>::default();
let diffs = [ValueDiff::Add(0), ValueDiff::Add(1)];
buffer.push(DiffIndex::new(1), diffs.into());
assert_eq!(buffer.pending.len(), 2);
let ready: Vec<_> = buffer.drain_ready().collect();
assert_eq!(ready, diffs);
assert_eq!(buffer.last_applied, Some(DiffIndex::new(1)));
}
// Overlapping batches share index 1, so only three distinct indices are buffered.
#[test]
fn buffering_with_intersection() {
let mut buffer = DiffBuffer::<Value>::default();
buffer.push(
DiffIndex::new(1),
vec![ValueDiff::Add(0), ValueDiff::Add(1)],
);
buffer.push(
DiffIndex::new(2),
vec![ValueDiff::Add(1), ValueDiff::Add(2)],
);
assert_eq!(buffer.pending.len(), 3);
let ready: Vec<_> = buffer.drain_ready().collect();
assert_eq!(
ready,
[ValueDiff::Add(0), ValueDiff::Add(1), ValueDiff::Add(2)]
);
assert_eq!(buffer.last_applied, Some(DiffIndex::new(2)));
}
// The later batch is pushed first; draining still yields diffs in index order.
#[test]
fn buffering_out_of_order() {
let mut buffer = DiffBuffer::<Value>::default();
buffer.push(
DiffIndex::new(3),
vec![ValueDiff::Add(2), ValueDiff::Add(3)],
);
buffer.push(
DiffIndex::new(1),
vec![ValueDiff::Add(0), ValueDiff::Add(1)],
);
assert_eq!(buffer.pending.len(), 4);
let ready: Vec<_> = buffer.drain_ready().collect();
assert_eq!(
ready,
[
ValueDiff::Add(0),
ValueDiff::Add(1),
ValueDiff::Add(2),
ValueDiff::Add(3)
]
);
assert_eq!(buffer.last_applied, Some(DiffIndex::new(3)));
}
// Draining stops at the gap at index 1 and resumes once it is filled.
#[test]
fn buffering_with_missing() {
let mut buffer = DiffBuffer::<Value>::default();
buffer.push(DiffIndex::new(0), vec![ValueDiff::Add(0)]);
buffer.push(DiffIndex::new(2), vec![ValueDiff::Add(2)]);
assert_eq!(buffer.pending.len(), 2);
let ready: Vec<_> = buffer.drain_ready().collect();
assert_eq!(
ready,
[ValueDiff::Add(0)],
"diff 2 requires diff 1 in the buffer"
);
assert_eq!(buffer.last_applied, Some(DiffIndex::new(0)));
buffer.push(DiffIndex::new(1), vec![ValueDiff::Add(1)]);
assert_eq!(buffer.pending.len(), 2);
let ready: Vec<_> = buffer.drain_ready().collect();
assert_eq!(
ready,
[ValueDiff::Add(1), ValueDiff::Add(2),],
"diff 2 should be ready after receiving diff 1"
);
assert_eq!(buffer.last_applied, Some(DiffIndex::new(2)));
}
// Diffs queued through `EntityCommands` are applied and recorded on flush.
#[test]
fn apply_diff_command() {
let mut world = World::new();
world.init_resource::<ReplicationStorage>();
let entity = world.spawn(Value::default()).id();
let mut commands = world.commands();
commands
.entity(entity)
.apply_diff::<Value>(ValueDiff::Add(10))
.apply_diff::<Value>(ValueDiff::Sub(3));
world.flush();
assert_eq!(world.get::<Value>(entity).copied(), Some(Value(7)));
let storage = world.resource::<ReplicationStorage>();
let history = storage.get::<DiffHistory<Value>>(entity).unwrap();
assert_eq!(history.diffs, [ValueDiff::Add(10), ValueDiff::Sub(3)]);
}
// Diffs queued through `Commands` are applied to the resource on flush.
// NOTE(review): `Value` derives `Resource` but is spawned as a component here;
// presumably spawning it also registers the entity as the resource entity — confirm.
#[test]
fn apply_resource_diff_command() {
let mut world = World::new();
world.init_resource::<ReplicationStorage>();
let entity = world.spawn(Value::default()).id();
let mut commands = world.commands();
commands
.apply_resource_diff::<Value>(ValueDiff::Add(10))
.apply_resource_diff::<Value>(ValueDiff::Sub(3));
world.flush();
assert_eq!(*world.resource::<Value>(), Value(7));
let storage = world.resource::<ReplicationStorage>();
let history = storage.get::<DiffHistory<Value>>(entity).unwrap();
assert_eq!(history.diffs, [ValueDiff::Add(10), ValueDiff::Sub(3)]);
}
// Applying a diff to an entity without the component is an error and
// doesn't create the component.
#[test]
fn apply_missing_component() {
let mut world = World::new();
world.init_resource::<ReplicationStorage>();
let mut entity = world.spawn_empty();
assert!(entity.apply_diff::<Value>(ValueDiff::Add(10)).is_err());
assert!(!world.contains_resource::<Value>());
}
// Applying a diff to a resource that doesn't exist is an error and
// doesn't create the resource.
#[test]
fn apply_missing_resource() {
let mut world = World::new();
world.init_resource::<ReplicationStorage>();
assert!(
world
.apply_resource_diff::<Value>(ValueDiff::Add(10))
.is_err()
);
assert!(!world.contains_resource::<Value>());
}
// Marking the value as changed on a later tick, outside of `apply_diff`,
// makes the next recorded diff reset the history.
#[test]
fn apply_with_external_mutation() {
let mut world = World::new();
world.init_resource::<ReplicationStorage>();
let entity = world.spawn(Value::default()).id();
world
.apply_resource_diff::<Value>(ValueDiff::Add(10))
.unwrap();
// Advance the tick so the external change is distinguishable from the diff above.
world.increment_change_tick();
let mut value = world.resource_mut::<Value>();
assert_eq!(*value, Value(10));
value.set_changed();
world
.apply_resource_diff::<Value>(ValueDiff::Sub(3))
.unwrap();
assert_eq!(*world.resource::<Value>(), Value(7));
let storage = world.resource::<ReplicationStorage>();
let history = storage.get::<DiffHistory<Value>>(entity).unwrap();
assert!(
history.diffs.is_empty(),
"history should be cleared on external mutation"
);
}
// Component whose history length is `u16::MAX`; used by `too_long_history`.
#[derive(Component, Serialize, Deserialize)]
struct TooLongHistory;
impl Diffable for TooLongHistory {
const HISTORY_LEN: usize = u16::MAX as usize;
type Diff = ();
fn apply_diff(&mut self, _diff: &Self::Diff) -> Result<()> {
Ok(())
}
}
// Simple counter with a short history (3 diffs) to exercise trimming.
#[derive(Resource, Default, Deserialize, Serialize, PartialEq, Debug, Clone, Copy)]
struct Value(u8);
impl Diffable for Value {
const HISTORY_LEN: usize = 3;
type Diff = ValueDiff;
fn apply_diff(&mut self, diff: &Self::Diff) -> Result<()> {
match *diff {
ValueDiff::Add(value) => self.0 += value,
ValueDiff::Sub(value) => self.0 -= value,
}
Ok(())
}
}
// Diff for `Value`: adds to or subtracts from the wrapped `u8`.
#[derive(Debug, Deserialize, Serialize, PartialEq, Clone, Copy)]
enum ValueDiff {
Add(u8),
Sub(u8),
}
}