use hyper::{Body, Method, Request, StatusCode, Uri};
use serde::Deserialize;
use serde_json::json;

pub mod parse;

use parse::UrlQueryParams;

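/// Proxy configuration: bind and upstream addresses, CORS behavior, and the
/// list of indices that may be accessed publicly.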
#[derive(Default, Deserialize, Debug, Clone)]
pub struct ProxyConfig {
    pub bind_addr: Option<String>,
    pub upstream_addr: Option<String>,
    pub unsafe_all_indices: Option<bool>,
    pub enable_cors: Option<bool>,
    pub index: Vec<IndexConfig>,
}

#[derive(Deserialize, Debug, Clone)]
pub struct IndexConfig {
    pub name: String,
}

impl ProxyConfig {
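    /// Returns true if requests against the named index should be proxied:
    /// either `unsafe_all_indices` is enabled, or the name appears in the
    /// configured index list.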
    pub fn allow_index(&self, name: &str) -> bool {
        if self.unsafe_all_indices == Some(true) {
            return true;
        }
        for index in &self.index {
            if index.name == name {
                return true;
            }
        }
        false
    }
}

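/// Errors that can occur while filtering or forwarding a request, each
/// mapping to an HTTP status code and a JSON error body.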
#[derive(Debug)]
pub enum ProxyError {
    HttpError(String),
    UpstreamError(String),
    ParseError(String),
    UnknownIndex(String),
    NotSupported(String),
}

impl ProxyError {
    pub fn http_status_code(&self) -> StatusCode {
        match self {
            ProxyError::HttpError(_) => StatusCode::BAD_REQUEST,
            ProxyError::UpstreamError(_) => StatusCode::BAD_GATEWAY,
            ProxyError::ParseError(_) => StatusCode::BAD_REQUEST,
            ProxyError::UnknownIndex(_) => StatusCode::NOT_FOUND,
            ProxyError::NotSupported(_) => StatusCode::FORBIDDEN,
        }
    }

    pub fn to_json_value(&self) -> serde_json::Value {
        let (type_slug, reason) = match self {
            ProxyError::HttpError(s) => ("http-error", s.clone()),
            ProxyError::UpstreamError(s) => ("upstream-error", s.clone()),
            ProxyError::ParseError(s) => ("parse-error", s.clone()),
            ProxyError::UnknownIndex(index) => (
                "unknown-index",
                format!(
65 "index does not exists, or public access not allowed: {}",
                    index
                ),
            ),
            ProxyError::NotSupported(s) => ("not-supported", s.clone()),
        };

        json!({
            "error": {
                "reason": reason,
                "type": type_slug,
            },
            "status": self.http_status_code().as_u16(),
        })
    }
}

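/// Checks an incoming request against the proxy configuration and, if it is
/// an allowed read-only Elasticsearch API call, rewrites it into a request
/// against the upstream instance. Anything else is rejected with a
/// `ProxyError`.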
pub async fn filter_request(
    req: Request<Body>,
    config: &ProxyConfig,
) -> Result<Request<Body>, ProxyError> {
    let (parts, body) = req.into_parts();

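    // Normalize the request path: drop the leading slash, then split into segments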
    let mut req_path = parts.uri.path();
    if req_path.starts_with('/') {
        req_path = &req_path[1..];
    }
    let path_chunks: Vec<&str> = req_path.split('/').collect();
    if path_chunks.len() > 3 {
        return Err(ProxyError::NotSupported(
            "only request paths with up to three segments allowed".to_string(),
        ));
    }

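    // Parse URL query parameters up front; anything that fails to parse is rejected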
    let params: UrlQueryParams = serde_urlencoded::from_str(parts.uri.query().unwrap_or(""))
        .map_err(|e| ProxyError::ParseError(e.to_string()))?;

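    // Route on HTTP method plus path segments; each arm either produces a
    // (possibly filtered) request body for the upstream, or bails out with an error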
    let body = match (&parts.method, path_chunks.as_slice()) {
        (&Method::GET, [""]) | (&Method::HEAD, [""]) | (&Method::OPTIONS, [""]) => Body::empty(),
        (&Method::HEAD, ["_search", "scroll"]) | (&Method::OPTIONS, ["_search", "scroll"]) => {
            Body::empty()
        }
        (&Method::GET, ["_search", "scroll"])
        | (&Method::POST, ["_search", "scroll"])
        | (&Method::DELETE, ["_search", "scroll"]) => {
            let whole_body = hyper::body::to_bytes(body)
                .await
                .map_err(|e| ProxyError::HttpError(e.to_string()))?;
            filter_scroll_request(&params, &whole_body, config)?
        }
        (&Method::HEAD, [index, "_search"]) | (&Method::OPTIONS, [index, "_search"]) => {
            filter_search_request(index, &params, &[], config)?
        }
        (&Method::GET, [index, "_search"]) | (&Method::POST, [index, "_search"]) => {
            let whole_body = hyper::body::to_bytes(body)
                .await
                .map_err(|e| ProxyError::HttpError(e.to_string()))?;
            filter_search_request(index, &params, &whole_body, config)?
        }
        (&Method::HEAD, [index, "_count"]) | (&Method::OPTIONS, [index, "_count"]) => {
            filter_search_request(index, &params, &[], config)?
        }
        (&Method::GET, [index, "_count"]) | (&Method::POST, [index, "_count"]) => {
            let whole_body = hyper::body::to_bytes(body)
                .await
                .map_err(|e| ProxyError::HttpError(e.to_string()))?;
            filter_search_request(index, &params, &whole_body, config)?
        }
        (&Method::GET, [index, "_doc", _key])
        | (&Method::GET, [index, "_source", _key])
        | (&Method::HEAD, [index, "_doc", _key])
        | (&Method::OPTIONS, [index, "_source", _key]) => {
            filter_read_request(index, path_chunks[1], &params, config)?
        }
        (&Method::GET, [index, ""])
        | (&Method::HEAD, [index, ""])
        | (&Method::OPTIONS, [index, ""]) => {
            filter_read_request(index, path_chunks[1], &params, config)?
        }
        (&Method::GET, [index]) | (&Method::HEAD, [index]) | (&Method::OPTIONS, [index]) => {
            if config.unsafe_all_indices != Some(true) {
                filter_read_request(index, "", &params, config)?
            } else {
                return Err(ProxyError::NotSupported(
                    "unknown elasticsearch API endpoint".to_string(),
                ));
            }
        }
        (&Method::GET, [index, "_mapping"])
        | (&Method::HEAD, [index, "_mapping"])
        | (&Method::OPTIONS, [index, "_mapping"]) => {
            filter_read_request(index, path_chunks[1], &params, config)?
        }
        _ => Err(ProxyError::NotSupported(
            "unknown elasticsearch API endpoint".to_string(),
        ))?,
    };

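    // Re-encode the parsed query parameters and build the URI pointing at the upstream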
    let upstream_query = serde_urlencoded::to_string(params).expect("re-encoding URL parameters");
    let upstream_query_and_params = if !upstream_query.is_empty() {
        format!("{}?{}", req_path, upstream_query)
    } else {
        req_path.to_string()
    };
    let upstream_uri = Uri::builder()
        .scheme("http")
        .authority(
            config
                .upstream_addr
                .as_ref()
                .unwrap_or(&"localhost:9200".to_string())
                .as_str(),
        )
        .path_and_query(upstream_query_and_params.as_str())
        .build()
        .expect("constructing upstream request URI");

    let upstream_req = Request::builder()
        .uri(upstream_uri)
        .method(&parts.method)
        .header("Content-Type", "application/json; charset=UTF-8")
        .body(body)
        .expect("constructing upstream request");

    Ok(upstream_req)
}
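
/// Filters a scroll API request body: scroll IDs must not be `_all` or
/// implausibly short; the parsed body is re-serialized and forwarded.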
pub fn filter_scroll_request(
    _params: &UrlQueryParams,
    body: &[u8],
    _config: &ProxyConfig,
) -> Result<Body, ProxyError> {
    if !body.is_empty() {
        let parsed: parse::ScrollBody =
            serde_json::from_slice(body).map_err(|e| ProxyError::ParseError(e.to_string()))?;
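        // Reject the "_all" wildcard and implausibly short scroll IDs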
204 match &parsed.scroll_id {
206 parse::StringOrArray::String(single) => {
207 if single == "_all" || single.len() < 8 {
208 return Err(ProxyError::NotSupported(format!(
209 "short scroll_id: {}",
210 single
211 )));
212 }
213 }
214 parse::StringOrArray::Array(array) => {
215 for single in array {
216 if single == "_all" || single.len() < 8 {
217 return Err(ProxyError::NotSupported(format!(
218 "short scroll_id: {}",
219 single
220 )));
221 }
222 }
223 }
224 }
225 Ok(Body::from(serde_json::to_string(&parsed).unwrap()))
226 } else {
227 Ok(Body::empty())
228 }
229}
230
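/// Filters a simple read request (document get, index info, mapping, etc.):
/// the only check is that the target index is publicly allowed.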
pub fn filter_read_request(
    index: &str,
    _endpoint: &str,
    _params: &UrlQueryParams,
    config: &ProxyConfig,
) -> Result<Body, ProxyError> {
    if !config.allow_index(index) {
        return Err(ProxyError::UnknownIndex(index.to_string()));
    }
    Ok(Body::empty())
}

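/// Filters a search or count request: the index must be allowed, and any
/// request body must parse as a known search body before being re-serialized
/// and forwarded.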
pub fn filter_search_request(
    index: &str,
    _params: &UrlQueryParams,
    body: &[u8],
    config: &ProxyConfig,
) -> Result<Body, ProxyError> {
    if !config.allow_index(index) {
        return Err(ProxyError::UnknownIndex(index.to_string()));
    }
    if !body.is_empty() {
        let parsed: parse::SearchBody =
            serde_json::from_slice(body).map_err(|e| ProxyError::ParseError(e.to_string()))?;
        Ok(Body::from(serde_json::to_string(&parsed).unwrap()))
    } else {
        Ok(Body::empty())
    }
}