use bevy_ecs::prelude::{Bundle, Component, Entity};
use std::future::Future;
use crate::{
async_execution::{spawn_task, task_cancel_sender},
make_stream_buffer_from_world, ActiveTasksStorage, AsyncMap, BlockingMap, CallAsyncMapOnce,
CallBlockingMapOnce, Channel, ChannelQueue, Impulsive, Input, InputBundle, ManageInput,
OperateTask, OperationRequest, OperationResult, OperationSetup, OrBroken, Sendish,
SingleTargetStorage, StreamPack, UnusedStreams,
};
/// Bundle of the data a blocking map impulse keeps on its source entity: the
/// stored callback and the entity that will be given the callback's response.
///
/// `Request`, `Response`, and `Streams` are not stored; they only appear in the
/// marker field so the type stays tied to the callback's signature.
#[derive(Bundle)]
pub(crate) struct ImpulseBlockingMap<F, Request, Response, Streams>
where
F: 'static + Send + Sync,
Request: 'static + Send + Sync,
Response: 'static + Send + Sync,
Streams: StreamPack,
{
/// Component wrapping the callback to run when the impulse executes.
f: BlockingMapOnceStorage<F>,
/// Component recording the entity that receives the response.
target: SingleTargetStorage,
/// Type-level marker only; excluded from the bundle's components.
#[bundle(ignore)]
_ignore: std::marker::PhantomData<fn(Request, Response, Streams)>,
}
impl<F, Request, Response, Streams> ImpulseBlockingMap<F, Request, Response, Streams>
where
    F: 'static + Send + Sync,
    Request: 'static + Send + Sync,
    Response: 'static + Send + Sync,
    Streams: StreamPack,
{
    /// Creates a blocking map impulse that stores the callback `f` and
    /// records `target` as the entity that will receive its response.
    pub(crate) fn new(target: Entity, f: F) -> Self {
        let f = BlockingMapOnceStorage { f };
        let target = SingleTargetStorage::new(target);
        Self {
            f,
            target,
            _ignore: std::marker::PhantomData,
        }
    }
}
/// Component wrapper that lets the blocking map callback be stored on an
/// entity until it is taken out again during execution.
#[derive(Component)]
struct BlockingMapOnceStorage<F> {
/// The stored callback.
f: F,
}
impl<F, Request, Response, Streams> Impulsive for ImpulseBlockingMap<F, Request, Response, Streams>
where
Request: 'static + Send + Sync,
Response: 'static + Send + Sync,
Streams: StreamPack,
F: CallBlockingMapOnce<Request, Response, Streams> + 'static + Send + Sync,
{
fn setup(self, OperationSetup { source, world }: OperationSetup) -> OperationResult {
world
.entity_mut(source)
.insert((self, InputBundle::<Request>::new()));
Ok(())
}
fn execute(
OperationRequest {
source,
world,
roster,
}: OperationRequest,
) -> OperationResult {
let streams = make_stream_buffer_from_world::<Streams>(source, world)?;
let mut source_mut = world.get_entity_mut(source).or_broken()?;
let target = source_mut.get::<SingleTargetStorage>().or_broken()?.get();
let Input {
session,
data: request,
} = source_mut.take_input::<Request>()?;
let f = source_mut
.take::<BlockingMapOnceStorage<F>>()
.or_broken()?
.f;
let response = f.call(BlockingMap {
request,
streams: streams.clone(),
source,
session,
});
let mut unused_streams = UnusedStreams::new(source);
Streams::process_buffer(streams, source, session, &mut unused_streams, world, roster)?;
world
.get_entity_mut(target)
.or_broken()?
.give_input(session, response, roster)?;
Ok(())
}
}
/// Data for an async map impulse: the stored callback and the entity that the
/// spawned task is told to target.
///
/// `Request`, `Task`, and `Streams` are not stored; they only appear in the
/// marker field so the type stays tied to the callback's signature.
pub(crate) struct ImpulseAsyncMap<F, Request, Task, Streams>
where
F: 'static + Send + Sync,
Request: 'static + Send + Sync,
Task: 'static + Sendish,
Streams: 'static + Send + Sync,
{
/// Component wrapping the callback that produces the task.
f: AsyncMapOnceStorage<F>,
/// Component recording the target entity.
target: SingleTargetStorage,
/// Type-level marker only; carries no data.
_ignore: std::marker::PhantomData<fn(Request, Task, Streams)>,
}
impl<F, Request, Task, Streams> ImpulseAsyncMap<F, Request, Task, Streams>
where
    F: 'static + Send + Sync,
    Request: 'static + Send + Sync,
    Task: 'static + Sendish,
    Streams: 'static + Send + Sync,
{
    /// Creates an async map impulse that stores the callback `f` and records
    /// `target` as the entity handed to the spawned task operation.
    pub(crate) fn new(target: Entity, f: F) -> Self {
        let f = AsyncMapOnceStorage { f };
        let target = SingleTargetStorage::new(target);
        Self {
            f,
            target,
            _ignore: std::marker::PhantomData,
        }
    }
}
/// Component wrapper that lets the async map callback be stored on an entity
/// until it is taken out again during execution.
#[derive(Component)]
struct AsyncMapOnceStorage<F> {
/// The stored callback.
f: F,
}
impl<F, Request, Task, Streams> Impulsive for ImpulseAsyncMap<F, Request, Task, Streams>
where
Request: 'static + Send + Sync,
Task: Future + 'static + Sendish,
Task::Output: 'static + Send + Sync,
Streams: StreamPack,
F: CallAsyncMapOnce<Request, Task, Streams> + 'static + Send + Sync,
{
fn setup(self, OperationSetup { source, world }: OperationSetup) -> OperationResult {
world.entity_mut(source).insert((
self.f,
self.target,
InputBundle::<Request>::new(),
ActiveTasksStorage::default(),
));
Ok(())
}
fn execute(
OperationRequest {
source,
world,
roster,
}: OperationRequest,
) -> OperationResult {
let sender = world
.get_resource_or_insert_with(ChannelQueue::new)
.sender
.clone();
let mut source_mut = world.get_entity_mut(source).or_broken()?;
let Input {
session,
data: request,
} = source_mut.take_input::<Request>()?;
let target = source_mut.get::<SingleTargetStorage>().or_broken()?.get();
let f = source_mut.take::<AsyncMapOnceStorage<F>>().or_broken()?.f;
let channel = Channel::new(source, session, sender.clone());
let streams = channel.for_streams::<Streams>(world)?;
let task = spawn_task(
f.call(AsyncMap {
request,
streams,
channel,
source,
session,
}),
world,
);
let cancel_sender = task_cancel_sender(world);
let task_source = world.spawn(()).id();
OperateTask::<_, Streams>::new(
task_source,
session,
source,
target,
task,
cancel_sender,
None,
sender,
)
.add(world, roster);
Ok(())
}
}