crossflow/buffer/
any_buffer.rs

1/*
2 * Copyright (C) 2025 Open Source Robotics Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16*/
17
18// TODO(@mxgrey): Add module-level documentation describing how to use AnyBuffer
19
20use std::{
21    any::{Any, TypeId},
22    collections::{HashMap, HashSet, hash_map::Entry},
23    ops::RangeBounds,
24    sync::{Mutex, OnceLock},
25};
26
27use bevy_ecs::{
28    prelude::{Commands, Entity, EntityRef, EntityWorldMut, Mut, World},
29    system::SystemState,
30};
31
32use thiserror::Error as ThisError;
33
34use smallvec::SmallVec;
35
36use crate::{
37    Accessing, Accessor, Buffer, BufferAccessMut, BufferAccessors, BufferError, BufferIdentifier,
38    BufferKey, BufferKeyBuilder, BufferKeyLifecycle, BufferKeyTag, BufferLocation, BufferMap,
39    BufferMapLayout, BufferStorage, Bufferable, Buffering, Builder, CloneFromBuffer, DrainBuffer,
40    FetchFromBuffer, Gate, GateState, IncompatibleLayout, InspectBuffer, Joining, ManageBuffer,
41    MessageTypeHint, MessageTypeHintEvaluation, MessageTypeHintMap, NotifyBufferUpdate,
42    OperationError, OperationResult, OperationRoster, OrBroken, TypeInfo, add_listener_to_source,
43};
44
/// A [`Buffer`] whose message type has been anonymized. Joining with this buffer
/// type will yield an [`AnyMessageBox`].
///
/// This is a small [`Copy`] handle made of the buffer's location, the behavior
/// it should use when joined, and a shared type-erased interface for the
/// buffer's concrete message type.
#[derive(Clone, Copy)]
pub struct AnyBuffer {
    /// Holds the buffer's `source` entity and the `scope` (workflow) it belongs to.
    pub(crate) location: BufferLocation,
    /// Whether a join pulls a message out of the buffer or clones one.
    pub(crate) join_behavior: JoinBehavior,
    /// Type-erased operations for the concrete message type of this buffer.
    pub(crate) interface: &'static (dyn AnyBufferAccessInterface + Send + Sync),
}
53
54impl AnyBuffer {
55    /// Specify that you want this buffer to join by pulling an element. This is
56    /// always supported.
57    pub fn join_by_pulling(mut self) -> AnyBuffer {
58        self.join_behavior = JoinBehavior::Pull;
59        self
60    }
61
62    /// Specify that you want this buffer to join by cloning an element. This is
63    /// only supported for underlying message types that support the [`Clone`]
64    /// trait.
65    ///
66    /// If you are using the diagram workflow builder, make sure the message type
67    /// stored by this buffer has registered its [`Clone`] trait.
68    pub fn join_by_cloning(mut self) -> Option<AnyBuffer> {
69        self.interface.clone_for_join_fn()?;
70        self.join_behavior = JoinBehavior::Clone;
71        Some(self)
72    }
73
74    /// The buffer ID for this key.
75    pub fn id(&self) -> Entity {
76        self.location.source
77    }
78
79    /// ID of the workflow that this buffer is associated with.
80    pub fn scope(&self) -> Entity {
81        self.location.scope
82    }
83
84    /// Get the type ID of the messages that this buffer supports.
85    pub fn message_type_id(&self) -> TypeId {
86        self.interface.message_type_id()
87    }
88
89    /// Get the type name of the messages that this buffer supports.
90    pub fn message_type_name(&self) -> &'static str {
91        self.interface.message_type_name()
92    }
93
94    /// Get the [`TypeInfo`] of this buffer's messages.
95    pub fn message_type(&self) -> TypeInfo {
96        TypeInfo {
97            type_id: self.message_type_id(),
98            type_name: self.message_type_name(),
99        }
100    }
101
102    /// Get the [`AnyBufferAccessInterface`] for this specific instance of [`AnyBuffer`].
103    pub fn get_interface(&self) -> &'static (dyn AnyBufferAccessInterface + Send + Sync) {
104        self.interface
105    }
106
107    /// Get the [`AnyBufferAccessInterface`] for a concrete message type.
108    pub fn interface_for<T: 'static + Send + Sync>()
109    -> &'static (dyn AnyBufferAccessInterface + Send + Sync) {
110        static INTERFACE_MAP: OnceLock<
111            Mutex<HashMap<TypeId, &'static (dyn AnyBufferAccessInterface + Send + Sync)>>,
112        > = OnceLock::new();
113        let interfaces = INTERFACE_MAP.get_or_init(|| Mutex::default());
114
115        // SAFETY: This will leak memory exactly once per type, so the leakage is bounded.
116        // Leaking this allows the interface to be shared freely across all instances.
117        let mut interfaces_mut = interfaces.lock().unwrap();
118        *interfaces_mut
119            .entry(TypeId::of::<T>())
120            .or_insert_with(|| Box::leak(Box::new(AnyBufferAccessImpl::<T>::new())))
121    }
122}
123
124impl std::fmt::Debug for AnyBuffer {
125    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126        f.debug_struct("AnyBuffer")
127            .field("scope", &self.location.scope)
128            .field("source", &self.location.source)
129            .field("join_behavior", &self.join_behavior)
130            .field("message_type_name", &self.interface.message_type_name())
131            .finish()
132    }
133}
134
135impl AnyBuffer {
136    /// Downcast this into a concrete [`Buffer`] for the specified message type.
137    ///
138    /// To downcast into a specialized kind of buffer, use [`Self::downcast_buffer`] instead.
139    pub fn downcast_for_message<Message: 'static>(&self) -> Option<Buffer<Message>> {
140        if TypeId::of::<Message>() == self.interface.message_type_id() {
141            Some(Buffer {
142                location: self.location,
143                _ignore: Default::default(),
144            })
145        } else {
146            None
147        }
148    }
149
150    /// Downcast this into a different special buffer representation, such as a
151    /// `JsonBuffer`.
152    pub fn downcast_buffer<BufferType: 'static>(&self) -> Option<BufferType> {
153        self.interface.buffer_downcast(TypeId::of::<BufferType>())?(*self)
154            .ok()?
155            .downcast::<BufferType>()
156            .ok()
157            .map(|x| *x)
158    }
159}
160
161impl<T: 'static + Send + Sync> From<Buffer<T>> for AnyBuffer {
162    fn from(value: Buffer<T>) -> Self {
163        let interface = AnyBuffer::interface_for::<T>();
164        AnyBuffer {
165            location: value.location,
166            join_behavior: JoinBehavior::Pull,
167            interface,
168        }
169    }
170}
171
172impl<T: 'static + Send + Sync + Clone> From<CloneFromBuffer<T>> for AnyBuffer {
173    fn from(value: CloneFromBuffer<T>) -> Self {
174        let interface = AnyBuffer::interface_for::<T>();
175        AnyBuffer {
176            location: value.location,
177            join_behavior: JoinBehavior::Clone,
178            interface,
179        }
180    }
181}
182
/// What should the behavior be for this buffer when it gets joined? You can
/// make copies of the [`Buffer`] reference and give each copy a different behavior
/// so that it gets used differently for each join operation that it takes part in.
///
/// The default is [`JoinBehavior::Pull`].
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum JoinBehavior {
    /// Pull a value from the buffer while joining
    #[default]
    Pull,
    /// Clone a value from the buffer while joining
    Clone,
}
194
/// A trait for turning a buffer into an [`AnyBuffer`]. It is expected that all
/// buffer types implement this trait.
pub trait AsAnyBuffer {
    /// Convert this buffer into an [`AnyBuffer`].
    fn as_any_buffer(&self) -> AnyBuffer;

