use crate::{
AddOperation, OperateService, ProvideOnce, Provider, RunCommandsOnWorldExt, StreamOf,
StreamPack,
};
use bevy_app::prelude::App;
use bevy_derive::{Deref, DerefMut};
use bevy_ecs::{
prelude::{Commands, Component, Entity, Event, World},
schedule::ScheduleLabel,
define_label, intern::Interned
};
use std::{any::TypeId, collections::HashSet};
use thiserror::Error as ThisError;
mod async_srv;
pub use async_srv::*;
mod blocking;
pub use blocking::*;
mod continuous;
pub use continuous::*;
mod service_builder;
pub use service_builder::ServiceBuilder;
pub(crate) mod delivery;
pub(crate) use delivery::*;
pub mod discovery;
pub use discovery::*;
pub(crate) mod internal;
pub(crate) use internal::*;
pub mod traits;
pub use traits::*;
mod workflow;
pub(crate) use workflow::*;
#[derive(Debug, PartialEq, Eq)]
pub struct Service<Request, Response, Streams = ()> {
provider: Entity,
instructions: Option<DeliveryInstructions>,
_ignore: std::marker::PhantomData<fn(Request, Response, Streams)>,
}
impl<Req, Res, S> Clone for Service<Req, Res, S> {
fn clone(&self) -> Self {
*self
}
}
impl<Req, Res, S> Copy for Service<Req, Res, S> {}
#[derive(ThisError, Debug, Clone)]
#[error("The original service is missing streams that are needed by the target service")]
pub struct MissingStreamsError {
types: HashSet<TypeId>,
}
impl<Request, Response, Streams> Service<Request, Response, Streams> {
pub fn provider(&self) -> Entity {
self.provider
}
pub fn instructions(&self) -> Option<&DeliveryInstructions> {
self.instructions.as_ref()
}
pub fn instruct(mut self, instructions: impl Into<DeliveryInstructions>) -> Self {
self.instructions = Some(instructions.into());
self
}
pub fn stream_cast<TargetStreams>(
self,
) -> Result<Service<Request, Response, TargetStreams>, MissingStreamsError>
where
Streams: StreamPack,
TargetStreams: StreamPack,
{
let mut original = HashSet::new();
Streams::insert_types(&mut original);
let mut target = HashSet::new();
Streams::insert_types(&mut target);
for t in original {
target.remove(&t);
}
if !target.is_empty() {
return Err(MissingStreamsError { types: target });
}
Ok(Service {
provider: self.provider,
instructions: self.instructions,
_ignore: Default::default(),
})
}
pub fn optional_stream_cast<TargetStreams>(self) -> Service<Request, Response, TargetStreams> {
Service {
provider: self.provider,
instructions: self.instructions,
_ignore: Default::default(),
}
}
fn new(entity: Entity) -> Self {
Self {
provider: entity,
instructions: None,
_ignore: Default::default(),
}
}
}
define_label!(
DeliveryLabel,
DELIVERY_LABEL_INTERNER
);
#[derive(Component, Clone, Copy, Debug, PartialEq, Eq)]
pub struct DeliveryInstructions {
pub(crate) label: DeliveryLabelId,
pub(crate) preempt: bool,
pub(crate) ensure: bool,
}
#[derive(Clone, Copy, Debug, Deref, DerefMut, Hash, PartialEq, Eq)]
pub struct DeliveryLabelId(Interned<dyn DeliveryLabel>);
impl DeliveryInstructions {
pub fn new(label: impl DeliveryLabel) -> Self {
Self {
label: DeliveryLabelId(label.intern()),
preempt: false,
ensure: false,
}
}
pub fn label(&self) -> &DeliveryLabelId {
&self.label
}
pub fn preempt(mut self) -> Self {
self.preempt = true;
self
}
pub fn is_preemptive(&self) -> bool {
self.preempt
}
pub fn with_preemptive(mut self, preempt: bool) -> Self {
self.preempt = preempt;
self
}
pub fn ensure(mut self) -> Self {
self.ensure = true;
self
}
pub fn is_ensured(&self) -> bool {
self.ensure
}
pub fn with_ensured(mut self, ensure: bool) -> Self {
self.ensure = ensure;
self
}
}
impl<L: DeliveryLabel> From<L> for DeliveryInstructions {
fn from(label: L) -> Self {
DeliveryInstructions::new(label)
}
}
pub trait AsDeliveryInstructions {
fn preempt(self) -> DeliveryInstructions;
fn with_preemptive(self, preempt: bool) -> DeliveryInstructions;
fn ensure(self) -> DeliveryInstructions;
fn with_ensured(self, ensure: bool) -> DeliveryInstructions;
}
impl<T: Into<DeliveryInstructions>> AsDeliveryInstructions for T {
fn preempt(self) -> DeliveryInstructions {
self.into().preempt()
}
fn with_preemptive(self, preempt: bool) -> DeliveryInstructions {
self.into().with_preemptive(preempt)
}
fn ensure(self) -> DeliveryInstructions {
self.into().ensure()
}
fn with_ensured(self, ensure: bool) -> DeliveryInstructions {
self.into().with_ensured(ensure)
}
}
pub trait SpawnServicesExt<'w, 's> {
fn spawn_service<M1, M2, B: IntoServiceBuilder<M1, Also = (), Configure = ()>>(
&mut self,
builder: B,
) -> ServiceOf<M1, M2, B>
where
B::Service: IntoService<M2>,
B::Deliver: DeliveryChoice,
B::With: WithEntityCommands,
RequestOf<M1, M2, B>: 'static + Send + Sync,
ResponseOf<M1, M2, B>: 'static + Send + Sync,
StreamsOf<M1, M2, B>: StreamPack;
}
impl<'w, 's> SpawnServicesExt<'w, 's> for Commands<'w, 's> {
fn spawn_service<M1, M2, B: IntoServiceBuilder<M1, Also = (), Configure = ()>>(
&mut self,
builder: B,
) -> ServiceOf<M1, M2, B>
where
B::Service: IntoService<M2>,
B::Deliver: DeliveryChoice,
B::With: WithEntityCommands,
<B::Service as IntoService<M2>>::Request: 'static + Send + Sync,
<B::Service as IntoService<M2>>::Response: 'static + Send + Sync,
<B::Service as IntoService<M2>>::Streams: StreamPack,
{
builder.into_service_builder().spawn_service(self)
}
}
impl<'w, 's> SpawnServicesExt<'w, 's> for World {
fn spawn_service<M1, M2, B: IntoServiceBuilder<M1, Also = (), Configure = ()>>(
&mut self,
builder: B,
) -> ServiceOf<M1, M2, B>
where
B::Service: IntoService<M2>,
B::Deliver: DeliveryChoice,
B::With: WithEntityCommands,
<B::Service as IntoService<M2>>::Request: 'static + Send + Sync,
<B::Service as IntoService<M2>>::Response: 'static + Send + Sync,
<B::Service as IntoService<M2>>::Streams: StreamPack,
{
self.command(move |commands| commands.spawn_service(builder))
}
}
pub trait AddServicesExt {
fn add_service<M1, M2, B: IntoServiceBuilder<M1, Configure = ()>>(
&mut self,
builder: B,
) -> &mut Self
where
B::Service: IntoService<M2>,
B::Deliver: DeliveryChoice,
B::With: WithEntityWorldMut,
B::Also: AlsoAdd<RequestOf<M1, M2, B>, ResponseOf<M1, M2, B>, StreamsOf<M1, M2, B>>,
RequestOf<M1, M2, B>: 'static + Send + Sync,
ResponseOf<M1, M2, B>: 'static + Send + Sync,
StreamsOf<M1, M2, B>: StreamPack,
{
self.spawn_service(builder);
self
}
fn spawn_service<M1, M2, B: IntoServiceBuilder<M1, Configure = ()>>(
&mut self,
builder: B,
) -> ServiceOf<M1, M2, B>
where
B::Service: IntoService<M2>,
B::Deliver: DeliveryChoice,
B::With: WithEntityWorldMut,
B::Also: AlsoAdd<RequestOf<M1, M2, B>, ResponseOf<M1, M2, B>, StreamsOf<M1, M2, B>>,
RequestOf<M1, M2, B>: 'static + Send + Sync,
ResponseOf<M1, M2, B>: 'static + Send + Sync,
StreamsOf<M1, M2, B>: StreamPack;
}
impl AddServicesExt for App {
fn spawn_service<M1, M2, B: IntoServiceBuilder<M1, Configure = ()>>(
&mut self,
builder: B,
) -> ServiceOf<M1, M2, B>
where
B::Service: IntoService<M2>,
B::Deliver: DeliveryChoice,
B::With: WithEntityWorldMut,
B::Also: AlsoAdd<RequestOf<M1, M2, B>, ResponseOf<M1, M2, B>, StreamsOf<M1, M2, B>>,
RequestOf<M1, M2, B>: 'static + Send + Sync,
ResponseOf<M1, M2, B>: 'static + Send + Sync,
StreamsOf<M1, M2, B>: StreamPack,
{
builder.into_service_builder().spawn_app_service(self)
}
}
type RequestOf<M1, M2, B> = <<B as IntoServiceBuilder<M1>>::Service as IntoService<M2>>::Request;
type ResponseOf<M1, M2, B> = <<B as IntoServiceBuilder<M1>>::Service as IntoService<M2>>::Response;
type StreamsOf<M1, M2, B> = <<B as IntoServiceBuilder<M1>>::Service as IntoService<M2>>::Streams;
type ServiceOf<M1, M2, B> =
Service<RequestOf<M1, M2, B>, ResponseOf<M1, M2, B>, StreamsOf<M1, M2, B>>;
pub trait AddContinuousServicesExt {
fn spawn_continuous_service<M1, M2, B: IntoServiceBuilder<M1>>(
&mut self,
schedule: impl ScheduleLabel,
builder: B,
) -> ServiceOfC<M1, M2, B>
where
B::Service: IntoContinuousService<M2>,
B::Deliver: DeliveryChoice,
B::With: WithEntityWorldMut,
B::Also: AlsoAdd<RequestOfC<M1, M2, B>, ResponseOfC<M1, M2, B>, StreamsOfC<M1, M2, B>>,
B::Configure: ConfigureContinuousService,
RequestOfC<M1, M2, B>: 'static + Send + Sync,
ResponseOfC<M1, M2, B>: 'static + Send + Sync,
StreamsOfC<M1, M2, B>: StreamPack;
fn add_continuous_service<M1, M2, B: IntoServiceBuilder<M1>>(
&mut self,
schedule: impl ScheduleLabel,
builder: B,
) -> &mut Self
where
B::Service: IntoContinuousService<M2>,
B::Deliver: DeliveryChoice,
B::With: WithEntityWorldMut,
B::Also: AlsoAdd<RequestOfC<M1, M2, B>, ResponseOfC<M1, M2, B>, StreamsOfC<M1, M2, B>>,
B::Configure: ConfigureContinuousService,
RequestOfC<M1, M2, B>: 'static + Send + Sync,
ResponseOfC<M1, M2, B>: 'static + Send + Sync,
StreamsOfC<M1, M2, B>: StreamPack,
{
self.spawn_continuous_service(schedule, builder);
self
}
fn spawn_event_streaming_service<E>(
&mut self,
schedule: impl ScheduleLabel,
) -> Service<(), (), StreamOf<E>>
where
E: 'static + Event + Send + Sync + Unpin + Clone,
{
self.spawn_continuous_service(schedule, event_streaming_service::<E>)
}
}
impl AddContinuousServicesExt for App {
fn spawn_continuous_service<M1, M2, B: IntoServiceBuilder<M1>>(
&mut self,
schedule: impl ScheduleLabel,
builder: B,
) -> ServiceOfC<M1, M2, B>
where
B::Service: IntoContinuousService<M2>,
B::Deliver: DeliveryChoice,
B::With: WithEntityWorldMut,
B::Also: AlsoAdd<RequestOfC<M1, M2, B>, ResponseOfC<M1, M2, B>, StreamsOfC<M1, M2, B>>,
B::Configure: ConfigureContinuousService,
RequestOfC<M1, M2, B>: 'static + Send + Sync,
ResponseOfC<M1, M2, B>: 'static + Send + Sync,
StreamsOfC<M1, M2, B>: StreamPack,
{
builder
.into_service_builder()
.spawn_continuous_service(schedule, self)
}
}
type RequestOfC<M1, M2, B> =
<<B as IntoServiceBuilder<M1>>::Service as IntoContinuousService<M2>>::Request;
type ResponseOfC<M1, M2, B> =
<<B as IntoServiceBuilder<M1>>::Service as IntoContinuousService<M2>>::Response;
type StreamsOfC<M1, M2, B> =
<<B as IntoServiceBuilder<M1>>::Service as IntoContinuousService<M2>>::Streams;
type ServiceOfC<M1, M2, B> =
Service<RequestOfC<M1, M2, B>, ResponseOfC<M1, M2, B>, StreamsOfC<M1, M2, B>>;
impl<Request, Response, Streams> ProvideOnce for Service<Request, Response, Streams>
where
Request: 'static + Send + Sync,
{
type Request = Request;
type Response = Response;
type Streams = Streams;
fn connect(
self,
scope: Option<Entity>,
source: Entity,
target: Entity,
commands: &mut Commands,
) {
commands.add(AddOperation::new(
scope,
source,
OperateService::new(self, target),
));
}
}
impl<Request, Response, Streams> Provider for Service<Request, Response, Streams> where
Request: 'static + Send + Sync
{
}
#[cfg(test)]
mod tests {
use crate::{prelude::*, testing::*, ServiceMarker};
use bevy_app::{PostUpdate, PreUpdate, Startup};
use bevy_ecs::{
prelude::*,
system::{StaticSystemParam, SystemParam},
world::EntityWorldMut,
};
use smallvec::SmallVec;
use std::future::Future;
#[derive(Component)]
struct TestPeople {
name: String,
age: u64,
}
#[derive(Component)]
struct Multiplier(u64);
#[derive(Resource)]
struct TestSystemRan(bool);
#[derive(Resource)]
struct MyServiceProvider {
#[allow(unused)]
provider: Service<String, u64>,
}
#[test]
fn test_spawn_async_service() {
let mut app = App::new();
app.insert_resource(TestSystemRan(false))
.add_systems(Startup, sys_spawn_async_service)
.add_systems(Update, sys_find_service);
app.update();
assert!(app.world().resource::<TestSystemRan>().0);
}
#[test]
fn test_add_async_service() {
let mut app = App::new();
app.insert_resource(TestSystemRan(false))
.add_service(sys_async_service)
.add_systems(Update, sys_find_service);
app.update();
assert!(app.world().resource::<TestSystemRan>().0);
}
#[test]
fn test_add_async_service_serial() {
let mut app = App::new();
app.insert_resource(TestSystemRan(false))
.add_service(sys_async_service.serial())
.add_systems(Update, sys_find_service);
app.update();
assert!(app.world().resource::<TestSystemRan>().0);
}
#[test]
fn test_add_built_async_service() {
let mut app = App::new();
app.insert_resource(TestSystemRan(false))
.add_service(sys_async_service.also(|app: &mut App, provider| {
app.insert_resource(MyServiceProvider { provider });
}))
.add_systems(Update, sys_use_my_service_provider);
app.update();
assert!(app.world().resource::<TestSystemRan>().0);
}
#[test]
fn test_spawn_blocking_service() {
let mut app = App::new();
app.insert_resource(TestSystemRan(false))
.add_systems(Startup, sys_spawn_blocking_service)
.add_systems(Update, sys_find_service);
app.update();
assert!(app.world().resource::<TestSystemRan>().0);
}
#[test]
fn test_add_simple_blocking_service() {
let mut app = App::new();
app.insert_resource(TestSystemRan(false))
.add_service(sys_blocking_system.into_blocking_service())
.add_systems(Update, sys_find_service);
app.update();
assert!(app.world().resource::<TestSystemRan>().0);
}
#[test]
fn test_add_self_aware_blocking_service() {
let mut app = App::new();
app.insert_resource(TestSystemRan(false))
.add_service(sys_blocking_service.with(|mut entity_mut: EntityWorldMut| {
entity_mut.insert(Multiplier(2));
}))
.add_systems(Update, sys_find_service);
app.update();
assert!(app.world().resource::<TestSystemRan>().0);
}
fn sys_async_service(
In(AsyncService { request, .. }): AsyncServiceInput<String>,
people: Query<&TestPeople>,
) -> impl Future<Output = u64> {
let mut matching_people = Vec::new();
for person in &people {
if person.name == request {
matching_people.push(person.age);
}
}
async move { matching_people.into_iter().fold(0, |sum, age| sum + age) }
}
fn sys_spawn_async_service(mut commands: Commands) {
commands.spawn_service(sys_async_service);
}
fn sys_blocking_service(
In(BlockingService {
request, provider, ..
}): BlockingServiceInput<String>,
people: Query<&TestPeople>,
multipliers: Query<&Multiplier>,
) -> u64 {
let mut sum = 0;
let multiplier = multipliers.get(provider).unwrap().0;
for person in &people {
if person.name == request {
sum += multiplier * person.age;
}
}
sum
}
fn sys_blocking_system(In(name): In<String>, people: Query<&TestPeople>) -> u64 {
let mut sum = 0;
for person in &people {
if person.name == name {
sum += person.age;
}
}
sum
}
fn sys_spawn_blocking_service(mut commands: Commands) {
commands.spawn_service(sys_blocking_service);
}
fn sys_find_service(query: Query<&ServiceMarker<String, u64>>, mut ran: ResMut<TestSystemRan>) {
assert!(!query.is_empty());
ran.0 = true;
}
fn sys_use_my_service_provider(
my_provider: Option<Res<MyServiceProvider>>,
mut ran: ResMut<TestSystemRan>,
) {
assert!(my_provider.is_some());
ran.0 = true;
}
#[derive(SystemParam)]
struct CustomParamA<'w, 's> {
_commands: Commands<'w, 's>,
}
fn service_with_generic<P: SystemParam>(
In(BlockingService { .. }): BlockingServiceInput<()>,
_: StaticSystemParam<P>,
) {
}
#[test]
fn test_generic_service() {
let mut context = TestingContext::minimal_plugins();
context
.app
.add_service(service_with_generic::<CustomParamA>);
}
#[test]
fn test_event_streaming_service() {
let mut context = TestingContext::minimal_plugins();
context.app.add_systems(PreUpdate, flush_impulses());
context.app.add_systems(PostUpdate, flush_impulses());
context.app.add_event::<CustomEvent>();
let event_streamer = context
.app
.spawn_event_streaming_service::<CustomEvent>(Update);
let mut recipient = context.command(|commands| commands.request((), event_streamer).take());
context.app.world_mut().send_event(CustomEvent(0));
context.app.world_mut().send_event(CustomEvent(1));
context.app.world_mut().send_event(CustomEvent(2));
context.run_with_conditions(&mut recipient.response, 1);
let mut result: SmallVec<[_; 3]> = SmallVec::new();
while let Ok(r) = recipient.streams.try_recv() {
result.push(r.0 .0);
}
assert_eq!(&result[..], &[0, 1, 2]);
}
#[derive(Event, Clone, Copy)]
struct CustomEvent(i64);
}