es_public_proxy/
lib.rs

1use hyper::{Body, Method, Request, StatusCode, Uri};
2use serde::Deserialize;
3use serde_json::json;
4
5pub mod parse;
6
7use parse::UrlQueryParams;
8
9#[derive(Default, Deserialize, Debug, Clone)]
10pub struct ProxyConfig {
11    pub bind_addr: Option<String>,     // 127.0.0.1:9292
12    pub upstream_addr: Option<String>, // 127.0.0.1:9200
13    pub unsafe_all_indices: Option<bool>,
14    pub enable_cors: Option<bool>,
15    pub index: Vec<IndexConfig>,
16}
17
18#[derive(Deserialize, Debug, Clone)]
19pub struct IndexConfig {
20    pub name: String,
21}
22
23impl ProxyConfig {
24    pub fn allow_index(&self, name: &str) -> bool {
25        if self.unsafe_all_indices == Some(true) {
26            return true;
27        }
28        for index in &self.index {
29            if index.name == name {
30                return true;
31            }
32        }
33        false
34    }
35}
36
37#[derive(Debug)]
38pub enum ProxyError {
39    HttpError(String),
40    UpstreamError(String),
41    ParseError(String),
42    UnknownIndex(String),
43    NotSupported(String),
44}
45
46impl ProxyError {
47    pub fn http_status_code(&self) -> StatusCode {
48        match self {
49            ProxyError::HttpError(_) => StatusCode::BAD_REQUEST,
50            ProxyError::UpstreamError(_) => StatusCode::BAD_GATEWAY,
51            ProxyError::ParseError(_) => StatusCode::BAD_REQUEST,
52            ProxyError::UnknownIndex(_) => StatusCode::NOT_FOUND,
53            ProxyError::NotSupported(_) => StatusCode::FORBIDDEN,
54        }
55    }
56
57    pub fn to_json_value(&self) -> serde_json::Value {
58        let (type_slug, reason) = match self {
59            ProxyError::HttpError(s) => ("http-error", s.clone()),
60            ProxyError::UpstreamError(s) => ("upstream-error", s.clone()),
61            ProxyError::ParseError(s) => ("parse-error", s.clone()),
62            ProxyError::UnknownIndex(index) => (
63                "unknown-index",
64                format!(
65                    "index does not exists, or public access not allowed: {}",
66                    index
67                ),
68            ),
69            ProxyError::NotSupported(s) => ("not-supported", s.clone()),
70        };
71
72        json!({
73            "error": {
74                "reason": reason,
75                "type": type_slug,
76            },
77            "status": self.http_status_code().as_u16(),
78        })
79    }
80}
81
82pub async fn filter_request(
83    req: Request<Body>,
84    config: &ProxyConfig,
85) -> Result<Request<Body>, ProxyError> {
86    let (parts, body) = req.into_parts();
87
88    // split path into at most 3 chunks
89    let mut req_path = parts.uri.path();
90    if req_path.starts_with('/') {
91        req_path = &req_path[1..];
92    }
93    let path_chunks: Vec<&str> = req_path.split('/').collect();
94    if path_chunks.len() > 3 {
95        return Err(ProxyError::NotSupported(
96            "only request paths with up to three segments allowed".to_string(),
97        ));
98    }
99
100    let params: UrlQueryParams = serde_urlencoded::from_str(parts.uri.query().unwrap_or(""))
101        .map_err(|e| ProxyError::ParseError(e.to_string()))?;
102
103    // this is sort of like a router
104    let body = match (&parts.method, path_chunks.as_slice()) {
105        (&Method::GET, [""]) | (&Method::HEAD, [""]) | (&Method::OPTIONS, [""]) => Body::empty(),
106        (&Method::HEAD, ["_search", "scroll"]) | (&Method::OPTIONS, ["_search", "scroll"]) => {
107            Body::empty()
108        }
109        (&Method::GET, ["_search", "scroll"])
110        | (&Method::POST, ["_search", "scroll"])
111        | (&Method::DELETE, ["_search", "scroll"]) => {
112            let whole_body = hyper::body::to_bytes(body)
113                .await
114                .map_err(|e| ProxyError::HttpError(e.to_string()))?;
115            filter_scroll_request(&params, &whole_body, config)?
116        }
117        (&Method::HEAD, [index, "_search"]) | (&Method::OPTIONS, [index, "_search"]) => {
118            filter_search_request(index, &params, &[], config)?
119        }
120        (&Method::GET, [index, "_search"]) | (&Method::POST, [index, "_search"]) => {
121            let whole_body = hyper::body::to_bytes(body)
122                .await
123                .map_err(|e| ProxyError::HttpError(e.to_string()))?;
124            filter_search_request(index, &params, &whole_body, config)?
125        }
126        (&Method::HEAD, [index, "_count"]) | (&Method::OPTIONS, [index, "_count"]) => {
127            filter_search_request(index, &params, &[], config)?
128        }
129        (&Method::GET, [index, "_count"]) | (&Method::POST, [index, "_count"]) => {
130            let whole_body = hyper::body::to_bytes(body)
131                .await
132                .map_err(|e| ProxyError::HttpError(e.to_string()))?;
133            filter_search_request(index, &params, &whole_body, config)?
134        }
135        (&Method::GET, [index, "_doc", _key])
136        | (&Method::GET, [index, "_source", _key])
137        | (&Method::HEAD, [index, "_doc", _key])
138        | (&Method::OPTIONS, [index, "_source", _key]) => {
139            filter_read_request(index, path_chunks[1], &params, config)?
140        }
141        (&Method::GET, [index, ""])
142        | (&Method::HEAD, [index, ""])
143        | (&Method::OPTIONS, [index, ""]) => {
144            filter_read_request(index, path_chunks[1], &params, config)?
145        }
146        (&Method::GET, [index]) | (&Method::HEAD, [index]) | (&Method::OPTIONS, [index]) => {
147            // only allow operations on index name (no trailing slash) if not "unsafe_all_indices"
148            // (aka, only if indexes are explicitly enumerated)
149            // otherwise all top-level API endpoints would be allowed
150            if config.unsafe_all_indices != Some(true) {
151                filter_read_request(index, "", &params, config)?
152            } else {
153                return Err(ProxyError::NotSupported(
154                    "unknown elasticsearch API endpoint".to_string(),
155                ));
156            }
157        }
158        (&Method::GET, [index, "_mapping"])
159        | (&Method::HEAD, [index, "_mapping"])
160        | (&Method::OPTIONS, [index, "_mapping"]) => {
161            filter_read_request(index, path_chunks[1], &params, config)?
162        }
163        _ => Err(ProxyError::NotSupported(
164            "unknown elasticsearch API endpoint".to_string(),
165        ))?,
166    };
167
168    let upstream_query = serde_urlencoded::to_string(params).expect("re-encoding URL parameters");
169    let upstream_query_and_params = if !upstream_query.is_empty() {
170        format!("{}?{}", req_path, upstream_query)
171    } else {
172        req_path.to_string()
173    };
174    let upstream_uri = Uri::builder()
175        .scheme("http")
176        .authority(
177            config
178                .upstream_addr
179                .as_ref()
180                .unwrap_or(&"localhost:9200".to_string())
181                .as_str(),
182        )
183        .path_and_query(upstream_query_and_params.as_str())
184        .build()
185        .expect("constructing upstream request URI");
186
187    let upstream_req = Request::builder()
188        .uri(upstream_uri)
189        .method(&parts.method)
190        .header("Content-Type", "application/json; charset=UTF-8")
191        .body(body)
192        .expect("constructing upstream request");
193
194    Ok(upstream_req)
195}
196pub fn filter_scroll_request(
197    _params: &UrlQueryParams,
198    body: &[u8],
199    _config: &ProxyConfig,
200) -> Result<Body, ProxyError> {
201    if !body.is_empty() {
202        let parsed: parse::ScrollBody =
203            serde_json::from_slice(body).map_err(|e| ProxyError::ParseError(e.to_string()))?;
204        // check that scroll_id is not "_all" or too short
205        match &parsed.scroll_id {
206            parse::StringOrArray::String(single) => {
207                if single == "_all" || single.len() < 8 {
208                    return Err(ProxyError::NotSupported(format!(
209                        "short scroll_id: {}",
210                        single
211                    )));
212                }
213            }
214            parse::StringOrArray::Array(array) => {
215                for single in array {
216                    if single == "_all" || single.len() < 8 {
217                        return Err(ProxyError::NotSupported(format!(
218                            "short scroll_id: {}",
219                            single
220                        )));
221                    }
222                }
223            }
224        }
225        Ok(Body::from(serde_json::to_string(&parsed).unwrap()))
226    } else {
227        Ok(Body::empty())
228    }
229}
230
231pub fn filter_read_request(
232    index: &str,
233    _endpoint: &str,
234    _params: &UrlQueryParams,
235    config: &ProxyConfig,
236) -> Result<Body, ProxyError> {
237    if !config.allow_index(index) {
238        return Err(ProxyError::UnknownIndex(index.to_string()));
239    }
240    Ok(Body::empty())
241}
242
243pub fn filter_search_request(
244    index: &str,
245    _params: &UrlQueryParams,
246    body: &[u8],
247    config: &ProxyConfig,
248) -> Result<Body, ProxyError> {
249    if !config.allow_index(index) {
250        return Err(ProxyError::UnknownIndex(index.to_string()));
251    }
252    // XXX: more checks
253    if !body.is_empty() {
254        let parsed: parse::SearchBody =
255            serde_json::from_slice(body).map_err(|e| ProxyError::ParseError(e.to_string()))?;
256        Ok(Body::from(serde_json::to_string(&parsed).unwrap()))
257    } else {
258        Ok(Body::empty())
259    }
260}