    /// What would be the message type hint for this kind of buffer?
    ///
    /// This is an associated function: it depends only on the buffer type, not
    /// on a particular buffer instance.
    fn message_type_hint() -> MessageTypeHint;
}
204
impl AsAnyBuffer for AnyBuffer {
    /// An [`AnyBuffer`] is already anonymized, so this returns a copy of itself.
    fn as_any_buffer(&self) -> AnyBuffer {
        *self
    }

    /// The concrete message type is not known at compile time, so the hint is a
    /// fallback to [`AnyMessageBox`].
    fn message_type_hint() -> MessageTypeHint {
        MessageTypeHint::fallback::<AnyMessageBox>()
    }
}
214
215impl<T: 'static + Send + Sync> AsAnyBuffer for Buffer<T> {
216    fn as_any_buffer(&self) -> AnyBuffer {
217        (*self).into()
218    }
219
220    fn message_type_hint() -> MessageTypeHint {
221        MessageTypeHint::exact::<T>()
222    }
223}
224
225impl<T: 'static + Send + Sync + Clone> AsAnyBuffer for CloneFromBuffer<T> {
226    fn as_any_buffer(&self) -> AnyBuffer {
227        (*self).into()
228    }
229
230    fn message_type_hint() -> MessageTypeHint {
231        MessageTypeHint::exact::<T>()
232    }
233}
234
/// Similar to a [`BufferKey`] except it can be used for any buffer without
/// knowing the buffer's message type at compile time.
///
/// This key can be used with a [`World`][1] to directly view or manipulate the
/// contents of a buffer through the [`AnyBufferWorldAccess`] interface.
///
/// [1]: bevy_ecs::prelude::World
#[derive(Clone)]
pub struct AnyBufferKey {
    /// Identifies the buffer, the session, and the accessor that this key belongs to.
    pub(crate) tag: BufferKeyTag,
    /// Type-erased operations for the concrete message type of the buffer.
    pub(crate) interface: &'static (dyn AnyBufferAccessInterface + Send + Sync),
}
247
248impl AnyBufferKey {
249    /// Downcast this into a concrete [`BufferKey`] for the specified message type.
250    ///
251    /// To downcast to a specialized kind of key, use [`Self::downcast_buffer_key`] instead.
252    pub fn downcast_for_message<Message: 'static>(self) -> Option<BufferKey<Message>> {
253        if TypeId::of::<Message>() == self.interface.message_type_id() {
254            Some(BufferKey {
255                tag: self.tag,
256                _ignore: Default::default(),
257            })
258        } else {
259            None
260        }
261    }
262
263    /// Downcast this into a different special buffer key representation, such
264    /// as a `JsonBufferKey`.
265    pub fn downcast_buffer_key<KeyType: 'static>(self) -> Option<KeyType> {
266        self.interface.key_downcast(TypeId::of::<KeyType>())?(self.tag)
267            .downcast::<KeyType>()
268            .ok()
269            .map(|x| *x)
270    }
271
272    /// The buffer ID of this key.
273    pub fn id(&self) -> Entity {
274        self.tag.buffer
275    }
276
277    /// The session that this key belongs to.
278    pub fn session(&self) -> Entity {
279        self.tag.session
280    }
281}
282
283impl BufferKeyLifecycle for AnyBufferKey {
284    type TargetBuffer = AnyBuffer;
285
286    fn create_key(buffer: &AnyBuffer, builder: &BufferKeyBuilder) -> Self {
287        AnyBufferKey {
288            tag: builder.make_tag(buffer.id()),
289            interface: buffer.interface,
290        }
291    }
292
293    fn is_in_use(&self) -> bool {
294        self.tag.is_in_use()
295    }
296
297    fn deep_clone(&self) -> Self {
298        Self {
299            tag: self.tag.deep_clone(),
300            interface: self.interface,
301        }
302    }
303}
304
305impl std::fmt::Debug for AnyBufferKey {
306    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
307        f.debug_struct("AnyBufferKey")
308            .field("message_type_name", &self.interface.message_type_name())
309            .field("tag", &self.tag)
310            .finish()
311    }
312}
313
314impl<T: 'static + Send + Sync + Any> From<BufferKey<T>> for AnyBufferKey {
315    fn from(value: BufferKey<T>) -> Self {
316        let interface = AnyBuffer::interface_for::<T>();
317        AnyBufferKey {
318            tag: value.tag,
319            interface,
320        }
321    }
322}
323
// An `AnyBufferKey` is the accessor that corresponds to an `AnyBuffer`.
impl Accessor for AnyBufferKey {
    type Buffers = AnyBuffer;
}
327
328impl BufferMapLayout for AnyBuffer {
329    fn try_from_buffer_map(buffers: &BufferMap) -> Result<Self, IncompatibleLayout> {
330        let mut compatibility = IncompatibleLayout::default();
331
332        if let Ok(any_buffer) = compatibility.require_buffer_for_identifier::<AnyBuffer>(0, buffers)
333        {
334            return Ok(any_buffer);
335        }
336
337        Err(compatibility)
338    }
339
340    fn get_buffer_message_type_hints(
341        identifiers: HashSet<BufferIdentifier<'static>>,
342    ) -> Result<MessageTypeHintMap, IncompatibleLayout> {
343        let mut evaluation = MessageTypeHintEvaluation::new(identifiers);
344        evaluation.fallback::<AnyMessageBox>(0);
345        evaluation.evaluate()
346    }
347}
348
/// Similar to [`BufferView`][crate::BufferView], but this can be unlocked with
/// an [`AnyBufferKey`], so it can work for any buffer regardless of the data
/// type inside.
pub struct AnyBufferView<'a> {
    /// Type-erased read access to the buffer's storage.
    storage: Box<dyn AnyBufferViewing + 'a>,
    /// Gate state of the buffer; looked up per session by [`AnyBufferView::gate`].
    gate: &'a GateState,
    /// The session whose messages this view reads.
    session: Entity,
}
357
358impl<'a> AnyBufferView<'a> {
359    /// Look at the oldest message in the buffer.
360    pub fn oldest(&self) -> Option<AnyMessageRef<'_>> {
361        self.storage.any_oldest(self.session)
362    }
363
364    /// Look at the newest message in the buffer.
365    pub fn newest(&self) -> Option<AnyMessageRef<'_>> {
366        self.storage.any_newest(self.session)
367    }
368
369    /// Borrow a message from the buffer. Index 0 is the oldest message in the buffer
370    /// while the highest index is the newest message in the buffer.
371    pub fn get(&self, index: usize) -> Option<AnyMessageRef<'_>> {
372        self.storage.any_get(self.session, index)
373    }
374
375    /// Get how many messages are in this buffer.
376    pub fn len(&self) -> usize {
377        self.storage.any_count(self.session)
378    }
379
380    /// Check if the buffer is empty.
381    pub fn is_empty(&self) -> bool {
382        self.len() == 0
383    }
384
385    /// Check whether the gate of this buffer is open or closed.
386    pub fn gate(&self) -> Gate {
387        self.gate
388            .map
389            .get(&self.session)
390            .copied()
391            .unwrap_or(Gate::Open)
392    }
393}
394
/// Similar to [`BufferMut`][crate::BufferMut], but this can be unlocked with an
/// [`AnyBufferKey`], so it can work for any buffer regardless of the data type
/// inside.
///
/// When dropped after being marked as modified, this queues a
/// [`NotifyBufferUpdate`] command for the buffer.
pub struct AnyBufferMut<'w, 's, 'a> {
    /// Type-erased read/write access to the buffer's storage.
    storage: Box<dyn AnyBufferManagement + 'a>,
    /// The entity of the buffer being accessed.
    buffer: Entity,
    /// The session whose messages are being accessed.
    session: Entity,
    /// The accessor reported in the update notification; cleared by
    /// `allow_closed_loops`.
    accessor: Option<Entity>,
    /// Used to queue the update notification on drop.
    commands: &'a mut Commands<'w, 's>,
    /// Whether an update notification should be queued on drop.
    modified: bool,
}
406
407impl<'w, 's, 'a> AnyBufferMut<'w, 's, 'a> {
408    /// Same as [BufferMut::allow_closed_loops][1].
409    ///
410    /// [1]: crate::BufferMut::allow_closed_loops
411    pub fn allow_closed_loops(mut self) -> Self {
412        self.accessor = None;
413        self
414    }
415
416    /// Look at the oldest message in the buffer.
417    pub fn oldest(&self) -> Option<AnyMessageRef<'_>> {
418        self.storage.any_oldest(self.session)
419    }
420
421    /// Look at the newest message in the buffer.
422    pub fn newest(&self) -> Option<AnyMessageRef<'_>> {
423        self.storage.any_newest(self.session)
424    }
425
426    /// Borrow a message from the buffer. Index 0 is the oldest message in the buffer
427    /// while the highest index is the newest message in the buffer.
428    pub fn get(&self, index: usize) -> Option<AnyMessageRef<'_>> {
429        self.storage.any_get(self.session, index)
430    }
431
432    /// Get how many messages are in this buffer.
433    pub fn len(&self) -> usize {
434        self.storage.any_count(self.session)
435    }
436
437    /// Check if the buffer is empty.
438    pub fn is_empty(&self) -> bool {
439        self.len() == 0
440    }
441
442    /// Modify the oldest message in the buffer.
443    pub fn oldest_mut(&mut self) -> Option<AnyMessageMut<'_>> {
444        self.modified = true;
445        self.storage.any_oldest_mut(self.session)
446    }
447
448    /// Modify the newest message in the buffer.
449    pub fn newest_mut(&mut self) -> Option<AnyMessageMut<'_>> {
450        self.modified = true;
451        self.storage.any_newest_mut(self.session)
452    }
453
454    /// Modify a message in the buffer. Index 0 is the oldest message in the buffer
455    /// with the highest index being the newest message in the buffer.
456    pub fn get_mut(&mut self, index: usize) -> Option<AnyMessageMut<'_>> {
457        self.modified = true;
458        self.storage.any_get_mut(self.session, index)
459    }
460
461    /// Drain a range of messages out of the buffer.
462    pub fn drain<R: RangeBounds<usize>>(&mut self, range: R) -> DrainAnyBuffer<'_> {
463        self.modified = true;
464        DrainAnyBuffer {
465            interface: self.storage.any_drain(self.session, AnyRange::new(range)),
466        }
467    }
468
469    /// Pull the oldest message from the buffer.
470    pub fn pull(&mut self) -> Option<AnyMessageBox> {
471        self.modified = true;
472        self.storage.any_pull(self.session)
473    }
474
475    /// Pull the message that was most recently put into the buffer (instead of the
476    /// oldest, which is what [`Self::pull`] gives).
477    pub fn pull_newest(&mut self) -> Option<AnyMessageBox> {
478        self.modified = true;
479        self.storage.any_pull_newest(self.session)
480    }
481
482    /// Attempt to push a new value into the buffer.
483    ///
484    /// If the input value matches the message type of the buffer, this will
485    /// return [`Ok`]. If the buffer is at its limit before a successful push, this
486    /// will return the value that needed to be removed.
487    ///
488    /// If the input value does not match the message type of the buffer, this
489    /// will return [`Err`] and give back the message that you tried to push.
490    pub fn push<T: 'static + Send + Sync + Any>(&mut self, value: T) -> Result<Option<T>, T> {
491        if TypeId::of::<T>() != self.storage.any_message_type() {
492            return Err(value);
493        }
494
495        self.modified = true;
496
497        // SAFETY: We checked that T matches the message type for this buffer,
498        // so pushing and downcasting should not exhibit any errors.
499        let removed = self
500            .storage
501            .any_push(self.session, Box::new(value))
502            .unwrap()
503            .map(|value| *value.downcast::<T>().unwrap());
504
505        Ok(removed)
506    }
507
508    /// Attempt to push a new value of any message type into the buffer.
509    ///
510    /// If the input value matches the message type of the buffer, this will
511    /// return [`Ok`]. If the buffer is at its limit before a successful push, this
512    /// will return the value that needed to be removed.
513    ///
514    /// If the input value does not match the message type of the buffer, this
515    /// will return [`Err`] and give back an error with the message that you
516    /// tried to push and the type information for the expected message type.
517    pub fn push_any(
518        &mut self,
519        value: AnyMessageBox,
520    ) -> Result<Option<AnyMessageBox>, AnyMessageError> {
521        self.storage.any_push(self.session, value)
522    }
523
524    /// Attempt to push a value into the buffer as if it is the oldest value of
525    /// the buffer.
526    ///
527    /// The result follows the same rules as [`Self::push`].
528    pub fn push_as_oldest<T: 'static + Send + Sync + Any>(
529        &mut self,
530        value: T,
531    ) -> Result<Option<T>, T> {
532        if TypeId::of::<T>() != self.storage.any_message_type() {
533            return Err(value);
534        }
535
536        self.modified = true;
537
538        // SAFETY: We checked that T matches the message type for this buffer,
539        // so pushing and downcasting should not exhibit any errors.
540        let removed = self
541            .storage
542            .any_push_as_oldest(self.session, Box::new(value))
543            .unwrap()
544            .map(|value| *value.downcast::<T>().unwrap());
545
546        Ok(removed)
547    }
548
549    /// Attempt to push a value into the buffer as if it is the oldest value of
550    /// the buffer.
551    ///
552    /// The result follows the same rules as [`Self::push_any`].
553    pub fn push_any_as_oldest(
554        &mut self,
555        value: AnyMessageBox,
556    ) -> Result<Option<AnyMessageBox>, AnyMessageError> {
557        self.storage.any_push_as_oldest(self.session, value)
558    }
559
560    /// Trigger the listeners for this buffer to wake up even if nothing in the
561    /// buffer has changed. This could be used for timers or timeout elements
562    /// in a workflow.
563    pub fn pulse(&mut self) {
564        self.modified = true;
565    }
566}
567
568impl<'w, 's, 'a> Drop for AnyBufferMut<'w, 's, 'a> {
569    fn drop(&mut self) {
570        if self.modified {
571            self.commands.queue(NotifyBufferUpdate::new(
572                self.buffer,
573                self.session,
574                self.accessor,
575            ));
576        }
577    }
578}
579
/// This trait allows [`World`] to give you access to any buffer using an
/// [`AnyBufferKey`].
pub trait AnyBufferWorldAccess {
    /// Call this to get read-only access to any buffer.
    ///
    /// For technical reasons this requires direct [`World`] access, but you can
    /// do other read-only queries on the world while holding onto the
    /// [`AnyBufferView`].
    ///
    /// Returns a [`BufferError`] if the view cannot be created for `key`.
    fn any_buffer_view(&self, key: &AnyBufferKey) -> Result<AnyBufferView<'_>, BufferError>;

