use crate::{
try_emit_broken, Broken, Cancel, DeliveryLabelId, InspectInput, SetupFailure, StreamTargetMap,
UnhandledErrors,
};
use bevy_ecs::{
prelude::{Component, Entity, World},
world::Command,
};
use bevy_hierarchy::prelude::BuildWorldChildren;
use backtrace::Backtrace;
use std::collections::{hash_map::Entry, HashMap, HashSet, VecDeque};
use smallvec::SmallVec;
mod branching;
pub(crate) use branching::*;
mod cleanup;
pub(crate) use cleanup::*;
mod collect;
pub(crate) use collect::*;
mod filter;
pub(crate) use filter::*;
mod fork_clone;
pub(crate) use fork_clone::*;
mod fork_unzip;
pub(crate) use fork_unzip::*;
mod injection;
pub(crate) use injection::*;
mod join;
pub(crate) use join::*;
mod listen;
pub(crate) use listen::*;
mod noop;
pub(crate) use noop::*;
mod operate_buffer;
pub use operate_buffer::*;
mod operate_buffer_access;
pub use operate_buffer_access::*;
mod operate_callback;
pub(crate) use operate_callback::*;
mod operate_gate;
pub(crate) use operate_gate::*;
mod operate_map;
pub(crate) use operate_map::*;
mod operate_service;
pub(crate) use operate_service::*;
mod operate_task;
pub(crate) use operate_task::*;
mod operate_trim;
pub(crate) use operate_trim::*;
mod scope;
pub use scope::*;
mod spread;
pub(crate) use spread::*;
#[derive(Component, Clone)]
pub struct SingleInputStorage(SmallVec<[Entity; 8]>);
impl SingleInputStorage {
pub fn new(input: Entity) -> Self {
Self(SmallVec::from_iter([input]))
}
pub fn empty() -> Self {
Self(SmallVec::new())
}
pub fn get(&self) -> &SmallVec<[Entity; 8]> {
&self.0
}
pub fn take(self) -> SmallVec<[Entity; 8]> {
self.0
}
pub(crate) fn add(&mut self, input: Entity) {
if !self.0.contains(&input) {
self.0.push(input);
}
}
pub fn is_reachable(r: &mut OperationReachability) -> ReachabilityResult {
let Some(inputs) = r.world.get_entity(r.source).or_broken()?.get::<Self>() else {
return Ok(false);
};
for input in &inputs.0 {
if r.check_upstream(*input)? {
return Ok(true);
}
}
Ok(false)
}
}
#[derive(Component, Clone, Default)]
pub struct FunnelInputStorage(pub(crate) SmallVec<[Entity; 8]>);
impl FunnelInputStorage {
pub fn new() -> Self {
Self::default()
}
pub fn get(&self) -> &SmallVec<[Entity; 8]> {
&self.0
}
}
impl FromIterator<Entity> for FunnelInputStorage {
fn from_iter<T: IntoIterator<Item = Entity>>(iter: T) -> Self {
Self(SmallVec::from_iter(iter))
}
}
impl From<SmallVec<[Entity; 8]>> for FunnelInputStorage {
fn from(value: SmallVec<[Entity; 8]>) -> Self {
Self(value)
}
}
#[derive(Component, Clone, Copy, Debug)]
pub struct SingleTargetStorage(Entity);
impl SingleTargetStorage {
pub fn new(target: Entity) -> Self {
Self(target)
}
pub fn get(&self) -> Entity {
self.0
}
pub(crate) fn set(&mut self, target: Entity) {
self.0 = target;
}
}
#[derive(Component, Clone, Default)]
pub struct ForkTargetStorage(pub SmallVec<[Entity; 8]>);
impl ForkTargetStorage {
pub fn new() -> Self {
Self::default()
}
}
impl FromIterator<Entity> for ForkTargetStorage {
fn from_iter<T: IntoIterator<Item = Entity>>(iter: T) -> Self {
Self(SmallVec::from_iter(iter))
}
}
#[derive(Component)]
pub(crate) struct UnusedTarget;
#[derive(Default)]
pub struct OperationRoster {
pub(crate) queue: VecDeque<Entity>,
pub(crate) awake: VecDeque<Entity>,
pub(crate) deferred_queue: VecDeque<Entity>,
pub(crate) cancel: VecDeque<Cancel>,
pub(crate) unblock: VecDeque<Blocker>,
pub(crate) disposed: Vec<DisposalNotice>,
pub(crate) cleanup_finished: Vec<Cleanup>,
}
impl OperationRoster {
pub fn new() -> Self {
Self::default()
}
pub fn queue(&mut self, source: Entity) {
self.queue.push_back(source);
}
pub fn awake(&mut self, source: Entity) {
self.awake.push_back(source);
}
pub fn defer(&mut self, source: Entity) {
self.deferred_queue.push_back(source);
}
pub fn cancel(&mut self, source: Cancel) {
self.cancel.push_back(source);
}
pub(crate) fn unblock(&mut self, provider: Blocker) {
self.unblock.push_back(provider);
}
pub fn disposed(&mut self, scope: Entity, origin: Entity, session: Entity) {
self.disposed.push(DisposalNotice {
source: scope,
origin,
session,
});
}
pub fn cleanup_finished(&mut self, cleanup: Cleanup) {
self.cleanup_finished.push(cleanup);
}
pub fn is_empty(&self) -> bool {
self.queue.is_empty()
&& self.awake.is_empty()
&& self.deferred_queue.is_empty()
&& self.cancel.is_empty()
&& self.unblock.is_empty()
&& self.disposed.is_empty()
&& self.cleanup_finished.is_empty()
}
pub fn append(&mut self, other: &mut Self) {
self.queue.append(&mut other.queue);
self.cancel.append(&mut other.cancel);
self.unblock.append(&mut other.unblock);
self.disposed.append(&mut other.disposed);
self.cleanup_finished.append(&mut other.cleanup_finished);
}
pub fn purge(&mut self, target: Entity) {
self.queue.retain(|e| *e != target);
self.deferred_queue.retain(|e| *e != target);
}
pub fn process_deferals(&mut self) {
for e in self.deferred_queue.drain(..) {
self.queue.push_back(e);
}
}
}
pub struct DisposalNotice {
pub source: Entity,
pub origin: Entity,
pub session: Entity,
}
pub(crate) struct Blocker {
pub(crate) provider: Entity,
pub(crate) source: Entity,
pub(crate) session: Entity,
pub(crate) label: Option<DeliveryLabelId>,
pub(crate) serve_next: fn(Blocker, &mut World, &mut OperationRoster),
}
#[derive(Clone, Debug)]
pub enum OperationError {
Broken(Option<Backtrace>),
NotReady,
}
impl OperationError {
pub fn broken_here() -> Self {
OperationError::Broken(Some(Backtrace::new()))
}
}
pub type OperationResult = Result<(), OperationError>;
pub type ReachabilityResult = Result<bool, OperationError>;
pub struct OperationSetup<'a> {
pub(crate) source: Entity,
pub(crate) world: &'a mut World,
}
pub struct OperationRequest<'a> {
pub source: Entity,
pub world: &'a mut World,
pub roster: &'a mut OperationRoster,
}
impl<'a> OperationRequest<'a> {
pub fn pend(self) -> PendingOperationRequest {
PendingOperationRequest {
source: self.source,
}
}
}
pub struct OperationReachability<'a> {
source: Entity,
session: Entity,
disposed: Option<Entity>,
world: &'a World,
visited: &'a mut HashMap<Entity, bool>,
}
impl<'a> OperationReachability<'a> {
pub fn new(
session: Entity,
source: Entity,
disposed: Option<Entity>,
world: &'a World,
visited: &'a mut HashMap<Entity, bool>,
) -> OperationReachability<'a> {
Self {
session,
source,
disposed,
world,
visited,
}
}
pub fn check_upstream(&mut self, source: Entity) -> ReachabilityResult {
match self.visited.entry(source) {
Entry::Occupied(occupied) => {
return Ok(*occupied.get());
}
Entry::Vacant(vacant) => {
vacant.insert(false);
}
}
let reachabiility = self
.world
.get_entity(source)
.or_broken()?
.get::<OperationReachabilityStorage>()
.or_broken()?
.0;
let is_reachable = reachabiility(OperationReachability {
source,
session: self.session,
disposed: self.disposed,
world: self.world,
visited: self.visited,
})?;
if is_reachable {
self.visited.insert(source, is_reachable);
}
Ok(is_reachable)
}
pub fn has_input<T: 'static + Send + Sync>(&self) -> ReachabilityResult {
self.world
.get_entity(self.source)
.or_broken()?
.has_input::<T>(self.session)
}
pub fn source(&self) -> Entity {
self.source
}
pub fn session(&self) -> Entity {
self.session
}
pub fn world(&self) -> &World {
self.world
}
}
pub fn check_reachability(
session: Entity,
source: Entity,
disposed: Option<Entity>,
world: &World,
) -> ReachabilityResult {
let mut visited = HashMap::new();
let mut r = OperationReachability {
source,
session,
disposed,
world,
visited: &mut visited,
};
r.check_upstream(source)
}
#[derive(Clone, Copy)]
pub struct PendingOperationRequest {
pub source: Entity,
}
impl PendingOperationRequest {
pub fn activate<'a>(
self,
world: &'a mut World,
roster: &'a mut OperationRoster,
) -> OperationRequest<'a> {
OperationRequest {
source: self.source,
world,
roster,
}
}
}
pub trait Operation {
fn setup(self, info: OperationSetup) -> OperationResult;
fn execute(request: OperationRequest) -> OperationResult;
fn cleanup(clean: OperationCleanup) -> OperationResult;
fn is_reachable(reachability: OperationReachability) -> ReachabilityResult;
}
pub trait OrBroken: Sized {
type Value;
fn or_not_ready(self) -> Result<Self::Value, OperationError>;
fn or_broken(self) -> Result<Self::Value, OperationError> {
self.or_broken_impl(true)
}
fn or_broken_hide(self) -> Result<Self::Value, OperationError> {
self.or_broken_impl(false)
}
fn or_broken_impl(self, with_backtrace: bool) -> Result<Self::Value, OperationError>;
}
impl<T, E> OrBroken for Result<T, E> {
type Value = T;
fn or_not_ready(self) -> Result<Self::Value, OperationError> {
self.map_err(|_| OperationError::NotReady)
}
fn or_broken_impl(self, with_backtrace: bool) -> Result<T, OperationError> {
if with_backtrace {
self.map_err(|_| OperationError::Broken(Some(Backtrace::new())))
} else {
self.map_err(|_| OperationError::Broken(None))
}
}
}
impl<T> OrBroken for Option<T> {
type Value = T;
fn or_not_ready(self) -> Result<Self::Value, OperationError> {
self.ok_or(OperationError::NotReady)
}
fn or_broken_impl(self, with_backtrace: bool) -> Result<T, OperationError> {
if with_backtrace {
self.ok_or_else(|| OperationError::Broken(Some(Backtrace::new())))
} else {
self.ok_or(OperationError::Broken(None))
}
}
}
pub(crate) struct AddOperation<Op: Operation> {
scope: Option<Entity>,
source: Entity,
operation: Op,
}
impl<Op: Operation> AddOperation<Op> {
pub(crate) fn new(scope: Option<Entity>, source: Entity, operation: Op) -> Self {
Self {
scope,
source,
operation,
}
}
}
impl<Op: Operation + 'static + Sync + Send> Command for AddOperation<Op> {
fn apply(self, world: &mut World) {
if let Err(error) = self.operation.setup(OperationSetup {
source: self.source,
world,
}) {
world
.get_resource_or_insert_with(UnhandledErrors::default)
.setup
.push(SetupFailure {
broken_node: self.source,
error,
});
}
let mut source_mut = world.entity_mut(self.source);
source_mut.insert((
OperationExecuteStorage(perform_operation::<Op>),
OperationCleanupStorage(Op::cleanup),
OperationReachabilityStorage(Op::is_reachable),
));
if let Some(scope) = self.scope {
source_mut
.insert(ScopeStorage::new(scope))
.set_parent(scope);
match world.get_mut::<ScopeContents>(scope).or_broken() {
Ok(mut contents) => {
contents.add_node(self.source);
}
Err(error) => {
world
.get_resource_or_insert_with(UnhandledErrors::default)
.setup
.push(SetupFailure {
broken_node: self.source,
error,
});
}
}
}
}
}
#[derive(Component)]
pub(crate) struct OperationExecuteStorage(pub(crate) fn(OperationRequest));
#[derive(Component)]
pub(crate) struct OperationReachabilityStorage(fn(OperationReachability) -> ReachabilityResult);
pub fn execute_operation(request: OperationRequest) {
let Some(operator) = request.world.get::<OperationExecuteStorage>(request.source) else {
if request.world.get::<UnusedTarget>(request.source).is_none() {
if request.world.get_entity(request.source).is_some() {
request
.world
.get_resource_or_insert_with(UnhandledErrors::default)
.broken
.push(Broken {
node: request.source,
backtrace: Some(Backtrace::new()),
});
}
}
return;
};
let operator = operator.0;
operator(request);
}
pub fn awaken_task(request: OperationRequest) {
let Some(operator) = request.world.get::<OperationExecuteStorage>(request.source) else {
return;
};
let operator = operator.0;
operator(request);
}
fn perform_operation<Op: Operation>(
OperationRequest {
source,
world,
roster,
}: OperationRequest,
) {
match Op::execute(OperationRequest {
source,
world,
roster,
}) {
Ok(()) => {
}
Err(OperationError::NotReady) => {
}
Err(OperationError::Broken(backtrace)) => {
try_emit_broken(source, backtrace, world, roster);
}
}
}
pub struct DownstreamIter<'a> {
output: DownstreamFinishIter<'a>,
streams: Option<std::slice::Iter<'a, Entity>>,
}
impl<'a> Iterator for DownstreamIter<'a> {
type Item = Entity;
fn next(&mut self) -> Option<Self::Item> {
if let Some(output) = self.output.next() {
return Some(output);
}
if let Some(streams) = &mut self.streams {
return streams.next().copied();
}
None
}
}
enum DownstreamFinishIter<'a> {
Single(Option<Entity>),
Fork(std::slice::Iter<'a, Entity>),
}
impl<'a> Iterator for DownstreamFinishIter<'a> {
type Item = Entity;
fn next(&mut self) -> Option<Self::Item> {
match self {
Self::Single(iter) => iter.take(),
Self::Fork(iter) => iter.next().copied(),
}
}
}
pub fn immediately_downstream_of(source: Entity, world: &World) -> DownstreamIter<'_> {
let output = if let Some(target) = world.get::<SingleTargetStorage>(source) {
DownstreamFinishIter::Single(Some(target.get()))
} else if let Some(fork) = world.get::<ForkTargetStorage>(source) {
DownstreamFinishIter::Fork(fork.0.iter())
} else {
DownstreamFinishIter::Single(None)
};
let streams = world.get::<StreamTargetMap>(source).map(|s| s.map.iter());
DownstreamIter { output, streams }
}
pub fn is_downstream_of(source: Entity, target: Entity, world: &World) -> bool {
if source == target {
return false;
}
let mut queue: Vec<Entity> = Vec::new();
let mut visited = HashSet::new();
queue.push(source);
while let Some(top) = queue.pop() {
if top == target {
return true;
}
if !visited.insert(top) {
continue;
}
for next in immediately_downstream_of(top, world) {
queue.push(next);
}
}
false
}
pub struct DisposalUpdate<'a> {
pub source: Entity,
pub origin: Entity,
pub session: Entity,
pub world: &'a mut World,
pub roster: &'a mut OperationRoster,
}
#[derive(Component)]
pub struct DisposalListener(pub fn(DisposalUpdate) -> OperationResult);