1use std::io::Read;
2
3pub mod util;
4
5use crate::{
6 ci_provider::github::util::{
7 distance_to_other_issues, job_error_logs_from_log_and_failed_jobs_and_steps,
8 repo_url_to_run_url, run_url_to_job_url, JobErrorLog,
9 },
10 err_parse::parse_error_message,
11 issue::{FailedJob, FirstFailedStep},
12 *,
13};
14use hyper::body;
15use octocrab::{
16 models::{
17 issues::Issue,
18 workflows::{Conclusion, Job, Run},
19 Label, RunId,
20 },
21 params::{workflows::Filter, State},
22 Octocrab, *,
23};
24
25use super::util::*;
26use anyhow::Result;
27
/// Lazily-initialized, process-wide GitHub client; access via [`GitHub::get`].
pub static GITHUB_CLIENT: OnceLock<GitHub> = OnceLock::new();

/// Thin wrapper around an [`Octocrab`] client for the GitHub REST API.
pub struct GitHub {
    // Underlying octocrab client; authenticated when GITHUB_TOKEN was set at init.
    client: Octocrab,
}
33
34impl GitHub {
35 pub fn get() -> &'static GitHub {
37 GITHUB_CLIENT.get_or_init(|| Self::init().expect("Failed to initialize GitHub client"))
38 }
39
40 fn init() -> Result<GitHub> {
41 let github_client = match env::var("GITHUB_TOKEN") {
42 Ok(token) => GitHub::new(&token)?,
43 Err(e) => {
44 log::debug!("{e:?}");
45 log::warn!("GITHUB_TOKEN not set, using unauthenticated client");
46 Self {
47 client: Octocrab::default(),
48 }
49 }
50 };
51 Ok(github_client)
52 }
53
54 fn new(token: &str) -> Result<Self> {
55 let client = Octocrab::builder()
56 .personal_token(token.to_owned())
57 .build()?;
58 Ok(Self { client })
59 }
60
61 pub async fn create_issue_from_run(
62 &self,
63 repo: &String,
64 run_id: &String,
65 label: &String,
66 kind: &commands::WorkflowKind,
67 no_duplicate: bool,
68 title: &String,
69 ) -> Result<()> {
70 log::debug!(
71 "Creating issue from:\n\
72 \trepo: {repo}\n\
73 \trun_id: {run_id}\n\
74 \tlabel: {label}\n\
75 \tkind: {kind}\n\
76 \tno_duplicate: {no_duplicate}\n\
77 \ttitle: {title}",
78 );
79 let (owner, repo) = repo_to_owner_repo_fragments(repo)?;
80 let run_url = repo_url_to_run_url(&format!("github.com/{owner}/{repo}"), run_id);
81 let run_id: u64 = run_id.parse()?;
82
83 let workflow_run = self.workflow_run(&owner, &repo, RunId(run_id)).await?;
84 log::debug!("{workflow_run:?}");
85
86 if workflow_run.conclusion != Some("failure".to_string()) {
87 log::info!(
88 "Workflow run didn't fail, but has conclusion: {:?}. Continuing...",
89 workflow_run.conclusion
90 );
91 }
92
93 let mut jobs = self.workflow_run_jobs(&owner, &repo, RunId(run_id)).await?;
94 log::info!("Got {} job(s) for the workflow run", jobs.len());
95 if jobs.is_empty() {
96 bail!("No jobs found for the workflow run");
97 }
98
99 let max_attempt = jobs
101 .iter()
102 .max_by_key(|job| job.run_attempt)
103 .unwrap()
104 .run_attempt;
105 jobs.retain(|job| job.run_attempt == max_attempt);
106
107 let jobs = jobs; let failed_jobs = jobs
110 .iter()
111 .filter(|job| job.conclusion == Some(Conclusion::Failure))
112 .collect::<Vec<_>>();
113
114 log::info!(
115 "Found {} failed job(s): {}",
116 failed_jobs.len(),
117 failed_jobs
118 .iter()
119 .map(|j| j.name.as_str())
120 .collect::<Vec<_>>()
121 .join(", ")
122 );
123
124 let failed_steps = failed_jobs
125 .iter()
126 .flat_map(|job| job.steps.iter())
127 .filter(|step| step.conclusion == Some(Conclusion::Failure))
128 .collect::<Vec<_>>();
129 log::info!(
130 "Found {} failed step(s): {}",
131 failed_steps.len(),
132 failed_steps
133 .iter()
134 .map(|s| s.name.as_str())
135 .collect::<Vec<_>>()
136 .join(", ")
137 );
138 failed_steps.iter().for_each(|step| {
139 log::debug!("{step:?}");
140 });
141
142 let logs = self
143 .download_workflow_run_logs(&owner, &repo, RunId(run_id))
144 .await?;
145 log::info!("Downloaded {} logs", logs.len());
146 log::info!(
147 "Log names sorted by timestamp:\n{logs}",
148 logs = logs
149 .iter()
150 .map(|log| log.name.as_str())
151 .collect::<Vec<_>>()
152 .join("\n")
153 );
154 logs.iter().for_each(|log| {
155 log::debug!("{log:?}");
156 });
157
158 let job_error_logs: Vec<JobErrorLog> = job_error_logs_from_log_and_failed_jobs_and_steps(
159 &logs,
160 failed_jobs.as_slice(),
161 &failed_steps,
162 );
163
164 util::log_info_downloaded_job_error_logs(&job_error_logs);
165
166 let failed_jobs = job_error_logs
169 .iter()
170 .map(|job| {
171 let job_id_str = job.job_id.to_string();
172 let job_url = run_url_to_job_url(&run_url, &job_id_str);
173 let continuous_errorlog_msgs = job.logs_as_str();
174 let first_failed_step: FirstFailedStep = match job.failed_step_logs.first() {
175 Some(first_failed_step_log) => {
176 FirstFailedStep::StepName(first_failed_step_log.step_name.to_owned())
177 }
178 None => FirstFailedStep::NoStepsExecuted,
181 };
182 let parsed_msg = parse_error_message(&continuous_errorlog_msgs, *kind).unwrap();
183 FailedJob::new(
184 job.job_name.to_owned(),
185 job_id_str,
186 job_url,
187 first_failed_step,
188 parsed_msg,
189 )
190 })
191 .collect();
192
193 let mut issue = issue::Issue::new(
194 title.to_owned(),
195 run_id.to_string(),
196 run_url,
197 failed_jobs,
198 label.to_owned(),
199 );
200 log::debug!("generic issue instance: {issue:?}");
201 if no_duplicate {
203 log::info!("No-duplicate flag is set, checking for similar issues");
204 let open_issues = self
206 .issues_at(
207 &owner,
208 &repo,
209 DateFilter::None,
210 State::Open,
211 LabelFilter::All([label]),
212 )
213 .await?;
214 log::info!(
215 "Found {num_issues} open issue(s) with label {label}",
216 num_issues = open_issues.len()
217 );
218 let min_distance = distance_to_other_issues(&issue.body(), &open_issues);
219 log::info!("Minimum distance to similar issue: {min_distance}");
220 match min_distance {
221 0 => {
222 log::warn!("An issue with the exact same body already exists. Exiting...");
223 return Ok(());
224 }
225 _ if min_distance < issue::similarity::LEVENSHTEIN_THRESHOLD => {
226 log::warn!("An issue with a similar body already exists. Exiting...");
227 return Ok(());
228 }
229 _ => log::info!("No similar issue found. Continuing..."),
230 }
231 }
232
233 let all_labels = self.get_all_labels(&owner, &repo).await?;
235 log::info!("Got {num_labels} label(s)", num_labels = all_labels.len());
236 let labels_to_create: Vec<String> = issue
237 .labels()
238 .iter()
239 .filter(|label| !all_labels.iter().any(|l| l.name.eq(*label)))
240 .cloned()
241 .collect();
242 if !labels_to_create.is_empty() {
243 log::info!(
244 "{} label(s) determined for the issue-to-be-created do not yet exist on the repo, and will be created: {labels_to_create:?}",
245 labels_to_create.len()
246 );
247 }
248
249 if Config::global().dry_run() {
251 println!("####################################");
253 println!("DRY RUN MODE! The following issue would be created:");
254 println!("==== ISSUE TITLE ==== \n{}", issue.title());
255 println!("==== ISSUE LABEL(S) ==== \n{}", issue.labels().join(","));
256 println!("==== START OF ISSUE BODY ==== \n{}", issue.body());
257 println!("==== END OF ISSUE BODY ====");
258 } else {
259 for issue_label in labels_to_create {
261 log::info!("Creating label: {issue_label}");
262 self.client
263 .issues(&owner, &repo)
264 .create_label(issue_label, "FF0000", "")
265 .await?; }
267 self.create_issue(&owner, &repo, issue).await?;
268 }
269
270 Ok(())
271 }
272
273 pub async fn open_issues(&self, owner: &str, repo: &str) -> Result<Vec<Issue>> {
274 self.issues(
275 owner,
276 repo,
277 State::Open,
278 DateFilter::None,
279 LabelFilter::none(),
280 )
281 .await
282 }
283
    /// Fetches issues on `owner/repo` matching the given date, state and
    /// label filters. Thin logging wrapper over [`Self::issues`] with the
    /// `date`/`state` argument order swapped for the public API.
    pub async fn issues_at<I, S>(
        &self,
        owner: &str,
        repo: &str,
        date: DateFilter,
        state: State,
        labels: LabelFilter<I, S>,
    ) -> Result<Vec<Issue>>
    where
        S: AsRef<str> + fmt::Display + fmt::Debug,
        I: IntoIterator<Item = S> + Clone + fmt::Debug,
    {
        log::debug!("Getting issues for {owner}/{repo} with date={date:?}, state={state:?}, labels={labels:?}");
        self.issues(owner, repo, state, date, labels).await
    }
