bevy_async_ecs/
command.rs1use crate::die;
2use async_channel::Receiver;
3use async_channel::Sender;
4use async_channel::TryRecvError;
5use bevy_ecs::prelude::*;
6use bevy_ecs::world::CommandQueue;
7use bevy_log::tracing::debug;
8use std::fmt;
9
10pub struct BoxedCommand(CommandQueue);
12
13impl BoxedCommand {
14 pub fn new<C: Command>(inner: C) -> Self {
16 Self({
17 let mut queue = CommandQueue::default();
18 queue.push(inner);
19 queue
20 })
21 }
22}
23
24impl fmt::Debug for BoxedCommand {
25 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
26 f.debug_struct("BoxedCommand").finish()
27 }
28}
29
30impl From<BoxedCommand> for CommandQueue {
31 fn from(boxed: BoxedCommand) -> Self {
32 boxed.0
33 }
34}
35
36impl Command for BoxedCommand {
37 fn apply(mut self, world: &mut World) {
38 self.0.apply(world);
39 }
40}
41
42pub struct CommandQueueBuilder {
47 inner: CommandQueue,
48 sender: CommandQueueSender,
49}
50
51impl CommandQueueBuilder {
52 pub(crate) fn new(sender: CommandQueueSender) -> Self {
53 let inner = CommandQueue::default();
54 Self { inner, sender }
55 }
56
57 pub fn push<C: Command>(mut self, command: C) -> Self {
61 self.inner.push(command);
62 self
63 }
64
65 pub async fn apply(self) {
69 self.sender.send_queue(self.inner).await;
70 }
71
72 pub fn build(self) -> CommandQueue {
77 self.inner
78 }
79}
80
81impl fmt::Debug for CommandQueueBuilder {
82 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
83 f.debug_struct("CommandQueueBuilder")
84 .field("inner", &"[..]")
85 .field("sender", &self.sender)
86 .finish()
87 }
88}
89
90#[derive(Clone, Debug)]
98pub struct CommandQueueSender(Sender<CommandQueue>);
99
100impl CommandQueueSender {
101 pub(crate) fn new(inner: Sender<CommandQueue>) -> Self {
102 Self(inner)
103 }
104
105 pub async fn send_queue(&self, inner_queue: CommandQueue) {
108 self.0.send(inner_queue).await.unwrap_or_else(die)
109 }
110
111 pub async fn send_single(&self, single: BoxedCommand) {
114 self.send_queue(single.into()).await;
115 }
116}
117
118#[derive(Component)]
119pub(crate) struct CommandQueueReceiver(Receiver<CommandQueue>);
120
121impl CommandQueueReceiver {
122 pub(crate) fn new(receiver: Receiver<CommandQueue>) -> Self {
123 Self(receiver)
124 }
125
126 fn enqueue_into(&self, world_queue: &mut Vec<CommandQueue>) -> Result<(), ()> {
127 loop {
128 match self.0.try_recv() {
129 Ok(command_queue) => world_queue.push(command_queue),
130 Err(TryRecvError::Closed) => break Err(()),
131 Err(TryRecvError::Empty) => break Ok(()),
132 }
133 }
134 }
135}
136
137pub(crate) fn receive_and_apply_commands(world: &mut World) {
138 let mut buffer = Vec::new();
139 let mut closed_receivers = Vec::new();
140
141 let mut query = world.query::<(Entity, &CommandQueueReceiver)>();
142 for (id, receiver) in query.iter(world) {
143 if receiver.enqueue_into(&mut buffer).is_err() {
144 closed_receivers.push(id);
145 }
146 }
147
148 if !buffer.is_empty() {
149 debug!("applying {} CommandQueues", buffer.len());
150 }
151
152 for mut queue in buffer {
153 queue.apply(world);
154 }
155
156 for closed_receiver in closed_receivers {
157 world.despawn(closed_receiver);
158 }
159}
160
161#[cfg(test)]
162mod tests {
163 use crate::AsyncEcsPlugin;
164 use crate::AsyncEntity;
165 use crate::AsyncWorld;
166 use crate::util::insert;
167 use crate::wait_for::StartWaitingFor;
168 use bevy::prelude::*;
169 use bevy::tasks::AsyncComputeTaskPool;
170
171 use super::*;
172
173 #[derive(Component)]
174 struct Marker;
175
176 #[derive(Default, Clone, Component)]
177 struct Counter(u8);
178
179 #[test]
180 fn smoke() {
181 let mut app = App::new();
182 app.add_plugins((MinimalPlugins, AsyncEcsPlugin));
183
184 let async_world = AsyncWorld::from_world(app.world_mut());
185 let operation_sender = async_world.sender();
186 let (sender, receiver) = async_channel::bounded(1);
187 let command = BoxedCommand::new(move |world: &mut World| {
188 let id = world.spawn(Marker).id();
189 sender.send_blocking(id).unwrap();
190 });
191 let debugged = format!("{:?}", command);
192
193 AsyncComputeTaskPool::get()
194 .spawn(async move { async_world.apply(command).await })
195 .detach();
196
197 let id = loop {
198 match receiver.try_recv() {
199 Ok(id) => break id,
200 Err(_) => app.update(),
201 }
202 };
203 app.update();
204
205 assert!(app.world().entity(id).get::<Marker>().is_some());
206 assert_eq!("BoxedCommand", debugged);
207 let debugged = format!("{:?}", CommandQueueBuilder::new(operation_sender));
208 assert_eq!(
209 "CommandQueueBuilder { inner: \"[..]\", sender: CommandQueueSender(Sender { .. }) }",
210 debugged
211 );
212 }
213
214 #[test]
215 fn queue() {
216 let mut app = App::new();
217 app.add_plugins((MinimalPlugins, AsyncEcsPlugin));
218
219 let async_world = AsyncWorld::from_world(app.world_mut());
220 let id = app.world_mut().spawn_empty().id();
221 let (start_waiting_for, value_rx) = StartWaitingFor::<Counter>::component(id);
222
223 let fut = async move {
224 async_world
225 .start_queue()
226 .push(insert(id, Counter(3)))
227 .push(start_waiting_for)
228 .apply()
229 .await;
230 };
231 AsyncComputeTaskPool::get().spawn(fut).detach();
232
233 let counter = loop {
234 match value_rx.try_recv() {
235 Ok(value) => break value,
236 Err(_) => app.update(),
237 }
238 };
239
240 assert_eq!(3, counter.0);
241 }
242
243 #[test]
244 fn sender() {
245 let mut app = App::new();
246 app.add_plugins((MinimalPlugins, AsyncEcsPlugin));
247
248 let async_world = AsyncWorld::from_world(app.world_mut());
249 let sender = async_world.sender();
250 let entity = AsyncEntity::new(Entity::PLACEHOLDER, async_world.clone());
251 let other_sender = entity.sender();
252 assert_eq!(4, sender.0.sender_count());
253 assert_eq!(4, other_sender.0.sender_count());
254 }
255}