bevy_ecs/message/
messages.rs

1use crate::{
2    change_detection::MaybeLocation,
3    message::{Message, MessageCursor, MessageId, MessageInstance},
4    resource::Resource,
5};
6use alloc::vec::Vec;
7use core::{
8    marker::PhantomData,
9    ops::{Deref, DerefMut},
10};
11#[cfg(feature = "bevy_reflect")]
12use {
13    crate::reflect::ReflectResource,
14    bevy_reflect::{std_traits::ReflectDefault, Reflect},
15};
16
17/// A message collection that represents the messages that occurred within the last two
18/// [`Messages::update`] calls.
19/// Messages can be written to using a [`MessageWriter`]
20/// and are typically cheaply read using a [`MessageReader`].
21///
22/// Each message can be consumed by multiple systems, in parallel,
23/// with consumption tracked by the [`MessageReader`] on a per-system basis.
24///
25/// If no [ordering](https://github.com/bevyengine/bevy/blob/main/examples/ecs/ecs_guide.rs)
26/// is applied between writing and reading systems, there is a risk of a race condition.
27/// This means that whether the messages arrive before or after the next [`Messages::update`] is unpredictable.
28///
29/// This collection is meant to be paired with a system that calls
30/// [`Messages::update`] exactly once per update/frame.
31///
32/// [`message_update_system`] is a system that does this, typically initialized automatically using
33/// [`add_message`](https://docs.rs/bevy/*/bevy/app/struct.App.html#method.add_message).
34/// [`MessageReader`]s are expected to read messages from this collection at least once per loop/frame.
35/// Messages will persist across a single frame boundary and so ordering of message producers and
36/// consumers is not critical (although poorly-planned ordering may cause accumulating lag).
37/// If messages are not handled by the end of the frame after they are updated, they will be
38/// dropped silently.
39///
40/// # Example
41///
42/// ```
43/// use bevy_ecs::message::{Message, Messages};
44///
45/// #[derive(Message)]
46/// struct MyMessage {
47///     value: usize
48/// }
49///
50/// // setup
51/// let mut messages = Messages::<MyMessage>::default();
52/// let mut cursor = messages.get_cursor();
53///
54/// // run this once per update/frame
55/// messages.update();
56///
57/// // somewhere else: write a message
58/// messages.write(MyMessage { value: 1 });
59///
60/// // somewhere else: read the messages
61/// for message in cursor.read(&messages) {
62///     assert_eq!(message.value, 1)
63/// }
64///
65/// // messages are only processed once per reader
66/// assert_eq!(cursor.read(&messages).count(), 0);
67/// ```
68///
69/// # Details
70///
71/// [`Messages`] is implemented using a variation of a double buffer strategy.
72/// Each call to [`update`](Messages::update) swaps buffers and clears out the oldest one.
73/// - [`MessageReader`]s will read messages from both buffers.
74/// - [`MessageReader`]s that read at least once per update will never drop messages.
75/// - [`MessageReader`]s that read once within two updates might still receive some messages
76/// - [`MessageReader`]s that read after two updates are guaranteed to drop all messages that occurred
77///   before those updates.
78///
79/// The buffers in [`Messages`] will grow indefinitely if [`update`](Messages::update) is never called.
80///
81/// An alternative call pattern would be to call [`update`](Messages::update)
82/// manually across frames to control when messages are cleared.
83/// This complicates consumption and risks ever-expanding memory usage if not cleaned up,
84/// but can be done by adding your message as a resource instead of using
85/// [`add_message`](https://docs.rs/bevy/*/bevy/app/struct.App.html#method.add_message).
86///
87/// [Example usage.](https://github.com/bevyengine/bevy/blob/latest/examples/ecs/message.rs)
88/// [Example usage standalone.](https://github.com/bevyengine/bevy/blob/latest/crates/bevy_ecs/examples/messages.rs)
89///
90/// [`MessageReader`]: super::MessageReader
91/// [`MessageWriter`]: super::MessageWriter
92/// [`message_update_system`]: super::message_update_system
93#[derive(Debug, Resource)]
94#[cfg_attr(feature = "bevy_reflect", derive(Reflect), reflect(Resource, Default))]
95pub struct Messages<E: Message> {
96    /// Holds the oldest still active messages.
97    /// Note that `a.start_message_count + a.len()` should always be equal to `messages_b.start_message_count`.
98    pub(crate) messages_a: MessageSequence<E>,
99    /// Holds the newer messages.
100    pub(crate) messages_b: MessageSequence<E>,
101    pub(crate) message_count: usize,
102}
103
104// Derived Default impl would incorrectly require E: Default
105impl<E: Message> Default for Messages<E> {
106    fn default() -> Self {
107        Self {
108            messages_a: Default::default(),
109            messages_b: Default::default(),
110            message_count: Default::default(),
111        }
112    }
113}
114
115impl<M: Message> Messages<M> {
116    /// Returns the index of the oldest message stored in the message buffer.
117    pub fn oldest_message_count(&self) -> usize {
118        self.messages_a.start_message_count
119    }
120
121    /// Writes an `message` to the current message buffer.
122    /// [`MessageReader`](super::MessageReader)s can then read the message.
123    /// This method returns the [ID](`MessageId`) of the written `message`.
124    #[track_caller]
125    pub fn write(&mut self, message: M) -> MessageId<M> {
126        self.write_with_caller(message, MaybeLocation::caller())
127    }
128
129    pub(crate) fn write_with_caller(&mut self, message: M, caller: MaybeLocation) -> MessageId<M> {
130        let message_id = MessageId {
131            id: self.message_count,
132            caller,
133            _marker: PhantomData,
134        };
135        #[cfg(feature = "detailed_trace")]
136        tracing::trace!("Messages::write() -> id: {}", message_id);
137
138        let message_instance = MessageInstance {
139            message_id,
140            message,
141        };
142
143        self.messages_b.push(message_instance);
144        self.message_count += 1;
145
146        message_id
147    }
148
149    /// Writes a list of `messages` all at once, which can later be read by [`MessageReader`](super::MessageReader)s.
150    /// This is more efficient than writing each message individually.
151    /// This method returns the [IDs](`MessageId`) of the written `messages`.
152    #[track_caller]
153    pub fn write_batch(&mut self, messages: impl IntoIterator<Item = M>) -> WriteBatchIds<M> {
154        let last_count = self.message_count;
155
156        self.extend(messages);
157
158        WriteBatchIds {
159            last_count,
160            message_count: self.message_count,
161            _marker: PhantomData,
162        }
163    }
164
165    /// Writes the default value of the message. Useful when the message is an empty struct.
166    /// This method returns the [ID](`MessageId`) of the written `message`.
167    #[track_caller]
168    pub fn write_default(&mut self) -> MessageId<M>
169    where
170        M: Default,
171    {
172        self.write(Default::default())
173    }
174
175    /// "Sends" an `message` by writing it to the current message buffer.
176    /// [`MessageReader`](super::MessageReader)s can then read the message.
177    /// This method returns the [ID](`MessageId`) of the sent `message`.
178    #[deprecated(since = "0.17.0", note = "Use `Messages<E>::write` instead.")]
179    #[track_caller]
180    pub fn send(&mut self, message: M) -> MessageId<M> {
181        self.write(message)
182    }
183
184    /// Sends a list of `messages` all at once, which can later be read by [`MessageReader`](super::MessageReader)s.
185    /// This is more efficient than sending each message individually.
186    /// This method returns the [IDs](`MessageId`) of the sent `messages`.
187    #[deprecated(since = "0.17.0", note = "Use `Messages<E>::write_batch` instead.")]
188    #[track_caller]
189    pub fn send_batch(&mut self, messages: impl IntoIterator<Item = M>) -> WriteBatchIds<M> {
190        self.write_batch(messages)
191    }
192
193    /// Sends the default value of the message. Useful when the message is an empty struct.
194    /// This method returns the [ID](`MessageId`) of the sent `message`.
195    #[deprecated(since = "0.17.0", note = "Use `Messages<E>::write_default` instead.")]
196    #[track_caller]
197    pub fn send_default(&mut self) -> MessageId<M>
198    where
199        M: Default,
200    {
201        self.write_default()
202    }
203
204    /// Gets a new [`MessageCursor`]. This will include all messages already in the message buffers.
205    pub fn get_cursor(&self) -> MessageCursor<M> {
206        MessageCursor::default()
207    }
208
209    /// Gets a new [`MessageCursor`]. This will ignore all messages already in the message buffers.
210    /// It will read all future messages.
211    pub fn get_cursor_current(&self) -> MessageCursor<M> {
212        MessageCursor {
213            last_message_count: self.message_count,
214            ..Default::default()
215        }
216    }
217
218    /// Swaps the message buffers and clears the oldest message buffer. In general, this should be
219    /// called once per frame/update.
220    ///
221    /// If you need access to the messages that were removed, consider using [`Messages::update_drain`].
222    pub fn update(&mut self) {
223        core::mem::swap(&mut self.messages_a, &mut self.messages_b);
224        self.messages_b.clear();
225        self.messages_b.start_message_count = self.message_count;
226        debug_assert_eq!(
227            self.messages_a.start_message_count + self.messages_a.len(),
228            self.messages_b.start_message_count
229        );
230    }
231
232    /// Swaps the message buffers and drains the oldest message buffer, returning an iterator
233    /// of all messages that were removed. In general, this should be called once per frame/update.
234    ///
235    /// If you do not need to take ownership of the removed messages, use [`Messages::update`] instead.
236    #[must_use = "If you do not need the returned messages, call .update() instead."]
237    pub fn update_drain(&mut self) -> impl Iterator<Item = M> + '_ {
238        core::mem::swap(&mut self.messages_a, &mut self.messages_b);
239        let iter = self.messages_b.messages.drain(..);
240        self.messages_b.start_message_count = self.message_count;
241        debug_assert_eq!(
242            self.messages_a.start_message_count + self.messages_a.len(),
243            self.messages_b.start_message_count
244        );
245
246        iter.map(|e| e.message)
247    }
248
249    #[inline]
250    fn reset_start_message_count(&mut self) {
251        self.messages_a.start_message_count = self.message_count;
252        self.messages_b.start_message_count = self.message_count;
253    }
254
255    /// Removes all messages.
256    #[inline]
257    pub fn clear(&mut self) {
258        self.reset_start_message_count();
259        self.messages_a.clear();
260        self.messages_b.clear();
261    }
262
263    /// Returns the number of messages currently stored in the message buffer.
264    #[inline]
265    pub fn len(&self) -> usize {
266        self.messages_a.len() + self.messages_b.len()
267    }
268
269    /// Returns true if there are no messages currently stored in the message buffer.
270    #[inline]
271    pub fn is_empty(&self) -> bool {
272        self.len() == 0
273    }
274
275    /// Creates a draining iterator that removes all messages.
276    pub fn drain(&mut self) -> impl Iterator<Item = M> + '_ {
277        self.reset_start_message_count();
278
279        // Drain the oldest messages first, then the newest
280        self.messages_a
281            .drain(..)
282            .chain(self.messages_b.drain(..))
283            .map(|i| i.message)
284    }
285
286    /// Iterates over messages that happened since the last "update" call.
287    /// WARNING: You probably don't want to use this call. In most cases you should use an
288    /// [`MessageReader`]. You should only use this if you know you only need to consume messages
289    /// between the last `update()` call and your call to `iter_current_update_messages`.
290    /// If messages happen outside that window, they will not be handled. For example, any messages that
291    /// happen after this call and before the next `update()` call will be dropped.
292    ///
293    /// [`MessageReader`]: super::MessageReader
294    pub fn iter_current_update_messages(&self) -> impl ExactSizeIterator<Item = &M> {
295        self.messages_b.iter().map(|i| &i.message)
296    }
297
298    /// Get a specific message by id if it still exists in the messages buffer.
299    pub fn get_message(&self, id: usize) -> Option<(&M, MessageId<M>)> {
300        if id < self.oldest_message_count() {
301            return None;
302        }
303
304        let sequence = self.sequence(id);
305        let index = id.saturating_sub(sequence.start_message_count);
306
307        sequence
308            .get(index)
309            .map(|instance| (&instance.message, instance.message_id))
310    }
311
312    /// Which message buffer is this message id a part of.
313    fn sequence(&self, id: usize) -> &MessageSequence<M> {
314        if id < self.messages_b.start_message_count {
315            &self.messages_a
316        } else {
317            &self.messages_b
318        }
319    }
320}
321
322impl<E: Message> Extend<E> for Messages<E> {
323    #[track_caller]
324    fn extend<I>(&mut self, iter: I)
325    where
326        I: IntoIterator<Item = E>,
327    {
328        let old_count = self.message_count;
329        let mut message_count = self.message_count;
330        let messages = iter.into_iter().map(|message| {
331            let message_id = MessageId {
332                id: message_count,
333                caller: MaybeLocation::caller(),
334                _marker: PhantomData,
335            };
336            message_count += 1;
337            MessageInstance {
338                message_id,
339                message,
340            }
341        });
342
343        self.messages_b.extend(messages);
344
345        if old_count != message_count {
346            #[cfg(feature = "detailed_trace")]
347            tracing::trace!(
348                "Messages::extend() -> ids: ({}..{})",
349                self.message_count,
350                message_count
351            );
352        }
353
354        self.message_count = message_count;
355    }
356}
357
358#[derive(Debug)]
359#[cfg_attr(feature = "bevy_reflect", derive(Reflect), reflect(Default))]
360pub(crate) struct MessageSequence<E: Message> {
361    pub(crate) messages: Vec<MessageInstance<E>>,
362    pub(crate) start_message_count: usize,
363}
364
365// Derived Default impl would incorrectly require E: Default
366impl<E: Message> Default for MessageSequence<E> {
367    fn default() -> Self {
368        Self {
369            messages: Default::default(),
370            start_message_count: Default::default(),
371        }
372    }
373}
374
375impl<E: Message> Deref for MessageSequence<E> {
376    type Target = Vec<MessageInstance<E>>;
377
378    fn deref(&self) -> &Self::Target {
379        &self.messages
380    }
381}
382
383impl<E: Message> DerefMut for MessageSequence<E> {
384    fn deref_mut(&mut self) -> &mut Self::Target {
385        &mut self.messages
386    }
387}
388
389/// [`Iterator`] over written [`MessageIds`](`MessageId`) from a batch.
390pub struct WriteBatchIds<E> {
391    last_count: usize,
392    message_count: usize,
393    _marker: PhantomData<E>,
394}
395
396/// [`Iterator`] over sent [`MessageIds`](`MessageId`) from a batch.
397#[deprecated(since = "0.17.0", note = "Use `WriteBatchIds` instead.")]
398pub type SendBatchIds<E> = WriteBatchIds<E>;
399
400impl<E: Message> Iterator for WriteBatchIds<E> {
401    type Item = MessageId<E>;
402
403    fn next(&mut self) -> Option<Self::Item> {
404        if self.last_count >= self.message_count {
405            return None;
406        }
407
408        let result = Some(MessageId {
409            id: self.last_count,
410            caller: MaybeLocation::caller(),
411            _marker: PhantomData,
412        });
413
414        self.last_count += 1;
415
416        result
417    }
418}
419
420impl<E: Message> ExactSizeIterator for WriteBatchIds<E> {
421    fn len(&self) -> usize {
422        self.message_count.saturating_sub(self.last_count)
423    }
424}
425
426#[cfg(test)]
427mod tests {
428    use crate::message::{Message, Messages};
429
430    #[test]
431    fn iter_current_update_messages_iterates_over_current_messages() {
432        #[derive(Message, Clone)]
433        struct TestMessage;
434
435        let mut test_messages = Messages::<TestMessage>::default();
436
437        // Starting empty
438        assert_eq!(test_messages.len(), 0);
439        assert_eq!(test_messages.iter_current_update_messages().count(), 0);
440        test_messages.update();
441
442        // Writing one message
443        test_messages.write(TestMessage);
444
445        assert_eq!(test_messages.len(), 1);
446        assert_eq!(test_messages.iter_current_update_messages().count(), 1);
447        test_messages.update();
448
449        // Writing two messages on the next frame
450        test_messages.write(TestMessage);
451        test_messages.write(TestMessage);
452
453        assert_eq!(test_messages.len(), 3); // Messages are double-buffered, so we see 1 + 2 = 3
454        assert_eq!(test_messages.iter_current_update_messages().count(), 2);
455        test_messages.update();
456
457        // Writing zero messages
458        assert_eq!(test_messages.len(), 2); // Messages are double-buffered, so we see 2 + 0 = 2
459        assert_eq!(test_messages.iter_current_update_messages().count(), 0);
460    }
461}