use core::{
any::TypeId,
cell::UnsafeCell,
future::Future,
mem::ManuallyDrop,
pin::Pin,
task::{Context, Poll, Waker},
};
use alloc::task::Wake;
use alloc::sync::Arc;
use amity::{flip_queue::FlipQueue, ring_buffer::RingBuffer};
use hashbrown::HashMap;
use slab::Slab;
use crate::{
system::State,
tls, type_id,
world::{World, WorldLocal},
EntityId,
};
mod entity;
mod world;
pub use self::{entity::*, world::*};
/// A unit of asynchronous work that the [`Flows`] executor polls to completion.
///
/// Shaped like a [`Future`] whose output is `()`, except that `poll` is `unsafe`.
pub trait Flow: 'static {
/// Advances the flow; returns [`Poll::Ready`] once it has finished.
///
/// # Safety
///
/// NOTE(review): the contract is not spelled out in this file. The only call
/// site in this module polls while the `tls::Guard` created by
/// `Flows::collect_new_flows` is alive — presumably that is the requirement;
/// confirm against the `tls` module.
unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>;
}
/// A value that can be turned into a [`Flow`].
///
/// The `spawn*` functions in this module accept `IntoFlow` values; the
/// conversion itself happens later, when the staged values are drained into
/// the executor's run queues.
pub trait IntoFlow: 'static {
/// The flow type produced by the conversion.
type Flow: Flow;
/// Consumes `self` and produces the flow that will be polled.
///
/// # Safety
///
/// NOTE(review): the contract is not spelled out in this file. Within this
/// module it is only called from `drain`, after `tls::Guard::new` has run —
/// presumably that is the requirement; confirm.
unsafe fn into_flow(self) -> Self::Flow;
}
/// Returns a shared reference to the [`WorldLocal`] obtained from `tls::get_world_ref`.
///
/// # Safety
///
/// The lifetime `'a` is picked by the caller and is not tied to any borrow.
/// NOTE(review): presumably this may only be called (and the reference only
/// used) while a `tls::Guard` is alive, i.e. from inside a flow being polled —
/// confirm against the `tls` module.
#[inline(always)]
pub unsafe fn flow_world_ref<'a>() -> &'a WorldLocal {
unsafe { tls::get_world_ref() }
}
/// Returns an exclusive reference to the [`WorldLocal`] obtained from `tls::get_world_mut`.
///
/// # Safety
///
/// The lifetime `'a` is picked by the caller and is not tied to any borrow, so
/// the caller is responsible for not creating aliasing references.
/// NOTE(review): presumably this may only be called while a `tls::Guard` is
/// alive, i.e. from inside a flow being polled — confirm against the `tls` module.
#[inline(always)]
pub unsafe fn flow_world_mut<'a>() -> &'a mut WorldLocal {
unsafe { tls::get_world_mut() }
}
/// Type-erased view of a [`TypedIntoFlows`] container, so that containers for
/// different `IntoFlow` types can live in one map.
trait AnyIntoFlows {
/// `TypeId` of the stored `IntoFlow` type; used to check downcasts in debug builds.
fn flow_id(&self) -> TypeId;
/// Converts every staged value into a flow and registers it in `flows`
/// (keyed by the flow's `TypeId`), leaving this container empty.
fn drain(&mut self, flows: &mut HashMap<TypeId, AnyQueue>);
}
impl<'a> dyn AnyIntoFlows + 'a {
    /// Reinterprets this trait object as the concrete [`TypedIntoFlows<F>`] behind it.
    ///
    /// # Safety
    ///
    /// The erased value must really be a `TypedIntoFlows<F>`. The type is
    /// checked only through `debug_assert_eq!`; release builds perform no check.
    #[inline(always)]
    unsafe fn downcast_mut<F: 'static>(&mut self) -> &mut TypedIntoFlows<F> {
        debug_assert_eq!(self.flow_id(), type_id::<F>());
        // Drop the vtable half of the fat pointer, keeping only the data address.
        let concrete = (self as *mut Self).cast::<TypedIntoFlows<F>>();
        // SAFETY: the caller guarantees the erased value is a `TypedIntoFlows<F>`,
        // and the pointer was derived from the exclusive borrow of `self`.
        unsafe { &mut *concrete }
    }
}
impl<'a> dyn AnyIntoFlows + Send + 'a {
    /// Reinterprets this `Send` trait object as the concrete
    /// [`TypedIntoFlows<F>`] behind it.
    ///
    /// # Safety
    ///
    /// The erased value must really be a `TypedIntoFlows<F>`. The type is
    /// checked only through `debug_assert_eq!`; release builds perform no check.
    #[inline(always)]
    unsafe fn downcast_mut<F: 'static>(&mut self) -> &mut TypedIntoFlows<F> {
        debug_assert_eq!(self.flow_id(), type_id::<F>());
        // Drop the vtable half of the fat pointer, keeping only the data address.
        let concrete = (self as *mut Self).cast::<TypedIntoFlows<F>>();
        // SAFETY: the caller guarantees the erased value is a `TypedIntoFlows<F>`,
        // and the pointer was derived from the exclusive borrow of `self`.
        unsafe { &mut *concrete }
    }
}
/// Staging buffer holding spawned values of one concrete `IntoFlow` type `F`
/// until they are drained into a run queue.
struct TypedIntoFlows<F> {
// Values pushed by `add`, consumed in order by `drain`.
array: Vec<F>,
}
impl<F> AnyIntoFlows for TypedIntoFlows<F>
where
    F: IntoFlow,
{
    /// `TypeId` of the staged `IntoFlow` type (not of the flow it produces).
    fn flow_id(&self) -> TypeId {
        type_id::<F>()
    }

    /// Moves every staged value into the run queue for `F::Flow`, creating that
    /// queue on first use, and marks each new task as ready.
    fn drain(&mut self, flows: &mut HashMap<TypeId, AnyQueue>) {
        let staged = self.array.len();
        if staged == 0 {
            // Nothing staged: do not create a queue entry for this flow type.
            return;
        }

        // Queues are keyed by the *flow* type, so different `IntoFlow` types
        // producing the same flow share one queue.
        let entry = flows
            .entry(type_id::<F::Flow>())
            .or_insert_with(AnyQueue::new::<F::Flow>);

        // SAFETY: the entry under `type_id::<F::Flow>()` is created by
        // `AnyQueue::new::<F::Flow>`, which boxes a `TypedFlows<F::Flow>`.
        let slots = unsafe { entry.flows.downcast_mut::<F::Flow>() };
        slots.array.reserve(staged);

        for into_flow in self.array.drain(..) {
            // Reserve the slot first so the task can carry its own key.
            let slot = slots.array.vacant_entry();
            let id = slot.key();
            let flow = unsafe { into_flow.into_flow() };
            slot.insert(Arc::new(FlowTask {
                flow: UnsafeCell::new(ManuallyDrop::new(flow)),
                id,
                queue: Arc::clone(&entry.queue),
            }));
            // A fresh flow is polled on the current run.
            entry.ready.push(id);
        }
    }
}
/// Staging area for spawned `Send` flows that have not been started yet.
///
/// Holds one type-erased [`TypedIntoFlows`] container per `IntoFlow` type.
struct NewSendFlows {
    map: HashMap<TypeId, Box<dyn AnyIntoFlows + Send>>,
}