299
300 pub async fn create_issue(
302 &self,
303 owner: &str,
304 repo: &str,
305 mut issue: issue::Issue,
306 ) -> Result<()> {
307 let body_str = issue.body();
308 log::debug!(
309 "Creating issue for {owner}/{repo} with\n\
310 \ttitle: {title}\n\
311 \tlabels: {labels:?}\n\
312 \tbody: {body}",
313 title = issue.title(),
314 body = body_str,
315 labels = issue.labels()
316 );
317 if issue.body().len() > 65536 {
319 log::error!(
320 "Issue body is too long: {len} characters. Maximum for GitHub issues is 65536. Exiting...",
321 len = issue.body().len()
322 );
323 bail!("Issue body is too long");
324 }
325
326 self.client
327 .issues(owner, repo)
328 .create(issue.title())
329 .body(issue.body())
330 .labels(issue.labels().to_vec())
331 .send()
332 .await?;
333 Ok(())
334 }
335
336 async fn issues<I, S>(
338 &self,
339 owner: &str,
340 repo: &str,
341 state: State,
342 date: DateFilter,
343 labels: LabelFilter<I, S>,
344 ) -> Result<Vec<Issue>>
345 where
346 S: AsRef<str> + fmt::Display + fmt::Debug,
347 I: IntoIterator<Item = S> + Clone,
348 {
349 let label_filter = labels.to_string();
350
351 let date_filter = date.to_string();
352
353 let issue_state = match state {
354 State::Open => "is:open",
355 State::Closed => "is:closed",
356 State::All => "",
357 _ => bail!("Invalid state"),
358 };
359
360 let query_str =
361 format!("repo:{owner}/{repo} is:issue {issue_state} {date_filter} {label_filter}");
362 log::debug!("Query string={query_str}");
363 let issues = self
364 .client
365 .search()
366 .issues_and_pull_requests(&query_str)
367 .send()
368 .await?;
369
370 Ok(issues.items)
371 }
372
373 pub async fn get_all_labels(&self, owner: &str, repo: &str) -> Result<Vec<Label>> {
374 let label_page = self
375 .client
376 .issues(owner, repo)
377 .list_labels_for_repo()
378 .send()
379 .await?;
380 Ok(label_page.items)
381 }
382
383 pub async fn workflow_run(&self, owner: &str, repo: &str, run_id: RunId) -> Result<Run> {
384 log::debug!("Getting workflow run {run_id} for {owner}/{repo}");
385 let run = self.client.workflows(owner, repo).get(run_id).await?;
386 Ok(run)
387 }
388
389 pub async fn workflow_run_jobs(
390 &self,
391 owner: &str,
392 repo: &str,
393 run_id: RunId,
394 ) -> Result<Vec<Job>> {
395 log::debug!("Getting workflow run jobs for {run_id} for {owner}/{repo}");
396 let jobs = self
397 .client
398 .workflows(owner, repo)
399 .list_jobs(run_id)
400 .page(1u8)
401 .filter(Filter::All)
402 .send()
403 .await?;
404 Ok(jobs.items)
405 }
406
    /// Downloads the plain-text log of a single job.
    ///
    /// Uses the raw REST route because the logs endpoint responds with a
    /// redirect to a short-lived download URL, which
    /// `follow_location_to_data` resolves.
    pub async fn download_job_logs(&self, owner: &str, repo: &str, job_id: u64) -> Result<String> {
        use http_body_util::BodyExt;
        use hyper::Uri;
        log::debug!("Downloading logs for job {job_id} for {owner}/{repo}");
        let route = format!("/repos/{owner}/{repo}/actions/jobs/{job_id}/logs");
        let uri = Uri::builder().path_and_query(route).build()?;
        let data_response = self
            .client
            .follow_location_to_data(self.client._get(uri).await?)
            .await?;
        let boxbody = data_response.into_body();
        // Collect the streamed body fully into memory; assumes job logs fit
        // in RAM.
        let body_bytes = BodyExt::collect(boxbody).await?.to_bytes().to_vec();
        log::debug!("Downloaded {} bytes", body_bytes.len());
        // Lossy conversion: any invalid UTF-8 in the log becomes U+FFFD.
        let body_str = String::from_utf8_lossy(&body_bytes).to_string();
        Ok(body_str)
    }
