use bevy_connect::ClientId;
use bevy_ecs::message::Message;
use bevy_ecs::message::MessageReader;
use bevy_ecs::schedule::IntoScheduleConfigs;
use bevy_ecs::schedule::common_conditions::resource_added;
use bevy_ecs::system::Commands;
use bevy_ecs::system::Res;
use bevy_mesh::Mesh;
use tracing::info;
use tracing::trace;
use tracing::warn;
mod image_serde;
mod mesh_serde;
use crate::{
lib_priv::SyncTrackerRes,
networking::assets::{image_serde::image_to_bin, mesh_serde::mesh_to_bin},
};
use bevy_app::{App, Plugin, Update};
use bevy_asset::{AssetId, Assets};
use bevy_audio::AudioSource;
use bevy_connect::{events::MessageReceivedEvent, prelude::Channel};
use bevy_ecs::{
schedule::common_conditions::resource_exists,
system::{Command, ResMut},
world::World,
};
use bevy_image::Image;
use image_serde::bin_to_image;
use mesh_serde::bin_to_mesh;
use serde::{Deserialize, Serialize};
use tracing::debug;
/// Kind of asset that an [`AssetMessage`] carries or asks for.
///
/// The value selects which asset collection a payload is read from or written to.
#[derive(Serialize, Deserialize, Debug, Copy, Clone)]
pub(crate) enum AssetDataType {
/// A mesh, kept in `Assets<Mesh>`.
Mesh,
/// An image, kept in `Assets<Image>`.
Image,
/// An audio source, kept in `Assets<AudioSource>`.
Audio,
}
/// Messages exchanged over `Channel<AssetMessage>` to move assets between peers.
#[derive(Serialize, Deserialize, Debug)]
pub(crate) enum AssetMessage {
/// Asks the receiver to send its assets to the client whose raw id is the payload.
///
/// A non-host peer sends this to the host when the channel resource is added;
/// a receiver that is not the host ignores it.
RequestInitialAssets(u128),
/// Asks the receiver to send a single asset.
///
/// Fields, in order: raw id of the requesting client, kind of asset, uuid of the asset.
RequestAsset(u128, AssetDataType, uuid::Uuid),
/// Carries one serialized asset.
Asset {
/// Uuid under which the receiver stores the asset.
asset_uuid: uuid::Uuid,
/// Kind of asset contained in `data`.
asset_type: AssetDataType,
/// Serialized asset: the output of `mesh_to_bin` / `image_to_bin`, or the bytes
/// of the audio source.
data: Vec<u8>,
},
}
/// Local message written by the send commands when the asset they were asked to send
/// is missing from the local asset collection.
///
/// Holds the kind and the uuid of the missing asset. The `asset_not_found` system reads
/// these and, on a non-host peer, requests the asset from the host.
#[derive(Message)]
struct AssetNotFound(AssetDataType, uuid::Uuid);
/// Plugin that installs the asset synchronization systems.
///
/// It registers the [`AssetNotFound`] message and adds three `Update` systems, each of
/// them conditioned on the `Channel<AssetMessage>` resource.
pub(crate) struct SyncAssetPlugin;

