use bevy_ecs::{
prelude::{Commands, World},
world::CommandQueue,
};
use bevy_hierarchy::BuildChildren;
use crate::{
Builder, DeliveryChoice, InputSlot, OperateScope, Output, ScopeEndpoints, ScopeSettingsStorage,
Service, ServiceBundle, StreamPack, WorkflowService, WorkflowStorage,
};
mod internal;
pub trait SpawnWorkflowExt {
fn spawn_workflow<Request, Response, Streams, W>(
&mut self,
build: impl FnOnce(Scope<Request, Response, Streams>, &mut Builder) -> W,
) -> Service<Request, Response, Streams>
where
Request: 'static + Send + Sync,
Response: 'static + Send + Sync,
Streams: StreamPack,
W: Into<WorkflowSettings>;
fn spawn_io_workflow<Request, Response, W>(
&mut self,
build: impl FnOnce(Scope<Request, Response>, &mut Builder) -> W,
) -> Service<Request, Response>
where
Request: 'static + Send + Sync,
Response: 'static + Send + Sync,
W: Into<WorkflowSettings>,
{
self.spawn_workflow::<Request, Response, (), W>(build)
}
}
pub struct Scope<Request, Response, Streams: StreamPack = ()> {
pub input: Output<Request>,
pub terminate: InputSlot<Response>,
pub streams: Streams::StreamInputPack,
}
#[derive(Default)]
pub struct WorkflowSettings {
delivery: DeliverySettings,
scope: ScopeSettings,
}
impl WorkflowSettings {
pub fn new() -> Self {
Self::default()
}
pub fn serial() -> Self {
Self::default().with_delivery(DeliverySettings::Serial)
}
pub fn parallel() -> Self {
Self::default().with_delivery(DeliverySettings::Parallel)
}
pub fn with_delivery(mut self, delivery: DeliverySettings) -> Self {
self.delivery = delivery;
self
}
pub fn delivery(&self) -> &DeliverySettings {
&self.delivery
}
pub fn delivery_mut(&mut self) -> &mut DeliverySettings {
&mut self.delivery
}
pub fn with_scope(mut self, scope: ScopeSettings) -> Self {
self.scope = scope;
self
}
pub fn scope(&self) -> &ScopeSettings {
&self.scope
}
pub fn scope_mut(&mut self) -> &mut ScopeSettings {
&mut self.scope
}
pub fn uninterruptible(mut self) -> Self {
self.scope.set_uninterruptible(true);
self
}
}
impl From<()> for WorkflowSettings {
fn from(_: ()) -> Self {
WorkflowSettings::default()
}
}
impl From<ScopeSettings> for WorkflowSettings {
fn from(value: ScopeSettings) -> Self {
WorkflowSettings::new().with_scope(value)
}
}
impl From<DeliverySettings> for WorkflowSettings {
fn from(value: DeliverySettings) -> Self {
WorkflowSettings::new().with_delivery(value)
}
}
#[derive(Default)]
pub enum DeliverySettings {
Serial,
#[default]
Parallel,
}
#[derive(Default, Clone)]
pub struct ScopeSettings {
uninterruptible: bool,
}
impl ScopeSettings {
pub fn new() -> Self {
Self::default()
}
pub fn uninterruptible() -> Self {
Self {
uninterruptible: true,
}
}
pub fn is_uninterruptible(&self) -> bool {
self.uninterruptible
}
pub fn set_uninterruptible(&mut self, uninterruptible: bool) {
self.uninterruptible = uninterruptible;
}
}
impl From<()> for ScopeSettings {
fn from(_: ()) -> Self {
ScopeSettings::default()
}
}
impl<'w, 's> SpawnWorkflowExt for Commands<'w, 's> {
fn spawn_workflow<Request, Response, Streams, Settings>(
&mut self,
build: impl FnOnce(Scope<Request, Response, Streams>, &mut Builder) -> Settings,
) -> Service<Request, Response, Streams>
where
Request: 'static + Send + Sync,
Response: 'static + Send + Sync,
Streams: StreamPack,
Settings: Into<WorkflowSettings>,
{
let scope_id = self.spawn(()).id();
let ScopeEndpoints {
terminal,
enter_scope,
finish_scope_cancel,
} = OperateScope::<Request, Response, Streams>::add(None, scope_id, None, self);
let mut builder = Builder {
scope: scope_id,
finish_scope_cancel,
commands: self,
};
let streams = Streams::spawn_workflow_streams(&mut builder);
let scope = Scope {
input: Output::new(scope_id, enter_scope),
terminate: InputSlot::new(scope_id, terminal),
streams,
};
let settings: WorkflowSettings = build(scope, &mut builder).into();
let mut service = self.spawn((
ServiceBundle::<WorkflowService<Request, Response, Streams>>::new(),
WorkflowStorage::new(scope_id),
Streams::StreamAvailableBundle::default(),
));
settings
.delivery
.apply_entity_commands::<Request>(&mut service);
let service = service.id();
self.entity(scope_id)
.insert(ScopeSettingsStorage(settings.scope))
.set_parent(service);
WorkflowService::<Request, Response, Streams>::cast(service)
}
}
impl SpawnWorkflowExt for World {
fn spawn_workflow<Request, Response, Streams, W>(
&mut self,
build: impl FnOnce(Scope<Request, Response, Streams>, &mut Builder) -> W,
) -> Service<Request, Response, Streams>
where
Request: 'static + Send + Sync,
Response: 'static + Send + Sync,
Streams: StreamPack,
W: Into<WorkflowSettings>,
{
let mut command_queue = CommandQueue::default();
let mut commands = Commands::new(&mut command_queue, self);
let service = commands.spawn_workflow(build);
command_queue.apply(self);
service
}
}
#[cfg(test)]
mod tests {
use crate::{prelude::*, testing::*};
#[test]
fn test_simple_workflows() {
let mut context = TestingContext::minimal_plugins();
let workflow = context.spawn_io_workflow(|scope, builder| {
scope
.input
.chain(builder)
.map_block(add)
.connect(scope.terminate);
});
let mut promise =
context.command(|commands| commands.request((2.0, 2.0), workflow).take_response());
context.run_with_conditions(&mut promise, Duration::from_secs(1));
assert!(promise.take().available().is_some_and(|v| v == 4.0));
assert!(context.no_unhandled_errors());
let workflow = context.spawn_io_workflow(|scope, builder| {
let add_node = builder.create_map_block(add);
builder.connect(scope.input, add_node.input);
builder.connect(add_node.output, scope.terminate);
});
let mut promise =
context.command(|commands| commands.request((3.0, 3.0), workflow).take_response());
context.run_with_conditions(&mut promise, Duration::from_secs(1));
assert!(promise.take().available().is_some_and(|v| v == 6.0));
assert!(context.no_unhandled_errors());
}
}