// previa_engine/execution/engine.rs

1use reqwest::Client;
2use serde_json::{Map, Value};
3use std::collections::HashMap;
4use std::future::Future;
5use std::pin::Pin;
6use std::time::{Duration, Instant};
7
8use crate::assertions::{evaluate_assertions, has_status_assertion};
9use crate::core::types::{
10    Pipeline, RuntimeEnvGroup, RuntimeSpec, StepExecutionResult, StepRequest, StepResponse,
11};
12use crate::execution::cancel::await_with_cancel;
13use crate::execution::http::{parse_absolute_http_url, parse_method};
14use crate::execution::logging::{log_step_request, log_step_response};
15use crate::template::resolve::resolve_template_variables;
16
17fn noop_request_start_gate<'a>(
18    _: &'a StepRequest,
19) -> Pin<Box<dyn Future<Output = bool> + Send + 'a>> {
20    Box::pin(async { true })
21}
22
23pub async fn execute_pipeline(
24    pipeline: &Pipeline,
25    selected_base_url_key: Option<&str>,
26) -> Vec<StepExecutionResult> {
27    let client = Client::new();
28    execute_pipeline_with_client_runtime_hooks(
29        &client,
30        pipeline,
31        selected_base_url_key,
32        None,
33        None,
34        None,
35        |_| {},
36        |_| {},
37        || false,
38        noop_request_start_gate,
39    )
40    .await
41}
42
43pub async fn execute_pipeline_with_client(
44    client: &Client,
45    pipeline: &Pipeline,
46    selected_base_url_key: Option<&str>,
47) -> Vec<StepExecutionResult> {
48    execute_pipeline_with_client_runtime_hooks(
49        client,
50        pipeline,
51        selected_base_url_key,
52        None,
53        None,
54        None,
55        |_| {},
56        |_| {},
57        || false,
58        noop_request_start_gate,
59    )
60    .await
61}
62
63pub async fn execute_pipeline_with_hooks<FStart, FResult, FCancel>(
64    pipeline: &Pipeline,
65    selected_base_url_key: Option<&str>,
66    on_step_start: FStart,
67    on_step_result: FResult,
68    should_cancel: FCancel,
69) -> Vec<StepExecutionResult>
70where
71    FStart: FnMut(&str),
72    FResult: FnMut(&StepExecutionResult),
73    FCancel: FnMut() -> bool,
74{
75    let client = Client::new();
76    execute_pipeline_with_client_runtime_hooks(
77        &client,
78        pipeline,
79        selected_base_url_key,
80        None,
81        None,
82        None,
83        on_step_start,
84        on_step_result,
85        should_cancel,
86        noop_request_start_gate,
87    )
88    .await
89}
90
91pub async fn execute_pipeline_with_specs_hooks<FStart, FResult, FCancel>(
92    pipeline: &Pipeline,
93    selected_base_url_key: Option<&str>,
94    specs: Option<&[RuntimeSpec]>,
95    on_step_start: FStart,
96    on_step_result: FResult,
97    should_cancel: FCancel,
98) -> Vec<StepExecutionResult>
99where
100    FStart: FnMut(&str),
101    FResult: FnMut(&StepExecutionResult),
102    FCancel: FnMut() -> bool,
103{
104    let client = Client::new();
105    execute_pipeline_with_client_runtime_hooks(
106        &client,
107        pipeline,
108        selected_base_url_key,
109        specs,
110        None,
111        None,
112        on_step_start,
113        on_step_result,
114        should_cancel,
115        noop_request_start_gate,
116    )
117    .await
118}
119
120pub async fn execute_pipeline_with_runtime_hooks<FStart, FResult, FCancel>(
121    pipeline: &Pipeline,
122    selected_base_url_key: Option<&str>,
123    specs: Option<&[RuntimeSpec]>,
124    env_groups: Option<&[RuntimeEnvGroup]>,
125    selected_env_group_slug: Option<&str>,
126    on_step_start: FStart,
127    on_step_result: FResult,
128    should_cancel: FCancel,
129) -> Vec<StepExecutionResult>
130where
131    FStart: FnMut(&str),
132    FResult: FnMut(&StepExecutionResult),
133    FCancel: FnMut() -> bool,
134{
135    let client = Client::new();
136    execute_pipeline_with_client_runtime_hooks(
137        &client,
138        pipeline,
139        selected_base_url_key,
140        specs,
141        env_groups,
142        selected_env_group_slug,
143        on_step_start,
144        on_step_result,
145        should_cancel,
146        noop_request_start_gate,
147    )
148    .await
149}
150
151pub async fn execute_pipeline_with_runtime_request_gate<FStart, FResult, FCancel, FGate>(
152    pipeline: &Pipeline,
153    selected_base_url_key: Option<&str>,
154    specs: Option<&[RuntimeSpec]>,
155    env_groups: Option<&[RuntimeEnvGroup]>,
156    selected_env_group_slug: Option<&str>,
157    on_step_start: FStart,
158    on_step_result: FResult,
159    should_cancel: FCancel,
160    on_request_start: FGate,
161) -> Vec<StepExecutionResult>
162where
163    FStart: FnMut(&str),
164    FResult: FnMut(&StepExecutionResult),
165    FCancel: FnMut() -> bool,
166    FGate: for<'a> FnMut(&'a StepRequest) -> Pin<Box<dyn Future<Output = bool> + Send + 'a>> + Send,
167{
168    let client = Client::new();
169    execute_pipeline_with_client_runtime_hooks(
170        &client,
171        pipeline,
172        selected_base_url_key,
173        specs,
174        env_groups,
175        selected_env_group_slug,
176        on_step_start,
177        on_step_result,
178        should_cancel,
179        on_request_start,
180    )
181    .await
182}
183
184pub async fn execute_pipeline_with_client_runtime_request_gate<FStart, FResult, FCancel, FGate>(
185    client: &Client,
186    pipeline: &Pipeline,
187    selected_base_url_key: Option<&str>,
188    specs: Option<&[RuntimeSpec]>,
189    env_groups: Option<&[RuntimeEnvGroup]>,
190    selected_env_group_slug: Option<&str>,
191    on_step_start: FStart,
192    on_step_result: FResult,
193    should_cancel: FCancel,
194    on_request_start: FGate,
195) -> Vec<StepExecutionResult>
196where
197    FStart: FnMut(&str),
198    FResult: FnMut(&StepExecutionResult),
199    FCancel: FnMut() -> bool,
200    FGate: for<'a> FnMut(&'a StepRequest) -> Pin<Box<dyn Future<Output = bool> + Send + 'a>> + Send,
201{
202    execute_pipeline_with_client_runtime_hooks(
203        client,
204        pipeline,
205        selected_base_url_key,
206        specs,
207        env_groups,
208        selected_env_group_slug,
209        on_step_start,
210        on_step_result,
211        should_cancel,
212        on_request_start,
213    )
214    .await
215}
216
217pub async fn execute_pipeline_with_client_hooks<FStart, FResult, FCancel>(
218    client: &Client,
219    pipeline: &Pipeline,
220    selected_base_url_key: Option<&str>,
221    on_step_start: FStart,
222    on_step_result: FResult,
223    should_cancel: FCancel,
224) -> Vec<StepExecutionResult>
225where
226    FStart: FnMut(&str),
227    FResult: FnMut(&StepExecutionResult),
228    FCancel: FnMut() -> bool,
229{
230    execute_pipeline_with_client_runtime_hooks(
231        client,
232        pipeline,
233        selected_base_url_key,
234        None,
235        None,
236        None,
237        on_step_start,
238        on_step_result,
239        should_cancel,
240        noop_request_start_gate,
241    )
242    .await
243}
244
245fn finalize_step_result<FResult>(
246    step_id: &str,
247    result: StepExecutionResult,
248    context: &mut HashMap<String, StepExecutionResult>,
249    results: &mut Vec<StepExecutionResult>,
250    on_step_result: &mut FResult,
251) -> bool
252where
253    FResult: FnMut(&StepExecutionResult),
254{
255    let should_stop_pipeline = result.status == "error";
256    context.insert(step_id.to_owned(), result.clone());
257    on_step_result(&result);
258    results.push(result);
259    should_stop_pipeline
260}
261
262async fn execute_pipeline_with_client_runtime_hooks<FStart, FResult, FCancel, FGate>(
263    client: &Client,
264    pipeline: &Pipeline,
265    selected_base_url_key: Option<&str>,
266    specs: Option<&[RuntimeSpec]>,
267    env_groups: Option<&[RuntimeEnvGroup]>,
268    selected_env_group_slug: Option<&str>,
269    on_step_start: FStart,
270    on_step_result: FResult,
271    should_cancel: FCancel,
272    on_request_start: FGate,
273) -> Vec<StepExecutionResult>
274where
275    FStart: FnMut(&str),
276    FResult: FnMut(&StepExecutionResult),
277    FCancel: FnMut() -> bool,
278    FGate: for<'a> FnMut(&'a StepRequest) -> Pin<Box<dyn Future<Output = bool> + Send + 'a>> + Send,
279{
280    execute_pipeline_with_client_runtime_hooks_from_index(
281        client,
282        pipeline,
283        selected_base_url_key,
284        0,
285        HashMap::new(),
286        specs,
287        env_groups,
288        selected_env_group_slug,
289        on_step_start,
290        on_step_result,
291        should_cancel,
292        on_request_start,
293    )
294    .await
295}
296
297pub async fn execute_pipeline_from_step_with_client_runtime_hooks<FStart, FResult, FCancel, FGate>(
298    client: &Client,
299    pipeline: &Pipeline,
300    start_step_id: &str,
301    initial_context: HashMap<String, StepExecutionResult>,
302    specs: Option<&[RuntimeSpec]>,
303    env_groups: Option<&[RuntimeEnvGroup]>,
304    selected_env_group_slug: Option<&str>,
305    on_step_start: FStart,
306    on_step_result: FResult,
307    should_cancel: FCancel,
308    on_request_start: FGate,
309) -> Vec<StepExecutionResult>
310where
311    FStart: FnMut(&str),
312    FResult: FnMut(&StepExecutionResult),
313    FCancel: FnMut() -> bool,
314    FGate: for<'a> FnMut(&'a StepRequest) -> Pin<Box<dyn Future<Output = bool> + Send + 'a>> + Send,
315{
316    let start_index = pipeline
317        .steps
318        .iter()
319        .position(|step| step.id == start_step_id)
320        .unwrap_or(pipeline.steps.len());
321
322    execute_pipeline_with_client_runtime_hooks_from_index(
323        client,
324        pipeline,
325        None,
326        start_index,
327        initial_context,
328        specs,
329        env_groups,
330        selected_env_group_slug,
331        on_step_start,
332        on_step_result,
333        should_cancel,
334        on_request_start,
335    )
336    .await
337}
338
339async fn execute_pipeline_with_client_runtime_hooks_from_index<FStart, FResult, FCancel, FGate>(
340    client: &Client,
341    pipeline: &Pipeline,
342    _selected_base_url_key: Option<&str>,
343    start_index: usize,
344    initial_context: HashMap<String, StepExecutionResult>,
345    specs: Option<&[RuntimeSpec]>,
346    env_groups: Option<&[RuntimeEnvGroup]>,
347    selected_env_group_slug: Option<&str>,
348    mut on_step_start: FStart,
349    mut on_step_result: FResult,
350    mut should_cancel: FCancel,
351    mut on_request_start: FGate,
352) -> Vec<StepExecutionResult>
353where
354    FStart: FnMut(&str),
355    FResult: FnMut(&StepExecutionResult),
356    FCancel: FnMut() -> bool,
357    FGate: for<'a> FnMut(&'a StepRequest) -> Pin<Box<dyn Future<Output = bool> + Send + 'a>> + Send,
358{
359    let mut context = initial_context;
360    let mut results = Vec::with_capacity(pipeline.steps.len().saturating_sub(start_index));
361
362    'steps: for step in pipeline.steps.iter().skip(start_index) {
363        if should_cancel() {
364            break;
365        }
366
367        let delay_ms = step.delay.unwrap_or(0);
368        let max_attempts = step.retry.unwrap_or(0).saturating_add(1);
369
370        for attempt in 1..=max_attempts {
371            if should_cancel() {
372                break 'steps;
373            }
374
375            if delay_ms > 0 {
376                let Some(_) = await_with_cancel(
377                    tokio::time::sleep(Duration::from_millis(delay_ms)),
378                    &mut should_cancel,
379                )
380                .await
381                else {
382                    break 'steps;
383                };
384            }
385
386            on_step_start(&step.id);
387            let start = Instant::now();
388
389            let resolved_url = resolve_template_variables(
390                &Value::String(step.url.clone()),
391                &context,
392                specs,
393                env_groups,
394                selected_env_group_slug,
395            )
396            .as_str()
397            .unwrap_or(step.url.as_str())
398            .to_owned();
399
400            let resolved_headers = resolve_template_variables(
401                &serde_json::to_value(&step.headers).unwrap_or(Value::Object(Map::new())),
402                &context,
403                specs,
404                env_groups,
405                selected_env_group_slug,
406            )
407            .as_object()
408            .map(|m| {
409                m.iter()
410                    .map(|(k, v)| (k.clone(), v.as_str().unwrap_or_default().to_owned()))
411                    .collect::<HashMap<String, String>>()
412            })
413            .unwrap_or_default();
414
415            let resolved_body = step.body.as_ref().map(|body| {
416                resolve_template_variables(
417                    body,
418                    &context,
419                    specs,
420                    env_groups,
421                    selected_env_group_slug,
422                )
423            });
424
425            let method = match parse_method(&step.method) {
426                Ok(method) => method,
427                Err(err) => {
428                    let result = StepExecutionResult {
429                        step_id: step.id.clone(),
430                        status: "error".to_owned(),
431                        request: Some(StepRequest {
432                            method: step.method.clone(),
433                            url: resolved_url.clone(),
434                            headers: resolved_headers.clone(),
435                            body: resolved_body.clone(),
436                        }),
437                        response: None,
438                        error: Some(err),
439                        duration: Some(start.elapsed().as_millis()),
440                        attempts: if max_attempts > 1 {
441                            Some(attempt)
442                        } else {
443                            None
444                        },
445                        attempt: Some(attempt),
446                        max_attempts: Some(max_attempts),
447                        assert_results: None,
448                    };
449                    log_step_response(&step.id, None, result.error.as_deref());
450
451                    if attempt < max_attempts {
452                        continue;
453                    }
454                    if finalize_step_result(
455                        &step.id,
456                        result,
457                        &mut context,
458                        &mut results,
459                        &mut on_step_result,
460                    ) {
461                        break 'steps;
462                    }
463                    break;
464                }
465            };
466
467            let url = match parse_absolute_http_url(&resolved_url) {
468                Ok(url) => url,
469                Err(err) => {
470                    let result = StepExecutionResult {
471                        step_id: step.id.clone(),
472                        status: "error".to_owned(),
473                        request: Some(StepRequest {
474                            method: step.method.clone(),
475                            url: resolved_url.clone(),
476                            headers: resolved_headers.clone(),
477                            body: resolved_body.clone(),
478                        }),
479                        response: None,
480                        error: Some(err),
481                        duration: Some(start.elapsed().as_millis()),
482                        attempts: if max_attempts > 1 {
483                            Some(attempt)
484                        } else {
485                            None
486                        },
487                        attempt: Some(attempt),
488                        max_attempts: Some(max_attempts),
489                        assert_results: None,
490                    };
491                    log_step_response(&step.id, None, result.error.as_deref());
492
493                    if attempt < max_attempts {
494                        continue;
495                    }
496                    if finalize_step_result(
497                        &step.id,
498                        result,
499                        &mut context,
500                        &mut results,
501                        &mut on_step_result,
502                    ) {
503                        break 'steps;
504                    }
505                    break;
506                }
507            };
508
509            let mut request_builder = client.request(method, url);
510
511            for (k, v) in &resolved_headers {
512                request_builder = request_builder.header(k, v);
513            }
514
515            if let Some(body) = &resolved_body {
516                if !step.method.eq_ignore_ascii_case("GET")
517                    && !step.method.eq_ignore_ascii_case("HEAD")
518                {
519                    request_builder = request_builder.json(body);
520                }
521            }
522
523            let request = StepRequest {
524                method: step.method.clone(),
525                url: resolved_url.clone(),
526                headers: resolved_headers.clone(),
527                body: resolved_body.clone(),
528            };
529            log_step_request(&step.id, &request);
530            let request_admitted = on_request_start(&request).await;
531            if !request_admitted {
532                break 'steps;
533            }
534            if should_cancel() {
535                break 'steps;
536            }
537
538            let Some(send_result) =
539                await_with_cancel(request_builder.send(), &mut should_cancel).await
540            else {
541                break 'steps;
542            };
543
544            match send_result {
545                Ok(response) => {
546                    let status = response.status();
547                    let status_text = status.canonical_reason().unwrap_or("").to_owned();
548                    let mut headers = HashMap::new();
549                    for (k, v) in response.headers() {
550                        headers.insert(
551                            k.as_str().to_owned(),
552                            v.to_str().unwrap_or_default().to_owned(),
553                        );
554                    }
555
556                    let content_type = headers
557                        .iter()
558                        .find(|(k, _)| k.eq_ignore_ascii_case("content-type"))
559                        .map(|(_, v)| v.as_str())
560                        .unwrap_or("");
561
562                    let body = if content_type.contains("application/json") {
563                        let Some(body_result) =
564                            await_with_cancel(response.json::<Value>(), &mut should_cancel).await
565                        else {
566                            break 'steps;
567                        };
568                        match body_result {
569                            Ok(value) => value,
570                            Err(err) => {
571                                let result = StepExecutionResult {
572                                    step_id: step.id.clone(),
573                                    status: "error".to_owned(),
574                                    request: Some(request),
575                                    response: None,
576                                    error: Some(err.to_string()),
577                                    duration: Some(start.elapsed().as_millis()),
578                                    attempts: if max_attempts > 1 {
579                                        Some(attempt)
580                                    } else {
581                                        None
582                                    },
583                                    attempt: Some(attempt),
584                                    max_attempts: Some(max_attempts),
585                                    assert_results: None,
586                                };
587                                log_step_response(&step.id, None, result.error.as_deref());
588
589                                if attempt < max_attempts {
590                                    continue;
591                                }
592                                if finalize_step_result(
593                                    &step.id,
594                                    result,
595                                    &mut context,
596                                    &mut results,
597                                    &mut on_step_result,
598                                ) {
599                                    break 'steps;
600                                }
601                                break;
602                            }
603                        }
604                    } else {
605                        let Some(body_result) =
606                            await_with_cancel(response.text(), &mut should_cancel).await
607                        else {
608                            break 'steps;
609                        };
610                        Value::String(body_result.unwrap_or_default())
611                    };
612
613                    let http_error = (!status.is_success())
614                        .then(|| format!("HTTP {} {}", status.as_u16(), status_text));
615                    let mut result = StepExecutionResult {
616                        step_id: step.id.clone(),
617                        status: "success".to_owned(),
618                        request: Some(request),
619                        response: Some(StepResponse {
620                            status: status.as_u16(),
621                            status_text: status_text.clone(),
622                            headers,
623                            body,
624                        }),
625                        error: http_error.clone(),
626                        duration: Some(start.elapsed().as_millis()),
627                        attempts: if max_attempts > 1 {
628                            Some(attempt)
629                        } else {
630                            None
631                        },
632                        attempt: Some(attempt),
633                        max_attempts: Some(max_attempts),
634                        assert_results: None,
635                    };
636
637                    let has_status_assert = has_status_assertion(step);
638                    let assert_results = evaluate_assertions(
639                        step,
640                        &result,
641                        &context,
642                        specs,
643                        env_groups,
644                        selected_env_group_slug,
645                    );
646                    let assertion_failed = assert_results.iter().any(|r| !r.passed);
647                    if !assert_results.is_empty() {
648                        if assertion_failed {
649                            result.status = "error".to_owned();
650                            let failed_count = assert_results.iter().filter(|r| !r.passed).count();
651                            result.error = Some(match result.error {
652                                Some(err) => {
653                                    format!("{} | {} assertion(s) failed", err, failed_count)
654                                }
655                                None => format!("{} assertion(s) failed", failed_count),
656                            });
657                        } else if http_error.is_some() {
658                            if has_status_assert {
659                                result.status = "success".to_owned();
660                                result.error = None;
661                            } else {
662                                result.status = "error".to_owned();
663                            }
664                        }
665                        result.assert_results = Some(assert_results);
666                    } else if http_error.is_some() {
667                        result.status = "error".to_owned();
668                    }
669
670                    log_step_response(&step.id, result.response.as_ref(), result.error.as_deref());
671
672                    if assertion_failed && attempt < max_attempts {
673                        continue;
674                    }
675
676                    if finalize_step_result(
677                        &step.id,
678                        result,
679                        &mut context,
680                        &mut results,
681                        &mut on_step_result,
682                    ) {
683                        break 'steps;
684                    }
685                    break;
686                }
687                Err(err) => {
688                    let result = StepExecutionResult {
689                        step_id: step.id.clone(),
690                        status: "error".to_owned(),
691                        request: Some(request),
692                        response: None,
693                        error: Some(err.to_string()),
694                        duration: Some(start.elapsed().as_millis()),
695                        attempts: if max_attempts > 1 {
696                            Some(attempt)
697                        } else {
698                            None
699                        },
700                        attempt: Some(attempt),
701                        max_attempts: Some(max_attempts),
702                        assert_results: None,
703                    };
704                    log_step_response(&step.id, None, result.error.as_deref());
705
706                    if attempt < max_attempts {
707                        continue;
708                    }
709
710                    if finalize_step_result(
711                        &step.id,
712                        result,
713                        &mut context,
714                        &mut results,
715                        &mut on_step_result,
716                    ) {
717                        break 'steps;
718                    }
719                    break;
720                }
721            }
722        }
723    }
724
725    results
726}
727
728#[cfg(test)]
729mod tests {
730    use super::*;
731    use crate::core::types::{
732        Pipeline, PipelineStep, RuntimeSpec, StepAssertion, StepRequest, StepResponse,
733    };
734    use httpmock::Method::{GET, POST};
735    use httpmock::MockServer;
736    use serde_json::json;
737    use std::sync::Arc;
738    use std::sync::atomic::{AtomicBool, Ordering};
739
740    #[tokio::test]
741    async fn executes_from_step_with_seeded_previous_results() {
742        let server = MockServer::start_async().await;
743        let protected = server
744            .mock_async(|when, then| {
745                when.method(GET)
746                    .path("/protected")
747                    .header("authorization", "Bearer abc123");
748                then.status(200)
749                    .header("content-type", "application/json")
750                    .json_body(json!({ "ok": true }));
751            })
752            .await;
753
754        let pipeline = Pipeline {
755            id: Some("pipe-1".to_owned()),
756            name: "Pipe".to_owned(),
757            description: None,
758            steps: vec![
759                PipelineStep {
760                    id: "login".to_owned(),
761                    name: "Login".to_owned(),
762                    description: None,
763                    method: "POST".to_owned(),
764                    url: format!("{}/login", server.base_url()),
765                    headers: HashMap::new(),
766                    body: None,
767                    operation_id: None,
768                    delay: None,
769                    retry: None,
770                    asserts: Vec::new(),
771                },
772                PipelineStep {
773                    id: "protected".to_owned(),
774                    name: "Protected".to_owned(),
775                    description: None,
776                    method: "GET".to_owned(),
777                    url: format!("{}/protected", server.base_url()),
778                    headers: HashMap::from([(
779                        "Authorization".to_owned(),
780                        "Bearer {{steps.login.token}}".to_owned(),
781                    )]),
782                    body: None,
783                    operation_id: None,
784                    delay: None,
785                    retry: None,
786                    asserts: Vec::new(),
787                },
788            ],
789        };
790
791        let seeded = HashMap::from([(
792            "login".to_owned(),
793            StepExecutionResult {
794                step_id: "login".to_owned(),
795                status: "success".to_owned(),
796                request: Some(StepRequest {
797                    method: "POST".to_owned(),
798                    url: format!("{}/login", server.base_url()),
799                    headers: HashMap::new(),
800                    body: None,
801                }),
802                response: Some(StepResponse {
803                    status: 200,
804                    status_text: "OK".to_owned(),
805                    headers: HashMap::new(),
806                    body: json!({ "token": "abc123" }),
807                }),
808                error: None,
809                duration: Some(1),
810                attempts: Some(1),
811                attempt: Some(1),
812                max_attempts: Some(1),
813                assert_results: None,
814            },
815        )]);
816
817        let mut started = Vec::new();
818        let results = execute_pipeline_from_step_with_client_runtime_hooks(
819            &reqwest::Client::new(),
820            &pipeline,
821            "protected",
822            seeded,
823            None,
824            None,
825            None,
826            |step_id| started.push(step_id.to_owned()),
827            |_| {},
828            || false,
829            |_| Box::pin(async { true }),
830        )
831        .await;
832
833        assert_eq!(started, vec!["protected"]);
834        assert_eq!(results.len(), 1);
835        assert_eq!(results[0].step_id, "protected");
836        assert_eq!(results[0].status, "success");
837        protected.assert_async().await;
838    }
839
840    #[tokio::test]
841    async fn executes_pipeline_with_interpolation_and_assertions() {
842        let server = MockServer::start_async().await;
843
844        let create_user = server
845            .mock_async(|when, then| {
846                when.method(POST).path("/users");
847                then.status(201)
848                    .header("content-type", "application/json")
849                    .json_body(
850                        json!({ "id": "u-1", "token": "token-123", "email": "john@example.com" }),
851                    );
852            })
853            .await;
854
855        let get_user = server
856            .mock_async(|when, then| {
857                when.method(GET).path("/users/u-1");
858                then.status(200)
859                    .header("content-type", "application/json")
860                    .json_body(json!({ "id": "u-1", "email": "john@example.com" }));
861            })
862            .await;
863
864        let pipeline = Pipeline {
865            id: None,
866            name: "User flow".to_owned(),
867            description: Some("Pipeline test".to_owned()),
868            steps: vec![
869                PipelineStep {
870                    id: "create_user".to_owned(),
871                    name: "Create".to_owned(),
872                    description: None,
873                    method: "POST".to_owned(),
874                    url: format!("{}/users", server.base_url()),
875                    headers: HashMap::from([(
876                        "content-type".to_owned(),
877                        "application/json".to_owned(),
878                    )]),
879                    body: Some(json!({ "name": "{{helpers.name}}" })),
880                    operation_id: None,
881                    delay: None,
882                    retry: None,
883                    asserts: vec![
884                        StepAssertion {
885                            field: "status".to_owned(),
886                            operator: "equals".to_owned(),
887                            expected: Some("201".to_owned()),
888                        },
889                        StepAssertion {
890                            field: "body.id".to_owned(),
891                            operator: "exists".to_owned(),
892                            expected: None,
893                        },
894                    ],
895                },
896                PipelineStep {
897                    id: "get_user".to_owned(),
898                    name: "Get".to_owned(),
899                    description: None,
900                    method: "GET".to_owned(),
901                    url: format!("{}/users/{{{{steps.create_user.id}}}}", server.base_url()),
902                    headers: HashMap::new(),
903                    body: None,
904                    operation_id: None,
905                    delay: None,
906                    retry: None,
907                    asserts: vec![
908                        StepAssertion {
909                            field: "status".to_owned(),
910                            operator: "equals".to_owned(),
911                            expected: Some("{{steps.create_user.status}}".to_owned()),
912                        },
913                        StepAssertion {
914                            field: "body.email".to_owned(),
915                            operator: "contains".to_owned(),
916                            expected: Some("@".to_owned()),
917                        },
918                    ],
919                },
920            ],
921        };
922
923        let results = execute_pipeline(&pipeline, None).await;
924
925        create_user.assert_async().await;
926        get_user.assert_async().await;
927
928        assert_eq!(results.len(), 2);
929        assert_eq!(results[0].status, "success");
930        assert_eq!(results[1].status, "error");
931        assert!(
932            results[1]
933                .error
934                .as_ref()
935                .is_some_and(|err| err.contains("assertion(s) failed"))
936        );
937    }
938
939    #[tokio::test]
940    async fn resolves_spec_url_variable() {
941        let server_dev = MockServer::start_async().await;
942        let server_prod = MockServer::start_async().await;
943
944        let _dev = server_dev
945            .mock_async(|when, then| {
946                when.method(GET).path("/health");
947                then.status(200).body("dev");
948            })
949            .await;
950
951        let prod = server_prod
952            .mock_async(|when, then| {
953                when.method(GET).path("/health");
954                then.status(200).body("prod");
955            })
956            .await;
957
958        let pipeline = Pipeline {
959            id: None,
960            name: "Env".to_owned(),
961            description: None,
962            steps: vec![PipelineStep {
963                id: "health".to_owned(),
964                name: "Health".to_owned(),
965                description: None,
966                method: "GET".to_owned(),
967                url: "{{specs.users-api.url.prod}}/health".to_owned(),
968                headers: HashMap::new(),
969                body: None,
970                operation_id: None,
971                delay: None,
972                retry: None,
973                asserts: vec![],
974            }],
975        };
976        let specs = [RuntimeSpec {
977            slug: "users-api".to_owned(),
978            servers: HashMap::from([
979                ("dev".to_owned(), server_dev.base_url()),
980                ("prod".to_owned(), server_prod.base_url()),
981            ]),
982        }];
983
984        let results = execute_pipeline_with_specs_hooks(
985            &pipeline,
986            Some("dev"),
987            Some(&specs),
988            |_| {},
989            |_| {},
990            || false,
991        )
992        .await;
993
994        prod.assert_async().await;
995        assert_eq!(results.len(), 1);
996        assert_eq!(results[0].status, "success");
997        assert_eq!(
998            results[0].response.as_ref().map(|r| r.body.clone()),
999            Some(Value::String("prod".to_owned()))
1000        );
1001    }
1002
1003    #[tokio::test]
1004    async fn request_gate_can_decline_before_http_send() {
1005        let server = MockServer::start_async().await;
1006        let call = server
1007            .mock_async(|when, then| {
1008                when.method(GET).path("/blocked");
1009                then.status(200).body("should not be called");
1010            })
1011            .await;
1012
1013        let pipeline = Pipeline {
1014            id: None,
1015            name: "Gate".to_owned(),
1016            description: None,
1017            steps: vec![PipelineStep {
1018                id: "blocked".to_owned(),
1019                name: "Blocked".to_owned(),
1020                description: None,
1021                method: "GET".to_owned(),
1022                url: format!("{}/blocked", server.base_url()),
1023                headers: HashMap::new(),
1024                body: None,
1025                operation_id: None,
1026                delay: None,
1027                retry: None,
1028                asserts: vec![],
1029            }],
1030        };
1031
1032        let results = execute_pipeline_with_runtime_request_gate(
1033            &pipeline,
1034            None,
1035            None,
1036            None,
1037            None,
1038            |_| {},
1039            |_| {},
1040            || false,
1041            |_| Box::pin(async { false }),
1042        )
1043        .await;
1044
1045        assert!(results.is_empty());
1046        call.assert_calls_async(0).await;
1047    }
1048
1049    #[tokio::test]
1050    async fn client_runtime_request_gate_uses_provided_client() {
1051        let server = MockServer::start_async().await;
1052        let call = server
1053            .mock_async(|when, then| {
1054                when.method(GET).path("/shared-client");
1055                then.status(200).body("ok");
1056            })
1057            .await;
1058
1059        let pipeline = Pipeline {
1060            id: None,
1061            name: "Shared client".to_owned(),
1062            description: None,
1063            steps: vec![PipelineStep {
1064                id: "shared".to_owned(),
1065                name: "Shared".to_owned(),
1066                description: None,
1067                method: "GET".to_owned(),
1068                url: format!("{}/shared-client", server.base_url()),
1069                headers: HashMap::new(),
1070                body: None,
1071                operation_id: None,
1072                delay: None,
1073                retry: None,
1074                asserts: vec![],
1075            }],
1076        };
1077
1078        let client = Client::new();
1079        let results = execute_pipeline_with_client_runtime_request_gate(
1080            &client,
1081            &pipeline,
1082            None,
1083            None,
1084            None,
1085            None,
1086            |_| {},
1087            |_| {},
1088            || false,
1089            |_| Box::pin(async { true }),
1090        )
1091        .await;
1092
1093        assert_eq!(results.len(), 1);
1094        assert_eq!(results[0].status, "success");
1095        call.assert_calls_async(1).await;
1096    }
1097
1098    #[tokio::test]
1099    async fn marks_step_as_error_when_assertion_fails() {
1100        let server = MockServer::start_async().await;
1101
1102        let call = server
1103            .mock_async(|when, then| {
1104                when.method(GET).path("/status");
1105                then.status(200)
1106                    .header("content-type", "application/json")
1107                    .json_body(json!({ "ok": true }));
1108            })
1109            .await;
1110
1111        let pipeline = Pipeline {
1112            id: None,
1113            name: "Assert".to_owned(),
1114            description: None,
1115            steps: vec![PipelineStep {
1116                id: "status".to_owned(),
1117                name: "Status".to_owned(),
1118                description: None,
1119                method: "GET".to_owned(),
1120                url: format!("{}/status", server.base_url()),
1121                headers: HashMap::new(),
1122                body: None,
1123                operation_id: None,
1124                delay: None,
1125                retry: None,
1126                asserts: vec![StepAssertion {
1127                    field: "status".to_owned(),
1128                    operator: "equals".to_owned(),
1129                    expected: Some("201".to_owned()),
1130                }],
1131            }],
1132        };
1133
1134        let results = execute_pipeline(&pipeline, None).await;
1135
1136        call.assert_async().await;
1137        assert_eq!(results.len(), 1);
1138        assert_eq!(results[0].status, "error");
1139        assert!(
1140            results[0]
1141                .error
1142                .as_ref()
1143                .is_some_and(|err| err.contains("1 assertion(s) failed"))
1144        );
1145    }
1146
1147    #[tokio::test]
1148    async fn stops_pipeline_after_step_failure() {
1149        let server = MockServer::start_async().await;
1150
1151        let failing_step = server
1152            .mock_async(|when, then| {
1153                when.method(GET).path("/fails");
1154                then.status(500).body("internal error");
1155            })
1156            .await;
1157
1158        let next_step = server
1159            .mock_async(|when, then| {
1160                when.method(GET).path("/next");
1161                then.status(200).body("ok");
1162            })
1163            .await;
1164
1165        let pipeline = Pipeline {
1166            id: None,
1167            name: "Stop on failure".to_owned(),
1168            description: None,
1169            steps: vec![
1170                PipelineStep {
1171                    id: "fails".to_owned(),
1172                    name: "Fails".to_owned(),
1173                    description: None,
1174                    method: "GET".to_owned(),
1175                    url: format!("{}/fails", server.base_url()),
1176                    headers: HashMap::new(),
1177                    body: None,
1178                    operation_id: None,
1179                    delay: None,
1180                    retry: None,
1181                    asserts: vec![StepAssertion {
1182                        field: "status".to_owned(),
1183                        operator: "equals".to_owned(),
1184                        expected: Some("201".to_owned()),
1185                    }],
1186                },
1187                PipelineStep {
1188                    id: "next".to_owned(),
1189                    name: "Next".to_owned(),
1190                    description: None,
1191                    method: "GET".to_owned(),
1192                    url: format!("{}/next", server.base_url()),
1193                    headers: HashMap::new(),
1194                    body: None,
1195                    operation_id: None,
1196                    delay: None,
1197                    retry: None,
1198                    asserts: vec![],
1199                },
1200            ],
1201        };
1202
1203        let results = execute_pipeline(&pipeline, None).await;
1204
1205        failing_step.assert_async().await;
1206        assert_eq!(next_step.calls_async().await, 0);
1207        assert_eq!(results.len(), 1);
1208        assert_eq!(results[0].step_id, "fails");
1209        assert_eq!(results[0].status, "error");
1210        assert!(
1211            results[0]
1212                .error
1213                .as_ref()
1214                .is_some_and(|err| err.contains("HTTP 500") && err.contains("assertion(s) failed"))
1215        );
1216    }
1217
1218    #[tokio::test]
1219    async fn executes_create_user_and_send_email_case_from_json_payload() {
1220        let server = MockServer::start_async().await;
1221
1222        let create_user = server
1223            .mock_async(|when, then| {
1224                when.method(POST).path("/users");
1225                then.status(201)
1226                    .header("content-type", "application/json")
1227                    .json_body(json!({
1228                        "$id": "usr-100",
1229                        "name": "John Doe",
1230                        "email": "john@example.com"
1231                    }));
1232            })
1233            .await;
1234
1235        let send_email = server
1236            .mock_async(|when, then| {
1237                when.method(POST)
1238                    .path("/emails")
1239                    .json_body(json!({ "to": "john@example.com", "name": "John Doe" }));
1240                then.status(201)
1241                    .header("content-type", "application/json")
1242                    .json_body(json!({ "queued": true }));
1243            })
1244            .await;
1245
1246        let payload = json!({
1247            "name": "Criar Usuário e Enviar Email",
1248            "description": "Pipeline de cadastro completo",
1249            "steps": [
1250                {
1251                    "id": "create_user",
1252                    "name": "Criar Usuário",
1253                    "description": "Cria um novo usuário com dados aleatórios",
1254                    "headers": {
1255                        "Content-Type": "application/json"
1256                    },
1257                    "method": "POST",
1258                    "url": format!("{}/users", server.base_url()),
1259                    "body": {
1260                        "id": "{{helpers.uuid}}",
1261                        "name": "{{helpers.name}}",
1262                        "email": "{{helpers.email}}",
1263                        "cpf": "{{helpers.cpf}}"
1264                    },
1265                    "operationId": "createUser",
1266                    "asserts": [
1267                        {
1268                            "field": "status",
1269                            "operator": "equals",
1270                            "expected": "201"
1271                        },
1272                        {
1273                            "field": "body.$id",
1274                            "operator": "exists"
1275                        },
1276                        {
1277                            "field": "body.email",
1278                            "operator": "contains",
1279                            "expected": "@"
1280                        }
1281                    ]
1282                },
1283                {
1284                    "id": "send_email",
1285                    "name": "Enviar Email de Boas-Vindas",
1286                    "description": "Envia email usando dados do step anterior",
1287                    "headers": {
1288                        "Content-Type": "application/json"
1289                    },
1290                    "method": "POST",
1291                    "url": format!("{}/emails", server.base_url()),
1292                    "body": {
1293                        "to": "{{steps.create_user.email}}",
1294                        "name": "{{steps.create_user.name}}"
1295                    },
1296                    "asserts": [
1297                        {
1298                            "field": "status",
1299                            "operator": "equals",
1300                            "expected": "201"
1301                        }
1302                    ]
1303                }
1304            ],
1305            "id": "e3045988"
1306        });
1307
1308        let pipeline: Pipeline =
1309            serde_json::from_value(payload).expect("pipeline payload is valid");
1310        let results = execute_pipeline(&pipeline, None).await;
1311
1312        create_user.assert_async().await;
1313        send_email.assert_async().await;
1314
1315        assert_eq!(results.len(), 2);
1316        assert_eq!(results[0].status, "success");
1317        assert_eq!(results[1].status, "success");
1318        assert_eq!(pipeline.id, Some("e3045988".to_owned()));
1319        assert_eq!(
1320            pipeline.steps[0].operation_id.as_deref(),
1321            Some("createUser")
1322        );
1323        assert!(
1324            results[0]
1325                .request
1326                .as_ref()
1327                .and_then(|r| r.body.as_ref())
1328                .and_then(|b| b.get("cpf"))
1329                .and_then(|v| v.as_str())
1330                .is_some_and(|cpf| cpf.len() == 14 && cpf.contains('.') && cpf.contains('-'))
1331        );
1332    }
1333
1334    #[tokio::test]
1335    async fn cancels_in_flight_future_when_cancel_flag_changes() {
1336        let cancelled = Arc::new(AtomicBool::new(false));
1337        let cancelled_writer = Arc::clone(&cancelled);
1338
1339        tokio::spawn(async move {
1340            tokio::time::sleep(Duration::from_millis(30)).await;
1341            cancelled_writer.store(true, Ordering::SeqCst);
1342        });
1343
1344        let mut should_cancel = || cancelled.load(Ordering::SeqCst);
1345        let result = await_with_cancel(
1346            async {
1347                tokio::time::sleep(Duration::from_millis(500)).await;
1348                "done"
1349            },
1350            &mut should_cancel,
1351        )
1352        .await;
1353
1354        assert!(result.is_none());
1355    }
1356
1357    #[tokio::test]
1358    async fn retries_when_assertions_fail() {
1359        let server = MockServer::start_async().await;
1360        let status_call = server
1361            .mock_async(|when, then| {
1362                when.method(GET).path("/status");
1363                then.status(200)
1364                    .header("content-type", "application/json")
1365                    .json_body(json!({ "ok": false }));
1366            })
1367            .await;
1368
1369        let pipeline = Pipeline {
1370            id: None,
1371            name: "Retry assertions".to_owned(),
1372            description: None,
1373            steps: vec![PipelineStep {
1374                id: "status".to_owned(),
1375                name: "Status".to_owned(),
1376                description: None,
1377                method: "GET".to_owned(),
1378                url: format!("{}/status", server.base_url()),
1379                headers: HashMap::new(),
1380                body: None,
1381                operation_id: None,
1382                delay: Some(0),
1383                retry: Some(2),
1384                asserts: vec![StepAssertion {
1385                    field: "body.ok".to_owned(),
1386                    operator: "equals".to_owned(),
1387                    expected: Some("true".to_owned()),
1388                }],
1389            }],
1390        };
1391
1392        let results = execute_pipeline(&pipeline, None).await;
1393
1394        assert_eq!(results.len(), 1);
1395        assert_eq!(results[0].status, "error");
1396        assert_eq!(results[0].attempt, Some(3));
1397        assert_eq!(results[0].max_attempts, Some(3));
1398        status_call.assert_calls_async(3).await;
1399    }
1400
1401    #[tokio::test]
1402    async fn does_not_retry_on_http_error_without_assertions() {
1403        let server = MockServer::start_async().await;
1404        let call = server
1405            .mock_async(|when, then| {
1406                when.method(GET).path("/fails");
1407                then.status(500).body("internal error");
1408            })
1409            .await;
1410
1411        let pipeline = Pipeline {
1412            id: None,
1413            name: "No retry on HTTP".to_owned(),
1414            description: None,
1415            steps: vec![PipelineStep {
1416                id: "fails".to_owned(),
1417                name: "Fails".to_owned(),
1418                description: None,
1419                method: "GET".to_owned(),
1420                url: format!("{}/fails", server.base_url()),
1421                headers: HashMap::new(),
1422                body: None,
1423                operation_id: None,
1424                delay: Some(0),
1425                retry: Some(5),
1426                asserts: vec![],
1427            }],
1428        };
1429
1430        let results = execute_pipeline(&pipeline, None).await;
1431
1432        assert_eq!(results.len(), 1);
1433        assert_eq!(results[0].status, "error");
1434        assert_eq!(results[0].attempt, Some(1));
1435        assert_eq!(results[0].max_attempts, Some(6));
1436        call.assert_calls_async(1).await;
1437    }
1438
1439    #[tokio::test]
1440    async fn accepts_404_when_status_assert_matches() {
1441        let server = MockServer::start_async().await;
1442        let call = server
1443            .mock_async(|when, then| {
1444                when.method(GET).path("/missing");
1445                then.status(404)
1446                    .header("content-type", "application/json")
1447                    .json_body(json!({ "message": "not found" }));
1448            })
1449            .await;
1450
1451        let pipeline = Pipeline {
1452            id: None,
1453            name: "Expected 404".to_owned(),
1454            description: None,
1455            steps: vec![PipelineStep {
1456                id: "missing".to_owned(),
1457                name: "Missing".to_owned(),
1458                description: None,
1459                method: "GET".to_owned(),
1460                url: format!("{}/missing", server.base_url()),
1461                headers: HashMap::new(),
1462                body: None,
1463                operation_id: None,
1464                delay: None,
1465                retry: Some(5),
1466                asserts: vec![StepAssertion {
1467                    field: "status".to_owned(),
1468                    operator: "equals".to_owned(),
1469                    expected: Some("404".to_owned()),
1470                }],
1471            }],
1472        };
1473
1474        let results = execute_pipeline(&pipeline, None).await;
1475
1476        assert_eq!(results.len(), 1);
1477        assert_eq!(results[0].status, "success");
1478        assert_eq!(results[0].error, None);
1479        assert_eq!(results[0].attempt, Some(1));
1480        assert_eq!(
1481            results[0]
1482                .response
1483                .as_ref()
1484                .and_then(|response| response.body.get("message"))
1485                .and_then(|value| value.as_str()),
1486            Some("not found")
1487        );
1488        call.assert_calls_async(1).await;
1489    }
1490
1491    #[tokio::test]
1492    async fn accepts_500_when_status_assert_matches() {
1493        let server = MockServer::start_async().await;
1494        let call = server
1495            .mock_async(|when, then| {
1496                when.method(GET).path("/boom");
1497                then.status(500).body("internal error");
1498            })
1499            .await;
1500
1501        let pipeline = Pipeline {
1502            id: None,
1503            name: "Expected 500".to_owned(),
1504            description: None,
1505            steps: vec![PipelineStep {
1506                id: "boom".to_owned(),
1507                name: "Boom".to_owned(),
1508                description: None,
1509                method: "GET".to_owned(),
1510                url: format!("{}/boom", server.base_url()),
1511                headers: HashMap::new(),
1512                body: None,
1513                operation_id: None,
1514                delay: None,
1515                retry: None,
1516                asserts: vec![StepAssertion {
1517                    field: "status".to_owned(),
1518                    operator: "equals".to_owned(),
1519                    expected: Some("500".to_owned()),
1520                }],
1521            }],
1522        };
1523
1524        let results = execute_pipeline(&pipeline, None).await;
1525
1526        assert_eq!(results.len(), 1);
1527        assert_eq!(results[0].status, "success");
1528        assert_eq!(results[0].error, None);
1529        assert_eq!(
1530            results[0]
1531                .response
1532                .as_ref()
1533                .map(|response| response.body.clone()),
1534            Some(Value::String("internal error".to_owned()))
1535        );
1536        call.assert_calls_async(1).await;
1537    }
1538
1539    #[tokio::test]
1540    async fn accepts_array_index_assertions_in_response_body() {
1541        let server = MockServer::start_async().await;
1542        let call = server
1543            .mock_async(|when, then| {
1544                when.method(GET).path("/runtime");
1545                then.status(200)
1546                    .header("content-type", "application/json")
1547                    .json_body(json!({
1548                        "app": {
1549                            "status": "ready"
1550                        },
1551                        "pods": [
1552                            {
1553                                "podName": "app-keep-manual-123",
1554                                "phase": "Running"
1555                            }
1556                        ],
1557                        "containers": [
1558                            {
1559                                "name": "app-keep-manual"
1560                            }
1561                        ]
1562                    }));
1563            })
1564            .await;
1565
1566        let pipeline = Pipeline {
1567            id: None,
1568            name: "Runtime arrays".to_owned(),
1569            description: None,
1570            steps: vec![PipelineStep {
1571                id: "runtime".to_owned(),
1572                name: "Runtime".to_owned(),
1573                description: None,
1574                method: "GET".to_owned(),
1575                url: format!("{}/runtime", server.base_url()),
1576                headers: HashMap::new(),
1577                body: None,
1578                operation_id: None,
1579                delay: None,
1580                retry: None,
1581                asserts: vec![
1582                    StepAssertion {
1583                        field: "status".to_owned(),
1584                        operator: "equals".to_owned(),
1585                        expected: Some("200".to_owned()),
1586                    },
1587                    StepAssertion {
1588                        field: "body.app.status".to_owned(),
1589                        operator: "equals".to_owned(),
1590                        expected: Some("ready".to_owned()),
1591                    },
1592                    StepAssertion {
1593                        field: "body.pods.0.podName".to_owned(),
1594                        operator: "exists".to_owned(),
1595                        expected: None,
1596                    },
1597                    StepAssertion {
1598                        field: "body.pods.0.phase".to_owned(),
1599                        operator: "equals".to_owned(),
1600                        expected: Some("Running".to_owned()),
1601                    },
1602                    StepAssertion {
1603                        field: "body.containers.0.name".to_owned(),
1604                        operator: "exists".to_owned(),
1605                        expected: None,
1606                    },
1607                ],
1608            }],
1609        };
1610
1611        let results = execute_pipeline(&pipeline, None).await;
1612
1613        assert_eq!(results.len(), 1);
1614        assert_eq!(results[0].status, "success");
1615        assert_eq!(results[0].error, None);
1616        assert!(
1617            results[0]
1618                .assert_results
1619                .as_ref()
1620                .is_some_and(|items| items.iter().all(|item| item.passed))
1621        );
1622        call.assert_calls_async(1).await;
1623    }
1624
1625    #[tokio::test]
1626    async fn retries_when_status_assert_fails_on_http_error() {
1627        let server = MockServer::start_async().await;
1628        let call = server
1629            .mock_async(|when, then| {
1630                when.method(GET).path("/missing");
1631                then.status(404).body("not found");
1632            })
1633            .await;
1634
1635        let pipeline = Pipeline {
1636            id: None,
1637            name: "Retry status assert".to_owned(),
1638            description: None,
1639            steps: vec![PipelineStep {
1640                id: "missing".to_owned(),
1641                name: "Missing".to_owned(),
1642                description: None,
1643                method: "GET".to_owned(),
1644                url: format!("{}/missing", server.base_url()),
1645                headers: HashMap::new(),
1646                body: None,
1647                operation_id: None,
1648                delay: None,
1649                retry: Some(2),
1650                asserts: vec![StepAssertion {
1651                    field: "status".to_owned(),
1652                    operator: "equals".to_owned(),
1653                    expected: Some("200".to_owned()),
1654                }],
1655            }],
1656        };
1657
1658        let results = execute_pipeline(&pipeline, None).await;
1659
1660        assert_eq!(results.len(), 1);
1661        assert_eq!(results[0].status, "error");
1662        assert_eq!(results[0].attempt, Some(3));
1663        assert_eq!(results[0].max_attempts, Some(3));
1664        assert!(
1665            results[0]
1666                .error
1667                .as_ref()
1668                .is_some_and(|err| err.contains("HTTP 404"))
1669        );
1670        call.assert_calls_async(3).await;
1671    }
1672
1673    #[tokio::test]
1674    async fn keeps_http_error_when_status_assert_passes_but_body_assert_fails() {
1675        let server = MockServer::start_async().await;
1676        let call = server
1677            .mock_async(|when, then| {
1678                when.method(GET).path("/missing");
1679                then.status(404)
1680                    .header("content-type", "application/json")
1681                    .json_body(json!({ "message": "not found" }));
1682            })
1683            .await;
1684
1685        let pipeline = Pipeline {
1686            id: None,
1687            name: "HTTP plus body assert failure".to_owned(),
1688            description: None,
1689            steps: vec![PipelineStep {
1690                id: "missing".to_owned(),
1691                name: "Missing".to_owned(),
1692                description: None,
1693                method: "GET".to_owned(),
1694                url: format!("{}/missing", server.base_url()),
1695                headers: HashMap::new(),
1696                body: None,
1697                operation_id: None,
1698                delay: None,
1699                retry: Some(1),
1700                asserts: vec![
1701                    StepAssertion {
1702                        field: "status".to_owned(),
1703                        operator: "equals".to_owned(),
1704                        expected: Some("404".to_owned()),
1705                    },
1706                    StepAssertion {
1707                        field: "body.code".to_owned(),
1708                        operator: "equals".to_owned(),
1709                        expected: Some("x".to_owned()),
1710                    },
1711                ],
1712            }],
1713        };
1714
1715        let results = execute_pipeline(&pipeline, None).await;
1716
1717        assert_eq!(results.len(), 1);
1718        assert_eq!(results[0].status, "error");
1719        assert_eq!(results[0].attempt, Some(2));
1720        assert_eq!(results[0].max_attempts, Some(2));
1721        assert!(
1722            results[0]
1723                .error
1724                .as_ref()
1725                .is_some_and(|err| err.contains("HTTP 404") && err.contains("assertion(s) failed"))
1726        );
1727        call.assert_calls_async(2).await;
1728    }
1729
1730    #[tokio::test]
1731    async fn keeps_http_error_without_status_assert_even_if_body_assert_passes() {
1732        let server = MockServer::start_async().await;
1733        let call = server
1734            .mock_async(|when, then| {
1735                when.method(GET).path("/missing");
1736                then.status(404)
1737                    .header("content-type", "application/json")
1738                    .json_body(json!({ "message": "not found" }));
1739            })
1740            .await;
1741
1742        let pipeline = Pipeline {
1743            id: None,
1744            name: "Body assert only".to_owned(),
1745            description: None,
1746            steps: vec![PipelineStep {
1747                id: "missing".to_owned(),
1748                name: "Missing".to_owned(),
1749                description: None,
1750                method: "GET".to_owned(),
1751                url: format!("{}/missing", server.base_url()),
1752                headers: HashMap::new(),
1753                body: None,
1754                operation_id: None,
1755                delay: None,
1756                retry: Some(3),
1757                asserts: vec![StepAssertion {
1758                    field: "body.message".to_owned(),
1759                    operator: "equals".to_owned(),
1760                    expected: Some("not found".to_owned()),
1761                }],
1762            }],
1763        };
1764
1765        let results = execute_pipeline(&pipeline, None).await;
1766
1767        assert_eq!(results.len(), 1);
1768        assert_eq!(results[0].status, "error");
1769        assert_eq!(results[0].attempt, Some(1));
1770        assert_eq!(results[0].max_attempts, Some(4));
1771        assert!(
1772            results[0]
1773                .error
1774                .as_ref()
1775                .is_some_and(|err| err.contains("HTTP 404"))
1776        );
1777        call.assert_calls_async(1).await;
1778    }
1779
1780    #[tokio::test]
1781    async fn accepts_http_error_when_status_assert_uses_other_operator() {
1782        let server = MockServer::start_async().await;
1783        let call = server
1784            .mock_async(|when, then| {
1785                when.method(GET).path("/missing");
1786                then.status(404).body("not found");
1787            })
1788            .await;
1789
1790        let pipeline = Pipeline {
1791            id: None,
1792            name: "Expected non-200".to_owned(),
1793            description: None,
1794            steps: vec![PipelineStep {
1795                id: "missing".to_owned(),
1796                name: "Missing".to_owned(),
1797                description: None,
1798                method: "GET".to_owned(),
1799                url: format!("{}/missing", server.base_url()),
1800                headers: HashMap::new(),
1801                body: None,
1802                operation_id: None,
1803                delay: None,
1804                retry: None,
1805                asserts: vec![StepAssertion {
1806                    field: "status".to_owned(),
1807                    operator: "not_equals".to_owned(),
1808                    expected: Some("200".to_owned()),
1809                }],
1810            }],
1811        };
1812
1813        let results = execute_pipeline(&pipeline, None).await;
1814
1815        assert_eq!(results.len(), 1);
1816        assert_eq!(results[0].status, "success");
1817        assert_eq!(results[0].error, None);
1818        call.assert_calls_async(1).await;
1819    }
1820
1821    #[tokio::test]
1822    async fn applies_delay_before_each_attempt() {
1823        let server = MockServer::start_async().await;
1824        let call = server
1825            .mock_async(|when, then| {
1826                when.method(GET).path("/delayed");
1827                then.status(200)
1828                    .header("content-type", "application/json")
1829                    .json_body(json!({ "ok": false }));
1830            })
1831            .await;
1832
1833        let pipeline = Pipeline {
1834            id: None,
1835            name: "Delay before attempts".to_owned(),
1836            description: None,
1837            steps: vec![PipelineStep {
1838                id: "delayed".to_owned(),
1839                name: "Delayed".to_owned(),
1840                description: None,
1841                method: "GET".to_owned(),
1842                url: format!("{}/delayed", server.base_url()),
1843                headers: HashMap::new(),
1844                body: None,
1845                operation_id: None,
1846                delay: Some(30),
1847                retry: Some(2),
1848                asserts: vec![StepAssertion {
1849                    field: "body.ok".to_owned(),
1850                    operator: "equals".to_owned(),
1851                    expected: Some("true".to_owned()),
1852                }],
1853            }],
1854        };
1855
1856        let started = std::time::Instant::now();
1857        let results = execute_pipeline(&pipeline, None).await;
1858        let elapsed = started.elapsed();
1859
1860        assert_eq!(results.len(), 1);
1861        assert_eq!(results[0].attempt, Some(3));
1862        assert_eq!(results[0].max_attempts, Some(3));
1863        assert!(elapsed >= Duration::from_millis(75));
1864        call.assert_calls_async(3).await;
1865    }
1866}