use bevy::{
ecs::system::{FilteredResourcesMutParamBuilder, FilteredResourcesParamBuilder, ParamBuilder},
prelude::*,
};
use super::ServerUpdateTick;
use crate::{
prelude::*,
shared::{
message::{
ctx::{ClientReceiveCtx, ClientSendCtx},
registry::RemoteMessageRegistry,
},
server_entity_map::ServerEntityMap,
},
};
pub struct ClientMessagePlugin;
impl Plugin for ClientMessagePlugin {
fn build(&self, _app: &mut App) {}
fn finish(&self, app: &mut App) {
let registry = app
.world_mut()
.remove_resource::<RemoteMessageRegistry>()
.expect("message registry should be initialized on app build");
let send_fn = (
FilteredResourcesParamBuilder::new(|builder| {
for message in registry.iter_all_client() {
builder.add_read_by_id(message.messages_id());
}
}),
FilteredResourcesMutParamBuilder::new(|builder| {
for message in registry.iter_all_client() {
builder.add_write_by_id(message.reader_id());
}
}),
ParamBuilder,
ParamBuilder,
ParamBuilder,
ParamBuilder,
)
.build_state(app.world_mut())
.build_system(send);
let receive_builder = (
FilteredResourcesMutParamBuilder::new(|builder| {
for message in registry.iter_all_server() {
builder.add_write_by_id(message.messages_id());
}
}),
FilteredResourcesMutParamBuilder::new(|builder| {
for message in registry.iter_all_server() {
builder.add_write_by_id(message.queue_id());
}
}),
ParamBuilder,
ParamBuilder,
ParamBuilder,
ParamBuilder,
ParamBuilder,
);
let receive_fn = receive_builder
.clone()
.build_state(app.world_mut())
.build_system(receive);
let enter_receive_fn = receive_builder
.build_state(app.world_mut())
.build_system(receive);
let trigger_builder = (
FilteredResourcesMutParamBuilder::new(|builder| {
for event in registry.iter_server_events() {
builder.add_write_by_id(event.message().messages_id());
}
}),
ParamBuilder,
ParamBuilder,
);
let trigger_fn = trigger_builder
.clone()
.build_state(app.world_mut())
.build_system(trigger);
let enter_trigger_fn = trigger_builder
.build_state(app.world_mut())
.build_system(trigger);
let send_locally_fn = (
FilteredResourcesMutParamBuilder::new(|builder| {
for message in registry.iter_all_client() {
builder.add_write_by_id(message.from_messages_id());
}
}),
FilteredResourcesMutParamBuilder::new(|builder| {
for message in registry.iter_all_client() {
builder.add_write_by_id(message.messages_id());
}
}),
ParamBuilder,
)
.build_state(app.world_mut())
.build_system(send_locally);
let reset_fn = (
FilteredResourcesMutParamBuilder::new(|builder| {
for message in registry.iter_all_client() {
builder.add_write_by_id(message.messages_id());
}
}),
FilteredResourcesMutParamBuilder::new(|builder| {
for message in registry.iter_all_server() {
builder.add_write_by_id(message.queue_id());
}
}),
ParamBuilder,
)
.build_state(app.world_mut())
.build_system(reset);
app.insert_resource(registry)
.add_systems(
PreUpdate,
(
receive_fn.run_if(in_state(ClientState::Connected)),
trigger_fn,
)
.chain()
.after(super::receive_replication)
.in_set(ClientSystems::Receive),
)
.add_systems(
OnEnter(ClientState::Connected),
(enter_receive_fn, enter_trigger_fn)
.chain()
.after(super::receive_replication)
.in_set(ClientSystems::Receive),
)
.add_systems(
OnExit(ClientState::Connected),
reset_fn.in_set(ClientSystems::Reset),
)
.add_systems(
PostUpdate,
(
send_fn.run_if(in_state(ClientState::Connected)),
send_locally_fn.run_if(in_state(ClientState::Disconnected)),
)
.chain()
.in_set(ClientSystems::Send),
);
}
}
fn send(
messages: FilteredResources,
mut readers: FilteredResourcesMut,
mut client_messages: ResMut<ClientMessages>,
type_registry: Res<AppTypeRegistry>,
entity_map: Res<ServerEntityMap>,
registry: Res<RemoteMessageRegistry>,
) {
let mut ctx = ClientSendCtx {
entity_map: &entity_map,
type_registry: &type_registry,
invalid_entities: Vec::new(),
};
for message in registry.iter_all_client() {
let messages = messages
.get_by_id(message.messages_id())
.expect("messages resource should be accessible");
let reader = readers
.get_mut_by_id(message.reader_id())
.expect("message reader resource should be accessible");
unsafe {
message.send(
&mut ctx,
&messages,
reader.into_inner(),
&mut client_messages,
);
}
}
}
fn receive(
mut messages: FilteredResourcesMut,
mut queues: FilteredResourcesMut,
mut client_messages: ResMut<ClientMessages>,
type_registry: Res<AppTypeRegistry>,
entity_map: Res<ServerEntityMap>,
message_registry: Res<RemoteMessageRegistry>,
update_tick: Res<ServerUpdateTick>,
) {
let mut ctx = ClientReceiveCtx {
type_registry: &type_registry,
entity_map: &entity_map,
invalid_entities: Vec::new(),
};
for message in message_registry.iter_all_server() {
let messages = messages
.get_mut_by_id(message.messages_id())
.expect("messages resource should be accessible");
let queue = queues
.get_mut_by_id(message.queue_id())
.expect("queue resource should be accessible");
unsafe {
message.receive(
&mut ctx,
messages.into_inner(),
queue.into_inner(),
&mut client_messages,
**update_tick,
)
};
}
}
fn trigger(
mut messages: FilteredResourcesMut,
mut commands: Commands,
registry: Res<RemoteMessageRegistry>,
) {
for event in registry.iter_server_events() {
let messages = messages
.get_mut_by_id(event.message().messages_id())
.expect("messages resource should be accessible");
event.trigger(&mut commands, messages.into_inner());
}
}
fn send_locally(
mut from_messages: FilteredResourcesMut,
mut messages: FilteredResourcesMut,
registry: Res<RemoteMessageRegistry>,
) {
for message in registry.iter_all_client() {
let from_messages = from_messages
.get_mut_by_id(message.from_messages_id())
.expect("from messages resource should be accessible");
let messages = messages
.get_mut_by_id(message.messages_id())
.expect("messages resource should be accessible");
unsafe { message.send_locally(from_messages.into_inner(), messages.into_inner()) };
}
}
fn reset(
mut messages: FilteredResourcesMut,
mut queues: FilteredResourcesMut,
registry: Res<RemoteMessageRegistry>,
) {
for message in registry.iter_all_client() {
let messages = messages
.get_mut_by_id(message.messages_id())
.expect("messages resource should be accessible");
unsafe { message.reset(messages.into_inner()) };
}
for messages in registry.iter_all_server() {
let queue = queues
.get_mut_by_id(messages.queue_id())
.expect("queue resource should be accessible");
unsafe { messages.reset(queue.into_inner()) };
}
}