    /// Call this to get mutable access to any buffer.
    ///
    /// Pass in a callback that will receive a [`AnyBufferMut`], allowing it to
    /// view and modify the contents of the buffer.
    ///
    /// On success, returns whatever the callback returned. Returns a
    /// [`BufferError`] if mutable access cannot be obtained for `key`, in which
    /// case the callback is not run.
    fn any_buffer_mut<U>(
        &mut self,
        key: &AnyBufferKey,
        f: impl FnOnce(AnyBufferMut) -> U,
    ) -> Result<U, BufferError>;
}
600
601impl AnyBufferWorldAccess for World {
602    fn any_buffer_view(&self, key: &AnyBufferKey) -> Result<AnyBufferView<'_>, BufferError> {
603        key.interface.create_any_buffer_view(key, self)
604    }
605
606    fn any_buffer_mut<U>(
607        &mut self,
608        key: &AnyBufferKey,
609        f: impl FnOnce(AnyBufferMut) -> U,
610    ) -> Result<U, BufferError> {
611        let interface = key.interface;
612        let mut state = interface.create_any_buffer_access_mut_state(self);
613        let mut access = state.get_any_buffer_access_mut(self);
614        let buffer_mut = access.as_any_buffer_mut(key)?;
615        Ok(f(buffer_mut))
616    }
617}
618
/// Type-erased, read-only operations on a buffer's storage. This is what
/// [`AnyBufferView`] and [`AnyBufferMut`] use to inspect messages without
/// knowing their type.
trait AnyBufferViewing {
    /// How many messages are stored for `session`.
    fn any_count(&self, session: Entity) -> usize;
    /// Borrow the oldest message stored for `session`.
    fn any_oldest<'a>(&'a self, session: Entity) -> Option<AnyMessageRef<'a>>;
    /// Borrow the newest message stored for `session`.
    fn any_newest<'a>(&'a self, session: Entity) -> Option<AnyMessageRef<'a>>;
    /// Borrow the message at `index` for `session`.
    fn any_get<'a>(&'a self, session: Entity, index: usize) -> Option<AnyMessageRef<'a>>;
    /// The [`TypeId`] of the messages held by the storage.
    fn any_message_type(&self) -> TypeId;
}
626
/// Type-erased mutating operations on a buffer's storage, used by
/// [`AnyBufferMut`].
trait AnyBufferManagement: AnyBufferViewing {
    /// Push a type-erased message for `session`. Fails with an
    /// [`AnyMessageError`] when the message cannot be converted to the
    /// storage's message type.
    fn any_push(&mut self, session: Entity, value: AnyMessageBox) -> AnyMessagePushResult;
    /// Same as [`Self::any_push`], but the message is pushed as the oldest one.
    fn any_push_as_oldest(&mut self, session: Entity, value: AnyMessageBox)
    -> AnyMessagePushResult;
    /// Remove and return the oldest message for `session`.
    fn any_pull(&mut self, session: Entity) -> Option<AnyMessageBox>;
    /// Remove and return the newest message for `session`.
    fn any_pull_newest(&mut self, session: Entity) -> Option<AnyMessageBox>;
    /// Mutably borrow the oldest message for `session`.
    fn any_oldest_mut<'a>(&'a mut self, session: Entity) -> Option<AnyMessageMut<'a>>;
    /// Mutably borrow the newest message for `session`.
    fn any_newest_mut<'a>(&'a mut self, session: Entity) -> Option<AnyMessageMut<'a>>;
    /// Mutably borrow the message at `index` for `session`.
    fn any_get_mut<'a>(&'a mut self, session: Entity, index: usize) -> Option<AnyMessageMut<'a>>;
    /// Drain the messages in `range` for `session` through a type-erased
    /// draining interface.
    fn any_drain<'a>(
        &'a mut self,
        session: Entity,
        range: AnyRange,
    ) -> Box<dyn DrainAnyBufferInterface + 'a>;
}
642
/// A concrete, owned copy of the bounds of any `RangeBounds<usize>` value.
///
/// This lets a generic range be passed through an interface that cannot be
/// generic, such as a trait object method.
pub(crate) struct AnyRange {
    start_bound: std::ops::Bound<usize>,
    end_bound: std::ops::Bound<usize>,
}

