1pub mod models;
2
3use async_stream::stream;
4use async_trait::async_trait;
5use futures::stream::Stream;
6use futures::stream::StreamExt;
7use models::stats::EvgTaskStats;
8use models::stats::EvgTaskStatsRequest;
9use models::stats::EvgTestStats;
10use models::stats::EvgTestStatsRequest;
11use models::version::EvgVersion;
12use models::{build::EvgBuild, patch::EvgPatch};
13use models::{task::EvgTask, test::EvgTest};
14use reqwest::{
15 header::{HeaderMap, HeaderValue, LINK},
16 Client, Response,
17};
18use serde::Deserialize;
19use std::path::Path;
20use std::pin::Pin;
21use std::{error::Error, fs};
22
23pub type BoxedStream<T> = Pin<Box<dyn Stream<Item = T>>>;
24pub type EvgError = Box<dyn Error + Sync + Send>;
25
26const DEFAULT_CONFIG_FILE: &str = ".evergreen.yml";
27
28#[derive(Debug, Deserialize, Clone)]
29struct EvergreenConfigFile {
30 pub user: String,
31 pub api_key: String,
32 pub api_server_host: String,
33}
34
35fn get_evg_config(path: &Path) -> Result<EvergreenConfigFile, EvgError> {
36 let contents = fs::read_to_string(&path)?;
37 let evg_config: EvergreenConfigFile = serde_yaml::from_str(&contents)?;
38 Ok(evg_config)
39}
40
41#[async_trait]
42pub trait EvgApiClient: Sync + Send {
43 async fn get_task(&self, task_id: &str) -> Result<EvgTask, EvgError>;
45 async fn get_version(&self, version_id: &str) -> Result<EvgVersion, EvgError>;
47 async fn get_build(&self, build_id: &str) -> Result<Option<EvgBuild>, EvgError>;
49 async fn get_tests(&self, task_id: &str) -> Result<Vec<EvgTest>, EvgError>;
51 async fn get_test_stats(
53 &self,
54 project_id: &str,
55 query: &EvgTestStatsRequest,
56 ) -> Result<Vec<EvgTestStats>, EvgError>;
57 async fn get_task_stats(
59 &self,
60 project_id: &str,
61 query: &EvgTaskStatsRequest,
62 ) -> Result<Vec<EvgTaskStats>, EvgError>;
63 fn stream_versions(&self, project_id: &str) -> BoxedStream<EvgVersion>;
65 fn stream_user_patches(&self, user_id: &str, limit: Option<usize>) -> BoxedStream<EvgPatch>;
67 fn stream_project_patches(
69 &self,
70 project_id: &str,
71 limit: Option<usize>,
72 ) -> BoxedStream<EvgPatch>;
73 fn stream_build_tasks(&self, build_id: &str, status: Option<&str>) -> BoxedStream<EvgTask>;
75 fn stream_log(&self, task: &EvgTask, log_name: &str) -> BoxedStream<String>;
77 fn stream_test_log(&self, test: &EvgTest) -> BoxedStream<String>;
79}
80
81#[derive(Clone)]
82pub struct EvgClient {
83 evg_config: EvergreenConfigFile,
84 client: Client,
85}
86
87impl EvgClient {
88 pub fn new() -> Result<EvgClient, EvgError> {
90 let home = std::env::var("HOME")?;
91 let path = format!("{}/{}", home, DEFAULT_CONFIG_FILE);
92 Self::from_file(Path::new(&path))
93 }
94
95 pub fn from_file(config_file: &Path) -> Result<EvgClient, EvgError> {
97 let evg_config = get_evg_config(config_file)?;
98 let mut headers = HeaderMap::new();
99 headers.insert("Api-User", HeaderValue::from_str(&evg_config.user)?);
100 headers.insert("Api-Key", HeaderValue::from_str(&evg_config.api_key)?);
101 let client = reqwest::Client::builder()
102 .default_headers(headers)
103 .build()?;
104
105 Ok(EvgClient { evg_config, client })
106 }
107
108 fn build_url(&self, endpoint: &str, arg: &str) -> String {
109 format!(
110 "{}/rest/v2/{}/{}",
111 self.evg_config.api_server_host, endpoint, arg
112 )
113 }
114}
115
116#[async_trait]
117impl EvgApiClient for EvgClient {
118 async fn get_task(&self, task_id: &str) -> Result<EvgTask, EvgError> {
119 let url = self.build_url("tasks", task_id);
120 let response = self.client.get(&url).send().await?;
121 Ok(response.json().await?)
122 }
123
124 async fn get_version(&self, version_id: &str) -> Result<EvgVersion, EvgError> {
125 let url = self.build_url("versions", version_id);
126 let response = self.client.get(&url).send().await?;
127 Ok(response.json().await?)
128 }
129
130 async fn get_build(&self, build_id: &str) -> Result<Option<EvgBuild>, EvgError> {
131 let url = self.build_url("builds", build_id);
132 let response = self.client.get(&url).send().await?;
133 if response.status() == 404 {
134 Ok(None)
135 } else {
136 Ok(Some(response.json().await?))
137 }
138 }
139
140 async fn get_tests(&self, task_id: &str) -> Result<Vec<EvgTest>, EvgError> {
141 let url = format!("{}/tests", self.build_url("tasks", task_id));
142 let mut results: Vec<EvgTest> = vec![];
143 let mut response = self.client.get(&url).send().await?;
144 loop {
145 let next_link = next_link(&response);
146 let result_batch: Vec<EvgTest> = response.json().await?;
147 results.extend(result_batch);
148
149 if let Some(next) = next_link {
150 response = self.client.get(&next).send().await?;
151 } else {
152 break;
153 }
154 }
155 Ok(results)
156 }
157
158 async fn get_test_stats(
159 &self,
160 project_id: &str,
161 query: &EvgTestStatsRequest,
162 ) -> Result<Vec<EvgTestStats>, EvgError> {
163 let url = format!("{}/test_stats", self.build_url("projects", project_id));
164 let response = self.client.get(&url).query(query).send().await?;
165
166 Ok(response.json().await?)
167 }
168
169 async fn get_task_stats(
170 &self,
171 project_id: &str,
172 query: &EvgTaskStatsRequest,
173 ) -> Result<Vec<EvgTaskStats>, EvgError> {
174 let url = format!("{}/task_stats", self.build_url("projects", project_id));
175 let response = self.client.get(&url).query(query).send().await?;
176 Ok(response.json().await?)
177 }
178
179 fn stream_versions(&self, project_id: &str) -> BoxedStream<EvgVersion> {
180 let url = format!(
181 "{}/versions?requester=gitter_request",
182 self.build_url("projects", project_id)
183 );
184 let client = self.client.clone();
185
186 Box::pin(stream! {
187 let mut response = client.get(&url).send().await.unwrap();
188 loop {
189 let next_link = next_link(&response);
190 let result_batch: Vec<EvgVersion> = response.json().await.unwrap();
191 for version in result_batch {
192 yield version;
193 }
194
195 if let Some(next) = next_link {
196 response = client.get(&next).send().await.unwrap();
197 } else {
198 break;
199 }
200 }
201 })
202 }
203
204 fn stream_user_patches(&self, user_id: &str, limit: Option<usize>) -> BoxedStream<EvgPatch> {
205 let mut url = format!("{}/patches", self.build_url("users", user_id));
206 if let Some(l) = limit {
207 url = format!("{}?limit={}", url, l);
208 }
209 let client = self.client.clone();
210
211 Box::pin(stream! {
212 let mut response = client.get(&url).send().await.unwrap();
213 loop {
214 let next_link = next_link(&response);
215 let result_batch: Vec<EvgPatch> = response.json().await.unwrap();
216 for patch in result_batch {
217 yield patch;
218 }
219
220 if let Some(next) = next_link {
221 response = client.get(&next).send().await.unwrap();
222 } else {
223 break;
224 }
225 }
226 })
227 }
228
229 fn stream_project_patches(
230 &self,
231 project_id: &str,
232 limit: Option<usize>,
233 ) -> BoxedStream<EvgPatch> {
234 let mut url = format!("{}/patches", self.build_url("projects", project_id));
235 if let Some(l) = limit {
236 url = format!("{}?limit={}", url, l);
237 }
238 let client = self.client.clone();
239
240 Box::pin(stream! {
241 let mut response = client.get(&url).send().await.unwrap();
242 loop {
243 let next_link = next_link(&response);
244 let result_batch: Vec<EvgPatch> = response.json().await.unwrap();
245 for patch in result_batch {
246 yield patch;
247 }
248
249 if let Some(next) = next_link {
250 response = client.get(&next).send().await.unwrap();
251 } else {
252 break;
253 }
254 }
255 })
256 }
257
258 fn stream_build_tasks(&self, build_id: &str, status: Option<&str>) -> BoxedStream<EvgTask> {
259 let mut url = format!("{}/tasks", self.build_url("builds", build_id));
260 if let Some(s) = status {
261 url = format!("{}?status={}", url, s);
262 }
263 let client = self.client.clone();
264
265 Box::pin(stream! {
266 let mut response = client.get(&url).send().await.unwrap();
267 loop {
268 let next_link = next_link(&response);
269 let result_batch: Vec<EvgTask> = response.json().await.unwrap();
270 for patch in result_batch {
271 yield patch;
272 }
273
274 if let Some(next) = next_link {
275 response = client.get(&next).send().await.unwrap();
276 } else {
277 break;
278 }
279 }
280 })
281 }
282
283 fn stream_log(&self, task: &EvgTask, log_name: &str) -> BoxedStream<String> {
284 let task_log = format!("{}&text=true", task.logs.get(log_name).unwrap());
285 let stream_future = self.client.get(&task_log).send();
286 Box::pin(stream! {
287 let mut stream = stream_future.await.unwrap().bytes_stream();
288 while let Some(item) = stream.next().await {
289 match item {
290 Ok(bytes) => {
291 let lines = std::str::from_utf8(&bytes).unwrap().split('\n');
292 for l in lines {
293 yield l.to_string();
294 }
295 }
296 _ => break,
297 }
298 }
299 })
300 }
301
302 fn stream_test_log(&self, test: &EvgTest) -> BoxedStream<String> {
303 let stream_future = self.client.get(&test.logs.url_raw).send();
304
305 Box::pin(stream! {
306 let mut stream = stream_future.await.unwrap().bytes_stream();
307 while let Some(item) = stream.next().await {
308 match item {
309 Ok(bytes) => {
310 let lines = std::str::from_utf8(&bytes).unwrap().split('\n');
311 for l in lines {
312 yield l.to_string();
313 }
314 }
315 _ => break,
316 }
317 }
318 })
319 }
320}
321
322fn next_link(response: &Response) -> Option<String> {
323 if let Some(header) = response.headers().get(LINK) {
324 let links = parse_link_header::parse(header.to_str().unwrap()).unwrap();
325 let next_link = links.get(&Some("next".to_string()));
326
327 return next_link.map(|l| l.uri.to_string());
328 }
329 None
330}
331
332#[cfg(test)]
333mod tests {
334 use super::*;
335 use http::response::Builder;
336 use reqwest::Response;
337
338 #[test]
339 fn test_next_link_should_return_link_if_it_exists() {
340 let response = Response::from(Builder::new()
341 .status(200)
342 .header(LINK, String::from("<https://evergreen.mongodb.com/rest/v2/tasks/task_id/tests?limit=100&start_at=abc123>; rel=\"next\""))
343 .body("foo")
344 .unwrap());
345
346 let next_link = next_link(&response);
347
348 assert_eq!(next_link, Some(String::from("https://evergreen.mongodb.com/rest/v2/tasks/task_id/tests?limit=100&start_at=abc123")));
349 }
350
351 #[test]
352 fn test_next_link_should_return_none_if_no_next() {
353 let response = Response::from(Builder::new()
354 .status(200)
355 .header(LINK, String::from("<https://evergreen.mongodb.com/rest/v2/tasks/task_id/tests?limit=100&start_at=abc123>; rel=\"other\""))
356 .body("foo")
357 .unwrap());
358
359 let next_link = next_link(&response);
360
361 assert_eq!(next_link, None);
362 }
363
364 #[test]
365 fn test_next_link_should_return_none_if_no_link_header() {
366 let response = Response::from(Builder::new()
367 .status(200)
368 .header("something", String::from("<https://evergreen.mongodb.com/rest/v2/tasks/task_id/tests?limit=100&start_at=abc123>; rel=\"other\""))
369 .body("foo")
370 .unwrap());
371
372 let next_link = next_link(&response);
373
374 assert_eq!(next_link, None);
375 }
376}