zino_http/request/
mod.rs

1//! Request context and validation.
2
3use crate::{
4    helper,
5    response::{Rejection, Response, ResponseCode},
6};
7use bytes::Bytes;
8use multer::Multipart;
9use serde::de::DeserializeOwned;
10use std::{borrow::Cow, net::IpAddr, str::FromStr, sync::Arc, time::Instant};
11use zino_channel::{CloudEvent, Subscription};
12use zino_core::{
13    JsonValue, Map, SharedString, Uuid,
14    application::Agent,
15    error::Error,
16    extension::HeaderMapExt,
17    model::{ModelHooks, Query},
18    trace::{TraceContext, TraceState},
19    warn,
20};
21use zino_storage::NamedFile;
22
23#[cfg(feature = "auth")]
24use zino_auth::{AccessKeyId, Authentication, ParseSecurityTokenError, SecurityToken, SessionId};
25
26#[cfg(feature = "auth")]
27use zino_core::{datetime::DateTime, extension::JsonObjectExt, validation::Validation};
28
29#[cfg(feature = "cookie")]
30use cookie::{Cookie, SameSite};
31
32#[cfg(feature = "jwt")]
33use jwt_simple::algorithms::MACLike;
34#[cfg(feature = "jwt")]
35use zino_auth::JwtClaims;
36
37#[cfg(any(feature = "cookie", feature = "jwt"))]
38use std::time::Duration;
39
40#[cfg(feature = "i18n")]
41use fluent::FluentArgs;
42#[cfg(feature = "i18n")]
43use unic_langid::LanguageIdentifier;
44#[cfg(feature = "i18n")]
45use zino_core::i18n::{Intl, IntlError};
46
47mod context;
48
49pub use context::Context;
50
51/// Request context.
52pub trait RequestContext {
53    /// The method type.
54    type Method: AsRef<str>;
55    /// The uri type.
56    type Uri;
57
58    /// Returns the request method.
59    fn request_method(&self) -> &Self::Method;
60
61    /// Returns the original request URI regardless of nesting.
62    fn original_uri(&self) -> &Self::Uri;
63
64    /// Returns the route that matches the request.
65    fn matched_route(&self) -> Cow<'_, str>;
66
67    /// Returns the request path regardless of nesting.
68    fn request_path(&self) -> &str;
69
70    /// Gets the query string of the request.
71    fn get_query_string(&self) -> Option<&str>;
72
73    /// Gets an HTTP header value with the given name.
74    fn get_header(&self, name: &str) -> Option<&str>;
75
76    /// Returns the client's remote IP.
77    fn client_ip(&self) -> Option<IpAddr>;
78
79    /// Gets the request context.
80    fn get_context(&self) -> Option<Arc<Context>>;
81
82    /// Gets the request scoped data.
83    fn get_data<T: Clone + Send + Sync + 'static>(&self) -> Option<T>;
84
85    /// Sets the request scoped data and returns the old value
86    /// if an item of this type was already stored.
87    fn set_data<T: Clone + Send + Sync + 'static>(&mut self, value: T) -> Option<T>;
88
89    /// Reads the entire request body into `Bytes`.
90    async fn read_body_bytes(&mut self) -> Result<Bytes, Error>;
91
92    /// Returns the request path segments.
93    #[inline]
94    fn path_segments(&self) -> Vec<&str> {
95        self.request_path().trim_matches('/').split('/').collect()
96    }
97
98    /// Creates a new request context.
99    fn new_context(&self) -> Context {
100        // Emit metrics.
101        #[cfg(feature = "metrics")]
102        {
103            metrics::gauge!("zino_http_requests_in_flight").increment(1.0);
104            metrics::counter!(
105                "zino_http_requests_total",
106                "method" => self.request_method().as_ref().to_owned(),
107                "route" => self.matched_route().into_owned(),
108            )
109            .increment(1);
110        }
111
112        // Parse tracing headers.
113        let request_id = self
114            .get_header("x-request-id")
115            .and_then(|s| s.parse().ok())
116            .unwrap_or_else(Uuid::now_v7);
117        let trace_id = self
118            .get_trace_context()
119            .map_or_else(Uuid::now_v7, |t| Uuid::from_u128(t.trace_id()));
120        let session_id = self
121            .get_header("x-session-id")
122            .or_else(|| self.get_header("session_id"))
123            .and_then(|s| s.parse().ok());
124
125        // Generate new context.
126        let mut ctx = Context::new(request_id);
127        ctx.set_instance(self.request_path().to_owned());
128        ctx.set_trace_id(trace_id);
129        ctx.set_session_id(session_id);
130
131        // Set locale.
132        #[cfg(feature = "i18n")]
133        {
134            #[cfg(feature = "cookie")]
135            if let Some(cookie) = self.get_cookie("locale") {
136                if let Ok(locale) = cookie.value().parse() {
137                    ctx.set_locale(locale);
138                    return ctx;
139                }
140            }
141
142            if let Some(locale) = self
143                .get_header("accept-language")
144                .and_then(Intl::select_language)
145            {
146                ctx.set_locale(locale);
147            } else {
148                ctx.set_locale(Intl::default_locale().to_owned());
149            }
150        }
151        ctx
152    }
153
154    /// Returns the trace context by parsing the `traceparent` and `tracestate` header values.
155    #[inline]
156    fn get_trace_context(&self) -> Option<TraceContext> {
157        let traceparent = self.get_header("traceparent")?;
158        let mut trace_context = TraceContext::from_traceparent(traceparent)?;
159        if let Some(tracestate) = self.get_header("tracestate") {
160            *trace_context.trace_state_mut() = TraceState::from_tracestate(tracestate);
161        }
162        Some(trace_context)
163    }
164
165    /// Creates a new `TraceContext`.
166    fn new_trace_context(&self) -> TraceContext {
167        let mut trace_context = self
168            .get_trace_context()
169            .or_else(|| {
170                self.get_context()
171                    .map(|ctx| TraceContext::with_trace_id(ctx.trace_id()))
172            })
173            .map(|t| t.child())
174            .unwrap_or_default();
175        trace_context.record_trace_state();
176        trace_context
177    }
178
179    /// Creates a new cookie with the given name and value.
180    #[cfg(feature = "cookie")]
181    fn new_cookie(
182        &self,
183        name: SharedString,
184        value: SharedString,
185        max_age: Option<Duration>,
186    ) -> Cookie<'static> {
187        let mut cookie_builder = Cookie::build((name, value))
188            .http_only(true)
189            .secure(true)
190            .same_site(SameSite::Lax)
191            .path(self.request_path().to_owned());
192        if let Some(max_age) = max_age.and_then(|d| d.try_into().ok()) {
193            cookie_builder = cookie_builder.max_age(max_age);
194        }
195        cookie_builder.build()
196    }
197
198    /// Gets a cookie with the given name.
199    #[cfg(feature = "cookie")]
200    fn get_cookie(&self, name: &str) -> Option<Cookie<'_>> {
201        self.get_header("cookie")?.split(';').find_map(|cookie| {
202            if let Some((key, value)) = cookie.split_once('=') {
203                (key == name).then(|| Cookie::new(key, value))
204            } else {
205                None
206            }
207        })
208    }
209
210    /// Returns the start time.
211    #[inline]
212    fn start_time(&self) -> Instant {
213        self.get_context()
214            .map(|ctx| ctx.start_time())
215            .unwrap_or_else(Instant::now)
216    }
217
218    /// Returns the instance.
219    #[inline]
220    fn instance(&self) -> String {
221        self.get_context()
222            .map(|ctx| ctx.instance().to_owned())
223            .unwrap_or_else(|| self.request_path().to_owned())
224    }
225
226    /// Returns the request ID.
227    #[inline]
228    fn request_id(&self) -> Uuid {
229        self.get_context()
230            .map(|ctx| ctx.request_id())
231            .unwrap_or_default()
232    }
233
234    /// Returns the trace ID.
235    #[inline]
236    fn trace_id(&self) -> Uuid {
237        self.get_context()
238            .map(|ctx| ctx.trace_id())
239            .unwrap_or_default()
240    }
241
242    /// Returns the session ID.
243    #[inline]
244    fn session_id(&self) -> Option<String> {
245        self.get_context()
246            .and_then(|ctx| ctx.session_id().map(|s| s.to_owned()))
247    }
248
249    /// Returns the locale.
250    #[cfg(feature = "i18n")]
251    #[inline]
252    fn locale(&self) -> Option<LanguageIdentifier> {
253        self.get_context().and_then(|ctx| ctx.locale().cloned())
254    }
255
256    /// Gets the data type by parsing the `content-type` header.
257    ///
258    /// # Note
259    ///
260    /// Currently, we support the following values: `bytes` | `csv` | `form` | `json` | `multipart`
261    /// | `ndjson` | `text`.
262    fn data_type(&self) -> Option<&str> {
263        self.get_header("content-type")
264            .map(|content_type| {
265                if let Some((essence, _)) = content_type.split_once(';') {
266                    essence
267                } else {
268                    content_type
269                }
270            })
271            .map(helper::get_data_type)
272    }
273
274    /// Gets the route parameter by name.
275    /// The name should not include `:`, `*`, `{` or `}`.
276    ///
277    /// # Note
278    ///
279    /// Please note that it does not handle the percent-decoding.
280    /// You can use [`decode_param()`](Self::decode_param) or [`parse_param()`](Self::parse_param)
281    /// if you need percent-decoding.
282    fn get_param(&self, name: &str) -> Option<&str> {
283        const CAPTURES: [char; 4] = [':', '*', '{', '}'];
284        if let Some(index) = self
285            .matched_route()
286            .split('/')
287            .position(|segment| segment.trim_matches(CAPTURES.as_slice()) == name)
288        {
289            self.request_path().splitn(index + 2, '/').nth(index)
290        } else {
291            None
292        }
293    }
294
295    /// Decodes the UTF-8 percent-encoded route parameter by name.
296    fn decode_param(&self, name: &str) -> Result<Cow<'_, str>, Rejection> {
297        if let Some(value) = self.get_param(name) {
298            percent_encoding::percent_decode_str(value)
299                .decode_utf8()
300                .map_err(|err| Rejection::from_validation_entry(name.to_owned(), err).context(self))
301        } else {
302            Err(Rejection::from_validation_entry(
303                name.to_owned(),
304                warn!("param `{}` does not exist", name),
305            )
306            .context(self))
307        }
308    }
309
310    /// Parses the route parameter by name as an instance of type `T`.
311    /// The name should not include `:`, `*`, `{` or `}`.
312    fn parse_param<T: FromStr<Err: Into<Error>>>(&self, name: &str) -> Result<T, Rejection> {
313        if let Some(param) = self.get_param(name) {
314            percent_encoding::percent_decode_str(param)
315                .decode_utf8_lossy()
316                .parse::<T>()
317                .map_err(|err| Rejection::from_validation_entry(name.to_owned(), err).context(self))
318        } else {
319            Err(Rejection::from_validation_entry(
320                name.to_owned(),
321                warn!("param `{}` does not exist", name),
322            )
323            .context(self))
324        }
325    }
326
327    /// Gets the query value of the URI by name.
328    ///
329    /// # Note
330    ///
331    /// Please note that it does not handle the percent-decoding.
332    /// You can use [`decode_query()`](Self::decode_query) or [`parse_query()`](Self::parse_query)
333    /// if you need percent-decoding.
334    fn get_query(&self, name: &str) -> Option<&str> {
335        self.get_query_string()?.split('&').find_map(|param| {
336            if let Some((key, value)) = param.split_once('=') {
337                (key == name).then_some(value)
338            } else {
339                None
340            }
341        })
342    }
343
344    /// Decodes the UTF-8 percent-encoded query value of the URI by name.
345    fn decode_query(&self, name: &str) -> Result<Cow<'_, str>, Rejection> {
346        if let Some(value) = self.get_query(name) {
347            percent_encoding::percent_decode_str(value)
348                .decode_utf8()
349                .map_err(|err| Rejection::from_validation_entry(name.to_owned(), err).context(self))
350        } else {
351            Err(Rejection::from_validation_entry(
352                name.to_owned(),
353                warn!("query value `{}` does not exist", name),
354            )
355            .context(self))
356        }
357    }
358
359    /// Parses the query as an instance of type `T`.
360    /// Returns a default value of `T` when the query is empty.
361    /// If the query has a `timestamp` parameter, it will be used to prevent replay attacks.
362    fn parse_query<T: Default + DeserializeOwned>(&self) -> Result<T, Rejection> {
363        if let Some(query) = self.get_query_string() {
364            #[cfg(feature = "jwt")]
365            if let Some(timestamp) = self.get_query("timestamp").and_then(|s| s.parse().ok()) {
366                let duration = DateTime::from_timestamp(timestamp).span_between_now();
367                if duration > zino_auth::default_time_tolerance() {
368                    let err = warn!("timestamp `{}` can not be trusted", timestamp);
369                    let rejection = Rejection::from_validation_entry("timestamp", err);
370                    return Err(rejection.context(self));
371                }
372            }
373            serde_qs::from_str::<T>(query)
374                .map_err(|err| Rejection::from_validation_entry("query", err).context(self))
375        } else {
376            Ok(T::default())
377        }
378    }
379
380    /// Parses the request body as an instance of type `T`.
381    ///
382    /// # Note
383    ///
384    /// Currently, we have built-in support for the following `content-type` header values:
385    ///
386    /// - `application/json`
387    /// - `application/problem+json`
388    /// - `application/x-www-form-urlencoded`
389    async fn parse_body<T: DeserializeOwned>(&mut self) -> Result<T, Rejection> {
390        let data_type = self.data_type().unwrap_or("form");
391        if data_type.contains('/') {
392            let err = warn!(
393                "deserialization of the data type `{}` is unsupported",
394                data_type
395            );
396            let rejection = Rejection::from_validation_entry("data_type", err).context(self);
397            return Err(rejection);
398        }
399
400        let is_form = data_type == "form";
401        let bytes = self
402            .read_body_bytes()
403            .await
404            .map_err(|err| Rejection::from_validation_entry("body", err).context(self))?;
405        if is_form {
406            serde_qs::from_bytes(&bytes)
407                .map_err(|err| Rejection::from_validation_entry("body", err).context(self))
408        } else {
409            serde_json::from_slice(&bytes)
410                .map_err(|err| Rejection::from_validation_entry("body", err).context(self))
411        }
412    }
413
414    /// Parses the request body as a multipart, which is commonly used with file uploads.
415    async fn parse_multipart(&mut self) -> Result<Multipart<'_>, Rejection> {
416        let Some(content_type) = self.get_header("content-type") else {
417            return Err(Rejection::from_validation_entry(
418                "content_type",
419                warn!("invalid `content-type` header"),
420            )
421            .context(self));
422        };
423        match multer::parse_boundary(content_type) {
424            Ok(boundary) => {
425                let result = self.read_body_bytes().await.map_err(|err| err.to_string());
426                let stream = futures::stream::once(async { result });
427                Ok(Multipart::new(stream, boundary))
428            }
429            Err(err) => Err(Rejection::from_validation_entry("boundary", err).context(self)),
430        }
431    }
432
433    /// Parses the request body as a file.
434    async fn parse_file(&mut self) -> Result<NamedFile, Rejection> {
435        let multipart = self.parse_multipart().await?;
436        NamedFile::try_from_multipart(multipart)
437            .await
438            .map_err(|err| Rejection::from_validation_entry("body", err).context(self))
439    }
440
441    /// Parses the request body as a list of files.
442    async fn parse_files(&mut self) -> Result<Vec<NamedFile>, Rejection> {
443        let multipart = self.parse_multipart().await?;
444        NamedFile::try_collect_from_multipart(multipart)
445            .await
446            .map_err(|err| Rejection::from_validation_entry("body", err).context(self))
447    }
448
449    /// Parses the multipart form as an instance of `T` with the `name` and a list of files.
450    async fn parse_form<T: DeserializeOwned>(
451        &mut self,
452        name: &str,
453    ) -> Result<(Option<T>, Vec<NamedFile>), Rejection> {
454        let multipart = self.parse_multipart().await?;
455        helper::parse_form(multipart, name)
456            .await
457            .map_err(|err| Rejection::from_validation_entry("body", err).context(self))
458    }
459
460    /// Parses the `multipart/form-data` as an instance of type `T` and a list of files.
461    async fn parse_form_data<T: DeserializeOwned>(
462        &mut self,
463    ) -> Result<(T, Vec<NamedFile>), Rejection> {
464        let multipart = self.parse_multipart().await?;
465        helper::parse_form_data(multipart)
466            .await
467            .map_err(|err| Rejection::from_validation_entry("body", err).context(self))
468    }
469
470    /// Attempts to construct an instance of `Authentication` from an HTTP request.
471    /// The value is extracted from the query or the `authorization` header.
472    /// By default, the `Accept` header value is ignored and
473    /// the canonicalized resource is set to the request path.
474    #[cfg(feature = "auth")]
475    fn parse_authentication(&self) -> Result<Authentication, Rejection> {
476        let method = self.request_method();
477        let query = self.parse_query::<Map>().unwrap_or_default();
478        let mut authentication = Authentication::new(method.as_ref());
479        let mut validation = Validation::new();
480        if let Some(signature) = query.get_str("signature") {
481            authentication.set_signature(signature.to_owned());
482            if let Some(access_key_id) = query.parse_string("access_key_id") {
483                authentication.set_access_key_id(access_key_id);
484            } else {
485                validation.record("access_key_id", "should be nonempty");
486            }
487            if let Some(Ok(secs)) = query.parse_i64("expires") {
488                if DateTime::now().timestamp() <= secs {
489                    let expires = DateTime::from_timestamp(secs);
490                    authentication.set_expires(Some(expires));
491                } else {
492                    validation.record("expires", "valid period has expired");
493                }
494            } else {
495                validation.record("expires", "invalid timestamp");
496            }
497            if !validation.is_success() {
498                return Err(Rejection::bad_request(validation).context(self));
499            }
500        } else if let Some(authorization) = self.get_header("authorization") {
501            if let Some((service_name, token)) = authorization.split_once(' ') {
502                authentication.set_service_name(service_name);
503                if let Some((access_key_id, signature)) = token.split_once(':') {
504                    authentication.set_access_key_id(access_key_id);
505                    authentication.set_signature(signature.to_owned());
506                } else {
507                    validation.record("authorization", "invalid header value");
508                }
509            } else {
510                validation.record("authorization", "invalid service name");
511            }
512            if !validation.is_success() {
513                return Err(Rejection::bad_request(validation).context(self));
514            }
515        }
516        if let Some(content_md5) = self.get_header("content-md5") {
517            authentication.set_content_md5(content_md5.to_owned());
518        }
519        if let Some(date) = self.get_header("date") {
520            match DateTime::parse_utc_str(date) {
521                Ok(date) => {
522                    #[cfg(feature = "jwt")]
523                    if date.span_between_now() <= zino_auth::default_time_tolerance() {
524                        authentication.set_date_header("date", date);
525                    } else {
526                        validation.record("date", "untrusted date");
527                    }
528                    #[cfg(not(feature = "jwt"))]
529                    authentication.set_date_header("date", date);
530                }
531                Err(err) => {
532                    validation.record_fail("date", err);
533                    return Err(Rejection::bad_request(validation).context(self));
534                }
535            }
536        }
537        authentication.set_content_type(self.get_header("content-type").map(|s| s.to_owned()));
538        authentication.set_resource(self.request_path().to_owned(), None);
539        Ok(authentication)
540    }
541
542    /// Attempts to construct an instance of `AccessKeyId` from an HTTP request.
543    /// The value is extracted from the query parameter `access_key_id`
544    /// or the `authorization` header.
545    #[cfg(feature = "auth")]
546    fn parse_access_key_id(&self) -> Result<AccessKeyId, Rejection> {
547        if let Some(access_key_id) = self.get_query("access_key_id") {
548            Ok(access_key_id.into())
549        } else {
550            let mut validation = Validation::new();
551            if let Some(authorization) = self.get_header("authorization") {
552                if let Some((_, token)) = authorization.split_once(' ') {
553                    let access_key_id = if let Some((access_key_id, _)) = token.split_once(':') {
554                        access_key_id
555                    } else {
556                        token
557                    };
558                    return Ok(access_key_id.into());
559                } else {
560                    validation.record("authorization", "invalid service name");
561                }
562            } else {
563                validation.record("authorization", "invalid value to get the access key id");
564            }
565            Err(Rejection::bad_request(validation).context(self))
566        }
567    }
568
569    /// Attempts to construct an instance of `SecurityToken` from an HTTP request.
570    /// The value is extracted from the `x-security-token` header.
571    #[cfg(feature = "auth")]
572    fn parse_security_token(&self, key: &[u8]) -> Result<SecurityToken, Rejection> {
573        use ParseSecurityTokenError::*;
574        let query = self.parse_query::<Map>()?;
575        let mut validation = Validation::new();
576        if let Some(token) = self
577            .get_header("x-security-token")
578            .or_else(|| query.get_str("security_token"))
579        {
580            match SecurityToken::parse_with(token.to_owned(), key) {
581                Ok(security_token) => {
582                    if let Some(access_key_id) = query.get_str("access_key_id") {
583                        if security_token.access_key_id().as_str() != access_key_id {
584                            validation.record("access_key_id", "untrusted access key ID");
585                        }
586                    }
587                    if let Some(Ok(expires)) = query.parse_i64("expires") {
588                        if security_token.expires_at().timestamp() != expires {
589                            validation.record("expires", "untrusted timestamp");
590                        }
591                    }
592                    if validation.is_success() {
593                        return Ok(security_token);
594                    }
595                }
596                Err(err) => {
597                    let field = match err {
598                        DecodeError(_) | InvalidFormat => "security_token",
599                        ParseExpiresError(_) | ValidPeriodExpired(_) => "expires",
600                    };
601                    validation.record_fail(field, err);
602                }
603            }
604        } else {
605            validation.record("security_token", "should be nonempty");
606        }
607        Err(Rejection::bad_request(validation).context(self))
608    }
609
610    /// Attempts to construct an instance of `SessionId` from an HTTP request.
611    /// The value is extracted from the `x-session-id` or `session-id` header.
612    #[cfg(feature = "auth")]
613    fn parse_session_id(&self) -> Result<SessionId, Rejection> {
614        self.get_header("x-session-id")
615            .or_else(|| self.get_header("session-id"))
616            .ok_or_else(|| {
617                Rejection::from_validation_entry(
618                    "session_id",
619                    warn!("a `session-id` or `x-session-id` header is required"),
620                )
621                .context(self)
622            })
623            .and_then(|session_id| {
624                SessionId::parse(session_id).map_err(|err| {
625                    Rejection::from_validation_entry("session_id", err).context(self)
626                })
627            })
628    }
629
630    /// Attempts to construct an instance of `JwtClaims` from an HTTP request.
631    /// The value is extracted from the query parameter `access_token` or
632    /// the `authorization` header.
633    #[cfg(feature = "jwt")]
634    fn parse_jwt_claims<T, K>(&self, key: &K) -> Result<JwtClaims<T>, Rejection>
635    where
636        T: Default + serde::Serialize + DeserializeOwned,
637        K: MACLike,
638    {
639        let (param, mut token) = match self.get_query("access_token") {
640            Some(access_token) => ("access_token", access_token),
641            None => ("authorization", ""),
642        };
643        if let Some(authorization) = self.get_header("authorization") {
644            token = authorization
645                .strip_prefix("Bearer ")
646                .unwrap_or(authorization);
647        } else if cfg!(feature = "cookie") {
648            let value = self.get_header("cookie").and_then(|s| {
649                s.split(';').find_map(|cookie| {
650                    if let Some((key, value)) = cookie.split_once('=') {
651                        (key == "access_token").then_some(value)
652                    } else {
653                        None
654                    }
655                })
656            });
657            if let Some(access_token) = value {
658                token = access_token;
659            }
660        }
661        if token.is_empty() {
662            let mut validation = Validation::new();
663            validation.record(param, "JWT should be nonempty");
664            return Err(Rejection::bad_request(validation).context(self));
665        }
666
667        let mut options = zino_auth::default_verification_options();
668        options.reject_before = self
669            .get_query("timestamp")
670            .and_then(|s| s.parse().ok())
671            .map(|i| Duration::from_secs(i).into());
672        options.required_nonce = self.get_query("nonce").map(|s| s.to_owned());
673
674        match key.verify_token(token, Some(options)) {
675            Ok(claims) => Ok(claims.into()),
676            Err(err) => {
677                let message = format!("401 Unauthorized: {err}");
678                Err(Rejection::with_message(message).context(self))
679            }
680        }
681    }
682
683    /// Returns a `Response` or `Rejection` from a model query validation.
684    /// The data is extracted from [`parse_query()`](RequestContext::parse_query).
685    fn query_validation<S>(&self, query: &mut Query) -> Result<Response<S>, Rejection>
686    where
687        Self: Sized,
688        S: ResponseCode,
689    {
690        match self.parse_query() {
691            Ok(data) => {
692                let validation = query.read_map(&data);
693                if validation.is_success() {
694                    Ok(Response::with_context(S::OK, self))
695                } else {
696                    Err(Rejection::bad_request(validation).context(self))
697                }
698            }
699            Err(rejection) => Err(rejection),
700        }
701    }
702
703    /// Returns a `Response` or `Rejection` from a model validation.
704    /// The data is extracted from [`parse_body()`](RequestContext::parse_body).
705    async fn model_validation<M, S>(&mut self, model: &mut M) -> Result<Response<S>, Rejection>
706    where
707        Self: Sized,
708        M: ModelHooks,
709        S: ResponseCode,
710    {
711        let data_type = self.data_type().unwrap_or("form");
712        if data_type.contains('/') {
713            let err = warn!(
714                "deserialization of the data type `{}` is unsupported",
715                data_type
716            );
717            let rejection = Rejection::from_validation_entry("data_type", err).context(self);
718            return Err(rejection);
719        }
720        M::before_extract()
721            .await
722            .map_err(|err| Rejection::from_error(err).context(self))?;
723
724        let is_form = data_type == "form";
725        let bytes = self
726            .read_body_bytes()
727            .await
728            .map_err(|err| Rejection::from_validation_entry("body", err).context(self))?;
729        let extension = self.get_data::<M::Extension>();
730        if is_form {
731            let mut data = serde_qs::from_bytes(&bytes)
732                .map_err(|err| Rejection::from_validation_entry("body", err).context(self))?;
733            match M::before_validation(&mut data, extension.as_ref()).await {
734                Ok(()) => {
735                    let validation = model.read_map(&data);
736                    model
737                        .after_validation(&mut data)
738                        .await
739                        .map_err(|err| Rejection::from_error(err).context(self))?;
740                    if let Some(extension) = extension {
741                        model
742                            .after_extract(extension)
743                            .await
744                            .map_err(|err| Rejection::from_error(err).context(self))?;
745                    }
746                    if validation.is_success() {
747                        Ok(Response::with_context(S::OK, self))
748                    } else {
749                        Err(Rejection::bad_request(validation).context(self))
750                    }
751                }
752                Err(err) => Err(Rejection::from_error(err).context(self)),
753            }
754        } else {
755            let mut data = serde_json::from_slice(&bytes)
756                .map_err(|err| Rejection::from_validation_entry("body", err).context(self))?;
757            match M::before_validation(&mut data, extension.as_ref()).await {
758                Ok(()) => {
759                    let validation = model.read_map(&data);
760                    model
761                        .after_validation(&mut data)
762                        .await
763                        .map_err(|err| Rejection::from_error(err).context(self))?;
764                    if let Some(extension) = extension {
765                        model
766                            .after_extract(extension)
767                            .await
768                            .map_err(|err| Rejection::from_error(err).context(self))?;
769                    }
770                    if validation.is_success() {
771                        Ok(Response::with_context(S::OK, self))
772                    } else {
773                        Err(Rejection::bad_request(validation).context(self))
774                    }
775                }
776                Err(err) => Err(Rejection::from_error(err).context(self)),
777            }
778        }
779    }
780
781    /// Makes an HTTP request to the provided URL.
782    async fn fetch(&self, url: &str, options: Option<&Map>) -> Result<reqwest::Response, Error> {
783        let trace_context = self.new_trace_context();
784        Agent::request_builder(url, options)?
785            .header("traceparent", trace_context.traceparent())
786            .header("tracestate", trace_context.tracestate())
787            .send()
788            .await
789            .map_err(Error::from)
790    }
791
792    /// Makes an HTTP request to the provided URL and
793    /// deserializes the response body via JSON.
794    async fn fetch_json<T: DeserializeOwned>(
795        &self,
796        url: &str,
797        options: Option<&Map>,
798    ) -> Result<T, Error> {
799        let response = self.fetch(url, options).await?.error_for_status()?;
800        let data = if response.headers().has_json_content_type() {
801            response.json().await?
802        } else {
803            let text = response.text().await?;
804            serde_json::from_str(&text)?
805        };
806        Ok(data)
807    }
808
809    /// Translates the localization message.
810    #[cfg(feature = "i18n")]
811    fn translate(
812        &self,
813        message: &str,
814        args: Option<FluentArgs>,
815    ) -> Result<SharedString, IntlError> {
816        if let Some(locale) = self.locale() {
817            Intl::translate_with(message, args, &locale)
818        } else {
819            Intl::translate(message, args)
820        }
821    }
822
823    /// Constructs a new subscription instance.
824    fn subscription(&self) -> Subscription {
825        let mut subscription = self.parse_query::<Subscription>().unwrap_or_default();
826        if subscription.session_id().is_none()
827            && let Some(session_id) = self.session_id()
828        {
829            subscription.set_session_id(Some(session_id));
830        }
831        subscription
832    }
833
834    /// Constructs a new cloud event instance.
835    fn cloud_event(&self, event_type: SharedString, data: JsonValue) -> CloudEvent {
836        let id = self.request_id();
837        let source = self.instance();
838        let mut event = CloudEvent::new(id, source, event_type);
839        if let Some(session_id) = self.session_id() {
840            event.set_session_id(session_id);
841        }
842        event.set_data(data);
843        event
844    }
845}