1pub mod gh;
2mod ghwpt;
3mod hgmo;
4pub mod taskcluster;
5mod utils;
6
7use regex::Regex;
8use std::io;
9use std::path::{Path, PathBuf};
10use std::sync::{Arc, Mutex};
11use taskcluster::{tasks_complete, TaskGroupTask, Taskcluster, TaskclusterCI};
12use thiserror::Error;
13use utils::download;
14
15pub type Result<T> = std::result::Result<T, Error>;
16
17#[derive(Error, Debug)]
18pub enum Error {
19 #[error(transparent)]
20 Reqwest(#[from] reqwest::Error),
21 #[error(transparent)]
22 Serde(#[from] serde_json::Error),
23 #[error(transparent)]
24 Io(#[from] io::Error),
25 #[error("{0}")]
26 String(String),
27}
28
29fn fetch_job_logs(
30 client: &reqwest::blocking::Client,
31 taskcluster: &Taskcluster,
32 out_dir: &Path,
33 tasks: Vec<TaskGroupTask>,
34 artifact_name: &str,
35 compress: bool,
36) -> Vec<(TaskGroupTask, PathBuf)> {
37 let mut pool = scoped_threadpool::Pool::new(8);
38 let paths = Arc::new(Mutex::new(Vec::with_capacity(tasks.len())));
39
40 pool.scoped(|scope| {
42 for task in tasks.into_iter() {
43 scope.execute(|| {
44 let task_id = task.status.taskId.clone();
45 let client = client.clone();
46
47 let artifacts = match taskcluster.get_artifacts(&client, &task_id) {
48 Ok(x) => x,
49 Err(err) => {
50 eprintln!("{}", err);
51 return;
52 }
53 };
54 let artifact = artifacts
56 .iter()
57 .find(|&artifact| artifact.name.ends_with(artifact_name));
58 if let Some(artifact) = artifact {
59 let ext = if compress { ".zstd" } else { "" };
60
61 let name = PathBuf::from(format!(
62 "{}-{}-{}{}",
63 task.task.metadata.name.replace('/', "-"),
64 &task.status.taskId,
65 artifact_name,
66 ext
67 ));
68 let dest = out_dir.join(name);
69
70 if dest.exists() {
71 println!("{} exists locally, skipping", dest.to_string_lossy());
72 } else {
73 let log_url = taskcluster.get_log_url(&task_id, artifact);
74
75 println!("Downloading {} to {}", log_url, dest.to_string_lossy());
76 download(&client, &dest, &log_url, compress);
77 }
78 {
79 let mut paths = paths.lock().unwrap();
80 (*paths).push((task, dest));
81 }
82 }
83 });
84 }
85 });
86 Arc::try_unwrap(paths).unwrap().into_inner().unwrap()
87}
88
89fn include_task(task: &TaskGroupTask, task_filters: &[TaskFilter]) -> bool {
90 let name = &task.task.metadata.name;
91 task_filters.iter().all(|filter| filter.is_match(name))
92}
93
94#[derive(Debug)]
95pub struct TaskFilter {
96 filter_re: Regex,
97 invert: bool,
98}
99
100impl TaskFilter {
101 pub fn new(filter_str: &str) -> Result<TaskFilter> {
102 let invert = filter_str.starts_with('!');
103 let mut re_str = if invert { &filter_str[1..] } else { filter_str };
104 let filter_string: String;
105 if !re_str.starts_with('^') {
106 filter_string = format!("^.*(?:{})", re_str);
107 re_str = &filter_string
108 }
109 Regex::new(re_str)
110 .map(|filter_re| TaskFilter { filter_re, invert })
111 .map_err(|_| {
112 Error::String(format!(
113 "Filter `{}` can't be parsed as a regular expression",
114 filter_str
115 ))
116 })
117 }
118
119 pub(crate) fn is_match(&self, name: &str) -> bool {
120 let mut is_match = self.filter_re.is_match(name);
121 if self.invert {
122 is_match = !is_match;
123 }
124 is_match
125 }
126}
127
128fn get_ci(repo: &str, taskcluster_base: Option<&str>) -> Option<Box<dyn TaskclusterCI>> {
129 match repo {
130 "wpt" => Some(Box::new(ghwpt::GithubCI::new(taskcluster_base))),
131 _ => {
132 if let Some(ci) = hgmo::HgmoCI::for_repo(taskcluster_base, repo.into()) {
133 Some(Box::new(ci))
134 } else {
135 None
136 }
137 }
138 }
139}
140
141pub fn check_complete(taskcluster_base: Option<&str>, repo: &str, commit: &str) -> Result<bool> {
142 let client = reqwest::blocking::Client::new();
143 let ci = get_ci(repo, taskcluster_base)
144 .ok_or_else(|| Error::String(format!("No such repo {}", repo)))?;
145 let taskgroups = ci.get_taskgroups(&client, commit)?;
146 let mut tasks = Vec::new();
147 for taskgroup in taskgroups {
148 tasks.extend(ci.taskcluster().get_taskgroup_tasks(&client, &taskgroup)?)
149 }
150 Ok(tasks_complete(tasks.iter()))
151}
152
153pub fn download_artifacts(
154 taskcluster_base: Option<&str>,
155 repo: &str,
156 commit: &str,
157 task_filters: Option<Vec<TaskFilter>>,
158 artifact_name: Option<&str>,
159 check_complete: bool,
160 out_dir: &Path,
161 compress: bool,
162) -> Result<Vec<(TaskGroupTask, PathBuf)>> {
163 let client = reqwest::blocking::Client::new();
164
165 let ci = get_ci(repo, taskcluster_base)
166 .ok_or_else(|| Error::String(format!("No such repo {}", repo)))?;
167
168 let task_filters = task_filters.unwrap_or_else(|| ci.default_task_filter());
169 let taskgroups = ci.get_taskgroups(&client, commit)?;
170 let artifact_name = artifact_name.unwrap_or_else(|| ci.default_artifact_name());
171
172 let mut tasks = Vec::new();
173 for taskgroup in taskgroups {
174 tasks.extend(ci.taskcluster().get_taskgroup_tasks(&client, &taskgroup)?)
175 }
176 let tasks: Vec<TaskGroupTask> = tasks
177 .into_iter()
178 .filter(|task| include_task(task, &task_filters))
179 .collect();
180
181 if check_complete && !tasks_complete(tasks.iter()) {
182 return Err(Error::String("wpt tasks are not yet complete".into()));
183 }
184
185 Ok(fetch_job_logs(
186 &client,
187 ci.taskcluster(),
188 out_dir,
189 tasks,
190 artifact_name,
191 compress,
192 ))
193}