Skip to main content

previa_engine/execution/
http_step.rs

1use reqwest::{Client, Method, Url};
2use serde_json::{Map, Value};
3use std::collections::HashMap;
4use std::future::Future;
5use std::time::Instant;
6
7use crate::assertions::{evaluate_assertions, has_status_assertion};
8use crate::core::types::{
9    PipelineStep, RuntimeEnvGroup, RuntimeSpec, StepExecutionResult, StepRequest, StepResponse,
10};
11use crate::execution::cancel::await_with_cancel;
12use crate::execution::http::{parse_absolute_http_url, parse_method};
13use crate::execution::logging::log_step_response;
14use crate::template::resolve::resolve_template_variables;
15
16#[derive(Debug, Clone)]
17pub struct PreparedHttpStep {
18    pub step_id: String,
19    pub attempt: usize,
20    pub max_attempts: usize,
21    pub method: Method,
22    pub url: Url,
23    pub request: StepRequest,
24    started_at: Instant,
25}
26
27#[derive(Debug)]
28pub struct StartedHttpStep {
29    pub request: StepRequest,
30    pub response: reqwest::Response,
31    started_at: Instant,
32    attempt: usize,
33    max_attempts: usize,
34}
35
36pub fn prepare_http_step(
37    step: &PipelineStep,
38    context: &HashMap<String, StepExecutionResult>,
39    specs: Option<&[RuntimeSpec]>,
40    env_groups: Option<&[RuntimeEnvGroup]>,
41    selected_env_group_slug: Option<&str>,
42    attempt: usize,
43    max_attempts: usize,
44) -> Result<PreparedHttpStep, StepExecutionResult> {
45    let started_at = Instant::now();
46    let resolved_url = resolve_template_variables(
47        &Value::String(step.url.clone()),
48        context,
49        specs,
50        env_groups,
51        selected_env_group_slug,
52    )
53    .as_str()
54    .unwrap_or(step.url.as_str())
55    .to_owned();
56
57    let resolved_headers = resolve_template_variables(
58        &serde_json::to_value(&step.headers).unwrap_or(Value::Object(Map::new())),
59        context,
60        specs,
61        env_groups,
62        selected_env_group_slug,
63    )
64    .as_object()
65    .map(|m| {
66        m.iter()
67            .map(|(k, v)| (k.clone(), v.as_str().unwrap_or_default().to_owned()))
68            .collect::<HashMap<String, String>>()
69    })
70    .unwrap_or_default();
71
72    let resolved_body = step.body.as_ref().map(|body| {
73        resolve_template_variables(body, context, specs, env_groups, selected_env_group_slug)
74    });
75
76    let request = StepRequest {
77        method: step.method.clone(),
78        url: resolved_url.clone(),
79        headers: resolved_headers,
80        body: resolved_body,
81    };
82
83    let method = match parse_method(&step.method) {
84        Ok(method) => method,
85        Err(err) => {
86            return Err(invalid_step_result(
87                step,
88                request,
89                err,
90                started_at,
91                attempt,
92                max_attempts,
93            ));
94        }
95    };
96
97    let url = match parse_absolute_http_url(&resolved_url) {
98        Ok(url) => url,
99        Err(err) => {
100            return Err(invalid_step_result(
101                step,
102                request,
103                err,
104                started_at,
105                attempt,
106                max_attempts,
107            ));
108        }
109    };
110
111    Ok(PreparedHttpStep {
112        step_id: step.id.clone(),
113        attempt,
114        max_attempts,
115        method,
116        url,
117        request,
118        started_at,
119    })
120}
121
122pub async fn send_prepared_http_step<FCancel>(
123    client: &Client,
124    prepared: PreparedHttpStep,
125    step: &PipelineStep,
126    context: &HashMap<String, StepExecutionResult>,
127    specs: Option<&[RuntimeSpec]>,
128    env_groups: Option<&[RuntimeEnvGroup]>,
129    selected_env_group_slug: Option<&str>,
130    should_cancel: FCancel,
131) -> Option<StepExecutionResult>
132where
133    FCancel: FnMut() -> bool,
134{
135    send_prepared_http_step_with_hooks(
136        client,
137        prepared,
138        step,
139        context,
140        specs,
141        env_groups,
142        selected_env_group_slug,
143        should_cancel,
144        || async {},
145        || async {},
146        || async {},
147    )
148    .await
149}
150
151pub async fn send_prepared_http_step_with_hooks<
152    FCancel,
153    FStart,
154    FStartFuture,
155    FSend,
156    FSendFuture,
157    FBody,
158    FBodyFuture,
159>(
160    client: &Client,
161    prepared: PreparedHttpStep,
162    step: &PipelineStep,
163    context: &HashMap<String, StepExecutionResult>,
164    specs: Option<&[RuntimeSpec]>,
165    env_groups: Option<&[RuntimeEnvGroup]>,
166    selected_env_group_slug: Option<&str>,
167    mut should_cancel: FCancel,
168    on_send_started: FStart,
169    on_send_returned: FSend,
170    on_body_completed: FBody,
171) -> Option<StepExecutionResult>
172where
173    FCancel: FnMut() -> bool,
174    FStart: FnMut() -> FStartFuture,
175    FStartFuture: Future<Output = ()>,
176    FSend: FnMut() -> FSendFuture,
177    FSendFuture: Future<Output = ()>,
178    FBody: FnMut() -> FBodyFuture,
179    FBodyFuture: Future<Output = ()>,
180{
181    let started = start_prepared_http_step_with_hooks(
182        client,
183        prepared,
184        step,
185        || should_cancel(),
186        on_send_started,
187        on_send_returned,
188    )
189    .await?;
190
191    match started {
192        Ok(started) => {
193            complete_started_http_step_with_hook(
194                started,
195                step,
196                context,
197                specs,
198                env_groups,
199                selected_env_group_slug,
200                should_cancel,
201                on_body_completed,
202            )
203            .await
204        }
205        Err(result) => Some(result),
206    }
207}
208
209pub async fn start_prepared_http_step_with_hooks<
210    FCancel,
211    FStart,
212    FStartFuture,
213    FSend,
214    FSendFuture,
215>(
216    client: &Client,
217    prepared: PreparedHttpStep,
218    step: &PipelineStep,
219    mut should_cancel: FCancel,
220    mut on_send_started: FStart,
221    mut on_send_returned: FSend,
222) -> Option<Result<StartedHttpStep, StepExecutionResult>>
223where
224    FCancel: FnMut() -> bool,
225    FStart: FnMut() -> FStartFuture,
226    FStartFuture: Future<Output = ()>,
227    FSend: FnMut() -> FSendFuture,
228    FSendFuture: Future<Output = ()>,
229{
230    let mut request_builder = client.request(prepared.method.clone(), prepared.url.clone());
231
232    for (key, value) in &prepared.request.headers {
233        request_builder = request_builder.header(key, value);
234    }
235
236    if let Some(body) = prepared.request.body.as_ref() {
237        if !prepared.request.method.eq_ignore_ascii_case("GET")
238            && !prepared.request.method.eq_ignore_ascii_case("HEAD")
239        {
240            request_builder = request_builder.json(body);
241        }
242    }
243
244    let request = prepared.request.clone();
245    if should_cancel() {
246        return None;
247    }
248    on_send_started().await;
249    let Some(send_result) = await_with_cancel(request_builder.send(), &mut should_cancel).await
250    else {
251        return None;
252    };
253    on_send_returned().await;
254
255    match send_result {
256        Ok(response) => Some(Ok(StartedHttpStep {
257            request,
258            response,
259            started_at: prepared.started_at,
260            attempt: prepared.attempt,
261            max_attempts: prepared.max_attempts,
262        })),
263        Err(err) => {
264            let result = step_result(
265                step,
266                request,
267                None,
268                Some(err.to_string()),
269                "error",
270                prepared.started_at,
271                prepared.attempt,
272                prepared.max_attempts,
273                None,
274            );
275            log_step_response(&step.id, None, result.error.as_deref());
276            Some(Err(result))
277        }
278    }
279}
280
281pub async fn complete_started_http_step_with_hook<FCancel, FBody, FBodyFuture>(
282    started: StartedHttpStep,
283    step: &PipelineStep,
284    context: &HashMap<String, StepExecutionResult>,
285    specs: Option<&[RuntimeSpec]>,
286    env_groups: Option<&[RuntimeEnvGroup]>,
287    selected_env_group_slug: Option<&str>,
288    mut should_cancel: FCancel,
289    mut on_body_completed: FBody,
290) -> Option<StepExecutionResult>
291where
292    FCancel: FnMut() -> bool,
293    FBody: FnMut() -> FBodyFuture,
294    FBodyFuture: Future<Output = ()>,
295{
296    let status = started.response.status();
297    let status_text = status.canonical_reason().unwrap_or("").to_owned();
298    let mut headers = HashMap::new();
299    for (key, value) in started.response.headers() {
300        headers.insert(
301            key.as_str().to_owned(),
302            value.to_str().unwrap_or_default().to_owned(),
303        );
304    }
305
306    let content_type = headers
307        .iter()
308        .find(|(key, _)| key.eq_ignore_ascii_case("content-type"))
309        .map(|(_, value)| value.as_str())
310        .unwrap_or("");
311
312    let body = if content_type.contains("application/json") {
313        let Some(body_result) =
314            await_with_cancel(started.response.json::<Value>(), &mut should_cancel).await
315        else {
316            return None;
317        };
318        on_body_completed().await;
319        match body_result {
320            Ok(value) => value,
321            Err(err) => {
322                let result = step_result(
323                    step,
324                    started.request,
325                    None,
326                    Some(err.to_string()),
327                    "error",
328                    started.started_at,
329                    started.attempt,
330                    started.max_attempts,
331                    None,
332                );
333                log_step_response(&step.id, None, result.error.as_deref());
334                return Some(result);
335            }
336        }
337    } else {
338        let Some(body_result) =
339            await_with_cancel(started.response.text(), &mut should_cancel).await
340        else {
341            return None;
342        };
343        on_body_completed().await;
344        Value::String(body_result.unwrap_or_default())
345    };
346
347    let http_error =
348        (!status.is_success()).then(|| format!("HTTP {} {}", status.as_u16(), status_text));
349    let mut result = step_result(
350        step,
351        started.request,
352        Some(StepResponse {
353            status: status.as_u16(),
354            status_text: status_text.clone(),
355            headers,
356            body,
357        }),
358        http_error.clone(),
359        "success",
360        started.started_at,
361        started.attempt,
362        started.max_attempts,
363        None,
364    );
365
366    let has_status_assert = has_status_assertion(step);
367    let assert_results = evaluate_assertions(
368        step,
369        &result,
370        context,
371        specs,
372        env_groups,
373        selected_env_group_slug,
374    );
375    let assertion_failed = assert_results.iter().any(|result| !result.passed);
376    if !assert_results.is_empty() {
377        if assertion_failed {
378            result.status = "error".to_owned();
379            let failed_count = assert_results
380                .iter()
381                .filter(|result| !result.passed)
382                .count();
383            result.error = Some(match result.error {
384                Some(err) => format!("{} | {} assertion(s) failed", err, failed_count),
385                None => format!("{} assertion(s) failed", failed_count),
386            });
387        } else if http_error.is_some() {
388            if has_status_assert {
389                result.status = "success".to_owned();
390                result.error = None;
391            } else {
392                result.status = "error".to_owned();
393            }
394        }
395        result.assert_results = Some(assert_results);
396    } else if http_error.is_some() {
397        result.status = "error".to_owned();
398    }
399
400    log_step_response(&step.id, result.response.as_ref(), result.error.as_deref());
401    Some(result)
402}
403
404fn invalid_step_result(
405    step: &PipelineStep,
406    request: StepRequest,
407    error: String,
408    started_at: Instant,
409    attempt: usize,
410    max_attempts: usize,
411) -> StepExecutionResult {
412    step_result(
413        step,
414        request,
415        None,
416        Some(error),
417        "error",
418        started_at,
419        attempt,
420        max_attempts,
421        None,
422    )
423}
424
425fn step_result(
426    step: &PipelineStep,
427    request: StepRequest,
428    response: Option<StepResponse>,
429    error: Option<String>,
430    status: &str,
431    started_at: Instant,
432    attempt: usize,
433    max_attempts: usize,
434    assert_results: Option<Vec<crate::core::types::AssertionResult>>,
435) -> StepExecutionResult {
436    StepExecutionResult {
437        step_id: step.id.clone(),
438        status: status.to_owned(),
439        request: Some(request),
440        response,
441        error,
442        duration: Some(started_at.elapsed().as_millis()),
443        attempts: if max_attempts > 1 {
444            Some(attempt)
445        } else {
446            None
447        },
448        attempt: Some(attempt),
449        max_attempts: Some(max_attempts),
450        assert_results,
451    }
452}
453
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::types::PipelineStep;
    use httpmock::Method::GET;
    use serde_json::json;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};

    /// Shared, ordered log of the hook invocations observed by a test.
    type EventLog = Arc<Mutex<Vec<&'static str>>>;

    /// Starts a mock server that answers `GET /users` with status 200 and the
    /// JSON body `{"ok": true}`.
    async fn start_users_server() -> httpmock::MockServer {
        let server = httpmock::MockServer::start_async().await;
        server
            .mock_async(|when, then| {
                when.method(GET).path("/users");
                then.status(200)
                    .header("content-type", "application/json")
                    .json_body(json!({"ok": true}));
            })
            .await;
        server
    }

    /// Builds a plain `GET {base_url}/users` step without headers, body,
    /// retries or assertions.
    fn get_users_step(base_url: &str) -> PipelineStep {
        PipelineStep {
            id: "get-users".to_owned(),
            name: "GET users".to_owned(),
            description: None,
            method: "GET".to_owned(),
            url: format!("{}/users", base_url),
            headers: HashMap::new(),
            body: None,
            operation_id: None,
            delay: None,
            retry: None,
            asserts: vec![],
        }
    }

    /// Appends `label` to the log once the returned future is awaited.
    async fn record(log: EventLog, label: &'static str) {
        log.lock().expect("events lock").push(label);
    }

    /// Returns a snapshot of the labels recorded so far.
    fn recorded(log: &EventLog) -> Vec<&'static str> {
        log.lock().expect("events lock").clone()
    }

    #[tokio::test]
    async fn sends_prepared_step_and_returns_success_result() {
        let server = start_users_server().await;
        let step = get_users_step(&server.base_url());
        let context = HashMap::new();
        let client = reqwest::Client::new();

        let prepared = prepare_http_step(&step, &context, None, None, None, 1, 1)
            .expect("step should prepare");

        let never_cancel = || false;
        let result = send_prepared_http_step(
            &client,
            prepared,
            &step,
            &context,
            None,
            None,
            None,
            never_cancel,
        )
        .await
        .expect("send should not be cancelled");

        assert_eq!(result.step_id, "get-users");
        assert_eq!(result.status, "success");
        assert_eq!(result.response.as_ref().map(|r| r.status), Some(200));
    }

    #[tokio::test]
    async fn hooks_report_send_started_before_send_returned() {
        let server = start_users_server().await;
        let step = get_users_step(&server.base_url());
        let context = HashMap::new();
        let client = reqwest::Client::new();

        let prepared = prepare_http_step(&step, &context, None, None, None, 1, 1)
            .expect("step should prepare");

        let log = EventLog::default();
        let on_started = {
            let log = Arc::clone(&log);
            move || record(Arc::clone(&log), "started")
        };
        let on_returned = {
            let log = Arc::clone(&log);
            move || record(Arc::clone(&log), "returned")
        };

        let result = send_prepared_http_step_with_hooks(
            &client,
            prepared,
            &step,
            &context,
            None,
            None,
            None,
            || false,
            on_started,
            on_returned,
            || async {},
        )
        .await
        .expect("send should not be cancelled");

        assert_eq!(result.status, "success");
        assert_eq!(recorded(&log), ["started", "returned"]);
    }

    #[tokio::test]
    async fn split_http_helpers_start_send_before_body_completion() {
        let server = start_users_server().await;
        let step = get_users_step(&server.base_url());
        let context = HashMap::new();
        let client = reqwest::Client::new();

        let prepared = prepare_http_step(&step, &context, None, None, None, 1, 1)
            .expect("step should prepare");

        let log = EventLog::default();
        let on_started = {
            let log = Arc::clone(&log);
            move || record(Arc::clone(&log), "started")
        };
        let on_returned = {
            let log = Arc::clone(&log);
            move || record(Arc::clone(&log), "returned")
        };

        // First half: only the send hooks may have fired once this returns.
        let started = start_prepared_http_step_with_hooks(
            &client,
            prepared,
            &step,
            || false,
            on_started,
            on_returned,
        )
        .await
        .expect("start should not be cancelled")
        .expect("request should start");

        assert_eq!(recorded(&log), ["started", "returned"]);

        // Second half: reading the body fires the remaining hook.
        let on_body = {
            let log = Arc::clone(&log);
            move || record(Arc::clone(&log), "body")
        };
        let result = complete_started_http_step_with_hook(
            started,
            &step,
            &context,
            None,
            None,
            None,
            || false,
            on_body,
        )
        .await
        .expect("complete should not be cancelled");

        assert_eq!(result.status, "success");
        assert_eq!(recorded(&log), ["started", "returned", "body"]);
    }
}