crossflow/
map.rs

1/*
2 * Copyright (C) 2024 Open Source Robotics Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16*/
17
18use crate::{
19    AddOperation, AsyncMap, BlockingMap, OperateAsyncMap, OperateBlockingMap, ProvideOnce,
20    Provider, Sendish, StreamPack,
21};
22
23use bevy_ecs::prelude::{Commands, Entity};
24
25use std::future::Future;
26
27/// A newtype to indicate that the map definition is given directly by F.
28#[derive(Clone, Copy)]
29pub struct MapDef<F>(F);
30
31/// Maps are used to perform simple transformations of data which do not require
32/// any direct [`World`][4] access or any persistent system state. They are more
33/// efficient than [`Service`][5] or [`Callback`][6] when suitable.
34///
35/// There are two kinds of functions that can be used as maps:
36/// * **Blocking**: A regular function with a single input and any output.
37///   All system execution will be blocked while this is running, similar to flushing [`Commands`].
38/// * **Async**: A function that takes a single input and returns something that implements the [`Future`] trait.
39///   The [`Future`] will be executed in the [`AsyncComputeTaskPool`][7] unless the `single_threaded_async` feature is active.
40///
41/// If you want to insert a map into a workflow or series, you can pass your function into one of the following,
42/// depending on whether you want blocking or async:
43/// * **Blocking**
44///   * [`Chain::map_block`](crate::Chain::map_block)
45///   * [`Chain::map_block_node`](crate::Chain::map_block_node)
46///   * [`Builder::create_map_block`](crate::Builder::create_map_block)
47///   * [`Series::map_block`](crate::Series::map_block)
48/// * **Async**
49///   * [`Chain::map_async`](crate::Chain::map_async)
50///   * [`Chain::map_async_node`](crate::Chain::map_async_node)
51///   * [`Builder::create_map_async`](crate::Builder::create_map_async)
52///   * [`Series::map_async`](crate::Series::map_async)
53///
54/// If you want your map to emit streams, you will need your input argument to be
55/// [`BlockingMap`] or [`AsyncMap`]. In that case you need to use one of the following:
56///
57/// * [`Chain::map`](crate::Chain::map)
58/// * [`Builder::create_map`](crate::Builder::create_map)
59/// * [`Series::map`](crate::Series::map)
60///
61/// You can also use [`.as_map`][1], [`.into_blocking_map`][2], [`.into_async_map`][3]
62/// to convert a suitable function into a [`Provider`] that can be passed into
63/// any function that accepts a [`Provider`].
64///
65/// [1]: AsMap::as_map
66/// [2]: IntoBlockingMap::into_blocking_map
67/// [3]: IntoAsyncMap::into_async_map
68/// [4]: bevy_ecs::prelude::World
69/// [5]: crate::Service
70/// [6]: crate::Callback
71/// [7]: bevy_tasks::AsyncComputeTaskPool
72#[allow(clippy::wrong_self_convention)]
73pub trait AsMap<M> {
74    type MapType;
75    /// Convert an [`FnMut`] that takes a single input of [`BlockingMap`] or
76    /// [`AsyncMap`] into a [`Provider`].
77    fn as_map(self) -> Self::MapType;
78}
79
80pub type RequestOfMap<M, F> = <<F as AsMap<M>>::MapType as ProvideOnce>::Request;
81pub type ResponseOfMap<M, F> = <<F as AsMap<M>>::MapType as ProvideOnce>::Response;
82pub type StreamsOfMap<M, F> = <<F as AsMap<M>>::MapType as ProvideOnce>::Streams;
83
84/// A trait that all different ways of defining a Blocking Map must funnel into.
85pub(crate) trait CallBlockingMap<Request, Response, Streams: StreamPack> {
86    fn call(&mut self, input: BlockingMap<Request, Streams>) -> Response;
87}
88
89impl<F, Request, Response, Streams> CallBlockingMap<Request, Response, Streams> for MapDef<F>
90where
91    F: FnMut(BlockingMap<Request, Streams>) -> Response + 'static + Send + Sync,
92    Request: 'static + Send + Sync,
93    Response: 'static + Send + Sync,
94    Streams: StreamPack,
95{
96    fn call(&mut self, request: BlockingMap<Request, Streams>) -> Response {
97        (self.0)(request)
98    }
99}
100
101/// A newtype to mark the definition of a BlockingMap.
102///
103/// Maps cannot contain Bevy Systems; they can only contain objects that
104/// implement [`FnMut`].
105pub struct BlockingMapDef<Def, Request, Response, Streams> {
106    def: Def,
107    _ignore: std::marker::PhantomData<fn(Request, Response, Streams)>,
108}
109
110impl<Def: Clone, Request, Response, Streams> Clone
111    for BlockingMapDef<Def, Request, Response, Streams>
112{
113    fn clone(&self) -> Self {
114        Self {
115            def: self.def.clone(),
116            _ignore: Default::default(),
117        }
118    }
119}
120
121impl<Def, Request, Response, Streams> ProvideOnce
122    for BlockingMapDef<Def, Request, Response, Streams>
123where
124    Def: CallBlockingMap<Request, Response, Streams> + 'static + Send + Sync,
125    Request: 'static + Send + Sync,
126    Response: 'static + Send + Sync,
127    Streams: StreamPack,
128{
129    type Request = Request;
130    type Response = Response;
131    type Streams = Streams;
132
133    fn connect(
134        self,
135        scope: Option<Entity>,
136        source: Entity,
137        target: Entity,
138        commands: &mut Commands,
139    ) {
140        commands.queue(AddOperation::new(
141            scope,
142            source,
143            OperateBlockingMap::new(target, self.def),
144        ));
145    }
146}
147
148impl<Def, Request, Response, Streams> Provider for BlockingMapDef<Def, Request, Response, Streams>
149where
150    Def: CallBlockingMap<Request, Response, Streams> + 'static + Send + Sync,
151    Request: 'static + Send + Sync,
152    Response: 'static + Send + Sync,
153    Streams: StreamPack,
154{
155}
156
157pub struct BlockingMapMarker;
158
159impl<F, Request, Response, Streams> AsMap<(Request, Response, Streams, BlockingMapMarker)> for F
160where
161    F: FnMut(BlockingMap<Request, Streams>) -> Response + 'static + Send + Sync,
162    Request: 'static + Send + Sync,
163    Response: 'static + Send + Sync,
164    Streams: StreamPack,
165{
166    type MapType = BlockingMapDef<MapDef<F>, Request, Response, Streams>;
167    fn as_map(self) -> Self::MapType {
168        BlockingMapDef {
169            def: MapDef(self),
170            _ignore: Default::default(),
171        }
172    }
173}
174
175/// Convert any [`FnMut`] into a [`BlockingMapDef`].
176pub trait IntoBlockingMap<M> {
177    type MapType;
178    fn into_blocking_map(self) -> Self::MapType;
179}
180
181impl<F, Request, Response, Streams> IntoBlockingMap<(Request, Response, Streams)> for F
182where
183    F: FnMut(Request) -> Response + 'static + Send + Sync,
184    Request: 'static + Send + Sync,
185    Response: 'static + Send + Sync,
186    Streams: StreamPack,
187{
188    type MapType = BlockingMapDef<BlockingMapAdapter<F>, Request, Response, Streams>;
189    /// Convert any single input function into a [`Provider`] whose request type
190    /// is the single input type of the function and whose response type is the
191    /// return type of the function.
192    fn into_blocking_map(self) -> Self::MapType {
193        BlockingMapDef {
194            def: BlockingMapAdapter(self),
195            _ignore: Default::default(),
196        }
197    }
198}
199
200pub struct BlockingMapAdapter<F>(F);
201
202impl<F, Request, Response> CallBlockingMap<Request, Response, ()> for BlockingMapAdapter<F>
203where
204    F: FnMut(Request) -> Response,
205{
206    fn call(&mut self, BlockingMap { request, .. }: BlockingMap<Request, ()>) -> Response {
207        (self.0)(request)
208    }
209}
210
211pub(crate) trait CallAsyncMap<Request, Task, Streams: StreamPack> {
212    fn call(&mut self, input: AsyncMap<Request, Streams>) -> Task;
213}
214
215impl<F, Request, Task, Streams> CallAsyncMap<Request, Task, Streams> for MapDef<F>
216where
217    F: FnMut(AsyncMap<Request, Streams>) -> Task + 'static + Send + Sync,
218    Request: 'static + Send + Sync,
219    Task: 'static + Send,
220    Streams: StreamPack,
221{
222    fn call(&mut self, input: AsyncMap<Request, Streams>) -> Task {
223        (self.0)(input)
224    }
225}
226
227pub struct AsyncMapMarker;
228
229impl<F, Request, Task, Streams> AsMap<(Request, Task, Streams, AsyncMapMarker)> for F
230where
231    F: FnMut(AsyncMap<Request, Streams>) -> Task + 'static + Send + Sync,
232    Task: Future + 'static + Sendish,
233    Request: 'static + Send + Sync,
234    Task::Output: 'static + Send + Sync,
235    Streams: StreamPack,
236{
237    type MapType = AsyncMapDef<MapDef<F>, Request, Task, Streams>;
238    fn as_map(self) -> Self::MapType {
239        AsyncMapDef {
240            def: MapDef(self),
241            _ignore: Default::default(),
242        }
243    }
244}
245
246/// A newtype to mark the definition of an AsyncMap.
247///
248/// Maps cannot contain Bevy Systems; they can only contain objects that
249/// implement [`FnMut`].
250pub struct AsyncMapDef<Def, Request, Task, Streams> {
251    def: Def,
252    _ignore: std::marker::PhantomData<fn(Request, Task, Streams)>,
253}
254
255impl<Def: Clone, Request, Task, Streams> Clone for AsyncMapDef<Def, Request, Task, Streams> {
256    fn clone(&self) -> Self {
257        Self {
258            def: self.def.clone(),
259            _ignore: Default::default(),
260        }
261    }
262}
263
264impl<Def, Request, Task, Streams> ProvideOnce for AsyncMapDef<Def, Request, Task, Streams>
265where
266    Def: CallAsyncMap<Request, Task, Streams> + 'static + Send + Sync,
267    Task: Future + 'static + Sendish,
268    Request: 'static + Send + Sync,
269    Task::Output: 'static + Send + Sync,
270    Streams: StreamPack,
271{
272    type Request = Request;
273    type Response = Task::Output;
274    type Streams = Streams;
275
276    fn connect(
277        self,
278        scope: Option<Entity>,
279        source: Entity,
280        target: Entity,
281        commands: &mut Commands,
282    ) {
283        commands.queue(AddOperation::new(
284            scope,
285            source,
286            OperateAsyncMap::new(target, self.def),
287        ));
288    }
289}
290
291impl<Def, Request, Task, Streams> Provider for AsyncMapDef<Def, Request, Task, Streams>
292where
293    Def: CallAsyncMap<Request, Task, Streams> + 'static + Send + Sync,
294    Task: Future + 'static + Sendish,
295    Request: 'static + Send + Sync,
296    Task::Output: 'static + Send + Sync,
297    Streams: StreamPack,
298{
299}
300
301pub trait IntoAsyncMap<M> {
302    type MapType;
303    fn into_async_map(self) -> Self::MapType;
304}
305
306impl<F, Request, Task> IntoAsyncMap<(Request, Task)> for F
307where
308    F: FnMut(Request) -> Task + 'static + Send + Sync,
309    Request: 'static + Send + Sync,
310    Task: Future + 'static + Sendish,
311    Task::Output: 'static + Send + Sync,
312{
313    type MapType = AsyncMapDef<AsyncMapAdapter<F>, Request, Task, ()>;
314    /// Convert any
315    fn into_async_map(self) -> Self::MapType {
316        AsyncMapDef {
317            def: AsyncMapAdapter(self),
318            _ignore: Default::default(),
319        }
320    }
321}
322
323pub struct AsyncMapAdapter<F>(F);
324
325impl<F, Request, Task> CallAsyncMap<Request, Task, ()> for AsyncMapAdapter<F>
326where
327    F: FnMut(Request) -> Task + 'static + Send + Sync,
328    Task: Future + 'static + Sendish,
329{
330    fn call(&mut self, AsyncMap { request, .. }: AsyncMap<Request, ()>) -> Task {
331        (self.0)(request)
332    }
333}