1use crate::collector::{Bucket, CollectedDataContainer, Collector};
5use crate::config::Config;
6use crate::error::{Error, FailedTest, Result, TestResult};
7use crate::file::{list_directories, list_files};
8use crate::manifest::ManifestHandle;
9use crate::namespace::NamespaceHandle;
10use crate::result_formatting::log_result;
11use crate::script::execute_script;
12use crate::test_spec::{EnvSubst, StepSpec, TestSpec, TestType, WaitSpec};
13use crate::wait::wait_for_all;
14use kube::Client;
15use std::cmp;
16use std::collections::HashMap;
17use std::path::{Path, PathBuf};
18use tokio::task::{JoinHandle, JoinSet};
19use tokio::time::{sleep, Duration};
20
21fn make_namespace(name: &String) -> String {
22 let mut truncated_name = name.clone();
23 truncated_name.truncate(32);
24 format!(
25 "{}-{}-{}",
26 truncated_name,
27 random_word::gen_len(8, random_word::Lang::En)
28 .or_else(|| Some(""))
29 .unwrap(),
30 random_word::gen_len(8, random_word::Lang::En)
31 .or_else(|| Some(""))
32 .unwrap()
33 )
34}
35
36async fn run_step(
37 client: Client,
38 dirname: PathBuf,
39 test_name: &str,
40 step: StepSpec,
41 manifests: &mut Vec<ManifestHandle>,
42 collectors: &mut Vec<Collector>,
43 collected_data: &CollectedDataContainer,
44 inherited_env: HashMap<String, String>,
45) -> Result<HashMap<String, String>> {
46 let mut env: HashMap<String, String> = inherited_env;
47 log::debug!("Creating collector");
48 let watches: Vec<_> = step.watch.into_iter().map(|w| w.subst_env(&env)).collect();
49 collectors.push(Collector::new(client.clone(), watches, collected_data.clone()).await?);
50
51 log::debug!("Setting buckets");
52 for bucket_spec in &step.bucket {
53 let mut data = collected_data.lock().await;
54 (*data)
55 .buckets
56 .entry(bucket_spec.name.clone())
57 .and_modify(|bucket| bucket.allowed_operations = bucket_spec.operations.clone())
58 .or_insert_with(|| Bucket::new(bucket_spec.operations.clone()));
59 }
60
61 log::debug!("Applying manifests");
62 for apply in step.apply {
63 let apply = apply.subst_env(&env);
64 log::debug!("Creating manifest: {:?}", apply);
65 let handle = ManifestHandle::new(apply, dirname.clone(), client.clone()).await?;
66 log::debug!("Applying manifest");
67 handle.apply().await?;
68 manifests.push(handle);
69 }
70
71 log::debug!("Deleting resources");
72 for delete in step.delete {
73 let delete = delete.subst_env(&env);
74 log::debug!("Deleting manifest: {:?}", delete);
75 ManifestHandle::new(delete, dirname.clone(), client.clone())
76 .await?
77 .delete()
78 .await?;
79 }
80
81 log::debug!("Running scripts");
82 for script in step.script {
83 let (status, stdout, stderr) = execute_script(&script, dirname.clone(), &mut env).await?;
84 status
85 .success()
86 .then_some(())
87 .ok_or(Error::ScriptFailed(stdout, stderr))?;
88 }
89 log::debug!(
90 "{}/{} environment after script: {:?}",
91 test_name,
92 step.name,
93 env
94 );
95
96 log::debug!("Sleeping");
97 if step.sleep > 0 {
98 sleep(Duration::from_secs(
99 (step.sleep * Config::get().timeout_scaling.ceil() as u16).into(),
100 ))
101 .await;
102 }
103
104 log::debug!("Waiting");
105 let wait: Vec<WaitSpec> = step.wait.into_iter().map(|w| w.subst_env(&env)).collect();
106 if wait.len() > 0 {
107 wait_for_all(wait, collected_data.clone()).await?;
108 }
109
110 log::debug!("Done");
111 Ok(env)
112}
113
114async fn run_steps(
115 client: Client,
116 namespace: &String,
117 test_spec: TestSpec,
118 manifests: &mut Vec<ManifestHandle>,
119 collectors: &mut Vec<Collector>,
120 collected_data: &CollectedDataContainer,
121) -> TestResult {
122 let mut env: HashMap<String, String> = HashMap::new();
123 env.insert("BLACKJACK_NAMESPACE".to_string(), namespace.to_string());
124 for step in test_spec.steps {
125 log::info!("Running step {}/{}", test_spec.name, step.name);
126 log::debug!(
127 "{}/{} current environment: {:?}",
128 test_spec.name,
129 step.name,
130 env
131 );
132 let step_name = step.name.clone();
133 env = run_step(
134 client.clone(),
135 test_spec.dir.clone(),
136 &test_spec.name,
137 step,
138 manifests,
139 collectors,
140 collected_data,
141 env,
142 )
143 .await
144 .map_err(|err| {
145 log::error!("Test step {}/{} failed", test_spec.name, step_name);
146 FailedTest {
147 test_name: test_spec.name.clone(),
148 step_name: step_name,
149 failure: err,
150 }
151 })?;
152 }
153
154 Ok(test_spec.name.clone())
155}
156
157async fn run_test(client: Client, test_spec: TestSpec) -> (TestResult, TestSpec, Option<JoinHandle<()>>) {
158 let namespace = make_namespace(&test_spec.name);
159 log::info!(
160 "Running test '{}' with unique namespace '{}'",
161 test_spec.name,
162 namespace
163 );
164 let namespace_handle = NamespaceHandle::new(client.clone(), &namespace);
165 let ns = namespace_handle.create().await.map_err(|err| FailedTest {
166 test_name: test_spec.name.clone(),
167 step_name: "".to_string(),
168 failure: err,
169 });
170 if ns.is_err() {
171 return (Err(ns.unwrap_err()), test_spec, None);
172 }
173
174 let mut manifests = Vec::<ManifestHandle>::new();
175 let collected_data = Collector::new_data();
176 let mut collectors = Vec::<Collector>::new();
177
178 let test_name = test_spec.name.clone();
179 let test_task = run_steps(
180 client.clone(),
181 &namespace,
182 test_spec.clone(),
183 &mut manifests,
184 &mut collectors,
185 &collected_data,
186 );
187 let sigint = tokio::signal::ctrl_c();
188 let result = tokio::select! {
189 test_result = test_task => test_result,
190 _ = sigint => {
191 log::info!("Received SIGINT, exiting...");
192 Err(FailedTest {
193 test_name: test_name,
194 step_name: "".to_string(),
195 failure: Error::SIGINT,
196 })
197 }
198 };
199
200 log::debug!("step returned with success: {}", result.is_ok());
201
202 log::debug!("initiating cleanup");
203 let cleanup_task = tokio::task::spawn(async move {
204 let mut results: Vec<Result<()>> = vec![];
205 for mut collector in collectors {
206 results.push(collector.stop().await);
207 }
208 {
209 let data = collected_data.lock().await;
210 results.push((*data).cleanup(client).await);
211 }
212 for manifest in manifests {
213 results.push(manifest.delete().await);
214 }
215 results.push(namespace_handle.delete().await);
216 for error in results.into_iter().filter(|r| r.is_err()) {
217 log::warn!("Errors during cleanup: {:?}", error.unwrap_err());
218 }
219 });
220
221 log::debug!("cleanup done");
222 (result, test_spec, Some(cleanup_task))
223}
224
225async fn run_all_tests(
226 client: Client,
227 test_specs: Vec<TestSpec>,
228 parallel: u16,
229 attempts: u16,
230) -> Result<Vec<TestResult>> {
231 let mut results: Vec<TestResult> = vec![];
232 let mut tasks = JoinSet::new();
233 let mut it = test_specs.into_iter();
234 let mut cleanup_tasks: Vec<JoinHandle<()>> = vec![];
235 let mut attempt_counter: HashMap<String, u16> = HashMap::new();
236
237 let mut next = it.next();
238 loop {
239 while next.is_some() && (tasks.len() < parallel.into()) {
240 let client = client.clone();
241 tasks.spawn(async move { run_test(client, next.unwrap()).await });
242 next = it.next();
243 }
244 if let Some(result) = tasks.join_next().await {
245 let (test_result, test_spec, cleanup_task) =
246 result.map_err(|err| Error::JoinError(err))?;
247 attempt_counter
248 .entry(test_spec.name.clone())
249 .and_modify(|i| *i += 1)
250 .or_insert(1);
251 if let Some(ct) = cleanup_task {
252 cleanup_tasks.push(ct);
253 }
254 if test_result.is_ok() {
255 results.push(test_result);
256 } else {
257 let attempts = test_spec.attempts.or(Some(attempts)).unwrap();
258 if attempt_counter.get(&test_spec.name).unwrap() < &attempts {
259 it = it.chain(std::iter::once(test_spec)).collect::<Vec<_>>().into_iter();
260 if next.is_none() {
261 next = it.next();
262 }
263 } else {
264 results.push(test_result);
265 while next.is_some() {
266 let test_spec = next.unwrap();
267 results.push(Err(FailedTest {
268 test_name: test_spec.name,
269 step_name: "".to_string(),
270 failure: Error::NotExecuted,
271 }));
272 next = it.next();
273 }
274 }
275 }
276 } else {
277 break;
278 }
279 }
280 log::info!("Waiting for all cleanup tasks");
281 for task in cleanup_tasks {
282 let sigint = tokio::signal::ctrl_c();
283 tokio::select! {
284 _ = task => {},
285 _ = sigint => {
286 log::info!("Received another SIGINT, exiting without cleanup");
287 break;
288 }
289 };
290 }
291
292 Ok(results)
293}
294
295pub async fn run_test_suite(dirname: &Path) -> Result<()> {
296 let client = Client::try_default().await?;
297 let test_specs = discover_tests(&dirname.to_path_buf()).await?;
298 let mut sorted_test_specs = test_specs
299 .into_iter()
300 .fold(HashMap::new(), |mut map, item| {
301 map.entry(item.test_type.clone())
302 .or_insert(Vec::new())
303 .push(item);
304 map
305 });
306 for (_, tests) in &mut sorted_test_specs {
307 tests.sort_by(|lhs, rhs| match (&lhs.ordering, &rhs.ordering) {
308 (Some(ref l), Some(ref r)) => l.cmp(r),
309 (Some(_), None) => cmp::Ordering::Greater,
310 (None, Some(_)) => cmp::Ordering::Less,
311 (None, None) => cmp::Ordering::Equal,
312 });
313 }
314 let mut results: Vec<TestResult> = vec![];
315 log::info!("Running cluster tests");
316 if let Some(cluster_tests) = sorted_test_specs.remove(&TestType::Cluster) {
317 results.append(
318 &mut run_all_tests(
319 client.clone(),
320 cluster_tests,
321 Config::get().cluster.parallel,
322 Config::get().cluster.attempts,
323 )
324 .await?,
325 );
326 }
327 if results.iter().all(|r| r.is_ok()) {
328 log::info!("Running user tests");
329 if let Some(user_tests) = sorted_test_specs.remove(&TestType::User) {
330 results.append(
331 &mut run_all_tests(
332 client.clone(),
333 user_tests,
334 Config::get().user.parallel,
335 Config::get().user.attempts,
336 )
337 .await?,
338 );
339 }
340 } else {
341 log::error!("Skipping user tests after cluster test failed");
342 }
343 if results.is_empty() {
344 return Err(Error::NoTestsFoundError);
345 }
346 let mut success = true;
347 for result in results {
348 log_result(&result);
349 if result.is_err() {
350 success = false;
351 }
352 }
353 success.then_some(()).ok_or(Error::SomeTestsFailedError)
354}
355
356async fn discover_tests(dirname: &PathBuf) -> Result<Vec<TestSpec>> {
357 log::trace!("Discovering tests: {dirname:?}");
358 let mut result: Vec<TestSpec> = vec![];
359 let files = list_files(dirname).await?;
360 if files
361 .iter()
362 .filter_map(|e| e.file_name())
363 .find(|&x| x == "test.yaml")
364 .is_some()
365 {
366 result.push(TestSpec::new_from_file(dirname.clone()).await?);
367 } else {
368 let dirs: Vec<PathBuf> = list_directories(dirname).await?;
369 log::trace!("Descending into {dirs:?}");
370 for dir in dirs {
371 result.append(&mut Box::pin(discover_tests(&dir)).await?);
372 }
373 }
374 Ok(result)
375}