azure_rust/
lib.rs

1#![allow(missing_docs)] // todo: make this a deny eventually
2
3use std::fmt;
4use std::pin::Pin;
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6
7use futures::{future, prelude::*, Future as StdFuture, Stream as StdStream};
8#[cfg(feature = "httpcache")]
9use http::header::IF_NONE_MATCH;
10use http::header::{HeaderMap, HeaderValue};
11use http::header::{ACCEPT, AUTHORIZATION, ETAG, LINK, USER_AGENT};
12use http::{Method, StatusCode};
13#[cfg(feature = "httpcache")]
14use hyperx::header::LinkValue;
15use hyperx::header::{qitem, Link, RelationType};
16use log::{debug, trace};
17use mime::Mime;
18use reqwest::Url;
19use reqwest::{Body, Client};
20use serde::de::DeserializeOwned;
21
22#[doc(hidden)] // public for doc testing and integration testing only
23#[cfg(feature = "httpcache")]
24pub mod http_cache;
25#[macro_use]
26mod macros; // expose json! macro to child modules
27pub mod errors;
28pub mod projects;
29pub mod repository;
30pub mod pull_requests;
31pub mod work_items;
32
33pub use crate::errors::{Error, ErrorKind, Result};
34#[cfg(feature = "httpcache")]
35pub use crate::http_cache::{BoxedHttpCache, HttpCache};
36
37use crate::projects::{Project, Projects};
38use crate::repository::{Repositories, Repository};
39use crate::work_items::{WorkItem, WorkItems};
40
41const DEFAULT_HOST: &str = "https://dev.azure.com";
42/// A type alias for `Futures` that may return `azure_rs::Errors`
43pub type Future<T> = Pin<Box<dyn StdFuture<Output = Result<T>> + Send>>;
44
45/// A type alias for `Streams` that may result in `azure_rs::Errors`
46pub type Stream<T> = Pin<Box<dyn StdStream<Item = Result<T>> + Send>>;
47
48/// Rate limiting
49///
50/// https://docs.microsoft.com/en-us/azure/devops/integrate/concepts/rate-limits?view=azure-devops#api-client-experience
51const X_RATELIMIT_LIMIT: &str = "x-ratelimit-limit";
52const X_RATELIMIT_REMAINING: &str = "x-ratelimit-remaining";
53const X_RATELIMIT_RESET: &str = "x-ratelimit-reset";
54
55#[derive(Clone, Copy)]
56pub enum MediaType {
57    /// Return json (the default)
58    Json,
59    /// Return json in preview form
60    JsonPatch,
61}
62
63impl Default for MediaType {
64    fn default() -> MediaType {
65        MediaType::Json
66    }
67}
68
69impl From<MediaType> for Mime {
70    fn from(media: MediaType) -> Mime {
71        match media {
72            MediaType::Json => "application/json".parse().unwrap(),
73            MediaType::JsonPatch => "application/json-patch+json".parse().unwrap(),
74        }
75    }
76}
77
78/// Controls what sort of authentication is required for this request
79#[derive(Clone, Copy, Debug, PartialEq)]
80pub enum AuthenticationConstraint {
81    /// No constraint
82    Unconstrained,
83}
84
85/// enum representation of Azure list sorting options
86#[derive(Clone, Copy, Debug, PartialEq)]
87pub enum SortDirection {
88    /// Sort in ascending order (the default)
89    Asc,
90    /// Sort in descending order
91    Desc,
92}
93
94impl fmt::Display for SortDirection {
95    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96        match *self {
97            SortDirection::Asc => "asc",
98            SortDirection::Desc => "desc",
99        }
100        .fmt(f)
101    }
102}
103
104impl Default for SortDirection {
105    fn default() -> SortDirection {
106        SortDirection::Asc
107    }
108}
109
110/// enum representation of Azure api versions
111/// by default 5.1
112#[derive(Clone, Copy, Debug, PartialEq)]
113pub enum ApiVersion {
114    V5_1,
115    V5_0,
116    V7_1Preview
117}
118
119impl fmt::Display for ApiVersion {
120    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121        match *self {
122            ApiVersion::V5_1 => "api-version=5.1",
123            ApiVersion::V5_0 => "api-version=5.0",
124            ApiVersion::V7_1Preview => "api-version=7.1-preview"
125        }
126        .fmt(f)
127    }
128}
129
130impl Default for ApiVersion {
131    fn default() -> ApiVersion {
132        ApiVersion::V5_1
133    }
134}
135
136/// Various forms of authentication credentials supported by Azure
137#[derive(Debug, PartialEq, Clone)]
138pub enum Credentials {
139    /// Oauth token string
140    /// https://developer.github.com/v3/#oauth2-token-sent-in-a-header
141    Token(String),
142    /// Basic authentication base64 encoded
143    Basic(String),
144    /// Oauth client id and secret
145    /// https://developer.github.com/v3/#oauth2-keysecret
146    Client(String, String),
147}
148
149/// Entry point interface for interacting with Azure API
150#[derive(Clone, Debug)]
151pub struct AzureClient {
152    host: String,
153    agent: String,
154    org: String,
155    client: Client,
156    credentials: Option<Credentials>,
157    #[cfg(feature = "httpcache")]
158    http_cache: BoxedHttpCache,
159    api_version: ApiVersion,
160}
161
162impl AzureClient {
163    pub fn new<A, O, C>(agent: A, org: O, credentials: C) -> Result<Self>
164    where
165        A: Into<String>,
166        O: Into<String>,
167        C: Into<Option<Credentials>>,
168    {
169        Self::host(DEFAULT_HOST, agent, org, credentials)
170    }
171
172    pub fn host<H, O, A, C>(host: H, agent: A, org: O, credentials: C) -> Result<Self>
173    where
174        H: Into<String>,
175        A: Into<String>,
176        O: Into<String>,
177        C: Into<Option<Credentials>>,
178    {
179        let http = Client::builder().build()?;
180        #[cfg(feature = "httpcache")]
181        {
182            Ok(Self::custom(
183                host,
184                agent,
185                org,
186                credentials,
187                http,
188                HttpCache::noop(),
189            ))
190        }
191        #[cfg(not(feature = "httpcache"))]
192        {
193            Ok(Self::custom(host, agent, org, credentials, http))
194        }
195    }
196
197    #[cfg(feature = "httpcache")]
198    pub fn custom<H, A, O, CR>(
199        host: H,
200        agent: A,
201        org: O,
202        credentials: CR,
203        http: Client,
204        http_cache: BoxedHttpCache,
205    ) -> Self
206    where
207        H: Into<String>,
208        A: Into<String>,
209        O: Into<String>,
210        CR: Into<Option<Credentials>>,
211    {
212        Self {
213            host: host.into(),
214            agent: agent.into(),
215            org: org.into(),
216            client: http,
217            credentials: credentials.into(),
218            http_cache,
219            api_version: ApiVersion::default(),
220        }
221    }
222
223    #[cfg(not(feature = "httpcache"))]
224    pub fn custom<H, A, O, CR>(host: H, agent: A, org: O, credentials: CR, http: Client) -> Self
225    where
226        H: Into<String>,
227        A: Into<String>,
228        O: Into<String>,
229        CR: Into<Option<Credentials>>,
230    {
231        Self {
232            host: host.into(),
233            agent: agent.into(),
234            org: org.into(),
235            client: http,
236            credentials: credentials.into(),
237            api_version: ApiVersion::default(),
238        }
239    }
240
241    pub fn set_credentials<CR>(&mut self, credentials: CR)
242    where
243        CR: Into<Option<Credentials>>,
244    {
245        self.credentials = credentials.into();
246    }
247    pub fn set_api_version<V>(&mut self, version: V)
248    where
249        V: Into<ApiVersion>,
250    {
251        self.api_version = version.into();
252    }
253
254    pub fn set_host<H>(&mut self, host: H)
255    where
256    H: Into<String>,
257    {
258        self.host = host.into();
259    }
260
261    pub fn set_organization<O>(&mut self, org: O)
262    where
263        O: Into<String>,
264    {
265        self.org = org.into();
266    }
267
268    pub async fn work_item(&self, id: usize) -> Result<WorkItem> {
269        Ok(self.get(&format!("/{}/_apis/wit/workItems/{}", self.org, id)).await?)
270    }
271
272    pub async fn query_work_items(&self, query: &str) -> Result<Vec<WorkItem>> {
273        let work_items_refs: WorkItems = self.post(&format!("/{}/_apis/wit/wiql", self.org), query.as_bytes().into()).await?;
274        let mut work_items = Vec::new();
275        for work_item in work_items_refs.work_items {
276            work_items.push(self.work_item(work_item.id).await?);
277        }
278        Ok(work_items)
279    }
280
281    pub async fn work_items(&self) -> Result<Vec<WorkItem>> {
282        let query = r#"{ "query": "Select * From WorkItems" }"#;
283        self.query_work_items(&query).await
284    }
285
286    pub fn projects(&self) -> Projects {
287        Projects::new(self.clone())
288    }
289
290    pub fn project<P>(&self, project: P) -> Project
291    where
292        P: Into<String>,
293    {
294        Project::new(self.clone(), project)
295    }
296
297    pub fn repo<P, R>(&self, project: P, repo: R) -> Repository
298    where
299        P: Into<String>,
300        R: Into<String>,
301    {
302        Repository::new(self.clone(), project, repo)
303    }
304
305    /// Get all repos in a organization
306    ///
307    /// GET https://dev.azure.com/{organization}/_apis/git/repositories?api-version=
308    pub fn org_repos(&self) {
309        // TODO
310    }
311
312    /// Get all repos in a project
313    ///
314    /// GET https://dev.azure.com/{organization}/{project}/_apis/git/repositories?api-version=5.1
315    pub fn repos<P>(&self, project: P) -> Repositories
316    where
317        P: Into<String>,
318    {
319        Repositories::new(self.clone(), project)
320    }
321
322    fn credentials(&self, authentication: AuthenticationConstraint) -> Option<&Credentials> {
323        match (authentication, self.credentials.as_ref()) {
324            (AuthenticationConstraint::Unconstrained, creds) => creds,
325        }
326    }
327
328    fn url_and_auth(
329        &self,
330        uri: &str,
331        authentication: AuthenticationConstraint,
332    ) -> Future<(Url, Option<String>)> {
333        let mut m = uri.to_owned();
334        m.push_str(&format!("?{}", self.api_version.to_string()));
335        let parsed_url = m.parse::<Url>();
336
337        match self.credentials(authentication) {
338            Some(&Credentials::Client(ref id, ref secret)) => Box::pin(future::ready(
339                parsed_url
340                    .map(|mut u| {
341                        u.query_pairs_mut()
342                            .append_pair("client_id", id)
343                            .append_pair("client_secret", secret);
344                        (u, None)	
345                    })
346                    .map_err(Error::from),
347            )),
348            Some(&Credentials::Token(ref token)) => {
349                let auth = format!("token {}", token);
350                Box::pin(future::ready(
351                    parsed_url.map(|u| (u, Some(auth))).map_err(Error::from),
352                ))
353            }
354            Some(&Credentials::Basic(ref token)) => {
355                
356                let b = base64::encode(format!("pat:{}", token));
357                let auth = format!("Basic {}", b);
358                Box::pin(future::ready(
359                    parsed_url.map(|u| (u, Some(auth))).map_err(Error::from),
360                ))
361            }
362            None => Box::pin(future::ready(
363                parsed_url.map(|u| (u, None)).map_err(Error::from),
364            )),
365        }
366    }
367
368    fn request<Out>(
369        &self,
370        method: Method,
371        uri: &str,
372        body: Option<Vec<u8>>,
373        media_type: MediaType,
374        authentication: AuthenticationConstraint,
375    ) -> Future<(Option<Link>, Out)>
376    where
377        Out: DeserializeOwned + 'static + Send,
378    {
379        let url_and_auth = self.url_and_auth(uri, authentication);
380
381        let instance = self.clone();
382        #[cfg(feature = "httpcache")]
383        let uri2 = uri.to_string();
384        let body2 = body.clone();
385        let method2 = method.clone();
386        let response = url_and_auth
387            .map_err(Error::from)
388            .and_then(move |(url, auth)| {
389                #[cfg(not(feature = "httpcache"))]
390                let mut req = instance.client.request(method2, url);
391
392                #[cfg(feature = "httpcache")]
393                let mut req = {
394                    let mut req = instance.client.request(method2.clone(), url);
395                    if method2 == Method::GET {
396                        if let Ok(etag) = instance.http_cache.lookup_etag(&uri2) {
397                            req = req.header(IF_NONE_MATCH, etag);
398                        }
399                    }
400                    req
401                };
402
403                req = req.header(USER_AGENT, &*instance.agent);
404                req = req.header(
405                    ACCEPT,
406                    &*format!("{}", qitem::<Mime>(From::from(media_type))),
407                );
408                req = req.header(
409                    "Content-Type",
410                    &*format!("{}", qitem::<Mime>(From::from(media_type))),
411                );
412
413                if let Some(auth_str) = auth {
414                    req = req.header(AUTHORIZATION, &*auth_str);
415                }
416
417                trace!("Body: {:?}", &body2);
418                if let Some(body) = body2 {
419                    req = req.body(Body::from(body));
420                }
421                debug!("Request: {:?}", &req);
422                req.send().map_err(Error::from)
423            });
424
425        #[cfg(feature = "httpcache")]
426        let instance2 = self.clone();
427
428        #[cfg(feature = "httpcache")]
429        let uri3 = uri.to_string();
430        Box::pin(response.and_then(move |response| {
431            #[cfg(not(feature = "httpcache"))]
432            let (remaining, reset) = get_header_values(response.headers());
433            #[cfg(feature = "httpcache")]
434            let (remaining, reset, etag) = get_header_values(response.headers());
435
436            let status = response.status();
437            let link = response
438                .headers()
439                .get(LINK)
440                .and_then(|l| l.to_str().ok())
441                .and_then(|l| l.parse().ok());
442
443            Box::pin(
444                response
445                    .bytes()
446                    .map_err(Error::from)
447                    .and_then(move |response_body| async move {
448                        if status.is_success() {
449                            debug!(
450                                "response payload {}",
451                                String::from_utf8_lossy(&response_body)
452                            );
453                            #[cfg(feature = "httpcache")]
454                            {
455                                if let Some(etag) = etag {
456                                    let next_link = link.as_ref().and_then(|l| next_link(&l));
457                                    if let Err(e) = instance2.http_cache.cache_response(
458                                        &uri3,
459                                        &response_body,
460                                        &etag,
461                                        &next_link,
462                                    ) {
463                                        // failing to cache isn't fatal, so just log & swallow the error
464                                        debug!("Failed to cache body & etag: {}", e);
465                                    }
466                                }
467                            }
468                            let parsed_response : std::result::Result<Out, serde_json::error::Error> = if status == StatusCode::NO_CONTENT { serde_json::from_str("null") } else { serde_json::from_slice::<Out>(&response_body) };
469                            parsed_response
470                                .map(|out| (link, out))
471                                .map_err(|error| ErrorKind::Codec(error).into())
472                        } else if status == StatusCode::NOT_MODIFIED {
473                            // only supported case is when client provides if-none-match
474                            // header when cargo builds with --cfg feature="httpcache"
475                            #[cfg(feature = "httpcache")]
476                            {
477                                instance2
478                                    .http_cache
479                                    .lookup_body(&uri3)
480                                    .map_err(Error::from)
481                                    .and_then(|body| {
482                                        serde_json::from_str::<Out>(&body)
483                                            .map_err(Error::from)
484                                            .and_then(|out| {
485                                                let link = match link {
486                                                    Some(link) => Ok(Some(link)),
487                                                    None => instance2
488                                                        .http_cache
489                                                        .lookup_next_link(&uri3)
490                                                        .map(|next_link| next_link.map(|next| {
491                                                            let next = LinkValue::new(next).push_rel(RelationType::Next);
492                                                            Link::new(vec![next])
493                                                        }))
494                                                };
495                                                link.map(|link| (link, out))
496                                            })
497                                    })
498                            }
499                            #[cfg(not(feature = "httpcache"))]
500                            {
501                                unreachable!("this should not be reachable without the httpcache feature enabled")
502                            }
503                        } else {
504                            let error = match (remaining, reset) {
505                                (Some(remaining), Some(reset)) if remaining == 0 => {
506                                    let now = SystemTime::now()
507                                        .duration_since(UNIX_EPOCH)
508                                        .unwrap()
509                                        .as_secs();
510                                    ErrorKind::RateLimit {
511                                        reset: Duration::from_secs(u64::from(reset) - now),
512                                    }
513                                }
514                                _ => ErrorKind::Fault {
515                                    code: status,
516                                    error: serde_json::from_slice(&response_body)?,
517                                },
518                            };
519                            Err(error.into())
520                        }
521                    }),
522            )
523        }))
524    }
525
526    fn request_entity<D>(
527        &self,
528        method: Method,
529        uri: &str,
530        body: Option<Vec<u8>>,
531        media_type: MediaType,
532        authentication: AuthenticationConstraint,
533    ) -> Future<D>
534    where
535        D: DeserializeOwned + 'static + Send,
536    {
537        Box::pin(
538            self.request(method, uri, body, media_type, authentication)
539                .map_ok(|(_, entity)| entity),
540        )
541    }
542
543    fn get<D>(&self, uri: &str) -> Future<D>
544    where
545        D: DeserializeOwned + 'static + Send,
546    {
547        self.get_media(uri, MediaType::Json)
548    }
549
550    fn get_media<D>(&self, uri: &str, media: MediaType) -> Future<D>
551    where
552        D: DeserializeOwned + 'static + Send,
553    {
554        let uri = self.host.clone() + uri;
555        self.request_entity(
556            Method::GET,
557            &uri,
558            None,
559            media,
560            AuthenticationConstraint::Unconstrained,
561        )
562    }
563
564    /// a delete request that returns a response
565    fn delete<D>(&self, uri: &str) -> Future<D>
566    where
567        D: DeserializeOwned + 'static + Send,
568    {
569        self.request_entity(
570            Method::DELETE,
571            &(self.host.clone() + uri),
572            None,
573            MediaType::Json,
574            AuthenticationConstraint::Unconstrained,
575        )
576    }
577
578    fn post<D>(&self, uri: &str, message: Vec<u8>) -> Future<D>
579    where
580        D: DeserializeOwned + 'static + Send,
581    {
582        self.post_media(
583            uri,
584            message,
585            MediaType::Json,
586            AuthenticationConstraint::Unconstrained,
587        )
588    }
589
590    fn post_media<D>(
591        &self,
592        uri: &str,
593        message: Vec<u8>,
594        media: MediaType,
595        authentication: AuthenticationConstraint,
596    ) -> Future<D>
597    where
598        D: DeserializeOwned + 'static + Send,
599    {
600        self.request_entity(
601            Method::POST,
602            &(self.host.clone() + uri),
603            Some(message),
604            media,
605            authentication,
606        )
607    }
608
609    fn patch_media<D>(&self, uri: &str, message: Vec<u8>, media: MediaType) -> Future<D>
610    where
611        D: DeserializeOwned + 'static + Send,
612    {
613        self.request_entity(
614            Method::PATCH,
615            &(self.host.clone() + uri),
616            Some(message),
617            media,
618            AuthenticationConstraint::Unconstrained,
619        )
620    }
621
622    fn patch<D>(&self, uri: &str, message: Vec<u8>) -> Future<D>
623    where
624        D: DeserializeOwned + 'static + Send,
625    {
626        self.patch_media(uri, message, MediaType::Json)
627    }
628}
629
630#[allow(dead_code)]
631fn next_link(l: &Link) -> Option<String> {
632    l.values()
633        .into_iter()
634        .find(|v| v.rel().unwrap_or(&[]).get(0) == Some(&RelationType::Next))
635        .map(|v| v.link().to_owned())
636}
637
638#[cfg(not(feature = "httpcache"))]
639type HeaderValues = (Option<u32>, Option<u32>);
640#[cfg(feature = "httpcache")]
641type HeaderValues = (Option<u32>, Option<u32>, Option<Vec<u8>>);
642
643/// [See docs](https://docs.microsoft.com/en-us/azure/azure-resource-manager/management/request-limits-and-throttling)
644fn get_header_values(headers: &HeaderMap<HeaderValue>) -> HeaderValues {
645    if let Some(value) = headers.get(X_RATELIMIT_LIMIT) {
646        debug!("x-rate-limit-limit: {:?}", value)
647    }
648    let remaining = headers
649        .get(X_RATELIMIT_REMAINING)
650        .and_then(|val| val.to_str().ok())
651        .and_then(|val| val.parse::<u32>().ok());
652    let reset = headers
653        .get(X_RATELIMIT_RESET)
654        .and_then(|val| val.to_str().ok())
655        .and_then(|val| val.parse::<u32>().ok());
656    if let Some(value) = remaining {
657        debug!("x-rate-limit-remaining: {}", value)
658    }
659    if let Some(value) = reset {
660        debug!("x-rate-limit-reset: {}", value)
661    }
662    let etag = headers.get(ETAG);
663    if let Some(value) = etag {
664        debug!("etag: {:?}", value)
665    }
666
667    #[cfg(feature = "httpcache")]
668    {
669        let etag = etag.map(|etag| etag.as_bytes().to_vec());
670        (remaining, reset, etag)
671    }
672    #[cfg(not(feature = "httpcache"))]
673    (remaining, reset)
674}
675
676#[cfg(test)]
677mod tests {
678    use super::*;
679
680    #[test]
681    fn default_sort_direction() {
682        let default: SortDirection = Default::default();
683        assert_eq!(default, SortDirection::Asc)
684    }
685
686    #[test]
687    #[cfg(not(feature = "httpcache"))]
688    fn header_values() {
689        let empty = HeaderMap::new();
690        let actual = get_header_values(&empty);
691        let expected = (None, None);
692        assert_eq!(actual, expected);
693
694        let mut all_valid = HeaderMap::new();
695        all_valid.insert(X_RATELIMIT_REMAINING, HeaderValue::from_static("1234"));
696        all_valid.insert(X_RATELIMIT_RESET, HeaderValue::from_static("5678"));
697        let actual = get_header_values(&all_valid);
698        let expected = (Some(1234), Some(5678));
699        assert_eq!(actual, expected);
700
701        let mut invalid = HeaderMap::new();
702        invalid.insert(X_RATELIMIT_REMAINING, HeaderValue::from_static("foo"));
703        invalid.insert(X_RATELIMIT_RESET, HeaderValue::from_static("bar"));
704        let actual = get_header_values(&invalid);
705        let expected = (None, None);
706        assert_eq!(actual, expected);
707    }
708
709    #[test]
710    #[cfg(feature = "httpcache")]
711    fn header_values() {
712        let empty = HeaderMap::new();
713        let actual = get_header_values(&empty);
714        let expected = (None, None, None);
715        assert_eq!(actual, expected);
716
717        let mut all_valid = HeaderMap::new();
718        all_valid.insert(X_RATELIMIT_REMAINING, HeaderValue::from_static("1234"));
719        all_valid.insert(X_RATELIMIT_RESET, HeaderValue::from_static("5678"));
720        all_valid.insert(ETAG, HeaderValue::from_static("foobar"));
721        let actual = get_header_values(&all_valid);
722        let expected = (Some(1234), Some(5678), Some(b"foobar".to_vec()));
723        assert_eq!(actual, expected);
724
725        let mut invalid = HeaderMap::new();
726        invalid.insert(X_RATELIMIT_REMAINING, HeaderValue::from_static("foo"));
727        invalid.insert(X_RATELIMIT_RESET, HeaderValue::from_static("bar"));
728        invalid.insert(ETAG, HeaderValue::from_static(""));
729        let actual = get_header_values(&invalid);
730        let expected = (None, None, Some(Vec::new()));
731        assert_eq!(actual, expected);
732    }
733}