impl AnyRange {
    /// Capture the start and end bounds of `range` by value.
    pub(crate) fn new<T: std::ops::RangeBounds<usize>>(range: T) -> Self {
        let start_bound = deref_bound(range.start_bound());
        let end_bound = deref_bound(range.end_bound());
        Self {
            start_bound,
            end_bound,
        }
    }
}

/// Turn a bound over a borrowed `usize` into a bound that owns its value.
fn deref_bound(bound: std::ops::Bound<&usize>) -> std::ops::Bound<usize> {
    bound.cloned()
}

impl std::ops::RangeBounds<usize> for AnyRange {
    fn start_bound(&self) -> std::ops::Bound<&usize> {
        self.start_bound.as_ref()
    }

    fn end_bound(&self) -> std::ops::Bound<&usize> {
        self.end_bound.as_ref()
    }

    /// Whether `item` lies at or after the start bound and at or before the
    /// end bound, honoring inclusive and exclusive bounds.
    fn contains<U>(&self, item: &U) -> bool
    where
        usize: PartialOrd<U>,
        U: ?Sized + PartialOrd<usize>,
    {
        use std::ops::Bound::{Excluded, Included, Unbounded};

        // An item is rejected when it falls before the start of the range...
        let before_start = match self.start_bound {
            Excluded(lower) => *item <= lower,
            Included(lower) => *item < lower,
            Unbounded => false,
        };
        if before_start {
            return false;
        }

        // ...or past the end of it.
        let past_end = match self.end_bound {
            Excluded(upper) => upper <= *item,
            Included(upper) => upper < *item,
            Unbounded => false,
        };

        !past_end
    }
}
710
/// A shared borrow of a message whose concrete type has been erased.
pub type AnyMessageRef<'a> = &'a (dyn Any + 'static + Send + Sync);
712
713impl<T: 'static + Send + Sync + Any> AnyBufferViewing for &'_ BufferStorage<T> {
714    fn any_count(&self, session: Entity) -> usize {
715        self.count(session)
716    }
717
718    fn any_oldest<'a>(&'a self, session: Entity) -> Option<AnyMessageRef<'a>> {
719        self.oldest(session).map(to_any_ref)
720    }
721
722    fn any_newest<'a>(&'a self, session: Entity) -> Option<AnyMessageRef<'a>> {
723        self.newest(session).map(to_any_ref)
724    }
725
726    fn any_get<'a>(&'a self, session: Entity, index: usize) -> Option<AnyMessageRef<'a>> {
727        self.get(session, index).map(to_any_ref)
728    }
729
730    fn any_message_type(&self) -> TypeId {
731        TypeId::of::<T>()
732    }
733}
734
735impl<T: 'static + Send + Sync + Any> AnyBufferViewing for Mut<'_, BufferStorage<T>> {
736    fn any_count(&self, session: Entity) -> usize {
737        self.count(session)
738    }
739
740    fn any_oldest<'a>(&'a self, session: Entity) -> Option<AnyMessageRef<'a>> {
741        self.oldest(session).map(to_any_ref)
742    }
743
744    fn any_newest<'a>(&'a self, session: Entity) -> Option<AnyMessageRef<'a>> {
745        self.newest(session).map(to_any_ref)
746    }
747
748    fn any_get<'a>(&'a self, session: Entity, index: usize) -> Option<AnyMessageRef<'a>> {
749        self.get(session, index).map(to_any_ref)
750    }
751
752    fn any_message_type(&self) -> TypeId {
753        TypeId::of::<T>()
754    }
755}
756
/// A mutable borrow of a message whose concrete type has been erased.
pub type AnyMessageMut<'a> = &'a mut (dyn Any + 'static + Send + Sync);
758
/// An owned message whose concrete type has been erased.
pub type AnyMessageBox = Box<dyn Any + 'static + Send + Sync>;
760
/// Error produced when a type-erased message could not be downcast to the
/// message type that a buffer expects. The rejected message is handed back in
/// `value`.
#[derive(ThisError, Debug)]
#[error("failed to convert a message")]
pub struct AnyMessageError {
    /// The original value provided
    pub value: AnyMessageBox,
    /// The ID of the type expected by the buffer
    pub type_id: TypeId,
    /// The name of the type expected by the buffer
    pub type_name: &'static str,
}
771
/// Result of pushing a type-erased message: on success, the message that was
/// returned by the underlying push (if any); on failure, an [`AnyMessageError`]
/// carrying the rejected message.
pub type AnyMessagePushResult = Result<Option<AnyMessageBox>, AnyMessageError>;
773
774impl<T: 'static + Send + Sync + Any> AnyBufferManagement for Mut<'_, BufferStorage<T>> {
775    fn any_push(&mut self, session: Entity, value: AnyMessageBox) -> AnyMessagePushResult {
776        let value = from_any_message::<T>(value)?;
777        Ok(self.push(session, value).map(to_any_message))
778    }
779
780    fn any_push_as_oldest(
781        &mut self,
782        session: Entity,
783        value: AnyMessageBox,
784    ) -> AnyMessagePushResult {
785        let value = from_any_message::<T>(value)?;
786        Ok(self.push_as_oldest(session, value).map(to_any_message))
787    }
788
789    fn any_pull(&mut self, session: Entity) -> Option<AnyMessageBox> {
790        self.pull(session).map(to_any_message)
791    }
792
793    fn any_pull_newest(&mut self, session: Entity) -> Option<AnyMessageBox> {
794        self.pull_newest(session).map(to_any_message)
795    }
796
797    fn any_oldest_mut<'a>(&'a mut self, session: Entity) -> Option<AnyMessageMut<'a>> {
798        self.oldest_mut(session).map(to_any_mut)
799    }
800
801    fn any_newest_mut<'a>(&'a mut self, session: Entity) -> Option<AnyMessageMut<'a>> {
802        self.newest_mut(session).map(to_any_mut)
803    }
804
805    fn any_get_mut<'a>(&'a mut self, session: Entity, index: usize) -> Option<AnyMessageMut<'a>> {
806        self.get_mut(session, index).map(to_any_mut)
807    }
808
809    fn any_drain<'a>(
810        &'a mut self,
811        session: Entity,
812        range: AnyRange,
813    ) -> Box<dyn DrainAnyBufferInterface + 'a> {
814        Box::new(self.drain(session, range))
815    }
816}
817
/// Coerce a shared reference to a concrete message into a type-erased reference.
fn to_any_ref<'a, T: 'static + Send + Sync + Any>(x: &'a T) -> AnyMessageRef<'a> {
    x
}
821
/// Coerce a mutable reference to a concrete message into a type-erased reference.
fn to_any_mut<'a, T: 'static + Send + Sync + Any>(x: &'a mut T) -> AnyMessageMut<'a> {
    x
}
825
/// Box a concrete message as a type-erased [`AnyMessageBox`].
pub(crate) fn to_any_message<T: 'static + Send + Sync + Any>(x: T) -> AnyMessageBox {
    Box::new(x)
}
829
830fn from_any_message<T: 'static + Send + Sync + Any>(
831    value: AnyMessageBox,
832) -> Result<T, AnyMessageError>
833where
834    T: 'static,
835{
836    let value = value.downcast::<T>().map_err(|value| AnyMessageError {
837        value,
838        type_id: TypeId::of::<T>(),
839        type_name: std::any::type_name::<T>(),
840    })?;
841
842    Ok(*value)
843}
844
/// Type-erased state from which mutable buffer access can be obtained for a
/// [`World`]. Created by
/// [`AnyBufferAccessInterface::create_any_buffer_access_mut_state`].
pub trait AnyBufferAccessMutState {
    /// Borrow this state and the world to produce a type-erased
    /// [`AnyBufferAccessMut`]. The world borrow must outlive the state borrow.
    fn get_any_buffer_access_mut<'s, 'w: 's>(
        &'s mut self,
        world: &'w mut World,
    ) -> Box<dyn AnyBufferAccessMut<'w, 's> + 's>;
}
851
852impl<T: 'static + Send + Sync + Any> AnyBufferAccessMutState
853    for SystemState<BufferAccessMut<'static, 'static, T>>
854{
855    fn get_any_buffer_access_mut<'s, 'w: 's>(
856        &'s mut self,
857        world: &'w mut World,
858    ) -> Box<dyn AnyBufferAccessMut<'w, 's> + 's> {
859        Box::new(self.get_mut(world))
860    }
861}
862
/// Type-erased mutable access to buffers, from which an [`AnyBufferMut`] can be
/// obtained for a specific key.
pub trait AnyBufferAccessMut<'w, 's> {
    /// Get an [`AnyBufferMut`] for the buffer and session that `key` refers to.
    ///
    /// Returns a [`BufferError`] if the buffer cannot be accessed.
    fn as_any_buffer_mut<'a>(
        &'a mut self,
        key: &AnyBufferKey,
    ) -> Result<AnyBufferMut<'w, 's, 'a>, BufferError>;
}
869
870impl<'w, 's, T: 'static + Send + Sync + Any> AnyBufferAccessMut<'w, 's>
871    for BufferAccessMut<'w, 's, T>
872{
873    fn as_any_buffer_mut<'a>(
874        &'a mut self,
875        key: &AnyBufferKey,
876    ) -> Result<AnyBufferMut<'w, 's, 'a>, BufferError> {
877        let BufferAccessMut { query, commands } = self;
878        let storage = query
879            .get_mut(key.tag.buffer)
880            .map_err(|_| BufferError::BufferMissing)?;
881
882        Ok(AnyBufferMut {
883            storage: Box::new(storage),
884            buffer: key.tag.buffer,
885            session: key.tag.session,
886            accessor: Some(key.tag.accessor),
887            commands,
888            modified: false,
889        })
890    }
891}
892
/// Type-erased operations for buffers of one concrete message type.
///
/// [`AnyBuffer`] and [`AnyBufferKey`] each hold a `'static` reference to an
/// implementation of this trait, obtained from [`AnyBuffer::interface_for`].
pub trait AnyBufferAccessInterface {
    /// The [`TypeId`] of the message type that this interface is for.
    fn message_type_id(&self) -> TypeId;

