use bevy_ecs::prelude::{Bundle, Component, Entity};
use std::future::Future;
use crate::{
ActiveTasksStorage, AsyncMap, BlockingMap, CallAsyncMapOnce, CallBlockingMapOnce, Channel,
ChannelQueue, Executable, Input, InputBundle, ManageInput, OperateTask, OperationRequest,
OperationResult, OperationSetup, OrBroken, Sendish, SingleTargetStorage, StreamPack,
UnusedStreams,
async_execution::{spawn_task, task_cancel_sender},
make_stream_buffers_from_world,
};
#[derive(Bundle)]
pub(crate) struct BlockingMapOnce<F, Request, Response, Streams>
where
F: 'static + Send + Sync,
Request: 'static + Send + Sync,
Response: 'static + Send + Sync,
Streams: StreamPack,
{
f: BlockingMapOnceStorage<F>,
target: SingleTargetStorage,
#[bundle(ignore)]
_ignore: std::marker::PhantomData<fn(Request, Response, Streams)>,
}
impl<F, Request, Response, Streams> BlockingMapOnce<F, Request, Response, Streams>
where
F: 'static + Send + Sync,
Request: 'static + Send + Sync,
Response: 'static + Send + Sync,
Streams: StreamPack,
{
pub(crate) fn new(target: Entity, f: F) -> Self {
Self {
f: BlockingMapOnceStorage { f },
target: SingleTargetStorage::new(target),
_ignore: Default::default(),
}
}
}
#[derive(Component)]
struct BlockingMapOnceStorage<F> {
f: F,
}
impl<F, Request, Response, Streams> Executable for BlockingMapOnce<F, Request, Response, Streams>
where
Request: 'static + Send + Sync,
Response: 'static + Send + Sync,
Streams: StreamPack,
F: CallBlockingMapOnce<Request, Response, Streams> + 'static + Send + Sync,
{
fn setup(self, OperationSetup { source, world }: OperationSetup) -> OperationResult {
world
.entity_mut(source)
.insert((self, InputBundle::<Request>::new()));
Ok(())
}
fn execute(
OperationRequest {
source,
world,
roster,
}: OperationRequest,
) -> OperationResult {
let streams = make_stream_buffers_from_world::<Streams>(source, world)?;
let mut source_mut = world.get_entity_mut(source).or_broken()?;
let target = source_mut.get::<SingleTargetStorage>().or_broken()?.get();
let Input {
session,
data: request,
} = source_mut.take_input::<Request>()?;
let f = source_mut
.take::<BlockingMapOnceStorage<F>>()
.or_broken()?
.f;
let response = f.call(BlockingMap {
request,
streams: streams.clone(),
source,
session,
});
let mut unused_streams = UnusedStreams::new(source);
Streams::process_stream_buffers(
streams,
source,
session,
&mut unused_streams,
world,
roster,
)?;
world
.get_entity_mut(target)
.or_broken()?
.give_input(session, response, roster)?;
Ok(())
}
}
pub(crate) struct AsyncMapOnce<F, Request, Task, Streams>
where
F: 'static + Send + Sync,
Request: 'static + Send + Sync,
Task: 'static + Sendish,
Streams: 'static + Send + Sync,
{
f: AsyncMapOnceStorage<F>,
target: SingleTargetStorage,
_ignore: std::marker::PhantomData<fn(Request, Task, Streams)>,
}
impl<F, Request, Task, Streams> AsyncMapOnce<F, Request, Task, Streams>
where
F: 'static + Send + Sync,
Request: 'static + Send + Sync,
Task: 'static + Sendish,
Streams: 'static + Send + Sync,
{
pub(crate) fn new(target: Entity, f: F) -> Self {
Self {
f: AsyncMapOnceStorage { f },
target: SingleTargetStorage::new(target),
_ignore: Default::default(),
}
}
}
#[derive(Component)]
struct AsyncMapOnceStorage<F> {
f: F,
}
impl<F, Request, Task, Streams> Executable for AsyncMapOnce<F, Request, Task, Streams>
where
Request: 'static + Send + Sync,
Task: Future + 'static + Sendish,
Task::Output: 'static + Send + Sync,
Streams: StreamPack,
F: CallAsyncMapOnce<Request, Task, Streams> + 'static + Send + Sync,
{
fn setup(self, OperationSetup { source, world }: OperationSetup) -> OperationResult {
world.entity_mut(source).insert((
self.f,
self.target,
InputBundle::<Request>::new(),
ActiveTasksStorage::default(),
));
Ok(())
}
fn execute(
OperationRequest {
source,
world,
roster,
}: OperationRequest,
) -> OperationResult {
let sender = world
.get_resource_or_insert_with(ChannelQueue::new)
.sender
.clone();
let mut source_mut = world.get_entity_mut(source).or_broken()?;
let Input {
session,
data: request,
} = source_mut.take_input::<Request>()?;
let target = source_mut.get::<SingleTargetStorage>().or_broken()?.get();
let f = source_mut.take::<AsyncMapOnceStorage<F>>().or_broken()?.f;
let channel = Channel::new(source, session, sender.clone());
let streams = channel.for_streams::<Streams>(world)?;
let task = spawn_task(
f.call(AsyncMap {
request,
streams,
channel,
source,
session,
}),
world,
);
let cancel_sender = task_cancel_sender(world);
let task_source = world.spawn(()).id();
OperateTask::<_, Streams>::new(
task_source,
session,
source,
target,
task,
cancel_sender,
None,
sender,
)
.add(world, roster);
Ok(())
}
}