impl Default for NewSendFlows {
    fn default() -> Self {
        NewSendFlows {
            map: HashMap::new(),
        }
    }
}

impl NewSendFlows {
    /// Creates an empty staging area.
    fn new() -> Self {
        Self::default()
    }

    /// Returns the staging buffer for `F`, creating it on first use.
    pub fn typed_new_flows<F>(&mut self) -> &mut TypedIntoFlows<F>
    where
        F: IntoFlow + Send,
    {
        let erased = self
            .map
            .entry(type_id::<F>())
            .or_insert_with(|| Box::new(TypedIntoFlows::<F> { array: Vec::new() }));
        // SAFETY: the entry keyed by `type_id::<F>()` is only ever created
        // above, as a `TypedIntoFlows<F>`.
        unsafe { erased.downcast_mut::<F>() }
    }

    /// Stages `flow` to be started on the next executor run.
    pub fn add<F>(&mut self, flow: F)
    where
        F: IntoFlow + Send,
    {
        self.typed_new_flows::<F>().array.push(flow);
    }
}
/// Staging area for spawned thread-local (not necessarily `Send`) flows that
/// have not been started yet.
///
/// Holds one type-erased [`TypedIntoFlows`] container per `IntoFlow` type.
struct NewLocalFlows {
    map: HashMap<TypeId, Box<dyn AnyIntoFlows>>,
}

