gr/remote/
query.rs

1use std::borrow::Borrow;
2use std::iter::Iterator;
3use std::sync::Arc;
4
5use serde::Serialize;
6
7use crate::api_traits::Timestamp;
8use crate::backoff::{Backoff, Exponential};
9use crate::display::DisplayBody;
10use crate::http::throttle::{self, ThrottleStrategy};
11use crate::time;
12use crate::{
13    api_defaults,
14    api_traits::{ApiOperation, NumberDeltaErr},
15    display, error,
16    http::{self, Body, Headers, Paginator, Request, Resource},
17    io::{HttpResponse, HttpRunner},
18    json_load_page, json_loads,
19    remote::ListBodyArgs,
20    time::sort_filter_by_date,
21    Result,
22};
23
24fn get_remote_resource_headers<R: HttpRunner<Response = HttpResponse>>(
25    runner: &Arc<R>,
26    url: &str,
27    request_headers: Headers,
28    api_operation: ApiOperation,
29) -> Result<HttpResponse> {
30    send_request::<_, String>(
31        runner,
32        url,
33        None,
34        request_headers,
35        http::Method::HEAD,
36        api_operation,
37    )
38}
39
40pub fn num_pages<R: HttpRunner<Response = HttpResponse>>(
41    runner: &Arc<R>,
42    url: &str,
43    request_headers: Headers,
44    api_operation: ApiOperation,
45) -> Result<Option<u32>> {
46    let response = get_remote_resource_headers(runner, url, request_headers, api_operation)?;
47    match response.get_page_headers().borrow() {
48        Some(page_header) => {
49            if let Some(last_page) = page_header.last_page() {
50                return Ok(Some(last_page.number));
51            }
52            Ok(None)
53        }
54        // Github does not return page headers when there is only one page, so
55        // we assume 1 page in this case.
56        None => Ok(Some(1)),
57    }
58}
59
60pub fn num_resources<R: HttpRunner<Response = HttpResponse>>(
61    runner: &Arc<R>,
62    url: &str,
63    request_headers: Headers,
64    api_operation: ApiOperation,
65) -> Result<Option<NumberDeltaErr>> {
66    let response = get_remote_resource_headers(runner, url, request_headers, api_operation)?;
67    match response.get_page_headers().borrow() {
68        Some(page_header) => {
69            // total resources per_page * total_pages
70            if let Some(last_page) = page_header.last_page() {
71                let count = last_page.number * page_header.per_page;
72                return Ok(Some(NumberDeltaErr {
73                    num: count,
74                    delta: page_header.per_page,
75                }));
76            }
77            Ok(None)
78        }
79        None => {
80            // Github does not return page headers when there is only one page, so
81            // we assume 1 page in this case.
82            Ok(Some(NumberDeltaErr {
83                num: 1,
84                delta: api_defaults::DEFAULT_PER_PAGE,
85            }))
86        }
87    }
88}
89
90pub fn query_error(url: &str, response: &HttpResponse) -> error::GRError {
91    error::GRError::RemoteServerError(format!(
92        "Failed to submit request to URL: {} with status code: {} and body: {}",
93        url, response.status, response.body
94    ))
95}
96
97pub fn send<R: HttpRunner<Response = HttpResponse>, D: Serialize, T>(
98    runner: &Arc<R>,
99    url: &str,
100    body: Option<&Body<D>>,
101    request_headers: Headers,
102    operation: ApiOperation,
103    mapper: impl Fn(&serde_json::Value) -> T,
104    method: http::Method,
105) -> Result<T> {
106    let response = send_request(runner, url, body, request_headers, method, operation)?;
107    let body = json_loads(&response.body)?;
108    Ok(mapper(&body))
109}
110
111pub fn send_json<R: HttpRunner<Response = HttpResponse>, D: Serialize>(
112    runner: &Arc<R>,
113    url: &str,
114    body: Option<&Body<D>>,
115    request_headers: Headers,
116    operation: ApiOperation,
117    method: http::Method,
118) -> Result<serde_json::Value> {
119    let response = send_request(runner, url, body, request_headers, method, operation)?;
120    json_loads(&response.body)
121}
122
123pub fn send_raw<R: HttpRunner<Response = HttpResponse>, D: Serialize>(
124    runner: &Arc<R>,
125    url: &str,
126    body: Option<&Body<D>>,
127    request_headers: Headers,
128    operation: ApiOperation,
129    method: http::Method,
130) -> Result<HttpResponse> {
131    send_request(runner, url, body, request_headers, method, operation)
132}
133
134pub fn get<R: HttpRunner<Response = HttpResponse>, D: Serialize, T>(
135    runner: &Arc<R>,
136    url: &str,
137    body: Option<&Body<D>>,
138    request_headers: Headers,
139    operation: ApiOperation,
140    mapper: impl Fn(&serde_json::Value) -> T,
141) -> Result<T> {
142    let response = send_request(
143        runner,
144        url,
145        body,
146        request_headers,
147        http::Method::GET,
148        operation,
149    )?;
150    let body = json_loads(&response.body)?;
151    Ok(mapper(&body))
152}
153
154pub fn get_json<R: HttpRunner<Response = HttpResponse>, D: Serialize>(
155    runner: &Arc<R>,
156    url: &str,
157    body: Option<&Body<D>>,
158    request_headers: Headers,
159    operation: ApiOperation,
160) -> Result<serde_json::Value> {
161    let response = send_request(
162        runner,
163        url,
164        body,
165        request_headers,
166        http::Method::GET,
167        operation,
168    )?;
169    json_loads(&response.body)
170}
171
172pub fn get_raw<R: HttpRunner<Response = HttpResponse>, D: Serialize>(
173    runner: &Arc<R>,
174    url: &str,
175    body: Option<&Body<D>>,
176    request_headers: Headers,
177    operation: ApiOperation,
178) -> Result<HttpResponse> {
179    send_request(
180        runner,
181        url,
182        body,
183        request_headers,
184        http::Method::GET,
185        operation,
186    )
187}
188
189fn send_request<R: HttpRunner<Response = HttpResponse>, T: Serialize>(
190    runner: &Arc<R>,
191    url: &str,
192    body: Option<&Body<T>>,
193    request_headers: Headers,
194    method: http::Method,
195    operation: ApiOperation,
196) -> Result<HttpResponse> {
197    let mut request = if let Some(body) = body {
198        http::Request::builder()
199            .method(method.clone())
200            .resource(Resource::new(url, Some(operation)))
201            .body(body)
202            .headers(request_headers)
203            .build()
204            .unwrap()
205    } else {
206        http::Request::builder()
207            .method(method.clone())
208            .resource(Resource::new(url, Some(operation)))
209            .headers(request_headers)
210            .build()
211            .unwrap()
212    };
213    let response = runner.run(&mut request)?;
214    // TODO: Might not be the right place as some APIs might still need to check
215    // the response status code. See github merge request request reviewers when
216    // a 422 is considered an error.
217    if !response.is_ok(&method) {
218        return Err(query_error(url, &response).into());
219    }
220    Ok(response)
221}
222
223pub fn paged<R, T>(
224    runner: &Arc<R>,
225    url: &str,
226    list_args: Option<ListBodyArgs>,
227    request_headers: Headers,
228    iter_over_sub_array: Option<&str>,
229    operation: ApiOperation,
230    mapper: impl Fn(&serde_json::Value) -> T,
231) -> Result<Vec<T>>
232where
233    R: HttpRunner<Response = HttpResponse>,
234    T: Clone + Timestamp + Into<DisplayBody>,
235{
236    let request = build_list_request(url, &list_args, request_headers, operation);
237    let mut throttle_time = None;
238    let mut throttle_range = None;
239    let mut backoff_max_retries = 0;
240    let mut backoff_wait_time = 60;
241    if let Some(list_args) = &list_args {
242        throttle_time = list_args.throttle_time;
243        throttle_range = list_args.throttle_range;
244        backoff_max_retries = list_args.get_args.backoff_max_retries;
245        backoff_wait_time = list_args.get_args.backoff_retry_after;
246    }
247    let throttle_strategy: Box<dyn ThrottleStrategy> = match throttle_time {
248        Some(throttle_time) => Box::new(throttle::PreFixed::new(throttle_time)),
249        None => match throttle_range {
250            Some((min, max)) => Box::new(throttle::Random::new(min, max)),
251            None => Box::new(throttle::AutoRate::default()),
252        },
253    };
254    let backoff = Backoff::new(
255        runner,
256        backoff_max_retries,
257        backoff_wait_time,
258        time::now_epoch_seconds,
259        Box::new(Exponential),
260        Box::new(throttle::DynamicFixed),
261    );
262    let paginator = Paginator::new(runner, request, url, backoff, throttle_strategy);
263    let all_data = paginator
264        .map(|response| {
265            let response = response?;
266            if !response.is_ok(&http::Method::GET) {
267                return Err(query_error(url, &response).into());
268            }
269            if let Some(iter_item) = iter_over_sub_array {
270                let body = json_loads(&response.body)?;
271                let paged_data = body[iter_item]
272                    .as_array()
273                    .ok_or_else(|| {
274                        error::GRError::RemoteUnexpectedResponseContract(format!(
275                            "Expected an array of {} but got: {}",
276                            iter_item, response.body
277                        ))
278                    })?
279                    .iter()
280                    .fold(Vec::new(), |mut paged_data, data| {
281                        paged_data.push(mapper(data));
282                        paged_data
283                    });
284                if let Some(list_args) = &list_args {
285                    if list_args.flush {
286                        display::print(
287                            &mut std::io::stdout(),
288                            paged_data,
289                            list_args.get_args.clone(),
290                        )
291                        .unwrap();
292                        return Ok(Vec::new());
293                    }
294                }
295                return Ok(paged_data);
296            }
297            let paged_data =
298                json_load_page(&response.body)?
299                    .iter()
300                    .fold(Vec::new(), |mut paged_data, data| {
301                        paged_data.push(mapper(data));
302                        paged_data
303                    });
304            if let Some(list_args) = &list_args {
305                if list_args.flush {
306                    display::print(
307                        &mut std::io::stdout(),
308                        paged_data,
309                        list_args.get_args.clone(),
310                    )
311                    .unwrap();
312                    return Ok(Vec::new());
313                }
314            }
315            Ok(paged_data)
316        })
317        .collect::<Result<Vec<Vec<T>>>>()
318        .map(|paged_data| paged_data.into_iter().flatten().collect());
319    match all_data {
320        Ok(paged_data) => Ok(sort_filter_by_date(paged_data, list_args)?),
321        Err(err) => Err(err),
322    }
323}
324
325fn build_list_request<'a>(
326    url: &str,
327    list_args: &Option<ListBodyArgs>,
328    request_headers: Headers,
329    operation: ApiOperation,
330) -> Request<'a, ()> {
331    let mut request: http::Request<()> =
332        http::Request::new(url, http::Method::GET).with_api_operation(operation);
333    request.set_headers(request_headers);
334    if let Some(list_args) = list_args {
335        if let Some(from_page) = list_args.page {
336            let url = if url.contains('?') {
337                format!("{}&page={}", url, &from_page)
338            } else {
339                format!("{}?page={}", url, &from_page)
340            };
341            request.set_max_pages(list_args.max_pages.unwrap());
342            request.set_url(&url);
343        }
344    }
345    request
346}
347
#[cfg(test)]
mod test {
    use std::rc::Rc;