    /// The type name of the message type that this interface is for.
    fn message_type_name(&self) -> &'static str;

    // NOTE(review): presumably the number of messages buffered in `entity` for
    // `session` — confirm against the implementation.
    fn buffered_count(&self, entity: &EntityRef, session: Entity) -> Result<usize, OperationError>;

    // NOTE(review): presumably makes sure the buffer has storage for `session` —
    // confirm against the implementation.
    fn ensure_session(&self, entity_mut: &mut EntityWorldMut, session: Entity) -> OperationResult;

    /// Register a function for downcasting an [`AnyBuffer`] into the buffer
    /// type identified by `buffer_type`.
    fn register_buffer_downcast(&self, buffer_type: TypeId, f: BufferDowncastBox);

    /// Allows AnyBuffer to support join_by_cloning
    fn register_cloning(
        &self,
        clone_for_any_join: CloneForAnyFn,
        clone_for_join_fn: &'static (dyn Any + Send + Sync),
    );

    /// Look up the downcast function for the buffer type identified by
    /// `buffer_type`, if there is one. Used by [`AnyBuffer::downcast_buffer`].
    fn buffer_downcast(&self, buffer_type: TypeId) -> Option<BufferDowncastRef>;

    /// Register a function for turning a [`BufferKeyTag`] into the key type
    /// identified by `key_type`.
    fn register_key_downcast(&self, key_type: TypeId, f: KeyDowncastBox);

