1use bevy_ecs::{
19 prelude::{Commands, World},
20 world::CommandQueue,
21};
22use bevy_hierarchy::BuildChildren;
23
24use crate::{
25 Builder, DeliveryChoice, InputSlot, OperateScope, Output, ScopeEndpoints, ScopeSettingsStorage,
26 Service, ServiceBundle, StreamPack, WorkflowService, WorkflowStorage,
27};
28
29mod internal;
30
31pub trait SpawnWorkflowExt {
33 fn spawn_workflow<Request, Response, Streams, W>(
42 &mut self,
43 build: impl FnOnce(Scope<Request, Response, Streams>, &mut Builder) -> W,
44 ) -> Service<Request, Response, Streams>
45 where
46 Request: 'static + Send + Sync,
47 Response: 'static + Send + Sync,
48 Streams: StreamPack,
49 W: Into<WorkflowSettings>;
50
51 fn spawn_io_workflow<Request, Response, W>(
56 &mut self,
57 build: impl FnOnce(Scope<Request, Response>, &mut Builder) -> W,
58 ) -> Service<Request, Response>
59 where
60 Request: 'static + Send + Sync,
61 Response: 'static + Send + Sync,
62 W: Into<WorkflowSettings>,
63 {
64 self.spawn_workflow::<Request, Response, (), W>(build)
65 }
66}
67
68pub struct Scope<Request, Response, Streams: StreamPack = ()> {
75 pub input: Output<Request>,
78 pub terminate: InputSlot<Response>,
83 pub streams: Streams::StreamInputPack,
87}
88
89#[derive(Default)]
91pub struct WorkflowSettings {
92 delivery: DeliverySettings,
93 scope: ScopeSettings,
94}
95
96impl WorkflowSettings {
97 pub fn new() -> Self {
99 Self::default()
100 }
101
102 pub fn serial() -> Self {
104 Self::default().with_delivery(DeliverySettings::Serial)
105 }
106
107 pub fn parallel() -> Self {
109 Self::default().with_delivery(DeliverySettings::Parallel)
110 }
111
112 pub fn with_delivery(mut self, delivery: DeliverySettings) -> Self {
113 self.delivery = delivery;
114 self
115 }
116 pub fn delivery(&self) -> &DeliverySettings {
117 &self.delivery
118 }
119 pub fn delivery_mut(&mut self) -> &mut DeliverySettings {
120 &mut self.delivery
121 }
122
123 pub fn with_scope(mut self, scope: ScopeSettings) -> Self {
124 self.scope = scope;
125 self
126 }
127 pub fn scope(&self) -> &ScopeSettings {
128 &self.scope
129 }
130 pub fn scope_mut(&mut self) -> &mut ScopeSettings {
131 &mut self.scope
132 }
133
134 pub fn uninterruptible(mut self) -> Self {
136 self.scope.set_uninterruptible(true);
137 self
138 }
139}
140
141impl From<()> for WorkflowSettings {
142 fn from(_: ()) -> Self {
143 WorkflowSettings::default()
144 }
145}
146
147impl From<ScopeSettings> for WorkflowSettings {
148 fn from(value: ScopeSettings) -> Self {
149 WorkflowSettings::new().with_scope(value)
150 }
151}
152
153impl From<DeliverySettings> for WorkflowSettings {
154 fn from(value: DeliverySettings) -> Self {
155 WorkflowSettings::new().with_delivery(value)
156 }
157}
158
/// Selects how a workflow delivers the requests it receives.
///
/// The default is [`DeliverySettings::Parallel`].
// NOTE(review): the exact runtime semantics of each variant are implemented
// outside this block (see `apply_entity_commands`); the variant docs below
// describe the intent suggested by the names -- confirm against that code.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub enum DeliverySettings {
    /// Serial delivery (presumably one request is handled at a time).
    Serial,

    /// Parallel delivery (presumably multiple requests may be handled at the
    /// same time). This is the default.
    #[default]
    Parallel,
}
180
/// Settings for the scope of a workflow.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ScopeSettings {
    /// Whether the scope is flagged as uninterruptible. `false` by default.
    uninterruptible: bool,
}

impl ScopeSettings {
    /// Create scope settings with every field at its default value
    /// (not uninterruptible).
    pub fn new() -> Self {
        Self::default()
    }

    /// Create scope settings with the uninterruptible flag set.
    pub fn uninterruptible() -> Self {
        Self {
            uninterruptible: true,
        }
    }

