use bevy_ecs::{
prelude::{Commands, Component, Entity, Event, EventReader, In, Local, Query, World},
schedule::IntoSystemConfigs,
system::{IntoSystem, SystemParam},
world::{EntityWorldMut, Command},
};
use bevy_hierarchy::prelude::{BuildWorldChildren, DespawnRecursiveExt};
use smallvec::SmallVec;
use std::collections::HashMap;
use crate::{
dispose_for_despawned_service, emit_disposal, insert_new_order, pop_next_delivery, Blocker,
Broken, ContinuousService, ContinuousServiceInput, DeferredRoster, Deliver, Delivery,
DeliveryOrder, DeliveryUpdate, Disposal, Input, IntoContinuousService, IntoServiceBuilder,
ManageInput, OperationCleanup, OperationError, OperationReachability, OperationRequest,
OperationResult, OperationRoster, OrBroken, ProviderStorage, ReachabilityResult, ScopeStorage,
ServiceBuilder, ServiceBundle, ServiceRequest, ServiceTrait, SingleTargetStorage, StreamOf,
StreamPack, StreamTargetMap, UnhandledErrors,
};
pub use bevy_ecs::schedule::SystemConfigs;
/// Key that refers to a continuous service through its provider entity.
///
/// The type parameters only appear inside a `PhantomData<fn(..)>`, so the key
/// never stores a value of `Request`, `Response`, or `Streams`.
pub struct ContinuousServiceKey<Request, Response, Streams> {
// Entity this key points at.
provider: Entity,
// Zero-sized marker that ties the key to its request/response/stream types.
_ignore: std::marker::PhantomData<fn(Request, Response, Streams)>,
}
impl<Request, Response, Streams> ContinuousServiceKey<Request, Response, Streams> {
    /// Builds a key that refers to the given `provider` entity.
    fn new(provider: Entity) -> Self {
        let _ignore = std::marker::PhantomData;
        Self { provider, _ignore }
    }
}
impl<Request, Response, Streams> ContinuousServiceKey<Request, Response, Streams> {
/// Returns the provider entity that this key refers to.
pub fn provider(&self) -> Entity {
self.provider
}
}
// Written by hand rather than derived: `#[derive(Clone)]` would add a `Clone`
// bound on every type parameter, which this impl does not require.
impl<Request, Response, Streams> Clone for ContinuousServiceKey<Request, Response, Streams> {
fn clone(&self) -> Self {
// The key is `Copy`, so cloning is a plain bitwise copy.
*self
}
}
// Written by hand so that the key is `Copy` without requiring `Copy` bounds on
// the type parameters, which a derive would add.
impl<Request, Response, Streams> Copy for ContinuousServiceKey<Request, Response, Streams> {}
/// System parameter that gives a system access to the order queues of
/// continuous services, addressed by [`ContinuousServiceKey`].
///
/// Responses recorded through this parameter are buffered locally and turned
/// into a single command when the parameter is dropped.
#[derive(SystemParam)]
pub struct ContinuousQuery<'w, 's, Request, Response, Streams = ()>
where
Request: 'static + Send + Sync,
Response: 'static + Send + Sync,
Streams: StreamPack,
{
// Read-only access to each provider's queue of orders.
queues: Query<'w, 's, &'static ContinuousQueueStorage<Request>>,
// Stream target data, looked up by an order's source entity when building
// its stream buffer.
streams: Query<'w, 's, StreamTargetQuery<Streams>>,
// Responses recorded so far, grouped by provider entity.
delivered: Local<'s, HashMap<Entity, DeliveredQueue<Response>>>,
// Used to queue the deferred delivery of responses and stream data.
commands: Commands<'w, 's>,
}
impl<'w, 's, Request, Response, Streams> ContinuousQuery<'w, 's, Request, Response, Streams>
where
    Request: 'static + Send + Sync,
    Response: 'static + Send + Sync,
    Streams: StreamPack,
{
    /// Returns a read-only view of the order queue that belongs to the provider
    /// of `key`.
    ///
    /// Yields `None` when the query cannot find queue storage on that provider.
    pub fn view<'a>(
        &'a self,
        key: &ContinuousServiceKey<Request, Response, Streams>,
    ) -> Option<ContinuousQueueView<'a, Request, Response>> {
        let provider = key.provider();
        let queue = self.queues.get(provider).ok()?;
        Some(ContinuousQueueView {
            queue,
            delivered: self.delivered.get(&provider),
        })
    }

    /// Returns a handle for the order queue of `key`'s provider through which
    /// orders can be answered and stream data can be sent.
    ///
    /// Yields `None` when the query cannot find queue storage on that provider.
    /// The provider only gets an entry in the local response buffer once its
    /// queue has been found.
    pub fn get_mut<'a>(
        &'a mut self,
        key: &ContinuousServiceKey<Request, Response, Streams>,
    ) -> Option<ContinuousQueueMut<'w, 's, 'a, Request, Response, Streams>> {
        let provider = key.provider();
        let queue = self.queues.get(provider).ok()?;
        let delivered = self.delivered.entry(provider).or_default();
        Some(ContinuousQueueMut {
            provider,
            queue,
            streams: &self.streams,
            delivered,
            commands: &mut self.commands,
        })
    }
}
/// Responses that have been recorded for one provider but not yet delivered.
struct DeliveredQueue<Response> {
// One entry per answered order, each tagged with the order's queue index.
queue: SmallVec<[Delivered<Response>; 16]>,
}
impl<Response> Default for DeliveredQueue<Response> {
fn default() -> Self {
Self {
queue: Default::default(),
}
}
}
/// A recorded response paired with the queue index of the order it answers.
struct Delivered<Response> {
// Index of the answered order inside the provider's queue.
index: usize,
// The response together with the routing data needed to deliver it.
response: DeliverResponse<Response>,
}
impl<Response> DeliveredQueue<Response> {
    /// Reports whether a response has already been recorded for the order at
    /// queue index `key`. This is a linear scan over the recorded responses.
    fn contains_key(&self, key: &usize) -> bool {
        let wanted = *key;
        self.queue
            .iter()
            .map(|delivered| delivered.index)
            .any(|index| index == wanted)
    }