    /// Look up the downcast function for the key type identified by
    /// `key_type`, if there is one. Used by [`AnyBufferKey::downcast_buffer_key`].
    fn key_downcast(&self, key_type: TypeId) -> Option<KeyDowncastRef>;

    // NOTE(review): presumably pulls a message for `session` out of the buffer
    // held by `entity_mut` — confirm against the implementation.
    fn pull(
        &self,
        entity_mut: &mut EntityWorldMut,
        session: Entity,
    ) -> Result<AnyMessageBox, OperationError>;

    // NOTE(review): presumably clones a message for `session` without removing
    // it from the buffer — confirm against the implementation.
    fn clone_from_buffer(
        &self,
        entity_reft: &EntityRef,
        session: Entity,
    ) -> Result<AnyMessageBox, OperationError>;

    /// The type-erased function used for joining by cloning, if one is
    /// available. [`AnyBuffer::join_by_cloning`] returns `None` when this does.
    fn clone_for_join_fn(&self) -> Option<&'static (dyn Any + Send + Sync)>;

    /// Create a read-only [`AnyBufferView`] over `world` for `key`.
    fn create_any_buffer_view<'a>(
        &self,
        key: &AnyBufferKey,
        world: &'a World,
    ) -> Result<AnyBufferView<'a>, BufferError>;

    /// Create the state from which type-erased mutable buffer access can be
    /// obtained for `world`.
    fn create_any_buffer_access_mut_state(
        &self,
        world: &mut World,
    ) -> Box<dyn AnyBufferAccessMutState>;
}
942
943pub type AnyMessageResult = Result<AnyMessageBox, OperationError>;
944// TODO(@mxgrey): Consider changing this trait box into a function pointer
945pub type BufferDowncastBox = Box<dyn Fn(AnyBuffer) -> AnyMessageResult + Send + Sync>;
946pub type BufferDowncastRef = &'static (dyn Fn(AnyBuffer) -> AnyMessageResult + Send + Sync);
947pub type KeyDowncastBox = Box<dyn Fn(BufferKeyTag) -> AnyMessageBox + Send + Sync>;
948pub type KeyDowncastRef = &'static (dyn Fn(BufferKeyTag) -> AnyMessageBox + Send + Sync);
949pub type CloneForAnyFn = fn(&EntityRef, Entity) -> AnyMessageResult;
950
951struct AnyBufferAccessImpl<T> {
952    buffer_downcasts: Mutex<HashMap<TypeId, BufferDowncastRef>>,
953    key_downcasts: Mutex<HashMap<TypeId, KeyDowncastRef>>,
954    cloning: Mutex<Option<CloneInterfaces>>,
955    _ignore: std::marker::PhantomData<fn(T)>,
956}
957
958struct CloneInterfaces {
959    clone_for_any_join: CloneForAnyFn,
960    /// Contains a function pointer that can be downcast into a type-specific
961    /// fetch_for_join function pointer for [`FetchFromBuffer`].
962    clone_for_join_fn: &'static (dyn Any + Send + Sync),
963}
964
965impl<T: 'static + Send + Sync> AnyBufferAccessImpl<T> {
966    fn new() -> Self {
967        let mut buffer_downcasts: HashMap<_, BufferDowncastRef> = HashMap::new();
968
969        // SAFETY: These leaks are okay because we will only ever instantiate
970        // AnyBufferAccessImpl once per generic argument T, which puts a firm
971        // ceiling on how many of these callbacks will get leaked.
972
973        // Automatically register a downcast into AnyBuffer
974        buffer_downcasts.insert(
975            TypeId::of::<AnyBuffer>(),
976            Box::leak(Box::new(|buffer: AnyBuffer| -> AnyMessageResult {
977                Ok(Box::new(AnyBuffer {
978                    location: buffer.location,
979                    join_behavior: buffer.join_behavior,
980                    interface: AnyBuffer::interface_for::<T>(),
981                }))
982            })),
983        );
984
985        // Allow downcasting back to the original Buffer<T>
986        buffer_downcasts.insert(
987            TypeId::of::<Buffer<T>>(),
988            Box::leak(Box::new(|buffer: AnyBuffer| -> AnyMessageResult {
989                Ok(Box::new(Buffer::<T> {
990                    location: buffer.location,
991                    _ignore: Default::default(),
992                }))
993            })),
994        );
995
996        // Allow downcasting to the very general FetchFromBuffer type
997        buffer_downcasts.insert(
998            TypeId::of::<FetchFromBuffer<T>>(),
999            Box::leak(Box::new(|buffer: AnyBuffer| -> AnyMessageResult {
1000                Ok(Box::new(FetchFromBuffer::<T>::try_from(buffer)?))
1001            })),
1002        );
1003
1004        let mut key_downcasts: HashMap<_, KeyDowncastRef> = HashMap::new();
1005
1006        // Automatically register a downcast to AnyBufferKey
1007        key_downcasts.insert(
1008            TypeId::of::<AnyBufferKey>(),
1009            Box::leak(Box::new(|tag| -> AnyMessageBox {
1010                Box::new(AnyBufferKey {
1011                    tag,
1012                    interface: AnyBuffer::interface_for::<T>(),
1013                })
1014            })),
1015        );
1016
1017        Self {
1018            buffer_downcasts: Mutex::new(buffer_downcasts),
1019            key_downcasts: Mutex::new(key_downcasts),
1020            cloning: Default::default(),
1021            _ignore: Default::default(),
1022        }
1023    }
1024}
1025
1026impl<T: 'static + Send + Sync + Any> AnyBufferAccessInterface for AnyBufferAccessImpl<T> {
1027    fn message_type_id(&self) -> TypeId {
1028        TypeId::of::<T>()
1029    }
1030
1031    fn message_type_name(&self) -> &'static str {
1032        std::any::type_name::<T>()
1033    }
1034
1035    fn buffered_count(&self, entity: &EntityRef, session: Entity) -> Result<usize, OperationError> {
1036        entity.buffered_count::<T>(session)
1037    }
1038
1039    fn ensure_session(&self, entity_mut: &mut EntityWorldMut, session: Entity) -> OperationResult {
1040        entity_mut.ensure_session::<T>(session)
1041    }
1042
1043    fn register_buffer_downcast(&self, buffer_type: TypeId, f: BufferDowncastBox) {
1044        let mut downcasts = self.buffer_downcasts.lock().unwrap();
1045
1046        if let Entry::Vacant(entry) = downcasts.entry(buffer_type) {
1047            // SAFETY: We only leak this into the register once per type
1048            entry.insert(Box::leak(f));
1049        }
1050    }
1051
1052    fn register_cloning(
1053        &self,
1054        clone_for_any_join: CloneForAnyFn,
1055        clone_for_join_fn: &'static (dyn Any + Send + Sync),
1056    ) {
1057        *self.cloning.lock().unwrap() = Some(CloneInterfaces {
1058            clone_for_any_join,
1059            clone_for_join_fn,
1060        });
1061    }
1062
1063    fn buffer_downcast(&self, buffer_type: TypeId) -> Option<BufferDowncastRef> {
1064        self.buffer_downcasts
1065            .lock()
1066            .unwrap()
1067            .get(&buffer_type)
1068            .copied()
1069    }
1070
1071    fn register_key_downcast(&self, key_type: TypeId, f: KeyDowncastBox) {
1072        let mut downcasts = self.key_downcasts.lock().unwrap();
1073
1074        if let Entry::Vacant(entry) = downcasts.entry(key_type) {
1075            // SAFTY: We only leak this in to the register once per type
1076            entry.insert(Box::leak(f));
1077        }
1078    }
1079
1080    fn key_downcast(&self, key_type: TypeId) -> Option<KeyDowncastRef> {
1081        self.key_downcasts.lock().unwrap().get(&key_type).copied()
1082    }
1083
1084    fn pull(
1085        &self,
1086        entity_mut: &mut EntityWorldMut,
1087        session: Entity,
1088    ) -> Result<AnyMessageBox, OperationError> {
1089        entity_mut
1090            .pull_from_buffer::<T>(session)
1091            .map(to_any_message)
1092    }
1093
1094    fn clone_from_buffer(
1095        &self,
1096        entity_ref: &EntityRef,
1097        session: Entity,
1098    ) -> Result<AnyMessageBox, OperationError> {
1099        let f = self
1100            .cloning
1101            .lock()
1102            .unwrap()
1103            .as_ref()
1104            .or_broken()?
1105            .clone_for_any_join;
1106        f(entity_ref, session)
1107    }
1108
1109    fn clone_for_join_fn(&self) -> Option<&'static (dyn Any + Send + Sync)> {
1110        self.cloning
1111            .lock()
1112            .unwrap()
1113            .as_ref()
1114            .map(|c| c.clone_for_join_fn)
1115    }
1116
1117    fn create_any_buffer_view<'a>(
1118        &self,
1119        key: &AnyBufferKey,
1120        world: &'a World,
1121    ) -> Result<AnyBufferView<'a>, BufferError> {
1122        let buffer_ref = world
1123            .get_entity(key.tag.buffer)
1124            .map_err(|_| BufferError::BufferMissing)?;
1125        let storage = buffer_ref
1126            .get::<BufferStorage<T>>()
1127            .ok_or(BufferError::BufferMissing)?;
1128        let gate = buffer_ref
1129            .get::<GateState>()
1130            .ok_or(BufferError::BufferMissing)?;
1131        Ok(AnyBufferView {
1132            storage: Box::new(storage),
1133            gate,
1134            session: key.tag.session,
1135        })
1136    }
1137
1138    fn create_any_buffer_access_mut_state(
1139        &self,
1140        world: &mut World,
1141    ) -> Box<dyn AnyBufferAccessMutState> {
1142        Box::new(SystemState::<BufferAccessMut<T>>::new(world))
1143    }
1144}
1145
1146pub struct DrainAnyBuffer<'a> {
1147    interface: Box<dyn DrainAnyBufferInterface + 'a>,
1148}
1149
1150impl<'a> Iterator for DrainAnyBuffer<'a> {
1151    type Item = AnyMessageBox;
1152
1153    fn next(&mut self) -> Option<Self::Item> {
1154        self.interface.any_next()
1155    }
1156}
1157
1158trait DrainAnyBufferInterface {
1159    fn any_next(&mut self) -> Option<AnyMessageBox>;
1160}
1161
1162impl<T: 'static + Send + Sync + Any> DrainAnyBufferInterface for DrainBuffer<'_, T> {
1163    fn any_next(&mut self) -> Option<AnyMessageBox> {
1164        self.next().map(to_any_message)
1165    }
1166}
1167
1168impl Bufferable for AnyBuffer {
1169    type BufferType = Self;
1170    fn into_buffer(self, builder: &mut Builder) -> Self::BufferType {
1171        assert_eq!(self.scope(), builder.scope());
1172        self
1173    }
1174}
1175
1176impl Buffering for AnyBuffer {
1177    fn verify_scope(&self, scope: Entity) {
1178        assert_eq!(scope, self.scope());
1179    }
1180
1181    fn buffered_count(&self, session: Entity, world: &World) -> Result<usize, OperationError> {
1182        let entity_ref = world.get_entity(self.id()).or_broken()?;
1183        self.interface.buffered_count(&entity_ref, session)
1184    }
1185
1186    fn buffered_count_for(
1187        &self,
1188        buffer: Entity,
1189        session: Entity,
1190        world: &World,
1191    ) -> Result<usize, OperationError> {
1192        if buffer != self.id() {
1193            return Ok(0);
1194        }
1195
1196        self.buffered_count(session, world)
1197    }
1198
1199    fn add_listener(&self, listener: Entity, world: &mut World) -> OperationResult {
1200        add_listener_to_source(self.id(), listener, world)
1201    }
1202
1203    fn gate_action(
1204        &self,
1205        session: Entity,
1206        action: Gate,
1207        world: &mut World,
1208        roster: &mut OperationRoster,
1209    ) -> OperationResult {
1210        GateState::apply(self.id(), session, action, world, roster)
1211    }
1212
1213    fn as_input(&self) -> SmallVec<[Entity; 8]> {
1214        SmallVec::from_iter([self.id()])
1215    }
1216
1217    fn ensure_active_session(&self, session: Entity, world: &mut World) -> OperationResult {
1218        let mut entity_mut = world.get_entity_mut(self.id()).or_broken()?;
1219        self.interface.ensure_session(&mut entity_mut, session)
1220    }
1221}
1222
1223impl Joining for AnyBuffer {
1224    type Item = AnyMessageBox;
1225    fn fetch_for_join(
1226        &self,
1227        session: Entity,
1228        world: &mut World,
1229    ) -> Result<Self::Item, OperationError> {
1230        match self.join_behavior {
1231            JoinBehavior::Pull => {
1232                let mut buffer_mut = world.get_entity_mut(self.id()).or_broken()?;
1233                self.interface.pull(&mut buffer_mut, session)
1234            }
1235            JoinBehavior::Clone => {
1236                let buffer_ref = world.get_entity(self.id()).or_broken()?;
1237                self.interface.clone_from_buffer(&buffer_ref, session)
1238            }
1239        }
1240    }
1241}
1242
1243impl Accessing for AnyBuffer {
1244    type Key = AnyBufferKey;
1245    fn add_accessor(&self, accessor: Entity, world: &mut World) -> OperationResult {
1246        world
1247            .get_mut::<BufferAccessors>(self.id())
1248            .or_broken()?
1249            .add_accessor(accessor);
1250        Ok(())
1251    }
1252
1253    fn create_key(&self, builder: &super::BufferKeyBuilder) -> Self::Key {
1254        AnyBufferKey {
1255            tag: builder.make_tag(self.id()),
1256            interface: self.interface,
1257        }
1258    }
1259
1260    fn deep_clone_key(key: &Self::Key) -> Self::Key {
1261        key.deep_clone()
1262    }
1263
1264    fn is_key_in_use(key: &Self::Key) -> bool {
1265        key.is_in_use()
1266    }
1267}
1268
#[cfg(test)]
mod tests {
    use crate::{prelude::*, testing::*};
    use bevy_ecs::prelude::World;

