// ci_manager/ci_provider/github.rs

1use std::io::Read;
2
3pub mod util;
4
5use crate::{
6    ci_provider::github::util::{
7        distance_to_other_issues, job_error_logs_from_log_and_failed_jobs_and_steps,
8        repo_url_to_run_url, run_url_to_job_url, JobErrorLog,
9    },
10    err_parse::parse_error_message,
11    issue::{FailedJob, FirstFailedStep},
12    *,
13};
14use hyper::body;
15use octocrab::{
16    models::{
17        issues::Issue,
18        workflows::{Conclusion, Job, Run},
19        Label, RunId,
20    },
21    params::{workflows::Filter, State},
22    Octocrab, *,
23};
24
25use super::util::*;
26use anyhow::Result;
27
/// Global, lazily-initialized GitHub client; access it via [`GitHub::get`].
pub static GITHUB_CLIENT: OnceLock<GitHub> = OnceLock::new();
29
/// Thin wrapper around an [`Octocrab`] client, exposing the CI-manager's
/// GitHub operations (issue creation, workflow-run inspection, log download).
pub struct GitHub {
    // Underlying octocrab client; authenticated when GITHUB_TOKEN is set.
    client: Octocrab,
}
33
34impl GitHub {
35    /// Get a reference to the global config
36    pub fn get() -> &'static GitHub {
37        GITHUB_CLIENT.get_or_init(|| Self::init().expect("Failed to initialize GitHub client"))
38    }
39
40    fn init() -> Result<GitHub> {
41        let github_client = match env::var("GITHUB_TOKEN") {
42            Ok(token) => GitHub::new(&token)?,
43            Err(e) => {
44                log::debug!("{e:?}");
45                log::warn!("GITHUB_TOKEN not set, using unauthenticated client");
46                Self {
47                    client: Octocrab::default(),
48                }
49            }
50        };
51        Ok(github_client)
52    }
53
54    fn new(token: &str) -> Result<Self> {
55        let client = Octocrab::builder()
56            .personal_token(token.to_owned())
57            .build()?;
58        Ok(Self { client })
59    }
60
61    pub async fn create_issue_from_run(
62        &self,
63        repo: &String,
64        run_id: &String,
65        label: &String,
66        kind: &commands::WorkflowKind,
67        no_duplicate: bool,
68        title: &String,
69    ) -> Result<()> {
70        log::debug!(
71            "Creating issue from:\n\
72            \trepo: {repo}\n\
73            \trun_id: {run_id}\n\
74            \tlabel: {label}\n\
75            \tkind: {kind}\n\
76            \tno_duplicate: {no_duplicate}\n\
77            \ttitle: {title}",
78        );
79        let (owner, repo) = repo_to_owner_repo_fragments(repo)?;
80        let run_url = repo_url_to_run_url(&format!("github.com/{owner}/{repo}"), run_id);
81        let run_id: u64 = run_id.parse()?;
82
83        let workflow_run = self.workflow_run(&owner, &repo, RunId(run_id)).await?;
84        log::debug!("{workflow_run:?}");
85
86        if workflow_run.conclusion != Some("failure".to_string()) {
87            log::info!(
88                "Workflow run didn't fail, but has conclusion: {:?}. Continuing...",
89                workflow_run.conclusion
90            );
91        }
92
93        let mut jobs = self.workflow_run_jobs(&owner, &repo, RunId(run_id)).await?;
94        log::info!("Got {} job(s) for the workflow run", jobs.len());
95        if jobs.is_empty() {
96            bail!("No jobs found for the workflow run");
97        }
98
99        // Take only jobs from the most recent attempt
100        let max_attempt = jobs
101            .iter()
102            .max_by_key(|job| job.run_attempt)
103            .unwrap()
104            .run_attempt;
105        jobs.retain(|job| job.run_attempt == max_attempt);
106
107        let jobs = jobs; // Make immutable again
108
109        let failed_jobs = jobs
110            .iter()
111            .filter(|job| job.conclusion == Some(Conclusion::Failure))
112            .collect::<Vec<_>>();
113
114        log::info!(
115            "Found {} failed job(s): {}",
116            failed_jobs.len(),
117            failed_jobs
118                .iter()
119                .map(|j| j.name.as_str())
120                .collect::<Vec<_>>()
121                .join(", ")
122        );
123
124        let failed_steps = failed_jobs
125            .iter()
126            .flat_map(|job| job.steps.iter())
127            .filter(|step| step.conclusion == Some(Conclusion::Failure))
128            .collect::<Vec<_>>();
129        log::info!(
130            "Found {} failed step(s): {}",
131            failed_steps.len(),
132            failed_steps
133                .iter()
134                .map(|s| s.name.as_str())
135                .collect::<Vec<_>>()
136                .join(", ")
137        );
138        failed_steps.iter().for_each(|step| {
139            log::debug!("{step:?}");
140        });
141
142        let logs = self
143            .download_workflow_run_logs(&owner, &repo, RunId(run_id))
144            .await?;
145        log::info!("Downloaded {} logs", logs.len());
146        log::info!(
147            "Log names sorted by timestamp:\n{logs}",
148            logs = logs
149                .iter()
150                .map(|log| log.name.as_str())
151                .collect::<Vec<_>>()
152                .join("\n")
153        );
154        logs.iter().for_each(|log| {
155            log::debug!("{log:?}");
156        });
157
158        let job_error_logs: Vec<JobErrorLog> = job_error_logs_from_log_and_failed_jobs_and_steps(
159            &logs,
160            failed_jobs.as_slice(),
161            &failed_steps,
162        );
163
164        util::log_info_downloaded_job_error_logs(&job_error_logs);
165
166        // Parse to a github issue
167        // Map the GitHub Job to a `FailedJob`
168        let failed_jobs = job_error_logs
169            .iter()
170            .map(|job| {
171                let job_id_str = job.job_id.to_string();
172                let job_url = run_url_to_job_url(&run_url, &job_id_str);
173                let continuous_errorlog_msgs = job.logs_as_str();
174                let first_failed_step: FirstFailedStep = match job.failed_step_logs.first() {
175                    Some(first_failed_step_log) => {
176                        FirstFailedStep::StepName(first_failed_step_log.step_name.to_owned())
177                    }
178                    // This can happen if the job times out while waiting for a runner to pick it up
179                    // Relevant issue: https://github.com/luftkode/ci-manager/issues/4
180                    None => FirstFailedStep::NoStepsExecuted,
181                };
182                let parsed_msg = parse_error_message(&continuous_errorlog_msgs, *kind).unwrap();
183                FailedJob::new(
184                    job.job_name.to_owned(),
185                    job_id_str,
186                    job_url,
187                    first_failed_step,
188                    parsed_msg,
189                )
190            })
191            .collect();
192
193        let mut issue = issue::Issue::new(
194            title.to_owned(),
195            run_id.to_string(),
196            run_url,
197            failed_jobs,
198            label.to_owned(),
199        );
200        log::debug!("generic issue instance: {issue:?}");
201        // Check if-no-duplicate is set
202        if no_duplicate {
203            log::info!("No-duplicate flag is set, checking for similar issues");
204            // Then check if a similar issue exists
205            let open_issues = self
206                .issues_at(
207                    &owner,
208                    &repo,
209                    DateFilter::None,
210                    State::Open,
211                    LabelFilter::All([label]),
212                )
213                .await?;
214            log::info!(
215                "Found {num_issues} open issue(s) with label {label}",
216                num_issues = open_issues.len()
217            );
218            let min_distance = distance_to_other_issues(&issue.body(), &open_issues);
219            log::info!("Minimum distance to similar issue: {min_distance}");
220            match min_distance {
221                0 => {
222                    log::warn!("An issue with the exact same body already exists. Exiting...");
223                    return Ok(());
224                }
225                _ if min_distance < issue::similarity::LEVENSHTEIN_THRESHOLD => {
226                    log::warn!("An issue with a similar body already exists. Exiting...");
227                    return Ok(());
228                }
229                _ => log::info!("No similar issue found. Continuing..."),
230            }
231        }
232
233        // Get all labels for the repo, and create the ones that don't exist
234        let all_labels = self.get_all_labels(&owner, &repo).await?;
235        log::info!("Got {num_labels} label(s)", num_labels = all_labels.len());
236        let labels_to_create: Vec<String> = issue
237            .labels()
238            .iter()
239            .filter(|label| !all_labels.iter().any(|l| l.name.eq(*label)))
240            .cloned()
241            .collect();
242        if !labels_to_create.is_empty() {
243            log::info!(
244                "{} label(s) determined for the issue-to-be-created do not yet exist on the repo, and will be created: {labels_to_create:?}",
245                labels_to_create.len()
246            );
247        }
248
249        // Check if dry-run is set
250        if Config::global().dry_run() {
251            // Then print the issue to be created instead of creating it
252            println!("####################################");
253            println!("DRY RUN MODE! The following issue would be created:");
254            println!("==== ISSUE TITLE ==== \n{}", issue.title());
255            println!("==== ISSUE LABEL(S) ==== \n{}", issue.labels().join(","));
256            println!("==== START OF ISSUE BODY ==== \n{}", issue.body());
257            println!("==== END OF ISSUE BODY ====");
258        } else {
259            // Create the labels that don't exist
260            for issue_label in labels_to_create {
261                log::info!("Creating label: {issue_label}");
262                self.client
263                    .issues(&owner, &repo)
264                    .create_label(issue_label, "FF0000", "")
265                    .await?; // Await the completion of the create_label future
266            }
267            self.create_issue(&owner, &repo, issue).await?;
268        }
269
270        Ok(())
271    }
272
273    pub async fn open_issues(&self, owner: &str, repo: &str) -> Result<Vec<Issue>> {
274        self.issues(
275            owner,
276            repo,
277            State::Open,
278            DateFilter::None,
279            LabelFilter::none(),
280        )
281        .await
282    }
283
284    pub async fn issues_at<I, S>(
285        &self,
286        owner: &str,
287        repo: &str,
288        date: DateFilter,
289        state: State,
290        labels: LabelFilter<I, S>,
291    ) -> Result<Vec<Issue>>
292    where
293        S: AsRef<str> + fmt::Display + fmt::Debug,
294        I: IntoIterator<Item = S> + Clone + fmt::Debug,
295    {
296        log::debug!("Getting issues for {owner}/{repo} with date={date:?}, state={state:?}, labels={labels:?}");
297        self.issues(owner, repo, state, date, labels).await
298    }
299
300    /// Create an issue
301    pub async fn create_issue(
302        &self,
303        owner: &str,
304        repo: &str,
305        mut issue: issue::Issue,
306    ) -> Result<()> {
307        let body_str = issue.body();
308        log::debug!(
309            "Creating issue for {owner}/{repo} with\n\
310        \ttitle:  {title}\n\
311        \tlabels: {labels:?}\n\
312        \tbody:   {body}",
313            title = issue.title(),
314            body = body_str,
315            labels = issue.labels()
316        );
317        // The maximum size of a GitHub issue body is 65536
318        if issue.body().len() > 65536 {
319            log::error!(
320                "Issue body is too long: {len} characters. Maximum for GitHub issues is 65536. Exiting...",
321                len = issue.body().len()
322            );
323            bail!("Issue body is too long");
324        }
325
326        self.client
327            .issues(owner, repo)
328            .create(issue.title())
329            .body(issue.body())
330            .labels(issue.labels().to_vec())
331            .send()
332            .await?;
333        Ok(())
334    }
335
336    // Utility function to get issues
337    async fn issues<I, S>(
338        &self,
339        owner: &str,
340        repo: &str,
341        state: State,
342        date: DateFilter,
343        labels: LabelFilter<I, S>,
344    ) -> Result<Vec<Issue>>
345    where
346        S: AsRef<str> + fmt::Display + fmt::Debug,
347        I: IntoIterator<Item = S> + Clone,
348    {
349        let label_filter = labels.to_string();
350
351        let date_filter = date.to_string();
352
353        let issue_state = match state {
354            State::Open => "is:open",
355            State::Closed => "is:closed",
356            State::All => "",
357            _ => bail!("Invalid state"),
358        };
359
360        let query_str =
361            format!("repo:{owner}/{repo} is:issue {issue_state} {date_filter} {label_filter}");
362        log::debug!("Query string={query_str}");
363        let issues = self
364            .client
365            .search()
366            .issues_and_pull_requests(&query_str)
367            .send()
368            .await?;
369
370        Ok(issues.items)
371    }
372
373    pub async fn get_all_labels(&self, owner: &str, repo: &str) -> Result<Vec<Label>> {
374        let label_page = self
375            .client
376            .issues(owner, repo)
377            .list_labels_for_repo()
378            .send()
379            .await?;
380        Ok(label_page.items)
381    }
382
383    pub async fn workflow_run(&self, owner: &str, repo: &str, run_id: RunId) -> Result<Run> {
384        log::debug!("Getting workflow run {run_id} for {owner}/{repo}");
385        let run = self.client.workflows(owner, repo).get(run_id).await?;
386        Ok(run)
387    }
388
389    pub async fn workflow_run_jobs(
390        &self,
391        owner: &str,
392        repo: &str,
393        run_id: RunId,
394    ) -> Result<Vec<Job>> {
395        log::debug!("Getting workflow run jobs for {run_id} for {owner}/{repo}");
396        let jobs = self
397            .client
398            .workflows(owner, repo)
399            .list_jobs(run_id)
400            .page(1u8)
401            .filter(Filter::All)
402            .send()
403            .await?;
404        Ok(jobs.items)
405    }
406
407    /// Get the entire raw log for a job
408    ///
409    /// # Note
410    /// The log does not contain the name of the workflow steps, only the output of the steps. It is
411    /// therefore not feasible to parse the log to find the step that failed.
412    /// Instead use [`download_workflow_run_logs`][GitHub::download_workflow_run_logs] to get the logs for the entire workflow run.
413    pub async fn download_job_logs(&self, owner: &str, repo: &str, job_id: u64) -> Result<String> {
414        use http_body_util::BodyExt;
415        use hyper::Uri;
416        log::debug!("Downloading logs for job {job_id} for {owner}/{repo}");
417        // Workaround until https://github.com/XAMPPRocky/octocrab/issues/394 is fixed
418        // adapted from: https://github.com/XAMPPRocky/octocrab/issues/394#issuecomment-1586054876
419
420        // route: https://docs.github.com/en/rest/actions/workflow-jobs?apiVersion=2022-11-28#download-job-logs-for-a-workflow-run
421        let route = format!("/repos/{owner}/{repo}/actions/jobs/{job_id}/logs");
422        let uri = Uri::builder().path_and_query(route).build()?;
423        // The endpoint returns a link to the logs, so configure the client to follow the redirect and return the data
424        let data_response = self
425            .client
426            .follow_location_to_data(self.client._get(uri).await?)
427            .await?;
428        let boxbody = data_response.into_body();
429        // Read the streaming body into a byte vector
430        let body_bytes = BodyExt::collect(boxbody).await?.to_bytes().to_vec();
431        log::debug!("Downloaded {} bytes", body_bytes.len());
432        let body_str = String::from_utf8_lossy(&body_bytes).to_string();
433        Ok(body_str)
434    }
435
436    /// Download the logs for a workflow run as a zip file, and extract the logs into a vector of [`JobLog`]s
437    /// sorted by the timestamp appearing in the logs.
438    ///
439    /// # Note
440    /// The logs are from the entire workflow run and all attempts, not just the most recent attempt.
441    pub async fn download_workflow_run_logs(
442        &self,
443        owner: &str,
444        repo: &str,
445        run_id: RunId,
446    ) -> Result<Vec<JobLog>> {
447        log::debug!("Downloading logs for {run_id} for {owner}/{repo}");
448        let logs_zip = self
449            .client
450            .actions()
451            .download_workflow_run_logs(owner, repo, run_id)
452            .await?;
453
454        log::debug!("Downloaded logs: {} bytes", logs_zip.len());
455        let zip_reader = io::Cursor::new(logs_zip);
456        let mut archive = zip::ZipArchive::new(zip_reader)?;
457
458        log::info!(
459            "Extracting {} log(s) from downloaded zip archive",
460            archive.len()
461        );
462
463        let mut logs = Vec::new();
464        for i in 0..archive.len() {
465            let mut file = archive.by_index(i)?;
466            log::info!("Extracting file: {} | size={}", file.name(), file.size());
467            if file.size() == 0 {
468                log::debug!("Skipping empty file: {}", file.name());
469                continue;
470            }
471
472            let mut contents = String::with_capacity(1024);
473            file.read_to_string(&mut contents)?;
474            logs.push(JobLog::new(file.name().to_string(), contents));
475        }
476
477        log::debug!("Extracted logs: {} characters", logs.len());
478        log::trace!("{logs:?}");
479
480        // The logs are received in a random order, so we sort them by timestamp
481        logs.sort_unstable_by(|a, b| {
482            let a = timestamp_from_log(&a.content).unwrap();
483            let b = timestamp_from_log(&b.content).unwrap();
484            a.cmp(&b)
485        });
486
487        Ok(logs)
488    }
489}
490
#[cfg(test)]
mod tests {
    use super::*;
    use octocrab::models::workflows::Conclusion;
    use pretty_assertions::{assert_eq, assert_ne};