impl Default for NewLocalFlows {
    fn default() -> Self {
        NewLocalFlows {
            map: HashMap::new(),
        }
    }
}

impl NewLocalFlows {
    /// Creates an empty staging area.
    fn new() -> Self {
        Self::default()
    }

    /// Returns the staging buffer for `F`, creating it on first use.
    pub fn typed_new_flows<F>(&mut self) -> &mut TypedIntoFlows<F>
    where
        F: IntoFlow,
    {
        let erased = self
            .map
            .entry(type_id::<F>())
            .or_insert_with(|| Box::new(TypedIntoFlows::<F> { array: Vec::new() }));
        // SAFETY: the entry keyed by `type_id::<F>()` is only ever created
        // above, as a `TypedIntoFlows<F>`.
        unsafe { erased.downcast_mut::<F>() }
    }

    /// Stages `flow` to be started on the next executor run.
    pub fn add<F>(&mut self, flow: F)
    where
        F: IntoFlow,
    {
        self.typed_new_flows::<F>().array.push(flow);
    }
}
/// Type-erased view of a [`TypedFlows`] container of running tasks.
trait AnyFlows {
/// `TypeId` of the stored flow type. Exists only in builds with debug
/// assertions, where it backs the downcast check.
#[cfg(debug_assertions)]
fn flow_id(&self) -> TypeId;
/// Polls the tasks whose ids are listed in `front`, then those in `back`
/// (the two halves come from `RingBuffer::as_slices` at the call site).
///
/// # Safety
///
/// NOTE(review): not documented in this file; the only caller holds a
/// `tls::Guard` for the duration of the call — presumably required; confirm.
unsafe fn execute(&mut self, front: &[usize], back: &[usize]);
}
impl dyn AnyFlows {
    /// Reinterprets this trait object as the concrete [`TypedFlows<F>`] behind it.
    ///
    /// # Safety
    ///
    /// The erased value must really be a `TypedFlows<F>`. The type is checked
    /// only when debug assertions are enabled.
    #[inline(always)]
    unsafe fn downcast_mut<F: 'static>(&mut self) -> &mut TypedFlows<F> {
        // `flow_id` itself is compiled out without debug assertions, so the
        // check must be `cfg`-gated rather than a `debug_assert_eq!`.
        #[cfg(debug_assertions)]
        assert_eq!(self.flow_id(), type_id::<F>());

