bevy_async_ecs/
command.rs

1use crate::die;
2use async_channel::{Receiver, Sender, TryRecvError};
3use bevy_ecs::prelude::*;
4use bevy_ecs::world::CommandQueue;
5use bevy_log::tracing::debug;
6use std::fmt;
7
8/// The object-safe equivalent of a `Box<dyn Command>`.
9pub struct BoxedCommand(CommandQueue);
10
11impl BoxedCommand {
12	/// Constructs a new `BoxedCommand` from the given Bevy command.
13	pub fn new<C: Command>(inner: C) -> Self {
14		Self({
15			let mut queue = CommandQueue::default();
16			queue.push(inner);
17			queue
18		})
19	}
20}
21
22impl fmt::Debug for BoxedCommand {
23	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24		f.debug_struct("BoxedCommand").finish()
25	}
26}
27
28impl From<BoxedCommand> for CommandQueue {
29	fn from(boxed: BoxedCommand) -> Self {
30		boxed.0
31	}
32}
33
34impl Command for BoxedCommand {
35	fn apply(mut self, world: &mut World) {
36		self.0.apply(world);
37	}
38}
39
40/// Builds a `CommandQueue` that can be applied to the world that the builder was
41/// constructed from.
42///
43/// The easiest way to get a `CommandQueueBuilder` is with `AsyncWorld::start_queue()`
44pub struct CommandQueueBuilder {
45	inner: CommandQueue,
46	sender: CommandQueueSender,
47}
48
49impl CommandQueueBuilder {
50	pub(crate) fn new(sender: CommandQueueSender) -> Self {
51		let inner = CommandQueue::default();
52		Self { inner, sender }
53	}
54
55	/// Push a command into the `CommandQueue`.
56	///
57	/// This function is meant to be chained.
58	pub fn push<C: Command>(mut self, command: C) -> Self {
59		self.inner.push(command);
60		self
61	}
62
63	/// Apply the `CommandQueue` to the world it was constructed from.
64	///
65	/// This function is meant to be the end of the chain.
66	pub async fn apply(self) {
67		self.sender.send_queue(self.inner).await;
68	}
69
70	/// Return the built `CommandQueue` _without_ applying it to the world it was
71	/// constructed from.
72	///
73	/// This function is meant to be the end of the chain.
74	pub fn build(self) -> CommandQueue {
75		self.inner
76	}
77}
78
79impl fmt::Debug for CommandQueueBuilder {
80	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
81		f.debug_struct("CommandQueueBuilder")
82			.field("inner", &"[..]")
83			.field("sender", &self.sender)
84			.finish()
85	}
86}
87
88/// Use this to send commands (stored in `CommandQueue`s) directly to the Bevy World, where they will
89/// be applied during the Last schedule.
90///
91/// This sender internally operates on `CommandQueue`s rather than individual commands.
92/// Single commands can still be sent with `CommandQueueSender::send_single()`.
93///
94/// The easiest way to get a `CommandQueueSender` is with `AsyncWorld::sender()`.
95#[derive(Clone, Debug)]
96pub struct CommandQueueSender(Sender<CommandQueue>);
97
98impl CommandQueueSender {
99	pub(crate) fn new(inner: Sender<CommandQueue>) -> Self {
100		Self(inner)
101	}
102
103	/// Sends an `CommandQueue` directly to the Bevy `World`, where they will be applied during
104	/// the `Last` schedule.
105	pub async fn send_queue(&self, inner_queue: CommandQueue) {
106		self.0.send(inner_queue).await.unwrap_or_else(die)
107	}
108
109	/// Sends a (boxed) `Command` directly to the Bevy `World`, where they it be applied during
110	/// the `Last` schedule.
111	pub async fn send_single(&self, single: BoxedCommand) {
112		self.send_queue(single.into()).await;
113	}
114}
115
116#[derive(Component)]
117pub(crate) struct CommandQueueReceiver(Receiver<CommandQueue>);
118
119impl CommandQueueReceiver {
120	pub(crate) fn new(receiver: Receiver<CommandQueue>) -> Self {
121		Self(receiver)
122	}
123
124	fn enqueue_into(&self, world_queue: &mut WorldCommandQueue) -> Result<(), ()> {
125		loop {
126			match self.0.try_recv() {
127				Ok(command_queue) => world_queue.0.push(command_queue),
128				Err(TryRecvError::Closed) => break Err(()),
129				Err(TryRecvError::Empty) => break Ok(()),
130			}
131		}
132	}
133}
134
135#[derive(Resource)]
136pub(crate) struct WorldCommandQueue(Vec<CommandQueue>);
137
138impl WorldCommandQueue {
139	pub(crate) fn drain(&mut self) -> Vec<CommandQueue> {
140		self.0.drain(..).collect()
141	}
142}
143
144impl Default for WorldCommandQueue {
145	fn default() -> Self {
146		Self(Vec::with_capacity(16))
147	}
148}
149
150pub(crate) fn initialize_command_queue(mut commands: Commands) {
151	commands.init_resource::<WorldCommandQueue>();
152}
153
154pub(crate) fn receive_commands(
155	mut commands: Commands,
156	receivers: Query<(Entity, &CommandQueueReceiver)>,
157	mut queue: ResMut<WorldCommandQueue>,
158) {
159	for (id, receiver) in receivers.iter() {
160		if receiver.enqueue_into(&mut queue).is_err() {
161			commands.entity(id).despawn()
162		}
163	}
164}
165
166pub(crate) fn apply_commands(world: &mut World) {
167	let commands = world.resource_mut::<WorldCommandQueue>().drain();
168	if !commands.is_empty() {
169		debug!("applying {} CommandQueues", commands.len());
170	}
171
172	for mut command in commands {
173		command.apply(world);
174	}
175}
176
#[cfg(test)]
mod tests {
	use crate::util::insert;
	use crate::wait_for::StartWaitingFor;
	use crate::{AsyncEcsPlugin, AsyncEntity, AsyncWorld};
	use bevy::prelude::*;
	use bevy::tasks::AsyncComputeTaskPool;

