1use reqwest::Client;
2use serde_json::{Map, Value};
3use std::collections::HashMap;
4use std::future::Future;
5use std::pin::Pin;
6use std::time::{Duration, Instant};
7
8use crate::assertions::{evaluate_assertions, has_status_assertion};
9use crate::core::types::{
10 Pipeline, RuntimeEnvGroup, RuntimeSpec, StepExecutionResult, StepRequest, StepResponse,
11};
12use crate::execution::cancel::await_with_cancel;
13use crate::execution::http::{parse_absolute_http_url, parse_method};
14use crate::execution::logging::{log_step_request, log_step_response};
15use crate::template::resolve::resolve_template_variables;
16
17fn noop_request_start_gate<'a>(
18 _: &'a StepRequest,
19) -> Pin<Box<dyn Future<Output = bool> + Send + 'a>> {
20 Box::pin(async { true })
21}
22
23pub async fn execute_pipeline(
24 pipeline: &Pipeline,
25 selected_base_url_key: Option<&str>,
26) -> Vec<StepExecutionResult> {
27 let client = Client::new();
28 execute_pipeline_with_client_runtime_hooks(
29 &client,
30 pipeline,
31 selected_base_url_key,
32 None,
33 None,
34 None,
35 |_| {},
36 |_| {},
37 || false,
38 noop_request_start_gate,
39 )
40 .await
41}
42
43pub async fn execute_pipeline_with_client(
44 client: &Client,
45 pipeline: &Pipeline,
46 selected_base_url_key: Option<&str>,
47) -> Vec<StepExecutionResult> {
48 execute_pipeline_with_client_runtime_hooks(
49 client,
50 pipeline,
51 selected_base_url_key,
52 None,
53 None,
54 None,
55 |_| {},
56 |_| {},
57 || false,
58 noop_request_start_gate,
59 )
60 .await
61}
62
63pub async fn execute_pipeline_with_hooks<FStart, FResult, FCancel>(
64 pipeline: &Pipeline,
65 selected_base_url_key: Option<&str>,
66 on_step_start: FStart,
67 on_step_result: FResult,
68 should_cancel: FCancel,
69) -> Vec<StepExecutionResult>
70where
71 FStart: FnMut(&str),
72 FResult: FnMut(&StepExecutionResult),
73 FCancel: FnMut() -> bool,
74{
75 let client = Client::new();
76 execute_pipeline_with_client_runtime_hooks(
77 &client,
78 pipeline,
79 selected_base_url_key,
80 None,
81 None,
82 None,
83 on_step_start,
84 on_step_result,
85 should_cancel,
86 noop_request_start_gate,
87 )
88 .await
89}
90
91pub async fn execute_pipeline_with_specs_hooks<FStart, FResult, FCancel>(
92 pipeline: &Pipeline,
93 selected_base_url_key: Option<&str>,
94 specs: Option<&[RuntimeSpec]>,
95 on_step_start: FStart,
96 on_step_result: FResult,
97 should_cancel: FCancel,
98) -> Vec<StepExecutionResult>
99where
100 FStart: FnMut(&str),
101 FResult: FnMut(&StepExecutionResult),
102 FCancel: FnMut() -> bool,
103{
104 let client = Client::new();
105 execute_pipeline_with_client_runtime_hooks(
106 &client,
107 pipeline,
108 selected_base_url_key,
109 specs,
110 None,
111 None,
112 on_step_start,
113 on_step_result,
114 should_cancel,
115 noop_request_start_gate,
116 )
117 .await
118}
119
120pub async fn execute_pipeline_with_runtime_hooks<FStart, FResult, FCancel>(
121 pipeline: &Pipeline,
122 selected_base_url_key: Option<&str>,
123 specs: Option<&[RuntimeSpec]>,
124 env_groups: Option<&[RuntimeEnvGroup]>,
125 selected_env_group_slug: Option<&str>,
126 on_step_start: FStart,
127 on_step_result: FResult,
128 should_cancel: FCancel,
129) -> Vec<StepExecutionResult>
130where
131 FStart: FnMut(&str),
132 FResult: FnMut(&StepExecutionResult),
133 FCancel: FnMut() -> bool,
134{
135 let client = Client::new();
136 execute_pipeline_with_client_runtime_hooks(
137 &client,
138 pipeline,
139 selected_base_url_key,
140 specs,
141 env_groups,
142 selected_env_group_slug,
143 on_step_start,
144 on_step_result,
145 should_cancel,
146 noop_request_start_gate,
147 )
148 .await
149}
150
151pub async fn execute_pipeline_with_runtime_request_gate<FStart, FResult, FCancel, FGate>(
152 pipeline: &Pipeline,
153 selected_base_url_key: Option<&str>,
154 specs: Option<&[RuntimeSpec]>,
155 env_groups: Option<&[RuntimeEnvGroup]>,
156 selected_env_group_slug: Option<&str>,
157 on_step_start: FStart,
158 on_step_result: FResult,
159 should_cancel: FCancel,
160 on_request_start: FGate,
161) -> Vec<StepExecutionResult>
162where
163 FStart: FnMut(&str),
164 FResult: FnMut(&StepExecutionResult),
165 FCancel: FnMut() -> bool,
166 FGate: for<'a> FnMut(&'a StepRequest) -> Pin<Box<dyn Future<Output = bool> + Send + 'a>> + Send,
167{
168 let client = Client::new();
169 execute_pipeline_with_client_runtime_hooks(
170 &client,
171 pipeline,
172 selected_base_url_key,
173 specs,
174 env_groups,
175 selected_env_group_slug,
176 on_step_start,
177 on_step_result,
178 should_cancel,
179 on_request_start,
180 )
181 .await
182}
183
184pub async fn execute_pipeline_with_client_runtime_request_gate<FStart, FResult, FCancel, FGate>(
185 client: &Client,
186 pipeline: &Pipeline,
187 selected_base_url_key: Option<&str>,
188 specs: Option<&[RuntimeSpec]>,
189 env_groups: Option<&[RuntimeEnvGroup]>,
190 selected_env_group_slug: Option<&str>,
191 on_step_start: FStart,
192 on_step_result: FResult,
193 should_cancel: FCancel,
194 on_request_start: FGate,
195) -> Vec<StepExecutionResult>
196where
197 FStart: FnMut(&str),
198 FResult: FnMut(&StepExecutionResult),
199 FCancel: FnMut() -> bool,
200 FGate: for<'a> FnMut(&'a StepRequest) -> Pin<Box<dyn Future<Output = bool> + Send + 'a>> + Send,
201{
202 execute_pipeline_with_client_runtime_hooks(
203 client,
204 pipeline,
205 selected_base_url_key,
206 specs,
207 env_groups,
208 selected_env_group_slug,
209 on_step_start,
210 on_step_result,
211 should_cancel,
212 on_request_start,
213 )
214 .await
215}
216
217pub async fn execute_pipeline_with_client_hooks<FStart, FResult, FCancel>(
218 client: &Client,
219 pipeline: &Pipeline,
220 selected_base_url_key: Option<&str>,
221 on_step_start: FStart,
222 on_step_result: FResult,
223 should_cancel: FCancel,
224) -> Vec<StepExecutionResult>
225where
226 FStart: FnMut(&str),
227 FResult: FnMut(&StepExecutionResult),
228 FCancel: FnMut() -> bool,
229{
230 execute_pipeline_with_client_runtime_hooks(
231 client,
232 pipeline,
233 selected_base_url_key,
234 None,
235 None,
236 None,
237 on_step_start,
238 on_step_result,
239 should_cancel,
240 noop_request_start_gate,
241 )
242 .await
243}
244
245fn finalize_step_result<FResult>(
246 step_id: &str,
247 result: StepExecutionResult,
248 context: &mut HashMap<String, StepExecutionResult>,
249 results: &mut Vec<StepExecutionResult>,
250 on_step_result: &mut FResult,
251) -> bool
252where
253 FResult: FnMut(&StepExecutionResult),
254{
255 let should_stop_pipeline = result.status == "error";
256 context.insert(step_id.to_owned(), result.clone());
257 on_step_result(&result);
258 results.push(result);
259 should_stop_pipeline
260}
261
262async fn execute_pipeline_with_client_runtime_hooks<FStart, FResult, FCancel, FGate>(
263 client: &Client,
264 pipeline: &Pipeline,
265 selected_base_url_key: Option<&str>,
266 specs: Option<&[RuntimeSpec]>,
267 env_groups: Option<&[RuntimeEnvGroup]>,
268 selected_env_group_slug: Option<&str>,
269 on_step_start: FStart,
270 on_step_result: FResult,
271 should_cancel: FCancel,
272 on_request_start: FGate,
273) -> Vec<StepExecutionResult>
274where
275 FStart: FnMut(&str),
276 FResult: FnMut(&StepExecutionResult),
277 FCancel: FnMut() -> bool,
278 FGate: for<'a> FnMut(&'a StepRequest) -> Pin<Box<dyn Future<Output = bool> + Send + 'a>> + Send,
279{
280 execute_pipeline_with_client_runtime_hooks_from_index(
281 client,
282 pipeline,
283 selected_base_url_key,
284 0,
285 HashMap::new(),
286 specs,
287 env_groups,
288 selected_env_group_slug,
289 on_step_start,
290 on_step_result,
291 should_cancel,
292 on_request_start,
293 )
294 .await
295}
296
297pub async fn execute_pipeline_from_step_with_client_runtime_hooks<FStart, FResult, FCancel, FGate>(
298 client: &Client,
299 pipeline: &Pipeline,
300 start_step_id: &str,
301 initial_context: HashMap<String, StepExecutionResult>,
302 specs: Option<&[RuntimeSpec]>,
303 env_groups: Option<&[RuntimeEnvGroup]>,
304 selected_env_group_slug: Option<&str>,
305 on_step_start: FStart,
306 on_step_result: FResult,
307 should_cancel: FCancel,
308 on_request_start: FGate,
309) -> Vec<StepExecutionResult>
310where
311 FStart: FnMut(&str),
312 FResult: FnMut(&StepExecutionResult),
313 FCancel: FnMut() -> bool,
314 FGate: for<'a> FnMut(&'a StepRequest) -> Pin<Box<dyn Future<Output = bool> + Send + 'a>> + Send,
315{
316 let start_index = pipeline
317 .steps
318 .iter()
319 .position(|step| step.id == start_step_id)
320 .unwrap_or(pipeline.steps.len());
321
322 execute_pipeline_with_client_runtime_hooks_from_index(
323 client,
324 pipeline,
325 None,
326 start_index,
327 initial_context,
328 specs,
329 env_groups,
330 selected_env_group_slug,
331 on_step_start,
332 on_step_result,
333 should_cancel,
334 on_request_start,
335 )
336 .await
337}
338
339async fn execute_pipeline_with_client_runtime_hooks_from_index<FStart, FResult, FCancel, FGate>(
340 client: &Client,
341 pipeline: &Pipeline,
342 _selected_base_url_key: Option<&str>,
343 start_index: usize,
344 initial_context: HashMap<String, StepExecutionResult>,
345 specs: Option<&[RuntimeSpec]>,
346 env_groups: Option<&[RuntimeEnvGroup]>,
347 selected_env_group_slug: Option<&str>,
348 mut on_step_start: FStart,
349 mut on_step_result: FResult,
350 mut should_cancel: FCancel,
351 mut on_request_start: FGate,
352) -> Vec<StepExecutionResult>
353where
354 FStart: FnMut(&str),
355 FResult: FnMut(&StepExecutionResult),
356 FCancel: FnMut() -> bool,
357 FGate: for<'a> FnMut(&'a StepRequest) -> Pin<Box<dyn Future<Output = bool> + Send + 'a>> + Send,
358{
359 let mut context = initial_context;
360 let mut results = Vec::with_capacity(pipeline.steps.len().saturating_sub(start_index));
361
362 'steps: for step in pipeline.steps.iter().skip(start_index) {
363 if should_cancel() {
364 break;
365 }
366
367 let delay_ms = step.delay.unwrap_or(0);
368 let max_attempts = step.retry.unwrap_or(0).saturating_add(1);
369
370 for attempt in 1..=max_attempts {
371 if should_cancel() {
372 break 'steps;
373 }
374
375 if delay_ms > 0 {
376 let Some(_) = await_with_cancel(
377 tokio::time::sleep(Duration::from_millis(delay_ms)),
378 &mut should_cancel,
379 )
380 .await
381 else {
382 break 'steps;
383 };
384 }
385
386 on_step_start(&step.id);
387 let start = Instant::now();
388
389 let resolved_url = resolve_template_variables(
390 &Value::String(step.url.clone()),
391 &context,
392 specs,
393 env_groups,
394 selected_env_group_slug,
395 )
396 .as_str()
397 .unwrap_or(step.url.as_str())
398 .to_owned();
399
400 let resolved_headers = resolve_template_variables(
401 &serde_json::to_value(&step.headers).unwrap_or(Value::Object(Map::new())),
402 &context,
403 specs,
404 env_groups,
405 selected_env_group_slug,
406 )
407 .as_object()
408 .map(|m| {
409 m.iter()
410 .map(|(k, v)| (k.clone(), v.as_str().unwrap_or_default().to_owned()))
411 .collect::<HashMap<String, String>>()
412 })
413 .unwrap_or_default();
414
415 let resolved_body = step.body.as_ref().map(|body| {
416 resolve_template_variables(
417 body,
418 &context,
419 specs,
420 env_groups,
421 selected_env_group_slug,
422 )
423 });
424
425 let method = match parse_method(&step.method) {
426 Ok(method) => method,
427 Err(err) => {
428 let result = StepExecutionResult {
429 step_id: step.id.clone(),
430 status: "error".to_owned(),
431 request: Some(StepRequest {
432 method: step.method.clone(),
433 url: resolved_url.clone(),
434 headers: resolved_headers.clone(),
435 body: resolved_body.clone(),
436 }),
437 response: None,
438 error: Some(err),
439 duration: Some(start.elapsed().as_millis()),
440 attempts: if max_attempts > 1 {
441 Some(attempt)
442 } else {
443 None
444 },
445 attempt: Some(attempt),
446 max_attempts: Some(max_attempts),
447 assert_results: None,
448 };
449 log_step_response(&step.id, None, result.error.as_deref());
450
451 if attempt < max_attempts {
452 continue;
453 }
454 if finalize_step_result(
455 &step.id,
456 result,
457 &mut context,
458 &mut results,
459 &mut on_step_result,
460 ) {
461 break 'steps;
462 }
463 break;
464 }
465 };
466
467 let url = match parse_absolute_http_url(&resolved_url) {
468 Ok(url) => url,
469 Err(err) => {
470 let result = StepExecutionResult {
471 step_id: step.id.clone(),
472 status: "error".to_owned(),
473 request: Some(StepRequest {
474 method: step.method.clone(),
475 url: resolved_url.clone(),
476 headers: resolved_headers.clone(),
477 body: resolved_body.clone(),
478 }),
479 response: None,
480 error: Some(err),
481 duration: Some(start.elapsed().as_millis()),
482 attempts: if max_attempts > 1 {
483 Some(attempt)
484 } else {
485 None
486 },
487 attempt: Some(attempt),
488 max_attempts: Some(max_attempts),
489 assert_results: None,
490 };
491 log_step_response(&step.id, None, result.error.as_deref());
492
493 if attempt < max_attempts {
494 continue;
495 }
496 if finalize_step_result(
497 &step.id,
498 result,
499 &mut context,
500 &mut results,
501 &mut on_step_result,
502 ) {
503 break 'steps;
504 }
505 break;
506 }
507 };
508
509 let mut request_builder = client.request(method, url);
510
511 for (k, v) in &resolved_headers {
512 request_builder = request_builder.header(k, v);
513 }
514
515 if let Some(body) = &resolved_body {
516 if !step.method.eq_ignore_ascii_case("GET")
517 && !step.method.eq_ignore_ascii_case("HEAD")
518 {
519 request_builder = request_builder.json(body);
520 }
521 }
522
523 let request = StepRequest {
524 method: step.method.clone(),
525 url: resolved_url.clone(),
526 headers: resolved_headers.clone(),
527 body: resolved_body.clone(),
528 };
529 log_step_request(&step.id, &request);
530 let request_admitted = on_request_start(&request).await;
531 if !request_admitted {
532 break 'steps;
533 }
534 if should_cancel() {
535 break 'steps;
536 }
537
538 let Some(send_result) =
539 await_with_cancel(request_builder.send(), &mut should_cancel).await
540 else {
541 break 'steps;
542 };
543
544 match send_result {
545 Ok(response) => {
546 let status = response.status();
547 let status_text = status.canonical_reason().unwrap_or("").to_owned();
548 let mut headers = HashMap::new();
549 for (k, v) in response.headers() {
550 headers.insert(
551 k.as_str().to_owned(),
552 v.to_str().unwrap_or_default().to_owned(),
553 );
554 }
555
556 let content_type = headers
557 .iter()
558 .find(|(k, _)| k.eq_ignore_ascii_case("content-type"))
559 .map(|(_, v)| v.as_str())
560 .unwrap_or("");
561
562 let body = if content_type.contains("application/json") {
563 let Some(body_result) =
564 await_with_cancel(response.json::<Value>(), &mut should_cancel).await
565 else {
566 break 'steps;
567 };
568 match body_result {
569 Ok(value) => value,
570 Err(err) => {
571 let result = StepExecutionResult {
572 step_id: step.id.clone(),
573 status: "error".to_owned(),
574 request: Some(request),
575 response: None,
576 error: Some(err.to_string()),
577 duration: Some(start.elapsed().as_millis()),
578 attempts: if max_attempts > 1 {
579 Some(attempt)
580 } else {
581 None
582 },
583 attempt: Some(attempt),
584 max_attempts: Some(max_attempts),
585 assert_results: None,
586 };
587 log_step_response(&step.id, None, result.error.as_deref());
588
589 if attempt < max_attempts {
590 continue;
591 }
592 if finalize_step_result(
593 &step.id,
594 result,
595 &mut context,
596 &mut results,
597 &mut on_step_result,
598 ) {
599 break 'steps;
600 }
601 break;
602 }
603 }
604 } else {
605 let Some(body_result) =
606 await_with_cancel(response.text(), &mut should_cancel).await
607 else {
608 break 'steps;
609 };
610 Value::String(body_result.unwrap_or_default())
611 };
612
613 let http_error = (!status.is_success())
614 .then(|| format!("HTTP {} {}", status.as_u16(), status_text));
615 let mut result = StepExecutionResult {
616 step_id: step.id.clone(),
617 status: "success".to_owned(),
618 request: Some(request),
619 response: Some(StepResponse {
620 status: status.as_u16(),
621 status_text: status_text.clone(),
622 headers,
623 body,
624 }),
625 error: http_error.clone(),
626 duration: Some(start.elapsed().as_millis()),
627 attempts: if max_attempts > 1 {
628 Some(attempt)
629 } else {
630 None
631 },
632 attempt: Some(attempt),
633 max_attempts: Some(max_attempts),
634 assert_results: None,
635 };
636
637 let has_status_assert = has_status_assertion(step);
638 let assert_results = evaluate_assertions(
639 step,
640 &result,
641 &context,
642 specs,
643 env_groups,
644 selected_env_group_slug,
645 );
646 let assertion_failed = assert_results.iter().any(|r| !r.passed);
647 if !assert_results.is_empty() {
648 if assertion_failed {
649 result.status = "error".to_owned();
650 let failed_count = assert_results.iter().filter(|r| !r.passed).count();
651 result.error = Some(match result.error {
652 Some(err) => {
653 format!("{} | {} assertion(s) failed", err, failed_count)
654 }
655 None => format!("{} assertion(s) failed", failed_count),
656 });
657 } else if http_error.is_some() {
658 if has_status_assert {
659 result.status = "success".to_owned();
660 result.error = None;
661 } else {
662 result.status = "error".to_owned();
663 }
664 }
665 result.assert_results = Some(assert_results);
666 } else if http_error.is_some() {
667 result.status = "error".to_owned();
668 }
669
670 log_step_response(&step.id, result.response.as_ref(), result.error.as_deref());
671
672 if assertion_failed && attempt < max_attempts {
673 continue;
674 }
675
676 if finalize_step_result(
677 &step.id,
678 result,
679 &mut context,
680 &mut results,
681 &mut on_step_result,
682 ) {
683 break 'steps;
684 }
685 break;
686 }
687 Err(err) => {
688 let result = StepExecutionResult {
689 step_id: step.id.clone(),
690 status: "error".to_owned(),
691 request: Some(request),
692 response: None,
693 error: Some(err.to_string()),
694 duration: Some(start.elapsed().as_millis()),
695 attempts: if max_attempts > 1 {
696 Some(attempt)
697 } else {
698 None
699 },
700 attempt: Some(attempt),
701 max_attempts: Some(max_attempts),
702 assert_results: None,
703 };
704 log_step_response(&step.id, None, result.error.as_deref());
705
706 if attempt < max_attempts {
707 continue;
708 }
709
710 if finalize_step_result(
711 &step.id,
712 result,
713 &mut context,
714 &mut results,
715 &mut on_step_result,
716 ) {
717 break 'steps;
718 }
719 break;
720 }
721 }
722 }
723 }
724
725 results
726}
727
728#[cfg(test)]
729mod tests {
730 use super::*;
731 use crate::core::types::{
732 Pipeline, PipelineStep, RuntimeSpec, StepAssertion, StepRequest, StepResponse,
733 };
734 use httpmock::Method::{GET, POST};
735 use httpmock::MockServer;
736 use serde_json::json;
737 use std::sync::Arc;
738 use std::sync::atomic::{AtomicBool, Ordering};
739
740 #[tokio::test]
741 async fn executes_from_step_with_seeded_previous_results() {
742 let server = MockServer::start_async().await;
743 let protected = server
744 .mock_async(|when, then| {
745 when.method(GET)
746 .path("/protected")
747 .header("authorization", "Bearer abc123");
748 then.status(200)
749 .header("content-type", "application/json")
750 .json_body(json!({ "ok": true }));
751 })
752 .await;
753
754 let pipeline = Pipeline {
755 id: Some("pipe-1".to_owned()),
756 name: "Pipe".to_owned(),
757 description: None,
758 steps: vec![
759 PipelineStep {
760 id: "login".to_owned(),
761 name: "Login".to_owned(),
762 description: None,
763 method: "POST".to_owned(),
764 url: format!("{}/login", server.base_url()),
765 headers: HashMap::new(),
766 body: None,
767 operation_id: None,
768 delay: None,
769 retry: None,
770 asserts: Vec::new(),
771 },
772 PipelineStep {
773 id: "protected".to_owned(),
774 name: "Protected".to_owned(),
775 description: None,
776 method: "GET".to_owned(),
777 url: format!("{}/protected", server.base_url()),
778 headers: HashMap::from([(
779 "Authorization".to_owned(),
780 "Bearer {{steps.login.token}}".to_owned(),
781 )]),
782 body: None,
783 operation_id: None,
784 delay: None,
785 retry: None,
786 asserts: Vec::new(),
787 },
788 ],
789 };
790
791 let seeded = HashMap::from([(
792 "login".to_owned(),
793 StepExecutionResult {
794 step_id: "login".to_owned(),
795 status: "success".to_owned(),
796 request: Some(StepRequest {
797 method: "POST".to_owned(),
798 url: format!("{}/login", server.base_url()),
799 headers: HashMap::new(),
800 body: None,
801 }),
802 response: Some(StepResponse {
803 status: 200,
804 status_text: "OK".to_owned(),
805 headers: HashMap::new(),
806 body: json!({ "token": "abc123" }),
807 }),
808 error: None,
809 duration: Some(1),
810 attempts: Some(1),
811 attempt: Some(1),
812 max_attempts: Some(1),
813 assert_results: None,
814 },
815 )]);
816
817 let mut started = Vec::new();
818 let results = execute_pipeline_from_step_with_client_runtime_hooks(
819 &reqwest::Client::new(),
820 &pipeline,
821 "protected",
822 seeded,
823 None,
824 None,
825 None,
826 |step_id| started.push(step_id.to_owned()),
827 |_| {},
828 || false,
829 |_| Box::pin(async { true }),
830 )
831 .await;
832
833 assert_eq!(started, vec!["protected"]);
834 assert_eq!(results.len(), 1);
835 assert_eq!(results[0].step_id, "protected");
836 assert_eq!(results[0].status, "success");
837 protected.assert_async().await;
838 }
839
840 #[tokio::test]
841 async fn executes_pipeline_with_interpolation_and_assertions() {
842 let server = MockServer::start_async().await;
843
844 let create_user = server
845 .mock_async(|when, then| {
846 when.method(POST).path("/users");
847 then.status(201)
848 .header("content-type", "application/json")
849 .json_body(
850 json!({ "id": "u-1", "token": "token-123", "email": "john@example.com" }),
851 );
852 })
853 .await;
854
855 let get_user = server
856 .mock_async(|when, then| {
857 when.method(GET).path("/users/u-1");
858 then.status(200)
859 .header("content-type", "application/json")
860 .json_body(json!({ "id": "u-1", "email": "john@example.com" }));
861 })
862 .await;
863
864 let pipeline = Pipeline {
865 id: None,
866 name: "User flow".to_owned(),
867 description: Some("Pipeline test".to_owned()),
868 steps: vec![
869 PipelineStep {
870 id: "create_user".to_owned(),
871 name: "Create".to_owned(),
872 description: None,
873 method: "POST".to_owned(),
874 url: format!("{}/users", server.base_url()),
875 headers: HashMap::from([(
876 "content-type".to_owned(),
877 "application/json".to_owned(),
878 )]),
879 body: Some(json!({ "name": "{{helpers.name}}" })),
880 operation_id: None,
881 delay: None,
882 retry: None,
883 asserts: vec![
884 StepAssertion {
885 field: "status".to_owned(),
886 operator: "equals".to_owned(),
887 expected: Some("201".to_owned()),
888 },
889 StepAssertion {
890 field: "body.id".to_owned(),
891 operator: "exists".to_owned(),
892 expected: None,
893 },
894 ],
895 },
896 PipelineStep {
897 id: "get_user".to_owned(),
898 name: "Get".to_owned(),
899 description: None,
900 method: "GET".to_owned(),
901 url: format!("{}/users/{{{{steps.create_user.id}}}}", server.base_url()),
902 headers: HashMap::new(),
903 body: None,
904 operation_id: None,
905 delay: None,
906 retry: None,
907 asserts: vec![
908 StepAssertion {
909 field: "status".to_owned(),
910 operator: "equals".to_owned(),
911 expected: Some("{{steps.create_user.status}}".to_owned()),
912 },
913 StepAssertion {
914 field: "body.email".to_owned(),
915 operator: "contains".to_owned(),
916 expected: Some("@".to_owned()),
917 },
918 ],
919 },
920 ],
921 };
922
923 let results = execute_pipeline(&pipeline, None).await;
924
925 create_user.assert_async().await;
926 get_user.assert_async().await;
927
928 assert_eq!(results.len(), 2);
929 assert_eq!(results[0].status, "success");
930 assert_eq!(results[1].status, "error");
931 assert!(
932 results[1]
933 .error
934 .as_ref()
935 .is_some_and(|err| err.contains("assertion(s) failed"))
936 );
937 }
938
939 #[tokio::test]
940 async fn resolves_spec_url_variable() {
941 let server_dev = MockServer::start_async().await;
942 let server_prod = MockServer::start_async().await;
943
944 let _dev = server_dev
945 .mock_async(|when, then| {
946 when.method(GET).path("/health");
947 then.status(200).body("dev");
948 })
949 .await;
950
951 let prod = server_prod
952 .mock_async(|when, then| {
953 when.method(GET).path("/health");
954 then.status(200).body("prod");
955 })
956 .await;
957
958 let pipeline = Pipeline {
959 id: None,
960 name: "Env".to_owned(),
961 description: None,
962 steps: vec![PipelineStep {
963 id: "health".to_owned(),
964 name: "Health".to_owned(),
965 description: None,
966 method: "GET".to_owned(),
967 url: "{{specs.users-api.url.prod}}/health".to_owned(),
968 headers: HashMap::new(),
969 body: None,
970 operation_id: None,
971 delay: None,
972 retry: None,
973 asserts: vec![],
974 }],
975 };
976 let specs = [RuntimeSpec {
977 slug: "users-api".to_owned(),
978 servers: HashMap::from([
979 ("dev".to_owned(), server_dev.base_url()),
980 ("prod".to_owned(), server_prod.base_url()),
981 ]),
982 }];
983
984 let results = execute_pipeline_with_specs_hooks(
985 &pipeline,
986 Some("dev"),
987 Some(&specs),
988 |_| {},
989 |_| {},
990 || false,
991 )
992 .await;
993
994 prod.assert_async().await;
995 assert_eq!(results.len(), 1);
996 assert_eq!(results[0].status, "success");
997 assert_eq!(
998 results[0].response.as_ref().map(|r| r.body.clone()),
999 Some(Value::String("prod".to_owned()))
1000 );
1001 }
1002
1003 #[tokio::test]
1004 async fn request_gate_can_decline_before_http_send() {
1005 let server = MockServer::start_async().await;
1006 let call = server
1007 .mock_async(|when, then| {
1008 when.method(GET).path("/blocked");
1009 then.status(200).body("should not be called");
1010 })
1011 .await;
1012
1013 let pipeline = Pipeline {
1014 id: None,
1015 name: "Gate".to_owned(),
1016 description: None,
1017 steps: vec![PipelineStep {
1018 id: "blocked".to_owned(),
1019 name: "Blocked".to_owned(),
1020 description: None,
1021 method: "GET".to_owned(),
1022 url: format!("{}/blocked", server.base_url()),
1023 headers: HashMap::new(),
1024 body: None,
1025 operation_id: None,
1026 delay: None,
1027 retry: None,
1028 asserts: vec![],
1029 }],
1030 };
1031
1032 let results = execute_pipeline_with_runtime_request_gate(
1033 &pipeline,
1034 None,
1035 None,
1036 None,
1037 None,
1038 |_| {},
1039 |_| {},
1040 || false,
1041 |_| Box::pin(async { false }),
1042 )
1043 .await;
1044
1045 assert!(results.is_empty());
1046 call.assert_calls_async(0).await;
1047 }
1048
1049 #[tokio::test]
1050 async fn client_runtime_request_gate_uses_provided_client() {
1051 let server = MockServer::start_async().await;
1052 let call = server
1053 .mock_async(|when, then| {
1054 when.method(GET).path("/shared-client");
1055 then.status(200).body("ok");
1056 })
1057 .await;
1058
1059 let pipeline = Pipeline {
1060 id: None,
1061 name: "Shared client".to_owned(),
1062 description: None,
1063 steps: vec![PipelineStep {
1064 id: "shared".to_owned(),
1065 name: "Shared".to_owned(),
1066 description: None,
1067 method: "GET".to_owned(),
1068 url: format!("{}/shared-client", server.base_url()),
1069 headers: HashMap::new(),
1070 body: None,
1071 operation_id: None,
1072 delay: None,
1073 retry: None,
1074 asserts: vec![],
1075 }],
1076 };
1077
1078 let client = Client::new();
1079 let results = execute_pipeline_with_client_runtime_request_gate(
1080 &client,
1081 &pipeline,
1082 None,
1083 None,
1084 None,
1085 None,
1086 |_| {},
1087 |_| {},
1088 || false,
1089 |_| Box::pin(async { true }),
1090 )
1091 .await;
1092
1093 assert_eq!(results.len(), 1);
1094 assert_eq!(results[0].status, "success");
1095 call.assert_calls_async(1).await;
1096 }
1097
1098 #[tokio::test]
1099 async fn marks_step_as_error_when_assertion_fails() {
1100 let server = MockServer::start_async().await;
1101
1102 let call = server
1103 .mock_async(|when, then| {
1104 when.method(GET).path("/status");
1105 then.status(200)
1106 .header("content-type", "application/json")
1107 .json_body(json!({ "ok": true }));
1108 })
1109 .await;
1110
1111 let pipeline = Pipeline {
1112 id: None,
1113 name: "Assert".to_owned(),
1114 description: None,
1115 steps: vec![PipelineStep {
1116 id: "status".to_owned(),
1117 name: "Status".to_owned(),
1118 description: None,
1119 method: "GET".to_owned(),
1120 url: format!("{}/status", server.base_url()),
1121 headers: HashMap::new(),
1122 body: None,
1123 operation_id: None,
1124 delay: None,
1125 retry: None,
1126 asserts: vec![StepAssertion {
1127 field: "status".to_owned(),
1128 operator: "equals".to_owned(),
1129 expected: Some("201".to_owned()),
1130 }],
1131 }],
1132 };
1133
1134 let results = execute_pipeline(&pipeline, None).await;
1135
1136 call.assert_async().await;
1137 assert_eq!(results.len(), 1);
1138 assert_eq!(results[0].status, "error");
1139 assert!(
1140 results[0]
1141 .error
1142 .as_ref()
1143 .is_some_and(|err| err.contains("1 assertion(s) failed"))
1144 );
1145 }
1146
1147 #[tokio::test]
1148 async fn stops_pipeline_after_step_failure() {
1149 let server = MockServer::start_async().await;
1150
1151 let failing_step = server
1152 .mock_async(|when, then| {
1153 when.method(GET).path("/fails");
1154 then.status(500).body("internal error");
1155 })
1156 .await;
1157
1158 let next_step = server
1159 .mock_async(|when, then| {
1160 when.method(GET).path("/next");
1161 then.status(200).body("ok");
1162 })
1163 .await;
1164
1165 let pipeline = Pipeline {
1166 id: None,
1167 name: "Stop on failure".to_owned(),
1168 description: None,
1169 steps: vec![
1170 PipelineStep {
1171 id: "fails".to_owned(),
1172 name: "Fails".to_owned(),
1173 description: None,
1174 method: "GET".to_owned(),
1175 url: format!("{}/fails", server.base_url()),
1176 headers: HashMap::new(),
1177 body: None,
1178 operation_id: None,
1179 delay: None,
1180 retry: None,
1181 asserts: vec![StepAssertion {
1182 field: "status".to_owned(),
1183 operator: "equals".to_owned(),
1184 expected: Some("201".to_owned()),
1185 }],
1186 },
1187 PipelineStep {
1188 id: "next".to_owned(),
1189 name: "Next".to_owned(),
1190 description: None,
1191 method: "GET".to_owned(),
1192 url: format!("{}/next", server.base_url()),
1193 headers: HashMap::new(),
1194 body: None,
1195 operation_id: None,
1196 delay: None,
1197 retry: None,
1198 asserts: vec![],
1199 },
1200 ],
1201 };
1202
1203 let results = execute_pipeline(&pipeline, None).await;
1204
1205 failing_step.assert_async().await;
1206 assert_eq!(next_step.calls_async().await, 0);
1207 assert_eq!(results.len(), 1);
1208 assert_eq!(results[0].step_id, "fails");
1209 assert_eq!(results[0].status, "error");
1210 assert!(
1211 results[0]
1212 .error
1213 .as_ref()
1214 .is_some_and(|err| err.contains("HTTP 500") && err.contains("assertion(s) failed"))
1215 );
1216 }
1217
1218 #[tokio::test]
1219 async fn executes_create_user_and_send_email_case_from_json_payload() {
1220 let server = MockServer::start_async().await;
1221
1222 let create_user = server
1223 .mock_async(|when, then| {
1224 when.method(POST).path("/users");
1225 then.status(201)
1226 .header("content-type", "application/json")
1227 .json_body(json!({
1228 "$id": "usr-100",
1229 "name": "John Doe",
1230 "email": "john@example.com"
1231 }));
1232 })
1233 .await;
1234
1235 let send_email = server
1236 .mock_async(|when, then| {
1237 when.method(POST)
1238 .path("/emails")
1239 .json_body(json!({ "to": "john@example.com", "name": "John Doe" }));
1240 then.status(201)
1241 .header("content-type", "application/json")
1242 .json_body(json!({ "queued": true }));
1243 })
1244 .await;
1245
1246 let payload = json!({
1247 "name": "Criar Usuário e Enviar Email",
1248 "description": "Pipeline de cadastro completo",
1249 "steps": [
1250 {
1251 "id": "create_user",
1252 "name": "Criar Usuário",
1253 "description": "Cria um novo usuário com dados aleatórios",
1254 "headers": {
1255 "Content-Type": "application/json"
1256 },
1257 "method": "POST",
1258 "url": format!("{}/users", server.base_url()),
1259 "body": {
1260 "id": "{{helpers.uuid}}",
1261 "name": "{{helpers.name}}",
1262 "email": "{{helpers.email}}",
1263 "cpf": "{{helpers.cpf}}"
1264 },
1265 "operationId": "createUser",
1266 "asserts": [
1267 {
1268 "field": "status",
1269 "operator": "equals",
1270 "expected": "201"
1271 },
1272 {
1273 "field": "body.$id",
1274 "operator": "exists"
1275 },
1276 {
1277 "field": "body.email",
1278 "operator": "contains",
1279 "expected": "@"
1280 }
1281 ]
1282 },
1283 {
1284 "id": "send_email",
1285 "name": "Enviar Email de Boas-Vindas",
1286 "description": "Envia email usando dados do step anterior",
1287 "headers": {
1288 "Content-Type": "application/json"
1289 },
1290 "method": "POST",
1291 "url": format!("{}/emails", server.base_url()),
1292 "body": {
1293 "to": "{{steps.create_user.email}}",
1294 "name": "{{steps.create_user.name}}"
1295 },
1296 "asserts": [
1297 {
1298 "field": "status",
1299 "operator": "equals",
1300 "expected": "201"
1301 }
1302 ]
1303 }
1304 ],
1305 "id": "e3045988"
1306 });
1307
1308 let pipeline: Pipeline =
1309 serde_json::from_value(payload).expect("pipeline payload is valid");
1310 let results = execute_pipeline(&pipeline, None).await;
1311
1312 create_user.assert_async().await;
1313 send_email.assert_async().await;
1314
1315 assert_eq!(results.len(), 2);
1316 assert_eq!(results[0].status, "success");
1317 assert_eq!(results[1].status, "success");
1318 assert_eq!(pipeline.id, Some("e3045988".to_owned()));
1319 assert_eq!(
1320 pipeline.steps[0].operation_id.as_deref(),
1321 Some("createUser")
1322 );
1323 assert!(
1324 results[0]
1325 .request
1326 .as_ref()
1327 .and_then(|r| r.body.as_ref())
1328 .and_then(|b| b.get("cpf"))
1329 .and_then(|v| v.as_str())
1330 .is_some_and(|cpf| cpf.len() == 14 && cpf.contains('.') && cpf.contains('-'))
1331 );
1332 }
1333
1334 #[tokio::test]
1335 async fn cancels_in_flight_future_when_cancel_flag_changes() {
1336 let cancelled = Arc::new(AtomicBool::new(false));
1337 let cancelled_writer = Arc::clone(&cancelled);
1338
1339 tokio::spawn(async move {
1340 tokio::time::sleep(Duration::from_millis(30)).await;
1341 cancelled_writer.store(true, Ordering::SeqCst);
1342 });
1343
1344 let mut should_cancel = || cancelled.load(Ordering::SeqCst);
1345 let result = await_with_cancel(
1346 async {
1347 tokio::time::sleep(Duration::from_millis(500)).await;
1348 "done"
1349 },
1350 &mut should_cancel,
1351 )
1352 .await;
1353
1354 assert!(result.is_none());
1355 }
1356
1357 #[tokio::test]
1358 async fn retries_when_assertions_fail() {
1359 let server = MockServer::start_async().await;
1360 let status_call = server
1361 .mock_async(|when, then| {
1362 when.method(GET).path("/status");
1363 then.status(200)
1364 .header("content-type", "application/json")
1365 .json_body(json!({ "ok": false }));
1366 })
1367 .await;
1368
1369 let pipeline = Pipeline {
1370 id: None,
1371 name: "Retry assertions".to_owned(),
1372 description: None,
1373 steps: vec![PipelineStep {
1374 id: "status".to_owned(),
1375 name: "Status".to_owned(),
1376 description: None,
1377 method: "GET".to_owned(),
1378 url: format!("{}/status", server.base_url()),
1379 headers: HashMap::new(),
1380 body: None,
1381 operation_id: None,
1382 delay: Some(0),
1383 retry: Some(2),
1384 asserts: vec![StepAssertion {
1385 field: "body.ok".to_owned(),
1386 operator: "equals".to_owned(),
1387 expected: Some("true".to_owned()),
1388 }],
1389 }],
1390 };
1391
1392 let results = execute_pipeline(&pipeline, None).await;
1393
1394 assert_eq!(results.len(), 1);
1395 assert_eq!(results[0].status, "error");
1396 assert_eq!(results[0].attempt, Some(3));
1397 assert_eq!(results[0].max_attempts, Some(3));
1398 status_call.assert_calls_async(3).await;
1399 }
1400
1401 #[tokio::test]
1402 async fn does_not_retry_on_http_error_without_assertions() {
1403 let server = MockServer::start_async().await;
1404 let call = server
1405 .mock_async(|when, then| {
1406 when.method(GET).path("/fails");
1407 then.status(500).body("internal error");
1408 })
1409 .await;
1410
1411 let pipeline = Pipeline {
1412 id: None,
1413 name: "No retry on HTTP".to_owned(),
1414 description: None,
1415 steps: vec![PipelineStep {
1416 id: "fails".to_owned(),
1417 name: "Fails".to_owned(),
1418 description: None,
1419 method: "GET".to_owned(),
1420 url: format!("{}/fails", server.base_url()),
1421 headers: HashMap::new(),
1422 body: None,
1423 operation_id: None,
1424 delay: Some(0),
1425 retry: Some(5),
1426 asserts: vec![],
1427 }],
1428 };
1429
1430 let results = execute_pipeline(&pipeline, None).await;
1431
1432 assert_eq!(results.len(), 1);
1433 assert_eq!(results[0].status, "error");
1434 assert_eq!(results[0].attempt, Some(1));
1435 assert_eq!(results[0].max_attempts, Some(6));
1436 call.assert_calls_async(1).await;
1437 }
1438
1439 #[tokio::test]
1440 async fn accepts_404_when_status_assert_matches() {
1441 let server = MockServer::start_async().await;
1442 let call = server
1443 .mock_async(|when, then| {
1444 when.method(GET).path("/missing");
1445 then.status(404)
1446 .header("content-type", "application/json")
1447 .json_body(json!({ "message": "not found" }));
1448 })
1449 .await;
1450
1451 let pipeline = Pipeline {
1452 id: None,
1453 name: "Expected 404".to_owned(),
1454 description: None,
1455 steps: vec![PipelineStep {
1456 id: "missing".to_owned(),
1457 name: "Missing".to_owned(),
1458 description: None,
1459 method: "GET".to_owned(),
1460 url: format!("{}/missing", server.base_url()),
1461 headers: HashMap::new(),
1462 body: None,
1463 operation_id: None,
1464 delay: None,
1465 retry: Some(5),
1466 asserts: vec![StepAssertion {
1467 field: "status".to_owned(),
1468 operator: "equals".to_owned(),
1469 expected: Some("404".to_owned()),
1470 }],
1471 }],
1472 };
1473
1474 let results = execute_pipeline(&pipeline, None).await;
1475
1476 assert_eq!(results.len(), 1);
1477 assert_eq!(results[0].status, "success");
1478 assert_eq!(results[0].error, None);
1479 assert_eq!(results[0].attempt, Some(1));
1480 assert_eq!(
1481 results[0]
1482 .response
1483 .as_ref()
1484 .and_then(|response| response.body.get("message"))
1485 .and_then(|value| value.as_str()),
1486 Some("not found")
1487 );
1488 call.assert_calls_async(1).await;
1489 }
1490
1491 #[tokio::test]
1492 async fn accepts_500_when_status_assert_matches() {
1493 let server = MockServer::start_async().await;
1494 let call = server
1495 .mock_async(|when, then| {
1496 when.method(GET).path("/boom");
1497 then.status(500).body("internal error");
1498 })
1499 .await;
1500
1501 let pipeline = Pipeline {
1502 id: None,
1503 name: "Expected 500".to_owned(),
1504 description: None,
1505 steps: vec![PipelineStep {
1506 id: "boom".to_owned(),
1507 name: "Boom".to_owned(),
1508 description: None,
1509 method: "GET".to_owned(),
1510 url: format!("{}/boom", server.base_url()),
1511 headers: HashMap::new(),
1512 body: None,
1513 operation_id: None,
1514 delay: None,
1515 retry: None,
1516 asserts: vec![StepAssertion {
1517 field: "status".to_owned(),
1518 operator: "equals".to_owned(),
1519 expected: Some("500".to_owned()),
1520 }],
1521 }],
1522 };
1523
1524 let results = execute_pipeline(&pipeline, None).await;
1525
1526 assert_eq!(results.len(), 1);
1527 assert_eq!(results[0].status, "success");
1528 assert_eq!(results[0].error, None);
1529 assert_eq!(
1530 results[0]
1531 .response
1532 .as_ref()
1533 .map(|response| response.body.clone()),
1534 Some(Value::String("internal error".to_owned()))
1535 );
1536 call.assert_calls_async(1).await;
1537 }
1538
1539 #[tokio::test]
1540 async fn accepts_array_index_assertions_in_response_body() {
1541 let server = MockServer::start_async().await;
1542 let call = server
1543 .mock_async(|when, then| {
1544 when.method(GET).path("/runtime");
1545 then.status(200)
1546 .header("content-type", "application/json")
1547 .json_body(json!({
1548 "app": {
1549 "status": "ready"
1550 },
1551 "pods": [
1552 {
1553 "podName": "app-keep-manual-123",
1554 "phase": "Running"
1555 }
1556 ],
1557 "containers": [
1558 {
1559 "name": "app-keep-manual"
1560 }
1561 ]
1562 }));
1563 })
1564 .await;
1565
1566 let pipeline = Pipeline {
1567 id: None,
1568 name: "Runtime arrays".to_owned(),
1569 description: None,
1570 steps: vec![PipelineStep {
1571 id: "runtime".to_owned(),
1572 name: "Runtime".to_owned(),
1573 description: None,
1574 method: "GET".to_owned(),
1575 url: format!("{}/runtime", server.base_url()),
1576 headers: HashMap::new(),
1577 body: None,
1578 operation_id: None,
1579 delay: None,
1580 retry: None,
1581 asserts: vec![
1582 StepAssertion {
1583 field: "status".to_owned(),
1584 operator: "equals".to_owned(),
1585 expected: Some("200".to_owned()),
1586 },
1587 StepAssertion {
1588 field: "body.app.status".to_owned(),
1589 operator: "equals".to_owned(),
1590 expected: Some("ready".to_owned()),
1591 },
1592 StepAssertion {
1593 field: "body.pods.0.podName".to_owned(),
1594 operator: "exists".to_owned(),
1595 expected: None,
1596 },
1597 StepAssertion {
1598 field: "body.pods.0.phase".to_owned(),
1599 operator: "equals".to_owned(),
1600 expected: Some("Running".to_owned()),
1601 },
1602 StepAssertion {
1603 field: "body.containers.0.name".to_owned(),
1604 operator: "exists".to_owned(),
1605 expected: None,
1606 },
1607 ],
1608 }],
1609 };
1610
1611 let results = execute_pipeline(&pipeline, None).await;
1612
1613 assert_eq!(results.len(), 1);
1614 assert_eq!(results[0].status, "success");
1615 assert_eq!(results[0].error, None);
1616 assert!(
1617 results[0]
1618 .assert_results
1619 .as_ref()
1620 .is_some_and(|items| items.iter().all(|item| item.passed))
1621 );
1622 call.assert_calls_async(1).await;
1623 }
1624
1625 #[tokio::test]
1626 async fn retries_when_status_assert_fails_on_http_error() {
1627 let server = MockServer::start_async().await;
1628 let call = server
1629 .mock_async(|when, then| {
1630 when.method(GET).path("/missing");
1631 then.status(404).body("not found");
1632 })
1633 .await;
1634
1635 let pipeline = Pipeline {
1636 id: None,
1637 name: "Retry status assert".to_owned(),
1638 description: None,
1639 steps: vec![PipelineStep {
1640 id: "missing".to_owned(),
1641 name: "Missing".to_owned(),
1642 description: None,
1643 method: "GET".to_owned(),
1644 url: format!("{}/missing", server.base_url()),
1645 headers: HashMap::new(),
1646 body: None,
1647 operation_id: None,
1648 delay: None,
1649 retry: Some(2),
1650 asserts: vec![StepAssertion {
1651 field: "status".to_owned(),
1652 operator: "equals".to_owned(),
1653 expected: Some("200".to_owned()),
1654 }],
1655 }],
1656 };
1657
1658 let results = execute_pipeline(&pipeline, None).await;
1659
1660 assert_eq!(results.len(), 1);
1661 assert_eq!(results[0].status, "error");
1662 assert_eq!(results[0].attempt, Some(3));
1663 assert_eq!(results[0].max_attempts, Some(3));
1664 assert!(
1665 results[0]
1666 .error
1667 .as_ref()
1668 .is_some_and(|err| err.contains("HTTP 404"))
1669 );
1670 call.assert_calls_async(3).await;
1671 }
1672
1673 #[tokio::test]
1674 async fn keeps_http_error_when_status_assert_passes_but_body_assert_fails() {
1675 let server = MockServer::start_async().await;
1676 let call = server
1677 .mock_async(|when, then| {
1678 when.method(GET).path("/missing");
1679 then.status(404)
1680 .header("content-type", "application/json")
1681 .json_body(json!({ "message": "not found" }));
1682 })
1683 .await;
1684
1685 let pipeline = Pipeline {
1686 id: None,
1687 name: "HTTP plus body assert failure".to_owned(),
1688 description: None,
1689 steps: vec![PipelineStep {
1690 id: "missing".to_owned(),
1691 name: "Missing".to_owned(),
1692 description: None,
1693 method: "GET".to_owned(),
1694 url: format!("{}/missing", server.base_url()),
1695 headers: HashMap::new(),
1696 body: None,
1697 operation_id: None,
1698 delay: None,
1699 retry: Some(1),
1700 asserts: vec![
1701 StepAssertion {
1702 field: "status".to_owned(),
1703 operator: "equals".to_owned(),
1704 expected: Some("404".to_owned()),
1705 },
1706 StepAssertion {
1707 field: "body.code".to_owned(),
1708 operator: "equals".to_owned(),
1709 expected: Some("x".to_owned()),
1710 },
1711 ],
1712 }],
1713 };
1714
1715 let results = execute_pipeline(&pipeline, None).await;
1716
1717 assert_eq!(results.len(), 1);
1718 assert_eq!(results[0].status, "error");
1719 assert_eq!(results[0].attempt, Some(2));
1720 assert_eq!(results[0].max_attempts, Some(2));
1721 assert!(
1722 results[0]
1723 .error
1724 .as_ref()
1725 .is_some_and(|err| err.contains("HTTP 404") && err.contains("assertion(s) failed"))
1726 );
1727 call.assert_calls_async(2).await;
1728 }
1729
1730 #[tokio::test]
1731 async fn keeps_http_error_without_status_assert_even_if_body_assert_passes() {
1732 let server = MockServer::start_async().await;
1733 let call = server
1734 .mock_async(|when, then| {
1735 when.method(GET).path("/missing");
1736 then.status(404)
1737 .header("content-type", "application/json")
1738 .json_body(json!({ "message": "not found" }));
1739 })
1740 .await;
1741
1742 let pipeline = Pipeline {
1743 id: None,
1744 name: "Body assert only".to_owned(),
1745 description: None,
1746 steps: vec![PipelineStep {
1747 id: "missing".to_owned(),
1748 name: "Missing".to_owned(),
1749 description: None,
1750 method: "GET".to_owned(),
1751 url: format!("{}/missing", server.base_url()),
1752 headers: HashMap::new(),
1753 body: None,
1754 operation_id: None,
1755 delay: None,
1756 retry: Some(3),
1757 asserts: vec![StepAssertion {
1758 field: "body.message".to_owned(),
1759 operator: "equals".to_owned(),
1760 expected: Some("not found".to_owned()),
1761 }],
1762 }],
1763 };
1764
1765 let results = execute_pipeline(&pipeline, None).await;
1766
1767 assert_eq!(results.len(), 1);
1768 assert_eq!(results[0].status, "error");
1769 assert_eq!(results[0].attempt, Some(1));
1770 assert_eq!(results[0].max_attempts, Some(4));
1771 assert!(
1772 results[0]
1773 .error
1774 .as_ref()
1775 .is_some_and(|err| err.contains("HTTP 404"))
1776 );
1777 call.assert_calls_async(1).await;
1778 }
1779
1780 #[tokio::test]
1781 async fn accepts_http_error_when_status_assert_uses_other_operator() {
1782 let server = MockServer::start_async().await;
1783 let call = server
1784 .mock_async(|when, then| {
1785 when.method(GET).path("/missing");
1786 then.status(404).body("not found");
1787 })
1788 .await;
1789
1790 let pipeline = Pipeline {
1791 id: None,
1792 name: "Expected non-200".to_owned(),
1793 description: None,
1794 steps: vec![PipelineStep {
1795 id: "missing".to_owned(),
1796 name: "Missing".to_owned(),
1797 description: None,
1798 method: "GET".to_owned(),
1799 url: format!("{}/missing", server.base_url()),
1800 headers: HashMap::new(),
1801 body: None,
1802 operation_id: None,
1803 delay: None,
1804 retry: None,
1805 asserts: vec![StepAssertion {
1806 field: "status".to_owned(),
1807 operator: "not_equals".to_owned(),
1808 expected: Some("200".to_owned()),
1809 }],
1810 }],
1811 };
1812
1813 let results = execute_pipeline(&pipeline, None).await;
1814
1815 assert_eq!(results.len(), 1);
1816 assert_eq!(results[0].status, "success");
1817 assert_eq!(results[0].error, None);
1818 call.assert_calls_async(1).await;
1819 }
1820
1821 #[tokio::test]
1822 async fn applies_delay_before_each_attempt() {
1823 let server = MockServer::start_async().await;
1824 let call = server
1825 .mock_async(|when, then| {
1826 when.method(GET).path("/delayed");
1827 then.status(200)
1828 .header("content-type", "application/json")
1829 .json_body(json!({ "ok": false }));
1830 })
1831 .await;
1832
1833 let pipeline = Pipeline {
1834 id: None,
1835 name: "Delay before attempts".to_owned(),
1836 description: None,
1837 steps: vec![PipelineStep {
1838 id: "delayed".to_owned(),
1839 name: "Delayed".to_owned(),
1840 description: None,
1841 method: "GET".to_owned(),
1842 url: format!("{}/delayed", server.base_url()),
1843 headers: HashMap::new(),
1844 body: None,
1845 operation_id: None,
1846 delay: Some(30),
1847 retry: Some(2),
1848 asserts: vec![StepAssertion {
1849 field: "body.ok".to_owned(),
1850 operator: "equals".to_owned(),
1851 expected: Some("true".to_owned()),
1852 }],
1853 }],
1854 };
1855
1856 let started = std::time::Instant::now();
1857 let results = execute_pipeline(&pipeline, None).await;
1858 let elapsed = started.elapsed();
1859
1860 assert_eq!(results.len(), 1);
1861 assert_eq!(results[0].attempt, Some(3));
1862 assert_eq!(results[0].max_attempts, Some(3));
1863 assert!(elapsed >= Duration::from_millis(75));
1864 call.assert_calls_async(3).await;
1865 }
1866}