use crate::MessageManager;
use crate::plugin::MessagePlugin;
use crate::registry::{MessageError, MessageKind, MessageRegistry};
use crate::{Message, MessageNetId};
use alloc::vec::Vec;
use bevy_ecs::{
change_detection::MutUntyped,
component::Component,
entity::Entity,
event::Event,
query::With,
system::{ParallelCommands, Query, Res},
world::{DeferredWorld, FilteredEntityMut, World},
};
use lightyear_core::tick::Tick;
use lightyear_serde::ToBytes;
use lightyear_serde::entity_map::ReceiveEntityMap;
use lightyear_serde::reader::Reader;
use lightyear_transport::channel::ChannelKind;
use lightyear_transport::channel::receivers::ChannelReceive;
use lightyear_transport::prelude::Transport;
use alloc::sync::Arc;
use bevy_ecs::lifecycle::HookContext;
use bevy_utils::prelude::DebugName;
use bytes::Bytes;
use lightyear_connection::client::Connected;
use lightyear_connection::host::HostClient;
use lightyear_core::id::{PeerId, RemoteId};
use lightyear_core::prelude::LocalTimeline;
use lightyear_serde::registry::ErasedSerializeFns;
use lightyear_transport::packet::message::MessageId;
use tracing::{error, trace};
/// Event that pairs a message payload of type `M` with a [`PeerId`].
///
/// NOTE(review): the code that fires this event is not in this part of the file;
/// presumably it is emitted by the trigger-receiving path — confirm.
#[derive(Event)]
pub struct RemoteOn<M: Message> {
/// The message payload carried by the event.
pub trigger: M,
/// Peer associated with the message (presumably the sender — confirm against the emitter).
pub from: PeerId,
}
/// Component that buffers received messages of type `M` on an entity.
///
/// The component requires a [`MessageManager`] on the same entity, and its
/// `on_add` hook records the message kind and this component's id in
/// `MessageManager::receive_messages`.
#[derive(Component)]
#[require(MessageManager)]
#[component(on_add = MessageReceiver::<M>::on_add_hook)]
pub struct MessageReceiver<M: Message> {
/// Messages received so far and not yet drained or cleared.
pub(crate) recv: Vec<ReceivedMessage<M>>,
}
/// A deserialized message together with the metadata it was received with.
#[derive(Debug)]
pub struct ReceivedMessage<M: Message> {
/// The deserialized message payload.
pub data: M,
/// Tick handed to the receive function alongside the message.
pub remote_tick: Tick,
/// Channel the message was received on.
pub channel_kind: ChannelKind,
/// Identifier of the message, if one was provided (the host-client receive
/// path passes `None`).
pub message_id: Option<MessageId>,
}
impl<M: Message> Default for MessageReceiver<M> {
    /// Builds a receiver whose message buffer starts out empty.
    ///
    /// Written by hand rather than derived so that no `M: Default` bound is
    /// placed on the message type.
    fn default() -> Self {
        let recv = Vec::new();
        Self { recv }
    }
}
impl<M: Message> MessageReceiver<M> {
    /// Returns `true` when at least one message is waiting in the buffer.
    pub fn has_messages(&self) -> bool {
        self.num_messages() > 0
    }

    /// Drains the buffer, yielding only the message payloads.
    ///
    /// The buffer is emptied by the drain even if the iterator is dropped
    /// before being fully consumed.
    pub fn receive(&mut self) -> impl Iterator<Item = M> {
        self.recv.drain(..).map(|received| received.data)
    }

    /// Drains the buffer, yielding each message with its receive metadata
    /// (tick, channel and message id).
    pub fn receive_with_tick(&mut self) -> impl Iterator<Item = ReceivedMessage<M>> {
        self.recv.drain(..)
    }

    /// Number of messages currently buffered.
    pub fn num_messages(&self) -> usize {
        self.recv.len()
    }

