tcfetch/
lib.rs

1pub mod gh;
2mod ghwpt;
3mod hgmo;
4pub mod taskcluster;
5mod utils;
6
7use regex::Regex;
8use std::io;
9use std::path::{Path, PathBuf};
10use std::sync::{Arc, Mutex};
11use taskcluster::{tasks_complete, TaskGroupTask, Taskcluster, TaskclusterCI};
12use thiserror::Error;
13use utils::download;
14
15pub type Result<T> = std::result::Result<T, Error>;
16
17#[derive(Error, Debug)]
18pub enum Error {
19    #[error(transparent)]
20    Reqwest(#[from] reqwest::Error),
21    #[error(transparent)]
22    Serde(#[from] serde_json::Error),
23    #[error(transparent)]
24    Io(#[from] io::Error),
25    #[error("{0}")]
26    String(String),
27}
28
29fn fetch_job_logs(
30    client: &reqwest::blocking::Client,
31    taskcluster: &Taskcluster,
32    out_dir: &Path,
33    tasks: Vec<TaskGroupTask>,
34    artifact_name: &str,
35    compress: bool,
36) -> Vec<(TaskGroupTask, PathBuf)> {
37    let mut pool = scoped_threadpool::Pool::new(8);
38    let paths = Arc::new(Mutex::new(Vec::with_capacity(tasks.len())));
39
40    // TODO: Convert this to async
41    pool.scoped(|scope| {
42        for task in tasks.into_iter() {
43            scope.execute(|| {
44                let task_id = task.status.taskId.clone();
45                let client = client.clone();
46
47                let artifacts = match taskcluster.get_artifacts(&client, &task_id) {
48                    Ok(x) => x,
49                    Err(err) => {
50                        eprintln!("{}", err);
51                        return;
52                    }
53                };
54                // TODO: this selects too many artifacts, should split on separator and check for an exact match
55                let artifact = artifacts
56                    .iter()
57                    .find(|&artifact| artifact.name.ends_with(artifact_name));
58                if let Some(artifact) = artifact {
59                    let ext = if compress { ".zstd" } else { "" };
60
61                    let name = PathBuf::from(format!(
62                        "{}-{}-{}{}",
63                        task.task.metadata.name.replace('/', "-"),
64                        &task.status.taskId,
65                        artifact_name,
66                        ext
67                    ));
68                    let dest = out_dir.join(name);
69
70                    if dest.exists() {
71                        println!("{} exists locally, skipping", dest.to_string_lossy());
72                    } else {
73                        let log_url = taskcluster.get_log_url(&task_id, artifact);
74
75                        println!("Downloading {} to {}", log_url, dest.to_string_lossy());
76                        download(&client, &dest, &log_url, compress);
77                    }
78                    {
79                        let mut paths = paths.lock().unwrap();
80                        (*paths).push((task, dest));
81                    }
82                }
83            });
84        }
85    });
86    Arc::try_unwrap(paths).unwrap().into_inner().unwrap()
87}
88
89fn include_task(task: &TaskGroupTask, task_filters: &[TaskFilter]) -> bool {
90    let name = &task.task.metadata.name;
91    task_filters.iter().all(|filter| filter.is_match(name))
92}
93
94#[derive(Debug)]
95pub struct TaskFilter {
96    filter_re: Regex,
97    invert: bool,
98}
99
100impl TaskFilter {
101    pub fn new(filter_str: &str) -> Result<TaskFilter> {
102        let invert = filter_str.starts_with('!');
103        let mut re_str = if invert { &filter_str[1..] } else { filter_str };
104        let filter_string: String;
105        if !re_str.starts_with('^') {
106            filter_string = format!("^.*(?:{})", re_str);
107            re_str = &filter_string
108        }
109        Regex::new(re_str)
110            .map(|filter_re| TaskFilter { filter_re, invert })
111            .map_err(|_| {
112                Error::String(format!(
113                    "Filter `{}` can't be parsed as a regular expression",
114                    filter_str
115                ))
116            })
117    }
118
119    pub(crate) fn is_match(&self, name: &str) -> bool {
120        let mut is_match = self.filter_re.is_match(name);
121        if self.invert {
122            is_match = !is_match;
123        }
124        is_match
125    }
126}
127
128fn get_ci(repo: &str, taskcluster_base: Option<&str>) -> Option<Box<dyn TaskclusterCI>> {
129    match repo {
130        "wpt" => Some(Box::new(ghwpt::GithubCI::new(taskcluster_base))),
131        _ => {
132            if let Some(ci) = hgmo::HgmoCI::for_repo(taskcluster_base, repo.into()) {
133                Some(Box::new(ci))
134            } else {
135                None
136            }
137        }
138    }
139}
140
141pub fn check_complete(taskcluster_base: Option<&str>, repo: &str, commit: &str) -> Result<bool> {
142    let client = reqwest::blocking::Client::new();
143    let ci = get_ci(repo, taskcluster_base)
144        .ok_or_else(|| Error::String(format!("No such repo {}", repo)))?;
145    let taskgroups = ci.get_taskgroups(&client, commit)?;
146    let mut tasks = Vec::new();
147    for taskgroup in taskgroups {
148        tasks.extend(ci.taskcluster().get_taskgroup_tasks(&client, &taskgroup)?)
149    }
150    Ok(tasks_complete(tasks.iter()))
151}
152
153pub fn download_artifacts(
154    taskcluster_base: Option<&str>,
155    repo: &str,
156    commit: &str,
157    task_filters: Option<Vec<TaskFilter>>,
158    artifact_name: Option<&str>,
159    check_complete: bool,
160    out_dir: &Path,
161    compress: bool,
162) -> Result<Vec<(TaskGroupTask, PathBuf)>> {
163    let client = reqwest::blocking::Client::new();
164
165    let ci = get_ci(repo, taskcluster_base)
166        .ok_or_else(|| Error::String(format!("No such repo {}", repo)))?;
167
168    let task_filters = task_filters.unwrap_or_else(|| ci.default_task_filter());
169    let taskgroups = ci.get_taskgroups(&client, commit)?;
170    let artifact_name = artifact_name.unwrap_or_else(|| ci.default_artifact_name());
171
172    let mut tasks = Vec::new();
173    for taskgroup in taskgroups {
174        tasks.extend(ci.taskcluster().get_taskgroup_tasks(&client, &taskgroup)?)
175    }
176    let tasks: Vec<TaskGroupTask> = tasks
177        .into_iter()
178        .filter(|task| include_task(task, &task_filters))
179        .collect();
180
181    if check_complete && !tasks_complete(tasks.iter()) {
182        return Err(Error::String("wpt tasks are not yet complete".into()));
183    }
184
185    Ok(fetch_job_logs(
186        &client,
187        ci.taskcluster(),
188        out_dir,
189        tasks,
190        artifact_name,
191        compress,
192    ))
193}