435
436 pub async fn download_workflow_run_logs(
442 &self,
443 owner: &str,
444 repo: &str,
445 run_id: RunId,
446 ) -> Result<Vec<JobLog>> {
447 log::debug!("Downloading logs for {run_id} for {owner}/{repo}");
448 let logs_zip = self
449 .client
450 .actions()
451 .download_workflow_run_logs(owner, repo, run_id)
452 .await?;
453
454 log::debug!("Downloaded logs: {} bytes", logs_zip.len());
455 let zip_reader = io::Cursor::new(logs_zip);
456 let mut archive = zip::ZipArchive::new(zip_reader)?;
457
458 log::info!(
459 "Extracting {} log(s) from downloaded zip archive",
460 archive.len()
461 );
462
463 let mut logs = Vec::new();
464 for i in 0..archive.len() {
465 let mut file = archive.by_index(i)?;
466 log::info!("Extracting file: {} | size={}", file.name(), file.size());
467 if file.size() == 0 {
468 log::debug!("Skipping empty file: {}", file.name());
469 continue;
470 }
471
472 let mut contents = String::with_capacity(1024);
473 file.read_to_string(&mut contents)?;
474 logs.push(JobLog::new(file.name().to_string(), contents));
475 }
476
477 log::debug!("Extracted logs: {} characters", logs.len());
478 log::trace!("{logs:?}");
479
480 logs.sort_unstable_by(|a, b| {
482 let a = timestamp_from_log(&a.content).unwrap();
483 let b = timestamp_from_log(&b.content).unwrap();
484 a.cmp(&b)
485 });
486
487 Ok(logs)
488 }
489}
490
#[cfg(test)]
mod tests {
    use super::*;
    use octocrab::models::workflows::Conclusion;
    use pretty_assertions::{assert_eq, assert_ne};

