zino_http/request/
mod.rs

1//! Request context and validation.
2
3use crate::{
4    helper,
5    response::{Rejection, Response, ResponseCode},
6};
7use bytes::Bytes;
8use multer::Multipart;
9use serde::de::DeserializeOwned;
10use std::{borrow::Cow, net::IpAddr, str::FromStr, sync::Arc, time::Instant};
11use zino_channel::{CloudEvent, Subscription};
12use zino_core::{
13    JsonValue, Map, SharedString, Uuid,
14    application::Agent,
15    error::Error,
16    extension::HeaderMapExt,
17    model::{ModelHooks, Query},
18    trace::{TraceContext, TraceState},
19    warn,
20};
21use zino_storage::NamedFile;
22
23#[cfg(feature = "auth")]
24use zino_auth::{AccessKeyId, Authentication, ParseSecurityTokenError, SecurityToken, SessionId};
25
26#[cfg(feature = "auth")]
27use zino_core::{datetime::DateTime, extension::JsonObjectExt, validation::Validation};
28
29#[cfg(feature = "cookie")]
30use cookie::{Cookie, SameSite};
31
32#[cfg(feature = "jwt")]
33use jwt_simple::algorithms::MACLike;
34#[cfg(feature = "jwt")]
35use zino_auth::JwtClaims;
36
37#[cfg(any(feature = "cookie", feature = "jwt"))]
38use std::time::Duration;
39
40#[cfg(feature = "i18n")]
41use crate::i18n;
42#[cfg(feature = "i18n")]
43use fluent::FluentArgs;
44#[cfg(feature = "i18n")]
45use unic_langid::LanguageIdentifier;
46
47mod context;
48
49pub use context::Context;
50
51/// Request context.
52pub trait RequestContext {
53    /// The method type.
54    type Method: AsRef<str>;
55    /// The uri type.
56    type Uri;
57
58    /// Returns the request method.
59    fn request_method(&self) -> &Self::Method;
60
61    /// Returns the original request URI regardless of nesting.
62    fn original_uri(&self) -> &Self::Uri;
63
64    /// Returns the route that matches the request.
65    fn matched_route(&self) -> Cow<'_, str>;
66
67    /// Returns the request path regardless of nesting.
68    fn request_path(&self) -> &str;
69
70    /// Gets the query string of the request.
71    fn get_query_string(&self) -> Option<&str>;
72
73    /// Gets an HTTP header value with the given name.
74    fn get_header(&self, name: &str) -> Option<&str>;
75
76    /// Returns the client's remote IP.
77    fn client_ip(&self) -> Option<IpAddr>;
78
79    /// Gets the request context.
80    fn get_context(&self) -> Option<Arc<Context>>;
81
82    /// Gets the request scoped data.
83    fn get_data<T: Clone + Send + Sync + 'static>(&self) -> Option<T>;
84
85    /// Sets the request scoped data and returns the old value
86    /// if an item of this type was already stored.
87    fn set_data<T: Clone + Send + Sync + 'static>(&mut self, value: T) -> Option<T>;
88
89    /// Reads the entire request body into `Bytes`.
90    async fn read_body_bytes(&mut self) -> Result<Bytes, Error>;
91
92    /// Returns the request path segments.
93    #[inline]
94    fn path_segments(&self) -> Vec<&str> {
95        self.request_path().trim_matches('/').split('/').collect()
96    }
97
98    /// Creates a new request context.
99    fn new_context(&self) -> Context {
100        // Emit metrics.
101        #[cfg(feature = "metrics")]
102        {
103            metrics::gauge!("zino_http_requests_in_flight").increment(1.0);
104            metrics::counter!(
105                "zino_http_requests_total",
106                "method" => self.request_method().as_ref().to_owned(),
107                "route" => self.matched_route().into_owned(),
108            )
109            .increment(1);
110        }
111
112        // Parse tracing headers.
113        let request_id = self
114            .get_header("x-request-id")
115            .and_then(|s| s.parse().ok())
116            .unwrap_or_else(Uuid::now_v7);
117        let trace_id = self
118            .get_trace_context()
119            .map_or_else(Uuid::now_v7, |t| Uuid::from_u128(t.trace_id()));
120        let session_id = self
121            .get_header("x-session-id")
122            .or_else(|| self.get_header("session_id"))
123            .and_then(|s| s.parse().ok());
124
125        // Generate new context.
126        let mut ctx = Context::new(request_id);
127        ctx.set_instance(self.request_path());
128        ctx.set_trace_id(trace_id);
129        ctx.set_session_id(session_id);
130
131        // Set locale.
132        #[cfg(feature = "i18n")]
133        {
134            #[cfg(feature = "cookie")]
135            if let Some(cookie) = self.get_cookie("locale") {
136                ctx.set_locale(cookie.value());
137                return ctx;
138            }
139
140            let supported_locales = i18n::SUPPORTED_LOCALES.as_slice();
141            let locale = self
142                .get_header("accept-language")
143                .and_then(|languages| helper::select_language(languages, supported_locales))
144                .unwrap_or(&i18n::DEFAULT_LOCALE);
145            ctx.set_locale(locale);
146        }
147        ctx
148    }
149
150    /// Returns the trace context by parsing the `traceparent` and `tracestate` header values.
151    #[inline]
152    fn get_trace_context(&self) -> Option<TraceContext> {
153        let traceparent = self.get_header("traceparent")?;
154        let mut trace_context = TraceContext::from_traceparent(traceparent)?;
155        if let Some(tracestate) = self.get_header("tracestate") {
156            *trace_context.trace_state_mut() = TraceState::from_tracestate(tracestate);
157        }
158        Some(trace_context)
159    }
160
161    /// Creates a new `TraceContext`.
162    fn new_trace_context(&self) -> TraceContext {
163        let mut trace_context = self
164            .get_trace_context()
165            .or_else(|| {
166                self.get_context()
167                    .map(|ctx| TraceContext::with_trace_id(ctx.trace_id()))
168            })
169            .map(|t| t.child())
170            .unwrap_or_default();
171        trace_context.record_trace_state();
172        trace_context
173    }
174
175    /// Creates a new cookie with the given name and value.
176    #[cfg(feature = "cookie")]
177    fn new_cookie(
178        &self,
179        name: SharedString,
180        value: SharedString,
181        max_age: Option<Duration>,
182    ) -> Cookie<'static> {
183        let mut cookie_builder = Cookie::build((name, value))
184            .http_only(true)
185            .secure(true)
186            .same_site(SameSite::Lax)
187            .path(self.request_path().to_owned());
188        if let Some(max_age) = max_age.and_then(|d| d.try_into().ok()) {
189            cookie_builder = cookie_builder.max_age(max_age);
190        }
191        cookie_builder.build()
192    }
193
194    /// Gets a cookie with the given name.
195    #[cfg(feature = "cookie")]
196    fn get_cookie(&self, name: &str) -> Option<Cookie<'_>> {
197        self.get_header("cookie")?.split(';').find_map(|cookie| {
198            if let Some((key, value)) = cookie.split_once('=') {
199                (key == name).then(|| Cookie::new(key, value))
200            } else {
201                None
202            }
203        })
204    }
205
206    /// Returns the start time.
207    #[inline]
208    fn start_time(&self) -> Instant {
209        self.get_context()
210            .map(|ctx| ctx.start_time())
211            .unwrap_or_else(Instant::now)
212    }
213
214    /// Returns the instance.
215    #[inline]
216    fn instance(&self) -> String {
217        self.get_context()
218            .map(|ctx| ctx.instance().to_owned())
219            .unwrap_or_else(|| self.request_path().to_owned())
220    }
221
222    /// Returns the request ID.
223    #[inline]
224    fn request_id(&self) -> Uuid {
225        self.get_context()
226            .map(|ctx| ctx.request_id())
227            .unwrap_or_default()
228    }
229
230    /// Returns the trace ID.
231    #[inline]
232    fn trace_id(&self) -> Uuid {
233        self.get_context()
234            .map(|ctx| ctx.trace_id())
235            .unwrap_or_default()
236    }
237
238    /// Returns the session ID.
239    #[inline]
240    fn session_id(&self) -> Option<String> {
241        self.get_context()
242            .and_then(|ctx| ctx.session_id().map(|s| s.to_owned()))
243    }
244
245    /// Returns the locale.
246    #[cfg(feature = "i18n")]
247    #[inline]
248    fn locale(&self) -> Option<LanguageIdentifier> {
249        self.get_context().and_then(|ctx| ctx.locale().cloned())
250    }
251
252    /// Gets the data type by parsing the `content-type` header.
253    ///
254    /// # Note
255    ///
256    /// Currently, we support the following values: `bytes` | `csv` | `form` | `json` | `multipart`
257    /// | `ndjson` | `text`.
258    fn data_type(&self) -> Option<&str> {
259        self.get_header("content-type")
260            .map(|content_type| {
261                if let Some((essence, _)) = content_type.split_once(';') {
262                    essence
263                } else {
264                    content_type
265                }
266            })
267            .map(helper::get_data_type)
268    }
269
270    /// Gets the route parameter by name.
271    /// The name should not include `:`, `*`, `{` or `}`.
272    ///
273    /// # Note
274    ///
275    /// Please note that it does not handle the percent-decoding.
276    /// You can use [`decode_param()`](Self::decode_param) or [`parse_param()`](Self::parse_param)
277    /// if you need percent-decoding.
278    fn get_param(&self, name: &str) -> Option<&str> {
279        const CAPTURES: [char; 4] = [':', '*', '{', '}'];
280        if let Some(index) = self
281            .matched_route()
282            .split('/')
283            .position(|segment| segment.trim_matches(CAPTURES.as_slice()) == name)
284        {
285            self.request_path().splitn(index + 2, '/').nth(index)
286        } else {
287            None
288        }
289    }
290
291    /// Decodes the UTF-8 percent-encoded route parameter by name.
292    fn decode_param(&self, name: &str) -> Result<Cow<'_, str>, Rejection> {
293        if let Some(value) = self.get_param(name) {
294            percent_encoding::percent_decode_str(value)
295                .decode_utf8()
296                .map_err(|err| Rejection::from_validation_entry(name.to_owned(), err).context(self))
297        } else {
298            Err(Rejection::from_validation_entry(
299                name.to_owned(),
300                warn!("param `{}` does not exist", name),
301            )
302            .context(self))
303        }
304    }
305
306    /// Parses the route parameter by name as an instance of type `T`.
307    /// The name should not include `:`, `*`, `{` or `}`.
308    fn parse_param<T: FromStr<Err: Into<Error>>>(&self, name: &str) -> Result<T, Rejection> {
309        if let Some(param) = self.get_param(name) {
310            percent_encoding::percent_decode_str(param)
311                .decode_utf8_lossy()
312                .parse::<T>()
313                .map_err(|err| Rejection::from_validation_entry(name.to_owned(), err).context(self))
314        } else {
315            Err(Rejection::from_validation_entry(
316                name.to_owned(),
317                warn!("param `{}` does not exist", name),
318            )
319            .context(self))
320        }
321    }
322
323    /// Gets the query value of the URI by name.
324    ///
325    /// # Note
326    ///
327    /// Please note that it does not handle the percent-decoding.
328    /// You can use [`decode_query()`](Self::decode_query) or [`parse_query()`](Self::parse_query)
329    /// if you need percent-decoding.
330    fn get_query(&self, name: &str) -> Option<&str> {
331        self.get_query_string()?.split('&').find_map(|param| {
332            if let Some((key, value)) = param.split_once('=') {
333                (key == name).then_some(value)
334            } else {
335                None
336            }
337        })
338    }
339
340    /// Decodes the UTF-8 percent-encoded query value of the URI by name.
341    fn decode_query(&self, name: &str) -> Result<Cow<'_, str>, Rejection> {
342        if let Some(value) = self.get_query(name) {
343            percent_encoding::percent_decode_str(value)
344                .decode_utf8()
345                .map_err(|err| Rejection::from_validation_entry(name.to_owned(), err).context(self))
346        } else {
347            Err(Rejection::from_validation_entry(
348                name.to_owned(),
349                warn!("query value `{}` does not exist", name),
350            )
351            .context(self))
352        }
353    }
354
355    /// Parses the query as an instance of type `T`.
356    /// Returns a default value of `T` when the query is empty.
357    /// If the query has a `timestamp` parameter, it will be used to prevent replay attacks.
358    fn parse_query<T: Default + DeserializeOwned>(&self) -> Result<T, Rejection> {
359        if let Some(query) = self.get_query_string() {
360            #[cfg(feature = "jwt")]
361            if let Some(timestamp) = self.get_query("timestamp").and_then(|s| s.parse().ok()) {
362                let duration = DateTime::from_timestamp(timestamp).span_between_now();
363                if duration > zino_auth::default_time_tolerance() {
364                    let err = warn!("timestamp `{}` can not be trusted", timestamp);
365                    let rejection = Rejection::from_validation_entry("timestamp", err);
366                    return Err(rejection.context(self));
367                }
368            }
369            serde_qs::from_str::<T>(query)
370                .map_err(|err| Rejection::from_validation_entry("query", err).context(self))
371        } else {
372            Ok(T::default())
373        }
374    }
375
376    /// Parses the request body as an instance of type `T`.
377    ///
378    /// # Note
379    ///
380    /// Currently, we have built-in support for the following `content-type` header values:
381    ///
382    /// - `application/json`
383    /// - `application/problem+json`
384    /// - `application/x-www-form-urlencoded`
385    async fn parse_body<T: DeserializeOwned>(&mut self) -> Result<T, Rejection> {
386        let data_type = self.data_type().unwrap_or("form");
387        if data_type.contains('/') {
388            let err = warn!(
389                "deserialization of the data type `{}` is unsupported",
390                data_type
391            );
392            let rejection = Rejection::from_validation_entry("data_type", err).context(self);
393            return Err(rejection);
394        }
395
396        let is_form = data_type == "form";
397        let bytes = self
398            .read_body_bytes()
399            .await
400            .map_err(|err| Rejection::from_validation_entry("body", err).context(self))?;
401        if is_form {
402            serde_qs::from_bytes(&bytes)
403                .map_err(|err| Rejection::from_validation_entry("body", err).context(self))
404        } else {
405            serde_json::from_slice(&bytes)
406                .map_err(|err| Rejection::from_validation_entry("body", err).context(self))
407        }
408    }
409
410    /// Parses the request body as a multipart, which is commonly used with file uploads.
411    async fn parse_multipart(&mut self) -> Result<Multipart, Rejection> {
412        let Some(content_type) = self.get_header("content-type") else {
413            return Err(Rejection::from_validation_entry(
414                "content_type",
415                warn!("invalid `content-type` header"),
416            )
417            .context(self));
418        };
419        match multer::parse_boundary(content_type) {
420            Ok(boundary) => {
421                let result = self.read_body_bytes().await.map_err(|err| err.to_string());
422                let stream = futures::stream::once(async { result });
423                Ok(Multipart::new(stream, boundary))
424            }
425            Err(err) => Err(Rejection::from_validation_entry("boundary", err).context(self)),
426        }
427    }
428
429    /// Parses the request body as a file.
430    async fn parse_file(&mut self) -> Result<NamedFile, Rejection> {
431        let multipart = self.parse_multipart().await?;
432        NamedFile::try_from_multipart(multipart)
433            .await
434            .map_err(|err| Rejection::from_validation_entry("body", err).context(self))
435    }
436
437    /// Parses the request body as a list of files.
438    async fn parse_files(&mut self) -> Result<Vec<NamedFile>, Rejection> {
439        let multipart = self.parse_multipart().await?;
440        NamedFile::try_collect_from_multipart(multipart)
441            .await
442            .map_err(|err| Rejection::from_validation_entry("body", err).context(self))
443    }
444
445    /// Parses the multipart form as an instance of `T` with the `name` and a list of files.
446    async fn parse_form<T: DeserializeOwned>(
447        &mut self,
448        name: &str,
449    ) -> Result<(Option<T>, Vec<NamedFile>), Rejection> {
450        let multipart = self.parse_multipart().await?;
451        helper::parse_form(multipart, name)
452            .await
453            .map_err(|err| Rejection::from_validation_entry("body", err).context(self))
454    }
455
456    /// Parses the `multipart/form-data` as an instance of type `T` and a list of files.
457    async fn parse_form_data<T: DeserializeOwned>(
458        &mut self,
459    ) -> Result<(T, Vec<NamedFile>), Rejection> {
460        let multipart = self.parse_multipart().await?;
461        helper::parse_form_data(multipart)
462            .await
463            .map_err(|err| Rejection::from_validation_entry("body", err).context(self))
464    }
465
466    /// Attempts to construct an instance of `Authentication` from an HTTP request.
467    /// The value is extracted from the query or the `authorization` header.
468    /// By default, the `Accept` header value is ignored and
469    /// the canonicalized resource is set to the request path.
470    #[cfg(feature = "auth")]
471    fn parse_authentication(&self) -> Result<Authentication, Rejection> {
472        let method = self.request_method();
473        let query = self.parse_query::<Map>().unwrap_or_default();
474        let mut authentication = Authentication::new(method.as_ref());
475        let mut validation = Validation::new();
476        if let Some(signature) = query.get_str("signature") {
477            authentication.set_signature(signature.to_owned());
478            if let Some(access_key_id) = query.parse_string("access_key_id") {
479                authentication.set_access_key_id(access_key_id);
480            } else {
481                validation.record("access_key_id", "should be nonempty");
482            }
483            if let Some(Ok(secs)) = query.parse_i64("expires") {
484                if DateTime::now().timestamp() <= secs {
485                    let expires = DateTime::from_timestamp(secs);
486                    authentication.set_expires(Some(expires));
487                } else {
488                    validation.record("expires", "valid period has expired");
489                }
490            } else {
491                validation.record("expires", "invalid timestamp");
492            }
493            if !validation.is_success() {
494                return Err(Rejection::bad_request(validation).context(self));
495            }
496        } else if let Some(authorization) = self.get_header("authorization") {
497            if let Some((service_name, token)) = authorization.split_once(' ') {
498                authentication.set_service_name(service_name);
499                if let Some((access_key_id, signature)) = token.split_once(':') {
500                    authentication.set_access_key_id(access_key_id);
501                    authentication.set_signature(signature.to_owned());
502                } else {
503                    validation.record("authorization", "invalid header value");
504                }
505            } else {
506                validation.record("authorization", "invalid service name");
507            }
508            if !validation.is_success() {
509                return Err(Rejection::bad_request(validation).context(self));
510            }
511        }
512        if let Some(content_md5) = self.get_header("content-md5") {
513            authentication.set_content_md5(content_md5.to_owned());
514        }
515        if let Some(date) = self.get_header("date") {
516            match DateTime::parse_utc_str(date) {
517                Ok(date) => {
518                    #[cfg(feature = "jwt")]
519                    if date.span_between_now() <= zino_auth::default_time_tolerance() {
520                        authentication.set_date_header("date", date);
521                    } else {
522                        validation.record("date", "untrusted date");
523                    }
524                    #[cfg(not(feature = "jwt"))]
525                    authentication.set_date_header("date", date);
526                }
527                Err(err) => {
528                    validation.record_fail("date", err);
529                    return Err(Rejection::bad_request(validation).context(self));
530                }
531            }
532        }
533        authentication.set_content_type(self.get_header("content-type").map(|s| s.to_owned()));
534        authentication.set_resource(self.request_path().to_owned(), None);
535        Ok(authentication)
536    }
537
538    /// Attempts to construct an instance of `AccessKeyId` from an HTTP request.
539    /// The value is extracted from the query parameter `access_key_id`
540    /// or the `authorization` header.
541    #[cfg(feature = "auth")]
542    fn parse_access_key_id(&self) -> Result<AccessKeyId, Rejection> {
543        if let Some(access_key_id) = self.get_query("access_key_id") {
544            Ok(access_key_id.into())
545        } else {
546            let mut validation = Validation::new();
547            if let Some(authorization) = self.get_header("authorization") {
548                if let Some((_, token)) = authorization.split_once(' ') {
549                    let access_key_id = if let Some((access_key_id, _)) = token.split_once(':') {
550                        access_key_id
551                    } else {
552                        token
553                    };
554                    return Ok(access_key_id.into());
555                } else {
556                    validation.record("authorization", "invalid service name");
557                }
558            } else {
559                validation.record("authorization", "invalid value to get the access key id");
560            }
561            Err(Rejection::bad_request(validation).context(self))
562        }
563    }
564
565    /// Attempts to construct an instance of `SecurityToken` from an HTTP request.
566    /// The value is extracted from the `x-security-token` header.
567    #[cfg(feature = "auth")]
568    fn parse_security_token(&self, key: &[u8]) -> Result<SecurityToken, Rejection> {
569        use ParseSecurityTokenError::*;
570        let query = self.parse_query::<Map>()?;
571        let mut validation = Validation::new();
572        if let Some(token) = self
573            .get_header("x-security-token")
574            .or_else(|| query.get_str("security_token"))
575        {
576            match SecurityToken::parse_with(token.to_owned(), key) {
577                Ok(security_token) => {
578                    if let Some(access_key_id) = query.get_str("access_key_id") {
579                        if security_token.access_key_id().as_str() != access_key_id {
580                            validation.record("access_key_id", "untrusted access key ID");
581                        }
582                    }
583                    if let Some(Ok(expires)) = query.parse_i64("expires") {
584                        if security_token.expires_at().timestamp() != expires {
585                            validation.record("expires", "untrusted timestamp");
586                        }
587                    }
588                    if validation.is_success() {
589                        return Ok(security_token);
590                    }
591                }
592                Err(err) => {
593                    let field = match err {
594                        DecodeError(_) | InvalidFormat => "security_token",
595                        ParseExpiresError(_) | ValidPeriodExpired(_) => "expires",
596                    };
597                    validation.record_fail(field, err);
598                }
599            }
600        } else {
601            validation.record("security_token", "should be nonempty");
602        }
603        Err(Rejection::bad_request(validation).context(self))
604    }
605
606    /// Attempts to construct an instance of `SessionId` from an HTTP request.
607    /// The value is extracted from the `x-session-id` or `session-id` header.
608    #[cfg(feature = "auth")]
609    fn parse_session_id(&self) -> Result<SessionId, Rejection> {
610        self.get_header("x-session-id")
611            .or_else(|| self.get_header("session-id"))
612            .ok_or_else(|| {
613                Rejection::from_validation_entry(
614                    "session_id",
615                    warn!("a `session-id` or `x-session-id` header is required"),
616                )
617                .context(self)
618            })
619            .and_then(|session_id| {
620                SessionId::parse(session_id).map_err(|err| {
621                    Rejection::from_validation_entry("session_id", err).context(self)
622                })
623            })
624    }
625
626    /// Attempts to construct an instance of `JwtClaims` from an HTTP request.
627    /// The value is extracted from the query parameter `access_token` or
628    /// the `authorization` header.
629    #[cfg(feature = "jwt")]
630    fn parse_jwt_claims<T, K>(&self, key: &K) -> Result<JwtClaims<T>, Rejection>
631    where
632        T: Default + serde::Serialize + DeserializeOwned,
633        K: MACLike,
634    {
635        let (param, mut token) = match self.get_query("access_token") {
636            Some(access_token) => ("access_token", access_token),
637            None => ("authorization", ""),
638        };
639        if let Some(authorization) = self.get_header("authorization") {
640            token = authorization
641                .strip_prefix("Bearer ")
642                .unwrap_or(authorization);
643        }
644        if token.is_empty() {
645            let mut validation = Validation::new();
646            validation.record(param, "JWT token is absent");
647            return Err(Rejection::bad_request(validation).context(self));
648        }
649
650        let mut options = zino_auth::default_verification_options();
651        options.reject_before = self
652            .get_query("timestamp")
653            .and_then(|s| s.parse().ok())
654            .map(|i| Duration::from_secs(i).into());
655        options.required_nonce = self.get_query("nonce").map(|s| s.to_owned());
656
657        match key.verify_token(token, Some(options)) {
658            Ok(claims) => Ok(claims.into()),
659            Err(err) => {
660                let message = format!("401 Unauthorized: {err}");
661                Err(Rejection::with_message(message).context(self))
662            }
663        }
664    }
665
666    /// Returns a `Response` or `Rejection` from a model query validation.
667    /// The data is extracted from [`parse_query()`](RequestContext::parse_query).
668    fn query_validation<S>(&self, query: &mut Query) -> Result<Response<S>, Rejection>
669    where
670        Self: Sized,
671        S: ResponseCode,
672    {
673        match self.parse_query() {
674            Ok(data) => {
675                let validation = query.read_map(&data);
676                if validation.is_success() {
677                    Ok(Response::with_context(S::OK, self))
678                } else {
679                    Err(Rejection::bad_request(validation).context(self))
680                }
681            }
682            Err(rejection) => Err(rejection),
683        }
684    }
685
686    /// Returns a `Response` or `Rejection` from a model validation.
687    /// The data is extracted from [`parse_body()`](RequestContext::parse_body).
688    async fn model_validation<M, S>(&mut self, model: &mut M) -> Result<Response<S>, Rejection>
689    where
690        Self: Sized,
691        M: ModelHooks,
692        S: ResponseCode,
693    {
694        let data_type = self.data_type().unwrap_or("form");
695        if data_type.contains('/') {
696            let err = warn!(
697                "deserialization of the data type `{}` is unsupported",
698                data_type
699            );
700            let rejection = Rejection::from_validation_entry("data_type", err).context(self);
701            return Err(rejection);
702        }
703        M::before_extract()
704            .await
705            .map_err(|err| Rejection::from_error(err).context(self))?;
706
707        let is_form = data_type == "form";
708        let bytes = self
709            .read_body_bytes()
710            .await
711            .map_err(|err| Rejection::from_validation_entry("body", err).context(self))?;
712        let extension = self.get_data::<M::Extension>();
713        if is_form {
714            let mut data = serde_qs::from_bytes(&bytes)
715                .map_err(|err| Rejection::from_validation_entry("body", err).context(self))?;
716            match M::before_validation(&mut data, extension.as_ref()).await {
717                Ok(()) => {
718                    let validation = model.read_map(&data);
719                    model
720                        .after_validation(&mut data)
721                        .await
722                        .map_err(|err| Rejection::from_error(err).context(self))?;
723                    if let Some(extension) = extension {
724                        model
725                            .after_extract(extension)
726                            .await
727                            .map_err(|err| Rejection::from_error(err).context(self))?;
728                    }
729                    if validation.is_success() {
730                        Ok(Response::with_context(S::OK, self))
731                    } else {
732                        Err(Rejection::bad_request(validation).context(self))
733                    }
734                }
735                Err(err) => Err(Rejection::from_error(err).context(self)),
736            }
737        } else {
738            let mut data = serde_json::from_slice(&bytes)
739                .map_err(|err| Rejection::from_validation_entry("body", err).context(self))?;
740            match M::before_validation(&mut data, extension.as_ref()).await {
741                Ok(()) => {
742                    let validation = model.read_map(&data);
743                    model
744                        .after_validation(&mut data)
745                        .await
746                        .map_err(|err| Rejection::from_error(err).context(self))?;
747                    if let Some(extension) = extension {
748                        model
749                            .after_extract(extension)
750                            .await
751                            .map_err(|err| Rejection::from_error(err).context(self))?;
752                    }
753                    if validation.is_success() {
754                        Ok(Response::with_context(S::OK, self))
755                    } else {
756                        Err(Rejection::bad_request(validation).context(self))
757                    }
758                }
759                Err(err) => Err(Rejection::from_error(err).context(self)),
760            }
761        }
762    }
763
764    /// Makes an HTTP request to the provided URL.
765    async fn fetch(&self, url: &str, options: Option<&Map>) -> Result<reqwest::Response, Error> {
766        let trace_context = self.new_trace_context();
767        Agent::request_builder(url, options)?
768            .header("traceparent", trace_context.traceparent())
769            .header("tracestate", trace_context.tracestate())
770            .send()
771            .await
772            .map_err(Error::from)
773    }
774
775    /// Makes an HTTP request to the provided URL and
776    /// deserializes the response body via JSON.
777    async fn fetch_json<T: DeserializeOwned>(
778        &self,
779        url: &str,
780        options: Option<&Map>,
781    ) -> Result<T, Error> {
782        let response = self.fetch(url, options).await?.error_for_status()?;
783        let data = if response.headers().has_json_content_type() {
784            response.json().await?
785        } else {
786            let text = response.text().await?;
787            serde_json::from_str(&text)?
788        };
789        Ok(data)
790    }
791
792    /// Translates the localization message.
793    #[cfg(feature = "i18n")]
794    fn translate(&self, message: &str, args: Option<FluentArgs>) -> Result<SharedString, Error> {
795        if let Some(locale) = self.locale() {
796            i18n::translate(&locale, message, args)
797        } else {
798            let default_locale = i18n::DEFAULT_LOCALE.parse()?;
799            i18n::translate(&default_locale, message, args)
800        }
801    }
802
803    /// Constructs a new subscription instance.
804    fn subscription(&self) -> Subscription {
805        let mut subscription = self.parse_query::<Subscription>().unwrap_or_default();
806        if subscription.session_id().is_none() {
807            if let Some(session_id) = self.session_id() {
808                subscription.set_session_id(Some(session_id));
809            }
810        }
811        subscription
812    }
813
814    /// Constructs a new cloud event instance.
815    fn cloud_event(&self, event_type: SharedString, data: JsonValue) -> CloudEvent {
816        let id = self.request_id();
817        let source = self.instance();
818        let mut event = CloudEvent::new(id, source, event_type);
819        if let Some(session_id) = self.session_id() {
820            event.set_session_id(session_id);
821        }
822        event.set_data(data);
823        event
824    }
825}