bevy_async_ecs/
command.rs1use crate::die;
2use async_channel::Receiver;
3use async_channel::Sender;
4use async_channel::TryRecvError;
5use bevy_ecs::prelude::*;
6use bevy_ecs::world::CommandQueue;
7use std::fmt;
8
9pub struct BoxedCommand(CommandQueue);
11
12impl BoxedCommand {
13 pub fn new<C: Command>(inner: C) -> Self {
15 Self({
16 let mut queue = CommandQueue::default();
17 queue.push(inner);
18 queue
19 })
20 }
21}
22
23impl fmt::Debug for BoxedCommand {
24 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
25 f.debug_struct("BoxedCommand").finish()
26 }
27}
28
29impl From<BoxedCommand> for CommandQueue {
30 fn from(boxed: BoxedCommand) -> Self {
31 boxed.0
32 }
33}
34
35impl Command for BoxedCommand {
36 fn apply(mut self, world: &mut World) {
37 self.0.apply(world);
38 }
39}
40
41pub struct CommandQueueBuilder {
46 inner: CommandQueue,
47 sender: CommandQueueSender,
48}
49
50impl CommandQueueBuilder {
51 pub(crate) fn new(sender: CommandQueueSender) -> Self {
52 let inner = CommandQueue::default();
53 Self { inner, sender }
54 }
55
56 pub fn push<C: Command>(mut self, command: C) -> Self {
60 self.inner.push(command);
61 self
62 }
63
64 pub async fn apply(self) {
68 self.sender.send_queue(self.inner).await;
69 }
70
71 pub fn build(self) -> CommandQueue {
76 self.inner
77 }
78}
79
80impl fmt::Debug for CommandQueueBuilder {
81 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82 f.debug_struct("CommandQueueBuilder")
83 .field("inner", &"[..]")
84 .field("sender", &self.sender)
85 .finish()
86 }
87}
88
89#[derive(Clone, Debug)]
97pub struct CommandQueueSender(Sender<CommandQueue>);
98
99impl CommandQueueSender {
100 pub(crate) fn new(inner: Sender<CommandQueue>) -> Self {
101 Self(inner)
102 }
103
104 pub async fn send_queue(&self, inner_queue: CommandQueue) {
107 self.0.send(inner_queue).await.unwrap_or_else(die)
108 }
109
110 pub async fn send_single(&self, single: BoxedCommand) {
113 self.send_queue(single.into()).await;
114 }
115}
116
117#[derive(Component)]
118pub(crate) struct CommandQueueReceiver(Receiver<CommandQueue>);
119
120impl CommandQueueReceiver {
121 pub(crate) fn new(receiver: Receiver<CommandQueue>) -> Self {
122 Self(receiver)
123 }
124}
125
126pub(crate) fn receive_and_apply_commands(
127 mut commands: Commands,
128 receivers: Query<(Entity, &CommandQueueReceiver)>,
129) {
130 for (id, receiver) in receivers.iter() {
131 loop {
132 match receiver.0.try_recv() {
133 Ok(mut command_queue) => commands.append(&mut command_queue),
134 Err(TryRecvError::Empty) => break,
135 Err(TryRecvError::Closed) => {
136 commands.entity(id).despawn();
137 break;
138 }
139 }
140 }
141 }
142}
143
#[cfg(test)]
mod tests {
    use crate::AsyncEcsPlugin;
    use crate::AsyncEntity;
    use crate::AsyncWorld;
    use crate::util::insert;
    use crate::wait_for::StartWaitingFor;
    use bevy::prelude::*;
    use bevy::tasks::AsyncComputeTaskPool;

    use super::*;

    #[derive(Component)]
    struct Marker;

    #[derive(Default, Clone, Component)]
    struct Counter(u8);

    /// Builds an app with the minimal plugin set plus the plugin under test.
    fn test_app() -> App {
        let mut app = App::new();
        app.add_plugins((MinimalPlugins, AsyncEcsPlugin));
        app
    }

    /// Polls first, and updates `app` after every unsuccessful poll, until
    /// `poll` yields a value.
    fn update_until<T>(app: &mut App, mut poll: impl FnMut() -> Option<T>) -> T {
        loop {
            if let Some(value) = poll() {
                return value;
            }
            app.update();
        }
    }

    /// A boxed command sent through the async world runs against the real
    /// world, and the `Debug` output of the command types is stable.
    #[test]
    fn smoke() {
        let mut app = test_app();

        let async_world = AsyncWorld::from_world(app.world_mut());
        let operation_sender = async_world.sender();
        let (id_tx, id_rx) = async_channel::bounded(1);
        let command = BoxedCommand::new(move |world: &mut World| {
            let spawned = world.spawn(Marker).id();
            id_tx.send_blocking(spawned).unwrap();
        });
        let command_debug = format!("{command:?}");

        AsyncComputeTaskPool::get()
            .spawn(async move { async_world.apply(command).await })
            .detach();

        let id = update_until(&mut app, || id_rx.try_recv().ok());
        app.update();

        assert!(app.world().entity(id).get::<Marker>().is_some());
        assert_eq!("BoxedCommand", command_debug);
        let builder_debug = format!("{:?}", CommandQueueBuilder::new(operation_sender));
        assert_eq!(
            "CommandQueueBuilder { inner: \"[..]\", sender: CommandQueueSender(Sender { .. }) }",
            builder_debug
        );
    }

    /// Commands pushed through a queue builder are all applied.
    #[test]
    fn queue() {
        let mut app = test_app();

        let async_world = AsyncWorld::from_world(app.world_mut());
        let target = app.world_mut().spawn_empty().id();
        let (start_waiting_for, value_rx) = StartWaitingFor::<Counter>::component(target);

        let task = async move {
            async_world
                .start_queue()
                .push(insert(target, Counter(3)))
                .push(start_waiting_for)
                .apply()
                .await;
        };
        AsyncComputeTaskPool::get().spawn(task).detach();

        let counter = update_until(&mut app, || value_rx.try_recv().ok());

        assert_eq!(3, counter.0);
    }

    /// Senders obtained from the world and from an entity report the same
    /// sender count.
    #[test]
    fn sender() {
        let mut app = test_app();

        let async_world = AsyncWorld::from_world(app.world_mut());
        let world_sender = async_world.sender();
        let async_entity = AsyncEntity::new(Entity::PLACEHOLDER, async_world.clone());
        let entity_sender = async_entity.sender();
        assert_eq!(4, world_sender.0.sender_count());
        assert_eq!(4, entity_sender.0.sender_count());
    }
}