1use futures::StreamExt;
2use futures::stream::FuturesOrdered;
3use tokio::select;
4use tracing::debug;
5use tracing::instrument;
6
7use htsget_config::types::{JsonResponse, Request, Response};
8use htsget_search::HtsGet;
9
10use crate::HtsGetError::InvalidInput;
11use crate::{
12 Endpoint, HtsGetError, PostRequest, Result, convert_to_query, match_format_from_query,
13 merge_responses,
14};
15
16#[instrument(level = "debug", skip_all, ret)]
20pub async fn get(
21 searcher: impl HtsGet + Send + Sync + 'static,
22 request: Request,
23 endpoint: Endpoint,
24) -> Result<JsonResponse> {
25 let format = match_format_from_query(&endpoint, request.query())?;
26 let query = convert_to_query(request, format)?;
27
28 debug!(endpoint = ?endpoint, query = ?query, "getting GET response");
29
30 searcher
31 .search(query)
32 .await
33 .map_err(Into::into)
34 .map(JsonResponse::from)
35}
36
37#[instrument(level = "debug", skip_all, ret)]
40pub async fn post(
41 searcher: impl HtsGet + Clone + Send + Sync + 'static,
42 body: PostRequest,
43 request: Request,
44 endpoint: Endpoint,
45) -> Result<JsonResponse> {
46 if !request.query().is_empty() {
47 return Err(InvalidInput(
48 "query parameters should be empty for a POST request".to_string(),
49 ));
50 }
51
52 let queries = body.get_queries(request, &endpoint)?;
53
54 debug!(endpoint = ?endpoint, queries = ?queries, "getting POST response");
55
56 let mut futures = FuturesOrdered::new();
57 for query in queries {
58 let owned_searcher = searcher.clone();
59 futures.push_back(tokio::spawn(
60 async move { owned_searcher.search(query).await },
61 ));
62 }
63 let mut responses: Vec<Response> = Vec::new();
64 loop {
65 select! {
66 Some(next) = futures.next() => responses.push(next.map_err(|err| HtsGetError::InternalError(err.to_string()))?.map_err(HtsGetError::from)?),
67 else => break
68 }
69 }
70
71 Ok(JsonResponse::from(
72 merge_responses(responses).expect("expected at least one response"),
73 ))
74}