crossflow/
node.rs

1/*
2 * Copyright (C) 2024 Open Source Robotics Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16*/
17
18use bevy_ecs::prelude::Entity;
19
20use crate::{
21    AddBranchToForkClone, AddOperation, Builder, Chain, ForkClone, ForkTargetStorage,
22    SingleInputStorage, StreamPack, UnusedTarget,
23};
24
25pub mod dyn_node;
26
/// A collection of all the inputs and outputs for a node within a workflow.
///
/// `Request` is the type accepted by the `input` slot, `Response` is the type
/// carried by the final `output`, and `Streams` (which defaults to `()`)
/// selects the type of the `streams` field through
/// `StreamPack::StreamOutputPack`.
#[derive(Debug)]
#[must_use]
pub struct Node<Request, Response, Streams: StreamPack = ()> {
    /// The input slot for the node. Connect outputs into this slot to trigger
    /// the node.
    pub input: InputSlot<Request>,
    /// The final output of the node. Build off of this to handle the response
    /// that comes out of the node.
    pub output: Output<Response>,
    /// The streams that come out of the node. A stream may fire off data any
    /// number of times while a node is active. Each stream can fire off data
    /// independently. Once the final output of the node is sent, no more
    /// stream data will come out.
    pub streams: Streams::StreamOutputPack,
}
43
/// The slot that receives input for a node. When building a workflow, you can
/// connect the output of a node to this, as long as the types match.
///
/// Any number of node outputs can be connected to one input slot.
#[must_use]
pub struct InputSlot<Request> {
    /// The scope entity, as returned by [`InputSlot::scope`].
    scope: Entity,
    /// The entity that identifies this slot, as returned by [`InputSlot::id`].
    source: Entity,
    /// Zero-sized marker that ties the slot to `Request` without storing a
    /// value of that type. A `fn(Request)` pointer type is `Send`, `Sync` and
    /// `Copy` for every `Request`, so the marker places no requirements on
    /// `Request`.
    _ignore: std::marker::PhantomData<fn(Request)>,
}
54
// `Clone` is written by hand with no bound on `T`, so an `InputSlot<T>` can be
// cloned whether or not `T` itself implements `Clone`.
impl<T> Clone for InputSlot<T> {
    /// Returns a copy of this slot. `InputSlot` is also `Copy`, so this is a
    /// plain dereference.
    fn clone(&self) -> Self {
        *self
    }
}
60
// Implemented without a bound on `T`: every `InputSlot<T>` is `Copy`,
// independent of whether `T` is.
impl<T> Copy for InputSlot<T> {}
62
63impl<Request> std::fmt::Debug for InputSlot<Request> {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        f.debug_struct(format!("Input<{}>", std::any::type_name::<Request>()).as_str())
66            .field("scope", &self.scope)
67            .field("source", &self.source)
68            .finish()
69    }
70}
71
72impl<Request> InputSlot<Request> {
73    pub fn id(&self) -> Entity {
74        self.source
75    }
76    pub fn scope(&self) -> Entity {
77        self.scope
78    }
79    pub(crate) fn new(scope: Entity, source: Entity) -> Self {
80        Self {
81            scope,
82            source,
83            _ignore: Default::default(),
84        }
85    }
86}
87
/// The output of a node. This can only be connected to one input slot. If the
/// `Response` parameter can be cloned then you can call [`Self::fork_clone`] to
/// transform this into a [`ForkCloneOutput`] and then connect the output into
/// any number of input slots.
///
/// `Output` intentionally does not implement `Copy` or `Clone` because it must
/// only be consumed exactly once.
#[must_use]
pub struct Output<Response> {
    /// The scope entity, as returned by [`Output::scope`].
    scope: Entity,
    /// The entity that this output will be sent to, as returned by
    /// [`Output::id`].
    target: Entity,
    /// Zero-sized marker that ties the output to `Response` without storing a
    /// value of that type.
    _ignore: std::marker::PhantomData<fn(Response)>,
}
101
102impl<Response> std::fmt::Debug for Output<Response> {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        f.debug_struct(format!("Output<{}>", std::any::type_name::<Response>()).as_str())
105            .field("scope", &self.scope)
106            .field("target", &self.target)
107            .finish()
108    }
109}
110
111impl<Response: 'static + Send + Sync> Output<Response> {
112    /// Create a chain that builds off of this response.
113    pub fn chain<'w, 's, 'a, 'b>(
114        self,
115        builder: &'b mut Builder<'w, 's, 'a>,
116    ) -> Chain<'w, 's, 'a, 'b, Response>
117    where
118        Response: 'static + Send + Sync,
119    {
120        assert_eq!(self.scope, builder.scope());
121        Chain::new(self.target, builder)
122    }
123
124    /// Create a node that will fork the output along multiple branches, giving
125    /// a clone of the output to each branch.
126    pub fn fork_clone(self, builder: &mut Builder) -> ForkCloneOutput<Response>
127    where
128        Response: Clone,
129    {
130        assert_eq!(self.scope, builder.scope());
131        builder.commands.queue(AddOperation::new(
132            Some(self.scope),
133            self.target,
134            ForkClone::<Response>::new(ForkTargetStorage::new()),
135        ));
136        ForkCloneOutput::new(self.scope, self.target)
137    }
138
139    /// Get the entity that this output will be sent to.
140    pub fn id(&self) -> Entity {
141        self.target
142    }
143
144    /// Get the scope that this output exists inside of.
145    pub fn scope(&self) -> Entity {
146        self.scope
147    }
148
149    pub(crate) fn new(scope: Entity, target: Entity) -> Self {
150        Self {
151            scope,
152            target,
153            _ignore: Default::default(),
154        }
155    }
156}
157
/// The output of a cloning fork node. Use [`Self::clone_output`] to create a
/// cloned output that you can connect to an input slot.
#[must_use]
pub struct ForkCloneOutput<Response> {
    /// The scope entity, as returned by [`ForkCloneOutput::scope`].
    scope: Entity,
    /// The entity of the fork, as returned by [`ForkCloneOutput::id`]. New
    /// branches are added with this entity as their source.
    source: Entity,
    /// Zero-sized marker that ties the fork to `Response` without storing a
    /// value of that type.
    _ignore: std::marker::PhantomData<fn(Response)>,
}
166
167impl<Response: 'static + Send + Sync> ForkCloneOutput<Response> {
168    pub fn clone_output(&self, builder: &mut Builder) -> Output<Response> {
169        assert_eq!(self.scope, builder.scope());
170        let target = builder
171            .commands
172            .spawn((SingleInputStorage::new(self.id()), UnusedTarget))
173            .id();
174        builder.commands.queue(AddBranchToForkClone {
175            source: self.source,
176            target,
177        });
178
179        Output::new(self.scope, target)
180    }
181
182    pub fn clone_chain<'w, 's, 'a, 'b>(
183        &self,
184        builder: &'b mut Builder<'w, 's, 'a>,
185    ) -> Chain<'w, 's, 'a, 'b, Response> {
186        let output = self.clone_output(builder);
187        output.chain(builder)
188    }
189
190    pub fn id(&self) -> Entity {
191        self.source
192    }
193
194    pub fn scope(&self) -> Entity {
195        self.scope
196    }
197
198    pub(crate) fn new(scope: Entity, source: Entity) -> Self {
199        Self {
200            scope,
201            source,
202            _ignore: Default::default(),
203        }
204    }
205}
206
/// The output of a fork result operation. Each output can be connected to one
/// input slot.
///
/// `T` is the type carried by the [`Ok`] branch and `E` is the type carried by
/// the [`Err`] branch.
pub struct ForkResultOutput<T, E> {
    /// This output will be sent if an [`Ok`] is sent into the fork.
    pub ok: Output<T>,
    /// This output will be sent if an [`Err`] is sent into the fork.
    pub err: Output<E>,
}
215
/// The output of a fork option operation. Each output can be connected to one
/// input slot.
///
/// `T` is the type carried by the [`Some`] branch; the [`None`] branch carries
/// the unit type `()`.
pub struct ForkOptionOutput<T> {
    /// This output will be sent if a [`Some`] is sent into the fork.
    pub some: Output<T>,
    /// This output will be sent if a [`None`] is sent into the fork.
    pub none: Output<()>,
}