blackjack/
run_test.rs

1// Copyright 2024 Ole Kliemann
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::collector::{Bucket, CollectedDataContainer, Collector};
5use crate::config::Config;
6use crate::error::{Error, FailedTest, Result, TestResult};
7use crate::file::{list_directories, list_files};
8use crate::manifest::ManifestHandle;
9use crate::namespace::NamespaceHandle;
10use crate::result_formatting::log_result;
11use crate::script::execute_script;
12use crate::test_spec::{EnvSubst, StepSpec, TestSpec, TestType, WaitSpec};
13use crate::wait::wait_for_all;
14use kube::Client;
15use std::cmp;
16use std::collections::HashMap;
17use std::path::{Path, PathBuf};
18use tokio::task::{JoinHandle, JoinSet};
19use tokio::time::{sleep, Duration};
20
21fn make_namespace(name: &String) -> String {
22    let mut truncated_name = name.clone();
23    truncated_name.truncate(32);
24    format!(
25        "{}-{}-{}",
26        truncated_name,
27        random_word::gen_len(8, random_word::Lang::En)
28            .or_else(|| Some(""))
29            .unwrap(),
30        random_word::gen_len(8, random_word::Lang::En)
31            .or_else(|| Some(""))
32            .unwrap()
33    )
34}
35
36async fn run_step(
37    client: Client,
38    dirname: PathBuf,
39    test_name: &str,
40    step: StepSpec,
41    manifests: &mut Vec<ManifestHandle>,
42    collectors: &mut Vec<Collector>,
43    collected_data: &CollectedDataContainer,
44    inherited_env: HashMap<String, String>,
45) -> Result<HashMap<String, String>> {
46    let mut env: HashMap<String, String> = inherited_env;
47    log::debug!("Creating collector");
48    let watches: Vec<_> = step.watch.into_iter().map(|w| w.subst_env(&env)).collect();
49    collectors.push(Collector::new(client.clone(), watches, collected_data.clone()).await?);
50
51    log::debug!("Setting buckets");
52    for bucket_spec in &step.bucket {
53        let mut data = collected_data.lock().await;
54        (*data)
55            .buckets
56            .entry(bucket_spec.name.clone())
57            .and_modify(|bucket| bucket.allowed_operations = bucket_spec.operations.clone())
58            .or_insert_with(|| Bucket::new(bucket_spec.operations.clone()));
59    }
60
61    log::debug!("Applying manifests");
62    for apply in step.apply {
63        let apply = apply.subst_env(&env);
64        log::debug!("Creating manifest: {:?}", apply);
65        let handle = ManifestHandle::new(apply, dirname.clone(), client.clone()).await?;
66        log::debug!("Applying manifest");
67        handle.apply().await?;
68        manifests.push(handle);
69    }
70
71    log::debug!("Deleting resources");
72    for delete in step.delete {
73        let delete = delete.subst_env(&env);
74        log::debug!("Deleting manifest: {:?}", delete);
75        ManifestHandle::new(delete, dirname.clone(), client.clone())
76            .await?
77            .delete()
78            .await?;
79    }
80
81    log::debug!("Running scripts");
82    for script in step.script {
83        let (status, stdout, stderr) = execute_script(&script, dirname.clone(), &mut env).await?;
84        status
85            .success()
86            .then_some(())
87            .ok_or(Error::ScriptFailed(stdout, stderr))?;
88    }
89    log::debug!(
90        "{}/{} environment after script: {:?}",
91        test_name,
92        step.name,
93        env
94    );
95
96    log::debug!("Sleeping");
97    if step.sleep > 0 {
98        sleep(Duration::from_secs(
99            (step.sleep * Config::get().timeout_scaling.ceil() as u16).into(),
100        ))
101        .await;
102    }
103
104    log::debug!("Waiting");
105    let wait: Vec<WaitSpec> = step.wait.into_iter().map(|w| w.subst_env(&env)).collect();
106    if wait.len() > 0 {
107        wait_for_all(wait, collected_data.clone()).await?;
108    }
109
110    log::debug!("Done");
111    Ok(env)
112}
113
114async fn run_steps(
115    client: Client,
116    namespace: &String,
117    test_spec: TestSpec,
118    manifests: &mut Vec<ManifestHandle>,
119    collectors: &mut Vec<Collector>,
120    collected_data: &CollectedDataContainer,
121) -> TestResult {
122    let mut env: HashMap<String, String> = HashMap::new();
123    env.insert("BLACKJACK_NAMESPACE".to_string(), namespace.to_string());
124    for step in test_spec.steps {
125        log::info!("Running step {}/{}", test_spec.name, step.name);
126        log::debug!(
127            "{}/{} current environment: {:?}",
128            test_spec.name,
129            step.name,
130            env
131        );
132        let step_name = step.name.clone();
133        env = run_step(
134            client.clone(),
135            test_spec.dir.clone(),
136            &test_spec.name,
137            step,
138            manifests,
139            collectors,
140            collected_data,
141            env,
142        )
143        .await
144        .map_err(|err| {
145            log::error!("Test step {}/{} failed", test_spec.name, step_name);
146            FailedTest {
147                test_name: test_spec.name.clone(),
148                step_name: step_name,
149                failure: err,
150            }
151        })?;
152    }
153
154    Ok(test_spec.name.clone())
155}
156
157async fn run_test(client: Client, test_spec: TestSpec) -> (TestResult, TestSpec, Option<JoinHandle<()>>) {
158    let namespace = make_namespace(&test_spec.name);
159    log::info!(
160        "Running test '{}' with unique namespace '{}'",
161        test_spec.name,
162        namespace
163    );
164    let namespace_handle = NamespaceHandle::new(client.clone(), &namespace);
165    let ns = namespace_handle.create().await.map_err(|err| FailedTest {
166        test_name: test_spec.name.clone(),
167        step_name: "".to_string(),
168        failure: err,
169    });
170    if ns.is_err() {
171        return (Err(ns.unwrap_err()), test_spec, None);
172    }
173
174    let mut manifests = Vec::<ManifestHandle>::new();
175    let collected_data = Collector::new_data();
176    let mut collectors = Vec::<Collector>::new();
177
178    let test_name = test_spec.name.clone();
179    let test_task = run_steps(
180        client.clone(),
181        &namespace,
182        test_spec.clone(),
183        &mut manifests,
184        &mut collectors,
185        &collected_data,
186    );
187    let sigint = tokio::signal::ctrl_c();
188    let result = tokio::select! {
189        test_result = test_task => test_result,
190        _ = sigint => {
191            log::info!("Received SIGINT, exiting...");
192            Err(FailedTest {
193                test_name: test_name,
194                step_name: "".to_string(),
195                failure: Error::SIGINT,
196            })
197        }
198    };
199
200    log::debug!("step returned with success: {}", result.is_ok());
201
202    log::debug!("initiating cleanup");
203    let cleanup_task = tokio::task::spawn(async move {
204        let mut results: Vec<Result<()>> = vec![];
205        for mut collector in collectors {
206            results.push(collector.stop().await);
207        }
208        {
209            let data = collected_data.lock().await;
210            results.push((*data).cleanup(client).await);
211        }
212        for manifest in manifests {
213            results.push(manifest.delete().await);
214        }
215        results.push(namespace_handle.delete().await);
216        for error in results.into_iter().filter(|r| r.is_err()) {
217            log::warn!("Errors during cleanup: {:?}", error.unwrap_err());
218        }
219    });
220
221    log::debug!("cleanup done");
222    (result, test_spec, Some(cleanup_task))
223}
224
225async fn run_all_tests(
226    client: Client,
227    test_specs: Vec<TestSpec>,
228    parallel: u16,
229    attempts: u16,
230) -> Result<Vec<TestResult>> {
231    let mut results: Vec<TestResult> = vec![];
232    let mut tasks = JoinSet::new();
233    let mut it = test_specs.into_iter();
234    let mut cleanup_tasks: Vec<JoinHandle<()>> = vec![];
235    let mut attempt_counter: HashMap<String, u16> = HashMap::new();
236
237    let mut next = it.next();
238    loop {
239        while next.is_some() && (tasks.len() < parallel.into()) {
240            let client = client.clone();
241            tasks.spawn(async move { run_test(client, next.unwrap()).await });
242            next = it.next();
243        }
244        if let Some(result) = tasks.join_next().await {
245            let (test_result, test_spec, cleanup_task) =
246                result.map_err(|err| Error::JoinError(err))?;
247            attempt_counter
248                .entry(test_spec.name.clone())
249                .and_modify(|i| *i += 1)
250                .or_insert(1);
251            if let Some(ct) = cleanup_task {
252                cleanup_tasks.push(ct);
253            }
254            if test_result.is_ok() {
255                results.push(test_result);
256            } else {
257                let attempts = test_spec.attempts.or(Some(attempts)).unwrap();
258                if attempt_counter.get(&test_spec.name).unwrap() < &attempts {
259                    it = it.chain(std::iter::once(test_spec)).collect::<Vec<_>>().into_iter();
260                    if next.is_none() {
261                        next = it.next();
262                    }
263                } else {
264                    results.push(test_result);
265                    while next.is_some() {
266                        let test_spec = next.unwrap();
267                        results.push(Err(FailedTest {
268                            test_name: test_spec.name,
269                            step_name: "".to_string(),
270                            failure: Error::NotExecuted,
271                        }));
272                        next = it.next();
273                    }
274                }
275            }
276        } else {
277            break;
278        }
279    }
280    log::info!("Waiting for all cleanup tasks");
281    for task in cleanup_tasks {
282        let sigint = tokio::signal::ctrl_c();
283        tokio::select! {
284            _ = task => {},
285            _ = sigint => {
286                log::info!("Received another SIGINT, exiting without cleanup");
287                break;
288            }
289        };
290    }
291
292    Ok(results)
293}
294
295pub async fn run_test_suite(dirname: &Path) -> Result<()> {
296    let client = Client::try_default().await?;
297    let test_specs = discover_tests(&dirname.to_path_buf()).await?;
298    let mut sorted_test_specs = test_specs
299        .into_iter()
300        .fold(HashMap::new(), |mut map, item| {
301            map.entry(item.test_type.clone())
302                .or_insert(Vec::new())
303                .push(item);
304            map
305        });
306    for (_, tests) in &mut sorted_test_specs {
307        tests.sort_by(|lhs, rhs| match (&lhs.ordering, &rhs.ordering) {
308            (Some(ref l), Some(ref r)) => l.cmp(r),
309            (Some(_), None) => cmp::Ordering::Greater,
310            (None, Some(_)) => cmp::Ordering::Less,
311            (None, None) => cmp::Ordering::Equal,
312        });
313    }
314    let mut results: Vec<TestResult> = vec![];
315    log::info!("Running cluster tests");
316    if let Some(cluster_tests) = sorted_test_specs.remove(&TestType::Cluster) {
317        results.append(
318            &mut run_all_tests(
319                client.clone(),
320                cluster_tests,
321                Config::get().cluster.parallel,
322                Config::get().cluster.attempts,
323            )
324            .await?,
325        );
326    }
327    if results.iter().all(|r| r.is_ok()) {
328        log::info!("Running user tests");
329        if let Some(user_tests) = sorted_test_specs.remove(&TestType::User) {
330            results.append(
331                &mut run_all_tests(
332                    client.clone(),
333                    user_tests,
334                    Config::get().user.parallel,
335                    Config::get().user.attempts,
336                )
337                .await?,
338            );
339        }
340    } else {
341        log::error!("Skipping user tests after cluster test failed");
342    }
343    if results.is_empty() {
344        return Err(Error::NoTestsFoundError);
345    }
346    let mut success = true;
347    for result in results {
348        log_result(&result);
349        if result.is_err() {
350            success = false;
351        }
352    }
353    success.then_some(()).ok_or(Error::SomeTestsFailedError)
354}
355
356async fn discover_tests(dirname: &PathBuf) -> Result<Vec<TestSpec>> {
357    log::trace!("Discovering tests: {dirname:?}");
358    let mut result: Vec<TestSpec> = vec![];
359    let files = list_files(dirname).await?;
360    if files
361        .iter()
362        .filter_map(|e| e.file_name())
363        .find(|&x| x == "test.yaml")
364        .is_some()
365    {
366        result.push(TestSpec::new_from_file(dirname.clone()).await?);
367    } else {
368        let dirs: Vec<PathBuf> = list_directories(dirname).await?;
369        log::trace!("Descending into {dirs:?}");
370        for dir in dirs {
371            result.append(&mut Box::pin(discover_tests(&dir)).await?);
372        }
373    }
374    Ok(result)
375}