    /// Number of responses recorded so far.
    fn len(&self) -> usize {
        let Self { queue } = self;
        queue.len()
    }

    /// Records `response` for the order at queue index `index`.
    fn insert(&mut self, index: usize, response: DeliverResponse<Response>) {
        let entry = Delivered { index, response };
        self.queue.push(entry);
    }
}
/// Read-only view of one provider's order queue that hides orders which have
/// already been answered through the owning query.
pub struct ContinuousQueueView<'a, Request, Response>
where
Request: 'static + Send + Sync,
Response: 'static + Send + Sync,
{
// The provider's stored orders.
queue: &'a ContinuousQueueStorage<Request>,
// Responses recorded for this provider, if any have been recorded.
delivered: Option<&'a DeliveredQueue<Response>>,
}
impl<'a, Request, Response> ContinuousQueueView<'a, Request, Response>
where
    Request: 'static + Send + Sync,
    Response: 'static + Send + Sync,
{
    /// Iterates over the orders in the queue that have no recorded response.
    /// Each item keeps the order's index within the full queue.
    pub fn iter(&self) -> impl Iterator<Item = OrderView<'_, Request>> {
        self.queue
            .inner
            .iter()
            .enumerate()
            .filter_map(|(index, order)| {
                let already_delivered = match self.delivered {
                    Some(delivered) => delivered.contains_key(&index),
                    None => false,
                };
                if already_delivered {
                    None
                } else {
                    Some(OrderView { index, order })
                }
            })
    }

    /// Looks up the order at queue position `index`.
    ///
    /// Returns `None` when a response was already recorded for that position or
    /// when the position is outside of the queue.
    pub fn get(&self, index: usize) -> Option<OrderView<'_, Request>> {
        let already_delivered = match self.delivered {
            Some(delivered) => delivered.contains_key(&index),
            None => false,
        };
        if already_delivered {
            return None;
        }

        let order = self.queue.inner.get(index)?;
        Some(OrderView { index, order })
    }

    /// Number of stored orders minus the number of recorded responses.
    pub fn len(&self) -> usize {
        let delivered_count = self.delivered.map_or(0, |delivered| delivered.len());
        self.queue.inner.len() - delivered_count
    }

    /// True when [`Self::len`] is zero.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
/// Read-only view of a single order in a continuous service queue.
pub struct OrderView<'a, Request> {
// The stored order being viewed.
order: &'a ContinuousOrder<Request>,
// Position of the order within the queue it was taken from.
index: usize,
}
impl<'a, Request> OrderView<'a, Request> {
/// The request data carried by this order.
pub fn request(&self) -> &Request {
&self.order.data
}
/// The `session` entity recorded on this order.
pub fn session(&self) -> Entity {
self.order.session
}
/// The `source` entity recorded on this order.
pub fn source(&self) -> Entity {
self.order.source
}
/// Position of this order within the queue it was viewed from.
pub fn index(&self) -> usize {
self.index
}
/// The `task_id` entity recorded on this order.
pub fn id(&self) -> Entity {
self.order.task_id
}
}
/// Handle on one provider's order queue through which orders can be answered
/// and stream data can be sent.
///
/// The stored queue itself is only borrowed immutably; responses are recorded
/// in a separate buffer.
pub struct ContinuousQueueMut<'w, 's, 'a, Request, Response, Streams>
where
Request: 'static + Send + Sync,
Response: 'static + Send + Sync,
Streams: StreamPack,
{
// Provider entity that owns the queue.
provider: Entity,
// The provider's stored orders.
queue: &'a ContinuousQueueStorage<Request>,
// Stream target data, looked up by an order's source entity.
streams: &'a Query<'w, 's, StreamTargetQuery<Streams>>,
// Buffer of responses recorded for this provider.
delivered: &'a mut DeliveredQueue<Response>,
// Command queue handed to each order for deferring its stream buffer.
commands: &'a mut Commands<'w, 's>,
}
impl<'w, 's, 'a, Request, Response, Streams>
    ContinuousQueueMut<'w, 's, 'a, Request, Response, Streams>
