1#![allow(missing_docs)] use std::fmt;
4use std::pin::Pin;
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6
7use futures::{future, prelude::*, Future as StdFuture, Stream as StdStream};
8#[cfg(feature = "httpcache")]
9use http::header::IF_NONE_MATCH;
10use http::header::{HeaderMap, HeaderValue};
11use http::header::{ACCEPT, AUTHORIZATION, ETAG, LINK, USER_AGENT};
12use http::{Method, StatusCode};
13#[cfg(feature = "httpcache")]
14use hyperx::header::LinkValue;
15use hyperx::header::{qitem, Link, RelationType};
16use log::{debug, trace};
17use mime::Mime;
18use reqwest::Url;
19use reqwest::{Body, Client};
20use serde::de::DeserializeOwned;
21
22#[doc(hidden)] #[cfg(feature = "httpcache")]
24pub mod http_cache;
25#[macro_use]
26mod macros; pub mod errors;
28pub mod projects;
29pub mod repository;
30pub mod pull_requests;
31pub mod work_items;
32
33pub use crate::errors::{Error, ErrorKind, Result};
34#[cfg(feature = "httpcache")]
35pub use crate::http_cache::{BoxedHttpCache, HttpCache};
36
37use crate::projects::{Project, Projects};
38use crate::repository::{Repositories, Repository};
39use crate::work_items::{WorkItem, WorkItems};
40
/// Base URL used by [`AzureClient::new`] when no explicit host is supplied.
const DEFAULT_HOST: &str = "https://dev.azure.com";
/// A boxed, pinned, `Send` future that resolves to this crate's [`Result`].
pub type Future<T> = Pin<Box<dyn StdFuture<Output = Result<T>> + Send>>;

/// A boxed, pinned, `Send` stream whose items are this crate's [`Result`].
pub type Stream<T> = Pin<Box<dyn StdStream<Item = Result<T>> + Send>>;

/// Response header that is only logged by `get_header_values`.
const X_RATELIMIT_LIMIT: &str = "x-ratelimit-limit";
/// Response header parsed as the number of requests still allowed.
const X_RATELIMIT_REMAINING: &str = "x-ratelimit-remaining";
/// Response header parsed as a `u32`; `request` treats it as seconds since the
/// Unix epoch when computing how long a rate limit lasts.
const X_RATELIMIT_RESET: &str = "x-ratelimit-reset";
54
/// Media types the client sends in the `Accept` and `Content-Type` headers.
///
/// Derives `Debug` and `PartialEq` like the other public enums in this module
/// so values can be logged and compared.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum MediaType {
    /// Converted to the `application/json` MIME type.
    Json,
    /// Converted to the `application/json-patch+json` MIME type.
    JsonPatch,
}

impl Default for MediaType {
    /// Returns [`MediaType::Json`].
    fn default() -> MediaType {
        MediaType::Json
    }
}
68
69impl From<MediaType> for Mime {
70 fn from(media: MediaType) -> Mime {
71 match media {
72 MediaType::Json => "application/json".parse().unwrap(),
73 MediaType::JsonPatch => "application/json-patch+json".parse().unwrap(),
74 }
75 }
76}
77
/// Controls which of the client's credentials may be used for a request.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum AuthenticationConstraint {
    /// No restriction: whatever credentials the client holds (if any) are used.
    Unconstrained,
}
84
/// Ordering direction; rendered as `asc` or `desc` by its `Display` impl.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SortDirection {
    /// Ascending order (`"asc"`).
    Asc,
    /// Descending order (`"desc"`).
    Desc,
}

impl fmt::Display for SortDirection {
    /// Writes the lowercase keyword for the direction, honoring any
    /// width/alignment flags on the formatter.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match self {
            Self::Asc => "asc",
            Self::Desc => "desc",
        };
        fmt::Display::fmt(keyword, f)
    }
}

impl Default for SortDirection {
    /// Ascending is the default direction.
    fn default() -> Self {
        Self::Asc
    }
}
109
/// REST API version selector.
///
/// The `Display` impl renders the complete `api-version=<value>` query
/// fragment, not just the version number.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ApiVersion {
    /// Renders as `api-version=5.1`.
    V5_1,
    /// Renders as `api-version=5.0`.
    V5_0,
    /// Renders as `api-version=7.1-preview`.
    V7_1Preview,
}