    // These tests query the live GitHub API and assert against known
    // historical data on public repositories.

    #[tokio::test]
    async fn test_get_issues() {
        let created = DateFilter::Created(Date {
            year: 2019,
            month: 6,
            day: 2,
        });
        let found = GitHub::get()
            .issues_at("docker", "buildx", created, State::Closed, LabelFilter::none())
            .await
            .unwrap();
        assert_eq!(found.len(), 1);
        let only = &found[0];
        assert_eq!(only.title, "Building for ARM causes error often");
        assert_eq!(only.number, 88);
    }

    #[tokio::test]
    async fn test_get_issues_by_label() {
        let labels = LabelFilter::All(["kind/bug", "area/bake"]);
        let found = GitHub::get()
            .issues("docker", "buildx", State::Open, DateFilter::None, labels)
            .await
            .unwrap();
        println!("{}", found.len());
        assert_ne!(found.len(), 0);
    }

    #[tokio::test]
    async fn test_get_workflow_run() {
        let fetched = GitHub::get()
            .workflow_run("gregerspoulsen", "artisan_tools", RunId(8172341325))
            .await
            .unwrap();
        assert_eq!(fetched.id, RunId(8172341325));
        assert_eq!(fetched.status, "completed");
    }

    #[tokio::test]
    async fn test_get_workflow_failed_run() {
        let run = GitHub::get()
            .workflow_run("gregerspoulsen", "artisan_tools", RunId(8172179418))
            .await
            .unwrap();
        println!("{run:?}");
        assert_eq!(run.id, RunId(8172179418));
        assert_eq!(run.status, "completed");
        assert_eq!(run.conclusion, Some("failure".to_string()));
    }

