1use reqwest::{Client, Method, Url};
2use serde_json::{Map, Value};
3use std::collections::HashMap;
4use std::future::Future;
5use std::time::Instant;
6
7use crate::assertions::{evaluate_assertions, has_status_assertion};
8use crate::core::types::{
9 PipelineStep, RuntimeEnvGroup, RuntimeSpec, StepExecutionResult, StepRequest, StepResponse,
10};
11use crate::execution::cancel::await_with_cancel;
12use crate::execution::http::{parse_absolute_http_url, parse_method};
13use crate::execution::logging::log_step_response;
14use crate::template::resolve::resolve_template_variables;
15
/// A pipeline step whose templates have been resolved and whose method and
/// URL have been parsed, ready to be sent.
///
/// Produced by `prepare_http_step`.
#[derive(Debug, Clone)]
pub struct PreparedHttpStep {
    /// Identifier copied from the originating pipeline step.
    pub step_id: String,
    /// Attempt number this preparation belongs to, as supplied by the caller.
    pub attempt: usize,
    /// Maximum number of attempts, as supplied by the caller.
    pub max_attempts: usize,
    /// HTTP method parsed from the step's method string.
    pub method: Method,
    /// Absolute URL parsed from the template-resolved URL string.
    pub url: Url,
    /// Resolved request (method string, URL, headers, body) reported in results.
    pub request: StepRequest,
    // Captured at the start of preparation; step durations are measured from here.
    started_at: Instant,
}
26
/// An HTTP step whose request has been sent and whose response has been
/// received, but whose body has not been read yet.
#[derive(Debug)]
pub struct StartedHttpStep {
    /// Resolved request that was sent; attached to the final step result.
    pub request: StepRequest,
    /// Response returned by the HTTP client; its body is still unread.
    pub response: reqwest::Response,
    // Carried over from the prepared step so the reported duration covers the whole attempt.
    started_at: Instant,
    // Attempt bookkeeping carried over from the prepared step.
    attempt: usize,
    max_attempts: usize,
}
35
36pub fn prepare_http_step(
37 step: &PipelineStep,
38 context: &HashMap<String, StepExecutionResult>,
39 specs: Option<&[RuntimeSpec]>,
40 env_groups: Option<&[RuntimeEnvGroup]>,
41 selected_env_group_slug: Option<&str>,
42 attempt: usize,
43 max_attempts: usize,
44) -> Result<PreparedHttpStep, StepExecutionResult> {
45 let started_at = Instant::now();
46 let resolved_url = resolve_template_variables(
47 &Value::String(step.url.clone()),
48 context,
49 specs,
50 env_groups,
51 selected_env_group_slug,
52 )
53 .as_str()
54 .unwrap_or(step.url.as_str())
55 .to_owned();
56
57 let resolved_headers = resolve_template_variables(
58 &serde_json::to_value(&step.headers).unwrap_or(Value::Object(Map::new())),
59 context,
60 specs,
61 env_groups,
62 selected_env_group_slug,
63 )
64 .as_object()
65 .map(|m| {
66 m.iter()
67 .map(|(k, v)| (k.clone(), v.as_str().unwrap_or_default().to_owned()))
68 .collect::<HashMap<String, String>>()
69 })
70 .unwrap_or_default();
71
72 let resolved_body = step.body.as_ref().map(|body| {
73 resolve_template_variables(body, context, specs, env_groups, selected_env_group_slug)
74 });
75
76 let request = StepRequest {
77 method: step.method.clone(),
78 url: resolved_url.clone(),
79 headers: resolved_headers,
80 body: resolved_body,
81 };
82
83 let method = match parse_method(&step.method) {
84 Ok(method) => method,
85 Err(err) => {
86 return Err(invalid_step_result(
87 step,
88 request,
89 err,
90 started_at,
91 attempt,
92 max_attempts,
93 ));
94 }
95 };
96
97 let url = match parse_absolute_http_url(&resolved_url) {
98 Ok(url) => url,
99 Err(err) => {
100 return Err(invalid_step_result(
101 step,
102 request,
103 err,
104 started_at,
105 attempt,
106 max_attempts,
107 ));
108 }
109 };
110
111 Ok(PreparedHttpStep {
112 step_id: step.id.clone(),
113 attempt,
114 max_attempts,
115 method,
116 url,
117 request,
118 started_at,
119 })
120}
121
122pub async fn send_prepared_http_step<FCancel>(
123 client: &Client,
124 prepared: PreparedHttpStep,
125 step: &PipelineStep,
126 context: &HashMap<String, StepExecutionResult>,
127 specs: Option<&[RuntimeSpec]>,
128 env_groups: Option<&[RuntimeEnvGroup]>,
129 selected_env_group_slug: Option<&str>,
130 should_cancel: FCancel,
131) -> Option<StepExecutionResult>
132where
133 FCancel: FnMut() -> bool,
134{
135 send_prepared_http_step_with_hooks(
136 client,
137 prepared,
138 step,
139 context,
140 specs,
141 env_groups,
142 selected_env_group_slug,
143 should_cancel,
144 || async {},
145 || async {},
146 || async {},
147 )
148 .await
149}
150
151pub async fn send_prepared_http_step_with_hooks<
152 FCancel,
153 FStart,
154 FStartFuture,
155 FSend,
156 FSendFuture,
157 FBody,
158 FBodyFuture,
159>(
160 client: &Client,
161 prepared: PreparedHttpStep,
162 step: &PipelineStep,
163 context: &HashMap<String, StepExecutionResult>,
164 specs: Option<&[RuntimeSpec]>,
165 env_groups: Option<&[RuntimeEnvGroup]>,
166 selected_env_group_slug: Option<&str>,
167 mut should_cancel: FCancel,
168 on_send_started: FStart,
169 on_send_returned: FSend,
170 on_body_completed: FBody,
171) -> Option<StepExecutionResult>
172where
173 FCancel: FnMut() -> bool,
174 FStart: FnMut() -> FStartFuture,
175 FStartFuture: Future<Output = ()>,
176 FSend: FnMut() -> FSendFuture,
177 FSendFuture: Future<Output = ()>,
178 FBody: FnMut() -> FBodyFuture,
179 FBodyFuture: Future<Output = ()>,
180{
181 let started = start_prepared_http_step_with_hooks(
182 client,
183 prepared,
184 step,
185 || should_cancel(),
186 on_send_started,
187 on_send_returned,
188 )
189 .await?;
190
191 match started {
192 Ok(started) => {
193 complete_started_http_step_with_hook(
194 started,
195 step,
196 context,
197 specs,
198 env_groups,
199 selected_env_group_slug,
200 should_cancel,
201 on_body_completed,
202 )
203 .await
204 }
205 Err(result) => Some(result),
206 }
207}
208
209pub async fn start_prepared_http_step_with_hooks<
210 FCancel,
211 FStart,
212 FStartFuture,
213 FSend,
214 FSendFuture,
215>(
216 client: &Client,
217 prepared: PreparedHttpStep,
218 step: &PipelineStep,
219 mut should_cancel: FCancel,
220 mut on_send_started: FStart,
221 mut on_send_returned: FSend,
222) -> Option<Result<StartedHttpStep, StepExecutionResult>>
223where
224 FCancel: FnMut() -> bool,
225 FStart: FnMut() -> FStartFuture,
226 FStartFuture: Future<Output = ()>,
227 FSend: FnMut() -> FSendFuture,
228 FSendFuture: Future<Output = ()>,
229{
230 let mut request_builder = client.request(prepared.method.clone(), prepared.url.clone());
231
232 for (key, value) in &prepared.request.headers {
233 request_builder = request_builder.header(key, value);
234 }
235
236 if let Some(body) = prepared.request.body.as_ref() {
237 if !prepared.request.method.eq_ignore_ascii_case("GET")
238 && !prepared.request.method.eq_ignore_ascii_case("HEAD")
239 {
240 request_builder = request_builder.json(body);
241 }
242 }
243
244 let request = prepared.request.clone();
245 if should_cancel() {
246 return None;
247 }
248 on_send_started().await;
249 let Some(send_result) = await_with_cancel(request_builder.send(), &mut should_cancel).await
250 else {
251 return None;
252 };
253 on_send_returned().await;
254
255 match send_result {
256 Ok(response) => Some(Ok(StartedHttpStep {
257 request,
258 response,
259 started_at: prepared.started_at,
260 attempt: prepared.attempt,
261 max_attempts: prepared.max_attempts,
262 })),
263 Err(err) => {
264 let result = step_result(
265 step,
266 request,
267 None,
268 Some(err.to_string()),
269 "error",
270 prepared.started_at,
271 prepared.attempt,
272 prepared.max_attempts,
273 None,
274 );
275 log_step_response(&step.id, None, result.error.as_deref());
276 Some(Err(result))
277 }
278 }
279}
280
281pub async fn complete_started_http_step_with_hook<FCancel, FBody, FBodyFuture>(
282 started: StartedHttpStep,
283 step: &PipelineStep,
284 context: &HashMap<String, StepExecutionResult>,
285 specs: Option<&[RuntimeSpec]>,
286 env_groups: Option<&[RuntimeEnvGroup]>,
287 selected_env_group_slug: Option<&str>,
288 mut should_cancel: FCancel,
289 mut on_body_completed: FBody,
290) -> Option<StepExecutionResult>
291where
292 FCancel: FnMut() -> bool,
293 FBody: FnMut() -> FBodyFuture,
294 FBodyFuture: Future<Output = ()>,
295{
296 let status = started.response.status();
297 let status_text = status.canonical_reason().unwrap_or("").to_owned();
298 let mut headers = HashMap::new();
299 for (key, value) in started.response.headers() {
300 headers.insert(
301 key.as_str().to_owned(),
302 value.to_str().unwrap_or_default().to_owned(),
303 );
304 }
305
306 let content_type = headers
307 .iter()
308 .find(|(key, _)| key.eq_ignore_ascii_case("content-type"))
309 .map(|(_, value)| value.as_str())
310 .unwrap_or("");
311
312 let body = if content_type.contains("application/json") {
313 let Some(body_result) =
314 await_with_cancel(started.response.json::<Value>(), &mut should_cancel).await
315 else {
316 return None;
317 };
318 on_body_completed().await;
319 match body_result {
320 Ok(value) => value,
321 Err(err) => {
322 let result = step_result(
323 step,
324 started.request,
325 None,
326 Some(err.to_string()),
327 "error",
328 started.started_at,
329 started.attempt,
330 started.max_attempts,
331 None,
332 );
333 log_step_response(&step.id, None, result.error.as_deref());
334 return Some(result);
335 }
336 }
337 } else {
338 let Some(body_result) =
339 await_with_cancel(started.response.text(), &mut should_cancel).await
340 else {
341 return None;
342 };
343 on_body_completed().await;
344 Value::String(body_result.unwrap_or_default())
345 };
346
347 let http_error =
348 (!status.is_success()).then(|| format!("HTTP {} {}", status.as_u16(), status_text));
349 let mut result = step_result(
350 step,
351 started.request,
352 Some(StepResponse {
353 status: status.as_u16(),
354 status_text: status_text.clone(),
355 headers,
356 body,
357 }),
358 http_error.clone(),
359 "success",
360 started.started_at,
361 started.attempt,
362 started.max_attempts,
363 None,
364 );
365
366 let has_status_assert = has_status_assertion(step);
367 let assert_results = evaluate_assertions(
368 step,
369 &result,
370 context,
371 specs,
372 env_groups,
373 selected_env_group_slug,
374 );
375 let assertion_failed = assert_results.iter().any(|result| !result.passed);
376 if !assert_results.is_empty() {
377 if assertion_failed {
378 result.status = "error".to_owned();
379 let failed_count = assert_results
380 .iter()
381 .filter(|result| !result.passed)
382 .count();
383 result.error = Some(match result.error {
384 Some(err) => format!("{} | {} assertion(s) failed", err, failed_count),
385 None => format!("{} assertion(s) failed", failed_count),
386 });
387 } else if http_error.is_some() {
388 if has_status_assert {
389 result.status = "success".to_owned();
390 result.error = None;
391 } else {
392 result.status = "error".to_owned();
393 }
394 }
395 result.assert_results = Some(assert_results);
396 } else if http_error.is_some() {
397 result.status = "error".to_owned();
398 }
399
400 log_step_response(&step.id, result.response.as_ref(), result.error.as_deref());
401 Some(result)
402}
403
404fn invalid_step_result(
405 step: &PipelineStep,
406 request: StepRequest,
407 error: String,
408 started_at: Instant,
409 attempt: usize,
410 max_attempts: usize,
411) -> StepExecutionResult {
412 step_result(
413 step,
414 request,
415 None,
416 Some(error),
417 "error",
418 started_at,
419 attempt,
420 max_attempts,
421 None,
422 )
423}
424
425fn step_result(
426 step: &PipelineStep,
427 request: StepRequest,
428 response: Option<StepResponse>,
429 error: Option<String>,
430 status: &str,
431 started_at: Instant,
432 attempt: usize,
433 max_attempts: usize,
434 assert_results: Option<Vec<crate::core::types::AssertionResult>>,
435) -> StepExecutionResult {
436 StepExecutionResult {
437 step_id: step.id.clone(),
438 status: status.to_owned(),
439 request: Some(request),
440 response,
441 error,
442 duration: Some(started_at.elapsed().as_millis()),
443 attempts: if max_attempts > 1 {
444 Some(attempt)
445 } else {
446 None
447 },
448 attempt: Some(attempt),
449 max_attempts: Some(max_attempts),
450 assert_results,
451 }
452}
453
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::types::PipelineStep;
    use httpmock::Method::GET;
    use serde_json::json;
    use std::collections::HashMap;
    use std::pin::Pin;
    use std::sync::{Arc, Mutex};

    /// Ordered log of hook invocations shared between a test and its hooks.
    type EventLog = Arc<Mutex<Vec<&'static str>>>;

    /// Boxed hook future, so that hook closures can be built by a helper.
    type HookFuture = Pin<Box<dyn std::future::Future<Output = ()>>>;

    /// Starts a mock server that answers `GET /users` with `200` and `{"ok": true}`.
    async fn start_users_server() -> httpmock::MockServer {
        let server = httpmock::MockServer::start_async().await;
        server
            .mock_async(|when, then| {
                when.method(GET).path("/users");
                then.status(200)
                    .header("content-type", "application/json")
                    .json_body(json!({"ok": true}));
            })
            .await;
        server
    }

    /// Builds a bare `GET {base_url}/users` step without headers, body or assertions.
    fn get_users_step(base_url: &str) -> PipelineStep {
        PipelineStep {
            id: "get-users".to_owned(),
            name: "GET users".to_owned(),
            description: None,
            method: "GET".to_owned(),
            url: format!("{}/users", base_url),
            headers: HashMap::new(),
            body: None,
            operation_id: None,
            delay: None,
            retry: None,
            asserts: vec![],
        }
    }

    /// Returns a hook that appends `label` to `events` each time its future runs.
    fn record(events: &EventLog, label: &'static str) -> impl FnMut() -> HookFuture {
        let events = Arc::clone(events);
        move || -> HookFuture {
            let events = Arc::clone(&events);
            Box::pin(async move {
                events.lock().expect("events lock").push(label);
            })
        }
    }

    #[tokio::test]
    async fn sends_prepared_step_and_returns_success_result() {
        let server = start_users_server().await;
        let step = get_users_step(&server.base_url());
        let context = HashMap::new();
        let client = reqwest::Client::new();

        let prepared = prepare_http_step(&step, &context, None, None, None, 1, 1)
            .expect("step should prepare");

        let result =
            send_prepared_http_step(&client, prepared, &step, &context, None, None, None, || {
                false
            })
            .await
            .expect("send should not be cancelled");

        assert_eq!(result.step_id, "get-users");
        assert_eq!(result.status, "success");
        assert_eq!(result.response.as_ref().map(|r| r.status), Some(200));
    }

    #[tokio::test]
    async fn hooks_report_send_started_before_send_returned() {
        let server = start_users_server().await;
        let step = get_users_step(&server.base_url());
        let context = HashMap::new();
        let client = reqwest::Client::new();
        let prepared = prepare_http_step(&step, &context, None, None, None, 1, 1)
            .expect("step should prepare");
        let events: EventLog = Arc::new(Mutex::new(Vec::new()));

        let result = send_prepared_http_step_with_hooks(
            &client,
            prepared,
            &step,
            &context,
            None,
            None,
            None,
            || false,
            record(&events, "started"),
            record(&events, "returned"),
            || async {},
        )
        .await
        .expect("send should not be cancelled");

        assert_eq!(result.status, "success");
        assert_eq!(
            events.lock().expect("events lock").as_slice(),
            ["started", "returned"]
        );
    }

    #[tokio::test]
    async fn split_http_helpers_start_send_before_body_completion() {
        let server = start_users_server().await;
        let step = get_users_step(&server.base_url());
        let context = HashMap::new();
        let client = reqwest::Client::new();
        let prepared = prepare_http_step(&step, &context, None, None, None, 1, 1)
            .expect("step should prepare");
        let events: EventLog = Arc::new(Mutex::new(Vec::new()));

        let started = start_prepared_http_step_with_hooks(
            &client,
            prepared,
            &step,
            || false,
            record(&events, "started"),
            record(&events, "returned"),
        )
        .await
        .expect("start should not be cancelled")
        .expect("request should start");

        // Only the send-phase hooks may have fired before the body is read.
        assert_eq!(
            events.lock().expect("events lock").as_slice(),
            ["started", "returned"]
        );

        let result = complete_started_http_step_with_hook(
            started,
            &step,
            &context,
            None,
            None,
            None,
            || false,
            record(&events, "body"),
        )
        .await
        .expect("complete should not be cancelled");

        assert_eq!(result.status, "success");
        assert_eq!(
            events.lock().expect("events lock").as_slice(),
            ["started", "returned", "body"]
        );
    }
}