    /// After five messages are pushed through a typed key, a type-erased view
    /// of the same buffer reports a length of five.
    #[test]
    fn test_any_count() {
        let mut context = TestingContext::minimal_plugins();

        let workflow = context.spawn_io_workflow(|scope, builder| {
            let buffer = builder.create_buffer(BufferSettings::keep_all());
            let push_multiple_times = builder
                .commands()
                .spawn_service(push_multiple_times_into_buffer.into_blocking_service());
            let count = builder
                .commands()
                .spawn_service(get_buffer_count.into_blocking_service());

            builder
                .chain(scope.start)
                .with_access(buffer)
                .then(push_multiple_times)
                .then(count)
                .connect(scope.terminate);
        });

        let count = context.resolve_request(1, workflow);
        assert_eq!(count, 5);
    }

    /// Pushes `value` into the buffer five times through the typed key, then
    /// hands the key on in its type-erased form.
    fn push_multiple_times_into_buffer(
        In((value, key)): In<(usize, BufferKey<usize>)>,
        mut access: BufferAccessMut<usize>,
    ) -> AnyBufferKey {
        let mut buffer = access.get_mut(&key).unwrap();
        for _ in 0..5 {
            buffer.push(value);
        }

        key.into()
    }

    /// Reads the buffer length through a type-erased view.
    fn get_buffer_count(In(key): In<AnyBufferKey>, world: &mut World) -> usize {
        world.any_buffer_view(&key).unwrap().len()
    }