    use crate::{
        io::{FlowControlHeaders, Page, PageHeader},
        test::utils::MockRunner,
    };

    use super::*;

    /// A successful response without page headers counts as a single page.
    #[test]
    fn test_numpages_assume_one_if_pages_not_available() {
        let ok_response = HttpResponse::builder().status(200).build().unwrap();
        let client = Arc::new(MockRunner::new(vec![ok_response]));
        let pages = num_pages(
            &client,
            "https://github.com/api/v4/projects/1/pipelines",
            Headers::new(),
            ApiOperation::Pipeline,
        )
        .unwrap();
        assert_eq!(Some(1), pages);
    }

    /// A 404 response makes `num_pages` fail.
    #[test]
    fn test_numpages_error_on_404() {
        let not_found = HttpResponse::builder().status(404).build().unwrap();
        let client = Arc::new(MockRunner::new(vec![not_found]));
        let result = num_pages(
            &client,
            "https://github.com/api/v4/projects/1/pipelines",
            Headers::new(),
            ApiOperation::Pipeline,
        );
        assert!(result.is_err());
    }

    /// Without page headers the reported delta is 30.
    #[test]
    fn test_num_resources_assume_one_if_pages_not_available() {
        let ok_response = HttpResponse::builder().status(200).build().unwrap();
        let client = Arc::new(MockRunner::new(vec![ok_response]));
        let estimate = num_resources(
            &client,
            "https://github.com/api/v4/projects/1/pipelines?page=1",
            Headers::new(),
            ApiOperation::Pipeline,
        )
        .unwrap()
        .unwrap();
        assert_eq!(30, estimate.delta);
    }

