ogcapi_proxy/
collections_proxy.rs

1use std::{
2    collections::HashMap,
3    convert::Infallible,
4    pin::Pin,
5    sync::Arc,
6    task::{Context, Poll},
7};
8
9use axum::{
10    Json, RequestExt,
11    body::Body,
12    http::{Request, Response, StatusCode, Uri, uri::PathAndQuery},
13    response::IntoResponse,
14};
15use http_body_util::BodyExt;
16use hyper_rustls::HttpsConnector;
17use hyper_util::client::legacy::connect::{Connect, HttpConnector};
18use ogcapi_types::common::{Collection, Collections};
19use tokio::sync::RwLock;
20use tower::Service;
21
22use crate::{CollectionProxy, extractors::RemoteUrl, proxied_linked::ProxiedLinked};
23
24/// Proxy for multiple OGC API Collections
25///
26/// This can provide individual collections based on their (proxy) collection_id
27/// or all of the proxied [Collections].
28#[derive(Debug, Clone)]
29pub struct CollectionsProxy<
30    C: Connect + Clone + Send + Sync + 'static = HttpsConnector<HttpConnector>,
31> {
32    collections: Arc<RwLock<HashMap<String, CollectionProxy<C>>>>,
33}
34
35impl<T: IntoIterator<Item = (String, String)>> From<T>
36    for CollectionsProxy<HttpsConnector<HttpConnector>>
37{
38    fn from(value: T) -> Self {
39        Self {
40            collections: Arc::new(RwLock::new(HashMap::from_iter(value.into_iter().map(
41                |(proxy_path, target_url)| {
42                    (
43                        proxy_path.clone(),
44                        CollectionProxy::new(proxy_path, target_url),
45                    )
46                },
47            )))),
48        }
49    }
50}
51
52impl<C: Connect + Clone + Send + Sync + 'static> CollectionsProxy<C> {
53
54    /// Create a new CollectionsProxy from a Map of `collection_id` and [CollectionProxy].
55    pub fn new(collections: Arc<RwLock<HashMap<String, CollectionProxy<C>>>>) -> Self {
56        Self { collections }
57    }
58
59    /// Create a new empty CollectionsProxy.
60    pub fn empty() -> Self {
61        Self {
62            collections: Arc::new(RwLock::new(HashMap::new())),
63        }
64    }
65
66    pub async fn handle_request(
67        &mut self,
68        mut req: Request<Body>,
69    ) -> Result<Response<Body>, Infallible> {
70        // TODO could this be made more flexible?
71        let collection_id = req.uri().path().split('/').nth(1);
72
73        if let Some(collection_id) = collection_id {
74            if collection_id.is_empty() {
75                let proxy_uri = req.extract_parts::<RemoteUrl>().await.unwrap().0;
76                Ok(Json(self.get_collections(proxy_uri).await).into_response())
77            } else {
78                let collections = self.collections.read().await;
79                let collection = collections.get(collection_id);
80                if let Some(collection) = collection {
81                    collection.handle_request(req).await
82                } else {
83                    Ok((StatusCode::NOT_FOUND).into_response())
84                }
85            }
86        } else {
87            Ok((StatusCode::NOT_FOUND).into_response())
88        }
89    }
90
91    /// Insert a proxied collection in this CollectionsProxy
92    /// If the `collection_id` of the collection_proxy already exists it is
93    /// replaced with the new proxy and the old proxy is returned.
94    pub async fn insert_collection(
95        &mut self,
96        collection_proxy: CollectionProxy<C>,
97    ) -> Option<CollectionProxy<C>> {
98        self.collections.write().await.insert(
99            collection_proxy.collection_id().to_owned(),
100            collection_proxy,
101        )
102    }
103
104    /// Check if this CollectionsProxy contains a proxy for the given collection_id
105    pub async fn contains(&self, collection_id: &str) -> bool {
106        self.collections.read().await.contains_key(collection_id)
107    }
108
109    async fn get_collections(&mut self, proxy_uri: Uri) -> Collections {
110        // let collections: JoinSet<_> = state.clone().collection_proxies.iter_mut().map(|(id, collection_proxy)| async move {
111        //     let req = axum::http::request::Builder::new()
112        //         .method("GET")
113        //         .uri(format!("/collections/{id}"))
114        //         .body(Body::empty()).unwrap();
115        //     collection_proxy.call(req).await
116        // }).collect()
117        // collections.join_all().await;
118
119        let collections_map = self.collections.read().await;
120
121        let mut collections: Vec<Collection> = Vec::with_capacity(collections_map.len());
122
123        for (_, collection_proxy) in collections_map.iter() {
124            let collection_uri = format!("{proxy_uri}/{}", collection_proxy.collection_id());
125
126            // FIXME
127            let mut uri_parts = proxy_uri.clone().into_parts();
128            uri_parts.path_and_query = Some(PathAndQuery::from_static("/"));
129            let hack_uri: Uri = uri_parts.try_into().unwrap();
130            let req = axum::http::request::Builder::new()
131                .method("GET")
132                .uri(hack_uri)
133                .body(Body::empty())
134                .unwrap();
135            let res = collection_proxy.handle_request(req).await.unwrap();
136            if res.status().is_success() {
137                let body = res.into_body();
138                let mut collection: Collection =
139                    serde_json::from_slice(body.collect().await.unwrap().to_bytes().as_ref())
140                        .unwrap();
141                collection.rewrite_links(collection_proxy.target(), &collection_uri);
142                collections.push(collection);
143            }
144        }
145
146        let number_returned = collections.len();
147        Collections {
148            collections,
149            // TODO
150            links: vec![],
151            time_stamp: None,
152            number_matched: Some(collections_map.len() as u64),
153            number_returned: Some(number_returned as u64),
154            crs: vec![],
155        }
156    }
157}
158
159impl<C: Connect + Clone + Send + Sync + 'static> Service<Request<axum::body::Body>>
160    for CollectionsProxy<C>
161{
162    type Response = axum::http::Response<axum::body::Body>;
163    type Error = Infallible;
164    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
165
166    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
167        Poll::Ready(Ok(()))
168    }
169
170    fn call(&mut self, req: axum::http::Request<Body>) -> Self::Future {
171        // FIXME this clone is incorrect, look at tower service docs
172        let mut this = self.clone();
173        Box::pin(async move { this.handle_request(req).await })
174    }
175}