gr/remote/
query.rs

1use std::borrow::Borrow;
2use std::iter::Iterator;
3use std::sync::Arc;
4
5use serde::Serialize;
6
7use crate::api_traits::Timestamp;
8use crate::backoff::{Backoff, Exponential};
9use crate::display::DisplayBody;
10use crate::http::throttle::{self, ThrottleStrategy};
11use crate::time;
12use crate::{
13    api_defaults,
14    api_traits::{ApiOperation, NumberDeltaErr},
15    display, error,
16    http::{self, Body, Headers, Paginator, Request, Resource},
17    io::{HttpResponse, HttpRunner},
18    json_load_page, json_loads,
19    remote::ListBodyArgs,
20    time::sort_filter_by_date,
21    Result,
22};
23
24fn get_remote_resource_headers<R: HttpRunner<Response = HttpResponse>>(
25    runner: &Arc<R>,
26    url: &str,
27    request_headers: Headers,
28    api_operation: ApiOperation,
29) -> Result<HttpResponse> {
30    send_request::<_, String>(
31        runner,
32        url,
33        None,
34        request_headers,
35        http::Method::HEAD,
36        api_operation,
37    )
38}
39
40pub fn num_pages<R: HttpRunner<Response = HttpResponse>>(
41    runner: &Arc<R>,
42    url: &str,
43    request_headers: Headers,
44    api_operation: ApiOperation,
45) -> Result<Option<u32>> {
46    let response = get_remote_resource_headers(runner, url, request_headers, api_operation)?;
47    match response.get_page_headers().borrow() {
48        Some(page_header) => {
49            if let Some(last_page) = page_header.last_page() {
50                return Ok(Some(last_page.number));
51            }
52            Ok(None)
53        }
54        // Github does not return page headers when there is only one page, so
55        // we assume 1 page in this case.
56        None => Ok(Some(1)),
57    }
58}
59
60pub fn num_resources<R: HttpRunner<Response = HttpResponse>>(
61    runner: &Arc<R>,
62    url: &str,
63    request_headers: Headers,
64    api_operation: ApiOperation,
65) -> Result<Option<NumberDeltaErr>> {
66    let response = get_remote_resource_headers(runner, url, request_headers, api_operation)?;
67    match response.get_page_headers().borrow() {
68        Some(page_header) => {
69            // total resources per_page * total_pages
70            if let Some(last_page) = page_header.last_page() {
71                let count = last_page.number * page_header.per_page;
72                return Ok(Some(NumberDeltaErr {
73                    num: count,
74                    delta: page_header.per_page,
75                }));
76            }
77            Ok(None)
78        }
79        None => {
80            // Github does not return page headers when there is only one page, so
81            // we assume 1 page in this case.
82            Ok(Some(NumberDeltaErr {
83                num: 1,
84                delta: api_defaults::DEFAULT_PER_PAGE,
85            }))
86        }
87    }
88}
89
90pub fn query_error(url: &str, response: &HttpResponse) -> error::GRError {
91    error::GRError::RemoteServerError(format!(
92        "Failed to submit request to URL: {} with status code: {} and body: {}",
93        url, response.status, response.body
94    ))
95}
96
97pub fn send<R: HttpRunner<Response = HttpResponse>, D: Serialize, T>(
98    runner: &Arc<R>,
99    url: &str,
100    body: Option<&Body<D>>,
101    request_headers: Headers,
102    operation: ApiOperation,
103    mapper: impl Fn(&serde_json::Value) -> T,
104    method: http::Method,
105) -> Result<T> {
106    let response = send_request(runner, url, body, request_headers, method, operation)?;
107    let body = json_loads(&response.body)?;
108    Ok(mapper(&body))
109}
110
111pub fn send_json<R: HttpRunner<Response = HttpResponse>, D: Serialize>(
112    runner: &Arc<R>,
113    url: &str,
114    body: Option<&Body<D>>,
115    request_headers: Headers,
116    operation: ApiOperation,
117    method: http::Method,
118) -> Result<serde_json::Value> {
119    let response = send_request(runner, url, body, request_headers, method, operation)?;
120    json_loads(&response.body)
121}
122
123pub fn send_raw<R: HttpRunner<Response = HttpResponse>, D: Serialize>(
124    runner: &Arc<R>,
125    url: &str,
126    body: Option<&Body<D>>,
127    request_headers: Headers,
128    operation: ApiOperation,
129    method: http::Method,
130) -> Result<HttpResponse> {
131    send_request(runner, url, body, request_headers, method, operation)
132}
133
134pub fn get<R: HttpRunner<Response = HttpResponse>, D: Serialize, T>(
135    runner: &Arc<R>,
136    url: &str,
137    body: Option<&Body<D>>,
138    request_headers: Headers,
139    operation: ApiOperation,
140    mapper: impl Fn(&serde_json::Value) -> T,
141) -> Result<T> {
142    let response = send_request(
143        runner,
144        url,
145        body,
146        request_headers,
147        http::Method::GET,
148        operation,
149    )?;
150    let body = json_loads(&response.body)?;
151    Ok(mapper(&body))
152}
153
154pub fn get_json<R: HttpRunner<Response = HttpResponse>, D: Serialize>(
155    runner: &Arc<R>,
156    url: &str,
157    body: Option<&Body<D>>,
158    request_headers: Headers,
159    operation: ApiOperation,
160) -> Result<serde_json::Value> {
161    let response = send_request(
162        runner,
163        url,
164        body,
165        request_headers,
166        http::Method::GET,
167        operation,
168    )?;
169    json_loads(&response.body)
170}
171
172pub fn get_raw<R: HttpRunner<Response = HttpResponse>, D: Serialize>(
173    runner: &Arc<R>,
174    url: &str,
175    body: Option<&Body<D>>,
176    request_headers: Headers,
177    operation: ApiOperation,
178) -> Result<HttpResponse> {
179    send_request(
180        runner,
181        url,
182        body,
183        request_headers,
184        http::Method::GET,
185        operation,
186    )
187}
188
189fn send_request<R: HttpRunner<Response = HttpResponse>, T: Serialize>(
190    runner: &Arc<R>,
191    url: &str,
192    body: Option<&Body<T>>,
193    request_headers: Headers,
194    method: http::Method,
195    operation: ApiOperation,
196) -> Result<HttpResponse> {
197    let mut request = if let Some(body) = body {
198        http::Request::builder()
199            .method(method.clone())
200            .resource(Resource::new(url, Some(operation)))
201            .body(body)
202            .headers(request_headers)
203            .build()
204            .unwrap()
205    } else {
206        http::Request::builder()
207            .method(method.clone())
208            .resource(Resource::new(url, Some(operation)))
209            .headers(request_headers)
210            .build()
211            .unwrap()
212    };
213    let response = runner.run(&mut request)?;
214    // TODO: Might not be the right place as some APIs might still need to check
215    // the response status code. See github merge request request reviewers when
216    // a 422 is considered an error.
217    if !response.is_ok(&method) {
218        return Err(query_error(url, &response).into());
219    }
220    Ok(response)
221}
222
223pub fn paged<R, T>(
224    runner: &Arc<R>,
225    url: &str,
226    list_args: Option<ListBodyArgs>,
227    request_headers: Headers,
228    iter_over_sub_array: Option<&str>,
229    operation: ApiOperation,
230    mapper: impl Fn(&serde_json::Value) -> T,
231) -> Result<Vec<T>>
232where
233    R: HttpRunner<Response = HttpResponse>,
234    T: Clone + Timestamp + Into<DisplayBody>,
235{
236    let request = build_list_request(url, &list_args, request_headers, operation);
237    let mut throttle_time = None;
238    let mut throttle_range = None;
239    let mut backoff_max_retries = 0;
240    let mut backoff_wait_time = 60;
241    if let Some(list_args) = &list_args {
242        throttle_time = list_args.throttle_time;
243        throttle_range = list_args.throttle_range;
244        backoff_max_retries = list_args.get_args.backoff_max_retries;
245        backoff_wait_time = list_args.get_args.backoff_retry_after;
246    }
247    let throttle_strategy: Box<dyn ThrottleStrategy> = match throttle_time {
248        Some(throttle_time) => Box::new(throttle::PreFixed::new(throttle_time)),
249        None => match throttle_range {
250            Some((min, max)) => Box::new(throttle::Random::new(min, max)),
251            None => Box::new(throttle::AutoRate::default()),
252        },
253    };
254    let backoff = Backoff::new(
255        runner,
256        backoff_max_retries,
257        backoff_wait_time,
258        time::now_epoch_seconds,
259        Box::new(Exponential),
260        Box::new(throttle::DynamicFixed),
261    );
262    let paginator = Paginator::new(runner, request, url, backoff, throttle_strategy);
263    let all_data = paginator
264        .map(|response| {
265            let response = response?;
266            if !response.is_ok(&http::Method::GET) {
267                return Err(query_error(url, &response).into());
268            }
269            if iter_over_sub_array.is_some() {
270                let body = json_loads(&response.body)?;
271                let paged_data = body[iter_over_sub_array.unwrap()]
272                    .as_array()
273                    .ok_or_else(|| {
274                        error::GRError::RemoteUnexpectedResponseContract(format!(
275                            "Expected an array of {} but got: {}",
276                            iter_over_sub_array.unwrap(),
277                            response.body
278                        ))
279                    })?
280                    .iter()
281                    .fold(Vec::new(), |mut paged_data, data| {
282                        paged_data.push(mapper(data));
283                        paged_data
284                    });
285                if let Some(list_args) = &list_args {
286                    if list_args.flush {
287                        display::print(
288                            &mut std::io::stdout(),
289                            paged_data,
290                            list_args.get_args.clone(),
291                        )
292                        .unwrap();
293                        return Ok(Vec::new());
294                    }
295                }
296                return Ok(paged_data);
297            }
298            let paged_data =
299                json_load_page(&response.body)?
300                    .iter()
301                    .fold(Vec::new(), |mut paged_data, data| {
302                        paged_data.push(mapper(data));
303                        paged_data
304                    });
305            if let Some(list_args) = &list_args {
306                if list_args.flush {
307                    display::print(
308                        &mut std::io::stdout(),
309                        paged_data,
310                        list_args.get_args.clone(),
311                    )
312                    .unwrap();
313                    return Ok(Vec::new());
314                }
315            }
316            Ok(paged_data)
317        })
318        .collect::<Result<Vec<Vec<T>>>>()
319        .map(|paged_data| paged_data.into_iter().flatten().collect());
320    match all_data {
321        Ok(paged_data) => Ok(sort_filter_by_date(paged_data, list_args)?),
322        Err(err) => Err(err),
323    }
324}
325
326fn build_list_request<'a>(
327    url: &str,
328    list_args: &Option<ListBodyArgs>,
329    request_headers: Headers,
330    operation: ApiOperation,
331) -> Request<'a, ()> {
332    let mut request: http::Request<()> =
333        http::Request::new(url, http::Method::GET).with_api_operation(operation);
334    request.set_headers(request_headers);
335    if let Some(list_args) = list_args {
336        if let Some(from_page) = list_args.page {
337            let url = if url.contains('?') {
338                format!("{}&page={}", url, &from_page)
339            } else {
340                format!("{}?page={}", url, &from_page)
341            };
342            request.set_max_pages(list_args.max_pages.unwrap());
343            request.set_url(&url);
344        }
345    }
346    request
347}
348
#[cfg(test)]
mod test {
    use std::rc::Rc;

