use bevy_ecs::prelude::{Bundle, Component, Entity, Resource, World};
use tokio::sync::mpsc::{
unbounded_channel, UnboundedReceiver as TokioReceiver, UnboundedSender as TokioSender,
};
use anyhow::anyhow;
use std::{collections::VecDeque, sync::Arc};
use crate::{
dispose_for_despawned_service, DeliveryInstructions, MiscellaneousFailure, OperationError,
OperationRequest, OperationRoster, PendingOperationRequest, ServiceTrait, UnhandledErrors,
};
/// A request handed to a service callback.
///
/// It bundles the entities involved in the request with the active
/// `OperationRequest`. `dispatch_service` looks up the `ServiceHook` on
/// `provider` and passes a value of this type to the hook's `trigger`.
pub struct ServiceRequest<'a> {
/// Entity whose `ServiceHook` is looked up to serve this request.
pub(crate) provider: Entity,
/// Entity reported as the "target" in failure messages.
/// NOTE(review): presumably where the service's response goes — confirm.
pub(crate) target: Entity,
/// Optional delivery instructions, carried along unchanged by this file.
pub(crate) instructions: Option<DeliveryInstructions>,
/// The operation context; it is destructured elsewhere in this file into
/// `source`, `world` and `roster`.
pub(crate) operation: OperationRequest<'a>,
}
/// A [`ServiceRequest`] without its world and roster borrows.
///
/// Because it holds no references it is `Copy` and can be stored (this file
/// keeps values of this type in a queue) until it is turned back into a
/// [`ServiceRequest`] with `activate`.
#[derive(Clone, Copy)]
pub struct PendingServiceRequest {
    /// Entity whose service hook is looked up when the request is delivered.
    pub provider: Entity,
    /// Entity copied into [`ServiceRequest::target`] on activation.
    pub target: Entity,
    /// Optional delivery instructions copied into the activated request.
    pub instructions: Option<DeliveryInstructions>,
    /// The pending form of the operation context.
    pub operation: PendingOperationRequest,
}

impl PendingServiceRequest {
    /// Converts this pending request into a live [`ServiceRequest`].
    ///
    /// The entity and instruction fields are copied over unchanged, and the
    /// pending operation is activated with the supplied `world` and `roster`.
    fn activate<'a>(
        self,
        world: &'a mut World,
        roster: &'a mut OperationRoster,
    ) -> ServiceRequest<'a> {
        let Self {
            provider,
            target,
            instructions,
            operation,
        } = self;
        let operation = operation.activate(world, roster);
        ServiceRequest {
            provider,
            target,
            instructions,
            operation,
        }
    }
}
/// Marker component parameterised by a service's request and response types.
///
/// The only field is a `PhantomData`, so the component records the type
/// parameters without storing any `Request` or `Response` values.
#[derive(Component)]
pub(crate) struct ServiceMarker<Request, Response> {
    _ignore: std::marker::PhantomData<fn(Request, Response)>,
}

// Written by hand rather than derived: `#[derive(Default)]` would add
// `Request: Default` and `Response: Default` bounds that are not needed.
impl<Request, Response> Default for ServiceMarker<Request, Response> {
    /// Creates the marker; there is no state to initialise.
    fn default() -> Self {
        Self {
            _ignore: std::marker::PhantomData,
        }
    }
}
/// Component holding the callback used to serve requests sent to an entity.
#[derive(Component)]
pub(crate) struct ServiceHook {
    /// Function invoked with each request delivered to this provider.
    pub(crate) trigger: fn(ServiceRequest),
    /// Optional drop guard. When present, dropping this hook drops the guard,
    /// which sends its entity over its channel.
    pub(crate) lifecycle: Option<ServiceLifecycle>,
}

impl ServiceHook {
    /// Creates a hook that calls `callback`, with no lifecycle guard attached.
    pub(crate) fn new(callback: fn(ServiceRequest)) -> Self {
        let trigger = callback;
        let lifecycle = None;
        Self { trigger, lifecycle }
    }
}
/// Drop guard that sends an entity over a channel when it is dropped.
pub(crate) struct ServiceLifecycle {
    /// Entity that is sent on drop.
    entity: Entity,
    /// Sending half of the channel the entity is sent over.
    sender: TokioSender<Entity>,
}

impl ServiceLifecycle {
    /// Creates a guard that will send `entity` through `sender` when dropped.
    pub(crate) fn new(entity: Entity, sender: TokioSender<Entity>) -> Self {
        Self { sender, entity }
    }
}

impl Drop for ServiceLifecycle {
    /// Sends the stored entity over the channel.
    fn drop(&mut self) {
        // Best effort: a failed send is deliberately ignored.
        let _ = self.sender.send(self.entity);
    }
}
/// Resource holding both halves of an unbounded channel of entities.
///
/// NOTE(review): presumably `ServiceLifecycle` guards are given clones of
/// `sender` and something drains `receiver` — confirm against the callers.
#[derive(Resource)]
pub(crate) struct ServiceLifecycleChannel {
    /// Sending half of the channel.
    pub(crate) sender: TokioSender<Entity>,
    /// Receiving half of the channel.
    pub(crate) receiver: TokioReceiver<Entity>,
}