    /// With a last page of 4 and 20 items per page, 80 resources are reported
    /// with a delta of 20.
    #[test]
    fn test_num_resources_with_last_page_and_per_page_available() {
        // Value doesn't matter as we are injecting the header processor
        // enforcing the last page and per_page values.
        let mut response_headers = Headers::new();
        response_headers.set("link", "");

        let mut page_header = PageHeader::new();
        page_header.set_next_page(Page::new(
            "https://gitlab.com/api/v4/projects/1/pipelines?page=2",
            2,
        ));
        page_header.set_last_page(Page::new(
            "https://gitlab.com/api/v4/projects/1/pipelines?page=4",
            4,
        ));
        page_header.per_page = 20;
        let flow_control = FlowControlHeaders::new(Rc::new(Some(page_header)), Rc::new(None));

        let paged_response = HttpResponse::builder()
            .status(200)
            .headers(response_headers)
            .flow_control_headers(flow_control)
            .build()
            .unwrap();
        let client = Arc::new(MockRunner::new(vec![paged_response]));
        let estimate = num_resources(
            &client,
            "https://gitlab.com/api/v4/projects/1/pipelines?page=1",
            Headers::new(),
            ApiOperation::Pipeline,
        )
        .unwrap()
        .unwrap();
        assert_eq!(80, estimate.num);
        assert_eq!(20, estimate.delta);
    }

    /// A 404 response makes `num_resources` fail.
    #[test]
    fn test_numresources_error_on_404() {
        let not_found = HttpResponse::builder().status(404).build().unwrap();
        let client = Arc::new(MockRunner::new(vec![not_found]));
        let result = num_resources(
            &client,
            "https://github.com/api/v4/projects/1/pipelines",
            Headers::new(),
            ApiOperation::Pipeline,
        );
        assert!(result.is_err());
    }
}