1use std::{
21 any::{Any, TypeId},
22 collections::{HashMap, HashSet, hash_map::Entry},
23 ops::RangeBounds,
24 sync::{Mutex, OnceLock},
25};
26
27use bevy_ecs::{
28 prelude::{Commands, Entity, EntityRef, EntityWorldMut, Mut, World},
29 system::SystemState,
30};
31
32use thiserror::Error as ThisError;
33
34use smallvec::SmallVec;
35
36use crate::{
37 Accessing, Accessor, Buffer, BufferAccessMut, BufferAccessors, BufferError, BufferIdentifier,
38 BufferKey, BufferKeyBuilder, BufferKeyLifecycle, BufferKeyTag, BufferLocation, BufferMap,
39 BufferMapLayout, BufferStorage, Bufferable, Buffering, Builder, CloneFromBuffer, DrainBuffer,
40 FetchFromBuffer, Gate, GateState, IncompatibleLayout, InspectBuffer, Joining, ManageBuffer,
41 MessageTypeHint, MessageTypeHintEvaluation, MessageTypeHintMap, NotifyBufferUpdate,
42 OperationError, OperationResult, OperationRoster, OrBroken, TypeInfo, add_listener_to_source,
43};
44
45#[derive(Clone, Copy)]
48pub struct AnyBuffer {
49 pub(crate) location: BufferLocation,
50 pub(crate) join_behavior: JoinBehavior,
51 pub(crate) interface: &'static (dyn AnyBufferAccessInterface + Send + Sync),
52}
53
54impl AnyBuffer {
55 pub fn join_by_pulling(mut self) -> AnyBuffer {
58 self.join_behavior = JoinBehavior::Pull;
59 self
60 }
61
62 pub fn join_by_cloning(mut self) -> Option<AnyBuffer> {
69 self.interface.clone_for_join_fn()?;
70 self.join_behavior = JoinBehavior::Clone;
71 Some(self)
72 }
73
74 pub fn id(&self) -> Entity {
76 self.location.source
77 }
78
79 pub fn scope(&self) -> Entity {
81 self.location.scope
82 }
83
84 pub fn message_type_id(&self) -> TypeId {
86 self.interface.message_type_id()
87 }
88
89 pub fn message_type_name(&self) -> &'static str {
91 self.interface.message_type_name()
92 }
93
94 pub fn message_type(&self) -> TypeInfo {
96 TypeInfo {
97 type_id: self.message_type_id(),
98 type_name: self.message_type_name(),
99 }
100 }
101
102 pub fn get_interface(&self) -> &'static (dyn AnyBufferAccessInterface + Send + Sync) {
104 self.interface
105 }
106
107 pub fn interface_for<T: 'static + Send + Sync>()
109 -> &'static (dyn AnyBufferAccessInterface + Send + Sync) {
110 static INTERFACE_MAP: OnceLock<
111 Mutex<HashMap<TypeId, &'static (dyn AnyBufferAccessInterface + Send + Sync)>>,
112 > = OnceLock::new();
113 let interfaces = INTERFACE_MAP.get_or_init(|| Mutex::default());
114
115 let mut interfaces_mut = interfaces.lock().unwrap();
118 *interfaces_mut
119 .entry(TypeId::of::<T>())
120 .or_insert_with(|| Box::leak(Box::new(AnyBufferAccessImpl::<T>::new())))
121 }
122}
123
124impl std::fmt::Debug for AnyBuffer {
125 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126 f.debug_struct("AnyBuffer")
127 .field("scope", &self.location.scope)
128 .field("source", &self.location.source)
129 .field("join_behavior", &self.join_behavior)
130 .field("message_type_name", &self.interface.message_type_name())
131 .finish()
132 }
133}
134
135impl AnyBuffer {
136 pub fn downcast_for_message<Message: 'static>(&self) -> Option<Buffer<Message>> {
140 if TypeId::of::<Message>() == self.interface.message_type_id() {
141 Some(Buffer {
142 location: self.location,
143 _ignore: Default::default(),
144 })
145 } else {
146 None
147 }
148 }
149
150 pub fn downcast_buffer<BufferType: 'static>(&self) -> Option<BufferType> {
153 self.interface.buffer_downcast(TypeId::of::<BufferType>())?(*self)
154 .ok()?
155 .downcast::<BufferType>()
156 .ok()
157 .map(|x| *x)
158 }
159}
160
161impl<T: 'static + Send + Sync> From<Buffer<T>> for AnyBuffer {
162 fn from(value: Buffer<T>) -> Self {
163 let interface = AnyBuffer::interface_for::<T>();
164 AnyBuffer {
165 location: value.location,
166 join_behavior: JoinBehavior::Pull,
167 interface,
168 }
169 }
170}
171
172impl<T: 'static + Send + Sync + Clone> From<CloneFromBuffer<T>> for AnyBuffer {
173 fn from(value: CloneFromBuffer<T>) -> Self {
174 let interface = AnyBuffer::interface_for::<T>();
175 AnyBuffer {
176 location: value.location,
177 join_behavior: JoinBehavior::Clone,
178 interface,
179 }
180 }
181}
182
/// How a join operation obtains a message from a buffer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub enum JoinBehavior {
    /// Pull the message out of the buffer. This is the default.
    #[default]
    Pull,
    /// Clone a message from the buffer.
    Clone,
}
194
195pub trait AsAnyBuffer {
198 fn as_any_buffer(&self) -> AnyBuffer;
200
201 fn message_type_hint() -> MessageTypeHint;
203}
204
205impl AsAnyBuffer for AnyBuffer {
206 fn as_any_buffer(&self) -> AnyBuffer {
207 *self
208 }
209
210 fn message_type_hint() -> MessageTypeHint {
211 MessageTypeHint::fallback::<AnyMessageBox>()
212 }
213}
214
215impl<T: 'static + Send + Sync> AsAnyBuffer for Buffer<T> {
216 fn as_any_buffer(&self) -> AnyBuffer {
217 (*self).into()
218 }
219
220 fn message_type_hint() -> MessageTypeHint {
221 MessageTypeHint::exact::<T>()
222 }
223}
224
225impl<T: 'static + Send + Sync + Clone> AsAnyBuffer for CloneFromBuffer<T> {
226 fn as_any_buffer(&self) -> AnyBuffer {
227 (*self).into()
228 }
229
230 fn message_type_hint() -> MessageTypeHint {
231 MessageTypeHint::exact::<T>()
232 }
233}
234
235#[derive(Clone)]
243pub struct AnyBufferKey {
244 pub(crate) tag: BufferKeyTag,
245 pub(crate) interface: &'static (dyn AnyBufferAccessInterface + Send + Sync),
246}
247
248impl AnyBufferKey {
249 pub fn downcast_for_message<Message: 'static>(self) -> Option<BufferKey<Message>> {
253 if TypeId::of::<Message>() == self.interface.message_type_id() {
254 Some(BufferKey {
255 tag: self.tag,
256 _ignore: Default::default(),
257 })
258 } else {
259 None
260 }
261 }
262
263 pub fn downcast_buffer_key<KeyType: 'static>(self) -> Option<KeyType> {
266 self.interface.key_downcast(TypeId::of::<KeyType>())?(self.tag)
267 .downcast::<KeyType>()
268 .ok()
269 .map(|x| *x)
270 }
271
272 pub fn id(&self) -> Entity {
274 self.tag.buffer
275 }
276
277 pub fn session(&self) -> Entity {
279 self.tag.session
280 }
281}
282
283impl BufferKeyLifecycle for AnyBufferKey {
284 type TargetBuffer = AnyBuffer;
285
286 fn create_key(buffer: &AnyBuffer, builder: &BufferKeyBuilder) -> Self {
287 AnyBufferKey {
288 tag: builder.make_tag(buffer.id()),
289 interface: buffer.interface,
290 }
291 }
292
293 fn is_in_use(&self) -> bool {
294 self.tag.is_in_use()
295 }
296
297 fn deep_clone(&self) -> Self {
298 Self {
299 tag: self.tag.deep_clone(),
300 interface: self.interface,
301 }
302 }
303}
304
305impl std::fmt::Debug for AnyBufferKey {
306 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
307 f.debug_struct("AnyBufferKey")
308 .field("message_type_name", &self.interface.message_type_name())
309 .field("tag", &self.tag)
310 .finish()
311 }
312}
313
314impl<T: 'static + Send + Sync + Any> From<BufferKey<T>> for AnyBufferKey {
315 fn from(value: BufferKey<T>) -> Self {
316 let interface = AnyBuffer::interface_for::<T>();
317 AnyBufferKey {
318 tag: value.tag,
319 interface,
320 }
321 }
322}
323
324impl Accessor for AnyBufferKey {
325 type Buffers = AnyBuffer;
326}
327
328impl BufferMapLayout for AnyBuffer {
329 fn try_from_buffer_map(buffers: &BufferMap) -> Result<Self, IncompatibleLayout> {
330 let mut compatibility = IncompatibleLayout::default();
331
332 if let Ok(any_buffer) = compatibility.require_buffer_for_identifier::<AnyBuffer>(0, buffers)
333 {
334 return Ok(any_buffer);
335 }
336
337 Err(compatibility)
338 }
339
340 fn get_buffer_message_type_hints(
341 identifiers: HashSet<BufferIdentifier<'static>>,
342 ) -> Result<MessageTypeHintMap, IncompatibleLayout> {
343 let mut evaluation = MessageTypeHintEvaluation::new(identifiers);
344 evaluation.fallback::<AnyMessageBox>(0);
345 evaluation.evaluate()
346 }
347}
348
349pub struct AnyBufferView<'a> {
353 storage: Box<dyn AnyBufferViewing + 'a>,
354 gate: &'a GateState,
355 session: Entity,
356}
357
358impl<'a> AnyBufferView<'a> {
359 pub fn oldest(&self) -> Option<AnyMessageRef<'_>> {
361 self.storage.any_oldest(self.session)
362 }
363
364 pub fn newest(&self) -> Option<AnyMessageRef<'_>> {
366 self.storage.any_newest(self.session)
367 }
368
369 pub fn get(&self, index: usize) -> Option<AnyMessageRef<'_>> {
372 self.storage.any_get(self.session, index)
373 }
374
375 pub fn len(&self) -> usize {
377 self.storage.any_count(self.session)
378 }
379
380 pub fn is_empty(&self) -> bool {
382 self.len() == 0
383 }
384
385 pub fn gate(&self) -> Gate {
387 self.gate
388 .map
389 .get(&self.session)
390 .copied()
391 .unwrap_or(Gate::Open)
392 }
393}
394
395pub struct AnyBufferMut<'w, 's, 'a> {
399 storage: Box<dyn AnyBufferManagement + 'a>,
400 buffer: Entity,
401 session: Entity,
402 accessor: Option<Entity>,
403 commands: &'a mut Commands<'w, 's>,
404 modified: bool,
405}
406
407impl<'w, 's, 'a> AnyBufferMut<'w, 's, 'a> {
408 pub fn allow_closed_loops(mut self) -> Self {
412 self.accessor = None;
413 self
414 }
415
416 pub fn oldest(&self) -> Option<AnyMessageRef<'_>> {
418 self.storage.any_oldest(self.session)
419 }
420
421 pub fn newest(&self) -> Option<AnyMessageRef<'_>> {
423 self.storage.any_newest(self.session)
424 }
425
426 pub fn get(&self, index: usize) -> Option<AnyMessageRef<'_>> {
429 self.storage.any_get(self.session, index)
430 }
431
432 pub fn len(&self) -> usize {
434 self.storage.any_count(self.session)
435 }
436
437 pub fn is_empty(&self) -> bool {
439 self.len() == 0
440 }
441
442 pub fn oldest_mut(&mut self) -> Option<AnyMessageMut<'_>> {
444 self.modified = true;
445 self.storage.any_oldest_mut(self.session)
446 }
447
448 pub fn newest_mut(&mut self) -> Option<AnyMessageMut<'_>> {
450 self.modified = true;
451 self.storage.any_newest_mut(self.session)
452 }
453
454 pub fn get_mut(&mut self, index: usize) -> Option<AnyMessageMut<'_>> {
457 self.modified = true;
458 self.storage.any_get_mut(self.session, index)
459 }
460
461 pub fn drain<R: RangeBounds<usize>>(&mut self, range: R) -> DrainAnyBuffer<'_> {
463 self.modified = true;
464 DrainAnyBuffer {
465 interface: self.storage.any_drain(self.session, AnyRange::new(range)),
466 }
467 }
468
469 pub fn pull(&mut self) -> Option<AnyMessageBox> {
471 self.modified = true;
472 self.storage.any_pull(self.session)
473 }
474
475 pub fn pull_newest(&mut self) -> Option<AnyMessageBox> {
478 self.modified = true;
479 self.storage.any_pull_newest(self.session)
480 }
481
482 pub fn push<T: 'static + Send + Sync + Any>(&mut self, value: T) -> Result<Option<T>, T> {
491 if TypeId::of::<T>() != self.storage.any_message_type() {
492 return Err(value);
493 }
494
495 self.modified = true;
496
497 let removed = self
500 .storage
501 .any_push(self.session, Box::new(value))
502 .unwrap()
503 .map(|value| *value.downcast::<T>().unwrap());
504
505 Ok(removed)
506 }
507
508 pub fn push_any(
518 &mut self,
519 value: AnyMessageBox,
520 ) -> Result<Option<AnyMessageBox>, AnyMessageError> {
521 self.storage.any_push(self.session, value)
522 }
523
524 pub fn push_as_oldest<T: 'static + Send + Sync + Any>(
529 &mut self,
530 value: T,
531 ) -> Result<Option<T>, T> {
532 if TypeId::of::<T>() != self.storage.any_message_type() {
533 return Err(value);
534 }
535
536 self.modified = true;
537
538 let removed = self
541 .storage
542 .any_push_as_oldest(self.session, Box::new(value))
543 .unwrap()
544 .map(|value| *value.downcast::<T>().unwrap());
545
546 Ok(removed)
547 }
548
549 pub fn push_any_as_oldest(
554 &mut self,
555 value: AnyMessageBox,
556 ) -> Result<Option<AnyMessageBox>, AnyMessageError> {
557 self.storage.any_push_as_oldest(self.session, value)
558 }
559
560 pub fn pulse(&mut self) {
564 self.modified = true;
565 }
566}
567
568impl<'w, 's, 'a> Drop for AnyBufferMut<'w, 's, 'a> {
569 fn drop(&mut self) {
570 if self.modified {
571 self.commands.queue(NotifyBufferUpdate::new(
572 self.buffer,
573 self.session,
574 self.accessor,
575 ));
576 }
577 }
578}
579
580pub trait AnyBufferWorldAccess {
583 fn any_buffer_view(&self, key: &AnyBufferKey) -> Result<AnyBufferView<'_>, BufferError>;
589
590 fn any_buffer_mut<U>(
595 &mut self,
596 key: &AnyBufferKey,
597 f: impl FnOnce(AnyBufferMut) -> U,
598 ) -> Result<U, BufferError>;
599}
600
601impl AnyBufferWorldAccess for World {
602 fn any_buffer_view(&self, key: &AnyBufferKey) -> Result<AnyBufferView<'_>, BufferError> {
603 key.interface.create_any_buffer_view(key, self)
604 }
605
606 fn any_buffer_mut<U>(
607 &mut self,
608 key: &AnyBufferKey,
609 f: impl FnOnce(AnyBufferMut) -> U,
610 ) -> Result<U, BufferError> {
611 let interface = key.interface;
612 let mut state = interface.create_any_buffer_access_mut_state(self);
613 let mut access = state.get_any_buffer_access_mut(self);
614 let buffer_mut = access.as_any_buffer_mut(key)?;
615 Ok(f(buffer_mut))
616 }
617}
618
619trait AnyBufferViewing {
620 fn any_count(&self, session: Entity) -> usize;
621 fn any_oldest<'a>(&'a self, session: Entity) -> Option<AnyMessageRef<'a>>;
622 fn any_newest<'a>(&'a self, session: Entity) -> Option<AnyMessageRef<'a>>;
623 fn any_get<'a>(&'a self, session: Entity, index: usize) -> Option<AnyMessageRef<'a>>;
624 fn any_message_type(&self) -> TypeId;
625}
626
627trait AnyBufferManagement: AnyBufferViewing {
628 fn any_push(&mut self, session: Entity, value: AnyMessageBox) -> AnyMessagePushResult;
629 fn any_push_as_oldest(&mut self, session: Entity, value: AnyMessageBox)
630 -> AnyMessagePushResult;
631 fn any_pull(&mut self, session: Entity) -> Option<AnyMessageBox>;
632 fn any_pull_newest(&mut self, session: Entity) -> Option<AnyMessageBox>;
633 fn any_oldest_mut<'a>(&'a mut self, session: Entity) -> Option<AnyMessageMut<'a>>;
634 fn any_newest_mut<'a>(&'a mut self, session: Entity) -> Option<AnyMessageMut<'a>>;
635 fn any_get_mut<'a>(&'a mut self, session: Entity, index: usize) -> Option<AnyMessageMut<'a>>;
636 fn any_drain<'a>(
637 &'a mut self,
638 session: Entity,
639 range: AnyRange,
640 ) -> Box<dyn DrainAnyBufferInterface + 'a>;
641}
642
/// An owned copy of the bounds of any `RangeBounds<usize>` value.
///
/// This lets a generic range be passed through an interface that cannot be
/// generic over the range type.
pub(crate) struct AnyRange {
    /// Lower bound of the range.
    start_bound: std::ops::Bound<usize>,
    /// Upper bound of the range.
    end_bound: std::ops::Bound<usize>,
}