where
    Request: 'static + Send + Sync,
    Response: 'static + Send + Sync,
    Streams: StreamPack,
{
    /// Iterates read-only over the orders that have no recorded response.
    /// Each item keeps the order's index within the full queue.
    pub fn iter(&self) -> impl Iterator<Item = OrderView<'_, Request>> {
        self.queue
            .inner
            .iter()
            .enumerate()
            .filter_map(|(index, order)| {
                if self.delivered.contains_key(&index) {
                    None
                } else {
                    Some(OrderView { index, order })
                }
            })
    }

    /// Read-only lookup of the order at queue position `index`.
    ///
    /// Returns `None` when a response was already recorded for that position or
    /// when the position is outside of the queue.
    pub fn get(&self, index: usize) -> Option<OrderView<'_, Request>> {
        if self.delivered.contains_key(&index) {
            return None;
        }

        let order = self.queue.inner.get(index)?;
        Some(OrderView { index, order })
    }

    /// Number of stored orders minus the number of recorded responses.
    pub fn len(&self) -> usize {
        let total = self.queue.inner.len();
        total - self.delivered.len()
    }

    /// True when [`Self::len`] is zero.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns a handle for answering or streaming from the order at queue
    /// position `index`.
    ///
    /// Returns `None` when the position is outside of the queue or a response
    /// was already recorded for it.
    ///
    /// # Panics
    ///
    /// Panics if the stream query has no entry for the order's source entity.
    pub fn get_mut<'b>(
        &'b mut self,
        index: usize,
    ) -> Option<OrderMut<'w, 's, 'b, Request, Response, Streams>> {
        let request = self.queue.inner.get(index)?;
        if self.delivered.contains_key(&index) {
            return None;
        }

        let streams = Some(self.make_stream_buffer(request.source));
        Some(OrderMut {
            request,
            provider: self.provider,
            index,
            streams,
            delivered: self.delivered,
            commands: self.commands,
        })
    }

    /// Calls `f` once for every order that has no recorded response at the
    /// moment it is visited.
    ///
    /// # Panics
    ///
    /// Panics if the stream query has no entry for a visited order's source
    /// entity.
    pub fn for_each(&mut self, mut f: impl FnMut(OrderMut<Request, Response, Streams>)) {
        for (index, request) in self.queue.inner.iter().enumerate() {
            if !self.delivered.contains_key(&index) {
                let streams = Some(self.make_stream_buffer(request.source));
                let order = OrderMut {
                    request,
                    provider: self.provider,
                    index,
                    streams,
                    delivered: self.delivered,
                    commands: self.commands,
                };
                f(order);
            }
        }
    }

    /// Like [`Self::for_each`], but collects the value returned by `f` for each
    /// visited order, paired with that order's queue index.
    ///
    /// # Panics
    ///
    /// Panics if the stream query has no entry for a visited order's source
    /// entity.
    pub fn for_each_out<U>(
        &mut self,
        mut f: impl FnMut(OrderMut<Request, Response, Streams>) -> U,
    ) -> SmallVec<[(usize, U); 16]> {
        let mut collected = SmallVec::new();
        for (index, request) in self.queue.inner.iter().enumerate() {
            if !self.delivered.contains_key(&index) {
                let streams = Some(self.make_stream_buffer(request.source));
                let order = OrderMut {
                    request,
                    provider: self.provider,
                    index,
                    streams,
                    delivered: self.delivered,
                    commands: self.commands,
                };
                collected.push((index, f(order)));
            }
        }
        collected
    }

    /// Builds a stream buffer from the stream target data stored for `source`.
    ///
    /// Panics if the stream query has no entry for `source`.
    fn make_stream_buffer(&self, source: Entity) -> Streams::Buffer {
        let (indices, map) = self.streams.get(source).unwrap();
        Streams::make_buffer(indices, map)
    }
}
/// Handle on a single order that allows responding to it and sending data
/// through its streams.
///
/// Dropping the handle defers the stream buffer through the command queue.
pub struct OrderMut<'w, 's, 'a, Request, Response, Streams>
where
Request: 'static + Send + Sync,
Response: 'static + Send + Sync,
Streams: StreamPack,
{
// The stored order this handle refers to.
request: &'a ContinuousOrder<Request>,
// Provider entity whose queue holds the order.
provider: Entity,
// Position of the order within the provider's queue.
index: usize,
// Stream buffer for this order. Wrapped in `Option` so that `Drop` can move
// it out with `take()`.
streams: Option<Streams::Buffer>,
// Buffer in which a response to this order gets recorded.
delivered: &'a mut DeliveredQueue<Response>,
// Command queue used when the stream buffer is deferred on drop.
commands: &'a mut Commands<'w, 's>,
}
impl<'w, 's, 'a, Request, Response, Streams> OrderMut<'w, 's, 'a, Request, Response, Streams>
where
    Request: 'static + Send + Sync,
    Response: 'static + Send + Sync,
    Streams: StreamPack,
{
    /// The request data carried by this order.
    pub fn request(&self) -> &Request {
        let order = self.request;
        &order.data
    }

    /// The `session` entity recorded on this order.
    pub fn session(&self) -> Entity {
        let order = self.request;
        order.session
    }

    /// The `source` entity recorded on this order.
    pub fn source(&self) -> Entity {
        let order = self.request;
        order.source
    }

    /// Records `response` as the answer to this order.
    ///
    /// The handle is consumed, so an order can be answered at most once through
    /// it. The response is only buffered here, together with the routing data
    /// copied from the order.
    pub fn respond(self, response: Response) {
        let index = self.index;
        let order = self.request;
        let delivery = DeliverResponse {
            provider: self.provider,
            source: order.source,
            session: order.session,
            task_id: order.task_id,
            data: response,
            index,
        };
        self.delivered.insert(index, delivery);
    }

    /// The stream buffer belonging to this order.
    pub fn streams(&self) -> &Streams::Buffer {
        // The buffer is only taken out inside `Drop`.
        self.streams.as_ref().unwrap()
    }

    /// Position of this order within the provider's queue.
    pub fn index(&self) -> usize {
        self.index
    }

    /// The `task_id` entity recorded on this order.
    pub fn id(&self) -> Entity {
        let order = self.request;
        order.task_id
    }
}
impl<'w, 's, 'a, Request, Response, Streams> Drop
    for OrderMut<'w, 's, 'a, Request, Response, Streams>