impl ServiceLifecycleChannel {
    /// Opens a fresh unbounded channel and stores both of its ends.
    pub(crate) fn new() -> Self {
        let (tx, rx) = unbounded_channel::<Entity>();
        Self {
            sender: tx,
            receiver: rx,
        }
    }
}

impl Default for ServiceLifecycleChannel {
    /// Same as [`ServiceLifecycleChannel::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[derive(Bundle)]
pub(crate) struct ServiceBundle<Srv: ServiceTrait + 'static + Send + Sync> {
hook: ServiceHook,
marker: ServiceMarker<Srv::Request, Srv::Response>,
}
impl<Srv: ServiceTrait + 'static + Send + Sync> ServiceBundle<Srv> {
pub(crate) fn new() -> Self {
Self {
hook: ServiceHook::new(service_hook::<Srv>),
marker: Default::default(),
}
}
}
/// Adapter with the `fn(ServiceRequest)` shape that forwards to `Srv::serve`.
///
/// A successful result and `OperationError::NotReady` are both ignored. For
/// `OperationError::Broken`, a `MiscellaneousFailure` naming the provider,
/// source and target is pushed onto the `UnhandledErrors` resource, which is
/// inserted with its default value if the world does not have it yet.
fn service_hook<Srv: ServiceTrait>(
    ServiceRequest {
        provider,
        target,
        instructions,
        operation:
            OperationRequest {
                source,
                world,
                roster,
            },
    }: ServiceRequest,
) {
    // Rebuild the request from its parts. `world` is only reborrowed here, so
    // it is still usable for error reporting once `serve` has returned.
    let request = ServiceRequest {
        provider,
        target,
        instructions,
        operation: OperationRequest {
            source,
            world,
            roster,
        },
    };

    let backtrace = match Srv::serve(request) {
        // Nothing to report in either of these cases.
        Ok(()) | Err(OperationError::NotReady) => return,
        Err(OperationError::Broken(backtrace)) => backtrace,
    };

    let mut unhandled = world.get_resource_or_insert_with(UnhandledErrors::default);
    unhandled.miscellaneous.push(MiscellaneousFailure {
        error: Arc::new(anyhow!(
            "Failed to serve: provider {provider:?}, source {source:?}, target {target:?}",
        )),
        backtrace,
    });
}
/// Resource that buffers service requests while they wait to be delivered.
#[derive(Resource)]
struct ServiceQueue {
    /// `true` while `dispatch_service` is running its delivery loop. A
    /// dispatch that sees this set only enqueues its request and returns.
    is_delivering: bool,
    /// Requests that have been dispatched but not yet delivered.
    queue: VecDeque<PendingServiceRequest>,
}

impl ServiceQueue {
    /// Creates an empty queue that is not currently delivering.
    fn new() -> Self {
        Self {
            queue: VecDeque::default(),
            is_delivering: false,
        }
    }
}
/// Queues a service request and, unless a delivery loop is already running,
/// delivers every queued request to the `ServiceHook` of its provider.
///
/// The request is first stored in the `ServiceQueue` resource (inserted if
/// missing). If `is_delivering` is already set, this call was made while
/// another `dispatch_service` call is draining the queue, so it returns
/// immediately and leaves delivery to that outer loop. Otherwise this call
/// sets the flag, drains the queue, and clears the flag again.
///
/// A queued request whose provider no longer has a `ServiceHook` is not
/// delivered; `dispose_for_despawned_service` is called for that provider
/// instead.
pub(crate) fn dispatch_service(
    ServiceRequest {
        provider,
        target,
        instructions,
        operation:
            OperationRequest {
                source,
                world,
                roster,
            },
    }: ServiceRequest,
) {
    let mut service_queue = world.get_resource_or_insert_with(ServiceQueue::new);
    service_queue.queue.push_back(PendingServiceRequest {
        provider,
        target,
        instructions,
        operation: PendingOperationRequest { source },
    });
    if service_queue.is_delivering {
        // An outer call is already draining the queue and will pick this
        // request up, so services never run recursively inside one another.
        return;
    }
    service_queue.is_delivering = true;

    // Requests are taken from the back, so the most recently queued request
    // is delivered first.
    // NOTE(review): confirm that LIFO order is intended for a `VecDeque`
    // named "queue"; `pop_front` would deliver in dispatch order.
    while let Some(pending) = world.resource_mut::<ServiceQueue>().queue.pop_back() {
        // Copy the fn pointer out so the shared borrow of `world` ends before
        // `world` is mutably reborrowed for the call below.
        let Some(trigger) = world
            .get::<ServiceHook>(pending.provider)
            .map(|hook| hook.trigger)
        else {
            // Dispose on behalf of the provider of *this* queued request. It
            // can differ from the `provider` that started the loop when the
            // request was queued by a nested dispatch.
            dispose_for_despawned_service(pending.provider, world, roster);
            continue;
        };

        trigger(pending.activate(world, roster));
    }

    world.resource_mut::<ServiceQueue>().is_delivering = false;
}