impl fmt::Display for ApiVersion {
    /// Writes the `api-version=...` key/value pair for this version.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let fragment = match self {
            Self::V5_1 => "api-version=5.1",
            Self::V5_0 => "api-version=5.0",
            Self::V7_1Preview => "api-version=7.1-preview",
        };
        fmt::Display::fmt(fragment, f)
    }
}

impl Default for ApiVersion {
    /// Version 5.1 is the default.
    fn default() -> Self {
        Self::V5_1
    }
}
135
/// Credentials the client can attach to outgoing requests.
#[derive(Debug, PartialEq, Clone)]
pub enum Credentials {
    /// Sent as the `Authorization` header value `token <value>`.
    Token(String),
    /// Sent as `Authorization: Basic <base64("pat:<value>")>`.
    Basic(String),
    /// `(id, secret)` pair appended to the URL as the `client_id` and
    /// `client_secret` query parameters; no `Authorization` header is sent.
    Client(String, String),
}
148
/// Entry point for talking to the REST API of one organization.
///
/// The client is `Clone`; the resource handles it hands out (`Projects`,
/// `Repository`, ...) are built from a clone of it.
#[derive(Clone, Debug)]
pub struct AzureClient {
    /// Base URL that is prefixed to every request path.
    host: String,
    /// Value sent in the `User-Agent` header.
    agent: String,
    /// Organization name interpolated into request paths.
    org: String,
    /// Underlying HTTP client used to send requests.
    client: Client,
    /// Credentials used to authenticate requests, if any.
    credentials: Option<Credentials>,
    /// Cache consulted for ETags and bodies of GET requests.
    #[cfg(feature = "httpcache")]
    http_cache: BoxedHttpCache,
    /// API version appended to every request URL.
    api_version: ApiVersion,
}
161
162impl AzureClient {
    /// Creates a client that targets the default host (`DEFAULT_HOST`).
    ///
    /// `agent` is the user-agent string, `org` the organization name and
    /// `credentials` the optional credentials. Delegates to [`AzureClient::host`].
    ///
    /// # Errors
    ///
    /// Returns whatever error [`AzureClient::host`] returns.
    pub fn new<A, O, C>(agent: A, org: O, credentials: C) -> Result<Self>
    where
        A: Into<String>,
        O: Into<String>,
        C: Into<Option<Credentials>>,
    {
        Self::host(DEFAULT_HOST, agent, org, credentials)
    }
171
172 pub fn host<H, O, A, C>(host: H, agent: A, org: O, credentials: C) -> Result<Self>
173 where
174 H: Into<String>,
175 A: Into<String>,
176 O: Into<String>,
177 C: Into<Option<Credentials>>,
178 {
179 let http = Client::builder().build()?;
180 #[cfg(feature = "httpcache")]
181 {
182 Ok(Self::custom(
183 host,
184 agent,
185 org,
186 credentials,
187 http,
188 HttpCache::noop(),
189 ))
190 }
191 #[cfg(not(feature = "httpcache"))]
192 {
193 Ok(Self::custom(host, agent, org, credentials, http))
194 }
195 }
196
    /// Creates a client from fully caller-supplied parts, including the HTTP
    /// client and the HTTP cache (variant compiled with the `httpcache` feature).
    ///
    /// The API version starts at `ApiVersion::default()`.
    #[cfg(feature = "httpcache")]
    pub fn custom<H, A, O, CR>(
        host: H,
        agent: A,
        org: O,
        credentials: CR,
        http: Client,
        http_cache: BoxedHttpCache,
    ) -> Self
    where
        H: Into<String>,
        A: Into<String>,
        O: Into<String>,
        CR: Into<Option<Credentials>>,
    {
        Self {
            host: host.into(),
            agent: agent.into(),
            org: org.into(),
            client: http,
            credentials: credentials.into(),
            http_cache,
            api_version: ApiVersion::default(),
        }
    }
222
    /// Creates a client from fully caller-supplied parts, including the HTTP
    /// client (variant compiled without the `httpcache` feature).
    ///
    /// The API version starts at `ApiVersion::default()`.
    #[cfg(not(feature = "httpcache"))]
    pub fn custom<H, A, O, CR>(host: H, agent: A, org: O, credentials: CR, http: Client) -> Self
    where
        H: Into<String>,
        A: Into<String>,
        O: Into<String>,
        CR: Into<Option<Credentials>>,
    {
        Self {
            host: host.into(),
            agent: agent.into(),
            org: org.into(),
            client: http,
            credentials: credentials.into(),
            api_version: ApiVersion::default(),
        }
    }
240
    /// Replaces the credentials used for subsequent requests.
    ///
    /// Passing `None` clears them.
    pub fn set_credentials<CR>(&mut self, credentials: CR)
    where
        CR: Into<Option<Credentials>>,
    {
        self.credentials = credentials.into();
    }
    /// Replaces the API version appended to subsequent request URLs.
    pub fn set_api_version<V>(&mut self, version: V)
    where
        V: Into<ApiVersion>,
    {
        self.api_version = version.into();
    }
253
    /// Replaces the base URL that is prefixed to subsequent request paths.
    pub fn set_host<H>(&mut self, host: H)
    where
        H: Into<String>,
    {
        self.host = host.into();
    }
260
    /// Replaces the organization name used when building request paths.
    pub fn set_organization<O>(&mut self, org: O)
    where
        O: Into<String>,
    {
        self.org = org.into();
    }
267
268 pub async fn work_item(&self, id: usize) -> Result<WorkItem> {
269 Ok(self.get(&format!("/{}/_apis/wit/workItems/{}", self.org, id)).await?)
270 }
271
272 pub async fn query_work_items(&self, query: &str) -> Result<Vec<WorkItem>> {
273 let work_items_refs: WorkItems = self.post(&format!("/{}/_apis/wit/wiql", self.org), query.as_bytes().into()).await?;
274 let mut work_items = Vec::new();
275 for work_item in work_items_refs.work_items {
276 work_items.push(self.work_item(work_item.id).await?);
277 }
278 Ok(work_items)
279 }
280
281 pub async fn work_items(&self) -> Result<Vec<WorkItem>> {
282 let query = r#"{ "query": "Select * From WorkItems" }"#;
283 self.query_work_items(&query).await
284 }
285
    /// Returns a `Projects` handle built from a clone of this client.
    pub fn projects(&self) -> Projects {
        Projects::new(self.clone())
    }
289
    /// Returns a `Project` handle for `project`, built from a clone of this client.
    pub fn project<P>(&self, project: P) -> Project
    where
        P: Into<String>,
    {
        Project::new(self.clone(), project)
    }
296
    /// Returns a `Repository` handle for `repo` inside `project`, built from a
    /// clone of this client.
    pub fn repo<P, R>(&self, project: P, repo: R) -> Repository
    where
        P: Into<String>,
        R: Into<String>,
    {
        Repository::new(self.clone(), project, repo)
    }
304
    /// Currently a no-op: the body is empty and the method returns `()`.
    ///
    /// NOTE(review): judging by the name this is presumably a stub for listing
    /// repositories across the whole organization — confirm the intended
    /// behavior before relying on it.
    pub fn org_repos(&self) {
    }
311
    /// Returns a `Repositories` handle for `project`, built from a clone of
    /// this client.
    pub fn repos<P>(&self, project: P) -> Repositories
    where
        P: Into<String>,
    {
        Repositories::new(self.clone(), project)
    }
321
322 fn credentials(&self, authentication: AuthenticationConstraint) -> Option<&Credentials> {
323 match (authentication, self.credentials.as_ref()) {
324 (AuthenticationConstraint::Unconstrained, creds) => creds,
325 }
326 }
327
328 fn url_and_auth(
329 &self,
330 uri: &str,
331 authentication: AuthenticationConstraint,
332 ) -> Future<(Url, Option<String>)> {
333 let mut m = uri.to_owned();
334 m.push_str(&format!("?{}", self.api_version.to_string()));
335 let parsed_url = m.parse::<Url>();
336
337 match self.credentials(authentication) {
338 Some(&Credentials::Client(ref id, ref secret)) => Box::pin(future::ready(
339 parsed_url
340 .map(|mut u| {
341 u.query_pairs_mut()
342 .append_pair("client_id", id)
343 .append_pair("client_secret", secret);
344 (u, None)
345 })
346 .map_err(Error::from),
347 )),
348 Some(&Credentials::Token(ref token)) => {
349 let auth = format!("token {}", token);
350 Box::pin(future::ready(
351 parsed_url.map(|u| (u, Some(auth))).map_err(Error::from),
352 ))
353 }
354 Some(&Credentials::Basic(ref token)) => {
355
356 let b = base64::encode(format!("pat:{}", token));
357 let auth = format!("Basic {}", b);
358 Box::pin(future::ready(
359 parsed_url.map(|u| (u, Some(auth))).map_err(Error::from),
360 ))
361 }
362 None => Box::pin(future::ready(
363 parsed_url.map(|u| (u, None)).map_err(Error::from),
364 )),
365 }
366 }
367
368 fn request<Out>(
369 &self,
370 method: Method,
371 uri: &str,
372 body: Option<Vec<u8>>,
373 media_type: MediaType,
374 authentication: AuthenticationConstraint,
375 ) -> Future<(Option<Link>, Out)>
376 where
377 Out: DeserializeOwned + 'static + Send,
378 {
379 let url_and_auth = self.url_and_auth(uri, authentication);
380
381 let instance = self.clone();
382 #[cfg(feature = "httpcache")]
383 let uri2 = uri.to_string();
384 let body2 = body.clone();
385 let method2 = method.clone();
386 let response = url_and_auth
387 .map_err(Error::from)
388 .and_then(move |(url, auth)| {
389 #[cfg(not(feature = "httpcache"))]
390 let mut req = instance.client.request(method2, url);
391
392 #[cfg(feature = "httpcache")]
393 let mut req = {
394 let mut req = instance.client.request(method2.clone(), url);
395 if method2 == Method::GET {
396 if let Ok(etag) = instance.http_cache.lookup_etag(&uri2) {
397 req = req.header(IF_NONE_MATCH, etag);
398 }
399 }
400 req
401 };
402
403 req = req.header(USER_AGENT, &*instance.agent);
404 req = req.header(
405 ACCEPT,
406 &*format!("{}", qitem::<Mime>(From::from(media_type))),
407 );
408 req = req.header(
409 "Content-Type",
410 &*format!("{}", qitem::<Mime>(From::from(media_type))),
411 );
412
413 if let Some(auth_str) = auth {
414 req = req.header(AUTHORIZATION, &*auth_str);
415 }
416
417 trace!("Body: {:?}", &body2);
418 if let Some(body) = body2 {
419 req = req.body(Body::from(body));
420 }
421 debug!("Request: {:?}", &req);
422 req.send().map_err(Error::from)
423 });
424
425 #[cfg(feature = "httpcache")]
426 let instance2 = self.clone();
427
428 #[cfg(feature = "httpcache")]
429 let uri3 = uri.to_string();
430 Box::pin(response.and_then(move |response| {
431 #[cfg(not(feature = "httpcache"))]
432 let (remaining, reset) = get_header_values(response.headers());
433 #[cfg(feature = "httpcache")]
434 let (remaining, reset, etag) = get_header_values(response.headers());
435
436 let status = response.status();
437 let link = response
438 .headers()
439 .get(LINK)
440 .and_then(|l| l.to_str().ok())
441 .and_then(|l| l.parse().ok());
442
443 Box::pin(
444 response
445 .bytes()
446 .map_err(Error::from)
447 .and_then(move |response_body| async move {
448 if status.is_success() {
449 debug!(
450 "response payload {}",
451 String::from_utf8_lossy(&response_body)
452 );
453 #[cfg(feature = "httpcache")]
454 {
455 if let Some(etag) = etag {
456 let next_link = link.as_ref().and_then(|l| next_link(&l));
457 if let Err(e) = instance2.http_cache.cache_response(
458 &uri3,
459 &response_body,
460 &etag,
461 &next_link,
462 ) {
463 debug!("Failed to cache body & etag: {}", e);
465 }
466 }
467 }
468 let parsed_response : std::result::Result<Out, serde_json::error::Error> = if status == StatusCode::NO_CONTENT { serde_json::from_str("null") } else { serde_json::from_slice::<Out>(&response_body) };
469 parsed_response
470 .map(|out| (link, out))
471 .map_err(|error| ErrorKind::Codec(error).into())
472 } else if status == StatusCode::NOT_MODIFIED {
473 #[cfg(feature = "httpcache")]
476 {
477 instance2
478 .http_cache
479 .lookup_body(&uri3)
480 .map_err(Error::from)
481 .and_then(|body| {
482 serde_json::from_str::<Out>(&body)
483 .map_err(Error::from)
484 .and_then(|out| {
485 let link = match link {
486 Some(link) => Ok(Some(link)),
487 None => instance2
488 .http_cache
489 .lookup_next_link(&uri3)
490 .map(|next_link| next_link.map(|next| {
491 let next = LinkValue::new(next).push_rel(RelationType::Next);
492 Link::new(vec![next])
493 }))
494 };
495 link.map(|link| (link, out))
496 })
497 })
498 }
499 #[cfg(not(feature = "httpcache"))]
500 {
501 unreachable!("this should not be reachable without the httpcache feature enabled")
502 }
503 } else {
504 let error = match (remaining, reset) {
505 (Some(remaining), Some(reset)) if remaining == 0 => {
506 let now = SystemTime::now()
507 .duration_since(UNIX_EPOCH)
508 .unwrap()
509 .as_secs();
510 ErrorKind::RateLimit {
511 reset: Duration::from_secs(u64::from(reset) - now),
512 }
513 }
514 _ => ErrorKind::Fault {
515 code: status,
516 error: serde_json::from_slice(&response_body)?,
517 },
518 };
519 Err(error.into())
520 }
521 }),
522 )
523 }))
524 }
525
526 fn request_entity<D>(
527 &self,
528 method: Method,
529 uri: &str,
530 body: Option<Vec<u8>>,
531 media_type: MediaType,
532 authentication: AuthenticationConstraint,
533 ) -> Future<D>
534 where
535 D: DeserializeOwned + 'static + Send,
536 {
537 Box::pin(
538 self.request(method, uri, body, media_type, authentication)
539 .map_ok(|(_, entity)| entity),
540 )
541 }
542
    /// Sends a GET for `uri` (a path relative to the host) using the JSON
    /// media type and deserializes the response into `D`.
    fn get<D>(&self, uri: &str) -> Future<D>
    where
        D: DeserializeOwned + 'static + Send,
    {
        self.get_media(uri, MediaType::Json)
    }
549
550 fn get_media<D>(&self, uri: &str, media: MediaType) -> Future<D>
551 where
552 D: DeserializeOwned + 'static + Send,
553 {
554 let uri = self.host.clone() + uri;
555 self.request_entity(
556 Method::GET,
557 &uri,
558 None,
559 media,
560 AuthenticationConstraint::Unconstrained,
561 )
562 }
563
564 fn delete<D>(&self, uri: &str) -> Future<D>
566 where
567 D: DeserializeOwned + 'static + Send,
568 {
569 self.request_entity(
570 Method::DELETE,
571 &(self.host.clone() + uri),
572 None,
573 MediaType::Json,
574 AuthenticationConstraint::Unconstrained,
575 )
576 }
577
    /// Sends a POST with `message` as the body to `uri` (a path relative to
    /// the host) using the JSON media type and unconstrained authentication.
    fn post<D>(&self, uri: &str, message: Vec<u8>) -> Future<D>
    where
        D: DeserializeOwned + 'static + Send,
    {
        self.post_media(
            uri,
            message,
            MediaType::Json,
            AuthenticationConstraint::Unconstrained,
        )
    }
589
590 fn post_media<D>(
591 &self,
592 uri: &str,
593 message: Vec<u8>,
594 media: MediaType,
595 authentication: AuthenticationConstraint,
596 ) -> Future<D>
597 where
598 D: DeserializeOwned + 'static + Send,
599 {
600 self.request_entity(
601 Method::POST,
602 &(self.host.clone() + uri),
603 Some(message),
604 media,
605 authentication,
606 )
607 }
608
609 fn patch_media<D>(&self, uri: &str, message: Vec<u8>, media: MediaType) -> Future<D>
610 where
611 D: DeserializeOwned + 'static + Send,
612 {
613 self.request_entity(
614 Method::PATCH,
615 &(self.host.clone() + uri),
616 Some(message),
617 media,
618 AuthenticationConstraint::Unconstrained,
619 )
620 }
621
    /// Sends a PATCH with `message` as the body to `uri` (a path relative to
    /// the host) using the JSON media type.
    fn patch<D>(&self, uri: &str, message: Vec<u8>) -> Future<D>
    where
        D: DeserializeOwned + 'static + Send,
    {
        self.patch_media(uri, message, MediaType::Json)
    }
628}
629
630#[allow(dead_code)]
631fn next_link(l: &Link) -> Option<String> {
632 l.values()
633 .into_iter()
634 .find(|v| v.rel().unwrap_or(&[]).get(0) == Some(&RelationType::Next))
635 .map(|v| v.link().to_owned())
636}
637
638#[cfg(not(feature = "httpcache"))]
639type HeaderValues = (Option<u32>, Option<u32>);
640#[cfg(feature = "httpcache")]
641type HeaderValues = (Option<u32>, Option<u32>, Option<Vec<u8>>);
642
643fn get_header_values(headers: &HeaderMap<HeaderValue>) -> HeaderValues {
645 if let Some(value) = headers.get(X_RATELIMIT_LIMIT) {
646 debug!("x-rate-limit-limit: {:?}", value)
647 }
648 let remaining = headers
649 .get(X_RATELIMIT_REMAINING)
650 .and_then(|val| val.to_str().ok())
651 .and_then(|val| val.parse::<u32>().ok());
652 let reset = headers
653 .get(X_RATELIMIT_RESET)
654 .and_then(|val| val.to_str().ok())
655 .and_then(|val| val.parse::<u32>().ok());
656 if let Some(value) = remaining {
657 debug!("x-rate-limit-remaining: {}", value)
658 }
659 if let Some(value) = reset {
660 debug!("x-rate-limit-reset: {}", value)
661 }
662 let etag = headers.get(ETAG);
663 if let Some(value) = etag {
664 debug!("etag: {:?}", value)
665 }
666
667 #[cfg(feature = "httpcache")]
668 {
669 let etag = etag.map(|etag| etag.as_bytes().to_vec());
670 (remaining, reset, etag)
671 }
672 #[cfg(not(feature = "httpcache"))]
673 (remaining, reset)
674}
675
#[cfg(test)]
mod tests {
    use super::*;