    /// Returns `true` if the uninterruptible flag is set.
    pub fn is_uninterruptible(&self) -> bool {
        self.uninterruptible
    }

    /// Set or clear the uninterruptible flag.
    pub fn set_uninterruptible(&mut self, uninterruptible: bool) {
        self.uninterruptible = uninterruptible;
    }
}
213
214impl From<()> for ScopeSettings {
215 fn from(_: ()) -> Self {
216 ScopeSettings::default()
217 }
218}
219
220impl<'w, 's> SpawnWorkflowExt for Commands<'w, 's> {
221 fn spawn_workflow<Request, Response, Streams, Settings>(
222 &mut self,
223 build: impl FnOnce(Scope<Request, Response, Streams>, &mut Builder) -> Settings,
224 ) -> Service<Request, Response, Streams>
225 where
226 Request: 'static + Send + Sync,
227 Response: 'static + Send + Sync,
228 Streams: StreamPack,
229 Settings: Into<WorkflowSettings>,
230 {
231 let scope_id = self.spawn(()).id();
232 let ScopeEndpoints {
233 terminal,
234 enter_scope,
235 finish_scope_cancel,
236 } = OperateScope::<Request, Response, Streams>::add(None, scope_id, None, self);
237
238 let mut builder = Builder {
239 scope: scope_id,
240 finish_scope_cancel,
241 commands: self,
242 };
243
244 let streams = Streams::spawn_workflow_streams(&mut builder);
245
246 let scope = Scope {
247 input: Output::new(scope_id, enter_scope),
248 terminate: InputSlot::new(scope_id, terminal),
249 streams,
250 };
251
252 let settings: WorkflowSettings = build(scope, &mut builder).into();
253
254 let mut service = self.spawn((
255 ServiceBundle::<WorkflowService<Request, Response, Streams>>::new(),
256 WorkflowStorage::new(scope_id),
257 Streams::StreamAvailableBundle::default(),
258 ));
259 settings
260 .delivery
261 .apply_entity_commands::<Request>(&mut service);
262 let service = service.id();
263 self.entity(scope_id)
264 .insert(ScopeSettingsStorage(settings.scope))
265 .set_parent(service);
266
267 WorkflowService::<Request, Response, Streams>::cast(service)
268 }
269}
270
271impl SpawnWorkflowExt for World {
272 fn spawn_workflow<Request, Response, Streams, W>(
273 &mut self,
274 build: impl FnOnce(Scope<Request, Response, Streams>, &mut Builder) -> W,
275 ) -> Service<Request, Response, Streams>
276 where
277 Request: 'static + Send + Sync,
278 Response: 'static + Send + Sync,
279 Streams: StreamPack,
280 W: Into<WorkflowSettings>,
281 {
282 let mut command_queue = CommandQueue::default();
283 let mut commands = Commands::new(&mut command_queue, self);
284 let service = commands.spawn_workflow(build);
285 command_queue.apply(self);
286 service
287 }
288}
289
#[cfg(test)]
mod tests {
    use crate::{prelude::*, testing::*};

    /// Builds the same single-node `add` workflow in two different ways and
    /// checks that each one produces the expected sum.
    #[test]
    fn test_simple_workflows() {
        let mut context = TestingContext::minimal_plugins();

        // Variant 1: build the workflow with the chain API.
        let chained_workflow = context.spawn_io_workflow(|scope, builder| {
            scope
                .input
                .chain(builder)
                .map_block(add)
                .connect(scope.terminate);
        });

        let mut chained_promise = context.command(|commands| {
            commands
                .request((2.0, 2.0), chained_workflow)
                .take_response()
        });

        context.run_with_conditions(&mut chained_promise, Duration::from_secs(1));
        assert!(chained_promise
            .take()
            .available()
            .is_some_and(|sum| sum == 4.0));
        assert!(context.no_unhandled_errors());

        // Variant 2: create the node first, then connect it explicitly.
        let connected_workflow = context.spawn_io_workflow(|scope, builder| {
            let adder = builder.create_map_block(add);
            builder.connect(scope.input, adder.input);
            builder.connect(adder.output, scope.terminate);
        });

        let mut connected_promise = context.command(|commands| {
            commands
                .request((3.0, 3.0), connected_workflow)
                .take_response()
        });

        context.run_with_conditions(&mut connected_promise, Duration::from_secs(1));
        assert!(connected_promise
            .take()
            .available()
            .is_some_and(|sum| sum == 6.0));
        assert!(context.no_unhandled_errors());
    }
}