edict/flow/
mod.rs

1//! Flow module provides API to create high-level async workflows on top of the Edict ECS.
2//!
3//! Typical example would be a flow that causes an entity to move towards a target.
4//! It is resolved when the entity reaches the target or target is destroyed.
5//!
6//! Spawning a flow also returns a handle that can be used to await or cancel the flow.
7//! Spawned flow wraps flow result in a `Result` type where `Err` signals that the flow was cancelled.
8//! Flows are canceled when handle is dropped or entity is despawned.
9//!
10//! # Example
11//!
12//! ```ignore
13//! unit.move_to(target).await
14//! ```
15//!
16
17use core::{
18    any::TypeId,
19    cell::UnsafeCell,
20    future::Future,
21    mem::ManuallyDrop,
22    pin::Pin,
23    task::{Context, Poll, Waker},
24};
25
26use alloc::task::Wake;
27
28use alloc::sync::Arc;
29use amity::{flip_queue::FlipQueue, ring_buffer::RingBuffer};
30use hashbrown::HashMap;
31use slab::Slab;
32
33use crate::{
34    system::State,
35    tls, type_id,
36    world::{World, WorldLocal},
37    EntityId,
38};
39
40mod entity;
41mod world;
42
43pub use self::{entity::*, world::*};
44
45/// Task that access world when polled.
46pub trait Flow: 'static {
47    unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>;
48}
49
50pub trait IntoFlow: 'static {
51    type Flow: Flow;
52
53    /// Converts flow into the inner flow type.
54    unsafe fn into_flow(self) -> Self::Flow;
55}
56
57/// Call only from flow context.
58///
59/// This is public for use by custom flows.
60/// Built-in flows use it internally from `FlowWorld` and `FlowEntity`.
61#[inline(always)]
62pub unsafe fn flow_world_ref<'a>() -> &'a WorldLocal {
63    unsafe { tls::get_world_ref() }
64}
65
66/// Call only from flow context.
67///
68/// This is public for use by custom flows.
69/// Built-in flows use it internally from `FlowWorld` and `FlowEntity`.
70#[inline(always)]
71pub unsafe fn flow_world_mut<'a>() -> &'a mut WorldLocal {
72    unsafe { tls::get_world_mut() }
73}
74
75/// Type-erased array of newly inserted flows of a single type.
76trait AnyIntoFlows {
77    /// Returns type of the IntoFlow.
78    fn flow_id(&self) -> TypeId;
79
80    /// Drains the array into the queue.
81    fn drain(&mut self, flows: &mut HashMap<TypeId, AnyQueue>);
82}
83
84impl<'a> dyn AnyIntoFlows + 'a {
85    #[inline(always)]
86    unsafe fn downcast_mut<F: 'static>(&mut self) -> &mut TypedIntoFlows<F> {
87        debug_assert_eq!(self.flow_id(), type_id::<F>());
88
89        unsafe { &mut *(self as *mut Self as *mut TypedIntoFlows<F>) }
90    }
91}
92
93impl<'a> dyn AnyIntoFlows + Send + 'a {
94    #[inline(always)]
95    unsafe fn downcast_mut<F: 'static>(&mut self) -> &mut TypedIntoFlows<F> {
96        debug_assert_eq!(self.flow_id(), type_id::<F>());
97
98        unsafe { &mut *(self as *mut Self as *mut TypedIntoFlows<F>) }
99    }
100}
101
102/// Typed array of newly inserted flows of a single type.
103struct TypedIntoFlows<F> {
104    array: Vec<F>,
105}
106
107impl<F> AnyIntoFlows for TypedIntoFlows<F>
108where
109    F: IntoFlow,
110{
111    fn flow_id(&self) -> TypeId {
112        type_id::<F>()
113    }
114
115    fn drain(&mut self, flows: &mut HashMap<TypeId, AnyQueue>) {
116        // Short-circuit if there are no new flows.
117        if self.array.is_empty() {
118            return;
119        }
120
121        let flow_id = type_id::<F::Flow>();
122
123        // Find queue for this type of flows or create new one.
124        let queue = flows
125            .entry(flow_id)
126            .or_insert_with(AnyQueue::new::<F::Flow>);
127
128        // Safety: TypedFlows<F> is at index `type_id::<F>()` in `flows.map`.
129        let typed_flows = unsafe { queue.flows.downcast_mut::<F::Flow>() };
130
131        // Reserve space to ensure oom can't happen in the loop below.
132        typed_flows.array.reserve(self.array.len());
133
134        for into_flow in self.array.drain(..) {
135            let id = typed_flows.array.vacant_key();
136
137            let task = FlowTask {
138                flow: UnsafeCell::new(ManuallyDrop::new(unsafe { into_flow.into_flow() })),
139                id,
140                queue: queue.queue.clone(),
141            };
142
143            typed_flows.array.insert(Arc::new(task));
144            queue.ready.push(id);
145        }
146    }
147}
148
149struct NewSendFlows {
150    map: HashMap<TypeId, Box<dyn AnyIntoFlows + Send>>,
151}
152
153impl Default for NewSendFlows {
154    fn default() -> Self {
155        Self::new()
156    }
157}
158
159impl NewSendFlows {
160    fn new() -> Self {
161        NewSendFlows {
162            map: HashMap::new(),
163        }
164    }
165
166    pub fn typed_new_flows<F>(&mut self) -> &mut TypedIntoFlows<F>
167    where
168        F: IntoFlow + Send,
169    {
170        let new_flows = self
171            .map
172            .entry(type_id::<F>())
173            .or_insert_with(|| Box::new(TypedIntoFlows::<F> { array: Vec::new() }));
174
175        unsafe { new_flows.downcast_mut::<F>() }
176    }
177
178    pub fn add<F>(&mut self, flow: F)
179    where
180        F: IntoFlow + Send,
181    {
182        let typed_new_flows = self.typed_new_flows();
183        typed_new_flows.array.push(flow);
184    }
185}
186
187struct NewLocalFlows {
188    map: HashMap<TypeId, Box<dyn AnyIntoFlows>>,
189}
190
191impl Default for NewLocalFlows {
192    fn default() -> Self {
193        Self::new()
194    }
195}
196
197impl NewLocalFlows {
198    fn new() -> Self {
199        NewLocalFlows {
200            map: HashMap::new(),
201        }
202    }
203
204    pub fn typed_new_flows<F>(&mut self) -> &mut TypedIntoFlows<F>
205    where
206        F: IntoFlow,
207    {
208        let new_flows = self
209            .map
210            .entry(type_id::<F>())
211            .or_insert_with(|| Box::new(TypedIntoFlows::<F> { array: Vec::new() }));
212
213        unsafe { new_flows.downcast_mut::<F>() }
214    }
215
216    pub fn add<F>(&mut self, flow: F)
217    where
218        F: IntoFlow,
219    {
220        let typed_new_flows = self.typed_new_flows();
221        typed_new_flows.array.push(flow);
222    }
223}
224
225/// Trait implemented by `TypedFlows` with `F: Flow`
226trait AnyFlows {
227    #[cfg(debug_assertions)]
228    fn flow_id(&self) -> TypeId;
229
230    unsafe fn execute(&mut self, front: &[usize], back: &[usize]);
231}
232
233impl dyn AnyFlows {
234    #[inline(always)]
235    unsafe fn downcast_mut<F: 'static>(&mut self) -> &mut TypedFlows<F> {
236        #[cfg(debug_assertions)]
237        assert_eq!(self.flow_id(), type_id::<F>());
238
239        unsafe { &mut *(self as *mut Self as *mut TypedFlows<F>) }
240    }
241}
242
243struct FlowTask<F> {
244    flow: UnsafeCell<ManuallyDrop<F>>,
245    id: usize,
246    queue: Arc<FlipQueue<usize>>,
247}
248
249/// Safety: `FlowTask` can be sent to another thread as `Waker`
250/// which does not access `flow` field.
251unsafe impl<F> Send for FlowTask<F> {}
252unsafe impl<F> Sync for FlowTask<F> {}
253
254impl<F> Wake for FlowTask<F>
255where
256    F: Flow,
257{
258    fn wake(self: Arc<Self>) {
259        self.queue.push(self.id);
260    }
261
262    fn wake_by_ref(self: &Arc<Self>) {
263        self.queue.push(self.id);
264    }
265}
266
267impl<F> FlowTask<F>
268where
269    F: Flow,
270{
271    fn waker(self: &Arc<Self>) -> Waker {
272        Waker::from(self.clone())
273    }
274}
275
276/// Container of spawned flows of specific type.
277struct TypedFlows<F> {
278    array: Slab<Arc<FlowTask<F>>>,
279}
280
281impl<F> TypedFlows<F>
282where
283    F: Flow,
284{
285    #[inline(always)]
286    unsafe fn execute(&mut self, ids: &[usize]) {
287        for &id in ids {
288            let Some(task) = self.array.get(id) else {
289                continue;
290            };
291
292            let waker = task.waker();
293            let mut cx = Context::from_waker(&waker);
294
295            // Safety: This is the only code that can access `task.flow`.
296            // It is destroyed in-place when it is ready or TypedFlows is dropped.
297            let poll = unsafe {
298                let pinned = Pin::new_unchecked(&mut **task.flow.get());
299                unsafe { pinned.poll(&mut cx) }
300            };
301
302            if let Poll::Ready(()) = poll {
303                let task = self.array.remove(id);
304                // Safety: Removed from array. `task.flow` is inaccessible anywhere but here.
305                unsafe {
306                    ManuallyDrop::drop(&mut *task.flow.get());
307                }
308            }
309        }
310    }
311}
312
313impl<F> AnyFlows for TypedFlows<F>
314where
315    F: Flow,
316{
317    #[cfg(debug_assertions)]
318    fn flow_id(&self) -> TypeId {
319        type_id::<F>()
320    }
321
322    unsafe fn execute(&mut self, front: &[usize], back: &[usize]) {
323        self.execute(front);
324        self.execute(back);
325    }
326}
327
328/// Queue of flows of a single type.
329struct AnyQueue {
330    queue: Arc<FlipQueue<usize>>,
331    ready: RingBuffer<usize>,
332    flows: Box<dyn AnyFlows>,
333}
334
335impl AnyQueue {
336    fn new<F: Flow>() -> Self {
337        AnyQueue {
338            queue: Arc::new(FlipQueue::new()),
339            ready: RingBuffer::new(),
340            flows: Box::new(TypedFlows::<F> { array: Slab::new() }),
341        }
342    }
343}
344
345/// Flows container manages running flows,
346/// collects spawned flows and executes them.
347pub struct Flows {
348    new_flows: NewSendFlows,
349    new_local_flows: NewLocalFlows,
350    map: HashMap<TypeId, AnyQueue>,
351}
352
353impl Default for Flows {
354    fn default() -> Self {
355        Self::new()
356    }
357}
358
359impl Flows {
360    pub fn new() -> Self {
361        Flows {
362            new_flows: NewSendFlows::new(),
363            new_local_flows: NewLocalFlows::new(),
364            map: HashMap::new(),
365        }
366    }
367
368    /// Call at least once prior to spawning flows.
369    pub fn init(world: &mut World) {
370        world.with_resource(NewSendFlows::new);
371        world.with_resource(NewLocalFlows::new);
372    }
373
374    fn collect_new_flows<'a>(&mut self, world: &'a mut World) -> Option<tls::Guard<'a>> {
375        let world = world.local();
376
377        let mut new_flows_res = match world.get_resource_mut::<NewSendFlows>() {
378            None => return None,
379            Some(new_flows) => new_flows,
380        };
381
382        std::mem::swap(&mut self.new_flows, &mut *new_flows_res);
383        drop(new_flows_res);
384
385        let mut new_local_flows_res = match world.get_resource_mut::<NewLocalFlows>() {
386            None => return None,
387            Some(new_local_flows) => new_local_flows,
388        };
389
390        std::mem::swap(&mut self.new_local_flows, &mut *new_local_flows_res);
391        drop(new_local_flows_res);
392
393        let guard = tls::Guard::new(world);
394
395        // First swap all queues with ready buffer.
396        for typed in self.map.values_mut() {
397            debug_assert!(typed.ready.is_empty());
398            typed.queue.swap_buffer(&mut typed.ready);
399        }
400
401        // Then drain all new flows into queues.
402        // New flow ids are added to ready buffer.
403        for (_, typed) in &mut self.new_flows.map {
404            typed.drain(&mut self.map);
405        }
406        for (_, typed) in &mut self.new_local_flows.map {
407            typed.drain(&mut self.map);
408        }
409
410        Some(guard)
411    }
412
413    pub fn execute(&mut self, world: &mut World) {
414        let Some(_guard) = self.collect_new_flows(world) else {
415            return;
416        };
417
418        // Execute all ready flows.
419        for typed in self.map.values_mut() {
420            let (front, back) = typed.ready.as_slices();
421            unsafe {
422                typed.flows.execute(front, back);
423            }
424
425            // Clear ready buffer.
426            typed.ready.clear();
427        }
428    }
429}
430
431/// System that executes flows spawned in the world.
432pub fn flows_system(world: &mut World, mut flows: State<Flows>) {
433    let flows = &mut *flows;
434    flows.execute(world);
435}
436
437/// Spawn a flow into the world.
438///
439/// The flow will be polled by the `flows_system`.
440pub fn spawn<F>(world: &World, flow: F)
441where
442    F: IntoFlow + Send,
443{
444    world.expect_resource_mut::<NewSendFlows>().add(flow);
445}
446
447/// Spawn a flow into the world.
448///
449/// The flow will be polled by the `flows_system`.
450pub fn spawn_local<F>(world: &WorldLocal, flow: F)
451where
452    F: IntoFlow,
453{
454    world.expect_resource_mut::<NewLocalFlows>().add(flow);
455}
456
457/// Spawn a flow for the entity into the world.
458///
459/// The flow will be polled by the `flows_system`.
460pub fn spawn_for<F>(world: &World, id: EntityId, flow: F)
461where
462    F: IntoEntityFlow + Send,
463{
464    struct AdHoc<F> {
465        id: EntityId,
466        f: F,
467    }
468
469    impl<F> IntoFlow for AdHoc<F>
470    where
471        F: IntoEntityFlow,
472    {
473        type Flow = F::Flow;
474
475        unsafe fn into_flow(self) -> F::Flow {
476            unsafe { self.f.into_entity_flow(self.id) }
477        }
478    }
479
480    spawn(world, AdHoc { id, f: flow });
481}
482
483/// Spawn a flow for the entity into the world.
484///
485/// The flow will be polled by the `flows_system`.
486pub fn spawn_local_for<F>(world: &WorldLocal, id: EntityId, flow: F)
487where
488    F: IntoEntityFlow,
489{
490    struct AdHoc<F> {
491        id: EntityId,
492        f: F,
493    }
494
495    impl<F> IntoFlow for AdHoc<F>
496    where
497        F: IntoEntityFlow,
498    {
499        type Flow = F::Flow;
500
501        unsafe fn into_flow(self) -> F::Flow {
502            unsafe { self.f.into_entity_flow(self.id) }
503        }
504    }
505
506    spawn_local(world, AdHoc { id, f: flow });
507}
508
509/// Spawns code block as a flow.
510#[macro_export]
511macro_rules! spawn_block {
512    (in $world:ident -> $($closure:tt)*) => {
513        $crate::flow::spawn(&$world, $crate::flow_closure!(|$world: &mut $crate::flow::FlowWorld| { $($closure)* }));
514    };
515    (in $world:ident for $entity:ident -> $($closure:tt)*) => {
516        $crate::flow::spawn_for(&$world, $entity, $crate::flow_closure_for!(|mut $entity| { $($closure)* }));
517    };
518    (for $entity:ident in $world:ident -> $($closure:tt)*) => {
519        $crate::flow::spawn_for(&$world, $entity, $crate::flow_closure_for!(|mut $entity| { $($closure)* }));
520    };
521    (local $world:ident -> $($closure:tt)*) => {
522        $crate::flow::spawn_local(&$world, $crate::flow_closure!(|$world: &mut $crate::flow::FlowWorld| { $($closure)* }));
523    };
524    (local $world:ident for $entity:ident -> $($closure:tt)*) => {
525        $crate::flow::spawn_local_for(&$world, $entity, $crate::flow_closure_for!(|mut $entity| { $($closure)* }));
526    };
527    (for $entity:ident local $world:ident -> $($closure:tt)*) => {
528        $crate::flow::spawn_local_for(&$world, $entity, $crate::flow_closure_for!(|mut $entity| { $($closure)* }));
529    };
530    (for $entity:ident -> $($closure:tt)*) => {{
531        let e = $entity.id();
532        let w = $entity.get_world();
533        $crate::flow::spawn_local_for(w, e, $crate::flow_closure_for!(|mut $entity| { $($closure)* }));
534    }};
535}
536
537pub struct YieldNow {
538    yielded: bool,
539}
540
541impl YieldNow {
542    pub fn new() -> Self {
543        YieldNow { yielded: false }
544    }
545}
546
547impl Future for YieldNow {
548    type Output = ();
549
550    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
551        let me = self.get_mut();
552        if !me.yielded {
553            me.yielded = true;
554            cx.waker().wake_by_ref();
555            Poll::Pending
556        } else {
557            Poll::Ready(())
558        }
559    }
560}
561
562#[macro_export]
563macro_rules! yield_now {
564    () => {
565        $crate::private::YieldNow::new().await
566    };
567}