	use super::*;

	/// Tag component spawned by the command in the `smoke` test.
	#[derive(Component)]
	struct Marker;

	/// Small payload component inserted and read back in the `queue` test.
	#[derive(Default, Clone, Component)]
	struct Counter(u8);

	/// A single boxed command reaches the world, and the `Debug` impls print as expected.
	#[test]
	fn smoke() {
		let mut app = App::new();
		app.add_plugins((MinimalPlugins, AsyncEcsPlugin));

		let async_world = AsyncWorld::from_world(app.world_mut());
		let operation_sender = async_world.sender();
		let (id_tx, id_rx) = async_channel::bounded(1);
		let command = BoxedCommand::new(move |world: &mut World| {
			let spawned = world.spawn(Marker).id();
			id_tx.send_blocking(spawned).unwrap();
		});
		let command_debug = format!("{:?}", command);

		AsyncComputeTaskPool::get()
			.spawn(async move { async_world.apply(command).await })
			.detach();

		// Keep ticking the app until the command has run and reported the spawned entity.
		let id = loop {
			if let Ok(id) = id_rx.try_recv() {
				break id;
			}
			app.update();
		};
		app.update();

		assert!(app.world().entity(id).get::<Marker>().is_some());
		assert_eq!("BoxedCommand", command_debug);
		let builder_debug = format!("{:?}", CommandQueueBuilder::new(operation_sender));
		assert_eq!(
			"CommandQueueBuilder { inner: \"[..]\", sender: CommandQueueSender(Sender { .. }) }",
			builder_debug
		);
	}

	/// Commands chained through a `CommandQueueBuilder` are all applied.
	#[test]
	fn queue() {
		let mut app = App::new();
		app.add_plugins((MinimalPlugins, AsyncEcsPlugin));

		let async_world = AsyncWorld::from_world(app.world_mut());
		let id = app.world_mut().spawn_empty().id();
		let (start_waiting_for, value_rx) = StartWaitingFor::<Counter>::component(id);

		AsyncComputeTaskPool::get()
			.spawn(async move {
				async_world
					.start_queue()
					.push(insert(id, Counter(3)))
					.push(start_waiting_for)
					.apply()
					.await;
			})
			.detach();

		// Keep ticking the app until the waited-for component value arrives.
		let counter = loop {
			if let Ok(value) = value_rx.try_recv() {
				break value;
			}
			app.update();
		};

		assert_eq!(3, counter.0);
	}

	/// Senders obtained from the world and from an entity report the same sender count.
	#[test]
	fn sender() {
		let mut app = App::new();
		app.add_plugins((MinimalPlugins, AsyncEcsPlugin));

		let async_world = AsyncWorld::from_world(app.world_mut());
		let world_sender = async_world.sender();
		let entity = AsyncEntity::new(Entity::PLACEHOLDER, async_world.clone());
		let entity_sender = entity.sender();
		assert_eq!(4, world_sender.0.sender_count());
		assert_eq!(4, entity_sender.0.sender_count());
	}
}