ogcapi_proxy/
collections_proxy.rs1use std::{
2 collections::HashMap,
3 convert::Infallible,
4 pin::Pin,
5 sync::Arc,
6 task::{Context, Poll},
7};
8
9use axum::{
10 Json, RequestExt,
11 body::Body,
12 http::{Request, Response, StatusCode, Uri, uri::PathAndQuery},
13 response::IntoResponse,
14};
15use http_body_util::BodyExt;
16use hyper_rustls::HttpsConnector;
17use hyper_util::client::legacy::connect::{Connect, HttpConnector};
18use ogcapi_types::common::{Collection, Collections};
19use tokio::sync::RwLock;
20use tower::Service;
21
22use crate::{CollectionProxy, extractors::RemoteUrl, proxied_linked::ProxiedLinked};
23
24#[derive(Debug, Clone)]
29pub struct CollectionsProxy<
30 C: Connect + Clone + Send + Sync + 'static = HttpsConnector<HttpConnector>,
31> {
32 collections: Arc<RwLock<HashMap<String, CollectionProxy<C>>>>,
33}
34
35impl<T: IntoIterator<Item = (String, String)>> From<T>
36 for CollectionsProxy<HttpsConnector<HttpConnector>>
37{
38 fn from(value: T) -> Self {
39 Self {
40 collections: Arc::new(RwLock::new(HashMap::from_iter(value.into_iter().map(
41 |(proxy_path, target_url)| {
42 (
43 proxy_path.clone(),
44 CollectionProxy::new(proxy_path, target_url),
45 )
46 },
47 )))),
48 }
49 }
50}
51
52impl<C: Connect + Clone + Send + Sync + 'static> CollectionsProxy<C> {
53
54 pub fn new(collections: Arc<RwLock<HashMap<String, CollectionProxy<C>>>>) -> Self {
56 Self { collections }
57 }
58
59 pub fn empty() -> Self {
61 Self {
62 collections: Arc::new(RwLock::new(HashMap::new())),
63 }
64 }
65
66 pub async fn handle_request(
67 &mut self,
68 mut req: Request<Body>,
69 ) -> Result<Response<Body>, Infallible> {
70 let collection_id = req.uri().path().split('/').nth(1);
72
73 if let Some(collection_id) = collection_id {
74 if collection_id.is_empty() {
75 let proxy_uri = req.extract_parts::<RemoteUrl>().await.unwrap().0;
76 Ok(Json(self.get_collections(proxy_uri).await).into_response())
77 } else {
78 let collections = self.collections.read().await;
79 let collection = collections.get(collection_id);
80 if let Some(collection) = collection {
81 collection.handle_request(req).await
82 } else {
83 Ok((StatusCode::NOT_FOUND).into_response())
84 }
85 }
86 } else {
87 Ok((StatusCode::NOT_FOUND).into_response())
88 }
89 }
90
91 pub async fn insert_collection(
95 &mut self,
96 collection_proxy: CollectionProxy<C>,
97 ) -> Option<CollectionProxy<C>> {
98 self.collections.write().await.insert(
99 collection_proxy.collection_id().to_owned(),
100 collection_proxy,
101 )
102 }
103
104 pub async fn contains(&self, collection_id: &str) -> bool {
106 self.collections.read().await.contains_key(collection_id)
107 }
108
109 async fn get_collections(&mut self, proxy_uri: Uri) -> Collections {
110 let collections_map = self.collections.read().await;
120
121 let mut collections: Vec<Collection> = Vec::with_capacity(collections_map.len());
122
123 for (_, collection_proxy) in collections_map.iter() {
124 let collection_uri = format!("{proxy_uri}/{}", collection_proxy.collection_id());
125
126 let mut uri_parts = proxy_uri.clone().into_parts();
128 uri_parts.path_and_query = Some(PathAndQuery::from_static("/"));
129 let hack_uri: Uri = uri_parts.try_into().unwrap();
130 let req = axum::http::request::Builder::new()
131 .method("GET")
132 .uri(hack_uri)
133 .body(Body::empty())
134 .unwrap();
135 let res = collection_proxy.handle_request(req).await.unwrap();
136 if res.status().is_success() {
137 let body = res.into_body();
138 let mut collection: Collection =
139 serde_json::from_slice(body.collect().await.unwrap().to_bytes().as_ref())
140 .unwrap();
141 collection.rewrite_links(collection_proxy.target(), &collection_uri);
142 collections.push(collection);
143 }
144 }
145
146 let number_returned = collections.len();
147 Collections {
148 collections,
149 links: vec![],
151 time_stamp: None,
152 number_matched: Some(collections_map.len() as u64),
153 number_returned: Some(number_returned as u64),
154 crs: vec![],
155 }
156 }
157}
158
159impl<C: Connect + Clone + Send + Sync + 'static> Service<Request<axum::body::Body>>
160 for CollectionsProxy<C>
161{
162 type Response = axum::http::Response<axum::body::Body>;
163 type Error = Infallible;
164 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
165
166 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
167 Poll::Ready(Ok(()))
168 }
169
170 fn call(&mut self, req: axum::http::Request<Body>) -> Self::Future {
171 let mut this = self.clone();
173 Box::pin(async move { this.handle_request(req).await })
174 }
175}