use bevy_ecs::prelude::Component;
use bevy_hierarchy::DespawnRecursiveExt;
use tokio::sync::mpsc::UnboundedSender as Sender;
use crate::{
promise::private::Sender as PromiseSender, ImpulseLifecycleChannel, Impulsive, Input,
InputBundle, ManageInput, OnTerminalCancelled, OperationCancel, OperationRequest,
OperationResult, OperationSetup, OrBroken,
};
/// Component stored on an impulse entity whose single input of type `T` is
/// forwarded through a [`PromiseSender`].
///
/// The `Impulsive` implementation for this type takes the component off the
/// entity when executing (or when cancelled), so the sender is consumed once.
#[derive(Component)]
pub(crate) struct TakenResponse<T> {
/// Sender that receives either the input value (see `execute`) or the
/// cancellation (see `cancel_taken_target`).
sender: PromiseSender<T>,
}
impl<T> TakenResponse<T> {
pub(crate) fn new(sender: PromiseSender<T>) -> Self {
Self { sender }
}
}
impl<T: 'static + Send + Sync> Impulsive for TakenResponse<T> {
fn setup(mut self, OperationSetup { source, world }: OperationSetup) -> OperationResult {
let lifecycle_sender = world
.get_resource_or_insert_with(ImpulseLifecycleChannel::default)
.sender
.clone();
self.sender.on_promise_drop(move || {
lifecycle_sender.send(source).ok();
});
world.entity_mut(source).insert((
InputBundle::<T>::new(),
OnTerminalCancelled(cancel_taken_target::<T>),
self,
));
Ok(())
}
fn execute(OperationRequest { source, world, .. }: OperationRequest) -> OperationResult {
let mut source_mut = world.get_entity_mut(source).or_broken()?;
let Input { data, .. } = source_mut.take_input::<T>()?;
let sender = source_mut.take::<TakenResponse<T>>().or_broken()?.sender;
sender.send(data).ok();
source_mut.despawn_recursive();
Ok(())
}
}
/// Component stored on an impulse entity whose inputs of type `T` are forwarded
/// into an unbounded tokio mpsc channel.
///
/// The `Impulsive` implementation for this type only borrows the component when
/// executing, so it stays on the entity afterwards.
#[derive(Component)]
pub(crate) struct TakenStream<T> {
/// Channel sender (`tokio::sync::mpsc::UnboundedSender`) that each input is
/// pushed into.
sender: Sender<T>,
}
impl<T> TakenStream<T> {
pub fn new(sender: Sender<T>) -> Self {
Self { sender }
}
}
/// Impulse behaviour for [`TakenStream`]: forward each input of type `T` into the
/// stored channel sender without removing the entity.
impl<T: 'static + Send + Sync> Impulsive for TakenStream<T> {
    /// Installs this impulse on the `source` entity by inserting an
    /// `InputBundle<T>` together with `self`.
    fn setup(self, OperationSetup { source, world }: OperationSetup) -> OperationResult {
        let mut source_mut = world.entity_mut(source);
        source_mut.insert((InputBundle::<T>::new(), self));
        Ok(())
    }

    /// Takes the pending input of `source` and pushes its data into the channel.
    ///
    /// Returns an error if the entity no longer exists, if no input can be
    /// taken, or if the `TakenStream<T>` component is missing. The component is
    /// only borrowed and the entity is not despawned here.
    fn execute(OperationRequest { source, world, .. }: OperationRequest) -> OperationResult {
        let mut entity = world.get_entity_mut(source).or_broken()?;
        let Input { data: item, .. } = entity.take_input::<T>()?;

        // The send result is ignored: a failed send is not treated as an
        // operation error.
        entity
            .get::<TakenStream<T>>()
            .or_broken()?
            .sender
            .send(item)
            .ok();

        Ok(())
    }
}
fn cancel_taken_target<T>(OperationCancel { cancel, world, .. }: OperationCancel) -> OperationResult
where
T: 'static + Send + Sync,
{
let mut target_mut = world.get_entity_mut(cancel.target).or_broken()?;
let taken = target_mut.take::<TakenResponse<T>>().or_broken()?;
taken.sender.cancel(cancel.cancellation).ok();
target_mut.despawn_recursive();
Ok(())
}