casper_json_rpc/
filters.rs1#[cfg(test)]
7mod tests;
8
9use bytes::Bytes;
10use http::{header::CONTENT_TYPE, HeaderMap, StatusCode};
11use serde_json::{json, Map, Value};
12use tracing::{debug, trace, warn};
13use warp::{
14 body,
15 filters::BoxedFilter,
16 reject::{self, Rejection},
17 reply::{self, WithStatus},
18 Filter,
19};
20
21use crate::{
22 error::{Error, ReservedErrorCode},
23 rejections::{BodyTooLarge, MissingContentTypeHeader, MissingId, UnsupportedMediaType},
24 request::{ErrorOrRejection, Request},
25 request_handlers::RequestHandlers,
26 response::Response,
27};
28
29const CONTENT_TYPE_VALUE: &str = "application/json";
30
31pub fn base_filter<P: AsRef<str>>(path: P, max_body_bytes: u32) -> BoxedFilter<()> {
39 let path = path.as_ref().to_string();
40 warp::path::path(path)
41 .and(warp::path::end())
42 .and(warp::filters::method::post())
43 .and(
44 warp::filters::header::headers_cloned().and_then(|headers: HeaderMap| async move {
45 for (name, value) in headers.iter() {
46 if name.as_str() == CONTENT_TYPE.as_str() {
47 if value
48 .as_bytes()
49 .eq_ignore_ascii_case(CONTENT_TYPE_VALUE.as_bytes())
50 {
51 return Ok(());
52 } else {
53 trace!(content_type = ?value.to_str(), "invalid {}", CONTENT_TYPE);
54 return Err(reject::custom(UnsupportedMediaType));
55 }
56 }
57 }
58 trace!("missing {}", CONTENT_TYPE);
59 Err(reject::custom(MissingContentTypeHeader))
60 }),
61 )
62 .untuple_one()
63 .and(body::content_length_limit(max_body_bytes as u64).or_else(
64 move |_rejection| async move { Err(reject::custom(BodyTooLarge(max_body_bytes))) },
65 ))
66 .boxed()
67}
68
69async fn handle_body(
79 body: Bytes,
80 handlers: RequestHandlers,
81 allow_unknown_fields: bool,
82) -> Result<Response, Rejection> {
83 let response = match serde_json::from_slice::<Map<String, Value>>(&body) {
84 Ok(unvalidated_request) => match Request::new(unvalidated_request, allow_unknown_fields) {
85 Ok(request) => handlers.handle_request(request).await,
86 Err(ErrorOrRejection::Error { id, error }) => {
87 debug!(?error, "got an invalid request");
88 Response::new_failure(id, error)
89 }
90 Err(ErrorOrRejection::Rejection(rejection)) => {
91 debug!(?rejection, "rejecting an invalid request");
92 return Err(rejection);
93 }
94 },
95 Err(error) => {
96 debug!(%error, "got bad json");
97 let error = Error::new(ReservedErrorCode::ParseError, error.to_string());
98 Response::new_failure(Value::Null, error)
99 }
100 };
101 Ok(response)
102}
103
104pub fn main_filter(
116 handlers: RequestHandlers,
117 allow_unknown_fields: bool,
118) -> BoxedFilter<(WithStatus<reply::Json>,)> {
119 body::bytes()
120 .and_then(move |body| {
121 let handlers = handlers.clone();
122 async move { handle_body(body, handlers, allow_unknown_fields).await }
123 })
124 .map(|response| reply::with_status(reply::json(&response), StatusCode::OK))
125 .boxed()
126}
127
128pub async fn handle_rejection(error: Rejection) -> Result<WithStatus<reply::Json>, Rejection> {
135 let code;
136 let message;
137
138 if let Some(rejection) = error.find::<UnsupportedMediaType>() {
139 trace!("{:?}", rejection);
140 message = rejection.to_string();
141 code = StatusCode::UNSUPPORTED_MEDIA_TYPE;
142 } else if let Some(rejection) = error.find::<MissingContentTypeHeader>() {
143 trace!("{:?}", rejection);
144 message = rejection.to_string();
145 code = StatusCode::BAD_REQUEST;
146 } else if let Some(rejection) = error.find::<MissingId>() {
147 trace!("{:?}", rejection);
148 message = rejection.to_string();
149 code = StatusCode::BAD_REQUEST;
150 } else if let Some(rejection) = error.find::<BodyTooLarge>() {
151 trace!("{:?}", rejection);
152 message = rejection.to_string();
153 code = StatusCode::PAYLOAD_TOO_LARGE;
154 } else if error.is_not_found() {
155 trace!("{:?}", error);
156 message = "Path not found".to_string();
157 code = StatusCode::NOT_FOUND;
158 } else if let Some(rejection) = error.find::<reject::MethodNotAllowed>() {
159 trace!("{:?}", rejection);
160 message = rejection.to_string();
161 code = StatusCode::METHOD_NOT_ALLOWED;
162 } else if let Some(rejection) = error.find::<reject::InvalidHeader>() {
163 trace!("{:?}", rejection);
164 message = rejection.to_string();
165 code = StatusCode::BAD_REQUEST;
166 } else if let Some(rejection) = error.find::<reject::MissingHeader>() {
167 trace!("{:?}", rejection);
168 message = rejection.to_string();
169 code = StatusCode::BAD_REQUEST;
170 } else if let Some(rejection) = error.find::<reject::InvalidQuery>() {
171 trace!("{:?}", rejection);
172 message = rejection.to_string();
173 code = StatusCode::BAD_REQUEST;
174 } else if let Some(rejection) = error.find::<reject::MissingCookie>() {
175 trace!("{:?}", rejection);
176 message = rejection.to_string();
177 code = StatusCode::BAD_REQUEST;
178 } else if let Some(rejection) = error.find::<reject::LengthRequired>() {
179 trace!("{:?}", rejection);
180 message = rejection.to_string();
181 code = StatusCode::LENGTH_REQUIRED;
182 } else if let Some(rejection) = error.find::<reject::PayloadTooLarge>() {
183 trace!("{:?}", rejection);
184 message = rejection.to_string();
185 code = StatusCode::PAYLOAD_TOO_LARGE;
186 } else if let Some(rejection) = error.find::<reject::UnsupportedMediaType>() {
187 trace!("{:?}", rejection);
188 message = rejection.to_string();
189 code = StatusCode::UNSUPPORTED_MEDIA_TYPE;
190 } else if let Some(rejection) = error.find::<warp::filters::cors::CorsForbidden>() {
191 trace!("{:?}", rejection);
192 message = rejection.to_string();
193 code = StatusCode::FORBIDDEN;
194 } else {
195 warn!(?error, "unhandled warp rejection in json-rpc server");
197 message = format!("Internal server error: unhandled rejection: {:?}", error);
198 code = StatusCode::INTERNAL_SERVER_ERROR;
199 }
200
201 Ok(reply::with_status(
202 reply::json(&json!({ "message": message })),
203 code,
204 ))
205}