use std::{
any::TypeId,
collections::{HashMap, HashSet},
ops::RangeBounds,
sync::{Mutex, OnceLock},
};
use bevy_ecs::{
prelude::{Commands, Entity, EntityRef, EntityWorldMut, Mut, World},
system::SystemState,
};
use serde::{Serialize, de::DeserializeOwned};
pub use serde_json::Value as JsonMessage;
use smallvec::SmallVec;
use crate::{
Accessing, Accessor, AnyBuffer, AnyBufferAccessInterface, AnyBufferKey, AnyRange, AsAnyBuffer,
Buffer, BufferAccessMut, BufferAccessors, BufferError, BufferIdentifier, BufferKey,
BufferKeyBuilder, BufferKeyLifecycle, BufferKeyTag, BufferLocation, BufferMap, BufferMapLayout,
BufferMapStruct, BufferStorage, Bufferable, Buffering, Builder, CloneFromBuffer, DrainBuffer,
Gate, GateState, IncompatibleLayout, InspectBuffer, JoinBehavior, Joined, Joining,
ManageBuffer, MessageTypeHint, MessageTypeHintEvaluation, MessageTypeHintMap,
NotifyBufferUpdate, OperationError, OperationResult, OrBroken, add_listener_to_source,
};
#[derive(Clone, Copy, Debug)]
pub struct JsonBuffer {
location: BufferLocation,
join_behavior: JoinBehavior,
interface: &'static (dyn JsonBufferAccessInterface + Send + Sync),
}
impl JsonBuffer {
pub fn downcast_for_message<T: 'static>(&self) -> Option<Buffer<T>> {
if TypeId::of::<T>() == self.interface.any_access_interface().message_type_id() {
Some(Buffer {
location: self.location,
_ignore: Default::default(),
})
} else {
None
}
}
pub fn downcast_buffer<BufferType: 'static>(&self) -> Option<BufferType> {
self.as_any_buffer().downcast_buffer::<BufferType>()
}
#[must_use]
pub fn join_by_cloning(self) -> Self {
Self {
join_behavior: JoinBehavior::Clone,
..self
}
}
#[must_use]
pub fn join_by_pulling(self) -> Self {
Self {
join_behavior: JoinBehavior::Pull,
..self
}
}
#[must_use]
pub fn join_behavior(&self) -> JoinBehavior {
self.join_behavior
}
pub fn register_for<T>()
where
T: 'static + Serialize + DeserializeOwned + Send + Sync,
{
JsonBufferAccessImpl::<T>::get_interface();
}
pub fn id(&self) -> Entity {
self.location.source
}
pub fn scope(&self) -> Entity {
self.location.scope
}
pub fn location(&self) -> BufferLocation {
self.location
}
}
impl<T: 'static + Send + Sync + Serialize + DeserializeOwned> From<Buffer<T>> for JsonBuffer {
fn from(value: Buffer<T>) -> Self {
Self {
location: value.location,
join_behavior: JoinBehavior::Pull,
interface: JsonBufferAccessImpl::<T>::get_interface(),
}
}
}
impl<T: 'static + Send + Sync + Serialize + DeserializeOwned + Clone> From<CloneFromBuffer<T>>
for JsonBuffer
{
fn from(value: CloneFromBuffer<T>) -> Self {
Self {
location: value.location,
join_behavior: JoinBehavior::Clone,
interface: JsonBufferAccessImpl::<T>::get_interface(),
}
}
}
impl From<JsonBuffer> for AnyBuffer {
fn from(value: JsonBuffer) -> Self {
Self {
location: value.location,
join_behavior: value.join_behavior,
interface: value.interface.any_access_interface(),
}
}
}
impl AsAnyBuffer for JsonBuffer {
fn as_any_buffer(&self) -> AnyBuffer {
(*self).into()
}
fn message_type_hint() -> MessageTypeHint {
MessageTypeHint::fallback::<JsonMessage>()
}
}
#[derive(Clone)]
pub struct JsonBufferKey {
tag: BufferKeyTag,
interface: &'static (dyn JsonBufferAccessInterface + Send + Sync),
}
impl JsonBufferKey {
pub fn downcast_for_message<T: 'static>(self) -> Option<BufferKey<T>> {
self.as_any_buffer_key().downcast_for_message()
}
pub fn downcast_buffer_key<KeyType: 'static>(self) -> Option<KeyType> {
self.as_any_buffer_key().downcast_buffer_key()
}
pub fn as_any_buffer_key(self) -> AnyBufferKey {
self.into()
}
}
impl BufferKeyLifecycle for JsonBufferKey {
type TargetBuffer = JsonBuffer;
fn create_key(buffer: &Self::TargetBuffer, builder: &BufferKeyBuilder) -> Self {
Self {
tag: builder.make_tag(buffer.id()),
interface: buffer.interface,
}
}
fn is_in_use(&self) -> bool {
self.tag.is_in_use()
}
fn deep_clone(&self) -> Self {
Self {
tag: self.tag.deep_clone(),
interface: self.interface,
}
}
}
impl std::fmt::Debug for JsonBufferKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("JsonBufferKey")
.field(
"message_type_name",
&self.interface.any_access_interface().message_type_name(),
)
.field("tag", &self.tag)
.finish()
}
}
impl<T: 'static + Send + Sync + Serialize + DeserializeOwned> From<BufferKey<T>> for JsonBufferKey {
fn from(value: BufferKey<T>) -> Self {
let interface = JsonBufferAccessImpl::<T>::get_interface();
JsonBufferKey {
tag: value.tag,
interface,
}
}
}
impl From<JsonBufferKey> for AnyBufferKey {
fn from(value: JsonBufferKey) -> Self {
AnyBufferKey {
tag: value.tag,
interface: value.interface.any_access_interface(),
}
}
}
pub struct JsonBufferView<'a> {
storage: Box<dyn JsonBufferViewing + 'a>,
gate: &'a GateState,
session: Entity,
}
impl<'a> JsonBufferView<'a> {
pub fn oldest(&self) -> JsonMessageViewResult {
self.storage.json_oldest(self.session)
}
pub fn newest(&self) -> JsonMessageViewResult {
self.storage.json_newest(self.session)
}
pub fn get(&self, index: usize) -> JsonMessageViewResult {
self.storage.json_get(self.session, index)
}
pub fn len(&self) -> usize {
self.storage.json_count(self.session)
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn iter(&self) -> IterJsonBufferView<'a, '_> {
IterJsonBufferView {
index: 0,
view: self,
}
}
pub fn gate(&self) -> Gate {
self.gate
.map
.get(&self.session)
.copied()
.unwrap_or(Gate::Open)
}
}
pub struct IterJsonBufferView<'a, 'b> {
index: usize,
view: &'b JsonBufferView<'a>,
}
impl<'a, 'b> Iterator for IterJsonBufferView<'a, 'b> {
type Item = Result<JsonMessage, serde_json::Error>;
fn next(&mut self) -> Option<Self::Item> {
let next = self.index;
self.index += 1;
self.view.get(next).transpose()
}
}
pub struct JsonBufferMut<'w, 's, 'a> {
storage: Box<dyn JsonBufferManagement + 'a>,
buffer: Entity,
session: Entity,
accessor: Option<Entity>,
commands: &'a mut Commands<'w, 's>,
modified: bool,
}
impl<'w, 's, 'a> JsonBufferMut<'w, 's, 'a> {
pub fn allow_closed_loops(mut self) -> Self {
self.accessor = None;
self
}
pub fn oldest(&self) -> JsonMessageViewResult {
self.storage.json_oldest(self.session)
}
pub fn newest(&self) -> JsonMessageViewResult {
self.storage.json_newest(self.session)
}
pub fn get(&self, index: usize) -> JsonMessageViewResult {
self.storage.json_get(self.session, index)
}
pub fn len(&self) -> usize {
self.storage.json_count(self.session)
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn oldest_mut(&mut self) -> Option<JsonMut<'_>> {
self.storage
.json_oldest_mut(self.session, &mut self.modified)
}
pub fn newest_mut(&mut self) -> Option<JsonMut<'_>> {
self.storage
.json_newest_mut(self.session, &mut self.modified)
}
pub fn get_mut(&mut self, index: usize) -> Option<JsonMut<'_>> {
self.storage
.json_get_mut(self.session, index, &mut self.modified)
}
pub fn drain<R: RangeBounds<usize>>(&mut self, range: R) -> DrainJsonBuffer<'_> {
self.modified = true;
DrainJsonBuffer {
interface: self.storage.json_drain(self.session, AnyRange::new(range)),
}
}
pub fn pull(&mut self) -> JsonMessageViewResult {
self.modified = true;
self.storage.json_pull(self.session)
}
pub fn pull_as<T: DeserializeOwned>(&mut self) -> Result<Option<T>, serde_json::Error> {
self.pull()?.map(|m| serde_json::from_value(m)).transpose()
}
pub fn pull_newest(&mut self) -> JsonMessageViewResult {
self.modified = true;
self.storage.json_pull_newest(self.session)
}
pub fn pull_newest_as<T: DeserializeOwned>(&mut self) -> Result<Option<T>, serde_json::Error> {
self.pull_newest()?
.map(|m| serde_json::from_value(m))
.transpose()
}
pub fn push<T: 'static + Serialize>(
&mut self,
value: T,
) -> Result<Option<JsonMessage>, serde_json::Error> {
let message = serde_json::to_value(&value)?;
self.modified = true;
self.storage.json_push(self.session, message)
}
pub fn push_json(
&mut self,
message: JsonMessage,
) -> Result<Option<JsonMessage>, serde_json::Error> {
self.modified = true;
self.storage.json_push(self.session, message)
}
pub fn push_as_oldest<T: 'static + Serialize>(
&mut self,
value: T,
) -> Result<Option<JsonMessage>, serde_json::Error> {
let message = serde_json::to_value(&value)?;
self.modified = true;
self.storage.json_push_as_oldest(self.session, message)
}
pub fn push_json_as_oldest(
&mut self,
message: JsonMessage,
) -> Result<Option<JsonMessage>, serde_json::Error> {
self.modified = true;
self.storage.json_push_as_oldest(self.session, message)
}
pub fn pulse(&mut self) {
self.modified = true;
}
}
impl<'w, 's, 'a> Drop for JsonBufferMut<'w, 's, 'a> {
fn drop(&mut self) {
if self.modified {
self.commands.queue(NotifyBufferUpdate::new(
self.buffer,
self.session,
self.accessor,
));
}
}
}
pub trait JsonBufferWorldAccess {
fn json_buffer_view(&self, key: &JsonBufferKey) -> Result<JsonBufferView<'_>, BufferError>;
fn json_buffer_mut<U>(
&mut self,
key: &JsonBufferKey,
f: impl FnOnce(JsonBufferMut) -> U,
) -> Result<U, BufferError>;
}
impl JsonBufferWorldAccess for World {
fn json_buffer_view(&self, key: &JsonBufferKey) -> Result<JsonBufferView<'_>, BufferError> {
key.interface.create_json_buffer_view(key, self)
}
fn json_buffer_mut<U>(
&mut self,
key: &JsonBufferKey,
f: impl FnOnce(JsonBufferMut) -> U,
) -> Result<U, BufferError> {
let interface = key.interface;
let mut state = interface.create_json_buffer_access_mut_state(self);
let mut access = state.get_json_buffer_access_mut(self);
let buffer_mut = access.as_json_buffer_mut(key)?;
Ok(f(buffer_mut))
}
}
pub struct JsonMut<'a> {
interface: &'a mut dyn JsonMutInterface,
modified: &'a mut bool,
}
impl<'a> JsonMut<'a> {
pub fn serialize(&self) -> Result<JsonMessage, serde_json::Error> {
self.interface.serialize()
}
pub fn deserialize_into<T: DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
serde_json::from_value::<T>(self.serialize()?)
}
#[must_use = "if you are going to discard the returned message, use insert instead"]
pub fn replace(&mut self, message: JsonMessage) -> JsonMessageReplaceResult {
*self.modified = true;
self.interface.replace(message)
}
pub fn insert(&mut self, message: JsonMessage) -> Result<(), serde_json::Error> {
*self.modified = true;
self.interface.insert(message)
}
pub fn modify(&mut self, f: impl FnOnce(&mut JsonMessage)) -> Result<(), serde_json::Error> {
let mut message = self.serialize()?;
f(&mut message);
self.insert(message)
}
}
pub type JsonMessageViewResult = Result<Option<JsonMessage>, serde_json::Error>;
pub type JsonMessagePushResult = Result<Option<JsonMessage>, serde_json::Error>;
pub type JsonMessageReplaceResult = Result<JsonMessage, serde_json::Error>;
trait JsonBufferViewing {
fn json_count(&self, session: Entity) -> usize;
fn json_oldest<'a>(&'a self, session: Entity) -> JsonMessageViewResult;
fn json_newest<'a>(&'a self, session: Entity) -> JsonMessageViewResult;
fn json_get<'a>(&'a self, session: Entity, index: usize) -> JsonMessageViewResult;
}
trait JsonBufferManagement: JsonBufferViewing {
fn json_push(&mut self, session: Entity, value: JsonMessage) -> JsonMessagePushResult;
fn json_push_as_oldest(&mut self, session: Entity, value: JsonMessage)
-> JsonMessagePushResult;
fn json_pull(&mut self, session: Entity) -> JsonMessageViewResult;
fn json_pull_newest(&mut self, session: Entity) -> JsonMessageViewResult;
fn json_oldest_mut<'a>(
&'a mut self,
session: Entity,
modified: &'a mut bool,
) -> Option<JsonMut<'a>>;
fn json_newest_mut<'a>(
&'a mut self,
session: Entity,
modified: &'a mut bool,
) -> Option<JsonMut<'a>>;
fn json_get_mut<'a>(
&'a mut self,
session: Entity,
index: usize,
modified: &'a mut bool,
) -> Option<JsonMut<'a>>;
fn json_drain<'a>(
&'a mut self,
session: Entity,
range: AnyRange,
) -> Box<dyn DrainJsonBufferInterface + 'a>;
}
impl<T> JsonBufferViewing for &'_ BufferStorage<T>
where
T: 'static + Send + Sync + Serialize + DeserializeOwned,
{
fn json_count(&self, session: Entity) -> usize {
self.count(session)
}
fn json_oldest<'a>(&'a self, session: Entity) -> JsonMessageViewResult {
self.oldest(session).map(serde_json::to_value).transpose()
}
fn json_newest<'a>(&'a self, session: Entity) -> JsonMessageViewResult {
self.newest(session).map(serde_json::to_value).transpose()
}
fn json_get<'a>(&'a self, session: Entity, index: usize) -> JsonMessageViewResult {
self.get(session, index)
.map(serde_json::to_value)
.transpose()
}
}
impl<T> JsonBufferViewing for Mut<'_, BufferStorage<T>>
where
T: 'static + Send + Sync + Serialize + DeserializeOwned,
{
fn json_count(&self, session: Entity) -> usize {
self.count(session)
}
fn json_oldest<'a>(&'a self, session: Entity) -> JsonMessageViewResult {
self.oldest(session).map(serde_json::to_value).transpose()
}
fn json_newest<'a>(&'a self, session: Entity) -> JsonMessageViewResult {
self.newest(session).map(serde_json::to_value).transpose()
}
fn json_get<'a>(&'a self, session: Entity, index: usize) -> JsonMessageViewResult {
self.get(session, index)
.map(serde_json::to_value)
.transpose()
}
}
impl<T> JsonBufferManagement for Mut<'_, BufferStorage<T>>
where
T: 'static + Send + Sync + Serialize + DeserializeOwned,
{
fn json_push(&mut self, session: Entity, value: JsonMessage) -> JsonMessagePushResult {
let value: T = serde_json::from_value(value)?;
self.push(session, value)
.map(serde_json::to_value)
.transpose()
}
fn json_push_as_oldest(
&mut self,
session: Entity,
value: JsonMessage,
) -> JsonMessagePushResult {
let value: T = serde_json::from_value(value)?;
self.push(session, value)
.map(serde_json::to_value)
.transpose()
}
fn json_pull(&mut self, session: Entity) -> JsonMessageViewResult {
self.pull(session).map(serde_json::to_value).transpose()
}
fn json_pull_newest(&mut self, session: Entity) -> JsonMessageViewResult {
self.pull_newest(session)
.map(serde_json::to_value)
.transpose()
}
fn json_oldest_mut<'a>(
&'a mut self,
session: Entity,
modified: &'a mut bool,
) -> Option<JsonMut<'a>> {
self.oldest_mut(session).map(|interface| JsonMut {
interface,
modified,
})
}
fn json_newest_mut<'a>(
&'a mut self,
session: Entity,
modified: &'a mut bool,
) -> Option<JsonMut<'a>> {
self.newest_mut(session).map(|interface| JsonMut {
interface,
modified,
})
}
fn json_get_mut<'a>(
&'a mut self,
session: Entity,
index: usize,
modified: &'a mut bool,
) -> Option<JsonMut<'a>> {
self.get_mut(session, index).map(|interface| JsonMut {
interface,
modified,
})
}
fn json_drain<'a>(
&'a mut self,
session: Entity,
range: AnyRange,
) -> Box<dyn DrainJsonBufferInterface + 'a> {
Box::new(self.drain(session, range))
}
}
trait JsonMutInterface {
fn serialize(&self) -> Result<JsonMessage, serde_json::Error>;
fn replace(&mut self, message: JsonMessage) -> JsonMessageReplaceResult;
fn insert(&mut self, message: JsonMessage) -> Result<(), serde_json::Error>;
}
impl<T: 'static + Send + Sync + Serialize + DeserializeOwned> JsonMutInterface for T {
fn serialize(&self) -> Result<JsonMessage, serde_json::Error> {
serde_json::to_value(self)
}
fn replace(&mut self, message: JsonMessage) -> JsonMessageReplaceResult {
let new_message: T = serde_json::from_value(message)?;
let old_message = serde_json::to_value(&self)?;
*self = new_message;
Ok(old_message)
}
fn insert(&mut self, message: JsonMessage) -> Result<(), serde_json::Error> {
let new_message: T = serde_json::from_value(message)?;
*self = new_message;
Ok(())
}
}
trait JsonBufferAccessInterface {
fn any_access_interface(&self) -> &'static (dyn AnyBufferAccessInterface + Send + Sync);
fn buffered_count(
&self,
buffer_ref: &EntityRef,
session: Entity,
) -> Result<usize, OperationError>;
fn ensure_session(&self, buffer_mut: &mut EntityWorldMut, session: Entity) -> OperationResult;
fn pull(
&self,
buffer_mut: &mut EntityWorldMut,
session: Entity,
) -> Result<JsonMessage, OperationError>;
fn clone_from_buffer(
&self,
buffer_ref: &EntityRef,
session: Entity,
) -> Result<JsonMessage, OperationError>;
fn create_json_buffer_view<'a>(
&self,
key: &JsonBufferKey,
world: &'a World,
) -> Result<JsonBufferView<'a>, BufferError>;
fn create_json_buffer_access_mut_state(
&self,
world: &mut World,
) -> Box<dyn JsonBufferAccessMutState>;
}
impl<'a> std::fmt::Debug for &'a (dyn JsonBufferAccessInterface + Send + Sync) {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Message Properties")
.field("type", &self.any_access_interface().message_type_name())
.finish()
}
}
struct JsonBufferAccessImpl<T>(std::marker::PhantomData<T>);
impl<T: 'static + Send + Sync + Serialize + DeserializeOwned> JsonBufferAccessImpl<T> {
pub(crate) fn get_interface() -> &'static (dyn JsonBufferAccessInterface + Send + Sync) {
static INTERFACE_MAP: OnceLock<
Mutex<HashMap<TypeId, &'static (dyn JsonBufferAccessInterface + Send + Sync)>>,
> = OnceLock::new();
let interfaces = INTERFACE_MAP.get_or_init(|| {
let mut interfaces = HashMap::new();
register_basic_types(&mut interfaces);
Mutex::new(interfaces)
});
let mut interfaces_mut = interfaces.lock().unwrap();
Self::get_or_register_type(&mut *interfaces_mut)
}
fn get_or_register_type(
interfaces: &mut HashMap<TypeId, &'static (dyn JsonBufferAccessInterface + Send + Sync)>,
) -> &'static (dyn JsonBufferAccessInterface + Send + Sync) {
*interfaces.entry(TypeId::of::<T>()).or_insert_with(|| {
let any_interface = AnyBuffer::interface_for::<T>();
any_interface.register_buffer_downcast(
TypeId::of::<JsonBuffer>(),
Box::new(|buffer: AnyBuffer| {
Ok(Box::new(JsonBuffer {
location: buffer.location,
join_behavior: buffer.join_behavior,
interface: Self::get_interface(),
}))
}),
);
any_interface.register_key_downcast(
TypeId::of::<JsonBufferKey>(),
Box::new(|tag| {
Box::new(JsonBufferKey {
tag,
interface: Self::get_interface(),
})
}),
);
Box::leak(Box::new(JsonBufferAccessImpl::<T>(Default::default())))
})
}
}
fn register_basic_types(
interfaces: &mut HashMap<TypeId, &'static (dyn JsonBufferAccessInterface + Send + Sync)>,
) {
JsonBufferAccessImpl::<JsonMessage>::get_or_register_type(interfaces);
JsonBufferAccessImpl::<String>::get_or_register_type(interfaces);
JsonBufferAccessImpl::<std::borrow::Cow<'static, str>>::get_or_register_type(interfaces);
JsonBufferAccessImpl::<u8>::get_or_register_type(interfaces);
JsonBufferAccessImpl::<u16>::get_or_register_type(interfaces);
JsonBufferAccessImpl::<u32>::get_or_register_type(interfaces);
JsonBufferAccessImpl::<u64>::get_or_register_type(interfaces);
JsonBufferAccessImpl::<usize>::get_or_register_type(interfaces);
JsonBufferAccessImpl::<i8>::get_or_register_type(interfaces);
JsonBufferAccessImpl::<i16>::get_or_register_type(interfaces);
JsonBufferAccessImpl::<i32>::get_or_register_type(interfaces);
JsonBufferAccessImpl::<i64>::get_or_register_type(interfaces);
JsonBufferAccessImpl::<isize>::get_or_register_type(interfaces);
JsonBufferAccessImpl::<f32>::get_or_register_type(interfaces);
JsonBufferAccessImpl::<f64>::get_or_register_type(interfaces);
JsonBufferAccessImpl::<bool>::get_or_register_type(interfaces);
JsonBufferAccessImpl::<char>::get_or_register_type(interfaces);
JsonBufferAccessImpl::<()>::get_or_register_type(interfaces);
}
impl<T: 'static + Send + Sync + Serialize + DeserializeOwned> JsonBufferAccessInterface
for JsonBufferAccessImpl<T>
{
fn any_access_interface(&self) -> &'static (dyn AnyBufferAccessInterface + Send + Sync) {
AnyBuffer::interface_for::<T>()
}
fn buffered_count(
&self,
buffer_ref: &EntityRef,
session: Entity,
) -> Result<usize, OperationError> {
buffer_ref.buffered_count::<T>(session)
}
fn ensure_session(&self, buffer_mut: &mut EntityWorldMut, session: Entity) -> OperationResult {
buffer_mut.ensure_session::<T>(session)
}
fn pull(
&self,
buffer_mut: &mut EntityWorldMut,
session: Entity,
) -> Result<JsonMessage, OperationError> {
let value = buffer_mut.pull_from_buffer::<T>(session)?;
serde_json::to_value(value).or_broken()
}
fn clone_from_buffer(
&self,
buffer_ref: &EntityRef,
session: Entity,
) -> Result<JsonMessage, OperationError> {
let value = buffer_ref
.get::<BufferStorage<T>>()
.or_broken()?
.newest(session)
.or_broken()?;
serde_json::to_value(value).or_broken()
}
fn create_json_buffer_view<'a>(
&self,
key: &JsonBufferKey,
world: &'a World,
) -> Result<JsonBufferView<'a>, BufferError> {
let buffer_ref = world
.get_entity(key.tag.buffer)
.map_err(|_| BufferError::BufferMissing)?;
let storage = buffer_ref
.get::<BufferStorage<T>>()
.ok_or(BufferError::BufferMissing)?;
let gate = buffer_ref
.get::<GateState>()
.ok_or(BufferError::BufferMissing)?;
Ok(JsonBufferView {
storage: Box::new(storage),
gate,
session: key.tag.session,
})
}
fn create_json_buffer_access_mut_state(
&self,
world: &mut World,
) -> Box<dyn JsonBufferAccessMutState> {
Box::new(SystemState::<BufferAccessMut<T>>::new(world))
}
}
trait JsonBufferAccessMutState {
fn get_json_buffer_access_mut<'s, 'w: 's>(
&'s mut self,
world: &'w mut World,
) -> Box<dyn JsonBufferAccessMut<'w, 's> + 's>;
}
impl<T> JsonBufferAccessMutState for SystemState<BufferAccessMut<'static, 'static, T>>
where
T: 'static + Send + Sync + Serialize + DeserializeOwned,
{
fn get_json_buffer_access_mut<'s, 'w: 's>(
&'s mut self,
world: &'w mut World,
) -> Box<dyn JsonBufferAccessMut<'w, 's> + 's> {
Box::new(self.get_mut(world))
}
}
trait JsonBufferAccessMut<'w, 's> {
fn as_json_buffer_mut<'a>(
&'a mut self,
key: &JsonBufferKey,
) -> Result<JsonBufferMut<'w, 's, 'a>, BufferError>;
}
impl<'w, 's, T> JsonBufferAccessMut<'w, 's> for BufferAccessMut<'w, 's, T>
where
T: 'static + Send + Sync + Serialize + DeserializeOwned,
{
fn as_json_buffer_mut<'a>(
&'a mut self,
key: &JsonBufferKey,
) -> Result<JsonBufferMut<'w, 's, 'a>, BufferError> {
let BufferAccessMut { query, commands } = self;
let storage = query
.get_mut(key.tag.buffer)
.map_err(|_| BufferError::BufferMissing)?;
Ok(JsonBufferMut {
storage: Box::new(storage),
buffer: key.tag.buffer,
session: key.tag.session,
accessor: Some(key.tag.accessor),
commands,
modified: false,
})
}
}
pub struct DrainJsonBuffer<'a> {
interface: Box<dyn DrainJsonBufferInterface + 'a>,
}
impl<'a> Iterator for DrainJsonBuffer<'a> {
type Item = Result<JsonMessage, serde_json::Error>;
fn next(&mut self) -> Option<Self::Item> {
self.interface.json_next()
}
}
trait DrainJsonBufferInterface {
fn json_next(&mut self) -> Option<Result<JsonMessage, serde_json::Error>>;
}
impl<T: 'static + Send + Sync + Serialize> DrainJsonBufferInterface for DrainBuffer<'_, T> {
fn json_next(&mut self) -> Option<Result<JsonMessage, serde_json::Error>> {
self.next().map(serde_json::to_value)
}
}
impl Bufferable for JsonBuffer {
type BufferType = Self;
fn into_buffer(self, builder: &mut Builder) -> Self::BufferType {
assert_eq!(self.scope(), builder.scope());
self
}
}
impl Buffering for JsonBuffer {
fn verify_scope(&self, scope: Entity) {
assert_eq!(scope, self.scope());
}
fn buffered_count(&self, session: Entity, world: &World) -> Result<usize, OperationError> {
let buffer_ref = world.get_entity(self.id()).or_broken()?;
self.interface.buffered_count(&buffer_ref, session)
}
fn buffered_count_for(
&self,
buffer: Entity,
session: Entity,
world: &World,
) -> Result<usize, OperationError> {
if buffer != self.id() {
return Ok(0);
}
self.buffered_count(session, world)
}
fn add_listener(&self, listener: Entity, world: &mut World) -> OperationResult {
add_listener_to_source(self.id(), listener, world)
}
fn gate_action(
&self,
session: Entity,
action: Gate,
world: &mut World,
roster: &mut crate::OperationRoster,
) -> OperationResult {
GateState::apply(self.id(), session, action, world, roster)
}
fn as_input(&self) -> smallvec::SmallVec<[Entity; 8]> {
SmallVec::from_iter([self.id()])
}
fn ensure_active_session(&self, session: Entity, world: &mut World) -> OperationResult {
let mut buffer_mut = world.get_entity_mut(self.id()).or_broken()?;
self.interface.ensure_session(&mut buffer_mut, session)
}
}
impl Joining for JsonBuffer {
type Item = JsonMessage;
fn fetch_for_join(
&self,
session: Entity,
world: &mut World,
) -> Result<Self::Item, OperationError> {
match self.join_behavior {
JoinBehavior::Pull => {
let mut buffer_mut = world.get_entity_mut(self.id()).or_broken()?;
self.interface.pull(&mut buffer_mut, session)
}
JoinBehavior::Clone => {
let buffer_ref = world.get_entity(self.id()).or_broken()?;
self.interface.clone_from_buffer(&buffer_ref, session)
}
}
}
}
impl Accessing for JsonBuffer {
type Key = JsonBufferKey;
fn add_accessor(&self, accessor: Entity, world: &mut World) -> OperationResult {
world
.get_mut::<BufferAccessors>(self.id())
.or_broken()?
.add_accessor(accessor);
Ok(())
}
fn create_key(&self, builder: &BufferKeyBuilder) -> Self::Key {
JsonBufferKey {
tag: builder.make_tag(self.id()),
interface: self.interface,
}
}
fn deep_clone_key(key: &Self::Key) -> Self::Key {
key.deep_clone()
}
fn is_key_in_use(key: &Self::Key) -> bool {
key.is_in_use()
}
}
impl Accessor for JsonBufferKey {
type Buffers = JsonBuffer;
}
impl BufferMapLayout for JsonBuffer {
fn try_from_buffer_map(buffers: &BufferMap) -> Result<Self, IncompatibleLayout> {
let mut compatibility = IncompatibleLayout::default();
if let Ok(downcast_buffer) =
compatibility.require_buffer_for_identifier::<JsonBuffer>(0, buffers)
{
return Ok(downcast_buffer);
}
Err(compatibility)
}
fn get_buffer_message_type_hints(
identifiers: HashSet<BufferIdentifier<'static>>,
) -> Result<super::MessageTypeHintMap, IncompatibleLayout> {
let mut evaluation = MessageTypeHintEvaluation::new(identifiers);
evaluation.fallback::<JsonMessage>(0);
evaluation.evaluate()
}
}
impl Joined for serde_json::Map<String, JsonMessage> {
type Buffers = HashMap<String, JsonBuffer>;
}
impl BufferMapLayout for HashMap<String, JsonBuffer> {
fn try_from_buffer_map(buffers: &BufferMap) -> Result<Self, IncompatibleLayout> {
let mut downcast_buffers = HashMap::new();
let mut compatibility = IncompatibleLayout::default();
for identifier in buffers.keys() {
match identifier {
BufferIdentifier::Name(name) => {
if let Ok(downcast) =
compatibility.require_buffer_for_borrowed_name::<JsonBuffer>(&name, buffers)
{
downcast_buffers.insert(name.clone().into_owned(), downcast);
}
}
BufferIdentifier::Index(index) => {
compatibility
.forbidden_buffers
.push(BufferIdentifier::Index(*index));
}
}
}
compatibility.as_result()?;
Ok(downcast_buffers)
}
fn get_buffer_message_type_hints(
identifiers: HashSet<BufferIdentifier<'static>>,
) -> Result<MessageTypeHintMap, IncompatibleLayout> {
let mut evaluation = MessageTypeHintEvaluation::new(identifiers);
while let Some(identifier) = evaluation.next_name_required() {
evaluation.fallback::<JsonMessage>(identifier);
}
evaluation.evaluate()
}
}
impl BufferMapStruct for HashMap<String, JsonBuffer> {
fn buffer_list(&self) -> SmallVec<[AnyBuffer; 8]> {
self.values().map(|b| b.as_any_buffer()).collect()
}
}
impl Joining for HashMap<String, JsonBuffer> {
type Item = serde_json::Map<String, JsonMessage>;
fn fetch_for_join(
&self,
session: Entity,
world: &mut World,
) -> Result<Self::Item, OperationError> {
self.iter()
.map(|(key, value)| {
value
.fetch_for_join(session, world)
.map(|v| (key.clone(), v))
})
.collect()
}
}
impl Joined for JsonMessage {
type Buffers = HashMap<BufferIdentifier<'static>, JsonBuffer>;
}
impl BufferMapLayout for HashMap<BufferIdentifier<'static>, JsonBuffer> {
fn try_from_buffer_map(buffers: &BufferMap) -> Result<Self, IncompatibleLayout> {
let mut downcast_buffers = HashMap::new();
let mut compatibility = IncompatibleLayout::default();
for identifier in buffers.keys() {
if let Ok(downcast) = compatibility
.require_buffer_for_identifier::<JsonBuffer>(identifier.clone(), buffers)
{
downcast_buffers.insert(identifier.clone(), downcast);
}
}
compatibility.as_result()?;
Ok(downcast_buffers)
}
fn get_buffer_message_type_hints(
identifiers: HashSet<BufferIdentifier<'static>>,
) -> Result<MessageTypeHintMap, IncompatibleLayout> {
let mut evaluation = MessageTypeHintEvaluation::new(identifiers);
while let Some(identifier) = evaluation.next_unevaluated() {
evaluation.fallback::<JsonMessage>(identifier);
}
evaluation.evaluate()
}
}
impl BufferMapStruct for HashMap<BufferIdentifier<'static>, JsonBuffer> {
fn buffer_list(&self) -> SmallVec<[AnyBuffer; 8]> {
self.values().map(|b| b.as_any_buffer()).collect()
}
}
impl Joining for HashMap<BufferIdentifier<'static>, JsonBuffer> {
type Item = JsonMessage;
fn fetch_for_join(
&self,
session: Entity,
world: &mut World,
) -> Result<Self::Item, OperationError> {
let mut object = serde_json::Map::<String, JsonMessage>::new();
let mut array = Vec::<JsonMessage>::new();
for (identifier, buffer) in self.iter() {
match identifier {
BufferIdentifier::Index(index) => {
if *index >= array.len() {
array.resize(*index + 1, JsonMessage::Null);
}
array[*index] = buffer.fetch_for_join(session, world)?;
}
BufferIdentifier::Name(name) => {
object.insert(
name.as_ref().to_owned(),
buffer.fetch_for_join(session, world)?,
);
}
}
}
let value = if !object.is_empty() && !array.is_empty() {
JsonMessage::Object(serde_json::Map::from_iter([
("array".to_owned(), JsonMessage::Array(array)),
("object".to_owned(), JsonMessage::Object(object)),
]))
} else if !object.is_empty() {
JsonMessage::Object(object)
} else if !array.is_empty() {
JsonMessage::Array(array)
} else {
JsonMessage::Null
};
Ok(value)
}
}
#[cfg(test)]
mod tests {
use crate::{AddBufferToMap, prelude::*, testing::*};
use bevy_ecs::prelude::World;
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
struct TestMessage {
v_i32: i32,
v_u32: u32,
v_string: String,
}
impl TestMessage {
fn new() -> Self {
Self {
v_i32: 1,
v_u32: 2,
v_string: "hello".to_string(),
}
}
}
#[test]
fn test_json_count() {
let mut context = TestingContext::minimal_plugins();
let workflow = context.spawn_io_workflow(|scope, builder| {
let buffer = builder.create_buffer(BufferSettings::keep_all());
let push_multiple_times = builder
.commands()
.spawn_service(push_multiple_times_into_buffer.into_blocking_service());
let count = builder
.commands()
.spawn_service(get_buffer_count.into_blocking_service());
builder
.chain(scope.start)
.with_access(buffer)
.then(push_multiple_times)
.then(count)
.connect(scope.terminate);
});
let msg = TestMessage::new();
let r = context.resolve_request(msg, workflow);
assert_eq!(r, 5);
}
fn push_multiple_times_into_buffer(
In((value, key)): In<(TestMessage, BufferKey<TestMessage>)>,
mut access: BufferAccessMut<TestMessage>,
) -> JsonBufferKey {
let mut buffer = access.get_mut(&key).unwrap();
for _ in 0..5 {
buffer.push(value.clone());
}
key.into()
}
fn get_buffer_count(In(key): In<JsonBufferKey>, world: &mut World) -> usize {
world.json_buffer_view(&key).unwrap().len()
}
#[test]
fn test_modify_json_message() {
let mut context = TestingContext::minimal_plugins();
let workflow = context.spawn_io_workflow(|scope, builder| {
let buffer = builder.create_buffer(BufferSettings::keep_all());
let push_multiple_times = builder
.commands()
.spawn_service(push_multiple_times_into_buffer.into_blocking_service());
let modify_content = builder
.commands()
.spawn_service(modify_buffer_content.into_blocking_service());
let drain_content = builder
.commands()
.spawn_service(pull_each_buffer_item.into_blocking_service());
builder
.chain(scope.start)
.with_access(buffer)
.then(push_multiple_times)
.then(modify_content)
.then(drain_content)
.connect(scope.terminate);
});
let msg = TestMessage::new();
let values = context.resolve_request(msg, workflow);
assert_eq!(values.len(), 5);
for i in 0..values.len() {
let v_i32 = values[i].get("v_i32").unwrap().as_i64().unwrap();
assert_eq!(v_i32, i as i64);
}
}
fn modify_buffer_content(In(key): In<JsonBufferKey>, world: &mut World) -> JsonBufferKey {
world
.json_buffer_mut(&key, |mut access| {
for i in 0..access.len() {
access
.get_mut(i)
.unwrap()
.modify(|value| {
let v_i32 = value.get_mut("v_i32").unwrap();
let modified_v_i32 = i as i64 * v_i32.as_i64().unwrap();
*v_i32 = modified_v_i32.into();
})
.unwrap();
}
})
.unwrap();
key
}
fn pull_each_buffer_item(In(key): In<JsonBufferKey>, world: &mut World) -> Vec<JsonMessage> {
world
.json_buffer_mut(&key, |mut access| {
let mut values = Vec::new();
while let Ok(Some(value)) = access.pull() {
values.push(value);
}
values
})
.unwrap()
}
#[test]
fn test_drain_json_message() {
let mut context = TestingContext::minimal_plugins();
let workflow = context.spawn_io_workflow(|scope, builder| {
let buffer = builder.create_buffer(BufferSettings::keep_all());
let push_multiple_times = builder
.commands()
.spawn_service(push_multiple_times_into_buffer.into_blocking_service());
let modify_content = builder
.commands()
.spawn_service(modify_buffer_content.into_blocking_service());
let drain_content = builder
.commands()
.spawn_service(drain_buffer_contents.into_blocking_service());
builder
.chain(scope.start)
.with_access(buffer)
.then(push_multiple_times)
.then(modify_content)
.then(drain_content)
.connect(scope.terminate);
});
let msg = TestMessage::new();
let values = context.resolve_request(msg, workflow);
assert_eq!(values.len(), 5);
for i in 0..values.len() {
let v_i32 = values[i].get("v_i32").unwrap().as_i64().unwrap();
assert_eq!(v_i32, i as i64);
}
}
fn drain_buffer_contents(In(key): In<JsonBufferKey>, world: &mut World) -> Vec<JsonMessage> {
world
.json_buffer_mut(&key, |mut access| {
access.drain(..).collect::<Result<Vec<_>, _>>()
})
.unwrap()
.unwrap()
}
#[test]
fn double_json_messages() {
let mut context = TestingContext::minimal_plugins();
let workflow = context.spawn_io_workflow(|scope, builder| {
let buffer_double_u32: JsonBuffer = builder
.create_buffer::<TestMessage>(BufferSettings::default())
.into();
let buffer_double_i32: JsonBuffer = builder
.create_buffer::<TestMessage>(BufferSettings::default())
.into();
let buffer_double_string: JsonBuffer = builder
.create_buffer::<TestMessage>(BufferSettings::default())
.into();
builder.chain(scope.start).fork_clone((
|chain: Chain<_>| {
chain
.map_block(|mut msg: TestMessage| {
msg.v_u32 *= 2;
msg
})
.connect(
buffer_double_u32
.downcast_for_message::<TestMessage>()
.unwrap()
.input_slot(),
)
},
|chain: Chain<_>| {
chain
.map_block(|mut msg: TestMessage| {
msg.v_i32 *= 2;
msg
})
.connect(
buffer_double_i32
.downcast_for_message::<TestMessage>()
.unwrap()
.input_slot(),
)
},
|chain: Chain<_>| {
chain
.map_block(|mut msg: TestMessage| {
msg.v_string = msg.v_string.clone() + &msg.v_string;
msg
})
.connect(
buffer_double_string
.downcast_for_message::<TestMessage>()
.unwrap()
.input_slot(),
)
},
));
(buffer_double_u32, buffer_double_i32, buffer_double_string)
.join(builder)
.connect(scope.terminate);
});
let msg = TestMessage::new();
let r = context.resolve_request(msg, workflow);
let (double_u32, double_i32, double_string) = r;
assert_eq!(4, double_u32.get("v_u32").unwrap().as_i64().unwrap());
assert_eq!(2, double_i32.get("v_i32").unwrap().as_i64().unwrap());
assert_eq!(
"hellohello",
double_string.get("v_string").unwrap().as_str().unwrap()
);
}
#[test]
fn test_buffer_downcast() {
let mut context = TestingContext::minimal_plugins();
let workflow = context.spawn_io_workflow(|scope, builder| {
JsonBuffer::register_for::<TestMessage>();
let buffer = builder.create_buffer::<TestMessage>(BufferSettings::keep_all());
let any_buffer: AnyBuffer = buffer.into();
let json_buffer: JsonBuffer = any_buffer.downcast_buffer().unwrap();
let _original_from_any: Buffer<TestMessage> =
any_buffer.downcast_for_message().unwrap();
let _original_from_json: Buffer<TestMessage> =
json_buffer.downcast_for_message().unwrap();
builder
.chain(scope.start)
.with_access(buffer)
.map_block(|(data, key)| {
let any_key: AnyBufferKey = key.clone().into();
let json_key: JsonBufferKey = any_key.clone().downcast_buffer_key().unwrap();
let _original_from_any: BufferKey<TestMessage> =
any_key.downcast_for_message().unwrap();
let _original_from_json: BufferKey<TestMessage> =
json_key.downcast_for_message().unwrap();
data
})
.connect(scope.terminate);
});
let r = context.resolve_request(1, workflow);
assert_eq!(r, 1);
}
#[derive(Clone, Joined)]
#[joined(buffers_struct_name = TestJoinedValueJsonBuffers)]
struct TestJoinedValueJson {
integer: i64,
float: f64,
#[joined(buffer = JsonBuffer)]
json: JsonMessage,
}
#[test]
fn test_try_join_json() {
let mut context = TestingContext::minimal_plugins();
let workflow = context.spawn_io_workflow(|scope, builder| {
JsonBuffer::register_for::<TestMessage>();
let buffer_i64 = builder.create_buffer(BufferSettings::default());
let buffer_f64 = builder.create_buffer(BufferSettings::default());
let buffer_json = builder.create_buffer(BufferSettings::default());
let mut buffers = BufferMap::default();
buffers.insert_buffer("integer", buffer_i64);
buffers.insert_buffer("float", buffer_f64);
buffers.insert_buffer("json", buffer_json);
builder.chain(scope.start).fork_unzip((
|chain: Chain<_>| chain.connect(buffer_i64.input_slot()),
|chain: Chain<_>| chain.connect(buffer_f64.input_slot()),
|chain: Chain<_>| chain.connect(buffer_json.input_slot()),
));
builder.try_join(&buffers).unwrap().connect(scope.terminate);
});
let value: TestJoinedValueJson =
context.resolve_request((5_i64, 3.14_f64, TestMessage::new()), workflow);
assert_eq!(value.integer, 5);
assert_eq!(value.float, 3.14);
let deserialized_json: TestMessage = serde_json::from_value(value.json).unwrap();
let expected_json = TestMessage::new();
assert_eq!(deserialized_json, expected_json);
}
#[test]
fn test_joined_value_json() {
let mut context = TestingContext::minimal_plugins();
let workflow = context.spawn_io_workflow(|scope, builder| {
JsonBuffer::register_for::<TestMessage>();
let json_buffer = builder.create_buffer::<TestMessage>(BufferSettings::default());
let buffers = TestJoinedValueJsonBuffers {
integer: builder.create_buffer(BufferSettings::default()).into(),
float: builder.create_buffer(BufferSettings::default()).into(),
json: json_buffer.into(),
};
builder.chain(scope.start).fork_unzip((
|chain: Chain<_>| chain.connect(buffers.integer.input_slot()),
|chain: Chain<_>| chain.connect(buffers.float.input_slot()),
|chain: Chain<_>| chain.connect(json_buffer.input_slot()),
));
builder.join(buffers).connect(scope.terminate);
});
let value = context.resolve_request((5_i64, 3.14_f64, TestMessage::new()), workflow);
assert_eq!(value.integer, 5);
assert_eq!(value.float, 3.14);
let deserialized_json: TestMessage = serde_json::from_value(value.json).unwrap();
let expected_json = TestMessage::new();
assert_eq!(deserialized_json, expected_json);
}
#[test]
fn test_select_buffers_json() {
let mut context = TestingContext::minimal_plugins();
let workflow = context.spawn_io_workflow(|scope, builder| {
let buffer_integer = builder.create_buffer::<i64>(BufferSettings::default());
let buffer_float = builder.create_buffer::<f64>(BufferSettings::default());
let buffer_json =
JsonBuffer::from(builder.create_buffer::<TestMessage>(BufferSettings::default()));
let buffers =
TestJoinedValueJson::select_buffers(buffer_integer, buffer_float, buffer_json);
builder.chain(scope.start).fork_unzip((
|chain: Chain<_>| chain.connect(buffers.integer.input_slot()),
|chain: Chain<_>| chain.connect(buffers.float.input_slot()),
|chain: Chain<_>| {
chain.connect(buffers.json.downcast_for_message().unwrap().input_slot())
},
));
builder.join(buffers).connect(scope.terminate);
});
let value = context.resolve_request((5_i64, 3.14_f64, TestMessage::new()), workflow);
assert_eq!(value.integer, 5);
assert_eq!(value.float, 3.14);
let deserialized_json: TestMessage = serde_json::from_value(value.json).unwrap();
let expected_json = TestMessage::new();
assert_eq!(deserialized_json, expected_json);
}
#[test]
fn test_join_json_buffer_vec() {
let mut context = TestingContext::minimal_plugins();
let workflow = context.spawn_io_workflow(|scope, builder| {
let buffer_u32 = builder.create_buffer::<u32>(BufferSettings::default());
let buffer_i32 = builder.create_buffer::<i32>(BufferSettings::default());
let buffer_string = builder.create_buffer::<String>(BufferSettings::default());
let buffer_msg = builder.create_buffer::<TestMessage>(BufferSettings::default());
let buffers: Vec<JsonBuffer> = vec![
buffer_i32.into(),
buffer_u32.into(),
buffer_string.into(),
buffer_msg.into(),
];
builder
.chain(scope.start)
.map_block(|msg: TestMessage| (msg.v_u32, msg.v_i32, msg.v_string.clone(), msg))
.fork_unzip((
|chain: Chain<_>| chain.connect(buffer_u32.input_slot()),
|chain: Chain<_>| chain.connect(buffer_i32.input_slot()),
|chain: Chain<_>| chain.connect(buffer_string.input_slot()),
|chain: Chain<_>| chain.connect(buffer_msg.input_slot()),
));
builder.join(buffers).connect(scope.terminate);
});
let values = context.resolve_request(TestMessage::new(), workflow);
assert_eq!(values.len(), 4);
assert_eq!(values[0], serde_json::Value::Number(1.into()));
assert_eq!(values[1], serde_json::Value::Number(2.into()));
assert_eq!(values[2], serde_json::Value::String("hello".to_string()));
assert_eq!(values[3], serde_json::to_value(TestMessage::new()).unwrap());
}
#[derive(Clone, Accessor)]
#[allow(unused)]
struct TestJsonKeyMap {
integer: BufferKey<i64>,
string: BufferKey<String>,
json: JsonBufferKey,
any: AnyBufferKey,
}
}