1use core::{
18 any::TypeId,
19 cell::UnsafeCell,
20 future::Future,
21 mem::ManuallyDrop,
22 pin::Pin,
23 task::{Context, Poll, Waker},
24};
25
26use alloc::task::Wake;
27
28use alloc::sync::Arc;
29use amity::{flip_queue::FlipQueue, ring_buffer::RingBuffer};
30use hashbrown::HashMap;
31use slab::Slab;
32
33use crate::{
34 system::State,
35 tls, type_id,
36 world::{World, WorldLocal},
37 EntityId,
38};
39
40mod entity;
41mod world;
42
43pub use self::{entity::*, world::*};
44
/// A unit of cooperative work driven by the flow executor in this module.
///
/// The shape mirrors `Future::poll`, except that the method is `unsafe` and a
/// finished flow produces no value.
pub trait Flow: 'static {
    /// Advances the flow once.
    ///
    /// Returns `Poll::Ready(())` when the flow has finished and `Poll::Pending`
    /// when it must be polled again after `cx`'s waker has been woken.
    ///
    /// # Safety
    ///
    /// NOTE(review): the exact contract is not visible in this file. The only
    /// visible caller, `TypedFlows::execute`, polls while a `tls::Guard` created
    /// by `Flows::execute` is alive, so implementations presumably rely on the
    /// world being installed in thread-local storage — confirm.
    unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>;
}
49
50pub trait IntoFlow: 'static {
51 type Flow: Flow;
52
53 unsafe fn into_flow(self) -> Self::Flow;
55}
56
57#[inline(always)]
62pub unsafe fn flow_world_ref<'a>() -> &'a WorldLocal {
63 unsafe { tls::get_world_ref() }
64}
65
66#[inline(always)]
71pub unsafe fn flow_world_mut<'a>() -> &'a mut WorldLocal {
72 unsafe { tls::get_world_mut() }
73}
74
75trait AnyIntoFlows {
77 fn flow_id(&self) -> TypeId;
79
80 fn drain(&mut self, flows: &mut HashMap<TypeId, AnyQueue>);
82}
83
84impl<'a> dyn AnyIntoFlows + 'a {
85 #[inline(always)]
86 unsafe fn downcast_mut<F: 'static>(&mut self) -> &mut TypedIntoFlows<F> {
87 debug_assert_eq!(self.flow_id(), type_id::<F>());
88
89 unsafe { &mut *(self as *mut Self as *mut TypedIntoFlows<F>) }
90 }
91}
92
93impl<'a> dyn AnyIntoFlows + Send + 'a {
94 #[inline(always)]
95 unsafe fn downcast_mut<F: 'static>(&mut self) -> &mut TypedIntoFlows<F> {
96 debug_assert_eq!(self.flow_id(), type_id::<F>());
97
98 unsafe { &mut *(self as *mut Self as *mut TypedIntoFlows<F>) }
99 }
100}
101
102struct TypedIntoFlows<F> {
104 array: Vec<F>,
105}
106
107impl<F> AnyIntoFlows for TypedIntoFlows<F>
108where
109 F: IntoFlow,
110{
111 fn flow_id(&self) -> TypeId {
112 type_id::<F>()
113 }
114
115 fn drain(&mut self, flows: &mut HashMap<TypeId, AnyQueue>) {
116 if self.array.is_empty() {
118 return;
119 }
120
121 let flow_id = type_id::<F::Flow>();
122
123 let queue = flows
125 .entry(flow_id)
126 .or_insert_with(AnyQueue::new::<F::Flow>);
127
128 let typed_flows = unsafe { queue.flows.downcast_mut::<F::Flow>() };
130
131 typed_flows.array.reserve(self.array.len());
133
134 for into_flow in self.array.drain(..) {
135 let id = typed_flows.array.vacant_key();
136
137 let task = FlowTask {
138 flow: UnsafeCell::new(ManuallyDrop::new(unsafe { into_flow.into_flow() })),
139 id,
140 queue: queue.queue.clone(),
141 };
142
143 typed_flows.array.insert(Arc::new(task));
144 queue.ready.push(id);
145 }
146 }
147}
148
149struct NewSendFlows {
150 map: HashMap<TypeId, Box<dyn AnyIntoFlows + Send>>,
151}
152
153impl Default for NewSendFlows {
154 fn default() -> Self {
155 Self::new()
156 }
157}
158
159impl NewSendFlows {
160 fn new() -> Self {
161 NewSendFlows {
162 map: HashMap::new(),
163 }
164 }
165
166 pub fn typed_new_flows<F>(&mut self) -> &mut TypedIntoFlows<F>
167 where
168 F: IntoFlow + Send,
169 {
170 let new_flows = self
171 .map
172 .entry(type_id::<F>())
173 .or_insert_with(|| Box::new(TypedIntoFlows::<F> { array: Vec::new() }));
174
175 unsafe { new_flows.downcast_mut::<F>() }
176 }
177
178 pub fn add<F>(&mut self, flow: F)
179 where
180 F: IntoFlow + Send,
181 {
182 let typed_new_flows = self.typed_new_flows();
183 typed_new_flows.array.push(flow);
184 }
185}
186
187struct NewLocalFlows {
188 map: HashMap<TypeId, Box<dyn AnyIntoFlows>>,
189}
190
191impl Default for NewLocalFlows {
192 fn default() -> Self {
193 Self::new()
194 }
195}
196
197impl NewLocalFlows {
198 fn new() -> Self {
199 NewLocalFlows {
200 map: HashMap::new(),
201 }
202 }
203
204 pub fn typed_new_flows<F>(&mut self) -> &mut TypedIntoFlows<F>
205 where
206 F: IntoFlow,
207 {
208 let new_flows = self
209 .map
210 .entry(type_id::<F>())
211 .or_insert_with(|| Box::new(TypedIntoFlows::<F> { array: Vec::new() }));
212
213 unsafe { new_flows.downcast_mut::<F>() }
214 }
215
216 pub fn add<F>(&mut self, flow: F)
217 where
218 F: IntoFlow,
219 {
220 let typed_new_flows = self.typed_new_flows();
221 typed_new_flows.array.push(flow);
222 }
223}
224
/// Type-erased view of a `TypedFlows<F>` container of running tasks.
trait AnyFlows {
    /// `TypeId` of the contained flow type; exists only in builds with debug
    /// assertions, where `downcast_mut` uses it as a sanity check.
    #[cfg(debug_assertions)]
    fn flow_id(&self) -> TypeId;

