// nevy_messages 0.5.0
//
// Structured messages for nevy
// Documentation
use std::{any::TypeId, collections::VecDeque};

use bevy::prelude::*;
use log::warn;
use serde::de::DeserializeOwned;

use crate::{
    MessageSystems, NevyMessagesSchedule,
    protocol::{ConnectionProtocolEntity, Protocol},
    reader::MessageStreamReaders,
};

/// Per-connection queue of already-deserialized messages of type `T`.
///
/// Within this module the component is attached by the
/// `insert_received_messages` observer and filled by the
/// `deserialize_messages` system. Consumers take messages out with
/// [`ReceivedMessages::drain`].
///
/// The `#[require]` attribute ties this component to `MessageStreamReaders`,
/// which `deserialize_messages` reads the raw message bytes from.
#[derive(Component)]
#[require(MessageStreamReaders)]
pub struct ReceivedMessages<T> {
    /// Decoded messages, oldest first: new messages are pushed at the back.
    messages: VecDeque<T>,
}

impl<T> Default for ReceivedMessages<T> {
    fn default() -> Self {
        ReceivedMessages {
            messages: VecDeque::new(),
        }
    }
}

impl<T> ReceivedMessages<T> {
    /// Removes the buffered messages from the queue and yields them by value,
    /// oldest first.
    ///
    /// The returned iterator borrows the component mutably for as long as it
    /// is alive.
    pub fn drain(&mut self) -> impl Iterator<Item = T> {
        let Self { messages } = self;
        messages.drain(..)
    }
}

/// Registers everything needed to receive messages of type `T` in `app`.
///
/// Two things are added:
/// - the `insert_received_messages::<T>` observer, and
/// - the `deserialize_messages::<T>` system, placed in
///   [`MessageSystems::DeserializeMessages`] on the schedule stored in the
///   [`NevyMessagesSchedule`] resource.
///
/// # Panics
///
/// Reads [`NevyMessagesSchedule`] with `World::resource`, so the resource must
/// already be present in the world when this is called.
pub(crate) fn build_message<T>(app: &mut App)
where
    T: Send + Sync + 'static + DeserializeOwned,
{
    app.add_observer(insert_received_messages::<T>);

    // Copy the schedule label out of the resource so the world borrow ends
    // before `app` is mutated again.
    let target_schedule = **app.world().resource::<NevyMessagesSchedule>();
    let deserialize_system =
        deserialize_messages::<T>.in_set(MessageSystems::DeserializeMessages);

    app.add_systems(target_schedule, deserialize_system);
}

/// Observer that attaches a [`ReceivedMessages<T>`] queue to a connection
/// whose protocol knows about `T`.
///
/// Triggered when a [`ConnectionProtocolEntity`] is inserted. The protocol the
/// connection points at is looked up, and if its `lookup` table has an entry
/// for `T`'s [`TypeId`], a default (empty) [`ReceivedMessages<T>`] is queued
/// for insertion on the same entity. Otherwise nothing is inserted.
///
/// # Errors
///
/// Returns the query error if the triggering entity cannot be read through
/// `connection_q`, or if the referenced entity has no [`Protocol`].
fn insert_received_messages<T>(
    insert: On<Insert, ConnectionProtocolEntity>,
    mut commands: Commands,
    connection_q: Query<&ConnectionProtocolEntity>,
    protocol_q: Query<&Protocol>,
) -> Result
where
    T: Send + Sync + 'static,
{
    let connection = insert.entity;

    let protocol_link = connection_q.get(connection)?;
    let protocol = protocol_q.get(**protocol_link)?;

    // Connections whose protocol does not carry `T` get no receive queue.
    let handles_message = protocol.lookup.contains_key(&TypeId::of::<T>());
    if !handles_message {
        return Ok(());
    }

    commands
        .entity(connection)
        .insert(ReceivedMessages::<T>::default());

    Ok(())
}

/// System that turns the raw bytes received for `T` into values and queues
/// them on each connection's [`ReceivedMessages<T>`].
///
/// For every connection matched by `connection_q`:
/// 1. the connection's [`Protocol`] is fetched and `T`'s message id is looked
///    up in its `lookup` table;
/// 2. the serialized buffer for that id is taken from
///    [`MessageStreamReaders`]; connections with no buffer for the id are
///    skipped;
/// 3. every pending entry is popped and decoded with `postcard`. Successful
///    values are appended to the received queue. Entries that fail to decode
///    are logged with `warn!` and discarded.
///
/// # Errors
///
/// Returns early, leaving any remaining connections unprocessed for this run,
/// if a connection's protocol entity cannot be queried or if the protocol has
/// no message id registered for `T`.
fn deserialize_messages<T>(
    mut connection_q: Query<(
        &mut MessageStreamReaders,
        &mut ReceivedMessages<T>,
        &ConnectionProtocolEntity,
    )>,
    protocol_q: Query<&Protocol>,
) -> Result
where
    T: Send + Sync + 'static + DeserializeOwned,
{
    let message_type = TypeId::of::<T>();

    for (mut readers, mut received, protocol_link) in connection_q.iter_mut() {
        let protocol = protocol_q.get(**protocol_link)?;
        let message_id = protocol
            .lookup
            .get(&message_type)
            .ok_or("Protocol should have a message id for this type")?;

        if let Some(pending) = readers.buffers.get_mut(message_id) {
            while let Some(bytes) = pending.pop_front() {
                match postcard::from_bytes::<T>(&bytes) {
                    // The queue is only touched on success, so it is mutably
                    // accessed only when a message is actually added.
                    Ok(message) => received.messages.push_back(message),
                    // A bad payload is dropped; later entries are still tried.
                    Err(err) => warn!(
                        "Failed to deserialize message of type {}: {}",
                        std::any::type_name::<T>(),
                        err
                    ),
                }
            }
        }
    }

    Ok(())
}