bevy_async_ecs/
command.rs

1use crate::die;
2use async_channel::Receiver;
3use async_channel::Sender;
4use async_channel::TryRecvError;
5use bevy_ecs::prelude::*;
6use bevy_ecs::world::CommandQueue;
7use std::fmt;
8
9/// The object-safe equivalent of a `Box<dyn Command>`.
10pub struct BoxedCommand(CommandQueue);
11
12impl BoxedCommand {
13	/// Constructs a new `BoxedCommand` from the given Bevy command.
14	pub fn new<C: Command>(inner: C) -> Self {
15		Self({
16			let mut queue = CommandQueue::default();
17			queue.push(inner);
18			queue
19		})
20	}
21}
22
23impl fmt::Debug for BoxedCommand {
24	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
25		f.debug_struct("BoxedCommand").finish()
26	}
27}
28
29impl From<BoxedCommand> for CommandQueue {
30	fn from(boxed: BoxedCommand) -> Self {
31		boxed.0
32	}
33}
34
35impl Command for BoxedCommand {
36	fn apply(mut self, world: &mut World) {
37		self.0.apply(world);
38	}
39}
40
41/// Builds a `CommandQueue` that can be applied to the world that the builder was
42/// constructed from.
43///
44/// The easiest way to get a `CommandQueueBuilder` is with `AsyncWorld::start_queue()`
45pub struct CommandQueueBuilder {
46	inner: CommandQueue,
47	sender: CommandQueueSender,
48}
49
50impl CommandQueueBuilder {
51	pub(crate) fn new(sender: CommandQueueSender) -> Self {
52		let inner = CommandQueue::default();
53		Self { inner, sender }
54	}
55
56	/// Push a command into the `CommandQueue`.
57	///
58	/// This function is meant to be chained.
59	pub fn push<C: Command>(mut self, command: C) -> Self {
60		self.inner.push(command);
61		self
62	}
63
64	/// Apply the `CommandQueue` to the world it was constructed from.
65	///
66	/// This function is meant to be the end of the chain.
67	pub async fn apply(self) {
68		self.sender.send_queue(self.inner).await;
69	}
70
71	/// Return the built `CommandQueue` _without_ applying it to the world it was
72	/// constructed from.
73	///
74	/// This function is meant to be the end of the chain.
75	pub fn build(self) -> CommandQueue {
76		self.inner
77	}
78}
79
80impl fmt::Debug for CommandQueueBuilder {
81	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82		f.debug_struct("CommandQueueBuilder")
83			.field("inner", &"[..]")
84			.field("sender", &self.sender)
85			.finish()
86	}
87}
88
89/// Use this to send commands (stored in `CommandQueue`s) directly to the Bevy World, where they will
90/// be applied during the Last schedule.
91///
92/// This sender internally operates on `CommandQueue`s rather than individual commands.
93/// Single commands can still be sent with `CommandQueueSender::send_single()`.
94///
95/// The easiest way to get a `CommandQueueSender` is with `AsyncWorld::sender()`.
96#[derive(Clone, Debug)]
97pub struct CommandQueueSender(Sender<CommandQueue>);
98
99impl CommandQueueSender {
100	pub(crate) fn new(inner: Sender<CommandQueue>) -> Self {
101		Self(inner)
102	}
103
104	/// Sends an `CommandQueue` directly to the Bevy `World`, where they will be applied during
105	/// the `Last` schedule.
106	pub async fn send_queue(&self, inner_queue: CommandQueue) {
107		self.0.send(inner_queue).await.unwrap_or_else(die)
108	}
109
110	/// Sends a (boxed) `Command` directly to the Bevy `World`, where they it be applied during
111	/// the `Last` schedule.
112	pub async fn send_single(&self, single: BoxedCommand) {
113		self.send_queue(single.into()).await;
114	}
115}
116
117#[derive(Component)]
118pub(crate) struct CommandQueueReceiver(Receiver<CommandQueue>);
119
120impl CommandQueueReceiver {
121	pub(crate) fn new(receiver: Receiver<CommandQueue>) -> Self {
122		Self(receiver)
123	}
124}
125
126pub(crate) fn receive_and_apply_commands(
127	mut commands: Commands,
128	receivers: Query<(Entity, &CommandQueueReceiver)>,
129) {
130	for (id, receiver) in receivers.iter() {
131		loop {
132			match receiver.0.try_recv() {
133				Ok(mut command_queue) => commands.append(&mut command_queue),
134				Err(TryRecvError::Empty) => break,
135				Err(TryRecvError::Closed) => {
136					commands.entity(id).despawn();
137					break;
138				}
139			}
140		}
141	}
142}
143
144#[cfg(test)]
145mod tests {
146	use crate::AsyncEcsPlugin;
147	use crate::AsyncEntity;
148	use crate::AsyncWorld;
149	use crate::util::insert;
150	use crate::wait_for::StartWaitingFor;
151	use bevy::prelude::*;
152	use bevy::tasks::AsyncComputeTaskPool;
153
154	use super::*;
155
156	#[derive(Component)]
157	struct Marker;
158
159	#[derive(Default, Clone, Component)]
160	struct Counter(u8);
161
162	#[test]
163	fn smoke() {
164		let mut app = App::new();
165		app.add_plugins((MinimalPlugins, AsyncEcsPlugin));
166
167		let async_world = AsyncWorld::from_world(app.world_mut());
168		let operation_sender = async_world.sender();
169		let (sender, receiver) = async_channel::bounded(1);
170		let command = BoxedCommand::new(move |world: &mut World| {
171			let id = world.spawn(Marker).id();
172			sender.send_blocking(id).unwrap();
173		});
174		let debugged = format!("{:?}", command);
175
176		AsyncComputeTaskPool::get()
177			.spawn(async move { async_world.apply(command).await })
178			.detach();
179
180		let id = loop {
181			match receiver.try_recv() {
182				Ok(id) => break id,
183				Err(_) => app.update(),
184			}
185		};
186		app.update();
187
188		assert!(app.world().entity(id).get::<Marker>().is_some());
189		assert_eq!("BoxedCommand", debugged);
190		let debugged = format!("{:?}", CommandQueueBuilder::new(operation_sender));
191		assert_eq!(
192			"CommandQueueBuilder { inner: \"[..]\", sender: CommandQueueSender(Sender { .. }) }",
193			debugged
194		);
195	}
196
197	#[test]
198	fn queue() {
199		let mut app = App::new();
200		app.add_plugins((MinimalPlugins, AsyncEcsPlugin));
201
202		let async_world = AsyncWorld::from_world(app.world_mut());
203		let id = app.world_mut().spawn_empty().id();
204		let (start_waiting_for, value_rx) = StartWaitingFor::<Counter>::component(id);
205
206		let fut = async move {
207			async_world
208				.start_queue()
209				.push(insert(id, Counter(3)))
210				.push(start_waiting_for)
211				.apply()
212				.await;
213		};
214		AsyncComputeTaskPool::get().spawn(fut).detach();
215
216		let counter = loop {
217			match value_rx.try_recv() {
218				Ok(value) => break value,
219				Err(_) => app.update(),
220			}
221		};
222
223		assert_eq!(3, counter.0);
224	}
225
226	#[test]
227	fn sender() {
228		let mut app = App::new();
229		app.add_plugins((MinimalPlugins, AsyncEcsPlugin));
230
231		let async_world = AsyncWorld::from_world(app.world_mut());
232		let sender = async_world.sender();
233		let entity = AsyncEntity::new(Entity::PLACEHOLDER, async_world.clone());
234		let other_sender = entity.sender();
235		assert_eq!(4, sender.0.sender_count());
236		assert_eq!(4, other_sender.0.sender_count());
237	}
238}