evg_api_rs/
lib.rs

1pub mod models;
2
3use async_stream::stream;
4use async_trait::async_trait;
5use futures::stream::Stream;
6use futures::stream::StreamExt;
7use models::stats::EvgTaskStats;
8use models::stats::EvgTaskStatsRequest;
9use models::stats::EvgTestStats;
10use models::stats::EvgTestStatsRequest;
11use models::version::EvgVersion;
12use models::{build::EvgBuild, patch::EvgPatch};
13use models::{task::EvgTask, test::EvgTest};
14use reqwest::{
15    header::{HeaderMap, HeaderValue, LINK},
16    Client, Response,
17};
18use serde::Deserialize;
19use std::path::Path;
20use std::pin::Pin;
21use std::{error::Error, fs};
22
23pub type BoxedStream<T> = Pin<Box<dyn Stream<Item = T>>>;
24pub type EvgError = Box<dyn Error + Sync + Send>;
25
26const DEFAULT_CONFIG_FILE: &str = ".evergreen.yml";
27
28#[derive(Debug, Deserialize, Clone)]
29struct EvergreenConfigFile {
30    pub user: String,
31    pub api_key: String,
32    pub api_server_host: String,
33}
34
35fn get_evg_config(path: &Path) -> Result<EvergreenConfigFile, EvgError> {
36    let contents = fs::read_to_string(&path)?;
37    let evg_config: EvergreenConfigFile = serde_yaml::from_str(&contents)?;
38    Ok(evg_config)
39}
40
41#[async_trait]
42pub trait EvgApiClient: Sync + Send {
43    /// Get details about the given task.
44    async fn get_task(&self, task_id: &str) -> Result<EvgTask, EvgError>;
45    /// Get details about the given version.
46    async fn get_version(&self, version_id: &str) -> Result<EvgVersion, EvgError>;
47    /// Get details about the given build.
48    async fn get_build(&self, build_id: &str) -> Result<Option<EvgBuild>, EvgError>;
49    /// Get the tests belonging to the given task.
50    async fn get_tests(&self, task_id: &str) -> Result<Vec<EvgTest>, EvgError>;
51    /// Get test stats for the given query.
52    async fn get_test_stats(
53        &self,
54        project_id: &str,
55        query: &EvgTestStatsRequest,
56    ) -> Result<Vec<EvgTestStats>, EvgError>;
57    /// Get task stats for the given query.
58    async fn get_task_stats(
59        &self,
60        project_id: &str,
61        query: &EvgTaskStatsRequest,
62    ) -> Result<Vec<EvgTaskStats>, EvgError>;
63    /// Stream version of an evergreen project.
64    fn stream_versions(&self, project_id: &str) -> BoxedStream<EvgVersion>;
65    /// Stream user patches of an evergreen project.
66    fn stream_user_patches(&self, user_id: &str, limit: Option<usize>) -> BoxedStream<EvgPatch>;
67    /// Stream patches of an evergreen project.
68    fn stream_project_patches(
69        &self,
70        project_id: &str,
71        limit: Option<usize>,
72    ) -> BoxedStream<EvgPatch>;
73    /// Stream tasks of an evergreen build.
74    fn stream_build_tasks(&self, build_id: &str, status: Option<&str>) -> BoxedStream<EvgTask>;
75    /// Stream the contents of a task level log.
76    fn stream_log(&self, task: &EvgTask, log_name: &str) -> BoxedStream<String>;
77    /// Stream the contents of a test level log.
78    fn stream_test_log(&self, test: &EvgTest) -> BoxedStream<String>;
79}
80
81#[derive(Clone)]
82pub struct EvgClient {
83    evg_config: EvergreenConfigFile,
84    client: Client,
85}
86
87impl EvgClient {
88    /// Create a new EvgClient based on the default evergreen auth file location (~/.evergreen.yml).
89    pub fn new() -> Result<EvgClient, EvgError> {
90        let home = std::env::var("HOME")?;
91        let path = format!("{}/{}", home, DEFAULT_CONFIG_FILE);
92        Self::from_file(Path::new(&path))
93    }
94
95    /// Create a new EvgClient based on the evergreen auth file at the provided location.
96    pub fn from_file(config_file: &Path) -> Result<EvgClient, EvgError> {
97        let evg_config = get_evg_config(config_file)?;
98        let mut headers = HeaderMap::new();
99        headers.insert("Api-User", HeaderValue::from_str(&evg_config.user)?);
100        headers.insert("Api-Key", HeaderValue::from_str(&evg_config.api_key)?);
101        let client = reqwest::Client::builder()
102            .default_headers(headers)
103            .build()?;
104
105        Ok(EvgClient { evg_config, client })
106    }
107
108    fn build_url(&self, endpoint: &str, arg: &str) -> String {
109        format!(
110            "{}/rest/v2/{}/{}",
111            self.evg_config.api_server_host, endpoint, arg
112        )
113    }
114}
115
116#[async_trait]
117impl EvgApiClient for EvgClient {
118    async fn get_task(&self, task_id: &str) -> Result<EvgTask, EvgError> {
119        let url = self.build_url("tasks", task_id);
120        let response = self.client.get(&url).send().await?;
121        Ok(response.json().await?)
122    }
123
124    async fn get_version(&self, version_id: &str) -> Result<EvgVersion, EvgError> {
125        let url = self.build_url("versions", version_id);
126        let response = self.client.get(&url).send().await?;
127        Ok(response.json().await?)
128    }
129
130    async fn get_build(&self, build_id: &str) -> Result<Option<EvgBuild>, EvgError> {
131        let url = self.build_url("builds", build_id);
132        let response = self.client.get(&url).send().await?;
133        if response.status() == 404 {
134            Ok(None)
135        } else {
136            Ok(Some(response.json().await?))
137        }
138    }
139
140    async fn get_tests(&self, task_id: &str) -> Result<Vec<EvgTest>, EvgError> {
141        let url = format!("{}/tests", self.build_url("tasks", task_id));
142        let mut results: Vec<EvgTest> = vec![];
143        let mut response = self.client.get(&url).send().await?;
144        loop {
145            let next_link = next_link(&response);
146            let result_batch: Vec<EvgTest> = response.json().await?;
147            results.extend(result_batch);
148
149            if let Some(next) = next_link {
150                response = self.client.get(&next).send().await?;
151            } else {
152                break;
153            }
154        }
155        Ok(results)
156    }
157
158    async fn get_test_stats(
159        &self,
160        project_id: &str,
161        query: &EvgTestStatsRequest,
162    ) -> Result<Vec<EvgTestStats>, EvgError> {
163        let url = format!("{}/test_stats", self.build_url("projects", project_id));
164        let response = self.client.get(&url).query(query).send().await?;
165
166        Ok(response.json().await?)
167    }
168
169    async fn get_task_stats(
170        &self,
171        project_id: &str,
172        query: &EvgTaskStatsRequest,
173    ) -> Result<Vec<EvgTaskStats>, EvgError> {
174        let url = format!("{}/task_stats", self.build_url("projects", project_id));
175        let response = self.client.get(&url).query(query).send().await?;
176        Ok(response.json().await?)
177    }
178
179    fn stream_versions(&self, project_id: &str) -> BoxedStream<EvgVersion> {
180        let url = format!(
181            "{}/versions?requester=gitter_request",
182            self.build_url("projects", project_id)
183        );
184        let client = self.client.clone();
185
186        Box::pin(stream! {
187            let mut response = client.get(&url).send().await.unwrap();
188            loop {
189                let next_link = next_link(&response);
190                let result_batch: Vec<EvgVersion> = response.json().await.unwrap();
191                for version in result_batch {
192                    yield version;
193                }
194
195                if let Some(next) = next_link {
196                    response = client.get(&next).send().await.unwrap();
197                } else {
198                    break;
199                }
200            }
201        })
202    }
203
204    fn stream_user_patches(&self, user_id: &str, limit: Option<usize>) -> BoxedStream<EvgPatch> {
205        let mut url = format!("{}/patches", self.build_url("users", user_id));
206        if let Some(l) = limit {
207            url = format!("{}?limit={}", url, l);
208        }
209        let client = self.client.clone();
210
211        Box::pin(stream! {
212            let mut response = client.get(&url).send().await.unwrap();
213            loop {
214                let next_link = next_link(&response);
215                let result_batch: Vec<EvgPatch> = response.json().await.unwrap();
216                for patch in result_batch {
217                    yield patch;
218                }
219
220                if let Some(next) = next_link {
221                    response = client.get(&next).send().await.unwrap();
222                } else {
223                    break;
224                }
225            }
226        })
227    }
228
229    fn stream_project_patches(
230        &self,
231        project_id: &str,
232        limit: Option<usize>,
233    ) -> BoxedStream<EvgPatch> {
234        let mut url = format!("{}/patches", self.build_url("projects", project_id));
235        if let Some(l) = limit {
236            url = format!("{}?limit={}", url, l);
237        }
238        let client = self.client.clone();
239
240        Box::pin(stream! {
241            let mut response = client.get(&url).send().await.unwrap();
242            loop {
243                let next_link = next_link(&response);
244                let result_batch: Vec<EvgPatch> = response.json().await.unwrap();
245                for patch in result_batch {
246                    yield patch;
247                }
248
249                if let Some(next) = next_link {
250                    response = client.get(&next).send().await.unwrap();
251                } else {
252                    break;
253                }
254            }
255        })
256    }
257
258    fn stream_build_tasks(&self, build_id: &str, status: Option<&str>) -> BoxedStream<EvgTask> {
259        let mut url = format!("{}/tasks", self.build_url("builds", build_id));
260        if let Some(s) = status {
261            url = format!("{}?status={}", url, s);
262        }
263        let client = self.client.clone();
264
265        Box::pin(stream! {
266            let mut response = client.get(&url).send().await.unwrap();
267            loop {
268                let next_link = next_link(&response);
269                let result_batch: Vec<EvgTask> = response.json().await.unwrap();
270                for patch in result_batch {
271                    yield patch;
272                }
273
274                if let Some(next) = next_link {
275                    response = client.get(&next).send().await.unwrap();
276                } else {
277                    break;
278                }
279            }
280        })
281    }
282
283    fn stream_log(&self, task: &EvgTask, log_name: &str) -> BoxedStream<String> {
284        let task_log = format!("{}&text=true", task.logs.get(log_name).unwrap());
285        let stream_future = self.client.get(&task_log).send();
286        Box::pin(stream! {
287            let mut stream = stream_future.await.unwrap().bytes_stream();
288            while let Some(item) = stream.next().await {
289                match item {
290                    Ok(bytes) => {
291                        let lines = std::str::from_utf8(&bytes).unwrap().split('\n');
292                        for l in lines {
293                            yield l.to_string();
294                        }
295                    }
296                    _ => break,
297                }
298            }
299        })
300    }
301
302    fn stream_test_log(&self, test: &EvgTest) -> BoxedStream<String> {
303        let stream_future = self.client.get(&test.logs.url_raw).send();
304
305        Box::pin(stream! {
306            let mut stream = stream_future.await.unwrap().bytes_stream();
307            while let Some(item) = stream.next().await {
308                match item {
309                    Ok(bytes) => {
310                        let lines = std::str::from_utf8(&bytes).unwrap().split('\n');
311                        for l in lines {
312                            yield l.to_string();
313                        }
314                    }
315                    _ => break,
316                }
317            }
318        })
319    }
320}
321
322fn next_link(response: &Response) -> Option<String> {
323    if let Some(header) = response.headers().get(LINK) {
324        let links = parse_link_header::parse(header.to_str().unwrap()).unwrap();
325        let next_link = links.get(&Some("next".to_string()));
326
327        return next_link.map(|l| l.uri.to_string());
328    }
329    None
330}
331
#[cfg(test)]
mod tests {
    use super::*;
    use http::response::Builder;
    use reqwest::Response;

