bevy_async_ecs/
command.rs

1use crate::die;
2use async_channel::Receiver;
3use async_channel::Sender;
4use async_channel::TryRecvError;
5use bevy_ecs::prelude::*;
6use bevy_ecs::world::CommandQueue;
7use bevy_log::tracing::debug;
8use std::fmt;
9
/// The object-safe equivalent of a `Box<dyn Command>`.
///
/// Internally this is a `CommandQueue` that holds the command handed to
/// `BoxedCommand::new()`; applying the `BoxedCommand` applies that queue.
pub struct BoxedCommand(CommandQueue);
12
13impl BoxedCommand {
14	/// Constructs a new `BoxedCommand` from the given Bevy command.
15	pub fn new<C: Command>(inner: C) -> Self {
16		Self({
17			let mut queue = CommandQueue::default();
18			queue.push(inner);
19			queue
20		})
21	}
22}
23
24impl fmt::Debug for BoxedCommand {
25	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
26		f.debug_struct("BoxedCommand").finish()
27	}
28}
29
30impl From<BoxedCommand> for CommandQueue {
31	fn from(boxed: BoxedCommand) -> Self {
32		boxed.0
33	}
34}
35
36impl Command for BoxedCommand {
37	fn apply(mut self, world: &mut World) {
38		self.0.apply(world);
39	}
40}
41
/// Builds a `CommandQueue` that can be applied to the world that the builder was
/// constructed from.
///
/// The easiest way to get a `CommandQueueBuilder` is with `AsyncWorld::start_queue()`.
pub struct CommandQueueBuilder {
	/// Accumulates every command passed to `push()`.
	inner: CommandQueue,
	/// Used by `apply()` to submit the finished queue.
	sender: CommandQueueSender,
}
50
51impl CommandQueueBuilder {
52	pub(crate) fn new(sender: CommandQueueSender) -> Self {
53		let inner = CommandQueue::default();
54		Self { inner, sender }
55	}
56
57	/// Push a command into the `CommandQueue`.
58	///
59	/// This function is meant to be chained.
60	pub fn push<C: Command>(mut self, command: C) -> Self {
61		self.inner.push(command);
62		self
63	}
64
65	/// Apply the `CommandQueue` to the world it was constructed from.
66	///
67	/// This function is meant to be the end of the chain.
68	pub async fn apply(self) {
69		self.sender.send_queue(self.inner).await;
70	}
71
72	/// Return the built `CommandQueue` _without_ applying it to the world it was
73	/// constructed from.
74	///
75	/// This function is meant to be the end of the chain.
76	pub fn build(self) -> CommandQueue {
77		self.inner
78	}
79}
80
81impl fmt::Debug for CommandQueueBuilder {
82	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
83		f.debug_struct("CommandQueueBuilder")
84			.field("inner", &"[..]")
85			.field("sender", &self.sender)
86			.finish()
87	}
88}
89
/// Use this to send commands (stored in `CommandQueue`s) directly to the Bevy World, where they will
/// be applied during the Last schedule.
///
/// This sender internally operates on `CommandQueue`s rather than individual commands.
/// Single commands can still be sent with `CommandQueueSender::send_single()`.
///
/// It is a thin wrapper around the sending half of an `async_channel` channel of
/// `CommandQueue`s, and can be cloned.
///
/// The easiest way to get a `CommandQueueSender` is with `AsyncWorld::sender()`.
#[derive(Clone, Debug)]
pub struct CommandQueueSender(Sender<CommandQueue>);
99
100impl CommandQueueSender {
101	pub(crate) fn new(inner: Sender<CommandQueue>) -> Self {
102		Self(inner)
103	}
104
105	/// Sends an `CommandQueue` directly to the Bevy `World`, where they will be applied during
106	/// the `Last` schedule.
107	pub async fn send_queue(&self, inner_queue: CommandQueue) {
108		self.0.send(inner_queue).await.unwrap_or_else(die)
109	}
110
111	/// Sends a (boxed) `Command` directly to the Bevy `World`, where they it be applied during
112	/// the `Last` schedule.
113	pub async fn send_single(&self, single: BoxedCommand) {
114		self.send_queue(single.into()).await;
115	}
116}
117
/// Component wrapping the receiving half of a `CommandQueue` channel.
///
/// `receive_and_apply_commands` queries the world for entities carrying this component
/// and drains each receiver it finds.
#[derive(Component)]
pub(crate) struct CommandQueueReceiver(Receiver<CommandQueue>);
120
121impl CommandQueueReceiver {
122	pub(crate) fn new(receiver: Receiver<CommandQueue>) -> Self {
123		Self(receiver)
124	}
125
126	fn enqueue_into(&self, world_queue: &mut Vec<CommandQueue>) -> Result<(), ()> {
127		loop {
128			match self.0.try_recv() {
129				Ok(command_queue) => world_queue.push(command_queue),
130				Err(TryRecvError::Closed) => break Err(()),
131				Err(TryRecvError::Empty) => break Ok(()),
132			}
133		}
134	}
135}
136
137pub(crate) fn receive_and_apply_commands(world: &mut World) {
138	let mut buffer = Vec::new();
139	let mut closed_receivers = Vec::new();
140
141	let mut query = world.query::<(Entity, &CommandQueueReceiver)>();
142	for (id, receiver) in query.iter(world) {
143		if receiver.enqueue_into(&mut buffer).is_err() {
144			closed_receivers.push(id);
145		}
146	}
147
148	if !buffer.is_empty() {
149		debug!("applying {} CommandQueues", buffer.len());
150	}
151
152	for mut queue in buffer {
153		queue.apply(world);
154	}
155
156	for closed_receiver in closed_receivers {
157		world.despawn(closed_receiver);
158	}
159}
160
#[cfg(test)]
mod tests {
	use crate::AsyncEcsPlugin;
	use crate::AsyncEntity;
	use crate::AsyncWorld;
	use crate::util::insert;
	use crate::wait_for::StartWaitingFor;
	use bevy::prelude::*;
	use bevy::tasks::AsyncComputeTaskPool;

