use core::any::TypeId;
use bevy::{
ecs::{component::ComponentId, entity::MapEntities},
prelude::*,
ptr::PtrMut,
};
use bytes::Bytes;
use log::{debug, error, warn};
use serde::{Serialize, de::DeserializeOwned};
use super::{
client_message,
ctx::{ClientSendCtx, ServerReceiveCtx},
message_fns::{DeserializeFn, MessageFns, SerializeFn, UntypedMessageFns},
registry::RemoteMessageRegistry,
};
use crate::prelude::*;
pub trait SharedMessageAppExt {
fn add_shared_message<M: Message + Serialize + DeserializeOwned>(
&mut self,
channel: Channel,
) -> &mut Self {
self.add_shared_message_with(
channel,
client_message::default_serialize::<M>,
client_message::default_deserialize::<M>,
)
}
fn add_mapped_shared_message<M>(&mut self, channel: Channel) -> &mut Self
where
M: Message + Serialize + DeserializeOwned + MapEntities + Clone,
{
self.add_shared_message_with(
channel,
client_message::default_serialize_mapped::<M>,
client_message::default_deserialize::<M>,
)
}
fn add_shared_message_with<M: Message>(
&mut self,
channel: Channel,
serialize: SerializeFn<ClientSendCtx, M>,
deserialize: DeserializeFn<ServerReceiveCtx, M>,
) -> &mut Self;
}
impl SharedMessageAppExt for App {
fn add_shared_message_with<M: Message>(
&mut self,
channel: Channel,
serialize: SerializeFn<ClientSendCtx, M>,
deserialize: DeserializeFn<ServerReceiveCtx, M>,
) -> &mut Self {
self.world_mut()
.resource_mut::<ProtocolHasher>()
.add_shared_message::<M>();
let fns = MessageFns::new(serialize, deserialize);
let message = SharedMessage::new(self, channel, fns);
let mut registry = self.world_mut().resource_mut::<RemoteMessageRegistry>();
registry.register_shared_message(message);
self
}
}
pub(crate) struct SharedMessage {
messages_id: ComponentId,
shared_messages_id: ComponentId,
channel_id: usize,
type_id: TypeId,
send: SendFn,
receive: ReceiveFn,
send_locally: SendLocallyFn,
reset: ResetFn,
fns: UntypedMessageFns,
}
impl SharedMessage {
pub(super) fn new<M: Message, I: 'static>(
app: &mut App,
channel: Channel,
fns: MessageFns<ClientSendCtx, ServerReceiveCtx, M, I>,
) -> Self {
let channel_id = app
.world_mut()
.resource_mut::<RepliconChannels>()
.create_client_channel(channel);
app.add_message::<M>().add_message::<LocalOrRemote<M>>();
let messages_id = app.world().resource_id::<Messages<M>>().unwrap();
let shared_messages_id = app
.world()
.resource_id::<Messages<LocalOrRemote<M>>>()
.unwrap();
Self {
messages_id,
shared_messages_id,
channel_id,
type_id: TypeId::of::<M>(),
send: Self::send_typed::<M, I>,
receive: Self::receive_typed::<M, I>,
send_locally: Self::send_locally_typed::<M>,
reset: Self::reset_typed::<M>,
fns: fns.into(),
}
}
pub(crate) fn messages_id(&self) -> ComponentId {
self.messages_id
}
pub(crate) fn shared_messages_id(&self) -> ComponentId {
self.shared_messages_id
}
pub(super) fn channel_id(&self) -> usize {
self.channel_id
}
pub(super) fn type_id(&self) -> TypeId {
self.type_id
}
pub(crate) unsafe fn send(
&self,
ctx: &mut ClientSendCtx,
messages: PtrMut,
shared_messages: PtrMut,
client_messages: &mut ClientMessages,
) {
unsafe { (self.send)(self, ctx, messages, shared_messages, client_messages) };
}
unsafe fn send_typed<M: Message, I: 'static>(
&self,
ctx: &mut ClientSendCtx,
messages: PtrMut,
shared_messages: PtrMut,
client_messages: &mut ClientMessages,
) {
let messages: &mut Messages<M> = unsafe { messages.deref_mut() };
let shared_messages: &mut Messages<LocalOrRemote<M>> =
unsafe { shared_messages.deref_mut() };
for message in messages.drain() {
let mut message_bytes = Vec::new();
if let Err(e) = unsafe { self.serialize::<M, I>(ctx, &message, &mut message_bytes) } {
error!(
"ignoring message `{}` that failed to serialize: {e}",
ShortName::of::<M>()
);
continue;
}
debug!("sending shared message `{}`", ShortName::of::<M>());
client_messages.send(self.channel_id, message_bytes);
shared_messages.write(LocalOrRemote {
sender: Sender::Local,
message,
});
}
}
pub(crate) unsafe fn receive(
&self,
ctx: &mut ServerReceiveCtx,
shared_messages: PtrMut,
server_messages: &mut ServerMessages,
) {
unsafe { (self.receive)(self, ctx, shared_messages, server_messages) }
}
unsafe fn receive_typed<M: Message, I: 'static>(
&self,
ctx: &mut ServerReceiveCtx,
shared_messages: PtrMut,
server_messages: &mut ServerMessages,
) {
let shared_messages: &mut Messages<LocalOrRemote<M>> =
unsafe { shared_messages.deref_mut() };
for (client, mut message) in server_messages.receive(self.channel_id) {
match unsafe { self.deserialize::<M, I>(ctx, &mut message) } {
Ok(message) => {
debug!(
"writing shared message `{}` from client `{client}`",
ShortName::of::<M>()
);
shared_messages.write(LocalOrRemote {
sender: Sender::Remote(client.into()),
message,
});
}
Err(e) => debug!(
"ignoring message `{}` from client `{client}` that failed to deserialize: {e}",
ShortName::of::<M>()
),
}
}
}
pub(crate) unsafe fn send_locally(&self, shared_messages: PtrMut, messages: PtrMut) {
unsafe { (self.send_locally)(shared_messages, messages) }
}
unsafe fn send_locally_typed<M: Message>(shared_messages: PtrMut, messages: PtrMut) {
let shared_messages: &mut Messages<LocalOrRemote<M>> =
unsafe { shared_messages.deref_mut() };
let messages: &mut Messages<M> = unsafe { messages.deref_mut() };
if !messages.is_empty() {
debug!(
"writing {} shared message(s) `{}` locally",
messages.len(),
ShortName::of::<M>()
);
shared_messages.write_batch(messages.drain().map(|message| LocalOrRemote {
sender: Sender::Local,
message,
}));
}
}
pub(crate) unsafe fn reset(&self, messages: PtrMut) {
unsafe { (self.reset)(messages) }
}
unsafe fn reset_typed<M: Message>(messages: PtrMut) {
let messages: &mut Messages<M> = unsafe { messages.deref_mut() };
let drained_count = messages.drain().count();
if drained_count > 0 {
warn!(
"discarded {drained_count} shared messages of type `{}` due to a disconnect",
ShortName::of::<M>()
);
}
}
unsafe fn serialize<M: 'static, I: 'static>(
&self,
ctx: &mut ClientSendCtx,
message: &M,
message_bytes: &mut Vec<u8>,
) -> Result<()> {
unsafe {
self.fns
.typed::<ClientSendCtx, ServerReceiveCtx, M, I>()
.serialize(ctx, message, message_bytes)?;
}
if ctx.invalid_entities.is_empty() {
Ok(())
} else {
let error_text = format!(
"unable to map entities `{:?}` for the server, \
make sure that the message references entities visible to the server",
ctx.invalid_entities,
);
ctx.invalid_entities.clear();
Err(error_text.into())
}
}
unsafe fn deserialize<M: 'static, I: 'static>(
&self,
ctx: &mut ServerReceiveCtx,
message: &mut Bytes,
) -> Result<M> {
unsafe {
self.fns
.typed::<ClientSendCtx, ServerReceiveCtx, M, I>()
.deserialize(ctx, message)
}
}
}
type SendFn = unsafe fn(&SharedMessage, &mut ClientSendCtx, PtrMut, PtrMut, &mut ClientMessages);
type ReceiveFn = unsafe fn(&SharedMessage, &mut ServerReceiveCtx, PtrMut, &mut ServerMessages);
type SendLocallyFn = unsafe fn(PtrMut, PtrMut);
type ResetFn = unsafe fn(PtrMut);
#[derive(Message, Event, Deref, DerefMut, Debug, Clone, Copy)]
pub struct LocalOrRemote<T> {
pub sender: Sender,
#[deref]
pub message: T,
}
impl<E: EntityEvent> EntityEvent for LocalOrRemote<E> {
fn event_target(&self) -> Entity {
self.message.event_target()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Sender {
Local,
Remote(ClientId),
}
impl Sender {
pub fn is_remote(self) -> bool {
matches!(self, Self::Remote(_))
}
}