crossflow/
gate.rs

1/*
2 * Copyright (C) 2024 Open Source Robotics Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16*/
17
18/// Indicate whether a buffer gate should open or close.
19#[derive(Clone, Copy, Debug, PartialEq, Eq)]
20pub enum Gate {
21    /// Open the buffer gate so that listeners (including [join][1] operations)
22    /// can resume getting woken when the value in a buffer changes. They will
23    /// receive a wakeup immediately when a gate switches from closed to open,
24    /// even if none of the data inside the buffer has changed.
25    ///
26    /// [1]: crate::Joinable::join
27    Open,
28    /// Close the buffer gate so that listeners (including [join][1] operations)
29    /// will not be woken up when the data in the buffer gets modified. This
30    /// effectively blocks the workflow nodes that are downstream of the buffer.
31    /// Data will build up in the buffer according to its [`BufferSettings`][2].
32    ///
33    /// [1]: crate::Joinable::join
34    /// [2]: crate::BufferSettings
35    Closed,
36}
37
38impl Gate {
39    /// Is this action supposed to open a gate?
40    pub fn is_open(&self) -> bool {
41        matches!(self, Self::Open)
42    }
43
44    /// Is this action supposed to close a gate?
45    pub fn is_closed(&self) -> bool {
46        matches!(self, Self::Closed)
47    }
48}
49
50/// Pass this as input into a dynamic gate node. Dynamic gate nodes may open or
51/// close a buffer gate based on what action you pass into it. The data will be
52/// passed along as output from the dynamic gate node once the action is
53/// complete. Dynamic gate nodes are created using [`create_gate`][1] or [`then_gate`][2].
54///
55/// If you know that you always want the gate to open or close at a certain
56/// point in the workflow, then you can use static gate nodes instead using
57/// [`create_gate_open`][3], [`create_gate_close`][4], [`then_gate_open`][5],
58/// or [`then_gate_close`][6].
59///
60/// See [`Gate`] to understand what hapens when a gate is open or closed.
61///
62/// [1]: crate::Builder::create_gate
63/// [2]: crate::Chain::then_gate
64/// [3]: crate::Builder::create_gate_open
65/// [4]: crate::Builder::create_gate_close
66/// [5]: crate::Chain::then_gate_open
67/// [6]: crate::Chain::then_gate_close
68pub struct GateRequest<T> {
69    /// Indicate what action the gate should take
70    pub action: Gate,
71    /// Indicate what data should be passed along after the gate action has
72    /// completed.
73    pub data: T,
74}
75
76#[cfg(test)]
77mod tests {
78    use crate::{prelude::*, testing::*};
79
80    #[test]
81    fn test_gate_actions() {
82        let mut context = TestingContext::minimal_plugins();
83
84        let workflow = context.spawn_io_workflow(|scope, builder| {
85            let fork_input = scope.input.fork_clone(builder);
86            let buffer = builder.create_buffer(BufferSettings::keep_all());
87
88            fork_input
89                .clone_chain(builder)
90                .then_gate_close(buffer)
91                .connect(buffer.input_slot());
92
93            builder
94                .listen(buffer)
95                .consume_buffer::<8>()
96                .connect(scope.terminate);
97
98            fork_input
99                .clone_chain(builder)
100                .with_access(buffer)
101                .then(push_value.into_blocking_callback())
102                .then_gate_open(buffer)
103                .unused();
104        });
105
106        let mut promise = context.command(|commands| commands.request(2, workflow).take_response());
107
108        context.run_with_conditions(&mut promise, 1);
109        assert!(promise.take().available().is_some_and(|v| v.len() == 2));
110        assert!(context.no_unhandled_errors());
111    }
112
113    fn push_value(In((value, key)): In<(i32, BufferKey<i32>)>, mut access: BufferAccessMut<i32>) {
114        access.get_mut(&key).unwrap().push(value);
115    }
116}