impl Plugin for SyncAssetPlugin {
    /// Registers the message type and the systems on `app`.
    fn build(&self, app: &mut App) {
        app.add_message::<AssetNotFound>()
            // Handles incoming asset messages for as long as the channel exists.
            .add_systems(
                Update,
                receive_message.run_if(resource_exists::<Channel<AssetMessage>>),
            )
            // Turns locally missing assets into requests while the channel exists.
            .add_systems(
                Update,
                asset_not_found.run_if(resource_exists::<Channel<AssetMessage>>),
            )
            // Runs only when the channel resource has just been added.
            .add_systems(
                Update,
                initial_request_of_assets.run_if(resource_added::<Channel<AssetMessage>>),
            );
    }
}
/// Asks the host for its assets on behalf of this peer.
///
/// Does nothing on the host. On any other peer it sends
/// [`AssetMessage::RequestInitialAssets`] carrying this peer's raw id to the host, or
/// logs a warning when the host id is not known.
fn initial_request_of_assets(mut channel: ResMut<Channel<AssetMessage>>) {
    // The host is the peer that answers this request, so it never sends one itself.
    if !channel.is_host() {
        match channel.host_uuid() {
            Some(host_id) => {
                let own_id = channel.uuid();
                channel.send_to(host_id, AssetMessage::RequestInitialAssets(own_id.0));
            }
            None => {
                warn!("No host uuid known");
            }
        }
    }
}
fn asset_not_found(
mut events: MessageReader<AssetNotFound>,
mut channel: ResMut<Channel<AssetMessage>>,
) {
if channel.is_host() {
return;
}
for e in events.read() {
let Some(dst) = channel.host_uuid() else {
warn!("No host uuid known");
continue;
};
let src = channel.uuid();
channel.send_to(dst, AssetMessage::RequestAsset(src.0, e.0, e.1));
}
}
pub(crate) struct SendMeshCommand(pub(crate) Option<ClientId>, pub(crate) uuid::Uuid);
impl Command for SendMeshCommand {
fn apply(self, world: &mut World) {
if world.get_resource::<Channel<AssetMessage>>().is_none() {
debug!("No network, mesh {} not sent.", self.1);
return;
}
let meshes = world.resource::<Assets<Mesh>>();
let Some(mesh) = meshes.get(self.1) else {
warn!("Mesh not found in assets: {}", self.1);
world.write_message(AssetNotFound(AssetDataType::Mesh, self.1));
return;
};
let Some(v) = mesh_to_bin(mesh) else {
warn!("Error serializing mesh {}.", self.1);
return;
};
let mut channel = world.resource_mut::<Channel<AssetMessage>>();
debug!(
"Sending (host:{}) (to {:?}) mesh {} len {}...",
channel.is_host(),
self.0,
self.1,
v.len()
);
let msg = AssetMessage::Asset {
asset_uuid: self.1,
asset_type: AssetDataType::Mesh,
data: v,
};
if let Some(dst) = self.0 {
channel.send_to(dst, msg);
} else {
channel.broadcast(msg);
}
}
}
pub(crate) struct SendImageCommand(pub(crate) Option<ClientId>, pub(crate) uuid::Uuid);
impl Command for SendImageCommand {
fn apply(self, world: &mut World) {
if world.get_resource::<Channel<AssetMessage>>().is_none() {
debug!("No network, image {} not sent.", self.1);
return;
}
let images = world.resource::<Assets<Image>>();
let Some(image) = images.get(self.1) else {
warn!("Image not found in assets: {}", self.1);
world.write_message(AssetNotFound(AssetDataType::Image, self.1));
return;
};
let Some(v) = image_to_bin(image) else {
warn!("Error serializing image {}.", self.1);
return;
};
let mut channel = world.resource_mut::<Channel<AssetMessage>>();
debug!(
"Sending (host:{}) (to {:?}) image {} len {}...",
channel.is_host(),
self.0,
self.1,
v.len()
);
let msg = AssetMessage::Asset {
asset_uuid: self.1,
asset_type: AssetDataType::Image,
data: v,
};
if let Some(dst) = self.0 {
channel.send_to(dst, msg);
} else {
channel.broadcast(msg);
}
}
}
/// Command that sends the bytes of one audio source over the asset channel.
///
/// Field 0 is the destination client (`None` broadcasts the audio); field 1 is the uuid
/// of the audio source in `Assets<AudioSource>`.
pub(crate) struct SendAudioCommand(pub(crate) Option<ClientId>, pub(crate) uuid::Uuid);