    use crate::{
        io::{FlowControlHeaders, Page, PageHeader},
        test::utils::MockRunner,
    };

    use super::*;

    /// A successful response without pagination headers counts as one page.
    #[test]
    fn test_numpages_assume_one_if_pages_not_available() {
        let ok_response = HttpResponse::builder().status(200).build().unwrap();
        let runner = Arc::new(MockRunner::new(vec![ok_response]));
        let pages = num_pages(
            &runner,
            "https://github.com/api/v4/projects/1/pipelines",
            Headers::new(),
            ApiOperation::Pipeline,
        )
        .unwrap();
        assert_eq!(Some(1), pages);
    }

    /// A 404 response is reported as an error.
    #[test]
    fn test_numpages_error_on_404() {
        let not_found = HttpResponse::builder().status(404).build().unwrap();
        let runner = Arc::new(MockRunner::new(vec![not_found]));
        let outcome = num_pages(
            &runner,
            "https://github.com/api/v4/projects/1/pipelines",
            Headers::new(),
            ApiOperation::Pipeline,
        );
        assert!(outcome.is_err());
    }

    /// Without pagination headers the delta is the default page size.
    #[test]
    fn test_num_resources_assume_one_if_pages_not_available() {
        let ok_response = HttpResponse::builder().status(200).build().unwrap();
        let runner = Arc::new(MockRunner::new(vec![ok_response]));
        let estimate = num_resources(
            &runner,
            "https://github.com/api/v4/projects/1/pipelines?page=1",
            Headers::new(),
            ApiOperation::Pipeline,
        )
        .unwrap();
        assert_eq!(30, estimate.unwrap().delta);
    }