        // Drop the vtable half of the fat pointer, keeping only the data address.
        let concrete = (self as *mut Self).cast::<TypedFlows<F>>();
        // SAFETY: the caller guarantees the erased value is a `TypedFlows<F>`,
        // and the pointer was derived from the exclusive borrow of `self`.
        unsafe { &mut *concrete }
    }
}
/// A running flow together with what its waker needs to reschedule it.
struct FlowTask<F> {
// The flow itself. `ManuallyDrop` means dropping the task does not drop the
// flow; in this file it is dropped explicitly once `poll` returns `Ready`.
// NOTE(review): nothing visible here drops flows that never complete — verify
// whether that is intended.
flow: UnsafeCell<ManuallyDrop<F>>,
// Key of this task in the owning `TypedFlows` slab.
id: usize,
// Queue that wake-ups push `id` into; shared with the owning `AnyQueue`.
queue: Arc<FlipQueue<usize>>,
}
// SAFETY (NOTE(review): rationale inferred, confirm): these impls hold for any
// `F`, including non-`Send` flows. The `Wake` impl in this file touches only
// `id` and `queue`, and `flow` is accessed only from `TypedFlows::execute`, so
// other threads holding a `Waker` presumably never reach the flow.
unsafe impl<F> Send for FlowTask<F> {}
unsafe impl<F> Sync for FlowTask<F> {}
impl<F> Wake for FlowTask<F>
where
    F: Flow,
{
    /// Schedules this task by pushing its id into the shared queue, then
    /// releases the consumed `Arc`.
    fn wake(self: Arc<Self>) {
        Self::wake_by_ref(&self);
    }

    /// Schedules this task by pushing its id into the shared queue.
    fn wake_by_ref(self: &Arc<Self>) {
        let task: &Self = self;
        task.queue.push(task.id);
    }
}
impl<F> FlowTask<F>
where
    F: Flow,
{
    /// Builds a [`Waker`] backed by a new strong reference to this task.
    fn waker(self: &Arc<Self>) -> Waker {
        let handle = Arc::clone(self);
        handle.into()
    }
}
/// All live tasks of one concrete flow type `F`.
struct TypedFlows<F> {
// Slab of tasks; a task's slab key is the id stored in `FlowTask::id`.
array: Slab<Arc<FlowTask<F>>>,
}
impl<F> TypedFlows<F>
where
F: Flow,
{
#[inline(always)]
unsafe fn execute(&mut self, ids: &[usize]) {
for &id in ids {
let Some(task) = self.array.get(id) else {
continue;
};
let waker = task.waker();
let mut cx = Context::from_waker(&waker);
let poll = unsafe {
let pinned = Pin::new_unchecked(&mut **task.flow.get());
unsafe { pinned.poll(&mut cx) }
};
if let Poll::Ready(()) = poll {
let task = self.array.remove(id);
unsafe {
ManuallyDrop::drop(&mut *task.flow.get());
}
}
}
}
}
impl<F> AnyFlows for TypedFlows<F>
where
    F: Flow,
{
    /// `TypeId` of the flow type stored in this container.
    #[cfg(debug_assertions)]
    fn flow_id(&self) -> TypeId {
        type_id::<F>()
    }

    /// Polls the tasks listed in `front`, then those listed in `back`.
    unsafe fn execute(&mut self, front: &[usize], back: &[usize]) {
        // `self.execute(ids)` resolves to the inherent single-slice method on
        // `TypedFlows`, not to this trait method, so there is no recursion.
        // SAFETY: the caller's obligations are forwarded unchanged.
        unsafe {
            self.execute(front);
            self.execute(back);
        }
    }
}
/// Run queue for one flow type: the wake queue shared with wakers, the batch
/// of ids to poll on the current run, and the type-erased task storage.
struct AnyQueue {
    queue: Arc<FlipQueue<usize>>,
    ready: RingBuffer<usize>,
    flows: Box<dyn AnyFlows>,
}

impl AnyQueue {
    /// Creates an empty queue whose storage is a `TypedFlows<F>`.
    fn new<F: Flow>() -> Self {
        let queue = Arc::new(FlipQueue::new());
        let ready = RingBuffer::new();
        let flows: Box<dyn AnyFlows> = Box::new(TypedFlows::<F> { array: Slab::new() });
        AnyQueue {
            queue,
            ready,
            flows,
        }
    }
}
/// Executor for flows.
///
/// Owns the run queues of all started flows plus two scratch staging areas
/// that are swapped with the world's `NewSendFlows` / `NewLocalFlows`
/// resources on every run.
pub struct Flows {
    new_flows: NewSendFlows,
    new_local_flows: NewLocalFlows,
    /// Run queues keyed by the `TypeId` of the concrete flow type.
    map: HashMap<TypeId, AnyQueue>,
}

impl Default for Flows {
    fn default() -> Self {
        Self::new()
    }
}

impl Flows {
    /// Creates an executor with no flows.
    pub fn new() -> Self {
        Flows {
            new_flows: NewSendFlows::new(),
            new_local_flows: NewLocalFlows::new(),
            map: HashMap::new(),
        }
    }

    /// Registers the staging resources that `spawn` / `spawn_local` write into.
    pub fn init(world: &mut World) {
        world.with_resource(NewSendFlows::new);
        world.with_resource(NewLocalFlows::new);
    }