    /// URL used as the link target in every test below.
    const PAGE_URL: &str =
        "https://evergreen.mongodb.com/rest/v2/tasks/task_id/tests?limit=100&start_at=abc123";

    /// Build a 200 response with body `foo` and the single header `header_name: header_value`.
    fn response_with_header(header_name: &str, header_value: String) -> Response {
        Response::from(
            Builder::new()
                .status(200)
                .header(header_name, header_value)
                .body("foo")
                .unwrap(),
        )
    }

    #[test]
    fn test_next_link_should_return_link_if_it_exists() {
        let response =
            response_with_header(LINK.as_str(), format!("<{}>; rel=\"next\"", PAGE_URL));

        let found = next_link(&response);

        assert_eq!(found, Some(String::from(PAGE_URL)));
    }

    #[test]
    fn test_next_link_should_return_none_if_no_next() {
        let response =
            response_with_header(LINK.as_str(), format!("<{}>; rel=\"other\"", PAGE_URL));

        let found = next_link(&response);

        assert_eq!(found, None);
    }

    #[test]
    fn test_next_link_should_return_none_if_no_link_header() {
        let response =
            response_with_header("something", format!("<{}>; rel=\"other\"", PAGE_URL));

        let found = next_link(&response);

        assert_eq!(found, None);
    }
}
376}