1use std::time::Duration;
2
3use flow_component::SharedComponent;
4use tap_harness::{TestBlock, TestRunner};
5use tokio_stream::StreamExt;
6use wick_interface_types::{Field, OperationSignature};
7use wick_packet::{Entity, Invocation, RuntimeConfig};
8
9use crate::assertion_packet::ToAssertionPacket;
10use crate::{get_payload, TestError, UnitTest};
11
12#[must_use]
13pub fn get_description(test: &UnitTest) -> String {
14 format!(
15 "(test name='{}', operation='{}')",
16 test.test.name().map_or("Test", |v| v.as_str()),
17 test.test.operation()
18 )
19}
20
21pub async fn run_test<'a, 'b>(
22 name: String,
23 defs: Vec<&'a mut UnitTest<'a>>,
24 id: Option<&'b str>,
25 component: SharedComponent,
26 root_config: Option<RuntimeConfig>,
27) -> Result<TestRunner, TestError> {
28 let mut harness = TestRunner::new(Some(name));
29
30 for (i, def) in defs.into_iter().enumerate() {
31 let entity = id.map_or_else(
32 || Entity::local(def.test.operation()),
33 |id| Entity::operation(id, def.test.operation()),
34 );
35 let block = run_unit(i, def, entity, component.clone(), root_config.clone()).await?;
36 harness.add_block(block);
37 }
38
39 harness.run();
40 Ok(harness)
41}
42
43fn get_operation<'a>(component: &'a SharedComponent, operation: &str) -> Result<&'a OperationSignature, TestError> {
44 component
45 .signature()
46 .get_operation(operation)
47 .ok_or(TestError::OpNotFound(operation.to_owned()))
48}
49
50fn validate_config(name: Option<&String>, config: Option<&RuntimeConfig>, fields: &[Field]) -> Result<(), TestError> {
51 wick_packet::validation::expect_configuration_matches(name.unwrap_or(&"Test".to_owned()), config, fields)
52 .map_err(TestError::ConfigUnsatisfied)
53}
54
55#[allow(clippy::too_many_lines)]
56async fn run_unit<'a>(
57 _i: usize,
58 def: &'a mut UnitTest<'a>,
59 entity: Entity,
60 component: SharedComponent,
61 root_config: Option<RuntimeConfig>,
62) -> Result<TestBlock, TestError> {
63 let span = info_span!("unit test", name = def.test.name());
64
65 let op_config = def.test.config().and_then(|v| v.value().cloned());
66 let signature = get_operation(&component, def.test.operation())?;
67 validate_config(def.test.name(), op_config.as_ref(), &signature.config)?;
68
69 let (stream, inherent, explicit_done) = get_payload(def, root_config.as_ref(), op_config.as_ref())?;
70 let test_name = get_description(def);
71 let mut test_block = TestBlock::new(Some(test_name.clone()));
72 let prefix = |msg: &str| format!("{}: {}", test_name, if msg.is_empty() { "wick test" } else { msg });
73
74 span.in_scope(|| info!(%entity, "invoke"));
75
76 let invocation = Invocation::new(Entity::test(&test_name), entity, stream, inherent, &span);
77
78 let fut = tokio::time::timeout(
79 Duration::from_secs(5),
80 component.handle(invocation, op_config.clone(), Default::default()),
81 );
82
83 let result = fut
84 .await
85 .map_err(|e| TestError::InvocationTimeout(e.to_string()))?
86 .map_err(|e| TestError::InvocationFailed(e.to_string()));
87
88 if let Err(e) = result {
89 test_block.fail(prefix("invocation"), Some(vec![format!("Invocation failed: {}", e)]));
90 return Ok(test_block);
91 }
92
93 let stream = result.unwrap();
94
95 let packets = stream
96 .collect::<Result<Vec<_>, wick_packet::Error>>()
97 .await
98 .map_err(|e| TestError::InvocationFailed(e.to_string()))?;
99
100 let mut diagnostics = vec!["Actual Invocation Output (as JSON): ".to_owned()];
101 let mut output_lines: Vec<_> = packets.iter().map(|o| format!("{}", o.to_json())).collect();
102 diagnostics.append(&mut output_lines);
103 test_block.add_diagnostic_messages(diagnostics);
104
105 def.set_actual(packets);
106
107 let mut index = 0;
108
109 let success = loop {
110 if index >= def.test.outputs().len() {
111 break true;
113 }
114 let expected = def.test.outputs().get(index).unwrap();
115 let expected = expected.to_assertion_packet(root_config.as_ref(), op_config.as_ref())?;
116 if let Err(e) = def.check_next(&expected) {
117 match e {
118 TestError::Assertion(_ex, _act, assertion) => match assertion {
119 crate::error::AssertionFailure::Payload(exv, acv) => {
120 let diagnostic = assert_json_diff::assert_json_matches_no_panic(
121 &acv,
122 &exv,
123 assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive),
124 );
125 let diagnostic = Some(split_and_indent(&diagnostic.err().unwrap_or_default(), 3));
126
127 test_block.fail(prefix("payload data mismatch"), diagnostic);
128 }
129 crate::error::AssertionFailure::Flags(exf, acf) => {
130 test_block.fail(prefix("flag mismatch"), diag_flags(acf, exf));
131 }
132 crate::error::AssertionFailure::Name(exn, acf) => {
133 test_block.fail(prefix("port name mismatch"), diag_compare(&acf, &exn));
134 }
135 e @ crate::error::AssertionFailure::ActualNoData => {
136 test_block.fail(prefix("actual packet had no data"), Some(vec![e.to_string()]));
137 }
138 e @ crate::error::AssertionFailure::ExpectedNoData => {
139 test_block.fail(prefix("expected packet had no data"), Some(vec![e.to_string()]));
140 }
141 e @ crate::error::AssertionFailure::Contains(_) => {
142 test_block.fail(prefix("loose equality failure"), Some(vec![e.to_string()]));
143 }
144 e @ crate::error::AssertionFailure::Ordering(_) => {
145 test_block.fail(prefix("comparison failure"), Some(vec![e.to_string()]));
146 }
147 e @ crate::error::AssertionFailure::Regex(_) => {
148 test_block.fail(prefix("regex match failed"), Some(vec![e.to_string()]));
149 }
150 },
151 e => {
152 test_block.fail(prefix("other error"), Some(vec![e.to_string()]));
153 }
154 }
155 break false;
156 };
157
158 index += 1;
159 };
160
161 if success {
162 if let Err(packets) = def.finalize(&explicit_done) {
163 test_block.fail(
164 prefix("retrieved more packets than test expected."),
165 Some(packets.into_iter().map(|p| format!("{:?}", p)).collect()),
166 );
167 } else {
168 test_block.succeed(prefix("invocation succeeded"), None);
169 }
170 }
171
172 Ok(test_block)
173}
174
175fn diag_compare(actual: &str, expected: &str) -> Option<Vec<String>> {
176 let mut lines = vec!["Actual: ".to_owned()];
177 lines.extend(split_and_indent(actual, 3));
178 lines.push("Expected: ".to_owned());
179 lines.extend(split_and_indent(expected, 3));
180 Some(lines)
181}
182
/// Formats an actual/expected pair of flag bytes as two diagnostic lines.
///
/// The first line reports `actual`, the second `expected`, both printed as
/// decimal numbers. The return value is always `Some`.
fn diag_flags(actual: u8, expected: u8) -> Option<Vec<String>> {
    let actual_line = format!("Actual: {}", actual);
    let expected_line = format!("Expected: {}", expected);

    Some(vec![actual_line, expected_line])
}
186
/// Splits `text` into lines and prefixes each one with `spaces` space characters.
///
/// Lines are split with [`str::lines`], so line terminators are not part of
/// the returned strings and an empty `text` produces an empty vector.
fn split_and_indent(text: &str, spaces: u8) -> Vec<String> {
    let spaces = usize::from(spaces);

    text.lines()
        // Padding an empty string to `spaces` columns yields the indent itself.
        .map(|line| format!("{:spaces$}{}", "", line, spaces = spaces))
        .collect()
}