// dora_node_api/integration_testing.rs
#![allow(clippy::test_attr_in_doctest)]

//! This document describes how to write [integration tests] for Dora nodes.
//!
//! [integration tests]: https://en.wikipedia.org/wiki/Integration_testing
//!
//! # Usage
//!
//! There are currently three ways to run integration tests for Dora nodes:
//!
//! - calling [`setup_integration_testing`] before invoking the node's `main` function
//! - testing the compiled executable by setting environment variables
//! - using [`DoraNode::init_testing`](crate::DoraNode::init_testing) to write tests that are
//!   independent of the `main` function
//!
//! ## Using `setup_integration_testing`
//!
//! The most straightforward way to write integration tests for a Dora node is to call the
//! [`setup_integration_testing`] function in the test function and then invoke the node's `main`
//! function. This way, the full behavior of the node (including the `main` function) is tested.
//!
//! This approach requires that the node's `main` function uses the
//! [`DoraNode::init_from_env`](crate::DoraNode::init_from_env) function for initialization.
//!
//! See the [`setup_integration_testing`] function documentation for details.
//!
//! ### Example
//!
//! ```rust, ignore
//! use dora_node_api::integration_testing::{
//!     integration_testing_format::{IncomingEvent, TimedIncomingEvent},
//!     setup_integration_testing, IntegrationTestInput, TestingInput, TestingOptions,
//!     TestingOutput,
//! };
//!
//! // Assumes this test lives in the node's own binary crate (e.g. in a
//! // `#[cfg(test)]` module of `main.rs`) so that `crate::main` is reachable.
//! #[test]
//! fn test_main_function() -> eyre::Result<()> {
//!     // specify the events that should be sent to the node
//!     let events = vec![
//!         TimedIncomingEvent {
//!             time_offset_secs: 0.01,
//!             event: IncomingEvent::Input {
//!                 id: "tick".into(),
//!                 metadata: None,
//!                 data: None,
//!             },
//!         },
//!         TimedIncomingEvent {
//!             time_offset_secs: 0.055,
//!             event: IncomingEvent::Stop,
//!         },
//!     ];
//!     let inputs = TestingInput::Input(IntegrationTestInput::new(
//!         "node_id".parse().unwrap(),
//!         events,
//!     ));
//!
//!     // send the node's outputs to a channel so we can verify them later
//!     let (tx, rx) = flume::unbounded();
//!     let outputs = TestingOutput::ToChannel(tx);
//!
//!     // don't include time offsets in the outputs to make them deterministic
//!     let options = TestingOptions {
//!         skip_output_time_offsets: true,
//!     };
//!
//!     // set up the integration testing environment -> this makes `DoraNode::init_from_env`
//!     // initialize the node in testing mode
//!     setup_integration_testing(inputs, outputs, options);
//!
//!     // call the node's main function
//!     crate::main()?;
//!
//!     // collect the node's outputs and compare them against the expected
//!     // `serde_json::Map` objects (`expected_outputs` is not shown here)
//!     let outputs = rx.try_iter().collect::<Vec<_>>();
//!     assert_eq!(outputs, expected_outputs);
//!
//!     Ok(())
//! }
//! ```
//!
//! ## Testing through Environment Variables
//!
//! Dora also supports testing nodes after compilation. This is the most comprehensive way to ensure
//! that a node behaves as expected, because the test runs the exact same executable that is
//! used in the dataflow.
//!
//! To enable this, the
//! [`DoraNode::init_from_env`](crate::DoraNode::init_from_env) function provides
//! built-in support for integration testing through environment variables.
//!
//! To start an integration test, run a node executable directly (i.e. don't go through the Dora
//! CLI) with the following environment variables set:
//!
//! - `DORA_TEST_WITH_INPUTS`: Path to a JSON file that contains the inputs that should be sent
//!   to the node.
//!   - The file format is defined through the [`IntegrationTestInput`] struct (encoded as JSON).
//! - `DORA_TEST_WRITE_OUTPUTS_TO` (optional): Path at which the output
//!   [JSONL](https://jsonltools.com/what-is-jsonl) file should be written.
//!   - Defaults to an `outputs.jsonl` file next to the given inputs file.
//!   - See the [`TestingOutput`] documentation for details on the output file format.
//! - `DORA_TEST_NO_OUTPUT_TIME_OFFSET` (optional): If set to any value, the output JSONL file
//!   will not contain time offsets for the outputs.
//!   - This is useful to get deterministic outputs that can be compared against expected outputs.
//!     (The time offsets depend on your machine, system load, OS, etc., and thus vary between runs.)
//!
//! In integration test mode, the node will not connect to any Dora daemon or other nodes. Instead,
//! it will read all incoming events from the given inputs file and write all outputs to the output
//! file. The output file can then be compared against expected outputs to verify that the node
//! behaves as intended.
//!
//! ### Input File Format
//!
//! The input file must be a JSON file that can be deserialized to an [`IntegrationTestInput`]
//! instance.
//!
//! ### Example
//!
//! ```bash
//! > DORA_TEST_WITH_INPUTS=path/to/inputs.json DORA_TEST_NO_OUTPUT_TIME_OFFSET=1 cargo r -p rust-dataflow-example-status-node
//! ```
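//!
//! The same run can also be driven from a Rust test instead of a shell. The sketch below only
//! uses `std` and is not part of this crate's API: `integration_test_command` and
//! `default_outputs_path` are hypothetical helpers, the executable path is a placeholder, and the
//! default output location encodes our reading of the `DORA_TEST_WRITE_OUTPUTS_TO` rule above.
//!
//! ```rust
//! use std::path::{Path, PathBuf};
//! use std::process::Command;
//!
//! /// Hypothetical helper: mirrors the documented default of writing `outputs.jsonl`
//! /// next to the inputs file.
//! fn default_outputs_path(inputs: &Path) -> PathBuf {
//!     inputs.with_file_name("outputs.jsonl")
//! }
//!
//! /// Hypothetical helper: builds (but does not spawn) a command that runs a compiled
//! /// node in integration test mode.
//! fn integration_test_command(node_exe: &Path, inputs: &Path, outputs: &Path) -> Command {
//!     let mut cmd = Command::new(node_exe);
//!     cmd.env("DORA_TEST_WITH_INPUTS", inputs)
//!         .env("DORA_TEST_WRITE_OUTPUTS_TO", outputs)
//!         // any value disables the time offsets; "1" is just a convention
//!         .env("DORA_TEST_NO_OUTPUT_TIME_OFFSET", "1");
//!     cmd
//! }
//!
//! fn main() {
//!     let inputs = Path::new("path/to/inputs.json");
//!     let outputs = default_outputs_path(inputs);
//!     let cmd = integration_test_command(Path::new("path/to/node-executable"), inputs, &outputs);
//!
//!     // Print the environment that the node would see.
//!     for (key, value) in cmd.get_envs() {
//!         println!("{key:?}={value:?}");
//!     }
//!     // A real test would now call `cmd.status()`, check that the node exited successfully,
//!     // and compare the file at `outputs` against a file with the expected outputs.
//! }
//! ```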
//!
//! ## Using `DoraNode::init_testing`
//!
//! While we recommend including the `main` function in integration tests (as described above),
//! there might also be cases where you want to run additional tests independent of the
//! `main` function. To make this easier, Dora provides the
//! [`DoraNode::init_testing`](crate::DoraNode::init_testing) function, which initializes an
//! integration testing environment directly.
//!
//! This function is roughly equivalent to calling `setup_integration_testing` followed by
//! `DoraNode::init_from_env`, but it does not modify any global or thread-local state.
//! Thus, it can even be used to run multiple integration tests as part of a single test function.
//!
//! ### Example
//!
//! ```rust, ignore
//! use dora_node_api::integration_testing::{IntegrationTestInput, TestingInput, TestingOutput};
//! use dora_node_api::DoraNode;
//!
//! #[test]
//! fn test_run_function() -> eyre::Result<()> {
//!     // specify the events that should be sent to the node
//!     let events = vec![...]; // see above for details
//!     let inputs = TestingInput::Input(IntegrationTestInput::new(
//!         "node_id".parse().unwrap(),
//!         events,
//!     ));
//!
//!     // write the node's outputs to a JSONL file
//!     let outputs = TestingOutput::ToFile(std::path::PathBuf::from("path/to/outputs.jsonl"));
//!
//!     // no global or thread-local state is touched, so this can be repeated in the same test
//!     let (node, events) = DoraNode::init_testing(inputs, outputs, Default::default())?;
//!
//!     // `do_something_with_node_and_events` stands for your own (hypothetical) logic under test
//!     do_something_with_node_and_events(node, events)?;
//!
//!     Ok(())
//! }
//! ```
//!
//! ## Generating Input Files
//!
//! While manually writing input files is possible, it is often more convenient to autogenerate them
//! by recording actual dataflow runs. This can be done by running a Dora dataflow with the
//! **`DORA_WRITE_EVENTS_TO`** environment variable set to a folder path. This will instruct all
//! nodes in the started dataflow to write out their received inputs to an `inputs-{node_id}.json`
//! file in the given folder.
//!
//! The file format used for these input files is identical to the format expected by the
//! `DORA_TEST_WITH_INPUTS` environment variable. Thus, the generated files can be directly used
//! as input files for integration tests.
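//!
//! For example, a recording made with `DORA_WRITE_EVENTS_TO=recordings` could be replayed for a
//! single node as sketched below. This is a `std`-only illustration: `recorded_inputs_path` is a
//! hypothetical helper that just applies the `inputs-{node_id}.json` naming scheme described
//! above, and the folder and node names are placeholders.
//!
//! ```rust
//! use std::path::{Path, PathBuf};
//!
//! /// Hypothetical helper: where a node's recorded inputs are expected to end up.
//! fn recorded_inputs_path(events_dir: &Path, node_id: &str) -> PathBuf {
//!     events_dir.join(format!("inputs-{node_id}.json"))
//! }
//!
//! fn main() {
//!     let inputs = recorded_inputs_path(Path::new("recordings"), "status-node");
//!     // The recorded file can be passed unchanged to a test run of the same node.
//!     println!("DORA_TEST_WITH_INPUTS={}", inputs.display());
//! }
//! ```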
//!
//! Note that the implementation of this feature is currently not optimized. All incoming events
//! are buffered in memory before being written out at the end of the node's execution. Thus, it is
//! not recommended to use this feature with long-running dataflows or dataflows that process
//! large amounts of data.

use std::cell::Cell;

pub use dora_message::integration_testing_format::{self, IntegrationTestInput};

thread_local! {
    static TESTING_ENV: Cell<Option<Box<TestingCommunication>>> = const { Cell::new(None) };
}

pub(crate) fn take_testing_communication() -> Option<Box<TestingCommunication>> {
    TESTING_ENV.with(|env| env.take())
}

/// Sets up an integration testing environment for the current thread.
///
/// This overrides the default behavior of
/// [`DoraNode::init_from_env`](crate::DoraNode::init_from_env) to initialize
/// a testing node with the given input, output, and options.
///
/// ## Implementation Details
///
/// This function sets up thread-local state that is read by the `DoraNode::init_from_env` function.
/// Thus, it only affects the current thread.
///
/// Calls to `DoraNode::init_from_env` will consume the testing environment set up by this function
/// and reset the thread-local state afterwards. Thus, subsequent calls to `DoraNode::init_from_env`
/// will _not_ run in testing mode unless this function is called again.
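///
/// The consume-once behavior can be illustrated with a self-contained model of the same
/// pattern. This is a sketch, not this crate's code: `FakeEnv`, `FAKE_ENV`, `fake_setup`, and
/// `fake_take` are hypothetical stand-ins for the private `TESTING_ENV` thread-local and the
/// functions that set and take it.
///
/// ```rust
/// use std::cell::Cell;
/// use std::thread;
///
/// /// Hypothetical stand-in for the crate-private testing environment.
/// struct FakeEnv {
///     node_id: String,
/// }
///
/// thread_local! {
///     // Same shape as `TESTING_ENV`: one optional, boxed value per thread.
///     static FAKE_ENV: Cell<Option<Box<FakeEnv>>> = const { Cell::new(None) };
/// }
///
/// /// Like `setup_integration_testing`: stores the environment for the current thread only.
/// fn fake_setup(node_id: &str) {
///     FAKE_ENV.with(|env| env.set(Some(Box::new(FakeEnv { node_id: node_id.to_string() }))));
/// }
///
/// /// Like the take performed during `DoraNode::init_from_env`: `Cell::take` returns the
/// /// stored value and leaves `None` behind.
/// fn fake_take() -> Option<Box<FakeEnv>> {
///     FAKE_ENV.with(|env| env.take())
/// }
///
/// fn main() {
///     fake_setup("node_id");
///
///     // Other threads have their own, still empty, copy of the thread-local.
///     let seen_by_other_thread = thread::spawn(|| fake_take().is_some()).join().unwrap();
///     assert!(!seen_by_other_thread);
///
///     // The first take on this thread consumes the environment ...
///     assert_eq!(fake_take().map(|env| env.node_id), Some("node_id".to_string()));
///     // ... so a second initialization would not run in testing mode.
///     assert!(fake_take().is_none());
/// }
/// ```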
pub fn setup_integration_testing(
    input: TestingInput,
    output: TestingOutput,
    options: TestingOptions,
) {
    TESTING_ENV.with(|env| {
        env.set(Some(Box::new(TestingCommunication {
            input,
            output,
            options,
        })))
    });
}

pub(crate) struct TestingCommunication {
    pub input: TestingInput,
    pub output: TestingOutput,
    pub options: TestingOptions,
}

/// Provides input data for integration testing.
pub enum TestingInput {
    /// Loads the integration test input from the given JSON file.
    ///
    /// The given file must deserialize to an [`IntegrationTestInput`] instance.
    FromJsonFile(std::path::PathBuf),
    /// Directly provides the integration test input.
    Input(IntegrationTestInput),
}

/// Specifies where to write the output data of an integration test.
///
/// ## Output File Format
///
/// The output file is a [JSONL](https://jsonltools.com/what-is-jsonl) (_"JSON lines"_) file. Each
/// line in the file is a separate JSON object that represents one output sent by the node.
///
/// The following fields are present in each output object:
///
/// - `id` (string): The output identifier that was used when sending the output.
/// - `data`: The output data (if any), encoded as JSON.
///   - See below for details on the encoding.
/// - `data_type` (string or object): The [`arrow::datatypes::DataType`] of the output data.
///   - Serialized to JSON using the type's `serde::Serialize` implementation.
/// - `time_offset_secs` (float, optional): The time offset in seconds between the start of the
///   node and the time when the output was sent.
///   - This field is omitted if the `DORA_TEST_NO_OUTPUT_TIME_OFFSET` environment variable is set
///     or [`TestingOptions::skip_output_time_offsets`] is `true`.
///
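/// Because every output occupies one line, a deterministic output file (see
/// [`TestingOptions::skip_output_time_offsets`]) can be checked against an expected file entry by
/// entry. The following `std`-only sketch compares the lines as plain text; it assumes that the
/// JSON of each output is serialized identically between runs. `first_differing_output` is a
/// hypothetical helper, and the sample lines are made up and do not show the real encoding.
///
/// ```rust
/// /// Hypothetical helper: returns the index of the first output (blank lines are skipped)
/// /// that differs between the two JSONL documents, or `None` if they match.
/// fn first_differing_output(actual: &str, expected: &str) -> Option<usize> {
///     let mut actual = actual.lines().map(str::trim).filter(|l| !l.is_empty());
///     let mut expected = expected.lines().map(str::trim).filter(|l| !l.is_empty());
///     let mut index = 0;
///     loop {
///         match (actual.next(), expected.next()) {
///             (None, None) => return None,
///             (Some(a), Some(e)) if a == e => index += 1,
///             // different content, or one file has more outputs than the other
///             _ => return Some(index),
///         }
///     }
/// }
///
/// fn main() {
///     let expected = "{\"id\":\"status\"}\n{\"id\":\"status\"}\n";
///     let actual = "{\"id\":\"status\"}\n\n{\"id\":\"status\"}\n";
///     assert_eq!(first_differing_output(actual, expected), None);
///     assert_eq!(first_differing_output("{\"id\":\"a\"}", expected), Some(0));
/// }
/// ```
///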
/// ## Output Data Encoding
///
/// We use the following steps to encode the `data` field of each output object:
///
/// - Convert the data to a [`RecordBatch`](arrow::array::RecordBatch) through:
///   ```rust,ignore
///   RecordBatch::try_from_iter([("inner", data)])
///   ```
/// - Convert the `RecordBatch` to JSON through the [`arrow_json::ArrayWriter`], which encodes
///   it as a JSON array with one object per row.
pub enum TestingOutput {
    /// Writes the output to the given JSONL file.
    ///
    /// The file will be created or overwritten.
    ToFile(std::path::PathBuf),
    /// Writes the output in JSONL format to the given writer.
    ToWriter(Box<dyn std::io::Write + Send>),
    /// Sends each output as a JSON object through the given [`flume::Sender`]. The outputs can
    /// then be read from the corresponding [`flume::Receiver`].
    ///
    /// Note: When using a bounded channel, the node may block when the channel is full.
    ToChannel(flume::Sender<serde_json::Map<String, serde_json::Value>>),
}

/// Options for integration testing.
#[derive(Debug, Clone, Default)]
pub struct TestingOptions {
    /// Whether to skip including time offsets in the output.
    ///
    /// Skipping time offsets makes the outputs deterministic and easier to compare against
    /// expected outputs.
    pub skip_output_time_offsets: bool,
}