use crate::plugin::MessagePlugin;
use crate::prelude::MessageReceiver;
use crate::receive::ReceivedMessage;
use crate::registry::{MessageError, MessageKind, MessageRegistry};
use crate::{Message, MessageManager, MessageNetId};
use alloc::sync::Arc;
use alloc::vec::Vec;
use bevy_ecs::lifecycle::HookContext;
use bevy_ecs::{
change_detection::MutUntyped,
component::Component,
entity::Entity,
query::{With, Without},
system::{ParallelCommands, Query, Res},
world::{DeferredWorld, FilteredEntityMut, World},
};
use bevy_reflect::Reflect;
use bevy_utils::prelude::DebugName;
use lightyear_connection::client::Connected;
use lightyear_connection::host::HostClient;
use lightyear_core::prelude::{LocalTimeline, Tick};
use lightyear_serde::ToBytes;
use lightyear_serde::entity_map::SendEntityMap;
use lightyear_serde::registry::ErasedSerializeFns;
use lightyear_serde::writer::Writer;
use lightyear_transport::channel::{Channel, ChannelKind};
use lightyear_transport::prelude::Transport;
#[allow(unused_imports)]
use tracing::{error, info, trace};
pub type Priority = f32;
#[derive(Component, Reflect)]
#[component(on_add = MessageSender::<M>::on_add_hook)]
#[require(MessageManager)]
pub struct MessageSender<M: Message> {
send: Vec<(M, ChannelKind, Priority)>,
#[reflect(ignore)]
writer: Writer,
}
impl<M: Message> Default for MessageSender<M> {
fn default() -> Self {
Self {
send: Vec::new(),
writer: Writer::default(),
}
}
}
pub(crate) type SendMessageFn = unsafe fn(
sender: MutUntyped,
message_net_id: MessageNetId,
transport: &Transport,
serialize_metadata: &ErasedSerializeFns,
entity_map: &mut SendEntityMap,
) -> Result<(), MessageError>;
pub(crate) type SendLocalMessageFn =
unsafe fn(sender: MutUntyped, receiver: MutUntyped, tick: Tick);
impl<M: Message> MessageSender<M> {
pub fn send_with_priority<C: Channel>(&mut self, message: M, priority: Priority) {
self.send.push((message, ChannelKind::of::<C>(), priority));
}
pub fn send<C: Channel>(&mut self, message: M) {
self.send_with_priority::<C>(message, 1.0);
}
pub(crate) unsafe fn send_message_typed(
message_sender: MutUntyped,
net_id: MessageNetId,
transport: &Transport,
serialize_metadata: &ErasedSerializeFns,
entity_map: &mut SendEntityMap,
) -> Result<(), MessageError> {
let mut sender = unsafe { message_sender.with_type::<Self>() };
let sender = &mut *sender;
sender.send.drain(..).try_for_each(|(message, channel_kind, priority)| {
net_id.to_bytes(&mut sender.writer)?;
unsafe { serialize_metadata.serialize::<SendEntityMap, M, M>(&message, &mut sender.writer, entity_map)? };
let bytes = sender.writer.split();
#[cfg(feature = "metrics")]
{
metrics::counter!("message/send", "message" => core::any::type_name::<M>()).increment(1);
metrics::gauge!("message/send_bytes", "message" => core::any::type_name::<M>()).increment(bytes.len() as f64);
}
trace!("Sending message of type {:?} with net_id {net_id:?}/kind {:?} on channel {channel_kind:?}", DebugName::type_name::<M>(), MessageKind::of::<M>());
transport.send_erased(channel_kind, bytes, priority)?;
Ok(())
})
}
pub(crate) unsafe fn send_local_message_typed(
message_sender: MutUntyped,
message_receiver: MutUntyped,
tick: Tick,
) {
let mut sender = unsafe { message_sender.with_type::<Self>() };
let mut receiver = unsafe { message_receiver.with_type::<MessageReceiver<M>>() };
let sender = &mut *sender;
sender
.send
.drain(..)
.for_each(|(message, channel_kind, _)| {
trace!(
"Send local message of type {:?} on channel {channel_kind:?}",
DebugName::type_name::<M>()
);
receiver.recv.push(ReceivedMessage::<M> {
data: message,
remote_tick: tick,
channel_kind,
message_id: None,
});
})
}
pub fn on_add_hook(mut world: DeferredWorld, context: HookContext) {
world.commands().queue(move |world: &mut World| {
let mut entity_mut = world.entity_mut(context.entity);
let mut message_manager = entity_mut.get_mut::<MessageManager>().unwrap();
let message_kind_present = message_manager
.send_messages
.iter()
.any(|(message_kind, _)| *message_kind == MessageKind::of::<M>());
if !message_kind_present {
message_manager
.send_messages
.push((MessageKind::of::<M>(), context.component_id));
}
})
}
}
impl MessagePlugin {
pub fn send(
mut transport_query: Query<
(Entity, &Transport, &mut MessageManager),
(With<Connected>, Without<HostClient>),
>,
message_sender_query: Query<FilteredEntityMut>,
registry: Res<MessageRegistry>,
) {
let message_sender_query = Arc::new(message_sender_query);
transport_query
.par_iter_mut()
.for_each(|(entity, transport, mut message_manager)| {
let mut message_sender_query = unsafe { message_sender_query.reborrow_unsafe() };
let message_manager = &mut *message_manager;
message_manager
.send_messages
.iter()
.try_for_each(|(message_kind, sender_id)| {
let mut entity_mut = message_sender_query.get_mut(entity).unwrap();
let message_sender = entity_mut
.get_mut_by_id(*sender_id)
.ok_or(MessageError::MissingComponent(*sender_id))?;
let send_metadata = registry
.send_metadata
.get(message_kind)
.ok_or(MessageError::UnrecognizedMessage(*message_kind))?;
let serialize_fns = registry
.serialize_fns_map
.get(message_kind)
.ok_or(MessageError::UnrecognizedMessage(*message_kind))?;
let message_id = registry
.kind_map
.net_id(message_kind)
.ok_or(MessageError::UnrecognizedMessage(*message_kind))?;
unsafe {
(send_metadata.send_message_fn)(
message_sender,
*message_id,
transport,
serialize_fns,
&mut message_manager.entity_mapper.local_to_remote,
)?;
}
Ok::<_, MessageError>(())
})
.inspect_err(|e| error!("error sending message: {e:?}"))
.ok();
let message_manager = &mut *message_manager;
message_manager
.send_triggers
.iter()
.try_for_each(|(message_kind, sender_id)| {
let mut entity_mut = message_sender_query.get_mut(entity).unwrap();
let message_sender = entity_mut
.get_mut_by_id(*sender_id)
.ok_or(MessageError::MissingComponent(*sender_id))?;
let send_metadata = registry
.send_trigger_metadata
.get(message_kind)
.ok_or(MessageError::UnrecognizedMessage(*message_kind))?;
let serialize_fns = registry
.serialize_fns_map
.get(message_kind)
.ok_or(MessageError::UnrecognizedMessage(*message_kind))?;
let message_id = registry
.kind_map
.net_id(message_kind)
.ok_or(MessageError::UnrecognizedMessage(*message_kind))?;
unsafe {
(send_metadata.send_trigger_fn)(
message_sender,
*message_id,
transport,
serialize_fns,
&mut message_manager.entity_mapper.local_to_remote,
)?;
}
Ok::<_, MessageError>(())
})
.inspect_err(|e| error!("error sending trigger: {e:?}"))
.ok();
})
}
pub fn send_local(
timeline: Res<LocalTimeline>,
mut manager_query: Query<
(Entity, &mut MessageManager),
(With<Connected>, With<HostClient>),
>,
message_components_query: Query<FilteredEntityMut>,
commands: ParallelCommands,
registry: Res<MessageRegistry>,
) {
let tick = timeline.tick();
let message_components_query = Arc::new(message_components_query);
manager_query
.par_iter_mut()
.for_each(|(entity, mut message_manager)| {
let mut message_sender_query =
unsafe { message_components_query.reborrow_unsafe() };
let mut message_receiver_query =
unsafe { message_components_query.reborrow_unsafe() };
let message_manager = &mut *message_manager;
message_manager
.send_messages
.iter()
.try_for_each(|(message_kind, sender_id)| {
let mut entity_mut = message_sender_query.get_mut(entity).unwrap();
let message_sender = entity_mut
.get_mut_by_id(*sender_id)
.ok_or(MessageError::MissingComponent(*sender_id))?;
let receiver_id = message_manager
.receive_messages
.iter()
.find_map(
|(kind, id)| if kind == message_kind { Some(id) } else { None },
)
.ok_or(MessageError::UnrecognizedMessage(*message_kind))?;
let mut entity_mut = message_receiver_query.get_mut(entity).unwrap();
let message_receiver = entity_mut
.get_mut_by_id(*receiver_id)
.ok_or(MessageError::MissingComponent(*receiver_id))?;
let send_metadata = registry
.send_metadata
.get(message_kind)
.ok_or(MessageError::UnrecognizedMessage(*message_kind))?;
unsafe {
(send_metadata.send_local_message_fn)(
message_sender,
message_receiver,
tick,
);
}
Ok::<_, MessageError>(())
})
.inspect_err(|e| error!("error sending message on host-client: {e:?}"))
.ok();
let message_manager = &mut *message_manager;
message_manager
.send_triggers
.iter()
.try_for_each(|(message_kind, sender_id)| {
let mut entity_mut = message_sender_query.get_mut(entity).unwrap();
let message_sender = entity_mut
.get_mut_by_id(*sender_id)
.ok_or(MessageError::MissingComponent(*sender_id))?;
let send_metadata = registry
.send_trigger_metadata
.get(message_kind)
.ok_or(MessageError::UnrecognizedMessage(*message_kind))?;
unsafe {
(send_metadata.send_local_trigger_fn)(message_sender, &commands);
}
Ok::<_, MessageError>(())
})
.inspect_err(|e| error!("error sending trigger on host-client: {e:?}"))
.ok();
})
}
}