	use super::*;

	/// Tag component spawned by the `smoke` test's command.
	#[derive(Component)]
	struct Marker;

	/// Value component inserted and awaited by the `queue` test.
	#[derive(Default, Clone, Component)]
	struct Counter(u8);

	/// Sends a single boxed command from a task and checks that it ran against the app's
	/// world; also pins the `Debug` output of `BoxedCommand` and `CommandQueueBuilder`.
	#[test]
	fn smoke() {
		let mut app = App::new();
		app.add_plugins((MinimalPlugins, AsyncEcsPlugin));

		let async_world = AsyncWorld::from_world(app.world_mut());
		let builder_sender = async_world.sender();
		let (id_tx, id_rx) = async_channel::bounded(1);
		let command = BoxedCommand::new(move |world: &mut World| {
			let spawned = world.spawn(Marker).id();
			id_tx.send_blocking(spawned).unwrap();
		});
		let command_debug = format!("{:?}", command);

		let task = async move { async_world.apply(command).await };
		AsyncComputeTaskPool::get().spawn(task).detach();

		// Keep ticking the app until the command has reported the entity it spawned.
		let id = loop {
			if let Ok(spawned) = id_rx.try_recv() {
				break spawned;
			}
			app.update();
		};
		app.update();

		assert!(app.world().entity(id).get::<Marker>().is_some());
		assert_eq!("BoxedCommand", command_debug);
		let builder_debug = format!("{:?}", CommandQueueBuilder::new(builder_sender));
		assert_eq!(
			"CommandQueueBuilder { inner: \"[..]\", sender: CommandQueueSender(Sender { .. }) }",
			builder_debug
		);
	}

	/// Builds a two-command queue (insert a `Counter`, then wait for it) and checks that
	/// the awaited value is the one that was inserted.
	#[test]
	fn queue() {
		let mut app = App::new();
		app.add_plugins((MinimalPlugins, AsyncEcsPlugin));

		let async_world = AsyncWorld::from_world(app.world_mut());
		let id = app.world_mut().spawn_empty().id();
		let (start_waiting_for, value_rx) = StartWaitingFor::<Counter>::component(id);

		AsyncComputeTaskPool::get()
			.spawn(async move {
				async_world
					.start_queue()
					.push(insert(id, Counter(3)))
					.push(start_waiting_for)
					.apply()
					.await;
			})
			.detach();

		// Keep ticking the app until the awaited component value arrives.
		let counter = loop {
			if let Ok(value) = value_rx.try_recv() {
				break value;
			}
			app.update();
		};

		assert_eq!(3, counter.0);
	}

	/// Checks the number of live senders on the shared channel as seen from a sender
	/// obtained from the `AsyncWorld` and from one obtained through an `AsyncEntity`.
	#[test]
	fn sender() {
		let mut app = App::new();
		app.add_plugins((MinimalPlugins, AsyncEcsPlugin));

		let async_world = AsyncWorld::from_world(app.world_mut());
		let sender = async_world.sender();
		let entity = AsyncEntity::new(Entity::PLACEHOLDER, async_world.clone());
		let other_sender = entity.sender();

		// NOTE(review): the expected count of 4 depends on how many sender clones the
		// crate holds internally, which is not visible from this file.
		for observed in [&sender, &other_sender] {
			assert_eq!(4, observed.0.sender_count());
		}
	}
}