my_ecs/ecs/
post.rs

1use crate::{
2    ds::{AnyVec, UnsafeFuture},
3    ecs::{
4        cmd::{Command, EntityMoveCommandBuilder},
5        ent::{
6            component::{Component, ComponentKey},
7            entity::{EntityId, EntityKeyRef},
8            storage::{EntityContainer, EntityReg},
9        },
10        entry::{Ecs, EcsEntry},
11        lock::RequestLockFuture,
12        resource::Resource,
13        sched::{
14            comm::{CommandSender, ParkingSender},
15            ctrl::{MainWaker, SubContext, UnsafeWaker, SUB_CONTEXT},
16            task::{AsyncTask, Task},
17        },
18        sys::request::Request,
19        worker::Message,
20        DynResult,
21    },
22    util::macros::debug_format,
23};
24use std::{
25    cmp,
26    collections::{hash_map::Entry, HashMap},
27    future::Future,
28    ptr::NonNull,
29    sync::{
30        atomic::{AtomicU32, Ordering},
31        Arc, Mutex, MutexGuard,
32    },
33};
34
35pub mod prelude {
36    pub use super::{Commander, Post};
37}
38
39/// A [`Resource`] to send command or future.
40///
41/// This resource provides clients funtionalities to send commands or futures
42/// in their systems. This resource also provides interior mutability, so that
43/// clients can request the resource with [`ResRead`](crate::prelude::ResRead).
44//
45// By registering this resource in each ECS instance, it is possible to have
46// multiple ECS instances in one worker. Global function & static variable
47// approach, on the other hand, is not good option for that because it cannot
48// determine which ECS instance is the destination of sending easily.
49pub struct Post {
50    tx_cmd: CommandSender,
51    tx_msg: ParkingSender<Message>,
52    tx_dedi: ParkingSender<Task>,
53    fut_cnt: Arc<AtomicU32>,
54    ent_move: Mutex<EntMoveStorage>,
55}
56
57impl Post {
58    pub(super) fn new(
59        tx_cmd: CommandSender,
60        tx_msg: ParkingSender<Message>,
61        tx_dedi: ParkingSender<Task>,
62        fut_cnt: Arc<AtomicU32>,
63    ) -> Self {
64        Self {
65            tx_cmd,
66            tx_msg,
67            tx_dedi,
68            fut_cnt,
69            ent_move: Mutex::new(EntMoveStorage::new()),
70        }
71    }
72
73    /// Sends the given command to ECS scheduler.
74    ///
75    /// Commands are executed on the main worker, but the command is not
76    /// executed at the time of sending. ECS scheduler runs all buffered
77    /// commands at the end of current cycle and before the next cycle.
78    ///
79    /// # Examples
80    ///
81    /// ```
82    /// use my_ecs::prelude::*;
83    /// use std::sync::mpsc;
84    ///
85    /// let (tx, rx) = mpsc::channel();
86    ///
87    /// // A system sending a command.
88    /// let tx_a = tx.clone();
89    /// let sys_a = move |rr: ResRead<Post>| {
90    ///     tx_a.send("sys_a").unwrap();
91    ///     let tx_aa = tx_a.clone();
92    ///     rr.send_command(move |ecs| {
93    ///         tx_aa.send("cmd_a").unwrap();
94    ///         Ok(())
95    ///     });
96    /// };
97    ///
98    /// // A system sending a command.
99    /// let tx_b = tx.clone();
100    /// let sys_b = move |rr: ResRead<Post>| {
101    ///     tx_b.send("sys_b").unwrap();
102    ///     let tx_bb = tx_b.clone();
103    ///     rr.send_command(move |ecs| {
104    ///         tx_bb.send("cmd_b").unwrap();
105    ///         Ok(())
106    ///     });
107    /// };
108    ///
109    /// Ecs::default(WorkerPool::with_len(2), [2])
110    ///     .add_systems((sys_a, sys_b))
111    ///     .step();
112    ///
113    /// // No data dependency between `sys_a` and `sys_b`, so they can be run
114    /// // simultaneously.
115    /// assert!(matches!(rx.try_recv(), Ok("sys_a") | Ok("sys_b")));
116    /// assert!(matches!(rx.try_recv(), Ok("sys_a") | Ok("sys_b")));
117    /// assert!(matches!(rx.try_recv(), Ok("cmd_a") | Ok("cmd_b")));
118    /// assert!(matches!(rx.try_recv(), Ok("cmd_a") | Ok("cmd_b")));
119    /// ```
120    pub fn send_command<F>(&self, f: F)
121    where
122        F: FnOnce(Ecs) -> DynResult<()> + Send + 'static,
123    {
124        let wrapped = Some(f);
125        let boxed: Box<dyn Command> = Box::new(wrapped);
126        self.tx_cmd.send_or_cancel(boxed.into());
127    }
128
129    /// Sends the given future to ECS scheduler.
130    ///
131    /// The future is guaranteed to be polled more than or just once in the
132    /// current cycle.
133    ///
134    /// # Examples
135    ///
136    /// ```
137    /// use my_ecs::prelude::*;
138    /// use std::{time::Duration, sync::{Arc, Mutex}};
139    ///
140    /// let state = Arc::new(Mutex::new(0));
141    ///
142    /// let c_state = Arc::clone(&state);
143    /// let foo = async move {
144    ///     *c_state.lock().unwrap() = 1;
145    ///     async_io::Timer::after(Duration::from_millis(10)).await;
146    ///     *c_state.lock().unwrap() = 2;
147    /// };
148    ///
149    /// Ecs::default(WorkerPool::new(), [])
150    ///     .add_once_system(move |rr: ResRead<Post>| {
151    ///         rr.send_future(foo);
152    ///     })
153    ///     .run(|_| {});
154    ///
155    /// assert_eq!(*state.lock().unwrap(), 2);
156    /// ```
157    pub fn send_future<F, R>(&self, future: F)
158    where
159        F: Future<Output = R> + Send + 'static,
160        R: Command,
161    {
162        let ptr = SUB_CONTEXT.get();
163
164        if ptr.is_dangling() {
165            on_main(self, future);
166        } else {
167            on_sub(future, *ptr);
168        }
169
170        // === Internal helper functions ===
171
172        fn on_main<F, R>(this: &Post, future: F)
173        where
174            F: Future<Output = R> + Send + 'static,
175            R: Command,
176        {
177            // Allocates memory for the future.
178            let waker = MainWaker::new(this.tx_dedi.clone());
179            let handle = UnsafeFuture::new(future, waker, consume_ready_future::<R>);
180
181            // Increases future count.
182            this.fut_cnt.fetch_add(1, Ordering::Relaxed);
183
184            // Pushes the future handle onto dedicated queue.
185            let task = Task::Async(AsyncTask(handle));
186            this.tx_dedi.send(task).unwrap();
187        }
188
189        fn on_sub<F, R>(future: F, cx: NonNull<SubContext>)
190        where
191            F: Future<Output = R> + Send + 'static,
192            R: Command,
193        {
194            // Allocates memory for the future.
195            let waker = UnsafeWaker::new(cx.as_ptr());
196            let handle = UnsafeFuture::new(future, waker, consume_ready_future::<R>);
197
198            // Pushes the future handle onto local future queue.
199            // Safety: Current worker has valid sub context pointer.
200            let comm = unsafe { cx.as_ref().get_comm() };
201            comm.push_future_task(handle);
202
203            // Increases future count.
204            // Main worker will check this whenever it needs.
205            comm.signal().add_future_count(1);
206
207            // If current worker's local queue is not empty, current worker cannot
208            // do the future task promptly, so wakes another worker to steal it.
209            if !comm.is_local_empty() {
210                comm.signal().sub().notify_one();
211            }
212        }
213    }
214
215    pub fn request_lock<'buf, Req>(&self) -> RequestLockFuture<'buf, Req>
216    where
217        Req: Request,
218    {
219        RequestLockFuture::new(self.tx_cmd.clone(), self.tx_msg.clone())
220    }
221
222    pub fn change_entity(&self, eid: EntityId) -> EntityMoveCommandBuilder<'_> {
223        let guard = self.lock_entity_move_storage();
224        EntityMoveCommandBuilder::new(&self.tx_cmd, guard, eid)
225    }
226
227    pub(crate) fn as_commander(&self) -> Commander<'_> {
228        Commander(self)
229    }
230
231    pub(crate) fn lock_entity_move_storage(&self) -> MutexGuard<'_, EntMoveStorage> {
232        self.ent_move.lock().unwrap()
233    }
234
235    pub(crate) fn shrink_to_fit(&self) {
236        let mut guard = self.ent_move.lock().unwrap();
237        guard.shrink_to_fit();
238    }
239}
240
241impl Resource for Post {}
242
243pub struct Commander<'a>(&'a Post);
244
245impl Commander<'_> {
246    pub fn change_entity(&self, eid: EntityId) -> EntityMoveCommandBuilder<'_> {
247        self.0.change_entity(eid)
248    }
249}
250
251pub(crate) fn consume_ready_future<T: Command>(mut res: T, ecs: Ecs<'_>) -> DynResult<()> {
252    res.command(ecs)
253}
254
255/// A temporary storage containing entity move commands caused by attaching or
256/// detaching components to currently existing entities.
257#[derive(Debug)]
258pub(crate) struct EntMoveStorage {
259    /// Operations on entities whether to add or remove components.
260    ops: Vec<Operation>,
261
262    /// Lengths of commands.
263    ///
264    /// A command is composed of a set of operations. Length of it is number
265    /// of operations of the set.
266    /// In other words, `lens.iter().sum() == ops.len()`.
267    lens: Vec<usize>,
268
269    /// Component values to be added.
270    adds: HashMap<ComponentKey, AnyVec>,
271
272    /// A buffer for holding component keys temporarily.
273    ckey_buf: Vec<ComponentKey>,
274}
275
276impl EntMoveStorage {
277    pub(crate) fn new() -> Self {
278        Self {
279            ops: Vec::new(),
280            lens: Vec::new(),
281            adds: HashMap::default(),
282            ckey_buf: Vec::new(),
283        }
284    }
285
286    pub(crate) fn insert_addition<C: Component>(&mut self, eid: EntityId, comp: C) {
287        self.ops.push(Operation {
288            from: eid,
289            target: C::key(),
290            dir: Direction::Add,
291        });
292        match self.adds.entry(C::key()) {
293            Entry::Occupied(mut entry) => {
294                let v = entry.get_mut();
295                // Safety: Vector holds `AddingComponent`.
296                unsafe { v.push(comp) };
297            }
298            Entry::Vacant(entry) => {
299                let mut v = AnyVec::new(C::type_info());
300                // Safety: Vector holds `AddingComponent`.
301                unsafe { v.push(comp) };
302                entry.insert(v);
303            }
304        }
305    }
306
307    pub(crate) fn insert_removal(&mut self, eid: EntityId, ckey: ComponentKey) {
308        self.ops.push(Operation {
309            from: eid,
310            target: ckey,
311            dir: Direction::Remove,
312        });
313    }
314
315    pub(crate) fn set_command_length(&mut self, len: usize) {
316        debug_assert!(len > 0);
317        self.lens.push(len);
318    }
319
320    pub(crate) fn consume(&mut self, mut ecs: Ecs<'_>) {
321        while let Some(len) = self.lens.pop() {
322            self.handle(len, &mut ecs);
323        }
324    }
325
326    /// Moves an entity from one entity container to another.
327    ///
328    /// If failed to find source entity, does nothing.
329    ///
330    /// # Safety
331    ///
332    /// - `adds` must contain proper component values in it.
333    fn handle(&mut self, len: usize, ecs: &mut Ecs<'_>) {
334        // Safety: We got an operation, which means length must exist.
335        let src_eid = unsafe { self.ops.last().unwrap_unchecked().from };
336
337        // Gets src entity container.
338        let ei = src_eid.container_index();
339        let src_ekey = EntityKeyRef::Index(&ei);
340        let Some(mut src_cont) = ecs.entity_container_ptr(src_ekey) else {
341            return;
342        };
343        // Safety: We got the pointer from Ecs right before.
344        let src_cont_mut = unsafe { src_cont.as_mut() };
345
346        // Gets dst entity container.
347        self.set_dst_ckeys(len, src_cont_mut);
348        let mut dst_cont = self.find_dst_cont(src_cont_mut, ecs);
349        debug_assert_ne!(src_cont, dst_cont);
350
351        // Safety: We got the pointer from Ecs right before.
352        let dst_cont_mut = unsafe { dst_cont.as_mut() };
353
354        // Moves an entity from src to dst.
355        if let Some(src_vi) = src_cont_mut.to_value_index(src_eid.row_index()) {
356            self.move_entity(src_vi, src_cont_mut, dst_cont_mut);
357        }
358
359        self.ops.truncate(self.ops.len() - len);
360    }
361
362    /// # Panics
363    ///
364    /// Panics if
365    /// - Inserted operations try to add components that already belong to
366    ///   the source entity container.
367    /// - Inserted operations try to remove components that doesn't belong
368    ///   to the source entity container.
369    fn set_dst_ckeys(&mut self, len: usize, src_cont: &EntityContainer) {
370        let contains_src =
371            |ckey: &ComponentKey| src_cont.get_tag().get_component_keys().contains(ckey);
372
373        self.ckey_buf.clear();
374        self.ckey_buf
375            .extend(src_cont.get_tag().get_component_keys().iter());
376
377        let op_iter = self.ops.iter().rev().take(len);
378
379        for Operation {
380            from: _from,
381            target,
382            dir,
383        } in op_iter
384        {
385            match dir {
386                Direction::Add => {
387                    let reason = debug_format!(
388                        "adding component({:?}) to entity({:?}) failed. it already belongs to the entity",
389                        target,
390                        _from
391                    );
392                    assert!(!contains_src(target), "{}", reason);
393                    self.ckey_buf.push(*target);
394                }
395                Direction::Remove => {
396                    let reason = debug_format!(
397                        "removing component({:?}) from entity({:?}) failed. it doesn't belong to the entity",
398                        target,
399                        _from
400                    );
401                    assert!(contains_src(target), "{}", reason);
402
403                    let reason =
404                        debug_format!("removing the same component more than once is not allowed");
405                    let (i, _) = self
406                        .ckey_buf
407                        .iter()
408                        .enumerate()
409                        .find(|(_, ckey)| *ckey == target)
410                        .expect(&reason);
411                    self.ckey_buf.swap_remove(i);
412                }
413            }
414        }
415
416        self.ckey_buf.sort_unstable();
417    }
418
419    fn find_dst_cont(
420        &self,
421        src_cont: &EntityContainer,
422        ecs: &mut Ecs<'_>,
423    ) -> NonNull<EntityContainer> {
424        let dst_ckeys = &self.ckey_buf;
425
426        let dst_ekey = EntityKeyRef::Ckeys(dst_ckeys);
427        if let Some(dst_cont) = ecs.entity_container_ptr(dst_ekey) {
428            dst_cont
429        } else {
430            let mut desc = EntityReg::new(None, src_cont.create_twin());
431
432            for dst_ckey in dst_ckeys.iter() {
433                if src_cont.contains_column(dst_ckey) {
434                    // Safety: Infallible.
435                    let tinfo = unsafe {
436                        let ci = src_cont.get_column_index(dst_ckey).unwrap_unchecked();
437                        *src_cont.get_column_info(ci).unwrap_unchecked()
438                    };
439                    desc.add_component(tinfo);
440                } else {
441                    // Safety: Infallible.
442                    debug_assert!(self.adds.contains_key(dst_ckey));
443                    let v = unsafe { self.adds.get(dst_ckey).unwrap_unchecked() };
444                    desc.add_component(*v.type_info())
445                }
446            }
447
448            let res = ecs.register_entity(desc);
449            debug_assert!(res.is_ok());
450
451            let dst_ekey = EntityKeyRef::Ckeys(dst_ckeys);
452            ecs.entity_container_ptr(dst_ekey).unwrap()
453        }
454    }
455
456    #[rustfmt::skip]
457    fn move_entity(
458        &mut self,
459        src_vi: usize,
460        src_cont: &mut EntityContainer,
461        dst_cont: &mut EntityContainer,
462    ) {
463        // TODO: Test required.
464        //
465        // Safety
466        // 1. We call begin_xxx -> add/remove_xxx -> end_xxx to add/remove
467        //    entity to/from an entity container.
468        // 2. We get component key from entity container. Therefore
469        //    unwrapping column index gotten using the key is safe.
470
471        unsafe {
472            let src_ckeys = Arc::clone(src_cont.get_tag().get_component_keys());
473            let dst_ckeys = Arc::clone(dst_cont.get_tag().get_component_keys());
474
475            src_cont.begin_remove_row_by_value_index(src_vi);
476            dst_cont.begin_add_row();
477
478            let (mut src_ci, src_len) = (0, src_ckeys.len());
479            let (mut dst_ci, dst_len) = (0, dst_ckeys.len());
480
481            while src_ci < src_len && dst_ci < dst_len {
482                match src_ckeys[src_ci].cmp(&dst_ckeys[dst_ci]) {
483                    cmp::Ordering::Equal => { // src = dst : src -> dst
484                        src_to_dst(src_cont, dst_cont, src_ci, src_vi, dst_ci);
485                        src_ci += 1;
486                        dst_ci += 1;
487                    }
488                    cmp::Ordering::Greater => { // src > dst : buf -> dst
489                        buf_to_dst(&mut self.adds, dst_cont, &dst_ckeys[dst_ci], dst_ci);
490                        dst_ci += 1;
491                    }
492                    cmp::Ordering::Less => { // src < dst : src -> drop
493                        src_cont.drop_value_by_value_index(src_ci, src_vi);
494                        src_ci += 1;
495                    }
496                }
497            }
498            while src_ci < src_len { // src -> drop
499                src_cont.drop_value_by_value_index(src_ci, src_vi);
500                src_ci += 1;
501            }
502            while dst_ci < dst_len { // buf -> dst
503                buf_to_dst(&mut self.adds, dst_cont, &dst_ckeys[dst_ci], dst_ci);
504                dst_ci += 1;
505            }
506
507            src_cont.end_remove_row_by_value_index(src_vi);
508            dst_cont.end_add_row();
509        }
510
511        #[inline]
512        unsafe fn src_to_dst(
513            src_cont: &mut EntityContainer,
514            dst_cont: &mut EntityContainer,
515            src_ci: usize,
516            src_vi: usize,
517            dst_ci: usize,
518        ) {
519            unsafe {
520                let src_ptr = src_cont
521                    .value_ptr_by_value_index(src_ci, src_vi)
522                    .unwrap_unchecked();
523                dst_cont.add_value(dst_ci, src_ptr);
524                src_cont.forget_value_by_value_index(src_ci, src_vi);
525            }
526        }
527
528        #[inline]
529        unsafe fn buf_to_dst(
530            bufs: &mut HashMap<ComponentKey, AnyVec>,
531            dst_cont: &mut EntityContainer,
532            dst_ckey: &ComponentKey,
533            dst_ci: usize,
534        ) {
535            let buf = unsafe {
536                let buf = bufs
537                    .get_mut(dst_ckey)
538                    .unwrap_unchecked();
539                let buf_ptr = buf
540                    .get_raw_unchecked(buf.len() - 1); // Infallible
541                dst_cont.add_value(dst_ci, buf_ptr);
542                buf
543            };
544            buf.pop_forget();
545        }
546    }
547
548    fn shrink_to_fit(&mut self) {
549        self.ops.shrink_to_fit();
550        for v in self.adds.values_mut() {
551            v.shrink_to_fit();
552        }
553        // No need for `self.adds`. We never remove items from it.
554    }
555}
556
557#[derive(Debug, Clone, Copy)]
558struct Operation {
559    /// Entity that the operation will be executed on.
560    from: EntityId,
561
562    /// The operation's target component.
563    target: ComponentKey,
564
565    /// Whether be added to the entity or removed from the entity.
566    dir: Direction,
567}
568
569#[derive(Debug, Clone, Copy)]
570enum Direction {
571    Add,
572    Remove,
573}
574
575#[cfg(test)]
576mod tests {
577    #[test]
578    #[should_panic]
579    #[rustfmt::skip]
580    fn test_add_existing_component_panic() {
581        use crate as my_ecs;
582        use my_ecs::prelude::*;
583
584        #[derive(Entity)]
585        struct Ea { ca: Ca }
586        #[derive(Component)] struct Ca;
587
588        Ecs::default(WorkerPool::with_len(1), [1])
589            .register_entity_of::<Ea>()
590            .add_system(SystemDesc::new().with_once(|rr: ResRead<Post>, ew: EntWrite<Ea>| {
591                let eid = ew.take_recur().add(Ea { ca: Ca });
592                // `Ea` contains `Ca`, so clients cannot add `Ca` again.
593                rr.change_entity(eid).attach(Ca);
594            }))
595            .step();
596    }
597
598    #[test]
599    #[should_panic]
600    #[rustfmt::skip]
601    fn test_add_duplicated_components_panic() {
602        use crate as my_ecs;
603        use my_ecs::prelude::*;
604
605        #[derive(Entity)]
606        struct Ea { ca: Ca }
607        #[derive(Component)] struct Ca;
608        #[derive(Component)] struct Cb;
609
610        Ecs::default(WorkerPool::with_len(1), [1])
611            .register_entity_of::<Ea>()
612            .add_system(SystemDesc::new().with_once(|rr: ResRead<Post>, ew: EntWrite<Ea>| {
613                let eid = ew.take_recur().add(Ea { ca: Ca });
614                // Duplicated components are not allowed.
615                rr.change_entity(eid).attach(Cb).attach(Cb);
616            }))
617            .step();
618    }
619
620    #[test]
621    #[should_panic]
622    #[rustfmt::skip]
623    fn test_remove_unknown_component_panic() {
624        use crate as my_ecs;
625        use my_ecs::prelude::*;
626
627        #[derive(Entity)]
628        struct Ea { ca: Ca }
629        #[derive(Component)] struct Ca;
630        #[derive(Component)] struct Cb;
631
632        Ecs::default(WorkerPool::with_len(1), [1])
633            .register_entity_of::<Ea>()
634            .add_system(SystemDesc::new().with_once(|rr: ResRead<Post>, ew: EntWrite<Ea>| {
635                let eid = ew.take_recur().add(Ea { ca: Ca });
636                // `Ea` doesn't contain `Cb`, so clients cannot remove `Cb`.
637                rr.change_entity(eid).detach::<Cb>();
638            }))
639            .step();
640    }
641
642    #[test]
643    #[should_panic]
644    #[rustfmt::skip]
645    fn test_remove_the_same_component_many_panic() {
646        use crate as my_ecs;
647        use my_ecs::prelude::*;
648
649        #[derive(Entity)]
650        struct Ea { ca: Ca, cb: Cb }
651        #[derive(Component)] struct Ca;
652        #[derive(Component)] struct Cb;
653
654        Ecs::default(WorkerPool::with_len(1), [1])
655            .register_entity_of::<Ea>()
656            .add_system(SystemDesc::new().with_once(|rr: ResRead<Post>, ew: EntWrite<Ea>| {
657                let eid = ew.take_recur().add(Ea { ca: Ca, cb: Cb });
658                // Removing the same component multiple times is not allowed.
659                rr.change_entity(eid).detach::<Cb>().detach::<Cb>();
660            }))
661            .step();
662    }
663}