impl Command for SendAudioCommand {
    /// Sends the audio bytes, or logs why they could not be sent.
    ///
    /// Nothing is sent when the channel resource is absent or when the audio source is
    /// not in the local assets (an [`AssetNotFound`] message is written in that case).
    fn apply(self, world: &mut World) {
        let Self(target, audio_uuid) = self;

        let network_available = world.get_resource::<Channel<AssetMessage>>().is_some();
        if !network_available {
            debug!("No network, audio {} not sent.", audio_uuid);
            return;
        }

        // The audio bytes are copied as they are; there is no separate serialization step.
        let bytes = match world.resource::<Assets<AudioSource>>().get(audio_uuid) {
            Some(audio) => audio.as_ref().to_vec(),
            None => {
                warn!("Audio not found in assets: {}", audio_uuid);
                world.write_message(AssetNotFound(AssetDataType::Audio, audio_uuid));
                return;
            }
        };

        let mut channel = world.resource_mut::<Channel<AssetMessage>>();
        debug!(
            "Sending (host:{}) (to {:?}) audio {} len {}...",
            channel.is_host(),
            target,
            audio_uuid,
            bytes.len()
        );
        let msg = AssetMessage::Asset {
            asset_uuid: audio_uuid,
            asset_type: AssetDataType::Audio,
            data: bytes,
        };
        match target {
            Some(client) => {
                channel.send_to(client, msg);
            }
            None => {
                channel.broadcast(msg);
            }
        }
    }
}
/// Handles every [`AssetMessage`] received from the network this frame.
///
/// * `RequestInitialAssets` — on the host, queues a full asset sync towards the
///   requesting client; on any other peer the request is ignored with a warning.
/// * `RequestAsset` — queues the send command matching the requested asset kind,
///   addressed to the requesting client.
/// * `Asset` — stores the received asset in the matching collection, unless an identical
///   asset is already stored or the payload cannot be decoded.
// This function is registered as a Bevy system, so its parameters are taken by value
// even when they are only read; clippy's by-reference suggestion does not apply.
#[allow(clippy::needless_pass_by_value)]
fn receive_message(
    mut cmd: Commands,
    mut events: MessageReader<MessageReceivedEvent<AssetMessage>>,
    mut meshes: ResMut<Assets<Mesh>>,
    mut images: ResMut<Assets<Image>>,
    mut audios: ResMut<Assets<AudioSource>>,
    mut track: ResMut<SyncTrackerRes>,
    channel: Res<Channel<AssetMessage>>,
) {
    for event in events.read() {
        match event.message.as_ref() {
            AssetMessage::RequestInitialAssets(client_id) => {
                if !channel.is_host() {
                    warn!("A client was requested to provide initial assets, ignoring.");
                    continue;
                }
                let id = *client_id;
                cmd.queue(move |world: &mut World| {
                    send_initial_asset_sync(ClientId(id), world);
                });
            }
            AssetMessage::RequestAsset(client, asset_type, asset_uuid) => {
                let requester = Some(ClientId(*client));
                match asset_type {
                    AssetDataType::Mesh => cmd.queue(SendMeshCommand(requester, *asset_uuid)),
                    AssetDataType::Image => cmd.queue(SendImageCommand(requester, *asset_uuid)),
                    AssetDataType::Audio => cmd.queue(SendAudioCommand(requester, *asset_uuid)),
                }
            }
            AssetMessage::Asset {
                asset_uuid,
                asset_type,
                data,
            } => {
                // Large payloads are summarized so the trace log stays readable.
                if data.len() > 1000 {
                    trace!("Received asset type {:?} of len {}", asset_type, data.len());
                } else {
                    trace!("Received asset\n{:?}", event.message);
                }
                match asset_type {
                    AssetDataType::Mesh => {
                        store_received_mesh(&mut meshes, &mut track, *asset_uuid, data);
                    }
                    AssetDataType::Image => {
                        store_received_image(&mut images, &mut track, *asset_uuid, data);
                    }
                    AssetDataType::Audio => {
                        store_received_audio(&mut audios, &mut track, *asset_uuid, data);
                    }
                }
            }
        }
    }
}

