dora_message/integration_testing_format.rs
//! Use these types for integration testing nodes.
2
3use std::{
4 collections::{BTreeMap, BTreeSet},
5 path::PathBuf,
6};
7
8use crate::{
9 config::Input,
10 descriptor::EnvValue,
11 id::{DataId, NodeId},
12 metadata::MetadataParameters,
13};
14
15/// Defines the input data and events for integration testing a node.
16///
17/// Most of the fields are similar to the fields defined in the [`Node`](crate::descriptor::Node)
18/// struct, which is used to define nodes in a dataflow YAML file.
19///
20/// For integration testing, the most important field is the [`events`](Self::events) field, which
21/// specifies the events that should be sent to the node during the test.
22#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
23pub struct IntegrationTestInput {
24 /// Unique node identifier. Must not contain `/` characters.
25 ///
26 /// Node IDs can be arbitrary strings with the following limitations:
27 ///
28 /// - They must not contain any `/` characters (slashes).
29 /// - We do not recommend using whitespace characters (e.g. spaces) in IDs
30 ///
31 /// Each node must have an ID field.
32 ///
33 /// ## Example
34 ///
35 /// ```yaml
36 /// nodes:
37 /// - id: camera_node
38 /// - id: some_other_node
39 /// ```
40 pub id: NodeId,
41
42 /// Human-readable node name for documentation.
43 ///
44 /// This optional field can be used to define a more descriptive name in addition to a short
45 /// [`id`](Self::id).
46 ///
47 /// ## Example
48 ///
49 /// ```yaml
50 /// nodes:
51 /// - id: camera_node
52 /// name: "Camera Input Handler"
53 pub name: Option<String>,
54
55 /// Detailed description of the node's functionality.
56 ///
57 /// ## Example
58 ///
59 /// ```yaml
60 /// nodes:
61 /// - id: camera_node
62 /// description: "Captures video frames from webcam"
63 /// ```
64 pub description: Option<String>,
65
66 /// Command-line arguments passed to the executable.
67 ///
68 /// The command-line arguments that should be passed to the executable/script specified in `path`.
69 /// The arguments should be separated by space.
70 /// This field is optional and defaults to an empty argument list.
71 ///
72 /// ## Example
73 /// ```yaml
74 /// nodes:
75 /// - id: example
76 /// path: example-node
77 /// args: -v --some-flag foo
78 /// ```
79 #[serde(default, skip_serializing_if = "Option::is_none")]
80 pub args: Option<String>,
81
82 /// Environment variables for node builds and execution.
83 ///
84 /// Key-value map of environment variables that should be set for both the
85 /// [`build`](Self::build) operation and the node execution (i.e. when the node is spawned
86 /// through [`path`](Self::path)).
87 ///
88 /// Supports strings, numbers, and booleans.
89 ///
90 /// ## Example
91 ///
92 /// ```yaml
93 /// nodes:
94 /// - id: example-node
95 /// path: path/to/node
96 /// env:
97 /// DEBUG: true
98 /// PORT: 8080
99 /// API_KEY: "secret-key"
100 /// ```
101 pub env: Option<BTreeMap<String, EnvValue>>,
102
103 /// Output data identifiers produced by this node.
104 ///
105 /// List of output identifiers that the node sends.
106 /// Must contain all `output_id` values that the node uses when sending output, e.g. through the
107 /// [`send_output`](https://docs.rs/dora-node-api/latest/dora_node_api/struct.DoraNode.html#method.send_output)
108 /// function.
109 ///
110 /// ## Example
111 ///
112 /// ```yaml
113 /// nodes:
114 /// - id: example-node
115 /// outputs:
116 /// - processed_image
117 /// - metadata
118 /// ```
119 #[serde(default)]
120 pub outputs: BTreeSet<DataId>,
121
122 /// Input data connections from other nodes.
123 ///
124 /// Defines the inputs that this node is subscribing to.
125 ///
126 /// The `inputs` field should be a key-value map of the following format:
127 ///
128 /// `input_id: source_node_id/source_node_output_id`
129 ///
130 /// The components are defined as follows:
131 ///
132 /// - `input_id` is the local identifier that should be used for this input.
133 ///
134 /// This will map to the `id` field of
135 /// [`Event::Input`](https://docs.rs/dora-node-api/latest/dora_node_api/enum.Event.html#variant.Input)
136 /// events sent to the node event loop.
137 /// - `source_node_id` should be the `id` field of the node that sends the output that we want
138 /// to subscribe to
139 /// - `source_node_output_id` should be the identifier of the output that that we want
140 /// to subscribe to
141 ///
142 /// ## Example
143 ///
144 /// ```yaml
145 /// nodes:
146 /// - id: example-node
147 /// outputs:
148 /// - one
149 /// - two
150 /// - id: receiver
151 /// inputs:
152 /// my_input: example-node/two
153 /// ```
154 #[serde(default)]
155 pub inputs: BTreeMap<DataId, Input>,
156
157 /// Redirect stdout/stderr to a data output.
158 ///
159 /// This field can be used to send all stdout and stderr output of the node as a Dora output.
160 /// Each output line is sent as a separate message.
161 ///
162 ///
163 /// ## Example
164 ///
165 /// ```yaml
166 /// nodes:
167 /// - id: example
168 /// send_stdout_as: stdout_output
169 /// - id: logger
170 /// inputs:
171 /// example_output: example/stdout_output
172 /// ```
173 #[serde(skip_serializing_if = "Option::is_none")]
174 pub send_stdout_as: Option<String>,
175
176 /// List of incoming events for the integration test.
177 ///
178 /// The node event stream will yield these events during the test. Once the list is exhausted,
179 /// the event stream will close itself.
180 pub events: Vec<TimedIncomingEvent>,
181}
182
183impl IntegrationTestInput {
184 pub fn new(id: NodeId, events: Vec<TimedIncomingEvent>) -> Self {
185 Self {
186 id,
187 name: None,
188 description: None,
189 args: None,
190 env: None,
191 outputs: BTreeSet::new(),
192 inputs: BTreeMap::new(),
193 send_stdout_as: None,
194 events,
195 }
196 }
197}
198
199/// An incoming event with a time offset.
200#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
201pub struct TimedIncomingEvent {
202 /// The time offset in seconds from the start of the node.
203 pub time_offset_secs: f64,
204 /// The incoming event.
205 #[serde(flatten)]
206 pub event: IncomingEvent,
207}
208
209/// An event that is sent to a node during an integration test.
210///
211/// This struct is very similar to the `Event` enum used during normal node operation.
212#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
213#[serde(tag = "type")]
214pub enum IncomingEvent {
215 Stop,
216 Input {
217 id: DataId,
218 metadata: Option<MetadataParameters>,
219 #[serde(flatten)]
220 data: Option<Box<InputData>>,
221 },
222 InputClosed {
223 id: DataId,
224 },
225 AllInputsClosed,
226}
227
228/// Represents the data of an incoming input event for integration testing.
229#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
230#[serde(untagged)]
231pub enum InputData {
232 /// Converts the given JSON object to the closest Arrow representation.
233 ///
234 /// An optional data type can be provided to guide the conversion.
235 JsonObject {
236 /// The input data as JSON.
237 ///
238 /// This can be a JSON array, object, string, number, boolean, etc. Dora automatically
239 /// converts the JSON to an Apache Arrow array, wrapping the data if needed (e.g. wrap
240 /// bare integers into an array because Arrow requires all data to be in array form).
241 data: serde_json::Value,
242 /// Specifies the arrow `DataType` of the `data` field.
243 ///
244 /// This field is optional. If not set, Dora will try to infer the data type automatically.
245 ///
246 /// Use this field if the exact data type is important (e.g. to distinguish between
247 /// different integer sizes).
248 data_type: Option<serde_json::Value>,
249 },
250 /// Load data from an Arrow IPC file.
251 ///
252 /// The data must be in the
253 /// [Arrow IPC file format](https://arrow.apache.org/docs/python/ipc.html#writing-and-reading-random-access-files)
254 ArrowFile {
255 /// The path to the Arrow IPC file.
256 path: PathBuf,
257 /// The optional batch index to read from the file.
258 ///
259 /// Arrow IPC files can contain multiple record batches. Only one batch is read per input
260 /// event. This field specifies which batch to read. Defaults to `0`.
261 #[serde(default)]
262 batch_index: usize,
263 /// Optional column name to read from the record batch.
264 ///
265 /// If not set, the entire record batch is read and converted to an Arrow `StructArray`.
266 column: Option<String>,
267 },
268}