#![allow(clippy::test_attr_in_doctest)]

//! This document describes how to write [integration tests] for Dora nodes.
//!
//! [integration tests]: https://en.wikipedia.org/wiki/Integration_testing
//!
//! # Usage
//!
//! There are currently three ways to run integration tests for Dora nodes:
//!
//! - calling [`setup_integration_testing`] before invoking the node's `main` function
//! - testing the compiled executable by setting environment variables
//! - using [`DoraNode::init_testing`](crate::DoraNode::init_testing) to write tests that are
//!   independent of the `main` function
//!
//!
//! ## Using `setup_integration_testing`
//!
//! The most straightforward way to write integration tests for a Dora node is to call the
//! [`setup_integration_testing`] function in the test function and then invoke the node's `main`
//! function. This way, the full behavior of the node (including the `main` function) is tested.
//!
//! This approach requires that the node's `main` function uses the
//! [`DoraNode::init_from_env`](crate::DoraNode::init_from_env) function for initialization.
//!
//! See the [`setup_integration_testing`] function documentation for details.
//!
//! ### Example
//! ```rust, ignore
//! #[test]
//! fn test_main_function() -> eyre::Result<()> {
//!     // specify the events that should be sent to the node
//!     let events = vec![
//!         TimedIncomingEvent {
//!             time_offset_secs: 0.01,
//!             event: IncomingEvent::Input {
//!                 id: "tick".into(),
//!                 metadata: None,
//!                 data: None,
//!             },
//!         },
//!         TimedIncomingEvent {
//!             time_offset_secs: 0.055,
//!             event: IncomingEvent::Stop,
//!         },
//!     ];
//!     let inputs = dora_node_api::integration_testing::TestingInput::Input(
//!         IntegrationTestInput::new("node_id".parse().unwrap(), events),
//!     );
//!
//!     // send the node's outputs to a channel so we can verify them later
//!     let (tx, rx) = flume::unbounded();
//!     let outputs = dora_node_api::integration_testing::TestingOutput::ToChannel(tx);
//!
//!     // don't include time offsets in the outputs to make them deterministic
//!     let options = TestingOptions {
//!         skip_output_time_offsets: true,
//!     };
//!
//!     // set up the integration testing environment -> this will make `DoraNode::init_from_env`
//!     // initialize the node in testing mode
//!     integration_testing::setup_integration_testing(inputs, outputs, options);
//!
//!     // call the node's main function
//!     crate::main()?;
//!
//!     // collect the node's outputs and compare them
//!     // (`expected_outputs` is a placeholder for the JSON objects that the test expects)
//!     let outputs = rx.try_iter().collect::<Vec<_>>();
//!     assert_eq!(outputs, expected_outputs);
//!
//!     Ok(())
//! }
//! ```
//!
//! ## Testing through Environment Variables
//!
//! Dora also supports testing nodes after compilation. This is the most comprehensive way to ensure
//! that a node behaves as expected, as the test runs against the exact same executable that is
//! used in the dataflow.
//!
//! To enable this, the
//! [`DoraNode::init_from_env`](crate::DoraNode::init_from_env) function provides
//! built-in support for integration testing through environment variables.
//!
//! To start an integration test, run a node executable directly (i.e. don't go through the Dora
//! CLI) with the following environment variables set:
//!
//! - `DORA_TEST_WITH_INPUTS`: Path to a JSON file that contains the inputs that should be sent
//!   to the node.
//!   - The file format is defined through the [`IntegrationTestInput`] struct (encoded as JSON).
//! - `DORA_TEST_WRITE_OUTPUTS_TO` (optional): Path at which the output
//!   [JSONL](https://jsonltools.com/what-is-jsonl) file should be written.
//!   - Defaults to an `outputs.jsonl` file next to the given inputs file.
//!   - See the [`TestingOutput`] enum for details on the output file format.
//! - `DORA_TEST_NO_OUTPUT_TIME_OFFSET` (optional): If set to any value, the output JSONL file
//!   will not contain time offsets for the outputs.
//!   - This is useful to get deterministic outputs that can be compared against expected outputs.
//!     (The time offsets depend on your machine, system load, OS, etc. and thus vary between runs.)
//!
//! In integration test mode, the node will not connect to any Dora daemon or other nodes. Instead,
//! it will read all incoming events from the given inputs file and write all outputs to the output
//! file. The output file can then be compared against expected outputs to verify that the node
//! behaves as intended.
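//!
//! The comparison step described above could be sketched with the standard library alone. The
//! helper name `first_mismatch` is hypothetical and not part of Dora. It compares the files
//! textually, line by line, so it assumes that the expected file was produced by the same
//! node version with `DORA_TEST_NO_OUTPUT_TIME_OFFSET` set.
//!
//! ```rust
//! /// Returns the 1-based index of the first non-empty line that differs between two JSONL
//! /// documents, or `None` if all non-empty lines match. (Hypothetical helper.)
//! fn first_mismatch(actual: &str, expected: &str) -> Option<usize> {
//!     // skip blank lines so that a trailing newline does not count as a difference
//!     let mut actual = actual.lines().filter(|line| !line.trim().is_empty());
//!     let mut expected = expected.lines().filter(|line| !line.trim().is_empty());
//!     let mut line_no = 0;
//!     loop {
//!         line_no += 1;
//!         match (actual.next(), expected.next()) {
//!             // both documents ended at the same time -> no difference found
//!             (None, None) => return None,
//!             (Some(a), Some(e)) if a.trim() == e.trim() => continue,
//!             // different content, or one document is longer than the other
//!             _ => return Some(line_no),
//!         }
//!     }
//! }
//! ```
//!
//! Because this is a plain string comparison, differences in whitespace inside a line or in
//! key order are reported as mismatches. Parsing each line as JSON before comparing would be
//! more robust.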
//!
//! ### Input File Format
//!
//! The input file must be a JSON file that can be deserialized to an [`IntegrationTestInput`]
//! instance.
//!
//! ### Example
//!
//! ```bash
//! DORA_TEST_WITH_INPUTS=path/to/inputs.json DORA_TEST_NO_OUTPUT_TIME_OFFSET=1 cargo r -p rust-dataflow-example-status-node
//! ```
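//!
//! The same environment variables can also be set from a Rust test that launches the compiled
//! node. The following is a minimal sketch that uses only `std::process::Command`. The function
//! name `integration_test_command` and all paths passed to it are hypothetical.
//!
//! ```rust
//! use std::path::Path;
//! use std::process::Command;
//!
//! /// Builds a command that runs `node_executable` in integration test mode.
//! /// (Hypothetical helper, not part of Dora.)
//! fn integration_test_command(node_executable: &Path, inputs: &Path, outputs: &Path) -> Command {
//!     let mut command = Command::new(node_executable);
//!     command
//!         .env("DORA_TEST_WITH_INPUTS", inputs)
//!         .env("DORA_TEST_WRITE_OUTPUTS_TO", outputs)
//!         // any value enables this option
//!         .env("DORA_TEST_NO_OUTPUT_TIME_OFFSET", "1");
//!     command
//! }
//! ```
//!
//! A test would then call `.status()` on the returned command, check that the node exited
//! successfully, and compare the file at `outputs` against the expected outputs.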
//!
//! ## Using `DoraNode::init_testing`
//!
//! While we recommend including the `main` function in integration tests (as described above),
//! there might also be cases where you want to run additional tests independent of the
//! `main` function. To make this easier, Dora provides a
//! [`DoraNode::init_testing`](crate::DoraNode::init_testing) initialization function to initialize
//! an integration testing environment directly.
//!
//! This function is roughly equivalent to calling `setup_integration_testing` followed by
//! `DoraNode::init_from_env`, but it does not modify any global state or thread-local state.
//! Thus, it can even be used to run multiple integration tests as part of a single test function.
//!
//! ### Example
//!
//! ```rust, ignore
//! #[test]
//! fn test_run_function() -> eyre::Result<()> {
//!     // specify the events that should be sent to the node
//!     let events = vec![...]; // see above for details
//!     let inputs = dora_node_api::integration_testing::TestingInput::Input(
//!         IntegrationTestInput::new("node_id".parse().unwrap(), events),
//!     );
//!
//!     // write the node's outputs to a JSONL file
//!     let outputs = dora_node_api::integration_testing::TestingOutput::ToFile(
//!         std::path::PathBuf::from("path/to/outputs.jsonl"),
//!     );
//!
//!     // initialize the node in testing mode without touching thread-local state
//!     let (node, events) = DoraNode::init_testing(inputs, outputs, Default::default())?;
//!     do_something_with_node_and_events(node, events)?;
//!
//!     Ok(())
//! }
//! ```
//!
//!
//! ## Generating Input Files
//!
//! While manually writing input files is possible, it is often more convenient to autogenerate them
//! by recording actual dataflow runs. This can be done by running a Dora dataflow with the
//! **`DORA_WRITE_EVENTS_TO`** environment variable set to a folder path. This will instruct all
//! nodes in the started dataflow to write out their received inputs to an `inputs-{node_id}.json`
//! file in the given folder.
//!
//! The file format used for these input files is identical to the format expected by the
//! `DORA_TEST_WITH_INPUTS` environment variable. Thus, the generated files can be directly used
//! as input files for integration tests.
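//!
//! As a small illustration of the naming scheme above, the following sketch computes where the
//! recording for a given node ends up. The helper name `recorded_inputs_path` is hypothetical,
//! and the node ID is passed as a plain string for simplicity.
//!
//! ```rust
//! use std::path::{Path, PathBuf};
//!
//! /// Returns the path of the recorded inputs file for `node_id` inside `folder`, where
//! /// `folder` is the value that was passed through `DORA_WRITE_EVENTS_TO`.
//! /// (Hypothetical helper, not part of Dora.)
//! fn recorded_inputs_path(folder: &Path, node_id: &str) -> PathBuf {
//!     folder.join(format!("inputs-{node_id}.json"))
//! }
//! ```
//!
//! Since the formats are identical, the resulting path should be usable as the value of
//! `DORA_TEST_WITH_INPUTS` or as the argument of [`TestingInput::FromJsonFile`].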
//!
//! Note that the implementation of this feature is currently not optimized. All incoming events
//! are buffered in memory before being written out at the end of the node's execution. Thus, it is
//! not recommended to use this feature with long-running dataflows or dataflows that process
//! large amounts of data.

use std::cell::Cell;

pub use dora_message::integration_testing_format::{self, IntegrationTestInput};

thread_local! {
    static TESTING_ENV: Cell<Option<Box<TestingCommunication>>> = const { Cell::new(None) };
}

pub(crate) fn take_testing_communication() -> Option<Box<TestingCommunication>> {
    TESTING_ENV.with(|env| env.take())
}

/// Sets up an integration testing environment for the current thread.
///
/// This overrides the default behavior of
/// [`DoraNode::init_from_env`](crate::DoraNode::init_from_env) to initialize
/// a testing node with the given input, output, and options.
///
/// ## Implementation Details
///
/// This function sets up thread-local state that is read by the `DoraNode::init_from_env` function.
/// Thus, it only affects the current thread.
///
/// Calls to `DoraNode::init_from_env` will consume the testing environment set up by this function
/// and reset the thread-local state afterwards. Thus, subsequent calls to `DoraNode::init_from_env`
/// will _not_ run in testing mode unless this function is called again.
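///
/// The consume-and-reset behavior can be illustrated with a standalone sketch that mirrors the
/// thread-local `Cell<Option<_>>` pattern. The names `ENV`, `setup`, and `take` are stand-ins
/// chosen for this illustration and are not part of the Dora API.
///
/// ```rust
/// use std::cell::Cell;
///
/// thread_local! {
///     // stand-in for the testing environment stored by `setup_integration_testing`
///     static ENV: Cell<Option<&'static str>> = const { Cell::new(None) };
/// }
///
/// /// Stores a value for the current thread only.
/// fn setup(value: &'static str) {
///     ENV.with(|env| env.set(Some(value)));
/// }
///
/// /// Takes the stored value and leaves `None` behind, so a second call on the same
/// /// thread finds nothing unless `setup` is called again.
/// fn take() -> Option<&'static str> {
///     ENV.with(|env| env.take())
/// }
/// ```
///
/// With these definitions, `setup("testing")` followed by two calls to `take()` on the same
/// thread yields `Some("testing")` and then `None`. A call to `take()` on a different thread
/// yields `None` because each thread has its own copy of `ENV`.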
pub fn setup_integration_testing(
    input: TestingInput,
    output: TestingOutput,
    options: TestingOptions,
) {
    TESTING_ENV.with(|env| {
        env.set(Some(Box::new(TestingCommunication {
            input,
            output,
            options,
        })))
    });
}

pub(crate) struct TestingCommunication {
    pub input: TestingInput,
    pub output: TestingOutput,
    pub options: TestingOptions,
}

/// Provides input data for integration testing.
pub enum TestingInput {
    /// Loads the integration test input from the given JSON file.
    ///
    /// The given file must deserialize to an [`IntegrationTestInput`] instance.
    FromJsonFile(std::path::PathBuf),
    /// Directly provides the integration test input.
    Input(IntegrationTestInput),
}

/// Specifies where to write the output data of an integration test.
///
/// ## Output File Format
///
/// The output file is a [JSONL](https://jsonltools.com/what-is-jsonl) (_"JSON lines"_) file. Each
/// line in the file is a separate JSON object that represents one output sent by the node.
///
/// The following fields are present in each output object:
///
/// - `id` (string): The output identifier that was used when sending the output.
/// - `data`: The output data (if any), encoded as JSON.
///   - See below for details on the encoding.
/// - `data_type` (string or object): The [`arrow::datatypes::DataType`] of the output data.
///   - Serialized to JSON using the type's `serde::Serialize` implementation.
/// - `time_offset_secs` (float, optional): The time offset in seconds between the start of the
///   node and the time when the output was sent.
///   - This field is omitted if the `DORA_TEST_NO_OUTPUT_TIME_OFFSET` environment variable is
///     set or if [`TestingOptions::skip_output_time_offsets`] is enabled.
///
///
/// ## Output Data Encoding
///
/// We use the following steps to encode the `data` field of each output object:
///
/// - Convert the data to a [`RecordBatch`](arrow::array::RecordBatch) through:
///   ```rust,ignore
///   RecordBatch::try_from_iter([("inner", data)])
///   ```
/// - Convert the `RecordBatch` to a JSON object through the [`arrow_json::ArrayWriter`].
pub enum TestingOutput {
    /// Writes the output to the given JSONL file.
    ///
    /// The file will be created or overwritten.
    ToFile(std::path::PathBuf),
    /// Writes the output in JSONL format to the given writer.
    ToWriter(Box<dyn std::io::Write + Send>),
    /// Sends each output as a JSON object through the given [`flume::Sender`].
    ///
    /// Note: When using a bounded channel, the node may block when the channel is full.
    ToChannel(flume::Sender<serde_json::Map<String, serde_json::Value>>),
}

/// Options for integration testing.
#[derive(Debug, Clone, Default)]
pub struct TestingOptions {
    /// Whether to skip including time offsets in the output.
    ///
    /// Skipping time offsets makes the outputs deterministic and easier to compare against
    /// expected outputs.
    pub skip_output_time_offsets: bool,
}