/// Stores a mesh received from the network unless an identical one is already stored.
///
/// The resources are passed as `&mut ResMut<_>` so that they are only accessed mutably
/// on the path that really writes, exactly as when the code lived inside the system.
// `data` stays `&Vec<u8>`: it is handed on unchanged to `bin_to_mesh`, whose parameter
// type is not visible from here.
#[allow(clippy::ptr_arg)]
fn store_received_mesh(
    meshes: &mut ResMut<Assets<Mesh>>,
    track: &mut ResMut<SyncTrackerRes>,
    asset_uuid: uuid::Uuid,
    data: &Vec<u8>,
) {
    debug!("Received mesh {} len {}", asset_uuid, data.len());
    let id: AssetId<Mesh> = AssetId::Uuid { uuid: asset_uuid };
    // Compare against the serialized form of the stored mesh to skip no-op writes.
    if let Some(old_asset) = meshes.get(id)
        && let Some(old_bin) = mesh_to_bin(old_asset)
        && *data == old_bin
    {
        debug!("Not writing mesh {} as it is the same", asset_uuid);
        return;
    }
    let Some(mesh) = bin_to_mesh(data) else {
        warn!("Error converting mesh {}", asset_uuid);
        return;
    };
    debug!("Applying received mesh {}", asset_uuid);
    // Record the uuid in the tracker before the write happens.
    track.pushed_handles_from_network.insert(asset_uuid);
    let _ = meshes.insert(id, mesh);
}

/// Stores an image received from the network unless an identical one is already stored.
///
/// The resources are passed as `&mut ResMut<_>` so that they are only accessed mutably
/// on the path that really writes, exactly as when the code lived inside the system.
// `data` stays `&Vec<u8>`: it is handed on unchanged to `bin_to_image`, whose parameter
// type is not visible from here.
#[allow(clippy::ptr_arg)]
fn store_received_image(
    images: &mut ResMut<Assets<Image>>,
    track: &mut ResMut<SyncTrackerRes>,
    asset_uuid: uuid::Uuid,
    data: &Vec<u8>,
) {
    debug!("Received image {} len {}", asset_uuid, data.len());
    let id: AssetId<Image> = AssetId::Uuid { uuid: asset_uuid };
    // Compare against the serialized form of the stored image to skip no-op writes.
    if let Some(old_asset) = images.get(id)
        && let Some(old_bin) = image_to_bin(old_asset)
        && *data == old_bin
    {
        debug!("Not writing image {} as it is the same", asset_uuid);
        return;
    }
    let Some(image) = bin_to_image(data) else {
        warn!("Error converting image {}", asset_uuid);
        return;
    };
    debug!("Applying received image {}", asset_uuid);
    // Record the uuid in the tracker before the write happens.
    track.pushed_handles_from_network.insert(asset_uuid);
    let _ = images.insert(id, image);
}

