bevy_ecs/message/messages.rs
1use crate::{
2 change_detection::MaybeLocation,
3 message::{Message, MessageCursor, MessageId, MessageInstance},
4 resource::Resource,
5};
6use alloc::vec::Vec;
7use core::{
8 marker::PhantomData,
9 ops::{Deref, DerefMut},
10};
11#[cfg(feature = "bevy_reflect")]
12use {
13 crate::reflect::ReflectResource,
14 bevy_reflect::{std_traits::ReflectDefault, Reflect},
15};
16
17/// A message collection that represents the messages that occurred within the last two
18/// [`Messages::update`] calls.
19/// Messages can be written to using a [`MessageWriter`]
20/// and are typically cheaply read using a [`MessageReader`].
21///
22/// Each message can be consumed by multiple systems, in parallel,
23/// with consumption tracked by the [`MessageReader`] on a per-system basis.
24///
25/// If no [ordering](https://github.com/bevyengine/bevy/blob/main/examples/ecs/ecs_guide.rs)
26/// is applied between writing and reading systems, there is a risk of a race condition.
27/// This means that whether the messages arrive before or after the next [`Messages::update`] is unpredictable.
28///
29/// This collection is meant to be paired with a system that calls
30/// [`Messages::update`] exactly once per update/frame.
31///
32/// [`message_update_system`] is a system that does this, typically initialized automatically using
33/// [`add_message`](https://docs.rs/bevy/*/bevy/app/struct.App.html#method.add_message).
34/// [`MessageReader`]s are expected to read messages from this collection at least once per loop/frame.
35/// Messages will persist across a single frame boundary and so ordering of message producers and
36/// consumers is not critical (although poorly-planned ordering may cause accumulating lag).
37/// If messages are not handled by the end of the frame after they are updated, they will be
38/// dropped silently.
39///
40/// # Example
41///
42/// ```
43/// use bevy_ecs::message::{Message, Messages};
44///
45/// #[derive(Message)]
46/// struct MyMessage {
47/// value: usize
48/// }
49///
50/// // setup
51/// let mut messages = Messages::<MyMessage>::default();
52/// let mut cursor = messages.get_cursor();
53///
54/// // run this once per update/frame
55/// messages.update();
56///
57/// // somewhere else: write a message
58/// messages.write(MyMessage { value: 1 });
59///
60/// // somewhere else: read the messages
61/// for message in cursor.read(&messages) {
62/// assert_eq!(message.value, 1)
63/// }
64///
65/// // messages are only processed once per reader
66/// assert_eq!(cursor.read(&messages).count(), 0);
67/// ```
68///
69/// # Details
70///
71/// [`Messages`] is implemented using a variation of a double buffer strategy.
72/// Each call to [`update`](Messages::update) swaps buffers and clears out the oldest one.
73/// - [`MessageReader`]s will read messages from both buffers.
74/// - [`MessageReader`]s that read at least once per update will never drop messages.
75/// - [`MessageReader`]s that read once within two updates might still receive some messages
76/// - [`MessageReader`]s that read after two updates are guaranteed to drop all messages that occurred
77/// before those updates.
78///
79/// The buffers in [`Messages`] will grow indefinitely if [`update`](Messages::update) is never called.
80///
81/// An alternative call pattern would be to call [`update`](Messages::update)
82/// manually across frames to control when messages are cleared.
83/// This complicates consumption and risks ever-expanding memory usage if not cleaned up,
84/// but can be done by adding your message as a resource instead of using
85/// [`add_message`](https://docs.rs/bevy/*/bevy/app/struct.App.html#method.add_message).
86///
87/// [Example usage.](https://github.com/bevyengine/bevy/blob/latest/examples/ecs/message.rs)
88/// [Example usage standalone.](https://github.com/bevyengine/bevy/blob/latest/crates/bevy_ecs/examples/messages.rs)
89///
90/// [`MessageReader`]: super::MessageReader
91/// [`MessageWriter`]: super::MessageWriter
92/// [`message_update_system`]: super::message_update_system
93#[derive(Debug, Resource)]
94#[cfg_attr(feature = "bevy_reflect", derive(Reflect), reflect(Resource, Default))]
95pub struct Messages<E: Message> {
96 /// Holds the oldest still active messages.
97 /// Note that `a.start_message_count + a.len()` should always be equal to `messages_b.start_message_count`.
98 pub(crate) messages_a: MessageSequence<E>,
99 /// Holds the newer messages.
100 pub(crate) messages_b: MessageSequence<E>,
101 pub(crate) message_count: usize,
102}
103
104// Derived Default impl would incorrectly require E: Default
105impl<E: Message> Default for Messages<E> {
106 fn default() -> Self {
107 Self {
108 messages_a: Default::default(),
109 messages_b: Default::default(),
110 message_count: Default::default(),
111 }
112 }
113}
114
115impl<M: Message> Messages<M> {
116 /// Returns the index of the oldest message stored in the message buffer.
117 pub fn oldest_message_count(&self) -> usize {
118 self.messages_a.start_message_count
119 }
120
121 /// Writes an `message` to the current message buffer.
122 /// [`MessageReader`](super::MessageReader)s can then read the message.
123 /// This method returns the [ID](`MessageId`) of the written `message`.
124 #[track_caller]
125 pub fn write(&mut self, message: M) -> MessageId<M> {
126 self.write_with_caller(message, MaybeLocation::caller())
127 }
128
129 pub(crate) fn write_with_caller(&mut self, message: M, caller: MaybeLocation) -> MessageId<M> {
130 let message_id = MessageId {
131 id: self.message_count,
132 caller,
133 _marker: PhantomData,
134 };
135 #[cfg(feature = "detailed_trace")]
136 tracing::trace!("Messages::write() -> id: {}", message_id);
137
138 let message_instance = MessageInstance {
139 message_id,
140 message,
141 };
142
143 self.messages_b.push(message_instance);
144 self.message_count += 1;
145
146 message_id
147 }
148
149 /// Writes a list of `messages` all at once, which can later be read by [`MessageReader`](super::MessageReader)s.
150 /// This is more efficient than writing each message individually.
151 /// This method returns the [IDs](`MessageId`) of the written `messages`.
152 #[track_caller]
153 pub fn write_batch(&mut self, messages: impl IntoIterator<Item = M>) -> WriteBatchIds<M> {
154 let last_count = self.message_count;
155
156 self.extend(messages);
157
158 WriteBatchIds {
159 last_count,
160 message_count: self.message_count,
161 _marker: PhantomData,
162 }
163 }
164
165 /// Writes the default value of the message. Useful when the message is an empty struct.
166 /// This method returns the [ID](`MessageId`) of the written `message`.
167 #[track_caller]
168 pub fn write_default(&mut self) -> MessageId<M>
169 where
170 M: Default,
171 {
172 self.write(Default::default())
173 }
174
175 /// "Sends" an `message` by writing it to the current message buffer.
176 /// [`MessageReader`](super::MessageReader)s can then read the message.
177 /// This method returns the [ID](`MessageId`) of the sent `message`.
178 #[deprecated(since = "0.17.0", note = "Use `Messages<E>::write` instead.")]
179 #[track_caller]
180 pub fn send(&mut self, message: M) -> MessageId<M> {
181 self.write(message)
182 }
183
184 /// Sends a list of `messages` all at once, which can later be read by [`MessageReader`](super::MessageReader)s.
185 /// This is more efficient than sending each message individually.
186 /// This method returns the [IDs](`MessageId`) of the sent `messages`.
187 #[deprecated(since = "0.17.0", note = "Use `Messages<E>::write_batch` instead.")]
188 #[track_caller]
189 pub fn send_batch(&mut self, messages: impl IntoIterator<Item = M>) -> WriteBatchIds<M> {
190 self.write_batch(messages)
191 }
192
193 /// Sends the default value of the message. Useful when the message is an empty struct.
194 /// This method returns the [ID](`MessageId`) of the sent `message`.
195 #[deprecated(since = "0.17.0", note = "Use `Messages<E>::write_default` instead.")]
196 #[track_caller]
197 pub fn send_default(&mut self) -> MessageId<M>
198 where
199 M: Default,
200 {
201 self.write_default()
202 }
203
204 /// Gets a new [`MessageCursor`]. This will include all messages already in the message buffers.
205 pub fn get_cursor(&self) -> MessageCursor<M> {
206 MessageCursor::default()
207 }
208
209 /// Gets a new [`MessageCursor`]. This will ignore all messages already in the message buffers.
210 /// It will read all future messages.
211 pub fn get_cursor_current(&self) -> MessageCursor<M> {
212 MessageCursor {
213 last_message_count: self.message_count,
214 ..Default::default()
215 }
216 }
217
218 /// Swaps the message buffers and clears the oldest message buffer. In general, this should be
219 /// called once per frame/update.
220 ///
221 /// If you need access to the messages that were removed, consider using [`Messages::update_drain`].
222 pub fn update(&mut self) {
223 core::mem::swap(&mut self.messages_a, &mut self.messages_b);
224 self.messages_b.clear();
225 self.messages_b.start_message_count = self.message_count;
226 debug_assert_eq!(
227 self.messages_a.start_message_count + self.messages_a.len(),
228 self.messages_b.start_message_count
229 );
230 }
231
232 /// Swaps the message buffers and drains the oldest message buffer, returning an iterator
233 /// of all messages that were removed. In general, this should be called once per frame/update.
234 ///
235 /// If you do not need to take ownership of the removed messages, use [`Messages::update`] instead.
236 #[must_use = "If you do not need the returned messages, call .update() instead."]
237 pub fn update_drain(&mut self) -> impl Iterator<Item = M> + '_ {
238 core::mem::swap(&mut self.messages_a, &mut self.messages_b);
239 let iter = self.messages_b.messages.drain(..);
240 self.messages_b.start_message_count = self.message_count;
241 debug_assert_eq!(
242 self.messages_a.start_message_count + self.messages_a.len(),
243 self.messages_b.start_message_count
244 );
245
246 iter.map(|e| e.message)
247 }
248
249 #[inline]
250 fn reset_start_message_count(&mut self) {
251 self.messages_a.start_message_count = self.message_count;
252 self.messages_b.start_message_count = self.message_count;
253 }
254
255 /// Removes all messages.
256 #[inline]
257 pub fn clear(&mut self) {
258 self.reset_start_message_count();
259 self.messages_a.clear();
260 self.messages_b.clear();
261 }
262
263 /// Returns the number of messages currently stored in the message buffer.
264 #[inline]
265 pub fn len(&self) -> usize {
266 self.messages_a.len() + self.messages_b.len()
267 }
268
269 /// Returns true if there are no messages currently stored in the message buffer.
270 #[inline]
271 pub fn is_empty(&self) -> bool {
272 self.len() == 0
273 }
274
275 /// Creates a draining iterator that removes all messages.
276 pub fn drain(&mut self) -> impl Iterator<Item = M> + '_ {
277 self.reset_start_message_count();
278
279 // Drain the oldest messages first, then the newest
280 self.messages_a
281 .drain(..)
282 .chain(self.messages_b.drain(..))
283 .map(|i| i.message)
284 }
285
286 /// Iterates over messages that happened since the last "update" call.
287 /// WARNING: You probably don't want to use this call. In most cases you should use an
288 /// [`MessageReader`]. You should only use this if you know you only need to consume messages
289 /// between the last `update()` call and your call to `iter_current_update_messages`.
290 /// If messages happen outside that window, they will not be handled. For example, any messages that
291 /// happen after this call and before the next `update()` call will be dropped.
292 ///
293 /// [`MessageReader`]: super::MessageReader
294 pub fn iter_current_update_messages(&self) -> impl ExactSizeIterator<Item = &M> {
295 self.messages_b.iter().map(|i| &i.message)
296 }
297
298 /// Get a specific message by id if it still exists in the messages buffer.
299 pub fn get_message(&self, id: usize) -> Option<(&M, MessageId<M>)> {
300 if id < self.oldest_message_count() {
301 return None;
302 }
303
304 let sequence = self.sequence(id);
305 let index = id.saturating_sub(sequence.start_message_count);
306
307 sequence
308 .get(index)
309 .map(|instance| (&instance.message, instance.message_id))
310 }
311
312 /// Which message buffer is this message id a part of.
313 fn sequence(&self, id: usize) -> &MessageSequence<M> {
314 if id < self.messages_b.start_message_count {
315 &self.messages_a
316 } else {
317 &self.messages_b
318 }
319 }
320}
321
322impl<E: Message> Extend<E> for Messages<E> {
323 #[track_caller]
324 fn extend<I>(&mut self, iter: I)
325 where
326 I: IntoIterator<Item = E>,
327 {
328 let old_count = self.message_count;
329 let mut message_count = self.message_count;
330 let messages = iter.into_iter().map(|message| {
331 let message_id = MessageId {
332 id: message_count,
333 caller: MaybeLocation::caller(),
334 _marker: PhantomData,
335 };
336 message_count += 1;
337 MessageInstance {
338 message_id,
339 message,
340 }
341 });
342
343 self.messages_b.extend(messages);
344
345 if old_count != message_count {
346 #[cfg(feature = "detailed_trace")]
347 tracing::trace!(
348 "Messages::extend() -> ids: ({}..{})",
349 self.message_count,
350 message_count
351 );
352 }
353
354 self.message_count = message_count;
355 }
356}
357
358#[derive(Debug)]
359#[cfg_attr(feature = "bevy_reflect", derive(Reflect), reflect(Default))]
360pub(crate) struct MessageSequence<E: Message> {
361 pub(crate) messages: Vec<MessageInstance<E>>,
362 pub(crate) start_message_count: usize,
363}
364
365// Derived Default impl would incorrectly require E: Default
366impl<E: Message> Default for MessageSequence<E> {
367 fn default() -> Self {
368 Self {
369 messages: Default::default(),
370 start_message_count: Default::default(),
371 }
372 }
373}
374
375impl<E: Message> Deref for MessageSequence<E> {
376 type Target = Vec<MessageInstance<E>>;
377
378 fn deref(&self) -> &Self::Target {
379 &self.messages
380 }
381}
382
383impl<E: Message> DerefMut for MessageSequence<E> {
384 fn deref_mut(&mut self) -> &mut Self::Target {
385 &mut self.messages
386 }
387}
388
389/// [`Iterator`] over written [`MessageIds`](`MessageId`) from a batch.
390pub struct WriteBatchIds<E> {
391 last_count: usize,
392 message_count: usize,
393 _marker: PhantomData<E>,
394}
395
396/// [`Iterator`] over sent [`MessageIds`](`MessageId`) from a batch.
397#[deprecated(since = "0.17.0", note = "Use `WriteBatchIds` instead.")]
398pub type SendBatchIds<E> = WriteBatchIds<E>;
399
400impl<E: Message> Iterator for WriteBatchIds<E> {
401 type Item = MessageId<E>;
402
403 fn next(&mut self) -> Option<Self::Item> {
404 if self.last_count >= self.message_count {
405 return None;
406 }
407
408 let result = Some(MessageId {
409 id: self.last_count,
410 caller: MaybeLocation::caller(),
411 _marker: PhantomData,
412 });
413
414 self.last_count += 1;
415
416 result
417 }
418}
419
420impl<E: Message> ExactSizeIterator for WriteBatchIds<E> {
421 fn len(&self) -> usize {
422 self.message_count.saturating_sub(self.last_count)
423 }
424}
425
426#[cfg(test)]
427mod tests {
428 use crate::message::{Message, Messages};
429
430 #[test]
431 fn iter_current_update_messages_iterates_over_current_messages() {
432 #[derive(Message, Clone)]
433 struct TestMessage;
434
435 let mut test_messages = Messages::<TestMessage>::default();
436
437 // Starting empty
438 assert_eq!(test_messages.len(), 0);
439 assert_eq!(test_messages.iter_current_update_messages().count(), 0);
440 test_messages.update();
441
442 // Writing one message
443 test_messages.write(TestMessage);
444
445 assert_eq!(test_messages.len(), 1);
446 assert_eq!(test_messages.iter_current_update_messages().count(), 1);
447 test_messages.update();
448
449 // Writing two messages on the next frame
450 test_messages.write(TestMessage);
451 test_messages.write(TestMessage);
452
453 assert_eq!(test_messages.len(), 3); // Messages are double-buffered, so we see 1 + 2 = 3
454 assert_eq!(test_messages.iter_current_update_messages().count(), 2);
455 test_messages.update();
456
457 // Writing zero messages
458 assert_eq!(test_messages.len(), 2); // Messages are double-buffered, so we see 2 + 0 = 2
459 assert_eq!(test_messages.iter_current_update_messages().count(), 0);
460 }
461}