1use crate::check::assert_expr;
5use crate::collector::{Bucket, CollectedData, CollectedDataContainer};
6use crate::config::Config;
7use crate::error::{AssertDiagnostic, Error, Result, TestFailure, TestFailures};
8use crate::test_spec::WaitSpec;
9use tokio::time::{sleep, Duration};
10
11fn check_spec_against_data(
12 wait_spec: &WaitSpec,
13 collected_data: &CollectedData,
14) -> std::result::Result<(), AssertDiagnostic> {
15 let default: Bucket = Default::default();
16 let data = collected_data
17 .buckets
18 .get(&wait_spec.target)
19 .or_else(|| Some(&default))
20 .unwrap()
21 .data
22 .iter()
23 .map(|(_, value)| value)
24 .collect::<Vec<&serde_json::Value>>();
25 let expr = &wait_spec.condition;
26 assert_expr(&data, &expr)
27}
28
29pub async fn wait_for_all(
30 wait_specs: Vec<WaitSpec>,
31 collected_data: CollectedDataContainer,
32) -> Result<()> {
33 let mut timeout = wait_specs.iter().map(|spec| spec.timeout).max().unwrap() * 10;
34 timeout = timeout * Config::get().timeout_scaling.ceil() as u16;
35 log::debug!("Found max timeout cycles: {timeout}");
36
37 log::debug!("Waiting for {} conditions", wait_specs.len());
38 let mut wait_specs = wait_specs;
39 while timeout > 0 && wait_specs.len() > 0 {
40 log::trace!("trying to lock mutex");
41 let data = collected_data.lock().await;
42 log::trace!("mutex locked");
43 wait_specs = wait_specs
44 .into_iter()
45 .filter(|w| check_spec_against_data(w, &*data).is_err())
46 .collect();
47 drop(data);
48 timeout = timeout - 1;
49 log::trace!("Still {} conditions unfulfilled", wait_specs.len());
50 log::trace!("sleeping");
51 sleep(Duration::from_millis(100)).await;
52 }
53 let result = if wait_specs.len() == 0 {
54 Ok(())
55 } else {
56 let data = collected_data.lock().await;
57 let mut errors: Vec<TestFailure> = Vec::new();
58 for spec in wait_specs {
59 if let Err(assert_diagnostic) = check_spec_against_data(&spec, &*data) {
60 errors.push(TestFailure {
61 assert_diagnostic,
62 spec,
63 });
64 }
65 }
66 if errors.len() == 0 {
67 Ok(())
68 } else {
69 Err(Error::ConditionsFailed(TestFailures(errors)))
70 }
71 };
72 log::debug!("Wait concluded with {result:?}");
73 result
74}