use std::{
collections::HashMap,
convert::Infallible,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use axum::{
Json, RequestExt,
body::Body,
http::{Request, Response, StatusCode, Uri, uri::PathAndQuery},
response::IntoResponse,
};
use http_body_util::BodyExt;
use hyper_rustls::HttpsConnector;
use hyper_util::client::legacy::connect::{Connect, HttpConnector};
use ogcapi_types::common::{Collection, Collections};
use tokio::sync::RwLock;
use tower::Service;
use crate::{CollectionProxy, extractors::RemoteUrl, proxied_linked::ProxiedLinked};
/// A set of [`CollectionProxy`] instances stored in a shared map behind an
/// async read/write lock.
///
/// The map lives in an `Arc`, so cloning this value yields another handle to
/// the same underlying map. The connector type `C` defaults to
/// `HttpsConnector<HttpConnector>`.
#[derive(Debug, Clone)]
pub struct CollectionsProxy<C = HttpsConnector<HttpConnector>>
where
    C: Connect + Clone + Send + Sync + 'static,
{
    /// Registered collection proxies, keyed by a `String` identifier.
    collections: Arc<RwLock<HashMap<String, CollectionProxy<C>>>>,
}
/// Builds a proxy set from `(proxy_path, target_url)` pairs.
///
/// Each pair becomes one [`CollectionProxy`], stored under its `proxy_path`.
/// If the same `proxy_path` appears more than once, the last pair wins.
impl<T: IntoIterator<Item = (String, String)>> From<T>
    for CollectionsProxy<HttpsConnector<HttpConnector>>
{
    fn from(value: T) -> Self {
        let proxies: HashMap<_, _> = value
            .into_iter()
            .map(|(proxy_path, target_url)| {
                // The path is both the map key and a constructor argument,
                // so one copy is needed for the key.
                let key = proxy_path.clone();
                (key, CollectionProxy::new(proxy_path, target_url))
            })
            .collect();

        Self {
            collections: Arc::new(RwLock::new(proxies)),
        }
    }
}
impl<C: Connect + Clone + Send + Sync + 'static> CollectionsProxy<C> {
pub fn new(collections: Arc<RwLock<HashMap<String, CollectionProxy<C>>>>) -> Self {
Self { collections }
}
pub fn empty() -> Self {
Self {
collections: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn handle_request(
&mut self,
mut req: Request<Body>,
) -> Result<Response<Body>, Infallible> {
let collection_id = req.uri().path().split('/').nth(1);
if let Some(collection_id) = collection_id {
if collection_id.is_empty() {
let proxy_uri = req.extract_parts::<RemoteUrl>().await.unwrap().0;
Ok(Json(self.get_collections(proxy_uri).await).into_response())
} else {
let collections = self.collections.read().await;
let collection = collections.get(collection_id);
if let Some(collection) = collection {
collection.handle_request(req).await
} else {
Ok((StatusCode::NOT_FOUND).into_response())
}
}
} else {
Ok((StatusCode::NOT_FOUND).into_response())
}
}
pub async fn insert_collection(
&mut self,
collection_proxy: CollectionProxy<C>,
) -> Option<CollectionProxy<C>> {
self.collections.write().await.insert(
collection_proxy.collection_id().to_owned(),
collection_proxy,
)
}
pub async fn contains(&self, collection_id: &str) -> bool {
self.collections.read().await.contains_key(collection_id)
}
async fn get_collections(&mut self, proxy_uri: Uri) -> Collections {
let collections_map = self.collections.read().await;
let mut collections: Vec<Collection> = Vec::with_capacity(collections_map.len());
for (_, collection_proxy) in collections_map.iter() {
let collection_uri = format!("{proxy_uri}/{}", collection_proxy.collection_id());
let mut uri_parts = proxy_uri.clone().into_parts();
uri_parts.path_and_query = Some(PathAndQuery::from_static("/"));
let hack_uri: Uri = uri_parts.try_into().unwrap();
let req = axum::http::request::Builder::new()
.method("GET")
.uri(hack_uri)
.body(Body::empty())
.unwrap();
let res = collection_proxy.handle_request(req).await.unwrap();
if res.status().is_success() {
let body = res.into_body();
let mut collection: Collection =
serde_json::from_slice(body.collect().await.unwrap().to_bytes().as_ref())
.unwrap();
collection.rewrite_links(collection_proxy.target(), &collection_uri);
collections.push(collection);
}
}
let number_returned = collections.len();
Collections {
collections,
links: vec![],
time_stamp: None,
number_matched: Some(collections_map.len() as u64),
number_returned: Some(number_returned as u64),
crs: vec![],
}
}
}
impl<C: Connect + Clone + Send + Sync + 'static> Service<Request<axum::body::Body>>
for CollectionsProxy<C>
{
type Response = axum::http::Response<axum::body::Body>;
type Error = Infallible;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: axum::http::Request<Body>) -> Self::Future {
let mut this = self.clone();
Box::pin(async move { this.handle_request(req).await })
}
}