    /// `on_add` component hook: queues a command that records this message
    /// kind (and this component's id) in the entity's [`MessageManager`].
    ///
    /// The work is deferred through a command because the hook only has a
    /// [`DeferredWorld`]. The queued command panics if the entity no longer
    /// exists or has no [`MessageManager`].
    fn on_add_hook(mut world: DeferredWorld, context: HookContext) {
        world.commands().queue(move |world: &mut World| {
            let kind = MessageKind::of::<M>();
            let mut entity = world.entity_mut(context.entity);
            let mut manager = entity.get_mut::<MessageManager>().unwrap();

            // Only register the kind once per entity.
            let already_registered = manager
                .receive_messages
                .iter()
                .any(|(registered, _)| *registered == kind);
            if already_registered {
                return;
            }
            manager.receive_messages.push((kind, context.component_id));
        });
    }
}
/// Signature of a type-erased function that deserializes one message from
/// `reader` and delivers it to the untyped `receiver` component.
///
/// `MessageReceiver::<M>::receive_message_typed` has this signature.
///
/// # Safety
///
/// The function is `unsafe` to call. For `receive_message_typed`, `receiver`
/// must point to a `MessageReceiver<M>` for the same `M` the function was
/// instantiated with.
pub(crate) type ReceiveMessageFn = unsafe fn(
receiver: MutUntyped,
reader: &mut Reader,
channel_kind: ChannelKind,
remote_tick: Tick,
message_id: Option<MessageId>,
serialize_metadata: &ErasedSerializeFns,
entity_map: &mut ReceiveEntityMap,
) -> Result<(), MessageError>;
/// Signature of a type-erased function that empties an untyped receiver
/// component; `MessageReceiver::<M>::clear_typed` has this signature.
///
/// # Safety
///
/// For `clear_typed`, `receiver` must point to a `MessageReceiver<M>` for the
/// same `M` the function was instantiated with.
pub(crate) type ClearMessageFn = unsafe fn(receiver: MutUntyped);
impl<M: Message> MessageReceiver<M> {
    /// Deserializes one `M` from `reader` and appends it to the receiver's
    /// buffer together with its tick, channel and message id.
    ///
    /// # Errors
    ///
    /// Propagates any error produced while deserializing the message; in that
    /// case nothing is pushed to the buffer.
    ///
    /// # Safety
    ///
    /// `receiver` must be the type-erased form of a `MessageReceiver<M>` for
    /// this exact `M`. The caller must also uphold whatever contract
    /// `ErasedSerializeFns::deserialize` requires of `serialize_metadata`
    /// (presumably that it was registered for `M` — confirm).
    pub(crate) unsafe fn receive_message_typed(
        receiver: MutUntyped,
        reader: &mut Reader,
        channel_kind: ChannelKind,
        remote_tick: Tick,
        message_id: Option<MessageId>,
        serialize_metadata: &ErasedSerializeFns,
        entity_map: &mut ReceiveEntityMap,
    ) -> Result<(), MessageError> {
        // SAFETY: the caller guarantees `receiver` erases a `MessageReceiver<M>`.
        let mut typed_receiver = unsafe { receiver.with_type::<Self>() };
        // SAFETY: forwarded from this function's own safety contract.
        let data = unsafe { serialize_metadata.deserialize::<_, M, M>(reader, entity_map)? };
        trace!(
            "Received message {:?} on channel {channel_kind:?}",
            DebugName::type_name::<M>()
        );
        typed_receiver.recv.push(ReceivedMessage {
            data,
            remote_tick,
            channel_kind,
            message_id,
        });
        Ok(())
    }