    #[tokio::test]
    #[ignore = "Needs a valid GITHUB_TOKEN with read access to public repos"]
    async fn test_get_workflow_run_jobs() {
        let jobs = GitHub::get()
            .workflow_run_jobs("gregerspoulsen", "artisan_tools", RunId(8172179418))
            .await
            .unwrap();
        assert_eq!(jobs.len(), 1);
        let job = &jobs[0];
        assert_eq!(job.conclusion, Some(Conclusion::Failure));
        assert_eq!(job.steps.len(), 5);
        assert_eq!(job.steps[0].name, "Set up job");

        // The run has a single failed job containing exactly one failed step.
        let failing_jobs: Vec<_> = jobs
            .iter()
            .filter(|j| j.conclusion == Some(Conclusion::Failure))
            .collect();
        let failing_steps: Vec<_> = failing_jobs[0]
            .steps
            .iter()
            .filter(|s| s.conclusion == Some(Conclusion::Failure))
            .collect();
        assert_eq!(failing_steps.len(), 1);
        assert_eq!(failing_steps[0].name, "Run tests");
    }

    #[tokio::test]
    #[ignore = "Might fail when running with `cargo test` (If another test sets the GITHUB_TOKEN env var)"]
    async fn test_download_workflow_run_logs() {
        GitHub::init().unwrap();
        let logs = GitHub::get()
            .download_workflow_run_logs("docker", "buildx", RunId(8302026485))
            .await
            .unwrap();
        for log in &logs {
            eprintln!("{}\n{}", log.name, log.content);
        }
        assert_eq!(logs.len(), 37);
    }
}