where
    Request: 'static + Send + Sync,
    Response: 'static + Send + Sync,
    Streams: StreamPack,
{
    /// Hands the order's stream buffer to `Streams::defer_buffer` together with
    /// the order's source, session, and the command queue.
    fn drop(&mut self) {
        // The buffer is expected to always be present here because nothing else
        // takes it. It is still matched rather than unwrapped: a panic inside
        // `drop` while another panic is unwinding would abort the process.
        if let Some(streams) = self.streams.take() {
            Streams::defer_buffer(
                streams,
                self.request.source,
                self.request.session,
                self.commands,
            );
        }
    }
}
impl<'w, 's, Request, Response, Streams> Drop
for ContinuousQuery<'w, 's, Request, Response, Streams>
where
Request: 'static + Send + Sync,
Response: 'static + Send + Sync,
Streams: StreamPack,
{
fn drop(&mut self) {
let mut responses = SmallVec::new();
for (_, delivered) in self.delivered.drain() {
for deliver in delivered.queue {
responses.push(deliver.response);
}
}
if !responses.is_empty() {
self.commands
.add(DeliverResponses::<Request, Response, Streams> {
responses,
_ignore: Default::default(),
});
}
}
}
/// Component holding the orders queued for a continuous service. It is
/// inserted on the provider entity when the service is set up.
#[derive(Component)]
struct ContinuousQueueStorage<Request> {
// The queued orders, in insertion order.
inner: SmallVec<[ContinuousOrder<Request>; 16]>,
}
impl<Request> std::fmt::Debug for ContinuousQueueStorage<Request> {
    /// Formats the storage as a list of its orders.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut list = f.debug_list();
        list.entries(&self.inner);
        list.finish()
    }
}
impl<Request> ContinuousQueueStorage<Request> {
    /// Creates storage with an empty queue.
    fn new() -> Self {
        let inner = Default::default();
        Self { inner }
    }