    /// Messages can be modified in place through type-erased mutable access
    /// and then pulled out one at a time.
    #[test]
    fn test_modify_any_message() {
        let mut context = TestingContext::minimal_plugins();

        let workflow = context.spawn_io_workflow(|scope, builder| {
            let buffer = builder.create_buffer(BufferSettings::keep_all());
            let push_multiple_times = builder
                .commands()
                .spawn_service(push_multiple_times_into_buffer.into_blocking_service());
            let modify_content = builder
                .commands()
                .spawn_service(modify_buffer_content.into_blocking_service());
            let drain_content = builder
                .commands()
                .spawn_service(pull_each_buffer_item.into_blocking_service());

            builder
                .chain(scope.start)
                .with_access(buffer)
                .then(push_multiple_times)
                .then(modify_content)
                .then(drain_content)
                .connect(scope.terminate);
        });

        // Five copies of 3, each multiplied by its index in the buffer.
        let values = context.resolve_request(3, workflow);
        assert_eq!(values, vec![0, 3, 6, 9, 12]);
    }

    /// Multiplies every buffered `usize` by its index, going through
    /// type-erased mutable access.
    fn modify_buffer_content(In(key): In<AnyBufferKey>, world: &mut World) -> AnyBufferKey {
        world
            .any_buffer_mut(&key, |mut access| {
                for i in 0..access.len() {
                    // Only the side effect is wanted here, so use `if let`
                    // instead of mapping the Option to a unit value.
                    if let Some(value) = access.get_mut(i) {
                        *value.downcast_mut::<usize>().unwrap() *= i;
                    }
                }
            })
            .unwrap();

        key
    }

    /// Pulls messages one by one until the buffer is empty, downcasting each
    /// of them to `usize`.
    fn pull_each_buffer_item(In(key): In<AnyBufferKey>, world: &mut World) -> Vec<usize> {
        world
            .any_buffer_mut(&key, |mut access| {
                let mut values = Vec::new();
                while let Some(value) = access.pull() {
                    values.push(*value.downcast::<usize>().unwrap());
                }
                values
            })
            .unwrap()
    }

    /// Same scenario as `test_modify_any_message`, but the messages are
    /// collected with `drain` instead of repeated pulls.
    #[test]
    fn test_drain_any_message() {
        let mut context = TestingContext::minimal_plugins();

        let workflow = context.spawn_io_workflow(|scope, builder| {
            let buffer = builder.create_buffer(BufferSettings::keep_all());
            let push_multiple_times = builder
                .commands()
                .spawn_service(push_multiple_times_into_buffer.into_blocking_service());
            let modify_content = builder
                .commands()
                .spawn_service(modify_buffer_content.into_blocking_service());
            let drain_content = builder
                .commands()
                .spawn_service(drain_buffer_contents.into_blocking_service());

            builder
                .chain(scope.start)
                .with_access(buffer)
                .then(push_multiple_times)
                .then(modify_content)
                .then(drain_content)
                .connect(scope.terminate);
        });

        // Five copies of 3, each multiplied by its index in the buffer.
        let values = context.resolve_request(3, workflow);
        assert_eq!(values, vec![0, 3, 6, 9, 12]);
    }

    /// Drains the whole buffer and downcasts every message to `usize`.
    fn drain_buffer_contents(In(key): In<AnyBufferKey>, world: &mut World) -> Vec<usize> {
        world
            .any_buffer_mut(&key, |mut access| {
                access
                    .drain(..)
                    .map(|value| *value.downcast::<usize>().unwrap())
                    .collect()
            })
            .unwrap()
    }

    /// Three type-erased buffers with different message types are fed through
    /// their downcast input slots and then joined; each joined message is
    /// downcast back to its concrete type.
    #[test]
    fn double_any_messages() {
        let mut context = TestingContext::minimal_plugins();

        let workflow =
            context.spawn_io_workflow(|scope: Scope<(u32, i32, f32), (u32, i32, f32)>, builder| {
                let buffer_u32: AnyBuffer = builder
                    .create_buffer::<u32>(BufferSettings::default())
                    .into();
                let buffer_i32: AnyBuffer = builder
                    .create_buffer::<i32>(BufferSettings::default())
                    .into();
                let buffer_f32: AnyBuffer = builder
                    .create_buffer::<f32>(BufferSettings::default())
                    .into();

                let (input_u32, input_i32, input_f32) = builder.chain(scope.start).unzip();
                builder.chain(input_u32).map_block(|v| 2 * v).connect(
                    buffer_u32
                        .downcast_for_message::<u32>()
                        .unwrap()
                        .input_slot(),
                );

                builder.chain(input_i32).map_block(|v| 2 * v).connect(
                    buffer_i32
                        .downcast_for_message::<i32>()
                        .unwrap()
                        .input_slot(),
                );

                builder.chain(input_f32).map_block(|v| 2.0 * v).connect(
                    buffer_f32
                        .downcast_for_message::<f32>()
                        .unwrap()
                        .input_slot(),
                );

                (buffer_u32, buffer_i32, buffer_f32)
                    .join(builder)
                    .map_block(|(value_u32, value_i32, value_f32)| {
                        (
                            *value_u32.downcast::<u32>().unwrap(),
                            *value_i32.downcast::<i32>().unwrap(),
                            *value_f32.downcast::<f32>().unwrap(),
                        )
                    })
                    .connect(scope.terminate);
            });

        let r = context.resolve_request((1u32, 2i32, 3f32), workflow);
        let (v_u32, v_i32, v_f32) = r;
        assert_eq!(v_u32, 2);
        assert_eq!(v_i32, 4);
        assert_eq!(v_f32, 6.0);
    }
}