use crate::{
change_detection::MaybeLocation,
system::{Command, SystemBuffer, SystemMeta},
world::{DeferredWorld, World},
};
use alloc::{boxed::Box, vec::Vec};
use bevy_ptr::{OwningPtr, Unaligned};
use core::{
fmt::Debug,
mem::{size_of, MaybeUninit},
panic::AssertUnwindSafe,
ptr::{addr_of_mut, NonNull},
};
use log::warn;
struct CommandMeta {
consume_command_and_get_size:
unsafe fn(value: OwningPtr<Unaligned>, world: Option<NonNull<World>>, cursor: &mut usize),
}
pub struct CommandQueue {
pub(crate) bytes: Vec<MaybeUninit<u8>>,
pub(crate) cursor: usize,
pub(crate) panic_recovery: Vec<MaybeUninit<u8>>,
pub(crate) caller: MaybeLocation,
}
impl Default for CommandQueue {
#[track_caller]
fn default() -> Self {
Self {
bytes: Default::default(),
cursor: Default::default(),
panic_recovery: Default::default(),
caller: MaybeLocation::caller(),
}
}
}
#[derive(Clone)]
pub(crate) struct RawCommandQueue {
pub(crate) bytes: NonNull<Vec<MaybeUninit<u8>>>,
pub(crate) cursor: NonNull<usize>,
pub(crate) panic_recovery: NonNull<Vec<MaybeUninit<u8>>>,
}
impl Debug for CommandQueue {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("CommandQueue")
.field("len_bytes", &self.bytes.len())
.field("caller", &self.caller)
.finish_non_exhaustive()
}
}
unsafe impl Send for CommandQueue {}
unsafe impl Sync for CommandQueue {}
impl CommandQueue {
#[inline]
pub fn push(&mut self, command: impl Command<Out = ()>) {
unsafe {
self.get_raw().push(command);
}
}
#[inline]
pub fn apply(&mut self, world: &mut World) {
world.flush_commands();
unsafe {
self.get_raw().apply_or_drop_queued(Some(world.into()));
}
}
pub fn append(&mut self, other: &mut CommandQueue) {
self.bytes.append(&mut other.bytes);
}
#[inline]
pub fn is_empty(&self) -> bool {
self.cursor >= self.bytes.len()
}
pub(crate) fn get_raw(&mut self) -> RawCommandQueue {
unsafe {
RawCommandQueue {
bytes: NonNull::new_unchecked(addr_of_mut!(self.bytes)),
cursor: NonNull::new_unchecked(addr_of_mut!(self.cursor)),
panic_recovery: NonNull::new_unchecked(addr_of_mut!(self.panic_recovery)),
}
}
}
}
impl RawCommandQueue {
pub(crate) fn new() -> Self {
unsafe {
Self {
bytes: NonNull::new_unchecked(Box::into_raw(Box::default())),
cursor: NonNull::new_unchecked(Box::into_raw(Box::new(0usize))),
panic_recovery: NonNull::new_unchecked(Box::into_raw(Box::default())),
}
}
}
pub unsafe fn is_empty(&self) -> bool {
(unsafe { *self.cursor.as_ref() }) >= (unsafe { self.bytes.as_ref() }).len()
}
#[inline]
pub unsafe fn push<C: Command<Out = ()>>(&mut self, command: C) {
#[repr(C, packed)]
struct Packed<C: Command<Out = ()>> {
meta: CommandMeta,
command: C,
}
let meta = CommandMeta {
consume_command_and_get_size: |command, world, cursor| {
*cursor += size_of::<C>();
let command: C = unsafe { command.read_unaligned() };
match world {
Some(mut world) => {
let world = unsafe { world.as_mut() };
command.apply(world);
world.flush();
}
None => drop(command),
}
},
};
let bytes = unsafe { self.bytes.as_mut() };
let old_len = bytes.len();
bytes.reserve(size_of::<Packed<C>>());
let ptr = unsafe { bytes.as_mut_ptr().add(old_len) };
unsafe {
ptr.cast::<Packed<C>>()
.write_unaligned(Packed { meta, command });
}
unsafe {
bytes.set_len(old_len + size_of::<Packed<C>>());
}
}
#[inline]
pub(crate) unsafe fn apply_or_drop_queued(&mut self, world: Option<NonNull<World>>) {
let start = *self.cursor.as_ref();
let stop = self.bytes.as_ref().len();
let mut local_cursor = start;
*self.cursor.as_mut() = stop;
while local_cursor < stop {
let meta = unsafe {
self.bytes
.as_mut()
.as_mut_ptr()
.add(local_cursor)
.cast::<CommandMeta>()
.read_unaligned()
};
local_cursor += size_of::<CommandMeta>();
let cmd = unsafe {
OwningPtr::<Unaligned>::new(NonNull::new_unchecked(
self.bytes.as_mut().as_mut_ptr().add(local_cursor).cast(),
))
};
let f = AssertUnwindSafe(|| {
unsafe { (meta.consume_command_and_get_size)(cmd, world, &mut local_cursor) };
});
#[cfg(feature = "std")]
{
let result = std::panic::catch_unwind(f);
if let Err(payload) = result {
let panic_recovery = self.panic_recovery.as_mut();
let bytes = self.bytes.as_mut();
let current_stop = bytes.len();
panic_recovery.extend_from_slice(&bytes[local_cursor..current_stop]);
bytes.set_len(start);
*self.cursor.as_mut() = start;
if start == 0 {
bytes.append(panic_recovery);
}
std::panic::resume_unwind(payload);
}
}
#[cfg(not(feature = "std"))]
(f)();
}
unsafe {
self.bytes.as_mut().set_len(start);
*self.cursor.as_mut() = start;
};
}
}
impl Drop for CommandQueue {
fn drop(&mut self) {
if !self.bytes.is_empty() {
if let Some(caller) = self.caller.into_option() {
warn!("CommandQueue has un-applied commands being dropped. Did you forget to call SystemState::apply? caller:{caller:?}");
} else {
warn!("CommandQueue has un-applied commands being dropped. Did you forget to call SystemState::apply?");
}
}
unsafe { self.get_raw().apply_or_drop_queued(None) };
}
}
impl SystemBuffer for CommandQueue {
#[inline]
fn apply(&mut self, _system_meta: &SystemMeta, world: &mut World) {
#[cfg(feature = "trace")]
let _span_guard = _system_meta.commands_span.enter();
self.apply(world);
}
#[inline]
fn queue(&mut self, _system_meta: &SystemMeta, mut world: DeferredWorld) {
world.commands().append(self);
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::{component::Component, resource::Resource};
use alloc::{borrow::ToOwned, string::String, sync::Arc};
use core::{
panic::AssertUnwindSafe,
sync::atomic::{AtomicU32, Ordering},
};
#[cfg(miri)]
use alloc::format;
struct DropCheck(Arc<AtomicU32>);
impl DropCheck {
fn new() -> (Self, Arc<AtomicU32>) {
let drops = Arc::new(AtomicU32::new(0));
(Self(drops.clone()), drops)
}
}
impl Drop for DropCheck {
fn drop(&mut self) {
self.0.fetch_add(1, Ordering::Relaxed);
}
}
impl Command for DropCheck {
type Out = ();
fn apply(self, _: &mut World) {}
}
#[test]
fn test_command_queue_inner_drop() {
let mut queue = CommandQueue::default();
let (dropcheck_a, drops_a) = DropCheck::new();
let (dropcheck_b, drops_b) = DropCheck::new();
queue.push(dropcheck_a);
queue.push(dropcheck_b);
assert_eq!(drops_a.load(Ordering::Relaxed), 0);
assert_eq!(drops_b.load(Ordering::Relaxed), 0);
let mut world = World::new();
queue.apply(&mut world);
assert_eq!(drops_a.load(Ordering::Relaxed), 1);
assert_eq!(drops_b.load(Ordering::Relaxed), 1);
}
#[test]
fn test_command_queue_inner_drop_early() {
let mut queue = CommandQueue::default();
let (dropcheck_a, drops_a) = DropCheck::new();
let (dropcheck_b, drops_b) = DropCheck::new();
queue.push(dropcheck_a);
queue.push(dropcheck_b);
assert_eq!(drops_a.load(Ordering::Relaxed), 0);
assert_eq!(drops_b.load(Ordering::Relaxed), 0);
drop(queue);
assert_eq!(drops_a.load(Ordering::Relaxed), 1);
assert_eq!(drops_b.load(Ordering::Relaxed), 1);
}
#[derive(Component)]
struct A;
struct SpawnCommand;
impl Command for SpawnCommand {
type Out = ();
fn apply(self, world: &mut World) {
world.spawn(A);
}
}
#[test]
fn test_command_queue_inner() {
let mut queue = CommandQueue::default();
queue.push(SpawnCommand);
queue.push(SpawnCommand);
let mut world = World::new();
queue.apply(&mut world);
assert_eq!(world.query::<&A>().query(&world).count(), 2);
queue.apply(&mut world);
assert_eq!(world.query::<&A>().query(&world).count(), 2);
}
#[expect(
dead_code,
reason = "The inner string is used to ensure that, when the PanicCommand gets pushed to the queue, some data is written to the `bytes` vector."
)]
struct PanicCommand(String);
impl Command for PanicCommand {
type Out = ();
fn apply(self, _: &mut World) {
panic!("command is panicking");
}
}
#[test]
fn test_command_queue_inner_panic_safe() {
std::panic::set_hook(Box::new(|_| {}));
let mut queue = CommandQueue::default();
queue.push(PanicCommand("I panic!".to_owned()));
queue.push(SpawnCommand);
let mut world = World::new();
let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
queue.apply(&mut world);
}));
queue.push(SpawnCommand);
queue.push(SpawnCommand);
queue.apply(&mut world);
assert_eq!(world.query::<&A>().query(&world).count(), 3);
}
#[test]
fn test_command_queue_inner_nested_panic_safe() {
std::panic::set_hook(Box::new(|_| {}));
#[derive(Resource, Default)]
struct Order(Vec<usize>);
let mut world = World::new();
world.init_resource::<Order>();
fn add_index(index: usize) -> impl Command {
move |world: &mut World| world.resource_mut::<Order>().0.push(index)
}
world.commands().queue(add_index(1));
world.commands().queue(|world: &mut World| {
world.commands().queue(add_index(2));
world.commands().queue(PanicCommand("I panic!".to_owned()));
world.commands().queue(add_index(3));
world.flush_commands();
});
world.commands().queue(add_index(4));
let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
world.flush_commands();
}));
world.commands().queue(add_index(5));
world.flush_commands();
assert_eq!(&world.resource::<Order>().0, &[1, 2, 3, 4, 5]);
}
fn assert_is_send_impl(_: impl Send) {}
fn assert_is_send(command: impl Command) {
assert_is_send_impl(command);
}
#[test]
fn test_command_is_send() {
assert_is_send(SpawnCommand);
}
#[expect(
dead_code,
reason = "This struct is used to test how the CommandQueue reacts to padding added by rust's compiler."
)]
struct CommandWithPadding(u8, u16);
impl Command for CommandWithPadding {
type Out = ();
fn apply(self, _: &mut World) {}
}
#[cfg(miri)]
#[test]
fn test_uninit_bytes() {
let mut queue = CommandQueue::default();
queue.push(CommandWithPadding(0, 0));
let _ = format!("{:?}", queue.bytes);
}
}