wick_test/
runner.rs

1use std::time::Duration;
2
3use flow_component::SharedComponent;
4use tap_harness::{TestBlock, TestRunner};
5use tokio_stream::StreamExt;
6use wick_interface_types::{Field, OperationSignature};
7use wick_packet::{Entity, Invocation, RuntimeConfig};
8
9use crate::assertion_packet::ToAssertionPacket;
10use crate::{get_payload, TestError, UnitTest};
11
12#[must_use]
13pub fn get_description(test: &UnitTest) -> String {
14  format!(
15    "(test name='{}', operation='{}')",
16    test.test.name().map_or("Test", |v| v.as_str()),
17    test.test.operation()
18  )
19}
20
21pub async fn run_test<'a, 'b>(
22  name: String,
23  defs: Vec<&'a mut UnitTest<'a>>,
24  id: Option<&'b str>,
25  component: SharedComponent,
26  root_config: Option<RuntimeConfig>,
27) -> Result<TestRunner, TestError> {
28  let mut harness = TestRunner::new(Some(name));
29
30  for (i, def) in defs.into_iter().enumerate() {
31    let entity = id.map_or_else(
32      || Entity::local(def.test.operation()),
33      |id| Entity::operation(id, def.test.operation()),
34    );
35    let block = run_unit(i, def, entity, component.clone(), root_config.clone()).await?;
36    harness.add_block(block);
37  }
38
39  harness.run();
40  Ok(harness)
41}
42
43fn get_operation<'a>(component: &'a SharedComponent, operation: &str) -> Result<&'a OperationSignature, TestError> {
44  component
45    .signature()
46    .get_operation(operation)
47    .ok_or(TestError::OpNotFound(operation.to_owned()))
48}
49
50fn validate_config(name: Option<&String>, config: Option<&RuntimeConfig>, fields: &[Field]) -> Result<(), TestError> {
51  wick_packet::validation::expect_configuration_matches(name.unwrap_or(&"Test".to_owned()), config, fields)
52    .map_err(TestError::ConfigUnsatisfied)
53}
54
55#[allow(clippy::too_many_lines)]
56async fn run_unit<'a>(
57  _i: usize,
58  def: &'a mut UnitTest<'a>,
59  entity: Entity,
60  component: SharedComponent,
61  root_config: Option<RuntimeConfig>,
62) -> Result<TestBlock, TestError> {
63  let span = info_span!("unit test", name = def.test.name());
64
65  let op_config = def.test.config().and_then(|v| v.value().cloned());
66  let signature = get_operation(&component, def.test.operation())?;
67  validate_config(def.test.name(), op_config.as_ref(), &signature.config)?;
68
69  let (stream, inherent, explicit_done) = get_payload(def, root_config.as_ref(), op_config.as_ref())?;
70  let test_name = get_description(def);
71  let mut test_block = TestBlock::new(Some(test_name.clone()));
72  let prefix = |msg: &str| format!("{}: {}", test_name, if msg.is_empty() { "wick test" } else { msg });
73
74  span.in_scope(|| info!(%entity, "invoke"));
75
76  let invocation = Invocation::new(Entity::test(&test_name), entity, stream, inherent, &span);
77
78  let fut = tokio::time::timeout(
79    Duration::from_secs(5),
80    component.handle(invocation, op_config.clone(), Default::default()),
81  );
82
83  let result = fut
84    .await
85    .map_err(|e| TestError::InvocationTimeout(e.to_string()))?
86    .map_err(|e| TestError::InvocationFailed(e.to_string()));
87
88  if let Err(e) = result {
89    test_block.fail(prefix("invocation"), Some(vec![format!("Invocation failed: {}", e)]));
90    return Ok(test_block);
91  }
92
93  let stream = result.unwrap();
94
95  let packets = stream
96    .collect::<Result<Vec<_>, wick_packet::Error>>()
97    .await
98    .map_err(|e| TestError::InvocationFailed(e.to_string()))?;
99
100  let mut diagnostics = vec!["Actual Invocation Output (as JSON): ".to_owned()];
101  let mut output_lines: Vec<_> = packets.iter().map(|o| format!("{}", o.to_json())).collect();
102  diagnostics.append(&mut output_lines);
103  test_block.add_diagnostic_messages(diagnostics);
104
105  def.set_actual(packets);
106
107  let mut index = 0;
108
109  let success = loop {
110    if index >= def.test.outputs().len() {
111      // We've already checked all the outputs, so we're done.
112      break true;
113    }
114    let expected = def.test.outputs().get(index).unwrap();
115    let expected = expected.to_assertion_packet(root_config.as_ref(), op_config.as_ref())?;
116    if let Err(e) = def.check_next(&expected) {
117      match e {
118        TestError::Assertion(_ex, _act, assertion) => match assertion {
119          crate::error::AssertionFailure::Payload(exv, acv) => {
120            let diagnostic = assert_json_diff::assert_json_matches_no_panic(
121              &acv,
122              &exv,
123              assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive),
124            );
125            let diagnostic = Some(split_and_indent(&diagnostic.err().unwrap_or_default(), 3));
126
127            test_block.fail(prefix("payload data mismatch"), diagnostic);
128          }
129          crate::error::AssertionFailure::Flags(exf, acf) => {
130            test_block.fail(prefix("flag mismatch"), diag_flags(acf, exf));
131          }
132          crate::error::AssertionFailure::Name(exn, acf) => {
133            test_block.fail(prefix("port name mismatch"), diag_compare(&acf, &exn));
134          }
135          e @ crate::error::AssertionFailure::ActualNoData => {
136            test_block.fail(prefix("actual packet had no data"), Some(vec![e.to_string()]));
137          }
138          e @ crate::error::AssertionFailure::ExpectedNoData => {
139            test_block.fail(prefix("expected packet had no data"), Some(vec![e.to_string()]));
140          }
141          e @ crate::error::AssertionFailure::Contains(_) => {
142            test_block.fail(prefix("loose equality failure"), Some(vec![e.to_string()]));
143          }
144          e @ crate::error::AssertionFailure::Ordering(_) => {
145            test_block.fail(prefix("comparison failure"), Some(vec![e.to_string()]));
146          }
147          e @ crate::error::AssertionFailure::Regex(_) => {
148            test_block.fail(prefix("regex match failed"), Some(vec![e.to_string()]));
149          }
150        },
151        e => {
152          test_block.fail(prefix("other error"), Some(vec![e.to_string()]));
153        }
154      }
155      break false;
156    };
157
158    index += 1;
159  };
160
161  if success {
162    if let Err(packets) = def.finalize(&explicit_done) {
163      test_block.fail(
164        prefix("retrieved more packets than test expected."),
165        Some(packets.into_iter().map(|p| format!("{:?}", p)).collect()),
166      );
167    } else {
168      test_block.succeed(prefix("invocation succeeded"), None);
169    }
170  }
171
172  Ok(test_block)
173}
174
175fn diag_compare(actual: &str, expected: &str) -> Option<Vec<String>> {
176  let mut lines = vec!["Actual: ".to_owned()];
177  lines.extend(split_and_indent(actual, 3));
178  lines.push("Expected: ".to_owned());
179  lines.extend(split_and_indent(expected, 3));
180  Some(lines)
181}
182
/// Formats an actual/expected pair of flag bytes as two diagnostic lines.
///
/// The values are printed in decimal. The return value is always `Some`.
fn diag_flags(actual: u8, expected: u8) -> Option<Vec<String>> {
  let actual_line = format!("Actual: {}", actual);
  let expected_line = format!("Expected: {}", expected);

  Some(vec![actual_line, expected_line])
}
186
/// Splits `text` into lines and prefixes each one with `spaces` space characters.
///
/// Line splitting follows [`str::lines`], so an empty `text` yields an empty
/// vector and a trailing newline does not produce an extra entry.
fn split_and_indent(text: &str, spaces: u8) -> Vec<String> {
  // Build the indentation once and reuse it for every line.
  let padding = " ".repeat(usize::from(spaces));

  text.lines().map(|line| format!("{}{}", padding, line)).collect()
}