    /// With a last page and a page size, the estimate is their product.
    #[test]
    fn test_num_resources_with_last_page_and_per_page_available() {
        // Value doesn't matter as we are injecting the header processor
        // enforcing the last page and per_page values.
        let mut response_headers = Headers::new();
        response_headers.set("link", "");

        let mut page_header = PageHeader::new();
        page_header.set_next_page(Page::new(
            "https://gitlab.com/api/v4/projects/1/pipelines?page=2",
            2,
        ));
        page_header.set_last_page(Page::new(
            "https://gitlab.com/api/v4/projects/1/pipelines?page=4",
            4,
        ));
        page_header.per_page = 20;

        let flow_control =
            FlowControlHeaders::new(Rc::new(Some(page_header)), Rc::new(None));
        let paged_response = HttpResponse::builder()
            .status(200)
            .headers(response_headers)
            .flow_control_headers(flow_control)
            .build()
            .unwrap();
        let runner = Arc::new(MockRunner::new(vec![paged_response]));

        let estimate = num_resources(
            &runner,
            "https://gitlab.com/api/v4/projects/1/pipelines?page=1",
            Headers::new(),
            ApiOperation::Pipeline,
        )
        .unwrap()
        .unwrap();
        assert_eq!(80, estimate.num);
        assert_eq!(20, estimate.delta);
    }

    /// A 404 response is reported as an error.
    #[test]
    fn test_numresources_error_on_404() {
        let not_found = HttpResponse::builder().status(404).build().unwrap();
        let runner = Arc::new(MockRunner::new(vec![not_found]));
        let outcome = num_resources(
            &runner,
            "https://github.com/api/v4/projects/1/pipelines",
            Headers::new(),
            ApiOperation::Pipeline,
        );
        assert!(outcome.is_err());
    }
}