use anyhow::{Context, Result, anyhow};
use chrono::{DateTime, NaiveDate, Utc};
use reqwest::blocking::Client;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use shiplog_cache::{ApiCache, CacheKey};
use shiplog_coverage::{day_windows, month_windows, week_windows, window_len_days};
use shiplog_ids::{EventId, RunId};
use shiplog_ports::{IngestOutput, Ingestor};
use shiplog_schema::coverage::{Completeness, CoverageManifest, CoverageSlice, TimeWindow};
use shiplog_schema::event::{
    Actor, EventEnvelope, EventKind, EventPayload, Link, PullRequestEvent, PullRequestState,
    RepoRef, RepoVisibility, ReviewEvent, SourceRef, SourceSystem,
};
use std::path::PathBuf;
use std::thread::sleep;
use std::time::Duration;
use url::Url;

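/// Ingests a user's GitHub pull-request (and optionally review) activity over
/// the half-open `[since, until)` date range, producing events plus a coverage
/// manifest that records exactly which search windows were queried.
///
/// Illustrative sketch (usage assumed, not taken from this crate's docs; run
/// inside a function returning `anyhow::Result`):
/// ```ignore
/// use chrono::NaiveDate;
/// use shiplog_ports::Ingestor;
///
/// let out = GithubIngestor::new(
///     "octocat".to_string(),
///     NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
///     NaiveDate::from_ymd_opt(2024, 7, 1).unwrap(),
/// )
/// .with_in_memory_cache()?
/// .ingest()?;
/// ```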
#[derive(Debug)]
pub struct GithubIngestor {
    pub user: String,
    pub since: NaiveDate,
    pub until: NaiveDate,
    pub mode: String,
    pub include_reviews: bool,
    pub fetch_details: bool,
    pub throttle_ms: u64,
    pub token: Option<String>,
    pub api_base: String,
    pub cache: Option<ApiCache>,
}

impl GithubIngestor {
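    /// Creates an ingestor with defaults: `mode = "merged"`, details fetched,
    /// reviews excluded, no throttling, no token, the public API base, and no
    /// cache.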
    pub fn new(user: String, since: NaiveDate, until: NaiveDate) -> Self {
        Self {
            user,
            since,
            until,
            mode: "merged".to_string(),
            include_reviews: false,
            fetch_details: true,
            throttle_ms: 0,
            token: None,
            api_base: "https://api.github.com".to_string(),
            cache: None,
        }
    }

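    /// Attaches a persistent cache stored at
    /// `<cache_dir>/github-api-cache.db` for per-PR API responses.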
    pub fn with_cache(mut self, cache_dir: impl Into<PathBuf>) -> Result<Self> {
        let cache_path = cache_dir.into().join("github-api-cache.db");
        let cache = ApiCache::open(cache_path)?;
        self.cache = Some(cache);
        Ok(self)
    }

    pub fn with_in_memory_cache(mut self) -> Result<Self> {
        let cache = ApiCache::open_in_memory()?;
        self.cache = Some(cache);
        Ok(self)
    }

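    /// Derives the HTML base URL from the API base: `api.github.com` maps to
    /// `github.com`; any other host (e.g. GitHub Enterprise) is kept as-is.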
    fn html_base_url(&self) -> String {
        if let Ok(u) = Url::parse(&self.api_base) {
            let scheme = u.scheme();
            if let Some(host) = u.host_str() {
                if host == "api.github.com" {
                    return "https://github.com".to_string();
                }
                let port_suffix = u.port().map(|p| format!(":{p}")).unwrap_or_default();
                return format!("{scheme}://{host}{port_suffix}");
            }
        }
        "https://github.com".to_string()
    }

    fn client(&self) -> Result<Client> {
        Client::builder()
            .user_agent(concat!("shiplog/", env!("CARGO_PKG_VERSION")))
            .build()
            .context("build reqwest client")
    }

    fn api_url(&self, path: &str) -> String {
        format!("{}{}", self.api_base.trim_end_matches('/'), path)
    }

    fn throttle(&self) {
        if self.throttle_ms > 0 {
            sleep(Duration::from_millis(self.throttle_ms));
        }
    }

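    /// Sends an authenticated GET, throttles, and deserializes the JSON
    /// response. Non-success statuses become errors carrying the status code
    /// and response body.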
    fn get_json<T: DeserializeOwned>(
        &self,
        client: &Client,
        url: &str,
        params: &[(&str, String)],
    ) -> Result<T> {
        // Let reqwest serialize and percent-encode the query string; search
        // queries contain spaces and colons that must not be sent raw.
        let mut req = client
            .get(url)
            .query(params)
            .header("Accept", "application/vnd.github+json")
            .header("X-GitHub-Api-Version", "2022-11-28");
        if let Some(t) = &self.token {
            req = req.bearer_auth(t);
        }
        let resp = req.send().with_context(|| format!("GET {url}"))?;
        self.throttle();

        if !resp.status().is_success() {
            let status = resp.status();
            let body = resp.text().unwrap_or_default();
            return Err(anyhow!("GitHub API error {status}: {body}"));
        }

        resp.json::<T>()
            .with_context(|| format!("parse json from {url}"))
    }
}

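// Ingest flow: PR search always runs; review search is opt-in. Every probed
// window becomes a coverage slice, and any window that cannot be fully
// resolved downgrades the manifest to `Completeness::Partial`.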
impl Ingestor for GithubIngestor {
    fn ingest(&self) -> Result<IngestOutput> {
        if self.since >= self.until {
            return Err(anyhow!("since must be < until"));
        }

        let client = self.client()?;
        let run_id = RunId::now("shiplog");
        let mut slices: Vec<CoverageSlice> = Vec::new();
        let mut warnings: Vec<String> = Vec::new();
        let mut completeness = Completeness::Complete;

        let mut events: Vec<EventEnvelope> = Vec::new();

        let pr_query_builder = |w: &TimeWindow| self.build_pr_query(w);
        let (pr_items, pr_slices, pr_partial) =
            self.collect_search_items(&client, pr_query_builder, self.since, self.until, "prs")?;
        slices.extend(pr_slices);
        if pr_partial {
            completeness = Completeness::Partial;
        }

        events.extend(self.items_to_pr_events(&client, pr_items)?);

        if self.include_reviews {
            warnings.push(
                "Reviews are collected via search + per-PR review fetch; treat as best-effort coverage."
                    .to_string(),
            );
            let review_query_builder = |w: &TimeWindow| self.build_reviewed_query(w);
            let (review_items, review_slices, review_partial) = self.collect_search_items(
                &client,
                review_query_builder,
                self.since,
                self.until,
                "reviews",
            )?;
            slices.extend(review_slices);
            if review_partial {
                completeness = Completeness::Partial;
            }
            events.extend(self.items_to_review_events(&client, review_items)?);
        }

        events.sort_by_key(|e| e.occurred_at);

        let cov = CoverageManifest {
            run_id,
            generated_at: Utc::now(),
            user: self.user.clone(),
            window: TimeWindow {
                since: self.since,
                until: self.until,
            },
            mode: self.mode.clone(),
            sources: vec!["github".to_string()],
            slices,
            warnings,
            completeness,
        };

        Ok(IngestOutput {
            events,
            coverage: cov,
        })
    }
}

impl GithubIngestor {
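    /// Builds the search query for one window: merged PRs by default, or PRs
    /// created in the window when `mode == "created"`.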
    fn build_pr_query(&self, w: &TimeWindow) -> String {
        let (start, end) = github_inclusive_range(w);
        match self.mode.as_str() {
            "created" => format!("is:pr author:{} created:{}..{}", self.user, start, end),
            _ => format!(
                "is:pr is:merged author:{} merged:{}..{}",
                self.user, start, end
            ),
        }
    }

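    /// The search API has no qualifier for review submission dates, so
    /// `updated:` serves as a proxy; actual review timestamps are filtered in
    /// `items_to_review_events`.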
    fn build_reviewed_query(&self, w: &TimeWindow) -> String {
        let (start, end) = github_inclusive_range(w);
        format!("is:pr reviewed-by:{} updated:{}..{}", self.user, start, end)
    }

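    /// Walks the range month by month, recursively subdividing any window
    /// that hits the Search API's 1,000-result cap or reports incomplete
    /// results.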
    fn collect_search_items<F>(
        &self,
        client: &Client,
        make_query: F,
        since: NaiveDate,
        until: NaiveDate,
        label: &str,
    ) -> Result<(Vec<SearchIssueItem>, Vec<CoverageSlice>, bool)>
    where
        F: Fn(&TimeWindow) -> String,
    {
        let mut slices: Vec<CoverageSlice> = Vec::new();
        let mut items: Vec<SearchIssueItem> = Vec::new();
        let mut partial = false;

        for w in month_windows(since, until) {
            let (mut i, mut s, p) =
                self.collect_window(client, &make_query, &w, Granularity::Month, label)?;
            items.append(&mut i);
            slices.append(&mut s);
            partial |= p;
        }

        Ok((items, slices, partial))
    }

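    /// Probes one window; if it overflows the cap (or is incomplete) and can
    /// still be split (month -> week -> day), recurses into sub-windows. A
    /// window that still overflows at day granularity is recorded as partial.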
    fn collect_window<F>(
        &self,
        client: &Client,
        make_query: &F,
        window: &TimeWindow,
        gran: Granularity,
        label: &str,
    ) -> Result<(Vec<SearchIssueItem>, Vec<CoverageSlice>, bool)>
    where
        F: Fn(&TimeWindow) -> String,
    {
        if window.since >= window.until {
            return Ok((vec![], vec![], false));
        }

        let query = make_query(window);
        let (meta_total, meta_incomplete) = self.search_meta(client, &query)?;
        let mut slices = vec![CoverageSlice {
            window: window.clone(),
            query: query.clone(),
            total_count: meta_total,
            fetched: 0,
            incomplete_results: Some(meta_incomplete),
            notes: vec![format!("probe:{label}")],
        }];

        let need_subdivide = meta_total > 1000 || meta_incomplete;
        let can_subdivide = gran != Granularity::Day && window_len_days(window) > 1;

        if need_subdivide && can_subdivide {
            slices[0].notes.push(format!(
                "subdivide:{}",
                if meta_total > 1000 {
                    "cap"
                } else {
                    "incomplete"
                }
            ));

            let mut out_items = Vec::new();
            let mut out_slices = slices;
            let mut partial = false;

            let subs = match gran {
                Granularity::Month => week_windows(window.since, window.until),
                Granularity::Week => day_windows(window.since, window.until),
                Granularity::Day => vec![],
            };

            for sub in subs {
                let (mut i, mut s, p) =
                    self.collect_window(client, make_query, &sub, gran.next(), label)?;
                out_items.append(&mut i);
                out_slices.append(&mut s);
                partial |= p;
            }
            return Ok((out_items, out_slices, partial));
        }

        let mut partial = false;
        if need_subdivide {
            partial = true;
            slices[0]
                .notes
                .push("partial:unresolvable_at_this_granularity".to_string());
        }

        let fetched_items = self.fetch_all_search_items(client, &query)?;
        let fetched = fetched_items.len() as u64;

        slices.push(CoverageSlice {
            window: window.clone(),
            query: query.clone(),
            total_count: meta_total,
            fetched,
            incomplete_results: Some(meta_incomplete),
            notes: vec![format!("fetch:{label}")],
        });

        Ok((fetched_items, slices, partial))
    }

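    /// Cheap probe: requests a single item just to read `total_count` and
    /// `incomplete_results` for the query.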
    fn search_meta(&self, client: &Client, q: &str) -> Result<(u64, bool)> {
        let url = self.api_url("/search/issues");
        let resp: SearchResponse<SearchIssueItem> = self.get_json(
            client,
            &url,
            &[
                ("q", q.to_string()),
                ("per_page", "1".to_string()),
                ("page", "1".to_string()),
            ],
        )?;
        Ok((resp.total_count, resp.incomplete_results))
    }

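    /// Pages through search results at 100 per page. The Search API serves at
    /// most 1,000 results per query, hence the 10-page ceiling.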
    fn fetch_all_search_items(&self, client: &Client, q: &str) -> Result<Vec<SearchIssueItem>> {
        let url = self.api_url("/search/issues");
        let mut out: Vec<SearchIssueItem> = Vec::new();
        let per_page = 100;
        let max_pages = 10;
        for page in 1..=max_pages {
            let resp: SearchResponse<SearchIssueItem> = self.get_json(
                client,
                &url,
                &[
                    ("q", q.to_string()),
                    ("per_page", per_page.to_string()),
                    ("page", page.to_string()),
                ],
            )?;
            let items_len = resp.items.len();
            out.extend(resp.items);
            if out.len() as u64 >= resp.total_count.min(1000) {
                break;
            }
            if items_len < per_page {
                break;
            }
        }
        Ok(out)
    }

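    /// Converts search hits into PR events. When `fetch_details` is set, each
    /// PR is fetched individually for diff stats and visibility; on failure
    /// the event falls back to the sparser search-result fields.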
    fn items_to_pr_events(
        &self,
        client: &Client,
        items: Vec<SearchIssueItem>,
    ) -> Result<Vec<EventEnvelope>> {
        let mut out = Vec::new();
        for item in items {
            if let Some(pr_ref) = &item.pull_request {
                let html_base = self.html_base_url();
                let (repo_full_name, repo_html_url) =
                    repo_from_repo_url(&item.repository_url, &html_base);

                let (title, created_at, merged_at, additions, deletions, changed_files, visibility) =
                    if self.fetch_details {
                        match self.fetch_pr_details(client, &pr_ref.url) {
                            Ok(d) => {
                                let vis = if d.base.repo.private_field {
                                    RepoVisibility::Private
                                } else {
                                    RepoVisibility::Public
                                };
                                (
                                    d.title,
                                    d.created_at,
                                    d.merged_at,
                                    Some(d.additions),
                                    Some(d.deletions),
                                    Some(d.changed_files),
                                    vis,
                                )
                            }
                            Err(_) => (
                                item.title.clone(),
                                item.created_at.unwrap_or_else(Utc::now),
                                None,
                                None,
                                None,
                                None,
                                RepoVisibility::Unknown,
                            ),
                        }
                    } else {
                        (
                            item.title.clone(),
                            item.created_at.unwrap_or_else(Utc::now),
                            None,
                            None,
                            None,
                            None,
                            RepoVisibility::Unknown,
                        )
                    };

                let occurred_at = match self.mode.as_str() {
                    "created" => created_at,
                    _ => merged_at.unwrap_or(created_at),
                };

                let state = if merged_at.is_some() {
                    PullRequestState::Merged
                } else {
                    PullRequestState::Unknown
                };

                let id = EventId::from_parts([
                    "github",
                    "pr",
                    &repo_full_name,
                    &item.number.to_string(),
                ]);

                let ev = EventEnvelope {
                    id,
                    kind: EventKind::PullRequest,
                    occurred_at,
                    actor: Actor {
                        login: self.user.clone(),
                        id: None,
                    },
                    repo: RepoRef {
                        full_name: repo_full_name,
                        html_url: Some(repo_html_url),
                        visibility,
                    },
                    payload: EventPayload::PullRequest(PullRequestEvent {
                        number: item.number,
                        title,
                        state,
                        created_at,
                        merged_at,
                        additions,
                        deletions,
                        changed_files,
                        touched_paths_hint: vec![],
                        window: None,
                    }),
                    tags: vec![],
                    links: vec![Link {
                        label: "pr".into(),
                        url: item.html_url.clone(),
                    }],
                    source: SourceRef {
                        system: SourceSystem::Github,
                        url: Some(pr_ref.url.clone()),
                        opaque_id: Some(item.id.to_string()),
                    },
                };

                out.push(ev);
            }
        }
        Ok(out)
    }

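    /// Converts search hits into review events, keeping only reviews
    /// submitted by `self.user` that fall inside the requested half-open date
    /// range.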
    fn items_to_review_events(
        &self,
        client: &Client,
        items: Vec<SearchIssueItem>,
    ) -> Result<Vec<EventEnvelope>> {
        let mut out = Vec::new();
        for item in items {
            let Some(pr_ref) = &item.pull_request else {
                continue;
            };
            let html_base = self.html_base_url();
            let (repo_full_name, repo_html_url) =
                repo_from_repo_url(&item.repository_url, &html_base);

            let reviews = self.fetch_pr_reviews(client, &pr_ref.url)?;
            for r in reviews {
                if r.user.login != self.user {
                    continue;
                }
                let submitted = match r.submitted_at {
                    Some(s) => s,
                    None => continue,
                };
                let submitted_date = submitted.date_naive();
                if submitted_date < self.since || submitted_date >= self.until {
                    continue;
                }

                let id = EventId::from_parts([
                    "github",
                    "review",
                    &repo_full_name,
                    &item.number.to_string(),
                    &r.id.to_string(),
                ]);

                let ev = EventEnvelope {
                    id,
                    kind: EventKind::Review,
                    occurred_at: submitted,
                    actor: Actor {
                        login: self.user.clone(),
                        id: None,
                    },
                    repo: RepoRef {
                        full_name: repo_full_name.clone(),
                        html_url: Some(repo_html_url.clone()),
                        visibility: RepoVisibility::Unknown,
                    },
                    payload: EventPayload::Review(ReviewEvent {
                        pull_number: item.number,
                        pull_title: item.title.clone(),
                        submitted_at: submitted,
                        state: r.state,
                        window: None,
                    }),
                    tags: vec![],
                    links: vec![Link {
                        label: "pr".into(),
                        url: item.html_url.clone(),
                    }],
                    source: SourceRef {
                        system: SourceSystem::Github,
                        url: Some(pr_ref.url.clone()),
                        opaque_id: Some(r.id.to_string()),
                    },
                };

                out.push(ev);
            }
        }
        Ok(out)
    }

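    /// Fetches full details for one PR, reading through the cache when one is
    /// configured.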
    fn fetch_pr_details(&self, client: &Client, pr_api_url: &str) -> Result<PullRequestDetails> {
        let cache_key = CacheKey::pr_details(pr_api_url);
        #[allow(clippy::collapsible_if)]
        if let Some(ref cache) = self.cache {
            if let Some(cached) = cache.get::<PullRequestDetails>(&cache_key)? {
                return Ok(cached);
            }
        }

        let details: PullRequestDetails = self.get_json(client, pr_api_url, &[])?;

        if let Some(ref cache) = self.cache {
            cache.set(&cache_key, &details)?;
        }

        Ok(details)
    }

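    /// Fetches all reviews on a PR, 100 per page, caching page-by-page when a
    /// cache is configured.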
    fn fetch_pr_reviews(
        &self,
        client: &Client,
        pr_api_url: &str,
    ) -> Result<Vec<PullRequestReview>> {
        let url = format!("{pr_api_url}/reviews");
        let mut out = Vec::new();
        let per_page = 100;
        for page in 1..=10 {
            let cache_key = CacheKey::pr_reviews(pr_api_url, page);

            let page_reviews: Vec<PullRequestReview> = if let Some(ref cache) = self.cache {
                if let Some(cached) = cache.get::<Vec<PullRequestReview>>(&cache_key)? {
                    cached
                } else {
                    let reviews: Vec<PullRequestReview> = self.get_json(
                        client,
                        &url,
                        &[
                            ("per_page", per_page.to_string()),
                            ("page", page.to_string()),
                        ],
                    )?;
                    cache.set(&cache_key, &reviews)?;
                    reviews
                }
            } else {
                self.get_json(
                    client,
                    &url,
                    &[
                        ("per_page", per_page.to_string()),
                        ("page", page.to_string()),
                    ],
                )?
            };

            let n = page_reviews.len();
            out.extend(page_reviews);
            if n < per_page {
                break;
            }
        }
        Ok(out)
    }
}

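/// Window subdivision levels, coarsest to finest; `next()` steps one level
/// down and saturates at `Day`.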
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum Granularity {
    Month,
    Week,
    Day,
}

impl Granularity {
    fn next(&self) -> Granularity {
        match self {
            Granularity::Month => Granularity::Week,
            Granularity::Week => Granularity::Day,
            Granularity::Day => Granularity::Day,
        }
    }
}

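/// Converts a half-open `[since, until)` window into the inclusive date pair
/// that GitHub's `..` range qualifier expects, e.g. `[2024-01-01, 2024-02-01)`
/// becomes `("2024-01-01", "2024-01-31")`.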
fn github_inclusive_range(w: &TimeWindow) -> (String, String) {
    let start = w.since.format("%Y-%m-%d").to_string();
    let end_date = w.until.pred_opt().unwrap_or(w.until);
    let end = end_date.format("%Y-%m-%d").to_string();
    (start, end)
}

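/// Extracts `owner/repo` from an API repository URL of the form
/// `https://<host>/repos/{owner}/{repo}` and builds the matching HTML URL;
/// falls back to a placeholder when the URL does not parse.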
fn repo_from_repo_url(repo_api_url: &str, html_base: &str) -> (String, String) {
    #[allow(clippy::collapsible_if)]
    if let Ok(u) = Url::parse(repo_api_url) {
        if let Some(segs) = u.path_segments() {
            let v: Vec<&str> = segs.collect();
            if v.len() >= 3 && v[0] == "repos" {
                let owner = v[1];
                let repo = v[2];
                let full = format!("{}/{}", owner, repo);
                let html = format!("{}/{}/{}", html_base.trim_end_matches('/'), owner, repo);
                return (full, html);
            }
        }
    }
    ("unknown/unknown".to_string(), html_base.to_string())
}

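// The structs below are minimal mirrors of the GitHub REST payloads: only the
// fields this ingestor reads are declared, so unrelated fields are ignored on
// deserialize.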
#[derive(Debug, Deserialize)]
struct SearchResponse<T> {
    total_count: u64,
    incomplete_results: bool,
    items: Vec<T>,
}

#[derive(Debug, Deserialize)]
struct SearchIssueItem {
    id: u64,
    number: u64,
    title: String,
    html_url: String,
    repository_url: String,
    pull_request: Option<SearchPullRequestRef>,

    created_at: Option<DateTime<Utc>>,
}

#[derive(Debug, Deserialize)]
struct SearchPullRequestRef {
    url: String,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
struct PullRequestDetails {
    title: String,
    created_at: DateTime<Utc>,
    merged_at: Option<DateTime<Utc>>,
    additions: u64,
    deletions: u64,
    changed_files: u64,
    base: PullBase,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
struct PullBase {
    repo: PullRepo,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
struct PullRepo {
    full_name: String,
    html_url: String,
    #[serde(rename = "private")]
    private_field: bool,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
struct PullRequestReview {
    id: u64,
    state: String,
    submitted_at: Option<DateTime<Utc>>,
    user: ReviewUser,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
struct ReviewUser {
    login: String,
}