    /// Reports whether the queue stored on `provider` holds an order that
    /// belongs to `session`.
    ///
    /// Returns `Ok(false)` when `provider` has no queue storage of this type.
    ///
    /// # Errors
    ///
    /// Fails through `or_broken` when `provider` is not an entity in `world`.
    fn contains_session(provider: Entity, session: Entity, world: &World) -> ReachabilityResult
    where
        Request: 'static + Send + Sync,
    {
        let provider_ref = world.get_entity(provider).or_broken()?;
        let has_session = provider_ref
            .get::<Self>()
            .is_some_and(|queue| queue.inner.iter().any(|order| order.session == session));
        Ok(has_session)
    }

    /// Removes every order that belongs to `session` from the queue stored on
    /// `provider`.
    ///
    /// Does nothing when `provider` has no queue storage of this type.
    ///
    /// # Errors
    ///
    /// Fails through `or_broken` when `provider` is not an entity in `world`.
    fn cleanup(provider: Entity, session: Entity, world: &mut World) -> OperationResult
    where
        Request: 'static + Send + Sync,
    {
        let mut provider_mut = world.get_entity_mut(provider).or_broken()?;
        if let Some(mut queue) = provider_mut.get_mut::<Self>() {
            queue.inner.retain(|order| order.session != session);
        }
        Ok(())
    }
}
/// Component that stores function pointers for querying and cleaning up the
/// queue of a continuous service. The pointers are created for a concrete
/// request type, which lets callers use them without naming that type.
#[derive(Component)]
pub(crate) struct ActiveContinuousSessions {
// Called as (provider, session, world) to check for orders of a session.
reachability: fn(Entity, Entity, &World) -> ReachabilityResult,
// Called as (provider, session, world) to remove the orders of a session.
cleanup: fn(Entity, Entity, &mut World) -> OperationResult,
}
impl ActiveContinuousSessions {
    /// Captures the queue functions of `ContinuousQueueStorage<T>` as plain
    /// function pointers.
    fn new<T: 'static + Send + Sync>() -> Self {
        let reachability = ContinuousQueueStorage::<T>::contains_session;
        let cleanup = ContinuousQueueStorage::<T>::cleanup;
        Self {
            reachability,
            cleanup,
        }
    }

    /// Reports whether the provider referenced by the source of `r` still holds
    /// an order for the session of `r`.
    ///
    /// Returns `Ok(false)` when the provider has no `ActiveContinuousSessions`
    /// component.
    ///
    /// # Errors
    ///
    /// Fails through `or_broken` when the source has no `ProviderStorage`, and
    /// forwards any error from the stored reachability function.
    pub(crate) fn contains_session(r: &OperationReachability) -> ReachabilityResult {
        let provider = r
            .world()
            .get::<ProviderStorage>(r.source())
            .or_broken()?
            .get();

        let check = match r.world().get::<ActiveContinuousSessions>(provider) {
            Some(active) => active.reachability,
            None => return Ok(false),
        };
        check(provider, r.session(), r.world())
    }

    /// Removes the orders of the session being cleaned up from the provider
    /// referenced by the source of `clean`.
    ///
    /// Does nothing when the provider has no `ActiveContinuousSessions`
    /// component.
    ///
    /// # Errors
    ///
    /// Fails through `or_broken` when the source has no `ProviderStorage`, and
    /// forwards any error from the stored cleanup function.
    pub(crate) fn cleanup(clean: &mut OperationCleanup) -> OperationResult {
        let source = clean.source;
        let provider = clean
            .world
            .get::<ProviderStorage>(source)
            .or_broken()?
            .get();

        // Copy the function pointer out first so that the shared borrow of the
        // world has ended before the world is passed on mutably.
        let run_cleanup = match clean.world.get::<ActiveContinuousSessions>(provider) {
            Some(active) => active.cleanup,
            None => return Ok(()),
        };
        run_cleanup(provider, clean.cleanup.session, clean.world)
    }
}
/// One order stored in a continuous service queue.
struct ContinuousOrder<Request> {
// The request payload.
data: Request,
// Session entity that the request belongs to.
session: Entity,
// Entity that the request input was taken from.
source: Entity,
// Entity spawned for this order as a child of `source`; it is despawned once
// the order is answered or cancelled.
task_id: Entity,
// When present, its `serve_next` function is called after this order leaves
// the queue.
unblock: Option<Blocker>,
}
impl<Request> std::fmt::Debug for ContinuousOrder<Request> {
    /// Formats the order's `session`, `source`, and `task_id`.
    ///
    /// The request data and the blocker are left out, so `Request` does not
    /// need to implement `Debug`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Label the output with this type's own name; it was previously
        // mislabelled as the storage type that contains it.
        f.debug_struct("ContinuousOrder")
            .field("session", &self.session)
            .field("source", &self.source)
            .field("task_id", &self.task_id)
            .finish()
    }
}
/// Command carrying a batch of recorded responses to be delivered when the
/// command is applied to the world.
struct DeliverResponses<Request, Response, Streams> {
// The responses to deliver, each with its own routing data.
responses: SmallVec<[DeliverResponse<Response>; 16]>,
// Zero-sized marker for the request and stream types used while applying.
_ignore: std::marker::PhantomData<fn(Request, Streams)>,
}
/// A single response together with the data needed to route it and to retire
/// the order it answers.
struct DeliverResponse<Response> {
// Provider entity whose queue holds the answered order.
provider: Entity,
// Source entity of the answered order.
source: Entity,
// Session entity of the answered order.
session: Entity,
// Task entity of the answered order.
task_id: Entity,
// The response payload.
data: Response,
// Queue index of the answered order at the time the response was recorded.
index: usize,
}
impl<Request, Response, Streams> Command for DeliverResponses<Request, Response, Streams>
where
Request: 'static + Send + Sync,
Response: 'static + Send + Sync,
Streams: StreamPack,
{
/// Delivers every response in the batch and then removes the answered orders
/// from their providers' queues.
///
/// `Broken` errors from either step are recorded in the `UnhandledErrors`
/// resource against the provider; other error variants are ignored.
fn apply(self, world: &mut World) {
// Make sure the roster resource exists before scoping into it.
world.get_resource_or_insert_with(DeferredRoster::default);
world.resource_scope::<DeferredRoster, _>(|world: &mut World, mut deferred| {
// (queue index, provider) pairs of the orders to retire afterwards.
let mut remove: SmallVec<[(usize, Entity); 16]> = SmallVec::new();
for DeliverResponse {
provider,
source,
session,
data,
index,
task_id,
} in self.responses
{
remove.push((index, provider));
let r = try_give_response(source, session, data, world, &mut deferred);
if let Err(OperationError::Broken(backtrace)) = r {
world
.get_resource_or_insert_with(UnhandledErrors::default)
.broken
.push(Broken {
node: provider,
backtrace,
});
}
// Only services with streams register a disposal on the source's scope.
if Streams::has_streams() {
if let Some(scope) = world.get::<ScopeStorage>(source) {
deferred.disposed(scope.get(), source, session);
}
}
// The order has been answered, so its task entity is no longer needed.
if let Some(task_mut) = world.get_entity_mut(task_id) {
task_mut.despawn_recursive();
}
}
// Retire from the highest index down so that removing one order does not
// shift the position of another order that still has to be removed.
remove.sort_by(|(index_a, _), (index_b, _)| index_b.cmp(index_a));
for (index, provider) in remove {
let r = try_retire_request::<Request>(provider, index, world, &mut deferred);
if let Err(OperationError::Broken(backtrace)) = r {
world
.get_resource_or_insert_with(UnhandledErrors::default)
.broken
.push(Broken {
node: provider,
backtrace,
});
}
}
});
}
}
/// Gives `data` as input for `session` to the target entity stored on `source`.
///
/// # Errors
///
/// Fails through `or_broken` when `source` has no `SingleTargetStorage` or the
/// target entity does not exist, and forwards any error from `give_input`.
fn try_give_response<Response: 'static + Send + Sync>(
    source: Entity,
    session: Entity,
    data: Response,
    world: &mut World,
    roster: &mut OperationRoster,
) -> OperationResult {
    let target_storage = world.get::<SingleTargetStorage>(source).or_broken()?;
    let target = target_storage.get();

    let mut target_mut = world.get_entity_mut(target).or_broken()?;
    target_mut.give_input(session, data, roster)
}
/// Query data fetched per source entity to build a stream buffer: the stream
/// pack's own target index query plus an optional `StreamTargetMap`.
type StreamTargetQuery<Streams> = (
<Streams as StreamPack>::TargetIndexQuery,
Option<&'static StreamTargetMap>,
);
/// Removes the order at queue position `index` from the queue stored on
/// `provider` and, if that order carried a blocker, calls the blocker's
/// `serve_next` function.
///
/// # Errors
///
/// Fails through `or_broken` when `provider` has no queue storage for
/// `Request`, or when `index` is not a valid position in that queue.
fn try_retire_request<Request: 'static + Send + Sync>(
    provider: Entity,
    index: usize,
    world: &mut World,
    roster: &mut OperationRoster,
) -> OperationResult {
    let mut storage = world
        .get_mut::<ContinuousQueueStorage<Request>>(provider)
        .or_broken()?;

    // `SmallVec::remove` panics on an out-of-range index. The index was
    // recorded before this command ran, so validate it here and report a stale
    // one as a broken operation instead of crashing.
    storage.inner.get(index).or_broken()?;

    let finished_order = storage.inner.remove(index);
    if let Some(unblock) = finished_order.unblock {
        let f = unblock.serve_next;
        f(unblock, world, roster);
    }
    Ok(())
}
/// Type-level marker that implements `ServiceTrait` for continuous services.
/// It holds no data.
struct ContinuousServiceImpl<Request, Response, Streams> {
// Zero-sized marker for the request/response/stream types.
_ignore: std::marker::PhantomData<fn(Request, Response, Streams)>,
}
impl<Request, Response, Streams> ServiceTrait for ContinuousServiceImpl<Request, Response, Streams>
where
Request: 'static + Send + Sync,
Response: 'static + Send + Sync,
Streams: StreamPack,
{
type Request = Request;
type Response = Response;
/// Takes the pending input from `source` and either places it straight into
/// the provider's order queue or leaves it queued in the provider's
/// `Delivery`, depending on what `insert_new_order` decides.
///
/// # Errors
///
/// Fails through `or_broken` when `source` does not exist, forwards any error
/// from `take_input`, returns `OperationError::NotReady` when the provider has
/// no `Delivery<Request>` component, and fails through `or_broken` when the
/// provider has no queue storage at a point where it is needed.
fn serve(
ServiceRequest {
provider,
target,
instructions,
operation:
OperationRequest {
source,
world,
roster,
},
}: ServiceRequest,
) -> OperationResult {
let mut source_mut = world.get_entity_mut(source).or_broken()?;
let Input {
session,
data: request,
} = source_mut.take_input::<Request>()?;
// Spawn an entity for this order as a child of the source; it is stored on
// the order as its task id.
let task_id = world.spawn(()).set_parent(source).id();
let Some(mut delivery) = world.get_mut::<Delivery<Request>>(provider) else {
// Without delivery data on the provider the request cannot be scheduled.
dispose_for_despawned_service(provider, world, roster);
return Err(OperationError::NotReady);
};
let update = insert_new_order::<Request>(
delivery.as_mut(),
DeliveryOrder {
source,
session,
task_id,
request,
instructions,
},
);
let (request, blocker) = match update {
DeliveryUpdate::Immediate { blocking, request } => {
// The request can enter the order queue right away. When it is blocking,
// attach a blocker so the next delivery can be served once it finishes.
let serve_next = serve_next_continuous_request::<Request, Response, Streams>;
let blocker = blocking.map(|label| Blocker {
provider,
source,
session,
label,
serve_next,
});
(request, blocker)
}
DeliveryUpdate::Queued {
cancelled,
stop,
label,
} => {
// The request stays queued in the delivery data. Orders that the update
// cancelled are disposed as supplanted by this request and their task
// entities are despawned.
for cancelled in cancelled {
let disposal = Disposal::supplanted(cancelled.source, source, session);
emit_disposal(cancelled.source, cancelled.session, disposal, world, roster);
if let Some(task_mut) = world.get_entity_mut(cancelled.task_id) {
task_mut.despawn_recursive();
}
}
if let Some(stop) = stop {
// An order that must be stopped is looked up in the order queue by its
// task id and removed from it.
let mut queue = world
.get_mut::<ContinuousQueueStorage<Request>>(provider)
.or_broken()?;
let stopped_index = queue
.inner
.iter()
.enumerate()
.find(|(_, r)| r.task_id == stop.task_id)
.map(|(index, _)| index);
if let Some(unblock) = stopped_index.and_then(|i| queue.inner.remove(i).unblock)
{
// The removed order carried a blocker: serve the next delivery now.
let f = unblock.serve_next;
f(unblock, world, roster);
} else {
// No blocker was recovered from the queue, so a new one is built from
// the stopped order's data and handed to the roster to unblock.
let serve_next =
serve_next_continuous_request::<Request, Response, Streams>;
roster.unblock(Blocker {
provider,
source: stop.source,
session: stop.session,
label,
serve_next,
});
}
let disposal = Disposal::supplanted(stop.source, source, session);
emit_disposal(stop.source, stop.session, disposal, world, roster);
if let Some(task_mut) = world.get_entity_mut(stop.task_id) {
task_mut.despawn_recursive();
}
}
return Ok(());
}
};
// Immediate case: push the request into the provider's order queue.
serve_continuous_request::<Request>(
request,
blocker,
session,
task_id,
ServiceRequest {
provider,
target,
instructions,
operation: OperationRequest {
source,
world,
roster,
},
},
)
}
}
/// Appends a new order to the queue stored on the provider named in the
/// service request.
///
/// Only the `provider`, `source`, and `world` parts of the service request are
/// used.
///
/// # Errors
///
/// Fails through `or_broken` when the provider has no queue storage for
/// `Request`.
fn serve_continuous_request<Request>(
    request: Request,
    blocker: Option<Blocker>,
    session: Entity,
    task_id: Entity,
    ServiceRequest {
        provider,
        operation: OperationRequest { source, world, .. },
        ..
    }: ServiceRequest,
) -> OperationResult
where
    Request: 'static + Send + Sync,
{
    let order = ContinuousOrder {
        data: request,
        session,
        source,
        task_id,
        unblock: blocker,
    };

    world
        .get_mut::<ContinuousQueueStorage<Request>>(provider)
        .or_broken()?
        .inner
        .push(order);
    Ok(())
}
/// Pops deliveries for the blocker's provider and label until one of them is
/// successfully placed into the provider's order queue, or until there are no
/// deliveries left to pop.
///
/// A popped delivery is skipped when its source has no `SingleTargetStorage`
/// or when queueing it fails.
fn serve_next_continuous_request<Request, Response, Streams>(
    unblock: Blocker,
    world: &mut World,
    roster: &mut OperationRoster,
) where
    Request: 'static + Send + Sync,
    Response: 'static + Send + Sync,
    Streams: StreamPack,
{
    let Blocker {
        provider, label, ..
    } = unblock;

    loop {
        let next = pop_next_delivery::<Request>(
            provider,
            label,
            serve_next_continuous_request::<Request, Response, Streams>,
            world,
        );
        let Some(Deliver {
            request,
            task_id,
            blocker,
        }) = next
        else {
            // Nothing is left to deliver.
            return;
        };

        let (session, source) = (blocker.session, blocker.source);
        let Some(target) = world
            .get::<SingleTargetStorage>(source)
            .map(|storage| storage.get())
        else {
            // This delivery has no target, so try the next one.
            continue;
        };

        let launched = serve_continuous_request::<Request>(
            request,
            Some(blocker),
            session,
            task_id,
            ServiceRequest {
                provider,
                target,
                instructions: None,
                operation: OperationRequest {
                    source,
                    world,
                    roster,
                },
            },
        );

        // Stop at the first delivery that was queued; otherwise keep popping.
        if launched.is_ok() {
            return;
        }
    }
}
// Any system whose input is `ContinuousService<Request, Response, Streams>` and
// whose output is `()` can be turned into a continuous service.
impl<Request, Response, Streams, M, Sys> IntoContinuousService<(Request, Response, Streams, M)>
for Sys
where
Sys: IntoSystem<ContinuousService<Request, Response, Streams>, (), M>,
Request: 'static + Send + Sync,
Response: 'static + Send + Sync,
Streams: StreamPack,
{
type Request = Request;
type Response = Response;
type Streams = Streams;
/// Inserts the components of a continuous service on the given entity and
/// returns system configs for the service's system.
fn into_system_config(self, entity_mut: &mut EntityWorldMut) -> SystemConfigs {
// Insert the order queue, the session bookkeeping, and the service bundle
// on the provider entity.
let provider = entity_mut
.insert((
ContinuousQueueStorage::<Request>::new(),
ActiveContinuousSessions::new::<Request>(),
ServiceBundle::<ContinuousServiceImpl<Request, Response, Streams>>::new(),
))
.id();
// Input-less system that produces the service's key; piping it into the
// user's system supplies that key as the system's input.
let continuous_key = move || ContinuousService {
key: ContinuousServiceKey::new(provider),
};
continuous_key.pipe(self).into_configs()
}
}
/// Marker type used as the type argument of the `IntoServiceBuilder` impl for
/// continuous services. It holds no data.
pub struct IntoContinuousServiceBuilderMarker<M>(std::marker::PhantomData<fn(M)>);
// Lets anything that implements `IntoContinuousService` be used as a service
// builder.
impl<M, Srv> IntoServiceBuilder<IntoContinuousServiceBuilderMarker<M>> for Srv
where
Srv: IntoContinuousService<M>,
{
type Service = Srv;
type Deliver = ();
type With = ();
type Also = ();
type Configure = ();
/// Wraps `self` in a `ServiceBuilder` whose other type parameters are all `()`.
fn into_service_builder(self) -> ServiceBuilder<Self::Service, (), (), (), ()> {
ServiceBuilder::new(self)
}
}
/// Continuous service system that sends a clone of every event read from the
/// event reader to the stream of each order in its queue that has no recorded
/// response.
///
/// Does nothing, and reads no events, when the order queue for `key` cannot be
/// found. This system never responds to an order.
pub fn event_streaming_service<E>(
    In(ContinuousService { key }): ContinuousServiceInput<(), (), StreamOf<E>>,
    mut requests: ContinuousQuery<(), (), StreamOf<E>>,
    mut events: EventReader<E>,
) where
    E: Event + 'static + Send + Sync + Unpin + Clone,
{
    if let Some(mut queue) = requests.get_mut(&key) {
        for event in events.read() {
            queue.for_each(|order| {
                order.streams().send(StreamOf(event.clone()));
            });
        }
    }
}