use bevy_ecs::prelude::{Command, Component, Entity, World};
use smallvec::SmallVec;
use std::{collections::HashMap, sync::Arc};
use crate::{
Broken, Disposal, ForkTargetStorage, Input, InputBundle, ManageDisposal, ManageInput,
MiscellaneousFailure, Operation, OperationCleanup, OperationError, OperationReachability,
OperationRequest, OperationResult, OperationSetup, OrBroken, ReachabilityResult,
SingleInputStorage, SplitDispatcher, Splittable, UnhandledErrors,
};
#[derive(Component)]
pub(crate) struct OperateSplit<T: Splittable> {
connections: HashMap<T::Key, usize>,
index_to_key: Vec<Arc<str>>,
outputs_cache: Option<Vec<Vec<(T::Identifier, T::Item)>>>,
}
impl<T: Splittable> Default for OperateSplit<T> {
fn default() -> Self {
Self {
connections: Default::default(),
index_to_key: Vec::new(),
outputs_cache: Some(Vec::new()),
}
}
}
impl<T: 'static + Splittable + Send + Sync> Operation for OperateSplit<T> {
fn setup(self, OperationSetup { source, world }: OperationSetup) -> OperationResult {
world.entity_mut(source).insert((
self,
InputBundle::<T>::new(),
ForkTargetStorage::default(),
));
Ok(())
}
fn execute(
OperationRequest {
source,
world,
roster,
}: OperationRequest,
) -> OperationResult {
let mut source_mut = world.get_entity_mut(source).or_broken()?;
let Input { session, data } = source_mut.take_input::<T>()?;
let targets = source_mut.get::<ForkTargetStorage>().or_broken()?.0.clone();
let mut split = source_mut.get_mut::<OperateSplit<T>>().or_broken()?;
let mut outputs = split.outputs_cache.take().unwrap_or(Vec::new());
let dispatcher = SplitDispatcher {
connections: &split.connections,
outputs: &mut outputs,
};
data.split(dispatcher)?;
let mut missed_indices: SmallVec<[usize; 16]> = SmallVec::new();
for (index, (items, target)) in outputs.iter_mut().zip(targets).enumerate() {
if items.is_empty() {
missed_indices.push(index);
}
for output in items.drain(..) {
world
.get_entity_mut(target)
.or_broken()?
.give_input(session, output, roster)?;
}
}
let mut source_mut = world.get_entity_mut(source).or_broken()?;
if !missed_indices.is_empty() {
let split = source_mut.get::<OperateSplit<T>>().or_broken()?;
let missing_keys: SmallVec<[Option<Arc<str>>; 16]> = missed_indices
.into_iter()
.map(|index| split.index_to_key.get(index).cloned())
.collect();
source_mut.emit_disposal(
session,
Disposal::incomplete_split(source, missing_keys),
roster,
);
}
source_mut
.get_mut::<OperateSplit<T>>()
.or_broken()?
.outputs_cache
.replace(outputs);
Ok(())
}
fn cleanup(mut clean: OperationCleanup) -> OperationResult {
clean.cleanup_inputs::<T>()?;
clean.notify_cleaned()
}
fn is_reachable(mut reachability: OperationReachability) -> ReachabilityResult {
if reachability.has_input::<T>()? {
return Ok(true);
}
SingleInputStorage::is_reachable(&mut reachability)
}
}
pub(crate) struct ConnectToSplit<T: Splittable> {
pub(crate) source: Entity,
pub(crate) target: Entity,
pub(crate) key: T::Key,
}
impl<T: 'static + Splittable> Command for ConnectToSplit<T> {
fn apply(self, world: &mut World) {
let node = self.source;
if let Err(OperationError::Broken(backtrace)) = self.connect(world) {
world
.get_resource_or_insert_with(UnhandledErrors::default)
.broken
.push(Broken { node, backtrace });
}
}
}
impl<T: 'static + Splittable> ConnectToSplit<T> {
fn connect(self, world: &mut World) -> Result<(), OperationError> {
let mut target_storage = world
.get_mut::<ForkTargetStorage>(self.source)
.or_broken()?;
let index = target_storage.0.len();
target_storage.0.push(self.target);
world
.get_entity_mut(self.target)
.or_broken()?
.insert(SingleInputStorage::new(self.source));
let mut split = world.get_mut::<OperateSplit<T>>(self.source).or_broken()?;
let previous_index = split.connections.insert(self.key.clone(), index);
split
.outputs_cache
.as_mut()
.or_broken()?
.resize_with(index + 1, Vec::new);
if split.index_to_key.len() != index {
let reverse_map_size = split.index_to_key.len();
world
.get_resource_or_insert_with(UnhandledErrors::default)
.miscellaneous
.push(MiscellaneousFailure {
error: Arc::new(anyhow::anyhow!(
"Mismatch between reverse map size [{}] and new connection index [{}]",
reverse_map_size,
index,
)),
backtrace: Some(backtrace::Backtrace::new()),
});
} else {
split
.index_to_key
.push(format!("{:?}", self.key).as_str().into());
}
if let Some(previous_index) = previous_index {
let target_storage = world.get::<ForkTargetStorage>(self.source).or_broken()?;
let previous_target = *target_storage.0.get(previous_index).or_broken()?;
world
.get_resource_or_insert_with(UnhandledErrors::default)
.miscellaneous
.push(MiscellaneousFailure {
error: Arc::new(anyhow::anyhow!(
"Double-connected key [{:?}] for split node {:?}. Original target: {:?}, new target: {:?}",
self.key,
self.source,
previous_target,
self.target,
)),
backtrace: Some(backtrace::Backtrace::new()),
});
}
Ok(())
}
}