impl AnyRange {
    /// Copies the start and end bounds out of `range`.
    pub(crate) fn new<T: std::ops::RangeBounds<usize>>(range: T) -> Self {
        AnyRange {
            start_bound: deref_bound(range.start_bound()),
            end_bound: deref_bound(range.end_bound()),
        }
    }
}

/// Turns a bound that borrows its endpoint into one that owns it.
fn deref_bound(bound: std::ops::Bound<&usize>) -> std::ops::Bound<usize> {
    bound.cloned()
}

// `contains` is intentionally not overridden: the provided method of
// `RangeBounds` is derived from `start_bound` and `end_bound` and gives the
// same answer for every `usize` item.
impl std::ops::RangeBounds<usize> for AnyRange {
    /// Borrowed view of the lower bound.
    fn start_bound(&self) -> std::ops::Bound<&usize> {
        self.start_bound.as_ref()
    }

    /// Borrowed view of the upper bound.
    fn end_bound(&self) -> std::ops::Bound<&usize> {
        self.end_bound.as_ref()
    }
}
710
/// Shared reference to a message whose concrete type has been erased.
pub type AnyMessageRef<'a> = &'a (dyn Any + Send + Sync + 'static);
712
713impl<T: 'static + Send + Sync + Any> AnyBufferViewing for &'_ BufferStorage<T> {
714 fn any_count(&self, session: Entity) -> usize {
715 self.count(session)
716 }
717
718 fn any_oldest<'a>(&'a self, session: Entity) -> Option<AnyMessageRef<'a>> {
719 self.oldest(session).map(to_any_ref)
720 }
721
722 fn any_newest<'a>(&'a self, session: Entity) -> Option<AnyMessageRef<'a>> {
723 self.newest(session).map(to_any_ref)
724 }
725
726 fn any_get<'a>(&'a self, session: Entity, index: usize) -> Option<AnyMessageRef<'a>> {
727 self.get(session, index).map(to_any_ref)
728 }
729
730 fn any_message_type(&self) -> TypeId {
731 TypeId::of::<T>()
732 }
733}
734
735impl<T: 'static + Send + Sync + Any> AnyBufferViewing for Mut<'_, BufferStorage<T>> {
736 fn any_count(&self, session: Entity) -> usize {
737 self.count(session)
738 }
739
740 fn any_oldest<'a>(&'a self, session: Entity) -> Option<AnyMessageRef<'a>> {
741 self.oldest(session).map(to_any_ref)
742 }
743
744 fn any_newest<'a>(&'a self, session: Entity) -> Option<AnyMessageRef<'a>> {
745 self.newest(session).map(to_any_ref)
746 }
747
748 fn any_get<'a>(&'a self, session: Entity, index: usize) -> Option<AnyMessageRef<'a>> {
749 self.get(session, index).map(to_any_ref)
750 }
751
752 fn any_message_type(&self) -> TypeId {
753 TypeId::of::<T>()
754 }
755}
756
/// Mutable reference to a message whose concrete type has been erased.
pub type AnyMessageMut<'a> = &'a mut (dyn Any + Send + Sync + 'static);

/// Owned message whose concrete type has been erased.
pub type AnyMessageBox = Box<dyn Any + Send + Sync + 'static>;
760
761#[derive(ThisError, Debug)]
762#[error("failed to convert a message")]
763pub struct AnyMessageError {
764 pub value: AnyMessageBox,
766 pub type_id: TypeId,
768 pub type_name: &'static str,
770}
771
772pub type AnyMessagePushResult = Result<Option<AnyMessageBox>, AnyMessageError>;
773
774impl<T: 'static + Send + Sync + Any> AnyBufferManagement for Mut<'_, BufferStorage<T>> {
775 fn any_push(&mut self, session: Entity, value: AnyMessageBox) -> AnyMessagePushResult {
776 let value = from_any_message::<T>(value)?;
777 Ok(self.push(session, value).map(to_any_message))
778 }
779
780 fn any_push_as_oldest(
781 &mut self,
782 session: Entity,
783 value: AnyMessageBox,
784 ) -> AnyMessagePushResult {
785 let value = from_any_message::<T>(value)?;
786 Ok(self.push_as_oldest(session, value).map(to_any_message))
787 }
788
789 fn any_pull(&mut self, session: Entity) -> Option<AnyMessageBox> {
790 self.pull(session).map(to_any_message)
791 }
792
793 fn any_pull_newest(&mut self, session: Entity) -> Option<AnyMessageBox> {
794 self.pull_newest(session).map(to_any_message)
795 }
796
797 fn any_oldest_mut<'a>(&'a mut self, session: Entity) -> Option<AnyMessageMut<'a>> {
798 self.oldest_mut(session).map(to_any_mut)
799 }
800
801 fn any_newest_mut<'a>(&'a mut self, session: Entity) -> Option<AnyMessageMut<'a>> {
802 self.newest_mut(session).map(to_any_mut)
803 }
804
805 fn any_get_mut<'a>(&'a mut self, session: Entity, index: usize) -> Option<AnyMessageMut<'a>> {
806 self.get_mut(session, index).map(to_any_mut)
807 }
808
809 fn any_drain<'a>(
810 &'a mut self,
811 session: Entity,
812 range: AnyRange,
813 ) -> Box<dyn DrainAnyBufferInterface + 'a> {
814 Box::new(self.drain(session, range))
815 }
816}
817
818fn to_any_ref<'a, T: 'static + Send + Sync + Any>(x: &'a T) -> AnyMessageRef<'a> {
819 x
820}
821
822fn to_any_mut<'a, T: 'static + Send + Sync + Any>(x: &'a mut T) -> AnyMessageMut<'a> {
823 x
824}
825
826pub(crate) fn to_any_message<T: 'static + Send + Sync + Any>(x: T) -> AnyMessageBox {
827 Box::new(x)
828}
829
830fn from_any_message<T: 'static + Send + Sync + Any>(
831 value: AnyMessageBox,
832) -> Result<T, AnyMessageError>
833where
834 T: 'static,
835{
836 let value = value.downcast::<T>().map_err(|value| AnyMessageError {
837 value,
838 type_id: TypeId::of::<T>(),
839 type_name: std::any::type_name::<T>(),
840 })?;
841
842 Ok(*value)
843}
844
845pub trait AnyBufferAccessMutState {
846 fn get_any_buffer_access_mut<'s, 'w: 's>(
847 &'s mut self,
848 world: &'w mut World,
849 ) -> Box<dyn AnyBufferAccessMut<'w, 's> + 's>;
850}
851
852impl<T: 'static + Send + Sync + Any> AnyBufferAccessMutState
853 for SystemState<BufferAccessMut<'static, 'static, T>>
854{
855 fn get_any_buffer_access_mut<'s, 'w: 's>(
856 &'s mut self,
857 world: &'w mut World,
858 ) -> Box<dyn AnyBufferAccessMut<'w, 's> + 's> {
859 Box::new(self.get_mut(world))
860 }
861}
862
863pub trait AnyBufferAccessMut<'w, 's> {
864 fn as_any_buffer_mut<'a>(
865 &'a mut self,
866 key: &AnyBufferKey,
867 ) -> Result<AnyBufferMut<'w, 's, 'a>, BufferError>;
868}
869
870impl<'w, 's, T: 'static + Send + Sync + Any> AnyBufferAccessMut<'w, 's>
871 for BufferAccessMut<'w, 's, T>
872{
873 fn as_any_buffer_mut<'a>(
874 &'a mut self,
875 key: &AnyBufferKey,
876 ) -> Result<AnyBufferMut<'w, 's, 'a>, BufferError> {
877 let BufferAccessMut { query, commands } = self;
878 let storage = query
879 .get_mut(key.tag.buffer)
880 .map_err(|_| BufferError::BufferMissing)?;
881
882 Ok(AnyBufferMut {
883 storage: Box::new(storage),
884 buffer: key.tag.buffer,
885 session: key.tag.session,
886 accessor: Some(key.tag.accessor),
887 commands,
888 modified: false,
889 })
890 }
891}
892
893pub trait AnyBufferAccessInterface {
894 fn message_type_id(&self) -> TypeId;
895
896 fn message_type_name(&self) -> &'static str;
897
898 fn buffered_count(&self, entity: &EntityRef, session: Entity) -> Result<usize, OperationError>;
899
900 fn ensure_session(&self, entity_mut: &mut EntityWorldMut, session: Entity) -> OperationResult;
901
902 fn register_buffer_downcast(&self, buffer_type: TypeId, f: BufferDowncastBox);
903
904 fn register_cloning(
906 &self,
907 clone_for_any_join: CloneForAnyFn,
908 clone_for_join_fn: &'static (dyn Any + Send + Sync),
909 );
910
911 fn buffer_downcast(&self, buffer_type: TypeId) -> Option<BufferDowncastRef>;
912
913 fn register_key_downcast(&self, key_type: TypeId, f: KeyDowncastBox);
914
915 fn key_downcast(&self, key_type: TypeId) -> Option<KeyDowncastRef>;
916
917 fn pull(
918 &self,
919 entity_mut: &mut EntityWorldMut,
920 session: Entity,
921 ) -> Result<AnyMessageBox, OperationError>;
922
923 fn clone_from_buffer(
924 &self,
925 entity_reft: &EntityRef,
926 session: Entity,
927 ) -> Result<AnyMessageBox, OperationError>;
928
929 fn clone_for_join_fn(&self) -> Option<&'static (dyn Any + Send + Sync)>;
930
931 fn create_any_buffer_view<'a>(
932 &self,
933 key: &AnyBufferKey,
934 world: &'a World,
935 ) -> Result<AnyBufferView<'a>, BufferError>;
936
937 fn create_any_buffer_access_mut_state(
938 &self,
939 world: &mut World,
940 ) -> Box<dyn AnyBufferAccessMutState>;
941}
942
943pub type AnyMessageResult = Result<AnyMessageBox, OperationError>;
944pub type BufferDowncastBox = Box<dyn Fn(AnyBuffer) -> AnyMessageResult + Send + Sync>;
946pub type BufferDowncastRef = &'static (dyn Fn(AnyBuffer) -> AnyMessageResult + Send + Sync);
947pub type KeyDowncastBox = Box<dyn Fn(BufferKeyTag) -> AnyMessageBox + Send + Sync>;
948pub type KeyDowncastRef = &'static (dyn Fn(BufferKeyTag) -> AnyMessageBox + Send + Sync);
949pub type CloneForAnyFn = fn(&EntityRef, Entity) -> AnyMessageResult;
950
951struct AnyBufferAccessImpl<T> {
952 buffer_downcasts: Mutex<HashMap<TypeId, BufferDowncastRef>>,
953 key_downcasts: Mutex<HashMap<TypeId, KeyDowncastRef>>,
954 cloning: Mutex<Option<CloneInterfaces>>,
955 _ignore: std::marker::PhantomData<fn(T)>,
956}
957
958struct CloneInterfaces {
959 clone_for_any_join: CloneForAnyFn,
960 clone_for_join_fn: &'static (dyn Any + Send + Sync),
963}
964
965impl<T: 'static + Send + Sync> AnyBufferAccessImpl<T> {
966 fn new() -> Self {
967 let mut buffer_downcasts: HashMap<_, BufferDowncastRef> = HashMap::new();
968
969 buffer_downcasts.insert(
975 TypeId::of::<AnyBuffer>(),
976 Box::leak(Box::new(|buffer: AnyBuffer| -> AnyMessageResult {
977 Ok(Box::new(AnyBuffer {
978 location: buffer.location,
979 join_behavior: buffer.join_behavior,
980 interface: AnyBuffer::interface_for::<T>(),
981 }))
982 })),
983 );
984
985 buffer_downcasts.insert(
987 TypeId::of::<Buffer<T>>(),
988 Box::leak(Box::new(|buffer: AnyBuffer| -> AnyMessageResult {
989 Ok(Box::new(Buffer::<T> {
990 location: buffer.location,
991 _ignore: Default::default(),
992 }))
993 })),
994 );
995
996 buffer_downcasts.insert(
998 TypeId::of::<FetchFromBuffer<T>>(),
999 Box::leak(Box::new(|buffer: AnyBuffer| -> AnyMessageResult {
1000 Ok(Box::new(FetchFromBuffer::<T>::try_from(buffer)?))
1001 })),
1002 );
1003
1004 let mut key_downcasts: HashMap<_, KeyDowncastRef> = HashMap::new();
1005
1006 key_downcasts.insert(
1008 TypeId::of::<AnyBufferKey>(),
1009 Box::leak(Box::new(|tag| -> AnyMessageBox {
1010 Box::new(AnyBufferKey {
1011 tag,
1012 interface: AnyBuffer::interface_for::<T>(),
1013 })
1014 })),
1015 );
1016
1017 Self {
1018 buffer_downcasts: Mutex::new(buffer_downcasts),
1019 key_downcasts: Mutex::new(key_downcasts),
1020 cloning: Default::default(),
1021 _ignore: Default::default(),
1022 }
1023 }
1024}
1025
1026impl<T: 'static + Send + Sync + Any> AnyBufferAccessInterface for AnyBufferAccessImpl<T> {
1027 fn message_type_id(&self) -> TypeId {
1028 TypeId::of::<T>()
1029 }
1030
1031 fn message_type_name(&self) -> &'static str {
1032 std::any::type_name::<T>()
1033 }
1034
1035 fn buffered_count(&self, entity: &EntityRef, session: Entity) -> Result<usize, OperationError> {
1036 entity.buffered_count::<T>(session)
1037 }
1038
1039 fn ensure_session(&self, entity_mut: &mut EntityWorldMut, session: Entity) -> OperationResult {
1040 entity_mut.ensure_session::<T>(session)
1041 }
1042
1043 fn register_buffer_downcast(&self, buffer_type: TypeId, f: BufferDowncastBox) {
1044 let mut downcasts = self.buffer_downcasts.lock().unwrap();
1045
1046 if let Entry::Vacant(entry) = downcasts.entry(buffer_type) {
1047 entry.insert(Box::leak(f));
1049 }
1050 }
1051
1052 fn register_cloning(
1053 &self,
1054 clone_for_any_join: CloneForAnyFn,
1055 clone_for_join_fn: &'static (dyn Any + Send + Sync),
1056 ) {
1057 *self.cloning.lock().unwrap() = Some(CloneInterfaces {
1058 clone_for_any_join,
1059 clone_for_join_fn,
1060 });
1061 }
1062
1063 fn buffer_downcast(&self, buffer_type: TypeId) -> Option<BufferDowncastRef> {
1064 self.buffer_downcasts
1065 .lock()
1066 .unwrap()
1067 .get(&buffer_type)
1068 .copied()
1069 }
1070
1071 fn register_key_downcast(&self, key_type: TypeId, f: KeyDowncastBox) {
1072 let mut downcasts = self.key_downcasts.lock().unwrap();
1073
1074 if let Entry::Vacant(entry) = downcasts.entry(key_type) {
1075 entry.insert(Box::leak(f));
1077 }
1078 }
1079
1080 fn key_downcast(&self, key_type: TypeId) -> Option<KeyDowncastRef> {
1081 self.key_downcasts.lock().unwrap().get(&key_type).copied()
1082 }
1083
1084 fn pull(
1085 &self,
1086 entity_mut: &mut EntityWorldMut,
1087 session: Entity,
1088 ) -> Result<AnyMessageBox, OperationError> {
1089 entity_mut
1090 .pull_from_buffer::<T>(session)
1091 .map(to_any_message)
1092 }
1093
1094 fn clone_from_buffer(
1095 &self,
1096 entity_ref: &EntityRef,
1097 session: Entity,
1098 ) -> Result<AnyMessageBox, OperationError> {
1099 let f = self
1100 .cloning
1101 .lock()
1102 .unwrap()
1103 .as_ref()
1104 .or_broken()?
1105 .clone_for_any_join;
1106 f(entity_ref, session)
1107 }
1108
1109 fn clone_for_join_fn(&self) -> Option<&'static (dyn Any + Send + Sync)> {
1110 self.cloning
1111 .lock()
1112 .unwrap()
1113 .as_ref()
1114 .map(|c| c.clone_for_join_fn)
1115 }
1116
1117 fn create_any_buffer_view<'a>(
1118 &self,
1119 key: &AnyBufferKey,
1120 world: &'a World,
1121 ) -> Result<AnyBufferView<'a>, BufferError> {
1122 let buffer_ref = world
1123 .get_entity(key.tag.buffer)
1124 .map_err(|_| BufferError::BufferMissing)?;
1125 let storage = buffer_ref
1126 .get::<BufferStorage<T>>()
1127 .ok_or(BufferError::BufferMissing)?;
1128 let gate = buffer_ref
1129 .get::<GateState>()
1130 .ok_or(BufferError::BufferMissing)?;
1131 Ok(AnyBufferView {
1132 storage: Box::new(storage),
1133 gate,
1134 session: key.tag.session,
1135 })
1136 }
1137
1138 fn create_any_buffer_access_mut_state(
1139 &self,
1140 world: &mut World,
1141 ) -> Box<dyn AnyBufferAccessMutState> {
1142 Box::new(SystemState::<BufferAccessMut<T>>::new(world))
1143 }
1144}
1145
1146pub struct DrainAnyBuffer<'a> {
1147 interface: Box<dyn DrainAnyBufferInterface + 'a>,
1148}
1149
1150impl<'a> Iterator for DrainAnyBuffer<'a> {
1151 type Item = AnyMessageBox;
1152
1153 fn next(&mut self) -> Option<Self::Item> {
1154 self.interface.any_next()
1155 }
1156}
1157
1158trait DrainAnyBufferInterface {
1159 fn any_next(&mut self) -> Option<AnyMessageBox>;
1160}
1161
1162impl<T: 'static + Send + Sync + Any> DrainAnyBufferInterface for DrainBuffer<'_, T> {
1163 fn any_next(&mut self) -> Option<AnyMessageBox> {
1164 self.next().map(to_any_message)
1165 }
1166}
1167
1168impl Bufferable for AnyBuffer {
1169 type BufferType = Self;
1170 fn into_buffer(self, builder: &mut Builder) -> Self::BufferType {
1171 assert_eq!(self.scope(), builder.scope());
1172 self
1173 }
1174}
1175
1176impl Buffering for AnyBuffer {
1177 fn verify_scope(&self, scope: Entity) {
1178 assert_eq!(scope, self.scope());
1179 }
1180
1181 fn buffered_count(&self, session: Entity, world: &World) -> Result<usize, OperationError> {
1182 let entity_ref = world.get_entity(self.id()).or_broken()?;
1183 self.interface.buffered_count(&entity_ref, session)
1184 }
1185
1186 fn buffered_count_for(
1187 &self,
1188 buffer: Entity,
1189 session: Entity,
1190 world: &World,
1191 ) -> Result<usize, OperationError> {
1192 if buffer != self.id() {
1193 return Ok(0);
1194 }
1195
1196 self.buffered_count(session, world)
1197 }
1198
1199 fn add_listener(&self, listener: Entity, world: &mut World) -> OperationResult {
1200 add_listener_to_source(self.id(), listener, world)
1201 }
1202
1203 fn gate_action(
1204 &self,
1205 session: Entity,
1206 action: Gate,
1207 world: &mut World,
1208 roster: &mut OperationRoster,
1209 ) -> OperationResult {
1210 GateState::apply(self.id(), session, action, world, roster)
1211 }
1212
1213 fn as_input(&self) -> SmallVec<[Entity; 8]> {
1214 SmallVec::from_iter([self.id()])
1215 }
1216
1217 fn ensure_active_session(&self, session: Entity, world: &mut World) -> OperationResult {
1218 let mut entity_mut = world.get_entity_mut(self.id()).or_broken()?;
1219 self.interface.ensure_session(&mut entity_mut, session)
1220 }
1221}
1222
1223impl Joining for AnyBuffer {
1224 type Item = AnyMessageBox;
1225 fn fetch_for_join(
1226 &self,
1227 session: Entity,
1228 world: &mut World,
1229 ) -> Result<Self::Item, OperationError> {
1230 match self.join_behavior {
1231 JoinBehavior::Pull => {
1232 let mut buffer_mut = world.get_entity_mut(self.id()).or_broken()?;
1233 self.interface.pull(&mut buffer_mut, session)
1234 }
1235 JoinBehavior::Clone => {
1236 let buffer_ref = world.get_entity(self.id()).or_broken()?;
1237 self.interface.clone_from_buffer(&buffer_ref, session)
1238 }
1239 }
1240 }
1241}
1242
1243impl Accessing for AnyBuffer {
1244 type Key = AnyBufferKey;
1245 fn add_accessor(&self, accessor: Entity, world: &mut World) -> OperationResult {
1246 world
1247 .get_mut::<BufferAccessors>(self.id())
1248 .or_broken()?
1249 .add_accessor(accessor);
1250 Ok(())
1251 }
1252
1253 fn create_key(&self, builder: &super::BufferKeyBuilder) -> Self::Key {
1254 AnyBufferKey {
1255 tag: builder.make_tag(self.id()),
1256 interface: self.interface,
1257 }
1258 }
1259
1260 fn deep_clone_key(key: &Self::Key) -> Self::Key {
1261 key.deep_clone()
1262 }
1263
1264 fn is_key_in_use(key: &Self::Key) -> bool {
1265 key.is_in_use()
1266 }
1267}
1268
1269#[cfg(test)]
1270mod tests {
1271 use crate::{prelude::*, testing::*};
1272 use bevy_ecs::prelude::World;
1273
1274 #[test]
1275 fn test_any_count() {
1276 let mut context = TestingContext::minimal_plugins();
1277
1278 let workflow = context.spawn_io_workflow(|scope, builder| {
1279 let buffer = builder.create_buffer(BufferSettings::keep_all());
1280 let push_multiple_times = builder
1281 .commands()
1282 .spawn_service(push_multiple_times_into_buffer.into_blocking_service());
1283 let count = builder
1284 .commands()
1285 .spawn_service(get_buffer_count.into_blocking_service());
1286
1287 builder
1288 .chain(scope.start)
1289 .with_access(buffer)
1290 .then(push_multiple_times)
1291 .then(count)
1292 .connect(scope.terminate);
1293 });
1294
1295 let count = context.resolve_request(1, workflow);
1296 assert_eq!(count, 5);
1297 }
1298
1299 fn push_multiple_times_into_buffer(
1300 In((value, key)): In<(usize, BufferKey<usize>)>,
1301 mut access: BufferAccessMut<usize>,
1302 ) -> AnyBufferKey {
1303 let mut buffer = access.get_mut(&key).unwrap();
1304 for _ in 0..5 {
1305 buffer.push(value);
1306 }
1307
1308 key.into()
1309 }
1310
1311 fn get_buffer_count(In(key): In<AnyBufferKey>, world: &mut World) -> usize {
1312 world.any_buffer_view(&key).unwrap().len()
1313 }
1314
/// Fills a buffer through a typed key, rewrites every entry through the
/// type-erased mutable access, then pulls the entries out one at a time.
#[test]
fn test_modify_any_message() {
    let mut context = TestingContext::minimal_plugins();

    let workflow = context.spawn_io_workflow(|scope, builder| {
        let storage = builder.create_buffer(BufferSettings::keep_all());
        let fill_buffer = builder
            .commands()
            .spawn_service(push_multiple_times_into_buffer.into_blocking_service());
        let scale_entries = builder
            .commands()
            .spawn_service(modify_buffer_content.into_blocking_service());
        let pull_entries = builder
            .commands()
            .spawn_service(pull_each_buffer_item.into_blocking_service());

        // start -> (value, key) -> fill -> scale by index -> pull all -> terminate
        builder
            .chain(scope.start)
            .with_access(storage)
            .then(fill_buffer)
            .then(scale_entries)
            .then(pull_entries)
            .connect(scope.terminate);
    });

    // Five copies of 3, each multiplied by its index in the buffer.
    let pulled = context.resolve_request(3, workflow);
    assert_eq!(pulled, vec![0, 3, 6, 9, 12]);
}
1343
1344 fn modify_buffer_content(In(key): In<AnyBufferKey>, world: &mut World) -> AnyBufferKey {
1345 world
1346 .any_buffer_mut(&key, |mut access| {
1347 for i in 0..access.len() {
1348 access.get_mut(i).map(|value| {
1349 *value.downcast_mut::<usize>().unwrap() *= i;
1350 });
1351 }
1352 })
1353 .unwrap();
1354
1355 key
1356 }
1357
1358 fn pull_each_buffer_item(In(key): In<AnyBufferKey>, world: &mut World) -> Vec<usize> {
1359 world
1360 .any_buffer_mut(&key, |mut access| {
1361 let mut values = Vec::new();
1362 while let Some(value) = access.pull() {
1363 values.push(*value.downcast::<usize>().unwrap());
1364 }
1365 values
1366 })
1367 .unwrap()
1368 }
1369
/// Fills a buffer through a typed key, rewrites every entry through the
/// type-erased mutable access, then drains the whole buffer in one call.
#[test]
fn test_drain_any_message() {
    let mut context = TestingContext::minimal_plugins();

    let workflow = context.spawn_io_workflow(|scope, builder| {
        let storage = builder.create_buffer(BufferSettings::keep_all());
        let fill_buffer = builder
            .commands()
            .spawn_service(push_multiple_times_into_buffer.into_blocking_service());
        let scale_entries = builder
            .commands()
            .spawn_service(modify_buffer_content.into_blocking_service());
        let drain_entries = builder
            .commands()
            .spawn_service(drain_buffer_contents.into_blocking_service());

        // start -> (value, key) -> fill -> scale by index -> drain all -> terminate
        builder
            .chain(scope.start)
            .with_access(storage)
            .then(fill_buffer)
            .then(scale_entries)
            .then(drain_entries)
            .connect(scope.terminate);
    });

    // Five copies of 3, each multiplied by its index in the buffer.
    let drained = context.resolve_request(3, workflow);
    assert_eq!(drained, vec![0, 3, 6, 9, 12]);
}
1398
1399 fn drain_buffer_contents(In(key): In<AnyBufferKey>, world: &mut World) -> Vec<usize> {
1400 world
1401 .any_buffer_mut(&key, |mut access| {
1402 access
1403 .drain(..)
1404 .map(|value| *value.downcast::<usize>().unwrap())
1405 .collect()
1406 })
1407 .unwrap()
1408 }
1409
/// Routes a `(u32, i32, f32)` request through three type-erased buffers,
/// doubling each element on the way in, then joins the buffers and downcasts
/// the joined messages back to their concrete types.
#[test]
fn double_any_messages() {
    let mut context = TestingContext::minimal_plugins();

    let workflow =
        context.spawn_io_workflow(|scope: Scope<(u32, i32, f32), (u32, i32, f32)>, builder| {
            let buffer_u32: AnyBuffer = builder
                .create_buffer::<u32>(BufferSettings::default())
                .into();
            let buffer_i32: AnyBuffer = builder
                .create_buffer::<i32>(BufferSettings::default())
                .into();
            let buffer_f32: AnyBuffer = builder
                .create_buffer::<f32>(BufferSettings::default())
                .into();

            // Recover a typed input slot from each type-erased buffer.
            let slot_u32 = buffer_u32
                .downcast_for_message::<u32>()
                .unwrap()
                .input_slot();
            let slot_i32 = buffer_i32
                .downcast_for_message::<i32>()
                .unwrap()
                .input_slot();
            let slot_f32 = buffer_f32
                .downcast_for_message::<f32>()
                .unwrap()
                .input_slot();

            // Split the request and feed each doubled element into its buffer.
            let (input_u32, input_i32, input_f32) = builder.chain(scope.start).unzip();
            builder
                .chain(input_u32)
                .map_block(|x| 2 * x)
                .connect(slot_u32);
            builder
                .chain(input_i32)
                .map_block(|x| 2 * x)
                .connect(slot_i32);
            builder
                .chain(input_f32)
                .map_block(|x| 2.0 * x)
                .connect(slot_f32);

            // Join the type-erased buffers and restore the concrete types.
            (buffer_u32, buffer_i32, buffer_f32)
                .join(builder)
                .map_block(|(any_u32, any_i32, any_f32)| {
                    (
                        *any_u32.downcast::<u32>().unwrap(),
                        *any_i32.downcast::<i32>().unwrap(),
                        *any_f32.downcast::<f32>().unwrap(),
                    )
                })
                .connect(scope.terminate);
        });

    let (doubled_u32, doubled_i32, doubled_f32) =
        context.resolve_request((1u32, 2i32, 3f32), workflow);
    assert_eq!(doubled_u32, 2);
    assert_eq!(doubled_i32, 4);
    assert_eq!(doubled_f32, 6.0);
}
1466}