crossflow/
lib.rs

1/*
2 * Copyright (C) 2023 Open Source Robotics Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16*/
17
18//! ![sense-think-act workflow](https://raw.githubusercontent.com/open-rmf/crossflow/main/assets/figures/sense-think-act_workflow.svg)
19//!
20//! Crossflow is a workflow execution library built on the [Bevy](https://bevyengine.org)
21//! game engine that allows you to transform [bevy systems](https://bevyengine.org/learn/quick-start/getting-started/ecs/)
22//! into services and workflows that can be used for reactive service-oriented
23//! programming.
24//!
25//! Crossflow can be used to make general-purpose async programming easier (especially
26//! inside of bevy applications) using the native Rust API, or it can be used for
27//! visual graph-based programming by defining workflows with JSON (using an editor
28//! or generating with a planner) and then loading and executing those workflows at runtime.
29//!
30//! ## Services
31//!
32//! One primitive element of reactive programming is a [service](https://en.wikipedia.org/wiki/Service_(systems_architecture)).
33//! In crossflow, a [`Service`] is a bevy system that is associated with an
34//! entity and can be created using [`Commands::spawn_service`](SpawnServicesExt::spawn_service)
35//! or [`App::add_service`](AddServicesExt::add_service).
36//!
37//! When you spawn a service you will immediately receive a [`Service`] object
38//! which references the newly spawned service. If you do not want to hang onto the [`Service`]
39//! object, you can find previously spawned services later using the [`ServiceDiscovery`]
40//! system parameter.
41//!
42//! Sometimes [`Service`] is not quite the right fit for your use case, so crossflow
43//! offers a generalization of services callled [`Provider`] which has some
44//! more options for defining a reactive element.
45//!
46//! ## Workflows
47//!
48//! For complex async workflows, a single bevy system may not be sufficient.
49//! You can instead build workflows using [`.spawn_workflow`](SpawnWorkflowExt::spawn_workflow)
50//! on [`Commands`](bevy_ecs::prelude::Commands) or [`World`](bevy_ecs::prelude::World).
51//! A workflow lets you create a graph of [nodes](Node) where each node is a
52//! [service](Service) (or more generally a [provider](Provider)) with an input,
53//! an output, and possibly streams.
54//!
55//! There are various operations that can be performed between nodes, such as
56//! forking and joining. These operations are built using [`Chain`].
57//!
58//! When you spawn your workflow, you will receive a [`Service`] object that
59//! lets you use the workflow as if it's an ordinary service.
60//!
61//! ## Series
62//!
63//! Services and workflows are reusable building blocks for creating a reactive
64//! application. In order to actually run them, call [`Commands::request`](RequestExt::request)
65//! which will provide you with a [`Series`]. A series is a one-time-use
66//! reaction to a request which you can chain to subsequent reactions using
67//! [`Series::then`]. Any series that you create will only run exactly once.
68//!
69//! Once you've finished building your series, use [`Series::detach`] to let it
70//! run freely, or use [`Series::take`] to get a [`Recipient`] of the final
71//! result.
72
73mod async_execution;
74pub use async_execution::Sendish;
75
76pub mod buffer;
77pub use buffer::*;
78
79pub mod re_exports;
80
81pub mod builder;
82pub use builder::*;
83
84pub mod callback;
85pub use callback::*;
86
87pub mod cancel;
88pub use cancel::*;
89
90pub mod chain;
91pub use chain::*;
92
93pub mod channel;
94pub use channel::*;
95
96#[cfg(feature = "diagram")]
97pub mod diagram;
98#[cfg(feature = "diagram")]
99pub use diagram::*;
100
101pub mod disposal;
102pub use disposal::*;
103
104pub mod errors;
105pub use errors::*;
106
107pub mod flush;
108pub use flush::*;
109
110pub mod gate;
111pub use gate::*;
112
113pub mod series;
114pub use series::*;
115
116pub mod input;
117pub use input::*;
118
119pub mod map;
120pub use map::*;
121
122pub mod map_once;
123pub use map_once::*;
124
125pub mod node;
126pub use node::*;
127
128pub mod operation;
129pub use operation::*;
130
131pub mod promise;
132pub use promise::*;
133
134pub mod provider;
135pub use provider::*;
136
137pub mod request;
138pub use request::*;
139
140pub mod service;
141pub use service::*;
142
143pub mod stream;
144pub use stream::*;
145
146pub mod workflow;
147pub use workflow::*;
148
149pub mod testing;
150
151pub(crate) mod utils;
152#[allow(unused)]
153pub(crate) use utils::*;
154
155#[cfg(feature = "trace")]
156pub mod trace;
157#[cfg(feature = "trace")]
158pub use trace::*;
159
160#[cfg(feature = "trace")]
161pub const fn trace_supported() -> bool {
162    true
163}
164
165#[cfg(not(feature = "trace"))]
166pub const fn trace_supported() -> bool {
167    false
168}
169
170pub mod trim;
171pub use trim::*;
172
173pub mod type_info;
174pub use type_info::*;
175
176use bevy_app::prelude::{App, Plugin, Update};
177use bevy_ecs::prelude::{Entity, In};
178
179extern crate self as crossflow;
180
181/// Use `BlockingService` to indicate that your system is a blocking [`Service`].
182///
183/// A blocking service will have exclusive world access while it runs, which
184/// means no other system will be able to run simultaneously. Each service is
185/// associated with its own unique entity which can be used to store state or
186/// configuration parameters.
187///
188/// Some services might need to know what entity is providing the service, e.g.
189/// if the service provider is configured with additional components that need
190/// to be queried when a request comes in. For that you can check the `provider`
191/// field of `BlockingService`:
192///
193/// ```
194/// use bevy_ecs::prelude::*;
195/// use crossflow::prelude::*;
196///
197/// #[derive(Component, Resource)]
198/// struct Precision(i32);
199///
200/// fn rounding_service(
201///     In(BlockingService{request, provider, ..}): BlockingServiceInput<f64>,
202///     service_precision: Query<&Precision>,
203///     global_precision: Res<Precision>,
204/// ) -> f64 {
205///     let precision = service_precision.get(provider).unwrap_or(&*global_precision).0;
206///     (request * 10_f64.powi(precision)).floor() * 10_f64.powi(-precision)
207/// }
208/// ```
209#[non_exhaustive]
210pub struct BlockingService<Request, Streams: StreamPack = ()> {
211    /// The input data of the request
212    pub request: Request,
213    /// The buffer to hold stream output data until the function is finished
214    pub streams: Streams::StreamBuffers,
215    /// The entity providing the service
216    pub provider: Entity,
217    /// The node in a workflow or series that asked for the service
218    pub source: Entity,
219    /// The unique session ID for the workflow
220    pub session: Entity,
221}
222
223/// Use this to reduce bracket noise when you need `In<BlockingService<R>>`.
224pub type BlockingServiceInput<Request, Streams = ()> = In<BlockingService<Request, Streams>>;
225
226/// Use `AsyncService` to indicate that your system is an async [`Service`]. Being
227/// async means it must return a [`Future<Output=Response>`](std::future::Future)
228/// which will be processed by a task pool.
229///
230/// This comes with a [`Channel`] that allows your Future to interact with Bevy's
231/// ECS asynchronously while it is polled from inside the task pool.
232#[non_exhaustive]
233pub struct AsyncService<Request, Streams: StreamPack = ()> {
234    /// The input data of the request
235    pub request: Request,
236    /// Stream channels that will let you send stream information. This will
237    /// usually be a [`StreamChannel`] or a (possibly nested) tuple of
238    /// `StreamChannel`s, whichever matches the [`StreamPack`] description.
239    pub streams: Streams::StreamChannels,
240    /// The channel that allows querying and syncing with the world while the
241    /// service runs asynchronously.
242    pub channel: Channel,
243    /// The entity providing the service
244    pub provider: Entity,
245    /// The node in a workflow or series that asked for the service
246    pub source: Entity,
247    /// The unique session ID for the workflow
248    pub session: Entity,
249}
250
251/// Use this to reduce backet noise when you need `In<`[`AsyncService<R, S>`]`>`.
252pub type AsyncServiceInput<Request, Streams = ()> = In<AsyncService<Request, Streams>>;
253
254/// Use `ContinuousService` to indicate that your system is a [`Service`] that
255/// runs incrementally inside of a schedule with each update of the Bevy ECS.
256pub struct ContinuousService<Request, Response, Streams: StreamPack = ()> {
257    /// Pass this into a [`ContinuousQuery`] to access the ongoing requests for
258    /// this service. While accessing the ongoing requests, you will also be
259    /// able to send streams and responses for the requests.
260    pub key: ContinuousServiceKey<Request, Response, Streams>,
261}
262
263/// Use this to reduce the bracket noise when you need `In<`[`ContinuousService`]`>`.
264pub type ContinuousServiceInput<Request, Response, Streams = ()> =
265    In<ContinuousService<Request, Response, Streams>>;
266
267/// Use BlockingCallback to indicate that your system is meant to define a
268/// blocking [`Callback`]. Callbacks are different from services because they are
269/// not associated with any entity.
270///
271/// Alternatively any Bevy system with an input of `In<Request>` can be converted
272/// into a blocking callback by applying
273/// [`.into_blocking_callback()`](crate::IntoBlockingCallback).
274#[non_exhaustive]
275pub struct BlockingCallback<Request, Streams: StreamPack = ()> {
276    /// The input data of the request
277    pub request: Request,
278    /// The buffer to hold stream output data until the function is finished
279    pub streams: Streams::StreamBuffers,
280    /// The node in a workflow or series that asked for the callback
281    pub source: Entity,
282    /// The unique session ID for the workflow
283    pub session: Entity,
284}
285
286/// Use this to reduce bracket noise when you need `In<`[`BlockingCallback<R, S>`]`>`.
287pub type BlockingCallbackInput<Request, Streams = ()> = In<BlockingCallback<Request, Streams>>;
288
289/// Use AsyncCallback to indicate that your system or function is meant to define
290/// an async [`Callback`]. An async callback is not associated with any entity,
291/// and it must return a [`Future<Output=Response>`](std::future::Future) that
292/// will be polled by the async task pool.
293#[non_exhaustive]
294pub struct AsyncCallback<Request, Streams: StreamPack = ()> {
295    /// The input data of the request
296    pub request: Request,
297    /// Stream channels that will let you send stream information. This will
298    /// usually be a [`StreamChannel`] or a (possibly nested) tuple of
299    /// `StreamChannel`s, whichever matches the [`StreamPack`] description.
300    pub streams: Streams::StreamChannels,
301    /// The channel that allows querying and syncing with the world while the
302    /// callback executes asynchronously.
303    pub channel: Channel,
304    /// The node in a workflow or series that asked for the callback
305    pub source: Entity,
306    /// The unique session ID for the workflow
307    pub session: Entity,
308}
309
310/// Use this to reduce bracket noise when you need `In<`[`AsyncCallback<R, S>`]`>`.
311pub type AsyncCallbackInput<Request, Streams = ()> = In<AsyncCallback<Request, Streams>>;
312
313/// Use `BlockingMap`` to indicate that your function is a blocking map. A map
314/// is not associated with any entity, and it cannot be a Bevy System. These
315/// restrictions allow them to be processed more efficiently.
316#[non_exhaustive]
317pub struct BlockingMap<Request, Streams: StreamPack = ()> {
318    /// The input data of the request
319    pub request: Request,
320    /// The buffer to hold stream output data until the function is finished
321    pub streams: Streams::StreamBuffers,
322    /// The node in a workflow or series that asked for the callback
323    pub source: Entity,
324    /// The unique session ID for the workflow
325    pub session: Entity,
326}
327
328/// Use AsyncMap to indicate that your function is an async map. A Map is not
329/// associated with any entity, and it cannot be a Bevy System. These
330/// restrictions allow them to be processed more efficiently.
331///
332/// An async map must return a [`Future<Output=Response>`](std::future::Future)
333/// that will be polled by the async task pool.
334#[non_exhaustive]
335pub struct AsyncMap<Request, Streams: StreamPack = ()> {
336    /// The input data of the request
337    pub request: Request,
338    /// Stream channels that will let you send stream information. This will
339    /// usually be a [`StreamChannel`] or a (possibly nested) tuple of
340    /// `StreamChannel`s, whichever matches the [`StreamPack`] description.
341    pub streams: Streams::StreamChannels,
342    /// The channel that allows querying and syncing with the world while the
343    /// map executes asynchronously.
344    pub channel: Channel,
345    /// The node in a workflow or series that asked for the callback
346    pub source: Entity,
347    /// The unique session ID for the workflow
348    pub session: Entity,
349}
350
351/// This plugin simply adds [`flush_execution()`] to the [`Update`] schedule of your
352/// applicatation. For more fine-grained control you can call `flush_execution`
353/// yourself and configure its relationship to other systems as you see fit.
354///
355/// If you do not have at least one usage of `flush_execution()` somewhere in
356/// your application then workflows will not work.
357#[derive(Default)]
358pub struct CrossflowPlugin {}
359
360impl Plugin for CrossflowPlugin {
361    fn build(&self, app: &mut App) {
362        app.add_systems(Update, flush_execution());
363
364        #[cfg(feature = "trace")]
365        {
366            app.add_event::<OperationStarted>();
367        }
368    }
369}
370
371/// This plugin adds [`CrossflowPlugin`] plus a few more that allows plus a few
372/// more plugins that allow create a sufficient but minimal app for executing
373/// workflows.
374///
375/// Use [`CrossflowPlugin`] if you want to use crossflow as a library within an
376/// existing app. Use [`CrossflowExecutorApp`] if you want to set up an app from
377/// scratch whose main purpose is to execute workflows.
378#[derive(Default)]
379pub struct CrossflowExecutorApp {}
380
381impl Plugin for CrossflowExecutorApp {
382    fn build(&self, app: &mut App) {
383        app.add_plugins((
384            CrossflowPlugin::default(),
385            bevy_app::TaskPoolPlugin::default(),
386            bevy_diagnostic::FrameCountPlugin,
387            bevy_app::ScheduleRunnerPlugin::default(),
388        ));
389    }
390}
391
392pub mod prelude {
393    pub use crate::{
394        buffer::{
395            Accessible, Accessor, AnyBuffer, AnyBufferKey, AnyBufferMut, AnyBufferWorldAccess,
396            AnyMessageBox, AsAnyBuffer, Buffer, BufferAccess, BufferAccessMut, BufferGateAccess,
397            BufferGateAccessMut, BufferKey, BufferMap, BufferMapLayout, BufferSettings,
398            BufferWorldAccess, Bufferable, Buffering, IncompatibleLayout, IterBufferable, Joinable,
399            Joined, RetentionPolicy,
400        },
401        builder::Builder,
402        callback::{AsCallback, Callback, IntoAsyncCallback, IntoBlockingCallback},
403        chain::{Chain, ForkCloneBuilder, UnzipBuilder, Unzippable},
404        flush::flush_execution,
405        map::{AsMap, IntoAsyncMap, IntoBlockingMap},
406        map_once::{AsMapOnce, IntoAsyncMapOnce, IntoBlockingMapOnce},
407        node::{ForkCloneOutput, InputSlot, Node, Output},
408        promise::{Promise, PromiseState},
409        provider::{ProvideOnce, Provider},
410        request::{RequestExt, RunCommandsOnWorldExt},
411        series::{Recipient, Series},
412        service::{
413            traits::*, AddContinuousServicesExt, AddServicesExt, AsDeliveryInstructions,
414            DeliveryInstructions, DeliveryLabel, DeliveryLabelId, IntoAsyncService,
415            IntoBlockingService, Service, ServiceDiscovery, ServiceInstructions, SpawnServicesExt,
416        },
417        stream::{DynamicallyNamedStream, NamedValue, Stream, StreamFilter, StreamOf, StreamPack},
418        trim::{TrimBranch, TrimPoint},
419        workflow::{DeliverySettings, Scope, ScopeSettings, SpawnWorkflowExt, WorkflowSettings},
420        AsyncCallback, AsyncCallbackInput, AsyncMap, AsyncService, AsyncServiceInput,
421        BlockingCallback, BlockingCallbackInput, BlockingMap, BlockingService,
422        BlockingServiceInput, ContinuousQuery, ContinuousService, ContinuousServiceInput,
423        CrossflowExecutorApp, CrossflowPlugin,
424    };
425
426    pub use bevy_ecs::prelude::{In, World};
427
428    #[cfg(feature = "diagram")]
429    pub use crate::{
430        buffer::{
431            JsonBuffer, JsonBufferKey, JsonBufferMut, JsonBufferView, JsonBufferWorldAccess,
432            JsonMessage,
433        },
434        diagram::{Diagram, DiagramElementRegistry, DiagramError, NodeBuilderOptions, Section},
435    };
436
437    pub use futures::FutureExt;
438}
439
440pub use bevy_app;
441pub use bevy_derive;
442pub use bevy_diagnostic;
443pub use bevy_ecs;
444pub use bevy_tasks;
445pub use bevy_time;
446pub use bevy_utils;