1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
use std::cell::Cell;

use thread_local::ThreadLocal;

use crate::{
    entity::Entities,
    prelude::World,
    system::{SystemParam, SystemParamFetch, SystemParamState},
};

use super::{CommandQueue, Commands};

/// The internal [`SystemParamState`] of the [`ParallelCommands`] type.
///
/// Stores the [`CommandQueue`]s that [`ParallelCommands::command_scope`] records into,
/// one per thread, until they are applied to the world by [`SystemParamState::apply`].
#[doc(hidden)]
#[derive(Default)]
pub struct ParallelCommandsState {
    // One queue per thread, created on first use. The `Cell` allows `command_scope` to
    // move the queue out and put it back while only holding a shared reference.
    thread_local_storage: ThreadLocal<Cell<CommandQueue>>,
}

/// An alternative to [`Commands`] that can be used in parallel contexts, such as those in [`Query::par_for_each`](crate::system::Query::par_for_each).
///
/// Commands are recorded through [`command_scope`](ParallelCommands::command_scope), which
/// hands the closure a [`Commands`] backed by a queue belonging to the calling thread.
///
/// Note: Because the order in which commands are applied depends on how many threads are run,
/// non-commutative commands may produce non-deterministic results.
///
/// # Example
///
/// ```
/// # use bevy_ecs::prelude::*;
/// # use bevy_tasks::ComputeTaskPool;
/// #
/// # #[derive(Component)]
/// # struct Velocity;
/// # impl Velocity { fn magnitude(&self) -> f32 { 42.0 } }
/// fn parallel_command_system(
///     mut query: Query<(Entity, &Velocity)>,
///     par_commands: ParallelCommands
/// ) {
///     query.par_for_each(32, |(entity, velocity)| {
///         if velocity.magnitude() > 10.0 {
///             par_commands.command_scope(|mut commands| {
///                 commands.entity(entity).despawn();
///             });
///         }
///     });
/// }
/// # bevy_ecs::system::assert_is_system(parallel_command_system);
/// ```
pub struct ParallelCommands<'w, 's> {
    // Per-system state that owns the thread-local command queues.
    state: &'s mut ParallelCommandsState,
    // The world's entity metadata, passed to every `Commands` created by `command_scope`.
    entities: &'w Entities,
}

// `ParallelCommandsState` serves as both the persistent state and the fetcher for
// `ParallelCommands`.
impl<'w, 's> SystemParam for ParallelCommands<'w, 's> {
    type Fetch = ParallelCommandsState;
}

impl<'w, 's> SystemParamFetch<'w, 's> for ParallelCommandsState {
    type Item = ParallelCommands<'w, 's>;

    /// Builds the [`ParallelCommands`] value handed to a system.
    ///
    /// The system metadata and the trailing `u32` argument are ignored. The only thing
    /// read from `world` is its [`Entities`], which is borrowed for `'w`; `state` is
    /// borrowed for `'s`.
    unsafe fn get_param(
        state: &'s mut Self,
        _system_meta: &crate::system::SystemMeta,
        world: &'w World,
        _: u32,
    ) -> Self::Item {
        let entities = world.entities();
        ParallelCommands { state, entities }
    }
}

// SAFETY: no component or resource access to report
unsafe impl SystemParamState for ParallelCommandsState {
    fn init(_: &mut World, _: &mut crate::system::SystemMeta) -> Self {
        Self::default()
    }

    fn apply(&mut self, world: &mut World) {
        for cq in &mut self.thread_local_storage {
            cq.get_mut().apply(world);
        }
    }
}

impl<'w, 's> ParallelCommands<'w, 's> {
    /// Runs `f` with a [`Commands`] that records into the calling thread's queue.
    ///
    /// The thread's [`CommandQueue`] is created on first use, moved out of its cell for
    /// the duration of `f`, and stored back afterwards. Whatever `f` returns is returned
    /// to the caller.
    ///
    /// NOTE(review): while `f` runs, the cell holds a default (empty) queue. A nested
    /// `command_scope` call on the same thread therefore records into that temporary
    /// queue, which is replaced when the outer call stores its own queue back. Avoid
    /// nesting calls until this is confirmed or addressed.
    pub fn command_scope<R>(&self, f: impl FnOnce(Commands) -> R) -> R {
        let cell = self.state.thread_local_storage.get_or_default();

        // Take ownership so `Commands` can borrow the queue mutably without `unsafe`.
        let mut queue = cell.take();
        let result = f(Commands::new_from_entities(&mut queue, self.entities));

        // Hand the (possibly grown) queue back for a later `apply`.
        cell.set(queue);
        result
    }
}