// ogcapi-proxy 0.2.0
//
// OGC API proxy service
// Documentation
use std::{
    collections::HashMap,
    convert::Infallible,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use axum::{
    Json, RequestExt,
    body::Body,
    http::{Request, Response, StatusCode, Uri, uri::PathAndQuery},
    response::IntoResponse,
};
use http_body_util::BodyExt;
use hyper_rustls::HttpsConnector;
use hyper_util::client::legacy::connect::{Connect, HttpConnector};
use ogcapi_types::common::{Collection, Collections};
use tokio::sync::RwLock;
use tower::Service;

use crate::{CollectionProxy, extractors::RemoteUrl, proxied_linked::ProxiedLinked};

/// A set of proxied OGC API collections, addressed by their (proxy) `collection_id`.
///
/// Requests can be answered either for one individual collection or for the
/// whole set as a [Collections] listing.
///
/// Clones are handles onto the same underlying map, since it is kept behind an [Arc].
#[derive(Debug, Clone)]
pub struct CollectionsProxy<C = HttpsConnector<HttpConnector>>
where
    C: Connect + Clone + Send + Sync + 'static,
{
    /// Registered proxies, keyed by `collection_id`.
    collections: Arc<RwLock<HashMap<String, CollectionProxy<C>>>>,
}

/// Build a [CollectionsProxy] from `(proxy_path, target_url)` pairs.
///
/// Every pair becomes one [CollectionProxy], stored under its `proxy_path`.
/// If the same `proxy_path` occurs more than once, the last pair wins.
impl<T> From<T> for CollectionsProxy<HttpsConnector<HttpConnector>>
where
    T: IntoIterator<Item = (String, String)>,
{
    fn from(value: T) -> Self {
        let proxies: HashMap<_, _> = value
            .into_iter()
            .map(|(path, target)| {
                // The path doubles as the map key, so keep a copy before it is moved.
                let key = path.clone();
                (key, CollectionProxy::new(path, target))
            })
            .collect();

        Self {
            collections: Arc::new(RwLock::new(proxies)),
        }
    }
}

impl<C: Connect + Clone + Send + Sync + 'static> CollectionsProxy<C> {

    /// Create a new CollectionsProxy from a Map of `collection_id` and [CollectionProxy].
    pub fn new(collections: Arc<RwLock<HashMap<String, CollectionProxy<C>>>>) -> Self {
        Self { collections }
    }

    /// Create a new empty CollectionsProxy.
    pub fn empty() -> Self {
        Self {
            collections: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    pub async fn handle_request(
        &mut self,
        mut req: Request<Body>,
    ) -> Result<Response<Body>, Infallible> {
        // TODO could this be made more flexible?
        let collection_id = req.uri().path().split('/').nth(1);

        if let Some(collection_id) = collection_id {
            if collection_id.is_empty() {
                let proxy_uri = req.extract_parts::<RemoteUrl>().await.unwrap().0;
                Ok(Json(self.get_collections(proxy_uri).await).into_response())
            } else {
                let collections = self.collections.read().await;
                let collection = collections.get(collection_id);
                if let Some(collection) = collection {
                    collection.handle_request(req).await
                } else {
                    Ok((StatusCode::NOT_FOUND).into_response())
                }
            }
        } else {
            Ok((StatusCode::NOT_FOUND).into_response())
        }
    }

    /// Insert a proxied collection in this CollectionsProxy
    /// If the `collection_id` of the collection_proxy already exists it is
    /// replaced with the new proxy and the old proxy is returned.
    pub async fn insert_collection(
        &mut self,
        collection_proxy: CollectionProxy<C>,
    ) -> Option<CollectionProxy<C>> {
        self.collections.write().await.insert(
            collection_proxy.collection_id().to_owned(),
            collection_proxy,
        )
    }

    /// Check if this CollectionsProxy contains a proxy for the given collection_id
    pub async fn contains(&self, collection_id: &str) -> bool {
        self.collections.read().await.contains_key(collection_id)
    }

    async fn get_collections(&mut self, proxy_uri: Uri) -> Collections {
        // let collections: JoinSet<_> = state.clone().collection_proxies.iter_mut().map(|(id, collection_proxy)| async move {
        //     let req = axum::http::request::Builder::new()
        //         .method("GET")
        //         .uri(format!("/collections/{id}"))
        //         .body(Body::empty()).unwrap();
        //     collection_proxy.call(req).await
        // }).collect()
        // collections.join_all().await;

        let collections_map = self.collections.read().await;

        let mut collections: Vec<Collection> = Vec::with_capacity(collections_map.len());

        for (_, collection_proxy) in collections_map.iter() {
            let collection_uri = format!("{proxy_uri}/{}", collection_proxy.collection_id());

            // FIXME
            let mut uri_parts = proxy_uri.clone().into_parts();
            uri_parts.path_and_query = Some(PathAndQuery::from_static("/"));
            let hack_uri: Uri = uri_parts.try_into().unwrap();
            let req = axum::http::request::Builder::new()
                .method("GET")
                .uri(hack_uri)
                .body(Body::empty())
                .unwrap();
            let res = collection_proxy.handle_request(req).await.unwrap();
            if res.status().is_success() {
                let body = res.into_body();
                let mut collection: Collection =
                    serde_json::from_slice(body.collect().await.unwrap().to_bytes().as_ref())
                        .unwrap();
                collection.rewrite_links(collection_proxy.target(), &collection_uri);
                collections.push(collection);
            }
        }

        let number_returned = collections.len();
        Collections {
            collections,
            // TODO
            links: vec![],
            time_stamp: None,
            number_matched: Some(collections_map.len() as u64),
            number_returned: Some(number_returned as u64),
            crs: vec![],
        }
    }
}

impl<C: Connect + Clone + Send + Sync + 'static> Service<Request<axum::body::Body>>
    for CollectionsProxy<C>
{
    type Response = axum::http::Response<axum::body::Body>;
    type Error = Infallible;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: axum::http::Request<Body>) -> Self::Future {
        // FIXME this clone is incorrect, look at tower service docs
        let mut this = self.clone();
        Box::pin(async move { this.handle_request(req).await })
    }
}