    // NOTE(review): these tests hit the live GitHub API against external
    // repositories, so they can fail offline, when rate-limited, or when the
    // upstream repos change.

    /// Date + state search on docker/buildx returns one known closed issue.
    #[tokio::test]
    async fn test_get_issues() {
        let issues = GitHub::get()
            .issues_at(
                "docker",
                "buildx",
                DateFilter::Created(Date {
                    year: 2019,
                    month: 6,
                    day: 2,
                }),
                State::Closed,
                LabelFilter::none(),
            )
            .await
            .unwrap();
        assert_eq!(issues.len(), 1);
        assert_eq!(issues[0].title, "Building for ARM causes error often");
        assert_eq!(issues[0].number, 88);
    }

    /// Label filtering via the private `issues` helper returns a non-empty set.
    #[tokio::test]
    async fn test_get_issues_by_label() {
        let issues = GitHub::get()
            .issues(
                "docker",
                "buildx",
                State::Open,
                DateFilter::None,
                LabelFilter::All(["kind/bug", "area/bake"]),
            )
            .await
            .unwrap();
        println!("{}", issues.len());
        assert_ne!(issues.len(), 0);
    }

    /// Fetching a known completed run by id.
    #[tokio::test]
    async fn test_get_workflow_run() {
        let run = GitHub::get()
            .workflow_run("gregerspoulsen", "artisan_tools", RunId(8172341325))
            .await
            .unwrap();
        assert_eq!(run.id, RunId(8172341325));
        assert_eq!(run.status, "completed");
    }

