blackjack/
wait.rs

1// Copyright 2024 Ole Kliemann
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::check::assert_expr;
5use crate::collector::{Bucket, CollectedData, CollectedDataContainer};
6use crate::config::Config;
7use crate::error::{AssertDiagnostic, Error, Result, TestFailure, TestFailures};
8use crate::test_spec::WaitSpec;
9use tokio::time::{sleep, Duration};
10
11fn check_spec_against_data(
12    wait_spec: &WaitSpec,
13    collected_data: &CollectedData,
14) -> std::result::Result<(), AssertDiagnostic> {
15    let default: Bucket = Default::default();
16    let data = collected_data
17        .buckets
18        .get(&wait_spec.target)
19        .or_else(|| Some(&default))
20        .unwrap()
21        .data
22        .iter()
23        .map(|(_, value)| value)
24        .collect::<Vec<&serde_json::Value>>();
25    let expr = &wait_spec.condition;
26    assert_expr(&data, &expr)
27}
28
29pub async fn wait_for_all(
30    wait_specs: Vec<WaitSpec>,
31    collected_data: CollectedDataContainer,
32) -> Result<()> {
33    let mut timeout = wait_specs.iter().map(|spec| spec.timeout).max().unwrap() * 10;
34    timeout = timeout * Config::get().timeout_scaling.ceil() as u16;
35    log::debug!("Found max timeout cycles: {timeout}");
36
37    log::debug!("Waiting for {} conditions", wait_specs.len());
38    let mut wait_specs = wait_specs;
39    while timeout > 0 && wait_specs.len() > 0 {
40        log::trace!("trying to lock mutex");
41        let data = collected_data.lock().await;
42        log::trace!("mutex locked");
43        wait_specs = wait_specs
44            .into_iter()
45            .filter(|w| check_spec_against_data(w, &*data).is_err())
46            .collect();
47        drop(data);
48        timeout = timeout - 1;
49        log::trace!("Still {} conditions unfulfilled", wait_specs.len());
50        log::trace!("sleeping");
51        sleep(Duration::from_millis(100)).await;
52    }
53    let result = if wait_specs.len() == 0 {
54        Ok(())
55    } else {
56        let data = collected_data.lock().await;
57        let mut errors: Vec<TestFailure> = Vec::new();
58        for spec in wait_specs {
59            if let Err(assert_diagnostic) = check_spec_against_data(&spec, &*data) {
60                errors.push(TestFailure {
61                    assert_diagnostic,
62                    spec,
63                });
64            }
65        }
66        if errors.len() == 0 {
67            Ok(())
68        } else {
69            Err(Error::ConditionsFailed(TestFailures(errors)))
70        }
71    };
72    log::debug!("Wait concluded with {result:?}");
73    result
74}