bevy_async_ecs/
command.rs1use crate::die;
2use async_channel::{Receiver, Sender, TryRecvError};
3use bevy_ecs::prelude::*;
4use bevy_ecs::world::CommandQueue;
5use bevy_log::tracing::debug;
6use std::fmt;
7
8pub struct BoxedCommand(CommandQueue);
10
11impl BoxedCommand {
12 pub fn new<C: Command>(inner: C) -> Self {
14 Self({
15 let mut queue = CommandQueue::default();
16 queue.push(inner);
17 queue
18 })
19 }
20}
21
22impl fmt::Debug for BoxedCommand {
23 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24 f.debug_struct("BoxedCommand").finish()
25 }
26}
27
28impl From<BoxedCommand> for CommandQueue {
29 fn from(boxed: BoxedCommand) -> Self {
30 boxed.0
31 }
32}
33
34impl Command for BoxedCommand {
35 fn apply(mut self, world: &mut World) {
36 self.0.apply(world);
37 }
38}
39
40pub struct CommandQueueBuilder {
45 inner: CommandQueue,
46 sender: CommandQueueSender,
47}
48
49impl CommandQueueBuilder {
50 pub(crate) fn new(sender: CommandQueueSender) -> Self {
51 let inner = CommandQueue::default();
52 Self { inner, sender }
53 }
54
55 pub fn push<C: Command>(mut self, command: C) -> Self {
59 self.inner.push(command);
60 self
61 }
62
63 pub async fn apply(self) {
67 self.sender.send_queue(self.inner).await;
68 }
69
70 pub fn build(self) -> CommandQueue {
75 self.inner
76 }
77}
78
79impl fmt::Debug for CommandQueueBuilder {
80 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
81 f.debug_struct("CommandQueueBuilder")
82 .field("inner", &"[..]")
83 .field("sender", &self.sender)
84 .finish()
85 }
86}
87
88#[derive(Clone, Debug)]
96pub struct CommandQueueSender(Sender<CommandQueue>);
97
98impl CommandQueueSender {
99 pub(crate) fn new(inner: Sender<CommandQueue>) -> Self {
100 Self(inner)
101 }
102
103 pub async fn send_queue(&self, inner_queue: CommandQueue) {
106 self.0.send(inner_queue).await.unwrap_or_else(die)
107 }
108
109 pub async fn send_single(&self, single: BoxedCommand) {
112 self.send_queue(single.into()).await;
113 }
114}
115
116#[derive(Component)]
117pub(crate) struct CommandQueueReceiver(Receiver<CommandQueue>);
118
119impl CommandQueueReceiver {
120 pub(crate) fn new(receiver: Receiver<CommandQueue>) -> Self {
121 Self(receiver)
122 }
123
124 fn enqueue_into(&self, world_queue: &mut WorldCommandQueue) -> Result<(), ()> {
125 loop {
126 match self.0.try_recv() {
127 Ok(command_queue) => world_queue.0.push(command_queue),
128 Err(TryRecvError::Closed) => break Err(()),
129 Err(TryRecvError::Empty) => break Ok(()),
130 }
131 }
132 }
133}
134
135#[derive(Resource)]
136pub(crate) struct WorldCommandQueue(Vec<CommandQueue>);
137
138impl WorldCommandQueue {
139 pub(crate) fn drain(&mut self) -> Vec<CommandQueue> {
140 self.0.drain(..).collect()
141 }
142}
143
144impl Default for WorldCommandQueue {
145 fn default() -> Self {
146 Self(Vec::with_capacity(16))
147 }
148}
149
150pub(crate) fn initialize_command_queue(mut commands: Commands) {
151 commands.init_resource::<WorldCommandQueue>();
152}
153
154pub(crate) fn receive_commands(
155 mut commands: Commands,
156 receivers: Query<(Entity, &CommandQueueReceiver)>,
157 mut queue: ResMut<WorldCommandQueue>,
158) {
159 for (id, receiver) in receivers.iter() {
160 if receiver.enqueue_into(&mut queue).is_err() {
161 commands.entity(id).despawn()
162 }
163 }
164}
165
166pub(crate) fn apply_commands(world: &mut World) {
167 let commands = world.resource_mut::<WorldCommandQueue>().drain();
168 if !commands.is_empty() {
169 debug!("applying {} CommandQueues", commands.len());
170 }
171
172 for mut command in commands {
173 command.apply(world);
174 }
175}
176
177#[cfg(test)]
178mod tests {
179 use crate::util::insert;
180 use crate::wait_for::StartWaitingFor;
181 use crate::{AsyncEcsPlugin, AsyncEntity, AsyncWorld};
182 use bevy::prelude::*;
183 use bevy::tasks::AsyncComputeTaskPool;
184
185 use super::*;
186
187 #[derive(Component)]
188 struct Marker;
189
190 #[derive(Default, Clone, Component)]
191 struct Counter(u8);
192
193 #[test]
194 fn smoke() {
195 let mut app = App::new();
196 app.add_plugins((MinimalPlugins, AsyncEcsPlugin));
197
198 let async_world = AsyncWorld::from_world(app.world_mut());
199 let operation_sender = async_world.sender();
200 let (sender, receiver) = async_channel::bounded(1);
201 let command = BoxedCommand::new(move |world: &mut World| {
202 let id = world.spawn(Marker).id();
203 sender.send_blocking(id).unwrap();
204 });
205 let debugged = format!("{:?}", command);
206
207 AsyncComputeTaskPool::get()
208 .spawn(async move { async_world.apply(command).await })
209 .detach();
210
211 let id = loop {
212 match receiver.try_recv() {
213 Ok(id) => break id,
214 Err(_) => app.update(),
215 }
216 };
217 app.update();
218
219 assert!(app.world().entity(id).get::<Marker>().is_some());
220 assert_eq!("BoxedCommand", debugged);
221 let debugged = format!("{:?}", CommandQueueBuilder::new(operation_sender));
222 assert_eq!(
223 "CommandQueueBuilder { inner: \"[..]\", sender: CommandQueueSender(Sender { .. }) }",
224 debugged
225 );
226 }
227
228 #[test]
229 fn queue() {
230 let mut app = App::new();
231 app.add_plugins((MinimalPlugins, AsyncEcsPlugin));
232
233 let async_world = AsyncWorld::from_world(app.world_mut());
234 let id = app.world_mut().spawn_empty().id();
235 let (start_waiting_for, value_rx) = StartWaitingFor::<Counter>::component(id);
236
237 let fut = async move {
238 async_world
239 .start_queue()
240 .push(insert(id, Counter(3)))
241 .push(start_waiting_for)
242 .apply()
243 .await;
244 };
245 AsyncComputeTaskPool::get().spawn(fut).detach();
246
247 let counter = loop {
248 match value_rx.try_recv() {
249 Ok(value) => break value,
250 Err(_) => app.update(),
251 }
252 };
253
254 assert_eq!(3, counter.0);
255 }
256
257 #[test]
258 fn sender() {
259 let mut app = App::new();
260 app.add_plugins((MinimalPlugins, AsyncEcsPlugin));
261
262 let async_world = AsyncWorld::from_world(app.world_mut());
263 let sender = async_world.sender();
264 let entity = AsyncEntity::new(Entity::PLACEHOLDER, async_world.clone());
265 let other_sender = entity.sender();
266 assert_eq!(4, sender.0.sender_count());
267 assert_eq!(4, other_sender.0.sender_count());
268 }
269}