ogcapi_proxy/
collection_proxy.rs1use std::{
2 convert::Infallible,
3 fmt::Debug,
4 pin::Pin,
5 str::FromStr,
6 task::{Context, Poll},
7};
8
9use axum::{
10 Json, RequestExt,
11 body::Body,
12 http::{HeaderValue, StatusCode, Uri, header::CONTENT_TYPE, uri::PathAndQuery},
13 response::IntoResponse,
14};
15use axum_reverse_proxy::ReverseProxy;
16use http_body_util::BodyExt;
17use hyper_rustls::HttpsConnector;
18use hyper_util::client::legacy::connect::{Connect, HttpConnector};
19use ogcapi_types::common::{
20 Link,
21 media_type::{GEO_JSON, JSON},
22};
23use serde_json::{Map, Value, json};
24use tower::Service;
25
26use crate::{extractors::RemoteUrl, proxied_linked::ProxiedLinked};
27
28#[derive(Clone)]
77pub struct CollectionProxy<
78 C: Connect + Clone + Send + Sync + 'static = HttpsConnector<HttpConnector>,
79> {
80 collection_id: String,
81 proxy: ReverseProxy<C>,
82}
83
84impl<C: Connect + Clone + Send + Sync + 'static> Debug for CollectionProxy<C> {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 f.debug_struct("CollectionProxy")
87 .field("collection_id", &self.collection_id)
88 .field("remote_collection_url", &self.proxy.target())
89 .finish()
90 }
91}
92
93impl CollectionProxy<HttpsConnector<HttpConnector>> {
94 pub fn new(path: String, remote_collection_url: String) -> Self {
95 Self {
96 collection_id: path.clone(),
100 proxy: ReverseProxy::new(
101 format!("/{}", path.trim_start_matches("/")),
102 remote_collection_url,
103 ),
104 }
105 }
106}
107
108impl<C: Connect + Clone + Send + Sync + 'static> CollectionProxy<C> {
109 pub fn new_with_client(
110 path: String,
111 remote_collection_url: String,
112 client: hyper_util::client::legacy::Client<C, Body>,
113 ) -> Self {
114 Self {
115 collection_id: path.clone(),
116 proxy: ReverseProxy::new_with_client(
117 format!("/{}", path.trim_start_matches("/")),
118 remote_collection_url,
119 client,
120 ),
121 }
122 }
123
124 pub async fn handle_request(
125 &self,
126 mut req: axum::http::Request<Body>,
127 ) -> Result<axum::http::Response<axum::body::Body>, Infallible> {
128 let proxy_uri = req.extract_parts::<RemoteUrl>().await.unwrap().0;
129 let mut parts = proxy_uri.into_parts();
130 parts.path_and_query = parts
131 .path_and_query
132 .map(|path_and_query| PathAndQuery::from_str(path_and_query.path()).unwrap());
133 let request_uri_without_query: Uri = parts.try_into().unwrap();
134
135 rewrite_req_to_accept_json(&mut req);
137
138 let response = self.proxy.proxy_request(req).await.unwrap();
140
141 if response.status().is_success()
142 && response
144 .headers()
145 .get(CONTENT_TYPE)
146 .is_some_and(|ct| ct == JSON || ct == GEO_JSON)
147 {
148 let body = response.into_body();
149 let mut value: Value =
151 serde_json::from_slice(body.collect().await.unwrap().to_bytes().as_ref()).unwrap();
152
153 let object: &mut Map<String, Value> = value.as_object_mut().unwrap();
154 if let Some((key, links_value)) = object.remove_entry("links") {
155 let mut links: Vec<Link> = serde_json::from_value(links_value).unwrap();
156 links.rewrite_links(
157 self.target(),
159 &request_uri_without_query.to_string(),
160 );
162 object.insert(key, json!(links));
163 }
164 Ok((StatusCode::OK, Json(value)).into_response())
165 } else {
166 Ok(response)
167 }
168 }
169
170 pub fn target(&self) -> &str {
172 self.proxy.target()
173 }
174
175 pub fn path(&self) -> &str {
177 self.proxy.path()
178 }
179
180 pub fn collection_id(&self) -> &str {
182 self.collection_id.trim_start_matches('/')
183 }
184}
185
186fn rewrite_req_to_accept_json(req: &mut axum::http::Request<Body>) {
187 req.headers_mut().insert(
188 "accept",
189 HeaderValue::from_str("application/json, application/geo+json").unwrap(),
190 );
191}
192
193impl<C> Service<axum::http::Request<Body>> for CollectionProxy<C>
194where
195 C: Connect + Clone + Send + Sync + 'static,
196{
197 type Response = axum::http::Response<axum::body::Body>;
198 type Error = Infallible;
199 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
200
201 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
202 Poll::Ready(Ok(()))
203 }
204
205 fn call(&mut self, req: axum::http::Request<Body>) -> Self::Future {
206 let this = self.clone();
207 Box::pin(async move { this.handle_request(req).await })
208 }
209}