/// Stores an audio source received from the network unless the stored bytes are identical.
///
/// The resources are passed as `&mut ResMut<_>` so that they are only accessed mutably
/// on the path that really writes, exactly as when the code lived inside the system.
// `data` stays `&Vec<u8>` to keep the three store helpers interchangeable.
#[allow(clippy::ptr_arg)]
fn store_received_audio(
    audios: &mut ResMut<Assets<AudioSource>>,
    track: &mut ResMut<SyncTrackerRes>,
    asset_uuid: uuid::Uuid,
    data: &Vec<u8>,
) {
    debug!("Received audio {} len {}", asset_uuid, data.len());
    let id: AssetId<AudioSource> = AssetId::Uuid { uuid: asset_uuid };
    if let Some(old_asset) = audios.get(id)
        && data.as_slice() == old_asset.bytes.as_ref()
    {
        debug!("Not writing audio {} as it is the same", asset_uuid);
        return;
    }
    debug!("Applying received audio {}", asset_uuid);
    // Record the uuid in the tracker before the write happens.
    track.pushed_handles_from_network.insert(asset_uuid);
    let _ = audios.insert(
        id,
        AudioSource {
            bytes: data.clone().into(),
        },
    );
}
/// Sends every synchronized asset to `client_id`.
///
/// All send commands are gathered first (meshes, images, audios) and only then applied
/// to `world`, in that same order.
fn send_initial_asset_sync(client_id: ClientId, world: &mut World) {
    info!("Sending initial assets to client {}", client_id);

    let mesh_cmds = send_meshes(client_id, world);
    let image_cmds = send_images(client_id, world);
    let audio_cmds = send_audios(client_id, world);

    mesh_cmds.into_iter().for_each(|cmd| cmd.apply(world));
    image_cmds.into_iter().for_each(|cmd| cmd.apply(world));
    audio_cmds.into_iter().for_each(|cmd| cmd.apply(world));
}
/// Builds one [`SendMeshCommand`] addressed to `client_id` for every uuid-identified mesh.
///
/// Returns an empty list when `sync_meshes` is disabled in the tracker. Meshes whose
/// asset id is not a uuid are skipped.
fn send_meshes(client_id: ClientId, world: &mut World) -> Vec<SendMeshCommand> {
    if !world.resource::<SyncTrackerRes>().sync_meshes {
        return Vec::new();
    }

    let mesh_uuids: Vec<uuid::Uuid> = world
        .resource::<Assets<Mesh>>()
        .iter()
        .filter_map(|(id, _)| {
            if let AssetId::Uuid { uuid } = id {
                Some(uuid)
            } else {
                None
            }
        })
        .collect();
    debug!("Found meshes to send for initial sync: {:?}", mesh_uuids);

    mesh_uuids
        .into_iter()
        .map(|uuid| SendMeshCommand(Some(client_id), uuid))
        .collect()
}
/// Builds one [`SendImageCommand`] addressed to `client_id` for every uuid-identified image.
///
/// Returns an empty list when `sync_materials` is disabled in the tracker. Images whose
/// asset id is not a uuid are skipped.
// NOTE(review): images are gated on the `sync_materials` flag — confirm this is the
// intended switch for image synchronization.
fn send_images(client_id: ClientId, world: &mut World) -> Vec<SendImageCommand> {
    if !world.resource::<SyncTrackerRes>().sync_materials {
        return Vec::new();
    }

    let image_uuids: Vec<uuid::Uuid> = world
        .resource::<Assets<Image>>()
        .iter()
        .filter_map(|(id, _)| {
            if let AssetId::Uuid { uuid } = id {
                Some(uuid)
            } else {
                None
            }
        })
        .collect();
    debug!("Found images to send for initial sync: {:?}", image_uuids);

    image_uuids
        .into_iter()
        .map(|uuid| SendImageCommand(Some(client_id), uuid))
        .collect()
}
/// Builds one [`SendAudioCommand`] addressed to `client_id` for every uuid-identified
/// audio source.
///
/// Returns an empty list when `sync_audios` is disabled in the tracker. Audio sources
/// whose asset id is not a uuid are skipped.
fn send_audios(client_id: ClientId, world: &mut World) -> Vec<SendAudioCommand> {
    if !world.resource::<SyncTrackerRes>().sync_audios {
        return Vec::new();
    }

    let audio_uuids: Vec<uuid::Uuid> = world
        .resource::<Assets<AudioSource>>()
        .iter()
        .filter_map(|(id, _)| {
            if let AssetId::Uuid { uuid } = id {
                Some(uuid)
            } else {
                None
            }
        })
        .collect();
    debug!("Found audios to send for initial sync: {:?}", audio_uuids);

    audio_uuids
        .into_iter()
        .map(|uuid| SendAudioCommand(Some(client_id), uuid))
        .collect()
}