    /// Removes every buffered message from the receiver.
    ///
    /// # Safety
    ///
    /// `receiver` must be the type-erased form of a `MessageReceiver<M>` for
    /// this exact `M`.
    pub(crate) unsafe fn clear_typed(receiver: MutUntyped) {
        // SAFETY: the caller guarantees `receiver` erases a `MessageReceiver<M>`.
        let mut typed_receiver = unsafe { receiver.with_type::<Self>() };
        typed_receiver.recv.clear();
    }
}
impl MessagePlugin {
    /// Decodes one serialized message and routes it to the receiver registered
    /// for its kind.
    ///
    /// The leading [`MessageNetId`] is read from `bytes` and resolved to a
    /// message kind through `registry`. The message is then delivered either:
    ///
    /// * to the receiver component on `entity` (when the kind has an entry in
    ///   `registry.receive_metadata`), or
    /// * to the kind's trigger function (when it has an entry in
    ///   `registry.receive_trigger`), which is given `commands` and
    ///   `remote_peer_id`.
    ///
    /// # Errors
    ///
    /// * any error from reading the [`MessageNetId`],
    /// * [`MessageError::UnrecognizedMessageId`] if the net id is unknown, or
    ///   if the kind has neither a component receiver nor a trigger,
    /// * [`MessageError::UnrecognizedMessage`] if the kind has no serialize
    ///   functions,
    /// * [`MessageError::MissingComponent`] if `entity` lacks the receiver
    ///   component,
    /// * any error returned by the receive/trigger function itself.
    ///
    /// # Panics
    ///
    /// Panics if `entity` is not matched by `receiver_query`.
    fn receive_message_bytes(
        bytes: Bytes,
        registry: &MessageRegistry,
        receiver_query: &mut Query<FilteredEntityMut>,
        entity: Entity,
        channel_kind: ChannelKind,
        tick: Tick,
        message_id: Option<MessageId>,
        message_manager: &mut MessageManager,
        commands: &ParallelCommands,
        remote_peer_id: PeerId,
    ) -> Result<(), MessageError> {
        trace!(
            "Received message (id:{message_id:?}) from peer {:?} on channel {channel_kind:?}. {entity:?}",
            remote_peer_id
        );
        let mut reader = Reader::from(bytes);
        // The payload is prefixed with the network id identifying the message type.
        let message_net_id = MessageNetId::from_bytes(&mut reader)?;
        let message_kind = registry
            .kind_map
            .kind(message_net_id)
            .ok_or(MessageError::UnrecognizedMessageId(message_net_id))?;
        let serialize_fns = registry
            .serialize_fns_map
            .get(message_kind)
            .ok_or(MessageError::UnrecognizedMessage(*message_kind))?;

        if let Some(recv_metadata) = registry.receive_metadata.get(message_kind) {
            // Component-based delivery: push into the receiver component on `entity`.
            let component_id = recv_metadata.component_id;
            let mut entity_mut = receiver_query.get_mut(entity).unwrap();
            let receiver = entity_mut
                .get_mut_by_id(component_id)
                .ok_or(MessageError::MissingComponent(component_id))?;
            // SAFETY: `receiver` was fetched with the component id stored next to
            // `receive_message_fn` in the same registry entry, so the function is
            // presumably the one registered for that component's type.
            // NOTE(review): relies on the registry keeping the two in sync — confirm.
            unsafe {
                (recv_metadata.receive_message_fn)(
                    receiver,
                    &mut reader,
                    channel_kind,
                    tick,
                    message_id,
                    serialize_fns,
                    &mut message_manager.entity_mapper.remote_to_local,
                )
            }
        } else if let Some(trigger_fn) = registry.receive_trigger.get(message_kind) {
            // Trigger-based delivery: the trigger function gets the command queue
            // and the sending peer instead of a receiver component.
            // SAFETY: `trigger_fn` and `serialize_fns` were both looked up under the
            // same `message_kind`.
            // NOTE(review): the exact contract of the trigger fn is not visible
            // here — confirm.
            unsafe {
                trigger_fn(
                    commands,
                    &mut reader,
                    channel_kind,
                    tick,
                    message_id,
                    serialize_fns,
                    &mut message_manager.entity_mapper.remote_to_local,
                    remote_peer_id,
                )
            }
        } else {
            Err(MessageError::UnrecognizedMessageId(message_net_id))
        }
    }

