bevy_impulse/
workflow.rs

1/*
2 * Copyright (C) 2024 Open Source Robotics Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16*/
17
18use bevy_ecs::{
19    prelude::{Commands, World},
20    world::CommandQueue,
21};
22use bevy_hierarchy::BuildChildren;
23
24use crate::{
25    Builder, DeliveryChoice, InputSlot, OperateScope, Output, ScopeEndpoints, ScopeSettingsStorage,
26    Service, ServiceBundle, StreamPack, WorkflowService, WorkflowStorage,
27};
28
29mod internal;
30
31/// Trait to allow workflows to be spawned from a [`Commands`] or a [`World`].
32pub trait SpawnWorkflowExt {
33    /// Spawn a workflow.
34    ///
35    /// * `build` - A function that takes in a [`Scope`] and a [`Builder`] to
36    ///    build the workflow
37    ///
38    /// If you want any particular settings for your workflow, specify that with
39    /// the return value of `build`. Returning nothing `()` will use default
40    /// workflow settings.
41    fn spawn_workflow<Request, Response, Streams, W>(
42        &mut self,
43        build: impl FnOnce(Scope<Request, Response, Streams>, &mut Builder) -> W,
44    ) -> Service<Request, Response, Streams>
45    where
46        Request: 'static + Send + Sync,
47        Response: 'static + Send + Sync,
48        Streams: StreamPack,
49        W: Into<WorkflowSettings>;
50
51    /// Spawn a pure input/output (io) workflow with no streams. This is just a
52    /// convenience wrapper around `spawn_workflow` which usually allows you to
53    /// avoid specifying any generic parameters when there are no streams being
54    /// used.
55    fn spawn_io_workflow<Request, Response, W>(
56        &mut self,
57        build: impl FnOnce(Scope<Request, Response>, &mut Builder) -> W,
58    ) -> Service<Request, Response>
59    where
60        Request: 'static + Send + Sync,
61        Response: 'static + Send + Sync,
62        W: Into<WorkflowSettings>,
63    {
64        self.spawn_workflow::<Request, Response, (), W>(build)
65    }
66}
67
68/// A view of a scope's inputs and outputs from inside of the scope.
69///
70/// It is _not_ a mistake that the [`Scope::input`] field has an [`Output`]
71/// type or that the [`Scope::terminate`] field has an [`InputSlot`] type. From
72/// the perspective inside of the scope, the scope's input would be received as
73/// an output, and the scope's output would be passed into an input slot.
74pub struct Scope<Request, Response, Streams: StreamPack = ()> {
75    /// The data entering the scope. The workflow of the scope must be built
76    /// out from here.
77    pub input: Output<Request>,
78    /// The slot that the final output of the scope must connect into. Once you
79    /// provide an input into this slot, the entire session of the scope will
80    /// wind down. The input will be passed out of the scope once all
81    /// uninterruptible data flows within the scope have finished.
82    pub terminate: InputSlot<Response>,
83    /// The input slot(s) that receive data for the output streams of the scope.
84    /// You can feed data into these input slots at any time during the execution
85    /// of the workflow.
86    pub streams: Streams::StreamInputPack,
87}
88
89/// Settings that describe some aspects of a workflow's behavior.
90#[derive(Default)]
91pub struct WorkflowSettings {
92    delivery: DeliverySettings,
93    scope: ScopeSettings,
94}
95
96impl WorkflowSettings {
97    /// Use default workflow settings.
98    pub fn new() -> Self {
99        Self::default()
100    }
101
102    /// Make a workflow with serial delivery behavior.
103    pub fn serial() -> Self {
104        Self::default().with_delivery(DeliverySettings::Serial)
105    }
106
107    /// Make a workflow with parallel delivery behavior.
108    pub fn parallel() -> Self {
109        Self::default().with_delivery(DeliverySettings::Parallel)
110    }
111
112    pub fn with_delivery(mut self, delivery: DeliverySettings) -> Self {
113        self.delivery = delivery;
114        self
115    }
116    pub fn delivery(&self) -> &DeliverySettings {
117        &self.delivery
118    }
119    pub fn delivery_mut(&mut self) -> &mut DeliverySettings {
120        &mut self.delivery
121    }
122
123    pub fn with_scope(mut self, scope: ScopeSettings) -> Self {
124        self.scope = scope;
125        self
126    }
127    pub fn scope(&self) -> &ScopeSettings {
128        &self.scope
129    }
130    pub fn scope_mut(&mut self) -> &mut ScopeSettings {
131        &mut self.scope
132    }
133
134    /// Transform the settings to be uninterruptible
135    pub fn uninterruptible(mut self) -> Self {
136        self.scope.set_uninterruptible(true);
137        self
138    }
139}
140
141impl From<()> for WorkflowSettings {
142    fn from(_: ()) -> Self {
143        WorkflowSettings::default()
144    }
145}
146
147impl From<ScopeSettings> for WorkflowSettings {
148    fn from(value: ScopeSettings) -> Self {
149        WorkflowSettings::new().with_scope(value)
150    }
151}
152
153impl From<DeliverySettings> for WorkflowSettings {
154    fn from(value: DeliverySettings) -> Self {
155        WorkflowSettings::new().with_delivery(value)
156    }
157}
158
159/// Settings which determine how the workflow delivers its requests: in serial
160/// (handling one request at a time) or in parallel (allowing multiple requests
161/// at a time).
162#[derive(Default)]
163pub enum DeliverySettings {
164    /// This workflow can only run one session at a time. If a new request comes
165    /// in for this workflow when the workflow is already being used, either
166    /// * the new request will be queued until the current request is finished, or
167    /// * the current request will be cancelled ([`Supplanted`][1]) so the new request can begin
168    ///
169    /// [1]: crate::Supplanted
170    Serial,
171
172    /// The workflow can run any number of sessions at a time. If multiple
173    /// requests with the same [`DeliveryLabelId`][1] try to run at the same time,
174    /// those requests will follow the serial delivery behavior.
175    ///
176    /// [1]: crate::DeliveryLabelId
177    #[default]
178    Parallel,
179}
180
181/// Settings which determine how the top-level scope of the workflow behaves.
182#[derive(Default, Clone)]
183pub struct ScopeSettings {
184    /// Should we prevent the scope from being interrupted (e.g. cancelled)?
185    /// False by default, meaning by default scopes can be cancelled or
186    /// interrupted.
187    uninterruptible: bool,
188}
189
190impl ScopeSettings {
191    /// Make a new [`ScopeSettings`] with default values.
192    pub fn new() -> Self {
193        Self::default()
194    }
195
196    /// Make a new [`ScopeSettings`] for an uninterruptible scope.
197    pub fn uninterruptible() -> Self {
198        Self {
199            uninterruptible: true,
200        }
201    }
202
203    /// Check if the scope will be set to uninterruptible.
204    pub fn is_uninterruptible(&self) -> bool {
205        self.uninterruptible
206    }
207
208    /// Set whether the scope will be set to uninterruptible.
209    pub fn set_uninterruptible(&mut self, uninterruptible: bool) {
210        self.uninterruptible = uninterruptible;
211    }
212}
213
214impl From<()> for ScopeSettings {
215    fn from(_: ()) -> Self {
216        ScopeSettings::default()
217    }
218}
219
220impl<'w, 's> SpawnWorkflowExt for Commands<'w, 's> {
221    fn spawn_workflow<Request, Response, Streams, Settings>(
222        &mut self,
223        build: impl FnOnce(Scope<Request, Response, Streams>, &mut Builder) -> Settings,
224    ) -> Service<Request, Response, Streams>
225    where
226        Request: 'static + Send + Sync,
227        Response: 'static + Send + Sync,
228        Streams: StreamPack,
229        Settings: Into<WorkflowSettings>,
230    {
231        let scope_id = self.spawn(()).id();
232        let ScopeEndpoints {
233            terminal,
234            enter_scope,
235            finish_scope_cancel,
236        } = OperateScope::<Request, Response, Streams>::add(None, scope_id, None, self);
237
238        let mut builder = Builder {
239            scope: scope_id,
240            finish_scope_cancel,
241            commands: self,
242        };
243
244        let streams = Streams::spawn_workflow_streams(&mut builder);
245
246        let scope = Scope {
247            input: Output::new(scope_id, enter_scope),
248            terminate: InputSlot::new(scope_id, terminal),
249            streams,
250        };
251
252        let settings: WorkflowSettings = build(scope, &mut builder).into();
253
254        let mut service = self.spawn((
255            ServiceBundle::<WorkflowService<Request, Response, Streams>>::new(),
256            WorkflowStorage::new(scope_id),
257            Streams::StreamAvailableBundle::default(),
258        ));
259        settings
260            .delivery
261            .apply_entity_commands::<Request>(&mut service);
262        let service = service.id();
263        self.entity(scope_id)
264            .insert(ScopeSettingsStorage(settings.scope))
265            .set_parent(service);
266
267        WorkflowService::<Request, Response, Streams>::cast(service)
268    }
269}
270
271impl SpawnWorkflowExt for World {
272    fn spawn_workflow<Request, Response, Streams, W>(
273        &mut self,
274        build: impl FnOnce(Scope<Request, Response, Streams>, &mut Builder) -> W,
275    ) -> Service<Request, Response, Streams>
276    where
277        Request: 'static + Send + Sync,
278        Response: 'static + Send + Sync,
279        Streams: StreamPack,
280        W: Into<WorkflowSettings>,
281    {
282        let mut command_queue = CommandQueue::default();
283        let mut commands = Commands::new(&mut command_queue, self);
284        let service = commands.spawn_workflow(build);
285        command_queue.apply(self);
286        service
287    }
288}
289
290#[cfg(test)]
291mod tests {
292    use crate::{prelude::*, testing::*};
293
294    #[test]
295    fn test_simple_workflows() {
296        let mut context = TestingContext::minimal_plugins();
297
298        let workflow = context.spawn_io_workflow(|scope, builder| {
299            scope
300                .input
301                .chain(builder)
302                .map_block(add)
303                .connect(scope.terminate);
304        });
305
306        let mut promise =
307            context.command(|commands| commands.request((2.0, 2.0), workflow).take_response());
308
309        context.run_with_conditions(&mut promise, Duration::from_secs(1));
310        assert!(promise.take().available().is_some_and(|v| v == 4.0));
311        assert!(context.no_unhandled_errors());
312
313        let workflow = context.spawn_io_workflow(|scope, builder| {
314            let add_node = builder.create_map_block(add);
315            builder.connect(scope.input, add_node.input);
316            builder.connect(add_node.output, scope.terminate);
317        });
318
319        let mut promise =
320            context.command(|commands| commands.request((3.0, 3.0), workflow).take_response());
321
322        context.run_with_conditions(&mut promise, Duration::from_secs(1));
323        assert!(promise.take().available().is_some_and(|v| v == 6.0));
324        assert!(context.no_unhandled_errors());
325    }
326}