1use std::borrow::Borrow;
2use std::iter::Iterator;
3use std::sync::Arc;
4
5use serde::Serialize;
6
7use crate::api_traits::Timestamp;
8use crate::backoff::{Backoff, Exponential};
9use crate::display::DisplayBody;
10use crate::http::throttle::{self, ThrottleStrategy};
11use crate::time;
12use crate::{
13 api_defaults,
14 api_traits::{ApiOperation, NumberDeltaErr},
15 display, error,
16 http::{self, Body, Headers, Paginator, Request, Resource},
17 io::{HttpResponse, HttpRunner},
18 json_load_page, json_loads,
19 remote::ListBodyArgs,
20 time::sort_filter_by_date,
21 Result,
22};
23
24fn get_remote_resource_headers<R: HttpRunner<Response = HttpResponse>>(
25 runner: &Arc<R>,
26 url: &str,
27 request_headers: Headers,
28 api_operation: ApiOperation,
29) -> Result<HttpResponse> {
30 send_request::<_, String>(
31 runner,
32 url,
33 None,
34 request_headers,
35 http::Method::HEAD,
36 api_operation,
37 )
38}
39
40pub fn num_pages<R: HttpRunner<Response = HttpResponse>>(
41 runner: &Arc<R>,
42 url: &str,
43 request_headers: Headers,
44 api_operation: ApiOperation,
45) -> Result<Option<u32>> {
46 let response = get_remote_resource_headers(runner, url, request_headers, api_operation)?;
47 match response.get_page_headers().borrow() {
48 Some(page_header) => {
49 if let Some(last_page) = page_header.last_page() {
50 return Ok(Some(last_page.number));
51 }
52 Ok(None)
53 }
54 None => Ok(Some(1)),
57 }
58}
59
60pub fn num_resources<R: HttpRunner<Response = HttpResponse>>(
61 runner: &Arc<R>,
62 url: &str,
63 request_headers: Headers,
64 api_operation: ApiOperation,
65) -> Result<Option<NumberDeltaErr>> {
66 let response = get_remote_resource_headers(runner, url, request_headers, api_operation)?;
67 match response.get_page_headers().borrow() {
68 Some(page_header) => {
69 if let Some(last_page) = page_header.last_page() {
71 let count = last_page.number * page_header.per_page;
72 return Ok(Some(NumberDeltaErr {
73 num: count,
74 delta: page_header.per_page,
75 }));
76 }
77 Ok(None)
78 }
79 None => {
80 Ok(Some(NumberDeltaErr {
83 num: 1,
84 delta: api_defaults::DEFAULT_PER_PAGE,
85 }))
86 }
87 }
88}
89
90pub fn query_error(url: &str, response: &HttpResponse) -> error::GRError {
91 error::GRError::RemoteServerError(format!(
92 "Failed to submit request to URL: {} with status code: {} and body: {}",
93 url, response.status, response.body
94 ))
95}
96
97pub fn send<R: HttpRunner<Response = HttpResponse>, D: Serialize, T>(
98 runner: &Arc<R>,
99 url: &str,
100 body: Option<&Body<D>>,
101 request_headers: Headers,
102 operation: ApiOperation,
103 mapper: impl Fn(&serde_json::Value) -> T,
104 method: http::Method,
105) -> Result<T> {
106 let response = send_request(runner, url, body, request_headers, method, operation)?;
107 let body = json_loads(&response.body)?;
108 Ok(mapper(&body))
109}
110
111pub fn send_json<R: HttpRunner<Response = HttpResponse>, D: Serialize>(
112 runner: &Arc<R>,
113 url: &str,
114 body: Option<&Body<D>>,
115 request_headers: Headers,
116 operation: ApiOperation,
117 method: http::Method,
118) -> Result<serde_json::Value> {
119 let response = send_request(runner, url, body, request_headers, method, operation)?;
120 json_loads(&response.body)
121}
122
123pub fn send_raw<R: HttpRunner<Response = HttpResponse>, D: Serialize>(
124 runner: &Arc<R>,
125 url: &str,
126 body: Option<&Body<D>>,
127 request_headers: Headers,
128 operation: ApiOperation,
129 method: http::Method,
130) -> Result<HttpResponse> {
131 send_request(runner, url, body, request_headers, method, operation)
132}
133
134pub fn get<R: HttpRunner<Response = HttpResponse>, D: Serialize, T>(
135 runner: &Arc<R>,
136 url: &str,
137 body: Option<&Body<D>>,
138 request_headers: Headers,
139 operation: ApiOperation,
140 mapper: impl Fn(&serde_json::Value) -> T,
141) -> Result<T> {
142 let response = send_request(
143 runner,
144 url,
145 body,
146 request_headers,
147 http::Method::GET,
148 operation,
149 )?;
150 let body = json_loads(&response.body)?;
151 Ok(mapper(&body))
152}
153
154pub fn get_json<R: HttpRunner<Response = HttpResponse>, D: Serialize>(
155 runner: &Arc<R>,
156 url: &str,
157 body: Option<&Body<D>>,
158 request_headers: Headers,
159 operation: ApiOperation,
160) -> Result<serde_json::Value> {
161 let response = send_request(
162 runner,
163 url,
164 body,
165 request_headers,
166 http::Method::GET,
167 operation,
168 )?;
169 json_loads(&response.body)
170}
171
172pub fn get_raw<R: HttpRunner<Response = HttpResponse>, D: Serialize>(
173 runner: &Arc<R>,
174 url: &str,
175 body: Option<&Body<D>>,
176 request_headers: Headers,
177 operation: ApiOperation,
178) -> Result<HttpResponse> {
179 send_request(
180 runner,
181 url,
182 body,
183 request_headers,
184 http::Method::GET,
185 operation,
186 )
187}
188
189fn send_request<R: HttpRunner<Response = HttpResponse>, T: Serialize>(
190 runner: &Arc<R>,
191 url: &str,
192 body: Option<&Body<T>>,
193 request_headers: Headers,
194 method: http::Method,
195 operation: ApiOperation,
196) -> Result<HttpResponse> {
197 let mut request = if let Some(body) = body {
198 http::Request::builder()
199 .method(method.clone())
200 .resource(Resource::new(url, Some(operation)))
201 .body(body)
202 .headers(request_headers)
203 .build()
204 .unwrap()
205 } else {
206 http::Request::builder()
207 .method(method.clone())
208 .resource(Resource::new(url, Some(operation)))
209 .headers(request_headers)
210 .build()
211 .unwrap()
212 };
213 let response = runner.run(&mut request)?;
214 if !response.is_ok(&method) {
218 return Err(query_error(url, &response).into());
219 }
220 Ok(response)
221}
222
223pub fn paged<R, T>(
224 runner: &Arc<R>,
225 url: &str,
226 list_args: Option<ListBodyArgs>,
227 request_headers: Headers,
228 iter_over_sub_array: Option<&str>,
229 operation: ApiOperation,
230 mapper: impl Fn(&serde_json::Value) -> T,
231) -> Result<Vec<T>>
232where
233 R: HttpRunner<Response = HttpResponse>,
234 T: Clone + Timestamp + Into<DisplayBody>,
235{
236 let request = build_list_request(url, &list_args, request_headers, operation);
237 let mut throttle_time = None;
238 let mut throttle_range = None;
239 let mut backoff_max_retries = 0;
240 let mut backoff_wait_time = 60;
241 if let Some(list_args) = &list_args {
242 throttle_time = list_args.throttle_time;
243 throttle_range = list_args.throttle_range;
244 backoff_max_retries = list_args.get_args.backoff_max_retries;
245 backoff_wait_time = list_args.get_args.backoff_retry_after;
246 }
247 let throttle_strategy: Box<dyn ThrottleStrategy> = match throttle_time {
248 Some(throttle_time) => Box::new(throttle::PreFixed::new(throttle_time)),
249 None => match throttle_range {
250 Some((min, max)) => Box::new(throttle::Random::new(min, max)),
251 None => Box::new(throttle::AutoRate::default()),
252 },
253 };
254 let backoff = Backoff::new(
255 runner,
256 backoff_max_retries,
257 backoff_wait_time,
258 time::now_epoch_seconds,
259 Box::new(Exponential),
260 Box::new(throttle::DynamicFixed),
261 );
262 let paginator = Paginator::new(runner, request, url, backoff, throttle_strategy);
263 let all_data = paginator
264 .map(|response| {
265 let response = response?;
266 if !response.is_ok(&http::Method::GET) {
267 return Err(query_error(url, &response).into());
268 }
269 if iter_over_sub_array.is_some() {
270 let body = json_loads(&response.body)?;
271 let paged_data = body[iter_over_sub_array.unwrap()]
272 .as_array()
273 .ok_or_else(|| {
274 error::GRError::RemoteUnexpectedResponseContract(format!(
275 "Expected an array of {} but got: {}",
276 iter_over_sub_array.unwrap(),
277 response.body
278 ))
279 })?
280 .iter()
281 .fold(Vec::new(), |mut paged_data, data| {
282 paged_data.push(mapper(data));
283 paged_data
284 });
285 if let Some(list_args) = &list_args {
286 if list_args.flush {
287 display::print(
288 &mut std::io::stdout(),
289 paged_data,
290 list_args.get_args.clone(),
291 )
292 .unwrap();
293 return Ok(Vec::new());
294 }
295 }
296 return Ok(paged_data);
297 }
298 let paged_data =
299 json_load_page(&response.body)?
300 .iter()
301 .fold(Vec::new(), |mut paged_data, data| {
302 paged_data.push(mapper(data));
303 paged_data
304 });
305 if let Some(list_args) = &list_args {
306 if list_args.flush {
307 display::print(
308 &mut std::io::stdout(),
309 paged_data,
310 list_args.get_args.clone(),
311 )
312 .unwrap();
313 return Ok(Vec::new());
314 }
315 }
316 Ok(paged_data)
317 })
318 .collect::<Result<Vec<Vec<T>>>>()
319 .map(|paged_data| paged_data.into_iter().flatten().collect());
320 match all_data {
321 Ok(paged_data) => Ok(sort_filter_by_date(paged_data, list_args)?),
322 Err(err) => Err(err),
323 }
324}
325
326fn build_list_request<'a>(
327 url: &str,
328 list_args: &Option<ListBodyArgs>,
329 request_headers: Headers,
330 operation: ApiOperation,
331) -> Request<'a, ()> {
332 let mut request: http::Request<()> =
333 http::Request::new(url, http::Method::GET).with_api_operation(operation);
334 request.set_headers(request_headers);
335 if let Some(list_args) = list_args {
336 if let Some(from_page) = list_args.page {
337 let url = if url.contains('?') {
338 format!("{}&page={}", url, &from_page)
339 } else {
340 format!("{}?page={}", url, &from_page)
341 };
342 request.set_max_pages(list_args.max_pages.unwrap());
343 request.set_url(&url);
344 }
345 }
346 request
347}
348
349#[cfg(test)]
350mod test {
351 use std::rc::Rc;
352
353 use crate::{
354 io::{FlowControlHeaders, Page, PageHeader},
355 test::utils::MockRunner,
356 };
357
358 use super::*;
359
360 #[test]
361 fn test_numpages_assume_one_if_pages_not_available() {
362 let response = HttpResponse::builder().status(200).build().unwrap();
363 let client = Arc::new(MockRunner::new(vec![response]));
364 let url = "https://github.com/api/v4/projects/1/pipelines";
365 let headers = Headers::new();
366 let operation = ApiOperation::Pipeline;
367 let num_pages = num_pages(&client, url, headers, operation).unwrap();
368 assert_eq!(Some(1), num_pages);
369 }
370
371 #[test]
372 fn test_numpages_error_on_404() {
373 let response = HttpResponse::builder().status(404).build().unwrap();
374 let client = Arc::new(MockRunner::new(vec![response]));
375 let url = "https://github.com/api/v4/projects/1/pipelines";
376 let headers = Headers::new();
377 let operation = ApiOperation::Pipeline;
378 assert!(num_pages(&client, url, headers, operation).is_err());
379 }
380
381 #[test]
382 fn test_num_resources_assume_one_if_pages_not_available() {
383 let headers = Headers::new();
384 let response = HttpResponse::builder().status(200).build().unwrap();
385 let client = Arc::new(MockRunner::new(vec![response]));
386 let url = "https://github.com/api/v4/projects/1/pipelines?page=1";
387 let num_resources = num_resources(&client, url, headers, ApiOperation::Pipeline).unwrap();
388 assert_eq!(30, num_resources.unwrap().delta);
389 }
390
391 #[test]
392 fn test_num_resources_with_last_page_and_per_page_available() {
393 let mut headers = Headers::new();
394 headers.set("link", "");
397 let next_page = Page::new("https://gitlab.com/api/v4/projects/1/pipelines?page=2", 2);
398 let last_page = Page::new("https://gitlab.com/api/v4/projects/1/pipelines?page=4", 4);
399 let mut page_header = PageHeader::new();
400 page_header.set_next_page(next_page);
401 page_header.set_last_page(last_page);
402 page_header.per_page = 20;
403 let flow_control_header =
404 FlowControlHeaders::new(Rc::new(Some(page_header)), Rc::new(None));
405 let response = HttpResponse::builder()
406 .status(200)
407 .headers(headers)
408 .flow_control_headers(flow_control_header)
409 .build()
410 .unwrap();
411 let client = Arc::new(MockRunner::new(vec![response]));
412 let url = "https://gitlab.com/api/v4/projects/1/pipelines?page=1";
413 let num_resources = num_resources(&client, url, Headers::new(), ApiOperation::Pipeline)
414 .unwrap()
415 .unwrap();
416 assert_eq!(80, num_resources.num);
417 assert_eq!(20, num_resources.delta);
418 }
419
420 #[test]
421 fn test_numresources_error_on_404() {
422 let response = HttpResponse::builder().status(404).build().unwrap();
423 let client = Arc::new(MockRunner::new(vec![response]));
424 let url = "https://github.com/api/v4/projects/1/pipelines";
425 let headers = Headers::new();
426 let operation = ApiOperation::Pipeline;
427 assert!(num_resources(&client, url, headers, operation).is_err());
428 }
429}