pub mod events;
pub(crate) mod message;
use anyhow::Result;
use bevy::prelude::{Entity, World};
use serde::{Deserialize, Serialize};
use tracing::{info, trace, trace_span};
use crate::channel::builder::{EntityUpdatesChannel, PingChannel};
use crate::channel::senders::ChannelSend;
use crate::connection::events::ConnectionEvents;
use crate::connection::message::ProtocolMessage;
use crate::connection::message::ProtocolMessage::Replication;
use crate::packet::message_manager::MessageManager;
use crate::packet::packet_manager::Payload;
use crate::prelude::MapEntities;
use crate::protocol::channel::{ChannelKind, ChannelRegistry};
use crate::protocol::Protocol;
use crate::serialize::reader::ReadBuffer;
use crate::shared::ping::manager::{PingConfig, PingManager};
use crate::shared::ping::message::SyncMessage;
use crate::shared::replication::manager::ReplicationManager;
use crate::shared::replication::{ReplicationMessage, ReplicationMessageData};
use crate::shared::tick_manager::Tick;
use crate::shared::tick_manager::TickManager;
use crate::shared::time_manager::TimeManager;
use crate::utils::named::Named;
pub struct Connection<P: Protocol> {
pub ping_manager: PingManager,
pub message_manager: MessageManager<ProtocolMessage<P>>,
pub(crate) replication_manager: ReplicationManager<P>,
pub events: ConnectionEvents<P>,
}
impl<P: Protocol> Connection<P> {
pub fn new(channel_registry: &ChannelRegistry, ping_config: &PingConfig) -> Self {
let mut message_manager = MessageManager::new(channel_registry);
let update_acks_tracker = message_manager
.channels
.get_mut(&ChannelKind::of::<EntityUpdatesChannel>())
.unwrap()
.sender
.subscribe_acks();
let replication_manager = ReplicationManager::new(update_acks_tracker);
Self {
ping_manager: PingManager::new(ping_config),
message_manager,
replication_manager,
events: ConnectionEvents::new(),
}
}
}
impl<P: Protocol> Connection<P> {
pub fn update(&mut self, time_manager: &TimeManager, tick_manager: &TickManager) {
self.message_manager
.update(time_manager, &self.ping_manager, tick_manager);
self.ping_manager.update(time_manager);
}
pub fn buffer_message(&mut self, message: P::Message, channel: ChannelKind) -> Result<()> {
let channel_name = self
.message_manager
.channel_registry
.name(&channel)
.unwrap_or("unknown")
.to_string();
let message = ProtocolMessage::Message(message);
message.emit_send_logs(&channel_name);
self.message_manager.buffer_send(message, channel)?;
Ok(())
}
pub fn buffer_replication_messages(&mut self, tick: Tick) -> Result<()> {
self.replication_manager
.finalize(tick)
.into_iter()
.try_for_each(|(channel, group_id, message_data)| {
let should_track_ack = matches!(message_data, ReplicationMessageData::Updates(_));
let channel_name = self
.message_manager
.channel_registry
.name(&channel)
.unwrap_or("unknown")
.to_string();
let message = Replication(ReplicationMessage {
group_id,
data: message_data,
});
message.emit_send_logs(&channel_name);
let message_id = self
.message_manager
.buffer_send(message, channel)?
.expect("The EntityUpdatesChannel should always return a message_id");
if should_track_ack {
self.replication_manager
.updates_message_id_to_group_id
.insert(message_id, group_id);
}
Ok(())
})
}
pub fn receive(
&mut self,
world: &mut World,
time_manager: &TimeManager,
) -> ConnectionEvents<P> {
let _span = trace_span!("receive").entered();
for (channel_kind, messages) in self.message_manager.read_messages() {
let channel_name = self
.message_manager
.channel_registry
.name(&channel_kind)
.unwrap_or("unknown");
let _span_channel = trace_span!("channel", channel = channel_name).entered();
if !messages.is_empty() {
trace!(?channel_name, "Received messages");
for (tick, mut message) in messages.into_iter() {
message.map_entities(&self.replication_manager.entity_map);
match message {
ProtocolMessage::Message(message) => {
self.events.push_message(channel_kind, message);
}
ProtocolMessage::Replication(replication) => {
self.replication_manager.recv_message(replication, tick);
}
ProtocolMessage::Sync(ref sync) => {
match sync {
SyncMessage::Ping(ping) => {
self.ping_manager.buffer_pending_pong(ping, time_manager);
}
SyncMessage::Pong(pong) => {
self.ping_manager.process_pong(pong, time_manager);
}
}
}
}
}
for (group, replication_list) in self.replication_manager.read_messages() {
trace!(?group, ?replication_list, "read replication messages");
replication_list.into_iter().for_each(|(_, replication)| {
self.replication_manager
.apply_world(world, replication, &mut self.events);
});
}
}
}
std::mem::replace(&mut self.events, ConnectionEvents::new())
}
pub fn send_packets(
&mut self,
time_manager: &TimeManager,
tick_manager: &TickManager,
) -> Result<Vec<Payload>> {
if time_manager.is_ready_to_send() {
if let Some(ping) = self.ping_manager.maybe_prepare_ping(time_manager) {
trace!("Sending ping {:?}", ping);
let message = ProtocolMessage::Sync(SyncMessage::Ping(ping));
let channel = ChannelKind::of::<PingChannel>();
self.message_manager.buffer_send(message, channel)?;
}
self.ping_manager
.take_pending_pongs()
.into_iter()
.try_for_each(|mut pong| {
trace!("Sending pong {:?}", pong);
pong.pong_sent_time = time_manager.current_time();
let message = ProtocolMessage::Sync(SyncMessage::Pong(pong));
let channel = ChannelKind::of::<PingChannel>();
self.message_manager.buffer_send(message, channel)?;
Ok::<(), anyhow::Error>(())
})?;
}
self.message_manager
.send_packets(tick_manager.current_tick())
}
pub fn recv_packet(&mut self, reader: &mut impl ReadBuffer) -> Result<Tick> {
self.message_manager.recv_packet(reader)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tests::protocol::*;
}