use crate::{
ActiveTasksStorage, AddExecution, Cleanup, CleanupContents, DisposeForUnavailableService,
Executable, FinalizeCleanup, FinalizeCleanupRequest, Input, InputBundle, ManageDisposal,
ManageInput, OperateService, Operation, OperationCleanup, OperationReachability,
OperationRequest, OperationResult, OperationSetup, OrBroken, ProviderStorage,
ReachabilityResult, ScopeStorage, ServiceInstructions, ServiceRequest, SingleInputStorage,
SingleTargetStorage, StreamPack, StreamTargetMap, dispatch_service,
};
use bevy_ecs::prelude::{Command, Component, Entity};
use smallvec::SmallVec;
use std::collections::HashMap;
pub(crate) struct Injection<Request, Response, Streams> {
target: Entity,
_ignore: std::marker::PhantomData<fn(Request, Response, Streams)>,
}
impl<Request, Response, Streams> Operation for Injection<Request, Response, Streams>
where
Request: 'static + Send + Sync,
Response: 'static + Send + Sync,
Streams: StreamPack,
{
fn setup(self, OperationSetup { source, world }: OperationSetup) -> OperationResult {
world
.get_entity_mut(self.target)
.or_broken()?
.insert(SingleInputStorage::new(source));
world.entity_mut(source).insert((
InjectionStorage::default(),
InputBundle::<(Request, ServiceInstructions<Request, Response, Streams>)>::new(),
SingleTargetStorage::new(self.target),
CleanupContents::new(),
AwaitingCleanup::default(),
FinalizeCleanup::new(Self::finalize_cleanup),
));
Ok(())
}
fn execute(
OperationRequest {
source,
world,
roster,
}: OperationRequest,
) -> OperationResult {
let mut source_mut = world.get_entity_mut(source).or_broken()?;
let Input {
session,
data: (request, service),
} = source_mut
.take_input::<(Request, ServiceInstructions<Request, Response, Streams>)>()?;
let scope = source_mut.get::<ScopeStorage>().or_broken()?.get();
let provider = service.provider();
let instructions = service.instructions().cloned();
let stream_targets = source_mut
.get::<StreamTargetMap>()
.cloned()
.unwrap_or_else(|| StreamTargetMap::default());
let finish = world
.spawn((InputBundle::<Response>::new(), InjectionSource(source)))
.id();
AddExecution::new(None, finish, InjectionFinish::<Response>::new()).apply(world);
let task = world
.spawn((
InputBundle::<Request>::new(),
ProviderStorage(provider),
SingleTargetStorage::new(finish),
ActiveTasksStorage::default(),
DisposeForUnavailableService::new::<Request>(),
ScopeStorage::new(scope),
stream_targets,
))
.id();
let execute = unsafe {
world
.entity_mut(task)
.sneak_input(session, request, false, roster)?
};
if !execute {
None.or_broken()?;
}
let mut storage = world.get_mut::<InjectionStorage>(source).or_broken()?;
storage.list.push(Injected {
session,
task,
finish,
});
dispatch_service(ServiceRequest {
provider,
target: finish,
instructions,
operation: OperationRequest {
source: task,
world,
roster,
},
});
Ok(())
}
fn cleanup(mut clean: OperationCleanup) -> OperationResult {
clean.cleanup_inputs::<(Request, ServiceInstructions<Request, Response, Streams>)>()?;
clean.cleanup_disposals()?;
let OperationCleanup {
source,
cleanup,
world,
roster,
} = clean;
let session = cleanup.session;
let cleanup_id = cleanup.cleanup_id;
let mut storage = world.get_mut::<InjectionStorage>(source).or_broken()?;
let nodes: SmallVec<[Entity; 16]> = storage
.list
.iter()
.filter_map(|injected| {
if injected.session == session {
Some(injected.task)
} else {
None
}
})
.collect();
storage.list.retain(|injected| injected.session != session);
if nodes.is_empty() {
cleanup.notify_cleaned(world, roster)?;
return Ok(());
}
world
.get_mut::<CleanupContents>(source)
.or_broken()?
.add_cleanup(cleanup_id, nodes.clone());
world
.get_mut::<AwaitingCleanup>(source)
.or_broken()?
.map
.insert(cleanup_id, cleanup);
for node in nodes.iter().copied() {
let cleanup = Cleanup {
cleaner: source,
node,
session,
cleanup_id,
};
let clean = OperationCleanup {
source: node,
cleanup,
world,
roster,
};
OperateService::<Request>::cleanup(clean)?;
}
Ok(())
}
fn is_reachable(mut reachability: OperationReachability) -> ReachabilityResult {
if reachability.has_input::<(Request, ServiceInstructions<Request, Response, Streams>)>()? {
return Ok(true);
}
if InjectionStorage::contains_session(&reachability)? {
return Ok(true);
}
SingleInputStorage::is_reachable(&mut reachability)
}
}
impl<Request, Response, Streams> Injection<Request, Response, Streams> {
fn finalize_cleanup(
FinalizeCleanupRequest {
cleanup,
world,
roster,
}: FinalizeCleanupRequest,
) -> OperationResult {
let source = cleanup.cleaner;
let parent_cleanup = world
.get_mut::<AwaitingCleanup>(source)
.or_broken()?
.map
.remove(&cleanup.cleanup_id)
.or_broken()?;
parent_cleanup.notify_cleaned(world, roster)
}
pub(crate) fn new(target: Entity) -> Self {
Self {
target,
_ignore: Default::default(),
}
}
}
#[derive(Component, Default)]
struct InjectionStorage {
list: SmallVec<[Injected; 16]>,
}
#[derive(Component, Default)]
struct AwaitingCleanup {
map: HashMap<Entity, Cleanup>,
}
impl InjectionStorage {
fn contains_session(r: &OperationReachability) -> ReachabilityResult {
Ok(r.world()
.get::<Self>(r.source())
.or_broken()?
.list
.iter()
.any(|injected| injected.session == r.session))
}
}
#[derive(Clone, Copy)]
struct Injected {
session: Entity,
task: Entity,
finish: Entity,
}
#[derive(Component)]
struct InjectionSource(Entity);
struct InjectionFinish<Response> {
_ignore: std::marker::PhantomData<fn(Response)>,
}
impl<Response> InjectionFinish<Response> {
fn new() -> Self {
Self {
_ignore: Default::default(),
}
}
}
impl<Response> Executable for InjectionFinish<Response>
where
Response: 'static + Send + Sync,
{
fn setup(self, _: OperationSetup) -> OperationResult {
Ok(())
}
fn execute(
OperationRequest {
source,
world,
roster,
}: OperationRequest,
) -> OperationResult {
let mut source_mut = world.get_entity_mut(source).or_broken()?;
let Input { session, data } = source_mut.take_input::<Response>()?;
let injector = source_mut.get::<InjectionSource>().or_broken()?.0;
source_mut.despawn();
let mut injector_mut = world.get_entity_mut(injector).or_broken()?;
let target = injector_mut.get::<SingleTargetStorage>().or_broken()?.get();
let mut storage = injector_mut.get_mut::<InjectionStorage>().or_broken()?;
let injected = *storage
.list
.iter()
.find(|injected| injected.finish == source)
.or_broken()?;
storage.list.retain(|injected| injected.finish != source);
let mut task_mut = world.get_entity_mut(injected.task).or_broken()?;
task_mut.transfer_disposals(injector)?;
task_mut.despawn();
world
.get_entity_mut(target)
.or_broken()?
.give_input(session, data, roster)?;
Ok(())
}
}