pub mod message;
pub mod related_entities;
pub(super) mod removal_buffer;
pub mod replicated_archetypes;
pub(super) mod replication_messages;
mod replication_query;
pub mod server_tick;
pub mod visibility;
use core::time::Duration;
use bevy::{
ecs::{
archetype::Archetypes,
change_detection::{CheckChangeTicks, Tick},
entity::{Entities, EntityHash, EntityHashMap},
intern::Interned,
schedule::ScheduleLabel,
system::SystemChangeTick,
},
platform::collections::{HashSet, hash_map::Entry},
prelude::*,
time::common_conditions::on_timer,
};
use bytes::Buf;
use log::{Level, debug, log_enabled, trace, warn};
use crate::{
postcard_utils,
prelude::*,
server::{
replicated_archetypes::ReplicatedArchetypes,
replication_messages::{mutations::MutationsSplit, serialized_data::ErasedComponent},
visibility::registry::FilterRegistry,
},
shared::{
backend::channels::ClientChannel,
message::server_message::message_buffer::MessageBuffer,
replication::{
client_ticks::{ClientTicks, EntityTicks},
registry::{
ComponentIndex, ReplicationRegistry, component_mask::ComponentMask,
ctx::SerializeCtx,
},
rules::ReplicationRules,
storage::ReplicationStorage,
track_mutate_messages::TrackMutateMessages,
visibility::VisibilityScope,
},
},
};
use related_entities::RelatedEntities;
use removal_buffer::RemovalBuffer;
use replication_messages::{
mutations::Mutations, serialized_data::SerializedData, updates::Updates,
};
use replication_query::ReplicationQuery;
use server_tick::ServerTick;
use visibility::client_visibility::ClientVisibility;
pub struct ServerPlugin {
pub tick_schedule: Option<Interned<dyn ScheduleLabel>>,
pub mutations_timeout: Duration,
}
impl ServerPlugin {
pub fn new(tick_schedule: impl ScheduleLabel) -> Self {
Self {
tick_schedule: Some(tick_schedule.intern()),
mutations_timeout: Duration::from_secs(10),
}
}
}
impl Default for ServerPlugin {
fn default() -> Self {
Self::new(FixedPostUpdate)
}
}
impl Plugin for ServerPlugin {
fn build(&self, app: &mut App) {
app.init_resource::<DespawnBuffer>()
.init_resource::<RemovalBuffer>()
.init_resource::<SerializedData>()
.init_resource::<ServerMessages>()
.init_resource::<ServerTick>()
.init_resource::<ServerChangeTick>()
.init_resource::<ReplicatedArchetypes>()
.init_resource::<MessageBuffer>()
.init_resource::<RelatedEntities>()
.init_resource::<FilterRegistry>()
.configure_sets(
PreUpdate,
(ServerSystems::ReceivePackets, ServerSystems::Receive).chain(),
)
.configure_sets(
PostUpdate,
(
ServerSystems::IncrementTick,
ServerSystems::Send,
ServerSystems::SendPackets,
)
.chain(),
)
.add_observer(handle_connect)
.add_observer(handle_disconnect)
.add_observer(check_mutation_ticks)
.add_observer(buffer_despawn)
.add_observer(cleanup_storage)
.add_systems(
PreUpdate,
(
receive_acks,
cleanup_acks(self.mutations_timeout).run_if(on_timer(self.mutations_timeout)),
)
.chain()
.in_set(ServerSystems::Receive)
.run_if(in_state(ServerState::Running)),
)
.add_systems(OnExit(ServerState::Running), reset)
.add_systems(
PostUpdate,
(
prepare_messages,
collect_mappings,
collect_despawns,
collect_removals,
collect_changes,
send_messages,
)
.chain()
.run_if(resource_changed::<ServerTick>)
.in_set(ServerSystems::Send)
.run_if(in_state(ServerState::Running)),
);
if let Some(tick_schedule) = self.tick_schedule {
debug!("using tick schedule `{tick_schedule:?}`");
app.add_systems(
tick_schedule,
increment_tick
.in_set(ServerSystems::IncrementTick)
.run_if(in_state(ServerState::Running)),
);
}
let auth_method = app.world().resource::<AuthMethod>();
debug!("using authorization method `{auth_method:?}`");
match auth_method {
AuthMethod::ProtocolCheck => {
app.add_observer(check_protocol);
}
AuthMethod::None => {
app.register_required_components::<ConnectedClient, AuthorizedClient>();
}
AuthMethod::Custom => (),
}
if log_enabled!(Level::Debug) {
app.add_systems(OnEnter(ServerState::Running), || debug!("running"))
.add_systems(OnEnter(ServerState::Stopped), || debug!("stopped"));
}
}
fn finish(&self, app: &mut App) {
let rules = app.world().resource::<ReplicationRules>();
let replicated_ids: HashSet<_> = rules
.iter()
.flat_map(|rule| &rule.components)
.map(|component| component.id)
.collect();
if !replicated_ids.is_empty() {
let mut remove_observer = Observer::new(buffer_removals);
for id in replicated_ids {
remove_observer = remove_observer.with_component(id);
}
app.world_mut().spawn(remove_observer);
}
app.world_mut()
.resource_scope(|world, mut messages: Mut<ServerMessages>| {
let channels = world.resource::<RepliconChannels>();
messages.setup_client_channels(channels.client_channels().len());
});
}
}
fn handle_connect(add: On<Add, ConnectedClient>, mut message_buffer: ResMut<MessageBuffer>) {
debug!("client `{}` connected", add.entity);
message_buffer.exclude_client(add.entity);
}
fn handle_disconnect(remove: On<Remove, ConnectedClient>, mut messages: ResMut<ServerMessages>) {
debug!("client `{}` disconnected", remove.entity);
messages.remove_client(remove.entity);
}
fn check_protocol(
client_protocol: On<FromClient<ProtocolHash>>,
mut commands: Commands,
mut disconnects: MessageWriter<DisconnectRequest>,
protocol: Res<ProtocolHash>,
) {
let client = client_protocol
.client_id
.entity()
.expect("protocol hash sent only from clients");
if **client_protocol == *protocol {
debug!("marking client `{client}` as authorized");
commands.entity(client).insert(AuthorizedClient);
} else {
debug!(
"disconnecting client `{client}` due to protocol mismatch (client: `{:?}`, server: `{:?}`)",
**client_protocol, *protocol
);
commands.server_trigger(ToClients {
targets: SendTargets::Single(client_protocol.client_id),
message: ProtocolMismatch,
});
disconnects.write(DisconnectRequest { client });
}
}
fn check_mutation_ticks(check: On<CheckChangeTicks>, mut clients: Query<&mut ClientTicks>) {
debug!(
"checking mutation ticks for overflow for {:?}",
check.present_tick()
);
for mut ticks in &mut clients {
for entity_ticks in ticks.entities.values_mut() {
entity_ticks.system_tick.check_tick(*check);
}
}
}
pub fn increment_tick(mut server_tick: ResMut<ServerTick>) {
trace!("incrementing `{:?}`", *server_tick);
server_tick.increment();
}
fn buffer_removals(
remove: On<Remove>,
entities: &Entities,
archetypes: &Archetypes,
state: Res<State<ServerState>>,
mut replicated_archetypes: ResMut<ReplicatedArchetypes>,
rules: Res<ReplicationRules>,
registry: Option<Res<ReplicationRegistry>>,
mut removals: ResMut<RemovalBuffer>,
) {
if *state != ServerState::Running {
return;
}
let components = remove.trigger().components;
if components.contains(&replicated_archetypes.marker_id()) {
trace!("ignoring removals for despawned `{}`", remove.entity);
return;
}
let registry = registry.expect("registry should always exist on the server");
replicated_archetypes.update(archetypes, &rules);
let location = entities.get_spawned(remove.entity).unwrap();
let Some(archetype) = replicated_archetypes.get(location.archetype_id) else {
trace!(
"ignoring `{components:?}` removal for non-replicated `{}`",
remove.entity
);
return;
};
removals.insert(remove.entity, components, archetype, ®istry);
}
fn buffer_despawn(
remove: On<Remove, Replicated>,
mut despawn_buffer: ResMut<DespawnBuffer>,
state: Res<State<ServerState>>,
) {
if *state == ServerState::Running {
trace!("buffering despawn of `{}`", remove.entity);
despawn_buffer.push(remove.entity);
}
}
fn cleanup_acks(
mutations_timeout: Duration,
) -> impl FnMut(Query<&mut ClientTicks>, Res<Time<Real>>) {
move |mut clients: Query<&mut ClientTicks>, time: Res<Time<Real>>| {
let min_timestamp = time.elapsed().saturating_sub(mutations_timeout);
for mut ticks in &mut clients {
ticks.cleanup_older_mutations(min_timestamp);
}
}
}
fn receive_acks(mut messages: ResMut<ServerMessages>, mut clients: Query<&mut ClientTicks>) {
for (client, mut message) in messages.receive(ClientChannel::MutationAcks) {
let Ok(mut ticks) = clients.get_mut(client) else {
debug!("ignoring acks for disconnected client `{client}`");
continue;
};
while message.has_remaining() {
match postcard_utils::from_buf(&mut message) {
Ok(mutate_index) => {
ticks.ack_mutate_message(client, mutate_index);
}
Err(e) => {
debug!("unable to deserialize mutate index from client `{client}`: {e}")
}
}
}
}
}
fn prepare_messages(
change_tick: SystemChangeTick,
mut related_entities: ResMut<RelatedEntities>,
mut server_change_tick: ResMut<ServerChangeTick>,
clients: Query<(&mut Updates, &mut Mutations)>,
) {
**server_change_tick = change_tick.this_run();
related_entities.rebuild_graphs();
for (mut updates, mut mutations) in clients {
updates.clear();
mutations.reset(related_entities.graphs_count());
}
}
fn collect_mappings(
despawn_buffer: Res<DespawnBuffer>,
registry: Res<FilterRegistry>,
mut serialized: ResMut<SerializedData>,
entities: Query<(Entity, &Signature), With<Replicated>>,
mut clients: Query<(
Entity,
&mut Updates,
&mut ClientTicks,
&mut ClientVisibility,
)>,
) -> Result<()> {
for (entity, signature) in entities {
let hash = signature.hash();
let mut mapping_range = None;
if let Some(client) = signature.client() {
let Ok((_, mut message, ticks, visibility)) = clients.get_mut(client) else {
continue;
};
if should_send_mapping(entity, &despawn_buffer, ®istry, &visibility, &ticks) {
trace!(
"writing mapping `{entity}` to 0x{hash:016x} dedicated for client `{client}`"
);
let mapping_range =
serialized.write_cached_mapping(&mut mapping_range, entity, hash)?;
message.add_mapping(mapping_range);
}
} else {
for (client, mut message, ticks, visibility) in &mut clients {
if should_send_mapping(entity, &despawn_buffer, ®istry, &visibility, &ticks) {
trace!("writing mapping `{entity}` to 0x{hash:016x} for client `{client}`");
let mapping_range =
serialized.write_cached_mapping(&mut mapping_range, entity, hash)?;
message.add_mapping(mapping_range);
}
}
}
}
Ok(())
}
fn should_send_mapping(
entity: Entity,
despawn_buffer: &DespawnBuffer,
registry: &FilterRegistry,
visibility: &ClientVisibility,
ticks: &ClientTicks,
) -> bool {
if visibility.get(entity).is_hidden(registry) || despawn_buffer.contains(&entity) {
return false;
}
!ticks.entities.contains_key(&entity)
}
fn collect_despawns(
registry: Res<FilterRegistry>,
mut serialized: ResMut<SerializedData>,
mut despawn_buffer: ResMut<DespawnBuffer>,
mut clients: Query<(
Entity,
&mut Updates,
&mut ClientTicks,
&mut PriorityMap,
&mut ClientVisibility,
)>,
) -> Result<()> {
for entity in despawn_buffer.drain(..) {
let entity_range = serialized.write_entity(entity)?;
for (client, mut message, mut ticks, mut priority, mut visibility) in &mut clients {
if ticks.entities.remove(&entity).is_some() {
trace!("writing despawn for `{entity}` for client `{client}`");
message.add_despawn(entity_range.clone());
}
visibility.remove_despawned(entity);
priority.remove(&entity);
}
}
for (client, mut message, mut ticks, mut priority, visibility) in clients {
for (entity, filter_mask) in visibility.iter_lost() {
if !filter_mask.is_hidden(®istry) {
continue;
}
if ticks.entities.remove(&entity).is_some() {
trace!("writing visibility lost for `{entity}` for client `{client}`");
let entity_range = serialized.write_entity(entity)?;
message.add_despawn(entity_range);
}
priority.remove(&entity);
}
}
Ok(())
}
fn collect_removals(
archetypes: &Archetypes,
entities: &Entities,
removal_buffer: Res<RemovalBuffer>,
rules: Res<ReplicationRules>,
registry: Res<ReplicationRegistry>,
filter_registry: Res<FilterRegistry>,
mut replicated_archetypes: ResMut<ReplicatedArchetypes>,
mut serialized: ResMut<SerializedData>,
mut lost_buffer: Local<Vec<ComponentIndex>>,
mut clients: Query<(
Entity,
&mut Updates,
&mut ClientTicks,
&mut ClientVisibility,
)>,
) -> Result<()> {
replicated_archetypes.update(archetypes, &rules);
for (&entity, remove_ids) in removal_buffer.iter() {
let mut entity_range = None;
for (_, mut message, _, _) in &mut clients {
message.start_entity_removals();
}
for &(component_index, fns_id) in remove_ids {
let mut fns_id_range = None;
for (client, mut message, mut ticks, _) in &mut clients {
let Some(entity_ticks) = ticks.entities.get_mut(&entity) else {
continue;
};
if !entity_ticks.components.contains(component_index) {
continue;
}
trace!("writing `{fns_id:?}` removal for `{entity}` for client `{client}`");
if !message.removals_entity_added() {
let entity_range = serialized.write_cached_entity(&mut entity_range, entity)?;
message.add_removals_entity(entity_range);
}
let fns_id_range = serialized.write_cached_fns_id(&mut fns_id_range, fns_id)?;
message.add_removal(fns_id_range);
entity_ticks.remove_component(component_index);
}
}
}
for (client, mut message, mut ticks, mut visibility) in &mut clients {
for (entity, filter_mask) in visibility.drain_lost() {
if filter_mask.is_hidden(&filter_registry) {
continue;
}
let Some(entity_ticks) = ticks.entities.get_mut(&entity) else {
continue;
};
let Ok(location) = entities.get_spawned(entity) else {
warn!(
"`{entity}` was despawned after despawn processing but before sending, \
so the despawn will be sent on the next tick; \
consider ordering your despawn before `{:?}`",
ServerSystems::Send
);
continue;
};
let archetype = replicated_archetypes
.get(location.archetype_id)
.unwrap_or_else(|| {
panic!("`{entity}` should be replicated because the client knows about it")
});
let mut entity_range = None;
message.start_entity_removals();
let mut write_lost = |component_index, entity_ticks: &mut EntityTicks| -> Result<()> {
let &(id, _) = registry.get_by_index(component_index).unwrap_or_else(|| {
panic!("`{component_index:?}` should've been registered to be marked as lost")
});
let rule = archetype.find_rule(id).unwrap_or_else(|| {
panic!("`{id:?}` should match a rule since the client knows about it")
});
trace!(
"writing `{:?}` lost for `{entity}` for client `{client}`",
rule.fns_id
);
if !message.removals_entity_added() {
let entity_range = serialized.write_cached_entity(&mut entity_range, entity)?;
message.add_removals_entity(entity_range);
}
let fns_id_range = serialized.write_fns_id(rule.fns_id)?;
message.add_removal(fns_id_range);
entity_ticks.remove_component(component_index);
Ok(())
};
for scope in filter_mask.scopes(&filter_registry) {
match scope {
VisibilityScope::Entity => {
unreachable!("entity filters are processed during despawn collection")
}
VisibilityScope::Components(mask) => {
for component_index in mask.iter() {
if entity_ticks.components.contains(component_index) {
write_lost(component_index, entity_ticks)?;
}
}
}
VisibilityScope::AllExcept(mask) => {
lost_buffer.extend(
entity_ticks
.components
.iter()
.filter(|&component_index| !mask.contains(component_index)),
);
for component_index in lost_buffer.drain(..) {
write_lost(component_index, entity_ticks)?;
}
}
}
}
}
}
Ok(())
}
fn collect_changes(
archetypes: &Archetypes,
query: ReplicationQuery,
server_tick: Res<ServerTick>,
change_tick: Res<ServerChangeTick>,
registry: Res<ReplicationRegistry>,
filter_registry: Res<FilterRegistry>,
type_registry: Res<AppTypeRegistry>,
related_entities: Res<RelatedEntities>,
rules: Res<ReplicationRules>,
mut replication_storage: ResMut<ReplicationStorage>,
mut replicated_archetypes: ResMut<ReplicatedArchetypes>,
mut serialized: ResMut<SerializedData>,
mut removal_buffer: ResMut<RemovalBuffer>,
mut clients: Query<(
Entity,
&mut Updates,
&mut Mutations,
&mut ClientTicks,
&mut PriorityMap,
&mut ClientVisibility,
)>,
) -> Result<()> {
replicated_archetypes.update(archetypes, &rules);
for replicated_archetype in replicated_archetypes.iter() {
let archetype = unsafe { archetypes.get(replicated_archetype.id).unwrap_unchecked() };
for entity in archetype.entities() {
let mut entity_range = None;
for (_, mut updates, mut mutations, ..) in &mut clients {
updates.start_entity_changes();
mutations.start_entity();
}
for &(rule, storage) in &replicated_archetype.components {
let (component_index, component_id, fns) = registry.get(rule.fns_id);
let (ptr, ticks) = unsafe {
query.get_component_unchecked(
entity,
archetype.table_id(),
storage,
component_id,
)
};
let mut component = unsafe { ErasedComponent::new(fns, ptr, rule.fns_id) };
let mut ctx = SerializeCtx {
entity: entity.id(),
component_id,
last_changed: ticks.changed,
server_tick: **server_tick,
diff_cursor: None,
type_registry: &type_registry,
storage: &mut replication_storage,
};
let mut component_range = None;
for (client, mut updates, mut mutations, client_ticks, priority, visibility) in
&mut clients
{
if visibility
.get(entity.id())
.is_component_hidden(&filter_registry, component_index)
{
continue;
}
if let Some(entity_ticks) = client_ticks.entities.get(&entity.id())
&& entity_ticks.components.contains(component_index)
{
let base_priority = priority.get(&entity.id()).copied().unwrap_or(1.0);
let tick_diff = **server_tick - entity_ticks.server_tick;
if rule.mode != ReplicationMode::Once
&& base_priority * tick_diff as f32 >= 1.0
&& ticks.is_changed(entity_ticks.system_tick, **change_tick)
{
trace!(
"writing `{:?}` mutation for `{}` for client `{client}`",
rule.fns_id,
entity.id(),
);
if !mutations.entity_added() {
let graph_index = related_entities.graph_index(entity.id());
let entity_range = serialized
.write_cached_entity(&mut entity_range, entity.id())?;
mutations.add_entity(entity.id(), graph_index, entity_range);
}
let diff_cursor = entity_ticks.diff_cursor(component_index);
let component_range = if diff_cursor.is_none() {
serialized.write_cached_component(
&mut ctx,
&mut component_range,
&mut component,
)?
} else {
ctx.diff_cursor = diff_cursor;
let range = serialized.write_component(&mut ctx, &mut component)?;
if let Some(cursor) = ctx.diff_cursor.take() {
mutations.add_diff_cursor(component_index, cursor);
}
range
};
mutations.add_component(component_range);
}
} else {
trace!(
"writing `{:?}` insertion for `{}` for client `{client}`",
rule.fns_id,
entity.id(),
);
if !updates.changed_entity_added() {
let entity_range =
serialized.write_cached_entity(&mut entity_range, entity.id())?;
updates.add_changed_entity(entity_range);
}
let component_range = serialized.write_cached_component(
&mut ctx,
&mut component_range,
&mut component,
)?;
updates.add_inserted_component(component_range, component_index);
}
}
}
for (client, mut updates, mut mutations, mut ticks, _, visibility) in &mut clients {
if visibility.get(entity.id()).is_hidden(&filter_registry) {
continue;
}
let entity_ticks = ticks.entities.entry(entity.id());
let new_for_client = matches!(entity_ticks, Entry::Vacant(_));
if new_for_client
|| updates.changed_entity_added()
|| removal_buffer.contains_key(&entity.id())
{
if mutations.entity_added() {
trace!(
"merging mutations for `{}` with updates for client `{client}`",
entity.id()
);
updates.take_added_entity(&mut mutations);
}
update_ticks(
entity_ticks,
**change_tick,
**server_tick,
updates.take_changed_components(),
);
}
if new_for_client && !updates.changed_entity_added() {
trace!("writing empty `{}` for client `{client}`", entity.id());
let entity_range =
serialized.write_cached_entity(&mut entity_range, entity.id())?;
updates.add_changed_entity(entity_range);
}
}
}
}
removal_buffer.clear();
Ok(())
}
fn update_ticks(
entity_ticks: Entry<Entity, EntityTicks, EntityHash>,
system_tick: Tick,
server_tick: RepliconTick,
components: ComponentMask,
) {
match entity_ticks {
Entry::Occupied(entry) => {
let entity_ticks = entry.into_mut();
entity_ticks.system_tick = system_tick;
entity_ticks.server_tick = server_tick;
entity_ticks.components |= &components;
}
Entry::Vacant(entry) => {
entry.insert(EntityTicks::new(server_tick, system_tick, components));
}
}
}
fn send_messages(
mut split_buffer: Local<Vec<MutationsSplit>>,
time: Res<Time<Real>>,
server_tick: Res<ServerTick>,
change_tick: Res<ServerChangeTick>,
track_mutate_messages: Res<TrackMutateMessages>,
mut serialized: ResMut<SerializedData>,
mut messages: ResMut<ServerMessages>,
mut clients: Query<(
Entity,
&mut Updates,
&mut Mutations,
&ConnectedClient,
&mut ClientTicks,
)>,
) -> Result<()> {
let mut server_tick_range = None;
for (client, updates, mut mutations, connected, mut ticks) in &mut clients {
if !updates.is_empty() {
ticks.update_tick = **server_tick;
let server_tick_range =
serialized.write_cached_tick(&mut server_tick_range, **server_tick)?;
updates.send(&mut messages, client, &serialized, server_tick_range)?;
}
if !mutations.is_empty() || **track_mutate_messages {
let server_tick_range =
serialized.write_cached_tick(&mut server_tick_range, **server_tick)?;
mutations.send(
&mut messages,
client,
&mut ticks,
&mut split_buffer,
&serialized,
**track_mutate_messages,
server_tick_range,
**server_tick,
**change_tick,
time.elapsed(),
connected.max_size,
)?;
}
}
serialized.clear();
Ok(())
}
fn cleanup_storage(remove: On<Remove, Replicated>, mut storage: If<ResMut<ReplicationStorage>>) {
storage.entities.remove(&remove.entity);
}
fn reset(
mut commands: Commands,
mut messages: ResMut<ServerMessages>,
mut server_tick: ResMut<ServerTick>,
mut related_entities: ResMut<RelatedEntities>,
clients: Query<Entity, With<ConnectedClient>>,
mut message_buffer: ResMut<MessageBuffer>,
) {
messages.clear();
*server_tick = Default::default();
message_buffer.clear();
related_entities.clear();
for client in &clients {
commands.entity(client).despawn();
}
}
#[derive(SystemSet, Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum ServerSystems {
ReceivePackets,
Receive,
ReadRelations,
IncrementTick,
Send,
SendPackets,
}
#[derive(Resource, Deref, DerefMut, Default)]
struct ServerChangeTick(Tick);
#[derive(Resource, Deref, DerefMut, Default)]
struct DespawnBuffer(Vec<Entity>);
#[derive(Component, Reflect, Default)]
#[component(immutable)]
#[require(ClientTicks, ClientVisibility, PriorityMap, Updates, Mutations)]
pub struct AuthorizedClient;
#[derive(Component, Reflect, Deref, DerefMut, Default, Debug, Clone)]
pub struct PriorityMap(EntityHashMap<f32>);