use bevy_ecs::{
prelude::{Component, Entity, World},
world::{EntityRef, EntityWorldMut},
};
use backtrace::Backtrace;
use std::{
collections::HashMap,
fmt::{Debug, Display},
sync::Arc,
};
use smallvec::SmallVec;
use thiserror::Error as ThisError;
use crate::{
Cancel, Cancellation, DisposalFailure, OperationResult, OperationRoster, OrBroken,
SeriesMarker, UnhandledErrors, UnusedTarget, operation::ScopeStorage,
};
#[derive(ThisError, Debug, Clone)]
#[error("the output of an operation in a workflow was disposed: {}", .cause)]
pub struct Disposal {
pub cause: Arc<DisposalCause>,
}
impl<T: Into<DisposalCause>> From<T> for Disposal {
fn from(value: T) -> Self {
Disposal {
cause: Arc::new(value.into()),
}
}
}
impl Disposal {
pub fn service_unavailable(service: Entity, for_node: Entity) -> Disposal {
ServiceUnavailable { service, for_node }.into()
}
pub fn task_despawned(task: Entity, node: Entity) -> Disposal {
TaskDespawned { task, node }.into()
}
pub fn branching(
branched_at_node: Entity,
disposed_for_target: Entity,
reason: Option<anyhow::Error>,
) -> Disposal {
DisposedBranch {
branched_at_node,
disposed_for_target,
reason,
}
.into()
}
pub fn buffer_key(accessor_node: Entity, key_for_buffer: Entity) -> Disposal {
DisposedBufferKey {
accessor_node,
key_for_buffer,
}
.into()
}
pub fn supplanted(
supplanted_at_node: Entity,
supplanted_by_node: Entity,
supplanting_session: Entity,
) -> Disposal {
Supplanted {
supplanted_at_node,
supplanted_by_node,
supplanting_session,
}
.into()
}
pub fn filtered(filtered_at_node: Entity, reason: Option<anyhow::Error>) -> Self {
Filtered {
filtered_at_node,
reason,
}
.into()
}
pub fn trimming(trimmer: Entity, nodes: SmallVec<[Entity; 16]>) -> Self {
Trimming { trimmer, nodes }.into()
}
pub fn closed_gate(gate_node: Entity, closed_buffers: SmallVec<[Entity; 8]>) -> Self {
ClosedGate {
gate_node,
closed_buffers,
}
.into()
}
pub fn empty_spread(spread_node: Entity) -> Self {
EmptySpread { spread_node }.into()
}
pub fn deficient_collection(collect_node: Entity, min: usize, actual: usize) -> Self {
DeficientCollection {
collect_node,
min,
actual,
}
.into()
}
pub fn incomplete_split(
split_node: Entity,
missing_keys: SmallVec<[Option<Arc<str>>; 16]>,
) -> Self {
IncompleteSplit {
split_node,
missing_keys,
}
.into()
}
}
#[derive(ThisError, Debug)]
pub enum DisposalCause {
#[error("{}", .0)]
Supplanted(Supplanted),
#[error("{}", .0)]
Filtered(Filtered),
#[error("{}", .0)]
Branching(DisposedBranch),
#[error("{}", .0)]
BufferKey(DisposedBufferKey),
#[error("{}", .0)]
ServiceUnavailable(ServiceUnavailable),
#[error("{}", .0)]
TaskDespawned(TaskDespawned),
#[error("{}", .0)]
PoisonedMutex(PoisonedMutexDisposal),
#[error("{}", .0)]
Scope(Cancellation),
#[error("{}", .0)]
UnusedStreams(UnusedStreams),
#[error("{}", .0)]
Trimming(Trimming),
#[error("{}", .0)]
ClosedGate(ClosedGate),
#[error("{}", .0)]
EmptySpread(EmptySpread),
#[error("{}", .0)]
DeficientCollection(DeficientCollection),
#[error("{}", .0)]
IncompleteSplit(IncompleteSplit),
}
#[derive(ThisError, Debug, Clone, Copy)]
#[error("request was supplanted")]
pub struct Supplanted {
pub supplanted_at_node: Entity,
pub supplanted_by_node: Entity,
pub supplanting_session: Entity,
}
impl Supplanted {
pub fn new(
cancelled_at_node: Entity,
supplanting_node: Entity,
supplanting_session: Entity,
) -> Self {
Self {
supplanted_at_node: cancelled_at_node,
supplanted_by_node: supplanting_node,
supplanting_session,
}
}
}
impl From<Supplanted> for DisposalCause {
fn from(value: Supplanted) -> Self {
DisposalCause::Supplanted(value)
}
}
#[derive(ThisError, Debug)]
pub struct Filtered {
pub filtered_at_node: Entity,
pub reason: Option<anyhow::Error>,
}
impl Filtered {
pub fn new(filtered_at_node: Entity, reason: Option<anyhow::Error>) -> Self {
Self {
filtered_at_node,
reason,
}
}
}
impl From<Filtered> for DisposalCause {
fn from(value: Filtered) -> Self {
Self::Filtered(value)
}
}
impl Display for Filtered {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "filtered at node [{:?}]", self.filtered_at_node)?;
if let Some(reason) = &self.reason {
write!(f, ": {}", reason)?;
} else {
write!(f, " [no reason given]")?;
}
Ok(())
}
}
#[derive(ThisError, Debug)]
pub struct DisposedBranch {
pub branched_at_node: Entity,
pub disposed_for_target: Entity,
pub reason: Option<anyhow::Error>,
}
impl From<DisposedBranch> for DisposalCause {
fn from(value: DisposedBranch) -> Self {
Self::Branching(value)
}
}
impl Display for DisposedBranch {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"branch [{:?}] -> [{:?}] disposed",
self.branched_at_node, self.disposed_for_target,
)?;
if let Some(reason) = &self.reason {
write!(f, ": {}", reason)?;
} else {
write!(f, " [no reason given]")?;
}
Ok(())
}
}
#[derive(ThisError, Debug)]
#[error("buffer key disposed")]
pub struct DisposedBufferKey {
pub accessor_node: Entity,
pub key_for_buffer: Entity,
}
impl From<DisposedBufferKey> for DisposalCause {
fn from(value: DisposedBufferKey) -> Self {
Self::BufferKey(value)
}
}
#[derive(ThisError, Debug)]
#[error("service [{:?}] no longer available for node [{:?}]", .service, .for_node)]
pub struct ServiceUnavailable {
pub service: Entity,
pub for_node: Entity,
}
impl From<ServiceUnavailable> for DisposalCause {
fn from(value: ServiceUnavailable) -> Self {
Self::ServiceUnavailable(value)
}
}
#[derive(ThisError, Debug)]
#[error("task [{:?}] despawned for node [{:?}]", .task, .node)]
pub struct TaskDespawned {
pub task: Entity,
pub node: Entity,
}
impl From<TaskDespawned> for DisposalCause {
fn from(value: TaskDespawned) -> Self {
Self::TaskDespawned(value)
}
}
#[derive(ThisError, Debug)]
#[error("poisoned mutex in node [{:?}]", .for_node)]
pub struct PoisonedMutexDisposal {
pub for_node: Entity,
}
impl From<PoisonedMutexDisposal> for DisposalCause {
fn from(value: PoisonedMutexDisposal) -> Self {
Self::PoisonedMutex(value)
}
}
#[derive(ThisError, Debug)]
#[error("streams unused for node [{:?}]:{}", .node, DisplaySlice(.streams))]
pub struct UnusedStreams {
pub node: Entity,
pub streams: Vec<&'static str>,
}
impl UnusedStreams {
pub fn new(node: Entity) -> Self {
Self {
node,
streams: Default::default(),
}
}
}
impl From<UnusedStreams> for DisposalCause {
fn from(value: UnusedStreams) -> Self {
Self::UnusedStreams(value)
}
}
#[derive(ThisError, Debug)]
#[error("nodes trimmed by [{:?}]:{}", .trimmer, DisplayDebugSlice(.nodes))]
pub struct Trimming {
pub trimmer: Entity,
pub nodes: SmallVec<[Entity; 16]>,
}
impl From<Trimming> for DisposalCause {
fn from(value: Trimming) -> Self {
Self::Trimming(value)
}
}
#[derive(ThisError, Debug)]
#[error("gate [{:?}] closed buffers:{}", .gate_node, DisplayDebugSlice(.closed_buffers))]
pub struct ClosedGate {
pub gate_node: Entity,
pub closed_buffers: SmallVec<[Entity; 8]>,
}
impl From<ClosedGate> for DisposalCause {
fn from(value: ClosedGate) -> Self {
Self::ClosedGate(value)
}
}
#[derive(ThisError, Debug)]
#[error("spread operation [{:?}] had an empty collection", .spread_node)]
pub struct EmptySpread {
pub spread_node: Entity,
}
impl From<EmptySpread> for DisposalCause {
fn from(value: EmptySpread) -> Self {
Self::EmptySpread(value)
}
}
#[derive(ThisError, Debug)]
#[error("collect operation [{:?}] needed {} items but ran out at {}", .collect_node, .min, .actual)]
pub struct DeficientCollection {
pub collect_node: Entity,
pub min: usize,
pub actual: usize,
}
impl From<DeficientCollection> for DisposalCause {
fn from(value: DeficientCollection) -> Self {
Self::DeficientCollection(value)
}
}
#[derive(ThisError, Debug)]
#[error("split operation [{:?}] was missing items for keys:{}", .split_node, DisplayDebugSlice(.missing_keys))]
pub struct IncompleteSplit {
pub split_node: Entity,
pub missing_keys: SmallVec<[Option<Arc<str>>; 16]>,
}
impl From<IncompleteSplit> for DisposalCause {
fn from(value: IncompleteSplit) -> Self {
Self::IncompleteSplit(value)
}
}
pub trait ManageDisposal {
fn emit_disposal(&mut self, session: Entity, disposal: Disposal, roster: &mut OperationRoster);
fn clear_disposals(&mut self, session: Entity);
fn transfer_disposals(&mut self, to_node: Entity) -> OperationResult;
}
pub trait InspectDisposals {
fn get_disposals(&self, session: Entity) -> Option<&Vec<Disposal>>;
}
impl<'w> ManageDisposal for EntityWorldMut<'w> {
fn emit_disposal(&mut self, session: Entity, disposal: Disposal, roster: &mut OperationRoster) {
let Some(scope) = self.get::<ScopeStorage>() else {
if self.contains::<SeriesMarker>() {
if let DisposalCause::Supplanted(supplanted) = disposal.cause.as_ref() {
let cancellation: Cancellation = (*supplanted).into();
roster.cancel(Cancel {
origin: self.id(),
target: session,
session: Some(session),
cancellation,
});
}
} else if !self.contains::<UnusedTarget>() {
let broken_node = self.id();
self.world_scope(|world| {
world
.get_resource_or_insert_with(UnhandledErrors::default)
.disposals
.push(DisposalFailure {
disposal,
broken_node,
backtrace: Some(Backtrace::new()),
});
});
}
return;
};
let scope = scope.get();
if let Some(mut storage) = self.get_mut::<DisposalStorage>() {
storage.disposals.entry(session).or_default().push(disposal);
} else {
let mut storage = DisposalStorage::default();
storage.disposals.entry(session).or_default().push(disposal);
self.insert(storage);
}
roster.disposed(scope, self.id(), session);
}
fn clear_disposals(&mut self, session: Entity) {
if let Some(mut storage) = self.get_mut::<DisposalStorage>() {
storage.disposals.remove(&session);
}
}
fn transfer_disposals(&mut self, to: Entity) -> OperationResult {
if let Some(from_storage) = self.take::<DisposalStorage>() {
self.world_scope::<OperationResult>(|world| {
let mut to_mut = world.get_entity_mut(to).or_broken()?;
match to_mut.get_mut::<DisposalStorage>() {
Some(mut to_storage) => {
for (session, disposals) in from_storage.disposals {
to_storage
.disposals
.entry(session)
.or_default()
.extend(disposals);
}
}
None => {
to_mut.insert(from_storage);
}
}
Ok(())
})?;
}
Ok(())
}
}
impl<'w> InspectDisposals for EntityWorldMut<'w> {
fn get_disposals(&self, session: Entity) -> Option<&Vec<Disposal>> {
if let Some(storage) = self.get::<DisposalStorage>() {
return storage.disposals.get(&session);
}
None
}
}
impl<'w> InspectDisposals for EntityRef<'w> {
fn get_disposals(&self, session: Entity) -> Option<&Vec<Disposal>> {
if let Some(storage) = self.get::<DisposalStorage>() {
return storage.disposals.get(&session);
}
None
}
}
pub fn emit_disposal(
source: Entity,
session: Entity,
disposal: Disposal,
world: &mut World,
roster: &mut OperationRoster,
) {
if let Ok(mut source_mut) = world.get_entity_mut(source) {
source_mut.emit_disposal(session, disposal, roster);
} else {
world
.get_resource_or_insert_with(UnhandledErrors::default)
.disposals
.push(DisposalFailure {
disposal,
broken_node: source,
backtrace: Some(Backtrace::new()),
});
}
}
#[derive(Component, Default)]
struct DisposalStorage {
disposals: HashMap<Entity, Vec<Disposal>>,
}
pub(crate) struct DisplaySlice<'a, T>(&'a [T]);
impl<'a, T> Display for DisplaySlice<'a, T>
where
T: Display,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
for item in self.0 {
write!(f, " {}", item)?;
}
Ok(())
}
}
pub(crate) struct DisplayDebugSlice<'a, T>(pub(crate) &'a [T]);
impl<'a, T> Display for DisplayDebugSlice<'a, T>
where
T: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
for item in self.0 {
write!(f, " {:?}", item)?;
}
Ok(())
}
}