    /// The default sort direction is ascending.
    #[test]
    fn default_sort_direction() {
        assert_eq!(SortDirection::default(), SortDirection::Asc);
    }

    /// Header extraction without the HTTP cache: only the two counters.
    #[test]
    #[cfg(not(feature = "httpcache"))]
    fn header_values() {
        // No headers at all.
        let none_present = HeaderMap::new();
        assert_eq!(get_header_values(&none_present), (None, None));

        // Well-formed numeric headers.
        let mut well_formed = HeaderMap::new();
        well_formed.insert(X_RATELIMIT_REMAINING, HeaderValue::from_static("1234"));
        well_formed.insert(X_RATELIMIT_RESET, HeaderValue::from_static("5678"));
        assert_eq!(get_header_values(&well_formed), (Some(1234), Some(5678)));

        // Non-numeric values are dropped.
        let mut malformed = HeaderMap::new();
        malformed.insert(X_RATELIMIT_REMAINING, HeaderValue::from_static("foo"));
        malformed.insert(X_RATELIMIT_RESET, HeaderValue::from_static("bar"));
        assert_eq!(get_header_values(&malformed), (None, None));
    }

    /// Header extraction with the HTTP cache: counters plus the raw ETag bytes.
    #[test]
    #[cfg(feature = "httpcache")]
    fn header_values() {
        // No headers at all.
        let none_present = HeaderMap::new();
        assert_eq!(get_header_values(&none_present), (None, None, None));

        // Well-formed numeric headers and an ETag.
        let mut well_formed = HeaderMap::new();
        well_formed.insert(X_RATELIMIT_REMAINING, HeaderValue::from_static("1234"));
        well_formed.insert(X_RATELIMIT_RESET, HeaderValue::from_static("5678"));
        well_formed.insert(ETAG, HeaderValue::from_static("foobar"));
        assert_eq!(
            get_header_values(&well_formed),
            (Some(1234), Some(5678), Some(b"foobar".to_vec()))
        );

        // Non-numeric counters are dropped; an empty ETag is still reported.
        let mut malformed = HeaderMap::new();
        malformed.insert(X_RATELIMIT_REMAINING, HeaderValue::from_static("foo"));
        malformed.insert(X_RATELIMIT_RESET, HeaderValue::from_static("bar"));
        malformed.insert(ETAG, HeaderValue::from_static(""));
        assert_eq!(
            get_header_values(&malformed),
            (None, None, Some(Vec::new()))
        );
    }
}