use std::any::TypeId;
use std::borrow::Cow;
use std::fmt::{Debug, Display, Formatter, Result};
use std::hint::spin_loop;
use std::marker::PhantomData;
use std::mem::transmute;
use std::sync::atomic::Ordering;
use async_channel::{bounded, Receiver, RecvError, Sender};
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use pi_append_vec::AppendVec;
use pi_arr::Iter;
use pi_async_rt::prelude::AsyncRuntime;
use pi_null::Null;
use pi_share::{fence, Share, ShareMutex, ShareU32, ShareU64};
use crate::archetype::{Archetype, ArchetypeDependResult, Flags};
use crate::dot::{Config, Dot};
use crate::listener::Listener;
use crate::safe_vec::SafeVec;
use crate::system::BoxedSystem;
use crate::world::{ArchetypeInit, World};
const NODE_STATUS_STEP: u32 = 0x1000_0000;
const NODE_STATUS_WAIT: u32 = 0;
const NODE_STATUS_RUN_START: u32 = NODE_STATUS_STEP; const NODE_STATUS_RUNNING: u32 = NODE_STATUS_RUN_START + NODE_STATUS_STEP; const NODE_STATUS_RUN_END: u32 = NODE_STATUS_RUNNING + NODE_STATUS_STEP; const NODE_STATUS_OVER: u32 = NODE_STATUS_RUN_END + NODE_STATUS_STEP;
#[derive(Clone, Copy, Debug, PartialEq, PartialOrd, Ord, Eq, Hash)]
pub struct NodeIndex(u32);
impl NodeIndex {
#[inline]
pub fn new(index: usize) -> Self {
NodeIndex(index as u32)
}
#[inline]
pub fn index(self) -> usize {
self.0 as usize
}
}
impl Null for NodeIndex {
fn null() -> Self {
Self(u32::null())
}
fn is_null(&self) -> bool {
self.0.is_null()
}
}
#[derive(Clone, Copy, Debug, PartialEq, PartialOrd, Ord, Eq, Hash)]
pub struct EdgeIndex(u32);
impl EdgeIndex {
#[inline]
pub fn new(index: usize) -> Self {
EdgeIndex(index as u32)
}
#[inline]
pub fn index(self) -> usize {
self.0 as usize
}
}
impl Null for EdgeIndex {
fn null() -> Self {
Self(u32::null())
}
fn is_null(&self) -> bool {
self.0.is_null()
}
}
#[derive(Clone, Copy, Debug, PartialEq, PartialOrd, Ord, Eq, Hash)]
#[repr(usize)]
pub enum Direction {
From = 0,
To = 1,
}
impl Direction {
#[inline]
pub const fn opposite(self) -> Direction {
unsafe { transmute((self as usize) ^ 1) }
}
#[inline]
pub const fn index(self) -> usize {
self as usize
}
}
#[derive(Clone, Default)]
pub struct ExecGraph(Share<GraphInner>);
impl ExecGraph {
pub fn add_system(&self, sys_index: usize, sys_name: Cow<'static, str>) -> usize {
let inner = self.0.as_ref();
inner.nodes.insert(Node::new_system(sys_index, sys_name))
}
pub fn node_references<'a>(&'a self) -> Iter<'a, Node> {
self.0.as_ref().nodes.iter()
}
pub fn edge_references<'a>(&'a self) -> Iter<'a, Edge> {
self.0.as_ref().edges.iter()
}
pub fn neighbors(&self, node_index: NodeIndex, d: Direction) -> NeighborIter<'_> {
self.0.as_ref().neighbors(node_index, d)
}
pub fn froms(&self) -> &Vec<NodeIndex> {
let inner = self.0.as_ref();
&inner.froms
}
pub fn to_len(&self) -> u32 {
let inner = self.0.as_ref();
inner.to_len.load(Ordering::Relaxed)
}
pub fn initialize(&mut self, systems: Share<SafeVec<BoxedSystem>>, world: &mut World) {
let inner = self.0.as_ref();
inner
.to_len
.store(inner.nodes.len() as u32, Ordering::Relaxed);
for r in world.single_res_map.iter() {
self.add_res_node(&systems, r.key(), r.value().name(), true, world);
}
for r in world.multi_res_map.iter() {
self.add_res_node(&systems, r.key(), r.value().name(), false, world);
}
for r in world.archetype_arr.iter() {
self.add_archetype_node(&systems, r, world);
}
dbg!(
"res & archtypes initialized",
Dot::with_config(&self, Config::empty())
);
let inner = Share::<GraphInner>::get_mut(&mut self.0).unwrap();
inner.nodes.collect();
inner.edges.collect();
let mut to_len = 0;
for (index, node) in inner.nodes.iter().enumerate() {
if node.edge(Direction::From).0 == 0 {
inner.froms.push(NodeIndex::new(index));
}
if node.edge(Direction::To).0 == 0 {
to_len += 1;
}
}
assert_eq!(to_len, self.to_len());
let notify = Notify(self.clone(), systems, true, PhantomData);
world.listener_mgr.register_event(Share::new(notify));
world.listener_mgr.collect();
}
fn add_res_node(
&self,
systems: &Share<SafeVec<BoxedSystem>>,
tid: &TypeId,
name: &Cow<'static, str>,
single: bool,
world: &World,
) {
let inner = self.0.as_ref();
let _unused = inner.lock.lock();
inner.to_len.fetch_add(1, Ordering::Relaxed);
let node_index = NodeIndex::new(inner.nodes.insert(Node::new_res(name.clone())));
for (system_index, node) in inner.nodes.iter().enumerate() {
let system_index = NodeIndex::new(system_index);
match &node.label {
NodeType::System(sys_index, _) => {
let sys = unsafe { systems.load_unchecked_mut(*sys_index) };
let mut result = Flags::empty();
sys.res_depend(world, tid, name, single, &mut result);
if result == Flags::READ {
inner.add_edge(node_index, system_index);
continue;
} else if result == Flags::WRITE {
inner.adjust_edge(system_index, node_index);
} else if result == Flags::SHARE_WRITE {
inner.add_edge(system_index, node_index);
} else {
continue;
}
}
_ => break,
}
}
}
fn add_archetype_node(
&self,
systems: &Share<SafeVec<BoxedSystem>>,
archetype: &Archetype,
world: &World,
) {
let inner = self.0.as_ref();
let _unused = inner.lock.lock();
let id_name = (*archetype.id(), archetype.name().clone());
let node_index = inner.find_node(id_name);
let mut depend = ArchetypeDependResult::new();
for (system_index, node) in inner.nodes.iter().enumerate() {
let system_index = NodeIndex::new(system_index);
match &node.label {
NodeType::System(sys_index, _) => {
let sys = unsafe { systems.load_unchecked_mut(*sys_index) };
depend.clear();
sys.archetype_depend(world, archetype, &mut depend);
if depend.flag.contains(Flags::WITHOUT) {
continue;
}
if !depend.alters.is_empty() {
inner.adjust_edge(system_index, node_index);
for id_name in depend.alters.iter() {
let alter_node_index = inner.find_node(id_name.clone());
if alter_node_index != node_index {
inner.adjust_edge(system_index, alter_node_index);
}
}
} else if depend.flag == Flags::READ {
inner.add_edge(node_index, system_index);
continue;
} else if depend.flag.bits() != 0 {
inner.adjust_edge(system_index, node_index);
} else {
continue;
}
}
_ => break,
}
}
}
pub async fn run<A: AsyncRuntime>(
&self,
systems: &'static Share<SafeVec<BoxedSystem>>,
rt: &A,
world: &'static World,
) -> std::result::Result<(), RecvError> {
let inner = self.0.as_ref();
assert!(inner.to_len.load(Ordering::Relaxed) > 0);
let to_len = inner.to_len.load(Ordering::Relaxed);
inner.to_count.store(to_len, Ordering::Relaxed);
fence(Ordering::Acquire);
for node in inner.nodes.iter() {
node.status.store(NODE_STATUS_WAIT, Ordering::Relaxed);
node.from_count
.store(node.edge(Direction::From).0, Ordering::Relaxed);
}
for i in inner.froms.iter() {
let node = unsafe { inner.nodes.load_unchecked(i.index()) };
self.exec(systems, rt, world, *i, node, vec![], u32::null());
}
inner.receiver.recv().await
}
fn exec<A: AsyncRuntime>(
&self,
systems: &'static SafeVec<BoxedSystem>,
rt: &A,
world: &'static World,
node_index: NodeIndex,
node: &Node,
mut vec: Vec<u32>,
parent: u32,
) {
match node.label {
NodeType::System(sys_index, _) => {
let r = node.status.fetch_add(NODE_STATUS_STEP, Ordering::Relaxed);
if r != NODE_STATUS_WAIT {
panic!("status err:{}, node_index:{} node:{:?}, parent:{} vec:{:?}", r, node_index.index(), node, parent, vec)
}else if parent == 0 {
let p = unsafe { self.0.as_ref().nodes.load_unchecked(parent as usize) };
if p.status.load(Ordering::Relaxed) != NODE_STATUS_RUN_END {
panic!("parent status err, node_index:{} node:{:?}, vec:{:?}", r, node, vec)
}
}
vec.push(r);
let rt1 = rt.clone();
let g = self.clone();
let _ = rt.spawn(async move {
let sys = unsafe { systems.load_unchecked_mut(sys_index) };
sys.align(world);
let inner = g.0.as_ref();
let node = unsafe { inner.nodes.load_unchecked(node_index.index()) };
let r = node.status.fetch_add(NODE_STATUS_STEP, Ordering::Relaxed);
vec.push(r);
sys.run(world).await;
g.exec_end(systems, &rt1, world, node, vec, node_index)
});
}
_ => {
node.status
.fetch_add(NODE_STATUS_STEP + NODE_STATUS_STEP, Ordering::Relaxed);
self.exec_end(systems, rt, world, node, vec, node_index)
}
}
}
fn exec_end<A: AsyncRuntime>(
&self,
systems: &'static SafeVec<BoxedSystem>,
rt: &A,
world: &'static World,
node: &Node,
mut vec: Vec<u32>,
node_index: NodeIndex,
) {
let mut status =
node.status.fetch_add(NODE_STATUS_STEP, Ordering::Relaxed) + NODE_STATUS_STEP;
while status != NODE_STATUS_RUN_END {
let s = status;
spin_loop();
status = node.status.load(Ordering::Relaxed);
panic!("status err node_index:{} status:{}={} node:{:?} vec:{:?}, ", node_index.index(), s, status, node, vec);
}
let inner = self.0.as_ref();
let it = NeighborIter::new(inner, Direction::To, node.edge(Direction::To));
vec.clear();
let mut it1 = it.clone();
while let Some(n) = it1.next() {
vec.push(n.index() as u32);
}
vec.push(u32::null());
if it.edge.0 == 0 {
node.status.fetch_add(NODE_STATUS_STEP, Ordering::Relaxed);
return inner.run_over(rt);
}
for n in it {
let node = unsafe { inner.nodes.load_unchecked(n.index()) };
let r = node.from_count.fetch_sub(1, Ordering::Relaxed);
if r == 1 {
self.exec(systems, rt, world, n, node, vec.clone(), node_index.index() as u32);
}
}
node.status.fetch_add(NODE_STATUS_STEP, Ordering::Relaxed);
}
pub fn collect(&mut self) {
let inner = unsafe { Share::get_mut_unchecked(&mut self.0) };
inner.nodes.collect();
inner.edges.collect();
}
}
pub struct GraphInner {
nodes: AppendVec<Node>,
edges: AppendVec<Edge>,
map: DashMap<u128, NodeIndex>,
to_len: ShareU32,
froms: Vec<NodeIndex>,
lock: ShareMutex<()>,
to_count: ShareU32,
sender: Sender<()>,
receiver: Receiver<()>,
}
impl GraphInner {
fn find_node(&self, id_name: (u128, Cow<'static, str>)) -> NodeIndex {
match self.map.entry(id_name.0) {
Entry::Occupied(entry) => entry.get().clone(),
Entry::Vacant(entry) => {
self.to_len.fetch_add(1, Ordering::Relaxed);
let node_index = NodeIndex::new(self.nodes.insert(Node::new_archetype(id_name)));
entry.insert(node_index);
node_index
}
}
}
fn adjust_edge(&self, from: NodeIndex, to: NodeIndex) {
let mut big_node_index = u32::MAX;
let mut small_node_index = -1;
for old_from in self.neighbors(to, Direction::From) {
if old_from < from {
if old_from.index() as i32 > small_node_index {
small_node_index = old_from.index() as i32;
}
} else if old_from > from {
if old_from.0 < big_node_index {
big_node_index = old_from.0;
}
} else {
return;
}
}
if big_node_index != u32::MAX && !self.has_edge(from, NodeIndex(big_node_index)) {
self.add_edge(from, NodeIndex(big_node_index));
}
if small_node_index >= 0 && !self.has_edge(NodeIndex(small_node_index as u32), from) {
self.add_edge(NodeIndex(small_node_index as u32), from);
}
self.add_edge(from, to);
}
fn has_edge(&self, from: NodeIndex, to: NodeIndex) -> bool {
for old_from in self.neighbors(to, Direction::From) {
if old_from == from {
return true;
}
}
false
}
fn add_edge(&self, from: NodeIndex, to: NodeIndex) {
let to_node = unsafe { self.nodes.load_unchecked(to.index()) };
let from_node = unsafe { self.nodes.load_unchecked(from.index()) };
let status = from_node.status.fetch_add(1, Ordering::Relaxed);
let r = if status < NODE_STATUS_RUN_END {
to_node.from_count.fetch_add(1, Ordering::Relaxed)
} else if status >= NODE_STATUS_OVER {
1
} else {
while from_node.status.load(Ordering::Relaxed) < NODE_STATUS_OVER {
spin_loop();
}
1
};
let (from_edge_len, from_next_edge) = to_node.edge(Direction::From);
let from_cur = encode(from_edge_len, from_next_edge.0);
let (to_edge_len, to_next_edge) = from_node.edge(Direction::To);
let to_cur = encode(to_edge_len, to_next_edge.0);
let e = Edge::new(from, from_next_edge, to, to_next_edge);
let edge_index = EdgeIndex::new(self.edges.insert(e));
let e = unsafe { self.edges.load_unchecked(edge_index.index()) };
let _ = self.link_edge(
from,
&to_node.edges[Direction::From.index()],
from_cur,
from_edge_len,
edge_index,
&e,
Direction::From,
);
let old_to_len = self.link_edge(
to,
&from_node.edges[Direction::To.index()],
to_cur,
to_edge_len,
edge_index,
&e,
Direction::To,
);
from_node.status.fetch_sub(1, Ordering::Relaxed);
if old_to_len == 0 {
self.to_len.fetch_sub(1, Ordering::Relaxed);
self.to_count.fetch_sub(1, Ordering::Relaxed);
}
if r == 0 {
while to_node.status.load(Ordering::Relaxed) < NODE_STATUS_RUNNING {
spin_loop();
}
}
}
fn link_edge(
&self,
node_index: NodeIndex,
node_edge: &ShareU64,
mut cur: u64,
mut edge_len: u32,
edge_index: EdgeIndex,
edge: &Edge,
d: Direction,
) -> u32 {
cur = match node_edge.compare_exchange(
cur,
encode(edge_len + 1, edge_index.0),
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => return edge_len,
Err(old) => old,
};
let next_edge = loop {
spin_loop();
let r = decode(cur);
edge_len = r.0;
cur = match node_edge.compare_exchange(
cur,
encode(edge_len + 1, edge_index.0),
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break EdgeIndex(r.1),
Err(old) => old,
};
};
edge.store(d, node_index, next_edge);
return edge_len;
}
fn neighbors(&self, node_index: NodeIndex, d: Direction) -> NeighborIter<'_> {
let edge = if let Some(node) = self.nodes.load(node_index.index()) {
node.edge(d)
} else {
(0, EdgeIndex(u32::null()))
};
NeighborIter::new(self, d, edge)
}
fn run_over<A: AsyncRuntime>(&self, rt: &A) {
let r = self.to_count.fetch_sub(1, Ordering::Relaxed);
if r == 1 {
let s = self.sender.clone();
let _ = rt.spawn(async move {
s.send(()).await.unwrap();
});
}
}
}
impl Default for GraphInner {
fn default() -> Self {
let (sender, receiver) = bounded(1);
Self {
nodes: Default::default(),
edges: Default::default(),
map: Default::default(),
to_len: ShareU32::new(0),
froms: Default::default(),
lock: ShareMutex::new(()),
to_count: ShareU32::new(0),
sender,
receiver,
}
}
}
#[derive(Clone)]
pub struct NeighborIter<'a> {
inner: &'a GraphInner,
d: Direction,
edge: (u32, EdgeIndex),
}
impl<'a> NeighborIter<'a> {
fn new(inner: &'a GraphInner, d: Direction, edge: (u32, EdgeIndex)) -> Self {
Self { inner, d, edge }
}
}
impl<'a> Iterator for NeighborIter<'a> {
type Item = NodeIndex;
#[inline(always)]
fn next(&mut self) -> Option<Self::Item> {
if self.edge.0 == 0 {
return None;
}
self.edge.0 -= 1;
let edge = unsafe { self.inner.edges.load_unchecked(self.edge.1.index()) };
let (node_index, next_edge) = edge.load(self.d);
self.edge.1 = next_edge;
Some(node_index)
}
fn size_hint(&self) -> (usize, Option<usize>) {
(self.edge.0 as usize, Some(self.edge.0 as usize))
}
}
pub enum NodeType {
None,
System(usize, Cow<'static, str>),
Archetype((u128, Cow<'static, str>)),
Res(Cow<'static, str>),
}
impl NodeType {
pub fn type_name(&self) -> &Cow<'static, str> {
match &self {
NodeType::None => &Cow::Borrowed("None"),
NodeType::System(_, sys_name) => &sys_name,
NodeType::Archetype(s) => &s.1,
NodeType::Res(s) => &s,
}
}
}
impl Debug for NodeType {
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
match &self {
NodeType::None => write!(f, "None"),
NodeType::System(_, sys_name) => write!(f, "System({:?})", sys_name),
NodeType::Archetype(s) => write!(f, "Archetype({:?})", s.1),
NodeType::Res(s) => write!(f, "Res({:?})", s),
}
}
}
impl Display for NodeType {
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
write!(f, "{:#}", self.type_name())
}
}
const NULL_EDGE: u64 = encode(0, u32::MAX);
pub struct Node {
edges: [ShareU64; 2],
status: ShareU32,
label: NodeType,
from_count: ShareU32,
}
impl Node {
#[inline(always)]
fn new_system(sys_index: usize, sys_name: Cow<'static, str>) -> Self {
Self {
edges: [ShareU64::new(NULL_EDGE), ShareU64::new(NULL_EDGE)],
status: ShareU32::new(NODE_STATUS_OVER),
label: NodeType::System(sys_index, sys_name),
from_count: ShareU32::new(0),
}
}
#[inline(always)]
fn new_archetype(id_name: (u128, Cow<'static, str>)) -> Self {
Self {
edges: [ShareU64::new(NULL_EDGE), ShareU64::new(NULL_EDGE)],
status: ShareU32::new(NODE_STATUS_OVER),
label: NodeType::Archetype(id_name),
from_count: ShareU32::new(0),
}
}
#[inline(always)]
fn new_res(name: Cow<'static, str>) -> Self {
Self {
edges: [ShareU64::new(NULL_EDGE), ShareU64::new(NULL_EDGE)],
status: ShareU32::new(NODE_STATUS_OVER),
label: NodeType::Res(name),
from_count: ShareU32::new(0),
}
}
#[inline(always)]
pub fn label(&self) -> &NodeType {
&self.label
}
#[inline(always)]
pub fn is_system(&self) -> bool {
match self.label {
NodeType::System(_, _) => true,
_ => false,
}
}
#[inline(always)]
pub fn edge(&self, d: Direction) -> (u32, EdgeIndex) {
unsafe {
transmute(decode(
self.edges.get_unchecked(d.index()).load(Ordering::Relaxed),
))
}
}
}
impl Null for Node {
fn null() -> Self {
Self {
edges: [ShareU64::new(NULL_EDGE), ShareU64::new(NULL_EDGE)],
status: ShareU32::new(NODE_STATUS_OVER),
label: NodeType::None,
from_count: ShareU32::new(0),
}
}
fn is_null(&self) -> bool {
match self.label {
NodeType::None => true,
_ => false,
}
}
}
impl Debug for Node {
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
let (from_len, from) = self.edge(Direction::From);
let (to_len, to) = self.edge(Direction::To);
f.debug_struct("Node")
.field("label", &self.label)
.field("from_len", &from_len)
.field("from_first_edge", &from)
.field("to_len", &to_len)
.field("to_first_edge", &to)
.finish()
}
}
pub struct Edge([ShareU64; 2]);
impl Edge {
#[inline(always)]
fn new(
from_node: NodeIndex,
from_next_edge: EdgeIndex,
to_node: NodeIndex,
to_next_edge: EdgeIndex,
) -> Self {
Self([
ShareU64::new(encode(from_node.0, from_next_edge.0)),
ShareU64::new(encode(to_node.0, to_next_edge.0)),
])
}
#[inline(always)]
pub fn load(&self, d: Direction) -> (NodeIndex, EdgeIndex) {
unsafe {
transmute(decode(
self.0.get_unchecked(d.index()).load(Ordering::Relaxed),
))
}
}
#[inline(always)]
fn store(&self, d: Direction, node: NodeIndex, next_edge: EdgeIndex) {
unsafe {
self.0
.get_unchecked(d.index())
.store(encode(node.0, next_edge.0), Ordering::Relaxed)
};
}
}
impl Null for Edge {
#[inline(always)]
fn null() -> Self {
Self([ShareU64::null(), ShareU64::null()])
}
#[inline(always)]
fn is_null(&self) -> bool {
self.0[0].is_null()
}
}
impl Debug for Edge {
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
let (from_node, from_next_edge) = self.load(Direction::From);
let (to_node, to_next_edge) = self.load(Direction::To);
f.debug_struct("Edge")
.field("from_node", &from_node)
.field("from_next_edge", &from_next_edge)
.field("to_node", &to_node)
.field("to_next_edge", &to_next_edge)
.finish()
}
}
#[inline(always)]
pub(crate) const fn encode(low: u32, high: u32) -> u64 {
(low as u64) | ((high as u64) << 32)
}
#[inline(always)]
pub(crate) const fn decode(value: u64) -> (u32, u32) {
let low = value & 0xffff_ffff;
let high = value >> 32;
(low as u32, high as u32)
}
struct Notify<'a>(
ExecGraph,
Share<SafeVec<BoxedSystem>>,
bool,
PhantomData<&'a ()>,
);
impl<'a> Listener for Notify<'a> {
type Event = ArchetypeInit<'a>;
#[inline(always)]
fn listen(&self, ar: Self::Event) {
self.0.add_archetype_node(&self.1, &ar.0, &ar.1);
if self.2 {
dbg!(Dot::with_config(&self.0, Config::empty()));
}
}
}