    /// Fetching a known failed run: conclusion should be "failure".
    #[tokio::test]
    async fn test_get_workflow_failed_run() {
        let run = GitHub::get()
            .workflow_run("gregerspoulsen", "artisan_tools", RunId(8172179418))
            .await
            .unwrap();
        println!("{run:?}");
        assert_eq!(run.id, RunId(8172179418));
        assert_eq!(run.status, "completed");
        assert_eq!(run.conclusion, Some("failure".to_string()));
    }

    /// Jobs/steps of a failed run: one failed job with one failed step.
    #[tokio::test]
    #[ignore = "Needs a valid GITHUB_TOKEN with read access to public repos"]
    async fn test_get_workflow_run_jobs() {
        let jobs = GitHub::get()
            .workflow_run_jobs("gregerspoulsen", "artisan_tools", RunId(8172179418))
            .await
            .unwrap();
        assert_eq!(jobs.len(), 1);
        assert_eq!(jobs[0].conclusion, Some(Conclusion::Failure));
        assert_eq!(jobs[0].steps.len(), 5);
        assert_eq!(jobs[0].steps[0].name, "Set up job");

        let failed_jobs = jobs
            .iter()
            .filter(|job| job.conclusion == Some(Conclusion::Failure))
            .collect::<Vec<_>>();
        let failed_steps = failed_jobs[0]
            .steps
            .iter()
            .filter(|step| step.conclusion == Some(Conclusion::Failure))
            .collect::<Vec<_>>();
        assert_eq!(failed_steps.len(), 1);
        assert_eq!(failed_steps[0].name, "Run tests");
    }

    /// Downloading and unzipping a run's log archive yields 37 log files.
    #[tokio::test]
    #[ignore = "Might fail when running with `cargo test` (If another test sets the GITHUB_TOKEN env var)"]
    async fn test_download_workflow_run_logs() {
        let owner = "docker";
        let repo = "buildx";
        let run_id = RunId(8302026485);
        GitHub::init().unwrap();
        let logs = GitHub::get()
            .download_workflow_run_logs(owner, repo, run_id)
            .await
            .unwrap();
        for log in &logs {
            eprintln!("{}\n{}", log.name, log.content);
        }
        assert_eq!(logs.len(), 37);
    }
}