    /// Polls the tasks whose ids are listed in `front`, then those in `back`.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is not spelled out in this file; the only
    /// visible caller is `Flows::execute`, which calls this while holding the
    /// `tls::Guard` — confirm what implementations may assume.
    unsafe fn execute(&mut self, front: &[usize], back: &[usize]);
}
232
233impl dyn AnyFlows {
234 #[inline(always)]
235 unsafe fn downcast_mut<F: 'static>(&mut self) -> &mut TypedFlows<F> {
236 #[cfg(debug_assertions)]
237 assert_eq!(self.flow_id(), type_id::<F>());
238
239 unsafe { &mut *(self as *mut Self as *mut TypedFlows<F>) }
240 }
241}
242
243struct FlowTask<F> {
244 flow: UnsafeCell<ManuallyDrop<F>>,
245 id: usize,
246 queue: Arc<FlipQueue<usize>>,
247}
248
249unsafe impl<F> Send for FlowTask<F> {}
252unsafe impl<F> Sync for FlowTask<F> {}
253
254impl<F> Wake for FlowTask<F>
255where
256 F: Flow,
257{
258 fn wake(self: Arc<Self>) {
259 self.queue.push(self.id);
260 }
261
262 fn wake_by_ref(self: &Arc<Self>) {
263 self.queue.push(self.id);
264 }
265}
266
267impl<F> FlowTask<F>
268where
269 F: Flow,
270{
271 fn waker(self: &Arc<Self>) -> Waker {
272 Waker::from(self.clone())
273 }
274}
275
276struct TypedFlows<F> {
278 array: Slab<Arc<FlowTask<F>>>,
279}
280
281impl<F> TypedFlows<F>
282where
283 F: Flow,
284{
285 #[inline(always)]
286 unsafe fn execute(&mut self, ids: &[usize]) {
287 for &id in ids {
288 let Some(task) = self.array.get(id) else {
289 continue;
290 };
291
292 let waker = task.waker();
293 let mut cx = Context::from_waker(&waker);
294
295 let poll = unsafe {
298 let pinned = Pin::new_unchecked(&mut **task.flow.get());
299 unsafe { pinned.poll(&mut cx) }
300 };
301
302 if let Poll::Ready(()) = poll {
303 let task = self.array.remove(id);
304 unsafe {
306 ManuallyDrop::drop(&mut *task.flow.get());
307 }
308 }
309 }
310 }
311}
312
313impl<F> AnyFlows for TypedFlows<F>
314where
315 F: Flow,
316{
317 #[cfg(debug_assertions)]
318 fn flow_id(&self) -> TypeId {
319 type_id::<F>()
320 }
321
322 unsafe fn execute(&mut self, front: &[usize], back: &[usize]) {
323 self.execute(front);
324 self.execute(back);
325 }
326}
327
328struct AnyQueue {
330 queue: Arc<FlipQueue<usize>>,
331 ready: RingBuffer<usize>,
332 flows: Box<dyn AnyFlows>,
333}
334
335impl AnyQueue {
336 fn new<F: Flow>() -> Self {
337 AnyQueue {
338 queue: Arc::new(FlipQueue::new()),
339 ready: RingBuffer::new(),
340 flows: Box::new(TypedFlows::<F> { array: Slab::new() }),
341 }
342 }
343}
344
345pub struct Flows {
348 new_flows: NewSendFlows,
349 new_local_flows: NewLocalFlows,
350 map: HashMap<TypeId, AnyQueue>,
351}
352
353impl Default for Flows {
354 fn default() -> Self {
355 Self::new()
356 }
357}
358
359impl Flows {
360 pub fn new() -> Self {
361 Flows {
362 new_flows: NewSendFlows::new(),
363 new_local_flows: NewLocalFlows::new(),
364 map: HashMap::new(),
365 }
366 }
367
368 pub fn init(world: &mut World) {
370 world.with_resource(NewSendFlows::new);
371 world.with_resource(NewLocalFlows::new);
372 }
373
374 fn collect_new_flows<'a>(&mut self, world: &'a mut World) -> Option<tls::Guard<'a>> {
375 let world = world.local();
376
377 let mut new_flows_res = match world.get_resource_mut::<NewSendFlows>() {
378 None => return None,
379 Some(new_flows) => new_flows,
380 };
381
382 std::mem::swap(&mut self.new_flows, &mut *new_flows_res);
383 drop(new_flows_res);
384
385 let mut new_local_flows_res = match world.get_resource_mut::<NewLocalFlows>() {
386 None => return None,
387 Some(new_local_flows) => new_local_flows,
388 };
389
390 std::mem::swap(&mut self.new_local_flows, &mut *new_local_flows_res);
391 drop(new_local_flows_res);
392
393 let guard = tls::Guard::new(world);
394
395 for typed in self.map.values_mut() {
397 debug_assert!(typed.ready.is_empty());
398 typed.queue.swap_buffer(&mut typed.ready);
399 }
400
401 for (_, typed) in &mut self.new_flows.map {
404 typed.drain(&mut self.map);
405 }
406 for (_, typed) in &mut self.new_local_flows.map {
407 typed.drain(&mut self.map);
408 }
409
410 Some(guard)
411 }
412
413 pub fn execute(&mut self, world: &mut World) {
414 let Some(_guard) = self.collect_new_flows(world) else {
415 return;
416 };
417
418 for typed in self.map.values_mut() {
420 let (front, back) = typed.ready.as_slices();
421 unsafe {
422 typed.flows.execute(front, back);
423 }
424
425 typed.ready.clear();
427 }
428 }
429}
430
431pub fn flows_system(world: &mut World, mut flows: State<Flows>) {
433 let flows = &mut *flows;
434 flows.execute(world);
435}
436
437pub fn spawn<F>(world: &World, flow: F)
441where
442 F: IntoFlow + Send,
443{
444 world.expect_resource_mut::<NewSendFlows>().add(flow);
445}
446
447pub fn spawn_local<F>(world: &WorldLocal, flow: F)
451where
452 F: IntoFlow,
453{
454 world.expect_resource_mut::<NewLocalFlows>().add(flow);
455}
456
457pub fn spawn_for<F>(world: &World, id: EntityId, flow: F)
461where
462 F: IntoEntityFlow + Send,
463{
464 struct AdHoc<F> {
465 id: EntityId,
466 f: F,
467 }
468
469 impl<F> IntoFlow for AdHoc<F>
470 where
471 F: IntoEntityFlow,
472 {
473 type Flow = F::Flow;
474
475 unsafe fn into_flow(self) -> F::Flow {
476 unsafe { self.f.into_entity_flow(self.id) }
477 }
478 }
479
480 spawn(world, AdHoc { id, f: flow });
481}
482
483pub fn spawn_local_for<F>(world: &WorldLocal, id: EntityId, flow: F)
487where
488 F: IntoEntityFlow,
489{
490 struct AdHoc<F> {
491 id: EntityId,
492 f: F,
493 }
494
495 impl<F> IntoFlow for AdHoc<F>
496 where
497 F: IntoEntityFlow,
498 {
499 type Flow = F::Flow;
500
501 unsafe fn into_flow(self) -> F::Flow {
502 unsafe { self.f.into_entity_flow(self.id) }
503 }
504 }
505
506 spawn_local(world, AdHoc { id, f: flow });
507}
508
/// Spawns a flow from an inline block of code.
///
/// Accepted forms (everything after `->` becomes the body of the flow closure):
///
/// * `in world -> ...` — calls `spawn` with a `flow_closure!` that receives
///   `world` as `&mut FlowWorld`.
/// * `in world for entity -> ...` / `for entity in world -> ...` — calls
///   `spawn_for` with a `flow_closure_for!` that receives `entity`.
/// * `local world -> ...` — as the first form, but calls `spawn_local`.
/// * `local world for entity -> ...` / `for entity local world -> ...` — as the
///   second form, but calls `spawn_local_for`.
/// * `for entity -> ...` — takes the id from `entity.id()` and the world from
///   `entity.get_world()`, then calls `spawn_local_for`.
#[macro_export]
macro_rules! spawn_block {
    (in $world:ident -> $($closure:tt)*) => {
        $crate::flow::spawn(
            &$world,
            $crate::flow_closure!(|$world: &mut $crate::flow::FlowWorld| { $($closure)* })
        );
    };
    (in $world:ident for $entity:ident -> $($closure:tt)*) => {
        $crate::flow::spawn_for(
            &$world,
            $entity,
            $crate::flow_closure_for!(|mut $entity| { $($closure)* })
        );
    };
    (for $entity:ident in $world:ident -> $($closure:tt)*) => {
        $crate::flow::spawn_for(
            &$world,
            $entity,
            $crate::flow_closure_for!(|mut $entity| { $($closure)* })
        );
    };
    (local $world:ident -> $($closure:tt)*) => {
        $crate::flow::spawn_local(
            &$world,
            $crate::flow_closure!(|$world: &mut $crate::flow::FlowWorld| { $($closure)* })
        );
    };
    (local $world:ident for $entity:ident -> $($closure:tt)*) => {
        $crate::flow::spawn_local_for(
            &$world,
            $entity,
            $crate::flow_closure_for!(|mut $entity| { $($closure)* })
        );
    };
    (for $entity:ident local $world:ident -> $($closure:tt)*) => {
        $crate::flow::spawn_local_for(
            &$world,
            $entity,
            $crate::flow_closure_for!(|mut $entity| { $($closure)* })
        );
    };
    (for $entity:ident -> $($closure:tt)*) => {{
        // Read id and world from the entity before the closure shadows its name.
        let e = $entity.id();
        let w = $entity.get_world();
        $crate::flow::spawn_local_for(
            w,
            e,
            $crate::flow_closure_for!(|mut $entity| { $($closure)* })
        );
    }};
}
536
/// Future that yields control back to the executor exactly once.
///
/// The first poll wakes the task immediately and returns `Poll::Pending`, so the
/// task is rescheduled; every later poll returns `Poll::Ready(())`.
#[must_use = "futures do nothing unless polled or awaited"]
pub struct YieldNow {
    /// Whether the single `Pending` result has already been produced.
    yielded: bool,
}

impl YieldNow {
    /// Creates a future that has not yielded yet.
    pub fn new() -> Self {
        YieldNow { yielded: false }
    }
}

impl Default for YieldNow {
    /// Same as [`YieldNow::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl Future for YieldNow {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        if this.yielded {
            return Poll::Ready(());
        }

        this.yielded = true;
        // Wake right away: the intent is only to give other tasks a turn, not to
        // wait for an external event.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
561
/// Yields control back to the executor once.
///
/// Expands to `$crate::private::YieldNow::new().await`, so it can only be used
/// where `.await` is allowed.
#[macro_export]
macro_rules! yield_now {
    () => {
        $crate::private::YieldNow::new().await
    };
}