use std::any::TypeId;
use std::future::Future;
use std::ops::Deref;
use std::sync::Arc;
use std::{any::Any, collections::VecDeque};
use anyhow::Context;
use async_executor::{Executor, Task};
use rustc_hash::FxHashMap;
use crate::QueryGuard;
use crate::{
chan::{self, mpsc, oneshot, spsc},
plugin::Plugin,
resource_manager::{LoanManager, ResourceManager},
schedule::{Dependency, IsSchedule},
storage::{Components, Entry, IsBundle, IsQuery},
system::{AsyncSchedule, ShouldContinue, SyncSchedule, SyncSystem},
CanFetch, IsResource, LazyResource, Request, Resource, ResourceId, Write,
};
#[derive(Clone)]
pub struct Facade {
pub(crate) resource_request_tx: mpsc::Sender<Request>,
pub(crate) executor: Arc<async_executor::Executor<'static>>,
}
impl Facade {
pub async fn visit<D: CanFetch + Send + Sync + 'static, T: Send + Sync + 'static>(
&mut self,
f: impl FnOnce(D) -> anyhow::Result<T>,
) -> anyhow::Result<T> {
let borrows = D::borrows();
let (deploy_tx, deploy_rx) = spsc::bounded(1);
self.resource_request_tx
.try_send(Request {
borrows,
construct: |loan_mngr: &mut LoanManager| {
let my_d = D::construct(loan_mngr).with_context(|| {
format!("could not construct {}", std::any::type_name::<D>())
})?;
let my_d_in_a_box: Box<D> = Box::new(my_d);
let rez = Resource::from(my_d_in_a_box);
Ok(rez)
},
deploy_tx,
})
.unwrap();
let rez: Resource = deploy_rx.recv().await.unwrap();
let box_d: Box<D> = rez.downcast().map_err(|rez| {
anyhow::anyhow!(
"Facade could not downcast resource '{}' to '{}'",
rez.type_name().unwrap_or("unknown"),
std::any::type_name::<D>()
)
})?;
let d = *box_d;
let t = f(d)?;
Ok(t)
}
pub fn spawn<T: Send + Sync + 'static>(
&self,
future: impl Future<Output = T> + Send + 'static,
) -> Task<T> {
self.executor.spawn(future)
}
}
pub struct LazyOp {
op: Box<
dyn FnOnce(&mut World) -> anyhow::Result<Arc<dyn Any + Send + Sync>>
+ Send
+ Sync
+ 'static,
>,
tx: chan::oneshot::Sender<Arc<dyn Any + Send + Sync>>,
}
pub struct Entity {
id: usize,
gen: usize,
op_sender: mpsc::Sender<LazyOp>,
op_receivers: Vec<chan::oneshot::Receiver<Arc<dyn Any + Send + Sync>>>,
}
impl std::fmt::Debug for Entity {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Entity")
.field("id", &self.id)
.field("gen", &self.gen)
.finish()
}
}
impl Clone for Entity {
fn clone(&self) -> Self {
Self {
id: self.id.clone(),
gen: self.gen.clone(),
op_sender: self.op_sender.clone(),
op_receivers: Default::default(),
}
}
}
impl Deref for Entity {
type Target = usize;
fn deref(&self) -> &Self::Target {
&self.id
}
}
impl Entity {
pub fn id(&self) -> usize {
self.id
}
pub fn with_bundle<B: IsBundle + Send + Sync + 'static>(mut self, bundle: B) -> Self {
self.insert_bundle(bundle);
self
}
pub fn insert_bundle<B: IsBundle + Send + Sync + 'static>(&mut self, bundle: B) {
let id = self.id;
let (tx, rx) = oneshot::oneshot();
let op = LazyOp {
op: Box::new(move |world: &mut World| {
if !world.has_resource::<Components>() {
world.with_resource(Components::default()).unwrap();
}
let all: &mut Components = world.resource_mut()?;
let _ = all.insert_bundle(id, bundle);
Ok(Arc::new(()) as Arc<dyn Any + Send + Sync>)
}),
tx,
};
self.op_sender
.try_send(op)
.expect("could not send entity op");
self.op_receivers.push(rx);
}
pub fn insert_component<T: Send + Sync + 'static>(&mut self, component: T) {
let id = self.id;
let (tx, rx) = oneshot::oneshot();
let op = LazyOp {
op: Box::new(move |world: &mut World| {
if !world.has_resource::<Components>() {
world.with_resource(Components::default()).unwrap();
}
let all: &mut Components = world.resource_mut()?;
let _ = all.insert_component(id, component);
Ok(Arc::new(()) as Arc<dyn Any + Send + Sync>)
}),
tx,
};
self.op_sender
.try_send(op)
.expect("could not send entity op");
self.op_receivers.push(rx);
}
pub fn remove_component<T: Send + Sync + 'static>(&mut self) {
let id = self.id;
let (tx, rx) = oneshot::oneshot();
let op = LazyOp {
op: Box::new(move |world: &mut World| {
if !world.has_resource::<Components>() {
world.with_resource(Components::default()).unwrap();
}
let all: &mut Components = world.resource_mut()?;
let _ = all.remove_component::<T>(id);
Ok(Arc::new(()) as Arc<dyn Any + Send + Sync>)
}),
tx,
};
self.op_sender
.try_send(op)
.expect("could not send entity op");
self.op_receivers.push(rx);
}
pub async fn updates(&mut self) {
let updates: Vec<chan::oneshot::Receiver<Arc<dyn Any + Send + Sync>>> =
std::mem::take(&mut self.op_receivers);
for update in updates.into_iter() {
let _ = update
.await
.map_err(|_| anyhow::anyhow!("updates oneshot is closed"))
.unwrap();
}
}
pub async fn visit<Q: IsQuery + 'static, T: Send + Sync + 'static>(
&self,
f: impl FnOnce(Q::QueryRow<'_>) -> T + Send + Sync + 'static,
) -> Option<T> {
let id = self.id();
let (tx, rx) = oneshot::oneshot();
self.op_sender
.try_send(LazyOp {
op: Box::new(move |world: &mut World| {
if !world.has_resource::<Components>() {
world.with_resource(Components::default())?;
}
let mut storage: Write<Components> = world.fetch()?;
let mut q = storage.query::<Q>();
Ok(Arc::new(q.find_one(id).map(f)) as Arc<dyn Any + Send + Sync>)
}),
tx,
})
.context("could not send entity op")
.unwrap();
let arc: Arc<dyn Any + Send + Sync> = rx
.await
.map_err(|_| anyhow::anyhow!("could not receive get request"))
.unwrap();
let arc_c: Arc<Option<T>> = arc
.downcast()
.map_err(|_| anyhow::anyhow!("could not downcast"))
.unwrap();
let c: Option<T> = Arc::try_unwrap(arc_c)
.map_err(|_| anyhow::anyhow!("could not unwrap"))
.unwrap();
c
}
}
pub struct Entities {
pub(crate) next_k: usize,
pub(crate) generations: Vec<usize>,
pub(crate) recycle: Vec<usize>,
pub(crate) delete_tx: spsc::Sender<usize>,
pub(crate) delete_rx: spsc::Receiver<usize>,
pub(crate) deleted: VecDeque<(u64, Vec<(usize, smallvec::SmallVec<[TypeId; 4]>)>)>,
pub(crate) lazy_op_sender: mpsc::Sender<LazyOp>,
}
impl Default for Entities {
fn default() -> Self {
let (delete_tx, delete_rx) = spsc::unbounded();
Self {
next_k: Default::default(),
generations: vec![],
recycle: Default::default(),
delete_rx,
delete_tx,
deleted: Default::default(),
lazy_op_sender: mpsc::unbounded().0,
}
}
}
impl Entities {
pub fn new(lazy_op_sender: mpsc::Sender<LazyOp>) -> Self {
Self {
lazy_op_sender,
..Default::default()
}
}
fn dequeue(&mut self) -> usize {
if let Some(id) = self.recycle.pop() {
self.generations[id] += 1;
id
} else {
let id = self.next_k;
self.generations.push(0);
self.next_k += 1;
id
}
}
pub fn alive_iter(&self) -> impl Iterator<Item = Entity> + '_ {
self.generations
.iter()
.enumerate()
.filter_map(|(id, _gen)| self.hydrate(id))
}
pub fn alive_len(&self) -> usize {
self.generations.len() - self.recycle.len()
}
pub fn create_many(&mut self, mut how_many: usize) -> Vec<usize> {
let mut ids = vec![];
while let Some(id) = self.recycle.pop() {
self.generations[id] += 1;
ids.push(id);
how_many -= 1;
if how_many == 0 {
return ids;
}
}
let last_id = self.next_k + (how_many - 1);
self.generations.resize_with(last_id, || 0);
ids.extend(self.next_k..=last_id);
self.next_k = last_id + 1;
ids
}
pub fn create(&mut self) -> Entity {
let id = self.dequeue();
Entity {
id,
gen: self.generations[id],
op_sender: self.lazy_op_sender.clone(),
op_receivers: Default::default(),
}
}
pub fn destroy(&self, mut entity: Entity) {
entity.op_receivers = Default::default();
self.delete_tx.try_send(entity.id()).unwrap();
}
pub fn destroy_all(&mut self) {
for id in 0..self.next_k {
if let Some(entity) = self.hydrate(id) {
self.destroy(entity);
}
}
}
pub fn deleted_iter(&self) -> impl Iterator<Item = Entry<()>> + '_ {
self.deleted.iter().flat_map(|(changed, ids)| {
ids.iter().map(|(id, _)| Entry {
value: (),
key: *id,
changed: *changed,
added: true,
})
})
}
pub fn deleted_iter_of<T: 'static>(&self) -> impl Iterator<Item = Entry<()>> + '_ {
let ty = TypeId::of::<Entry<T>>();
self.deleted.iter().flat_map(move |(changed, ids)| {
ids.iter().filter_map(move |(id, tys)| {
if tys.contains(&ty) {
Some(Entry {
value: (),
key: *id,
changed: *changed,
added: true,
})
} else {
None
}
})
})
}
pub fn hydrate(&self, entity_id: usize) -> Option<Entity> {
if self.recycle.contains(&entity_id) {
return None;
}
let gen = self.generations.get(entity_id)?;
Some(Entity {
id: entity_id,
gen: *gen,
op_sender: self.lazy_op_sender.clone(),
op_receivers: Default::default(),
})
}
pub fn insert_bundle<B: IsBundle + Send + Sync + 'static>(&self, entity_id: usize, bundle: B) {
if let Some(mut entity) = self.hydrate(entity_id) {
entity.insert_bundle(bundle);
}
}
pub fn insert_component<T: Send + Sync + 'static>(&self, entity_id: usize, component: T) {
if let Some(mut entity) = self.hydrate(entity_id) {
entity.insert_component(component);
}
}
pub fn remove_component<T: Send + Sync + 'static>(&self, entity_id: usize) {
if let Some(mut entity) = self.hydrate(entity_id) {
entity.remove_component::<T>();
}
}
}
pub enum Parallelism {
Automatic,
Explicit(u32),
}
pub struct World {
pub(crate) resource_manager: ResourceManager,
pub(crate) sync_schedule: SyncSchedule,
pub(crate) async_systems: FxHashMap<String, oneshot::Receiver<anyhow::Result<()>>>,
pub(crate) facade: Facade,
pub(crate) command_rx: mpsc::Receiver<Request>,
pub(crate) lazy_ops: (mpsc::Sender<LazyOp>, mpsc::Receiver<LazyOp>),
}
impl Default for World {
fn default() -> Self {
let lazy_ops = mpsc::unbounded();
let entities = Entities::new(lazy_ops.0.clone());
let async_task_executor = Arc::new(Executor::default());
let (tx, rx) = mpsc::unbounded();
let facade = Facade {
resource_request_tx: tx,
executor: async_task_executor,
};
let mut world = Self {
resource_manager: ResourceManager::default(),
sync_schedule: SyncSchedule::default(),
async_systems: FxHashMap::default(),
facade: facade.clone(),
command_rx: rx,
lazy_ops,
};
world
.with_resource(entities)
.unwrap()
.with_resource(Components::default())
.unwrap()
.with_resource(facade)
.unwrap();
world
}
}
impl World {
pub fn builder() -> Plugin {
Plugin::default()
}
pub fn facade(&self) -> Facade {
self.facade.clone()
}
pub fn with_default_resource<T: Default + IsResource>(&mut self) -> anyhow::Result<&mut Self> {
let resource: T = T::default();
self.with_resource(resource)
}
pub fn with_resource<T: IsResource>(&mut self, resource: T) -> anyhow::Result<&mut Self> {
if self.resource_manager.add(resource).is_some() {
anyhow::bail!("resource {} already exists", std::any::type_name::<T>());
}
Ok(self)
}
pub fn set_resource<T: IsResource>(&mut self, resource: T) -> anyhow::Result<Option<T>> {
if let Some(prev) = self.resource_manager.add(resource) {
match prev.downcast::<T>() {
Ok(t) => Ok(Some(*t)),
Err(_) => Err(anyhow::anyhow!("could not downcast previous resource")),
}
} else {
Ok(None)
}
}
pub fn with_plugin(&mut self, plugin: impl Into<Plugin>) -> anyhow::Result<&mut Self> {
let plugin: Plugin = plugin.into();
let mut missing_resources: FxHashMap<ResourceId, Vec<anyhow::Error>> = FxHashMap::default();
for LazyResource { id, create } in plugin.resources.into_iter() {
if !self.resource_manager.has_resource(&id) {
log::debug!("attempting to create resource {}...", id.name);
match (create)(&mut self.resource_manager.as_mut_loan_manager()) {
Ok(resource) => {
missing_resources.remove(&id);
let _ = self.resource_manager.insert(id, resource);
}
Err(err) => {
let entry = missing_resources.entry(id).or_default();
entry.push(err);
}
}
self.resource_manager
.unify_resources("after building lazy dep")?;
}
}
anyhow::ensure!(
missing_resources.is_empty(),
"missing resources:\n{:#?}",
missing_resources
);
for system in plugin.sync_systems.into_iter() {
if !self.sync_schedule.contains_system(&system.0.name) {
self.sync_schedule.add_system(system.0);
}
}
for asystem in plugin.async_systems.into_iter() {
if self.async_systems.contains_key(&asystem.name) {
continue;
}
self.with_async(asystem.name, asystem.make_future).unwrap();
}
Ok(self)
}
pub fn with_data<T: CanFetch>(&mut self) -> anyhow::Result<&mut Self> {
self.with_plugin(T::plugin())
}
pub fn with_system<T, F>(
&mut self,
name: impl AsRef<str>,
sys_fn: F,
) -> anyhow::Result<&mut Self>
where
F: FnMut(T) -> anyhow::Result<ShouldContinue> + Send + Sync + 'static,
T: CanFetch + 'static,
{
self.with_system_with_dependencies(name, sys_fn, &[], &[])
}
pub fn with_system_with_dependencies<T, F>(
&mut self,
name: impl AsRef<str>,
sys_fn: F,
after_deps: &[&str],
before_deps: &[&str],
) -> anyhow::Result<&mut Self>
where
F: FnMut(T) -> anyhow::Result<ShouldContinue> + Send + Sync + 'static,
T: CanFetch + 'static,
{
let mut deps = after_deps
.iter()
.map(|dep| Dependency::After(dep.to_string()))
.collect::<Vec<_>>();
deps.extend(
before_deps
.iter()
.map(|dep| Dependency::Before(dep.to_string())),
);
let system = SyncSystem::new(name, sys_fn, deps);
self.with_plugin(T::plugin())?;
self.sync_schedule.add_system(system);
Ok(self)
}
pub fn with_system_barrier(&mut self) -> &mut Self {
self.sync_schedule.add_barrier();
self
}
pub fn with_parallelism(&mut self, parallelism: Parallelism) -> &mut Self {
let num_threads = match parallelism {
Parallelism::Automatic => {
#[cfg(target_arch = "wasm32")]
{
1
}
#[cfg(not(target_arch = "wasm32"))]
{
rayon::current_num_threads() as u32
}
}
Parallelism::Explicit(n) => {
if n > 1 {
log::info!("building a rayon thread pool with {} threads", n);
rayon::ThreadPoolBuilder::new()
.num_threads(n as usize)
.build()
.unwrap();
n
} else {
1
}
}
};
self.sync_schedule.set_parallelism(num_threads);
self
}
pub fn with_async<F, Fut>(
&mut self,
name: impl Into<String>,
make_system_future: F,
) -> anyhow::Result<&mut Self>
where
F: FnOnce(Facade) -> Fut,
Fut: Future<Output = anyhow::Result<()>> + Send + Sync + 'static,
{
let name = name.into();
if self.async_systems.contains_key(&name) {
anyhow::bail!("async system '{}' already exists", name);
}
let facade = self.facade.clone();
let (mut tx, rx) = oneshot::oneshot();
let fut = (make_system_future)(facade);
let task = self.facade.spawn(async move {
let result = fut.await;
tx.send(result).unwrap();
});
task.detach();
self.async_systems.insert(name, rx);
Ok(self)
}
pub fn has_resource<T: IsResource>(&self) -> bool {
self.resource_manager.has_resource(&ResourceId::new::<T>())
}
pub fn spawn(&self, future: impl Future<Output = ()> + Send + Sync + 'static) {
let task = self.facade.executor.spawn(future);
task.detach();
}
pub fn tick(&mut self) -> anyhow::Result<()> {
self.tick_async()?;
self.tick_sync()?;
self.tick_lazy()?;
Ok(())
}
pub fn tock(&mut self) {
self.tick_async().unwrap();
self.tick_sync().unwrap();
self.tick_lazy().unwrap();
}
pub fn tick_sync(&mut self) -> anyhow::Result<()> {
log::trace!("tick sync");
log::trace!(
"execution:\n{}",
self.sync_schedule.get_execution_order().join("\n")
);
self.sync_schedule.run((), &mut self.resource_manager)?;
self.resource_manager.unify_resources("tick sync")?;
Ok(())
}
pub fn tick_async(&mut self) -> anyhow::Result<()> {
log::trace!("tick async");
for (name, rx) in std::mem::take(&mut self.async_systems).into_iter() {
match rx.try_recv() {
Err(err) => match err {
oneshot::TryRecvError::Empty(rx) => {
self.async_systems.insert(name, rx);
}
oneshot::TryRecvError::Closed => {}
},
Ok(res) => res?,
}
}
let mut schedule = AsyncSchedule::default();
while let Ok(request) = self.command_rx.try_recv() {
schedule.add_system(request);
}
if !schedule.is_empty() {
log::trace!(
"async system execution:\n{}",
schedule.get_execution_order().join("\n")
);
schedule.run(&self.facade.executor, &mut self.resource_manager)?;
} else {
fn tick(executor: &Executor<'static>) {
while executor.try_tick() {}
}
let parallelism = self.sync_schedule.get_parallelism();
if parallelism > 1 {
rayon::prelude::ParallelIterator::for_each(
rayon::prelude::IntoParallelIterator::into_par_iter(0..parallelism as u32),
|_| tick(&self.facade.executor),
);
} else {
tick(&self.facade.executor);
}
}
self.resource_manager.unify_resources("tick async")?;
Ok(())
}
pub fn tick_lazy(&mut self) -> anyhow::Result<()> {
log::trace!("tick lazy");
while let Ok(LazyOp { op, mut tx }) = self.lazy_ops.1.try_recv() {
self.resource_manager.unify_resources("World::tick_lazy")?;
let t = (op)(self)?;
let _ = tx.send(t);
}
self.resource_manager.unify_resources("World::tick_lazy")?;
let dead_ids: Vec<usize> = {
let entities: &Entities = self.resource()?;
let mut dead_ids = vec![];
while let Some(id) = entities.delete_rx.try_recv().ok() {
dead_ids.push(id);
}
dead_ids
};
if !dead_ids.is_empty() {
let ids_and_types: Vec<(usize, smallvec::SmallVec<[TypeId; 4]>)> = {
let archset: &mut Components = self.resource_mut()?;
let ids_and_types = archset.upkeep(&dead_ids);
ids_and_types
};
let entities: &mut Entities = self.resource_mut()?;
entities
.recycle
.extend(ids_and_types.iter().map(|(id, _)| *id));
entities
.deleted
.push_front((crate::system::current_iteration(), ids_and_types));
while entities.deleted.len() > 3 {
let _ = entities.deleted.pop_back();
}
}
Ok(())
}
pub fn run(&mut self) -> anyhow::Result<&mut Self> {
loop {
self.tick()?;
if self.async_systems.is_empty()
&& self.sync_schedule.is_empty()
&& self.facade.executor.is_empty()
{
break;
}
}
Ok(self)
}
pub fn run_while<T, F, Fut>(&mut self, make_system_future: F) -> anyhow::Result<T>
where
T: Send + Sync + 'static,
F: FnOnce(Facade) -> Fut,
Fut: Future<Output = T> + Send + Sync + 'static,
{
let (tx, rx) = spsc::bounded(1);
let fut = (make_system_future)(self.facade.clone());
self.spawn(async move {
let t = fut.await;
tx.try_send(t).unwrap();
});
loop {
self.tick()?;
match rx.try_recv() {
Ok(t) => return Ok(t),
Err(err) => match err {
spsc::TryRecvError::Empty => {}
spsc::TryRecvError::Closed => unreachable!("this should never happen"),
},
}
}
}
pub fn resource<T: IsResource>(&self) -> anyhow::Result<&T> {
let id = ResourceId::new::<T>();
self.resource_manager.get(&id)
}
pub fn resource_mut<T: IsResource>(&mut self) -> anyhow::Result<&mut T> {
self.resource_manager.get_mut::<T>()
}
pub fn fetch<T: CanFetch>(&mut self) -> anyhow::Result<T> {
self.resource_manager.unify_resources("World::fetch")?;
T::construct(&mut LoanManager(&mut self.resource_manager))
}
pub fn entity(&mut self) -> Entity {
let mut entities = self.fetch::<Write<Entities>>().unwrap();
entities.create()
}
pub fn entity_with_bundle<B: IsBundle>(&mut self, bundle: B) -> Entity {
let entity = self.resource_mut::<Entities>().unwrap().create();
self.resource_mut::<Components>()
.unwrap()
.insert_bundle(entity.id(), bundle);
entity
}
pub fn query<Q: IsQuery + 'static>(&mut self) -> QueryGuard<'_, Q> {
self.resource_mut::<Components>().unwrap().query::<Q>()
}
pub fn insert_component<T: Send + Sync + 'static>(
&mut self,
id: usize,
component: T,
) -> Option<T> {
let components = self.resource_mut::<Components>().unwrap();
components.insert_component(id, component)
}
pub fn insert_bundle<B: IsBundle>(&mut self, id: usize, bundle: B) {
let components = self.resource_mut::<Components>().unwrap();
components.insert_bundle(id, bundle);
}
pub fn get_component<T: Send + Sync + 'static>(
&self,
id: usize,
) -> Option<impl Deref<Target = T> + '_> {
let components = self.resource::<Components>().unwrap();
components.get_component::<T>(id)
}
pub fn get_schedule_description(&self) -> String {
format!("{:#?}", self.sync_schedule)
}
pub fn get_sync_schedule_names(&self) -> Vec<Vec<&str>> {
self.sync_schedule.get_schedule_names()
}
}
#[cfg(test)]
mod test {
use std::sync::Mutex;
use crate::{
self as apecs, anyhow,
chan::spsc,
storage::{Components, Query},
system::*,
world::*,
CanFetch, Read, Write,
};
use futures_lite::future;
use rustc_hash::FxHashMap;
#[derive(Default)]
struct MyMap(FxHashMap<String, u32>);
#[derive(Default)]
struct Number(u32);
#[test]
fn can_closure_system() {
#[derive(Copy, Clone, Debug, PartialEq)]
struct F32s(f32, f32);
#[derive(CanFetch)]
struct StatefulSystemData {
positions: Query<&'static F32s>,
}
fn mk_stateful_system(
tx: spsc::Sender<F32s>,
) -> impl FnMut(StatefulSystemData) -> anyhow::Result<ShouldContinue> {
println!("making stateful system");
let mut highest_pos: F32s = F32s(0.0, f32::NEG_INFINITY);
move |data: StatefulSystemData| {
println!("running stateful system: highest_pos:{:?}", highest_pos);
for pos in data.positions.query().iter_mut() {
if pos.1 > highest_pos.1 {
highest_pos = *pos.value();
println!("set new highest_pos: {:?}", highest_pos);
}
}
println!("sending highest_pos: {:?}", highest_pos);
tx.try_send(highest_pos)?;
ok()
}
}
let (tx, rx) = spsc::bounded(1);
let mut world = World::default();
world
.with_system("stateful", mk_stateful_system(tx))
.unwrap();
{
let mut archset: Write<Components> = world.fetch().unwrap();
archset.insert_component(0, F32s(20.0, 30.0));
archset.insert_component(1, F32s(0.0, 0.0));
archset.insert_component(2, F32s(100.0, 100.0));
}
world.tock();
let highest = rx.try_recv().unwrap();
assert_eq!(F32s(100.0, 100.0), highest);
}
#[test]
fn async_systems_run_and_return_resources() {
let _ = env_logger::builder()
.is_test(true)
.filter_level(log::LevelFilter::Trace)
.try_init();
async fn create(tx: spsc::Sender<()>, mut facade: Facade) -> anyhow::Result<()> {
log::info!("create running");
tx.try_send(()).unwrap();
facade
.visit(
|(mut entities, mut archset): (Write<Entities>, Write<Components>)| {
for n in 0..100u32 {
let e = entities.create();
archset.insert_bundle(e.id(), (format!("entity_{}", n), n));
}
Ok(())
},
)
.await
}
fn maintain_map(
mut data: (Query<(&String, &u32)>, Write<MyMap>),
) -> anyhow::Result<ShouldContinue> {
for (name, number) in data.0.query().iter_mut() {
if !data.1.inner().0.contains_key(name.value()) {
let _ = data
.1
.inner_mut()
.0
.insert(name.to_string(), *number.value());
}
}
ok()
}
let (tx, rx) = spsc::bounded(1);
let mut world = World::default();
world
.with_async("create", |facade| async move { create(tx, facade).await })
.unwrap()
.with_system("maintain", maintain_map)
.unwrap();
world.tock();
rx.try_recv().unwrap();
world.tock();
let book = world.fetch::<Read<MyMap>>().unwrap();
for n in 0..100 {
assert_eq!(book.0.get(&format!("entity_{}", n)), Some(&n));
}
}
#[test]
fn can_create_entities_and_build_convenience() {
struct DataA(f32);
struct DataB(f32);
let mut world = World::default();
assert!(world.has_resource::<Entities>(), "missing entities");
let e = world.entity().with_bundle((DataA(0.0), DataB(0.0)));
let id = e.id();
world
.with_async("insert-bundle", |_| async move {
println!("updating entity");
e.with_bundle((DataA(666.0), DataB(666.0))).updates().await;
println!("done!");
Ok(())
})
.unwrap();
while !world.facade.executor.is_empty() {
world.tick().unwrap();
}
let data: Query<(&DataA, &DataB)> = world.fetch().unwrap();
let mut q = data.query();
let (a, b) = q.find_one(id).unwrap();
assert_eq!(666.0, a.0);
assert_eq!(666.0, b.0);
}
#[test]
fn entities_can_lazy_add_and_get() {
#[derive(Debug, Clone, PartialEq)]
struct Name(&'static str);
#[derive(Debug, Clone, PartialEq)]
struct Age(u32);
let await_points = Arc::new(Mutex::new(vec![]));
let awaits = await_points.clone();
let mut world = World::default();
world
.with_async("test", |mut facade| async move {
await_points.lock().unwrap().push(1);
let mut e = {
let e = facade
.visit(|mut entities: Write<Entities>| Ok(entities.create()))
.await
.unwrap();
await_points.lock().unwrap().push(2);
e
};
e.insert_bundle((Name("ada"), Age(666)));
e.updates().await;
await_points.lock().unwrap().push(3);
let (name, age) = e
.visit::<(&Name, &Age), _>(|(name, age)| {
(name.value().clone(), age.value().clone())
})
.await
.unwrap();
await_points.lock().unwrap().push(4);
assert_eq!(Name("ada"), name);
assert_eq!(Age(666), age);
println!("done!");
Ok(())
})
.unwrap();
for i in 1..=5 {
world.tock();
let num_awaits = i.min(4);
assert_eq!(Some(num_awaits), awaits.lock().unwrap().last().cloned());
}
let ages = world.fetch::<Query<&Age>>().unwrap();
let mut q = ages.query();
let age = q.find_one(0).unwrap();
assert_eq!(&Age(666), age.value());
}
#[test]
fn plugin_inserts_resources_from_canfetch_in_systems() {
#[derive(Default)]
struct MyStr(&'static str);
let _ = env_logger::builder()
.is_test(true)
.filter_level(log::LevelFilter::Trace)
.try_init();
let mut world = World::default();
world.with_system("test", |_: Write<MyStr>| ok()).unwrap();
let s = world.resource_mut::<MyStr>().unwrap();
s.0 = "blah";
}
#[test]
fn sanity_channel_ref() {
let f = 0.0f32;
let (tx, rx) = mpsc::unbounded();
tx.try_send(&f).unwrap();
assert_eq!(&0.0, rx.try_recv().unwrap());
}
#[test]
fn can_query_empty_ref_archetypes_in_same_batch() {
let _ = env_logger::builder()
.is_test(true)
.filter_level(log::LevelFilter::Trace)
.try_init();
let mut world = World::default();
world
.with_system("one", |q: Query<(&f32, &bool)>| {
for (_f, _b) in q.query().iter_mut() {}
ok()
})
.unwrap()
.with_system("two", |q: Query<(&f32, &bool)>| {
for (_f, _b) in q.query().iter_mut() {}
ok()
})
.unwrap();
world.tock();
}
#[test]
fn parallelism() {
let _ = env_logger::builder()
.is_test(true)
.filter_level(log::LevelFilter::Trace)
.try_init();
let mut world = World::default();
world
.with_async("one", |mut facade: Facade| -> AsyncSystemFuture {
Box::pin(async move {
facade
.visit(|mut number: Write<Number>| {
number.inner_mut().0 = 1;
Ok(())
})
.await
})
})
.unwrap()
.with_async("two", |mut facade: Facade| -> AsyncSystemFuture {
Box::pin(async move {
for _ in 0..2 {
facade
.visit(|mut number: Write<Number>| {
number.inner_mut().0 = 2;
Ok(())
})
.await?;
}
Ok(())
})
})
.unwrap()
.with_async("three", |mut facade: Facade| -> AsyncSystemFuture {
Box::pin(async move {
for _ in 0..3 {
facade
.visit(|mut number: Write<Number>| {
number.inner_mut().0 = 3;
Ok(())
})
.await?;
}
Ok(())
})
})
.unwrap()
.with_parallelism(Parallelism::Automatic);
world.run().unwrap();
}
#[test]
fn deleted_entities() {
let _ = env_logger::builder()
.is_test(true)
.filter_level(log::LevelFilter::Trace)
.try_init();
let mut world = World::default();
{
let entities: &mut Entities = world.resource_mut().unwrap();
(0..10u32).for_each(|i| {
entities
.create()
.insert_bundle(("hello".to_string(), false, i))
});
assert_eq!(10, entities.alive_len());
}
world.tock();
{
let q: Query<&u32> = world.fetch().unwrap();
assert_eq!(9, **q.query().find_one(9).unwrap());
}
{
let entities: &Entities = world.resource().unwrap();
let entity = entities.hydrate(9).unwrap();
entities.destroy(entity);
}
world.tock();
{
let entities: &Entities = world.resource().unwrap();
assert_eq!(9, entities.alive_len());
let deleted_strings = entities.deleted_iter_of::<String>().collect::<Vec<_>>();
assert_eq!(9, deleted_strings[0].id());
}
}
#[test]
fn spsc_drop_sanity() {
let (tx, rx) = spsc::bounded::<()>(1);
tx.try_send(()).unwrap();
drop(tx);
assert!(rx.try_recv().is_ok());
}
#[test]
fn can_clone_facade() {
let mut world = World::default();
world
.with_async("async", |mut facade_a: Facade| async move {
let (tx, rx) = spsc::bounded(1);
let mut facade_b = facade_a.clone();
#[allow(unreachable_code)]
let send_loop = async move {
loop {
println!("send_loop visiting");
let count = facade_a.visit(|count: Read<u32>| Ok(*count)).await?;
println!("send_loop sending");
tx.send(count).await.unwrap();
}
anyhow::Ok(())
};
#[allow(unreachable_code)]
let recv_loop = async move {
loop {
println!("recv_loop receiving");
let recv_count: u32 = rx.recv().await.unwrap();
println!("recv_loop visiting");
facade_b
.visit(|mut count: Write<u32>| {
*count = recv_count + 1;
println!("recv_loop updated count to {}", recv_count + 1);
Ok(())
})
.await?;
}
anyhow::Ok(())
};
let _ = future::zip(send_loop, recv_loop).await;
Ok(())
})
.unwrap()
.run_while(|mut facade| async move {
loop {
let count = facade.visit(|count: Read<u32>| Ok(*count)).await?;
println!("run_while loop counted {}", count);
if count >= 3 {
return anyhow::Ok(());
}
}
})
.unwrap()
.unwrap();
}
}