    /// System that reads every pending message for each connected entity and
    /// dispatches it through [`Self::receive_message_bytes`].
    ///
    /// Entities are processed in parallel. For an entity with a
    /// [`HostClient`], messages are taken from `HostClient::buffer`, stamped
    /// with the local timeline tick and given no message id. Otherwise they
    /// are read from each of the transport's channel receivers.
    ///
    /// Errors are logged and stop processing of the remaining messages for
    /// that entity in the current run; they are not propagated.
    pub fn recv(
        timeline: Res<LocalTimeline>,
        mut transport_query: Query<
            (
                Entity,
                &mut MessageManager,
                &mut Transport,
                &RemoteId,
                Option<&mut HostClient>,
            ),
            With<Connected>,
        >,
        receiver_query: Query<FilteredEntityMut>,
        registry: Res<MessageRegistry>,
        commands: ParallelCommands,
    ) {
        let tick = timeline.tick();
        // Shared handle to the receiver query for the parallel closure.
        // NOTE(review): presumably wrapped in `Arc` to satisfy the closure bounds
        // of the parallel `for_each` — confirm.
        let receiver_query = Arc::new(receiver_query);
        transport_query.par_iter_mut().for_each(
            |(
                entity,
                mut message_manager,
                mut transport,
                remote_peer_id,
                mut host_client,
            )| {
                // SAFETY: each parallel iteration only looks up its own `entity`
                // in the reborrowed query, so two iterations never hold mutable
                // access to the same entity's components.
                // NOTE(review): also assumes the query's access does not overlap
                // the components borrowed by `transport_query` — confirm.
                let mut receiver_query = unsafe { receiver_query.reborrow_unsafe() };
                // Reborrow as a plain `&mut Transport` so its fields can be
                // borrowed independently.
                let transport = &mut *transport;
                if let Some(host_client) = host_client.as_mut() {
                    // NOTE(review): `host_client.buffer.drain(..)` empties the whole
                    // buffer during the first sender iteration, so later iterations
                    // find nothing, and a transport with no senders never drains it.
                    // The `channel_kind` logged below is the sender's, while the
                    // message is dispatched with `ChannelKind(channel_type_id)`.
                    // Confirm this is intended.
                    transport
                        .senders
                        .iter_mut()
                        .try_for_each(|(channel_kind, _sender_metadata)| {
                            host_client.buffer.drain(..).try_for_each(
                                |(bytes, channel_type_id)| {
                                    trace!("Received local message bytes from server on host-client {entity:?} on channel {channel_kind:?}");
                                    Self::receive_message_bytes(
                                        bytes,
                                        &registry,
                                        &mut receiver_query,
                                        entity,
                                        ChannelKind(channel_type_id),
                                        tick,
                                        None,
                                        &mut message_manager,
                                        &commands,
                                        remote_peer_id.0,
                                    )
                                },
                            )?;
                            Ok::<_, MessageError>(())
                        })
                        .inspect_err(|e| error!("Error receiving messages: {e:?}"))
                        .ok();
                } else {
                    transport
                        .receivers
                        .values_mut()
                        .try_for_each(|receiver_metadata| {
                            let channel_kind = receiver_metadata.channel_kind;
                            // The tick returned with each message shadows the local
                            // timeline tick captured above.
                            while let Some((tick, bytes, message_id)) =
                                receiver_metadata.receiver.read_message()
                            {
                                Self::receive_message_bytes(
                                    bytes,
                                    &registry,
                                    &mut receiver_query,
                                    entity,
                                    channel_kind,
                                    tick,
                                    message_id,
                                    &mut message_manager,
                                    &commands,
                                    remote_peer_id.0,
                                )?;
                            }
                            Ok::<_, MessageError>(())
                        })
                        .inspect_err(|e| error!("Error receiving messages: {e:?}"))
                        .ok();
                }
            },
        )
    }

    /// System that empties every registered message receiver on each
    /// connected entity.
    ///
    /// For each `(kind, component_id)` pair in
    /// `MessageManager::receive_messages`, the receiver component is fetched
    /// by id and cleared with the registry's `message_clear_fn` for that kind.
    ///
    /// # Panics
    ///
    /// Panics if the entity is not matched by `receiver_query`, if the
    /// receiver component is missing from the entity, or if the kind has no
    /// entry in `registry.receive_metadata`.
    pub fn clear(
        manager_query: Query<(Entity, &MessageManager), With<Connected>>,
        mut receiver_query: Query<FilteredEntityMut>,
        registry: Res<MessageRegistry>,
    ) {
        manager_query.iter().for_each(|(entity, manager)| {
            manager
                .receive_messages
                .iter()
                .for_each(|(kind, component_id)| {
                    let mut entity_mut = receiver_query.get_mut(entity).unwrap();
                    let receiver = entity_mut.get_mut_by_id(*component_id).unwrap();
                    let clear_fn = registry
                        .receive_metadata
                        .get(kind)
                        .unwrap()
                        .message_clear_fn;
                    // SAFETY: `receiver` is the component stored under the id that
                    // was registered together with `kind`, and `clear_fn` is the
                    // clear function registered for that same kind.
                    // NOTE(review): relies on registry/manager consistency — confirm.
                    unsafe { clear_fn(receiver) };
                })
        });
    }
}