    /// Takes the flows spawned since the last run, starts them, and gathers
    /// the ids of all tasks that are ready to be polled.
    ///
    /// Returns `None`, without modifying any state, when either staging
    /// resource is missing from `world`. Otherwise returns the TLS guard that
    /// must stay alive while flows are polled.
    fn collect_new_flows<'a>(&mut self, world: &'a mut World) -> Option<tls::Guard<'a>> {
        let world = world.local();

        {
            // Borrow BOTH staging resources before swapping either one.
            // Swapping the first and then bailing out on the second would leave
            // pending flows stranded in `self.new_flows`, only to be swapped
            // back into the world on the next call.
            let mut send_res = world.get_resource_mut::<NewSendFlows>()?;
            let mut local_res = world.get_resource_mut::<NewLocalFlows>()?;

            // The world receives last run's (already drained) containers, so
            // their allocations are reused.
            core::mem::swap(&mut self.new_flows, &mut *send_res);
            core::mem::swap(&mut self.new_local_flows, &mut *local_res);
        }

        // Installed before `drain`, which calls `IntoFlow::into_flow`.
        let guard = tls::Guard::new(world);

        // Move ids accumulated by wakers into each queue's `ready` batch.
        for typed in self.map.values_mut() {
            debug_assert!(typed.ready.is_empty());
            typed.queue.swap_buffer(&mut typed.ready);
        }

        // Start newly spawned flows; each one is marked ready by `drain`.
        for typed in self.new_flows.map.values_mut() {
            typed.drain(&mut self.map);
        }
        for typed in self.new_local_flows.map.values_mut() {
            typed.drain(&mut self.map);
        }

        Some(guard)
    }

    /// Runs one executor step: starts newly spawned flows and polls every
    /// ready task once per queued id.
    ///
    /// Does nothing if the staging resources were never registered
    /// (see [`Flows::init`]).
    pub fn execute(&mut self, world: &mut World) {
        // The guard is held until the end of the function, i.e. for the whole
        // polling loop.
        let Some(_guard) = self.collect_new_flows(world) else {
            return;
        };

        for typed in self.map.values_mut() {
            let (front, back) = typed.ready.as_slices();
            // SAFETY: `_guard` is alive, which is the condition under which
            // this module polls flows.
            unsafe {
                typed.flows.execute(front, back);
            }
            typed.ready.clear();
        }
    }
}
pub fn flows_system(world: &mut World, mut flows: State<Flows>) {
let flows = &mut *flows;
flows.execute(world);
}
/// Stages a `Send` flow in the world's `NewSendFlows` resource.
///
/// The call goes through `expect_resource_mut`, so the resource must already
/// be present in `world` (see [`Flows::init`]).
pub fn spawn<F>(world: &World, flow: F)
where
    F: IntoFlow + Send,
{
    let mut staging = world.expect_resource_mut::<NewSendFlows>();
    staging.add(flow);
}
/// Stages a thread-local flow in the world's `NewLocalFlows` resource.
///
/// The call goes through `expect_resource_mut`, so the resource must already
/// be present in `world` (see [`Flows::init`]).
pub fn spawn_local<F>(world: &WorldLocal, flow: F)
where
    F: IntoFlow,
{
    let mut staging = world.expect_resource_mut::<NewLocalFlows>();
    staging.add(flow);
}
pub fn spawn_for<F>(world: &World, id: EntityId, flow: F)
where
F: IntoEntityFlow + Send,
{
struct AdHoc<F> {
id: EntityId,
f: F,
}
impl<F> IntoFlow for AdHoc<F>
where
F: IntoEntityFlow,
{
type Flow = F::Flow;
unsafe fn into_flow(self) -> F::Flow {
unsafe { self.f.into_entity_flow(self.id) }
}
}
spawn(world, AdHoc { id, f: flow });
}
pub fn spawn_local_for<F>(world: &WorldLocal, id: EntityId, flow: F)
where
F: IntoEntityFlow,
{
struct AdHoc<F> {
id: EntityId,
f: F,
}
impl<F> IntoFlow for AdHoc<F>
where
F: IntoEntityFlow,
{
type Flow = F::Flow;
unsafe fn into_flow(self) -> F::Flow {
unsafe { self.f.into_entity_flow(self.id) }
}
}
spawn_local(world, AdHoc { id, f: flow });
}
/// Spawns a flow from an inline block of code.
///
/// Each form wraps the tokens after `->` in a closure built by
/// `flow_closure!` / `flow_closure_for!` and passes it to one of the
/// `spawn*` functions of this module:
///
/// * `in world -> ...` calls `spawn`.
/// * `in world for entity -> ...` and `for entity in world -> ...` call `spawn_for`.
/// * `local world -> ...` calls `spawn_local`.
/// * `local world for entity -> ...` and `for entity local world -> ...` call `spawn_local_for`.
/// * `for entity -> ...` calls `spawn_local_for` with the id and world taken from `entity`.
///
/// Inside the block, the `world` / `entity` identifier is rebound as the
/// closure parameter.
#[macro_export]
macro_rules! spawn_block {
// `Send` flow on the world.
(in $world:ident -> $($closure:tt)*) => {
$crate::flow::spawn(&$world, $crate::flow_closure!(|$world: &mut $crate::flow::FlowWorld| { $($closure)* }));
};
// `Send` flow bound to an entity (world first).
(in $world:ident for $entity:ident -> $($closure:tt)*) => {
$crate::flow::spawn_for(&$world, $entity, $crate::flow_closure_for!(|mut $entity| { $($closure)* }));
};
// `Send` flow bound to an entity (entity first); same expansion as above.
(for $entity:ident in $world:ident -> $($closure:tt)*) => {
$crate::flow::spawn_for(&$world, $entity, $crate::flow_closure_for!(|mut $entity| { $($closure)* }));
};
// Thread-local flow on the world.
(local $world:ident -> $($closure:tt)*) => {
$crate::flow::spawn_local(&$world, $crate::flow_closure!(|$world: &mut $crate::flow::FlowWorld| { $($closure)* }));
};
// Thread-local flow bound to an entity (world first).
(local $world:ident for $entity:ident -> $($closure:tt)*) => {
$crate::flow::spawn_local_for(&$world, $entity, $crate::flow_closure_for!(|mut $entity| { $($closure)* }));
};
// Thread-local flow bound to an entity (entity first); same expansion as above.
(for $entity:ident local $world:ident -> $($closure:tt)*) => {
$crate::flow::spawn_local_for(&$world, $entity, $crate::flow_closure_for!(|mut $entity| { $($closure)* }));
};
// Entity only: id and world are read from `$entity` itself via `.id()` and
// `.get_world()`, and the flow is spawned as a thread-local one.
(for $entity:ident -> $($closure:tt)*) => {{
let e = $entity.id();
let w = $entity.get_world();
$crate::flow::spawn_local_for(w, e, $crate::flow_closure_for!(|mut $entity| { $($closure)* }));
}};
}
/// Future that yields control back to the executor exactly once.
///
/// The first poll wakes the current task and returns [`Poll::Pending`];
/// every later poll returns [`Poll::Ready`].
#[derive(Debug)]
pub struct YieldNow {
    /// Whether the single yield has already happened.
    yielded: bool,
}

impl YieldNow {
    /// Creates a future that has not yielded yet.
    pub const fn new() -> Self {
        YieldNow { yielded: false }
    }
}

impl Default for YieldNow {
    /// Same as [`YieldNow::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl Future for YieldNow {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let me = self.get_mut();
        if me.yielded {
            return Poll::Ready(());
        }
        me.yielded = true;
        // Wake immediately so the task is rescheduled instead of sleeping forever.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
/// Yields once from the current async context.
///
/// Expands to `YieldNow::new().await`, so it can only be used where `.await`
/// is allowed.
/// NOTE(review): the expansion names `$crate::private::YieldNow`; the
/// `private` re-export is not visible in this file — confirm it exists.
#[macro_export]
macro_rules! yield_now {
() => {
$crate::private::YieldNow::new().await
};
}